This is an automated email from the ASF dual-hosted git repository.
yunfengzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new ac582e7107 [flink] Support read and write variant with Flink (#6108)
ac582e7107 is described below
commit ac582e7107dbb90356a0f2ebf3edc6bf7752d68c
Author: Xuannan <[email protected]>
AuthorDate: Wed Aug 27 09:23:08 2025 +0800
[flink] Support read and write variant with Flink (#6108)
---
.../flink/table/types/logical/VariantType.java | 73 +++++++
.../apache/paimon/flink/DataTypeToLogicalType.java | 2 +-
.../org/apache/paimon/flink/FlinkRowWrapper.java | 9 +-
.../apache/paimon/flink/LogicalTypeToDataType.java | 5 +
.../apache/paimon/flink/CatalogTableITCase.java | 21 ++
.../apache/paimon/flink/util/AbstractTestBase.java | 5 +
.../org/apache/flink/table/data/ArrayData.java | 242 +++++++++++++++++++++
.../flink/table/types/logical/VariantType.java | 73 +++++++
.../apache/flink/types/variant/BinaryVariant.java | 8 +
.../org/apache/flink/types/variant/Variant.java | 6 +-
.../apache/flink/types/variant/VariantBuilder.java | 104 +++++++++
11 files changed, 544 insertions(+), 4 deletions(-)
diff --git
a/paimon-flink/paimon-flink-2.0/src/main/java/org/apache/flink/table/types/logical/VariantType.java
b/paimon-flink/paimon-flink-2.0/src/main/java/org/apache/flink/table/types/logical/VariantType.java
new file mode 100644
index 0000000000..70ba9ad729
--- /dev/null
+++
b/paimon-flink/paimon-flink-2.0/src/main/java/org/apache/flink/table/types/logical/VariantType.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.logical;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Data type of semi-structured data.
+ *
+ * <p>The type supports storing any semi-structured data, including ARRAY,
MAP, and scalar types.
+ * VARIANT can only store MAP types with keys of type STRING.
+ */
+@PublicEvolving
+public class VariantType extends LogicalType {
+
+ public VariantType(boolean isNullable) {
+ super(isNullable, LogicalTypeRoot.UNRESOLVED);
+ }
+
+ @Override
+ public LogicalType copy(boolean b) {
+ return null;
+ }
+
+ @Override
+ public String asSerializableString() {
+ return "";
+ }
+
+ @Override
+ public boolean supportsInputConversion(Class<?> aClass) {
+ return false;
+ }
+
+ @Override
+ public boolean supportsOutputConversion(Class<?> aClass) {
+ return false;
+ }
+
+ @Override
+ public Class<?> getDefaultConversion() {
+ return null;
+ }
+
+ @Override
+ public List<LogicalType> getChildren() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public <R> R accept(LogicalTypeVisitor<R> logicalTypeVisitor) {
+ return null;
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/DataTypeToLogicalType.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/DataTypeToLogicalType.java
index 8ffc9c4efc..2039daec92 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/DataTypeToLogicalType.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/DataTypeToLogicalType.java
@@ -143,7 +143,7 @@ public class DataTypeToLogicalType implements
DataTypeVisitor<LogicalType> {
@Override
public LogicalType visit(VariantType variantType) {
- throw new UnsupportedOperationException("VariantType is not
supported.");
+ return new
org.apache.flink.table.types.logical.VariantType(variantType.isNullable());
}
@Override
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowWrapper.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowWrapper.java
index 91ab7cfd5b..b8826232d0 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowWrapper.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowWrapper.java
@@ -24,6 +24,7 @@ import org.apache.paimon.data.InternalArray;
import org.apache.paimon.data.InternalMap;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.data.variant.GenericVariant;
import org.apache.paimon.data.variant.Variant;
import org.apache.paimon.types.RowKind;
@@ -121,7 +122,9 @@ public class FlinkRowWrapper implements InternalRow {
@Override
public Variant getVariant(int pos) {
- throw new UnsupportedOperationException();
+ org.apache.flink.types.variant.BinaryVariant variant =
+ (org.apache.flink.types.variant.BinaryVariant)
row.getVariant(pos);
+ return new GenericVariant(variant.getValue(), variant.getMetadata());
}
@Override
@@ -214,7 +217,9 @@ public class FlinkRowWrapper implements InternalRow {
@Override
public Variant getVariant(int pos) {
- throw new UnsupportedOperationException();
+ org.apache.flink.types.variant.BinaryVariant variant =
+ (org.apache.flink.types.variant.BinaryVariant)
array.getVariant(pos);
+ return new GenericVariant(variant.getValue(),
variant.getMetadata());
}
@Override
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/LogicalTypeToDataType.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/LogicalTypeToDataType.java
index 23dfb2b077..3a54c975d1 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/LogicalTypeToDataType.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/LogicalTypeToDataType.java
@@ -42,6 +42,7 @@ import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.TinyIntType;
import org.apache.flink.table.types.logical.VarBinaryType;
import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.types.logical.VariantType;
import org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor;
import java.util.ArrayList;
@@ -177,6 +178,10 @@ public class LogicalTypeToDataType extends
LogicalTypeDefaultVisitor<DataType> {
return new org.apache.paimon.types.RowType(rowType.isNullable(),
dataFields);
}
+ public DataType visit(VariantType variantType) {
+ return new org.apache.paimon.types.VariantType();
+ }
+
@Override
protected DataType defaultMethod(LogicalType logicalType) {
throw new UnsupportedOperationException("Unsupported type: " +
logicalType);
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
index 989446c059..bcdf1e2ef6 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
@@ -31,8 +31,11 @@ import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
import org.apache.flink.types.Row;
+import org.apache.flink.types.variant.Variant;
+import org.apache.flink.types.variant.VariantBuilder;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.condition.EnabledIf;
import javax.annotation.Nonnull;
@@ -1219,6 +1222,24 @@ public class CatalogTableITCase extends
CatalogITCaseBase {
assertThat(row.getField(6)).isNotNull();
}
+ @Test
+ @EnabledIf("isFlink2_1OrAbove")
+ void testReadWriteVariant() {
+ sql("CREATE TABLE t (v VARIANT)");
+
+ sql(
+ "INSERT INTO t SELECT PARSE_JSON(s) FROM (VALUES
('{\"a\":1}'), ('{\"a\":2}'), ('\"hello\"')) AS T(s)");
+
+ List<Row> rows = sql("SELECT * FROM t");
+
+ VariantBuilder builder = Variant.newBuilder();
+ assertThat(rows)
+ .containsExactlyInAnyOrder(
+ Row.of(builder.object().add("a", builder.of((byte)
1)).build()),
+ Row.of(builder.object().add("a", builder.of((byte)
2)).build()),
+ Row.of(builder.of("hello")));
+ }
+
private void innerTestReadOptimizedTableAndCheckData(String
insertTableName) {
// full compaction will always be performed at the end of batch jobs,
as long as
// full-compaction.delta-commits is set, regardless of its value
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/AbstractTestBase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/AbstractTestBase.java
index ee838ed682..e9fb0a55c6 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/AbstractTestBase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/AbstractTestBase.java
@@ -55,6 +55,11 @@ import java.util.UUID;
/** Similar to Flink's AbstractTestBase but using Junit5. */
public class AbstractTestBase {
+ protected static boolean isFlink2_1OrAbove() {
+ String flinkVersion = System.getProperty("test.flink.main.version");
+ return flinkVersion.compareTo("2.1") >= 0;
+ }
+
private static final int DEFAULT_PARALLELISM = 16;
@RegisterExtension
diff --git
a/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/table/data/ArrayData.java
b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/table/data/ArrayData.java
new file mode 100644
index 0000000000..91db97cb3b
--- /dev/null
+++
b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/table/data/ArrayData.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.DistinctType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.types.variant.Variant;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+
+import static
org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getFieldCount;
+import static
org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getPrecision;
+import static
org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getScale;
+
+/**
+ * Base interface of an internal data structure representing data of {@link
ArrayType}.
+ *
+ * <p>Note: All elements of this data structure must be internal data
structures and must be of the
+ * same type. See {@link RowData} for more information about internal data
structures.
+ *
+ * <p>Use {@link GenericArrayData} to construct instances of this interface
from regular Java
+ * arrays.
+ */
+@PublicEvolving
+public interface ArrayData {
+
+ /** Returns the number of elements in this array. */
+ int size();
+
+ //
------------------------------------------------------------------------------------------
+ // Read-only accessor methods
+ //
------------------------------------------------------------------------------------------
+
+ /** Returns true if the element is null at the given position. */
+ boolean isNullAt(int pos);
+
+ /** Returns the boolean value at the given position. */
+ boolean getBoolean(int pos);
+
+ /** Returns the byte value at the given position. */
+ byte getByte(int pos);
+
+ /** Returns the short value at the given position. */
+ short getShort(int pos);
+
+ /** Returns the integer value at the given position. */
+ int getInt(int pos);
+
+ /** Returns the long value at the given position. */
+ long getLong(int pos);
+
+ /** Returns the float value at the given position. */
+ float getFloat(int pos);
+
+ /** Returns the double value at the given position. */
+ double getDouble(int pos);
+
+ /** Returns the string value at the given position. */
+ StringData getString(int pos);
+
+ /**
+ * Returns the decimal value at the given position.
+ *
+ * <p>The precision and scale are required to determine whether the
decimal value was stored in
+ * a compact representation (see {@link DecimalData}).
+ */
+ DecimalData getDecimal(int pos, int precision, int scale);
+
+ /**
+ * Returns the timestamp value at the given position.
+ *
+ * <p>The precision is required to determine whether the timestamp value
was stored in a compact
+ * representation (see {@link TimestampData}).
+ */
+ TimestampData getTimestamp(int pos, int precision);
+
+ /** Returns the raw value at the given position. */
+ <T> RawValueData<T> getRawValue(int pos);
+
+ /** Returns the Variant value at the given position. */
+ Variant getVariant(int i);
+
+ /** Returns the binary value at the given position. */
+ byte[] getBinary(int pos);
+
+ /** Returns the array value at the given position. */
+ ArrayData getArray(int pos);
+
+ /** Returns the map value at the given position. */
+ MapData getMap(int pos);
+
+ /**
+ * Returns the row value at the given position.
+ *
+ * <p>The number of fields is required to correctly extract the row.
+ */
+ RowData getRow(int pos, int numFields);
+
+ //
------------------------------------------------------------------------------------------
+ // Conversion Utilities
+ //
------------------------------------------------------------------------------------------
+
+ boolean[] toBooleanArray();
+
+ byte[] toByteArray();
+
+ short[] toShortArray();
+
+ int[] toIntArray();
+
+ long[] toLongArray();
+
+ float[] toFloatArray();
+
+ double[] toDoubleArray();
+
+ //
------------------------------------------------------------------------------------------
+ // Access Utilities
+ //
------------------------------------------------------------------------------------------
+
+ /**
+ * Creates an accessor for getting elements in an internal array data
structure at the given
+ * position.
+ *
+ * @param elementType the element type of the array
+ */
+ static ElementGetter createElementGetter(LogicalType elementType) {
+ final ElementGetter elementGetter;
+ // ordered by type root definition
+ switch (elementType.getTypeRoot()) {
+ case CHAR:
+ case VARCHAR:
+ elementGetter = ArrayData::getString;
+ break;
+ case BOOLEAN:
+ elementGetter = ArrayData::getBoolean;
+ break;
+ case BINARY:
+ case VARBINARY:
+ elementGetter = ArrayData::getBinary;
+ break;
+ case DECIMAL:
+ final int decimalPrecision = getPrecision(elementType);
+ final int decimalScale = getScale(elementType);
+ elementGetter =
+ (array, pos) -> array.getDecimal(pos,
decimalPrecision, decimalScale);
+ break;
+ case TINYINT:
+ elementGetter = ArrayData::getByte;
+ break;
+ case SMALLINT:
+ elementGetter = ArrayData::getShort;
+ break;
+ case INTEGER:
+ case DATE:
+ case TIME_WITHOUT_TIME_ZONE:
+ case INTERVAL_YEAR_MONTH:
+ elementGetter = ArrayData::getInt;
+ break;
+ case BIGINT:
+ case INTERVAL_DAY_TIME:
+ elementGetter = ArrayData::getLong;
+ break;
+ case FLOAT:
+ elementGetter = ArrayData::getFloat;
+ break;
+ case DOUBLE:
+ elementGetter = ArrayData::getDouble;
+ break;
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ final int timestampPrecision = getPrecision(elementType);
+ elementGetter = (array, pos) -> array.getTimestamp(pos,
timestampPrecision);
+ break;
+ case TIMESTAMP_WITH_TIME_ZONE:
+ throw new UnsupportedOperationException();
+ case ARRAY:
+ elementGetter = ArrayData::getArray;
+ break;
+ case MULTISET:
+ case MAP:
+ elementGetter = ArrayData::getMap;
+ break;
+ case ROW:
+ case STRUCTURED_TYPE:
+ final int rowFieldCount = getFieldCount(elementType);
+ elementGetter = (array, pos) -> array.getRow(pos,
rowFieldCount);
+ break;
+ case DISTINCT_TYPE:
+ elementGetter = createElementGetter(((DistinctType)
elementType).getSourceType());
+ break;
+ case RAW:
+ elementGetter = ArrayData::getRawValue;
+ break;
+ case NULL:
+ case SYMBOL:
+ case UNRESOLVED:
+ default:
+ throw new IllegalArgumentException();
+ }
+ if (!elementType.isNullable()) {
+ return elementGetter;
+ }
+ return (array, pos) -> {
+ if (array.isNullAt(pos)) {
+ return null;
+ }
+ return elementGetter.getElementOrNull(array, pos);
+ };
+ }
+
+ /**
+ * Accessor for getting the elements of an array during runtime.
+ *
+ * @see #createElementGetter(LogicalType)
+ */
+ @PublicEvolving
+ interface ElementGetter extends Serializable {
+ @Nullable
+ Object getElementOrNull(ArrayData array, int pos);
+ }
+}
diff --git
a/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/table/types/logical/VariantType.java
b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/table/types/logical/VariantType.java
new file mode 100644
index 0000000000..70ba9ad729
--- /dev/null
+++
b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/table/types/logical/VariantType.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.logical;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Data type of semi-structured data.
+ *
+ * <p>The type supports storing any semi-structured data, including ARRAY,
MAP, and scalar types.
+ * VARIANT can only store MAP types with keys of type STRING.
+ */
+@PublicEvolving
+public class VariantType extends LogicalType {
+
+ public VariantType(boolean isNullable) {
+ super(isNullable, LogicalTypeRoot.UNRESOLVED);
+ }
+
+ @Override
+ public LogicalType copy(boolean b) {
+ return null;
+ }
+
+ @Override
+ public String asSerializableString() {
+ return "";
+ }
+
+ @Override
+ public boolean supportsInputConversion(Class<?> aClass) {
+ return false;
+ }
+
+ @Override
+ public boolean supportsOutputConversion(Class<?> aClass) {
+ return false;
+ }
+
+ @Override
+ public Class<?> getDefaultConversion() {
+ return null;
+ }
+
+ @Override
+ public List<LogicalType> getChildren() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public <R> R accept(LogicalTypeVisitor<R> logicalTypeVisitor) {
+ return null;
+ }
+}
diff --git
a/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/types/variant/BinaryVariant.java
b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/types/variant/BinaryVariant.java
index 1311188617..41440159ea 100644
---
a/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/types/variant/BinaryVariant.java
+++
b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/types/variant/BinaryVariant.java
@@ -29,4 +29,12 @@ package org.apache.flink.types.variant;
*/
public class BinaryVariant implements Variant {
public BinaryVariant(byte[] value, byte[] metadata) {}
+
+ public byte[] getValue() {
+ throw new UnsupportedOperationException();
+ }
+
+ public byte[] getMetadata() {
+ throw new UnsupportedOperationException();
+ }
}
diff --git
a/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/types/variant/Variant.java
b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/types/variant/Variant.java
index 9f6f970b69..594532e9e1 100644
---
a/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/types/variant/Variant.java
+++
b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/types/variant/Variant.java
@@ -24,4 +24,8 @@ import java.io.Serializable;
/** Variant represent a semi-structured data. */
@PublicEvolving
-public interface Variant extends Serializable {}
+public interface Variant extends Serializable {
+ static VariantBuilder newBuilder() {
+ throw new UnsupportedOperationException();
+ }
+}
diff --git
a/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/types/variant/VariantBuilder.java
b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/types/variant/VariantBuilder.java
new file mode 100644
index 0000000000..550aae226a
--- /dev/null
+++
b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/types/variant/VariantBuilder.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.types.variant;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.math.BigDecimal;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+
+/** Builder for variants. */
+@PublicEvolving
+public interface VariantBuilder {
+
+ /** Create a variant from a byte. */
+ Variant of(byte b);
+
+ /** Create a variant from a short. */
+ Variant of(short s);
+
+ /** Create a variant from a int. */
+ Variant of(int i);
+
+ /** Create a variant from a long. */
+ Variant of(long l);
+
+ /** Create a variant from a string. */
+ Variant of(String s);
+
+ /** Create a variant from a double. */
+ Variant of(double d);
+
+ /** Create a variant from a float. */
+ Variant of(float f);
+
+ /** Create a variant from a byte array. */
+ Variant of(byte[] bytes);
+
+ /** Create a variant from a boolean. */
+ Variant of(boolean b);
+
+ /** Create a variant from a BigDecimal. */
+ Variant of(BigDecimal bigDecimal);
+
+ /** Create a variant from an Instant. */
+ Variant of(Instant instant);
+
+ /** Create a variant from a LocalDate. */
+ Variant of(LocalDate localDate);
+
+ /** Create a variant from a LocalDateTime. */
+ Variant of(LocalDateTime localDateTime);
+
+ /** Create a variant of null. */
+ Variant ofNull();
+
+ /** Get the builder of a variant object. */
+ VariantObjectBuilder object();
+
+ /** Get the builder of a variant object. */
+ VariantObjectBuilder object(boolean allowDuplicateKeys);
+
+ /** Get the builder for a variant array. */
+ VariantArrayBuilder array();
+
+ /** Builder for a variant object. */
+ @PublicEvolving
+ interface VariantObjectBuilder {
+
+ /** Add a field to the object. */
+ VariantObjectBuilder add(String key, Variant value);
+
+ /** Build the variant object. */
+ Variant build();
+ }
+
+ /** Builder for a variant array. */
+ @PublicEvolving
+ interface VariantArrayBuilder {
+
+ /** Add a value to the array. */
+ VariantArrayBuilder add(Variant value);
+
+ /** Build the variant array. */
+ Variant build();
+ }
+}