This is an automated email from the ASF dual-hosted git repository.

yunfengzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new ac582e7107 [flink] Support read and write variant with Flink (#6108)
ac582e7107 is described below

commit ac582e7107dbb90356a0f2ebf3edc6bf7752d68c
Author: Xuannan <[email protected]>
AuthorDate: Wed Aug 27 09:23:08 2025 +0800

    [flink] Support read and write variant with Flink (#6108)
---
 .../flink/table/types/logical/VariantType.java     |  73 +++++++
 .../apache/paimon/flink/DataTypeToLogicalType.java |   2 +-
 .../org/apache/paimon/flink/FlinkRowWrapper.java   |   9 +-
 .../apache/paimon/flink/LogicalTypeToDataType.java |   5 +
 .../apache/paimon/flink/CatalogTableITCase.java    |  21 ++
 .../apache/paimon/flink/util/AbstractTestBase.java |   5 +
 .../org/apache/flink/table/data/ArrayData.java     | 242 +++++++++++++++++++++
 .../flink/table/types/logical/VariantType.java     |  73 +++++++
 .../apache/flink/types/variant/BinaryVariant.java  |   8 +
 .../org/apache/flink/types/variant/Variant.java    |   6 +-
 .../apache/flink/types/variant/VariantBuilder.java | 104 +++++++++
 11 files changed, 544 insertions(+), 4 deletions(-)

diff --git 
a/paimon-flink/paimon-flink-2.0/src/main/java/org/apache/flink/table/types/logical/VariantType.java
 
b/paimon-flink/paimon-flink-2.0/src/main/java/org/apache/flink/table/types/logical/VariantType.java
new file mode 100644
index 0000000000..70ba9ad729
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-2.0/src/main/java/org/apache/flink/table/types/logical/VariantType.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.logical;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Data type of semi-structured data.
+ *
+ * <p>The type supports storing any semi-structured data, including ARRAY, MAP, and scalar types.
+ * VARIANT can only store MAP types with keys of type STRING.
+ *
+ * <p>NOTE(review): this copy reports {@link LogicalTypeRoot#UNRESOLVED} as its type root and
+ * rejects every conversion class, so it presumably only exists to let Paimon compile against
+ * Flink versions that do not ship the type themselves -- confirm before relying on it at runtime.
+ */
+@PublicEvolving
+public class VariantType extends LogicalType {
+
+    private static final String FORMAT = "VARIANT";
+
+    /**
+     * Creates a variant type.
+     *
+     * @param isNullable whether values of this type may be {@code null}
+     */
+    public VariantType(boolean isNullable) {
+        super(isNullable, LogicalTypeRoot.UNRESOLVED);
+    }
+
+    /**
+     * Returns a new instance of this type with the requested nullability.
+     *
+     * @param isNullable nullability of the copy
+     * @return the copy, never {@code null}
+     */
+    @Override
+    public LogicalType copy(boolean isNullable) {
+        return new VariantType(isNullable);
+    }
+
+    /** Returns {@code VARIANT}, followed by {@code NOT NULL} if the type is not nullable. */
+    @Override
+    public String asSerializableString() {
+        return withNullability(FORMAT);
+    }
+
+    /** No input conversion class is supported by this copy of the type. */
+    @Override
+    public boolean supportsInputConversion(Class<?> aClass) {
+        return false;
+    }
+
+    /** No output conversion class is supported by this copy of the type. */
+    @Override
+    public boolean supportsOutputConversion(Class<?> aClass) {
+        return false;
+    }
+
+    /** Returns {@code null}: this copy of the type has no default conversion class. */
+    @Override
+    public Class<?> getDefaultConversion() {
+        return null;
+    }
+
+    /** A variant type has no child types. */
+    @Override
+    public List<LogicalType> getChildren() {
+        return Collections.emptyList();
+    }
+
+    /**
+     * Dispatches to the visitor instead of silently returning {@code null}, so that a visitor
+     * which does not know this type can report it through its own fallback.
+     */
+    @Override
+    public <R> R accept(LogicalTypeVisitor<R> logicalTypeVisitor) {
+        return logicalTypeVisitor.visit(this);
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/DataTypeToLogicalType.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/DataTypeToLogicalType.java
index 8ffc9c4efc..2039daec92 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/DataTypeToLogicalType.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/DataTypeToLogicalType.java
@@ -143,7 +143,7 @@ public class DataTypeToLogicalType implements 
DataTypeVisitor<LogicalType> {
 
     @Override
     public LogicalType visit(VariantType variantType) {
-        throw new UnsupportedOperationException("VariantType is not 
supported.");
+        // Map Paimon's variant type to Flink's, carrying the nullability over.
+        return new 
org.apache.flink.table.types.logical.VariantType(variantType.isNullable());
     }
 
     @Override
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowWrapper.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowWrapper.java
index 91ab7cfd5b..b8826232d0 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowWrapper.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowWrapper.java
@@ -24,6 +24,7 @@ import org.apache.paimon.data.InternalArray;
 import org.apache.paimon.data.InternalMap;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.data.variant.GenericVariant;
 import org.apache.paimon.data.variant.Variant;
 import org.apache.paimon.types.RowKind;
 
@@ -121,7 +122,9 @@ public class FlinkRowWrapper implements InternalRow {
 
     @Override
     public Variant getVariant(int pos) {
-        throw new UnsupportedOperationException();
+        // Re-wrap the value and metadata byte arrays of Flink's variant in Paimon's
+        // GenericVariant. The cast fails for any Flink Variant that is not a BinaryVariant.
+        org.apache.flink.types.variant.BinaryVariant variant =
+                (org.apache.flink.types.variant.BinaryVariant) 
row.getVariant(pos);
+        return new GenericVariant(variant.getValue(), variant.getMetadata());
     }
 
     @Override
@@ -214,7 +217,9 @@ public class FlinkRowWrapper implements InternalRow {
 
         @Override
         public Variant getVariant(int pos) {
-            throw new UnsupportedOperationException();
+            // Re-wrap the value and metadata byte arrays of Flink's variant in Paimon's
+            // GenericVariant. The cast fails for any Flink Variant that is not a
+            // BinaryVariant.
+            org.apache.flink.types.variant.BinaryVariant variant =
+                    (org.apache.flink.types.variant.BinaryVariant) 
array.getVariant(pos);
+            return new GenericVariant(variant.getValue(), 
variant.getMetadata());
         }
 
         @Override
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/LogicalTypeToDataType.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/LogicalTypeToDataType.java
index 23dfb2b077..3a54c975d1 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/LogicalTypeToDataType.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/LogicalTypeToDataType.java
@@ -42,6 +42,7 @@ import org.apache.flink.table.types.logical.TimestampType;
 import org.apache.flink.table.types.logical.TinyIntType;
 import org.apache.flink.table.types.logical.VarBinaryType;
 import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.types.logical.VariantType;
 import org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor;
 
 import java.util.ArrayList;
@@ -177,6 +178,10 @@ public class LogicalTypeToDataType extends 
LogicalTypeDefaultVisitor<DataType> {
         return new org.apache.paimon.types.RowType(rowType.isNullable(), 
dataFields);
     }
 
+    /**
+     * Converts Flink's VARIANT type to Paimon's, keeping the nullability of the Flink type so
+     * that a {@code VARIANT NOT NULL} column does not silently become nullable.
+     *
+     * <p>NOTE(review): not annotated with {@code @Override}, presumably because the visitor
+     * base class only declares this overload on Flink versions that ship VARIANT -- confirm.
+     *
+     * @param variantType the Flink logical type to convert
+     * @return the Paimon variant type with the same nullability
+     */
+    public DataType visit(VariantType variantType) {
+        return new org.apache.paimon.types.VariantType(variantType.isNullable());
+    }
+
     @Override
     protected DataType defaultMethod(LogicalType logicalType) {
         throw new UnsupportedOperationException("Unsupported type: " + 
logicalType);
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
index 989446c059..bcdf1e2ef6 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
@@ -31,8 +31,11 @@ import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
 import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
 import org.apache.flink.types.Row;
+import org.apache.flink.types.variant.Variant;
+import org.apache.flink.types.variant.VariantBuilder;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.condition.EnabledIf;
 
 import javax.annotation.Nonnull;
 
@@ -1219,6 +1222,24 @@ public class CatalogTableITCase extends 
CatalogITCaseBase {
         assertThat(row.getField(6)).isNotNull();
     }
 
+    // Round-trips VARIANT values through a table: writes two JSON objects and one JSON string
+    // with PARSE_JSON and expects to read back equal variants, in any order.
+    @Test
+    @EnabledIf("isFlink2_1OrAbove")
+    void testReadWriteVariant() {
+        sql("CREATE TABLE t (v VARIANT)");
+
+        sql(
+                "INSERT INTO t SELECT PARSE_JSON(s) FROM (VALUES 
('{\"a\":1}'), ('{\"a\":2}'), ('\"hello\"')) AS T(s)");
+
+        List<Row> rows = sql("SELECT * FROM t");
+
+        // NOTE(review): the expected integers are built from bytes, presumably because
+        // PARSE_JSON stores small integers in the narrowest integer type -- confirm.
+        VariantBuilder builder = Variant.newBuilder();
+        assertThat(rows)
+                .containsExactlyInAnyOrder(
+                        Row.of(builder.object().add("a", builder.of((byte) 
1)).build()),
+                        Row.of(builder.object().add("a", builder.of((byte) 
2)).build()),
+                        Row.of(builder.of("hello")));
+    }
+
     private void innerTestReadOptimizedTableAndCheckData(String 
insertTableName) {
         // full compaction will always be performed at the end of batch jobs, 
as long as
         // full-compaction.delta-commits is set, regardless of its value
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/AbstractTestBase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/AbstractTestBase.java
index ee838ed682..e9fb0a55c6 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/AbstractTestBase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/AbstractTestBase.java
@@ -55,6 +55,11 @@ import java.util.UUID;
 /** Similar to Flink's AbstractTestBase but using Junit5. */
 public class AbstractTestBase {
 
+    /**
+     * Tells whether the Flink version under test is 2.1 or newer.
+     *
+     * <p>The version is read from the {@code test.flink.main.version} system property and its
+     * leading {@code <major>[.<minor>]} part is compared numerically, so that e.g. {@code 10.0}
+     * is not ordered before {@code 2.1} as a plain string comparison would do.
+     *
+     * @return {@code true} if the property holds a version of at least 2.1; {@code false} if the
+     *     version is older, or if the property is missing or does not start with a number
+     */
+    protected static boolean isFlink2_1OrAbove() {
+        String flinkVersion = System.getProperty("test.flink.main.version");
+        if (flinkVersion == null) {
+            // Without a known version the 2.1-only tests are skipped instead of failing with
+            // a NullPointerException while the enabling condition is evaluated.
+            return false;
+        }
+        java.util.regex.Matcher version =
+                java.util.regex.Pattern.compile("^(\\d+)(?:\\.(\\d+))?")
+                        .matcher(flinkVersion.trim());
+        if (!version.find()) {
+            return false;
+        }
+        int major = Integer.parseInt(version.group(1));
+        // A bare major version such as "2" is treated as "2.0".
+        int minor = version.group(2) == null ? 0 : Integer.parseInt(version.group(2));
+        return major > 2 || (major == 2 && minor >= 1);
+    }
+
     private static final int DEFAULT_PARALLELISM = 16;
 
     @RegisterExtension
diff --git 
a/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/table/data/ArrayData.java
 
b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/table/data/ArrayData.java
new file mode 100644
index 0000000000..91db97cb3b
--- /dev/null
+++ 
b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/table/data/ArrayData.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.DistinctType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.types.variant.Variant;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+
+import static 
org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getFieldCount;
+import static 
org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getPrecision;
+import static 
org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getScale;
+
+/**
+ * Base interface of an internal data structure representing data of {@link 
ArrayType}.
+ *
+ * <p>Note: All elements of this data structure must be internal data 
structures and must be of the
+ * same type. See {@link RowData} for more information about internal data 
structures.
+ *
+ * <p>Use {@link GenericArrayData} to construct instances of this interface 
from regular Java
+ * arrays.
+ */
+@PublicEvolving
+public interface ArrayData {
+
+    /** Returns the number of elements in this array. */
+    int size();
+
+    // 
------------------------------------------------------------------------------------------
+    // Read-only accessor methods
+    // 
------------------------------------------------------------------------------------------
+
+    /** Returns true if the element is null at the given position. */
+    boolean isNullAt(int pos);
+
+    /** Returns the boolean value at the given position. */
+    boolean getBoolean(int pos);
+
+    /** Returns the byte value at the given position. */
+    byte getByte(int pos);
+
+    /** Returns the short value at the given position. */
+    short getShort(int pos);
+
+    /** Returns the integer value at the given position. */
+    int getInt(int pos);
+
+    /** Returns the long value at the given position. */
+    long getLong(int pos);
+
+    /** Returns the float value at the given position. */
+    float getFloat(int pos);
+
+    /** Returns the double value at the given position. */
+    double getDouble(int pos);
+
+    /** Returns the string value at the given position. */
+    StringData getString(int pos);
+
+    /**
+     * Returns the decimal value at the given position.
+     *
+     * <p>The precision and scale are required to determine whether the 
decimal value was stored in
+     * a compact representation (see {@link DecimalData}).
+     */
+    DecimalData getDecimal(int pos, int precision, int scale);
+
+    /**
+     * Returns the timestamp value at the given position.
+     *
+     * <p>The precision is required to determine whether the timestamp value 
was stored in a compact
+     * representation (see {@link TimestampData}).
+     */
+    TimestampData getTimestamp(int pos, int precision);
+
+    /** Returns the raw value at the given position. */
+    <T> RawValueData<T> getRawValue(int pos);
+
+    /** Returns the variant value at the given position. */
+    Variant getVariant(int pos);
+
+    /** Returns the binary value at the given position. */
+    byte[] getBinary(int pos);
+
+    /** Returns the array value at the given position. */
+    ArrayData getArray(int pos);
+
+    /** Returns the map value at the given position. */
+    MapData getMap(int pos);
+
+    /**
+     * Returns the row value at the given position.
+     *
+     * <p>The number of fields is required to correctly extract the row.
+     */
+    RowData getRow(int pos, int numFields);
+
+    // 
------------------------------------------------------------------------------------------
+    // Conversion Utilities
+    // 
------------------------------------------------------------------------------------------
+
+    boolean[] toBooleanArray();
+
+    byte[] toByteArray();
+
+    short[] toShortArray();
+
+    int[] toIntArray();
+
+    long[] toLongArray();
+
+    float[] toFloatArray();
+
+    double[] toDoubleArray();
+
+    // 
------------------------------------------------------------------------------------------
+    // Access Utilities
+    // 
------------------------------------------------------------------------------------------
+
+    /**
+     * Creates an accessor for getting elements in an internal array data 
structure at the given
+     * position.
+     *
+     * <p>If the element type is nullable, the returned accessor checks {@code isNullAt} first
+     * and yields {@code null} for null elements.
+     *
+     * @param elementType the element type of the array
+     * @return an accessor matching the type root of {@code elementType}
+     * @throws UnsupportedOperationException for TIMESTAMP_WITH_TIME_ZONE elements
+     * @throws IllegalArgumentException for NULL, SYMBOL, UNRESOLVED and any other type root
+     *     without a case below
+     */
+    static ElementGetter createElementGetter(LogicalType elementType) {
+        final ElementGetter elementGetter;
+        // ordered by type root definition
+        switch (elementType.getTypeRoot()) {
+            case CHAR:
+            case VARCHAR:
+                elementGetter = ArrayData::getString;
+                break;
+            case BOOLEAN:
+                elementGetter = ArrayData::getBoolean;
+                break;
+            case BINARY:
+            case VARBINARY:
+                elementGetter = ArrayData::getBinary;
+                break;
+            case DECIMAL:
+                final int decimalPrecision = getPrecision(elementType);
+                final int decimalScale = getScale(elementType);
+                elementGetter =
+                        (array, pos) -> array.getDecimal(pos, 
decimalPrecision, decimalScale);
+                break;
+            case TINYINT:
+                elementGetter = ArrayData::getByte;
+                break;
+            case SMALLINT:
+                elementGetter = ArrayData::getShort;
+                break;
+            case INTEGER:
+            case DATE:
+            case TIME_WITHOUT_TIME_ZONE:
+            case INTERVAL_YEAR_MONTH:
+                elementGetter = ArrayData::getInt;
+                break;
+            case BIGINT:
+            case INTERVAL_DAY_TIME:
+                elementGetter = ArrayData::getLong;
+                break;
+            case FLOAT:
+                elementGetter = ArrayData::getFloat;
+                break;
+            case DOUBLE:
+                elementGetter = ArrayData::getDouble;
+                break;
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                final int timestampPrecision = getPrecision(elementType);
+                elementGetter = (array, pos) -> array.getTimestamp(pos, 
timestampPrecision);
+                break;
+            case TIMESTAMP_WITH_TIME_ZONE:
+                throw new UnsupportedOperationException();
+            case ARRAY:
+                elementGetter = ArrayData::getArray;
+                break;
+            case MULTISET:
+            case MAP:
+                elementGetter = ArrayData::getMap;
+                break;
+            case ROW:
+            case STRUCTURED_TYPE:
+                final int rowFieldCount = getFieldCount(elementType);
+                elementGetter = (array, pos) -> array.getRow(pos, 
rowFieldCount);
+                break;
+            case DISTINCT_TYPE:
+                // A distinct type is read through the getter of its source type.
+                elementGetter = createElementGetter(((DistinctType) 
elementType).getSourceType());
+                break;
+            case RAW:
+                elementGetter = ArrayData::getRawValue;
+                break;
+            // There is no case that maps to getVariant: the VariantType class added to this
+            // module by the same commit is constructed with LogicalTypeRoot.UNRESOLVED, so
+            // variant elements are rejected by the branch below.
+            case NULL:
+            case SYMBOL:
+            case UNRESOLVED:
+            default:
+                throw new IllegalArgumentException();
+        }
+        // Non-nullable element types skip the null check entirely.
+        if (!elementType.isNullable()) {
+            return elementGetter;
+        }
+        return (array, pos) -> {
+            if (array.isNullAt(pos)) {
+                return null;
+            }
+            return elementGetter.getElementOrNull(array, pos);
+        };
+    }
+
+    /**
+     * Accessor for getting the elements of an array during runtime.
+     *
+     * @see #createElementGetter(LogicalType)
+     */
+    @PublicEvolving
+    interface ElementGetter extends Serializable {
+        @Nullable
+        Object getElementOrNull(ArrayData array, int pos);
+    }
+}
diff --git 
a/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/table/types/logical/VariantType.java
 
b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/table/types/logical/VariantType.java
new file mode 100644
index 0000000000..70ba9ad729
--- /dev/null
+++ 
b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/table/types/logical/VariantType.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.logical;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Data type of semi-structured data.
+ *
+ * <p>The type supports storing any semi-structured data, including ARRAY, MAP, and scalar types.
+ * VARIANT can only store MAP types with keys of type STRING.
+ *
+ * <p>NOTE(review): this copy reports {@link LogicalTypeRoot#UNRESOLVED} as its type root and
+ * rejects every conversion class, so it presumably only exists to let Paimon compile against
+ * Flink versions that do not ship the type themselves -- confirm before relying on it at runtime.
+ */
+@PublicEvolving
+public class VariantType extends LogicalType {
+
+    private static final String FORMAT = "VARIANT";
+
+    /**
+     * Creates a variant type.
+     *
+     * @param isNullable whether values of this type may be {@code null}
+     */
+    public VariantType(boolean isNullable) {
+        super(isNullable, LogicalTypeRoot.UNRESOLVED);
+    }
+
+    /**
+     * Returns a new instance of this type with the requested nullability.
+     *
+     * @param isNullable nullability of the copy
+     * @return the copy, never {@code null}
+     */
+    @Override
+    public LogicalType copy(boolean isNullable) {
+        return new VariantType(isNullable);
+    }
+
+    /** Returns {@code VARIANT}, followed by {@code NOT NULL} if the type is not nullable. */
+    @Override
+    public String asSerializableString() {
+        return withNullability(FORMAT);
+    }
+
+    /** No input conversion class is supported by this copy of the type. */
+    @Override
+    public boolean supportsInputConversion(Class<?> aClass) {
+        return false;
+    }
+
+    /** No output conversion class is supported by this copy of the type. */
+    @Override
+    public boolean supportsOutputConversion(Class<?> aClass) {
+        return false;
+    }
+
+    /** Returns {@code null}: this copy of the type has no default conversion class. */
+    @Override
+    public Class<?> getDefaultConversion() {
+        return null;
+    }
+
+    /** A variant type has no child types. */
+    @Override
+    public List<LogicalType> getChildren() {
+        return Collections.emptyList();
+    }
+
+    /**
+     * Dispatches to the visitor instead of silently returning {@code null}, so that a visitor
+     * which does not know this type can report it through its own fallback.
+     */
+    @Override
+    public <R> R accept(LogicalTypeVisitor<R> logicalTypeVisitor) {
+        return logicalTypeVisitor.visit(this);
+    }
+}
diff --git 
a/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/types/variant/BinaryVariant.java
 
b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/types/variant/BinaryVariant.java
index 1311188617..41440159ea 100644
--- 
a/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/types/variant/BinaryVariant.java
+++ 
b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/types/variant/BinaryVariant.java
@@ -29,4 +29,12 @@ package org.apache.flink.types.variant;
  */
 public class BinaryVariant implements Variant {
     public BinaryVariant(byte[] value, byte[] metadata) {}
+
+    /**
+     * Always throws in this copy of the class.
+     *
+     * <p>NOTE(review): presumably a compile-time placeholder for the accessor of the real Flink
+     * class -- confirm.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    public byte[] getValue() {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Always throws in this copy of the class.
+     *
+     * <p>NOTE(review): presumably a compile-time placeholder for the accessor of the real Flink
+     * class -- confirm.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    public byte[] getMetadata() {
+        throw new UnsupportedOperationException();
+    }
 }
diff --git 
a/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/types/variant/Variant.java
 
b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/types/variant/Variant.java
index 9f6f970b69..594532e9e1 100644
--- 
a/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/types/variant/Variant.java
+++ 
b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/types/variant/Variant.java
@@ -24,4 +24,8 @@ import java.io.Serializable;
 
 /** Variant represent a semi-structured data. */
 @PublicEvolving
-public interface Variant extends Serializable {}
+public interface Variant extends Serializable {
+    /**
+     * Always throws in this copy of the interface.
+     *
+     * <p>NOTE(review): presumably a compile-time placeholder for the builder factory of the
+     * real Flink interface -- confirm.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    static VariantBuilder newBuilder() {
+        throw new UnsupportedOperationException();
+    }
+}
diff --git 
a/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/types/variant/VariantBuilder.java
 
b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/types/variant/VariantBuilder.java
new file mode 100644
index 0000000000..550aae226a
--- /dev/null
+++ 
b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/types/variant/VariantBuilder.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.types.variant;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.math.BigDecimal;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+
+/**
+ * Builder for variants.
+ *
+ * <p>The {@code of} overloads create scalar variants; {@link #object()} and {@link #array()}
+ * return nested builders for composite variants.
+ */
+@PublicEvolving
+public interface VariantBuilder {
+
+    /** Creates a variant from a byte. */
+    Variant of(byte b);
+
+    /** Creates a variant from a short. */
+    Variant of(short s);
+
+    /** Creates a variant from an int. */
+    Variant of(int i);
+
+    /** Creates a variant from a long. */
+    Variant of(long l);
+
+    /** Creates a variant from a string. */
+    Variant of(String s);
+
+    /** Creates a variant from a double. */
+    Variant of(double d);
+
+    /** Creates a variant from a float. */
+    Variant of(float f);
+
+    /** Creates a variant from a byte array. */
+    Variant of(byte[] bytes);
+
+    /** Creates a variant from a boolean. */
+    Variant of(boolean b);
+
+    /** Creates a variant from a BigDecimal. */
+    Variant of(BigDecimal bigDecimal);
+
+    /** Creates a variant from an Instant. */
+    Variant of(Instant instant);
+
+    /** Creates a variant from a LocalDate. */
+    Variant of(LocalDate localDate);
+
+    /** Creates a variant from a LocalDateTime. */
+    Variant of(LocalDateTime localDateTime);
+
+    /** Creates a variant of null. */
+    Variant ofNull();
+
+    /** Returns the builder of a variant object. */
+    VariantObjectBuilder object();
+
+    /**
+     * Returns the builder of a variant object.
+     *
+     * @param allowDuplicateKeys NOTE(review): presumably controls whether the same key may be
+     *     added to the object more than once -- confirm against the implementation
+     */
+    VariantObjectBuilder object(boolean allowDuplicateKeys);
+
+    /** Returns the builder for a variant array. */
+    VariantArrayBuilder array();
+
+    /** Builder for a variant object. */
+    @PublicEvolving
+    interface VariantObjectBuilder {
+
+        /**
+         * Adds a field to the object.
+         *
+         * @param key name of the field
+         * @param value value of the field
+         * @return a builder on which further fields can be added
+         */
+        VariantObjectBuilder add(String key, Variant value);
+
+        /** Builds the variant object. */
+        Variant build();
+    }
+
+    /** Builder for a variant array. */
+    @PublicEvolving
+    interface VariantArrayBuilder {
+
+        /**
+         * Adds a value to the array.
+         *
+         * @param value element to add
+         * @return a builder on which further values can be added
+         */
+        VariantArrayBuilder add(Variant value);
+
+        /** Builds the variant array. */
+        Variant build();
+    }
+}

Reply via email to