0%

Canal

Canal

**canal [kə’næl]**,译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费

https://github.com/alibaba/canal

https://github.com/alibaba/canal/wiki/ClientAPI

Linux中Canal环境配置

查看MySQL是否开启log_bin

1
2
3
4
5
6
7
8
mysql> show variables like 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin | OFF |
+---------------+-------+

1 row in set (0.00 sec)

如果log_bin关闭,修改MySQL配置开启log_bin

  1. 修改 mysql 的配置文件 my.cnf
1
vi /etc/my.cnf 

追加内容

1
2
3
4
5
6
#binlog文件名
log-bin=mysql-bin
#选择row模式
binlog_format=ROW
#mysql实例id,不能和canal的slaveId重复
server_id=1
  2. 重启 mysql:
1
service mysql restart        
  3. 登录 mysql 客户端,查看 log_bin 变量
1
2
3
4
5
6
7
mysql> show variables like 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin | ON|
+---------------+-------+
1 row in set (0.00 sec)

下载canal,将canal安装到Linux,修改canal的配置文件,canal解压后的目录结构如图所示

image-20220425113106046
  1. 解压文件
1
tar -zxvf canal.deployer-1.1.4.tar.gz
  2. 解压后进入canal的配置文件,修改配置文件 vi conf/example/instance.properties
1
2
3
4
5
6
7
8
9
#需要改成自己的数据库信息
canal.instance.master.address=192.168.159.160:3306
#需要改成自己的数据库用户名与密码
canal.instance.dbUsername=root
canal.instance.dbPassword=1234
#需要改成同步的数据库表规则
#同步所有的表
canal.instance.filter.regex=.*\\..*
#只同步指定的数据表
#canal.instance.filter.regex=guli_ucenter.ucenter_member
  3. 启动canal和关闭canal
1
2
3
cd ./bin
./startup.sh
./stop.sh

客户端代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
package cn.joinhealth.canalcliennt;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;
import org.springframework.stereotype.Component;
import java.net.InetSocketAddress;
import java.util.List;

/**
 * Minimal Canal client: connects to a Canal server, subscribes to a schema filter and
 * prints every row change it receives to standard output.
 *
 * <p>The polling loop in {@link #run()} ends after a fixed number of consecutive empty
 * fetches, or as soon as the polling thread is interrupted.
 */
@Component
public class CanalClient {

    /** Maximum number of entries requested from the server per fetch. */
    private static final int BATCH_SIZE = 1000;

    /** Number of consecutive empty fetches after which the loop gives up. */
    private static final int MAX_EMPTY_POLLS = 120;

    /** Pause between two fetches when the previous fetch returned nothing. */
    private static final long EMPTY_POLL_SLEEP_MILLIS = 1000L;

    /**
     * Connects to the Canal server, then repeatedly fetches batches and prints them.
     *
     * <p>Every fetched batch is acknowledged after it has been printed. The connector is
     * always disconnected before this method returns. If the thread is interrupted while
     * waiting between empty fetches, the loop stops and the thread's interrupt status is
     * restored before returning.
     */
    public void run() {
        // Create the connection.
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("192.168.3.206", 11111), "example", "", "");
        int emptyCount = 0;
        boolean interrupted = false;
        try {
            connector.connect();
            // connector.subscribe(".*\\..*");
            connector.subscribe("sf_dev_25.*");
            connector.rollback();
            while (!interrupted && emptyCount < MAX_EMPTY_POLLS) {
                // Fetch up to BATCH_SIZE entries without acknowledging them yet.
                Message message = connector.getWithoutAck(BATCH_SIZE);
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    emptyCount++;
                    try {
                        Thread.sleep(EMPTY_POLL_SLEEP_MILLIS);
                    } catch (InterruptedException e) {
                        // Stop polling; the interrupt status is re-asserted in the
                        // finally block, after the remaining connector calls have run.
                        interrupted = true;
                    }
                } else {
                    emptyCount = 0;
                    System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
                    printEntry(message.getEntries());
                }
                connector.ack(batchId); // commit / acknowledge the batch
                // connector.rollback(batchId); // on processing failure, roll the batch back
            }
            if (interrupted) {
                System.out.println("interrupted, exit");
            } else {
                System.out.println("empty too many times, exit");
            }
        } finally {
            try {
                connector.disconnect();
            } finally {
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /**
     * Prints the row changes contained in the given entries.
     *
     * <p>Transaction begin/end entries are skipped. For DELETE events the "before" columns
     * are printed, for INSERT events the "after" columns, and for every other event type
     * both.
     *
     * @param entrys entries of one fetched batch
     * @throws RuntimeException if an entry's stored value cannot be parsed as a row change
     */
    private static void printEntry(List<Entry> entrys) {
        for (Entry entry : entrys) {
            EntryType entryType = entry.getEntryType();
            if (entryType == EntryType.TRANSACTIONBEGIN || entryType == EntryType.TRANSACTIONEND) {
                continue;
            }

            RowChange rowChange;
            try {
                rowChange = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException(
                        "ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
            }

            EventType eventType = rowChange.getEventType();
            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));

            for (RowData rowData : rowChange.getRowDatasList()) {
                if (eventType == EventType.DELETE) {
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == EventType.INSERT) {
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    System.out.println("-------> before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------> after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }

    /**
     * Prints one line per column: its name, its value and its "updated" flag.
     *
     * @param columns columns of a single row image
     */
    private static void printColumn(List<Column> columns) {
        for (Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package cn.joinhealth.canalcliennt;

import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import javax.annotation.Resource;

/**
 * Spring Boot entry point of the Canal client demo.
 *
 * <p>Implements {@link CommandLineRunner} so that the Canal listener is started from
 * {@link #run(String...)} when the project starts.
 */
@SpringBootApplication
public class CanalClienntApplication implements CommandLineRunner {

    /** Canal client whose listener loop is started from {@link #run(String...)}. */
    @Resource
    private CanalClient canalClient;

    /**
     * Starts the Canal client listener on project startup.
     *
     * @param strings command-line arguments; not used here
     * @throws Exception declared by the overridden method's signature
     */
    @Override
    public void run(String... strings) throws Exception {
        canalClient.run();
    }

    /**
     * Launches the Spring application.
     *
     * @param args command-line arguments forwarded to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(CanalClienntApplication.class, args);
    }
}

效果

image-20220425113033804