This is an automated email from the ASF dual-hosted git repository. ashvin pushed a commit to branch 652-generic-internal-file-representation in repository https://gitbox.apache.org/repos/asf/incubator-xtable.git
commit 829ac27e4915b2fc365b3927f027366a38113e06 Author: Ashvin Agrawal <ash...@apache.org> AuthorDate: Sun Feb 16 12:48:22 2025 -0800 Add base representation of storage files --- .../java/org/apache/xtable/model/TableChange.java | 4 +- .../java/org/apache/xtable/model/stat/Range.java | 5 +- .../{TableChange.java => storage/FileType.java} | 34 ++++++----- .../org/apache/xtable/model/storage/FilesDiff.java | 2 +- .../xtable/model/storage/InternalBaseFile.java | 67 ++++++++++++++++++++++ .../xtable/model/storage/InternalDataFile.java | 20 ++++--- .../{DataFilesDiff.java => InternalFilesDiff.java} | 42 +++++++++++--- .../xtable/model/storage/PartitionFileGroup.java | 6 +- .../apache/xtable/spi/sync/ConversionTarget.java | 6 +- .../xtable/model/storage/TestDataFilesDiff.java | 8 +-- .../spi/extractor/TestExtractFromSource.java | 10 ++-- .../xtable/spi/sync/TestTableFormatSync.java | 52 ++++++++--------- .../apache/xtable/delta/DeltaConversionSource.java | 15 +++-- .../apache/xtable/delta/DeltaConversionTarget.java | 6 +- .../delta/DeltaDataFileUpdatesExtractor.java | 20 +++---- .../xtable/delta/DeltaPartitionExtractor.java | 6 +- .../xtable/hudi/BaseFileUpdatesExtractor.java | 31 +++++----- .../apache/xtable/hudi/HudiConversionTarget.java | 6 +- .../apache/xtable/hudi/HudiDataFileExtractor.java | 6 +- .../apache/xtable/hudi/HudiFileStatsExtractor.java | 4 +- .../xtable/iceberg/IcebergConversionSource.java | 9 ++- .../xtable/iceberg/IcebergConversionTarget.java | 6 +- .../xtable/iceberg/IcebergDataFileUpdatesSync.java | 21 +++---- .../org/apache/xtable/ValidationTestHelper.java | 10 ++-- .../xtable/delta/ITDeltaConversionSource.java | 20 +++---- .../org/apache/xtable/delta/TestDeltaSync.java | 4 +- .../apache/xtable/hudi/ITHudiConversionSource.java | 8 +-- .../apache/xtable/hudi/ITHudiConversionTarget.java | 14 ++--- .../xtable/hudi/TestBaseFileUpdatesExtractor.java | 6 +- .../xtable/hudi/TestHudiConversionTarget.java | 4 +- 
.../xtable/hudi/TestHudiFileStatsExtractor.java | 4 +- .../iceberg/TestIcebergConversionSource.java | 14 ++--- .../org/apache/xtable/iceberg/TestIcebergSync.java | 11 ++-- 33 files changed, 297 insertions(+), 184 deletions(-) diff --git a/xtable-api/src/main/java/org/apache/xtable/model/TableChange.java b/xtable-api/src/main/java/org/apache/xtable/model/TableChange.java index 51f0ee0b..c2517bfa 100644 --- a/xtable-api/src/main/java/org/apache/xtable/model/TableChange.java +++ b/xtable-api/src/main/java/org/apache/xtable/model/TableChange.java @@ -21,7 +21,7 @@ package org.apache.xtable.model; import lombok.Builder; import lombok.Value; -import org.apache.xtable.model.storage.DataFilesDiff; +import org.apache.xtable.model.storage.InternalFilesDiff; /** * Captures the changes in a single commit/instant from the source table. @@ -32,7 +32,7 @@ import org.apache.xtable.model.storage.DataFilesDiff; @Builder(toBuilder = true) public class TableChange { // Change in files at the specified instant - DataFilesDiff filesDiff; + InternalFilesDiff filesDiff; /** The {@link InternalTable} at the commit time to which this table change belongs. */ InternalTable tableAsOfChange; diff --git a/xtable-api/src/main/java/org/apache/xtable/model/stat/Range.java b/xtable-api/src/main/java/org/apache/xtable/model/stat/Range.java index 1540ebed..417c0ffa 100644 --- a/xtable-api/src/main/java/org/apache/xtable/model/stat/Range.java +++ b/xtable-api/src/main/java/org/apache/xtable/model/stat/Range.java @@ -25,14 +25,13 @@ import lombok.Value; import org.apache.xtable.model.schema.InternalSchema; import org.apache.xtable.model.schema.InternalType; import org.apache.xtable.model.schema.PartitionTransformType; -import org.apache.xtable.model.storage.InternalDataFile; /** * Represents a range of values in the specified data type. Can represent a scalar value when the * {@link #minValue} and {@link #maxValue} are the same. 
* - * <p>For the ranges stored in {@link InternalDataFile#getPartitionValues()}, the values will be - * based off the {@link PartitionTransformType}. {@link PartitionTransformType#HOUR}, {@link + * <p>For the ranges stored in {@link InternalDataFile#partitionValues()}, the values will be based + * off the {@link PartitionTransformType}. {@link PartitionTransformType#HOUR}, {@link * PartitionTransformType#DAY}, {@link PartitionTransformType#MONTH}, {@link * PartitionTransformType#YEAR} are all stored as a long representing a point in time as * milliseconds since epoch. {@link PartitionTransformType#VALUE} will match the rules below. diff --git a/xtable-api/src/main/java/org/apache/xtable/model/TableChange.java b/xtable-api/src/main/java/org/apache/xtable/model/storage/FileType.java similarity index 55% copy from xtable-api/src/main/java/org/apache/xtable/model/TableChange.java copy to xtable-api/src/main/java/org/apache/xtable/model/storage/FileType.java index 51f0ee0b..7d886181 100644 --- a/xtable-api/src/main/java/org/apache/xtable/model/TableChange.java +++ b/xtable-api/src/main/java/org/apache/xtable/model/storage/FileType.java @@ -16,24 +16,30 @@ * limitations under the License. */ -package org.apache.xtable.model; - -import lombok.Builder; -import lombok.Value; - -import org.apache.xtable.model.storage.DataFilesDiff; +package org.apache.xtable.model.storage; /** - * Captures the changes in a single commit/instant from the source table. + * Known types of storage files. For e.g. data file of a table, a position deletion, statistics + * file. * * @since 0.1 */ -@Value -@Builder(toBuilder = true) -public class TableChange { - // Change in files at the specified instant - DataFilesDiff filesDiff; +public enum FileType { + /** Files of type data contain the actual data records of the table. 
*/ + DATA_FILE, + + /** + * Files of type deletion contain information of soft deleted records of the table, typically + * containing ordinal references + */ + DELETION_FILE, + + /** + * Files of type statistics typically contain supplemental statistics information related to a + * table, its partitions, or data files + */ + STATISTICS_FILE, - /** The {@link InternalTable} at the commit time to which this table change belongs. */ - InternalTable tableAsOfChange; + /** Files of type index contain information related to the indexes of a table */ + INDEX_FILE } diff --git a/xtable-api/src/main/java/org/apache/xtable/model/storage/FilesDiff.java b/xtable-api/src/main/java/org/apache/xtable/model/storage/FilesDiff.java index 7d628ce4..85450e82 100644 --- a/xtable-api/src/main/java/org/apache/xtable/model/storage/FilesDiff.java +++ b/xtable-api/src/main/java/org/apache/xtable/model/storage/FilesDiff.java @@ -97,7 +97,7 @@ public class FilesDiff<L, P> { Map<String, InternalDataFile> latestFiles = latestFileGroups.stream() .flatMap(group -> group.getFiles().stream()) - .collect(Collectors.toMap(InternalDataFile::getPhysicalPath, Function.identity())); + .collect(Collectors.toMap(InternalDataFile::physicalPath, Function.identity())); return findNewAndRemovedFiles(latestFiles, previousFiles); } } diff --git a/xtable-api/src/main/java/org/apache/xtable/model/storage/InternalBaseFile.java b/xtable-api/src/main/java/org/apache/xtable/model/storage/InternalBaseFile.java new file mode 100644 index 00000000..67e0f6b8 --- /dev/null +++ b/xtable-api/src/main/java/org/apache/xtable/model/storage/InternalBaseFile.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.model.storage; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.NonNull; +import lombok.ToString; +import lombok.experimental.Accessors; +import lombok.experimental.FieldDefaults; +import lombok.experimental.SuperBuilder; + +/** + * This class is an internal representation of a logical storage file of a table and is the base + * class of all types of storage files. The most common type of storage file is a data-file, which + * contains the actual records of a table. Other examples of a storage file are positional delete + * files (containing ordinals of the records to be deleted from a data file), stat files, and index + * files. For completeness of the conversion process, XTable needs to recognize current storage + * files of a table, and also the storage files that are added and removed over time as the state of + * the table changes. Different table formats support different storage file types. This base file + * representation is generic and extensible and is needed to design stable interfaces. 
+ */ +@SuperBuilder(toBuilder = true) +@FieldDefaults(makeFinal = true, level = lombok.AccessLevel.PRIVATE) +@ToString(callSuper = true) +@EqualsAndHashCode +@AllArgsConstructor +@Accessors(fluent = true) +@Getter +public abstract class InternalBaseFile { + // Absolute path of the storage file, with the scheme, that contains this logical file. Typically, + // one physical storage file contains only one base file, for e.g. a parquet data file. However, + // in some cases, one storage file can contain multiple logical storage files for optimizations. + @NonNull String physicalPath; + + // The offset of the logical file in the physical storage file. + @Builder.Default long offset = 0; + + // The size of the logical file in the physical storage file. + long fileSizeBytes; + + // File Type represents the type of the storage file. For example, a data file, a delete file, a + // stat file + @Builder.Default @NonNull FileType fileType = FileType.DATA_FILE; + + public boolean isDataFile() { + return fileType.equals(FileType.DATA_FILE); + } +} diff --git a/xtable-api/src/main/java/org/apache/xtable/model/storage/InternalDataFile.java b/xtable-api/src/main/java/org/apache/xtable/model/storage/InternalDataFile.java index 3aee766e..effbc887 100644 --- a/xtable-api/src/main/java/org/apache/xtable/model/storage/InternalDataFile.java +++ b/xtable-api/src/main/java/org/apache/xtable/model/storage/InternalDataFile.java @@ -22,8 +22,13 @@ import java.util.Collections; import java.util.List; import lombok.Builder; +import lombok.EqualsAndHashCode; +import lombok.Getter; import lombok.NonNull; -import lombok.Value; +import lombok.ToString; +import lombok.experimental.Accessors; +import lombok.experimental.FieldDefaults; +import lombok.experimental.SuperBuilder; import org.apache.xtable.model.stat.ColumnStat; import org.apache.xtable.model.stat.PartitionValue; @@ -33,17 +38,18 @@ import org.apache.xtable.model.stat.PartitionValue; * * @since 0.1 */ -@Builder(toBuilder = true) -@Value 
-public class InternalDataFile { - // physical path of the file (absolute with scheme) - @NonNull String physicalPath; +@SuperBuilder(toBuilder = true) +@FieldDefaults(makeFinal = true, level = lombok.AccessLevel.PRIVATE) +@ToString(callSuper = true) +@EqualsAndHashCode(callSuper = true) +@Accessors(fluent = true) +@Getter +public class InternalDataFile extends InternalBaseFile { // file format @Builder.Default @NonNull FileFormat fileFormat = FileFormat.APACHE_PARQUET; // partition ranges for the data file @Builder.Default @NonNull List<PartitionValue> partitionValues = Collections.emptyList(); - long fileSizeBytes; long recordCount; // column stats for each column in the data file @Builder.Default @NonNull List<ColumnStat> columnStats = Collections.emptyList(); diff --git a/xtable-api/src/main/java/org/apache/xtable/model/storage/DataFilesDiff.java b/xtable-api/src/main/java/org/apache/xtable/model/storage/InternalFilesDiff.java similarity index 56% rename from xtable-api/src/main/java/org/apache/xtable/model/storage/DataFilesDiff.java rename to xtable-api/src/main/java/org/apache/xtable/model/storage/InternalFilesDiff.java index 9025ec31..527d5abd 100644 --- a/xtable-api/src/main/java/org/apache/xtable/model/storage/DataFilesDiff.java +++ b/xtable-api/src/main/java/org/apache/xtable/model/storage/InternalFilesDiff.java @@ -20,6 +20,7 @@ package org.apache.xtable.model.storage; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; @@ -31,29 +32,56 @@ import lombok.experimental.SuperBuilder; @Value @EqualsAndHashCode(callSuper = true) @SuperBuilder -public class DataFilesDiff extends FilesDiff<InternalDataFile, InternalDataFile> { +public class InternalFilesDiff extends FilesDiff<InternalBaseFile, InternalBaseFile> { /** - * Creates a DataFilesDiff from the list of files in the target table and the list of files in the - * source table. 
+ * Creates a InternalFilesDiff from the list of files in the target table and the list of files in + * the source table. * * @param source list of files currently in the source table * @param target list of files currently in the target table * @return files that need to be added and removed for the target table match the source table */ - public static DataFilesDiff from(List<InternalDataFile> source, List<InternalDataFile> target) { + public static InternalFilesDiff from( + List<InternalDataFile> source, List<InternalDataFile> target) { Map<String, InternalDataFile> targetPaths = target.stream() - .collect(Collectors.toMap(InternalDataFile::getPhysicalPath, Function.identity())); + .collect(Collectors.toMap(InternalDataFile::physicalPath, Function.identity())); Map<String, InternalDataFile> sourcePaths = source.stream() - .collect(Collectors.toMap(InternalDataFile::getPhysicalPath, Function.identity())); + .collect(Collectors.toMap(InternalDataFile::physicalPath, Function.identity())); FilesDiff<InternalDataFile, InternalDataFile> diff = findNewAndRemovedFiles(sourcePaths, targetPaths); - return DataFilesDiff.builder() + return InternalFilesDiff.builder() .filesAdded(diff.getFilesAdded()) .filesRemoved(diff.getFilesRemoved()) .build(); } + + /** + * Filters files of type {@link FileType#DATA_FILE} from the list of files added to the source + * table and returns the list. + */ + public Set<InternalDataFile> dataFilesAdded() { + Set<InternalDataFile> result = + getFilesAdded().stream() + .filter(InternalBaseFile::isDataFile) + .map(file -> (InternalDataFile) file) + .collect(Collectors.toSet()); + return result; + } + + /** + * Filters files of type {@link FileType#DATA_FILE} from the list of files removed to the source + * table and returns the list. 
+ */ + public Set<InternalDataFile> dataFilesRemoved() { + Set<InternalDataFile> result = + getFilesRemoved().stream() + .filter(InternalBaseFile::isDataFile) + .map(file -> (InternalDataFile) file) + .collect(Collectors.toSet()); + return result; + } } diff --git a/xtable-api/src/main/java/org/apache/xtable/model/storage/PartitionFileGroup.java b/xtable-api/src/main/java/org/apache/xtable/model/storage/PartitionFileGroup.java index 500fa1b5..a1fa6726 100644 --- a/xtable-api/src/main/java/org/apache/xtable/model/storage/PartitionFileGroup.java +++ b/xtable-api/src/main/java/org/apache/xtable/model/storage/PartitionFileGroup.java @@ -28,12 +28,12 @@ import lombok.Value; import org.apache.xtable.model.stat.PartitionValue; -/** Represents a grouping of {@link InternalDataFile} with the same partition values. */ +/** Represents a grouping of {@link InternalBaseFile} with the same partition values. */ @Value @Builder public class PartitionFileGroup { List<PartitionValue> partitionValues; - List<InternalDataFile> files; + List<? 
extends InternalBaseFile> files; public static List<PartitionFileGroup> fromFiles(List<InternalDataFile> files) { return fromFiles(files.stream()); @@ -41,7 +41,7 @@ public class PartitionFileGroup { public static List<PartitionFileGroup> fromFiles(Stream<InternalDataFile> files) { Map<List<PartitionValue>, List<InternalDataFile>> filesGrouped = - files.collect(Collectors.groupingBy(InternalDataFile::getPartitionValues)); + files.collect(Collectors.groupingBy(InternalDataFile::partitionValues)); return filesGrouped.entrySet().stream() .map( entry -> diff --git a/xtable-api/src/main/java/org/apache/xtable/spi/sync/ConversionTarget.java b/xtable-api/src/main/java/org/apache/xtable/spi/sync/ConversionTarget.java index 736f49e4..e57a7d3d 100644 --- a/xtable-api/src/main/java/org/apache/xtable/spi/sync/ConversionTarget.java +++ b/xtable-api/src/main/java/org/apache/xtable/spi/sync/ConversionTarget.java @@ -28,7 +28,7 @@ import org.apache.xtable.model.InternalTable; import org.apache.xtable.model.metadata.TableSyncMetadata; import org.apache.xtable.model.schema.InternalPartitionField; import org.apache.xtable.model.schema.InternalSchema; -import org.apache.xtable.model.storage.DataFilesDiff; +import org.apache.xtable.model.storage.InternalFilesDiff; import org.apache.xtable.model.storage.PartitionFileGroup; /** A client that provides the major functionality for syncing changes to a target system. */ @@ -68,9 +68,9 @@ public interface ConversionTarget { * Syncs the changes in files to the target system. This method is required to both add and remove * files. 
* - * @param dataFilesDiff the diff that needs to be synced + * @param internalFilesDiff the diff that needs to be synced */ - void syncFilesForDiff(DataFilesDiff dataFilesDiff); + void syncFilesForDiff(InternalFilesDiff internalFilesDiff); /** * Starts the sync and performs any initialization required diff --git a/xtable-api/src/test/java/org/apache/xtable/model/storage/TestDataFilesDiff.java b/xtable-api/src/test/java/org/apache/xtable/model/storage/TestDataFilesDiff.java index 6896dce8..0e780bb3 100644 --- a/xtable-api/src/test/java/org/apache/xtable/model/storage/TestDataFilesDiff.java +++ b/xtable-api/src/test/java/org/apache/xtable/model/storage/TestDataFilesDiff.java @@ -37,13 +37,13 @@ public class TestDataFilesDiff { InternalDataFile.builder().physicalPath("file://already_in_target2.parquet").build(); InternalDataFile sourceFileInTargetAlready = InternalDataFile.builder().physicalPath("file://already_in_target3.parquet").build(); - DataFilesDiff actual = - DataFilesDiff.from( + InternalFilesDiff actual = + InternalFilesDiff.from( Arrays.asList(sourceFile1, sourceFile2, sourceFileInTargetAlready), Arrays.asList(targetFile1, targetFile2, sourceFileInTargetAlready)); - DataFilesDiff expected = - DataFilesDiff.builder() + InternalFilesDiff expected = + InternalFilesDiff.builder() .filesAdded(Arrays.asList(sourceFile1, sourceFile2)) .filesRemoved(Arrays.asList(targetFile1, targetFile2)) .build(); diff --git a/xtable-api/src/test/java/org/apache/xtable/spi/extractor/TestExtractFromSource.java b/xtable-api/src/test/java/org/apache/xtable/spi/extractor/TestExtractFromSource.java index 8503057d..2480e6a8 100644 --- a/xtable-api/src/test/java/org/apache/xtable/spi/extractor/TestExtractFromSource.java +++ b/xtable-api/src/test/java/org/apache/xtable/spi/extractor/TestExtractFromSource.java @@ -41,8 +41,8 @@ import org.apache.xtable.model.InstantsForIncrementalSync; import org.apache.xtable.model.InternalSnapshot; import org.apache.xtable.model.InternalTable; import 
org.apache.xtable.model.TableChange; -import org.apache.xtable.model.storage.DataFilesDiff; import org.apache.xtable.model.storage.InternalDataFile; +import org.apache.xtable.model.storage.InternalFilesDiff; import org.apache.xtable.model.storage.PartitionFileGroup; public class TestExtractFromSource { @@ -85,7 +85,7 @@ public class TestExtractFromSource { TableChange.builder() .tableAsOfChange(tableAtFirstInstant) .filesDiff( - DataFilesDiff.builder().fileAdded(newFile1).fileRemoved(initialFile2).build()) + InternalFilesDiff.builder().fileAdded(newFile1).fileRemoved(initialFile2).build()) .build(); when(mockConversionSource.getTableChangeForCommit(firstCommitToSync)) .thenReturn(tableChangeToReturnAtFirstInstant); @@ -93,7 +93,7 @@ public class TestExtractFromSource { TableChange.builder() .tableAsOfChange(tableAtFirstInstant) .filesDiff( - DataFilesDiff.builder().fileAdded(newFile1).fileRemoved(initialFile2).build()) + InternalFilesDiff.builder().fileAdded(newFile1).fileRemoved(initialFile2).build()) .build(); // add 2 new files, remove 2 files @@ -106,7 +106,7 @@ public class TestExtractFromSource { TableChange.builder() .tableAsOfChange(tableAtSecondInstant) .filesDiff( - DataFilesDiff.builder() + InternalFilesDiff.builder() .filesAdded(Arrays.asList(newFile2, newFile3)) .filesRemoved(Arrays.asList(initialFile3, newFile1)) .build()) @@ -117,7 +117,7 @@ public class TestExtractFromSource { TableChange.builder() .tableAsOfChange(tableAtSecondInstant) .filesDiff( - DataFilesDiff.builder() + InternalFilesDiff.builder() .filesAdded(Arrays.asList(newFile2, newFile3)) .filesRemoved(Arrays.asList(initialFile3, newFile1)) .build()) diff --git a/xtable-api/src/test/java/org/apache/xtable/spi/sync/TestTableFormatSync.java b/xtable-api/src/test/java/org/apache/xtable/spi/sync/TestTableFormatSync.java index 39480f8b..548c784d 100644 --- a/xtable-api/src/test/java/org/apache/xtable/spi/sync/TestTableFormatSync.java +++ 
b/xtable-api/src/test/java/org/apache/xtable/spi/sync/TestTableFormatSync.java @@ -48,8 +48,8 @@ import org.apache.xtable.model.schema.InternalField; import org.apache.xtable.model.schema.InternalPartitionField; import org.apache.xtable.model.schema.InternalSchema; import org.apache.xtable.model.schema.PartitionTransformType; -import org.apache.xtable.model.storage.DataFilesDiff; import org.apache.xtable.model.storage.InternalDataFile; +import org.apache.xtable.model.storage.InternalFilesDiff; import org.apache.xtable.model.storage.PartitionFileGroup; import org.apache.xtable.model.storage.TableFormat; import org.apache.xtable.model.sync.SyncMode; @@ -125,17 +125,17 @@ public class TestTableFormatSync { void syncChangesWithFailureForOneFormat() { Instant start = Instant.now(); InternalTable tableState1 = getTableState(1); - DataFilesDiff dataFilesDiff1 = getFilesDiff(1); + InternalFilesDiff internalFilesDiff1 = getFilesDiff(1); TableChange tableChange1 = - TableChange.builder().tableAsOfChange(tableState1).filesDiff(dataFilesDiff1).build(); + TableChange.builder().tableAsOfChange(tableState1).filesDiff(internalFilesDiff1).build(); InternalTable tableState2 = getTableState(2); - DataFilesDiff dataFilesDiff2 = getFilesDiff(2); + InternalFilesDiff internalFilesDiff2 = getFilesDiff(2); TableChange tableChange2 = - TableChange.builder().tableAsOfChange(tableState2).filesDiff(dataFilesDiff2).build(); + TableChange.builder().tableAsOfChange(tableState2).filesDiff(internalFilesDiff2).build(); InternalTable tableState3 = getTableState(3); - DataFilesDiff dataFilesDiff3 = getFilesDiff(3); + InternalFilesDiff internalFilesDiff3 = getFilesDiff(3); TableChange tableChange3 = - TableChange.builder().tableAsOfChange(tableState3).filesDiff(dataFilesDiff3).build(); + TableChange.builder().tableAsOfChange(tableState3).filesDiff(internalFilesDiff3).build(); List<Instant> pendingCommitInstants = Collections.singletonList(Instant.now()); 
when(mockConversionTarget1.getTableFormat()).thenReturn(TableFormat.ICEBERG); @@ -202,13 +202,13 @@ public class TestTableFormatSync { } verifyBaseConversionTargetCalls(mockConversionTarget1, tableState1, pendingCommitInstants); - verify(mockConversionTarget1).syncFilesForDiff(dataFilesDiff1); + verify(mockConversionTarget1).syncFilesForDiff(internalFilesDiff1); verifyBaseConversionTargetCalls(mockConversionTarget2, tableState1, pendingCommitInstants); - verify(mockConversionTarget2).syncFilesForDiff(dataFilesDiff1); + verify(mockConversionTarget2).syncFilesForDiff(internalFilesDiff1); verifyBaseConversionTargetCalls(mockConversionTarget2, tableState2, pendingCommitInstants); - verify(mockConversionTarget2).syncFilesForDiff(dataFilesDiff2); + verify(mockConversionTarget2).syncFilesForDiff(internalFilesDiff2); verifyBaseConversionTargetCalls(mockConversionTarget2, tableState3, pendingCommitInstants); - verify(mockConversionTarget2).syncFilesForDiff(dataFilesDiff3); + verify(mockConversionTarget2).syncFilesForDiff(internalFilesDiff3); verify(mockConversionTarget1, times(1)).completeSync(); verify(mockConversionTarget2, times(3)).completeSync(); } @@ -217,17 +217,17 @@ public class TestTableFormatSync { void syncChangesWithDifferentFormatsAndMetadata() { Instant start = Instant.now(); InternalTable tableState1 = getTableState(1); - DataFilesDiff dataFilesDiff1 = getFilesDiff(1); + InternalFilesDiff internalFilesDiff1 = getFilesDiff(1); TableChange tableChange1 = - TableChange.builder().tableAsOfChange(tableState1).filesDiff(dataFilesDiff1).build(); + TableChange.builder().tableAsOfChange(tableState1).filesDiff(internalFilesDiff1).build(); InternalTable tableState2 = getTableState(2); - DataFilesDiff dataFilesDiff2 = getFilesDiff(2); + InternalFilesDiff internalFilesDiff2 = getFilesDiff(2); TableChange tableChange2 = - TableChange.builder().tableAsOfChange(tableState2).filesDiff(dataFilesDiff2).build(); + 
TableChange.builder().tableAsOfChange(tableState2).filesDiff(internalFilesDiff2).build(); InternalTable tableState3 = getTableState(3); - DataFilesDiff dataFilesDiff3 = getFilesDiff(3); + InternalFilesDiff internalFilesDiff3 = getFilesDiff(3); TableChange tableChange3 = - TableChange.builder().tableAsOfChange(tableState3).filesDiff(dataFilesDiff3).build(); + TableChange.builder().tableAsOfChange(tableState3).filesDiff(internalFilesDiff3).build(); List<Instant> pendingCommitInstants = Collections.singletonList(Instant.now()); when(mockConversionTarget1.getTableFormat()).thenReturn(TableFormat.ICEBERG); @@ -291,15 +291,15 @@ public class TestTableFormatSync { // conversionTarget1 syncs table changes 1 and 3 verifyBaseConversionTargetCalls(mockConversionTarget1, tableState1, pendingCommitInstants); - verify(mockConversionTarget1).syncFilesForDiff(dataFilesDiff1); + verify(mockConversionTarget1).syncFilesForDiff(internalFilesDiff1); verifyBaseConversionTargetCalls(mockConversionTarget1, tableState3, pendingCommitInstants); - verify(mockConversionTarget1).syncFilesForDiff(dataFilesDiff3); + verify(mockConversionTarget1).syncFilesForDiff(internalFilesDiff3); verify(mockConversionTarget1, times(2)).completeSync(); // conversionTarget2 syncs table changes 2 and 3 verifyBaseConversionTargetCalls(mockConversionTarget2, tableState2, pendingCommitInstants); - verify(mockConversionTarget2).syncFilesForDiff(dataFilesDiff2); + verify(mockConversionTarget2).syncFilesForDiff(internalFilesDiff2); verifyBaseConversionTargetCalls(mockConversionTarget2, tableState3, pendingCommitInstants); - verify(mockConversionTarget2).syncFilesForDiff(dataFilesDiff3); + verify(mockConversionTarget2).syncFilesForDiff(internalFilesDiff3); verify(mockConversionTarget2, times(2)).completeSync(); } @@ -307,9 +307,9 @@ public class TestTableFormatSync { void syncChangesOneFormatWithNoRequiredChanges() { Instant start = Instant.now(); InternalTable tableState1 = getTableState(1); - DataFilesDiff 
dataFilesDiff1 = getFilesDiff(1); + InternalFilesDiff internalFilesDiff1 = getFilesDiff(1); TableChange tableChange1 = - TableChange.builder().tableAsOfChange(tableState1).filesDiff(dataFilesDiff1).build(); + TableChange.builder().tableAsOfChange(tableState1).filesDiff(internalFilesDiff1).build(); List<Instant> pendingCommitInstants = Collections.emptyList(); when(mockConversionTarget1.getTableFormat()).thenReturn(TableFormat.ICEBERG); @@ -349,7 +349,7 @@ public class TestTableFormatSync { verify(mockConversionTarget1, never()).completeSync(); verifyBaseConversionTargetCalls(mockConversionTarget2, tableState1, pendingCommitInstants); - verify(mockConversionTarget2).syncFilesForDiff(dataFilesDiff1); + verify(mockConversionTarget2).syncFilesForDiff(internalFilesDiff1); } /** @@ -376,8 +376,8 @@ public class TestTableFormatSync { .build(); } - private DataFilesDiff getFilesDiff(int id) { - return DataFilesDiff.builder() + private InternalFilesDiff getFilesDiff(int id) { + return InternalFilesDiff.builder() .filesAdded( Collections.singletonList( InternalDataFile.builder() diff --git a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java index 140eb8ad..fc3f6b92 100644 --- a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java +++ b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java @@ -51,9 +51,9 @@ import org.apache.xtable.model.InternalSnapshot; import org.apache.xtable.model.InternalTable; import org.apache.xtable.model.TableChange; import org.apache.xtable.model.schema.InternalSchema; -import org.apache.xtable.model.storage.DataFilesDiff; import org.apache.xtable.model.storage.FileFormat; import org.apache.xtable.model.storage.InternalDataFile; +import org.apache.xtable.model.storage.InternalFilesDiff; import org.apache.xtable.model.storage.PartitionFileGroup; import 
org.apache.xtable.spi.extractor.ConversionSource; import org.apache.xtable.spi.extractor.DataFileIterator; @@ -127,7 +127,7 @@ public class DeltaConversionSource implements ConversionSource<Long> { true, DeltaPartitionExtractor.getInstance(), DeltaStatsExtractor.getInstance()); - addedFiles.put(dataFile.getPhysicalPath(), dataFile); + addedFiles.put(dataFile.physicalPath(), dataFile); String deleteVectorPath = actionsConverter.extractDeletionVectorFile(snapshotAtVersion, (AddFile) action); if (deleteVectorPath != null) { @@ -141,7 +141,7 @@ public class DeltaConversionSource implements ConversionSource<Long> { fileFormat, tableAtVersion.getPartitioningFields(), DeltaPartitionExtractor.getInstance()); - removedFiles.put(dataFile.getPhysicalPath(), dataFile); + removedFiles.put(dataFile.physicalPath(), dataFile); } } @@ -162,12 +162,15 @@ public class DeltaConversionSource implements ConversionSource<Long> { } } - DataFilesDiff dataFilesDiff = - DataFilesDiff.builder() + InternalFilesDiff internalFilesDiff = + InternalFilesDiff.builder() .filesAdded(addedFiles.values()) .filesRemoved(removedFiles.values()) .build(); - return TableChange.builder().tableAsOfChange(tableAtVersion).filesDiff(dataFilesDiff).build(); + return TableChange.builder() + .tableAsOfChange(tableAtVersion) + .filesDiff(internalFilesDiff) + .build(); } @Override diff --git a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionTarget.java b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionTarget.java index 343a2d21..db2e771f 100644 --- a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionTarget.java +++ b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionTarget.java @@ -60,7 +60,7 @@ import org.apache.xtable.model.InternalTable; import org.apache.xtable.model.metadata.TableSyncMetadata; import org.apache.xtable.model.schema.InternalPartitionField; import org.apache.xtable.model.schema.InternalSchema; -import 
org.apache.xtable.model.storage.DataFilesDiff; +import org.apache.xtable.model.storage.InternalFilesDiff; import org.apache.xtable.model.storage.PartitionFileGroup; import org.apache.xtable.model.storage.TableFormat; import org.apache.xtable.schema.SparkSchemaExtractor; @@ -187,10 +187,10 @@ public class DeltaConversionTarget implements ConversionTarget { } @Override - public void syncFilesForDiff(DataFilesDiff dataFilesDiff) { + public void syncFilesForDiff(InternalFilesDiff internalFilesDiff) { transactionState.setActions( dataFileUpdatesExtractor.applyDiff( - dataFilesDiff, + internalFilesDiff, transactionState.getLatestSchemaInternal(), deltaLog.dataPath().toString())); } diff --git a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaDataFileUpdatesExtractor.java b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaDataFileUpdatesExtractor.java index 96776750..d5c05bbb 100644 --- a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaDataFileUpdatesExtractor.java +++ b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaDataFileUpdatesExtractor.java @@ -39,9 +39,9 @@ import com.fasterxml.jackson.core.JsonProcessingException; import org.apache.xtable.collectors.CustomCollectors; import org.apache.xtable.model.schema.InternalSchema; import org.apache.xtable.model.stat.ColumnStat; -import org.apache.xtable.model.storage.DataFilesDiff; import org.apache.xtable.model.storage.FilesDiff; import org.apache.xtable.model.storage.InternalDataFile; +import org.apache.xtable.model.storage.InternalFilesDiff; import org.apache.xtable.model.storage.PartitionFileGroup; import org.apache.xtable.paths.PathUtils; @@ -75,20 +75,20 @@ public class DeltaDataFileUpdatesExtractor { file -> file)); FilesDiff<InternalDataFile, Action> diff = - DataFilesDiff.findNewAndRemovedFiles(partitionedDataFiles, previousFiles); + InternalFilesDiff.findNewAndRemovedFiles(partitionedDataFiles, previousFiles); return applyDiff( diff.getFilesAdded(), diff.getFilesRemoved(), tableSchema, 
deltaLog.dataPath().toString()); } public Seq<Action> applyDiff( - DataFilesDiff dataFilesDiff, InternalSchema tableSchema, String tableBasePath) { + InternalFilesDiff internalFilesDiff, InternalSchema tableSchema, String tableBasePath) { List<Action> removeActions = - dataFilesDiff.getFilesRemoved().stream() + internalFilesDiff.dataFilesRemoved().stream() .flatMap(dFile -> createAddFileAction(dFile, tableSchema, tableBasePath)) .map(AddFile::remove) - .collect(CustomCollectors.toList(dataFilesDiff.getFilesRemoved().size())); - return applyDiff(dataFilesDiff.getFilesAdded(), removeActions, tableSchema, tableBasePath); + .collect(CustomCollectors.toList(internalFilesDiff.dataFilesRemoved().size())); + return applyDiff(internalFilesDiff.dataFilesAdded(), removeActions, tableSchema, tableBasePath); } private Seq<Action> applyDiff( @@ -112,12 +112,12 @@ public class DeltaDataFileUpdatesExtractor { new AddFile( // Delta Lake supports relative and absolute paths in theory but relative paths seem // more commonly supported by query engines in our testing - PathUtils.getRelativePath(dataFile.getPhysicalPath(), tableBasePath), + PathUtils.getRelativePath(dataFile.physicalPath(), tableBasePath), convertJavaMapToScala(deltaPartitionExtractor.partitionValueSerialization(dataFile)), - dataFile.getFileSizeBytes(), - dataFile.getLastModified(), + dataFile.fileSizeBytes(), + dataFile.lastModified(), true, - getColumnStats(schema, dataFile.getRecordCount(), dataFile.getColumnStats()), + getColumnStats(schema, dataFile.recordCount(), dataFile.columnStats()), null, null)); } diff --git a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaPartitionExtractor.java b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaPartitionExtractor.java index 3492857f..57568281 100644 --- a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaPartitionExtractor.java +++ b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaPartitionExtractor.java @@ -253,11 +253,11 @@ public class 
DeltaPartitionExtractor { public Map<String, String> partitionValueSerialization(InternalDataFile internalDataFile) { Map<String, String> partitionValuesSerialized = new HashMap<>(); - if (internalDataFile.getPartitionValues() == null - || internalDataFile.getPartitionValues().isEmpty()) { + if (internalDataFile.partitionValues() == null + || internalDataFile.partitionValues().isEmpty()) { return partitionValuesSerialized; } - for (PartitionValue partitionValue : internalDataFile.getPartitionValues()) { + for (PartitionValue partitionValue : internalDataFile.partitionValues()) { InternalPartitionField partitionField = partitionValue.getPartitionField(); PartitionTransformType transformType = partitionField.getTransformType(); String partitionValueSerialized; diff --git a/xtable-core/src/main/java/org/apache/xtable/hudi/BaseFileUpdatesExtractor.java b/xtable-core/src/main/java/org/apache/xtable/hudi/BaseFileUpdatesExtractor.java index bef00013..bec8cc4c 100644 --- a/xtable-core/src/main/java/org/apache/xtable/hudi/BaseFileUpdatesExtractor.java +++ b/xtable-core/src/main/java/org/apache/xtable/hudi/BaseFileUpdatesExtractor.java @@ -54,8 +54,8 @@ import org.apache.hudi.metadata.HoodieMetadataFileSystemView; import org.apache.xtable.collectors.CustomCollectors; import org.apache.xtable.model.schema.InternalType; import org.apache.xtable.model.stat.ColumnStat; -import org.apache.xtable.model.storage.DataFilesDiff; import org.apache.xtable.model.storage.InternalDataFile; +import org.apache.xtable.model.storage.InternalFilesDiff; import org.apache.xtable.model.storage.PartitionFileGroup; @AllArgsConstructor(staticName = "of") @@ -107,7 +107,7 @@ public class BaseFileUpdatesExtractor { dataFiles.stream() .collect( Collectors.toMap( - InternalDataFile::getPhysicalPath, Function.identity())); + InternalDataFile::physicalPath, Function.identity())); List<HoodieBaseFile> baseFiles = isTableInitialized ? 
fsView.getLatestBaseFiles(partitionPath).collect(Collectors.toList()) @@ -163,26 +163,27 @@ public class BaseFileUpdatesExtractor { } /** - * Converts the provided {@link DataFilesDiff}. + * Converts the provided {@link InternalFilesDiff}. * - * @param dataFilesDiff the diff to apply to the Hudi table + * @param internalFilesDiff the diff to apply to the Hudi table * @param commit The current commit started by the Hudi client * @return The information needed to create a "replace" commit for the Hudi table */ - ReplaceMetadata convertDiff(@NonNull DataFilesDiff dataFilesDiff, @NonNull String commit) { + ReplaceMetadata convertDiff( + @NonNull InternalFilesDiff internalFilesDiff, @NonNull String commit) { // For all removed files, group by partition and extract the file id Map<String, List<String>> partitionToReplacedFileIds = - dataFilesDiff.getFilesRemoved().stream() - .map(file -> new CachingPath(file.getPhysicalPath())) + internalFilesDiff.dataFilesRemoved().stream() + .map(file -> new CachingPath(file.physicalPath())) .collect( Collectors.groupingBy( path -> HudiPathUtils.getPartitionPath(tableBasePath, path), Collectors.mapping(this::getFileId, Collectors.toList()))); // For all added files, group by partition and extract the file id List<WriteStatus> writeStatuses = - dataFilesDiff.getFilesAdded().stream() + internalFilesDiff.dataFilesAdded().stream() .map(file -> toWriteStatus(tableBasePath, commit, file, Optional.empty())) - .collect(CustomCollectors.toList(dataFilesDiff.getFilesAdded().size())); + .collect(CustomCollectors.toList(internalFilesDiff.dataFilesAdded().size())); return ReplaceMetadata.of(partitionToReplacedFileIds, writeStatuses); } @@ -212,7 +213,7 @@ public class BaseFileUpdatesExtractor { InternalDataFile file, Optional<String> partitionPathOptional) { WriteStatus writeStatus = new WriteStatus(); - Path path = new CachingPath(file.getPhysicalPath()); + Path path = new CachingPath(file.physicalPath()); String partitionPath = 
partitionPathOptional.orElseGet(() -> HudiPathUtils.getPartitionPath(tableBasePath, path)); String fileId = getFileId(path); @@ -226,10 +227,10 @@ public class BaseFileUpdatesExtractor { writeStat.setPath( ExternalFilePathUtil.appendCommitTimeAndExternalFileMarker(filePath, commitTime)); writeStat.setPartitionPath(partitionPath); - writeStat.setNumWrites(file.getRecordCount()); - writeStat.setTotalWriteBytes(file.getFileSizeBytes()); - writeStat.setFileSizeInBytes(file.getFileSizeBytes()); - writeStat.putRecordsStats(convertColStats(fileName, file.getColumnStats())); + writeStat.setNumWrites(file.recordCount()); + writeStat.setTotalWriteBytes(file.fileSizeBytes()); + writeStat.setFileSizeInBytes(file.fileSizeBytes()); + writeStat.putRecordsStats(convertColStats(fileName, file.columnStats())); writeStatus.setStat(writeStat); return writeStatus; } @@ -275,6 +276,6 @@ public class BaseFileUpdatesExtractor { private String getPartitionPath(Path tableBasePath, List<InternalDataFile> files) { return HudiPathUtils.getPartitionPath( - tableBasePath, new CachingPath(files.get(0).getPhysicalPath())); + tableBasePath, new CachingPath(files.get(0).physicalPath())); } } diff --git a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionTarget.java b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionTarget.java index c3ef6f92..06f188f2 100644 --- a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionTarget.java +++ b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionTarget.java @@ -87,7 +87,7 @@ import org.apache.xtable.model.metadata.TableSyncMetadata; import org.apache.xtable.model.schema.InternalField; import org.apache.xtable.model.schema.InternalPartitionField; import org.apache.xtable.model.schema.InternalSchema; -import org.apache.xtable.model.storage.DataFilesDiff; +import org.apache.xtable.model.storage.InternalFilesDiff; import org.apache.xtable.model.storage.PartitionFileGroup; import 
org.apache.xtable.model.storage.TableFormat; import org.apache.xtable.spi.sync.ConversionTarget; @@ -251,9 +251,9 @@ public class HudiConversionTarget implements ConversionTarget { } @Override - public void syncFilesForDiff(DataFilesDiff dataFilesDiff) { + public void syncFilesForDiff(InternalFilesDiff internalFilesDiff) { BaseFileUpdatesExtractor.ReplaceMetadata replaceMetadata = - baseFileUpdatesExtractor.convertDiff(dataFilesDiff, commitState.getInstantTime()); + baseFileUpdatesExtractor.convertDiff(internalFilesDiff, commitState.getInstantTime()); commitState.setReplaceMetadata(replaceMetadata); } diff --git a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiDataFileExtractor.java b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiDataFileExtractor.java index 5739328c..77c0ca98 100644 --- a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiDataFileExtractor.java +++ b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiDataFileExtractor.java @@ -65,9 +65,9 @@ import org.apache.xtable.model.InternalTable; import org.apache.xtable.model.exception.ParseException; import org.apache.xtable.model.schema.InternalPartitionField; import org.apache.xtable.model.stat.PartitionValue; -import org.apache.xtable.model.storage.DataFilesDiff; import org.apache.xtable.model.storage.FileFormat; import org.apache.xtable.model.storage.InternalDataFile; +import org.apache.xtable.model.storage.InternalFilesDiff; import org.apache.xtable.model.storage.PartitionFileGroup; /** Extracts all the files for Hudi table represented by {@link InternalTable}. 
*/ @@ -122,7 +122,7 @@ public class HudiDataFileExtractor implements AutoCloseable { } } - public DataFilesDiff getDiffForCommit( + public InternalFilesDiff getDiffForCommit( HoodieInstant hoodieInstantForDiff, InternalTable table, HoodieInstant instant, @@ -139,7 +139,7 @@ public class HudiDataFileExtractor implements AutoCloseable { .collect(Collectors.toList()); List<InternalDataFile> filesRemoved = allInfo.getRemoved(); - return DataFilesDiff.builder().filesAdded(filesAdded).filesRemoved(filesRemoved).build(); + return InternalFilesDiff.builder().filesAdded(filesAdded).filesRemoved(filesRemoved).build(); } private AddedAndRemovedFiles getAddedAndRemovedPartitionInfo( diff --git a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiFileStatsExtractor.java b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiFileStatsExtractor.java index e47ef72e..2d6ad726 100644 --- a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiFileStatsExtractor.java +++ b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiFileStatsExtractor.java @@ -107,7 +107,7 @@ public class HudiFileStatsExtractor { return files.map( file -> { HudiFileStats fileStats = - computeColumnStatsForFile(new Path(file.getPhysicalPath()), nameFieldMap); + computeColumnStatsForFile(new Path(file.physicalPath()), nameFieldMap); return file.toBuilder() .columnStats(fileStats.getColumnStats()) .recordCount(fileStats.getRowCount()) @@ -128,7 +128,7 @@ public class HudiFileStatsExtractor { Map<Pair<String, String>, InternalDataFile> filePathsToDataFile = files.collect( Collectors.toMap( - file -> getPartitionAndFileName(file.getPhysicalPath()), Function.identity())); + file -> getPartitionAndFileName(file.physicalPath()), Function.identity())); if (filePathsToDataFile.isEmpty()) { return Stream.empty(); } diff --git a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java index 0d400e28..f6c2a420 
100644 --- a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java +++ b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java @@ -58,9 +58,9 @@ import org.apache.xtable.model.TableChange; import org.apache.xtable.model.schema.InternalPartitionField; import org.apache.xtable.model.schema.InternalSchema; import org.apache.xtable.model.stat.PartitionValue; -import org.apache.xtable.model.storage.DataFilesDiff; import org.apache.xtable.model.storage.DataLayoutStrategy; import org.apache.xtable.model.storage.InternalDataFile; +import org.apache.xtable.model.storage.InternalFilesDiff; import org.apache.xtable.model.storage.PartitionFileGroup; import org.apache.xtable.model.storage.TableFormat; import org.apache.xtable.spi.extractor.ConversionSource; @@ -192,8 +192,11 @@ public class IcebergConversionSource implements ConversionSource<Snapshot> { .map(dataFile -> fromIceberg(dataFile, partitionSpec, irTable)) .collect(Collectors.toSet()); - DataFilesDiff filesDiff = - DataFilesDiff.builder().filesAdded(dataFilesAdded).filesRemoved(dataFilesRemoved).build(); + InternalFilesDiff filesDiff = + InternalFilesDiff.builder() + .filesAdded(dataFilesAdded) + .filesRemoved(dataFilesRemoved) + .build(); InternalTable table = getTable(snapshot); return TableChange.builder().tableAsOfChange(table).filesDiff(filesDiff).build(); diff --git a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java index ecdbfa26..54a63a6d 100644 --- a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java +++ b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java @@ -44,7 +44,7 @@ import org.apache.xtable.model.InternalTable; import org.apache.xtable.model.metadata.TableSyncMetadata; import org.apache.xtable.model.schema.InternalPartitionField; import 
org.apache.xtable.model.schema.InternalSchema; -import org.apache.xtable.model.storage.DataFilesDiff; +import org.apache.xtable.model.storage.InternalFilesDiff; import org.apache.xtable.model.storage.PartitionFileGroup; import org.apache.xtable.model.storage.TableFormat; import org.apache.xtable.spi.sync.ConversionTarget; @@ -204,9 +204,9 @@ public class IcebergConversionTarget implements ConversionTarget { } @Override - public void syncFilesForDiff(DataFilesDiff dataFilesDiff) { + public void syncFilesForDiff(InternalFilesDiff internalFilesDiff) { dataFileUpdatesExtractor.applyDiff( - transaction, dataFilesDiff, transaction.table().schema(), transaction.table().spec()); + transaction, internalFilesDiff, transaction.table().schema(), transaction.table().spec()); } @Override diff --git a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergDataFileUpdatesSync.java b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergDataFileUpdatesSync.java index 80e1559f..f996a4e9 100644 --- a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergDataFileUpdatesSync.java +++ b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergDataFileUpdatesSync.java @@ -30,9 +30,10 @@ import org.apache.iceberg.io.CloseableIterable; import org.apache.xtable.exception.NotSupportedException; import org.apache.xtable.exception.ReadException; import org.apache.xtable.model.InternalTable; -import org.apache.xtable.model.storage.DataFilesDiff; import org.apache.xtable.model.storage.FilesDiff; +import org.apache.xtable.model.storage.InternalBaseFile; import org.apache.xtable.model.storage.InternalDataFile; +import org.apache.xtable.model.storage.InternalFilesDiff; import org.apache.xtable.model.storage.PartitionFileGroup; @AllArgsConstructor(staticName = "of") @@ -58,23 +59,23 @@ public class IcebergDataFileUpdatesSync { } FilesDiff<InternalDataFile, DataFile> diff = - DataFilesDiff.findNewAndRemovedFiles(partitionedDataFiles, previousFiles); + 
InternalFilesDiff.findNewAndRemovedFiles(partitionedDataFiles, previousFiles); applyDiff(transaction, diff.getFilesAdded(), diff.getFilesRemoved(), schema, partitionSpec); } public void applyDiff( Transaction transaction, - DataFilesDiff dataFilesDiff, + InternalFilesDiff internalFilesDiff, Schema schema, PartitionSpec partitionSpec) { Collection<DataFile> filesRemoved = - dataFilesDiff.getFilesRemoved().stream() + internalFilesDiff.dataFilesRemoved().stream() .map(file -> getDataFile(partitionSpec, schema, file)) .collect(Collectors.toList()); - applyDiff(transaction, dataFilesDiff.getFilesAdded(), filesRemoved, schema, partitionSpec); + applyDiff(transaction, internalFilesDiff.dataFilesAdded(), filesRemoved, schema, partitionSpec); } private void applyDiff( @@ -93,15 +94,15 @@ public class IcebergDataFileUpdatesSync { PartitionSpec partitionSpec, Schema schema, InternalDataFile dataFile) { DataFiles.Builder builder = DataFiles.builder(partitionSpec) - .withPath(dataFile.getPhysicalPath()) - .withFileSizeInBytes(dataFile.getFileSizeBytes()) + .withPath(dataFile.physicalPath()) + .withFileSizeInBytes(dataFile.fileSizeBytes()) .withMetrics( columnStatsConverter.toIceberg( - schema, dataFile.getRecordCount(), dataFile.getColumnStats())) - .withFormat(convertFileFormat(dataFile.getFileFormat())); + schema, dataFile.recordCount(), dataFile.columnStats())) + .withFormat(convertFileFormat(dataFile.fileFormat())); if (partitionSpec.isPartitioned()) { builder.withPartition( - partitionValueConverter.toIceberg(partitionSpec, schema, dataFile.getPartitionValues())); + partitionValueConverter.toIceberg(partitionSpec, schema, dataFile.partitionValues())); } return builder.build(); } diff --git a/xtable-core/src/test/java/org/apache/xtable/ValidationTestHelper.java b/xtable-core/src/test/java/org/apache/xtable/ValidationTestHelper.java index a20ae4e7..61c9e8a9 100644 --- a/xtable-core/src/test/java/org/apache/xtable/ValidationTestHelper.java +++ 
b/xtable-core/src/test/java/org/apache/xtable/ValidationTestHelper.java @@ -40,7 +40,7 @@ public class ValidationTestHelper { List<String> filePaths = internalSnapshot.getPartitionedDataFiles().stream() .flatMap(group -> group.getFiles().stream()) - .map(InternalDataFile::getPhysicalPath) + .map(InternalDataFile::physicalPath) .collect(Collectors.toList()); replaceFileScheme(allActivePaths); replaceFileScheme(filePaths); @@ -83,20 +83,20 @@ public class ValidationTestHelper { filesForCommitBefore.stream() .filter(file -> !filesForCommitAfter.contains(file)) .collect(Collectors.toSet()); - assertEquals(filesAdded, extractPathsFromDataFile(tableChange.getFilesDiff().getFilesAdded())); + assertEquals(filesAdded, extractPathsFromDataFile(tableChange.getFilesDiff().dataFilesAdded())); assertEquals( - filesRemoved, extractPathsFromDataFile(tableChange.getFilesDiff().getFilesRemoved())); + filesRemoved, extractPathsFromDataFile(tableChange.getFilesDiff().dataFilesRemoved())); } public static List<String> getAllFilePaths(InternalSnapshot internalSnapshot) { return internalSnapshot.getPartitionedDataFiles().stream() .flatMap(fileGroup -> fileGroup.getFiles().stream()) - .map(InternalDataFile::getPhysicalPath) + .map(InternalDataFile::physicalPath) .collect(Collectors.toList()); } private static Set<String> extractPathsFromDataFile(Set<InternalDataFile> dataFiles) { - return dataFiles.stream().map(InternalDataFile::getPhysicalPath).collect(Collectors.toSet()); + return dataFiles.stream().map(InternalDataFile::physicalPath).collect(Collectors.toSet()); } private static void replaceFileScheme(List<String> filePaths) { diff --git a/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaConversionSource.java b/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaConversionSource.java index 3b1cc529..2d022a09 100644 --- a/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaConversionSource.java +++
b/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaConversionSource.java @@ -721,25 +721,25 @@ public class ITDeltaConversionSource { private void validatePropertiesDataFile(InternalDataFile expected, InternalDataFile actual) throws URISyntaxException { Assertions.assertTrue( - Paths.get(new URI(actual.getPhysicalPath()).getPath()).isAbsolute(), - () -> "path == " + actual.getPhysicalPath() + " is not absolute"); - Assertions.assertEquals(expected.getFileFormat(), actual.getFileFormat()); - Assertions.assertEquals(expected.getPartitionValues(), actual.getPartitionValues()); - Assertions.assertEquals(expected.getFileSizeBytes(), actual.getFileSizeBytes()); - Assertions.assertEquals(expected.getRecordCount(), actual.getRecordCount()); + Paths.get(new URI(actual.physicalPath()).getPath()).isAbsolute(), + () -> "path == " + actual.physicalPath() + " is not absolute"); + Assertions.assertEquals(expected.fileFormat(), actual.fileFormat()); + Assertions.assertEquals(expected.partitionValues(), actual.partitionValues()); + Assertions.assertEquals(expected.fileSizeBytes(), actual.fileSizeBytes()); + Assertions.assertEquals(expected.recordCount(), actual.recordCount()); Instant now = Instant.now(); long minRange = now.minus(1, ChronoUnit.HOURS).toEpochMilli(); long maxRange = now.toEpochMilli(); Assertions.assertTrue( - actual.getLastModified() > minRange && actual.getLastModified() <= maxRange, + actual.lastModified() > minRange && actual.lastModified() <= maxRange, () -> "last modified == " - + actual.getLastModified() + + actual.lastModified() + " is expected between " + minRange + " and " + maxRange); - Assertions.assertEquals(expected.getColumnStats(), actual.getColumnStats()); + Assertions.assertEquals(expected.columnStats(), actual.columnStats()); } private static Stream<Arguments> testWithPartitionToggle() { @@ -749,7 +749,7 @@ public class ITDeltaConversionSource { private boolean checkIfFileIsRemoved(String activePath, TableChange tableChange) { Set<String> 
filePathsRemoved = tableChange.getFilesDiff().getFilesRemoved().stream() - .map(oneDf -> oneDf.getPhysicalPath()) + .map(oneDf -> oneDf.physicalPath()) .collect(Collectors.toSet()); return filePathsRemoved.contains(activePath); } diff --git a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSync.java b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSync.java index f0f889d2..61da728c 100644 --- a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSync.java +++ b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSync.java @@ -391,7 +391,7 @@ public class TestDeltaSync { } Map<String, InternalDataFile> pathToFile = internalDataFiles.stream() - .collect(Collectors.toMap(InternalDataFile::getPhysicalPath, Function.identity())); + .collect(Collectors.toMap(InternalDataFile::physicalPath, Function.identity())); int count = 0; try (CloseableIterator<AddFile> fileItr = deltaScan.getFiles()) { for (CloseableIterator<AddFile> it = fileItr; it.hasNext(); ) { @@ -400,7 +400,7 @@ public class TestDeltaSync { new org.apache.hadoop.fs.Path(basePath.resolve(addFile.getPath()).toUri()).toString(); InternalDataFile expected = pathToFile.get(fullPath); assertNotNull(expected); - assertEquals(addFile.getSize(), expected.getFileSizeBytes()); + assertEquals(addFile.getSize(), expected.fileSizeBytes()); count++; } } diff --git a/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionSource.java b/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionSource.java index 376cceda..7efc20ef 100644 --- a/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionSource.java +++ b/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionSource.java @@ -705,10 +705,10 @@ public class ITHudiConversionSource { tableChange.getFilesDiff().getFilesAdded().stream() .collect( Collectors.groupingBy( - oneDf -> getFileGroupInfo(oneDf.getPhysicalPath()).getFileId(), + oneDf -> getFileGroupInfo(oneDf.physicalPath()).getFileId(), 
Collectors.collectingAndThen( Collectors.mapping( - oneDf -> getFileGroupInfo(oneDf.getPhysicalPath()).getCommitTime(), + oneDf -> getFileGroupInfo(oneDf.physicalPath()).getCommitTime(), Collectors.toList()), list -> { if (list.size() > 1) { @@ -733,10 +733,10 @@ public class ITHudiConversionSource { tableChange.getFilesDiff().getFilesRemoved().stream() .collect( Collectors.groupingBy( - oneDf -> getFileGroupInfo(oneDf.getPhysicalPath()).getFileId(), + oneDf -> getFileGroupInfo(oneDf.physicalPath()).getFileId(), Collectors.collectingAndThen( Collectors.mapping( - oneDf -> getFileGroupInfo(oneDf.getPhysicalPath()).getCommitTime(), + oneDf -> getFileGroupInfo(oneDf.physicalPath()).getCommitTime(), Collectors.toList()), list -> { if (list.size() > 1) { diff --git a/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionTarget.java b/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionTarget.java index 03bb6e2c..72298f39 100644 --- a/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionTarget.java +++ b/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionTarget.java @@ -81,10 +81,10 @@ import org.apache.xtable.model.schema.PartitionTransformType; import org.apache.xtable.model.stat.ColumnStat; import org.apache.xtable.model.stat.PartitionValue; import org.apache.xtable.model.stat.Range; -import org.apache.xtable.model.storage.DataFilesDiff; import org.apache.xtable.model.storage.DataLayoutStrategy; import org.apache.xtable.model.storage.FileFormat; import org.apache.xtable.model.storage.InternalDataFile; +import org.apache.xtable.model.storage.InternalFilesDiff; import org.apache.xtable.model.storage.PartitionFileGroup; import org.apache.xtable.model.storage.TableFormat; import org.apache.xtable.spi.sync.ConversionTarget; @@ -192,8 +192,8 @@ public class ITHudiConversionTarget { String fileName = "file_1.parquet"; String filePath = getFilePath(partitionPath, fileName); - DataFilesDiff dataFilesDiff = - 
DataFilesDiff.builder() + InternalFilesDiff internalFilesDiff = + InternalFilesDiff.builder() .fileAdded(getTestFile(partitionPath, fileName)) .fileRemoved(fileToRemove) .build(); @@ -201,7 +201,7 @@ public class ITHudiConversionTarget { HudiConversionTarget targetClient = getTargetClient(); InternalTable initialState = getState(Instant.now()); targetClient.beginSync(initialState); - targetClient.syncFilesForDiff(dataFilesDiff); + targetClient.syncFilesForDiff(internalFilesDiff); targetClient.syncSchema(SCHEMA); TableSyncMetadata latestState = TableSyncMetadata.of(initialState.getLatestCommitTime(), Collections.emptyList()); @@ -377,11 +377,11 @@ public class ITHudiConversionTarget { List<InternalDataFile> filesToAdd, List<InternalDataFile> filesToRemove, Instant commitStart) { - DataFilesDiff dataFilesDiff2 = - DataFilesDiff.builder().filesAdded(filesToAdd).filesRemoved(filesToRemove).build(); + InternalFilesDiff internalFilesDiff2 = + InternalFilesDiff.builder().filesAdded(filesToAdd).filesRemoved(filesToRemove).build(); InternalTable state3 = getState(commitStart); conversionTarget.beginSync(state3); - conversionTarget.syncFilesForDiff(dataFilesDiff2); + conversionTarget.syncFilesForDiff(internalFilesDiff2); TableSyncMetadata latestState = TableSyncMetadata.of(state3.getLatestCommitTime(), Collections.emptyList()); conversionTarget.syncMetadata(latestState); diff --git a/xtable-core/src/test/java/org/apache/xtable/hudi/TestBaseFileUpdatesExtractor.java b/xtable-core/src/test/java/org/apache/xtable/hudi/TestBaseFileUpdatesExtractor.java index 8f3b3f7e..4958714c 100644 --- a/xtable-core/src/test/java/org/apache/xtable/hudi/TestBaseFileUpdatesExtractor.java +++ b/xtable-core/src/test/java/org/apache/xtable/hudi/TestBaseFileUpdatesExtractor.java @@ -59,9 +59,9 @@ import org.apache.xtable.model.schema.PartitionTransformType; import org.apache.xtable.model.stat.ColumnStat; import org.apache.xtable.model.stat.PartitionValue; import org.apache.xtable.model.stat.Range; 
-import org.apache.xtable.model.storage.DataFilesDiff; import org.apache.xtable.model.storage.FileFormat; import org.apache.xtable.model.storage.InternalDataFile; +import org.apache.xtable.model.storage.InternalFilesDiff; import org.apache.xtable.model.storage.PartitionFileGroup; import org.apache.xtable.testutil.ColumnStatMapUtil; @@ -119,8 +119,8 @@ public class TestBaseFileUpdatesExtractor { String.format("%s/%s/%s", tableBasePath, partitionPath2, fileName5), Collections.emptyList()); - DataFilesDiff diff = - DataFilesDiff.builder() + InternalFilesDiff diff = + InternalFilesDiff.builder() .filesAdded(Arrays.asList(addedFile1, addedFile2)) .filesRemoved(Arrays.asList(removedFile1, removedFile2, removedFile3)) .build(); diff --git a/xtable-core/src/test/java/org/apache/xtable/hudi/TestHudiConversionTarget.java b/xtable-core/src/test/java/org/apache/xtable/hudi/TestHudiConversionTarget.java index fe20db05..dfaf2866 100644 --- a/xtable-core/src/test/java/org/apache/xtable/hudi/TestHudiConversionTarget.java +++ b/xtable-core/src/test/java/org/apache/xtable/hudi/TestHudiConversionTarget.java @@ -49,7 +49,7 @@ import org.apache.xtable.model.schema.InternalPartitionField; import org.apache.xtable.model.schema.InternalSchema; import org.apache.xtable.model.schema.InternalType; import org.apache.xtable.model.schema.PartitionTransformType; -import org.apache.xtable.model.storage.DataFilesDiff; +import org.apache.xtable.model.storage.InternalFilesDiff; import org.apache.xtable.model.storage.PartitionFileGroup; /** @@ -211,7 +211,7 @@ public class TestHudiConversionTarget { HudiConversionTarget.CommitState mockCommitState = initMocksForBeginSync(targetClient).getLeft(); String instant = "commit"; - DataFilesDiff input = DataFilesDiff.builder().build(); + InternalFilesDiff input = InternalFilesDiff.builder().build(); BaseFileUpdatesExtractor.ReplaceMetadata output = BaseFileUpdatesExtractor.ReplaceMetadata.of( Collections.emptyMap(), Collections.emptyList()); diff --git 
a/xtable-core/src/test/java/org/apache/xtable/hudi/TestHudiFileStatsExtractor.java b/xtable-core/src/test/java/org/apache/xtable/hudi/TestHudiFileStatsExtractor.java index a18bb743..790a45cb 100644 --- a/xtable-core/src/test/java/org/apache/xtable/hudi/TestHudiFileStatsExtractor.java +++ b/xtable-core/src/test/java/org/apache/xtable/hudi/TestHudiFileStatsExtractor.java @@ -202,8 +202,8 @@ public class TestHudiFileStatsExtractor { private void validateOutput(List<InternalDataFile> output) { assertEquals(1, output.size()); InternalDataFile fileWithStats = output.get(0); - assertEquals(2, fileWithStats.getRecordCount()); - List<ColumnStat> columnStats = fileWithStats.getColumnStats(); + assertEquals(2, fileWithStats.recordCount()); + List<ColumnStat> columnStats = fileWithStats.columnStats(); assertEquals(9, columnStats.size()); diff --git a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergConversionSource.java b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergConversionSource.java index 4006b543..cc18fbfb 100644 --- a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergConversionSource.java +++ b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergConversionSource.java @@ -151,16 +151,16 @@ class TestIcebergConversionSource { List<InternalDataFile> internalDataFiles = dataFilesChunk.getFiles(); assertEquals(1, internalDataFiles.size()); InternalDataFile internalDataFile = internalDataFiles.get(0); - assertEquals(FileFormat.APACHE_PARQUET, internalDataFile.getFileFormat()); - assertEquals(1, internalDataFile.getRecordCount()); - Assertions.assertTrue(internalDataFile.getPhysicalPath().startsWith("file:" + workingDir)); + assertEquals(FileFormat.APACHE_PARQUET, internalDataFile.fileFormat()); + assertEquals(1, internalDataFile.recordCount()); + Assertions.assertTrue(internalDataFile.physicalPath().startsWith("file:" + workingDir)); - List<PartitionValue> partitionValues = internalDataFile.getPartitionValues(); + 
List<PartitionValue> partitionValues = internalDataFile.partitionValues(); assertEquals(1, partitionValues.size()); PartitionValue partitionEntry = partitionValues.iterator().next(); assertEquals( "cs_sold_date_sk", partitionEntry.getPartitionField().getSourceField().getName()); - assertEquals(7, internalDataFile.getColumnStats().size()); + assertEquals(7, internalDataFile.columnStats().size()); } } @@ -315,8 +315,8 @@ class TestIcebergConversionSource { TableChange tableChange = conversionSource.getTableChangeForCommit(snapshot); assertEquals(addedFiles, tableChange.getFilesDiff().getFilesAdded().size()); assertTrue( - tableChange.getFilesDiff().getFilesAdded().stream() - .allMatch(file -> file.getColumnStats().size() == numberOfColumns)); + tableChange.getFilesDiff().dataFilesAdded().stream() + .allMatch(file -> file.columnStats().size() == numberOfColumns)); assertEquals(removedFiles, tableChange.getFilesDiff().getFilesRemoved().size()); } diff --git a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSync.java b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSync.java index bd36dde9..d0964bcd 100644 --- a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSync.java +++ b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSync.java @@ -737,14 +737,13 @@ public class TestIcebergSync { assertEquals(expectedFiles.size(), combinedScanTask.files().size()); Map<String, InternalDataFile> pathToFile = expectedFiles.stream() - .collect( - Collectors.toMap(InternalDataFile::getPhysicalPath, Function.identity())); + .collect(Collectors.toMap(InternalDataFile::physicalPath, Function.identity())); for (FileScanTask fileScanTask : combinedScanTask.files()) { // check that path and other stats match InternalDataFile expected = pathToFile.get(fileScanTask.file().path()); assertNotNull(expected); - assertEquals(expected.getFileSizeBytes(), fileScanTask.file().fileSizeInBytes()); - assertEquals(expected.getRecordCount(), 
fileScanTask.file().recordCount()); + assertEquals(expected.fileSizeBytes(), fileScanTask.file().fileSizeInBytes()); + assertEquals(expected.recordCount(), fileScanTask.file().recordCount()); } } } @@ -762,11 +761,11 @@ public class TestIcebergSync { } private void mockColStatsForFile(InternalDataFile dataFile, int times) { - Metrics response = new Metrics(dataFile.getRecordCount(), null, null, null, null); + Metrics response = new Metrics(dataFile.recordCount(), null, null, null, null); Metrics[] responses = IntStream.of(times - 1).mapToObj(unused -> response).toArray(Metrics[]::new); when(mockColumnStatsConverter.toIceberg( - any(Schema.class), eq(dataFile.getRecordCount()), eq(Collections.emptyList()))) + any(Schema.class), eq(dataFile.recordCount()), eq(Collections.emptyList()))) .thenReturn(response, responses); } }