the-other-tim-brown commented on code in PR #592:
URL: https://github.com/apache/incubator-xtable/pull/592#discussion_r1875033692


##########
xtable-api/src/main/java/org/apache/xtable/model/storage/TableFormat.java:
##########
@@ -27,8 +27,9 @@ public class TableFormat {
   public static final String HUDI = "HUDI";
   public static final String ICEBERG = "ICEBERG";
   public static final String DELTA = "DELTA";
+  public static final String PARQUET="PARQUET";

Review Comment:
   Please run `mvn spotless:apply` to clean up some of the formatting issues in 
the draft



##########
xtable-core/src/main/java/org/apache/xtable/parquet/ParquetConversionSource.java:
##########
@@ -0,0 +1,209 @@
+package org.apache.xtable.parquet;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.*;
+import java.util.stream.Collectors;
+import lombok.Builder;
+import lombok.NonNull;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.xtable.avro.AvroSchemaConverter;
+import org.apache.xtable.model.*;
+import org.apache.xtable.model.schema.InternalPartitionField;
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.storage.*;
+import org.apache.xtable.spi.extractor.ConversionSource;
+
+@Builder
+public class ParquetConversionSource implements ConversionSource<Long> {
+
+  private final String tableName;
+  private final String basePath;
+  @NonNull private final Configuration hadoopConf;
+
+  @Builder.Default
+  private static final AvroSchemaConverter schemaExtractor = AvroSchemaConverter.getInstance();
+
+  @Builder.Default private static final FileSystemHelper fsHelper = FileSystemHelper.getInstance();
+
+  @Builder.Default
+  private static final ParquetMetadataExtractor parquetMetadataExtractor =

Review Comment:
   Is there an implementation for this class missing?



##########
xtable-core/src/main/java/org/apache/xtable/parquet/ParquetConversionSourceProvider.java:
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.parquet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.xtable.conversion.ConversionSourceProvider;
+import org.apache.xtable.conversion.SourceTable;
+
+/** A concrete implementation of {@link ConversionSourceProvider} for the Parquet table format. */
+public class ParquetConversionSourceProvider extends ConversionSourceProvider<Long> {
+  @Override
+  public ParquetConversionSource getConversionSourceInstance(SourceTable sourceTable) {
+
+    return ParquetConversionSource.builder()
+        .tableName(sourceTable.getName())
+        .basePath(sourceTable.getBasePath())
+        .hadoopConf(new Configuration())

Review Comment:
   there is an `init` method called with the hadoop configuration, you should 
be able to simply use `hadoopConf` here
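   A sketch of what that could look like, assuming the base class keeps the configuration it receives through `init` in a protected `hadoopConf` field (as the other format providers do):

   ```java
   @Override
   public ParquetConversionSource getConversionSourceInstance(SourceTable sourceTable) {
     return ParquetConversionSource.builder()
         .tableName(sourceTable.getName())
         .basePath(sourceTable.getBasePath())
         // reuse the configuration supplied via init instead of creating a new, empty one
         .hadoopConf(hadoopConf)
         .build();
   }
   ```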



##########
xtable-core/src/main/java/org/apache/xtable/parquet/ParquetPartitionHelper.java:
##########
@@ -0,0 +1,69 @@
+package org.apache.xtable.parquet;
+
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.xtable.model.schema.InternalPartitionField;
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.schema.PartitionTransformType;
+import org.apache.xtable.model.stat.PartitionValue;
+import org.apache.xtable.model.stat.Range;
+import org.apache.xtable.schema.SchemaFieldFinder;
+
+public class ParquetPartitionHelper {

Review Comment:
   nitpick on naming, "helper" is a bit vague and ends up being a place where 
developers will dump all sorts of functionality. This is a 
`ParquetPartitionExtractor` based on the naming conventions we've used in other 
formats



##########
xtable-core/src/main/java/org/apache/xtable/parquet/ParquetConversionSource.java:
##########
@@ -0,0 +1,209 @@
+package org.apache.xtable.parquet;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.*;
+import java.util.stream.Collectors;
+import lombok.Builder;
+import lombok.NonNull;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.xtable.avro.AvroSchemaConverter;
+import org.apache.xtable.model.*;
+import org.apache.xtable.model.schema.InternalPartitionField;
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.storage.*;
+import org.apache.xtable.spi.extractor.ConversionSource;
+
+@Builder
+public class ParquetConversionSource implements ConversionSource<Long> {
+
+  private final String tableName;
+  private final String basePath;
+  @NonNull private final Configuration hadoopConf;
+
+  @Builder.Default
+  private static final AvroSchemaConverter schemaExtractor = AvroSchemaConverter.getInstance();
+
+  @Builder.Default private static final FileSystemHelper fsHelper = FileSystemHelper.getInstance();
+
+  @Builder.Default
+  private static final ParquetMetadataExtractor parquetMetadataExtractor =
+      ParquetMetadataExtractor.getInstance();
+
+  @Builder.Default
+  private static final ParquetPartitionHelper parquetPartitionHelper =
+      ParquetPartitionHelper.getInstance();
+
+  private Map<String, List<String>> initPartitionInfo() {
+    return fsHelper.getPartitionFromDirectoryStructure(
+        hadoopConf, basePath, Collections.emptyMap());
+  }
+
+  /**
+   * To infer the schema, the latest file is used, assuming the latest file has the newest fields.
+   *
+   * @param modificationTime the commit to consider for reading the table state
+   * @return
+   */
+  @Override
+  public InternalTable getTable(Long modificationTime) {
+
+    Optional<LocatedFileStatus> latestFile =
+        fsHelper
+            .getParquetFiles(hadoopConf, basePath)
+            .max(Comparator.comparing(FileStatus::getModificationTime));

Review Comment:
   Is there a way to push down this filter so we don't need to iterate through 
all files under the base path? Maybe we can even limit the file listing to 
return files created after the modificationTime?
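   For reference, a sketch of a helper that applies the filter while iterating the listing (the three-argument `getParquetFiles` signature is hypothetical; a plain Hadoop `FileSystem` listing cannot filter server-side, but this at least keeps callers from materializing files older than the threshold):

   ```java
   public Stream<LocatedFileStatus> getParquetFiles(
       Configuration conf, String basePath, long minModificationTime) throws IOException {
     FileSystem fs = FileSystem.get(new Path(basePath).toUri(), conf);
     RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(new Path(basePath), true);
     Stream.Builder<LocatedFileStatus> files = Stream.builder();
     while (iterator.hasNext()) {
       LocatedFileStatus status = iterator.next();
       // keep only parquet files newer than the last sync
       if (status.getPath().getName().endsWith("parquet")
           && status.getModificationTime() > minModificationTime) {
         files.add(status);
       }
     }
     return files.build();
   }
   ```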



##########
xtable-core/src/main/java/org/apache/xtable/parquet/ParquetPartitionHelper.java:
##########
@@ -0,0 +1,69 @@
+package org.apache.xtable.parquet;
+
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.xtable.model.schema.InternalPartitionField;
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.schema.PartitionTransformType;
+import org.apache.xtable.model.stat.PartitionValue;
+import org.apache.xtable.model.stat.Range;
+import org.apache.xtable.schema.SchemaFieldFinder;
+
+public class ParquetPartitionHelper {
+  private static final ParquetPartitionHelper INSTANCE = new ParquetPartitionHelper();
+
+  public static ParquetPartitionHelper getInstance() {
+    return INSTANCE;
+  }
+
+  public List<InternalPartitionField> getInternalPartitionField(
+      Set<String> partitionList, InternalSchema schema) {
+    List<InternalPartitionField> partitionFields = new ArrayList<>();
+
+    for (String partitionKey : partitionList) {
+
+      partitionFields.add(
+          InternalPartitionField.builder()
+              .sourceField(SchemaFieldFinder.getInstance().findFieldByPath(schema, partitionKey))
+              .transformType(PartitionTransformType.VALUE)
+              .build());
+    }
+
+    return partitionFields;
+  }
+
+  // TODO logic is too complicated can be simplified
+  public List<PartitionValue> getPartitionValue(
+      String basePath,
+      String filePath,
+      InternalSchema schema,
+      Map<String, List<String>> partitionInfo) {
+    List<PartitionValue> partitionValues = new ArrayList<>();
+    java.nio.file.Path base = Paths.get(basePath).normalize();
+    java.nio.file.Path file = Paths.get(filePath).normalize();
+    java.nio.file.Path relative = base.relativize(file);
+    for (Map.Entry<String, List<String>> entry : partitionInfo.entrySet()) {
+      String key = entry.getKey();
+      List<String> values = entry.getValue();
+      for (String value : values) {
+        String pathCheck = key + "=" + value;
+        if (relative.startsWith(pathCheck)) {
+          System.out.println("Relative " + relative + " " + pathCheck);

Review Comment:
   Be sure to remove this println



##########
xtable-core/src/main/java/org/apache/xtable/parquet/FileSystemHelper.java:
##########
@@ -0,0 +1,81 @@
+package org.apache.xtable.parquet;
+
+import java.io.IOException;
+import java.util.*;

Review Comment:
   nitpick: we're avoiding the use of `*` imports in this repo



##########
xtable-core/src/main/java/org/apache/xtable/parquet/ParquetConversionSource.java:
##########
@@ -0,0 +1,209 @@
+package org.apache.xtable.parquet;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.*;
+import java.util.stream.Collectors;
+import lombok.Builder;
+import lombok.NonNull;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.xtable.avro.AvroSchemaConverter;
+import org.apache.xtable.model.*;
+import org.apache.xtable.model.schema.InternalPartitionField;
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.storage.*;
+import org.apache.xtable.spi.extractor.ConversionSource;
+
+@Builder
+public class ParquetConversionSource implements ConversionSource<Long> {
+
+  private final String tableName;
+  private final String basePath;
+  @NonNull private final Configuration hadoopConf;
+
+  @Builder.Default
+  private static final AvroSchemaConverter schemaExtractor = AvroSchemaConverter.getInstance();
+
+  @Builder.Default private static final FileSystemHelper fsHelper = FileSystemHelper.getInstance();
+
+  @Builder.Default
+  private static final ParquetMetadataExtractor parquetMetadataExtractor =
+      ParquetMetadataExtractor.getInstance();
+
+  @Builder.Default
+  private static final ParquetPartitionHelper parquetPartitionHelper =
+      ParquetPartitionHelper.getInstance();
+
+  private Map<String, List<String>> initPartitionInfo() {
+    return fsHelper.getPartitionFromDirectoryStructure(
+        hadoopConf, basePath, Collections.emptyMap());
+  }
+
+  /**
+   * To infer the schema, the latest file is used, assuming the latest file has the newest fields.
+   *
+   * @param modificationTime the commit to consider for reading the table state
+   * @return
+   */
+  @Override
+  public InternalTable getTable(Long modificationTime) {
+
+    Optional<LocatedFileStatus> latestFile =
+        fsHelper
+            .getParquetFiles(hadoopConf, basePath)
+            .max(Comparator.comparing(FileStatus::getModificationTime));
+
+    ParquetMetadata parquetMetadata =
+        parquetMetadataExtractor.readParquetMetadata(hadoopConf, latestFile.get().getPath());
+    Schema tableSchema =
+        new org.apache.parquet.avro.AvroSchemaConverter()
+            .convert(parquetMetadata.getFileMetaData().getSchema());
+
+    Set<String> partitionKeys = initPartitionInfo().keySet();
+
+    // merge partition columns into the schema since they are not part of the parquet file
+    if (!partitionKeys.isEmpty()) {
+      tableSchema = mergeAvroSchema(tableSchema, partitionKeys);
+    }
+    InternalSchema schema = schemaExtractor.toInternalSchema(tableSchema);
+
+    List<InternalPartitionField> partitionFields =
+        partitionKeys.isEmpty()
+            ? Collections.emptyList()
+            : parquetPartitionHelper.getInternalPartitionField(partitionKeys, schema);
+    DataLayoutStrategy dataLayoutStrategy =
+        partitionFields.isEmpty()
+            ? DataLayoutStrategy.FLAT
+            : DataLayoutStrategy.HIVE_STYLE_PARTITION;
+    return InternalTable.builder()
+        .tableFormat(TableFormat.PARQUET)
+        .basePath(basePath)
+        .name(tableName)
+        .layoutStrategy(dataLayoutStrategy)
+        .partitioningFields(partitionFields)
+        .readSchema(schema)
+        .latestCommitTime(Instant.ofEpochMilli(latestFile.get().getModificationTime()))
+        .build();
+  }
+
+  /**
+   * To get the current snapshot, all files are listed; hence -1 is passed when building the table.
+   *
+   * @return
+   */
+  @Override
+  public InternalSnapshot getCurrentSnapshot() {
+
+    List<LocatedFileStatus> latestFile =
+        fsHelper.getParquetFiles(hadoopConf, basePath).collect(Collectors.toList());
+    Map<String, List<String>> partitionInfo = initPartitionInfo();
+    InternalTable table = getTable(-1L);
+    List<InternalDataFile> internalDataFiles =
+        latestFile.stream()
+            .map(
+                file ->
+                    InternalDataFile.builder()
+                        .physicalPath(file.getPath().toString())
+                        .fileFormat(FileFormat.APACHE_PARQUET)
+                        .fileSizeBytes(file.getLen())
+                        .partitionValues(
+                            parquetPartitionHelper.getPartitionValue(
+                                basePath,
+                                file.getPath().toString(),
+                                table.getReadSchema(),
+                                partitionInfo))
+                        .lastModified(file.getModificationTime())
+                        .columnStats(
+                            parquetMetadataExtractor.getColumnStatsForaFile(
+                                hadoopConf, file, table))
+                        .build())
+            .collect(Collectors.toList());
+
+    return InternalSnapshot.builder()
+        .table(table)
+        .partitionedDataFiles(PartitionFileGroup.fromFiles(internalDataFiles))
+        .build();
+  }
+
+  /**
+   * New files are detected by listing files whose modification time is greater than the
+   * previous sync.
+   *
+   * @param modificationTime commit to capture table changes for.
+   * @return
+   */
+  @Override
+  public TableChange getTableChangeForCommit(Long modificationTime) {
+    List<FileStatus> tableChanges =
+        fsHelper
+            .getParquetFiles(hadoopConf, basePath)
+            .filter(fileStatus -> fileStatus.getModificationTime() > modificationTime)
+            .collect(Collectors.toList());
+    // TODO avoid doing a full directory listing just to get the schema; the modification time
+    // argument needs to be tweaked
+    InternalTable internalTable = getTable(-1L);
+    Set<InternalDataFile> internalDataFiles = new HashSet<>();
+    Map<String, List<String>> partitionInfo = initPartitionInfo();
+    for (FileStatus tableStatus : tableChanges) {
+      internalDataFiles.add(
+          InternalDataFile.builder()
+              .physicalPath(tableStatus.getPath().toString())
+              .partitionValues(
+                  parquetPartitionHelper.getPartitionValue(
+                      basePath,
+                      tableStatus.getPath().toString(),
+                      internalTable.getReadSchema(),
+                      partitionInfo))
+              .lastModified(tableStatus.getModificationTime())
+              .fileSizeBytes(tableStatus.getLen())
+              .columnStats(
+                  parquetMetadataExtractor.getColumnStatsForaFile(
+                      hadoopConf, tableStatus, internalTable))
+              .build());
+    }
+
+    return TableChange.builder()
+        .tableAsOfChange(internalTable)
+        .filesDiff(DataFilesDiff.builder().filesAdded(internalDataFiles).build())
+        .build();
+  }
+
+  @Override
+  public CommitsBacklog<Long> getCommitsBacklog(
+      InstantsForIncrementalSync instantsForIncrementalSync) {
+
+    List<Long> commitsToProcess =
+        Collections.singletonList(instantsForIncrementalSync.getLastSyncInstant().toEpochMilli());
+
+    return CommitsBacklog.<Long>builder().commitsToProcess(commitsToProcess).build();
+  }
+
+  // TODO need to understand how this should be implemented; should _SUCCESS or a .staging dir
+  // be checked?
+  @Override
+  public boolean isIncrementalSyncSafeFrom(Instant instant) {
+    return true;
+  }
+
+  @Override
+  public void close() throws IOException {}
+
+  private Schema mergeAvroSchema(Schema internalSchema, Set<String> partitionFields) {
+
+    SchemaBuilder.FieldAssembler<Schema> fieldAssembler =
+        SchemaBuilder.record(internalSchema.getName()).fields();
+    for (Schema.Field field : internalSchema.getFields()) {
+      fieldAssembler = fieldAssembler.name(field.name()).type(field.schema()).noDefault();

Review Comment:
   Can the internal schema have defaults? Can it also have docs on fields? Those would be dropped with this code.
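   A sketch of a merge that carries them over, using only classes already imported in this file (adding the partition columns as strings is just for illustration):

   ```java
   private Schema mergeAvroSchema(Schema fileSchema, Set<String> partitionFields) {
     List<Schema.Field> fields = new ArrayList<>();
     for (Schema.Field field : fileSchema.getFields()) {
       // copy name, schema, doc and default value so nothing from the source schema is dropped
       fields.add(new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultVal()));
     }
     for (String partitionField : partitionFields) {
       fields.add(new Schema.Field(partitionField, Schema.create(Schema.Type.STRING), null, null));
     }
     return Schema.createRecord(
         fileSchema.getName(), fileSchema.getDoc(), fileSchema.getNamespace(), fileSchema.isError(), fields);
   }
   ```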



##########
xtable-core/src/main/java/org/apache/xtable/parquet/FileSystemHelper.java:
##########
@@ -0,0 +1,81 @@
+package org.apache.xtable.parquet;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+
+public class FileSystemHelper {

Review Comment:
   I think it would be a good idea to define an interface for getting the 
parquet files for the table and changes since the last run. Right now this is 
all being done through file listing but we should consider a case where someone 
implements a way to poll the changes through s3 events. 
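   Something along these lines, purely as a sketch (the interface and method names are illustrative, assuming `java.time.Instant`, `java.util.stream.Stream`, and the Hadoop types are imported):

   ```java
   /**
    * Abstracts how parquet data files are discovered, so a listing-based implementation and,
    * for example, an S3-event-based implementation can back the same conversion source.
    */
   public interface ParquetFileDiscovery {

     /** All parquet files that currently make up the table. */
     Stream<LocatedFileStatus> getTableFiles(Configuration conf, String basePath);

     /** Parquet files added after the given instant, e.g. from a listing or from queued S3 events. */
     Stream<LocatedFileStatus> getFilesAddedSince(Configuration conf, String basePath, Instant since);
   }
   ```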



##########
xtable-core/src/main/java/org/apache/xtable/parquet/ParquetConversionSource.java:
##########
@@ -0,0 +1,209 @@
+package org.apache.xtable.parquet;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.*;
+import java.util.stream.Collectors;
+import lombok.Builder;
+import lombok.NonNull;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.xtable.avro.AvroSchemaConverter;
+import org.apache.xtable.model.*;
+import org.apache.xtable.model.schema.InternalPartitionField;
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.storage.*;
+import org.apache.xtable.spi.extractor.ConversionSource;
+
+@Builder
+public class ParquetConversionSource implements ConversionSource<Long> {
+
+  private final String tableName;
+  private final String basePath;
+  @NonNull private final Configuration hadoopConf;
+
+  @Builder.Default
+  private static final AvroSchemaConverter schemaExtractor = AvroSchemaConverter.getInstance();
+
+  @Builder.Default private static final FileSystemHelper fsHelper = FileSystemHelper.getInstance();
+
+  @Builder.Default
+  private static final ParquetMetadataExtractor parquetMetadataExtractor =
+      ParquetMetadataExtractor.getInstance();
+
+  @Builder.Default
+  private static final ParquetPartitionHelper parquetPartitionHelper =
+      ParquetPartitionHelper.getInstance();
+
+  private Map<String, List<String>> initPartitionInfo() {
+    return fsHelper.getPartitionFromDirectoryStructure(
+        hadoopConf, basePath, Collections.emptyMap());
+  }
+
+  /**
+   * To infer the schema, the latest file is used, assuming the latest file has the newest fields.
+   *
+   * @param modificationTime the commit to consider for reading the table state
+   * @return
+   */
+  @Override
+  public InternalTable getTable(Long modificationTime) {
+
+    Optional<LocatedFileStatus> latestFile =
+        fsHelper
+            .getParquetFiles(hadoopConf, basePath)
+            .max(Comparator.comparing(FileStatus::getModificationTime));
+
+    ParquetMetadata parquetMetadata =
+        parquetMetadataExtractor.readParquetMetadata(hadoopConf, latestFile.get().getPath());
+    Schema tableSchema =
+        new org.apache.parquet.avro.AvroSchemaConverter()
+            .convert(parquetMetadata.getFileMetaData().getSchema());
+
+    Set<String> partitionKeys = initPartitionInfo().keySet();
+
+    // merge partition columns into the schema since they are not part of the parquet file
+    if (!partitionKeys.isEmpty()) {
+      tableSchema = mergeAvroSchema(tableSchema, partitionKeys);
+    }
+    InternalSchema schema = schemaExtractor.toInternalSchema(tableSchema);
+
+    List<InternalPartitionField> partitionFields =
+        partitionKeys.isEmpty()
+            ? Collections.emptyList()
+            : parquetPartitionHelper.getInternalPartitionField(partitionKeys, schema);
+    DataLayoutStrategy dataLayoutStrategy =
+        partitionFields.isEmpty()
+            ? DataLayoutStrategy.FLAT
+            : DataLayoutStrategy.HIVE_STYLE_PARTITION;
+    return InternalTable.builder()
+        .tableFormat(TableFormat.PARQUET)
+        .basePath(basePath)
+        .name(tableName)
+        .layoutStrategy(dataLayoutStrategy)
+        .partitioningFields(partitionFields)
+        .readSchema(schema)
+        .latestCommitTime(Instant.ofEpochMilli(latestFile.get().getModificationTime()))
+        .build();
+  }
+
+  /**
+   * To get the current snapshot, all files are listed; hence -1 is passed when building the table.
+   *
+   * @return
+   */
+  @Override
+  public InternalSnapshot getCurrentSnapshot() {
+
+    List<LocatedFileStatus> latestFile =
+        fsHelper.getParquetFiles(hadoopConf, basePath).collect(Collectors.toList());
+    Map<String, List<String>> partitionInfo = initPartitionInfo();
+    InternalTable table = getTable(-1L);
+    List<InternalDataFile> internalDataFiles =
+        latestFile.stream()
+            .map(
+                file ->
+                    InternalDataFile.builder()
+                        .physicalPath(file.getPath().toString())
+                        .fileFormat(FileFormat.APACHE_PARQUET)
+                        .fileSizeBytes(file.getLen())
+                        .partitionValues(
+                            parquetPartitionHelper.getPartitionValue(
+                                basePath,
+                                file.getPath().toString(),
+                                table.getReadSchema(),
+                                partitionInfo))
+                        .lastModified(file.getModificationTime())
+                        .columnStats(
+                            parquetMetadataExtractor.getColumnStatsForaFile(
+                                hadoopConf, file, table))
+                        .build())
+            .collect(Collectors.toList());
+
+    return InternalSnapshot.builder()
+        .table(table)
+        .partitionedDataFiles(PartitionFileGroup.fromFiles(internalDataFiles))
+        .build();
+  }
+
+  /**
+   * New files are detected by listing files whose modification time is greater than the
+   * previous sync.
+   *
+   * @param modificationTime commit to capture table changes for.
+   * @return
+   */
+  @Override
+  public TableChange getTableChangeForCommit(Long modificationTime) {
+    List<FileStatus> tableChanges =
+        fsHelper
+            .getParquetFiles(hadoopConf, basePath)
+            .filter(fileStatus -> fileStatus.getModificationTime() > modificationTime)
+            .collect(Collectors.toList());
+    // TODO avoid doing a full directory listing just to get the schema; the modification time
+    // argument needs to be tweaked
+    InternalTable internalTable = getTable(-1L);

Review Comment:
   This call will also scan the file system if I'm understanding this 
correctly, can we avoid that? It can be expensive for large tables
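   One way to avoid the second scan, as a sketch: list once and reuse the result (`getTableFromFiles` and `toInternalDataFile` are hypothetical refactors of the existing `getTable` and `InternalDataFile.builder()` code):

   ```java
   @Override
   public TableChange getTableChangeForCommit(Long modificationTime) {
     // list once and reuse the result for both the table state and the new-file diff
     List<LocatedFileStatus> allFiles =
         fsHelper.getParquetFiles(hadoopConf, basePath).collect(Collectors.toList());
     // hypothetical variant of getTable(Long) that derives the schema from an existing listing
     InternalTable internalTable = getTableFromFiles(allFiles);
     Map<String, List<String>> partitionInfo = initPartitionInfo();
     Set<InternalDataFile> filesAdded =
         allFiles.stream()
             .filter(file -> file.getModificationTime() > modificationTime)
             // shared FileStatus -> InternalDataFile conversion (hypothetical helper)
             .map(file -> toInternalDataFile(file, internalTable, partitionInfo))
             .collect(Collectors.toSet());
     return TableChange.builder()
         .tableAsOfChange(internalTable)
         .filesDiff(DataFilesDiff.builder().filesAdded(filesAdded).build())
         .build();
   }
   ```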



##########
xtable-core/src/main/java/org/apache/xtable/parquet/ParquetPartitionHelper.java:
##########
@@ -0,0 +1,69 @@
+package org.apache.xtable.parquet;
+
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.xtable.model.schema.InternalPartitionField;
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.schema.PartitionTransformType;
+import org.apache.xtable.model.stat.PartitionValue;
+import org.apache.xtable.model.stat.Range;
+import org.apache.xtable.schema.SchemaFieldFinder;
+
+public class ParquetPartitionHelper {
+  private static final ParquetPartitionHelper INSTANCE = new ParquetPartitionHelper();
+
+  public static ParquetPartitionHelper getInstance() {
+    return INSTANCE;
+  }
+
+  public List<InternalPartitionField> getInternalPartitionField(
+      Set<String> partitionList, InternalSchema schema) {
+    List<InternalPartitionField> partitionFields = new ArrayList<>();
+
+    for (String partitionKey : partitionList) {
+
+      partitionFields.add(
+          InternalPartitionField.builder()
+              .sourceField(SchemaFieldFinder.getInstance().findFieldByPath(schema, partitionKey))
+              .transformType(PartitionTransformType.VALUE)
+              .build());
+    }
+
+    return partitionFields;
+  }
+
+  // TODO logic is too complicated can be simplified
+  public List<PartitionValue> getPartitionValue(
+      String basePath,
+      String filePath,
+      InternalSchema schema,
+      Map<String, List<String>> partitionInfo) {
+    List<PartitionValue> partitionValues = new ArrayList<>();
+    java.nio.file.Path base = Paths.get(basePath).normalize();

Review Comment:
   If you're always going to convert the basePath, you should try to find a way 
to convert it once in the caller and pass it in.



##########
xtable-core/src/main/java/org/apache/xtable/parquet/ParquetConversionSource.java:
##########
@@ -0,0 +1,209 @@
+package org.apache.xtable.parquet;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.*;
+import java.util.stream.Collectors;
+import lombok.Builder;
+import lombok.NonNull;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.xtable.avro.AvroSchemaConverter;
+import org.apache.xtable.model.*;
+import org.apache.xtable.model.schema.InternalPartitionField;
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.storage.*;
+import org.apache.xtable.spi.extractor.ConversionSource;
+
+@Builder
+public class ParquetConversionSource implements ConversionSource<Long> {

Review Comment:
   I think it may be a bit more robust if we use a time interval instead of a 
single `long` here. Then you will be able to draw a clear boundary for each run 
of the conversion source. What are your thoughts?
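   For illustration, assuming the commit type were changed from `Long` to a small interval value class (the class and field names here are made up):

   ```java
   import java.time.Instant;
   import lombok.Builder;
   import lombok.Value;

   /** Hypothetical commit identifier carrying an explicit boundary for each sync run. */
   @Value
   @Builder
   public class SyncInterval {
     // exclusive lower bound: files modified at or before this instant were handled by the previous run
     Instant lastSyncCompletedAt;
     // inclusive upper bound, fixed when the run starts, so every run covers a clear window
     Instant currentSyncStartedAt;
   }
   ```
   The conversion source would then implement `ConversionSource<SyncInterval>` and only pick up files whose modification time falls inside the interval.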



##########
xtable-core/src/main/java/org/apache/xtable/parquet/ParquetConversionSource.java:
##########
@@ -0,0 +1,209 @@
+package org.apache.xtable.parquet;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.*;
+import java.util.stream.Collectors;
+import lombok.Builder;
+import lombok.NonNull;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.xtable.avro.AvroSchemaConverter;
+import org.apache.xtable.model.*;
+import org.apache.xtable.model.schema.InternalPartitionField;
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.storage.*;
+import org.apache.xtable.spi.extractor.ConversionSource;
+
+@Builder
+public class ParquetConversionSource implements ConversionSource<Long> {
+
+  private final String tableName;
+  private final String basePath;
+  @NonNull private final Configuration hadoopConf;
+
+  @Builder.Default
+  private static final AvroSchemaConverter schemaExtractor = AvroSchemaConverter.getInstance();
+
+  @Builder.Default private static final FileSystemHelper fsHelper = FileSystemHelper.getInstance();
+
+  @Builder.Default
+  private static final ParquetMetadataExtractor parquetMetadataExtractor =
+      ParquetMetadataExtractor.getInstance();
+
+  @Builder.Default
+  private static final ParquetPartitionHelper parquetPartitionHelper =
+      ParquetPartitionHelper.getInstance();
+
+  private Map<String, List<String>> initPartitionInfo() {
+    return fsHelper.getPartitionFromDirectoryStructure(
+        hadoopConf, basePath, Collections.emptyMap());
+  }
+
+  /**
+   * To infer the schema, the latest file is used, assuming the latest file has the newest fields.
+   *
+   * @param modificationTime the commit to consider for reading the table state
+   * @return
+   */
+  @Override
+  public InternalTable getTable(Long modificationTime) {
+
+    Optional<LocatedFileStatus> latestFile =
+        fsHelper
+            .getParquetFiles(hadoopConf, basePath)
+            .max(Comparator.comparing(FileStatus::getModificationTime));
+
+    ParquetMetadata parquetMetadata =
+        parquetMetadataExtractor.readParquetMetadata(hadoopConf, latestFile.get().getPath());
+    Schema tableSchema =
+        new org.apache.parquet.avro.AvroSchemaConverter()
+            .convert(parquetMetadata.getFileMetaData().getSchema());
+
+    Set<String> partitionKeys = initPartitionInfo().keySet();
+
+    // merge partition columns into the schema since they are not part of the parquet file
+    if (!partitionKeys.isEmpty()) {
+      tableSchema = mergeAvroSchema(tableSchema, partitionKeys);
+    }
+    InternalSchema schema = schemaExtractor.toInternalSchema(tableSchema);
+
+    List<InternalPartitionField> partitionFields =
+        partitionKeys.isEmpty()
+            ? Collections.emptyList()
+            : parquetPartitionHelper.getInternalPartitionField(partitionKeys, schema);
+    DataLayoutStrategy dataLayoutStrategy =
+        partitionFields.isEmpty()
+            ? DataLayoutStrategy.FLAT
+            : DataLayoutStrategy.HIVE_STYLE_PARTITION;
+    return InternalTable.builder()
+        .tableFormat(TableFormat.PARQUET)
+        .basePath(basePath)
+        .name(tableName)
+        .layoutStrategy(dataLayoutStrategy)
+        .partitioningFields(partitionFields)
+        .readSchema(schema)
+        .latestCommitTime(Instant.ofEpochMilli(latestFile.get().getModificationTime()))
+        .build();
+  }
+
+  /**
+   * To get the current snapshot, all files are listed; hence -1 is passed when building the table.
+   *
+   * @return
+   */
+  @Override
+  public InternalSnapshot getCurrentSnapshot() {
+
+    List<LocatedFileStatus> latestFile =
+        fsHelper.getParquetFiles(hadoopConf, basePath).collect(Collectors.toList());
+    Map<String, List<String>> partitionInfo = initPartitionInfo();
+    InternalTable table = getTable(-1L);
+    List<InternalDataFile> internalDataFiles =
+        latestFile.stream()
+            .map(
+                file ->
+                    InternalDataFile.builder()

Review Comment:
   Can the logic for this conversion move to a common method that can also be 
called from the `getTableChangeForCommit`?
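   Something like a shared helper would work for both call sites, since both already have a `FileStatus`, the table, and the partition info on hand (the method name is illustrative):

   ```java
   private InternalDataFile toInternalDataFile(
       FileStatus file, InternalTable table, Map<String, List<String>> partitionInfo) {
     return InternalDataFile.builder()
         .physicalPath(file.getPath().toString())
         .fileFormat(FileFormat.APACHE_PARQUET)
         .fileSizeBytes(file.getLen())
         .partitionValues(
             parquetPartitionHelper.getPartitionValue(
                 basePath, file.getPath().toString(), table.getReadSchema(), partitionInfo))
         .lastModified(file.getModificationTime())
         .columnStats(parquetMetadataExtractor.getColumnStatsForaFile(hadoopConf, file, table))
         .build();
   }
   ```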



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
