Kommentar von Constantin Gonzalez und Florian Mair, Amazon Web Services Low-Code-Echtzeit-Anwendungen mit Apache Flink SQL

Autor / Redakteur: Constantin Gonzalez und Florian Mair * / Nico Litzel

Wenn Entwickler Streaming-Anwendungen erstellen, haben sie die Wahl zwischen mehreren Stream-Processing-Frameworks wie Apache Flink, Apache Spark und Apache Kafka Streams. Diese Werkzeuge sind weit verbreitet und werden von Start-ups bis hin zu Großunternehmen eingesetzt, um echtzeitnahe Anwendungen in einer Vielzahl von Branchen zu entwickeln.

Firmen zum Thema

Der Einstieg in Apache Flink ist ohne Java-Kenntnisse nicht einfach. Mit Flink SQL wurde eine alternative Schnittstelle geschaffen, welche einen leichten Einstieg in die Entwicklung von Echtzeitapplikationen mit Apache Flink ermöglicht.
Der Einstieg in Apache Flink ist ohne Java-Kenntnisse nicht einfach. Mit Flink SQL wurde eine alternative Schnittstelle geschaffen, welche einen leichten Einstieg in die Entwicklung von Echtzeitapplikationen mit Apache Flink ermöglicht.
(Bild: © Blue Planet Studio - stock.adobe.com)

Streaming-Anwendungen, die mit diesen Frameworks erstellt werden, sind in der Regel gerichtete azyklische Graphen (DAG), wodurch Aufgaben in mehrere Schritte aufgeteilt werden. Das gilt auch für Apache-Flink -Jobs, die dadurch skalierbar sind und in einer verteilten Umgebung ausgeführt werden können.

Der Einstieg in Apache Flink ist für Entwickler ohne Java-Kenntnisse nicht immer einfach. Mit Flink SQL wurde daher eine alternative Schnittstelle geschaffen, welche einen leichten Einstieg in die Entwicklung von Echtzeitapplikationen mit Apache Flink ermöglicht. Im Verlauf dieses Artikels wird eine produktionsreife Beispielapplikation erstellt, die nur mit SQL-Code entwickelt werden kann.

Anatomie einer Flink-Applikation

Apache Flink ist ein Framework für zustandsabhängige Berechnungen in unbegrenzten Datenströmen. Apache-Flink-Applikationen haben drei essenzielle Komponenten: Datenstrom, Zustand und Zeit.

Apache Flink unterstützt eine Vielzahl von Quellen für unbegrenzte Datenströme, unter anderem Apache Kafka, Amazon Kinesis und RabbitMQ. Wenn die Daten mit Apache Flink weiterverarbeitet werden, unterscheidet man zwischen trivialen und nicht-trivialen Applikationen. Triviale Applikationen führen Transformationen oder Berechnungen basierend auf einzelnen Elementen aus – etwa eine Änderung des Zeitstempelformats. Nicht-triviale Applikationen können dagegen mehrere Elemente für eine Berechnung beziehungsweise Transformation betrachten. Beispielsweise lässt sich die Summe eines Werts über mehrere Elemente hinweg bilden.

Zeit ist neben dem Zustand ein weiterer wichtiger Faktor in Flink-Applikationen. Konkret unterscheidet man zwei verschiedene Arten von Zeiten bzw. Zeitstempeln: Beim Verwenden von Verarbeitungszeit („Processing Time“) wird der Zeitstempel zum Zeitpunkt der Verarbeitung in der Applikation gesetzt. Häufiger wird jedoch die Ereigniszeit („Event Time“) zum Verarbeiten verwendet. Dabei wird der Zeitstempel gesetzt, wenn die Daten an der Quelle (z. B. einem IoT-Sensor) entstehen. Die Ereigniszeit ist unabhängig von externen Faktoren wie Übertragungsgeschwindigkeit zwischen Quelle und Applikation. Vor allem bei Quellen wie Mobilgeräten, IoT-Sensoren etc. können externe Faktoren die Ankunftszeit beeinflussen. Innerhalb der Applikation lässt sich mit sogenanntem Watermarks definieren, wie spät Ereignisse eintreffen dürfen und wie diese behandelt werden sollen.

Entwicklung eines Flink-SQL-Jobs

Flink-SQL-Befehle können sowohl direkt ausgeführt werden als auch innerhalb einer Java-/Scala-Applikation, welche die Flink Library verwendet. Bei der direkten Methode können SQL-Statements mittels FlinkSQL Client oder über einen Flink Interpreter in Apache Zeppelin ausgeführt werden.

