Introducing ksqlDB

@Frank Oh · April 02, 2023 · 13 min read

What

ksqlDB (formerly Kafka SQL, KSQL) is a streaming SQL engine for Kafka. It provides an SQL interface that allows developers to easily perform stream processing on Kafka using familiar SQL syntax. The features provided by ksqlDB include:

1.1 Features

  • Real-time stream processing through a familiar, lightweight SQL syntax, similar to querying a relational database
  • Designed to be fault-tolerant and scalable
  • Provides management of Kafka Connect connectors from within the ksqlDB environment
  • Supports a wide range of streaming operations, including filtering, transformation, aggregation, joins, windowing, and sessionization

    • e.g. SUM, COUNT, UCASE, REPLACE, TRIM
  • Supports user-defined functions (UDFs)

    • Lambda functions are also supported (see the sketch after this list)
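
As a quick taste of the syntax, here is a minimal sketch that combines built-in scalar functions with a lambda passed to TRANSFORM. The raw_orders stream and its columns are hypothetical and used only for illustration:

# Hypothetical example: scalar functions plus a lambda-based TRANSFORM
ksql> CREATE STREAM clean_orders AS
  SELECT TRIM(UCASE(customer_name)) AS customer_name,
         TRANSFORM(item_codes, c => REPLACE(c, '-', '')) AS item_codes
  FROM raw_orders
  WHERE amount > 0
  EMIT CHANGES;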


1.2 ksqlDB Architecture

Diagram showing the architecture of ksqlDB

ksqlDB client

  • ksqlDB CLI

    • Provides a command-line interface (CLI) similar to the consoles offered by MySQL or PostgreSQL.
  • ksqlDB UI

    • Control Center (paid) is a GUI for managing and monitoring key components, including the Kafka cluster, brokers, topics, connectors, and ksqlDB, in one place.

REST Interface

  • ksqlDB clients use the REST API to access the ksqlDB engine

ksqlDB Engine

  • Executes KSQL statements and queries
  • Users define application logic with KSQL statements; the engine parses and builds those statements and executes them on the KSQL server
  • Each KSQL server runs an instance of the KSQL engine
  • The engine uses RocksDB as its internal state store

    • ksqlDB uses RocksDB to store Materialized Views locally on disk
    • RocksDB is a fast embedded key-value store provided as a library

"RocksDB is an open-source database development project started by Facebook, optimized for large-scale data processing workloads such as server workloads, and optimized for high performance on fast storage, especially flash storage."


Why

Let's find out why we should use ksqlDB.

1. Three Approaches for Processing Kafka Streams

Three Approaches for Processing Kafka Streams

2. ksqlDB vs Kafka Streams

The Confluent Platform stack, with ksqlDB built on Kafka Streams

  • ksqlDB

    • Developed based on the Kafka Streams library
    • Enables interactive stream processing directly through the KSQL CLI
    • Allows developers to perform stream processing quickly using familiar SQL syntax
  • Kafka Streams

    • A library that enables streaming processing based on Kafka
    • May be better suited for more complex streaming processing needs
    • Requires a deeper understanding and more learning effort than ksqlDB


Who

ksqlDB has been developed by Confluent since 2017.

History

Kafka

  • Kafka was developed around 2010 at LinkedIn
  • Open-sourced as Apache Kafka in 2011
  • Confluent was founded in 2014 by Kafka's co-creators after they left LinkedIn

Kafka Connect

  • Included in Kafka 0.9.0.0 release in 2015

Kafka Streams

  • Included in Kafka 0.10.0.0 release in 2016

ksqlDB

  • Released as a KSQL developer preview in 2017
  • Renamed from KSQL (Kafka SQL) to ksqlDB in 2019 as part of a rebranding


Where

Several companies are officially using ksqlDB. In Korea, LINE used ksqlDB to improve its AB Test Report system.

  • Naver LINE

    • AB Test Report
    • The existing system implemented the AB test report by fetching event logs stored in Redis and building a join window from them
    • With ksqlDB, the architecture became much simpler: the same logic is written in KSQL and the Redis server is no longer needed
  • ticketmaster - Ticket sales company
  • Nuuly - Clothing rental and trial service
  • ACERTUS - Car pickup/delivery service
  • optimove - A private company that develops and sells CRM marketing software (with AI) as a service
  • Bosch - Leading company in the field of automotive and industrial technology, consumer goods, and building technology
  • Voicebridge - Voice-based systems for rural populations in developing countries that lack internet access

Companies using ksqlDB


How

1. Installation on a Local Machine

To run multiple Kafka components in a local environment, we are going to use docker-compose. First, download the docker-compose.yml file.

$ curl --output docker-compose.yml \
  https://raw.githubusercontent.com/kenshin579/tutorials-go/master/kafka/confluent/docker-compose.yml

Start the Confluent Platform server.

$ docker-compose up -d

Start the interactive KSQL CLI using the ksqldb-cli Docker container.

$ docker exec -it ksqldb-cli ksql http://ksqldb-server:8088
OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.

                  ===========================================
                  =       _              _ ____  ____       =
                  =      | | _____  __ _| |  _ \| __ )      =
                  =      | |/ / __|/ _` | | | | |  _ \      =
                  =      |   <\__ \ (_| | | |_| | |_) |     =
                  =      |_|\_\___/\__, |_|____/|____/      =
                  =                   |_|                   =
                  =        The Database purpose-built       =
                  =        for stream processing apps       =
                  ===========================================

Copyright 2017-2022 Confluent Inc.

CLI v7.2.2, Server v7.2.2 located at http://ksqldb-server:8088
Server Status: RUNNING

Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!

ksql>

2. KSQL Usage

Let's learn more about ksqlDB through some examples.

2.1 Collections: Stream vs. Table

2.1.1 Stream

  • A new stream can be created from a stream, table, or Kafka topic
  • A stream is an unbounded, continuously growing collection of events

    • Data is managed per partition
  • Once a row is created, it cannot be changed (immutable, append-only)

    • Each row is stored in a specific partition
    • Only INSERT is possible

# Create a stream on an existing Kafka topic; if the topic does not exist, it is created automatically
ksql> CREATE STREAM riderLocations (profileId VARCHAR, latitude DOUBLE, longitude DOUBLE)
  WITH (kafka_topic='locations', value_format='json', partitions=1);
  
# You can create a new stream from another stream
ksql> CREATE STREAM myCurrentLocation AS
SELECT profileId,
       latitude AS la,
       longitude AS lo
FROM riderlocations
WHERE profileId = 'c2309eec'
    EMIT CHANGES;

  • kafka_topic - The Kafka topic backing the stream; if the topic does not exist, it is created automatically
  • value_format - The encoding of the messages stored in the Kafka topic
  • partitions - The number of partitions for the Kafka topic
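
To double-check what was created, the stream metadata can be inspected from the CLI; a small sketch assuming the riderLocations stream defined above:

ksql> SHOW STREAMS;
ksql> DESCRIBE riderLocations;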

2.1.2 Table (Materialized view)

  • A table is a mutable collection of events that holds the current (latest) state per key
  • Rows can be changed and must have a primary key
  • INSERT, UPDATE, and DELETE are possible
  • A new table can be created from a stream, table, or Kafka topic

Streams vs Table

# Create a materialized view that tracks the latest location of the riders
ksql> CREATE TABLE currentLocation AS
  SELECT profileId,
         LATEST_BY_OFFSET(latitude) AS la,
         LATEST_BY_OFFSET(longitude) AS lo
  FROM riderlocations
  GROUP BY profileId
  EMIT CHANGES;
  
# Create a table that checks how far away a rider is from a given location
ksql> CREATE TABLE ridersNearMountainView AS
SELECT ROUND(GEO_DISTANCE(la, lo, 37.4133, -122.1162), -1) AS distanceInMiles,
       COLLECT_LIST(profileId) AS riders,
       COUNT(*) AS count
  FROM currentLocation
  GROUP BY ROUND(GEO_DISTANCE(la, lo, 37.4133, -122.1162), -1);

  • LATEST_BY_OFFSET

    • An aggregate function that returns the latest value of a given column
  • EMIT CHANGES

    • Adding the EMIT CHANGES clause lets you receive all changes continuously
  • COLLECT_LIST(col1)

    • Returns an array containing all values of col1

2.2 Query

2.2.1 Push Query (Continuous Query)

  • Push queries subscribe to results that change in real time
  • The EMIT CHANGES clause keeps the query running and emits new results continuously
  • To terminate a push query started from the CLI, press Ctrl+C

# Data in the stream is retrieved continuously and the query keeps running
ksql> SELECT * FROM riderLocations
WHERE GEO_DISTANCE(latitude, longitude, 37.4133, -122.1162) <= 5 EMIT CHANGES;

# Data in the table is retrieved continuously and the query keeps running
ksql> SELECT * FROM currentLocation EMIT CHANGES;

Let's insert actual data into ksqlDB. The following inserts into the riderLocations stream, which is equivalent to publishing to the Kafka topic associated with riderLocations.

ksql> INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('c2309eec', 37.7877, -122.4205);
INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('18f4ea86', 37.3903, -122.0643);
INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('4ab5cbad', 37.3952, -122.0813);
INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('8b6eae59', 37.3944, -122.0813);
INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('4a7c7b41', 37.4049, -122.0822);
INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('4ddad000', 37.7857, -122.4011);

You can also publish with Kafka CLI commands.

$ kafka-console-producer.sh --bootstrap-server localhost:9092 --topic locations
> {"profileId": "c2309ee5", "latitude": 42.7877, "longitude": -122.4205}

2.2.2 Pull Query (Classic Query)

  • A pull query retrieves the current state of a table

# Search for all riders currently within 10 miles of Mountain View
ksql> SELECT * from ridersNearMountainView WHERE distanceInMiles <= 10;
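
Pull queries can also look up a single key. A small sketch against the currentLocation table created earlier, using its grouping key profileId:

ksql> SELECT la, lo FROM currentLocation WHERE profileId = 'c2309eec';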

3. Control Center

So far, we have only used ksqlDB in the CLI, but let's use the Control Center web UI. Go to http://localhost:9021.

3.1 Datagen Source Connector

The Datagen Source Connector generates mock data for development and testing. Based on its configuration, it produces records periodically, which makes it easy to simulate a continuous flow of incoming data.

3.1.1 Generate Mock Data

Let's generate mock data for pageviews and users.

After clicking Connect > Add Connector > DatagenConnector, enter the following information.

# generate pageviews mock data
{
  "name": "datagen-pageviews",
  "config": {
    "name": "datagen-pageviews",
    "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "kafka.topic": "pageviews",
    "max.interval": "100",
    "quickstart": "pageviews"
  }
}

# generate users mock data
{
  "name": "datagen-users",
  "config": {
    "name": "datagen-users",
    "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "kafka.topic": "users",
    "max.interval": "1000",
    "quickstart": "users"
  }
}
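
If you prefer the CLI to the Control Center UI, the same connector can be defined with CREATE SOURCE CONNECTOR from the ksql prompt. A sketch for the pageviews generator, assuming the docker-compose setup above provides an embedded Kafka Connect worker:

ksql> CREATE SOURCE CONNECTOR `datagen-pageviews` WITH (
   "connector.class"='io.confluent.kafka.connect.datagen.DatagenConnector',
   "key.converter"='org.apache.kafka.connect.storage.StringConverter',
   "kafka.topic"='pageviews',
   "max.interval"='100',
   "quickstart"='pageviews'
);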

Click Topics > pageviews > Messages Tab. We can see the data is being published in real-time to the pageviews topic.

PageViews


3.2 Joining Collections

A join in ksqlDB is similar to a join in a traditional relational database in that it combines two or more sets of data. You can use the join syntax to merge stream events as they occur in real time.

# 1.create stream
ksql> CREATE STREAM pageviews_stream
  WITH (KAFKA_TOPIC='pageviews', VALUE_FORMAT='AVRO');

# 2. create table
ksql> CREATE TABLE users_table (id VARCHAR PRIMARY KEY)
    WITH (KAFKA_TOPIC='users', VALUE_FORMAT='AVRO');

Joining the pageviews stream with the users table creates the user_pageviews stream.

# user_pageviews stream creates a USER_PAGEVIEWS topic
ksql> CREATE STREAM user_pageviews
  AS SELECT users_table.id AS userid, pageid, regionid, gender
     FROM pageviews_stream
              LEFT JOIN users_table ON pageviews_stream.userid = users_table.id
         EMIT CHANGES;

# Create separate pageviews with regionId ending in 8, 9 from the user_pageviews stream
ksql> CREATE STREAM pageviews_region_like_89
  WITH (KAFKA_TOPIC='pageviews_filtered_r8_r9', VALUE_FORMAT='AVRO')
    AS SELECT * FROM user_pageviews
       WHERE regionid LIKE '%_8' OR regionid LIKE '%_9'
           EMIT CHANGES;
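
A quick way to sanity-check the joins is to peek at a few rows of the derived streams (push queries accept a LIMIT clause):

ksql> SELECT * FROM user_pageviews EMIT CHANGES LIMIT 5;
ksql> SELECT * FROM pageviews_region_like_89 EMIT CHANGES LIMIT 5;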

3.3 Windows

3.3.1 Time

Diagram showing records in a ksqlDB stream

  • Each record contains a timestamp
  • The timestamp is set by the producer application or the Kafka broker
  • The timestamp is used for time-dependent operations such as aggregation and join

3.3.2 Window

Diagram showing the relationship between records and time in a ksqlDB stream

  • ksqlDB provides Window queries that use streams to aggregate events within a specific period (Window) and send them as output
  • Each window covers a specific duration; its bounds are exposed as WINDOWSTART/WINDOWEND
  • WINDOWSTART/WINDOWEND can be referenced in the SELECT clause of a windowed query (see the sketch below)
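
For example, the window bounds can be surfaced in the projection. A sketch of a tumbling-window count over the pageviews_region_like_89 stream created above:

ksql> SELECT regionid,
             TIMESTAMPTOSTRING(WINDOWSTART, 'yyyy-MM-dd HH:mm:ss') AS window_start,
             TIMESTAMPTOSTRING(WINDOWEND, 'yyyy-MM-dd HH:mm:ss') AS window_end,
             COUNT(*) AS views
        FROM pageviews_region_like_89
        WINDOW TUMBLING (SIZE 1 MINUTE)
        GROUP BY regionid
        EMIT CHANGES;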

3.3.3 Window Types

There are three ways to define Time Windows in KSQL:

  • Tumbling

    • Time-based
    • Fixed-duration, non-overlapping, gap-less windows
    • ex. WINDOW TUMBLING (SIZE 1 HOUR)
  • Hopping

    • Time-based
    • Fixed-duration, overlapping windows
    • ex. WINDOW HOPPING (SIZE 30 SECONDS, ADVANCE BY 10 SECONDS)
  • Session

    • Session-based
    • Dynamically-sized, non-overlapping, data-driven windows
    • Session Windows are created based on the inactivity gap, separating active periods.
    • Session Windows are particularly useful for analyzing user behavior (e.g., number of visitors).
    • ex. WINDOW SESSION (60 SECONDS)

Diagram showing three types of time windows in ksqlDB streams: tumbling, hopping, and session

# Create a Table named pageviews_per_region_89 that counts the number of pageviews for regions 8 and 9 in a tumbling window of 30 seconds
ksql> CREATE TABLE pageviews_per_region_89 WITH (KEY_FORMAT='JSON')
  AS SELECT userid, gender, regionid, COUNT(*) AS numusers
    FROM pageviews_region_like_89
    WINDOW TUMBLING (SIZE 30 SECOND)
    GROUP BY userid, gender, regionid
    HAVING COUNT(*) > 1
EMIT CHANGES;

ksql> SELECT * FROM pageviews_per_region_89 EMIT CHANGES;


4. REST API

$ curl --location --request POST 'http://localhost:8088/ksql' \
--header 'Content-Type: application/json' \
--data-raw '{
  "ksql": "show streams;",
  "streamProperties":{}
}'

[
  {
    "@type": "streams",
    "statementText": "show streams;",
    "streams": [
      {
        "type": "STREAM",
        "name": "RIDERLOCATIONS",
        "topic": "locations",
        "keyFormat": "KAFKA",
        "valueFormat": "JSON",
        "isWindowed": false
      },
      ... (omitted) ...
      {
        "type": "STREAM",
        "name": "USER_PAGEVIEWS",
        "topic": "USER_PAGEVIEWS",
        "keyFormat": "KAFKA",
        "valueFormat": "AVRO",
        "isWindowed": false
      }
    ],
    "warnings": []
  }
]

The ksqlDB server provides a REST API; the full API documentation is available in the official ksqlDB documentation.

5. Connector Management

ksqlDB can run connectors in two modes, which determine how and where the connectors are executed.

  • Embedded

    • In Embedded mode, ksqlDB runs the connectors directly on the server
  • External

    • This mode communicates with an external Kafka Connect cluster

ksql> CREATE SINK CONNECTOR `mongodb-test-sink-connector` WITH (
   "connector.class"='com.mongodb.kafka.connect.MongoSinkConnector',
   "key.converter"='org.apache.kafka.connect.json.JsonConverter',
   "value.converter"='org.apache.kafka.connect.json.JsonConverter',
   "key.converter.schemas.enable"='false',
   "value.converter.schemas.enable"='false',
   "tasks.max"='1',
   "connection.uri"='mongodb://MongoDBIPv4Address:27017/admin?readPreference=primary&appname=ksqldbConnect&ssl=false',
   "database"='local',
   "collection"='mongodb-connect',
   "topics"='test.topic'
);
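
Once created, connectors can be listed and inspected from the same prompt:

ksql> SHOW CONNECTORS;
ksql> DESCRIBE CONNECTOR `mongodb-test-sink-connector`;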

6. KSQL Library

When

Because ksqlDB processes data on top of Kafka, it fits anywhere data must be transformed, integrated, and analyzed in real time. It can be used in various fields, such as:

  • Anomaly detection and fraud detection
  • Real-time analysis
  • Predictive analysis
  • Logistics and IoT management
  • Real-time alerts and notifications
  • Sensor data and IoT
  • Cybersecurity

FAQ

Refer to the official ksqlDB FAQ for more; a couple of common questions are answered below.

1. Is ksqlDB licensed under the Apache License 2.0?

  • No, it is not.
  • ksqlDB is licensed under the Confluent Community License and is maintained as a Confluent product.

2. Are there any constraints with the Confluent Community License?

Apache 2.0 License | Confluent Community License | Confluent Enterprise License

Wrap up

ksqlDB helps us easily process streaming data on Kafka with SQL syntax that we are already familiar with.

For creating just a few streams and tables, the Control Center or the CLI is sufficient. However, when multiple pipelines between streams are required, the Stream Designer UI is recommended, as it makes them easier to build and manage.


@Frank Oh
Hello, and thank you for visiting. This blog is where I record my computer-related study notes.