chamikaramj commented on code in PR #38018:
URL: https://github.com/apache/beam/pull/38018#discussion_r3017658084


##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFiles.java:
##########
@@ -227,101 +257,170 @@ public ConvertToDataFile(
     private static final String UNKNOWN_FORMAT_ERROR = "Could not determine 
the file's format";
     static final String UNKNOWN_PARTITION_ERROR = "Could not determine the 
file's partition: ";
 
-    @ProcessElement
-    public void process(@Element String filePath, MultiOutputReceiver output)
-        throws IOException, InterruptedException {
-      FileFormat format;
-      try {
-        format = inferFormat(filePath);
-      } catch (UnknownFormatException e) {
-        output
-            .get(ERRORS)
-            .output(Row.withSchema(ERROR_SCHEMA).addValues(filePath, 
UNKNOWN_FORMAT_ERROR).build());
-        numErrorFiles.inc();
-        return;
+    private static class ProcessResult {
+      final @Nullable SerializableDataFile dataFile;
+      final @Nullable Row errorRow;
+      final Instant timestamp;
+      final BoundedWindow window;
+
+      ProcessResult(
+          @Nullable SerializableDataFile dataFile,
+          @Nullable Row errorRow,
+          Instant timestamp,
+          BoundedWindow window) {
+        Preconditions.checkState(
+            dataFile == null || errorRow == null,
+            "Expected only one of dataFile or errorRow, but got both:%n\tfile: 
%s%n\terror: %s",
+            dataFile != null ? dataFile.getPath() : null,
+            errorRow);
+        this.dataFile = dataFile;
+        this.errorRow = errorRow;
+        this.timestamp = timestamp;
+        this.window = window;
       }
+    }
 
-      if (table == null) {
-        try {
-          table = getOrCreateTable(filePath, format);
-        } catch (FileNotFoundException e) {
-          output
-              .get(ERRORS)
-              .output(
-                  Row.withSchema(ERROR_SCHEMA)
-                      .addValues(filePath, checkStateNotNull(e.getMessage()))
-                      .build());
-          numErrorFiles.inc();
-          return;
-        }
-      }
+    @Setup
+    public void setup() {
+      executor = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
+    }
 
-      // Check if the file path contains the provided prefix
-      if (table.spec().isPartitioned()
-          && !Strings.isNullOrEmpty(prefix)
-          && !filePath.startsWith(checkStateNotNull(prefix))) {
-        output
-            .get(ERRORS)
-            .output(Row.withSchema(ERROR_SCHEMA).addValues(filePath, 
PREFIX_ERROR).build());
-        numErrorFiles.inc();
-        return;
+    @Teardown
+    public void teardown() {
+      if (executor != null) {
+        executor.shutdownNow();
       }
+    }
 
-      InputFile inputFile = table.io().newInputFile(filePath);
+    @StartBundle
+    public void startBundle() {
+      activeTasks = new ArrayList<>();
+    }
 
-      Metrics metrics;
-      try {
-        metrics =
-            getFileMetrics(
-                inputFile,
-                format,
-                MetricsConfig.forTable(table),
-                MappingUtil.create(table.schema()));
-      } catch (FileNotFoundException e) {
-        output
-            .get(ERRORS)
-            .output(
+    private Callable<ProcessResult> createProcessTask(
+        String filePath, Instant timestamp, BoundedWindow window) {
+
+      return () -> {
+        FileFormat format;
+        try {
+          format = inferFormat(filePath);
+        } catch (UnknownFormatException e) {
+          return new ProcessResult(
+              null,
+              Row.withSchema(ERROR_SCHEMA).addValues(filePath, 
UNKNOWN_FORMAT_ERROR).build(),
+              timestamp,
+              window);
+        }
+
+        // Synchronize table initialization

Review Comment:
   Was this synchronized? Only checking for null could result in a race condition, since multiple tasks running on the executor may try to initialize `table` concurrently.



##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFiles.java:
##########
@@ -227,101 +257,170 @@ public ConvertToDataFile(
     private static final String UNKNOWN_FORMAT_ERROR = "Could not determine 
the file's format";
     static final String UNKNOWN_PARTITION_ERROR = "Could not determine the 
file's partition: ";
 
-    @ProcessElement
-    public void process(@Element String filePath, MultiOutputReceiver output)
-        throws IOException, InterruptedException {
-      FileFormat format;
-      try {
-        format = inferFormat(filePath);
-      } catch (UnknownFormatException e) {
-        output
-            .get(ERRORS)
-            .output(Row.withSchema(ERROR_SCHEMA).addValues(filePath, 
UNKNOWN_FORMAT_ERROR).build());
-        numErrorFiles.inc();
-        return;
+    private static class ProcessResult {
+      final @Nullable SerializableDataFile dataFile;
+      final @Nullable Row errorRow;
+      final Instant timestamp;
+      final BoundedWindow window;
+
+      ProcessResult(
+          @Nullable SerializableDataFile dataFile,
+          @Nullable Row errorRow,
+          Instant timestamp,
+          BoundedWindow window) {
+        Preconditions.checkState(
+            dataFile == null || errorRow == null,
+            "Expected only one of dataFile or errorRow, but got both:%n\tfile: 
%s%n\terror: %s",
+            dataFile != null ? dataFile.getPath() : null,
+            errorRow);
+        this.dataFile = dataFile;
+        this.errorRow = errorRow;
+        this.timestamp = timestamp;
+        this.window = window;
       }
+    }
 
-      if (table == null) {
-        try {
-          table = getOrCreateTable(filePath, format);
-        } catch (FileNotFoundException e) {
-          output
-              .get(ERRORS)
-              .output(
-                  Row.withSchema(ERROR_SCHEMA)
-                      .addValues(filePath, checkStateNotNull(e.getMessage()))
-                      .build());
-          numErrorFiles.inc();
-          return;
-        }
-      }
+    @Setup
+    public void setup() {
+      executor = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
+    }
 
-      // Check if the file path contains the provided prefix
-      if (table.spec().isPartitioned()
-          && !Strings.isNullOrEmpty(prefix)
-          && !filePath.startsWith(checkStateNotNull(prefix))) {
-        output
-            .get(ERRORS)
-            .output(Row.withSchema(ERROR_SCHEMA).addValues(filePath, 
PREFIX_ERROR).build());
-        numErrorFiles.inc();
-        return;
+    @Teardown
+    public void teardown() {
+      if (executor != null) {
+        executor.shutdownNow();
       }
+    }
 
-      InputFile inputFile = table.io().newInputFile(filePath);
+    @StartBundle
+    public void startBundle() {
+      activeTasks = new ArrayList<>();
+    }
 
-      Metrics metrics;
-      try {
-        metrics =
-            getFileMetrics(
-                inputFile,
-                format,
-                MetricsConfig.forTable(table),
-                MappingUtil.create(table.schema()));
-      } catch (FileNotFoundException e) {
-        output
-            .get(ERRORS)
-            .output(
+    private Callable<ProcessResult> createProcessTask(
+        String filePath, Instant timestamp, BoundedWindow window) {
+
+      return () -> {
+        FileFormat format;
+        try {
+          format = inferFormat(filePath);
+        } catch (UnknownFormatException e) {
+          return new ProcessResult(
+              null,
+              Row.withSchema(ERROR_SCHEMA).addValues(filePath, 
UNKNOWN_FORMAT_ERROR).build(),
+              timestamp,
+              window);
+        }
+
+        // Synchronize table initialization
+        if (table == null) {
+          try {
+            table = getOrCreateTable(filePath, format);
+          } catch (FileNotFoundException e) {
+            return new ProcessResult(
+                null,
                 Row.withSchema(ERROR_SCHEMA)
                     .addValues(filePath, checkStateNotNull(e.getMessage()))
-                    .build());
-        numErrorFiles.inc();
-        return;
-      }
+                    .build(),
+                timestamp,
+                window);
+          }
+        }
 
-      // Figure out which partition this DataFile should go to
-      String partitionPath;
-      if (table.spec().isUnpartitioned()) {
-        partitionPath = "";
-      } else if (!Strings.isNullOrEmpty(prefix)) {
-        // option 1: use directory structure to determine partition
-        // Note: we don't validate the DataFile content here
-        partitionPath = getPartitionFromFilePath(filePath);
-      } else {
+        // Check if the file path contains the provided prefix
+        if (table.spec().isPartitioned()
+            && !Strings.isNullOrEmpty(prefix)
+            && !filePath.startsWith(checkStateNotNull(prefix))) {
+          return new ProcessResult(
+              null,
+              Row.withSchema(ERROR_SCHEMA).addValues(filePath, 
PREFIX_ERROR).build(),
+              timestamp,
+              window);
+        }
+
+        InputFile inputFile = table.io().newInputFile(filePath);
+
+        Metrics metrics;
         try {
-          // option 2: examine DataFile min/max statistics to determine 
partition
-          partitionPath = getPartitionFromMetrics(metrics, inputFile, table);
-        } catch (UnknownPartitionException e) {
-          output
-              .get(ERRORS)
-              .output(
-                  Row.withSchema(ERROR_SCHEMA)
-                      .addValues(filePath, UNKNOWN_PARTITION_ERROR + 
e.getMessage())
-                      .build());
-          numErrorFiles.inc();
-          return;
+          metrics =
+              getFileMetrics(
+                  inputFile,
+                  format,
+                  MetricsConfig.forTable(table),
+                  MappingUtil.create(table.schema()));
+        } catch (FileNotFoundException e) {
+          return new ProcessResult(
+              null,
+              Row.withSchema(ERROR_SCHEMA)
+                  .addValues(filePath, checkStateNotNull(e.getMessage()))
+                  .build(),
+              timestamp,
+              window);
         }
-      }
 
-      DataFile df =
-          DataFiles.builder(table.spec())
-              .withPath(filePath)
-              .withFormat(format)
-              .withMetrics(metrics)
-              .withFileSizeInBytes(inputFile.getLength())
-              .withPartitionPath(partitionPath)
-              .build();
+        // Figure out which partition this DataFile should go to
+        String partitionPath;
+        if (table.spec().isUnpartitioned()) {
+          partitionPath = "";
+        } else if (!Strings.isNullOrEmpty(prefix)) {
+          // option 1: use directory structure to determine partition
+          // Note: we don't validate the DataFile content here
+          partitionPath = getPartitionFromFilePath(filePath);
+        } else {
+          try {
+            // option 2: examine DataFile min/max statistics to determine 
partition
+            partitionPath = getPartitionFromMetrics(metrics, inputFile, table);
+          } catch (UnknownPartitionException e) {
+            return new ProcessResult(
+                null,
+                Row.withSchema(ERROR_SCHEMA)
+                    .addValues(filePath, UNKNOWN_PARTITION_ERROR + 
e.getMessage())
+                    .build(),
+                timestamp,
+                window);
+          }
+        }
+
+        DataFile df =
+            DataFiles.builder(table.spec())
+                .withPath(filePath)
+                .withFormat(format)
+                .withMetrics(metrics)
+                .withFileSizeInBytes(inputFile.getLength())
+                .withPartitionPath(partitionPath)
+                .build();
+
+        return new ProcessResult(
+            SerializableDataFile.from(df, partitionPath), null, timestamp, 
window);
+      };
+    }
+
+    @ProcessElement
+    public void process(
+        @Element String filePath,
+        @Timestamp Instant timestamp,
+        BoundedWindow window,
+        MultiOutputReceiver output)
+        throws IOException, InterruptedException {
+
+      Callable<ProcessResult> task = createProcessTask(filePath, timestamp, 
window);
+      
checkStateNotNull(activeTasks).add(checkStateNotNull(executor).submit(task));

Review Comment:
   Is there a chance of OOMs here due to accumulating a large number of pending tasks (and their futures) within a single bundle?



##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFiles.java:
##########
@@ -205,10 +230,15 @@ static class ConvertToDataFile extends DoFn<String, 
SerializableDataFile> {
     private final @Nullable String prefix;
     private final @Nullable List<String> partitionFields;
     private final @Nullable Map<String, String> tableProps;
+    private transient @MonotonicNonNull ExecutorService executor;
+    private transient @MonotonicNonNull List<Future<ProcessResult>> 
activeTasks;
     private transient @MonotonicNonNull Table table;
+
     // Limit open readers to avoid blowing up memory on one worker
     private static final int MAX_READERS = 10;
     private static final Semaphore ACTIVE_READERS = new Semaphore(MAX_READERS);
+    // Number of parallel threads processing incoming files
+    private static final int THREAD_POOL_SIZE = 10;

Review Comment:
   We should probably do some experimentation to pick a reasonable default here, or base it on similar optimizations done for other I/Os.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to