This is an automated email from the ASF dual-hosted git repository. ashvin pushed a commit to branch 652-generic-internal-file-representation in repository https://gitbox.apache.org/repos/asf/incubator-xtable.git
The following commit(s) were added to refs/heads/652-generic-internal-file-representation by this push: new bd3f38ff Address review comments bd3f38ff is described below commit bd3f38ff24f3bc2fba8db299352d6ea870f39b0b Author: Ashvin Agrawal <ash...@apache.org> AuthorDate: Tue Feb 18 16:51:11 2025 -0800 Address review comments --- .../org/apache/xtable/model/storage/FileType.java | 45 ---------------------- .../org/apache/xtable/model/storage/FilesDiff.java | 6 +-- .../xtable/model/storage/InternalDataFile.java | 5 +-- .../xtable/model/storage/InternalFilesDiff.java | 16 ++++---- ...ernalBaseFile.java => InternalStorageFile.java} | 17 ++------ .../xtable/model/storage/PartitionFileGroup.java | 12 +++--- .../apache/xtable/model/storage/TestFilesDiff.java | 2 +- .../apache/xtable/delta/DeltaConversionSource.java | 4 +- .../delta/DeltaDataFileUpdatesExtractor.java | 16 ++++---- .../xtable/delta/DeltaPartitionExtractor.java | 6 +-- .../xtable/hudi/BaseFileUpdatesExtractor.java | 18 ++++----- .../apache/xtable/hudi/HudiFileStatsExtractor.java | 4 +- .../xtable/iceberg/IcebergDataFileUpdatesSync.java | 18 ++++----- .../org/apache/xtable/ValidationTestHelper.java | 10 ++--- .../xtable/delta/ITDeltaConversionSource.java | 22 +++++------ .../org/apache/xtable/delta/TestDeltaSync.java | 4 +- .../apache/xtable/hudi/ITHudiConversionSource.java | 8 ++-- .../xtable/hudi/TestHudiFileStatsExtractor.java | 4 +- .../iceberg/TestIcebergConversionSource.java | 14 +++---- .../org/apache/xtable/iceberg/TestIcebergSync.java | 11 +++--- 20 files changed, 92 insertions(+), 150 deletions(-) diff --git a/xtable-api/src/main/java/org/apache/xtable/model/storage/FileType.java b/xtable-api/src/main/java/org/apache/xtable/model/storage/FileType.java deleted file mode 100644 index 7d886181..00000000 --- a/xtable-api/src/main/java/org/apache/xtable/model/storage/FileType.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.xtable.model.storage; - -/** - * Known types of storage files. For e.g. data file of a table, a position deletion, statistics - * file. - * - * @since 0.1 - */ -public enum FileType { - /** Files of type data contain the actual data records of the table. */ - DATA_FILE, - - /** - * Files of type deletion contain information of soft deleted records of the table, typically - * containing ordinal references - */ - DELETION_FILE, - - /** - * Files of type statistics typically contain supplemental statistics information related to a - * table, its partitions, or data files - */ - STATISTICS_FILE, - - /** Files of type index contain information related to the indexes of a table */ - INDEX_FILE -} diff --git a/xtable-api/src/main/java/org/apache/xtable/model/storage/FilesDiff.java b/xtable-api/src/main/java/org/apache/xtable/model/storage/FilesDiff.java index 05f760e6..0bb00aa2 100644 --- a/xtable-api/src/main/java/org/apache/xtable/model/storage/FilesDiff.java +++ b/xtable-api/src/main/java/org/apache/xtable/model/storage/FilesDiff.java @@ -92,12 +92,12 @@ public class FilesDiff<L, P> { * @param <P> the type of the previous files * @return the set of files that are added */ - public static <P> FilesDiff<? extends InternalBaseFile, P> findNewAndRemovedFiles( + public static <P> FilesDiff<InternalStorageFile, P> findNewAndRemovedFiles( List<PartitionFileGroup> latestFileGroups, Map<String, P> previousFiles) { - Map<String, ? extends InternalBaseFile> latestFiles = + Map<String, InternalStorageFile> latestFiles = latestFileGroups.stream() .flatMap(group -> group.getFiles().stream()) - .collect(Collectors.toMap(InternalBaseFile::physicalPath, Function.identity())); + .collect(Collectors.toMap(InternalStorageFile::getPhysicalPath, Function.identity())); return findNewAndRemovedFiles(latestFiles, previousFiles); } } diff --git a/xtable-api/src/main/java/org/apache/xtable/model/storage/InternalDataFile.java b/xtable-api/src/main/java/org/apache/xtable/model/storage/InternalDataFile.java index effbc887..3f1a7fa5 100644 --- a/xtable-api/src/main/java/org/apache/xtable/model/storage/InternalDataFile.java +++ b/xtable-api/src/main/java/org/apache/xtable/model/storage/InternalDataFile.java @@ -26,7 +26,6 @@ import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.NonNull; import lombok.ToString; -import lombok.experimental.Accessors; import lombok.experimental.FieldDefaults; import lombok.experimental.SuperBuilder; @@ -42,15 +41,13 @@ import org.apache.xtable.model.stat.PartitionValue; @FieldDefaults(makeFinal = true, level = lombok.AccessLevel.PRIVATE) @ToString(callSuper = true) @EqualsAndHashCode(callSuper = true) -@Accessors(fluent = true) @Getter -public class InternalDataFile extends InternalBaseFile { +public class InternalDataFile extends InternalStorageFile { // file format @Builder.Default @NonNull FileFormat fileFormat = FileFormat.APACHE_PARQUET; // partition ranges for the data file @Builder.Default @NonNull List<PartitionValue> partitionValues = Collections.emptyList(); - long recordCount; // column stats for each column in the data file @Builder.Default @NonNull List<ColumnStat> columnStats = Collections.emptyList(); // last modified time in millis since epoch diff --git a/xtable-api/src/main/java/org/apache/xtable/model/storage/InternalFilesDiff.java b/xtable-api/src/main/java/org/apache/xtable/model/storage/InternalFilesDiff.java index 527d5abd..4de0c976 100644 --- a/xtable-api/src/main/java/org/apache/xtable/model/storage/InternalFilesDiff.java +++ b/xtable-api/src/main/java/org/apache/xtable/model/storage/InternalFilesDiff.java @@ -32,7 +32,7 @@ import lombok.experimental.SuperBuilder; @Value @EqualsAndHashCode(callSuper = true) @SuperBuilder -public class InternalFilesDiff extends FilesDiff<InternalBaseFile, InternalBaseFile> { +public class InternalFilesDiff extends FilesDiff<InternalStorageFile, InternalStorageFile> { /** * Creates a InternalFilesDiff from the list of files in the target table and the list of files in @@ -46,10 +46,10 @@ public class InternalFilesDiff extends FilesDiff<InternalBaseFile, InternalBaseF List<InternalDataFile> source, List<InternalDataFile> target) { Map<String, InternalDataFile> targetPaths = target.stream() - .collect(Collectors.toMap(InternalDataFile::physicalPath, Function.identity())); + .collect(Collectors.toMap(InternalDataFile::getPhysicalPath, Function.identity())); Map<String, InternalDataFile> sourcePaths = source.stream() - .collect(Collectors.toMap(InternalDataFile::physicalPath, Function.identity())); + .collect(Collectors.toMap(InternalDataFile::getPhysicalPath, Function.identity())); FilesDiff<InternalDataFile, InternalDataFile> diff = findNewAndRemovedFiles(sourcePaths, targetPaths); @@ -60,26 +60,26 @@ public class InternalFilesDiff extends FilesDiff<InternalBaseFile, InternalBaseF } /** - * Filters files of type {@link FileType#DATA_FILE} from the list of files added to the source - * table and returns the list. + * Filters files of type {@link InternalDataFile} from the list of files added to the source table + * and returns the list. */ public Set<InternalDataFile> dataFilesAdded() { Set<InternalDataFile> result = getFilesAdded().stream() - .filter(InternalBaseFile::isDataFile) + .filter(InternalDataFile.class::isInstance) .map(file -> (InternalDataFile) file) .collect(Collectors.toSet()); return result; } /** - * Filters files of type {@link FileType#DATA_FILE} from the list of files removed to the source + * Filters files of type {@link InternalDataFile} from the list of files removed to the source * table and returns the list. */ public Set<InternalDataFile> dataFilesRemoved() { Set<InternalDataFile> result = getFilesRemoved().stream() - .filter(InternalBaseFile::isDataFile) + .filter(InternalDataFile.class::isInstance) .map(file -> (InternalDataFile) file) .collect(Collectors.toSet()); return result; diff --git a/xtable-api/src/main/java/org/apache/xtable/model/storage/InternalBaseFile.java b/xtable-api/src/main/java/org/apache/xtable/model/storage/InternalStorageFile.java similarity index 83% rename from xtable-api/src/main/java/org/apache/xtable/model/storage/InternalBaseFile.java rename to xtable-api/src/main/java/org/apache/xtable/model/storage/InternalStorageFile.java index 67e0f6b8..04591d89 100644 --- a/xtable-api/src/main/java/org/apache/xtable/model/storage/InternalBaseFile.java +++ b/xtable-api/src/main/java/org/apache/xtable/model/storage/InternalStorageFile.java @@ -19,12 +19,10 @@ package org.apache.xtable.model.storage; import lombok.AllArgsConstructor; -import lombok.Builder; import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.NonNull; import lombok.ToString; -import lombok.experimental.Accessors; import lombok.experimental.FieldDefaults; import lombok.experimental.SuperBuilder; @@ -43,25 +41,16 @@ import lombok.experimental.SuperBuilder; @ToString(callSuper = true) @EqualsAndHashCode @AllArgsConstructor -@Accessors(fluent = true) @Getter -public abstract class InternalBaseFile { +public abstract class InternalStorageFile { // Absolute path of the storage file, with the scheme, that contains this logical file. Typically, // one physical storage file contains only one base file, for e.g. a parquet data file. However, // in some cases, one storage file can contain multiple logical storage files for optimizations. @NonNull String physicalPath; - // The offset of the logical file in the physical storage file. - @Builder.Default long offset = 0; - // The size of the logical file in the physical storage file. long fileSizeBytes; - // File Type represents the type of the storage file. For example, a data file, a delete file, a - // stat file - @Builder.Default @NonNull FileType fileType = FileType.DATA_FILE; - - public boolean isDataFile() { - return fileType.equals(FileType.DATA_FILE); - } + // The number of records in the storage file. + long recordCount; } diff --git a/xtable-api/src/main/java/org/apache/xtable/model/storage/PartitionFileGroup.java b/xtable-api/src/main/java/org/apache/xtable/model/storage/PartitionFileGroup.java index b386cd23..cd6dc6c4 100644 --- a/xtable-api/src/main/java/org/apache/xtable/model/storage/PartitionFileGroup.java +++ b/xtable-api/src/main/java/org/apache/xtable/model/storage/PartitionFileGroup.java @@ -28,12 +28,12 @@ import lombok.Value; import org.apache.xtable.model.stat.PartitionValue; -/** Represents a grouping of {@link InternalBaseFile} with the same partition values. */ +/** Represents a grouping of {@link InternalStorageFile} with the same partition values. */ @Value @Builder public class PartitionFileGroup { List<PartitionValue> partitionValues; - List<? extends InternalBaseFile> files; + List<? extends InternalStorageFile> files; public static List<PartitionFileGroup> fromFiles(List<InternalDataFile> files) { return fromFiles(files.stream()); @@ -41,7 +41,7 @@ public class PartitionFileGroup { public static List<PartitionFileGroup> fromFiles(Stream<InternalDataFile> files) { Map<List<PartitionValue>, List<InternalDataFile>> filesGrouped = - files.collect(Collectors.groupingBy(InternalDataFile::partitionValues)); + files.collect(Collectors.groupingBy(InternalDataFile::getPartitionValues)); return filesGrouped.entrySet().stream() .map( entry -> @@ -52,10 +52,10 @@ public class PartitionFileGroup { .collect(Collectors.toList()); } - /** Filters storage files of type {@link FileType#DATA_FILE} and returns them. */ - public List<InternalDataFile> dataFiles() { + /** Filters storage files of type {@link InternalDataFile} and returns them. */ + public List<InternalDataFile> getDataFiles() { return files.stream() - .filter(InternalBaseFile::isDataFile) + .filter(InternalDataFile.class::isInstance) .map(file -> (InternalDataFile) file) .collect(Collectors.toList()); } diff --git a/xtable-api/src/test/java/org/apache/xtable/model/storage/TestFilesDiff.java b/xtable-api/src/test/java/org/apache/xtable/model/storage/TestFilesDiff.java index a93bb622..ace58cd8 100644 --- a/xtable-api/src/test/java/org/apache/xtable/model/storage/TestFilesDiff.java +++ b/xtable-api/src/test/java/org/apache/xtable/model/storage/TestFilesDiff.java @@ -50,7 +50,7 @@ public class TestFilesDiff { previousFiles.put("file2NoGroup", file2); previousFiles.put("file2Group2", file3); - FilesDiff<? extends InternalBaseFile, File> diff = + FilesDiff<InternalStorageFile, File> diff = FilesDiff.findNewAndRemovedFiles(latestFileGroups, previousFiles); assertEquals(2, diff.getFilesAdded().size()); assertTrue(diff.getFilesAdded().contains(file1Group2)); diff --git a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java index fc3f6b92..47d932f7 100644 --- a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java +++ b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java @@ -127,7 +127,7 @@ public class DeltaConversionSource implements ConversionSource<Long> { true, DeltaPartitionExtractor.getInstance(), DeltaStatsExtractor.getInstance()); - addedFiles.put(dataFile.physicalPath(), dataFile); + addedFiles.put(dataFile.getPhysicalPath(), dataFile); String deleteVectorPath = actionsConverter.extractDeletionVectorFile(snapshotAtVersion, (AddFile) action); if (deleteVectorPath != null) { @@ -141,7 +141,7 @@ public class DeltaConversionSource implements ConversionSource<Long> { fileFormat, tableAtVersion.getPartitioningFields(), DeltaPartitionExtractor.getInstance()); - removedFiles.put(dataFile.physicalPath(), dataFile); + removedFiles.put(dataFile.getPhysicalPath(), dataFile); } } diff --git a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaDataFileUpdatesExtractor.java b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaDataFileUpdatesExtractor.java index 022c5591..f33ccaef 100644 --- a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaDataFileUpdatesExtractor.java +++ b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaDataFileUpdatesExtractor.java @@ -40,9 +40,9 @@ import org.apache.xtable.collectors.CustomCollectors; import org.apache.xtable.model.schema.InternalSchema; import org.apache.xtable.model.stat.ColumnStat; import org.apache.xtable.model.storage.FilesDiff; -import org.apache.xtable.model.storage.InternalBaseFile; import org.apache.xtable.model.storage.InternalDataFile; import org.apache.xtable.model.storage.InternalFilesDiff; +import org.apache.xtable.model.storage.InternalStorageFile; import org.apache.xtable.model.storage.PartitionFileGroup; import org.apache.xtable.paths.PathUtils; @@ -75,7 +75,7 @@ public class DeltaDataFileUpdatesExtractor { file -> DeltaActionsConverter.getFullPathToFile(snapshot, file.path()), file -> file)); - FilesDiff<? extends InternalBaseFile, Action> diff = + FilesDiff<InternalStorageFile, Action> diff = InternalFilesDiff.findNewAndRemovedFiles(partitionedDataFiles, previousFiles); return applyDiff( @@ -93,13 +93,13 @@ public class DeltaDataFileUpdatesExtractor { } private Seq<Action> applyDiff( - Set<? extends InternalBaseFile> filesAdded, + Set<? extends InternalStorageFile> filesAdded, Collection<Action> removeFileActions, InternalSchema tableSchema, String tableBasePath) { Stream<Action> addActions = filesAdded.stream() - .filter(InternalBaseFile::isDataFile) + .filter(InternalDataFile.class::isInstance) .map(file -> (InternalDataFile) file) .flatMap(dFile -> createAddFileAction(dFile, tableSchema, tableBasePath)); int totalActions = filesAdded.size() + removeFileActions.size(); @@ -115,12 +115,12 @@ public class DeltaDataFileUpdatesExtractor { new AddFile( // Delta Lake supports relative and absolute paths in theory but relative paths seem // more commonly supported by query engines in our testing - PathUtils.getRelativePath(dataFile.physicalPath(), tableBasePath), + PathUtils.getRelativePath(dataFile.getPhysicalPath(), tableBasePath), convertJavaMapToScala(deltaPartitionExtractor.partitionValueSerialization(dataFile)), - dataFile.fileSizeBytes(), - dataFile.lastModified(), + dataFile.getFileSizeBytes(), + dataFile.getLastModified(), true, - getColumnStats(schema, dataFile.recordCount(), dataFile.columnStats()), + getColumnStats(schema, dataFile.getRecordCount(), dataFile.getColumnStats()), null, null)); } diff --git a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaPartitionExtractor.java b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaPartitionExtractor.java index 57568281..3492857f 100644 --- a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaPartitionExtractor.java +++ b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaPartitionExtractor.java @@ -253,11 +253,11 @@ public class DeltaPartitionExtractor { public Map<String, String> partitionValueSerialization(InternalDataFile internalDataFile) { Map<String, String> partitionValuesSerialized = new HashMap<>(); - if (internalDataFile.partitionValues() == null - || internalDataFile.partitionValues().isEmpty()) { + if (internalDataFile.getPartitionValues() == null + || internalDataFile.getPartitionValues().isEmpty()) { return partitionValuesSerialized; } - for (PartitionValue partitionValue : internalDataFile.partitionValues()) { + for (PartitionValue partitionValue : internalDataFile.getPartitionValues()) { InternalPartitionField partitionField = partitionValue.getPartitionField(); PartitionTransformType transformType = partitionField.getTransformType(); String partitionValueSerialized; diff --git a/xtable-core/src/main/java/org/apache/xtable/hudi/BaseFileUpdatesExtractor.java b/xtable-core/src/main/java/org/apache/xtable/hudi/BaseFileUpdatesExtractor.java index 76a0300c..ba1f9f9d 100644 --- a/xtable-core/src/main/java/org/apache/xtable/hudi/BaseFileUpdatesExtractor.java +++ b/xtable-core/src/main/java/org/apache/xtable/hudi/BaseFileUpdatesExtractor.java @@ -96,7 +96,7 @@ public class BaseFileUpdatesExtractor { partitionedDataFiles.stream() .map( partitionFileGroup -> { - List<InternalDataFile> dataFiles = partitionFileGroup.dataFiles(); + List<InternalDataFile> dataFiles = partitionFileGroup.getDataFiles(); String partitionPath = getPartitionPath(tableBasePath, dataFiles); // remove the partition from the set of partitions to drop since it is present in // the snapshot @@ -107,7 +107,7 @@ public class BaseFileUpdatesExtractor { dataFiles.stream() .collect( Collectors.toMap( - InternalDataFile::physicalPath, Function.identity())); + InternalDataFile::getPhysicalPath, Function.identity())); List<HoodieBaseFile> baseFiles = isTableInitialized ? fsView.getLatestBaseFiles(partitionPath).collect(Collectors.toList()) @@ -174,7 +174,7 @@ public class BaseFileUpdatesExtractor { // For all removed files, group by partition and extract the file id Map<String, List<String>> partitionToReplacedFileIds = internalFilesDiff.dataFilesRemoved().stream() - .map(file -> new CachingPath(file.physicalPath())) + .map(file -> new CachingPath(file.getPhysicalPath())) .collect( Collectors.groupingBy( path -> HudiPathUtils.getPartitionPath(tableBasePath, path), @@ -213,7 +213,7 @@ public class BaseFileUpdatesExtractor { InternalDataFile file, Optional<String> partitionPathOptional) { WriteStatus writeStatus = new WriteStatus(); - Path path = new CachingPath(file.physicalPath()); + Path path = new CachingPath(file.getPhysicalPath()); String partitionPath = partitionPathOptional.orElseGet(() -> HudiPathUtils.getPartitionPath(tableBasePath, path)); String fileId = getFileId(path); @@ -227,10 +227,10 @@ public class BaseFileUpdatesExtractor { writeStat.setPath( ExternalFilePathUtil.appendCommitTimeAndExternalFileMarker(filePath, commitTime)); writeStat.setPartitionPath(partitionPath); - writeStat.setNumWrites(file.recordCount()); - writeStat.setTotalWriteBytes(file.fileSizeBytes()); - writeStat.setFileSizeInBytes(file.fileSizeBytes()); - writeStat.putRecordsStats(convertColStats(fileName, file.columnStats())); + writeStat.setNumWrites(file.getRecordCount()); + writeStat.setTotalWriteBytes(file.getFileSizeBytes()); + writeStat.setFileSizeInBytes(file.getFileSizeBytes()); + writeStat.putRecordsStats(convertColStats(fileName, file.getColumnStats())); writeStatus.setStat(writeStat); return writeStatus; } @@ -276,6 +276,6 @@ public class BaseFileUpdatesExtractor { private String getPartitionPath(Path tableBasePath, List<InternalDataFile> files) { return HudiPathUtils.getPartitionPath( - tableBasePath, new CachingPath(files.get(0).physicalPath())); + tableBasePath, new CachingPath(files.get(0).getPhysicalPath())); } } diff --git a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiFileStatsExtractor.java b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiFileStatsExtractor.java index 2d6ad726..e47ef72e 100644 --- a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiFileStatsExtractor.java +++ b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiFileStatsExtractor.java @@ -107,7 +107,7 @@ public class HudiFileStatsExtractor { return files.map( file -> { HudiFileStats fileStats = - computeColumnStatsForFile(new Path(file.physicalPath()), nameFieldMap); + computeColumnStatsForFile(new Path(file.getPhysicalPath()), nameFieldMap); return file.toBuilder() .columnStats(fileStats.getColumnStats()) .recordCount(fileStats.getRowCount()) @@ -128,7 +128,7 @@ public class HudiFileStatsExtractor { Map<Pair<String, String>, InternalDataFile> filePathsToDataFile = files.collect( Collectors.toMap( - file -> getPartitionAndFileName(file.physicalPath()), Function.identity())); + file -> getPartitionAndFileName(file.getPhysicalPath()), Function.identity())); if (filePathsToDataFile.isEmpty()) { return Stream.empty(); } diff --git a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergDataFileUpdatesSync.java b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergDataFileUpdatesSync.java index 5c4cf2ba..dea5caa0 100644 --- a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergDataFileUpdatesSync.java +++ b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergDataFileUpdatesSync.java @@ -31,9 +31,9 @@ import org.apache.xtable.exception.NotSupportedException; import org.apache.xtable.exception.ReadException; import org.apache.xtable.model.InternalTable; import org.apache.xtable.model.storage.FilesDiff; -import org.apache.xtable.model.storage.InternalBaseFile; import org.apache.xtable.model.storage.InternalDataFile; import org.apache.xtable.model.storage.InternalFilesDiff; +import org.apache.xtable.model.storage.InternalStorageFile; import org.apache.xtable.model.storage.PartitionFileGroup; @AllArgsConstructor(staticName = "of") @@ -58,7 +58,7 @@ public class IcebergDataFileUpdatesSync { throw new ReadException("Failed to iterate through Iceberg data files", e); } - FilesDiff<? extends InternalBaseFile, DataFile> diff = + FilesDiff<InternalStorageFile, DataFile> diff = InternalFilesDiff.findNewAndRemovedFiles(partitionedDataFiles, previousFiles); applyDiff(transaction, diff.getFilesAdded(), diff.getFilesRemoved(), schema, partitionSpec); @@ -80,13 +80,13 @@ public class IcebergDataFileUpdatesSync { private void applyDiff( Transaction transaction, - Collection<? extends InternalBaseFile> filesAdded, + Collection<? extends InternalStorageFile> filesAdded, Collection<DataFile> filesRemoved, Schema schema, PartitionSpec partitionSpec) { OverwriteFiles overwriteFiles = transaction.newOverwrite(); filesAdded.stream() - .filter(InternalBaseFile::isDataFile) + .filter(InternalDataFile.class::isInstance) .map(file -> (InternalDataFile) file) .forEach(f -> overwriteFiles.addFile(getDataFile(partitionSpec, schema, f))); filesRemoved.forEach(overwriteFiles::deleteFile); @@ -97,15 +97,15 @@ public class IcebergDataFileUpdatesSync { PartitionSpec partitionSpec, Schema schema, InternalDataFile dataFile) { DataFiles.Builder builder = DataFiles.builder(partitionSpec) - .withPath(dataFile.physicalPath()) - .withFileSizeInBytes(dataFile.fileSizeBytes()) + .withPath(dataFile.getPhysicalPath()) + .withFileSizeInBytes(dataFile.getFileSizeBytes()) .withMetrics( columnStatsConverter.toIceberg( - schema, dataFile.recordCount(), dataFile.columnStats())) - .withFormat(convertFileFormat(dataFile.fileFormat())); + schema, dataFile.getRecordCount(), dataFile.getColumnStats())) + .withFormat(convertFileFormat(dataFile.getFileFormat())); if (partitionSpec.isPartitioned()) { builder.withPartition( - partitionValueConverter.toIceberg(partitionSpec, schema, dataFile.partitionValues())); + partitionValueConverter.toIceberg(partitionSpec, schema, dataFile.getPartitionValues())); } return builder.build(); } diff --git a/xtable-core/src/test/java/org/apache/xtable/ValidationTestHelper.java b/xtable-core/src/test/java/org/apache/xtable/ValidationTestHelper.java index 9ea3baf4..9e95f279 100644 --- a/xtable-core/src/test/java/org/apache/xtable/ValidationTestHelper.java +++ b/xtable-core/src/test/java/org/apache/xtable/ValidationTestHelper.java @@ -39,8 +39,8 @@ public class ValidationTestHelper { assertNotNull(internalSnapshot.getTable()); List<String> filePaths = internalSnapshot.getPartitionedDataFiles().stream() - .flatMap(group -> group.dataFiles().stream()) - .map(InternalDataFile::physicalPath) + .flatMap(group -> group.getDataFiles().stream()) + .map(InternalDataFile::getPhysicalPath) .collect(Collectors.toList()); replaceFileScheme(allActivePaths); replaceFileScheme(filePaths); @@ -90,13 +90,13 @@ public class ValidationTestHelper { public static List<String> getAllFilePaths(InternalSnapshot internalSnapshot) { return internalSnapshot.getPartitionedDataFiles().stream() - .flatMap(fileGroup -> fileGroup.dataFiles().stream()) - .map(InternalDataFile::physicalPath) + .flatMap(fileGroup -> fileGroup.getDataFiles().stream()) + .map(InternalDataFile::getPhysicalPath) .collect(Collectors.toList()); } private static Set<String> extractPathsFromDataFile(Set<InternalDataFile> dataFiles) { - return dataFiles.stream().map(InternalDataFile::physicalPath).collect(Collectors.toSet()); + return dataFiles.stream().map(InternalDataFile::getPhysicalPath).collect(Collectors.toSet()); } private static void replaceFileScheme(List<String> filePaths) { diff --git a/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaConversionSource.java b/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaConversionSource.java index 4c854ba3..4afb6bdb 100644 --- a/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaConversionSource.java +++ b/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaConversionSource.java @@ -704,7 +704,7 @@ public class ITDeltaConversionSource { throws URISyntaxException { assertEquals( expectedPartitionFiles.getPartitionValues(), actualPartitionFiles.getPartitionValues()); - validateDataFiles(expectedPartitionFiles.dataFiles(), actualPartitionFiles.dataFiles()); + validateDataFiles(expectedPartitionFiles.getDataFiles(), actualPartitionFiles.getDataFiles()); } private void validateDataFiles( @@ -721,25 +721,25 @@ public class ITDeltaConversionSource { private void validatePropertiesDataFile(InternalDataFile expected, InternalDataFile actual) throws URISyntaxException { Assertions.assertTrue( - Paths.get(new URI(actual.physicalPath()).getPath()).isAbsolute(), - () -> "path == " + actual.physicalPath() + " is not absolute"); - Assertions.assertEquals(expected.fileFormat(), actual.fileFormat()); - Assertions.assertEquals(expected.partitionValues(), actual.partitionValues()); - Assertions.assertEquals(expected.fileSizeBytes(), actual.fileSizeBytes()); - Assertions.assertEquals(expected.recordCount(), actual.recordCount()); + Paths.get(new URI(actual.getPhysicalPath()).getPath()).isAbsolute(), + () -> "path == " + actual.getPhysicalPath() + " is not absolute"); + Assertions.assertEquals(expected.getFileFormat(), actual.getFileFormat()); + Assertions.assertEquals(expected.getPartitionValues(), actual.getPartitionValues()); + Assertions.assertEquals(expected.getFileSizeBytes(), actual.getFileSizeBytes()); + Assertions.assertEquals(expected.getRecordCount(), actual.getRecordCount()); Instant now = Instant.now(); long minRange = now.minus(1, ChronoUnit.HOURS).toEpochMilli(); long maxRange = now.toEpochMilli(); Assertions.assertTrue( - actual.lastModified() > minRange && actual.lastModified() <= maxRange, + actual.getLastModified() > minRange && actual.getLastModified() <= maxRange, () -> "last modified == " - + actual.lastModified() + + actual.getLastModified() + " is expected between " + minRange + " and " + maxRange); - Assertions.assertEquals(expected.columnStats(), actual.columnStats()); + Assertions.assertEquals(expected.getColumnStats(), actual.getColumnStats()); } private static Stream<Arguments> testWithPartitionToggle() { @@ -749,7 +749,7 @@ public class ITDeltaConversionSource { private boolean checkIfFileIsRemoved(String activePath, TableChange tableChange) { Set<String> filePathsRemoved = tableChange.getFilesDiff().getFilesRemoved().stream() - .map(oneDf -> oneDf.physicalPath()) + .map(oneDf -> oneDf.getPhysicalPath()) .collect(Collectors.toSet()); return filePathsRemoved.contains(activePath); } diff --git a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSync.java b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSync.java index 61da728c..f0f889d2 100644 --- a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSync.java +++ b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSync.java @@ -391,7 +391,7 @@ public class TestDeltaSync { } Map<String, InternalDataFile> pathToFile = internalDataFiles.stream() - .collect(Collectors.toMap(InternalDataFile::physicalPath, Function.identity())); + .collect(Collectors.toMap(InternalDataFile::getPhysicalPath, Function.identity())); int count = 0; try (CloseableIterator<AddFile> fileItr = deltaScan.getFiles()) { for (CloseableIterator<AddFile> it = fileItr; it.hasNext(); ) { @@ -400,7 +400,7 @@ public class TestDeltaSync { new org.apache.hadoop.fs.Path(basePath.resolve(addFile.getPath()).toUri()).toString(); InternalDataFile expected = pathToFile.get(fullPath); assertNotNull(expected); - assertEquals(addFile.getSize(), expected.fileSizeBytes()); + assertEquals(addFile.getSize(), expected.getFileSizeBytes()); count++; } } diff --git a/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionSource.java b/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionSource.java index 7efc20ef..376cceda 100644 --- a/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionSource.java +++ b/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionSource.java @@ -705,10 +705,10 @@ public class ITHudiConversionSource { tableChange.getFilesDiff().getFilesAdded().stream() .collect( Collectors.groupingBy( - oneDf -> getFileGroupInfo(oneDf.physicalPath()).getFileId(), + oneDf -> getFileGroupInfo(oneDf.getPhysicalPath()).getFileId(), Collectors.collectingAndThen( Collectors.mapping( - oneDf -> getFileGroupInfo(oneDf.physicalPath()).getCommitTime(), + oneDf -> getFileGroupInfo(oneDf.getPhysicalPath()).getCommitTime(), Collectors.toList()), list -> { if (list.size() > 1) { @@ -733,10 +733,10 @@ public class ITHudiConversionSource { tableChange.getFilesDiff().getFilesRemoved().stream() .collect( Collectors.groupingBy( - oneDf -> getFileGroupInfo(oneDf.physicalPath()).getFileId(), + oneDf -> getFileGroupInfo(oneDf.getPhysicalPath()).getFileId(), Collectors.collectingAndThen( Collectors.mapping( - oneDf -> getFileGroupInfo(oneDf.physicalPath()).getCommitTime(), + oneDf -> getFileGroupInfo(oneDf.getPhysicalPath()).getCommitTime(), Collectors.toList()), list -> { if (list.size() > 1) { diff --git a/xtable-core/src/test/java/org/apache/xtable/hudi/TestHudiFileStatsExtractor.java b/xtable-core/src/test/java/org/apache/xtable/hudi/TestHudiFileStatsExtractor.java index 790a45cb..a18bb743 100644 --- a/xtable-core/src/test/java/org/apache/xtable/hudi/TestHudiFileStatsExtractor.java +++ b/xtable-core/src/test/java/org/apache/xtable/hudi/TestHudiFileStatsExtractor.java @@ -202,8 +202,8 @@ public class TestHudiFileStatsExtractor { private void validateOutput(List<InternalDataFile> output) { assertEquals(1, output.size()); InternalDataFile fileWithStats = output.get(0); - assertEquals(2, fileWithStats.recordCount()); - List<ColumnStat> columnStats = fileWithStats.columnStats(); + assertEquals(2, fileWithStats.getRecordCount()); + List<ColumnStat> columnStats = fileWithStats.getColumnStats(); assertEquals(9, columnStats.size()); diff --git a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergConversionSource.java b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergConversionSource.java index 0cdc66c1..ab13ae2d 100644 --- a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergConversionSource.java +++ b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergConversionSource.java @@ -148,19 +148,19 @@ class TestIcebergConversionSource { List<PartitionFileGroup> dataFileChunks = internalSnapshot.getPartitionedDataFiles(); assertEquals(5, dataFileChunks.size()); for (PartitionFileGroup dataFilesChunk : dataFileChunks) { - List<InternalDataFile> internalDataFiles = dataFilesChunk.dataFiles(); + List<InternalDataFile> internalDataFiles = dataFilesChunk.getDataFiles(); assertEquals(1, internalDataFiles.size()); InternalDataFile internalDataFile = internalDataFiles.get(0); - assertEquals(FileFormat.APACHE_PARQUET, internalDataFile.fileFormat()); - assertEquals(1, internalDataFile.recordCount()); - Assertions.assertTrue(internalDataFile.physicalPath().startsWith("file:" + workingDir)); + assertEquals(FileFormat.APACHE_PARQUET, internalDataFile.getFileFormat()); + assertEquals(1, internalDataFile.getRecordCount()); + Assertions.assertTrue(internalDataFile.getPhysicalPath().startsWith("file:" + workingDir)); - List<PartitionValue> partitionValues = internalDataFile.partitionValues(); + List<PartitionValue> partitionValues = internalDataFile.getPartitionValues(); assertEquals(1, partitionValues.size()); PartitionValue partitionEntry = partitionValues.iterator().next(); assertEquals( "cs_sold_date_sk", partitionEntry.getPartitionField().getSourceField().getName()); - assertEquals(7, internalDataFile.columnStats().size()); + assertEquals(7, internalDataFile.getColumnStats().size()); } } @@ -316,7 +316,7 @@ class TestIcebergConversionSource { assertEquals(addedFiles, tableChange.getFilesDiff().getFilesAdded().size()); assertTrue( tableChange.getFilesDiff().dataFilesAdded().stream() - .allMatch(file -> file.columnStats().size() == numberOfColumns)); + .allMatch(file -> file.getColumnStats().size() == numberOfColumns)); assertEquals(removedFiles, tableChange.getFilesDiff().getFilesRemoved().size()); } diff --git a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSync.java b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSync.java index d0964bcd..bd36dde9 100644 --- a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSync.java +++ b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSync.java @@ -737,13 +737,14 @@ public class TestIcebergSync { assertEquals(expectedFiles.size(), combinedScanTask.files().size()); Map<String, InternalDataFile> pathToFile = expectedFiles.stream() - .collect(Collectors.toMap(InternalDataFile::physicalPath, Function.identity())); + .collect( + Collectors.toMap(InternalDataFile::getPhysicalPath, Function.identity())); for (FileScanTask fileScanTask : combinedScanTask.files()) { // check that path and other stats match InternalDataFile expected = pathToFile.get(fileScanTask.file().path()); assertNotNull(expected); - assertEquals(expected.fileSizeBytes(), fileScanTask.file().fileSizeInBytes()); - assertEquals(expected.recordCount(), fileScanTask.file().recordCount()); + assertEquals(expected.getFileSizeBytes(), fileScanTask.file().fileSizeInBytes()); + assertEquals(expected.getRecordCount(), fileScanTask.file().recordCount()); } } } @@ -761,11 +762,11 @@ public class TestIcebergSync { } private void mockColStatsForFile(InternalDataFile dataFile, int times) { - Metrics response = new Metrics(dataFile.recordCount(), null, null, null, null); + Metrics response = new Metrics(dataFile.getRecordCount(), null, null, null, null); Metrics[] responses = IntStream.of(times - 1).mapToObj(unused -> response).toArray(Metrics[]::new); when(mockColumnStatsConverter.toIceberg( - any(Schema.class), eq(dataFile.recordCount()), eq(Collections.emptyList()))) + any(Schema.class), eq(dataFile.getRecordCount()), eq(Collections.emptyList()))) .thenReturn(response, responses); } }