This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 9e362c85d9 [parquet] Support parquet two level list representations
(#5602)
9e362c85d9 is described below
commit 9e362c85d92ef60b1e59f64ec8da2e1f7783d1fd
Author: YeJunHao <[email protected]>
AuthorDate: Wed May 14 15:26:46 2025 +0800
[parquet] Support parquet two level list representations (#5602)
---
.../format/parquet/ParquetReaderFactory.java | 38 ++++--
.../format/parquet/ParquetSchemaConverter.java | 21 +++-
.../reader/FileTypeNotMatchReadTypeTest.java | 40 +++++++
.../parquet/reader/SimpleGroupWriteSupport.java | 130 +++++++++++++++++++++
4 files changed, 212 insertions(+), 17 deletions(-)
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
index 65577fba30..6af5e0c01a 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
@@ -196,20 +196,34 @@ public class ParquetReaderFactory implements
FormatReaderFactory {
Preconditions.checkArgument(
listSubFields == 1,
"Parquet list group type should only have one middle
level REPEATED field.");
+ // There are two representations for array type in parquet.
+ // See link:
+ //
https://impala.apache.org/docs/build/html/topics/impala_parquet_array_resolution.html.
+ int level = arrayGroup.getType(0) instanceof GroupType ? 3 : 2;
Type elementType =
clipParquetType(
- arrayType.getElementType(),
parquetListElementType(arrayGroup));
- // In case that the name in middle level is not "list".
- Type groupMiddle =
- new GroupType(
- Type.Repetition.REPEATED,
- arrayGroup.getType(0).getName(),
- elementType);
- return new GroupType(
- arrayGroup.getRepetition(),
- arrayGroup.getName(),
- OriginalType.LIST,
- groupMiddle);
+ arrayType.getElementType(),
+ parquetListElementType(arrayGroup, level));
+
+ if (level == 3) {
+ // In case that the name in middle level is not "list".
+ Type groupMiddle =
+ new GroupType(
+ Type.Repetition.REPEATED,
+ arrayGroup.getType(0).getName(),
+ elementType);
+ return new GroupType(
+ arrayGroup.getRepetition(),
+ arrayGroup.getName(),
+ OriginalType.LIST,
+ groupMiddle);
+ } else {
+ return new GroupType(
+ arrayGroup.getRepetition(),
+ arrayGroup.getName(),
+ OriginalType.LIST,
+ elementType);
+ }
default:
return parquetType;
}
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetSchemaConverter.java
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetSchemaConverter.java
index 6b02ed6f7d..e2fe63618c 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetSchemaConverter.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetSchemaConverter.java
@@ -373,9 +373,11 @@ public class ParquetSchemaConverter {
} else {
GroupType groupType = parquetType.asGroupType();
if (logicalType instanceof
LogicalTypeAnnotation.ListLogicalTypeAnnotation) {
+ int level = groupType.getType(0) instanceof GroupType ? 3 : 2;
paimonDataType =
new ArrayType(
-
convertToPaimonField(parquetListElementType(groupType)).type());
+
convertToPaimonField(parquetListElementType(groupType, level))
+ .type());
} else if (logicalType instanceof
LogicalTypeAnnotation.MapLogicalTypeAnnotation) {
Pair<Type, Type> keyValueType =
parquetMapKeyValueType(groupType);
paimonDataType =
@@ -400,10 +402,19 @@ public class ParquetSchemaConverter {
return new DataField(parquetType.getId().intValue(),
parquetType.getName(), paimonDataType);
}
- public static Type parquetListElementType(GroupType listType) {
- // List type should only have one middle group type, which is
repeated, and one element
- // type, which is optional.
- return listType.getType(0).asGroupType().getType(0);
+    /**
+     * Returns the element type of a Parquet group annotated as LIST.
+     *
+     * @param listType the LIST-annotated group
+     * @param level number of levels used by the list encoding: {@code 3} when a repeated middle
+     *     group wraps the element, {@code 2} when the first child of the LIST group is itself
+     *     the element
+     * @return the Parquet type describing a single list element
+     * @throws UnsupportedOperationException if {@code level} is neither 2 nor 3
+     */
+    public static Type parquetListElementType(GroupType listType, int level) {
+        switch (level) {
+            case 3:
+                // Three-level layout: LIST group -> single repeated middle group -> element.
+                return listType.getType(0).asGroupType().getType(0);
+            case 2:
+                // Two-level layout: the first child of the LIST group is the element itself.
+                return listType.getType(0);
+            default:
+                throw new UnsupportedOperationException(
+                        "Parquet list type only supports two-level and three-level representations, but got level "
+                                + level
+                                + ".");
+        }
+    }
public static Pair<Type, Type> parquetMapKeyValueType(GroupType mapType) {
diff --git
a/paimon-format/src/test/java/org/apache/paimon/format/parquet/reader/FileTypeNotMatchReadTypeTest.java
b/paimon-format/src/test/java/org/apache/paimon/format/parquet/reader/FileTypeNotMatchReadTypeTest.java
index 71bcf7af03..bae1c98a8c 100644
---
a/paimon-format/src/test/java/org/apache/paimon/format/parquet/reader/FileTypeNotMatchReadTypeTest.java
+++
b/paimon-format/src/test/java/org/apache/paimon/format/parquet/reader/FileTypeNotMatchReadTypeTest.java
@@ -32,6 +32,7 @@ import
org.apache.paimon.format.parquet.ParquetSchemaConverter;
import org.apache.paimon.format.parquet.writer.ParquetRowDataBuilder;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.options.Options;
+import org.apache.paimon.reader.FileRecordIterator;
import org.apache.paimon.reader.FileRecordReader;
import org.apache.paimon.table.SpecialFields;
import org.apache.paimon.types.ArrayType;
@@ -53,6 +54,7 @@ import org.junit.jupiter.api.io.TempDir;
import java.io.File;
import java.nio.file.Path;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
@@ -193,6 +195,44 @@ public class FileTypeNotMatchReadTypeTest {
file.delete();
}
+    /**
+     * Reads back a file whose list column was written by {@link SimpleGroupWriteSupport} (two
+     * level list layout) and checks the values of both rows against {@code ARRAY<INT>}.
+     */
+    @Test
+    public void testArray2() throws Exception {
+        String fileName = "test.parquet";
+        String fileWholePath = tempDir + "/" + fileName;
+
+        new SimpleGroupWriteSupport()
+                .writeTest(
+                        fileWholePath,
+                        Arrays.asList(
+                                new SimpleGroupWriteSupport.SimpleGroup(Arrays.asList(1, 21, 242)),
+                                new SimpleGroupWriteSupport.SimpleGroup(Arrays.asList(4, 221, 12))));
+
+        RowType rowType =
+                RowType.of(new DataField(0, "list_of_ints", DataTypes.ARRAY(DataTypes.INT())));
+
+        ParquetReaderFactory parquetReaderFactory =
+                new ParquetReaderFactory(new Options(), rowType, 100, null);
+
+        File file = new File(fileWholePath);
+        // Close the reader before deleting the file, and delete the file even when an
+        // assertion fails, so that no handle or fixture outlives the test.
+        try (FileRecordReader<InternalRow> fileRecordReader =
+                parquetReaderFactory.createReader(
+                        new FormatReaderContext(
+                                LocalFileIO.create(),
+                                new org.apache.paimon.fs.Path(tempDir.toString(), fileName),
+                                file.length()))) {
+            FileRecordIterator<InternalRow> batch = fileRecordReader.readBatch();
+
+            InternalRow row = batch.next();
+            assertThat(row.getArray(0).getInt(0)).isEqualTo(1);
+            assertThat(row.getArray(0).getInt(1)).isEqualTo(21);
+            assertThat(row.getArray(0).getInt(2)).isEqualTo(242);
+
+            row = batch.next();
+            assertThat(row.getArray(0).getInt(0)).isEqualTo(4);
+            assertThat(row.getArray(0).getInt(1)).isEqualTo(221);
+            assertThat(row.getArray(0).getInt(2)).isEqualTo(12);
+        } finally {
+            file.delete();
+        }
+    }
+
@Test
public void testMap() throws Exception {
String fileName = "test.parquet";
diff --git
a/paimon-format/src/test/java/org/apache/paimon/format/parquet/reader/SimpleGroupWriteSupport.java
b/paimon-format/src/test/java/org/apache/paimon/format/parquet/reader/SimpleGroupWriteSupport.java
new file mode 100644
index 0000000000..6b995392ac
--- /dev/null
+++
b/paimon-format/src/test/java/org/apache/paimon/format/parquet/reader/SimpleGroupWriteSupport.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.parquet.reader;
+
+import org.apache.paimon.format.parquet.writer.StreamOutputFile;
+import org.apache.paimon.fs.local.LocalFileIO;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.api.WriteSupport;
+import org.apache.parquet.io.OutputFile;
+import org.apache.parquet.io.api.RecordConsumer;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageTypeParser;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Test-only {@link WriteSupport} that writes a single {@code int} list column using the two
+ * level Parquet list layout, i.e. the repeated primitive sits directly under the LIST group.
+ */
+class SimpleGroupWriteSupport extends WriteSupport<SimpleGroupWriteSupport.SimpleGroup> {
+
+    /** Name of the LIST-annotated group in the written schema. */
+    private static final String LIST_FIELD = "list_of_ints";
+
+    /** Name of the repeated element field nested directly inside {@link #LIST_FIELD}. */
+    private static final String ELEMENT_FIELD = "list_of_ints_tuple";
+
+    private static final String SCHEMA_STRING =
+            "message Record { required group "
+                    + LIST_FIELD
+                    + " (LIST) { repeated int32 "
+                    + ELEMENT_FIELD
+                    + "; } }";
+
+    private org.apache.parquet.schema.MessageType schema;
+    private RecordConsumer recordConsumer;
+
+    /** One record to write: the integers that make up the list column of a single row. */
+    static class SimpleGroup {
+        final List<Integer> numbers;
+
+        public SimpleGroup(List<Integer> numbers) {
+            this.numbers = numbers;
+        }
+    }
+
+    /** Parses the fixed two-level list schema and exposes it to the writer. */
+    @Override
+    public WriteContext init(Configuration configuration) {
+        this.schema = MessageTypeParser.parseMessageType(SCHEMA_STRING);
+        return new WriteContext(schema, Collections.emptyMap());
+    }
+
+    @Override
+    public void prepareForWrite(RecordConsumer recordConsumer) {
+        this.recordConsumer = recordConsumer;
+    }
+
+    /**
+     * Writes one row. A {@code null} record, {@code null} numbers or an empty list are all
+     * written as an empty list.
+     *
+     * @param record the row to write
+     */
+    @Override
+    public void write(SimpleGroup record) {
+        int listIndex = schema.getFieldIndex(LIST_FIELD);
+        GroupType listType = schema.getType(listIndex).asGroupType();
+        int elementIndex = listType.getFieldIndex(ELEMENT_FIELD);
+
+        recordConsumer.startMessage();
+        recordConsumer.startField(LIST_FIELD, listIndex);
+        // RecordConsumer protocol: the children of a group-typed field must be enclosed in
+        // startGroup()/endGroup().
+        recordConsumer.startGroup();
+        // A field without any value must be omitted entirely rather than opened and closed;
+        // leaving the repeated field out is how an empty list is expressed.
+        if (record != null && record.numbers != null && !record.numbers.isEmpty()) {
+            recordConsumer.startField(ELEMENT_FIELD, elementIndex);
+            for (Integer number : record.numbers) {
+                recordConsumer.addInteger(number);
+            }
+            recordConsumer.endField(ELEMENT_FIELD, elementIndex);
+        }
+        recordConsumer.endGroup();
+        recordConsumer.endField(LIST_FIELD, listIndex);
+        recordConsumer.endMessage();
+    }
+
+    /** {@link ParquetWriter} builder wired to {@link SimpleGroupWriteSupport}. */
+    static class Builder
+            extends ParquetWriter.Builder<SimpleGroupWriteSupport.SimpleGroup, Builder> {
+
+        protected Builder(OutputFile path) {
+            super(path);
+        }
+
+        @Override
+        protected Builder self() {
+            return this;
+        }
+
+        @Override
+        protected WriteSupport<SimpleGroupWriteSupport.SimpleGroup> getWriteSupport(
+                Configuration conf) {
+            return new SimpleGroupWriteSupport();
+        }
+    }
+
+    /**
+     * Writes {@code records} to a new Parquet file on the local file system.
+     *
+     * @param path local path of the file to create
+     * @param records rows to write, in order
+     * @throws IOException if the file cannot be created or written
+     */
+    public void writeTest(String path, List<SimpleGroupWriteSupport.SimpleGroup> records)
+            throws IOException {
+        OutputFile outputFile =
+                new StreamOutputFile(new LocalFileIO.LocalPositionOutputStream(new File(path)));
+        // try-with-resources closes the writer (flushing the footer) on success and on failure.
+        try (ParquetWriter<SimpleGroupWriteSupport.SimpleGroup> writer =
+                new Builder(outputFile).withConf(new Configuration()).build()) {
+            for (SimpleGroupWriteSupport.SimpleGroup record : records) {
+                writer.write(record);
+            }
+        }
+    }
+}