This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new e3284b49d8 [spark] Fix read shredded and unshredded variant both (#6936)
e3284b49d8 is described below

commit e3284b49d80e860fbf5e0b40af25736c5244da3d
Author: Zouxxyy <[email protected]>
AuthorDate: Wed Dec 31 22:07:20 2025 +0800

    [spark] Fix read shredded and unshredded variant both (#6936)
---
 .../paimon/data/variant/PaimonShreddingUtils.java  |  4 ---
 .../data/variant/VariantAccessInfoUtils.java       |  4 +++
 .../apache/paimon/format/parquet/VariantUtils.java | 33 ++++++++++++++--------
 .../apache/paimon/spark/sql/VariantTestBase.scala  | 29 +++++++++++++++++++
 4 files changed, 55 insertions(+), 15 deletions(-)

diff --git a/paimon-common/src/main/java/org/apache/paimon/data/variant/PaimonShreddingUtils.java b/paimon-common/src/main/java/org/apache/paimon/data/variant/PaimonShreddingUtils.java
index 79845fb030..189ac8e83b 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/variant/PaimonShreddingUtils.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/variant/PaimonShreddingUtils.java
@@ -408,10 +408,6 @@ public class PaimonShreddingUtils {
                 default:
                     throw invalidVariantShreddingSchema(rowType);
             }
-
-            if (topLevel && (topLevelMetadataIdx == -1)) {
-                topLevelMetadataIdx = i;
-            }
         }
 
         if (topLevel != (topLevelMetadataIdx >= 0)) {
diff --git a/paimon-common/src/main/java/org/apache/paimon/data/variant/VariantAccessInfoUtils.java b/paimon-common/src/main/java/org/apache/paimon/data/variant/VariantAccessInfoUtils.java
index 151e89cece..b445de465a 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/variant/VariantAccessInfoUtils.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/variant/VariantAccessInfoUtils.java
@@ -71,6 +71,10 @@ public class VariantAccessInfoUtils {
     /** Clip the variant schema to read with variant access fields. */
     public static RowType clipVariantSchema(
             RowType shreddingSchema, List<VariantAccessInfo.VariantField> variantFields) {
+        if (!shreddingSchema.containsField(TYPED_VALUE_FIELD_NAME)) {
+            return shreddingSchema;
+        }
+
         boolean canClip = true;
         Set<String> fieldsToRead = new HashSet<>();
         for (VariantAccessInfo.VariantField variantField : variantFields) {
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/VariantUtils.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/VariantUtils.java
index 77c11929aa..2b2382304e 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/VariantUtils.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/VariantUtils.java
@@ -22,6 +22,7 @@ import org.apache.paimon.data.variant.PaimonShreddingUtils;
 import org.apache.paimon.data.variant.VariantAccessInfo;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.types.VariantType;
 import org.apache.paimon.utils.JsonSerdeUtil;
@@ -35,6 +36,9 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 
+import static org.apache.paimon.data.variant.Variant.METADATA;
+import static org.apache.paimon.data.variant.Variant.VALUE;
+
 /** Utils for variant. */
 public class VariantUtils {
 
@@ -44,17 +48,24 @@ public class VariantUtils {
         RowType[] shreddingSchemas = new RowType[readFields.length];
         for (int i = 0; i < readFields.length; i++) {
             DataField field = readFields[i];
-            if (field.type() instanceof VariantType
-                    && fileSchema
-                            .getType(field.name())
-                            .asGroupType()
-                            .containsField(PaimonShreddingUtils.TYPED_VALUE_FIELD_NAME)) {
-                RowType shreddingSchema =
-                        (RowType)
-                                ParquetSchemaConverter.convertToPaimonField(
-                                                fileSchema.getType(field.name()))
-                                        .type();
-                shreddingSchemas[i] = shreddingSchema;
+            if (field.type() instanceof VariantType) {
+                boolean isShredded =
+                        fileSchema
+                                .getType(field.name())
+                                .asGroupType()
+                                .containsField(PaimonShreddingUtils.TYPED_VALUE_FIELD_NAME);
+                if (isShredded) {
+                    shreddingSchemas[i] =
+                            (RowType)
+                                    ParquetSchemaConverter.convertToPaimonField(
+                                                    fileSchema.getType(field.name()))
+                                            .type();
+                } else {
+                    List<DataField> dataFields = new ArrayList<>();
+                    dataFields.add(new DataField(0, VALUE, DataTypes.BYTES()));
+                    dataFields.add(new DataField(1, METADATA, DataTypes.BYTES()));
+                    shreddingSchemas[i] = new RowType(dataFields);
+                }
             }
         }
         return shreddingSchemas;
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/VariantTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/VariantTestBase.scala
index 3db82180cf..da4fc175fe 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/VariantTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/VariantTestBase.scala
@@ -172,6 +172,35 @@ abstract class VariantTestBase extends PaimonSparkTestBase {
     )
   }
 
+  test("Paimon Variant: read and write shredded and unshredded variant") {
+    sql(
+      """
+        |CREATE TABLE T
+        |(id INT, v1 VARIANT, v2 VARIANT, v3 VARIANT)
+        |TBLPROPERTIES
+        |('parquet.variant.shreddingSchema' =
+        |'{"type":"ROW","fields":[{"name":"v1","type":{"type":"ROW","fields":[{"name":"age","type":"INT"},{"name":"city","type":"STRING"}]}}]}'
+        |)
+        |""".stripMargin)
+    sql(
+      """
+        |INSERT INTO T VALUES
+        | (1, parse_json('{"age":26,"city":"Beijing"}'), parse_json('{"age":26,"city":"Beijing"}'), parse_json('{"age":26,"city":"Beijing"}'))
+        | """.stripMargin)
+
+    checkAnswer(
+      sql("SELECT * FROM T"),
+      sql(
+        """SELECT 1, parse_json('{"age":26,"city":"Beijing"}'), parse_json('{"age":26,"city":"Beijing"}'), parse_json('{"age":26,"city":"Beijing"}')""")
+    )
+
+    checkAnswer(
+      sql(
+        "SELECT variant_get(v1, '$.age', 'int'), variant_get(v2, '$.age', 'int'), variant_get(v3, '$.age', 'int') FROM T"),
+      Seq(Row(26, 26, 26))
+    )
+  }
+
   test("Paimon Variant: read and write shredded variant with all types") {
     for (isPkTable <- Seq(true, false)) {
       val pkProps = if (isPkTable) "'primary-key' = 'id'," else ""

Reply via email to