This is an automated email from the ASF dual-hosted git repository.
ashvin pushed a commit to branch 652-generic-internal-file-representation
in repository https://gitbox.apache.org/repos/asf/incubator-xtable.git
The following commit(s) were added to
refs/heads/652-generic-internal-file-representation by this push:
new bd3f38ff Address review comments
bd3f38ff is described below
commit bd3f38ff24f3bc2fba8db299352d6ea870f39b0b
Author: Ashvin Agrawal <[email protected]>
AuthorDate: Tue Feb 18 16:51:11 2025 -0800
Address review comments
---
.../org/apache/xtable/model/storage/FileType.java | 45 ----------------------
.../org/apache/xtable/model/storage/FilesDiff.java | 6 +--
.../xtable/model/storage/InternalDataFile.java | 5 +--
.../xtable/model/storage/InternalFilesDiff.java | 16 ++++----
...ernalBaseFile.java => InternalStorageFile.java} | 17 ++------
.../xtable/model/storage/PartitionFileGroup.java | 12 +++---
.../apache/xtable/model/storage/TestFilesDiff.java | 2 +-
.../apache/xtable/delta/DeltaConversionSource.java | 4 +-
.../delta/DeltaDataFileUpdatesExtractor.java | 16 ++++----
.../xtable/delta/DeltaPartitionExtractor.java | 6 +--
.../xtable/hudi/BaseFileUpdatesExtractor.java | 18 ++++-----
.../apache/xtable/hudi/HudiFileStatsExtractor.java | 4 +-
.../xtable/iceberg/IcebergDataFileUpdatesSync.java | 18 ++++-----
.../org/apache/xtable/ValidationTestHelper.java | 10 ++---
.../xtable/delta/ITDeltaConversionSource.java | 22 +++++------
.../org/apache/xtable/delta/TestDeltaSync.java | 4 +-
.../apache/xtable/hudi/ITHudiConversionSource.java | 8 ++--
.../xtable/hudi/TestHudiFileStatsExtractor.java | 4 +-
.../iceberg/TestIcebergConversionSource.java | 14 +++----
.../org/apache/xtable/iceberg/TestIcebergSync.java | 11 +++---
20 files changed, 92 insertions(+), 150 deletions(-)
diff --git
a/xtable-api/src/main/java/org/apache/xtable/model/storage/FileType.java
b/xtable-api/src/main/java/org/apache/xtable/model/storage/FileType.java
deleted file mode 100644
index 7d886181..00000000
--- a/xtable-api/src/main/java/org/apache/xtable/model/storage/FileType.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.xtable.model.storage;
-
-/**
- * Known types of storage files. For e.g. data file of a table, a position
deletion, statistics
- * file.
- *
- * @since 0.1
- */
-public enum FileType {
- /** Files of type data contain the actual data records of the table. */
- DATA_FILE,
-
- /**
- * Files of type deletion contain information of soft deleted records of the
table, typically
- * containing ordinal references
- */
- DELETION_FILE,
-
- /**
- * Files of type statistics typically contain supplemental statistics
information related to a
- * table, its partitions, or data files
- */
- STATISTICS_FILE,
-
- /** Files of type index contain information related to the indexes of a
table */
- INDEX_FILE
-}
diff --git
a/xtable-api/src/main/java/org/apache/xtable/model/storage/FilesDiff.java
b/xtable-api/src/main/java/org/apache/xtable/model/storage/FilesDiff.java
index 05f760e6..0bb00aa2 100644
--- a/xtable-api/src/main/java/org/apache/xtable/model/storage/FilesDiff.java
+++ b/xtable-api/src/main/java/org/apache/xtable/model/storage/FilesDiff.java
@@ -92,12 +92,12 @@ public class FilesDiff<L, P> {
* @param <P> the type of the previous files
* @return the set of files that are added
*/
- public static <P> FilesDiff<? extends InternalBaseFile, P>
findNewAndRemovedFiles(
+ public static <P> FilesDiff<InternalStorageFile, P> findNewAndRemovedFiles(
List<PartitionFileGroup> latestFileGroups, Map<String, P> previousFiles)
{
- Map<String, ? extends InternalBaseFile> latestFiles =
+ Map<String, InternalStorageFile> latestFiles =
latestFileGroups.stream()
.flatMap(group -> group.getFiles().stream())
- .collect(Collectors.toMap(InternalBaseFile::physicalPath,
Function.identity()));
+ .collect(Collectors.toMap(InternalStorageFile::getPhysicalPath,
Function.identity()));
return findNewAndRemovedFiles(latestFiles, previousFiles);
}
}
diff --git
a/xtable-api/src/main/java/org/apache/xtable/model/storage/InternalDataFile.java
b/xtable-api/src/main/java/org/apache/xtable/model/storage/InternalDataFile.java
index effbc887..3f1a7fa5 100644
---
a/xtable-api/src/main/java/org/apache/xtable/model/storage/InternalDataFile.java
+++
b/xtable-api/src/main/java/org/apache/xtable/model/storage/InternalDataFile.java
@@ -26,7 +26,6 @@ import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NonNull;
import lombok.ToString;
-import lombok.experimental.Accessors;
import lombok.experimental.FieldDefaults;
import lombok.experimental.SuperBuilder;
@@ -42,15 +41,13 @@ import org.apache.xtable.model.stat.PartitionValue;
@FieldDefaults(makeFinal = true, level = lombok.AccessLevel.PRIVATE)
@ToString(callSuper = true)
@EqualsAndHashCode(callSuper = true)
-@Accessors(fluent = true)
@Getter
-public class InternalDataFile extends InternalBaseFile {
+public class InternalDataFile extends InternalStorageFile {
// file format
@Builder.Default @NonNull FileFormat fileFormat = FileFormat.APACHE_PARQUET;
// partition ranges for the data file
@Builder.Default @NonNull List<PartitionValue> partitionValues =
Collections.emptyList();
- long recordCount;
// column stats for each column in the data file
@Builder.Default @NonNull List<ColumnStat> columnStats =
Collections.emptyList();
// last modified time in millis since epoch
diff --git
a/xtable-api/src/main/java/org/apache/xtable/model/storage/InternalFilesDiff.java
b/xtable-api/src/main/java/org/apache/xtable/model/storage/InternalFilesDiff.java
index 527d5abd..4de0c976 100644
---
a/xtable-api/src/main/java/org/apache/xtable/model/storage/InternalFilesDiff.java
+++
b/xtable-api/src/main/java/org/apache/xtable/model/storage/InternalFilesDiff.java
@@ -32,7 +32,7 @@ import lombok.experimental.SuperBuilder;
@Value
@EqualsAndHashCode(callSuper = true)
@SuperBuilder
-public class InternalFilesDiff extends FilesDiff<InternalBaseFile,
InternalBaseFile> {
+public class InternalFilesDiff extends FilesDiff<InternalStorageFile,
InternalStorageFile> {
/**
* Creates a InternalFilesDiff from the list of files in the target table
and the list of files in
@@ -46,10 +46,10 @@ public class InternalFilesDiff extends
FilesDiff<InternalBaseFile, InternalBaseF
List<InternalDataFile> source, List<InternalDataFile> target) {
Map<String, InternalDataFile> targetPaths =
target.stream()
- .collect(Collectors.toMap(InternalDataFile::physicalPath,
Function.identity()));
+ .collect(Collectors.toMap(InternalDataFile::getPhysicalPath,
Function.identity()));
Map<String, InternalDataFile> sourcePaths =
source.stream()
- .collect(Collectors.toMap(InternalDataFile::physicalPath,
Function.identity()));
+ .collect(Collectors.toMap(InternalDataFile::getPhysicalPath,
Function.identity()));
FilesDiff<InternalDataFile, InternalDataFile> diff =
findNewAndRemovedFiles(sourcePaths, targetPaths);
@@ -60,26 +60,26 @@ public class InternalFilesDiff extends
FilesDiff<InternalBaseFile, InternalBaseF
}
/**
- * Filters files of type {@link FileType#DATA_FILE} from the list of files
added to the source
- * table and returns the list.
+ * Filters files of type {@link InternalDataFile} from the list of files
added to the source table
+ * and returns the list.
*/
public Set<InternalDataFile> dataFilesAdded() {
Set<InternalDataFile> result =
getFilesAdded().stream()
- .filter(InternalBaseFile::isDataFile)
+ .filter(InternalDataFile.class::isInstance)
.map(file -> (InternalDataFile) file)
.collect(Collectors.toSet());
return result;
}
/**
- * Filters files of type {@link FileType#DATA_FILE} from the list of files
removed to the source
+ * Filters files of type {@link InternalDataFile} from the list of files
removed from the source
* table and returns the list.
*/
public Set<InternalDataFile> dataFilesRemoved() {
Set<InternalDataFile> result =
getFilesRemoved().stream()
- .filter(InternalBaseFile::isDataFile)
+ .filter(InternalDataFile.class::isInstance)
.map(file -> (InternalDataFile) file)
.collect(Collectors.toSet());
return result;
diff --git
a/xtable-api/src/main/java/org/apache/xtable/model/storage/InternalBaseFile.java
b/xtable-api/src/main/java/org/apache/xtable/model/storage/InternalStorageFile.java
similarity index 83%
rename from
xtable-api/src/main/java/org/apache/xtable/model/storage/InternalBaseFile.java
rename to
xtable-api/src/main/java/org/apache/xtable/model/storage/InternalStorageFile.java
index 67e0f6b8..04591d89 100644
---
a/xtable-api/src/main/java/org/apache/xtable/model/storage/InternalBaseFile.java
+++
b/xtable-api/src/main/java/org/apache/xtable/model/storage/InternalStorageFile.java
@@ -19,12 +19,10 @@
package org.apache.xtable.model.storage;
import lombok.AllArgsConstructor;
-import lombok.Builder;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NonNull;
import lombok.ToString;
-import lombok.experimental.Accessors;
import lombok.experimental.FieldDefaults;
import lombok.experimental.SuperBuilder;
@@ -43,25 +41,16 @@ import lombok.experimental.SuperBuilder;
@ToString(callSuper = true)
@EqualsAndHashCode
@AllArgsConstructor
-@Accessors(fluent = true)
@Getter
-public abstract class InternalBaseFile {
+public abstract class InternalStorageFile {
// Absolute path of the storage file, with the scheme, that contains this
logical file. Typically,
// one physical storage file contains only one base file, for e.g. a parquet
data file. However,
// in some cases, one storage file can contain multiple logical storage
files for optimizations.
@NonNull String physicalPath;
- // The offset of the logical file in the physical storage file.
- @Builder.Default long offset = 0;
-
// The size of the logical file in the physical storage file.
long fileSizeBytes;
- // File Type represents the type of the storage file. For example, a data
file, a delete file, a
- // stat file
- @Builder.Default @NonNull FileType fileType = FileType.DATA_FILE;
-
- public boolean isDataFile() {
- return fileType.equals(FileType.DATA_FILE);
- }
+ // The number of records in the storage file.
+ long recordCount;
}
diff --git
a/xtable-api/src/main/java/org/apache/xtable/model/storage/PartitionFileGroup.java
b/xtable-api/src/main/java/org/apache/xtable/model/storage/PartitionFileGroup.java
index b386cd23..cd6dc6c4 100644
---
a/xtable-api/src/main/java/org/apache/xtable/model/storage/PartitionFileGroup.java
+++
b/xtable-api/src/main/java/org/apache/xtable/model/storage/PartitionFileGroup.java
@@ -28,12 +28,12 @@ import lombok.Value;
import org.apache.xtable.model.stat.PartitionValue;
-/** Represents a grouping of {@link InternalBaseFile} with the same partition
values. */
+/** Represents a grouping of {@link InternalStorageFile} with the same
partition values. */
@Value
@Builder
public class PartitionFileGroup {
List<PartitionValue> partitionValues;
- List<? extends InternalBaseFile> files;
+ List<? extends InternalStorageFile> files;
public static List<PartitionFileGroup> fromFiles(List<InternalDataFile>
files) {
return fromFiles(files.stream());
@@ -41,7 +41,7 @@ public class PartitionFileGroup {
public static List<PartitionFileGroup> fromFiles(Stream<InternalDataFile>
files) {
Map<List<PartitionValue>, List<InternalDataFile>> filesGrouped =
-
files.collect(Collectors.groupingBy(InternalDataFile::partitionValues));
+
files.collect(Collectors.groupingBy(InternalDataFile::getPartitionValues));
return filesGrouped.entrySet().stream()
.map(
entry ->
@@ -52,10 +52,10 @@ public class PartitionFileGroup {
.collect(Collectors.toList());
}
- /** Filters storage files of type {@link FileType#DATA_FILE} and returns
them. */
- public List<InternalDataFile> dataFiles() {
+ /** Filters storage files of type {@link InternalDataFile} and returns them.
*/
+ public List<InternalDataFile> getDataFiles() {
return files.stream()
- .filter(InternalBaseFile::isDataFile)
+ .filter(InternalDataFile.class::isInstance)
.map(file -> (InternalDataFile) file)
.collect(Collectors.toList());
}
diff --git
a/xtable-api/src/test/java/org/apache/xtable/model/storage/TestFilesDiff.java
b/xtable-api/src/test/java/org/apache/xtable/model/storage/TestFilesDiff.java
index a93bb622..ace58cd8 100644
---
a/xtable-api/src/test/java/org/apache/xtable/model/storage/TestFilesDiff.java
+++
b/xtable-api/src/test/java/org/apache/xtable/model/storage/TestFilesDiff.java
@@ -50,7 +50,7 @@ public class TestFilesDiff {
previousFiles.put("file2NoGroup", file2);
previousFiles.put("file2Group2", file3);
- FilesDiff<? extends InternalBaseFile, File> diff =
+ FilesDiff<InternalStorageFile, File> diff =
FilesDiff.findNewAndRemovedFiles(latestFileGroups, previousFiles);
assertEquals(2, diff.getFilesAdded().size());
assertTrue(diff.getFilesAdded().contains(file1Group2));
diff --git
a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java
b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java
index fc3f6b92..47d932f7 100644
---
a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java
+++
b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java
@@ -127,7 +127,7 @@ public class DeltaConversionSource implements
ConversionSource<Long> {
true,
DeltaPartitionExtractor.getInstance(),
DeltaStatsExtractor.getInstance());
- addedFiles.put(dataFile.physicalPath(), dataFile);
+ addedFiles.put(dataFile.getPhysicalPath(), dataFile);
String deleteVectorPath =
actionsConverter.extractDeletionVectorFile(snapshotAtVersion,
(AddFile) action);
if (deleteVectorPath != null) {
@@ -141,7 +141,7 @@ public class DeltaConversionSource implements
ConversionSource<Long> {
fileFormat,
tableAtVersion.getPartitioningFields(),
DeltaPartitionExtractor.getInstance());
- removedFiles.put(dataFile.physicalPath(), dataFile);
+ removedFiles.put(dataFile.getPhysicalPath(), dataFile);
}
}
diff --git
a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaDataFileUpdatesExtractor.java
b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaDataFileUpdatesExtractor.java
index 022c5591..f33ccaef 100644
---
a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaDataFileUpdatesExtractor.java
+++
b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaDataFileUpdatesExtractor.java
@@ -40,9 +40,9 @@ import org.apache.xtable.collectors.CustomCollectors;
import org.apache.xtable.model.schema.InternalSchema;
import org.apache.xtable.model.stat.ColumnStat;
import org.apache.xtable.model.storage.FilesDiff;
-import org.apache.xtable.model.storage.InternalBaseFile;
import org.apache.xtable.model.storage.InternalDataFile;
import org.apache.xtable.model.storage.InternalFilesDiff;
+import org.apache.xtable.model.storage.InternalStorageFile;
import org.apache.xtable.model.storage.PartitionFileGroup;
import org.apache.xtable.paths.PathUtils;
@@ -75,7 +75,7 @@ public class DeltaDataFileUpdatesExtractor {
file -> DeltaActionsConverter.getFullPathToFile(snapshot,
file.path()),
file -> file));
- FilesDiff<? extends InternalBaseFile, Action> diff =
+ FilesDiff<InternalStorageFile, Action> diff =
InternalFilesDiff.findNewAndRemovedFiles(partitionedDataFiles,
previousFiles);
return applyDiff(
@@ -93,13 +93,13 @@ public class DeltaDataFileUpdatesExtractor {
}
private Seq<Action> applyDiff(
- Set<? extends InternalBaseFile> filesAdded,
+ Set<? extends InternalStorageFile> filesAdded,
Collection<Action> removeFileActions,
InternalSchema tableSchema,
String tableBasePath) {
Stream<Action> addActions =
filesAdded.stream()
- .filter(InternalBaseFile::isDataFile)
+ .filter(InternalDataFile.class::isInstance)
.map(file -> (InternalDataFile) file)
.flatMap(dFile -> createAddFileAction(dFile, tableSchema,
tableBasePath));
int totalActions = filesAdded.size() + removeFileActions.size();
@@ -115,12 +115,12 @@ public class DeltaDataFileUpdatesExtractor {
new AddFile(
// Delta Lake supports relative and absolute paths in theory but
relative paths seem
// more commonly supported by query engines in our testing
- PathUtils.getRelativePath(dataFile.physicalPath(), tableBasePath),
+ PathUtils.getRelativePath(dataFile.getPhysicalPath(),
tableBasePath),
convertJavaMapToScala(deltaPartitionExtractor.partitionValueSerialization(dataFile)),
- dataFile.fileSizeBytes(),
- dataFile.lastModified(),
+ dataFile.getFileSizeBytes(),
+ dataFile.getLastModified(),
true,
- getColumnStats(schema, dataFile.recordCount(),
dataFile.columnStats()),
+ getColumnStats(schema, dataFile.getRecordCount(),
dataFile.getColumnStats()),
null,
null));
}
diff --git
a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaPartitionExtractor.java
b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaPartitionExtractor.java
index 57568281..3492857f 100644
---
a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaPartitionExtractor.java
+++
b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaPartitionExtractor.java
@@ -253,11 +253,11 @@ public class DeltaPartitionExtractor {
public Map<String, String> partitionValueSerialization(InternalDataFile
internalDataFile) {
Map<String, String> partitionValuesSerialized = new HashMap<>();
- if (internalDataFile.partitionValues() == null
- || internalDataFile.partitionValues().isEmpty()) {
+ if (internalDataFile.getPartitionValues() == null
+ || internalDataFile.getPartitionValues().isEmpty()) {
return partitionValuesSerialized;
}
- for (PartitionValue partitionValue : internalDataFile.partitionValues()) {
+ for (PartitionValue partitionValue :
internalDataFile.getPartitionValues()) {
InternalPartitionField partitionField =
partitionValue.getPartitionField();
PartitionTransformType transformType = partitionField.getTransformType();
String partitionValueSerialized;
diff --git
a/xtable-core/src/main/java/org/apache/xtable/hudi/BaseFileUpdatesExtractor.java
b/xtable-core/src/main/java/org/apache/xtable/hudi/BaseFileUpdatesExtractor.java
index 76a0300c..ba1f9f9d 100644
---
a/xtable-core/src/main/java/org/apache/xtable/hudi/BaseFileUpdatesExtractor.java
+++
b/xtable-core/src/main/java/org/apache/xtable/hudi/BaseFileUpdatesExtractor.java
@@ -96,7 +96,7 @@ public class BaseFileUpdatesExtractor {
partitionedDataFiles.stream()
.map(
partitionFileGroup -> {
- List<InternalDataFile> dataFiles =
partitionFileGroup.dataFiles();
+ List<InternalDataFile> dataFiles =
partitionFileGroup.getDataFiles();
String partitionPath = getPartitionPath(tableBasePath,
dataFiles);
// remove the partition from the set of partitions to drop
since it is present in
// the snapshot
@@ -107,7 +107,7 @@ public class BaseFileUpdatesExtractor {
dataFiles.stream()
.collect(
Collectors.toMap(
- InternalDataFile::physicalPath,
Function.identity()));
+ InternalDataFile::getPhysicalPath,
Function.identity()));
List<HoodieBaseFile> baseFiles =
isTableInitialized
?
fsView.getLatestBaseFiles(partitionPath).collect(Collectors.toList())
@@ -174,7 +174,7 @@ public class BaseFileUpdatesExtractor {
// For all removed files, group by partition and extract the file id
Map<String, List<String>> partitionToReplacedFileIds =
internalFilesDiff.dataFilesRemoved().stream()
- .map(file -> new CachingPath(file.physicalPath()))
+ .map(file -> new CachingPath(file.getPhysicalPath()))
.collect(
Collectors.groupingBy(
path -> HudiPathUtils.getPartitionPath(tableBasePath,
path),
@@ -213,7 +213,7 @@ public class BaseFileUpdatesExtractor {
InternalDataFile file,
Optional<String> partitionPathOptional) {
WriteStatus writeStatus = new WriteStatus();
- Path path = new CachingPath(file.physicalPath());
+ Path path = new CachingPath(file.getPhysicalPath());
String partitionPath =
partitionPathOptional.orElseGet(() ->
HudiPathUtils.getPartitionPath(tableBasePath, path));
String fileId = getFileId(path);
@@ -227,10 +227,10 @@ public class BaseFileUpdatesExtractor {
writeStat.setPath(
ExternalFilePathUtil.appendCommitTimeAndExternalFileMarker(filePath,
commitTime));
writeStat.setPartitionPath(partitionPath);
- writeStat.setNumWrites(file.recordCount());
- writeStat.setTotalWriteBytes(file.fileSizeBytes());
- writeStat.setFileSizeInBytes(file.fileSizeBytes());
- writeStat.putRecordsStats(convertColStats(fileName, file.columnStats()));
+ writeStat.setNumWrites(file.getRecordCount());
+ writeStat.setTotalWriteBytes(file.getFileSizeBytes());
+ writeStat.setFileSizeInBytes(file.getFileSizeBytes());
+ writeStat.putRecordsStats(convertColStats(fileName,
file.getColumnStats()));
writeStatus.setStat(writeStat);
return writeStatus;
}
@@ -276,6 +276,6 @@ public class BaseFileUpdatesExtractor {
private String getPartitionPath(Path tableBasePath, List<InternalDataFile>
files) {
return HudiPathUtils.getPartitionPath(
- tableBasePath, new CachingPath(files.get(0).physicalPath()));
+ tableBasePath, new CachingPath(files.get(0).getPhysicalPath()));
}
}
diff --git
a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiFileStatsExtractor.java
b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiFileStatsExtractor.java
index 2d6ad726..e47ef72e 100644
---
a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiFileStatsExtractor.java
+++
b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiFileStatsExtractor.java
@@ -107,7 +107,7 @@ public class HudiFileStatsExtractor {
return files.map(
file -> {
HudiFileStats fileStats =
- computeColumnStatsForFile(new Path(file.physicalPath()),
nameFieldMap);
+ computeColumnStatsForFile(new Path(file.getPhysicalPath()),
nameFieldMap);
return file.toBuilder()
.columnStats(fileStats.getColumnStats())
.recordCount(fileStats.getRowCount())
@@ -128,7 +128,7 @@ public class HudiFileStatsExtractor {
Map<Pair<String, String>, InternalDataFile> filePathsToDataFile =
files.collect(
Collectors.toMap(
- file -> getPartitionAndFileName(file.physicalPath()),
Function.identity()));
+ file -> getPartitionAndFileName(file.getPhysicalPath()),
Function.identity()));
if (filePathsToDataFile.isEmpty()) {
return Stream.empty();
}
diff --git
a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergDataFileUpdatesSync.java
b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergDataFileUpdatesSync.java
index 5c4cf2ba..dea5caa0 100644
---
a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergDataFileUpdatesSync.java
+++
b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergDataFileUpdatesSync.java
@@ -31,9 +31,9 @@ import org.apache.xtable.exception.NotSupportedException;
import org.apache.xtable.exception.ReadException;
import org.apache.xtable.model.InternalTable;
import org.apache.xtable.model.storage.FilesDiff;
-import org.apache.xtable.model.storage.InternalBaseFile;
import org.apache.xtable.model.storage.InternalDataFile;
import org.apache.xtable.model.storage.InternalFilesDiff;
+import org.apache.xtable.model.storage.InternalStorageFile;
import org.apache.xtable.model.storage.PartitionFileGroup;
@AllArgsConstructor(staticName = "of")
@@ -58,7 +58,7 @@ public class IcebergDataFileUpdatesSync {
throw new ReadException("Failed to iterate through Iceberg data files",
e);
}
- FilesDiff<? extends InternalBaseFile, DataFile> diff =
+ FilesDiff<InternalStorageFile, DataFile> diff =
InternalFilesDiff.findNewAndRemovedFiles(partitionedDataFiles,
previousFiles);
applyDiff(transaction, diff.getFilesAdded(), diff.getFilesRemoved(),
schema, partitionSpec);
@@ -80,13 +80,13 @@ public class IcebergDataFileUpdatesSync {
private void applyDiff(
Transaction transaction,
- Collection<? extends InternalBaseFile> filesAdded,
+ Collection<? extends InternalStorageFile> filesAdded,
Collection<DataFile> filesRemoved,
Schema schema,
PartitionSpec partitionSpec) {
OverwriteFiles overwriteFiles = transaction.newOverwrite();
filesAdded.stream()
- .filter(InternalBaseFile::isDataFile)
+ .filter(InternalDataFile.class::isInstance)
.map(file -> (InternalDataFile) file)
.forEach(f -> overwriteFiles.addFile(getDataFile(partitionSpec,
schema, f)));
filesRemoved.forEach(overwriteFiles::deleteFile);
@@ -97,15 +97,15 @@ public class IcebergDataFileUpdatesSync {
PartitionSpec partitionSpec, Schema schema, InternalDataFile dataFile) {
DataFiles.Builder builder =
DataFiles.builder(partitionSpec)
- .withPath(dataFile.physicalPath())
- .withFileSizeInBytes(dataFile.fileSizeBytes())
+ .withPath(dataFile.getPhysicalPath())
+ .withFileSizeInBytes(dataFile.getFileSizeBytes())
.withMetrics(
columnStatsConverter.toIceberg(
- schema, dataFile.recordCount(), dataFile.columnStats()))
- .withFormat(convertFileFormat(dataFile.fileFormat()));
+ schema, dataFile.getRecordCount(),
dataFile.getColumnStats()))
+ .withFormat(convertFileFormat(dataFile.getFileFormat()));
if (partitionSpec.isPartitioned()) {
builder.withPartition(
- partitionValueConverter.toIceberg(partitionSpec, schema,
dataFile.partitionValues()));
+ partitionValueConverter.toIceberg(partitionSpec, schema,
dataFile.getPartitionValues()));
}
return builder.build();
}
diff --git
a/xtable-core/src/test/java/org/apache/xtable/ValidationTestHelper.java
b/xtable-core/src/test/java/org/apache/xtable/ValidationTestHelper.java
index 9ea3baf4..9e95f279 100644
--- a/xtable-core/src/test/java/org/apache/xtable/ValidationTestHelper.java
+++ b/xtable-core/src/test/java/org/apache/xtable/ValidationTestHelper.java
@@ -39,8 +39,8 @@ public class ValidationTestHelper {
assertNotNull(internalSnapshot.getTable());
List<String> filePaths =
internalSnapshot.getPartitionedDataFiles().stream()
- .flatMap(group -> group.dataFiles().stream())
- .map(InternalDataFile::physicalPath)
+ .flatMap(group -> group.getDataFiles().stream())
+ .map(InternalDataFile::getPhysicalPath)
.collect(Collectors.toList());
replaceFileScheme(allActivePaths);
replaceFileScheme(filePaths);
@@ -90,13 +90,13 @@ public class ValidationTestHelper {
public static List<String> getAllFilePaths(InternalSnapshot
internalSnapshot) {
return internalSnapshot.getPartitionedDataFiles().stream()
- .flatMap(fileGroup -> fileGroup.dataFiles().stream())
- .map(InternalDataFile::physicalPath)
+ .flatMap(fileGroup -> fileGroup.getDataFiles().stream())
+ .map(InternalDataFile::getPhysicalPath)
.collect(Collectors.toList());
}
private static Set<String> extractPathsFromDataFile(Set<InternalDataFile>
dataFiles) {
- return
dataFiles.stream().map(InternalDataFile::physicalPath).collect(Collectors.toSet());
+ return
dataFiles.stream().map(InternalDataFile::getPhysicalPath).collect(Collectors.toSet());
}
private static void replaceFileScheme(List<String> filePaths) {
diff --git
a/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaConversionSource.java
b/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaConversionSource.java
index 4c854ba3..4afb6bdb 100644
---
a/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaConversionSource.java
+++
b/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaConversionSource.java
@@ -704,7 +704,7 @@ public class ITDeltaConversionSource {
throws URISyntaxException {
assertEquals(
expectedPartitionFiles.getPartitionValues(),
actualPartitionFiles.getPartitionValues());
- validateDataFiles(expectedPartitionFiles.dataFiles(),
actualPartitionFiles.dataFiles());
+ validateDataFiles(expectedPartitionFiles.getDataFiles(),
actualPartitionFiles.getDataFiles());
}
private void validateDataFiles(
@@ -721,25 +721,25 @@ public class ITDeltaConversionSource {
private void validatePropertiesDataFile(InternalDataFile expected,
InternalDataFile actual)
throws URISyntaxException {
Assertions.assertTrue(
- Paths.get(new URI(actual.physicalPath()).getPath()).isAbsolute(),
- () -> "path == " + actual.physicalPath() + " is not absolute");
- Assertions.assertEquals(expected.fileFormat(), actual.fileFormat());
- Assertions.assertEquals(expected.partitionValues(),
actual.partitionValues());
- Assertions.assertEquals(expected.fileSizeBytes(), actual.fileSizeBytes());
- Assertions.assertEquals(expected.recordCount(), actual.recordCount());
+ Paths.get(new URI(actual.getPhysicalPath()).getPath()).isAbsolute(),
+ () -> "path == " + actual.getPhysicalPath() + " is not absolute");
+ Assertions.assertEquals(expected.getFileFormat(), actual.getFileFormat());
+ Assertions.assertEquals(expected.getPartitionValues(),
actual.getPartitionValues());
+ Assertions.assertEquals(expected.getFileSizeBytes(),
actual.getFileSizeBytes());
+ Assertions.assertEquals(expected.getRecordCount(),
actual.getRecordCount());
Instant now = Instant.now();
long minRange = now.minus(1, ChronoUnit.HOURS).toEpochMilli();
long maxRange = now.toEpochMilli();
Assertions.assertTrue(
- actual.lastModified() > minRange && actual.lastModified() <= maxRange,
+ actual.getLastModified() > minRange && actual.getLastModified() <=
maxRange,
() ->
"last modified == "
- + actual.lastModified()
+ + actual.getLastModified()
+ " is expected between "
+ minRange
+ " and "
+ maxRange);
- Assertions.assertEquals(expected.columnStats(), actual.columnStats());
+ Assertions.assertEquals(expected.getColumnStats(),
actual.getColumnStats());
}
private static Stream<Arguments> testWithPartitionToggle() {
@@ -749,7 +749,7 @@ public class ITDeltaConversionSource {
private boolean checkIfFileIsRemoved(String activePath, TableChange
tableChange) {
Set<String> filePathsRemoved =
tableChange.getFilesDiff().getFilesRemoved().stream()
- .map(oneDf -> oneDf.physicalPath())
+ .map(oneDf -> oneDf.getPhysicalPath())
.collect(Collectors.toSet());
return filePathsRemoved.contains(activePath);
}
diff --git
a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSync.java
b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSync.java
index 61da728c..f0f889d2 100644
--- a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSync.java
+++ b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSync.java
@@ -391,7 +391,7 @@ public class TestDeltaSync {
}
Map<String, InternalDataFile> pathToFile =
internalDataFiles.stream()
- .collect(Collectors.toMap(InternalDataFile::physicalPath,
Function.identity()));
+ .collect(Collectors.toMap(InternalDataFile::getPhysicalPath,
Function.identity()));
int count = 0;
try (CloseableIterator<AddFile> fileItr = deltaScan.getFiles()) {
for (CloseableIterator<AddFile> it = fileItr; it.hasNext(); ) {
@@ -400,7 +400,7 @@ public class TestDeltaSync {
new
org.apache.hadoop.fs.Path(basePath.resolve(addFile.getPath()).toUri()).toString();
InternalDataFile expected = pathToFile.get(fullPath);
assertNotNull(expected);
- assertEquals(addFile.getSize(), expected.fileSizeBytes());
+ assertEquals(addFile.getSize(), expected.getFileSizeBytes());
count++;
}
}
diff --git
a/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionSource.java
b/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionSource.java
index 7efc20ef..376cceda 100644
---
a/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionSource.java
+++
b/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionSource.java
@@ -705,10 +705,10 @@ public class ITHudiConversionSource {
tableChange.getFilesDiff().getFilesAdded().stream()
.collect(
Collectors.groupingBy(
- oneDf ->
getFileGroupInfo(oneDf.physicalPath()).getFileId(),
+ oneDf ->
getFileGroupInfo(oneDf.getPhysicalPath()).getFileId(),
Collectors.collectingAndThen(
Collectors.mapping(
- oneDf ->
getFileGroupInfo(oneDf.physicalPath()).getCommitTime(),
+ oneDf ->
getFileGroupInfo(oneDf.getPhysicalPath()).getCommitTime(),
Collectors.toList()),
list -> {
if (list.size() > 1) {
@@ -733,10 +733,10 @@ public class ITHudiConversionSource {
tableChange.getFilesDiff().getFilesRemoved().stream()
.collect(
Collectors.groupingBy(
- oneDf ->
getFileGroupInfo(oneDf.physicalPath()).getFileId(),
+ oneDf ->
getFileGroupInfo(oneDf.getPhysicalPath()).getFileId(),
Collectors.collectingAndThen(
Collectors.mapping(
- oneDf ->
getFileGroupInfo(oneDf.physicalPath()).getCommitTime(),
+ oneDf ->
getFileGroupInfo(oneDf.getPhysicalPath()).getCommitTime(),
Collectors.toList()),
list -> {
if (list.size() > 1) {
diff --git
a/xtable-core/src/test/java/org/apache/xtable/hudi/TestHudiFileStatsExtractor.java
b/xtable-core/src/test/java/org/apache/xtable/hudi/TestHudiFileStatsExtractor.java
index 790a45cb..a18bb743 100644
---
a/xtable-core/src/test/java/org/apache/xtable/hudi/TestHudiFileStatsExtractor.java
+++
b/xtable-core/src/test/java/org/apache/xtable/hudi/TestHudiFileStatsExtractor.java
@@ -202,8 +202,8 @@ public class TestHudiFileStatsExtractor {
private void validateOutput(List<InternalDataFile> output) {
assertEquals(1, output.size());
InternalDataFile fileWithStats = output.get(0);
- assertEquals(2, fileWithStats.recordCount());
- List<ColumnStat> columnStats = fileWithStats.columnStats();
+ assertEquals(2, fileWithStats.getRecordCount());
+ List<ColumnStat> columnStats = fileWithStats.getColumnStats();
assertEquals(9, columnStats.size());
diff --git
a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergConversionSource.java
b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergConversionSource.java
index 0cdc66c1..ab13ae2d 100644
---
a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergConversionSource.java
+++
b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergConversionSource.java
@@ -148,19 +148,19 @@ class TestIcebergConversionSource {
List<PartitionFileGroup> dataFileChunks =
internalSnapshot.getPartitionedDataFiles();
assertEquals(5, dataFileChunks.size());
for (PartitionFileGroup dataFilesChunk : dataFileChunks) {
- List<InternalDataFile> internalDataFiles = dataFilesChunk.dataFiles();
+ List<InternalDataFile> internalDataFiles = dataFilesChunk.getDataFiles();
assertEquals(1, internalDataFiles.size());
InternalDataFile internalDataFile = internalDataFiles.get(0);
- assertEquals(FileFormat.APACHE_PARQUET, internalDataFile.fileFormat());
- assertEquals(1, internalDataFile.recordCount());
- Assertions.assertTrue(internalDataFile.physicalPath().startsWith("file:"
+ workingDir));
+ assertEquals(FileFormat.APACHE_PARQUET,
internalDataFile.getFileFormat());
+ assertEquals(1, internalDataFile.getRecordCount());
+
Assertions.assertTrue(internalDataFile.getPhysicalPath().startsWith("file:" +
workingDir));
- List<PartitionValue> partitionValues =
internalDataFile.partitionValues();
+ List<PartitionValue> partitionValues =
internalDataFile.getPartitionValues();
assertEquals(1, partitionValues.size());
PartitionValue partitionEntry = partitionValues.iterator().next();
assertEquals(
"cs_sold_date_sk",
partitionEntry.getPartitionField().getSourceField().getName());
- assertEquals(7, internalDataFile.columnStats().size());
+ assertEquals(7, internalDataFile.getColumnStats().size());
}
}
@@ -316,7 +316,7 @@ class TestIcebergConversionSource {
assertEquals(addedFiles,
tableChange.getFilesDiff().getFilesAdded().size());
assertTrue(
tableChange.getFilesDiff().dataFilesAdded().stream()
- .allMatch(file -> file.columnStats().size() == numberOfColumns));
+ .allMatch(file -> file.getColumnStats().size() ==
numberOfColumns));
assertEquals(removedFiles,
tableChange.getFilesDiff().getFilesRemoved().size());
}
diff --git
a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSync.java
b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSync.java
index d0964bcd..bd36dde9 100644
--- a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSync.java
+++ b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSync.java
@@ -737,13 +737,14 @@ public class TestIcebergSync {
assertEquals(expectedFiles.size(), combinedScanTask.files().size());
Map<String, InternalDataFile> pathToFile =
expectedFiles.stream()
- .collect(Collectors.toMap(InternalDataFile::physicalPath,
Function.identity()));
+ .collect(
+ Collectors.toMap(InternalDataFile::getPhysicalPath,
Function.identity()));
for (FileScanTask fileScanTask : combinedScanTask.files()) {
// check that path and other stats match
InternalDataFile expected =
pathToFile.get(fileScanTask.file().path());
assertNotNull(expected);
- assertEquals(expected.fileSizeBytes(),
fileScanTask.file().fileSizeInBytes());
- assertEquals(expected.recordCount(),
fileScanTask.file().recordCount());
+ assertEquals(expected.getFileSizeBytes(),
fileScanTask.file().fileSizeInBytes());
+ assertEquals(expected.getRecordCount(),
fileScanTask.file().recordCount());
}
}
}
@@ -761,11 +762,11 @@ public class TestIcebergSync {
}
private void mockColStatsForFile(InternalDataFile dataFile, int times) {
- Metrics response = new Metrics(dataFile.recordCount(), null, null, null,
null);
+ Metrics response = new Metrics(dataFile.getRecordCount(), null, null,
null, null);
Metrics[] responses =
IntStream.of(times - 1).mapToObj(unused ->
response).toArray(Metrics[]::new);
when(mockColumnStatsConverter.toIceberg(
- any(Schema.class), eq(dataFile.recordCount()),
eq(Collections.emptyList())))
+ any(Schema.class), eq(dataFile.getRecordCount()),
eq(Collections.emptyList())))
.thenReturn(response, responses);
}
}