the-other-tim-brown commented on code in PR #728:
URL: https://github.com/apache/incubator-xtable/pull/728#discussion_r2286724127


##########
xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergColumnStatsConverter.java:
##########
@@ -68,12 +72,28 @@ public Metrics toIceberg(Schema schema, long totalRowCount, List<ColumnStat> fie
           nullValueCounts.put(fieldId, columnStats.getNumNulls());
           Type fieldType = icebergField.type();
           if (columnStats.getRange().getMinValue() != null) {
-            lowerBounds.put(
-                fieldId, Conversions.toByteBuffer(fieldType, columnStats.getRange().getMinValue()));
+            if (fieldType.toString() == "string" && format == "APACHE_PARQUET") {

Review Comment:
   String comparisons should be done with `.equals`; `==` checks whether the two references are the same object.
   
   Why is `format` now required? We have a standardized abstraction for stats, so this should not be required if the spec is followed.
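   
   On the first point, a minimal sketch of the fix (assuming `format` really is a plain string here, as the quoted diff suggests; literal-first `.equals` also guards against a null `format`):
   
   ```java
   // Compare contents, not references; constant-first avoids a possible NPE.
   if ("string".equals(fieldType.toString()) && "APACHE_PARQUET".equals(format)) {
       // handle Parquet string min/max bounds
   }
   ```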



##########
xtable-core/src/main/java/org/apache/xtable/parquet/ParquetConversionSource.java:
##########
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.parquet;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Paths;
+import java.time.Instant;
+import java.util.*;
+import java.util.stream.Collectors;
+
+import lombok.Builder;
+import lombok.NonNull;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.util.functional.RemoteIterators;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.schema.MessageType;
+
+import org.apache.xtable.hudi.*;
+import org.apache.xtable.model.*;
+import org.apache.xtable.model.CommitsBacklog;
+import org.apache.xtable.model.InstantsForIncrementalSync;
+import org.apache.xtable.model.TableChange;
+import org.apache.xtable.model.schema.InternalPartitionField;
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.storage.*;
+import org.apache.xtable.model.storage.FileFormat;
+import org.apache.xtable.model.storage.InternalDataFile;
+import org.apache.xtable.spi.extractor.ConversionSource;
+
+@Builder
+public class ParquetConversionSource implements ConversionSource<Long> {
+  @Builder.Default
+  private static final ParquetSchemaExtractor schemaExtractor =
+      ParquetSchemaExtractor.getInstance();
+
+  @Builder.Default
+  private static final ParquetMetadataExtractor parquetMetadataExtractor =
+      ParquetMetadataExtractor.getInstance();
+
+  @Builder.Default
+  private static final ParquetStatsExtractor parquetStatsExtractor =
+      ParquetStatsExtractor.getInstance();
+
+  private final ParquetPartitionValueExtractor partitionValueExtractor;
+  private final PathBasedPartitionSpecExtractor partitionSpecExtractor;
+  private final String tableName;
+  private final String basePath;
+  @NonNull private final Configuration hadoopConf;
+
+  private InternalTable createInternalTableFromFile(LocatedFileStatus latestFile) {
+    ParquetMetadata parquetMetadata =
+        parquetMetadataExtractor.readParquetMetadata(hadoopConf, latestFile.getPath());
+    MessageType parquetSchema = parquetMetadataExtractor.getSchema(parquetMetadata);
+    InternalSchema schema = schemaExtractor.toInternalSchema(parquetSchema, "");
+    List<InternalPartitionField> partitionFields = partitionSpecExtractor.spec(schema);
+
+    DataLayoutStrategy dataLayoutStrategy =
+        partitionFields.isEmpty()
+            ? DataLayoutStrategy.FLAT
+            : DataLayoutStrategy.HIVE_STYLE_PARTITION;
+    return InternalTable.builder()
+        .tableFormat(TableFormat.PARQUET)
+        .basePath(basePath)
+        .name(tableName)
+        .layoutStrategy(dataLayoutStrategy)
+        .partitioningFields(partitionFields)
+        .readSchema(schema)
+        .latestCommitTime(Instant.ofEpochMilli(latestFile.getModificationTime()))
+        .build();
+  }
+
+  private InternalTable getMostRecentTable(Collection<LocatedFileStatus> parquetFiles) {
+    LocatedFileStatus latestFile = getMostRecentParquetFile(parquetFiles);
+    return createInternalTableFromFile(latestFile);
+  }
+
+  @Override
+  public InternalTable getTable(Long modificationTime) {
+    // get parquetFile at specific time modificationTime
+    Collection<LocatedFileStatus> parquetFiles = getParquetFiles(hadoopConf, basePath);
+    LocatedFileStatus file = getParquetFileAt(parquetFiles, modificationTime);
+    return createInternalTableFromFile(file);
+  }
+
+  private List<InternalDataFile> getInternalDataFiles(Collection<LocatedFileStatus> parquetFiles) {
+    return parquetFiles.stream()
+        .map(
+            file ->
+                InternalDataFile.builder()
+                    .physicalPath(file.getPath().toString())
+                    .fileFormat(FileFormat.APACHE_PARQUET)
+                    .fileSizeBytes(file.getLen())
+                    .partitionValues(
+                        partitionValueExtractor.extractPartitionValues(
+                            partitionSpecExtractor.spec(
+                                partitionValueExtractor.extractSchemaForParquetPartitions(
+                                    parquetMetadataExtractor.readParquetMetadata(
+                                        hadoopConf, file.getPath()),
+                                    file.getPath().toString())),
+                            basePath))
+                    .lastModified(file.getModificationTime())
+                    .columnStats(
+                        parquetStatsExtractor.getColumnStatsForaFile(
+                            parquetMetadataExtractor.readParquetMetadata(
+                                hadoopConf, file.getPath())))
+                    .build())
+        .collect(Collectors.toList());
+  }
+
+  private InternalDataFile createInternalDataFileFromParquetFile(FileStatus parquetFile) {
+    return InternalDataFile.builder()
+        .physicalPath(parquetFile.getPath().toString())
+        .partitionValues(
+            partitionValueExtractor.extractPartitionValues(
+                partitionSpecExtractor.spec(
+                    partitionValueExtractor.extractSchemaForParquetPartitions(
+                        parquetMetadataExtractor.readParquetMetadata(
+                            hadoopConf, parquetFile.getPath()),
+                        parquetFile.getPath().toString())),
+                basePath))
+        .lastModified(parquetFile.getModificationTime())
+        .fileSizeBytes(parquetFile.getLen())
+        .columnStats(
+            parquetStatsExtractor.getColumnStatsForaFile(
+                parquetMetadataExtractor.readParquetMetadata(hadoopConf, parquetFile.getPath())))
+        .build();
+  }
+
+  @Override
+  public CommitsBacklog<Long> getCommitsBacklog(InstantsForIncrementalSync syncInstants) {
+    // based on either table formats?
+    List<Long> commitsToProcess =
+        Collections.singletonList(syncInstants.getLastSyncInstant().toEpochMilli());
+    return CommitsBacklog.<Long>builder().commitsToProcess(commitsToProcess).build();
+  }
+
+  @Override
+  public TableChange getTableChangeForCommit(Long modificationTime) {
+    Collection<LocatedFileStatus> parquetFiles = getParquetFiles(hadoopConf, basePath);
+    Set<InternalDataFile> addedInternalDataFiles = new HashSet<>();
+
+    List<FileStatus> tableChangesAfter =
+        parquetFiles.stream()
+            .filter(fileStatus -> fileStatus.getModificationTime() > modificationTime)
+            .collect(Collectors.toList());
+    InternalTable internalTable = getMostRecentTable(parquetFiles);
+    for (FileStatus tableStatus : tableChangesAfter) {
+      InternalDataFile currentDataFile = createInternalDataFileFromParquetFile(tableStatus);
+      addedInternalDataFiles.add(currentDataFile);
+    }
+
+    return TableChange.builder()
+        .tableAsOfChange(internalTable)
+        .filesDiff(InternalFilesDiff.builder().filesAdded(addedInternalDataFiles).build())
+        .build();
+  }
+
+  @Override
+  public InternalTable getCurrentTable() {
+    Collection<LocatedFileStatus> parquetFiles = getParquetFiles(hadoopConf, basePath);
+    return getMostRecentTable(parquetFiles);
+  }
+
+  /**
+   * Here to get current snapshot listing all files hence the -1 is being passed
+   *
+   * @return
+   */
+  @Override
+  public InternalSnapshot getCurrentSnapshot() {
+    Collection<LocatedFileStatus> parquetFiles = getParquetFiles(hadoopConf, basePath);
+    List<InternalDataFile> internalDataFiles = getInternalDataFiles(parquetFiles);
+    InternalTable table = getMostRecentTable(parquetFiles);
+    return InternalSnapshot.builder()
+        .table(table)
+        .sourceIdentifier(getCommitIdentifier(1L)) // TODO check for version number instead
+        .partitionedDataFiles(PartitionFileGroup.fromFiles(internalDataFiles))
+        .build();
+  }
+
+  private LocatedFileStatus getMostRecentParquetFile(Collection<LocatedFileStatus> parquetFiles) {
+    return parquetFiles.stream()
+        .max(Comparator.comparing(FileStatus::getModificationTime))
+        .orElseThrow(() -> new IllegalStateException("No files found"));
+  }
+
+  private LocatedFileStatus getParquetFileAt(
+      Collection<LocatedFileStatus> parquetFiles, long modificationTime) {
+    return parquetFiles.stream()
+        .filter(fileStatus -> fileStatus.getModificationTime() == modificationTime)
+        .findFirst()
+        .orElseThrow(
+            () -> new IllegalStateException("No file found at " + Long.valueOf(modificationTime)));
+  }
+
+  private Collection<LocatedFileStatus> getParquetFiles(Configuration hadoopConf, String basePath) {

Review Comment:
   Can we update this to return a stream to avoid requiring all files to be 
loaded into memory at once?
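   
   A minimal sketch of a lazy, stream-returning variant; since the quoted hunk cuts off at the signature, the body below is an assumption (including the recursive listing and the `.parquet` suffix filter):
   
   ```java
   // Needs: java.io.UncheckedIOException, java.util.Iterator, java.util.Spliterator,
   // java.util.Spliterators, java.util.stream.Stream, java.util.stream.StreamSupport
   private Stream<LocatedFileStatus> getParquetFiles(Configuration hadoopConf, String basePath) {
     try {
       FileSystem fs = FileSystem.get(new URI(basePath), hadoopConf);
       RemoteIterator<LocatedFileStatus> remote = fs.listFiles(new Path(basePath), true);
       // Adapt Hadoop's RemoteIterator so files are consumed lazily instead of
       // being materialized into a Collection up front.
       Iterator<LocatedFileStatus> iterator =
           new Iterator<LocatedFileStatus>() {
             @Override
             public boolean hasNext() {
               try {
                 return remote.hasNext();
               } catch (IOException e) {
                 throw new UncheckedIOException(e);
               }
             }
   
             @Override
             public LocatedFileStatus next() {
               try {
                 return remote.next();
               } catch (IOException e) {
                 throw new UncheckedIOException(e);
               }
             }
           };
       return StreamSupport.stream(
               Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false)
           .filter(file -> file.getPath().getName().endsWith(".parquet"));
     } catch (IOException | URISyntaxException e) {
       throw new RuntimeException(e);
     }
   }
   ```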



##########
pom.xml:
##########
@@ -60,15 +60,17 @@
         <project.version>0.2.0-SNAPSHOT</project.version>
         <!-- To ensure build content is re-producible -->
         <project.build.outputTimestamp>2025-01-01T00:00:00Z</project.build.outputTimestamp>
-        <maven.compiler.target>8</maven.compiler.target>
+        <maven.compiler.source>17</maven.compiler.source><!--should be 8 for CI-->

Review Comment:
   We need to keep the Java 8 targets.



##########
xtable-core/src/main/java/org/apache/xtable/parquet/ParquetPartitionSpecExtractor.java:
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.parquet;
+
+import java.util.*;
+
+import lombok.AllArgsConstructor;
+
+import org.apache.xtable.hudi.PathBasedPartitionSpecExtractor;
+import org.apache.xtable.model.schema.*;
+import org.apache.xtable.schema.SchemaFieldFinder;
+
+/**
+ * Parses the InternalPartitionFields from a configured list of specs with the format
+ * path:type:format for date types or path:type for value types.
+ */
+@AllArgsConstructor
+public class ParquetPartitionSpecExtractor implements PathBasedPartitionSpecExtractor {
+  private static final ParquetPartitionSpecExtractor INSTANCE =
+      new ParquetPartitionSpecExtractor(new ArrayList<>());

Review Comment:
   How will the list get updated for the table that is being processed?
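   
   For example, one direction (a sketch; the factory overload is hypothetical) would be to construct the extractor per table from its configured specs instead of sharing a singleton built with an empty list:
   
   ```java
   // Hypothetical per-table factory; reuses the existing @AllArgsConstructor.
   public static ParquetPartitionSpecExtractor getInstance(List<PartitionFieldSpec> specs) {
     return new ParquetPartitionSpecExtractor(specs);
   }
   ```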



##########
xtable-core/src/main/java/org/apache/xtable/hudi/PathBasedPartitionValuesExtractor.java:
##########
@@ -81,40 +77,40 @@ public List<PartitionValue> extractPartitionValues(
   }
 
   private PartialResult parsePartitionPath(
-      InternalPartitionField field, String remainingPath, int totalNumberOfPartitions) {
+      InternalPartitionField field, String basePath, int totalNumberOfPartitions) {
     switch (field.getTransformType()) {
       case YEAR:
+        return parseDate(basePath, field.getSourceField().getPath(), "yyyy");
       case MONTH:
       case DAY:
       case HOUR:
         return parseDate(
-            remainingPath, pathToPartitionFieldFormat.get(field.getSourceField().getPath()));
+            basePath, field.getSourceField().getPath(), ""); // TODO adjust for other cases

Review Comment:
   Is this TODO completed?
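   
   If not, a sketch of what completing it might look like, reinstating the configured-format lookup from the removed line (assuming the new three-argument `parseDate` and that the `pathToPartitionFieldFormat` map is still available here):
   
   ```java
   case MONTH:
   case DAY:
   case HOUR:
     return parseDate(
         basePath,
         field.getSourceField().getPath(),
         // Look up the format configured for this partition path instead of "".
         pathToPartitionFieldFormat.get(field.getSourceField().getPath()));
   ```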



##########
xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionTarget.java:
##########
@@ -373,6 +373,7 @@ private CommitState(
     public void setReplaceMetadata(BaseFileUpdatesExtractor.ReplaceMetadata replaceMetadata) {
       if (!writeStatuses.isEmpty() || !partitionToReplacedFileIds.isEmpty()) {
         throw new IllegalArgumentException("Replace metadata can only be set once");
+        // is of size 1

Review Comment:
   nitpick: remove this



##########
xtable-core/src/main/java/org/apache/xtable/parquet/ParquetConversionSource.java:
##########
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.parquet;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Paths;
+import java.time.Instant;
+import java.util.*;
+import java.util.stream.Collectors;
+
+import lombok.Builder;
+import lombok.NonNull;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.util.functional.RemoteIterators;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.schema.MessageType;
+
+import org.apache.xtable.hudi.*;
+import org.apache.xtable.model.*;
+import org.apache.xtable.model.CommitsBacklog;
+import org.apache.xtable.model.InstantsForIncrementalSync;
+import org.apache.xtable.model.TableChange;
+import org.apache.xtable.model.schema.InternalPartitionField;
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.storage.*;
+import org.apache.xtable.model.storage.FileFormat;
+import org.apache.xtable.model.storage.InternalDataFile;
+import org.apache.xtable.spi.extractor.ConversionSource;
+
+@Builder
+public class ParquetConversionSource implements ConversionSource<Long> {
+  @Builder.Default
+  private static final ParquetSchemaExtractor schemaExtractor =
+      ParquetSchemaExtractor.getInstance();
+
+  @Builder.Default
+  private static final ParquetMetadataExtractor parquetMetadataExtractor =
+      ParquetMetadataExtractor.getInstance();
+
+  @Builder.Default
+  private static final ParquetStatsExtractor parquetStatsExtractor =
+      ParquetStatsExtractor.getInstance();
+
+  private final ParquetPartitionValueExtractor partitionValueExtractor;
+  private final PathBasedPartitionSpecExtractor partitionSpecExtractor;
+  private final String tableName;
+  private final String basePath;
+  @NonNull private final Configuration hadoopConf;
+
+  private InternalTable createInternalTableFromFile(LocatedFileStatus latestFile) {
+    ParquetMetadata parquetMetadata =
+        parquetMetadataExtractor.readParquetMetadata(hadoopConf, latestFile.getPath());
+    MessageType parquetSchema = parquetMetadataExtractor.getSchema(parquetMetadata);
+    InternalSchema schema = schemaExtractor.toInternalSchema(parquetSchema, "");
+    List<InternalPartitionField> partitionFields = partitionSpecExtractor.spec(schema);
+
+    DataLayoutStrategy dataLayoutStrategy =
+        partitionFields.isEmpty()
+            ? DataLayoutStrategy.FLAT
+            : DataLayoutStrategy.HIVE_STYLE_PARTITION;
+    return InternalTable.builder()
+        .tableFormat(TableFormat.PARQUET)
+        .basePath(basePath)
+        .name(tableName)
+        .layoutStrategy(dataLayoutStrategy)
+        .partitioningFields(partitionFields)
+        .readSchema(schema)
+        .latestCommitTime(Instant.ofEpochMilli(latestFile.getModificationTime()))
+        .build();
+  }
+
+  private InternalTable getMostRecentTable(Collection<LocatedFileStatus> parquetFiles) {
+    LocatedFileStatus latestFile = getMostRecentParquetFile(parquetFiles);
+    return createInternalTableFromFile(latestFile);
+  }
+
+  @Override
+  public InternalTable getTable(Long modificationTime) {
+    // get parquetFile at specific time modificationTime
+    Collection<LocatedFileStatus> parquetFiles = getParquetFiles(hadoopConf, basePath);
+    LocatedFileStatus file = getParquetFileAt(parquetFiles, modificationTime);
+    return createInternalTableFromFile(file);
+  }
+
+  private List<InternalDataFile> getInternalDataFiles(Collection<LocatedFileStatus> parquetFiles) {
+    return parquetFiles.stream()
+        .map(
+            file ->
+                InternalDataFile.builder()
+                    .physicalPath(file.getPath().toString())
+                    .fileFormat(FileFormat.APACHE_PARQUET)
+                    .fileSizeBytes(file.getLen())
+                    .partitionValues(
+                        partitionValueExtractor.extractPartitionValues(
+                            partitionSpecExtractor.spec(
+                                partitionValueExtractor.extractSchemaForParquetPartitions(
+                                    parquetMetadataExtractor.readParquetMetadata(
+                                        hadoopConf, file.getPath()),
+                                    file.getPath().toString())),
+                            basePath))
+                    .lastModified(file.getModificationTime())
+                    .columnStats(
+                        parquetStatsExtractor.getColumnStatsForaFile(
+                            parquetMetadataExtractor.readParquetMetadata(
+                                hadoopConf, file.getPath())))
+                    .build())
+        .collect(Collectors.toList());
+  }
+
+  private InternalDataFile createInternalDataFileFromParquetFile(FileStatus parquetFile) {
+    return InternalDataFile.builder()
+        .physicalPath(parquetFile.getPath().toString())
+        .partitionValues(
+            partitionValueExtractor.extractPartitionValues(
+                partitionSpecExtractor.spec(
+                    partitionValueExtractor.extractSchemaForParquetPartitions(
+                        parquetMetadataExtractor.readParquetMetadata(
+                            hadoopConf, parquetFile.getPath()),
+                        parquetFile.getPath().toString())),
+                basePath))
+        .lastModified(parquetFile.getModificationTime())
+        .fileSizeBytes(parquetFile.getLen())
+        .columnStats(
+            parquetStatsExtractor.getColumnStatsForaFile(
+                parquetMetadataExtractor.readParquetMetadata(hadoopConf, parquetFile.getPath())))
+        .build();
+  }
+
+  @Override
+  public CommitsBacklog<Long> getCommitsBacklog(InstantsForIncrementalSync syncInstants) {
+    // based on either table formats?
+    List<Long> commitsToProcess =
+        Collections.singletonList(syncInstants.getLastSyncInstant().toEpochMilli());
+    return CommitsBacklog.<Long>builder().commitsToProcess(commitsToProcess).build();
+  }
+
+  @Override
+  public TableChange getTableChangeForCommit(Long modificationTime) {
+    Collection<LocatedFileStatus> parquetFiles = getParquetFiles(hadoopConf, basePath);
+    Set<InternalDataFile> addedInternalDataFiles = new HashSet<>();
+
+    List<FileStatus> tableChangesAfter =
+        parquetFiles.stream()
+            .filter(fileStatus -> fileStatus.getModificationTime() > modificationTime)
+            .collect(Collectors.toList());
+    InternalTable internalTable = getMostRecentTable(parquetFiles);
+    for (FileStatus tableStatus : tableChangesAfter) {
+      InternalDataFile currentDataFile = createInternalDataFileFromParquetFile(tableStatus);
+      addedInternalDataFiles.add(currentDataFile);
+    }
+
+    return TableChange.builder()
+        .tableAsOfChange(internalTable)
+        .filesDiff(InternalFilesDiff.builder().filesAdded(addedInternalDataFiles).build())
+        .build();
+  }
+
+  @Override
+  public InternalTable getCurrentTable() {
+    Collection<LocatedFileStatus> parquetFiles = getParquetFiles(hadoopConf, basePath);
+    return getMostRecentTable(parquetFiles);
+  }
+
+  /**
+   * Here to get current snapshot listing all files hence the -1 is being passed
+   *
+   * @return
+   */
+  @Override
+  public InternalSnapshot getCurrentSnapshot() {
+    Collection<LocatedFileStatus> parquetFiles = getParquetFiles(hadoopConf, basePath);
+    List<InternalDataFile> internalDataFiles = getInternalDataFiles(parquetFiles);
+    InternalTable table = getMostRecentTable(parquetFiles);
+    return InternalSnapshot.builder()
+        .table(table)
+        .sourceIdentifier(getCommitIdentifier(1L)) // TODO check for version number instead

Review Comment:
   Can we make this the last modification time as millis?
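   
   For example (a sketch of that suggestion):
   
   ```java
   // Use the newest file's modification time instead of the hard-coded 1L.
   long latestModificationTime =
       getMostRecentParquetFile(parquetFiles).getModificationTime();
   return InternalSnapshot.builder()
       .table(table)
       .sourceIdentifier(getCommitIdentifier(latestModificationTime))
       .partitionedDataFiles(PartitionFileGroup.fromFiles(internalDataFiles))
       .build();
   ```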



##########
xtable-core/src/main/java/org/apache/xtable/parquet/ParquetConversionSource.java:
##########
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.parquet;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Paths;
+import java.time.Instant;
+import java.util.*;
+import java.util.stream.Collectors;
+
+import lombok.Builder;
+import lombok.NonNull;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.util.functional.RemoteIterators;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.schema.MessageType;
+
+import org.apache.xtable.hudi.*;
+import org.apache.xtable.model.*;
+import org.apache.xtable.model.CommitsBacklog;
+import org.apache.xtable.model.InstantsForIncrementalSync;
+import org.apache.xtable.model.TableChange;
+import org.apache.xtable.model.schema.InternalPartitionField;
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.storage.*;
+import org.apache.xtable.model.storage.FileFormat;
+import org.apache.xtable.model.storage.InternalDataFile;
+import org.apache.xtable.spi.extractor.ConversionSource;
+
+@Builder
+public class ParquetConversionSource implements ConversionSource<Long> {
+  @Builder.Default

Review Comment:
   The builder does not apply to static variables, only instance variables. This annotation should be removed, or the variables should be made instance variables.
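   
   A sketch of the instance-variable option, so `@Builder.Default` actually takes effect:
   
   ```java
   // Instance fields (not static): the builder now supplies these defaults
   // when the caller does not set them explicitly.
   @Builder.Default
   private final ParquetSchemaExtractor schemaExtractor = ParquetSchemaExtractor.getInstance();
   
   @Builder.Default
   private final ParquetMetadataExtractor parquetMetadataExtractor =
       ParquetMetadataExtractor.getInstance();
   
   @Builder.Default
   private final ParquetStatsExtractor parquetStatsExtractor = ParquetStatsExtractor.getInstance();
   ```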



##########
xtable-core/src/main/java/org/apache/xtable/delta/DeltaStatsExtractor.java:
##########
@@ -111,6 +114,10 @@ public String convertStatsToDeltaFormat(
             .maxValues(getMaxValues(validColumnStats))
             .nullCount(getNullCount(validColumnStats))
             .build();
+    // Binary type in Parquet stats serialization is handled with BinarySerializer()

Review Comment:
   Did you validate that this is how Delta is serializing the binary stats?



##########
xtable-core/src/main/java/org/apache/xtable/parquet/ParquetConversionSourceProvider.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.parquet;
+
+import org.apache.xtable.conversion.ConversionSourceProvider;
+import org.apache.xtable.conversion.SourceTable;
+import org.apache.xtable.hudi.PathBasedPartitionSpecExtractor;
+
+/** A concrete implementation of {@link ConversionSourceProvider} for Delta Lake table format. */

Review Comment:
   I think the comment here needs to be updated; it is not for Delta Lake.
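   
   For example, something like:
   
   ```java
   /** A concrete implementation of {@link ConversionSourceProvider} for the Parquet file format. */
   ```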



##########
xtable-core/src/main/java/org/apache/xtable/parquet/ParquetSchemaExtractor.java:
##########
@@ -270,12 +267,11 @@ public InternalSchema toInternalSchema(Type schema, String parentPath) {
             .fields(valueSchema.getFields())
             .build();
       } else {
-
         subFields = new ArrayList<>(schema.asGroupType().getFields().size());
         for (Type parquetField : schema.asGroupType().getFields()) {
           String fieldName = parquetField.getName();
           Type.ID fieldId = parquetField.getId();
-          currentRepetition = parquetField.getRepetition();
+          // currentRepetition = parquetField.getRepetition();

Review Comment:
   Why is this change required?



##########
xtable-core/src/test/java/org/apache/xtable/ITConversionController.java:
##########
@@ -188,6 +306,10 @@ private ConversionSourceProvider<?> getConversionSourceProvider(String sourceTab
       throw new IllegalArgumentException("Unsupported source format: " + sourceTableFormat);
     }
   }
+  /*
+     test for Parquet file conversion

Review Comment:
   Is there a test case added?



##########
xtable-core/src/main/java/org/apache/xtable/parquet/ParquetPartitionSpecExtractor.java:
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.parquet;
+
+import java.util.*;
+
+import lombok.AllArgsConstructor;
+
+import org.apache.xtable.hudi.PathBasedPartitionSpecExtractor;
+import org.apache.xtable.model.schema.*;
+import org.apache.xtable.schema.SchemaFieldFinder;
+
+/**
+ * Parses the InternalPartitionFields from a configured list of specs with the format
+ * path:type:format for date types or path:type for value types.
+ */
+@AllArgsConstructor
+public class ParquetPartitionSpecExtractor implements PathBasedPartitionSpecExtractor {
+  private static final ParquetPartitionSpecExtractor INSTANCE =
+      new ParquetPartitionSpecExtractor(new ArrayList<>());
+  private final List<PartitionFieldSpec> partitionFieldSpecs;
+
+  public static ParquetPartitionSpecExtractor getInstance() {
+    return INSTANCE;
+  }
+
+  @Override
+  public List<InternalPartitionField> spec(InternalSchema tableSchema) {
+
+    List<InternalPartitionField> partitionFields = new ArrayList<>(partitionFieldSpecs.size());
+    for (PartitionFieldSpec fieldSpec : partitionFieldSpecs) {
+      InternalField sourceField =
+          SchemaFieldFinder.getInstance()
+              .findFieldByPath(tableSchema, fieldSpec.getSourceFieldPath());
+      partitionFields.add(
+          InternalPartitionField.builder()
+              .sourceField(sourceField)
+              .transformType(fieldSpec.getTransformType())
+              .build());
+    }
+    return partitionFields;
+  }
+
+  @Override
+  public Map<String, String> getPathToPartitionFieldFormat() {
+    Map<String, String> pathToPartitionFieldFormat = new HashMap<>();
+    // ParquetSourceConfig partitionConfig =
+    // ParquetSourceConfig.fromPartitionFieldSpecConfig(partitionFieldSpecConfig);

Review Comment:
   Remove this comment?



##########
xtable-core/src/test/java/org/apache/xtable/ITConversionController.java:
##########
@@ -483,55 +605,6 @@ public void testTimeTravelQueries(String sourceTableFormat) throws Exception {
     }
   }
 
-  private static List<String> getOtherFormats(String sourceTableFormat) {
-    return Arrays.stream(TableFormat.values())
-        .filter(format -> !format.equals(sourceTableFormat))
-        .collect(Collectors.toList());
-  }
-
-  private static Stream<Arguments> provideArgsForPartitionTesting() {

Review Comment:
   A lot of the changes to this file are just moving methods around. Let's avoid doing this going forward unless it is required or makes the code a lot easier to read. It adds overhead to the review when I have to go line by line to check what actually changed in the test setups.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]