This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 623d30d860 [variant] Annotate Variant columns with Variant logical 
type annotation (#7110)
623d30d860 is described below

commit 623d30d860d31aecc894d083dcc77b93caad2ba4
Author: Zouxxyy <[email protected]>
AuthorDate: Fri Jan 23 04:38:12 2026 +0800

    [variant] Annotate Variant columns with Variant logical type annotation 
(#7110)
---
 docs/content/spark/quick-start.md                  |  5 +++
 .../paimon/data/variant/PaimonShreddingUtils.java  |  3 +-
 .../org/apache/paimon/data/variant/Variant.java    |  2 ++
 .../paimon/data/variant/VariantMetadataUtils.java  |  9 +++++
 paimon-format/pom.xml                              |  1 -
 .../format/parquet/ParquetSchemaConverter.java     | 17 +++++++--
 .../apache/paimon/format/parquet/VariantUtils.java |  4 +--
 .../writer/InferVariantShreddingWriteTest.java     | 40 +++++++++++++++++++++-
 8 files changed, 73 insertions(+), 8 deletions(-)

diff --git a/docs/content/spark/quick-start.md 
b/docs/content/spark/quick-start.md
index 7a4341a4a0..79268feab1 100644
--- a/docs/content/spark/quick-start.md
+++ b/docs/content/spark/quick-start.md
@@ -358,6 +358,11 @@ All Spark's data types are available in package 
`org.apache.spark.sql.types`.
       <td><code>VarBinaryType</code>, <code>BinaryType</code></td>
       <td>true</td>
     </tr>
+    <tr>
+      <td><code>VariantType</code> (Spark 4.0+)</td>
+      <td><code>VariantType</code></td>
+      <td>true</td>
+    </tr>
     </tbody>
 </table>
 
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/data/variant/PaimonShreddingUtils.java
 
b/paimon-common/src/main/java/org/apache/paimon/data/variant/PaimonShreddingUtils.java
index dd4b48a17e..cd7884bc64 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/data/variant/PaimonShreddingUtils.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/data/variant/PaimonShreddingUtils.java
@@ -211,7 +211,8 @@ public class PaimonShreddingUtils {
     }
 
     public static RowType variantShreddingSchema(DataType dataType) {
-        return variantShreddingSchema(dataType, true, false);
+        return VariantMetadataUtils.addVariantMetadata(
+                variantShreddingSchema(dataType, true, false));
     }
 
     /**
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/data/variant/Variant.java 
b/paimon-common/src/main/java/org/apache/paimon/data/variant/Variant.java
index 5bc19972f9..ad44bddeb6 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/variant/Variant.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/variant/Variant.java
@@ -37,6 +37,8 @@ import java.time.ZoneOffset;
  */
 public interface Variant {
 
+    byte VARIANT_SPEC_VERSION = (byte) 1;
+
     String METADATA = "metadata";
 
     String VALUE = "value";
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/data/variant/VariantMetadataUtils.java
 
b/paimon-common/src/main/java/org/apache/paimon/data/variant/VariantMetadataUtils.java
index 735ea08af3..a0a6208035 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/data/variant/VariantMetadataUtils.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/data/variant/VariantMetadataUtils.java
@@ -67,6 +67,15 @@ public class VariantMetadataUtils {
         return true;
     }
 
+    /** Add metadata to the top-level fields to mark it as a shredding schema for writers. */
+    public static RowType addVariantMetadata(RowType rowType) {
+        List<DataField> fields = new ArrayList<>();
+        for (DataField f : rowType.getFields()) {
+            fields.add((f.newDescription(METADATA_KEY)));
+        }
+        return rowType.copy(fields);
+    }
+
     /** Extract the path from variant metadata description. */
     public static String path(String description) {
         return splitDescription(description)[0];
diff --git a/paimon-format/pom.xml b/paimon-format/pom.xml
index 4f7f11965a..75a49da924 100644
--- a/paimon-format/pom.xml
+++ b/paimon-format/pom.xml
@@ -351,7 +351,6 @@ under the License.
                                     
<include>org.apache.parquet:parquet-format-structures</include>
                                     
<include>org.apache.parquet:parquet-jackson</include>
                                     
<include>commons-pool:commons-pool</include>
-                                    
<include>commons-pool:commons-pool</include>
                                     
<include>org.locationtech.jts:jts-core</include>
 
                                     <!-- compress -->
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetSchemaConverter.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetSchemaConverter.java
index 19d1beb678..bfb6297fae 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetSchemaConverter.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetSchemaConverter.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.format.parquet;
 
 import org.apache.paimon.data.variant.Variant;
+import org.apache.paimon.data.variant.VariantMetadataUtils;
 import org.apache.paimon.table.SpecialFields;
 import org.apache.paimon.types.ArrayType;
 import org.apache.paimon.types.DataField;
@@ -213,20 +214,30 @@ public class ParquetSchemaConverter {
                         .withId(fieldId);
             case ROW:
                 RowType rowType = (RowType) type;
-                return new GroupType(repetition, name, 
convertToParquetTypes(rowType))
+                Types.GroupBuilder<GroupType> groupTypeBuilder = 
Types.buildGroup(repetition);
+                if (VariantMetadataUtils.isVariantRowType(rowType)) {
+                    groupTypeBuilder.as(
+                            
LogicalTypeAnnotation.variantType(Variant.VARIANT_SPEC_VERSION));
+                }
+                return groupTypeBuilder
+                        .addFields(convertToParquetTypes(rowType))
+                        .named(name)
                         .withId(fieldId);
             case VARIANT:
                 return Types.buildGroup(repetition)
+                        
.as(LogicalTypeAnnotation.variantType(Variant.VARIANT_SPEC_VERSION))
                         .addField(
                                 Types.primitive(
                                                 
PrimitiveType.PrimitiveTypeName.BINARY,
                                                 Type.Repetition.REQUIRED)
-                                        .named(Variant.VALUE))
+                                        .named(Variant.VALUE)
+                                        .withId(0))
                         .addField(
                                 Types.primitive(
                                                 
PrimitiveType.PrimitiveTypeName.BINARY,
                                                 Type.Repetition.REQUIRED)
-                                        .named(Variant.METADATA))
+                                        .named(Variant.METADATA)
+                                        .withId(1))
                         .named(name)
                         .withId(fieldId);
             default:
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/VariantUtils.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/VariantUtils.java
index b523c37548..7fec5aee9f 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/VariantUtils.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/VariantUtils.java
@@ -49,8 +49,8 @@ public class VariantUtils {
             return (RowType) 
ParquetSchemaConverter.convertToPaimonField(fileType).type();
         } else {
             List<DataField> dataFields = new ArrayList<>();
-            dataFields.add(new DataField(0, VALUE, DataTypes.BYTES()));
-            dataFields.add(new DataField(1, METADATA, DataTypes.BYTES()));
+            dataFields.add(new DataField(0, VALUE, 
DataTypes.BYTES().notNull()));
+            dataFields.add(new DataField(1, METADATA, 
DataTypes.BYTES().notNull()));
             return new RowType(dataFields);
         }
     }
diff --git 
a/paimon-format/src/test/java/org/apache/paimon/format/parquet/writer/InferVariantShreddingWriteTest.java
 
b/paimon-format/src/test/java/org/apache/paimon/format/parquet/writer/InferVariantShreddingWriteTest.java
index f07c70784a..52a1618c02 100644
--- 
a/paimon-format/src/test/java/org/apache/paimon/format/parquet/writer/InferVariantShreddingWriteTest.java
+++ 
b/paimon-format/src/test/java/org/apache/paimon/format/parquet/writer/InferVariantShreddingWriteTest.java
@@ -45,10 +45,14 @@ import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
 
 import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
 import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -545,9 +549,43 @@ public class InferVariantShreddingWriteTest {
                         fileIO, file, fileIO.getFileSize(file), new 
Options())) {
             MessageType schema = 
reader.getFooter().getFileMetaData().getSchema();
             for (int i = 0; i < expectShreddedTypes.length; i++) {
-                assertThat(VariantUtils.variantFileType(schema.getType(i)))
+                assertThat(
+                                VariantMetadataUtils.addVariantMetadata(
+                                        
VariantUtils.variantFileType(schema.getType(i))))
                         
.isEqualTo(variantShreddingSchema(expectShreddedTypes[i]));
             }
         }
     }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void testVariantTypeAnnotation(boolean inferShredding) throws 
