This is an automated email from the ASF dual-hosted git repository.
timbrown pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-xtable.git
The following commit(s) were added to refs/heads/main by this push:
new 108ec647 [553] Parquet Schema and Column Stats Converters (#669)
108ec647 is described below
commit 108ec6473c6419a6b93ad0ffe2d39e46d9290f07
Author: Selim Soufargi <[email protected]>
AuthorDate: Sat May 17 21:00:13 2025 +0200
[553] Parquet Schema and Column Stats Converters (#669)
* smaller PR for parquet
* read parquet file for metadataExtractor: compiling, not tested
* cleanups for statsExtractor: compiling, not tested
* refactoring for statsExtractor: compiling, not tested
* added avro dependency
* added tests for SchemaExtractor: int and string primitiveTypes test passes
* fixed some minor bugs in SchemaExtractor
* close fileReader and handle exception
* adjusted fromInternalSchema()
* added a test and adjusted SchemaExtractor
* added test code
* bug fix for Schema extractor: groupType
* bug fix for Schema extractor
* bug fix for tests
* bug fix for SchemaExtractor and added tests for nested lists support
* bug fix for tests for nested lists support
* bug fix for complex test which now passes!
* added test for Map
* schemaExtractor refactored
* bug fixed isNullable() schema
* fromInternalSchema : list and map types
* decimal primitive test added
* float primitive + list and map tests for fromInternalSchema
* added tests for primitive type (date and timestamp)
* refactoring for partitionValues extractor
* git build error fixed
* cleanups for schemaExtractor + refactoring for schemaExtractorTests +
added test code for statsExtractor
* added assertEquals test for stats + removed partitionFields from the
test, TODO check if field is needed in ColumnStats
* bug fixed for stats tests: columnStats + tests data are read using
FileReader
* bug fixed for stats tests, TODO equality test for two objects
* added compareFiles() in InternalDataFile for the statsExtractor tests to
pass: OK
* added custom comparison test for ColumnStat and InternalDataFile, test
passes, TODO: other stat types and other schema types testing
* added custom comparison test for ColumnStat (field) and exec spotless
apply
* tempDir for parquet stats testing
* binaryStatistics test passes
* added int32 file schema test for statsExtractor
* cleanups + added fields comparison for InternalDataFile
* cleanups + added fixed_len_byte_array primitive type schema file test
* use of genericGetMax instead for stats extraction + cleanups
* boolean schema file test for statsExtractor added
* removed hard coded path in statsExtractor test
* cleanups + imports
* separate tests for int and binary for stats
* custom equals() not needed for InternalDataFile and ColumnStat
* removed parquet version from core sub-project pom
* statsExtractor tests as a suite, removed comments + run spotless apply
* removed unnecessary classes
* removed unnecessary classes: undo
* undo irrelevant changes
* fixed formatting issues with spotless:apply cmd
* cleanups for test class and fixes for failed build
* tmp file name fixed for failed build
* cleanups
* spotless apply run + assertion internalDataFile equality changed to
display errors
* fixes for build, PhysicalPath and BinaryStats
* fixes for build, PhysicalPath and BinaryStats + synced fork
* fixes for build, PhysicalPath and BinaryStats + synced fork
* fixes for build and cleanups
* fixes for build and cleanups
* Parquet dep set as provided to use Spark's
* parquet dep version back to 1.15.1
* parquet-avro moved from core to project's pom
* parquet-avro moved after hadoop-common
* parquet dep scope removed
* run spotless:apply
---------
Co-authored-by: Selim Soufargi <[email protected]>
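
For context, a minimal usage sketch of how the new extractors fit together.
This is illustrative only and not part of the patch; the file path is
hypothetical and standard getters on the model classes are assumed.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.schema.MessageType;

import org.apache.xtable.model.schema.InternalSchema;
import org.apache.xtable.model.storage.InternalDataFile;
import org.apache.xtable.parquet.ParquetMetadataExtractor;
import org.apache.xtable.parquet.ParquetSchemaExtractor;
import org.apache.xtable.parquet.ParquetStatsExtractor;

public class ParquetConvertersUsageSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path dataFile = new Path("/tmp/example.parquet"); // hypothetical path

    // Read the footer once; schema and stats extraction both work off it.
    ParquetMetadata footer = ParquetMetadataExtractor.readParquetMetadata(conf, dataFile);
    MessageType parquetSchema = ParquetMetadataExtractor.getSchema(footer);

    // Parquet schema -> canonical InternalSchema.
    InternalSchema internalSchema =
        ParquetSchemaExtractor.getInstance().toInternalSchema(parquetSchema, null);

    // File-level metadata: size, record count, per-column stats.
    InternalDataFile internalDataFile = ParquetStatsExtractor.toInternalDataFile(conf, dataFile);

    System.out.println(internalSchema.getName());
    System.out.println(internalDataFile.getColumnStats().size());
  }
}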
---
.../apache/xtable/conversion/ExternalTable.java | 4 +
.../ThreePartHierarchicalTableIdentifier.java | 1 +
.../apache/xtable/model/schema/InternalField.java | 2 +
.../apache/xtable/model/schema/InternalSchema.java | 3 +-
xtable-core/pom.xml | 6 +-
.../xtable/parquet/ParquetMetadataExtractor.java | 63 +++
.../xtable/parquet/ParquetSchemaExtractor.java | 494 +++++++++++++++++++++
.../xtable/parquet/ParquetStatsExtractor.java | 154 +++++++
.../org/apache/xtable/TestAbstractHudiTable.java | 1 +
.../java/org/apache/xtable/TestJavaHudiTable.java | 1 +
.../org/apache/xtable/iceberg/StubCatalog.java | 1 +
.../xtable/parquet/TestParquetSchemaExtractor.java | 344 ++++++++++++++
.../xtable/parquet/TestParquetStatsExtractor.java | 454 +++++++++++++++++++
.../apache/xtable/utilities/RunCatalogSync.java | 7 +
14 files changed, 1533 insertions(+), 2 deletions(-)
diff --git
a/xtable-api/src/main/java/org/apache/xtable/conversion/ExternalTable.java
b/xtable-api/src/main/java/org/apache/xtable/conversion/ExternalTable.java
index 939c59c0..a6c97d8f 100644
--- a/xtable-api/src/main/java/org/apache/xtable/conversion/ExternalTable.java
+++ b/xtable-api/src/main/java/org/apache/xtable/conversion/ExternalTable.java
@@ -34,12 +34,16 @@ import com.google.common.base.Preconditions;
class ExternalTable {
/** The name of the table. */
protected final @NonNull String name;
+
/** The format of the table (e.g. DELTA, ICEBERG, HUDI) */
protected final @NonNull String formatName;
+
/** The path to the root of the table or the metadata directory depending on
the format */
protected final @NonNull String basePath;
+
/** Optional namespace for the table */
protected final String[] namespace;
+
/** The configuration for interacting with the catalog that manages this
table */
protected final CatalogConfig catalogConfig;
diff --git
a/xtable-api/src/main/java/org/apache/xtable/model/catalog/ThreePartHierarchicalTableIdentifier.java
b/xtable-api/src/main/java/org/apache/xtable/model/catalog/ThreePartHierarchicalTableIdentifier.java
index 2608d36a..f387c7d3 100644
---
a/xtable-api/src/main/java/org/apache/xtable/model/catalog/ThreePartHierarchicalTableIdentifier.java
+++
b/xtable-api/src/main/java/org/apache/xtable/model/catalog/ThreePartHierarchicalTableIdentifier.java
@@ -42,6 +42,7 @@ public class ThreePartHierarchicalTableIdentifier implements
HierarchicalTableId
* name varies depending on the catalogType.
*/
String catalogName;
+
/**
* Catalogs have the ability to group tables logically, databaseName is the
identifier for such
* logical classification. The alternate names for this field include
namespace, schemaName etc.
diff --git
a/xtable-api/src/main/java/org/apache/xtable/model/schema/InternalField.java
b/xtable-api/src/main/java/org/apache/xtable/model/schema/InternalField.java
index 16b6da8a..31eb0ed4 100644
--- a/xtable-api/src/main/java/org/apache/xtable/model/schema/InternalField.java
+++ b/xtable-api/src/main/java/org/apache/xtable/model/schema/InternalField.java
@@ -43,9 +43,11 @@ public class InternalField {
// The id field for the field. This is used to identify the field in the
schema even after
// renames.
Integer fieldId;
+
// represents the fully qualified path to the field (dot separated)
@Getter(lazy = true)
String path = createPath();
+
// splits the dot separated path into parts
@Getter(lazy = true)
String[] pathParts = splitPath();
diff --git
a/xtable-api/src/main/java/org/apache/xtable/model/schema/InternalSchema.java
b/xtable-api/src/main/java/org/apache/xtable/model/schema/InternalSchema.java
index 20af37e0..8b7e0fc5 100644
---
a/xtable-api/src/main/java/org/apache/xtable/model/schema/InternalSchema.java
+++
b/xtable-api/src/main/java/org/apache/xtable/model/schema/InternalSchema.java
@@ -75,7 +75,8 @@ public class InternalSchema {
public enum MetadataValue {
MICROS,
- MILLIS
+ MILLIS,
+ NANOS
}
public static final String XTABLE_LOGICAL_TYPE = "xtableLogicalType";
diff --git a/xtable-core/pom.xml b/xtable-core/pom.xml
index 24bc31df..6bd5282c 100644
--- a/xtable-core/pom.xml
+++ b/xtable-core/pom.xml
@@ -56,7 +56,7 @@
<artifactId>guava</artifactId>
</dependency>
- <!-- Avro -->
+
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
@@ -116,6 +116,10 @@
<artifactId>hadoop-common</artifactId>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.parquet</groupId>
+ <artifactId>parquet-avro</artifactId>
+ </dependency>
<!-- Logging API -->
<dependency>
diff --git
a/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetMetadataExtractor.java
b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetMetadataExtractor.java
new file mode 100644
index 00000000..f29a186d
--- /dev/null
+++
b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetMetadataExtractor.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.parquet;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.HadoopReadOptions;
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.schema.MessageType;
+
+import org.apache.xtable.exception.ReadException;
+
+public class ParquetMetadataExtractor {
+
+ private static final ParquetMetadataExtractor INSTANCE = new
ParquetMetadataExtractor();
+
+ public static ParquetMetadataExtractor getInstance() {
+ return INSTANCE;
+ }
+
+ public static MessageType getSchema(ParquetMetadata footer) {
+ MessageType schema = footer.getFileMetaData().getSchema();
+ return schema;
+ }
+
+ public static ParquetMetadata readParquetMetadata(Configuration conf, Path
filePath) {
+ InputFile file = null;
+ try {
+ file = HadoopInputFile.fromPath(filePath, conf);
+ } catch (IOException e) {
+ throw new ReadException("Failed to read the parquet file", e);
+ }
+
+ ParquetReadOptions options = HadoopReadOptions.builder(conf,
filePath).build();
+ try (ParquetFileReader fileReader = ParquetFileReader.open(file, options))
{
+ return fileReader.getFooter();
+ } catch (Exception e) {
+ throw new ReadException("Failed to read the parquet file", e);
+ }
+ }
+}
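
The extractor above opens the file with try-with-resources and hands back only
the footer. A quick sketch of what a caller can read from that footer without
reopening the file; illustrative only, with a hypothetical path.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;

import org.apache.xtable.parquet.ParquetMetadataExtractor;

public class FooterInspectionSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    ParquetMetadata footer =
        ParquetMetadataExtractor.readParquetMetadata(conf, new Path("/tmp/example.parquet"));

    // The schema and the per-row-group column chunks are both served from the
    // footer, which is what the schema and stats extractors rely on.
    System.out.println(ParquetMetadataExtractor.getSchema(footer));
    for (BlockMetaData block : footer.getBlocks()) {
      System.out.println(block.getRowCount() + " rows, " + block.getColumns().size() + " column chunks");
    }
  }
}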
diff --git
a/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetSchemaExtractor.java
b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetSchemaExtractor.java
new file mode 100644
index 00000000..9043ac0d
--- /dev/null
+++
b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetSchemaExtractor.java
@@ -0,0 +1,494 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.parquet;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.List;
+import java.util.Map;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+
+import org.apache.avro.Schema;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+import org.apache.parquet.schema.Type;
+import org.apache.parquet.schema.Type.Repetition;
+import org.apache.parquet.schema.Types;
+
+import org.apache.xtable.collectors.CustomCollectors;
+import org.apache.xtable.exception.SchemaExtractorException;
+import org.apache.xtable.exception.UnsupportedSchemaTypeException;
+import org.apache.xtable.model.schema.InternalField;
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.schema.InternalType;
+import org.apache.xtable.schema.SchemaUtils;
+
+/**
+ * Class that converts parquet Schema {@link Schema} to Canonical Schema
{@link InternalSchema} and
+ * vice-versa. This conversion is fully reversible and there is a strict 1 to
1 mapping between
+ * parquet data types and canonical data types.
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public class ParquetSchemaExtractor {
+ // parquet only supports string keys in maps
+ private static final InternalField MAP_KEY_FIELD =
+ InternalField.builder()
+ .name(InternalField.Constants.MAP_KEY_FIELD_NAME)
+ .schema(
+ InternalSchema.builder()
+ .name("map_key")
+ .dataType(InternalType.STRING)
+ .isNullable(false)
+ .build())
+ .defaultValue("")
+ .build();
+ private static final ParquetSchemaExtractor INSTANCE = new
ParquetSchemaExtractor();
+ private static final String ELEMENT = "element";
+ private static final String KEY = "key";
+ private static final String VALUE = "value";
+
+ public static ParquetSchemaExtractor getInstance() {
+ return INSTANCE;
+ }
+
+ private static boolean isNullable(Type schema) {
+ return schema.getRepetition() != Repetition.REQUIRED;
+ }
+
+ /**
+ * Converts the parquet {@link Schema} to {@link InternalSchema}.
+ *
+ * @param schema The schema being converted
+ * @param parentPath If this schema is nested within another, this will be a
dot separated string
+ * representing the path from the top most field to the current schema.
+ * @return a converted schema
+ */
+ public InternalSchema toInternalSchema(Type schema, String parentPath) {
+ InternalType newDataType = null;
+ Type.Repetition currentRepetition = null;
+ List<InternalField> subFields = null;
+ PrimitiveType primitiveType;
+ LogicalTypeAnnotation logicalType;
+ Map<InternalSchema.MetadataKey, Object> metadata =
+ new EnumMap<>(InternalSchema.MetadataKey.class);
+ String elementName = schema.getName();
+ if (schema.isPrimitive()) {
+ primitiveType = schema.asPrimitiveType();
+ switch (primitiveType.getPrimitiveTypeName()) {
+ // PrimitiveTypes
+ case INT64:
+ logicalType = schema.getLogicalTypeAnnotation();
+ if (logicalType instanceof
LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) {
+ LogicalTypeAnnotation.TimeUnit timeUnit =
+ ((LogicalTypeAnnotation.TimestampLogicalTypeAnnotation)
logicalType).getUnit();
+ boolean isAdjustedToUTC =
+ ((LogicalTypeAnnotation.TimestampLogicalTypeAnnotation)
logicalType)
+ .isAdjustedToUTC();
+ if (isAdjustedToUTC) {
+ newDataType = InternalType.TIMESTAMP;
+ } else {
+ newDataType = InternalType.TIMESTAMP_NTZ;
+ }
+ if (timeUnit == LogicalTypeAnnotation.TimeUnit.MICROS) {
+ metadata.put(
+ InternalSchema.MetadataKey.TIMESTAMP_PRECISION,
+ InternalSchema.MetadataValue.MICROS);
+ } else if (timeUnit == LogicalTypeAnnotation.TimeUnit.MILLIS) {
+ metadata.put(
+ InternalSchema.MetadataKey.TIMESTAMP_PRECISION,
+ InternalSchema.MetadataValue.MILLIS);
+ } else if (timeUnit == LogicalTypeAnnotation.TimeUnit.NANOS) {
+ metadata.put(
+ InternalSchema.MetadataKey.TIMESTAMP_PRECISION,
+ InternalSchema.MetadataValue.NANOS);
+ }
+ } else if (logicalType instanceof
LogicalTypeAnnotation.IntLogicalTypeAnnotation) {
+ newDataType = InternalType.INT;
+ } else if (logicalType instanceof
LogicalTypeAnnotation.TimeLogicalTypeAnnotation) {
+ LogicalTypeAnnotation.TimeUnit timeUnit =
+ ((LogicalTypeAnnotation.TimeLogicalTypeAnnotation)
logicalType).getUnit();
+ if (timeUnit == LogicalTypeAnnotation.TimeUnit.MICROS
+ || timeUnit == LogicalTypeAnnotation.TimeUnit.NANOS) {
+ newDataType = InternalType.INT;
+ }
+ } else if (logicalType instanceof
LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) {
+
+ metadata.put(
+ InternalSchema.MetadataKey.DECIMAL_PRECISION,
+ ((LogicalTypeAnnotation.DecimalLogicalTypeAnnotation)
logicalType).getPrecision());
+ metadata.put(
+ InternalSchema.MetadataKey.DECIMAL_SCALE,
+ ((LogicalTypeAnnotation.DecimalLogicalTypeAnnotation)
logicalType).getScale());
+ newDataType = InternalType.DECIMAL;
+
+ } else {
+ newDataType = InternalType.LONG;
+ }
+ break;
+ case INT32:
+ logicalType = schema.getLogicalTypeAnnotation();
+ if (logicalType instanceof
LogicalTypeAnnotation.DateLogicalTypeAnnotation) {
+ newDataType = InternalType.DATE;
+ } else if (logicalType instanceof
LogicalTypeAnnotation.TimeLogicalTypeAnnotation) {
+ LogicalTypeAnnotation.TimeUnit timeUnit =
+ ((LogicalTypeAnnotation.TimeLogicalTypeAnnotation)
logicalType).getUnit();
+ if (timeUnit == LogicalTypeAnnotation.TimeUnit.MILLIS) {
+ newDataType = InternalType.INT;
+ }
+ } else if (logicalType instanceof
LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) {
+ metadata.put(
+ InternalSchema.MetadataKey.DECIMAL_PRECISION,
+ ((LogicalTypeAnnotation.DecimalLogicalTypeAnnotation)
logicalType).getPrecision());
+ metadata.put(
+ InternalSchema.MetadataKey.DECIMAL_SCALE,
+ ((LogicalTypeAnnotation.DecimalLogicalTypeAnnotation)
logicalType).getScale());
+ newDataType = InternalType.DECIMAL;
+
+ } else {
+ newDataType = InternalType.INT;
+ }
+ break;
+ case FLOAT:
+ newDataType = InternalType.FLOAT;
+ break;
+ case FIXED_LEN_BYTE_ARRAY:
+ logicalType = schema.getLogicalTypeAnnotation();
+ if (logicalType instanceof
LogicalTypeAnnotation.UUIDLogicalTypeAnnotation) {
+ newDataType = InternalType.UUID;
+ } else if (logicalType instanceof
LogicalTypeAnnotation.IntervalLogicalTypeAnnotation) {
+ metadata.put(InternalSchema.MetadataKey.FIXED_BYTES_SIZE, 12);
+ newDataType = InternalType.FIXED;
+ } else if (logicalType instanceof
LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) {
+ metadata.put(
+ InternalSchema.MetadataKey.DECIMAL_PRECISION,
+ ((LogicalTypeAnnotation.DecimalLogicalTypeAnnotation)
logicalType).getPrecision());
+ metadata.put(
+ InternalSchema.MetadataKey.DECIMAL_SCALE,
+ ((LogicalTypeAnnotation.DecimalLogicalTypeAnnotation)
logicalType).getScale());
+ newDataType = InternalType.DECIMAL;
+ }
+ break;
+ case BINARY:
+ // TODO Variant,GEOMETRY, GEOGRAPHY,
+ logicalType = schema.getLogicalTypeAnnotation();
+ if (logicalType instanceof
LogicalTypeAnnotation.EnumLogicalTypeAnnotation) {
+ metadata.put(
+ InternalSchema.MetadataKey.ENUM_VALUES,
logicalType.toOriginalType().values());
+ newDataType = InternalType.ENUM;
+ } else if (logicalType instanceof
LogicalTypeAnnotation.JsonLogicalTypeAnnotation) {
+ newDataType = InternalType.BYTES;
+ } else if (logicalType instanceof
LogicalTypeAnnotation.BsonLogicalTypeAnnotation) {
+ newDataType = InternalType.BYTES;
+ } else if (logicalType instanceof
LogicalTypeAnnotation.StringLogicalTypeAnnotation) {
+ newDataType = InternalType.STRING;
+ } else if (logicalType instanceof
LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) {
+ metadata.put(
+ InternalSchema.MetadataKey.DECIMAL_PRECISION,
+ ((LogicalTypeAnnotation.DecimalLogicalTypeAnnotation)
logicalType).getPrecision());
+ metadata.put(
+ InternalSchema.MetadataKey.DECIMAL_SCALE,
+ ((LogicalTypeAnnotation.DecimalLogicalTypeAnnotation)
logicalType).getScale());
+ newDataType = InternalType.DECIMAL;
+
+ } else {
+ newDataType = InternalType.BYTES;
+ }
+ break;
+ case BOOLEAN:
+ newDataType = InternalType.BOOLEAN;
+ break;
+ default:
+ throw new UnsupportedSchemaTypeException(
+ String.format("Unsupported schema type %s", schema));
+ }
+ } else {
+ // GroupTypes
+ logicalType = schema.getLogicalTypeAnnotation();
+ if (logicalType instanceof
LogicalTypeAnnotation.ListLogicalTypeAnnotation) {
+ String schemaName = schema.asGroupType().getName();
+ Type.ID schemaId = schema.getId();
+ InternalSchema elementSchema =
+ toInternalSchema(
+ schema.asGroupType().getType(0),
+ SchemaUtils.getFullyQualifiedPath(
+ parentPath,
InternalField.Constants.ARRAY_ELEMENT_FIELD_NAME));
+ InternalField elementField =
+ InternalField.builder()
+ .name(InternalField.Constants.ARRAY_ELEMENT_FIELD_NAME)
+ .parentPath(parentPath)
+ .schema(elementSchema)
+ .fieldId(schemaId == null ? null : schemaId.intValue())
+ .build();
+ return InternalSchema.builder()
+ .name(schema.getName())
+ .dataType(InternalType.LIST)
+ .comment(null)
+ .isNullable(isNullable(schema.asGroupType()))
+ .fields(Collections.singletonList(elementField))
+ .build();
+ } else if (logicalType instanceof
LogicalTypeAnnotation.MapLogicalTypeAnnotation) {
+ String schemaName = schema.asGroupType().getName();
+ Type.ID schemaId = schema.getId();
+ InternalSchema valueSchema =
+ toInternalSchema(
+ schema.asGroupType().getType(0),
+ SchemaUtils.getFullyQualifiedPath(
+ parentPath, InternalField.Constants.MAP_VALUE_FIELD_NAME));
+ InternalField valueField =
+ InternalField.builder()
+ .name(InternalField.Constants.MAP_VALUE_FIELD_NAME)
+ .parentPath(parentPath)
+ .schema(valueSchema)
+ .fieldId(schemaId == null ? null : schemaId.intValue())
+ .build();
+ return InternalSchema.builder()
+ .name(schemaName)
+ .dataType(InternalType.MAP)
+ .comment(null)
+ .isNullable(isNullable(schema.asGroupType()))
+ .fields(valueSchema.getFields())
+ .build();
+ } else {
+
+ subFields = new ArrayList<>(schema.asGroupType().getFields().size());
+ for (Type parquetField : schema.asGroupType().getFields()) {
+ String fieldName = parquetField.getName();
+ Type.ID fieldId = parquetField.getId();
+ currentRepetition = parquetField.getRepetition();
+ InternalSchema subFieldSchema =
+ toInternalSchema(
+ parquetField, SchemaUtils.getFullyQualifiedPath(parentPath,
fieldName));
+
+ if (schema.asGroupType().getFields().size()
+ == 1) { // TODO Tuple (many subelements in a list)
+ newDataType = subFieldSchema.getDataType();
+ elementName = subFieldSchema.getName();
+ break;
+ }
+ subFields.add(
+ InternalField.builder()
+ .parentPath(parentPath)
+ .name(fieldName)
+ .schema(subFieldSchema)
+ .defaultValue(null)
+ .fieldId(fieldId == null ? null : fieldId.intValue())
+ .build());
+ }
+ if (currentRepetition != Repetition.REPEATED
+ && schema.asGroupType().getName() != "list"
+ && !Arrays.asList("key_value",
"map").contains(schema.asGroupType().getName())) {
+ return InternalSchema.builder()
+ .name(schema.getName())
+ .comment(null)
+ .dataType(InternalType.RECORD)
+ .fields(subFields)
+ .isNullable(isNullable(schema.asGroupType()))
+ .build();
+ }
+ }
+ }
+ return InternalSchema.builder()
+ .name(elementName)
+ .dataType(newDataType)
+ .fields(subFields == null || subFields.size() == 0 ? null : subFields)
+ .comment(null)
+ .isNullable(isNullable(schema))
+ .metadata(metadata.isEmpty() ? null : metadata)
+ .build();
+ }
+
+ /**
+ * Internal method for converting the {@link InternalSchema} to parquet
{@link Schema}.
+ *
+ * @param internalSchema internal schema representation
+ * @param currentPath If this schema is nested within another, this will be
a dot separated
+ * string. This is used for the parquet namespace to guarantee unique
names for nested
+ * records.
+ * @return an parquet schema
+ */
+ public Type fromInternalSchema(InternalSchema internalSchema, String
currentPath) {
+ Type type = null;
+ Type listType = null;
+ Type mapType = null;
+ Type mapKeyType = null;
+ Type mapValueType = null;
+ String fieldName = internalSchema.getName();
+ InternalType internalType = internalSchema.getDataType();
+ switch (internalType) {
+ case BOOLEAN:
+ type =
+ Types.required(PrimitiveTypeName.BOOLEAN)
+ .as(LogicalTypeAnnotation.intType(8, false))
+ .named(fieldName);
+ break;
+ case INT:
+ type =
+ Types.required(PrimitiveTypeName.INT32)
+ .as(LogicalTypeAnnotation.intType(32, false))
+ .named(fieldName);
+ break;
+ case LONG:
+ type =
+ Types.required(PrimitiveTypeName.INT64)
+ .as(LogicalTypeAnnotation.intType(64, false))
+ .named(fieldName);
+ break;
+ case STRING:
+ type =
+ Types.required(PrimitiveTypeName.BINARY)
+ .as(LogicalTypeAnnotation.stringType())
+ .named(fieldName);
+ break;
+ case FLOAT:
+ type = Types.required(PrimitiveTypeName.FLOAT).named(fieldName);
+ break;
+ case DECIMAL:
+ int precision =
+ (int)
internalSchema.getMetadata().get(InternalSchema.MetadataKey.DECIMAL_PRECISION);
+ int scale =
+ (int)
internalSchema.getMetadata().get(InternalSchema.MetadataKey.DECIMAL_SCALE);
+ type =
+ Types.required(PrimitiveTypeName.FLOAT)
+ .as(LogicalTypeAnnotation.decimalType(scale, precision))
+ .named(fieldName);
+ break;
+
+ case ENUM:
+ type =
+ new org.apache.parquet.avro.AvroSchemaConverter()
+ .convert(
+ Schema.createEnum(
+ fieldName,
+ internalSchema.getComment(),
+ null,
+ (List<String>)
+ internalSchema
+ .getMetadata()
+ .get(InternalSchema.MetadataKey.ENUM_VALUES),
+ null))
+ .getType(fieldName);
+ break;
+ case DATE:
+ type =
+ Types.required(PrimitiveTypeName.INT32)
+ .as(LogicalTypeAnnotation.dateType())
+ .named(fieldName);
+ break;
+ case TIMESTAMP:
+ if
(internalSchema.getMetadata().get(InternalSchema.MetadataKey.TIMESTAMP_PRECISION)
+ == InternalSchema.MetadataValue.MICROS) {
+ type =
+ Types.required(PrimitiveTypeName.INT64)
+ .as(
+ LogicalTypeAnnotation.timestampType(
+ true, LogicalTypeAnnotation.TimeUnit.MICROS))
+ .named(fieldName);
+ }
+ if
(internalSchema.getMetadata().get(InternalSchema.MetadataKey.TIMESTAMP_PRECISION)
+ == InternalSchema.MetadataValue.MILLIS) {
+ type =
+ Types.required(PrimitiveTypeName.INT64)
+ .as(
+ LogicalTypeAnnotation.timestampType(
+ true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+ .named(fieldName);
+ } else if
(internalSchema.getMetadata().get(InternalSchema.MetadataKey.TIMESTAMP_PRECISION)
+ == InternalSchema.MetadataValue.NANOS) {
+ type =
+ Types.required(PrimitiveTypeName.INT64)
+ .as(
+ LogicalTypeAnnotation.timestampType(
+ true, LogicalTypeAnnotation.TimeUnit.NANOS))
+ .named(fieldName);
+ }
+ break;
+ case TIMESTAMP_NTZ:
+ if
(internalSchema.getMetadata().get(InternalSchema.MetadataKey.TIMESTAMP_PRECISION)
+ == InternalSchema.MetadataValue.MICROS) {
+ type =
+ Types.required(PrimitiveTypeName.INT64)
+ .as(
+ LogicalTypeAnnotation.timestampType(
+ true, LogicalTypeAnnotation.TimeUnit.MICROS))
+ .named(fieldName);
+
+ } else {
+ type =
+ Types.required(PrimitiveTypeName.INT64)
+ .as(
+ LogicalTypeAnnotation.timestampType(
+ true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+ .named(fieldName);
+ }
+ break;
+ case LIST:
+ InternalField elementField =
+ internalSchema.getFields().stream()
+ .filter(
+ field ->
+
InternalField.Constants.ARRAY_ELEMENT_FIELD_NAME.equals(field.getName()))
+ .findFirst()
+ .orElseThrow(() -> new SchemaExtractorException("Invalid array
schema"));
+ listType = fromInternalSchema(elementField.getSchema(),
elementField.getPath());
+ type =
Types.requiredList().setElementType(listType).named(internalSchema.getName());
+ // TODO nullable lists
+ break;
+ case MAP:
+ InternalField keyField =
+ internalSchema.getFields().stream()
+ .filter(field ->
InternalField.Constants.MAP_KEY_FIELD_NAME.equals(field.getName()))
+ .findFirst()
+ .orElseThrow(() -> new SchemaExtractorException("Invalid map
schema"));
+ InternalField valueField =
+ internalSchema.getFields().stream()
+ .filter(
+ field ->
InternalField.Constants.MAP_VALUE_FIELD_NAME.equals(field.getName()))
+ .findFirst()
+ .orElseThrow(() -> new SchemaExtractorException("Invalid map
schema"));
+ mapKeyType = fromInternalSchema(keyField.getSchema(),
valueField.getPath());
+ mapValueType = fromInternalSchema(valueField.getSchema(),
valueField.getPath());
+ type =
+
Types.requiredMap().key(mapKeyType).value(mapValueType).named(internalSchema.getName());
+ // TODO nullable lists
+ break;
+ case RECORD:
+ List<Type> fields =
+ internalSchema.getFields().stream()
+ .map(
+ field ->
+ fromInternalSchema(
+ field.getSchema(),
+ SchemaUtils.getFullyQualifiedPath(field.getName(),
currentPath)))
+
.collect(CustomCollectors.toList(internalSchema.getFields().size()));
+ type =
+
Types.requiredGroup().addFields(fields.stream().toArray(Type[]::new)).named(fieldName);
+ break;
+ default:
+ throw new UnsupportedSchemaTypeException(
+ "Encountered unhandled type during InternalSchema to parquet
conversion:"
+ + internalType);
+ }
+ return type;
+ }
+}
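
A small round-trip sketch of the two conversion directions implemented above;
illustrative only, with an arbitrary column name.

import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
import org.apache.parquet.schema.Type;
import org.apache.parquet.schema.Types;

import org.apache.xtable.model.schema.InternalSchema;
import org.apache.xtable.parquet.ParquetSchemaExtractor;

public class SchemaRoundTripSketch {
  public static void main(String[] args) {
    ParquetSchemaExtractor extractor = ParquetSchemaExtractor.getInstance();

    // A required int32 column carrying the same int logical annotation the
    // converter emits for InternalType.INT.
    Type idColumn =
        Types.required(PrimitiveTypeName.INT32)
            .as(LogicalTypeAnnotation.intType(32, false))
            .named("id");

    // Parquet -> canonical schema.
    InternalSchema internal = extractor.toInternalSchema(idColumn, null);
    System.out.println(internal.getDataType()); // INT

    // Canonical schema -> Parquet; should reproduce the original definition.
    Type roundTripped = extractor.fromInternalSchema(internal, null);
    System.out.println(roundTripped);
  }
}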
diff --git
a/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetStatsExtractor.java
b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetStatsExtractor.java
new file mode 100644
index 00000000..8d02b392
--- /dev/null
+++
b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetStatsExtractor.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.parquet;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import lombok.Builder;
+import lombok.Value;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.schema.MessageType;
+
+import org.apache.xtable.model.schema.InternalField;
+import org.apache.xtable.model.stat.ColumnStat;
+import org.apache.xtable.model.stat.PartitionValue;
+import org.apache.xtable.model.stat.Range;
+import org.apache.xtable.model.storage.FileFormat;
+import org.apache.xtable.model.storage.InternalDataFile;
+
+@Value
+@Builder
+public class ParquetStatsExtractor {
+
+ private static final ParquetStatsExtractor INSTANCE = new
ParquetStatsExtractor();
+
+ private static final ParquetSchemaExtractor schemaExtractor =
+ ParquetSchemaExtractor.getInstance();
+
+ private static final ParquetMetadataExtractor parquetMetadataExtractor =
+ ParquetMetadataExtractor.getInstance();
+
+ // private static final InputPartitionFields partitions = null;
+
+ public static ParquetStatsExtractor getInstance() {
+ return INSTANCE;
+ }
+
+ public static List<ColumnStat> getColumnStatsForaFile(ParquetMetadata
footer) {
+ return getStatsForFile(footer).values().stream()
+ .flatMap(List::stream)
+ .collect(Collectors.toList());
+ }
+
+ private static Optional<Long> getMaxFromColumnStats(List<ColumnStat>
columnStats) {
+ return columnStats.stream()
+ .filter(entry -> entry.getField().getParentPath() == null)
+ .map(ColumnStat::getNumValues)
+ .filter(numValues -> numValues > 0)
+ .max(Long::compareTo);
+ }
+
+ public static Map<ColumnDescriptor, List<ColumnStat>>
getStatsForFile(ParquetMetadata footer) {
+ Map<ColumnDescriptor, List<ColumnStat>> columnDescStats = new HashMap<>();
+ MessageType schema = parquetMetadataExtractor.getSchema(footer);
+ List<ColumnChunkMetaData> columns = new ArrayList<>();
+ columns =
+ footer.getBlocks().stream()
+ .flatMap(blockMetaData -> blockMetaData.getColumns().stream())
+ .collect(Collectors.toList());
+ columnDescStats =
+ columns.stream()
+ .collect(
+ Collectors.groupingBy(
+ columnMetaData ->
+
schema.getColumnDescription(columnMetaData.getPath().toArray()),
+ Collectors.mapping(
+ columnMetaData ->
+ ColumnStat.builder()
+ .field(
+ InternalField.builder()
+
.name(columnMetaData.getPrimitiveType().getName())
+ .fieldId(
+
columnMetaData.getPrimitiveType().getId() == null
+ ? null
+ : columnMetaData
+ .getPrimitiveType()
+ .getId()
+ .intValue())
+ .parentPath(null)
+ .schema(
+ schemaExtractor.toInternalSchema(
+
columnMetaData.getPrimitiveType(),
+
columnMetaData.getPath().toDotString()))
+ .build())
+ .numValues(columnMetaData.getValueCount())
+ .totalSize(columnMetaData.getTotalSize())
+ .range(
+ Range.vector(
+
columnMetaData.getStatistics().genericGetMin(),
+
columnMetaData.getStatistics().genericGetMax()))
+ .build(),
+ Collectors.toList())));
+ return columnDescStats;
+ }
+
+ /* private static InputPartitionFields initPartitionInfo() {
+ return partitions;
+ }*/
+
+ public static InternalDataFile toInternalDataFile(Configuration hadoopConf,
Path parentPath)
+ throws IOException {
+ FileStatus file = null;
+ List<PartitionValue> partitionValues = null;
+ ParquetMetadata footer = null;
+ List<ColumnStat> columnStatsForAFile = null;
+ try {
+ FileSystem fs = FileSystem.get(hadoopConf);
+ file = fs.getFileStatus(parentPath);
+ // InputPartitionFields partitionInfo = initPartitionInfo();
+ footer = parquetMetadataExtractor.readParquetMetadata(hadoopConf,
parentPath);
+ MessageType schema = parquetMetadataExtractor.getSchema(footer);
+ columnStatsForAFile = getColumnStatsForaFile(footer);
+ // partitionValues = partitionExtractor.createPartitionValues(
+ // partitionInfo);
+ } catch (java.io.IOException e) {
+
+ }
+ return InternalDataFile.builder()
+ .physicalPath(parentPath.toString())
+ .fileFormat(FileFormat.APACHE_PARQUET)
+ // .partitionValues(partitionValues)
+ .fileSizeBytes(file.getLen())
+ .recordCount(getMaxFromColumnStats(columnStatsForAFile).orElse(0L))
+ .columnStats(columnStatsForAFile)
+ .lastModified(file.getModificationTime())
+ .build();
+ }
+}
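
A sketch of pulling the flattened per-column-chunk stats out of a footer with
the new extractor; illustrative only, with a hypothetical path and assuming the
standard getters on ColumnStat and InternalField.

import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;

import org.apache.xtable.model.stat.ColumnStat;
import org.apache.xtable.parquet.ParquetMetadataExtractor;
import org.apache.xtable.parquet.ParquetStatsExtractor;

public class ColumnStatsSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    ParquetMetadata footer =
        ParquetMetadataExtractor.readParquetMetadata(conf, new Path("/tmp/example.parquet"));

    // One ColumnStat per column chunk, flattened across all row groups.
    List<ColumnStat> stats = ParquetStatsExtractor.getColumnStatsForaFile(footer);
    for (ColumnStat stat : stats) {
      System.out.println(stat.getField().getName() + ": " + stat.getNumValues() + " values");
    }
  }
}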
diff --git
a/xtable-core/src/test/java/org/apache/xtable/TestAbstractHudiTable.java
b/xtable-core/src/test/java/org/apache/xtable/TestAbstractHudiTable.java
index 252b5b26..89460c40 100644
--- a/xtable-core/src/test/java/org/apache/xtable/TestAbstractHudiTable.java
+++ b/xtable-core/src/test/java/org/apache/xtable/TestAbstractHudiTable.java
@@ -127,6 +127,7 @@ public abstract class TestAbstractHudiTable
throw new UncheckedIOException(ex);
}
}
+
// Name of the table
protected String tableName;
// Base path for the table
diff --git a/xtable-core/src/test/java/org/apache/xtable/TestJavaHudiTable.java
b/xtable-core/src/test/java/org/apache/xtable/TestJavaHudiTable.java
index ce3b25bd..abbe7fe6 100644
--- a/xtable-core/src/test/java/org/apache/xtable/TestJavaHudiTable.java
+++ b/xtable-core/src/test/java/org/apache/xtable/TestJavaHudiTable.java
@@ -66,6 +66,7 @@ public class TestJavaHudiTable extends TestAbstractHudiTable {
private HoodieJavaWriteClient<HoodieAvroPayload> writeClient;
private final Configuration conf;
+
/**
* Create a test table instance for general testing. The table is created
with the schema defined
* in basic_schema.avsc which contains many data types to ensure they are
handled correctly.
diff --git
a/xtable-core/src/test/java/org/apache/xtable/iceberg/StubCatalog.java
b/xtable-core/src/test/java/org/apache/xtable/iceberg/StubCatalog.java
index a25063c4..9475d503 100644
--- a/xtable-core/src/test/java/org/apache/xtable/iceberg/StubCatalog.java
+++ b/xtable-core/src/test/java/org/apache/xtable/iceberg/StubCatalog.java
@@ -36,6 +36,7 @@ public class StubCatalog implements Catalog {
public static void registerMock(String catalogName, Catalog catalog) {
REGISTERED_MOCKS.put(catalogName, catalog);
}
+
// use a mocked catalog instance to more easily test
private Catalog mockedCatalog;
diff --git
a/xtable-core/src/test/java/org/apache/xtable/parquet/TestParquetSchemaExtractor.java
b/xtable-core/src/test/java/org/apache/xtable/parquet/TestParquetSchemaExtractor.java
new file mode 100644
index 00000000..13d2299f
--- /dev/null
+++
b/xtable-core/src/test/java/org/apache/xtable/parquet/TestParquetSchemaExtractor.java
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.parquet;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.parquet.schema.*;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+import org.apache.parquet.schema.Type;
+import org.apache.parquet.schema.Type.Repetition;
+import org.apache.parquet.schema.Types;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import org.apache.xtable.model.schema.InternalField;
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.schema.InternalType;
+
+public class TestParquetSchemaExtractor {
+ private static final ParquetSchemaExtractor schemaExtractor =
+ ParquetSchemaExtractor.getInstance();
+
+ @Test
+ public void testPrimitiveTypes() {
+
+ InternalSchema primitive1 =
+
InternalSchema.builder().name("integer").dataType(InternalType.INT).build();
+ InternalSchema primitive2 =
+
InternalSchema.builder().name("string").dataType(InternalType.STRING).build();
+
+ Map<InternalSchema.MetadataKey, Object> fixedDecimalMetadata = new
HashMap<>();
+ fixedDecimalMetadata.put(InternalSchema.MetadataKey.DECIMAL_PRECISION, 6);
+ fixedDecimalMetadata.put(InternalSchema.MetadataKey.DECIMAL_SCALE, 5);
+ InternalSchema decimalType =
+ InternalSchema.builder()
+ .name("decimal")
+ .dataType(InternalType.DECIMAL)
+ .isNullable(false)
+ .metadata(fixedDecimalMetadata)
+ .build();
+
+ Type stringPrimitiveType =
+ Types.required(PrimitiveTypeName.BINARY)
+ .as(LogicalTypeAnnotation.stringType())
+ .named("string");
+
+ Type intPrimitiveType =
+ Types.required(PrimitiveTypeName.INT32)
+ .as(LogicalTypeAnnotation.intType(32, false))
+ .named("integer");
+
+ Type decimalPrimitive =
+ Types.required(PrimitiveTypeName.INT32)
+ .as(LogicalTypeAnnotation.decimalType(5, 6))
+ .named("decimal");
+
+ Assertions.assertEquals(primitive1,
schemaExtractor.toInternalSchema(intPrimitiveType, null));
+
+ Assertions.assertEquals(
+ primitive2, schemaExtractor.toInternalSchema(stringPrimitiveType,
null));
+
+ Assertions.assertEquals(decimalType,
schemaExtractor.toInternalSchema(decimalPrimitive, null));
+
+ // tests for timestamp and date
+ InternalSchema testDate =
+
InternalSchema.builder().name("date").dataType(InternalType.DATE).isNullable(false).build();
+
+ Map<InternalSchema.MetadataKey, Object> millisMetadata =
+ Collections.singletonMap(
+ InternalSchema.MetadataKey.TIMESTAMP_PRECISION,
InternalSchema.MetadataValue.MILLIS);
+ Map<InternalSchema.MetadataKey, Object> microsMetadata =
+ Collections.singletonMap(
+ InternalSchema.MetadataKey.TIMESTAMP_PRECISION,
InternalSchema.MetadataValue.MICROS);
+ Map<InternalSchema.MetadataKey, Object> nanosMetadata =
+ Collections.singletonMap(
+ InternalSchema.MetadataKey.TIMESTAMP_PRECISION,
InternalSchema.MetadataValue.NANOS);
+
+ InternalSchema testTimestampMillis =
+ InternalSchema.builder()
+ .name("timestamp_millis")
+ .dataType(InternalType.TIMESTAMP_NTZ)
+ .isNullable(false)
+ .metadata(millisMetadata)
+ .build();
+
+ InternalSchema testTimestampMicros =
+ InternalSchema.builder()
+ .name("timestamp_micros")
+ .dataType(InternalType.TIMESTAMP)
+ .isNullable(false)
+ .metadata(microsMetadata)
+ .build();
+
+ InternalSchema testTimestampNanos =
+ InternalSchema.builder()
+ .name("timestamp_nanos")
+ .dataType(InternalType.TIMESTAMP_NTZ)
+ .isNullable(false)
+ .metadata(nanosMetadata)
+ .build();
+
+ Type timestampMillisPrimitiveType =
+ Types.required(PrimitiveTypeName.INT64)
+ .as(LogicalTypeAnnotation.timestampType(false,
LogicalTypeAnnotation.TimeUnit.MILLIS))
+ .named("timestamp_millis");
+ Type timestampNanosPrimitiveType =
+ Types.required(PrimitiveTypeName.INT64)
+ .as(LogicalTypeAnnotation.timestampType(false,
LogicalTypeAnnotation.TimeUnit.NANOS))
+ .named("timestamp_nanos");
+ Assertions.assertEquals(
+ testTimestampMillis,
schemaExtractor.toInternalSchema(timestampMillisPrimitiveType, null));
+ Assertions.assertEquals(
+ testTimestampNanos,
schemaExtractor.toInternalSchema(timestampNanosPrimitiveType, null));
+
+ // test date
+
+ Type datePrimitiveType =
+
Types.required(PrimitiveTypeName.INT32).as(LogicalTypeAnnotation.dateType()).named("date");
+ Assertions.assertEquals(testDate,
schemaExtractor.toInternalSchema(datePrimitiveType, null));
+ }
+
+ @Test
+ public void testGroupTypes() {
+
+ // map
+
+ InternalSchema internalMap =
+ InternalSchema.builder()
+ .name("map")
+ .isNullable(false)
+ .dataType(InternalType.MAP)
+ .fields(
+ Arrays.asList(
+ InternalField.builder()
+ .name("key")
+ .parentPath("_one_field_value")
+ .schema(
+ InternalSchema.builder()
+ .name("key")
+ .dataType(InternalType.FLOAT)
+ .isNullable(false)
+ .build())
+ .defaultValue(null)
+ .build(),
+ InternalField.builder()
+ .name("value")
+ .parentPath("_one_field_value")
+ .schema(
+ InternalSchema.builder()
+ .name("value")
+ .dataType(InternalType.INT)
+ .isNullable(false)
+ .build())
+ .build()))
+ .build();
+
+ /* testing from fromInternalSchema()*/
+
+ GroupType fromSimpleList =
+ Types.requiredList()
+ .element(
+ Types.required(PrimitiveTypeName.INT32)
+ .as(LogicalTypeAnnotation.intType(32, false))
+ .named(InternalField.Constants.ARRAY_ELEMENT_FIELD_NAME))
+ .named("my_list");
+
+ InternalSchema fromInternalList =
+ InternalSchema.builder()
+ .name("my_list")
+ .isNullable(false)
+ .dataType(InternalType.LIST)
+ .fields(
+ Arrays.asList(
+ InternalField.builder()
+ .name(InternalField.Constants.ARRAY_ELEMENT_FIELD_NAME)
+ .parentPath(null)
+ .schema(
+ InternalSchema.builder()
+
.name(InternalField.Constants.ARRAY_ELEMENT_FIELD_NAME)
+ .dataType(InternalType.INT)
+ .isNullable(false)
+ .build())
+ .build()))
+ .build();
+
+ GroupType fromTestMap =
+ Types.requiredMap()
+ .key(Types.primitive(PrimitiveTypeName.FLOAT,
Repetition.REQUIRED).named("key"))
+ .value(
+ Types.primitive(PrimitiveTypeName.INT32, Repetition.REQUIRED)
+ .as(LogicalTypeAnnotation.intType(32, false))
+ .named("value"))
+ .named("map");
+ InternalSchema fromInternalMap =
+ InternalSchema.builder()
+ .name("map")
+ .isNullable(false)
+ .dataType(InternalType.MAP)
+ .fields(
+ Arrays.asList(
+ InternalField.builder()
+ .name("_one_field_key") // "key")
+ .parentPath("_one_field_value")
+ .schema(
+ InternalSchema.builder()
+ .name("key")
+ .dataType(InternalType.FLOAT)
+ .isNullable(false)
+ .build())
+ .defaultValue(null)
+ .build(),
+ InternalField.builder()
+ .name("_one_field_value") // "value")
+ .parentPath("_one_field_value")
+ .schema(
+ InternalSchema.builder()
+ .name("value")
+ .dataType(InternalType.INT)
+ .isNullable(false)
+ .build())
+ .build()))
+ .build();
+
+ InternalSchema recordListElementSchema =
+ InternalSchema.builder()
+ .name("my_group")
+ .isNullable(false)
+ .fields(
+ Arrays.asList(
+ InternalField.builder()
+ .name("id")
+ .parentPath("my_group")
+ .schema(
+ InternalSchema.builder()
+ .name("id")
+ .dataType(InternalType.LONG)
+ .isNullable(false)
+ .build())
+ .build(),
+ InternalField.builder()
+ .name("name")
+ .parentPath("my_group")
+ .schema(
+ InternalSchema.builder()
+ .name("name")
+ .dataType(InternalType.STRING)
+ .isNullable(true)
+ .build())
+ .defaultValue(null)
+ .build()))
+ .dataType(InternalType.RECORD)
+ .build();
+ InternalSchema internalSchema =
+ InternalSchema.builder()
+ .name("my_record")
+ .dataType(InternalType.RECORD)
+ .isNullable(true)
+ .fields(
+ Arrays.asList(
+ InternalField.builder()
+ .name("my_list")
+ .schema(
+ InternalSchema.builder()
+ .name("my_list")
+ .isNullable(false)
+ .dataType(InternalType.LIST)
+ .fields(
+ Arrays.asList(
+ InternalField.builder()
+
.name(InternalField.Constants.ARRAY_ELEMENT_FIELD_NAME)
+ .parentPath("my_list")
+ .schema(
+ InternalSchema.builder()
+ .name("element")
+ .dataType(InternalType.INT)
+ .isNullable(true)
+ .build())
+ .build()))
+ .build())
+ .build(),
+ InternalField.builder()
+ .name("my_group")
+ .schema(recordListElementSchema)
+ .defaultValue(null)
+ .build()))
+ .build();
+
+ Assertions.assertEquals(fromTestMap,
schemaExtractor.fromInternalSchema(fromInternalMap, null));
+ Assertions.assertEquals(
+ fromSimpleList, schemaExtractor.fromInternalSchema(fromInternalList,
null));
+
+ GroupType testGroupType =
+ Types.requiredGroup()
+ .required(PrimitiveTypeName.INT64)
+ .named("id")
+ .optional(PrimitiveTypeName.BINARY)
+ .as(LogicalTypeAnnotation.stringType())
+ .named("name")
+ .named("my_group");
+
+ GroupType testMap =
+ Types.requiredMap()
+ .key(Types.primitive(PrimitiveTypeName.FLOAT,
Repetition.REQUIRED).named("key"))
+ .value(Types.primitive(PrimitiveTypeName.INT32,
Repetition.REQUIRED).named("value"))
+ .named("map");
+ GroupType listType =
+ Types.requiredList()
+ .setElementType(
+ Types.primitive(PrimitiveTypeName.INT32,
Repetition.REQUIRED).named("element"))
+ .named("my_list");
+ MessageType messageType =
+ Types.buildMessage()
+ // .addField(testMap)
+ .addField(listType)
+ .addField(testGroupType)
+ .named("my_record");
+
+ Assertions.assertEquals(internalMap,
schemaExtractor.toInternalSchema(testMap, null));
+ Assertions.assertEquals(internalSchema,
schemaExtractor.toInternalSchema(messageType, null));
+ }
+}
diff --git
a/xtable-core/src/test/java/org/apache/xtable/parquet/TestParquetStatsExtractor.java
b/xtable-core/src/test/java/org/apache/xtable/parquet/TestParquetStatsExtractor.java
new file mode 100644
index 00000000..7522c292
--- /dev/null
+++
b/xtable-core/src/test/java/org/apache/xtable/parquet/TestParquetStatsExtractor.java
@@ -0,0 +1,454 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.parquet;
+
+import static org.apache.parquet.column.Encoding.BIT_PACKED;
+import static org.apache.parquet.column.Encoding.PLAIN;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.statistics.BinaryStatistics;
+import org.apache.parquet.column.statistics.BooleanStatistics;
+import org.apache.parquet.column.statistics.IntStatistics;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.*;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.MessageTypeParser;
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+import org.apache.parquet.schema.Type.Repetition;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import org.apache.xtable.model.schema.InternalField;
+import org.apache.xtable.model.stat.ColumnStat;
+import org.apache.xtable.model.stat.Range;
+import org.apache.xtable.model.storage.FileFormat;
+import org.apache.xtable.model.storage.InternalDataFile;
+
+public class TestParquetStatsExtractor {
+
+ private static final ParquetSchemaExtractor schemaExtractor =
+ ParquetSchemaExtractor.getInstance();
+
+ @TempDir static java.nio.file.Path tempDir = Paths.get("./");
+
+ public static List<ColumnStat> initBooleanFileTest(File file) throws
IOException {
+ // create the parquet file by parsing a schema
+ Path path = new Path(file.toURI());
+ Configuration configuration = new Configuration();
+
+ MessageType schema =
+ MessageTypeParser.parseMessageType("message m { required group a
{required boolean b;}}");
+ String[] columnPath = {"a", "b"};
+ ColumnDescriptor c1 = schema.getColumnDescription(columnPath);
+ CompressionCodecName codec = CompressionCodecName.UNCOMPRESSED;
+ BooleanStatistics stats = new BooleanStatistics();
+ stats.updateStats(true);
+ stats.updateStats(false);
+
+ // write the string columned file
+
+ ParquetFileWriter w = new ParquetFileWriter(configuration, schema, path);
+ w.start();
+ w.startBlock(3);
+ w.startColumn(c1, 5, codec);
+ w.writeDataPage(2, 4, BytesInput.fromInt(1), stats, BIT_PACKED,
BIT_PACKED, PLAIN);
+ w.writeDataPage(3, 4, BytesInput.fromInt(0), stats, BIT_PACKED,
BIT_PACKED, PLAIN);
+ w.endColumn();
+ w.endBlock();
+ w.startBlock(4);
+ w.startColumn(c1, 8, codec);
+
+ w.writeDataPage(7, 4, BytesInput.fromInt(0), stats, BIT_PACKED,
BIT_PACKED, PLAIN);
+ w.endColumn();
+ w.endBlock();
+ w.end(new HashMap<String, String>());
+
+ // reconstruct the stats for the InternalDataFile testing object
+
+ boolean minStat = stats.genericGetMin();
+
+ boolean maxStat = stats.genericGetMax();
+ PrimitiveType primitiveType =
+ new PrimitiveType(Repetition.REQUIRED, PrimitiveTypeName.BOOLEAN, "b");
+ List<Integer> col1NumValTotSize =
+ new ArrayList<>(Arrays.asList(5, 8)); // (5, 8)// start column indexes
+ List<Integer> col2NumValTotSize = new ArrayList<>(Arrays.asList(54, 27));
+ List<ColumnStat> testColumnStats = new ArrayList<>();
+ String[] columnDotPath = {"a.b", "a.b"};
+ for (int i = 0; i < columnDotPath.length; i++) {
+ testColumnStats.add(
+ ColumnStat.builder()
+ .field(
+ InternalField.builder()
+ .name(primitiveType.getName())
+ .parentPath(null)
+ .schema(schemaExtractor.toInternalSchema(primitiveType,
columnDotPath[i]))
+ .build())
+ .numValues(col1NumValTotSize.get(i))
+ .totalSize(col2NumValTotSize.get(i))
+ .range(Range.vector(minStat, maxStat))
+ .build());
+ }
+
+ return testColumnStats;
+ }
+
+ public static List<ColumnStat> initStringFileTest(File file) throws
IOException {
+ // create the parquet file by parsing a schema
+ Path path = new Path(file.toURI());
+ Configuration configuration = new Configuration();
+
+ MessageType schema =
+ MessageTypeParser.parseMessageType(
+ "message m { required group a {required fixed_len_byte_array(10)
b;}}");
+ String[] columnPath = {"a", "b"};
+ ColumnDescriptor c1 = schema.getColumnDescription(columnPath);
+ CompressionCodecName codec = CompressionCodecName.UNCOMPRESSED;
+ BinaryStatistics stats = new BinaryStatistics();
+ stats.updateStats(Binary.fromString("1"));
+ stats.updateStats(Binary.fromString("2"));
+ stats.updateStats(Binary.fromString("5"));
+
+ byte[] bytes1 = "First string".getBytes();
+ byte[] bytes2 = "Second string".getBytes();
+
+ // write the string columned file
+
+ ParquetFileWriter w = new ParquetFileWriter(configuration, schema, path);
+ w.start();
+ w.startBlock(3);
+ w.startColumn(c1, 5, codec);
+
+ w.writeDataPage(2, 4, BytesInput.from(bytes1), stats, BIT_PACKED,
BIT_PACKED, PLAIN);
+ w.writeDataPage(3, 4, BytesInput.from(bytes2), stats, BIT_PACKED,
BIT_PACKED, PLAIN);
+ w.endColumn();
+ w.endBlock();
+ w.startBlock(4);
+ w.startColumn(c1, 8, codec);
+
+ w.writeDataPage(7, 4, BytesInput.from(bytes2), stats, BIT_PACKED,
BIT_PACKED, PLAIN);
+ w.endColumn();
+ w.endBlock();
+ w.end(new HashMap<String, String>());
+
+ // reconstruct the stats for the InternalDataFile testing object
+ BinaryStatistics stats_clone = new BinaryStatistics();
+ Binary originalBinary1 = Binary.fromString("1");
+ Binary originalBinary2 = Binary.fromString("2");
+ Binary originalBinary5 = Binary.fromString("5");
+ stats_clone.updateStats(Binary.fromByteArray(originalBinary1.getBytes()));
+ stats_clone.updateStats(Binary.fromByteArray(originalBinary2.getBytes()));
+ stats_clone.updateStats(Binary.fromByteArray(originalBinary5.getBytes()));
+
+ Binary minStat = stats_clone.genericGetMin();
+
+ Binary maxStat = stats_clone.genericGetMax();
+ PrimitiveType primitiveType =
+ new PrimitiveType(Repetition.REQUIRED,
PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, "b");
+ List<Integer> col1NumValTotSize = new ArrayList<>(Arrays.asList(5, 8));
+ List<Integer> col2NumValTotSize = new ArrayList<>(Arrays.asList(71, 36));
+ List<ColumnStat> testColumnStats = new ArrayList<>();
+ String[] columnDotPath = {"a.b", "a.b"};
+ for (int i = 0; i < columnDotPath.length; i++) {
+ testColumnStats.add(
+ ColumnStat.builder()
+ .field(
+ InternalField.builder()
+ .name(primitiveType.getName())
+ .parentPath(null)
+ .schema(schemaExtractor.toInternalSchema(primitiveType,
columnDotPath[i]))
+ .build())
+ .numValues(col1NumValTotSize.get(i))
+ .totalSize(col2NumValTotSize.get(i))
+ .range(Range.vector(minStat, maxStat))
+ .build());
+ }
+
+ return testColumnStats;
+ }
+
+ public static List<ColumnStat> initBinaryFileTest(File file) throws
IOException {
+ // create the parquet file by parsing a schema
+ Path path = new Path(file.toURI());
+ Configuration configuration = new Configuration();
+
+ MessageType schema =
+ MessageTypeParser.parseMessageType("message m { required group a
{required binary b;}}");
+ String[] columnPath = {"a", "b"};
+ ColumnDescriptor c1 = schema.getColumnDescription(columnPath);
+
+ byte[] bytes1 = {0, 1, 2, 3};
+ byte[] bytes2 = {2, 3, 4, 5};
+ CompressionCodecName codec = CompressionCodecName.UNCOMPRESSED;
+ BinaryStatistics stats = new BinaryStatistics();
+ stats.updateStats(Binary.fromString("1"));
+ stats.updateStats(Binary.fromString("2"));
+ stats.updateStats(Binary.fromString("5"));
+
+ // to simplify the test we keep the same stats for both columns
+ ParquetFileWriter w = new ParquetFileWriter(configuration, schema, path);
+ w.start();
+ w.startBlock(3);
+ w.startColumn(c1, 5, codec);
+
+ w.writeDataPage(2, 4, BytesInput.from(bytes1), stats, BIT_PACKED,
BIT_PACKED, PLAIN);
+
+ w.writeDataPage(3, 4, BytesInput.from(bytes2), stats, BIT_PACKED,
BIT_PACKED, PLAIN);
+
+ w.endColumn();
+ w.endBlock();
+ w.startBlock(4);
+
+ w.startColumn(c1, 1, codec);
+ w.writeDataPage(7, 4, BytesInput.from(bytes2), stats, BIT_PACKED,
BIT_PACKED, PLAIN);
+
+ w.endColumn();
+ w.endBlock();
+ w.end(new HashMap<String, String>());
+
+ // reconstruct the stats for the InternalDataFile testing object
+ BinaryStatistics stats_clone = new BinaryStatistics();
+ Binary originalBinary1 = Binary.fromString("1");
+ Binary originalBinary2 = Binary.fromString("2");
+ Binary originalBinary5 = Binary.fromString("5");
+ stats_clone.updateStats(Binary.fromByteArray(originalBinary1.getBytes()));
+ stats_clone.updateStats(Binary.fromByteArray(originalBinary2.getBytes()));
+ stats_clone.updateStats(Binary.fromByteArray(originalBinary5.getBytes()));
+
+ Binary minStat = stats_clone.genericGetMin();
+ Binary maxStat = stats_clone.genericGetMax();
+ PrimitiveType primitiveType =
+ new PrimitiveType(Repetition.REQUIRED, PrimitiveTypeName.BINARY, "b");
+ List<Integer> col1NumValTotSize = new ArrayList<>(Arrays.asList(5, 1));
+ List<Integer> col2NumValTotSize = new ArrayList<>(Arrays.asList(54, 27));
+ List<ColumnStat> testColumnStats = new ArrayList<>();
+ String[] columnDotPath = {"a.b", "a.b"};
+ for (int i = 0; i < columnDotPath.length; i++) {
+ testColumnStats.add(
+ ColumnStat.builder()
+ .field(
+ InternalField.builder()
+ .name(primitiveType.getName())
+ .parentPath(null)
+ .schema(schemaExtractor.toInternalSchema(primitiveType, columnDotPath[i]))
+ .build())
+ .numValues(col1NumValTotSize.get(i))
+ .totalSize(col2NumValTotSize.get(i))
+ .range(Range.vector(minStat, maxStat))
+ .build());
+ }
+
+ return testColumnStats;
+ }
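
Note on the reconstructed statistics in the helpers above: the expected min/max are rebuilt from fresh Binary objects (Binary.fromByteArray over the original string bytes) rather than reused from the writer, and the equality assertions still hold because parquet's Binary compares by the underlying byte content. A minimal standalone sketch of that assumption (illustration only, not part of this patch):

    import org.apache.parquet.io.api.Binary;

    public class BinaryEqualityCheck {
      public static void main(String[] args) {
        // A Binary built from a String and one built from the same raw UTF-8 bytes compare equal,
        // which is why reconstructed BinaryStatistics min/max match the values read back from the file.
        Binary fromString = Binary.fromString("5");
        Binary fromBytes =
            Binary.fromByteArray("5".getBytes(java.nio.charset.StandardCharsets.UTF_8));
        System.out.println(fromString.equals(fromBytes)); // expected: true
      }
    }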
+
+ public static List<ColumnStat> initIntFileTest(File file) throws IOException {
+ // create the parquet file by parsing a schema
+ Path path = new Path(file.toURI());
+ Configuration configuration = new Configuration();
+
+ MessageType schema =
+ MessageTypeParser.parseMessageType("message m { required group a {required int32 b;}}");
+ String[] columnPath = {"a", "b"};
+ ColumnDescriptor c1 = schema.getColumnDescription(columnPath);
+
+ CompressionCodecName codec = CompressionCodecName.UNCOMPRESSED;
+
+ IntStatistics stats = new IntStatistics();
+ stats.updateStats(1);
+ stats.updateStats(2);
+ stats.updateStats(5);
+
+ // to simplify the test we keep the same stats for both columns
+ ParquetFileWriter w = new ParquetFileWriter(configuration, schema, path);
+ w.start();
+ w.startBlock(3);
+
+ w.startColumn(c1, 2, codec);
+
+ w.writeDataPage(3, 3, BytesInput.fromInt(3), stats, BIT_PACKED, BIT_PACKED, PLAIN);
+
+ w.writeDataPage(3, 3, BytesInput.fromInt(2), stats, BIT_PACKED, BIT_PACKED, PLAIN);
+ w.endColumn();
+ w.endBlock();
+ w.startBlock(4);
+
+ w.startColumn(c1, 1, codec);
+
+ w.writeDataPage(3, 3, BytesInput.fromInt(1), stats, BIT_PACKED, BIT_PACKED, PLAIN);
+ w.endColumn();
+ w.endBlock();
+ w.end(new HashMap<String, String>());
+
+ // reconstruct the stats for the InternalDataFile testing object
+
+ Integer minStat = stats.genericGetMin();
+ Integer maxStat = stats.genericGetMax();
+ PrimitiveType primitiveType =
+ new PrimitiveType(Repetition.REQUIRED, PrimitiveTypeName.INT32, "b");
+ List<Integer> col1NumValTotSize = new ArrayList<>(Arrays.asList(2, 1));
+ List<Integer> col2NumValTotSize = new ArrayList<>(Arrays.asList(54, 27));
+ List<ColumnStat> testColumnStats = new ArrayList<>();
+ String[] columnDotPath = {"a.b", "a.b"};
+ for (int i = 0; i < columnDotPath.length; i++) {
+ testColumnStats.add(
+ ColumnStat.builder()
+ .field(
+ InternalField.builder()
+ .name(primitiveType.getName())
+ .parentPath(null)
+ .schema(schemaExtractor.toInternalSchema(primitiveType, columnDotPath[i]))
+ .build())
+ .numValues(col1NumValTotSize.get(i))
+ .totalSize(col2NumValTotSize.get(i))
+ .range(Range.vector(minStat, maxStat))
+ .build());
+ }
+
+ return testColumnStats;
+ }
+
+ @Test
+ public void testInternalDataFileStringStat() throws IOException {
+
+ Configuration configuration = new Configuration();
+
+ java.nio.file.Path path = tempDir.resolve("parquet-test-string-file");
+ File file = path.toFile();
+ file.deleteOnExit();
+ List<ColumnStat> testColumnStats = initStringFileTest(file);
+ Path hadoopPath = new Path(file.toURI());
+ // statsExtractor toInternalDataFile testing
+ InternalDataFile internalDataFile =
+ ParquetStatsExtractor.toInternalDataFile(configuration, hadoopPath);
+ InternalDataFile testInternalFile =
+ InternalDataFile.builder()
+ .physicalPath(
+ "file:"
+ .concat(
+ file.toPath().normalize().toAbsolutePath().toString().replace("\\", "/")))
+ .columnStats(testColumnStats)
+ .fileFormat(FileFormat.APACHE_PARQUET)
+ .lastModified(file.lastModified())
+ .fileSizeBytes(file.length())
+ .recordCount(8)
+ .build();
+
+ Assertions.assertEquals(testInternalFile, internalDataFile);
+ }
+
+ @Test
+ public void testInternalDataFileBinaryStat() throws IOException {
+
+ Configuration configuration = new Configuration();
+
+ java.nio.file.Path path = tempDir.resolve("parquet-test-binary-file");
+ File file = path.toFile();
+ file.deleteOnExit();
+ List<ColumnStat> testColumnStats = initBinaryFileTest(file);
+ Path hadoopPath = new Path(file.toURI());
+ // statsExtractor toInternalDataFile testing
+ InternalDataFile internalDataFile =
+ ParquetStatsExtractor.toInternalDataFile(configuration, hadoopPath);
+ InternalDataFile testInternalFile =
+ InternalDataFile.builder()
+ .physicalPath(
+ "file:"
+ .concat(
+ file.toPath().normalize().toAbsolutePath().toString().replace("\\", "/")))
+ .columnStats(testColumnStats)
+ .fileFormat(FileFormat.APACHE_PARQUET)
+ .lastModified(file.lastModified())
+ .fileSizeBytes(file.length())
+ .recordCount(5)
+ .build();
+
+ Assertions.assertEquals(testInternalFile, internalDataFile);
+ }
+
+ @Test
+ public void testInternalDataFileIntStat() throws IOException {
+
+ Configuration configuration = new Configuration();
+ java.nio.file.Path path = tempDir.resolve("parquet-test-int-file");
+ File file = path.toFile();
+ file.deleteOnExit();
+ List<ColumnStat> testColumnStats = initIntFileTest(file);
+ Path hadoopPath = new Path(file.toURI());
+ // statsExtractor toInternalDataFile testing
+ InternalDataFile internalDataFile =
+ ParquetStatsExtractor.toInternalDataFile(configuration, hadoopPath);
+ InternalDataFile testInternalFile =
+ InternalDataFile.builder()
+ .physicalPath(
+ "file:"
+ .concat(
+ file.toPath().normalize().toAbsolutePath().toString().replace("\\", "/")))
+ .columnStats(testColumnStats)
+ .fileFormat(FileFormat.APACHE_PARQUET)
+ .lastModified(file.lastModified())
+ .fileSizeBytes(file.length())
+ .recordCount(2)
+ .build();
+
+ Assertions.assertEquals(testInternalFile, internalDataFile);
+ }
+
+ @Test
+ public void testInternalDataFileBooleanStat() throws IOException {
+ Configuration configuration = new Configuration();
+
+ java.nio.file.Path path = tempDir.resolve("parquet-test-boolean-file");
+ File file = path.toFile();
+ file.deleteOnExit();
+
+ List<ColumnStat> testColumnStats = initBooleanFileTest(file);
+ Path hadoopPath = new Path(file.toURI());
+ // statsExtractor toInternalDataFile testing
+ InternalDataFile internalDataFile =
+ ParquetStatsExtractor.toInternalDataFile(configuration, hadoopPath);
+ InternalDataFile testInternalFile =
+ InternalDataFile.builder()
+ .physicalPath(
+ "file:"
+ .concat(
+ file.toPath().normalize().toAbsolutePath().toString().replace("\\", "/")))
+ .columnStats(testColumnStats)
+ .fileFormat(FileFormat.APACHE_PARQUET)
+ .lastModified(file.lastModified())
+ .fileSizeBytes(file.length())
+ .recordCount(8)
+ .build();
+
+ Assertions.assertEquals(testInternalFile, internalDataFile);
+ }
+}
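
For context, the expected numValues, totalSize, and min/max values hard-coded in the helpers above correspond to the per-column-chunk metadata stored in the Parquet footer, which is what ParquetStatsExtractor.toInternalDataFile reads. A minimal sketch (not part of this patch) for inspecting that footer metadata with the standard parquet-hadoop API; the file path is hypothetical:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.hadoop.ParquetFileReader;
    import org.apache.parquet.hadoop.metadata.BlockMetaData;
    import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
    import org.apache.parquet.hadoop.util.HadoopInputFile;

    public class InspectFooterStats {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path path = new Path("file:///tmp/parquet-test-int-file"); // hypothetical path
        try (ParquetFileReader reader =
            ParquetFileReader.open(HadoopInputFile.fromPath(path, conf))) {
          for (BlockMetaData block : reader.getFooter().getBlocks()) {
            for (ColumnChunkMetaData column : block.getColumns()) {
              // One column chunk per row group: value count, total size, and min/max statistics
              System.out.printf(
                  "%s values=%d totalSize=%d stats=%s%n",
                  column.getPath(), column.getValueCount(), column.getTotalSize(),
                  column.getStatistics());
            }
          }
        }
      }
    }

Each row group contributes one chunk per column, which is why the expected ColumnStat lists above contain two entries for the same dot path a.b.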
diff --git a/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java b/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java
index 43549d1b..e04d6985 100644
--- a/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java
+++ b/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java
@@ -297,11 +297,13 @@ public class RunCatalogSync {
 * necessary connection and access details for describing and listing tables
 */
ExternalCatalogConfig sourceCatalog;
+
/**
 * Defines configuration one or more target catalogs, to which XTable will write or update
* tables. Unlike the source, these catalogs must be writable
*/
List<ExternalCatalogConfig> targetCatalogs;
+
 /** A list of datasets that specify how a source table maps to one or more target tables. */
List<Dataset> datasets;
@@ -314,6 +316,7 @@ public class RunCatalogSync {
public static class Dataset {
/** Identifies the source table in sourceCatalog. */
SourceTableIdentifier sourceCatalogTableIdentifier;
+
 /** A list of one or more targets that this source table should be written to. */
List<TargetTableIdentifier> targetCatalogTableIdentifiers;
}
@@ -324,6 +327,7 @@ public class RunCatalogSync {
public static class SourceTableIdentifier {
/** Specifies the table identifier in the source catalog. */
TableIdentifier tableIdentifier;
+
/**
 * (Optional) Provides direct storage details such as a table’s base path (like an S3
 * location) and the partition specification. This allows reading from a source even if it is
@@ -341,11 +345,13 @@ public class RunCatalogSync {
* updated
*/
String catalogId;
+
/**
 * The target table format (e.g., DELTA, HUDI, ICEBERG), specifying how the data will be
* stored at the target.
*/
String tableFormat;
+
/** Specifies the table identifier in the target catalog. */
TableIdentifier tableIdentifier;
}
@@ -359,6 +365,7 @@ public class RunCatalogSync {
* HierarchicalTableIdentifier}
*/
String hierarchicalId;
+
/** Specifies the partition spec of the table */
String partitionSpec;
}