This is an automated email from the ASF dual-hosted git repository.
ashvin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-xtable.git
The following commit(s) were added to refs/heads/main by this push:
new c76692b8 Add base representation of storage files
c76692b8 is described below
commit c76692b85a646adb449a263652747c1c87de37b8
Author: Ashvin Agrawal <[email protected]>
AuthorDate: Sun Feb 16 12:48:22 2025 -0800
Add base representation of storage files
---
.../java/org/apache/xtable/model/TableChange.java | 4 +-
.../org/apache/xtable/model/storage/FilesDiff.java | 6 +--
.../xtable/model/storage/InternalDataFile.java | 19 ++++----
.../apache/xtable/model/storage/InternalFile.java | 56 ++++++++++++++++++++++
.../{DataFilesDiff.java => InternalFilesDiff.java} | 38 +++++++++++++--
.../xtable/model/storage/PartitionFileGroup.java | 12 ++++-
.../apache/xtable/spi/sync/ConversionTarget.java | 6 +--
.../xtable/model/storage/TestDataFilesDiff.java | 8 ++--
.../apache/xtable/model/storage/TestFilesDiff.java | 2 +-
.../spi/extractor/TestExtractFromSource.java | 10 ++--
.../xtable/spi/sync/TestTableFormatSync.java | 52 ++++++++++----------
.../apache/xtable/delta/DeltaConversionSource.java | 8 ++--
.../apache/xtable/delta/DeltaConversionTarget.java | 6 +--
.../delta/DeltaDataFileUpdatesExtractor.java | 19 ++++----
.../xtable/hudi/BaseFileUpdatesExtractor.java | 17 +++----
.../apache/xtable/hudi/HudiConversionTarget.java | 6 +--
.../apache/xtable/hudi/HudiDataFileExtractor.java | 6 +--
.../xtable/iceberg/IcebergConversionSource.java | 9 ++--
.../xtable/iceberg/IcebergConversionTarget.java | 6 +--
.../xtable/iceberg/IcebergDataFileUpdatesSync.java | 25 ++++++----
.../org/apache/xtable/ValidationTestHelper.java | 8 ++--
.../xtable/delta/ITDeltaConversionSource.java | 2 +-
.../apache/xtable/hudi/ITHudiConversionTarget.java | 14 +++---
.../xtable/hudi/TestBaseFileUpdatesExtractor.java | 6 +--
.../xtable/hudi/TestHudiConversionTarget.java | 4 +-
.../iceberg/TestIcebergConversionSource.java | 4 +-
26 files changed, 232 insertions(+), 121 deletions(-)
diff --git a/xtable-api/src/main/java/org/apache/xtable/model/TableChange.java
b/xtable-api/src/main/java/org/apache/xtable/model/TableChange.java
index fe3907ee..b425fd01 100644
--- a/xtable-api/src/main/java/org/apache/xtable/model/TableChange.java
+++ b/xtable-api/src/main/java/org/apache/xtable/model/TableChange.java
@@ -22,7 +22,7 @@ import lombok.Builder;
import lombok.NonNull;
import lombok.Value;
-import org.apache.xtable.model.storage.DataFilesDiff;
+import org.apache.xtable.model.storage.InternalFilesDiff;
/**
* Captures the changes in a single commit/instant from the source table.
@@ -33,7 +33,7 @@ import org.apache.xtable.model.storage.DataFilesDiff;
@Builder(toBuilder = true)
public class TableChange {
// Change in files at the specified instant
- DataFilesDiff filesDiff;
+ InternalFilesDiff filesDiff;
/** The {@link InternalTable} at the commit time to which this table change
belongs. */
InternalTable tableAsOfChange;
diff --git
a/xtable-api/src/main/java/org/apache/xtable/model/storage/FilesDiff.java
b/xtable-api/src/main/java/org/apache/xtable/model/storage/FilesDiff.java
index 7d628ce4..f86e64bc 100644
--- a/xtable-api/src/main/java/org/apache/xtable/model/storage/FilesDiff.java
+++ b/xtable-api/src/main/java/org/apache/xtable/model/storage/FilesDiff.java
@@ -92,12 +92,12 @@ public class FilesDiff<L, P> {
* @param <P> the type of the previous files
* @return the set of files that are added
*/
- public static <P> FilesDiff<InternalDataFile, P> findNewAndRemovedFiles(
+ public static <P> FilesDiff<InternalFile, P> findNewAndRemovedFiles(
List<PartitionFileGroup> latestFileGroups, Map<String, P> previousFiles)
{
- Map<String, InternalDataFile> latestFiles =
+ Map<String, InternalFile> latestFiles =
latestFileGroups.stream()
.flatMap(group -> group.getFiles().stream())
- .collect(Collectors.toMap(InternalDataFile::getPhysicalPath,
Function.identity()));
+ .collect(Collectors.toMap(InternalFile::getPhysicalPath,
Function.identity()));
return findNewAndRemovedFiles(latestFiles, previousFiles);
}
}
diff --git
a/xtable-api/src/main/java/org/apache/xtable/model/storage/InternalDataFile.java
b/xtable-api/src/main/java/org/apache/xtable/model/storage/InternalDataFile.java
index 3aee766e..0dcb7359 100644
---
a/xtable-api/src/main/java/org/apache/xtable/model/storage/InternalDataFile.java
+++
b/xtable-api/src/main/java/org/apache/xtable/model/storage/InternalDataFile.java
@@ -22,8 +22,12 @@ import java.util.Collections;
import java.util.List;
import lombok.Builder;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
import lombok.NonNull;
-import lombok.Value;
+import lombok.ToString;
+import lombok.experimental.FieldDefaults;
+import lombok.experimental.SuperBuilder;
import org.apache.xtable.model.stat.ColumnStat;
import org.apache.xtable.model.stat.PartitionValue;
@@ -33,18 +37,17 @@ import org.apache.xtable.model.stat.PartitionValue;
*
* @since 0.1
*/
-@Builder(toBuilder = true)
-@Value
-public class InternalDataFile {
- // physical path of the file (absolute with scheme)
- @NonNull String physicalPath;
+@SuperBuilder(toBuilder = true)
+@FieldDefaults(makeFinal = true, level = lombok.AccessLevel.PRIVATE)
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@Getter
+public class InternalDataFile extends InternalFile {
// file format
@Builder.Default @NonNull FileFormat fileFormat = FileFormat.APACHE_PARQUET;
// partition ranges for the data file
@Builder.Default @NonNull List<PartitionValue> partitionValues =
Collections.emptyList();
- long fileSizeBytes;
- long recordCount;
// column stats for each column in the data file
@Builder.Default @NonNull List<ColumnStat> columnStats =
Collections.emptyList();
// last modified time in millis since epoch
diff --git
a/xtable-api/src/main/java/org/apache/xtable/model/storage/InternalFile.java
b/xtable-api/src/main/java/org/apache/xtable/model/storage/InternalFile.java
new file mode 100644
index 00000000..5a898649
--- /dev/null
+++ b/xtable-api/src/main/java/org/apache/xtable/model/storage/InternalFile.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.model.storage;
+
+import lombok.AllArgsConstructor;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.NonNull;
+import lombok.ToString;
+import lombok.experimental.FieldDefaults;
+import lombok.experimental.SuperBuilder;
+
+/**
+ * This class is an internal representation of a logical storage file of a table and is the base
+ * class of all types of storage files. The most common type of storage file is a data file, which
+ * contains the actual records of a table. Other examples of storage files are positional delete
+ * files (containing ordinals of the records to be deleted from a data file), stat files, and index
+ * files. For completeness of the conversion process, XTable needs to recognize the current storage
+ * files of a table, as well as the storage files that are added and removed over time as the state
+ * of the table changes. Different table formats support different storage file types; this base
+ * representation is generic and extensible so that interfaces built on it can remain stable.
+ */
+@SuperBuilder(toBuilder = true)
+@FieldDefaults(makeFinal = true, level = lombok.AccessLevel.PRIVATE)
+// No callSuper here: the superclass is Object, whose toString() would only add "Class@hash" noise
+// to this and every subclass's (callSuper = true) output.
+@ToString
+@EqualsAndHashCode
+@AllArgsConstructor
+@Getter
+public abstract class InternalFile {
+  // Absolute path, including the scheme, of the physical storage file that contains this logical
+  // file. Typically one physical storage file contains exactly one logical file, e.g. a parquet
+  // data file; in some cases, for optimization, one physical file can contain several logical files.
+  @NonNull String physicalPath;
+
+  // The size in bytes of this logical file within the physical storage file.
+  long fileSizeBytes;
+
+  // The number of records in the storage file.
+  long recordCount;
+}
diff --git
a/xtable-api/src/main/java/org/apache/xtable/model/storage/DataFilesDiff.java
b/xtable-api/src/main/java/org/apache/xtable/model/storage/InternalFilesDiff.java
similarity index 62%
rename from
xtable-api/src/main/java/org/apache/xtable/model/storage/DataFilesDiff.java
rename to
xtable-api/src/main/java/org/apache/xtable/model/storage/InternalFilesDiff.java
index 9025ec31..2cb4ea9d 100644
---
a/xtable-api/src/main/java/org/apache/xtable/model/storage/DataFilesDiff.java
+++
b/xtable-api/src/main/java/org/apache/xtable/model/storage/InternalFilesDiff.java
@@ -20,6 +20,7 @@ package org.apache.xtable.model.storage;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -31,17 +32,18 @@ import lombok.experimental.SuperBuilder;
@Value
@EqualsAndHashCode(callSuper = true)
@SuperBuilder
-public class DataFilesDiff extends FilesDiff<InternalDataFile,
InternalDataFile> {
+public class InternalFilesDiff extends FilesDiff<InternalFile, InternalFile> {
/**
- * Creates a DataFilesDiff from the list of files in the target table and
the list of files in the
- * source table.
+ * Creates a InternalFilesDiff from the list of files in the target table
and the list of files in
+ * the source table.
*
* @param source list of files currently in the source table
* @param target list of files currently in the target table
* @return files that need to be added and removed for the target table
match the source table
*/
- public static DataFilesDiff from(List<InternalDataFile> source,
List<InternalDataFile> target) {
+ public static InternalFilesDiff from(
+ List<InternalDataFile> source, List<InternalDataFile> target) {
Map<String, InternalDataFile> targetPaths =
target.stream()
.collect(Collectors.toMap(InternalDataFile::getPhysicalPath,
Function.identity()));
@@ -51,9 +53,35 @@ public class DataFilesDiff extends
FilesDiff<InternalDataFile, InternalDataFile>
FilesDiff<InternalDataFile, InternalDataFile> diff =
findNewAndRemovedFiles(sourcePaths, targetPaths);
- return DataFilesDiff.builder()
+ return InternalFilesDiff.builder()
.filesAdded(diff.getFilesAdded())
.filesRemoved(diff.getFilesRemoved())
.build();
}
+
+  /**
+   * Returns the files of type {@link InternalDataFile} among the files added in this diff. Files of
+   * other {@link InternalFile} types are skipped.
+   *
+   * @return a new set containing only the added data files
+   */
+  public Set<InternalDataFile> dataFilesAdded() {
+    return getFilesAdded().stream()
+        .filter(InternalDataFile.class::isInstance)
+        .map(InternalDataFile.class::cast)
+        .collect(Collectors.toSet());
+  }
+
+  /**
+   * Returns the files of type {@link InternalDataFile} among the files removed in this diff. Files
+   * of other {@link InternalFile} types are skipped.
+   *
+   * @return a new set containing only the removed data files
+   */
+  public Set<InternalDataFile> dataFilesRemoved() {
+    return getFilesRemoved().stream()
+        .filter(InternalDataFile.class::isInstance)
+        .map(InternalDataFile.class::cast)
+        .collect(Collectors.toSet());
+  }
}
diff --git
a/xtable-api/src/main/java/org/apache/xtable/model/storage/PartitionFileGroup.java
b/xtable-api/src/main/java/org/apache/xtable/model/storage/PartitionFileGroup.java
index 500fa1b5..e7eadfd5 100644
---
a/xtable-api/src/main/java/org/apache/xtable/model/storage/PartitionFileGroup.java
+++
b/xtable-api/src/main/java/org/apache/xtable/model/storage/PartitionFileGroup.java
@@ -28,12 +28,12 @@ import lombok.Value;
import org.apache.xtable.model.stat.PartitionValue;
-/** Represents a grouping of {@link InternalDataFile} with the same partition
values. */
+/** Represents a grouping of {@link InternalFile} with the same partition
values. */
@Value
@Builder
public class PartitionFileGroup {
List<PartitionValue> partitionValues;
- List<InternalDataFile> files;
+ List<? extends InternalFile> files;
public static List<PartitionFileGroup> fromFiles(List<InternalDataFile>
files) {
return fromFiles(files.stream());
@@ -51,4 +51,12 @@ public class PartitionFileGroup {
.build())
.collect(Collectors.toList());
}
+
+  /** Returns only the {@link InternalDataFile} entries of this group, in their original order. */
+  public List<InternalDataFile> getDataFiles() {
+    return files.stream()
+        .filter(InternalDataFile.class::isInstance)
+        .map(InternalDataFile.class::cast)
+        .collect(Collectors.toList());
+  }
}
diff --git
a/xtable-api/src/main/java/org/apache/xtable/spi/sync/ConversionTarget.java
b/xtable-api/src/main/java/org/apache/xtable/spi/sync/ConversionTarget.java
index 476578cb..b763be8b 100644
--- a/xtable-api/src/main/java/org/apache/xtable/spi/sync/ConversionTarget.java
+++ b/xtable-api/src/main/java/org/apache/xtable/spi/sync/ConversionTarget.java
@@ -28,7 +28,7 @@ import org.apache.xtable.model.InternalTable;
import org.apache.xtable.model.metadata.TableSyncMetadata;
import org.apache.xtable.model.schema.InternalPartitionField;
import org.apache.xtable.model.schema.InternalSchema;
-import org.apache.xtable.model.storage.DataFilesDiff;
+import org.apache.xtable.model.storage.InternalFilesDiff;
import org.apache.xtable.model.storage.PartitionFileGroup;
/** A client that provides the major functionality for syncing changes to a
target system. */
@@ -68,9 +68,9 @@ public interface ConversionTarget {
* Syncs the changes in files to the target system. This method is required
to both add and remove
* files.
*
- * @param dataFilesDiff the diff that needs to be synced
+ * @param internalFilesDiff the diff that needs to be synced
*/
- void syncFilesForDiff(DataFilesDiff dataFilesDiff);
+ void syncFilesForDiff(InternalFilesDiff internalFilesDiff);
/**
* Starts the sync and performs any initialization required
diff --git
a/xtable-api/src/test/java/org/apache/xtable/model/storage/TestDataFilesDiff.java
b/xtable-api/src/test/java/org/apache/xtable/model/storage/TestDataFilesDiff.java
index 6896dce8..0e780bb3 100644
---
a/xtable-api/src/test/java/org/apache/xtable/model/storage/TestDataFilesDiff.java
+++
b/xtable-api/src/test/java/org/apache/xtable/model/storage/TestDataFilesDiff.java
@@ -37,13 +37,13 @@ public class TestDataFilesDiff {
InternalDataFile.builder().physicalPath("file://already_in_target2.parquet").build();
InternalDataFile sourceFileInTargetAlready =
InternalDataFile.builder().physicalPath("file://already_in_target3.parquet").build();
- DataFilesDiff actual =
- DataFilesDiff.from(
+ InternalFilesDiff actual =
+ InternalFilesDiff.from(
Arrays.asList(sourceFile1, sourceFile2, sourceFileInTargetAlready),
Arrays.asList(targetFile1, targetFile2,
sourceFileInTargetAlready));
- DataFilesDiff expected =
- DataFilesDiff.builder()
+ InternalFilesDiff expected =
+ InternalFilesDiff.builder()
.filesAdded(Arrays.asList(sourceFile1, sourceFile2))
.filesRemoved(Arrays.asList(targetFile1, targetFile2))
.build();
diff --git
a/xtable-api/src/test/java/org/apache/xtable/model/storage/TestFilesDiff.java
b/xtable-api/src/test/java/org/apache/xtable/model/storage/TestFilesDiff.java
index 32e33988..1776c345 100644
---
a/xtable-api/src/test/java/org/apache/xtable/model/storage/TestFilesDiff.java
+++
b/xtable-api/src/test/java/org/apache/xtable/model/storage/TestFilesDiff.java
@@ -50,7 +50,7 @@ public class TestFilesDiff {
previousFiles.put("file2NoGroup", file2);
previousFiles.put("file2Group2", file3);
- FilesDiff<InternalDataFile, File> diff =
+ FilesDiff<InternalFile, File> diff =
FilesDiff.findNewAndRemovedFiles(latestFileGroups, previousFiles);
assertEquals(2, diff.getFilesAdded().size());
assertTrue(diff.getFilesAdded().contains(file1Group2));
diff --git
a/xtable-api/src/test/java/org/apache/xtable/spi/extractor/TestExtractFromSource.java
b/xtable-api/src/test/java/org/apache/xtable/spi/extractor/TestExtractFromSource.java
index 5c7f8cfc..9271acf3 100644
---
a/xtable-api/src/test/java/org/apache/xtable/spi/extractor/TestExtractFromSource.java
+++
b/xtable-api/src/test/java/org/apache/xtable/spi/extractor/TestExtractFromSource.java
@@ -41,8 +41,8 @@ import org.apache.xtable.model.InstantsForIncrementalSync;
import org.apache.xtable.model.InternalSnapshot;
import org.apache.xtable.model.InternalTable;
import org.apache.xtable.model.TableChange;
-import org.apache.xtable.model.storage.DataFilesDiff;
import org.apache.xtable.model.storage.InternalDataFile;
+import org.apache.xtable.model.storage.InternalFilesDiff;
import org.apache.xtable.model.storage.PartitionFileGroup;
public class TestExtractFromSource {
@@ -89,7 +89,7 @@ public class TestExtractFromSource {
TableChange.builder()
.tableAsOfChange(tableAtFirstInstant)
.filesDiff(
-
DataFilesDiff.builder().fileAdded(newFile1).fileRemoved(initialFile2).build())
+
InternalFilesDiff.builder().fileAdded(newFile1).fileRemoved(initialFile2).build())
.sourceIdentifier("0")
.build();
when(mockConversionSource.getTableChangeForCommit(firstCommitToSync))
@@ -98,7 +98,7 @@ public class TestExtractFromSource {
TableChange.builder()
.tableAsOfChange(tableAtFirstInstant)
.filesDiff(
-
DataFilesDiff.builder().fileAdded(newFile1).fileRemoved(initialFile2).build())
+
InternalFilesDiff.builder().fileAdded(newFile1).fileRemoved(initialFile2).build())
.sourceIdentifier("0")
.build();
@@ -112,7 +112,7 @@ public class TestExtractFromSource {
TableChange.builder()
.tableAsOfChange(tableAtSecondInstant)
.filesDiff(
- DataFilesDiff.builder()
+ InternalFilesDiff.builder()
.filesAdded(Arrays.asList(newFile2, newFile3))
.filesRemoved(Arrays.asList(initialFile3, newFile1))
.build())
@@ -124,7 +124,7 @@ public class TestExtractFromSource {
TableChange.builder()
.tableAsOfChange(tableAtSecondInstant)
.filesDiff(
- DataFilesDiff.builder()
+ InternalFilesDiff.builder()
.filesAdded(Arrays.asList(newFile2, newFile3))
.filesRemoved(Arrays.asList(initialFile3, newFile1))
.build())
diff --git
a/xtable-api/src/test/java/org/apache/xtable/spi/sync/TestTableFormatSync.java
b/xtable-api/src/test/java/org/apache/xtable/spi/sync/TestTableFormatSync.java
index 852d4e13..e15e3301 100644
---
a/xtable-api/src/test/java/org/apache/xtable/spi/sync/TestTableFormatSync.java
+++
b/xtable-api/src/test/java/org/apache/xtable/spi/sync/TestTableFormatSync.java
@@ -48,8 +48,8 @@ import org.apache.xtable.model.schema.InternalField;
import org.apache.xtable.model.schema.InternalPartitionField;
import org.apache.xtable.model.schema.InternalSchema;
import org.apache.xtable.model.schema.PartitionTransformType;
-import org.apache.xtable.model.storage.DataFilesDiff;
import org.apache.xtable.model.storage.InternalDataFile;
+import org.apache.xtable.model.storage.InternalFilesDiff;
import org.apache.xtable.model.storage.PartitionFileGroup;
import org.apache.xtable.model.storage.TableFormat;
import org.apache.xtable.model.sync.SyncMode;
@@ -129,27 +129,27 @@ public class TestTableFormatSync {
void syncChangesWithFailureForOneFormat() {
Instant start = Instant.now();
InternalTable tableState1 = getTableState(1);
- DataFilesDiff dataFilesDiff1 = getFilesDiff(1);
+ InternalFilesDiff internalFilesDiff1 = getFilesDiff(1);
TableChange tableChange1 =
TableChange.builder()
.tableAsOfChange(tableState1)
- .filesDiff(dataFilesDiff1)
+ .filesDiff(internalFilesDiff1)
.sourceIdentifier("0")
.build();
InternalTable tableState2 = getTableState(2);
- DataFilesDiff dataFilesDiff2 = getFilesDiff(2);
+ InternalFilesDiff internalFilesDiff2 = getFilesDiff(2);
TableChange tableChange2 =
TableChange.builder()
.tableAsOfChange(tableState2)
- .filesDiff(dataFilesDiff2)
+ .filesDiff(internalFilesDiff2)
.sourceIdentifier("1")
.build();
InternalTable tableState3 = getTableState(3);
- DataFilesDiff dataFilesDiff3 = getFilesDiff(3);
+ InternalFilesDiff internalFilesDiff3 = getFilesDiff(3);
TableChange tableChange3 =
TableChange.builder()
.tableAsOfChange(tableState3)
- .filesDiff(dataFilesDiff3)
+ .filesDiff(internalFilesDiff3)
.sourceIdentifier("2")
.build();
@@ -224,25 +224,25 @@ public class TestTableFormatSync {
tableState1,
pendingCommitInstants,
tableChange1.getSourceIdentifier());
- verify(mockConversionTarget1).syncFilesForDiff(dataFilesDiff1);
+ verify(mockConversionTarget1).syncFilesForDiff(internalFilesDiff1);
verifyBaseConversionTargetCalls(
mockConversionTarget2,
tableState1,
pendingCommitInstants,
tableChange1.getSourceIdentifier());
- verify(mockConversionTarget2).syncFilesForDiff(dataFilesDiff1);
+ verify(mockConversionTarget2).syncFilesForDiff(internalFilesDiff1);
verifyBaseConversionTargetCalls(
mockConversionTarget2,
tableState2,
pendingCommitInstants,
tableChange2.getSourceIdentifier());
- verify(mockConversionTarget2).syncFilesForDiff(dataFilesDiff2);
+ verify(mockConversionTarget2).syncFilesForDiff(internalFilesDiff2);
verifyBaseConversionTargetCalls(
mockConversionTarget2,
tableState3,
pendingCommitInstants,
tableChange3.getSourceIdentifier());
- verify(mockConversionTarget2).syncFilesForDiff(dataFilesDiff3);
+ verify(mockConversionTarget2).syncFilesForDiff(internalFilesDiff3);
verify(mockConversionTarget1, times(1)).completeSync();
verify(mockConversionTarget2, times(3)).completeSync();
}
@@ -251,27 +251,27 @@ public class TestTableFormatSync {
void syncChangesWithDifferentFormatsAndMetadata() {
Instant start = Instant.now();
InternalTable tableState1 = getTableState(1);
- DataFilesDiff dataFilesDiff1 = getFilesDiff(1);
+ InternalFilesDiff internalFilesDiff1 = getFilesDiff(1);
TableChange tableChange1 =
TableChange.builder()
.tableAsOfChange(tableState1)
- .filesDiff(dataFilesDiff1)
+ .filesDiff(internalFilesDiff1)
.sourceIdentifier("0")
.build();
InternalTable tableState2 = getTableState(2);
- DataFilesDiff dataFilesDiff2 = getFilesDiff(2);
+ InternalFilesDiff internalFilesDiff2 = getFilesDiff(2);
TableChange tableChange2 =
TableChange.builder()
.tableAsOfChange(tableState2)
- .filesDiff(dataFilesDiff2)
+ .filesDiff(internalFilesDiff2)
.sourceIdentifier("1")
.build();
InternalTable tableState3 = getTableState(3);
- DataFilesDiff dataFilesDiff3 = getFilesDiff(3);
+ InternalFilesDiff internalFilesDiff3 = getFilesDiff(3);
TableChange tableChange3 =
TableChange.builder()
.tableAsOfChange(tableState3)
- .filesDiff(dataFilesDiff3)
+ .filesDiff(internalFilesDiff3)
.sourceIdentifier("2")
.build();
@@ -346,13 +346,13 @@ public class TestTableFormatSync {
tableState1,
pendingCommitInstants,
tableChange1.getSourceIdentifier());
- verify(mockConversionTarget1).syncFilesForDiff(dataFilesDiff1);
+ verify(mockConversionTarget1).syncFilesForDiff(internalFilesDiff1);
verifyBaseConversionTargetCalls(
mockConversionTarget1,
tableState3,
pendingCommitInstants,
tableChange3.getSourceIdentifier());
- verify(mockConversionTarget1).syncFilesForDiff(dataFilesDiff3);
+ verify(mockConversionTarget1).syncFilesForDiff(internalFilesDiff3);
verify(mockConversionTarget1, times(2)).completeSync();
// conversionTarget2 syncs table changes 2 and 3
verifyBaseConversionTargetCalls(
@@ -360,13 +360,13 @@ public class TestTableFormatSync {
tableState2,
pendingCommitInstants,
tableChange2.getSourceIdentifier());
- verify(mockConversionTarget2).syncFilesForDiff(dataFilesDiff2);
+ verify(mockConversionTarget2).syncFilesForDiff(internalFilesDiff2);
verifyBaseConversionTargetCalls(
mockConversionTarget2,
tableState3,
pendingCommitInstants,
tableChange3.getSourceIdentifier());
- verify(mockConversionTarget2).syncFilesForDiff(dataFilesDiff3);
+ verify(mockConversionTarget2).syncFilesForDiff(internalFilesDiff3);
verify(mockConversionTarget2, times(2)).completeSync();
}
@@ -374,11 +374,11 @@ public class TestTableFormatSync {
void syncChangesOneFormatWithNoRequiredChanges() {
Instant start = Instant.now();
InternalTable tableState1 = getTableState(1);
- DataFilesDiff dataFilesDiff1 = getFilesDiff(1);
+ InternalFilesDiff internalFilesDiff1 = getFilesDiff(1);
TableChange tableChange1 =
TableChange.builder()
.tableAsOfChange(tableState1)
- .filesDiff(dataFilesDiff1)
+ .filesDiff(internalFilesDiff1)
.sourceIdentifier("0")
.build();
@@ -426,7 +426,7 @@ public class TestTableFormatSync {
tableState1,
pendingCommitInstants,
tableChange1.getSourceIdentifier());
- verify(mockConversionTarget2).syncFilesForDiff(dataFilesDiff1);
+ verify(mockConversionTarget2).syncFilesForDiff(internalFilesDiff1);
}
/**
@@ -453,8 +453,8 @@ public class TestTableFormatSync {
.build();
}
- private DataFilesDiff getFilesDiff(int id) {
- return DataFilesDiff.builder()
+ private InternalFilesDiff getFilesDiff(int id) {
+ return InternalFilesDiff.builder()
.filesAdded(
Collections.singletonList(
InternalDataFile.builder()
diff --git
a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java
b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java
index de4a2b69..97804d5f 100644
---
a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java
+++
b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java
@@ -51,9 +51,9 @@ import org.apache.xtable.model.InternalSnapshot;
import org.apache.xtable.model.InternalTable;
import org.apache.xtable.model.TableChange;
import org.apache.xtable.model.schema.InternalSchema;
-import org.apache.xtable.model.storage.DataFilesDiff;
import org.apache.xtable.model.storage.FileFormat;
import org.apache.xtable.model.storage.InternalDataFile;
+import org.apache.xtable.model.storage.InternalFilesDiff;
import org.apache.xtable.model.storage.PartitionFileGroup;
import org.apache.xtable.spi.extractor.ConversionSource;
import org.apache.xtable.spi.extractor.DataFileIterator;
@@ -163,14 +163,14 @@ public class DeltaConversionSource implements
ConversionSource<Long> {
}
}
- DataFilesDiff dataFilesDiff =
- DataFilesDiff.builder()
+ InternalFilesDiff internalFilesDiff =
+ InternalFilesDiff.builder()
.filesAdded(addedFiles.values())
.filesRemoved(removedFiles.values())
.build();
return TableChange.builder()
.tableAsOfChange(tableAtVersion)
- .filesDiff(dataFilesDiff)
+ .filesDiff(internalFilesDiff)
.sourceIdentifier(getCommitIdentifier(versionNumber))
.build();
}
diff --git
a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionTarget.java
b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionTarget.java
index fbe99801..b487e2cb 100644
---
a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionTarget.java
+++
b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionTarget.java
@@ -65,7 +65,7 @@ import org.apache.xtable.model.InternalTable;
import org.apache.xtable.model.metadata.TableSyncMetadata;
import org.apache.xtable.model.schema.InternalPartitionField;
import org.apache.xtable.model.schema.InternalSchema;
-import org.apache.xtable.model.storage.DataFilesDiff;
+import org.apache.xtable.model.storage.InternalFilesDiff;
import org.apache.xtable.model.storage.PartitionFileGroup;
import org.apache.xtable.model.storage.TableFormat;
import org.apache.xtable.schema.SparkSchemaExtractor;
@@ -193,10 +193,10 @@ public class DeltaConversionTarget implements
ConversionTarget {
}
@Override
- public void syncFilesForDiff(DataFilesDiff dataFilesDiff) {
+ public void syncFilesForDiff(InternalFilesDiff internalFilesDiff) {
transactionState.setActions(
dataFileUpdatesExtractor.applyDiff(
- dataFilesDiff,
+ internalFilesDiff,
transactionState.getLatestSchemaInternal(),
deltaLog.dataPath().toString()));
}
diff --git
a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaDataFileUpdatesExtractor.java
b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaDataFileUpdatesExtractor.java
index 96776750..caee22f6 100644
---
a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaDataFileUpdatesExtractor.java
+++
b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaDataFileUpdatesExtractor.java
@@ -39,9 +39,10 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.xtable.collectors.CustomCollectors;
import org.apache.xtable.model.schema.InternalSchema;
import org.apache.xtable.model.stat.ColumnStat;
-import org.apache.xtable.model.storage.DataFilesDiff;
import org.apache.xtable.model.storage.FilesDiff;
import org.apache.xtable.model.storage.InternalDataFile;
+import org.apache.xtable.model.storage.InternalFile;
+import org.apache.xtable.model.storage.InternalFilesDiff;
import org.apache.xtable.model.storage.PartitionFileGroup;
import org.apache.xtable.paths.PathUtils;
@@ -74,30 +75,32 @@ public class DeltaDataFileUpdatesExtractor {
file -> DeltaActionsConverter.getFullPathToFile(snapshot,
file.path()),
file -> file));
- FilesDiff<InternalDataFile, Action> diff =
- DataFilesDiff.findNewAndRemovedFiles(partitionedDataFiles,
previousFiles);
+ FilesDiff<InternalFile, Action> diff =
+ InternalFilesDiff.findNewAndRemovedFiles(partitionedDataFiles,
previousFiles);
return applyDiff(
diff.getFilesAdded(), diff.getFilesRemoved(), tableSchema,
deltaLog.dataPath().toString());
}
public Seq<Action> applyDiff(
- DataFilesDiff dataFilesDiff, InternalSchema tableSchema, String
tableBasePath) {
+ InternalFilesDiff internalFilesDiff, InternalSchema tableSchema, String
tableBasePath) {
List<Action> removeActions =
- dataFilesDiff.getFilesRemoved().stream()
+ internalFilesDiff.dataFilesRemoved().stream()
.flatMap(dFile -> createAddFileAction(dFile, tableSchema,
tableBasePath))
.map(AddFile::remove)
-
.collect(CustomCollectors.toList(dataFilesDiff.getFilesRemoved().size()));
- return applyDiff(dataFilesDiff.getFilesAdded(), removeActions,
tableSchema, tableBasePath);
+
.collect(CustomCollectors.toList(internalFilesDiff.dataFilesRemoved().size()));
+ return applyDiff(internalFilesDiff.dataFilesAdded(), removeActions,
tableSchema, tableBasePath);
}
private Seq<Action> applyDiff(
- Set<InternalDataFile> filesAdded,
+ Set<? extends InternalFile> filesAdded,
Collection<Action> removeFileActions,
InternalSchema tableSchema,
String tableBasePath) {
Stream<Action> addActions =
filesAdded.stream()
+ .filter(InternalDataFile.class::isInstance)
+ .map(file -> (InternalDataFile) file)
.flatMap(dFile -> createAddFileAction(dFile, tableSchema,
tableBasePath));
int totalActions = filesAdded.size() + removeFileActions.size();
List<Action> allActions =
diff --git
a/xtable-core/src/main/java/org/apache/xtable/hudi/BaseFileUpdatesExtractor.java
b/xtable-core/src/main/java/org/apache/xtable/hudi/BaseFileUpdatesExtractor.java
index bef00013..ba1f9f9d 100644
---
a/xtable-core/src/main/java/org/apache/xtable/hudi/BaseFileUpdatesExtractor.java
+++
b/xtable-core/src/main/java/org/apache/xtable/hudi/BaseFileUpdatesExtractor.java
@@ -54,8 +54,8 @@ import org.apache.hudi.metadata.HoodieMetadataFileSystemView;
import org.apache.xtable.collectors.CustomCollectors;
import org.apache.xtable.model.schema.InternalType;
import org.apache.xtable.model.stat.ColumnStat;
-import org.apache.xtable.model.storage.DataFilesDiff;
import org.apache.xtable.model.storage.InternalDataFile;
+import org.apache.xtable.model.storage.InternalFilesDiff;
import org.apache.xtable.model.storage.PartitionFileGroup;
@AllArgsConstructor(staticName = "of")
@@ -96,7 +96,7 @@ public class BaseFileUpdatesExtractor {
partitionedDataFiles.stream()
.map(
partitionFileGroup -> {
- List<InternalDataFile> dataFiles =
partitionFileGroup.getFiles();
+ List<InternalDataFile> dataFiles =
partitionFileGroup.getDataFiles();
String partitionPath = getPartitionPath(tableBasePath,
dataFiles);
// remove the partition from the set of partitions to drop
since it is present in
// the snapshot
@@ -163,16 +163,17 @@ public class BaseFileUpdatesExtractor {
}
/**
- * Converts the provided {@link DataFilesDiff}.
+ * Converts the provided {@link InternalFilesDiff}.
*
- * @param dataFilesDiff the diff to apply to the Hudi table
+ * @param internalFilesDiff the diff to apply to the Hudi table
* @param commit The current commit started by the Hudi client
* @return The information needed to create a "replace" commit for the Hudi
table
*/
- ReplaceMetadata convertDiff(@NonNull DataFilesDiff dataFilesDiff, @NonNull
String commit) {
+ ReplaceMetadata convertDiff(
+ @NonNull InternalFilesDiff internalFilesDiff, @NonNull String commit) {
// For all removed files, group by partition and extract the file id
Map<String, List<String>> partitionToReplacedFileIds =
- dataFilesDiff.getFilesRemoved().stream()
+ internalFilesDiff.dataFilesRemoved().stream()
.map(file -> new CachingPath(file.getPhysicalPath()))
.collect(
Collectors.groupingBy(
@@ -180,9 +181,9 @@ public class BaseFileUpdatesExtractor {
Collectors.mapping(this::getFileId, Collectors.toList())));
// For all added files, group by partition and extract the file id
List<WriteStatus> writeStatuses =
- dataFilesDiff.getFilesAdded().stream()
+ internalFilesDiff.dataFilesAdded().stream()
.map(file -> toWriteStatus(tableBasePath, commit, file,
Optional.empty()))
-
.collect(CustomCollectors.toList(dataFilesDiff.getFilesAdded().size()));
+
.collect(CustomCollectors.toList(internalFilesDiff.dataFilesAdded().size()));
return ReplaceMetadata.of(partitionToReplacedFileIds, writeStatuses);
}
diff --git
a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionTarget.java
b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionTarget.java
index 97c2f9e3..b8aad22d 100644
--- a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionTarget.java
+++ b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionTarget.java
@@ -87,7 +87,7 @@ import org.apache.xtable.model.metadata.TableSyncMetadata;
import org.apache.xtable.model.schema.InternalField;
import org.apache.xtable.model.schema.InternalPartitionField;
import org.apache.xtable.model.schema.InternalSchema;
-import org.apache.xtable.model.storage.DataFilesDiff;
+import org.apache.xtable.model.storage.InternalFilesDiff;
import org.apache.xtable.model.storage.PartitionFileGroup;
import org.apache.xtable.model.storage.TableFormat;
import org.apache.xtable.spi.sync.ConversionTarget;
@@ -251,9 +251,9 @@ public class HudiConversionTarget implements
ConversionTarget {
}
@Override
- public void syncFilesForDiff(DataFilesDiff dataFilesDiff) {
+ public void syncFilesForDiff(InternalFilesDiff internalFilesDiff) {
BaseFileUpdatesExtractor.ReplaceMetadata replaceMetadata =
- baseFileUpdatesExtractor.convertDiff(dataFilesDiff,
commitState.getInstantTime());
+ baseFileUpdatesExtractor.convertDiff(internalFilesDiff,
commitState.getInstantTime());
commitState.setReplaceMetadata(replaceMetadata);
}
diff --git
a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiDataFileExtractor.java
b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiDataFileExtractor.java
index 5739328c..77c0ca98 100644
---
a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiDataFileExtractor.java
+++
b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiDataFileExtractor.java
@@ -65,9 +65,9 @@ import org.apache.xtable.model.InternalTable;
import org.apache.xtable.model.exception.ParseException;
import org.apache.xtable.model.schema.InternalPartitionField;
import org.apache.xtable.model.stat.PartitionValue;
-import org.apache.xtable.model.storage.DataFilesDiff;
import org.apache.xtable.model.storage.FileFormat;
import org.apache.xtable.model.storage.InternalDataFile;
+import org.apache.xtable.model.storage.InternalFilesDiff;
import org.apache.xtable.model.storage.PartitionFileGroup;
/** Extracts all the files for Hudi table represented by {@link
InternalTable}. */
@@ -122,7 +122,7 @@ public class HudiDataFileExtractor implements AutoCloseable
{
}
}
- public DataFilesDiff getDiffForCommit(
+ public InternalFilesDiff getDiffForCommit(
HoodieInstant hoodieInstantForDiff,
InternalTable table,
HoodieInstant instant,
@@ -139,7 +139,7 @@ public class HudiDataFileExtractor implements AutoCloseable
{
.collect(Collectors.toList());
List<InternalDataFile> filesRemoved = allInfo.getRemoved();
- return
DataFilesDiff.builder().filesAdded(filesAdded).filesRemoved(filesRemoved).build();
+ return
InternalFilesDiff.builder().filesAdded(filesAdded).filesRemoved(filesRemoved).build();
}
private AddedAndRemovedFiles getAddedAndRemovedPartitionInfo(
diff --git
a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java
b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java
index cb75b6d0..5f210f1f 100644
---
a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java
+++
b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java
@@ -58,9 +58,9 @@ import org.apache.xtable.model.TableChange;
import org.apache.xtable.model.schema.InternalPartitionField;
import org.apache.xtable.model.schema.InternalSchema;
import org.apache.xtable.model.stat.PartitionValue;
-import org.apache.xtable.model.storage.DataFilesDiff;
import org.apache.xtable.model.storage.DataLayoutStrategy;
import org.apache.xtable.model.storage.InternalDataFile;
+import org.apache.xtable.model.storage.InternalFilesDiff;
import org.apache.xtable.model.storage.PartitionFileGroup;
import org.apache.xtable.model.storage.TableFormat;
import org.apache.xtable.spi.extractor.ConversionSource;
@@ -193,8 +193,11 @@ public class IcebergConversionSource implements
ConversionSource<Snapshot> {
.map(dataFile -> fromIceberg(dataFile, partitionSpec, irTable))
.collect(Collectors.toSet());
- DataFilesDiff filesDiff =
-
DataFilesDiff.builder().filesAdded(dataFilesAdded).filesRemoved(dataFilesRemoved).build();
+ InternalFilesDiff filesDiff =
+ InternalFilesDiff.builder()
+ .filesAdded(dataFilesAdded)
+ .filesRemoved(dataFilesRemoved)
+ .build();
InternalTable table = getTable(snapshot);
return TableChange.builder()
diff --git
a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java
b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java
index 69faf1ea..a57ac4f6 100644
---
a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java
+++
b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java
@@ -45,7 +45,7 @@ import org.apache.xtable.model.InternalTable;
import org.apache.xtable.model.metadata.TableSyncMetadata;
import org.apache.xtable.model.schema.InternalPartitionField;
import org.apache.xtable.model.schema.InternalSchema;
-import org.apache.xtable.model.storage.DataFilesDiff;
+import org.apache.xtable.model.storage.InternalFilesDiff;
import org.apache.xtable.model.storage.PartitionFileGroup;
import org.apache.xtable.model.storage.TableFormat;
import org.apache.xtable.spi.sync.ConversionTarget;
@@ -209,10 +209,10 @@ public class IcebergConversionTarget implements
ConversionTarget {
}
@Override
- public void syncFilesForDiff(DataFilesDiff dataFilesDiff) {
+ public void syncFilesForDiff(InternalFilesDiff internalFilesDiff) {
dataFileUpdatesExtractor.applyDiff(
transaction,
- dataFilesDiff,
+ internalFilesDiff,
transaction.table().schema(),
transaction.table().spec(),
tableSyncMetadata);
diff --git
a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergDataFileUpdatesSync.java
b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergDataFileUpdatesSync.java
index 0c8fa9a4..77a4eeed 100644
---
a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergDataFileUpdatesSync.java
+++
b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergDataFileUpdatesSync.java
@@ -31,9 +31,10 @@ import org.apache.xtable.exception.NotSupportedException;
import org.apache.xtable.exception.ReadException;
import org.apache.xtable.model.InternalTable;
import org.apache.xtable.model.metadata.TableSyncMetadata;
-import org.apache.xtable.model.storage.DataFilesDiff;
import org.apache.xtable.model.storage.FilesDiff;
import org.apache.xtable.model.storage.InternalDataFile;
+import org.apache.xtable.model.storage.InternalFile;
+import org.apache.xtable.model.storage.InternalFilesDiff;
import org.apache.xtable.model.storage.PartitionFileGroup;
@AllArgsConstructor(staticName = "of")
@@ -59,8 +60,8 @@ public class IcebergDataFileUpdatesSync {
throw new ReadException("Failed to iterate through Iceberg data files",
e);
}
- FilesDiff<InternalDataFile, DataFile> diff =
- DataFilesDiff.findNewAndRemovedFiles(partitionedDataFiles,
previousFiles);
+ FilesDiff<InternalFile, DataFile> diff =
+ InternalFilesDiff.findNewAndRemovedFiles(partitionedDataFiles,
previousFiles);
applyDiff(
transaction, diff.getFilesAdded(), diff.getFilesRemoved(), schema,
partitionSpec, metadata);
@@ -68,29 +69,37 @@ public class IcebergDataFileUpdatesSync {
public void applyDiff(
Transaction transaction,
- DataFilesDiff dataFilesDiff,
+ InternalFilesDiff internalFilesDiff,
Schema schema,
PartitionSpec partitionSpec,
TableSyncMetadata metadata) {
Collection<DataFile> filesRemoved =
- dataFilesDiff.getFilesRemoved().stream()
+ internalFilesDiff.dataFilesRemoved().stream()
.map(file -> getDataFile(partitionSpec, schema, file))
.collect(Collectors.toList());
applyDiff(
- transaction, dataFilesDiff.getFilesAdded(), filesRemoved, schema,
partitionSpec, metadata);
+ transaction,
+ internalFilesDiff.dataFilesAdded(),
+ filesRemoved,
+ schema,
+ partitionSpec,
+ metadata);
}
private void applyDiff(
Transaction transaction,
- Collection<InternalDataFile> filesAdded,
+ Collection<? extends InternalFile> filesAdded,
Collection<DataFile> filesRemoved,
Schema schema,
PartitionSpec partitionSpec,
TableSyncMetadata metadata) {
OverwriteFiles overwriteFiles = transaction.newOverwrite();
- filesAdded.forEach(f -> overwriteFiles.addFile(getDataFile(partitionSpec,
schema, f)));
+ filesAdded.stream()
+ .filter(InternalDataFile.class::isInstance)
+ .map(file -> (InternalDataFile) file)
+ .forEach(f -> overwriteFiles.addFile(getDataFile(partitionSpec,
schema, f)));
filesRemoved.forEach(overwriteFiles::deleteFile);
overwriteFiles.set(TableSyncMetadata.XTABLE_METADATA, metadata.toJson());
overwriteFiles.commit();
diff --git
a/xtable-core/src/test/java/org/apache/xtable/ValidationTestHelper.java
b/xtable-core/src/test/java/org/apache/xtable/ValidationTestHelper.java
index a20ae4e7..9e95f279 100644
--- a/xtable-core/src/test/java/org/apache/xtable/ValidationTestHelper.java
+++ b/xtable-core/src/test/java/org/apache/xtable/ValidationTestHelper.java
@@ -39,7 +39,7 @@ public class ValidationTestHelper {
assertNotNull(internalSnapshot.getTable());
List<String> filePaths =
internalSnapshot.getPartitionedDataFiles().stream()
- .flatMap(group -> group.getFiles().stream())
+ .flatMap(group -> group.getDataFiles().stream())
.map(InternalDataFile::getPhysicalPath)
.collect(Collectors.toList());
replaceFileScheme(allActivePaths);
@@ -83,14 +83,14 @@ public class ValidationTestHelper {
filesForCommitBefore.stream()
.filter(file -> !filesForCommitAfter.contains(file))
.collect(Collectors.toSet());
- assertEquals(filesAdded,
extractPathsFromDataFile(tableChange.getFilesDiff().getFilesAdded()));
+ assertEquals(filesAdded,
extractPathsFromDataFile(tableChange.getFilesDiff().dataFilesAdded()));
assertEquals(
- filesRemoved,
extractPathsFromDataFile(tableChange.getFilesDiff().getFilesRemoved()));
+ filesRemoved,
extractPathsFromDataFile(tableChange.getFilesDiff().dataFilesRemoved()));
}
public static List<String> getAllFilePaths(InternalSnapshot
internalSnapshot) {
return internalSnapshot.getPartitionedDataFiles().stream()
- .flatMap(fileGroup -> fileGroup.getFiles().stream())
+ .flatMap(fileGroup -> fileGroup.getDataFiles().stream())
.map(InternalDataFile::getPhysicalPath)
.collect(Collectors.toList());
}
diff --git
a/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaConversionSource.java
b/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaConversionSource.java
index 3b1cc529..4afb6bdb 100644
---
a/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaConversionSource.java
+++
b/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaConversionSource.java
@@ -704,7 +704,7 @@ public class ITDeltaConversionSource {
throws URISyntaxException {
assertEquals(
expectedPartitionFiles.getPartitionValues(),
actualPartitionFiles.getPartitionValues());
- validateDataFiles(expectedPartitionFiles.getFiles(),
actualPartitionFiles.getFiles());
+ validateDataFiles(expectedPartitionFiles.getDataFiles(),
actualPartitionFiles.getDataFiles());
}
private void validateDataFiles(
diff --git
a/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionTarget.java
b/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionTarget.java
index 82125d25..99965f1f 100644
---
a/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionTarget.java
+++
b/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionTarget.java
@@ -84,10 +84,10 @@ import
org.apache.xtable.model.schema.PartitionTransformType;
import org.apache.xtable.model.stat.ColumnStat;
import org.apache.xtable.model.stat.PartitionValue;
import org.apache.xtable.model.stat.Range;
-import org.apache.xtable.model.storage.DataFilesDiff;
import org.apache.xtable.model.storage.DataLayoutStrategy;
import org.apache.xtable.model.storage.FileFormat;
import org.apache.xtable.model.storage.InternalDataFile;
+import org.apache.xtable.model.storage.InternalFilesDiff;
import org.apache.xtable.model.storage.PartitionFileGroup;
import org.apache.xtable.model.storage.TableFormat;
import org.apache.xtable.spi.sync.ConversionTarget;
@@ -195,8 +195,8 @@ public class ITHudiConversionTarget {
String fileName = "file_1.parquet";
String filePath = getFilePath(partitionPath, fileName);
- DataFilesDiff dataFilesDiff =
- DataFilesDiff.builder()
+ InternalFilesDiff internalFilesDiff =
+ InternalFilesDiff.builder()
.fileAdded(getTestFile(partitionPath, fileName))
.fileRemoved(fileToRemove)
.build();
@@ -204,7 +204,7 @@ public class ITHudiConversionTarget {
HudiConversionTarget targetClient = getTargetClient();
InternalTable initialState = getState(Instant.now());
targetClient.beginSync(initialState);
- targetClient.syncFilesForDiff(dataFilesDiff);
+ targetClient.syncFilesForDiff(internalFilesDiff);
targetClient.syncSchema(SCHEMA);
TableSyncMetadata latestState =
TableSyncMetadata.of(
@@ -518,11 +518,11 @@ public class ITHudiConversionTarget {
List<InternalDataFile> filesToRemove,
Instant commitStart,
String sourceIdentifier) {
- DataFilesDiff dataFilesDiff2 =
-
DataFilesDiff.builder().filesAdded(filesToAdd).filesRemoved(filesToRemove).build();
+ InternalFilesDiff internalFilesDiff2 =
+
InternalFilesDiff.builder().filesAdded(filesToAdd).filesRemoved(filesToRemove).build();
InternalTable state3 = getState(commitStart);
conversionTarget.beginSync(state3);
- conversionTarget.syncFilesForDiff(dataFilesDiff2);
+ conversionTarget.syncFilesForDiff(internalFilesDiff2);
TableSyncMetadata latestState =
TableSyncMetadata.of(
state3.getLatestCommitTime(), Collections.emptyList(), "TEST",
sourceIdentifier);
diff --git
a/xtable-core/src/test/java/org/apache/xtable/hudi/TestBaseFileUpdatesExtractor.java
b/xtable-core/src/test/java/org/apache/xtable/hudi/TestBaseFileUpdatesExtractor.java
index 8f3b3f7e..4958714c 100644
---
a/xtable-core/src/test/java/org/apache/xtable/hudi/TestBaseFileUpdatesExtractor.java
+++
b/xtable-core/src/test/java/org/apache/xtable/hudi/TestBaseFileUpdatesExtractor.java
@@ -59,9 +59,9 @@ import org.apache.xtable.model.schema.PartitionTransformType;
import org.apache.xtable.model.stat.ColumnStat;
import org.apache.xtable.model.stat.PartitionValue;
import org.apache.xtable.model.stat.Range;
-import org.apache.xtable.model.storage.DataFilesDiff;
import org.apache.xtable.model.storage.FileFormat;
import org.apache.xtable.model.storage.InternalDataFile;
+import org.apache.xtable.model.storage.InternalFilesDiff;
import org.apache.xtable.model.storage.PartitionFileGroup;
import org.apache.xtable.testutil.ColumnStatMapUtil;
@@ -119,8 +119,8 @@ public class TestBaseFileUpdatesExtractor {
String.format("%s/%s/%s", tableBasePath, partitionPath2,
fileName5),
Collections.emptyList());
- DataFilesDiff diff =
- DataFilesDiff.builder()
+ InternalFilesDiff diff =
+ InternalFilesDiff.builder()
.filesAdded(Arrays.asList(addedFile1, addedFile2))
.filesRemoved(Arrays.asList(removedFile1, removedFile2,
removedFile3))
.build();
diff --git
a/xtable-core/src/test/java/org/apache/xtable/hudi/TestHudiConversionTarget.java
b/xtable-core/src/test/java/org/apache/xtable/hudi/TestHudiConversionTarget.java
index da1ec033..d165053f 100644
---
a/xtable-core/src/test/java/org/apache/xtable/hudi/TestHudiConversionTarget.java
+++
b/xtable-core/src/test/java/org/apache/xtable/hudi/TestHudiConversionTarget.java
@@ -49,7 +49,7 @@ import org.apache.xtable.model.schema.InternalPartitionField;
import org.apache.xtable.model.schema.InternalSchema;
import org.apache.xtable.model.schema.InternalType;
import org.apache.xtable.model.schema.PartitionTransformType;
-import org.apache.xtable.model.storage.DataFilesDiff;
+import org.apache.xtable.model.storage.InternalFilesDiff;
import org.apache.xtable.model.storage.PartitionFileGroup;
/**
@@ -212,7 +212,7 @@ public class TestHudiConversionTarget {
HudiConversionTarget.CommitState mockCommitState =
initMocksForBeginSync(targetClient).getLeft();
String instant = "commit";
- DataFilesDiff input = DataFilesDiff.builder().build();
+ InternalFilesDiff input = InternalFilesDiff.builder().build();
BaseFileUpdatesExtractor.ReplaceMetadata output =
BaseFileUpdatesExtractor.ReplaceMetadata.of(
Collections.emptyMap(), Collections.emptyList());
diff --git
a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergConversionSource.java
b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergConversionSource.java
index 4006b543..ab13ae2d 100644
---
a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergConversionSource.java
+++
b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergConversionSource.java
@@ -148,7 +148,7 @@ class TestIcebergConversionSource {
List<PartitionFileGroup> dataFileChunks =
internalSnapshot.getPartitionedDataFiles();
assertEquals(5, dataFileChunks.size());
for (PartitionFileGroup dataFilesChunk : dataFileChunks) {
- List<InternalDataFile> internalDataFiles = dataFilesChunk.getFiles();
+ List<InternalDataFile> internalDataFiles = dataFilesChunk.getDataFiles();
assertEquals(1, internalDataFiles.size());
InternalDataFile internalDataFile = internalDataFiles.get(0);
assertEquals(FileFormat.APACHE_PARQUET,
internalDataFile.getFileFormat());
@@ -315,7 +315,7 @@ class TestIcebergConversionSource {
TableChange tableChange =
conversionSource.getTableChangeForCommit(snapshot);
assertEquals(addedFiles,
tableChange.getFilesDiff().getFilesAdded().size());
assertTrue(
- tableChange.getFilesDiff().getFilesAdded().stream()
+ tableChange.getFilesDiff().dataFilesAdded().stream()
.allMatch(file -> file.getColumnStats().size() ==
numberOfColumns));
assertEquals(removedFiles,
tableChange.getFilesDiff().getFilesRemoved().size());
}