This is an automated email from the ASF dual-hosted git repository.
timbrown pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-xtable.git
The following commit(s) were added to refs/heads/main by this push:
new 95e75f9d [553] parquet source with partition extraction (#728)
95e75f9d is described below
commit 95e75f9d580e81f4611a88635735feee9023f660
Author: Selim Soufargi <[email protected]>
AuthorDate: Mon Sep 15 03:02:04 2025 +0200
[553] parquet source with partition extraction (#728)
* First attempt at parquet source with partition extraction
* refactored conversion source to read parquet files once in getCurrentSnapshot()
* file changes + refactoring of conversion source
* cleanup
* implement get commit identifier
* Hudi Source Config uses PartitionFieldSpec.java
* Hudi Source Config uses PartitionFieldSpec.java bug fix
* refactoring for PartitionValueExtractor to use Hudi based related code
* aligning ConversionSource tests to include Parquet
* implemented remaining parquetSource methods + refactoring, TODO tests
* default case throws an exception when finding parquet files + javadocs for PartitionFieldSpec + removed files with no diffs
* refactoring for specExtractor
* table changes update does not check if table exists already
* added test for conversion source
* adjusted test for Hudi after update of HudiSourceConfig
* fixed test bug for parquet and added partitionValues for statsExtractor
* fixed test bug for stats extractor by defining instance obj instead of
using static function
* now using a string for the partition format input
* conversion source const for the test using specExtractor
* adjusted conversionSourceProvider for partition init
* adjusted table creation with Spark for Parquet
* fix tests for CI to pass
* fix tests for CI to pass (reverted)
* cleanups
* parquet files collected in a stream now for memory issue
* fixing CI error on partition config in tests
* separate test class for parquet source
* adjusting pom config for Actions
* tests hitting the source need to create the parquet files locally in the test through Spark
* creating parquet files before Testing
* test bug fix
* cleanups
* TODO implement class for test
* TODO check why sync fails to all 3 formats
* sourceField bug not null OK, sync failing on stat generation from
InternalType
* highlighting sync errors related to stats and partition fields
* partition config read from the test class + cleanups
* augmenting InternalSchema with the partitionField to get SourceField
non-null
* HudiSourceConfig const param type fix
* duplicate col for partitions
* delta stats serialization from parquet data bug fixed
* removed old code and mvn spotless apply
* fixed partitioning by year in tests
* timestamp col is of type timestamp, not string (TODO: schemaExtractor logicalTypes for INT96)
* Delta sync OK, stat conversion issue for Iceberg, metadata issue for Hudi
* Stat conversion patch for string (ICEBERG)
* Parsing Date Values from Partition Path, ICEBERG & DELTA Sync OK
* fix for Binary Stats conversion in Hudi + upgraded Parquet Version
* set recordKey Fields for Hudi Metadata + spotless:apply
* write data in a data path for proper reading of sourceTable after sync
(Tests)
* set parquet schema for reading the dataset (Tests)
* bug fix of dataPath (Tests)
* different metadata path than the parquet file's to prevent reading
confusion related to paths (Tests)
* read parquet sparksession for tests
* read parquet sparksession for tests
* update
* reading part col from path disabled
* spotless apply and CI pass error fixed
* CI pass error fixed
* CI pass error fixed
* CI pass error fixed
* CI pass error fixed
* CI pass error fixed
* refactored parquetSource + cleanups
* fix schemaExtractor bug
* fix Parquet related partition values extraction from path
* fix for Iceberg sync tests for CI to pass
* binary stats are converted into string stats in parquet stats extractor
* binary stats converted per logical type case: STRING....
* testing dataset equivalence differently for Hudi (partition column
removed)
* set source identifier for current snapshot as the last modification time
* set spark config for CI
* reformatting + cleanups
* iceberg CI error fix
* delta CI error fix
* delta CI error fix
* refactoring for the statsExtractor
* refactoring for the statsExtractor
* refactoring for the statsExtractor
* many partitions as input for parquet
* many partitions as input for parquet
* revert change in hudi spec extractor
* fix for isNullable records
* fix for isNullable records
* cleanups
* cleanups
* revert hudi version to 0.14.0
* spotless:apply
* added test for non partitioned data + adjusted for cases of Month, Day
partitions
* resolving the merge conflict1
* resolving the merge conflict1
* resolving the merge conflict2
* fix for Iceberg for (same key) partition values merge conflict
* fix for reading the datasets in the test class + disable equivalence test for Hudi due to difference in partition values
* spotless:apply
* added boolean type to the testing dataset
* reformatting
* cleanups
* refactoring + reformatting
* more data types in the testing dataset + refactoring
* refactor test to separate partitioned and non-partitioned cases
* refactor test to separate partitioned and non-partitioned cases
* refactor test to separate partitioned and non-partitioned cases
* refactor test to separate partitioned and non-partitioned cases
* refactor test to separate partitioned and non-partitioned cases
* refactor test to separate partitioned and non-partitioned cases
* refactor test to separate partitioned and non-partitioned cases
* reformatting
* re-read partitioned data -> create new data with additional rows -> partition again -> sync again
* fix writing second dataset bug
* fix writing second dataset bug
* fix datasets equivalence bug
* reformatting
* reformatting
* add more data, then append to partitioned file, then sync again; test added
* add more data, then append to partitioned file, then sync again; test added
* reformatting
* cleanups
* add test for Hudi format for non-partitioned data
* bug fix iceberg partition values
* bug fix iceberg partition values: revert Iceberg code
* parse partition values fix
* fix values parsing by lowercasing the partitioning config
* fix values parsing by lowercasing the partitioning config 2
* isIncrementalSafe returns false + cleanups + formatting
* address remaining feedback, remove unnecessary changes, cleanup
---------
Co-authored-by: Selim Soufargi <[email protected]>
Co-authored-by: Timothy Brown <[email protected]>
---
pom.xml | 36 +-
.../PartitionFieldSpec.java} | 22 +-
.../apache/xtable/model/storage/TableFormat.java | 1 +
.../xtable/hudi/BaseFileUpdatesExtractor.java | 2 +-
.../ConfigurationBasedPartitionSpecExtractor.java | 26 +-
.../apache/xtable/hudi/HudiConversionSource.java | 4 +-
.../xtable/hudi/HudiConversionSourceProvider.java | 2 +-
.../apache/xtable/hudi/HudiDataFileExtractor.java | 4 +-
.../org/apache/xtable/hudi/HudiSourceConfig.java | 15 +-
...r.java => PathBasedPartitionSpecExtractor.java} | 6 +-
...java => PathBasedPartitionValuesExtractor.java} | 18 +-
.../xtable/parquet/ParquetConversionSource.java | 242 +++++++++++
.../parquet/ParquetConversionSourceProvider.java | 43 ++
.../ParquetPartitionSpecExtractor.java} | 51 ++-
.../parquet/ParquetPartitionValueExtractor.java | 138 ++++++
.../xtable/parquet/ParquetSchemaExtractor.java | 21 +-
.../ParquetSourceConfig.java} | 43 +-
.../xtable/parquet/ParquetStatsConverterUtil.java | 69 +++
.../xtable/parquet/ParquetStatsExtractor.java | 52 +--
.../apache/xtable/reflection/ReflectionUtils.java | 29 +-
.../apache/xtable/schema/SchemaFieldFinder.java | 3 +-
.../test/java/org/apache/xtable/GenericTable.java | 6 +
.../org/apache/xtable/ITConversionController.java | 7 +-
.../apache/xtable/hudi/ITHudiConversionSource.java | 4 +-
.../hudi/TestHudiPartitionValuesExtractor.java | 24 +-
.../xtable/parquet/ITParquetConversionSource.java | 462 +++++++++++++++++++++
.../xtable/parquet/TestParquetSchemaExtractor.java | 2 +-
.../xtable/parquet/TestParquetStatsExtractor.java | 9 +-
.../xtable/parquet/TestSparkParquetTable.java | 117 ++++++
29 files changed, 1282 insertions(+), 176 deletions(-)
diff --git a/pom.xml b/pom.xml
index bed4d63b..ff80b954 100644
--- a/pom.xml
+++ b/pom.xml
@@ -61,6 +61,7 @@
<!-- To ensure build content is re-producible -->
<project.build.outputTimestamp>2025-01-01T00:00:00Z</project.build.outputTimestamp>
<maven.compiler.target>8</maven.compiler.target>
+ <maven-compiler-plugin.version>3.13.0</maven-compiler-plugin.version>
<avro.version>1.11.4</avro.version>
<log4j.version>2.22.0</log4j.version>
<junit.version>5.11.4</junit.version>
@@ -77,7 +78,7 @@
<maven-deploy-plugin.version>3.1.1</maven-deploy-plugin.version>
<maven-release-plugin.version>2.5.3</maven-release-plugin.version>
<mockito.version>5.15.2</mockito.version>
- <parquet.version>1.15.1</parquet.version>
+ <parquet.version>1.15.2</parquet.version>
<protobuf.version>3.25.5</protobuf.version>
<scala12.version>2.12.20</scala12.version>
<scala13.version>2.13.15</scala13.version>
@@ -90,7 +91,7 @@
<jackson.version>2.18.2</jackson.version>
<spotless.version>2.43.0</spotless.version>
<apache.rat.version>0.16.1</apache.rat.version>
- <google.java.format.version>1.8</google.java.format.version>
+ <google.java.format.version>1.10.0</google.java.format.version>
<delta.standalone.version>3.3.0</delta.standalone.version>
<delta.hive.version>3.0.0</delta.hive.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
@@ -657,23 +658,19 @@
</executions>
</plugin>
<plugin>
- <groupId>org.projectlombok</groupId>
- <artifactId>lombok-maven-plugin</artifactId>
- <version>${lombok-maven-plugin.version}</version>
- <configuration>
- <sourceDirectory>${project.basedir}/src/main/java</sourceDirectory>
- <addOutputDirectory>false</addOutputDirectory>
- <outputDirectory>${delombok.output.dir}</outputDirectory>
- <encoding>UTF-8</encoding>
- </configuration>
- <executions>
- <execution>
- <phase>generate-sources</phase>
- <goals>
- <goal>delombok</goal>
- </goals>
- </execution>
- </executions>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>${maven-compiler-plugin.version}</version>
<configuration>
+ <source>${maven.compiler.source}</source>
+ <target>${maven.compiler.target}</target>
+ <annotationProcessorPaths>
+ <path>
+ <groupId>org.projectlombok</groupId>
+ <artifactId>lombok</artifactId>
+ <version>${lombok.version}</version>
+ </path>
+ </annotationProcessorPaths>
+ </configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
@@ -704,6 +701,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
+ <version>2.22.2</version>
<executions>
<execution>
<goals>
diff --git
a/xtable-api/src/main/java/org/apache/xtable/model/storage/TableFormat.java
b/xtable-api/src/main/java/org/apache/xtable/model/schema/PartitionFieldSpec.java
similarity index 69%
copy from
xtable-api/src/main/java/org/apache/xtable/model/storage/TableFormat.java
copy to
xtable-api/src/main/java/org/apache/xtable/model/schema/PartitionFieldSpec.java
index bea0b477..4427fe14 100644
--- a/xtable-api/src/main/java/org/apache/xtable/model/storage/TableFormat.java
+++
b/xtable-api/src/main/java/org/apache/xtable/model/schema/PartitionFieldSpec.java
@@ -16,19 +16,19 @@
* limitations under the License.
*/
-package org.apache.xtable.model.storage;
+package org.apache.xtable.model.schema;
+
+import lombok.Value;
/**
- * Default constants for supported Table Formats
+ * PartitionFieldSpec represents a schema for the partition format as specified through the user
+ * configuration for the conversion
*
- * @since 0.1
+ * @since 0.3
*/
-public class TableFormat {
- public static final String HUDI = "HUDI";
- public static final String ICEBERG = "ICEBERG";
- public static final String DELTA = "DELTA";
-
- public static String[] values() {
- return new String[] {"HUDI", "ICEBERG", "DELTA"};
- }
+@Value
+public class PartitionFieldSpec {
+ String sourceFieldPath;
+ PartitionTransformType transformType;
+ String format;
}
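To make the new value class concrete, a minimal usage sketch follows; the field path and format string are illustrative placeholders, and the all-args constructor and getters are the ones Lombok's @Value generates:
  // Illustrative only: one entry of a parsed "path:type:format" partition spec.
  PartitionFieldSpec spec =
      new PartitionFieldSpec("timestamp", PartitionTransformType.DAY, "year=yyyy/month=MM/day=dd");
  spec.getSourceFieldPath(); // "timestamp"
  spec.getTransformType();   // DAY
  spec.getFormat();          // "year=yyyy/month=MM/day=dd"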
diff --git
a/xtable-api/src/main/java/org/apache/xtable/model/storage/TableFormat.java
b/xtable-api/src/main/java/org/apache/xtable/model/storage/TableFormat.java
index bea0b477..9d89de6a 100644
--- a/xtable-api/src/main/java/org/apache/xtable/model/storage/TableFormat.java
+++ b/xtable-api/src/main/java/org/apache/xtable/model/storage/TableFormat.java
@@ -27,6 +27,7 @@ public class TableFormat {
public static final String HUDI = "HUDI";
public static final String ICEBERG = "ICEBERG";
public static final String DELTA = "DELTA";
+ public static final String PARQUET = "PARQUET";
public static String[] values() {
return new String[] {"HUDI", "ICEBERG", "DELTA"};
diff --git
a/xtable-core/src/main/java/org/apache/xtable/hudi/BaseFileUpdatesExtractor.java
b/xtable-core/src/main/java/org/apache/xtable/hudi/BaseFileUpdatesExtractor.java
index ba1f9f9d..325fc8a5 100644
---
a/xtable-core/src/main/java/org/apache/xtable/hudi/BaseFileUpdatesExtractor.java
+++
b/xtable-core/src/main/java/org/apache/xtable/hudi/BaseFileUpdatesExtractor.java
@@ -252,7 +252,7 @@ public class BaseFileUpdatesExtractor {
columnStat.getNumValues(),
columnStat.getTotalSize(),
-1L))
- .collect(Collectors.toMap(HoodieColumnRangeMetadata::getColumnName, Function.identity()));
+ .collect(Collectors.toMap(HoodieColumnRangeMetadata::getColumnName, metadata -> metadata));
}
/** Holds the information needed to create a "replace" commit in the Hudi table. */
diff --git
a/xtable-core/src/main/java/org/apache/xtable/hudi/ConfigurationBasedPartitionSpecExtractor.java
b/xtable-core/src/main/java/org/apache/xtable/hudi/ConfigurationBasedPartitionSpecExtractor.java
index 7bc41a10..4eda2059 100644
---
a/xtable-core/src/main/java/org/apache/xtable/hudi/ConfigurationBasedPartitionSpecExtractor.java
+++
b/xtable-core/src/main/java/org/apache/xtable/hudi/ConfigurationBasedPartitionSpecExtractor.java
@@ -28,6 +28,7 @@ import lombok.AllArgsConstructor;
import org.apache.xtable.model.schema.InternalField;
import org.apache.xtable.model.schema.InternalPartitionField;
import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.schema.PartitionFieldSpec;
import org.apache.xtable.schema.SchemaFieldFinder;
/**
@@ -35,14 +36,13 @@ import org.apache.xtable.schema.SchemaFieldFinder;
* path:type:format for date types or path:type for value types.
*/
@AllArgsConstructor
-public class ConfigurationBasedPartitionSpecExtractor implements HudiSourcePartitionSpecExtractor {
- private final HudiSourceConfig config;
+public class ConfigurationBasedPartitionSpecExtractor implements PathBasedPartitionSpecExtractor {
+ private final List<PartitionFieldSpec> partitionFieldSpecs;
@Override
public List<InternalPartitionField> spec(InternalSchema tableSchema) {
- List<InternalPartitionField> partitionFields =
- new ArrayList<>(config.getPartitionFieldSpecs().size());
- for (HudiSourceConfig.PartitionFieldSpec fieldSpec : config.getPartitionFieldSpecs()) {
+ List<InternalPartitionField> partitionFields = new ArrayList<>(partitionFieldSpecs.size());
+ for (PartitionFieldSpec fieldSpec : partitionFieldSpecs) {
InternalField sourceField =
SchemaFieldFinder.getInstance()
.findFieldByPath(tableSchema, fieldSpec.getSourceFieldPath());
@@ -58,15 +58,13 @@ public class ConfigurationBasedPartitionSpecExtractor implements HudiSourceParti
@Override
public Map<String, String> getPathToPartitionFieldFormat() {
Map<String, String> pathToPartitionFieldFormat = new HashMap<>();
- config
- .getPartitionFieldSpecs()
- .forEach(
- partitionFieldSpec -> {
- if (partitionFieldSpec.getFormat() != null) {
- pathToPartitionFieldFormat.put(
- partitionFieldSpec.getSourceFieldPath(), partitionFieldSpec.getFormat());
- }
- });
+ partitionFieldSpecs.forEach(
+ partitionFieldSpec -> {
+ if (partitionFieldSpec.getFormat() != null) {
+ pathToPartitionFieldFormat.put(
+ partitionFieldSpec.getSourceFieldPath(), partitionFieldSpec.getFormat());
+ }
+ });
return pathToPartitionFieldFormat;
}
}
diff --git
a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSource.java
b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSource.java
index cb65c341..00faa97d 100644
--- a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSource.java
+++ b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSource.java
@@ -59,14 +59,14 @@ public class HudiConversionSource implements
ConversionSource<HoodieInstant> {
public HudiConversionSource(
HoodieTableMetaClient metaClient,
- HudiSourcePartitionSpecExtractor sourcePartitionSpecExtractor) {
+ PathBasedPartitionSpecExtractor sourcePartitionSpecExtractor) {
this.metaClient = metaClient;
this.tableExtractor =
new HudiTableExtractor(new HudiSchemaExtractor(),
sourcePartitionSpecExtractor);
this.dataFileExtractor =
new HudiDataFileExtractor(
metaClient,
- new HudiPartitionValuesExtractor(
+ new PathBasedPartitionValuesExtractor(
sourcePartitionSpecExtractor.getPathToPartitionFieldFormat()),
new HudiFileStatsExtractor(metaClient));
}
diff --git
a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSourceProvider.java
b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSourceProvider.java
index 0ddbbcb7..aad7e0a1 100644
---
a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSourceProvider.java
+++
b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSourceProvider.java
@@ -43,7 +43,7 @@ public class HudiConversionSourceProvider extends ConversionSourceProvider<Hoodi
log.warn("Source table is Merge On Read. Only base files will be synced");
}
- final HudiSourcePartitionSpecExtractor sourcePartitionSpecExtractor =
+ final PathBasedPartitionSpecExtractor sourcePartitionSpecExtractor =
HudiSourceConfig.fromProperties(sourceTable.getAdditionalProperties())
.loadSourcePartitionSpecExtractor();
diff --git
a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiDataFileExtractor.java
b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiDataFileExtractor.java
index 77c0ca98..5e17b389 100644
---
a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiDataFileExtractor.java
+++
b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiDataFileExtractor.java
@@ -75,7 +75,7 @@ public class HudiDataFileExtractor implements AutoCloseable {
private final HoodieTableMetadata tableMetadata;
private final HoodieTableMetaClient metaClient;
private final HoodieEngineContext engineContext;
- private final HudiPartitionValuesExtractor partitionValuesExtractor;
+ private final PathBasedPartitionValuesExtractor partitionValuesExtractor;
private final HudiFileStatsExtractor fileStatsExtractor;
private final HoodieMetadataConfig metadataConfig;
private final FileSystemViewManager fileSystemViewManager;
@@ -83,7 +83,7 @@ public class HudiDataFileExtractor implements AutoCloseable {
public HudiDataFileExtractor(
HoodieTableMetaClient metaClient,
- HudiPartitionValuesExtractor hudiPartitionValuesExtractor,
+ PathBasedPartitionValuesExtractor hudiPartitionValuesExtractor,
HudiFileStatsExtractor hudiFileStatsExtractor) {
this.engineContext = new
HoodieLocalEngineContext(metaClient.getHadoopConf());
metadataConfig =
diff --git
a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiSourceConfig.java
b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiSourceConfig.java
index 606b9281..7732c382 100644
--- a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiSourceConfig.java
+++ b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiSourceConfig.java
@@ -28,6 +28,7 @@ import lombok.Value;
import com.google.common.base.Preconditions;
+import org.apache.xtable.model.schema.PartitionFieldSpec;
import org.apache.xtable.model.schema.PartitionTransformType;
import org.apache.xtable.reflection.ReflectionUtils;
@@ -59,14 +60,7 @@ public class HudiSourceConfig {
return new HudiSourceConfig(partitionSpecExtractorClass,
partitionFieldSpecs);
}
- @Value
- static class PartitionFieldSpec {
- String sourceFieldPath;
- PartitionTransformType transformType;
- String format;
- }
-
- private static List<PartitionFieldSpec> parsePartitionFieldSpecs(String input) {
+ public static List<PartitionFieldSpec> parsePartitionFieldSpecs(String input) {
if (input == null || input.isEmpty()) {
return Collections.emptyList();
}
@@ -84,9 +78,10 @@ public class HudiSourceConfig {
return partitionFields;
}
- public HudiSourcePartitionSpecExtractor loadSourcePartitionSpecExtractor() {
+ public PathBasedPartitionSpecExtractor loadSourcePartitionSpecExtractor() {
Preconditions.checkNotNull(
partitionSpecExtractorClass, "HudiSourcePartitionSpecExtractor class
not provided");
- return ReflectionUtils.createInstanceOfClass(partitionSpecExtractorClass,
this);
+ return ReflectionUtils.createInstanceOfClass(
+ partitionSpecExtractorClass, this.getPartitionFieldSpecs());
}
}
diff --git
a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiSourcePartitionSpecExtractor.java
b/xtable-core/src/main/java/org/apache/xtable/hudi/PathBasedPartitionSpecExtractor.java
similarity index 88%
rename from
xtable-core/src/main/java/org/apache/xtable/hudi/HudiSourcePartitionSpecExtractor.java
rename to
xtable-core/src/main/java/org/apache/xtable/hudi/PathBasedPartitionSpecExtractor.java
index c69a8b8d..982bb781 100644
---
a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiSourcePartitionSpecExtractor.java
+++
b/xtable-core/src/main/java/org/apache/xtable/hudi/PathBasedPartitionSpecExtractor.java
@@ -23,9 +23,9 @@ import java.util.Map;
import org.apache.xtable.spi.extractor.SourcePartitionSpecExtractor;
/**
- * Partition spec extractor interface specifically designed for Hudi to parse partition values
- * appropriately.
+ * Partition spec extractor interface specifically designed for Hudi/Parquet to parse partition
+ * values appropriately.
*/
-public interface HudiSourcePartitionSpecExtractor extends SourcePartitionSpecExtractor {
+public interface PathBasedPartitionSpecExtractor extends SourcePartitionSpecExtractor {
Map<String, String> getPathToPartitionFieldFormat();
}
diff --git
a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiPartitionValuesExtractor.java
b/xtable-core/src/main/java/org/apache/xtable/hudi/PathBasedPartitionValuesExtractor.java
similarity index 93%
rename from
xtable-core/src/main/java/org/apache/xtable/hudi/HudiPartitionValuesExtractor.java
rename to
xtable-core/src/main/java/org/apache/xtable/hudi/PathBasedPartitionValuesExtractor.java
index bf9f1264..09c7c824 100644
---
a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiPartitionValuesExtractor.java
+++
b/xtable-core/src/main/java/org/apache/xtable/hudi/PathBasedPartitionValuesExtractor.java
@@ -22,11 +22,7 @@ import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.text.ParseException;
import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.TimeZone;
+import java.util.*;
import lombok.AllArgsConstructor;
import lombok.NonNull;
@@ -38,11 +34,11 @@ import org.apache.xtable.model.schema.InternalType;
import org.apache.xtable.model.stat.PartitionValue;
import org.apache.xtable.model.stat.Range;
-/** Extracts Partition Values for Hudi from Partition Path. */
+/** Extracts Partition Values for Hudi/Parquet from Partition Path. */
@AllArgsConstructor
-public class HudiPartitionValuesExtractor {
+public class PathBasedPartitionValuesExtractor {
private static final String HIVE_DEFAULT_PARTITION = "__HIVE_DEFAULT_PARTITION__";
- @NonNull private final Map<String, String> pathToPartitionFieldFormat;
+ @NonNull protected final Map<String, String> pathToPartitionFieldFormat;
public List<PartitionValue> extractPartitionValues(
List<InternalPartitionField> partitionColumns, String partitionPath) {
@@ -103,7 +99,7 @@ public class HudiPartitionValuesExtractor {
}
}
- private static PartialResult parseDate(String remainingPath, String format) {
+ protected static PartialResult parseDate(String remainingPath, String format) {
try {
String dateString = remainingPath.substring(0, format.length());
SimpleDateFormat simpleDateFormat = new SimpleDateFormat(format);
@@ -118,7 +114,7 @@ public class HudiPartitionValuesExtractor {
}
}
- private static PartialResult parseValue(
+ protected static PartialResult parseValue(
String remainingPath, InternalType sourceFieldType, boolean isSlashDelimited) {
if (remainingPath.isEmpty()) {
throw new PartitionValuesExtractorException("Missing partition value");
@@ -173,7 +169,7 @@ public class HudiPartitionValuesExtractor {
}
@Value
- private static class PartialResult {
+ protected static class PartialResult {
Object value;
String remainingPath;
}
diff --git
a/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetConversionSource.java
b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetConversionSource.java
new file mode 100644
index 00000000..e7eff961
--- /dev/null
+++
b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetConversionSource.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.parquet;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Paths;
+import java.time.Instant;
+import java.util.*;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import lombok.Builder;
+import lombok.NonNull;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.functional.RemoteIterators;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.schema.MessageType;
+
+import org.apache.xtable.exception.ReadException;
+import org.apache.xtable.hudi.*;
+import org.apache.xtable.hudi.HudiPathUtils;
+import org.apache.xtable.model.*;
+import org.apache.xtable.model.CommitsBacklog;
+import org.apache.xtable.model.InstantsForIncrementalSync;
+import org.apache.xtable.model.TableChange;
+import org.apache.xtable.model.schema.InternalPartitionField;
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.storage.*;
+import org.apache.xtable.model.storage.FileFormat;
+import org.apache.xtable.model.storage.InternalDataFile;
+import org.apache.xtable.spi.extractor.ConversionSource;
+
+@Builder
+public class ParquetConversionSource implements ConversionSource<Long> {
+
+ private static final ParquetSchemaExtractor schemaExtractor =
+ ParquetSchemaExtractor.getInstance();
+
+ private static final ParquetMetadataExtractor parquetMetadataExtractor =
+ ParquetMetadataExtractor.getInstance();
+
+ private static final ParquetStatsExtractor parquetStatsExtractor =
+ ParquetStatsExtractor.getInstance();
+
+ private final ParquetPartitionValueExtractor partitionValueExtractor;
+ private final PathBasedPartitionSpecExtractor partitionSpecExtractor;
+ private final String tableName;
+ private final String basePath;
+ @NonNull private final Configuration hadoopConf;
+
+ private InternalTable createInternalTableFromFile(LocatedFileStatus latestFile) {
+ ParquetMetadata parquetMetadata =
+ parquetMetadataExtractor.readParquetMetadata(hadoopConf, latestFile.getPath());
+ MessageType parquetSchema = parquetMetadataExtractor.getSchema(parquetMetadata);
+ InternalSchema schema = schemaExtractor.toInternalSchema(parquetSchema, "");
+ List<InternalPartitionField> partitionFields = partitionSpecExtractor.spec(schema);
+
+ DataLayoutStrategy dataLayoutStrategy =
+ partitionFields.isEmpty()
+ ? DataLayoutStrategy.FLAT
+ : DataLayoutStrategy.HIVE_STYLE_PARTITION;
+ return InternalTable.builder()
+ .tableFormat(TableFormat.PARQUET)
+ .basePath(basePath)
+ .name(tableName)
+ .layoutStrategy(dataLayoutStrategy)
+ .partitioningFields(partitionFields)
+ .readSchema(schema)
+ .latestCommitTime(Instant.ofEpochMilli(latestFile.getModificationTime()))
+ .build();
+ }
+
+ @Override
+ public InternalTable getTable(Long modificationTime) {
+ // get parquetFile at specific time modificationTime
+ Stream<LocatedFileStatus> parquetFiles = getParquetFiles(hadoopConf, basePath);
+ LocatedFileStatus file = getParquetFileAt(parquetFiles, modificationTime);
+ return createInternalTableFromFile(file);
+ }
+
+ private Stream<InternalDataFile> getInternalDataFiles(Stream<LocatedFileStatus> parquetFiles) {
+ return parquetFiles.map(
+ file ->
+ InternalDataFile.builder()
+ .physicalPath(file.getPath().toString())
+ .fileFormat(FileFormat.APACHE_PARQUET)
+ .fileSizeBytes(file.getLen())
+ .partitionValues(
+ partitionValueExtractor.extractPartitionValues(
+ partitionSpecExtractor.spec(
+
partitionValueExtractor.extractSchemaForParquetPartitions(
+ parquetMetadataExtractor.readParquetMetadata(
+ hadoopConf, file.getPath()),
+ file.getPath().toString())),
+ HudiPathUtils.getPartitionPath(new Path(basePath),
file.getPath())))
+ .lastModified(file.getModificationTime())
+ .columnStats(
+ parquetStatsExtractor.getColumnStatsForaFile(
+
parquetMetadataExtractor.readParquetMetadata(hadoopConf, file.getPath())))
+ .build());
+ }
+
+ private InternalDataFile createInternalDataFileFromParquetFile(FileStatus
parquetFile) {
+ return InternalDataFile.builder()
+ .physicalPath(parquetFile.getPath().toString())
+ .partitionValues(
+ partitionValueExtractor.extractPartitionValues(
+ partitionSpecExtractor.spec(
+ partitionValueExtractor.extractSchemaForParquetPartitions(
+ parquetMetadataExtractor.readParquetMetadata(
+ hadoopConf, parquetFile.getPath()),
+ parquetFile.getPath().toString())),
+ basePath))
+ .lastModified(parquetFile.getModificationTime())
+ .fileSizeBytes(parquetFile.getLen())
+ .columnStats(
+ parquetStatsExtractor.getColumnStatsForaFile(
+ parquetMetadataExtractor.readParquetMetadata(hadoopConf,
parquetFile.getPath())))
+ .build();
+ }
+
+ @Override
+ public CommitsBacklog<Long> getCommitsBacklog(InstantsForIncrementalSync
syncInstants) {
+ List<Long> commitsToProcess =
+
Collections.singletonList(syncInstants.getLastSyncInstant().toEpochMilli());
+ return
CommitsBacklog.<Long>builder().commitsToProcess(commitsToProcess).build();
+ }
+
+ @Override
+ public TableChange getTableChangeForCommit(Long modificationTime) {
+ Stream<LocatedFileStatus> parquetFiles = getParquetFiles(hadoopConf, basePath);
+ Set<InternalDataFile> addedInternalDataFiles = new HashSet<>();
+
+ List<FileStatus> tableChangesAfter =
+ parquetFiles
+ .filter(fileStatus -> fileStatus.getModificationTime() > modificationTime)
+ .collect(Collectors.toList());
+ InternalTable internalTable = getMostRecentTable(parquetFiles);
+ for (FileStatus tableStatus : tableChangesAfter) {
+ InternalDataFile currentDataFile = createInternalDataFileFromParquetFile(tableStatus);
+ addedInternalDataFiles.add(currentDataFile);
+ }
+
+ return TableChange.builder()
+ .tableAsOfChange(internalTable)
+ .filesDiff(InternalFilesDiff.builder().filesAdded(addedInternalDataFiles).build())
+ .build();
+ }
+
+ private InternalTable getMostRecentTable(Stream<LocatedFileStatus>
parquetFiles) {
+ LocatedFileStatus latestFile = getMostRecentParquetFile(parquetFiles);
+ return createInternalTableFromFile(latestFile);
+ }
+
+ @Override
+ public InternalTable getCurrentTable() {
+ Stream<LocatedFileStatus> parquetFiles = getParquetFiles(hadoopConf,
basePath);
+ return getMostRecentTable(parquetFiles);
+ }
+
+ /**
+ * Builds the current snapshot from the parquet files under the base path.
+ *
+ * @return the current snapshot of the table
+ */
+ @Override
+ public InternalSnapshot getCurrentSnapshot() {
+ // streams are single-use, so the parquet files are listed again for each consumer
+ Stream<InternalDataFile> internalDataFiles =
+ getInternalDataFiles(getParquetFiles(hadoopConf, basePath));
+ InternalTable table = getMostRecentTable(getParquetFiles(hadoopConf, basePath));
+ return InternalSnapshot.builder()
+ .table(table)
+ .sourceIdentifier(
+ getCommitIdentifier(
+ getMostRecentParquetFile(getParquetFiles(hadoopConf, basePath))
+ .getModificationTime()))
+ .partitionedDataFiles(PartitionFileGroup.fromFiles(internalDataFiles))
+ .build();
+ }
+
+ private LocatedFileStatus getMostRecentParquetFile(Stream<LocatedFileStatus>
parquetFiles) {
+ return parquetFiles
+ .max(Comparator.comparing(FileStatus::getModificationTime))
+ .orElseThrow(() -> new IllegalStateException("No files found"));
+ }
+
+ private LocatedFileStatus getParquetFileAt(
+ Stream<LocatedFileStatus> parquetFiles, long modificationTime) {
+ return parquetFiles
+ .filter(fileStatus -> fileStatus.getModificationTime() ==
modificationTime)
+ .findFirst()
+ .orElseThrow(() -> new IllegalStateException("No file found at " +
modificationTime));
+ }
+
+ private Stream<LocatedFileStatus> getParquetFiles(Configuration hadoopConf, String basePath) {
+ try {
+ FileSystem fs = FileSystem.get(hadoopConf);
+ URI uriBasePath = new URI(basePath);
+ String parentPath = Paths.get(uriBasePath).toString();
+ RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(new Path(parentPath), true);
+ return RemoteIterators.toList(iterator).stream()
+ .filter(file -> file.getPath().getName().endsWith("parquet"));
+ } catch (IOException | URISyntaxException e) {
+ throw new ReadException("Unable to read files from file system", e);
+ }
+ }
+
+ @Override
+ public boolean isIncrementalSyncSafeFrom(Instant instant) {
+ return false;
+ }
+
+ @Override
+ public String getCommitIdentifier(Long aLong) {
+ return String.valueOf(aLong);
+ }
+
+ @Override
+ public void close() {}
+}
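Because isIncrementalSyncSafeFrom always returns false, every sync of a Parquet source falls back to a full snapshot, and the commit identifier is just the newest file's modification time. A rough sketch of that contract, assuming a source built by the provider shown below:
  // Sketch only: `source` is assumed to be a ParquetConversionSource obtained from the provider.
  InternalSnapshot snapshot = source.getCurrentSnapshot();
  String commitId = snapshot.getSourceIdentifier();               // epoch millis of the newest parquet file
  boolean safe = source.isIncrementalSyncSafeFrom(Instant.now()); // always false -> full snapshot sync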
diff --git
a/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetConversionSourceProvider.java
b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetConversionSourceProvider.java
new file mode 100644
index 00000000..0d508823
--- /dev/null
+++
b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetConversionSourceProvider.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.parquet;
+
+import org.apache.xtable.conversion.ConversionSourceProvider;
+import org.apache.xtable.conversion.SourceTable;
+import org.apache.xtable.hudi.PathBasedPartitionSpecExtractor;
+
+/** A concrete implementation of {@link ConversionSourceProvider} for Parquet file format. */
+public class ParquetConversionSourceProvider extends ConversionSourceProvider<Long> {
+ @Override
+ public ParquetConversionSource getConversionSourceInstance(SourceTable sourceTable) {
+ final PathBasedPartitionSpecExtractor sourcePartitionSpecExtractor =
+ ParquetSourceConfig.fromProperties(sourceTable.getAdditionalProperties())
+ .loadSourcePartitionSpecExtractor();
+ ParquetPartitionValueExtractor partitionValueExtractor =
+ new ParquetPartitionValueExtractor(
+ sourcePartitionSpecExtractor.getPathToPartitionFieldFormat());
+ return ParquetConversionSource.builder()
+ .tableName(sourceTable.getName())
+ .basePath(sourceTable.getBasePath())
+ .hadoopConf(this.hadoopConf)
+ .partitionSpecExtractor(sourcePartitionSpecExtractor)
+ .partitionValueExtractor(partitionValueExtractor)
+ .build();
+ }
+}
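For reviewers, a minimal wiring sketch of the new source; only the property key and the Parquet classes come from this change, while the SourceTable builder fields, the provider init call, the table name, base path and partition spec string are assumptions or placeholders:
  // Illustrative only; the property key comes from ParquetSourceConfig, everything else is assumed.
  Properties props = new Properties();
  props.setProperty(
      "xtable.parquet.source.partition_field_spec_config",
      "timestamp:DAY:year=yyyy/month=MM/day=dd");          // placeholder spec string
  SourceTable sourceTable =
      SourceTable.builder()
          .name("my_table")                                 // placeholder
          .basePath("file:///tmp/parquet-table")            // placeholder
          .formatName(TableFormat.PARQUET)
          .additionalProperties(props)
          .build();
  ParquetConversionSourceProvider provider = new ParquetConversionSourceProvider();
  provider.init(hadoopConf);                                // assumed init hook with a Hadoop Configuration
  ParquetConversionSource source = provider.getConversionSourceInstance(sourceTable);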
diff --git
a/xtable-core/src/main/java/org/apache/xtable/hudi/ConfigurationBasedPartitionSpecExtractor.java
b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetPartitionSpecExtractor.java
similarity index 56%
copy from
xtable-core/src/main/java/org/apache/xtable/hudi/ConfigurationBasedPartitionSpecExtractor.java
copy to
xtable-core/src/main/java/org/apache/xtable/parquet/ParquetPartitionSpecExtractor.java
index 7bc41a10..ddbdccba 100644
---
a/xtable-core/src/main/java/org/apache/xtable/hudi/ConfigurationBasedPartitionSpecExtractor.java
+++
b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetPartitionSpecExtractor.java
@@ -16,18 +16,22 @@
* limitations under the License.
*/
-package org.apache.xtable.hudi;
+package org.apache.xtable.parquet;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
import lombok.AllArgsConstructor;
+import org.apache.xtable.hudi.PathBasedPartitionSpecExtractor;
import org.apache.xtable.model.schema.InternalField;
import org.apache.xtable.model.schema.InternalPartitionField;
import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.schema.PartitionFieldSpec;
import org.apache.xtable.schema.SchemaFieldFinder;
/**
@@ -35,38 +39,55 @@ import org.apache.xtable.schema.SchemaFieldFinder;
* path:type:format for date types or path:type for value types.
*/
@AllArgsConstructor
-public class ConfigurationBasedPartitionSpecExtractor implements HudiSourcePartitionSpecExtractor {
- private final HudiSourceConfig config;
+public class ParquetPartitionSpecExtractor implements PathBasedPartitionSpecExtractor {
+ private static final ParquetPartitionSpecExtractor INSTANCE =
+ new ParquetPartitionSpecExtractor(new ArrayList<>());
+ private List<PartitionFieldSpec> partitionFieldSpecs;
+
+ public static ParquetPartitionSpecExtractor getInstance() {
+ return INSTANCE;
+ }
@Override
public List<InternalPartitionField> spec(InternalSchema tableSchema) {
- List<InternalPartitionField> partitionFields =
- new ArrayList<>(config.getPartitionFieldSpecs().size());
- for (HudiSourceConfig.PartitionFieldSpec fieldSpec :
config.getPartitionFieldSpecs()) {
+ List<InternalPartitionField> partitionFields = new
ArrayList<>(partitionFieldSpecs.size());
+ for (PartitionFieldSpec fieldSpec : partitionFieldSpecs) {
InternalField sourceField =
SchemaFieldFinder.getInstance()
.findFieldByPath(tableSchema, fieldSpec.getSourceFieldPath());
partitionFields.add(
InternalPartitionField.builder()
.sourceField(sourceField)
+
.partitionFieldNames(getListPartitionNamesFromFormatInput(fieldSpec.getFormat()))
.transformType(fieldSpec.getTransformType())
.build());
}
return partitionFields;
}
+ public List<String> getListPartitionNamesFromFormatInput(String inputFormat) {
+ return Arrays.stream(inputFormat.split("/"))
+ .map(s -> s.split("=")[0])
+ .collect(Collectors.toList());
+ }
+
+ public List<String> getListPartitionValuesFromFormatInput(String inputFormat) {
+ return Arrays.stream(inputFormat.split("/"))
+ .map(s -> s.split("=")[1])
+ .collect(Collectors.toList());
+ }
+
@Override
public Map<String, String> getPathToPartitionFieldFormat() {
Map<String, String> pathToPartitionFieldFormat = new HashMap<>();
- config
- .getPartitionFieldSpecs()
- .forEach(
- partitionFieldSpec -> {
- if (partitionFieldSpec.getFormat() != null) {
- pathToPartitionFieldFormat.put(
- partitionFieldSpec.getSourceFieldPath(),
partitionFieldSpec.getFormat());
- }
- });
+
+ partitionFieldSpecs.forEach(
+ partitionFieldSpec -> {
+ if (partitionFieldSpec.getFormat() != null) {
+ pathToPartitionFieldFormat.put(
+ partitionFieldSpec.getSourceFieldPath(),
partitionFieldSpec.getFormat());
+ }
+ });
return pathToPartitionFieldFormat;
}
}
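The two format helpers above simply split the configured hive-style format string on '/' and '='; for example, assuming a configured format of year=yyyy/month=MM/day=dd:
  ParquetPartitionSpecExtractor extractor = ParquetPartitionSpecExtractor.getInstance();
  extractor.getListPartitionNamesFromFormatInput("year=yyyy/month=MM/day=dd");
  // -> ["year", "month", "day"]
  extractor.getListPartitionValuesFromFormatInput("year=yyyy/month=MM/day=dd");
  // -> ["yyyy", "MM", "dd"]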
diff --git
a/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetPartitionValueExtractor.java
b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetPartitionValueExtractor.java
new file mode 100644
index 00000000..ab4ceaf4
--- /dev/null
+++
b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetPartitionValueExtractor.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.parquet;
+
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.*;
+
+import lombok.NonNull;
+
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.schema.MessageType;
+
+import org.apache.xtable.exception.PartitionValuesExtractorException;
+import org.apache.xtable.hudi.PathBasedPartitionValuesExtractor;
+import org.apache.xtable.model.schema.InternalPartitionField;
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.stat.PartitionValue;
+import org.apache.xtable.model.stat.Range;
+
+/** Partition value extractor for Parquet. */
+public class ParquetPartitionValueExtractor extends PathBasedPartitionValuesExtractor {
+ private static final ParquetPartitionValueExtractor INSTANCE =
+ new ParquetPartitionValueExtractor(Collections.emptyMap());
+ private static final ParquetSchemaExtractor schemaExtractor =
+ ParquetSchemaExtractor.getInstance();
+ private static final ParquetMetadataExtractor parquetMetadataExtractor =
+ ParquetMetadataExtractor.getInstance();
+ private static final ParquetPartitionSpecExtractor partitionSpecExtractor =
+ ParquetPartitionSpecExtractor.getInstance();
+
+ public ParquetPartitionValueExtractor(@NonNull Map<String, String> pathToPartitionFieldFormat) {
+ super(pathToPartitionFieldFormat);
+ }
+
+ public List<PartitionValue> extractPartitionValues(
+ List<InternalPartitionField> partitionColumns, String partitionPath) {
+ String currentDateValue = "";
+ List<String> parsedDateValues = new ArrayList<>();
+ if (partitionColumns.size() == 0) {
+ return Collections.emptyList();
+ }
+ int totalNumberOfPartitions =
partitionColumns.get(0).getPartitionFieldNames().size();
+ List<PartitionValue> result = new ArrayList<>(totalNumberOfPartitions);
+ String remainingPartitionPath = partitionPath;
+ for (InternalPartitionField partitionField : partitionColumns) {
+ int index = 0;
+ for (String partitionFieldName :
partitionField.getPartitionFieldNames()) {
+ if (remainingPartitionPath.startsWith(partitionFieldName + "=")) {
+ remainingPartitionPath =
+ remainingPartitionPath.substring(partitionFieldName.length() +
1);
+ }
+ // parsePartitionPath shouldn't do two things but only gets the
remaining path
+ // to store the parsed value for the current partition field
+ currentDateValue = parsePartitionDateValue(partitionField,
remainingPartitionPath, index);
+ parsedDateValues.add(currentDateValue);
+
+ remainingPartitionPath =
+ getRemainingPath(
+ remainingPartitionPath,
+ partitionSpecExtractor
+ .getListPartitionValuesFromFormatInput(
+
pathToPartitionFieldFormat.get(partitionField.getSourceField().getName()))
+ .get(index));
+ index++;
+ }
+ try {
+ result.add(
+ PartitionValue.builder()
+ .partitionField(partitionField)
+ .range(Range.scalar(computeSinceEpochValue(parsedDateValues,
partitionField)))
+ .build());
+ } catch (ParseException e) {
+ throw new PartitionValuesExtractorException("Unable to parse date
value", e);
+ }
+ }
+ return result;
+ }
+
+ protected String parsePartitionDateValue(
+ InternalPartitionField field, String remainingPath, int index) {
+ return parseDateValue(
+ remainingPath,
+ partitionSpecExtractor
+ .getListPartitionValuesFromFormatInput(
+
pathToPartitionFieldFormat.get(field.getSourceField().getName()))
+ .get(index));
+ }
+
+ private long computeSinceEpochValue(
+ List<String> parsedDateValues, InternalPartitionField partitionField)
throws ParseException {
+ // in the list of parsed values combine them using the standard way using
hyphen symbol then
+ // parse the date into sinceEpoch
+ String combinedDate = String.join("-", parsedDateValues); //
SimpleDateFormat works with hyphens
+ // convert to since Epoch
+ SimpleDateFormat simpleDateFormat =
+ new SimpleDateFormat(
+ String.join(
+ "-",
+ partitionSpecExtractor.getListPartitionValuesFromFormatInput(
+
pathToPartitionFieldFormat.get(partitionField.getSourceField().getName()))));
+ simpleDateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
+ return simpleDateFormat.parse(combinedDate).toInstant().toEpochMilli();
+ }
+
+ protected static String parseDateValue(String remainingPath, String format) {
+ return remainingPath.substring(0, format.length());
+ }
+
+ protected static String getRemainingPath(String remainingPath, String
format) {
+ return remainingPath.substring(Math.min(remainingPath.length(),
format.length() + 1));
+ }
+
+ public static ParquetPartitionValueExtractor getInstance() {
+ return INSTANCE;
+ }
+
+ public InternalSchema extractSchemaForParquetPartitions(ParquetMetadata
footer, String path) {
+ MessageType parquetSchema = parquetMetadataExtractor.getSchema(footer);
+ return schemaExtractor.toInternalSchema(parquetSchema, path);
+ }
+}
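The path-parsing helpers consume one partition segment at a time based on the length of the configured format segment; a small walk-through, assuming a format segment "yyyy" against the partition path "2023/08/15/part-0.parquet" (callable from within the same package since the helpers are protected):
  // The value is the first format.length() characters; the remainder skips the value and the '/' separator.
  ParquetPartitionValueExtractor.parseDateValue("2023/08/15/part-0.parquet", "yyyy");    // -> "2023"
  ParquetPartitionValueExtractor.getRemainingPath("2023/08/15/part-0.parquet", "yyyy");  // -> "08/15/part-0.parquet"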
diff --git
a/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetSchemaExtractor.java
b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetSchemaExtractor.java
index 9043ac0d..ed6cf284 100644
---
a/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetSchemaExtractor.java
+++
b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetSchemaExtractor.java
@@ -97,6 +97,11 @@ public class ParquetSchemaExtractor {
primitiveType = schema.asPrimitiveType();
switch (primitiveType.getPrimitiveTypeName()) {
// PrimitiveTypes
+ case INT96: // TODO check logicaltypes of INT96
+ metadata.put(
+ InternalSchema.MetadataKey.TIMESTAMP_PRECISION, InternalSchema.MetadataValue.NANOS);
+ newDataType = InternalType.TIMESTAMP;
+ break;
case INT64:
logicalType = schema.getLogicalTypeAnnotation();
if (logicalType instanceof
LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) {
@@ -169,6 +174,9 @@ public class ParquetSchemaExtractor {
newDataType = InternalType.INT;
}
break;
+ case DOUBLE:
+ newDataType = InternalType.DOUBLE;
+ break;
case FLOAT:
newDataType = InternalType.FLOAT;
break;
@@ -210,7 +218,6 @@ public class ParquetSchemaExtractor {
InternalSchema.MetadataKey.DECIMAL_SCALE,
((LogicalTypeAnnotation.DecimalLogicalTypeAnnotation)
logicalType).getScale());
newDataType = InternalType.DECIMAL;
-
} else {
newDataType = InternalType.BYTES;
}
@@ -270,12 +277,10 @@ public class ParquetSchemaExtractor {
.fields(valueSchema.getFields())
.build();
} else {
-
subFields = new ArrayList<>(schema.asGroupType().getFields().size());
for (Type parquetField : schema.asGroupType().getFields()) {
String fieldName = parquetField.getName();
Type.ID fieldId = parquetField.getId();
- currentRepetition = parquetField.getRepetition();
InternalSchema subFieldSchema =
toInternalSchema(
parquetField, SchemaUtils.getFullyQualifiedPath(parentPath,
fieldName));
@@ -284,6 +289,7 @@ public class ParquetSchemaExtractor {
== 1) { // TODO Tuple (many subelements in a list)
newDataType = subFieldSchema.getDataType();
elementName = subFieldSchema.getName();
+ // subFields = subFieldSchema.getFields();
break;
}
subFields.add(
@@ -295,15 +301,18 @@ public class ParquetSchemaExtractor {
.fieldId(fieldId == null ? null : fieldId.intValue())
.build());
}
- if (currentRepetition != Repetition.REPEATED
- && schema.asGroupType().getName() != "list"
+ // RECORD Type (non-nullable elements)
+ if (!schema.asGroupType().getName().equals("list")
&& !Arrays.asList("key_value",
"map").contains(schema.asGroupType().getName())) {
+ boolean isNullable =
+ subFields.stream().anyMatch(ele -> ele.getSchema().isNullable())
+ && isNullable(schema.asGroupType());
return InternalSchema.builder()
.name(schema.getName())
.comment(null)
.dataType(InternalType.RECORD)
.fields(subFields)
- .isNullable(isNullable(schema.asGroupType()))
+ .isNullable(isNullable)
.build();
}
}
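With the new INT96 branch, a legacy Spark/Hive timestamp column maps to InternalType.TIMESTAMP with nanosecond-precision metadata instead of falling through; a rough check, assuming the existing toInternalSchema entry point and the parquet Types builder:
  // Illustrative only: a one-column INT96 schema.
  MessageType parquetSchema =
      Types.buildMessage()
          .required(PrimitiveType.PrimitiveTypeName.INT96)
          .named("event_time")
          .named("schema");
  InternalSchema schema = ParquetSchemaExtractor.getInstance().toInternalSchema(parquetSchema, "");
  // expected: schema.getFields().get(0).getSchema().getDataType() == InternalType.TIMESTAMP
  // with MetadataKey.TIMESTAMP_PRECISION -> NANOS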
diff --git
a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiSourceConfig.java
b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetSourceConfig.java
similarity index 64%
copy from xtable-core/src/main/java/org/apache/xtable/hudi/HudiSourceConfig.java
copy to
xtable-core/src/main/java/org/apache/xtable/parquet/ParquetSourceConfig.java
index 606b9281..c1ab594c 100644
--- a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiSourceConfig.java
+++
b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetSourceConfig.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.xtable.hudi;
+package org.apache.xtable.parquet;
import java.util.ArrayList;
import java.util.Collections;
@@ -28,45 +28,35 @@ import lombok.Value;
import com.google.common.base.Preconditions;
+import org.apache.xtable.hudi.PathBasedPartitionSpecExtractor;
+import org.apache.xtable.model.schema.PartitionFieldSpec;
import org.apache.xtable.model.schema.PartitionTransformType;
import org.apache.xtable.reflection.ReflectionUtils;
-/** Configuration of Hudi source format for the sync process. */
+/** Configuration of Parquet source format for the sync process. */
@Value
-public class HudiSourceConfig {
+public class ParquetSourceConfig {
public static final String PARTITION_SPEC_EXTRACTOR_CLASS =
- "xtable.hudi.source.partition_spec_extractor_class";
+ "xtable.parquet.source.partition_spec_extractor_class";
public static final String PARTITION_FIELD_SPEC_CONFIG =
- "xtable.hudi.source.partition_field_spec_config";
+ "xtable.parquet.source.partition_field_spec_config";
String partitionSpecExtractorClass;
List<PartitionFieldSpec> partitionFieldSpecs;
- public static HudiSourceConfig fromPartitionFieldSpecConfig(String
partitionFieldSpecConfig) {
- return new HudiSourceConfig(
- ConfigurationBasedPartitionSpecExtractor.class.getName(),
- parsePartitionFieldSpecs(partitionFieldSpecConfig));
- }
-
- public static HudiSourceConfig fromProperties(Properties properties) {
+ public static ParquetSourceConfig fromProperties(Properties properties) {
String partitionSpecExtractorClass =
properties.getProperty(
- PARTITION_SPEC_EXTRACTOR_CLASS,
- ConfigurationBasedPartitionSpecExtractor.class.getName());
+ PARTITION_SPEC_EXTRACTOR_CLASS,
ParquetPartitionSpecExtractor.class.getName());
+
String partitionFieldSpecString =
properties.getProperty(PARTITION_FIELD_SPEC_CONFIG);
+
List<PartitionFieldSpec> partitionFieldSpecs =
parsePartitionFieldSpecs(partitionFieldSpecString);
- return new HudiSourceConfig(partitionSpecExtractorClass,
partitionFieldSpecs);
- }
-
- @Value
- static class PartitionFieldSpec {
- String sourceFieldPath;
- PartitionTransformType transformType;
- String format;
+ return new ParquetSourceConfig(partitionSpecExtractorClass,
partitionFieldSpecs);
}
- private static List<PartitionFieldSpec> parsePartitionFieldSpecs(String
input) {
+ public static List<PartitionFieldSpec> parsePartitionFieldSpecs(String
input) {
if (input == null || input.isEmpty()) {
return Collections.emptyList();
}
@@ -84,9 +74,10 @@ public class HudiSourceConfig {
return partitionFields;
}
- public HudiSourcePartitionSpecExtractor loadSourcePartitionSpecExtractor() {
+ public PathBasedPartitionSpecExtractor loadSourcePartitionSpecExtractor() {
Preconditions.checkNotNull(
- partitionSpecExtractorClass, "HudiSourcePartitionSpecExtractor class
not provided");
- return ReflectionUtils.createInstanceOfClass(partitionSpecExtractorClass,
this);
+ this.partitionSpecExtractorClass, "PathBasedPartitionSpecExtractor
class not provided");
+ return ReflectionUtils.createInstanceOfClass(
+ this.partitionSpecExtractorClass, this.getPartitionFieldSpecs());
}
}
diff --git
a/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetStatsConverterUtil.java
b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetStatsConverterUtil.java
new file mode 100644
index 00000000..e5fd2d07
--- /dev/null
+++
b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetStatsConverterUtil.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.parquet;
+
+import java.nio.charset.StandardCharsets;
+
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.PrimitiveType;
+
+public class ParquetStatsConverterUtil {
+ public static Object convertStatBinaryTypeToLogicalType(
+ ColumnChunkMetaData columnMetaData, boolean isMin) {
+ Object returnedObj = null;
+ PrimitiveType primitiveType = columnMetaData.getPrimitiveType();
+ switch (primitiveType.getPrimitiveTypeName()) {
+ case BINARY: // TODO check if other primitiveType' needs to be handled
as well
+ if (primitiveType.getLogicalTypeAnnotation() != null) {
+ if (columnMetaData
+ .getPrimitiveType()
+ .getLogicalTypeAnnotation()
+ .toString()
+ .equals("STRING")) {
+ returnedObj =
+ new String(
+ (isMin
+ ? (Binary)
columnMetaData.getStatistics().genericGetMin()
+ : (Binary)
columnMetaData.getStatistics().genericGetMax())
+ .getBytes(),
+ StandardCharsets.UTF_8);
+ } else {
+ returnedObj =
+ isMin
+ ? columnMetaData.getStatistics().genericGetMin()
+ : columnMetaData.getStatistics().genericGetMax();
+ }
+ } else {
+ returnedObj =
+ isMin
+ ? columnMetaData.getStatistics().genericGetMin()
+ : columnMetaData.getStatistics().genericGetMax();
+ }
+ break;
+ default:
+ returnedObj =
+ isMin
+ ? columnMetaData.getStatistics().genericGetMin()
+ : columnMetaData.getStatistics().genericGetMax();
+ // TODO JSON and DECIMAL... of BINARY primitiveType
+ }
+ return returnedObj;
+ }
+}
diff --git
a/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetStatsExtractor.java
b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetStatsExtractor.java
index 8d02b392..a1393b98 100644
---
a/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetStatsExtractor.java
+++
b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetStatsExtractor.java
@@ -36,6 +36,7 @@ import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.schema.MessageType;
+import org.apache.xtable.hudi.PathBasedPartitionSpecExtractor;
import org.apache.xtable.model.schema.InternalField;
import org.apache.xtable.model.stat.ColumnStat;
import org.apache.xtable.model.stat.PartitionValue;
@@ -55,12 +56,15 @@ public class ParquetStatsExtractor {
private static final ParquetMetadataExtractor parquetMetadataExtractor =
ParquetMetadataExtractor.getInstance();
- // private static final InputPartitionFields partitions = null;
-
public static ParquetStatsExtractor getInstance() {
return INSTANCE;
}
+ private static final ParquetPartitionValueExtractor partitionValueExtractor =
+ ParquetPartitionValueExtractor.getInstance();
+ private static PathBasedPartitionSpecExtractor partitionSpecExtractor =
+ ParquetPartitionSpecExtractor.getInstance();
+
+ public static List<ColumnStat> getColumnStatsForaFile(ParquetMetadata footer) {
return getStatsForFile(footer).values().stream()
.flatMap(List::stream)
@@ -112,39 +116,37 @@ public class ParquetStatsExtractor {
.totalSize(columnMetaData.getTotalSize())
.range(
Range.vector(
- columnMetaData.getStatistics().genericGetMin(),
- columnMetaData.getStatistics().genericGetMax()))
+ ParquetStatsConverterUtil.convertStatBinaryTypeToLogicalType(
+ columnMetaData,
+ true), // string stats are converted to literal String values
+ // before being stored in the range
+ ParquetStatsConverterUtil.convertStatBinaryTypeToLogicalType(
+ columnMetaData, false)))
.build(),
Collectors.toList())));
return columnDescStats;
}
- /* private static InputPartitionFields initPartitionInfo() {
- return partitions;
- }*/
-
+ public static InternalDataFile toInternalDataFile(Configuration hadoopConf, Path parentPath)
throws IOException {
- FileStatus file = null;
- List<PartitionValue> partitionValues = null;
- ParquetMetadata footer = null;
- List<ColumnStat> columnStatsForAFile = null;
- try {
- FileSystem fs = FileSystem.get(hadoopConf);
- file = fs.getFileStatus(parentPath);
- // InputPartitionFields partitionInfo = initPartitionInfo();
- footer = parquetMetadataExtractor.readParquetMetadata(hadoopConf, parentPath);
- MessageType schema = parquetMetadataExtractor.getSchema(footer);
- columnStatsForAFile = getColumnStatsForaFile(footer);
- // partitionValues = partitionExtractor.createPartitionValues(
- // partitionInfo);
- } catch (java.io.IOException e) {
-
- }
+ FileSystem fs = FileSystem.get(hadoopConf);
+ FileStatus file = fs.getFileStatus(parentPath);
+ ParquetMetadata footer = parquetMetadataExtractor.readParquetMetadata(hadoopConf, parentPath);
+ List<ColumnStat> columnStatsForAFile = getColumnStatsForaFile(footer);
+ List<PartitionValue> partitionValues =
+ partitionValueExtractor.extractPartitionValues(
+ partitionSpecExtractor.spec(
+ partitionValueExtractor.extractSchemaForParquetPartitions(
+ parquetMetadataExtractor.readParquetMetadata(hadoopConf, file.getPath()),
+ file.getPath().toString())),
+ parentPath.toString());
return InternalDataFile.builder()
.physicalPath(parentPath.toString())
.fileFormat(FileFormat.APACHE_PARQUET)
- // .partitionValues(partitionValues)
+ .partitionValues(partitionValues)
.fileSizeBytes(file.getLen())
.recordCount(getMaxFromColumnStats(columnStatsForAFile).orElse(0L))
.columnStats(columnStatsForAFile)
diff --git a/xtable-core/src/main/java/org/apache/xtable/reflection/ReflectionUtils.java b/xtable-core/src/main/java/org/apache/xtable/reflection/ReflectionUtils.java
index 9e965c9d..68b40d35 100644
--- a/xtable-core/src/main/java/org/apache/xtable/reflection/ReflectionUtils.java
+++ b/xtable-core/src/main/java/org/apache/xtable/reflection/ReflectionUtils.java
@@ -18,10 +18,10 @@
package org.apache.xtable.reflection;
+import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
-import java.util.Arrays;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
@@ -43,17 +43,28 @@ public class ReflectionUtils {
if (constructorArgs.length == 0) {
return clazz.newInstance();
}
- Class<?>[] constructorArgTypes =
- Arrays.stream(constructorArgs).map(Object::getClass).toArray(Class[]::new);
- if (hasConstructor(clazz, constructorArgTypes)) {
- return clazz.getConstructor(constructorArgTypes).newInstance(constructorArgs);
- } else {
- return clazz.newInstance();
+ for (Constructor<?> constructor : clazz.getConstructors()) {
+ Class<?>[] parameterTypes = constructor.getParameterTypes();
+ if (parameterTypes.length == constructorArgs.length) {
+ boolean matches = true;
+ for (int i = 0; i < parameterTypes.length; i++) {
+ if (!parameterTypes[i].isAssignableFrom(constructorArgs[i].getClass())) {
+ matches = false;
+ break;
+ }
+ }
+ if (matches) {
+ return (T) constructor.newInstance(constructorArgs);
+ }
+ }
}
+ throw new NoSuchMethodException(
+ "Could not find a suitable constructor for class: " + className);
+
} catch (InstantiationException
| IllegalAccessException
- | NoSuchMethodException
- | InvocationTargetException e) {
+ | InvocationTargetException
+ | NoSuchMethodException e) {
throw new ConfigurationException("Unable to load class: " + className, e);
}
}
diff --git a/xtable-core/src/main/java/org/apache/xtable/schema/SchemaFieldFinder.java b/xtable-core/src/main/java/org/apache/xtable/schema/SchemaFieldFinder.java
index 78b483ac..daadfef9 100644
--- a/xtable-core/src/main/java/org/apache/xtable/schema/SchemaFieldFinder.java
+++ b/xtable-core/src/main/java/org/apache/xtable/schema/SchemaFieldFinder.java
@@ -45,7 +45,8 @@ public class SchemaFieldFinder {
* @return the field if it exists, otherwise returns null
*/
public InternalField findFieldByPath(InternalSchema schema, String path) {
- return findFieldByPath(schema, path.split("\\."), 0);
+ int startIndex = 0;
+ return findFieldByPath(schema, path.split("\\."), startIndex);
}
private InternalField findFieldByPath(InternalSchema schema, String[] pathParts, int startIndex) {
diff --git a/xtable-core/src/test/java/org/apache/xtable/GenericTable.java b/xtable-core/src/test/java/org/apache/xtable/GenericTable.java
index db7e776b..14395e0d 100644
--- a/xtable-core/src/test/java/org/apache/xtable/GenericTable.java
+++ b/xtable-core/src/test/java/org/apache/xtable/GenericTable.java
@@ -21,6 +21,7 @@ package org.apache.xtable;
import static org.apache.xtable.model.storage.TableFormat.DELTA;
import static org.apache.xtable.model.storage.TableFormat.HUDI;
import static org.apache.xtable.model.storage.TableFormat.ICEBERG;
+import static org.apache.xtable.model.storage.TableFormat.PARQUET;
import java.nio.file.Path;
import java.util.Arrays;
@@ -32,6 +33,8 @@ import org.apache.spark.sql.SparkSession;
import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.xtable.parquet.TestSparkParquetTable;
+
public interface GenericTable<T, Q> extends AutoCloseable {
// A list of values for the level field which serves as a basic field to partition on for tests
List<String> LEVEL_VALUES = Arrays.asList("INFO", "WARN", "ERROR");
@@ -76,6 +79,9 @@ public interface GenericTable<T, Q> extends AutoCloseable {
String sourceFormat,
boolean isPartitioned) {
switch (sourceFormat) {
+ case PARQUET:
+ return TestSparkParquetTable.forStandardSchemaAndPartitioning(
+ tableName, tempDir, jsc, isPartitioned);
case HUDI:
return TestSparkHudiTable.forStandardSchemaAndPartitioning(
tableName, tempDir, jsc, isPartitioned);
diff --git a/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java b/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java
index dc90d4d5..bda54c0f 100644
--- a/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java
+++ b/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java
@@ -24,6 +24,7 @@ import static org.apache.xtable.hudi.HudiTestUtil.PartitionConfig;
import static org.apache.xtable.model.storage.TableFormat.DELTA;
import static org.apache.xtable.model.storage.TableFormat.HUDI;
import static org.apache.xtable.model.storage.TableFormat.ICEBERG;
+import static org.apache.xtable.model.storage.TableFormat.PARQUET;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -485,7 +486,11 @@ public class ITConversionController {
private static List<String> getOtherFormats(String sourceTableFormat) {
return Arrays.stream(TableFormat.values())
- .filter(format -> !format.equals(sourceTableFormat))
+ .filter(
+ format ->
+ !format.equals(sourceTableFormat)
+ && !format.equals(
+ PARQUET)) // exclude PARQUET as a target because upsert, insert, etc. are not supported
.collect(Collectors.toList());
}
diff --git a/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionSource.java b/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionSource.java
index 5dd00174..b1cba5c7 100644
--- a/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionSource.java
+++ b/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionSource.java
@@ -693,9 +693,9 @@ public class ITHudiConversionSource {
.setBasePath(basePath)
.setLoadActiveTimelineOnLoad(true)
.build();
- HudiSourcePartitionSpecExtractor partitionSpecExtractor =
+ PathBasedPartitionSpecExtractor partitionSpecExtractor =
new ConfigurationBasedPartitionSpecExtractor(
- HudiSourceConfig.fromPartitionFieldSpecConfig(xTablePartitionConfig));
+ HudiSourceConfig.parsePartitionFieldSpecs(xTablePartitionConfig));
return new HudiConversionSource(hoodieTableMetaClient, partitionSpecExtractor);
}
diff --git a/xtable-core/src/test/java/org/apache/xtable/hudi/TestHudiPartitionValuesExtractor.java b/xtable-core/src/test/java/org/apache/xtable/hudi/TestHudiPartitionValuesExtractor.java
index c5915b9e..4d3faa50 100644
--- a/xtable-core/src/test/java/org/apache/xtable/hudi/TestHudiPartitionValuesExtractor.java
+++ b/xtable-core/src/test/java/org/apache/xtable/hudi/TestHudiPartitionValuesExtractor.java
@@ -74,7 +74,7 @@ public class TestHudiPartitionValuesExtractor {
PartitionValue.builder().partitionField(column).range(Range.scalar("foo")).build());
List<PartitionValue> actual =
- new HudiPartitionValuesExtractor(Collections.emptyMap())
+ new PathBasedPartitionValuesExtractor(Collections.emptyMap())
.extractPartitionValues(Collections.singletonList(column), "foo");
Assertions.assertEquals(expected, actual);
}
@@ -100,7 +100,7 @@ public class TestHudiPartitionValuesExtractor {
PartitionValue.builder().partitionField(column).range(Range.scalar("foo/bar")).build());
List<PartitionValue> actual =
- new HudiPartitionValuesExtractor(Collections.emptyMap())
+ new PathBasedPartitionValuesExtractor(Collections.emptyMap())
.extractPartitionValues(Collections.singletonList(column), "foo/bar");
Assertions.assertEquals(expected, actual);
}
@@ -239,7 +239,7 @@ public class TestHudiPartitionValuesExtractor {
Map<String, String> pathToPartitionFieldFormat = new HashMap<>();
pathToPartitionFieldFormat.put(column.getSourceField().getPath(), format);
List<PartitionValue> actual =
- new HudiPartitionValuesExtractor(pathToPartitionFieldFormat)
+ new PathBasedPartitionValuesExtractor(pathToPartitionFieldFormat)
.extractPartitionValues(Collections.singletonList(column), partitionString);
Assertions.assertEquals(expected, actual);
}
@@ -298,7 +298,7 @@ public class TestHudiPartitionValuesExtractor {
Map<String, String> pathToPartitionFieldFormat = new HashMap<>();
pathToPartitionFieldFormat.put(column2.getSourceField().getPath(), "yyyy/MM/dd");
List<PartitionValue> actual =
- new HudiPartitionValuesExtractor(pathToPartitionFieldFormat)
+ new PathBasedPartitionValuesExtractor(pathToPartitionFieldFormat)
.extractPartitionValues(Arrays.asList(column1, column2, column3), "foo/2022/10/02/32");
Assertions.assertEquals(expected, actual);
}
@@ -350,7 +350,7 @@ public class TestHudiPartitionValuesExtractor {
Map<String, String> pathToPartitionFieldFormat = new HashMap<>();
pathToPartitionFieldFormat.put(column2.getSourceField().getPath(), "yyyy-MM-dd");
List<PartitionValue> actual =
- new HudiPartitionValuesExtractor(pathToPartitionFieldFormat)
+ new PathBasedPartitionValuesExtractor(pathToPartitionFieldFormat)
.extractPartitionValues(
Arrays.asList(column1, column2, column3),
"foo/__HIVE_DEFAULT_PARTITION__/32");
Assertions.assertEquals(expected, actual);
@@ -388,7 +388,7 @@ public class TestHudiPartitionValuesExtractor {
PartitionValue.builder().partitionField(column2).range(Range.scalar(32L)).build());
List<PartitionValue> actual =
- new HudiPartitionValuesExtractor(Collections.emptyMap())
+ new PathBasedPartitionValuesExtractor(Collections.emptyMap())
.extractPartitionValues(Arrays.asList(column1, column2), "column1=foo/column2=32");
Assertions.assertEquals(expected, actual);
}
@@ -425,7 +425,7 @@ public class TestHudiPartitionValuesExtractor {
PartitionValue.builder().partitionField(column1).range(Range.scalar(null)).build());
List<PartitionValue> actual =
- new HudiPartitionValuesExtractor(Collections.emptyMap())
+ new PathBasedPartitionValuesExtractor(Collections.emptyMap())
.extractPartitionValues(
Arrays.asList(column2, column1),
"column2=32/column1=__HIVE_DEFAULT_PARTITION__");
Assertions.assertEquals(expected, actual);
@@ -461,14 +461,14 @@ public class TestHudiPartitionValuesExtractor {
Assertions.assertThrows(
PartitionValuesExtractorException.class,
() ->
- new HudiPartitionValuesExtractor(Collections.emptyMap())
+ new PathBasedPartitionValuesExtractor(Collections.emptyMap())
.extractPartitionValues(Arrays.asList(column1, column2), "foo"));
}
@Test
public void testNoPartitionColumnsConfigured() {
List<PartitionValue> actual =
- new HudiPartitionValuesExtractor(Collections.emptyMap())
+ new PathBasedPartitionValuesExtractor(Collections.emptyMap())
.extractPartitionValues(Collections.emptyList(), "column1=foo/column2=32");
Assertions.assertTrue(actual.isEmpty());
}
@@ -476,7 +476,7 @@ public class TestHudiPartitionValuesExtractor {
@Test
public void testNullPartitionColumns() {
List<PartitionValue> actual =
- new HudiPartitionValuesExtractor(Collections.emptyMap())
+ new PathBasedPartitionValuesExtractor(Collections.emptyMap())
.extractPartitionValues(null, "column1=foo/column2=32");
Assertions.assertTrue(actual.isEmpty());
}
@@ -503,7 +503,7 @@ public class TestHudiPartitionValuesExtractor {
Assertions.assertThrows(
PartitionValuesExtractorException.class,
() ->
- new HudiPartitionValuesExtractor(pathToPartitionFieldFormat)
+ new PathBasedPartitionValuesExtractor(pathToPartitionFieldFormat)
.extractPartitionValues(Collections.singletonList(column), "2022-10-02"));
}
@@ -564,7 +564,7 @@ public class TestHudiPartitionValuesExtractor {
.collect(Collectors.toList());
List<PartitionValue> actual =
- new HudiPartitionValuesExtractor(Collections.emptyMap())
+ new PathBasedPartitionValuesExtractor(Collections.emptyMap())
.extractPartitionValues(partitionFields, partitionPath);
Assertions.assertEquals(expected, actual);
}
diff --git a/xtable-core/src/test/java/org/apache/xtable/parquet/ITParquetConversionSource.java b/xtable-core/src/test/java/org/apache/xtable/parquet/ITParquetConversionSource.java
new file mode 100644
index 00000000..6dda9db1
--- /dev/null
+++ b/xtable-core/src/test/java/org/apache/xtable/parquet/ITParquetConversionSource.java
@@ -0,0 +1,462 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.parquet;
+
+import static org.apache.xtable.GenericTable.getTableName;
+import static org.apache.xtable.model.storage.TableFormat.*;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+import java.net.URISyntaxException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.sql.Timestamp;
+import java.time.Duration;
+import java.util.*;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import lombok.Builder;
+import lombok.Value;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.*;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.MetadataBuilder;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import org.apache.hudi.client.HoodieReadClient;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+
+import org.apache.xtable.GenericTable;
+import org.apache.xtable.conversion.ConversionConfig;
+import org.apache.xtable.conversion.ConversionController;
+import org.apache.xtable.conversion.ConversionSourceProvider;
+import org.apache.xtable.conversion.SourceTable;
+import org.apache.xtable.conversion.TargetTable;
+import org.apache.xtable.hudi.HudiTestUtil;
+import org.apache.xtable.model.sync.SyncMode;
+
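+/**
+ * Integration test that writes a Parquet dataset with Spark (partitioned and non-partitioned),
+ * syncs it to the Iceberg, Delta, and Hudi targets, and verifies the datasets read back
+ * equivalently through Spark.
+ */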
+public class ITParquetConversionSource {
+ public static final String PARTITION_FIELD_SPEC_CONFIG =
+ "xtable.parquet.source.partition_field_spec_config";
+ @TempDir public static Path tempDir;
+ private static JavaSparkContext jsc;
+ private static SparkSession sparkSession;
+ private static StructType schema;
+
+ @BeforeAll
+ public static void setupOnce() {
+ SparkConf sparkConf = HudiTestUtil.getSparkConf(tempDir);
+ sparkConf = HoodieReadClient.addHoodieSupport(sparkConf);
+ sparkConf.set("parquet.avro.write-old-list-structure", "false");
+ sparkConf.set("spark.sql.parquet.writeLegacyFormat", "false");
+ sparkConf.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS");
+
+ sparkSession = SparkSession.builder().config(sparkConf).getOrCreate();
+ jsc = JavaSparkContext.fromSparkContext(sparkSession.sparkContext());
+ }
+
+ @AfterAll
+ public static void teardown() {
+ if (jsc != null) {
+ jsc.stop();
+ jsc = null;
+ }
+ if (sparkSession != null) {
+ sparkSession.stop();
+ sparkSession = null;
+ }
+ }
+ // the partition path delimiter must be "/", not "-" or any other character
+ private static Stream<Arguments> provideArgsForFilePartitionTesting() {
+ String partitionConfig =
+ "timestamp:MONTH:year=yyyy/month=MM"; // alternatives: "timestamp:YEAR:year=yyyy"
+ // or "timestamp:DAY:year=yyyy/month=MM/day=dd"
+ return Stream.of(
+ Arguments.of(
+ buildArgsForPartition(
+ PARQUET, Arrays.asList(ICEBERG, DELTA, HUDI), partitionConfig, partitionConfig)));
+ }
+
+ private static TableFormatPartitionDataHolder buildArgsForPartition(
+ String sourceFormat,
+ List<String> targetFormats,
+ String hudiPartitionConfig,
+ String xTablePartitionConfig) {
+ return TableFormatPartitionDataHolder.builder()
+ .sourceTableFormat(sourceFormat)
+ .targetTableFormats(targetFormats)
+ .hudiSourceConfig(Optional.ofNullable(hudiPartitionConfig))
+ .xTablePartitionConfig(xTablePartitionConfig)
+ .build();
+ }
+
+ private static ConversionConfig getTableSyncConfig(
+ String sourceTableFormat,
+ SyncMode syncMode,
+ String tableName,
+ GenericTable table,
+ List<String> targetTableFormats,
+ String partitionConfig,
+ Duration metadataRetention) {
+ Properties sourceProperties = new Properties();
+ if (partitionConfig != null) {
+ sourceProperties.put(PARTITION_FIELD_SPEC_CONFIG, partitionConfig);
+ }
+ SourceTable sourceTable =
+ SourceTable.builder()
+ .name(tableName)
+ .formatName(sourceTableFormat)
+ .basePath(table.getBasePath())
+ .dataPath(table.getDataPath())
+ .additionalProperties(sourceProperties)
+ .build();
+
+ List<TargetTable> targetTables =
+ targetTableFormats.stream()
+ .map(
+ formatName ->
+ TargetTable.builder()
+ .name(tableName)
+ .formatName(formatName)
+ // set the metadata path to the data path as the default (required by Hudi)
+ .basePath(table.getDataPath())
+ .metadataRetention(metadataRetention)
+ .build())
+ .collect(Collectors.toList());
+
+ return ConversionConfig.builder()
+ .sourceTable(sourceTable)
+ .targetTables(targetTables)
+ .syncMode(syncMode)
+ .build();
+ }
+
+ private static Stream<Arguments> provideArgsForFileNonPartitionTesting() {
+ String partitionConfig = null;
+ return Stream.of(
+ Arguments.of(
+ buildArgsForPartition(
+ PARQUET, Arrays.asList(ICEBERG, DELTA, HUDI), partitionConfig, partitionConfig)));
+ }
+
+ private ConversionSourceProvider<?> getConversionSourceProvider(String sourceTableFormat) {
+ if (sourceTableFormat.equalsIgnoreCase(PARQUET)) {
+ ConversionSourceProvider<Long> parquetConversionSourceProvider =
+ new ParquetConversionSourceProvider();
+ parquetConversionSourceProvider.init(jsc.hadoopConfiguration());
+ return parquetConversionSourceProvider;
+ } else {
+ throw new IllegalArgumentException("Unsupported source format: " + sourceTableFormat);
+ }
+ }
+
+ @ParameterizedTest
+ @MethodSource("provideArgsForFileNonPartitionTesting")
+ public void testFileNonPartitionedData(
+ TableFormatPartitionDataHolder tableFormatPartitionDataHolder) throws URISyntaxException {
+ String tableName = getTableName();
+ String sourceTableFormat = tableFormatPartitionDataHolder.getSourceTableFormat();
+ List<String> targetTableFormats = tableFormatPartitionDataHolder.getTargetTableFormats();
+ String xTablePartitionConfig = tableFormatPartitionDataHolder.getXTablePartitionConfig();
+ ConversionSourceProvider<?> conversionSourceProvider =
+ getConversionSourceProvider(sourceTableFormat);
+
+ List<Row> data =
+ Arrays.asList(
+ RowFactory.create(1, "Alice", true, 30.1, new
Timestamp(System.currentTimeMillis())),
+ RowFactory.create(
+ 2, "Bob", false, 24.6, new
Timestamp(System.currentTimeMillis() + 1000)),
+ RowFactory.create(
+ 3, "Charlie", true, 35.2, new
Timestamp(System.currentTimeMillis() + 2000)),
+ RowFactory.create(
+ 4, "David", false, 29.5, new
Timestamp(System.currentTimeMillis() + 3000)),
+ RowFactory.create(
+ 5, "Eve", true, 22.2, new Timestamp(System.currentTimeMillis()
+ 4000)));
+
+ schema =
+ DataTypes.createStructType(
+ new StructField[] {
+ DataTypes.createStructField("id", DataTypes.IntegerType, false),
+ DataTypes.createStructField("name", DataTypes.StringType, false),
+ DataTypes.createStructField("hasSiblings",
DataTypes.BooleanType, false),
+ DataTypes.createStructField("age", DataTypes.DoubleType, false),
+ DataTypes.createStructField(
+ "timestamp",
+ DataTypes.TimestampType,
+ false,
+ new MetadataBuilder().putString("precision",
"millis").build())
+ });
+ Dataset<Row> df = sparkSession.createDataFrame(data, schema);
+ String dataPath = tempDir.toAbsolutePath().toString() + "/non_partitioned_data";
+ df.write().mode(SaveMode.Overwrite).parquet(dataPath);
+ GenericTable table;
+ table =
+ GenericTable.getInstance(
+ tableName, Paths.get(dataPath), sparkSession, jsc, sourceTableFormat, false);
+ try (GenericTable tableToClose = table) {
+ ConversionConfig conversionConfig =
+ getTableSyncConfig(
+ sourceTableFormat,
+ SyncMode.FULL,
+ tableName,
+ table,
+ targetTableFormats,
+ xTablePartitionConfig,
+ null);
+ ConversionController conversionController =
+ new ConversionController(jsc.hadoopConfiguration());
+ conversionController.sync(conversionConfig, conversionSourceProvider);
+ checkDatasetEquivalenceWithFilter(sourceTableFormat, tableToClose, targetTableFormats, false);
+ }
+ }
+
+ @ParameterizedTest
+ @MethodSource("provideArgsForFilePartitionTesting")
+ public void testFilePartitionedData(TableFormatPartitionDataHolder tableFormatPartitionDataHolder)
+ throws URISyntaxException {
+ String tableName = getTableName();
+ String sourceTableFormat = tableFormatPartitionDataHolder.getSourceTableFormat();
+ List<String> targetTableFormats = tableFormatPartitionDataHolder.getTargetTableFormats();
+ String xTablePartitionConfig = tableFormatPartitionDataHolder.getXTablePartitionConfig();
+ ConversionSourceProvider<?> conversionSourceProvider =
+ getConversionSourceProvider(sourceTableFormat);
+ // create the data
+ List<Row> data =
+ Arrays.asList(
+ RowFactory.create(1, "Alice", true, 30.1, new
Timestamp(System.currentTimeMillis())),
+ RowFactory.create(
+ 2, "Bob", false, 24.6, new
Timestamp(System.currentTimeMillis() + 1000)),
+ RowFactory.create(
+ 3, "Charlie", true, 35.2, new
Timestamp(System.currentTimeMillis() + 2000)),
+ RowFactory.create(
+ 4, "David", false, 29.5, new
Timestamp(System.currentTimeMillis() + 3000)),
+ RowFactory.create(
+ 5, "Eve", true, 22.2, new Timestamp(System.currentTimeMillis()
+ 4000)));
+
+ schema =
+ DataTypes.createStructType(
+ new StructField[] {
+ DataTypes.createStructField("id", DataTypes.IntegerType, false),
+ DataTypes.createStructField("name", DataTypes.StringType, false),
+ DataTypes.createStructField("hasSiblings",
DataTypes.BooleanType, false),
+ DataTypes.createStructField("age", DataTypes.DoubleType, false),
+ DataTypes.createStructField(
+ "timestamp",
+ DataTypes.TimestampType,
+ false,
+ new MetadataBuilder().putString("precision",
"millis").build())
+ });
+ Dataset<Row> df = sparkSession.createDataFrame(data, schema);
+ String dataPathPart = tempDir.toAbsolutePath() + "/partitioned_data";
+ df.withColumn("year",
functions.year(functions.col("timestamp").cast(DataTypes.TimestampType)))
+ .withColumn(
+ "month",
+
functions.date_format(functions.col("timestamp").cast(DataTypes.TimestampType),
"MM"))
+ .write()
+ .mode(SaveMode.Overwrite)
+ .partitionBy("year", "month")
+ .parquet(dataPathPart);
+ GenericTable table;
+ table =
+ GenericTable.getInstance(
+ tableName, Paths.get(dataPathPart), sparkSession, jsc, sourceTableFormat, true);
+ try (GenericTable tableToClose = table) {
+ ConversionConfig conversionConfig =
+ getTableSyncConfig(
+ sourceTableFormat,
+ SyncMode.FULL,
+ tableName,
+ table,
+ targetTableFormats,
+ xTablePartitionConfig,
+ null);
+ ConversionController conversionController =
+ new ConversionController(jsc.hadoopConfiguration());
+ conversionController.sync(conversionConfig, conversionSourceProvider);
+ checkDatasetEquivalenceWithFilter(sourceTableFormat, tableToClose, targetTableFormats, true);
+ // append another row to the parquet data, then sync again
+ List<Row> dataToAppend =
+ Arrays.asList(
+ RowFactory.create(
+ 10,
+ "BobAppended",
+ false,
+ 70.3,
+ new Timestamp(System.currentTimeMillis() + 1500)));
+
+ Dataset<Row> dfAppend = sparkSession.createDataFrame(dataToAppend, schema);
+ dfAppend
+ .withColumn(
+ "year",
functions.year(functions.col("timestamp").cast(DataTypes.TimestampType)))
+ .withColumn(
+ "month",
+
functions.date_format(functions.col("timestamp").cast(DataTypes.TimestampType),
"MM"))
+ .write()
+ .mode(SaveMode.Append)
+ .partitionBy("year", "month")
+ .parquet(dataPathPart);
+ GenericTable tableAppend;
+ tableAppend =
+ GenericTable.getInstance(
+ tableName, Paths.get(dataPathPart), sparkSession, jsc, sourceTableFormat, true);
+ try (GenericTable tableToCloseAppended = tableAppend) {
+ ConversionConfig conversionConfigAppended =
+ getTableSyncConfig(
+ sourceTableFormat,
+ SyncMode.FULL,
+ tableName,
+ tableAppend,
+ targetTableFormats,
+ xTablePartitionConfig,
+ null);
+ ConversionController conversionControllerAppended =
+ new ConversionController(jsc.hadoopConfiguration());
+ conversionControllerAppended.sync(conversionConfigAppended, conversionSourceProvider);
+ }
+ }
+ }
+
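+ // delegates to checkDatasetEquivalence with empty reader options and no expected row count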
+ private void checkDatasetEquivalenceWithFilter(
+ String sourceFormat,
+ GenericTable<?, ?> sourceTable,
+ List<String> targetFormats,
+ boolean isPartitioned)
+ throws URISyntaxException {
+ checkDatasetEquivalence(
+ sourceFormat,
+ sourceTable,
+ Collections.emptyMap(),
+ targetFormats,
+ Collections.emptyMap(),
+ null,
+ isPartitioned);
+ }
+
+ private void checkDatasetEquivalence(
+ String sourceFormat,
+ GenericTable<?, ?> sourceTable,
+ Map<String, String> sourceOptions,
+ List<String> targetFormats,
+ Map<String, Map<String, String>> targetOptions,
+ Integer expectedCount,
+ boolean isPartitioned)
+ throws URISyntaxException {
+ Dataset<Row> sourceRows =
+ sparkSession
+ .read()
+ .schema(schema)
+ .options(sourceOptions)
+ .option("recursiveFileLookup", "true")
+ .option("pathGlobFilter", "*.parquet")
+ .parquet(sourceTable.getDataPath());
+ Map<String, Dataset<Row>> targetRowsByFormat =
+ targetFormats.stream()
+ .collect(
+ Collectors.toMap(
+ Function.identity(),
+ targetFormat -> {
+ Map<String, String> finalTargetOptions =
+ targetOptions.getOrDefault(targetFormat, Collections.emptyMap());
+ if (targetFormat.equals(HUDI)) {
+ finalTargetOptions = new HashMap<>(finalTargetOptions);
+ finalTargetOptions.put(HoodieMetadataConfig.ENABLE.key(), "true");
+ finalTargetOptions.put(
+ "hoodie.datasource.read.extract.partition.values.from.path", "true");
+ }
+ return sparkSession
+ .read()
+ .options(finalTargetOptions)
+ .format(targetFormat.toLowerCase())
+ .load(sourceTable.getDataPath());
+ }));
+
+ String[] selectColumnsArr = schema.fieldNames();
+ List<String> dataset1Rows = sourceRows.selectExpr(selectColumnsArr).toJSON().collectAsList();
+
+ Set<Map.Entry<String, Dataset<Row>>> entrySet = targetRowsByFormat.entrySet();
+
+ for (Map.Entry<String, Dataset<Row>> entry : entrySet) {
+
+ String format = entry.getKey();
+
+ Dataset<Row> targetRows = entry.getValue();
+ targetRows.show();
+
+ List<String> dataset2Rows = targetRows.selectExpr(selectColumnsArr).toJSON().collectAsList();
+
+ assertEquals(
+ dataset1Rows.size(),
+ dataset2Rows.size(),
+ String.format(
+ "Datasets have different row counts when reading from Spark.
Source: %s, Target: %s",
+ sourceFormat, format));
+
+ if (expectedCount != null) {
+ assertEquals(expectedCount, dataset1Rows.size());
+ } else {
+ assertFalse(dataset1Rows.isEmpty());
+ }
+ if (isPartitioned) { // skip the Hudi comparison because the partition column values will not match
+ if (!format.equals("HUDI")) {
+ assertEquals(
+ dataset1Rows,
+ dataset2Rows,
+ String.format(
+ "Datasets are not equivalent when reading from Spark.
Source: %s, Target: %s",
+ sourceFormat, format));
+ }
+ } else {
+ assertEquals(
+ dataset1Rows,
+ dataset2Rows,
+ String.format(
+ "Datasets are not equivalent when reading from Spark. Source:
%s, Target: %s",
+ sourceFormat, format));
+ }
+ }
+ }
+
+ @Builder
+ @Value
+ private static class TableFormatPartitionDataHolder {
+ String sourceTableFormat;
+ List<String> targetTableFormats;
+ String xTablePartitionConfig;
+ Optional<String> hudiSourceConfig;
+ String filter;
+ }
+}
diff --git a/xtable-core/src/test/java/org/apache/xtable/parquet/TestParquetSchemaExtractor.java b/xtable-core/src/test/java/org/apache/xtable/parquet/TestParquetSchemaExtractor.java
index 13d2299f..a78e07bf 100644
--- a/xtable-core/src/test/java/org/apache/xtable/parquet/TestParquetSchemaExtractor.java
+++ b/xtable-core/src/test/java/org/apache/xtable/parquet/TestParquetSchemaExtractor.java
@@ -277,7 +277,7 @@ public class TestParquetSchemaExtractor {
InternalSchema.builder()
.name("my_record")
.dataType(InternalType.RECORD)
- .isNullable(true)
+ .isNullable(false)
.fields(
Arrays.asList(
InternalField.builder()
diff --git a/xtable-core/src/test/java/org/apache/xtable/parquet/TestParquetStatsExtractor.java b/xtable-core/src/test/java/org/apache/xtable/parquet/TestParquetStatsExtractor.java
index 7522c292..df0aba33 100644
--- a/xtable-core/src/test/java/org/apache/xtable/parquet/TestParquetStatsExtractor.java
+++ b/xtable-core/src/test/java/org/apache/xtable/parquet/TestParquetStatsExtractor.java
@@ -58,6 +58,7 @@ public class TestParquetStatsExtractor {
private static final ParquetSchemaExtractor schemaExtractor =
ParquetSchemaExtractor.getInstance();
+ private static final ParquetStatsExtractor statsExtractor = ParquetStatsExtractor.getInstance();
@TempDir static java.nio.file.Path tempDir = Paths.get("./");
@@ -349,7 +350,7 @@ public class TestParquetStatsExtractor {
Path hadoopPath = new Path(file.toURI());
// statsExtractor toInternalDataFile testing
InternalDataFile internalDataFile =
- ParquetStatsExtractor.toInternalDataFile(configuration, hadoopPath);
+ statsExtractor.toInternalDataFile(configuration, hadoopPath);
InternalDataFile testInternalFile =
InternalDataFile.builder()
.physicalPath(
@@ -378,7 +379,7 @@ public class TestParquetStatsExtractor {
Path hadoopPath = new Path(file.toURI());
// statsExtractor toInternalDataFile testing
InternalDataFile internalDataFile =
- ParquetStatsExtractor.toInternalDataFile(configuration, hadoopPath);
+ statsExtractor.toInternalDataFile(configuration, hadoopPath);
InternalDataFile testInternalFile =
InternalDataFile.builder()
.physicalPath(
@@ -406,7 +407,7 @@ public class TestParquetStatsExtractor {
Path hadoopPath = new Path(file.toURI());
// statsExtractor toInternalDataFile testing
InternalDataFile internalDataFile =
- ParquetStatsExtractor.toInternalDataFile(configuration, hadoopPath);
+ statsExtractor.toInternalDataFile(configuration, hadoopPath);
InternalDataFile testInternalFile =
InternalDataFile.builder()
.physicalPath(
@@ -435,7 +436,7 @@ public class TestParquetStatsExtractor {
Path hadoopPath = new Path(file.toURI());
// statsExtractor toInternalDataFile testing
InternalDataFile internalDataFile =
- ParquetStatsExtractor.toInternalDataFile(configuration, hadoopPath);
+ statsExtractor.toInternalDataFile(configuration, hadoopPath);
InternalDataFile testInternalFile =
InternalDataFile.builder()
.physicalPath(
diff --git a/xtable-core/src/test/java/org/apache/xtable/parquet/TestSparkParquetTable.java b/xtable-core/src/test/java/org/apache/xtable/parquet/TestSparkParquetTable.java
new file mode 100644
index 00000000..67db30c4
--- /dev/null
+++ b/xtable-core/src/test/java/org/apache/xtable/parquet/TestSparkParquetTable.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.parquet;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import org.apache.xtable.GenericTable;
+
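+/**
+ * Minimal {@link GenericTable} implementation backed by a Parquet dataset written by Spark in
+ * tests. The Parquet source is read-only, so the mutation methods are intentionally no-ops.
+ */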
+public class TestSparkParquetTable implements GenericTable<Group, String> {
+ // Name of the table
+ protected String tableName;
+ // Base path for the table
+ protected String basePath;
+ protected String dataPath;
+ protected Path tempDir;
+ protected String partitionConfig;
+ protected MessageType schema;
+ protected List<String> partitionFieldNames;
+ private JavaSparkContext jsc;
+
+ private TestSparkParquetTable(
+ String name, Path tempDir, JavaSparkContext jsc, String partitionConfig) {
+ // initialize spark session
+ try {
+ this.tableName = name;
+ this.tempDir = tempDir;
+ this.partitionConfig = partitionConfig;
+ this.jsc = jsc;
+ this.basePath = initBasePath(tempDir, name);
+ this.dataPath = initBasePath(tempDir, name);
+ } catch (IOException ex) {
+ throw new UncheckedIOException("Unable to initialize Test Parquet File",
ex);
+ }
+ }
+
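+ /** Creates a test table instance; the "level:SIMPLE" partition config is used when isPartitioned is true. */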
+ public static TestSparkParquetTable forStandardSchemaAndPartitioning(
+ String tableName, Path tempDir, JavaSparkContext jsc, boolean isPartitioned) {
+ String partitionConfig = isPartitioned ? "level:SIMPLE" : null;
+ return new TestSparkParquetTable(tableName, tempDir, jsc, partitionConfig);
+ }
+
+ protected String initBasePath(Path tempDir, String tableName) throws IOException {
+ Path basePath = tempDir.resolve(tableName).getParent();
+ Files.createDirectories(basePath);
+ return basePath.toUri().toString();
+ }
+
+ public List<Group> insertRows(int numRows) {
+ return null;
+ }
+
+ public List<Group> insertRecordsForSpecialPartition(int numRows) {
+ return null;
+ }
+
+ public void upsertRows(List<Group> rows) {}
+
+ public void deleteRows(List<Group> rows) {}
+
+ public void deletePartition(String partitionValue) {}
+
+ public void deleteSpecialPartition() {}
+
+ public String getBasePath() {
+ return basePath;
+ }
+
+ public String getDataPath() {
+ return dataPath;
+ }
+
+ public String getMetadataPath() {
+ return null;
+ }
+
+ public String getOrderByColumn() {
+ return null;
+ }
+
+ public void close() {}
+
+ public void reload() {}
+ // TODO either create BASIC_SCHEMA = new Schema.Parser().parse(schema); or move this code out
+ public List<String> getColumnsToSelect() {
+ return schema.getFields().stream().map(Type::getName).collect(Collectors.toList());
+ }
+
+ public String getFilterQuery() {
+ return null;
+ }
+}