腾讯大牛教你ClickHouse实时同步MySQL数据

100人浏览   2025-02-16 00:24:03



ClickHouse作为OLAP分析引擎已经被广泛使用,数据的导入导出是用户面临的第一个问题。由于ClickHouse本身无法很好地支持单条大批量的写入,因此在实时同步数据方面需要借助其他服务协助。本文给出一种结合Canal+Kafka的方案,并且给出在多个MySQL实例分库分表的场景下,如何将多张MySQL数据表写入同一张ClickHouse表的方法,欢迎大家批评指正。

首先来看看我们的需求背景:

1. 实时同步多个MySQL实例数据到ClickHouse,每天规模500G,记录数目亿级别,可以接受分钟级别的同步延迟;

2. 某些数据库表存在分库分表的操作,用户需要跨MySQL实例跨数据库的表同步到ClickHouse的一张表中;

3. 现有的MySQL binlog开源组件(Canal),无法做到多张源数据表到一张目的表的映射关系。





基本原理




一、使用JDBC方式同步

1. 使用Canal组件完成binlog的解析和数据同步;

2. Canal-Server进程会伪装成MySQL的slave,使用MySQL的binlog同步协议完成数据同步;

3. Canal-Adapter进程负责从canal-server获取解析后的binlog,并且通过jdbc接口写入到ClickHouse;


优点:

1. Canal组件原生支持;

缺点:

1. Canal-Adpater写入时源表和目的表一一对应,灵活性不足;

2. 需要维护两个Canal组件进程;

二、Kafka+ClickHouse物化视图方式同步

1. Canal-Server完成binlog的解析,并且将解析后的json写入Kafka;

2. Canal-Server可以根据正则表达式过滤数据库和表名,并且根据规则写入Kafka的topic;

3. ClickHouse使用KafkaEngine和Materialized View完成消息消费,并写入本地表;


优点:

1. Kafka支持水平扩展,可以根据数据规模调整partition数目;

2. Kafka引入后将写入请求合并,防止ClickHouse生成大量的小文件,从而影响查询性能;

3. Canal-Server支持规则过滤,可以灵活配置上游的MySQL实例的数据库名和表名,并且指明写入的Kafka topic名称;

缺点:

1. 需要维护Kafka和配置规则;

2. ClickHouse需要新建相关的视图、Kafka Engine的外表等;





具体步骤




一、准备工作

1. 如果使用TencentDB,则在控制台确认binlog_format为ROW,无需多余操作。


如果是自建MySQL,则在客户端中查询变量:


2. 创建账号canal,用于同步binlog


二、Canal组件部署

前置条件:

Canal组件部署的机器需要跟ClickHouse服务和MySQL网络互通;

需要在机器上部署java8,配置JAVA_HOME、PATH等环境变量;

基本概念:



1. Canal-Server组件部署

Canal-Server的主要作用是订阅binlog信息并解析和定义instance相关信息,建议每个Canal-Server进程对应一个MySQL实例;

1)下载canal.deployer-1.1.4.tar.gz,解压

2)修改配置文件conf/canal.properties,需要关注的配置如下:


3)配置instance

可以参照example新增新的instance,主要修改配置文件conf/${instance_name}/instance.properties文件。

订阅 172.21.48.35的MySQL的testdb数据库中的以tb_开头的表的数据变更(例如tb_20200801 、 tb_20200802等),主要的步骤如下:

步骤1:创建example2实例:cddeployer/conf && cp -r example example2

步骤2:修改deployer/conf/example2/instance.properties文件

步骤3:在conf/canal.properties中修改 canal.destinations ,新增example2

订阅 172.21.48.35的MySQL的empdb_0数据库的employees_20200801表,empdb_1数据库的employees_20200802表,并且数据写入Kafka;

步骤1:创建example2实例:cddeployer/conf && cp -r example example3

步骤2:修改deployer/conf/example3/instance.properties文件


步骤3:在Kafka中新建topic employees_topic,指定分区数目为3


步骤4:在conf/canal.properties中修改 canal.destinations ,新增example3;修改服务模式为kafka,配置kafka相关信息;


2. Canal-Adapter组件部署(只针对方案一)

Canal-Adapter的主要作用是通过JDBC接口写入ClickHouse数据,可以配置多个表的写入;

1)下载canal.adapter-1.1.4.tar.gz,解压;

2)在lib目录下新增clickhouse驱动jar包及httpclient的jar包 httpcore-4.4.13.jar、httpclient-4.3.3.jar、clickhouse-jdbc-0.2.4.jar;

3)修改配置文件conf/application.yml文件,修改canalServerHost、srcDataSources、canalAdapters的配置;


4)修改配置文件conf/rdb/mytest_user.yml文件

上述的配置文件中,由于开启了mirrorDb: true,目的端的ClickHouse必须有相同的数据库名和表名。

修改adapter的conf/rdb/mytest_user.yml配置文件,指定源数据库和目标数据库


在conf/rdb 目录配置多个yml文件,分别指明不同的table名称。









Kafka 服务配置




一、调整合理的producer参数

确认Canal-Server里的canal.properties文件,重要参数见下表;


二、新建相关的topic名称

根据Canal-Server里instance里配置文件instance.properties,注意分区数目与canal.mq.partitionsNum 保持一致;

partition数目需要考虑以下因素:

1. 上游的MySQL的数据量。原则上数据写入量越大,应该分配更多的partition数目;

2. 考虑下游ClickHouse的实例数目。topic的partition分区总数 最好 不大于 下游ClickHouse的总实例数目,保证每个ClickHouse实例都能至少分配到一个partition;









ClickHouse服务配置




根据上游MySQL实例的表的schema新建数据表;

引入Kafka时需要额外新建Engine=Kafka的外表以及相关的物化视图表;

建议:

1. 为每个外表新增不同的 kafka_group_name,防止相互影响;

2. 设置kafka_skip_broken_messages 参数为合理值,遇到无法解析数据会跳过;

3. 设置合理的kafka_num_consumers值,最好保证所有ClickHouse实例该值的总和大于 topic的partition数目;

新建相关的分布式查询表;





服务启动




启动相关的Canal组件进程;

1. canal-server: sh bin/startup.sh

2. canal-adapter: sh bin/startup.sh

在MySQL中插入数据,观察日志是否可以正常运行;

如果使用Kafka,可以通过kafka-console-consumer.sh脚本观察binlog数据解析;


观察ClickHouse数据表中是否正常写入数据;









实际案例




需求:实时同步MySQL实例的empdb_0.employees_20200801表和empdb_1.employees_20200802数据表

方案:使用方案二

环境及参数:


1.在MySQL新建相关表


2. Canal-Server配置

步骤1. 修改conf/canal.properties文件


步骤2. 新增employees实例,修改employees/instances.properties配置


3. Kafka配置

4. 新增topic employees_topic,分区数为3

5. ClickHouse建表


6. 启动Canal-Server服务

MySQL实例上游插入数据,观察数据是否在Canal-Server解析正常,是否在ClickHouse中完成同步。


相关推荐