This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 5949538651 [variant] Introduce assemble shredded variant struct
function (#6652)
5949538651 is described below
commit 5949538651104cd071858da7139b157f4078131b
Author: Zouxxyy <[email protected]>
AuthorDate: Mon Nov 24 16:08:59 2025 +0800
[variant] Introduce assemble shredded variant struct function (#6652)
---
LICENSE | 1 +
.../paimon/data/variant/BaseVariantReader.java | 519 +++++++++++++++++++++
.../apache/paimon/data/variant/GenericVariant.java | 46 +-
.../paimon/data/variant/PaimonShreddingUtils.java | 312 ++++++++++++-
.../org/apache/paimon/data/variant/Variant.java | 16 +-
.../paimon/data/variant/VariantCastArgs.java | 46 ++
.../org/apache/paimon/data/variant/VariantGet.java | 184 ++++++++
.../{PathSegment.java => VariantPathSegment.java} | 73 +--
.../apache/paimon/data/variant/VariantSchema.java | 8 +
.../data/variant/VariantShreddingWriter.java | 2 -
.../paimon/data/variant/GenericVariantTest.java | 71 ++-
.../data/variant/PaimonShreddingUtilsTest.java | 317 +++++++++++++
12 files changed, 1492 insertions(+), 103 deletions(-)
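
For orientation, a minimal usage sketch of the API this commit introduces: `variantGet` now takes a target `DataType` and a `VariantCastArgs`, so a path extraction returns a value already cast to the requested type. The JSON input and variable names below are illustrative only, not part of the commit.

    Variant v = GenericVariant.fromJson("{\"user\": {\"id\": 7, \"name\": \"paimon\"}}");
    // failOnError = false: an invalid cast returns null instead of throwing.
    VariantCastArgs args = new VariantCastArgs(false, ZoneOffset.UTC);
    Object id = v.variantGet("$.user.id", DataTypes.BIGINT(), args);      // 7L
    Object name = v.variantGet("$.user.name", DataTypes.STRING(), args);  // BinaryString "paimon"
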
diff --git a/LICENSE b/LICENSE
index 7c5f46ffe4..16447645b8 100644
--- a/LICENSE
+++ b/LICENSE
@@ -295,6 +295,7 @@
paimon-format/test/main/java/org/apache/paimon/format/parquet/newReader/DeltaLen
paimon-common/src/main/java/org/apache/paimon/data/variant/ShreddingUtils.java
paimon-common/src/main/java/org/apache/paimon/data/variant/VariantSchema.java
paimon-common/src/main/java/org/apache/paimon/data/variant/VariantShreddingWriter.java
+paimon-common/src/main/java/org/apache/paimon/data/variant/BaseVariantReader.java
from https://spark.apache.org/ version 4.0.0-preview2
MIT License
diff --git
a/paimon-common/src/main/java/org/apache/paimon/data/variant/BaseVariantReader.java
b/paimon-common/src/main/java/org/apache/paimon/data/variant/BaseVariantReader.java
new file mode 100644
index 0000000000..7b8872973f
--- /dev/null
+++
b/paimon-common/src/main/java/org/apache/paimon/data/variant/BaseVariantReader.java
@@ -0,0 +1,519 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.data.variant;
+
+import org.apache.paimon.casting.CastExecutor;
+import org.apache.paimon.casting.CastExecutors;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericArray;
+import org.apache.paimon.data.GenericMap;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalArray;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.BigIntType;
+import org.apache.paimon.types.BooleanType;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.DecimalType;
+import org.apache.paimon.types.DoubleType;
+import org.apache.paimon.types.FloatType;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.MapType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.SmallIntType;
+import org.apache.paimon.types.TinyIntType;
+import org.apache.paimon.types.VariantType;
+
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.List;
+
+import static org.apache.paimon.data.variant.GenericVariantUtil.Type;
+import static
org.apache.paimon.data.variant.GenericVariantUtil.malformedVariant;
+
+/* This file is based on source code from the Spark Project
(http://spark.apache.org/), licensed by the Apache
+ * Software Foundation (ASF) under the Apache License, Version 2.0. See the
NOTICE file distributed with this work for
+ * additional information regarding copyright ownership. */
+
+/**
+ * The base class to read variant values into a Paimon type. For convenience, we also allow
+ * creating an instance of the base class itself. None of its read functions can be used in that
+ * case, but it can serve as a container of `targetType` and `castArgs`.
+ */
+public class BaseVariantReader {
+
+ protected final VariantSchema schema;
+ protected final DataType targetType;
+ protected final VariantCastArgs castArgs;
+
+ public BaseVariantReader(VariantSchema schema, DataType targetType,
VariantCastArgs castArgs) {
+ this.schema = schema;
+ this.targetType = targetType;
+ this.castArgs = castArgs;
+ }
+
+ public VariantSchema schema() {
+ return schema;
+ }
+
+ public DataType targetType() {
+ return targetType;
+ }
+
+ public VariantCastArgs castArgs() {
+ return castArgs;
+ }
+
+ /**
+ * Read from a row containing a variant value (shredded or unshredded) and
return a value of
+ * `targetType`. The row schema is described by `schema`. This function
throws MALFORMED_VARIANT
+ * if the variant is missing. If the variant can be legally missing (the
only possible situation
+ * is struct fields in object `typed_value`), the caller should check for
it and avoid calling
+ * this function if the variant is missing.
+ */
+ public Object read(InternalRow row, byte[] topLevelMetadata) {
+ if (schema.typedIdx < 0 || row.isNullAt(schema.typedIdx)) {
+ if (schema.variantIdx < 0 || row.isNullAt(schema.variantIdx)) {
+ // Both `typed_value` and `value` are null, meaning the
variant is missing.
+ throw malformedVariant();
+ }
+ GenericVariant variant =
+ new GenericVariant(row.getBinary(schema.variantIdx),
topLevelMetadata);
+ return VariantGet.cast(variant, targetType, castArgs);
+ } else {
+ return readFromTyped(row, topLevelMetadata);
+ }
+ }
+
+ /** Subclasses should override it to produce the read result when
`typed_value` is not null. */
+ protected Object readFromTyped(InternalRow row, byte[] topLevelMetadata) {
+ throw new UnsupportedOperationException();
+ }
+
+ /** A util function to rebuild the variant in binary format from a variant
value. */
+ protected Variant rebuildVariant(InternalRow row, byte[] topLevelMetadata)
{
+ GenericVariantBuilder builder = new GenericVariantBuilder(false);
+ ShreddingUtils.rebuild(
+ new PaimonShreddingUtils.PaimonShreddedRow(row),
topLevelMetadata, schema, builder);
+ return builder.result();
+ }
+
+    /** A util function to throw an error or return null when an invalid cast happens. */
+ protected Object invalidCast(InternalRow row, byte[] topLevelMetadata) {
+ return VariantGet.invalidCast(rebuildVariant(row, topLevelMetadata),
targetType, castArgs);
+ }
+
+ /**
+ * Create a reader for `targetType`. If `schema` is null, meaning that the
extraction path
+ * doesn't exist in `typed_value`, it returns an instance of
`BaseVariantReader`. As described
+ * in the class comment, the reader is only a container of `targetType`
and `castArgs` in this
+ * case.
+ */
+ public static BaseVariantReader create(
+ @Nullable VariantSchema schema,
+ DataType targetType,
+ VariantCastArgs castArgs,
+ boolean isTopLevelUnshredded) {
+ if (schema == null) {
+ return new BaseVariantReader(null, targetType, castArgs);
+ }
+
+ if (targetType instanceof RowType) {
+ return new RowReader(schema, (RowType) targetType, castArgs);
+ } else if (targetType instanceof ArrayType) {
+ return new ArrayReader(schema, (ArrayType) targetType, castArgs);
+        } else if (targetType instanceof MapType) {
+            if (((MapType) targetType).getKeyType().equals(DataTypes.STRING())) {
+                return new MapReader(schema, (MapType) targetType, castArgs);
+            } else {
+                throw new UnsupportedOperationException();
+            }
+ } else if (targetType instanceof VariantType) {
+ return new VariantReader(
+ schema, (VariantType) targetType, castArgs,
isTopLevelUnshredded);
+ } else {
+ return new ScalarReader(schema, targetType, castArgs);
+ }
+ }
+
+ /**
+ * Read variant values into a Paimon row type. It reads unshredded fields
(fields that are not
+ * in the typed object) from the `value`, and reads the shredded fields
from the object
+ * `typed_value`. `value` must not contain any shredded field according to
the shredding spec,
+ * but this requirement is not enforced. If `value` does contain a
shredded field, no error will
+ * occur, and the field in object `typed_value` will be the final result.
+ */
+ private static final class RowReader extends BaseVariantReader {
+
+ // For each field in `targetType`, store the index of the field with
the same name in object
+ // `typed_value`, or -1 if it doesn't exist in object `typed_value`.
+ private final int[] fieldInputIndices;
+
+ // For each field in `targetType`, store the reader from the
corresponding field in object
+ // `typed_value`, or null if it doesn't exist in object `typed_value`.
+ private final BaseVariantReader[] fieldReaders;
+
+        // If all fields in `targetType` can be found in object `typed_value`, then the reader
+        // doesn't need to read from `value`.
+ private final boolean needUnshreddedObject;
+
+ public RowReader(VariantSchema schema, RowType targetType,
VariantCastArgs castArgs) {
+ super(schema, targetType, castArgs);
+
+ List<DataField> targetFields = targetType.getFields();
+ this.fieldInputIndices = new int[targetFields.size()];
+ for (int i = 0; i < targetFields.size(); i++) {
+ fieldInputIndices[i] =
+ schema.objectSchemaMap != null
+ ?
schema.objectSchemaMap.get(targetFields.get(i).name())
+ : -1;
+ }
+
+ this.fieldReaders = new BaseVariantReader[targetFields.size()];
+ for (int i = 0; i < targetFields.size(); i++) {
+ int inputIdx = fieldInputIndices[i];
+ if (inputIdx >= 0) {
+ VariantSchema fieldSchema =
schema.objectSchema[inputIdx].schema();
+ fieldReaders[i] =
+ BaseVariantReader.create(
+ fieldSchema, targetFields.get(i).type(),
castArgs, false);
+ } else {
+ fieldReaders[i] = null;
+ }
+ }
+
+ // Check if any field is missing (i.e., index == -1)
+ boolean needsUnshredded = false;
+ for (int idx : fieldInputIndices) {
+ if (idx < 0) {
+ needsUnshredded = true;
+ break;
+ }
+ }
+ this.needUnshreddedObject = needsUnshredded;
+ }
+
+ @Override
+ public Object readFromTyped(InternalRow row, byte[] topLevelMetadata) {
+ if (schema.objectSchema == null) {
+ return invalidCast(row, topLevelMetadata);
+ }
+
+ InternalRow obj = row.getRow(schema.typedIdx,
schema.objectSchema.length);
+ GenericRow result = new GenericRow(fieldInputIndices.length);
+ GenericVariant unshreddedObject = null;
+
+ if (needUnshreddedObject
+ && schema.variantIdx >= 0
+ && !row.isNullAt(schema.variantIdx)) {
+ unshreddedObject =
+ new GenericVariant(row.getBinary(schema.variantIdx),
topLevelMetadata);
+ if (unshreddedObject.getType() != Type.OBJECT) {
+ throw malformedVariant();
+ }
+ }
+
+ int numFields = fieldInputIndices.length;
+ int i = 0;
+ while (i < numFields) {
+ int inputIdx = fieldInputIndices[i];
+ if (inputIdx >= 0) {
+ // Shredded field must not be null.
+ if (obj.isNullAt(inputIdx)) {
+ throw malformedVariant();
+ }
+
+ VariantSchema fieldSchema =
schema.objectSchema[inputIdx].schema();
+ InternalRow fieldInput = obj.getRow(inputIdx,
fieldSchema.numFields);
+ // Only read from the shredded field if it is not missing.
+ if ((fieldSchema.typedIdx >= 0 &&
!fieldInput.isNullAt(fieldSchema.typedIdx))
+ || (fieldSchema.variantIdx >= 0
+ &&
!fieldInput.isNullAt(fieldSchema.variantIdx))) {
+ Object fieldValue = fieldReaders[i].read(fieldInput,
topLevelMetadata);
+ result.setField(i, fieldValue);
+ }
+ } else if (unshreddedObject != null) {
+ DataField field = ((RowType) targetType).getField(i);
+ String fieldName = field.name();
+ DataType fieldType = field.type();
+ GenericVariant unshreddedField =
unshreddedObject.getFieldByKey(fieldName);
+ if (unshreddedField != null) {
+ Object castedValue = VariantGet.cast(unshreddedField,
fieldType, castArgs);
+ result.setField(i, castedValue);
+ }
+ }
+ i += 1;
+ }
+ return result;
+ }
+ }
+
+    /** Read variant values into a Paimon array type. */
+ private static final class ArrayReader extends BaseVariantReader {
+
+ private final BaseVariantReader elementReader;
+
+ public ArrayReader(VariantSchema schema, ArrayType targetType,
VariantCastArgs castArgs) {
+ super(schema, targetType, castArgs);
+ if (schema.arraySchema != null) {
+ this.elementReader =
+ BaseVariantReader.create(
+ schema.arraySchema,
targetType.getElementType(), castArgs, false);
+ } else {
+ this.elementReader = null;
+ }
+ }
+
+ @Override
+ protected Object readFromTyped(InternalRow row, byte[]
topLevelMetadata) {
+ if (schema.arraySchema == null) {
+ return invalidCast(row, topLevelMetadata);
+ }
+
+ int elementNumFields = schema.arraySchema.numFields;
+ InternalArray arr = row.getArray(schema.typedIdx);
+ int size = arr.size();
+ Object[] result = new Object[size];
+ int i = 0;
+ while (i < size) {
+ if (arr.isNullAt(i)) {
+ throw malformedVariant();
+ }
+ result[i] = elementReader.read(arr.getRow(i,
elementNumFields), topLevelMetadata);
+ i += 1;
+ }
+ return new GenericArray(result);
+ }
+ }
+
+ /**
+     * Read variant values into a Paimon map type with a string key type. The input must be an
+     * object for a valid cast. The resulting map contains shredded fields from object
+     * `typed_value` and unshredded fields from object `value`. `value` must not contain any
+     * shredded field according to the shredding spec. Unlike `RowReader`, this requirement is
+     * enforced in `MapReader`. If `value` does contain a shredded field, a MALFORMED_VARIANT
+     * error is thrown. The purpose is to avoid duplicate map keys.
+ */
+ private static final class MapReader extends BaseVariantReader {
+
+ private final BaseVariantReader[] valueReaders;
+ private final BinaryString[] shreddedFieldNames;
+ private final MapType targetType;
+
+ public MapReader(VariantSchema schema, MapType targetType,
VariantCastArgs castArgs) {
+ super(schema, targetType, castArgs);
+ this.targetType = targetType;
+
+ if (schema.objectSchema != null) {
+ int len = schema.objectSchema.length;
+ this.valueReaders = new BaseVariantReader[len];
+ this.shreddedFieldNames = new BinaryString[len];
+
+ for (int i = 0; i < len; i++) {
+ VariantSchema.ObjectField fieldInfo =
schema.objectSchema[i];
+ this.valueReaders[i] =
+ BaseVariantReader.create(
+ fieldInfo.schema(),
targetType.getValueType(), castArgs, false);
+ this.shreddedFieldNames[i] =
BinaryString.fromString(fieldInfo.fieldName());
+ }
+ } else {
+ this.valueReaders = null;
+ this.shreddedFieldNames = null;
+ }
+ }
+
+ @Override
+ public Object readFromTyped(InternalRow row, byte[] topLevelMetadata) {
+ if (schema.objectSchema == null) {
+ return invalidCast(row, topLevelMetadata);
+ }
+
+ InternalRow obj = row.getRow(schema.typedIdx,
schema.objectSchema.length);
+ int numShreddedFields = (valueReaders != null) ?
valueReaders.length : 0;
+
+ GenericVariant unshreddedObject = null;
+ if (schema.variantIdx >= 0 && !row.isNullAt(schema.variantIdx)) {
+ unshreddedObject =
+ new GenericVariant(row.getBinary(schema.variantIdx),
topLevelMetadata);
+ if (unshreddedObject.getType() != Type.OBJECT) {
+ throw malformedVariant();
+ }
+ }
+
+ int numUnshreddedFields =
+ (unshreddedObject != null) ? unshreddedObject.objectSize()
: 0;
+ int totalCapacity = numShreddedFields + numUnshreddedFields;
+
+ HashMap<BinaryString, Object> map = new HashMap<>(totalCapacity);
+ int i = 0;
+ while (i < numShreddedFields) {
+ // Shredded field must not be null.
+ if (obj.isNullAt(i)) {
+ throw malformedVariant();
+ }
+ VariantSchema fieldSchema = schema.objectSchema[i].schema();
+ InternalRow fieldInput = obj.getRow(i, fieldSchema.numFields);
+ // Only add the shredded field to map if it is not missing.
+ if ((fieldSchema.typedIdx >= 0 &&
!fieldInput.isNullAt(fieldSchema.typedIdx))
+ || (fieldSchema.variantIdx >= 0
+ &&
!fieldInput.isNullAt(fieldSchema.variantIdx))) {
+ map.put(
+ shreddedFieldNames[i],
+ valueReaders[i].read(fieldInput,
topLevelMetadata));
+ }
+ i += 1;
+ }
+ i = 0;
+ while (i < numUnshreddedFields) {
+ GenericVariant.ObjectField field =
unshreddedObject.getFieldAtIndex(i);
+ if (schema.objectSchemaMap.containsKey(field.key)) {
+ throw malformedVariant();
+ }
+ map.put(
+ BinaryString.fromString(field.key),
+ VariantGet.cast(field.value,
targetType.getValueType(), castArgs));
+ i += 1;
+ }
+ return new GenericMap(map);
+ }
+ }
+
+ /** Read variant values into a Paimon variant type (the binary format). */
+ private static final class VariantReader extends BaseVariantReader {
+
+        // An optional optimization: the user can set it to true if the variant column is
+        // unshredded and the extraction path is empty. We are not required to do anything
+        // special, but we can avoid rebuilding the variant for performance.
+ private final boolean isTopLevelUnshredded;
+
+ public VariantReader(
+ VariantSchema schema,
+ VariantType targetType,
+ VariantCastArgs castArgs,
+ boolean isTopLevelUnshredded) {
+ super(schema, targetType, castArgs);
+ this.isTopLevelUnshredded = isTopLevelUnshredded;
+ }
+
+ @Override
+ public Object read(InternalRow row, byte[] topLevelMetadata) {
+ if (isTopLevelUnshredded) {
+ if (row.isNullAt(schema.variantIdx)) {
+ throw malformedVariant();
+ }
+ return new GenericVariant(row.getBinary(schema.variantIdx),
topLevelMetadata);
+ }
+ return rebuildVariant(row, topLevelMetadata);
+ }
+ }
+
+ /**
+     * Read variant values into a Paimon scalar type. When `typed_value` is not null but not a
+     * scalar, all other target types return an invalid cast, but the string target type can
+     * still build a string from an array/object `typed_value`. For a scalar `typed_value`, the
+     * cast is performed through `CastExecutors`. According to the shredding spec, scalar
+     * `typed_value` and `value` must not both be non-null at the same time. That requirement is
+     * not enforced in this reader: if they are both non-null, no error will occur, and the
+     * reader will read from `typed_value`.
+ */
+ private static final class ScalarReader extends BaseVariantReader {
+
+ private final DataType scalaType;
+ @Nullable private final CastExecutor<Object, Object> resolve;
+ private final boolean noNeedCast;
+
+ public ScalarReader(VariantSchema schema, DataType targetType,
VariantCastArgs castArgs) {
+ super(schema, targetType, castArgs);
+ if (schema.scalarSchema != null) {
+ scalaType =
PaimonShreddingUtils.scalarSchemaToPaimonType(schema.scalarSchema);
+ noNeedCast = scalaType.equals(targetType);
+ resolve =
+ noNeedCast
+ ? null
+ : (CastExecutor<Object, Object>)
+ CastExecutors.resolve(scalaType,
targetType);
+ } else {
+ scalaType = null;
+ noNeedCast = false;
+ resolve = null;
+ }
+ }
+
+ @Override
+ protected Object readFromTyped(InternalRow row, byte[]
topLevelMetadata) {
+ if (!noNeedCast && resolve == null) {
+ if (targetType.equals(DataTypes.STRING())) {
+ return BinaryString.fromString(
+ rebuildVariant(row,
topLevelMetadata).toJson(castArgs.zoneId()));
+ } else {
+ return invalidCast(row, topLevelMetadata);
+ }
+ }
+
+ int typedValueIdx = schema.typedIdx;
+
+ if (row.isNullAt(typedValueIdx)) {
+ return null;
+ }
+
+ Object i;
+ if (scalaType.equals(DataTypes.STRING())) {
+ i = row.getString(typedValueIdx);
+ } else if (scalaType instanceof TinyIntType) {
+ i = row.getByte(typedValueIdx);
+ } else if (scalaType instanceof SmallIntType) {
+ i = row.getShort(typedValueIdx);
+ } else if (scalaType instanceof IntType) {
+ i = row.getInt(typedValueIdx);
+ } else if (scalaType instanceof BigIntType) {
+ i = row.getLong(typedValueIdx);
+ } else if (scalaType instanceof FloatType) {
+ i = row.getFloat(typedValueIdx);
+ } else if (scalaType instanceof DoubleType) {
+ i = row.getDouble(typedValueIdx);
+ } else if (scalaType instanceof BooleanType) {
+ i = row.getBoolean(typedValueIdx);
+ } else if (scalaType.equals(DataTypes.BYTES())) {
+ i = row.getBinary(typedValueIdx);
+ } else if (scalaType instanceof DecimalType) {
+ i =
+ row.getDecimal(
+ typedValueIdx,
+ ((DecimalType) scalaType).getPrecision(),
+ ((DecimalType) scalaType).getScale());
+ } else {
+ throw new UnsupportedOperationException("Unsupported scalar
type: " + scalaType);
+ }
+ if (noNeedCast) {
+ return i;
+ }
+ try {
+ return resolve.cast(i);
+ } catch (Exception e) {
+ return invalidCast(row, topLevelMetadata);
+ }
+ }
+ }
+}
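
A hedged sketch of how the reader above can be driven against a shredded row, using the PaimonShreddingUtils helpers referenced elsewhere in this patch (the shredded row type and JSON value are illustrative):

    RowType shreddedType =
            RowType.of(
                    new DataType[] {DataTypes.INT(), DataTypes.STRING()},
                    new String[] {"a", "b"});
    VariantSchema schema =
            PaimonShreddingUtils.buildVariantSchema(
                    PaimonShreddingUtils.variantShreddingSchema(shreddedType));
    GenericVariant variant = GenericVariant.fromJson("{\"a\": 1, \"b\": \"hello\"}");
    InternalRow shredded = PaimonShreddingUtils.castShredded(variant, schema);

    VariantCastArgs args = new VariantCastArgs(true, ZoneOffset.UTC);
    BaseVariantReader reader = BaseVariantReader.create(schema, shreddedType, args, false);
    // RowReader reassembles the shredded typed_value as a GenericRow (a = 1, b = "hello").
    Object asRow = reader.read(shredded, variant.metadata());
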
diff --git
a/paimon-common/src/main/java/org/apache/paimon/data/variant/GenericVariant.java
b/paimon-common/src/main/java/org/apache/paimon/data/variant/GenericVariant.java
index 7463490d57..89181c72e0 100644
---
a/paimon-common/src/main/java/org/apache/paimon/data/variant/GenericVariant.java
+++
b/paimon-common/src/main/java/org/apache/paimon/data/variant/GenericVariant.java
@@ -18,6 +18,10 @@
package org.apache.paimon.data.variant;
+import org.apache.paimon.data.variant.VariantPathSegment.ArrayExtraction;
+import org.apache.paimon.data.variant.VariantPathSegment.ObjectExtraction;
+import org.apache.paimon.types.DataType;
+
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonFactory;
import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonGenerator;
@@ -134,13 +138,9 @@ public final class GenericVariant implements Variant,
Serializable {
}
}
- @Override
- public String toJson() {
- return toJson(ZoneOffset.UTC);
- }
-
// Stringify the variant in JSON format.
// Throw `MALFORMED_VARIANT` if the variant is malformed.
+ @Override
public String toJson(ZoneId zoneId) {
StringBuilder sb = new StringBuilder();
toJsonImpl(value, metadata, pos, sb, zoneId);
@@ -152,39 +152,19 @@ public final class GenericVariant implements Variant,
Serializable {
return toJson();
}
- public Object variantGet(String path) {
+ public Object variantGet(String path, DataType dataType, VariantCastArgs
castArgs) {
GenericVariant v = this;
- PathSegment[] parsedPath = PathSegment.parse(path);
- for (PathSegment pathSegment : parsedPath) {
- if (pathSegment.isKey() && v.getType() == Type.OBJECT) {
- v = v.getFieldByKey(pathSegment.getKey());
- } else if (pathSegment.isIndex() && v.getType() == Type.ARRAY) {
- v = v.getElementAtIndex(pathSegment.getIndex());
+ VariantPathSegment[] parsedPath = VariantPathSegment.parse(path);
+ for (VariantPathSegment pathSegment : parsedPath) {
+ if (pathSegment instanceof ObjectExtraction && v.getType() ==
Type.OBJECT) {
+ v = v.getFieldByKey(((ObjectExtraction) pathSegment).getKey());
+ } else if (pathSegment instanceof ArrayExtraction && v.getType()
== Type.ARRAY) {
+ v = v.getElementAtIndex(((ArrayExtraction)
pathSegment).getIndex());
} else {
return null;
}
}
-
- switch (v.getType()) {
- case OBJECT:
- case ARRAY:
- return v.toJson();
- case STRING:
- return v.getString();
- case LONG:
- return v.getLong();
- case DOUBLE:
- return v.getDouble();
- case DECIMAL:
- return v.getDecimal();
- case BOOLEAN:
- return v.getBoolean();
- case NULL:
- return null;
- default:
- // todo: support other types
- throw new IllegalArgumentException("Unsupported type: " +
v.getType());
- }
+ return VariantGet.cast(v, dataType, castArgs);
}
@Override
diff --git
a/paimon-common/src/main/java/org/apache/paimon/data/variant/PaimonShreddingUtils.java
b/paimon-common/src/main/java/org/apache/paimon/data/variant/PaimonShreddingUtils.java
index fd8285a151..b239c5cf98 100644
---
a/paimon-common/src/main/java/org/apache/paimon/data/variant/PaimonShreddingUtils.java
+++
b/paimon-common/src/main/java/org/apache/paimon/data/variant/PaimonShreddingUtils.java
@@ -28,6 +28,8 @@ import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.columnar.RowColumnVector;
import org.apache.paimon.data.columnar.writable.WritableBytesVector;
import org.apache.paimon.data.columnar.writable.WritableColumnVector;
+import org.apache.paimon.data.variant.VariantPathSegment.ArrayExtraction;
+import org.apache.paimon.data.variant.VariantPathSegment.ObjectExtraction;
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
@@ -41,6 +43,10 @@ import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
+import static org.apache.paimon.data.variant.GenericVariantUtil.Type.ARRAY;
+import static org.apache.paimon.data.variant.GenericVariantUtil.Type.OBJECT;
+import static
org.apache.paimon.data.variant.GenericVariantUtil.malformedVariant;
+
/** Utils for paimon shredding. */
public class PaimonShreddingUtils {
@@ -134,8 +140,76 @@ public class PaimonShreddingUtils {
}
}
+ /** The search result of a `VariantPathSegment` in a `VariantSchema`. */
+ public static class SchemaPathSegment {
+
+ private final VariantPathSegment rawPath;
+
+ // Whether this path segment is an object or array extraction.
+ private final boolean isObject;
+ // `schema.typedIdx`, if the path exists in the schema (for object
extraction, the schema
+ // should contain an object `typed_value` containing the requested
field; similar for array
+ // extraction). Negative otherwise.
+ private final int typedIdx;
+
+        // For object extraction, it is the index of the desired field in `schema.objectSchema`.
+        // If the requested field doesn't exist, both `extractionIdx` and `typedIdx` are set to
+        // negative values. For array extraction, it is the array index. The information is
+        // already stored in `rawPath`, but accessing a raw int is more efficient than going
+        // through the `rawPath` object.
+ private final int extractionIdx;
+
+ public SchemaPathSegment(
+ VariantPathSegment rawPath, boolean isObject, int typedIdx,
int extractionIdx) {
+ this.rawPath = rawPath;
+ this.isObject = isObject;
+ this.typedIdx = typedIdx;
+ this.extractionIdx = extractionIdx;
+ }
+
+ public VariantPathSegment rawPath() {
+ return rawPath;
+ }
+
+ public boolean isObject() {
+ return isObject;
+ }
+
+ public int typedIdx() {
+ return typedIdx;
+ }
+
+ public int extractionIdx() {
+ return extractionIdx;
+ }
+ }
+
+ /**
+     * Represents a single field in a variant struct, that is, a single requested field that the
+     * scan should produce by extracting it from the variant column.
+ */
+ public static class FieldToExtract {
+
+ private final SchemaPathSegment[] path;
+ private final BaseVariantReader reader;
+
+ public FieldToExtract(SchemaPathSegment[] path, BaseVariantReader
reader) {
+ this.path = path;
+ this.reader = reader;
+ }
+
+ public SchemaPathSegment[] path() {
+ return path;
+ }
+
+ public BaseVariantReader reader() {
+ return reader;
+ }
+ }
+
public static RowType variantShreddingSchema(RowType rowType) {
- return variantShreddingSchema(rowType, true);
+ return variantShreddingSchema(rowType, true, false);
}
/**
@@ -146,10 +220,11 @@ public class PaimonShreddingUtils {
* binary, value: binary, typed_value: struct< a:
struct<typed_value: int, value:
* binary>, b: struct<typed_value: string, value: binary>>>
*/
- private static RowType variantShreddingSchema(DataType dataType, boolean
topLevel) {
+ private static RowType variantShreddingSchema(
+ DataType dataType, boolean isTopLevel, boolean isObjectField) {
RowType.Builder builder = RowType.builder();
- if (topLevel) {
- builder.field(METADATA_FIELD_NAME, DataTypes.BYTES());
+ if (isTopLevel) {
+ builder.field(METADATA_FIELD_NAME, DataTypes.BYTES().copy(false));
}
switch (dataType.getTypeRoot()) {
case ARRAY:
@@ -157,7 +232,7 @@ public class PaimonShreddingUtils {
ArrayType shreddedArrayType =
new ArrayType(
arrayType.isNullable(),
-
variantShreddingSchema(arrayType.getElementType(), false));
+
variantShreddingSchema(arrayType.getElementType(), false, false));
builder.field(VARIANT_VALUE_FIELD_NAME, DataTypes.BYTES());
builder.field(TYPED_VALUE_FIELD_NAME, shreddedArrayType);
break;
@@ -173,14 +248,21 @@ public class PaimonShreddingUtils {
field ->
field.newType(
variantShreddingSchema(
-
field.type(), false)
+
field.type(),
+
false,
+
true)
.notNull()))
.collect(Collectors.toList()));
builder.field(VARIANT_VALUE_FIELD_NAME, DataTypes.BYTES());
builder.field(TYPED_VALUE_FIELD_NAME, shreddedRowType);
break;
case VARIANT:
- builder.field(VARIANT_VALUE_FIELD_NAME, DataTypes.BYTES());
+                // For Variant, we don't need a typed column. If there is no typed column, value
+                // is required for array elements or top-level fields, but optional for objects
+                // (where a null represents a missing field).
+                builder.field(VARIANT_VALUE_FIELD_NAME, DataTypes.BYTES().copy(isObjectField));
break;
case CHAR:
case VARCHAR:
@@ -345,6 +427,45 @@ public class PaimonShreddingUtils {
arraySchema);
}
+ public static DataType scalarSchemaToPaimonType(VariantSchema.ScalarType
scala) {
+ if (scala instanceof VariantSchema.StringType) {
+ return DataTypes.STRING();
+ } else if (scala instanceof VariantSchema.IntegralType) {
+ VariantSchema.IntegralType it = (VariantSchema.IntegralType) scala;
+ switch (it.size) {
+ case BYTE:
+ return DataTypes.TINYINT();
+ case SHORT:
+ return DataTypes.SMALLINT();
+ case INT:
+ return DataTypes.INT();
+ case LONG:
+ return DataTypes.BIGINT();
+ default:
+ throw new UnsupportedOperationException();
+ }
+ } else if (scala instanceof VariantSchema.FloatType) {
+ return DataTypes.FLOAT();
+ } else if (scala instanceof VariantSchema.DoubleType) {
+ return DataTypes.DOUBLE();
+ } else if (scala instanceof VariantSchema.BooleanType) {
+ return DataTypes.BOOLEAN();
+ } else if (scala instanceof VariantSchema.BinaryType) {
+ return DataTypes.BYTES();
+ } else if (scala instanceof VariantSchema.DecimalType) {
+ VariantSchema.DecimalType dt = (VariantSchema.DecimalType) scala;
+ return DataTypes.DECIMAL(dt.precision, dt.scale);
+ } else if (scala instanceof VariantSchema.DateType) {
+ return DataTypes.DATE();
+ } else if (scala instanceof VariantSchema.TimestampType) {
+ return DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE();
+ } else if (scala instanceof VariantSchema.TimestampNTZType) {
+ return DataTypes.TIMESTAMP();
+ } else {
+ throw new UnsupportedOperationException();
+ }
+ }
+
private static RuntimeException invalidVariantShreddingSchema(DataType
dataType) {
return new RuntimeException("Invalid variant shredding schema: " +
dataType);
}
@@ -428,9 +549,178 @@ public class PaimonShreddingUtils {
.row;
}
- /** Rebuilds a variant from shredded components with the variant schema. */
- public static Variant rebuild(InternalRow row, VariantSchema
variantSchema) {
- return ShreddingUtils.rebuild(new PaimonShreddedRow(row),
variantSchema);
+ /** Assemble a variant (binary format) from a variant value. */
+ public static Variant assembleVariant(InternalRow row, VariantSchema
schema) {
+ return ShreddingUtils.rebuild(new PaimonShreddedRow(row), schema);
+ }
+
+ /** Assemble a variant struct, in which each field is extracted from the
variant value. */
+ public static InternalRow assembleVariantStruct(
+ InternalRow inputRow, VariantSchema schema, FieldToExtract[]
fields) {
+ if (inputRow.isNullAt(schema.topLevelMetadataIdx)) {
+ throw malformedVariant();
+ }
+ byte[] topLevelMetadata =
inputRow.getBinary(schema.topLevelMetadataIdx);
+ int numFields = fields.length;
+ GenericRow resultRow = new GenericRow(numFields);
+ int fieldIdx = 0;
+ while (fieldIdx < numFields) {
+ resultRow.setField(
+ fieldIdx,
+ extractField(
+ inputRow,
+ topLevelMetadata,
+ schema,
+ fields[fieldIdx].path(),
+ fields[fieldIdx].reader()));
+ fieldIdx += 1;
+ }
+ return resultRow;
+ }
+
+ /**
+ * Return a list of fields to extract. `targetType` must be either variant
or variant struct. If
+ * it is variant, return null because the target is the full variant and
there is no field to
+ * extract. If it is variant struct, return a list of fields matching the
variant struct fields.
+ */
+ public static FieldToExtract[] getFieldsToExtract(
+ DataType targetType, VariantSchema variantSchema) {
+ // todo: implement it
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+     * Builds a `FieldToExtract` from the target dataType, the variant extraction path, and the
+     * variantSchema.
+ */
+ public static FieldToExtract buildFieldsToExtract(
+ DataType dataType, String path, VariantCastArgs castArgs,
VariantSchema inputSchema) {
+ VariantPathSegment[] rawPath = VariantPathSegment.parse(path);
+ SchemaPathSegment[] schemaPath = new SchemaPathSegment[rawPath.length];
+ VariantSchema schema = inputSchema;
+ // Search `rawPath` in `schema` to produce `schemaPath`. If a raw path
segment cannot be
+ // found at a certain level of the file type, then `typedIdx` will be
-1 starting from
+ // this position, and the final `schema` will be null.
+ for (int i = 0; i < rawPath.length; i++) {
+ VariantPathSegment extraction = rawPath[i];
+ boolean isObject = extraction instanceof ObjectExtraction;
+ int typedIdx = -1;
+ int extractionIdx = -1;
+
+ if (extraction instanceof ObjectExtraction) {
+ ObjectExtraction objExtr = (ObjectExtraction) extraction;
+ if (schema != null && schema.objectSchemaMap != null) {
+ Integer fieldIdx =
schema.objectSchemaMap.get(objExtr.getKey());
+ if (fieldIdx != null) {
+ typedIdx = schema.typedIdx;
+ extractionIdx = fieldIdx;
+ schema = schema.objectSchema[fieldIdx].schema();
+ } else {
+ schema = null;
+ }
+ } else {
+ schema = null;
+ }
+ } else if (extraction instanceof ArrayExtraction) {
+ ArrayExtraction arrExtr = (ArrayExtraction) extraction;
+ if (schema != null && schema.arraySchema != null) {
+ typedIdx = schema.typedIdx;
+ extractionIdx = arrExtr.getIndex();
+ schema = schema.arraySchema;
+ } else {
+ schema = null;
+ }
+ } else {
+ schema = null;
+ }
+ schemaPath[i] = new SchemaPathSegment(extraction, isObject,
typedIdx, extractionIdx);
+ }
+
+ BaseVariantReader reader =
+ BaseVariantReader.create(
+ schema,
+ dataType,
+ castArgs,
+ (schemaPath.length == 0) &&
inputSchema.isUnshredded());
+ return new FieldToExtract(schemaPath, reader);
+ }
+
+ /**
+     * Extracts a single variant struct field from a variant value. It steps into `inputRow`
+     * according to the variant extraction path and reads the extracted value as the target type.
+ */
+ private static Object extractField(
+ InternalRow inputRow,
+ byte[] topLevelMetadata,
+ VariantSchema inputSchema,
+ SchemaPathSegment[] pathList,
+ BaseVariantReader reader) {
+ int pathIdx = 0;
+ int pathLen = pathList.length;
+ InternalRow row = inputRow;
+ VariantSchema schema = inputSchema;
+ while (pathIdx < pathLen) {
+ SchemaPathSegment path = pathList[pathIdx];
+
+ if (path.typedIdx() < 0) {
+                // The extraction doesn't exist in `typed_value`. Try to extract the remaining
+                // part of the path in `value`.
+ int variantIdx = schema.variantIdx;
+ if (variantIdx < 0 || row.isNullAt(variantIdx)) {
+ return null;
+ }
+ GenericVariant v = new
GenericVariant(row.getBinary(variantIdx), topLevelMetadata);
+ while (pathIdx < pathLen) {
+ VariantPathSegment rowPath = pathList[pathIdx].rawPath();
+ if (rowPath instanceof ObjectExtraction && v.getType() ==
OBJECT) {
+ v = v.getFieldByKey(((ObjectExtraction)
rowPath).getKey());
+ } else if (rowPath instanceof ArrayExtraction &&
v.getType() == ARRAY) {
+ v = v.getElementAtIndex(((ArrayExtraction)
rowPath).getIndex());
+ } else {
+ v = null;
+ }
+ if (v == null) {
+ return null;
+ }
+ pathIdx += 1;
+ }
+ return VariantGet.cast(v, reader.targetType(),
reader.castArgs());
+ }
+
+ if (row.isNullAt(path.typedIdx())) {
+ return null;
+ }
+
+ if (path.isObject()) {
+ InternalRow obj = row.getRow(path.typedIdx(),
schema.objectSchema.length);
+ // Object field must not be null.
+ if (obj.isNullAt(path.extractionIdx())) {
+ throw malformedVariant();
+ }
+ schema = schema.objectSchema[path.extractionIdx()].schema();
+ row = obj.getRow(path.extractionIdx(), schema.numFields);
+ // Return null if the field is missing.
+ if ((schema.typedIdx < 0 || row.isNullAt(schema.typedIdx))
+ && (schema.variantIdx < 0 ||
row.isNullAt(schema.variantIdx))) {
+ return null;
+ }
+ } else {
+ InternalArray arr = row.getArray(path.typedIdx());
+                // Return null if the extraction index is out of bounds.
+ if (path.extractionIdx() >= arr.size()) {
+ return null;
+ }
+ // Array element must not be null.
+ if (arr.isNullAt(path.extractionIdx())) {
+ throw malformedVariant();
+ }
+ schema = schema.arraySchema;
+ row = arr.getRow(path.extractionIdx(), schema.numFields);
+ }
+ pathIdx += 1;
+ }
+ return reader.read(row, topLevelMetadata);
}
/** Assemble a batch of variant (binary format) from a batch of variant
values. */
@@ -445,7 +735,7 @@ public class PaimonShreddingUtils {
if (input.isNullAt(i)) {
output.setNullAt(i);
} else {
- Variant v = rebuild(((RowColumnVector) input).getRow(i),
variantSchema);
+ Variant v = assembleVariant(((RowColumnVector)
input).getRow(i), variantSchema);
byte[] value = v.value();
byte[] metadata = v.metadata();
valueChild.putByteArray(i, value, 0, value.length);
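
A sketch of the new assemble-variant-struct path end to end, reusing only calls added or referenced in this file; the shredded type, extraction paths, and JSON input are illustrative:

    RowType shreddedType =
            RowType.of(
                    new DataType[] {DataTypes.INT(), DataTypes.STRING()},
                    new String[] {"a", "b"});
    VariantSchema schema =
            PaimonShreddingUtils.buildVariantSchema(
                    PaimonShreddingUtils.variantShreddingSchema(shreddedType));
    InternalRow shredded =
            PaimonShreddingUtils.castShredded(
                    GenericVariant.fromJson("{\"a\": 1, \"b\": \"hello\"}"), schema);

    VariantCastArgs args = new VariantCastArgs(true, ZoneOffset.UTC);
    // One FieldToExtract per requested column of the output variant struct.
    PaimonShreddingUtils.FieldToExtract[] fields = {
        PaimonShreddingUtils.buildFieldsToExtract(DataTypes.BIGINT(), "$.a", args, schema),
        PaimonShreddingUtils.buildFieldsToExtract(DataTypes.STRING(), "$.b", args, schema)
    };
    // Produces a two-field row, each field extracted from the shredded variant column.
    InternalRow result = PaimonShreddingUtils.assembleVariantStruct(shredded, schema, fields);
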
diff --git
a/paimon-common/src/main/java/org/apache/paimon/data/variant/Variant.java
b/paimon-common/src/main/java/org/apache/paimon/data/variant/Variant.java
index af37d18b26..5bc19972f9 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/variant/Variant.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/variant/Variant.java
@@ -18,6 +18,11 @@
package org.apache.paimon.data.variant;
+import org.apache.paimon.types.DataType;
+
+import java.time.ZoneId;
+import java.time.ZoneOffset;
+
/**
* A Variant represents a type that contain one of: 1) Primitive: A type and
corresponding value
* (e.g. INT, STRING); 2) Array: An ordered list of Variant values; 3) Object:
An unordered
@@ -43,7 +48,12 @@ public interface Variant {
byte[] value();
/** Parses the variant to json. */
- String toJson();
+ default String toJson() {
+ return toJson(ZoneOffset.UTC);
+ }
+
+ /** Parses the variant to json with zoneId. */
+ String toJson(ZoneId zoneId);
/**
* Extracts a sub-variant value according to a path which start with a
`$`. e.g.
@@ -51,8 +61,10 @@ public interface Variant {
* <p>access object's field: `$.key` or `$['key']` or `$["key"]`.
*
* <p>access array's first elem: `$.array[0]`
+ *
+     * <p>and then casts the value to the target type.
*/
- Object variantGet(String path);
+ Object variantGet(String path, DataType dataType, VariantCastArgs
castArgs);
/** Returns the size of the variant in bytes. */
long sizeInBytes();
diff --git
a/paimon-common/src/main/java/org/apache/paimon/data/variant/VariantCastArgs.java
b/paimon-common/src/main/java/org/apache/paimon/data/variant/VariantCastArgs.java
new file mode 100644
index 0000000000..f95acc94b1
--- /dev/null
+++
b/paimon-common/src/main/java/org/apache/paimon/data/variant/VariantCastArgs.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.data.variant;
+
+import java.time.ZoneId;
+
+/** Several parameters used by `VariantGet.cast`. Packed together to simplify
parameter passing. */
+public class VariantCastArgs {
+
+ private final boolean failOnError;
+ private final ZoneId zoneId;
+
+ public VariantCastArgs(boolean failOnError, ZoneId zoneId) {
+ this.failOnError = failOnError;
+ this.zoneId = zoneId;
+ }
+
+ public boolean failOnError() {
+ return failOnError;
+ }
+
+ public ZoneId zoneId() {
+ return zoneId;
+ }
+
+ @Override
+ public String toString() {
+ return "VariantCastArgs{" + "failOnError=" + failOnError + ", zoneId="
+ zoneId + '}';
+ }
+}
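
For illustration, the two cast-failure behaviours this small holder selects between (the zone values are arbitrary examples):

    // Lenient: an invalid cast yields null (see VariantGet.invalidCast below).
    VariantCastArgs lenient = new VariantCastArgs(false, ZoneOffset.UTC);
    // Strict: an invalid cast throws an exception.
    VariantCastArgs strict = new VariantCastArgs(true, ZoneId.of("Asia/Shanghai"));
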
diff --git
a/paimon-common/src/main/java/org/apache/paimon/data/variant/VariantGet.java
b/paimon-common/src/main/java/org/apache/paimon/data/variant/VariantGet.java
new file mode 100644
index 0000000000..51233624f9
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/data/variant/VariantGet.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.data.variant;
+
+import org.apache.paimon.casting.CastExecutor;
+import org.apache.paimon.casting.CastExecutors;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.GenericArray;
+import org.apache.paimon.data.GenericMap;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.variant.GenericVariantUtil.Type;
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.MapType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.VariantType;
+
+import java.math.BigDecimal;
+import java.util.HashMap;
+
+/** Utils for variant get. */
+public class VariantGet {
+
+ public static Object cast(GenericVariant v, DataType dataType,
VariantCastArgs castArgs) {
+ if (dataType instanceof VariantType) {
+ GenericVariantBuilder builder = new GenericVariantBuilder(false);
+ builder.appendVariant(v);
+ GenericVariant result = builder.result();
+ return new GenericVariant(result.value(), result.metadata());
+ }
+
+ Type variantType = v.getType();
+ if (variantType == Type.NULL) {
+ return null;
+ }
+
+ if (variantType == Type.UUID) {
+ // There's no UUID type in Paimon. We only allow it to be cast to
string.
+ if (dataType.equals(DataTypes.STRING())) {
+ return BinaryString.fromString(v.getUuid().toString());
+ } else {
+ return invalidCast(v, dataType, castArgs);
+ }
+ }
+
+ if (dataType instanceof RowType) {
+ RowType rowType = (RowType) dataType;
+ if (variantType == Type.OBJECT) {
+ GenericRow row = new GenericRow(rowType.getFieldCount());
+ for (int i = 0; i < v.objectSize(); i++) {
+ GenericVariant.ObjectField field = v.getFieldAtIndex(i);
+ int idx = rowType.getFieldIndex(field.key);
+ if (idx != -1) {
+ row.setField(idx, cast(field.value,
rowType.getTypeAt(idx), castArgs));
+ }
+ }
+ return row;
+ } else {
+ return invalidCast(v, dataType, castArgs);
+ }
+ } else if (dataType instanceof MapType) {
+ MapType mapType = (MapType) dataType;
+ DataType valueType = mapType.getValueType();
+ if (mapType.getKeyType().equals(DataTypes.STRING())) {
+ if (variantType == Type.OBJECT) {
+ int size = v.objectSize();
+ HashMap<BinaryString, Object> map = new HashMap<>();
+ for (int i = 0; i < size; i++) {
+ GenericVariant.ObjectField field =
v.getFieldAtIndex(i);
+ map.put(
+ BinaryString.fromString(field.key),
+ cast(field.value, valueType, castArgs));
+ }
+ return new GenericMap(map);
+ } else {
+ return invalidCast(v, dataType, castArgs);
+ }
+ } else {
+ return invalidCast(v, dataType, castArgs);
+ }
+ } else if (dataType instanceof ArrayType) {
+ ArrayType arrayType = (ArrayType) dataType;
+ if (variantType == Type.ARRAY) {
+ int size = v.arraySize();
+ Object[] array = new Object[size];
+ for (int i = 0; i < size; i++) {
+ array[i] = cast(v.getElementAtIndex(i),
arrayType.getElementType(), castArgs);
+ }
+ return new GenericArray(array);
+ } else {
+ return invalidCast(v, dataType, castArgs);
+ }
+ } else {
+ Object input;
+ DataType inputType;
+ switch (variantType) {
+ case OBJECT:
+ case ARRAY:
+ if (dataType.equals(DataTypes.STRING())) {
+ return
BinaryString.fromString(v.toJson(castArgs.zoneId()));
+ } else {
+ return invalidCast(v, dataType, castArgs);
+ }
+ case BOOLEAN:
+ input = v.getBoolean();
+ inputType = DataTypes.BOOLEAN();
+ break;
+ case LONG:
+ input = v.getLong();
+ inputType = DataTypes.BIGINT();
+ break;
+ case STRING:
+ input = BinaryString.fromString(v.getString());
+ inputType = DataTypes.STRING();
+ break;
+ case DOUBLE:
+ input = v.getDouble();
+ inputType = DataTypes.DOUBLE();
+ break;
+ case DECIMAL:
+ BigDecimal decimal = v.getDecimal();
+ int precision = decimal.precision();
+ int scale = decimal.scale();
+ input = Decimal.fromBigDecimal(decimal, precision, scale);
+ inputType = DataTypes.DECIMAL(precision, scale);
+ break;
+ case DATE:
+ input = (int) v.getLong();
+ inputType = DataTypes.DATE();
+ break;
+ case FLOAT:
+ input = v.getFloat();
+ inputType = DataTypes.FLOAT();
+ break;
+ default:
+ // todo: support other types
+ throw new IllegalArgumentException("Unsupported type: " +
v.getType());
+ }
+
+ if (inputType.equals(dataType)) {
+ return input;
+ }
+
+ CastExecutor<Object, Object> resolve =
+ (CastExecutor<Object, Object>)
CastExecutors.resolve(inputType, dataType);
+ if (resolve != null) {
+ try {
+ return resolve.cast(input);
+ } catch (Exception e) {
+ return invalidCast(v, dataType, castArgs);
+ }
+ }
+
+ return invalidCast(v, dataType, castArgs);
+ }
+ }
+
+ public static Object invalidCast(Variant v, DataType dataType,
VariantCastArgs castArgs) {
+ if (castArgs.failOnError()) {
+ throw new RuntimeException(
+ "Invalid cast " + v.toJson(castArgs.zoneId()) + " to " +
dataType);
+ } else {
+ return null;
+ }
+ }
+}
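
A brief sketch of the cast behaviour above; the inputs are illustrative, and the last call returns null rather than throwing because failOnError is false:

    GenericVariant v = GenericVariant.fromJson("{\"id\": 7, \"name\": \"paimon\"}");
    VariantCastArgs args = new VariantCastArgs(false, ZoneOffset.UTC);
    RowType target =
            RowType.of(
                    new DataType[] {DataTypes.BIGINT(), DataTypes.STRING()},
                    new String[] {"id", "name"});
    // Object -> row: each object field is matched by name and cast to the target field type.
    Object row = VariantGet.cast(v, target, args);
    // Object -> INT is an invalid cast; with failOnError = false this returns null.
    Object bad = VariantGet.cast(v, DataTypes.INT(), args);
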
diff --git
a/paimon-common/src/main/java/org/apache/paimon/data/variant/PathSegment.java
b/paimon-common/src/main/java/org/apache/paimon/data/variant/VariantPathSegment.java
similarity index 70%
rename from
paimon-common/src/main/java/org/apache/paimon/data/variant/PathSegment.java
rename to
paimon-common/src/main/java/org/apache/paimon/data/variant/VariantPathSegment.java
index 5804fb9fcb..e6d8866ce6 100644
---
a/paimon-common/src/main/java/org/apache/paimon/data/variant/PathSegment.java
+++
b/paimon-common/src/main/java/org/apache/paimon/data/variant/VariantPathSegment.java
@@ -26,38 +26,9 @@ import java.util.regex.Pattern;
/**
* A path segment for variant get to represent either an object key access or
an array index access.
*/
-public class PathSegment {
- private final String key;
- private final Integer index;
+public abstract class VariantPathSegment {
- private PathSegment(String key, Integer index) {
- this.key = key;
- this.index = index;
- }
-
- public static PathSegment createKeySegment(String key) {
- return new PathSegment(key, null);
- }
-
- public static PathSegment createIndexSegment(int index) {
- return new PathSegment(null, index);
- }
-
- public boolean isKey() {
- return key != null;
- }
-
- public boolean isIndex() {
- return index != null;
- }
-
- public String getKey() {
- return key;
- }
-
- public Integer getIndex() {
- return index;
- }
+ public VariantPathSegment() {}
private static final Pattern ROOT_PATTERN = Pattern.compile("\\$");
// Parse index segment like `[123]`.
@@ -66,21 +37,21 @@ public class PathSegment {
private static final Pattern KEY_PATTERN =
Pattern.compile("\\.([^.\\[]+)|\\['([^']+)']|\\[\"([^\"]+)\"]");
- public static PathSegment[] parse(String str) {
+ public static VariantPathSegment[] parse(String str) {
// Validate root
Matcher rootMatcher = ROOT_PATTERN.matcher(str);
if (str.isEmpty() || !rootMatcher.find()) {
throw new IllegalArgumentException("Invalid path: " + str);
}
- List<PathSegment> segments = new ArrayList<>();
+ List<VariantPathSegment> segments = new ArrayList<>();
String remaining = str.substring(rootMatcher.end());
// Parse indexes and keys
while (!remaining.isEmpty()) {
Matcher indexMatcher = INDEX_PATTERN.matcher(remaining);
if (indexMatcher.lookingAt()) {
int index = Integer.parseInt(indexMatcher.group(1));
- segments.add(PathSegment.createIndexSegment(index));
+ segments.add(new ArrayExtraction(index));
remaining = remaining.substring(indexMatcher.end());
continue;
}
@@ -89,7 +60,7 @@ public class PathSegment {
if (keyMatcher.lookingAt()) {
for (int i = 1; i <= 3; i++) {
if (keyMatcher.group(i) != null) {
-
segments.add(PathSegment.createKeySegment(keyMatcher.group(i)));
+ segments.add(new
ObjectExtraction(keyMatcher.group(i)));
break;
}
}
@@ -99,6 +70,36 @@ public class PathSegment {
throw new IllegalArgumentException("Invalid path: " + str);
}
- return segments.toArray(new PathSegment[0]);
+ return segments.toArray(new VariantPathSegment[0]);
+ }
+
+ /** A path segment for object extraction. */
+ public static class ObjectExtraction extends VariantPathSegment {
+
+ private final String key;
+
+ private ObjectExtraction(String key) {
+ super();
+ this.key = key;
+ }
+
+ public String getKey() {
+ return key;
+ }
+ }
+
+ /** A path segment for array extraction. */
+ public static class ArrayExtraction extends VariantPathSegment {
+
+ private final Integer index;
+
+ public ArrayExtraction(Integer index) {
+ super();
+ this.index = index;
+ }
+
+ public Integer getIndex() {
+ return index;
+ }
}
}
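
A small parsing sketch for the renamed path segment class (the path string is illustrative):

    VariantPathSegment[] segments = VariantPathSegment.parse("$.object.address[0]['city']");
    for (VariantPathSegment s : segments) {
        if (s instanceof VariantPathSegment.ObjectExtraction) {
            // Prints "object", "address", "city" for the key segments.
            System.out.println(((VariantPathSegment.ObjectExtraction) s).getKey());
        } else {
            // Prints 0 for the index segment.
            System.out.println(((VariantPathSegment.ArrayExtraction) s).getIndex());
        }
    }
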
diff --git
a/paimon-common/src/main/java/org/apache/paimon/data/variant/VariantSchema.java
b/paimon-common/src/main/java/org/apache/paimon/data/variant/VariantSchema.java
index 9fb7f25ba2..68cc9ab0c0 100644
---
a/paimon-common/src/main/java/org/apache/paimon/data/variant/VariantSchema.java
+++
b/paimon-common/src/main/java/org/apache/paimon/data/variant/VariantSchema.java
@@ -47,6 +47,14 @@ public class VariantSchema {
this.schema = schema;
}
+ public String fieldName() {
+ return fieldName;
+ }
+
+ public VariantSchema schema() {
+ return schema;
+ }
+
@Override
public String toString() {
return "ObjectField{" + "fieldName=" + fieldName + ", schema=" +
schema + '}';
diff --git
a/paimon-common/src/main/java/org/apache/paimon/data/variant/VariantShreddingWriter.java
b/paimon-common/src/main/java/org/apache/paimon/data/variant/VariantShreddingWriter.java
index 2ca87b11e6..6a4aa5944b 100644
---
a/paimon-common/src/main/java/org/apache/paimon/data/variant/VariantShreddingWriter.java
+++
b/paimon-common/src/main/java/org/apache/paimon/data/variant/VariantShreddingWriter.java
@@ -149,8 +149,6 @@ public class VariantShreddingWriter {
// Store the typed value.
result.addScalar(typedValue);
} else {
- GenericVariantBuilder variantBuilder = new
GenericVariantBuilder(false);
- variantBuilder.appendVariant(v);
result.addVariantValue(v.value());
}
} else {
diff --git
a/paimon-common/src/test/java/org/apache/paimon/data/variant/GenericVariantTest.java
b/paimon-common/src/test/java/org/apache/paimon/data/variant/GenericVariantTest.java
index c226b9dead..e7d81b57a2 100644
---
a/paimon-common/src/test/java/org/apache/paimon/data/variant/GenericVariantTest.java
+++
b/paimon-common/src/test/java/org/apache/paimon/data/variant/GenericVariantTest.java
@@ -30,7 +30,12 @@ import org.apache.paimon.types.RowType;
import org.junit.jupiter.api.Test;
import java.math.BigDecimal;
+import java.time.ZoneOffset;
+import static
org.apache.paimon.data.variant.PaimonShreddingUtils.assembleVariant;
+import static
org.apache.paimon.data.variant.PaimonShreddingUtils.buildVariantSchema;
+import static org.apache.paimon.data.variant.PaimonShreddingUtils.castShredded;
+import static
org.apache.paimon.data.variant.PaimonShreddingUtils.variantShreddingSchema;
import static org.apache.paimon.types.DataTypesTest.assertThat;
/** Test of {@link GenericVariant}. */
@@ -86,23 +91,51 @@ public class GenericVariantTest {
+ "}\n";
Variant variant = GenericVariant.fromJson(json);
- assertThat(variant.variantGet("$.object"))
+
+ VariantCastArgs castArgs = new VariantCastArgs(false, ZoneOffset.UTC);
+ assertThat(variant.variantGet("$.object", DataTypes.STRING(),
castArgs))
.isEqualTo(
- "{\"address\":{\"city\":\"Hangzhou\",\"street\":\"Main
St\"},\"age\":2,\"name\":\"Apache Paimon\"}");
- assertThat(variant.variantGet("$.object.name")).isEqualTo("Apache
Paimon");
-
assertThat(variant.variantGet("$.object.address.street")).isEqualTo("Main St");
-
assertThat(variant.variantGet("$[\"object\"]['address'].city")).isEqualTo("Hangzhou");
- assertThat(variant.variantGet("$.array")).isEqualTo("[1,2,3,4,5]");
- assertThat(variant.variantGet("$.array[0]")).isEqualTo(1L);
- assertThat(variant.variantGet("$.array[3]")).isEqualTo(4L);
- assertThat(variant.variantGet("$.string")).isEqualTo("Hello, World!");
- assertThat(variant.variantGet("$.long")).isEqualTo(12345678901234L);
- assertThat(variant.variantGet("$.double"))
+ BinaryString.fromString(
+
"{\"address\":{\"city\":\"Hangzhou\",\"street\":\"Main
St\"},\"age\":2,\"name\":\"Apache Paimon\"}"));
+ RowType address =
+ RowType.of(
+ new DataType[] {DataTypes.STRING(),
DataTypes.STRING()},
+ new String[] {"street", "city"});
+ assertThat(variant.variantGet("$.object.address", address, castArgs))
+ .isEqualTo(
+ GenericRow.of(
+ BinaryString.fromString("Main St"),
+ BinaryString.fromString("Hangzhou")));
+ assertThat(variant.variantGet("$.object.name", DataTypes.STRING(),
castArgs))
+ .isEqualTo(BinaryString.fromString("Apache Paimon"));
+ assertThat(variant.variantGet("$.object.address.street",
DataTypes.STRING(), castArgs))
+ .isEqualTo(BinaryString.fromString("Main St"));
+ assertThat(
+ variant.variantGet(
+ "$[\"object\"]['address'].city",
DataTypes.STRING(), castArgs))
+ .isEqualTo(BinaryString.fromString("Hangzhou"));
+ assertThat(variant.variantGet("$.array", DataTypes.STRING(), castArgs))
+ .isEqualTo(BinaryString.fromString("[1,2,3,4,5]"));
+ assertThat(variant.variantGet("$.array",
DataTypes.ARRAY(DataTypes.INT()), castArgs))
+ .isEqualTo(new GenericArray(new Integer[] {1, 2, 3, 4, 5}));
+ assertThat(variant.variantGet("$.array[0]", DataTypes.BIGINT(),
castArgs)).isEqualTo(1L);
+ assertThat(variant.variantGet("$.array[3]", DataTypes.BIGINT(),
castArgs)).isEqualTo(4L);
+ assertThat(variant.variantGet("$.string", DataTypes.STRING(),
castArgs))
+ .isEqualTo(BinaryString.fromString("Hello, World!"));
+ assertThat(variant.variantGet("$.long", DataTypes.BIGINT(), castArgs))
+ .isEqualTo(12345678901234L);
+ assertThat(variant.variantGet("$.long", DataTypes.STRING(), castArgs))
+ .isEqualTo(BinaryString.fromString("12345678901234"));
+ assertThat(variant.variantGet("$.double", DataTypes.DOUBLE(),
castArgs))
.isEqualTo(1.0123456789012345678901234567890123456789);
- assertThat(variant.variantGet("$.decimal")).isEqualTo(new
BigDecimal("100.99"));
- assertThat(variant.variantGet("$.boolean1")).isEqualTo(true);
- assertThat(variant.variantGet("$.boolean2")).isEqualTo(false);
- assertThat(variant.variantGet("$.nullField")).isNull();
+ assertThat(variant.variantGet("$.decimal", DataTypes.DECIMAL(5, 2),
castArgs))
+ .isEqualTo(Decimal.fromBigDecimal(new BigDecimal("100.99"), 5,
2));
+ assertThat(variant.variantGet("$.decimal", DataTypes.STRING(),
castArgs))
+ .isEqualTo(BinaryString.fromString("100.99"));
+ assertThat(variant.variantGet("$.boolean1", DataTypes.BOOLEAN(),
castArgs)).isEqualTo(true);
+ assertThat(variant.variantGet("$.boolean2", DataTypes.BOOLEAN(),
castArgs))
+ .isEqualTo(false);
+ assertThat(variant.variantGet("$.nullField", DataTypes.BOOLEAN(),
castArgs)).isNull();
}
@Test
@@ -228,14 +261,14 @@ public class GenericVariantTest {
private void testShreddingResult(
GenericVariant variant, RowType shreddedType, InternalRow expected) {
- RowType shreddingSchema = PaimonShreddingUtils.variantShreddingSchema(shreddedType);
- VariantSchema variantSchema = PaimonShreddingUtils.buildVariantSchema(shreddingSchema);
+ RowType shreddingSchema = variantShreddingSchema(shreddedType);
+ VariantSchema variantSchema = buildVariantSchema(shreddingSchema);
// test cast shredded
- InternalRow shredded = PaimonShreddingUtils.castShredded(variant, variantSchema);
+ InternalRow shredded = castShredded(variant, variantSchema);
assertThat(shredded).isEqualTo(expected);
// test rebuild
- Variant rebuild = PaimonShreddingUtils.rebuild(shredded, variantSchema);
+ Variant rebuild = assembleVariant(shredded, variantSchema);
assertThat(variant.toJson()).isEqualTo(rebuild.toJson());
}
}
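
For orientation, a minimal standalone sketch of the typed variantGet(path, type, castArgs) extraction exercised by the updated GenericVariantTest above. The class and variable names below are illustrative, and the only signatures assumed are the ones visible in this diff (GenericVariant.fromJson, VariantCastArgs(failOnError, zoneId), DataTypes); this is a sketch, not part of the commit.

import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.variant.GenericVariant;
import org.apache.paimon.data.variant.VariantCastArgs;
import org.apache.paimon.types.DataTypes;

import java.time.ZoneOffset;

public class VariantGetSketch {
    public static void main(String[] args) {
        // Parse a JSON document into a variant value.
        GenericVariant variant =
                GenericVariant.fromJson("{\"name\": \"Apache Paimon\", \"tags\": [1, 2, 3]}");

        // failOnError = true, UTC session time zone, as in the tests above.
        VariantCastArgs castArgs = new VariantCastArgs(true, ZoneOffset.UTC);

        // Extract by path and cast to the requested Paimon type in one call.
        Object name = variant.variantGet("$.name", DataTypes.STRING(), castArgs);
        Object firstTag = variant.variantGet("$.tags[0]", DataTypes.BIGINT(), castArgs);

        // Prints: Apache Paimon / 1
        System.out.println((BinaryString) name + " / " + (Long) firstTag);
    }
}
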
diff --git a/paimon-common/src/test/java/org/apache/paimon/data/variant/PaimonShreddingUtilsTest.java b/paimon-common/src/test/java/org/apache/paimon/data/variant/PaimonShreddingUtilsTest.java
new file mode 100644
index 0000000000..d918045702
--- /dev/null
+++ b/paimon-common/src/test/java/org/apache/paimon/data/variant/PaimonShreddingUtilsTest.java
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.data.variant;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.GenericArray;
+import org.apache.paimon.data.GenericMap;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.variant.PaimonShreddingUtils.FieldToExtract;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.MapType;
+import org.apache.paimon.types.RowType;
+
+import org.junit.jupiter.api.Test;
+
+import java.time.ZoneOffset;
+import java.util.HashMap;
+
+import static org.apache.paimon.data.variant.PaimonShreddingUtils.assembleVariantStruct;
+import static org.apache.paimon.data.variant.PaimonShreddingUtils.buildFieldsToExtract;
+import static org.apache.paimon.data.variant.PaimonShreddingUtils.buildVariantSchema;
+import static org.apache.paimon.data.variant.PaimonShreddingUtils.castShredded;
+import static org.apache.paimon.data.variant.PaimonShreddingUtils.variantShreddingSchema;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test for PaimonShreddingUtils. */
+public class PaimonShreddingUtilsTest {
+
+ @Test
+ void testAssembleAllTypes() {
+ VariantCastArgs castArgs = new VariantCastArgs(true, ZoneOffset.UTC);
+
+ DataField f1 =
+ new DataField(
+ 1,
+ "object",
+ RowType.of(
+ new DataType[] {DataTypes.STRING(), DataTypes.INT()},
+ new String[] {"name", "age"}));
+ DataField f2 = new DataField(2, "array", DataTypes.ARRAY(DataTypes.INT()));
+ DataField f3 = new DataField(3, "string", DataTypes.STRING());
+ DataField f4 = new DataField(4, "tinyint", DataTypes.TINYINT());
+ DataField f5 = new DataField(5, "smallint", DataTypes.SMALLINT());
+ DataField f6 = new DataField(6, "int", DataTypes.INT());
+ DataField f7 = new DataField(7, "long", DataTypes.BIGINT());
+ DataField f8 = new DataField(8, "double", DataTypes.DOUBLE());
+ DataField f9 = new DataField(9, "decimal", DataTypes.DECIMAL(5, 2));
+ DataField f10 = new DataField(10, "boolean", DataTypes.BOOLEAN());
+ DataField f11 = new DataField(11, "nullField", DataTypes.INT());
+ RowType allTypes = RowType.of(f1, f2, f3, f4, f5, f6, f7, f8, f9, f10, f11);
+
+ String json =
+ "{\n"
+ + " \"object\": {\n"
+ + " \"name\": \"Apache Paimon\",\n"
+ + " \"age\": 3\n"
+ + " },\n"
+ + " \"array\": [1, 2, 3, 4, 5],\n"
+ + " \"string\": \"Hello, World!\",\n"
+ + " \"tinyint\": 1,\n"
+ + " \"smallint\": 3000,\n"
+ + " \"int\": 400000,\n"
+ + " \"long\": 12345678901234,\n"
+ + " \"double\":
1.0123456789012345678901234567890123456789,\n"
+ + " \"decimal\": 100.99,\n"
+ + " \"boolean\": true,\n"
+ + " \"nullField\": null\n"
+ + "}\n";
+
+ GenericVariant v = GenericVariant.fromJson(json);
+ GenericRow expert =
+ GenericRow.of(
+ GenericRow.of(BinaryString.fromString("Apache
Paimon"), 3),
+ new GenericArray(new Integer[] {1, 2, 3, 4, 5}),
+ BinaryString.fromString("Hello, World!"),
+ (byte) 1,
+ (short) 3000,
+ 400000,
+ 12345678901234L,
+ 1.0123456789012345678901234567890123456789D,
+ Decimal.fromBigDecimal(new java.math.BigDecimal("100.99"), 5, 2),
+ true,
+ null);
+
+ // shredding to real type
+ RowType shreddedType = new RowType(allTypes.getFields());
+ RowType shreddingSchema = variantShreddingSchema(shreddedType);
+ VariantSchema variantSchema = buildVariantSchema(shreddingSchema);
+ FieldToExtract[] fields = new FieldToExtract[allTypes.getFieldCount()];
+ for (int i = 0; i < allTypes.getFields().size(); i++) {
+ fields[i] =
+ buildFieldsToExtract(
+ allTypes.getFields().get(i).type(),
+ "$." + allTypes.getFields().get(i).name(),
+ castArgs,
+ variantSchema);
+ }
+ assertThat(assembleVariantStruct(castShredded(v, variantSchema), variantSchema, fields))
+ .isEqualTo(expert);
+
+ // no shredding
+ shreddedType = RowType.of();
+ shreddingSchema = variantShreddingSchema(shreddedType);
+ variantSchema = buildVariantSchema(shreddingSchema);
+ fields = new FieldToExtract[allTypes.getFieldCount()];
+ for (int i = 0; i < allTypes.getFields().size(); i++) {
+ fields[i] =
+ buildFieldsToExtract(
+ allTypes.getFields().get(i).type(),
+ "$." + allTypes.getFields().get(i).name(),
+ castArgs,
+ variantSchema);
+ }
+
+ assertThat(assembleVariantStruct(castShredded(v, variantSchema), variantSchema, fields))
+ .isEqualTo(expert);
+
+ // shredding to string, then cast to the real type
+ shreddedType =
+ RowType.of(
+ allTypes.getFields().stream()
+ .map(a -> a.newType(DataTypes.STRING()))
+ .toArray(DataField[]::new));
+ shreddingSchema = variantShreddingSchema(shreddedType);
+ variantSchema = buildVariantSchema(shreddingSchema);
+ fields = new FieldToExtract[allTypes.getFieldCount()];
+ for (int i = 0; i < allTypes.getFields().size(); i++) {
+ fields[i] =
+ buildFieldsToExtract(
+ allTypes.getFields().get(i).type(),
+ "$." + allTypes.getFields().get(i).name(),
+ castArgs,
+ variantSchema);
+ }
+ assertThat(assembleVariantStruct(castShredded(v, variantSchema), variantSchema, fields))
+ .isEqualTo(expert);
+
+ // shredding to real type, then cast to the string
+ expert =
+ GenericRow.of(
+ BinaryString.fromString("{\"age\":3,\"name\":\"Apache
Paimon\"}"),
+ BinaryString.fromString("[1,2,3,4,5]"),
+ BinaryString.fromString("Hello, World!"),
+ BinaryString.fromString("1"),
+ BinaryString.fromString("3000"),
+ BinaryString.fromString("400000"),
+ BinaryString.fromString("12345678901234"),
+ BinaryString.fromString("1.0123456789012346"),
+ BinaryString.fromString("100.99"),
+ BinaryString.fromString("true"),
+ null);
+
+ shreddedType = new RowType(allTypes.getFields());
+ shreddingSchema = variantShreddingSchema(shreddedType);
+ variantSchema = buildVariantSchema(shreddingSchema);
+ fields = new FieldToExtract[allTypes.getFieldCount()];
+ for (int i = 0; i < allTypes.getFields().size(); i++) {
+ fields[i] =
+ buildFieldsToExtract(
+ DataTypes.STRING(),
+ "$." + allTypes.getFields().get(i).name(),
+ castArgs,
+ variantSchema);
+ }
+ assertThat(assembleVariantStruct(castShredded(v, variantSchema), variantSchema, fields))
+ .isEqualTo(expert);
+
+ // no shredding, then cast to the string
+ shreddedType = RowType.of();
+ shreddingSchema = variantShreddingSchema(shreddedType);
+ variantSchema = buildVariantSchema(shreddingSchema);
+ fields = new FieldToExtract[allTypes.getFieldCount()];
+ for (int i = 0; i < allTypes.getFields().size(); i++) {
+ fields[i] =
+ buildFieldsToExtract(
+ DataTypes.STRING(),
+ "$." + allTypes.getFields().get(i).name(),
+ castArgs,
+ variantSchema);
+ }
+
+ assertThat(assembleVariantStruct(castShredded(v, variantSchema), variantSchema, fields))
+ .isEqualTo(expert);
+
+ // cast struct to map
+ shreddedType = RowType.of(f1);
+ shreddingSchema = variantShreddingSchema(shreddedType);
+ variantSchema = buildVariantSchema(shreddingSchema);
+ fields = new FieldToExtract[1];
+ fields[0] =
+ buildFieldsToExtract(
+ new MapType(DataTypes.STRING(), DataTypes.STRING()),
+ "$.object",
+ castArgs,
+ variantSchema);
+ assertThat(assembleVariantStruct(castShredded(v, variantSchema), variantSchema, fields))
+ .isEqualTo(
+ GenericRow.of(
+ new GenericMap(
+ new HashMap<BinaryString,
BinaryString>() {
+ {
+ put(
+
BinaryString.fromString("age"),
+
BinaryString.fromString("3"));
+ put(
+
BinaryString.fromString("name"),
+
BinaryString.fromString("Apache Paimon"));
+ }
+ })));
+ }
+
+ @Test
+ void testAssembleVariantStructWithShredding() {
+ VariantCastArgs castArgs = new VariantCastArgs(true, ZoneOffset.UTC);
+
+ // shreddedType: ROW<a INT, b STRING>
+ RowType shreddedType =
+ RowType.of(
+ new DataType[] {DataTypes.INT(), DataTypes.STRING()},
+ new String[] {"a", "b"});
+ RowType shreddingSchema = variantShreddingSchema(shreddedType);
+ VariantSchema variantSchema = buildVariantSchema(shreddingSchema);
+
+ // fieldsToExtract: $.a : int, $.b : string
+ FieldToExtract f1 = buildFieldsToExtract(DataTypes.INT(), "$.a",
castArgs, variantSchema);
+ FieldToExtract f2 =
+ buildFieldsToExtract(DataTypes.STRING(), "$.b", castArgs,
variantSchema);
+ FieldToExtract[] fields = new FieldToExtract[] {f1, f2};
+
+ GenericVariant v = GenericVariant.fromJson("{\"a\": 1, \"b\":
\"hello\"}");
+ assertThat(assembleVariantStruct(castShredded(v, variantSchema),
variantSchema, fields))
+ .isEqualTo(GenericRow.of(1, BinaryString.fromString("hello")));
+
+ v = GenericVariant.fromJson("{\"a\": 27}");
+ assertThat(assembleVariantStruct(castShredded(v, variantSchema), variantSchema, fields))
+ .isEqualTo(GenericRow.of(27, null));
+
+ v = GenericVariant.fromJson("{\"b\":\"hangzhou\", \"other\":\"xxx\"}");
+ assertThat(assembleVariantStruct(castShredded(v, variantSchema), variantSchema, fields))
+ .isEqualTo(GenericRow.of(null, BinaryString.fromString("hangzhou")));
+
+ v = GenericVariant.fromJson("{\"other\":\"yyy\"}");
+ assertThat(assembleVariantStruct(castShredded(v, variantSchema), variantSchema, fields))
+ .isEqualTo(GenericRow.of(null, null));
+
+ v = GenericVariant.fromJson("{\"a\":\"27\"}");
+ assertThat(assembleVariantStruct(castShredded(v, variantSchema), variantSchema, fields))
+ .isEqualTo(GenericRow.of(27, null));
+
+ v = GenericVariant.fromJson("{}");
+ assertThat(assembleVariantStruct(castShredded(v, variantSchema), variantSchema, fields))
+ .isEqualTo(GenericRow.of(null, null));
+
+ // fieldsToExtract: $.a : int, $.b : string, $.c : string and c is not in shreddedType
+ FieldToExtract f3 =
+ buildFieldsToExtract(DataTypes.STRING(), "$.c", castArgs, variantSchema);
+ fields = new FieldToExtract[] {f1, f2, f3};
+ v = GenericVariant.fromJson("{\"c\": \"hi\", \"a\": 27}");
+ assertThat(assembleVariantStruct(castShredded(v, variantSchema), variantSchema, fields))
+ .isEqualTo(GenericRow.of(27, null, BinaryString.fromString("hi")));
+
+ v = GenericVariant.fromJson("{\"c\": 1111, \"a\": 27}");
+ assertThat(assembleVariantStruct(castShredded(v, variantSchema), variantSchema, fields))
+ .isEqualTo(GenericRow.of(27, null, BinaryString.fromString("1111")));
+ }
+
+ @Test
+ void testVariantCastArgs() {
+ // shreddedType: ROW<a INT, b STRING>
+ RowType shreddedType = RowType.of(new DataType[] {DataTypes.INT()},
new String[] {"a"});
+ RowType shreddingSchema = variantShreddingSchema(shreddedType);
+ VariantSchema variantSchema = buildVariantSchema(shreddingSchema);
+
+ // failOnError = true
+ VariantCastArgs castArgs = new VariantCastArgs(true, ZoneOffset.UTC);
+
+ FieldToExtract f1 = buildFieldsToExtract(DataTypes.INT(), "$.a",
castArgs, variantSchema);
+ FieldToExtract[] fields1 = new FieldToExtract[] {f1};
+
+ GenericVariant v1 = GenericVariant.fromJson("{\"a\": \"not a num\"}");
+ assertThatThrownBy(
+ () ->
+ assembleVariantStruct(
+ castShredded(v1, variantSchema), variantSchema, fields1));
+
+ // failOnError = false
+ castArgs = new VariantCastArgs(false, ZoneOffset.UTC);
+ FieldToExtract f2 = buildFieldsToExtract(DataTypes.INT(), "$.a",
castArgs, variantSchema);
+ FieldToExtract[] fields2 = new FieldToExtract[] {f2};
+
+ GenericVariant v2 = GenericVariant.fromJson("{\"a\": \"not a num\"}");
+ GenericRow genericRow = new GenericRow(1);
+ genericRow.setField(0, null);
+ assertThat(assembleVariantStruct(castShredded(v2, variantSchema), variantSchema, fields2))
+ .isEqualTo(genericRow);
+ }
+}
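
For completeness, a minimal sketch of the shred-and-assemble round trip that PaimonShreddingUtilsTest covers. It assumes only the PaimonShreddingUtils helpers shown in this diff (variantShreddingSchema, buildVariantSchema, castShredded, buildFieldsToExtract, assembleVariantStruct); the class name and JSON input below are illustrative.

import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.variant.GenericVariant;
import org.apache.paimon.data.variant.PaimonShreddingUtils;
import org.apache.paimon.data.variant.PaimonShreddingUtils.FieldToExtract;
import org.apache.paimon.data.variant.VariantCastArgs;
import org.apache.paimon.data.variant.VariantSchema;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;

import java.time.ZoneOffset;

public class AssembleShreddedSketch {
    public static void main(String[] args) {
        // Shred "a" as INT and "b" as STRING; anything else stays in the variant remainder.
        RowType shreddedType =
                RowType.of(
                        new DataType[] {DataTypes.INT(), DataTypes.STRING()},
                        new String[] {"a", "b"});
        RowType shreddingSchema = PaimonShreddingUtils.variantShreddingSchema(shreddedType);
        VariantSchema variantSchema = PaimonShreddingUtils.buildVariantSchema(shreddingSchema);

        // One FieldToExtract per output struct field, each addressed by a variant path.
        VariantCastArgs castArgs = new VariantCastArgs(true, ZoneOffset.UTC);
        FieldToExtract[] fields =
                new FieldToExtract[] {
                    PaimonShreddingUtils.buildFieldsToExtract(
                            DataTypes.INT(), "$.a", castArgs, variantSchema),
                    PaimonShreddingUtils.buildFieldsToExtract(
                            DataTypes.STRING(), "$.b", castArgs, variantSchema)
                };

        // Shred the variant, then assemble the requested struct from the shredded row.
        GenericVariant variant = GenericVariant.fromJson("{\"a\": 1, \"b\": \"hello\"}");
        InternalRow shredded = PaimonShreddingUtils.castShredded(variant, variantSchema);
        Object assembled =
                PaimonShreddingUtils.assembleVariantStruct(shredded, variantSchema, fields);

        // Expected to equal GenericRow.of(1, BinaryString.fromString("hello")), as asserted above.
        System.out.println(
                assembled.equals(GenericRow.of(1, BinaryString.fromString("hello"))));
    }
}
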