How to Pipe Your Data with Kafka Connect

As software engineers, we tend to like greenfield development (starting from scratch without reusing existing code) because it lets us start fresh with new technologies and architectures that are more closely tied to the requirements of the business. In practice, though, you rarely get to develop software from scratch. There is a lot of infrastructure already built that you need to consider in your new designs. In this more typical form of development, your project probably involves migrating data between an old database system and a more modern one, or you might be migrating to a simpler database to reduce licensing costs. In this article I will discuss the benefits of using Kafka Connect for complex data migrations such as these.

 

To Script or Not—Consider the Case of Syncing Data Between Old and New Systems

So let’s say the company that is hiring you wants to move its relational database to a non-relational one. During my young, impulsive years as a developer, I would have immediately written some scripts to move the data around. That would do the trick if moving the data was a one-time thing. But what if it turned out that, because the old database couldn’t be shut down, you needed to move that data every month, or every week, or every day?

Let’s make it even more complex. The data you are moving is critical to the business and you cannot mess it up. On top of that, the data syncing needs to scale properly as the data size increases.

Would you still trust your cool scripts? Are you confident enough they will be reliable, resilient, and scalable?

 

Beyond Scripts: A Cool New Way to Pipe Data

Experience tells us that old and new systems often coexist for quite some time before the old system can be retired. It also tells us that a big-bang switch between database systems is rarely easy. These are very good reasons to consider Kafka, currently one of the coolest ways to stream data among systems or applications.

You may already know something about the producer-consumer capabilities of Kafka, but you should also take a look at a particularly useful component of the platform: Kafka Connect. Kafka Connect gives you a toolset to interconnect data pipes with all sorts of valves. These valves come in the form of connectors that either pull data from a source system (source connectors) or write data into a destination system (sink connectors).

One of the main advantages of Kafka Connect is its simplicity. Forget about those Python scripts you were already writing in your head. With Kafka Connect, you just need to write configuration files in JSON or properties format. Choose from a variety of existing connectors and then tune the settings according to the needs of your data syncing.

 

Kafka Connect: A Sample Project to Sync Data

Let’s put on our plumber gloves and pipe some example data.

Docker Compose Project

Apache Kafka is an open source project that Confluent has packaged into an elegant platform. You can easily configure the different dependencies in a docker-compose project. In the following GitHub project, I put together all the required components based on the Confluent sample project, adding extra configuration to initialize a MySQL Docker image with the Employees Sample Database and a MongoDB Docker image to be used as the data destination.

Let’s run the project and inflate the MySQL Employees database:

docker-compose up --build

The project also includes the connector classes required to extract data from a JDBC source (along with the MySQL driver) and to write it into MongoDB. Now let’s write the connectors.

Source Connector

This is the JDBC source connector that will be used to extract the data from the MySQL database.

{
 "name":"samples.employees.source.sync.connector",
 "config":{
   "connector.class":"io.confluent.connect.jdbc.JdbcSourceConnector",
   "tasks.max":"1",
   "key.converter":"org.apache.kafka.connect.json.JsonConverter",
   "key.converter.schemas.enable":"false",
   "value.converter": "org.apache.kafka.connect.json.JsonConverter",
   "value.converter.schemas.enable":"false",
   "connection.url":"jdbc:mysql://mysql-employees:3306/employees",
   "connection.user":"root",
   "connection.password":"123456",
   "mode":"incrementing",
   "incrementing.column.name": "emp_no",
   "query": "select * from (select e.emp_no, birth_date, first_name, last_name, gender, hire_date, max(s.salary) as top_salary from employees e join salaries s on e.emp_no = s.emp_no group by e.emp_no) as employee_salaries",
   "producer.interceptor.classes": "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor",
   "numeric.mapping":"best_fit",
   "validate.non.null":"false",
   "poll.interval.ms":3000,
   "batch.max.rows":10000,
   "timestamp.delay.interval.ms":0,
   "topic.prefix":"samples.employees",
   "errors.log.include.messages": true,
   "errors.log.enable": true
 }
}

In a nutshell, we are saying that we want to extract data from the database specified in connection.url, specifically the rows returned by query. The connector polls the database every 3 seconds (poll.interval.ms is set to 3000), but it only pulls new rows, identified by increasing values of the column named in incrementing.column.name.
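To make the incrementing behavior more concrete, here is a minimal Python sketch that imitates it against an in-memory SQLite database. It is not the connector's actual implementation: the helper poll_new_rows, the two-column table, and the sample rows are assumptions made for illustration. The idea is that each poll only asks for rows whose emp_no is greater than the last value already seen.

```python
import sqlite3


def poll_new_rows(conn, base_query, column, last_seen):
    """Return only the rows whose incrementing column is greater than last_seen.

    Hypothetical helper: it wraps the base query and filters on the
    incrementing column, which is roughly what an incrementing poll does.
    """
    sql = (
        f"SELECT * FROM ({base_query}) AS t "
        f"WHERE {column} > ? ORDER BY {column} ASC"
    )
    return conn.execute(sql, (last_seen,)).fetchall()


# Illustrative stand-in for the MySQL employees table.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE employees (emp_no INTEGER PRIMARY KEY, first_name TEXT)")
conn.executemany(
    "INSERT INTO employees VALUES (?, ?)",
    [(10001, "Georgi"), (10002, "Bezalel")],
)

base_query = "SELECT emp_no, first_name FROM employees"
offset = -1  # nothing has been read yet

# First poll: every existing row is new.
first = poll_new_rows(conn, base_query, "emp_no", offset)
offset = first[-1][0]  # remember the highest emp_no seen so far

# A new row arrives between polls.
conn.execute("INSERT INTO employees VALUES (10003, 'Parto')")

# Second poll: only the row added after the stored offset is returned.
second = poll_new_rows(conn, base_query, "emp_no", offset)

print(first)
print(second)
```

One consequence worth keeping in mind: a filter like this only notices new rows. Updates to rows that were already read, or rows inserted with a lower emp_no, would not be picked up by this kind of polling.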

In this query, we are joining two tables, employees and salaries, and exposing the result under the alias employee_salaries. We can also imagine a scenario where this could be used for reporting purposes: denormalize the data with some joins in the source connector, import it into a MongoDB collection, and run the reports there to avoid impacting the transactional database of the business.
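As a rough illustration of what the join produces, the following Python sketch runs a trimmed-down version of the query against an in-memory SQLite database. The table contents are made up, and only a few of the columns are included. Each employee ends up as one flat record with the highest salary attached, which is the shape that will later land in MongoDB.

```python
import sqlite3

# Made-up sample data standing in for the Employees Sample Database.
conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE employees (emp_no INTEGER PRIMARY KEY, first_name TEXT, last_name TEXT);
    CREATE TABLE salaries (emp_no INTEGER, salary INTEGER);
    INSERT INTO employees VALUES (1, 'Ada', 'Lovelace'), (2, 'Alan', 'Turing');
    INSERT INTO salaries VALUES (1, 50000), (1, 62000), (2, 58000);
    """
)

# Same idea as the connector query: one row per employee with the top salary.
rows = conn.execute(
    """
    SELECT e.emp_no, e.first_name, e.last_name, MAX(s.salary) AS top_salary
    FROM employees e
    JOIN salaries s ON e.emp_no = s.emp_no
    GROUP BY e.emp_no
    ORDER BY e.emp_no
    """
).fetchall()

# Turn each row into a flat, document-like record.
columns = ("emp_no", "first_name", "last_name", "top_salary")
documents = [dict(zip(columns, row)) for row in rows]

for doc in documents:
    print(doc)
```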

 

Sink Connector

Now let’s take a look at the MongoDB sink connector:

{
 "name": "samples.employees.target.sync.connector",
 "config": {
   "connector.class": "at.grahsl.kafka.connect.mongodb.MongoDbSinkConnector",
   "tasks.max": "2",
   "key.converter": "org.apache.kafka.connect.json.JsonConverter",
   "key.converter.schemas.enable":"false",
   "value.converter": "org.apache.kafka.connect.json.JsonConverter",
   "value.converter.schemas.enable":"false",
   "topics": "samples.employees",
   "mongodb.connection.uri": "mongodb://employees:employees123@mongo:27017/Employees",
   "mongodb.collection": "Employees",
   "mongodb.post.processor.chain": "at.grahsl.kafka.connect.mongodb.processor.field.renaming.RenameByMapping",
   "mongodb.field.renamer.mapping": "[{\"oldName\":\"value.emp_no\",\"newName\":\"employeeNumber\"},{\"oldName\":\"value.first_name\",\"newName\":\"firstName\"},{\"oldName\":\"value.last_name\",\"newName\":\"lastName\"},{\"oldName\":\"value.birth_date\",\"newName\":\"birthDate\"},{\"oldName\":\"value.hire_date\",\"newName\":\"hireDate\"},{\"oldName\":\"value.top_salary\",\"newName\":\"topSalary\"}]",
   "transforms": "convertBirthDate,convertHireDate",
   "transforms.convertBirthDate.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
   "transforms.convertBirthDate.target.type": "Timestamp",
   "transforms.convertBirthDate.field": "birth_date",
   "transforms.convertBirthDate.format": "dd-MM-yyyy HH:mm:ss",
   "transforms.convertHireDate.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
   "transforms.convertHireDate.target.type": "Timestamp",
   "transforms.convertHireDate.field": "hire_date",
   "transforms.convertHireDate.format": "dd-MM-yyyy HH:mm:ss",
   "errors.log.enable": true
 }
}

Here we are saying that we want our data to be imported into the MongoDB instance specified by mongodb.connection.uri, in the collection named by mongodb.collection. Apart from that, we are renaming the fields to camel case and applying a couple of timestamp transformations to the birth_date and hire_date fields to smooth over format differences between the two systems.
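The renaming step can be pictured with a small Python sketch. This is only an approximation of what a rename-by-mapping post processor does, not the connector's code: the function rename_fields and the sample record are hypothetical, and the mapping is a shortened version modeled on the sink configuration, with the snake_case column names produced by the source query on the left-hand side.

```python
import json

# Shortened mapping in the same JSON shape as mongodb.field.renamer.mapping.
MAPPING_JSON = json.dumps(
    [
        {"oldName": "value.emp_no", "newName": "employeeNumber"},
        {"oldName": "value.first_name", "newName": "firstName"},
        {"oldName": "value.last_name", "newName": "lastName"},
        {"oldName": "value.top_salary", "newName": "topSalary"},
    ]
)


def rename_fields(value, mapping_json):
    """Rename the fields of a record value according to the mapping.

    Hypothetical helper: in this sketch, fields without a mapping entry
    are passed through unchanged.
    """
    mapping = {m["oldName"]: m["newName"] for m in json.loads(mapping_json)}
    return {mapping.get(f"value.{name}", name): v for name, v in value.items()}


# Made-up record value as it might arrive from the topic.
record = {"emp_no": 1, "first_name": "Ada", "last_name": "Lovelace", "top_salary": 62000}
renamed = rename_fields(record, MAPPING_JSON)
print(renamed)
```

Note that each oldName has to match the field name in the incoming record exactly. An entry that points at a name the source never produces simply has no effect in this sketch.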

With these two basic JSON files, we are ready to run Kafka Connect. We can use the REST API to start the process:

 

curl -H "Content-Type: application/json" -d "@samples.employees.source.json" -X POST http://localhost:8083/connectors
curl -H "Content-Type: application/json" -d "@samples.employees.target.json" -X POST http://localhost:8083/connectors
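If you would rather drive the REST API from code than from curl, the same calls can be sketched in Python with only the standard library. The helper names and the trimmed-down connector definition below are assumptions for illustration, and the requests are only built here, not sent. In a real run you would load the full JSON files shown above and pass each request to urllib.request.urlopen against a running Connect worker.

```python
import json
import urllib.request

CONNECT_URL = "http://localhost:8083"  # Connect REST endpoint used in this article


def create_connector_request(definition, base_url=CONNECT_URL):
    """Build the POST request that registers a connector definition."""
    body = json.dumps(definition).encode("utf-8")
    return urllib.request.Request(
        f"{base_url}/connectors",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def status_request(name, base_url=CONNECT_URL):
    """Build the GET request that asks for the status of a connector."""
    return urllib.request.Request(f"{base_url}/connectors/{name}/status", method="GET")


# Placeholder definition: a real one carries the full "config" section.
definition = {
    "name": "samples.employees.source.sync.connector",
    "config": {"tasks.max": "1"},
}

req = create_connector_request(definition)
print(req.get_method(), req.full_url)

status = status_request(definition["name"])
print(status.get_method(), status.full_url)

# To send a request for real (requires a running Connect worker):
# with urllib.request.urlopen(req) as resp:
#     print(resp.status, resp.read().decode("utf-8"))
```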

 

After a few seconds, the data will start flowing and we can start monitoring our progress in the Control Center.

Here’s an example snapshot of the consumer lag. At this point, 125,234 messages were still waiting to be consumed by the sink connector.

Example snapshot of the consumer lag
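For readers new to the term, consumer lag is the gap between the latest offset written to each topic partition and the offset the consumer has already committed. The small Python sketch below shows the arithmetic. The function name and the offsets are hypothetical and are not taken from the run described here.

```python
def consumer_lag(end_offsets, committed_offsets):
    """Return the lag per partition: latest offset minus committed offset.

    Partitions without a committed offset are treated as unread (offset 0).
    """
    return {
        partition: end - committed_offsets.get(partition, 0)
        for partition, end in end_offsets.items()
    }


# Hypothetical offsets for a topic with two partitions.
end_offsets = {0: 1500, 1: 1200}
committed_offsets = {0: 1000, 1: 1200}

lag = consumer_lag(end_offsets, committed_offsets)
total_lag = sum(lag.values())

print(lag)
print(total_lag)
```

When the sink connector keeps up with the source, the total should trend toward zero once the initial load has been consumed.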

 

We can also take a look at the content of the messages by inspecting the topic:

 

Inspect the topic to look at content of the messages

 

At the end, we can see our data got migrated to the MongoDB collection:

 

Data migrated to the MongoDB collection

 

Free Yourself from Mundane Data Translation Scripts

The options with Kafka Connect are extensive. Imagine you need to index your data for fast text searches. You could plug an Elasticsearch connector into the same topic and keep your data indexed at all times. Or, let’s say you need to run some data operations in AWS Lambda: just use the AWS Lambda connector. Or you might need to detect row deletions in your database and keep the data fully in sync. You could use the Debezium connector to do that. Next time you need to keep data from different systems in sync, leave your scripts behind and give Kafka Connect a try. Forget about the mundane task of translating data among data systems, and focus more on your new architecture.

 

Gabriel Solano
Gabriel is a back-end developer with 12 years of experience. He specializes in providing solutions for web applications developed with Java frameworks. For the past seven years he has been a part of Agile teams that provide solutions for highly demanded e-commerce sites.
