使用Canal将数据库中的变更同步到ES中,实现数据同步功能
1. 前言
1.1 什么是Canal?
Canal 组件是一个基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费,支持将增量数据投递到下游消费者(如 Kafka、RocketMQ 等)或者存储(如 Elasticsearch、HBase 等)的组件。
Canal 感知到MySQL数据变动,然后解析变动数据,将变动数据发送到MQ或者同步到其他数据库,等待进一步业务逻辑处理
1.2 Canal的工作原理
Canal的工作原理
2. Canal在项目中的作用
项目地址
Canal 的主要作用是实现 MySQL → Elasticsearch 的实时数据同步
drug 微服务中的药品数据存储在 MySQL,而搜索功能(aimin-search 微服务)依赖的是 Elasticsearch 索引库 DrugIndex。
如果:
- 后台管理员修改药品信息(MySQL 更新)
- 新增/删除药品记录(MySQL 插入/删除)
如果 ES 不同步,就会出现:
- 搜索不到新药品
- 搜索结果无变化
- 搜索字段是旧数据
- 药品名改了,但搜索结果不变
所以项目使用 Canal 监听 MySQL binlog,实现数据库变化事件驱动的实时ES同步。
项目中 Canal 的完整链路:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| 管理员在 aimin-admin 中修改药品 ↓ MySQL drug 表执行 UPDATE ↓ MySQL 写入 binlog ↓ Canal Server 监听 binlog ↓ CanalReceiver(drug 微服务)接收到事件 ↓ 解析 rowData 转为 DrugIndex ↓ 调用 ElasticsearchTemplate.save() ↓ ES 索引库实现实时更新 ↓ aimin-search 搜索接口返回最新结果
|
3. 如何集成Canal实现ES的数据同步
3.1 引入 Canal 客户端依赖(POM)
在 aimin-drug 的 pom.xml 中引入 Canal 客户端:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| <dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>1.1.8</version> </dependency>
<dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.protocol</artifactId> <version>1.1.8</version> </dependency>
<dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.common</artifactId> <version>1.1.8</version> </dependency>
|
这一步让 drug 微服务具备连接 Canal Server 并解析 binlog 的能力。
3.2 在 bootstrap.yml 中配置 Canal 客户端
1 2 3 4 5 6 7 8
| canal: server: ip: localhost port: 11111 destination: example initialDelay: 5000 fixedDelay: 5000 subscribe: .*\..*
|
| 配置项 |
含义 |
ip、port |
Canal Server 地址 |
destination |
默认 Canal 实例名 example |
subscribe |
订阅所有表 |
initialDelay |
延迟 5 秒启动同步 |
fixedDelay |
每 5 秒轮询一次 Canal |
3.3 CanalProperties 将 YAML 配置绑定为 Java 对象
这里将 YAML 配置映射为 POJO,用于建立 CanalConnector:
1 2 3 4 5 6 7 8 9 10
| @ConfigurationProperties(prefix = "canal") @Component @Data public class CanalProperties { private String destination; private Server server; private Long initialDelay; private Long fixedDelay; private String subscribe; }
|
3.4 用 CanalConfig 创建 CanalConnector(核心配置类)
1 2 3 4 5 6 7 8
| @Bean public CanalConnector newSingleConnector() { return CanalConnectors.newSingleConnector( new InetSocketAddress(canalProperties.getServer().getIp(), canalProperties.getServer().getPort()), canalProperties.getDestination(), "canal", "canal"); }
|
创建Canal客户端监听binlog
newSingleConnector() 会创建一个直连 Canal Server 的客户端,用 canal/canal 登录,并指定 destination=example。
之后,所有 binlog 消息都通过此连接读取。
3.5 CanalReceiver
真正消费 MySQL binlog 的地方(核心类)
3.5.1 @PostConstruct:项目启动后连接 Canal Server
1 2 3 4 5 6
| @PostConstruct public void connect() { canalConnector.connect(); canalConnector.subscribe(canalProperties.getSubscribe()); canalConnector.rollback(); }
|
流程:
- connect():连接 canal server
- subscribe(“...”):订阅所有 schema 下的所有表
- rollback():回滚到上次未 ack 的位置,保证不丢 binlog
3.5.2 @Scheduled + @Async 定时异步拉取 binlog
1 2 3
| @Async @Scheduled(initialDelayString = "${canal.initialDelay}", fixedDelayString = "${canal.fixedDelay}") public void processData() {}
|
| 功能 |
实现方式 |
| 定时任务 |
@Scheduled 每 5 秒执行一次 |
| 异步执行 |
@Async 不阻塞主线程 |
| 自动重连 |
canalConnector.checkValid() |
3.5.3 从 Canal 读取 binlog 数据
1 2
| Message message = canalConnector.getWithoutAck(100, 100L, TimeUnit.MILLISECONDS); batchId = message.getId();
|
3.5.4 解析 binlog RowChange(真实业务处理)
得到事件类型:
1 2
| CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); CanalEntry.EventType eventType = rowChange.getEventType();
|
3.5.5 根据 eventType 对药品表进行处理
在 handleData() 中处理不同表:
- 处理 t_drug 表(药品信息表)
- 处理 t_drug_img 表(药品图片表)
3.5.6 插入/更新/删除药品:同步到 Elasticsearch
1 2 3 4 5 6 7 8 9 10
| private void insertDrug(Map<String, Object> afterParam){ DrugIndex bean = BeanUtil.toBean(afterParam, DrugIndex.class);
if(bean.getCategoryId() != null){ DrugCategory category = categoryService.getByIdDeep(bean.getCategoryId()); bean.setCategoryName(category.getName()); }
elasticsearchTemplate.save(bean); }
|
- MySQL 新增药品 → 立即写入 ES 索引库
- 自动补充分类名称(从 categoryService 查询)
3.5.7 最终 ack
1
| canalConnector.ack(batchId);
|
- 当前 binlog batch 已正确消费
- 下次采集从下一批开始
3.6 数据流动图:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| MySQL drug 表数据变化 ↓ 写入 binlog Canal Server 监听 binlog ↓ 发送给 Canal 客户端 aimin-drug 中的 CanalReceiver 拉取消息 ↓ 解析表名 + 字段 + eventType 根据事件类型处理: INSERT → ES 新增索引 UPDATE → ES 更新索引 DELETE → ES 删除索引 ↓ Elasticsearch 保存最新药品索引信息 ↓ aimin-search 搜索服务即时返回最新数据
|
参考文献:
一篇文章让你上手Canal数据同步神技~
END