This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 4d0b69beba Flink: Fix IcebergSource tableloader lifecycle management in batch mode (#9173)
4d0b69beba is described below

commit 4d0b69beba104e6912f6e6850189121fcd23ef8a
Author: Mason Chen <mas.c...@berkeley.edu>
AuthorDate: Sat Dec 9 00:06:32 2023 -0800

    Flink: Fix IcebergSource tableloader lifecycle management in batch mode (#9173)
---
 .../apache/iceberg/flink/source/IcebergSource.java | 58 ++++++++--------------
 .../enumerator/ContinuousSplitPlannerImpl.java     |  4 +-
 2 files changed, 24 insertions(+), 38 deletions(-)

diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
index 179253cb3a..a7ce2db61f 100644
--- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
+++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
@@ -81,16 +81,18 @@ import org.slf4j.LoggerFactory;
 public class IcebergSource<T> implements Source<T, IcebergSourceSplit, 
IcebergEnumeratorState> {
   private static final Logger LOG = 
LoggerFactory.getLogger(IcebergSource.class);
 
+  // This table loader can be closed, and it is only safe to use this instance 
for resource
+  // independent information (e.g. a table name). Copies of this are required 
to avoid lifecycle
+  // management conflicts with the user provided table loader. e.g. a copy of 
this is required for
+  // split planning, which uses the underlying io, and should be closed after 
split planning is
+  // complete.
   private final TableLoader tableLoader;
   private final ScanContext scanContext;
   private final ReaderFunction<T> readerFunction;
   private final SplitAssignerFactory assignerFactory;
   private final SerializableComparator<IcebergSourceSplit> splitComparator;
   private final SerializableRecordEmitter<T> emitter;
-
-  // Can't use SerializableTable as enumerator needs a regular table
-  // that can discover table changes
-  private transient Table table;
+  private final String tableName;
 
   IcebergSource(
       TableLoader tableLoader,
@@ -100,17 +102,21 @@ public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEn
       SerializableComparator<IcebergSourceSplit> splitComparator,
       Table table,
       SerializableRecordEmitter<T> emitter) {
+    Preconditions.checkNotNull(tableLoader, "tableLoader is required.");
+    Preconditions.checkNotNull(readerFunction, "readerFunction is required.");
+    Preconditions.checkNotNull(assignerFactory, "assignerFactory is 
required.");
+    Preconditions.checkNotNull(table, "table is required.");
     this.tableLoader = tableLoader;
     this.scanContext = scanContext;
     this.readerFunction = readerFunction;
     this.assignerFactory = assignerFactory;
     this.splitComparator = splitComparator;
-    this.table = table;
     this.emitter = emitter;
+    this.tableName = table.name();
   }
 
   String name() {
-    return "IcebergSource-" + lazyTable().name();
+    return "IcebergSource-" + tableName;
   }
 
   private String planningThreadName() {
@@ -120,38 +126,26 @@ public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEn
     // a public API like the protected method "OperatorCoordinator.Context 
getCoordinatorContext()"
     // from SourceCoordinatorContext implementation. For now, <table 
name>-<random UUID> is used as
     // the unique thread pool name.
-    return lazyTable().name() + "-" + UUID.randomUUID();
+    return tableName + "-" + UUID.randomUUID();
   }
 
   private List<IcebergSourceSplit> planSplitsForBatch(String threadName) {
     ExecutorService workerPool =
         ThreadPools.newWorkerPool(threadName, scanContext.planParallelism());
-    try {
+    try (TableLoader loader = tableLoader.clone()) {
+      loader.open();
       List<IcebergSourceSplit> splits =
-          FlinkSplitPlanner.planIcebergSourceSplits(lazyTable(), scanContext, 
workerPool);
+          FlinkSplitPlanner.planIcebergSourceSplits(loader.loadTable(), 
scanContext, workerPool);
       LOG.info(
-          "Discovered {} splits from table {} during job initialization",
-          splits.size(),
-          lazyTable().name());
+          "Discovered {} splits from table {} during job initialization", 
splits.size(), tableName);
       return splits;
+    } catch (IOException e) {
+      throw new UncheckedIOException("Failed to close table loader", e);
     } finally {
       workerPool.shutdown();
     }
   }
 
-  private Table lazyTable() {
-    if (table == null) {
-      tableLoader.open();
-      try (TableLoader loader = tableLoader) {
-        this.table = loader.loadTable();
-      } catch (IOException e) {
-        throw new UncheckedIOException("Failed to close table loader", e);
-      }
-    }
-
-    return table;
-  }
-
   @Override
   public Boundedness getBoundedness() {
     return scanContext.isStreaming() ? Boundedness.CONTINUOUS_UNBOUNDED : 
Boundedness.BOUNDED;
@@ -160,7 +154,7 @@ public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEn
   @Override
   public SourceReader<T, IcebergSourceSplit> createReader(SourceReaderContext 
readerContext) {
     IcebergSourceReaderMetrics metrics =
-        new IcebergSourceReaderMetrics(readerContext.metricGroup(), 
lazyTable().name());
+        new IcebergSourceReaderMetrics(readerContext.metricGroup(), tableName);
     return new IcebergSourceReader<>(
         emitter, metrics, readerFunction, splitComparator, readerContext);
   }
@@ -197,13 +191,12 @@ public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEn
       LOG.info(
           "Iceberg source restored {} splits from state for table {}",
           enumState.pendingSplits().size(),
-          lazyTable().name());
+          tableName);
       assigner = assignerFactory.createAssigner(enumState.pendingSplits());
     }
-
     if (scanContext.isStreaming()) {
       ContinuousSplitPlanner splitPlanner =
-          new ContinuousSplitPlannerImpl(tableLoader.clone(), scanContext, 
planningThreadName());
+          new ContinuousSplitPlannerImpl(tableLoader, scanContext, 
planningThreadName());
       return new ContinuousIcebergEnumerator(
           enumContext, assigner, scanContext, splitPlanner, enumState);
     } else {
@@ -537,7 +530,6 @@ public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEn
         }
       }
 
-      checkRequired();
       // Since builder already load the table, pass it to the source to avoid 
double loading
       return new IcebergSource<>(
           tableLoader,
@@ -548,11 +540,5 @@ public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEn
           table,
           emitter);
     }
-
-    private void checkRequired() {
-      Preconditions.checkNotNull(tableLoader, "tableLoader is required.");
-      Preconditions.checkNotNull(splitAssignerFactory, "assignerFactory is 
required.");
-      Preconditions.checkNotNull(readerFunction, "readerFunction is 
required.");
-    }
   }
 }
diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousSplitPlannerImpl.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousSplitPlannerImpl.java
index f0d8ca8d70..450b649253 100644
--- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousSplitPlannerImpl.java
+++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousSplitPlannerImpl.java
@@ -56,9 +56,9 @@ public class ContinuousSplitPlannerImpl implements ContinuousSplitPlanner {
    */
   public ContinuousSplitPlannerImpl(
       TableLoader tableLoader, ScanContext scanContext, String threadName) {
-    this.tableLoader = tableLoader;
+    this.tableLoader = tableLoader.clone();
     this.tableLoader.open();
-    this.table = tableLoader.loadTable();
+    this.table = this.tableLoader.loadTable();
     this.scanContext = scanContext;
     this.isSharedPool = threadName == null;
     this.workerPool =

Reply via email to