This is an automated email from the ASF dual-hosted git repository.

rymurr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new aa65c06  Add support for TimeType / UUIDType (#2739)
aa65c06 is described below

commit aa65c064361feb37b4b0c1bea7e9531e879d8d71
Author: Eduard Tudenhöfner <[email protected]>
AuthorDate: Mon Jun 28 17:48:24 2021 +0200

    Add support for TimeType / UUIDType (#2739)
---
 .../org/apache/iceberg/arrow/ArrowSchemaUtil.java  |   3 +
 .../iceberg/arrow/vectorized/ArrowReader.java      |   4 +-
 .../GenericArrowVectorAccessorFactory.java         |  39 ++++++++
 .../arrow/vectorized/VectorizedArrowReader.java    |  23 ++++-
 .../parquet/VectorizedColumnIterator.java          |  16 ++++
 ...orizedDictionaryEncodedParquetValuesReader.java |  39 ++++++++
 .../vectorized/parquet/VectorizedPageIterator.java |  30 ++++++
 .../VectorizedParquetDefinitionLevelReader.java    |  90 ++++++++++++++++++
 .../apache/iceberg/arrow/ArrowSchemaUtilTest.java  |   8 +-
 .../iceberg/arrow/vectorized/ArrowReaderTest.java  | 104 ++++++++++++++++++++-
 10 files changed, 346 insertions(+), 10 deletions(-)

diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/ArrowSchemaUtil.java b/arrow/src/main/java/org/apache/iceberg/arrow/ArrowSchemaUtil.java
index 5bef417..fe7ecc9 100644
--- a/arrow/src/main/java/org/apache/iceberg/arrow/ArrowSchemaUtil.java
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/ArrowSchemaUtil.java
@@ -98,6 +98,9 @@ public class ArrowSchemaUtil {
       case TIME:
         arrowType = new ArrowType.Time(TimeUnit.MICROSECOND, Long.SIZE);
         break;
+      case UUID:
+        arrowType = new ArrowType.FixedSizeBinary(16);
+        break;
       case TIMESTAMP:
         arrowType = new ArrowType.Timestamp(TimeUnit.MICROSECOND,
             ((Types.TimestampType) field.type()).shouldAdjustToUTC() ? "UTC" : 
null);
diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowReader.java
index 503a273..2386ce5 100644
--- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowReader.java
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowReader.java
@@ -80,8 +80,8 @@ import org.apache.parquet.schema.MessageType;
  *     <li>Columns with constant values are physically encoded as a 
dictionary. The Arrow vector
  *     type is int32 instead of the type as per the schema.
  *     See https://github.com/apache/iceberg/issues/2484.</li>
- *     <li>Data types: {@link Types.TimeType}, {@link Types.ListType}, {@link 
Types.MapType},
- *     {@link Types.StructType}, {@link Types.UUIDType}, {@link 
Types.FixedType} and
+ *     <li>Data types: {@link Types.ListType}, {@link Types.MapType},
+ *     {@link Types.StructType}, {@link Types.FixedType} and
  *     {@link Types.DecimalType}
  *     See https://github.com/apache/iceberg/issues/2485 and 
https://github.com/apache/iceberg/issues/2486.</li>
  *     <li>Iceberg v2 spec is not supported.
diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/GenericArrowVectorAccessorFactory.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/GenericArrowVectorAccessorFactory.java
index df4e216..77157ce 100644
--- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/GenericArrowVectorAccessorFactory.java
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/GenericArrowVectorAccessorFactory.java
@@ -30,9 +30,11 @@ import org.apache.arrow.vector.BitVector;
 import org.apache.arrow.vector.DateDayVector;
 import org.apache.arrow.vector.DecimalVector;
 import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.FixedSizeBinaryVector;
 import org.apache.arrow.vector.Float4Vector;
 import org.apache.arrow.vector.Float8Vector;
 import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.TimeMicroVector;
 import org.apache.arrow.vector.TimeStampMicroTZVector;
 import org.apache.arrow.vector.TimeStampMicroVector;
 import org.apache.arrow.vector.ValueVector;
@@ -112,6 +114,7 @@ public class GenericArrowVectorAccessorFactory<DecimalT, 
Utf8StringT, ArrayT, Ch
         case BSON:
           return new DictionaryStringAccessor<>((IntVector) vector, 
dictionary, stringFactorySupplier.get());
         case INT_64:
+        case TIME_MICROS:
         case TIMESTAMP_MILLIS:
         case TIMESTAMP_MICROS:
           return new DictionaryLongAccessor<>((IntVector) vector, dictionary);
@@ -189,6 +192,10 @@ public class GenericArrowVectorAccessorFactory<DecimalT, 
Utf8StringT, ArrayT, Ch
     } else if (vector instanceof StructVector) {
       StructVector structVector = (StructVector) vector;
       return new StructAccessor<>(structVector, 
structChildFactorySupplier.get());
+    } else if (vector instanceof TimeMicroVector) {
+      return new TimeMicroAccessor<>((TimeMicroVector) vector);
+    } else if (vector instanceof FixedSizeBinaryVector) {
+      return new FixedSizeBinaryAccessor<>((FixedSizeBinaryVector) vector);
     }
     throw new UnsupportedOperationException("Unsupported vector: " + 
vector.getClass());
   }
@@ -469,6 +476,38 @@ public class GenericArrowVectorAccessorFactory<DecimalT, 
Utf8StringT, ArrayT, Ch
     }
   }
 
