Building a Canal Cluster with Docker on macOS

Basic Canal Usage

Cluster Environment

  • OS: macOS 10.14.2
  • Docker: Server Version 18.09.0
  • Canal: 1.1.2
  • ZooKeeper: 3.4.13
  • MySQL: 5.6.43
  • Kafka: kafka_2.12-2.1.1

ZooKeeper Cluster

  • Download ZooKeeper

    curl -O https://archive.apache.org/dist/zookeeper/stable/zookeeper-3.4.13.tar.gz
  • Dockerfile

    FROM alpine/jre8
    MAINTAINER charles "amwfhv@yeah.net"
    RUN apk add --no-cache \
        bash \
        su-exec
    ADD zookeeper-3.4.13.tar.gz /
    ENTRYPOINT ["/zookeeper-3.4.13/bin/zkServer.sh"]
    CMD ["start-foreground"]
  • Configuration file (zoo.cfg)

    # tickTime/initLimit/syncLimit are required once the server.N cluster entries are present
    tickTime=2000
    initLimit=10
    syncLimit=5
    dataDir=/zookeeper-3.4.13/data
    dataLogDir=/zookeeper-3.4.13/log
    clientPort=2181
    # cluster configuration
    server.1=zk1:2888:3888
    server.2=zk2:2888:3888
    server.3=zk3:2888:3888
  • Standalone run (remove the cluster entries from zoo.cfg first). Note that with the zkServer.sh ENTRYPOINT above, a trailing /bin/bash argument would be handed to zkServer.sh instead of start-foreground, so none is passed here:

    docker build -t zk .
    docker run --rm -it -v `pwd`/zoo.cfg:/zookeeper-3.4.13/conf/zoo.cfg zk
  • myid

    echo "1" >> myid_1
    echo "2" >> myid_2
    echo "3" >> myid_3
  • docker-compose orchestration

    version: '2'
    services:
      zk1:
        build:
          context: ./
          dockerfile: ./Dockerfile
        container_name: zk1
        ports:
          - "2181:2181"
        networks:
          zk-cluster: # use the zk-cluster network
            ipv4_address: 192.168.111.111 # pin the container IP to 192.168.111.111
        volumes:
          - ./zoo.cfg:/zookeeper-3.4.13/conf/zoo.cfg
          - ./myid_1:/zookeeper-3.4.13/data/myid
      zk2:
        build:
          context: ./
          dockerfile: ./Dockerfile
        container_name: zk2
        ports:
          - "2182:2181"
        networks:
          zk-cluster: # use the zk-cluster network
            ipv4_address: 192.168.111.112 # pin the container IP to 192.168.111.112
        volumes:
          - ./zoo.cfg:/zookeeper-3.4.13/conf/zoo.cfg
          - ./myid_2:/zookeeper-3.4.13/data/myid
      zk3:
        build:
          context: ./
          dockerfile: ./Dockerfile
        container_name: zk3
        ports:
          - "2183:2181"
        networks:
          zk-cluster: # use the zk-cluster network
            ipv4_address: 192.168.111.113 # pin the container IP to 192.168.111.113
        volumes:
          - ./zoo.cfg:/zookeeper-3.4.13/conf/zoo.cfg
          - ./myid_3:/zookeeper-3.4.13/data/myid
    networks:
      zk-cluster:
        driver: bridge
        # custom network settings
        ipam:
          driver: default
          config:
            - subnet: 192.168.111.0/24 # subnet
              gateway: 192.168.111.1 # gateway
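  • Start the cluster — with the files above in place, build and start all three nodes (standard docker-compose usage; run from the directory holding docker-compose.yml):

    docker-compose up -d --build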
  • Check node status

    docker exec -it 29f801542d70 /zookeeper-3.4.13/bin/zkServer.sh status
    ZooKeeper JMX enabled by default
    Using config: /zookeeper-3.4.13/bin/../conf/zoo.cfg
    Mode: follower

    docker exec -it 2325d87530f9 /zookeeper-3.4.13/bin/zkServer.sh status
    ZooKeeper JMX enabled by default
    Using config: /zookeeper-3.4.13/bin/../conf/zoo.cfg
    Mode: leader

    docker exec -it f5d8b209d22f /zookeeper-3.4.13/bin/zkServer.sh status
    ZooKeeper JMX enabled by default
    Using config: /zookeeper-3.4.13/bin/../conf/zoo.cfg
    Mode: follower

MySQL (Standalone)

  • Pull the image

    docker pull mysql/mysql-server:5.6
  • MySQL configuration (mainly to enable binlog)

    my.cnf:
    # For advice on how to change settings please see
    # http://dev.mysql.com/doc/refman/5.6/en/server-configuration-defaults.html
    [mysql]
    default-character-set=utf8mb4
    [mysqld]
    # Without user=root the container fails with: Fatal error: Please read "Security" section of the manual to find out how to run mysqld as root!
    user=root
    port = 3306
    character_set_server=utf8mb4
    # error log
    log-error=/logs/error.log
    # enable the slow query log
    slow_query_log=1
    # queries taking longer than 2 seconds are logged
    long_query_time=2
    # log queries that do not use an index
    log_queries_not_using_indexes=1
    # slow query log file path
    slow-query-log-file=/logs/slowquery.log
    # log every client operation
    general_log=ON
    # general query log file
    general_log_file=/logs/operation.log
    # bin log
    #log-bin=/logs/bin-log.log

    ### enable binlog
    log-bin=mysql-bin
    binlog-format=ROW
    server_id=1

    #
    # Remove leading # and set to the amount of RAM for the most important data
    # cache in MySQL. Start at 70% of total RAM for dedicated server, else 10%.
    # innodb_buffer_pool_size = 128M
    #
    # Remove leading # to turn on a very important data integrity option: logging
    # changes to the binary log between backups.
    # log_bin
    #
    # Remove leading # to set options mainly useful for reporting servers.
    # The server defaults are faster for transactions and fast SELECTs.
    # Adjust sizes as needed, experiment to find the optimal values.
    # join_buffer_size = 128M
    # sort_buffer_size = 2M
    # read_rnd_buffer_size = 2M
    datadir=/var/lib/mysql
    socket=/var/lib/mysql/mysql.sock

    # Disabling symbolic-links is recommended to prevent assorted security risks
    symbolic-links=0

    # Recommended in standard MySQL setup
    sql_mode=NO_ENGINE_SUBSTITUTION,STRICT_TRANS_TABLES

    [mysqld_safe]
    pid-file=/var/run/mysqld/mysqld.pid
  • Test a standalone run

    docker run -p 3306:3306 --name mysql5.6 -v `pwd`/my.cnf:/etc/my.cnf -v `pwd`/logs:/logs -v `pwd`/data:/var/lib/mysql -e MYSQL_ROOT_PASSWORD=password -d mysql/mysql-server:5.6
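  • Verify binlog is enabled — a quick check against the running container (enter the root password at the prompt):

    docker exec -it mysql5.6 mysql -uroot -p -e "SHOW VARIABLES LIKE 'log_bin'; SHOW VARIABLES LIKE 'binlog_format';"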
  • Open a MySQL shell for a quick test (container ID: 897495c2b60d)

    docker exec -it 897495c2b60d mysql -u root -p
    Enter password:

    Welcome to the MySQL monitor. Commands end with ; or \g.
    Your MySQL connection id is 355
    Server version: 5.6.42-log MySQL Community Server (GPL)

    Copyright (c) 2000, 2018, Oracle and/or its affiliates. All rights reserved.

    Oracle is a registered trademark of Oracle Corporation and/or its
    affiliates. Other names may be trademarks of their respective
    owners.

    Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.

    mysql>
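  • Create the Canal account — the Canal containers below connect as canal/canal, so that account must exist with replication privileges. A minimal grant following Canal's QuickStart (enter the root password at the prompt):

    docker exec -it mysql5.6 mysql -uroot -p -e \
      "CREATE USER 'canal'@'%' IDENTIFIED BY 'canal'; \
       GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; \
       FLUSH PRIVILEGES;"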

Canal

  • Modify conf/canal.properties

    1. Add the zkServers setting; separate multiple ZooKeeper instances with commas: canal.zkServers=zk1:2181,zk2:2181,zk3:2181
    2. Build your own Canal image rather than following the official Docker approach; the image size drops to 167 MB.

      Note: the invocation below failed with a MySQL socket communication exception...
      docker run -it --rm --link mysql5.6 -p 1111:11111 -p 1112:11112 -v `pwd`/canal-signle.properties:/canal/conf/canal.properties -v `pwd`/example:/canal/conf/example canal
  • Trimmed-down version of the official startup script

    1. The -e flags can carry the key and value of any setting that would otherwise live in canal.properties or instance.properties; on startup, canal-server reads the variables passed via -e with higher priority.
    2. Mount out whichever container directories you need to keep.
      bootstrap.sh:
      #!/bin/bash
      current_path=`pwd`
      case "`uname`" in
      Linux)
          bin_abs_path=$(readlink -f $(dirname $0))
          ;;
      *)
          bin_abs_path=`cd $(dirname $0); pwd`
          ;;
      esac
      if [ -z "$JAVA" ] ; then
          JAVA=$(which java)
      fi

      base=${bin_abs_path}/..
      canal_conf=$base/conf/canal.properties
      logback_configurationFile=$base/conf/logback.xml
      JAVA_OPTS="-server -Xms2048m -Xmx3072m -Xmn1024m -XX:SurvivorRatio=2 -XX:PermSize=96m -XX:MaxPermSize=256m -Xss256k -XX:-UseAdaptiveSizePolicy -XX:MaxTenuringThreshold=15 -XX:+DisableExplicitGC -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled -XX:+UseCMSCompactAtFullCollection -XX:+UseFastAccessorMethods -XX:+UseCMSInitiatingOccupancyOnly -XX:+HeapDumpOnOutOfMemoryError"
      JAVA_OPTS=" $JAVA_OPTS -Djava.awt.headless=true -Djava.net.preferIPv4Stack=true -Dfile.encoding=UTF-8"
      CANAL_OPTS="-DappName=otter-canal -Dlogback.configurationFile=$logback_configurationFile -Dcanal.conf=$canal_conf"

      for i in $base/lib/*; do
          CLASSPATH=$i:"$CLASSPATH"
      done
      CLASSPATH="$base/conf:$CLASSPATH"
      exec $JAVA $JAVA_OPTS $CANAL_OPTS -classpath .:$CLASSPATH com.alibaba.otter.canal.deployer.CanalLauncher

      Dockerfile:
      FROM alpine/jre8
      MAINTAINER charles "amwfhv@yeah.net"
      RUN apk add --no-cache \
          bash \
          su-exec
      ADD canal.deployer-1.1.2.tar.gz /canal/
      ADD bootstrap.sh /canal/bin/
      EXPOSE 11111 11112
      ENTRYPOINT ["/canal/bin/bootstrap.sh"]

      Run:
      docker run -it \
        -e canal.zkServers=10.72.29.130:2181,10.72.29.130:2182,10.72.29.130:2183 \
        -e canal.auto.scan=false \
        -e canal.destinations=canal_test \
        -e canal.instance.master.address=10.72.29.130:3308 \
        -e canal.instance.dbUsername=canal \
        -e canal.instance.dbPassword=canal \
        -e canal.instance.connectionCharset=UTF-8 \
        -e canal.instance.tsdb.enable=true \
        -e canal.instance.gtidon=false \
        --name=canal_test -p 22222:11111 -p 22223:11112 canal

      After startup, these files appear inside the container:
      /conf/canal_test, i.e. /conf/${canal.destinations}, containing h2.mv.db and meta.dat
      /logs/canal/canal.log
      /logs/canal_test/canal_test.log and /logs/canal_test/meta.log

      Query ZooKeeper after startup:
      docker exec -it d355b93ad4a0 /zookeeper-3.4.13/bin/zkCli.sh
      ls /otter/canal/cluster
      [172.17.0.3:11111]
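
      The active server for a destination can also be inspected; per Canal's documented ZooKeeper layout, the holder of the running lock sits under /otter/canal/destinations (container ID as above):

      docker exec -it d355b93ad4a0 /zookeeper-3.4.13/bin/zkCli.sh
      get /otter/canal/destinations/canal_test/running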
  • ZK + Canal cluster docker-compose orchestration (a registration check follows the compose file)

    Directory layout:
    .
    ├── canal
    │   ├── Dockerfile
    │   ├── bootstrap.sh
    │   └── canal.deployer-1.1.2.tar.gz
    ├── docker-compose.yml
    └── zk
        ├── Dockerfile
        ├── myid_1
        ├── myid_2
        ├── myid_3
        ├── zoo.cfg
        └── zookeeper-3.4.13.tar.gz

    docker-compose.yml:
    version: '2'
    services:
      zk1:
        build:
          context: ./zk
          dockerfile: ./Dockerfile
        container_name: zk1
        ports:
          - "2181:2181"
        networks:
          canal-cluster: # use the canal-cluster network
            ipv4_address: 192.168.222.111 # pin the container IP to 192.168.222.111
        volumes:
          - ./zk/zoo.cfg:/zookeeper-3.4.13/conf/zoo.cfg
          - ./zk/myid_1:/zookeeper-3.4.13/data/myid
      zk2:
        build:
          context: ./zk
          dockerfile: ./Dockerfile
        container_name: zk2
        ports:
          - "2182:2181"
        networks:
          canal-cluster: # use the canal-cluster network
            ipv4_address: 192.168.222.112 # pin the container IP to 192.168.222.112
        volumes:
          - ./zk/zoo.cfg:/zookeeper-3.4.13/conf/zoo.cfg
          - ./zk/myid_2:/zookeeper-3.4.13/data/myid
      zk3:
        build:
          context: ./zk
          dockerfile: ./Dockerfile
        container_name: zk3
        ports:
          - "2183:2181"
        networks:
          canal-cluster: # use the canal-cluster network
            ipv4_address: 192.168.222.113 # pin the container IP to 192.168.222.113
        volumes:
          - ./zk/zoo.cfg:/zookeeper-3.4.13/conf/zoo.cfg
          - ./zk/myid_3:/zookeeper-3.4.13/data/myid
      canal1:
        build:
          context: ./canal
          dockerfile: ./Dockerfile
        container_name: canal1
        ports:
          - "11111:11111"
          - "11112:11112"
        depends_on:
          - zk1
          - zk2
          - zk3
        networks:
          canal-cluster: # use the canal-cluster network
            ipv4_address: 192.168.222.114 # pin the container IP to 192.168.222.114
        environment:
          - canal.zkServers=zk1:2181,zk2:2181,zk3:2181
          - canal.auto.scan=false
          - canal.destinations=canal1
          - canal.instance.master.address=10.72.29.130:3308
          - canal.instance.dbUsername=canal
          - canal.instance.dbPassword=canal
          - canal.instance.connectionCharset=UTF-8
          - canal.instance.tsdb.enable=true
          - canal.instance.gtidon=false

      canal2:
        build:
          context: ./canal
          dockerfile: ./Dockerfile
        container_name: canal2
        ports:
          - "11121:11111"
          - "11122:11112"
        depends_on:
          - zk1
          - zk2
          - zk3
        networks:
          canal-cluster: # use the canal-cluster network
            ipv4_address: 192.168.222.115 # pin the container IP to 192.168.222.115
        environment:
          - canal.zkServers=zk1:2181,zk2:2181,zk3:2181
          - canal.auto.scan=false
          - canal.destinations=canal2
          - canal.instance.master.address=10.72.29.130:3308
          - canal.instance.dbUsername=canal
          - canal.instance.dbPassword=canal
          - canal.instance.connectionCharset=UTF-8
          - canal.instance.tsdb.enable=true
          - canal.instance.gtidon=false
    networks:
      canal-cluster:
        driver: bridge
        # custom network settings
        ipam:
          driver: default
          config:
            - subnet: 192.168.222.0/24 # subnet
              gateway: 192.168.222.1 # gateway
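
    After docker-compose up -d --build, both canal servers should appear in the cluster registry (addresses per the fixed IPs above):

    docker exec -it zk1 /zookeeper-3.4.13/bin/zkCli.sh
    ls /otter/canal/cluster
    # expect both servers, e.g. [192.168.222.114:11111, 192.168.222.115:11111]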

Canal: Listening to binlog and Delivering to MQ

  • Since version 1.1.1, Canal supports delivering the binlog data received by the canal server directly to an MQ. The MQ systems supported out of the box:
    1. Kafka
    2. RocketMQ
    3. See the official documentation for a detailed introduction
  • Single-node test: deliver to MQ

    docker run -it \
      -e canal.serverMode=kafka \
      -e canal.mq.topic=canal_test \
      -e canal.mq.servers=10.72.8.91:9192,10.72.8.92:9192,10.72.8.93:9192 \
      -e canal.mq.flatMessage=true \
      -e canal.mq.acks=all \
      -e canal.auto.scan=false \
      -e canal.destinations=canal_test \
      -e canal.instance.master.address=10.72.29.130:3308 \
      -e canal.instance.dbUsername=canal \
      -e canal.instance.dbPassword=canal \
      -e canal.instance.connectionCharset=UTF-8 \
      -e canal.instance.tsdb.enable=true \
      -e canal.instance.gtidon=false \
      --name=canal_test -p 22222:11111 -p 22223:11112 canal
  • Test

    SQL:
    UPDATE `data-center`.`vb_credential` SET `app_account` = 'vbim.test5' WHERE id = 4

    Message received from Kafka:
    {
        "data": [
            {
                "id": "4",
                "app_account": "vbim.test5",
                "app_key": "F6F1EEC9C195B0F10AE74812A938EFBDD6561C4016B9B0F65FE224F921C09BDAC7B6A5E94CDDF10014C82938FC2ECACA",
                "app_secret": "bjJlazVTT0M5MzBCYTBPUjQxMTRBblQ2OXRzTDM1NDA=",
                "type": "1",
                "expire_time": "2018-09-20 13:50:16",
                "last_auth_time": "2018-09-20 13:47:16",
                "is_del": "1",
                "creater_time": "2018-09-19 16:09:03",
                "creater_id": null,
                "update_time": null,
                "update_id": null
            }
        ],
        "database": "data-center",
        "es": 1552620281000,
        "id": 2,
        "isDdl": false,
        "mysqlType": {
            "id": "bigint(20)",
            "app_account": "varchar(64)",
            "app_key": "varchar(255)",
            "app_secret": "varchar(255)",
            "type": "smallint(1)",
            "expire_time": "datetime",
            "last_auth_time": "datetime",
            "is_del": "tinyint(1)",
            "creater_time": "datetime",
            "creater_id": "varchar(64)",
            "update_time": "datetime",
            "update_id": "varchar(64)"
        },
        "old": [
            {
                "app_account": "vbim.test4"
            }
        ],
        "sql": "",
        "sqlType": {
            "id": -5,
            "app_account": 12,
            "app_key": 12,
            "app_secret": 12,
            "type": 5,
            "expire_time": 93,
            "last_auth_time": 93,
            "is_del": -7,
            "creater_time": 93,
            "creater_id": 12,
            "update_time": 93,
            "update_id": 12
        },
        "table": "vb_credential",
        "ts": 1552620281658,
        "type": "UPDATE"
    }
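
    The same flat messages can be watched directly with the stock console consumer, run from any Kafka distribution that can reach the brokers used above:

    bin/kafka-console-consumer.sh --bootstrap-server 10.72.8.91:9192 --topic canal_test --from-beginning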
  • Canal HA service orchestration

    1. Canal HA mode requires two identical nodes with the same instance name; when one goes down, the other takes over automatically. (Official description: the instance directory names on the two machines must match exactly, since HA mode manages instances by name, and both must use the default-instance.xml configuration.) A failover check follows the compose file below.
      version: '2'
      services:
        zk1:
          build:
            context: ./zk
            dockerfile: ./Dockerfile
          container_name: zk1
          ports:
            - "2181:2181"
          networks:
            canal-cluster: # use the canal-cluster network
              ipv4_address: 192.168.222.111 # pin the container IP to 192.168.222.111
          volumes:
            - ./zk/zoo.cfg:/zookeeper-3.4.13/conf/zoo.cfg
            - ./zk/myid_1:/zookeeper-3.4.13/data/myid
        zk2:
          build:
            context: ./zk
            dockerfile: ./Dockerfile
          container_name: zk2
          ports:
            - "2182:2181"
          networks:
            canal-cluster: # use the canal-cluster network
              ipv4_address: 192.168.222.112 # pin the container IP to 192.168.222.112
          volumes:
            - ./zk/zoo.cfg:/zookeeper-3.4.13/conf/zoo.cfg
            - ./zk/myid_2:/zookeeper-3.4.13/data/myid
        zk3:
          build:
            context: ./zk
            dockerfile: ./Dockerfile
          container_name: zk3
          ports:
            - "2183:2181"
          networks:
            canal-cluster: # use the canal-cluster network
              ipv4_address: 192.168.222.113 # pin the container IP to 192.168.222.113
          volumes:
            - ./zk/zoo.cfg:/zookeeper-3.4.13/conf/zoo.cfg
            - ./zk/myid_3:/zookeeper-3.4.13/data/myid
        canal1:
          build:
            context: ./canal
            dockerfile: ./Dockerfile
          container_name: canal1
          ports:
            - "11111:11111"
            - "11112:11112"
          depends_on:
            - zk1
            - zk2
            - zk3
          networks:
            canal-cluster: # use the canal-cluster network
              ipv4_address: 192.168.222.114 # pin the container IP to 192.168.222.114
          environment:
            - canal.serverMode=kafka
            - canal.mq.topic=canal_test
            - canal.mq.servers=10.72.8.91:9192,10.72.8.92:9192,10.72.8.93:9192
            - canal.mq.flatMessage=true
            - canal.mq.acks=all
            - canal.zkServers=zk1:2181,zk2:2181,zk3:2181
            - canal.auto.scan=false
            - canal.destinations=canal
            - canal.instance.master.address=10.72.29.130:3308
            - canal.instance.dbUsername=canal
            - canal.instance.dbPassword=canal
            - canal.instance.connectionCharset=UTF-8
            - canal.instance.tsdb.enable=true
            - canal.instance.gtidon=false

        canal2:
          build:
            context: ./canal
            dockerfile: ./Dockerfile
          container_name: canal2
          ports:
            - "11121:11111"
            - "11122:11112"
          depends_on:
            - zk1
            - zk2
            - zk3
          networks:
            canal-cluster: # use the canal-cluster network
              ipv4_address: 192.168.222.115 # pin the container IP to 192.168.222.115
          environment:
            - canal.serverMode=kafka
            - canal.mq.topic=canal_test
            - canal.mq.servers=10.72.8.91:9192,10.72.8.92:9192,10.72.8.93:9192
            - canal.mq.flatMessage=true
            - canal.mq.acks=all
            - canal.zkServers=zk1:2181,zk2:2181,zk3:2181
            - canal.auto.scan=false
            - canal.destinations=canal
            - canal.instance.master.address=10.72.29.130:3308
            - canal.instance.dbUsername=canal
            - canal.instance.dbPassword=canal
            - canal.instance.connectionCharset=UTF-8
            - canal.instance.tsdb.enable=true
            - canal.instance.gtidon=false
      networks:
        canal-cluster:
          driver: bridge
          # custom network settings
          ipam:
            driver: default
            config:
              - subnet: 192.168.222.0/24 # subnet
                gateway: 192.168.222.1 # gateway
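
      A simple failover check (container names per the compose file above; the running-lock path follows Canal's documented ZooKeeper layout):

      docker exec -it zk1 /zookeeper-3.4.13/bin/zkCli.sh
      get /otter/canal/destinations/canal/running   # note which server holds the lock
      docker stop canal1                            # stop that node if it is the holder
      # repeat the get: the surviving server should now hold /running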

Spring Boot Listener

  • Dockerfile

    FROM alpine/jre8
    MAINTAINER charles "amwfhv@yeah.net"
    ADD target/spring-cloud-stream.jar /opt/spring-cloud-stream.jar
    ENTRYPOINT ["java" ,"-jar","-Dspring.profiles.active=docker","/opt/spring-cloud-stream.jar"]
  • pom.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>

        <groupId>charles.org</groupId>
        <artifactId>spring-cloud-stream</artifactId>
        <version>1.0-SNAPSHOT</version>

        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.0.1.RELEASE</version>
            <relativePath/> <!-- lookup parent from repository -->
        </parent>

        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
            <java.version>1.8</java.version>
            <spring-cloud.version>Finchley.M9</spring-cloud.version>
        </properties>

        <repositories>
            <repository>
                <id>spring-milestones</id>
                <name>Spring Milestones</name>
                <url>https://repo.spring.io/milestone</url>
                <snapshots>
                    <enabled>false</enabled>
                </snapshots>
            </repository>
        </repositories>

        <dependencyManagement>
            <dependencies>
                <dependency>
                    <groupId>org.springframework.cloud</groupId>
                    <artifactId>spring-cloud-dependencies</artifactId>
                    <version>${spring-cloud.version}</version>
                    <type>pom</type>
                    <scope>import</scope>
                </dependency>
            </dependencies>
        </dependencyManagement>

        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-stream-binder-kafka</artifactId>
                <version>2.0.1.RELEASE</version>
            </dependency>

            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
        </dependencies>

        <build>
            <finalName>${project.artifactId}</finalName>
            <resources>
                <resource>
                    <directory>src/main/java</directory>
                    <includes>
                        <include>**/*.properties</include>
                        <include>**/*.xml</include>
                        <include>**/*.yml</include>
                    </includes>
                    <filtering>false</filtering>
                </resource>
                <resource>
                    <directory>src/main/resources</directory>
                    <includes>
                        <include>**/*.*</include>
                    </includes>
                    <filtering>false</filtering>
                </resource>
            </resources>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                    <version>2.0.1.RELEASE</version>
                    <executions>
                        <execution>
                            <goals>
                                <goal>repackage</goal>
                            </goals>
                        </execution>
                    </executions>
                    <configuration>
                        <layout>JAR</layout>
                    </configuration>
                </plugin>
            </plugins>
        </build>
    </project>
  • application.yml

    server:
      port: 7890
    spring:
      application:
        name: canal_mq_consumer
      cloud:
        stream:
          kafka:
            binder:
              brokers: kafka1:9092,kafka2:9092,kafka3:9092
          default-binder: kafka
          bindings:
            canal_test:
              destination: canal_test
              group: canal_test_group
              partitioned: false
              content-type: application/json
  • Java code

    package charles.org.stream.receive;

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.cloud.stream.annotation.StreamListener;
    import org.springframework.stereotype.Component;

    // Listens on the canal_test channel defined by StreamBinders below.
    // (The original @EnableBinding(Sink.class) here was redundant; the binding
    // is already enabled on the application class, so a plain @Component suffices.)
    @Component
    public class ReceiveService {
        private static final Logger logger = LoggerFactory.getLogger(ReceiveService.class);

        @StreamListener("canal_test")
        public void receive(String payload) {
            logger.info("Received message {}", payload);
            // logger.info("Received message {}", new String((byte[]) payload));
        }
    }

    package charles.org.stream.receive;

    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.cloud.stream.annotation.EnableBinding;

    @EnableBinding(StreamBinders.class)
    @SpringBootApplication
    public class StreamApplication {
        public static void main(String[] args) {
            SpringApplication.run(StreamApplication.class, args);
        }
    }

    package charles.org.stream.receive;

    import org.springframework.cloud.stream.annotation.Input;
    import org.springframework.messaging.SubscribableChannel;

    public interface StreamBinders {
        @Input("canal_test")
        SubscribableChannel canalTest();
    }
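
  • Build and package — a minimal sketch of producing the image consumed by the final compose file (assumes the Dockerfile and pom.xml above; run from the project root):

    mvn package                            # produces target/spring-cloud-stream.jar
    docker build -t spring-cloud-stream .  # bakes the jar into the image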

(Appendix) Kafka Docker Cluster

  • Use a custom network; because Kafka needs fixed IPs, pin the IP of every container in the composition.
  • Download Kafka

    curl -O http://mirrors.shu.edu.cn/apache/kafka/2.1.1/kafka_2.12-2.1.1.tgz
  • server.properties

    1. listeners must be set to an IP address. If it is set to localhost or the server's hostname, Java clients sending data will throw org.apache.kafka.common.errors.TimeoutException: Batch Expired. With advertised.host.name unset, Kafka does not advertise the configured host.name as the official docs claim, but rather the machine's hostname; remote clients have no hosts entry for that name, so they cannot connect.
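    2. Once the cluster below is running, a broker's advertised listener can be sanity-checked with the stock tooling (kafka1 and the address come from the configs and compose file that follow; kafka-broker-api-versions.sh ships with the Kafka distribution):

      docker exec -it kafka1 /kafka_2.12-2.1.1/bin/kafka-broker-api-versions.sh --bootstrap-server 192.168.111.115:9092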
  • node1: server1.properties

    broker.id=1
    listeners=PLAINTEXT://192.168.111.115:9092
    advertised.listeners=PLAINTEXT://192.168.111.115:9092
    num.network.threads=3
    num.io.threads=8
    socket.send.buffer.bytes=102400
    socket.receive.buffer.bytes=102400
    socket.request.max.bytes=104857600
    log.dirs=/opt/kafka/logs
    num.partitions=1
    num.recovery.threads.per.data.dir=1
    delete.topic.enable=true
    offsets.topic.replication.factor=1
    transaction.state.log.replication.factor=1
    transaction.state.log.min.isr=1
    log.retention.hours=168
    log.segment.bytes=1073741824
    log.retention.check.interval.ms=300000
    log.cleaner.enable=true
    #zookeeper.connect=192.168.111.111:2181,192.168.111.112:2181,192.168.111.113:2181
    zookeeper.connect=zk1:2181,zk2:2181,zk3:2181
    zookeeper.connection.timeout.ms=6000
    group.initial.rebalance.delay.ms=0
    auto.leader.rebalance.enable=true
    inter.broker.protocol.version=1.1.1
    log.message.format.version=1.1.1
  • node2: server2.properties

    broker.id=2
    listeners=PLAINTEXT://192.168.111.116:9092
    advertised.listeners=PLAINTEXT://192.168.111.116:9092
    num.network.threads=3
    num.io.threads=8
    socket.send.buffer.bytes=102400
    socket.receive.buffer.bytes=102400
    socket.request.max.bytes=104857600
    log.dirs=/opt/kafka/logs
    num.partitions=1
    num.recovery.threads.per.data.dir=1
    delete.topic.enable=true
    offsets.topic.replication.factor=1
    transaction.state.log.replication.factor=1
    transaction.state.log.min.isr=1
    log.retention.hours=168
    log.segment.bytes=1073741824
    log.retention.check.interval.ms=300000
    log.cleaner.enable=true
    #zookeeper.connect=192.168.111.111:2181,192.168.111.112:2181,192.168.111.113:2181
    zookeeper.connect=zk1:2181,zk2:2181,zk3:2181
    zookeeper.connection.timeout.ms=6000
    group.initial.rebalance.delay.ms=0
    auto.leader.rebalance.enable=true
    inter.broker.protocol.version=1.1.1
    log.message.format.version=1.1.1
  • node3: server3.properties

    broker.id=3
    listeners=PLAINTEXT://192.168.111.117:9092
    advertised.listeners=PLAINTEXT://192.168.111.117:9092
    num.network.threads=3
    num.io.threads=8
    socket.send.buffer.bytes=102400
    socket.receive.buffer.bytes=102400
    socket.request.max.bytes=104857600
    log.dirs=/opt/kafka/logs
    num.partitions=1
    num.recovery.threads.per.data.dir=1
    delete.topic.enable=true
    offsets.topic.replication.factor=1
    transaction.state.log.replication.factor=1
    transaction.state.log.min.isr=1
    log.retention.hours=168
    log.segment.bytes=1073741824
    log.retention.check.interval.ms=300000
    log.cleaner.enable=true
    #zookeeper.connect=192.168.111.111:2181,192.168.111.112:2181,192.168.111.113:2181
    zookeeper.connect=zk1:2181,zk2:2181,zk3:2181
    zookeeper.connection.timeout.ms=6000
    group.initial.rebalance.delay.ms=0
    auto.leader.rebalance.enable=true
    inter.broker.protocol.version=1.1.1
    log.message.format.version=1.1.1
  • Kafka cluster docker-compose orchestration

    version: '2'
    services:
      zk1:
        build:
          context: ./zk
          dockerfile: ./Dockerfile
        container_name: zk1
        ports:
          - "2181:2181"
        networks:
          kafka-cluster: # use the kafka-cluster network
            ipv4_address: 192.168.111.111 # pin the container IP to 192.168.111.111
        volumes:
          - ./zk/zoo.cfg:/zookeeper-3.4.13/conf/zoo.cfg
          - ./zk/myid_1:/zookeeper-3.4.13/data/myid
      zk2:
        build:
          context: ./zk
          dockerfile: ./Dockerfile
        container_name: zk2
        ports:
          - "2182:2181"
        networks:
          kafka-cluster: # use the kafka-cluster network
            ipv4_address: 192.168.111.112 # pin the container IP to 192.168.111.112
        volumes:
          - ./zk/zoo.cfg:/zookeeper-3.4.13/conf/zoo.cfg
          - ./zk/myid_2:/zookeeper-3.4.13/data/myid
      zk3:
        build:
          context: ./zk
          dockerfile: ./Dockerfile
        container_name: zk3
        ports:
          - "2183:2181"
        networks:
          kafka-cluster: # use the kafka-cluster network
            ipv4_address: 192.168.111.113 # pin the container IP to 192.168.111.113
        volumes:
          - ./zk/zoo.cfg:/zookeeper-3.4.13/conf/zoo.cfg
          - ./zk/myid_3:/zookeeper-3.4.13/data/myid

      kafka1:
        build:
          context: ./kafka
          dockerfile: ./Dockerfile
        container_name: kafka1
        ports:
          - "9092:9092"
        networks:
          kafka-cluster: # use the kafka-cluster network
            ipv4_address: 192.168.111.115 # pin the container IP to 192.168.111.115
        volumes:
          - ./kafka/server1.properties:/kafka_2.12-2.1.1/config/server.properties

      kafka2:
        build:
          context: ./kafka
          dockerfile: ./Dockerfile
        container_name: kafka2
        ports:
          - "9192:9092"
        networks:
          kafka-cluster: # use the kafka-cluster network
            ipv4_address: 192.168.111.116 # pin the container IP to 192.168.111.116
        volumes:
          - ./kafka/server2.properties:/kafka_2.12-2.1.1/config/server.properties

      kafka3:
        build:
          context: ./kafka
          dockerfile: ./Dockerfile
        container_name: kafka3
        ports:
          - "9292:9092"
        networks:
          kafka-cluster: # use the kafka-cluster network
            ipv4_address: 192.168.111.117 # pin the container IP to 192.168.111.117
        volumes:
          - ./kafka/server3.properties:/kafka_2.12-2.1.1/config/server.properties
    networks:
      kafka-cluster:
        driver: bridge
        # custom network settings
        ipam:
          driver: default
          config:
            - subnet: 192.168.111.0/24 # subnet
              gateway: 192.168.111.1 # gateway
  • Test

    1. Create a topic
    docker exec -it dc730ee4db89 /kafka_2.12-2.1.1/bin/kafka-topics.sh --create --zookeeper zk1:2181 --replication-factor 2 --partitions 1 --topic charles_tpoic
    WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
    Created topic "charles_tpoic".

    2. List topics
    docker exec -it dc730ee4db89 /kafka_2.12-2.1.1/bin/kafka-topics.sh --list --zookeeper zk1:2181
    charles_tpoic

    3. Describe the topic
    docker exec -it dc730ee4db89 /kafka_2.12-2.1.1/bin/kafka-topics.sh --describe --zookeeper zk1:2181 --topic charles_tpoic
    Topic:charles_tpoic PartitionCount:1 ReplicationFactor:2 Configs:
        Topic: charles_tpoic Partition: 0 Leader: 3 Replicas: 3,1 Isr: 3,1
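
    4. Round-trip check with the console producer and consumer (same container; broker address per server1.properties — type a few lines into the producer, then read them back):
    docker exec -it dc730ee4db89 /kafka_2.12-2.1.1/bin/kafka-console-producer.sh --broker-list 192.168.111.115:9092 --topic charles_tpoic
    docker exec -it dc730ee4db89 /kafka_2.12-2.1.1/bin/kafka-console-consumer.sh --bootstrap-server 192.168.111.115:9092 --topic charles_tpoic --from-beginning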

Complete Compose File

  • Directory layout

    .
    ├── canal
    │   ├── Dockerfile
    │   ├── bootstrap.sh
    │   └── canal.deployer-1.1.2.tar.gz
    ├── docker-compose.yml
    ├── spring-cloud-stream
    │   ├── Dockerfile
    │   └── target
    │       └── spring-cloud-stream.jar
    └── zk
        ├── Dockerfile
        ├── myid_1
        ├── myid_2
        ├── myid_3
        ├── zoo.cfg
        └── zookeeper-3.4.13.tar.gz
  • External Kafka and MySQL are used here, so they are left out of the composition.

    version: '2'
    services:
      zk1:
        build:
          context: ./zk
          dockerfile: ./Dockerfile
        container_name: zk1
        ports:
          - "2181:2181"
        networks:
          - canal-cluster
        volumes:
          - ./zk/zoo.cfg:/zookeeper-3.4.13/conf/zoo.cfg
          - ./zk/myid_1:/zookeeper-3.4.13/data/myid
      zk2:
        build:
          context: ./zk
          dockerfile: ./Dockerfile
        container_name: zk2
        ports:
          - "2182:2181"
        networks:
          - canal-cluster
        volumes:
          - ./zk/zoo.cfg:/zookeeper-3.4.13/conf/zoo.cfg
          - ./zk/myid_2:/zookeeper-3.4.13/data/myid
      zk3:
        build:
          context: ./zk
          dockerfile: ./Dockerfile
        container_name: zk3
        ports:
          - "2183:2181"
        networks:
          - canal-cluster
        volumes:
          - ./zk/zoo.cfg:/zookeeper-3.4.13/conf/zoo.cfg
          - ./zk/myid_3:/zookeeper-3.4.13/data/myid

      canal1:
        build:
          context: ./canal
          dockerfile: ./Dockerfile
        container_name: canal1
        ports:
          - "11111:11111"
          - "11112:11112"
        depends_on:
          - zk1
          - zk2
          - zk3
        networks:
          - canal-cluster
        environment:
          - canal.serverMode=kafka
          - canal.mq.topic=canal_test
          - canal.mq.servers=10.72.8.91:9192,10.72.8.92:9192,10.72.8.93:9192
          - canal.mq.flatMessage=true
          - canal.mq.acks=all
          - canal.zkServers=zk1:2181,zk2:2181,zk3:2181
          - canal.auto.scan=false
          - canal.destinations=canal
          - canal.instance.master.address=10.72.29.130:3308
          - canal.instance.dbUsername=canal
          - canal.instance.dbPassword=canal
          - canal.instance.connectionCharset=UTF-8
          - canal.instance.tsdb.enable=true
          - canal.instance.gtidon=false

      canal2:
        build:
          context: ./canal
          dockerfile: ./Dockerfile
        container_name: canal2
        ports:
          - "11121:11111"
          - "11122:11112"
        depends_on:
          - zk1
          - zk2
          - zk3
        networks:
          - canal-cluster
        environment:
          - canal.serverMode=kafka
          - canal.mq.topic=canal_test
          - canal.mq.servers=10.72.8.91:9192,10.72.8.92:9192,10.72.8.93:9192
          - canal.mq.flatMessage=true
          - canal.mq.acks=all
          - canal.zkServers=zk1:2181,zk2:2181,zk3:2181
          - canal.auto.scan=false
          - canal.destinations=canal
          - canal.instance.master.address=10.72.29.130:3308
          - canal.instance.dbUsername=canal
          - canal.instance.dbPassword=canal
          - canal.instance.connectionCharset=UTF-8
          - canal.instance.tsdb.enable=true
          - canal.instance.gtidon=false

      mq-consumer:
        build:
          context: ./spring-cloud-stream
          dockerfile: ./Dockerfile
        container_name: mq_consumer
        depends_on:
          - zk1
          - zk2
          - zk3
          - canal1
          - canal2
        networks:
          - canal-cluster
    networks:
      canal-cluster:
        driver: bridge
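
  • Bring the whole stack up and watch the consumer log Canal messages (standard compose commands; the container name comes from the file above):

    docker-compose up -d --build
    docker logs -f mq_consumer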