Maxwell vs. Canal
Tool comparison
| Criterion | Canal | Maxwell | mysql_streamer |
| --- | --- | --- | --- |
| Implementation language | Java | Java | Python |
| Community activity | Active | Active | Inactive |
| High availability | Supported | Requires customization | Supported |
| Data sink | Requires customization | Kafka | Kafka |
| Partitioning | Supported | Not supported | Not supported |
| Bootstrap | Not supported | Supported | Supported |
| Message format | Free-form | Fixed JSON | Fixed JSON |
| Documentation | Fairly detailed | Fairly detailed | Sparse |
| Random reads | Supported | Supported | Supported |
Canal
Background
In the early days, Alibaba's B2B business ran dual data centers in Hangzhou and the United States, which created a need for cross-datacenter synchronization. The early synchronization jobs captured incremental changes with database triggers; starting around 2010, Alibaba's teams gradually switched to parsing the database log to obtain incremental changes, which gave rise to the incremental subscribe-and-consume business and opened a new chapter.
Note: the internal version already parses the logs of some MySQL and Oracle versions; the current open-source Canal supports MySQL 5.7 and below (tested internally against MySQL 5.7.13, 5.6.10, 5.5.18, and 5.1.40/48).
How it works
- Canal speaks the MySQL slave protocol: it masquerades as a MySQL slave and sends a dump command to the MySQL master.
- The master receives the dump request and starts streaming the binary log to the slave (that is, Canal); the transfer is initiated by the slave's dump request rather than pushed unsolicited by the master.
- Canal parses the binary log objects (raw byte streams). The sketch below shows the same handshake done with a standalone library.
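To make the protocol concrete, here is a minimal sketch of the same fake-slave handshake using the open-source mysql-binlog-connector-java library. This library is not part of Canal, and the host, port, and credentials are placeholders:

```java
import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.EventType;

public class BinlogDumpDemo {
    public static void main(String[] args) throws Exception {
        // Connect as a fake slave, exactly as Canal does: the client registers
        // itself with the master and sends a dump request, after which the
        // master streams binlog events back over the same connection.
        BinaryLogClient client = new BinaryLogClient("127.0.0.1", 3306, "canal", "canal");
        client.registerEventListener(event -> {
            EventType type = event.getHeader().getEventType();
            System.out.println("received binlog event: " + type);
        });
        client.connect(); // blocks and keeps receiving pushed events
    }
}
```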
High availability
- Built on ZooKeeper, using its watchers and EPHEMERAL nodes (whose lifecycle is bound to the session).
- Canal's HA has two parts: the canal server and the canal client each have their own HA implementation.
- canal server: to limit dump requests against MySQL, of the instances with the same name on different servers, only one may be running at any time; the others stay in standby (standby is a state of the instance).
- canal client: to preserve ordering, one instance may be driven by only one canal client at a time for get/ack/rollback operations; otherwise ordered delivery to the client cannot be guaranteed.
- Rough steps (a ZooKeeper sketch follows this list):
  - When a canal server wants to start a canal instance, it first attempts a startup check against ZooKeeper (implementation: create an EPHEMERAL node; whichever server creates it successfully may start).
  - After the ZooKeeper node is created successfully, that canal server starts the instance; the servers that failed to create the node keep their instance in standby.
  - Once ZooKeeper notices that the instance node created by canal server A has disappeared, it immediately notifies the other canal servers to repeat step 1 and elect a new canal server to run the instance.
  - Each time a canal client connects, it first asks ZooKeeper which server is currently running the canal instance and connects to it; if the connection becomes unavailable, it retries the connect.
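A minimal sketch of that server-side election with the plain ZooKeeper Java client. The node path mirrors Canal's documented ZooKeeper layout, but treat the details (path, timeout, data) as illustrative rather than as Canal's actual implementation:

```java
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class RunningElectionDemo {
    // Path assumed for illustration; parent nodes must already exist.
    private static final String NODE = "/otter/canal/destinations/example/running";

    public static void main(String[] args) throws Exception {
        ZooKeeper zk = new ZooKeeper("127.0.0.1:2181", 30000, event -> {});
        try {
            // Whoever creates the EPHEMERAL node wins and may start the instance.
            // The node disappears automatically when the winner's session dies.
            zk.create(NODE, "serverA".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
                    CreateMode.EPHEMERAL);
            System.out.println("won the election, starting the instance");
        } catch (KeeperException.NodeExistsException e) {
            // Lost the election: stay in standby and watch the node, so that
            // when it vanishes we can attempt the create again (step 1).
            zk.exists(NODE, watched -> System.out.println("running node changed, re-elect"));
            System.out.println("standby, watching " + NODE);
        }
        Thread.sleep(Long.MAX_VALUE); // keep the session (and the ephemeral node) alive
    }
}
```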
Deployment (partly adapted from the AdminGuide)
MySQL configuration
- MySQL requirements: the current open-source Canal supports MySQL 5.7 and below (tested internally against MySQL 5.7.13, 5.6.10, 5.5.18, and 5.1.40/48). MySQL 4.x has not been rigorously tested but should be compatible in theory.
- Canal works off the MySQL binlog, so binlog writing must be enabled and the binlog format must be set to ROW:
```ini
[mysqld]
log-bin=mysql-bin    # enable the binary log
binlog-format=ROW    # use ROW format
server_id=1          # required for MySQL replication; must not clash with Canal's slaveId
```
Restart MySQL and verify the settings:
```
mysql> show variables like 'binlog_format';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| binlog_format | ROW   |
+---------------+-------+
mysql> show variables like 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin       | ON    |
+---------------+-------+
```
Create the canal user and grant privileges:
```sql
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
```
Verify the grants:
```
show grants for 'canal';
+----------------------------------------------------------------------------------------------------------------------------------------------+
| Grants for canal@%                                                                                                                            |
+----------------------------------------------------------------------------------------------------------------------------------------------+
| GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%' IDENTIFIED BY PASSWORD '*E3619321C1A937C46A0D8BD1DAC39F93B27D4458'  |
+----------------------------------------------------------------------------------------------------------------------------------------------+
```
Canal configuration
Download canal.deployer-1.1.2.tar.gz from https://github.com/alibaba/canal/releases and unpack it:

```bash
mkdir -p /opt/canal.deployer && tar zxvf canal.deployer-1.1.2.tar.gz -C /opt/canal.deployer
cd /opt/canal.deployer/conf
```

```
[root@localhost conf]# tree
.
├── canal.properties              ## server-wide configuration
├── example                       ## instance configuration
│   └── instance.properties
├── logback.xml                   ## logging configuration
├── metrics
│   └── Canal_instances_tmpl.json
└── spring                        ## spring instance definitions
    ├── base-instance.xml
    ├── default-instance.xml
    ├── file-instance.xml
    ├── group-instance.xml
    ├── memory-instance.xml
    └── tsdb
        ├── h2-tsdb.xml
        ├── mysql-tsdb.xml
        ├── sql
        │   └── create_table.sql
        └── sql-map
            ├── sqlmap-config.xml
            ├── sqlmap_history.xml
            └── sqlmap_snapshot.xml
```
Because conf/canal.properties enables canal.auto.scan by default, the server scans the conf directory at first startup, takes each directory name as an instance name, and starts the corresponding instance.
Edit conf/example/instance.properties:

```properties
# MySQL address
canal.instance.master.address=10.72.9.200:3308
# credentials
canal.instance.dbUsername=****
canal.instance.dbPassword=****
```
Startup

```bash
bin/startup.sh
```

Check the log at logs/canal/canal.log; it should look like this:
```
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option PermSize=96m; support was removed in 8.0
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=256m; support was removed in 8.0
Java HotSpot(TM) 64-Bit Server VM warning: UseCMSCompactAtFullCollection is deprecated and will likely be removed in a future release.
2019-03-13 16:38:54.693 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## set default uncaught exception handler
2019-03-13 16:38:54.774 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## load canal configurations
2019-03-13 16:38:54.788 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server.
2019-03-13 16:38:54.906 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[10.72.29.137:11111]
2019-03-13 16:38:56.245 [main] WARN  o.s.beans.GenericTypeAwarePropertyDescriptor - Invalid JavaBean property 'connectionCharset' being accessed! Ambiguous write methods found next to actually used [public void com.alibaba.otter.canal.parse.inbound.mysql.AbstractMysqlEventParser.setConnectionCharset(java.lang.String)]: [public void com.alibaba.otter.canal.parse.inbound.mysql.AbstractMysqlEventParser.setConnectionCharset(java.nio.charset.Charset)]
2019-03-13 16:38:56.552 [main] ERROR com.alibaba.druid.pool.DruidDataSource - testWhileIdle is true, validationQuery not set
2019-03-13 16:38:57.029 [destination = example , address = /127.0.0.1:3308 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> begin to find start position, it will be long time for reset or first position
2019-03-13 16:38:57.125 [destination = example , address = /127.0.0.1:3308 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare to find start position just last position
{"identity":{"slaveId":-1,"sourceAddress":{"address":"localhost","port":3308}},"postion":{"gtid":"","included":false,"journalName":"mysql-bin.000001","position":677,"serverId":1,"timestamp":1552465288000}}
2019-03-13 16:38:57.166 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......
2019-03-13 16:38:57.580 [destination = example , address = /127.0.0.1:3308 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=mysql-bin.000001,position=677,serverId=1,gtid=,timestamp=1552465288000] cost : 523ms , the next step is binlog dump
```
And logs/example/example.log:
```
2019-03-13 16:38:55.975 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2019-03-13 16:38:55.984 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
2019-03-13 16:38:56.245 [main] WARN  o.s.beans.GenericTypeAwarePropertyDescriptor - Invalid JavaBean property 'connectionCharset' being accessed! Ambiguous write methods found next to actually used [public void com.alibaba.otter.canal.parse.inbound.mysql.AbstractMysqlEventParser.setConnectionCharset(java.lang.String)]: [public void com.alibaba.otter.canal.parse.inbound.mysql.AbstractMysqlEventParser.setConnectionCharset(java.nio.charset.Charset)]
2019-03-13 16:38:56.307 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2019-03-13 16:38:56.308 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
2019-03-13 16:38:56.552 [main] ERROR com.alibaba.druid.pool.DruidDataSource - testWhileIdle is true, validationQuery not set
2019-03-13 16:38:56.911 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example
2019-03-13 16:38:57.029 [destination = example , address = /127.0.0.1:3308 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> begin to find start position, it will be long time for reset or first position
2019-03-13 16:38:57.091 [main] INFO  c.a.otter.canal.instance.core.AbstractCanalInstance - subscribe filter change to .*\..*
2019-03-13 16:38:57.091 [main] INFO  c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....
2019-03-13 16:38:57.125 [destination = example , address = /127.0.0.1:3308 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare to find start position just last position
{"identity":{"slaveId":-1,"sourceAddress":{"address":"localhost","port":3308}},"postion":{"gtid":"","included":false,"journalName":"mysql-bin.000001","position":677,"serverId":1,"timestamp":1552465288000}}
2019-03-13 16:38:57.580 [destination = example , address = /127.0.0.1:3308 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=mysql-bin.000001,position=677,serverId=1,gtid=,timestamp=1552465288000] cost : 523ms , the next step is binlog dump
2019-03-13 16:38:58.866 [New I/O server worker #1-1] INFO  c.a.otter.canal.instance.core.AbstractCanalInstance - subscribe filter change to .*\..*
```

Java client (wiki)
- Other reference code: ClientAPI, SimpleCanalClientTest.java
- The code below follows the wiki example.
Add the Maven dependency:
```xml
<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.2</version>
</dependency>
```
CanalTest.java:
```java
package charles.org.canal;

import java.net.InetSocketAddress;
import java.util.List;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
import com.alibaba.otter.canal.protocol.Message;

/**
 * @Author: Charles
 * @Date: 2019-03-12 19:33
 * @Description: minimal Canal client, following the wiki example
 */
public class CanalTest {

    public static void main(String[] args) {
        // open a connection to the canal server
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("localhost", 11111), "example", "", "");
        int batchSize = 1000;
        connector.connect();
        connector.subscribe(".*\\..*");
        connector.rollback();
        while (true) {
            Message message = connector.getWithoutAck(batchSize); // fetch up to batchSize entries
            long batchId = message.getId();
            int size = message.getEntries().size();
            if (batchId == -1 || size == 0) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                }
            } else {
                printEntry(message.getEntries());
            }
            connector.ack(batchId); // acknowledge the batch
            // connector.rollback(batchId); // on failure, roll the batch back instead
        }
        // connector.disconnect();
    }

    private static void printEntry(List<Entry> entries) {
        for (Entry entry : entries) {
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN
                    || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                continue;
            }
            RowChange rowChange = null;
            try {
                rowChange = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
            }
            EventType eventType = rowChange.getEventType();
            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));
            for (RowData rowData : rowChange.getRowDatasList()) {
                if (eventType == EventType.DELETE) {
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == EventType.INSERT) {
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    System.out.println("-------> before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------> after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }

    private static void printColumn(List<Column> columns) {
        for (Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
        }
    }
}
```
Change a row: `update vb_credential set last_auth_time = now() where id = 1`

The Java console then prints:
```
================> binlog[mysql-bin.000001:1470] , name[data-center,vb_credential] , eventType : UPDATE
-------> before
id : 1    update=false
app_account : vbim.sjxt    update=false
app_key : 5E7502F0DCCF1A451C27C64773441449DF628669F9436FF2389F0E577F7F9780F6A25D28C7E2D59F8B17CE8D39E5EDA5    update=false
app_secret : NzAwNlQ2aGMzMjhVMllhQzNsMThQMjY1UTRZbjNZVUM=    update=false
type : 1    update=false
expire_time :     update=false
last_auth_time :     update=false
is_del : 0    update=false
creater_time : 2018-09-10 10:15:43    update=false
creater_id :     update=false
update_time :     update=false
update_id :     update=false
-------> after
id : 1    update=false
app_account : vbim.sjxt    update=false
app_key : 5E7502F0DCCF1A451C27C64773441449DF628669F9436FF2389F0E577F7F9780F6A25D28C7E2D59F8B17CE8D39E5EDA5    update=false
app_secret : NzAwNlQ2aGMzMjhVMllhQzNsMThQMjY1UTRZbjNZVUM=    update=false
type : 1    update=false
expire_time :     update=false
last_auth_time : 2019-03-13 08:40:41    update=true
is_del : 0    update=false
creater_time : 2018-09-10 10:15:43    update=false
creater_id :     update=false
update_time :     update=false
update_id :     update=false
```
Maxwell
Basic usage (with Kafka); see the QuickStart for other modes
- Enable the MySQL binlog (same as for Canal above).
Create the MySQL user:
```sql
CREATE USER 'maxwell'@'%' IDENTIFIED BY 'password';
-- GRANT ALL ON maxwell.* TO 'maxwell'@'%';
GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'%';
```

Download and extract:
```bash
curl -sLo - https://github.com/zendesk/maxwell/releases/download/v1.20.0/maxwell-1.20.0.tar.gz \
  | tar zxvf -
cd maxwell-1.20.0
```

Startup
- --output_ddl=true (default false): whether to emit DDL statements. By default, DDL operations (creating/altering/dropping tables and the like) produce no messages; a dedicated topic for DDL can be set with ddl_kafka_topic.
- If Maxwell is force-killed (kill -9), data is modified while it is down, and it is then restarted, the updates from the downtime are still detected and sent. In a test that inserted 10,000 rows in a loop, force-stopping Maxwell midway and restarting it produced duplicate messages but lost none, so consumers must deduplicate.
- On startup Maxwell writes its synchronization state into the database named by schema_database (default maxwell) so that heartbeats and positions can be recovered on the next start; see the com.zendesk.maxwell.schema.MysqlPositionStore class.
- The custom_producer.factory property names the producer class (the hook for plugging in a custom producer); see CustomProducerFactory.java.
- Transactions: Kafka messages arrive after COMMIT (several row changes produce several messages, preserving modification order). A ROLLBACK produces no messages.
- Deduplication: set output_binlog_position to true and use the position field in the JSON to detect duplicates; even multiple row updates within a single transaction can be told apart by position (a consumer-side sketch is given at the end of this section).
```bash
bin/maxwell --user='maxwell' --password='password' --port=3308 --host='10.72.9.200' \
  --replica_server_id=1 --schema_database=test_maxwell --producer=kafka \
  --kafka.bootstrap.servers=10.72.9.200:9092,10.72.9.201:9092,10.72.9.202:9092 \
  --kafka_topic=maxwell_master --output_ddl=true
```
Kafka consumer
```java
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import static org.apache.kafka.clients.consumer.ConsumerConfig.*;

public class LogKafkaConsumer {

    private static KafkaConsumer<String, String> consumer;

    static {
        consumer = createConsumer();
    }

    public static void main(String[] args) {
        // consumer.subscribe(Arrays.asList("spring-kafka-logs"));
        consumer.subscribe(Arrays.asList("maxwell_master"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("maxwell message timestamp:" + record.timestamp()
                        + " offset:" + record.offset() + " " + record.value());
            }
        }
    }

    private static KafkaConsumer<String, String> createConsumer() {
        Properties properties = new Properties();
        properties.put(BOOTSTRAP_SERVERS_CONFIG, "10.72.9.200:9092,10.72.9.201:9092,10.72.9.202:9092");
        properties.put(GROUP_ID_CONFIG, "maxwell_group");
        properties.put(ENABLE_AUTO_COMMIT_CONFIG, "true");
        properties.put(AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        properties.put(VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        return new KafkaConsumer<>(properties);
    }
}
```

Test
SQL:

```sql
insert into `test`.`maxwell` set id = 1, daemon = 'Stanislaw Lem';
```

Kafka message:

```
maxwell: {
  "database": "test",
  "table": "maxwell",
  "type": "insert",
  "ts": 1449786310,
  "xid": 940752,
  "commit": true,
  "data": { "id": 1, "daemon": "Stanislaw Lem" }
}
```
SQL:

```sql
update test.maxwell set daemon = 'firebus! firebus!' where id = 1;
```

Kafka message:

```
maxwell: {
  "database": "test",
  "table": "maxwell",
  "type": "update",
  "ts": 1449786341,
  "xid": 940786,
  "commit": true,
  "data": {"id":1, "daemon": "Firebus! Firebus!"},
  "old": {"daemon": "Stanislaw Lem"}
}
```
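As noted above, Maxwell can redeliver messages after a forced restart, so consumers should deduplicate. Here is a minimal consumer-side sketch, assuming output_binlog_position=true so each JSON payload carries a position field identifying the binlog file and offset (the regex parsing, the field value shown, and the in-memory set are simplifications; a production consumer would use a JSON library and persist the processed positions):

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class MaxwellDeduper {

    // Extracts the position field added when output_binlog_position=true,
    // assumed here to look like "position":"mysql-bin.000001:677".
    private static final Pattern POSITION = Pattern.compile("\"position\"\\s*:\\s*\"([^\"]+)\"");

    // Positions already processed. In-memory only, so it is lost on restart;
    // persisting it (for example next to the Kafka offsets) is omitted here.
    private final Set<String> seen = ConcurrentHashMap.newKeySet();

    /** Returns true if the message is new and should be processed. */
    public boolean firstTime(String json) {
        Matcher m = POSITION.matcher(json);
        if (!m.find()) {
            return true; // no position field: cannot deduplicate, just process
        }
        return seen.add(m.group(1)); // Set.add returns false for duplicates
    }

    public static void main(String[] args) {
        MaxwellDeduper deduper = new MaxwellDeduper();
        String msg = "{\"database\":\"test\",\"table\":\"maxwell\",\"type\":\"insert\","
                + "\"position\":\"mysql-bin.000001:677\",\"data\":{\"id\":1}}";
        System.out.println(deduper.firstTime(msg)); // true: first delivery
        System.out.println(deduper.firstTime(msg)); // false: duplicate dropped
    }
}
```

In the LogKafkaConsumer above, calling firstTime(record.value()) before handling each record would drop the duplicates that appear after a forced restart.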