This is an automated email from the ASF dual-hosted git repository.
ashvin pushed a commit to branch 394-substitute-one-with-internal-in-onedatafile
in repository https://gitbox.apache.org/repos/asf/incubator-xtable.git
The following commit(s) were added to refs/heads/394-substitute-one-with-internal-in-onedatafile by this push:
new a35d3c6a Rename OneDataFile to InternalDataFile
a35d3c6a is described below
commit a35d3c6a9a7b67b68f95ff33f1c463fb55352b30
Author: Ashvin Agrawal <[email protected]>
AuthorDate: Tue Mar 26 10:19:07 2024 -0700
Rename OneDataFile to InternalDataFile
---
.../java/org/apache/xtable/model/stat/Range.java | 6 +-
.../apache/xtable/model/storage/DataFilesDiff.java | 12 ++--
.../{OneDataFile.java => InternalDataFile.java} | 2 +-
.../xtable/model/storage/OneDataFilesDiff.java | 16 ++---
.../apache/xtable/model/storage/OneFileGroup.java | 12 ++--
.../xtable/spi/extractor/DataFileIterator.java | 4 +-
.../xtable/model/storage/TestDataFilesDiff.java | 10 +--
.../xtable/model/storage/TestOneDataFilesDiff.java | 20 +++---
.../spi/extractor/TestExtractFromSource.java | 16 ++---
.../xtable/spi/sync/TestTableFormatSync.java | 6 +-
.../apache/xtable/delta/DeltaActionsConverter.java | 10 +--
.../xtable/delta/DeltaDataFileExtractor.java | 8 +--
.../delta/DeltaDataFileUpdatesExtractor.java | 8 +--
.../xtable/delta/DeltaPartitionExtractor.java | 9 +--
.../org/apache/xtable/delta/DeltaSourceClient.java | 16 ++---
.../xtable/hudi/BaseFileUpdatesExtractor.java | 13 ++--
.../apache/xtable/hudi/HudiDataFileExtractor.java | 42 ++++++-------
.../apache/xtable/hudi/HudiFileStatsExtractor.java | 18 +++---
.../xtable/iceberg/IcebergDataFileExtractor.java | 10 +--
.../xtable/iceberg/IcebergDataFileUpdatesSync.java | 9 +--
.../apache/xtable/iceberg/IcebergSourceClient.java | 18 +++---
.../org/apache/xtable/ValidationTestHelper.java | 10 +--
.../apache/xtable/delta/ITDeltaSourceClient.java | 20 +++---
.../org/apache/xtable/delta/TestDeltaSync.java | 49 +++++++--------
.../org/apache/xtable/hudi/ITHudiTargetClient.java | 20 +++---
.../xtable/hudi/TestBaseFileUpdatesExtractor.java | 36 +++++------
.../xtable/hudi/TestHudiFileStatsExtractor.java | 18 +++---
.../xtable/iceberg/TestIcebergSourceClient.java | 24 ++++----
.../org/apache/xtable/iceberg/TestIcebergSync.java | 71 +++++++++++-----------
29 files changed, 253 insertions(+), 260 deletions(-)
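The commit is a mechanical rename of the core file abstraction; every call site swaps the type name while the builder-based construction stays the same. A minimal before/after sketch, assuming only the renamed class from this patch (the path value is illustrative):

import org.apache.xtable.model.storage.InternalDataFile;

public class RenameSketch {
  public static void main(String[] args) {
    // Before this commit: OneDataFile.builder().physicalPath(...).build()
    // After this commit: the same builder API under the new name.
    InternalDataFile file =
        InternalDataFile.builder()
            .physicalPath("file:///tmp/example.parquet") // hypothetical path
            .build();
    System.out.println(file.getPhysicalPath());
  }
}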
diff --git a/api/src/main/java/org/apache/xtable/model/stat/Range.java b/api/src/main/java/org/apache/xtable/model/stat/Range.java
index dbba7db0..80e56424 100644
--- a/api/src/main/java/org/apache/xtable/model/stat/Range.java
+++ b/api/src/main/java/org/apache/xtable/model/stat/Range.java
@@ -25,14 +25,14 @@ import lombok.Value;
import org.apache.xtable.model.schema.OneSchema;
import org.apache.xtable.model.schema.OneType;
import org.apache.xtable.model.schema.PartitionTransformType;
-import org.apache.xtable.model.storage.OneDataFile;
+import org.apache.xtable.model.storage.InternalDataFile;
/**
* Represents a range of values in the specified data type. Can represent a scalar value when the
* {@link #minValue} and {@link #maxValue} are the same.
*
- * <p>For the ranges stored in {@link OneDataFile#getPartitionValues()}, the values will be based
- * off the {@link PartitionTransformType}. {@link PartitionTransformType#HOUR}, {@link
+ * <p>For the ranges stored in {@link InternalDataFile#getPartitionValues()}, the values will be
+ * based off the {@link PartitionTransformType}. {@link PartitionTransformType#HOUR}, {@link
* PartitionTransformType#DAY}, {@link PartitionTransformType#MONTH}, {@link
* PartitionTransformType#YEAR} are all stored as a long representing a point in time as
* milliseconds since epoch. {@link PartitionTransformType#VALUE} will match the rules below.
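The Javadoc above separates time-based transforms, whose range endpoints are epoch millis, from VALUE transforms, which keep the raw partition value. A minimal sketch of both cases, assuming only the Range.scalar factory that the tests later in this patch also use:

import java.time.Instant;
import org.apache.xtable.model.stat.Range;

public class RangeSketch {
  public static void main(String[] args) {
    // HOUR/DAY/MONTH/YEAR transforms: a point in time as milliseconds since epoch.
    Range dayRange = Range.scalar(Instant.parse("2022-10-03T00:00:00.00Z").toEpochMilli());
    // VALUE transform: the raw partition value itself.
    Range valueRange = Range.scalar("warning");
    System.out.println(dayRange + " / " + valueRange);
  }
}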
diff --git a/api/src/main/java/org/apache/xtable/model/storage/DataFilesDiff.java b/api/src/main/java/org/apache/xtable/model/storage/DataFilesDiff.java
index 4a42cc3a..682136c5 100644
--- a/api/src/main/java/org/apache/xtable/model/storage/DataFilesDiff.java
+++ b/api/src/main/java/org/apache/xtable/model/storage/DataFilesDiff.java
@@ -32,9 +32,9 @@ import lombok.experimental.SuperBuilder;
* state may contain new files not present in the older state and may have removed files that were
* present in the older state. In most cases the data files included in the newer state are derived
* from a new commit in a source table format that has not been applied to a target table format
- * yet. Hence, the collection of data files in the newer state are typically {@link OneDataFile}s,
- * whereas the files in the older state are represented using a generic type P which can be a data
- * file type in specific to the target table format.
+ * yet. Hence, the collection of data files in the newer state are typically {@link
+ * InternalDataFile}s, whereas the files in the older state are represented using a generic type P
+ * which can be a data file type in specific to the target table format.
*
* @param <L> the type of the files in the latest state
* @param <P> the type of the files in the target table format
@@ -92,12 +92,12 @@ public class DataFilesDiff<L, P> {
* @param <P> the type of the previous files
* @return the set of files that are added
*/
- public static <P> DataFilesDiff<OneDataFile, P> findNewAndRemovedFiles(
+ public static <P> DataFilesDiff<InternalDataFile, P> findNewAndRemovedFiles(
List<OneFileGroup> latestFileGroups, Map<String, P> previousFiles) {
- Map<String, OneDataFile> latestFiles =
+ Map<String, InternalDataFile> latestFiles =
latestFileGroups.stream()
.flatMap(group -> group.getFiles().stream())
- .collect(Collectors.toMap(OneDataFile::getPhysicalPath, Function.identity()));
+ .collect(Collectors.toMap(InternalDataFile::getPhysicalPath, Function.identity()));
return findNewAndRemovedFiles(latestFiles, previousFiles);
}
}
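The static helper above flattens the latest file groups into a map keyed by physical path and diffs it against the previously synced files. A minimal sketch of the calling pattern, assuming illustrative paths and a plain String standing in for the target-format file type P:

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.xtable.model.storage.DataFilesDiff;
import org.apache.xtable.model.storage.InternalDataFile;
import org.apache.xtable.model.storage.OneFileGroup;

public class DiffSketch {
  public static void main(String[] args) {
    InternalDataFile kept =
        InternalDataFile.builder().physicalPath("s3://bucket/a.parquet").build();
    InternalDataFile added =
        InternalDataFile.builder().physicalPath("s3://bucket/b.parquet").build();
    List<OneFileGroup> latest = OneFileGroup.fromFiles(Arrays.asList(kept, added));

    // Previously synced files keyed by physical path; P is just a String here.
    Map<String, String> previous = new HashMap<>();
    previous.put("s3://bucket/a.parquet", "a");   // still present, ignored
    previous.put("s3://bucket/old.parquet", "x"); // absent from latest, removed

    DataFilesDiff<InternalDataFile, String> diff =
        DataFilesDiff.findNewAndRemovedFiles(latest, previous);
    System.out.println(diff.getFilesAdded());   // the b.parquet file
    System.out.println(diff.getFilesRemoved()); // [x]
  }
}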
diff --git a/api/src/main/java/org/apache/xtable/model/storage/OneDataFile.java b/api/src/main/java/org/apache/xtable/model/storage/InternalDataFile.java
similarity index 98%
rename from api/src/main/java/org/apache/xtable/model/storage/OneDataFile.java
rename to api/src/main/java/org/apache/xtable/model/storage/InternalDataFile.java
index b89e066b..52effbe2 100644
--- a/api/src/main/java/org/apache/xtable/model/storage/OneDataFile.java
+++ b/api/src/main/java/org/apache/xtable/model/storage/InternalDataFile.java
@@ -36,7 +36,7 @@ import org.apache.xtable.model.stat.PartitionValue;
*/
@Builder(toBuilder = true)
@Value
-public class OneDataFile {
+public class InternalDataFile {
// written schema version
SchemaVersion schemaVersion;
// physical path of the file (absolute with scheme)
diff --git a/api/src/main/java/org/apache/xtable/model/storage/OneDataFilesDiff.java b/api/src/main/java/org/apache/xtable/model/storage/OneDataFilesDiff.java
index f01ba7f4..5c5476c1 100644
--- a/api/src/main/java/org/apache/xtable/model/storage/OneDataFilesDiff.java
+++ b/api/src/main/java/org/apache/xtable/model/storage/OneDataFilesDiff.java
@@ -31,7 +31,7 @@ import lombok.experimental.SuperBuilder;
@Value
@EqualsAndHashCode(callSuper = true)
@SuperBuilder
-public class OneDataFilesDiff extends DataFilesDiff<OneDataFile, OneDataFile> {
+public class OneDataFilesDiff extends DataFilesDiff<InternalDataFile, InternalDataFile> {
/**
* Creates a OneDataFilesDiff from the list of files in the target table and the list of files in
@@ -41,15 +41,17 @@ public class OneDataFilesDiff extends DataFilesDiff<OneDataFile, OneDataFile> {
* @param target list of files currently in the target table
* @return files that need to be added and removed for the target table match the source table
*/
- public static OneDataFilesDiff from(List<OneDataFile> source, List<OneDataFile> target) {
- Map<String, OneDataFile> targetPaths =
+ public static OneDataFilesDiff from(
+ List<InternalDataFile> source, List<InternalDataFile> target) {
+ Map<String, InternalDataFile> targetPaths =
target.stream()
- .collect(Collectors.toMap(OneDataFile::getPhysicalPath, Function.identity()));
- Map<String, OneDataFile> sourcePaths =
+ .collect(Collectors.toMap(InternalDataFile::getPhysicalPath, Function.identity()));
+ Map<String, InternalDataFile> sourcePaths =
source.stream()
- .collect(Collectors.toMap(OneDataFile::getPhysicalPath, Function.identity()));
+ .collect(Collectors.toMap(InternalDataFile::getPhysicalPath, Function.identity()));
- DataFilesDiff<OneDataFile, OneDataFile> diff = findNewAndRemovedFiles(sourcePaths, targetPaths);
+ DataFilesDiff<InternalDataFile, InternalDataFile> diff =
+ findNewAndRemovedFiles(sourcePaths, targetPaths);
return OneDataFilesDiff.builder()
.filesAdded(diff.getFilesAdded())
.filesRemoved(diff.getFilesRemoved())
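OneDataFilesDiff.from computes a symmetric difference by physical path: files only in the source become additions, files only in the target become removals, and files in both drop out. A minimal usage sketch mirroring the test updated later in this patch (paths are illustrative):

import java.util.Arrays;
import org.apache.xtable.model.storage.InternalDataFile;
import org.apache.xtable.model.storage.OneDataFilesDiff;

public class FromSketch {
  public static void main(String[] args) {
    InternalDataFile inBoth =
        InternalDataFile.builder().physicalPath("file://shared.parquet").build();
    InternalDataFile onlyInSource =
        InternalDataFile.builder().physicalPath("file://new.parquet").build();
    InternalDataFile onlyInTarget =
        InternalDataFile.builder().physicalPath("file://stale.parquet").build();

    OneDataFilesDiff diff =
        OneDataFilesDiff.from(
            Arrays.asList(inBoth, onlyInSource), Arrays.asList(inBoth, onlyInTarget));
    System.out.println(diff.getFilesAdded());   // only the source-only file
    System.out.println(diff.getFilesRemoved()); // only the target-only file
  }
}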
diff --git a/api/src/main/java/org/apache/xtable/model/storage/OneFileGroup.java b/api/src/main/java/org/apache/xtable/model/storage/OneFileGroup.java
index 219a97a9..32c73f25 100644
--- a/api/src/main/java/org/apache/xtable/model/storage/OneFileGroup.java
+++ b/api/src/main/java/org/apache/xtable/model/storage/OneFileGroup.java
@@ -28,20 +28,20 @@ import lombok.Value;
import org.apache.xtable.model.stat.PartitionValue;
-/** Represents a grouping of {@link OneDataFile} with the same partition values. */
+/** Represents a grouping of {@link InternalDataFile} with the same partition values. */
@Value
@Builder
public class OneFileGroup {
List<PartitionValue> partitionValues;
- List<OneDataFile> files;
+ List<InternalDataFile> files;
- public static List<OneFileGroup> fromFiles(List<OneDataFile> files) {
+ public static List<OneFileGroup> fromFiles(List<InternalDataFile> files) {
return fromFiles(files.stream());
}
- public static List<OneFileGroup> fromFiles(Stream<OneDataFile> files) {
- Map<List<PartitionValue>, List<OneDataFile>> filesGrouped =
- files.collect(Collectors.groupingBy(OneDataFile::getPartitionValues));
+ public static List<OneFileGroup> fromFiles(Stream<InternalDataFile> files) {
+ Map<List<PartitionValue>, List<InternalDataFile>> filesGrouped =
+ files.collect(Collectors.groupingBy(InternalDataFile::getPartitionValues));
return filesGrouped.entrySet().stream()
.map(
entry ->
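fromFiles buckets files on their exact partition-value lists, so files that share partition values collapse into one group. A minimal sketch for the unpartitioned case, assuming illustrative paths:

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.xtable.model.storage.InternalDataFile;
import org.apache.xtable.model.storage.OneFileGroup;

public class GroupingSketch {
  public static void main(String[] args) {
    // Both files carry the same (empty) partition values, so they land in one group.
    InternalDataFile f1 =
        InternalDataFile.builder()
            .physicalPath("file:///tmp/f1.parquet")
            .partitionValues(Collections.emptyList())
            .build();
    InternalDataFile f2 =
        InternalDataFile.builder()
            .physicalPath("file:///tmp/f2.parquet")
            .partitionValues(Collections.emptyList())
            .build();
    List<OneFileGroup> groups = OneFileGroup.fromFiles(Arrays.asList(f1, f2));
    System.out.println(groups.size()); // 1
  }
}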
diff --git a/api/src/main/java/org/apache/xtable/spi/extractor/DataFileIterator.java b/api/src/main/java/org/apache/xtable/spi/extractor/DataFileIterator.java
index 767650e9..d5ece6ec 100644
--- a/api/src/main/java/org/apache/xtable/spi/extractor/DataFileIterator.java
+++ b/api/src/main/java/org/apache/xtable/spi/extractor/DataFileIterator.java
@@ -20,7 +20,7 @@ package org.apache.xtable.spi.extractor;
import java.util.Iterator;
-import org.apache.xtable.model.storage.OneDataFile;
+import org.apache.xtable.model.storage.InternalDataFile;
/** DataFileIterator lets the consumer iterate over the files of a table. */
-public interface DataFileIterator extends Iterator<OneDataFile>, AutoCloseable {}
+public interface DataFileIterator extends Iterator<InternalDataFile>, AutoCloseable {}
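Because the iterator is also AutoCloseable, consumers drain it inside try-with-resources, which is the pattern DeltaSourceClient uses later in this patch. A minimal sketch of a generic consumer (the helper name is hypothetical):

import java.util.ArrayList;
import java.util.List;
import org.apache.xtable.model.storage.InternalDataFile;
import org.apache.xtable.spi.extractor.DataFileIterator;

public class IteratorSketch {
  // Drains any DataFileIterator and guarantees close() runs afterwards.
  static List<InternalDataFile> collectAll(DataFileIterator files) throws Exception {
    List<InternalDataFile> out = new ArrayList<>();
    try (DataFileIterator it = files) {
      it.forEachRemaining(out::add);
    }
    return out;
  }
}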
diff --git a/api/src/test/java/org/apache/xtable/model/storage/TestDataFilesDiff.java b/api/src/test/java/org/apache/xtable/model/storage/TestDataFilesDiff.java
index 22e6ffeb..ab6c409f 100644
--- a/api/src/test/java/org/apache/xtable/model/storage/TestDataFilesDiff.java
+++ b/api/src/test/java/org/apache/xtable/model/storage/TestDataFilesDiff.java
@@ -33,10 +33,10 @@ import org.junit.jupiter.api.Test;
public class TestDataFilesDiff {
@Test
void findDiffFromFileGroups() {
- OneDataFile file1Group1 = OneDataFile.builder().physicalPath("file1Group1").build();
- OneDataFile file2Group1 = OneDataFile.builder().physicalPath("file2Group1").build();
- OneDataFile file1Group2 = OneDataFile.builder().physicalPath("file1Group2").build();
- OneDataFile file2Group2 = OneDataFile.builder().physicalPath("file2Group2").build();
+ InternalDataFile file1Group1 = InternalDataFile.builder().physicalPath("file1Group1").build();
+ InternalDataFile file2Group1 = InternalDataFile.builder().physicalPath("file2Group1").build();
+ InternalDataFile file1Group2 = InternalDataFile.builder().physicalPath("file1Group2").build();
+ InternalDataFile file2Group2 = InternalDataFile.builder().physicalPath("file2Group2").build();
List<OneFileGroup> latestFileGroups =
OneFileGroup.fromFiles(Arrays.asList(file1Group1, file2Group1, file1Group2, file2Group2));
@@ -49,7 +49,7 @@ public class TestDataFilesDiff {
previousFiles.put("file2NoGroup", file2);
previousFiles.put("file2Group2", file3);
- DataFilesDiff<OneDataFile, File> diff =
+ DataFilesDiff<InternalDataFile, File> diff =
DataFilesDiff.findNewAndRemovedFiles(latestFileGroups, previousFiles);
assertEquals(2, diff.getFilesAdded().size());
assertTrue(diff.getFilesAdded().contains(file1Group2));
diff --git a/api/src/test/java/org/apache/xtable/model/storage/TestOneDataFilesDiff.java b/api/src/test/java/org/apache/xtable/model/storage/TestOneDataFilesDiff.java
index bfe38f0a..36beda82 100644
--- a/api/src/test/java/org/apache/xtable/model/storage/TestOneDataFilesDiff.java
+++ b/api/src/test/java/org/apache/xtable/model/storage/TestOneDataFilesDiff.java
@@ -27,16 +27,16 @@ import org.junit.jupiter.api.Test;
public class TestOneDataFilesDiff {
@Test
void testFrom() {
- OneDataFile sourceFile1 =
- OneDataFile.builder().physicalPath("file://new_source_file1.parquet").build();
- OneDataFile sourceFile2 =
- OneDataFile.builder().physicalPath("file://new_source_file2.parquet").build();
- OneDataFile targetFile1 =
- OneDataFile.builder().physicalPath("file://already_in_target1.parquet").build();
- OneDataFile targetFile2 =
- OneDataFile.builder().physicalPath("file://already_in_target2.parquet").build();
- OneDataFile sourceFileInTargetAlready =
- OneDataFile.builder().physicalPath("file://already_in_target3.parquet").build();
+ InternalDataFile sourceFile1 =
+ InternalDataFile.builder().physicalPath("file://new_source_file1.parquet").build();
+ InternalDataFile sourceFile2 =
+ InternalDataFile.builder().physicalPath("file://new_source_file2.parquet").build();
+ InternalDataFile targetFile1 =
+ InternalDataFile.builder().physicalPath("file://already_in_target1.parquet").build();
+ InternalDataFile targetFile2 =
+ InternalDataFile.builder().physicalPath("file://already_in_target2.parquet").build();
+ InternalDataFile sourceFileInTargetAlready =
+ InternalDataFile.builder().physicalPath("file://already_in_target3.parquet").build();
OneDataFilesDiff actual =
OneDataFilesDiff.from(
Arrays.asList(sourceFile1, sourceFile2, sourceFileInTargetAlready),
diff --git a/api/src/test/java/org/apache/xtable/spi/extractor/TestExtractFromSource.java b/api/src/test/java/org/apache/xtable/spi/extractor/TestExtractFromSource.java
index 1f97b48a..9616df2d 100644
--- a/api/src/test/java/org/apache/xtable/spi/extractor/TestExtractFromSource.java
+++ b/api/src/test/java/org/apache/xtable/spi/extractor/TestExtractFromSource.java
@@ -42,7 +42,7 @@ import org.apache.xtable.model.OneSnapshot;
import org.apache.xtable.model.OneTable;
import org.apache.xtable.model.TableChange;
import org.apache.xtable.model.schema.SchemaCatalog;
-import org.apache.xtable.model.storage.OneDataFile;
+import org.apache.xtable.model.storage.InternalDataFile;
import org.apache.xtable.model.storage.OneDataFilesDiff;
import org.apache.xtable.model.storage.OneFileGroup;
@@ -66,8 +66,8 @@ public class TestExtractFromSource {
@Test
public void extractTableChanges() {
- OneDataFile initialFile2 = getOneDataFile("file2.parquet");
- OneDataFile initialFile3 = getOneDataFile("file3.parquet");
+ InternalDataFile initialFile2 = getDataFile("file2.parquet");
+ InternalDataFile initialFile3 = getDataFile("file3.parquet");
Instant lastSyncTime = Instant.now().minus(2, ChronoUnit.DAYS);
TestCommit firstCommitToSync = TestCommit.of("first_commit");
@@ -84,7 +84,7 @@ public class TestExtractFromSource {
.thenReturn(commitsBacklogToReturn);
// drop a file and add a file
- OneDataFile newFile1 = getOneDataFile("file4.parquet");
+ InternalDataFile newFile1 = getDataFile("file4.parquet");
OneTable tableAtFirstInstant =
OneTable.builder().latestCommitTime(Instant.now().minus(1, ChronoUnit.DAYS)).build();
TableChange tableChangeToReturnAtFirstInstant =
@@ -103,8 +103,8 @@ public class TestExtractFromSource {
.build();
// add 2 new files, remove 2 files
- OneDataFile newFile2 = getOneDataFile("file5.parquet");
- OneDataFile newFile3 = getOneDataFile("file6.parquet");
+ InternalDataFile newFile2 = getDataFile("file5.parquet");
+ InternalDataFile newFile3 = getDataFile("file6.parquet");
OneTable tableAtSecondInstant = OneTable.builder().latestCommitTime(Instant.now()).build();
TableChange tableChangeToReturnAtSecondInstant =
@@ -137,8 +137,8 @@ public class TestExtractFromSource {
Arrays.asList(expectedFirstTableChange, expectedSecondTableChange), actualTableChanges);
}
- private OneDataFile getOneDataFile(String physicalPath) {
- return OneDataFile.builder().physicalPath(physicalPath).build();
+ private InternalDataFile getDataFile(String physicalPath) {
+ return InternalDataFile.builder().physicalPath(physicalPath).build();
}
@AllArgsConstructor(staticName = "of")
diff --git a/api/src/test/java/org/apache/xtable/spi/sync/TestTableFormatSync.java b/api/src/test/java/org/apache/xtable/spi/sync/TestTableFormatSync.java
index e0e57cee..4fde7a99 100644
--- a/api/src/test/java/org/apache/xtable/spi/sync/TestTableFormatSync.java
+++ b/api/src/test/java/org/apache/xtable/spi/sync/TestTableFormatSync.java
@@ -48,7 +48,7 @@ import org.apache.xtable.model.schema.OneField;
import org.apache.xtable.model.schema.OnePartitionField;
import org.apache.xtable.model.schema.OneSchema;
import org.apache.xtable.model.schema.PartitionTransformType;
-import org.apache.xtable.model.storage.OneDataFile;
+import org.apache.xtable.model.storage.InternalDataFile;
import org.apache.xtable.model.storage.OneDataFilesDiff;
import org.apache.xtable.model.storage.OneFileGroup;
import org.apache.xtable.model.storage.TableFormat;
@@ -68,7 +68,7 @@ public class TestTableFormatSync {
OneFileGroup.builder()
.files(
Collections.singletonList(
- OneDataFile.builder().physicalPath("/tmp/path/file.parquet").build()))
+ InternalDataFile.builder().physicalPath("/tmp/path/file.parquet").build()))
.build());
List<Instant> pendingCommitInstants = Collections.singletonList(Instant.now());
OneSnapshot snapshot =
@@ -362,7 +362,7 @@ public class TestTableFormatSync {
return OneDataFilesDiff.builder()
.filesAdded(
Collections.singletonList(
- OneDataFile.builder()
+ InternalDataFile.builder()
.physicalPath(String.format("/tmp/path/file_%d.parquet", id))
.build()))
.build();
diff --git a/core/src/main/java/org/apache/xtable/delta/DeltaActionsConverter.java b/core/src/main/java/org/apache/xtable/delta/DeltaActionsConverter.java
index 7bd3f663..8262b40b 100644
--- a/core/src/main/java/org/apache/xtable/delta/DeltaActionsConverter.java
+++ b/core/src/main/java/org/apache/xtable/delta/DeltaActionsConverter.java
@@ -35,7 +35,7 @@ import org.apache.xtable.model.schema.OneField;
import org.apache.xtable.model.schema.OnePartitionField;
import org.apache.xtable.model.stat.ColumnStat;
import org.apache.xtable.model.storage.FileFormat;
-import org.apache.xtable.model.storage.OneDataFile;
+import org.apache.xtable.model.storage.InternalDataFile;
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class DeltaActionsConverter {
@@ -46,7 +46,7 @@ public class DeltaActionsConverter {
return INSTANCE;
}
- public OneDataFile convertAddActionToOneDataFile(
+ public InternalDataFile convertAddActionToInternalDataFile(
AddFile addFile,
Snapshot deltaSnapshot,
FileFormat fileFormat,
@@ -62,7 +62,7 @@ public class DeltaActionsConverter {
long recordCount =
columnStats.stream().map(ColumnStat::getNumValues).max(Long::compareTo).orElse(0L);
// TODO(https://github.com/apache/incubator-xtable/issues/102): removed record count.
- return OneDataFile.builder()
+ return InternalDataFile.builder()
.physicalPath(getFullPathToFile(deltaSnapshot, addFile.path()))
.fileFormat(fileFormat)
.fileSizeBytes(addFile.size())
@@ -74,13 +74,13 @@ public class DeltaActionsConverter {
.build();
}
- public OneDataFile convertRemoveActionToOneDataFile(
+ public InternalDataFile convertRemoveActionToInternalDataFile(
RemoveFile removeFile,
Snapshot deltaSnapshot,
FileFormat fileFormat,
List<OnePartitionField> partitionFields,
DeltaPartitionExtractor partitionExtractor) {
- return OneDataFile.builder()
+ return InternalDataFile.builder()
.physicalPath(getFullPathToFile(deltaSnapshot, removeFile.path()))
.fileFormat(fileFormat)
.partitionValues(
diff --git a/core/src/main/java/org/apache/xtable/delta/DeltaDataFileExtractor.java b/core/src/main/java/org/apache/xtable/delta/DeltaDataFileExtractor.java
index 89e22fe0..95507826 100644
--- a/core/src/main/java/org/apache/xtable/delta/DeltaDataFileExtractor.java
+++ b/core/src/main/java/org/apache/xtable/delta/DeltaDataFileExtractor.java
@@ -29,7 +29,7 @@ import org.apache.xtable.model.schema.OneField;
import org.apache.xtable.model.schema.OnePartitionField;
import org.apache.xtable.model.schema.OneSchema;
import org.apache.xtable.model.storage.FileFormat;
-import org.apache.xtable.model.storage.OneDataFile;
+import org.apache.xtable.model.storage.InternalDataFile;
import org.apache.xtable.spi.extractor.DataFileIterator;
/** DeltaDataFileExtractor lets the consumer iterate over partitions. */
@@ -58,7 +58,7 @@ public class DeltaDataFileExtractor {
private final FileFormat fileFormat;
private final List<OneField> fields;
private final List<OnePartitionField> partitionFields;
- private final Iterator<OneDataFile> dataFilesIterator;
+ private final Iterator<InternalDataFile> dataFilesIterator;
private DeltaDataFileIterator(Snapshot snapshot, OneSchema schema, boolean includeColumnStats) {
this.fileFormat =
@@ -71,7 +71,7 @@ public class DeltaDataFileExtractor {
snapshot.allFiles().collectAsList().stream()
.map(
addFile ->
- actionsConverter.convertAddActionToOneDataFile(
+ actionsConverter.convertAddActionToInternalDataFile(
addFile,
snapshot,
fileFormat,
@@ -92,7 +92,7 @@ public class DeltaDataFileExtractor {
}
@Override
- public OneDataFile next() {
+ public InternalDataFile next() {
return dataFilesIterator.next();
}
}
diff --git a/core/src/main/java/org/apache/xtable/delta/DeltaDataFileUpdatesExtractor.java b/core/src/main/java/org/apache/xtable/delta/DeltaDataFileUpdatesExtractor.java
index 5ad7dd29..75903ea0 100644
--- a/core/src/main/java/org/apache/xtable/delta/DeltaDataFileUpdatesExtractor.java
+++ b/core/src/main/java/org/apache/xtable/delta/DeltaDataFileUpdatesExtractor.java
@@ -40,7 +40,7 @@ import org.apache.xtable.collectors.CustomCollectors;
import org.apache.xtable.model.schema.OneSchema;
import org.apache.xtable.model.stat.ColumnStat;
import org.apache.xtable.model.storage.DataFilesDiff;
-import org.apache.xtable.model.storage.OneDataFile;
+import org.apache.xtable.model.storage.InternalDataFile;
import org.apache.xtable.model.storage.OneDataFilesDiff;
import org.apache.xtable.model.storage.OneFileGroup;
import org.apache.xtable.paths.PathUtils;
@@ -72,7 +72,7 @@ public class DeltaDataFileUpdatesExtractor {
file -> DeltaActionsConverter.getFullPathToFile(snapshot, file.path()),
file -> file));
- DataFilesDiff<OneDataFile, Action> diff =
+ DataFilesDiff<InternalDataFile, Action> diff =
OneDataFilesDiff.findNewAndRemovedFiles(partitionedDataFiles, previousFiles);
return applyDiff(
@@ -90,7 +90,7 @@ public class DeltaDataFileUpdatesExtractor {
}
private Seq<Action> applyDiff(
- Set<OneDataFile> filesAdded,
+ Set<InternalDataFile> filesAdded,
Collection<Action> removeFileActions,
OneSchema tableSchema,
String tableBasePath) {
@@ -105,7 +105,7 @@ public class DeltaDataFileUpdatesExtractor {
}
private Stream<AddFile> createAddFileAction(
- OneDataFile dataFile, OneSchema schema, String tableBasePath) {
+ InternalDataFile dataFile, OneSchema schema, String tableBasePath) {
return Stream.of(
new AddFile(
// Delta Lake supports relative and absolute paths in theory but relative paths seem
diff --git a/core/src/main/java/org/apache/xtable/delta/DeltaPartitionExtractor.java b/core/src/main/java/org/apache/xtable/delta/DeltaPartitionExtractor.java
index b7ba5f23..31eb0752 100644
--- a/core/src/main/java/org/apache/xtable/delta/DeltaPartitionExtractor.java
+++ b/core/src/main/java/org/apache/xtable/delta/DeltaPartitionExtractor.java
@@ -56,7 +56,7 @@ import org.apache.xtable.model.schema.OneSchema;
import org.apache.xtable.model.schema.PartitionTransformType;
import org.apache.xtable.model.stat.PartitionValue;
import org.apache.xtable.model.stat.Range;
-import org.apache.xtable.model.storage.OneDataFile;
+import org.apache.xtable.model.storage.InternalDataFile;
import org.apache.xtable.schema.SchemaFieldFinder;
/**
@@ -247,12 +247,13 @@ public class DeltaPartitionExtractor {
return nameToStructFieldMap;
}
- public Map<String, String> partitionValueSerialization(OneDataFile oneDataFile) {
+ public Map<String, String> partitionValueSerialization(InternalDataFile internalDataFile) {
Map<String, String> partitionValuesSerialized = new HashMap<>();
- if (oneDataFile.getPartitionValues() == null || oneDataFile.getPartitionValues().isEmpty()) {
+ if (internalDataFile.getPartitionValues() == null
+ || internalDataFile.getPartitionValues().isEmpty()) {
return partitionValuesSerialized;
}
- for (PartitionValue partitionValue : oneDataFile.getPartitionValues()) {
+ for (PartitionValue partitionValue : internalDataFile.getPartitionValues()) {
OnePartitionField partitionField = partitionValue.getPartitionField();
PartitionTransformType transformType = partitionField.getTransformType();
String partitionValueSerialized;
diff --git a/core/src/main/java/org/apache/xtable/delta/DeltaSourceClient.java b/core/src/main/java/org/apache/xtable/delta/DeltaSourceClient.java
index bb344be7..7d950bb2 100644
--- a/core/src/main/java/org/apache/xtable/delta/DeltaSourceClient.java
+++ b/core/src/main/java/org/apache/xtable/delta/DeltaSourceClient.java
@@ -54,7 +54,7 @@ import org.apache.xtable.model.schema.OneSchema;
import org.apache.xtable.model.schema.SchemaCatalog;
import org.apache.xtable.model.schema.SchemaVersion;
import org.apache.xtable.model.storage.FileFormat;
-import org.apache.xtable.model.storage.OneDataFile;
+import org.apache.xtable.model.storage.InternalDataFile;
import org.apache.xtable.model.storage.OneDataFilesDiff;
import org.apache.xtable.model.storage.OneFileGroup;
import org.apache.xtable.spi.extractor.DataFileIterator;
@@ -102,7 +102,7 @@ public class DeltaSourceClient implements SourceClient<Long> {
return OneSnapshot.builder()
.table(table)
.schemaCatalog(getSchemaCatalog(table, snapshot.version()))
- .partitionedDataFiles(getOneDataFiles(snapshot, table.getReadSchema()))
+ .partitionedDataFiles(getInternalDataFiles(snapshot, table.getReadSchema()))
.build();
}
@@ -115,12 +115,12 @@ public class DeltaSourceClient implements SourceClient<Long> {
FileFormat fileFormat =
actionsConverter.convertToOneTableFileFormat(
snapshotAtVersion.metadata().format().provider());
- Set<OneDataFile> addedFiles = new HashSet<>();
- Set<OneDataFile> removedFiles = new HashSet<>();
+ Set<InternalDataFile> addedFiles = new HashSet<>();
+ Set<InternalDataFile> removedFiles = new HashSet<>();
for (Action action : actionsForVersion) {
if (action instanceof AddFile) {
addedFiles.add(
- actionsConverter.convertAddActionToOneDataFile(
+ actionsConverter.convertAddActionToInternalDataFile(
(AddFile) action,
snapshotAtVersion,
fileFormat,
@@ -131,7 +131,7 @@ public class DeltaSourceClient implements SourceClient<Long> {
DeltaStatsExtractor.getInstance()));
} else if (action instanceof RemoveFile) {
removedFiles.add(
- actionsConverter.convertRemoveActionToOneDataFile(
+ actionsConverter.convertRemoveActionToInternalDataFile(
(RemoveFile) action,
snapshotAtVersion,
fileFormat,
@@ -188,9 +188,9 @@ public class DeltaSourceClient implements SourceClient<Long> {
.build());
}
- private List<OneFileGroup> getOneDataFiles(Snapshot snapshot, OneSchema schema) {
+ private List<OneFileGroup> getInternalDataFiles(Snapshot snapshot, OneSchema schema) {
try (DataFileIterator fileIterator = dataFileExtractor.iterator(snapshot, schema)) {
- List<OneDataFile> dataFiles = new ArrayList<>();
+ List<InternalDataFile> dataFiles = new ArrayList<>();
fileIterator.forEachRemaining(dataFiles::add);
return OneFileGroup.fromFiles(dataFiles);
} catch (Exception e) {
diff --git a/core/src/main/java/org/apache/xtable/hudi/BaseFileUpdatesExtractor.java b/core/src/main/java/org/apache/xtable/hudi/BaseFileUpdatesExtractor.java
index 7d7f7558..698aaa0a 100644
--- a/core/src/main/java/org/apache/xtable/hudi/BaseFileUpdatesExtractor.java
+++ b/core/src/main/java/org/apache/xtable/hudi/BaseFileUpdatesExtractor.java
@@ -54,7 +54,7 @@ import org.apache.hudi.metadata.HoodieMetadataFileSystemView;
import org.apache.xtable.collectors.CustomCollectors;
import org.apache.xtable.model.schema.OneType;
import org.apache.xtable.model.stat.ColumnStat;
-import org.apache.xtable.model.storage.OneDataFile;
+import org.apache.xtable.model.storage.InternalDataFile;
import org.apache.xtable.model.storage.OneDataFilesDiff;
import org.apache.xtable.model.storage.OneFileGroup;
@@ -94,17 +94,18 @@ public class BaseFileUpdatesExtractor {
partitionedDataFiles.stream()
.map(
partitionFileGroup -> {
- List<OneDataFile> dataFiles = partitionFileGroup.getFiles();
+ List<InternalDataFile> dataFiles = partitionFileGroup.getFiles();
String partitionPath = getPartitionPath(tableBasePath, dataFiles);
// remove the partition from the set of partitions to drop since it is present in
// the snapshot
partitionPathsToDrop.remove(partitionPath);
// create a map of file path to the data file, any entries not in the hudi table
// will be added
- Map<String, OneDataFile> physicalPathToFile =
+ Map<String, InternalDataFile> physicalPathToFile =
dataFiles.stream()
.collect(
- Collectors.toMap(OneDataFile::getPhysicalPath, Function.identity()));
+ Collectors.toMap(
+ InternalDataFile::getPhysicalPath, Function.identity()));
List<HoodieBaseFile> baseFiles =
isTableInitialized
? fsView.getLatestBaseFiles(partitionPath).collect(Collectors.toList())
@@ -206,7 +207,7 @@ public class BaseFileUpdatesExtractor {
private WriteStatus toWriteStatus(
Path tableBasePath,
String commitTime,
- OneDataFile file,
+ InternalDataFile file,
Optional<String> partitionPathOptional) {
WriteStatus writeStatus = new WriteStatus();
Path path = new CachingPath(file.getPhysicalPath());
@@ -269,7 +270,7 @@ public class BaseFileUpdatesExtractor {
}
}
- private String getPartitionPath(Path tableBasePath, List<OneDataFile> files) {
+ private String getPartitionPath(Path tableBasePath, List<InternalDataFile> files) {
return HudiPathUtils.getPartitionPath(
tableBasePath, new CachingPath(files.get(0).getPhysicalPath()));
}
diff --git a/core/src/main/java/org/apache/xtable/hudi/HudiDataFileExtractor.java b/core/src/main/java/org/apache/xtable/hudi/HudiDataFileExtractor.java
index ee8cbe63..97acff98 100644
--- a/core/src/main/java/org/apache/xtable/hudi/HudiDataFileExtractor.java
+++ b/core/src/main/java/org/apache/xtable/hudi/HudiDataFileExtractor.java
@@ -65,7 +65,7 @@ import org.apache.xtable.model.schema.OnePartitionField;
import org.apache.xtable.model.schema.SchemaVersion;
import org.apache.xtable.model.stat.PartitionValue;
import org.apache.xtable.model.storage.FileFormat;
-import org.apache.xtable.model.storage.OneDataFile;
+import org.apache.xtable.model.storage.InternalDataFile;
import org.apache.xtable.model.storage.OneDataFilesDiff;
import org.apache.xtable.model.storage.OneFileGroup;
@@ -115,7 +115,7 @@ public class HudiDataFileExtractor implements AutoCloseable {
tableMetadata != null
? tableMetadata.getAllPartitionPaths()
: FSUtils.getAllPartitionPaths(engineContext, metadataConfig, basePath.toString());
- return getOneDataFilesForPartitions(allPartitionPaths, table);
+ return getInternalDataFilesForPartitions(allPartitionPaths, table);
} catch (IOException ex) {
throw new OneIOException(
"Unable to read partitions for table " +
metaClient.getTableConfig().getTableName(), ex);
@@ -132,12 +132,12 @@ public class HudiDataFileExtractor implements
AutoCloseable {
getAddedAndRemovedPartitionInfo(
visibleTimeline, instant, fsView, hoodieInstantForDiff,
table.getPartitioningFields());
- Stream<OneDataFile> filesAddedWithoutStats = allInfo.getAdded().stream();
- List<OneDataFile> filesAdded =
+ Stream<InternalDataFile> filesAddedWithoutStats = allInfo.getAdded().stream();
+ List<InternalDataFile> filesAdded =
fileStatsExtractor
.addStatsToFiles(tableMetadata, filesAddedWithoutStats, table.getReadSchema())
.collect(Collectors.toList());
- List<OneDataFile> filesRemoved = allInfo.getRemoved();
+ List<InternalDataFile> filesRemoved = allInfo.getRemoved();
return OneDataFilesDiff.builder().filesAdded(filesAdded).filesRemoved(filesRemoved).build();
}
@@ -149,8 +149,8 @@ public class HudiDataFileExtractor implements AutoCloseable {
HoodieInstant instantToConsider,
List<OnePartitionField> partitioningFields) {
try {
- List<OneDataFile> addedFiles = new ArrayList<>();
- List<OneDataFile> removedFiles = new ArrayList<>();
+ List<InternalDataFile> addedFiles = new ArrayList<>();
+ List<InternalDataFile> removedFiles = new ArrayList<>();
switch (instant.getAction()) {
case HoodieTimeline.COMMIT_ACTION:
case HoodieTimeline.DELTA_COMMIT_ACTION:
@@ -253,7 +253,7 @@ public class HudiDataFileExtractor implements AutoCloseable {
}
}
- private List<OneDataFile> getRemovedFiles(
+ private List<InternalDataFile> getRemovedFiles(
String partitionPath, List<String> deletedPaths, List<OnePartitionField> partitioningFields) {
List<PartitionValue> partitionValues =
partitionValuesExtractor.extractPartitionValues(partitioningFields, partitionPath);
@@ -282,8 +282,8 @@ public class HudiDataFileExtractor implements AutoCloseable {
String partitionPath,
Set<String> affectedFileIds,
List<OnePartitionField> partitioningFields) {
- List<OneDataFile> filesToAdd = new ArrayList<>(affectedFileIds.size());
- List<OneDataFile> filesToRemove = new ArrayList<>(affectedFileIds.size());
+ List<InternalDataFile> filesToAdd = new ArrayList<>(affectedFileIds.size());
+ List<InternalDataFile> filesToRemove = new ArrayList<>(affectedFileIds.size());
List<PartitionValue> partitionValues =
partitionValuesExtractor.extractPartitionValues(partitioningFields, partitionPath);
Stream<HoodieFileGroup> fileGroups =
@@ -318,8 +318,8 @@ public class HudiDataFileExtractor implements AutoCloseable {
Set<String> replacedFileIds,
Set<String> newFileIds,
List<OnePartitionField> partitioningFields) {
- List<OneDataFile> filesToAdd = new ArrayList<>(newFileIds.size());
- List<OneDataFile> filesToRemove = new ArrayList<>(replacedFileIds.size());
+ List<InternalDataFile> filesToAdd = new ArrayList<>(newFileIds.size());
+ List<InternalDataFile> filesToRemove = new ArrayList<>(replacedFileIds.size());
List<PartitionValue> partitionValues =
partitionValuesExtractor.extractPartitionValues(partitioningFields, partitionPath);
Stream<HoodieFileGroup> fileGroups =
@@ -341,11 +341,11 @@ public class HudiDataFileExtractor implements AutoCloseable {
return AddedAndRemovedFiles.builder().added(filesToAdd).removed(filesToRemove).build();
}
- private List<OneFileGroup> getOneDataFilesForPartitions(
+ private List<OneFileGroup> getInternalDataFilesForPartitions(
List<String> partitionPaths, OneTable table) {
SyncableFileSystemView fsView = fileSystemViewManager.getFileSystemView(metaClient);
- Stream<OneDataFile> filesWithoutStats =
+ Stream<InternalDataFile> filesWithoutStats =
partitionPaths.stream()
.parallel()
.flatMap(
@@ -357,7 +357,7 @@ public class HudiDataFileExtractor implements AutoCloseable {
.getLatestBaseFiles(partitionPath)
.map(baseFile -> buildFileWithoutStats(partitionValues, baseFile));
});
- Stream<OneDataFile> files =
+ Stream<InternalDataFile> files =
fileStatsExtractor.addStatsToFiles(tableMetadata, filesWithoutStats, table.getReadSchema());
return OneFileGroup.fromFiles(files);
}
@@ -378,21 +378,21 @@ public class HudiDataFileExtractor implements AutoCloseable {
@Builder
@Value
private static class AddedAndRemovedFiles {
- List<OneDataFile> added;
- List<OneDataFile> removed;
+ List<InternalDataFile> added;
+ List<InternalDataFile> removed;
}
/**
- * Builds a {@link OneDataFile} without any statistics or rowCount value set.
+ * Builds a {@link InternalDataFile} without any statistics or rowCount
value set.
*
* @param partitionValues values extracted from the partition path
* @param hoodieBaseFile the base file from Hudi
- * @return {@link OneDataFile} without any statistics or rowCount value set.
+ * @return {@link InternalDataFile} without any statistics or rowCount value
set.
*/
- private OneDataFile buildFileWithoutStats(
+ private InternalDataFile buildFileWithoutStats(
List<PartitionValue> partitionValues, HoodieBaseFile hoodieBaseFile) {
long rowCount = 0L;
- return OneDataFile.builder()
+ return InternalDataFile.builder()
.schemaVersion(DEFAULT_SCHEMA_VERSION)
.physicalPath(hoodieBaseFile.getPath())
.fileFormat(getFileFormat(FSUtils.getFileExtension(hoodieBaseFile.getPath())))
diff --git a/core/src/main/java/org/apache/xtable/hudi/HudiFileStatsExtractor.java b/core/src/main/java/org/apache/xtable/hudi/HudiFileStatsExtractor.java
index cdb96d04..1d6c1c7d 100644
--- a/core/src/main/java/org/apache/xtable/hudi/HudiFileStatsExtractor.java
+++ b/core/src/main/java/org/apache/xtable/hudi/HudiFileStatsExtractor.java
@@ -53,7 +53,7 @@ import org.apache.xtable.model.schema.OneSchema;
import org.apache.xtable.model.schema.OneType;
import org.apache.xtable.model.stat.ColumnStat;
import org.apache.xtable.model.stat.Range;
-import org.apache.xtable.model.storage.OneDataFile;
+import org.apache.xtable.model.storage.InternalDataFile;
/** Responsible for Column stats extraction for Hudi. */
@AllArgsConstructor
@@ -83,8 +83,8 @@ public class HudiFileStatsExtractor {
* @param schema the schema of the files (assumed to be the same for all files in stream)
* @return a stream of files with column stats and row count information
*/
- public Stream<OneDataFile> addStatsToFiles(
- HoodieTableMetadata metadataTable, Stream<OneDataFile> files, OneSchema schema) {
+ public Stream<InternalDataFile> addStatsToFiles(
+ HoodieTableMetadata metadataTable, Stream<InternalDataFile> files, OneSchema schema) {
boolean useMetadataTableColStats =
metadataTable != null
&& metaClient
@@ -101,8 +101,8 @@ public class HudiFileStatsExtractor {
: computeColumnStatsFromParquetFooters(files, nameFieldMap);
}
- private Stream<OneDataFile> computeColumnStatsFromParquetFooters(
- Stream<OneDataFile> files, Map<String, OneField> nameFieldMap) {
+ private Stream<InternalDataFile> computeColumnStatsFromParquetFooters(
+ Stream<InternalDataFile> files, Map<String, OneField> nameFieldMap) {
return files.map(
file -> {
HudiFileStats fileStats =
@@ -120,11 +120,11 @@ public class HudiFileStatsExtractor {
return Pair.of(partitionPath, filePath.getName());
}
- private Stream<OneDataFile> computeColumnStatsFromMetadataTable(
+ private Stream<InternalDataFile> computeColumnStatsFromMetadataTable(
HoodieTableMetadata metadataTable,
- Stream<OneDataFile> files,
+ Stream<InternalDataFile> files,
Map<String, OneField> nameFieldMap) {
- Map<Pair<String, String>, OneDataFile> filePathsToDataFile =
+ Map<Pair<String, String>, InternalDataFile> filePathsToDataFile =
files.collect(
Collectors.toMap(
file -> getPartitionAndFileName(file.getPhysicalPath()), Function.identity()));
@@ -154,7 +154,7 @@ public class HudiFileStatsExtractor {
.map(
pathToDataFile -> {
Pair<String, String> filePath = pathToDataFile.getKey();
- OneDataFile file = pathToDataFile.getValue();
+ InternalDataFile file = pathToDataFile.getValue();
List<Pair<OneField, HoodieMetadataColumnStats>> fileStats =
stats.getOrDefault(filePath, Collections.emptyList());
List<ColumnStat> columnStats =
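addStatsToFiles enriches a lazy stream of files with column stats, reading from the Hudi metadata table when its column-stats index is available and falling back to parquet footers otherwise. A minimal sketch of the calling shape, assuming an already-constructed extractor; passing a null metadata table takes the footer path per the branch above:

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.xtable.hudi.HudiFileStatsExtractor;
import org.apache.xtable.model.schema.OneSchema;
import org.apache.xtable.model.storage.InternalDataFile;

public class StatsSketch {
  // Enriches files with stats; nothing is read until the stream is collected.
  static List<InternalDataFile> withStats(
      HudiFileStatsExtractor extractor,
      HoodieTableMetadata metadataTable, // may be null: falls back to footers
      Stream<InternalDataFile> files,
      OneSchema schema) {
    return extractor.addStatsToFiles(metadataTable, files, schema)
        .collect(Collectors.toList());
  }
}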
diff --git a/core/src/main/java/org/apache/xtable/iceberg/IcebergDataFileExtractor.java b/core/src/main/java/org/apache/xtable/iceberg/IcebergDataFileExtractor.java
index f68da230..c6d0d3ad 100644
--- a/core/src/main/java/org/apache/xtable/iceberg/IcebergDataFileExtractor.java
+++ b/core/src/main/java/org/apache/xtable/iceberg/IcebergDataFileExtractor.java
@@ -30,7 +30,7 @@ import org.apache.xtable.model.schema.OneSchema;
import org.apache.xtable.model.stat.ColumnStat;
import org.apache.xtable.model.stat.PartitionValue;
import org.apache.xtable.model.storage.FileFormat;
-import org.apache.xtable.model.storage.OneDataFile;
+import org.apache.xtable.model.storage.InternalDataFile;
/** Extractor of data files for Iceberg */
@Builder
@@ -41,19 +41,19 @@ public class IcebergDataFileExtractor {
IcebergPartitionValueConverter.getInstance();
/**
- * Builds {@link OneDataFile} representation from Iceberg {@link DataFile}.
+ * Builds {@link InternalDataFile} representation from Iceberg {@link DataFile}.
*
* @param dataFile Iceberg data file
* @param partitionValues representation of partition fields and ranges
* @param schema current schema for the table, used for mapping field IDs to stats
* @return corresponding OneTable data file
*/
- OneDataFile fromIceberg(
+ InternalDataFile fromIceberg(
DataFile dataFile, List<PartitionValue> partitionValues, OneSchema schema) {
return fromIceberg(dataFile, partitionValues, schema, true);
}
- private OneDataFile fromIceberg(
+ private InternalDataFile fromIceberg(
DataFile dataFile,
List<PartitionValue> partitionValues,
OneSchema schema,
@@ -74,7 +74,7 @@ public class IcebergDataFileExtractor {
if (!filePath.contains(":")) {
filePath = "file:" + filePath;
}
- return OneDataFile.builder()
+ return InternalDataFile.builder()
.physicalPath(filePath)
.fileFormat(fromIcebergFileFormat(dataFile.format()))
.fileSizeBytes(dataFile.fileSizeInBytes())
diff --git a/core/src/main/java/org/apache/xtable/iceberg/IcebergDataFileUpdatesSync.java b/core/src/main/java/org/apache/xtable/iceberg/IcebergDataFileUpdatesSync.java
index f0cff732..4714a8d8 100644
--- a/core/src/main/java/org/apache/xtable/iceberg/IcebergDataFileUpdatesSync.java
+++ b/core/src/main/java/org/apache/xtable/iceberg/IcebergDataFileUpdatesSync.java
@@ -31,7 +31,7 @@ import org.apache.xtable.exception.NotSupportedException;
import org.apache.xtable.exception.OneIOException;
import org.apache.xtable.model.OneTable;
import org.apache.xtable.model.storage.DataFilesDiff;
-import org.apache.xtable.model.storage.OneDataFile;
+import org.apache.xtable.model.storage.InternalDataFile;
import org.apache.xtable.model.storage.OneDataFilesDiff;
import org.apache.xtable.model.storage.OneFileGroup;
@@ -57,7 +57,7 @@ public class IcebergDataFileUpdatesSync {
throw new OneIOException("Failed to iterate through Iceberg data files", e);
}
- DataFilesDiff<OneDataFile, DataFile> diff =
+ DataFilesDiff<InternalDataFile, DataFile> diff =
OneDataFilesDiff.findNewAndRemovedFiles(partitionedDataFiles, previousFiles);
applyDiff(transaction, diff.getFilesAdded(), diff.getFilesRemoved(), schema, partitionSpec);
@@ -79,7 +79,7 @@ public class IcebergDataFileUpdatesSync {
private void applyDiff(
Transaction transaction,
- Collection<OneDataFile> filesAdded,
+ Collection<InternalDataFile> filesAdded,
Collection<DataFile> filesRemoved,
Schema schema,
PartitionSpec partitionSpec) {
@@ -89,7 +89,8 @@ public class IcebergDataFileUpdatesSync {
overwriteFiles.commit();
}
- private DataFile getDataFile(PartitionSpec partitionSpec, Schema schema, OneDataFile dataFile) {
+ private DataFile getDataFile(
+ PartitionSpec partitionSpec, Schema schema, InternalDataFile dataFile) {
DataFiles.Builder builder =
DataFiles.builder(partitionSpec)
.withPath(dataFile.getPhysicalPath())
diff --git a/core/src/main/java/org/apache/xtable/iceberg/IcebergSourceClient.java b/core/src/main/java/org/apache/xtable/iceberg/IcebergSourceClient.java
index a7e49584..6f411305 100644
--- a/core/src/main/java/org/apache/xtable/iceberg/IcebergSourceClient.java
+++ b/core/src/main/java/org/apache/xtable/iceberg/IcebergSourceClient.java
@@ -48,11 +48,8 @@ import org.apache.xtable.model.schema.OneSchema;
import org.apache.xtable.model.schema.SchemaCatalog;
import org.apache.xtable.model.schema.SchemaVersion;
import org.apache.xtable.model.stat.PartitionValue;
-import org.apache.xtable.model.storage.DataLayoutStrategy;
-import org.apache.xtable.model.storage.OneDataFile;
-import org.apache.xtable.model.storage.OneDataFilesDiff;
-import org.apache.xtable.model.storage.OneFileGroup;
-import org.apache.xtable.model.storage.TableFormat;
+import org.apache.xtable.model.storage.*;
+import org.apache.xtable.model.storage.InternalDataFile;
import org.apache.xtable.spi.extractor.SourceClient;
@Log4j2
@@ -145,10 +142,10 @@ public class IcebergSourceClient implements SourceClient<Snapshot> {
PartitionSpec partitionSpec = iceTable.spec();
List<OneFileGroup> partitionedDataFiles;
try (CloseableIterable<FileScanTask> files = scan.planFiles()) {
- List<OneDataFile> irFiles = new ArrayList<>();
+ List<InternalDataFile> irFiles = new ArrayList<>();
for (FileScanTask fileScanTask : files) {
DataFile file = fileScanTask.file();
- OneDataFile irDataFile = fromIceberg(file, partitionSpec, irTable);
+ InternalDataFile irDataFile = fromIceberg(file, partitionSpec, irTable);
irFiles.add(irDataFile);
}
partitionedDataFiles = OneFileGroup.fromFiles(irFiles);
@@ -164,7 +161,8 @@ public class IcebergSourceClient implements SourceClient<Snapshot> {
.build();
}
- private OneDataFile fromIceberg(DataFile file, PartitionSpec partitionSpec, OneTable oneTable) {
+ private InternalDataFile fromIceberg(
+ DataFile file, PartitionSpec partitionSpec, OneTable oneTable) {
List<PartitionValue> partitionValues =
partitionConverter.toOneTable(oneTable, file.partition(), partitionSpec);
return dataFileExtractor.fromIceberg(file, partitionValues, oneTable.getReadSchema());
@@ -177,12 +175,12 @@ public class IcebergSourceClient implements SourceClient<Snapshot> {
PartitionSpec partitionSpec = iceTable.spec();
OneTable irTable = getTable(snapshot);
- Set<OneDataFile> dataFilesAdded =
+ Set<InternalDataFile> dataFilesAdded =
StreamSupport.stream(snapshot.addedDataFiles(fileIO).spliterator(), false)
.map(dataFile -> fromIceberg(dataFile, partitionSpec, irTable))
.collect(Collectors.toSet());
- Set<OneDataFile> dataFilesRemoved =
+ Set<InternalDataFile> dataFilesRemoved =
StreamSupport.stream(snapshot.removedDataFiles(fileIO).spliterator(), false)
.map(dataFile -> fromIceberg(dataFile, partitionSpec, irTable))
.collect(Collectors.toSet());
diff --git a/core/src/test/java/org/apache/xtable/ValidationTestHelper.java b/core/src/test/java/org/apache/xtable/ValidationTestHelper.java
index 49b50b39..aa65e9ca 100644
--- a/core/src/test/java/org/apache/xtable/ValidationTestHelper.java
+++ b/core/src/test/java/org/apache/xtable/ValidationTestHelper.java
@@ -29,7 +29,7 @@ import java.util.stream.IntStream;
import org.apache.xtable.model.OneSnapshot;
import org.apache.xtable.model.TableChange;
-import org.apache.xtable.model.storage.OneDataFile;
+import org.apache.xtable.model.storage.InternalDataFile;
public class ValidationTestHelper {
@@ -39,7 +39,7 @@ public class ValidationTestHelper {
List<String> onetablePaths =
oneSnapshot.getPartitionedDataFiles().stream()
.flatMap(group -> group.getFiles().stream())
- .map(OneDataFile::getPhysicalPath)
+ .map(InternalDataFile::getPhysicalPath)
.collect(Collectors.toList());
replaceFileScheme(allActivePaths);
replaceFileScheme(onetablePaths);
@@ -90,12 +90,12 @@ public class ValidationTestHelper {
public static List<String> getAllFilePaths(OneSnapshot oneSnapshot) {
return oneSnapshot.getPartitionedDataFiles().stream()
.flatMap(oneFileGroup -> oneFileGroup.getFiles().stream())
- .map(oneDataFile -> oneDataFile.getPhysicalPath())
+ .map(InternalDataFile::getPhysicalPath)
.collect(Collectors.toList());
}
- private static Set<String> extractPathsFromDataFile(Set<OneDataFile> dataFiles) {
- return dataFiles.stream().map(OneDataFile::getPhysicalPath).collect(Collectors.toSet());
+ private static Set<String> extractPathsFromDataFile(Set<InternalDataFile> dataFiles) {
+ return dataFiles.stream().map(InternalDataFile::getPhysicalPath).collect(Collectors.toSet());
}
private static void replaceFileScheme(List<String> filePaths) {
diff --git a/core/src/test/java/org/apache/xtable/delta/ITDeltaSourceClient.java b/core/src/test/java/org/apache/xtable/delta/ITDeltaSourceClient.java
index 72d46378..03cf602b 100644
--- a/core/src/test/java/org/apache/xtable/delta/ITDeltaSourceClient.java
+++ b/core/src/test/java/org/apache/xtable/delta/ITDeltaSourceClient.java
@@ -70,11 +70,8 @@ import org.apache.xtable.model.schema.SchemaVersion;
import org.apache.xtable.model.stat.ColumnStat;
import org.apache.xtable.model.stat.PartitionValue;
import org.apache.xtable.model.stat.Range;
-import org.apache.xtable.model.storage.DataLayoutStrategy;
-import org.apache.xtable.model.storage.FileFormat;
-import org.apache.xtable.model.storage.OneDataFile;
-import org.apache.xtable.model.storage.OneFileGroup;
-import org.apache.xtable.model.storage.TableFormat;
+import org.apache.xtable.model.storage.*;
+import org.apache.xtable.model.storage.InternalDataFile;
public class ITDeltaSourceClient {
@@ -193,7 +190,7 @@ public class ITDeltaSourceClient {
OneFileGroup.builder()
.files(
Collections.singletonList(
- OneDataFile.builder()
+ InternalDataFile.builder()
.physicalPath("file:/fake/path")
.fileFormat(FileFormat.APACHE_PARQUET)
.partitionValues(Collections.emptyList())
@@ -277,7 +274,7 @@ public class ITDeltaSourceClient {
.partitionValues(partitionValue)
.files(
Collections.singletonList(
- OneDataFile.builder()
+ InternalDataFile.builder()
.physicalPath("file:/fake/path")
.fileFormat(FileFormat.APACHE_PARQUET)
.partitionValues(partitionValue)
@@ -679,17 +676,18 @@ public class ITDeltaSourceClient {
validateDataFiles(expectedPartitionFiles.getFiles(), actualPartitionFiles.getFiles());
}
- private void validateDataFiles(List<OneDataFile> expectedFiles, List<OneDataFile> actualFiles)
+ private void validateDataFiles(
+ List<InternalDataFile> expectedFiles, List<InternalDataFile> actualFiles)
throws URISyntaxException {
Assertions.assertEquals(expectedFiles.size(), actualFiles.size());
for (int i = 0; i < expectedFiles.size(); i++) {
- OneDataFile expected = expectedFiles.get(i);
- OneDataFile actual = actualFiles.get(i);
+ InternalDataFile expected = expectedFiles.get(i);
+ InternalDataFile actual = actualFiles.get(i);
validatePropertiesDataFile(expected, actual);
}
}
- private void validatePropertiesDataFile(OneDataFile expected, OneDataFile actual)
+ private void validatePropertiesDataFile(InternalDataFile expected, InternalDataFile actual)
throws URISyntaxException {
Assertions.assertEquals(expected.getSchemaVersion(), actual.getSchemaVersion());
Assertions.assertTrue(
diff --git a/core/src/test/java/org/apache/xtable/delta/TestDeltaSync.java b/core/src/test/java/org/apache/xtable/delta/TestDeltaSync.java
index 807e6d2d..bbeb31d4 100644
--- a/core/src/test/java/org/apache/xtable/delta/TestDeltaSync.java
+++ b/core/src/test/java/org/apache/xtable/delta/TestDeltaSync.java
@@ -86,11 +86,8 @@ import org.apache.xtable.model.schema.OneType;
import org.apache.xtable.model.schema.PartitionTransformType;
import org.apache.xtable.model.stat.PartitionValue;
import org.apache.xtable.model.stat.Range;
-import org.apache.xtable.model.storage.DataLayoutStrategy;
-import org.apache.xtable.model.storage.FileFormat;
-import org.apache.xtable.model.storage.OneDataFile;
-import org.apache.xtable.model.storage.OneFileGroup;
-import org.apache.xtable.model.storage.TableFormat;
+import org.apache.xtable.model.storage.*;
+import org.apache.xtable.model.storage.InternalDataFile;
import org.apache.xtable.schema.SchemaFieldFinder;
import org.apache.xtable.spi.sync.TableFormatSync;
@@ -149,9 +146,9 @@ public class TestDeltaSync {
OneTable table1 = getOneTable(tableName, basePath, schema1, null, LAST_COMMIT_TIME);
OneTable table2 = getOneTable(tableName, basePath, schema2, null, LAST_COMMIT_TIME);
- OneDataFile dataFile1 = getOneDataFile(1, Collections.emptyList(), basePath);
- OneDataFile dataFile2 = getOneDataFile(2, Collections.emptyList(), basePath);
- OneDataFile dataFile3 = getOneDataFile(3, Collections.emptyList(), basePath);
+ InternalDataFile dataFile1 = getDataFile(1, Collections.emptyList(), basePath);
+ InternalDataFile dataFile2 = getDataFile(2, Collections.emptyList(), basePath);
+ InternalDataFile dataFile3 = getDataFile(3, Collections.emptyList(), basePath);
OneSnapshot snapshot1 = buildSnapshot(table1, dataFile1, dataFile2);
OneSnapshot snapshot2 = buildSnapshot(table2, dataFile2, dataFile3);
@@ -195,9 +192,9 @@ public class TestDeltaSync {
.partitionField(onePartitionField)
.range(Range.scalar("warning"))
.build());
- OneDataFile dataFile1 = getOneDataFile(1, partitionValues1, basePath);
- OneDataFile dataFile2 = getOneDataFile(2, partitionValues1, basePath);
- OneDataFile dataFile3 = getOneDataFile(3, partitionValues2, basePath);
+ InternalDataFile dataFile1 = getDataFile(1, partitionValues1, basePath);
+ InternalDataFile dataFile2 = getDataFile(2, partitionValues1, basePath);
+ InternalDataFile dataFile3 = getDataFile(3, partitionValues2, basePath);
EqualTo equalToExpr =
new EqualTo(new Column("string_field", new StringType()),
Literal.of("warning"));
@@ -268,9 +265,9 @@ public class TestDeltaSync {
.range(Range.scalar(20))
.build());
- OneDataFile dataFile1 = getOneDataFile(1, partitionValues1, basePath);
- OneDataFile dataFile2 = getOneDataFile(2, partitionValues2, basePath);
- OneDataFile dataFile3 = getOneDataFile(3, partitionValues3, basePath);
+ InternalDataFile dataFile1 = getDataFile(1, partitionValues1, basePath);
+ InternalDataFile dataFile2 = getDataFile(2, partitionValues2, basePath);
+ InternalDataFile dataFile3 = getDataFile(3, partitionValues3, basePath);
EqualTo equalToExpr1 =
new EqualTo(new Column("string_field", new StringType()),
Literal.of("level"));
@@ -315,9 +312,9 @@ public class TestDeltaSync {
.partitionField(partitionField)
.range(Range.scalar(Instant.parse("2022-10-03T00:00:00.00Z").toEpochMilli()))
.build());
- OneDataFile dataFile1 = getOneDataFile(1, partitionValues1, basePath);
- OneDataFile dataFile2 = getOneDataFile(2, partitionValues1, basePath);
- OneDataFile dataFile3 = getOneDataFile(3, partitionValues2, basePath);
+ InternalDataFile dataFile1 = getDataFile(1, partitionValues1, basePath);
+ InternalDataFile dataFile2 = getDataFile(2, partitionValues1, basePath);
+ InternalDataFile dataFile3 = getDataFile(3, partitionValues2, basePath);
OneSnapshot snapshot1 = buildSnapshot(table, dataFile1, dataFile2, dataFile3);
@@ -357,7 +354,7 @@ public class TestDeltaSync {
}
private void validateDeltaTable(
- Path basePath, Set<OneDataFile> oneDataFiles, Expression filterExpression)
+ Path basePath, Set<InternalDataFile> internalDataFiles, Expression filterExpression)
throws IOException {
DeltaLog deltaLog = DeltaLog.forTable(new Configuration(), basePath.toString());
assertTrue(deltaLog.tableExists());
@@ -370,26 +367,26 @@ public class TestDeltaSync {
} else {
deltaScan = snapshot.scan(filterExpression);
}
- Map<String, OneDataFile> pathToFile =
- oneDataFiles.stream()
- .collect(Collectors.toMap(OneDataFile::getPhysicalPath, Function.identity()));
+ Map<String, InternalDataFile> pathToFile =
+ internalDataFiles.stream()
+ .collect(Collectors.toMap(InternalDataFile::getPhysicalPath, Function.identity()));
int count = 0;
try (CloseableIterator<AddFile> fileItr = deltaScan.getFiles()) {
for (CloseableIterator<AddFile> it = fileItr; it.hasNext(); ) {
AddFile addFile = it.next();
String fullPath =
new org.apache.hadoop.fs.Path(basePath.resolve(addFile.getPath()).toUri()).toString();
- OneDataFile expected = pathToFile.get(fullPath);
+ InternalDataFile expected = pathToFile.get(fullPath);
assertNotNull(expected);
assertEquals(addFile.getSize(), expected.getFileSizeBytes());
count++;
}
}
assertEquals(
- oneDataFiles.size(), count, "Number of files from DeltaScan don't
match expectation");
+ internalDataFiles.size(), count, "Number of files from DeltaScan don't
match expectation");
}
- private OneSnapshot buildSnapshot(OneTable table, OneDataFile... dataFiles) {
+ private OneSnapshot buildSnapshot(OneTable table, InternalDataFile... dataFiles) {
return OneSnapshot.builder()
.table(table)
.partitionedDataFiles(OneFileGroup.fromFiles(Arrays.asList(dataFiles)))
@@ -413,12 +410,12 @@ public class TestDeltaSync {
.build();
}
- private OneDataFile getOneDataFile(
+ private InternalDataFile getDataFile(
int index, List<PartitionValue> partitionValues, Path basePath) {
String physicalPath =
new org.apache.hadoop.fs.Path(basePath.toUri() + "physical" + index + ".parquet")
.toString();
- return OneDataFile.builder()
+ return InternalDataFile.builder()
.fileFormat(FileFormat.APACHE_PARQUET)
.fileSizeBytes(RANDOM.nextInt(10000))
.physicalPath(physicalPath)
diff --git a/core/src/test/java/org/apache/xtable/hudi/ITHudiTargetClient.java b/core/src/test/java/org/apache/xtable/hudi/ITHudiTargetClient.java
index 0cf012a2..96c6025c 100644
--- a/core/src/test/java/org/apache/xtable/hudi/ITHudiTargetClient.java
+++ b/core/src/test/java/org/apache/xtable/hudi/ITHudiTargetClient.java
@@ -81,12 +81,8 @@ import org.apache.xtable.model.schema.PartitionTransformType;
import org.apache.xtable.model.stat.ColumnStat;
import org.apache.xtable.model.stat.PartitionValue;
import org.apache.xtable.model.stat.Range;
-import org.apache.xtable.model.storage.DataLayoutStrategy;
-import org.apache.xtable.model.storage.FileFormat;
-import org.apache.xtable.model.storage.OneDataFile;
-import org.apache.xtable.model.storage.OneDataFilesDiff;
-import org.apache.xtable.model.storage.OneFileGroup;
-import org.apache.xtable.model.storage.TableFormat;
+import org.apache.xtable.model.storage.*;
+import org.apache.xtable.model.storage.InternalDataFile;
import org.apache.xtable.spi.sync.TargetClient;
/**
@@ -175,8 +171,8 @@ public class ITHudiTargetClient {
Collections.emptyMap());
}
- OneDataFile fileToRemove =
- OneDataFile.builder()
+ InternalDataFile fileToRemove =
+ InternalDataFile.builder()
.fileFormat(FileFormat.APACHE_PARQUET)
.lastModified(System.currentTimeMillis())
.fileSizeBytes(100L)
@@ -371,8 +367,8 @@ public class ITHudiTargetClient {
private OneTableMetadata incrementalSync(
TargetClient targetClient,
- List<OneDataFile> filesToAdd,
- List<OneDataFile> filesToRemove,
+ List<InternalDataFile> filesToAdd,
+ List<InternalDataFile> filesToRemove,
Instant commitStart) {
OneDataFilesDiff dataFilesDiff2 =
OneDataFilesDiff.builder().filesAdded(filesToAdd).filesRemoved(filesToRemove).build();
@@ -527,7 +523,7 @@ public class ITHudiTargetClient {
assertEquals(-1, columnStats.getTotalUncompressedSize());
}
- private OneDataFile getTestFile(String partitionPath, String fileName) {
+ private InternalDataFile getTestFile(String partitionPath, String fileName) {
List<ColumnStat> columnStats =
Arrays.asList(
ColumnStat.builder()
@@ -551,7 +547,7 @@ public class ITHudiTargetClient {
.numValues(2)
.totalSize(5)
.build());
- return OneDataFile.builder()
+ return InternalDataFile.builder()
.schemaVersion(SCHEMA_VERSION)
.physicalPath(String.format("file://%s/%s/%s", tableBasePath, partitionPath, fileName))
.fileSizeBytes(FILE_SIZE)
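
The incremental sync hunks above now type both sides of a files diff as InternalDataFile. A minimal sketch of that builder call, assuming only the methods shown in the hunk (the wrapper class is illustrative):

    import java.util.List;

    import org.apache.xtable.model.storage.InternalDataFile;
    import org.apache.xtable.model.storage.OneDataFilesDiff;

    class FilesDiffSketch {
      // Same shape as incrementalSync(...) above; the lists come from the caller.
      static OneDataFilesDiff diffOf(
          List<InternalDataFile> filesToAdd, List<InternalDataFile> filesToRemove) {
        return OneDataFilesDiff.builder()
            .filesAdded(filesToAdd)
            .filesRemoved(filesToRemove)
            .build();
      }
    }
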
diff --git a/core/src/test/java/org/apache/xtable/hudi/TestBaseFileUpdatesExtractor.java b/core/src/test/java/org/apache/xtable/hudi/TestBaseFileUpdatesExtractor.java
index f183d418..a1deedca 100644
--- a/core/src/test/java/org/apache/xtable/hudi/TestBaseFileUpdatesExtractor.java
+++ b/core/src/test/java/org/apache/xtable/hudi/TestBaseFileUpdatesExtractor.java
@@ -61,7 +61,7 @@ import org.apache.xtable.model.stat.ColumnStat;
import org.apache.xtable.model.stat.PartitionValue;
import org.apache.xtable.model.stat.Range;
import org.apache.xtable.model.storage.FileFormat;
-import org.apache.xtable.model.storage.OneDataFile;
+import org.apache.xtable.model.storage.InternalDataFile;
import org.apache.xtable.model.storage.OneDataFilesDiff;
import org.apache.xtable.model.storage.OneFileGroup;
import org.apache.xtable.testutil.ColumnStatMapUtil;
@@ -90,32 +90,32 @@ public class TestBaseFileUpdatesExtractor {
String partitionPath1 = "partition1";
String fileName1 = "file1.parquet";
// create file with empty stats to test edge case
- OneDataFile addedFile1 =
+ InternalDataFile addedFile1 =
createFile(
String.format("%s/%s/%s", tableBasePath, partitionPath1,
fileName1),
Collections.emptyList());
// create file with stats
String partitionPath2 = "partition2";
String fileName2 = "file2.parquet";
- OneDataFile addedFile2 =
+ InternalDataFile addedFile2 =
createFile(
String.format("%s/%s/%s", tableBasePath, partitionPath2,
fileName2), getColumnStats());
// remove 3 files from two different partitions
String fileName3 = "file3.parquet";
- OneDataFile removedFile1 =
+ InternalDataFile removedFile1 =
createFile(
String.format("%s/%s/%s", tableBasePath, partitionPath1, fileName3), getColumnStats());
// create file that matches hudi format to mimic that a file created by hudi is now being removed
// by another system
String fileIdForFile4 = "d1cf0980-445c-4c74-bdeb-b7e5d18779f5-0";
String fileName4 = fileIdForFile4 + "_0-1116-142216_20231003013807542.parquet";
- OneDataFile removedFile2 =
+ InternalDataFile removedFile2 =
createFile(
String.format("%s/%s/%s", tableBasePath, partitionPath1, fileName4),
Collections.emptyList());
String fileName5 = "file5.parquet";
- OneDataFile removedFile3 =
+ InternalDataFile removedFile3 =
createFile(
String.format("%s/%s/%s", tableBasePath, partitionPath2, fileName5),
Collections.emptyList());
@@ -161,19 +161,19 @@ public class TestBaseFileUpdatesExtractor {
String partitionPath1 = "partition1";
String fileName1 = "file1.parquet";
// create file with empty stats to test edge case
- OneDataFile addedFile1 =
+ InternalDataFile addedFile1 =
createFile(
String.format("%s/%s/%s", tableBasePath, partitionPath1, fileName1),
Collections.emptyList());
// create file with stats
String fileName2 = "file2.parquet";
- OneDataFile addedFile2 =
+ InternalDataFile addedFile2 =
createFile(
String.format("%s/%s/%s", tableBasePath, partitionPath1, fileName2), getColumnStats());
// create file in a second partition
String partitionPath2 = "partition2";
String fileName3 = "file3.parquet";
- OneDataFile addedFile3 =
+ InternalDataFile addedFile3 =
createFile(
String.format("%s/%s/%s", tableBasePath, partitionPath2, fileName3), getColumnStats());
@@ -257,17 +257,17 @@ public class TestBaseFileUpdatesExtractor {
// create a snapshot without partition1 (dropped partition), a new file added to partition 2
// along with one of the existing files, and a new file in partition 3
String newFileName1 = "new_file_1.parquet";
- OneDataFile addedFile1 =
+ InternalDataFile addedFile1 =
createFile(
String.format("%s/%s/%s", tableBasePath, partitionPath2, newFileName1),
Collections.emptyList());
String newFileName2 = "new_file_2.parquet";
- OneDataFile addedFile2 =
+ InternalDataFile addedFile2 =
createFile(
String.format("%s/%s/%s", tableBasePath, partitionPath3, newFileName2),
getColumnStats());
- // OneDataFile for one of the existing files in partition2
- OneDataFile existingFile =
+ // InternalDataFile for one of the existing files in partition2
+ InternalDataFile existingFile =
createFile(
String.format("%s/%s/%s", tableBasePath, partitionPath2, existingFileName2),
Collections.emptyList());
@@ -350,10 +350,10 @@ public class TestBaseFileUpdatesExtractor {
HoodieTableMetaClient metaClient = HoodieTableMetaClient.reload(setupMetaClient);
// create a snapshot with a new file added along with one of the existing files
String newFileName1 = "new_file_1.parquet";
- OneDataFile addedFile1 =
+ InternalDataFile addedFile1 =
createFile(String.format("%s/%s", tableBasePath, newFileName1), getColumnStats());
- // OneDataFile for one of the existing files in partition2
- OneDataFile existingFile =
+ // InternalDataFile for one of the existing files in partition2
+ InternalDataFile existingFile =
createFile(
String.format("%s/%s", tableBasePath, existingFileName2),
Collections.emptyList());
List<OneFileGroup> partitionedDataFiles =
@@ -388,8 +388,8 @@ public class TestBaseFileUpdatesExtractor {
actual.stream().map(WriteStatus::toString).collect(Collectors.toSet()));
}
- private OneDataFile createFile(String physicalPath, List<ColumnStat> columnStats) {
- return OneDataFile.builder()
+ private InternalDataFile createFile(String physicalPath, List<ColumnStat> columnStats) {
+ return InternalDataFile.builder()
.schemaVersion(SCHEMA_VERSION)
.physicalPath(physicalPath)
.fileSizeBytes(FILE_SIZE)
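
One detail from the hunks above worth spelling out: removedFile2 deliberately uses Hudi's base-file naming so the test mimics removing a file that Hudi itself wrote. A sketch of that name pattern as inferred from the literal in the test; the helper is illustrative and not a Hudi API:

    class HudiBaseFileNameSketch {
      // Inferred pattern: <fileId>_<writeToken>_<commitTime>.parquet
      static String baseFileName(String fileId, String writeToken, String commitTime) {
        return String.format("%s_%s_%s.parquet", fileId, writeToken, commitTime);
      }

      public static void main(String[] args) {
        // Reproduces fileName4 from the test above.
        System.out.println(
            baseFileName(
                "d1cf0980-445c-4c74-bdeb-b7e5d18779f5-0", "0-1116-142216", "20231003013807542"));
      }
    }
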
diff --git a/core/src/test/java/org/apache/xtable/hudi/TestHudiFileStatsExtractor.java b/core/src/test/java/org/apache/xtable/hudi/TestHudiFileStatsExtractor.java
index 994b239d..bb0d08a4 100644
--- a/core/src/test/java/org/apache/xtable/hudi/TestHudiFileStatsExtractor.java
+++ b/core/src/test/java/org/apache/xtable/hudi/TestHudiFileStatsExtractor.java
@@ -72,7 +72,7 @@ import org.apache.xtable.model.schema.OneType;
import org.apache.xtable.model.schema.SchemaVersion;
import org.apache.xtable.model.stat.ColumnStat;
import org.apache.xtable.model.storage.FileFormat;
-import org.apache.xtable.model.storage.OneDataFile;
+import org.apache.xtable.model.storage.InternalDataFile;
public class TestHudiFileStatsExtractor {
private static final Schema AVRO_SCHEMA =
@@ -142,8 +142,8 @@ public class TestHudiFileStatsExtractor {
.filter(path -> path.toString().endsWith(".parquet"))
.findFirst()
.orElseThrow(() -> new RuntimeException("No files found"));
- OneDataFile inputFile =
- OneDataFile.builder()
+ InternalDataFile inputFile =
+ InternalDataFile.builder()
.physicalPath(parquetFile.toString())
.schemaVersion(new SchemaVersion(1, null))
.columnStats(Collections.emptyList())
@@ -155,7 +155,7 @@ public class TestHudiFileStatsExtractor {
HoodieTableMetaClient metaClient =
HoodieTableMetaClient.builder().setBasePath(basePath).setConf(configuration).build();
HudiFileStatsExtractor fileStatsExtractor = new HudiFileStatsExtractor(metaClient);
- List<OneDataFile> output =
+ List<InternalDataFile> output =
fileStatsExtractor
.addStatsToFiles(tableMetadata, Stream.of(inputFile), schema)
.collect(Collectors.toList());
@@ -179,8 +179,8 @@ public class TestHudiFileStatsExtractor {
}
}
- OneDataFile inputFile =
- OneDataFile.builder()
+ InternalDataFile inputFile =
+ InternalDataFile.builder()
.physicalPath(file.toString())
.schemaVersion(new SchemaVersion(1, null))
.columnStats(Collections.emptyList())
@@ -193,16 +193,16 @@ public class TestHudiFileStatsExtractor {
HoodieTableMetaClient mockMetaClient = mock(HoodieTableMetaClient.class);
when(mockMetaClient.getHadoopConf()).thenReturn(configuration);
HudiFileStatsExtractor fileStatsExtractor = new HudiFileStatsExtractor(mockMetaClient);
- List<OneDataFile> output =
+ List<InternalDataFile> output =
fileStatsExtractor
.addStatsToFiles(null, Stream.of(inputFile), schema)
.collect(Collectors.toList());
validateOutput(output);
}
- private void validateOutput(List<OneDataFile> output) {
+ private void validateOutput(List<InternalDataFile> output) {
assertEquals(1, output.size());
- OneDataFile fileWithStats = output.get(0);
+ InternalDataFile fileWithStats = output.get(0);
assertEquals(2, fileWithStats.getRecordCount());
List<ColumnStat> columnStats = fileWithStats.getColumnStats();
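
A condensed view of the flow these hunks exercise: the extractor takes a stream of InternalDataFile and returns the same files enriched with stats. A sketch under the test's own assumptions (null table metadata, as in the mocked variant above; the schema parameter type is assumed to be OneSchema; the wrapper class is illustrative):

    import java.util.List;
    import java.util.stream.Collectors;
    import java.util.stream.Stream;

    import org.apache.hudi.common.table.HoodieTableMetaClient;

    import org.apache.xtable.hudi.HudiFileStatsExtractor;
    import org.apache.xtable.model.schema.OneSchema;
    import org.apache.xtable.model.storage.InternalDataFile;

    class FileStatsSketch {
      // Mirrors the mocked-metadata path of the test above.
      static List<InternalDataFile> withStats(
          HoodieTableMetaClient metaClient, InternalDataFile file, OneSchema schema) {
        HudiFileStatsExtractor extractor = new HudiFileStatsExtractor(metaClient);
        return extractor
            .addStatsToFiles(null, Stream.of(file), schema)
            .collect(Collectors.toList());
      }
    }
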
diff --git a/core/src/test/java/org/apache/xtable/iceberg/TestIcebergSourceClient.java b/core/src/test/java/org/apache/xtable/iceberg/TestIcebergSourceClient.java
index 9d799884..9824fa4e 100644
--- a/core/src/test/java/org/apache/xtable/iceberg/TestIcebergSourceClient.java
+++ b/core/src/test/java/org/apache/xtable/iceberg/TestIcebergSourceClient.java
@@ -59,11 +59,9 @@ import org.apache.xtable.model.schema.PartitionTransformType;
import org.apache.xtable.model.schema.SchemaCatalog;
import org.apache.xtable.model.schema.SchemaVersion;
import org.apache.xtable.model.stat.PartitionValue;
-import org.apache.xtable.model.storage.DataLayoutStrategy;
+import org.apache.xtable.model.storage.*;
import org.apache.xtable.model.storage.FileFormat;
-import org.apache.xtable.model.storage.OneDataFile;
-import org.apache.xtable.model.storage.OneFileGroup;
-import org.apache.xtable.model.storage.TableFormat;
+import org.apache.xtable.model.storage.InternalDataFile;
class TestIcebergSourceClient {
@@ -176,20 +174,20 @@ class TestIcebergSourceClient {
List<OneFileGroup> dataFileChunks = oneSnapshot.getPartitionedDataFiles();
assertEquals(5, dataFileChunks.size());
for (OneFileGroup dataFilesChunk : dataFileChunks) {
- List<OneDataFile> oneDataFiles = dataFilesChunk.getFiles();
- assertEquals(1, oneDataFiles.size());
- OneDataFile oneDataFile = oneDataFiles.get(0);
- assertEquals(FileFormat.APACHE_PARQUET, oneDataFile.getFileFormat());
- assertEquals(1, oneDataFile.getRecordCount());
- Assertions.assertTrue(oneDataFile.getPhysicalPath().startsWith("file:" + workingDir));
-
- List<PartitionValue> partitionValues = oneDataFile.getPartitionValues();
+ List<InternalDataFile> internalDataFiles = dataFilesChunk.getFiles();
+ assertEquals(1, internalDataFiles.size());
+ InternalDataFile internalDataFile = internalDataFiles.get(0);
+ assertEquals(FileFormat.APACHE_PARQUET, internalDataFile.getFileFormat());
+ assertEquals(1, internalDataFile.getRecordCount());
+ Assertions.assertTrue(internalDataFile.getPhysicalPath().startsWith("file:" + workingDir));
+
+ List<PartitionValue> partitionValues = internalDataFile.getPartitionValues();
assertEquals(1, partitionValues.size());
PartitionValue partitionEntry = partitionValues.iterator().next();
assertEquals(
"cs_sold_date_sk",
partitionEntry.getPartitionField().getSourceField().getName());
// TODO generate test with column stats
- assertEquals(0, oneDataFile.getColumnStats().size());
+ assertEquals(0, internalDataFile.getColumnStats().size());
}
}
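
The assertions above walk OneFileGroup.getFiles(), which now yields InternalDataFile. A small illustrative helper over the same getters used in the test (the wrapper class is not part of the patch):

    import java.util.List;

    import org.apache.xtable.model.storage.InternalDataFile;
    import org.apache.xtable.model.storage.OneFileGroup;

    class FileGroupSketch {
      // Sums record counts across partitioned file groups, as the test iterates them.
      static long totalRecords(List<OneFileGroup> groups) {
        long total = 0;
        for (OneFileGroup group : groups) {
          for (InternalDataFile file : group.getFiles()) {
            total += file.getRecordCount();
          }
        }
        return total;
      }
    }
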
diff --git a/core/src/test/java/org/apache/xtable/iceberg/TestIcebergSync.java b/core/src/test/java/org/apache/xtable/iceberg/TestIcebergSync.java
index 3cc9dd95..a6d02d3f 100644
--- a/core/src/test/java/org/apache/xtable/iceberg/TestIcebergSync.java
+++ b/core/src/test/java/org/apache/xtable/iceberg/TestIcebergSync.java
@@ -95,11 +95,8 @@ import org.apache.xtable.model.schema.SchemaCatalog;
import org.apache.xtable.model.schema.SchemaVersion;
import org.apache.xtable.model.stat.PartitionValue;
import org.apache.xtable.model.stat.Range;
-import org.apache.xtable.model.storage.DataLayoutStrategy;
-import org.apache.xtable.model.storage.FileFormat;
-import org.apache.xtable.model.storage.OneDataFile;
-import org.apache.xtable.model.storage.OneFileGroup;
-import org.apache.xtable.model.storage.TableFormat;
+import org.apache.xtable.model.storage.*;
+import org.apache.xtable.model.storage.InternalDataFile;
import org.apache.xtable.schema.SchemaFieldFinder;
import org.apache.xtable.spi.sync.TableFormatSync;
@@ -224,9 +221,9 @@ public class TestIcebergSync {
SchemaVersion schemaVersion2 = new SchemaVersion(2, "");
schemas.put(schemaVersion2, schema2);
- OneDataFile dataFile1 = getOneDataFile(schemaVersion1, 1, Collections.emptyList());
- OneDataFile dataFile2 = getOneDataFile(schemaVersion1, 2, Collections.emptyList());
- OneDataFile dataFile3 = getOneDataFile(schemaVersion2, 3, Collections.emptyList());
+ InternalDataFile dataFile1 = getDataFile(schemaVersion1, 1, Collections.emptyList());
+ InternalDataFile dataFile2 = getDataFile(schemaVersion1, 2, Collections.emptyList());
+ InternalDataFile dataFile3 = getDataFile(schemaVersion2, 3, Collections.emptyList());
OneSnapshot snapshot1 = buildSnapshot(table1, schemas, dataFile1, dataFile2);
OneSnapshot snapshot2 = buildSnapshot(table2, schemas, dataFile2, dataFile3);
when(mockSchemaExtractor.toIceberg(oneSchema)).thenReturn(icebergSchema);
@@ -320,10 +317,10 @@ public class TestIcebergSync {
SchemaVersion schemaVersion2 = new SchemaVersion(2, "");
schemas.put(schemaVersion2, schema2);
- OneDataFile dataFile1 = getOneDataFile(schemaVersion1, 1, Collections.emptyList());
- OneDataFile dataFile2 = getOneDataFile(schemaVersion1, 2, Collections.emptyList());
- OneDataFile dataFile3 = getOneDataFile(schemaVersion2, 3, Collections.emptyList());
- OneDataFile dataFile4 = getOneDataFile(schemaVersion2, 4, Collections.emptyList());
+ InternalDataFile dataFile1 = getDataFile(schemaVersion1, 1, Collections.emptyList());
+ InternalDataFile dataFile2 = getDataFile(schemaVersion1, 2, Collections.emptyList());
+ InternalDataFile dataFile3 = getDataFile(schemaVersion2, 3, Collections.emptyList());
+ InternalDataFile dataFile4 = getDataFile(schemaVersion2, 4, Collections.emptyList());
OneSnapshot snapshot1 = buildSnapshot(table1, schemas, dataFile1, dataFile2);
OneSnapshot snapshot2 = buildSnapshot(table2, schemas, dataFile2, dataFile3);
OneSnapshot snapshot3 = buildSnapshot(table2, schemas, dataFile3, dataFile4);
@@ -397,9 +394,9 @@ public class TestIcebergSync {
.partitionField(partitionField)
.range(Range.scalar(Instant.parse("2022-10-03T00:00:00.00Z").toEpochMilli()))
.build());
- OneDataFile dataFile1 = getOneDataFile(schemaVersion, 1, partitionValues1);
- OneDataFile dataFile2 = getOneDataFile(schemaVersion, 2, partitionValues1);
- OneDataFile dataFile3 = getOneDataFile(schemaVersion, 3, partitionValues2);
+ InternalDataFile dataFile1 = getDataFile(schemaVersion, 1, partitionValues1);
+ InternalDataFile dataFile2 = getDataFile(schemaVersion, 2, partitionValues1);
+ InternalDataFile dataFile3 = getDataFile(schemaVersion, 3, partitionValues2);
OneSnapshot snapshot = buildSnapshot(table, schemas, dataFile1, dataFile2, dataFile3);
when(mockSchemaExtractor.toIceberg(oneSchema))
@@ -461,9 +458,9 @@ public class TestIcebergSync {
.partitionField(partitionField)
.range(Range.scalar(Instant.parse("2022-10-03T00:00:00.00Z").toEpochMilli()))
.build());
- OneDataFile dataFile1 = getOneDataFile(schemaVersion, 1, partitionValues1);
- OneDataFile dataFile2 = getOneDataFile(schemaVersion, 2, partitionValues1);
- OneDataFile dataFile3 = getOneDataFile(schemaVersion, 3, partitionValues2);
+ InternalDataFile dataFile1 = getDataFile(schemaVersion, 1, partitionValues1);
+ InternalDataFile dataFile2 = getDataFile(schemaVersion, 2, partitionValues1);
+ InternalDataFile dataFile3 = getDataFile(schemaVersion, 3, partitionValues2);
OneSnapshot snapshot = buildSnapshot(table, schemas, dataFile1, dataFile2, dataFile3);
when(mockSchemaExtractor.toIceberg(oneSchema)).thenReturn(icebergSchema);
@@ -516,9 +513,9 @@ public class TestIcebergSync {
List<PartitionValue> partitionValues2 =
Collections.singletonList(
PartitionValue.builder().partitionField(partitionField).range(Range.scalar(2)).build());
- OneDataFile dataFile1 = getOneDataFile(schemaVersion, 1, partitionValues1);
- OneDataFile dataFile2 = getOneDataFile(schemaVersion, 2, partitionValues1);
- OneDataFile dataFile3 = getOneDataFile(schemaVersion, 3, partitionValues2);
+ InternalDataFile dataFile1 = getDataFile(schemaVersion, 1, partitionValues1);
+ InternalDataFile dataFile2 = getDataFile(schemaVersion, 2, partitionValues1);
+ InternalDataFile dataFile3 = getDataFile(schemaVersion, 3, partitionValues2);
OneSnapshot snapshot = buildSnapshot(table, schemas, dataFile1, dataFile2, dataFile3);
when(mockSchemaExtractor.toIceberg(oneSchema)).thenReturn(icebergSchema);
@@ -592,9 +589,9 @@ public class TestIcebergSync {
.partitionField(partitionField2)
.range(Range.scalar(Instant.parse("2022-10-03T00:00:00.00Z").toEpochMilli()))
.build());
- OneDataFile dataFile1 = getOneDataFile(schemaVersion, 1, partitionValues1);
- OneDataFile dataFile2 = getOneDataFile(schemaVersion, 2, partitionValues2);
- OneDataFile dataFile3 = getOneDataFile(schemaVersion, 3, partitionValues3);
+ InternalDataFile dataFile1 = getDataFile(schemaVersion, 1, partitionValues1);
+ InternalDataFile dataFile2 = getDataFile(schemaVersion, 2, partitionValues2);
+ InternalDataFile dataFile3 = getDataFile(schemaVersion, 3, partitionValues3);
OneSnapshot snapshot = buildSnapshot(table, schemas, dataFile1, dataFile2, dataFile3);
when(mockSchemaExtractor.toIceberg(oneSchema)).thenReturn(icebergSchema);
@@ -659,9 +656,9 @@ public class TestIcebergSync {
.partitionField(partitionField)
.range(Range.scalar("value2"))
.build());
- OneDataFile dataFile1 = getOneDataFile(schemaVersion, 1, partitionValues1);
- OneDataFile dataFile2 = getOneDataFile(schemaVersion, 2, partitionValues1);
- OneDataFile dataFile3 = getOneDataFile(schemaVersion, 3, partitionValues2);
+ InternalDataFile dataFile1 = getDataFile(schemaVersion, 1, partitionValues1);
+ InternalDataFile dataFile2 = getDataFile(schemaVersion, 2, partitionValues1);
+ InternalDataFile dataFile3 = getDataFile(schemaVersion, 3, partitionValues2);
OneSnapshot snapshot = buildSnapshot(table, schemas, dataFile1, dataFile2, dataFile3);
when(mockSchemaExtractor.toIceberg(oneSchema)).thenReturn(icebergSchema);
@@ -687,7 +684,7 @@ public class TestIcebergSync {
}
private OneSnapshot buildSnapshot(
- OneTable table, Map<SchemaVersion, OneSchema> schemas, OneDataFile... dataFiles) {
+ OneTable table, Map<SchemaVersion, OneSchema> schemas, InternalDataFile... dataFiles) {
return OneSnapshot.builder()
.table(table)
.schemaCatalog(SchemaCatalog.builder().schemas(schemas).build())
@@ -695,10 +692,10 @@ public class TestIcebergSync {
.build();
}
- private OneDataFile getOneDataFile(
+ private InternalDataFile getDataFile(
SchemaVersion schemaVersion, int index, List<PartitionValue> partitionValues) {
String physicalPath = "file:/physical" + index + ".parquet";
- return OneDataFile.builder()
+ return InternalDataFile.builder()
.fileFormat(FileFormat.APACHE_PARQUET)
.fileSizeBytes(RANDOM.nextInt(10000))
.physicalPath(physicalPath)
@@ -727,7 +724,10 @@ public class TestIcebergSync {
}
private void validateIcebergTable(
- String tableName, OneTable table, Set<OneDataFile> expectedFiles, Expression filterExpression)
+ String tableName,
+ OneTable table,
+ Set<InternalDataFile> expectedFiles,
+ Expression filterExpression)
throws IOException {
Path warehouseLocation = Paths.get(table.getBasePath()).getParent();
try (HadoopCatalog catalog = new HadoopCatalog(CONFIGURATION, warehouseLocation.toString())) {
@@ -741,12 +741,13 @@ public class TestIcebergSync {
assertEquals(1, Iterables.size(tasks), "1 combined scan task should be generated");
for (CombinedScanTask combinedScanTask : tasks) {
assertEquals(expectedFiles.size(), combinedScanTask.files().size());
- Map<String, OneDataFile> pathToFile =
+ Map<String, InternalDataFile> pathToFile =
expectedFiles.stream()
- .collect(Collectors.toMap(OneDataFile::getPhysicalPath, Function.identity()));
+ .collect(
+ Collectors.toMap(InternalDataFile::getPhysicalPath, Function.identity()));
for (FileScanTask fileScanTask : combinedScanTask.files()) {
// check that path and other stats match
- OneDataFile expected = pathToFile.get(fileScanTask.file().path());
+ InternalDataFile expected = pathToFile.get(fileScanTask.file().path());
assertNotNull(expected);
assertEquals(expected.getFileSizeBytes(), fileScanTask.file().fileSizeInBytes());
assertEquals(expected.getRecordCount(), fileScanTask.file().recordCount());
@@ -766,7 +767,7 @@ public class TestIcebergSync {
}
}
- private void mockColStatsForFile(OneDataFile dataFile, int times) {
+ private void mockColStatsForFile(InternalDataFile dataFile, int times) {
Metrics response = new Metrics(dataFile.getRecordCount(), null, null, null, null);
Metrics[] responses =
IntStream.of(times - 1).mapToObj(unused -> response).toArray(Metrics[]::new);