This is an automated email from the ASF dual-hosted git repository.

ashvin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-xtable.git


The following commit(s) were added to refs/heads/main by this push:
     new c76692b8 Add base representation of storage files
c76692b8 is described below

commit c76692b85a646adb449a263652747c1c87de37b8
Author: Ashvin Agrawal <ash...@apache.org>
AuthorDate: Sun Feb 16 12:48:22 2025 -0800

    Add base representation of storage files
---
 .../java/org/apache/xtable/model/TableChange.java  |  4 +-
 .../org/apache/xtable/model/storage/FilesDiff.java |  6 +--
 .../xtable/model/storage/InternalDataFile.java     | 19 ++++----
 .../apache/xtable/model/storage/InternalFile.java  | 56 ++++++++++++++++++++++
 .../{DataFilesDiff.java => InternalFilesDiff.java} | 38 +++++++++++++--
 .../xtable/model/storage/PartitionFileGroup.java   | 12 ++++-
 .../apache/xtable/spi/sync/ConversionTarget.java   |  6 +--
 .../xtable/model/storage/TestDataFilesDiff.java    |  8 ++--
 .../apache/xtable/model/storage/TestFilesDiff.java |  2 +-
 .../spi/extractor/TestExtractFromSource.java       | 10 ++--
 .../xtable/spi/sync/TestTableFormatSync.java       | 52 ++++++++++----------
 .../apache/xtable/delta/DeltaConversionSource.java |  8 ++--
 .../apache/xtable/delta/DeltaConversionTarget.java |  6 +--
 .../delta/DeltaDataFileUpdatesExtractor.java       | 19 ++++----
 .../xtable/hudi/BaseFileUpdatesExtractor.java      | 17 +++----
 .../apache/xtable/hudi/HudiConversionTarget.java   |  6 +--
 .../apache/xtable/hudi/HudiDataFileExtractor.java  |  6 +--
 .../xtable/iceberg/IcebergConversionSource.java    |  9 ++--
 .../xtable/iceberg/IcebergConversionTarget.java    |  6 +--
 .../xtable/iceberg/IcebergDataFileUpdatesSync.java | 25 ++++++----
 .../org/apache/xtable/ValidationTestHelper.java    |  8 ++--
 .../xtable/delta/ITDeltaConversionSource.java      |  2 +-
 .../apache/xtable/hudi/ITHudiConversionTarget.java | 14 +++---
 .../xtable/hudi/TestBaseFileUpdatesExtractor.java  |  6 +--
 .../xtable/hudi/TestHudiConversionTarget.java      |  4 +-
 .../iceberg/TestIcebergConversionSource.java       |  4 +-
 26 files changed, 232 insertions(+), 121 deletions(-)

diff --git a/xtable-api/src/main/java/org/apache/xtable/model/TableChange.java 
b/xtable-api/src/main/java/org/apache/xtable/model/TableChange.java
index fe3907ee..b425fd01 100644
--- a/xtable-api/src/main/java/org/apache/xtable/model/TableChange.java
+++ b/xtable-api/src/main/java/org/apache/xtable/model/TableChange.java
@@ -22,7 +22,7 @@ import lombok.Builder;
 import lombok.NonNull;
 import lombok.Value;
 
-import org.apache.xtable.model.storage.DataFilesDiff;
+import org.apache.xtable.model.storage.InternalFilesDiff;
 
 /**
  * Captures the changes in a single commit/instant from the source table.
@@ -33,7 +33,7 @@ import org.apache.xtable.model.storage.DataFilesDiff;
 @Builder(toBuilder = true)
 public class TableChange {
   // Change in files at the specified instant
-  DataFilesDiff filesDiff;
+  InternalFilesDiff filesDiff;
 
   /** The {@link InternalTable} at the commit time to which this table change 
belongs. */
   InternalTable tableAsOfChange;
