This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new ddd41a417 [parquet] support read parquet nested columns. (#3656)
ddd41a417 is described below
commit ddd41a417a7b2636e0a0fef762f92745ff5f6d10
Author: Wenchao Wu <[email protected]>
AuthorDate: Wed Jul 3 10:37:48 2024 +0800
[parquet] support read parquet nested columns. (#3656)
---
.../paimon/data/columnar/heap/HeapRowVector.java | 6 +-
.../org/apache/paimon/utils/BooleanArrayList.java | 65 +++
.../java/org/apache/paimon/utils/IntArrayList.java | 89 +++
.../org/apache/paimon/utils/LongArrayList.java | 79 +++
.../format/parquet/ParquetReaderFactory.java | 21 +-
.../parquet/position/CollectionPosition.java | 54 ++
.../format/parquet/position/LevelDelegation.java | 38 ++
.../format/parquet/position/RowPosition.java | 40 ++
.../format/parquet/reader/NestedColumnReader.java | 255 +++++++++
.../format/parquet/reader/NestedPositionUtil.java | 208 +++++++
.../reader/NestedPrimitiveColumnReader.java | 622 +++++++++++++++++++++
.../parquet/reader/ParquetDecimalVector.java | 161 +++++-
.../parquet/reader/ParquetSplitReaderUtil.java | 263 +++++++--
.../paimon/format/parquet/type/ParquetField.java | 66 +++
.../format/parquet/type/ParquetGroupField.java | 47 ++
.../format/parquet/type/ParquetPrimitiveField.java | 51 ++
.../format/parquet/ParquetReadWriteTest.java | 329 ++++++++++-
17 files changed, 2332 insertions(+), 62 deletions(-)
diff --git
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapRowVector.java
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapRowVector.java
index bfa16ce96..37d619bde 100644
---
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapRowVector.java
+++
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapRowVector.java
@@ -27,7 +27,7 @@ import
org.apache.paimon.data.columnar.writable.WritableColumnVector;
public class HeapRowVector extends AbstractHeapVector
implements WritableColumnVector, RowColumnVector {
- private final WritableColumnVector[] fields;
+ private WritableColumnVector[] fields;
public HeapRowVector(int len, WritableColumnVector... fields) {
super(len);
@@ -57,4 +57,8 @@ public class HeapRowVector extends AbstractHeapVector
field.reset();
}
}
+
+ public void setFields(WritableColumnVector[] fields) {
+ this.fields = fields;
+ }
}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/utils/BooleanArrayList.java
b/paimon-common/src/main/java/org/apache/paimon/utils/BooleanArrayList.java
new file mode 100644
index 000000000..cbb16b703
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/BooleanArrayList.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import java.util.Arrays;
+
+/** Minimal implementation of an array-backed list of booleans. */
+public class BooleanArrayList {
+ private int size;
+
+ private boolean[] array;
+
+ public BooleanArrayList(int capacity) {
+ this.size = 0;
+ this.array = new boolean[capacity];
+ }
+
+ public int size() {
+ return size;
+ }
+
+ public boolean add(boolean element) {
+ grow(size + 1);
+ array[size++] = element;
+ return true;
+ }
+
+ public void clear() {
+ size = 0;
+ }
+
+ public boolean isEmpty() {
+ return (size == 0);
+ }
+
+ public boolean[] toArray() {
+ return Arrays.copyOf(array, size);
+ }
+
+ private void grow(int length) {
+ if (length > array.length) {
+ final int newLength =
+ (int) Math.max(Math.min(2L * array.length,
Integer.MAX_VALUE - 8), length);
+ final boolean[] t = new boolean[newLength];
+ System.arraycopy(array, 0, t, 0, size);
+ array = t;
+ }
+ }
+}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/utils/IntArrayList.java
b/paimon-common/src/main/java/org/apache/paimon/utils/IntArrayList.java
new file mode 100644
index 000000000..575fae02a
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/IntArrayList.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import java.util.Arrays;
+import java.util.NoSuchElementException;
+
+/** Minimal implementation of an array-backed list of ints. */
+public class IntArrayList {
+
+ private int size;
+
+ private int[] array;
+
+ public IntArrayList(final int capacity) {
+ this.size = 0;
+ this.array = new int[capacity];
+ }
+
+ public int size() {
+ return size;
+ }
+
+ public boolean add(final int number) {
+ grow(size + 1);
+ array[size++] = number;
+ return true;
+ }
+
+ public int removeLast() {
+ if (size == 0) {
+ throw new NoSuchElementException();
+ }
+ --size;
+ return array[size];
+ }
+
+ public void clear() {
+ size = 0;
+ }
+
+ public boolean isEmpty() {
+ return size == 0;
+ }
+
+ private void grow(final int length) {
+ if (length > array.length) {
+ final int newLength =
+ (int) Math.max(Math.min(2L * array.length,
Integer.MAX_VALUE - 8), length);
+ final int[] t = new int[newLength];
+ System.arraycopy(array, 0, t, 0, size);
+ array = t;
+ }
+ }
+
+ public int[] toArray() {
+ return Arrays.copyOf(array, size);
+ }
+
+ public static final IntArrayList EMPTY =
+ new IntArrayList(0) {
+
+ @Override
+ public boolean add(int number) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int removeLast() {
+ throw new UnsupportedOperationException();
+ }
+ };
+}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/utils/LongArrayList.java
b/paimon-common/src/main/java/org/apache/paimon/utils/LongArrayList.java
new file mode 100644
index 000000000..e94f46897
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/LongArrayList.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import java.util.Arrays;
+
+/** Minimal implementation of an array-backed list of longs. */
+public class LongArrayList {
+
+ private int size;
+
+ private long[] array;
+
+ public LongArrayList(int capacity) {
+ this.size = 0;
+ this.array = new long[capacity];
+ }
+
+ public int size() {
+ return size;
+ }
+
+ public boolean add(long number) {
+ grow(size + 1);
+ array[size++] = number;
+ return true;
+ }
+
+ public long removeLong(int index) {
+ if (index >= size) {
+ throw new IndexOutOfBoundsException(
+ "Index (" + index + ") is greater than or equal to list
size (" + size + ")");
+ }
+ final long old = array[index];
+ size--;
+ if (index != size) {
+ System.arraycopy(array, index + 1, array, index, size - index);
+ }
+ return old;
+ }
+
+ public void clear() {
+ size = 0;
+ }
+
+ public boolean isEmpty() {
+ return (size == 0);
+ }
+
+ public long[] toArray() {
+ return Arrays.copyOf(array, size);
+ }
+
+ private void grow(int length) {
+ if (length > array.length) {
+ final int newLength =
+ (int) Math.max(Math.min(2L * array.length,
Integer.MAX_VALUE - 8), length);
+ final long[] t = new long[newLength];
+ System.arraycopy(array, 0, t, 0, size);
+ array = t;
+ }
+ }
+}
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
index afaf8a501..b0715bb53 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
@@ -28,6 +28,7 @@ import org.apache.paimon.format.FormatReaderFactory;
import org.apache.paimon.format.parquet.reader.ColumnReader;
import org.apache.paimon.format.parquet.reader.ParquetDecimalVector;
import org.apache.paimon.format.parquet.reader.ParquetTimestampVector;
+import org.apache.paimon.format.parquet.type.ParquetField;
import org.apache.paimon.fs.Path;
import org.apache.paimon.options.Options;
import org.apache.paimon.reader.RecordReader;
@@ -42,6 +43,8 @@ import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.filter2.compat.FilterCompat;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.ParquetInputFormat;
+import org.apache.parquet.io.ColumnIOFactory;
+import org.apache.parquet.io.MessageColumnIO;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Type;
@@ -57,6 +60,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import static
org.apache.paimon.format.parquet.reader.ParquetSplitReaderUtil.buildFieldsList;
import static
org.apache.paimon.format.parquet.reader.ParquetSplitReaderUtil.createColumnReader;
import static
org.apache.paimon.format.parquet.reader.ParquetSplitReaderUtil.createWritableColumnVector;
import static
org.apache.parquet.hadoop.UnmaterializableRecordCounter.BAD_RECORD_THRESHOLD_CONF_KEY;
@@ -72,6 +76,8 @@ public class ParquetReaderFactory implements
FormatReaderFactory {
private static final String ALLOCATION_SIZE =
"parquet.read.allocation.size";
private final Options conf;
+
+ private final RowType projectedType;
private final String[] projectedFields;
private final DataType[] projectedTypes;
private final int batchSize;
@@ -81,6 +87,7 @@ public class ParquetReaderFactory implements
FormatReaderFactory {
public ParquetReaderFactory(
Options conf, RowType projectedType, int batchSize,
FilterCompat.Filter filter) {
this.conf = conf;
+ this.projectedType = projectedType;
this.projectedFields = projectedType.getFieldNames().toArray(new
String[0]);
this.projectedTypes = projectedType.getFieldTypes().toArray(new
DataType[0]);
this.batchSize = batchSize;
@@ -106,7 +113,12 @@ public class ParquetReaderFactory implements
FormatReaderFactory {
Pool<ParquetReaderBatch> poolOfBatches =
createPoolOfBatches(context.filePath(), requestedSchema);
- return new ParquetReader(reader, requestedSchema,
reader.getRecordCount(), poolOfBatches);
+ MessageColumnIO columnIO = new
ColumnIOFactory().getColumnIO(requestedSchema);
+ List<ParquetField> fields =
+ buildFieldsList(projectedType.getFields(),
projectedType.getFieldNames(), columnIO);
+
+ return new ParquetReader(
+ reader, requestedSchema, reader.getRecordCount(),
poolOfBatches, fields);
}
private void setReadOptions(ParquetReadOptions.Builder builder) {
@@ -270,11 +282,14 @@ public class ParquetReaderFactory implements
FormatReaderFactory {
@SuppressWarnings("rawtypes")
private ColumnReader[] columnReaders;
+ private final List<ParquetField> fields;
+
private ParquetReader(
ParquetFileReader reader,
MessageType requestedSchema,
long totalRowCount,
- Pool<ParquetReaderBatch> pool) {
+ Pool<ParquetReaderBatch> pool,
+ List<ParquetField> fields) {
this.reader = reader;
this.requestedSchema = requestedSchema;
this.totalRowCount = totalRowCount;
@@ -283,6 +298,7 @@ public class ParquetReaderFactory implements
FormatReaderFactory {
this.totalCountLoadedSoFar = 0;
this.currentRowPosition = 0;
this.nextRowPosition = 0;
+ this.fields = fields;
}
@Nullable
@@ -348,6 +364,7 @@ public class ParquetReaderFactory implements
FormatReaderFactory {
types.get(i),
requestedSchema.getColumns(),
rowGroup,
+ fields.get(i),
0);
}
}
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/position/CollectionPosition.java
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/position/CollectionPosition.java
new file mode 100644
index 000000000..e72a4280f
--- /dev/null
+++
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/position/CollectionPosition.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.parquet.position;
+
+import javax.annotation.Nullable;
+
+/** To represent collection's position in repeated type. */
+public class CollectionPosition {
+ @Nullable private final boolean[] isNull;
+ private final long[] offsets;
+
+ private final long[] length;
+
+ private final int valueCount;
+
+ public CollectionPosition(boolean[] isNull, long[] offsets, long[] length,
int valueCount) {
+ this.isNull = isNull;
+ this.offsets = offsets;
+ this.length = length;
+ this.valueCount = valueCount;
+ }
+
+ public boolean[] getIsNull() {
+ return isNull;
+ }
+
+ public long[] getOffsets() {
+ return offsets;
+ }
+
+ public long[] getLength() {
+ return length;
+ }
+
+ public int getValueCount() {
+ return valueCount;
+ }
+}
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/position/LevelDelegation.java
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/position/LevelDelegation.java
new file mode 100644
index 000000000..25bbedc86
--- /dev/null
+++
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/position/LevelDelegation.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.parquet.position;
+
+/** To delegate repetition level and definition level. */
+public class LevelDelegation {
+ private final int[] repetitionLevel;
+ private final int[] definitionLevel;
+
+ public LevelDelegation(int[] repetitionLevel, int[] definitionLevel) {
+ this.repetitionLevel = repetitionLevel;
+ this.definitionLevel = definitionLevel;
+ }
+
+ public int[] getRepetitionLevel() {
+ return repetitionLevel;
+ }
+
+ public int[] getDefinitionLevel() {
+ return definitionLevel;
+ }
+}
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/position/RowPosition.java
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/position/RowPosition.java
new file mode 100644
index 000000000..fb6378349
--- /dev/null
+++
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/position/RowPosition.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.parquet.position;
+
+import javax.annotation.Nullable;
+
+/** To represent struct's position in repeated type. */
+public class RowPosition {
+ @Nullable private final boolean[] isNull;
+ private final int positionsCount;
+
+ public RowPosition(boolean[] isNull, int positionsCount) {
+ this.isNull = isNull;
+ this.positionsCount = positionsCount;
+ }
+
+ public boolean[] getIsNull() {
+ return isNull;
+ }
+
+ public int getPositionsCount() {
+ return positionsCount;
+ }
+}
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedColumnReader.java
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedColumnReader.java
new file mode 100644
index 000000000..e39005800
--- /dev/null
+++
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedColumnReader.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.parquet.reader;
+
+import org.apache.paimon.data.columnar.ColumnVector;
+import org.apache.paimon.data.columnar.heap.AbstractHeapVector;
+import org.apache.paimon.data.columnar.heap.HeapArrayVector;
+import org.apache.paimon.data.columnar.heap.HeapMapVector;
+import org.apache.paimon.data.columnar.heap.HeapRowVector;
+import org.apache.paimon.data.columnar.writable.WritableColumnVector;
+import org.apache.paimon.format.parquet.position.CollectionPosition;
+import org.apache.paimon.format.parquet.position.LevelDelegation;
+import org.apache.paimon.format.parquet.position.RowPosition;
+import org.apache.paimon.format.parquet.type.ParquetField;
+import org.apache.paimon.format.parquet.type.ParquetGroupField;
+import org.apache.paimon.format.parquet.type.ParquetPrimitiveField;
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.MapType;
+import org.apache.paimon.types.MultisetType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.Pair;
+import org.apache.paimon.utils.Preconditions;
+
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.page.PageReadStore;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This ColumnReader is mainly used to read `Group` types in parquet such as
`Map`, `Array`, `Row`. The
+ * method for resolving nested structures mainly refers to: <a
+ *
href="https://github.com/julienledem/redelm/wiki/The-striping-and-assembly-algorithms-from-the-Dremel-paper">The
+ * striping and assembly algorithms from the Dremel paper</a>.
+ *
+ * <p>Brief explanation of reading repetition and definition levels:
Repetition level equal to 0
+ * means that this is the beginning of a new row. Other value means that we
should add data to the
+ * current row.
+ *
+ * <p>For example, if we have the following data: repetition levels:
0,1,1,0,0,1,[0] (the last 0 is
+ * implicit, normally at the end of the page) and values: a,b,c,d,e,f, they
will form the sets:
+ * (a, b, c), (d), (e, f). <br>
+ *
+ * <p>Definition levels cover 3 situations: level = maxDefLevel means the
value exists and is not
+ * null; level = maxDefLevel - 1 means the value is null; level < maxDefLevel
- 1 means the value
+ * doesn't exist. For non-nullable (REQUIRED) fields the (level = maxDefLevel
- 1) condition means a
+ * non-existing value as well. <br>
+ *
+ * <p>Quick example (maxDefLevel is 2): Read 3 rows out of: repetition levels:
0,1,0,1,1,0,0,...
+ * definition levels: 2,1,0,2,1,2,... values: a,b,c,d,e,f,... Resulting
buffer: a,n, ,d,n,f; the
+ * result is (a,n),(d,n),(f) where n means null
+ */
+public class NestedColumnReader implements ColumnReader<WritableColumnVector> {
+
+ private final Map<ColumnDescriptor, NestedPrimitiveColumnReader>
columnReaders;
+ private final boolean isUtcTimestamp;
+
+ private final PageReadStore pages;
+
+ private final ParquetField field;
+
+ public NestedColumnReader(boolean isUtcTimestamp, PageReadStore pages,
ParquetField field) {
+ this.isUtcTimestamp = isUtcTimestamp;
+ this.pages = pages;
+ this.field = field;
+ this.columnReaders = new HashMap<>();
+ }
+
+ @Override
+ public void readToVector(int readNumber, WritableColumnVector vector)
throws IOException {
+ readData(field, readNumber, vector, false);
+ }
+
+ private Pair<LevelDelegation, WritableColumnVector> readData(
+ ParquetField field, int readNumber, ColumnVector vector, boolean
inside)
+ throws IOException {
+ if (field.getType() instanceof RowType) {
+ return readRow((ParquetGroupField) field, readNumber, vector,
inside);
+ } else if (field.getType() instanceof MapType || field.getType()
instanceof MultisetType) {
+ return readMap((ParquetGroupField) field, readNumber, vector,
inside);
+ } else if (field.getType() instanceof ArrayType) {
+ return readArray((ParquetGroupField) field, readNumber, vector,
inside);
+ } else {
+ return readPrimitive((ParquetPrimitiveField) field, readNumber,
vector);
+ }
+ }
+
+ private Pair<LevelDelegation, WritableColumnVector> readRow(
+ ParquetGroupField field, int readNumber, ColumnVector vector,
boolean inside)
+ throws IOException {
+ HeapRowVector heapRowVector = (HeapRowVector) vector;
+ LevelDelegation levelDelegation = null;
+ List<ParquetField> children = field.getChildren();
+ WritableColumnVector[] childrenVectors = heapRowVector.getFields();
+ WritableColumnVector[] finalChildrenVectors =
+ new WritableColumnVector[childrenVectors.length];
+ for (int i = 0; i < children.size(); i++) {
+ Pair<LevelDelegation, WritableColumnVector> tuple =
+ readData(children.get(i), readNumber, childrenVectors[i],
true);
+ levelDelegation = tuple.getLeft();
+ finalChildrenVectors[i] = tuple.getRight();
+ }
+ if (levelDelegation == null) {
+ throw new RuntimeException(
+ String.format("Row field does not have any children: %s.",
field));
+ }
+
+ RowPosition rowPosition =
+ NestedPositionUtil.calculateRowOffsets(
+ field,
+ levelDelegation.getDefinitionLevel(),
+ levelDelegation.getRepetitionLevel());
+
+ // If row was inside the structure, then we need to renew the vector
to reset the
+ // capacity.
+ if (inside) {
+ heapRowVector =
+ new HeapRowVector(rowPosition.getPositionsCount(),
finalChildrenVectors);
+ } else {
+ heapRowVector.setFields(finalChildrenVectors);
+ }
+
+ if (rowPosition.getIsNull() != null) {
+ setFieldNullFalg(rowPosition.getIsNull(), heapRowVector);
+ }
+ return Pair.of(levelDelegation, heapRowVector);
+ }
+
+ private Pair<LevelDelegation, WritableColumnVector> readMap(
+ ParquetGroupField field, int readNumber, ColumnVector vector,
boolean inside)
+ throws IOException {
+ HeapMapVector mapVector = (HeapMapVector) vector;
+ mapVector.reset();
+ List<ParquetField> children = field.getChildren();
+ Preconditions.checkArgument(
+ children.size() == 2,
+ "Maps must have two type parameters, found %s",
+ children.size());
+ Pair<LevelDelegation, WritableColumnVector> keyTuple =
+ readData(children.get(0), readNumber,
mapVector.getKeyColumnVector(), true);
+ Pair<LevelDelegation, WritableColumnVector> valueTuple =
+ readData(children.get(1), readNumber,
mapVector.getValueColumnVector(), true);
+
+ LevelDelegation levelDelegation = keyTuple.getLeft();
+
+ CollectionPosition collectionPosition =
+ NestedPositionUtil.calculateCollectionOffsets(
+ field,
+ levelDelegation.getDefinitionLevel(),
+ levelDelegation.getRepetitionLevel());
+
+ // If map was inside the structure, then we need to renew the vector
to reset the
+ // capacity.
+ if (inside) {
+ mapVector =
+ new HeapMapVector(
+ collectionPosition.getValueCount(),
+ keyTuple.getRight(),
+ valueTuple.getRight());
+ } else {
+ mapVector.setKeys(keyTuple.getRight());
+ mapVector.setValues(valueTuple.getRight());
+ }
+
+ if (collectionPosition.getIsNull() != null) {
+ setFieldNullFalg(collectionPosition.getIsNull(), mapVector);
+ }
+
+ mapVector.setLengths(collectionPosition.getLength());
+ mapVector.setOffsets(collectionPosition.getOffsets());
+
+ return Pair.of(levelDelegation, mapVector);
+ }
+
+ private Pair<LevelDelegation, WritableColumnVector> readArray(
+ ParquetGroupField field, int readNumber, ColumnVector vector,
boolean inside)
+ throws IOException {
+ HeapArrayVector arrayVector = (HeapArrayVector) vector;
+ arrayVector.reset();
+ List<ParquetField> children = field.getChildren();
+ Preconditions.checkArgument(
+ children.size() == 1,
+ "Arrays must have a single type parameter, found %s",
+ children.size());
+ Pair<LevelDelegation, WritableColumnVector> tuple =
+ readData(children.get(0), readNumber, arrayVector.getChild(),
true);
+
+ LevelDelegation levelDelegation = tuple.getLeft();
+ CollectionPosition collectionPosition =
+ NestedPositionUtil.calculateCollectionOffsets(
+ field,
+ levelDelegation.getDefinitionLevel(),
+ levelDelegation.getRepetitionLevel());
+
+ // If array was inside the structure, then we need to renew the vector
to reset the
+ // capacity.
+ if (inside) {
+ arrayVector = new
HeapArrayVector(collectionPosition.getValueCount(), tuple.getRight());
+ } else {
+ arrayVector.setChild(tuple.getRight());
+ }
+
+ if (collectionPosition.getIsNull() != null) {
+ setFieldNullFalg(collectionPosition.getIsNull(), arrayVector);
+ }
+ arrayVector.setLengths(collectionPosition.getLength());
+ arrayVector.setOffsets(collectionPosition.getOffsets());
+ return Pair.of(levelDelegation, arrayVector);
+ }
+
+ private Pair<LevelDelegation, WritableColumnVector> readPrimitive(
+ ParquetPrimitiveField field, int readNumber, ColumnVector vector)
throws IOException {
+ ColumnDescriptor descriptor = field.getDescriptor();
+ NestedPrimitiveColumnReader reader = columnReaders.get(descriptor);
+ if (reader == null) {
+ reader =
+ new NestedPrimitiveColumnReader(
+ descriptor,
+ pages.getPageReader(descriptor),
+ isUtcTimestamp,
+ descriptor.getPrimitiveType(),
+ field.getType());
+ columnReaders.put(descriptor, reader);
+ }
+ WritableColumnVector writableColumnVector =
+ reader.readAndNewVector(readNumber, (WritableColumnVector)
vector);
+ return Pair.of(reader.getLevelDelegation(), writableColumnVector);
+ }
+
+ private static void setFieldNullFalg(boolean[] nullFlags,
AbstractHeapVector vector) {
+ for (int index = 0; index < vector.getLen() && index <
nullFlags.length; index++) {
+ if (nullFlags[index]) {
+ vector.setNullAt(index);
+ }
+ }
+ }
+}
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPositionUtil.java
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPositionUtil.java
new file mode 100644
index 000000000..5f0757c23
--- /dev/null
+++
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPositionUtil.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.parquet.reader;
+
+import org.apache.paimon.format.parquet.position.CollectionPosition;
+import org.apache.paimon.format.parquet.position.RowPosition;
+import org.apache.paimon.format.parquet.type.ParquetField;
+import org.apache.paimon.utils.BooleanArrayList;
+import org.apache.paimon.utils.LongArrayList;
+
+import static java.lang.String.format;
+
+/** Utils to calculate nested type position. */
+public class NestedPositionUtil {
+
+ /**
+ * Calculate row offsets according to column's max repetition level,
definition level, value's
+ * repetition level and definition level. Each row has three situations:
+ * <li>Row is not defined, because its optional parent field is null; this is decided by its
+ * parent's repetition level
+ * <li>Row is null
+ * <li>Row is defined and not empty.
+ *
+ * @param field field that contains the row column message include max
repetition level and
+ * definition level.
+ * @param fieldRepetitionLevels int array with each value's repetition
level.
+ * @param fieldDefinitionLevels int array with each value's definition
level.
+ * @return {@link RowPosition} contains collections row count and isNull
array.
+ */
+ public static RowPosition calculateRowOffsets(
+ ParquetField field, int[] fieldDefinitionLevels, int[]
fieldRepetitionLevels) {
+ int rowDefinitionLevel = field.getDefinitionLevel();
+ int rowRepetitionLevel = field.getRepetitionLevel();
+ int nullValuesCount = 0;
+ BooleanArrayList nullRowFlags = new BooleanArrayList(0);
+ for (int i = 0; i < fieldDefinitionLevels.length; i++) {
+ if (fieldRepetitionLevels[i] > rowRepetitionLevel) {
+ throw new IllegalStateException(
+ format(
+ "In parquet's row type field repetition level
should not larger than row's repetition level. "
+ + "Row repetition level is %s, row
field repetition level is %s.",
+ rowRepetitionLevel, fieldRepetitionLevels[i]));
+ }
+
+ if (fieldDefinitionLevels[i] >= rowDefinitionLevel) {
+ // current row is defined and not empty
+ nullRowFlags.add(false);
+ } else {
+ // current row is null
+ nullRowFlags.add(true);
+ nullValuesCount++;
+ }
+ }
+ if (nullValuesCount == 0) {
+ return new RowPosition(null, fieldDefinitionLevels.length);
+ }
+ return new RowPosition(nullRowFlags.toArray(), nullRowFlags.size());
+ }
+
+ /**
+ * Calculate the collection's offsets according to column's max repetition
level, definition
+ * level, value's repetition level and definition level. Each collection
(Array or Map) has four
+ * situations:
+ * <li>Collection is not defined, because its optional parent field is null; this is decided by
+ * its parent's repetition level
+ * <li>Collection is null
+ * <li>Collection is defined but empty
+ * <li>Collection is defined and not empty. In this case offset value is
increased by the number
+ * of elements in that collection
+ *
+ * @param field field that contains array/map column message include max
repetition level and
+ * definition level.
+ * @param definitionLevels int array with each value's definition level.
+ * @param repetitionLevels int array with each value's repetition level.
+ * @return {@link CollectionPosition} contains collections offset array,
length array and isNull
+ * array.
+ */
+ public static CollectionPosition calculateCollectionOffsets(
+ ParquetField field, int[] definitionLevels, int[]
repetitionLevels) {
+ int collectionDefinitionLevel = field.getDefinitionLevel();
+ int collectionRepetitionLevel = field.getRepetitionLevel() + 1;
+ int offset = 0;
+ int valueCount = 0;
+ LongArrayList offsets = new LongArrayList(0);
+ offsets.add(offset);
+ BooleanArrayList emptyCollectionFlags = new BooleanArrayList(0);
+ BooleanArrayList nullCollectionFlags = new BooleanArrayList(0);
+ int nullValuesCount = 0;
+ for (int i = 0;
+ i < definitionLevels.length;
+ i = getNextCollectionStartIndex(repetitionLevels,
collectionRepetitionLevel, i)) {
+ valueCount++;
+ if (definitionLevels[i] >= collectionDefinitionLevel - 1) {
+ boolean isNull =
+ isOptionalFieldValueNull(definitionLevels[i],
collectionDefinitionLevel);
+ nullCollectionFlags.add(isNull);
+ nullValuesCount += isNull ? 1 : 0;
+ // definitionLevels[i] > collectionDefinitionLevel =>
Collection is defined and not
+ // empty
+ // definitionLevels[i] == collectionDefinitionLevel =>
Collection is defined but
+ // empty
+ if (definitionLevels[i] > collectionDefinitionLevel) {
+ emptyCollectionFlags.add(false);
+ offset += getCollectionSize(repetitionLevels,
collectionRepetitionLevel, i + 1);
+ } else if (definitionLevels[i] == collectionDefinitionLevel) {
+ offset++;
+ emptyCollectionFlags.add(true);
+ } else {
+ offset++;
+ emptyCollectionFlags.add(false);
+ }
+ offsets.add(offset);
+ } else {
+ // When definitionLevels[i] < collectionDefinitionLevel - 1, the collection is
+ // not defined, but we need to regard it as null to avoid reading wrong values.
+ nullCollectionFlags.add(true);
+ nullValuesCount++;
+ offsets.add(++offset);
+ emptyCollectionFlags.add(false);
+ }
+ }
+ long[] offsetsArray = offsets.toArray();
+ long[] length =
calculateLengthByOffsets(emptyCollectionFlags.toArray(), offsetsArray);
+ if (nullValuesCount == 0) {
+ return new CollectionPosition(null, offsetsArray, length,
valueCount);
+ }
+ return new CollectionPosition(
+ nullCollectionFlags.toArray(), offsetsArray, length,
valueCount);
+ }
+
+ public static boolean isOptionalFieldValueNull(int definitionLevel, int
maxDefinitionLevel) {
+ return definitionLevel == maxDefinitionLevel - 1;
+ }
+
+ public static long[] calculateLengthByOffsets(
+ boolean[] collectionIsEmpty, long[] arrayOffsets) {
+ LongArrayList lengthList = new LongArrayList(arrayOffsets.length);
+ for (int i = 0; i < arrayOffsets.length - 1; i++) {
+ long offset = arrayOffsets[i];
+ long length = arrayOffsets[i + 1] - offset;
+ if (length < 0) {
+ throw new IllegalArgumentException(
+ format(
+ "Offset is not monotonically ascending.
offsets[%s]=%s, offsets[%s]=%s",
+ i, arrayOffsets[i], i + 1, arrayOffsets[i +
1]));
+ }
+ if (collectionIsEmpty[i]) {
+ length = 0;
+ }
+ lengthList.add(length);
+ }
+ return lengthList.toArray();
+ }
+
+ private static int getNextCollectionStartIndex(
+ int[] repetitionLevels, int maxRepetitionLevel, int elementIndex) {
+ do {
+ elementIndex++;
+ } while (hasMoreElements(repetitionLevels, elementIndex)
+ && isNotCollectionBeginningMarker(
+ repetitionLevels, maxRepetitionLevel, elementIndex));
+ return elementIndex;
+ }
+
+ /** This method is only called for non-empty collections. */
+ private static int getCollectionSize(
+ int[] repetitionLevels, int maxRepetitionLevel, int nextIndex) {
+ int size = 1;
+ while (hasMoreElements(repetitionLevels, nextIndex)
+ && isNotCollectionBeginningMarker(
+ repetitionLevels, maxRepetitionLevel, nextIndex)) {
+ // Collection elements can be not only primitive, but can also have nested structure
+ // Counting only elements which belong to current collection,
skipping inner elements of
+ // nested collections/structs
+ if (repetitionLevels[nextIndex] <= maxRepetitionLevel) {
+ size++;
+ }
+ nextIndex++;
+ }
+ return size;
+ }
+
+ private static boolean isNotCollectionBeginningMarker(
+ int[] repetitionLevels, int maxRepetitionLevel, int nextIndex) {
+ return repetitionLevels[nextIndex] >= maxRepetitionLevel;
+ }
+
+ private static boolean hasMoreElements(int[] repetitionLevels, int
nextIndex) {
+ return nextIndex < repetitionLevels.length;
+ }
+}
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPrimitiveColumnReader.java
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPrimitiveColumnReader.java
new file mode 100644
index 000000000..dbbf2028c
--- /dev/null
+++
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPrimitiveColumnReader.java
@@ -0,0 +1,622 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.parquet.reader;
+
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.data.columnar.heap.HeapBooleanVector;
+import org.apache.paimon.data.columnar.heap.HeapByteVector;
+import org.apache.paimon.data.columnar.heap.HeapBytesVector;
+import org.apache.paimon.data.columnar.heap.HeapDoubleVector;
+import org.apache.paimon.data.columnar.heap.HeapFloatVector;
+import org.apache.paimon.data.columnar.heap.HeapIntVector;
+import org.apache.paimon.data.columnar.heap.HeapLongVector;
+import org.apache.paimon.data.columnar.heap.HeapShortVector;
+import org.apache.paimon.data.columnar.heap.HeapTimestampVector;
+import org.apache.paimon.data.columnar.writable.WritableColumnVector;
+import org.apache.paimon.format.parquet.position.LevelDelegation;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.utils.IntArrayList;
+
+import org.apache.parquet.bytes.ByteBufferInputStream;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.bytes.BytesUtils;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.page.DataPage;
+import org.apache.parquet.column.page.DataPageV1;
+import org.apache.parquet.column.page.DataPageV2;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.column.values.ValuesReader;
+import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridDecoder;
+import org.apache.parquet.io.ParquetDecodingException;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.parquet.column.ValuesType.DEFINITION_LEVEL;
+import static org.apache.parquet.column.ValuesType.REPETITION_LEVEL;
+import static org.apache.parquet.column.ValuesType.VALUES;
+
+/** Reader to read nested primitive column. */
+public class NestedPrimitiveColumnReader implements
ColumnReader<WritableColumnVector> {
+ private static final Logger LOG =
LoggerFactory.getLogger(NestedPrimitiveColumnReader.class);
+
+ private final IntArrayList repetitionLevelList = new IntArrayList(0);
+ private final IntArrayList definitionLevelList = new IntArrayList(0);
+
+ private final PageReader pageReader;
+ private final ColumnDescriptor descriptor;
+ private final Type type;
+ private final DataType dataType;
+ /** The dictionary, if this column has dictionary encoding. */
+ private final ParquetDataColumnReader dictionary;
+ /** Maximum definition level for this column. */
+ private final int maxDefLevel;
+
+ private boolean isUtcTimestamp;
+
+ /** Total number of values read. */
+ private long valuesRead;
+
+ /**
+ * value that indicates the end of the current page. That is, if
valuesRead ==
+ * endOfPageValueCount, we are at the end of the page.
+ */
+ private long endOfPageValueCount;
+
+ /** If true, the current page is dictionary encoded. */
+ private boolean isCurrentPageDictionaryEncoded;
+
+ private int definitionLevel;
+ private int repetitionLevel;
+
+ /** Repetition/Definition/Value readers. */
+ private IntIterator repetitionLevelColumn;
+
+ private IntIterator definitionLevelColumn;
+ private ParquetDataColumnReader dataColumn;
+
+ /** Total values in the current page. */
+ private int pageValueCount;
+
+ // flag to indicate if there is no data in parquet data page
+ private boolean eof = false;
+
+ private boolean isFirstRow = true;
+
+ private Object lastValue;
+
+ public NestedPrimitiveColumnReader(
+ ColumnDescriptor descriptor,
+ PageReader pageReader,
+ boolean isUtcTimestamp,
+ Type parquetType,
+ DataType dataType)
+ throws IOException {
+ this.descriptor = descriptor;
+ this.type = parquetType;
+ this.pageReader = pageReader;
+ this.maxDefLevel = descriptor.getMaxDefinitionLevel();
+ this.isUtcTimestamp = isUtcTimestamp;
+ this.dataType = dataType;
+
+ DictionaryPage dictionaryPage = pageReader.readDictionaryPage();
+ if (dictionaryPage != null) {
+ try {
+ this.dictionary =
+
ParquetDataColumnReaderFactory.getDataColumnReaderByTypeOnDictionary(
+ parquetType.asPrimitiveType(),
+ dictionaryPage
+ .getEncoding()
+ .initDictionary(descriptor,
dictionaryPage),
+ isUtcTimestamp);
+ this.isCurrentPageDictionaryEncoded = true;
+ } catch (IOException e) {
+ throw new IOException(
+ String.format("Could not decode the dictionary for
%s", descriptor), e);
+ }
+ } else {
+ this.dictionary = null;
+ this.isCurrentPageDictionaryEncoded = false;
+ }
+ }
+
+ // This method won't be called; readAndNewVector will be called instead.
+ @Override
+ public void readToVector(int readNumber, WritableColumnVector vector)
throws IOException {
+ throw new UnsupportedOperationException("This function should no be
called.");
+ }
+
+ public WritableColumnVector readAndNewVector(int readNumber,
WritableColumnVector vector)
+ throws IOException {
+ if (isFirstRow) {
+ if (!readValue()) {
+ return vector;
+ }
+ isFirstRow = false;
+ }
+
+ // index to set value.
+ int index = 0;
+ int valueIndex = 0;
+ List<Object> valueList = new ArrayList<>();
+
+ // repeated type need two loops to read data.
+ while (!eof && index < readNumber) {
+ do {
+ valueList.add(lastValue);
+ valueIndex++;
+ } while (readValue() && (repetitionLevel != 0));
+ index++;
+ }
+
+ return fillColumnVector(valueIndex, valueList);
+ }
+
+ public LevelDelegation getLevelDelegation() {
+ int[] repetition = repetitionLevelList.toArray();
+ int[] definition = definitionLevelList.toArray();
+ repetitionLevelList.clear();
+ definitionLevelList.clear();
+ repetitionLevelList.add(repetitionLevel);
+ definitionLevelList.add(definitionLevel);
+ return new LevelDelegation(repetition, definition);
+ }
+
+ private boolean readValue() throws IOException {
+ int left = readPageIfNeed();
+ if (left > 0) {
+ // get the values of repetition and definitionLevel
+ readAndSaveRepetitionAndDefinitionLevels();
+ // read the data if it isn't null
+ if (definitionLevel == maxDefLevel) {
+ if (isCurrentPageDictionaryEncoded) {
+ int dictionaryId = dataColumn.readValueDictionaryId();
+ lastValue = dictionaryDecodeValue(dataType, dictionaryId);
+ } else {
+ lastValue = readPrimitiveTypedRow(dataType);
+ }
+ } else {
+ lastValue = null;
+ }
+ return true;
+ } else {
+ eof = true;
+ return false;
+ }
+ }
+
+ private void readAndSaveRepetitionAndDefinitionLevels() {
+ // get the values of repetition and definitionLevel
+ repetitionLevel = repetitionLevelColumn.nextInt();
+ definitionLevel = definitionLevelColumn.nextInt();
+ valuesRead++;
+ repetitionLevelList.add(repetitionLevel);
+ definitionLevelList.add(definitionLevel);
+ }
+
+ private int readPageIfNeed() throws IOException {
+ // Compute the number of values we want to read in this page.
+ int leftInPage = (int) (endOfPageValueCount - valuesRead);
+ if (leftInPage == 0) {
+ // no data left in current page, load data from new page
+ readPage();
+ leftInPage = (int) (endOfPageValueCount - valuesRead);
+ }
+ return leftInPage;
+ }
+
+ private Object readPrimitiveTypedRow(DataType category) {
+ switch (category.getTypeRoot()) {
+ case CHAR:
+ case VARCHAR:
+ case BINARY:
+ case VARBINARY:
+ return dataColumn.readBytes();
+ case BOOLEAN:
+ return dataColumn.readBoolean();
+ case TIME_WITHOUT_TIME_ZONE:
+ case DATE:
+ case INTEGER:
+ return dataColumn.readInteger();
+ case TINYINT:
+ return dataColumn.readTinyInt();
+ case SMALLINT:
+ return dataColumn.readSmallInt();
+ case BIGINT:
+ return dataColumn.readLong();
+ case FLOAT:
+ return dataColumn.readFloat();
+ case DOUBLE:
+ return dataColumn.readDouble();
+ case DECIMAL:
+ switch (descriptor.getPrimitiveType().getPrimitiveTypeName()) {
+ case INT32:
+ return dataColumn.readInteger();
+ case INT64:
+ return dataColumn.readLong();
+ case BINARY:
+ case FIXED_LEN_BYTE_ARRAY:
+ return dataColumn.readBytes();
+ }
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ return dataColumn.readMillsTimestamp();
+ default:
+ throw new RuntimeException("Unsupported type in the list: " +
type);
+ }
+ }
+
+ private Object dictionaryDecodeValue(DataType category, Integer
dictionaryValue) {
+ if (dictionaryValue == null) {
+ return null;
+ }
+
+ switch (category.getTypeRoot()) {
+ case CHAR:
+ case VARCHAR:
+ case BINARY:
+ case VARBINARY:
+ return dictionary.readBytes(dictionaryValue);
+ case DATE:
+ case TIME_WITHOUT_TIME_ZONE:
+ case INTEGER:
+ return dictionary.readInteger(dictionaryValue);
+ case BOOLEAN:
+ return dictionary.readBoolean(dictionaryValue) ? 1 : 0;
+ case DOUBLE:
+ return dictionary.readDouble(dictionaryValue);
+ case FLOAT:
+ return dictionary.readFloat(dictionaryValue);
+ case TINYINT:
+ return dictionary.readTinyInt(dictionaryValue);
+ case SMALLINT:
+ return dictionary.readSmallInt(dictionaryValue);
+ case BIGINT:
+ return dictionary.readLong(dictionaryValue);
+ case DECIMAL:
+ switch (descriptor.getPrimitiveType().getPrimitiveTypeName()) {
+ case INT32:
+ return dictionary.readInteger(dictionaryValue);
+ case INT64:
+ return dictionary.readLong(dictionaryValue);
+ case FIXED_LEN_BYTE_ARRAY:
+ case BINARY:
+ return dictionary.readBytes(dictionaryValue);
+ }
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ return dictionary.readTimestamp(dictionaryValue);
+ default:
+ throw new RuntimeException("Unsupported type in the list: " +
type);
+ }
+ }
+
+ private WritableColumnVector fillColumnVector(int total, List valueList) {
+ switch (dataType.getTypeRoot()) {
+ case CHAR:
+ case VARCHAR:
+ case BINARY:
+ case VARBINARY:
+ HeapBytesVector heapBytesVector = new HeapBytesVector(total);
+ for (int i = 0; i < valueList.size(); i++) {
+ byte[] src = ((List<byte[]>) valueList).get(i);
+ if (src == null) {
+ heapBytesVector.setNullAt(i);
+ } else {
+ heapBytesVector.appendBytes(i, src, 0, src.length);
+ }
+ }
+ return heapBytesVector;
+ case BOOLEAN:
+ HeapBooleanVector heapBooleanVector = new
HeapBooleanVector(total);
+ for (int i = 0; i < valueList.size(); i++) {
+ if (valueList.get(i) == null) {
+ heapBooleanVector.setNullAt(i);
+ } else {
+ heapBooleanVector.vector[i] = ((List<Boolean>)
valueList).get(i);
+ }
+ }
+ return heapBooleanVector;
+ case TINYINT:
+ HeapByteVector heapByteVector = new HeapByteVector(total);
+ for (int i = 0; i < valueList.size(); i++) {
+ if (valueList.get(i) == null) {
+ heapByteVector.setNullAt(i);
+ } else {
+ heapByteVector.vector[i] =
+ (byte) ((List<Integer>)
valueList).get(i).intValue();
+ }
+ }
+ return heapByteVector;
+ case SMALLINT:
+ HeapShortVector heapShortVector = new HeapShortVector(total);
+ for (int i = 0; i < valueList.size(); i++) {
+ if (valueList.get(i) == null) {
+ heapShortVector.setNullAt(i);
+ } else {
+ heapShortVector.vector[i] =
+ (short) ((List<Integer>)
valueList).get(i).intValue();
+ }
+ }
+ return heapShortVector;
+ case INTEGER:
+ case DATE:
+ case TIME_WITHOUT_TIME_ZONE:
+ HeapIntVector heapIntVector = new HeapIntVector(total);
+ for (int i = 0; i < valueList.size(); i++) {
+ if (valueList.get(i) == null) {
+ heapIntVector.setNullAt(i);
+ } else {
+ heapIntVector.vector[i] = ((List<Integer>)
valueList).get(i);
+ }
+ }
+ return heapIntVector;
+ case FLOAT:
+ HeapFloatVector heapFloatVector = new HeapFloatVector(total);
+ for (int i = 0; i < valueList.size(); i++) {
+ if (valueList.get(i) == null) {
+ heapFloatVector.setNullAt(i);
+ } else {
+ heapFloatVector.vector[i] = ((List<Float>)
valueList).get(i);
+ }
+ }
+ return heapFloatVector;
+ case BIGINT:
+ HeapLongVector heapLongVector = new HeapLongVector(total);
+ for (int i = 0; i < valueList.size(); i++) {
+ if (valueList.get(i) == null) {
+ heapLongVector.setNullAt(i);
+ } else {
+ heapLongVector.vector[i] = ((List<Long>)
valueList).get(i);
+ }
+ }
+ return heapLongVector;
+ case DOUBLE:
+ HeapDoubleVector heapDoubleVector = new
HeapDoubleVector(total);
+ for (int i = 0; i < valueList.size(); i++) {
+ if (valueList.get(i) == null) {
+ heapDoubleVector.setNullAt(i);
+ } else {
+ heapDoubleVector.vector[i] = ((List<Double>)
valueList).get(i);
+ }
+ }
+ return heapDoubleVector;
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ HeapTimestampVector heapTimestampVector = new
HeapTimestampVector(total);
+ for (int i = 0; i < valueList.size(); i++) {
+ if (valueList.get(i) == null) {
+ heapTimestampVector.setNullAt(i);
+ } else {
+ heapTimestampVector.setTimestamp(i, ((List<Timestamp>)
valueList).get(i));
+ }
+ }
+ return heapTimestampVector;
+ case DECIMAL:
+ PrimitiveType.PrimitiveTypeName primitiveTypeName =
+ descriptor.getPrimitiveType().getPrimitiveTypeName();
+ switch (primitiveTypeName) {
+ case INT32:
+ HeapIntVector phiv = new HeapIntVector(total);
+ for (int i = 0; i < valueList.size(); i++) {
+ if (valueList.get(i) == null) {
+ phiv.setNullAt(i);
+ } else {
+ phiv.vector[i] = ((List<Integer>)
valueList).get(i);
+ }
+ }
+ return new ParquetDecimalVector(phiv);
+ case INT64:
+ HeapLongVector phlv = new HeapLongVector(total);
+ for (int i = 0; i < valueList.size(); i++) {
+ if (valueList.get(i) == null) {
+ phlv.setNullAt(i);
+ } else {
+ phlv.vector[i] = ((List<Long>)
valueList).get(i);
+ }
+ }
+ return new ParquetDecimalVector(phlv);
+ default:
+ HeapBytesVector phbv = getHeapBytesVector(total,
valueList);
+ return new ParquetDecimalVector(phbv);
+ }
+ default:
+ throw new RuntimeException("Unsupported type in the list: " +
type);
+ }
+ }
+
+ private static HeapBytesVector getHeapBytesVector(int total, List
valueList) {
+ HeapBytesVector phbv = new HeapBytesVector(total);
+ for (int i = 0; i < valueList.size(); i++) {
+ byte[] src = ((List<byte[]>) valueList).get(i);
+ if (valueList.get(i) == null) {
+ phbv.setNullAt(i);
+ } else {
+ phbv.appendBytes(i, src, 0, src.length);
+ }
+ }
+ return phbv;
+ }
+
+ protected void readPage() {
+ DataPage page = pageReader.readPage();
+
+ if (page == null) {
+ return;
+ }
+
+ page.accept(
+ new DataPage.Visitor<Void>() {
+ @Override
+ public Void visit(DataPageV1 dataPageV1) {
+ readPageV1(dataPageV1);
+ return null;
+ }
+
+ @Override
+ public Void visit(DataPageV2 dataPageV2) {
+ readPageV2(dataPageV2);
+ return null;
+ }
+ });
+ }
+
+ private void initDataReader(Encoding dataEncoding, ByteBufferInputStream
in, int valueCount)
+ throws IOException {
+ this.pageValueCount = valueCount;
+ this.endOfPageValueCount = valuesRead + pageValueCount;
+ if (dataEncoding.usesDictionary()) {
+ this.dataColumn = null;
+ if (dictionary == null) {
+ throw new IOException(
+ String.format(
+ "Could not read page in col %s because the
dictionary was missing for encoding %s.",
+ descriptor, dataEncoding));
+ }
+ dataColumn =
+ ParquetDataColumnReaderFactory.getDataColumnReaderByType(
+ type.asPrimitiveType(),
+ dataEncoding.getDictionaryBasedValuesReader(
+ descriptor, VALUES,
dictionary.getDictionary()),
+ isUtcTimestamp);
+ this.isCurrentPageDictionaryEncoded = true;
+ } else {
+ dataColumn =
+ ParquetDataColumnReaderFactory.getDataColumnReaderByType(
+ type.asPrimitiveType(),
+ dataEncoding.getValuesReader(descriptor, VALUES),
+ isUtcTimestamp);
+ this.isCurrentPageDictionaryEncoded = false;
+ }
+
+ try {
+ dataColumn.initFromPage(pageValueCount, in);
+ } catch (IOException e) {
+ throw new IOException(String.format("Could not read page in col
%s.", descriptor), e);
+ }
+ }
+
+ private void readPageV1(DataPageV1 page) {
+ ValuesReader rlReader =
page.getRlEncoding().getValuesReader(descriptor, REPETITION_LEVEL);
+ ValuesReader dlReader =
page.getDlEncoding().getValuesReader(descriptor, DEFINITION_LEVEL);
+ this.repetitionLevelColumn = new ValuesReaderIntIterator(rlReader);
+ this.definitionLevelColumn = new ValuesReaderIntIterator(dlReader);
+ try {
+ BytesInput bytes = page.getBytes();
+ LOG.debug("Page size {} bytes and {} records.", bytes.size(),
pageValueCount);
+ ByteBufferInputStream in = bytes.toInputStream();
+ LOG.debug("Reading repetition levels at {}.", in.position());
+ rlReader.initFromPage(pageValueCount, in);
+ LOG.debug("Reading definition levels at {}.", in.position());
+ dlReader.initFromPage(pageValueCount, in);
+ LOG.debug("Reading data at {}.", in.position());
+ initDataReader(page.getValueEncoding(), in, page.getValueCount());
+ } catch (IOException e) {
+ throw new ParquetDecodingException(
+ String.format("Could not read page %s in col %s.", page,
descriptor), e);
+ }
+ }
+
+ private void readPageV2(DataPageV2 page) {
+ this.pageValueCount = page.getValueCount();
+ this.repetitionLevelColumn =
+ newRLEIterator(descriptor.getMaxRepetitionLevel(),
page.getRepetitionLevels());
+ this.definitionLevelColumn =
+ newRLEIterator(descriptor.getMaxDefinitionLevel(),
page.getDefinitionLevels());
+ try {
+ LOG.debug(
+ "Page data size {} bytes and {} records.",
+ page.getData().size(),
+ pageValueCount);
+ initDataReader(
+ page.getDataEncoding(), page.getData().toInputStream(),
page.getValueCount());
+ } catch (IOException e) {
+ throw new ParquetDecodingException(
+ String.format("Could not read page %s in col %s.", page,
descriptor), e);
+ }
+ }
+
+ private IntIterator newRLEIterator(int maxLevel, BytesInput bytes) {
+ try {
+ if (maxLevel == 0) {
+ return new NullIntIterator();
+ }
+ return new RLEIntIterator(
+ new RunLengthBitPackingHybridDecoder(
+ BytesUtils.getWidthFromMaxInt(maxLevel),
+ new ByteArrayInputStream(bytes.toByteArray())));
+ } catch (IOException e) {
+ throw new ParquetDecodingException(
+ String.format("Could not read levels in page for col %s.",
descriptor), e);
+ }
+ }
+
+ /** Utility interface to abstract over different way to read ints with
different encodings. */
+ interface IntIterator {
+ int nextInt();
+ }
+
+ /** Reading int from {@link ValuesReader}. */
+ protected static final class ValuesReaderIntIterator implements
IntIterator {
+ ValuesReader delegate;
+
+ public ValuesReaderIntIterator(ValuesReader delegate) {
+ this.delegate = delegate;
+ }
+
+ @Override
+ public int nextInt() {
+ return delegate.readInteger();
+ }
+ }
+
+ /** Reading int from {@link RunLengthBitPackingHybridDecoder}. */
+ protected static final class RLEIntIterator implements IntIterator {
+ RunLengthBitPackingHybridDecoder delegate;
+
+ public RLEIntIterator(RunLengthBitPackingHybridDecoder delegate) {
+ this.delegate = delegate;
+ }
+
+ @Override
+ public int nextInt() {
+ try {
+ return delegate.readInt();
+ } catch (IOException e) {
+ throw new ParquetDecodingException(e);
+ }
+ }
+ }
+
+ /** Reading zero always. */
+ protected static final class NullIntIterator implements IntIterator {
+ @Override
+ public int nextInt() {
+ return 0;
+ }
+ }
+}
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetDecimalVector.java
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetDecimalVector.java
index d1ab8d660..28d308bac 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetDecimalVector.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetDecimalVector.java
@@ -22,8 +22,13 @@ import org.apache.paimon.data.Decimal;
import org.apache.paimon.data.columnar.BytesColumnVector;
import org.apache.paimon.data.columnar.ColumnVector;
import org.apache.paimon.data.columnar.DecimalColumnVector;
+import org.apache.paimon.data.columnar.Dictionary;
import org.apache.paimon.data.columnar.IntColumnVector;
import org.apache.paimon.data.columnar.LongColumnVector;
+import org.apache.paimon.data.columnar.writable.WritableBytesVector;
+import org.apache.paimon.data.columnar.writable.WritableColumnVector;
+import org.apache.paimon.data.columnar.writable.WritableIntVector;
+import org.apache.paimon.data.columnar.writable.WritableLongVector;
import org.apache.paimon.format.parquet.ParquetSchemaConverter;
import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -32,7 +37,8 @@ import static
org.apache.paimon.utils.Preconditions.checkArgument;
* Parquet write decimal as int32 and int64 and binary, this class wrap the
real vector to provide
* {@link DecimalColumnVector} interface.
*/
-public class ParquetDecimalVector implements DecimalColumnVector {
+public class ParquetDecimalVector
+ implements DecimalColumnVector, WritableLongVector, WritableIntVector,
WritableBytesVector {
private final ColumnVector vector;
@@ -66,4 +72,157 @@ public class ParquetDecimalVector implements
DecimalColumnVector {
public boolean isNullAt(int i) {
return vector.isNullAt(i);
}
+
+ @Override
+ public void reset() {
+ if (vector instanceof WritableColumnVector) {
+ ((WritableColumnVector) vector).reset();
+ }
+ }
+
+ @Override
+ public void setNullAt(int rowId) {
+ if (vector instanceof WritableColumnVector) {
+ ((WritableColumnVector) vector).setNullAt(rowId);
+ }
+ }
+
+ @Override
+ public void setNulls(int rowId, int count) {
+ if (vector instanceof WritableColumnVector) {
+ ((WritableColumnVector) vector).setNulls(rowId, count);
+ }
+ }
+
+ @Override
+ public void fillWithNulls() {
+ if (vector instanceof WritableColumnVector) {
+ ((WritableColumnVector) vector).fillWithNulls();
+ }
+ }
+
+ @Override
+ public void setDictionary(Dictionary dictionary) {
+ if (vector instanceof WritableColumnVector) {
+ ((WritableColumnVector) vector).setDictionary(dictionary);
+ }
+ }
+
+ @Override
+ public boolean hasDictionary() {
+ if (vector instanceof WritableColumnVector) {
+ return ((WritableColumnVector) vector).hasDictionary();
+ }
+ return false;
+ }
+
+ @Override
+ public WritableIntVector reserveDictionaryIds(int capacity) {
+ if (vector instanceof WritableColumnVector) {
+ return ((WritableColumnVector)
vector).reserveDictionaryIds(capacity);
+ }
+ throw new RuntimeException("Child vector must be instance of
WritableColumnVector");
+ }
+
+ @Override
+ public WritableIntVector getDictionaryIds() {
+ if (vector instanceof WritableColumnVector) {
+ return ((WritableColumnVector) vector).getDictionaryIds();
+ }
+ throw new RuntimeException("Child vector must be instance of
WritableColumnVector");
+ }
+
+ @Override
+ public Bytes getBytes(int i) {
+ if (vector instanceof WritableBytesVector) {
+ return ((WritableBytesVector) vector).getBytes(i);
+ }
+ throw new RuntimeException("Child vector must be instance of
WritableColumnVector");
+ }
+
+ @Override
+ public void appendBytes(int rowId, byte[] value, int offset, int length) {
+ if (vector instanceof WritableBytesVector) {
+ ((WritableBytesVector) vector).appendBytes(rowId, value, offset,
length);
+ }
+ }
+
+ @Override
+ public void fill(byte[] value) {
+ if (vector instanceof WritableBytesVector) {
+ ((WritableBytesVector) vector).fill(value);
+ }
+ }
+
+ @Override
+ public int getInt(int i) {
+ if (vector instanceof WritableIntVector) {
+ return ((WritableIntVector) vector).getInt(i);
+ }
+ throw new RuntimeException("Child vector must be instance of
WritableColumnVector");
+ }
+
+ @Override
+ public void setInt(int rowId, int value) {
+ if (vector instanceof WritableIntVector) {
+ ((WritableIntVector) vector).setInt(rowId, value);
+ }
+ }
+
+ @Override
+ public void setIntsFromBinary(int rowId, int count, byte[] src, int
srcIndex) {
+ if (vector instanceof WritableIntVector) {
+ ((WritableIntVector) vector).setIntsFromBinary(rowId, count, src,
srcIndex);
+ }
+ }
+
+ @Override
+ public void setInts(int rowId, int count, int value) {
+ if (vector instanceof WritableIntVector) {
+ ((WritableIntVector) vector).setInts(rowId, count, value);
+ }
+ }
+
+ @Override
+ public void setInts(int rowId, int count, int[] src, int srcIndex) {
+ if (vector instanceof WritableIntVector) {
+ ((WritableIntVector) vector).setInts(rowId, count, src, srcIndex);
+ }
+ }
+
+ @Override
+ public void fill(int value) {
+ if (vector instanceof WritableIntVector) {
+ ((WritableIntVector) vector).fill(value);
+ }
+ }
+
+ @Override
+ public long getLong(int i) {
+ if (vector instanceof WritableLongVector) {
+ return ((WritableLongVector) vector).getLong(i);
+ }
+ throw new RuntimeException("Child vector must be instance of
WritableColumnVector");
+ }
+
+ @Override
+ public void setLong(int rowId, long value) {
+ if (vector instanceof WritableLongVector) {
+ ((WritableLongVector) vector).setLong(rowId, value);
+ }
+ }
+
+ @Override
+ public void setLongsFromBinary(int rowId, int count, byte[] src, int
srcIndex) {
+ if (vector instanceof WritableLongVector) {
+ ((WritableLongVector) vector).setLongsFromBinary(rowId, count,
src, srcIndex);
+ }
+ }
+
+ @Override
+ public void fill(long value) {
+ if (vector instanceof WritableLongVector) {
+ ((WritableLongVector) vector).fill(value);
+ }
+ }
}
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetSplitReaderUtil.java
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetSplitReaderUtil.java
index 59af1f391..8a362961d 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetSplitReaderUtil.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetSplitReaderUtil.java
@@ -32,7 +32,11 @@ import org.apache.paimon.data.columnar.heap.HeapShortVector;
import org.apache.paimon.data.columnar.heap.HeapTimestampVector;
import org.apache.paimon.data.columnar.writable.WritableColumnVector;
import org.apache.paimon.format.parquet.ParquetSchemaConverter;
+import org.apache.paimon.format.parquet.type.ParquetField;
+import org.apache.paimon.format.parquet.type.ParquetGroupField;
+import org.apache.paimon.format.parquet.type.ParquetPrimitiveField;
import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypeChecks;
import org.apache.paimon.types.DecimalType;
@@ -40,12 +44,20 @@ import org.apache.paimon.types.IntType;
import org.apache.paimon.types.MapType;
import org.apache.paimon.types.MultisetType;
import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.StringUtils;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
import org.apache.parquet.ParquetRuntimeException;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.io.ColumnIO;
+import org.apache.parquet.io.GroupColumnIO;
+import org.apache.parquet.io.MessageColumnIO;
+import org.apache.parquet.io.PrimitiveColumnIO;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.InvalidSchemaException;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.OriginalType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;
@@ -53,8 +65,11 @@ import org.apache.parquet.schema.Type;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Objects;
import static org.apache.paimon.utils.Preconditions.checkArgument;
+import static org.apache.parquet.schema.Type.Repetition.REPEATED;
+import static org.apache.parquet.schema.Type.Repetition.REQUIRED;
/** Util for generating {@link ColumnReader}. */
public class ParquetSplitReaderUtil {
@@ -65,6 +80,7 @@ public class ParquetSplitReaderUtil {
Type type,
List<ColumnDescriptor> columnDescriptors,
PageReadStore pages,
+ ParquetField field,
int depth)
throws IOException {
List<ColumnDescriptor> descriptors =
@@ -126,60 +142,10 @@ public class ParquetSplitReaderUtil {
((DecimalType) fieldType).getPrecision());
}
case ARRAY:
- return new ArrayColumnReader(
- descriptors.get(0),
- pages.getPageReader(descriptors.get(0)),
- true,
- descriptors.get(0).getPrimitiveType(),
- fieldType);
case MAP:
- MapType mapType = (MapType) fieldType;
- ArrayColumnReader mapKeyReader =
- new ArrayColumnReader(
- descriptors.get(0),
- pages.getPageReader(descriptors.get(0)),
- true,
- descriptors.get(0).getPrimitiveType(),
- new ArrayType(mapType.getKeyType()));
- ArrayColumnReader mapValueReader =
- new ArrayColumnReader(
- descriptors.get(1),
- pages.getPageReader(descriptors.get(1)),
- true,
- descriptors.get(1).getPrimitiveType(),
- new ArrayType(mapType.getValueType()));
- return new MapColumnReader(mapKeyReader, mapValueReader);
case MULTISET:
- MultisetType multisetType = (MultisetType) fieldType;
- ArrayColumnReader multisetKeyReader =
- new ArrayColumnReader(
- descriptors.get(0),
- pages.getPageReader(descriptors.get(0)),
- true,
- descriptors.get(0).getPrimitiveType(),
- new ArrayType(multisetType.getElementType()));
- ArrayColumnReader multisetValueReader =
- new ArrayColumnReader(
- descriptors.get(1),
- pages.getPageReader(descriptors.get(1)),
- true,
- descriptors.get(1).getPrimitiveType(),
- new ArrayType(new IntType(false)));
- return new MapColumnReader(multisetKeyReader,
multisetValueReader);
case ROW:
- RowType rowType = (RowType) fieldType;
- GroupType groupType = type.asGroupType();
- List<ColumnReader> fieldReaders = new ArrayList<>();
- for (int i = 0; i < rowType.getFieldCount(); i++) {
- fieldReaders.add(
- createColumnReader(
- rowType.getTypeAt(i),
- groupType.getType(i),
- descriptors,
- pages,
- depth + 1));
- }
- return new RowColumnReader(fieldReaders);
+ return new NestedColumnReader(true, pages, field);
default:
throw new UnsupportedOperationException(fieldType + " is not
supported now.");
}
@@ -300,7 +266,18 @@ public class ParquetSplitReaderUtil {
depth));
case MAP:
MapType mapType = (MapType) fieldType;
+ LogicalTypeAnnotation mapTypeAnnotation =
type.getLogicalTypeAnnotation();
GroupType mapRepeatedType =
type.asGroupType().getType(0).asGroupType();
+ if
(mapTypeAnnotation.equals(LogicalTypeAnnotation.listType())) {
+ mapRepeatedType = mapRepeatedType.getType(0).asGroupType();
+ depth++;
+ if (mapRepeatedType
+ .getLogicalTypeAnnotation()
+ .equals(LogicalTypeAnnotation.mapType())) {
+ mapRepeatedType =
mapRepeatedType.getType(0).asGroupType();
+ depth++;
+ }
+ }
return new HeapMapVector(
batchSize,
createWritableColumnVector(
@@ -317,7 +294,18 @@ public class ParquetSplitReaderUtil {
depth + 2));
case MULTISET:
MultisetType multisetType = (MultisetType) fieldType;
+ LogicalTypeAnnotation multisetTypeAnnotation =
type.getLogicalTypeAnnotation();
GroupType multisetRepeatedType =
type.asGroupType().getType(0).asGroupType();
+ if
(multisetTypeAnnotation.equals(LogicalTypeAnnotation.listType())) {
+ multisetRepeatedType =
multisetRepeatedType.getType(0).asGroupType();
+ depth++;
+ if (multisetRepeatedType
+ .getLogicalTypeAnnotation()
+ .equals(LogicalTypeAnnotation.mapType())) {
+ multisetRepeatedType =
multisetRepeatedType.getType(0).asGroupType();
+ depth++;
+ }
+ }
return new HeapMapVector(
batchSize,
createWritableColumnVector(
@@ -335,6 +323,12 @@ public class ParquetSplitReaderUtil {
case ROW:
RowType rowType = (RowType) fieldType;
GroupType groupType = type.asGroupType();
+ if
(LogicalTypeAnnotation.listType().equals(groupType.getLogicalTypeAnnotation()))
{
+ // this means there was two outside struct, need to get
group twice.
+ groupType = groupType.getType(0).asGroupType();
+ groupType = groupType.getType(0).asGroupType();
+ depth = depth + 2;
+ }
WritableColumnVector[] columnVectors =
new WritableColumnVector[rowType.getFieldCount()];
for (int i = 0; i < columnVectors.length; i++) {
@@ -371,4 +365,171 @@ public class ParquetSplitReaderUtil {
}
return res;
}
+
+ public static List<ParquetField> buildFieldsList(
+ List<DataField> childrens, List<String> fieldNames,
MessageColumnIO columnIO) {
+ List<ParquetField> list = new ArrayList<>();
+ for (int i = 0; i < childrens.size(); i++) {
+ list.add(
+ constructField(
+ childrens.get(i), lookupColumnByName(columnIO,
fieldNames.get(i))));
+ }
+ return list;
+ }
+
+ private static ParquetField constructField(DataField dataField, ColumnIO
columnIO) {
+ boolean required = columnIO.getType().getRepetition() == REQUIRED;
+ int repetitionLevel = columnIO.getRepetitionLevel();
+ int definitionLevel = columnIO.getDefinitionLevel();
+ DataType type = dataField.type();
+ String filedName = dataField.name();
+ if (type instanceof RowType) {
+ GroupColumnIO groupColumnIO = (GroupColumnIO) columnIO;
+ RowType rowType = (RowType) type;
+ ImmutableList.Builder<ParquetField> fieldsBuilder =
ImmutableList.builder();
+ List<String> fieldNames = rowType.getFieldNames();
+ List<DataField> childrens = rowType.getFields();
+ for (int i = 0; i < childrens.size(); i++) {
+ fieldsBuilder.add(
+ constructField(
+ childrens.get(i),
+ lookupColumnByName(groupColumnIO,
fieldNames.get(i))));
+ }
+
+ return new ParquetGroupField(
+ type, repetitionLevel, definitionLevel, required,
fieldsBuilder.build());
+ }
+
+ if (type instanceof MapType) {
+ GroupColumnIO groupColumnIO = (GroupColumnIO) columnIO;
+ GroupColumnIO keyValueColumnIO =
getMapKeyValueColumn(groupColumnIO);
+ MapType mapType = (MapType) type;
+ ParquetField keyField =
+ constructField(
+ new DataField(0, "", mapType.getKeyType()),
+ keyValueColumnIO.getChild(0));
+ ParquetField valueField =
+ constructField(
+ new DataField(0, "", mapType.getValueType()),
+ keyValueColumnIO.getChild(1));
+ return new ParquetGroupField(
+ type,
+ repetitionLevel,
+ definitionLevel,
+ required,
+ ImmutableList.of(keyField, valueField));
+ }
+
+ if (type instanceof MultisetType) {
+ GroupColumnIO groupColumnIO = (GroupColumnIO) columnIO;
+ GroupColumnIO keyValueColumnIO =
getMapKeyValueColumn(groupColumnIO);
+ MultisetType multisetType = (MultisetType) type;
+ ParquetField keyField =
+ constructField(
+ new DataField(0, "",
multisetType.getElementType()),
+ keyValueColumnIO.getChild(0));
+ ParquetField valueField =
+ constructField(
+ new DataField(0, "", new IntType()),
keyValueColumnIO.getChild(1));
+ return new ParquetGroupField(
+ type,
+ repetitionLevel,
+ definitionLevel,
+ required,
+ ImmutableList.of(keyField, valueField));
+ }
+
+ if (type instanceof ArrayType) {
+ ArrayType arrayType = (ArrayType) type;
+ ColumnIO elementTypeColumnIO;
+ if (columnIO instanceof GroupColumnIO) {
+ GroupColumnIO groupColumnIO = (GroupColumnIO) columnIO;
+ if (!StringUtils.isNullOrWhitespaceOnly(filedName)) {
+ while (!Objects.equals(groupColumnIO.getName(),
filedName)) {
+ groupColumnIO = (GroupColumnIO)
groupColumnIO.getChild(0);
+ }
+ elementTypeColumnIO = groupColumnIO;
+ } else {
+ if (arrayType.getElementType() instanceof RowType) {
+ elementTypeColumnIO = groupColumnIO;
+ } else {
+ elementTypeColumnIO = groupColumnIO.getChild(0);
+ }
+ }
+ } else if (columnIO instanceof PrimitiveColumnIO) {
+ elementTypeColumnIO = columnIO;
+ } else {
+ throw new RuntimeException(String.format("Unknown ColumnIO,
%s", columnIO));
+ }
+
+ ParquetField field =
+ constructField(
+ new DataField(0, "", arrayType.getElementType()),
+ getArrayElementColumn(elementTypeColumnIO));
+ if (repetitionLevel == field.getRepetitionLevel()) {
+ repetitionLevel = columnIO.getParent().getRepetitionLevel();
+ }
+ return new ParquetGroupField(
+ type, repetitionLevel, definitionLevel, required,
ImmutableList.of(field));
+ }
+
+ PrimitiveColumnIO primitiveColumnIO = (PrimitiveColumnIO) columnIO;
+ return new ParquetPrimitiveField(
+ type, required, primitiveColumnIO.getColumnDescriptor(),
primitiveColumnIO.getId());
+ }
+
+ /**
+ * Parquet's column names are case-insensitive. So when we look up
columns we first check for
+ * an exact match, and if none is found we fall back to a case-insensitive
match.
+ */
+ public static ColumnIO lookupColumnByName(GroupColumnIO groupColumnIO,
String columnName) {
+ ColumnIO columnIO = groupColumnIO.getChild(columnName);
+
+ if (columnIO != null) {
+ return columnIO;
+ }
+
+ for (int i = 0; i < groupColumnIO.getChildrenCount(); i++) {
+ if
(groupColumnIO.getChild(i).getName().equalsIgnoreCase(columnName)) {
+ return groupColumnIO.getChild(i);
+ }
+ }
+
+ throw new RuntimeException("Can not find column io for parquet
reader.");
+ }
+
+ public static GroupColumnIO getMapKeyValueColumn(GroupColumnIO
groupColumnIO) {
+ while (groupColumnIO.getChildrenCount() == 1) {
+ groupColumnIO = (GroupColumnIO) groupColumnIO.getChild(0);
+ }
+ return groupColumnIO;
+ }
+
+ public static ColumnIO getArrayElementColumn(ColumnIO columnIO) {
+ while (columnIO instanceof GroupColumnIO &&
!columnIO.getType().isRepetition(REPEATED)) {
+ columnIO = ((GroupColumnIO) columnIO).getChild(0);
+ }
+
+ /* Compatible with arrays that have the standard 3-level structure:
+ * optional group my_list (LIST) {
+ * repeated group element {
+ * required binary str (UTF8);
+ * };
+ * }
+ */
+ if (columnIO instanceof GroupColumnIO
+ && columnIO.getType().getLogicalTypeAnnotation() == null
+ && ((GroupColumnIO) columnIO).getChildrenCount() == 1
+ && !columnIO.getName().equals("array")
+ && !columnIO.getName().equals(columnIO.getParent().getName() +
"_tuple")) {
+ return ((GroupColumnIO) columnIO).getChild(0);
+ }
+
+ /* Compatible with legacy 2-level arrays where the repeated field is
not a group:
+ * optional group my_list (LIST) {
+ * repeated int32 element;
+ * }
+ */
+ return columnIO;
+ }
}
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/type/ParquetField.java
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/type/ParquetField.java
new file mode 100644
index 000000000..94fe6b91d
--- /dev/null
+++
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/type/ParquetField.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.parquet.type;
+
+import org.apache.paimon.types.DataType;
+
+/** Field that represents a Parquet field type. */
+public abstract class ParquetField {
+ private final DataType type;
+ private final int repetitionLevel;
+ private final int definitionLevel;
+ private final boolean required;
+
+ public ParquetField(DataType type, int repetitionLevel, int
definitionLevel, boolean required) {
+ this.type = type;
+ this.repetitionLevel = repetitionLevel;
+ this.definitionLevel = definitionLevel;
+ this.required = required;
+ }
+
+ public DataType getType() {
+ return type;
+ }
+
+ public int getRepetitionLevel() {
+ return repetitionLevel;
+ }
+
+ public int getDefinitionLevel() {
+ return definitionLevel;
+ }
+
+ public boolean isRequired() {
+ return required;
+ }
+
+ @Override
+ public String toString() {
+ return "Field{"
+ + "type="
+ + type
+ + ", repetitionLevel="
+ + repetitionLevel
+ + ", definitionLevel="
+ + definitionLevel
+ + ", required="
+ + required
+ + '}';
+ }
+}
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/type/ParquetGroupField.java
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/type/ParquetGroupField.java
new file mode 100644
index 000000000..95f0dd1f8
--- /dev/null
+++
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/type/ParquetGroupField.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.parquet.type;
+
+import org.apache.paimon.types.DataType;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
+
+import java.util.List;
+
+import static java.util.Objects.requireNonNull;
+
+/** Field that represents a Parquet group field. */
+public class ParquetGroupField extends ParquetField {
+
+ private final List<ParquetField> children;
+
+ public ParquetGroupField(
+ DataType type,
+ int repetitionLevel,
+ int definitionLevel,
+ boolean required,
+ List<ParquetField> children) {
+ super(type, repetitionLevel, definitionLevel, required);
+ this.children = ImmutableList.copyOf(requireNonNull(children,
"children is null"));
+ }
+
+ public List<ParquetField> getChildren() {
+ return children;
+ }
+}
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/type/ParquetPrimitiveField.java
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/type/ParquetPrimitiveField.java
new file mode 100644
index 000000000..148596d15
--- /dev/null
+++
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/type/ParquetPrimitiveField.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.parquet.type;
+
+import org.apache.paimon.types.DataType;
+
+import org.apache.parquet.column.ColumnDescriptor;
+
+import static java.util.Objects.requireNonNull;
+
+/** Field that represents a Parquet primitive field. */
+public class ParquetPrimitiveField extends ParquetField {
+
+ private final ColumnDescriptor descriptor;
+ private final int id;
+
+ public ParquetPrimitiveField(
+ DataType type, boolean required, ColumnDescriptor descriptor, int
id) {
+ super(
+ type,
+ descriptor.getMaxRepetitionLevel(),
+ descriptor.getMaxDefinitionLevel(),
+ required);
+ this.descriptor = requireNonNull(descriptor, "descriptor is required");
+ this.id = id;
+ }
+
+ public ColumnDescriptor getDescriptor() {
+ return descriptor;
+ }
+
+ public int getId() {
+ return id;
+ }
+}
diff --git
a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java
b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java
index 00a7a0081..56aa83875 100644
---
a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java
+++
b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java
@@ -23,6 +23,7 @@ import org.apache.paimon.data.Decimal;
import org.apache.paimon.data.GenericArray;
import org.apache.paimon.data.GenericMap;
import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalMap;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.format.FormatReaderContext;
@@ -50,11 +51,21 @@ import org.apache.paimon.types.TimestampType;
import org.apache.paimon.types.TinyIntType;
import org.apache.paimon.types.VarCharType;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.filter2.compat.FilterCompat;
import org.apache.parquet.filter2.predicate.ParquetFilters;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.example.ExampleParquetWriter;
+import org.apache.parquet.hadoop.util.HadoopOutputFile;
+import org.apache.parquet.schema.MessageType;
+import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.MethodSource;
import java.io.File;
@@ -127,6 +138,31 @@ public class ParquetReadWriteTest {
.build())
.build();
+ private static final RowType NESTED_ARRAY_MAP_TYPE =
+ RowType.of(
+ new IntType(),
+ new ArrayType(true, new IntType()),
+ new ArrayType(true, new ArrayType(true, new IntType())),
+ new ArrayType(
+ true,
+ new MapType(
+ true,
+ new VarCharType(VarCharType.MAX_LENGTH),
+ new VarCharType(VarCharType.MAX_LENGTH))),
+ new ArrayType(true, RowType.builder().field("a", new
IntType()).build()),
+ RowType.of(
+ new ArrayType(
+ true,
+ RowType.builder()
+ .field(
+ "b",
+ new ArrayType(
+ true,
+ new
ArrayType(true, new IntType())))
+ .field("c", new IntType())
+ .build()),
+ new IntType()));
+
@TempDir public File folder;
public static Collection<Integer> parameters() {
@@ -226,7 +262,7 @@ public class ParquetReadWriteTest {
records.add(newRow(v));
}
- Path testPath = createTempParquetFile(folder, records, rowGroupSize);
+ Path testPath = createTempParquetFileByPaimon(folder, records,
rowGroupSize, ROW_TYPE);
// test reader
DataType[] fieldTypes = new DataType[] {new DoubleType(), new
TinyIntType(), new IntType()};
ParquetReaderFactory format =
@@ -265,7 +301,7 @@ public class ParquetReadWriteTest {
records.add(newRow(v));
}
- Path testPath = createTempParquetFile(folder, records, rowGroupSize);
+ Path testPath = createTempParquetFileByPaimon(folder, records,
rowGroupSize, ROW_TYPE);
// test reader
DataType[] fieldTypes =
@@ -311,7 +347,7 @@ public class ParquetReadWriteTest {
records.add(newRow(v));
}
- Path testPath = createTempParquetFile(folder, records, rowGroupSize);
+ Path testPath = createTempParquetFileByPaimon(folder, records,
rowGroupSize, ROW_TYPE);
DataType[] fieldTypes = new DataType[] {new DoubleType()};
ParquetReaderFactory format =
@@ -353,7 +389,7 @@ public class ParquetReadWriteTest {
records.add(newRow(v));
}
- Path testPath = createTempParquetFile(folder, records, rowGroupSize);
+ Path testPath = createTempParquetFileByPaimon(folder, records,
rowGroupSize, ROW_TYPE);
DataType[] fieldTypes = new DataType[] {new IntType()};
// Build filter: f4 > randomStart
@@ -394,22 +430,47 @@ public class ParquetReadWriteTest {
}
}
+ @ParameterizedTest
+ @CsvSource({"10, paimon", "1000, paimon", "10, origin", "1000, origin"})
+ public void testNestedRead(int rowGroupSize, String writerType) throws
Exception {
+ List<InternalRow> rows = prepareNestedData(1283);
+ Path path;
+ if ("paimon".equals(writerType)) {
+ path = createTempParquetFileByPaimon(folder, rows, rowGroupSize,
NESTED_ARRAY_MAP_TYPE);
+ } else if ("origin".equals(writerType)) {
+ path = createNestedDataByOriginWriter(1283, folder, rowGroupSize);
+ } else {
+ throw new RuntimeException("Unknown writer type.");
+ }
+ ParquetReaderFactory format =
+ new ParquetReaderFactory(
+ new Options(), NESTED_ARRAY_MAP_TYPE, 500,
FilterCompat.NOOP);
+ RecordReader<InternalRow> reader =
+ format.createReader(
+ new FormatReaderContext(
+ new LocalFileIO(), path, new
LocalFileIO().getFileSize(path)));
+ List<InternalRow> results = new ArrayList<>(1283);
+ reader.forEachRemaining(results::add);
+ compareNestedRow(rows, results);
+ }
+
private void innerTestTypes(File folder, List<Integer> records, int
rowGroupSize)
throws IOException {
List<InternalRow> rows =
records.stream().map(this::newRow).collect(Collectors.toList());
- Path testPath = createTempParquetFile(folder, rows, rowGroupSize);
+ Path testPath = createTempParquetFileByPaimon(folder, rows,
rowGroupSize, ROW_TYPE);
int len = testReadingFile(subList(records, 0), testPath);
assertThat(len).isEqualTo(records.size());
}
- private Path createTempParquetFile(File folder, List<InternalRow> rows,
int rowGroupSize)
+ private Path createTempParquetFileByPaimon(
+ File folder, List<InternalRow> rows, int rowGroupSize, RowType
rowType)
throws IOException {
// write data
Path path = new Path(folder.getPath(), UUID.randomUUID().toString());
Options conf = new Options();
conf.setInteger("parquet.block.size", rowGroupSize);
ParquetWriterFactory factory =
- new ParquetWriterFactory(new RowDataParquetBuilder(ROW_TYPE,
conf));
+ new ParquetWriterFactory(new RowDataParquetBuilder(rowType,
conf));
String[] candidates = new String[] {"snappy", "zstd", "gzip"};
String compress = candidates[new Random().nextInt(3)];
FormatWriter writer =
@@ -633,4 +694,258 @@ public class ParquetReadWriteTest {
private static <T> List<T> subList(List<T> list, int i) {
return list.subList(i, list.size());
}
+
    /**
     * Builds {@code rowNum} in-memory rows matching {@code NESTED_ARRAY_MAP_TYPE}: f0 INT,
     * f1 ARRAY&lt;INT&gt;, f2 ARRAY&lt;ARRAY&lt;INT&gt;&gt;, f3 ARRAY&lt;MAP&gt;, f4
     * ARRAY&lt;ROW&lt;INT&gt;&gt; and f5 a nested ROW. The data deliberately includes null
     * elements, null map keys/values and empty arrays so readers are exercised on all
     * nullability paths.
     *
     * <p>f0 equals the row index, which allows the checker to look the original row up by the
     * value read back.
     */
    private List<InternalRow> prepareNestedData(int rowNum) {
        List<InternalRow> rows = new ArrayList<>(rowNum);

        for (int i = 0; i < rowNum; i++) {
            Integer v = i;
            // map with a null key
            Map<BinaryString, BinaryString> mp1 = new HashMap<>();
            mp1.put(null, BinaryString.fromString("val_" + i));
            // map with a null value plus a regular entry
            Map<BinaryString, BinaryString> mp2 = new HashMap<>();
            mp2.put(BinaryString.fromString("key_" + i), null);
            mp2.put(BinaryString.fromString("key@" + i), BinaryString.fromString("val@" + i));

            rows.add(
                    GenericRow.of(
                            // f0: row id
                            v,
                            // f1: ARRAY<INT>
                            new GenericArray(new Object[] {v, v + 1}),
                            // f2: ARRAY<ARRAY<INT>> with null elements and an empty inner array
                            new GenericArray(
                                    new Object[] {
                                        new GenericArray(new Object[] {i, i + 1, null}),
                                        new GenericArray(new Object[] {i, i + 2, null}),
                                        new GenericArray(new Object[] {}),
                                        null
                                    }),
                            // f3: ARRAY<MAP> with a null map element
                            new GenericArray(
                                    new GenericMap[] {
                                        null, new GenericMap(mp1), new GenericMap(mp2)
                                    }),
                            // f4: ARRAY<ROW<INT>>
                            new GenericArray(
                                    new GenericRow[] {GenericRow.of(i), GenericRow.of(i + 1)}),
                            // f5: ROW<ARRAY<ROW<ARRAY<ARRAY<INT>>, INT>>, INT>
                            GenericRow.of(
                                    new GenericArray(
                                            new GenericRow[] {
                                                GenericRow.of(
                                                        new GenericArray(
                                                                new Object[] {
                                                                    new GenericArray(
                                                                            new Object[] {
                                                                                i, i + 1, null
                                                                            }),
                                                                    new GenericArray(
                                                                            new Object[] {
                                                                                i, i + 2, null
                                                                            }),
                                                                    new GenericArray(
                                                                            new Object[] {}),
                                                                    null
                                                                }),
                                                        i)
                                            }),
                                    i)));
        }
        return rows;
    }
+
+ private Path createNestedDataByOriginWriter(int rowNum, File tmpDir, int
rowGroupSize) {
+ Path path = new Path(tmpDir.getPath(), UUID.randomUUID().toString());
+ Configuration conf = new Configuration();
+ conf.setInt("parquet.block.size", rowGroupSize);
+ MessageType schema =
+ ParquetSchemaConverter.convertToParquetMessageType(
+ "paimon-parquet", NESTED_ARRAY_MAP_TYPE);
+ try (ParquetWriter<Group> writer =
+ ExampleParquetWriter.builder(
+ HadoopOutputFile.fromPath(
+ new
org.apache.hadoop.fs.Path(path.toString()), conf))
+ .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
+ .withConf(new Configuration())
+ .withType(schema)
+ .build()) {
+ SimpleGroupFactory simpleGroupFactory = new
SimpleGroupFactory(schema);
+ for (int i = 0; i < rowNum; i++) {
+ Group row = simpleGroupFactory.newGroup();
+ // add int
+ row.append("f0", i);
+
+ // add array<int>
+ Group f1 = row.addGroup("f1");
+ createParquetArrayGroup(f1, i, i + 1);
+
+ // add array<array<int>>
+ Group f2 = row.addGroup("f2");
+ createParquetDoubleNestedArray(f2, i);
+
+ // add array<map>
+ Group f3 = row.addGroup("f3");
+ f3.addGroup(0);
+ Group mapList = f3.addGroup(0);
+ Group map1 = mapList.addGroup(0);
+ createParquetMapGroup(map1, null, "val_" + i);
+ Group map2 = mapList.addGroup(0);
+ createParquetMapGroup(map2, "key_" + i, null);
+ createParquetMapGroup(map2, "key@" + i, "val@" + i);
+
+ // add array<row>
+ Group f4 = row.addGroup("f4");
+ Group rowList = f4.addGroup(0);
+ Group row1 = rowList.addGroup(0);
+ row1.add(0, i);
+ Group row2 = rowList.addGroup(0);
+ row2.add(0, i + 1);
+ f4.addGroup(0);
+
+ // add ROW<`f0` ARRAY<ROW<`b` ARRAY<ARRAY<INT>>, `c` INT>>,
`f1` INT>>
+ Group f5 = row.addGroup("f5");
+ Group arrayRow = f5.addGroup(0);
+ Group insideRow = arrayRow.addGroup(0).addGroup(0);
+ Group insideArray = insideRow.addGroup(0);
+ createParquetDoubleNestedArray(insideArray, i);
+ insideRow.add(1, i);
+ arrayRow.addGroup(0);
+ f5.add(1, i);
+ writer.write(row);
+ }
+ } catch (Exception e) {
+ throw new RuntimeException("Create nested data by parquet origin
writer failed.");
+ }
+ return path;
+ }
+
    /**
     * Appends one ARRAY&lt;ARRAY&lt;INT&gt;&gt; value to {@code group} whose logical content,
     * when read back, is {@code [[i, i+1, null], [i, i+2, null], [], null]} (two populated
     * inner arrays, an empty inner array, and a null outer element).
     */
    private void createParquetDoubleNestedArray(Group group, int i) {
        Group outside = group.addGroup(0);
        Group inside = outside.addGroup(0);
        createParquetArrayGroup(inside, i, i + 1);
        Group inside2 = outside.addGroup(0);
        createParquetArrayGroup(inside2, i, i + 2);
        // create empty array []
        outside.addGroup(0);
        // create null
        group.addGroup(0);
    }
+
+ private void createParquetArrayGroup(Group group, int i, int j) {
+ Group element = group.addGroup(0);
+ element.add(0, i);
+ element = group.addGroup(0);
+ element.add(0, j);
+ group.addGroup(0);
+ }
+
+ private void createParquetMapGroup(Group map, String key, String value) {
+ Group entry = map.addGroup(0);
+ if (key != null) {
+ entry.append("key", key);
+ }
+ if (value != null) {
+ entry.append("value", value);
+ }
+ }
+
    /**
     * Asserts that the rows read back ({@code results}) match the rows that were written
     * ({@code rows}) field by field. Each row carries its index in f0, so the expected row is
     * looked up by the value read back rather than by list position.
     */
    private void compareNestedRow(List<InternalRow> rows, List<InternalRow> results) {
        Assertions.assertEquals(rows.size(), results.size());

        for (InternalRow result : results) {
            int index = result.getInt(0);
            InternalRow origin = rows.get(index);
            Assertions.assertEquals(origin.getInt(0), result.getInt(0));

            // int[]
            Assertions.assertEquals(origin.getArray(1).getInt(0), result.getArray(1).getInt(0));
            Assertions.assertEquals(origin.getArray(1).getInt(1), result.getArray(1).getInt(1));

            // int[][]: first two inner arrays end with a null element
            Assertions.assertEquals(
                    origin.getArray(2).getArray(0).getInt(0),
                    result.getArray(2).getArray(0).getInt(0));
            Assertions.assertEquals(
                    origin.getArray(2).getArray(0).getInt(1),
                    result.getArray(2).getArray(0).getInt(1));
            Assertions.assertTrue(result.getArray(2).getArray(0).isNullAt(2));

            Assertions.assertEquals(
                    origin.getArray(2).getArray(1).getInt(0),
                    result.getArray(2).getArray(1).getInt(0));
            Assertions.assertEquals(
                    origin.getArray(2).getArray(1).getInt(1),
                    result.getArray(2).getArray(1).getInt(1));
            Assertions.assertTrue(result.getArray(2).getArray(1).isNullAt(2));

            // third inner array is empty; fourth outer element is null
            Assertions.assertEquals(0, result.getArray(2).getArray(2).size());
            Assertions.assertTrue(result.getArray(2).isNullAt(3));

            // map[]: element 0 is a null map, map 1 has a null key
            Assertions.assertTrue(result.getArray(3).isNullAt(0));
            Assertions.assertTrue(result.getArray(3).getMap(1).keyArray().isNullAt(0));

            Assertions.assertEquals(
                    origin.getArray(3).getMap(1).valueArray().getString(0),
                    result.getArray(3).getMap(1).valueArray().getString(0));

            // compare map 2 order-insensitively: entry order from a HashMap is not guaranteed
            Map<String, String> originMap = new HashMap<>();
            Map<String, String> resultMap = new HashMap<>();
            fillWithMap(originMap, origin.getArray(3).getMap(2), 0);
            fillWithMap(originMap, origin.getArray(3).getMap(2), 1);
            fillWithMap(resultMap, result.getArray(3).getMap(2), 0);
            fillWithMap(resultMap, result.getArray(3).getMap(2), 1);
            Assertions.assertEquals(originMap, resultMap);

            // row<int>[]
            Assertions.assertEquals(
                    origin.getArray(4).getRow(0, 1).getInt(0),
                    result.getArray(4).getRow(0, 1).getInt(0));
            Assertions.assertEquals(
                    origin.getArray(4).getRow(1, 1).getInt(0),
                    result.getArray(4).getRow(1, 1).getInt(0));

            // f5 ROW<ARRAY<ROW<ARRAY<ARRAY<INT>>, INT>>, INT>: innermost int[][] values
            Assertions.assertEquals(
                    origin.getRow(5, 2).getArray(0).getRow(0, 2).getArray(0).getArray(0).getInt(0),
                    result.getRow(5, 2).getArray(0).getRow(0, 2).getArray(0).getArray(0).getInt(0));
            Assertions.assertEquals(
                    origin.getRow(5, 2).getArray(0).getRow(0, 2).getArray(0).getArray(0).getInt(1),
                    result.getRow(5, 2).getArray(0).getRow(0, 2).getArray(0).getArray(0).getInt(1));
            Assertions.assertTrue(
                    result.getRow(5, 2)
                            .getArray(0)
                            .getRow(0, 2)
                            .getArray(0)
                            .getArray(0)
                            .isNullAt(2));

            Assertions.assertEquals(
                    origin.getRow(5, 2).getArray(0).getRow(0, 2).getArray(0).getArray(1).getInt(0),
                    result.getRow(5, 2).getArray(0).getRow(0, 2).getArray(0).getArray(1).getInt(0));
            Assertions.assertEquals(
                    origin.getRow(5, 2).getArray(0).getRow(0, 2).getArray(0).getArray(1).getInt(1),
                    result.getRow(5, 2).getArray(0).getRow(0, 2).getArray(0).getArray(1).getInt(1));
            Assertions.assertTrue(
                    result.getRow(5, 2)
                            .getArray(0)
                            .getRow(0, 2)
                            .getArray(0)
                            .getArray(1)
                            .isNullAt(2));

            // innermost empty array and null element
            Assertions.assertEquals(
                    0, result.getRow(5, 2).getArray(0).getRow(0, 2).getArray(0).getArray(2).size());
            Assertions.assertTrue(
                    result.getRow(5, 2).getArray(0).getRow(0, 2).getArray(0).isNullAt(3));

            // trailing int fields of the nested rows
            Assertions.assertEquals(
                    origin.getRow(5, 2).getArray(0).getRow(0, 2).getInt(1),
                    result.getRow(5, 2).getArray(0).getRow(0, 2).getInt(1));
            Assertions.assertEquals(origin.getRow(5, 2).getInt(1), result.getRow(5, 2).getInt(1));
        }
    }
+
+ private void fillWithMap(Map<String, String> map, InternalMap internalMap,
int index) {
+ map.put(
+ internalMap.keyArray().isNullAt(index)
+ ? null
+ : internalMap.keyArray().getString(index).toString(),
+ internalMap.valueArray().isNullAt(index)
+ ? null
+ :
internalMap.valueArray().getString(index).toString());
+ }
}