the-other-tim-brown commented on code in PR #728:
URL: https://github.com/apache/incubator-xtable/pull/728#discussion_r2286724127
##########
xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergColumnStatsConverter.java:
##########
@@ -68,12 +72,28 @@ public Metrics toIceberg(Schema schema, long totalRowCount, List<ColumnStat> fie
nullValueCounts.put(fieldId, columnStats.getNumNulls());
Type fieldType = icebergField.type();
if (columnStats.getRange().getMinValue() != null) {
-        lowerBounds.put(
-            fieldId, Conversions.toByteBuffer(fieldType, columnStats.getRange().getMinValue()));
+        if (fieldType.toString() == "string" && format == "APACHE_PARQUET") {
Review Comment:
String comparisons should use `.equals`; `==` checks whether the two references point to the same object.
Why is `format` now required? We have a standardized abstraction for stats, so this should not be needed if the spec is followed.
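For illustration, a rough sketch of the `.equals`-based check, assuming `format` is a `String` in this scope (if it is an enum constant, `==` would actually be safe):
```java
// Constant-first equals avoids reference comparison and also guards
// against a null result from fieldType.toString().
if ("string".equals(fieldType.toString()) && "APACHE_PARQUET".equals(format)) {
  // ... Parquet-specific handling of string min/max bounds
}
```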
##########
xtable-core/src/main/java/org/apache/xtable/parquet/ParquetConversionSource.java:
##########
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.parquet;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Paths;
+import java.time.Instant;
+import java.util.*;
+import java.util.stream.Collectors;
+
+import lombok.Builder;
+import lombok.NonNull;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.util.functional.RemoteIterators;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.schema.MessageType;
+
+import org.apache.xtable.hudi.*;
+import org.apache.xtable.model.*;
+import org.apache.xtable.model.CommitsBacklog;
+import org.apache.xtable.model.InstantsForIncrementalSync;
+import org.apache.xtable.model.TableChange;
+import org.apache.xtable.model.schema.InternalPartitionField;
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.storage.*;
+import org.apache.xtable.model.storage.FileFormat;
+import org.apache.xtable.model.storage.InternalDataFile;
+import org.apache.xtable.spi.extractor.ConversionSource;
+
+@Builder
+public class ParquetConversionSource implements ConversionSource<Long> {
+ @Builder.Default
+ private static final ParquetSchemaExtractor schemaExtractor =
+ ParquetSchemaExtractor.getInstance();
+
+ @Builder.Default
+ private static final ParquetMetadataExtractor parquetMetadataExtractor =
+ ParquetMetadataExtractor.getInstance();
+
+ @Builder.Default
+ private static final ParquetStatsExtractor parquetStatsExtractor =
+ ParquetStatsExtractor.getInstance();
+
+ private final ParquetPartitionValueExtractor partitionValueExtractor;
+ private final PathBasedPartitionSpecExtractor partitionSpecExtractor;
+ private final String tableName;
+ private final String basePath;
+ @NonNull private final Configuration hadoopConf;
+
+  private InternalTable createInternalTableFromFile(LocatedFileStatus latestFile) {
+    ParquetMetadata parquetMetadata =
+        parquetMetadataExtractor.readParquetMetadata(hadoopConf, latestFile.getPath());
+    MessageType parquetSchema = parquetMetadataExtractor.getSchema(parquetMetadata);
+    InternalSchema schema = schemaExtractor.toInternalSchema(parquetSchema, "");
+    List<InternalPartitionField> partitionFields = partitionSpecExtractor.spec(schema);
+
+    DataLayoutStrategy dataLayoutStrategy =
+        partitionFields.isEmpty()
+            ? DataLayoutStrategy.FLAT
+            : DataLayoutStrategy.HIVE_STYLE_PARTITION;
+    return InternalTable.builder()
+        .tableFormat(TableFormat.PARQUET)
+        .basePath(basePath)
+        .name(tableName)
+        .layoutStrategy(dataLayoutStrategy)
+        .partitioningFields(partitionFields)
+        .readSchema(schema)
+        .latestCommitTime(Instant.ofEpochMilli(latestFile.getModificationTime()))
+        .build();
+  }
+
+  private InternalTable getMostRecentTable(Collection<LocatedFileStatus> parquetFiles) {
+ LocatedFileStatus latestFile = getMostRecentParquetFile(parquetFiles);
+ return createInternalTableFromFile(latestFile);
+ }
+
+ @Override
+ public InternalTable getTable(Long modificationTime) {
+ // get parquetFile at specific time modificationTime
+    Collection<LocatedFileStatus> parquetFiles = getParquetFiles(hadoopConf, basePath);
+ LocatedFileStatus file = getParquetFileAt(parquetFiles, modificationTime);
+ return createInternalTableFromFile(file);
+ }
+
+  private List<InternalDataFile> getInternalDataFiles(Collection<LocatedFileStatus> parquetFiles) {
+ return parquetFiles.stream()
+ .map(
+ file ->
+ InternalDataFile.builder()
+ .physicalPath(file.getPath().toString())
+ .fileFormat(FileFormat.APACHE_PARQUET)
+ .fileSizeBytes(file.getLen())
+ .partitionValues(
+ partitionValueExtractor.extractPartitionValues(
+ partitionSpecExtractor.spec(
+                                partitionValueExtractor.extractSchemaForParquetPartitions(
+                                    parquetMetadataExtractor.readParquetMetadata(
+ hadoopConf, file.getPath()),
+ file.getPath().toString())),
+ basePath))
+ .lastModified(file.getModificationTime())
+ .columnStats(
+ parquetStatsExtractor.getColumnStatsForaFile(
+ parquetMetadataExtractor.readParquetMetadata(
+ hadoopConf, file.getPath())))
+ .build())
+ .collect(Collectors.toList());
+ }
+
+  private InternalDataFile createInternalDataFileFromParquetFile(FileStatus parquetFile) {
+ return InternalDataFile.builder()
+ .physicalPath(parquetFile.getPath().toString())
+ .partitionValues(
+ partitionValueExtractor.extractPartitionValues(
+ partitionSpecExtractor.spec(
+ partitionValueExtractor.extractSchemaForParquetPartitions(
+ parquetMetadataExtractor.readParquetMetadata(
+ hadoopConf, parquetFile.getPath()),
+ parquetFile.getPath().toString())),
+ basePath))
+ .lastModified(parquetFile.getModificationTime())
+ .fileSizeBytes(parquetFile.getLen())
+ .columnStats(
+ parquetStatsExtractor.getColumnStatsForaFile(
+            parquetMetadataExtractor.readParquetMetadata(hadoopConf, parquetFile.getPath())))
+ .build();
+ }
+
+ @Override
+  public CommitsBacklog<Long> getCommitsBacklog(InstantsForIncrementalSync syncInstants) {
+    // based on either table formats?
+    List<Long> commitsToProcess =
+        Collections.singletonList(syncInstants.getLastSyncInstant().toEpochMilli());
+    return CommitsBacklog.<Long>builder().commitsToProcess(commitsToProcess).build();
+  }
+
+ @Override
+ public TableChange getTableChangeForCommit(Long modificationTime) {
+    Collection<LocatedFileStatus> parquetFiles = getParquetFiles(hadoopConf, basePath);
+    Set<InternalDataFile> addedInternalDataFiles = new HashSet<>();
+
+    List<FileStatus> tableChangesAfter =
+        parquetFiles.stream()
+            .filter(fileStatus -> fileStatus.getModificationTime() > modificationTime)
+            .collect(Collectors.toList());
+    InternalTable internalTable = getMostRecentTable(parquetFiles);
+    for (FileStatus tableStatus : tableChangesAfter) {
+      InternalDataFile currentDataFile = createInternalDataFileFromParquetFile(tableStatus);
+      addedInternalDataFiles.add(currentDataFile);
+    }
+
+    return TableChange.builder()
+        .tableAsOfChange(internalTable)
+        .filesDiff(InternalFilesDiff.builder().filesAdded(addedInternalDataFiles).build())
+ .build();
+ }
+
+ @Override
+ public InternalTable getCurrentTable() {
+    Collection<LocatedFileStatus> parquetFiles = getParquetFiles(hadoopConf, basePath);
+ return getMostRecentTable(parquetFiles);
+ }
+
+ /**
+   * Here to get current snapshot listing all files hence the -1 is being passed
+   *
+   * @return
+   */
+  @Override
+  public InternalSnapshot getCurrentSnapshot() {
+    Collection<LocatedFileStatus> parquetFiles = getParquetFiles(hadoopConf, basePath);
+    List<InternalDataFile> internalDataFiles = getInternalDataFiles(parquetFiles);
+    InternalTable table = getMostRecentTable(parquetFiles);
+    return InternalSnapshot.builder()
+        .table(table)
+        .sourceIdentifier(getCommitIdentifier(1L)) // TODO check for version number instead
+ .partitionedDataFiles(PartitionFileGroup.fromFiles(internalDataFiles))
+ .build();
+ }
+
+  private LocatedFileStatus getMostRecentParquetFile(Collection<LocatedFileStatus> parquetFiles) {
+ return parquetFiles.stream()
+ .max(Comparator.comparing(FileStatus::getModificationTime))
+ .orElseThrow(() -> new IllegalStateException("No files found"));
+ }
+
+ private LocatedFileStatus getParquetFileAt(
+ Collection<LocatedFileStatus> parquetFiles, long modificationTime) {
+ return parquetFiles.stream()
+        .filter(fileStatus -> fileStatus.getModificationTime() == modificationTime)
+ .findFirst()
+ .orElseThrow(
+            () -> new IllegalStateException("No file found at " + Long.valueOf(modificationTime)));
+ }
+
+  private Collection<LocatedFileStatus> getParquetFiles(Configuration hadoopConf, String basePath) {
Review Comment:
Can we update this to return a stream to avoid requiring all files to be
loaded into memory at once?
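For example, a rough sketch of one way to do this, wrapping the `RemoteIterator` from Hadoop's `FileSystem#listFiles` in a lazy `Stream` (the helper name `streamParquetFiles` is illustrative, not part of the PR):
```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Iterator;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.RemoteIterator;

final class ParquetFileListing {
  // Adapts a RemoteIterator into a lazy Stream so callers can filter and
  // map files without materializing the entire listing in memory.
  static Stream<LocatedFileStatus> streamParquetFiles(
      RemoteIterator<LocatedFileStatus> remote) {
    Iterator<LocatedFileStatus> adapter =
        new Iterator<LocatedFileStatus>() {
          @Override
          public boolean hasNext() {
            try {
              return remote.hasNext();
            } catch (IOException e) {
              throw new UncheckedIOException(e);
            }
          }

          @Override
          public LocatedFileStatus next() {
            try {
              return remote.next();
            } catch (IOException e) {
              throw new UncheckedIOException(e);
            }
          }
        };
    return StreamSupport.stream(
            Spliterators.spliteratorUnknownSize(adapter, Spliterator.ORDERED), false)
        // Keep only data files; the listing may contain other artifacts.
        .filter(status -> status.getPath().getName().endsWith(".parquet"));
  }
}
```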
##########
pom.xml:
##########
@@ -60,15 +60,17 @@
<project.version>0.2.0-SNAPSHOT</project.version>
<!-- To ensure build content is re-producible -->
<project.build.outputTimestamp>2025-01-01T00:00:00Z</project.build.outputTimestamp>
- <maven.compiler.target>8</maven.compiler.target>
+    <maven.compiler.source>17</maven.compiler.source><!--should be 8 for CI-->
Review Comment:
We need to keep the Java 8 targets.
##########
xtable-core/src/main/java/org/apache/xtable/parquet/ParquetPartitionSpecExtractor.java:
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.parquet;
+
+import java.util.*;
+
+import lombok.AllArgsConstructor;
+
+import org.apache.xtable.hudi.PathBasedPartitionSpecExtractor;
+import org.apache.xtable.model.schema.*;
+import org.apache.xtable.schema.SchemaFieldFinder;
+
+/**
+ * Parses the InternalPartitionFields from a configured list of specs with the format
+ * path:type:format for date types or path:type for value types.
+ */
+@AllArgsConstructor
+public class ParquetPartitionSpecExtractor implements PathBasedPartitionSpecExtractor {
+ private static final ParquetPartitionSpecExtractor INSTANCE =
+ new ParquetPartitionSpecExtractor(new ArrayList<>());
Review Comment:
How will the list get update for the table that is being processed?
##########
xtable-core/src/main/java/org/apache/xtable/hudi/PathBasedPartitionValuesExtractor.java:
##########
@@ -81,40 +77,40 @@ public List<PartitionValue> extractPartitionValues(
}
private PartialResult parsePartitionPath(
-      InternalPartitionField field, String remainingPath, int totalNumberOfPartitions) {
+      InternalPartitionField field, String basePath, int totalNumberOfPartitions) {
switch (field.getTransformType()) {
case YEAR:
+ return parseDate(basePath, field.getSourceField().getPath(), "yyyy");
case MONTH:
case DAY:
case HOUR:
return parseDate(
-          remainingPath, pathToPartitionFieldFormat.get(field.getSourceField().getPath()));
+          basePath, field.getSourceField().getPath(), ""); // TODO adjust for other cases
Review Comment:
Is this TODO completed?
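If not, a rough sketch of what the completed cases might look like for hive-style date partitions; the format strings below are assumptions, not values from the PR:
```java
switch (field.getTransformType()) {
  case YEAR:
    return parseDate(basePath, field.getSourceField().getPath(), "yyyy");
  case MONTH:
    return parseDate(basePath, field.getSourceField().getPath(), "yyyy-MM");
  case DAY:
    return parseDate(basePath, field.getSourceField().getPath(), "yyyy-MM-dd");
  case HOUR:
    // Assumed pattern; the separator depends on how the paths are written.
    return parseDate(basePath, field.getSourceField().getPath(), "yyyy-MM-dd-HH");
  default:
    // VALUE and other transform types fall through to the non-date handling.
    break;
}
```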
##########
xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionTarget.java:
##########
@@ -373,6 +373,7 @@ private CommitState(
public void setReplaceMetadata(BaseFileUpdatesExtractor.ReplaceMetadata replaceMetadata) {
if (!writeStatuses.isEmpty() || !partitionToReplacedFileIds.isEmpty()) {
throw new IllegalArgumentException("Replace metadata can only be set once");
+ // is of size 1
Review Comment:
nitpick: remove this
##########
xtable-core/src/main/java/org/apache/xtable/parquet/ParquetConversionSource.java:
##########
+  @Override
+  public InternalSnapshot getCurrentSnapshot() {
+    Collection<LocatedFileStatus> parquetFiles = getParquetFiles(hadoopConf, basePath);
+    List<InternalDataFile> internalDataFiles = getInternalDataFiles(parquetFiles);
+    InternalTable table = getMostRecentTable(parquetFiles);
+    return InternalSnapshot.builder()
+        .table(table)
+        .sourceIdentifier(getCommitIdentifier(1L)) // TODO check for version number instead
Review Comment:
Can we make this the latest file modification time in millis instead?
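For illustration, a rough sketch of that change, reusing the newest file's modification time (this assumes `getCommitIdentifier` accepts the same epoch-millis values used elsewhere in this class):
```java
long latestModificationTime =
    getMostRecentParquetFile(parquetFiles).getModificationTime();
return InternalSnapshot.builder()
    .table(table)
    // Identify the snapshot by the latest modification time in millis
    // instead of the hard-coded 1L version.
    .sourceIdentifier(getCommitIdentifier(latestModificationTime))
    .partitionedDataFiles(PartitionFileGroup.fromFiles(internalDataFiles))
    .build();
```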
##########
xtable-core/src/main/java/org/apache/xtable/parquet/ParquetConversionSource.java:
##########
+@Builder
+public class ParquetConversionSource implements ConversionSource<Long> {
+ @Builder.Default
Review Comment:
The builder does not apply to static variables, only instance variables. This annotation should be removed, or the variables should be made instance variables.
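A sketch of the instance-field form, if the intent is for the builder to allow overriding these defaults (otherwise plain `static final` fields without the annotation would do):
```java
// Non-static fields: Lombok's @Builder.Default now applies, and tests can
// swap in alternative extractors through the builder.
@Builder.Default
private final ParquetSchemaExtractor schemaExtractor =
    ParquetSchemaExtractor.getInstance();

@Builder.Default
private final ParquetMetadataExtractor parquetMetadataExtractor =
    ParquetMetadataExtractor.getInstance();

@Builder.Default
private final ParquetStatsExtractor parquetStatsExtractor =
    ParquetStatsExtractor.getInstance();
```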
##########
xtable-core/src/main/java/org/apache/xtable/delta/DeltaStatsExtractor.java:
##########
@@ -111,6 +114,10 @@ public String convertStatsToDeltaFormat(
.maxValues(getMaxValues(validColumnStats))
.nullCount(getNullCount(validColumnStats))
.build();
+    // Binary type in Parquet stats serialization is handled with BinarySerializer()
Review Comment:
Did you validate that this is how Delta is serializing the binary stats?
##########
xtable-core/src/main/java/org/apache/xtable/parquet/ParquetConversionSourceProvider.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.parquet;
+
+import org.apache.xtable.conversion.ConversionSourceProvider;
+import org.apache.xtable.conversion.SourceTable;
+import org.apache.xtable.hudi.PathBasedPartitionSpecExtractor;
+
+/** A concrete implementation of {@link ConversionSourceProvider} for Delta Lake table format. */
Review Comment:
I think the comment here needs to be updated; it is not for Delta Lake.
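For example:
```java
/** A concrete implementation of {@link ConversionSourceProvider} for the Parquet file format. */
```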
##########
xtable-core/src/main/java/org/apache/xtable/parquet/ParquetSchemaExtractor.java:
##########
@@ -270,12 +267,11 @@ public InternalSchema toInternalSchema(Type schema, String parentPath) {
.fields(valueSchema.getFields())
.build();
} else {
-
subFields = new ArrayList<>(schema.asGroupType().getFields().size());
for (Type parquetField : schema.asGroupType().getFields()) {
String fieldName = parquetField.getName();
Type.ID fieldId = parquetField.getId();
- currentRepetition = parquetField.getRepetition();
+ // currentRepetition = parquetField.getRepetition();
Review Comment:
Why is this change required?
##########
xtable-core/src/test/java/org/apache/xtable/ITConversionController.java:
##########
@@ -188,6 +306,10 @@ private ConversionSourceProvider<?> getConversionSourceProvider(String sourceTab
      throw new IllegalArgumentException("Unsupported source format: " + sourceTableFormat);
}
}
+ /*
+ test for Parquet file conversion
Review Comment:
Is there a test case added?
##########
xtable-core/src/main/java/org/apache/xtable/parquet/ParquetPartitionSpecExtractor.java:
##########
+public class ParquetPartitionSpecExtractor implements PathBasedPartitionSpecExtractor {
+ private static final ParquetPartitionSpecExtractor INSTANCE =
+ new ParquetPartitionSpecExtractor(new ArrayList<>());
+ private final List<PartitionFieldSpec> partitionFieldSpecs;
+
+ public static ParquetPartitionSpecExtractor getInstance() {
+ return INSTANCE;
+ }
+
+ @Override
+ public List<InternalPartitionField> spec(InternalSchema tableSchema) {
+
+    List<InternalPartitionField> partitionFields = new ArrayList<>(partitionFieldSpecs.size());
+ for (PartitionFieldSpec fieldSpec : partitionFieldSpecs) {
+ InternalField sourceField =
+ SchemaFieldFinder.getInstance()
+ .findFieldByPath(tableSchema, fieldSpec.getSourceFieldPath());
+ partitionFields.add(
+ InternalPartitionField.builder()
+ .sourceField(sourceField)
+ .transformType(fieldSpec.getTransformType())
+ .build());
+ }
+ return partitionFields;
+ }
+
+ @Override
+ public Map<String, String> getPathToPartitionFieldFormat() {
+ Map<String, String> pathToPartitionFieldFormat = new HashMap<>();
+ // ParquetSourceConfig partitionConfig =
+    //     ParquetSourceConfig.fromPartitionFieldSpecConfig(partitionFieldSpecConfig);
Review Comment:
Remove this comment?
##########
xtable-core/src/test/java/org/apache/xtable/ITConversionController.java:
##########
@@ -483,55 +605,6 @@ public void testTimeTravelQueries(String sourceTableFormat) throws Exception {
}
}
- private static List<String> getOtherFormats(String sourceTableFormat) {
- return Arrays.stream(TableFormat.values())
- .filter(format -> !format.equals(sourceTableFormat))
- .collect(Collectors.toList());
- }
-
- private static Stream<Arguments> provideArgsForPartitionTesting() {
Review Comment:
A lot of the changes to this file are just moving methods around. Let's avoid doing this going forward unless it is required or makes the code much easier to read. It adds more overhead to the review when I have to go line by line through this file to check what has actually changed in the test setups.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]