This is an automated email from the ASF dual-hosted git repository. pvary pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push: new 4d0b69beba Flink: Fix IcebergSource tableloader lifecycle management in batch mode (#9173) 4d0b69beba is described below commit 4d0b69beba104e6912f6e6850189121fcd23ef8a Author: Mason Chen <mas.c...@berkeley.edu> AuthorDate: Sat Dec 9 00:06:32 2023 -0800 Flink: Fix IcebergSource tableloader lifecycle management in batch mode (#9173) --- .../apache/iceberg/flink/source/IcebergSource.java | 58 ++++++++-------------- .../enumerator/ContinuousSplitPlannerImpl.java | 4 +- 2 files changed, 24 insertions(+), 38 deletions(-) diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java index 179253cb3a..a7ce2db61f 100644 --- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java +++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java @@ -81,16 +81,18 @@ import org.slf4j.LoggerFactory; public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEnumeratorState> { private static final Logger LOG = LoggerFactory.getLogger(IcebergSource.class); + // This table loader can be closed, and it is only safe to use this instance for resource + // independent information (e.g. a table name). Copies of this are required to avoid lifecycle + // management conflicts with the user provided table loader. e.g. a copy of this is required for + // split planning, which uses the underlying io, and should be closed after split planning is + // complete. private final TableLoader tableLoader; private final ScanContext scanContext; private final ReaderFunction<T> readerFunction; private final SplitAssignerFactory assignerFactory; private final SerializableComparator<IcebergSourceSplit> splitComparator; private final SerializableRecordEmitter<T> emitter; - - // Can't use SerializableTable as enumerator needs a regular table - // that can discover table changes - private transient Table table; + private final String tableName; IcebergSource( TableLoader tableLoader, @@ -100,17 +102,21 @@ public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEn SerializableComparator<IcebergSourceSplit> splitComparator, Table table, SerializableRecordEmitter<T> emitter) { + Preconditions.checkNotNull(tableLoader, "tableLoader is required."); + Preconditions.checkNotNull(readerFunction, "readerFunction is required."); + Preconditions.checkNotNull(assignerFactory, "assignerFactory is required."); + Preconditions.checkNotNull(table, "table is required."); this.tableLoader = tableLoader; this.scanContext = scanContext; this.readerFunction = readerFunction; this.assignerFactory = assignerFactory; this.splitComparator = splitComparator; - this.table = table; this.emitter = emitter; + this.tableName = table.name(); } String name() { - return "IcebergSource-" + lazyTable().name(); + return "IcebergSource-" + tableName; } private String planningThreadName() { @@ -120,38 +126,26 @@ public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEn // a public API like the protected method "OperatorCoordinator.Context getCoordinatorContext()" // from SourceCoordinatorContext implementation. For now, <table name>-<random UUID> is used as // the unique thread pool name. - return lazyTable().name() + "-" + UUID.randomUUID(); + return tableName + "-" + UUID.randomUUID(); } private List<IcebergSourceSplit> planSplitsForBatch(String threadName) { ExecutorService workerPool = ThreadPools.newWorkerPool(threadName, scanContext.planParallelism()); - try { + try (TableLoader loader = tableLoader.clone()) { + loader.open(); List<IcebergSourceSplit> splits = - FlinkSplitPlanner.planIcebergSourceSplits(lazyTable(), scanContext, workerPool); + FlinkSplitPlanner.planIcebergSourceSplits(loader.loadTable(), scanContext, workerPool); LOG.info( - "Discovered {} splits from table {} during job initialization", - splits.size(), - lazyTable().name()); + "Discovered {} splits from table {} during job initialization", splits.size(), tableName); return splits; + } catch (IOException e) { + throw new UncheckedIOException("Failed to close table loader", e); } finally { workerPool.shutdown(); } } - private Table lazyTable() { - if (table == null) { - tableLoader.open(); - try (TableLoader loader = tableLoader) { - this.table = loader.loadTable(); - } catch (IOException e) { - throw new UncheckedIOException("Failed to close table loader", e); - } - } - - return table; - } - @Override public Boundedness getBoundedness() { return scanContext.isStreaming() ? Boundedness.CONTINUOUS_UNBOUNDED : Boundedness.BOUNDED; @@ -160,7 +154,7 @@ public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEn @Override public SourceReader<T, IcebergSourceSplit> createReader(SourceReaderContext readerContext) { IcebergSourceReaderMetrics metrics = - new IcebergSourceReaderMetrics(readerContext.metricGroup(), lazyTable().name()); + new IcebergSourceReaderMetrics(readerContext.metricGroup(), tableName); return new IcebergSourceReader<>( emitter, metrics, readerFunction, splitComparator, readerContext); } @@ -197,13 +191,12 @@ public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEn LOG.info( "Iceberg source restored {} splits from state for table {}", enumState.pendingSplits().size(), - lazyTable().name()); + tableName); assigner = assignerFactory.createAssigner(enumState.pendingSplits()); } - if (scanContext.isStreaming()) { ContinuousSplitPlanner splitPlanner = - new ContinuousSplitPlannerImpl(tableLoader.clone(), scanContext, planningThreadName()); + new ContinuousSplitPlannerImpl(tableLoader, scanContext, planningThreadName()); return new ContinuousIcebergEnumerator( enumContext, assigner, scanContext, splitPlanner, enumState); } else { @@ -537,7 +530,6 @@ public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEn } } - checkRequired(); // Since builder already load the table, pass it to the source to avoid double loading return new IcebergSource<>( tableLoader, @@ -548,11 +540,5 @@ public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEn table, emitter); } - - private void checkRequired() { - Preconditions.checkNotNull(tableLoader, "tableLoader is required."); - Preconditions.checkNotNull(splitAssignerFactory, "assignerFactory is required."); - Preconditions.checkNotNull(readerFunction, "readerFunction is required."); - } } } diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousSplitPlannerImpl.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousSplitPlannerImpl.java index f0d8ca8d70..450b649253 100644 --- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousSplitPlannerImpl.java +++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousSplitPlannerImpl.java @@ -56,9 +56,9 @@ public class ContinuousSplitPlannerImpl implements ContinuousSplitPlanner { */ public ContinuousSplitPlannerImpl( TableLoader tableLoader, ScanContext scanContext, String threadName) { - this.tableLoader = tableLoader; + this.tableLoader = tableLoader.clone(); this.tableLoader.open(); - this.table = tableLoader.loadTable(); + this.table = this.tableLoader.loadTable(); this.scanContext = scanContext; this.isSharedPool = threadName == null; this.workerPool =