Kafka Connect: JDBC MariaDB/MySQL

Once Kafka Connect is running, install the JDBC connector plugin from Confluent Hub. The second form skips the interactive prompts:

confluent-hub install confluentinc/kafka-connect-jdbc:latest
confluent-hub install --no-prompt confluentinc/kafka-connect-jdbc:latest

Next, download the MariaDB or MySQL JDBC driver (it is not bundled with the JDBC connector plugin) and drop the JAR into the plugin's lib directory.

If you are using the MySQL connector, choose the “Platform Independent” download and extract the tar/zip archive. The JAR file is inside; move or copy it into the plugin directory.

For MariaDB, the JAR can be downloaded straight into the plugin directory:

wget https://dlm.mariadb.com/3752081/Connectors/java/connector-java-3.3.3/mariadb-java-client-3.3.3.jar \
	-P /usr/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/lib
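
Before restarting, it can help to confirm that a driver JAR really landed in the plugin directory. The following is a minimal sketch, not part of the original setup: has_jdbc_driver is a hypothetical helper name, and the file name patterns are assumptions based on how the MariaDB and MySQL driver JARs are usually named.

```shell
#!/usr/bin/env bash
# Hypothetical helper: succeed if a MariaDB or MySQL JDBC driver JAR
# exists in the given directory. The name patterns are assumptions.
has_jdbc_driver() {
  local lib_dir="$1" jar
  for jar in "$lib_dir"/mariadb-java-client-*.jar "$lib_dir"/mysql-connector-*.jar; do
    # An unmatched glob stays a literal string, so -f fails for it.
    [ -f "$jar" ] && return 0
  done
  return 1
}

# Plugin lib directory taken from the wget command above.
PLUGIN_LIB=/usr/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/lib
if has_jdbc_driver "$PLUGIN_LIB"; then
  echo "driver found"
else
  echo "driver missing"
fi
```

If the check reports a missing driver, the JAR most likely went to a different directory than the one the plugin was installed into.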

Restart the Kafka Connect worker so that it picks up the new plugin and driver.

Send a POST request to the Kafka Connect REST API to create a new connector.

If you are connecting to a MySQL server, change “connection.url” to “jdbc:mysql://…”.

{
	"name": "mysql-example",
	"config": {
		"name": "mysql-example",
		"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
		"connection.url": "jdbc:mariadb://localhost:3306/example",
		"connection.user": "root",
		"connection.password": "secret",
		"table.whitelist": "products",
		"mode": "timestamp",
		"timestamp.column.name": "updated_at"
	}
}

JSON does not allow comments, so the alternative is not shown inline: to use incrementing mode instead, replace the last two settings with "mode": "incrementing" and "incrementing.column.name": "id".
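
As a rough sketch of how this definition could be submitted with curl: the functions connector_json and create_connector are hypothetical helper names, and the REST address http://localhost:8083 is an assumption (8083 is the usual default port for Kafka Connect); adjust it to your worker.

```shell
#!/usr/bin/env bash
# Assumption: the Kafka Connect REST API listens on localhost:8083.
CONNECT_URL="${CONNECT_URL:-http://localhost:8083}"

# Print the connector definition as valid JSON (same values as the example above).
connector_json() {
  cat <<'JSON'
{
  "name": "mysql-example",
  "config": {
    "name": "mysql-example",
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:mariadb://localhost:3306/example",
    "connection.user": "root",
    "connection.password": "secret",
    "table.whitelist": "products",
    "mode": "timestamp",
    "timestamp.column.name": "updated_at"
  }
}
JSON
}

# Hypothetical wrapper: POST the definition to the /connectors endpoint.
# "--data @-" makes curl read the request body from stdin.
create_connector() {
  connector_json | curl -s -X POST \
    -H "Content-Type: application/json" \
    --data @- "$CONNECT_URL/connectors"
}

# Uncomment to actually create the connector:
# create_connector
```

Once the connector is created, a GET request to "$CONNECT_URL/connectors/mysql-example/status" reports whether it and its tasks are running, and a GET request to "$CONNECT_URL/connector-plugins" lists the plugin classes the worker has loaded.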

The setting to note here is “mode”. I use “timestamp”, which fits my use case: the connector picks up a row when its timestamp column changes and when a new row is inserted.

The column named in “timestamp.column.name” needs these constraints:

  • NOT NULL
  • DEFAULT CURRENT_TIMESTAMP
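
One way these constraints might be applied is sketched below. timestamp_column_ddl is a hypothetical helper that only prints the SQL statement. The TIMESTAMP column type and the ON UPDATE CURRENT_TIMESTAMP clause are my assumptions, not requirements stated above: the clause makes the database bump the column on every update, so drop it if your application sets the column itself.

```shell
#!/usr/bin/env bash
# Hypothetical helper: print an ALTER TABLE statement that applies the two
# constraints listed above. Column type and ON UPDATE clause are assumptions.
timestamp_column_ddl() {
  local table="$1" column="$2"
  printf 'ALTER TABLE %s MODIFY %s TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP;\n' \
    "$table" "$column"
}

# Table and column names from the connector config above.
timestamp_column_ddl products updated_at
```

The printed statement can be reviewed first and then piped into the database client, for example: timestamp_column_ddl products updated_at | mysql -u root -p example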

References:

  • https://docs.confluent.io/platform/current/connect/confluent-hub/command-reference/confluent-hub-install.html
  • https://mariadb.com/downloads/connectors/connectors-data-access/java8-connector/
  • https://dev.mysql.com/downloads/connector/j/
  • https://docs.confluent.io/platform/current/connect/userguide.html#connect-installing-plugins
  • https://docs.confluent.io/kafka-connectors/jdbc/current/source-connector/overview.html
  • https://docs.confluent.io/kafka-connectors/jdbc/current/source-connector/overview.html#incremental-query-modes