danny0405 commented on code in PR #19023:
URL: https://github.com/apache/hudi/pull/19023#discussion_r3433204884
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java:
##########
@@ -603,9 +610,23 @@ private void handleWriteMetaEvent(WriteMetadataEvent event) {
    */
   private boolean commitInstants(long checkpointId) {
     // use < instead of <= because the write metadata event sends the last known checkpoint id which is smaller than the current one.
-    List<Boolean> result = this.eventBuffers.getEventBufferStream().filter(entry -> entry.getKey() < checkpointId)
-        .map(entry -> commitInstant(entry.getKey(), entry.getValue().getLeft(), entry.getValue().getRight())).collect(Collectors.toList());
-    return result.stream().anyMatch(i -> i);
+    List<Map.Entry<Long, Pair<String, EventBuffer>>> entries = this.eventBuffers.getEventBufferStream()
+        .filter(entry -> entry.getKey() < checkpointId)
+        .collect(Collectors.toList());
+    boolean anyCommitted = false;
+    for (int i = 0; i < entries.size(); i++) {
+      if (i > 0) {
+        // Refresh the baseline for subsequent instants so that OCC conflict resolution
+        // sees the just-committed instant as completed, not as a concurrent conflict.
+        this.metaClient.reloadActiveTimeline();
+        this.writeClient.preTxn(tableState.operationType, this.metaClient);
Review Comment:
This does not really work: preTxn needs to be invoked before each instant starts, so that we can figure out which instants got completed during the current write window.
Another problem with Flink OCC is that it does not work as expected: even if the conflict is detected and we throw, the Flink checkpoint has already succeeded, and by default there is no way to roll back to the preceding checkpoint.
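A minimal sketch of the write-window point, assuming the role of preTxn is to snapshot the set of completed instants that the later conflict check compares against. OccWindowSketch, snapshotBaseline and concurrentSince are hypothetical names used only for illustration, not Hudi APIs. A baseline taken when the instant starts catches an instant that another writer completes mid-write, while a baseline refreshed right before commit leaves an empty window and misses it.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical, simplified model of an OCC conflict-detection window (not Hudi's API). */
public class OccWindowSketch {

  /** Hypothetical stand-in for what preTxn captures: the instants completed so far. */
  static Set<String> snapshotBaseline(List<String> completedTimeline) {
    return new HashSet<>(completedTimeline);
  }

  /** Instants that completed after the baseline was taken, i.e. the conflict candidates. */
  static Set<String> concurrentSince(List<String> completedTimeline, Set<String> baseline) {
    Set<String> result = new HashSet<>(completedTimeline);
    result.removeAll(baseline);
    return result;
  }

  public static void main(String[] args) {
    // Completed instants on the shared timeline, in completion order.
    List<String> timeline = new ArrayList<>();
    timeline.add("t1");

    // Our writer starts instant t2 and takes its baseline at instant start.
    Set<String> baselineAtStart = snapshotBaseline(timeline);

    // While t2 is still being written, another writer completes t3.
    timeline.add("t3");

    // Alternative: baseline refreshed only right before t2 commits.
    Set<String> baselineBeforeCommit = snapshotBaseline(timeline);

    System.out.println("baseline at start:  " + concurrentSince(timeline, baselineAtStart));
    System.out.println("baseline at commit: " + concurrentSince(timeline, baselineBeforeCommit));
  }
}
```

Running main prints `[t3]` for the start-time baseline and `[]` for the commit-time baseline: only the former exposes t3 as a concurrent commit to check for conflicts.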
--
This is an automated message from the Apache Git Service.