公司希望将MySQLA库的数据实时同步到B库中,我想通过fink1.11的CDC功能不知道是否可行。
在做测试的时候定义一张cdc源表和一张sink表
CREATE TABLE pvuv_test (
  id INT,
  dt STRING,
  pv STRING,
  uv STRING ,
  proc_time AS PROCTIME() --使用维表时需要指定该字段
) WITH (
  'connector' = 'mysql-cdc', -- 连接器
  'hostname' = 'localhost',   --mysql地址
  'port' = '3306',  -- mysql端口
  'username' = 'root',  --mysql用户名
  'password' = 'rootzs',  -- mysql密码
  'database-name' = 'etc_demo', --  数据库名称
  'table-name' = 'puuv_test'
);
CREATE TABLE pvuv_test_back (
  id INT,
  dt STRING,
  pv STRING,
  uv STRING ,
  proc_time AS PROCTIME() --使用维表时需要指定该字段
) WITH (
  'connector' = 'mysql-cdc', -- 连接器
  'hostname' = 'localhost',   --mysql地址
  'port' = '3306',  -- mysql端口
  'username' = 'root',  --mysql用户名
  'password' = 'rootzs',  -- mysql密码
  'database-name' = 'etc_demo', --  数据库名称
  'table-name' = 'puuv_test_back'
);
但是在通过SQL Client执行下面语句的时候,报错
INSERT INTO pvuv_test_back
SELECT * FROM pvuv_test;
---------------------------------
报错信息如下
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: Could not find any factory for 
identifier 'mysql-cdc' that implements 
'org.apache.flink.table.factories.DynamicTableSinkFactory' in the classpath.
Available factory identifiers are:
blackhole
elasticsearch-6
kafka
print


Flink/lib 目录下已经有mysql-cdc的jar包 不知道问题出现在哪里


最后对MySQL-MySQL数据实时同步的需求 不知道大家还有什么其他的方案或者想法。感谢

回复