[GitHub] [kafka] C0urante commented on a diff in pull request #13208: KAFKA-5756: Wait for concurrent offset flush to complete before starting next flush

2023-02-15 Thread via GitHub


C0urante commented on code in PR #13208:
URL: https://github.com/apache/kafka/pull/13208#discussion_r1107391193


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java:
##
@@ -365,6 +365,10 @@ public void execute() {
         } catch (InterruptedException e) {
             // Ignore and allow to exit.
         } catch (RuntimeException e) {
+            if (isCancelled()) {
+                log.debug("Skipping final offset commit as task has been cancelled");
+                throw e;
+            }

Review Comment:
   Honestly not sure why we didn't put this here to begin with. Nice 



##
connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetStorageWriterTest.java:
##
@@ -135,38 +135,40 @@ public void testFlushFailureReplacesOffsets() throws Exception {
         // First time the write fails
         expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED, true, null);
         writer.offset(OFFSET_KEY, OFFSET_VALUE);
-        assertTrue(writer.beginFlush());
+        assertTrue(writer.beginFlush(1000L, TimeUnit.MILLISECONDS));
         writer.doFlush(callback).get(1000, TimeUnit.MILLISECONDS);
         verify(callback).onCompletion(eq(EXCEPTION), isNull());
 
         // Second time it succeeds
         expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED, false, null);
-        assertTrue(writer.beginFlush());
+        assertTrue(writer.beginFlush(1000L, TimeUnit.MILLISECONDS));
         writer.doFlush(callback).get(1000, TimeUnit.MILLISECONDS);
         verify(callback).onCompletion(isNull(), isNull());
 
         // Third time it has no data to flush so we won't get past beginFlush()
-        assertFalse(writer.beginFlush());
+        assertFalse(writer.beginFlush(1000L, TimeUnit.MILLISECONDS));
     }
 
     @Test
-    public void testAlreadyFlushing() {
+    public void testAlreadyFlushing() throws InterruptedException, TimeoutException {
         @SuppressWarnings("unchecked")
         final Callback<Void> callback = mock(Callback.class);
         // Trigger the send, but don't invoke the callback so we'll still be mid-flush
         CountDownLatch allowStoreCompleteCountdown = new CountDownLatch(1);
         expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED, false, allowStoreCompleteCountdown);
 
         writer.offset(OFFSET_KEY, OFFSET_VALUE);
-        assertTrue(writer.beginFlush());
+        assertTrue(writer.beginFlush(1000L, TimeUnit.MILLISECONDS));

Review Comment:
   Still think this may apply here



##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java:
##
@@ -100,23 +104,45 @@ private boolean flushing() {
 
     /**
      * Performs the first step of a flush operation, snapshotting the current state. This does not
-     * actually initiate the flush with the underlying storage.
+     * actually initiate the flush with the underlying storage. Ensures that any previous flush operations
+     * have finished before beginning a new flush.
      *
      * @return true if a flush was initiated, false if no data was available
+     * @throws ConnectException if the previous flush is not complete before this method is called
      */
-    public synchronized boolean beginFlush() {
-        if (flushing()) {
-            log.error("Invalid call to OffsetStorageWriter flush() while already flushing, the "
+    public boolean beginFlush() {
+        try {
+            return beginFlush(0, TimeUnit.NANOSECONDS);
+        } catch (InterruptedException | TimeoutException e) {
+            log.error("Invalid call to OffsetStorageWriter beginFlush() while already flushing, the "
                     + "framework should not allow this");
             throw new ConnectException("OffsetStorageWriter is already flushing");
         }
+    }
 
-        if (data.isEmpty())
-            return false;
-
-        toFlush = data;
-        data = new HashMap<>();
-        return true;
+    /**
+     * Performs the first step of a flush operation, snapshotting the current state. This does not
+     * actually initiate the flush with the underlying storage. Ensures that any previous flush operations
+     * have finished before beginning a new flush.
+     *
+     * @param timeout A maximum duration to wait for previous flushes to finish before giving up on waiting
+     * @param timeUnit Units of the timeout argument
+     * @return true if a flush was initiated, false if no data was available
+     * @throws InterruptedException if this thread was interrupted while waiting for the previous flush to complete
+     * @throws TimeoutException if the {@code timeout} elapses before previous flushes are complete.
+     */
+    public boolean beginFlush(long timeout, TimeUnit timeUnit) throws InterruptedException, TimeoutException {
+        if
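The hunk is truncated here in the archive. As a rough sketch of where it appears to be headed, assuming a `Semaphore` named `flushInProgress` guards in-progress flushes (one of the synchronizers floated later in this thread), the timed `beginFlush` might look like this; illustrative only, not necessarily the PR's exact code:
```java
public boolean beginFlush(long timeout, TimeUnit timeUnit) throws InterruptedException, TimeoutException {
    // Wait (up to the caller-supplied timeout) for any in-progress flush to finish
    if (flushInProgress.tryAcquire(Math.max(0, timeout), timeUnit)) {
        synchronized (this) {
            if (data.isEmpty()) {
                // Nothing to snapshot; release the permit so a later flush can proceed
                flushInProgress.release();
                return false;
            }
            // Snapshot the accumulated offsets; doFlush() writes this snapshot
            toFlush = data;
            data = new HashMap<>();
            return true;
        }
    } else {
        throw new TimeoutException("Timed out waiting for a previous flush to complete");
    }
}
```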

[GitHub] [kafka] C0urante commented on a diff in pull request #13208: KAFKA-5756: Wait for concurrent offset flush to complete before starting next flush

2023-02-15 Thread via GitHub


C0urante commented on code in PR #13208:
URL: https://github.com/apache/kafka/pull/13208#discussion_r1106236517


##
connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetStorageWriterTest.java:
##
@@ -135,38 +135,40 @@ public void testFlushFailureReplacesOffsets() throws Exception {
         // First time the write fails
         expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED, true, null);
         writer.offset(OFFSET_KEY, OFFSET_VALUE);
-        assertTrue(writer.beginFlush());
+        assertTrue(writer.beginFlush(1000L, TimeUnit.MILLISECONDS));
         writer.doFlush(callback).get(1000, TimeUnit.MILLISECONDS);
         verify(callback).onCompletion(eq(EXCEPTION), isNull());
 
         // Second time it succeeds
         expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED, false, null);
-        assertTrue(writer.beginFlush());
+        assertTrue(writer.beginFlush(1000L, TimeUnit.MILLISECONDS));
         writer.doFlush(callback).get(1000, TimeUnit.MILLISECONDS);
         verify(callback).onCompletion(isNull(), isNull());
 
         // Third time it has no data to flush so we won't get past beginFlush()
-        assertFalse(writer.beginFlush());
+        assertFalse(writer.beginFlush(1000L, TimeUnit.MILLISECONDS));
     }
 
     @Test
-    public void testAlreadyFlushing() {
+    public void testAlreadyFlushing() throws InterruptedException, TimeoutException {
         @SuppressWarnings("unchecked")
         final Callback<Void> callback = mock(Callback.class);
         // Trigger the send, but don't invoke the callback so we'll still be mid-flush
         CountDownLatch allowStoreCompleteCountdown = new CountDownLatch(1);
         expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED, false, allowStoreCompleteCountdown);
 
         writer.offset(OFFSET_KEY, OFFSET_VALUE);
-        assertTrue(writer.beginFlush());
+        assertTrue(writer.beginFlush(1000L, TimeUnit.MILLISECONDS));

Review Comment:
   Worth it to add a check to ensure that `beginFlush` times out if it has been invoked previously, before `doFlush` has also been called?
   ```suggestion
   assertThrows(TimeoutException.class, () -> writer.beginFlush(1000L, TimeUnit.MILLISECONDS));
   assertTrue(writer.beginFlush(1000L, TimeUnit.MILLISECONDS));
   ```




[GitHub] [kafka] C0urante commented on a diff in pull request #13208: KAFKA-5756: Wait for concurrent offset flush to complete before starting next flush

2023-02-14 Thread via GitHub


C0urante commented on code in PR #13208:
URL: https://github.com/apache/kafka/pull/13208#discussion_r1106250177


##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java:
##
@@ -98,6 +104,24 @@ private boolean flushing() {
         return toFlush != null;
     }
 
+    public boolean waitForBeginFlush(Supplier<Long> timeout, TimeUnit timeUnit) throws InterruptedException, TimeoutException {

Review Comment:
   > I don't think that this PR makes double commits possible where they 
weren't before.
   
   So the issue with double commits in non-EOS mode is that, right now, we may 
throw an exception because of the bug that we're addressing here. But if we fix 
that exception, then double commits become possible. And if the first commit 
takes a while, then we might end up lagging too much and performing our second 
commit after a new instance of the same source task has been brought up.
   
   > WDYT about adding the EOS-style cancellation semantics to the final 
commit, or closing the OffsetBackingStore in cancel() to address these cases? 
Do you think that we can explore those changes in a follow-up PR?
   
   I think adding the EOS-style cancellation semantics would be okay for now, 
though they aren't as effective for this kind of task since we don't have a way 
of fencing out producers. We can do that part in this PR, and then file a Jira 
ticket to improve cancellation logic for source task offset commit, which we 
can explore at a later point.
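   For illustration, EOS-style cancellation applied to the final commit could look roughly like the sketch below. The override and its placement are assumptions, not the PR's actual change; `cancelFlush` is the existing `OffsetStorageWriter` method that abandons the snapshotted offsets so they can be retried later:
   ```java
   // Illustrative sketch only: abandon any in-progress flush on cancellation so a
   // lagging final commit cannot race with a new instance of the same source task.
   @Override
   public void cancel() {
       super.cancel();
       offsetWriter.cancelFlush();
   }
   ```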





[GitHub] [kafka] C0urante commented on a diff in pull request #13208: KAFKA-5756: Wait for concurrent offset flush to complete before starting next flush

2023-02-14 Thread via GitHub


C0urante commented on code in PR #13208:
URL: https://github.com/apache/kafka/pull/13208#discussion_r1106203170


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java:
##
@@ -256,14 +257,25 @@ private void maybeBeginTransaction() {
     private void commitTransaction() {
         log.debug("{} Committing offsets", this);
 
+        long commitTimeoutMs = workerConfig.getLong(WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG);
         long started = time.milliseconds();
+        long deadline = started + commitTimeoutMs;
 
         // We might have just aborted a transaction, in which case we'll have to begin a new one
         // in order to commit offsets
         maybeBeginTransaction();
 
         AtomicReference<Throwable> flushError = new AtomicReference<>();
-        if (offsetWriter.beginFlush()) {
+        boolean shouldFlush = false;
+        try {
+            // Provide a constant timeout value to wait indefinitely, as there should not be any concurrent flushes.
+            // This is because commitTransaction is always called on the same thread, and should always block until
+            // the flush is complete, or cancel the flush if an error occurs.
+            shouldFlush = offsetWriter.beginFlush(deadline - time.milliseconds(), TimeUnit.MILLISECONDS);

Review Comment:
   I agree with the comment about how this method should never be invoked while 
there are in-progress flushes.
   
   Given that, is there any reason to go to the work of calculating a deadline 
and deriving a timeout from it, instead of simply invoking this method with a 
timeout of zero?
   
   We could even add a no-arg variant of `beginFlush` that calls `beginFlush(0, 
TimeUnit.MILLISECONDS)`.
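   A minimal sketch of that suggested convenience overload, assuming the timed variant shown earlier in this thread; illustrative only:
   ```java
   // Suggested no-arg variant: fail fast if another flush is already in progress.
   public boolean beginFlush() throws InterruptedException, TimeoutException {
       return beginFlush(0, TimeUnit.MILLISECONDS);
   }
   ```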



##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java:
##
@@ -104,44 +101,29 @@ private boolean flushing() {
         return toFlush != null;
     }
 
-    public boolean waitForBeginFlush(Supplier<Long> timeout, TimeUnit timeUnit) throws InterruptedException, TimeoutException {
-        while (true) {
-            Future<?> inProgressFlush;
-            synchronized (this) {
-                if (flushing()) {
-                    inProgressFlush = latestFlush;
-                } else {
-                    return beginFlush();
-                }
-            }
-            try {
-                inProgressFlush.get(timeout.get(), timeUnit);
-            } catch (ExecutionException e) {
-                // someone else is responsible for handling this error, we just want to wait for the flush to be over.
-            }
-        }
-    }
-
     /**
      * Performs the first step of a flush operation, snapshotting the current state. This does not
-     * actually initiate the flush with the underlying storage.
+     * actually initiate the flush with the underlying storage. Ensures that any previous flush operations
+     * have finished before beginning a new flush.
      *
+     * @param timeout A maximum duration to wait for previous flushes to finish before giving up on waiting
+     * @param timeUnit Units of the timeout argument
      * @return true if a flush was initiated, false if no data was available
+     * @throws InterruptedException if this thread was interrupted while waiting for the previous flush to complete
+     * @throws TimeoutException if the `timeout` elapses before previous flushes are complete.

Review Comment:
   Nit: Javadocs != markdown, should be `{@code timeout}` (without backticks).




[GitHub] [kafka] C0urante commented on a diff in pull request #13208: KAFKA-5756: Wait for concurrent offset flush to complete before starting next flush

2023-02-08 Thread via GitHub


C0urante commented on code in PR #13208:
URL: https://github.com/apache/kafka/pull/13208#discussion_r1100459265


##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java:
##
@@ -98,6 +104,24 @@ private boolean flushing() {
         return toFlush != null;
     }
 
+    public boolean waitForBeginFlush(Supplier<Long> timeout, TimeUnit timeUnit) throws InterruptedException, TimeoutException {

Review Comment:
   It seems like we're bending over backwards here to accommodate an assumption 
made in `beginFlush` that we'll never try to trigger two offset flushes at 
once, which is clearly false given the conditions that necessitate this fix 
(i.e., a task's end-of-life offset flush is triggered at the same time as its 
periodic offset flush).
   
   Given that, do we really need a separate method here, or can we relax the 
constraints in `beginFlush` to wait for in-progress flushes to conclude instead 
of throwing an exception if there are any?
   
   Additionally, it seems like the use of a `CompletableFuture` here is a bit 
strange. Would a `Semaphore` or `CountDownLatch` be better suited?
   
   Finally--since this change may lead to us performing double offset commits 
when a task is being shut down, do you think it might also make sense to add a 
`close` method to the offset writer that throws an exception for any further 
attempts to flush, and possibly forcibly terminates any in-progress flushes? We 
can invoke that in `AbstractWorkerTask::cancel` (or possibly 
`WorkerSourceTask::cancel` if a different approach is necessary to preserve 
exactly-once semantics) to help tasks complete shutdown within the timeout 
allotted to them.
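   A rough sketch of that `close` idea, with hypothetical names (the `closed` flag and `checkNotClosed` helper are illustrative, not part of the PR):
   ```java
   // Hypothetical sketch: reject further flushes once the writer is closed.
   private volatile boolean closed = false;

   public void close() {
       closed = true;
       synchronized (this) {
           if (flushing()) {
               cancelFlush(); // forcibly abandon any in-progress flush
           }
       }
   }

   // beginFlush(...) would call this before waiting on in-progress flushes
   private void checkNotClosed() {
       if (closed) {
           throw new ConnectException("OffsetStorageWriter is closed; further flush attempts are not allowed");
       }
   }
   ```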



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org