Docker构建Canal集群
条评论在MacOS上使用Docker构建Canal集群
Canal基本使用
集群环境
- 操作系统: MacOS 10.14.2
- Docker版本: Server Version: 18.09.0
- Canal版本: 1.1.2
- Zookeeper版本: 3.4.13
- MySQL版本: 5.6.43
- Kafka版本: kafka_2.12-2.1.1
zookeeper集群
下载zookeeper
1
curl -O https://archive.apache.org/dist/zookeeper/stable/zookeeper-3.4.13.tar.gz
Dockerfile
1
2
3
4
5
6
7
8FROM alpine/jre8
MAINTAINER charles "amwfhv@yeah.net"
RUN apk add --no-cache \
bash \
su-exec
ADD zookeeper-3.4.13.tar.gz /
ENTRYPOINT ["/zookeeper-3.4.13/bin/zkServer.sh"]
CMD ["start-foreground"]配置文件
1
2
3
4
5
6
7dataDir=/zookeeper-3.4.13/data
dataLogDir=/zookeeper-3.4.13/log
clientPort=2181
#集群配置
server.1=zk1:2888:3888
server.2=zk2:2888:3888
server.3=zk3:2888:3888单机运行(去掉zoo.cfg中集群配置)
1
docker run --rm -it -v `pwd`/zoo.cfg:/zookeeper-3.4.13/conf/zoo.cfg zk /bin/bash
myid
1
2
3echo "1" >> myid_1
echo "2" >> myid_2
echo "3" >> myid_3docker-compose编排
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50version: '2'
services:
zk1:
build:
context: ./
dockerfile: ./Dockerfile
container_name: zk1
ports:
- "2181:2181"
networks:
zk-cluster: # 使用zk-cluster网络
ipv4_address: 192.168.111.111 #固定容器IP为192.168.111.111
volumes:
- ./zoo.cfg:/zookeeper-3.4.13/conf/zoo.cfg
- ./myid_1:/zookeeper-3.4.13/data/myid
zk2:
build:
context: ./
dockerfile: ./Dockerfile
container_name: zk2
ports:
- "2182:2181"
networks:
zk-cluster: # 使用zk-cluster网络
ipv4_address: 192.168.111.112 #固定容器IP为192.168.111.112
volumes:
- ./zoo.cfg:/zookeeper-3.4.13/conf/zoo.cfg
- ./myid_2:/zookeeper-3.4.13/data/myid
zk3:
build:
context: ./
dockerfile: ./Dockerfile
container_name: zk3
ports:
- "2183:2181"
networks:
zk-cluster: # 使用zk-cluster网络
ipv4_address: 192.168.111.113 #固定容器IP为192.168.111.113
volumes:
- ./zoo.cfg:/zookeeper-3.4.13/conf/zoo.cfg
- ./myid_3:/zookeeper-3.4.13/data/myid
networks:
zk-cluster:
driver: bridge
# 自定义网络信息
ipam:
driver: default
config:
- subnet: 192.168.111.0/24 # 子网段
gateway: 192.168.111.1 # 网关查看节点状态
1
2
3
4
5
6
7
8
9
10
11
12
13
14docker exec -it 29f801542d70 /zookeeper-3.4.13/bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /zookeeper-3.4.13/bin/../conf/zoo.cfg
Mode: follower
docker exec -it 2325d87530f9 /zookeeper-3.4.13/bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /zookeeper-3.4.13/bin/../conf/zoo.cfg
Mode: leader
docker exec -it f5d8b209d22f /zookeeper-3.4.13/bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /zookeeper-3.4.13/bin/../conf/zoo.cfg
Mode: follower
MySQL(单机)
拉取镜像
1
docker pull mysql/mysql-server:5.6
配置mysql文件(
主要开启binlog
)1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59vi my.cnf
# For advice on how to change settings please see
# http://dev.mysql.com/doc/refman/5.6/en/server-configuration-defaults.html
[mysql]
default-character-set=utf8mb4
[mysqld]
#不加 user=root会报错: Fatal error: Please read "Security" section of the manual to find out how to run mysqld as root!
user=root
port = 3306
character_set_server=utf8mb4
#错误日志
log-error=/logs/error.log
#开启慢查询日志记录
slow_query_log=1
#查询时间超过2秒的sql语句会被记录
long_query_time=2
#记录没有使用索引的查询
log_queries_not_using_indexes=1
#记录慢查询日志的文件地址
slow-query-log-file=/logs/slowquery.log
#记录用户所有的操作
general_log=ON
#操作日志
general_log_file=/logs/operation.log
#bin Log
#log-bin=/logs/bin-log.log
###开启binlog
log-bin=mysql-bin
binlog-format=ROW
server_id=1
#
# Remove leading # and set to the amount of RAM for the most important data
# cache in MySQL. Start at 70% of total RAM for dedicated server, else 10%.
# innodb_buffer_pool_size = 128M
#
# Remove leading # to turn on a very important data integrity option: logging
# changes to the binary log between backups.
# log_bin
#
# Remove leading # to set options mainly useful for reporting servers.
# The server defaults are faster for transactions and fast SELECTs.
# Adjust sizes as needed, experiment to find the optimal values.
# join_buffer_size = 128M
# sort_buffer_size = 2M
# read_rnd_buffer_size = 2M
datadir=/var/lib/mysql
socket=/var/lib/mysql/mysql.sock
# Disabling symbolic-links is recommended to prevent assorted security risks
symbolic-links=0
# Recommended in standard MySQL setup
sql_mode=NO_ENGINE_SUBSTITUTION,STRICT_TRANS_TABLES
[mysqld_safe]
pid-file=/var/run/mysqld/mysqld.pid测试单机运行
1
docker run -p 3306:3306 --name mysql5.6 -v `pwd`/my.cnf:/etc/my.cnf -v `pwd`/logs:/logs -v `pwd`/data:/var/lib/mysql -e MYSQL_ROOT_PASSWORD=password -d mysql/mysql-server:5.6
测试进入命令行(
容器ID为:897495c2b60d
)1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16docker exec -it 897495c2b60d mysql -u root -p
Enter password:
Welcome to the MySQL monitor. Commands end with ; or \g.
Your MySQL connection id is 355
Server version: 5.6.42-log MySQL Community Server (GPL)
Copyright (c) 2000, 2018, Oracle and/or its affiliates. All rights reserved.
Oracle is a registered trademark of Oracle Corporation and/or its
affiliates. Other names may be trademarks of their respective
owners.
Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.
mysql>
Canal
修改conf/canal.properties
- 增加zkServers配置,多个zk用逗号分隔
canal.zkServers=zk1:2181,zk2:2181,zk3:2181
- 采用自行配置canal镜像,非官方docker处理方式,镜像体积缩小至167M。
1
2这种方式会报错与MySQLsocket通信异常...
docker run -it --rm --link mysql5.6 -p 1111:11111 -p 1112:11112 -v `pwd`/canal-signle.properties:/canal/conf/canal.properties -v `pwd`/example:/canal/conf/example canal
- 增加zkServers配置,多个zk用逗号分隔
参考官方脚本,精简后运行
- -e参数里可以指定以前canal.properties/instance.properties里的所有配置的key和value,canal-server启动时会有限读取-e指定的变量
- 有相关容器目录需要挂载出来,请自行挂载。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62bootstrap.sh
#!/bin/bash
current_path=`pwd`
case "`uname`" in
Linux)
bin_abs_path=$(readlink -f $(dirname $0))
;;
*)
bin_abs_path=`cd $(dirname $0); pwd`
;;
esac
if [ -z "$JAVA" ] ; then
JAVA=$(which java)
fi
base=${bin_abs_path}/..
canal_conf=$base/conf/canal.properties
logback_configurationFile=$base/conf/logback.xml
JAVA_OPTS="-server -Xms2048m -Xmx3072m -Xmn1024m -XX:SurvivorRatio=2 -XX:PermSize=96m -XX:MaxPermSize=256m -Xss256k -XX:-UseAdaptiveSizePolicy -XX:MaxTenuringThreshold=15 -XX:+DisableExplicitGC -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled -XX:+UseCMSCompactAtFullCollection -XX:+UseFastAccessorMethods -XX:+UseCMSInitiatingOccupancyOnly -XX:+HeapDumpOnOutOfMemoryError"
JAVA_OPTS=" $JAVA_OPTS -Djava.awt.headless=true -Djava.net.preferIPv4Stack=true -Dfile.encoding=UTF-8"
CANAL_OPTS="-DappName=otter-canal -Dlogback.configurationFile=$logback_configurationFile -Dcanal.conf=$canal_conf"
for i in $base/lib/*;
do CLASSPATH=$i:"$CLASSPATH";
done
CLASSPATH="$base/conf:$CLASSPATH";
exec $JAVA $JAVA_OPTS $CANAL_OPTS -classpath .:$CLASSPATH com.alibaba.otter.canal.deployer.CanalLauncher
Dockerfile:
FROM alpine/jre8
MAINTAINER charles "amwfhv@yeah.net"
RUN apk add --no-cache \
bash \
su-exec
ADD canal.deployer-1.1.2.tar.gz /canal/
ADD bootstrap.sh /canal/bin/
EXPOSE 11111 11112
ENTRYPOINT ["/canal/bin/bootstrap.sh"]
运行:
docker run -it \
-e canal.zkServers=10.72.29.130:2181,10.72.29.130:2182,10.72.29.130:2183 \
-e canal.auto.scan=false \
-e canal.destinations=canal_test \
-e canal.instance.master.address=10.72.29.130:3308 \
-e canal.instance.dbUsername=canal \
-e canal.instance.dbPassword=canal \
-e canal.instance.connectionCharset=UTF-8 \
-e canal.instance.tsdb.enable=true \
-e canal.instance.gtidon=false \
--name=canal_test -p 22222:11111 -p 22223:11112 canal
运行后容器内会生成目录文件:
/conf/canal_test 即/conf/${canal.destinations} 目录下有 h2.mv.db meta.dat
/logs/canal/canal.log
/logs/canal_test/canal_test.log /logs/canal_test/meta.log
运行后在zk中查询数据
docker exec -it d355b93ad4a0 /zookeeper-3.4.13/bin/zkCli.sh
ls /otter/canal/cluster
[172.17.0.3:11111]
ZK + Canal 集群 docker-compose编排
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117文件目录
.
├── canal
│ ├── Dockerfile
│ ├── bootstrap.sh
│ ├── canal.deployer-1.1.2.tar.gz
├── docker-compose.yml
└── zk
├── Dockerfile
├── myid_1
├── myid_2
├── myid_3
├── zoo.cfg
└── zookeeper-3.4.13.tar.gz
docker-compose.yml
version: '2'
services:
zk1:
build:
context: ./zk
dockerfile: ./Dockerfile
container_name: zk1
ports:
- "2181:2181"
networks:
canal-cluster: # 使用canal-cluster网络
ipv4_address: 192.168.222.111 #固定容器IP为192.168.222.111
volumes:
- ./zk/zoo.cfg:/zookeeper-3.4.13/conf/zoo.cfg
- ./zk/myid_1:/zookeeper-3.4.13/data/myid
zk2:
build:
context: ./zk
dockerfile: ./Dockerfile
container_name: zk2
ports:
- "2182:2181"
networks:
canal-cluster: # 使用canal-cluster网络
ipv4_address: 192.168.222.112 #固定容器IP为192.168.222.112
volumes:
- ./zk/zoo.cfg:/zookeeper-3.4.13/conf/zoo.cfg
- ./zk/myid_2:/zookeeper-3.4.13/data/myid
zk3:
build:
context: ./zk
dockerfile: ./Dockerfile
container_name: zk3
ports:
- "2183:2181"
networks:
canal-cluster: # 使用canal-cluster网络
ipv4_address: 192.168.222.113 #固定容器IP为192.168.222.113
volumes:
- ./zk/zoo.cfg:/zookeeper-3.4.13/conf/zoo.cfg
- ./zk/myid_3:/zookeeper-3.4.13/data/myid
canal1:
build:
context: ./canal
dockerfile: ./Dockerfile
container_name: canal1
ports:
- "11111:11111"
- "11112:11112"
depends_on:
- zk1
- zk2
- zk3
networks:
canal-cluster: # 使用canal-cluster网络
ipv4_address: 192.168.222.114 #固定容器IP为192.168.222.114
environment:
- canal.zkServers=zk1:2181,zk2:2181,zk3:2181
- canal.auto.scan=false
- canal.destinations=canal1
- canal.instance.master.address=10.72.29.130:3308
- canal.instance.dbUsername=canal
- canal.instance.dbPassword=canal
- canal.instance.connectionCharset=UTF-8
- canal.instance.tsdb.enable=true
- canal.instance.gtidon=false
canal2:
build:
context: ./canal
dockerfile: ./Dockerfile
container_name: canal2
ports:
- "11121:11111"
- "11122:11112"
depends_on:
- zk1
- zk2
- zk3
networks:
canal-cluster: # 使用canal-cluster网络
ipv4_address: 192.168.222.115 #固定容器IP为192.168.222.115
environment:
- canal.zkServers=zk1:2181,zk2:2181,zk3:2181
- canal.auto.scan=false
- canal.destinations=canal2
- canal.instance.master.address=10.72.29.130:3308
- canal.instance.dbUsername=canal
- canal.instance.dbPassword=canal
- canal.instance.connectionCharset=UTF-8
- canal.instance.tsdb.enable=true
- canal.instance.gtidon=false
networks:
canal-cluster:
driver: bridge
# 自定义网络信息
ipam:
driver: default
config:
- subnet: 192.168.222.0/24 # 子网段
gateway: 192.168.222.1 # 网关
Canal监听binlog并发送至MQ
- canal 1.1.1版本之后, 默认支持将canal server接收到的binlog数据直接投递到MQ, 目前默认支持的MQ系统有:
单节点测试发送至MQ
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15docker run -it \
-e canal.serverMode=kafka \
-e canal.mq.topic=canal_test \
-e canal.mq.servers=10.72.8.91:9192,10.72.8.92:9192,10.72.8.93:9192 \
-e canal.mq.flatMessage=true \
-e canal.mq.acks=all \
-e canal.auto.scan=false \
-e canal.destinations=canal_test \
-e canal.instance.master.address=10.72.29.130:3308 \
-e canal.instance.dbUsername=canal \
-e canal.instance.dbPassword=canal \
-e canal.instance.connectionCharset=UTF-8 \
-e canal.instance.tsdb.enable=true \
-e canal.instance.gtidon=false \
--name=canal_test -p 22222:11111 -p 22223:11112 canal测试
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63SQL:
UPDATE `data-center`.`vb_credential` SET `app_acount` = 'vbim.test5' WHERE id = 4
Kafka监听到消息:
{
"data": [
{
"id": "4",
"app_account": "vbim.test5",
"app_key": "F6F1EEC9C195B0F10AE74812A938EFBDD6561C4016B9B0F65FE224F921C09BDAC7B6A5E94CDDF10014C82938FC2ECACA",
"app_secret": "bjJlazVTT0M5MzBCYTBPUjQxMTRBblQ2OXRzTDM1NDA=",
"type": "1",
"expire_time": "2018-09-20 13:50:16",
"last_auth_time": "2018-09-20 13:47:16",
"is_del": "1",
"creater_time": "2018-09-19 16:09:03",
"creater_id": null,
"update_time": null,
"update_id": null
}
],
"database": "data-center",
"es": 1552620281000,
"id": 2,
"isDdl": false,
"mysqlType": {
"id": "bigint(20)",
"app_account": "varchar(64)",
"app_key": "varchar(255)",
"app_secret": "varchar(255)",
"type": "smallint(1)",
"expire_time": "datetime",
"last_auth_time": "datetime",
"is_del": "tinyint(1)",
"creater_time": "datetime",
"creater_id": "varchar(64)",
"update_time": "datetime",
"update_id": "varchar(64)"
},
"old": [
{
"app_account": "vbim.test4"
}
],
"sql": "",
"sqlType": {
"id": -5,
"app_account": 12,
"app_key": 12,
"app_secret": 12,
"type": 5,
"expire_time": 93,
"last_auth_time": 93,
"is_del": -7,
"creater_time": 93,
"creater_id": 12,
"update_time": 93,
"update_id": 12
},
"table": "vb_credential",
"ts": 1552620281658,
"type": "UPDATE"
}Canal HA 服务编排
- Canal HA模式需要两个相同的节点instance name一样,挂掉后自动切换(官方描述:两台机器上的instance目录的名字需要保证完全一致,HA模式是依赖于instance name进行管理,同时必须都选择default-instance.xml配置)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111version: '2'
services:
zk1:
build:
context: ./zk
dockerfile: ./Dockerfile
container_name: zk1
ports:
- "2181:2181"
networks:
canal-cluster: # 使用canal-cluster网络
ipv4_address: 192.168.222.111 #固定容器IP为192.168.222.111
volumes:
- ./zk/zoo.cfg:/zookeeper-3.4.13/conf/zoo.cfg
- ./zk/myid_1:/zookeeper-3.4.13/data/myid
zk2:
build:
context: ./zk
dockerfile: ./Dockerfile
container_name: zk2
ports:
- "2182:2181"
networks:
canal-cluster: # 使用canal-cluster网络
ipv4_address: 192.168.222.112 #固定容器IP为192.168.222.112
volumes:
- ./zk/zoo.cfg:/zookeeper-3.4.13/conf/zoo.cfg
- ./zk/myid_2:/zookeeper-3.4.13/data/myid
zk3:
build:
context: ./zk
dockerfile: ./Dockerfile
container_name: zk3
ports:
- "2183:2181"
networks:
canal-cluster: # 使用canal-cluster网络
ipv4_address: 192.168.222.113 #固定容器IP为192.168.222.113
volumes:
- ./zk/zoo.cfg:/zookeeper-3.4.13/conf/zoo.cfg
- ./zk/myid_3:/zookeeper-3.4.13/data/myid
canal1:
build:
context: ./canal
dockerfile: ./Dockerfile
container_name: canal1
ports:
- "11111:11111"
- "11112:11112"
depends_on:
- zk1
- zk2
- zk3
networks:
canal-cluster: # 使用canal-cluster网络
ipv4_address: 192.168.222.114 #固定容器IP为192.168.222.114
environment:
- canal.serverMode=kafka
- canal.mq.topic=canal_test
- canal.mq.servers=10.72.8.91:9192,10.72.8.92:9192,10.72.8.93:9192
- canal.mq.flatMessage=true
- canal.mq.acks=all
- canal.zkServers=zk1:2181,zk2:2181,zk3:2181
- canal.auto.scan=false
- canal.destinations=canal
- canal.instance.master.address=10.72.29.130:3308
- canal.instance.dbUsername=canal
- canal.instance.dbPassword=canal
- canal.instance.connectionCharset=UTF-8
- canal.instance.tsdb.enable=true
- canal.instance.gtidon=false
canal2:
build:
context: ./canal
dockerfile: ./Dockerfile
container_name: canal2
ports:
- "11121:11111"
- "11122:11112"
depends_on:
- zk1
- zk2
- zk3
networks:
canal-cluster: # 使用canal-cluster网络
ipv4_address: 192.168.222.115 #固定容器IP为192.168.222.115
environment:
- canal.serverMode=kafka
- canal.mq.topic=canal_test
- canal.mq.servers=10.72.8.91:9192,10.72.8.92:9192,10.72.8.93:9192
- canal.mq.flatMessage=true
- canal.mq.acks=all
- canal.zkServers=zk1:2181,zk2:2181,zk3:2181
- canal.auto.scan=false
- canal.destinations=canal
- canal.instance.master.address=10.72.29.130:3308
- canal.instance.dbUsername=canal
- canal.instance.dbPassword=canal
- canal.instance.connectionCharset=UTF-8
- canal.instance.tsdb.enable=true
- canal.instance.gtidon=false
networks:
canal-cluster:
driver: bridge
# 自定义网络信息
ipam:
driver: default
config:
- subnet: 192.168.222.0/24 # 子网段
gateway: 192.168.222.1 # 网关
- Canal HA模式需要两个相同的节点instance name一样,挂掉后自动切换(官方描述:两台机器上的instance目录的名字需要保证完全一致,HA模式是依赖于instance name进行管理,同时必须都选择default-instance.xml配置)
SpringBoot监听程序
Dockerfile
1
2
3
4FROM alpine/jre8
MAINTAINER charles "amwfhv@yeah.net"
ADD target/spring-cloud-stream.jar /opt/spring-cloud-stream.jar
ENTRYPOINT ["java" ,"-jar","-Dspring.profiles.active=docker","/opt/spring-cloud-stream.jar"]pom.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>charles.org</groupId>
<artifactId>spring-cloud-stream</artifactId>
<version>1.0-SNAPSHOT</version>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.1.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
<spring-cloud.version>Finchley.M9</spring-cloud.version>
</properties>
<repositories>
<repository>
<id>spring-milestones</id>
<name>Spring Milestones</name>
<url>https://repo.spring.io/milestone</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
<version>2.0.1.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
</dependencies>
<build>
<finalName>${project.artifactId}</finalName>
<resources>
<resource>
<directory>src/main/java</directory>
<includes>
<include>**/*.properties</include>
<include>**/*.xml</include>
<include>**/*.yml</include>
</includes>
<filtering>false</filtering>
</resource>
<resource>
<directory>src/main/resources</directory>
<includes>
<include>**/*.*</include>
</includes>
<filtering>false</filtering>
</resource>
</resources>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>2.0.1.RELEASE</version>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
<configuration>
<layout>JAR</layout>
</configuration>
</plugin>
</plugins>
</build>
</project>application.yml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17server:
port: 7890
spring:
application:
name: canal_mq_consumer
cloud:
stream:
kafka:
binder:
brokers: kafka1:9092,kafka2:9092,kafka3:9092
default-binder: kafka
bindings:
canal_test:
destination: canal_test
group: canal_test_group
partitioned: false
content-type: application/jsonjava代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39package charles.org.stream.receive;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
@EnableBinding(Sink.class)
public class ReceiveService {
private static final Logger logger = LoggerFactory.getLogger(ReceiveService.class);
@StreamListener("canal_test")
public void receive(String payload) {
logger.info("接收到消息 {}", payload);
// logger.info("接收到消息 {}", new String((byte[])payload));
}
}
package charles.org.stream.receive;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
@EnableBinding(StreamBinders.class)
@SpringBootApplication
public class StreamApplication {
public static void main(String[] args) {
SpringApplication.run(StreamApplication.class, args);
}
}
package charles.org.stream.receive;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;
public interface StreamBinders {
@Input("canal_test")
SubscribableChannel canalTest();
}
(附) Kafka Docker器群
- 使用自定义网络,由于Kafka固定IP的需求,固定所有在编排的容器IP
下载kafka
1
curl -O http://mirrors.shu.edu.cn/apache/kafka/2.1.1/kafka_2.12-2.1.1.tgz
server.properties
listeners
一定要配置成为IP地址; 如果配置为localhost或服务器的hostname,在使用java发送数据时就会抛出异常:org.apache.kafka.common.errors.TimeoutException: Batch Expired
。因为在没有配置advertised.host.name
的情况下,Kafka并没有像官方文档宣称的那样改为广播我们配置的host.name
,而是广播了主机配置的hostname。远端的客户端并没有配置 hosts,所以自然是连接不上这个hostname的
node1 配置
server1.properties
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26broker.id=1
listeners=PLAINTEXT://192.168.111.115:9092
advertised.listeners=PLAINTEXT://192.168.111.115:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/opt/kafka/logs
num.partitions=1
num.recovery.threads.per.data.dir=1
delete.topic.enable=true
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=true
#zookeeper.connect=192.168.111.111:2181,192.168.111.112:2181,192.168.111.113:2181
zookeeper.connect=zk1:2181,zk2:2181,zk3:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
auto.leader.rebalance.enable=true
inter.broker.protocol.version=1.1.1
log.message.format.version=1.1.1node2 配置
server2.properties
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26broker.id=2
listeners=PLAINTEXT://192.168.111.116:9092
advertised.listeners=PLAINTEXT://192.168.111.116:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/opt/kafka/logs
num.partitions=1
num.recovery.threads.per.data.dir=1
delete.topic.enable=true
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=true
#zookeeper.connect=192.168.111.111:2181,192.168.111.112:2181,192.168.111.113:2181
zookeeper.connect=zk1:2181,zk2:2181,zk3:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
auto.leader.rebalance.enable=true
inter.broker.protocol.version=1.1.1
log.message.format.version=1.1.1node3 配置
server3.properties
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26broker.id=3
listeners=PLAINTEXT://192.168.111.117:9092
advertised.listeners=PLAINTEXT://192.168.111.117:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/opt/kafka/logs
num.partitions=1
num.recovery.threads.per.data.dir=1
delete.topic.enable=true
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=true
#zookeeper.connect=192.168.111.111:2181,192.168.111.112:2181,192.168.111.113:2181
zookeeper.connect=zk1:2181,zk2:2181,zk3:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
auto.leader.rebalance.enable=true
inter.broker.protocol.version=1.1.1
log.message.format.version=1.1.1kafka集群编排
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89version: '2'
services:
zk1:
build:
context: ./zk
dockerfile: ./Dockerfile
container_name: zk1
ports:
- "2181:2181"
networks:
kafka-cluster: # 使用kafka-cluster网络
ipv4_address: 192.168.111.111 #固定容器IP为192.168.111.111
volumes:
- ./zk/zoo.cfg:/zookeeper-3.4.13/conf/zoo.cfg
- ./zk/myid_1:/zookeeper-3.4.13/data/myid
zk2:
build:
context: ./zk
dockerfile: ./Dockerfile
container_name: zk2
ports:
- "2182:2181"
networks:
kafka-cluster: # 使用kafka-cluster网络
ipv4_address: 192.168.111.112 #固定容器IP为192.168.111.112
volumes:
- ./zk/zoo.cfg:/zookeeper-3.4.13/conf/zoo.cfg
- ./zk/myid_2:/zookeeper-3.4.13/data/myid
zk3:
build:
context: ./zk
dockerfile: ./Dockerfile
container_name: zk3
ports:
- "2183:2181"
networks:
kafka-cluster: # 使用kafka-cluster网络
ipv4_address: 192.168.111.113 #固定容器IP为192.168.111.113
volumes:
- ./zk/zoo.cfg:/zookeeper-3.4.13/conf/zoo.cfg
- ./zk/myid_3:/zookeeper-3.4.13/data/myid
kafka1:
build:
context: ./kafka
dockerfile: ./Dockerfile
container_name: kafka1
ports:
- "9092:9092"
networks:
kafka-cluster: # 使用kafka-cluster网络
ipv4_address: 192.168.111.115 #固定容器IP为192.168.111.115
volumes:
- ./kafka/server1.properties:/kafka_2.12-2.1.1/config/server.properties
kafka2:
build:
context: ./kafka
dockerfile: ./Dockerfile
container_name: kafka2
ports:
- "9192:9092"
networks:
kafka-cluster: # 使用kafka-cluster网络
ipv4_address: 192.168.111.116 #固定容器IP为192.168.111.116
volumes:
- ./kafka/server2.properties:/kafka_2.12-2.1.1/config/server.properties
kafka3:
build:
context: ./kafka
dockerfile: ./Dockerfile
container_name: kafka3
ports:
- "9292:9092"
networks:
kafka-cluster: # 使用kafka-cluster网络
ipv4_address: 192.168.111.117 #固定容器IP为192.168.111.117
volumes:
- ./kafka/server3.properties:/kafka_2.12-2.1.1/config/server.properties
networks:
kafka-cluster:
driver: bridge
# 自定义网络信息
ipam:
driver: default
config:
- subnet: 192.168.111.0/24 # 子网段
gateway: 192.168.111.1 # 网关测试
1
2
3
4
5
6
7
8
9
10
11
12
131.创建topic
docker exec -it dc730ee4db89 /kafka_2.12-2.1.1/bin/kafka-topics.sh --create --zookeeper zk1:2181 --replication-factor 2 --partitions 1 --topic charles_tpoic
WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
Created topic "charles_tpoic".
2.查看topic列表
docker exec -it dc730ee4db89 /kafka_2.12-2.1.1/bin/kafka-topics.sh --list --zookeeper zk1:2181
charles_tpoic
3.查看topic状态
docker exec -it dc730ee4db89 /kafka_2.12-2.1.1/bin/kafka-topics.sh --describe --zookeeper zk1:2181 --topic charles_tpoic
Topic:charles_tpoic PartitionCount:1 ReplicationFactor:2 Configs:
Topic: charles_tpoic Partition: 0 Leader: 3 Replicas: 3,1 Isr: 3,1
完整编排文件
目录结构
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16├── canal
│ ├── Dockerfile
│ ├── bootstrap.sh
│ ├── canal.deployer-1.1.2.tar.gz
├── docker-compose.yml
├── spring-cloud-stream
│ ├── Dockerfile
│ └── target
│ └── spring-cloud-stream.jar
└── zk
├── Dockerfile
├── myid_1
├── myid_2
├── myid_3
├── zoo.cfg
└── zookeeper-3.4.13.tar.gz使用外部kafka,MySQL,在编排中去除
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115version: '2'
services:
zk1:
build:
context: ./zk
dockerfile: ./Dockerfile
container_name: zk1
ports:
- "2181:2181"
networks:
- canal-cluster
volumes:
- ./zk/zoo.cfg:/zookeeper-3.4.13/conf/zoo.cfg
- ./zk/myid_1:/zookeeper-3.4.13/data/myid
zk2:
build:
context: ./zk
dockerfile: ./Dockerfile
container_name: zk2
ports:
- "2182:2181"
networks:
- canal-cluster
volumes:
- ./zk/zoo.cfg:/zookeeper-3.4.13/conf/zoo.cfg
- ./zk/myid_2:/zookeeper-3.4.13/data/myid
zk3:
build:
context: ./zk
dockerfile: ./Dockerfile
container_name: zk3
ports:
- "2183:2181"
networks:
- canal-cluster
volumes:
- ./zk/zoo.cfg:/zookeeper-3.4.13/conf/zoo.cfg
- ./zk/myid_3:/zookeeper-3.4.13/data/myid
canal1:
build:
context: ./canal
dockerfile: ./Dockerfile
container_name: canal1
ports:
- "11111:11111"
- "11112:11112"
depends_on:
- zk1
- zk2
- zk3
networks:
- canal-cluster
environment:
- canal.serverMode=kafka
- canal.mq.topic=canal_test
- canal.mq.servers=10.72.8.91:9192,10.72.8.92:9192,10.72.8.93:9192
- canal.mq.flatMessage=true
- canal.mq.acks=all
- canal.zkServers=zk1:2181,zk2:2181,zk3:2181
- canal.auto.scan=false
- canal.destinations=canal
- canal.instance.master.address=10.72.29.130:3308
- canal.instance.dbUsername=canal
- canal.instance.dbPassword=canal
- canal.instance.connectionCharset=UTF-8
- canal.instance.tsdb.enable=true
- canal.instance.gtidon=false
canal2:
build:
context: ./canal
dockerfile: ./Dockerfile
container_name: canal2
ports:
- "11121:11111"
- "11122:11112"
depends_on:
- zk1
- zk2
- zk3
networks:
- canal-cluster
environment:
- canal.serverMode=kafka
- canal.mq.topic=canal_test
- canal.mq.servers=10.72.8.91:9192,10.72.8.92:9192,10.72.8.93:9192
- canal.mq.flatMessage=true
- canal.mq.acks=all
- canal.zkServers=zk1:2181,zk2:2181,zk3:2181
- canal.auto.scan=false
- canal.destinations=canal
- canal.instance.master.address=10.72.29.130:3308
- canal.instance.dbUsername=canal
- canal.instance.dbPassword=canal
- canal.instance.connectionCharset=UTF-8
- canal.instance.tsdb.enable=true
- canal.instance.gtidon=false
mq-consumer:
build:
context: ./spring-cloud-stream
dockerfile: ./Dockerfile
container_name: mq_consumer
depends_on:
- zk1
- zk2
- zk3
- canal1
- canal2
networks:
- canal-cluster
networks:
canal-cluster:
driver: bridge