10分钟了解变换数据的捕获-CDC

1 概念

变化数据捕获 (Change Data Capture,缩写CDC) 是指识别和捕获对数据库中的数据所做的更改(包括数据或数据表的插入、更新、删除等),然后将这些更改按发生的顺序完整记录下来,并实时通过消息中间件传送到下游流程或系统的过程。通过这种方式,CDC能够向数据仓库提供高效、低延迟的数据传输,以便信息被及时转换并交付给专供分析的应用程序。

img

2 常见的开源方案

img

1.Maxwell,这个框架是基于kafka的,然后通过修改MySQL配置文件可以获取其二进制日志文件,然后通过producer/consumer机制可以实现对biolog(二进制文件)的解析,并输出至kafka。这个的缺点是主要面向MySQL至不同中间管道的输出,而不是各种数据库至中间管道的输出。

2.Debezium,这个框架大概原理上和上面的一致,不同点在于:它是基于confluent platform下的kafka connect,算是数据采集的一种。同时kafka connect前景蛮不错,比flume,sqoop好很多。而且现在这个框架不仅仅有MySQL的采集,还有其他的,例如orale,progressSQL等。

3.canal,这个是阿里出来一个框架,通过伪装可以获取MySQL的二进制文件,但是它有个缺点是需要使用自己的消费端去消费解析后的数据。不过这个很成熟,而且说明文件也是中文的,很好理解。

3 DebeziumEngine(嵌入)

使用嵌入式开发,可以将其集成在项目中,从而跳过原客户端的kafka

注:

  • 版本应选择为1.5版本,1.5以后的版本最低jdk支持需要jdk11

POM

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
<!--Debezium-->
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-api</artifactId>
<version>1.5.4.Final</version>
</dependency>

<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-embedded</artifactId>
<version>1.5.4.Final</version>
</dependency>

<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-mysql</artifactId>
<version>1.5.4.Final</version>
</dependency>

配置类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
@org.springframework.context.annotation.Configuration
public class DebeziumConfigure {

@Bean
public Configuration configuration(){
return initConfig();
}

public static Configuration initConfig() {
return io.debezium.config.Configuration.create()
// 连接器的Java类名称
.with("connector.class", MySqlConnector.class.getName())
// 偏移量持久化,用来容错 默认值
.with("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore")
// 偏移量持久化文件路径 默认/tmp/offsets.dat 如果路径配置不正确可能导致无法存储偏移量 可能会导致重复消费变更
// 如果连接器重新启动,它将使用最后记录的偏移量来知道它应该恢复读取源信息中的哪个位置。
.with("offset.storage.file.filename", "./tmp/offsets.dat")
// 捕获偏移量的周期
.with("offset.flush.interval.ms", "6000")
// 连接器的唯一名称
.with("name", "mysql-connector")
// 数据库的hostname
.with("database.hostname", "127.0.0.1")
.with("database.serverTimezone", "Asia/Shanghai")
// 端口
.with("database.port", "3306")
// 用户名
.with("database.user", "root")
// 密码
.with("database.password", "19981104")
// 包含的数据库列表
.with("database.include.list", "beadhouse_survey")
// 监听的数据表list
.with("table.include.list", "beadhouse_survey.pro_account_client")
// 是否包含数据库表结构层面的变更,建议使用默认值true
.with("include.schema.changes", "false")
// mysql.cnf 配置的 server-id
.with("database.server.id", "123454")
// MySQL 服务器或集群的逻辑名称
.with("database.server.name", "customer-mysql-db-server")
// 历史变更记录
.with("database.history", "io.debezium.relational.history.FileDatabaseHistory")
// 历史变更记录存储位置
.with("database.history.file.filename", "./tmp/dbhistory.dat")
.build();
}
}

连接初始化类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
@Component
public class DebeziumContent implements InitializingBean, SmartLifecycle {

// 日志
private static final Logger log = LoggerFactory.getLogger(DebeziumContent.class);

// 单线程定时执行器
private final Executor executor = Executors.newSingleThreadExecutor();

// Debezium引擎
private DebeziumEngine<RecordChangeEvent<SourceRecord>> debeziumEngine;

// 注入配置类
@Autowired
private Configuration debeziumConfigure;

// 初始化配置,创建引擎
public void initContent() {
debeziumEngine = DebeziumEngine.create(ChangeEventFormat.of(Connect.class))
.using(debeziumConfigure.asProperties())
.notifying(this::handlePayload)
.build();
}

// 事件处理
private void handlePayload(List<RecordChangeEvent<SourceRecord>> recordChangeEvents, DebeziumEngine.RecordCommitter<RecordChangeEvent<SourceRecord>> recordCommitter) {
recordChangeEvents.forEach(r -> {
SourceRecord sourceRecord = r.record();
Struct sourceRecordChangeValue = (Struct) sourceRecord.value();
if (sourceRecordChangeValue != null) {
// 判断操作的类型 过滤掉读 只处理增删改 这个其实可以在配置中设置
Object obj = sourceRecordChangeValue.get(OPERATION);
if (obj == null) {
return;
}
Envelope.Operation operation = Envelope.Operation.forCode((String) obj);

if (operation != Envelope.Operation.READ) {
String record = operation == Envelope.Operation.DELETE ? BEFORE : AFTER;
// 获取增删改对应的结构体数据
Struct struct = (Struct) sourceRecordChangeValue.get(record);
// 将变更的行封装为Map
Map<String, Object> payload = struct.schema().fields().stream()
.map(Field::name)
.filter(fieldName -> struct.get(fieldName) != null)
.map(fieldName -> Pair.of(fieldName, struct.get(fieldName)))
.collect(toMap(Pair::getKey, Pair::getValue));
// 打印
String[] split = struct.schema().name().split("\\.");
log.info("数据库:{},表:{},操作:{},结果:{}", split[1], split[2], operation, payload);
}
}
});
}

// 配置注入后回调
@Override
public void afterPropertiesSet() throws Exception {
// 配置初始化好后,执行初始化引擎
this.initContent();
Assert.notNull(debeziumEngine, "debeziumEngine must not be null");
}

@Override
public void start() {
// bean准备好后,执行定时任务,监听数据库
executor.execute(debeziumEngine);
}

@Override
public void stop() {
// 容器关闭后,先对引擎进行关闭
try {
debeziumEngine.close();
} catch (IOException e) {
e.printStackTrace();
}
}

@Override
public boolean isRunning() {
// 返回当前bean的状态
return debeziumEngine != null;
}
}

测试

  • 启动项目

  • 对监听的数据库进行操作

  • 观察控制台,成功打印出该数据库的操作