Damit die Flink-Applikation Daten von Quellen lesen und Ergebnisse in Ziele speichern kann, müssen sie über passenden Konnektoren verfügen. Diese werden als Parameter beim Start angegeben. Beispiele dazu finden sich in der Flink-SQL-Dokumentation. In diesem Beispiel werden die Amazon Kinesis Data Streams und Elasticsearch-Konnektoren verwendet. Der Elasticsearch-Konnektor ist auch mit OpenSearch kompatibel.

Als Beispieldatensatz wird das „TLC Trip Records“ Dataset verwendet, welches Taxifahrten in New York City, USA beschreibt. Die dabei relevanten Felder sind PULocationID (Abholpunkt), DOLocationID (Fahrtziel), tpep_pickup_datetime (Abholzeit) und tpep_dropoff_datetime (Ankunftszeit).

Das Ziel dieses ersten Flink Jobs ist es, herauszufinden, wie lange eine Fahrt durchschnittlich zum Flughafen EWR dauert. Der Durchschnitt wird jeweils über ein einstündiges Zeitfenster berechnet.

Schema Definieren / Tabellen Erstellen

Um mit den Daten aus dem Datenstrom arbeiten zu können, muss der Flink-SQL-Applikation das Schema bekannt sein. Dazu wird eine Tabelle mittels SQL-Befehlen angelegt, welche den Amazon Kinesis Connector verwendet.

Erstellen einer Tabelle mit SQL-Befehlen. Die Zeile WATERMARK definiert die Logik für verspätete Elemente.
Erstellen einer Tabelle mit SQL-Befehlen. Die Zeile WATERMARK definiert die Logik für verspätete Elemente.
(Bild: AWS / Florian Mair)

Die Zeile WATERMARK definiert die Logik für verspätete Elemente. In diesem Fall wartet die Applikation bis zu fünf Sekunden nach Ende des Zeitfensters, bevor das Ergebnis ausgegeben wird. Dadurch lassen sich auch Elemente inkludieren, die verspätet eintreffen – zum Beispiel aufgrund von höherer Latenz.

Um die Ergebnisse zu visualisieren, wird Elasticsearch verwendet, wofür ebenfalls eine Tabelle erstellt werden muss. Datenausgaben werden in Apache Flink auch als „Sink“ (übersetzt: Senke) bezeichnet. Die Tabelle für die durchschnittliche Fahrzeit enthält die Abholposition, die durchschnittliche Fahrzeit und den Beginn des Zeitfensters.

Tabelle für berechnete Durchschnittszeiten
Tabelle für berechnete Durchschnittszeiten
(Bild: AWS / Florian Mair)

Datenverarbeitung mit Zeitfenstern

Um Berechnungen über ein definiertes Zeitfenster durchzuführen, stellt Flink SQL die „Windows“ Funktionen zur Verfügung. Am häufigsten verwendet werden „Tumble“ und „Sliding / HOP“.

Tumbling Windows (übersetzt: Rollende Zeitfenster) definieren ein Zeitfenster, welches sich nicht mit einem anderen überschneidet, zum Beispiel 11:00 bis 12:00, während Sliding Windows (gleitende Zeitfenster) sich überschneiden können. Letztere verwenden zusätzlich einen Abstandparameter für das Zeitfenster, im Vergleich zu rollenden Zeitfenstern, die nur einen Parameter für die Dauer des Fensters haben. Ein gleitendes Zeitfenster mit fünf Minuten Abstand und einer Stunde Dauer, deckt folgende Zeiträume ab: 11:00 - 12:00, 11:05 - 12:05, 11:10- 12:10.

Für das Beispiel wird ein „Tumbling Window“ verwendet. Der folgende SQL-Befehl errechnet den Durchschnitt von Fahrten zum EWR-Flughafen (LocationID 1) in einem einstündigen Zeitfenster. Um potenzielle Ausreißer zu vermeiden, werden nur Fahrten verwendet, welche maximal vier Stunden dauern.

Berechnung der Durchschnittszeit mittels SQL-Befehl
Berechnung der Durchschnittszeit mittels SQL-Befehl
(Bild: AWS / Florian Mair)

Für die Darstellung der Daten wird Kibana, das Dashboard aus dem Elasticsearch-Paket, verwendet. Mit Kibana kann direkt auf die Daten im Elasticsearch Cluster zugegriffen werden.

Visualisierung der Taxifahrten
Visualisierung der Taxifahrten
(Bild: AWS / Florian Mair)

In dieser Visualisierung wurde exemplarisch der Zeitraum der ersten Maiwoche 2019 ausgewählt. Dabei lässt sich ein täglicher Anstieg der Fahrzeiten zwischen 17:00 und 18:00 deutlich erkennen. Die generierten Insights können vielfältig verwendet werden, um Prognosen zu erstellen oder Endkunden eine bessere Reiseplanung zu ermöglichen.