+  private static class TimeMicroAccessor<DecimalT, Utf8StringT, ArrayT, 
ChildVectorT extends AutoCloseable>
+      extends ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> 
{
+
+    private final TimeMicroVector vector;
+
+    TimeMicroAccessor(TimeMicroVector vector) {
+      super(vector);
+      this.vector = vector;
+    }
+
+    @Override
+    public final long getLong(int rowId) {
+      return vector.get(rowId);
+    }
+  }
+
+  private static class FixedSizeBinaryAccessor<DecimalT, Utf8StringT, ArrayT, 
ChildVectorT extends AutoCloseable>
+      extends ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> 
{
+
+    private final FixedSizeBinaryVector vector;
+
+    FixedSizeBinaryAccessor(FixedSizeBinaryVector vector) {
+      super(vector);
+      this.vector = vector;
+    }
+
+    @Override
+    public byte[] getBinary(int rowId) {
+      return vector.get(rowId);
+    }
+  }
+
   private static class ArrayAccessor<DecimalT, Utf8StringT, ArrayT, 
ChildVectorT extends AutoCloseable>
           extends ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, 
ChildVectorT> {
 
diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java
index 5ed323f..e8d96b9 100644
--- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java
@@ -30,6 +30,7 @@ import org.apache.arrow.vector.FieldVector;
 import org.apache.arrow.vector.Float4Vector;
 import org.apache.arrow.vector.Float8Vector;
 import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.TimeMicroVector;
 import org.apache.arrow.vector.TimeStampMicroTZVector;
 import org.apache.arrow.vector.TimeStampMicroVector;
 import org.apache.arrow.vector.types.FloatingPointPrecision;
@@ -108,6 +109,8 @@ public class VectorizedArrowReader implements 
VectorizedReader<VectorHolder> {
     FLOAT,
     DOUBLE,
     TIMESTAMP_MILLIS,
+    TIME_MICROS,
+    UUID,
     DICTIONARY
   }
 
@@ -169,6 +172,9 @@ public class VectorizedArrowReader implements 
VectorizedReader<VectorHolder> {
           case TIMESTAMP_MILLIS:
             vectorizedColumnIterator.nextBatchTimestampMillis(vec, typeWidth, 
nullabilityHolder);
             break;
+          case UUID:
+            vectorizedColumnIterator.nextBatchFixedSizeBinary(vec, typeWidth, 
nullabilityHolder);
+            break;
         }
       }
     }
@@ -178,6 +184,7 @@ public class VectorizedArrowReader implements 
VectorizedReader<VectorHolder> {
         nullabilityHolder, icebergField.type());
   }
 
