This is an automated email from the ASF dual-hosted git repository.
jqin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new a15bf58da54 [FLINK-35924][Connectors / Common] delay the SplitReader
closure to until all the emitted records are processed. (#25130)
a15bf58da54 is described below
commit a15bf58da5442deeb07ac2a1795a961a0ec75561
Author: Jiangjie (Becket) Qin <[email protected]>
AuthorDate: Tue Aug 6 08:31:59 2024 -0700
[FLINK-35924][Connectors / Common] delay the SplitReader closure to until
all the emitted records are processed. (#25130)
This patch delays the SplitReader closure until all the previously emitted
records are processed. This is needed for some SplitReader implementations
that store the returned records in an internal buffer to save a data copy.
In that case, closing the SplitReader would corrupt the emitted but not yet
processed records.
---
.../base/source/reader/fetcher/SplitFetcher.java | 45 ++++++++++++
.../source/reader/fetcher/SplitFetcherManager.java | 29 +++++++-
.../reader/fetcher/SplitFetcherManagerTest.java | 79 +++++++++++++++++++++-
.../source/reader/fetcher/SplitFetcherTest.java | 36 ++++++++++
.../java/org/apache/flink/test/util/TestUtils.java | 38 +++++++++++
5 files changed, 223 insertions(+), 4 deletions(-)
diff --git
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java
index 339686415ee..f05d7d16a45 100644
---
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java
+++
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java
@@ -20,6 +20,7 @@ package org.apache.flink.connector.base.source.reader.fetcher;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.connector.base.source.reader.RecordsBySplits;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
@@ -32,10 +33,12 @@ import javax.annotation.concurrent.GuardedBy;
import java.util.ArrayDeque;
import java.util.Collection;
+import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
@@ -82,6 +85,14 @@ public class SplitFetcher<E, SplitT extends SourceSplit>
implements Runnable {
private final Consumer<Collection<String>> splitFinishedHook;
+ /**
+ * A shutdown latch to help make sure the SplitReader is only closed after
all the emitted
+ * records have been processed by the main reader thread. This is needed
because in some cases,
+ * the records in the <tt>RecordsWithSplitIds</tt> may not have been
processed when the split
+ * fetcher shuts down.
+ */
+ private final CountDownLatch recordsProcessedLatch;
+
SplitFetcher(
int id,
FutureCompletingBlockingQueue<RecordsWithSplitIds<E>>
elementsQueue,
@@ -97,6 +108,7 @@ public class SplitFetcher<E, SplitT extends SourceSplit>
implements Runnable {
this.shutdownHook = checkNotNull(shutdownHook);
this.allowUnalignedSourceSplits = allowUnalignedSourceSplits;
this.splitFinishedHook = splitFinishedHook;
+ this.recordsProcessedLatch = new CountDownLatch(1);
this.fetchTask =
new FetchTask<>(
@@ -117,10 +129,25 @@ public class SplitFetcher<E, SplitT extends SourceSplit>
implements Runnable {
while (runOnce()) {
// nothing to do, everything is inside #runOnce.
}
+ if (recordsProcessedLatch.getCount() > 0) {
+ // Put an empty synchronization batch to the element queue.
+ // When this batch is recycled, all the records emitted earlier
+ // must have already been processed.
+ elementsQueue.put(
+ fetcherId(),
+ new RecordsBySplits<E>(Collections.emptyMap(),
Collections.emptySet()) {
+ @Override
+ public void recycle() {
+ super.recycle();
+ recordsProcessedLatch.countDown();
+ }
+ });
+ }
} catch (Throwable t) {
errorHandler.accept(t);
} finally {
try {
+ recordsProcessedLatch.await();
splitReader.close();
} catch (Exception e) {
errorHandler.accept(e);
@@ -308,6 +335,24 @@ public class SplitFetcher<E, SplitT extends SourceSplit>
implements Runnable {
/** Shutdown the split fetcher. */
public void shutdown() {
+ shutdown(false);
+ }
+
+ /**
+ * Shutdown the split fetcher. When waitingForRecordsProcessed is set to
true, the split fetcher
+ * will block waiting for the previously emitted records to be processed
before it closes the
+ * encapsulated SplitReader. Otherwise, it will just close the SplitReader.
+ *
+ * <p>Although declared public, this method is intended to be used only by the
SplitFetcherManager
+ * when closing the idle fetchers.
+ *
+ * @param waitingForRecordsProcessed whether to wait for the previously
emitted records to be
+ * processed.
+ */
+ public void shutdown(boolean waitingForRecordsProcessed) {
+ if (!waitingForRecordsProcessed) {
+ recordsProcessedLatch.countDown();
+ }
lock.lock();
try {
if (!closed) {
diff --git
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java
index 4f7219eb2b8..7601994e401 100644
---
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java
+++
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java
@@ -78,6 +78,13 @@ public abstract class SplitFetcherManager<E, SplitT extends
SourceSplit> {
/** A map keeping track of all the split fetchers. */
protected final Map<Integer, SplitFetcher<E, SplitT>> fetchers;
+ /**
+ * Tracks the total number of fetcher threads that need to be cleaned
up when the
+ * SplitFetcherManager shuts down. It is different from the fetchers Map
as the map only
+ * contains alive fetchers, but not shutting down fetchers.
+ */
+ private final AtomicInteger fetchersToShutDown;
+
/**
* An executor service with two threads. One for the fetcher and one for
the future completing
* thread.
@@ -148,6 +155,7 @@ public abstract class SplitFetcherManager<E, SplitT extends
SourceSplit> {
this.fetcherIdGenerator = new AtomicInteger(0);
this.fetchers = new ConcurrentHashMap<>();
this.allowUnalignedSourceSplits =
configuration.get(ALLOW_UNALIGNED_SOURCE_SPLITS);
+ this.fetchersToShutDown = new AtomicInteger(0);
// Create the executor with a thread factory that fails the source
reader if one of
// the fetcher thread exits abnormally.
@@ -202,6 +210,7 @@ public abstract class SplitFetcherManager<E, SplitT extends
SourceSplit> {
this.fetcherIdGenerator = new AtomicInteger(0);
this.fetchers = new ConcurrentHashMap<>();
this.allowUnalignedSourceSplits =
configuration.get(ALLOW_UNALIGNED_SOURCE_SPLITS);
+ this.fetchersToShutDown = new AtomicInteger(0);
// Create the executor with a thread factory that fails the source
reader if one of
// the fetcher thread exits abnormally.
@@ -259,6 +268,7 @@ public abstract class SplitFetcherManager<E, SplitT extends
SourceSplit> {
SplitReader<E, SplitT> splitReader = splitReaderFactory.get();
int fetcherId = fetcherIdGenerator.getAndIncrement();
+ fetchersToShutDown.incrementAndGet();
SplitFetcher<E, SplitT> splitFetcher =
new SplitFetcher<>(
fetcherId,
@@ -267,6 +277,7 @@ public abstract class SplitFetcherManager<E, SplitT extends
SourceSplit> {
errorHandler,
() -> {
fetchers.remove(fetcherId);
+ fetchersToShutDown.decrementAndGet();
// We need this to synchronize status of fetchers
to concurrent partners
// as
// ConcurrentHashMap's aggregate status methods
including size, isEmpty,
@@ -292,7 +303,7 @@ public abstract class SplitFetcherManager<E, SplitT extends
SourceSplit> {
SplitFetcher<E, SplitT> fetcher = entry.getValue();
if (fetcher.isIdle()) {
LOG.info("Closing splitFetcher {} because it is idle.",
entry.getKey());
- fetcher.shutdown();
+ fetcher.shutdown(true);
iter.remove();
}
}
@@ -315,14 +326,26 @@ public abstract class SplitFetcherManager<E, SplitT
extends SourceSplit> {
* @throws Exception when failed to close the split fetcher manager.
*/
public synchronized void close(long timeoutMs) throws Exception {
+ final long startTime = System.currentTimeMillis();
closed = true;
fetchers.values().forEach(SplitFetcher::shutdown);
+ // Actively drain the element queue in case there are previously
shutting down
+ // fetcher threads blocking on putting batches into the element queue.
+ executors.submit(
+ () -> {
+ while (fetchersToShutDown.get() > 0
+ && System.currentTimeMillis() - startTime <
timeoutMs) {
+ elementsQueue
+ .getAvailabilityFuture()
+ .thenRun(() -> elementsQueue.poll().recycle());
+ }
+ });
executors.shutdown();
if (!executors.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) {
LOG.warn(
- "Failed to close the source reader in {} ms. There are
still {} split fetchers running",
+ "Failed to close the split fetchers in {} ms. There are
still {} split fetchers running",
timeoutMs,
- fetchers.size());
+ fetchersToShutDown);
}
}
diff --git
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManagerTest.java
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManagerTest.java
index 08e7ac45df9..21572b1c84d 100644
---
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManagerTest.java
+++
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManagerTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.connector.base.source.reader.fetcher;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.source.reader.RecordsBySplits;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.SourceReaderOptions;
import
org.apache.flink.connector.base.source.reader.mocks.TestingRecordsWithSplitIds;
@@ -33,11 +34,13 @@ import org.apache.flink.core.testutils.OneShotLatch;
import org.junit.Test;
import java.io.IOException;
+import java.time.Duration;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collections;
import java.util.Queue;
+import static org.apache.flink.test.util.TestUtils.waitUntil;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.Assertions.fail;
@@ -68,6 +71,76 @@ public class SplitFetcherManagerTest {
.hasRootCauseMessage("Artificial exception on closing the
split reader.");
}
+ @Test(timeout = 30000)
+ public void testCloseCleansUpPreviouslyClosedFetcher() throws Exception {
+ final String splitId = "testSplit";
+ // Set the queue capacity to 1 to make sure in this case the
+ // fetcher shutdown won't block on putting the batches into the queue.
+ Configuration config = new Configuration();
+ config.set(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY, 1);
+ final AwaitingReader<Integer, TestingSourceSplit> reader =
+ new AwaitingReader<>(
+ new IOException("Should not happen"),
+ new RecordsBySplits<>(
+ Collections.emptyMap(),
Collections.singleton(splitId)));
+ final SplitFetcherManager<Integer, TestingSourceSplit> fetcherManager =
+ createFetcher(splitId, reader, config);
+ // Ensure the fetcher has emitted an element into the queue.
+ fetcherManager.getQueue().getAvailabilityFuture().get();
+ waitUntil(
+ () -> {
+ fetcherManager.maybeShutdownFinishedFetchers();
+ return fetcherManager.fetchers.isEmpty();
+ },
+ "The idle fetcher should have been removed.");
+ // Now close the fetcher manager. The fetcher manager closing should
not block.
+ fetcherManager.close(60_000);
+ }
+
+ @Test
+ public void testIdleShutdownSplitFetcherWaitsUntilRecordProcessed() throws
Exception {
+ final String splitId = "testSplit";
+ final AwaitingReader<Integer, TestingSourceSplit> reader =
+ new AwaitingReader<>(
+ new IOException("Should not happen"),
+ new RecordsBySplits<>(
+ Collections.emptyMap(),
Collections.singleton(splitId)));
+ final SplitFetcherManager<Integer, TestingSourceSplit> fetcherManager =
+ createFetcher(splitId, reader, new Configuration());
+ try {
+ FutureCompletingBlockingQueue<RecordsWithSplitIds<Integer>> queue =
+ fetcherManager.getQueue();
+ // Wait until the data batch is emitted.
+ queue.getAvailabilityFuture().get();
+ waitUntil(
+ () -> {
+ fetcherManager.maybeShutdownFinishedFetchers();
+ return fetcherManager.getNumAliveFetchers() == 0;
+ },
+ Duration.ofSeconds(1),
+ "The fetcher should have already been removed from the
alive fetchers.");
+
+ // There should be two fetches available, one for data (although
empty), one for the
+ // shutdown synchronization (also empty).
+ waitUntil(
+ () -> queue.size() == 2,
+ Duration.ofSeconds(1),
+ "The element queue should have 2 batches when the fetcher
is closed.");
+
+ // Finish the first batch (data batch).
+ queue.poll().recycle();
+ assertThat(reader.isClosed).as("The reader should have not been
closed.").isFalse();
+ // Finish the second batch (synchronization batch).
+ queue.poll().recycle();
+ waitUntil(
+ () -> reader.isClosed,
+ Duration.ofSeconds(1),
+ "The reader should hava been closed.");
+ } finally {
+ fetcherManager.close(30_000);
+ }
+ }
+
// the final modifier is important so that '@SafeVarargs' is accepted on
Java 8
@SuppressWarnings("FinalPrivateMethod")
@SafeVarargs
@@ -134,6 +207,8 @@ public class SplitFetcherManagerTest {
private final OneShotLatch inBlocking = new OneShotLatch();
private final OneShotLatch throwError = new OneShotLatch();
+ private volatile boolean isClosed = false;
+
@SafeVarargs
AwaitingReader(IOException testError, RecordsWithSplitIds<E>...
fetches) {
this.testError = testError;
@@ -163,7 +238,9 @@ public class SplitFetcherManagerTest {
public void wakeUp() {}
@Override
- public void close() throws Exception {}
+ public void close() throws Exception {
+ isClosed = true;
+ }
public void awaitAllRecordsReturned() throws InterruptedException {
inBlocking.await();
diff --git
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
index 26e62eb6f66..75c0c7b6548 100644
---
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
+++
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
@@ -31,6 +31,7 @@ import org.apache.flink.util.ExceptionUtils;
import org.junit.Test;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -40,6 +41,8 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import static java.lang.Thread.State.WAITING;
+import static org.apache.flink.test.util.TestUtils.waitUntil;
import static org.assertj.core.api.Assertions.assertThat;
/** Unit test for {@link SplitFetcher}. */
@@ -275,6 +278,39 @@ public class SplitFetcherTest {
assertThat(fetcher.runOnce()).isFalse();
}
+ @Test
+ public void testShutdownWaitingForRecordsProcessing() throws Exception {
+ TestingSplitReader<Object, TestingSourceSplit> splitReader = new
TestingSplitReader<>();
+ FutureCompletingBlockingQueue<RecordsWithSplitIds<Object>> queue =
+ new FutureCompletingBlockingQueue<>();
+ final SplitFetcher<Object, TestingSourceSplit> fetcher =
createFetcher(splitReader, queue);
+ fetcher.shutdown(true);
+
+ // Spawn a new fetcher thread to go through the shutdown sequence.
+ CheckedThread fetcherThread =
+ new CheckedThread() {
+ @Override
+ public void go() throws Exception {
+ fetcher.run();
+ assertThat(splitReader.isClosed()).isTrue();
+ }
+ };
+ fetcherThread.start();
+
+ // Wait until the fetcher thread blocks on the shutdown latch.
+ waitUntil(
+ () -> fetcherThread.getState() == WAITING,
+ Duration.ofSeconds(1),
+ "The fetcher thread should be waiting for the shutdown latch");
+ assertThat(splitReader.isClosed())
+ .as("The split reader should have not been closed.")
+ .isFalse();
+
+ queue.getAvailabilityFuture().thenRun(() -> queue.poll().recycle());
+ // Now pull the latch.
+ fetcherThread.sync();
+ }
+
// ------------------------------------------------------------------------
// testing utils
// ------------------------------------------------------------------------
diff --git
a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestUtils.java
b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestUtils.java
index 4adfbfe9765..23df204126c 100644
---
a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestUtils.java
+++
b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestUtils.java
@@ -47,10 +47,13 @@ import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.attribute.BasicFileAttributes;
+import java.time.Duration;
import java.util.Comparator;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Supplier;
import static
org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess.CHECKPOINT_DIR_PREFIX;
import static
org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess.METADATA_FILE_NAME;
@@ -213,6 +216,41 @@ public class TestUtils {
.get());
}
+ /**
+ * Wait until the given condition is met or timeout is reached, whichever
comes first.
+ *
+ * @param condition the condition to meet.
+ * @param message the message to show if the condition is not met before
timeout.
+ * @throws InterruptedException when the thread is interrupted when
waiting for the condition.
+ * @throws TimeoutException when the condition is not met after the
specified timeout has
+ * elapsed.
+ */
+ public static void waitUntil(Supplier<Boolean> condition, String message)
+ throws InterruptedException, TimeoutException {
+ waitUntil(condition, Duration.ofSeconds(5), message);
+ }
+
+ /**
+ * Wait until the given condition is met or timeout is reached, whichever
comes first.
+ *
+ * @param condition the condition to meet.
+ * @param timeout the maximum time to wait for the condition to become
true.
+ * @param message the message to show if the condition is not met before
timeout.
+ * @throws InterruptedException when the thread is interrupted when
waiting for the condition.
+ * @throws TimeoutException when the condition is not met after the
specified timeout has
+ * elapsed.
+ */
+ public static void waitUntil(Supplier<Boolean> condition, Duration
timeout, String message)
+ throws InterruptedException, TimeoutException {
+ long startTime = System.currentTimeMillis();
+ while (!condition.get() && System.currentTimeMillis() < startTime +
timeout.toMillis()) {
+ Thread.sleep(1);
+ }
+ if (!condition.get()) {
+ throw new TimeoutException(message);
+ }
+ }
+
private static boolean allVerticesRunning(Map<ExecutionState, Integer>
states) {
return states.entrySet().stream()
.allMatch(