This is an automated email from the ASF dual-hosted git repository.

timbrown pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-xtable.git


The following commit(s) were added to refs/heads/main by this push:
     new 95e75f9d [553] parquet source with partition extraction (#728)
95e75f9d is described below

commit 95e75f9d580e81f4611a88635735feee9023f660
Author: Selim Soufargi <[email protected]>
AuthorDate: Mon Sep 15 03:02:04 2025 +0200

    [553] parquet source with partition extraction (#728)
    
    * First attempt at parquet source with partition extraction
    
    * refactored conversion source for reading parquetFiles once to 
getCurrentSnapshot()
    
    * file changes + refactoring of conversion source
    
    * cleanup
    
    * implement get commit identifier
    
    * Hudi Source Config uses PartitionFieldSpec.java
    
    * Hudi Source Config uses PartitionFieldSpec.java bug fix
    
    * refactoring for PartitionValueExtractor to use Hudi based related code
    
    * aligning ConversionSource tests to include Parquet
    
    * implemented remaining parquetSource methods + refactoring, TODO tests
    
    * default case as an exception for finding parquet files + javadocs for 
PartitionFieldSpec + no file diffs removed files
    
    * refactoring for specExtractor
    
    * table changes update does not check if table exists already
    
    * added test for conversion source
    
    * adjusted test for Hudi after update of HudiSourceConfig
    
    * fixed test bug for parquet and added partitionValues for statsExtractor
    
    * fixed test bug for stats extractor by defining instance obj instead of 
using static function
    
    * now using a string for the partition format input
    
    * conversion source const for the test using specExtractor
    
    * adjusted conversionSourceProvider for partition init
    
    * adjusted table creation with Spark for Parquet
    
    * fix tests for CI to pass
    
    * fix tests for CI to pass (reverted)
    
    * cleanups
    
    * parquet files collected in a stream now for memory issue
    
    * fixing CI error on partition config in tests
    
    * seperate test class for parquet source
    
    * adjusting pom config for Actions
    
    * tests hitting the source, needs to create the parquet files locally in 
the test through Spark
    
    * creating parquet files before Testing
    
    * test bug fix
    
    * cleanups
    
    * TODO implement class for test
    
    * TODO check why sync fails to all 3 formats
    
    * sourceField bug not null OK, sync failing on stat generation from 
InternalType
    
    * highlighting sync errors related to stats and partition fields
    
    * partition config read from the test class + cleanups
    
    * augmenting InternalSchema with the partitionField to get SourceField 
non-null
    
    * HudiSourceConfig const param type fix
    
    * duplicate col for partitions
    
    * delta stats serialization from parquet data bug fixed
    
    * removed old code and mvn spotless apply
    
    * fixed partitioning by year in tests
    
    * timestamp col is of type timestamp not string (todo schemaExtractor 
logicalTypes for INT96)
    
    * Delta sync OK, stat conversion issue for Iceberg, metadata issue for Hudi
    
    * Stat conversion patch for string (ICEBERG)
    
    * Parsing Date Values from Partition Path, ICEBERG & DELTA Sync OK
    
    * fix for Binary Stats conversion in Hudi + upgraded Parquet Version
    
    * set recordKey Fields for Hudi Metadata + spotless:apply
    
    * write data in a data path for proper reading of sourceTable after sync 
(Tests)
    
    * set parquet schema for reading the dataset (Tests)
    
    * bug fix of dataPath (Tests)
    
    * different metadata path than the parquet file's to prevent reading 
confusion related to paths (Tests)
    
    * read parquet sparksession for tests
    
    * read parquet sparksession for tests
    
    * update
    
    * reading part col from path disabled
    
    * spotless apply and CI pass error fixed
    
    * CI pass error fixed
    
    * CI pass error fixed
    
    * CI pass error fixed
    
    * CI pass error fixed
    
    * CI pass error fixed
    
    * refactored parquetSource + cleanups
    
    * fix schemaExtractor bug
    
    * fix Parquet related partition values extraction from path
    
    * fix for Iceberg sync tests for CI to pass
    
    * binary stats are converted into string stats in parquet stats extractor
    
    * binary stats converted per logical type case: STRING....
    
    * testing dataset equivalence differently for Hudi (partition column 
removed)
    
    * set source identifier for current snapshot as the last modif time
    
    * set spark config for CI
    
    * reformatting + cleanups
    
    * iceberg CI error fix
    
    * delta CI error fix
    
    * delta CI error fix
    
    * refactoring for the statsExtractor
    
    * refactoring for the statsExtractor
    
    * refactoring for the statsExtractor
    
    * many partitions as input for parquet
    
    * many partitions as input for parquet
    
    * revert change in hudi spec extractor
    
    * fix for isNullable records
    
    * fix for isNullable records
    
    * cleanups
    
    * cleanups
    
    * revert hudi version to 0.14.0
    
    * spotless:apply
    
    * added test for non partitioned data + adjusted for cases of Month, Day 
partitions
    
    * resolving the merge conflict1
    
    * resolving the merge conflict1
    
    * resolving the merge conflict2
    
    * fix for Iceberg for (same key) partition values merge conflict
    
    * fix for reading the datasets in the test class + disable equivalence test 
for Hudi do to difference in parttition values
    
    * spotless:apply
    
    * added boolean type to the testing dataset
    
    * reformatting
    
    * cleanups
    
    * refactoring + reformatting
    
    * more data types in the testing dataset + refactoring
    
    * refactor test to seperate partitioned and non-partitioned cases
    
    * refactor test to seperate partitioned and non-partitioned cases
    
    * refactor test to seperate partitioned and non-partitioned cases
    
    * refactor test to seperate partitioned and non-partitioned cases
    
    * refactor test to seperate partitioned and non-partitioned cases
    
    * refactor test to seperate partitioned and non-partitioned cases
    
    * refactor test to seperate partitioned and non-partitioned cases
    
    * reformatting
    
    * re-read partitioned data -> create new data with additional rows -> 
partition again ->sync again
    
    * fix writing second dataset bug
    
    * fix writing second dataset bug
    
    * fix datasets equivalence bug
    
    * reformatting
    
    * reformatting
    
    * add more data than append to partitioned file then sync again test added
    
    * add more data than append to partitioned file then sync again test added
    
    * reformatting
    
    * cleanups
    
    * add test for Hudi format for non-partitioned data
    
    * bug fix iceberg partition values
    
    * bug fix iceberg partition values: revert Iceberg code
    
    * parse partition values fix
    
    * fix values parsing by lowering case the partitioning config
    
    * fix values parsing by lowering case the partitioning config 2
    
    * isIncrementalSafe returns false + cleanups + formatting
    
    * address remaining feedback, remove unnecessary changes, cleanup
    
    ---------
    
    Co-authored-by: Selim Soufargi <[email protected]~>
    Co-authored-by: Timothy Brown <[email protected]>
---
 pom.xml                                            |  36 +-
 .../PartitionFieldSpec.java}                       |  22 +-
 .../apache/xtable/model/storage/TableFormat.java   |   1 +
 .../xtable/hudi/BaseFileUpdatesExtractor.java      |   2 +-
 .../ConfigurationBasedPartitionSpecExtractor.java  |  26 +-
 .../apache/xtable/hudi/HudiConversionSource.java   |   4 +-
 .../xtable/hudi/HudiConversionSourceProvider.java  |   2 +-
 .../apache/xtable/hudi/HudiDataFileExtractor.java  |   4 +-
 .../org/apache/xtable/hudi/HudiSourceConfig.java   |  15 +-
 ...r.java => PathBasedPartitionSpecExtractor.java} |   6 +-
 ...java => PathBasedPartitionValuesExtractor.java} |  18 +-
 .../xtable/parquet/ParquetConversionSource.java    | 242 +++++++++++
 .../parquet/ParquetConversionSourceProvider.java   |  43 ++
 .../ParquetPartitionSpecExtractor.java}            |  51 ++-
 .../parquet/ParquetPartitionValueExtractor.java    | 138 ++++++
 .../xtable/parquet/ParquetSchemaExtractor.java     |  21 +-
 .../ParquetSourceConfig.java}                      |  43 +-
 .../xtable/parquet/ParquetStatsConverterUtil.java  |  69 +++
 .../xtable/parquet/ParquetStatsExtractor.java      |  52 +--
 .../apache/xtable/reflection/ReflectionUtils.java  |  29 +-
 .../apache/xtable/schema/SchemaFieldFinder.java    |   3 +-
 .../test/java/org/apache/xtable/GenericTable.java  |   6 +
 .../org/apache/xtable/ITConversionController.java  |   7 +-
 .../apache/xtable/hudi/ITHudiConversionSource.java |   4 +-
 .../hudi/TestHudiPartitionValuesExtractor.java     |  24 +-
 .../xtable/parquet/ITParquetConversionSource.java  | 462 +++++++++++++++++++++
 .../xtable/parquet/TestParquetSchemaExtractor.java |   2 +-
 .../xtable/parquet/TestParquetStatsExtractor.java  |   9 +-
 .../xtable/parquet/TestSparkParquetTable.java      | 117 ++++++
 29 files changed, 1282 insertions(+), 176 deletions(-)

diff --git a/pom.xml b/pom.xml
index bed4d63b..ff80b954 100644
--- a/pom.xml
+++ b/pom.xml
@@ -61,6 +61,7 @@
         <!-- To ensure build content is re-producible -->
         
<project.build.outputTimestamp>2025-01-01T00:00:00Z</project.build.outputTimestamp>
         <maven.compiler.target>8</maven.compiler.target>
+        <maven-compiler-plugin.version>3.13.0</maven-compiler-plugin.version>
         <avro.version>1.11.4</avro.version>
         <log4j.version>2.22.0</log4j.version>
         <junit.version>5.11.4</junit.version>
@@ -77,7 +78,7 @@
         <maven-deploy-plugin.version>3.1.1</maven-deploy-plugin.version>
         <maven-release-plugin.version>2.5.3</maven-release-plugin.version>
         <mockito.version>5.15.2</mockito.version>
-        <parquet.version>1.15.1</parquet.version>
+        <parquet.version>1.15.2</parquet.version>
         <protobuf.version>3.25.5</protobuf.version>
         <scala12.version>2.12.20</scala12.version>
         <scala13.version>2.13.15</scala13.version>
@@ -90,7 +91,7 @@
         <jackson.version>2.18.2</jackson.version>
         <spotless.version>2.43.0</spotless.version>
         <apache.rat.version>0.16.1</apache.rat.version>
-        <google.java.format.version>1.8</google.java.format.version>
+        <google.java.format.version>1.10.0</google.java.format.version>
         <delta.standalone.version>3.3.0</delta.standalone.version>
         <delta.hive.version>3.0.0</delta.hive.version>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
@@ -657,23 +658,19 @@
                 </executions>
             </plugin>
             <plugin>
-                <groupId>org.projectlombok</groupId>
-                <artifactId>lombok-maven-plugin</artifactId>
-                <version>${lombok-maven-plugin.version}</version>
-                <configuration>
-                    
<sourceDirectory>${project.basedir}/src/main/java</sourceDirectory>
-                    <addOutputDirectory>false</addOutputDirectory>
-                    <outputDirectory>${delombok.output.dir}</outputDirectory>
-                    <encoding>UTF-8</encoding>
-                </configuration>
-                <executions>
-                    <execution>
-                        <phase>generate-sources</phase>
-                        <goals>
-                            <goal>delombok</goal>
-                        </goals>
-                    </execution>
-                </executions>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>${maven-compiler-plugin.version}</version> 
<configuration>
+                <source>${maven.compiler.source}</source>
+                <target>${maven.compiler.target}</target>
+                <annotationProcessorPaths>
+                    <path>
+                        <groupId>org.projectlombok</groupId>
+                        <artifactId>lombok</artifactId>
+                        <version>${lombok.version}</version>
+                    </path>
+                </annotationProcessorPaths>
+            </configuration>
             </plugin>
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
@@ -704,6 +701,7 @@
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-surefire-plugin</artifactId>
+                <version>2.22.2</version>
                 <executions>
                     <execution>
                         <goals>
