Understanding Change Data Capture (CDC) in 10 Minutes
Moxy, 2024-02-29 (category: Java)
1 Concept
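Change Data Capture (CDC) is the process of identifying and capturing changes made to the data in a database, such as inserts, updates and deletes of rows or of tables. These changes are recorded completely and in the order in which they happened, then delivered in real time through message middleware to downstream processes or systems. This gives data warehouses an efficient, low-latency data feed, so information can be transformed promptly and delivered to applications built for analytics.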
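The core idea, an ordered and complete log of changes that downstream consumers read incrementally, can be illustrated with a toy model. This is a minimal sketch with hypothetical types (`ChangeOp`, `ChangeEvent`, `CapturedTable`). Real CDC tools do not wrap application writes like this. They read the database's own transaction log, such as the MySQL binlog.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical types for illustration; this is not Debezium's API.
// Debezium marks events with op codes c (create), u (update), d (delete), plus r for snapshot reads.
enum ChangeOp { CREATE, UPDATE, DELETE }

final class ChangeEvent {
    final long sequence;   // position in the change log; preserves the order changes happened in
    final ChangeOp op;
    final String key;
    final String before;   // row image before the change (null for CREATE)
    final String after;    // row image after the change (null for DELETE)

    ChangeEvent(long sequence, ChangeOp op, String key, String before, String after) {
        this.sequence = sequence;
        this.op = op;
        this.key = key;
        this.before = before;
        this.after = after;
    }
}

// A toy table that appends every insert, update and delete to an ordered change log,
// similar in spirit to how a database's transaction log records them.
final class CapturedTable {
    private final Map<String, String> rows = new HashMap<>();
    private final List<ChangeEvent> changeLog = new ArrayList<>();
    private long nextSequence = 1;

    void upsert(String key, String value) {
        String before = rows.put(key, value);
        ChangeOp op = (before == null) ? ChangeOp.CREATE : ChangeOp.UPDATE;
        changeLog.add(new ChangeEvent(nextSequence++, op, key, before, value));
    }

    void delete(String key) {
        String before = rows.remove(key);
        if (before != null) { // deleting a missing row changes nothing, so nothing is captured
            changeLog.add(new ChangeEvent(nextSequence++, ChangeOp.DELETE, key, before, null));
        }
    }

    // A downstream consumer remembers the last sequence it processed (its "offset")
    // and asks only for newer events, in order.
    List<ChangeEvent> changesSince(long lastProcessed) {
        List<ChangeEvent> result = new ArrayList<>();
        for (ChangeEvent e : changeLog) {
            if (e.sequence > lastProcessed) {
                result.add(e);
            }
        }
        return result;
    }
}
```

Because the log is append-only and numbered, a consumer that restarts can resume from the last sequence it processed. Offset storage in real CDC tools serves the same purpose.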
2 Common Open-Source Solutions
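1. Maxwell. This framework is built around Kafka. After the MySQL configuration file is changed to enable the binary log (binlog), Maxwell reads and parses the binlog through a producer/consumer mechanism and writes the changes to Kafka. Its drawback is that it is designed for moving data from MySQL into various pipelines, not from arbitrary databases.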
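2. Debezium. Its basic principle is roughly the same as Maxwell's. The difference is that it is built on Kafka Connect, as shipped with the Confluent Platform, so it is essentially a data-ingestion tool. Kafka Connect looks quite promising and compares well with Flume and Sqoop. Debezium is also no longer limited to MySQL: it provides connectors for other databases, such as Oracle and PostgreSQL.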
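3. Canal. An open-source framework from Alibaba that gets the MySQL binlog by disguising itself as a MySQL replica. Its drawback is that you have to consume the parsed data with its own client. On the other hand, it is very mature, and its documentation is in Chinese, which makes it easy to follow for Chinese-speaking teams.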
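All three tools read MySQL's binlog, and they expect it in row-based format. Before wiring any of them up, you may want to verify the server settings. Below is a minimal pure-Java check. It assumes you have already fetched the variables, for example with `SHOW VARIABLES WHERE Variable_name IN ('log_bin', 'binlog_format', 'binlog_row_image')`. The class name `BinlogPrerequisites` is hypothetical. The expected values are the ones the Debezium MySQL connector documentation asks for: `log_bin=ON`, `binlog_format=ROW` and `binlog_row_image=FULL`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper: reports MySQL settings that would prevent binlog-based CDC.
// Expected values follow the Debezium MySQL connector documentation.
final class BinlogPrerequisites {
    private BinlogPrerequisites() {}

    // variables: Variable_name -> Value, as returned by SHOW VARIABLES
    static List<String> problems(Map<String, String> variables) {
        List<String> problems = new ArrayList<>();
        expect(variables, "log_bin", "ON", problems);            // binlog must be enabled
        expect(variables, "binlog_format", "ROW", problems);     // row events carry the changed data
        expect(variables, "binlog_row_image", "FULL", problems); // full before/after row images
        return problems;
    }

    private static void expect(Map<String, String> vars, String name, String wanted, List<String> problems) {
        String actual = vars.get(name);
        if (actual == null) {
            problems.add(name + " is missing");
        } else if (!wanted.equalsIgnoreCase(actual)) {
            problems.add(name + "=" + actual + ", expected " + wanted);
        }
    }
}
```

An empty result means these three settings are in place. Privileges and the replica/server ID still have to be set up separately.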
3 DebeziumEngine (Embedded)
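In embedded mode, Debezium runs as a library inside your own application, so you can skip the Kafka deployment that the standard setup relies on.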
Note: choose version 1.5. Releases after 1.5 require at least JDK 11, so 1.5 is the line to use on older JDKs such as JDK 8.
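The version rule in the note can also be checked at startup. This is a minimal sketch that relies on the standard `java.specification.version` system property, which is `1.8` on JDK 8 and `11`, `17`, ... on later JDKs. `DebeziumVersionAdvisor` is a hypothetical helper, and its return strings just restate the note above.

```java
// Hypothetical helper restating the version note:
// below JDK 11 stay on Debezium 1.5.x; JDK 11 or later can use newer releases.
final class DebeziumVersionAdvisor {
    private DebeziumVersionAdvisor() {}

    // "1.8" -> 8 (old numbering scheme), "11" -> 11, "17" -> 17 (new scheme)
    static int featureVersion(String specVersion) {
        String major = specVersion.startsWith("1.") ? specVersion.substring(2) : specVersion;
        int dot = major.indexOf('.');
        if (dot >= 0) {
            major = major.substring(0, dot);
        }
        return Integer.parseInt(major);
    }

    static String suggestedDebeziumLine(String specVersion) {
        return featureVersion(specVersion) < 11 ? "1.5.x" : "1.6 or later";
    }

    static String suggestedForCurrentJvm() {
        return suggestedDebeziumLine(System.getProperty("java.specification.version"));
    }
}
```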
POM:

```xml
<!-- Debezium -->
<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-api</artifactId>
    <version>1.5.4.Final</version>
</dependency>
<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-embedded</artifactId>
    <version>1.5.4.Final</version>
</dependency>
<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-connector-mysql</artifactId>
    <version>1.5.4.Final</version>
</dependency>
```
Configuration class:

```java
import io.debezium.config.Configuration;
import io.debezium.connector.mysql.MySqlConnector;
import org.springframework.context.annotation.Bean;

// Spring's @Configuration is fully qualified because this class also uses Debezium's Configuration type
@org.springframework.context.annotation.Configuration
public class DebeziumConfigure {

    @Bean
    public Configuration configuration() {
        return initConfig();
    }

    public static Configuration initConfig() {
        return Configuration.create()
                // Connector implementation run by the embedded engine
                .with("connector.class", MySqlConnector.class.getName())
                // Keep binlog offsets in a local file so a restart resumes where it stopped
                .with("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore")
                .with("offset.storage.file.filename", "./tmp/offsets.dat")
                .with("offset.flush.interval.ms", "6000")
                .with("name", "mysql-connector")
                // MySQL connection settings
                .with("database.hostname", "127.0.0.1")
                .with("database.serverTimezone", "Asia/Shanghai")
                .with("database.port", "3306")
                .with("database.user", "root")
                .with("database.password", "19981104")
                // Capture only this database and this table
                .with("database.include.list", "beadhouse_survey")
                .with("table.include.list", "beadhouse_survey.pro_account_client")
                .with("include.schema.changes", "false")
                // Client ID; must be unique among all clients replicating from this MySQL server
                .with("database.server.id", "123454")
                // Logical server name; used as the prefix of the schema names in change events
                .with("database.server.name", "customer-mysql-db-server")
                // Schema history is also kept in a local file
                .with("database.history", "io.debezium.relational.history.FileDatabaseHistory")
                .with("database.history.file.filename", "./tmp/dbhistory.dat")
                .build();
    }
}
```
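`database.include.list` and `table.include.list` take comma-separated regular expressions, and table entries are written as `databaseName.tableName`. Below is a simplified model of that filtering. It assumes each entry must match the whole identifier. `IncludeListFilter` is hypothetical and is not Debezium's internal class, so check the connector documentation for your version's exact matching rules.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical, simplified model of a table include list:
// each comma-separated entry is a regex matched against the full "database.table" identifier.
final class IncludeListFilter {
    private final List<Pattern> patterns = new ArrayList<>();

    IncludeListFilter(String includeList) {
        for (String entry : includeList.split(",")) {
            String trimmed = entry.trim();
            if (!trimmed.isEmpty()) {
                patterns.add(Pattern.compile(trimmed));
            }
        }
    }

    boolean includes(String database, String table) {
        String identifier = database + "." + table;
        for (Pattern p : patterns) {
            if (p.matcher(identifier).matches()) { // whole-identifier match, not a substring search
                return true;
            }
        }
        return false;
    }
}
```

Under this model, a value such as `beadhouse_survey\.pro_.*` would capture every table in that database whose name starts with `pro_`.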
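Connection initialization class:

```java
import io.debezium.config.Configuration;
import io.debezium.data.Envelope;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.RecordChangeEvent;
import io.debezium.engine.format.ChangeEventFormat;
import io.debezium.engine.format.Connect;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.SmartLifecycle;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static io.debezium.data.Envelope.FieldName.AFTER;
import static io.debezium.data.Envelope.FieldName.BEFORE;
import static io.debezium.data.Envelope.FieldName.OPERATION;
import static java.util.stream.Collectors.toMap;

@Component
public class DebeziumContent implements InitializingBean, SmartLifecycle {

    private static final Logger log = LoggerFactory.getLogger(DebeziumContent.class);

    // The engine is a Runnable that blocks while streaming, so it gets its own thread
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    private DebeziumEngine<RecordChangeEvent<SourceRecord>> debeziumEngine;

    // Reflects whether start() has actually been called (see isRunning())
    private volatile boolean running = false;

    @Autowired
    private Configuration debeziumConfigure;

    public void initContent() {
        debeziumEngine = DebeziumEngine.create(ChangeEventFormat.of(Connect.class))
                .using(debeziumConfigure.asProperties())
                .notifying(this::handlePayload)
                .build();
    }

    private void handlePayload(List<RecordChangeEvent<SourceRecord>> recordChangeEvents,
                               DebeziumEngine.RecordCommitter<RecordChangeEvent<SourceRecord>> recordCommitter)
            throws InterruptedException {
        for (RecordChangeEvent<SourceRecord> r : recordChangeEvents) {
            SourceRecord sourceRecord = r.record();
            Struct sourceRecordChangeValue = (Struct) sourceRecord.value();
            // Tombstones have a null value; records without an "op" field are not row changes
            if (sourceRecordChangeValue != null && sourceRecordChangeValue.schema().field(OPERATION) != null) {
                Envelope.Operation operation = Envelope.Operation.forCode(sourceRecordChangeValue.getString(OPERATION));
                // Skip READ events emitted by the initial snapshot
                if (operation != Envelope.Operation.READ) {
                    // For deletes, only the "before" image carries the row data
                    String record = operation == Envelope.Operation.DELETE ? BEFORE : AFTER;
                    Struct struct = (Struct) sourceRecordChangeValue.get(record);
                    Map<String, Object> payload = struct.schema().fields().stream()
                            .map(Field::name)
                            .filter(fieldName -> struct.get(fieldName) != null)
                            .collect(toMap(fieldName -> fieldName, fieldName -> struct.get(fieldName)));
                    // Schema name has the form "<server.name>.<database>.<table>.Value"
                    String[] split = struct.schema().name().split("\\.");
                    log.info("database: {}, table: {}, operation: {}, result: {}", split[1], split[2], operation, payload);
                }
            }
            // Without these calls the engine never commits offsets and replays events after a restart
            recordCommitter.markProcessed(r);
        }
        recordCommitter.markBatchFinished();
    }

    @Override
    public void afterPropertiesSet() {
        this.initContent();
        Assert.notNull(debeziumEngine, "debeziumEngine must not be null");
    }

    @Override
    public void start() {
        executor.execute(debeziumEngine);
        running = true;
    }

    @Override
    public void stop() {
        try {
            debeziumEngine.close();
        } catch (IOException e) {
            log.error("Failed to close the Debezium engine", e);
        } finally {
            executor.shutdown();
            running = false;
        }
    }

    // Spring only calls start() on a bean whose isRunning() is false, so returning
    // "debeziumEngine != null" (already true after afterPropertiesSet()) would skip start()
    @Override
    public boolean isRunning() {
        return running;
    }
}
```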
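The handler takes the database and table from `split[1]` and `split[2]` of the row schema name. For this configuration that name has the form `customer-mysql-db-server.beadhouse_survey.pro_account_client.Value`. The indexing assumes the logical server name contains no dots. A hedged alternative that counts from the end is sketched below, using a hypothetical helper named `SchemaNameParts`. Another option is to read the `db` and `table` fields from the event's `source` block, which the MySQL connector fills in.

```java
// Hypothetical helper: extracts database and table from a Debezium row schema name
// of the form "<server.name>.<database>.<table>.Value", counting from the end so that
// dots inside the logical server name do not shift the indices.
final class SchemaNameParts {
    final String database;
    final String table;

    private SchemaNameParts(String database, String table) {
        this.database = database;
        this.table = table;
    }

    static SchemaNameParts parse(String schemaName) {
        String[] parts = schemaName.split("\\.");
        // Need at least server, database, table and the trailing "Value" segment
        if (parts.length < 4) {
            throw new IllegalArgumentException("Unexpected schema name: " + schemaName);
        }
        return new SchemaNameParts(parts[parts.length - 3], parts[parts.length - 2]);
    }
}
```

Inside the handler, `SchemaNameParts.parse(struct.schema().name())` would then replace the two index lookups.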
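Testing
Start the project.
Modify data in the monitored database (the configuration above captures only the table `beadhouse_survey.pro_account_client`).
Watch the console: each operation on that table is printed successfully.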