This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 623d30d860 [variant] Annotate Variant columns with Variant logical
type annotation (#7110)
623d30d860 is described below
commit 623d30d860d31aecc894d083dcc77b93caad2ba4
Author: Zouxxyy <[email protected]>
AuthorDate: Fri Jan 23 04:38:12 2026 +0800
[variant] Annotate Variant columns with Variant logical type annotation
(#7110)
---
docs/content/spark/quick-start.md | 5 +++
.../paimon/data/variant/PaimonShreddingUtils.java | 3 +-
.../org/apache/paimon/data/variant/Variant.java | 2 ++
.../paimon/data/variant/VariantMetadataUtils.java | 9 +++++
paimon-format/pom.xml | 1 -
.../format/parquet/ParquetSchemaConverter.java | 17 +++++++--
.../apache/paimon/format/parquet/VariantUtils.java | 4 +--
.../writer/InferVariantShreddingWriteTest.java | 40 +++++++++++++++++++++-
8 files changed, 73 insertions(+), 8 deletions(-)
diff --git a/docs/content/spark/quick-start.md
b/docs/content/spark/quick-start.md
index 7a4341a4a0..79268feab1 100644
--- a/docs/content/spark/quick-start.md
+++ b/docs/content/spark/quick-start.md
@@ -358,6 +358,11 @@ All Spark's data types are available in package
`org.apache.spark.sql.types`.
<td><code>VarBinaryType</code>, <code>BinaryType</code></td>
<td>true</td>
</tr>
+ <tr>
+ <td><code>VariantType(Spark4.0+)</code></td>
+ <td><code>VariantType</code></td>
+ <td>true</td>
+ </tr>
</tbody>
</table>
diff --git
a/paimon-common/src/main/java/org/apache/paimon/data/variant/PaimonShreddingUtils.java
b/paimon-common/src/main/java/org/apache/paimon/data/variant/PaimonShreddingUtils.java
index dd4b48a17e..cd7884bc64 100644
---
a/paimon-common/src/main/java/org/apache/paimon/data/variant/PaimonShreddingUtils.java
+++
b/paimon-common/src/main/java/org/apache/paimon/data/variant/PaimonShreddingUtils.java
@@ -211,7 +211,8 @@ public class PaimonShreddingUtils {
}
public static RowType variantShreddingSchema(DataType dataType) {
- return variantShreddingSchema(dataType, true, false);
+ return VariantMetadataUtils.addVariantMetadata(
+ variantShreddingSchema(dataType, true, false));
}
/**
diff --git
a/paimon-common/src/main/java/org/apache/paimon/data/variant/Variant.java
b/paimon-common/src/main/java/org/apache/paimon/data/variant/Variant.java
index 5bc19972f9..ad44bddeb6 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/variant/Variant.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/variant/Variant.java
@@ -37,6 +37,8 @@ import java.time.ZoneOffset;
*/
public interface Variant {
+ byte VARIANT_SPEC_VERSION = (byte) 1;
+
String METADATA = "metadata";
String VALUE = "value";
diff --git
a/paimon-common/src/main/java/org/apache/paimon/data/variant/VariantMetadataUtils.java
b/paimon-common/src/main/java/org/apache/paimon/data/variant/VariantMetadataUtils.java
index 735ea08af3..a0a6208035 100644
---
a/paimon-common/src/main/java/org/apache/paimon/data/variant/VariantMetadataUtils.java
+++
b/paimon-common/src/main/java/org/apache/paimon/data/variant/VariantMetadataUtils.java
@@ -67,6 +67,15 @@ public class VariantMetadataUtils {
return true;
}
+ /** Add metadata to the top-level fields to mark it as a shredding schema
for writers. */
+ public static RowType addVariantMetadata(RowType rowType) {
+ List<DataField> fields = new ArrayList<>();
+ for (DataField f : rowType.getFields()) {
+ fields.add((f.newDescription(METADATA_KEY)));
+ }
+ return rowType.copy(fields);
+ }
+
/** Extract the path from variant metadata description. */
public static String path(String description) {
return splitDescription(description)[0];
diff --git a/paimon-format/pom.xml b/paimon-format/pom.xml
index 4f7f11965a..75a49da924 100644
--- a/paimon-format/pom.xml
+++ b/paimon-format/pom.xml
@@ -351,7 +351,6 @@ under the License.
<include>org.apache.parquet:parquet-format-structures</include>
<include>org.apache.parquet:parquet-jackson</include>
<include>commons-pool:commons-pool</include>
-
<include>commons-pool:commons-pool</include>
<include>org.locationtech.jts:jts-core</include>
<!-- compress -->
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetSchemaConverter.java
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetSchemaConverter.java
index 19d1beb678..bfb6297fae 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetSchemaConverter.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetSchemaConverter.java
@@ -19,6 +19,7 @@
package org.apache.paimon.format.parquet;
import org.apache.paimon.data.variant.Variant;
+import org.apache.paimon.data.variant.VariantMetadataUtils;
import org.apache.paimon.table.SpecialFields;
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.DataField;
@@ -213,20 +214,30 @@ public class ParquetSchemaConverter {
.withId(fieldId);
case ROW:
RowType rowType = (RowType) type;
- return new GroupType(repetition, name,
convertToParquetTypes(rowType))
+ Types.GroupBuilder<GroupType> groupTypeBuilder =
Types.buildGroup(repetition);
+ if (VariantMetadataUtils.isVariantRowType(rowType)) {
+ groupTypeBuilder.as(
+
LogicalTypeAnnotation.variantType(Variant.VARIANT_SPEC_VERSION));
+ }
+ return groupTypeBuilder
+ .addFields(convertToParquetTypes(rowType))
+ .named(name)
.withId(fieldId);
case VARIANT:
return Types.buildGroup(repetition)
+
.as(LogicalTypeAnnotation.variantType(Variant.VARIANT_SPEC_VERSION))
.addField(
Types.primitive(
PrimitiveType.PrimitiveTypeName.BINARY,
Type.Repetition.REQUIRED)
- .named(Variant.VALUE))
+ .named(Variant.VALUE)
+ .withId(0))
.addField(
Types.primitive(
PrimitiveType.PrimitiveTypeName.BINARY,
Type.Repetition.REQUIRED)
- .named(Variant.METADATA))
+ .named(Variant.METADATA)
+ .withId(1))
.named(name)
.withId(fieldId);
default:
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/VariantUtils.java
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/VariantUtils.java
index b523c37548..7fec5aee9f 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/VariantUtils.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/VariantUtils.java
@@ -49,8 +49,8 @@ public class VariantUtils {
return (RowType)
ParquetSchemaConverter.convertToPaimonField(fileType).type();
} else {
List<DataField> dataFields = new ArrayList<>();
- dataFields.add(new DataField(0, VALUE, DataTypes.BYTES()));
- dataFields.add(new DataField(1, METADATA, DataTypes.BYTES()));
+ dataFields.add(new DataField(0, VALUE,
DataTypes.BYTES().notNull()));
+ dataFields.add(new DataField(1, METADATA,
DataTypes.BYTES().notNull()));
return new RowType(dataFields);
}
}
diff --git
a/paimon-format/src/test/java/org/apache/paimon/format/parquet/writer/InferVariantShreddingWriteTest.java
b/paimon-format/src/test/java/org/apache/paimon/format/parquet/writer/InferVariantShreddingWriteTest.java
index f07c70784a..52a1618c02 100644
---
a/paimon-format/src/test/java/org/apache/paimon/format/parquet/writer/InferVariantShreddingWriteTest.java
+++
b/paimon-format/src/test/java/org/apache/paimon/format/parquet/writer/InferVariantShreddingWriteTest.java
@@ -45,10 +45,14 @@ import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import java.io.IOException;
import java.util.ArrayList;
@@ -545,9 +549,43 @@ public class InferVariantShreddingWriteTest {
fileIO, file, fileIO.getFileSize(file), new
Options())) {
MessageType schema =
reader.getFooter().getFileMetaData().getSchema();
for (int i = 0; i < expectShreddedTypes.length; i++) {
- assertThat(VariantUtils.variantFileType(schema.getType(i)))
+ assertThat(
+ VariantMetadataUtils.addVariantMetadata(
+
VariantUtils.variantFileType(schema.getType(i))))
.isEqualTo(variantShreddingSchema(expectShreddedTypes[i]));
}
}
}
+
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void testVariantTypeAnnotation(boolean inferShredding) throws
Exception {
+ Options options = new Options();
+ options.set(
+ CoreOptions.VARIANT_INFER_SHREDDING_SCHEMA.key(),
String.valueOf(inferShredding));
+ ParquetFileFormat format = createFormat(options);
+ RowType writeType = DataTypes.ROW(DataTypes.FIELD(0, "v",
DataTypes.VARIANT()));
+
+ FormatWriterFactory factory = format.createWriterFactory(writeType);
+ writeRows(
+ factory,
+ GenericRow.of(GenericVariant.fromJson("{\"name\":\"Alice\"}")),
+ GenericRow.of(GenericVariant.fromJson("{\"name\":\"Bob\"}")));
+
+ // Verify that the Parquet schema contains
LogicalTypeAnnotation.variantType
+ try (ParquetFileReader reader =
+ ParquetUtil.getParquetReader(
+ fileIO, file, fileIO.getFileSize(file), new
Options())) {
+ MessageType schema =
reader.getFooter().getFileMetaData().getSchema();
+ Type variantField = schema.getType(0);
+
+ // The variant field should be a group type
+ assertThat(variantField.isPrimitive()).isFalse();
+ LogicalTypeAnnotation logicalType =
variantField.getLogicalTypeAnnotation();
+
+ // The variant type should have variant annotation
+ assertThat(logicalType)
+
.isInstanceOf(LogicalTypeAnnotation.VariantLogicalTypeAnnotation.class);
+ }
+ }
}