C0urante commented on code in PR #13208:
URL: https://github.com/apache/kafka/pull/13208#discussion_r1106236517


##########
connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetStorageWriterTest.java:
##########
@@ -135,38 +135,40 @@ public void testFlushFailureReplacesOffsets() throws 
Exception {
         // First time the write fails
         expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, 
OFFSET_VALUE_SERIALIZED, true, null);
         writer.offset(OFFSET_KEY, OFFSET_VALUE);
-        assertTrue(writer.beginFlush());
+        assertTrue(writer.beginFlush(1000L, TimeUnit.MILLISECONDS));
         writer.doFlush(callback).get(1000, TimeUnit.MILLISECONDS);
         verify(callback).onCompletion(eq(EXCEPTION), isNull());
 
         // Second time it succeeds
         expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, 
OFFSET_VALUE_SERIALIZED, false, null);
-        assertTrue(writer.beginFlush());
+        assertTrue(writer.beginFlush(1000L, TimeUnit.MILLISECONDS));
         writer.doFlush(callback).get(1000, TimeUnit.MILLISECONDS);
         verify(callback).onCompletion(isNull(), isNull());
 
         // Third time it has no data to flush so we won't get past beginFlush()
-        assertFalse(writer.beginFlush());
+        assertFalse(writer.beginFlush(1000L, TimeUnit.MILLISECONDS));
     }
 
     @Test
-    public void testAlreadyFlushing() {
+    public void testAlreadyFlushing() throws InterruptedException, 
TimeoutException {
         @SuppressWarnings("unchecked")
         final Callback<Void> callback = mock(Callback.class);
         // Trigger the send, but don't invoke the callback so we'll still be 
mid-flush
         CountDownLatch allowStoreCompleteCountdown = new CountDownLatch(1);
         expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, 
OFFSET_VALUE_SERIALIZED, false, allowStoreCompleteCountdown);
 
         writer.offset(OFFSET_KEY, OFFSET_VALUE);
-        assertTrue(writer.beginFlush());
+        assertTrue(writer.beginFlush(1000L, TimeUnit.MILLISECONDS));

Review Comment:
   Worth it to add a check to ensure that `beginFlush` times out if ti has been 
invoked previously, before `doFlush` has also been called?
   ```suggestion
           assertThrows(TimeoutException.class, () -> writer.beginFlush(1000L, 
TimeUnit.MILLISECONDS));
           assertTrue(writer.beginFlush(1000L, TimeUnit.MILLISECONDS));
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to