KSQL – The SQL for Kafka

Kafka is a popular choice among developers and architects when designing microservice-based applications. Its powerful event streaming capabilities and reliability are the main considerations when choosing Kafka. As we all know, Kafka follows a producer – broker – consumer flow. When it comes to stream processing, we have two options:

  1. Write a separate consumer application to process the incoming stream and publish back to the broker.
  2. Use KSQL to produce new streams by manipulating existing streams or topics.

In the second case, we do not need a separate consumer; instead, we can manipulate streams within KSQL.

KSQL comes with a few main components:

  1. KSQL Server
  2. KSQL CLI
  3. Schema Registry (Optional)

We can use the KSQL CLI to execute commands against the KSQL server. The Schema Registry holds metadata about schemas (JSON/AVRO); it is essentially a schema database with caching, giving clients low-latency access to the schemas of the data flowing through Kafka.

Let's see how to set up KSQL with Kafka. Here is the docker-compose file:

```yaml
version: '3.5'
services:
  zookeeper:
    image: quay.io/strimzi/kafka:0.30.0-kafka-3.2.0
    container_name: zookeeper
    command: [
      "sh", "-c",
      "bin/zookeeper-server-start.sh config/zookeeper.properties"
    ]
    ports:
      - "2181:2181"
    environment:
      LOG_DIR: /tmp/logs
    networks:
      - kafka-network
  kafka:
    image: quay.io/strimzi/kafka:0.30.0-kafka-3.2.0
    container_name: kafka
    command: [
      "sh", "-c",
      "bin/kafka-server-start.sh config/server.properties --override listeners=$${KAFKA_LISTENERS} --override advertised.listeners=$${KAFKA_ADVERTISED_LISTENERS} --override zookeeper.connect=$${KAFKA_ZOOKEEPER_CONNECT}"
    ]
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      LOG_DIR: "/tmp/logs"
      #KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.1.7:9092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_HOST_NAME: 192.168.1.7
    networks:
      - kafka-network
  schema-registry:
    image: confluentinc/cp-schema-registry:5.4.10
    container_name: schema-registry
    depends_on:
      - zookeeper
      - kafka
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: "PLAINTEXT://kafka:9092"
    networks:
      - kafka-network
  ksql-server:
    image: confluentinc/ksqldb-server:0.28.2
    container_name: ksql-server
    ports:
      - "8088:8088"
    environment:
      KSQL_BOOTSTRAP_SERVERS: kafka:9092
      KSQL_LISTENERS: http://0.0.0.0:8088
      KSQL_KSQL_SERVICE_ID: ksql_service_2_
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
      KSQL_CONNECT_GROUP_ID: "ksql-connect-cluster"
      KSQL_CONNECT_BOOTSTRAP_SERVERS: "kafka:9092"
      KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      KSQL_CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      KSQL_CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.storage.StringConverter"
      KSQL_CONNECT_VALUE_CONVERTER: "io.confluent.connect.avro.AvroConverter"
      KSQL_CONNECT_CONFIG_STORAGE_TOPIC: "_ksql-connect-configs"
      KSQL_CONNECT_OFFSET_STORAGE_TOPIC: "_ksql-connect-offsets"
      KSQL_CONNECT_STATUS_STORAGE_TOPIC: "_ksql-connect-statuses"
      KSQL_CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      KSQL_CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      KSQL_CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
    depends_on:
      - kafka
      - schema-registry
    networks:
      - kafka-network
  ksqldb-cli:
    image: confluentinc/ksqldb-cli:0.28.2
    container_name: ksql-cli
    entrypoint: /bin/sh
    tty: true
    volumes:
      - ./ksql/queries.sql:/ksql/queries.sql
      - ./ksql/wait-for-ksql-server.sh:/ksql/wait-for-ksql-server.sh
    command: ["/ksql/wait-for-ksql-server.sh"]
    depends_on:
      - kafka
      - ksql-server
    networks:
      - kafka-network
networks:
  kafka-network:
    name: kafkanet
```

In order for ksql-cli to operate, we have to wait until the ksql-server has started. The wait-for-ksql-server.sh script polls the server until it responds:

```sh
#!/bin/sh
# wait-for-ksql-server.sh
set -e

# Poll the KSQL server's /info endpoint until it returns HTTP 200
while [ "$(curl -s -o /dev/null -w "%{http_code}" http://ksql-server:8088/info)" -ne 200 ]; do
  >&2 echo "KSQL Server is unavailable - retrying"
  sleep 1
done
>&2 echo "KSQL Server is up"

# Run the bootstrap queries once the server is ready
ksql --file /ksql/queries.sql -- http://ksql-server:8088

# Keep the container alive until interrupted
keepgoing=1
trap 'echo "sigint"; keepgoing=0' INT
while [ "$keepgoing" -eq 1 ]; do
  sleep 10
done
```

Note that we can execute SQL inside the CLI once the ksql-server is ready, so we can use the command below to run a file of queries:

`ksql --file /ksql/queries.sql -- http://ksql-server:8088`

or we can execute queries directly in an interactive CLI session:

`ksql -- http://ksql-server:8088`
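
By default, an interactive session only reads records that arrive after a query starts. To replay everything already stored in the topics, set the offset reset property first:

```sql
-- Read each stream from the earliest available offset
SET 'auto.offset.reset' = 'earliest';
```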

Queries

Create a new stream from an existing topic:

```sql
CREATE STREAM new_stream (
    product_name VARCHAR,
    active_substance VARCHAR,
    route_of_administration VARCHAR,
    product_authorisation_country VARCHAR
) WITH (kafka_topic = 'topic_1', value_format = 'JSON');
```
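
To try the stream out without an external producer, we can insert a test row straight from the CLI (the values below are made-up examples) and read it back with a push query:

```sql
-- Insert a sample row (example values only)
INSERT INTO new_stream (product_name, active_substance, route_of_administration, product_authorisation_country)
  VALUES ('Paracetamol 500mg', 'paracetamol', 'oral use', 'Ireland');

-- Push query: emits rows as they arrive; stop with Ctrl+C
SELECT * FROM new_stream EMIT CHANGES;
```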

Create a stream from an existing stream, transforming the data:

```sql
CREATE STREAM formatted_stream AS
  SELECT product_name,
         split(route_of_administration, ' ')[1] AS route,
         split(active_substance, ',')[1] AS active_substance_1,
         split(active_substance, ',')[2] AS active_substance_2
  FROM new_stream;
```
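
Note that ksqlDB arrays are 1-indexed, so `split(...)[1]` is the first element. A quick push query against the derived stream shows the transformed columns:

```sql
-- Inspect the derived columns as rows flow through
SELECT product_name, route, active_substance_1, active_substance_2
  FROM formatted_stream EMIT CHANGES;
```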

Change the value format on the fly:

```sql
CREATE STREAM new_avro_stream WITH (value_format = 'AVRO') AS
  SELECT * FROM formatted_stream;
```
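
Because the new stream is serialized as AVRO, ksqlDB registers its schema with the Schema Registry automatically. We can confirm the stream's columns and serialization format from the CLI:

```sql
-- Show columns, key/value formats, and the backing topic
DESCRIBE new_avro_stream;
```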

There are many more operations you can try with KSQL; the complete reference is here: https://docs.ksqldb.io/en/latest/
