[jira] [Commented] (FLINK-17459) JDBCAppendTableSink not support flush by flushIntervalMills
[ https://issues.apache.org/jira/browse/FLINK-17459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17100533#comment-17100533 ] ranqiqiang commented on FLINK-17459: Thanks [~jark], I get it. What I want the sink to do is something like:
{code:sql}
INSERT ... ON DUPLICATE KEY UPDATE
  id         = IF(VALUES(modiy_time) >= modiy_time, VALUES(id), id),
  name       = IF(VALUES(modiy_time) >= modiy_time, VALUES(name), name),
  ...
  modiy_time = IF(VALUES(modiy_time) >= modiy_time, VALUES(modiy_time), modiy_time)
{code}
The modiy_time column acts like a version field.

> JDBCAppendTableSink not support flush by flushIntervalMills
> -----------------------------------------------------------
>
> Key: FLINK-17459
> URL: https://issues.apache.org/jira/browse/FLINK-17459
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / JDBC
> Affects Versions: 1.10.0
> Reporter: ranqiqiang
> Priority: Major
>
> JDBCAppendTableSink only supports flushing via "JDBCAppendTableSinkBuilder#batchSize"; there is no equivalent of "JDBCUpsertTableSink#flushIntervalMills".
>
> If batchSize=5000 and the input has 5000*N+1 rows, the last record is never flushed!

--
This message was sent by Atlassian Jira (v8.3.4#803005)
[ https://issues.apache.org/jira/browse/FLINK-17459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17100519#comment-17100519 ] Jark Wu commented on FLINK-17459: The suggested way is to create the JDBC table using DDL.
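For reference, a minimal sketch of that DDL-based approach on Flink 1.10, using the documented JDBC connector properties; the table name, columns, and connection values below are placeholder assumptions, not taken from this issue:
{code:sql}
CREATE TABLE jdbc_sink (
  order_id BIGINT,
  user_id  BIGINT,
  status   STRING
) WITH (
  'connector.type'     = 'jdbc',
  'connector.url'      = 'jdbc:mysql://localhost:3306/mydb',
  'connector.table'    = 'orders',
  'connector.username' = 'user',
  'connector.password' = 'pass',
  -- the sink flushes when either threshold is reached,
  -- so a partial batch is written out after the interval
  'connector.write.flush.max-rows' = '5000',
  'connector.write.flush.interval' = '3s'
)
{code}
With the table declared this way, a plain {{INSERT INTO jdbc_sink SELECT ...}} via {{sqlUpdate}} uses the flush interval without touching the deprecated {{registerTableSink}} API.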
[ https://issues.apache.org/jira/browse/FLINK-17459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17100513#comment-17100513 ] ranqiqiang commented on FLINK-17459: Thanks [~jark]: I used StreamTableEnvironment, but registerTableSink is deprecated!
{code:java}
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, bsSettings);

// DemoSource extends RichSourceFunction {...}
DemoSource source = new DemoSource();
DataStreamSource streamSource = env.addSource(source);
tableEnv.createTemporaryView("test", streamSource);

// @Deprecated
tableEnv.registerTableSink("jdbc_sink", sink);
// Is there any other method, or maybe I used it wrong?
{code}
[ https://issues.apache.org/jira/browse/FLINK-17459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17100442#comment-17100442 ] Jark Wu commented on FLINK-17459: Hi [~michael ran], here is an example:
{code:java}
tableEnv.createTemporaryView("test", streamSource);

JDBCUpsertTableSink sink = JDBCUpsertTableSink.builder()
    .setOptions(options)
    .setTableSchema(schema)
    .setFlushIntervalMills(3000)
    .build();

tableEnv.registerTableSink("jdbc_sink", sink);
tableEnv.sqlUpdate("INSERT INTO jdbc_sink SELECT order_id, user_id, status FROM test");
tableEnv.execute("jdbc_sink_job"); // execute(jobName) in Flink 1.10
{code}
[ https://issues.apache.org/jira/browse/FLINK-17459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17100433#comment-17100433 ] ranqiqiang commented on FLINK-17459: [~jark] could you give an example of using JDBCUpsertTableSink to do an append?
[ https://issues.apache.org/jira/browse/FLINK-17459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17096221#comment-17096221 ] ranqiqiang commented on FLINK-17459: Thanks, maybe I did it wrong! I use JDBCUpsertTableSink to append, with code like:
{code:java}
tableEnv.createTemporaryView("test", streamSource);
Table appendTable = tableEnv.sqlQuery("select order_id, user_id, status from test");
DataStream stream = tableEnv.toAppendStream(appendTable, Row.class);

JDBCUpsertTableSink sink = JDBCUpsertTableSink.builder()
    .setOptions(options)
    .setTableSchema(schema)
    .setFlushIntervalMills(3000)
    .build();
// error: java.lang.UnsupportedOperationException: JDBCUpsertTableSink can not support

// If I add "sink.setIsAppendOnly(true)":
// error: org.apache.flink.types.Row cannot be cast to org.apache.flink.api.java.tuple.Tuple2

// So I do this instead, and the result is OK:
DataStream retractStream = tableEnv.toRetractStream(appendTable, Row.class);
sink.setIsAppendOnly(true);

// I just want to insert/append data. I don't know where the framework sets this:
// if (!isAppendOnly && (keyFields == null || keyFields.length == 0)) {
//     throw new UnsupportedOperationException("JDBCUpsertTableSink can not support ");
// }
{code}
[ https://issues.apache.org/jira/browse/FLINK-17459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17096120#comment-17096120 ] Jark Wu commented on FLINK-17459:
{code:java}
sink.setIsAppendOnly(true);
sink.setKeyFields(new String[]{"id"});
{code}
These are invoked by the framework; you don't need to call them yourself. The sink can be used with both append-only and updating queries. Yes, JDBCAppendTableSink will be removed in the future.
[ https://issues.apache.org/jira/browse/FLINK-17459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17096029#comment-17096029 ] ranqiqiang commented on FLINK-17459: 1. Will JDBCAppendTableSink be abandoned? 2. JDBCUpsertTableSink does not support setQuery. How can JDBCUpsertTableSink do appending the way JDBCAppendTableSink does? I could not find any documentation!
[ https://issues.apache.org/jira/browse/FLINK-17459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17095458#comment-17095458 ] Jark Wu commented on FLINK-17459: We suggest using {{JDBCUpsertTableSink}} in the Table API & SQL; it supports both append-only queries and updating queries.