This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new e9ac92d077 [variant] Add shredding function (#5901)
e9ac92d077 is described below
commit e9ac92d077040293bbff4762dcd51c6c1f4578d3
Author: Zouxxyy <[email protected]>
AuthorDate: Tue Jul 15 17:39:09 2025 +0800
[variant] Add shredding function (#5901)
---
LICENSE | 3 +
.../apache/paimon/data/variant/GenericVariant.java | 8 +-
.../paimon/data/variant/GenericVariantBuilder.java | 7 +-
.../paimon/data/variant/PaimonShreddingUtils.java | 432 +++++++++++++++++++++
.../apache/paimon/data/variant/ShreddingUtils.java | 208 ++++++++++
.../apache/paimon/data/variant/VariantSchema.java | 205 ++++++++++
.../data/variant/VariantShreddingWriter.java | 327 ++++++++++++++++
.../paimon/data/variant/GenericVariantTest.java | 143 +++++++
8 files changed, 1326 insertions(+), 7 deletions(-)
diff --git a/LICENSE b/LICENSE
index c5e688b325..1fc316248a 100644
--- a/LICENSE
+++ b/LICENSE
@@ -294,6 +294,9 @@
paimon-format/src/main/java/org/apache/paimon/format/parquet/newReader/Vectorize
paimon-format/test/main/java/org/apache/paimon/format/parquet/newReader/DeltaByteArrayEncodingTest.java
paimon-format/test/main/java/org/apache/paimon/format/parquet/newReader/DeltaEncodingTest.java
paimon-format/test/main/java/org/apache/paimon/format/parquet/newReader/DeltaLengthByteArrayEncodingTest.java
+paimon-common/src/main/java/org/apache/paimon/data/variant/ShreddingUtils.java
+paimon-common/src/main/java/org/apache/paimon/data/variant/VariantSchema.java
+paimon-common/src/main/java/org/apache/paimon/data/variant/VariantShreddingWriter.java
from https://spark.apache.org/ version 4.0.0-preview2
MIT License
diff --git a/paimon-common/src/main/java/org/apache/paimon/data/variant/GenericVariant.java b/paimon-common/src/main/java/org/apache/paimon/data/variant/GenericVariant.java
index 17faf26097..ad00d2d798 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/variant/GenericVariant.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/variant/GenericVariant.java
@@ -123,7 +123,7 @@ public final class GenericVariant implements Variant {
         return Objects.hash(Arrays.hashCode(value), Arrays.hashCode(metadata), pos);
     }
- public static Variant fromJson(String json) {
+ public static GenericVariant fromJson(String json) {
try {
return GenericVariantBuilder.parseJson(json, false);
} catch (IOException e) {
@@ -300,9 +300,9 @@ public final class GenericVariant implements Variant {
/** Variant object field. */
public static final class ObjectField {
public final String key;
- public final Variant value;
+ public final GenericVariant value;
- public ObjectField(String key, Variant value) {
+ public ObjectField(String key, GenericVariant value) {
this.key = key;
this.value = value;
}
@@ -322,7 +322,7 @@ public final class GenericVariant implements Variant {
                     int id = readUnsigned(value, idStart + idSize * index, idSize);
                     int offset = readUnsigned(value, offsetStart + offsetSize * index, offsetSize);
                     String key = getMetadataKey(metadata, id);
-                    Variant v = new GenericVariant(value, metadata, dataStart + offset);
+                    GenericVariant v = new GenericVariant(value, metadata, dataStart + offset);
                     return new ObjectField(key, v);
                 });
     }
diff --git a/paimon-common/src/main/java/org/apache/paimon/data/variant/GenericVariantBuilder.java b/paimon-common/src/main/java/org/apache/paimon/data/variant/GenericVariantBuilder.java
index 169a2f8c9b..9893652c71 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/variant/GenericVariantBuilder.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/variant/GenericVariantBuilder.java
@@ -95,7 +95,8 @@ public class GenericVariantBuilder {
*
* @throws IOException if any JSON parsing error happens.
*/
-    public static Variant parseJson(String json, boolean allowDuplicateKeys) throws IOException {
+    public static GenericVariant parseJson(String json, boolean allowDuplicateKeys)
+            throws IOException {
try (JsonParser parser = new JsonFactory().createParser(json)) {
parser.nextToken();
return parseJson(parser, allowDuplicateKeys);
@@ -105,7 +106,7 @@ public class GenericVariantBuilder {
     /**
      * Similar to {@link #parseJson(String, boolean)}, but takes a JSON parser instead of string
      * input.
      */
-    public static Variant parseJson(JsonParser parser, boolean allowDuplicateKeys)
+    public static GenericVariant parseJson(JsonParser parser, boolean allowDuplicateKeys)
             throws IOException {
         GenericVariantBuilder builder = new GenericVariantBuilder(allowDuplicateKeys);
         builder.buildJson(parser);
@@ -113,7 +114,7 @@ public class GenericVariantBuilder {
}
     // Build the variant metadata from `dictionaryKeys` and return the variant result.
-    public Variant result() {
+    public GenericVariant result() {
int numKeys = dictionaryKeys.size();
// Use long to avoid overflow in accumulating lengths.
long dictionaryStringSize = 0;
diff --git a/paimon-common/src/main/java/org/apache/paimon/data/variant/PaimonShreddingUtils.java b/paimon-common/src/main/java/org/apache/paimon/data/variant/PaimonShreddingUtils.java
new file mode 100644
index 0000000000..18f129ac87
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/data/variant/PaimonShreddingUtils.java
@@ -0,0 +1,432 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.data.variant;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.DataGetters;
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.GenericArray;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalArray;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.DecimalType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.VarBinaryType;
+
+import java.math.BigDecimal;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+/** Utils for paimon shredding. */
+public class PaimonShreddingUtils {
+
+ public static final String METADATA_FIELD_NAME = Variant.METADATA;
+ public static final String VARIANT_VALUE_FIELD_NAME = Variant.VALUE;
+ public static final String TYPED_VALUE_FIELD_NAME = "typed_value";
+
+ /** Paimon shredded row. */
+ static class PaimonShreddedRow implements ShreddingUtils.ShreddedRow {
+
+ private final DataGetters row;
+
+ public PaimonShreddedRow(DataGetters row) {
+ this.row = row;
+ }
+
+ @Override
+ public boolean isNullAt(int ordinal) {
+ return row.isNullAt(ordinal);
+ }
+
+ @Override
+ public boolean getBoolean(int ordinal) {
+ return row.getBoolean(ordinal);
+ }
+
+ @Override
+ public byte getByte(int ordinal) {
+ return row.getByte(ordinal);
+ }
+
+ @Override
+ public short getShort(int ordinal) {
+ return row.getShort(ordinal);
+ }
+
+ @Override
+ public int getInt(int ordinal) {
+ return row.getInt(ordinal);
+ }
+
+ @Override
+ public long getLong(int ordinal) {
+ return row.getLong(ordinal);
+ }
+
+ @Override
+ public float getFloat(int ordinal) {
+ return row.getFloat(ordinal);
+ }
+
+ @Override
+ public double getDouble(int ordinal) {
+ return row.getDouble(ordinal);
+ }
+
+ @Override
+ public BigDecimal getDecimal(int ordinal, int precision, int scale) {
+ return row.getDecimal(ordinal, precision, scale).toBigDecimal();
+ }
+
+ @Override
+ public String getString(int ordinal) {
+ return row.getString(ordinal).toString();
+ }
+
+ @Override
+ public byte[] getBinary(int ordinal) {
+ return row.getBinary(ordinal);
+ }
+
+ @Override
+ public UUID getUuid(int ordinal) {
+ // Paimon currently does not shred UUID.
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+        public ShreddingUtils.ShreddedRow getStruct(int ordinal, int numFields) {
+ return new PaimonShreddedRow(row.getRow(ordinal, numFields));
+ }
+
+ @Override
+ public ShreddingUtils.ShreddedRow getArray(int ordinal) {
+ return new PaimonShreddedRow(row.getArray(ordinal));
+ }
+
+ @Override
+ public int numElements() {
+ return ((InternalArray) row).size();
+ }
+ }
+
+ public static RowType variantShreddingSchema(RowType rowType) {
+ return variantShreddingSchema(rowType, true);
+ }
+
+    /**
+     * Given an expected schema of a Variant value, returns a suitable schema for shredding, by
+     * inserting appropriate intermediate value/typed_value fields at each level. For example, to
+     * represent the JSON {"a": 1, "b": "hello"}, the schema struct<a: int, b: string> could be
+     * passed into this function, and it would return the shredding schema:
+     * struct<metadata: binary, value: binary, typed_value: struct<a: struct<typed_value: int,
+     * value: binary>, b: struct<typed_value: string, value: binary>>>
+     */
+    private static RowType variantShreddingSchema(DataType dataType, boolean topLevel) {
+ RowType.Builder builder = RowType.builder();
+ if (topLevel) {
+ builder.field(METADATA_FIELD_NAME, DataTypes.BYTES());
+ }
+ switch (dataType.getTypeRoot()) {
+ case ARRAY:
+ ArrayType arrayType = (ArrayType) dataType;
+ ArrayType shreddedArrayType =
+ new ArrayType(
+ arrayType.isNullable(),
+                                variantShreddingSchema(arrayType.getElementType(), false));
+ builder.field(VARIANT_VALUE_FIELD_NAME, DataTypes.BYTES());
+ builder.field(TYPED_VALUE_FIELD_NAME, shreddedArrayType);
+ break;
+ case ROW:
+                // The field name level is always non-nullable: Variant null values are
+                // represented in the "value" column as "00", and missing values are represented
+                // by setting both "value" and "typed_value" to null.
+                RowType rowType = (RowType) dataType;
+                RowType shreddedRowType =
+                        rowType.copy(
+                                rowType.getFields().stream()
+                                        .map(
+                                                field ->
+                                                        field.newType(
+                                                                variantShreddingSchema(
+                                                                                field.type(), false)
+                                                                        .notNull()))
+                                        .collect(Collectors.toList()));
+ builder.field(VARIANT_VALUE_FIELD_NAME, DataTypes.BYTES());
+ builder.field(TYPED_VALUE_FIELD_NAME, shreddedRowType);
+ break;
+ case VARIANT:
+ builder.field(VARIANT_VALUE_FIELD_NAME, DataTypes.BYTES());
+ break;
+ case CHAR:
+ case VARCHAR:
+ case BOOLEAN:
+ case BINARY:
+ case VARBINARY:
+ case DECIMAL:
+ case TINYINT:
+ case SMALLINT:
+ case INTEGER:
+ case BIGINT:
+ case FLOAT:
+ case DOUBLE:
+ builder.field(VARIANT_VALUE_FIELD_NAME, DataTypes.BYTES());
+ builder.field(TYPED_VALUE_FIELD_NAME, dataType);
+ break;
+ default:
+ throw invalidVariantShreddingSchema(dataType);
+ }
+ return builder.build();
+ }
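+
+    // For example (illustrative): DataTypes.ARRAY(DataTypes.INT()) is mapped, at the top level,
+    // to struct<metadata: binary, value: binary, typed_value: array<struct<value: binary,
+    // typed_value: int>>>, matching the array layout of the Parquet variant shredding spec.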
+
+ public static VariantSchema buildVariantSchema(RowType rowType) {
+ return buildVariantSchema(rowType, true);
+ }
+
+    private static VariantSchema buildVariantSchema(RowType rowType, boolean topLevel) {
+ int typedIdx = -1;
+ int variantIdx = -1;
+ int topLevelMetadataIdx = -1;
+ VariantSchema.ScalarType scalarSchema = null;
+ VariantSchema.ObjectField[] objectSchema = null;
+ VariantSchema arraySchema = null;
+
+        // The struct must not be empty or contain duplicate field names. The latter is enforced
+        // in the loop below (`if (typedIdx != -1)` and other similar checks).
+ if (rowType.getFields().isEmpty()) {
+ throw invalidVariantShreddingSchema(rowType);
+ }
+
+ List<DataField> fields = rowType.getFields();
+ for (int i = 0; i < fields.size(); i++) {
+ DataField field = fields.get(i);
+ DataType dataType = field.type();
+ switch (field.name()) {
+ case TYPED_VALUE_FIELD_NAME:
+ if (typedIdx != -1) {
+ throw invalidVariantShreddingSchema(rowType);
+ }
+ typedIdx = i;
+                    switch (field.type().getTypeRoot()) {
+                        case ROW:
+                            RowType r = (RowType) dataType;
+                            List<DataField> rFields = r.getFields();
+                            // The struct must not be empty or contain duplicate field names.
+                            if (rFields.isEmpty()
+                                    || rFields.stream().map(DataField::name).distinct().count()
+                                            != rFields.size()) {
+                                throw invalidVariantShreddingSchema(rowType);
+                            }
+                            objectSchema = new VariantSchema.ObjectField[rFields.size()];
+                            for (int index = 0; index < rFields.size(); index++) {
+                                DataField f = rFields.get(index);
+                                if (f.type() instanceof RowType) {
+                                    objectSchema[index] =
+                                            new VariantSchema.ObjectField(
+                                                    f.name(),
+                                                    buildVariantSchema((RowType) f.type(), false));
+                                } else {
+                                    throw invalidVariantShreddingSchema(rowType);
+                                }
+                            }
+                            break;
+                        case ARRAY:
+                            ArrayType arrayType = (ArrayType) dataType;
+                            if (arrayType.getElementType() instanceof RowType) {
+                                arraySchema =
+                                        buildVariantSchema(
+                                                (RowType) arrayType.getElementType(), false);
+                            } else {
+                                throw invalidVariantShreddingSchema(rowType);
+                            }
+                            break;
+                        case BOOLEAN:
+                            scalarSchema = new VariantSchema.BooleanType();
+                            break;
+                        case TINYINT:
+                            scalarSchema =
+                                    new VariantSchema.IntegralType(VariantSchema.IntegralSize.BYTE);
+                            break;
+                        case SMALLINT:
+                            scalarSchema =
+                                    new VariantSchema.IntegralType(
+                                            VariantSchema.IntegralSize.SHORT);
+                            break;
+                        case INTEGER:
+                            scalarSchema =
+                                    new VariantSchema.IntegralType(VariantSchema.IntegralSize.INT);
+                            break;
+                        case BIGINT:
+                            scalarSchema =
+                                    new VariantSchema.IntegralType(VariantSchema.IntegralSize.LONG);
+                            break;
+                        case FLOAT:
+                            scalarSchema = new VariantSchema.FloatType();
+                            break;
+                        case DOUBLE:
+                            scalarSchema = new VariantSchema.DoubleType();
+                            break;
+                        case VARCHAR:
+                            scalarSchema = new VariantSchema.StringType();
+                            break;
+                        case BINARY:
+                            scalarSchema = new VariantSchema.BinaryType();
+                            break;
+                        case DATE:
+                            scalarSchema = new VariantSchema.DateType();
+                            break;
+                        case DECIMAL:
+                            DecimalType d = (DecimalType) dataType;
+                            scalarSchema =
+                                    new VariantSchema.DecimalType(d.getPrecision(), d.getScale());
+                            break;
+                        default:
+                            throw invalidVariantShreddingSchema(rowType);
+                    }
+                    break;
+
+                case VARIANT_VALUE_FIELD_NAME:
+                    if (variantIdx != -1 || !(field.type() instanceof VarBinaryType)) {
+                        throw invalidVariantShreddingSchema(rowType);
+                    }
+                    variantIdx = i;
+                    break;
+
+                case METADATA_FIELD_NAME:
+                    if (topLevelMetadataIdx != -1 || !(field.type() instanceof VarBinaryType)) {
+                        throw invalidVariantShreddingSchema(rowType);
+                    }
+                    topLevelMetadataIdx = i;
+                    break;
+
+ default:
+ throw invalidVariantShreddingSchema(rowType);
+ }
+
+ if (topLevel && (topLevelMetadataIdx == -1)) {
+ topLevelMetadataIdx = i;
+ }
+ }
+
+ if (topLevel != (topLevelMetadataIdx >= 0)) {
+ throw invalidVariantShreddingSchema(rowType);
+ }
+
+ return new VariantSchema(
+ typedIdx,
+ variantIdx,
+ topLevelMetadataIdx,
+ fields.size(),
+ scalarSchema,
+ objectSchema,
+ arraySchema);
+ }
+
+    private static RuntimeException invalidVariantShreddingSchema(DataType dataType) {
+        return new RuntimeException("Invalid variant shredding schema: " + dataType);
+ }
+
+ /** Paimon shredded result. */
+    public static class PaimonShreddedResult implements VariantShreddingWriter.ShreddedResult {
+
+ private final VariantSchema schema;
+ // Result is stored as an InternalRow.
+ private final GenericRow row;
+
+ public PaimonShreddedResult(VariantSchema schema) {
+ this.schema = schema;
+ this.row = new GenericRow(schema.numFields);
+ }
+
+ @Override
+ public void addArray(VariantShreddingWriter.ShreddedResult[] array) {
+            GenericArray arrayResult =
+                    new GenericArray(
+                            java.util.Arrays.stream(array)
+                                    .map(result -> ((PaimonShreddedResult) result).row)
+                                    .toArray(InternalRow[]::new));
+ row.setField(schema.typedIdx, arrayResult);
+ }
+
+ @Override
+ public void addObject(VariantShreddingWriter.ShreddedResult[] values) {
+ GenericRow innerRow = new GenericRow(schema.objectSchema.length);
+ for (int i = 0; i < values.length; i++) {
+ innerRow.setField(i, ((PaimonShreddedResult) values[i]).row);
+ }
+ row.setField(schema.typedIdx, innerRow);
+ }
+
+ @Override
+ public void addVariantValue(byte[] result) {
+ row.setField(schema.variantIdx, result);
+ }
+
+ @Override
+ public void addScalar(Object result) {
+ Object paimonValue;
+ if (schema.scalarSchema instanceof VariantSchema.StringType) {
+ paimonValue = BinaryString.fromString((String) result);
+            } else if (schema.scalarSchema instanceof VariantSchema.DecimalType) {
+                VariantSchema.DecimalType dt = (VariantSchema.DecimalType) schema.scalarSchema;
+                paimonValue = Decimal.fromBigDecimal((BigDecimal) result, dt.precision, dt.scale);
+ } else {
+ paimonValue = result;
+ }
+ row.setField(schema.typedIdx, paimonValue);
+ }
+
+ @Override
+ public void addMetadata(byte[] result) {
+ row.setField(schema.topLevelMetadataIdx, result);
+ }
+ }
+
+ /** Paimon shredded result builder. */
+ public static class PaimonShreddedResultBuilder
+ implements VariantShreddingWriter.ShreddedResultBuilder {
+ @Override
+        public VariantShreddingWriter.ShreddedResult createEmpty(VariantSchema schema) {
+ return new PaimonShreddedResult(schema);
+ }
+
+ // Consider allowing this to be set via config?
+ @Override
+ public boolean allowNumericScaleChanges() {
+ return true;
+ }
+ }
+
+    /** Converts an input variant into shredded components. Returns the shredded result. */
+    public static InternalRow castShredded(GenericVariant variant, VariantSchema variantSchema) {
+        return ((PaimonShreddedResult)
+                        VariantShreddingWriter.castShredded(
+                                variant, variantSchema, new PaimonShreddedResultBuilder()))
+                .row;
+    }
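+
+    // Example round trip (a sketch mirroring GenericVariantTest in this change):
+    //   GenericVariant variant = GenericVariant.fromJson("{\"a\": 1, \"b\": \"hello\"}");
+    //   RowType shreddingSchema =
+    //           variantShreddingSchema(
+    //                   RowType.of(
+    //                           new DataType[] {DataTypes.INT(), DataTypes.STRING()},
+    //                           new String[] {"a", "b"}));
+    //   VariantSchema variantSchema = buildVariantSchema(shreddingSchema);
+    //   InternalRow shredded = castShredded(variant, variantSchema);
+    //   Variant rebuilt = rebuild(shredded, variantSchema); // same JSON as `variant`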
+
+ /** Rebuilds a variant from shredded components with the variant schema. */
+    public static Variant rebuild(InternalRow row, VariantSchema variantSchema) {
+        return ShreddingUtils.rebuild(new PaimonShreddedRow(row), variantSchema);
+ }
+}
diff --git a/paimon-common/src/main/java/org/apache/paimon/data/variant/ShreddingUtils.java b/paimon-common/src/main/java/org/apache/paimon/data/variant/ShreddingUtils.java
new file mode 100644
index 0000000000..f87d5f6178
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/data/variant/ShreddingUtils.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.data.variant;
+
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.UUID;
+
+import static org.apache.paimon.data.variant.GenericVariantUtil.malformedVariant;
+
+/* This file is based on source code from the Spark Project (http://spark.apache.org/), licensed by the Apache
+ * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
+ * additional information regarding copyright ownership. */
+
+/** Shredding Utils. */
+public class ShreddingUtils {
+
+    /**
+     * Interface to read from a shredded result. It essentially has the same interface and
+     * semantics as Spark's `SpecializedGetters`, but we need a new interface to avoid the
+     * dependency.
+     */
+ public interface ShreddedRow {
+
+ boolean isNullAt(int ordinal);
+
+ boolean getBoolean(int ordinal);
+
+ byte getByte(int ordinal);
+
+ short getShort(int ordinal);
+
+ int getInt(int ordinal);
+
+ long getLong(int ordinal);
+
+ float getFloat(int ordinal);
+
+ double getDouble(int ordinal);
+
+ BigDecimal getDecimal(int ordinal, int precision, int scale);
+
+ String getString(int ordinal);
+
+ byte[] getBinary(int ordinal);
+
+ UUID getUuid(int ordinal);
+
+ ShreddedRow getStruct(int ordinal, int numFields);
+
+ ShreddedRow getArray(int ordinal);
+
+ int numElements();
+ }
+
+    // This `rebuild` function should only be called on the top-level schema; the recursive
+    // overload below is called on any shredded sub-schema.
+    public static Variant rebuild(ShreddedRow row, VariantSchema schema) {
+        if (schema.topLevelMetadataIdx < 0 || row.isNullAt(schema.topLevelMetadataIdx)) {
+            throw malformedVariant();
+        }
+        byte[] metadata = row.getBinary(schema.topLevelMetadataIdx);
+        if (schema.isUnshredded()) {
+            // `rebuild` is unnecessary for an unshredded variant.
+            if (row.isNullAt(schema.variantIdx)) {
+                throw malformedVariant();
+            }
+            return new GenericVariant(row.getBinary(schema.variantIdx), metadata);
+ }
+ GenericVariantBuilder builder = new GenericVariantBuilder(false);
+ rebuild(row, metadata, schema, builder);
+ return builder.result();
+ }
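+
+    // Worked example (hypothetical row, matching the layout that
+    // PaimonShreddingUtils.variantShreddingSchema produces for struct<a: int>): the shredded row
+    // (metadata, value = null, typed_value = row(row(value = null, typed_value = 1))) rebuilds
+    // to the variant {"a": 1}; a field left unshredded would instead carry its variant encoding
+    // in the inner `value` column and be copied over by the recursive overload below.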
+
+    // Rebuild a variant value from the shredded data according to the reconstruction algorithm
+    // in https://github.com/apache/parquet-format/blob/master/VariantShredding.md.
+    // Append the result to `builder`.
+    public static void rebuild(
+            ShreddedRow row, byte[] metadata, VariantSchema schema, GenericVariantBuilder builder) {
+ int typedIdx = schema.typedIdx;
+ int variantIdx = schema.variantIdx;
+ if (typedIdx >= 0 && !row.isNullAt(typedIdx)) {
+ if (schema.scalarSchema != null) {
+ VariantSchema.ScalarType scalar = schema.scalarSchema;
+ if (scalar instanceof VariantSchema.StringType) {
+ builder.appendString(row.getString(typedIdx));
+ } else if (scalar instanceof VariantSchema.IntegralType) {
+                    VariantSchema.IntegralType it = (VariantSchema.IntegralType) scalar;
+ long value = 0;
+ switch (it.size) {
+ case BYTE:
+ value = row.getByte(typedIdx);
+ break;
+ case SHORT:
+ value = row.getShort(typedIdx);
+ break;
+ case INT:
+ value = row.getInt(typedIdx);
+ break;
+ case LONG:
+ value = row.getLong(typedIdx);
+ break;
+ }
+ builder.appendLong(value);
+ } else if (scalar instanceof VariantSchema.FloatType) {
+ builder.appendFloat(row.getFloat(typedIdx));
+ } else if (scalar instanceof VariantSchema.DoubleType) {
+ builder.appendDouble(row.getDouble(typedIdx));
+ } else if (scalar instanceof VariantSchema.BooleanType) {
+ builder.appendBoolean(row.getBoolean(typedIdx));
+ } else if (scalar instanceof VariantSchema.BinaryType) {
+ builder.appendBinary(row.getBinary(typedIdx));
+ } else if (scalar instanceof VariantSchema.UuidType) {
+ builder.appendUuid(row.getUuid(typedIdx));
+ } else if (scalar instanceof VariantSchema.DecimalType) {
+                    VariantSchema.DecimalType dt = (VariantSchema.DecimalType) scalar;
+                    builder.appendDecimal(row.getDecimal(typedIdx, dt.precision, dt.scale));
+ } else if (scalar instanceof VariantSchema.DateType) {
+ builder.appendDate(row.getInt(typedIdx));
+ } else if (scalar instanceof VariantSchema.TimestampType) {
+ builder.appendTimestamp(row.getLong(typedIdx));
+ } else {
+ assert scalar instanceof VariantSchema.TimestampNTZType;
+ builder.appendTimestampNtz(row.getLong(typedIdx));
+ }
+ } else if (schema.arraySchema != null) {
+ VariantSchema elementSchema = schema.arraySchema;
+ ShreddedRow array = row.getArray(typedIdx);
+ int start = builder.getWritePos();
+                ArrayList<Integer> offsets = new ArrayList<>(array.numElements());
+ for (int i = 0; i < array.numElements(); i++) {
+ offsets.add(builder.getWritePos() - start);
+ rebuild(
+ array.getStruct(i, elementSchema.numFields),
+ metadata,
+ elementSchema,
+ builder);
+ }
+ builder.finishWritingArray(start, offsets);
+ } else {
+                ShreddedRow object = row.getStruct(typedIdx, schema.objectSchema.length);
+                ArrayList<GenericVariantBuilder.FieldEntry> fields = new ArrayList<>();
+ int start = builder.getWritePos();
+                for (int fieldIdx = 0; fieldIdx < schema.objectSchema.length; ++fieldIdx) {
+                    // Shredded field must not be null.
+                    if (object.isNullAt(fieldIdx)) {
+                        throw malformedVariant();
+                    }
+                    String fieldName = schema.objectSchema[fieldIdx].fieldName;
+                    VariantSchema fieldSchema = schema.objectSchema[fieldIdx].schema;
+                    ShreddedRow fieldValue = object.getStruct(fieldIdx, fieldSchema.numFields);
+                    // If the field doesn't have a non-null `typed_value` or `value`, it is missing.
+                    if ((fieldSchema.typedIdx >= 0 && !fieldValue.isNullAt(fieldSchema.typedIdx))
+                            || (fieldSchema.variantIdx >= 0
+                                    && !fieldValue.isNullAt(fieldSchema.variantIdx))) {
+                        int id = builder.addKey(fieldName);
+                        fields.add(
+                                new GenericVariantBuilder.FieldEntry(
+                                        fieldName, id, builder.getWritePos() - start));
+                        rebuild(fieldValue, metadata, fieldSchema, builder);
+                    }
+                }
+ if (variantIdx >= 0 && !row.isNullAt(variantIdx)) {
+ // Add the leftover fields in the variant binary.
+                    GenericVariant v = new GenericVariant(row.getBinary(variantIdx), metadata);
+                    if (v.getType() != GenericVariantUtil.Type.OBJECT) {
+                        throw malformedVariant();
+                    }
+                    for (int i = 0; i < v.objectSize(); ++i) {
+                        GenericVariant.ObjectField field = v.getFieldAtIndex(i);
+                        // `value` must not contain any shredded field.
+                        if (schema.objectSchemaMap.containsKey(field.key)) {
+                            throw malformedVariant();
+                        }
+                        int id = builder.addKey(field.key);
+                        fields.add(
+                                new GenericVariantBuilder.FieldEntry(
+                                        field.key, id, builder.getWritePos() - start));
+ builder.appendVariant(field.value);
+ }
+ }
+ builder.finishWritingObject(start, fields);
+ }
+ } else if (variantIdx >= 0 && !row.isNullAt(variantIdx)) {
+ // `typed_value` doesn't exist or is null. Read from `value`.
+            builder.appendVariant(new GenericVariant(row.getBinary(variantIdx), metadata));
+        } else {
+            // This means the variant is missing in a context where it must be present, so the
+            // input data is invalid.
+ throw malformedVariant();
+ }
+ }
+}
diff --git a/paimon-common/src/main/java/org/apache/paimon/data/variant/VariantSchema.java b/paimon-common/src/main/java/org/apache/paimon/data/variant/VariantSchema.java
new file mode 100644
index 0000000000..9fb7f25ba2
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/data/variant/VariantSchema.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.data.variant;
+
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/* This file is based on source code from the Spark Project (http://spark.apache.org/), licensed by the Apache
+ * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
+ * additional information regarding copyright ownership. */
+
+/**
+ * Defines a valid shredding schema, as described in
+ * https://github.com/apache/parquet-format/blob/master/VariantShredding.md. A shredding schema
+ * contains a value and optional typed_value field. If a typed_value is an array or struct, it
+ * recursively contains its own shredding schema for elements and fields, respectively. The
+ * schema also contains a metadata field at the top level, but not in recursively shredded fields.
+ */
+public class VariantSchema {
+
+ /** Represents one field of an object in the shredding schema. */
+ public static final class ObjectField {
+ public final String fieldName;
+ public final VariantSchema schema;
+
+ public ObjectField(String fieldName, VariantSchema schema) {
+ this.fieldName = fieldName;
+ this.schema = schema;
+ }
+
+ @Override
+ public String toString() {
+ return "ObjectField{" + "fieldName=" + fieldName + ", schema=" +
schema + '}';
+ }
+ }
+
+ /** ScalarType. */
+ public abstract static class ScalarType {}
+
+ /** StringType. */
+ public static final class StringType extends ScalarType {}
+
+ /** IntegralSize. */
+ public enum IntegralSize {
+ BYTE,
+ SHORT,
+ INT,
+ LONG
+ }
+
+ /** IntegralType. */
+ public static final class IntegralType extends ScalarType {
+ public final IntegralSize size;
+
+ public IntegralType(IntegralSize size) {
+ this.size = size;
+ }
+ }
+
+ /** FloatType. */
+ public static final class FloatType extends ScalarType {}
+
+ /** DoubleType. */
+ public static final class DoubleType extends ScalarType {}
+
+ /** BooleanType. */
+ public static final class BooleanType extends ScalarType {}
+
+ /** BinaryType. */
+ public static final class BinaryType extends ScalarType {}
+
+ /** DecimalType. */
+ public static final class DecimalType extends ScalarType {
+ public final int precision;
+ public final int scale;
+
+ public DecimalType(int precision, int scale) {
+ this.precision = precision;
+ this.scale = scale;
+ }
+ }
+
+ /** DateType. */
+ public static final class DateType extends ScalarType {}
+
+ /** TimestampType. */
+ public static final class TimestampType extends ScalarType {}
+
+ /** TimestampNTZType. */
+ public static final class TimestampNTZType extends ScalarType {}
+
+ /** UuidType. */
+ public static final class UuidType extends ScalarType {}
+
+    // The index of the typed_value, value, and metadata fields in the schema, respectively. If a
+    // given field is not in the schema, its value must be set to -1 to indicate that it is
+    // invalid. The indices of valid fields should be contiguous and start from 0.
+    public int typedIdx;
+    public int variantIdx;
+    // topLevelMetadataIdx must be non-negative in the top-level schema, and -1 at all other
+    // nesting levels.
+    public final int topLevelMetadataIdx;
+    // The number of fields in the schema, i.e. a value between 1 and 3, depending on which of
+    // value, typed_value and metadata are present.
+    public final int numFields;
+
+    public final ScalarType scalarSchema;
+    public final ObjectField[] objectSchema;
+    // Map for fast lookup of object fields by name. The values are an index into `objectSchema`.
+    public final Map<String, Integer> objectSchemaMap;
+ public final VariantSchema arraySchema;
+
+ public VariantSchema(
+ int typedIdx,
+ int variantIdx,
+ int topLevelMetadataIdx,
+ int numFields,
+ ScalarType scalarSchema,
+ ObjectField[] objectSchema,
+ VariantSchema arraySchema) {
+ this.typedIdx = typedIdx;
+ this.numFields = numFields;
+ this.variantIdx = variantIdx;
+ this.topLevelMetadataIdx = topLevelMetadataIdx;
+ this.scalarSchema = scalarSchema;
+ this.objectSchema = objectSchema;
+ if (objectSchema != null) {
+ objectSchemaMap = new HashMap<>();
+ for (int i = 0; i < objectSchema.length; i++) {
+ objectSchemaMap.put(objectSchema[i].fieldName, i);
+ }
+ } else {
+ objectSchemaMap = null;
+ }
+
+ this.arraySchema = arraySchema;
+ }
+
+ public void setTypedIdx(int typedIdx) {
+ this.typedIdx = typedIdx;
+ this.variantIdx = -1;
+ setMetadata();
+ }
+
+ private byte[] metadata;
+
+ public byte[] getMetadata() {
+ return metadata;
+ }
+
+ public void setMetadata() {
+ try {
+ String s = new ObjectMapper().writeValueAsString(objectSchemaMap);
+ metadata = GenericVariant.fromJson(s).metadata();
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+    // Returns whether the variant column is unshredded. The user is not required to do anything
+    // special, but can apply certain optimizations for an unshredded variant.
+ public boolean isUnshredded() {
+ return topLevelMetadataIdx >= 0 && variantIdx >= 0 && typedIdx < 0;
+ }
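+
+    // Example: an unshredded layout, as produced for a plain VARIANT type, is
+    // struct<metadata: binary, value: binary>, i.e. topLevelMetadataIdx = 0, variantIdx = 1 and
+    // typedIdx = -1.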
+
+ @Override
+ public String toString() {
+ return "VariantSchema{"
+ + "typedIdx="
+ + typedIdx
+ + ", variantIdx="
+ + variantIdx
+ + ", topLevelMetadataIdx="
+ + topLevelMetadataIdx
+ + ", numFields="
+ + numFields
+ + ", scalarSchema="
+ + scalarSchema
+ + ", objectSchema="
+ + objectSchema
+ + ", arraySchema="
+ + arraySchema
+ + '}';
+ }
+}
diff --git a/paimon-common/src/main/java/org/apache/paimon/data/variant/VariantShreddingWriter.java b/paimon-common/src/main/java/org/apache/paimon/data/variant/VariantShreddingWriter.java
new file mode 100644
index 0000000000..2ca87b11e6
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/data/variant/VariantShreddingWriter.java
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.data.variant;
+
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.util.ArrayList;
+
+/* This file is based on source code from the Spark Project (http://spark.apache.org/), licensed by the Apache
+ * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
+ * additional information regarding copyright ownership. */
+
+/** Class to implement shredding a Variant value. */
+public class VariantShreddingWriter {
+
+    /**
+     * Interface to build up a shredded result. Callers should implement a ShreddedResultBuilder
+     * to create an empty result with a given schema. The castShredded method will call one or
+     * more of the add* methods to populate it.
+     */
+    public interface ShreddedResult {
+
+        // Create an array. The elements are the result of shredding each element.
+        void addArray(ShreddedResult[] array);
+
+        // Create an object. The values are the result of shredding each field, ordered by the
+        // index in objectSchema. Missing fields are populated with an empty result.
+        void addObject(ShreddedResult[] values);
+
+        void addVariantValue(byte[] result);
+
+        // Add a scalar to typed_value. The type of Object depends on the scalarSchema in the
+        // shredding schema.
+        void addScalar(Object result);
+
+        void addMetadata(byte[] result);
+    }
+
+ /** Shredded result builder. */
+ public interface ShreddedResultBuilder {
+ ShreddedResult createEmpty(VariantSchema schema);
+
+        // If true, we will shred decimals to a different scale or to integers, as long as they
+        // are numerically equivalent. Similarly, integers will be allowed to shred to decimals.
+ boolean allowNumericScaleChanges();
+ }
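+
+    // Example: with allowNumericScaleChanges() returning true, a variant decimal such as 5.00
+    // may be shredded into an integer typed_value as 5 (the two compare equal numerically);
+    // with false, it is kept in the untyped `value` binary instead.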
+
+    /**
+     * Converts an input variant into shredded components. Returns the shredded result, as well
+     * as the original Variant with shredded fields removed. `dataType` must be a valid shredding
+     * schema, as described in
+     * https://github.com/apache/parquet-format/blob/master/VariantShredding.md.
+     */
+    public static ShreddedResult castShredded(
+            GenericVariant v, VariantSchema schema, ShreddedResultBuilder builder) {
+ GenericVariantUtil.Type variantType = v.getType();
+ ShreddedResult result = builder.createEmpty(schema);
+
+ if (schema.topLevelMetadataIdx >= 0) {
+ result.addMetadata(v.metadata());
+ }
+
+        if (schema.arraySchema != null && variantType == GenericVariantUtil.Type.ARRAY) {
+            // The array element is always a struct containing untyped and typed fields.
+            VariantSchema elementSchema = schema.arraySchema;
+            int size = v.arraySize();
+            ShreddedResult[] array = new ShreddedResult[size];
+            for (int i = 0; i < size; ++i) {
+                ShreddedResult shreddedArray =
+                        castShredded(v.getElementAtIndex(i), elementSchema, builder);
+                array[i] = shreddedArray;
+            }
+            result.addArray(array);
+        } else if (schema.objectSchema != null && variantType == GenericVariantUtil.Type.OBJECT) {
+            VariantSchema.ObjectField[] objectSchema = schema.objectSchema;
+            ShreddedResult[] shreddedValues = new ShreddedResult[objectSchema.length];
+
+            // Create a variantBuilder for any field that exists in `v`, but not in the
+            // shredding schema.
+            GenericVariantBuilder variantBuilder = new GenericVariantBuilder(false);
+            ArrayList<GenericVariantBuilder.FieldEntry> fieldEntries = new ArrayList<>();
+            // Keep track of which schema fields we actually found in the Variant value.
+            int numFieldsMatched = 0;
+            int start = variantBuilder.getWritePos();
+            for (int i = 0; i < v.objectSize(); ++i) {
+                GenericVariant.ObjectField field = v.getFieldAtIndex(i);
+                Integer fieldIdx = schema.objectSchemaMap.get(field.key);
+                if (fieldIdx != null) {
+                    // The field exists in the shredding schema. Recursively shred, and write the
+                    // result.
+                    ShreddedResult shreddedField =
+                            castShredded(field.value, objectSchema[fieldIdx].schema, builder);
+                    shreddedValues[fieldIdx] = shreddedField;
+                    numFieldsMatched++;
+                } else {
+                    // The field is not shredded. Put it in the untyped_value column.
+                    int id = v.getDictionaryIdAtIndex(i);
+                    fieldEntries.add(
+                            new GenericVariantBuilder.FieldEntry(
+                                    field.key, id, variantBuilder.getWritePos() - start));
+                    // shallowAppendVariant is needed for correctness, since we're relying on the
+                    // metadata IDs being unchanged.
+                    variantBuilder.shallowAppendVariant(field.value);
+                }
+            }
+            if (numFieldsMatched < objectSchema.length) {
+                // Set missing fields to non-null with all fields set to null.
+                for (int i = 0; i < objectSchema.length; ++i) {
+                    if (shreddedValues[i] == null) {
+                        VariantSchema.ObjectField fieldSchema = objectSchema[i];
+                        ShreddedResult emptyChild = builder.createEmpty(fieldSchema.schema);
+                        shreddedValues[i] = emptyChild;
+                        numFieldsMatched += 1;
+                    }
+                }
+            }
+            if (numFieldsMatched != objectSchema.length) {
+                // Since we just filled in all the null entries, this can only happen if we tried
+                // to write to the same field twice; i.e. the Variant contained duplicate fields,
+                // which is invalid.
+                throw new RuntimeException();
+            }
+            result.addObject(shreddedValues);
+            if (variantBuilder.getWritePos() != start) {
+                // We added something to the untyped value.
+                variantBuilder.finishWritingObject(start, fieldEntries);
+                result.addVariantValue(variantBuilder.valueWithoutMetadata());
+            }
+        } else if (schema.scalarSchema != null) {
+            VariantSchema.ScalarType scalarType = schema.scalarSchema;
+            Object typedValue = tryTypedShred(v, variantType, scalarType, builder);
+            if (typedValue != null) {
+                // Store the typed value.
+                result.addScalar(typedValue);
+            } else {
+                GenericVariantBuilder variantBuilder = new GenericVariantBuilder(false);
+                variantBuilder.appendVariant(v);
+                result.addVariantValue(v.value());
+            }
+        } else {
+            // Store in untyped.
+            result.addVariantValue(v.value());
+        }
+ return result;
+ }
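+
+    // Example: shredding {"a": 1, "b": "hello"} with a schema that only lists field "b" stores
+    // b's string in typed_value and re-encodes the leftover object {"a": 1} into the untyped
+    // `value` binary (see the third case of GenericVariantTest#testShredding in this change).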
+
+    /**
+     * Tries to cast a Variant into a typed value. If the cast fails, returns null.
+     *
+     * @param v the Variant to cast
+     * @param variantType the Variant type of v
+     * @param targetType the target type
+     * @return the scalar value, or null if the cast is not valid
+     */
+    private static Object tryTypedShred(
+            GenericVariant v,
+            GenericVariantUtil.Type variantType,
+            VariantSchema.ScalarType targetType,
+            ShreddedResultBuilder builder) {
+        switch (variantType) {
+            case LONG:
+                if (targetType instanceof VariantSchema.IntegralType) {
+                    // Check that the target type can hold the actual value.
+                    VariantSchema.IntegralType integralType =
+                            (VariantSchema.IntegralType) targetType;
+                    VariantSchema.IntegralSize size = integralType.size;
+                    long value = v.getLong();
+                    switch (size) {
+                        case BYTE:
+                            if (value == (byte) value) {
+                                return (byte) value;
+                            }
+                            break;
+                        case SHORT:
+                            if (value == (short) value) {
+                                return (short) value;
+                            }
+                            break;
+                        case INT:
+                            if (value == (int) value) {
+                                return (int) value;
+                            }
+                            break;
+                        case LONG:
+                            return value;
+                    }
+                } else if (targetType instanceof VariantSchema.DecimalType
+                        && builder.allowNumericScaleChanges()) {
+                    VariantSchema.DecimalType decimalType = (VariantSchema.DecimalType) targetType;
+                    // If the integer can fit in the given decimal precision, allow it.
+                    long value = v.getLong();
+                    // Set to the requested scale, and check if the precision is large enough.
+                    BigDecimal decimalValue = BigDecimal.valueOf(value);
+                    BigDecimal scaledValue = decimalValue.setScale(decimalType.scale);
+                    // The initial value should have scale 0, so rescaling shouldn't lose
+                    // information.
+                    assert (decimalValue.compareTo(scaledValue) == 0);
+                    if (scaledValue.precision() <= decimalType.precision) {
+                        return scaledValue;
+                    }
+                }
+                break;
+            case DECIMAL:
+                if (targetType instanceof VariantSchema.DecimalType) {
+                    VariantSchema.DecimalType decimalType = (VariantSchema.DecimalType) targetType;
+                    // Use getDecimalWithOriginalScale so that we retain scale information if
+                    // allowNumericScaleChanges() is false.
+                    BigDecimal value =
+                            GenericVariantUtil.getDecimalWithOriginalScale(v.rawValue(), v.pos());
+                    if (value.precision() <= decimalType.precision
+                            && value.scale() == decimalType.scale) {
+                        return value;
+                    }
+                    if (builder.allowNumericScaleChanges()) {
+                        // Convert to the target scale, and see if it fits. Rounding mode doesn't
+                        // matter, since we'll reject it if it turned out to require rounding.
+                        BigDecimal scaledValue =
+                                value.setScale(decimalType.scale, RoundingMode.FLOOR);
+                        if (scaledValue.compareTo(value) == 0
+                                && scaledValue.precision() <= decimalType.precision) {
+                            return scaledValue;
+                        }
+                    }
+                } else if (targetType instanceof VariantSchema.IntegralType
+                        && builder.allowNumericScaleChanges()) {
+                    VariantSchema.IntegralType integralType =
+                            (VariantSchema.IntegralType) targetType;
+                    // Check if the decimal happens to be an integer.
+                    BigDecimal value = v.getDecimal();
+                    VariantSchema.IntegralSize size = integralType.size;
+                    // Try to cast to the appropriate type, and check if any information is lost.
+                    switch (size) {
+                        case BYTE:
+                            if (value.compareTo(BigDecimal.valueOf(value.byteValue())) == 0) {
+                                return value.byteValue();
+                            }
+                            break;
+                        case SHORT:
+                            if (value.compareTo(BigDecimal.valueOf(value.shortValue())) == 0) {
+                                return value.shortValue();
+                            }
+                            break;
+                        case INT:
+                            if (value.compareTo(BigDecimal.valueOf(value.intValue())) == 0) {
+                                return value.intValue();
+                            }
+                            break;
+                        case LONG:
+                            if (value.compareTo(BigDecimal.valueOf(value.longValue())) == 0) {
+                                return value.longValue();
+                            }
+                    }
+                }
+                break;
+ case BOOLEAN:
+ if (targetType instanceof VariantSchema.BooleanType) {
+ return v.getBoolean();
+ }
+ break;
+ case STRING:
+ if (targetType instanceof VariantSchema.StringType) {
+ return v.getString();
+ }
+ break;
+ case DOUBLE:
+ if (targetType instanceof VariantSchema.DoubleType) {
+ return v.getDouble();
+ }
+ break;
+ case DATE:
+ if (targetType instanceof VariantSchema.DateType) {
+ return (int) v.getLong();
+ }
+ break;
+ case TIMESTAMP:
+ if (targetType instanceof VariantSchema.TimestampType) {
+ return v.getLong();
+ }
+ break;
+ case TIMESTAMP_NTZ:
+ if (targetType instanceof VariantSchema.TimestampNTZType) {
+ return v.getLong();
+ }
+ break;
+ case FLOAT:
+ if (targetType instanceof VariantSchema.FloatType) {
+ return v.getFloat();
+ }
+ break;
+ case BINARY:
+ if (targetType instanceof VariantSchema.BinaryType) {
+ return v.getBinary();
+ }
+ break;
+ case UUID:
+ if (targetType instanceof VariantSchema.UuidType) {
+ return v.getUuid();
+ }
+ break;
+ }
+        // The stored type does not match the requested shredding type. Return null, and the
+        // caller will store the result in untyped_value.
+        return null;
+ }
+
+ // Add the result to the shredding result.
+    private static void addVariantValueVariant(
+            Variant variantResult, VariantSchema schema, ShreddedResult result) {
+        result.addVariantValue(variantResult.value());
+ }
+}
diff --git a/paimon-common/src/test/java/org/apache/paimon/data/variant/GenericVariantTest.java b/paimon-common/src/test/java/org/apache/paimon/data/variant/GenericVariantTest.java
index dbe9bdf3c1..c226b9dead 100644
--- a/paimon-common/src/test/java/org/apache/paimon/data/variant/GenericVariantTest.java
+++ b/paimon-common/src/test/java/org/apache/paimon/data/variant/GenericVariantTest.java
@@ -18,6 +18,15 @@
package org.apache.paimon.data.variant;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.GenericArray;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
import org.junit.jupiter.api.Test;
import java.math.BigDecimal;
@@ -95,4 +104,138 @@ public class GenericVariantTest {
assertThat(variant.variantGet("$.boolean2")).isEqualTo(false);
assertThat(variant.variantGet("$.nullField")).isNull();
}
+
+    @Test
+    public void testShredding() {
+        GenericVariant variant = GenericVariant.fromJson("{\"a\": 1, \"b\": \"hello\"}");
+
+        // Happy path
+        RowType shreddedType1 =
+                RowType.of(
+                        new DataType[] {DataTypes.INT(), DataTypes.STRING()},
+                        new String[] {"a", "b"});
+        GenericRow expected1 =
+                GenericRow.of(
+                        variant.metadata(),
+                        null,
+                        GenericRow.of(
+                                GenericRow.of(null, 1),
+                                GenericRow.of(null, BinaryString.fromString("hello"))));
+        testShreddingResult(variant, shreddedType1, expected1);
+
+        // Missing field
+        RowType shreddedType2 =
+                RowType.of(
+                        new DataType[] {DataTypes.INT(), DataTypes.STRING(), DataTypes.STRING()},
+                        new String[] {"a", "c", "b"});
+        GenericRow expected2 =
+                GenericRow.of(
+                        variant.metadata(),
+                        null,
+                        GenericRow.of(
+                                GenericRow.of(null, 1),
+                                GenericRow.of(null, null),
+                                GenericRow.of(null, BinaryString.fromString("hello"))));
+        testShreddingResult(variant, shreddedType2, expected2);
+
+        // "a" is not present in the shredding schema
+        RowType shreddedType3 =
+                RowType.of(
+                        new DataType[] {DataTypes.STRING(), DataTypes.STRING()},
+                        new String[] {"b", "c"});
+        GenericRow expected3 =
+                GenericRow.of(
+                        variant.metadata(),
+                        untypedValue("{\"a\": 1}"),
+                        GenericRow.of(
+                                GenericRow.of(null, BinaryString.fromString("hello")),
+                                GenericRow.of(null, null)));
+        testShreddingResult(variant, shreddedType3, expected3);
+    }
+
+    @Test
+    public void testShreddingAllTypes() {
+        String json =
+                "{\n"
+                        + "  \"c1\": \"Hello, World!\",\n"
+                        + "  \"c2\": 12345678901234,\n"
+                        + "  \"c3\": 1.0123456789012345678901234567890123456789,\n"
+                        + "  \"c4\": 100.99,\n"
+                        + "  \"c5\": true,\n"
+                        + "  \"c6\": null,\n"
+                        + "  \"c7\": {\"street\" : \"Main St\",\"city\" : \"Hangzhou\"},\n"
+                        + "  \"c8\": [1, 2]\n"
+                        + "}\n";
+        GenericVariant variant = GenericVariant.fromJson(json);
+        RowType shreddedType1 =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.STRING(),
+                            DataTypes.BIGINT(),
+                            DataTypes.DOUBLE(),
+                            DataTypes.DECIMAL(5, 2),
+                            DataTypes.BOOLEAN(),
+                            DataTypes.STRING(),
+                            RowType.of(
+                                    new DataType[] {DataTypes.STRING(), DataTypes.STRING()},
+                                    new String[] {"street", "city"}),
+                            DataTypes.ARRAY(DataTypes.INT())
+                        },
+                        new String[] {"c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8"});
+        GenericRow expected1 =
+                GenericRow.of(
+                        variant.metadata(),
+                        null,
+                        GenericRow.of(
+                                GenericRow.of(null, BinaryString.fromString("Hello, World!")),
+                                GenericRow.of(null, 12345678901234L),
+                                GenericRow.of(null, 1.0123456789012345678901234567890123456789D),
+                                GenericRow.of(
+                                        null,
+                                        Decimal.fromBigDecimal(new BigDecimal("100.99"), 5, 2)),
+                                GenericRow.of(null, true),
+                                GenericRow.of(new byte[] {0}, null),
+                                GenericRow.of(
+                                        null,
+                                        GenericRow.of(
+                                                GenericRow.of(
+                                                        null,
+                                                        BinaryString.fromString("Main St")),
+                                                GenericRow.of(
+                                                        null,
+                                                        BinaryString.fromString("Hangzhou")))),
+                                GenericRow.of(
+                                        null,
+                                        new GenericArray(
+                                                new GenericRow[] {
+                                                    GenericRow.of(null, 1), GenericRow.of(null, 2)
+                                                }))));
+        testShreddingResult(variant, shreddedType1, expected1);
+
+        // Test no shredding
+        RowType shreddedType2 =
+                RowType.of(new DataType[] {DataTypes.STRING()}, new String[] {"other"});
+        GenericRow expected2 =
+                GenericRow.of(
+                        variant.metadata(),
+                        untypedValue(json),
+                        GenericRow.of(GenericRow.of(null, null)));
+        testShreddingResult(variant, shreddedType2, expected2);
+    }
+
+    private byte[] untypedValue(String input) {
+        return GenericVariant.fromJson(input).value();
+    }
+
+    private void testShreddingResult(
+            GenericVariant variant, RowType shreddedType, InternalRow expected) {
+        RowType shreddingSchema = PaimonShreddingUtils.variantShreddingSchema(shreddedType);
+        VariantSchema variantSchema = PaimonShreddingUtils.buildVariantSchema(shreddingSchema);
+        // Test cast shredded
+        InternalRow shredded = PaimonShreddingUtils.castShredded(variant, variantSchema);
+        assertThat(shredded).isEqualTo(expected);
+
+        // Test rebuild
+        Variant rebuilt = PaimonShreddingUtils.rebuild(shredded, variantSchema);
+        assertThat(variant.toJson()).isEqualTo(rebuilt.toJson());
+    }
}