[ https://issues.apache.org/jira/browse/FLINK-24626?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Kenyore updated FLINK-24626:
----------------------------
Summary: Flink JDBC Sink may lose data in left join (was: Flink JDBC Sink may lose data in retract stream)

Flink JDBC Sink may lose data in left join
------------------------------------------

Key: FLINK-24626
URL: https://issues.apache.org/jira/browse/FLINK-24626
Project: Flink
Issue Type: Bug
Components: Connectors / JDBC
Reporter: Kenyore
Priority: Major

The JDBC sink can lose data when it uses TableBufferReducedStatementExecutor. Here are some snippets:

{code}
@Override
public void addToBatch(RowData record) throws SQLException {
    RowData key = keyExtractor.apply(record);
    if (record.getRowKind() == RowKind.DELETE) {
        // XXX cut delete off because the retract stream would generate
        return;
    }
    boolean flag = changeFlag(record.getRowKind());
    RowData value = valueTransform.apply(record); // copy or not
    // One entry per key: a later change for the same key replaces the earlier one.
    reduceBuffer.put(key, Tuple2.of(flag, value));
}

private boolean changeFlag(RowKind rowKind) {
    switch (rowKind) {
        case INSERT:
        case UPDATE_AFTER:
            return true;
        case DELETE:
        case UPDATE_BEFORE:
            return false;
        default:
            throw new UnsupportedOperationException(
                    String.format(
                            "Unknown row kind, the supported row kinds is: INSERT, UPDATE_BEFORE, UPDATE_AFTER,"
                                    + " DELETE, but get: %s.",
                            rowKind));
    }
}
{code}

The code above stores the result of changeFlag in the Tuple2 as the marker for each key: true means upsert, false means delete.

{code}
@Override
public void executeBatch() throws SQLException {
    for (Map.Entry<RowData, Tuple2<Boolean, RowData>> entry : reduceBuffer.entrySet()) {
        if (entry.getValue().f0) {
            upsertExecutor.addToBatch(entry.getValue().f1);
        } else {
            // delete by key
            deleteExecutor.addToBatch(entry.getKey());
        }
    }
    // All upserts in the batch are executed before any delete.
    upsertExecutor.executeBatch();
    deleteExecutor.executeBatch();
    reduceBuffer.clear();
}
{code}

executeBatch runs the delete statements for all false-flagged entries only after the upsert statements for all true-flagged entries.
This means that an UPDATE_BEFORE could be executed after its UPDATE_AFTER, and data would be lost as a result.
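To make the flush order concrete, here is a minimal, self-contained Java sketch of the buffering logic quoted above. It is a simplified model, not the Flink implementation: RowData, Tuple2 and the JDBC statement executors are replaced by hypothetical types (ReducedBufferSketch, Row, Buffered) and by plain statement strings, the buffer is a LinkedHashMap only so the output is deterministic, and, unlike the reporter's snippet, DELETE rows are buffered rather than dropped. It uses records, so it assumes Java 16 or later.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of the quoted executor logic: RowData is
// replaced by a small record and the JDBC executors by statement strings.
public class ReducedBufferSketch {

    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    // A changelog row: primary key, payload and row kind.
    record Row(String key, String value, Kind kind) {}

    // The buffered change for one key: upsert (true) or delete (false).
    record Buffered(boolean upsert, Row row) {}

    // One entry per key; a later change for the same key replaces the earlier one.
    // LinkedHashMap is used here only to keep the demo output deterministic.
    private final Map<String, Buffered> reduceBuffer = new LinkedHashMap<>();

    // Same mapping as changeFlag() in the issue.
    static boolean changeFlag(Kind kind) {
        switch (kind) {
            case INSERT:
            case UPDATE_AFTER:
                return true;
            case DELETE:
            case UPDATE_BEFORE:
                return false;
            default:
                throw new UnsupportedOperationException("Unknown row kind: " + kind);
        }
    }

    // Assumption: unlike the snippet in the issue, DELETE rows are not dropped here.
    public void addToBatch(Row row) {
        reduceBuffer.put(row.key(), new Buffered(changeFlag(row.kind()), row));
    }

    // Returns the statements in the order they would be sent: every upsert
    // in the batch first, then every delete, as in the quoted executeBatch().
    public List<String> executeBatch() {
        List<String> upserts = new ArrayList<>();
        List<String> deletes = new ArrayList<>();
        for (Map.Entry<String, Buffered> entry : reduceBuffer.entrySet()) {
            Buffered b = entry.getValue();
            if (b.upsert()) {
                upserts.add("UPSERT " + entry.getKey() + "=" + b.row().value());
            } else {
                deletes.add("DELETE " + entry.getKey());
            }
        }
        List<String> issued = new ArrayList<>(upserts);
        issued.addAll(deletes);
        reduceBuffer.clear();
        return issued;
    }

    public static void main(String[] args) {
        ReducedBufferSketch sink = new ReducedBufferSketch();
        sink.addToBatch(new Row("2", "old-2", Kind.DELETE));
        sink.addToBatch(new Row("1", "new-1", Kind.INSERT));
        // -U then +U for key 3: only the +U survives, so key 3 is upserted.
        sink.addToBatch(new Row("3", "old-3", Kind.UPDATE_BEFORE));
        sink.addToBatch(new Row("3", "new-3", Kind.UPDATE_AFTER));
        // +U then -U for key 4: the -U is last, so key 4 is deleted.
        sink.addToBatch(new Row("4", "new-4", Kind.UPDATE_AFTER));
        sink.addToBatch(new Row("4", "old-4", Kind.UPDATE_BEFORE));
        System.out.println(sink.executeBatch());
        // prints [UPSERT 1=new-1, UPSERT 3=new-3, DELETE 2, DELETE 4]
    }
}
```

In this model the delete for key 2 arrives first but is issued last, and because the buffer keeps only the last change per key, a key whose final buffered change is an UPDATE_BEFORE (key 4) is deleted at flush time.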