[ 
https://issues.apache.org/jira/browse/FLINK-27215?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17522764#comment-17522764
 ] 

Shengkai Fang commented on FLINK-27215:
---------------------------------------

Hi, [~yulei0824]. 

Whether the upstream operator sends the -U message is up to the planner. 
The logic here only concerns how the JDBC sink processes -U messages. If you are 
interested in the ChangelogMode inference, you can take a look at 
FlinkChangelogModeInferenceProgram. I think we have already made the 
optimization.
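
To make the batching behaviour described in the quoted issue easier to follow, here is a minimal, self-contained sketch. It is not the Flink implementation: the class TransientDeleteSketch, its Change holder, the integer key, and the string value are hypothetical stand-ins for TableBufferReducedStatementExecutor, Tuple2<Boolean, RowData>, and RowData. It only assumes what the quoted code shows: the last change per key wins within a batch, and UPDATE_BEFORE is mapped to a delete.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class TransientDeleteSketch {
    enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    /** Last buffered change for one key: upsert (true) or delete (false), plus the value. */
    static final class Change {
        final boolean upsert;
        final String value;

        Change(boolean upsert, String value) {
            this.upsert = upsert;
            this.value = value;
        }
    }

    // Hypothetical stand-in for the reduce buffer: primary key -> last change in this batch.
    private final Map<Integer, Change> reduceBuffer = new LinkedHashMap<>();
    // Statements that would be sent to the database, in order.
    final List<String> statements = new ArrayList<>();

    void addToBatch(RowKind kind, int key, String value) {
        // Same mapping as changeFlag: INSERT/UPDATE_AFTER -> upsert, DELETE/UPDATE_BEFORE -> delete.
        boolean upsert = kind == RowKind.INSERT || kind == RowKind.UPDATE_AFTER;
        reduceBuffer.put(key, new Change(upsert, value));
    }

    void executeBatch() {
        List<String> deletes = new ArrayList<>();
        for (Map.Entry<Integer, Change> e : reduceBuffer.entrySet()) {
            if (e.getValue().upsert) {
                statements.add("UPSERT " + e.getKey() + "=" + e.getValue().value);
            } else {
                deletes.add("DELETE " + e.getKey());
            }
        }
        // Upserts run first, then deletes, mirroring the order in the quoted executeBatch().
        statements.addAll(deletes);
        reduceBuffer.clear();
    }

    /** Inserts key 1, then updates it; optionally flushes between the -U and the +U. */
    static List<String> run(boolean flushBetween) {
        TransientDeleteSketch sink = new TransientDeleteSketch();
        sink.addToBatch(RowKind.INSERT, 1, "a");
        sink.executeBatch();
        sink.addToBatch(RowKind.UPDATE_BEFORE, 1, "a");
        if (flushBetween) {
            sink.executeBatch();
        }
        sink.addToBatch(RowKind.UPDATE_AFTER, 1, "b");
        sink.executeBatch();
        return sink.statements;
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // [UPSERT 1=a, UPSERT 1=b]
        System.out.println(run(true));  // [UPSERT 1=a, DELETE 1, UPSERT 1=b]
    }
}
```

When the -U and +U land in the same batch, the +U overwrites the -U in the buffer and only an upsert is issued. When a flush happens between them, the sketch issues a DELETE followed by an UPSERT, which is the transient delete reported in the issue.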

> JDBC sink transiently deleted a record because of -u message of that record
> ---------------------------------------------------------------------------
>
>                 Key: FLINK-27215
>                 URL: https://issues.apache.org/jira/browse/FLINK-27215
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / JDBC
>    Affects Versions: 1.12.7, 1.13.5, 1.14.3
>            Reporter: tim yu
>            Priority: Major
>
> A record is deleted transiently when using the JDBC sink in upsert mode.
> The -U message is processed as a delete operation in class 
> TableBufferReducedStatementExecutor.
> The following code shows how the -U message is processed:
> {code:java}
>     /**
>      * Returns true if the row kind is INSERT or UPDATE_AFTER, returns false if the row kind is
>      * DELETE or UPDATE_BEFORE.
>      */
>     private boolean changeFlag(RowKind rowKind) {
>         switch (rowKind) {
>             case INSERT:
>             case UPDATE_AFTER:
>                 return true;
>             case DELETE:
>             case UPDATE_BEFORE:
>                 return false;
>             default:
>                 throw new UnsupportedOperationException(
>                         String.format(
>                                 "Unknown row kind, the supported row kinds is: INSERT, UPDATE_BEFORE, UPDATE_AFTER,"
>                                         + " DELETE, but get: %s.",
>                                 rowKind));
>         }
>     }
>     @Override
>     public void executeBatch() throws SQLException {
>         for (Map.Entry<RowData, Tuple2<Boolean, RowData>> entry : reduceBuffer.entrySet()) {
>             if (entry.getValue().f0) {
>                 upsertExecutor.addToBatch(entry.getValue().f1);
>             } else {
>                 // delete by key
>                 deleteExecutor.addToBatch(entry.getKey());
>             }
>         }
>         upsertExecutor.executeBatch();
>         deleteExecutor.executeBatch();
>         reduceBuffer.clear();
>     }
> {code}
> If the -U and +U messages of one record are executed in different JDBC 
> batches, that record is transiently deleted from the external database and 
> then re-inserted as a new, updated record. In fact, this record should merely 
> be updated once in the external database.
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