Exception {
+        Options options = new Options();
+        options.set(
+                CoreOptions.VARIANT_INFER_SHREDDING_SCHEMA.key(), 
String.valueOf(inferShredding));
+        ParquetFileFormat format = createFormat(options);
+        RowType writeType = DataTypes.ROW(DataTypes.FIELD(0, "v", 
DataTypes.VARIANT()));
+
+        FormatWriterFactory factory = format.createWriterFactory(writeType);
+        writeRows(
+                factory,
+                GenericRow.of(GenericVariant.fromJson("{\"name\":\"Alice\"}")),
+                GenericRow.of(GenericVariant.fromJson("{\"name\":\"Bob\"}")));
+
+        // Verify that the Parquet schema contains 
LogicalTypeAnnotation.variantType
+        try (ParquetFileReader reader =
+                ParquetUtil.getParquetReader(
+                        fileIO, file, fileIO.getFileSize(file), new 
Options())) {
+            MessageType schema = 
reader.getFooter().getFileMetaData().getSchema();
+            Type variantField = schema.getType(0);
+
+            // The variant field should be a group type
+            assertThat(variantField.isPrimitive()).isFalse();
+            LogicalTypeAnnotation logicalType = 
variantField.getLogicalTypeAnnotation();
+
+            // The variant type should have variant annotation
+            assertThat(logicalType)
+                    
.isInstanceOf(LogicalTypeAnnotation.VariantLogicalTypeAnnotation.class);
+        }
+    }
 }

Reply via email to