canal实战(二):使用canal-kafka实现数据库增量实时更新

15 篇文章 28 订阅
7 篇文章 0 订阅

canal是阿里的一款开源工具,纯java开发,基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了mysql。

安装canal

下载安装包:
https://github.com/alibaba/canal/releases
canal.kafka-1.1.0.tar.gz

解压到固定目录:

tar -zxvf canal.kafka-1.1.0.tar.gz

修改配置

vi conf/example/instance.properties

#数据库地址
canal.instance.master.address=192.168.56.104:3306

#数据库用户密码
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal

#数据库字符集
canal.instance.connectionCharset=UTF-8

#默认数据库
canal.instance.defaultDatabaseName =zgai_db

vim /usr/local/canal/conf/canal.properties

#canalid
canal.id= 1

#canal地址
canal.ip=192.168.56.102

#zookeeper地址
canal.zkServers=192.168.56.102:2181

vim /usr/local/canal/conf/kafka.yml

#kafka地址
servers: 192.168.56.101:9092

# canal的批次大小,单位 k
canalBatchSize: 50

#topic
    topic: mytopic

配置详细介绍

canal.properties

canal配置主要分为两部分定义:

  1. instance列表定义 (列出当前server上有多少个instance,每个instance的加载方式是spring/manager等)
    |参数名字 |参数说明 |默认值|
    | ------------- |:-------------😐 -----😐
    |canal.destinations| 当前server上部署的instance列表| 无
    |canal.conf.dir| conf/目录所在的路径 |…/conf
    |canal.auto.scan| 开启instance自动扫描如果配置为true,canal.conf.dir目录下的instance配置变化会自动触发:a. instance目录新增: 触发instance配置载入,lazy为true时则自动启动b. instance目录删除:卸载对应instance配置,如已启动则进行关闭c. instance.properties文件变化:reload instance配置,如已启动自动进行重启操作 | true
    |canal.auto.scan.interval |instance自动扫描的间隔时间,单位秒| 5
    |canal.instance.global.mode| 全局配置加载方式 |spring
    |canal.instance.global.lazy| 全局lazy模式 |false
    |canal.instance.global.manager.address| 全局的manager配置方式的链接信息| 无
    |canal.instance.global.spring.xml| 全局的spring配置方式的组件文件 |classpath:spring/memory-instance.xml (spring目录相对于canal.conf.dir)
    |canal.instance.example.mode canal.instance.example.lazy canal.instance.example.spring.xml… | instance级别的配置定义,如有配置,会自动覆盖全局配置定义模式命名规则:canal.instance.{name}.xxx |无
    |canal.instance.tsdb.spring.xml| v1.0.25版本新增,全局的tsdb配置方式的组件文件 |classpath:spring/tsdb/h2-tsdb.xml (spring目录相对于canal.conf.dir)

  2. common参数定义,比如可以将instance.properties的公用参数,抽取放置到这里,这样每个instance启动的时候就可以共享. 【instance.properties配置定义优先级高于canal.properties】

