ahmedabu98 commented on code in PR #32666:
URL: https://github.com/apache/beam/pull/32666#discussion_r1792386285


##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteUngroupedRowsToFiles.java:
##########
@@ -265,14 +265,18 @@ public void finishBundle(FinishBundleContext c) throws Exception {
         return;
       }
       recordWriterManager.close();
-      for (Map.Entry<WindowedValue<IcebergDestination>, List<ManifestFile>> destinationAndFiles :
-          Preconditions.checkNotNull(recordWriterManager).getManifestFiles().entrySet()) {
+
+      for (Map.Entry<WindowedValue<IcebergDestination>, List<SerializableDataFile>>
+          destinationAndFiles :
+              Preconditions.checkNotNull(recordWriterManager)
+                  .getSerializableDataFiles()
+                  .entrySet()) {
         WindowedValue<IcebergDestination> windowedDestination = destinationAndFiles.getKey();
 
-        for (ManifestFile manifestFile : destinationAndFiles.getValue()) {
+        for (SerializableDataFile dataFile : destinationAndFiles.getValue()) {
           c.output(
               FileWriteResult.builder()
Review Comment:
   That's true, failed bundles will likely leave behind unused files. But it's also true that these unused files will never be committed, so Iceberg will not consider them part of the table.
   
   > Probably there's no correctness issue
   
   In that sense, it's not as much of a problem as the one we face with FileIO or BigQueryIO. I think the worst case is some noise and wasted space? If it turns out to be a legitimate problem, we can implement a similar pattern of 1) writing to a temp directory and 2) copying over, as sketched below.
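   
   For reference, a minimal sketch of that two-step pattern, assuming a local filesystem and hypothetical names (`TempThenCommitWriter`, `writeTemp`, `commit` are illustrative only, not the actual FileIO/BigQueryIO code):
   
   ```java
   import java.io.IOException;
   import java.nio.file.Files;
   import java.nio.file.Path;
   import java.nio.file.StandardCopyOption;
   import java.util.UUID;
   
   /** Illustrative sketch only -- not the actual FileIO/BigQueryIO implementation. */
   public class TempThenCommitWriter {
   
     private final Path tempDir;
     private final Path finalDir;
   
     TempThenCommitWriter(Path tempDir, Path finalDir) {
       this.tempDir = tempDir;
       this.finalDir = finalDir;
     }
   
     /** Step 1: each bundle writes its output under a unique temp name. */
     Path writeTemp(byte[] contents) throws IOException {
       Files.createDirectories(tempDir);
       Path temp = tempDir.resolve("bundle-" + UUID.randomUUID() + ".tmp");
       return Files.write(temp, contents);
     }
   
     /** Step 2: only a successfully finished bundle promotes its file. */
     Path commit(Path temp, String finalName) throws IOException {
       Files.createDirectories(finalDir);
       // A failed bundle never reaches this point, so its temp file is never
       // promoted; a periodic cleanup of tempDir removes the leftovers.
       return Files.move(temp, finalDir.resolve(finalName), StandardCopyOption.ATOMIC_MOVE);
     }
   }
   ```
   
   The key property is that readers of the final directory never observe a failed bundle's output; orphans only ever accumulate in the temp directory, where they are safe to garbage-collect.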


