This is an automated email from the ASF dual-hosted git repository.

kunni pushed a commit to branch FLINK-38878
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git

commit 004ab744724e114e2ed447b3466b8c489d8af6f1
Author: lvyanquan <[email protected]>
AuthorDate: Thu Jan 15 21:57:56 2026 +0800

    [FLINK-38878][runtime] Add parse_json function.
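    
    Illustrative usage (editor's sketch, derived from the test added in this
    commit, not part of the original commit message): the function is invoked
    through a transform projection, e.g.
    
        new TransformDef(
                "default_namespace.default_schema.\\.*",
                "id, parse_json(message) as variantVal",
                null, null, null, null, null, null);
    
    where parse_json(message) parses a JSON string column into a VARIANT-typed
    column.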
---
 .../flink/FlinkPipelineTransformITCase.java        |   90 ++
 flink-cdc-runtime/pom.xml                          |    5 +
 .../org/apache/calcite/sql/type/SqlTypeFamily.java |  280 ++++++
 .../org/apache/calcite/sql/type/SqlTypeName.java   | 1054 ++++++++++++++++++++
 .../cdc/runtime/functions/SystemFunctionUtils.java |   35 +
 .../flink/cdc/runtime/parser/JaninoCompiler.java   |   17 +
 .../parser/metadata/TransformSqlOperatorTable.java |   26 +
 .../cdc/runtime/typeutils/DataTypeConverter.java   |   23 +
 .../cdc/runtime/parser/JaninoCompilerTest.java     |   40 +-
 .../cdc/runtime/parser/TransformParserTest.java    |    2 +
 10 files changed, 1571 insertions(+), 1 deletion(-)

diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
index 10ea31e53..883a50df9 100644
--- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
+++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
@@ -1137,6 +1137,96 @@ class FlinkPipelineTransformITCase {
                         "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[3, null, null, null, null, null, null], op=INSERT, meta=()}");
     }
 
+    /** This tests if transform variant functions work as expected. */
+    @Test
+    void testTransformWithVariantFunctions() throws Exception {
+        FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
+
+        // Setup value source
+        Configuration sourceConfig = new Configuration();
+        sourceConfig.set(
+                ValuesDataSourceOptions.EVENT_SET_ID,
+                ValuesDataSourceHelper.EventSetId.CUSTOM_SOURCE_EVENTS);
+
+        TableId myTable = TableId.tableId("default_namespace", "default_schema", "mytable1");
+        Schema tableSchema =
+                Schema.newBuilder()
+                        .physicalColumn("id", DataTypes.INT().notNull())
+                        .physicalColumn("message", DataTypes.STRING())
+                        .primaryKey("id")
+                        .build();
+
+        BinaryRecordDataGenerator generator =
+                new BinaryRecordDataGenerator(
+                        tableSchema.getColumnDataTypes().toArray(new DataType[0]));
+        List<Event> events =
+                Arrays.asList(
+                        new CreateTableEvent(myTable, tableSchema),
+                        DataChangeEvent.insertEvent(
+                                myTable,
+                                generator.generate(
+                                        new Object[] {
+                                            1,
+                                            BinaryStringData.fromString(
+                                                    "{\"name\":\"张三\",\"age\":30,\"is_active\":true,\"email\":\"[email protected]\",\"hobbies\":[\"reading\",\"coding\",\"traveling\"],\"address\":{\"street\":\"MainSt\",\"city\":\"Beijing\",\"zip\":\"100000\"}}"),
+                                        })),
+                        DataChangeEvent.insertEvent(
+                                myTable,
+                                generator.generate(
+                                        new Object[] {
+                                            2,
+                                            BinaryStringData.fromString(
+                                                    "{\"name\":\"李四\",\"age\":40,\"is_active\":true,\"email\":\"[email protected]\",\"hobbies\":[\"reading\",\"coding\",\"traveling\"],\"address\":{\"street\":\"MainSt\",\"city\":\"Beijing\",\"zip\":\"100000\"}}"),
+                                        })),
+                        DataChangeEvent.insertEvent(
+                                myTable, generator.generate(new Object[] {3, null})));
+
+        ValuesDataSourceHelper.setSourceEvents(Collections.singletonList(events));
+
+        SourceDef sourceDef =
+                new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig);
+
+        // Setup value sink
+        Configuration sinkConfig = new Configuration();
+        sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true);
+        SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig);
+
+        // Setup pipeline
+        Configuration pipelineConfig = new Configuration();
+        pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+        PipelineDef pipelineDef =
+                new PipelineDef(
+                        sourceDef,
+                        sinkDef,
+                        Collections.emptyList(),
+                        Collections.singletonList(
+                                new TransformDef(
+                                        "default_namespace.default_schema.\\.*",
+                                        "id, parse_json(message) as variantVal",
+                                        null,
+                                        null,
+                                        null,
+                                        null,
+                                        null,
+                                        null)),
+                        Collections.emptyList(),
+                        pipelineConfig);
+
+        // Execute the pipeline
+        PipelineExecution execution = composer.compose(pipelineDef);
+        execution.execute();
+
+        // Check the order and content of all received events
+        String[] outputEvents = outCaptor.toString().trim().split("\n");
+
+        Assertions.assertThat(outputEvents)
+                .containsExactly(
+                        "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT NOT NULL,`variantVal` VARIANT}, primaryKeys=id, options=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, {\"address\":{\"city\":\"Beijing\",\"street\":\"MainSt\",\"zip\":\"100000\"},\"age\":30,\"email\":\"[email protected]\",\"hobbies\":[\"reading\",\"coding\",\"traveling\"],\"is_active\":true,\"name\":\"张三\"}], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, {\"address\":{\"city\":\"Beijing\",\"street\":\"MainSt\",\"zip\":\"100000\"},\"age\":40,\"email\":\"[email protected]\",\"hobbies\":[\"reading\",\"coding\",\"traveling\"],\"is_active\":true,\"name\":\"李四\"}], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[3, null], op=INSERT, meta=()}");
+    }
+
     @ParameterizedTest
     @EnumSource
     void testTransformMergingIncompatibleRules(ValuesDataSink.SinkApi apiVersion) {
diff --git a/flink-cdc-runtime/pom.xml b/flink-cdc-runtime/pom.xml
index 6b1ea91a9..32d210e3e 100644
--- a/flink-cdc-runtime/pom.xml
+++ b/flink-cdc-runtime/pom.xml
@@ -44,6 +44,11 @@ limitations under the License.
             <groupId>org.apache.calcite</groupId>
             <artifactId>calcite-core</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.checkerframework</groupId>
+            <artifactId>checker-qual</artifactId>
+            <version>3.12.0</version>
+        </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-cdc-common</artifactId>
diff --git a/flink-cdc-runtime/src/main/java/org/apache/calcite/sql/type/SqlTypeFamily.java b/flink-cdc-runtime/src/main/java/org/apache/calcite/sql/type/SqlTypeFamily.java
new file mode 100644
index 000000000..94cca219d
--- /dev/null
+++ b/flink-cdc-runtime/src/main/java/org/apache/calcite/sql/type/SqlTypeFamily.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.sql.type;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.calcite.avatica.util.TimeUnit;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeFamily;
+import org.apache.calcite.sql.SqlIntervalQualifier;
+import org.apache.calcite.sql.SqlWindow;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+import java.sql.Types;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * SqlTypeFamily provides SQL type categorization.
+ *
+ * <p>The <em>primary</em> family categorization is a complete disjoint partitioning of SQL types
+ * into families, where two types are members of the same primary family iff instances of the two
+ * types can be the operands of an SQL equality predicate such as <code>WHERE v1 = v2</code>.
+ * Primary families are returned by RelDataType.getFamily().
+ *
+ * <p>There is also a <em>secondary</em> family categorization which overlaps with the primary
+ * categorization. It is used in type strategies for more specific or more general categorization
+ * than the primary families. Secondary families are never returned by RelDataType.getFamily().
+ *
+ * <p>This class was copied over from Calcite to support the variant type (CALCITE-4918). When
+ * upgrading to Calcite 1.39.0, please remove the entire class.
+ */
+public enum SqlTypeFamily implements RelDataTypeFamily {
+    // Primary families.
+    CHARACTER,
+    BINARY,
+    NUMERIC,
+    DATE,
+    TIME,
+    TIMESTAMP,
+    BOOLEAN,
+    INTERVAL_YEAR_MONTH,
+    INTERVAL_DAY_TIME,
+
+    // Secondary families.
+
+    STRING,
+    APPROXIMATE_NUMERIC,
+    EXACT_NUMERIC,
+    DECIMAL,
+    INTEGER,
+    DATETIME,
+    DATETIME_INTERVAL,
+    MULTISET,
+    ARRAY,
+    MAP,
+    NULL,
+    ANY,
+    CURSOR,
+    COLUMN_LIST,
+    GEO,
+    VARIANT,
+    /** Like ANY, but do not even validate the operand. It may not be an expression. */
+    IGNORE;
+
+    private static final Map<Integer, SqlTypeFamily> JDBC_TYPE_TO_FAMILY =
+            ImmutableMap.<Integer, SqlTypeFamily>builder()
+                    // Not present:
+                    // SqlTypeName.MULTISET shares Types.ARRAY with SqlTypeName.ARRAY;
+                    // SqlTypeName.MAP has no corresponding JDBC type
+                    // SqlTypeName.COLUMN_LIST has no corresponding JDBC type
+                    .put(Types.BIT, NUMERIC)
+                    .put(Types.TINYINT, NUMERIC)
+                    .put(Types.SMALLINT, NUMERIC)
+                    .put(Types.BIGINT, NUMERIC)
+                    .put(Types.INTEGER, NUMERIC)
+                    .put(Types.NUMERIC, NUMERIC)
+                    .put(Types.DECIMAL, NUMERIC)
+                    .put(Types.FLOAT, NUMERIC)
+                    .put(Types.REAL, NUMERIC)
+                    .put(Types.DOUBLE, NUMERIC)
+                    .put(Types.CHAR, CHARACTER)
+                    .put(Types.VARCHAR, CHARACTER)
+                    .put(Types.LONGVARCHAR, CHARACTER)
+                    .put(Types.CLOB, CHARACTER)
+                    .put(Types.BINARY, BINARY)
+                    .put(Types.VARBINARY, BINARY)
+                    .put(Types.LONGVARBINARY, BINARY)
+                    .put(Types.BLOB, BINARY)
+                    .put(Types.DATE, DATE)
+                    .put(Types.TIME, TIME)
+                    .put(ExtraSqlTypes.TIME_WITH_TIMEZONE, TIME)
+                    .put(Types.TIMESTAMP, TIMESTAMP)
+                    .put(ExtraSqlTypes.TIMESTAMP_WITH_TIMEZONE, TIMESTAMP)
+                    .put(Types.BOOLEAN, BOOLEAN)
+                    .put(ExtraSqlTypes.REF_CURSOR, CURSOR)
+                    .put(Types.ARRAY, ARRAY)
+                    .put(Types.JAVA_OBJECT, VARIANT)
+                    .build();
+
+    /**
+     * Gets the primary family containing a JDBC type.
+     *
+     * @param jdbcType the JDBC type of interest
+     * @return containing family
+     */
+    public static @Nullable SqlTypeFamily getFamilyForJdbcType(int jdbcType) {
+        return JDBC_TYPE_TO_FAMILY.get(jdbcType);
+    }
+
+    /**
+     * For this type family, returns the allowable types of the difference between two values of this
+     * family.
+     *
+     * <p>Equivalently, given an {@code ORDER BY} expression with one key, returns the allowable
+     * type families of the difference between two keys.
+     *
+     * <p>Example 1. For {@code ORDER BY empno}, a NUMERIC, the difference between two {@code empno}
+     * values is also NUMERIC.
+     *
+     * <p>Example 2. For {@code ORDER BY hireDate}, a DATE, the difference between two {@code
+     * hireDate} values might be an INTERVAL_DAY_TIME or INTERVAL_YEAR_MONTH.
+     *
+     * <p>The result determines whether a {@link SqlWindow} with a {@code RANGE} is valid (for
+     * example, {@code OVER (ORDER BY empno RANGE 10} is valid because {@code 10} is numeric); and
+     * whether a call to {@link org.apache.calcite.sql.fun.SqlStdOperatorTable#PERCENTILE_CONT
+     * PERCENTILE_CONT} is valid (for example, {@code PERCENTILE_CONT(0.25)} ORDER BY (hireDate)} is
+     * valid because {@code hireDate} values may be interpolated by adding values of type {@code
+     * INTERVAL_DAY_TIME}.
+     */
+    public List<SqlTypeFamily> allowableDifferenceTypes() {
+        switch (this) {
+            case NUMERIC:
+                return ImmutableList.of(NUMERIC);
+            case DATE:
+            case TIME:
+            case TIMESTAMP:
+                return ImmutableList.of(INTERVAL_DAY_TIME, INTERVAL_YEAR_MONTH);
+            default:
+                return ImmutableList.of();
+        }
+    }
+
+    /** Returns the collection of {@link SqlTypeName}s included in this family. */
+    public Collection<SqlTypeName> getTypeNames() {
+        switch (this) {
+            case CHARACTER:
+                return SqlTypeName.CHAR_TYPES;
+            case BINARY:
+                return SqlTypeName.BINARY_TYPES;
+            case NUMERIC:
+                return SqlTypeName.NUMERIC_TYPES;
+            case DECIMAL:
+                return ImmutableList.of(SqlTypeName.DECIMAL);
+            case DATE:
+                return ImmutableList.of(SqlTypeName.DATE);
+            case TIME:
+                return ImmutableList.of(SqlTypeName.TIME, SqlTypeName.TIME_WITH_LOCAL_TIME_ZONE);
+            case TIMESTAMP:
+                return ImmutableList.of(
+                        SqlTypeName.TIMESTAMP, SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE);
+            case BOOLEAN:
+                return SqlTypeName.BOOLEAN_TYPES;
+            case INTERVAL_YEAR_MONTH:
+                return SqlTypeName.YEAR_INTERVAL_TYPES;
+            case INTERVAL_DAY_TIME:
+                return SqlTypeName.DAY_INTERVAL_TYPES;
+            case STRING:
+                return SqlTypeName.STRING_TYPES;
+            case APPROXIMATE_NUMERIC:
+                return SqlTypeName.APPROX_TYPES;
+            case EXACT_NUMERIC:
+                return SqlTypeName.EXACT_TYPES;
+            case INTEGER:
+                return SqlTypeName.INT_TYPES;
+            case DATETIME:
+                return SqlTypeName.DATETIME_TYPES;
+            case DATETIME_INTERVAL:
+                return SqlTypeName.INTERVAL_TYPES;
+            case GEO:
+                return SqlTypeName.GEOMETRY_TYPES;
+            case MULTISET:
+                return ImmutableList.of(SqlTypeName.MULTISET);
+            case ARRAY:
+                return ImmutableList.of(SqlTypeName.ARRAY);
+            case MAP:
+                return ImmutableList.of(SqlTypeName.MAP);
+            case NULL:
+                return ImmutableList.of(SqlTypeName.NULL);
+            case ANY:
+                return SqlTypeName.ALL_TYPES;
+            case CURSOR:
+                return ImmutableList.of(SqlTypeName.CURSOR);
+            case COLUMN_LIST:
+                return ImmutableList.of(SqlTypeName.COLUMN_LIST);
+            case VARIANT:
+                return ImmutableList.of(SqlTypeName.VARIANT);
+            default:
+                throw new IllegalArgumentException();
+        }
+    }
+
+    /** Return the default {@link RelDataType} that belongs to this family. */
+    public @Nullable RelDataType getDefaultConcreteType(RelDataTypeFactory factory) {
+        switch (this) {
+            case CHARACTER:
+                return factory.createSqlType(SqlTypeName.VARCHAR);
+            case BINARY:
+                return factory.createSqlType(SqlTypeName.VARBINARY);
+            case NUMERIC:
+                return SqlTypeUtil.getMaxPrecisionScaleDecimal(factory);
+            case DATE:
+                return factory.createSqlType(SqlTypeName.DATE);
+            case TIME:
+                return factory.createSqlType(SqlTypeName.TIME);
+            case TIMESTAMP:
+                return factory.createSqlType(SqlTypeName.TIMESTAMP);
+            case BOOLEAN:
+                return factory.createSqlType(SqlTypeName.BOOLEAN);
+            case STRING:
+                return factory.createSqlType(SqlTypeName.VARCHAR);
+            case APPROXIMATE_NUMERIC:
+                return factory.createSqlType(SqlTypeName.DOUBLE);
+            case EXACT_NUMERIC:
+                return SqlTypeUtil.getMaxPrecisionScaleDecimal(factory);
+            case INTEGER:
+                return factory.createSqlType(SqlTypeName.BIGINT);
+            case DECIMAL:
+                return factory.createSqlType(SqlTypeName.DECIMAL);
+            case DATETIME:
+                return factory.createSqlType(SqlTypeName.TIMESTAMP);
+            case INTERVAL_DAY_TIME:
+                return factory.createSqlIntervalType(
+                        new SqlIntervalQualifier(TimeUnit.DAY, TimeUnit.SECOND, SqlParserPos.ZERO));
+            case INTERVAL_YEAR_MONTH:
+                return factory.createSqlIntervalType(
+                        new SqlIntervalQualifier(TimeUnit.YEAR, TimeUnit.MONTH, SqlParserPos.ZERO));
+            case GEO:
+                return factory.createSqlType(SqlTypeName.GEOMETRY);
+            case MULTISET:
+                return factory.createMultisetType(factory.createSqlType(SqlTypeName.ANY), -1);
+            case ARRAY:
+                return factory.createArrayType(factory.createSqlType(SqlTypeName.ANY), -1);
+            case MAP:
+                return factory.createMapType(
+                        factory.createSqlType(SqlTypeName.ANY),
+                        factory.createSqlType(SqlTypeName.ANY));
+            case NULL:
+                return factory.createSqlType(SqlTypeName.NULL);
+            case CURSOR:
+                return factory.createSqlType(SqlTypeName.CURSOR);
+            case COLUMN_LIST:
+                return factory.createSqlType(SqlTypeName.COLUMN_LIST);
+            default:
+                return null;
+        }
+    }
+
+    public boolean contains(RelDataType type) {
+        return SqlTypeUtil.isOfSameTypeName(getTypeNames(), type);
+    }
+}
diff --git a/flink-cdc-runtime/src/main/java/org/apache/calcite/sql/type/SqlTypeName.java b/flink-cdc-runtime/src/main/java/org/apache/calcite/sql/type/SqlTypeName.java
new file mode 100644
index 000000000..8ee6851d8
--- /dev/null
+++ b/flink-cdc-runtime/src/main/java/org/apache/calcite/sql/type/SqlTypeName.java
@@ -0,0 +1,1054 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.sql.type;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+import org.apache.calcite.avatica.util.TimeUnit;
+import org.apache.calcite.sql.SqlLiteral;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.DateString;
+import org.apache.calcite.util.TimeString;
+import org.apache.calcite.util.TimestampString;
+import org.apache.calcite.util.Util;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+import java.math.BigDecimal;
+import java.sql.Types;
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Enumeration of the type names which can be used to construct a SQL type. Rationale for this
+ * class's existence (instead of just using the standard java.sql.Type ordinals):
+ *
+ * <ul>
+ *   <li>{@link Types} does not include all SQL2003 data-types;
+ *   <li>SqlTypeName provides a type-safe enumeration;
+ *   <li>SqlTypeName provides a place to hang extra information such as whether the type carries
+ *       precision and scale.
+ * </ul>
+ *
+ * <p>This class was copied over from Calcite to support the variant type (CALCITE-4918). When
+ * upgrading to Calcite 1.39.0, please remove the entire class.
+ */
+public enum SqlTypeName {
+    BOOLEAN(PrecScale.NO_NO, false, Types.BOOLEAN, SqlTypeFamily.BOOLEAN),
+    TINYINT(PrecScale.NO_NO, false, Types.TINYINT, SqlTypeFamily.NUMERIC),
+    SMALLINT(PrecScale.NO_NO, false, Types.SMALLINT, SqlTypeFamily.NUMERIC),
+    INTEGER(PrecScale.NO_NO, false, Types.INTEGER, SqlTypeFamily.NUMERIC),
+    BIGINT(PrecScale.NO_NO, false, Types.BIGINT, SqlTypeFamily.NUMERIC),
+    DECIMAL(
+            PrecScale.NO_NO | PrecScale.YES_NO | PrecScale.YES_YES,
+            false,
+            Types.DECIMAL,
+            SqlTypeFamily.NUMERIC),
+    FLOAT(PrecScale.NO_NO, false, Types.FLOAT, SqlTypeFamily.NUMERIC),
+    REAL(PrecScale.NO_NO, false, Types.REAL, SqlTypeFamily.NUMERIC),
+    DOUBLE(PrecScale.NO_NO, false, Types.DOUBLE, SqlTypeFamily.NUMERIC),
+    DATE(PrecScale.NO_NO, false, Types.DATE, SqlTypeFamily.DATE),
+    TIME(PrecScale.NO_NO | PrecScale.YES_NO, false, Types.TIME, SqlTypeFamily.TIME),
+    TIME_WITH_LOCAL_TIME_ZONE(
+            PrecScale.NO_NO | PrecScale.YES_NO, false, Types.OTHER, SqlTypeFamily.TIME),
+    TIMESTAMP(PrecScale.NO_NO | PrecScale.YES_NO, false, Types.TIMESTAMP, SqlTypeFamily.TIMESTAMP),
+    TIMESTAMP_WITH_LOCAL_TIME_ZONE(
+            PrecScale.NO_NO | PrecScale.YES_NO, false, Types.TIMESTAMP, SqlTypeFamily.TIMESTAMP),
+    INTERVAL_YEAR(PrecScale.NO_NO, false, Types.OTHER, SqlTypeFamily.INTERVAL_YEAR_MONTH),
+    INTERVAL_YEAR_MONTH(PrecScale.NO_NO, false, Types.OTHER, SqlTypeFamily.INTERVAL_YEAR_MONTH),
+    INTERVAL_MONTH(PrecScale.NO_NO, false, Types.OTHER, SqlTypeFamily.INTERVAL_YEAR_MONTH),
+    INTERVAL_DAY(
+            PrecScale.NO_NO | PrecScale.YES_NO | PrecScale.YES_YES,
+            false,
+            Types.OTHER,
+            SqlTypeFamily.INTERVAL_DAY_TIME),
+    INTERVAL_DAY_HOUR(
+            PrecScale.NO_NO | PrecScale.YES_NO | PrecScale.YES_YES,
+            false,
+            Types.OTHER,
+            SqlTypeFamily.INTERVAL_DAY_TIME),
+    INTERVAL_DAY_MINUTE(
+            PrecScale.NO_NO | PrecScale.YES_NO | PrecScale.YES_YES,
+            false,
+            Types.OTHER,
+            SqlTypeFamily.INTERVAL_DAY_TIME),
+    INTERVAL_DAY_SECOND(
+            PrecScale.NO_NO | PrecScale.YES_NO | PrecScale.YES_YES,
+            false,
+            Types.OTHER,
+            SqlTypeFamily.INTERVAL_DAY_TIME),
+    INTERVAL_HOUR(
+            PrecScale.NO_NO | PrecScale.YES_NO | PrecScale.YES_YES,
+            false,
+            Types.OTHER,
+            SqlTypeFamily.INTERVAL_DAY_TIME),
+    INTERVAL_HOUR_MINUTE(
+            PrecScale.NO_NO | PrecScale.YES_NO | PrecScale.YES_YES,
+            false,
+            Types.OTHER,
+            SqlTypeFamily.INTERVAL_DAY_TIME),
+    INTERVAL_HOUR_SECOND(
+            PrecScale.NO_NO | PrecScale.YES_NO | PrecScale.YES_YES,
+            false,
+            Types.OTHER,
+            SqlTypeFamily.INTERVAL_DAY_TIME),
+    INTERVAL_MINUTE(
+            PrecScale.NO_NO | PrecScale.YES_NO | PrecScale.YES_YES,
+            false,
+            Types.OTHER,
+            SqlTypeFamily.INTERVAL_DAY_TIME),
+    INTERVAL_MINUTE_SECOND(
+            PrecScale.NO_NO | PrecScale.YES_NO | PrecScale.YES_YES,
+            false,
+            Types.OTHER,
+            SqlTypeFamily.INTERVAL_DAY_TIME),
+    INTERVAL_SECOND(
+            PrecScale.NO_NO | PrecScale.YES_NO | PrecScale.YES_YES,
+            false,
+            Types.OTHER,
+            SqlTypeFamily.INTERVAL_DAY_TIME),
+    CHAR(PrecScale.NO_NO | PrecScale.YES_NO, false, Types.CHAR, SqlTypeFamily.CHARACTER),
+    VARCHAR(PrecScale.NO_NO | PrecScale.YES_NO, false, Types.VARCHAR, SqlTypeFamily.CHARACTER),
+    BINARY(PrecScale.NO_NO | PrecScale.YES_NO, false, Types.BINARY, SqlTypeFamily.BINARY),
+    VARBINARY(PrecScale.NO_NO | PrecScale.YES_NO, false, Types.VARBINARY, SqlTypeFamily.BINARY),
+    NULL(PrecScale.NO_NO, true, Types.NULL, SqlTypeFamily.NULL),
+    UNKNOWN(PrecScale.NO_NO, true, Types.NULL, SqlTypeFamily.NULL),
+    ANY(
+            PrecScale.NO_NO | PrecScale.YES_NO | PrecScale.YES_YES,
+            true,
+            Types.JAVA_OBJECT,
+            SqlTypeFamily.ANY),
+    SYMBOL(PrecScale.NO_NO, true, Types.OTHER, null),
+    MULTISET(PrecScale.NO_NO, false, Types.ARRAY, SqlTypeFamily.MULTISET),
+    ARRAY(PrecScale.NO_NO, false, Types.ARRAY, SqlTypeFamily.ARRAY),
+    MAP(PrecScale.NO_NO, false, Types.OTHER, SqlTypeFamily.MAP),
+    DISTINCT(PrecScale.NO_NO, false, Types.DISTINCT, null),
+    STRUCTURED(PrecScale.NO_NO, false, Types.STRUCT, null),
+    ROW(PrecScale.NO_NO, false, Types.STRUCT, null),
+    OTHER(PrecScale.NO_NO, false, Types.OTHER, null),
+    CURSOR(PrecScale.NO_NO, false, ExtraSqlTypes.REF_CURSOR, SqlTypeFamily.CURSOR),
+    COLUMN_LIST(PrecScale.NO_NO, false, Types.OTHER + 2, SqlTypeFamily.COLUMN_LIST),
+    DYNAMIC_STAR(
+            PrecScale.NO_NO | PrecScale.YES_NO | PrecScale.YES_YES,
+            true,
+            Types.JAVA_OBJECT,
+            SqlTypeFamily.ANY),
+    /**
+     * Spatial type. Though not standard, it is common to several DBs, so we do not flag it
+     * 'special' (internal).
+     */
+    GEOMETRY(PrecScale.NO_NO, false, ExtraSqlTypes.GEOMETRY, SqlTypeFamily.GEO),
+    MEASURE(PrecScale.NO_NO, true, Types.OTHER, SqlTypeFamily.ANY),
+    SARG(PrecScale.NO_NO, true, Types.OTHER, SqlTypeFamily.ANY),
+    /**
+     * VARIANT data type, a dynamically-typed value that can have at runtime any of the other data
+     * types in this table.
+     */
+    VARIANT(PrecScale.NO_NO, false, Types.OTHER, SqlTypeFamily.VARIANT);
+
+    public static final int MAX_DATETIME_PRECISION = 3;
+
+    // Minimum and default interval precisions are  defined by SQL2003
+    // Maximum interval precisions are implementation dependent,
+    //  but must be at least the default value
+    public static final int DEFAULT_INTERVAL_START_PRECISION = 2;
+    public static final int DEFAULT_INTERVAL_FRACTIONAL_SECOND_PRECISION = 6;
+    public static final int MIN_INTERVAL_START_PRECISION = 1;
+    public static final int MIN_INTERVAL_FRACTIONAL_SECOND_PRECISION = 1;
+    public static final int MAX_INTERVAL_START_PRECISION = 10;
+    public static final int MAX_INTERVAL_FRACTIONAL_SECOND_PRECISION = 9;
+
+    // Cached map of enum values
+    private static final Map<String, SqlTypeName> VALUES_MAP =
+            Util.enumConstants(SqlTypeName.class);
+
+    // categorizations used by SqlTypeFamily definitions
+
+    // you probably want to use JDK 1.5 support for treating enumeration
+    // as collection instead; this is only here to support
+    // SqlTypeFamily.ANY
+    public static final List<SqlTypeName> ALL_TYPES =
+            ImmutableList.of(
+                    BOOLEAN,
+                    INTEGER,
+                    VARCHAR,
+                    DATE,
+                    TIME,
+                    TIMESTAMP,
+                    NULL,
+                    DECIMAL,
+                    ANY,
+                    CHAR,
+                    BINARY,
+                    VARBINARY,
+                    TINYINT,
+                    SMALLINT,
+                    BIGINT,
+                    REAL,
+                    DOUBLE,
+                    SYMBOL,
+                    INTERVAL_YEAR,
+                    INTERVAL_YEAR_MONTH,
+                    INTERVAL_MONTH,
+                    INTERVAL_DAY,
+                    INTERVAL_DAY_HOUR,
+                    INTERVAL_DAY_MINUTE,
+                    INTERVAL_DAY_SECOND,
+                    INTERVAL_HOUR,
+                    INTERVAL_HOUR_MINUTE,
+                    INTERVAL_HOUR_SECOND,
+                    INTERVAL_MINUTE,
+                    INTERVAL_MINUTE_SECOND,
+                    INTERVAL_SECOND,
+                    TIME_WITH_LOCAL_TIME_ZONE,
+                    TIMESTAMP_WITH_LOCAL_TIME_ZONE,
+                    FLOAT,
+                    MULTISET,
+                    DISTINCT,
+                    STRUCTURED,
+                    ROW,
+                    CURSOR,
+                    COLUMN_LIST,
+                    VARIANT);
+
+    public static final List<SqlTypeName> BOOLEAN_TYPES = ImmutableList.of(BOOLEAN);
+
+    public static final List<SqlTypeName> BINARY_TYPES = ImmutableList.of(BINARY, VARBINARY);
+
+    public static final List<SqlTypeName> INT_TYPES =
+            ImmutableList.of(TINYINT, SMALLINT, INTEGER, BIGINT);
+
+    public static final List<SqlTypeName> EXACT_TYPES =
+            combine(INT_TYPES, ImmutableList.of(DECIMAL));
+
+    public static final List<SqlTypeName> APPROX_TYPES = ImmutableList.of(FLOAT, REAL, DOUBLE);
+
+    public static final List<SqlTypeName> NUMERIC_TYPES = combine(EXACT_TYPES, APPROX_TYPES);
+
+    public static final List<SqlTypeName> FRACTIONAL_TYPES =
+            combine(APPROX_TYPES, ImmutableList.of(DECIMAL));
+
+    public static final List<SqlTypeName> CHAR_TYPES = ImmutableList.of(CHAR, VARCHAR);
+
+    public static final List<SqlTypeName> STRING_TYPES = combine(CHAR_TYPES, BINARY_TYPES);
+
+    public static final List<SqlTypeName> GEOMETRY_TYPES = ImmutableList.of(GEOMETRY);
+
+    public static final List<SqlTypeName> DATETIME_TYPES =
+            ImmutableList.of(
+                    DATE,
+                    TIME,
+                    TIME_WITH_LOCAL_TIME_ZONE,
+                    TIMESTAMP,
+                    TIMESTAMP_WITH_LOCAL_TIME_ZONE);
+
+    public static final Set<SqlTypeName> YEAR_INTERVAL_TYPES =
+            Sets.immutableEnumSet(
+                    SqlTypeName.INTERVAL_YEAR,
+                    SqlTypeName.INTERVAL_YEAR_MONTH,
+                    SqlTypeName.INTERVAL_MONTH);
+
+    public static final Set<SqlTypeName> DAY_INTERVAL_TYPES =
+            Sets.immutableEnumSet(
+                    SqlTypeName.INTERVAL_DAY,
+                    SqlTypeName.INTERVAL_DAY_HOUR,
+                    SqlTypeName.INTERVAL_DAY_MINUTE,
+                    SqlTypeName.INTERVAL_DAY_SECOND,
+                    SqlTypeName.INTERVAL_HOUR,
+                    SqlTypeName.INTERVAL_HOUR_MINUTE,
+                    SqlTypeName.INTERVAL_HOUR_SECOND,
+                    SqlTypeName.INTERVAL_MINUTE,
+                    SqlTypeName.INTERVAL_MINUTE_SECOND,
+                    SqlTypeName.INTERVAL_SECOND);
+
+    public static final Set<SqlTypeName> INTERVAL_TYPES =
+            Sets.immutableEnumSet(Iterables.concat(YEAR_INTERVAL_TYPES, DAY_INTERVAL_TYPES));
+
+    /** The possible types of a time frame argument to a function such as {@code TIMESTAMP_DIFF}. */
+    public static final Set<SqlTypeName> TIME_FRAME_TYPES =
+            Sets.immutableEnumSet(Iterables.concat(INTERVAL_TYPES, ImmutableList.of(SYMBOL)));
+
+    private static final Map<Integer, SqlTypeName> JDBC_TYPE_TO_NAME =
+            ImmutableMap.<Integer, SqlTypeName>builder()
+                    .put(Types.TINYINT, TINYINT)
+                    .put(Types.SMALLINT, SMALLINT)
+                    .put(Types.BIGINT, BIGINT)
+                    .put(Types.INTEGER, INTEGER)
+                    .put(Types.NUMERIC, DECIMAL) // REVIEW
+                    .put(Types.DECIMAL, DECIMAL)
+                    .put(Types.FLOAT, FLOAT)
+                    .put(Types.REAL, REAL)
+                    .put(Types.DOUBLE, DOUBLE)
+                    .put(Types.CHAR, CHAR)
+                    .put(Types.VARCHAR, VARCHAR)
+
+                    // TODO: provide real support for these eventually
+                    .put(ExtraSqlTypes.NCHAR, CHAR)
+                    .put(ExtraSqlTypes.NVARCHAR, VARCHAR)
+
+                    // TODO: additional types not yet supported. See ExtraSqlTypes.
+                    // .put(Types.LONGVARCHAR, Longvarchar)
+                    // .put(Types.CLOB, Clob)
+                    // .put(Types.LONGVARBINARY, Longvarbinary)
+                    // .put(Types.BLOB, Blob)
+                    // .put(Types.LONGNVARCHAR, Longnvarchar)
+                    // .put(Types.NCLOB, Nclob)
+                    // .put(Types.ROWID, Rowid)
+                    // .put(Types.SQLXML, Sqlxml)
+
+                    .put(Types.BINARY, BINARY)
+                    .put(Types.VARBINARY, VARBINARY)
+                    .put(Types.DATE, DATE)
+                    .put(Types.TIME, TIME)
+                    .put(Types.TIMESTAMP, TIMESTAMP)
+                    .put(Types.BIT, BOOLEAN)
+                    .put(Types.BOOLEAN, BOOLEAN)
+                    .put(Types.DISTINCT, DISTINCT)
+                    .put(Types.STRUCT, STRUCTURED)
+                    .put(Types.ARRAY, ARRAY)
+                    .build();
+
+    /** Bitwise-or of flags indicating allowable precision/scale combinations. */
+    private final int signatures;
+
+    /**
+     * Returns true if not of a "pure" standard sql type. "Impure" types are {@link #ANY}, {@link
+     * #NULL} and {@link #SYMBOL}.
+     */
+    private final boolean special;
+
+    private final int jdbcOrdinal;
+    private final @Nullable SqlTypeFamily family;
+
+    SqlTypeName(int signatures, boolean special, int jdbcType, @Nullable SqlTypeFamily family) {
+        this.signatures = signatures;
+        this.special = special;
+        this.jdbcOrdinal = jdbcType;
+        this.family = family;
+    }
+
+    /**
+     * Looks up a type name from its name.
+     *
+     * @return Type name, or null if not found
+     */
+    public static @Nullable SqlTypeName get(String name) {
+        if (false) {
+            // The following code works OK, but the spurious exceptions are
+            // annoying.
+            try {
+                return SqlTypeName.valueOf(name);
+            } catch (IllegalArgumentException e) {
+                return null;
+            }
+        }
+        return VALUES_MAP.get(name);
+    }
+
+    /**
+     * Returns the SqlTypeName value whose name or {@link #getSpaceName()} matches the given name,
+     * or throws {@link IllegalArgumentException}; never returns null.
+     */
+    public static SqlTypeName lookup(String tag) {
+        String tag2 = tag.replace(' ', '_');
+        return valueOf(tag2);
+    }
+
+    public boolean allowsNoPrecNoScale() {
+        return (signatures & PrecScale.NO_NO) != 0;
+    }
+
+    public boolean allowsPrecNoScale() {
+        return (signatures & PrecScale.YES_NO) != 0;
+    }
+
+    public boolean allowsPrec() {
+        return allowsPrecScale(true, true) || allowsPrecScale(true, false);
+    }
+
+    public boolean allowsScale() {
+        return allowsPrecScale(true, true);
+    }
+
+    /**
+     * Returns whether this type can be specified with a given combination of 
precision and scale.
+     * For example,
+     *
+     * <ul>
+     *   <li><code>Varchar.allowsPrecScale(true, false)</code> returns <code>
+     * true</code>, because the VARCHAR type allows a precision parameter, as in <code>VARCHAR(10)
+     *       </code>.
+     *   <li><code>Varchar.allowsPrecScale(true, true)</code> returns <code>
+     * false</code>, because the VARCHAR type does not allow a precision and a scale parameter, as in
+     *       <code>VARCHAR(10, 4)</code>.
+     *   <li><code>allowsPrecScale(false, true)</code> returns <code>false</code> for every type.
+     * </ul>
+     *
+     * @param precision Whether the precision/length field is part of the type specification
+     * @param scale Whether the scale field is part of the type specification
+     * @return Whether this combination of precision/scale is valid
+     */
+    public boolean allowsPrecScale(boolean precision, boolean scale) {
+        int mask =
+                precision
+                        ? (scale ? PrecScale.YES_YES : PrecScale.YES_NO)
+                        : (scale ? 0 : PrecScale.NO_NO);
+        return (signatures & mask) != 0;
+    }
+
+    public boolean isSpecial() {
+        return special;
+    }
+
+    /** Returns the ordinal from {@link Types} corresponding to this SqlTypeName. */
+    public int getJdbcOrdinal() {
+        return jdbcOrdinal;
+    }
+
+    private static List<SqlTypeName> combine(List<SqlTypeName> list0, List<SqlTypeName> list1) {
+        return ImmutableList.<SqlTypeName>builder().addAll(list0).addAll(list1).build();
+    }
+
+    /**
+     * Returns the default scale for this type if supported, otherwise -1 if scale is either
+     * unsupported or must be specified explicitly.
+     */
+    public int getDefaultScale() {
+        switch (this) {
+            case DECIMAL:
+                return 0;
+            case INTERVAL_YEAR:
+            case INTERVAL_YEAR_MONTH:
+            case INTERVAL_MONTH:
+            case INTERVAL_DAY:
+            case INTERVAL_DAY_HOUR:
+            case INTERVAL_DAY_MINUTE:
+            case INTERVAL_DAY_SECOND:
+            case INTERVAL_HOUR:
+            case INTERVAL_HOUR_MINUTE:
+            case INTERVAL_HOUR_SECOND:
+            case INTERVAL_MINUTE:
+            case INTERVAL_MINUTE_SECOND:
+            case INTERVAL_SECOND:
+                return DEFAULT_INTERVAL_FRACTIONAL_SECOND_PRECISION;
+            default:
+                return -1;
+        }
+    }
+
+    /**
+     * Gets the SqlTypeFamily containing this SqlTypeName.
+     *
+     * @return containing family, or null for none (SYMBOL, DISTINCT, STRUCTURED, ROW, OTHER)
+     */
+    public @Nullable SqlTypeFamily getFamily() {
+        return family;
+    }
+
+    /**
+     * Gets the SqlTypeName corresponding to a JDBC type.
+     *
+     * @param jdbcType the JDBC type of interest
+     * @return corresponding SqlTypeName, or null if the type is not known
+     */
+    public static @Nullable SqlTypeName getNameForJdbcType(int jdbcType) {
+        return JDBC_TYPE_TO_NAME.get(jdbcType);
+    }
+
+    /**
+     * Returns the limit of this datatype. For example,
+     *
+     * <table border="1">
+     * <caption>Datatype limits</caption>
+     * <tr>
+     * <th>Datatype</th>
+     * <th>sign</th>
+     * <th>limit</th>
+     * <th>beyond</th>
+     * <th>precision</th>
+     * <th>scale</th>
+     * <th>Returns</th>
+     * </tr>
+     * <tr>
+     * <td>Integer</td>
+     * <td>true</td>
+     * <td>true</td>
+     * <td>false</td>
+     * <td>-1</td>
+     * <td>-1</td>
+     * <td>2147483647 (2 ^ 31 -1 = MAXINT)</td>
+     * </tr>
+     * <tr>
+     * <td>Integer</td>
+     * <td>true</td>
+     * <td>true</td>
+     * <td>true</td>
+     * <td>-1</td>
+     * <td>-1</td>
+     * <td>2147483648 (2 ^ 31 = MAXINT + 1)</td>
+     * </tr>
+     * <tr>
+     * <td>Integer</td>
+     * <td>false</td>
+     * <td>true</td>
+     * <td>false</td>
+     * <td>-1</td>
+     * <td>-1</td>
+     * <td>-2147483648 (-2 ^ 31 = MININT)</td>
+     * </tr>
+     * <tr>
+     * <td>Boolean</td>
+     * <td>true</td>
+     * <td>true</td>
+     * <td>false</td>
+     * <td>-1</td>
+     * <td>-1</td>
+     * <td>TRUE</td>
+     * </tr>
+     * <tr>
+     * <td>Varchar</td>
+     * <td>true</td>
+     * <td>true</td>
+     * <td>false</td>
+     * <td>10</td>
+     * <td>-1</td>
+     * <td>'ZZZZZZZZZZ'</td>
+     * </tr>
+     * </table>
+     *
+     * @param sign If true, returns upper limit, otherwise lower limit
+     * @param limit If true, returns value at or near to overflow; otherwise value at or near to
+     *     underflow
+     * @param beyond If true, returns the value just beyond the limit, otherwise the value at the
+     *     limit
+     * @param precision Precision, or -1 if not applicable
+     * @param scale Scale, or -1 if not applicable
+     * @return Limit value
+     */
+    public @Nullable Object getLimit(
+            boolean sign, Limit limit, boolean beyond, int precision, int scale) {
+        assert allowsPrecScale(precision != -1, scale != -1) : this;
+        if (limit == Limit.ZERO) {
+            if (beyond) {
+                return null;
+            }
+            sign = true;
+        }
+        Calendar calendar;
+
+        switch (this) {
+            case BOOLEAN:
+                switch (limit) {
+                    case ZERO:
+                        return false;
+                    case UNDERFLOW:
+                        return null;
+                    case OVERFLOW:
+                        if (beyond || !sign) {
+                            return null;
+                        } else {
+                            return true;
+                        }
+                    default:
+                        throw Util.unexpected(limit);
+                }
+
+            case TINYINT:
+                return getNumericLimit(2, 8, sign, limit, beyond);
+
+            case SMALLINT:
+                return getNumericLimit(2, 16, sign, limit, beyond);
+
+            case INTEGER:
+                return getNumericLimit(2, 32, sign, limit, beyond);
+
+            case BIGINT:
+                return getNumericLimit(2, 64, sign, limit, beyond);
+
+            case DECIMAL:
+                BigDecimal decimal = getNumericLimit(10, precision, sign, limit, beyond);
+                if (decimal == null) {
+                    return null;
+                }
+
+                // Decimal values must fit into 64 bits. So, the maximum value of
+                // a DECIMAL(19, 0) is 2^63 - 1, not 10^19 - 1.
+                switch (limit) {
+                    case OVERFLOW:
+                        final BigDecimal other =
+                                (BigDecimal) BIGINT.getLimit(sign, limit, beyond, -1, -1);
+                        if (other != null && decimal.compareTo(other) == (sign ? 1 : -1)) {
+                            decimal = other;
+                        }
+                        break;
+                    default:
+                        break;
+                }
+
+                // Apply scale.
+                if (scale == 0) {
+                    // do nothing
+                } else if (scale > 0) {
+                    decimal = decimal.divide(BigDecimal.TEN.pow(scale));
+                } else {
+                    decimal = decimal.multiply(BigDecimal.TEN.pow(-scale));
+                }
+                return decimal;
+
+            case CHAR:
+            case VARCHAR:
+                if (!sign) {
+                    return null; // this type does not have negative values
+                }
+                StringBuilder buf = new StringBuilder();
+                switch (limit) {
+                    case ZERO:
+                        break;
+                    case UNDERFLOW:
+                        if (beyond) {
+                            // There is no value between the empty string and the
+                            // smallest non-empty string.
+                            return null;
+                        }
+                        buf.append("a");
+                        break;
+                    case OVERFLOW:
+                        for (int i = 0; i < precision; ++i) {
+                            buf.append("Z");
+                        }
+                        if (beyond) {
+                            buf.append("Z");
+                        }
+                        break;
+                    default:
+                        break;
+                }
+                return buf.toString();
+
+            case BINARY:
+            case VARBINARY:
+                if (!sign) {
+                    return null; // this type does not have negative values
+                }
+                byte[] bytes;
+                switch (limit) {
+                    case ZERO:
+                        bytes = new byte[0];
+                        break;
+                    case UNDERFLOW:
+                        if (beyond) {
+                            // There is no value between the empty string and the
+                            // smallest value.
+                            return null;
+                        }
+                        bytes = new byte[] {0x00};
+                        break;
+                    case OVERFLOW:
+                        bytes = new byte[precision + (beyond ? 1 : 0)];
+                        Arrays.fill(bytes, (byte) 0xff);
+                        break;
+                    default:
+                        throw Util.unexpected(limit);
+                }
+                return bytes;
+
+            case DATE:
+                calendar = Util.calendar();
+                switch (limit) {
+                    case ZERO:
+                        // The epoch.
+                        calendar.set(Calendar.YEAR, 1970);
+                        calendar.set(Calendar.MONTH, 0);
+                        calendar.set(Calendar.DAY_OF_MONTH, 1);
+                        break;
+                    case UNDERFLOW:
+                        return null;
+                    case OVERFLOW:
+                        if (beyond) {
+                            // It is impossible to represent an invalid year as a date
+                            // literal. SQL dates are represented as 'yyyy-mm-dd', and
+                            // 1 <= yyyy <= 9999 is valid. There is no year 0: the year
+                            // before 1AD is 1BC, so SimpleDateFormat renders the day
+                            // before 0001-01-01 (AD) as 0001-12-31 (BC), which looks
+                            // like a valid date.
+                            return null;
+                        }
+
+                        // "SQL:2003 6.1 <data type> Access Rules 6" says that year is
+                        // between 1 and 9999, and days/months are the valid Gregorian
+                        // calendar values for these years.
+                        if (sign) {
+                            calendar.set(Calendar.YEAR, 9999);
+                            calendar.set(Calendar.MONTH, 11);
+                            calendar.set(Calendar.DAY_OF_MONTH, 31);
+                        } else {
+                            calendar.set(Calendar.YEAR, 1);
+                            calendar.set(Calendar.MONTH, 0);
+                            calendar.set(Calendar.DAY_OF_MONTH, 1);
+                        }
+                        break;
+                    default:
+                        break;
+                }
+                calendar.set(Calendar.HOUR_OF_DAY, 0);
+                calendar.set(Calendar.MINUTE, 0);
+                calendar.set(Calendar.SECOND, 0);
+                return calendar;
+
+            case TIME:
+                if (!sign) {
+                    return null; // this type does not have negative values
+                }
+                if (beyond) {
+                    return null; // invalid values are impossible to represent
+                }
+                calendar = Util.calendar();
+                switch (limit) {
+                    case ZERO:
+
+                        // The epoch.
+                        calendar.set(Calendar.HOUR_OF_DAY, 0);
+                        calendar.set(Calendar.MINUTE, 0);
+                        calendar.set(Calendar.SECOND, 0);
+                        calendar.set(Calendar.MILLISECOND, 0);
+                        break;
+                    case UNDERFLOW:
+                        return null;
+                    case OVERFLOW:
+                        calendar.set(Calendar.HOUR_OF_DAY, 23);
+                        calendar.set(Calendar.MINUTE, 59);
+                        calendar.set(Calendar.SECOND, 59);
+                        int millis =
+                                (precision >= 3)
+                                        ? 999
+                                        : ((precision == 2) ? 990 : ((precision == 1) ? 900 : 0));
+                        calendar.set(Calendar.MILLISECOND, millis);
+                        break;
+                    default:
+                        break;
+                }
+                return calendar;
+
+            case TIMESTAMP:
+                calendar = Util.calendar();
+                switch (limit) {
+                    case ZERO:
+                        // The epoch.
+                        calendar.set(Calendar.YEAR, 1970);
+                        calendar.set(Calendar.MONTH, 0);
+                        calendar.set(Calendar.DAY_OF_MONTH, 1);
+                        calendar.set(Calendar.HOUR_OF_DAY, 0);
+                        calendar.set(Calendar.MINUTE, 0);
+                        calendar.set(Calendar.SECOND, 0);
+                        calendar.set(Calendar.MILLISECOND, 0);
+                        break;
+                    case UNDERFLOW:
+                        return null;
+                    case OVERFLOW:
+                        if (beyond) {
+                            // It is impossible to represent an invalid year as a date
+                            // literal. SQL dates are represented as 'yyyy-mm-dd', and
+                            // 1 <= yyyy <= 9999 is valid. There is no year 0: the year
+                            // before 1AD is 1BC, so SimpleDateFormat renders the day
+                            // before 0001-01-01 (AD) as 0001-12-31 (BC), which looks
+                            // like a valid date.
+                            return null;
+                        }
+
+                        // "SQL:2003 6.1 <data type> Access Rules 6" says that year is
+                        // between 1 and 9999, and days/months are the valid Gregorian
+                        // calendar values for these years.
+                        if (sign) {
+                            calendar.set(Calendar.YEAR, 9999);
+                            calendar.set(Calendar.MONTH, 11);
+                            calendar.set(Calendar.DAY_OF_MONTH, 31);
+                            calendar.set(Calendar.HOUR_OF_DAY, 23);
+                            calendar.set(Calendar.MINUTE, 59);
+                            calendar.set(Calendar.SECOND, 59);
+                            int millis =
+                                    (precision >= 3)
+                                            ? 999
+                                            : ((precision == 2)
+                                                    ? 990
+                                                    : ((precision == 1) ? 900 : 0));
+                            calendar.set(Calendar.MILLISECOND, millis);
+                        } else {
+                            calendar.set(Calendar.YEAR, 1);
+                            calendar.set(Calendar.MONTH, 0);
+                            calendar.set(Calendar.DAY_OF_MONTH, 1);
+                            calendar.set(Calendar.HOUR_OF_DAY, 0);
+                            calendar.set(Calendar.MINUTE, 0);
+                            calendar.set(Calendar.SECOND, 0);
+                            calendar.set(Calendar.MILLISECOND, 0);
+                        }
+                        break;
+                    default:
+                        break;
+                }
+                return calendar;
+
+            default:
+                throw Util.unexpected(this);
+        }
+    }
+
+    /**
+     * Returns the minimum precision (or length) allowed for this type, or -1 if precision/length
+     * are not applicable for this type.
+     *
+     * @return Minimum allowed precision
+     */
+    public int getMinPrecision() {
+        switch (this) {
+            case DECIMAL:
+            case VARCHAR:
+            case CHAR:
+            case VARBINARY:
+            case BINARY:
+            case TIME:
+            case TIME_WITH_LOCAL_TIME_ZONE:
+            case TIMESTAMP:
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                return 1;
+            case INTERVAL_YEAR:
+            case INTERVAL_YEAR_MONTH:
+            case INTERVAL_MONTH:
+            case INTERVAL_DAY:
+            case INTERVAL_DAY_HOUR:
+            case INTERVAL_DAY_MINUTE:
+            case INTERVAL_DAY_SECOND:
+            case INTERVAL_HOUR:
+            case INTERVAL_HOUR_MINUTE:
+            case INTERVAL_HOUR_SECOND:
+            case INTERVAL_MINUTE:
+            case INTERVAL_MINUTE_SECOND:
+            case INTERVAL_SECOND:
+                return MIN_INTERVAL_START_PRECISION;
+            default:
+                return -1;
+        }
+    }
+
+    /**
+     * Returns the minimum scale (or fractional second precision in the case of intervals) allowed
+     * for this type, or -1 if precision/length are not applicable for this type.
+     *
+     * @return Minimum allowed scale
+     */
+    public int getMinScale() {
+        switch (this) {
+            // TODO: Minimum numeric scale for decimal
+            case INTERVAL_YEAR:
+            case INTERVAL_YEAR_MONTH:
+            case INTERVAL_MONTH:
+            case INTERVAL_DAY:
+            case INTERVAL_DAY_HOUR:
+            case INTERVAL_DAY_MINUTE:
+            case INTERVAL_DAY_SECOND:
+            case INTERVAL_HOUR:
+            case INTERVAL_HOUR_MINUTE:
+            case INTERVAL_HOUR_SECOND:
+            case INTERVAL_MINUTE:
+            case INTERVAL_MINUTE_SECOND:
+            case INTERVAL_SECOND:
+                return MIN_INTERVAL_FRACTIONAL_SECOND_PRECISION;
+            default:
+                return -1;
+        }
+    }
+
+    /**
+     * Returns {@code HOUR} for {@code HOUR TO SECOND} and {@code HOUR}, {@code SECOND} for {@code
+     * SECOND}.
+     */
+    public TimeUnit getStartUnit() {
+        switch (this) {
+            case INTERVAL_YEAR:
+            case INTERVAL_YEAR_MONTH:
+                return TimeUnit.YEAR;
+            case INTERVAL_MONTH:
+                return TimeUnit.MONTH;
+            case INTERVAL_DAY:
+            case INTERVAL_DAY_HOUR:
+            case INTERVAL_DAY_MINUTE:
+            case INTERVAL_DAY_SECOND:
+                return TimeUnit.DAY;
+            case INTERVAL_HOUR:
+            case INTERVAL_HOUR_MINUTE:
+            case INTERVAL_HOUR_SECOND:
+                return TimeUnit.HOUR;
+            case INTERVAL_MINUTE:
+            case INTERVAL_MINUTE_SECOND:
+                return TimeUnit.MINUTE;
+            case INTERVAL_SECOND:
+                return TimeUnit.SECOND;
+            default:
+                throw new AssertionError(this);
+        }
+    }
+
+    /** Returns {@code SECOND} for both {@code HOUR TO SECOND} and {@code SECOND}. */
+    public TimeUnit getEndUnit() {
+        switch (this) {
+            case INTERVAL_YEAR:
+                return TimeUnit.YEAR;
+            case INTERVAL_YEAR_MONTH:
+            case INTERVAL_MONTH:
+                return TimeUnit.MONTH;
+            case INTERVAL_DAY:
+                return TimeUnit.DAY;
+            case INTERVAL_DAY_HOUR:
+            case INTERVAL_HOUR:
+                return TimeUnit.HOUR;
+            case INTERVAL_DAY_MINUTE:
+            case INTERVAL_HOUR_MINUTE:
+            case INTERVAL_MINUTE:
+                return TimeUnit.MINUTE;
+            case INTERVAL_DAY_SECOND:
+            case INTERVAL_HOUR_SECOND:
+            case INTERVAL_MINUTE_SECOND:
+            case INTERVAL_SECOND:
+                return TimeUnit.SECOND;
+            default:
+                throw new AssertionError(this);
+        }
+    }
+
+    public boolean isYearMonth() {
+        switch (this) {
+            case INTERVAL_YEAR:
+            case INTERVAL_YEAR_MONTH:
+            case INTERVAL_MONTH:
+                return true;
+            default:
+                return false;
+        }
+    }
+
+    /** Limit. */
+    public enum Limit {
+        ZERO,
+        UNDERFLOW,
+        OVERFLOW
+    }
+
+    private static @Nullable BigDecimal getNumericLimit(
+            int radix, int exponent, boolean sign, Limit limit, boolean beyond) {
+        switch (limit) {
+            case OVERFLOW:
+
+                // 2-based schemes run from -2^(N-1) to 2^(N-1)-1 e.g. -128 to +127
+                // 10-based schemas run from -(10^N-1) to 10^N-1 e.g. -99 to +99
+                final BigDecimal bigRadix = BigDecimal.valueOf(radix);
+                if (radix == 2) {
+                    --exponent;
+                }
+                BigDecimal decimal = bigRadix.pow(exponent);
+                if (sign || (radix != 2)) {
+                    decimal = decimal.subtract(BigDecimal.ONE);
+                }
+                if (beyond) {
+                    decimal = decimal.add(BigDecimal.ONE);
+                }
+                if (!sign) {
+                    decimal = decimal.negate();
+                }
+                return decimal;
+            case UNDERFLOW:
+                return beyond ? null : (sign ? BigDecimal.ONE : BigDecimal.ONE.negate());
+            case ZERO:
+                return BigDecimal.ZERO;
+            default:
+                throw Util.unexpected(limit);
+        }
+    }
+
+    public SqlLiteral createLiteral(Object o, SqlParserPos pos) {
+        switch (this) {
+            case BOOLEAN:
+                return SqlLiteral.createBoolean((Boolean) o, pos);
+            case TINYINT:
+            case SMALLINT:
+            case INTEGER:
+            case BIGINT:
+            case DECIMAL:
+                return SqlLiteral.createExactNumeric(o.toString(), pos);
+            case VARCHAR:
+            case CHAR:
+                return SqlLiteral.createCharString((String) o, pos);
+            case VARBINARY:
+            case BINARY:
+                return SqlLiteral.createBinaryString((byte[]) o, pos);
+            case DATE:
+                return SqlLiteral.createDate(
+                        o instanceof Calendar
+                                ? DateString.fromCalendarFields((Calendar) o)
+                                : (DateString) o,
+                        pos);
+            case TIME:
+                return SqlLiteral.createTime(
+                        o instanceof Calendar
+                                ? TimeString.fromCalendarFields((Calendar) o)
+                                : (TimeString) o,
+                        0 /* todo */,
+                        pos);
+            case TIMESTAMP:
+                return SqlLiteral.createTimestamp(
+                        o instanceof Calendar
+                                ? TimestampString.fromCalendarFields((Calendar) o)
+                                : (TimestampString) o,
+                        0,
+                        pos);
+            default:
+                throw Util.unexpected(this);
+        }
+    }
+
+    /** Returns the name of this type. */
+    public String getName() {
+        return name();
+    }
+
+    /**
+     * Returns the name of this type, with underscores converted to spaces, for example "TIMESTAMP
+     * WITH LOCAL TIME ZONE", "DATE".
+     */
+    public String getSpaceName() {
+        return name().replace('_', ' ');
+    }
+
+    /**
+     * Flags indicating precision/scale combinations.
+     *
+     * <p>Note: for intervals:
+     *
+     * <ul>
+     *   <li>precision = start (leading field) precision
+     *   <li>scale = fractional second precision
+     * </ul>
+     */
+    private interface PrecScale {
+        int NO_NO = 1;
+        int YES_NO = 2;
+        int YES_YES = 4;
+    }
+}
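
The OVERFLOW branch of getNumericLimit() above encodes the familiar numeric bounds: for radix 2 the range is -2^(N-1) to 2^(N-1)-1, for radix 10 it is -(10^N-1) to 10^N-1. The following is a minimal standalone sketch of that arithmetic for illustration only; it is not part of the patch and omits the UNDERFLOW/ZERO and "beyond" handling.

    import java.math.BigDecimal;

    final class NumericLimitSketch {
        // Mirrors the OVERFLOW arithmetic of SqlTypeName#getNumericLimit above.
        static BigDecimal overflowLimit(int radix, int exponent, boolean positive) {
            BigDecimal bigRadix = BigDecimal.valueOf(radix);
            if (radix == 2) {
                --exponent; // two's complement: one bit is reserved for the sign
            }
            BigDecimal limit = bigRadix.pow(exponent);
            if (positive || radix != 2) {
                limit = limit.subtract(BigDecimal.ONE); // 2^(N-1)-1, or 10^N-1
            }
            return positive ? limit : limit.negate();
        }
        // overflowLimit(2, 8, true)  ->  127    overflowLimit(2, 8, false)  -> -128
        // overflowLimit(10, 2, true) ->   99    overflowLimit(10, 2, false) ->  -99
    }
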
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java
index eb9f593c3..faaa70028 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java
@@ -23,6 +23,8 @@ import org.apache.flink.cdc.common.data.LocalZonedTimestampData;
 import org.apache.flink.cdc.common.data.TimeData;
 import org.apache.flink.cdc.common.data.TimestampData;
 import org.apache.flink.cdc.common.data.ZonedTimestampData;
+import org.apache.flink.cdc.common.types.variant.BinaryVariantInternalBuilder;
+import org.apache.flink.cdc.common.types.variant.Variant;
 import org.apache.flink.cdc.common.utils.DateTimeUtils;
 
 import org.slf4j.Logger;
@@ -1096,4 +1098,37 @@ public class SystemFunctionUtils {
         }
         return universalCompares(lhs, rhs) <= 0;
     }
+
+    public static Variant tryParseJson(String jsonStr) {
+        return tryParseJson(jsonStr, false);
+    }
+
+    public static Variant tryParseJson(String jsonStr, boolean allowDuplicateKeys) {
+        if (jsonStr == null || jsonStr.isEmpty()) {
+            return null;
+        }
+
+        try {
+            return BinaryVariantInternalBuilder.parseJson(jsonStr, allowDuplicateKeys);
+        } catch (Throwable e) {
+            return null;
+        }
+    }
+
+    public static Variant parseJson(String jsonStr) {
+        return parseJson(jsonStr, false);
+    }
+
+    public static Variant parseJson(String jsonStr, boolean allowDuplicateKeys) {
+        if (jsonStr == null || jsonStr.isEmpty()) {
+            return null;
+        }
+
+        try {
+            return BinaryVariantInternalBuilder.parseJson(jsonStr, allowDuplicateKeys);
+        } catch (Throwable e) {
+            throw new IllegalArgumentException(
+                    String.format("Failed to parse json string: %s", jsonStr), e);
+        }
+    }
 }
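
Note the contract split between the two entry points added above: parseJson raises an IllegalArgumentException on malformed input, while tryParseJson swallows the failure and returns null; both return null for a null or empty string. A minimal usage sketch, using only the methods from this hunk:

    String good = "{\"city\":\"Beijing\"}";
    String bad = "not-json";

    Variant v1 = SystemFunctionUtils.parseJson(good);    // parsed Variant
    Variant v2 = SystemFunctionUtils.tryParseJson(bad);  // null, parse failure is swallowed
    SystemFunctionUtils.parseJson(bad);                  // throws IllegalArgumentException
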
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java
index 74dceba59..4017ba059 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java
@@ -47,6 +47,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 
+import static org.apache.flink.cdc.runtime.parser.metadata.TransformSqlOperatorTable.PARSE_JSON;
+import static org.apache.flink.cdc.runtime.parser.metadata.TransformSqlOperatorTable.TRY_PARSE_JSON;
+
 /**
  * Use Janino compiler to compiler the statement of flink cdc pipeline transform into the executable
  * code of Janino. For example, compiler 'string1 || string2' into 'concat(string1, string2)'. The
@@ -446,6 +449,20 @@ public class JaninoCompiler {
             return new Java.MethodInvocation(
                     Location.NOWHERE, null, StringUtils.convertToCamelCase("CONCAT"), atoms);
         }
+        if (sqlBasicCall.getOperator().getName().equalsIgnoreCase(PARSE_JSON.getName())) {
+            return new Java.MethodInvocation(
+                    Location.NOWHERE,
+                    null,
+                    StringUtils.convertToCamelCase(PARSE_JSON.getName()),
+                    atoms);
+        }
+        if (sqlBasicCall.getOperator().getName().equalsIgnoreCase(TRY_PARSE_JSON.getName())) {
+            return new Java.MethodInvocation(
+                    Location.NOWHERE,
+                    null,
+                    StringUtils.convertToCamelCase(TRY_PARSE_JSON.getName()),
+                    atoms);
+        }
         throw new ParseException("Unrecognized expression: " + sqlBasicCall.toString());
     }
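
The two new branches simply forward PARSE_JSON and TRY_PARSE_JSON calls to the camel-cased runtime helpers (parseJson / tryParseJson). A sketch of the resulting evaluation path follows; it assumes compileExpression returns Janino's ExpressionEvaluator and that java.util.Collections is available, and it mirrors the test changes further below.

    ExpressionEvaluator evaluator =
            JaninoCompiler.compileExpression(
                    JaninoCompiler.loadSystemFunction("tryParseJson(message)"),
                    Collections.singletonList("message"),
                    Collections.singletonList(String.class),
                    Variant.class);
    // evaluate() may throw InvocationTargetException; handling is omitted in this fragment.
    Object parsed = evaluator.evaluate(new Object[] {"{\"id\":1}"}); // a Variant
    Object broken = evaluator.evaluate(new Object[] {"oops"});       // null (tryParseJson swallows the error)
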
 
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java
index bfeba7ab3..106b50524 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java
@@ -383,4 +383,30 @@ public class TransformSqlOperatorTable extends ReflectiveSqlOperatorTable {
                     OperandTypes.family(
                             SqlTypeFamily.STRING, SqlTypeFamily.STRING, SqlTypeFamily.STRING),
                     SqlFunctionCategory.USER_DEFINED_FUNCTION);
+
+    // --------------------------------------------------------------------------------------------
+    // Variant functions
+    // --------------------------------------------------------------------------------------------
+
+    public static final SqlFunction PARSE_JSON =
+            new SqlFunction(
+                    "PARSE_JSON",
+                    SqlKind.OTHER_FUNCTION,
+                    ReturnTypes.explicit(SqlTypeName.VARIANT),
+                    null,
+                    OperandTypes.or(
+                            OperandTypes.family(SqlTypeFamily.STRING),
+                            OperandTypes.family(SqlTypeFamily.STRING, SqlTypeFamily.BOOLEAN)),
+                    SqlFunctionCategory.USER_DEFINED_FUNCTION);
+
+    public static final SqlFunction TRY_PARSE_JSON =
+            new SqlFunction(
+                    "TRY_PARSE_JSON",
+                    SqlKind.OTHER_FUNCTION,
+                    ReturnTypes.explicit(SqlTypeName.VARIANT),
+                    null,
+                    OperandTypes.or(
+                            OperandTypes.family(SqlTypeFamily.STRING),
+                            OperandTypes.family(SqlTypeFamily.STRING, SqlTypeFamily.BOOLEAN)),
+                    SqlFunctionCategory.USER_DEFINED_FUNCTION);
 }
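
Both operators are registered with the operand signatures (STRING) and (STRING, BOOLEAN) and an explicit VARIANT return type; the optional boolean corresponds to the allowDuplicateKeys flag of the runtime helpers. Illustrative call shapes only; the flag's semantics are inferred from its name rather than verified here.

    Variant defaultParse = SystemFunctionUtils.parseJson("{\"a\":1}");        // allowDuplicateKeys defaults to false
    Variant lenientParse = SystemFunctionUtils.parseJson("{\"a\":1}", true);  // explicit allowDuplicateKeys
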
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/typeutils/DataTypeConverter.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/typeutils/DataTypeConverter.java
index 8175a79ad..6121dbed6 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/typeutils/DataTypeConverter.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/typeutils/DataTypeConverter.java
@@ -51,6 +51,8 @@ import org.apache.flink.cdc.common.types.TinyIntType;
 import org.apache.flink.cdc.common.types.VarBinaryType;
 import org.apache.flink.cdc.common.types.VarCharType;
 import org.apache.flink.cdc.common.types.ZonedTimestampType;
+import org.apache.flink.cdc.common.types.variant.BinaryVariantInternalBuilder;
+import org.apache.flink.cdc.common.types.variant.Variant;
 
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
@@ -123,6 +125,8 @@ public class DataTypeConverter {
                 return ArrayData.class;
             case MAP:
                 return MapData.class;
+            case VARIANT:
+                return Variant.class;
             default:
                 throw new UnsupportedOperationException("Unsupported type: " + dataType);
         }
@@ -407,6 +411,8 @@ public class DataTypeConverter {
                 return DataTypes.MAP(
                         convertCalciteRelDataTypeToDataType(keyType),
                         convertCalciteRelDataTypeToDataType(valueType));
+            case VARIANT:
+                return DataTypes.VARIANT();
             case ROW:
             default:
                 throw new UnsupportedOperationException(
@@ -457,6 +463,8 @@ public class DataTypeConverter {
                 return convertToArray(value, (ArrayType) dataType);
             case MAP:
                 return convertToMap(value, (MapType) dataType);
+            case VARIANT:
+                return convertToVariant(value);
             default:
                 throw new UnsupportedOperationException("Unsupported type: " + dataType);
         }
@@ -699,6 +707,21 @@ public class DataTypeConverter {
         throw new IllegalArgumentException("Unable to convert to MapData: " + 
obj);
     }
 
+    private static Object convertToVariant(Object obj) {
+        if (obj instanceof Variant) {
+            return obj;
+        }
+        if (obj instanceof String) {
+            try {
+                return BinaryVariantInternalBuilder.parseJson((String) obj, false);
+            } catch (Throwable e) {
+                throw new IllegalArgumentException(
+                        String.format("Failed to parse json string: %s", obj), e);
+            }
+        }
+        throw new IllegalArgumentException("Unable to convert to Variant: " + obj);
+    }
+
     private static Object convertToMapOriginal(Object obj, MapType mapType) {
         if (obj instanceof MapData) {
             MapData mapData = (MapData) obj;
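
With the VARIANT branches in place, a String value bound for a VARIANT column is parsed with the same builder used by the functions above, an existing Variant instance passes through unchanged, and any other type is rejected. A minimal sketch using only APIs visible in this patch; parseJson declares a checked IOException (see the test change below), and error handling is omitted here.

    String raw = "{\"street\":\"MainSt\",\"city\":\"Beijing\"}";
    Variant asVariant = BinaryVariantInternalBuilder.parseJson(raw, false); // what convertToVariant produces for a String
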
diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/JaninoCompilerTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/JaninoCompilerTest.java
index d1ec296db..c688569b3 100644
--- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/JaninoCompilerTest.java
+++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/JaninoCompilerTest.java
@@ -18,6 +18,8 @@
 package org.apache.flink.cdc.runtime.parser;
 
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.cdc.common.types.variant.BinaryVariantInternalBuilder;
+import org.apache.flink.cdc.common.types.variant.Variant;
 
 import org.assertj.core.api.Assertions;
 import org.codehaus.commons.compiler.CompileException;
@@ -113,7 +115,7 @@ class JaninoCompilerTest {
     }
 
     @Test
-    void testBuildInFunction() throws InvocationTargetException {
+    void testBuildInFunction() throws InvocationTargetException, IOException {
         String expression = "ceil(2.4)";
         List<String> columnNames = new ArrayList<>();
         List<Class<?>> paramTypes = new ArrayList<>();
@@ -126,6 +128,42 @@ class JaninoCompilerTest {
                         Double.class);
         Object evaluate = expressionEvaluator.evaluate(params.toArray());
         Assertions.assertThat(evaluate).isEqualTo(3.0);
+
+        String jsonStr =
+                "{\"name\":\"张三\",\"age\":30,\"is_active\":true,\"email\":\"[email protected]\",\"hobbies\":[\"reading\",\"coding\",\"traveling\"],\"address\":{\"street\":\"MainSt\",\"city\":\"Beijing\",\"zip\":\"100000\"}}";
+        expressionEvaluator =
+                JaninoCompiler.compileExpression(
+                        JaninoCompiler.loadSystemFunction("parseJson(testJsonStr)"),
+                        List.of("testJsonStr"),
+                        List.of(String.class),
+                        Variant.class);
+        evaluate = expressionEvaluator.evaluate(new Object[] {jsonStr});
+        Assertions.assertThat(evaluate)
+                .isEqualTo(BinaryVariantInternalBuilder.parseJson(jsonStr, false));
+
+        final String invalidJsonStr = "invalidJson";
+        Assertions.assertThatThrownBy(
+                        () -> {
+                            JaninoCompiler.compileExpression(
+                                            JaninoCompiler.loadSystemFunction(
+                                                    "parseJson(testJsonStr)"),
+                                            List.of("testJsonStr"),
+                                            List.of(String.class),
+                                            Variant.class)
+                                    .evaluate(new Object[] {invalidJsonStr});
+                        })
+                .cause()
+                .isExactlyInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("Failed to parse json string");
+
+        expressionEvaluator =
+                JaninoCompiler.compileExpression(
+                        JaninoCompiler.loadSystemFunction("tryParseJson(testJsonStr)"),
+                        List.of("testJsonStr"),
+                        List.of(String.class),
+                        Variant.class);
+        evaluate = expressionEvaluator.evaluate(new Object[] {invalidJsonStr});
+        Assertions.assertThat(evaluate).isEqualTo(null);
     }
 
     @Test
diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java
index 6aa6a626b..d2b8ed120 100644
--- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java
+++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java
@@ -388,6 +388,8 @@ class TransformParserTest {
                 "cast(CURRENT_TIMESTAMP as TIMESTAMP)",
                 "castToTimestamp(currentTimestamp(__epoch_time__), 
__time_zone__)");
         testFilterExpression("cast(dt as TIMESTAMP)", "castToTimestamp(dt, 
__time_zone__)");
+        testFilterExpression("parse_json(jsonStr)", "parseJson(jsonStr)");
+        testFilterExpression("try_parse_json(jsonStr)", 
"tryParseJson(jsonStr)");
     }
 
     @Test
