简介
debezium是一个为了捕获数据变更(cdc)的开源的分布式平台。启动并指向数据库,当其他应用对此数据库执行inserts
、updates
、delete
操作时,此应用快速得到响应。debezium是持久化和快速响应的,因此你的应用可以快速响应且不会丢失任意一条事件。debezium记录是数据库表的行级别的变更事件。同时debezium是构建在kafka之上的,同时与kafka深度耦合,所以提供kafka connector来使用,debezium sink。支持的数据库有mysql、MongoDB、PostgreSQL、Oracle、SQL server。本篇以mysql作为数据源来实现功能,监听msyql的binlog,还需要修改。当前版本是0.9.5.Final,0.10版本正在开发中。
配置
本篇文章主要使用Embedding
形式监听事件不借助MQ消息中间介,并同步更新到数据库。
下篇主要使用kafka connector
来同步更新到数据库。
mysql需要如下开启binlog。但是如果使用的是debezium/mysql镜像,自动已经配置好了。
1 | log-bin=mysql-bin #添加这一行就ok |
开胃菜
先来一个效果,主要是配置kafka connector来获取debezium事件记录。需要3个服务,zookeeper、kakfa和debezium connector。这里使用docker来启动的,所以需要先安装docker。
启动zookeeper
1 | docker run -it --rm --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper:0.9 |
启动kafka
1 | docker run -it --rm --name kafka -p 9092:9092 --link zookeeper:zookeeper debezium/kafka:0.9 |
启动mysql
1 | docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:0.9 |
启动kafka connect
1 | docker run -it --rm --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses --link zookeeper:zookeeper --link kafka:kafka --link mysql:mysql debezium/connect:0.9 |
通过connect的http请求创建debezium connector
1 | curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{ "name": "inventory-connector", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "mysql", "database.port": "3306", "database.user": "debezium", "database.password": "dbz", "database.server.id": "184054", "database.server.name": "dbserver1", "database.whitelist": "inventory", "database.history.kafka.bootstrap.servers": "kafka:9092", "database.history.kafka.topic": "dbhistory.inventory" } }' |
mysql客户端操作
通过invertory
数据库了的任一表的数据
创建监听可以查看debezium事件记录
1 | docker run -it --name watcher --rm --link zookeeper:zookeeper --link kafka:kafka debezium/kafka:0.9 watch-topic -a -k dbserver1.inventory.customers |
内嵌式
这里主要使用内嵌式的方式获取cdc事件而不需要使用kafka,直接消费debezium事件流。场景是在某一个mysql数据库里的table发生变更,把变更同步到另一mysql数据库。本次使用的是监听inventory
数据库并将数据同步到inventory_back
。
debezium配置
1 | connector.class=io.debezium.connector.mysql.MySqlConnector |
属性和convert配置
1 | 4j |
同步DDL和DML
这里主要是利用CommandLineRunner特性,启动debezium的EmbeddedEngine引擎,获取到cdc事件后由handleRecord
处理DDL和DML,需要去解析cdc的事件SourceRecord
的key和value。
1 | 4j |
provider和table字段解析器太多,这里就不在一一列出来了,如下图所示,支持mysql大部分字段类型。如果有需要的可以关注微信公众号或者邮件以及评论回复。
测试表结构
1 | CREATE TABLE `demo` ( |
结果
DDL事件
可以看出将数据库表的bigint_id
字段长度改为21,监听到事件后:执行了ddl语句,inventory_back
库中的demo
表的bigint_id
字段长度改为21了。
1 | Publishing Topic --> dbserver1 |
DML的insert事件
在inventory
库中的demo
新增一条记录后有如下日志记录,能查看到topic,key,payload以及dml的insert语句。结果会把数据同步到inventory_back
库中的demo
。
1 | 2019-06-24 16:27:14.735 INFO 14995 --- [pool-1-thread-1] i.debezium.connector.mysql.BinlogReader : 1 records sent during previous 00:04:53.506, last recorded offset: {ts_sec=1561364834, file=mysql-bin.000006, pos=23002, row=1, server_id=223344, event=2} |
DML的update事件
在inventory
库中的demo
修改刚刚新增的记录后有如下日志记录,能查看到topic,key,payload以及先delete再insert语句。结果会把数据同步到inventory_back
库中的demo
。
DML的delete事件
在inventory
库中的demo
修改刚刚修改的记录给删除掉后有如下日志记录,能查看到topic,key,payload以及先delete语句。结果会把数据同步到inventory_back
库中的demo
将其删掉。这里有2个事件,第二条事件是一种标致,这里不处理。
日志:
参考
debezium官网
Tutorial
Embedding Debezium