Re: FlinkCDC可以读到MySQL变更数据,但是插不到新的MySQL表里

2022-11-29 文章 Leonard Xu


> On Nov 4, 2022, at 2:34 PM, 左岩 <13520871...@163.com> wrote:
> 
> tenv.executeSql("xxx);
> env.execute();


这样使用是不对的,你可以看下这两个方法的java doc

祝好,
Leonard

Re:Re: 回复: FlinkCDC可以读到MySQL变更数据,但是插不到新的MySQL表里

2022-11-28 文章 左岩
CDC是自己编译的2.3,对应flink1.14的版本,还有一个问题是,可以读到变更数据。比如11点30写入mysql,flinkcdc读出来要慢几分钟,5~7分钟之后才能读到新写入或者变更的数据,第二个问题就行,变更数据插不到另外一张mysql表里

















在 2022-11-07 10:11:56,"Shengkai Fang"  写道:
>你用的是哪个版本的 cdc,增量部分的数据需要全量的部分读完才能进行。
>
>Best,
>Shengkai
>
>左岩 <13520871...@163.com> 于2022年11月4日周五 17:58写道:
>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> .print(); 去掉也不行,
>>  
>> 跟这个也没有关系(一个tableEnv要执行2个DML),而且现在有一个现象,就是现在改了mysql表数据,flinkcdc得等好久才能读到变化数据,这是什么情况呢
>>
>>
>>
>>
>>
>>
>>
>>
>> 在 2022-11-04 16:52:08,"yinghua...@163.com"  写道:
>>
>> >你这个是在一个tableEnv要执行2个DML,要使用StatementSet保存2个DML语句,在StatementSet上执行exuectute,如下
>> >StatementSet statementSet = tenv.createStatementSet();
>> >statementSet.addInsertSql(sql1);
>> >statementSet.addInsertSql(sql2);
>> >TableResult result = statementSet.execute();
>> >result.getJobClient().get().getJobID().toString();
>> >
>> >
>> >或者你把打印的那个任务给去掉,看能否将数据插入到目的的mysql中。
>> >// 查询
>> >tenv.executeSql("select * from flink_t_stu").print();
>>  这个任务给去掉
>> >
>> >
>> >
>> >yinghua...@163.com
>> >
>> >发件人: 左岩
>> >发送时间: 2022-11-04 14:34
>> >收件人: user-zh
>> >主题: FlinkCDC可以读到MySQL变更数据,但是插不到新的MySQL表里
>> >用FlinkCDC可以读到MySQL变更数据,但是插不到新的MySQL表里,两个表都有主键,表结构相同
>> >代码如下:控制台打印情况见附件
>> >public static void main(String[] args) throws Exception {
>> >Configuration conf = new Configuration();
>> >conf.setInteger("rest.port", 10041);
>> >StreamExecutionEnvironment env =
>> StreamExecutionEnvironment.getExecutionEnvironment(conf);
>> >StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
>> >
>> >env.enableCheckpointing(60 * 1000 * 5, CheckpointingMode.EXACTLY_ONCE);
>> >env.getCheckpointConfig().setCheckpointStorage("file:///d:/checkpoint3");
>> >
>> >env.setParallelism(1);
>> >// 建表
>> >tenv.executeSql("CREATE TABLE flink_t_stu ( " +
>> >"  userid INT, " +
>> >"  username string, " +
>> >"  age string, " +
>> >"  `partition` INT, " +
>> >" PRIMARY KEY(userid) NOT ENFORCED " +
>> >" ) WITH ( " +
>> >" 'connector' = 'mysql-cdc', " +
>> >" 'server-id' = '5401-5404', " +
>> >" 'scan.startup.mode' = 'latest-offset', " +
>> >//" 'scan.startup.mode' = 'earliest-offset', " +
>> >" 'hostname' = '192.168.0.220', " +
>> >" 'port' = '3306', " +
>> >" 'username' = 'root', " +
>> >" 'password' = 'root', " +
>> >" 'database-name' = 'zy', " +
>> >" 'table-name' = 't_stu' " +
>> >")");
>> >
>> >// 查询
>> >tenv.executeSql("select * from flink_t_stu").print();
>> >
>> >
>> >// 建一个目标表,用来存放查询结果
>> >tenv.executeSql(
>> >"CREATE TABLE flink_t_stu2 ( " +
>> >"  userid INT, " +
>> >"  username string, " +
>> >"  age string, " +
>> >"  `partition` INT, " +
>> >" PRIMARY KEY(userid) NOT ENFORCED " +
>> >" ) WITH ( " +
>> >"  'connector' = 'jdbc', " +
>> >"  'url' = 'jdbc:mysql://192.168.0.220:3306/zy', " +
>> >"  'table-name' = 't_stu2', " +
>> >"  'username' = 'root', " +
>> >"  'password' = 'root'  " +
>> >")"
>> >);
>> >
>> >tenv.executeSql("INSERT INTO flink_t_stu2 " +
>> >"SELECT * FROM flink_t_stu");
>> >env.execute();
>> >
>> >}
>>


Re: 回复: FlinkCDC可以读到MySQL变更数据,但是插不到新的MySQL表里

2022-11-06 文章 Shengkai Fang
你用的是哪个版本的 cdc,增量部分的数据需要全量的部分读完才能进行。

Best,
Shengkai

左岩 <13520871...@163.com> 于2022年11月4日周五 17:58写道:

>
>
>
>
>
>
>
>
>
> .print(); 去掉也不行,
>  
> 跟这个也没有关系(一个tableEnv要执行2个DML),而且现在有一个现象,就是现在改了mysql表数据,flinkcdc得等好久才能读到变化数据,这是什么情况呢
>
>
>
>
>
>
>
>
> 在 2022-11-04 16:52:08,"yinghua...@163.com"  写道:
>
> >你这个是在一个tableEnv要执行2个DML,要使用StatementSet保存2个DML语句,在StatementSet上执行exuectute,如下
> >StatementSet statementSet = tenv.createStatementSet();
> >statementSet.addInsertSql(sql1);
> >statementSet.addInsertSql(sql2);
> >TableResult result = statementSet.execute();
> >result.getJobClient().get().getJobID().toString();
> >
> >
> >或者你把打印的那个任务给去掉,看能否将数据插入到目的的mysql中。
> >// 查询
> >tenv.executeSql("select * from flink_t_stu").print();
>  --------这个任务给去掉
> >
> >
> >
> >yinghua...@163.com
> >
> >发件人: 左岩
> >发送时间: 2022-11-04 14:34
> >收件人: user-zh
> >主题: FlinkCDC可以读到MySQL变更数据,但是插不到新的MySQL表里
> >用FlinkCDC可以读到MySQL变更数据,但是插不到新的MySQL表里,两个表都有主键,表结构相同
> >代码如下:控制台打印情况见附件
> >public static void main(String[] args) throws Exception {
> >Configuration conf = new Configuration();
> >conf.setInteger("rest.port", 10041);
> >StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment(conf);
> >StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
> >
> >env.enableCheckpointing(60 * 1000 * 5, CheckpointingMode.EXACTLY_ONCE);
> >env.getCheckpointConfig().setCheckpointStorage("file:///d:/checkpoint3");
> >
> >env.setParallelism(1);
> >// 建表
> >tenv.executeSql("CREATE TABLE flink_t_stu ( " +
> >"  userid INT, " +
> >"  username string, " +
> >"  age string, " +
> >"  `partition` INT, " +
> >" PRIMARY KEY(userid) NOT ENFORCED " +
> >" ) WITH ( " +
> >" 'connector' = 'mysql-cdc', " +
> >" 'server-id' = '5401-5404', " +
> >" 'scan.startup.mode' = 'latest-offset', " +
> >//" 'scan.startup.mode' = 'earliest-offset', " +
> >" 'hostname' = '192.168.0.220', " +
> >" 'port' = '3306', " +
> >" 'username' = 'root', " +
> >" 'password' = 'root', " +
> >" 'database-name' = 'zy', " +
> >" 'table-name' = 't_stu' " +
> >")");
> >
> >// 查询
> >tenv.executeSql("select * from flink_t_stu").print();
> >
> >
> >// 建一个目标表,用来存放查询结果
> >tenv.executeSql(
> >"CREATE TABLE flink_t_stu2 ( " +
> >"  userid INT, " +
> >"  username string, " +
> >"  age string, " +
> >"  `partition` INT, " +
> >" PRIMARY KEY(userid) NOT ENFORCED " +
> >" ) WITH ( " +
> >"  'connector' = 'jdbc', " +
> >"  'url' = 'jdbc:mysql://192.168.0.220:3306/zy', " +
> >"  'table-name' = 't_stu2', " +
> >"  'username' = 'root', " +
> >"  'password' = 'root'  " +
> >")"
> >);
> >
> >tenv.executeSql("INSERT INTO flink_t_stu2 " +
> >"SELECT * FROM flink_t_stu");
> >env.execute();
> >
> >}
>


Re:回复: FlinkCDC可以读到MySQL变更数据,但是插不到新的MySQL表里

2022-11-04 文章 左岩









.print(); 去掉也不行,   
跟这个也没有关系(一个tableEnv要执行2个DML),而且现在有一个现象,就是现在改了mysql表数据,flinkcdc得等好久才能读到变化数据,这是什么情况呢








在 2022-11-04 16:52:08,"yinghua...@163.com"  写道:
>你这个是在一个tableEnv要执行2个DML,要使用StatementSet保存2个DML语句,在StatementSet上执行exuectute,如下
>StatementSet statementSet = tenv.createStatementSet();
>statementSet.addInsertSql(sql1);
>statementSet.addInsertSql(sql2);
>TableResult result = statementSet.execute();
>result.getJobClient().get().getJobID().toString();
>
>
>或者你把打印的那个任务给去掉,看能否将数据插入到目的的mysql中。
>// 查询
>tenv.executeSql("select * from flink_t_stu").print();   
>这个任务给去掉
>
>
>
>yinghua...@163.com
> 
>发件人: 左岩
>发送时间: 2022-11-04 14:34
>收件人: user-zh
>主题: FlinkCDC可以读到MySQL变更数据,但是插不到新的MySQL表里
>用FlinkCDC可以读到MySQL变更数据,但是插不到新的MySQL表里,两个表都有主键,表结构相同
>代码如下:控制台打印情况见附件
>public static void main(String[] args) throws Exception {
>Configuration conf = new Configuration();
>conf.setInteger("rest.port", 10041);
>StreamExecutionEnvironment env = 
>StreamExecutionEnvironment.getExecutionEnvironment(conf);
>StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
> 
>env.enableCheckpointing(60 * 1000 * 5, CheckpointingMode.EXACTLY_ONCE);
>env.getCheckpointConfig().setCheckpointStorage("file:///d:/checkpoint3");
> 
>env.setParallelism(1);
>// 建表
>tenv.executeSql("CREATE TABLE flink_t_stu ( " +
>"  userid INT, " +
>"  username string, " +
>"  age string, " +
>"  `partition` INT, " +
>" PRIMARY KEY(userid) NOT ENFORCED " +
>" ) WITH ( " +
>" 'connector' = 'mysql-cdc', " +
>" 'server-id' = '5401-5404', " +
>" 'scan.startup.mode' = 'latest-offset', " +
>//" 'scan.startup.mode' = 'earliest-offset', " +
>" 'hostname' = '192.168.0.220', " +
>" 'port' = '3306', " +
>" 'username' = 'root', " +
>" 'password' = 'root', " +
>" 'database-name' = 'zy', " +
>" 'table-name' = 't_stu' " +
>")");
> 
>// 查询
>tenv.executeSql("select * from flink_t_stu").print();
> 
> 
>// 建一个目标表,用来存放查询结果
>tenv.executeSql(
>"CREATE TABLE flink_t_stu2 ( " +
>"  userid INT, " +
>"  username string, " +
>"  age string, " +
>"  `partition` INT, " +
>" PRIMARY KEY(userid) NOT ENFORCED " +
>" ) WITH ( " +
>"  'connector' = 'jdbc', " +
>"  'url' = 'jdbc:mysql://192.168.0.220:3306/zy', " +
>"  'table-name' = 't_stu2', " +
>"  'username' = 'root', " +
>"  'password' = 'root'  " +
>")"
>);
> 
>tenv.executeSql("INSERT INTO flink_t_stu2 " +
>"SELECT * FROM flink_t_stu");
>env.execute();
> 
>}


FlinkCDC可以读到MySQL变更数据,但是插不到新的MySQL表里

2022-11-04 文章 左岩
用FlinkCDC可以读到MySQL变更数据,但是插不到新的MySQL表里,两个表都有主键,表结构相同
代码如下:控制台打印情况见附件
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
conf.setInteger("rest.port", 10041);
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment(conf);
StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

env.enableCheckpointing(60 * 1000 * 5, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointStorage("file:///d:/checkpoint3");

env.setParallelism(1);
// 建表
tenv.executeSql("CREATE TABLE flink_t_stu ( " +
"  userid INT, " +
"  username string, " +
"  age string, " +
"  `partition` INT, " +
" PRIMARY KEY(userid) NOT ENFORCED " +
" ) WITH ( " +
" 'connector' = 'mysql-cdc', " +
" 'server-id' = '5401-5404', " +
" 'scan.startup.mode' = 'latest-offset', " +
//" 'scan.startup.mode' = 'earliest-offset', " +
" 'hostname' = '192.168.0.220', " +
" 'port' = '3306', " +
" 'username' = 'root', " +
" 'password' = 'root', " +
" 'database-name' = 'zy', " +
" 'table-name' = 't_stu' " +
")");

// 查询
tenv.executeSql("select * from flink_t_stu").print();


// 建一个目标表,用来存放查询结果
tenv.executeSql(
"CREATE TABLE flink_t_stu2 ( " +
"  userid INT, " +
"  username string, " +
"  age string, " +
"  `partition` INT, " +
" PRIMARY KEY(userid) NOT ENFORCED " +
" ) WITH ( " +
"  'connector' = 'jdbc', " +
"  'url' = 'jdbc:mysql://192.168.0.220:3306/zy', " +
"  'table-name' = 't_stu2', " +
"  'username' = 'root', " +
"  'password' = 'root'  " +
")"
);

tenv.executeSql("INSERT INTO flink_t_stu2 " +
"SELECT * FROM flink_t_stu");
env.execute();

}