参数名字参数说明默认值
canal.id每个canal server实例的唯一标识,暂无实际意义1
canal.ip canalserver绑定的本地IP信息,如果不配置,默认选择一个本机IP进行启动服务
canal.portcanal server提供socket服务的端口11111
canal.zkServerscanal server链接zookeeper集群的链接信息 例子:10.20.144.22:2181,10.20.144.51:2181
canal.zookeeper.flush.periodcanal持久化数据到zookeeper上的更新频率,单位毫秒1000
canal.instance.memory.batch.modecanal内存store中数据缓存模式 1. ITEMSIZE : 根据buffer.size进行限制,只限制记录的数量 2. MEMSIZE : 根据buffer.size * buffer.memunit的大小,限制缓存记录的大小MEMSIZE
canal.instance.memory.buffer.sizecanal内存store中可缓存buffer记录数,需要为2的指数16384
canal.instance.memory.buffer.memunit内存记录的单位大小,默认1KB,和buffer.size组合决定最终的内存使用大小1024
canal.instance.transactionn.size最大事务完整解析的长度支持超过该长度后,一个事务可能会被拆分成多次提交到canal store中,无法保证事务的完整可见性1024
canal.instance.fallbackIntervalInSecondscanal发生mysql切换时,在新的mysql库上查找binlog时需要往前查找的时间,单位秒 说明:mysql主备库可能存在解析延迟或者时钟不统一,需要回退一段时间,保证数据不丢60
canal.instance.detecting.enable是否开启心跳检查false
canal.instance.detecting.sql心跳检查sql insert into retl.xdual values(1,now()) on duplicate key update x=now() canal.instance.detecting.interval.time 心跳检查频率,单位秒3
canal.instance.detecting.retry.threshold心跳检查失败重试次数3
canal.instance.detecting.heartbeatHaEnable心跳检查失败后,是否开启自动mysql自动切换说明:比如心跳检查失败超过阀值后,如果该配置为true,canal就会自动链到mysql备库获取binlog数据false
canal.instance.network.receiveBufferSize网络链接参数,SocketOptions.SO_RCVBUF16384
canal.instance.network.sendBufferSize网络链接参数,SocketOptions.SO_SNDBUF16384
canal.instance.network.soTimeout网络链接参数,SocketOptions.SO_TIMEOUT30
canal.instance.filter.druid.ddl是否使用druid处理所有的ddl解析来获取库和表名true
canal.instance.filter.query.dcl是否忽略dcl语句false
canal.instance.filter.query.dml是否忽略dml语句(mysql5.6之后,在row模式下每条DML语句也会记录SQL到binlog中,可参考MySQL文档)false
canal.instance.filter.query.ddl是否忽略ddl语句false
canal.instance.filter.table.error是否忽略binlog表结构获取失败的异常(主要解决回溯binlog时,对应表已被删除或者表结构和binlog不一致的情况)false
canal.instance.filter.rows是否dml的数据变更事件(主要针对用户只订阅ddl/dcl的操作)false
canal.instance.filter.transaction.entry是否忽略事务头和尾,比如针对写入kakfa的消息时,不需要写入TransactionBegin/Transactionend事件false
canal.instance.binlog.format支持的binlog format格式列表(otter会有支持format格式限制)ROW,STATEMENT,MIXED
canal.instance.binlog.image支持的binlog image格式列表(otter会有支持format格式限制)FULL,MINIMAL,NOBLOB
canal.instance.get.ddl.isolationddl语句是否单独一个batch返回(比如下游dml/ddl如果做batch内无序并发处理,会导致结构不一致)false
canal.instance.parser.parallel是否开启binlog并行解析模式(串行解析资源占用少,但性能有瓶颈, 并行解析可以提升近2.5倍+)true
canal.instance.parser.parallelBufferSizebinlog并行解析的异步ringbuffer队列(必须为2的指数)256
canal.instance.tsdb.enable是否开启tablemeta的tsdb能力true
canal.instance.tsdb.dir主要针对h2-tsdb.xml时对应h2文件的存放目录,默认conf/xx/h2.mv.db
canal.instance.tsdb.urljdbc url的配置(h2的地址为默认值,如果是mysql需要自行定义)jdbc:h2: ${canal.instance.tsdb.dir} /h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsernamejdbc url的配置(h2的地址为默认值,如果是mysql需要自行定义)canal
canal.instance.tsdb.dbPasswordjdbc url的配置(h2的地址为默认值,如果是mysql需要自行定义)canal
canal.instance.rds.accesskeyaliyun账号的ak信息 (如果不需要在本地binlog超过18小时被清理后自动下载oss上的binlog,可以忽略该值)
canal.instance.rds.secretkeyaliyun账号的sk信息(如果不需要在本地binlog超过18小时被清理后自动下载oss上的binlog,可以忽略该值)

instance.properties

参数名字参数说明默认值
canal.instance.mysql.slaveIdmysql集群配置中的serverId概念,需要保证和当前mysql集群中id唯一 (v1.1.x版本之后canal会自动生成,不需要手工指定)
canal.instance.master.addressmysql主库链接地址127.0.0.1:3306
canal.instance.master.journal.namemysql主库链接时起始的binlog文件
canal.instance.master.positionmysql主库链接时起始的binlog偏移量
canal.instance.master.timestampmysql主库链接时起始的binlog的时间戳
canal.instance.gtidon是否启用mysql gtid的订阅模式false
canal.instance.master.gtidmysql主库链接时对应的gtid位点
canal.instance.dbUsernamemysql数据库帐号canal
canal.instance.dbPasswordmysql数据库密码canal
canal.instance.defaultDatabaseNamemysql链接时默认schema
canal.instance.connectionCharsetmysql 数据解析编码UTF-8
canal.instance.filter.regexmysql 数据解析关注的表,Perl正则表达式.多个正则之间以逗号(,)分隔,转义符需要双斜杠(\) 常见例子: 1. 所有表:.* or .\… 2. canal schema下所有表: canal\…* 3. canal下的以canal打头的表:canal\.canal.* 4. canal schema下的一张表:canal.test1 5. 多个规则组合使用:canal\…*,mysql.test1,mysql.test2 (逗号分隔).\…
canal.instance.filter.black.regexmysql 数据解析表的黑名单,表达式规则见白名单的规则
canal.instance.rds.instanceIdaliyun rds对应的实例id信息(如果不需要在本地binlog超过18小时被清理后自动下载oss上的binlog,可以忽略该值)
说明:
mysql链接时的起始位置
  • canal.instance.master.journal.name + canal.instance.master.position : 精确指定一个binlog位点,进行启动
  • canal.instance.master.timestamp : 指定一个时间戳,canal会自动遍历mysql binlog,找到对应时间戳的binlog位点后,进行启动
  • 不指定任何信息:默认从当前数据库的位点,进行启动。(show master status)
    #####mysql解析关注表定义
  • 标准的Perl正则,注意转义时需要双斜杠:\
    #####mysql链接的编码
  • 目前canal版本仅支持一个数据库只有一种编码,如果一个库存在多个编码,需要通过filter.regex配置,将其拆分为多个canal instance,为每个instance指定不同的编码

