This is an automated email from the ASF dual-hosted git repository.
yunfengzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 39cba699d0 [flink] Bump Flink version to 2.1 (#6027)
39cba699d0 is described below
commit 39cba699d0861322c031868fcb590dd1a36f0b32
Author: Xuannan <[email protected]>
AuthorDate: Wed Aug 20 13:52:25 2025 +0800
[flink] Bump Flink version to 2.1 (#6027)
---
.github/workflows/e2e-tests-flink-2.x-jdk11.yml | 2 +-
.github/workflows/utitcase-flink-2.x-jdk11.yml | 2 +-
paimon-e2e-tests/pom.xml | 10 +
.../apache/flink/types/variant/BinaryVariant.java | 32 +++
.../org/apache/flink/types/variant/Variant.java | 27 ++
paimon-flink/paimon-flink-2.1/pom.xml | 93 ++++++
paimon-flink/paimon-flink-common/pom.xml | 12 +-
.../java/org/apache/paimon/flink/FlinkRowData.java | 12 +
.../paimon/flink/NestedProjectedRowData.java | 5 +
.../org/apache/paimon/flink/ProjectedRowData.java | 5 +
.../org/apache/paimon/flink/ChangelogModeTest.java | 10 +-
.../apache/paimon/flink/SchemaChangeITCase.java | 3 -
.../apache/paimon/flink/SerializableRowData.java | 6 +
.../java/org/apache/flink/table/data/RowData.java | 314 +++++++++++++++++++++
.../apache/flink/types/variant/BinaryVariant.java | 32 +++
.../org/apache/flink/types/variant/Variant.java | 27 ++
pom.xml | 8 +-
17 files changed, 585 insertions(+), 15 deletions(-)
diff --git a/.github/workflows/e2e-tests-flink-2.x-jdk11.yml
b/.github/workflows/e2e-tests-flink-2.x-jdk11.yml
index 6333f9f4b9..5f97778467 100644
--- a/.github/workflows/e2e-tests-flink-2.x-jdk11.yml
+++ b/.github/workflows/e2e-tests-flink-2.x-jdk11.yml
@@ -39,7 +39,7 @@ jobs:
fail-fast: true
matrix:
# Last element should be the current default flink version
- flink_version: [ '2.0' ]
+ flink_version: [ '2.0', '2.1' ]
steps:
- name: Checkout code
uses: actions/checkout@v4
diff --git a/.github/workflows/utitcase-flink-2.x-jdk11.yml
b/.github/workflows/utitcase-flink-2.x-jdk11.yml
index 40cd479001..8e90e679cb 100644
--- a/.github/workflows/utitcase-flink-2.x-jdk11.yml
+++ b/.github/workflows/utitcase-flink-2.x-jdk11.yml
@@ -56,7 +56,7 @@ jobs:
jvm_timezone=$(random_timezone)
echo "JVM timezone is set to $jvm_timezone"
test_modules=""
- for suffix in 2.0 common; do
+ for suffix in 2.0 2.1 common; do
test_modules+="org.apache.paimon:paimon-flink-${suffix},"
done
test_modules="${test_modules%,}"
diff --git a/paimon-e2e-tests/pom.xml b/paimon-e2e-tests/pom.xml
index 6e5a305a22..c46668e953 100644
--- a/paimon-e2e-tests/pom.xml
+++ b/paimon-e2e-tests/pom.xml
@@ -294,6 +294,16 @@ under the License.
<profiles>
<!-- Activate these profiles with -Pflink-x.xx to build and test
against different Flink versions -->
+
+ <!-- Runs the e2e tests against Flink 2.0 (activate with -Pflink-2.0); the Kafka connector
+ version below resolves to 4.0.0-2.0. -->
+ <profile>
+ <id>flink-2.0</id>
+ <properties>
+ <test.flink.main.version>2.0</test.flink.main.version>
+ <test.flink.version>2.0.0</test.flink.version>
+
<test.flink.connector.kafka.version>4.0.0-${test.flink.main.version}</test.flink.connector.kafka.version>
+ </properties>
+ </profile>
+
<profile>
<id>flink-1.19</id>
<properties>
diff --git
a/paimon-flink/paimon-flink-2.0/src/main/java/org/apache/flink/types/variant/BinaryVariant.java
b/paimon-flink/paimon-flink-2.0/src/main/java/org/apache/flink/types/variant/BinaryVariant.java
new file mode 100644
index 0000000000..1311188617
--- /dev/null
+++
b/paimon-flink/paimon-flink-2.0/src/main/java/org/apache/flink/types/variant/BinaryVariant.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.types.variant;
+
+/**
+ * Stand-in for Flink's {@code BinaryVariant}. In Flink, this type represents a semi-structured
+ * value made of two binary values: value and metadata. The value encodes types and values, but
+ * not field names. The metadata currently contains a version flag and a list of field names.
+ *
+ * <p>This copy is only a placeholder. The constructor accepts {@code value} and {@code metadata}
+ * but does not store them, and the class exposes no accessors, so no variant content can be read
+ * back from an instance. NOTE(review): presumably added to paimon-flink-2.0 so that code calling
+ * {@code new BinaryVariant(...)} (e.g. Paimon's {@code FlinkRowData}) links against a Flink
+ * version that lacks the variant API; confirm before relying on it.
+ *
+ * @see <a href="https://github.com/apache/parquet-format/blob/master/VariantEncoding.md">Variant
+ *     Binary Encoding</a> for the detail layout of the data structure.
+ */
+public class BinaryVariant implements Variant {
+ // Arguments are not stored: this placeholder keeps no state.
+ public BinaryVariant(byte[] value, byte[] metadata) {}
+}
diff --git
a/paimon-flink/paimon-flink-2.0/src/main/java/org/apache/flink/types/variant/Variant.java
b/paimon-flink/paimon-flink-2.0/src/main/java/org/apache/flink/types/variant/Variant.java
new file mode 100644
index 0000000000..9f6f970b69
--- /dev/null
+++
b/paimon-flink/paimon-flink-2.0/src/main/java/org/apache/flink/types/variant/Variant.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.types.variant;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.io.Serializable;
+
+/**
+ * Variant represents semi-structured data.
+ *
+ * <p>This copy in paimon-flink-2.0 is a marker interface only: it declares no methods.
+ * NOTE(review): presumably provided because this module's Flink version does not ship the variant
+ * API; confirm.
+ */
+@PublicEvolving
+public interface Variant extends Serializable {}
diff --git a/paimon-flink/paimon-flink-2.1/pom.xml
b/paimon-flink/paimon-flink-2.1/pom.xml
new file mode 100644
index 0000000000..062154d9fe
--- /dev/null
+++ b/paimon-flink/paimon-flink-2.1/pom.xml
@@ -0,0 +1,93 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.paimon</groupId>
+ <artifactId>paimon-flink</artifactId>
+ <version>1.3-SNAPSHOT</version>
+ </parent>
+
+ <packaging>jar</packaging>
+
+ <artifactId>paimon-flink-2.1</artifactId>
+ <name>Paimon : Flink : 2.1</name>
+
+ <!-- Flink version used by the provided-scope Flink dependencies declared below. -->
+ <properties>
+ <flink.version>2.1.0</flink.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.paimon</groupId>
+ <artifactId>paimon-flink-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.paimon</groupId>
+ <artifactId>paimon-flink2-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-common</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <!-- Bundle paimon-flink-common and paimon-flink2-common into this module's shaded jar.
+ combine.children="append" adds these includes to any inherited ones rather than replacing
+ them. NOTE(review): assumes the parent defines the shade-paimon execution; confirm. -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>shade-paimon</id>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <artifactSet>
+ <includes combine.children="append">
+
<include>org.apache.paimon:paimon-flink-common</include>
+
<include>org.apache.paimon:paimon-flink2-common</include>
+ </includes>
+ </artifactSet>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/paimon-flink/paimon-flink-common/pom.xml
b/paimon-flink/paimon-flink-common/pom.xml
index 28ce295453..715718b9cb 100644
--- a/paimon-flink/paimon-flink-common/pom.xml
+++ b/paimon-flink/paimon-flink-common/pom.xml
@@ -38,6 +38,12 @@ under the License.
</properties>
<dependencies>
+ <!-- NOTE(review): declared first among the dependencies, presumably so that the
+ org.apache.flink.* shim classes shipped in ${paimon-flinkx-common} (e.g. RowData, Variant)
+ precede Flink's own jars on the compile classpath; confirm before reordering. -->
+ <dependency>
+ <groupId>org.apache.paimon</groupId>
+ <artifactId>${paimon-flinkx-common}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
@@ -81,12 +87,6 @@ under the License.
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>org.apache.paimon</groupId>
- <artifactId>${paimon-flinkx-common}</artifactId>
- <version>${project.version}</version>
- </dependency>
-
<dependency>
<groupId>org.apache.paimon</groupId>
<artifactId>${paimon-flinkx-common}</artifactId>
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowData.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowData.java
index 6e71b32922..a4d81364b4 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowData.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowData.java
@@ -33,6 +33,8 @@ import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.types.RowKind;
+import org.apache.flink.types.variant.BinaryVariant;
+import org.apache.flink.types.variant.Variant;
import static org.apache.paimon.flink.FlinkRowWrapper.fromFlinkRowKind;
@@ -145,6 +147,11 @@ public class FlinkRowData implements RowData {
return new FlinkRowData(row.getRow(pos, numFields));
}
+ /**
+ * Returns the variant at {@code pos} as a Flink {@link BinaryVariant} built from the value and
+ * metadata of the underlying Paimon variant (the arrays are passed through, not copied here).
+ *
+ * <p>The Paimon variant is dereferenced without a null check, so a null field at {@code pos}
+ * would throw a NullPointerException. NOTE(review): presumably callers check {@code isNullAt}
+ * first. Also declared without {@code @Override}, presumably so the class compiles against Flink
+ * versions whose {@code RowData} lacks this method; confirm before adding it.
+ */
+ public Variant getVariant(int pos) {
+ org.apache.paimon.data.variant.Variant variant = row.getVariant(pos);
+ return new BinaryVariant(variant.value(), variant.metadata());
+ }
+
private static class FlinkArrayData implements ArrayData {
private final InternalArray array;
@@ -218,6 +225,11 @@ public class FlinkRowData implements RowData {
throw new UnsupportedOperationException();
}
+ /**
+ * Returns the array element at {@code pos} as a Flink {@link BinaryVariant} built from the
+ * value and metadata of the underlying Paimon variant (passed through, not copied here).
+ *
+ * <p>The element is dereferenced without a null check. NOTE(review): presumably callers check
+ * {@code isNullAt} first. Declared without {@code @Override}, presumably because not every
+ * supported Flink {@code ArrayData} declares this method; confirm before adding it.
+ */
+ public Variant getVariant(int pos) {
+ org.apache.paimon.data.variant.Variant variant =
array.getVariant(pos);
+ return new BinaryVariant(variant.value(), variant.metadata());
+ }
+
@Override
public byte[] getBinary(int pos) {
return array.getBinary(pos);
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/NestedProjectedRowData.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/NestedProjectedRowData.java
index 810cc1ae42..b9fa8b9d6b 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/NestedProjectedRowData.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/NestedProjectedRowData.java
@@ -28,6 +28,7 @@ import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;
+import org.apache.flink.types.variant.Variant;
import javax.annotation.Nullable;
@@ -195,6 +196,10 @@ public class NestedProjectedRowData implements RowData,
Serializable {
return getFieldAs(pos, (rowData, internalPos) ->
rowData.getRow(internalPos, numFields));
}
+ /**
+ * Returns the variant at projected position {@code pos}, read from the (possibly nested) source
+ * row through {@code getFieldAs}, in the same way as the other typed accessors of this class.
+ */
+ public Variant getVariant(int pos) {
+ return getFieldAs(pos, RowData::getVariant);
+ }
+
private @Nullable RowData extractInternalRow(int pos) {
int[] projectedField = projectedFields[pos];
RowData rowData = this.row;
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/ProjectedRowData.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/ProjectedRowData.java
index 6bdbe58801..03d8420cef 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/ProjectedRowData.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/ProjectedRowData.java
@@ -27,6 +27,7 @@ import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.RowKind;
+import org.apache.flink.types.variant.Variant;
import java.util.Arrays;
@@ -156,6 +157,10 @@ public class ProjectedRowData implements RowData {
return row.getRow(indexMapping[pos], numFields);
}
+ /** Returns the variant of the underlying row at the mapped index {@code indexMapping[pos]}. */
+ public Variant getVariant(int pos) {
+ return row.getVariant(indexMapping[pos]);
+ }
+
@Override
public boolean equals(Object o) {
throw new UnsupportedOperationException("Projected row data cannot be
compared");
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ChangelogModeTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ChangelogModeTest.java
index 9022bbb505..7a24efa0fa 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ChangelogModeTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ChangelogModeTest.java
@@ -33,6 +33,7 @@ import org.apache.paimon.types.RowType;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.types.RowKind;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
@@ -76,7 +77,14 @@ public class ChangelogModeTest {
@Test
public void testDefault() throws Exception {
- test(new Options(), ChangelogMode.upsert(), ChangelogMode.upsert());
+ // The third expected mode is spelled out as INSERT, UPDATE_AFTER and DELETE instead of
+ // ChangelogMode.upsert(). NOTE(review): presumably because upsert() no longer compares equal
+ // to this kind set on the newer Flink version this commit targets; confirm.
+ test(
+ new Options(),
+ ChangelogMode.upsert(),
+ ChangelogMode.newBuilder()
+ .addContainedKind(RowKind.INSERT)
+ .addContainedKind(RowKind.UPDATE_AFTER)
+ .addContainedKind(RowKind.DELETE)
+ .build());
}
@Test
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
index da260851a8..8edc85a3c2 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
@@ -21,7 +21,6 @@ package org.apache.paimon.flink;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.utils.DateTimeUtils;
-import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.types.Row;
import org.junit.jupiter.api.Test;
@@ -685,7 +684,6 @@ public class SchemaChangeITCase extends CatalogITCaseBase {
"INSERT INTO T VALUES('aaa', 'bbb',
'ccc', 1, CAST(NULL AS FLOAT))"))
.satisfies(
anyCauseMatches(
- TableException.class,
"Column 'e' is NOT NULL, however, a null value
is being written into it."));
// Not null -> nullable
@@ -718,7 +716,6 @@ public class SchemaChangeITCase extends CatalogITCaseBase {
"INSERT INTO T VALUES('aaa', 'bbb',
CAST(NULL AS STRING), 1, CAST(NULL AS FLOAT))"))
.satisfies(
anyCauseMatches(
- TableException.class,
"Column 'c' is NOT NULL, however, a null value
is being written into it."));
// Insert a null value
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SerializableRowData.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SerializableRowData.java
index 75b96cbe02..3b0db92a02 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SerializableRowData.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SerializableRowData.java
@@ -29,6 +29,7 @@ import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.types.RowKind;
+import org.apache.flink.types.variant.Variant;
import java.io.IOException;
import java.io.ObjectInputStream;
@@ -153,4 +154,9 @@ public class SerializableRowData implements RowData,
Serializable {
public RowData getRow(int i, int rowArity) {
return row.getRow(i, rowArity);
}
+
+ /** Delegates to the wrapped row. */
+ @Override
+ public Variant getVariant(int i) {
+ return row.getVariant(i);
+ }
}
diff --git
a/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/table/data/RowData.java
b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/table/data/RowData.java
new file mode 100644
index 0000000000..f0dd820066
--- /dev/null
+++
b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/table/data/RowData.java
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.table.types.logical.DistinctType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.StructuredType;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.types.variant.Variant;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+
+import static
org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getFieldCount;
+import static
org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getPrecision;
+import static
org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getScale;
+
+/**
+ * Base interface for an internal data structure representing data of {@link
RowType} and other
+ * (possibly nested) structured types such as {@link StructuredType} in the
table ecosystem.
+ *
+ * <p>All top-level records that are travelling through Table API or SQL
pipelines during runtime
+ * are instances of this interface. Each {@link RowData} contains a {@link
RowKind} which represents
+ * the kind of change that a row describes in a changelog. The {@link RowKind}
is just metadata
+ * information of row and thus not part of the table's schema, i.e., not a
dedicated field.
+ *
+ * <p>Note: All fields of this data structure must be internal data structures.
+ *
+ * <p>The {@link RowData} interface has different implementations which are
designed for different
+ * scenarios:
+ *
+ * <ul>
+ * <li>The binary-oriented implementation {@code BinaryRowData} is backed by
references to {@link
+ * MemorySegment} instead of using Java objects to reduce the
serialization/deserialization
+ * overhead.
+ * <li>The object-oriented implementation {@link GenericRowData} is backed
by an array of Java
+ * {@link Object} which is easy to construct and efficient to update.
+ * </ul>
+ *
+ * <p>{@link GenericRowData} is intended for public use and has stable
behavior. It is recommended
+ * to construct instances of {@link RowData} with this class if internal data
structures are
+ * required.
+ *
+ * <p>The mappings from Flink's Table API and SQL data types to the internal
data structures are
+ * listed in the following table:
+ *
+ * <pre>
+ * +--------------------------------+-----------------------------------------+
+ * | SQL Data Types | Internal Data Structures |
+ * +--------------------------------+-----------------------------------------+
+ * | BOOLEAN | boolean |
+ * +--------------------------------+-----------------------------------------+
+ * | CHAR / VARCHAR / STRING | {@link StringData} |
+ * +--------------------------------+-----------------------------------------+
+ * | BINARY / VARBINARY / BYTES | byte[] |
+ * +--------------------------------+-----------------------------------------+
+ * | DECIMAL | {@link DecimalData} |
+ * +--------------------------------+-----------------------------------------+
+ * | TINYINT | byte |
+ * +--------------------------------+-----------------------------------------+
+ * | SMALLINT | short |
+ * +--------------------------------+-----------------------------------------+
+ * | INT | int |
+ * +--------------------------------+-----------------------------------------+
+ * | BIGINT | long |
+ * +--------------------------------+-----------------------------------------+
+ * | FLOAT | float |
+ * +--------------------------------+-----------------------------------------+
+ * | DOUBLE | double |
+ * +--------------------------------+-----------------------------------------+
+ * | DATE | int (number of days since epoch) |
+ * +--------------------------------+-----------------------------------------+
+ * | TIME | int (number of milliseconds of the day) |
+ * +--------------------------------+-----------------------------------------+
+ * | TIMESTAMP | {@link TimestampData} |
+ * +--------------------------------+-----------------------------------------+
+ * | TIMESTAMP WITH LOCAL TIME ZONE | {@link TimestampData} |
+ * +--------------------------------+-----------------------------------------+
+ * | INTERVAL YEAR TO MONTH | int (number of months) |
+ * +--------------------------------+-----------------------------------------+
+ * | INTERVAL DAY TO SECOND | long (number of milliseconds) |
+ * +--------------------------------+-----------------------------------------+
+ * | ROW / structured types | {@link RowData} |
+ * +--------------------------------+-----------------------------------------+
+ * | ARRAY | {@link ArrayData} |
+ * +--------------------------------+-----------------------------------------+
+ * | MAP / MULTISET | {@link MapData} |
+ * +--------------------------------+-----------------------------------------+
+ * | RAW | {@link RawValueData} |
+ * +--------------------------------+-----------------------------------------+
+ * </pre>
+ *
+ * <p>Nullability is always handled by the container data structure.
+ *
+ * <p>NOTE(review): this is a copy of Flink's {@code RowData} kept in paimon-flink1-common that
+ * also declares {@link #getVariant(int)}; presumably it shadows the Flink 1.x interface so that
+ * Paimon's RowData implementations compile there. Confirm against upstream before editing.
+ */
+@PublicEvolving
+public interface RowData {
+
+ /**
+ * Returns the number of fields in this row.
+ *
+ * <p>The number does not include {@link RowKind}. It is kept separately.
+ */
+ int getArity();
+
+ /**
+ * Returns the kind of change that this row describes in a changelog.
+ *
+ * @see RowKind
+ */
+ RowKind getRowKind();
+
+ /**
+ * Sets the kind of change that this row describes in a changelog.
+ *
+ * @see RowKind
+ */
+ void setRowKind(RowKind kind);
+
+ //
------------------------------------------------------------------------------------------
+ // Read-only accessor methods
+ //
------------------------------------------------------------------------------------------
+
+ /** Returns true if the field is null at the given position. */
+ boolean isNullAt(int pos);
+
+ /** Returns the boolean value at the given position. */
+ boolean getBoolean(int pos);
+
+ /** Returns the byte value at the given position. */
+ byte getByte(int pos);
+
+ /** Returns the short value at the given position. */
+ short getShort(int pos);
+
+ /** Returns the integer value at the given position. */
+ int getInt(int pos);
+
+ /** Returns the long value at the given position. */
+ long getLong(int pos);
+
+ /** Returns the float value at the given position. */
+ float getFloat(int pos);
+
+ /** Returns the double value at the given position. */
+ double getDouble(int pos);
+
+ /** Returns the string value at the given position. */
+ StringData getString(int pos);
+
+ /**
+ * Returns the decimal value at the given position.
+ *
+ * <p>The precision and scale are required to determine whether the
decimal value was stored in
+ * a compact representation (see {@link DecimalData}).
+ */
+ DecimalData getDecimal(int pos, int precision, int scale);
+
+ /**
+ * Returns the timestamp value at the given position.
+ *
+ * <p>The precision is required to determine whether the timestamp value
was stored in a compact
+ * representation (see {@link TimestampData}).
+ */
+ TimestampData getTimestamp(int pos, int precision);
+
+ /** Returns the raw value at the given position. */
+ <T> RawValueData<T> getRawValue(int pos);
+
+ /** Returns the binary value at the given position. */
+ byte[] getBinary(int pos);
+
+ /** Returns the array value at the given position. */
+ ArrayData getArray(int pos);
+
+ /** Returns the map value at the given position. */
+ MapData getMap(int pos);
+
+ /**
+ * Returns the row value at the given position.
+ *
+ * <p>The number of fields is required to correctly extract the row.
+ */
+ RowData getRow(int pos, int numFields);
+
+ /**
+ * Returns the variant value at the given position.
+ *
+ * <p>Note: {@link #createFieldGetter(LogicalType, int)} in this copy has no type-root case
+ * that calls this accessor.
+ */
+ Variant getVariant(int pos);
+
+ //
------------------------------------------------------------------------------------------
+ // Access Utilities
+ //
------------------------------------------------------------------------------------------
+
+ /**
+ * Creates an accessor for getting elements in an internal row data
structure at the given
+ * position.
+ *
+ * @param fieldType the element type of the row
+ * @param fieldPos the element position of the row
+ * @throws UnsupportedOperationException if {@code fieldType} is TIMESTAMP_WITH_TIME_ZONE
+ * @throws IllegalArgumentException for NULL, SYMBOL, UNRESOLVED or any unlisted type root
+ */
+ static FieldGetter createFieldGetter(LogicalType fieldType, int fieldPos) {
+ final FieldGetter fieldGetter;
+ // ordered by type root definition
+ switch (fieldType.getTypeRoot()) {
+ case CHAR:
+ case VARCHAR:
+ fieldGetter = row -> row.getString(fieldPos);
+ break;
+ case BOOLEAN:
+ fieldGetter = row -> row.getBoolean(fieldPos);
+ break;
+ case BINARY:
+ case VARBINARY:
+ fieldGetter = row -> row.getBinary(fieldPos);
+ break;
+ case DECIMAL:
+ final int decimalPrecision = getPrecision(fieldType);
+ final int decimalScale = getScale(fieldType);
+ fieldGetter = row -> row.getDecimal(fieldPos,
decimalPrecision, decimalScale);
+ break;
+ case TINYINT:
+ fieldGetter = row -> row.getByte(fieldPos);
+ break;
+ case SMALLINT:
+ fieldGetter = row -> row.getShort(fieldPos);
+ break;
+ case INTEGER:
+ case DATE:
+ case TIME_WITHOUT_TIME_ZONE:
+ case INTERVAL_YEAR_MONTH:
+ fieldGetter = row -> row.getInt(fieldPos);
+ break;
+ case BIGINT:
+ case INTERVAL_DAY_TIME:
+ fieldGetter = row -> row.getLong(fieldPos);
+ break;
+ case FLOAT:
+ fieldGetter = row -> row.getFloat(fieldPos);
+ break;
+ case DOUBLE:
+ fieldGetter = row -> row.getDouble(fieldPos);
+ break;
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ final int timestampPrecision = getPrecision(fieldType);
+ fieldGetter = row -> row.getTimestamp(fieldPos,
timestampPrecision);
+ break;
+ case TIMESTAMP_WITH_TIME_ZONE:
+ // No accessor is mapped for this type root; fail fast.
+ throw new UnsupportedOperationException();
+ case ARRAY:
+ fieldGetter = row -> row.getArray(fieldPos);
+ break;
+ case MULTISET:
+ case MAP:
+ fieldGetter = row -> row.getMap(fieldPos);
+ break;
+ case ROW:
+ case STRUCTURED_TYPE:
+ final int rowFieldCount = getFieldCount(fieldType);
+ fieldGetter = row -> row.getRow(fieldPos, rowFieldCount);
+ break;
+ case DISTINCT_TYPE:
+ // Read a distinct type exactly like its underlying source type.
+ fieldGetter =
+ createFieldGetter(((DistinctType)
fieldType).getSourceType(), fieldPos);
+ break;
+ case RAW:
+ fieldGetter = row -> row.getRawValue(fieldPos);
+ break;
+ case NULL:
+ case SYMBOL:
+ case UNRESOLVED:
+ default:
+ throw new IllegalArgumentException();
+ }
+ if (!fieldType.isNullable()) {
+ return fieldGetter;
+ }
+ // Nullable fields are wrapped so that isNullAt is checked before the typed read.
+ return row -> {
+ if (row.isNullAt(fieldPos)) {
+ return null;
+ }
+ return fieldGetter.getFieldOrNull(row);
+ };
+ }
+
+ /**
+ * Accessor for getting the field of a row during runtime.
+ *
+ * @see #createFieldGetter(LogicalType, int)
+ */
+ @PublicEvolving
+ interface FieldGetter extends Serializable {
+ @Nullable
+ Object getFieldOrNull(RowData row);
+ }
+}
diff --git
a/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/types/variant/BinaryVariant.java
b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/types/variant/BinaryVariant.java
new file mode 100644
index 0000000000..1311188617
--- /dev/null
+++
b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/types/variant/BinaryVariant.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.types.variant;
+
+/**
+ * Stand-in for Flink's {@code BinaryVariant}. In Flink, this type represents a semi-structured
+ * value made of two binary values: value and metadata. The value encodes types and values, but
+ * not field names. The metadata currently contains a version flag and a list of field names.
+ *
+ * <p>This copy is only a placeholder. The constructor accepts {@code value} and {@code metadata}
+ * but does not store them, and the class exposes no accessors, so no variant content can be read
+ * back from an instance. NOTE(review): presumably added to paimon-flink1-common so that code
+ * calling {@code new BinaryVariant(...)} (e.g. Paimon's {@code FlinkRowData}) links against Flink
+ * 1.x, which lacks the variant API; confirm before relying on it.
+ *
+ * @see <a href="https://github.com/apache/parquet-format/blob/master/VariantEncoding.md">Variant
+ *     Binary Encoding</a> for the detail layout of the data structure.
+ */
+public class BinaryVariant implements Variant {
+ // Arguments are not stored: this placeholder keeps no state.
+ public BinaryVariant(byte[] value, byte[] metadata) {}
+}
diff --git
a/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/types/variant/Variant.java
b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/types/variant/Variant.java
new file mode 100644
index 0000000000..9f6f970b69
--- /dev/null
+++
b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/types/variant/Variant.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.types.variant;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.io.Serializable;
+
+/**
+ * Variant represents semi-structured data.
+ *
+ * <p>This copy in paimon-flink1-common is a marker interface only: it declares no methods.
+ * NOTE(review): presumably provided because Flink 1.x does not ship the variant API; confirm.
+ */
+@PublicEvolving
+public interface Variant extends Serializable {}
diff --git a/pom.xml b/pom.xml
index bace1e5165..65453e7d3e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -466,13 +466,15 @@ under the License.
<id>flink2</id>
<properties>
<paimon-flinkx-common>paimon-flink2-common</paimon-flinkx-common>
-
<paimon-flink-common.flink.version>2.0.0</paimon-flink-common.flink.version>
- <test.flink.main.version>2.0</test.flink.main.version>
- <test.flink.version>2.0.0</test.flink.version>
+
<paimon-flink-common.flink.version>2.1.0</paimon-flink-common.flink.version>
+ <test.flink.main.version>2.1</test.flink.main.version>
+ <test.flink.version>2.1.0</test.flink.version>
+ <target.java.version>11</target.java.version>
</properties>
<modules>
<module>paimon-flink/paimon-flink2-common</module>
<module>paimon-flink/paimon-flink-2.0</module>
+ <module>paimon-flink/paimon-flink-2.1</module>
</modules>
<activation>
<property>