C0urante commented on code in PR #13208: URL: https://github.com/apache/kafka/pull/13208#discussion_r1106236517
########## connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetStorageWriterTest.java: ########## @@ -135,38 +135,40 @@ public void testFlushFailureReplacesOffsets() throws Exception { // First time the write fails expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED, true, null); writer.offset(OFFSET_KEY, OFFSET_VALUE); - assertTrue(writer.beginFlush()); + assertTrue(writer.beginFlush(1000L, TimeUnit.MILLISECONDS)); writer.doFlush(callback).get(1000, TimeUnit.MILLISECONDS); verify(callback).onCompletion(eq(EXCEPTION), isNull()); // Second time it succeeds expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED, false, null); - assertTrue(writer.beginFlush()); + assertTrue(writer.beginFlush(1000L, TimeUnit.MILLISECONDS)); writer.doFlush(callback).get(1000, TimeUnit.MILLISECONDS); verify(callback).onCompletion(isNull(), isNull()); // Third time it has no data to flush so we won't get past beginFlush() - assertFalse(writer.beginFlush()); + assertFalse(writer.beginFlush(1000L, TimeUnit.MILLISECONDS)); } @Test - public void testAlreadyFlushing() { + public void testAlreadyFlushing() throws InterruptedException, TimeoutException { @SuppressWarnings("unchecked") final Callback<Void> callback = mock(Callback.class); // Trigger the send, but don't invoke the callback so we'll still be mid-flush CountDownLatch allowStoreCompleteCountdown = new CountDownLatch(1); expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED, false, allowStoreCompleteCountdown); writer.offset(OFFSET_KEY, OFFSET_VALUE); - assertTrue(writer.beginFlush()); + assertTrue(writer.beginFlush(1000L, TimeUnit.MILLISECONDS)); Review Comment: Worth it to add a check to ensure that `beginFlush` times out if it has been invoked previously, before `doFlush` has also been called? 
```suggestion assertThrows(TimeoutException.class, () -> writer.beginFlush(1000L, TimeUnit.MILLISECONDS)); assertTrue(writer.beginFlush(1000L, TimeUnit.MILLISECONDS)); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org