Apache Flink SQL auf AWS

Flink-Jobs, welche Java, Scala oder Python verwenden, werden meist in IDEs (Integrated Development Environment) wie IntelliJ entwickelt. Eine Alternative dazu sind Apache-Zeppelin-Notebooks, die Apache Flink als Interpreter unterstützen. Seit Mai 2021 bietet AWS Amazon Kinesis Data Analytics Studio (KDA Studio) an, einen Managed Service für Zeppelin Notebooks mit Flink-Interpreter. Durch das entfallende Setup der Entwicklungsumgebung wird der Einstieg in die Flink-Entwicklung wesentlich vereinfacht.

Ein weiterer Vorteil von KDA Studio ist die Möglichkeit, erstellte Jobs direkt zu Amazon S3 zu exportieren und in Kinesis Data Analytics zu betreiben.

Betrieb von Flink-Applikationen auf AWS

Apache-Flink-Applikationen können auf Clustern wie Kubernetes, Yarn oder Apache Mesos ausgeführt werden. Diese Cluster lassen sich sowohl on-premises als auch in der Cloud betreiben. Ein großer Vorteil gegenüber der selbst gehosteten Variante sind Managed Services, welche den Betrieb an den Cloud-Provider abgeben, sodass Kunden nur noch den Code bzw. die Applikation bereitstellen müssen.

Amazon Kinesis Data Analytics (KDA) ist ein Managed Service auf AWS und betreibt eine Flink-Applikation in einer hochverfügbaren Umgebung, ohne dass Server vom Benutzer dafür bereitgestellt werden müssen. Des Weiteren unterstützt KDA Autoscaling und integriert Amazon CloudWatch als Logging-Lösung. Apache Flink verwendet Savepoints, um den Zustand von Applikationen zu sichern. Wenn die Anwendung auf KDA betrieben wird, werden automatische Savepoints (auch Snapshots genannt) erstellt, wenn die Applikation aktualisiert, gestoppt oder skaliert wird. Per API-Aufruf können auch manuelle Snapshots erstellt werden.

Flink-Applikation auf AWS mit Amazon Kinesis Data Analytics
Flink-Applikation auf AWS mit Amazon Kinesis Data Analytics
(Bild: AWS / Florian Mair)

Fazit

Apache Flink ermöglicht Analysen von Datenströmen in Echtzeit unabhängig vom Datendurchsatz. Mit der Unterstützung für SQL-Befehle erleichtert Apache Flink den Einstieg in die Analyse von unbegrenzten Datenströmen deutlich. Mit Amazon Kinesis Data Analytics Studio stellt AWS einen Managed Service bereit, um Flink-Jobs in SQL, Python und Scala interaktiv zu entwickeln. Außerdem bietet AWS eine Integration an, um Applikationen in Notebooks in Amazon Kinesis Data Analytics zu deployen. Dabei kümmert sich Kinesis Data Analytics um den Betrieb, die Skalierung und das Management der darunterlegenden Infrastruktur.

Code

CREATE TABLE ride_events (
tpep_pickup_datetime TIMESTAMP(3),
tpep_dropoff_datetime TIMESTAMP(3),
PULocationID INTEGER,
DOLocationID INTEGER,
WATERMARK FOR tpep_pickup_datetime AS
tpep_pickup_datetime - INTERVAL '5' SECOND
)
WITH (
'connector' = 'kinesis',
'stream' = 'STREAM_NAME',
'aws.region' = 'us-east-1',
'scan.stream.initpos' = 'LATEST',
'format' = 'json',
'json.timestamp-format.standard' = 'SQL'
);
CREATE TABLE trip_duration (
PULocationID INTEGER,
avg_trip_duration BIGINT NOT NULL,
window_start TIMESTAMP(3)
)
WITH (
'connector' = 'elasticsearch-7',
'hosts' = 'http://ES_VPC_ENDPOINT:80',
'index' = 'trip_duration'
);
INSERT INTO trip_duration(
SELECT PULocationID,
AVG(UNIX_TIMESTAMP(CAST(tpep_dropoff_datetime AS STRING)) - UNIX_TIMESTAMP(CAST(tpep_pickup_datetime AS STRING))) AS avg_ride_duration,
TUMBLE_START(tpep_pickup_datetime, INTERVAL '60' minute) AS window_start
FROM ride_events
WHERE TIMESTAMPDIFF(HOUR,tpep_pickup_datetime, tpep_dropoff_datetime) <= 4
GROUP BY TUMBLE(tpep_pickup_datetime, INTERVAL '60' minute), PULocationID
)

* Constantin Gonzalez ist Principal Solutions Architect und Florian Mair Associate Solutions Architect bei Amazon Web Services

(ID:47716837)