the-other-tim-brown commented on code in PR #669:
URL: https://github.com/apache/incubator-xtable/pull/669#discussion_r2017784924


##########
xtable-core/src/main/java/org/apache/xtable/parquet/ParquetSchemaExtractor.java:
##########
@@ -0,0 +1,390 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.parquet;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.List;
+import java.util.ArrayList;
+
+import org.apache.xtable.schema.SchemaUtils;
+
+import java.util.Collections;
+import java.util.Optional;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+
+import java.util.Collections;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import org.apache.xtable.hudi.idtracking.models.IdMapping;
+import org.apache.avro.Schema;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+import org.apache.xtable.collectors.CustomCollectors;
+import org.apache.xtable.exception.UnsupportedSchemaTypeException;
+import org.apache.xtable.hudi.idtracking.models.IdMapping;
+import org.apache.xtable.model.schema.InternalField;
+import org.apache.parquet.schema.Type.Repetition;
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.schema.InternalType;
+//import org.apache.parquet.avro.AvroSchemaConverter;
+
+
+/**
+ * Class that converts parquet Schema {@link Schema} to Canonical Schema 
{@link InternalSchema} and
+ * vice-versa. This conversion is fully reversible and there is a strict 1 to 
1 mapping between
+ * parquet data types and canonical data types.
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public class ParquetSchemaExtractor {
+    // parquet only supports string keys in maps
+    private static final InternalField MAP_KEY_FIELD =
+            InternalField.builder()
+                    .name(InternalField.Constants.MAP_KEY_FIELD_NAME)
+                    .schema(
+                            InternalSchema.builder()
+                                    .name("map_key")
+                                    .dataType(InternalType.STRING)
+                                    .isNullable(false)
+                                    .build())
+                    .defaultValue("")
+                    .build();
+    private static final ParquetSchemaExtractor INSTANCE = new 
ParquetSchemaExtractor();
+    private static final String ELEMENT = "element";
+    private static final String KEY = "key";
+    private static final String VALUE = "value";
+
+    public static ParquetSchemaExtractor getInstance() {
+        return INSTANCE;
+    }
+
+    private static boolean groupTypeContainsNull(Type schema) {
+        if (!schema.isPrimitive()) {
+            for (Type field : schema.asGroupType().getFields()) {
+                if (field/*.getLogicalTypeAnnotation().toOriginalType()*/ == 
null) {
+                    return true;
+                }
+            }
+        } else{
+            if (schema.equals(null)){
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    /*    private static LogicalTypeAnnotation 
finalizeSchema(LogicalTypeAnnotation targetSchema, InternalSchema inputSchema) {
+            if (inputSchema.isNullable()) {
+                return targetSchema.union(null); // 
LogicalTypeAnnotation.unknownType()
+            }
+            return targetSchema;
+        }*/
+    private Map<String, IdMapping> getChildIdMap(IdMapping idMapping) {
+        if (idMapping == null) {
+            return Collections.emptyMap();
+        }
+        return idMapping.getFields().stream()
+                .collect(Collectors.toMap(IdMapping::getName, 
Function.identity()));
+    }
+
+    /**
+     * Converts the parquet {@link Schema} to {@link InternalSchema}.
+     *
+     * @param schema               The schema being converted
+     * @param parentPath           If this schema is nested within another, 
this will be a dot separated string
+     *                             representing the path from the top most 
field to the current schema.
+     * @param fieldNameToIdMapping map of fieldName to IdMapping to track 
field IDs provided by the
+     *                             source schema. If source schema does not 
contain IdMappings, map will be empty.
+     * @return a converted schema
+     */
+    public InternalSchema toInternalSchema(
+            Type schema, String parentPath, Map<String, IdMapping> 
fieldNameToIdMapping) {
+        // TODO - Does not handle recursion in parquet schema
+        InternalType newDataType = null;
+        PrimitiveType typeName;
+        LogicalTypeAnnotation logicalType;
+        Map<InternalSchema.MetadataKey, Object> metadata = new HashMap<>();
+        if (schema.isPrimitive()/*schema.getFields().size()==1*/) {
+            //Type schemaField = schema.getType(0);

Review Comment:
   Can the commented-out code be removed?



##########
xtable-core/src/main/java/org/apache/xtable/parquet/ParquetSchemaExtractor.java:
##########
@@ -0,0 +1,390 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.parquet;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.List;
+import java.util.ArrayList;
+
+import org.apache.xtable.schema.SchemaUtils;
+
+import java.util.Collections;
+import java.util.Optional;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+
+import java.util.Collections;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import org.apache.xtable.hudi.idtracking.models.IdMapping;
+import org.apache.avro.Schema;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+import org.apache.xtable.collectors.CustomCollectors;
+import org.apache.xtable.exception.UnsupportedSchemaTypeException;
+import org.apache.xtable.hudi.idtracking.models.IdMapping;
+import org.apache.xtable.model.schema.InternalField;
+import org.apache.parquet.schema.Type.Repetition;
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.schema.InternalType;
+//import org.apache.parquet.avro.AvroSchemaConverter;
+
+
+/**
+ * Class that converts parquet Schema {@link Schema} to Canonical Schema 
{@link InternalSchema} and
+ * vice-versa. This conversion is fully reversible and there is a strict 1 to 
1 mapping between
+ * parquet data types and canonical data types.
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public class ParquetSchemaExtractor {
+    // parquet only supports string keys in maps
+    private static final InternalField MAP_KEY_FIELD =
+            InternalField.builder()
+                    .name(InternalField.Constants.MAP_KEY_FIELD_NAME)
+                    .schema(
+                            InternalSchema.builder()
+                                    .name("map_key")
+                                    .dataType(InternalType.STRING)
+                                    .isNullable(false)
+                                    .build())
+                    .defaultValue("")
+                    .build();
+    private static final ParquetSchemaExtractor INSTANCE = new 
ParquetSchemaExtractor();
+    private static final String ELEMENT = "element";
+    private static final String KEY = "key";
+    private static final String VALUE = "value";
+
+    public static ParquetSchemaExtractor getInstance() {
+        return INSTANCE;
+    }
+
+    private static boolean groupTypeContainsNull(Type schema) {
+        if (!schema.isPrimitive()) {
+            for (Type field : schema.asGroupType().getFields()) {
+                if (field/*.getLogicalTypeAnnotation().toOriginalType()*/ == 
null) {
+                    return true;
+                }
+            }
+        } else{
+            if (schema.equals(null)){
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    /*    private static LogicalTypeAnnotation 
finalizeSchema(LogicalTypeAnnotation targetSchema, InternalSchema inputSchema) {
+            if (inputSchema.isNullable()) {
+                return targetSchema.union(null); // 
LogicalTypeAnnotation.unknownType()
+            }
+            return targetSchema;
+        }*/
+    private Map<String, IdMapping> getChildIdMap(IdMapping idMapping) {
+        if (idMapping == null) {
+            return Collections.emptyMap();
+        }
+        return idMapping.getFields().stream()
+                .collect(Collectors.toMap(IdMapping::getName, 
Function.identity()));
+    }
+
+    /**
+     * Converts the parquet {@link Schema} to {@link InternalSchema}.
+     *
+     * @param schema               The schema being converted
+     * @param parentPath           If this schema is nested within another, 
this will be a dot separated string
+     *                             representing the path from the top most 
field to the current schema.
+     * @param fieldNameToIdMapping map of fieldName to IdMapping to track 
field IDs provided by the
+     *                             source schema. If source schema does not 
contain IdMappings, map will be empty.
+     * @return a converted schema
+     */
+    public InternalSchema toInternalSchema(
+            Type schema, String parentPath, Map<String, IdMapping> 
fieldNameToIdMapping) {

Review Comment:
   The field ID is already available on the `Type` object itself (via 
`Type#getId()`), so you don't need a separate `IdMapping` here.



##########
xtable-core/pom.xml:
##########
@@ -61,6 +61,12 @@
             <groupId>org.apache.avro</groupId>
             <artifactId>avro</artifactId>
         </dependency>
+        <!-- 
https://mvnrepository.com/artifact/org.apache.parquet/parquet-avro -->
+        <dependency>
+            <groupId>org.apache.parquet</groupId>
+            <artifactId>parquet-avro</artifactId>
+            <version>1.15.0</version>

Review Comment:
   Should we update the version defined in the parent pom? 
https://github.com/apache/incubator-xtable/blob/main/pom.xml#L152
   
   We define the version there to keep it consistent between modules.



##########
xtable-core/src/main/java/org/apache/xtable/parquet/ParquetMetadataExtractor.java:
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.parquet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.HadoopReadOptions;
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.InputFile;
+
+public class ParquetMetadataExtractor {
+
+  private static final ParquetMetadataExtractor INSTANCE = new 
ParquetMetadataExtractor();
+
+  public static ParquetMetadataExtractor getInstance() {
+    return INSTANCE;
+  }
+
+  public static MessageType getSchema(ParquetMetadata footer) {
+    MessageType schema = footer.getFileMetaData().getSchema();
+    return schema;
+  }
+
+  public static ParquetMetadata readParquetMetadata(Configuration conf, Path 
filePath) {
+    ParquetFileReader fileReader = null;
+    ParquetReadOptions options = HadoopReadOptions.builder(conf, 
filePath).build();
+    try {
+      InputFile file = HadoopInputFile.fromPath(filePath, conf);
+      fileReader = ParquetFileReader.open(file, options);
+    } catch (java.io.IOException e) {
+      // TODO add proper processing

Review Comment:
   We can throw a `ReadException` here, with a message that gives some context 
indicating that reading the parquet metadata failed.



##########
xtable-core/src/main/java/org/apache/xtable/parquet/ParquetMetadataExtractor.java:
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.parquet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.HadoopReadOptions;
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.InputFile;
+
+public class ParquetMetadataExtractor {
+
+  private static final ParquetMetadataExtractor INSTANCE = new 
ParquetMetadataExtractor();
+
+  public static ParquetMetadataExtractor getInstance() {
+    return INSTANCE;
+  }
+
+  public static MessageType getSchema(ParquetMetadata footer) {
+    MessageType schema = footer.getFileMetaData().getSchema();
+    return schema;
+  }
+
+  public static ParquetMetadata readParquetMetadata(Configuration conf, Path 
filePath) {
+    ParquetFileReader fileReader = null;
+    ParquetReadOptions options = HadoopReadOptions.builder(conf, 
filePath).build();
+    try {
+      InputFile file = HadoopInputFile.fromPath(filePath, conf);
+      fileReader = ParquetFileReader.open(file, options);

Review Comment:
   The `ParquetFileReader` needs to be closed. You can declare it in the `try` 
header (try-with-resources) so it gets closed for you:
   
   ```
   try (ParquetFileReader fileReader = ParquetFileReader.open(file, options)) {
     return fileReader.getFooter();
   } catch (IOException e) {
   ```



##########
xtable-api/src/main/java/org/apache/xtable/model/config/InputPartitionField.java:
##########
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.model.config;
+
+import lombok.Builder;
+import lombok.Data;
+import lombok.Value;
+
+import org.apache.xtable.model.schema.PartitionTransformType;
+
+@Data
+@Value
+@Builder(toBuilder = true)
+public class InputPartitionField {

Review Comment:
   There is already `InternalPartitionField`; let's try to reuse that.



##########
xtable-core/src/test/java/org/apache/xtable/parquet/TestParquetSchemaExtractor.java:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.xtable.parquet;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.*;
+
+import org.junit.jupiter.api.Test;
+import org.apache.parquet.schema.*;
+import org.junit.jupiter.api.Assertions;
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.schema.InternalType;
+import 
org.apache.parquet.schema.LogicalTypeAnnotation.IntLogicalTypeAnnotation;
+import 
org.apache.parquet.schema.LogicalTypeAnnotation.StringLogicalTypeAnnotation;
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+import org.apache.parquet.schema.Type.Repetition;
+import org.apache.parquet.schema.Type;
+import org.apache.parquet.schema.Types;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
+
+
+public class TestParquetSchemaExtractor {

Review Comment:
   Let's make sure to add a test with a full record schema to match the case of 
translating the full file's schema



##########
xtable-core/src/main/java/org/apache/xtable/parquet/ParquetSchemaExtractor.java:
##########
@@ -0,0 +1,390 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.parquet;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.List;
+import java.util.ArrayList;
+
+import org.apache.xtable.schema.SchemaUtils;
+
+import java.util.Collections;
+import java.util.Optional;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+
+import java.util.Collections;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import org.apache.xtable.hudi.idtracking.models.IdMapping;
+import org.apache.avro.Schema;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+import org.apache.xtable.collectors.CustomCollectors;
+import org.apache.xtable.exception.UnsupportedSchemaTypeException;
+import org.apache.xtable.hudi.idtracking.models.IdMapping;
+import org.apache.xtable.model.schema.InternalField;
+import org.apache.parquet.schema.Type.Repetition;
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.schema.InternalType;
+//import org.apache.parquet.avro.AvroSchemaConverter;
+
+
+/**
+ * Class that converts parquet Schema {@link Schema} to Canonical Schema 
{@link InternalSchema} and
+ * vice-versa. This conversion is fully reversible and there is a strict 1 to 
1 mapping between
+ * parquet data types and canonical data types.
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public class ParquetSchemaExtractor {
+    // parquet only supports string keys in maps
+    private static final InternalField MAP_KEY_FIELD =
+            InternalField.builder()
+                    .name(InternalField.Constants.MAP_KEY_FIELD_NAME)
+                    .schema(
+                            InternalSchema.builder()
+                                    .name("map_key")
+                                    .dataType(InternalType.STRING)
+                                    .isNullable(false)
+                                    .build())
+                    .defaultValue("")
+                    .build();
+    private static final ParquetSchemaExtractor INSTANCE = new 
ParquetSchemaExtractor();
+    private static final String ELEMENT = "element";
+    private static final String KEY = "key";
+    private static final String VALUE = "value";
+
+    public static ParquetSchemaExtractor getInstance() {
+        return INSTANCE;
+    }
+
+    private static boolean groupTypeContainsNull(Type schema) {
+        if (!schema.isPrimitive()) {
+            for (Type field : schema.asGroupType().getFields()) {
+                if (field/*.getLogicalTypeAnnotation().toOriginalType()*/ == 
null) {
+                    return true;
+                }
+            }
+        } else{
+            if (schema.equals(null)){
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    /*    private static LogicalTypeAnnotation 
finalizeSchema(LogicalTypeAnnotation targetSchema, InternalSchema inputSchema) {
+            if (inputSchema.isNullable()) {
+                return targetSchema.union(null); // 
LogicalTypeAnnotation.unknownType()
+            }
+            return targetSchema;
+        }*/
+    private Map<String, IdMapping> getChildIdMap(IdMapping idMapping) {
+        if (idMapping == null) {
+            return Collections.emptyMap();
+        }
+        return idMapping.getFields().stream()
+                .collect(Collectors.toMap(IdMapping::getName, 
Function.identity()));
+    }
+
+    /**
+     * Converts the parquet {@link Schema} to {@link InternalSchema}.
+     *
+     * @param schema               The schema being converted
+     * @param parentPath           If this schema is nested within another, 
this will be a dot separated string
+     *                             representing the path from the top most 
field to the current schema.
+     * @param fieldNameToIdMapping map of fieldName to IdMapping to track 
field IDs provided by the
+     *                             source schema. If source schema does not 
contain IdMappings, map will be empty.
+     * @return a converted schema
+     */
+    public InternalSchema toInternalSchema(
+            Type schema, String parentPath, Map<String, IdMapping> 
fieldNameToIdMapping) {
+        // TODO - Does not handle recursion in parquet schema
+        InternalType newDataType = null;
+        PrimitiveType typeName;
+        LogicalTypeAnnotation logicalType;
+        Map<InternalSchema.MetadataKey, Object> metadata = new HashMap<>();
+        if (schema.isPrimitive()/*schema.getFields().size()==1*/) {
+            //Type schemaField = schema.getType(0);
+            //typeName = schemaField.asPrimitiveType();
+            typeName = schema.asPrimitiveType();
+            switch (typeName.getPrimitiveTypeName()) {
+                // PrimitiveTypes
+                case INT64:
+                    logicalType = schema.getLogicalTypeAnnotation();
+                    if (logicalType instanceof 
LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) {
+                        LogicalTypeAnnotation.TimeUnit timeUnit =
+                                
((LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) logicalType).getUnit();
+                        if (timeUnit == LogicalTypeAnnotation.TimeUnit.MICROS) 
{
+                            newDataType = InternalType.TIMESTAMP;
+                            metadata.put(
+                                    
InternalSchema.MetadataKey.TIMESTAMP_PRECISION,
+                                    InternalSchema.MetadataValue.MICROS);
+                        } else if (timeUnit == 
LogicalTypeAnnotation.TimeUnit.MILLIS) {
+                            newDataType = InternalType.TIMESTAMP_NTZ;
+                            metadata.put(
+                                    
InternalSchema.MetadataKey.TIMESTAMP_PRECISION,
+                                    InternalSchema.MetadataValue.MILLIS);
+                        } else if (timeUnit == 
LogicalTypeAnnotation.TimeUnit.NANOS) {
+                            newDataType = InternalType.TIMESTAMP_NTZ;
+                            metadata.put(
+                                    
InternalSchema.MetadataKey.TIMESTAMP_PRECISION,
+                                    InternalSchema.MetadataValue.NANOS);
+                        }
+                    } else if (logicalType instanceof 
LogicalTypeAnnotation.IntLogicalTypeAnnotation) {
+                        newDataType = InternalType.INT;
+                    } else if (logicalType instanceof 
LogicalTypeAnnotation.TimeLogicalTypeAnnotation) {
+                        LogicalTypeAnnotation.TimeUnit timeUnit = 
((LogicalTypeAnnotation.TimeLogicalTypeAnnotation) logicalType).getUnit();
+                        if (timeUnit == LogicalTypeAnnotation.TimeUnit.MICROS 
|| timeUnit == LogicalTypeAnnotation.TimeUnit.NANOS) {
+                            // check if INT is the InternalType needed here
+                            newDataType = InternalType.INT;
+                        }
+                    } else {
+                        newDataType = InternalType.INT;
+                    }
+                    break;
+                case INT32:
+                    logicalType = schema.getLogicalTypeAnnotation();
+                    if (logicalType instanceof 
LogicalTypeAnnotation.DateLogicalTypeAnnotation) {
+                        newDataType = InternalType.DATE;
+                    } else if (logicalType instanceof 
LogicalTypeAnnotation.TimeLogicalTypeAnnotation) {
+                        LogicalTypeAnnotation.TimeUnit timeUnit = 
((LogicalTypeAnnotation.TimeLogicalTypeAnnotation) logicalType).getUnit();
+                        if (timeUnit == LogicalTypeAnnotation.TimeUnit.MILLIS) 
{
+                            // check if INT is the InternalType needed here
+                            newDataType = InternalType.INT;
+                        }
+                    } else {
+                        newDataType = InternalType.INT;
+                    }
+                    break;
+                case INT96:
+                    newDataType = InternalType.INT;

Review Comment:
   A 96-bit int can't be cast down to 32 bits. I think this type (`INT96`) is 
deprecated in parquet as well, so I don't think we need to support it now.



##########
xtable-core/src/main/java/org/apache/xtable/parquet/ParquetMetadataExtractor.java:
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.parquet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;

Review Comment:
   Nitpick: we prefer to avoid the * imports and just import the classes we're 
using.



##########
xtable-core/src/main/java/org/apache/xtable/parquet/ParquetSchemaExtractor.java:
##########
@@ -0,0 +1,390 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.parquet;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.List;
+import java.util.ArrayList;
+
+import org.apache.xtable.schema.SchemaUtils;
+
+import java.util.Collections;
+import java.util.Optional;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+
+import java.util.Collections;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import org.apache.xtable.hudi.idtracking.models.IdMapping;
+import org.apache.avro.Schema;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+import org.apache.xtable.collectors.CustomCollectors;
+import org.apache.xtable.exception.UnsupportedSchemaTypeException;
+import org.apache.xtable.hudi.idtracking.models.IdMapping;
+import org.apache.xtable.model.schema.InternalField;
+import org.apache.parquet.schema.Type.Repetition;
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.schema.InternalType;
+//import org.apache.parquet.avro.AvroSchemaConverter;
+
+
+/**
+ * Class that converts parquet Schema {@link Schema} to Canonical Schema 
{@link InternalSchema} and
+ * vice-versa. This conversion is fully reversible and there is a strict 1 to 
1 mapping between
+ * parquet data types and canonical data types.
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public class ParquetSchemaExtractor {
+    // parquet only supports string keys in maps (NOTE(review): the Parquet MAP logical type does not appear to restrict keys to strings - confirm this constraint is intended)
+    private static final InternalField MAP_KEY_FIELD =
+            InternalField.builder()
+                    .name(InternalField.Constants.MAP_KEY_FIELD_NAME)
+                    .schema(
+                            InternalSchema.builder()
+                                    .name("map_key")
+                                    .dataType(InternalType.STRING)
+                                    .isNullable(false)
+                                    .build())
+                    .defaultValue("")
+                    .build();
+    private static final ParquetSchemaExtractor INSTANCE = new 
ParquetSchemaExtractor();
+    private static final String ELEMENT = "element"; // presumably the child name inside list groups - TODO confirm against usage
+    private static final String KEY = "key"; // presumably the key child name inside map groups - TODO confirm against usage
+    private static final String VALUE = "value"; // presumably the value child name inside map groups - TODO confirm against usage
+
+    /**
+     * Provides access to the shared extractor held in {@code INSTANCE}.
+     *
+     * @return the {@code ParquetSchemaExtractor} instance created when the class is initialized
+     */
+    public static ParquetSchemaExtractor getInstance() {
+        return ParquetSchemaExtractor.INSTANCE;
+    }
+
+    /**
+     * Reports whether the given parquet type is missing, or is a group holding a missing field.
+     *
+     * @param schema the parquet type to inspect; may be {@code null}
+     * @return {@code true} if {@code schema} is {@code null}, or if it is a group type whose
+     *     field list contains a {@code null} entry; {@code false} otherwise (including for
+     *     every non-null primitive type)
+     */
+    private static boolean groupTypeContainsNull(Type schema) {
+        // Compare by identity before dereferencing: equals(null) can never return true,
+        // and calling a method on a null reference would throw instead of reporting it.
+        if (schema == null) {
+            return true;
+        }
+        // A non-null primitive has no child fields to inspect.
+        if (schema.isPrimitive()) {
+            return false;
+        }
+        for (Type field : schema.asGroupType().getFields()) {
+            if (field == null) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /*    private static LogicalTypeAnnotation 
finalizeSchema(LogicalTypeAnnotation targetSchema, InternalSchema inputSchema) {
+            if (inputSchema.isNullable()) {
+                return targetSchema.union(null); // 
LogicalTypeAnnotation.unknownType()
+            }
+            return targetSchema;
+        }*/
+    /**
+     * Indexes the direct children of an {@link IdMapping} by name.
+     *
+     * @param idMapping the parent mapping; may be {@code null}
+     * @return an empty map when {@code idMapping} is {@code null}, otherwise a map from each
+     *     child's name to that child mapping
+     */
+    private Map<String, IdMapping> getChildIdMap(IdMapping idMapping) {
+        if (idMapping != null) {
+            return idMapping.getFields().stream()
+                    .collect(Collectors.toMap(child -> child.getName(), child -> child));
+        }
+        return Collections.emptyMap();
+    }
+
+    /**
+     * Converts the parquet {@link Schema} to {@link InternalSchema}.
+     *
+     * @param schema               The schema being converted
+     * @param parentPath           If this schema is nested within another, 
this will be a dot separated string
+     *                             representing the path from the top most 
field to the current schema.
+     * @param fieldNameToIdMapping map of fieldName to IdMapping to track 
field IDs provided by the
+     *                             source schema. If source schema does not 
contain IdMappings, map will be empty.
+     * @return a converted schema
+     */
+    public InternalSchema toInternalSchema(
+            Type schema, String parentPath, Map<String, IdMapping> 
fieldNameToIdMapping) {
+        // TODO - Does not handle recursion in parquet schema
+        InternalType newDataType = null;
+        PrimitiveType typeName;
+        LogicalTypeAnnotation logicalType;
+        Map<InternalSchema.MetadataKey, Object> metadata = new HashMap<>();
+        if (schema.isPrimitive()/*schema.getFields().size()==1*/) {
+            //Type schemaField = schema.getType(0);
+            //typeName = schemaField.asPrimitiveType();
+            typeName = schema.asPrimitiveType();
+            switch (typeName.getPrimitiveTypeName()) {
+                // PrimitiveTypes
+                case INT64:
+                    logicalType = schema.getLogicalTypeAnnotation();
+                    if (logicalType instanceof 
LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) {
+                        LogicalTypeAnnotation.TimeUnit timeUnit =
+                                
((LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) logicalType).getUnit();
+                        if (timeUnit == LogicalTypeAnnotation.TimeUnit.MICROS) 
{
+                            newDataType = InternalType.TIMESTAMP;
+                            metadata.put(
+                                    
InternalSchema.MetadataKey.TIMESTAMP_PRECISION,
+                                    InternalSchema.MetadataValue.MICROS);
+                        } else if (timeUnit == 
LogicalTypeAnnotation.TimeUnit.MILLIS) {
+                            newDataType = InternalType.TIMESTAMP_NTZ;
+                            metadata.put(
+                                    
InternalSchema.MetadataKey.TIMESTAMP_PRECISION,
+                                    InternalSchema.MetadataValue.MILLIS);
+                        } else if (timeUnit == 
LogicalTypeAnnotation.TimeUnit.NANOS) {
+                            newDataType = InternalType.TIMESTAMP_NTZ;
+                            metadata.put(
+                                    
InternalSchema.MetadataKey.TIMESTAMP_PRECISION,
+                                    InternalSchema.MetadataValue.NANOS);
+                        }
+                    } else if (logicalType instanceof 
LogicalTypeAnnotation.IntLogicalTypeAnnotation) {
+                        newDataType = InternalType.INT;
+                    } else if (logicalType instanceof 
LogicalTypeAnnotation.TimeLogicalTypeAnnotation) {
+                        LogicalTypeAnnotation.TimeUnit timeUnit = 
((LogicalTypeAnnotation.TimeLogicalTypeAnnotation) logicalType).getUnit();
+                        if (timeUnit == LogicalTypeAnnotation.TimeUnit.MICROS 
|| timeUnit == LogicalTypeAnnotation.TimeUnit.NANOS) {
+                            // check if INT is the InternalType needed here
+                            newDataType = InternalType.INT;
+                        }
+                    } else {
+                        newDataType = InternalType.INT;

Review Comment:
   This should be `LONG` for any 64-bit integer values.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to