This is an automated email from the ASF dual-hosted git repository.
rymurr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new aa65c06 Add support for TimeType / UUIDType (#2739)
aa65c06 is described below
commit aa65c064361feb37b4b0c1bea7e9531e879d8d71
Author: Eduard Tudenhöfner <[email protected]>
AuthorDate: Mon Jun 28 17:48:24 2021 +0200
Add support for TimeType / UUIDType (#2739)
---
.../org/apache/iceberg/arrow/ArrowSchemaUtil.java | 3 +
.../iceberg/arrow/vectorized/ArrowReader.java | 4 +-
.../GenericArrowVectorAccessorFactory.java | 39 ++++++++
.../arrow/vectorized/VectorizedArrowReader.java | 23 ++++-
.../parquet/VectorizedColumnIterator.java | 16 ++++
...orizedDictionaryEncodedParquetValuesReader.java | 39 ++++++++
.../vectorized/parquet/VectorizedPageIterator.java | 30 ++++++
.../VectorizedParquetDefinitionLevelReader.java | 90 ++++++++++++++++++
.../apache/iceberg/arrow/ArrowSchemaUtilTest.java | 8 +-
.../iceberg/arrow/vectorized/ArrowReaderTest.java | 104 ++++++++++++++++++++-
10 files changed, 346 insertions(+), 10 deletions(-)
diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/ArrowSchemaUtil.java
b/arrow/src/main/java/org/apache/iceberg/arrow/ArrowSchemaUtil.java
index 5bef417..fe7ecc9 100644
--- a/arrow/src/main/java/org/apache/iceberg/arrow/ArrowSchemaUtil.java
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/ArrowSchemaUtil.java
@@ -98,6 +98,9 @@ public class ArrowSchemaUtil {
case TIME:
arrowType = new ArrowType.Time(TimeUnit.MICROSECOND, Long.SIZE);
break;
+ case UUID:
+ arrowType = new ArrowType.FixedSizeBinary(16);
+ break;
case TIMESTAMP:
arrowType = new ArrowType.Timestamp(TimeUnit.MICROSECOND,
((Types.TimestampType) field.type()).shouldAdjustToUTC() ? "UTC" :
null);
diff --git
a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowReader.java
b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowReader.java
index 503a273..2386ce5 100644
--- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowReader.java
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowReader.java
@@ -80,8 +80,8 @@ import org.apache.parquet.schema.MessageType;
* <li>Columns with constant values are physically encoded as a
dictionary. The Arrow vector
* type is int32 instead of the type as per the schema.
* See https://github.com/apache/iceberg/issues/2484.</li>
- * <li>Data types: {@link Types.TimeType}, {@link Types.ListType}, {@link
Types.MapType},
- * {@link Types.StructType}, {@link Types.UUIDType}, {@link
Types.FixedType} and
+ * <li>Data types: {@link Types.ListType}, {@link Types.MapType},
+ * {@link Types.StructType}, {@link Types.FixedType} and
* {@link Types.DecimalType}
* See https://github.com/apache/iceberg/issues/2485 and
https://github.com/apache/iceberg/issues/2486.</li>
* <li>Iceberg v2 spec is not supported.
diff --git
a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/GenericArrowVectorAccessorFactory.java
b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/GenericArrowVectorAccessorFactory.java
index df4e216..77157ce 100644
---
a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/GenericArrowVectorAccessorFactory.java
+++
b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/GenericArrowVectorAccessorFactory.java
@@ -30,9 +30,11 @@ import org.apache.arrow.vector.BitVector;
import org.apache.arrow.vector.DateDayVector;
import org.apache.arrow.vector.DecimalVector;
import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.FixedSizeBinaryVector;
import org.apache.arrow.vector.Float4Vector;
import org.apache.arrow.vector.Float8Vector;
import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.TimeMicroVector;
import org.apache.arrow.vector.TimeStampMicroTZVector;
import org.apache.arrow.vector.TimeStampMicroVector;
import org.apache.arrow.vector.ValueVector;
@@ -112,6 +114,7 @@ public class GenericArrowVectorAccessorFactory<DecimalT,
Utf8StringT, ArrayT, Ch
case BSON:
return new DictionaryStringAccessor<>((IntVector) vector,
dictionary, stringFactorySupplier.get());
case INT_64:
+ case TIME_MICROS:
case TIMESTAMP_MILLIS:
case TIMESTAMP_MICROS:
return new DictionaryLongAccessor<>((IntVector) vector, dictionary);
@@ -189,6 +192,10 @@ public class GenericArrowVectorAccessorFactory<DecimalT,
Utf8StringT, ArrayT, Ch
} else if (vector instanceof StructVector) {
StructVector structVector = (StructVector) vector;
return new StructAccessor<>(structVector,
structChildFactorySupplier.get());
+ } else if (vector instanceof TimeMicroVector) {
+ return new TimeMicroAccessor<>((TimeMicroVector) vector);
+ } else if (vector instanceof FixedSizeBinaryVector) {
+ return new FixedSizeBinaryAccessor<>((FixedSizeBinaryVector) vector);
}
throw new UnsupportedOperationException("Unsupported vector: " +
vector.getClass());
}
@@ -469,6 +476,38 @@ public class GenericArrowVectorAccessorFactory<DecimalT,
Utf8StringT, ArrayT, Ch
}
}
+ private static class TimeMicroAccessor<DecimalT, Utf8StringT, ArrayT,
ChildVectorT extends AutoCloseable>
+ extends ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT>
{ // Accessor for TimeMicroVector columns; getLong is the only read method overridden here.
+
+ private final TimeMicroVector vector; // same vector that is passed to super, kept in its concrete type so get(int) can be called
+
+ TimeMicroAccessor(TimeMicroVector vector) {
+ super(vector);
+ this.vector = vector;
+ }
+
+ @Override
+ public final long getLong(int rowId) { // returns the stored long for rowId unchanged; presumably microseconds since midnight given the vector type - confirm
+ return vector.get(rowId);
+ }
+ }
+
+ private static class FixedSizeBinaryAccessor<DecimalT, Utf8StringT, ArrayT,
ChildVectorT extends AutoCloseable>
+ extends ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT>
{ // Accessor for FixedSizeBinaryVector columns; getBinary is the only read method overridden here.
+
+ private final FixedSizeBinaryVector vector; // same vector that is passed to super, kept in its concrete type so get(int) can be called
+
+ FixedSizeBinaryAccessor(FixedSizeBinaryVector vector) {
+ super(vector);
+ this.vector = vector;
+ }
+
+ @Override
+ public byte[] getBinary(int rowId) { // returns whatever byte[] the vector yields for rowId; no length check or conversion happens here
+ return vector.get(rowId);
+ }
+ }
+
private static class ArrayAccessor<DecimalT, Utf8StringT, ArrayT,
ChildVectorT extends AutoCloseable>
extends ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT,
ChildVectorT> {
diff --git
a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java
b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java
index 5ed323f..e8d96b9 100644
---
a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java
+++
b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java
@@ -30,6 +30,7 @@ import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.Float4Vector;
import org.apache.arrow.vector.Float8Vector;
import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.TimeMicroVector;
import org.apache.arrow.vector.TimeStampMicroTZVector;
import org.apache.arrow.vector.TimeStampMicroVector;
import org.apache.arrow.vector.types.FloatingPointPrecision;
@@ -108,6 +109,8 @@ public class VectorizedArrowReader implements
VectorizedReader<VectorHolder> {
FLOAT,
DOUBLE,
TIMESTAMP_MILLIS,
+ TIME_MICROS,
+ UUID,
DICTIONARY
}
@@ -169,6 +172,9 @@ public class VectorizedArrowReader implements
VectorizedReader<VectorHolder> {
case TIMESTAMP_MILLIS:
vectorizedColumnIterator.nextBatchTimestampMillis(vec, typeWidth,
nullabilityHolder);
break;
+ case UUID:
+ vectorizedColumnIterator.nextBatchFixedSizeBinary(vec, typeWidth,
nullabilityHolder);
+ break;
}
}
}
@@ -178,6 +184,7 @@ public class VectorizedArrowReader implements
VectorizedReader<VectorHolder> {
nullabilityHolder, icebergField.type());
}
+ @SuppressWarnings("MethodLength")
private void allocateFieldVector(boolean dictionaryEncodedVector) {
if (dictionaryEncodedVector) {
Field field = new Field(
@@ -240,6 +247,12 @@ public class VectorizedArrowReader implements
VectorizedReader<VectorHolder> {
this.readType = ReadType.LONG;
this.typeWidth = (int) BigIntVector.TYPE_WIDTH;
break;
+ case TIME_MICROS:
+ this.vec = arrowField.createVector(rootAlloc);
+ ((TimeMicroVector) vec).allocateNew(batchSize);
+ this.readType = ReadType.LONG;
+ this.typeWidth = 8;
+ break;
case DECIMAL:
this.vec = arrowField.createVector(rootAlloc);
((DecimalVector) vec).allocateNew(batchSize);
@@ -269,11 +282,17 @@ public class VectorizedArrowReader implements
VectorizedReader<VectorHolder> {
} else {
switch (primitive.getPrimitiveTypeName()) {
case FIXED_LEN_BYTE_ARRAY:
- int len = ((Types.FixedType) icebergField.type()).length();
+ int len;
+ if (icebergField.type() instanceof Types.UUIDType) {
+ len = 16;
+ this.readType = ReadType.UUID;
+ } else {
+ len = ((Types.FixedType) icebergField.type()).length();
+ this.readType = ReadType.FIXED_WIDTH_BINARY;
+ }
this.vec = arrowField.createVector(rootAlloc);
vec.setInitialCapacity(batchSize * len);
vec.allocateNew();
- this.readType = ReadType.FIXED_WIDTH_BINARY;
this.typeWidth = len;
break;
case BINARY:
diff --git
a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedColumnIterator.java
b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedColumnIterator.java
index d963c45..2483563 100644
---
a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedColumnIterator.java
+++
b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedColumnIterator.java
@@ -175,6 +175,22 @@ public class VectorizedColumnIterator extends
BaseColumnIterator {
}
}
+ public void nextBatchFixedSizeBinary( // Fills fieldVector with up to batchSize fixed-size binary values, reading page by page.
+ FieldVector fieldVector,
+ int typeWidth, // forwarded unchanged to the page iterator
+ NullabilityHolder nullabilityHolder) {
+ int rowsReadSoFar = 0;
+ while (rowsReadSoFar < batchSize && hasNext()) { // stop once the batch is full or hasNext() reports no more data
+ advance();
+ int rowsInThisBatch =
+ vectorizedPageIterator.nextBatchFixedSizeBinary(fieldVector,
batchSize - rowsReadSoFar,
+ rowsReadSoFar, typeWidth, nullabilityHolder); // request only the rows still missing, written starting at offset rowsReadSoFar
+ rowsReadSoFar += rowsInThisBatch;
+ this.triplesRead += rowsInThisBatch;
+ fieldVector.setValueCount(rowsReadSoFar); // value count is refreshed after every page read, not just once at the end
+ }
+ }
+
public void nextBatchVarWidthType(FieldVector fieldVector, NullabilityHolder
nullabilityHolder) {
int rowsReadSoFar = 0;
while (rowsReadSoFar < batchSize && hasNext()) {
diff --git
a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedDictionaryEncodedParquetValuesReader.java
b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedDictionaryEncodedParquetValuesReader.java
index 74d9e15..e0f476b 100644
---
a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedDictionaryEncodedParquetValuesReader.java
+++
b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedDictionaryEncodedParquetValuesReader.java
@@ -24,6 +24,7 @@ import org.apache.arrow.vector.BaseVariableWidthVector;
import org.apache.arrow.vector.BitVectorHelper;
import org.apache.arrow.vector.DecimalVector;
import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.FixedSizeBinaryVector;
import org.apache.arrow.vector.IntVector;
import org.apache.iceberg.arrow.vectorized.NullabilityHolder;
import org.apache.parquet.column.Dictionary;
@@ -410,4 +411,42 @@ public class
VectorizedDictionaryEncodedParquetValuesReader extends BaseVectoriz
currentCount -= num;
}
}
+
+ void readBatchOfDictionaryEncodedFixedSizeBinary( // Resolves numValuesToRead dictionary ids through dict and writes the bytes into vector starting at startOffset.
+ FieldVector vector, int typeWidth, int startOffset,
+ int numValuesToRead, Dictionary dict,
+ NullabilityHolder nullabilityHolder) {
+ int left = numValuesToRead;
+ int idx = startOffset;
+ while (left > 0) {
+ if (this.currentCount == 0) { // current group is used up; load the next one
+ this.readNextGroup();
+ }
+ int num = Math.min(left, this.currentCount); // never consume more than the current group holds
+ switch (mode) {
+ case RLE: // a single dictionary id (currentValue) applies to all num rows
+ for (int i = 0; i < num; i++) {
+ byte[] bytes = dict.decodeToBinary(currentValue).getBytesUnsafe(); // NOTE(review): currentValue is not modified inside this loop, so this lookup looks hoistable
+ byte[] vectorBytes = new byte[typeWidth];
+ System.arraycopy(bytes, 0, vectorBytes, 0, typeWidth); // copies exactly the first typeWidth bytes of the dictionary entry
+ ((FixedSizeBinaryVector) vector).set(idx, vectorBytes);
+ nullabilityHolder.setNotNull(idx);
+ idx++;
+ }
+ break;
+ case PACKED: // every row carries its own dictionary id in packedValuesBuffer
+ for (int i = 0; i < num; i++) {
+ byte[] decimalBytes =
dict.decodeToBinary(packedValuesBuffer[packedValuesBufferIdx++]).getBytesUnsafe(); // NOTE(review): named decimalBytes, but holds the fixed-size binary dictionary entry
+ byte[] vectorBytes = new byte[typeWidth];
+ System.arraycopy(decimalBytes, 0, vectorBytes, 0, typeWidth); // copies exactly the first typeWidth bytes of the dictionary entry
+ ((FixedSizeBinaryVector) vector).set(idx, vectorBytes);
+ nullabilityHolder.setNotNull(idx);
+ idx++;
+ }
+ break;
+ }
+ left -= num;
+ currentCount -= num;
+ }
+ }
}
diff --git
a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedPageIterator.java
b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedPageIterator.java
index 9876962..381edda 100644
---
a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedPageIterator.java
+++
b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedPageIterator.java
@@ -369,6 +369,36 @@ public class VectorizedPageIterator extends
BasePageIterator {
return actualBatchSize;
}
+ public int nextBatchFixedSizeBinary( // Reads one batch of fixed-size binary values from the current page into vector and returns the number of rows read.
+ final FieldVector vector, final int expectedBatchSize, final int
numValsInVector, // handed to the definition-level reader in the position of its start offset
+ final int typeWidth, NullabilityHolder nullabilityHolder) {
+ final int actualBatchSize = getActualBatchSize(expectedBatchSize);
+ if (actualBatchSize <= 0) { // nothing to read: return 0 without touching the vector or the counters
+ return 0;
+ }
+ if (dictionaryDecodeMode == DictionaryDecodeMode.EAGER) { // EAGER: values are resolved through the dictionary while reading
+
vectorizedDefinitionLevelReader.readBatchOfDictionaryEncodedFixedSizeBinary(
+ vector,
+ numValsInVector,
+ typeWidth,
+ actualBatchSize,
+ nullabilityHolder,
+ dictionaryEncodedValuesReader,
+ dictionary);
+ } else { // every other decode mode reads the bytes from plainValuesReader
+ vectorizedDefinitionLevelReader.readBatchOfFixedSizeBinary(
+ vector,
+ numValsInVector,
+ typeWidth,
+ actualBatchSize,
+ nullabilityHolder,
+ plainValuesReader);
+ }
+ triplesRead += actualBatchSize;
+ this.hasNext = triplesRead < triplesCount; // more data remains while fewer than triplesCount triples have been consumed
+ return actualBatchSize;
+ }
+
/**
* Method for reading a batch of variable width data type (ENUM, JSON, UTF8,
BSON).
*/
diff --git
a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetDefinitionLevelReader.java
b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetDefinitionLevelReader.java
index d330f09..34a996e 100644
---
a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetDefinitionLevelReader.java
+++
b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetDefinitionLevelReader.java
@@ -26,6 +26,7 @@ import org.apache.arrow.vector.BitVector;
import org.apache.arrow.vector.BitVectorHelper;
import org.apache.arrow.vector.DecimalVector;
import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.FixedSizeBinaryVector;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.VarBinaryVector;
import org.apache.iceberg.arrow.vectorized.NullabilityHolder;
@@ -656,6 +657,50 @@ public final class VectorizedParquetDefinitionLevelReader
extends BaseVectorized
}
}
+ public void readBatchOfFixedSizeBinary( // Reads numValsToRead slots into vector from startOffset: typeWidth bytes from valuesReader where the definition level equals maxDefLevel, null otherwise.
+ final FieldVector vector, final int startOffset,
+ final int typeWidth, final int numValsToRead, NullabilityHolder
nullabilityHolder,
+ ValuesAsBytesReader valuesReader) {
+ int bufferIdx = startOffset;
+ int left = numValsToRead;
+ while (left > 0) {
+ if (this.currentCount == 0) { // current group of definition levels is used up; load the next one
+ this.readNextGroup();
+ }
+ int num = Math.min(left, this.currentCount); // never consume more than the current group holds
+ byte[] byteArray = new byte[typeWidth]; // scratch buffer reused for every value in this group; assumes FixedSizeBinaryVector.set copies the bytes - TODO confirm
+ switch (mode) {
+ case RLE: // one definition level (currentValue) applies to all num rows
+ if (currentValue == maxDefLevel) { // run of defined values
+ for (int i = 0; i < num; i++) {
+ valuesReader.getBuffer(typeWidth).get(byteArray, 0, typeWidth);
+ ((FixedSizeBinaryVector) vector).set(bufferIdx, byteArray);
+ nullabilityHolder.setNotNull(bufferIdx);
+ bufferIdx++;
+ }
+ } else { // run of nulls: mark all num slots null without reading from valuesReader
+ setNulls(nullabilityHolder, bufferIdx, num,
vector.getValidityBuffer());
+ bufferIdx += num;
+ }
+ break;
+ case PACKED: // every row carries its own definition level in packedValuesBuffer
+ for (int i = 0; i < num; ++i) {
+ if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
+ valuesReader.getBuffer(typeWidth).get(byteArray, 0, typeWidth);
+ ((FixedSizeBinaryVector) vector).set(bufferIdx, byteArray);
+ nullabilityHolder.setNotNull(bufferIdx);
+ } else {
+ setNull(nullabilityHolder, bufferIdx,
vector.getValidityBuffer());
+ }
+ bufferIdx++; // the slot index advances for values and nulls alike
+ }
+ break;
+ }
+ left -= num;
+ currentCount -= num;
+ }
+ }
+
public void readBatchOfDictionaryEncodedFixedLengthDecimals(
final FieldVector vector,
final int startOffset,
@@ -701,6 +746,51 @@ public final class VectorizedParquetDefinitionLevelReader
extends BaseVectorized
}
}
+ public void readBatchOfDictionaryEncodedFixedSizeBinary( // Reads numValsToRead slots into vector from startOffset: dictionary-decoded bytes where the definition level equals maxDefLevel, null otherwise.
+ final FieldVector vector,
+ final int startOffset,
+ final int typeWidth,
+ final int numValsToRead,
+ NullabilityHolder nullabilityHolder,
+ VectorizedDictionaryEncodedParquetValuesReader
dictionaryEncodedValuesReader,
+ Dictionary dict) {
+ int idx = startOffset;
+ int left = numValsToRead;
+ while (left > 0) {
+ if (this.currentCount == 0) { // current group of definition levels is used up; load the next one
+ this.readNextGroup();
+ }
+ int num = Math.min(left, this.currentCount); // never consume more than the current group holds
+ switch (mode) {
+ case RLE: // one definition level (currentValue) applies to all num rows
+ if (currentValue == maxDefLevel) { // run of defined values: the dictionary values reader decodes all num of them
+
dictionaryEncodedValuesReader.readBatchOfDictionaryEncodedFixedSizeBinary(vector,
typeWidth, idx,
+ num, dict, nullabilityHolder);
+ } else { // run of nulls: mark all num slots null without consuming dictionary ids
+ setNulls(nullabilityHolder, idx, num, vector.getValidityBuffer());
+ }
+ idx += num; // advance past the run whether it held values or nulls
+ break;
+ case PACKED: // every row carries its own definition level in packedValuesBuffer
+ for (int i = 0; i < num; i++) {
+ if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
+ byte[] bytes =
dict.decodeToBinary(dictionaryEncodedValuesReader.readInteger()).getBytes(); // NOTE(review): uses getBytes() here, while the dictionary values reader's own batch method calls getBytesUnsafe()
+ byte[] vectorBytes = new byte[typeWidth];
+ System.arraycopy(bytes, 0, vectorBytes, 0, typeWidth); // copies exactly the first typeWidth bytes of the dictionary entry
+ ((FixedSizeBinaryVector) vector).set(idx, vectorBytes);
+ nullabilityHolder.setNotNull(idx);
+ } else {
+ setNull(nullabilityHolder, idx, vector.getValidityBuffer());
+ }
+ idx++; // the slot index advances for values and nulls alike
+ }
+ break;
+ }
+ left -= num;
+ currentCount -= num;
+ }
+ }
+
public void readBatchVarWidth(
final FieldVector vector,
final int startOffset,
diff --git
a/arrow/src/test/java/org/apache/iceberg/arrow/ArrowSchemaUtilTest.java
b/arrow/src/test/java/org/apache/iceberg/arrow/ArrowSchemaUtilTest.java
index 7440ed3..99d0e3e 100644
--- a/arrow/src/test/java/org/apache/iceberg/arrow/ArrowSchemaUtilTest.java
+++ b/arrow/src/test/java/org/apache/iceberg/arrow/ArrowSchemaUtilTest.java
@@ -56,6 +56,7 @@ public class ArrowSchemaUtilTest {
private static final String STRUCT_FIELD = "st";
private static final String LIST_FIELD = "lt";
private static final String MAP_FIELD = "mt";
+ private static final String UUID_FIELD = "uu";
@Test
public void convertPrimitive() {
@@ -74,7 +75,8 @@ public class ArrowSchemaUtilTest {
Types.NestedField.optional(12, LIST_FIELD,
Types.ListType.ofOptional(13, Types.IntegerType.get())),
Types.NestedField.required(14, MAP_FIELD, Types.MapType.ofOptional(15,
16,
StringType.get(), IntegerType.get())),
- Types.NestedField.optional(17, FIXED_WIDTH_BINARY_FIELD,
Types.FixedType.ofLength(10)));
+ Types.NestedField.optional(17, FIXED_WIDTH_BINARY_FIELD,
Types.FixedType.ofLength(10)),
+ Types.NestedField.optional(18, UUID_FIELD, Types.UUIDType.get()));
org.apache.arrow.vector.types.pojo.Schema arrow =
ArrowSchemaUtil.convert(iceberg);
@@ -171,6 +173,10 @@ public class ArrowSchemaUtilTest {
Assert.assertEquals(MAP_FIELD, field.getName());
Assert.assertEquals(ArrowType.ArrowTypeID.Map, arrowType.getTypeID());
break;
+ case UUID:
+ Assert.assertEquals(UUID_FIELD, field.getName());
+ Assert.assertEquals(ArrowType.FixedSizeBinary.TYPE_TYPE,
arrowType.getTypeID());
+ break;
default:
throw new UnsupportedOperationException("Check not implemented for
type: " + iceberg);
}
diff --git
a/arrow/src/test/java/org/apache/iceberg/arrow/vectorized/ArrowReaderTest.java
b/arrow/src/test/java/org/apache/iceberg/arrow/vectorized/ArrowReaderTest.java
index 6829173..43f9df5 100644
---
a/arrow/src/test/java/org/apache/iceberg/arrow/vectorized/ArrowReaderTest.java
+++
b/arrow/src/test/java/org/apache/iceberg/arrow/vectorized/ArrowReaderTest.java
@@ -26,6 +26,7 @@ import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
+import java.time.LocalTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;
@@ -34,6 +35,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
@@ -41,9 +43,11 @@ import org.apache.arrow.vector.BigIntVector;
import org.apache.arrow.vector.BitVector;
import org.apache.arrow.vector.DateDayVector;
import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.FixedSizeBinaryVector;
import org.apache.arrow.vector.Float4Vector;
import org.apache.arrow.vector.Float8Vector;
import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.TimeMicroVector;
import org.apache.arrow.vector.TimeStampMicroTZVector;
import org.apache.arrow.vector.TimeStampMicroVector;
import org.apache.arrow.vector.VarBinaryVector;
@@ -74,6 +78,8 @@ import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.UUIDUtil;
+import org.assertj.core.api.Assertions;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
@@ -112,7 +118,11 @@ public class ArrowReaderTest {
"bytes_nullable",
"date",
"date_nullable",
- "int_promotion"
+ "int_promotion",
+ "time",
+ "time_nullable",
+ "uuid",
+ "uuid_nullable"
);
@Rule
@@ -343,6 +353,7 @@ public class ArrowReaderTest {
assertEquals(expectedTotalRows, totalRows);
}
+ @SuppressWarnings("MethodLength")
private void checkColumnarBatch(
int expectedNumRows,
List<GenericRecord> expectedRows,
@@ -364,6 +375,7 @@ public class ArrowReaderTest {
(records, i) -> records.get(i).getField("timestamp"),
(array, i) -> timestampFromMicros(array.getLong(i))
);
+
checkColumnarArrayValues(
expectedNumRows, expectedRows, batch,
columnNameToIndex.get("timestamp_nullable"),
columnSet, "timestamp_nullable",
@@ -484,6 +496,33 @@ public class ArrowReaderTest {
(records, i) -> records.get(i).getField("int_promotion"),
ColumnVector::getInt
);
+
+ checkColumnarArrayValues(
+ expectedNumRows, expectedRows, batch, columnNameToIndex.get("uuid"),
+ columnSet, "uuid",
+ (records, i) -> records.get(i).getField("uuid"),
+ ColumnVector::getBinary
+
+ );
+ checkColumnarArrayValues(
+ expectedNumRows, expectedRows, batch,
columnNameToIndex.get("uuid_nullable"),
+ columnSet, "uuid_nullable",
+ (records, i) -> records.get(i).getField("uuid_nullable"),
+ ColumnVector::getBinary
+ );
+
+ checkColumnarArrayValues(
+ expectedNumRows, expectedRows, batch, columnNameToIndex.get("time"),
+ columnSet, "time",
+ (records, i) -> records.get(i).getField("time"),
+ (array, i) -> LocalTime.ofNanoOfDay(array.getLong(i) * 1000)
+ );
+ checkColumnarArrayValues(
+ expectedNumRows, expectedRows, batch,
columnNameToIndex.get("time_nullable"),
+ columnSet, "time_nullable",
+ (records, i) -> records.get(i).getField("time_nullable"),
+ (array, i) -> LocalTime.ofNanoOfDay(array.getLong(i) * 1000)
+ );
}
private static void checkColumnarArrayValues(
@@ -500,7 +539,9 @@ public class ArrowReaderTest {
for (int i = 0; i < expectedNumRows; i++) {
Object expectedValue = expectedValueExtractor.apply(expectedRows, i);
Object actualValue = vectorValueExtractor.apply(columnVector, i);
- assertEquals("Row#" + i + " mismatches", expectedValue, actualValue);
+ // we need to use assertThat() here because it does a
java.util.Objects.deepEquals() and that
+ // is relevant for byte[]
+ Assertions.assertThat(actualValue).as("Row#" + i + "
mismatches").isEqualTo(expectedValue);
}
}
}
@@ -539,7 +580,11 @@ public class ArrowReaderTest {
Types.NestedField.optional(18, "bytes_nullable",
Types.BinaryType.get()),
Types.NestedField.required(19, "date", Types.DateType.get()),
Types.NestedField.optional(20, "date_nullable", Types.DateType.get()),
- Types.NestedField.required(21, "int_promotion",
Types.IntegerType.get())
+ Types.NestedField.required(21, "int_promotion",
Types.IntegerType.get()),
+ Types.NestedField.required(22, "time", Types.TimeType.get()),
+ Types.NestedField.optional(23, "time_nullable", Types.TimeType.get()),
+ Types.NestedField.required(24, "uuid", Types.UUIDType.get()),
+ Types.NestedField.optional(25, "uuid_nullable", Types.UUIDType.get())
);
PartitionSpec spec = PartitionSpec.builderFor(schema)
@@ -617,7 +662,15 @@ public class ArrowReaderTest {
new Field(
"date_nullable", new FieldType(true, MinorType.DATEDAY.getType(),
null), null),
new Field(
- "int_promotion", new FieldType(false, MinorType.INT.getType(),
null), null)
+ "int_promotion", new FieldType(false, MinorType.INT.getType(),
null), null),
+ new Field(
+ "time", new FieldType(false, MinorType.TIMEMICRO.getType(), null),
null),
+ new Field(
+ "time_nullable", new FieldType(true,
MinorType.TIMEMICRO.getType(), null), null),
+ new Field(
+ "uuid", new FieldType(false, new ArrowType.FixedSizeBinary(16),
null), null),
+ new Field(
+ "uuid_nullable", new FieldType(true, new
ArrowType.FixedSizeBinary(16), null), null)
);
List<Field> filteredFields = allFields.stream()
.filter(f -> columnSet.contains(f.getName()))
@@ -650,6 +703,12 @@ public class ArrowReaderTest {
rec.setField("date", LocalDate.of(2020, 1, 1).plus(i, ChronoUnit.DAYS));
rec.setField("date_nullable", LocalDate.of(2020, 1, 1).plus(i,
ChronoUnit.DAYS));
rec.setField("int_promotion", i);
+ rec.setField("time", LocalTime.of(11, i));
+ rec.setField("time_nullable", LocalTime.of(11, i));
+ ByteBuffer bb = UUIDUtil.convertToByteBuffer(UUID.randomUUID());
+ byte[] uuid = bb.array();
+ rec.setField("uuid", uuid);
+ rec.setField("uuid_nullable", uuid);
records.add(rec);
}
return records;
@@ -680,6 +739,12 @@ public class ArrowReaderTest {
rec.setField("date", LocalDate.of(2020, 1, 1));
rec.setField("date_nullable", LocalDate.of(2020, 1, 1));
rec.setField("int_promotion", 1);
+ rec.setField("time", LocalTime.of(11, 30));
+ rec.setField("time_nullable", LocalTime.of(11, 30));
+ ByteBuffer bb =
UUIDUtil.convertToByteBuffer(UUID.fromString("abcd91cf-08d0-4223-b145-f64030b3077f"));
+ byte[] uuid = bb.array();
+ rec.setField("uuid", uuid);
+ rec.setField("uuid_nullable", uuid);
records.add(rec);
}
return records;
@@ -753,6 +818,10 @@ public class ArrowReaderTest {
assertEqualsForField(root, columnSet, "bytes_nullable",
VarBinaryVector.class);
assertEqualsForField(root, columnSet, "date", DateDayVector.class);
assertEqualsForField(root, columnSet, "date_nullable",
DateDayVector.class);
+ assertEqualsForField(root, columnSet, "time", TimeMicroVector.class);
+ assertEqualsForField(root, columnSet, "time_nullable",
TimeMicroVector.class);
+ assertEqualsForField(root, columnSet, "uuid", FixedSizeBinaryVector.class);
+ assertEqualsForField(root, columnSet, "uuid_nullable",
FixedSizeBinaryVector.class);
assertEqualsForField(root, columnSet, "int_promotion", IntVector.class);
}
@@ -875,6 +944,29 @@ public class ArrowReaderTest {
(records, i) -> records.get(i).getField("int_promotion"),
(vector, i) -> ((IntVector) vector).get(i)
);
+
+ checkVectorValues(
+ expectedNumRows, expectedRows, root, columnSet, "uuid",
+ (records, i) -> records.get(i).getField("uuid"),
+ (vector, i) -> ((FixedSizeBinaryVector) vector).get(i)
+ );
+
+ checkVectorValues(
+ expectedNumRows, expectedRows, root, columnSet, "uuid_nullable",
+ (records, i) -> records.get(i).getField("uuid_nullable"),
+ (vector, i) -> ((FixedSizeBinaryVector) vector).get(i)
+ );
+
+ checkVectorValues(
+ expectedNumRows, expectedRows, root, columnSet, "time",
+ (records, i) -> records.get(i).getField("time"),
+ (vector, i) -> LocalTime.ofNanoOfDay(((TimeMicroVector) vector).get(i)
* 1000)
+ );
+ checkVectorValues(
+ expectedNumRows, expectedRows, root, columnSet, "time_nullable",
+ (records, i) -> records.get(i).getField("time_nullable"),
+ (vector, i) -> LocalTime.ofNanoOfDay(((TimeMicroVector) vector).get(i)
* 1000)
+ );
}
private static void checkVectorValues(
@@ -891,7 +983,9 @@ public class ArrowReaderTest {
for (int i = 0; i < expectedNumRows; i++) {
Object expectedValue = expectedValueExtractor.apply(expectedRows, i);
Object actualValue = vectorValueExtractor.apply(vector, i);
- assertEquals("Row#" + i + " mismatches", expectedValue, actualValue);
+ // we need to use assertThat() here because it does a
java.util.Objects.deepEquals() and that
+ // is relevant for byte[]
+ Assertions.assertThat(actualValue).as("Row#" + i + "
mismatches").isEqualTo(expectedValue);
}
}
}