This is an automated email from the ASF dual-hosted git repository. ashvin pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/incubator-xtable.git
The following commit(s) were added to refs/heads/main by this push: new c76692b8 Add base representation of storage files c76692b8 is described below commit c76692b85a646adb449a263652747c1c87de37b8 Author: Ashvin Agrawal <ash...@apache.org> AuthorDate: Sun Feb 16 12:48:22 2025 -0800 Add base representation of storage files --- .../java/org/apache/xtable/model/TableChange.java | 4 +- .../org/apache/xtable/model/storage/FilesDiff.java | 6 +-- .../xtable/model/storage/InternalDataFile.java | 19 ++++---- .../apache/xtable/model/storage/InternalFile.java | 56 ++++++++++++++++++++++ .../{DataFilesDiff.java => InternalFilesDiff.java} | 38 +++++++++++++-- .../xtable/model/storage/PartitionFileGroup.java | 12 ++++- .../apache/xtable/spi/sync/ConversionTarget.java | 6 +-- .../xtable/model/storage/TestDataFilesDiff.java | 8 ++-- .../apache/xtable/model/storage/TestFilesDiff.java | 2 +- .../spi/extractor/TestExtractFromSource.java | 10 ++-- .../xtable/spi/sync/TestTableFormatSync.java | 52 ++++++++++---------- .../apache/xtable/delta/DeltaConversionSource.java | 8 ++-- .../apache/xtable/delta/DeltaConversionTarget.java | 6 +-- .../delta/DeltaDataFileUpdatesExtractor.java | 19 ++++---- .../xtable/hudi/BaseFileUpdatesExtractor.java | 17 +++---- .../apache/xtable/hudi/HudiConversionTarget.java | 6 +-- .../apache/xtable/hudi/HudiDataFileExtractor.java | 6 +-- .../xtable/iceberg/IcebergConversionSource.java | 9 ++-- .../xtable/iceberg/IcebergConversionTarget.java | 6 +-- .../xtable/iceberg/IcebergDataFileUpdatesSync.java | 25 ++++++---- .../org/apache/xtable/ValidationTestHelper.java | 8 ++-- .../xtable/delta/ITDeltaConversionSource.java | 2 +- .../apache/xtable/hudi/ITHudiConversionTarget.java | 14 +++--- .../xtable/hudi/TestBaseFileUpdatesExtractor.java | 6 +-- .../xtable/hudi/TestHudiConversionTarget.java | 4 +- .../iceberg/TestIcebergConversionSource.java | 4 +- 26 files changed, 232 insertions(+), 121 deletions(-) diff --git a/xtable-api/src/main/java/org/apache/xtable/model/TableChange.java b/xtable-api/src/main/java/org/apache/xtable/model/TableChange.java index fe3907ee..b425fd01 100644 --- a/xtable-api/src/main/java/org/apache/xtable/model/TableChange.java +++ b/xtable-api/src/main/java/org/apache/xtable/model/TableChange.java @@ -22,7 +22,7 @@ import lombok.Builder; import lombok.NonNull; import lombok.Value; -import org.apache.xtable.model.storage.DataFilesDiff; +import org.apache.xtable.model.storage.InternalFilesDiff; /** * Captures the changes in a single commit/instant from the source table. @@ -33,7 +33,7 @@ import org.apache.xtable.model.storage.DataFilesDiff; @Builder(toBuilder = true) public class TableChange { // Change in files at the specified instant - DataFilesDiff filesDiff; + InternalFilesDiff filesDiff; /** The {@link InternalTable} at the commit time to which this table change belongs. */ InternalTable tableAsOfChange; diff --git a/xtable-api/src/main/java/org/apache/xtable/model/storage/FilesDiff.java b/xtable-api/src/main/java/org/apache/xtable/model/storage/FilesDiff.java index 7d628ce4..f86e64bc 100644 --- a/xtable-api/src/main/java/org/apache/xtable/model/storage/FilesDiff.java +++ b/xtable-api/src/main/java/org/apache/xtable/model/storage/FilesDiff.java @@ -92,12 +92,12 @@ public class FilesDiff<L, P> { * @param <P> the type of the previous files * @return the set of files that are added */ - public static <P> FilesDiff<InternalDataFile, P> findNewAndRemovedFiles( + public static <P> FilesDiff<InternalFile, P> findNewAndRemovedFiles( List<PartitionFileGroup> latestFileGroups, Map<String, P> previousFiles) { - Map<String, InternalDataFile> latestFiles = + Map<String, InternalFile> latestFiles = latestFileGroups.stream() .flatMap(group -> group.getFiles().stream()) - .collect(Collectors.toMap(InternalDataFile::getPhysicalPath, Function.identity())); + .collect(Collectors.toMap(InternalFile::getPhysicalPath, Function.identity())); return findNewAndRemovedFiles(latestFiles, previousFiles); } } diff --git a/xtable-api/src/main/java/org/apache/xtable/model/storage/InternalDataFile.java b/xtable-api/src/main/java/org/apache/xtable/model/storage/InternalDataFile.java index 3aee766e..0dcb7359 100644 --- a/xtable-api/src/main/java/org/apache/xtable/model/storage/InternalDataFile.java +++ b/xtable-api/src/main/java/org/apache/xtable/model/storage/InternalDataFile.java @@ -22,8 +22,12 @@ import java.util.Collections; import java.util.List; import lombok.Builder; +import lombok.EqualsAndHashCode; +import lombok.Getter; import lombok.NonNull; -import lombok.Value; +import lombok.ToString; +import lombok.experimental.FieldDefaults; +import lombok.experimental.SuperBuilder; import org.apache.xtable.model.stat.ColumnStat; import org.apache.xtable.model.stat.PartitionValue; @@ -33,18 +37,17 @@ import org.apache.xtable.model.stat.PartitionValue; * * @since 0.1 */ -@Builder(toBuilder = true) -@Value -public class InternalDataFile { - // physical path of the file (absolute with scheme) - @NonNull String physicalPath; +@SuperBuilder(toBuilder = true) +@FieldDefaults(makeFinal = true, level = lombok.AccessLevel.PRIVATE) +@ToString(callSuper = true) +@EqualsAndHashCode(callSuper = true) +@Getter +public class InternalDataFile extends InternalFile { // file format @Builder.Default @NonNull FileFormat fileFormat = FileFormat.APACHE_PARQUET; // partition ranges for the data file @Builder.Default @NonNull List<PartitionValue> partitionValues = Collections.emptyList(); - long fileSizeBytes; - long recordCount; // column stats for each column in the data file @Builder.Default @NonNull List<ColumnStat> columnStats = Collections.emptyList(); // last modified time in millis since epoch diff --git a/xtable-api/src/main/java/org/apache/xtable/model/storage/InternalFile.java b/xtable-api/src/main/java/org/apache/xtable/model/storage/InternalFile.java new file mode 100644 index 00000000..5a898649 --- /dev/null +++ b/xtable-api/src/main/java/org/apache/xtable/model/storage/InternalFile.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.model.storage; + +import lombok.AllArgsConstructor; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.NonNull; +import lombok.ToString; +import lombok.experimental.FieldDefaults; +import lombok.experimental.SuperBuilder; + +/** + * This class is an internal representation of a logical storage file of a table and is the base + * class of all types of storage files. The most common type of storage file is a data-file, which + * contains the actual records of a table. Other examples of a storage file are positional delete + * files (containing ordinals of the records to be deleted from a data file), stat files, and index + * files. For completeness of the conversion process, XTable needs to recognize current storage + * files of a table, and also the storage files that are added and removed over time as the state of + * the table changes. Different table formats support different storage file types. This base file + * representation is generic and extensible and is needed to design stable interfaces. + */ +@SuperBuilder(toBuilder = true) +@FieldDefaults(makeFinal = true, level = lombok.AccessLevel.PRIVATE) +@ToString(callSuper = true) +@EqualsAndHashCode +@AllArgsConstructor +@Getter +public abstract class InternalFile { + // Absolute path of the storage file, with the scheme, that contains this logical file. Typically, + // one physical storage file contains only one base file, for e.g. a parquet data file. However, + // in some cases, one storage file can contain multiple logical storage files for optimizations. + @NonNull String physicalPath; + + // The size of the logical file in the physical storage file. + long fileSizeBytes; + + // The number of records in the storage file. + long recordCount; +} diff --git a/xtable-api/src/main/java/org/apache/xtable/model/storage/DataFilesDiff.java b/xtable-api/src/main/java/org/apache/xtable/model/storage/InternalFilesDiff.java similarity index 62% rename from xtable-api/src/main/java/org/apache/xtable/model/storage/DataFilesDiff.java rename to xtable-api/src/main/java/org/apache/xtable/model/storage/InternalFilesDiff.java index 9025ec31..2cb4ea9d 100644 --- a/xtable-api/src/main/java/org/apache/xtable/model/storage/DataFilesDiff.java +++ b/xtable-api/src/main/java/org/apache/xtable/model/storage/InternalFilesDiff.java @@ -20,6 +20,7 @@ package org.apache.xtable.model.storage; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; @@ -31,17 +32,18 @@ import lombok.experimental.SuperBuilder; @Value @EqualsAndHashCode(callSuper = true) @SuperBuilder -public class DataFilesDiff extends FilesDiff<InternalDataFile, InternalDataFile> { +public class InternalFilesDiff extends FilesDiff<InternalFile, InternalFile> { /** - * Creates a DataFilesDiff from the list of files in the target table and the list of files in the - * source table. + * Creates a InternalFilesDiff from the list of files in the target table and the list of files in + * the source table. * * @param source list of files currently in the source table * @param target list of files currently in the target table * @return files that need to be added and removed for the target table match the source table */ - public static DataFilesDiff from(List<InternalDataFile> source, List<InternalDataFile> target) { + public static InternalFilesDiff from( + List<InternalDataFile> source, List<InternalDataFile> target) { Map<String, InternalDataFile> targetPaths = target.stream() .collect(Collectors.toMap(InternalDataFile::getPhysicalPath, Function.identity())); @@ -51,9 +53,35 @@ public class DataFilesDiff extends FilesDiff<InternalDataFile, InternalDataFile> FilesDiff<InternalDataFile, InternalDataFile> diff = findNewAndRemovedFiles(sourcePaths, targetPaths); - return DataFilesDiff.builder() + return InternalFilesDiff.builder() .filesAdded(diff.getFilesAdded()) .filesRemoved(diff.getFilesRemoved()) .build(); } + + /** + * Filters files of type {@link InternalDataFile} from the list of files added to the source table + * and returns the list. + */ + public Set<InternalDataFile> dataFilesAdded() { + Set<InternalDataFile> result = + getFilesAdded().stream() + .filter(InternalDataFile.class::isInstance) + .map(file -> (InternalDataFile) file) + .collect(Collectors.toSet()); + return result; + } + + /** + * Filters files of type {@link InternalDataFile} from the list of files removed to the source + * table and returns the list. + */ + public Set<InternalDataFile> dataFilesRemoved() { + Set<InternalDataFile> result = + getFilesRemoved().stream() + .filter(InternalDataFile.class::isInstance) + .map(file -> (InternalDataFile) file) + .collect(Collectors.toSet()); + return result; + } } diff --git a/xtable-api/src/main/java/org/apache/xtable/model/storage/PartitionFileGroup.java b/xtable-api/src/main/java/org/apache/xtable/model/storage/PartitionFileGroup.java index 500fa1b5..e7eadfd5 100644 --- a/xtable-api/src/main/java/org/apache/xtable/model/storage/PartitionFileGroup.java +++ b/xtable-api/src/main/java/org/apache/xtable/model/storage/PartitionFileGroup.java @@ -28,12 +28,12 @@ import lombok.Value; import org.apache.xtable.model.stat.PartitionValue; -/** Represents a grouping of {@link InternalDataFile} with the same partition values. */ +/** Represents a grouping of {@link InternalFile} with the same partition values. */ @Value @Builder public class PartitionFileGroup { List<PartitionValue> partitionValues; - List<InternalDataFile> files; + List<? extends InternalFile> files; public static List<PartitionFileGroup> fromFiles(List<InternalDataFile> files) { return fromFiles(files.stream()); @@ -51,4 +51,12 @@ public class PartitionFileGroup { .build()) .collect(Collectors.toList()); } + + /** Filters storage files of type {@link InternalDataFile} and returns them. */ + public List<InternalDataFile> getDataFiles() { + return files.stream() + .filter(InternalDataFile.class::isInstance) + .map(file -> (InternalDataFile) file) + .collect(Collectors.toList()); + } } diff --git a/xtable-api/src/main/java/org/apache/xtable/spi/sync/ConversionTarget.java b/xtable-api/src/main/java/org/apache/xtable/spi/sync/ConversionTarget.java index 476578cb..b763be8b 100644 --- a/xtable-api/src/main/java/org/apache/xtable/spi/sync/ConversionTarget.java +++ b/xtable-api/src/main/java/org/apache/xtable/spi/sync/ConversionTarget.java @@ -28,7 +28,7 @@ import org.apache.xtable.model.InternalTable; import org.apache.xtable.model.metadata.TableSyncMetadata; import org.apache.xtable.model.schema.InternalPartitionField; import org.apache.xtable.model.schema.InternalSchema; -import org.apache.xtable.model.storage.DataFilesDiff; +import org.apache.xtable.model.storage.InternalFilesDiff; import org.apache.xtable.model.storage.PartitionFileGroup; /** A client that provides the major functionality for syncing changes to a target system. */ @@ -68,9 +68,9 @@ public interface ConversionTarget { * Syncs the changes in files to the target system. This method is required to both add and remove * files. * - * @param dataFilesDiff the diff that needs to be synced + * @param internalFilesDiff the diff that needs to be synced */ - void syncFilesForDiff(DataFilesDiff dataFilesDiff); + void syncFilesForDiff(InternalFilesDiff internalFilesDiff); /** * Starts the sync and performs any initialization required diff --git a/xtable-api/src/test/java/org/apache/xtable/model/storage/TestDataFilesDiff.java b/xtable-api/src/test/java/org/apache/xtable/model/storage/TestDataFilesDiff.java index 6896dce8..0e780bb3 100644 --- a/xtable-api/src/test/java/org/apache/xtable/model/storage/TestDataFilesDiff.java +++ b/xtable-api/src/test/java/org/apache/xtable/model/storage/TestDataFilesDiff.java @@ -37,13 +37,13 @@ public class TestDataFilesDiff { InternalDataFile.builder().physicalPath("file://already_in_target2.parquet").build(); InternalDataFile sourceFileInTargetAlready = InternalDataFile.builder().physicalPath("file://already_in_target3.parquet").build(); - DataFilesDiff actual = - DataFilesDiff.from( + InternalFilesDiff actual = + InternalFilesDiff.from( Arrays.asList(sourceFile1, sourceFile2, sourceFileInTargetAlready), Arrays.asList(targetFile1, targetFile2, sourceFileInTargetAlready)); - DataFilesDiff expected = - DataFilesDiff.builder() + InternalFilesDiff expected = + InternalFilesDiff.builder() .filesAdded(Arrays.asList(sourceFile1, sourceFile2)) .filesRemoved(Arrays.asList(targetFile1, targetFile2)) .build(); diff --git a/xtable-api/src/test/java/org/apache/xtable/model/storage/TestFilesDiff.java b/xtable-api/src/test/java/org/apache/xtable/model/storage/TestFilesDiff.java index 32e33988..1776c345 100644 --- a/xtable-api/src/test/java/org/apache/xtable/model/storage/TestFilesDiff.java +++ b/xtable-api/src/test/java/org/apache/xtable/model/storage/TestFilesDiff.java @@ -50,7 +50,7 @@ public class TestFilesDiff { previousFiles.put("file2NoGroup", file2); previousFiles.put("file2Group2", file3); - FilesDiff<InternalDataFile, File> diff = + FilesDiff<InternalFile, File> diff = FilesDiff.findNewAndRemovedFiles(latestFileGroups, previousFiles); assertEquals(2, diff.getFilesAdded().size()); assertTrue(diff.getFilesAdded().contains(file1Group2)); diff --git a/xtable-api/src/test/java/org/apache/xtable/spi/extractor/TestExtractFromSource.java b/xtable-api/src/test/java/org/apache/xtable/spi/extractor/TestExtractFromSource.java index 5c7f8cfc..9271acf3 100644 --- a/xtable-api/src/test/java/org/apache/xtable/spi/extractor/TestExtractFromSource.java +++ b/xtable-api/src/test/java/org/apache/xtable/spi/extractor/TestExtractFromSource.java @@ -41,8 +41,8 @@ import org.apache.xtable.model.InstantsForIncrementalSync; import org.apache.xtable.model.InternalSnapshot; import org.apache.xtable.model.InternalTable; import org.apache.xtable.model.TableChange; -import org.apache.xtable.model.storage.DataFilesDiff; import org.apache.xtable.model.storage.InternalDataFile; +import org.apache.xtable.model.storage.InternalFilesDiff; import org.apache.xtable.model.storage.PartitionFileGroup; public class TestExtractFromSource { @@ -89,7 +89,7 @@ public class TestExtractFromSource { TableChange.builder() .tableAsOfChange(tableAtFirstInstant) .filesDiff( - DataFilesDiff.builder().fileAdded(newFile1).fileRemoved(initialFile2).build()) + InternalFilesDiff.builder().fileAdded(newFile1).fileRemoved(initialFile2).build()) .sourceIdentifier("0") .build(); when(mockConversionSource.getTableChangeForCommit(firstCommitToSync)) @@ -98,7 +98,7 @@ public class TestExtractFromSource { TableChange.builder() .tableAsOfChange(tableAtFirstInstant) .filesDiff( - DataFilesDiff.builder().fileAdded(newFile1).fileRemoved(initialFile2).build()) + InternalFilesDiff.builder().fileAdded(newFile1).fileRemoved(initialFile2).build()) .sourceIdentifier("0") .build(); @@ -112,7 +112,7 @@ public class TestExtractFromSource { TableChange.builder() .tableAsOfChange(tableAtSecondInstant) .filesDiff( - DataFilesDiff.builder() + InternalFilesDiff.builder() .filesAdded(Arrays.asList(newFile2, newFile3)) .filesRemoved(Arrays.asList(initialFile3, newFile1)) .build()) @@ -124,7 +124,7 @@ public class TestExtractFromSource { TableChange.builder() .tableAsOfChange(tableAtSecondInstant) .filesDiff( - DataFilesDiff.builder() + InternalFilesDiff.builder() .filesAdded(Arrays.asList(newFile2, newFile3)) .filesRemoved(Arrays.asList(initialFile3, newFile1)) .build()) diff --git a/xtable-api/src/test/java/org/apache/xtable/spi/sync/TestTableFormatSync.java b/xtable-api/src/test/java/org/apache/xtable/spi/sync/TestTableFormatSync.java index 852d4e13..e15e3301 100644 --- a/xtable-api/src/test/java/org/apache/xtable/spi/sync/TestTableFormatSync.java +++ b/xtable-api/src/test/java/org/apache/xtable/spi/sync/TestTableFormatSync.java @@ -48,8 +48,8 @@ import org.apache.xtable.model.schema.InternalField; import org.apache.xtable.model.schema.InternalPartitionField; import org.apache.xtable.model.schema.InternalSchema; import org.apache.xtable.model.schema.PartitionTransformType; -import org.apache.xtable.model.storage.DataFilesDiff; import org.apache.xtable.model.storage.InternalDataFile; +import org.apache.xtable.model.storage.InternalFilesDiff; import org.apache.xtable.model.storage.PartitionFileGroup; import org.apache.xtable.model.storage.TableFormat; import org.apache.xtable.model.sync.SyncMode; @@ -129,27 +129,27 @@ public class TestTableFormatSync { void syncChangesWithFailureForOneFormat() { Instant start = Instant.now(); InternalTable tableState1 = getTableState(1); - DataFilesDiff dataFilesDiff1 = getFilesDiff(1); + InternalFilesDiff internalFilesDiff1 = getFilesDiff(1); TableChange tableChange1 = TableChange.builder() .tableAsOfChange(tableState1) - .filesDiff(dataFilesDiff1) + .filesDiff(internalFilesDiff1) .sourceIdentifier("0") .build(); InternalTable tableState2 = getTableState(2); - DataFilesDiff dataFilesDiff2 = getFilesDiff(2); + InternalFilesDiff internalFilesDiff2 = getFilesDiff(2); TableChange tableChange2 = TableChange.builder() .tableAsOfChange(tableState2) - .filesDiff(dataFilesDiff2) + .filesDiff(internalFilesDiff2) .sourceIdentifier("1") .build(); InternalTable tableState3 = getTableState(3); - DataFilesDiff dataFilesDiff3 = getFilesDiff(3); + InternalFilesDiff internalFilesDiff3 = getFilesDiff(3); TableChange tableChange3 = TableChange.builder() .tableAsOfChange(tableState3) - .filesDiff(dataFilesDiff3) + .filesDiff(internalFilesDiff3) .sourceIdentifier("2") .build(); @@ -224,25 +224,25 @@ public class TestTableFormatSync { tableState1, pendingCommitInstants, tableChange1.getSourceIdentifier()); - verify(mockConversionTarget1).syncFilesForDiff(dataFilesDiff1); + verify(mockConversionTarget1).syncFilesForDiff(internalFilesDiff1); verifyBaseConversionTargetCalls( mockConversionTarget2, tableState1, pendingCommitInstants, tableChange1.getSourceIdentifier()); - verify(mockConversionTarget2).syncFilesForDiff(dataFilesDiff1); + verify(mockConversionTarget2).syncFilesForDiff(internalFilesDiff1); verifyBaseConversionTargetCalls( mockConversionTarget2, tableState2, pendingCommitInstants, tableChange2.getSourceIdentifier()); - verify(mockConversionTarget2).syncFilesForDiff(dataFilesDiff2); + verify(mockConversionTarget2).syncFilesForDiff(internalFilesDiff2); verifyBaseConversionTargetCalls( mockConversionTarget2, tableState3, pendingCommitInstants, tableChange3.getSourceIdentifier()); - verify(mockConversionTarget2).syncFilesForDiff(dataFilesDiff3); + verify(mockConversionTarget2).syncFilesForDiff(internalFilesDiff3); verify(mockConversionTarget1, times(1)).completeSync(); verify(mockConversionTarget2, times(3)).completeSync(); } @@ -251,27 +251,27 @@ public class TestTableFormatSync { void syncChangesWithDifferentFormatsAndMetadata() { Instant start = Instant.now(); InternalTable tableState1 = getTableState(1); - DataFilesDiff dataFilesDiff1 = getFilesDiff(1); + InternalFilesDiff internalFilesDiff1 = getFilesDiff(1); TableChange tableChange1 = TableChange.builder() .tableAsOfChange(tableState1) - .filesDiff(dataFilesDiff1) + .filesDiff(internalFilesDiff1) .sourceIdentifier("0") .build(); InternalTable tableState2 = getTableState(2); - DataFilesDiff dataFilesDiff2 = getFilesDiff(2); + InternalFilesDiff internalFilesDiff2 = getFilesDiff(2); TableChange tableChange2 = TableChange.builder() .tableAsOfChange(tableState2) - .filesDiff(dataFilesDiff2) + .filesDiff(internalFilesDiff2) .sourceIdentifier("1") .build(); InternalTable tableState3 = getTableState(3); - DataFilesDiff dataFilesDiff3 = getFilesDiff(3); + InternalFilesDiff internalFilesDiff3 = getFilesDiff(3); TableChange tableChange3 = TableChange.builder() .tableAsOfChange(tableState3) - .filesDiff(dataFilesDiff3) + .filesDiff(internalFilesDiff3) .sourceIdentifier("2") .build(); @@ -346,13 +346,13 @@ public class TestTableFormatSync { tableState1, pendingCommitInstants, tableChange1.getSourceIdentifier()); - verify(mockConversionTarget1).syncFilesForDiff(dataFilesDiff1); + verify(mockConversionTarget1).syncFilesForDiff(internalFilesDiff1); verifyBaseConversionTargetCalls( mockConversionTarget1, tableState3, pendingCommitInstants, tableChange3.getSourceIdentifier()); - verify(mockConversionTarget1).syncFilesForDiff(dataFilesDiff3); + verify(mockConversionTarget1).syncFilesForDiff(internalFilesDiff3); verify(mockConversionTarget1, times(2)).completeSync(); // conversionTarget2 syncs table changes 2 and 3 verifyBaseConversionTargetCalls( @@ -360,13 +360,13 @@ public class TestTableFormatSync { tableState2, pendingCommitInstants, tableChange2.getSourceIdentifier()); - verify(mockConversionTarget2).syncFilesForDiff(dataFilesDiff2); + verify(mockConversionTarget2).syncFilesForDiff(internalFilesDiff2); verifyBaseConversionTargetCalls( mockConversionTarget2, tableState3, pendingCommitInstants, tableChange3.getSourceIdentifier()); - verify(mockConversionTarget2).syncFilesForDiff(dataFilesDiff3); + verify(mockConversionTarget2).syncFilesForDiff(internalFilesDiff3); verify(mockConversionTarget2, times(2)).completeSync(); } @@ -374,11 +374,11 @@ public class TestTableFormatSync { void syncChangesOneFormatWithNoRequiredChanges() { Instant start = Instant.now(); InternalTable tableState1 = getTableState(1); - DataFilesDiff dataFilesDiff1 = getFilesDiff(1); + InternalFilesDiff internalFilesDiff1 = getFilesDiff(1); TableChange tableChange1 = TableChange.builder() .tableAsOfChange(tableState1) - .filesDiff(dataFilesDiff1) + .filesDiff(internalFilesDiff1) .sourceIdentifier("0") .build(); @@ -426,7 +426,7 @@ public class TestTableFormatSync { tableState1, pendingCommitInstants, tableChange1.getSourceIdentifier()); - verify(mockConversionTarget2).syncFilesForDiff(dataFilesDiff1); + verify(mockConversionTarget2).syncFilesForDiff(internalFilesDiff1); } /** @@ -453,8 +453,8 @@ public class TestTableFormatSync { .build(); } - private DataFilesDiff getFilesDiff(int id) { - return DataFilesDiff.builder() + private InternalFilesDiff getFilesDiff(int id) { + return InternalFilesDiff.builder() .filesAdded( Collections.singletonList( InternalDataFile.builder() diff --git a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java index de4a2b69..97804d5f 100644 --- a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java +++ b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java @@ -51,9 +51,9 @@ import org.apache.xtable.model.InternalSnapshot; import org.apache.xtable.model.InternalTable; import org.apache.xtable.model.TableChange; import org.apache.xtable.model.schema.InternalSchema; -import org.apache.xtable.model.storage.DataFilesDiff; import org.apache.xtable.model.storage.FileFormat; import org.apache.xtable.model.storage.InternalDataFile; +import org.apache.xtable.model.storage.InternalFilesDiff; import org.apache.xtable.model.storage.PartitionFileGroup; import org.apache.xtable.spi.extractor.ConversionSource; import org.apache.xtable.spi.extractor.DataFileIterator; @@ -163,14 +163,14 @@ public class DeltaConversionSource implements ConversionSource<Long> { } } - DataFilesDiff dataFilesDiff = - DataFilesDiff.builder() + InternalFilesDiff internalFilesDiff = + InternalFilesDiff.builder() .filesAdded(addedFiles.values()) .filesRemoved(removedFiles.values()) .build(); return TableChange.builder() .tableAsOfChange(tableAtVersion) - .filesDiff(dataFilesDiff) + .filesDiff(internalFilesDiff) .sourceIdentifier(getCommitIdentifier(versionNumber)) .build(); } diff --git a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionTarget.java b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionTarget.java index fbe99801..b487e2cb 100644 --- a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionTarget.java +++ b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionTarget.java @@ -65,7 +65,7 @@ import org.apache.xtable.model.InternalTable; import org.apache.xtable.model.metadata.TableSyncMetadata; import org.apache.xtable.model.schema.InternalPartitionField; import org.apache.xtable.model.schema.InternalSchema; -import org.apache.xtable.model.storage.DataFilesDiff; +import org.apache.xtable.model.storage.InternalFilesDiff; import org.apache.xtable.model.storage.PartitionFileGroup; import org.apache.xtable.model.storage.TableFormat; import org.apache.xtable.schema.SparkSchemaExtractor; @@ -193,10 +193,10 @@ public class DeltaConversionTarget implements ConversionTarget { } @Override - public void syncFilesForDiff(DataFilesDiff dataFilesDiff) { + public void syncFilesForDiff(InternalFilesDiff internalFilesDiff) { transactionState.setActions( dataFileUpdatesExtractor.applyDiff( - dataFilesDiff, + internalFilesDiff, transactionState.getLatestSchemaInternal(), deltaLog.dataPath().toString())); } diff --git a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaDataFileUpdatesExtractor.java b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaDataFileUpdatesExtractor.java index 96776750..caee22f6 100644 --- a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaDataFileUpdatesExtractor.java +++ b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaDataFileUpdatesExtractor.java @@ -39,9 +39,10 @@ import com.fasterxml.jackson.core.JsonProcessingException; import org.apache.xtable.collectors.CustomCollectors; import org.apache.xtable.model.schema.InternalSchema; import org.apache.xtable.model.stat.ColumnStat; -import org.apache.xtable.model.storage.DataFilesDiff; import org.apache.xtable.model.storage.FilesDiff; import org.apache.xtable.model.storage.InternalDataFile; +import org.apache.xtable.model.storage.InternalFile; +import org.apache.xtable.model.storage.InternalFilesDiff; import org.apache.xtable.model.storage.PartitionFileGroup; import org.apache.xtable.paths.PathUtils; @@ -74,30 +75,32 @@ public class DeltaDataFileUpdatesExtractor { file -> DeltaActionsConverter.getFullPathToFile(snapshot, file.path()), file -> file)); - FilesDiff<InternalDataFile, Action> diff = - DataFilesDiff.findNewAndRemovedFiles(partitionedDataFiles, previousFiles); + FilesDiff<InternalFile, Action> diff = + InternalFilesDiff.findNewAndRemovedFiles(partitionedDataFiles, previousFiles); return applyDiff( diff.getFilesAdded(), diff.getFilesRemoved(), tableSchema, deltaLog.dataPath().toString()); } public Seq<Action> applyDiff( - DataFilesDiff dataFilesDiff, InternalSchema tableSchema, String tableBasePath) { + InternalFilesDiff internalFilesDiff, InternalSchema tableSchema, String tableBasePath) { List<Action> removeActions = - dataFilesDiff.getFilesRemoved().stream() + internalFilesDiff.dataFilesRemoved().stream() .flatMap(dFile -> createAddFileAction(dFile, tableSchema, tableBasePath)) .map(AddFile::remove) - .collect(CustomCollectors.toList(dataFilesDiff.getFilesRemoved().size())); - return applyDiff(dataFilesDiff.getFilesAdded(), removeActions, tableSchema, tableBasePath); + .collect(CustomCollectors.toList(internalFilesDiff.dataFilesRemoved().size())); + return applyDiff(internalFilesDiff.dataFilesAdded(), removeActions, tableSchema, tableBasePath); } private Seq<Action> applyDiff( - Set<InternalDataFile> filesAdded, + Set<? extends InternalFile> filesAdded, Collection<Action> removeFileActions, InternalSchema tableSchema, String tableBasePath) { Stream<Action> addActions = filesAdded.stream() + .filter(InternalDataFile.class::isInstance) + .map(file -> (InternalDataFile) file) .flatMap(dFile -> createAddFileAction(dFile, tableSchema, tableBasePath)); int totalActions = filesAdded.size() + removeFileActions.size(); List<Action> allActions = diff --git a/xtable-core/src/main/java/org/apache/xtable/hudi/BaseFileUpdatesExtractor.java b/xtable-core/src/main/java/org/apache/xtable/hudi/BaseFileUpdatesExtractor.java index bef00013..ba1f9f9d 100644 --- a/xtable-core/src/main/java/org/apache/xtable/hudi/BaseFileUpdatesExtractor.java +++ b/xtable-core/src/main/java/org/apache/xtable/hudi/BaseFileUpdatesExtractor.java @@ -54,8 +54,8 @@ import org.apache.hudi.metadata.HoodieMetadataFileSystemView; import org.apache.xtable.collectors.CustomCollectors; import org.apache.xtable.model.schema.InternalType; import org.apache.xtable.model.stat.ColumnStat; -import org.apache.xtable.model.storage.DataFilesDiff; import org.apache.xtable.model.storage.InternalDataFile; +import org.apache.xtable.model.storage.InternalFilesDiff; import org.apache.xtable.model.storage.PartitionFileGroup; @AllArgsConstructor(staticName = "of") @@ -96,7 +96,7 @@ public class BaseFileUpdatesExtractor { partitionedDataFiles.stream() .map( partitionFileGroup -> { - List<InternalDataFile> dataFiles = partitionFileGroup.getFiles(); + List<InternalDataFile> dataFiles = partitionFileGroup.getDataFiles(); String partitionPath = getPartitionPath(tableBasePath, dataFiles); // remove the partition from the set of partitions to drop since it is present in // the snapshot @@ -163,16 +163,17 @@ public class BaseFileUpdatesExtractor { } /** - * Converts the provided {@link DataFilesDiff}. + * Converts the provided {@link InternalFilesDiff}. * - * @param dataFilesDiff the diff to apply to the Hudi table + * @param internalFilesDiff the diff to apply to the Hudi table * @param commit The current commit started by the Hudi client * @return The information needed to create a "replace" commit for the Hudi table */ - ReplaceMetadata convertDiff(@NonNull DataFilesDiff dataFilesDiff, @NonNull String commit) { + ReplaceMetadata convertDiff( + @NonNull InternalFilesDiff internalFilesDiff, @NonNull String commit) { // For all removed files, group by partition and extract the file id Map<String, List<String>> partitionToReplacedFileIds = - dataFilesDiff.getFilesRemoved().stream() + internalFilesDiff.dataFilesRemoved().stream() .map(file -> new CachingPath(file.getPhysicalPath())) .collect( Collectors.groupingBy( @@ -180,9 +181,9 @@ public class BaseFileUpdatesExtractor { Collectors.mapping(this::getFileId, Collectors.toList()))); // For all added files, group by partition and extract the file id List<WriteStatus> writeStatuses = - dataFilesDiff.getFilesAdded().stream() + internalFilesDiff.dataFilesAdded().stream() .map(file -> toWriteStatus(tableBasePath, commit, file, Optional.empty())) - .collect(CustomCollectors.toList(dataFilesDiff.getFilesAdded().size())); + .collect(CustomCollectors.toList(internalFilesDiff.dataFilesAdded().size())); return ReplaceMetadata.of(partitionToReplacedFileIds, writeStatuses); } diff --git a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionTarget.java b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionTarget.java index 97c2f9e3..b8aad22d 100644 --- a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionTarget.java +++ b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionTarget.java @@ -87,7 +87,7 @@ import org.apache.xtable.model.metadata.TableSyncMetadata; import org.apache.xtable.model.schema.InternalField; import org.apache.xtable.model.schema.InternalPartitionField; import org.apache.xtable.model.schema.InternalSchema; -import org.apache.xtable.model.storage.DataFilesDiff; +import org.apache.xtable.model.storage.InternalFilesDiff; import org.apache.xtable.model.storage.PartitionFileGroup; import org.apache.xtable.model.storage.TableFormat; import org.apache.xtable.spi.sync.ConversionTarget; @@ -251,9 +251,9 @@ public class HudiConversionTarget implements ConversionTarget { } @Override - public void syncFilesForDiff(DataFilesDiff dataFilesDiff) { + public void syncFilesForDiff(InternalFilesDiff internalFilesDiff) { BaseFileUpdatesExtractor.ReplaceMetadata replaceMetadata = - baseFileUpdatesExtractor.convertDiff(dataFilesDiff, commitState.getInstantTime()); + baseFileUpdatesExtractor.convertDiff(internalFilesDiff, commitState.getInstantTime()); commitState.setReplaceMetadata(replaceMetadata); } diff --git a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiDataFileExtractor.java b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiDataFileExtractor.java index 5739328c..77c0ca98 100644 --- a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiDataFileExtractor.java +++ b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiDataFileExtractor.java @@ -65,9 +65,9 @@ import org.apache.xtable.model.InternalTable; import org.apache.xtable.model.exception.ParseException; import org.apache.xtable.model.schema.InternalPartitionField; import org.apache.xtable.model.stat.PartitionValue; -import org.apache.xtable.model.storage.DataFilesDiff; import org.apache.xtable.model.storage.FileFormat; import org.apache.xtable.model.storage.InternalDataFile; +import org.apache.xtable.model.storage.InternalFilesDiff; import org.apache.xtable.model.storage.PartitionFileGroup; /** Extracts all the files for Hudi table represented by {@link InternalTable}. */ @@ -122,7 +122,7 @@ public class HudiDataFileExtractor implements AutoCloseable { } } - public DataFilesDiff getDiffForCommit( + public InternalFilesDiff getDiffForCommit( HoodieInstant hoodieInstantForDiff, InternalTable table, HoodieInstant instant, @@ -139,7 +139,7 @@ public class HudiDataFileExtractor implements AutoCloseable { .collect(Collectors.toList()); List<InternalDataFile> filesRemoved = allInfo.getRemoved(); - return DataFilesDiff.builder().filesAdded(filesAdded).filesRemoved(filesRemoved).build(); + return InternalFilesDiff.builder().filesAdded(filesAdded).filesRemoved(filesRemoved).build(); } private AddedAndRemovedFiles getAddedAndRemovedPartitionInfo( diff --git a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java index cb75b6d0..5f210f1f 100644 --- a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java +++ b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java @@ -58,9 +58,9 @@ import org.apache.xtable.model.TableChange; import org.apache.xtable.model.schema.InternalPartitionField; import org.apache.xtable.model.schema.InternalSchema; import org.apache.xtable.model.stat.PartitionValue; -import org.apache.xtable.model.storage.DataFilesDiff; import org.apache.xtable.model.storage.DataLayoutStrategy; import org.apache.xtable.model.storage.InternalDataFile; +import org.apache.xtable.model.storage.InternalFilesDiff; import org.apache.xtable.model.storage.PartitionFileGroup; import org.apache.xtable.model.storage.TableFormat; import org.apache.xtable.spi.extractor.ConversionSource; @@ -193,8 +193,11 @@ public class IcebergConversionSource implements ConversionSource<Snapshot> { .map(dataFile -> fromIceberg(dataFile, partitionSpec, irTable)) .collect(Collectors.toSet()); - DataFilesDiff filesDiff = - DataFilesDiff.builder().filesAdded(dataFilesAdded).filesRemoved(dataFilesRemoved).build(); + InternalFilesDiff filesDiff = + InternalFilesDiff.builder() + .filesAdded(dataFilesAdded) + .filesRemoved(dataFilesRemoved) + .build(); InternalTable table = getTable(snapshot); return TableChange.builder() diff --git a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java index 69faf1ea..a57ac4f6 100644 --- a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java +++ b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java @@ -45,7 +45,7 @@ import org.apache.xtable.model.InternalTable; import org.apache.xtable.model.metadata.TableSyncMetadata; import org.apache.xtable.model.schema.InternalPartitionField; import org.apache.xtable.model.schema.InternalSchema; -import org.apache.xtable.model.storage.DataFilesDiff; +import org.apache.xtable.model.storage.InternalFilesDiff; import org.apache.xtable.model.storage.PartitionFileGroup; import org.apache.xtable.model.storage.TableFormat; import org.apache.xtable.spi.sync.ConversionTarget; @@ -209,10 +209,10 @@ public class IcebergConversionTarget implements ConversionTarget { } @Override - public void syncFilesForDiff(DataFilesDiff dataFilesDiff) { + public void syncFilesForDiff(InternalFilesDiff internalFilesDiff) { dataFileUpdatesExtractor.applyDiff( transaction, - dataFilesDiff, + internalFilesDiff, transaction.table().schema(), transaction.table().spec(), tableSyncMetadata); diff --git a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergDataFileUpdatesSync.java b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergDataFileUpdatesSync.java index 0c8fa9a4..77a4eeed 100644 --- a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergDataFileUpdatesSync.java +++ b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergDataFileUpdatesSync.java @@ -31,9 +31,10 @@ import org.apache.xtable.exception.NotSupportedException; import org.apache.xtable.exception.ReadException; import org.apache.xtable.model.InternalTable; import org.apache.xtable.model.metadata.TableSyncMetadata; -import org.apache.xtable.model.storage.DataFilesDiff; import org.apache.xtable.model.storage.FilesDiff; import org.apache.xtable.model.storage.InternalDataFile; +import org.apache.xtable.model.storage.InternalFile; +import org.apache.xtable.model.storage.InternalFilesDiff; import org.apache.xtable.model.storage.PartitionFileGroup; @AllArgsConstructor(staticName = "of") @@ -59,8 +60,8 @@ public class IcebergDataFileUpdatesSync { throw new ReadException("Failed to iterate through Iceberg data files", e); } - FilesDiff<InternalDataFile, DataFile> diff = - DataFilesDiff.findNewAndRemovedFiles(partitionedDataFiles, previousFiles); + FilesDiff<InternalFile, DataFile> diff = + InternalFilesDiff.findNewAndRemovedFiles(partitionedDataFiles, previousFiles); applyDiff( transaction, diff.getFilesAdded(), diff.getFilesRemoved(), schema, partitionSpec, metadata); @@ -68,29 +69,37 @@ public class IcebergDataFileUpdatesSync { public void applyDiff( Transaction transaction, - DataFilesDiff dataFilesDiff, + InternalFilesDiff internalFilesDiff, Schema schema, PartitionSpec partitionSpec, TableSyncMetadata metadata) { Collection<DataFile> filesRemoved = - dataFilesDiff.getFilesRemoved().stream() + internalFilesDiff.dataFilesRemoved().stream() .map(file -> getDataFile(partitionSpec, schema, file)) .collect(Collectors.toList()); applyDiff( - transaction, dataFilesDiff.getFilesAdded(), filesRemoved, schema, partitionSpec, metadata); + transaction, + internalFilesDiff.dataFilesAdded(), + filesRemoved, + schema, + partitionSpec, + metadata); } private void applyDiff( Transaction transaction, - Collection<InternalDataFile> filesAdded, + Collection<? extends InternalFile> filesAdded, Collection<DataFile> filesRemoved, Schema schema, PartitionSpec partitionSpec, TableSyncMetadata metadata) { OverwriteFiles overwriteFiles = transaction.newOverwrite(); - filesAdded.forEach(f -> overwriteFiles.addFile(getDataFile(partitionSpec, schema, f))); + filesAdded.stream() + .filter(InternalDataFile.class::isInstance) + .map(file -> (InternalDataFile) file) + .forEach(f -> overwriteFiles.addFile(getDataFile(partitionSpec, schema, f))); filesRemoved.forEach(overwriteFiles::deleteFile); overwriteFiles.set(TableSyncMetadata.XTABLE_METADATA, metadata.toJson()); overwriteFiles.commit(); diff --git a/xtable-core/src/test/java/org/apache/xtable/ValidationTestHelper.java b/xtable-core/src/test/java/org/apache/xtable/ValidationTestHelper.java index a20ae4e7..9e95f279 100644 --- a/xtable-core/src/test/java/org/apache/xtable/ValidationTestHelper.java +++ b/xtable-core/src/test/java/org/apache/xtable/ValidationTestHelper.java @@ -39,7 +39,7 @@ public class ValidationTestHelper { assertNotNull(internalSnapshot.getTable()); List<String> filePaths = internalSnapshot.getPartitionedDataFiles().stream() - .flatMap(group -> group.getFiles().stream()) + .flatMap(group -> group.getDataFiles().stream()) .map(InternalDataFile::getPhysicalPath) .collect(Collectors.toList()); replaceFileScheme(allActivePaths); @@ -83,14 +83,14 @@ public class ValidationTestHelper { filesForCommitBefore.stream() .filter(file -> !filesForCommitAfter.contains(file)) .collect(Collectors.toSet()); - assertEquals(filesAdded, extractPathsFromDataFile(tableChange.getFilesDiff().getFilesAdded())); + assertEquals(filesAdded, extractPathsFromDataFile(tableChange.getFilesDiff().dataFilesAdded())); assertEquals( - filesRemoved, extractPathsFromDataFile(tableChange.getFilesDiff().getFilesRemoved())); + filesRemoved, extractPathsFromDataFile(tableChange.getFilesDiff().dataFilesRemoved())); } public static List<String> getAllFilePaths(InternalSnapshot internalSnapshot) { return internalSnapshot.getPartitionedDataFiles().stream() - .flatMap(fileGroup -> fileGroup.getFiles().stream()) + .flatMap(fileGroup -> fileGroup.getDataFiles().stream()) .map(InternalDataFile::getPhysicalPath) .collect(Collectors.toList()); } diff --git a/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaConversionSource.java b/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaConversionSource.java index 3b1cc529..4afb6bdb 100644 --- a/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaConversionSource.java +++ b/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaConversionSource.java @@ -704,7 +704,7 @@ public class ITDeltaConversionSource { throws URISyntaxException { assertEquals( expectedPartitionFiles.getPartitionValues(), actualPartitionFiles.getPartitionValues()); - validateDataFiles(expectedPartitionFiles.getFiles(), actualPartitionFiles.getFiles()); + validateDataFiles(expectedPartitionFiles.getDataFiles(), actualPartitionFiles.getDataFiles()); } private void validateDataFiles( diff --git a/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionTarget.java b/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionTarget.java index 82125d25..99965f1f 100644 --- a/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionTarget.java +++ b/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionTarget.java @@ -84,10 +84,10 @@ import org.apache.xtable.model.schema.PartitionTransformType; import org.apache.xtable.model.stat.ColumnStat; import org.apache.xtable.model.stat.PartitionValue; import org.apache.xtable.model.stat.Range; -import org.apache.xtable.model.storage.DataFilesDiff; import org.apache.xtable.model.storage.DataLayoutStrategy; import org.apache.xtable.model.storage.FileFormat; import org.apache.xtable.model.storage.InternalDataFile; +import org.apache.xtable.model.storage.InternalFilesDiff; import org.apache.xtable.model.storage.PartitionFileGroup; import org.apache.xtable.model.storage.TableFormat; import org.apache.xtable.spi.sync.ConversionTarget; @@ -195,8 +195,8 @@ public class ITHudiConversionTarget { String fileName = "file_1.parquet"; String filePath = getFilePath(partitionPath, fileName); - DataFilesDiff dataFilesDiff = - DataFilesDiff.builder() + InternalFilesDiff internalFilesDiff = + InternalFilesDiff.builder() .fileAdded(getTestFile(partitionPath, fileName)) .fileRemoved(fileToRemove) .build(); @@ -204,7 +204,7 @@ public class ITHudiConversionTarget { HudiConversionTarget targetClient = getTargetClient(); InternalTable initialState = getState(Instant.now()); targetClient.beginSync(initialState); - targetClient.syncFilesForDiff(dataFilesDiff); + targetClient.syncFilesForDiff(internalFilesDiff); targetClient.syncSchema(SCHEMA); TableSyncMetadata latestState = TableSyncMetadata.of( @@ -518,11 +518,11 @@ public class ITHudiConversionTarget { List<InternalDataFile> filesToRemove, Instant commitStart, String sourceIdentifier) { - DataFilesDiff dataFilesDiff2 = - DataFilesDiff.builder().filesAdded(filesToAdd).filesRemoved(filesToRemove).build(); + InternalFilesDiff internalFilesDiff2 = + InternalFilesDiff.builder().filesAdded(filesToAdd).filesRemoved(filesToRemove).build(); InternalTable state3 = getState(commitStart); conversionTarget.beginSync(state3); - conversionTarget.syncFilesForDiff(dataFilesDiff2); + conversionTarget.syncFilesForDiff(internalFilesDiff2); TableSyncMetadata latestState = TableSyncMetadata.of( state3.getLatestCommitTime(), Collections.emptyList(), "TEST", sourceIdentifier); diff --git a/xtable-core/src/test/java/org/apache/xtable/hudi/TestBaseFileUpdatesExtractor.java b/xtable-core/src/test/java/org/apache/xtable/hudi/TestBaseFileUpdatesExtractor.java index 8f3b3f7e..4958714c 100644 --- a/xtable-core/src/test/java/org/apache/xtable/hudi/TestBaseFileUpdatesExtractor.java +++ b/xtable-core/src/test/java/org/apache/xtable/hudi/TestBaseFileUpdatesExtractor.java @@ -59,9 +59,9 @@ import org.apache.xtable.model.schema.PartitionTransformType; import org.apache.xtable.model.stat.ColumnStat; import org.apache.xtable.model.stat.PartitionValue; import org.apache.xtable.model.stat.Range; -import org.apache.xtable.model.storage.DataFilesDiff; import org.apache.xtable.model.storage.FileFormat; import org.apache.xtable.model.storage.InternalDataFile; +import org.apache.xtable.model.storage.InternalFilesDiff; import org.apache.xtable.model.storage.PartitionFileGroup; import org.apache.xtable.testutil.ColumnStatMapUtil; @@ -119,8 +119,8 @@ public class TestBaseFileUpdatesExtractor { String.format("%s/%s/%s", tableBasePath, partitionPath2, fileName5), Collections.emptyList()); - DataFilesDiff diff = - DataFilesDiff.builder() + InternalFilesDiff diff = + InternalFilesDiff.builder() .filesAdded(Arrays.asList(addedFile1, addedFile2)) .filesRemoved(Arrays.asList(removedFile1, removedFile2, removedFile3)) .build(); diff --git a/xtable-core/src/test/java/org/apache/xtable/hudi/TestHudiConversionTarget.java b/xtable-core/src/test/java/org/apache/xtable/hudi/TestHudiConversionTarget.java index da1ec033..d165053f 100644 --- a/xtable-core/src/test/java/org/apache/xtable/hudi/TestHudiConversionTarget.java +++ b/xtable-core/src/test/java/org/apache/xtable/hudi/TestHudiConversionTarget.java @@ -49,7 +49,7 @@ import org.apache.xtable.model.schema.InternalPartitionField; import org.apache.xtable.model.schema.InternalSchema; import org.apache.xtable.model.schema.InternalType; import org.apache.xtable.model.schema.PartitionTransformType; -import org.apache.xtable.model.storage.DataFilesDiff; +import org.apache.xtable.model.storage.InternalFilesDiff; import org.apache.xtable.model.storage.PartitionFileGroup; /** @@ -212,7 +212,7 @@ public class TestHudiConversionTarget { HudiConversionTarget.CommitState mockCommitState = initMocksForBeginSync(targetClient).getLeft(); String instant = "commit"; - DataFilesDiff input = DataFilesDiff.builder().build(); + InternalFilesDiff input = InternalFilesDiff.builder().build(); BaseFileUpdatesExtractor.ReplaceMetadata output = BaseFileUpdatesExtractor.ReplaceMetadata.of( Collections.emptyMap(), Collections.emptyList()); diff --git a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergConversionSource.java b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergConversionSource.java index 4006b543..ab13ae2d 100644 --- a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergConversionSource.java +++ b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergConversionSource.java @@ -148,7 +148,7 @@ class TestIcebergConversionSource { List<PartitionFileGroup> dataFileChunks = internalSnapshot.getPartitionedDataFiles(); assertEquals(5, dataFileChunks.size()); for (PartitionFileGroup dataFilesChunk : dataFileChunks) { - List<InternalDataFile> internalDataFiles = dataFilesChunk.getFiles(); + List<InternalDataFile> internalDataFiles = dataFilesChunk.getDataFiles(); assertEquals(1, internalDataFiles.size()); InternalDataFile internalDataFile = internalDataFiles.get(0); assertEquals(FileFormat.APACHE_PARQUET, internalDataFile.getFileFormat()); @@ -315,7 +315,7 @@ class TestIcebergConversionSource { TableChange tableChange = conversionSource.getTableChangeForCommit(snapshot); assertEquals(addedFiles, tableChange.getFilesDiff().getFilesAdded().size()); assertTrue( - tableChange.getFilesDiff().getFilesAdded().stream() + tableChange.getFilesDiff().dataFilesAdded().stream() .allMatch(file -> file.getColumnStats().size() == numberOfColumns)); assertEquals(removedFiles, tableChange.getFilesDiff().getFilesRemoved().size()); }