更多文章关注公众号
![在这里插入图片描述](https://img-blog.csdnimg.cn/20210325093921176.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3hpYW95dV9CRA==,size_16,color_FFFFFF,t_70

更多:canal源码分析与应用专栏
——————————————————————————————————
作者:桃花惜春风
转载请标明出处,原文地址:
https://blog.csdn.net/xiaoyu_BD/article/details/82113861
如果感觉本文对您有帮助,您的支持是我坚持写作最大的动力,谢谢!

  • 6
    点赞
  • 25
    收藏
    觉得还不错? 一键收藏
  • 0
    评论
好的,下面是一个基于 CanalKafka 和 PySpark Streaming 的实时案例,用于将 MySQL 数据库中的增量数据读取并发送到 Kafka,并通过 PySpark Streaming 进行实时计算。具体步骤如下: 1. 首先需要安装并启动 CanalKafka。 2. 创建一个 Python 脚本,使用 Canal 客户端连接到 Canal Server,并订阅指定的 MySQL 数据库和表。然后将解析出的增量数据发送到 Kafka。 ```python from canal.client import Client from kafka import KafkaProducer import json # 创建Canal客户端对象 client = Client() client.connect(host='127.0.0.1', port=11111) client.check_valid(username=b'', password=b'') # 创建Kafka生产者对象 producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092') # 订阅指定的数据库和表 client.subscribe(client_id='1001', destination='example', filter='.*\\..*') try: while True: message = client.get(batch_size=100, timeout=100) for entry in message.get('entries'): if entry.entryType == 'ROWDATA': row_change = entry.rowChange event_type = row_change.eventType for row_data in row_change.rowDatas: row_dict = {} for column in row_data.afterColumns: row_dict[column.name] = column.value # 将解析出的增量数据发送到Kafka producer.send(topic='example', value=json.dumps(row_dict).encode('utf-8')) except Exception as e: print(e) finally: client.disconnect() ``` 上述代码中,首先创建了一个 Canal 客户端对象,并连接到 Canal Server。然后创建了一个 Kafka 生产者对象,用于将解析出的增量数据发送到 Kafka。接着,订阅了指定的 MySQL 数据库和表,并循环获取增量数据。对于每个增量数据,将其转换为字典格式,并使用 `json.dumps` 方法将字典编码为 JSON 字符串,并使用 Kafka 生产者将其发送到指定的 Kafka 主题中。 3. 创建一个 PySpark Streaming 应用程序,从 Kafka 主题中读取增量数据,并进行实时计算。例如,以下代码用于计算每隔5秒钟的单词计数: ```python from pyspark import SparkConf, SparkContext from pyspark.streaming import StreamingContext from pyspark.streaming.kafka import KafkaUtils import json # 创建SparkConf对象 conf = SparkConf().setAppName("Real-time Word Count") # 创建SparkContext对象 sc = SparkContext(conf=conf) # 创建StreamingContext对象,每隔5秒钟处理一次数据 ssc = StreamingContext(sc, 5) # 从Kafka读取数据 kafka_params = { "bootstrap.servers": "127.0.0.1:9092", "group.id": "example" } kafka_stream = KafkaUtils.createDirectStream( ssc, ["example"], kafka_params ) # 对Kafka中的每条消息进行解析并扁平化处理 messages = kafka_stream.map(lambda x: json.loads(x[1])) words = messages.flatMap(lambda x: x.values()) # 对每个单词进行计数 word_counts = words.map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b) # 打印计数结果 word_counts.pprint() # 启动StreamingContext并等待它完成 ssc.start() ssc.awaitTermination() ``` 上述代码中,首先创建了一个 `SparkConf` 对象,用于设置应用程序名称。然后创建了一个 `SparkContext` 对象,用于连接到 Spark 集群。接着,创建了一个 `StreamingContext` 对象,并设置每隔5秒钟处理一次数据。然后使用 `createDirectStream` 方法从 Kafka 主题中读取数据,并使用 `json.loads` 方法将 JSON 字符串转换为字典格式。接着,对字典中的值进行扁平化处理,并使用 `map` 和 `reduceByKey` 方法对每个单词进行计数。最后,使用 `pprint` 方法打印计数结果。最后启动 `StreamingContext` 并等待它完成。 运行上述代码后,您应该可以在终端看到类似以下的结果: ``` ------------------------------------------- Time: 2022-10-20 16:00:05 ------------------------------------------- (PySpark, 1) (Streaming, 1) (Hello, 1) (PySparkStreaming, 1) (World, 1) ------------------------------------------- Time: 2022-10-20 16:00:10 ------------------------------------------- (PySpark, 2) (Streaming, 1) (Hello, 2) (PySparkStreaming, 1) (World, 1) ``` 注意,以上代码仅提供了一个简单的示例,实际的 Canal 和 PySpark Streaming 应用程序可能需要更多的配置和代码来实现特定的需求。

“相关推荐”对你有帮助么?

  • 非常没帮助
  • 没帮助
  • 一般
  • 有帮助
  • 非常有帮助
提交
评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值