diff --git 
a/xtable-api/src/main/java/org/apache/xtable/model/storage/FilesDiff.java 
b/xtable-api/src/main/java/org/apache/xtable/model/storage/FilesDiff.java
index 7d628ce4..f86e64bc 100644
--- a/xtable-api/src/main/java/org/apache/xtable/model/storage/FilesDiff.java
+++ b/xtable-api/src/main/java/org/apache/xtable/model/storage/FilesDiff.java
@@ -92,12 +92,12 @@ public class FilesDiff<L, P> {
    * @param <P> the type of the previous files
    * @return the set of files that are added
    */
-  public static <P> FilesDiff<InternalDataFile, P> findNewAndRemovedFiles(
+  public static <P> FilesDiff<InternalFile, P> findNewAndRemovedFiles(
       List<PartitionFileGroup> latestFileGroups, Map<String, P> previousFiles) 
{
-    Map<String, InternalDataFile> latestFiles =
+    Map<String, InternalFile> latestFiles =
         latestFileGroups.stream()
             .flatMap(group -> group.getFiles().stream())
-            .collect(Collectors.toMap(InternalDataFile::getPhysicalPath, 
Function.identity()));
+            .collect(Collectors.toMap(InternalFile::getPhysicalPath, 
Function.identity()));
     return findNewAndRemovedFiles(latestFiles, previousFiles);
   }
 }
diff --git 
a/xtable-api/src/main/java/org/apache/xtable/model/storage/InternalDataFile.java
 
b/xtable-api/src/main/java/org/apache/xtable/model/storage/InternalDataFile.java
index 3aee766e..0dcb7359 100644
--- 
a/xtable-api/src/main/java/org/apache/xtable/model/storage/InternalDataFile.java
+++ 
b/xtable-api/src/main/java/org/apache/xtable/model/storage/InternalDataFile.java
@@ -22,8 +22,12 @@ import java.util.Collections;
 import java.util.List;
 
 import lombok.Builder;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
 import lombok.NonNull;
-import lombok.Value;
+import lombok.ToString;
+import lombok.experimental.FieldDefaults;
+import lombok.experimental.SuperBuilder;
 
 import org.apache.xtable.model.stat.ColumnStat;
 import org.apache.xtable.model.stat.PartitionValue;
@@ -33,18 +37,17 @@ import org.apache.xtable.model.stat.PartitionValue;
  *
  * @since 0.1
  */
-@Builder(toBuilder = true)
-@Value
-public class InternalDataFile {
-  // physical path of the file (absolute with scheme)
-  @NonNull String physicalPath;
+@SuperBuilder(toBuilder = true)
+@FieldDefaults(makeFinal = true, level = lombok.AccessLevel.PRIVATE)
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@Getter
+public class InternalDataFile extends InternalFile {
   // file format
   @Builder.Default @NonNull FileFormat fileFormat = FileFormat.APACHE_PARQUET;
   // partition ranges for the data file
   @Builder.Default @NonNull List<PartitionValue> partitionValues = 
Collections.emptyList();
 
-  long fileSizeBytes;
-  long recordCount;
   // column stats for each column in the data file
   @Builder.Default @NonNull List<ColumnStat> columnStats = 
Collections.emptyList();
   // last modified time in millis since epoch
diff --git 
a/xtable-api/src/main/java/org/apache/xtable/model/storage/InternalFile.java 
b/xtable-api/src/main/java/org/apache/xtable/model/storage/InternalFile.java
new file mode 100644
index 00000000..5a898649
--- /dev/null
+++ b/xtable-api/src/main/java/org/apache/xtable/model/storage/InternalFile.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.model.storage;
+
+import lombok.AllArgsConstructor;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.NonNull;
+import lombok.ToString;
+import lombok.experimental.FieldDefaults;
+import lombok.experimental.SuperBuilder;
+
+/**
+ * This class is an internal representation of a logical storage file of a 
table and is the base
+ * class of all types of storage files. The most common type of storage file 
is a data-file, which
+ * contains the actual records of a table. Other examples of a storage file 
are positional delete
+ * files (containing ordinals of the records to be deleted from a data file), 
stat files, and index
+ * files. For completeness of the conversion process, XTable needs to 
recognize current storage
+ * files of a table, and also the storage files that are added and removed 
over time as the state of
+ * the table changes. Different table formats support different storage file 
types. This base file
+ * representation is generic and extensible and is needed to design stable 
interfaces.
+ */
+@SuperBuilder(toBuilder = true)
+@FieldDefaults(makeFinal = true, level = lombok.AccessLevel.PRIVATE)
+@ToString(callSuper = true)
+@EqualsAndHashCode
+@AllArgsConstructor
+@Getter
+public abstract class InternalFile {
+  // Absolute path of the storage file, with the scheme, that contains this 
logical file. Typically,
+  // one physical storage file contains only one base file, e.g. a parquet 
data file. However,
+  // in some cases, one storage file can contain multiple logical storage 
files for optimizations.
+  @NonNull String physicalPath;
+
+  // The size of the logical file in the physical storage file.
+  long fileSizeBytes;
+
+  // The number of records in the storage file.
+  long recordCount;
+}
diff --git 
a/xtable-api/src/main/java/org/apache/xtable/model/storage/DataFilesDiff.java 
b/xtable-api/src/main/java/org/apache/xtable/model/storage/InternalFilesDiff.java
similarity index 62%
rename from 
xtable-api/src/main/java/org/apache/xtable/model/storage/DataFilesDiff.java
rename to 
xtable-api/src/main/java/org/apache/xtable/model/storage/InternalFilesDiff.java
index 9025ec31..2cb4ea9d 100644
--- 
a/xtable-api/src/main/java/org/apache/xtable/model/storage/DataFilesDiff.java
+++ 
b/xtable-api/src/main/java/org/apache/xtable/model/storage/InternalFilesDiff.java
@@ -20,6 +20,7 @@ package org.apache.xtable.model.storage;
 
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
@@ -31,17 +32,18 @@ import lombok.experimental.SuperBuilder;
 @Value
 @EqualsAndHashCode(callSuper = true)
 @SuperBuilder
-public class DataFilesDiff extends FilesDiff<InternalDataFile, 
InternalDataFile> {
+public class InternalFilesDiff extends FilesDiff<InternalFile, InternalFile> {
 
   /**
-   * Creates a DataFilesDiff from the list of files in the target table and 
the list of files in the
-   * source table.
+   * Creates an InternalFilesDiff from the list of files in the target table 
and the list of files in
+   * the source table.
    *
    * @param source list of files currently in the source table
    * @param target list of files currently in the target table
    * @return files that need to be added and removed for the target table 
match the source table
    */
-  public static DataFilesDiff from(List<InternalDataFile> source, 
List<InternalDataFile> target) {
+  public static InternalFilesDiff from(
+      List<InternalDataFile> source, List<InternalDataFile> target) {
     Map<String, InternalDataFile> targetPaths =
         target.stream()
             .collect(Collectors.toMap(InternalDataFile::getPhysicalPath, 
Function.identity()));
@@ -51,9 +53,35 @@ public class DataFilesDiff extends 
FilesDiff<InternalDataFile, InternalDataFile>
 
     FilesDiff<InternalDataFile, InternalDataFile> diff =
         findNewAndRemovedFiles(sourcePaths, targetPaths);
-    return DataFilesDiff.builder()
+    return InternalFilesDiff.builder()
         .filesAdded(diff.getFilesAdded())
         .filesRemoved(diff.getFilesRemoved())
         .build();
   }
+
+  /**
+   * Filters files of type {@link InternalDataFile} from the list of files 
added to the source table
+   * and returns them as a set.
+   */
+  public Set<InternalDataFile> dataFilesAdded() {
+    Set<InternalDataFile> result =
+        getFilesAdded().stream()
+            .filter(InternalDataFile.class::isInstance)
+            .map(file -> (InternalDataFile) file)
+            .collect(Collectors.toSet());
+    return result;
+  }
+
+  /**
+   * Filters files of type {@link InternalDataFile} from the list of files 
removed from the source
+   * table and returns them as a set.
+   */
+  public Set<InternalDataFile> dataFilesRemoved() {
+    Set<InternalDataFile> result =
+        getFilesRemoved().stream()
+            .filter(InternalDataFile.class::isInstance)
+            .map(file -> (InternalDataFile) file)
+            .collect(Collectors.toSet());
+    return result;
+  }
 }
diff --git 
a/xtable-api/src/main/java/org/apache/xtable/model/storage/PartitionFileGroup.java
 
b/xtable-api/src/main/java/org/apache/xtable/model/storage/PartitionFileGroup.java
index 500fa1b5..e7eadfd5 100644
--- 
a/xtable-api/src/main/java/org/apache/xtable/model/storage/PartitionFileGroup.java
+++ 
b/xtable-api/src/main/java/org/apache/xtable/model/storage/PartitionFileGroup.java
@@ -28,12 +28,12 @@ import lombok.Value;
 
 import org.apache.xtable.model.stat.PartitionValue;
 
-/** Represents a grouping of {@link InternalDataFile} with the same partition 
values. */
+/** Represents a grouping of {@link InternalFile} with the same partition 
values. */
 @Value
 @Builder
 public class PartitionFileGroup {
   List<PartitionValue> partitionValues;
-  List<InternalDataFile> files;
+  List<? extends InternalFile> files;
 
   public static List<PartitionFileGroup> fromFiles(List<InternalDataFile> 
files) {
     return fromFiles(files.stream());
@@ -51,4 +51,12 @@ public class PartitionFileGroup {
                     .build())
         .collect(Collectors.toList());
   }
+
+  /** Filters storage files of type {@link InternalDataFile} and returns them. 
*/
+  public List<InternalDataFile> getDataFiles() {
+    return files.stream()
+        .filter(InternalDataFile.class::isInstance)
+        .map(file -> (InternalDataFile) file)
+        .collect(Collectors.toList());
+  }
 }
diff --git 
a/xtable-api/src/main/java/org/apache/xtable/spi/sync/ConversionTarget.java 
b/xtable-api/src/main/java/org/apache/xtable/spi/sync/ConversionTarget.java
index 476578cb..b763be8b 100644
--- a/xtable-api/src/main/java/org/apache/xtable/spi/sync/ConversionTarget.java
+++ b/xtable-api/src/main/java/org/apache/xtable/spi/sync/ConversionTarget.java
@@ -28,7 +28,7 @@ import org.apache.xtable.model.InternalTable;
 import org.apache.xtable.model.metadata.TableSyncMetadata;
 import org.apache.xtable.model.schema.InternalPartitionField;
 import org.apache.xtable.model.schema.InternalSchema;
-import org.apache.xtable.model.storage.DataFilesDiff;
+import org.apache.xtable.model.storage.InternalFilesDiff;
 import org.apache.xtable.model.storage.PartitionFileGroup;
 
 /** A client that provides the major functionality for syncing changes to a 
target system. */
@@ -68,9 +68,9 @@ public interface ConversionTarget {
    * Syncs the changes in files to the target system. This method is required 
to both add and remove
    * files.
    *
-   * @param dataFilesDiff the diff that needs to be synced
+   * @param internalFilesDiff the diff that needs to be synced
    */
-  void syncFilesForDiff(DataFilesDiff dataFilesDiff);
+  void syncFilesForDiff(InternalFilesDiff internalFilesDiff);
 
   /**
    * Starts the sync and performs any initialization required
diff --git 
a/xtable-api/src/test/java/org/apache/xtable/model/storage/TestDataFilesDiff.java
 
b/xtable-api/src/test/java/org/apache/xtable/model/storage/TestDataFilesDiff.java
index 6896dce8..0e780bb3 100644
--- 
a/xtable-api/src/test/java/org/apache/xtable/model/storage/TestDataFilesDiff.java
+++ 
b/xtable-api/src/test/java/org/apache/xtable/model/storage/TestDataFilesDiff.java
@@ -37,13 +37,13 @@ public class TestDataFilesDiff {
         
InternalDataFile.builder().physicalPath("file://already_in_target2.parquet").build();
     InternalDataFile sourceFileInTargetAlready =
         
InternalDataFile.builder().physicalPath("file://already_in_target3.parquet").build();
-    DataFilesDiff actual =
-        DataFilesDiff.from(
+    InternalFilesDiff actual =
+        InternalFilesDiff.from(
             Arrays.asList(sourceFile1, sourceFile2, sourceFileInTargetAlready),
             Arrays.asList(targetFile1, targetFile2, 
sourceFileInTargetAlready));
 
-    DataFilesDiff expected =
-        DataFilesDiff.builder()
+    InternalFilesDiff expected =
+        InternalFilesDiff.builder()
             .filesAdded(Arrays.asList(sourceFile1, sourceFile2))
             .filesRemoved(Arrays.asList(targetFile1, targetFile2))
             .build();
diff --git 
a/xtable-api/src/test/java/org/apache/xtable/model/storage/TestFilesDiff.java 
b/xtable-api/src/test/java/org/apache/xtable/model/storage/TestFilesDiff.java
index 32e33988..1776c345 100644
--- 
a/xtable-api/src/test/java/org/apache/xtable/model/storage/TestFilesDiff.java
+++ 
b/xtable-api/src/test/java/org/apache/xtable/model/storage/TestFilesDiff.java
@@ -50,7 +50,7 @@ public class TestFilesDiff {
     previousFiles.put("file2NoGroup", file2);
     previousFiles.put("file2Group2", file3);
 
-    FilesDiff<InternalDataFile, File> diff =
+    FilesDiff<InternalFile, File> diff =
         FilesDiff.findNewAndRemovedFiles(latestFileGroups, previousFiles);
     assertEquals(2, diff.getFilesAdded().size());
     assertTrue(diff.getFilesAdded().contains(file1Group2));
diff --git 
a/xtable-api/src/test/java/org/apache/xtable/spi/extractor/TestExtractFromSource.java
 
b/xtable-api/src/test/java/org/apache/xtable/spi/extractor/TestExtractFromSource.java
index 5c7f8cfc..9271acf3 100644
--- 
a/xtable-api/src/test/java/org/apache/xtable/spi/extractor/TestExtractFromSource.java
+++ 
b/xtable-api/src/test/java/org/apache/xtable/spi/extractor/TestExtractFromSource.java
@@ -41,8 +41,8 @@ import org.apache.xtable.model.InstantsForIncrementalSync;
 import org.apache.xtable.model.InternalSnapshot;
 import org.apache.xtable.model.InternalTable;
 import org.apache.xtable.model.TableChange;
-import org.apache.xtable.model.storage.DataFilesDiff;
 import org.apache.xtable.model.storage.InternalDataFile;
+import org.apache.xtable.model.storage.InternalFilesDiff;
 import org.apache.xtable.model.storage.PartitionFileGroup;
 
 public class TestExtractFromSource {
@@ -89,7 +89,7 @@ public class TestExtractFromSource {
         TableChange.builder()
             .tableAsOfChange(tableAtFirstInstant)
             .filesDiff(
-                
DataFilesDiff.builder().fileAdded(newFile1).fileRemoved(initialFile2).build())
+                
InternalFilesDiff.builder().fileAdded(newFile1).fileRemoved(initialFile2).build())
             .sourceIdentifier("0")
             .build();
     when(mockConversionSource.getTableChangeForCommit(firstCommitToSync))
@@ -98,7 +98,7 @@ public class TestExtractFromSource {
         TableChange.builder()
             .tableAsOfChange(tableAtFirstInstant)
             .filesDiff(
-                
DataFilesDiff.builder().fileAdded(newFile1).fileRemoved(initialFile2).build())
+                
InternalFilesDiff.builder().fileAdded(newFile1).fileRemoved(initialFile2).build())
             .sourceIdentifier("0")
             .build();
 
@@ -112,7 +112,7 @@ public class TestExtractFromSource {
         TableChange.builder()
             .tableAsOfChange(tableAtSecondInstant)
             .filesDiff(
-                DataFilesDiff.builder()
+                InternalFilesDiff.builder()
                     .filesAdded(Arrays.asList(newFile2, newFile3))
                     .filesRemoved(Arrays.asList(initialFile3, newFile1))
                     .build())
@@ -124,7 +124,7 @@ public class TestExtractFromSource {
         TableChange.builder()
             .tableAsOfChange(tableAtSecondInstant)
             .filesDiff(
-                DataFilesDiff.builder()
+                InternalFilesDiff.builder()
                     .filesAdded(Arrays.asList(newFile2, newFile3))
                     .filesRemoved(Arrays.asList(initialFile3, newFile1))
                     .build())
diff --git 
a/xtable-api/src/test/java/org/apache/xtable/spi/sync/TestTableFormatSync.java 
b/xtable-api/src/test/java/org/apache/xtable/spi/sync/TestTableFormatSync.java
index 852d4e13..e15e3301 100644
--- 
a/xtable-api/src/test/java/org/apache/xtable/spi/sync/TestTableFormatSync.java
+++ 
b/xtable-api/src/test/java/org/apache/xtable/spi/sync/TestTableFormatSync.java
@@ -48,8 +48,8 @@ import org.apache.xtable.model.schema.InternalField;
 import org.apache.xtable.model.schema.InternalPartitionField;
 import org.apache.xtable.model.schema.InternalSchema;
 import org.apache.xtable.model.schema.PartitionTransformType;
-import org.apache.xtable.model.storage.DataFilesDiff;
 import org.apache.xtable.model.storage.InternalDataFile;
+import org.apache.xtable.model.storage.InternalFilesDiff;
 import org.apache.xtable.model.storage.PartitionFileGroup;
 import org.apache.xtable.model.storage.TableFormat;
 import org.apache.xtable.model.sync.SyncMode;
@@ -129,27 +129,27 @@ public class TestTableFormatSync {
   void syncChangesWithFailureForOneFormat() {
     Instant start = Instant.now();
     InternalTable tableState1 = getTableState(1);
-    DataFilesDiff dataFilesDiff1 = getFilesDiff(1);
+    InternalFilesDiff internalFilesDiff1 = getFilesDiff(1);
     TableChange tableChange1 =
         TableChange.builder()
             .tableAsOfChange(tableState1)
-            .filesDiff(dataFilesDiff1)
+            .filesDiff(internalFilesDiff1)
             .sourceIdentifier("0")
             .build();
     InternalTable tableState2 = getTableState(2);
-    DataFilesDiff dataFilesDiff2 = getFilesDiff(2);
+    InternalFilesDiff internalFilesDiff2 = getFilesDiff(2);
     TableChange tableChange2 =
         TableChange.builder()
             .tableAsOfChange(tableState2)
-            .filesDiff(dataFilesDiff2)
+            .filesDiff(internalFilesDiff2)
             .sourceIdentifier("1")
             .build();
     InternalTable tableState3 = getTableState(3);
-    DataFilesDiff dataFilesDiff3 = getFilesDiff(3);
+    InternalFilesDiff internalFilesDiff3 = getFilesDiff(3);
     TableChange tableChange3 =
         TableChange.builder()
             .tableAsOfChange(tableState3)
-            .filesDiff(dataFilesDiff3)
+            .filesDiff(internalFilesDiff3)
             .sourceIdentifier("2")
             .build();
 
@@ -224,25 +224,25 @@ public class TestTableFormatSync {
         tableState1,
         pendingCommitInstants,
         tableChange1.getSourceIdentifier());
-    verify(mockConversionTarget1).syncFilesForDiff(dataFilesDiff1);
+    verify(mockConversionTarget1).syncFilesForDiff(internalFilesDiff1);
     verifyBaseConversionTargetCalls(
         mockConversionTarget2,
         tableState1,
         pendingCommitInstants,
         tableChange1.getSourceIdentifier());
-    verify(mockConversionTarget2).syncFilesForDiff(dataFilesDiff1);
+    verify(mockConversionTarget2).syncFilesForDiff(internalFilesDiff1);
     verifyBaseConversionTargetCalls(
         mockConversionTarget2,
         tableState2,
         pendingCommitInstants,
         tableChange2.getSourceIdentifier());
-    verify(mockConversionTarget2).syncFilesForDiff(dataFilesDiff2);
+    verify(mockConversionTarget2).syncFilesForDiff(internalFilesDiff2);
     verifyBaseConversionTargetCalls(
         mockConversionTarget2,
         tableState3,
         pendingCommitInstants,
         tableChange3.getSourceIdentifier());
-    verify(mockConversionTarget2).syncFilesForDiff(dataFilesDiff3);
+    verify(mockConversionTarget2).syncFilesForDiff(internalFilesDiff3);
     verify(mockConversionTarget1, times(1)).completeSync();
     verify(mockConversionTarget2, times(3)).completeSync();
   }
@@ -251,27 +251,27 @@ public class TestTableFormatSync {
   void syncChangesWithDifferentFormatsAndMetadata() {
     Instant start = Instant.now();
     InternalTable tableState1 = getTableState(1);
-    DataFilesDiff dataFilesDiff1 = getFilesDiff(1);
+    InternalFilesDiff internalFilesDiff1 = getFilesDiff(1);
     TableChange tableChange1 =
         TableChange.builder()
             .tableAsOfChange(tableState1)
-            .filesDiff(dataFilesDiff1)
+            .filesDiff(internalFilesDiff1)
             .sourceIdentifier("0")
             .build();
     InternalTable tableState2 = getTableState(2);
-    DataFilesDiff dataFilesDiff2 = getFilesDiff(2);
+    InternalFilesDiff internalFilesDiff2 = getFilesDiff(2);
     TableChange tableChange2 =
         TableChange.builder()
             .tableAsOfChange(tableState2)
-            .filesDiff(dataFilesDiff2)
+            .filesDiff(internalFilesDiff2)
             .sourceIdentifier("1")
             .build();
     InternalTable tableState3 = getTableState(3);
-    DataFilesDiff dataFilesDiff3 = getFilesDiff(3);
+    InternalFilesDiff internalFilesDiff3 = getFilesDiff(3);
     TableChange tableChange3 =
         TableChange.builder()
             .tableAsOfChange(tableState3)
-            .filesDiff(dataFilesDiff3)
+            .filesDiff(internalFilesDiff3)
             .sourceIdentifier("2")
             .build();
 
@@ -346,13 +346,13 @@ public class TestTableFormatSync {
         tableState1,
         pendingCommitInstants,
         tableChange1.getSourceIdentifier());
-    verify(mockConversionTarget1).syncFilesForDiff(dataFilesDiff1);
+    verify(mockConversionTarget1).syncFilesForDiff(internalFilesDiff1);
     verifyBaseConversionTargetCalls(
         mockConversionTarget1,
         tableState3,
         pendingCommitInstants,
         tableChange3.getSourceIdentifier());
-    verify(mockConversionTarget1).syncFilesForDiff(dataFilesDiff3);
+    verify(mockConversionTarget1).syncFilesForDiff(internalFilesDiff3);
     verify(mockConversionTarget1, times(2)).completeSync();
     // conversionTarget2 syncs table changes 2 and 3
     verifyBaseConversionTargetCalls(
@@ -360,13 +360,13 @@ public class TestTableFormatSync {
         tableState2,
         pendingCommitInstants,
         tableChange2.getSourceIdentifier());
-    verify(mockConversionTarget2).syncFilesForDiff(dataFilesDiff2);
+    verify(mockConversionTarget2).syncFilesForDiff(internalFilesDiff2);
     verifyBaseConversionTargetCalls(
         mockConversionTarget2,
         tableState3,
         pendingCommitInstants,
         tableChange3.getSourceIdentifier());
-    verify(mockConversionTarget2).syncFilesForDiff(dataFilesDiff3);
+    verify(mockConversionTarget2).syncFilesForDiff(internalFilesDiff3);
     verify(mockConversionTarget2, times(2)).completeSync();
   }
 
@@ -374,11 +374,11 @@ public class TestTableFormatSync {
   void syncChangesOneFormatWithNoRequiredChanges() {
     Instant start = Instant.now();
     InternalTable tableState1 = getTableState(1);
-    DataFilesDiff dataFilesDiff1 = getFilesDiff(1);
+    InternalFilesDiff internalFilesDiff1 = getFilesDiff(1);
     TableChange tableChange1 =
         TableChange.builder()
             .tableAsOfChange(tableState1)
-            .filesDiff(dataFilesDiff1)
+            .filesDiff(internalFilesDiff1)
             .sourceIdentifier("0")
             .build();
 
@@ -426,7 +426,7 @@ public class TestTableFormatSync {
         tableState1,
         pendingCommitInstants,
         tableChange1.getSourceIdentifier());
-    verify(mockConversionTarget2).syncFilesForDiff(dataFilesDiff1);
+    verify(mockConversionTarget2).syncFilesForDiff(internalFilesDiff1);
   }
 
   /**
@@ -453,8 +453,8 @@ public class TestTableFormatSync {
         .build();
   }
 
-  private DataFilesDiff getFilesDiff(int id) {
-    return DataFilesDiff.builder()
+  private InternalFilesDiff getFilesDiff(int id) {
+    return InternalFilesDiff.builder()
         .filesAdded(
             Collections.singletonList(
                 InternalDataFile.builder()
diff --git 
a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java 
b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java
index de4a2b69..97804d5f 100644
--- 
a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java
+++ 
b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java
@@ -51,9 +51,9 @@ import org.apache.xtable.model.InternalSnapshot;
 import org.apache.xtable.model.InternalTable;
 import org.apache.xtable.model.TableChange;
 import org.apache.xtable.model.schema.InternalSchema;
-import org.apache.xtable.model.storage.DataFilesDiff;
 import org.apache.xtable.model.storage.FileFormat;
 import org.apache.xtable.model.storage.InternalDataFile;
+import org.apache.xtable.model.storage.InternalFilesDiff;
 import org.apache.xtable.model.storage.PartitionFileGroup;
 import org.apache.xtable.spi.extractor.ConversionSource;
 import org.apache.xtable.spi.extractor.DataFileIterator;
@@ -163,14 +163,14 @@ public class DeltaConversionSource implements 
ConversionSource<Long> {
       }
     }
 
-    DataFilesDiff dataFilesDiff =
-        DataFilesDiff.builder()
+    InternalFilesDiff internalFilesDiff =
+        InternalFilesDiff.builder()
             .filesAdded(addedFiles.values())
             .filesRemoved(removedFiles.values())
             .build();
     return TableChange.builder()
         .tableAsOfChange(tableAtVersion)
-        .filesDiff(dataFilesDiff)
+        .filesDiff(internalFilesDiff)
         .sourceIdentifier(getCommitIdentifier(versionNumber))
         .build();
   }
diff --git 
a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionTarget.java 
b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionTarget.java
index fbe99801..b487e2cb 100644
--- 
a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionTarget.java
+++ 
b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionTarget.java
@@ -65,7 +65,7 @@ import org.apache.xtable.model.InternalTable;
 import org.apache.xtable.model.metadata.TableSyncMetadata;
 import org.apache.xtable.model.schema.InternalPartitionField;
 import org.apache.xtable.model.schema.InternalSchema;
-import org.apache.xtable.model.storage.DataFilesDiff;
+import org.apache.xtable.model.storage.InternalFilesDiff;
 import org.apache.xtable.model.storage.PartitionFileGroup;
 import org.apache.xtable.model.storage.TableFormat;
 import org.apache.xtable.schema.SparkSchemaExtractor;
@@ -193,10 +193,10 @@ public class DeltaConversionTarget implements 
ConversionTarget {
   }
 
   @Override
-  public void syncFilesForDiff(DataFilesDiff dataFilesDiff) {
+  public void syncFilesForDiff(InternalFilesDiff internalFilesDiff) {
     transactionState.setActions(
         dataFileUpdatesExtractor.applyDiff(
-            dataFilesDiff,
+            internalFilesDiff,
             transactionState.getLatestSchemaInternal(),
             deltaLog.dataPath().toString()));
   }
diff --git 
a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaDataFileUpdatesExtractor.java
 
b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaDataFileUpdatesExtractor.java
index 96776750..caee22f6 100644
--- 
a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaDataFileUpdatesExtractor.java
+++ 
b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaDataFileUpdatesExtractor.java
@@ -39,9 +39,10 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.xtable.collectors.CustomCollectors;
 import org.apache.xtable.model.schema.InternalSchema;
 import org.apache.xtable.model.stat.ColumnStat;
-import org.apache.xtable.model.storage.DataFilesDiff;
 import org.apache.xtable.model.storage.FilesDiff;
 import org.apache.xtable.model.storage.InternalDataFile;
+import org.apache.xtable.model.storage.InternalFile;
+import org.apache.xtable.model.storage.InternalFilesDiff;
 import org.apache.xtable.model.storage.PartitionFileGroup;
 import org.apache.xtable.paths.PathUtils;
 
@@ -74,30 +75,32 @@ public class DeltaDataFileUpdatesExtractor {
                     file -> DeltaActionsConverter.getFullPathToFile(snapshot, 
file.path()),
                     file -> file));
 
-    FilesDiff<InternalDataFile, Action> diff =
-        DataFilesDiff.findNewAndRemovedFiles(partitionedDataFiles, 
previousFiles);
+    FilesDiff<InternalFile, Action> diff =
+        InternalFilesDiff.findNewAndRemovedFiles(partitionedDataFiles, 
previousFiles);
 
     return applyDiff(
         diff.getFilesAdded(), diff.getFilesRemoved(), tableSchema, 
deltaLog.dataPath().toString());
   }
 
   public Seq<Action> applyDiff(
-      DataFilesDiff dataFilesDiff, InternalSchema tableSchema, String 
tableBasePath) {
+      InternalFilesDiff internalFilesDiff, InternalSchema tableSchema, String 
tableBasePath) {
     List<Action> removeActions =
-        dataFilesDiff.getFilesRemoved().stream()
+        internalFilesDiff.dataFilesRemoved().stream()
             .flatMap(dFile -> createAddFileAction(dFile, tableSchema, 
tableBasePath))
             .map(AddFile::remove)
-            
.collect(CustomCollectors.toList(dataFilesDiff.getFilesRemoved().size()));
-    return applyDiff(dataFilesDiff.getFilesAdded(), removeActions, 
tableSchema, tableBasePath);
+            
.collect(CustomCollectors.toList(internalFilesDiff.dataFilesRemoved().size()));
+    return applyDiff(internalFilesDiff.dataFilesAdded(), removeActions, 
tableSchema, tableBasePath);
   }
 
   private Seq<Action> applyDiff(
-      Set<InternalDataFile> filesAdded,
+      Set<? extends InternalFile> filesAdded,
       Collection<Action> removeFileActions,
       InternalSchema tableSchema,
       String tableBasePath) {
     Stream<Action> addActions =
         filesAdded.stream()
+            .filter(InternalDataFile.class::isInstance)
+            .map(file -> (InternalDataFile) file)
             .flatMap(dFile -> createAddFileAction(dFile, tableSchema, 
tableBasePath));
     int totalActions = filesAdded.size() + removeFileActions.size();
     List<Action> allActions =
diff --git a/xtable-core/src/main/java/org/apache/xtable/hudi/BaseFileUpdatesExtractor.java b/xtable-core/src/main/java/org/apache/xtable/hudi/BaseFileUpdatesExtractor.java
index bef00013..ba1f9f9d 100644
--- a/xtable-core/src/main/java/org/apache/xtable/hudi/BaseFileUpdatesExtractor.java
+++ b/xtable-core/src/main/java/org/apache/xtable/hudi/BaseFileUpdatesExtractor.java
@@ -54,8 +54,8 @@ import org.apache.hudi.metadata.HoodieMetadataFileSystemView;
 import org.apache.xtable.collectors.CustomCollectors;
 import org.apache.xtable.model.schema.InternalType;
 import org.apache.xtable.model.stat.ColumnStat;
-import org.apache.xtable.model.storage.DataFilesDiff;
 import org.apache.xtable.model.storage.InternalDataFile;
+import org.apache.xtable.model.storage.InternalFilesDiff;
 import org.apache.xtable.model.storage.PartitionFileGroup;
 
 @AllArgsConstructor(staticName = "of")
@@ -96,7 +96,7 @@ public class BaseFileUpdatesExtractor {
         partitionedDataFiles.stream()
             .map(
                 partitionFileGroup -> {
-                  List<InternalDataFile> dataFiles = 
partitionFileGroup.getFiles();
+                  List<InternalDataFile> dataFiles = 
partitionFileGroup.getDataFiles();
                   String partitionPath = getPartitionPath(tableBasePath, 
dataFiles);
                   // remove the partition from the set of partitions to drop 
since it is present in
                   // the snapshot
@@ -163,16 +163,17 @@ public class BaseFileUpdatesExtractor {
   }
 
   /**
-   * Converts the provided {@link DataFilesDiff}.
+   * Converts the provided {@link InternalFilesDiff}.
    *
-   * @param dataFilesDiff the diff to apply to the Hudi table
+   * @param internalFilesDiff the diff to apply to the Hudi table
    * @param commit The current commit started by the Hudi client
    * @return The information needed to create a "replace" commit for the Hudi 
table
    */
-  ReplaceMetadata convertDiff(@NonNull DataFilesDiff dataFilesDiff, @NonNull 
String commit) {
+  ReplaceMetadata convertDiff(
+      @NonNull InternalFilesDiff internalFilesDiff, @NonNull String commit) {
     // For all removed files, group by partition and extract the file id
     Map<String, List<String>> partitionToReplacedFileIds =
-        dataFilesDiff.getFilesRemoved().stream()
+        internalFilesDiff.dataFilesRemoved().stream()
             .map(file -> new CachingPath(file.getPhysicalPath()))
             .collect(
                 Collectors.groupingBy(
@@ -180,9 +181,9 @@ public class BaseFileUpdatesExtractor {
                     Collectors.mapping(this::getFileId, Collectors.toList())));
     // For all added files, group by partition and extract the file id
     List<WriteStatus> writeStatuses =
-        dataFilesDiff.getFilesAdded().stream()
+        internalFilesDiff.dataFilesAdded().stream()
             .map(file -> toWriteStatus(tableBasePath, commit, file, 
Optional.empty()))
-            
.collect(CustomCollectors.toList(dataFilesDiff.getFilesAdded().size()));
+            
.collect(CustomCollectors.toList(internalFilesDiff.dataFilesAdded().size()));
     return ReplaceMetadata.of(partitionToReplacedFileIds, writeStatuses);
   }
 
diff --git a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionTarget.java b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionTarget.java
index 97c2f9e3..b8aad22d 100644
--- a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionTarget.java
+++ b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionTarget.java
@@ -87,7 +87,7 @@ import org.apache.xtable.model.metadata.TableSyncMetadata;
 import org.apache.xtable.model.schema.InternalField;
 import org.apache.xtable.model.schema.InternalPartitionField;
 import org.apache.xtable.model.schema.InternalSchema;
-import org.apache.xtable.model.storage.DataFilesDiff;
+import org.apache.xtable.model.storage.InternalFilesDiff;
 import org.apache.xtable.model.storage.PartitionFileGroup;
 import org.apache.xtable.model.storage.TableFormat;
 import org.apache.xtable.spi.sync.ConversionTarget;
@@ -251,9 +251,9 @@ public class HudiConversionTarget implements ConversionTarget {
   }
 
   @Override
-  public void syncFilesForDiff(DataFilesDiff dataFilesDiff) {
+  public void syncFilesForDiff(InternalFilesDiff internalFilesDiff) {
     BaseFileUpdatesExtractor.ReplaceMetadata replaceMetadata =
-        baseFileUpdatesExtractor.convertDiff(dataFilesDiff, 
commitState.getInstantTime());
+        baseFileUpdatesExtractor.convertDiff(internalFilesDiff, 
commitState.getInstantTime());
     commitState.setReplaceMetadata(replaceMetadata);
   }
 
diff --git a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiDataFileExtractor.java b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiDataFileExtractor.java
index 5739328c..77c0ca98 100644
--- a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiDataFileExtractor.java
+++ b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiDataFileExtractor.java
@@ -65,9 +65,9 @@ import org.apache.xtable.model.InternalTable;
 import org.apache.xtable.model.exception.ParseException;
 import org.apache.xtable.model.schema.InternalPartitionField;
 import org.apache.xtable.model.stat.PartitionValue;
-import org.apache.xtable.model.storage.DataFilesDiff;
 import org.apache.xtable.model.storage.FileFormat;
 import org.apache.xtable.model.storage.InternalDataFile;
+import org.apache.xtable.model.storage.InternalFilesDiff;
 import org.apache.xtable.model.storage.PartitionFileGroup;
 
 /** Extracts all the files for Hudi table represented by {@link 
InternalTable}. */
@@ -122,7 +122,7 @@ public class HudiDataFileExtractor implements AutoCloseable {
     }
   }
 
-  public DataFilesDiff getDiffForCommit(
+  public InternalFilesDiff getDiffForCommit(
       HoodieInstant hoodieInstantForDiff,
       InternalTable table,
       HoodieInstant instant,
@@ -139,7 +139,7 @@ public class HudiDataFileExtractor implements AutoCloseable {
             .collect(Collectors.toList());
     List<InternalDataFile> filesRemoved = allInfo.getRemoved();
 
-    return 
DataFilesDiff.builder().filesAdded(filesAdded).filesRemoved(filesRemoved).build();
+    return 
InternalFilesDiff.builder().filesAdded(filesAdded).filesRemoved(filesRemoved).build();
   }
 
   private AddedAndRemovedFiles getAddedAndRemovedPartitionInfo(
diff --git a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java
index cb75b6d0..5f210f1f 100644
--- a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java
+++ b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java
@@ -58,9 +58,9 @@ import org.apache.xtable.model.TableChange;
 import org.apache.xtable.model.schema.InternalPartitionField;
 import org.apache.xtable.model.schema.InternalSchema;
 import org.apache.xtable.model.stat.PartitionValue;
-import org.apache.xtable.model.storage.DataFilesDiff;
 import org.apache.xtable.model.storage.DataLayoutStrategy;
 import org.apache.xtable.model.storage.InternalDataFile;
+import org.apache.xtable.model.storage.InternalFilesDiff;
 import org.apache.xtable.model.storage.PartitionFileGroup;
 import org.apache.xtable.model.storage.TableFormat;
 import org.apache.xtable.spi.extractor.ConversionSource;
@@ -193,8 +193,11 @@ public class IcebergConversionSource implements ConversionSource<Snapshot> {
             .map(dataFile -> fromIceberg(dataFile, partitionSpec, irTable))
             .collect(Collectors.toSet());
 
-    DataFilesDiff filesDiff =
-        
DataFilesDiff.builder().filesAdded(dataFilesAdded).filesRemoved(dataFilesRemoved).build();
+    InternalFilesDiff filesDiff =
+        InternalFilesDiff.builder()
+            .filesAdded(dataFilesAdded)
+            .filesRemoved(dataFilesRemoved)
+            .build();
 
     InternalTable table = getTable(snapshot);
     return TableChange.builder()
diff --git a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java
index 69faf1ea..a57ac4f6 100644
--- a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java
+++ b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java
@@ -45,7 +45,7 @@ import org.apache.xtable.model.InternalTable;
 import org.apache.xtable.model.metadata.TableSyncMetadata;
 import org.apache.xtable.model.schema.InternalPartitionField;
 import org.apache.xtable.model.schema.InternalSchema;
-import org.apache.xtable.model.storage.DataFilesDiff;
+import org.apache.xtable.model.storage.InternalFilesDiff;
 import org.apache.xtable.model.storage.PartitionFileGroup;
 import org.apache.xtable.model.storage.TableFormat;
 import org.apache.xtable.spi.sync.ConversionTarget;
@@ -209,10 +209,10 @@ public class IcebergConversionTarget implements ConversionTarget {
   }
 
   @Override
-  public void syncFilesForDiff(DataFilesDiff dataFilesDiff) {
+  public void syncFilesForDiff(InternalFilesDiff internalFilesDiff) {
     dataFileUpdatesExtractor.applyDiff(
         transaction,
-        dataFilesDiff,
+        internalFilesDiff,
         transaction.table().schema(),
         transaction.table().spec(),
         tableSyncMetadata);
diff --git a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergDataFileUpdatesSync.java b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergDataFileUpdatesSync.java
index 0c8fa9a4..77a4eeed 100644
--- a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergDataFileUpdatesSync.java
+++ b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergDataFileUpdatesSync.java
@@ -31,9 +31,10 @@ import org.apache.xtable.exception.NotSupportedException;
 import org.apache.xtable.exception.ReadException;
 import org.apache.xtable.model.InternalTable;
 import org.apache.xtable.model.metadata.TableSyncMetadata;
-import org.apache.xtable.model.storage.DataFilesDiff;
 import org.apache.xtable.model.storage.FilesDiff;
 import org.apache.xtable.model.storage.InternalDataFile;
+import org.apache.xtable.model.storage.InternalFile;
+import org.apache.xtable.model.storage.InternalFilesDiff;
 import org.apache.xtable.model.storage.PartitionFileGroup;
 
 @AllArgsConstructor(staticName = "of")
@@ -59,8 +60,8 @@ public class IcebergDataFileUpdatesSync {
       throw new ReadException("Failed to iterate through Iceberg data files", 
e);
     }
 
-    FilesDiff<InternalDataFile, DataFile> diff =
-        DataFilesDiff.findNewAndRemovedFiles(partitionedDataFiles, 
previousFiles);
+    FilesDiff<InternalFile, DataFile> diff =
+        InternalFilesDiff.findNewAndRemovedFiles(partitionedDataFiles, 
previousFiles);
 
     applyDiff(
         transaction, diff.getFilesAdded(), diff.getFilesRemoved(), schema, 
partitionSpec, metadata);
@@ -68,29 +69,37 @@ public class IcebergDataFileUpdatesSync {
 
   public void applyDiff(
       Transaction transaction,
-      DataFilesDiff dataFilesDiff,
+      InternalFilesDiff internalFilesDiff,
       Schema schema,
       PartitionSpec partitionSpec,
       TableSyncMetadata metadata) {
 
     Collection<DataFile> filesRemoved =
-        dataFilesDiff.getFilesRemoved().stream()
+        internalFilesDiff.dataFilesRemoved().stream()
             .map(file -> getDataFile(partitionSpec, schema, file))
             .collect(Collectors.toList());
 
     applyDiff(
-        transaction, dataFilesDiff.getFilesAdded(), filesRemoved, schema, 
partitionSpec, metadata);
+        transaction,
+        internalFilesDiff.dataFilesAdded(),
+        filesRemoved,
+        schema,
+        partitionSpec,
+        metadata);
   }
 
   private void applyDiff(
       Transaction transaction,
-      Collection<InternalDataFile> filesAdded,
+      Collection<? extends InternalFile> filesAdded,
       Collection<DataFile> filesRemoved,
       Schema schema,
       PartitionSpec partitionSpec,
       TableSyncMetadata metadata) {
     OverwriteFiles overwriteFiles = transaction.newOverwrite();
-    filesAdded.forEach(f -> overwriteFiles.addFile(getDataFile(partitionSpec, 
schema, f)));
+    filesAdded.stream()
+        .filter(InternalDataFile.class::isInstance)
+        .map(file -> (InternalDataFile) file)
+        .forEach(f -> overwriteFiles.addFile(getDataFile(partitionSpec, 
schema, f)));
     filesRemoved.forEach(overwriteFiles::deleteFile);
     overwriteFiles.set(TableSyncMetadata.XTABLE_METADATA, metadata.toJson());
     overwriteFiles.commit();
diff --git a/xtable-core/src/test/java/org/apache/xtable/ValidationTestHelper.java b/xtable-core/src/test/java/org/apache/xtable/ValidationTestHelper.java
index a20ae4e7..9e95f279 100644
--- a/xtable-core/src/test/java/org/apache/xtable/ValidationTestHelper.java
+++ b/xtable-core/src/test/java/org/apache/xtable/ValidationTestHelper.java
@@ -39,7 +39,7 @@ public class ValidationTestHelper {
     assertNotNull(internalSnapshot.getTable());
     List<String> filePaths =
         internalSnapshot.getPartitionedDataFiles().stream()
-            .flatMap(group -> group.getFiles().stream())
+            .flatMap(group -> group.getDataFiles().stream())
             .map(InternalDataFile::getPhysicalPath)
             .collect(Collectors.toList());
     replaceFileScheme(allActivePaths);
@@ -83,14 +83,14 @@ public class ValidationTestHelper {
         filesForCommitBefore.stream()
             .filter(file -> !filesForCommitAfter.contains(file))
             .collect(Collectors.toSet());
-    assertEquals(filesAdded, 
extractPathsFromDataFile(tableChange.getFilesDiff().getFilesAdded()));
+    assertEquals(filesAdded, 
extractPathsFromDataFile(tableChange.getFilesDiff().dataFilesAdded()));
     assertEquals(
-        filesRemoved, 
extractPathsFromDataFile(tableChange.getFilesDiff().getFilesRemoved()));
+        filesRemoved, 
extractPathsFromDataFile(tableChange.getFilesDiff().dataFilesRemoved()));
   }
 
   public static List<String> getAllFilePaths(InternalSnapshot 
internalSnapshot) {
     return internalSnapshot.getPartitionedDataFiles().stream()
-        .flatMap(fileGroup -> fileGroup.getFiles().stream())
+        .flatMap(fileGroup -> fileGroup.getDataFiles().stream())
         .map(InternalDataFile::getPhysicalPath)
         .collect(Collectors.toList());
   }
diff --git a/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaConversionSource.java b/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaConversionSource.java
index 3b1cc529..4afb6bdb 100644
--- a/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaConversionSource.java
+++ b/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaConversionSource.java
@@ -704,7 +704,7 @@ public class ITDeltaConversionSource {
       throws URISyntaxException {
     assertEquals(
         expectedPartitionFiles.getPartitionValues(), 
actualPartitionFiles.getPartitionValues());
-    validateDataFiles(expectedPartitionFiles.getFiles(), 
actualPartitionFiles.getFiles());
+    validateDataFiles(expectedPartitionFiles.getDataFiles(), 
actualPartitionFiles.getDataFiles());
   }
 
   private void validateDataFiles(
diff --git a/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionTarget.java b/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionTarget.java
index 82125d25..99965f1f 100644
--- a/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionTarget.java
+++ b/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionTarget.java
@@ -84,10 +84,10 @@ import org.apache.xtable.model.schema.PartitionTransformType;
 import org.apache.xtable.model.stat.ColumnStat;
 import org.apache.xtable.model.stat.PartitionValue;
 import org.apache.xtable.model.stat.Range;
-import org.apache.xtable.model.storage.DataFilesDiff;
 import org.apache.xtable.model.storage.DataLayoutStrategy;
 import org.apache.xtable.model.storage.FileFormat;
 import org.apache.xtable.model.storage.InternalDataFile;
+import org.apache.xtable.model.storage.InternalFilesDiff;
 import org.apache.xtable.model.storage.PartitionFileGroup;
 import org.apache.xtable.model.storage.TableFormat;
 import org.apache.xtable.spi.sync.ConversionTarget;
@@ -195,8 +195,8 @@ public class ITHudiConversionTarget {
     String fileName = "file_1.parquet";
     String filePath = getFilePath(partitionPath, fileName);
 
-    DataFilesDiff dataFilesDiff =
-        DataFilesDiff.builder()
+    InternalFilesDiff internalFilesDiff =
+        InternalFilesDiff.builder()
             .fileAdded(getTestFile(partitionPath, fileName))
             .fileRemoved(fileToRemove)
             .build();
@@ -204,7 +204,7 @@ public class ITHudiConversionTarget {
     HudiConversionTarget targetClient = getTargetClient();
     InternalTable initialState = getState(Instant.now());
     targetClient.beginSync(initialState);
-    targetClient.syncFilesForDiff(dataFilesDiff);
+    targetClient.syncFilesForDiff(internalFilesDiff);
     targetClient.syncSchema(SCHEMA);
     TableSyncMetadata latestState =
         TableSyncMetadata.of(
@@ -518,11 +518,11 @@ public class ITHudiConversionTarget {
       List<InternalDataFile> filesToRemove,
       Instant commitStart,
       String sourceIdentifier) {
-    DataFilesDiff dataFilesDiff2 =
-        
DataFilesDiff.builder().filesAdded(filesToAdd).filesRemoved(filesToRemove).build();
+    InternalFilesDiff internalFilesDiff2 =
+        
InternalFilesDiff.builder().filesAdded(filesToAdd).filesRemoved(filesToRemove).build();
     InternalTable state3 = getState(commitStart);
     conversionTarget.beginSync(state3);
-    conversionTarget.syncFilesForDiff(dataFilesDiff2);
+    conversionTarget.syncFilesForDiff(internalFilesDiff2);
     TableSyncMetadata latestState =
         TableSyncMetadata.of(
             state3.getLatestCommitTime(), Collections.emptyList(), "TEST", 
sourceIdentifier);
diff --git a/xtable-core/src/test/java/org/apache/xtable/hudi/TestBaseFileUpdatesExtractor.java b/xtable-core/src/test/java/org/apache/xtable/hudi/TestBaseFileUpdatesExtractor.java
index 8f3b3f7e..4958714c 100644
--- a/xtable-core/src/test/java/org/apache/xtable/hudi/TestBaseFileUpdatesExtractor.java
+++ b/xtable-core/src/test/java/org/apache/xtable/hudi/TestBaseFileUpdatesExtractor.java
@@ -59,9 +59,9 @@ import org.apache.xtable.model.schema.PartitionTransformType;
 import org.apache.xtable.model.stat.ColumnStat;
 import org.apache.xtable.model.stat.PartitionValue;
 import org.apache.xtable.model.stat.Range;
-import org.apache.xtable.model.storage.DataFilesDiff;
 import org.apache.xtable.model.storage.FileFormat;
 import org.apache.xtable.model.storage.InternalDataFile;
+import org.apache.xtable.model.storage.InternalFilesDiff;
 import org.apache.xtable.model.storage.PartitionFileGroup;
 import org.apache.xtable.testutil.ColumnStatMapUtil;
 
@@ -119,8 +119,8 @@ public class TestBaseFileUpdatesExtractor {
             String.format("%s/%s/%s", tableBasePath, partitionPath2, 
fileName5),
             Collections.emptyList());
 
-    DataFilesDiff diff =
-        DataFilesDiff.builder()
+    InternalFilesDiff diff =
+        InternalFilesDiff.builder()
             .filesAdded(Arrays.asList(addedFile1, addedFile2))
             .filesRemoved(Arrays.asList(removedFile1, removedFile2, 
removedFile3))
             .build();
diff --git a/xtable-core/src/test/java/org/apache/xtable/hudi/TestHudiConversionTarget.java b/xtable-core/src/test/java/org/apache/xtable/hudi/TestHudiConversionTarget.java
index da1ec033..d165053f 100644
--- a/xtable-core/src/test/java/org/apache/xtable/hudi/TestHudiConversionTarget.java
+++ b/xtable-core/src/test/java/org/apache/xtable/hudi/TestHudiConversionTarget.java
@@ -49,7 +49,7 @@ import org.apache.xtable.model.schema.InternalPartitionField;
 import org.apache.xtable.model.schema.InternalSchema;
 import org.apache.xtable.model.schema.InternalType;
 import org.apache.xtable.model.schema.PartitionTransformType;
-import org.apache.xtable.model.storage.DataFilesDiff;
+import org.apache.xtable.model.storage.InternalFilesDiff;
 import org.apache.xtable.model.storage.PartitionFileGroup;
 
 /**
@@ -212,7 +212,7 @@ public class TestHudiConversionTarget {
     HudiConversionTarget.CommitState mockCommitState =
         initMocksForBeginSync(targetClient).getLeft();
     String instant = "commit";
-    DataFilesDiff input = DataFilesDiff.builder().build();
+    InternalFilesDiff input = InternalFilesDiff.builder().build();
     BaseFileUpdatesExtractor.ReplaceMetadata output =
         BaseFileUpdatesExtractor.ReplaceMetadata.of(
             Collections.emptyMap(), Collections.emptyList());
diff --git a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergConversionSource.java b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergConversionSource.java
index 4006b543..ab13ae2d 100644
--- a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergConversionSource.java
+++ b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergConversionSource.java
@@ -148,7 +148,7 @@ class TestIcebergConversionSource {
     List<PartitionFileGroup> dataFileChunks = 
internalSnapshot.getPartitionedDataFiles();
     assertEquals(5, dataFileChunks.size());
     for (PartitionFileGroup dataFilesChunk : dataFileChunks) {
-      List<InternalDataFile> internalDataFiles = dataFilesChunk.getFiles();
+      List<InternalDataFile> internalDataFiles = dataFilesChunk.getDataFiles();
       assertEquals(1, internalDataFiles.size());
       InternalDataFile internalDataFile = internalDataFiles.get(0);
       assertEquals(FileFormat.APACHE_PARQUET, 
internalDataFile.getFileFormat());
@@ -315,7 +315,7 @@ class TestIcebergConversionSource {
     TableChange tableChange = 
conversionSource.getTableChangeForCommit(snapshot);
     assertEquals(addedFiles, 
tableChange.getFilesDiff().getFilesAdded().size());
     assertTrue(
-        tableChange.getFilesDiff().getFilesAdded().stream()
+        tableChange.getFilesDiff().dataFilesAdded().stream()
             .allMatch(file -> file.getColumnStats().size() == 
numberOfColumns));
     assertEquals(removedFiles, 
tableChange.getFilesDiff().getFilesRemoved().size());
   }

Reply via email to