diff --git 
a/xtable-api/src/main/java/org/apache/xtable/model/storage/TableFormat.java 
b/xtable-api/src/main/java/org/apache/xtable/model/schema/PartitionFieldSpec.java
similarity index 69%
copy from 
xtable-api/src/main/java/org/apache/xtable/model/storage/TableFormat.java
copy to 
xtable-api/src/main/java/org/apache/xtable/model/schema/PartitionFieldSpec.java
index bea0b477..4427fe14 100644
--- a/xtable-api/src/main/java/org/apache/xtable/model/storage/TableFormat.java
+++ 
b/xtable-api/src/main/java/org/apache/xtable/model/schema/PartitionFieldSpec.java
@@ -16,19 +16,19 @@
  * limitations under the License.
  */
  
-package org.apache.xtable.model.storage;
+package org.apache.xtable.model.schema;
+
+import lombok.Value;
 
 /**
- * Default constants for supported Table Formats
+ * PartitionFieldSpec represents a schema for the partition format as 
specified through the user
+ * configuration for the conversion
  *
- * @since 0.1
+ * @since 0.3
  */
-public class TableFormat {
-  public static final String HUDI = "HUDI";
-  public static final String ICEBERG = "ICEBERG";
-  public static final String DELTA = "DELTA";
-
-  public static String[] values() {
-    return new String[] {"HUDI", "ICEBERG", "DELTA"};
-  }
+@Value
+public class PartitionFieldSpec {
+  String sourceFieldPath;
+  PartitionTransformType transformType;
+  String format;
 }
diff --git 
a/xtable-api/src/main/java/org/apache/xtable/model/storage/TableFormat.java 
b/xtable-api/src/main/java/org/apache/xtable/model/storage/TableFormat.java
index bea0b477..9d89de6a 100644
--- a/xtable-api/src/main/java/org/apache/xtable/model/storage/TableFormat.java
+++ b/xtable-api/src/main/java/org/apache/xtable/model/storage/TableFormat.java
@@ -27,6 +27,7 @@ public class TableFormat {
   public static final String HUDI = "HUDI";
   public static final String ICEBERG = "ICEBERG";
   public static final String DELTA = "DELTA";
+  public static final String PARQUET = "PARQUET";
 
   public static String[] values() {
     return new String[] {"HUDI", "ICEBERG", "DELTA"};
diff --git 
a/xtable-core/src/main/java/org/apache/xtable/hudi/BaseFileUpdatesExtractor.java
 
b/xtable-core/src/main/java/org/apache/xtable/hudi/BaseFileUpdatesExtractor.java
index ba1f9f9d..325fc8a5 100644
--- 
a/xtable-core/src/main/java/org/apache/xtable/hudi/BaseFileUpdatesExtractor.java
+++ 
b/xtable-core/src/main/java/org/apache/xtable/hudi/BaseFileUpdatesExtractor.java
@@ -252,7 +252,7 @@ public class BaseFileUpdatesExtractor {
                     columnStat.getNumValues(),
                     columnStat.getTotalSize(),
                     -1L))
-        .collect(Collectors.toMap(HoodieColumnRangeMetadata::getColumnName, 
Function.identity()));
+        .collect(Collectors.toMap(HoodieColumnRangeMetadata::getColumnName, 
metadata -> metadata));
   }
 
   /** Holds the information needed to create a "replace" commit in the Hudi 
table. */
diff --git 
a/xtable-core/src/main/java/org/apache/xtable/hudi/ConfigurationBasedPartitionSpecExtractor.java
 
b/xtable-core/src/main/java/org/apache/xtable/hudi/ConfigurationBasedPartitionSpecExtractor.java
index 7bc41a10..4eda2059 100644
--- 
a/xtable-core/src/main/java/org/apache/xtable/hudi/ConfigurationBasedPartitionSpecExtractor.java
+++ 
b/xtable-core/src/main/java/org/apache/xtable/hudi/ConfigurationBasedPartitionSpecExtractor.java
@@ -28,6 +28,7 @@ import lombok.AllArgsConstructor;
 import org.apache.xtable.model.schema.InternalField;
 import org.apache.xtable.model.schema.InternalPartitionField;
 import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.schema.PartitionFieldSpec;
 import org.apache.xtable.schema.SchemaFieldFinder;
 
 /**
@@ -35,14 +36,13 @@ import org.apache.xtable.schema.SchemaFieldFinder;
  * path:type:format for date types or path:type for value types.
  */
 @AllArgsConstructor
-public class ConfigurationBasedPartitionSpecExtractor implements 
HudiSourcePartitionSpecExtractor {
-  private final HudiSourceConfig config;
+public class ConfigurationBasedPartitionSpecExtractor implements 
PathBasedPartitionSpecExtractor {
+  private final List<PartitionFieldSpec> partitionFieldSpecs;
 
   @Override
   public List<InternalPartitionField> spec(InternalSchema tableSchema) {
-    List<InternalPartitionField> partitionFields =
-        new ArrayList<>(config.getPartitionFieldSpecs().size());
-    for (HudiSourceConfig.PartitionFieldSpec fieldSpec : 
config.getPartitionFieldSpecs()) {
+    List<InternalPartitionField> partitionFields = new 
ArrayList<>(partitionFieldSpecs.size());
+    for (PartitionFieldSpec fieldSpec : partitionFieldSpecs) {
       InternalField sourceField =
           SchemaFieldFinder.getInstance()
               .findFieldByPath(tableSchema, fieldSpec.getSourceFieldPath());
@@ -58,15 +58,13 @@ public class ConfigurationBasedPartitionSpecExtractor 
implements HudiSourceParti
   @Override
   public Map<String, String> getPathToPartitionFieldFormat() {
     Map<String, String> pathToPartitionFieldFormat = new HashMap<>();
-    config
-        .getPartitionFieldSpecs()
-        .forEach(
-            partitionFieldSpec -> {
-              if (partitionFieldSpec.getFormat() != null) {
-                pathToPartitionFieldFormat.put(
-                    partitionFieldSpec.getSourceFieldPath(), 
partitionFieldSpec.getFormat());
-              }
-            });
+    partitionFieldSpecs.forEach(
+        partitionFieldSpec -> {
+          if (partitionFieldSpec.getFormat() != null) {
+            pathToPartitionFieldFormat.put(
+                partitionFieldSpec.getSourceFieldPath(), 
partitionFieldSpec.getFormat());
+          }
+        });
     return pathToPartitionFieldFormat;
   }
 }
diff --git 
a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSource.java 
b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSource.java
index cb65c341..00faa97d 100644
--- a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSource.java
+++ b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSource.java
@@ -59,14 +59,14 @@ public class HudiConversionSource implements 
ConversionSource<HoodieInstant> {
 
   public HudiConversionSource(
       HoodieTableMetaClient metaClient,
-      HudiSourcePartitionSpecExtractor sourcePartitionSpecExtractor) {
+      PathBasedPartitionSpecExtractor sourcePartitionSpecExtractor) {
     this.metaClient = metaClient;
     this.tableExtractor =
         new HudiTableExtractor(new HudiSchemaExtractor(), 
sourcePartitionSpecExtractor);
     this.dataFileExtractor =
         new HudiDataFileExtractor(
             metaClient,
-            new HudiPartitionValuesExtractor(
+            new PathBasedPartitionValuesExtractor(
                 sourcePartitionSpecExtractor.getPathToPartitionFieldFormat()),
             new HudiFileStatsExtractor(metaClient));
   }
diff --git 
a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSourceProvider.java
 
b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSourceProvider.java
index 0ddbbcb7..aad7e0a1 100644
--- 
a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSourceProvider.java
+++ 
b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSourceProvider.java
@@ -43,7 +43,7 @@ public class HudiConversionSourceProvider extends 
ConversionSourceProvider<Hoodi
       log.warn("Source table is Merge On Read. Only base files will be 
synced");
     }
 
-    final HudiSourcePartitionSpecExtractor sourcePartitionSpecExtractor =
+    final PathBasedPartitionSpecExtractor sourcePartitionSpecExtractor =
         HudiSourceConfig.fromProperties(sourceTable.getAdditionalProperties())
             .loadSourcePartitionSpecExtractor();
 
diff --git 
a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiDataFileExtractor.java 
b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiDataFileExtractor.java
index 77c0ca98..5e17b389 100644
--- 
a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiDataFileExtractor.java
+++ 
b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiDataFileExtractor.java
@@ -75,7 +75,7 @@ public class HudiDataFileExtractor implements AutoCloseable {
   private final HoodieTableMetadata tableMetadata;
   private final HoodieTableMetaClient metaClient;
   private final HoodieEngineContext engineContext;
-  private final HudiPartitionValuesExtractor partitionValuesExtractor;
+  private final PathBasedPartitionValuesExtractor partitionValuesExtractor;
   private final HudiFileStatsExtractor fileStatsExtractor;
   private final HoodieMetadataConfig metadataConfig;
   private final FileSystemViewManager fileSystemViewManager;
@@ -83,7 +83,7 @@ public class HudiDataFileExtractor implements AutoCloseable {
 
   public HudiDataFileExtractor(
       HoodieTableMetaClient metaClient,
-      HudiPartitionValuesExtractor hudiPartitionValuesExtractor,
+      PathBasedPartitionValuesExtractor hudiPartitionValuesExtractor,
       HudiFileStatsExtractor hudiFileStatsExtractor) {
     this.engineContext = new 
HoodieLocalEngineContext(metaClient.getHadoopConf());
     metadataConfig =
diff --git 
a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiSourceConfig.java 
b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiSourceConfig.java
index 606b9281..7732c382 100644
--- a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiSourceConfig.java
+++ b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiSourceConfig.java
@@ -28,6 +28,7 @@ import lombok.Value;
 
 import com.google.common.base.Preconditions;
 
+import org.apache.xtable.model.schema.PartitionFieldSpec;
 import org.apache.xtable.model.schema.PartitionTransformType;
 import org.apache.xtable.reflection.ReflectionUtils;
 
@@ -59,14 +60,7 @@ public class HudiSourceConfig {
     return new HudiSourceConfig(partitionSpecExtractorClass, 
partitionFieldSpecs);
   }
 
-  @Value
-  static class PartitionFieldSpec {
-    String sourceFieldPath;
-    PartitionTransformType transformType;
-    String format;
-  }
-
-  private static List<PartitionFieldSpec> parsePartitionFieldSpecs(String 
input) {
+  public static List<PartitionFieldSpec> parsePartitionFieldSpecs(String 
input) {
     if (input == null || input.isEmpty()) {
       return Collections.emptyList();
     }
@@ -84,9 +78,10 @@ public class HudiSourceConfig {
     return partitionFields;
   }
 
-  public HudiSourcePartitionSpecExtractor loadSourcePartitionSpecExtractor() {
+  public PathBasedPartitionSpecExtractor loadSourcePartitionSpecExtractor() {
     Preconditions.checkNotNull(
         partitionSpecExtractorClass, "HudiSourcePartitionSpecExtractor class 
not provided");
-    return ReflectionUtils.createInstanceOfClass(partitionSpecExtractorClass, 
this);
+    return ReflectionUtils.createInstanceOfClass(
+        partitionSpecExtractorClass, this.getPartitionFieldSpecs());
   }
 }
diff --git 
a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiSourcePartitionSpecExtractor.java
 
b/xtable-core/src/main/java/org/apache/xtable/hudi/PathBasedPartitionSpecExtractor.java
similarity index 88%
rename from 
xtable-core/src/main/java/org/apache/xtable/hudi/HudiSourcePartitionSpecExtractor.java
rename to 
xtable-core/src/main/java/org/apache/xtable/hudi/PathBasedPartitionSpecExtractor.java
index c69a8b8d..982bb781 100644
--- 
a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiSourcePartitionSpecExtractor.java
+++ 
b/xtable-core/src/main/java/org/apache/xtable/hudi/PathBasedPartitionSpecExtractor.java
@@ -23,9 +23,9 @@ import java.util.Map;
 import org.apache.xtable.spi.extractor.SourcePartitionSpecExtractor;
 
 /**
- * Partition spec extractor interface specifically designed for Hudi to parse 
partition values
- * appropriately.
+ * Partition spec extractor interface specifically designed for Hudi/Parquet 
to parse partition
+ * values appropriately.
  */
-public interface HudiSourcePartitionSpecExtractor extends 
SourcePartitionSpecExtractor {
+public interface PathBasedPartitionSpecExtractor extends 
SourcePartitionSpecExtractor {
   Map<String, String> getPathToPartitionFieldFormat();
 }
diff --git 
a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiPartitionValuesExtractor.java
 
b/xtable-core/src/main/java/org/apache/xtable/hudi/PathBasedPartitionValuesExtractor.java
similarity index 93%
rename from 
xtable-core/src/main/java/org/apache/xtable/hudi/HudiPartitionValuesExtractor.java
rename to 
xtable-core/src/main/java/org/apache/xtable/hudi/PathBasedPartitionValuesExtractor.java
index bf9f1264..09c7c824 100644
--- 
a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiPartitionValuesExtractor.java
+++ 
b/xtable-core/src/main/java/org/apache/xtable/hudi/PathBasedPartitionValuesExtractor.java
@@ -22,11 +22,7 @@ import java.math.BigDecimal;
 import java.nio.charset.StandardCharsets;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.TimeZone;
+import java.util.*;
 
 import lombok.AllArgsConstructor;
 import lombok.NonNull;
@@ -38,11 +34,11 @@ import org.apache.xtable.model.schema.InternalType;
 import org.apache.xtable.model.stat.PartitionValue;
 import org.apache.xtable.model.stat.Range;
 
-/** Extracts Partition Values for Hudi from Partition Path. */
+/** Extracts Partition Values for Hudi/Parquet from Partition Path. */
 @AllArgsConstructor
-public class HudiPartitionValuesExtractor {
+public class PathBasedPartitionValuesExtractor {
   private static final String HIVE_DEFAULT_PARTITION = 
"__HIVE_DEFAULT_PARTITION__";
-  @NonNull private final Map<String, String> pathToPartitionFieldFormat;
+  @NonNull protected final Map<String, String> pathToPartitionFieldFormat;
 
   public List<PartitionValue> extractPartitionValues(
       List<InternalPartitionField> partitionColumns, String partitionPath) {
@@ -103,7 +99,7 @@ public class HudiPartitionValuesExtractor {
     }
   }
 
-  private static PartialResult parseDate(String remainingPath, String format) {
+  protected static PartialResult parseDate(String remainingPath, String 
format) {
     try {
       String dateString = remainingPath.substring(0, format.length());
       SimpleDateFormat simpleDateFormat = new SimpleDateFormat(format);
@@ -118,7 +114,7 @@ public class HudiPartitionValuesExtractor {
     }
   }
 
-  private static PartialResult parseValue(
+  protected static PartialResult parseValue(
       String remainingPath, InternalType sourceFieldType, boolean 
isSlashDelimited) {
     if (remainingPath.isEmpty()) {
       throw new PartitionValuesExtractorException("Missing partition value");
@@ -173,7 +169,7 @@ public class HudiPartitionValuesExtractor {
   }
 
   @Value
-  private static class PartialResult {
+  protected static class PartialResult {
     Object value;
     String remainingPath;
   }
diff --git 
a/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetConversionSource.java
 
b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetConversionSource.java
new file mode 100644
index 00000000..e7eff961
--- /dev/null
+++ 
b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetConversionSource.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.parquet;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Paths;
+import java.time.Instant;
+import java.util.*;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import lombok.Builder;
+import lombok.NonNull;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.functional.RemoteIterators;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.schema.MessageType;
+
+import org.apache.xtable.exception.ReadException;
+import org.apache.xtable.hudi.*;
+import org.apache.xtable.hudi.HudiPathUtils;
+import org.apache.xtable.model.*;
+import org.apache.xtable.model.CommitsBacklog;
+import org.apache.xtable.model.InstantsForIncrementalSync;
+import org.apache.xtable.model.TableChange;
+import org.apache.xtable.model.schema.InternalPartitionField;
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.storage.*;
+import org.apache.xtable.model.storage.FileFormat;
+import org.apache.xtable.model.storage.InternalDataFile;
+import org.apache.xtable.spi.extractor.ConversionSource;
+
+@Builder
+public class ParquetConversionSource implements ConversionSource<Long> {
+
+  private static final ParquetSchemaExtractor schemaExtractor =
+      ParquetSchemaExtractor.getInstance();
+
+  private static final ParquetMetadataExtractor parquetMetadataExtractor =
+      ParquetMetadataExtractor.getInstance();
+
+  private static final ParquetStatsExtractor parquetStatsExtractor =
+      ParquetStatsExtractor.getInstance();
+
+  private final ParquetPartitionValueExtractor partitionValueExtractor;
+  private final PathBasedPartitionSpecExtractor partitionSpecExtractor;
+  private final String tableName;
+  private final String basePath;
+  @NonNull private final Configuration hadoopConf;
+
+  private InternalTable createInternalTableFromFile(LocatedFileStatus 
latestFile) {
+    ParquetMetadata parquetMetadata =
+        parquetMetadataExtractor.readParquetMetadata(hadoopConf, 
latestFile.getPath());
+    MessageType parquetSchema = 
parquetMetadataExtractor.getSchema(parquetMetadata);
+    InternalSchema schema = schemaExtractor.toInternalSchema(parquetSchema, 
"");
+    List<InternalPartitionField> partitionFields = 
partitionSpecExtractor.spec(schema);
+
+    DataLayoutStrategy dataLayoutStrategy =
+        partitionFields.isEmpty()
+            ? DataLayoutStrategy.FLAT
+            : DataLayoutStrategy.HIVE_STYLE_PARTITION;
+    return InternalTable.builder()
+        .tableFormat(TableFormat.PARQUET)
+        .basePath(basePath)
+        .name(tableName)
+        .layoutStrategy(dataLayoutStrategy)
+        .partitioningFields(partitionFields)
+        .readSchema(schema)
+        
.latestCommitTime(Instant.ofEpochMilli(latestFile.getModificationTime()))
+        .build();
+  }
+
+  @Override
+  public InternalTable getTable(Long modificationTime) {
+    // get parquetFile at specific time modificationTime
+    Stream<LocatedFileStatus> parquetFiles = getParquetFiles(hadoopConf, 
basePath);
+    LocatedFileStatus file = getParquetFileAt(parquetFiles, modificationTime);
+    return createInternalTableFromFile(file);
+  }
+
+  private Stream<InternalDataFile> 
getInternalDataFiles(Stream<LocatedFileStatus> parquetFiles) {
+    return parquetFiles.map(
+        file ->
+            InternalDataFile.builder()
+                .physicalPath(file.getPath().toString())
+                .fileFormat(FileFormat.APACHE_PARQUET)
+                .fileSizeBytes(file.getLen())
+                .partitionValues(
+                    partitionValueExtractor.extractPartitionValues(
+                        partitionSpecExtractor.spec(
+                            
partitionValueExtractor.extractSchemaForParquetPartitions(
+                                parquetMetadataExtractor.readParquetMetadata(
+                                    hadoopConf, file.getPath()),
+                                file.getPath().toString())),
+                        HudiPathUtils.getPartitionPath(new Path(basePath), 
file.getPath())))
+                .lastModified(file.getModificationTime())
+                .columnStats(
+                    parquetStatsExtractor.getColumnStatsForaFile(
+                        
parquetMetadataExtractor.readParquetMetadata(hadoopConf, file.getPath())))
+                .build());
+  }
+
+  private InternalDataFile createInternalDataFileFromParquetFile(FileStatus 
parquetFile) {
+    return InternalDataFile.builder()
+        .physicalPath(parquetFile.getPath().toString())
+        .partitionValues(
+            partitionValueExtractor.extractPartitionValues(
+                partitionSpecExtractor.spec(
+                    partitionValueExtractor.extractSchemaForParquetPartitions(
+                        parquetMetadataExtractor.readParquetMetadata(
+                            hadoopConf, parquetFile.getPath()),
+                        parquetFile.getPath().toString())),
+                basePath))
+        .lastModified(parquetFile.getModificationTime())
+        .fileSizeBytes(parquetFile.getLen())
+        .columnStats(
+            parquetStatsExtractor.getColumnStatsForaFile(
+                parquetMetadataExtractor.readParquetMetadata(hadoopConf, 
parquetFile.getPath())))
+        .build();
+  }
+
+  @Override
+  public CommitsBacklog<Long> getCommitsBacklog(InstantsForIncrementalSync 
syncInstants) {
+    List<Long> commitsToProcess =
+        
Collections.singletonList(syncInstants.getLastSyncInstant().toEpochMilli());
+    return 
CommitsBacklog.<Long>builder().commitsToProcess(commitsToProcess).build();
+  }
+
+  @Override
+  public TableChange getTableChangeForCommit(Long modificationTime) {
+    Stream<LocatedFileStatus> parquetFiles = getParquetFiles(hadoopConf, 
basePath);
+    Set<InternalDataFile> addedInternalDataFiles = new HashSet<>();
+
+    List<FileStatus> tableChangesAfter =
+        parquetFiles
+            .filter(fileStatus -> fileStatus.getModificationTime() > 
modificationTime)
+            .collect(Collectors.toList());
+    InternalTable internalTable = getMostRecentTable(parquetFiles);
+    for (FileStatus tableStatus : tableChangesAfter) {
+      InternalDataFile currentDataFile = 
createInternalDataFileFromParquetFile(tableStatus);
+      addedInternalDataFiles.add(currentDataFile);
+    }
+
+    return TableChange.builder()
+        .tableAsOfChange(internalTable)
+        
.filesDiff(InternalFilesDiff.builder().filesAdded(addedInternalDataFiles).build())
+        .build();
+  }
+
+  private InternalTable getMostRecentTable(Stream<LocatedFileStatus> 
parquetFiles) {
+    LocatedFileStatus latestFile = getMostRecentParquetFile(parquetFiles);
+    return createInternalTableFromFile(latestFile);
+  }
+
+  @Override
+  public InternalTable getCurrentTable() {
+    Stream<LocatedFileStatus> parquetFiles = getParquetFiles(hadoopConf, 
basePath);
+    return getMostRecentTable(parquetFiles);
+  }
+
+  /**
+   * get current snapshot
+   *
+   * @return
+   */
+  @Override
+  public InternalSnapshot getCurrentSnapshot() {
+    // to avoid consume the stream call the method twice to return the same 
stream of parquet files
+    Stream<InternalDataFile> internalDataFiles =
+        getInternalDataFiles(getParquetFiles(hadoopConf, basePath));
+    InternalTable table = getMostRecentTable(getParquetFiles(hadoopConf, 
basePath));
+    return InternalSnapshot.builder()
+        .table(table)
+        .sourceIdentifier(
+            getCommitIdentifier(
+                getMostRecentParquetFile(getParquetFiles(hadoopConf, basePath))
+                    .getModificationTime()))
+        .partitionedDataFiles(PartitionFileGroup.fromFiles(internalDataFiles))
+        .build();
+  }
+
+  private LocatedFileStatus getMostRecentParquetFile(Stream<LocatedFileStatus> 
parquetFiles) {
+    return parquetFiles
+        .max(Comparator.comparing(FileStatus::getModificationTime))
+        .orElseThrow(() -> new IllegalStateException("No files found"));
+  }
+
+  private LocatedFileStatus getParquetFileAt(
+      Stream<LocatedFileStatus> parquetFiles, long modificationTime) {
+    return parquetFiles
+        .filter(fileStatus -> fileStatus.getModificationTime() == 
modificationTime)
+        .findFirst()
+        .orElseThrow(() -> new IllegalStateException("No file found at " + 
modificationTime));
+  }
+
+  private Stream<LocatedFileStatus> getParquetFiles(Configuration hadoopConf, 
String basePath) {
+    try {
+      FileSystem fs = FileSystem.get(hadoopConf);
+      URI uriBasePath = new URI(basePath);
+      String parentPath = Paths.get(uriBasePath).toString();
+      RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(new 
Path(parentPath), true);
+      return RemoteIterators.toList(iterator).stream()
+          .filter(file -> file.getPath().getName().endsWith("parquet"));
+    } catch (IOException | URISyntaxException e) {
+      throw new ReadException("Unable to read files from file system", e);
+    }
+  }
+
+  @Override
+  public boolean isIncrementalSyncSafeFrom(Instant instant) {
+    return false;
+  }
+
+  @Override
+  public String getCommitIdentifier(Long aLong) {
+    return String.valueOf(aLong);
+  }
+
+  @Override
+  public void close() {}
+}
diff --git 
a/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetConversionSourceProvider.java
 
b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetConversionSourceProvider.java
new file mode 100644
index 00000000..0d508823
--- /dev/null
+++ 
b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetConversionSourceProvider.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.parquet;
+
+import org.apache.xtable.conversion.ConversionSourceProvider;
+import org.apache.xtable.conversion.SourceTable;
+import org.apache.xtable.hudi.PathBasedPartitionSpecExtractor;
+
+/** A concrete implementation of {@link ConversionSourceProvider} for Parquet 
file format. */
+public class ParquetConversionSourceProvider extends 
ConversionSourceProvider<Long> {
+  @Override
+  public ParquetConversionSource getConversionSourceInstance(SourceTable 
sourceTable) {
+    final PathBasedPartitionSpecExtractor sourcePartitionSpecExtractor =
+        
ParquetSourceConfig.fromProperties(sourceTable.getAdditionalProperties())
+            .loadSourcePartitionSpecExtractor();
+    ParquetPartitionValueExtractor partitionValueExtractor =
+        new ParquetPartitionValueExtractor(
+            sourcePartitionSpecExtractor.getPathToPartitionFieldFormat());
+    return ParquetConversionSource.builder()
+        .tableName(sourceTable.getName())
+        .basePath(sourceTable.getBasePath())
+        .hadoopConf(this.hadoopConf)
+        .partitionSpecExtractor(sourcePartitionSpecExtractor)
+        .partitionValueExtractor(partitionValueExtractor)
+        .build();
+  }
+}
diff --git 
a/xtable-core/src/main/java/org/apache/xtable/hudi/ConfigurationBasedPartitionSpecExtractor.java
 
b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetPartitionSpecExtractor.java
similarity index 56%
copy from 
xtable-core/src/main/java/org/apache/xtable/hudi/ConfigurationBasedPartitionSpecExtractor.java
copy to 
xtable-core/src/main/java/org/apache/xtable/parquet/ParquetPartitionSpecExtractor.java
index 7bc41a10..ddbdccba 100644
--- 
a/xtable-core/src/main/java/org/apache/xtable/hudi/ConfigurationBasedPartitionSpecExtractor.java
+++ 
b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetPartitionSpecExtractor.java
@@ -16,18 +16,22 @@
  * limitations under the License.
  */
  
-package org.apache.xtable.hudi;
+package org.apache.xtable.parquet;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 import lombok.AllArgsConstructor;
 
+import org.apache.xtable.hudi.PathBasedPartitionSpecExtractor;
 import org.apache.xtable.model.schema.InternalField;
 import org.apache.xtable.model.schema.InternalPartitionField;
 import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.schema.PartitionFieldSpec;
 import org.apache.xtable.schema.SchemaFieldFinder;
 
 /**
@@ -35,38 +39,55 @@ import org.apache.xtable.schema.SchemaFieldFinder;
  * path:type:format for date types or path:type for value types.
  */
 @AllArgsConstructor
-public class ConfigurationBasedPartitionSpecExtractor implements 
HudiSourcePartitionSpecExtractor {
-  private final HudiSourceConfig config;
+public class ParquetPartitionSpecExtractor implements 
PathBasedPartitionSpecExtractor {
+  private static final ParquetPartitionSpecExtractor INSTANCE =
+      new ParquetPartitionSpecExtractor(new ArrayList<>());
+  private List<PartitionFieldSpec> partitionFieldSpecs;
+
+  public static ParquetPartitionSpecExtractor getInstance() {
+    return INSTANCE;
+  }
 
   @Override
   public List<InternalPartitionField> spec(InternalSchema tableSchema) {
-    List<InternalPartitionField> partitionFields =
-        new ArrayList<>(config.getPartitionFieldSpecs().size());
-    for (HudiSourceConfig.PartitionFieldSpec fieldSpec : 
config.getPartitionFieldSpecs()) {
+    List<InternalPartitionField> partitionFields = new 
ArrayList<>(partitionFieldSpecs.size());
+    for (PartitionFieldSpec fieldSpec : partitionFieldSpecs) {
       InternalField sourceField =
           SchemaFieldFinder.getInstance()
               .findFieldByPath(tableSchema, fieldSpec.getSourceFieldPath());
       partitionFields.add(
           InternalPartitionField.builder()
               .sourceField(sourceField)
+              
.partitionFieldNames(getListPartitionNamesFromFormatInput(fieldSpec.getFormat()))
               .transformType(fieldSpec.getTransformType())
               .build());
     }
     return partitionFields;
   }
 
+  public List<String> getListPartitionNamesFromFormatInput(String inputFormat) 
{
+    return Arrays.stream(inputFormat.split("/"))
+        .map(s -> s.split("=")[0])
+        .collect(Collectors.toList());
+  }
+
+  public List<String> getListPartitionValuesFromFormatInput(String 
inputFormat) {
+    return Arrays.stream(inputFormat.split("/"))
+        .map(s -> s.split("=")[1])
+        .collect(Collectors.toList());
+  }
+
   @Override
   public Map<String, String> getPathToPartitionFieldFormat() {
     Map<String, String> pathToPartitionFieldFormat = new HashMap<>();
-    config
-        .getPartitionFieldSpecs()
-        .forEach(
-            partitionFieldSpec -> {
-              if (partitionFieldSpec.getFormat() != null) {
-                pathToPartitionFieldFormat.put(
-                    partitionFieldSpec.getSourceFieldPath(), 
partitionFieldSpec.getFormat());
-              }
-            });
+
+    partitionFieldSpecs.forEach(
+        partitionFieldSpec -> {
+          if (partitionFieldSpec.getFormat() != null) {
+            pathToPartitionFieldFormat.put(
+                partitionFieldSpec.getSourceFieldPath(), 
partitionFieldSpec.getFormat());
+          }
+        });
     return pathToPartitionFieldFormat;
   }
 }
diff --git 
a/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetPartitionValueExtractor.java
 
b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetPartitionValueExtractor.java
new file mode 100644
index 00000000..ab4ceaf4
--- /dev/null
+++ 
b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetPartitionValueExtractor.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.parquet;
+
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.*;
+
+import lombok.NonNull;
+
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.schema.MessageType;
+
+import org.apache.xtable.exception.PartitionValuesExtractorException;
+import org.apache.xtable.hudi.PathBasedPartitionValuesExtractor;
+import org.apache.xtable.model.schema.InternalPartitionField;
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.stat.PartitionValue;
+import org.apache.xtable.model.stat.Range;
+
+/** Partition value extractor for Parquet. */
+public class ParquetPartitionValueExtractor extends 
PathBasedPartitionValuesExtractor {
+  private static final ParquetPartitionValueExtractor INSTANCE =
+      new ParquetPartitionValueExtractor(Collections.emptyMap());
+  private static final ParquetSchemaExtractor schemaExtractor =
+      ParquetSchemaExtractor.getInstance();
+  private static final ParquetMetadataExtractor parquetMetadataExtractor =
+      ParquetMetadataExtractor.getInstance();
+  private static final ParquetPartitionSpecExtractor partitionSpecExtractor =
+      ParquetPartitionSpecExtractor.getInstance();
+
+  public ParquetPartitionValueExtractor(@NonNull Map<String, String> 
pathToPartitionFieldFormat) {
+    super(pathToPartitionFieldFormat);
+  }
+
+  public List<PartitionValue> extractPartitionValues(
+      List<InternalPartitionField> partitionColumns, String partitionPath) {
+    String currentDateValue = "";
+    List<String> parsedDateValues = new ArrayList<>();
+    if (partitionColumns.size() == 0) {
+      return Collections.emptyList();
+    }
+    int totalNumberOfPartitions = 
partitionColumns.get(0).getPartitionFieldNames().size();
+    List<PartitionValue> result = new ArrayList<>(totalNumberOfPartitions);
+    String remainingPartitionPath = partitionPath;
+    for (InternalPartitionField partitionField : partitionColumns) {
+      int index = 0;
+      for (String partitionFieldName : 
partitionField.getPartitionFieldNames()) {
+        if (remainingPartitionPath.startsWith(partitionFieldName + "=")) {
+          remainingPartitionPath =
+              remainingPartitionPath.substring(partitionFieldName.length() + 
1);
+        }
+        // parsePartitionPath shouldn't do two things but only gets the 
remaining path
+        // to store the parsed value for the current partition field
+        currentDateValue = parsePartitionDateValue(partitionField, 
remainingPartitionPath, index);
+        parsedDateValues.add(currentDateValue);
+
+        remainingPartitionPath =
+            getRemainingPath(
+                remainingPartitionPath,
+                partitionSpecExtractor
+                    .getListPartitionValuesFromFormatInput(
+                        
pathToPartitionFieldFormat.get(partitionField.getSourceField().getName()))
+                    .get(index));
+        index++;
+      }
+      try {
+        result.add(
+            PartitionValue.builder()
+                .partitionField(partitionField)
+                .range(Range.scalar(computeSinceEpochValue(parsedDateValues, 
partitionField)))
+                .build());
+      } catch (ParseException e) {
+        throw new PartitionValuesExtractorException("Unable to parse date 
value", e);
+      }
+    }
+    return result;
+  }
+
+  protected String parsePartitionDateValue(
+      InternalPartitionField field, String remainingPath, int index) {
+    return parseDateValue(
+        remainingPath,
+        partitionSpecExtractor
+            .getListPartitionValuesFromFormatInput(
+                
pathToPartitionFieldFormat.get(field.getSourceField().getName()))
+            .get(index));
+  }
+
+  private long computeSinceEpochValue(
+      List<String> parsedDateValues, InternalPartitionField partitionField) 
throws ParseException {
+    // in the list of parsed values combine them using the standard way using 
hyphen symbol then
+    // parse the date into sinceEpoch
+    String combinedDate = String.join("-", parsedDateValues); // 
SimpleDateFormat works with hyphens
+    // convert to since Epoch
+    SimpleDateFormat simpleDateFormat =
+        new SimpleDateFormat(
+            String.join(
+                "-",
+                partitionSpecExtractor.getListPartitionValuesFromFormatInput(
+                    
pathToPartitionFieldFormat.get(partitionField.getSourceField().getName()))));
+    simpleDateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
+    return simpleDateFormat.parse(combinedDate).toInstant().toEpochMilli();
+  }
+
+  protected static String parseDateValue(String remainingPath, String format) {
+    return remainingPath.substring(0, format.length());
+  }
+
+  protected static String getRemainingPath(String remainingPath, String 
format) {
+    return remainingPath.substring(Math.min(remainingPath.length(), 
format.length() + 1));
+  }
+
+  public static ParquetPartitionValueExtractor getInstance() {
+    return INSTANCE;
+  }
+
+  public InternalSchema extractSchemaForParquetPartitions(ParquetMetadata 
footer, String path) {
+    MessageType parquetSchema = parquetMetadataExtractor.getSchema(footer);
+    return schemaExtractor.toInternalSchema(parquetSchema, path);
+  }
+}
diff --git 
a/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetSchemaExtractor.java
 
b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetSchemaExtractor.java
index 9043ac0d..ed6cf284 100644
--- 
a/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetSchemaExtractor.java
+++ 
b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetSchemaExtractor.java
@@ -97,6 +97,11 @@ public class ParquetSchemaExtractor {
       primitiveType = schema.asPrimitiveType();
       switch (primitiveType.getPrimitiveTypeName()) {
           // PrimitiveTypes
+        case INT96: // TODO check logicaltypes of INT96
+          metadata.put(
+              InternalSchema.MetadataKey.TIMESTAMP_PRECISION, 
InternalSchema.MetadataValue.NANOS);
+          newDataType = InternalType.TIMESTAMP;
+          break;
         case INT64:
           logicalType = schema.getLogicalTypeAnnotation();
           if (logicalType instanceof 
LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) {
@@ -169,6 +174,9 @@ public class ParquetSchemaExtractor {
             newDataType = InternalType.INT;
           }
           break;
+        case DOUBLE:
+          newDataType = InternalType.DOUBLE;
+          break;
         case FLOAT:
           newDataType = InternalType.FLOAT;
           break;
@@ -210,7 +218,6 @@ public class ParquetSchemaExtractor {
                 InternalSchema.MetadataKey.DECIMAL_SCALE,
                 ((LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) 
logicalType).getScale());
             newDataType = InternalType.DECIMAL;
-
           } else {
             newDataType = InternalType.BYTES;
           }
@@ -270,12 +277,10 @@ public class ParquetSchemaExtractor {
             .fields(valueSchema.getFields())
             .build();
       } else {
-
         subFields = new ArrayList<>(schema.asGroupType().getFields().size());
         for (Type parquetField : schema.asGroupType().getFields()) {
           String fieldName = parquetField.getName();
           Type.ID fieldId = parquetField.getId();
-          currentRepetition = parquetField.getRepetition();
           InternalSchema subFieldSchema =
               toInternalSchema(
                   parquetField, SchemaUtils.getFullyQualifiedPath(parentPath, 
fieldName));
@@ -284,6 +289,7 @@ public class ParquetSchemaExtractor {
               == 1) { // TODO Tuple (many subelements in a list)
             newDataType = subFieldSchema.getDataType();
             elementName = subFieldSchema.getName();
+            // subFields = subFieldSchema.getFields();
             break;
           }
           subFields.add(
@@ -295,15 +301,18 @@ public class ParquetSchemaExtractor {
                   .fieldId(fieldId == null ? null : fieldId.intValue())
                   .build());
         }
-        if (currentRepetition != Repetition.REPEATED
-            && schema.asGroupType().getName() != "list"
+        // RECORD Type (non-nullable elements)
+        if (!schema.asGroupType().getName().equals("list")
             && !Arrays.asList("key_value", 
"map").contains(schema.asGroupType().getName())) {
+          boolean isNullable =
+              subFields.stream().anyMatch(ele -> ele.getSchema().isNullable())
+                  && isNullable(schema.asGroupType());
           return InternalSchema.builder()
               .name(schema.getName())
               .comment(null)
               .dataType(InternalType.RECORD)
               .fields(subFields)
-              .isNullable(isNullable(schema.asGroupType()))
+              .isNullable(isNullable)
               .build();
         }
       }
diff --git 
a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiSourceConfig.java 
b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetSourceConfig.java
similarity index 64%
copy from xtable-core/src/main/java/org/apache/xtable/hudi/HudiSourceConfig.java
copy to 
xtable-core/src/main/java/org/apache/xtable/parquet/ParquetSourceConfig.java
index 606b9281..c1ab594c 100644
--- a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiSourceConfig.java
+++ 
b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetSourceConfig.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
  
-package org.apache.xtable.hudi;
+package org.apache.xtable.parquet;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -28,45 +28,35 @@ import lombok.Value;
 
 import com.google.common.base.Preconditions;
 
+import org.apache.xtable.hudi.PathBasedPartitionSpecExtractor;
+import org.apache.xtable.model.schema.PartitionFieldSpec;
 import org.apache.xtable.model.schema.PartitionTransformType;
 import org.apache.xtable.reflection.ReflectionUtils;
 
-/** Configuration of Hudi source format for the sync process. */
+/** Configuration of Parquet source format for the sync process. */
 @Value
-public class HudiSourceConfig {
+public class ParquetSourceConfig {
   public static final String PARTITION_SPEC_EXTRACTOR_CLASS =
-      "xtable.hudi.source.partition_spec_extractor_class";
+      "xtable.parquet.source.partition_spec_extractor_class";
   public static final String PARTITION_FIELD_SPEC_CONFIG =
-      "xtable.hudi.source.partition_field_spec_config";
+      "xtable.parquet.source.partition_field_spec_config";
 
   String partitionSpecExtractorClass;
   List<PartitionFieldSpec> partitionFieldSpecs;
 
-  public static HudiSourceConfig fromPartitionFieldSpecConfig(String 
partitionFieldSpecConfig) {
-    return new HudiSourceConfig(
-        ConfigurationBasedPartitionSpecExtractor.class.getName(),
-        parsePartitionFieldSpecs(partitionFieldSpecConfig));
-  }
-
-  public static HudiSourceConfig fromProperties(Properties properties) {
+  public static ParquetSourceConfig fromProperties(Properties properties) {
     String partitionSpecExtractorClass =
         properties.getProperty(
-            PARTITION_SPEC_EXTRACTOR_CLASS,
-            ConfigurationBasedPartitionSpecExtractor.class.getName());
+            PARTITION_SPEC_EXTRACTOR_CLASS, 
ParquetPartitionSpecExtractor.class.getName());
+
     String partitionFieldSpecString = 
properties.getProperty(PARTITION_FIELD_SPEC_CONFIG);
+
     List<PartitionFieldSpec> partitionFieldSpecs =
         parsePartitionFieldSpecs(partitionFieldSpecString);
-    return new HudiSourceConfig(partitionSpecExtractorClass, 
partitionFieldSpecs);
-  }
-
-  @Value
-  static class PartitionFieldSpec {
-    String sourceFieldPath;
-    PartitionTransformType transformType;
-    String format;
+    return new ParquetSourceConfig(partitionSpecExtractorClass, 
partitionFieldSpecs);
   }
 
-  private static List<PartitionFieldSpec> parsePartitionFieldSpecs(String 
input) {
+  public static List<PartitionFieldSpec> parsePartitionFieldSpecs(String 
input) {
     if (input == null || input.isEmpty()) {
       return Collections.emptyList();
     }
@@ -84,9 +74,10 @@ public class HudiSourceConfig {
     return partitionFields;
   }
 
-  public HudiSourcePartitionSpecExtractor loadSourcePartitionSpecExtractor() {
+  public PathBasedPartitionSpecExtractor loadSourcePartitionSpecExtractor() {
     Preconditions.checkNotNull(
-        partitionSpecExtractorClass, "HudiSourcePartitionSpecExtractor class 
not provided");
-    return ReflectionUtils.createInstanceOfClass(partitionSpecExtractorClass, 
this);
+        this.partitionSpecExtractorClass, "PathBasedPartitionSpecExtractor 
class not provided");
+    return ReflectionUtils.createInstanceOfClass(
+        this.partitionSpecExtractorClass, this.getPartitionFieldSpecs());
   }
 }
diff --git 
a/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetStatsConverterUtil.java
 
b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetStatsConverterUtil.java
new file mode 100644
index 00000000..e5fd2d07
--- /dev/null
+++ 
b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetStatsConverterUtil.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.parquet;
+
+import java.nio.charset.StandardCharsets;
+
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.PrimitiveType;
+
+public class ParquetStatsConverterUtil {
+  public static Object convertStatBinaryTypeToLogicalType(
+      ColumnChunkMetaData columnMetaData, boolean isMin) {
+    Object returnedObj = null;
+    PrimitiveType primitiveType = columnMetaData.getPrimitiveType();
+    switch (primitiveType.getPrimitiveTypeName()) {
+      case BINARY: // TODO check if other primitiveType' needs to be handled 
as well
+        if (primitiveType.getLogicalTypeAnnotation() != null) {
+          if (columnMetaData
+              .getPrimitiveType()
+              .getLogicalTypeAnnotation()
+              .toString()
+              .equals("STRING")) {
+            returnedObj =
+                new String(
+                    (isMin
+                            ? (Binary) 
columnMetaData.getStatistics().genericGetMin()
+                            : (Binary) 
columnMetaData.getStatistics().genericGetMax())
+                        .getBytes(),
+                    StandardCharsets.UTF_8);
+          } else {
+            returnedObj =
+                isMin
+                    ? columnMetaData.getStatistics().genericGetMin()
+                    : columnMetaData.getStatistics().genericGetMax();
+          }
+        } else {
+          returnedObj =
+              isMin
+                  ? columnMetaData.getStatistics().genericGetMin()
+                  : columnMetaData.getStatistics().genericGetMax();
+        }
+        break;
+      default:
+        returnedObj =
+            isMin
+                ? columnMetaData.getStatistics().genericGetMin()
+                : columnMetaData.getStatistics().genericGetMax();
+        // TODO JSON and DECIMAL... of BINARY primitiveType
+    }
+    return returnedObj;
+  }
+}
diff --git 
a/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetStatsExtractor.java
 
b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetStatsExtractor.java
index 8d02b392..a1393b98 100644
--- 
a/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetStatsExtractor.java
+++ 
b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetStatsExtractor.java
@@ -36,6 +36,7 @@ import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import org.apache.parquet.schema.MessageType;
 
+import org.apache.xtable.hudi.PathBasedPartitionSpecExtractor;
 import org.apache.xtable.model.schema.InternalField;
 import org.apache.xtable.model.stat.ColumnStat;
 import org.apache.xtable.model.stat.PartitionValue;
@@ -55,12 +56,15 @@ public class ParquetStatsExtractor {
   private static final ParquetMetadataExtractor parquetMetadataExtractor =
       ParquetMetadataExtractor.getInstance();
 
-  // private static final InputPartitionFields partitions = null;
-
   public static ParquetStatsExtractor getInstance() {
     return INSTANCE;
   }
 
+  private static final ParquetPartitionValueExtractor partitionValueExtractor =
+      ParquetPartitionValueExtractor.getInstance();
+  private static PathBasedPartitionSpecExtractor partitionSpecExtractor =
+      ParquetPartitionSpecExtractor.getInstance();
+
   public static List<ColumnStat> getColumnStatsForaFile(ParquetMetadata 
footer) {
     return getStatsForFile(footer).values().stream()
         .flatMap(List::stream)
@@ -112,39 +116,37 @@ public class ParquetStatsExtractor {
                                 .totalSize(columnMetaData.getTotalSize())
                                 .range(
                                     Range.vector(
-                                        
columnMetaData.getStatistics().genericGetMin(),
-                                        
columnMetaData.getStatistics().genericGetMax()))
+                                        ParquetStatsConverterUtil
+                                            
.convertStatBinaryTypeToLogicalType(
+                                                columnMetaData,
+                                                true), // if stats are string 
convert to
+                                        // litteraly a string stat and
+                                        // store to range
+                                        ParquetStatsConverterUtil
+                                            
.convertStatBinaryTypeToLogicalType(
+                                                columnMetaData, false)))
                                 .build(),
                         Collectors.toList())));
     return columnDescStats;
   }
 
-  /*  private static InputPartitionFields initPartitionInfo() {
-    return partitions;
-  }*/
-
   public static InternalDataFile toInternalDataFile(Configuration hadoopConf, 
Path parentPath)
       throws IOException {
-    FileStatus file = null;
-    List<PartitionValue> partitionValues = null;
-    ParquetMetadata footer = null;
-    List<ColumnStat> columnStatsForAFile = null;
-    try {
-      FileSystem fs = FileSystem.get(hadoopConf);
-      file = fs.getFileStatus(parentPath);
-      // InputPartitionFields partitionInfo = initPartitionInfo();
-      footer = parquetMetadataExtractor.readParquetMetadata(hadoopConf, 
parentPath);
-      MessageType schema = parquetMetadataExtractor.getSchema(footer);
-      columnStatsForAFile = getColumnStatsForaFile(footer);
-      // partitionValues = partitionExtractor.createPartitionValues(
-      //               partitionInfo);
-    } catch (java.io.IOException e) {
-
-    }
+    FileSystem fs = FileSystem.get(hadoopConf);
+    FileStatus file = fs.getFileStatus(parentPath);
+    ParquetMetadata footer = 
parquetMetadataExtractor.readParquetMetadata(hadoopConf, parentPath);
+    List<ColumnStat> columnStatsForAFile = getColumnStatsForaFile(footer);
+    List<PartitionValue> partitionValues =
+        partitionValueExtractor.extractPartitionValues(
+            partitionSpecExtractor.spec(
+                partitionValueExtractor.extractSchemaForParquetPartitions(
+                    parquetMetadataExtractor.readParquetMetadata(hadoopConf, 
file.getPath()),
+                    file.getPath().toString())),
+            parentPath.toString());
     return InternalDataFile.builder()
         .physicalPath(parentPath.toString())
         .fileFormat(FileFormat.APACHE_PARQUET)
-        // .partitionValues(partitionValues)
+        .partitionValues(partitionValues)
         .fileSizeBytes(file.getLen())
         .recordCount(getMaxFromColumnStats(columnStatsForAFile).orElse(0L))
         .columnStats(columnStatsForAFile)
diff --git 
a/xtable-core/src/main/java/org/apache/xtable/reflection/ReflectionUtils.java 
b/xtable-core/src/main/java/org/apache/xtable/reflection/ReflectionUtils.java
index 9e965c9d..68b40d35 100644
--- 
a/xtable-core/src/main/java/org/apache/xtable/reflection/ReflectionUtils.java
+++ 
b/xtable-core/src/main/java/org/apache/xtable/reflection/ReflectionUtils.java
@@ -18,10 +18,10 @@
  
 package org.apache.xtable.reflection;
 
+import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.lang.reflect.Modifier;
-import java.util.Arrays;
 
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
@@ -43,17 +43,28 @@ public class ReflectionUtils {
       if (constructorArgs.length == 0) {
         return clazz.newInstance();
       }
-      Class<?>[] constructorArgTypes =
-          
Arrays.stream(constructorArgs).map(Object::getClass).toArray(Class[]::new);
-      if (hasConstructor(clazz, constructorArgTypes)) {
-        return 
clazz.getConstructor(constructorArgTypes).newInstance(constructorArgs);
-      } else {
-        return clazz.newInstance();
+      for (Constructor<?> constructor : clazz.getConstructors()) {
+        Class<?>[] parameterTypes = constructor.getParameterTypes();
+        if (parameterTypes.length == constructorArgs.length) {
+          boolean matches = true;
+          for (int i = 0; i < parameterTypes.length; i++) {
+            if 
(!parameterTypes[i].isAssignableFrom(constructorArgs[i].getClass())) {
+              matches = false;
+              break;
+            }
+          }
+          if (matches) {
+            return (T) constructor.newInstance(constructorArgs);
+          }
+        }
       }
+      throw new NoSuchMethodException(
+          "Could not find a suitable constructor for class: " + className);
+
     } catch (InstantiationException
         | IllegalAccessException
-        | NoSuchMethodException
-        | InvocationTargetException e) {
+        | InvocationTargetException
+        | NoSuchMethodException e) {
       throw new ConfigurationException("Unable to load class: " + className, 
e);
     }
   }
diff --git 
a/xtable-core/src/main/java/org/apache/xtable/schema/SchemaFieldFinder.java 
b/xtable-core/src/main/java/org/apache/xtable/schema/SchemaFieldFinder.java
index 78b483ac..daadfef9 100644
--- a/xtable-core/src/main/java/org/apache/xtable/schema/SchemaFieldFinder.java
+++ b/xtable-core/src/main/java/org/apache/xtable/schema/SchemaFieldFinder.java
@@ -45,7 +45,8 @@ public class SchemaFieldFinder {
    * @return the field if it exists, otherwise returns null
    */
   public InternalField findFieldByPath(InternalSchema schema, String path) {
-    return findFieldByPath(schema, path.split("\\."), 0);
+    int startIndex = 0;
+    return findFieldByPath(schema, path.split("\\."), startIndex);
   }
 
   private InternalField findFieldByPath(InternalSchema schema, String[] 
pathParts, int startIndex) {
diff --git a/xtable-core/src/test/java/org/apache/xtable/GenericTable.java 
b/xtable-core/src/test/java/org/apache/xtable/GenericTable.java
index db7e776b..14395e0d 100644
--- a/xtable-core/src/test/java/org/apache/xtable/GenericTable.java
+++ b/xtable-core/src/test/java/org/apache/xtable/GenericTable.java
@@ -21,6 +21,7 @@ package org.apache.xtable;
 import static org.apache.xtable.model.storage.TableFormat.DELTA;
 import static org.apache.xtable.model.storage.TableFormat.HUDI;
 import static org.apache.xtable.model.storage.TableFormat.ICEBERG;
+import static org.apache.xtable.model.storage.TableFormat.PARQUET;
 
 import java.nio.file.Path;
 import java.util.Arrays;
@@ -32,6 +33,8 @@ import org.apache.spark.sql.SparkSession;
 
 import org.apache.hudi.common.model.HoodieTableType;
 
+import org.apache.xtable.parquet.TestSparkParquetTable;
+
 public interface GenericTable<T, Q> extends AutoCloseable {
   // A list of values for the level field which serves as a basic field to 
partition on for tests
   List<String> LEVEL_VALUES = Arrays.asList("INFO", "WARN", "ERROR");
@@ -76,6 +79,9 @@ public interface GenericTable<T, Q> extends AutoCloseable {
       String sourceFormat,
       boolean isPartitioned) {
     switch (sourceFormat) {
+      case PARQUET:
+        return TestSparkParquetTable.forStandardSchemaAndPartitioning(
+            tableName, tempDir, jsc, isPartitioned);
       case HUDI:
         return TestSparkHudiTable.forStandardSchemaAndPartitioning(
             tableName, tempDir, jsc, isPartitioned);
diff --git 
a/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java 
b/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java
index dc90d4d5..bda54c0f 100644
--- a/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java
+++ b/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java
@@ -24,6 +24,7 @@ import static 
org.apache.xtable.hudi.HudiTestUtil.PartitionConfig;
 import static org.apache.xtable.model.storage.TableFormat.DELTA;
 import static org.apache.xtable.model.storage.TableFormat.HUDI;
 import static org.apache.xtable.model.storage.TableFormat.ICEBERG;
+import static org.apache.xtable.model.storage.TableFormat.PARQUET;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 
@@ -485,7 +486,11 @@ public class ITConversionController {
 
   private static List<String> getOtherFormats(String sourceTableFormat) {
     return Arrays.stream(TableFormat.values())
-        .filter(format -> !format.equals(sourceTableFormat))
+        .filter(
+            format ->
+                !format.equals(sourceTableFormat)
+                    && !format.equals(
+                        PARQUET)) // excluded file formats because upset, 
insert etc. not supported
         .collect(Collectors.toList());
   }
 
diff --git 
a/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionSource.java 
b/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionSource.java
index 5dd00174..b1cba5c7 100644
--- 
a/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionSource.java
+++ 
b/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionSource.java
@@ -693,9 +693,9 @@ public class ITHudiConversionSource {
             .setBasePath(basePath)
             .setLoadActiveTimelineOnLoad(true)
             .build();
-    HudiSourcePartitionSpecExtractor partitionSpecExtractor =
+    PathBasedPartitionSpecExtractor partitionSpecExtractor =
         new ConfigurationBasedPartitionSpecExtractor(
-            
HudiSourceConfig.fromPartitionFieldSpecConfig(xTablePartitionConfig));
+            HudiSourceConfig.parsePartitionFieldSpecs(xTablePartitionConfig));
     return new HudiConversionSource(hoodieTableMetaClient, 
partitionSpecExtractor);
   }
 
diff --git 
a/xtable-core/src/test/java/org/apache/xtable/hudi/TestHudiPartitionValuesExtractor.java
 
b/xtable-core/src/test/java/org/apache/xtable/hudi/TestHudiPartitionValuesExtractor.java
index c5915b9e..4d3faa50 100644
--- 
a/xtable-core/src/test/java/org/apache/xtable/hudi/TestHudiPartitionValuesExtractor.java
+++ 
b/xtable-core/src/test/java/org/apache/xtable/hudi/TestHudiPartitionValuesExtractor.java
@@ -74,7 +74,7 @@ public class TestHudiPartitionValuesExtractor {
             
PartitionValue.builder().partitionField(column).range(Range.scalar("foo")).build());
 
     List<PartitionValue> actual =
-        new HudiPartitionValuesExtractor(Collections.emptyMap())
+        new PathBasedPartitionValuesExtractor(Collections.emptyMap())
             .extractPartitionValues(Collections.singletonList(column), "foo");
     Assertions.assertEquals(expected, actual);
   }
@@ -100,7 +100,7 @@ public class TestHudiPartitionValuesExtractor {
             
PartitionValue.builder().partitionField(column).range(Range.scalar("foo/bar")).build());
 
     List<PartitionValue> actual =
-        new HudiPartitionValuesExtractor(Collections.emptyMap())
+        new PathBasedPartitionValuesExtractor(Collections.emptyMap())
             .extractPartitionValues(Collections.singletonList(column), 
"foo/bar");
     Assertions.assertEquals(expected, actual);
   }
@@ -239,7 +239,7 @@ public class TestHudiPartitionValuesExtractor {
     Map<String, String> pathToPartitionFieldFormat = new HashMap<>();
     pathToPartitionFieldFormat.put(column.getSourceField().getPath(), format);
     List<PartitionValue> actual =
-        new HudiPartitionValuesExtractor(pathToPartitionFieldFormat)
+        new PathBasedPartitionValuesExtractor(pathToPartitionFieldFormat)
             .extractPartitionValues(Collections.singletonList(column), 
partitionString);
     Assertions.assertEquals(expected, actual);
   }
@@ -298,7 +298,7 @@ public class TestHudiPartitionValuesExtractor {
     Map<String, String> pathToPartitionFieldFormat = new HashMap<>();
     pathToPartitionFieldFormat.put(column2.getSourceField().getPath(), 
"yyyy/MM/dd");
     List<PartitionValue> actual =
-        new HudiPartitionValuesExtractor(pathToPartitionFieldFormat)
+        new PathBasedPartitionValuesExtractor(pathToPartitionFieldFormat)
             .extractPartitionValues(Arrays.asList(column1, column2, column3), 
"foo/2022/10/02/32");
     Assertions.assertEquals(expected, actual);
   }
@@ -350,7 +350,7 @@ public class TestHudiPartitionValuesExtractor {
     Map<String, String> pathToPartitionFieldFormat = new HashMap<>();
     pathToPartitionFieldFormat.put(column2.getSourceField().getPath(), 
"yyyy-MM-dd");
     List<PartitionValue> actual =
-        new HudiPartitionValuesExtractor(pathToPartitionFieldFormat)
+        new PathBasedPartitionValuesExtractor(pathToPartitionFieldFormat)
             .extractPartitionValues(
                 Arrays.asList(column1, column2, column3), 
"foo/__HIVE_DEFAULT_PARTITION__/32");
     Assertions.assertEquals(expected, actual);
@@ -388,7 +388,7 @@ public class TestHudiPartitionValuesExtractor {
             
PartitionValue.builder().partitionField(column2).range(Range.scalar(32L)).build());
 
     List<PartitionValue> actual =
-        new HudiPartitionValuesExtractor(Collections.emptyMap())
+        new PathBasedPartitionValuesExtractor(Collections.emptyMap())
             .extractPartitionValues(Arrays.asList(column1, column2), 
"column1=foo/column2=32");
     Assertions.assertEquals(expected, actual);
   }
@@ -425,7 +425,7 @@ public class TestHudiPartitionValuesExtractor {
             
PartitionValue.builder().partitionField(column1).range(Range.scalar(null)).build());
 
     List<PartitionValue> actual =
-        new HudiPartitionValuesExtractor(Collections.emptyMap())
+        new PathBasedPartitionValuesExtractor(Collections.emptyMap())
             .extractPartitionValues(
                 Arrays.asList(column2, column1), 
"column2=32/column1=__HIVE_DEFAULT_PARTITION__");
     Assertions.assertEquals(expected, actual);
@@ -461,14 +461,14 @@ public class TestHudiPartitionValuesExtractor {
     Assertions.assertThrows(
         PartitionValuesExtractorException.class,
         () ->
-            new HudiPartitionValuesExtractor(Collections.emptyMap())
+            new PathBasedPartitionValuesExtractor(Collections.emptyMap())
                 .extractPartitionValues(Arrays.asList(column1, column2), 
"foo"));
   }
 
   @Test
   public void testNoPartitionColumnsConfigured() {
     List<PartitionValue> actual =
-        new HudiPartitionValuesExtractor(Collections.emptyMap())
+        new PathBasedPartitionValuesExtractor(Collections.emptyMap())
             .extractPartitionValues(Collections.emptyList(), 
"column1=foo/column2=32");
     Assertions.assertTrue(actual.isEmpty());
   }
@@ -476,7 +476,7 @@ public class TestHudiPartitionValuesExtractor {
   @Test
   public void testNullPartitionColumns() {
     List<PartitionValue> actual =
-        new HudiPartitionValuesExtractor(Collections.emptyMap())
+        new PathBasedPartitionValuesExtractor(Collections.emptyMap())
             .extractPartitionValues(null, "column1=foo/column2=32");
     Assertions.assertTrue(actual.isEmpty());
   }
@@ -503,7 +503,7 @@ public class TestHudiPartitionValuesExtractor {
     Assertions.assertThrows(
         PartitionValuesExtractorException.class,
         () ->
-            new HudiPartitionValuesExtractor(pathToPartitionFieldFormat)
+            new PathBasedPartitionValuesExtractor(pathToPartitionFieldFormat)
                 .extractPartitionValues(Collections.singletonList(column), 
"2022-10-02"));
   }
 
@@ -564,7 +564,7 @@ public class TestHudiPartitionValuesExtractor {
             .collect(Collectors.toList());
 
     List<PartitionValue> actual =
-        new HudiPartitionValuesExtractor(Collections.emptyMap())
+        new PathBasedPartitionValuesExtractor(Collections.emptyMap())
             .extractPartitionValues(partitionFields, partitionPath);
     Assertions.assertEquals(expected, actual);
   }
diff --git 
a/xtable-core/src/test/java/org/apache/xtable/parquet/ITParquetConversionSource.java
 
b/xtable-core/src/test/java/org/apache/xtable/parquet/ITParquetConversionSource.java
new file mode 100644
index 00000000..6dda9db1
--- /dev/null
+++ 
b/xtable-core/src/test/java/org/apache/xtable/parquet/ITParquetConversionSource.java
@@ -0,0 +1,462 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.parquet;
+
+import static org.apache.xtable.GenericTable.getTableName;
+import static org.apache.xtable.model.storage.TableFormat.*;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+import java.net.URISyntaxException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.sql.Timestamp;
+import java.time.Duration;
+import java.util.*;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import lombok.Builder;
+import lombok.Value;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.*;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.MetadataBuilder;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import org.apache.hudi.client.HoodieReadClient;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+
+import org.apache.xtable.GenericTable;
+import org.apache.xtable.conversion.ConversionConfig;
+import org.apache.xtable.conversion.ConversionController;
+import org.apache.xtable.conversion.ConversionSourceProvider;
+import org.apache.xtable.conversion.SourceTable;
+import org.apache.xtable.conversion.TargetTable;
+import org.apache.xtable.hudi.HudiTestUtil;
+import org.apache.xtable.model.sync.SyncMode;
+
+public class ITParquetConversionSource {
+  public static final String PARTITION_FIELD_SPEC_CONFIG =
+      "xtable.parquet.source.partition_field_spec_config";
+  @TempDir public static Path tempDir;
+  private static JavaSparkContext jsc;
+  private static SparkSession sparkSession;
+  private static StructType schema;
+
+  @BeforeAll
+  public static void setupOnce() {
+    SparkConf sparkConf = HudiTestUtil.getSparkConf(tempDir);
+    sparkConf = HoodieReadClient.addHoodieSupport(sparkConf);
+    sparkConf.set("parquet.avro.write-old-list-structure", "false");
+    sparkConf.set("spark.sql.parquet.writeLegacyFormat", "false");
+    sparkConf.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS");
+
+    sparkSession = SparkSession.builder().config(sparkConf).getOrCreate();
+    jsc = JavaSparkContext.fromSparkContext(sparkSession.sparkContext());
+  }
+
+  @AfterAll
+  public static void teardown() {
+    if (jsc != null) {
+      jsc.stop();
+      jsc = null;
+    }
+    if (sparkSession != null) {
+      sparkSession.stop();
+      sparkSession = null;
+    }
+  }
+  // delimiter must be / and not - or any other one
+  private static Stream<Arguments> provideArgsForFilePartitionTesting() {
+    String partitionConfig = // "timestamp:YEAR:year=yyyy";
+        "timestamp:MONTH:year=yyyy/month=MM"; // or 
"timestamp:YEAR:year=yyyy", or //
+    // timestamp:DAY:year=yyyy/month=MM/day=dd
+    return Stream.of(
+        Arguments.of(
+            buildArgsForPartition(
+                PARQUET, Arrays.asList(ICEBERG, DELTA, HUDI), partitionConfig, 
partitionConfig)));
+  }
+
+  private static TableFormatPartitionDataHolder buildArgsForPartition(
+      String sourceFormat,
+      List<String> targetFormats,
+      String hudiPartitionConfig,
+      String xTablePartitionConfig) {
+    return TableFormatPartitionDataHolder.builder()
+        .sourceTableFormat(sourceFormat)
+        .targetTableFormats(targetFormats)
+        .hudiSourceConfig(Optional.ofNullable(hudiPartitionConfig))
+        .xTablePartitionConfig(xTablePartitionConfig)
+        .build();
+  }
+
+  private static ConversionConfig getTableSyncConfig(
+      String sourceTableFormat,
+      SyncMode syncMode,
+      String tableName,
+      GenericTable table,
+      List<String> targetTableFormats,
+      String partitionConfig,
+      Duration metadataRetention) {
+    Properties sourceProperties = new Properties();
+    if (partitionConfig != null) {
+      sourceProperties.put(PARTITION_FIELD_SPEC_CONFIG, partitionConfig);
+    }
+    SourceTable sourceTable =
+        SourceTable.builder()
+            .name(tableName)
+            .formatName(sourceTableFormat)
+            .basePath(table.getBasePath())
+            .dataPath(table.getDataPath())
+            .additionalProperties(sourceProperties)
+            .build();
+
+    List<TargetTable> targetTables =
+        targetTableFormats.stream()
+            .map(
+                formatName ->
+                    TargetTable.builder()
+                        .name(tableName)
+                        .formatName(formatName)
+                        // set the metadata path to the data path as the 
default (required by Hudi)
+                        .basePath(table.getDataPath())
+                        .metadataRetention(metadataRetention)
+                        .build())
+            .collect(Collectors.toList());
+
+    return ConversionConfig.builder()
+        .sourceTable(sourceTable)
+        .targetTables(targetTables)
+        .syncMode(syncMode)
+        .build();
+  }
+
+  private static Stream<Arguments> provideArgsForFileNonPartitionTesting() {
+    String partitionConfig = null;
+    return Stream.of(
+        Arguments.of(
+            buildArgsForPartition(
+                PARQUET, Arrays.asList(ICEBERG, DELTA, HUDI), partitionConfig, 
partitionConfig)));
+  }
+
+  private ConversionSourceProvider<?> getConversionSourceProvider(String 
sourceTableFormat) {
+    if (sourceTableFormat.equalsIgnoreCase(PARQUET)) {
+      ConversionSourceProvider<Long> parquetConversionSourceProvider =
+          new ParquetConversionSourceProvider();
+      parquetConversionSourceProvider.init(jsc.hadoopConfiguration());
+      return parquetConversionSourceProvider;
+    } else {
+      throw new IllegalArgumentException("Unsupported source format: " + 
sourceTableFormat);
+    }
+  }
+
+  @ParameterizedTest
+  @MethodSource("provideArgsForFileNonPartitionTesting")
+  public void testFileNonPartitionedData(
+      TableFormatPartitionDataHolder tableFormatPartitionDataHolder) throws 
URISyntaxException {
+    String tableName = getTableName();
+    String sourceTableFormat = 
tableFormatPartitionDataHolder.getSourceTableFormat();
+    List<String> targetTableFormats = 
tableFormatPartitionDataHolder.getTargetTableFormats();
+    String xTablePartitionConfig = 
tableFormatPartitionDataHolder.getXTablePartitionConfig();
+    ConversionSourceProvider<?> conversionSourceProvider =
+        getConversionSourceProvider(sourceTableFormat);
+
+    List<Row> data =
+        Arrays.asList(
+            RowFactory.create(1, "Alice", true, 30.1, new 
Timestamp(System.currentTimeMillis())),
+            RowFactory.create(
+                2, "Bob", false, 24.6, new 
Timestamp(System.currentTimeMillis() + 1000)),
+            RowFactory.create(
+                3, "Charlie", true, 35.2, new 
Timestamp(System.currentTimeMillis() + 2000)),
+            RowFactory.create(
+                4, "David", false, 29.5, new 
Timestamp(System.currentTimeMillis() + 3000)),
+            RowFactory.create(
+                5, "Eve", true, 22.2, new Timestamp(System.currentTimeMillis() 
+ 4000)));
+
+    schema =
+        DataTypes.createStructType(
+            new StructField[] {
+              DataTypes.createStructField("id", DataTypes.IntegerType, false),
+              DataTypes.createStructField("name", DataTypes.StringType, false),
+              DataTypes.createStructField("hasSiblings", 
DataTypes.BooleanType, false),
+              DataTypes.createStructField("age", DataTypes.DoubleType, false),
+              DataTypes.createStructField(
+                  "timestamp",
+                  DataTypes.TimestampType,
+                  false,
+                  new MetadataBuilder().putString("precision", 
"millis").build())
+            });
+    Dataset<Row> df = sparkSession.createDataFrame(data, schema);
+    String dataPath = tempDir.toAbsolutePath().toString() + 
"/non_partitioned_data";
+    df.write().mode(SaveMode.Overwrite).parquet(dataPath);
+    GenericTable table;
+    table =
+        GenericTable.getInstance(
+            tableName, Paths.get(dataPath), sparkSession, jsc, 
sourceTableFormat, false);
+    try (GenericTable tableToClose = table) {
+      ConversionConfig conversionConfig =
+          getTableSyncConfig(
+              sourceTableFormat,
+              SyncMode.FULL,
+              tableName,
+              table,
+              targetTableFormats,
+              xTablePartitionConfig,
+              null);
+      ConversionController conversionController =
+          new ConversionController(jsc.hadoopConfiguration());
+      conversionController.sync(conversionConfig, conversionSourceProvider);
+      checkDatasetEquivalenceWithFilter(sourceTableFormat, tableToClose, 
targetTableFormats, false);
+    }
+  }
+
+  @ParameterizedTest
+  @MethodSource("provideArgsForFilePartitionTesting")
+  public void testFilePartitionedData(TableFormatPartitionDataHolder 
tableFormatPartitionDataHolder)
+      throws URISyntaxException {
+    String tableName = getTableName();
+    String sourceTableFormat = 
tableFormatPartitionDataHolder.getSourceTableFormat();
+    List<String> targetTableFormats = 
tableFormatPartitionDataHolder.getTargetTableFormats();
+    String xTablePartitionConfig = 
tableFormatPartitionDataHolder.getXTablePartitionConfig();
+    ConversionSourceProvider<?> conversionSourceProvider =
+        getConversionSourceProvider(sourceTableFormat);
+    // create the data
+    List<Row> data =
+        Arrays.asList(
+            RowFactory.create(1, "Alice", true, 30.1, new 
Timestamp(System.currentTimeMillis())),
+            RowFactory.create(
+                2, "Bob", false, 24.6, new 
Timestamp(System.currentTimeMillis() + 1000)),
+            RowFactory.create(
+                3, "Charlie", true, 35.2, new 
Timestamp(System.currentTimeMillis() + 2000)),
+            RowFactory.create(
+                4, "David", false, 29.5, new 
Timestamp(System.currentTimeMillis() + 3000)),
+            RowFactory.create(
+                5, "Eve", true, 22.2, new Timestamp(System.currentTimeMillis() 
+ 4000)));
+
+    schema =
+        DataTypes.createStructType(
+            new StructField[] {
+              DataTypes.createStructField("id", DataTypes.IntegerType, false),
+              DataTypes.createStructField("name", DataTypes.StringType, false),
+              DataTypes.createStructField("hasSiblings", 
DataTypes.BooleanType, false),
+              DataTypes.createStructField("age", DataTypes.DoubleType, false),
+              DataTypes.createStructField(
+                  "timestamp",
+                  DataTypes.TimestampType,
+                  false,
+                  new MetadataBuilder().putString("precision", 
"millis").build())
+            });
+    Dataset<Row> df = sparkSession.createDataFrame(data, schema);
+    String dataPathPart = tempDir.toAbsolutePath() + "/partitioned_data";
+    df.withColumn("year", 
functions.year(functions.col("timestamp").cast(DataTypes.TimestampType)))
+        .withColumn(
+            "month",
+            
functions.date_format(functions.col("timestamp").cast(DataTypes.TimestampType), 
"MM"))
+        .write()
+        .mode(SaveMode.Overwrite)
+        .partitionBy("year", "month")
+        .parquet(dataPathPart);
+    GenericTable table;
+    table =
+        GenericTable.getInstance(
+            tableName, Paths.get(dataPathPart), sparkSession, jsc, 
sourceTableFormat, true);
+    try (GenericTable tableToClose = table) {
+      ConversionConfig conversionConfig =
+          getTableSyncConfig(
+              sourceTableFormat,
+              SyncMode.FULL,
+              tableName,
+              table,
+              targetTableFormats,
+              xTablePartitionConfig,
+              null);
+      ConversionController conversionController =
+          new ConversionController(jsc.hadoopConfiguration());
+      conversionController.sync(conversionConfig, conversionSourceProvider);
+      checkDatasetEquivalenceWithFilter(sourceTableFormat, tableToClose, 
targetTableFormats, true);
+      // update the current parquet file data with another attribute the sync 
again
+      List<Row> dataToAppend =
+          Arrays.asList(
+              RowFactory.create(
+                  10,
+                  "BobAppended",
+                  false,
+                  70.3,
+                  new Timestamp(System.currentTimeMillis() + 1500)));
+
+      Dataset<Row> dfAppend = sparkSession.createDataFrame(dataToAppend, 
schema);
+      dfAppend
+          .withColumn(
+              "year", 
functions.year(functions.col("timestamp").cast(DataTypes.TimestampType)))
+          .withColumn(
+              "month",
+              
functions.date_format(functions.col("timestamp").cast(DataTypes.TimestampType), 
"MM"))
+          .write()
+          .mode(SaveMode.Append)
+          .partitionBy("year", "month")
+          .parquet(dataPathPart);
+      GenericTable tableAppend;
+      tableAppend =
+          GenericTable.getInstance(
+              tableName, Paths.get(dataPathPart), sparkSession, jsc, 
sourceTableFormat, true);
+      try (GenericTable tableToCloseAppended = tableAppend) {
+        ConversionConfig conversionConfigAppended =
+            getTableSyncConfig(
+                sourceTableFormat,
+                SyncMode.FULL,
+                tableName,
+                tableAppend,
+                targetTableFormats,
+                xTablePartitionConfig,
+                null);
+        ConversionController conversionControllerAppended =
+            new ConversionController(jsc.hadoopConfiguration());
+        conversionControllerAppended.sync(conversionConfigAppended, 
conversionSourceProvider);
+      }
+    }
+  }
+
+  private void checkDatasetEquivalenceWithFilter(
+      String sourceFormat,
+      GenericTable<?, ?> sourceTable,
+      List<String> targetFormats,
+      boolean isPartitioned)
+      throws URISyntaxException {
+    checkDatasetEquivalence(
+        sourceFormat,
+        sourceTable,
+        Collections.emptyMap(),
+        targetFormats,
+        Collections.emptyMap(),
+        null,
+        isPartitioned);
+  }
+
+  private void checkDatasetEquivalence(
+      String sourceFormat,
+      GenericTable<?, ?> sourceTable,
+      Map<String, String> sourceOptions,
+      List<String> targetFormats,
+      Map<String, Map<String, String>> targetOptions,
+      Integer expectedCount,
+      boolean isPartitioned)
+      throws URISyntaxException {
+    Dataset<Row> sourceRows =
+        sparkSession
+            .read()
+            .schema(schema)
+            .options(sourceOptions)
+            .option("recursiveFileLookup", "true")
+            .option("pathGlobFilter", "*.parquet")
+            .parquet(sourceTable.getDataPath());
+    Map<String, Dataset<Row>> targetRowsByFormat =
+        targetFormats.stream()
+            .collect(
+                Collectors.toMap(
+                    Function.identity(),
+                    targetFormat -> {
+                      Map<String, String> finalTargetOptions =
+                          targetOptions.getOrDefault(targetFormat, 
Collections.emptyMap());
+                      if (targetFormat.equals(HUDI)) {
+                        finalTargetOptions = new HashMap<>(finalTargetOptions);
+                        
finalTargetOptions.put(HoodieMetadataConfig.ENABLE.key(), "true");
+                        finalTargetOptions.put(
+                            
"hoodie.datasource.read.extract.partition.values.from.path", "true");
+                      }
+                      return sparkSession
+                          .read()
+                          .options(finalTargetOptions)
+                          .format(targetFormat.toLowerCase())
+                          .load(sourceTable.getDataPath());
+                    }));
+
+    String[] selectColumnsArr = schema.fieldNames();
+    List<String> dataset1Rows = 
sourceRows.selectExpr(selectColumnsArr).toJSON().collectAsList();
+
+    Set<Map.Entry<String, Dataset<Row>>> entrySet = 
targetRowsByFormat.entrySet();
+
+    for (Map.Entry<String, Dataset<Row>> entry : entrySet) {
+
+      String format = entry.getKey();
+
+      Dataset<Row> targetRows = entry.getValue();
+      targetRows.show();
+
+      List<String> dataset2Rows = 
targetRows.selectExpr(selectColumnsArr).toJSON().collectAsList();
+
+      assertEquals(
+          dataset1Rows.size(),
+          dataset2Rows.size(),
+          String.format(
+              "Datasets have different row counts when reading from Spark. 
Source: %s, Target: %s",
+              sourceFormat, format));
+
+      if (expectedCount != null) {
+        assertEquals(expectedCount, dataset1Rows.size());
+      } else {
+        assertFalse(dataset1Rows.isEmpty());
+      }
+      if (isPartitioned) { // discard Hudi case because the partitioned col 
values won't be equal
+        if (!format.equals("HUDI")) {
+          assertEquals(
+              dataset1Rows,
+              dataset2Rows,
+              String.format(
+                  "Datasets are not equivalent when reading from Spark. 
Source: %s, Target: %s",
+                  sourceFormat, format));
+        }
+      } else {
+        assertEquals(
+            dataset1Rows,
+            dataset2Rows,
+            String.format(
+                "Datasets are not equivalent when reading from Spark. Source: 
%s, Target: %s",
+                sourceFormat, format));
+      }
+    }
+  }
+
+  @Builder
+  @Value
+  private static class TableFormatPartitionDataHolder {
+    String sourceTableFormat;
+    List<String> targetTableFormats;
+    String xTablePartitionConfig;
+    Optional<String> hudiSourceConfig;
+    String filter;
+  }
+}
diff --git 
a/xtable-core/src/test/java/org/apache/xtable/parquet/TestParquetSchemaExtractor.java
 
b/xtable-core/src/test/java/org/apache/xtable/parquet/TestParquetSchemaExtractor.java
index 13d2299f..a78e07bf 100644
--- 
a/xtable-core/src/test/java/org/apache/xtable/parquet/TestParquetSchemaExtractor.java
+++ 
b/xtable-core/src/test/java/org/apache/xtable/parquet/TestParquetSchemaExtractor.java
@@ -277,7 +277,7 @@ public class TestParquetSchemaExtractor {
         InternalSchema.builder()
             .name("my_record")
             .dataType(InternalType.RECORD)
-            .isNullable(true)
+            .isNullable(false)
             .fields(
                 Arrays.asList(
                     InternalField.builder()
diff --git 
a/xtable-core/src/test/java/org/apache/xtable/parquet/TestParquetStatsExtractor.java
 
b/xtable-core/src/test/java/org/apache/xtable/parquet/TestParquetStatsExtractor.java
index 7522c292..df0aba33 100644
--- 
a/xtable-core/src/test/java/org/apache/xtable/parquet/TestParquetStatsExtractor.java
+++ 
b/xtable-core/src/test/java/org/apache/xtable/parquet/TestParquetStatsExtractor.java
@@ -58,6 +58,7 @@ public class TestParquetStatsExtractor {
 
   private static final ParquetSchemaExtractor schemaExtractor =
       ParquetSchemaExtractor.getInstance();
+  private static final ParquetStatsExtractor statsExtractor = 
ParquetStatsExtractor.getInstance();
 
   @TempDir static java.nio.file.Path tempDir = Paths.get("./");
 
@@ -349,7 +350,7 @@ public class TestParquetStatsExtractor {
     Path hadoopPath = new Path(file.toURI());
     // statsExtractor toInternalDataFile testing
     InternalDataFile internalDataFile =
-        ParquetStatsExtractor.toInternalDataFile(configuration, hadoopPath);
+        statsExtractor.toInternalDataFile(configuration, hadoopPath);
     InternalDataFile testInternalFile =
         InternalDataFile.builder()
             .physicalPath(
@@ -378,7 +379,7 @@ public class TestParquetStatsExtractor {
     Path hadoopPath = new Path(file.toURI());
     // statsExtractor toInternalDataFile testing
     InternalDataFile internalDataFile =
-        ParquetStatsExtractor.toInternalDataFile(configuration, hadoopPath);
+        statsExtractor.toInternalDataFile(configuration, hadoopPath);
     InternalDataFile testInternalFile =
         InternalDataFile.builder()
             .physicalPath(
@@ -406,7 +407,7 @@ public class TestParquetStatsExtractor {
     Path hadoopPath = new Path(file.toURI());
     // statsExtractor toInternalDataFile testing
     InternalDataFile internalDataFile =
-        ParquetStatsExtractor.toInternalDataFile(configuration, hadoopPath);
+        statsExtractor.toInternalDataFile(configuration, hadoopPath);
     InternalDataFile testInternalFile =
         InternalDataFile.builder()
             .physicalPath(
@@ -435,7 +436,7 @@ public class TestParquetStatsExtractor {
     Path hadoopPath = new Path(file.toURI());
     // statsExtractor toInternalDataFile testing
     InternalDataFile internalDataFile =
-        ParquetStatsExtractor.toInternalDataFile(configuration, hadoopPath);
+        statsExtractor.toInternalDataFile(configuration, hadoopPath);
     InternalDataFile testInternalFile =
         InternalDataFile.builder()
             .physicalPath(
diff --git 
a/xtable-core/src/test/java/org/apache/xtable/parquet/TestSparkParquetTable.java
 
b/xtable-core/src/test/java/org/apache/xtable/parquet/TestSparkParquetTable.java
new file mode 100644
index 00000000..67db30c4
--- /dev/null
+++ 
b/xtable-core/src/test/java/org/apache/xtable/parquet/TestSparkParquetTable.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.parquet;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import org.apache.xtable.GenericTable;
+
+public class TestSparkParquetTable implements GenericTable<Group, String> {
+  // Name of the table
+  protected String tableName;
+  // Base path for the table
+  protected String basePath;
+  protected String dataPath;
+  protected Path tempDir;
+  protected String partitionConfig;
+  protected MessageType schema;
+  protected List<String> partitionFieldNames;
+  private JavaSparkContext jsc;
+
+  private TestSparkParquetTable(
+      String name, Path tempDir, JavaSparkContext jsc, String partitionConfig) 
{
+    // initialize spark session
+    try {
+      this.tableName = name;
+      this.tempDir = tempDir;
+      this.partitionConfig = partitionConfig;
+      this.jsc = jsc;
+      this.basePath = initBasePath(tempDir, name);
+      this.dataPath = initBasePath(tempDir, name);
+    } catch (IOException ex) {
+      throw new UncheckedIOException("Unable to initialize Test Parquet File", 
ex);
+    }
+  }
+
+  public static TestSparkParquetTable forStandardSchemaAndPartitioning(
+      String tableName, Path tempDir, JavaSparkContext jsc, boolean 
isPartitioned) {
+    String partitionConfig = isPartitioned ? "level:SIMPLE" : null;
+    return new TestSparkParquetTable(tableName, tempDir, jsc, partitionConfig);
+  }
+
+  protected String initBasePath(Path tempDir, String tableName) throws 
IOException {
+    Path basePath = tempDir.resolve(tableName).getParent();
+    Files.createDirectories(basePath);
+    return basePath.toUri().toString();
+  }
+
+  public List<Group> insertRows(int numRows) {
+    return null;
+  }
+
+  public List<Group> insertRecordsForSpecialPartition(int numRows) {
+    return null;
+  }
+
+  public void upsertRows(List<Group> rows) {}
+
+  public void deleteRows(List<Group> rows) {}
+
+  public void deletePartition(String partitionValue) {}
+
+  public void deleteSpecialPartition() {}
+
+  public String getBasePath() {
+    return basePath;
+  }
+
+  public String getDataPath() {
+    return dataPath;
+  }
+
+  public String getMetadataPath() {
+    return null;
+  }
+
+  public String getOrderByColumn() {
+    return null;
+  }
+
+  public void close() {}
+
+  public void reload() {}
+  // TODO either create BASIC_SCHEMA = new Schema.Parser().parse(schema); or 
move this code out
+  public List<String> getColumnsToSelect() {
+    return 
schema.getFields().stream().map(Type::getName).collect(Collectors.toList());
+  }
+
+  public String getFilterQuery() {
+    return null;
+  }
+}

Reply via email to