+  @SuppressWarnings("MethodLength")
   private void allocateFieldVector(boolean dictionaryEncodedVector) {
     if (dictionaryEncodedVector) {
       Field field = new Field(
@@ -240,6 +247,12 @@ public class VectorizedArrowReader implements 
VectorizedReader<VectorHolder> {
             this.readType = ReadType.LONG;
             this.typeWidth = (int) BigIntVector.TYPE_WIDTH;
             break;
+          case TIME_MICROS:
+            this.vec = arrowField.createVector(rootAlloc);
+            ((TimeMicroVector) vec).allocateNew(batchSize);
+            this.readType = ReadType.LONG;
+            this.typeWidth = 8;
+            break;
           case DECIMAL:
             this.vec = arrowField.createVector(rootAlloc);
             ((DecimalVector) vec).allocateNew(batchSize);
@@ -269,11 +282,17 @@ public class VectorizedArrowReader implements 
VectorizedReader<VectorHolder> {
       } else {
         switch (primitive.getPrimitiveTypeName()) {
           case FIXED_LEN_BYTE_ARRAY:
-            int len = ((Types.FixedType) icebergField.type()).length();
+            int len;
+            if (icebergField.type() instanceof Types.UUIDType) {
+              len = 16;
+              this.readType = ReadType.UUID;
+            } else {
+              len = ((Types.FixedType) icebergField.type()).length();
+              this.readType = ReadType.FIXED_WIDTH_BINARY;
+            }
             this.vec = arrowField.createVector(rootAlloc);
             vec.setInitialCapacity(batchSize * len);
             vec.allocateNew();
-            this.readType = ReadType.FIXED_WIDTH_BINARY;
             this.typeWidth = len;
             break;
           case BINARY:
diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedColumnIterator.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedColumnIterator.java
index d963c45..2483563 100644
--- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedColumnIterator.java
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedColumnIterator.java
@@ -175,6 +175,22 @@ public class VectorizedColumnIterator extends 
BaseColumnIterator {
     }
   }
 
+  public void nextBatchFixedSizeBinary(
+      FieldVector fieldVector,
+      int typeWidth,
+      NullabilityHolder nullabilityHolder) {
+    int rowsReadSoFar = 0;
+    while (rowsReadSoFar < batchSize && hasNext()) {
+      advance();
+      int rowsInThisBatch =
+          vectorizedPageIterator.nextBatchFixedSizeBinary(fieldVector, 
batchSize - rowsReadSoFar,
+              rowsReadSoFar, typeWidth, nullabilityHolder);
+      rowsReadSoFar += rowsInThisBatch;
+      this.triplesRead += rowsInThisBatch;
+      fieldVector.setValueCount(rowsReadSoFar);
+    }
+  }
+
   public void nextBatchVarWidthType(FieldVector fieldVector, NullabilityHolder 
nullabilityHolder) {
     int rowsReadSoFar = 0;
     while (rowsReadSoFar < batchSize && hasNext()) {
diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedDictionaryEncodedParquetValuesReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedDictionaryEncodedParquetValuesReader.java
index 74d9e15..e0f476b 100644
--- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedDictionaryEncodedParquetValuesReader.java
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedDictionaryEncodedParquetValuesReader.java
@@ -24,6 +24,7 @@ import org.apache.arrow.vector.BaseVariableWidthVector;
 import org.apache.arrow.vector.BitVectorHelper;
 import org.apache.arrow.vector.DecimalVector;
 import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.FixedSizeBinaryVector;
 import org.apache.arrow.vector.IntVector;
 import org.apache.iceberg.arrow.vectorized.NullabilityHolder;
 import org.apache.parquet.column.Dictionary;
@@ -410,4 +411,42 @@ public class 
VectorizedDictionaryEncodedParquetValuesReader extends BaseVectoriz
       currentCount -= num;
     }
   }
+
+  void readBatchOfDictionaryEncodedFixedSizeBinary(
+      FieldVector vector, int typeWidth, int startOffset,
+      int numValuesToRead, Dictionary dict,
+      NullabilityHolder nullabilityHolder) {
+    int left = numValuesToRead;
+    int idx = startOffset;
+    while (left > 0) {
+      if (this.currentCount == 0) {
+        this.readNextGroup();
+      }
+      int num = Math.min(left, this.currentCount);
+      switch (mode) {
+        case RLE:
+          for (int i = 0; i < num; i++) {
+            byte[] bytes = dict.decodeToBinary(currentValue).getBytesUnsafe();
+            byte[] vectorBytes = new byte[typeWidth];
+            System.arraycopy(bytes, 0, vectorBytes, 0, typeWidth);
+            ((FixedSizeBinaryVector) vector).set(idx, vectorBytes);
+            nullabilityHolder.setNotNull(idx);
+            idx++;
+          }
+          break;
+        case PACKED:
+          for (int i = 0; i < num; i++) {
+            byte[] decimalBytes = 
dict.decodeToBinary(packedValuesBuffer[packedValuesBufferIdx++]).getBytesUnsafe();
+            byte[] vectorBytes = new byte[typeWidth];
+            System.arraycopy(decimalBytes, 0, vectorBytes, 0, typeWidth);
+            ((FixedSizeBinaryVector) vector).set(idx, vectorBytes);
+            nullabilityHolder.setNotNull(idx);
+            idx++;
+          }
+          break;
+      }
+      left -= num;
+      currentCount -= num;
+    }
+  }
 }
diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedPageIterator.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedPageIterator.java
index 9876962..381edda 100644
--- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedPageIterator.java
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedPageIterator.java
@@ -369,6 +369,36 @@ public class VectorizedPageIterator extends 
BasePageIterator {
     return actualBatchSize;
   }
 
+  public int nextBatchFixedSizeBinary(
+      final FieldVector vector, final int expectedBatchSize, final int 
numValsInVector,
+      final int typeWidth, NullabilityHolder nullabilityHolder) {
+    final int actualBatchSize = getActualBatchSize(expectedBatchSize);
+    if (actualBatchSize <= 0) {
+      return 0;
+    }
+    if (dictionaryDecodeMode == DictionaryDecodeMode.EAGER) {
+      
vectorizedDefinitionLevelReader.readBatchOfDictionaryEncodedFixedSizeBinary(
+          vector,
+          numValsInVector,
+          typeWidth,
+          actualBatchSize,
+          nullabilityHolder,
+          dictionaryEncodedValuesReader,
+          dictionary);
+    } else {
+      vectorizedDefinitionLevelReader.readBatchOfFixedSizeBinary(
+          vector,
+          numValsInVector,
+          typeWidth,
+          actualBatchSize,
+          nullabilityHolder,
+          plainValuesReader);
+    }
+    triplesRead += actualBatchSize;
+    this.hasNext = triplesRead < triplesCount;
+    return actualBatchSize;
+  }
+
   /**
    * Method for reading a batch of variable width data type (ENUM, JSON, UTF8, 
BSON).
    */
diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetDefinitionLevelReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetDefinitionLevelReader.java
index d330f09..34a996e 100644
--- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetDefinitionLevelReader.java
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetDefinitionLevelReader.java
@@ -26,6 +26,7 @@ import org.apache.arrow.vector.BitVector;
 import org.apache.arrow.vector.BitVectorHelper;
 import org.apache.arrow.vector.DecimalVector;
 import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.FixedSizeBinaryVector;
 import org.apache.arrow.vector.IntVector;
 import org.apache.arrow.vector.VarBinaryVector;
 import org.apache.iceberg.arrow.vectorized.NullabilityHolder;
@@ -656,6 +657,50 @@ public final class VectorizedParquetDefinitionLevelReader 
extends BaseVectorized
     }
   }
 
+  public void readBatchOfFixedSizeBinary(
+      final FieldVector vector, final int startOffset,
+      final int typeWidth, final int numValsToRead, NullabilityHolder 
nullabilityHolder,
+      ValuesAsBytesReader valuesReader) {
+    int bufferIdx = startOffset;
+    int left = numValsToRead;
+    while (left > 0) {
+      if (this.currentCount == 0) {
+        this.readNextGroup();
+      }
+      int num = Math.min(left, this.currentCount);
+      byte[] byteArray = new byte[typeWidth];
+      switch (mode) {
+        case RLE:
+          if (currentValue == maxDefLevel) {
+            for (int i = 0; i < num; i++) {
+              valuesReader.getBuffer(typeWidth).get(byteArray, 0, typeWidth);
+              ((FixedSizeBinaryVector) vector).set(bufferIdx, byteArray);
+              nullabilityHolder.setNotNull(bufferIdx);
+              bufferIdx++;
+            }
+          } else {
+            setNulls(nullabilityHolder, bufferIdx, num, 
vector.getValidityBuffer());
+            bufferIdx += num;
+          }
+          break;
+        case PACKED:
+          for (int i = 0; i < num; ++i) {
+            if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
+              valuesReader.getBuffer(typeWidth).get(byteArray, 0, typeWidth);
+              ((FixedSizeBinaryVector) vector).set(bufferIdx, byteArray);
+              nullabilityHolder.setNotNull(bufferIdx);
+            } else {
+              setNull(nullabilityHolder, bufferIdx, 
vector.getValidityBuffer());
+            }
+            bufferIdx++;
+          }
+          break;
+      }
+      left -= num;
+      currentCount -= num;
+    }
+  }
+
   public void readBatchOfDictionaryEncodedFixedLengthDecimals(
       final FieldVector vector,
       final int startOffset,
@@ -701,6 +746,51 @@ public final class VectorizedParquetDefinitionLevelReader 
extends BaseVectorized
     }
   }
 
