Apache Flink is a powerful streaming tool


In this article we will discuss, step by step, how to build a streaming architecture, and then cover useful Flink concepts that may seem unfamiliar at the start.

Streaming architecture

Streaming applications, also known as near-real-time (NRT) applications, are widely used in credit scoring, geoanalytics, and mobile.

Current popular solutions are

In this article I share my heuristics for building streaming applications. I’ll touch upon:

  • ways to aggregate

Assumptions:

  • we are provided with enough Kafka capacity

Such assumptions help us to:

We get triggers of all cats

Onion architecture

I’ll give a quick overview of the architecture.

Layer | Credo | Principles | Tech realisation
Feature extraction layer | Extract as much as possible | |

Tech realisation:

  • feature extraction
    • every side streaming source will have its own Kafka topic
    • that topic should be filtered only by your domain
    • we don’t enrich it with our domain info yet
  • feature level
    • we merge all semantic groups into one
    • the model schema should have one data model
  • enrichment layer
    • we enrich
  • strategy layer
    • only the necessary info for the side developer
    • all strategies are merged into one output topic

Architecture judgement

This architecture brings a useful decomposition.

Feature level

At the feature level we are interested in aggregating data in a planar (flat) format.

Suppose we have a nested structure

Aggregation level

There are two ways to aggregate info: with a common schema or without one. I’ll share the pros and cons of both approaches.

A common schema is beneficial for the side developer. A common schema also allows using Avro for efficient serialization.

Yet standardization can bring serious obstacles:

  • fields with common

First

Flink has awesome documentation

Yet some reasonable features require ad hoc solutions. I’ll share some of my favorites.

  • processing time
  • event time

Processing time

proc_time AS PROCTIME()

Event time

As Kafka is a queue, events are read sequentially. If we use event time we require

When events don’t follow the schedule they are called late events.

To mitigate late events you can use a watermark:

event_ts TIMESTAMP(3),
WATERMARK FOR event_ts AS event_ts - INTERVAL '1' MINUTE

With such a definition we’ll wait one minute for late events.

In brief, a watermark tells operators that events before the mark have already arrived.
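The watermark idea can be imitated in a few lines of plain Python. This is only a simplified sketch, not how Flink implements it: real Flink compares watermarks against window boundaries, while here an event is simply marked late when it is older than the current watermark. The function name `split_late_events` and the sample events are hypothetical.

```python
from datetime import datetime, timedelta

# Assumed delay, mirroring the INTERVAL '1' MINUTE from the definition above.
MAX_DELAY = timedelta(minutes=1)


def split_late_events(events, max_delay=MAX_DELAY):
    """Split events (given in arrival order) into accepted and late ids.

    The watermark is the largest event time seen so far minus max_delay.
    An event is late when its timestamp is not after the watermark that
    was in effect before the event arrived.
    """
    accepted, late = [], []
    max_seen = None
    for event_id, event_ts in events:
        watermark = max_seen - max_delay if max_seen is not None else None
        if watermark is not None and event_ts <= watermark:
            late.append(event_id)
        else:
            accepted.append(event_id)
        if max_seen is None or event_ts > max_seen:
            max_seen = event_ts
    return accepted, late


def parse(text):
    return datetime.strptime(text, "%Y-%m-%d %H:%M:%S")


events = [
    ("a", parse("2023-01-12 10:30:00")),
    ("b", parse("2023-01-12 10:32:00")),
    ("c", parse("2023-01-12 10:31:30")),  # 30 s out of order: within the delay
    ("d", parse("2023-01-12 10:30:30")),  # 90 s behind the newest event: late
]
accepted, late = split_late_events(events)
print(accepted, late)
```

Event c arrives out of order but within the one-minute allowance, so it is kept; event d is more than a minute behind the newest timestamp and is treated as late.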

Example

Suppose we send a dataset of three events that happened one minute apart

id ts
1 2023-01-12 10:30:00.000
1 2023-01-12 10:31:00.000
1 2023-01-12 10:32:00.000

Using proctime

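-- expose a processing-time attribute on top of source_table
CREATE TEMPORARY VIEW source AS
SELECT
    id,
    ts,
    PROCTIME() AS proc_time
FROM source_table;

-- assign each row to a one-minute tumbling window on processing time
SELECT id
FROM TABLE(TUMBLE(TABLE source, DESCRIPTOR(proc_time), INTERVAL '1' MINUTE));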

If we process it using processing time we’ll get 3. Yet with use of

Field unpacking


Work with array

Difficult calculations

Calculations are better performed sequentially.

The haversine formula is used to calculate the distance between two points defined by latitude and longitude.

This formula is especially useful in geo-streaming applications.

The exact formula is given by:

\[2 r \arcsin\left(\sqrt{\sin^2(\frac{\phi_1-\phi_2}{2}) + \cos \phi_1 \cos \phi_2 \sin^2 (\frac{\lambda_2-\lambda_1}{2})}\right)\]

It’s really difficult to

I advise you to chain intermediate calculations to make your work easier.

power(sin((locationLng1 - locationLng2) * 3.14 / 360.), 2) AS dlng,
power(sin((locationLat1 - locationLat2) * 3.14 / 360.), 2) AS dlat,
cos(locationLat1 * 3.14 / 180.) AS cos_loc1,
cos(locationLat2 * 3.14 / 180.) AS cos_loc2,

I multiply by 3.14/180. to convert degrees to radians; dividing by 360. instead of 180. also halves the angle, as the formula requires. The Earth’s radius 🌎 is approximately 6371 km, so the final distance in kilometers will be

2*6371*asin(sqrt(dlat+cos_loc1*cos_loc2*dlng))
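To sanity-check the chained SQL expressions, the same steps can be reproduced in Python. This is a minimal sketch that mirrors the intermediate values dlat, dlng, cos_loc1 and cos_loc2 from above; the function name `haversine_km` is hypothetical, and `math.radians` is used instead of the 3.14/180 approximation.

```python
import math

EARTH_RADIUS_KM = 6371.0  # same approximate radius as in the text


def haversine_km(lat1, lng1, lat2, lng2):
    """Great-circle distance in km between two points given in degrees."""
    # Step 1: convert latitudes from degrees to radians.
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    # Step 2: squared sines of the half differences (dlat and dlng above).
    dlat = math.sin((phi1 - phi2) / 2) ** 2
    dlng = math.sin(math.radians(lng1 - lng2) / 2) ** 2
    # Step 3: cosines of both latitudes (cos_loc1 and cos_loc2 above).
    cos_loc1, cos_loc2 = math.cos(phi1), math.cos(phi2)
    # Step 4: combine the intermediate values into the final distance.
    return 2 * EARTH_RADIUS_KM * math.asin(
        math.sqrt(dlat + cos_loc1 * cos_loc2 * dlng)
    )


# One degree of longitude on the equator is roughly 111 km.
print(round(haversine_km(0.0, 0.0, 0.0, 1.0), 2))
```

Comparing a few known distances against the SQL output is a cheap way to catch a misplaced parenthesis in the chained columns.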

Use of Cassandra

Flink provides

Not all tables can b

Deduplication

Deduplication is essential when you have a lot of events but you don’t want to overload downstream systems.

First I’ll provide my approach and

-- keep the first os per id within each 20-second processing-time window
SELECT id,
    FIRST_VALUE(os) AS os
FROM TABLE(
    TUMBLE(
        TABLE mob_events,
        DESCRIPTOR(proc_time),
        INTERVAL '20' SECOND
    )
)
GROUP BY id, window_start, window_end
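What the windowed query does can be illustrated in plain Python. This is only a sketch of the semantics under simplifying assumptions: processing time is given as integer seconds, events arrive in order, and the function name `dedup_first_value` and the sample data are hypothetical.

```python
WINDOW_SECONDS = 20  # same size as INTERVAL '20' SECOND above


def dedup_first_value(events, window_seconds=WINDOW_SECONDS):
    """Keep the first `os` per (window, id) for events in arrival order.

    Each event is a (proc_time_seconds, id, os) tuple.
    """
    first = {}
    for proc_time, event_id, os_name in events:
        # Tumbling windows are aligned to multiples of the window size.
        window_start = proc_time - proc_time % window_seconds
        # setdefault keeps the first value and ignores later duplicates.
        first.setdefault((window_start, event_id), os_name)
    return [(start, event_id, os_name)
            for (start, event_id), os_name in first.items()]


events = [
    (0, 1, "android"),
    (5, 1, "ios"),       # duplicate id in window [0, 20): dropped
    (7, 2, "ios"),
    (25, 1, "android"),  # next window [20, 40): kept again
]
result = dedup_first_value(events)
print(result)
```

Note that the same id shows up once per window, so a longer window means fewer output events but a longer delay before they are emitted.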

Alternative

SELECT id, FIRST_VALUE(os) AS os
FROM TABLE(TUMBLE(TABLE mob_events, DESCRIPTOR(proc_time), INTERVAL '20' SECOND))
GROUP BY id, window_start, window_end

This script

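First of all, you need to know about time in Flink. There are three types:

  • Processing time: the wall-clock time of the machine that processes the event
  • Event time: the time when the event actually happened, carried in the event itself
  • Ingestion time: the time when the event was ingested into the operator

CREATE TABLE (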
    ...
    proc_time AS PROCTIME(), 
    row_time AS LOCALTIMESTAMP -- normal timestamp(3)
) 

That

You also need to know more about time windows in Flink.

Time transformation

There are a lot of formats for representing time:

  • UNIX: milliseconds or seconds elapsed since 1970-01-01 00:00:00 UTC
  • TIMESTAMP:

Flink also distinguishes timestamps by precision:

  • TIMESTAMP
  • TIMESTAMP(3) - millisecond precision
  • TIMESTAMP(6) - microsecond precision

Timezones are another probable caveat.

In some cases it’s beneficial to convert between these formats:

CREATE TABLE kafka_source(
    -- first you declare the raw field (UNIX time in milliseconds)
    eventUnixTime BIGINT,
    -- then you transform it in a computed column;
    -- FROM_UNIXTIME works with seconds, not ms, hence the division
    eventTimeStamp AS TO_TIMESTAMP(FROM_UNIXTIME(eventUnixTime / 1000))
)
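The same conversion is easy to check outside Flink. Below is a minimal Python sketch; the function name `unix_ms_to_timestamp` is hypothetical, and UTC is an assumption made here for reproducibility, whereas Flink applies the session time zone.

```python
from datetime import datetime, timezone


def unix_ms_to_timestamp(event_unix_time_ms):
    """Convert UNIX time in milliseconds to a UTC datetime with second precision."""
    # Integer division drops the milliseconds, like eventUnixTime / 1000 above.
    seconds = event_unix_time_ms // 1000
    return datetime.fromtimestamp(seconds, tz=timezone.utc)


converted = unix_ms_to_timestamp(1673519400123)
print(converted.isoformat())  # 2023-01-12T10:30:00+00:00
```

The trailing 123 milliseconds are lost, which is worth remembering if sub-second ordering matters for your events.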

You can cast a time value like 03-00-00 to TIME with a simple CAST:

-- now it's TIME format
CAST(time_zone_tm AS TIME) AS time_zone_tm

Let’s sum up with a working case: selecting events from 9 to 20 o’clock, corrected by the timezone offset time_zone_tm

-- I'll write in where 
WHERE TIMESTAMPDIFF(MINUTE)
-- counts minutes from 0:00:00 to current time  
TIMESTAMPDIFF(MINUTE, CAST(CURRENT_DATE AS TIMESTAMP), LOCALTIMESTAMP)

EXTRACT(HOUR FROM time_zone_tm) * 60 + 7 * 60 + 30 EXTRACT(MINUTE FROM time_zone_tm) + 19 * 60 +15
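The minutes-since-midnight arithmetic is easier to reason about in a small Python sketch. Everything here is an assumption made for illustration: the offset is treated as a duration added to the server time, the interval is taken as 09:00 inclusive to 20:00 exclusive, and the names `minutes_since_midnight` and `in_working_hours` are hypothetical.

```python
from datetime import datetime, timedelta

START_MINUTE = 9 * 60   # 09:00 expressed as minutes since midnight
END_MINUTE = 20 * 60    # 20:00 expressed as minutes since midnight


def minutes_since_midnight(moment):
    """Count minutes from 0:00:00 to the given time of day."""
    return moment.hour * 60 + moment.minute


def in_working_hours(server_time, offset):
    """True if server_time shifted by offset falls in [09:00, 20:00)."""
    local = server_time + offset
    return START_MINUTE <= minutes_since_midnight(local) < END_MINUTE


offset = timedelta(hours=3)  # assumed timezone correction
print(in_working_hours(datetime(2023, 1, 12, 7, 30), offset))   # 10:30 local
print(in_working_hours(datetime(2023, 1, 12, 19, 15), offset))  # 22:15 local
```

Comparing plain integers of minutes avoids most of the pitfalls of comparing TIME values across midnight and timezones.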

Escaping fields

Flink SQL lets you escape a field name with backticks, like this:

CREATE TABLE kafka_source (
    id STRING,
    -- event STRING, can throw an error :(
    `event` STRING
)

This helps with reserved or overloaded words like group or event.

JSON unpacking

Sometimes data in JSON is provided in an encrypted format.

{
    "time_info": 170123456,
    "Data": "asesdasd"
}

First of all you nee

Just use

JSON_VALUE

Flink documents it, yet its path syntax can look unfamiliar. Actually it’s just a way to navigate through hierarchical structures.

I’ll guide you through.

  • $ - means the root of the document, like self in Python or this in JavaScript
  • .a - selects field a in a map
  • [0] - selects an element in a list by index. Recall that a list is an ordered structure.
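The three path steps above can be imitated with a tiny Python function. This is not Flink's implementation, only a sketch of the navigation idea: the name `json_value` is borrowed for readability, only the `$`, `.field` and `[index]` steps are handled, and the sample message is hypothetical. Unlike Flink, it returns native Python values rather than strings.

```python
import json
import re

# One path step: either .field or [index].
STEP = re.compile(r"\.(\w+)|\[(\d+)\]")


def json_value(document, path):
    """Follow a path such as $.Data.tags[1] through a JSON string."""
    if not path.startswith("$"):
        raise ValueError("path must start with $")
    current = json.loads(document)  # $ is the root of the document
    for match in STEP.finditer(path, 1):
        field, index = match.groups()
        try:
            if field is not None:
                current = current[field]       # .field selects a map entry
            else:
                current = current[int(index)]  # [n] selects a list element
        except (KeyError, IndexError, TypeError):
            return None  # the path does not exist in this document
    return current


message = '{"time_info": 170123456, "Data": {"tags": ["a", "b"]}}'
print(json_value(message, "$.time_info"))
print(json_value(message, "$.Data.tags[1]"))
print(json_value(message, "$.missing"))
```

Reading a path left to right, one step at a time, is usually enough to work out what a JSON_VALUE expression will return.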

Unfortunately, you can’t cast the extracted field to your desired type inline like that

JSON_VALUE('DataJson')

Sometimes

Suppose our message has

Yet a lot of

Join types

Joins can mess up the order of events!

  • regular join: can bring OOM errors, because both inputs are kept in state

  • interval join: Flink automatically removes events that can no longer match from state

SELECT t1.id
FROM source1 AS t1
JOIN source2 AS t2 ON
    t1.id = t2.id AND
    t1.ts BETWEEN t2.ts - INTERVAL '5' MINUTE AND
        t2.ts + INTERVAL '5' MINUTE

  • temporal joins: allow working with time-versioned tables

SELECT *
FROM events AS t1
LEFT JOIN temporal_table FOR SYSTEM_TIME AS OF t1.event_time AS t2 ON
    t1.id = t2.id

This join automatically chooses the latest version that is not later than the event time.

  • temporal join
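
The matching rules of the interval join and the temporal join described above can be sketched in plain Python. This only illustrates which rows get paired; it says nothing about how Flink manages state. Timestamps are assumed to be integer seconds, and the names `interval_join` and `temporal_lookup` are hypothetical.

```python
from bisect import bisect_right


def interval_join(left, right, bound):
    """Pair rows with equal ids whose timestamps differ by at most `bound`.

    Rows are (id, ts) tuples; the result holds (id, left_ts, right_ts).
    """
    return [
        (l_id, l_ts, r_ts)
        for l_id, l_ts in left
        for r_id, r_ts in right
        if l_id == r_id and r_ts - bound <= l_ts <= r_ts + bound
    ]


def temporal_lookup(versions, event_time):
    """Return the value of the latest version with valid_from <= event_time.

    `versions` is a list of (valid_from, value) sorted by valid_from.
    """
    times = [valid_from for valid_from, _ in versions]
    position = bisect_right(times, event_time)
    return versions[position - 1][1] if position else None


# Interval join with a 5-minute (300 s) bound, as in the SQL above.
pairs = interval_join([(1, 100), (1, 1000)], [(1, 350), (2, 100)], 300)
print(pairs)

# Temporal join: each event sees the version valid at its own time.
versions = [(0, "v1"), (60, "v2")]
print(temporal_lookup(versions, 59), temporal_lookup(versions, 60))
```

The interval bound is what lets old rows be forgotten: once time has moved past ts + bound, a row can never match again.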