Hi, chenxuying

看你还是用的还是 "  'connector.type' = 'jdbc', ….  " 
,这是老的option,使用老的option参数还是需要根据query推导主键,
需要使用新的属性[1]:" 'connector' = 'jdbc’,…." 才能配合 主键 决定 upsert 模式.
 
Best
Leonard
[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/connectors/jdbc.html#connector-options
 
<https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/connectors/jdbc.html#connector-options>

> 在 2020年7月31日,16:12,chenxuying <cxydeve...@163.com> 写道:
> 
> hi
> 我使用的flink 1.11.0版本
> 代码如下
> StreamExecutionEnvironment streamEnv = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> TableEnvironment tableEnvironment = StreamTableEnvironment.create(streamEnv);
> tableEnvironment.executeSql(" " +
> " CREATE TABLE mySource ( " +
> "  a bigint, " +
> "  b bigint " +
> " ) WITH ( " +
> "  'connector.type' = 'kafka', " +
> "  'connector.version' = 'universal', " +
> "  'connector.topic' = 'mytesttopic', " +
> "  'connector.properties.zookeeper.connect' = '172.17.0.2:2181', " +
> "  'connector.properties.bootstrap.servers' = '172.17.0.2:9092', " +
> "  'connector.properties.group.id' = 'flink-test-cxy', " +
> "  'connector.startup-mode' = 'latest-offset', " +
> "  'format.type' = 'json' " +
> " ) ");
> tableEnvironment.executeSql("CREATE TABLE mysqlsink ( " +
> "     id bigint, " +
> "  game_id varchar, " +
> "  PRIMARY KEY (id) NOT ENFORCED      " +
> " )  " +
> " with ( " +
> "  'connector.type' = 'jdbc',   " +
> "  'connector.url' = 'jdbc:mysql://47.99.181.86:3306/flinksql?useSSL=false' , 
> " +
> "  'connector.username' = 'root' , " +
> "  'connector.password' = 'root',  " +
> "  'connector.table' = 'mysqlsink' , " +
> "  'connector.driver' = 'com.mysql.cj.jdbc.Driver' , " +
> "  'connector.write.flush.interval' = '2s',  " +
> "  'connector.write.flush.max-rows' = '300'  " +
> " )");
> tableEnvironment.executeSql("insert into mysqlsink (`id`,`game_id`) values 
> (select a,cast(b as varchar) b from mySource)");
> 
> 
> 问题一 : 上面的insert语句会出现如下错误
> Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Cannot 
> apply '$SCALAR_QUERY' to arguments of type '$SCALAR_QUERY(<RECORDTYPE(BIGINT 
> A, VARCHAR(2147483647) B)>)'. Supported form(s): 
> '$SCALAR_QUERY(<RECORDTYPE(SINGLE FIELD)>)'
> 
> 
> 问题二 : 如果insert改成 tableEnvironment.executeSql("insert into mysqlsink select 
> a,cast(b as varchar) b from mySource"); 是可以运行的,但是出现唯一主键重复时会报错
> Caused by: java.sql.SQLIntegrityConstraintViolationException: Duplicate entry 
> '1' for key 'PRIMARY'
> 
> 
> 

回复