This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new c2a2fcdfe [parquet] Fix parquet timestamp read (#3662)
c2a2fcdfe is described below
commit c2a2fcdfe1faa1ccf4b63954a453e05e62f7e562
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Jul 3 12:09:59 2024 +0800
[parquet] Fix parquet timestamp read (#3662)
---
.../format/parquet/reader/ArrayColumnReader.java | 518 ---------------------
.../parquet/reader/BaseVectorizedColumnReader.java | 291 ------------
.../format/parquet/reader/MapColumnReader.java | 65 ---
.../reader/NestedPrimitiveColumnReader.java | 36 +-
.../parquet/reader/ParquetDataColumnReader.java | 17 +-
.../reader/ParquetDataColumnReaderFactory.java | 92 ++--
.../format/parquet/ParquetReadWriteTest.java | 24 +-
7 files changed, 97 insertions(+), 946 deletions(-)
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ArrayColumnReader.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ArrayColumnReader.java
deleted file mode 100644
index 3a602783a..000000000
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ArrayColumnReader.java
+++ /dev/null
@@ -1,518 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.format.parquet.reader;
-
-import org.apache.paimon.data.Timestamp;
-import org.apache.paimon.data.columnar.VectorizedColumnBatch;
-import org.apache.paimon.data.columnar.heap.HeapArrayVector;
-import org.apache.paimon.data.columnar.heap.HeapBooleanVector;
-import org.apache.paimon.data.columnar.heap.HeapByteVector;
-import org.apache.paimon.data.columnar.heap.HeapBytesVector;
-import org.apache.paimon.data.columnar.heap.HeapDoubleVector;
-import org.apache.paimon.data.columnar.heap.HeapFloatVector;
-import org.apache.paimon.data.columnar.heap.HeapIntVector;
-import org.apache.paimon.data.columnar.heap.HeapLongVector;
-import org.apache.paimon.data.columnar.heap.HeapShortVector;
-import org.apache.paimon.data.columnar.heap.HeapTimestampVector;
-import org.apache.paimon.data.columnar.writable.WritableColumnVector;
-import org.apache.paimon.types.ArrayType;
-import org.apache.paimon.types.DataType;
-import org.apache.paimon.types.LocalZonedTimestampType;
-import org.apache.paimon.types.TimestampType;
-
-import org.apache.parquet.column.ColumnDescriptor;
-import org.apache.parquet.column.page.PageReader;
-import org.apache.parquet.schema.PrimitiveType;
-import org.apache.parquet.schema.Type;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import static org.apache.paimon.types.DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE;
-
-/** Array {@link ColumnReader}. TODO Currently ARRAY type only support non
nested case. */
-public class ArrayColumnReader extends BaseVectorizedColumnReader {
-
- // The value read in last time
- private Object lastValue;
-
- // flag to indicate if there is no data in parquet data page
- private boolean eof = false;
-
- // flag to indicate if it's the first time to read parquet data page with
this instance
- boolean isFirstRow = true;
-
- public ArrayColumnReader(
- ColumnDescriptor descriptor,
- PageReader pageReader,
- boolean isUtcTimestamp,
- Type type,
- DataType dataType)
- throws IOException {
- super(descriptor, pageReader, isUtcTimestamp, type, dataType);
- }
-
- @Override
- public void readToVector(int readNumber, WritableColumnVector vector) {
- HeapArrayVector lcv = (HeapArrayVector) vector;
- // before readBatch, initial the size of offsets & lengths as the
default value,
- // the actual size will be assigned in setChildrenInfo() after reading
complete.
- lcv.setOffsets(new long[VectorizedColumnBatch.DEFAULT_SIZE]);
- lcv.setLengths(new long[VectorizedColumnBatch.DEFAULT_SIZE]);
-
- DataType elementType = ((ArrayType) dataType).getElementType();
-
- // read the first row in parquet data page, this will be only happened
once for this
- // instance
- if (isFirstRow) {
- if (!fetchNextValue(elementType)) {
- return;
- }
- isFirstRow = false;
- }
-
- // Because the length of ListColumnVector.child can't be known now,
- // the valueList will save all data for ListColumnVector temporary.
- List<Object> valueList = new ArrayList<>();
-
- int index = collectDataFromParquetPage(readNumber, lcv, valueList,
elementType);
- // Convert valueList to array for the ListColumnVector.child
- fillColumnVector(elementType, lcv, valueList, index);
- }
-
- /**
- * Reads a single value from parquet page, puts it into lastValue. Returns
a boolean indicating
- * if there is more values to read (true).
- *
- * @param type the element type of array
- * @return boolean
- */
- private boolean fetchNextValue(DataType type) {
- int left = readPageIfNeed();
- if (left > 0) {
- // get the values of repetition and definitionLevel
- readRepetitionAndDefinitionLevels();
- // read the data if it isn't null
- if (definitionLevel == maxDefLevel) {
- if (isCurrentPageDictionaryEncoded) {
- lastValue = dataColumn.readValueDictionaryId();
- } else {
- lastValue = readPrimitiveTypedRow(type);
- }
- } else {
- lastValue = null;
- }
- return true;
- } else {
- eof = true;
- return false;
- }
- }
-
- private int readPageIfNeed() {
- // Compute the number of values we want to read in this page.
- int leftInPage = (int) (endOfPageValueCount - valuesRead);
- if (leftInPage == 0) {
- // no data left in current page, load data from new page
- readPage();
- leftInPage = (int) (endOfPageValueCount - valuesRead);
- }
- return leftInPage;
- }
-
- // Need to be in consistent with that
VectorizedPrimitiveColumnReader#readBatchHelper
- // TODO Reduce the duplicated code
- private Object readPrimitiveTypedRow(DataType type) {
- switch (type.getTypeRoot()) {
- case CHAR:
- case VARCHAR:
- case BINARY:
- case VARBINARY:
- return dataColumn.readBytes();
- case BOOLEAN:
- return dataColumn.readBoolean();
- case TIME_WITHOUT_TIME_ZONE:
- case DATE:
- case INTEGER:
- return dataColumn.readInteger();
- case TINYINT:
- return dataColumn.readTinyInt();
- case SMALLINT:
- return dataColumn.readSmallInt();
- case BIGINT:
- return dataColumn.readLong();
- case FLOAT:
- return dataColumn.readFloat();
- case DOUBLE:
- return dataColumn.readDouble();
- case DECIMAL:
- switch (descriptor.getPrimitiveType().getPrimitiveTypeName()) {
- case INT32:
- return dataColumn.readInteger();
- case INT64:
- return dataColumn.readLong();
- case BINARY:
- case FIXED_LEN_BYTE_ARRAY:
- return dataColumn.readBytes();
- }
- case TIMESTAMP_WITHOUT_TIME_ZONE:
- case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
- int precision;
- if (type.getTypeRoot() == TIMESTAMP_WITHOUT_TIME_ZONE) {
- precision = ((TimestampType) type).getPrecision();
- } else {
- precision = ((LocalZonedTimestampType)
type).getPrecision();
- }
-
- if (precision <= 3) {
- return dataColumn.readMillsTimestamp();
- } else if (precision <= 6) {
- return dataColumn.readMicrosTimestamp();
- } else {
- throw new RuntimeException(
- "Unsupported precision of time type in the list: "
+ precision);
- }
- default:
- throw new RuntimeException("Unsupported type in the list: " +
type);
- }
- }
-
- private Object dictionaryDecodeValue(DataType type, Integer
dictionaryValue) {
- if (dictionaryValue == null) {
- return null;
- }
-
- switch (type.getTypeRoot()) {
- case CHAR:
- case VARCHAR:
- case BINARY:
- case VARBINARY:
- return dictionary.readBytes(dictionaryValue);
- case DATE:
- case TIME_WITHOUT_TIME_ZONE:
- case INTEGER:
- return dictionary.readInteger(dictionaryValue);
- case BOOLEAN:
- return dictionary.readBoolean(dictionaryValue) ? 1 : 0;
- case DOUBLE:
- return dictionary.readDouble(dictionaryValue);
- case FLOAT:
- return dictionary.readFloat(dictionaryValue);
- case TINYINT:
- return dictionary.readTinyInt(dictionaryValue);
- case SMALLINT:
- return dictionary.readSmallInt(dictionaryValue);
- case BIGINT:
- return dictionary.readLong(dictionaryValue);
- case DECIMAL:
- switch (descriptor.getPrimitiveType().getPrimitiveTypeName()) {
- case INT32:
- return dictionary.readInteger(dictionaryValue);
- case INT64:
- return dictionary.readLong(dictionaryValue);
- case FIXED_LEN_BYTE_ARRAY:
- case BINARY:
- return dictionary.readBytes(dictionaryValue);
- }
- case TIMESTAMP_WITHOUT_TIME_ZONE:
- case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
- return dictionary.readTimestamp(dictionaryValue);
- default:
- throw new RuntimeException("Unsupported type in the list: " +
type);
- }
- }
-
- /**
- * Collects data from a parquet page and returns the final row index where
it stopped. The
- * returned index can be equal to or less than total.
- *
- * @param total maximum number of rows to collect
- * @param lcv column vector to do initial setup in data collection time
- * @param valueList collection of values that will be fed into the vector
later
- * @param type the element type of array
- * @return int
- */
- private int collectDataFromParquetPage(
- int total, HeapArrayVector lcv, List<Object> valueList, DataType
type) {
- int index = 0;
- /*
- * Here is a nested loop for collecting all values from a parquet page.
- * A column of array type can be considered as a list of lists, so the
two loops are as below:
- * 1. The outer loop iterates on rows (index is a row index, so points
to a row in the batch), e.g.:
- * [0, 2, 3] <- index: 0
- * [NULL, 3, 4] <- index: 1
- *
- * 2. The inner loop iterates on values within a row (sets all data
from parquet data page
- * for an element in ListColumnVector), so fetchNextValue returns
values one-by-one:
- * 0, 2, 3, NULL, 3, 4
- *
- * As described below, the repetition level (repetitionLevel != 0)
- * can be used to decide when we'll start to read values for the next
list.
- */
- while (!eof && index < total) {
- // add element to ListColumnVector one by one
- lcv.getOffsets()[index] = valueList.size();
- /*
- * Let's collect all values for a single list.
- * Repetition level = 0 means that a new list started there in the
parquet page,
- * in that case, let's exit from the loop, and start to collect
value for a new list.
- */
- do {
- /*
- * Definition level = 0 when a NULL value was returned instead
of a list
- * (this is not the same as a NULL value in of a list).
- */
- if (definitionLevel == 0) {
- lcv.setNullAt(index);
- }
- valueList.add(
- isCurrentPageDictionaryEncoded
- ? dictionaryDecodeValue(type, (Integer)
lastValue)
- : lastValue);
- } while (fetchNextValue(type) && (repetitionLevel != 0));
-
- lcv.getLengths()[index] = valueList.size() -
lcv.getOffsets()[index];
- index++;
- }
- return index;
- }
-
- /**
- * The lengths & offsets will be initialized as default size (1024), it
should be set to the
- * actual size according to the element number.
- */
- private void setChildrenInfo(HeapArrayVector lcv, int itemNum, int
elementNum) {
- lcv.setSize(itemNum);
- long[] lcvLength = new long[elementNum];
- long[] lcvOffset = new long[elementNum];
- System.arraycopy(lcv.getLengths(), 0, lcvLength, 0, elementNum);
- System.arraycopy(lcv.getOffsets(), 0, lcvOffset, 0, elementNum);
- lcv.setLengths(lcvLength);
- lcv.setOffsets(lcvOffset);
- }
-
- private void fillColumnVector(
- DataType type, HeapArrayVector lcv, List valueList, int
elementNum) {
- int total = valueList.size();
- setChildrenInfo(lcv, total, elementNum);
- switch (type.getTypeRoot()) {
- case CHAR:
- case VARCHAR:
- case BINARY:
- case VARBINARY:
- HeapBytesVector bytesVector = new HeapBytesVector(total);
- bytesVector.reset();
- lcv.setChild(bytesVector);
- for (int i = 0; i < valueList.size(); i++) {
- byte[] src = (byte[]) valueList.get(i);
- if (src == null) {
- ((HeapBytesVector) lcv.getChild()).setNullAt(i);
- } else {
- ((HeapBytesVector) lcv.getChild()).appendBytes(i, src,
0, src.length);
- }
- }
- break;
- case BOOLEAN:
- HeapBooleanVector booleanVector = new HeapBooleanVector(total);
- booleanVector.reset();
- lcv.setChild(booleanVector);
- for (int i = 0; i < valueList.size(); i++) {
- if (valueList.get(i) == null) {
- ((HeapBooleanVector) lcv.getChild()).setNullAt(i);
- } else {
- ((HeapBooleanVector) lcv.getChild()).vector[i] =
(boolean) valueList.get(i);
- }
- }
- break;
- case TINYINT:
- HeapByteVector byteVector = new HeapByteVector(total);
- byteVector.reset();
- lcv.setChild(byteVector);
- for (int i = 0; i < valueList.size(); i++) {
- if (valueList.get(i) == null) {
- ((HeapByteVector) lcv.getChild()).setNullAt(i);
- } else {
- ((HeapByteVector) lcv.getChild()).vector[i] =
- ((List<Integer>) valueList).get(i).byteValue();
- }
- }
- break;
- case SMALLINT:
- HeapShortVector shortVector = new HeapShortVector(total);
- shortVector.reset();
- lcv.setChild(shortVector);
- for (int i = 0; i < valueList.size(); i++) {
- if (valueList.get(i) == null) {
- ((HeapShortVector) lcv.getChild()).setNullAt(i);
- } else {
- ((HeapShortVector) lcv.getChild()).vector[i] =
- ((List<Integer>)
valueList).get(i).shortValue();
- }
- }
- break;
- case INTEGER:
- case DATE:
- case TIME_WITHOUT_TIME_ZONE:
- HeapIntVector intVector = new HeapIntVector(total);
- intVector.reset();
- lcv.setChild(intVector);
- for (int i = 0; i < valueList.size(); i++) {
- if (valueList.get(i) == null) {
- ((HeapIntVector) lcv.getChild()).setNullAt(i);
- } else {
- ((HeapIntVector) lcv.getChild()).vector[i] =
- ((List<Integer>) valueList).get(i);
- }
- }
- break;
- case FLOAT:
- HeapFloatVector floatVector = new HeapFloatVector(total);
- floatVector.reset();
- lcv.setChild(floatVector);
- for (int i = 0; i < valueList.size(); i++) {
- if (valueList.get(i) == null) {
- ((HeapFloatVector) lcv.getChild()).setNullAt(i);
- } else {
- ((HeapFloatVector) lcv.getChild()).vector[i] =
- ((List<Float>) valueList).get(i);
- }
- }
- break;
- case BIGINT:
- HeapLongVector longVector = new HeapLongVector(total);
- longVector.reset();
- lcv.setChild(longVector);
- for (int i = 0; i < valueList.size(); i++) {
- if (valueList.get(i) == null) {
- ((HeapLongVector) lcv.getChild()).setNullAt(i);
- } else {
- ((HeapLongVector) lcv.getChild()).vector[i] =
- ((List<Long>) valueList).get(i);
- }
- }
- break;
- case DOUBLE:
- HeapDoubleVector doubleVector = new HeapDoubleVector(total);
- doubleVector.reset();
- lcv.setChild(doubleVector);
- for (int i = 0; i < valueList.size(); i++) {
- if (valueList.get(i) == null) {
- ((HeapDoubleVector) lcv.getChild()).setNullAt(i);
- } else {
- ((HeapDoubleVector) lcv.getChild()).vector[i] =
- ((List<Double>) valueList).get(i);
- }
- }
- break;
- case TIMESTAMP_WITHOUT_TIME_ZONE:
- case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
- if (descriptor.getPrimitiveType().getPrimitiveTypeName()
- == PrimitiveType.PrimitiveTypeName.INT64) {
- HeapTimestampVector heapTimestampVector = new
HeapTimestampVector(total);
- heapTimestampVector.reset();
- lcv.setChild(new
ParquetTimestampVector(heapTimestampVector));
- for (int i = 0; i < valueList.size(); i++) {
- if (valueList.get(i) == null) {
- ((HeapTimestampVector)
- ((ParquetTimestampVector)
lcv.getChild()).getVector())
- .setNullAt(i);
- } else {
- ((HeapTimestampVector)
- ((ParquetTimestampVector)
lcv.getChild()).getVector())
- .fill(((List<Timestamp>)
valueList).get(i));
- }
- }
- break;
- } else {
- HeapTimestampVector timestampVector = new
HeapTimestampVector(total);
- timestampVector.reset();
- lcv.setChild(timestampVector);
- for (int i = 0; i < valueList.size(); i++) {
- if (valueList.get(i) == null) {
- ((HeapTimestampVector)
lcv.getChild()).setNullAt(i);
- } else {
- ((HeapTimestampVector) lcv.getChild())
- .setTimestamp(i, ((List<Timestamp>)
valueList).get(i));
- }
- }
- break;
- }
- case DECIMAL:
- switch (descriptor.getPrimitiveType().getPrimitiveTypeName()) {
- case INT32:
- HeapIntVector heapIntVector = new HeapIntVector(total);
- heapIntVector.reset();
- lcv.setChild(new ParquetDecimalVector(heapIntVector));
- for (int i = 0; i < valueList.size(); i++) {
- if (valueList.get(i) == null) {
- ((HeapIntVector)
- ((ParquetDecimalVector)
lcv.getChild()).getVector())
- .setNullAt(i);
- } else {
- ((HeapIntVector)
-
((ParquetDecimalVector) lcv.getChild())
- .getVector())
- .vector[i] =
- ((List<Integer>) valueList).get(i);
- }
- }
- break;
- case INT64:
- HeapLongVector heapLongVector = new
HeapLongVector(total);
- heapLongVector.reset();
- lcv.setChild(new ParquetDecimalVector(heapLongVector));
- for (int i = 0; i < valueList.size(); i++) {
- if (valueList.get(i) == null) {
- ((HeapLongVector)
- ((ParquetDecimalVector)
lcv.getChild()).getVector())
- .setNullAt(i);
- } else {
- ((HeapLongVector)
-
((ParquetDecimalVector) lcv.getChild())
- .getVector())
- .vector[i] =
- ((List<Long>) valueList).get(i);
- }
- }
- break;
- default:
- HeapBytesVector heapBytesVector = new
HeapBytesVector(total);
- heapBytesVector.reset();
- lcv.setChild(new
ParquetDecimalVector(heapBytesVector));
- for (int i = 0; i < valueList.size(); i++) {
- byte[] src = (byte[]) valueList.get(i);
- if (valueList.get(i) == null) {
- ((HeapBytesVector)
- ((ParquetDecimalVector)
lcv.getChild()).getVector())
- .setNullAt(i);
- } else {
- ((HeapBytesVector)
- ((ParquetDecimalVector)
lcv.getChild()).getVector())
- .appendBytes(i, src, 0, src.length);
- }
- }
- break;
- }
- break;
- default:
- throw new RuntimeException("Unsupported type in the list: " +
type);
- }
- }
-}
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/BaseVectorizedColumnReader.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/BaseVectorizedColumnReader.java
deleted file mode 100644
index 0fb4c143c..000000000
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/BaseVectorizedColumnReader.java
+++ /dev/null
@@ -1,291 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.format.parquet.reader;
-
-import org.apache.paimon.data.columnar.writable.WritableColumnVector;
-import org.apache.paimon.types.DataType;
-
-import org.apache.parquet.bytes.ByteBufferInputStream;
-import org.apache.parquet.bytes.BytesInput;
-import org.apache.parquet.bytes.BytesUtils;
-import org.apache.parquet.column.ColumnDescriptor;
-import org.apache.parquet.column.Encoding;
-import org.apache.parquet.column.page.DataPage;
-import org.apache.parquet.column.page.DataPageV1;
-import org.apache.parquet.column.page.DataPageV2;
-import org.apache.parquet.column.page.DictionaryPage;
-import org.apache.parquet.column.page.PageReader;
-import org.apache.parquet.column.values.ValuesReader;
-import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridDecoder;
-import org.apache.parquet.io.ParquetDecodingException;
-import org.apache.parquet.schema.Type;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-
-import static org.apache.parquet.column.ValuesType.DEFINITION_LEVEL;
-import static org.apache.parquet.column.ValuesType.REPETITION_LEVEL;
-import static org.apache.parquet.column.ValuesType.VALUES;
-
-/**
- * It's column level Parquet reader which is used to read a batch of records
for a column, part of
- * the code is referred from Apache Hive and Apache Parquet.
- */
-public abstract class BaseVectorizedColumnReader implements
ColumnReader<WritableColumnVector> {
-
- private static final Logger LOG =
LoggerFactory.getLogger(BaseVectorizedColumnReader.class);
-
- protected boolean isUtcTimestamp;
-
- /** Total number of values read. */
- protected long valuesRead;
-
- /**
- * value that indicates the end of the current page. That is, if
valuesRead ==
- * endOfPageValueCount, we are at the end of the page.
- */
- protected long endOfPageValueCount;
-
- /** The dictionary, if this column has dictionary encoding. */
- protected final ParquetDataColumnReader dictionary;
-
- /** If true, the current page is dictionary encoded. */
- protected boolean isCurrentPageDictionaryEncoded;
-
- /** Maximum definition level for this column. */
- protected final int maxDefLevel;
-
- protected int definitionLevel;
- protected int repetitionLevel;
-
- /** Repetition/Definition/Value readers. */
- protected IntIterator repetitionLevelColumn;
-
- protected IntIterator definitionLevelColumn;
- protected ParquetDataColumnReader dataColumn;
-
- /** Total values in the current page. */
- protected int pageValueCount;
-
- protected final PageReader pageReader;
- protected final ColumnDescriptor descriptor;
- protected final Type type;
- protected final DataType dataType;
-
- public BaseVectorizedColumnReader(
- ColumnDescriptor descriptor,
- PageReader pageReader,
- boolean isUtcTimestamp,
- Type parquetType,
- DataType dataType)
- throws IOException {
- this.descriptor = descriptor;
- this.type = parquetType;
- this.pageReader = pageReader;
- this.maxDefLevel = descriptor.getMaxDefinitionLevel();
- this.isUtcTimestamp = isUtcTimestamp;
- this.dataType = dataType;
-
- DictionaryPage dictionaryPage = pageReader.readDictionaryPage();
- if (dictionaryPage != null) {
- try {
- this.dictionary =
-
ParquetDataColumnReaderFactory.getDataColumnReaderByTypeOnDictionary(
- parquetType.asPrimitiveType(),
- dictionaryPage
- .getEncoding()
- .initDictionary(descriptor,
dictionaryPage),
- isUtcTimestamp);
- this.isCurrentPageDictionaryEncoded = true;
- } catch (IOException e) {
- throw new IOException(
- String.format("Could not decode the dictionary for
%s", descriptor), e);
- }
- } else {
- this.dictionary = null;
- this.isCurrentPageDictionaryEncoded = false;
- }
- }
-
- protected void readRepetitionAndDefinitionLevels() {
- repetitionLevel = repetitionLevelColumn.nextInt();
- definitionLevel = definitionLevelColumn.nextInt();
- valuesRead++;
- }
-
- protected void readPage() {
- DataPage page = pageReader.readPage();
-
- if (page == null) {
- return;
- }
-
- page.accept(
- new DataPage.Visitor<Void>() {
- @Override
- public Void visit(DataPageV1 dataPageV1) {
- readPageV1(dataPageV1);
- return null;
- }
-
- @Override
- public Void visit(DataPageV2 dataPageV2) {
- readPageV2(dataPageV2);
- return null;
- }
- });
- }
-
- private void initDataReader(Encoding dataEncoding, ByteBufferInputStream
in, int valueCount)
- throws IOException {
- this.pageValueCount = valueCount;
- this.endOfPageValueCount = valuesRead + pageValueCount;
- if (dataEncoding.usesDictionary()) {
- this.dataColumn = null;
- if (dictionary == null) {
- throw new IOException(
- String.format(
- "Could not read page in col %s because the
dictionary was missing for encoding %s.",
- descriptor, dataEncoding));
- }
- dataColumn =
- ParquetDataColumnReaderFactory.getDataColumnReaderByType(
- type.asPrimitiveType(),
- dataEncoding.getDictionaryBasedValuesReader(
- descriptor, VALUES,
dictionary.getDictionary()),
- isUtcTimestamp);
- this.isCurrentPageDictionaryEncoded = true;
- } else {
- dataColumn =
- ParquetDataColumnReaderFactory.getDataColumnReaderByType(
- type.asPrimitiveType(),
- dataEncoding.getValuesReader(descriptor, VALUES),
- isUtcTimestamp);
- this.isCurrentPageDictionaryEncoded = false;
- }
-
- try {
- dataColumn.initFromPage(pageValueCount, in);
- } catch (IOException e) {
- throw new IOException(String.format("Could not read page in col
%s.", descriptor), e);
- }
- }
-
- private void readPageV1(DataPageV1 page) {
- ValuesReader rlReader =
page.getRlEncoding().getValuesReader(descriptor, REPETITION_LEVEL);
- ValuesReader dlReader =
page.getDlEncoding().getValuesReader(descriptor, DEFINITION_LEVEL);
- this.repetitionLevelColumn = new ValuesReaderIntIterator(rlReader);
- this.definitionLevelColumn = new ValuesReaderIntIterator(dlReader);
- try {
- BytesInput bytes = page.getBytes();
- LOG.debug("Page size {} bytes and {} records.", bytes.size(),
pageValueCount);
- ByteBufferInputStream in = bytes.toInputStream();
- LOG.debug("Reading repetition levels at {}.", in.position());
- rlReader.initFromPage(pageValueCount, in);
- LOG.debug("Reading definition levels at {}.", in.position());
- dlReader.initFromPage(pageValueCount, in);
- LOG.debug("Reading data at {}.", in.position());
- initDataReader(page.getValueEncoding(), in, page.getValueCount());
- } catch (IOException e) {
- throw new ParquetDecodingException(
- String.format("Could not read page %s in col %s.", page,
descriptor), e);
- }
- }
-
- private void readPageV2(DataPageV2 page) {
- this.pageValueCount = page.getValueCount();
- this.repetitionLevelColumn =
- newRLEIterator(descriptor.getMaxRepetitionLevel(),
page.getRepetitionLevels());
- this.definitionLevelColumn =
- newRLEIterator(descriptor.getMaxDefinitionLevel(),
page.getDefinitionLevels());
- try {
- LOG.debug(
- "Page data size {} bytes and {} records.",
- page.getData().size(),
- pageValueCount);
- initDataReader(
- page.getDataEncoding(), page.getData().toInputStream(),
page.getValueCount());
- } catch (IOException e) {
- throw new ParquetDecodingException(
- String.format("Could not read page %s in col %s.", page,
descriptor), e);
- }
- }
-
- private IntIterator newRLEIterator(int maxLevel, BytesInput bytes) {
- try {
- if (maxLevel == 0) {
- return new NullIntIterator();
- }
- return new RLEIntIterator(
- new RunLengthBitPackingHybridDecoder(
- BytesUtils.getWidthFromMaxInt(maxLevel),
- new ByteArrayInputStream(bytes.toByteArray())));
- } catch (IOException e) {
- throw new ParquetDecodingException(
- String.format("Could not read levels in page for col %s.",
descriptor), e);
- }
- }
-
- /** Utility interface to abstract over different way to read ints with
different encodings. */
- interface IntIterator {
- int nextInt();
- }
-
- /** Reading int from {@link ValuesReader}. */
- protected static final class ValuesReaderIntIterator implements
IntIterator {
- ValuesReader delegate;
-
- public ValuesReaderIntIterator(ValuesReader delegate) {
- this.delegate = delegate;
- }
-
- @Override
- public int nextInt() {
- return delegate.readInteger();
- }
- }
-
- /** Reading int from {@link RunLengthBitPackingHybridDecoder}. */
- protected static final class RLEIntIterator implements IntIterator {
- RunLengthBitPackingHybridDecoder delegate;
-
- public RLEIntIterator(RunLengthBitPackingHybridDecoder delegate) {
- this.delegate = delegate;
- }
-
- @Override
- public int nextInt() {
- try {
- return delegate.readInt();
- } catch (IOException e) {
- throw new ParquetDecodingException(e);
- }
- }
- }
-
- /** Reading zero always. */
- protected static final class NullIntIterator implements IntIterator {
- @Override
- public int nextInt() {
- return 0;
- }
- }
-}
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/MapColumnReader.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/MapColumnReader.java
deleted file mode 100644
index c865617f5..000000000
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/MapColumnReader.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.format.parquet.reader;
-
-import org.apache.paimon.data.columnar.ColumnVector;
-import org.apache.paimon.data.columnar.heap.HeapArrayVector;
-import org.apache.paimon.data.columnar.heap.HeapMapVector;
-import org.apache.paimon.data.columnar.writable.WritableColumnVector;
-
-import java.io.IOException;
-
-/** Map {@link ColumnReader}. */
-public class MapColumnReader implements ColumnReader<WritableColumnVector> {
-
- private final ArrayColumnReader keyReader;
- private final ArrayColumnReader valueReader;
-
- public MapColumnReader(ArrayColumnReader keyReader, ArrayColumnReader
valueReader) {
- this.keyReader = keyReader;
- this.valueReader = valueReader;
- }
-
- public void readBatch(int total, ColumnVector column) throws IOException {
- HeapMapVector mapVector = (HeapMapVector) column;
- // initialize 2 ListColumnVector for keys and values
- HeapArrayVector keyArrayColumnVector = new HeapArrayVector(total);
- HeapArrayVector valueArrayColumnVector = new HeapArrayVector(total);
- // read the keys and values
- keyReader.readToVector(total, keyArrayColumnVector);
- valueReader.readToVector(total, valueArrayColumnVector);
-
- // set the related attributes according to the keys and values
- mapVector.setKeys(keyArrayColumnVector.getChild());
- mapVector.setValues(valueArrayColumnVector.getChild());
- mapVector.setOffsets(keyArrayColumnVector.getOffsets());
- mapVector.setLengths(keyArrayColumnVector.getLengths());
- mapVector.setSize(keyArrayColumnVector.getSize());
- for (int i = 0; i < keyArrayColumnVector.getLen(); i++) {
- if (keyArrayColumnVector.isNullAt(i)) {
- mapVector.setNullAt(i);
- }
- }
- }
-
- @Override
- public void readToVector(int readNumber, WritableColumnVector vector)
throws IOException {
- readBatch(readNumber, vector);
- }
-}
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPrimitiveColumnReader.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPrimitiveColumnReader.java
index dbbf2028c..0b0d89d4d 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPrimitiveColumnReader.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPrimitiveColumnReader.java
@@ -31,6 +31,8 @@ import org.apache.paimon.data.columnar.heap.HeapTimestampVector;
import org.apache.paimon.data.columnar.writable.WritableColumnVector;
import org.apache.paimon.format.parquet.position.LevelDelegation;
import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.LocalZonedTimestampType;
+import org.apache.paimon.types.TimestampType;
import org.apache.paimon.utils.IntArrayList;
import org.apache.parquet.bytes.ByteBufferInputStream;
@@ -128,7 +130,6 @@ public class NestedPrimitiveColumnReader implements
ColumnReader<WritableColumnV
try {
this.dictionary =
ParquetDataColumnReaderFactory.getDataColumnReaderByTypeOnDictionary(
- parquetType.asPrimitiveType(),
dictionaryPage
.getEncoding()
.initDictionary(descriptor,
dictionaryPage),
@@ -263,13 +264,24 @@ public class NestedPrimitiveColumnReader implements
ColumnReader<WritableColumnV
return dataColumn.readBytes();
}
case TIMESTAMP_WITHOUT_TIME_ZONE:
+ return readTimestamp(((TimestampType)
category).getPrecision());
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
- return dataColumn.readMillsTimestamp();
+ return readTimestamp(((LocalZonedTimestampType)
category).getPrecision());
default:
throw new RuntimeException("Unsupported type in the list: " +
type);
}
}
+ private Timestamp readTimestamp(int precision) {
+ if (precision <= 3) {
+ return dataColumn.readMillsTimestamp();
+ } else if (precision <= 6) {
+ return dataColumn.readMicrosTimestamp();
+ } else {
+ return dataColumn.readNanosTimestamp();
+ }
+ }
+
private Object dictionaryDecodeValue(DataType category, Integer
dictionaryValue) {
if (dictionaryValue == null) {
return null;
@@ -308,13 +320,26 @@ public class NestedPrimitiveColumnReader implements
ColumnReader<WritableColumnV
return dictionary.readBytes(dictionaryValue);
}
case TIMESTAMP_WITHOUT_TIME_ZONE:
+ return dictionaryReadTimestamp(
+ ((TimestampType) category).getPrecision(),
dictionaryValue);
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
- return dictionary.readTimestamp(dictionaryValue);
+ return dictionaryReadTimestamp(
+ ((LocalZonedTimestampType) category).getPrecision(),
dictionaryValue);
default:
throw new RuntimeException("Unsupported type in the list: " +
type);
}
}
+ private Timestamp dictionaryReadTimestamp(int precision, int
dictionaryValue) {
+ if (precision <= 3) {
+ return dictionary.readMillsTimestamp(dictionaryValue);
+ } else if (precision <= 6) {
+ return dictionary.readMicrosTimestamp(dictionaryValue);
+ } else {
+ return dictionary.readNanosTimestamp(dictionaryValue);
+ }
+ }
+
private WritableColumnVector fillColumnVector(int total, List valueList) {
switch (dataType.getTypeRoot()) {
case CHAR:
@@ -499,7 +524,6 @@ public class NestedPrimitiveColumnReader implements
ColumnReader<WritableColumnV
}
dataColumn =
ParquetDataColumnReaderFactory.getDataColumnReaderByType(
- type.asPrimitiveType(),
dataEncoding.getDictionaryBasedValuesReader(
descriptor, VALUES,
dictionary.getDictionary()),
isUtcTimestamp);
@@ -507,9 +531,7 @@ public class NestedPrimitiveColumnReader implements
ColumnReader<WritableColumnV
} else {
dataColumn =
ParquetDataColumnReaderFactory.getDataColumnReaderByType(
- type.asPrimitiveType(),
- dataEncoding.getValuesReader(descriptor, VALUES),
- isUtcTimestamp);
+ dataEncoding.getValuesReader(descriptor, VALUES),
isUtcTimestamp);
this.isCurrentPageDictionaryEncoded = false;
}
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetDataColumnReader.java
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetDataColumnReader.java
index 0d3c220a1..53ade64ab 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetDataColumnReader.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetDataColumnReader.java
@@ -72,6 +72,9 @@ public interface ParquetDataColumnReader {
/** @return the next Timestamp with microsecond precision. */
Timestamp readMicrosTimestamp();
+ /** @return the next Timestamp with nanos precision. */
+ Timestamp readNanosTimestamp();
+
/** @return the underlying dictionary if current reader is dictionary
encoded */
Dictionary getDictionary();
@@ -127,5 +130,17 @@ public interface ParquetDataColumnReader {
* @param id in dictionary
* @return the TimestampData from the dictionary by id
*/
- Timestamp readTimestamp(int id);
+ Timestamp readMillsTimestamp(int id);
+
+ /**
+ * @param id in dictionary
+ * @return the TimestampData from the dictionary by id
+ */
+ Timestamp readMicrosTimestamp(int id);
+
+ /**
+ * @param id in dictionary
+ * @return the TimestampData from the dictionary by id
+ */
+ Timestamp readNanosTimestamp(int id);
}
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetDataColumnReaderFactory.java
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetDataColumnReaderFactory.java
index 382704f1c..c3e05e0ae 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetDataColumnReaderFactory.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetDataColumnReaderFactory.java
@@ -24,7 +24,6 @@ import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.column.Dictionary;
import org.apache.parquet.column.values.ValuesReader;
import org.apache.parquet.io.api.Binary;
-import org.apache.parquet.schema.PrimitiveType;
import javax.annotation.Nullable;
@@ -47,18 +46,22 @@ public final class ParquetDataColumnReaderFactory {
* The default data column reader for existing Parquet page reader which
works for both
* dictionary or non dictionary types, Mirror from dictionary encoding
path.
*/
- public static class DefaultParquetDataColumnReader implements
ParquetDataColumnReader {
- protected final ValuesReader valuesReader;
- protected final Dictionary dict;
+ private static class DefaultParquetDataColumnReader implements
ParquetDataColumnReader {
- public DefaultParquetDataColumnReader(ValuesReader valuesReader) {
+ private final ValuesReader valuesReader;
+ private final Dictionary dict;
+ private final boolean isUtcTimestamp;
+
+ public DefaultParquetDataColumnReader(ValuesReader valuesReader,
boolean isUtcTimestamp) {
this.valuesReader = checkNotNull(valuesReader);
this.dict = null;
+ this.isUtcTimestamp = isUtcTimestamp;
}
- public DefaultParquetDataColumnReader(Dictionary dict) {
+ public DefaultParquetDataColumnReader(Dictionary dict, boolean
isUtcTimestamp) {
this.valuesReader = null;
this.dict = checkNotNull(dict);
+ this.isUtcTimestamp = isUtcTimestamp;
}
@Override
@@ -117,8 +120,23 @@ public final class ParquetDataColumnReaderFactory {
}
@Override
- public Timestamp readTimestamp(int id) {
- throw new RuntimeException("Unsupported operation");
+ public Timestamp readNanosTimestamp() {
+ return int96TimestampConvert(valuesReader.readBytes());
+ }
+
+ @Override
+ public Timestamp readMillsTimestamp(int id) {
+ return Timestamp.fromEpochMillis(readLong(id));
+ }
+
+ @Override
+ public Timestamp readMicrosTimestamp(int id) {
+ return Timestamp.fromMicros(readLong(id));
+ }
+
+ @Override
+ public Timestamp readNanosTimestamp(int id) {
+ return int96TimestampConvert(dict.decodeToBinary(id));
}
@Override
@@ -166,31 +184,12 @@ public final class ParquetDataColumnReaderFactory {
return valuesReader.readValueDictionaryId();
}
- public void skip() {
- valuesReader.skip();
- }
-
@Override
public Dictionary getDictionary() {
return dict;
}
- }
-
- /** The reader who reads from the underlying Timestamp value. */
- public static class TypesFromInt96PageReader extends
DefaultParquetDataColumnReader {
- private final boolean isUtcTimestamp;
-
- public TypesFromInt96PageReader(ValuesReader realReader, boolean
isUtcTimestamp) {
- super(realReader);
- this.isUtcTimestamp = isUtcTimestamp;
- }
- public TypesFromInt96PageReader(Dictionary dict, boolean
isUtcTimestamp) {
- super(dict);
- this.isUtcTimestamp = isUtcTimestamp;
- }
-
- private Timestamp convert(Binary binary) {
+ private Timestamp int96TimestampConvert(Binary binary) {
ByteBuffer buf = binary.toByteBuffer();
buf.order(ByteOrder.LITTLE_ENDIAN);
long timeOfDayNanos = buf.getLong();
@@ -198,48 +197,25 @@ public final class ParquetDataColumnReaderFactory {
return TimestampColumnReader.int96ToTimestamp(
isUtcTimestamp, timeOfDayNanos, julianDay);
}
-
- @Override
- public Timestamp readTimestamp(int id) {
- return convert(dict.decodeToBinary(id));
- }
-
- @Override
- public Timestamp readMillsTimestamp() {
- return convert(valuesReader.readBytes());
- }
-
- @Override
- public Timestamp readMicrosTimestamp() {
- return convert(valuesReader.readBytes());
- }
}
private static ParquetDataColumnReader getDataColumnReaderByTypeHelper(
boolean isDictionary,
- PrimitiveType parquetType,
@Nullable Dictionary dictionary,
@Nullable ValuesReader valuesReader,
boolean isUtcTimestamp) {
- if (parquetType.getPrimitiveTypeName() ==
PrimitiveType.PrimitiveTypeName.INT96) {
- return isDictionary
- ? new TypesFromInt96PageReader(dictionary, isUtcTimestamp)
- : new TypesFromInt96PageReader(valuesReader,
isUtcTimestamp);
- } else {
- return isDictionary
- ? new DefaultParquetDataColumnReader(dictionary)
- : new DefaultParquetDataColumnReader(valuesReader);
- }
+ return isDictionary
+ ? new DefaultParquetDataColumnReader(dictionary,
isUtcTimestamp)
+ : new DefaultParquetDataColumnReader(valuesReader,
isUtcTimestamp);
}
public static ParquetDataColumnReader
getDataColumnReaderByTypeOnDictionary(
- PrimitiveType parquetType, Dictionary realReader, boolean
isUtcTimestamp) {
- return getDataColumnReaderByTypeHelper(true, parquetType, realReader,
null, isUtcTimestamp);
+ Dictionary realReader, boolean isUtcTimestamp) {
+ return getDataColumnReaderByTypeHelper(true, realReader, null,
isUtcTimestamp);
}
public static ParquetDataColumnReader getDataColumnReaderByType(
- PrimitiveType parquetType, ValuesReader realReader, boolean
isUtcTimestamp) {
- return getDataColumnReaderByTypeHelper(
- false, parquetType, null, realReader, isUtcTimestamp);
+ ValuesReader realReader, boolean isUtcTimestamp) {
+ return getDataColumnReaderByTypeHelper(false, null, realReader,
isUtcTimestamp);
}
}
diff --git
a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java
b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java
index 56aa83875..03e1ec072 100644
---
a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java
+++
b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java
@@ -81,6 +81,7 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@@ -135,7 +136,9 @@ public class ParquetReadWriteTest {
new MultisetType(new
VarCharType(VarCharType.MAX_LENGTH)),
RowType.builder()
.fields(new
VarCharType(VarCharType.MAX_LENGTH), new IntType())
- .build())
+ .build(),
+ new MapType(
+ new TimestampType(6), new
VarCharType(VarCharType.MAX_LENGTH)))
.build();
private static final RowType NESTED_ARRAY_MAP_TYPE =
@@ -469,6 +472,9 @@ public class ParquetReadWriteTest {
Path path = new Path(folder.getPath(), UUID.randomUUID().toString());
Options conf = new Options();
conf.setInteger("parquet.block.size", rowGroupSize);
+ if (ThreadLocalRandom.current().nextBoolean()) {
+ conf.set("parquet.enable.dictionary", "false");
+ }
ParquetWriterFactory factory =
new ParquetWriterFactory(new RowDataParquetBuilder(rowType,
conf));
String[] candidates = new String[] {"snappy", "zstd", "gzip"};
@@ -611,17 +617,22 @@ public class ParquetReadWriteTest {
return new GenericRow(ROW_TYPE.getFieldCount());
}
+ BinaryString str = BinaryString.fromString("" + v);
+
Map<BinaryString, BinaryString> f30 = new HashMap<>();
- f30.put(BinaryString.fromString("" + v), BinaryString.fromString("" +
v));
+ f30.put(str, str);
Map<Integer, Boolean> f31 = new HashMap<>();
f31.put(v, v % 2 == 0);
Map<BinaryString, Integer> f32 = new HashMap<>();
- f32.put(BinaryString.fromString("" + v), v);
+ f32.put(str, v);
+
+ Map<Timestamp, BinaryString> f34 = new HashMap<>();
+ f34.put(toMicros(v), str);
return GenericRow.of(
- BinaryString.fromString("" + v),
+ str,
v % 2 == 0,
v.byteValue(),
v.shortValue(),
@@ -638,7 +649,7 @@ public class ParquetReadWriteTest {
Decimal.fromBigDecimal(BigDecimal.valueOf(v), 5, 0),
Decimal.fromBigDecimal(BigDecimal.valueOf(v), 15, 0),
Decimal.fromBigDecimal(BigDecimal.valueOf(v), 20, 0),
- new GenericArray(new Object[] {BinaryString.fromString("" +
v), null}),
+ new GenericArray(new Object[] {str, null}),
new GenericArray(new Object[] {v % 2 == 0, null}),
new GenericArray(new Object[] {v.byteValue(), null}),
new GenericArray(new Object[] {v.shortValue(), null}),
@@ -670,7 +681,8 @@ public class ParquetReadWriteTest {
new GenericMap(f30),
new GenericMap(f31),
new GenericMap(f32),
- GenericRow.of(BinaryString.fromString("" + v), v));
+ GenericRow.of(str, v),
+ new GenericMap(f34));
}
private Timestamp toMills(Integer v) {