ksqlDB: Table

Create a Table from a Kafka Topic:

A Table created this way is not materialized, so pull queries against it will not work!

CREATE TABLE `my-topic-table` (uid STRING PRIMARY KEY, amount DOUBLE) WITH (kafka_topic='my-topic', value_format='json');

To make it queryable, derive a new Table from it:

CREATE TABLE `my-topic-queryable-table` AS SELECT * FROM `my-topic-table`;

Alternatively, make the Table read-only by adding the SOURCE keyword; it will then be queryable:

CREATE SOURCE TABLE `my-topic-table` (uid STRING PRIMARY KEY, amount DOUBLE) WITH (kafka_topic='my-topic', value_format='json');

If you are producing messages with kafka-console-producer:

  • The PRIMARY KEY must be STRING when creating the (source) table
  • Add --property parse.key=true --property key.separator=:
  • Send each message as key, separator, JSON value, for example 1: {"name": "Ryan"}
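The line format the console producer expects can be sketched in Python. This is a minimal illustration, assuming parse.key=true with ":" as the key separator; the helper name console_producer_line is hypothetical, and the amount field is just an example value matching the table's column.

```python
import json


def console_producer_line(key: str, value: dict, separator: str = ":") -> str:
    """Build one stdin line for kafka-console-producer run with parse.key=true.

    Hypothetical helper for illustration only.
    """
    # The line is split into key and value at the separator,
    # so a key containing the separator would be cut short.
    if separator in key:
        raise ValueError("key must not contain the separator")
    # Compact, single-line JSON: each input line becomes one record.
    return f"{key}{separator}{json.dumps(value, separators=(',', ':'))}"


if __name__ == "__main__":
    print(console_producer_line("1", {"amount": 12.5}))  # 1:{"amount":12.5}
```

The printed line can be pasted or piped into the console producer. Colons inside the JSON value are harmless as long as the key itself contains none.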

CAVEATS:

  • The PRIMARY KEY maps to the Kafka key
  • With an INT PRIMARY KEY, the Kafka key must be exactly 4 bytes long.
  • With a BIGINT PRIMARY KEY, the Kafka key must be exactly 8 bytes long.
  • Inserting into a derived Table does not work (nothing changes)
  • If a name contains a - (dash), you need to enclose it in ` (backticks)
  • Filter by offset with WHERE ROWOFFSET=n. You can inspect the offsets with SELECT ROWOFFSET FROM … EMIT CHANGES. This works only in push queries.
  • Filter by partition with WHERE ROWPARTITION=n. You can inspect the partitions with SELECT ROWPARTITION FROM … EMIT CHANGES. This works only in push queries.
  • Filter by time with WHERE ROWTIME=n. You can inspect the timestamps with SELECT ROWTIME FROM …
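The INT and BIGINT key caveats above come down to byte layout. The sketch below assumes the key uses ksqlDB's default KAFKA key format, where an INT is a 4-byte and a BIGINT an 8-byte big-endian integer (the layout of Kafka's standard integer and long serializers). The function names are hypothetical.

```python
import struct


def int_key(n: int) -> bytes:
    """Encode n as a 4-byte big-endian signed integer (INT key)."""
    return struct.pack(">i", n)


def bigint_key(n: int) -> bytes:
    """Encode n as an 8-byte big-endian signed integer (BIGINT key)."""
    return struct.pack(">q", n)


def decode_int_key(raw: bytes) -> int:
    """Decode a 4-byte key; raises struct.error if raw is not exactly 4 bytes."""
    return struct.unpack(">i", raw)[0]


if __name__ == "__main__":
    print(int_key(1))               # b'\x00\x00\x00\x01'
    print(len(bigint_key(1)))       # 8
    # Four ASCII characters typed as a text key are also 4 bytes,
    # but they decode to a very different number than the text suggests.
    print(decode_int_key(b"1234"))  # 825373492
```

Under that assumption, a key typed as text into the console producer is only accepted when it happens to be 4 (or 8) bytes long, and even then its numeric value is not the number that was typed.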

The preferred way to create a Table is from a Stream, using CREATE TABLE sometable AS SELECT.

References: