使用Canal将数据库中的变更同步到ES中,实现数据同步功能

使用Canal将数据库中的变更同步到ES中,实现数据同步功能

1. 前言

1.1 什么是Canal?

Canal 组件是一个基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费,支持将增量数据投递到下游消费者(如 Kafka、RocketMQ 等)或者存储(如 Elasticsearch、HBase 等)的组件。
Canal 感知到MySQL数据变动,然后解析变动数据,将变动数据发送到MQ或者同步到其他数据库,等待进一步业务逻辑处理

1.2 Canal的工作原理

Canal的工作原理

2. Canal在项目中的作用

项目地址

Canal 的主要作用是实现 MySQL → Elasticsearch 的实时数据同步

drug 微服务中的药品数据存储在 MySQL,而搜索功能(aimin-search 微服务)依赖的是 Elasticsearch 索引库 DrugIndex

如果:

  • 后台管理员修改药品信息(MySQL 更新)
  • 新增/删除药品记录(MySQL 插入/删除)

如果 ES 不同步,就会出现:

  • 搜索不到新药品
  • 搜索结果无变化
  • 搜索字段是旧数据
  • 药品名改了,但搜索结果不变

所以项目使用 Canal 监听 MySQL binlog,实现数据库变化事件驱动的实时ES同步

项目中 Canal 的完整链路:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
管理员在 aimin-admin 中修改药品 

MySQL drug 表执行 UPDATE

MySQL 写入 binlog

Canal Server 监听 binlog

CanalReceiver(drug 微服务)接收到事件

解析 rowData 转为 DrugIndex

调用 ElasticsearchTemplate.save()

ES 索引库实现实时更新

aimin-search 搜索接口返回最新结果

3. 如何集成Canal实现ES的数据同步

3.1 引入 Canal 客户端依赖(POM)

aimin-drugpom.xml 中引入 Canal 客户端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.8</version>
</dependency>

<!-- 必须手动补上协议依赖,否则 Message 类找不到 -->
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.protocol</artifactId>
<version>1.1.8</version>
</dependency>

<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.common</artifactId>
<version>1.1.8</version>
</dependency>

这一步让 drug 微服务具备连接 Canal Server 并解析 binlog 的能力。

3.2 在 bootstrap.yml 中配置 Canal 客户端

1
2
3
4
5
6
7
8
canal:
server:
ip: localhost
port: 11111
destination: example
initialDelay: 5000
fixedDelay: 5000
subscribe: .*\..*
配置项 含义
ipport Canal Server 地址
destination 默认 Canal 实例名 example
subscribe 订阅所有表
initialDelay 延迟 5 秒启动同步
fixedDelay 每 5 秒轮询一次 Canal

3.3 CanalProperties 将 YAML 配置绑定为 Java 对象

这里将 YAML 配置映射为 POJO,用于建立 CanalConnector:

1
2
3
4
5
6
7
8
9
10
@ConfigurationProperties(prefix = "canal")
@Component
@Data
public class CanalProperties {
private String destination;
private Server server;
private Long initialDelay;
private Long fixedDelay;
private String subscribe;
}

3.4 用 CanalConfig 创建 CanalConnector(核心配置类)

1
2
3
4
5
6
7
8
@Bean
public CanalConnector newSingleConnector() {
return CanalConnectors.newSingleConnector(
new InetSocketAddress(canalProperties.getServer().getIp(),
canalProperties.getServer().getPort()),
canalProperties.getDestination(),
"canal", "canal");
}

创建Canal客户端监听binlog

newSingleConnector() 会创建一个直连 Canal Server 的客户端,用 canal/canal 登录,并指定 destination=example。
之后,所有 binlog 消息都通过此连接读取。

3.5 CanalReceiver

真正消费 MySQL binlog 的地方(核心类)

3.5.1 @PostConstruct:项目启动后连接 Canal Server

1
2
3
4
5
6
@PostConstruct
public void connect() {
canalConnector.connect();
canalConnector.subscribe(canalProperties.getSubscribe());
canalConnector.rollback();
}

流程:

  • connect():连接 canal server
  • subscribe(“...”):订阅所有 schema 下的所有表
  • rollback():回滚到上次未 ack 的位置,保证不丢 binlog

3.5.2 @Scheduled + @Async 定时异步拉取 binlog

1
2
3
@Async
@Scheduled(initialDelayString = "${canal.initialDelay}", fixedDelayString = "${canal.fixedDelay}")
public void processData() {}
功能 实现方式
定时任务 @Scheduled 每 5 秒执行一次
异步执行 @Async 不阻塞主线程
自动重连 canalConnector.checkValid()

3.5.3 从 Canal 读取 binlog 数据

1
2
Message message = canalConnector.getWithoutAck(100, 100L, TimeUnit.MILLISECONDS);
batchId = message.getId();

3.5.4 解析 binlog RowChange(真实业务处理)

得到事件类型:

  • INSERT
  • UPDATE
  • DELETE
1
2
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
CanalEntry.EventType eventType = rowChange.getEventType();

3.5.5 根据 eventType 对药品表进行处理

handleData() 中处理不同表:

  • 处理 t_drug 表(药品信息表)
    • updateDrug(afterParam);
  • 处理 t_drug_img 表(药品图片表)

3.5.6 插入/更新/删除药品:同步到 Elasticsearch

1
2
3
4
5
6
7
8
9
10
private void insertDrug(Map<String, Object> afterParam){
DrugIndex bean = BeanUtil.toBean(afterParam, DrugIndex.class);

if(bean.getCategoryId() != null){
DrugCategory category = categoryService.getByIdDeep(bean.getCategoryId());
bean.setCategoryName(category.getName());
}

elasticsearchTemplate.save(bean);
}
  • MySQL 新增药品 → 立即写入 ES 索引库
  • 自动补充分类名称(从 categoryService 查询)

3.5.7 最终 ack

1
canalConnector.ack(batchId);
  • 当前 binlog batch 已正确消费
  • 下次采集从下一批开始

3.6 数据流动图:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
MySQL drug 表数据变化
↓ 写入 binlog
Canal Server 监听 binlog
↓ 发送给 Canal 客户端
aimin-drug 中的 CanalReceiver 拉取消息
↓ 解析表名 + 字段 + eventType
根据事件类型处理:
INSERT → ES 新增索引
UPDATE → ES 更新索引
DELETE → ES 删除索引

Elasticsearch 保存最新药品索引信息

aimin-search 搜索服务即时返回最新数据

参考文献:
一篇文章让你上手Canal数据同步神技~

END