云服务器网:购买云服务器和VPS必上的网站!

Mysql到Elasticsearch高效实时同步Debezium实现

题记
来自Elasticsearch中文社区的问题——MySQL中表无唯一递增字段,也无唯一递增时间字段,该怎样使用logstash实现MySQL实时增量导数据到es中?
logstash和kafka_connector都仅支持基于自增

5.5 验证写入会不会成功。

查看kafka-topic

kafka-topics --list --zookeeper localhost:2181

此处会看到写入数据topic的信息。

注意新写入数据topic的格式:database.schema.table-smt 三部份组成。

本示例topic名称:full.kafka_base_db.account-smt。

5.6 验证消费数据验证写入会不会正常

./kafka-avro-console-consumer --topic full.kafka_base_db.account-smt --bootstrap-server 192.168.1.22:9092 --from-beginning

至此,Debezium实现mysql同步kafka完成。

6、kafka-connector实现kafka同步Elasticsearch

6.1、Kafka-connector介绍

见官网:https://docs.confluent.io/current/connect.html

Kafka Connect是一个用于连接Kafka与外部系统(如数据库,键值存储,检索系统索引和文件系统)的框架。
连接器实现公共数据源数据(如Mysql、Mongo、Pgsql等)写入Kafka,或Kafka数据写入目标数据库,也能够自己开发连接器。

6.2、kafka到ES connector同步配置

配置路径:

/home/confluent⑸.1.0/etc/kafka-connect-elasticsearch/quickstart-elasticsearch.properties

配置内容:

<div ,
"tasks.max": "1",
"topics": "full.kafka_base_db.account-smt",
"key.ignore": "true",
"connection.url": "http://192.168.1.22:9200",
"type.name": "_doc",
"name": "elasticsearch-sink-test"

6.3 kafka到ES启动connector

启动命令

confluent load elasticsearch-sink-test
-d /home/confluent⑸.1.0/etc/kafka-connect-elasticsearch/quickstart-elasticsearch.properties

6.4 Kafka-connctor RESTFul API查看

Mysql2kafka,kafka2ES的connector详情信息可以借助postman或浏览器或命令行查看。

curl -X GET http://localhost:8083/connectors

7、坑复盘。

坑2 同步的进程中可能出现毛病,比如:kafka topic没法消费到数据。
排解思路以下:

1)确认消费的topic会不会是写入数据的topic;2)确认同步的进程中没有出错。可以借助connector以下命令查看。

curl -X GET http://localhost:8083/connectors-xxx/status

坑3 Mysql2ES出现日期格式不能辨认。
是Mysql jar包的问题,解决方案:在my.cnf中配置时区信息便可。

坑4 kafka2ES,ES没有写入数据。
排解思路:

1)建议:先创建同topic名称一致的索引,注意:Mapping静态自定义,不要动态辨认生成。

2)通过connetor/status排查出错缘由,一步步分析。

8、小结

binlog的实现突破了字段的限制,实际上业界的go-mysql-elasticsearch已实现。

对照:logstash、kafka-connector,虽然Debezium“曲线救国”两步实现了实时同步,但稳定性+实时性能相对不错。

参考:
[1] https://rmoff.net/2018/03/24/streaming-data-from-mysql-into-kafka-with-kafka-connect-and-debezium/
[2] https://www.smwenku.com/a/5c0a7b61bd9eee6fb21356a1/zh-cn
[3] https://juejin.im/post/5b7c036bf265da43506e8cfd
[4] https://debezium.io/docs/connectors/mysql/#configuration
[5] https://docs.confluent.io/current/connect/kafka-connect-jdbc/index.html#connect-jdbc

本篇文章到此结束,如果您有相关技术方面疑问可以联系我们技术人员远程解决,感谢大家支持本站!

本文来源:https://www.yuntue.com/post/151357.html | 云服务器网,转载请注明出处!

关于作者: yuntue

云服务器(www.yuntue.com)是一家专门做阿里云服务器代金券、腾讯云服务器优惠券的网站,这里你可以找到阿里云服务器腾讯云服务器等国内主流云服务器优惠价格,以及海外云服务器、vps主机等优惠信息,我们会为你提供性价比最高的云服务器和域名、数据库、CDN、免费邮箱等企业常用互联网资源。

为您推荐

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注