[ https://issues.apache.org/jira/browse/FLINK-17459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17096221#comment-17096221 ]
ranqiqiang commented on FLINK-17459:
------------------------------------

Thanks. Maybe I did something wrong! I used JDBCUpsertTableSink to append data, with code like this:

{code:java}
tableEnv.createTemporaryView("test", streamSource);
Table appendTable = tableEnv.sqlQuery("select order_id,user_id,status from test");
DataStream stream = tableEnv.toAppendStream(appendTable, Row.class);

JDBCUpsertTableSink sink = JDBCUpsertTableSink.builder()
    .setOptions(options)
    .setTableSchema(schema)
    .setFlushIntervalMills(3000)
    .build();
// error: java.lang.UnsupportedOperationException: JDBCUpsertTableSink can not support
// If I add "sink.setIsAppendOnly(true)":
// error: org.apache.flink.types.Row cannot be cast to org.apache.flink.api.java.tuple.Tuple2

// So I do this instead:
DataStream stream = tableEnv.toRetractStream(appendTable, Row.class);
sink.setIsAppendOnly(true);
// The result is OK! I just want to insert/append data.
// I don't know where the framework is supposed to set isAppendOnly, given this check:
if (!isAppendOnly && (keyFields == null || keyFields.length == 0)) {
    throw new UnsupportedOperationException("JDBCUpsertTableSink can not support ");
}
{code}

> JDBCAppendTableSink not support flush by flushIntervalMills
> --------------------------------------------------------------
>
>                 Key: FLINK-17459
>                 URL: https://issues.apache.org/jira/browse/FLINK-17459
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / JDBC
>    Affects Versions: 1.10.0
>            Reporter: ranqiqiang
>            Priority: Major
>
> {{JDBCAppendTableSink only supports flushing by "JDBCAppendTableSinkBuilder#batchSize";}} {{it does not support a flush interval like "JDBCUpsertTableSink#flushIntervalMills".}}
>
> {{If batchSize=5000 and my data has 5000*N+1 rows, the last record is never appended!}}
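
To make the arithmetic in the issue description concrete, here is a minimal, self-contained sketch of a sink that flushes only when its buffer reaches batchSize. It is not Flink's JDBC sink code: the class {{BatchFlushSketch}} and its methods are hypothetical names invented for illustration, and the explicit {{flush()}} call at the end merely stands in for what a periodic timer (as with flushIntervalMills) would presumably do.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a count-based batching sink; not Flink code. */
public class BatchFlushSketch {
    private final int batchSize;
    private final List<Integer> buffer = new ArrayList<>();
    private int written = 0;

    public BatchFlushSketch(int batchSize) {
        this.batchSize = batchSize;
    }

    /** Buffers one row and flushes only when the buffer reaches batchSize. */
    public void add(int row) {
        buffer.add(row);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    /** Writes out whatever is buffered; a flush timer would call this periodically. */
    public void flush() {
        written += buffer.size();
        buffer.clear();
    }

    public int written() {
        return written;
    }

    public int pending() {
        return buffer.size();
    }

    public static void main(String[] args) {
        BatchFlushSketch sink = new BatchFlushSketch(5000);
        int rows = 5000 * 3 + 1; // 5000*N+1 with N = 3
        for (int i = 0; i < rows; i++) {
            sink.add(i);
        }
        // Count-based flushing alone leaves the last row in the buffer.
        System.out.println("written=" + sink.written() + " pending=" + sink.pending());
        // prints "written=15000 pending=1"

        sink.flush(); // stands in for an interval-triggered flush
        System.out.println("written=" + sink.written() + " pending=" + sink.pending());
        // prints "written=15001 pending=0"
    }
}
```

With a count-only trigger, the trailing row stays buffered until more rows arrive or the sink is closed, which is the behavior the issue reports for a stream that never ends.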