gharris1727 commented on code in PR #13208: URL: https://github.com/apache/kafka/pull/13208#discussion_r1107669147
##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java:
##########
@@ -100,23 +104,45 @@ private boolean flushing() {
     /**
      * Performs the first step of a flush operation, snapshotting the current state. This does not
-     * actually initiate the flush with the underlying storage.
+     * actually initiate the flush with the underlying storage. Ensures that any previous flush operations
+     * have finished before beginning a new flush.
      *
      * @return true if a flush was initiated, false if no data was available
+     * @throws ConnectException if the previous flush is not complete before this method is called
      */
-    public synchronized boolean beginFlush() {
-        if (flushing()) {
-            log.error("Invalid call to OffsetStorageWriter flush() while already flushing, the "
+    public boolean beginFlush() {
+        try {
+            return beginFlush(0, TimeUnit.NANOSECONDS);
+        } catch (InterruptedException | TimeoutException e) {
+            log.error("Invalid call to OffsetStorageWriter beginFlush() while already flushing, the "
                     + "framework should not allow this");
             throw new ConnectException("OffsetStorageWriter is already flushing");
         }
+    }
 
-        if (data.isEmpty())
-            return false;
-
-        toFlush = data;
-        data = new HashMap<>();
-        return true;
+    /**
+     * Performs the first step of a flush operation, snapshotting the current state. This does not
+     * actually initiate the flush with the underlying storage. Ensures that any previous flush operations
+     * have finished before beginning a new flush.
+     *
+     * @param timeout A maximum duration to wait for previous flushes to finish before giving up on waiting
+     * @param timeUnit Units of the timeout argument
+     * @return true if a flush was initiated, false if no data was available
+     * @throws InterruptedException if this thread was interrupted while waiting for the previous flush to complete
+     * @throws TimeoutException if the {@code timeout} elapses before previous flushes are complete.
+     */
+    public boolean beginFlush(long timeout, TimeUnit timeUnit) throws InterruptedException, TimeoutException {
+        if (flushInProgress.tryAcquire(Math.max(0, timeout), timeUnit)) {
+            synchronized (this) {
+                if (data.isEmpty())
+                    return false;

Review Comment:
   Oh wow, that's pretty serious. I added a unit test that targets this release.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org