Your cart is currently empty!
Kafka Connect: JDBC MariaDB/MySQL
Once you fire up Kafka Connect, you will need to install JDBC plugin from confluent hub:
confluent-hub install confluentinc/kafka-connect-jdbc:latest confluent-hub install --no-prompt confluentinc/kafka-connect-jdbc:latest
Then you will need to download MariaDB/MySQL connector (it is not included with the JDBC) and drop it into the plugin directory.
If you are using MySQL connector, choose the “Platform Independent” and extract the tar/zip. You will find the JAR file inside. Move/copy the JAR file into the plugin directory.
wget https://dlm.mariadb.com/3752081/Connectors/java/connector-java-3.3.3/mariadb-java-client-3.3.3.jar \ -P /usr/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/lib
Restart!
Send POST request to Kafka Connect REST API to create a new connector.
If you are connecting to MySQL server change the “connection.url” to “jdbc:mysql://…”
{ "name": "mysql-example", "config": { "name": "mysql-example", "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", "connection.url": "jdbc:mariadb://localhost:3306/example", "connection.user": "root", "connection.password": "secret", "table.whitelist": "products", "mode": "timestamp", "timestamp.column.name": "updated_at" # "mode": "incrementing", # "incrementing.column.name": "id" } }
Things to note here is “mode”. I use “timestamp” as the value. It fits my use case because it records when the timestamp column changed/updated and when there’s a new row.
You will need to set these constraints to the “timestamp.column.name”:
- NOT NULL
- DEFAULT CURRENT_TIMESTAMP
References:
- https://docs.confluent.io/platform/current/connect/confluent-hub/command-reference/confluent-hub-install.html
- https://mariadb.com/downloads/connectors/connectors-data-access/java8-connector/
- https://dev.mysql.com/downloads/connector/j/
- https://docs.confluent.io/platform/current/connect/userguide.html#connect-installing-plugins
- https://docs.confluent.io/kafka-connectors/jdbc/current/source-connector/overview.html
- https://docs.confluent.io/kafka-connectors/jdbc/current/source-connector/overview.html#incremental-query-modes
- https://docs.confluent.io/platform/current/connect/confluent-hub/command-reference/confluent-hub-install.html