[GitHub] [kafka] C0urante commented on a diff in pull request #13208: KAFKA-5756: Wait for concurrent offset flush to complete before starting next flush
C0urante commented on code in PR #13208:
URL: https://github.com/apache/kafka/pull/13208#discussion_r1107391193

## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java:

@@ -365,6 +365,10 @@ public void execute() {
         } catch (InterruptedException e) {
             // Ignore and allow to exit.
         } catch (RuntimeException e) {
+            if (isCancelled()) {
+                log.debug("Skipping final offset commit as task has been cancelled");
+                throw e;
+            }

Review Comment:
   Honestly not sure why we didn't put this here to begin with. Nice

## connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetStorageWriterTest.java:

@@ -135,38 +135,40 @@ public void testFlushFailureReplacesOffsets() throws Exception {
         // First time the write fails
         expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED, true, null);
         writer.offset(OFFSET_KEY, OFFSET_VALUE);
-        assertTrue(writer.beginFlush());
+        assertTrue(writer.beginFlush(1000L, TimeUnit.MILLISECONDS));
         writer.doFlush(callback).get(1000, TimeUnit.MILLISECONDS);
         verify(callback).onCompletion(eq(EXCEPTION), isNull());

         // Second time it succeeds
         expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED, false, null);
-        assertTrue(writer.beginFlush());
+        assertTrue(writer.beginFlush(1000L, TimeUnit.MILLISECONDS));
         writer.doFlush(callback).get(1000, TimeUnit.MILLISECONDS);
         verify(callback).onCompletion(isNull(), isNull());

         // Third time it has no data to flush so we won't get past beginFlush()
-        assertFalse(writer.beginFlush());
+        assertFalse(writer.beginFlush(1000L, TimeUnit.MILLISECONDS));
     }

     @Test
-    public void testAlreadyFlushing() {
+    public void testAlreadyFlushing() throws InterruptedException, TimeoutException {
         @SuppressWarnings("unchecked")
         final Callback callback = mock(Callback.class);
         // Trigger the send, but don't invoke the callback so we'll still be mid-flush
         CountDownLatch allowStoreCompleteCountdown = new CountDownLatch(1);
         expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED, false, allowStoreCompleteCountdown);
         writer.offset(OFFSET_KEY, OFFSET_VALUE);
-        assertTrue(writer.beginFlush());
+        assertTrue(writer.beginFlush(1000L, TimeUnit.MILLISECONDS));

Review Comment:
   Still think this may apply here

## connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java:

@@ -100,23 +104,45 @@ private boolean flushing() {
     /**
      * Performs the first step of a flush operation, snapshotting the current state. This does not
-     * actually initiate the flush with the underlying storage.
+     * actually initiate the flush with the underlying storage. Ensures that any previous flush operations
+     * have finished before beginning a new flush.
      *
      * @return true if a flush was initiated, false if no data was available
+     * @throws ConnectException if the previous flush is not complete before this method is called
      */
-    public synchronized boolean beginFlush() {
-        if (flushing()) {
-            log.error("Invalid call to OffsetStorageWriter flush() while already flushing, the "
+    public boolean beginFlush() {
+        try {
+            return beginFlush(0, TimeUnit.NANOSECONDS);
+        } catch (InterruptedException | TimeoutException e) {
+            log.error("Invalid call to OffsetStorageWriter beginFlush() while already flushing, the "
                     + "framework should not allow this");
             throw new ConnectException("OffsetStorageWriter is already flushing");
         }
+    }

-        if (data.isEmpty())
-            return false;
-
-        toFlush = data;
-        data = new HashMap<>();
-        return true;
+    /**
+     * Performs the first step of a flush operation, snapshotting the current state. This does not
+     * actually initiate the flush with the underlying storage. Ensures that any previous flush operations
+     * have finished before beginning a new flush.
+     *
+     * @param timeout A maximum duration to wait for previous flushes to finish before giving up on waiting
+     * @param timeUnit Units of the timeout argument
+     * @return true if a flush was initiated, false if no data was available
+     * @throws InterruptedException if this thread was interrupted while waiting for the previous flush to complete
+     * @throws TimeoutException if the {@code timeout} elapses before previous flushes are complete.
+     */
+    public boolean beginFlush(long timeout, TimeUnit timeUnit) throws InterruptedException, TimeoutException {
+        if
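The timeout-aware `beginFlush(timeout, timeUnit)` contract described in the Javadoc above can be sketched with a `Semaphore` guarding the in-progress flush. This is a hypothetical illustration of the semantics under discussion, not the actual `OffsetStorageWriter` implementation; the `FlushGate` class and its `completeFlush` method are invented for the sketch.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch: a single permit represents "no flush in progress".
public class FlushGate {
    private final Semaphore flushInProgress = new Semaphore(1);
    private Map<String, String> data = new HashMap<>();
    private Map<String, String> toFlush = null;

    public synchronized void offset(String key, String value) {
        data.put(key, value);
    }

    // Wait up to the timeout for any previous flush to finish, then snapshot.
    public boolean beginFlush(long timeout, TimeUnit unit)
            throws InterruptedException, TimeoutException {
        if (!flushInProgress.tryAcquire(timeout, unit)) {
            throw new TimeoutException("Timed out waiting for previous flush to complete");
        }
        synchronized (this) {
            if (data.isEmpty()) {
                flushInProgress.release(); // nothing to do; let the next flush begin
                return false;
            }
            toFlush = data;
            data = new HashMap<>();
            return true;
        }
    }

    // Called when the flush completes (successfully or not) to unblock the next one.
    public void completeFlush() {
        synchronized (this) {
            toFlush = null;
        }
        flushInProgress.release();
    }

    public static void main(String[] args) throws Exception {
        FlushGate gate = new FlushGate();
        gate.offset("partition-0", "offset-42");
        System.out.println(gate.beginFlush(100, TimeUnit.MILLISECONDS)); // true: snapshot taken
        gate.completeFlush();
        System.out.println(gate.beginFlush(100, TimeUnit.MILLISECONDS)); // false: nothing to flush
    }
}
```

A zero timeout reproduces the old fail-fast behavior of the no-arg `beginFlush()`, while a positive timeout lets a concurrent caller wait for the in-progress flush instead of failing.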
C0urante commented on code in PR #13208:
URL: https://github.com/apache/kafka/pull/13208#discussion_r1106236517

## connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetStorageWriterTest.java:

@@ -135,38 +135,40 @@ public void testFlushFailureReplacesOffsets() throws Exception {
         // First time the write fails
         expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED, true, null);
         writer.offset(OFFSET_KEY, OFFSET_VALUE);
-        assertTrue(writer.beginFlush());
+        assertTrue(writer.beginFlush(1000L, TimeUnit.MILLISECONDS));
         writer.doFlush(callback).get(1000, TimeUnit.MILLISECONDS);
         verify(callback).onCompletion(eq(EXCEPTION), isNull());

         // Second time it succeeds
         expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED, false, null);
-        assertTrue(writer.beginFlush());
+        assertTrue(writer.beginFlush(1000L, TimeUnit.MILLISECONDS));
         writer.doFlush(callback).get(1000, TimeUnit.MILLISECONDS);
         verify(callback).onCompletion(isNull(), isNull());

         // Third time it has no data to flush so we won't get past beginFlush()
-        assertFalse(writer.beginFlush());
+        assertFalse(writer.beginFlush(1000L, TimeUnit.MILLISECONDS));
     }

     @Test
-    public void testAlreadyFlushing() {
+    public void testAlreadyFlushing() throws InterruptedException, TimeoutException {
         @SuppressWarnings("unchecked")
         final Callback callback = mock(Callback.class);
         // Trigger the send, but don't invoke the callback so we'll still be mid-flush
         CountDownLatch allowStoreCompleteCountdown = new CountDownLatch(1);
         expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED, false, allowStoreCompleteCountdown);
         writer.offset(OFFSET_KEY, OFFSET_VALUE);
-        assertTrue(writer.beginFlush());
+        assertTrue(writer.beginFlush(1000L, TimeUnit.MILLISECONDS));

Review Comment:
   Worth it to add a check to ensure that `beginFlush` times out if it has been invoked previously, before `doFlush` has also been called?
```suggestion
        assertThrows(TimeoutException.class, () -> writer.beginFlush(1000L, TimeUnit.MILLISECONDS));
        assertTrue(writer.beginFlush(1000L, TimeUnit.MILLISECONDS));
```

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
C0urante commented on code in PR #13208:
URL: https://github.com/apache/kafka/pull/13208#discussion_r1106250177

## connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java:

@@ -98,6 +104,24 @@ private boolean flushing() {
         return toFlush != null;
     }

+    public boolean waitForBeginFlush(Supplier timeout, TimeUnit timeUnit) throws InterruptedException, TimeoutException {

Review Comment:
   > I don't think that this PR makes double commits possible where they weren't before.

   So the issue with double commits in non-EOS mode is that, right now, we may throw an exception because of the bug that we're addressing here. But if we fix that exception, then double commits become possible. And if the first commit takes a while, then we might end up lagging too much and performing our second commit after a new instance of the same source task has been brought up.

   > WDYT about adding the EOS-style cancellation semantics to the final commit, or closing the OffsetBackingStore in cancel() to address these cases? Do you think that we can explore those changes in a follow-up PR?

   I think adding the EOS-style cancellation semantics would be okay for now, though they aren't as effective for this kind of task since we don't have a way of fencing out producers. We can do that part in this PR, and then file a Jira ticket to improve cancellation logic for source task offset commit, which we can explore at a later point.
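The cancellation idea floated in this thread, closing the offset writer so that further flush attempts fail fast instead of double-committing during shutdown, might look roughly like the sketch below. The class name, fields, and exception choice are hypothetical; real code would also need to deal with the flush already in progress.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of a closeable offset writer: after close(), any further
// attempt to begin a flush is rejected immediately.
public class CloseableOffsetWriter {
    private final AtomicBoolean closed = new AtomicBoolean(false);

    // Invoked from task cancellation; rejects all future flush attempts.
    public void close() {
        closed.set(true);
    }

    public boolean beginFlush() {
        if (closed.get()) {
            throw new IllegalStateException("Offset writer is closed");
        }
        // ... snapshot offsets here, as OffsetStorageWriter.beginFlush() does ...
        return true;
    }

    public static void main(String[] args) {
        CloseableOffsetWriter writer = new CloseableOffsetWriter();
        writer.beginFlush();        // allowed before close()
        writer.close();
        try {
            writer.beginFlush();    // now rejected
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Failing fast here helps a cancelled task finish shutdown within its allotted timeout instead of blocking on an offset flush whose result will be discarded anyway.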
C0urante commented on code in PR #13208:
URL: https://github.com/apache/kafka/pull/13208#discussion_r1106203170

## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java:

@@ -256,14 +257,25 @@ private void maybeBeginTransaction() {
     private void commitTransaction() {
         log.debug("{} Committing offsets", this);

+        long commitTimeoutMs = workerConfig.getLong(WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG);
         long started = time.milliseconds();
+        long deadline = started + commitTimeoutMs;

         // We might have just aborted a transaction, in which case we'll have to begin a new one
         // in order to commit offsets
         maybeBeginTransaction();

         AtomicReference flushError = new AtomicReference<>();
-        if (offsetWriter.beginFlush()) {
+        boolean shouldFlush = false;
+        try {
+            // Provide a constant timeout value to wait indefinitely, as there should not be any concurrent flushes.
+            // This is because commitTransaction is always called on the same thread, and should always block until
+            // the flush is complete, or cancel the flush if an error occurs.
+            shouldFlush = offsetWriter.beginFlush(deadline - time.milliseconds(), TimeUnit.MILLISECONDS);

Review Comment:
   I agree with the comment about how this method should never be invoked while there are in-progress flushes. Given that, is there any reason to go to the work of calculating a deadline and deriving a timeout from it, instead of simply invoking this method with a timeout of zero? We could even add a no-arg variant of `beginFlush` that calls `beginFlush(0, TimeUnit.MILLISECONDS)`.
## connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java:

@@ -104,44 +101,29 @@ private boolean flushing() {
         return toFlush != null;
     }

-    public boolean waitForBeginFlush(Supplier timeout, TimeUnit timeUnit) throws InterruptedException, TimeoutException {
-        while (true) {
-            Future inProgressFlush;
-            synchronized (this) {
-                if (flushing()) {
-                    inProgressFlush = latestFlush;
-                } else {
-                    return beginFlush();
-                }
-            }
-            try {
-                inProgressFlush.get(timeout.get(), timeUnit);
-            } catch (ExecutionException e) {
-                // someone else is responsible for handling this error, we just want to wait for the flush to be over.
-            }
-        }
-    }
-
     /**
      * Performs the first step of a flush operation, snapshotting the current state. This does not
-     * actually initiate the flush with the underlying storage.
+     * actually initiate the flush with the underlying storage. Ensures that any previous flush operations
+     * have finished before beginning a new flush.
      *
+     * @param timeout A maximum duration to wait for previous flushes to finish before giving up on waiting
+     * @param timeUnit Units of the timeout argument
      * @return true if a flush was initiated, false if no data was available
+     * @throws InterruptedException if this thread was interrupted while waiting for the previous flush to complete
+     * @throws TimeoutException if the `timeout` elapses before previous flushes are complete.

Review Comment:
   Nit: Javadocs != markdown, should be `{@code timeout}` (without backticks).
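The deadline arithmetic questioned in the first comment above reduces to "remaining = deadline - now", clamped at zero so a negative timeout is never passed along. A minimal sketch of that helper, with invented names, illustrates the two call styles being compared:

```java
import java.util.concurrent.TimeUnit;

// Sketch contrasting the two timeout styles from the review: deriving a remaining
// timeout from a deadline vs. a fail-fast zero timeout. Names are invented.
public class TimeoutStyle {
    // Remaining time before the deadline, clamped so we never pass a negative timeout.
    public static long remainingMs(long deadlineMs, long nowMs) {
        return Math.max(0L, deadlineMs - nowMs);
    }

    public static void main(String[] args) {
        long started = 1_000L;
        long deadline = started + 5_000L;                  // e.g. a 5000 ms commit timeout
        System.out.println(remainingMs(deadline, 3_000L)); // 3000: time still left
        System.out.println(remainingMs(deadline, 9_000L)); // 0: deadline already passed
        // The reviewer's alternative: skip the arithmetic entirely and call
        // beginFlush(0, TimeUnit.MILLISECONDS) when no concurrent flush is expected.
        TimeUnit unit = TimeUnit.MILLISECONDS;
        System.out.println(unit);
    }
}
```

The reviewer's point is that when the caller already guarantees no concurrent flushes, the clamped-deadline computation buys nothing over a plain zero timeout.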
C0urante commented on code in PR #13208:
URL: https://github.com/apache/kafka/pull/13208#discussion_r1100459265

## connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java:

@@ -98,6 +104,24 @@ private boolean flushing() {
         return toFlush != null;
     }

+    public boolean waitForBeginFlush(Supplier timeout, TimeUnit timeUnit) throws InterruptedException, TimeoutException {

Review Comment:
   It seems like we're bending over backwards here to accommodate an assumption made in `beginFlush` that we'll never try to trigger two offset flushes at once, which is clearly false given the conditions that necessitate this fix (i.e., a task's end-of-life offset flush is triggered at the same time as its periodic offset flush). Given that, do we really need a separate method here, or can we relax the constraints in `beginFlush` to wait for in-progress flushes to conclude instead of throwing an exception if there are any?

   Additionally, it seems like the use of a `CompletableFuture` here is a bit strange. Would a `Semaphore` or `CountDownLatch` be more suited?

   Finally, since this change may lead to us performing double offset commits when a task is being shut down, do you think it might also make sense to add a `close` method to the offset writer that throws an exception for any further attempts to flush, and possibly forcibly terminates any in-progress flushes? We can invoke that in `AbstractWorkerTask::cancel` (or possibly `WorkerSourceTask::cancel` if a different approach is necessary to preserve exactly-once semantics) to help tasks complete shutdown within the timeout allotted to them.
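For the `Semaphore`-or-`CountDownLatch` question raised here: a `CountDownLatch` models the one-shot "previous flush finished" signal directly, since waiters can await it with a timeout and it can never be re-armed. A minimal sketch, with illustrative names only:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Minimal sketch of waiting on an in-progress flush with a CountDownLatch, as an
// alternative to blocking on a CompletableFuture whose result is never consumed.
public class FlushLatchDemo {
    // Returns true if the previous flush signalled completion within the timeout.
    public static boolean awaitPreviousFlush(CountDownLatch flushDone, long timeout, TimeUnit unit)
            throws InterruptedException {
        return flushDone.await(timeout, unit);
    }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch flushDone = new CountDownLatch(1);
        // Simulate the in-progress flush completing on another thread.
        new Thread(flushDone::countDown).start();
        System.out.println(awaitPreviousFlush(flushDone, 1, TimeUnit.SECONDS)); // true
    }
}
```

Unlike `Future.get`, `await` carries no result or exception, which matches the comment in the removed `waitForBeginFlush`: the waiter only cares that the flush is over, not how it ended.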