Tool Comparison

| Aspect             | Canal           | Maxwell         | mysql_streamer |
|--------------------|-----------------|-----------------|----------------|
| Implementation     | Java            | Java            | Python         |
| Community activity | Active          | Active          | Inactive       |
| High availability  | Supported       | Custom-built    | Supported      |
| Data sink          | Custom-built    | Kafka           | Kafka          |
| Partitioning       | Supported       | Not supported   | Not supported  |
| Bootstrap          | Not supported   | Supported       | Supported      |
| Data format        | Free-form       | Fixed JSON      | Fixed JSON     |
| Documentation      | Fairly detailed | Fairly detailed | Sparse         |
| Random reads       | Supported       | Supported       | Supported      |

Canal

Background

Early on, Alibaba's B2B business ran dual data centers in Hangzhou and the United States, which created a need for cross-datacenter synchronization. The early database synchronization work captured incremental changes mainly via triggers; starting in 2010, Alibaba's companies gradually moved to parsing the database's own logs to obtain incremental changes, which gave rise to the incremental subscribe-and-consume business and opened a new chapter.

P.S. The internal version already parses logs for MySQL and for some Oracle versions. The current open-source canal release supports MySQL 5.7 and below (inside Alibaba: MySQL 5.7.13, 5.6.10, 5.5.18, and 5.1.40/48).

How It Works

(Diagram: canal masquerading as a MySQL slave and receiving the binlog from the master)

  • canal implements the MySQL slave wire protocol: it masquerades as a MySQL slave and sends the dump command to the MySQL master
  • on receiving the dump request, the MySQL master starts streaming the binary log to the slave (i.e., canal); note that the slave pulls, the master does not proactively push to slaves
  • canal parses the binary log objects (raw byte streams); the sketch below shows the same masquerade in isolation
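
Canal keeps this protocol handling internal, but the same replica masquerade can be reproduced with the standalone open-source library mysql-binlog-connector-java. The sketch below is illustrative only; the host, credentials, and server id are placeholders, and this library is not part of canal:

    import com.github.shyiko.mysql.binlog.BinaryLogClient;

    public class ReplicaMasqueradeDemo {
        public static void main(String[] args) throws Exception {
            // Connect with replication credentials, exactly as a real slave would.
            BinaryLogClient client = new BinaryLogClient("127.0.0.1", 3306, "canal", "canal");
            client.setServerId(99); // must not collide with any real replica's server_id
            // Every binlog event the master streams down arrives here as a parsed object.
            client.registerEventListener(event -> System.out.println(event));
            client.connect(); // blocks: issues the dump command and consumes the stream
        }
    }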

High Availability

  • Built on ZooKeeper, using its watcher mechanism and EPHEMERAL nodes (whose lifetime is bound to the client session)
  • canal's HA has two halves, with separate implementations for the canal server and the canal client:
    1. canal server: to limit dump requests against MySQL, instances with the same name on different servers may have only one in the running state at a time; the rest stay in standby (standby is a state of the instance).
    2. canal client: to preserve ordering, only one canal client at a time may drive a given instance's get/ack/rollback operations; otherwise ordered delivery cannot be guaranteed.
      (Diagram: canal server/client failover coordinated through ZooKeeper)
  • Rough sequence (a sketch of the ephemeral-node election follows this list):
    1. Before starting a canal instance, each canal server first makes a startup claim through ZooKeeper (implemented by creating an EPHEMERAL node; whichever server creates it wins the right to start).
    2. The canal server whose ZooKeeper node creation succeeded starts the instance; on the servers whose creation failed, the instance stays in standby.
    3. Once ZooKeeper observes that the instance node created by canal server A has disappeared, it immediately notifies the other canal servers to repeat step 1 and elect a new canal server to run the instance.
    4. Whenever a canal client connects, it first asks ZooKeeper which server currently runs the canal instance and then links to it; if the link becomes unusable, it retries the connect.
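
A minimal sketch of that election with the plain ZooKeeper client follows. The znode path and payload are invented for illustration, the parent znode is assumed to exist already, and production code would also handle session expiry and retries:

    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.ZooDefs;
    import org.apache.zookeeper.ZooKeeper;

    public class EphemeralElectionDemo {
        public static void main(String[] args) throws Exception {
            ZooKeeper zk = new ZooKeeper("127.0.0.1:2181", 30000, watchedEvent -> {});
            try {
                // Whoever creates the EPHEMERAL node first wins and starts the instance;
                // the node dies with its session, which is what triggers re-election.
                zk.create("/canal-demo/running", "server-A".getBytes(),
                          ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
                System.out.println("won the claim: start the canal instance here");
            } catch (KeeperException.NodeExistsException e) {
                // Lost the race: stay in standby and watch for the node to vanish.
                zk.exists("/canal-demo/running", true);
                System.out.println("standby: retry the create when the watch fires");
            }
        }
    }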

Deployment (partly excerpted from the AdminGuide)

  • MySQL configuration

    1. MySQL requirements: the current open-source canal release supports MySQL 5.7 and below (inside Alibaba: MySQL 5.7.13, 5.6.10, 5.5.18, and 5.1.40/48). P.S. MySQL 4.x has not been rigorously tested; in theory it should be compatible.
    2. canal works off the MySQL binlog, so binlog writing must be enabled and the binlog format set to ROW.
      [mysqld]
      log-bin=mysql-bin #adding this line enables the binlog
      binlog-format=ROW #use ROW mode
      server_id=1 #required for MySQL replication; must not collide with canal's slaveId

      After restarting MySQL, verify the settings:
      mysql> show variables like 'binlog_format';
      +---------------+-------+
      | Variable_name | Value |
      +---------------+-------+
      | binlog_format | ROW   |
      +---------------+-------+

      mysql> show variables like 'log_bin';
      +---------------+-------+
      | Variable_name | Value |
      +---------------+-------+
      | log_bin       | ON    |
      +---------------+-------+

      Create a user and grant privileges:
      CREATE USER canal IDENTIFIED BY 'canal';
      GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
      -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
      FLUSH PRIVILEGES;

      Verify the grants:
      show grants for 'canal';
      +----------------------------------------------------------------------------------------------------------------------------------------------+
      | Grants for canal@% |
      +----------------------------------------------------------------------------------------------------------------------------------------------+
      | GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%' IDENTIFIED BY PASSWORD '*E3619321C1A937C46A0D8BD1DAC39F93B27D4458' |
      +----------------------------------------------------------------------------------------------------------------------------------------------+
  • Canal configuration

    Download canal.deployer-1.1.2.tar.gz from https://github.com/alibaba/canal/releases
    mkdir -p /opt/canal.deployer && tar zxvf canal.deployer-1.1.2.tar.gz -C /opt/canal.deployer
    cd /opt/canal.deployer/conf
    [root@localhost conf]# tree
    .
    ├── canal.properties ## server-wide config
    ├── example ## instance config
    │   └── instance.properties
    ├── logback.xml ## logging config
    ├── metrics
    │   └── Canal_instances_tmpl.json
    └── spring ## spring instance templates
        ├── base-instance.xml
        ├── default-instance.xml
        ├── file-instance.xml
        ├── group-instance.xml
        ├── memory-instance.xml
        └── tsdb
            ├── h2-tsdb.xml
            ├── mysql-tsdb.xml
            ├── sql
            │   └── create_table.sql
            └── sql-map
                ├── sqlmap-config.xml
                ├── sqlmap_history.xml
                └── sqlmap_snapshot.xml

    Because conf/canal.properties enables canal.auto.scan by default, on first startup the server scans the conf directory, treats each name found there as an instance name, and starts the corresponding instance (see the canal.properties excerpt after the logs below).

    Edit conf/example/instance.properties:
    # MySQL address
    canal.instance.master.address=10.72.9.200:3308
    # credentials
    canal.instance.dbUsername=****
    canal.instance.dbPassword=****

    Start the server:
    bin/startup.sh

    Check the log at logs/canal/canal.log; it shows:
    Java HotSpot(TM) 64-Bit Server VM warning: ignoring option PermSize=96m; support was removed in 8.0
    Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=256m; support was removed in 8.0
    Java HotSpot(TM) 64-Bit Server VM warning: UseCMSCompactAtFullCollection is deprecated and will likely be removed in a future release.
    2019-03-13 16:38:54.693 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## set default uncaught exception handler
    2019-03-13 16:38:54.774 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## load canal configurations
    2019-03-13 16:38:54.788 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server.
    2019-03-13 16:38:54.906 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[10.72.29.137:11111]
    2019-03-13 16:38:56.245 [main] WARN o.s.beans.GenericTypeAwarePropertyDescriptor - Invalid JavaBean property 'connectionCharset' being accessed! Ambiguous write methods found next to actually used [public void com.alibaba.otter.canal.parse.inbound.mysql.AbstractMysqlEventParser.setConnectionCharset(java.lang.String)]: [public void com.alibaba.otter.canal.parse.inbound.mysql.AbstractMysqlEventParser.setConnectionCharset(java.nio.charset.Charset)]
    2019-03-13 16:38:56.552 [main] ERROR com.alibaba.druid.pool.DruidDataSource - testWhileIdle is true, validationQuery not set
    2019-03-13 16:38:57.029 [destination = example , address = /127.0.0.1:3308 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> begin to find start position, it will be long time for reset or first position
    2019-03-13 16:38:57.125 [destination = example , address = /127.0.0.1:3308 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare to find start position just last position
    {"identity":{"slaveId":-1,"sourceAddress":{"address":"localhost","port":3308}},"postion":{"gtid":"","included":false,"journalName":"mysql-bin.000001","position":677,"serverId":1,"timestamp":1552465288000}}
    2019-03-13 16:38:57.166 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......
    2019-03-13 16:38:57.580 [destination = example , address = /127.0.0.1:3308 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=mysql-bin.000001,position=677,serverId=1,gtid=,timestamp=1552465288000] cost : 523ms , the next step is binlog dump

    logs/example/example.log shows:
    2019-03-13 16:38:55.975 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
    2019-03-13 16:38:55.984 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
    2019-03-13 16:38:56.245 [main] WARN o.s.beans.GenericTypeAwarePropertyDescriptor - Invalid JavaBean property 'connectionCharset' being accessed! Ambiguous write methods found next to actually used [public void com.alibaba.otter.canal.parse.inbound.mysql.AbstractMysqlEventParser.setConnectionCharset(java.lang.String)]: [public void com.alibaba.otter.canal.parse.inbound.mysql.AbstractMysqlEventParser.setConnectionCharset(java.nio.charset.Charset)]
    2019-03-13 16:38:56.307 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
    2019-03-13 16:38:56.308 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
    2019-03-13 16:38:56.552 [main] ERROR com.alibaba.druid.pool.DruidDataSource - testWhileIdle is true, validationQuery not set
    2019-03-13 16:38:56.911 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example
    2019-03-13 16:38:57.029 [destination = example , address = /127.0.0.1:3308 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> begin to find start position, it will be long time for reset or first position
    2019-03-13 16:38:57.091 [main] INFO c.a.otter.canal.instance.core.AbstractCanalInstance - subscribe filter change to .*\..*
    2019-03-13 16:38:57.091 [main] INFO c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....
    2019-03-13 16:38:57.125 [destination = example , address = /127.0.0.1:3308 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare to find start position just last position
    {"identity":{"slaveId":-1,"sourceAddress":{"address":"localhost","port":3308}},"postion":{"gtid":"","included":false,"journalName":"mysql-bin.000001","position":677,"serverId":1,"timestamp":1552465288000}}
    2019-03-13 16:38:57.580 [destination = example , address = /127.0.0.1:3308 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=mysql-bin.000001,position=677,serverId=1,gtid=,timestamp=1552465288000] cost : 523ms , the next step is binlog dump
    2019-03-13 16:38:58.866 [New I/O server worker #1-1] INFO c.a.otter.canal.instance.core.AbstractCanalInstance - subscribe filter change to .*\..*
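
    The auto-scan behavior referenced above lives in conf/canal.properties. The key names below are from the canal 1.1.x distribution as best recalled, and the values are illustrative, so verify them against your own release:

      # conf/canal.properties (excerpt)
      canal.destinations = example      # instances to start explicitly
      canal.auto.scan = true            # also scan conf/ for instance directories
      canal.auto.scan.interval = 5      # rescan period, in seconds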
  • Java client (wiki)

    1. Other reference code: ClientAPI, SimpleCanalClientTest.java
    2. Using the sample code from the wiki:
      Add the Maven dependency:
      <dependency>
          <groupId>com.alibaba.otter</groupId>
          <artifactId>canal.client</artifactId>
          <version>1.1.2</version>
      </dependency>

      CanalTest.java
      package charles.org.canal;

      import java.net.InetSocketAddress;
      import java.util.List;

      import com.alibaba.otter.canal.client.CanalConnectors;
      import com.alibaba.otter.canal.client.CanalConnector;
      import com.alibaba.otter.canal.protocol.Message;
      import com.alibaba.otter.canal.protocol.CanalEntry.Column;
      import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
      import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
      import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
      import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
      import com.alibaba.otter.canal.protocol.CanalEntry.RowData;

      /**
       * @Author: Charles
       * @Date: 2019-03-12 19:33
       * @Description: minimal canal client: connect, subscribe, then poll/ack in a loop
       */
      public class CanalTest {
          public static void main(String[] args) {
              // open a connection to the "example" instance on the canal server
              CanalConnector connector = CanalConnectors.newSingleConnector(
                      new InetSocketAddress("localhost", 11111), "example", "", "");
              int batchSize = 1000;
              connector.connect();
              connector.subscribe(".*\\..*");
              connector.rollback();
              while (true) {
                  Message message = connector.getWithoutAck(batchSize); // fetch up to batchSize entries
                  long batchId = message.getId();
                  int size = message.getEntries().size();
                  if (batchId == -1 || size == 0) {
                      try {
                          Thread.sleep(1000);
                      } catch (InterruptedException e) {
                      }
                  } else {
                      printEntry(message.getEntries());
                  }

                  connector.ack(batchId); // confirm the batch
                  // connector.rollback(batchId); // on processing failure, roll the batch back
              }
              // connector.disconnect();
          }

          private static void printEntry(List<Entry> entrys) {
              for (Entry entry : entrys) {
                  if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                      continue;
                  }

                  RowChange rowChage = null;
                  try {
                      rowChage = RowChange.parseFrom(entry.getStoreValue());
                  } catch (Exception e) {
                      throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
                  }

                  EventType eventType = rowChage.getEventType();
                  System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                          entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                          entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                          eventType));

                  for (RowData rowData : rowChage.getRowDatasList()) {
                      if (eventType == EventType.DELETE) {
                          printColumn(rowData.getBeforeColumnsList());
                      } else if (eventType == EventType.INSERT) {
                          printColumn(rowData.getAfterColumnsList());
                      } else {
                          System.out.println("-------> before");
                          printColumn(rowData.getBeforeColumnsList());
                          System.out.println("-------> after");
                          printColumn(rowData.getAfterColumnsList());
                      }
                  }
              }
          }

          private static void printColumn(List<Column> columns) {
              for (Column column : columns) {
                  System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
              }
          }
      }

      Change some data: update vb_credential set last_auth_time = now() where id = 1
      The Java console then prints the following log:
      ================> binlog[mysql-bin.000001:1470] , name[data-center,vb_credential] , eventType : UPDATE
      -------> before
      id : 1 update=false
      app_account : vbim.sjxt update=false
      app_key : 5E7502F0DCCF1A451C27C64773441449DF628669F9436FF2389F0E577F7F9780F6A25D28C7E2D59F8B17CE8D39E5EDA5 update=false
      app_secret : NzAwNlQ2aGMzMjhVMllhQzNsMThQMjY1UTRZbjNZVUM= update=false
      type : 1 update=false
      expire_time : update=false
      last_auth_time : update=false
      is_del : 0 update=false
      creater_time : 2018-09-10 10:15:43 update=false
      creater_id : update=false
      update_time : update=false
      update_id : update=false
      -------> after
      id : 1 update=false
      app_account : vbim.sjxt update=false
      app_key : 5E7502F0DCCF1A451C27C64773441449DF628669F9436FF2389F0E577F7F9780F6A25D28C7E2D59F8B17CE8D39E5EDA5 update=false
      app_secret : NzAwNlQ2aGMzMjhVMllhQzNsMThQMjY1UTRZbjNZVUM= update=false
      type : 1 update=false
      expire_time : update=false
      last_auth_time : 2019-03-13 08:40:41 update=true
      is_del : 0 update=false
      creater_time : 2018-09-10 10:15:43 update=false
      creater_id : update=false
      update_time : update=false
      update_id : update=false
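
As the comparison table notes, with canal the data sink is custom-built, so a natural next step is pushing each change into Kafka yourself. The sketch below is one hedged way to do it; the topic name canal_changes is invented, and turning an Entry into JSON is left to the caller:

      import java.util.Properties;

      import org.apache.kafka.clients.producer.KafkaProducer;
      import org.apache.kafka.clients.producer.ProducerRecord;

      public class CanalKafkaBridge {
          private final KafkaProducer<String, String> producer;

          public CanalKafkaBridge(String bootstrapServers) {
              Properties props = new Properties();
              props.put("bootstrap.servers", bootstrapServers);
              props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
              props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
              producer = new KafkaProducer<>(props);
          }

          // Key by "schema.table" so all changes to one table share a partition
          // and therefore keep their relative order after partitioning.
          public void forward(String schema, String table, String rowJson) {
              producer.send(new ProducerRecord<>("canal_changes", schema + "." + table, rowJson));
          }
      }

Inside printEntry above, one would serialize each RowData to JSON and call forward(entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), json).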

Maxwell

Basic usage (with Kafka); for other usages see the QuickStart

  • Enable the MySQL binlog (same as when configuring canal)
  • Create a MySQL user

    CREATE USER 'maxwell'@'%' IDENTIFIED BY 'password';
    -- GRANT ALL ON maxwell.* TO 'maxwell'@'%';
    GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'%';
  • Download and unpack

    curl -sLo - https://github.com/zendesk/maxwell/releases/download/v1.20.0/maxwell-1.20.0.tar.gz \
    | tar zxvf -
    cd maxwell-1.20.0
  • Start

    1. --output_ddl=true (default false): whether to emit DDL statements. By default, DDL operations such as altering, creating, or dropping a table produce no message; a dedicated topic for DDL can be set via ddl_kafka_topic.
    2. If maxwell is force-killed (kill -9), table data is modified, and maxwell is then restarted, the updates made while it was down are still detected and sent. In a test that inserted 10,000 rows in a loop, force-stopping maxwell midway and restarting it, some messages were sent more than once but none were lost, so consumers must deduplicate.
    3. On startup, maxwell stores its sync state (heartbeats, position) in the database named by schema_database (default maxwell) so it can resume next time; see the com.zendesk.maxwell.schema.MysqlPositionStore class.
    4. The custom_producer.factory property names the producer factory class (the hook for plugging in a custom producer); see CustomProducerFactory.java.
    5. Transactions: Kafka messages arrive only after COMMIT (several row changes yield several messages, preserving modification order); a ROLLBACK sends no Kafka message.
    6. Deduplication: with output_binlog_position set to true, the position field in the JSON identifies duplicates; even multiple updates within one transaction are distinguishable by position (see the dedup sketch after the startup command below).
      bin/maxwell --user='maxwell' --password='password' --port=3308 --host='10.72.9.200' --replica_server_id=1 --schema_database=test_maxwell --producer=kafka --kafka.bootstrap.servers=10.72.9.200:9092,10.72.9.201:9092,10.72.9.202:9092 --kafka_topic=maxwell_master --output_ddl=true
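
    Note 6 above suggests keying deduplication on the position field that appears when output_binlog_position is enabled. A minimal sketch, assuming the field is a plain string named position and using Jackson for parsing (both are assumptions, not maxwell requirements):

      import java.util.HashSet;
      import java.util.Set;

      import com.fasterxml.jackson.databind.JsonNode;
      import com.fasterxml.jackson.databind.ObjectMapper;

      public class MaxwellDeduper {
          private static final ObjectMapper MAPPER = new ObjectMapper();
          private final Set<String> seen = new HashSet<>(); // bound or expire this in real use

          // Returns true if a message with the same binlog position was already processed.
          public boolean isDuplicate(String maxwellJson) throws Exception {
              JsonNode pos = MAPPER.readTree(maxwellJson).get("position");
              if (pos == null) {
                  return false; // field absent: maxwell started without output_binlog_position
              }
              return !seen.add(pos.asText());
          }
      }

    Calling isDuplicate on each record value before handling it drops the replays produced by a kill -9 restart.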
  • Kafka consumer


    import java.util.Arrays;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    import static org.apache.kafka.clients.consumer.ConsumerConfig.*;

    public class LogKafkaConsumer {
        private static KafkaConsumer<String, String> consumer;

        static {
            consumer = createConsumer();
        }

        public static void main(String[] args) {
            // consumer.subscribe(Arrays.asList("spring-kafka-logs"));
            consumer.subscribe(Arrays.asList("maxwell_master"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    // System.out.printf("LogKafkaConsumer:\toffset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                    System.out.println("maxwell message timestamp:" + record.timestamp() + " offset:" + record.offset() + " " + record.value());
                }
            }
        }

        private static KafkaConsumer<String, String> createConsumer() {
            Properties properties = new Properties();
            properties.put(BOOTSTRAP_SERVERS_CONFIG, "10.72.9.200:9092,10.72.9.201:9092,10.72.9.202:9092");
            properties.put(GROUP_ID_CONFIG, "maxwell_group");
            properties.put(ENABLE_AUTO_COMMIT_CONFIG, "true");
            properties.put(AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
            properties.put(VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
            properties.put(KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
            return new KafkaConsumer<>(properties);
        }
    }
  • Test

    SQL:
    insert into `test`.`maxwell` set id = 1, daemon = 'Stanislaw Lem';

    Kafka message:
    maxwell: {
        "database": "test",
        "table": "maxwell",
        "type": "insert",
        "ts": 1449786310,
        "xid": 940752,
        "commit": true,
        "data": { "id":1, "daemon": "Stanislaw Lem" }
    }

    SQL:
    update test.maxwell set daemon = 'firebus! firebus!' where id = 1;

    Kafka message:
    maxwell: {
        "database": "test",
        "table": "maxwell",
        "type": "update",
        "ts": 1449786341,
        "xid": 940786,
        "commit": true,
        "data": {"id":1, "daemon": "Firebus! Firebus!"},
        "old": {"daemon": "Stanislaw Lem"}
    }
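
    A consumer typically branches on the type field; below is a short sketch parsing the update message above with Jackson (the library choice is an assumption; any JSON parser works). Note that old carries only the columns whose values changed:

      import com.fasterxml.jackson.databind.JsonNode;
      import com.fasterxml.jackson.databind.ObjectMapper;

      public class MaxwellMessageDemo {
          public static void main(String[] args) throws Exception {
              // The update message from the test above, as it arrives off Kafka.
              String value = "{\"database\":\"test\",\"table\":\"maxwell\",\"type\":\"update\","
                      + "\"ts\":1449786341,\"xid\":940786,\"commit\":true,"
                      + "\"data\":{\"id\":1,\"daemon\":\"Firebus! Firebus!\"},"
                      + "\"old\":{\"daemon\":\"Stanislaw Lem\"}}";
              JsonNode msg = new ObjectMapper().readTree(value);
              switch (msg.get("type").asText()) {
                  case "insert":
                      System.out.println("insert into " + msg.get("table").asText() + ": " + msg.get("data"));
                      break;
                  case "update":
                      System.out.println("changed columns (old values): " + msg.get("old"));
                      break;
                  case "delete":
                      System.out.println("deleted row: " + msg.get("data"));
                      break;
              }
          }
      }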