This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 053bea6  ORC: Omit columns without field ids in schema conversion (#1140)
053bea6 is described below

commit 053bea6eaa705737aba9ef8961eefcaf509e107f
Author: Edgar Rodriguez <[email protected]>
AuthorDate: Thu Jul 2 12:15:40 2020 -0700

    ORC: Omit columns without field ids in schema conversion (#1140)
---
 .../java/org/apache/iceberg/orc/ORCSchemaUtil.java | 152 ++---------------
 .../org/apache/iceberg/orc/OrcSchemaVisitor.java   | 105 ++++++++++++
 .../iceberg/orc/OrcSchemaWithTypeVisitor.java      |   2 +-
 .../apache/iceberg/orc/OrcToIcebergVisitor.java    | 190 +++++++++++++++++++++
 .../org/apache/iceberg/orc/TestORCSchemaUtil.java  |  60 +++++++
 5 files changed, 373 insertions(+), 136 deletions(-)

diff --git a/orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java b/orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java
index 970f891..0a09aea 100644
--- a/orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java
+++ b/orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java
@@ -19,17 +19,14 @@
 
 package org.apache.iceberg.orc;
 
-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.TypeUtil;
@@ -67,8 +64,8 @@ public final class ORCSchemaUtil {
     }
   }
 
-  private static final String ICEBERG_ID_ATTRIBUTE = "iceberg.id";
-  private static final String ICEBERG_REQUIRED_ATTRIBUTE = "iceberg.required";
+  static final String ICEBERG_ID_ATTRIBUTE = "iceberg.id";
+  static final String ICEBERG_REQUIRED_ATTRIBUTE = "iceberg.required";
 
   /**
    * The name of the ORC {@link TypeDescription} attribute indicating the 
Iceberg type corresponding to an
@@ -80,7 +77,7 @@ public final class ORCSchemaUtil {
    * ORC long type. The values for this attribute are denoted in {@code 
LongType}.
    */
   public static final String ICEBERG_LONG_TYPE_ATTRIBUTE = "iceberg.long-type";
-  private static final String ICEBERG_FIELD_LENGTH = "iceberg.length";
+  static final String ICEBERG_FIELD_LENGTH = "iceberg.length";
 
   private static final ImmutableMap<Type.TypeID, TypeDescription.Category> 
TYPE_MAPPING =
       ImmutableMap.<Type.TypeID, TypeDescription.Category>builder()
@@ -202,10 +199,11 @@ public final class ORCSchemaUtil {
 
   /**
    * Convert an ORC schema to an Iceberg schema. This method handles the 
convertion from the original
-   * Iceberg column mapping IDs if present in the ORC column attributes, 
otherwise, ORC column IDs
-   * will be assigned following ORCs pre-order ID assignment.
+   * Iceberg column mapping IDs if present in the ORC column attributes, 
otherwise, ORC columns with no
+   * Iceberg IDs will be ignored and skipped in the conversion.
    *
    * @return the Iceberg schema
+   * @throws IllegalArgumentException if ORC schema has no columns with 
Iceberg ID attributes
    */
   public static Schema convert(TypeDescription orcSchema) {
     List<TypeDescription> children = orcSchema.getChildren();
@@ -213,14 +211,15 @@ public final class ORCSchemaUtil {
     Preconditions.checkState(children.size() == childrenNames.size(),
         "Error in ORC file, children fields and names do not match.");
 
-    List<Types.NestedField> icebergFields = 
Lists.newArrayListWithExpectedSize(children.size());
-    AtomicInteger lastColumnId = new AtomicInteger(getMaxIcebergId(orcSchema));
-    for (int i = 0; i < children.size(); i++) {
-      icebergFields.add(convertOrcToIceberg(children.get(i), 
childrenNames.get(i),
-          lastColumnId::incrementAndGet));
+    OrcToIcebergVisitor schemaConverter = new OrcToIcebergVisitor();
+    List<Types.NestedField> fields = 
OrcToIcebergVisitor.visitSchema(orcSchema, schemaConverter).stream()
+        
.filter(Optional::isPresent).map(Optional::get).collect(Collectors.toList());
+
+    if (fields.size() == 0) {
+      throw new IllegalArgumentException("ORC schema does not contain Iceberg 
IDs");
     }
 
-    return new Schema(icebergFields);
+    return new Schema(fields);
   }
 
   /**
@@ -388,129 +387,12 @@ public final class ORCSchemaUtil {
     return Integer.parseInt(idStr);
   }
 
-  private static boolean isRequired(TypeDescription orcType) {
+  static boolean isOptional(TypeDescription orcType) {
     String isRequiredStr = 
orcType.getAttributeValue(ICEBERG_REQUIRED_ATTRIBUTE);
     if (isRequiredStr != null) {
-      return Boolean.parseBoolean(isRequiredStr);
-    }
-    return false;
-  }
-
-  private static Types.NestedField getIcebergType(int icebergID, String name, 
Type type,
-                                                  boolean isRequired) {
-    return isRequired ?
-        Types.NestedField.required(icebergID, name, type) :
-        Types.NestedField.optional(icebergID, name, type);
-  }
-
-  private static Types.NestedField convertOrcToIceberg(TypeDescription 
orcType, String name,
-                                                       TypeUtil.NextID nextID) 
{
-
-    final int icebergID = icebergID(orcType).orElseGet(nextID::get);
-    final boolean isRequired = isRequired(orcType);
-
-    switch (orcType.getCategory()) {
-      case BOOLEAN:
-        return getIcebergType(icebergID, name, Types.BooleanType.get(), 
isRequired);
-      case BYTE:
-      case SHORT:
-      case INT:
-        return getIcebergType(icebergID, name, Types.IntegerType.get(), 
isRequired);
-      case LONG:
-        String longAttributeValue = 
orcType.getAttributeValue(ICEBERG_LONG_TYPE_ATTRIBUTE);
-        LongType longType = longAttributeValue == null ? LongType.LONG : 
LongType.valueOf(longAttributeValue);
-        switch (longType) {
-          case TIME:
-            return getIcebergType(icebergID, name, Types.TimeType.get(), 
isRequired);
-          case LONG:
-            return getIcebergType(icebergID, name, Types.LongType.get(), 
isRequired);
-          default:
-            throw new IllegalStateException("Invalid Long type found in ORC 
type attribute");
-        }
-      case FLOAT:
-        return getIcebergType(icebergID, name, Types.FloatType.get(), 
isRequired);
-      case DOUBLE:
-        return getIcebergType(icebergID, name, Types.DoubleType.get(), 
isRequired);
-      case STRING:
-      case CHAR:
-      case VARCHAR:
-        return getIcebergType(icebergID, name, Types.StringType.get(), 
isRequired);
-      case BINARY:
-        String binaryAttributeValue = 
orcType.getAttributeValue(ICEBERG_BINARY_TYPE_ATTRIBUTE);
-        BinaryType binaryType = binaryAttributeValue == null ? 
BinaryType.BINARY :
-            BinaryType.valueOf(binaryAttributeValue);
-        switch (binaryType) {
-          case UUID:
-            return getIcebergType(icebergID, name, Types.UUIDType.get(), 
isRequired);
-          case FIXED:
-            int fixedLength = 
Integer.parseInt(orcType.getAttributeValue(ICEBERG_FIELD_LENGTH));
-            return getIcebergType(icebergID, name, 
Types.FixedType.ofLength(fixedLength), isRequired);
-          case BINARY:
-            return getIcebergType(icebergID, name, Types.BinaryType.get(), 
isRequired);
-          default:
-            throw new IllegalStateException("Invalid Binary type found in ORC 
type attribute");
-        }
-      case DATE:
-        return getIcebergType(icebergID, name, Types.DateType.get(), 
isRequired);
-      case TIMESTAMP:
-        return getIcebergType(icebergID, name, 
Types.TimestampType.withoutZone(), isRequired);
-      case TIMESTAMP_INSTANT:
-        return getIcebergType(icebergID, name, Types.TimestampType.withZone(), 
isRequired);
-      case DECIMAL:
-        return getIcebergType(icebergID, name,
-            Types.DecimalType.of(orcType.getPrecision(), orcType.getScale()),
-            isRequired);
-      case STRUCT: {
-        List<String> fieldNames = orcType.getFieldNames();
-        List<TypeDescription> fieldTypes = orcType.getChildren();
-        List<Types.NestedField> fields = new ArrayList<>(fieldNames.size());
-        for (int c = 0; c < fieldNames.size(); ++c) {
-          String childName = fieldNames.get(c);
-          TypeDescription type = fieldTypes.get(c);
-          Types.NestedField field = convertOrcToIceberg(type, childName, 
nextID);
-          fields.add(field);
-        }
-
-        return getIcebergType(icebergID, name, Types.StructType.of(fields), 
isRequired);
-      }
-      case LIST: {
-        TypeDescription elementType = orcType.getChildren().get(0);
-        Types.NestedField element = convertOrcToIceberg(elementType, 
"element", nextID);
-
-        Types.ListType listTypeWithElem = isRequired(elementType) ?
-            Types.ListType.ofRequired(element.fieldId(), element.type()) :
-            Types.ListType.ofOptional(element.fieldId(), element.type());
-        return isRequired ?
-            Types.NestedField.required(icebergID, name, listTypeWithElem) :
-            Types.NestedField.optional(icebergID, name, listTypeWithElem);
-      }
-      case MAP: {
-        TypeDescription keyType = orcType.getChildren().get(0);
-        Types.NestedField key = convertOrcToIceberg(keyType, "key", nextID);
-        TypeDescription valueType = orcType.getChildren().get(1);
-        Types.NestedField value = convertOrcToIceberg(valueType, "value", 
nextID);
-
-        Types.MapType mapTypeWithKV = isRequired(valueType) ?
-            Types.MapType.ofRequired(key.fieldId(), value.fieldId(), 
key.type(), value.type()) :
-            Types.MapType.ofOptional(key.fieldId(), value.fieldId(), 
key.type(), value.type());
-
-        return getIcebergType(icebergID, name, mapTypeWithKV, isRequired);
-      }
-      default:
-        // We don't have an answer for union types.
-        throw new IllegalArgumentException("Can't handle " + orcType);
+      return !Boolean.parseBoolean(isRequiredStr);
     }
-  }
-
-  private static int getMaxIcebergId(TypeDescription originalOrcSchema) {
-    int maxId = icebergID(originalOrcSchema).orElse(0);
-    final List<TypeDescription> children = 
Optional.ofNullable(originalOrcSchema.getChildren())
-        .orElse(Collections.emptyList());
-    for (TypeDescription child : children) {
-      maxId = Math.max(maxId, getMaxIcebergId(child));
-    }
-
-    return maxId;
+    return true;
   }
 
   /**
diff --git a/orc/src/main/java/org/apache/iceberg/orc/OrcSchemaVisitor.java b/orc/src/main/java/org/apache/iceberg/orc/OrcSchemaVisitor.java
new file mode 100644
index 0000000..6d1b127
--- /dev/null
+++ b/orc/src/main/java/org/apache/iceberg/orc/OrcSchemaVisitor.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.orc;
+
+import java.util.List;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.orc.TypeDescription;
+
+/**
+ * Generic visitor of an ORC Schema.
+ */
+public abstract class OrcSchemaVisitor<T> {
+
+  public static <T> List<T> visitSchema(TypeDescription schema, 
OrcSchemaVisitor<T> visitor) {
+    Preconditions.checkArgument(schema.getId() == 0, "TypeDescription must be 
root schema.");
+
+    List<TypeDescription> fields = schema.getChildren();
+    List<String> names = schema.getFieldNames();
+
+    return visitFields(fields, names, visitor);
+  }
+
+  public static <T> T visit(TypeDescription schema, OrcSchemaVisitor<T> 
visitor) {
+    switch (schema.getCategory()) {
+      case STRUCT:
+        return visitRecord(schema, visitor);
+
+      case UNION:
+        throw new UnsupportedOperationException("Cannot handle " + schema);
+
+      case LIST:
+        return visitor.list(schema, visit(schema.getChildren().get(0), 
visitor));
+
+      case MAP:
+        return visitor.map(schema, visit(schema.getChildren().get(0), visitor),
+            visit(schema.getChildren().get(1), visitor));
+
+      default:
+        return visitor.primitive(schema);
+    }
+  }
+
+  private static <T> List<T> visitFields(List<TypeDescription> fields, 
List<String> names,
+                                         OrcSchemaVisitor<T> visitor) {
+    Preconditions.checkArgument(fields.size() == names.size(), "Not all fields 
have names in ORC struct");
+
+    List<T> results = Lists.newArrayListWithExpectedSize(fields.size());
+    for (int i = 0; i < fields.size(); i++) {
+      TypeDescription field = fields.get(i);
+      String name = names.get(i);
+      visitor.beforeField(name, field);
+      try {
+        results.add(visit(field, visitor));
+      } finally {
+        visitor.afterField(name, field);
+      }
+    }
+    return results;
+  }
+
+  private static <T> T visitRecord(TypeDescription record, OrcSchemaVisitor<T> 
visitor) {
+    List<TypeDescription> fields = record.getChildren();
+    List<String> names = record.getFieldNames();
+
+    return visitor.record(record, names, visitFields(fields, names, visitor));
+  }
+
+  public void beforeField(String name, TypeDescription type) {}
+
+  public void afterField(String name, TypeDescription type) {}
+
+  public T record(TypeDescription record, List<String> names, List<T> fields) {
+    return null;
+  }
+
+  public T list(TypeDescription array, T element) {
+    return null;
+  }
+
+  public T map(TypeDescription map, T key, T value) {
+    return null;
+  }
+
+  public T primitive(TypeDescription primitive) {
+    return null;
+  }
+}
diff --git a/orc/src/main/java/org/apache/iceberg/orc/OrcSchemaWithTypeVisitor.java b/orc/src/main/java/org/apache/iceberg/orc/OrcSchemaWithTypeVisitor.java
index 175cddf..53b0c9f 100644
--- a/orc/src/main/java/org/apache/iceberg/orc/OrcSchemaWithTypeVisitor.java
+++ b/orc/src/main/java/org/apache/iceberg/orc/OrcSchemaWithTypeVisitor.java
@@ -44,7 +44,7 @@ public abstract class OrcSchemaWithTypeVisitor<T> {
         Types.ListType list = iType != null ? iType.asListType() : null;
         return visitor.list(
             list, schema,
-            visit(list.elementType(), schema.getChildren().get(0), visitor));
+            visit(list != null ? list.elementType() : null, 
schema.getChildren().get(0), visitor));
 
       case MAP:
         Types.MapType map = iType != null ? iType.asMapType() : null;
diff --git a/orc/src/main/java/org/apache/iceberg/orc/OrcToIcebergVisitor.java b/orc/src/main/java/org/apache/iceberg/orc/OrcToIcebergVisitor.java
new file mode 100644
index 0000000..6a6b895
--- /dev/null
+++ b/orc/src/main/java/org/apache/iceberg/orc/OrcToIcebergVisitor.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.orc;
+
+import java.util.Deque;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.apache.orc.TypeDescription;
+
+/**
+ * Converts an ORC schema to Iceberg.
+ */
+class OrcToIcebergVisitor extends 
OrcSchemaVisitor<Optional<Types.NestedField>> {
+
+  private final Deque<String> fieldNames;
+
+  OrcToIcebergVisitor() {
+    this.fieldNames = Lists.newLinkedList();
+  }
+
+  @Override
+  public void beforeField(String name, TypeDescription type) {
+    fieldNames.push(name);
+  }
+
+  @Override
+  public void afterField(String name, TypeDescription type) {
+    fieldNames.pop();
+  }
+
+  private String currentFieldName() {
+    return fieldNames.peek();
+  }
+
+  @Override
+  public Optional<Types.NestedField> record(TypeDescription record, 
List<String> names,
+                                            List<Optional<Types.NestedField>> 
fields) {
+    boolean isOptional = ORCSchemaUtil.isOptional(record);
+    Optional<Integer> icebergIdOpt = ORCSchemaUtil.icebergID(record);
+    if (!icebergIdOpt.isPresent() || 
fields.stream().noneMatch(Optional::isPresent)) {
+      return Optional.empty();
+    }
+
+    Types.StructType structType = Types.StructType.of(
+        
fields.stream().filter(Optional::isPresent).map(Optional::get).collect(Collectors.toList()));
+    return Optional.of(Types.NestedField.of(icebergIdOpt.get(), isOptional, 
currentFieldName(), structType));
+  }
+
+  @Override
+  public Optional<Types.NestedField> list(TypeDescription array,
+                                          Optional<Types.NestedField> element) 
{
+    boolean isOptional = ORCSchemaUtil.isOptional(array);
+    Optional<Integer> icebergIdOpt = ORCSchemaUtil.icebergID(array);
+
+    if (!icebergIdOpt.isPresent() || !element.isPresent()) {
+      return Optional.empty();
+    }
+
+    Types.NestedField foundElement = element.get();
+    Types.ListType listTypeWithElem = 
ORCSchemaUtil.isOptional(array.getChildren().get(0)) ?
+        Types.ListType.ofOptional(foundElement.fieldId(), foundElement.type()) 
:
+        Types.ListType.ofRequired(foundElement.fieldId(), foundElement.type());
+
+    return Optional.of(Types.NestedField.of(icebergIdOpt.get(), isOptional, 
currentFieldName(), listTypeWithElem));
+  }
+
+  @Override
+  public Optional<Types.NestedField> map(TypeDescription map, 
Optional<Types.NestedField> key,
+                                         Optional<Types.NestedField> value) {
+    boolean isOptional = ORCSchemaUtil.isOptional(map);
+    Optional<Integer> icebergIdOpt = ORCSchemaUtil.icebergID(map);
+
+    if (!icebergIdOpt.isPresent() || !key.isPresent() || !value.isPresent()) {
+      return Optional.empty();
+    }
+
+    Types.NestedField foundKey = key.get();
+    Types.NestedField foundValue = value.get();
+    Types.MapType mapTypeWithKV = 
ORCSchemaUtil.isOptional(map.getChildren().get(1)) ?
+        Types.MapType.ofOptional(foundKey.fieldId(), foundValue.fieldId(), 
foundKey.type(), foundValue.type()) :
+        Types.MapType.ofRequired(foundKey.fieldId(), foundValue.fieldId(), 
foundKey.type(), foundValue.type());
+
+    return Optional.of(Types.NestedField.of(icebergIdOpt.get(), isOptional, 
currentFieldName(), mapTypeWithKV));
+  }
+
+  @Override
+  public Optional<Types.NestedField> primitive(TypeDescription primitive) {
+    boolean isOptional = ORCSchemaUtil.isOptional(primitive);
+    Optional<Integer> icebergIdOpt = ORCSchemaUtil.icebergID(primitive);
+
+    if (!icebergIdOpt.isPresent()) {
+      return Optional.empty();
+    }
+
+    final Types.NestedField foundField;
+    int icebergID = icebergIdOpt.get();
+    String name = currentFieldName();
+    switch (primitive.getCategory()) {
+      case BOOLEAN:
+        foundField = Types.NestedField.of(icebergID, isOptional, name, 
Types.BooleanType.get());
+        break;
+      case BYTE:
+      case SHORT:
+      case INT:
+        foundField = Types.NestedField.of(icebergID, isOptional, name, 
Types.IntegerType.get());
+        break;
+      case LONG:
+        String longAttributeValue = 
primitive.getAttributeValue(ORCSchemaUtil.ICEBERG_LONG_TYPE_ATTRIBUTE);
+        ORCSchemaUtil.LongType longType = longAttributeValue == null ?
+            ORCSchemaUtil.LongType.LONG : 
ORCSchemaUtil.LongType.valueOf(longAttributeValue);
+        switch (longType) {
+          case TIME:
+            foundField = Types.NestedField.of(icebergID, isOptional, name, 
Types.TimeType.get());
+            break;
+          case LONG:
+            foundField = Types.NestedField.of(icebergID, isOptional, name, 
Types.LongType.get());
+            break;
+          default:
+            throw new IllegalStateException("Invalid Long type found in ORC 
type attribute");
+        }
+        break;
+      case FLOAT:
+        foundField = Types.NestedField.of(icebergID, isOptional, name, 
Types.FloatType.get());
+        break;
+      case DOUBLE:
+        foundField = Types.NestedField.of(icebergID, isOptional, name, 
Types.DoubleType.get());
+        break;
+      case STRING:
+      case CHAR:
+      case VARCHAR:
+        foundField = Types.NestedField.of(icebergID, isOptional, name, 
Types.StringType.get());
+        break;
+      case BINARY:
+        String binaryAttributeValue = 
primitive.getAttributeValue(ORCSchemaUtil.ICEBERG_BINARY_TYPE_ATTRIBUTE);
+        ORCSchemaUtil.BinaryType binaryType = binaryAttributeValue == null ? 
ORCSchemaUtil.BinaryType.BINARY :
+            ORCSchemaUtil.BinaryType.valueOf(binaryAttributeValue);
+        switch (binaryType) {
+          case UUID:
+            foundField = Types.NestedField.of(icebergID, isOptional, name, 
Types.UUIDType.get());
+            break;
+          case FIXED:
+            int fixedLength = 
Integer.parseInt(primitive.getAttributeValue(ORCSchemaUtil.ICEBERG_FIELD_LENGTH));
+            foundField = Types.NestedField.of(icebergID, isOptional, name, 
Types.FixedType.ofLength(fixedLength));
+            break;
+          case BINARY:
+            foundField = Types.NestedField.of(icebergID, isOptional, name, 
Types.BinaryType.get());
+            break;
+          default:
+            throw new IllegalStateException("Invalid Binary type found in ORC 
type attribute");
+        }
+        break;
+      case DATE:
+        foundField = Types.NestedField.of(icebergID, isOptional, name, 
Types.DateType.get());
+        break;
+      case TIMESTAMP:
+        foundField = Types.NestedField.of(icebergID, isOptional, name, 
Types.TimestampType.withoutZone());
+        break;
+      case TIMESTAMP_INSTANT:
+        foundField = Types.NestedField.of(icebergID, isOptional, name, 
Types.TimestampType.withZone());
+        break;
+      case DECIMAL:
+        foundField = Types.NestedField.of(icebergID, isOptional, name,
+            Types.DecimalType.of(primitive.getPrecision(), 
primitive.getScale()));
+        break;
+      default:
+        throw new IllegalArgumentException("Can't handle " + primitive);
+    }
+    return Optional.of(foundField);
+  }
+}
diff --git a/orc/src/test/java/org/apache/iceberg/orc/TestORCSchemaUtil.java b/orc/src/test/java/org/apache/iceberg/orc/TestORCSchemaUtil.java
index 461e1d4..269919c 100644
--- a/orc/src/test/java/org/apache/iceberg/orc/TestORCSchemaUtil.java
+++ b/orc/src/test/java/org/apache/iceberg/orc/TestORCSchemaUtil.java
@@ -25,6 +25,8 @@ import org.apache.orc.TypeDescription;
 import org.junit.Test;
 
 import static org.apache.iceberg.AssertHelpers.assertThrows;
+import static org.apache.iceberg.orc.ORCSchemaUtil.ICEBERG_ID_ATTRIBUTE;
+import static org.apache.iceberg.orc.ORCSchemaUtil.ICEBERG_REQUIRED_ATTRIBUTE;
 import static org.apache.iceberg.types.Types.NestedField.optional;
 import static org.apache.iceberg.types.Types.NestedField.required;
 import static org.junit.Assert.assertEquals;
@@ -209,4 +211,62 @@ public class TestORCSchemaUtil {
           ORCSchemaUtil.buildOrcProjection(evolveSchema, orcSchema);
         });
   }
+
+  @Test
+  public void testSkipNonIcebergColumns() {
+    TypeDescription schema = TypeDescription.createStruct();
+    TypeDescription intCol = TypeDescription.createInt();
+    intCol.setAttribute(ICEBERG_ID_ATTRIBUTE, "1");
+    intCol.setAttribute(ICEBERG_REQUIRED_ATTRIBUTE, "true");
+    TypeDescription listCol = TypeDescription
+        .createList(TypeDescription.createMap(TypeDescription.createString(), 
TypeDescription.createDate()));
+    listCol.setAttribute(ICEBERG_ID_ATTRIBUTE, "2");
+    schema.addField("intCol", intCol);
+    schema.addField("listCol", listCol);
+    TypeDescription stringKey = TypeDescription.createString();
+    stringKey.setAttribute(ICEBERG_ID_ATTRIBUTE, "3");
+    TypeDescription booleanVal = TypeDescription.createBoolean();
+    booleanVal.setAttribute(ICEBERG_ID_ATTRIBUTE, "4");
+    TypeDescription mapCol = TypeDescription.createMap(stringKey, booleanVal);
+    mapCol.setAttribute(ICEBERG_ID_ATTRIBUTE, "5");
+    schema.addField("mapCol", mapCol);
+
+    Schema icebergSchema = ORCSchemaUtil.convert(schema);
+    Schema expectedSchema = new Schema(
+        required(1, "intCol", Types.IntegerType.get()),
+        // Skipped listCol since element has no Iceberg ID
+        optional(5, "mapCol", Types.MapType.ofOptional(3, 4,
+            Types.StringType.get(), Types.BooleanType.get()))
+    );
+    assertEquals("Schemas must match.", expectedSchema.asStruct(), 
icebergSchema.asStruct());
+
+    TypeDescription structCol = TypeDescription.createStruct();
+    structCol.setAttribute(ICEBERG_ID_ATTRIBUTE, "7");
+    structCol.setAttribute(ICEBERG_REQUIRED_ATTRIBUTE, "true");
+    TypeDescription binaryCol = TypeDescription.createBinary();
+    TypeDescription doubleCol = TypeDescription.createDouble();
+    doubleCol.setAttribute(ICEBERG_ID_ATTRIBUTE, "6");
+    doubleCol.setAttribute(ICEBERG_REQUIRED_ATTRIBUTE, "true");
+    structCol.addField("binaryCol", binaryCol);
+    structCol.addField("doubleCol", doubleCol);
+    schema.addField("structCol", structCol);
+    TypeDescription stringKey2 = TypeDescription.createString();
+    stringKey2.setAttribute(ICEBERG_ID_ATTRIBUTE, "8");
+    TypeDescription mapCol2 = TypeDescription.createMap(stringKey2, 
TypeDescription.createDate());
+    mapCol2.setAttribute(ICEBERG_ID_ATTRIBUTE, "10");
+    schema.addField("mapCol2", mapCol2);
+
+    Schema icebergSchema2 = ORCSchemaUtil.convert(schema);
+    Schema expectedSchema2 = new Schema(
+        required(1, "intCol", Types.IntegerType.get()),
+        optional(5, "mapCol", Types.MapType.ofOptional(3, 4,
+            Types.StringType.get(), Types.BooleanType.get())),
+        required(7, "structCol", Types.StructType.of(
+            // Skipped binaryCol
+            required(6, "doubleCol", Types.DoubleType.get())
+        // Skipped mapCol2 since value has no Iceberg ID
+        ))
+    );
+    assertEquals("Schemas must match.", expectedSchema2.asStruct(), 
icebergSchema2.asStruct());
+  }
 }

Reply via email to