+  public void readBatchOfDictionaryEncodedFixedSizeBinary(
+      final FieldVector vector,
+      final int startOffset,
+      final int typeWidth,
+      final int numValsToRead,
+      NullabilityHolder nullabilityHolder,
+      VectorizedDictionaryEncodedParquetValuesReader 
dictionaryEncodedValuesReader,
+      Dictionary dict) {
+    int idx = startOffset;
+    int left = numValsToRead;
+    while (left > 0) {
+      if (this.currentCount == 0) {
+        this.readNextGroup();
+      }
+      int num = Math.min(left, this.currentCount);
+      switch (mode) {
+        case RLE:
+          if (currentValue == maxDefLevel) {
+            
dictionaryEncodedValuesReader.readBatchOfDictionaryEncodedFixedSizeBinary(vector,
 typeWidth, idx,
+                num, dict, nullabilityHolder);
+          } else {
+            setNulls(nullabilityHolder, idx, num, vector.getValidityBuffer());
+          }
+          idx += num;
+          break;
+        case PACKED:
+          for (int i = 0; i < num; i++) {
+            if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
+              byte[] bytes = 
dict.decodeToBinary(dictionaryEncodedValuesReader.readInteger()).getBytes();
+              byte[] vectorBytes = new byte[typeWidth];
+              System.arraycopy(bytes, 0, vectorBytes, 0, typeWidth);
+              ((FixedSizeBinaryVector) vector).set(idx, vectorBytes);
+              nullabilityHolder.setNotNull(idx);
+            } else {
+              setNull(nullabilityHolder, idx, vector.getValidityBuffer());
+            }
+            idx++;
+          }
+          break;
+      }
+      left -= num;
+      currentCount -= num;
+    }
+  }
+
   public void readBatchVarWidth(
       final FieldVector vector,
       final int startOffset,
diff --git a/arrow/src/test/java/org/apache/iceberg/arrow/ArrowSchemaUtilTest.java b/arrow/src/test/java/org/apache/iceberg/arrow/ArrowSchemaUtilTest.java
index 7440ed3..99d0e3e 100644
--- a/arrow/src/test/java/org/apache/iceberg/arrow/ArrowSchemaUtilTest.java
+++ b/arrow/src/test/java/org/apache/iceberg/arrow/ArrowSchemaUtilTest.java
@@ -56,6 +56,7 @@ public class ArrowSchemaUtilTest {
   private static final String STRUCT_FIELD = "st";
   private static final String LIST_FIELD = "lt";
   private static final String MAP_FIELD = "mt";
+  private static final String UUID_FIELD = "uu";
 
   @Test
   public void convertPrimitive() {
@@ -74,7 +75,8 @@ public class ArrowSchemaUtilTest {
         Types.NestedField.optional(12, LIST_FIELD, 
Types.ListType.ofOptional(13, Types.IntegerType.get())),
         Types.NestedField.required(14, MAP_FIELD, Types.MapType.ofOptional(15, 
16,
             StringType.get(), IntegerType.get())),
-        Types.NestedField.optional(17, FIXED_WIDTH_BINARY_FIELD, 
Types.FixedType.ofLength(10)));
+        Types.NestedField.optional(17, FIXED_WIDTH_BINARY_FIELD, 
Types.FixedType.ofLength(10)),
+        Types.NestedField.optional(18, UUID_FIELD, Types.UUIDType.get()));
 
     org.apache.arrow.vector.types.pojo.Schema arrow = 
ArrowSchemaUtil.convert(iceberg);
 
@@ -171,6 +173,10 @@ public class ArrowSchemaUtilTest {
         Assert.assertEquals(MAP_FIELD, field.getName());
         Assert.assertEquals(ArrowType.ArrowTypeID.Map, arrowType.getTypeID());
         break;
+      case UUID:
+        Assert.assertEquals(UUID_FIELD, field.getName());
+        Assert.assertEquals(ArrowType.FixedSizeBinary.TYPE_TYPE, 
arrowType.getTypeID());
+        break;
       default:
         throw new UnsupportedOperationException("Check not implemented for 
type: " + iceberg);
     }
diff --git a/arrow/src/test/java/org/apache/iceberg/arrow/vectorized/ArrowReaderTest.java b/arrow/src/test/java/org/apache/iceberg/arrow/vectorized/ArrowReaderTest.java
index 6829173..43f9df5 100644
--- a/arrow/src/test/java/org/apache/iceberg/arrow/vectorized/ArrowReaderTest.java
+++ b/arrow/src/test/java/org/apache/iceberg/arrow/vectorized/ArrowReaderTest.java
@@ -26,6 +26,7 @@ import java.nio.charset.StandardCharsets;
 import java.time.Instant;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
+import java.time.LocalTime;
 import java.time.OffsetDateTime;
 import java.time.ZoneOffset;
 import java.time.temporal.ChronoUnit;
@@ -34,6 +35,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.function.BiFunction;
 import java.util.stream.Collectors;
@@ -41,9 +43,11 @@ import org.apache.arrow.vector.BigIntVector;
 import org.apache.arrow.vector.BitVector;
 import org.apache.arrow.vector.DateDayVector;
 import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.FixedSizeBinaryVector;
 import org.apache.arrow.vector.Float4Vector;
 import org.apache.arrow.vector.Float8Vector;
 import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.TimeMicroVector;
 import org.apache.arrow.vector.TimeStampMicroTZVector;
 import org.apache.arrow.vector.TimeStampMicroVector;
 import org.apache.arrow.vector.VarBinaryVector;
@@ -74,6 +78,8 @@ import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
 import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.UUIDUtil;
+import org.assertj.core.api.Assertions;
 import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
@@ -112,7 +118,11 @@ public class ArrowReaderTest {
           "bytes_nullable",
           "date",
           "date_nullable",
-          "int_promotion"
+          "int_promotion",
+          "time",
+          "time_nullable",
+          "uuid",
+          "uuid_nullable"
       );
 
   @Rule
@@ -343,6 +353,7 @@ public class ArrowReaderTest {
     assertEquals(expectedTotalRows, totalRows);
   }
 
+  @SuppressWarnings("MethodLength")
   private void checkColumnarBatch(
       int expectedNumRows,
       List<GenericRecord> expectedRows,
@@ -364,6 +375,7 @@ public class ArrowReaderTest {
         (records, i) -> records.get(i).getField("timestamp"),
         (array, i) -> timestampFromMicros(array.getLong(i))
     );
+
     checkColumnarArrayValues(
         expectedNumRows, expectedRows, batch, 
columnNameToIndex.get("timestamp_nullable"),
         columnSet, "timestamp_nullable",
@@ -484,6 +496,33 @@ public class ArrowReaderTest {
         (records, i) -> records.get(i).getField("int_promotion"),
         ColumnVector::getInt
     );
+
+    checkColumnarArrayValues(
+        expectedNumRows, expectedRows, batch, columnNameToIndex.get("uuid"),
+        columnSet, "uuid",
+        (records, i) -> records.get(i).getField("uuid"),
+        ColumnVector::getBinary
+
+    );
+    checkColumnarArrayValues(
+        expectedNumRows, expectedRows, batch, 
columnNameToIndex.get("uuid_nullable"),
+        columnSet, "uuid_nullable",
+        (records, i) -> records.get(i).getField("uuid_nullable"),
+        ColumnVector::getBinary
+    );
+
+    checkColumnarArrayValues(
+        expectedNumRows, expectedRows, batch, columnNameToIndex.get("time"),
+        columnSet, "time",
+        (records, i) -> records.get(i).getField("time"),
+        (array, i) -> LocalTime.ofNanoOfDay(array.getLong(i) * 1000)
+    );
+    checkColumnarArrayValues(
+        expectedNumRows, expectedRows, batch, 
columnNameToIndex.get("time_nullable"),
+        columnSet, "time_nullable",
+        (records, i) -> records.get(i).getField("time_nullable"),
+        (array, i) -> LocalTime.ofNanoOfDay(array.getLong(i) * 1000)
+    );
   }
 
   private static void checkColumnarArrayValues(
@@ -500,7 +539,9 @@ public class ArrowReaderTest {
       for (int i = 0; i < expectedNumRows; i++) {
         Object expectedValue = expectedValueExtractor.apply(expectedRows, i);
         Object actualValue = vectorValueExtractor.apply(columnVector, i);
-        assertEquals("Row#" + i + " mismatches", expectedValue, actualValue);
+        // we need to use assertThat() here because it does a 
java.util.Objects.deepEquals() and that
+        // is relevant for byte[]
+        Assertions.assertThat(actualValue).as("Row#" + i + " 
mismatches").isEqualTo(expectedValue);
       }
     }
   }
@@ -539,7 +580,11 @@ public class ArrowReaderTest {
         Types.NestedField.optional(18, "bytes_nullable", 
Types.BinaryType.get()),
         Types.NestedField.required(19, "date", Types.DateType.get()),
         Types.NestedField.optional(20, "date_nullable", Types.DateType.get()),
-        Types.NestedField.required(21, "int_promotion", 
Types.IntegerType.get())
+        Types.NestedField.required(21, "int_promotion", 
Types.IntegerType.get()),
+        Types.NestedField.required(22, "time", Types.TimeType.get()),
+        Types.NestedField.optional(23, "time_nullable", Types.TimeType.get()),
+        Types.NestedField.required(24, "uuid", Types.UUIDType.get()),
+        Types.NestedField.optional(25, "uuid_nullable", Types.UUIDType.get())
     );
 
     PartitionSpec spec = PartitionSpec.builderFor(schema)
@@ -617,7 +662,15 @@ public class ArrowReaderTest {
         new Field(
             "date_nullable", new FieldType(true, MinorType.DATEDAY.getType(), 
null), null),
         new Field(
-            "int_promotion", new FieldType(false, MinorType.INT.getType(), 
null), null)
+            "int_promotion", new FieldType(false, MinorType.INT.getType(), 
null), null),
+        new Field(
+            "time", new FieldType(false, MinorType.TIMEMICRO.getType(), null), 
null),
+        new Field(
+            "time_nullable", new FieldType(true, 
MinorType.TIMEMICRO.getType(), null), null),
+        new Field(
+            "uuid", new FieldType(false, new ArrowType.FixedSizeBinary(16), 
null), null),
+        new Field(
+            "uuid_nullable", new FieldType(true, new 
ArrowType.FixedSizeBinary(16), null), null)
     );
     List<Field> filteredFields = allFields.stream()
         .filter(f -> columnSet.contains(f.getName()))
@@ -650,6 +703,12 @@ public class ArrowReaderTest {
       rec.setField("date", LocalDate.of(2020, 1, 1).plus(i, ChronoUnit.DAYS));
       rec.setField("date_nullable", LocalDate.of(2020, 1, 1).plus(i, 
ChronoUnit.DAYS));
       rec.setField("int_promotion", i);
+      rec.setField("time", LocalTime.of(11, i));
+      rec.setField("time_nullable", LocalTime.of(11, i));
+      ByteBuffer bb = UUIDUtil.convertToByteBuffer(UUID.randomUUID());
+      byte[] uuid = bb.array();
+      rec.setField("uuid", uuid);
+      rec.setField("uuid_nullable", uuid);
       records.add(rec);
     }
     return records;
@@ -680,6 +739,12 @@ public class ArrowReaderTest {
       rec.setField("date", LocalDate.of(2020, 1, 1));
       rec.setField("date_nullable", LocalDate.of(2020, 1, 1));
       rec.setField("int_promotion", 1);
+      rec.setField("time", LocalTime.of(11, 30));
+      rec.setField("time_nullable", LocalTime.of(11, 30));
+      ByteBuffer bb = 
UUIDUtil.convertToByteBuffer(UUID.fromString("abcd91cf-08d0-4223-b145-f64030b3077f"));
+      byte[] uuid = bb.array();
+      rec.setField("uuid", uuid);
+      rec.setField("uuid_nullable", uuid);
       records.add(rec);
     }
     return records;
@@ -753,6 +818,10 @@ public class ArrowReaderTest {
     assertEqualsForField(root, columnSet, "bytes_nullable", 
VarBinaryVector.class);
     assertEqualsForField(root, columnSet, "date", DateDayVector.class);
     assertEqualsForField(root, columnSet, "date_nullable", 
DateDayVector.class);
+    assertEqualsForField(root, columnSet, "time", TimeMicroVector.class);
+    assertEqualsForField(root, columnSet, "time_nullable", 
TimeMicroVector.class);
+    assertEqualsForField(root, columnSet, "uuid", FixedSizeBinaryVector.class);
+    assertEqualsForField(root, columnSet, "uuid_nullable", 
FixedSizeBinaryVector.class);
     assertEqualsForField(root, columnSet, "int_promotion", IntVector.class);
   }
 
@@ -875,6 +944,29 @@ public class ArrowReaderTest {
         (records, i) -> records.get(i).getField("int_promotion"),
         (vector, i) -> ((IntVector) vector).get(i)
     );
+
+    checkVectorValues(
+        expectedNumRows, expectedRows, root, columnSet, "uuid",
+        (records, i) -> records.get(i).getField("uuid"),
+        (vector, i) -> ((FixedSizeBinaryVector) vector).get(i)
+    );
+
+    checkVectorValues(
+        expectedNumRows, expectedRows, root, columnSet, "uuid_nullable",
+        (records, i) -> records.get(i).getField("uuid_nullable"),
+        (vector, i) -> ((FixedSizeBinaryVector) vector).get(i)
+    );
+
+    checkVectorValues(
+        expectedNumRows, expectedRows, root, columnSet, "time",
+        (records, i) -> records.get(i).getField("time"),
+        (vector, i) -> LocalTime.ofNanoOfDay(((TimeMicroVector) vector).get(i) 
* 1000)
+    );
+    checkVectorValues(
+        expectedNumRows, expectedRows, root, columnSet, "time_nullable",
+        (records, i) -> records.get(i).getField("time_nullable"),
+        (vector, i) -> LocalTime.ofNanoOfDay(((TimeMicroVector) vector).get(i) 
* 1000)
+    );
   }
 
   private static void checkVectorValues(
@@ -891,7 +983,9 @@ public class ArrowReaderTest {
       for (int i = 0; i < expectedNumRows; i++) {
         Object expectedValue = expectedValueExtractor.apply(expectedRows, i);
         Object actualValue = vectorValueExtractor.apply(vector, i);
-        assertEquals("Row#" + i + " mismatches", expectedValue, actualValue);
+        // we need to use assertThat() here because it does a 
java.util.Objects.deepEquals() and that
+        // is relevant for byte[]
+        Assertions.assertThat(actualValue).as("Row#" + i + " 
mismatches").isEqualTo(expectedValue);
       }
     }
   }

Reply via email to