ahmedabu98 commented on code in PR #38018:
URL: https://github.com/apache/beam/pull/38018#discussion_r3018252388


##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFiles.java:
##########
@@ -227,101 +257,170 @@ public ConvertToDataFile(
     private static final String UNKNOWN_FORMAT_ERROR = "Could not determine the file's format";
     static final String UNKNOWN_PARTITION_ERROR = "Could not determine the file's partition: ";
 
-    @ProcessElement
-    public void process(@Element String filePath, MultiOutputReceiver output)
-        throws IOException, InterruptedException {
-      FileFormat format;
-      try {
-        format = inferFormat(filePath);
-      } catch (UnknownFormatException e) {
-        output
-            .get(ERRORS)
-            .output(Row.withSchema(ERROR_SCHEMA).addValues(filePath, UNKNOWN_FORMAT_ERROR).build());
-        numErrorFiles.inc();
-        return;
+    private static class ProcessResult {
+      final @Nullable SerializableDataFile dataFile;
+      final @Nullable Row errorRow;
+      final Instant timestamp;
+      final BoundedWindow window;
+
+      ProcessResult(
+          @Nullable SerializableDataFile dataFile,
+          @Nullable Row errorRow,
+          Instant timestamp,
+          BoundedWindow window) {
+        Preconditions.checkState(
+            dataFile == null || errorRow == null,
+            "Expected only one of dataFile or errorRow, but got both:%n\tfile: %s%n\terror: %s",
+            dataFile != null ? dataFile.getPath() : null,
+            errorRow);
+        this.dataFile = dataFile;
+        this.errorRow = errorRow;
+        this.timestamp = timestamp;
+        this.window = window;
       }
+    }
 
-      if (table == null) {
-        try {
-          table = getOrCreateTable(filePath, format);
-        } catch (FileNotFoundException e) {
-          output
-              .get(ERRORS)
-              .output(
-                  Row.withSchema(ERROR_SCHEMA)
-                      .addValues(filePath, checkStateNotNull(e.getMessage()))
-                      .build());
-          numErrorFiles.inc();
-          return;
-        }
-      }
+    @Setup
+    public void setup() {
+      executor = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
+    }
 
-      // Check if the file path contains the provided prefix
-      if (table.spec().isPartitioned()
-          && !Strings.isNullOrEmpty(prefix)
-          && !filePath.startsWith(checkStateNotNull(prefix))) {
-        output
-            .get(ERRORS)
-            .output(Row.withSchema(ERROR_SCHEMA).addValues(filePath, PREFIX_ERROR).build());
-        numErrorFiles.inc();
-        return;
+    @Teardown
+    public void teardown() {
+      if (executor != null) {
+        executor.shutdownNow();
       }
+    }
 
-      InputFile inputFile = table.io().newInputFile(filePath);
+    @StartBundle
+    public void startBundle() {
+      activeTasks = new ArrayList<>();
+    }
 
-      Metrics metrics;
-      try {
-        metrics =
-            getFileMetrics(
-                inputFile,
-                format,
-                MetricsConfig.forTable(table),
-                MappingUtil.create(table.schema()));
-      } catch (FileNotFoundException e) {
-        output
-            .get(ERRORS)
-            .output(
+    private Callable<ProcessResult> createProcessTask(
+        String filePath, Instant timestamp, BoundedWindow window) {
+
+      return () -> {
+        FileFormat format;
+        try {
+          format = inferFormat(filePath);
+        } catch (UnknownFormatException e) {
+          return new ProcessResult(
+              null,
+              Row.withSchema(ERROR_SCHEMA).addValues(filePath, UNKNOWN_FORMAT_ERROR).build(),
+              timestamp,
+              window);
+        }
+
+        // Synchronize table initialization

Review Comment:
   Added synchronization



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to