This is an automated email from the ASF dual-hosted git repository.
stevenwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new c21b5ed337 Flink: support split discovery throttling for streaming
read (#6299)
c21b5ed337 is described below
commit c21b5ed337cb13279d9b16a6532c9ef03055272d
Author: Steven Zhen Wu <[email protected]>
AuthorDate: Mon Dec 5 13:11:27 2022 -0800
Flink: support split discovery throttling for streaming read (#6299)
* Flink: support split discovery throttling for streaming read in case of
lagging behind or intentionally delayed consumption.
This is to avoid eagerly discovering too many splits and tracking them in
memory when the Flink job is falling behind too much. It helps to keep the
memory footprint and enumerator checkpoint size in check.
---
.../apache/iceberg/flink/source/IcebergSource.java | 3 +-
.../flink/source/assigner/SimpleSplitAssigner.java | 11 +-
.../flink/source/assigner/SplitAssigner.java | 22 +++-
.../enumerator/ContinuousIcebergEnumerator.java | 65 +++++++---
.../enumerator/ContinuousSplitPlannerImpl.java | 26 +++-
.../source/enumerator/EnumerationHistory.java | 96 +++++++++++++++
.../source/enumerator/IcebergEnumeratorState.java | 15 +++
.../IcebergEnumeratorStateSerializer.java | 119 ++++++++++++++----
.../source/enumerator/StaticIcebergEnumerator.java | 15 +--
.../enumerator/ManualContinuousSplitPlanner.java | 67 ++++++++---
.../TestContinuousIcebergEnumerator.java | 96 +++++++++++++--
.../enumerator/TestContinuousSplitPlannerImpl.java | 74 ++++++++++++
.../source/enumerator/TestEnumerationHistory.java | 134 +++++++++++++++++++++
.../TestIcebergEnumeratorStateSerializer.java | 64 ++++++++--
14 files changed, 718 insertions(+), 89 deletions(-)
diff --git
a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
index df95b3e434..9728aeb2b3 100644
---
a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
+++
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
@@ -187,8 +187,7 @@ public class IcebergSource<T> implements Source<T,
IcebergSourceSplit, IcebergEn
} else {
List<IcebergSourceSplit> splits =
planSplitsForBatch(planningThreadName());
assigner.onDiscoveredSplits(splits);
- return new StaticIcebergEnumerator(
- enumContext, assigner, lazyTable(), scanContext, enumState);
+ return new StaticIcebergEnumerator(enumContext, assigner);
}
}
diff --git
a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/SimpleSplitAssigner.java
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/SimpleSplitAssigner.java
index c1da261a55..b43660f907 100644
---
a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/SimpleSplitAssigner.java
+++
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/SimpleSplitAssigner.java
@@ -51,7 +51,7 @@ public class SimpleSplitAssigner implements SplitAssigner {
}
@Override
- public GetSplitResult getNext(@Nullable String hostname) {
+ public synchronized GetSplitResult getNext(@Nullable String hostname) {
if (pendingSplits.isEmpty()) {
return GetSplitResult.unavailable();
} else {
@@ -70,7 +70,7 @@ public class SimpleSplitAssigner implements SplitAssigner {
addSplits(splits);
}
- private void addSplits(Collection<IcebergSourceSplit> splits) {
+ private synchronized void addSplits(Collection<IcebergSourceSplit> splits) {
if (!splits.isEmpty()) {
pendingSplits.addAll(splits);
// only complete pending future if new splits are discovered
@@ -80,7 +80,7 @@ public class SimpleSplitAssigner implements SplitAssigner {
/** Simple assigner only tracks unassigned splits */
@Override
- public Collection<IcebergSourceSplitState> state() {
+ public synchronized Collection<IcebergSourceSplitState> state() {
return pendingSplits.stream()
.map(split -> new IcebergSourceSplitState(split,
IcebergSourceSplitStatus.UNASSIGNED))
.collect(Collectors.toList());
@@ -94,6 +94,11 @@ public class SimpleSplitAssigner implements SplitAssigner {
return availableFuture;
}
+ @Override
+ public synchronized int pendingSplitCount() {
+ return pendingSplits.size();
+ }
+
private synchronized void completeAvailableFuturesIfNeeded() {
if (availableFuture != null && !pendingSplits.isEmpty()) {
availableFuture.complete(null);
diff --git
a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/SplitAssigner.java
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/SplitAssigner.java
index b17a554f5e..ca60612f0e 100644
---
a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/SplitAssigner.java
+++
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/SplitAssigner.java
@@ -23,6 +23,7 @@ import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nullable;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.iceberg.flink.source.ScanContext;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
import org.apache.iceberg.flink.source.split.IcebergSourceSplitState;
@@ -38,8 +39,9 @@ import
org.apache.iceberg.flink.source.split.IcebergSourceSplitState;
* single source or across sources.
* </ul>
*
- * <p>Enumerator should call the assigner APIs from the coordinator thread.
This is to simplify the
- * thread safety for assigner implementation.
+ * <p>Assigner implementation needs to be thread safe. Enumerator calls the
assigner APIs mostly from
+ * the coordinator thread. But enumerator may call the {@link
SplitAssigner#pendingSplitCount()}
+ * from the I/O threads.
*/
public interface SplitAssigner extends Closeable {
@@ -97,4 +99,20 @@ public interface SplitAssigner extends Closeable {
* the coordinator thread using {@link
SplitEnumeratorContext#runInCoordinatorThread(Runnable)}.
*/
CompletableFuture<Void> isAvailable();
+
+ /**
+ * Return the number of pending splits that haven't been assigned yet.
+ *
+ * <p>The enumerator can poll this API to publish a metric on the number of
pending splits.
+ *
+ * <p>The enumerator can also use this information to throttle split
discovery for streaming read.
+ * If there are already many pending splits tracked by the assigner, it is
undesirable to discover
+ * more splits and track them in the assigner. That will increase the memory
footprint and
+ * enumerator checkpoint size.
+ *
+ * <p>Throttling works better together with {@link
ScanContext#maxPlanningSnapshotCount()}.
+ * Otherwise, the next split discovery after throttling will just discover
all non-enumerated
+ * snapshots and splits, which defeats the purpose of throttling.
+ */
+ int pendingSplitCount();
}
diff --git
a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousIcebergEnumerator.java
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousIcebergEnumerator.java
index d104f46fda..b84dab190a 100644
---
a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousIcebergEnumerator.java
+++
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousIcebergEnumerator.java
@@ -19,6 +19,7 @@
package org.apache.iceberg.flink.source.enumerator;
import java.io.IOException;
+import java.util.Collections;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
@@ -34,6 +35,11 @@ import org.slf4j.LoggerFactory;
public class ContinuousIcebergEnumerator extends AbstractIcebergEnumerator {
private static final Logger LOG =
LoggerFactory.getLogger(ContinuousIcebergEnumerator.class);
+ /**
+ * This is hardcoded, as {@link ScanContext#maxPlanningSnapshotCount()}
could be the knob to
+ * control the total number of snapshots worth of splits tracked by assigner.
+ */
+ private static final int ENUMERATION_SPLIT_COUNT_HISTORY_SIZE = 3;
private final SplitEnumeratorContext<IcebergSourceSplit> enumeratorContext;
private final SplitAssigner assigner;
@@ -46,6 +52,9 @@ public class ContinuousIcebergEnumerator extends
AbstractIcebergEnumerator {
*/
private final AtomicReference<IcebergEnumeratorPosition> enumeratorPosition;
+ /** Track enumeration result history for split discovery throttling. */
+ private final EnumerationHistory enumerationHistory;
+
public ContinuousIcebergEnumerator(
SplitEnumeratorContext<IcebergSourceSplit> enumeratorContext,
SplitAssigner assigner,
@@ -59,6 +68,8 @@ public class ContinuousIcebergEnumerator extends
AbstractIcebergEnumerator {
this.scanContext = scanContext;
this.splitPlanner = splitPlanner;
this.enumeratorPosition = new AtomicReference<>();
+ this.enumerationHistory = new
EnumerationHistory(ENUMERATION_SPLIT_COUNT_HISTORY_SIZE);
+
if (enumState != null) {
this.enumeratorPosition.set(enumState.lastEnumeratedPosition());
}
@@ -87,12 +98,25 @@ public class ContinuousIcebergEnumerator extends
AbstractIcebergEnumerator {
@Override
public IcebergEnumeratorState snapshotState(long checkpointId) {
- return new IcebergEnumeratorState(enumeratorPosition.get(),
assigner.state());
+ return new IcebergEnumeratorState(
+ enumeratorPosition.get(), assigner.state(),
enumerationHistory.snapshot());
}
/** This method is executed in an IO thread pool. */
private ContinuousEnumerationResult discoverSplits() {
- return splitPlanner.planSplits(enumeratorPosition.get());
+ int pendingSplitCountFromAssigner = assigner.pendingSplitCount();
+ if
(enumerationHistory.shouldPauseSplitDiscovery(pendingSplitCountFromAssigner)) {
+ // If the assigner already has many pending splits, it is better to
pause split discovery.
+ // Otherwise, eagerly discovering more splits will just increase
assigner memory footprint
+ // and enumerator checkpoint state size.
+ LOG.info(
+ "Pause split discovery as the assigner already has too many pending
splits: {}",
+ pendingSplitCountFromAssigner);
+ return new ContinuousEnumerationResult(
+ Collections.emptyList(), enumeratorPosition.get(),
enumeratorPosition.get());
+ } else {
+ return splitPlanner.planSplits(enumeratorPosition.get());
+ }
}
/** This method is executed in a single coordinator thread. */
@@ -100,13 +124,10 @@ public class ContinuousIcebergEnumerator extends
AbstractIcebergEnumerator {
if (error == null) {
if (!Objects.equals(result.fromPosition(), enumeratorPosition.get())) {
// Multiple discoverSplits() may be triggered with the same starting
snapshot to the I/O
- // thread pool.
- // E.g., the splitDiscoveryInterval is very short (like 10 ms in some
unit tests) or the
- // thread
- // pool is busy and multiple discovery actions are executed
concurrently. Discovery result
- // should
- // only be accepted if the starting position matches the enumerator
position (like
- // compare-and-swap).
+ // thread pool. E.g., the splitDiscoveryInterval is very short (like
10 ms in some unit
+ // tests) or the thread pool is busy and multiple discovery actions
are executed
+ // concurrently. Discovery result should only be accepted if the
starting position
+ // matches the enumerator position (like compare-and-swap).
LOG.info(
"Skip {} discovered splits because the scan starting position
doesn't match "
+ "the current enumerator position: enumerator position = {},
scan starting position = {}",
@@ -114,12 +135,26 @@ public class ContinuousIcebergEnumerator extends
AbstractIcebergEnumerator {
enumeratorPosition.get(),
result.fromPosition());
} else {
- assigner.onDiscoveredSplits(result.splits());
- LOG.info(
- "Added {} splits discovered between ({}, {}] to the assigner",
- result.splits().size(),
- result.fromPosition(),
- result.toPosition());
+ // Sometimes, enumeration may yield no splits for a few reasons.
+ // - upstream paused or delayed streaming writes to the Iceberg table.
+ // - enumeration frequency is higher than the upstream write frequency.
+ if (!result.splits().isEmpty()) {
+ assigner.onDiscoveredSplits(result.splits());
+ // EnumerationHistory makes throttling decision on split discovery
+ // based on the total number of splits discovered in the last a few
cycles.
+ // Only update enumeration history when there are some discovered
splits.
+ enumerationHistory.add(result.splits().size());
+ LOG.info(
+ "Added {} splits discovered between ({}, {}] to the assigner",
+ result.splits().size(),
+ result.fromPosition(),
+ result.toPosition());
+ } else {
+ LOG.info(
+ "No new splits discovered between ({}, {}]",
+ result.fromPosition(),
+ result.toPosition());
+ }
// update the enumerator position even if there is no split discovered
// or the toPosition is empty (e.g. for empty table).
enumeratorPosition.set(result.toPosition());
diff --git
a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousSplitPlannerImpl.java
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousSplitPlannerImpl.java
index 6ba370666e..5016cc8b26 100644
---
a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousSplitPlannerImpl.java
+++
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousSplitPlannerImpl.java
@@ -32,6 +32,7 @@ import org.apache.iceberg.flink.source.FlinkSplitPlanner;
import org.apache.iceberg.flink.source.ScanContext;
import org.apache.iceberg.flink.source.StreamingStartingStrategy;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.iceberg.util.ThreadPools;
import org.slf4j.Logger;
@@ -78,7 +79,22 @@ public class ContinuousSplitPlannerImpl implements
ContinuousSplitPlanner {
}
}
- /** Discover incremental changes between {@code lastPosition} and current
table snapshot */
+ private Snapshot toSnapshotInclusive(
+ Long lastConsumedSnapshotId, Snapshot currentSnapshot, int
maxPlanningSnapshotCount) {
+ // snapshots are in reverse order (latest snapshot first)
+ List<Snapshot> snapshots =
+ Lists.newArrayList(
+ SnapshotUtil.ancestorsBetween(
+ table, currentSnapshot.snapshotId(), lastConsumedSnapshotId));
+ if (snapshots.size() <= maxPlanningSnapshotCount) {
+ return currentSnapshot;
+ } else {
+ // Because snapshots are in reverse order of commit history, this index
returns
+ // the max allowed number of snapshots from the lastConsumedSnapshotId.
+ return snapshots.get(snapshots.size() - maxPlanningSnapshotCount);
+ }
+ }
+
private ContinuousEnumerationResult discoverIncrementalSplits(
IcebergEnumeratorPosition lastPosition) {
Snapshot currentSnapshot = table.currentSnapshot();
@@ -94,12 +110,16 @@ public class ContinuousSplitPlannerImpl implements
ContinuousSplitPlanner {
LOG.info("Current table snapshot is already enumerated: {}",
currentSnapshot.snapshotId());
return new ContinuousEnumerationResult(Collections.emptyList(),
lastPosition, lastPosition);
} else {
+ Long lastConsumedSnapshotId = lastPosition != null ?
lastPosition.snapshotId() : null;
+ Snapshot toSnapshotInclusive =
+ toSnapshotInclusive(
+ lastConsumedSnapshotId, currentSnapshot,
scanContext.maxPlanningSnapshotCount());
IcebergEnumeratorPosition newPosition =
IcebergEnumeratorPosition.of(
- currentSnapshot.snapshotId(), currentSnapshot.timestampMillis());
+ toSnapshotInclusive.snapshotId(),
toSnapshotInclusive.timestampMillis());
ScanContext incrementalScan =
scanContext.copyWithAppendsBetween(
- lastPosition.snapshotId(), currentSnapshot.snapshotId());
+ lastPosition.snapshotId(), toSnapshotInclusive.snapshotId());
List<IcebergSourceSplit> splits =
FlinkSplitPlanner.planIcebergSourceSplits(table, incrementalScan,
workerPool);
LOG.info(
diff --git
a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/EnumerationHistory.java
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/EnumerationHistory.java
new file mode 100644
index 0000000000..ef21dad019
--- /dev/null
+++
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/EnumerationHistory.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.enumerator;
+
+import java.util.Arrays;
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.flink.annotation.VisibleForTesting;
+
+/**
+ * This enumeration history is used for split discovery throttling. It tracks
the discovered split
+ * count for every non-empty enumeration.
+ */
+@ThreadSafe
+class EnumerationHistory {
+
+ private final int[] history;
+ // int (2B) should be enough without overflow for enumeration history
+ private int count;
+
+ EnumerationHistory(int maxHistorySize) {
+ this.history = new int[maxHistorySize];
+ }
+
+ synchronized void restore(int[] restoredHistory) {
+ int startingOffset = 0;
+ int restoreSize = restoredHistory.length;
+
+ if (restoredHistory.length > history.length) {
+ // keep the newest history
+ startingOffset = restoredHistory.length - history.length;
+ // only restore the latest history up to maxHistorySize
+ restoreSize = history.length;
+ }
+
+ System.arraycopy(restoredHistory, startingOffset, history, 0, restoreSize);
+ count = restoreSize;
+ }
+
+ synchronized int[] snapshot() {
+ int len = history.length;
+ if (count > len) {
+ int[] copy = new int[len];
+ // this is like a circular buffer
+ int indexForOldest = count % len;
+ System.arraycopy(history, indexForOldest, copy, 0, len - indexForOldest);
+ System.arraycopy(history, 0, copy, len - indexForOldest, indexForOldest);
+ return copy;
+ } else {
+ return Arrays.copyOfRange(history, 0, count);
+ }
+ }
+
+ /** Add the split count from the last enumeration result. */
+ synchronized void add(int splitCount) {
+ int pos = count % history.length;
+ history[pos] = splitCount;
+ count += 1;
+ }
+
+ @VisibleForTesting
+ synchronized boolean hasFullHistory() {
+ return count >= history.length;
+ }
+
+ /** @return true if split discovery should pause because assigner has too
many splits already. */
+ synchronized boolean shouldPauseSplitDiscovery(int
pendingSplitCountFromAssigner) {
+ if (count < history.length) {
+ // only check throttling when full history is obtained.
+ return false;
+ } else {
+ // if ScanContext#maxPlanningSnapshotCount() is 10, each split
enumeration can
+ // discover splits from up to 10 snapshots. if maxHistorySize is 3, the max
number of
+ // splits tracked in assigner shouldn't be more than 10 * (3 + 1)
snapshots
+ // worth of splits. +1 because there could be another enumeration when
the
+ // pending splits fall just below the 10 * 3.
+ int totalSplitCountFromRecentDiscovery =
Arrays.stream(history).reduce(0, Integer::sum);
+ return pendingSplitCountFromAssigner >=
totalSplitCountFromRecentDiscovery;
+ }
+ }
+}
diff --git
a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/IcebergEnumeratorState.java
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/IcebergEnumeratorState.java
index 7913f7b435..024d0b1011 100644
---
a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/IcebergEnumeratorState.java
+++
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/IcebergEnumeratorState.java
@@ -21,12 +21,15 @@ package org.apache.iceberg.flink.source.enumerator;
import java.io.Serializable;
import java.util.Collection;
import javax.annotation.Nullable;
+import org.apache.flink.annotation.Internal;
import org.apache.iceberg.flink.source.split.IcebergSourceSplitState;
/** Enumerator state for checkpointing */
+@Internal
public class IcebergEnumeratorState implements Serializable {
@Nullable private final IcebergEnumeratorPosition lastEnumeratedPosition;
private final Collection<IcebergSourceSplitState> pendingSplits;
+ private int[] enumerationSplitCountHistory;
public IcebergEnumeratorState(Collection<IcebergSourceSplitState>
pendingSplits) {
this(null, pendingSplits);
@@ -35,8 +38,16 @@ public class IcebergEnumeratorState implements Serializable {
public IcebergEnumeratorState(
@Nullable IcebergEnumeratorPosition lastEnumeratedPosition,
Collection<IcebergSourceSplitState> pendingSplits) {
+ this(lastEnumeratedPosition, pendingSplits, new int[0]);
+ }
+
+ public IcebergEnumeratorState(
+ @Nullable IcebergEnumeratorPosition lastEnumeratedPosition,
+ Collection<IcebergSourceSplitState> pendingSplits,
+ int[] enumerationSplitCountHistory) {
this.lastEnumeratedPosition = lastEnumeratedPosition;
this.pendingSplits = pendingSplits;
+ this.enumerationSplitCountHistory = enumerationSplitCountHistory;
}
@Nullable
@@ -47,4 +58,8 @@ public class IcebergEnumeratorState implements Serializable {
public Collection<IcebergSourceSplitState> pendingSplits() {
return pendingSplits;
}
+
+ public int[] enumerationSplitCountHistory() {
+ return enumerationSplitCountHistory;
+ }
}
diff --git
a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/IcebergEnumeratorStateSerializer.java
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/IcebergEnumeratorStateSerializer.java
index f728043638..a6adc02ff6 100644
---
a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/IcebergEnumeratorStateSerializer.java
+++
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/IcebergEnumeratorStateSerializer.java
@@ -21,6 +21,7 @@ package org.apache.iceberg.flink.source.enumerator;
import java.io.IOException;
import java.util.Collection;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;
@@ -37,7 +38,7 @@ public class IcebergEnumeratorStateSerializer
public static final IcebergEnumeratorStateSerializer INSTANCE =
new IcebergEnumeratorStateSerializer();
- private static final int VERSION = 1;
+ private static final int VERSION = 2;
private static final ThreadLocal<DataOutputSerializer> SERIALIZER_CACHE =
ThreadLocal.withInitial(() -> new DataOutputSerializer(1024));
@@ -54,7 +55,7 @@ public class IcebergEnumeratorStateSerializer
@Override
public byte[] serialize(IcebergEnumeratorState enumState) throws IOException
{
- return serializeV1(enumState);
+ return serializeV2(enumState);
}
@Override
@@ -62,39 +63,73 @@ public class IcebergEnumeratorStateSerializer
switch (version) {
case 1:
return deserializeV1(serialized);
+ case 2:
+ return deserializeV2(serialized);
default:
throw new IOException("Unknown version: " + version);
}
}
- private byte[] serializeV1(IcebergEnumeratorState enumState) throws
IOException {
+ @VisibleForTesting
+ byte[] serializeV1(IcebergEnumeratorState enumState) throws IOException {
DataOutputSerializer out = SERIALIZER_CACHE.get();
+ serializeEnumeratorPosition(out, enumState.lastEnumeratedPosition(),
positionSerializer);
+ serializePendingSplits(out, enumState.pendingSplits(), splitSerializer);
+ byte[] result = out.getCopyOfBuffer();
+ out.clear();
+ return result;
+ }
- out.writeBoolean(enumState.lastEnumeratedPosition() != null);
- if (enumState.lastEnumeratedPosition() != null) {
- out.writeInt(positionSerializer.getVersion());
- byte[] positionBytes =
positionSerializer.serialize(enumState.lastEnumeratedPosition());
- out.writeInt(positionBytes.length);
- out.write(positionBytes);
- }
-
- out.writeInt(splitSerializer.getVersion());
- out.writeInt(enumState.pendingSplits().size());
- for (IcebergSourceSplitState splitState : enumState.pendingSplits()) {
- byte[] splitBytes = splitSerializer.serialize(splitState.split());
- out.writeInt(splitBytes.length);
- out.write(splitBytes);
- out.writeUTF(splitState.status().name());
- }
+ @VisibleForTesting
+ IcebergEnumeratorState deserializeV1(byte[] serialized) throws IOException {
+ DataInputDeserializer in = new DataInputDeserializer(serialized);
+ IcebergEnumeratorPosition enumeratorPosition =
+ deserializeEnumeratorPosition(in, positionSerializer);
+ Collection<IcebergSourceSplitState> pendingSplits =
+ deserializePendingSplits(in, splitSerializer);
+ return new IcebergEnumeratorState(enumeratorPosition, pendingSplits);
+ }
+ @VisibleForTesting
+ byte[] serializeV2(IcebergEnumeratorState enumState) throws IOException {
+ DataOutputSerializer out = SERIALIZER_CACHE.get();
+ serializeEnumeratorPosition(out, enumState.lastEnumeratedPosition(),
positionSerializer);
+ serializePendingSplits(out, enumState.pendingSplits(), splitSerializer);
+ serializeEnumerationSplitCountHistory(out,
enumState.enumerationSplitCountHistory());
byte[] result = out.getCopyOfBuffer();
out.clear();
return result;
}
- private IcebergEnumeratorState deserializeV1(byte[] serialized) throws
IOException {
+ @VisibleForTesting
+ IcebergEnumeratorState deserializeV2(byte[] serialized) throws IOException {
DataInputDeserializer in = new DataInputDeserializer(serialized);
+ IcebergEnumeratorPosition enumeratorPosition =
+ deserializeEnumeratorPosition(in, positionSerializer);
+ Collection<IcebergSourceSplitState> pendingSplits =
+ deserializePendingSplits(in, splitSerializer);
+ int[] enumerationSplitCountHistory =
deserializeEnumerationSplitCountHistory(in);
+ return new IcebergEnumeratorState(
+ enumeratorPosition, pendingSplits, enumerationSplitCountHistory);
+ }
+ private static void serializeEnumeratorPosition(
+ DataOutputSerializer out,
+ IcebergEnumeratorPosition enumeratorPosition,
+ IcebergEnumeratorPositionSerializer positionSerializer)
+ throws IOException {
+ out.writeBoolean(enumeratorPosition != null);
+ if (enumeratorPosition != null) {
+ out.writeInt(positionSerializer.getVersion());
+ byte[] positionBytes = positionSerializer.serialize(enumeratorPosition);
+ out.writeInt(positionBytes.length);
+ out.write(positionBytes);
+ }
+ }
+
+ private static IcebergEnumeratorPosition deserializeEnumeratorPosition(
+ DataInputDeserializer in, IcebergEnumeratorPositionSerializer
positionSerializer)
+ throws IOException {
IcebergEnumeratorPosition enumeratorPosition = null;
if (in.readBoolean()) {
int version = in.readInt();
@@ -102,7 +137,26 @@ public class IcebergEnumeratorStateSerializer
in.read(positionBytes);
enumeratorPosition = positionSerializer.deserialize(version,
positionBytes);
}
+ return enumeratorPosition;
+ }
+ private static void serializePendingSplits(
+ DataOutputSerializer out,
+ Collection<IcebergSourceSplitState> pendingSplits,
+ IcebergSourceSplitSerializer splitSerializer)
+ throws IOException {
+ out.writeInt(splitSerializer.getVersion());
+ out.writeInt(pendingSplits.size());
+ for (IcebergSourceSplitState splitState : pendingSplits) {
+ byte[] splitBytes = splitSerializer.serialize(splitState.split());
+ out.writeInt(splitBytes.length);
+ out.write(splitBytes);
+ out.writeUTF(splitState.status().name());
+ }
+ }
+
+ private static Collection<IcebergSourceSplitState> deserializePendingSplits(
+ DataInputDeserializer in, IcebergSourceSplitSerializer splitSerializer)
throws IOException {
int splitSerializerVersion = in.readInt();
int splitCount = in.readInt();
Collection<IcebergSourceSplitState> pendingSplits =
Lists.newArrayListWithCapacity(splitCount);
@@ -114,6 +168,29 @@ public class IcebergEnumeratorStateSerializer
pendingSplits.add(
new IcebergSourceSplitState(split,
IcebergSourceSplitStatus.valueOf(statusName)));
}
- return new IcebergEnumeratorState(enumeratorPosition, pendingSplits);
+ return pendingSplits;
+ }
+
+ private static void serializeEnumerationSplitCountHistory(
+ DataOutputSerializer out, int[] enumerationSplitCountHistory) throws
IOException {
+ out.writeInt(enumerationSplitCountHistory.length);
+ if (enumerationSplitCountHistory.length > 0) {
+ for (int i = 0; i < enumerationSplitCountHistory.length; ++i) {
+ out.writeInt(enumerationSplitCountHistory[i]);
+ }
+ }
+ }
+
+ private static int[]
deserializeEnumerationSplitCountHistory(DataInputDeserializer in)
+ throws IOException {
+ int historySize = in.readInt();
+ int[] history = new int[historySize];
+ if (historySize > 0) {
+ for (int i = 0; i < historySize; ++i) {
+ history[i] = in.readInt();
+ }
+ }
+
+ return history;
}
}
diff --git
a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/StaticIcebergEnumerator.java
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/StaticIcebergEnumerator.java
index 9bb1501389..4e55ea5d5f 100644
---
a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/StaticIcebergEnumerator.java
+++
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/StaticIcebergEnumerator.java
@@ -18,11 +18,8 @@
*/
package org.apache.iceberg.flink.source.enumerator;
-import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
-import org.apache.iceberg.Table;
-import org.apache.iceberg.flink.source.ScanContext;
import org.apache.iceberg.flink.source.assigner.SplitAssigner;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
@@ -30,19 +27,11 @@ import
org.apache.iceberg.flink.source.split.IcebergSourceSplit;
@Internal
public class StaticIcebergEnumerator extends AbstractIcebergEnumerator {
private final SplitAssigner assigner;
- private final Table table;
- private final ScanContext scanContext;
public StaticIcebergEnumerator(
- SplitEnumeratorContext<IcebergSourceSplit> enumeratorContext,
- SplitAssigner assigner,
- Table table,
- ScanContext scanContext,
- @Nullable IcebergEnumeratorState enumState) {
+ SplitEnumeratorContext<IcebergSourceSplit> enumeratorContext,
SplitAssigner assigner) {
super(enumeratorContext, assigner);
this.assigner = assigner;
- this.table = table;
- this.scanContext = scanContext;
}
@Override
@@ -57,6 +46,6 @@ public class StaticIcebergEnumerator extends
AbstractIcebergEnumerator {
@Override
public IcebergEnumeratorState snapshotState(long checkpointId) {
- return new IcebergEnumeratorState(null, assigner.state());
+ return new IcebergEnumeratorState(null, assigner.state(), new int[0]);
}
}
diff --git
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/ManualContinuousSplitPlanner.java
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/ManualContinuousSplitPlanner.java
index e6610acbc1..7575beed6e 100644
---
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/ManualContinuousSplitPlanner.java
+++
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/ManualContinuousSplitPlanner.java
@@ -19,31 +19,70 @@
package org.apache.iceberg.flink.source.enumerator;
import java.io.IOException;
-import java.util.ArrayDeque;
import java.util.List;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import org.apache.iceberg.flink.source.ScanContext;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
class ManualContinuousSplitPlanner implements ContinuousSplitPlanner {
- private final ArrayDeque<IcebergSourceSplit> splits = new ArrayDeque<>();
- private IcebergEnumeratorPosition latestPosition;
+ private final int maxPlanningSnapshotCount;
+ // track splits per snapshot
+ private final NavigableMap<Long, List<IcebergSourceSplit>> splits;
+ private long latestSnapshotId;
+
+ ManualContinuousSplitPlanner(ScanContext scanContext) {
+ this.maxPlanningSnapshotCount = scanContext.maxPlanningSnapshotCount();
+ this.splits = new TreeMap<>();
+ this.latestSnapshotId = 0L;
+ }
@Override
- public ContinuousEnumerationResult planSplits(IcebergEnumeratorPosition
lastPosition) {
+ public synchronized ContinuousEnumerationResult planSplits(
+ IcebergEnumeratorPosition lastPosition) {
+ long fromSnapshotIdExclusive = 0;
+ if (lastPosition != null && lastPosition.snapshotId() != null) {
+ fromSnapshotIdExclusive = lastPosition.snapshotId();
+ }
+
+ Preconditions.checkArgument(
+ fromSnapshotIdExclusive <= latestSnapshotId,
+ "last enumerated snapshotId is greater than the latestSnapshotId");
+ if (fromSnapshotIdExclusive == latestSnapshotId) {
+ // already discovered everything.
+ return new ContinuousEnumerationResult(Lists.newArrayList(),
lastPosition, lastPosition);
+ }
+
+ // find the subset of snapshots to return discovered splits
+ long toSnapshotIdInclusive;
+ if (latestSnapshotId - fromSnapshotIdExclusive > maxPlanningSnapshotCount)
{
+ toSnapshotIdInclusive = fromSnapshotIdExclusive +
maxPlanningSnapshotCount;
+ } else {
+ toSnapshotIdInclusive = latestSnapshotId;
+ }
+
+ List<IcebergSourceSplit> discoveredSplits = Lists.newArrayList();
+ NavigableMap<Long, List<IcebergSourceSplit>> discoveredView =
+ splits.subMap(fromSnapshotIdExclusive, false, toSnapshotIdInclusive,
true);
+ discoveredView.forEach((snapshotId, snapshotSplits) ->
discoveredSplits.addAll(snapshotSplits));
ContinuousEnumerationResult result =
- new ContinuousEnumerationResult(Lists.newArrayList(splits),
lastPosition, latestPosition);
+ new ContinuousEnumerationResult(
+ discoveredSplits,
+ lastPosition,
+ // use the snapshot Id as snapshot timestamp.
+ IcebergEnumeratorPosition.of(toSnapshotIdInclusive,
toSnapshotIdInclusive));
return result;
}
- /** Add new splits to the collection */
- public void addSplits(List<IcebergSourceSplit> newSplits,
IcebergEnumeratorPosition newPosition) {
- splits.addAll(newSplits);
- this.latestPosition = newPosition;
- }
-
- /** Clear the splits collection */
- public void clearSplits() {
- splits.clear();
+ /**
+ * Add a collection of new splits. A monotonically increased snapshotId is
assigned to each batch
+ * of splits added by this method.
+ */
+ public synchronized void addSplits(List<IcebergSourceSplit> newSplits) {
+ latestSnapshotId += 1;
+ splits.put(latestSnapshotId, newSplits);
}
@Override
diff --git
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousIcebergEnumerator.java
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousIcebergEnumerator.java
index 50b730ae52..a051a4de0f 100644
---
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousIcebergEnumerator.java
+++
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousIcebergEnumerator.java
@@ -18,6 +18,7 @@
*/
package org.apache.iceberg.flink.source.enumerator;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
@@ -43,7 +44,6 @@ public class TestContinuousIcebergEnumerator {
@Test
public void testDiscoverSplitWhenNoReaderRegistered() throws Exception {
- ManualContinuousSplitPlanner splitPlanner = new
ManualContinuousSplitPlanner();
TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
new TestingSplitEnumeratorContext<>(4);
ScanContext scanContext =
@@ -51,6 +51,7 @@ public class TestContinuousIcebergEnumerator {
.streaming(true)
.startingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
.build();
+ ManualContinuousSplitPlanner splitPlanner = new
ManualContinuousSplitPlanner(scanContext);
ContinuousIcebergEnumerator enumerator =
createEnumerator(enumeratorContext, scanContext, splitPlanner);
@@ -61,7 +62,7 @@ public class TestContinuousIcebergEnumerator {
// make one split available and trigger the periodic discovery
List<IcebergSourceSplit> splits =
SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1,
1);
- splitPlanner.addSplits(splits, IcebergEnumeratorPosition.of(1L, 1L));
+ splitPlanner.addSplits(splits);
enumeratorContext.triggerAllActions();
Collection<IcebergSourceSplitState> pendingSplits =
enumerator.snapshotState(2).pendingSplits();
@@ -73,7 +74,6 @@ public class TestContinuousIcebergEnumerator {
@Test
public void testDiscoverWhenReaderRegistered() throws Exception {
- ManualContinuousSplitPlanner splitPlanner = new
ManualContinuousSplitPlanner();
TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
new TestingSplitEnumeratorContext<>(4);
ScanContext scanContext =
@@ -81,6 +81,7 @@ public class TestContinuousIcebergEnumerator {
.streaming(true)
.startingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
.build();
+ ManualContinuousSplitPlanner splitPlanner = new
ManualContinuousSplitPlanner(scanContext);
ContinuousIcebergEnumerator enumerator =
createEnumerator(enumeratorContext, scanContext, splitPlanner);
@@ -92,7 +93,7 @@ public class TestContinuousIcebergEnumerator {
// make one split available and trigger the periodic discovery
List<IcebergSourceSplit> splits =
SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1,
1);
- splitPlanner.addSplits(splits, IcebergEnumeratorPosition.of(1L, 1L));
+ splitPlanner.addSplits(splits);
enumeratorContext.triggerAllActions();
Assert.assertTrue(enumerator.snapshotState(1).pendingSplits().isEmpty());
@@ -102,16 +103,16 @@ public class TestContinuousIcebergEnumerator {
@Test
public void testRequestingReaderUnavailableWhenSplitDiscovered() throws
Exception {
- ManualContinuousSplitPlanner splitPlanner = new
ManualContinuousSplitPlanner();
TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
new TestingSplitEnumeratorContext<>(4);
- ScanContext config =
+ ScanContext scanContext =
ScanContext.builder()
.streaming(true)
.startingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
.build();
+ ManualContinuousSplitPlanner splitPlanner = new
ManualContinuousSplitPlanner(scanContext);
ContinuousIcebergEnumerator enumerator =
- createEnumerator(enumeratorContext, config, splitPlanner);
+ createEnumerator(enumeratorContext, scanContext, splitPlanner);
// register one reader, and let it request a split
enumeratorContext.registerReader(2, "localhost");
@@ -125,7 +126,7 @@ public class TestContinuousIcebergEnumerator {
List<IcebergSourceSplit> splits =
SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1,
1);
Assert.assertEquals(1, splits.size());
- splitPlanner.addSplits(splits, IcebergEnumeratorPosition.of(1L, 1L));
+ splitPlanner.addSplits(splits);
enumeratorContext.triggerAllActions();
Assert.assertFalse(enumeratorContext.getSplitAssignments().containsKey(2));
@@ -147,6 +148,85 @@ public class TestContinuousIcebergEnumerator {
.contains(splits.get(0));
}
+ @Test
+ public void testThrottlingDiscovery() throws Exception {
+ // create 10 splits
+ List<IcebergSourceSplit> splits =
+ SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER,
10, 1);
+
+ TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
+ new TestingSplitEnumeratorContext<>(4);
+ ScanContext scanContext =
+ ScanContext.builder()
+ .streaming(true)
+
.startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
+ // discover one snapshot at a time
+ .maxPlanningSnapshotCount(1)
+ .build();
+ ManualContinuousSplitPlanner splitPlanner = new
ManualContinuousSplitPlanner(scanContext);
+ ContinuousIcebergEnumerator enumerator =
+ createEnumerator(enumeratorContext, scanContext, splitPlanner);
+
+ // register reader-2, and let it request a split
+ enumeratorContext.registerReader(2, "localhost");
+ enumerator.addReader(2);
+ enumerator.handleSourceEvent(2, new SplitRequestEvent());
+
+ // add splits[0] to the planner for next discovery
+ splitPlanner.addSplits(Arrays.asList(splits.get(0)));
+ enumeratorContext.triggerAllActions();
+
+ // because discovered split was assigned to reader, pending splits should
be empty
+ Assert.assertEquals(0, enumerator.snapshotState(1).pendingSplits().size());
+ // split assignment to reader-2 should contain splits[0, 1)
+ Assert.assertEquals(
+ splits.subList(0, 1),
enumeratorContext.getSplitAssignments().get(2).getAssignedSplits());
+
+ // add the remaining 9 splits (one for every snapshot)
+ // run discovery cycles while reader-2 still processing the splits[0]
+ for (int i = 1; i < 10; ++i) {
+ splitPlanner.addSplits(Arrays.asList(splits.get(i)));
+ enumeratorContext.triggerAllActions();
+ }
+
+ // can only discover up to 3 snapshots/splits
+ Assert.assertEquals(3, enumerator.snapshotState(2).pendingSplits().size());
+ // split assignment to reader-2 should be splits[0, 1)
+ Assert.assertEquals(
+ splits.subList(0, 1),
enumeratorContext.getSplitAssignments().get(2).getAssignedSplits());
+
+ // now reader-2 finished splits[0]
+ enumerator.handleSourceEvent(2, new
SplitRequestEvent(Arrays.asList(splits.get(0).splitId())));
+ enumeratorContext.triggerAllActions();
+ // still have 3 pending splits. After assigned splits[1] to reader-2, one
more split was
+ // discovered and added.
+ Assert.assertEquals(3, enumerator.snapshotState(3).pendingSplits().size());
+ // split assignment to reader-2 should be splits[0, 2)
+ Assert.assertEquals(
+ splits.subList(0, 2),
enumeratorContext.getSplitAssignments().get(2).getAssignedSplits());
+
+ // run 3 more split discovery cycles
+ for (int i = 0; i < 3; ++i) {
+ enumeratorContext.triggerAllActions();
+ }
+
+ // no more splits are discovered due to throttling
+ Assert.assertEquals(3, enumerator.snapshotState(4).pendingSplits().size());
+ // split assignment to reader-2 should still be splits[0, 2)
+ Assert.assertEquals(
+ splits.subList(0, 2),
enumeratorContext.getSplitAssignments().get(2).getAssignedSplits());
+
+ // now reader-2 finished splits[1]
+ enumerator.handleSourceEvent(2, new
SplitRequestEvent(Arrays.asList(splits.get(1).splitId())));
+ enumeratorContext.triggerAllActions();
+ // still have 3 pending splits. After assigned new splits[2] to reader-2,
one more split was
+ // discovered and added.
+ Assert.assertEquals(3, enumerator.snapshotState(5).pendingSplits().size());
+ // split assignment to reader-2 should be splits[0, 3)
+ Assert.assertEquals(
+ splits.subList(0, 3),
enumeratorContext.getSplitAssignments().get(2).getAssignedSplits());
+ }
+
private static ContinuousIcebergEnumerator createEnumerator(
SplitEnumeratorContext<IcebergSourceSplit> context,
ScanContext scanContext,
diff --git
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousSplitPlannerImpl.java
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousSplitPlannerImpl.java
index 63fc53341f..84382c040c 100644
---
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousSplitPlannerImpl.java
+++
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousSplitPlannerImpl.java
@@ -507,4 +507,78 @@ public class TestContinuousSplitPlannerImpl {
lastPosition = verifyOneCycle(splitPlanner, lastPosition);
}
}
+
+ @Test
+ public void testMaxPlanningSnapshotCount() throws Exception {
+ appendTwoSnapshots();
+ // append 3 more snapshots
+ for (int i = 2; i < 5; ++i) {
+ appendSnapshot(i, 2);
+ }
+
+ ScanContext scanContext =
+ ScanContext.builder()
+
.startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
+ // limit to 1 snapshot per discovery
+ .maxPlanningSnapshotCount(1)
+ .build();
+ ContinuousSplitPlannerImpl splitPlanner =
+ new ContinuousSplitPlannerImpl(tableResource.table(), scanContext,
null);
+
+ ContinuousEnumerationResult initialResult = splitPlanner.planSplits(null);
+ Assert.assertNull(initialResult.fromPosition());
+ // For inclusive behavior, the initial result should point to snapshot1's
parent,
+ // which leads to null snapshotId and snapshotTimestampMs.
+ Assert.assertNull(initialResult.toPosition().snapshotId());
+ Assert.assertNull(initialResult.toPosition().snapshotTimestampMs());
+ Assert.assertEquals(0, initialResult.splits().size());
+
+ ContinuousEnumerationResult secondResult =
splitPlanner.planSplits(initialResult.toPosition());
+ // should discover dataFile1 appended in snapshot1
+ verifyMaxPlanningSnapshotCountResult(
+ secondResult, null, snapshot1,
ImmutableSet.of(dataFile1.path().toString()));
+
+ ContinuousEnumerationResult thirdResult =
splitPlanner.planSplits(secondResult.toPosition());
+ // should discover dataFile2 appended in snapshot2
+ verifyMaxPlanningSnapshotCountResult(
+ thirdResult, snapshot1, snapshot2,
ImmutableSet.of(dataFile2.path().toString()));
+ }
+
+ private void verifyMaxPlanningSnapshotCountResult(
+ ContinuousEnumerationResult result,
+ Snapshot fromSnapshotExclusive,
+ Snapshot toSnapshotInclusive,
+ Set<String> expectedFiles) {
+ if (fromSnapshotExclusive == null) {
+ Assert.assertNull(result.fromPosition().snapshotId());
+ Assert.assertNull(result.fromPosition().snapshotTimestampMs());
+ } else {
+ Assert.assertEquals(
+ fromSnapshotExclusive.snapshotId(),
result.fromPosition().snapshotId().longValue());
+ Assert.assertEquals(
+ fromSnapshotExclusive.timestampMillis(),
+ result.fromPosition().snapshotTimestampMs().longValue());
+ }
+ Assert.assertEquals(
+ toSnapshotInclusive.snapshotId(),
result.toPosition().snapshotId().longValue());
+ Assert.assertEquals(
+ toSnapshotInclusive.timestampMillis(),
+ result.toPosition().snapshotTimestampMs().longValue());
+ // should only have one split with one data file, because split discover
is limited to
+ // one snapshot and each snapshot has only one data file appended.
+ IcebergSourceSplit split = Iterables.getOnlyElement(result.splits());
+ Assert.assertEquals(1, split.task().files().size());
+ Set<String> discoveredFiles =
+ split.task().files().stream()
+ .map(fileScanTask -> fileScanTask.file().path().toString())
+ .collect(Collectors.toSet());
+ Assert.assertEquals(expectedFiles, discoveredFiles);
+ }
+
+ private Snapshot appendSnapshot(long seed, int numRecords) throws Exception {
+ List<Record> batch = RandomGenericData.generate(TestFixtures.SCHEMA,
numRecords, seed);
+ DataFile dataFile = dataAppender.writeFile(null, batch);
+ dataAppender.appendToTable(dataFile);
+ return tableResource.table().currentSnapshot();
+ }
}
diff --git
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestEnumerationHistory.java
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestEnumerationHistory.java
new file mode 100644
index 0000000000..e2be0b4b03
--- /dev/null
+++
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestEnumerationHistory.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.enumerator;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestEnumerationHistory {
+ private static final int MAX_HISTORY_SIZE = 3;
+ private static final int FEW_PENDING_SPLITS = 2;
+ private static final int TOO_MANY_PENDING_SPLITS = 100;
+
+ @Test
+ public void testEmptyHistory() {
+ EnumerationHistory history = new EnumerationHistory(MAX_HISTORY_SIZE);
+ int[] expectedHistorySnapshot = new int[0];
+ testHistory(history, expectedHistorySnapshot);
+ }
+
+ @Test
+ public void testNotFullHistory() {
+ EnumerationHistory history = new EnumerationHistory(3);
+ history.add(1);
+ history.add(2);
+ int[] expectedHistorySnapshot = {1, 2};
+ testHistory(history, expectedHistorySnapshot);
+ }
+
+ @Test
+ public void testExactFullHistory() {
+ EnumerationHistory history = new EnumerationHistory(3);
+ history.add(1);
+ history.add(2);
+ history.add(3);
+ int[] expectedHistorySnapshot = {1, 2, 3};
+ testHistory(history, expectedHistorySnapshot);
+ }
+
+ @Test
+ public void testOneMoreThanFullHistory() {
+ EnumerationHistory history = new EnumerationHistory(3);
+ history.add(1);
+ history.add(2);
+ history.add(3);
+ history.add(4);
+ int[] expectedHistorySnapshot = {2, 3, 4};
+ testHistory(history, expectedHistorySnapshot);
+ }
+
+ @Test
+ public void testTwoMoreThanFullHistory() {
+ EnumerationHistory history = new EnumerationHistory(3);
+ history.add(1);
+ history.add(2);
+ history.add(3);
+ history.add(4);
+ history.add(5);
+ int[] expectedHistorySnapshot = {3, 4, 5};
+ testHistory(history, expectedHistorySnapshot);
+ }
+
+ @Test
+ public void testThreeMoreThanFullHistory() {
+ EnumerationHistory history = new EnumerationHistory(3);
+ history.add(1);
+ history.add(2);
+ history.add(3);
+ history.add(4);
+ history.add(5);
+ history.add(6);
+ int[] expectedHistorySnapshot = {4, 5, 6};
+ testHistory(history, expectedHistorySnapshot);
+ }
+
+ private void testHistory(EnumerationHistory history, int[]
expectedHistorySnapshot) {
+ Assert.assertFalse(history.shouldPauseSplitDiscovery(FEW_PENDING_SPLITS));
+ if (history.hasFullHistory()) {
+ // throttle because pending split count is more than the sum of
enumeration history
+
Assert.assertTrue(history.shouldPauseSplitDiscovery(TOO_MANY_PENDING_SPLITS));
+ } else {
+ // skipped throttling check because there is not enough history
+
Assert.assertFalse(history.shouldPauseSplitDiscovery(TOO_MANY_PENDING_SPLITS));
+ }
+
+ int[] historySnapshot = history.snapshot();
+ Assert.assertArrayEquals(expectedHistorySnapshot, historySnapshot);
+
+ EnumerationHistory restoredHistory = new
EnumerationHistory(MAX_HISTORY_SIZE);
+ restoredHistory.restore(historySnapshot);
+
+ Assert.assertFalse(history.shouldPauseSplitDiscovery(FEW_PENDING_SPLITS));
+ if (history.hasFullHistory()) {
+ // throttle because pending split count is more than the sum of
enumeration history
+
Assert.assertTrue(history.shouldPauseSplitDiscovery(TOO_MANY_PENDING_SPLITS));
+ } else {
+ // skipped throttling check because there is not enough history
+ Assert.assertFalse(history.shouldPauseSplitDiscovery(30));
+ }
+ }
+
+ @Test
+ public void testRestoreDifferentSize() {
+ EnumerationHistory history = new EnumerationHistory(3);
+ history.add(1);
+ history.add(2);
+ history.add(3);
+ int[] historySnapshot = history.snapshot();
+
+ EnumerationHistory smallerHistory = new EnumerationHistory(2);
+ smallerHistory.restore(historySnapshot);
+ int[] expectedRestoredHistorySnapshot = {2, 3};
+ Assert.assertArrayEquals(expectedRestoredHistorySnapshot,
smallerHistory.snapshot());
+
+ EnumerationHistory largerHisotry = new EnumerationHistory(4);
+ largerHisotry.restore(historySnapshot);
+ Assert.assertArrayEquals(historySnapshot, largerHisotry.snapshot());
+ }
+}
diff --git
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestIcebergEnumeratorStateSerializer.java
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestIcebergEnumeratorStateSerializer.java
index 33ff58c52f..0082e25add 100644
---
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestIcebergEnumeratorStateSerializer.java
+++
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestIcebergEnumeratorStateSerializer.java
@@ -18,6 +18,7 @@
*/
package org.apache.iceberg.flink.source.enumerator;
+import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
@@ -31,30 +32,41 @@ import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+@RunWith(Parameterized.class)
public class TestIcebergEnumeratorStateSerializer {
@ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new
TemporaryFolder();
private final IcebergEnumeratorStateSerializer serializer =
IcebergEnumeratorStateSerializer.INSTANCE;
+ protected final int version;
+
+ @Parameterized.Parameters(name = "version={0}")
+ public static Object[] parameters() {
+ return new Object[] {1, 2};
+ }
+
+ public TestIcebergEnumeratorStateSerializer(int version) {
+ this.version = version;
+ }
+
@Test
public void testEmptySnapshotIdAndPendingSplits() throws Exception {
IcebergEnumeratorState enumeratorState = new
IcebergEnumeratorState(Collections.emptyList());
- byte[] result = serializer.serialize(enumeratorState);
- IcebergEnumeratorState deserialized =
serializer.deserialize(serializer.getVersion(), result);
- assertEnumeratorStateEquals(enumeratorState, deserialized);
+ testSerializer(enumeratorState);
}
@Test
public void testSomeSnapshotIdAndEmptyPendingSplits() throws Exception {
IcebergEnumeratorPosition position =
IcebergEnumeratorPosition.of(1L, System.currentTimeMillis());
+
IcebergEnumeratorState enumeratorState =
new IcebergEnumeratorState(position, Collections.emptyList());
- byte[] result = serializer.serialize(enumeratorState);
- IcebergEnumeratorState deserialized =
serializer.deserialize(serializer.getVersion(), result);
- assertEnumeratorStateEquals(enumeratorState, deserialized);
+ testSerializer(enumeratorState);
}
@Test
@@ -72,14 +84,47 @@ public class TestIcebergEnumeratorStateSerializer {
new IcebergSourceSplitState(splits.get(2),
IcebergSourceSplitStatus.COMPLETED));
IcebergEnumeratorState enumeratorState = new
IcebergEnumeratorState(position, pendingSplits);
- byte[] result = serializer.serialize(enumeratorState);
- IcebergEnumeratorState deserialized =
serializer.deserialize(serializer.getVersion(), result);
+ testSerializer(enumeratorState);
+ }
+
+ @Test
+ public void testEnumerationSplitCountHistory() throws Exception {
+ if (version == 2) {
+ IcebergEnumeratorPosition position =
+ IcebergEnumeratorPosition.of(2L, System.currentTimeMillis());
+ List<IcebergSourceSplit> splits =
+ SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER,
3, 1);
+ Collection<IcebergSourceSplitState> pendingSplits = Lists.newArrayList();
+ pendingSplits.add(
+ new IcebergSourceSplitState(splits.get(0),
IcebergSourceSplitStatus.UNASSIGNED));
+ pendingSplits.add(
+ new IcebergSourceSplitState(splits.get(1),
IcebergSourceSplitStatus.ASSIGNED));
+ pendingSplits.add(
+ new IcebergSourceSplitState(splits.get(2),
IcebergSourceSplitStatus.COMPLETED));
+ int[] enumerationSplitCountHistory = {1, 2, 3};
+
+ IcebergEnumeratorState enumeratorState =
+ new IcebergEnumeratorState(position, pendingSplits,
enumerationSplitCountHistory);
+ testSerializer(enumeratorState);
+ }
+ }
+
+ private void testSerializer(IcebergEnumeratorState enumeratorState) throws
IOException {
+ byte[] result;
+ if (version == 1) {
+ result = serializer.serializeV1(enumeratorState);
+ } else {
+ result = serializer.serialize(enumeratorState);
+ }
+
+ IcebergEnumeratorState deserialized = serializer.deserialize(version,
result);
assertEnumeratorStateEquals(enumeratorState, deserialized);
}
private void assertEnumeratorStateEquals(
IcebergEnumeratorState expected, IcebergEnumeratorState actual) {
Assert.assertEquals(expected.lastEnumeratedPosition(),
actual.lastEnumeratedPosition());
+
Assert.assertEquals(expected.pendingSplits().size(),
actual.pendingSplits().size());
Iterator<IcebergSourceSplitState> expectedIterator =
expected.pendingSplits().iterator();
Iterator<IcebergSourceSplitState> actualIterator =
actual.pendingSplits().iterator();
@@ -93,5 +138,8 @@ public class TestIcebergEnumeratorStateSerializer {
expectedSplitState.split().recordOffset(),
actualSplitState.split().recordOffset());
Assert.assertEquals(expectedSplitState.status(),
actualSplitState.status());
}
+
+ Assert.assertArrayEquals(
+ expected.enumerationSplitCountHistory(),
actual.enumerationSplitCountHistory());
}
}