[ https://issues.apache.org/jira/browse/FLINK-27215?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17522764#comment-17522764 ]
Shengkai Fang commented on FLINK-27215:
---------------------------------------

Hi, [~yulei0824].

It's up to the planner to determine whether the upstream operator sends the -U. The logic here is only about how the JDBC sink processes -U messages. If you are interested in the ChangelogMode inference, you can take a look at the FlinkChangelogModeInferenceProgram. I think we have already made the optimization.

> JDBC sink transiently deleted a record because of -u message of that record
> ---------------------------------------------------------------------------
>
>                 Key: FLINK-27215
>                 URL: https://issues.apache.org/jira/browse/FLINK-27215
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / JDBC
>    Affects Versions: 1.12.7, 1.13.5, 1.14.3
>            Reporter: tim yu
>            Priority: Major
>
> A record is deleted transiently when using the JDBC sink in upsert mode.
> The -U message is processed as a delete operation in class TableBufferReducedStatementExecutor.
> The following code shows how the -U message is processed:
> {code:java}
>     /**
>      * Returns true if the row kind is INSERT or UPDATE_AFTER, returns false if the row kind is
>      * DELETE or UPDATE_BEFORE.
>      */
>     private boolean changeFlag(RowKind rowKind) {
>         switch (rowKind) {
>             case INSERT:
>             case UPDATE_AFTER:
>                 return true;
>             case DELETE:
>             case UPDATE_BEFORE:
>                 return false;
>             default:
>                 throw new UnsupportedOperationException(
>                         String.format(
>                                 "Unknown row kind, the supported row kinds is: INSERT, UPDATE_BEFORE, UPDATE_AFTER,"
>                                         + " DELETE, but get: %s.",
>                                 rowKind));
>         }
>     }
>
>     @Override
>     public void executeBatch() throws SQLException {
>         for (Map.Entry<RowData, Tuple2<Boolean, RowData>> entry : reduceBuffer.entrySet()) {
>             if (entry.getValue().f0) {
>                 upsertExecutor.addToBatch(entry.getValue().f1);
>             } else {
>                 // delete by key
>                 deleteExecutor.addToBatch(entry.getKey());
>             }
>         }
>         upsertExecutor.executeBatch();
>         deleteExecutor.executeBatch();
>         reduceBuffer.clear();
>     }
> {code}
> If the -U and +U messages of one record are executed separately in different JDBC
> batches, that record is transiently deleted from the external database and the
> updated record is then inserted again. In fact, this record should merely be
> updated once in the external database.
>

--
This message was sent by Atlassian Jira
(v8.20.1#820001)
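
The batch-boundary effect described in the quoted report can be reproduced with a small standalone model. The sketch below is not Flink code: the class TransientDeleteSketch, the Msg type, and the "UPSERT"/"DELETE" strings are hypothetical names for illustration. It also assumes that a later message for a key overwrites the earlier buffered one, which is what the keyed reduceBuffer in the quoted snippet suggests.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of the buffering described in the issue; not Flink code.
class TransientDeleteSketch {

    // The four row kinds named in the quoted changeFlag method.
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    // One changelog message: a record key plus its row kind (illustrative type).
    static final class Msg {
        final String key;
        final Kind kind;

        Msg(String key, Kind kind) {
            this.key = key;
            this.kind = kind;
        }
    }

    static Msg msg(String key, Kind kind) {
        return new Msg(key, kind);
    }

    // Same mapping as the quoted changeFlag: true means upsert, false means delete.
    static boolean changeFlag(Kind kind) {
        return kind == Kind.INSERT || kind == Kind.UPDATE_AFTER;
    }

    // Processes each batch on its own: buffers the last flag per key (assumption:
    // later messages overwrite earlier ones), then records upserts before deletes,
    // mirroring the order of the two executeBatch() calls in the quoted code.
    static List<String> simulate(List<List<Msg>> batches) {
        List<String> statements = new ArrayList<>();
        for (List<Msg> batch : batches) {
            Map<String, Boolean> reduceBuffer = new LinkedHashMap<>();
            for (Msg m : batch) {
                reduceBuffer.put(m.key, changeFlag(m.kind));
            }
            List<String> deletes = new ArrayList<>();
            for (Map.Entry<String, Boolean> e : reduceBuffer.entrySet()) {
                if (e.getValue()) {
                    statements.add("UPSERT " + e.getKey());
                } else {
                    deletes.add("DELETE " + e.getKey());
                }
            }
            statements.addAll(deletes);
        }
        return statements;
    }

    public static void main(String[] args) {
        Msg before = msg("k1", Kind.UPDATE_BEFORE);
        Msg after = msg("k1", Kind.UPDATE_AFTER);

        // -U and +U land in the same batch: the +U overwrites the buffered -U.
        System.out.println(simulate(Arrays.asList(Arrays.asList(before, after))));

        // -U and +U land in different batches: the -U is flushed as a delete first.
        System.out.println(simulate(Arrays.asList(Arrays.asList(before), Arrays.asList(after))));
    }
}
```

In this model, the first call yields a single upsert for k1, while the second yields a delete for k1 followed by an upsert, which corresponds to the transient deletion the reporter describes.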