This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new c59fc04300 [variant] Extract only required columns when reading shredded variants (#6720)
c59fc04300 is described below
commit c59fc04300ffe053a8d7923a9d20f0fd2fa9263f
Author: Zouxxyy <[email protected]>
AuthorDate: Tue Dec 2 11:49:08 2025 +0800
[variant] Extract only required columns when reading shredded variants (#6720)
---
.../paimon/data/variant/VariantAccessInfo.java | 4 +
.../data/variant/VariantAccessInfoUtils.java | 51 +++++++++++
.../paimon/data/variant/VariantCastArgs.java | 5 +
.../format/parquet/reader/ParquetReaderUtil.java | 7 +-
.../format/parquet/ParquetFormatReadWriteTest.java | 101 +++++++++++++++++++++
5 files changed, 167 insertions(+), 1 deletion(-)
diff --git a/paimon-common/src/main/java/org/apache/paimon/data/variant/VariantAccessInfo.java b/paimon-common/src/main/java/org/apache/paimon/data/variant/VariantAccessInfo.java
index b4d8c22f30..f4dfa65cfd 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/variant/VariantAccessInfo.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/variant/VariantAccessInfo.java
@@ -77,6 +77,10 @@ public class VariantAccessInfo implements Serializable {
         this.castArgs = castArgs;
     }
 
+    public VariantField(DataField dataField, String path) {
+        this(dataField, path, VariantCastArgs.defaultArgs());
+    }
+
     public DataField dataField() {
         return dataField;
     }
diff --git a/paimon-common/src/main/java/org/apache/paimon/data/variant/VariantAccessInfoUtils.java b/paimon-common/src/main/java/org/apache/paimon/data/variant/VariantAccessInfoUtils.java
index 557caa14fb..151e89cece 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/variant/VariantAccessInfoUtils.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/variant/VariantAccessInfoUtils.java
@@ -23,8 +23,14 @@ import org.apache.paimon.types.RowType;
 
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+
+import static org.apache.paimon.data.variant.PaimonShreddingUtils.METADATA_FIELD_NAME;
+import static org.apache.paimon.data.variant.PaimonShreddingUtils.TYPED_VALUE_FIELD_NAME;
+import static org.apache.paimon.data.variant.PaimonShreddingUtils.VARIANT_VALUE_FIELD_NAME;
 
 /** Utils for variant access. */
 public class VariantAccessInfoUtils {
@@ -61,4 +67,49 @@ public class VariantAccessInfoUtils {
         }
         return new RowType(fields);
     }
+
+    /** Clip the variant schema to read with variant access fields. */
+    public static RowType clipVariantSchema(
+            RowType shreddingSchema, List<VariantAccessInfo.VariantField> variantFields) {
+        boolean canClip = true;
+        Set<String> fieldsToRead = new HashSet<>();
+        for (VariantAccessInfo.VariantField variantField : variantFields) {
+            VariantPathSegment[] pathSegments = VariantPathSegment.parse(variantField.path());
+            if (pathSegments.length < 1) {
+                canClip = false;
+                break;
+            }
+
+            // todo: support nested column pruning
+            VariantPathSegment pathSegment = pathSegments[0];
+            if (pathSegment instanceof VariantPathSegment.ObjectExtraction) {
+                fieldsToRead.add(((VariantPathSegment.ObjectExtraction) pathSegment).getKey());
+            } else {
+                canClip = false;
+                break;
+            }
+        }
+
+        if (!canClip) {
+            return shreddingSchema;
+        }
+
+        List<DataField> typedFieldsToRead = new ArrayList<>();
+        DataField typedValue = shreddingSchema.getField(TYPED_VALUE_FIELD_NAME);
+        for (DataField field : ((RowType) typedValue.type()).getFields()) {
+            if (fieldsToRead.contains(field.name())) {
+                typedFieldsToRead.add(field);
+                fieldsToRead.remove(field.name());
+            }
+        }
+
+        List<DataField> shreddingSchemaFields = new ArrayList<>();
+        shreddingSchemaFields.add(shreddingSchema.getField(METADATA_FIELD_NAME));
+        // If there are fields to read not in the `typed_value`, add the `value` field.
+        if (!fieldsToRead.isEmpty()) {
+            shreddingSchemaFields.add(shreddingSchema.getField(VARIANT_VALUE_FIELD_NAME));
+        }
+        shreddingSchemaFields.add(typedValue.newType(new RowType(typedFieldsToRead)));
+        return new RowType(shreddingSchemaFields);
+    }
}
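
For illustration (not part of the commit): assuming the shredded layout built by PaimonShreddingUtils uses the conventional Parquet variant field names (metadata, value, typed_value) and BYTES for the binary columns, the new clipVariantSchema narrows the read schema roughly as in this sketch (field ids, types, and elided imports are illustrative assumptions, not taken from the patch):

    // Hypothetical shredded schema: metadata BYTES, value BYTES,
    // typed_value ROW<age INT, city STRING>. Ids are arbitrary.
    RowType shreddingSchema =
            DataTypes.ROW(
                    DataTypes.FIELD(0, "metadata", DataTypes.BYTES()),
                    DataTypes.FIELD(1, "value", DataTypes.BYTES()),
                    DataTypes.FIELD(
                            2,
                            "typed_value",
                            DataTypes.ROW(
                                    DataTypes.FIELD(3, "age", DataTypes.INT()),
                                    DataTypes.FIELD(4, "city", DataTypes.STRING()))));

    // Request only "$.age": every requested field is shredded, so "city" is
    // pruned from typed_value and the untyped "value" column is dropped.
    List<VariantAccessInfo.VariantField> fields =
            Collections.singletonList(
                    new VariantAccessInfo.VariantField(
                            new DataField(0, "age", DataTypes.INT()), "$.age"));
    RowType clipped = VariantAccessInfoUtils.clipVariantSchema(shreddingSchema, fields);
    // clipped == ROW<metadata BYTES, typed_value ROW<age INT>>

Note that any path whose first segment is not an object key (e.g. a bare "$" or an array index) sets canClip to false, and the full shredding schema is returned unchanged.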
diff --git a/paimon-common/src/main/java/org/apache/paimon/data/variant/VariantCastArgs.java b/paimon-common/src/main/java/org/apache/paimon/data/variant/VariantCastArgs.java
index bdc4df9aff..2623d39371 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/variant/VariantCastArgs.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/variant/VariantCastArgs.java
@@ -20,6 +20,7 @@ package org.apache.paimon.data.variant;
 
 import java.io.Serializable;
 import java.time.ZoneId;
+import java.time.ZoneOffset;
 
 /** Several parameters used by `VariantGet.cast`. Packed together to simplify parameter passing. */
public class VariantCastArgs implements Serializable {
@@ -42,6 +43,10 @@ public class VariantCastArgs implements Serializable {
         return zoneId;
     }
 
+    public static VariantCastArgs defaultArgs() {
+        return new VariantCastArgs(true, ZoneOffset.UTC);
+    }
+
     @Override
     public String toString() {
         return "VariantCastArgs{" + "failOnError=" + failOnError + ", zoneId=" + zoneId + '}';
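
For illustration (not part of the commit): the new two-argument VariantField constructor is shorthand for passing VariantCastArgs.defaultArgs(), i.e. fail-on-error casting with the UTC zone. The field and path below are illustrative:

    // Equivalent after this change:
    new VariantAccessInfo.VariantField(
            new DataField(0, "age", DataTypes.INT()), "$.age", VariantCastArgs.defaultArgs());
    new VariantAccessInfo.VariantField(new DataField(0, "age", DataTypes.INT()), "$.age");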
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetReaderUtil.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetReaderUtil.java
index 0b29e7e5ed..0f9b025e20 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetReaderUtil.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetReaderUtil.java
@@ -285,9 +285,14 @@ public class ParquetReaderUtil {
         if (type instanceof VariantType) {
             if (shreddingSchema != null) {
                 VariantType variantType = (VariantType) type;
+                DataType clippedParquetType =
+                        variantFields == null
+                                ? shreddingSchema
+                                : VariantAccessInfoUtils.clipVariantSchema(
+                                        shreddingSchema, variantFields);
                 ParquetGroupField parquetField =
                         (ParquetGroupField)
-                                constructField(dataField.newType(shreddingSchema), columnIO);
+                                constructField(dataField.newType(clippedParquetType), columnIO);
                 DataType readType =
                         variantFields == null
                                 ? variantType
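
In short, the Parquet group is now constructed from the clipped schema whenever variant access paths are supplied; with no paths the reader keeps the full shredded group as before. Sketched against the hypothetical schema from the earlier note:

    // variantFields == null      -> read metadata, value, typed_value<age, city>
    // variantFields == ["$.age"] -> read metadata, typed_value<age>
    DataType clippedParquetType =
            variantFields == null
                    ? shreddingSchema // no access paths: keep the full shredded layout
                    : VariantAccessInfoUtils.clipVariantSchema(shreddingSchema, variantFields);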
diff --git a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFormatReadWriteTest.java b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFormatReadWriteTest.java
index 024ea93b6e..a174411cc5 100644
--- a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFormatReadWriteTest.java
+++ b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFormatReadWriteTest.java
@@ -18,13 +18,22 @@
package org.apache.paimon.format.parquet;
+import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.data.variant.GenericVariant;
+import org.apache.paimon.data.variant.VariantAccessInfo;
import org.apache.paimon.format.FileFormat;
import org.apache.paimon.format.FileFormatFactory;
import org.apache.paimon.format.FormatReadWriteTest;
+import org.apache.paimon.format.FormatReaderContext;
import org.apache.paimon.format.FormatWriter;
+import org.apache.paimon.format.FormatWriterFactory;
import org.apache.paimon.fs.PositionOutputStream;
import org.apache.paimon.options.Options;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
@@ -34,12 +43,16 @@ import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
+import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
+import static org.assertj.core.api.Assertions.assertThat;
+
/** A parquet {@link FormatReadWriteTest}. */
public class ParquetFormatReadWriteTest extends FormatReadWriteTest {
@@ -88,4 +101,92 @@ public class ParquetFormatReadWriteTest extends FormatReadWriteTest {
}
}
}
+
+    @Test
+    public void testReadShreddedVariant() throws Exception {
+        Options options = new Options();
+        options.set(
+                "parquet.variant.shreddingSchema",
+                "{\"type\":\"ROW\",\"fields\":[{\"name\":\"v\",\"type\":{\"type\":\"ROW\",\"fields\":[{\"name\":\"age\",\"type\":\"INT\"},{\"name\":\"city\",\"type\":\"STRING\"}]}}]}");
+        ParquetFileFormat format =
+                new ParquetFileFormat(new FileFormatFactory.FormatContext(options, 1024, 1024));
+
+        RowType writeType = DataTypes.ROW(DataTypes.FIELD(0, "v", DataTypes.VARIANT()));
+
+        FormatWriterFactory factory = format.createWriterFactory(writeType);
+        write(
+                factory,
+                file,
+                GenericRow.of(GenericVariant.fromJson("{\"age\":35,\"city\":\"Chicago\"}")),
+                GenericRow.of(GenericVariant.fromJson("{\"age\":25,\"other\":\"Hello\"}")));
+
+        // read without pruning
+        List<InternalRow> result1 = new ArrayList<>();
+        try (RecordReader<InternalRow> reader =
+                format.createReaderFactory(writeType, writeType, new ArrayList<>())
+                        .createReader(
+                                new FormatReaderContext(fileIO, file, fileIO.getFileSize(file)))) {
+            InternalRowSerializer serializer = new InternalRowSerializer(writeType);
+            reader.forEachRemaining(row -> result1.add(serializer.copy(row)));
+        }
+        assertThat(result1.get(0).getVariant(0).toJson())
+                .isEqualTo("{\"age\":35,\"city\":\"Chicago\"}");
+        assertThat(result1.get(1).getVariant(0).toJson())
+                .isEqualTo("{\"age\":25,\"other\":\"Hello\"}");
+
+        // read with typed col only
+        List<VariantAccessInfo.VariantField> variantFields2 = new ArrayList<>();
+        variantFields2.add(
+                new VariantAccessInfo.VariantField(
+                        new DataField(0, "age", DataTypes.INT()), "$.age"));
+        VariantAccessInfo[] variantAccess2 = {new VariantAccessInfo("v", variantFields2)};
+        RowType readStructType2 =
+                DataTypes.ROW(
+                        DataTypes.FIELD(
+                                0, "v", DataTypes.ROW(DataTypes.FIELD(0, "age", DataTypes.INT()))));
+        List<InternalRow> result2 = new ArrayList<>();
+        try (RecordReader<InternalRow> reader =
+                format.createReaderFactory(writeType, writeType, new ArrayList<>(), variantAccess2)
+                        .createReader(
+                                new FormatReaderContext(fileIO, file, fileIO.getFileSize(file)))) {
+            InternalRowSerializer serializer = new InternalRowSerializer(readStructType2);
+            reader.forEachRemaining(row -> result2.add(serializer.copy(row)));
+        }
+        assertThat(result2.get(0).equals(GenericRow.of(GenericRow.of(35)))).isTrue();
+        assertThat(result2.get(1).equals(GenericRow.of(GenericRow.of(25)))).isTrue();
+
+        // read with typed col and untyped col
+        List<VariantAccessInfo.VariantField> variantFields3 = new ArrayList<>();
+        variantFields3.add(
+                new VariantAccessInfo.VariantField(
+                        new DataField(0, "age", DataTypes.INT()), "$.age"));
+        variantFields3.add(
+                new VariantAccessInfo.VariantField(
+                        new DataField(1, "other", DataTypes.STRING()), "$.other"));
+        VariantAccessInfo[] variantAccess3 = {new VariantAccessInfo("v", variantFields3)};
+        RowType readStructType3 =
+                DataTypes.ROW(
+                        DataTypes.FIELD(
+                                0,
+                                "v",
+                                DataTypes.ROW(
+                                        DataTypes.FIELD(0, "age", DataTypes.INT()),
+                                        DataTypes.FIELD(1, "other", DataTypes.STRING()))));
+        List<InternalRow> result3 = new ArrayList<>();
+        try (RecordReader<InternalRow> reader =
+                format.createReaderFactory(writeType, writeType, new ArrayList<>(), variantAccess3)
+                        .createReader(
+                                new FormatReaderContext(fileIO, file, fileIO.getFileSize(file)))) {
+            InternalRowSerializer serializer = new InternalRowSerializer(readStructType3);
+            reader.forEachRemaining(row -> result3.add(serializer.copy(row)));
+        }
+        assertThat(result3.get(0).equals(GenericRow.of(GenericRow.of(35, null)))).isTrue();
+        assertThat(
+                        result3.get(1)
+                                .equals(
+                                        GenericRow.of(
+                                                GenericRow.of(
+                                                        25, BinaryString.fromString("Hello")))))
+                .isTrue();
+    }
}
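
A final illustration (not part of the commit) of why the third read above still sees "other": the test shreds only age and city, so "$.other" cannot be served from typed_value. clipVariantSchema therefore keeps the untyped value column, and "other" is reassembled from it at read time. Against the hypothetical shredding schema from the earlier sketch:

    List<VariantAccessInfo.VariantField> fields =
            Arrays.asList(
                    new VariantAccessInfo.VariantField(
                            new DataField(0, "age", DataTypes.INT()), "$.age"),
                    new VariantAccessInfo.VariantField(
                            new DataField(1, "other", DataTypes.STRING()), "$.other"));
    RowType clipped = VariantAccessInfoUtils.clipVariantSchema(shreddingSchema, fields);
    // clipped == ROW<metadata, value, typed_value ROW<age INT>>:
    // "city" is pruned, but "value" is kept because "other" is not shredded.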