gharris1727 commented on code in PR #13208:
URL: https://github.com/apache/kafka/pull/13208#discussion_r1107669147


##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java:
##########
@@ -100,23 +104,45 @@ private boolean flushing() {
 
     /**
      * Performs the first step of a flush operation, snapshotting the current state. This does not
-     * actually initiate the flush with the underlying storage.
+     * actually initiate the flush with the underlying storage. Ensures that any previous flush operations
+     * have finished before beginning a new flush.
      *
      * @return true if a flush was initiated, false if no data was available
+     * @throws ConnectException if the previous flush is not complete before this method is called
      */
-    public synchronized boolean beginFlush() {
-        if (flushing()) {
-            log.error("Invalid call to OffsetStorageWriter flush() while already flushing, the "
+    public boolean beginFlush() {
+        try {
+            return beginFlush(0, TimeUnit.NANOSECONDS);
+        } catch (InterruptedException | TimeoutException e) {
+            log.error("Invalid call to OffsetStorageWriter beginFlush() while already flushing, the "
                     + "framework should not allow this");
             throw new ConnectException("OffsetStorageWriter is already flushing");
         }
+    }
 
-        if (data.isEmpty())
-            return false;
-
-        toFlush = data;
-        data = new HashMap<>();
-        return true;
+    /**
+     * Performs the first step of a flush operation, snapshotting the current state. This does not
+     * actually initiate the flush with the underlying storage. Ensures that any previous flush operations
+     * have finished before beginning a new flush.
+     *
+     * @param timeout A maximum duration to wait for previous flushes to finish before giving up on waiting
+     * @param timeUnit Units of the timeout argument
+     * @return true if a flush was initiated, false if no data was available
+     * @throws InterruptedException if this thread was interrupted while waiting for the previous flush to complete
+     * @throws TimeoutException if the {@code timeout} elapses before previous flushes are complete.
+     */
+    public boolean beginFlush(long timeout, TimeUnit timeUnit) throws InterruptedException, TimeoutException {
+        if (flushInProgress.tryAcquire(Math.max(0, timeout), timeUnit)) {
+            synchronized (this) {
+                if (data.isEmpty())
+                    return false;

Review Comment:
   Oh wow, that's pretty serious. I added a unit test that targets this release.
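
For context, here is a minimal sketch of the kind of bug and check under discussion. It assumes the problem is that the permit taken by `flushInProgress.tryAcquire(...)` is never returned when the `data.isEmpty()` branch exits early. `FlushGuardSketch`, `offset`, and `finishFlush` are hypothetical stand-ins for illustration, not the actual `OffsetStorageWriter` code or the test added in the PR:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for the writer in the diff; names and fields are assumptions.
class FlushGuardSketch {
    // One permit: at most one flush may be in progress at a time.
    private final Semaphore flushInProgress = new Semaphore(1);
    private Map<String, String> data = new HashMap<>();
    private Map<String, String> toFlush = null;

    public synchronized void offset(String key, String value) {
        data.put(key, value);
    }

    public boolean beginFlush(long timeout, TimeUnit timeUnit)
            throws InterruptedException, TimeoutException {
        if (!flushInProgress.tryAcquire(Math.max(0, timeout), timeUnit)) {
            throw new TimeoutException("Timed out waiting for previous flush to finish");
        }
        synchronized (this) {
            if (data.isEmpty()) {
                // No flush was started, so hand the permit back before returning.
                // Without this release, every later beginFlush call would time out.
                flushInProgress.release();
                return false;
            }
            // Snapshot the current state; the permit stays held until finishFlush.
            toFlush = data;
            data = new HashMap<>();
            return true;
        }
    }

    public synchronized void finishFlush() {
        // Only release if a flush was actually begun, to avoid adding extra permits.
        if (toFlush != null) {
            toFlush = null;
            flushInProgress.release();
        }
    }
}
```

With the release in the empty branch, two back-to-back calls on an empty writer both return false, rather than the second one failing with a `TimeoutException`.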


