C0urante commented on code in PR #13208:
URL: https://github.com/apache/kafka/pull/13208#discussion_r1106250177


##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java:
##########
@@ -98,6 +104,24 @@ private boolean flushing() {
         return toFlush != null;
     }
 
+    public boolean waitForBeginFlush(Supplier<Long> timeout, TimeUnit timeUnit) throws InterruptedException, TimeoutException {

Review Comment:
   > I don't think that this PR makes double commits possible where they weren't before.
   
   The issue with double commits in non-EOS mode is this. Right now, we may throw an exception because of the bug that we're addressing here. Once we fix that exception, double commits become possible. If the first commit takes a while, we might lag far enough behind that our second commit happens after a new instance of the same source task has already been brought up.
   
   > WDYT about adding the EOS-style cancellation semantics to the final commit, or closing the OffsetBackingStore in cancel() to address these cases? Do you think that we can explore those changes in a follow-up PR?
   
   I think adding the EOS-style cancellation semantics would be okay for now. They aren't as effective for this kind of task, though, since we have no way of fencing out producers. We can do that part in this PR. Then we can file a Jira ticket to improve the cancellation logic for source task offset commits and explore it later.
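   Here is a minimal sketch of the kind of cancellation check discussed above. It assumes the check amounts to a flag that `cancel()` sets and that the commit path reads before writing offsets. `CancellableOffsetCommitter` and its methods are hypothetical, and the counter stands in for writing to the offset backing store. As noted, this does not fence anything out. A commit that has already passed the check still completes, so the flag only narrows the window for a double commit.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch: a cancellation flag consulted before each offset commit.
// It cannot fence out a zombie producer. It only stops this task instance from
// starting a new commit after cancel() has been called.
class CancellableOffsetCommitter {
    private final AtomicBoolean cancelled = new AtomicBoolean(false);
    private int commits = 0;

    // Called when the worker gives up on the task (e.g. it did not stop in time).
    void cancel() {
        cancelled.set(true);
    }

    // Returns true if a commit was performed, or false if it was skipped
    // because the task had already been cancelled.
    synchronized boolean commitOffsets() {
        if (cancelled.get()) {
            return false; // skip: a new instance of the task may already be running
        }
        commits++; // stand-in for writing offsets to the backing store
        return true;
    }

    synchronized int commits() {
        return commits;
    }
}
```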