This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new e3284b49d8 [spark] Fix reading both shredded and unshredded variants
(#6936)
e3284b49d8 is described below
commit e3284b49d80e860fbf5e0b40af25736c5244da3d
Author: Zouxxyy <[email protected]>
AuthorDate: Wed Dec 31 22:07:20 2025 +0800
[spark] Fix reading both shredded and unshredded variants (#6936)
---
.../paimon/data/variant/PaimonShreddingUtils.java | 4 ---
.../data/variant/VariantAccessInfoUtils.java | 4 +++
.../apache/paimon/format/parquet/VariantUtils.java | 33 ++++++++++++++--------
.../apache/paimon/spark/sql/VariantTestBase.scala | 29 +++++++++++++++++++
4 files changed, 55 insertions(+), 15 deletions(-)
diff --git
a/paimon-common/src/main/java/org/apache/paimon/data/variant/PaimonShreddingUtils.java
b/paimon-common/src/main/java/org/apache/paimon/data/variant/PaimonShreddingUtils.java
index 79845fb030..189ac8e83b 100644
---
a/paimon-common/src/main/java/org/apache/paimon/data/variant/PaimonShreddingUtils.java
+++
b/paimon-common/src/main/java/org/apache/paimon/data/variant/PaimonShreddingUtils.java
@@ -408,10 +408,6 @@ public class PaimonShreddingUtils {
default:
throw invalidVariantShreddingSchema(rowType);
}
-
- if (topLevel && (topLevelMetadataIdx == -1)) {
- topLevelMetadataIdx = i;
- }
}
if (topLevel != (topLevelMetadataIdx >= 0)) {
diff --git
a/paimon-common/src/main/java/org/apache/paimon/data/variant/VariantAccessInfoUtils.java
b/paimon-common/src/main/java/org/apache/paimon/data/variant/VariantAccessInfoUtils.java
index 151e89cece..b445de465a 100644
---
a/paimon-common/src/main/java/org/apache/paimon/data/variant/VariantAccessInfoUtils.java
+++
b/paimon-common/src/main/java/org/apache/paimon/data/variant/VariantAccessInfoUtils.java
@@ -71,6 +71,10 @@ public class VariantAccessInfoUtils {
/** Clip the variant schema to read with variant access fields. */
public static RowType clipVariantSchema(
RowType shreddingSchema, List<VariantAccessInfo.VariantField>
variantFields) {
+ if (!shreddingSchema.containsField(TYPED_VALUE_FIELD_NAME)) {
+ return shreddingSchema;
+ }
+
boolean canClip = true;
Set<String> fieldsToRead = new HashSet<>();
for (VariantAccessInfo.VariantField variantField : variantFields) {
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/VariantUtils.java
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/VariantUtils.java
index 77c11929aa..2b2382304e 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/VariantUtils.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/VariantUtils.java
@@ -22,6 +22,7 @@ import org.apache.paimon.data.variant.PaimonShreddingUtils;
import org.apache.paimon.data.variant.VariantAccessInfo;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.VariantType;
import org.apache.paimon.utils.JsonSerdeUtil;
@@ -35,6 +36,9 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
+import static org.apache.paimon.data.variant.Variant.METADATA;
+import static org.apache.paimon.data.variant.Variant.VALUE;
+
/** Utils for variant. */
public class VariantUtils {
@@ -44,17 +48,24 @@ public class VariantUtils {
RowType[] shreddingSchemas = new RowType[readFields.length];
for (int i = 0; i < readFields.length; i++) {
DataField field = readFields[i];
- if (field.type() instanceof VariantType
- && fileSchema
- .getType(field.name())
- .asGroupType()
-
.containsField(PaimonShreddingUtils.TYPED_VALUE_FIELD_NAME)) {
- RowType shreddingSchema =
- (RowType)
- ParquetSchemaConverter.convertToPaimonField(
-
fileSchema.getType(field.name()))
- .type();
- shreddingSchemas[i] = shreddingSchema;
+ if (field.type() instanceof VariantType) {
+ boolean isShredded =
+ fileSchema
+ .getType(field.name())
+ .asGroupType()
+
.containsField(PaimonShreddingUtils.TYPED_VALUE_FIELD_NAME);
+ if (isShredded) {
+ shreddingSchemas[i] =
+ (RowType)
+
ParquetSchemaConverter.convertToPaimonField(
+
fileSchema.getType(field.name()))
+ .type();
+ } else {
+ List<DataField> dataFields = new ArrayList<>();
+ dataFields.add(new DataField(0, VALUE, DataTypes.BYTES()));
+ dataFields.add(new DataField(1, METADATA,
DataTypes.BYTES()));
+ shreddingSchemas[i] = new RowType(dataFields);
+ }
}
}
return shreddingSchemas;
diff --git
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/VariantTestBase.scala
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/VariantTestBase.scala
index 3db82180cf..da4fc175fe 100644
---
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/VariantTestBase.scala
+++
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/VariantTestBase.scala
@@ -172,6 +172,35 @@ abstract class VariantTestBase extends PaimonSparkTestBase
{
)
}
+ test("Paimon Variant: read and write shredded and unshredded variant") {
+ sql(
+ """
+ |CREATE TABLE T
+ |(id INT, v1 VARIANT, v2 VARIANT, v3 VARIANT)
+ |TBLPROPERTIES
+ |('parquet.variant.shreddingSchema' =
+
|'{"type":"ROW","fields":[{"name":"v1","type":{"type":"ROW","fields":[{"name":"age","type":"INT"},{"name":"city","type":"STRING"}]}}]}'
+ |)
+ |""".stripMargin)
+ sql(
+ """
+ |INSERT INTO T VALUES
+ | (1, parse_json('{"age":26,"city":"Beijing"}'),
parse_json('{"age":26,"city":"Beijing"}'),
parse_json('{"age":26,"city":"Beijing"}'))
+ | """.stripMargin)
+
+ checkAnswer(
+ sql("SELECT * FROM T"),
+ sql(
+ """SELECT 1, parse_json('{"age":26,"city":"Beijing"}'),
parse_json('{"age":26,"city":"Beijing"}'),
parse_json('{"age":26,"city":"Beijing"}')""")
+ )
+
+ checkAnswer(
+ sql(
+ "SELECT variant_get(v1, '$.age', 'int'), variant_get(v2, '$.age',
'int'), variant_get(v3, '$.age', 'int') FROM T"),
+ Seq(Row(26, 26, 26))
+ )
+ }
+
test("Paimon Variant: read and write shredded variant with all types") {
for (isPkTable <- Seq(true, false)) {
val pkProps = if (isPkTable) "'primary-key' = 'id'," else ""