ksqlDB: Table
Create a Table from Kafka Topic:
A table created this way is not materialized, so it cannot be queried with pull queries!
CREATE TABLE `my-topic-table` (uid STRING PRIMARY KEY, amount DOUBLE) WITH (kafka_topic='my-topic', value_format='json');
To make it queryable, derive a new table from it:
CREATE TABLE `my-topic-queryable-table` AS SELECT * FROM `my-topic-table`;
Alternatively, add the SOURCE keyword. The table then becomes read-only, and it will be queryable:
CREATE SOURCE TABLE `my-topic-table` (uid STRING PRIMARY KEY, amount DOUBLE) WITH (kafka_topic='my-topic', value_format='json');
If you are using kafka-console-producer to produce a message, then:
- The PRIMARY KEY needs to be STRING when creating the (source) table
- Add these options to the command:
--property parse.key=true --property key.separator=:
- Send the message as
1: {"name": "Ryan"}
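As an alternative to the console producer, a row can also be written from the ksqlDB CLI itself. This is a minimal sketch, assuming the table was created with plain CREATE TABLE (a SOURCE table is read-only and rejects inserts) and using the uid and amount columns declared earlier; the values are made up for illustration.

```sql
-- Assumes `my-topic-table` was created WITHOUT the SOURCE keyword.
-- The values below are illustrative only.
INSERT INTO `my-topic-table` (uid, amount) VALUES ('1', 12.5);
```

The uid value becomes the Kafka record key, and the remaining columns are serialized as the JSON value.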
CAVEATS:
- The PRIMARY KEY maps to the Kafka record key
- When you use INT as the PRIMARY KEY, the key you send must be exactly 4 bytes long, because the key is read as a 32-bit integer.
- When you use BIGINT as the PRIMARY KEY, the key you send must be exactly 8 bytes long, because the key is read as a 64-bit integer.
- Inserting into a derived table does not work (nothing changes)
- When a name contains a - (dash), you need to enclose the name in ` (backticks)
- Filter by offset with WHERE ROWOFFSET=n. You can check the value with SELECT ROWOFFSET FROM … EMIT CHANGES. This works only in push queries.
- Filter by partition with WHERE ROWPARTITION=n. You can check the value with SELECT ROWPARTITION FROM … EMIT CHANGES. This works only in push queries.
- Filter by time with WHERE ROWTIME=n. You can check the value with SELECT ROWTIME FROM …
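The pseudo-column filters above can be sketched as push queries. This is only an illustration, assuming the `my-topic-table` defined earlier and a ksqlDB version that exposes ROWPARTITION and ROWOFFSET; the partition, offset, and timestamp values are arbitrary.

```sql
-- Inspect the pseudo-columns of each incoming record.
SELECT uid, amount, ROWPARTITION, ROWOFFSET, ROWTIME
  FROM `my-topic-table`
  EMIT CHANGES;

-- Keep only records from partition 0 at offset 10 or later.
SELECT uid, amount
  FROM `my-topic-table`
  WHERE ROWPARTITION = 0 AND ROWOFFSET >= 10
  EMIT CHANGES;

-- ROWTIME is a BIGINT holding epoch milliseconds (arbitrary cutoff here).
SELECT uid, amount
  FROM `my-topic-table`
  WHERE ROWTIME >= 1700000000000
  EMIT CHANGES;
```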
The preferred way to create a table is from a stream, by using CREATE TABLE sometable AS SELECT
…
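One possible shape of that approach is sketched below. The stream and table names are hypothetical, and the choice of LATEST_BY_OFFSET as the aggregate is an assumption made for illustration; the topic and columns are the ones used earlier in this post.

```sql
-- Hypothetical stream over the same topic (a stream uses KEY, not PRIMARY KEY).
CREATE STREAM `my-topic-stream` (uid STRING KEY, amount DOUBLE)
  WITH (kafka_topic='my-topic', value_format='json');

-- Materialized table that keeps the latest amount per uid.
CREATE TABLE `latest-amount-by-uid` AS
  SELECT uid, LATEST_BY_OFFSET(amount) AS amount
  FROM `my-topic-stream`
  GROUP BY uid
  EMIT CHANGES;

-- The derived table is materialized, so a pull query by key should work.
SELECT * FROM `latest-amount-by-uid` WHERE uid = '1';
```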
References: