Flink SQL
Apache Flink is a powerful stream-processing tool.
In this article we will discuss, step by step, how to build a streaming architecture, and then go through useful Flink concepts that may seem unfamiliar at first.
Streaming architecture
Streaming applications, also known as near real time (NRT) applications, process events continuously as they arrive. They are widely used in credit scoring, geoanalytics and mobile applications.
Current popular solutions include Apache Flink, Spark Structured Streaming and Kafka Streams.
In this article I share my heuristics for building streaming applications. I'll touch upon:
- ways to aggregate data
Assumptions:
- we are provided with enough Kafka capacity
Such assumptions help us to:
- get triggers of all categories
Onion architecture
I'll give a quick overview of the architecture.
Layer | Credo | Principles | Tech realisation |
---|---|---|---|
Feature extraction layer | Extract as much as possible | | |
Tech realisation (a sketch of the full pipeline follows this list):
- feature extraction
  - every side streaming source has its own Kafka topic
  - that topic should be filtered down to your domain only
  - yet we don't enrich it with our domain info at this stage
- feature level
  - we merge all semantic groups into one
  - the model schema should have a single data model
- enrichment layer
  - here we enrich events with our domain info
- strategy layer
  - expose only the info necessary for the side developer
  - all strategies are merged into one output topic
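To make the layering concrete, here is a minimal sketch in Flink SQL; the topic and field names (raw_side_stream, feature_topic, enriched_topic, strategy_output, domain_reference) are hypothetical.

-- Feature extraction layer: filter the side stream down to our domain only
INSERT INTO feature_topic
SELECT id, event_ts, lat, lng
FROM raw_side_stream
WHERE domain = 'geo';

-- Enrichment layer: add our domain info to the features
INSERT INTO enriched_topic
SELECT f.id, f.event_ts, f.lat, f.lng, d.client_segment
FROM feature_topic AS f
JOIN domain_reference AS d ON f.id = d.id;

-- Strategy layer: expose only what the side developer needs, in one output topic
INSERT INTO strategy_output
SELECT id, event_ts, client_segment
FROM enriched_topic;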
Architecture judgement
This architecture brings useful decomposition.
Feature level
On the feature level we are interested in aggregating data in a planar (flat) format.
Suppose we have a nested structure that we want to unpack into plain columns.
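As an illustration, here is a minimal sketch of flattening such a structure; the table, connector options and field names are hypothetical.

-- Raw events arrive with a nested payload
CREATE TABLE raw_events (
    id STRING,
    payload ROW<lat DOUBLE, lng DOUBLE, os STRING>
) WITH (
    'connector' = 'kafka',
    'topic' = 'raw_events',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'json'
);

-- Planar view: every nested field becomes a top-level column
CREATE VIEW flat_events AS
SELECT
    id,
    payload.lat AS lat,
    payload.lng AS lng,
    payload.os AS os
FROM raw_events;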
Aggregation level
There are two ways to aggregate info: with a common schema or without one. I'll share the pros and cons of both approaches.
A common schema is beneficial for the side developer, as they can rely on a single stable contract. A common schema also allows using Avro for efficient serialisation.
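As a sketch, a common-schema feature table serialised with Avro could be declared like this; the topic, fields and connector options are assumptions.

CREATE TABLE features (
    id STRING,
    event_ts TIMESTAMP(3),
    lat DOUBLE,
    lng DOUBLE
) WITH (
    'connector' = 'kafka',
    'topic' = 'features',
    'properties.bootstrap.servers' = 'localhost:9092',
    -- one shared schema, compact binary encoding
    'format' = 'avro'
);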
Yet standardization can bring serious obstacles:
- fields with a common name may mean different things in different sources
Flink SQL syntax
Flink has awesome documentation.
Yet some reasonable features require ad-hoc solutions. I'll share some of my favourite ones with you.
Understanding time in Flink
- processing time
- event time
Processing time
A processing-time attribute is declared as a simple computed column:
proc_time AS PROCTIME()
Event time
As Kafka is a queue, events are read sequentially. If we use event time, we expect events to arrive roughly in order of their timestamps.
When events don't follow this schedule they are called late events.
To mitigate late events you can use a watermark:
event_ts TIMESTAMP(3),
WATERMARK FOR event_ts AS event_ts - INTERVAL '1' MINUTE
With such a definition we'll wait for late events for one minute.
In brief, a watermark tells operators that all events before the mark have already arrived.
Example
Suppose we send a dataset of three events that happened one minute apart:
id | ts |
---|---|
1 | 2023-01-12 10:30:00.000 |
1 | 2023-01-12 10:31:00.000 |
1 | 2023-01-12 10:32:00.000 |
Using proctime
CREATE TABLE source (
    id INT,
    ts TIMESTAMP(3),
    proc_time AS PROCTIME()
) WITH (...);  -- connector options omitted

SELECT id, window_start, COUNT(*) AS cnt
FROM TABLE(
    TUMBLE(TABLE source, DESCRIPTOR(proc_time), INTERVAL '1' MINUTE))
GROUP BY id, window_start, window_end;
If we process the dataset using processing time, all three events are ingested almost at once and fall into the same window, so we get a count of 3. Yet with event time, each event lands in its own one-minute window.
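For comparison, a sketch of the event-time variant; the connector options are placeholders, and the watermark definition mirrors the one shown earlier.

CREATE TABLE source_evt (
    id INT,
    ts TIMESTAMP(3),
    WATERMARK FOR ts AS ts - INTERVAL '1' MINUTE
) WITH (
    'connector' = 'kafka',
    'topic' = 'events',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'json'
);

-- The 10:30, 10:31 and 10:32 events fall into three different
-- one-minute windows, so each window emits a count of 1
SELECT id, window_start, COUNT(*) AS cnt
FROM TABLE(
    TUMBLE(TABLE source_evt, DESCRIPTOR(ts), INTERVAL '1' MINUTE))
GROUP BY id, window_start, window_end;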
Field unpacking
Working with arrays
Difficult calculations.
Complex calculations are better performed sequentially, as a chain of intermediate fields.
The haversine formula is used for calculating the distance between two points defined by latitude and longitude.
This formula is especially useful in geo-streaming applications.
The exact formula is given by:
\[d = 2 r \arcsin\left(\sqrt{\sin^2\left(\frac{\phi_1-\phi_2}{2}\right) + \cos \phi_1 \cos \phi_2 \sin^2 \left(\frac{\lambda_2-\lambda_1}{2}\right)}\right)\]
It's really difficult to express this in a single SQL expression.
I advise you to use chaining of intermediate fields to make your work easier:
power(sin((locationLng1 - locationLng2) * 3.14/360.), 2) AS dlng,
power(sin((locationLat1 - locationLat2) * 3.14/360.), 2) AS dlat,
cos(locationLat1 * 3.14/180.) AS cos_loc1,
cos(locationLat2 * 3.14/180.) AS cos_loc2,
I multiply by 3.14/180. to convert degrees to radians. The Earth's radius 🌎 is approximately 6371 km, so the final distance will be:
2*6371*asin(sqrt(dlat+cos_loc1*cos_loc2*dlng))
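Putting the pieces together, here is a sketch of the full chained query; the geo_events table and coordinate column names are hypothetical.

SELECT
    id,
    -- final haversine distance in kilometres
    2 * 6371 * asin(sqrt(dlat + cos_loc1 * cos_loc2 * dlng)) AS distance_km
FROM (
    SELECT
        id,
        power(sin((locationLng1 - locationLng2) * 3.14/360.), 2) AS dlng,
        power(sin((locationLat1 - locationLat2) * 3.14/360.), 2) AS dlat,
        cos(locationLat1 * 3.14/180.) AS cos_loc1,
        cos(locationLat2 * 3.14/180.) AS cos_loc2
    FROM geo_events
) AS prepared;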
Use of Cassandra
Flink provides a connector for Cassandra. Not all tables can be used with it, however.
Deduplication
Deduplication is essential when you have a lot of events but you don't want to overload downstream systems.
First I'll provide my approach, and then an alternative.
SELECT id,
    FIRST_VALUE(os) AS os
FROM TABLE(
    TUMBLE(
        TABLE mob_events,
        DESCRIPTOR(proc_time),
        INTERVAL '20' SECOND
    )
)
GROUP BY window_start, window_end, id
Alternative
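A common alternative in Flink SQL is the ROW_NUMBER() deduplication pattern; here is a sketch over the same mob_events table, keeping the earliest event per id by processing time.

SELECT id, os
FROM (
    SELECT
        id,
        os,
        ROW_NUMBER() OVER (PARTITION BY id ORDER BY proc_time ASC) AS rn
    FROM mob_events
) AS deduplicated
WHERE rn = 1;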
Both approaches keep only the first event seen for each id.
First of all you need to know about time in Flink. There are three types:
- Processing time: the time when the event is processed by an operator
- Event time: the time when the event actually happened at its source
- Ingestion time: the time when the event was ingested into Flink
CREATE TABLE kafka_source (
...
proc_time AS PROCTIME(),
row_time AS LOCALTIMESTAMP -- a regular TIMESTAMP(3), not a time attribute
)
That gives us both a processing-time attribute and a regular timestamp column.
You also need to know about time windows in Flink; a quick reference follows.
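Here is a sketch of the three most common windowing table-valued functions, applied to the kafka_source table declared above.

-- tumbling: fixed, non-overlapping 10-minute windows
SELECT * FROM TABLE(
    TUMBLE(TABLE kafka_source, DESCRIPTOR(proc_time), INTERVAL '10' MINUTES));

-- hopping: 10-minute windows sliding every 5 minutes (overlapping)
SELECT * FROM TABLE(
    HOP(TABLE kafka_source, DESCRIPTOR(proc_time), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES));

-- cumulating: windows growing in 2-minute steps up to a 10-minute maximum
SELECT * FROM TABLE(
    CUMULATE(TABLE kafka_source, DESCRIPTOR(proc_time), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES));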
Time transformation
There are a lot of formats for representing time:
- UNIX: milliseconds or seconds elapsed since the epoch (1970-01-01 00:00:00 UTC)
- TIMESTAMP: a human-readable date and time, e.g. 2023-01-12 10:30:00.000
Flink also distinguishes timestamps by precision:
- TIMESTAMP - default precision (equivalent to TIMESTAMP(6))
- TIMESTAMP(3) - millisecond precision
- TIMESTAMP(6) - microsecond precision
Another probable caveat is timezones.
In some cases it's useful to convert one format to the other:
CREATE TABLE kafka_source(
-- first you declare the raw field
eventUnixTime BIGINT,
-- then you transform it into a computed column;
-- FROM_UNIXTIME works with seconds, not milliseconds
eventTimeStamp AS TO_TIMESTAMP(FROM_UNIXTIME(eventUnixTime / 1000))
)
You can cast a time value like 03:00:00 to TIME via a simple:
-- now it's TIME format
CAST(time_zone_tm AS TIME) AS time_zone_tm
Let's sum up with a working case: selecting events from 9:00 to 20:00, corrected by the timezone offset time_zone_tm.
-- I'll write the condition in WHERE;
-- TIMESTAMPDIFF counts minutes from 00:00:00 to the current time,
-- and the EXTRACT part adds the timezone offset in minutes
WHERE TIMESTAMPDIFF(MINUTE, CAST(CURRENT_DATE AS TIMESTAMP), LOCALTIMESTAMP)
    + EXTRACT(HOUR FROM time_zone_tm) * 60
    + EXTRACT(MINUTE FROM time_zone_tm)
    BETWEEN 9 * 60 AND 20 * 60
Escaping fields
Flink SQL allows escaping field names with backticks, like this:
CREATE TABLE kafka_source (
id STRING,
-- event STRING, -- would throw an error :(
`event` STRING
)
This is handy when you deal with reserved words like `group` or `event`.
JSON unpacking
Sometimes data arrives as a JSON string in an encoded format:
{
"time_info": 170123456,
"Data": "asesdasd"
}
First of all you need to extract the field. Just use JSON_VALUE.
Flink has documentation for it, yet its JSON path syntax can look unfamiliar. Actually it's just a way to navigate through hierarchical structures.
I'll guide you through; a short example follows the list.
- `$` - means the root of the document, like `self` in Python or `this` in JavaScript
- `['a']` - selects a field in a map by its key
- `[0]` - selects an element in a list; recall that a list is an ordered structure
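Using this path syntax, here is a short sketch of pulling the two fields out of the message above; the column name DataJson holding the raw JSON string is an assumption.

SELECT
    -- both values come back as STRING by default
    JSON_VALUE(DataJson, '$.time_info') AS time_info,
    JSON_VALUE(DataJson, '$.Data') AS data_payload
FROM kafka_source;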
Unfortunately you can't cast the extracted field to your desired type inline like this:
JSON_VALUE('DataJson')
Sometimes
Suppose our message has
Yet a lot of
Join types
Joins can mess up the order of events!
- regular join - can bring OOM errors, because both sides of the join are kept in state indefinitely
- interval join - Flink automatically removes events from state once they fall outside the interval
SELECT t1.id
FROM source1 AS t1
JOIN source2 AS t2
  ON t1.id = t2.id
 AND t1.ts BETWEEN t2.ts - INTERVAL '5' MINUTE
               AND t2.ts + INTERVAL '5' MINUTE
- temporal join - allows working with time-versioned tables
SELECT t1.id, t2.*
FROM events AS t1
LEFT JOIN temporal_table FOR SYSTEM_TIME AS OF t1.event_time AS t2
  ON t1.id = t2.id
That join automatically chooses the latest version of the row whose time is not greater than the event's time.