This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new a54c2e7  [FLINK-25282][table-planner][table-runtime] Move runtime code 
from table-planner to table-runtime
a54c2e7 is described below

commit a54c2e75eb8f214552643adbdc2a5ce2abc2506c
Author: slinkydeveloper <francescogu...@gmail.com>
AuthorDate: Thu Dec 9 12:30:18 2021 +0100

    [FLINK-25282][table-planner][table-runtime] Move runtime code from 
table-planner to table-runtime
    
    - Remove the dependency on SqlFunctions from Calcite
    - Move DefaultWatermarkGeneratorSupplier to runtime and rename it to 
GeneratedWatermarkGeneratorSupplier
    - Remove the dependency on BuiltInMethod from Calcite for floor, ceil and abs
    - Copy the JSON functions from Calcite into SqlJsonUtils. Jackson and 
jsonpath are now shipped by runtime.
    - Move various Flink functions
    
    This closes #18108.
---
 .../apache/flink/table/functions/SqlLikeUtils.java |  24 +-
 .../apache/flink/table/utils/DateTimeUtils.java    |  89 ++++
 flink-table/flink-table-planner/pom.xml            |   8 +-
 .../abilities/source/WatermarkPushDownSpec.java    |  92 +---
 .../nodes/exec/stream/StreamExecIntervalJoin.java  |  75 +--
 .../stream/StreamExecLegacyTableSourceScan.java    |  82 +---
 .../src/main/resources/META-INF/NOTICE             |   4 -
 .../table/planner/codegen/ExprCodeGenerator.scala  |  18 +-
 .../table/planner/codegen/GenerateUtils.scala      |  32 +-
 .../planner/codegen/calls/BuiltInMethods.scala     |  86 ++--
 .../planner/codegen/calls/FloorCeilCallGen.scala   |  14 +-
 .../planner/codegen/calls/FunctionGenerator.scala  |  81 ++--
 .../planner/codegen/calls/JsonValueCallGen.scala   |  17 +-
 .../table/planner/codegen/calls/LikeCallGen.scala  |   9 +-
 .../planner/codegen/calls/StringCallGen.scala      |   7 +-
 flink-table/flink-table-runtime/pom.xml            |  38 ++
 .../table/runtime/functions/SqlFunctionUtils.java  | 148 ++++++
 .../table/runtime/functions/SqlJsonUtils.java      | 517 ++++++++++++++++++++-
 .../GeneratedWatermarkGeneratorSupplier.java       | 109 +++++
 .../join/interval/FilterAllFlatMapFunction.java    |  48 ++
 .../join/interval/PaddingLeftMapFunction.java      |  53 +++
 .../join/interval/PaddingRightMapFunction.java     |  53 +++
 .../PeriodicWatermarkAssignerWrapper.java          |  57 +++
 .../PunctuatedWatermarkAssignerWrapper.java        |  74 +++
 .../src/main/resources/META-INF/NOTICE             |   9 +
 flink-table/pom.xml                                |   5 +-
 26 files changed, 1393 insertions(+), 356 deletions(-)

diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/functions/SqlLikeUtils.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/functions/SqlLikeUtils.java
index aa22466..5c3efaf 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/functions/SqlLikeUtils.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/functions/SqlLikeUtils.java
@@ -46,12 +46,30 @@ public class SqlLikeUtils {
 
     private SqlLikeUtils() {}
 
-    /** SQL like function with escape. */
+    /** SQL {@code LIKE} function. */
+    public static boolean like(String s, String pattern) {
+        final String regex = sqlToRegexLike(pattern, null);
+        return Pattern.matches(regex, s);
+    }
+
+    /** SQL {@code LIKE} function with escape. */
     public static boolean like(String s, String pattern, String escape) {
         final String regex = sqlToRegexLike(pattern, escape);
         return Pattern.matches(regex, s);
     }
 
+    /** SQL {@code SIMILAR} function. */
+    public static boolean similar(String s, String pattern) {
+        final String regex = sqlToRegexSimilar(pattern, null);
+        return Pattern.matches(regex, s);
+    }
+
+    /** SQL {@code SIMILAR} function with escape. */
+    public static boolean similar(String s, String pattern, String escape) {
+        final String regex = sqlToRegexSimilar(pattern, escape);
+        return Pattern.matches(regex, s);
+    }
+
     /** Translates a SQL LIKE pattern to Java regex pattern, with optional 
escape string. */
     public static String sqlToRegexLike(String sqlPattern, CharSequence 
escapeStr) {
         final char escapeChar;
@@ -192,7 +210,7 @@ public class SqlLikeUtils {
     }
 
     /** Translates a SQL SIMILAR pattern to Java regex pattern, with optional 
escape string. */
-    static String sqlToRegexSimilar(String sqlPattern, CharSequence escapeStr) 
{
+    public static String sqlToRegexSimilar(String sqlPattern, CharSequence 
escapeStr) {
         final char escapeChar;
         if (escapeStr != null) {
             if (escapeStr.length() != 1) {
@@ -206,7 +224,7 @@ public class SqlLikeUtils {
     }
 
     /** Translates SQL SIMILAR pattern to Java regex pattern. */
-    static String sqlToRegexSimilar(String sqlPattern, char escapeChar) {
+    public static String sqlToRegexSimilar(String sqlPattern, char escapeChar) 
{
         similarEscapeRuleChecking(sqlPattern, escapeChar);
 
         boolean insideCharacterEnumeration = false;
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/DateTimeUtils.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/DateTimeUtils.java
index 566956a..51b0090 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/DateTimeUtils.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/DateTimeUtils.java
@@ -1624,6 +1624,95 @@ public class DateTimeUtils {
     }
 
     // 
--------------------------------------------------------------------------------------------
+    // ADD/SUBTRACT months
+    // 
--------------------------------------------------------------------------------------------
+
+    /**
+     * Adds a given number of months to a timestamp, represented as the number 
of milliseconds since
+     * the epoch.
+     */
+    public static long addMonths(long timestamp, int m) {
+        final long millis = DateTimeUtils.floorMod(timestamp, 
DateTimeUtils.MILLIS_PER_DAY);
+        timestamp -= millis;
+        final long x = addMonths((int) (timestamp / 
DateTimeUtils.MILLIS_PER_DAY), m);
+        return x * DateTimeUtils.MILLIS_PER_DAY + millis;
+    }
+
+    /**
+     * Adds a given number of months to a date, represented as the number of 
days since the epoch.
+     */
+    public static int addMonths(int date, int m) {
+        int y0 = (int) extractFromDate(TimeUnitRange.YEAR, date);
+        int m0 = (int) extractFromDate(TimeUnitRange.MONTH, date);
+        int d0 = (int) extractFromDate(TimeUnitRange.DAY, date);
+        m0 += m;
+        int deltaYear = (int) DateTimeUtils.floorDiv(m0, 12);
+        y0 += deltaYear;
+        m0 = (int) DateTimeUtils.floorMod(m0, 12);
+        if (m0 == 0) {
+            y0 -= 1;
+            m0 += 12;
+        }
+
+        int last = lastDay(y0, m0);
+        if (d0 > last) {
+            d0 = last;
+        }
+        return ymdToUnixDate(y0, m0, d0);
+    }
+
+    private static int lastDay(int y, int m) {
+        switch (m) {
+            case 2:
+                return y % 4 == 0 && (y % 100 != 0 || y % 400 == 0) ? 29 : 28;
+            case 4:
+            case 6:
+            case 9:
+            case 11:
+                return 30;
+            default:
+                return 31;
+        }
+    }
+
+    /**
+     * Finds the number of months between two dates, each represented as the 
number of days since
+     * the epoch.
+     */
+    public static int subtractMonths(int date0, int date1) {
+        if (date0 < date1) {
+            return -subtractMonths(date1, date0);
+        }
+        // Start with an estimate.
+        // Since no month has more than 31 days, the estimate is <= the true 
value.
+        int m = (date0 - date1) / 31;
+        while (true) {
+            int date2 = addMonths(date1, m);
+            if (date2 >= date0) {
+                return m;
+            }
+            int date3 = addMonths(date1, m + 1);
+            if (date3 > date0) {
+                return m;
+            }
+            ++m;
+        }
+    }
+
+    public static int subtractMonths(long t0, long t1) {
+        final long millis0 = DateTimeUtils.floorMod(t0, 
DateTimeUtils.MILLIS_PER_DAY);
+        final int d0 = (int) DateTimeUtils.floorDiv(t0 - millis0, 
DateTimeUtils.MILLIS_PER_DAY);
+        final long millis1 = DateTimeUtils.floorMod(t1, 
DateTimeUtils.MILLIS_PER_DAY);
+        final int d1 = (int) DateTimeUtils.floorDiv(t1 - millis1, 
DateTimeUtils.MILLIS_PER_DAY);
+        int x = subtractMonths(d0, d1);
+        final long d2 = addMonths(d1, x);
+        if (d2 == d0 && millis0 < millis1) {
+            --x;
+        }
+        return x;
+    }
+
+    // 
--------------------------------------------------------------------------------------------
     // TimeUnit and TimeUnitRange enums
     // 
--------------------------------------------------------------------------------------------
 
diff --git a/flink-table/flink-table-planner/pom.xml 
b/flink-table/flink-table-planner/pom.xml
index 8dd4dc9..d89cfc3 100644
--- a/flink-table/flink-table-planner/pom.xml
+++ b/flink-table/flink-table-planner/pom.xml
@@ -389,10 +389,6 @@ under the License.
                                                                        
<include>com.esri.geometry:esri-geometry-api</include>
                                                                        
<include>com.google.guava:guava</include>
                                                                        
<include>com.google.guava:failureaccess</include>
-                                                                       
<include>com.jayway.jsonpath:json-path</include>
-                                                                       
<include>com.fasterxml.jackson.core:jackson-core</include>
-                                                                       
<include>com.fasterxml.jackson.core:jackson-databind</include>
-                                                                       
<include>com.fasterxml.jackson.core:jackson-annotations</include>
                                                                        
<include>commons-codec:commons-codec</include>
                                                                        
<include>commons-io:commons-io</include>
 
@@ -426,12 +422,14 @@ under the License.
                                                                        
<shadedPattern>org.apache.flink.calcite.shaded.com.google</shadedPattern>
                                                                </relocation>
                                                                <relocation>
+                                                                       <!-- 
Packaged in runtime -->
                                                                        
<pattern>com.jayway</pattern>
                                                                        
<shadedPattern>org.apache.flink.calcite.shaded.com.jayway</shadedPattern>
                                                                </relocation>
                                                                <relocation>
+                                                                       <!-- 
Packaged in runtime -->
                                                                        
<pattern>com.fasterxml</pattern>
-                                                                       
<shadedPattern>org.apache.flink.calcite.shaded.com.fasterxml</shadedPattern>
+                                                                       
<shadedPattern>org.apache.flink.shaded.jackson2.com.fasterxml</shadedPattern>
                                                                </relocation>
                                                                <relocation>
                                                                        
<pattern>org.apache.commons.codec</pattern>
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/WatermarkPushDownSpec.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/WatermarkPushDownSpec.java
index 2648d31..8d20453 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/WatermarkPushDownSpec.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/WatermarkPushDownSpec.java
@@ -18,10 +18,7 @@
 
 package org.apache.flink.table.planner.plan.abilities.source;
 
-import org.apache.flink.api.common.eventtime.Watermark;
-import org.apache.flink.api.common.eventtime.WatermarkGenerator;
 import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
-import org.apache.flink.api.common.eventtime.WatermarkOutput;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.api.TableException;
@@ -32,6 +29,7 @@ import 
org.apache.flink.table.planner.codegen.WatermarkGeneratorCodeGenerator;
 import org.apache.flink.table.planner.plan.utils.FlinkRexUtil;
 import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
 import org.apache.flink.table.runtime.generated.GeneratedWatermarkGenerator;
+import 
org.apache.flink.table.runtime.generated.GeneratedWatermarkGeneratorSupplier;
 import org.apache.flink.table.types.logical.RowType;
 
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
@@ -41,9 +39,6 @@ import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTyp
 import org.apache.calcite.rex.RexNode;
 
 import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
 
 import scala.Option;
 
@@ -86,7 +81,7 @@ public class WatermarkPushDownSpec extends 
SourceAbilitySpecBase {
             Configuration configuration = 
context.getTableConfig().getConfiguration();
 
             WatermarkGeneratorSupplier<RowData> supplier =
-                    new DefaultWatermarkGeneratorSupplier(
+                    new GeneratedWatermarkGeneratorSupplier(
                             configuration, generatedWatermarkGenerator);
 
             WatermarkStrategy<RowData> watermarkStrategy = 
WatermarkStrategy.forGenerator(supplier);
@@ -116,87 +111,4 @@ public class WatermarkPushDownSpec extends 
SourceAbilitySpecBase {
         }
         return String.format("watermark=[%s]", expressionStr);
     }
-
-    /**
-     * Wrapper of the {@link GeneratedWatermarkGenerator} that is used to 
create {@link
-     * WatermarkGenerator}. The {@link DefaultWatermarkGeneratorSupplier} uses 
the {@link
-     * WatermarkGeneratorSupplier.Context} to init the generated watermark 
generator.
-     */
-    public static class DefaultWatermarkGeneratorSupplier
-            implements WatermarkGeneratorSupplier<RowData> {
-        private static final long serialVersionUID = 1L;
-
-        private final Configuration configuration;
-        private final GeneratedWatermarkGenerator generatedWatermarkGenerator;
-
-        public DefaultWatermarkGeneratorSupplier(
-                Configuration configuration,
-                GeneratedWatermarkGenerator generatedWatermarkGenerator) {
-            this.configuration = configuration;
-            this.generatedWatermarkGenerator = generatedWatermarkGenerator;
-        }
-
-        @Override
-        public WatermarkGenerator<RowData> createWatermarkGenerator(Context 
context) {
-
-            List<Object> references =
-                    new 
ArrayList<>(Arrays.asList(generatedWatermarkGenerator.getReferences()));
-            references.add(context);
-
-            org.apache.flink.table.runtime.generated.WatermarkGenerator 
innerWatermarkGenerator =
-                    new GeneratedWatermarkGenerator(
-                                    generatedWatermarkGenerator.getClassName(),
-                                    generatedWatermarkGenerator.getCode(),
-                                    references.toArray(),
-                                    configuration)
-                            
.newInstance(Thread.currentThread().getContextClassLoader());
-
-            try {
-                innerWatermarkGenerator.open(configuration);
-            } catch (Exception e) {
-                throw new RuntimeException("Fail to instantiate generated 
watermark generator.", e);
-            }
-            return new 
DefaultWatermarkGeneratorSupplier.DefaultWatermarkGenerator(
-                    innerWatermarkGenerator);
-        }
-
-        /**
-         * Wrapper of the code-generated {@link
-         * org.apache.flink.table.runtime.generated.WatermarkGenerator}.
-         */
-        public static class DefaultWatermarkGenerator implements 
WatermarkGenerator<RowData> {
-            private static final long serialVersionUID = 1L;
-
-            private final 
org.apache.flink.table.runtime.generated.WatermarkGenerator
-                    innerWatermarkGenerator;
-            private Long currentWatermark = Long.MIN_VALUE;
-
-            public DefaultWatermarkGenerator(
-                    org.apache.flink.table.runtime.generated.WatermarkGenerator
-                            watermarkGenerator) {
-                this.innerWatermarkGenerator = watermarkGenerator;
-            }
-
-            @Override
-            public void onEvent(RowData event, long eventTimestamp, 
WatermarkOutput output) {
-                try {
-                    Long watermark = 
innerWatermarkGenerator.currentWatermark(event);
-                    if (watermark != null) {
-                        currentWatermark = watermark;
-                    }
-                } catch (Exception e) {
-                    throw new RuntimeException(
-                            String.format(
-                                    "Generated WatermarkGenerator fails to 
generate for row: %s.",
-                                    event),
-                            e);
-                }
-            }
-
-            @Override
-            public void onPeriodicEmit(WatermarkOutput output) {
-                output.emitWatermark(new Watermark(currentWatermark));
-            }
-        }
-    }
 }
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecIntervalJoin.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecIntervalJoin.java
index b5a8163..036d71e 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecIntervalJoin.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecIntervalJoin.java
@@ -18,11 +18,7 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.stream;
 
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.dag.Transformation;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.operators.StreamFlatMap;
 import org.apache.flink.streaming.api.operators.StreamMap;
@@ -47,12 +43,14 @@ import 
org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
 import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
 import 
org.apache.flink.table.runtime.operators.join.KeyedCoProcessOperatorWithWatermarkDelay;
 import org.apache.flink.table.runtime.operators.join.OuterJoinPaddingUtil;
+import 
org.apache.flink.table.runtime.operators.join.interval.FilterAllFlatMapFunction;
 import 
org.apache.flink.table.runtime.operators.join.interval.IntervalJoinFunction;
+import 
org.apache.flink.table.runtime.operators.join.interval.PaddingLeftMapFunction;
+import 
org.apache.flink.table.runtime.operators.join.interval.PaddingRightMapFunction;
 import 
org.apache.flink.table.runtime.operators.join.interval.ProcTimeIntervalJoin;
 import 
org.apache.flink.table.runtime.operators.join.interval.RowTimeIntervalJoin;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.util.Collector;
 import org.apache.flink.util.Preconditions;
 
 import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
@@ -196,73 +194,6 @@ public class StreamExecIntervalJoin extends 
ExecNodeBase<RowData>
         }
     }
 
-    private static class FilterAllFlatMapFunction
-            implements FlatMapFunction<RowData, RowData>, 
ResultTypeQueryable<RowData> {
-        private static final long serialVersionUID = 1L;
-
-        private final InternalTypeInfo<RowData> outputTypeInfo;
-
-        public FilterAllFlatMapFunction(InternalTypeInfo<RowData> 
inputTypeInfo) {
-            this.outputTypeInfo = inputTypeInfo;
-        }
-
-        @Override
-        public void flatMap(RowData value, Collector<RowData> out) {}
-
-        @Override
-        public TypeInformation<RowData> getProducedType() {
-            return outputTypeInfo;
-        }
-    }
-
-    private static class PaddingLeftMapFunction
-            implements MapFunction<RowData, RowData>, 
ResultTypeQueryable<RowData> {
-        private static final long serialVersionUID = 1L;
-
-        private final OuterJoinPaddingUtil paddingUtil;
-        private final InternalTypeInfo<RowData> outputTypeInfo;
-
-        public PaddingLeftMapFunction(
-                OuterJoinPaddingUtil paddingUtil, InternalTypeInfo<RowData> 
returnType) {
-            this.paddingUtil = paddingUtil;
-            this.outputTypeInfo = returnType;
-        }
-
-        @Override
-        public RowData map(RowData value) {
-            return paddingUtil.padLeft(value);
-        }
-
-        @Override
-        public TypeInformation<RowData> getProducedType() {
-            return outputTypeInfo;
-        }
-    }
-
-    private static class PaddingRightMapFunction
-            implements MapFunction<RowData, RowData>, 
ResultTypeQueryable<RowData> {
-        private static final long serialVersionUID = 1L;
-
-        private final OuterJoinPaddingUtil paddingUtil;
-        private final InternalTypeInfo<RowData> outputTypeInfo;
-
-        public PaddingRightMapFunction(
-                OuterJoinPaddingUtil paddingUtil, InternalTypeInfo<RowData> 
returnType) {
-            this.paddingUtil = paddingUtil;
-            this.outputTypeInfo = returnType;
-        }
-
-        @Override
-        public RowData map(RowData value) {
-            return paddingUtil.padRight(value);
-        }
-
-        @Override
-        public TypeInformation<RowData> getProducedType() {
-            return outputTypeInfo;
-        }
-    }
-
     private Transformation<RowData> createNegativeWindowSizeJoin(
             JoinSpec joinSpec,
             Transformation<RowData> leftInputTransform,
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLegacyTableSourceScan.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLegacyTableSourceScan.java
index 50b87ab..e8b182d 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLegacyTableSourceScan.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLegacyTableSourceScan.java
@@ -25,12 +25,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
-import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.util.DataFormatConverters;
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
 import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
 import org.apache.flink.table.planner.codegen.OperatorCodeGenerator;
@@ -41,6 +36,8 @@ import org.apache.flink.table.planner.plan.utils.ScanUtil;
 import org.apache.flink.table.planner.sources.TableSourceUtil;
 import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
 import org.apache.flink.table.runtime.operators.TableStreamOperator;
+import 
org.apache.flink.table.runtime.operators.wmassigners.PeriodicWatermarkAssignerWrapper;
+import 
org.apache.flink.table.runtime.operators.wmassigners.PunctuatedWatermarkAssignerWrapper;
 import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
 import org.apache.flink.table.sources.StreamTableSource;
 import org.apache.flink.table.sources.TableSource;
@@ -48,9 +45,7 @@ import 
org.apache.flink.table.sources.wmstrategies.PeriodicWatermarkAssigner;
 import org.apache.flink.table.sources.wmstrategies.PunctuatedWatermarkAssigner;
 import org.apache.flink.table.sources.wmstrategies.WatermarkStrategy;
 import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.FieldsDataType;
 import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.types.Row;
 
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rex.RexNode;
@@ -176,77 +171,4 @@ public class StreamExecLegacyTableSourceScan extends 
CommonExecLegacyTableSource
                 .name(tableSource.explainSource())
                 .getTransformation();
     }
-
-    /** Generates periodic watermarks based on a {@link 
PeriodicWatermarkAssigner}. */
-    private static class PeriodicWatermarkAssignerWrapper
-            implements AssignerWithPeriodicWatermarks<RowData> {
-        private static final long serialVersionUID = 1L;
-        private final PeriodicWatermarkAssigner assigner;
-        private final int timeFieldIdx;
-
-        /**
-         * @param timeFieldIdx the index of the rowtime attribute.
-         * @param assigner the watermark assigner.
-         */
-        private PeriodicWatermarkAssignerWrapper(
-                PeriodicWatermarkAssigner assigner, int timeFieldIdx) {
-            this.assigner = assigner;
-            this.timeFieldIdx = timeFieldIdx;
-        }
-
-        @Nullable
-        @Override
-        public Watermark getCurrentWatermark() {
-            return assigner.getWatermark();
-        }
-
-        @Override
-        public long extractTimestamp(RowData row, long recordTimestamp) {
-            long timestamp = row.getTimestamp(timeFieldIdx, 
3).getMillisecond();
-            assigner.nextTimestamp(timestamp);
-            return 0;
-        }
-    }
-
-    /** Generates periodic watermarks based on a 
[[PunctuatedWatermarkAssigner]]. */
-    private static class PunctuatedWatermarkAssignerWrapper
-            implements AssignerWithPunctuatedWatermarks<RowData> {
-        private static final long serialVersionUID = 1L;
-        private final PunctuatedWatermarkAssigner assigner;
-        private final int timeFieldIdx;
-        private final DataFormatConverters.DataFormatConverter<RowData, Row> 
converter;
-
-        /**
-         * @param timeFieldIdx the index of the rowtime attribute.
-         * @param assigner the watermark assigner.
-         * @param sourceType the type of source
-         */
-        @SuppressWarnings("unchecked")
-        private PunctuatedWatermarkAssignerWrapper(
-                PunctuatedWatermarkAssigner assigner, int timeFieldIdx, 
DataType sourceType) {
-            this.assigner = assigner;
-            this.timeFieldIdx = timeFieldIdx;
-            DataType originDataType;
-            if (sourceType instanceof FieldsDataType) {
-                originDataType = sourceType;
-            } else {
-                originDataType = DataTypes.ROW(DataTypes.FIELD("f0", 
sourceType));
-            }
-            converter =
-                    DataFormatConverters.getConverterForDataType(
-                            originDataType.bridgedTo(Row.class));
-        }
-
-        @Nullable
-        @Override
-        public Watermark checkAndGetNextWatermark(RowData row, long 
extractedTimestamp) {
-            long timestamp = row.getLong(timeFieldIdx);
-            return assigner.getWatermark(converter.toExternal(row), timestamp);
-        }
-
-        @Override
-        public long extractTimestamp(RowData element, long recordTimestamp) {
-            return 0;
-        }
-    }
 }
diff --git a/flink-table/flink-table-planner/src/main/resources/META-INF/NOTICE 
b/flink-table/flink-table-planner/src/main/resources/META-INF/NOTICE
index 08a07c7..2b53463 100644
--- a/flink-table/flink-table-planner/src/main/resources/META-INF/NOTICE
+++ b/flink-table/flink-table-planner/src/main/resources/META-INF/NOTICE
@@ -9,10 +9,6 @@ This project bundles the following dependencies under the 
Apache Software Licens
 - com.google.guava:guava:29.0-jre
 - com.google.guava:failureaccess:1.0.1
 - com.esri.geometry:esri-geometry-api:2.2.0
-- com.fasterxml.jackson.core:jackson-annotations:2.13.0
-- com.fasterxml.jackson.core:jackson-core:2.13.0
-- com.fasterxml.jackson.core:jackson-databind:2.13.0
-- com.jayway.jsonpath:json-path:2.4.0
 - org.apache.calcite:calcite-core:1.26.0
 - org.apache.calcite:calcite-linq4j:1.26.0
 - org.apache.calcite.avatica:avatica-core:1.17.0
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala
index 6009cc1..f083c39 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala
@@ -18,6 +18,10 @@
 
 package org.apache.flink.table.planner.codegen
 
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.`type`.{ReturnTypes, SqlTypeName}
+import org.apache.calcite.sql.{SqlKind, SqlOperator}
+import org.apache.calcite.util.{Sarg, TimestampString}
 import org.apache.flink.streaming.api.functions.ProcessFunction
 import org.apache.flink.table.api.TableException
 import org.apache.flink.table.data.RowData
@@ -42,10 +46,6 @@ import 
org.apache.flink.table.runtime.typeutils.TypeCheckUtils.{isNumeric, isTem
 import org.apache.flink.table.types.logical._
 import 
org.apache.flink.table.types.logical.utils.LogicalTypeChecks.{getFieldCount, 
isCompositeType}
 import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
-import org.apache.calcite.rex._
-import org.apache.calcite.sql.`type`.{ReturnTypes, SqlTypeName}
-import org.apache.calcite.sql.{SqlKind, SqlOperator}
-import org.apache.calcite.util.{Sarg, TimestampString}
 
 import scala.collection.JavaConversions._
 
@@ -418,15 +418,7 @@ class ExprCodeGenerator(ctx: CodeGeneratorContext, nullableInput: Boolean)
       case _ =>
         literal.getValue3
     }
-    // Make sure to convert avatica time types to flink internal types
-    val convertedValue = value match {
-      case tu: org.apache.calcite.avatica.util.TimeUnit =>
-        org.apache.flink.table.utils.DateTimeUtils.TimeUnit.valueOf(tu.name())
-      case tur: org.apache.calcite.avatica.util.TimeUnitRange =>
-        
org.apache.flink.table.utils.DateTimeUtils.TimeUnitRange.valueOf(tur.name())
-      case _ => value
-    }
-    generateLiteral(ctx, resultType, convertedValue)
+    generateLiteral(ctx, resultType, value)
   }
 
   override def visitCorrelVariable(correlVariable: RexCorrelVariable): 
GeneratedExpression = {
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/GenerateUtils.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/GenerateUtils.scala
index 3fa5a27..fcf92b4 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/GenerateUtils.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/GenerateUtils.scala
@@ -20,7 +20,7 @@ package org.apache.flink.table.planner.codegen
 
 import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.typeinfo.{AtomicType => AtomicTypeInfo}
-import org.apache.flink.api.java.typeutils.GenericTypeInfo
+import org.apache.flink.table.api.{JsonExistsOnError, JsonQueryOnEmptyOrError, 
JsonQueryWrapper}
 import org.apache.flink.table.data._
 import org.apache.flink.table.data.binary.BinaryRowData
 import org.apache.flink.table.data.utils.JoinedRowData
@@ -30,13 +30,14 @@ import org.apache.flink.table.planner.codegen.GeneratedExpression.{ALWAYS_NULL,
 import org.apache.flink.table.planner.codegen.calls.CurrentTimePointCallGen
 import org.apache.flink.table.planner.plan.nodes.exec.spec.SortSpec
 import org.apache.flink.table.planner.plan.utils.SortUtil
+import 
org.apache.flink.table.planner.utils.TimestampStringUtils.toLocalDateTime
 import 
org.apache.flink.table.runtime.typeutils.TypeCheckUtils.{isCharacterString, 
isReference, isTemporal}
 import org.apache.flink.table.types.logical.LogicalTypeRoot._
 import org.apache.flink.table.types.logical._
 import 
org.apache.flink.table.types.logical.utils.LogicalTypeChecks.{getFieldCount, 
getFieldTypes}
-import 
org.apache.flink.table.planner.utils.TimestampStringUtils.toLocalDateTime
 
 import org.apache.calcite.avatica.util.ByteString
+import org.apache.calcite.sql.{SqlJsonExistsErrorBehavior, 
SqlJsonQueryEmptyOrErrorBehavior, SqlJsonQueryWrapperBehavior}
 import org.apache.calcite.util.TimestampString
 import org.apache.commons.lang3.StringEscapeUtils
 
@@ -443,13 +444,34 @@ object GenerateUtils {
     }
   }
 
-  def generateSymbol(enum: Enum[_]): GeneratedExpression = {
+  def generateSymbol(value: Enum[_]): GeneratedExpression = {
+    // Make sure to convert calcite enum types to flink types
+    val convertedValue = value match {
+      case tu: org.apache.calcite.avatica.util.TimeUnit =>
+        org.apache.flink.table.utils.DateTimeUtils.TimeUnit.valueOf(tu.name())
+      case tur: org.apache.calcite.avatica.util.TimeUnitRange =>
+        
org.apache.flink.table.utils.DateTimeUtils.TimeUnitRange.valueOf(tur.name())
+      case jeeb: SqlJsonExistsErrorBehavior =>
+        JsonExistsOnError.valueOf(jeeb.name())
+      case jqeeb: SqlJsonQueryEmptyOrErrorBehavior =>
+        JsonQueryOnEmptyOrError.valueOf(jqeeb.name())
+      case jqwb: SqlJsonQueryWrapperBehavior => jqwb match {
+        case SqlJsonQueryWrapperBehavior.WITHOUT_ARRAY =>
+          JsonQueryWrapper.WITHOUT_ARRAY
+        case SqlJsonQueryWrapperBehavior.WITH_CONDITIONAL_ARRAY =>
+          JsonQueryWrapper.CONDITIONAL_ARRAY
+        case SqlJsonQueryWrapperBehavior.WITH_UNCONDITIONAL_ARRAY =>
+          JsonQueryWrapper.UNCONDITIONAL_ARRAY
+      }
+      case _ => value
+    }
+
     GeneratedExpression(
-      qualifyEnum(enum),
+      qualifyEnum(convertedValue),
       NEVER_NULL,
       NO_CODE,
       new SymbolType(false),
-      literalValue = Some(enum))
+      literalValue = Some(convertedValue))
   }
 
   /**
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala
index 0ee5b96..0b5e6a7 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala
@@ -18,14 +18,15 @@
 
 package org.apache.flink.table.planner.codegen.calls
 
+import org.apache.flink.table.api.{JsonExistsOnError, JsonQueryOnEmptyOrError, 
JsonQueryWrapper, JsonValueOnEmptyOrError}
+import org.apache.flink.table.data.binary.{BinaryStringData, 
BinaryStringDataUtil}
 import org.apache.flink.table.data.{DecimalData, DecimalDataUtils, 
TimestampData}
+import org.apache.flink.table.functions.SqlLikeUtils
 import org.apache.flink.table.runtime.functions._
 import org.apache.flink.table.utils.DateTimeUtils
 import org.apache.flink.table.utils.DateTimeUtils.TimeUnitRange
+
 import org.apache.calcite.linq4j.tree.Types
-import org.apache.calcite.runtime.{JsonFunctions, SqlFunctions}
-import org.apache.calcite.sql.{SqlJsonExistsErrorBehavior, 
SqlJsonQueryEmptyOrErrorBehavior, SqlJsonQueryWrapperBehavior, 
SqlJsonValueEmptyOrErrorBehavior}
-import org.apache.flink.table.data.binary.{BinaryStringData, 
BinaryStringDataUtil}
 
 import java.lang.reflect.Method
 import java.lang.{Byte => JByte, Integer => JInteger, Long => JLong, Short => 
JShort}
@@ -82,17 +83,22 @@ object BuiltInMethods {
   val POWER_DEC_NUM = Types.lookupMethod(
     classOf[SqlFunctionUtils], "power", classOf[DecimalData], classOf[Double])
 
-
   // TRIGONOMETRIC FUNCTIONS
 
   val LN = Types.lookupMethod(classOf[SqlFunctionUtils], "log", 
classOf[Double])
 
   val LN_DEC = Types.lookupMethod(classOf[SqlFunctionUtils], "log", 
classOf[DecimalData])
 
-  val ABS = Types.lookupMethod(classOf[SqlFunctions], "abs", classOf[Double])
+  val ABS = Types.lookupMethod(classOf[SqlFunctionUtils], "abs", 
classOf[Double])
   val ABS_DEC = Types.lookupMethod(classOf[SqlFunctionUtils], "abs", 
classOf[DecimalData])
 
+  val FLOOR = Types.lookupMethod(classOf[SqlFunctionUtils], "floor", 
classOf[Double])
+  val FLOOR_INTEGRAL = Types.lookupMethod(classOf[SqlFunctionUtils], "floor", 
classOf[Int],
+    classOf[Int])
   val FLOOR_DEC = Types.lookupMethod(classOf[SqlFunctionUtils], "floor", 
classOf[DecimalData])
+  val CEIL = Types.lookupMethod(classOf[SqlFunctionUtils], "ceil", 
classOf[Double])
+  val CEIL_INTEGRAL = Types.lookupMethod(classOf[SqlFunctionUtils], "ceil", 
classOf[Int],
+    classOf[Int])
   val CEIL_DEC = Types.lookupMethod(classOf[SqlFunctionUtils], "ceil", 
classOf[DecimalData])
 
   val SIN = Types.lookupMethod(classOf[Math], "sin", classOf[Double])
@@ -104,7 +110,7 @@ object BuiltInMethods {
   val TAN = Types.lookupMethod(classOf[Math], "tan", classOf[Double])
   val TAN_DEC = Types.lookupMethod(classOf[SqlFunctionUtils], "tan", 
classOf[DecimalData])
 
-  val COT = Types.lookupMethod(classOf[SqlFunctions], "cot", classOf[Double])
+  val COT = Types.lookupMethod(classOf[SqlFunctionUtils], "cot", 
classOf[Double])
   val COT_DEC = Types.lookupMethod(classOf[SqlFunctionUtils], "cot", 
classOf[DecimalData])
 
   val ASIN = Types.lookupMethod(classOf[Math], "asin", classOf[Double])
@@ -147,10 +153,12 @@ object BuiltInMethods {
   val SIGN_LONG = Types.lookupMethod(classOf[JLong], "signum", classOf[Long])
   val SIGN_DEC = Types.lookupMethod(classOf[SqlFunctionUtils], "sign", 
classOf[DecimalData])
 
-  val ROUND_DOUBLE = Types.lookupMethod(classOf[SqlFunctions], "sround", 
classOf[Double],
+  val ROUND_DOUBLE = Types.lookupMethod(classOf[SqlFunctionUtils], "sround", 
classOf[Double],
+    classOf[Int])
+  val ROUND_INT = Types.lookupMethod(classOf[SqlFunctionUtils], "sround", 
classOf[Int],
+    classOf[Int])
+  val ROUND_LONG = Types.lookupMethod(classOf[SqlFunctionUtils], "sround", 
classOf[Long],
     classOf[Int])
-  val ROUND_INT = Types.lookupMethod(classOf[SqlFunctions], "sround", 
classOf[Int], classOf[Int])
-  val ROUND_LONG = Types.lookupMethod(classOf[SqlFunctions], "sround", 
classOf[Long], classOf[Int])
   val ROUND_BYTE = Types.lookupMethod(classOf[SqlFunctionUtils], "sround",
     classOf[Byte], classOf[Int])
   val ROUND_SHORT = Types.lookupMethod(classOf[SqlFunctionUtils], "sround",
@@ -399,24 +407,24 @@ object BuiltInMethods {
   val STRING_TO_TIME = Types.lookupMethod(
     classOf[DateTimeUtils], "parseTime", classOf[String])
 
-  val TRUNCATE_DOUBLE_ONE = Types.lookupMethod(classOf[SqlFunctions], 
"struncate",
+  val TRUNCATE_DOUBLE_ONE = Types.lookupMethod(classOf[SqlFunctionUtils], 
"struncate",
     classOf[Double])
   val TRUNCATE_FLOAT_ONE = Types.lookupMethod(classOf[SqlFunctionUtils], 
"struncate",
     classOf[Float])
-  val TRUNCATE_INT_ONE = Types.lookupMethod(classOf[SqlFunctions], "struncate",
+  val TRUNCATE_INT_ONE = Types.lookupMethod(classOf[SqlFunctionUtils], 
"struncate",
     classOf[Int])
-  val TRUNCATE_LONG_ONE = Types.lookupMethod(classOf[SqlFunctions], 
"struncate",
+  val TRUNCATE_LONG_ONE = Types.lookupMethod(classOf[SqlFunctionUtils], 
"struncate",
     classOf[Long])
   val TRUNCATE_DEC_ONE = Types.lookupMethod(classOf[SqlFunctionUtils], 
"struncate",
     classOf[DecimalData])
 
-  val TRUNCATE_DOUBLE = Types.lookupMethod(classOf[SqlFunctions], "struncate",
+  val TRUNCATE_DOUBLE = Types.lookupMethod(classOf[SqlFunctionUtils], 
"struncate",
     classOf[Double], classOf[Int])
   val TRUNCATE_FLOAT = Types.lookupMethod(classOf[SqlFunctionUtils], 
"struncate",
     classOf[Float], classOf[Int])
-  val TRUNCATE_INT = Types.lookupMethod(classOf[SqlFunctions], "struncate",
+  val TRUNCATE_INT = Types.lookupMethod(classOf[SqlFunctionUtils], "struncate",
     classOf[Int], classOf[Int])
-  val TRUNCATE_LONG = Types.lookupMethod(classOf[SqlFunctions], "struncate",
+  val TRUNCATE_LONG = Types.lookupMethod(classOf[SqlFunctionUtils], 
"struncate",
     classOf[Long], classOf[Int])
   val TRUNCATE_DEC = Types.lookupMethod(classOf[SqlFunctionUtils], "struncate",
     classOf[DecimalData], classOf[Int])
@@ -439,29 +447,41 @@ object BuiltInMethods {
   val TRUNCATE_SQL_TIMESTAMP = Types.lookupMethod(classOf[DateTimeUtils], 
"truncate",
     classOf[TimestampData], classOf[Int])
 
-  val ADD_MONTHS = Types.lookupMethod(classOf[SqlFunctions], "addMonths",
+  val ADD_MONTHS = Types.lookupMethod(classOf[DateTimeUtils], "addMonths",
     classOf[Long], classOf[Int])
 
-  val SUBTRACT_MONTHS = Types.lookupMethod(classOf[SqlFunctions], 
"subtractMonths",
+  val SUBTRACT_MONTHS = Types.lookupMethod(classOf[DateTimeUtils], 
"subtractMonths",
     classOf[Long], classOf[Long])
 
   // JSON functions
 
-  val JSON_EXISTS = Types.lookupMethod(classOf[JsonFunctions], "jsonExists",
+  val JSON_EXISTS = Types.lookupMethod(classOf[SqlJsonUtils], "jsonExists",
     classOf[String], classOf[String])
 
-  val JSON_EXISTS_ON_ERROR = Types.lookupMethod(classOf[JsonFunctions], 
"jsonExists",
-    classOf[String], classOf[String], classOf[SqlJsonExistsErrorBehavior])
+  val JSON_EXISTS_ON_ERROR = Types.lookupMethod(classOf[SqlJsonUtils], 
"jsonExists",
+    classOf[String], classOf[String], classOf[JsonExistsOnError])
 
-  val JSON_VALUE = Types.lookupMethod(classOf[JsonFunctions], "jsonValue",
+  val JSON_VALUE = Types.lookupMethod(classOf[SqlJsonUtils], "jsonValue",
     classOf[String], classOf[String],
-    classOf[SqlJsonValueEmptyOrErrorBehavior], classOf[Any],
-    classOf[SqlJsonValueEmptyOrErrorBehavior], classOf[Any]
+    classOf[JsonValueOnEmptyOrError], classOf[Any],
+    classOf[JsonValueOnEmptyOrError], classOf[Any]
   )
 
-  val JSON_QUERY = Types.lookupMethod(classOf[JsonFunctions], "jsonQuery",
-    classOf[String], classOf[String], classOf[SqlJsonQueryWrapperBehavior],
-    classOf[SqlJsonQueryEmptyOrErrorBehavior], 
classOf[SqlJsonQueryEmptyOrErrorBehavior])
+  val JSON_QUERY = Types.lookupMethod(classOf[SqlJsonUtils], "jsonQuery",
+    classOf[String], classOf[String], classOf[JsonQueryWrapper],
+    classOf[JsonQueryOnEmptyOrError], classOf[JsonQueryOnEmptyOrError])
+
+  val IS_JSON_VALUE = Types.lookupMethod(classOf[SqlJsonUtils], "isJsonValue",
+    classOf[String])
+
+  val IS_JSON_OBJECT = Types.lookupMethod(classOf[SqlJsonUtils], 
"isJsonObject",
+    classOf[String])
+
+  val IS_JSON_ARRAY = Types.lookupMethod(classOf[SqlJsonUtils], "isJsonArray",
+    classOf[String])
+
+  val IS_JSON_SCALAR = Types.lookupMethod(classOf[SqlJsonUtils], 
"isJsonScalar",
+    classOf[String])
 
   // STRING functions
 
@@ -522,6 +542,20 @@ object BuiltInMethods {
   val STRING_DATA_TO_TIMESTAMP_WITH_ZONE = Types.lookupMethod(
     classOf[BinaryStringDataUtil], "toTimestamp", classOf[BinaryStringData], 
classOf[TimeZone])
 
+  val STRING_LIKE = Types.lookupMethod(
+    classOf[SqlLikeUtils], "like", classOf[String], classOf[String])
+
+  val STRING_LIKE_WITH_ESCAPE = Types.lookupMethod(
+    classOf[SqlLikeUtils], "like", classOf[String], classOf[String], 
classOf[String])
+
+  val STRING_SIMILAR = Types.lookupMethod(
+    classOf[SqlLikeUtils], "similar", classOf[String], classOf[String])
+
+  val STRING_SIMILAR_WITH_ESCAPE = Types.lookupMethod(
+    classOf[SqlLikeUtils], "similar", classOf[String], classOf[String], 
classOf[String])
+
+  val STRING_INITCAP = Types.lookupMethod(classOf[SqlFunctionUtils], 
"initcap", classOf[String])
+
   // DecimalData functions
 
   val DECIMAL_TO_DECIMAL = Types.lookupMethod(
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/FloorCeilCallGen.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/FloorCeilCallGen.scala
index e9e9df8..8333af7 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/FloorCeilCallGen.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/FloorCeilCallGen.scala
@@ -22,7 +22,6 @@ import org.apache.flink.table.planner.codegen.CodeGenUtils.{TIMESTAMP_DATA, getE
 import 
org.apache.flink.table.planner.codegen.GenerateUtils.generateCallIfArgsNotNull
 import org.apache.flink.table.planner.codegen.{CodeGeneratorContext, 
GeneratedExpression}
 import org.apache.flink.table.types.logical.{LogicalType, LogicalTypeRoot}
-
 import org.apache.flink.table.utils.DateTimeUtils.TimeUnitRange
 import org.apache.flink.table.utils.DateTimeUtils.TimeUnitRange._
 
@@ -34,6 +33,8 @@ import java.util.TimeZone
   */
 class FloorCeilCallGen(
     arithmeticMethod: Method,
+    arithmeticIntegralMethod: Option[Method] = None,
+    decimalMethod: Option[Method] = None,
     temporalMethod: Option[Method] = None)
   extends MethodCallGen(arithmeticMethod) {
 
@@ -49,7 +50,7 @@ class FloorCeilCallGen(
         case LogicalTypeRoot.DECIMAL =>
           generateCallIfArgsNotNull(ctx, returnType, operands) {
             operandResultTerms =>
-              
s"${qualifyMethod(arithmeticMethod)}(${operandResultTerms.mkString(", ")})"
+              
s"${qualifyMethod(decimalMethod.get)}(${operandResultTerms.mkString(", ")})"
           }
         case _ =>
           operands.head // no floor/ceil necessary
@@ -98,13 +99,14 @@ class FloorCeilCallGen(
                 case LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE =>
                   val longTerm = s"${terms.head}.getMillisecond()"
                   s"""
-                     
|$TIMESTAMP_DATA.fromEpochMillis(${qualifyMethod(arithmeticMethod)}(
-                     |  $longTerm,
-                     |  (long) ${unit.startUnit.multiplier.intValue()}))
+                     |$TIMESTAMP_DATA.fromEpochMillis(
+                     |  ${qualifyMethod(arithmeticIntegralMethod.get)}(
+                     |    $longTerm,
+                     |    (long) ${unit.startUnit.multiplier.intValue()}))
                    """.stripMargin
                 case _ =>
                   s"""
-                     |${qualifyMethod(arithmeticMethod)}(
+                     |${qualifyMethod(arithmeticIntegralMethod.get)}(
                      |  ($internalType) ${terms.head},
                      |  ($internalType) 
${unit.startUnit.multiplier.intValue()})
                      |""".stripMargin
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/FunctionGenerator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/FunctionGenerator.scala
index b7ce412..22cb1a9 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/FunctionGenerator.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/FunctionGenerator.scala
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.codegen.calls
 
+import org.apache.calcite.sql.SqlOperator
 import org.apache.flink.api.common.RuntimeExecutionMode
 import org.apache.flink.configuration.ExecutionOptions
 import org.apache.flink.table.api.TableConfig
@@ -26,11 +27,7 @@ import org.apache.flink.table.runtime.types.PlannerTypeUtils.isPrimitive
 import org.apache.flink.table.types.logical.LogicalTypeRoot._
 import org.apache.flink.table.types.logical.{LogicalType, LogicalTypeRoot}
 
-import org.apache.calcite.sql.SqlOperator
-import org.apache.calcite.util.BuiltInMethod
-
 import java.lang.reflect.Method
-
 import scala.collection.mutable
 
 class FunctionGenerator private(config: TableConfig) {
@@ -115,22 +112,22 @@ class FunctionGenerator private(config: TableConfig) {
   addSqlFunction(
     FLOOR,
     Seq(DOUBLE),
-    new FloorCeilCallGen(BuiltInMethod.FLOOR.method))
+    new FloorCeilCallGen(BuiltInMethods.FLOOR))
 
-  addSqlFunction(
+  addSqlFunctionMethod(
     FLOOR,
     Seq(DECIMAL),
-    new FloorCeilCallGen(BuiltInMethods.FLOOR_DEC))
+    BuiltInMethods.FLOOR_DEC)
 
   addSqlFunction(
     CEIL,
     Seq(DOUBLE),
-    new FloorCeilCallGen(BuiltInMethod.CEIL.method))
+    new FloorCeilCallGen(BuiltInMethods.CEIL))
 
-  addSqlFunction(
+  addSqlFunctionMethod(
     CEIL,
     Seq(DECIMAL),
-    new FloorCeilCallGen(BuiltInMethods.CEIL_DEC))
+    BuiltInMethods.CEIL_DEC)
 
   addSqlFunctionMethod(
     SIN,
@@ -462,28 +459,36 @@ class FunctionGenerator private(config: TableConfig) {
     FLOOR,
     Seq(DATE, SYMBOL),
     new FloorCeilCallGen(
-      BuiltInMethod.FLOOR.method,
+      BuiltInMethods.FLOOR,
+      Some(BuiltInMethods.FLOOR_INTEGRAL),
+      Some(BuiltInMethods.FLOOR_DEC),
       Some(BuiltInMethods.UNIX_DATE_FLOOR)))
 
   addSqlFunction(
     FLOOR,
     Seq(TIME_WITHOUT_TIME_ZONE, SYMBOL),
     new FloorCeilCallGen(
-      BuiltInMethod.FLOOR.method,
+      BuiltInMethods.FLOOR,
+      Some(BuiltInMethods.FLOOR_INTEGRAL),
+      Some(BuiltInMethods.FLOOR_DEC),
       Some(BuiltInMethods.UNIX_DATE_FLOOR)))
 
   addSqlFunction(
     FLOOR,
     Seq(TIMESTAMP_WITHOUT_TIME_ZONE, SYMBOL),
     new FloorCeilCallGen(
-      BuiltInMethod.FLOOR.method,
+      BuiltInMethods.FLOOR,
+      Some(BuiltInMethods.FLOOR_INTEGRAL),
+      Some(BuiltInMethods.FLOOR_DEC),
       Some(BuiltInMethods.UNIX_TIMESTAMP_FLOOR)))
 
   addSqlFunction(
     FLOOR,
     Seq(TIMESTAMP_WITH_LOCAL_TIME_ZONE, SYMBOL),
     new FloorCeilCallGen(
-      BuiltInMethod.FLOOR.method,
+      BuiltInMethods.FLOOR,
+      Some(BuiltInMethods.FLOOR_INTEGRAL),
+      Some(BuiltInMethods.FLOOR_DEC),
       Some(BuiltInMethods.TIMESTAMP_FLOOR_TIME_ZONE)))
 
   // TODO: fixme if CALCITE-3199 fixed
@@ -492,28 +497,36 @@ class FunctionGenerator private(config: TableConfig) {
     CEIL,
     Seq(DATE, SYMBOL),
     new FloorCeilCallGen(
-      BuiltInMethod.CEIL.method,
+      BuiltInMethods.CEIL,
+      Some(BuiltInMethods.CEIL_INTEGRAL),
+      Some(BuiltInMethods.CEIL_DEC),
       Some(BuiltInMethods.UNIX_DATE_CEIL)))
 
   addSqlFunction(
     CEIL,
     Seq(TIME_WITHOUT_TIME_ZONE, SYMBOL),
     new FloorCeilCallGen(
-      BuiltInMethod.CEIL.method,
+      BuiltInMethods.CEIL,
+      Some(BuiltInMethods.CEIL_INTEGRAL),
+      Some(BuiltInMethods.CEIL_DEC),
       Some(BuiltInMethods.UNIX_DATE_CEIL)))
 
   addSqlFunction(
     CEIL,
     Seq(TIMESTAMP_WITHOUT_TIME_ZONE, SYMBOL),
     new FloorCeilCallGen(
-      BuiltInMethod.CEIL.method,
+      BuiltInMethods.CEIL,
+      Some(BuiltInMethods.CEIL_INTEGRAL),
+      Some(BuiltInMethods.CEIL_DEC),
       Some(BuiltInMethods.UNIX_TIMESTAMP_CEIL)))
 
   addSqlFunction(
     CEIL,
     Seq(TIMESTAMP_WITH_LOCAL_TIME_ZONE, SYMBOL),
     new FloorCeilCallGen(
-      BuiltInMethod.CEIL.method,
+      BuiltInMethods.CEIL,
+      Some(BuiltInMethods.CEIL_INTEGRAL),
+      Some(BuiltInMethods.CEIL_DEC),
       Some(BuiltInMethods.TIMESTAMP_CEIL_TIME_ZONE)))
 
   addSqlFunction(
@@ -821,44 +834,44 @@ class FunctionGenerator private(config: TableConfig) {
     BuiltInMethods.JSON_QUERY)
 
   addSqlFunctionMethod(IS_JSON_VALUE, Seq(CHAR),
-    BuiltInMethod.IS_JSON_VALUE.method, argsNullable = true)
+    BuiltInMethods.IS_JSON_VALUE, argsNullable = true)
   addSqlFunctionMethod(IS_JSON_VALUE, Seq(VARCHAR),
-    BuiltInMethod.IS_JSON_VALUE.method, argsNullable = true)
+    BuiltInMethods.IS_JSON_VALUE, argsNullable = true)
 
   addSqlFunctionMethod(IS_JSON_OBJECT, Seq(CHAR),
-    BuiltInMethod.IS_JSON_OBJECT.method, argsNullable = true)
+    BuiltInMethods.IS_JSON_OBJECT, argsNullable = true)
   addSqlFunctionMethod(IS_JSON_OBJECT, Seq(VARCHAR),
-    BuiltInMethod.IS_JSON_OBJECT.method, argsNullable = true)
+    BuiltInMethods.IS_JSON_OBJECT, argsNullable = true)
 
   addSqlFunctionMethod(IS_JSON_ARRAY, Seq(CHAR),
-    BuiltInMethod.IS_JSON_ARRAY.method, argsNullable = true)
+    BuiltInMethods.IS_JSON_ARRAY, argsNullable = true)
   addSqlFunctionMethod(IS_JSON_ARRAY, Seq(VARCHAR),
-    BuiltInMethod.IS_JSON_ARRAY.method, argsNullable = true)
+    BuiltInMethods.IS_JSON_ARRAY, argsNullable = true)
 
   addSqlFunctionMethod(IS_JSON_SCALAR, Seq(CHAR),
-    BuiltInMethod.IS_JSON_SCALAR.method, argsNullable = true)
+    BuiltInMethods.IS_JSON_SCALAR, argsNullable = true)
   addSqlFunctionMethod(IS_JSON_SCALAR, Seq(VARCHAR),
-    BuiltInMethod.IS_JSON_SCALAR.method, argsNullable = true)
+    BuiltInMethods.IS_JSON_SCALAR, argsNullable = true)
 
   addSqlFunction(IS_NOT_JSON_VALUE, Seq(CHAR),
-    new NotCallGen(new MethodCallGen(BuiltInMethod.IS_JSON_VALUE.method, 
argsNullable = true)))
+    new NotCallGen(new MethodCallGen(BuiltInMethods.IS_JSON_VALUE, 
argsNullable = true)))
   addSqlFunction(IS_NOT_JSON_VALUE, Seq(VARCHAR),
-    new NotCallGen(new MethodCallGen(BuiltInMethod.IS_JSON_VALUE.method, 
argsNullable = true)))
+    new NotCallGen(new MethodCallGen(BuiltInMethods.IS_JSON_VALUE, 
argsNullable = true)))
 
   addSqlFunction(IS_NOT_JSON_OBJECT, Seq(CHAR),
-    new NotCallGen(new MethodCallGen(BuiltInMethod.IS_JSON_OBJECT.method, 
argsNullable = true)))
+    new NotCallGen(new MethodCallGen(BuiltInMethods.IS_JSON_OBJECT, 
argsNullable = true)))
   addSqlFunction(IS_NOT_JSON_OBJECT, Seq(VARCHAR),
-    new NotCallGen(new MethodCallGen(BuiltInMethod.IS_JSON_OBJECT.method, 
argsNullable = true)))
+    new NotCallGen(new MethodCallGen(BuiltInMethods.IS_JSON_OBJECT, 
argsNullable = true)))
 
   addSqlFunction(IS_NOT_JSON_ARRAY, Seq(CHAR),
-    new NotCallGen(new MethodCallGen(BuiltInMethod.IS_JSON_ARRAY.method, 
argsNullable = true)))
+    new NotCallGen(new MethodCallGen(BuiltInMethods.IS_JSON_ARRAY, 
argsNullable = true)))
   addSqlFunction(IS_NOT_JSON_ARRAY, Seq(VARCHAR),
-    new NotCallGen(new MethodCallGen(BuiltInMethod.IS_JSON_ARRAY.method, 
argsNullable = true)))
+    new NotCallGen(new MethodCallGen(BuiltInMethods.IS_JSON_ARRAY, 
argsNullable = true)))
 
   addSqlFunction(IS_NOT_JSON_SCALAR, Seq(CHAR),
-    new NotCallGen(new MethodCallGen(BuiltInMethod.IS_JSON_SCALAR.method, 
argsNullable = true)))
+    new NotCallGen(new MethodCallGen(BuiltInMethods.IS_JSON_SCALAR, 
argsNullable = true)))
   addSqlFunction(IS_NOT_JSON_SCALAR, Seq(VARCHAR),
-    new NotCallGen(new MethodCallGen(BuiltInMethod.IS_JSON_SCALAR.method, 
argsNullable = true)))
+    new NotCallGen(new MethodCallGen(BuiltInMethods.IS_JSON_SCALAR, 
argsNullable = true)))
 
 
   // 
----------------------------------------------------------------------------------------------
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/JsonValueCallGen.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/JsonValueCallGen.scala
index 0b54a5c..376b46b 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/JsonValueCallGen.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/JsonValueCallGen.scala
@@ -17,7 +17,9 @@
  */
 
 package org.apache.flink.table.planner.codegen.calls
+
 import org.apache.calcite.sql.{SqlJsonEmptyOrError, 
SqlJsonValueEmptyOrErrorBehavior}
+import org.apache.flink.table.api.JsonValueOnEmptyOrError
 import org.apache.flink.table.planner.codegen.CodeGenUtils.{BINARY_STRING, 
qualifyEnum, qualifyMethod}
 import 
org.apache.flink.table.planner.codegen.GenerateUtils.generateCallWithStmtIfArgsNotNull
 import org.apache.flink.table.planner.codegen.{CodeGenException, CodeGenUtils, 
CodeGeneratorContext, GeneratedExpression}
@@ -81,16 +83,16 @@ class JsonValueCallGen extends CallGenerator {
    */
   private def getBehavior(
       operands: Seq[GeneratedExpression],
-      mode: SqlJsonEmptyOrError): (SqlJsonValueEmptyOrErrorBehavior, String) = 
{
+      mode: SqlJsonEmptyOrError): (JsonValueOnEmptyOrError, String) = {
     operands.indexWhere(expr => expr.literalValue.contains(mode)) match {
-      case -1 => (SqlJsonValueEmptyOrErrorBehavior.NULL, null)
+      case -1 => (JsonValueOnEmptyOrError.NULL, null)
       case modeIndex => operands(modeIndex - 1).literalValue.get match {
         // Case for [NULL | ERROR] ON [EMPTY | ERROR]
-        case behavior: SqlJsonValueEmptyOrErrorBehavior => (behavior, null)
+        case behavior: SqlJsonValueEmptyOrErrorBehavior => 
(convertCalciteEnum(behavior), null)
         case _ => operands(modeIndex - 2).literalValue.get match {
           // Case for DEFAULT <expr> ON [EMPTY | ERROR]
           case behavior: SqlJsonValueEmptyOrErrorBehavior =>
-            (behavior, operands(modeIndex - 1).resultTerm)
+            (convertCalciteEnum(behavior), operands(modeIndex - 1).resultTerm)
           case _ =>
             throw new CodeGenException("Invalid combination of arguments for 
JSON_VALUE. "
               + "This is a bug. Please consider filing an issue.")
@@ -98,4 +100,11 @@ class JsonValueCallGen extends CallGenerator {
       }
     }
   }
+
+  private def convertCalciteEnum(
+      behavior: SqlJsonValueEmptyOrErrorBehavior): JsonValueOnEmptyOrError = 
behavior match {
+    case SqlJsonValueEmptyOrErrorBehavior.ERROR => 
JsonValueOnEmptyOrError.ERROR
+    case SqlJsonValueEmptyOrErrorBehavior.NULL => JsonValueOnEmptyOrError.NULL
+    case SqlJsonValueEmptyOrErrorBehavior.DEFAULT => 
JsonValueOnEmptyOrError.DEFAULT
+  }
 }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/LikeCallGen.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/LikeCallGen.scala
index 49b7dc2..cd8035d 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/LikeCallGen.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/LikeCallGen.scala
@@ -19,14 +19,12 @@
 package org.apache.flink.table.planner.codegen.calls
 
 import org.apache.flink.table.functions.SqlLikeUtils
-import org.apache.flink.table.planner.codegen.CodeGenUtils.{className, newName}
+import org.apache.flink.table.planner.codegen.CodeGenUtils.{className, 
newName, qualifyMethod}
 import 
org.apache.flink.table.planner.codegen.GenerateUtils.generateCallIfArgsNotNull
 import org.apache.flink.table.planner.codegen.{CodeGeneratorContext, 
GeneratedExpression}
 import org.apache.flink.table.runtime.functions.SqlLikeChainChecker
 import org.apache.flink.table.types.logical.{BooleanType, LogicalType}
 
-import org.apache.calcite.runtime.SqlFunctions
-
 import java.util.regex.Pattern
 
 /**
@@ -161,12 +159,11 @@ class LikeCallGen extends CallGenerator {
       terms =>
         val str1 = s"${terms.head}.toString()"
         val str2 = s"${terms(1)}.toString()"
-        val clsName = className[SqlFunctions]
         if (terms.length == 2) {
-          s"$clsName.like($str1, $str2)"
+          s"${qualifyMethod(BuiltInMethods.STRING_LIKE)}($str1, $str2)"
         } else {
           val str3 = s"${terms(2)}.toString()"
-          s"$clsName.like($str1, $str2, $str3)"
+          s"${qualifyMethod(BuiltInMethods.STRING_LIKE_WITH_ESCAPE)}($str1, 
$str2, $str3)"
         }
     }
   }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala
index 7619486..9de99fc 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala
@@ -29,7 +29,6 @@ import org.apache.flink.table.runtime.functions.SqlFunctionUtils
 import 
org.apache.flink.table.runtime.typeutils.TypeCheckUtils.{isCharacterString, 
isTimestamp, isTimestampWithLocalZone}
 import org.apache.flink.table.types.logical._
 
-import org.apache.calcite.runtime.SqlFunctions
 import org.apache.calcite.sql.SqlOperator
 import org.apache.calcite.sql.fun.SqlTrimFunction.Flag.{BOTH, LEADING, 
TRAILING}
 
@@ -341,9 +340,8 @@ object StringCallGen {
   def generateSimilarTo(
     ctx: CodeGeneratorContext,
     operands: Seq[GeneratedExpression]): GeneratedExpression = {
-    val className = classOf[SqlFunctions].getCanonicalName
     generateCallIfArgsNotNull(ctx, new BooleanType(), operands) {
-      terms => s"$className.similar(${toStringTerms(terms, operands)})"
+      terms => 
s"${qualifyMethod(BuiltInMethods.STRING_SIMILAR)}(${toStringTerms(terms, 
operands)})"
     }
   }
 
@@ -424,9 +422,8 @@ object StringCallGen {
     ctx: CodeGeneratorContext,
     operands: Seq[GeneratedExpression],
     returnType: LogicalType): GeneratedExpression = {
-    val className = classOf[SqlFunctions].getCanonicalName
     generateStringResultCallIfArgsNotNull(ctx, operands, returnType) {
-      terms => s"$className.initcap(${terms.head}.toString())"
+      terms => 
s"${qualifyMethod(BuiltInMethods.STRING_INITCAP)}(${terms.head}.toString())"
     }
   }
 
diff --git a/flink-table/flink-table-runtime/pom.xml b/flink-table/flink-table-runtime/pom.xml
index a6a6f9a..0c85dcc 100644
--- a/flink-table/flink-table-runtime/pom.xml
+++ b/flink-table/flink-table-runtime/pom.xml
@@ -84,6 +84,14 @@ under the License.
                        <version>${janino.version}</version>
                </dependency>
 
+               <!-- Jackson -->
+
+               <dependency>
+                       <groupId>com.jayway.jsonpath</groupId>
+                       <artifactId>json-path</artifactId>
+                       <version>${jsonpath.version}</version>
+               </dependency>
+
                <!-- test dependencies -->
 
                <dependency>
@@ -131,6 +139,36 @@ under the License.
                                        </execution>
                                </executions>
                        </plugin>
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-shade-plugin</artifactId>
+                               <executions>
+                                       <execution>
+                                               <id>shade-flink</id>
+                                               <phase>package</phase>
+                                               <goals>
+                                                       <goal>shade</goal>
+                                               </goals>
+                                               <configuration>
+                                                       <artifactSet>
+                                                               <includes 
combine.children="append">
+                                                                       
<include>com.jayway.jsonpath:json-path</include>
+                                                               </includes>
+                                                       </artifactSet>
+                                                       <relocations>
+                                                               <relocation>
+                                                                       
<pattern>com.jayway</pattern>
+                                                                       
<shadedPattern>org.apache.flink.calcite.shaded.com.jayway</shadedPattern>
+                                                               </relocation>
+                                                               <relocation>
+                                                                       
<pattern>com.fasterxml</pattern>
+                                                                       
<shadedPattern>org.apache.flink.shaded.jackson2.com.fasterxml</shadedPattern>
+                                                               </relocation>
+                                                       </relocations>
+                                               </configuration>
+                                       </execution>
+                               </executions>
+                       </plugin>
                </plugins>
        </build>
 </project>
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/SqlFunctionUtils.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/SqlFunctionUtils.java
index c9016eb..06628ba 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/SqlFunctionUtils.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/SqlFunctionUtils.java
@@ -47,6 +47,7 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import static org.apache.flink.table.data.DecimalDataUtils.castFrom;
+import static org.apache.flink.table.data.DecimalDataUtils.castToIntegral;
 import static org.apache.flink.table.data.DecimalDataUtils.doubleValue;
 
 /**
@@ -140,6 +141,11 @@ public class SqlFunctionUtils {
         return Math.tanh(doubleValue(a));
     }
 
+    /** SQL <code>COT</code> operator applied to double values. */
+    public static double cot(double b0) {
+        return 1.0d / Math.tan(b0);
+    }
+
     public static double cot(DecimalData a) {
         return 1.0d / Math.tan(doubleValue(a));
     }
@@ -152,14 +158,92 @@ public class SqlFunctionUtils {
         return Math.toRadians(doubleValue(angdeg));
     }
 
+    /** SQL <code>ABS</code> operator applied to byte values. */
+    public static byte abs(byte b0) {
+        return (byte) Math.abs(b0);
+    }
+
+    /** SQL <code>ABS</code> operator applied to short values. */
+    public static short abs(short b0) {
+        return (short) Math.abs(b0);
+    }
+
+    /** SQL <code>ABS</code> operator applied to int values. */
+    public static int abs(int b0) {
+        return Math.abs(b0);
+    }
+
+    /** SQL <code>ABS</code> operator applied to long values. */
+    public static long abs(long b0) {
+        return Math.abs(b0);
+    }
+
+    /** SQL <code>ABS</code> operator applied to float values. */
+    public static float abs(float b0) {
+        return Math.abs(b0);
+    }
+
+    /** SQL <code>ABS</code> operator applied to double values. */
+    public static double abs(double b0) {
+        return Math.abs(b0);
+    }
+
     public static DecimalData abs(DecimalData a) {
         return DecimalDataUtils.abs(a);
     }
 
+    public static double floor(double b0) {
+        return Math.floor(b0);
+    }
+
+    public static float floor(float b0) {
+        return (float) Math.floor(b0);
+    }
+
+    /** SQL <code>FLOOR</code> operator applied to int values. */
+    public static int floor(int b0, int b1) {
+        int r = b0 % b1;
+        if (r < 0) {
+            r += b1;
+        }
+        return b0 - r;
+    }
+
+    /** SQL <code>FLOOR</code> operator applied to long values. */
+    public static long floor(long b0, long b1) {
+        long r = b0 % b1;
+        if (r < 0) {
+            r += b1;
+        }
+        return b0 - r;
+    }
+
     public static DecimalData floor(DecimalData a) {
         return DecimalDataUtils.floor(a);
     }
 
+    public static double ceil(double b0) {
+        return Math.ceil(b0);
+    }
+
+    public static float ceil(float b0) {
+        return (float) Math.ceil(b0);
+    }
+
+    /** SQL <code>CEIL</code> operator applied to int values. */
+    public static int ceil(int b0, int b1) {
+        int r = b0 % b1;
+        if (r > 0) {
+            r -= b1;
+        }
+        return b0 - r;
+    }
+
+    /** SQL <code>CEIL</code> operator applied to long values; avoids intermediate overflow. */
+    public static long ceil(long b0, long b1) {
+        return b0 - b0 % b1 + (b0 % b1 > 0 ? b1 : 0);
+    }
+
     public static DecimalData ceil(DecimalData a) {
         return DecimalDataUtils.ceil(a);
     }
@@ -1103,6 +1187,24 @@ public class SqlFunctionUtils {
         return UUID.nameUUIDFromBytes(b).toString();
     }
 
+    /** SQL <code>TRUNCATE</code> operator applied to int values. */
+    public static int struncate(int b0) {
+        return struncate(b0, 0);
+    }
+
+    public static int struncate(int b0, int b1) {
+        return (int) struncate((long) b0, b1);
+    }
+
+    /** SQL <code>TRUNCATE</code> operator applied to long values. */
+    public static long struncate(long b0) {
+        return struncate(b0, 0);
+    }
+
+    public static long struncate(long b0, int b1) {
+        return castToIntegral(struncate(castFrom(b0, 38, 18), b1));
+    }
+
     /** SQL <code>TRUNCATE</code> operator applied to BigDecimal values. */
     public static DecimalData struncate(DecimalData b0) {
         return struncate(b0, 0);
@@ -1137,6 +1239,15 @@ public class SqlFunctionUtils {
         return (float) doubleValue(struncate(castFrom((double) b0, 38, 18), 
b1));
     }
 
+    /** SQL <code>TRUNCATE</code> operator applied to double values. */
+    public static double struncate(double b0) {
+        return struncate(b0, 0);
+    }
+
+    public static double struncate(double b0, int b1) {
+        return doubleValue(struncate(castFrom(b0, 38, 18), b1));
+    }
+
     /**
      * Compares two byte arrays in lexicographical order.
      *
@@ -1160,4 +1271,41 @@ public class SqlFunctionUtils {
         }
         return array1.length - array2.length;
     }
+
+    /** SQL INITCAP(string) function. */
+    public static String initcap(String s) {
+        // Assumes Alpha as [A-Za-z0-9]
+        // white space is treated as everything else.
+        final int len = s.length();
+        boolean start = true;
+        final StringBuilder newS = new StringBuilder();
+
+        for (int i = 0; i < len; i++) {
+            char curCh = s.charAt(i);
+            final int c = (int) curCh;
+            if (start) { // curCh is whitespace or first character of word.
+                if (c > 47 && c < 58) { // 0-9
+                    start = false;
+                } else if (c > 64 && c < 91) { // A-Z
+                    start = false;
+                } else if (c > 96 && c < 123) { // a-z
+                    start = false;
+                    curCh = (char) (c - 32); // Uppercase this character
+                }
+                // else {} whitespace
+            } else { // Inside of a word or white space after end of word.
+                if (c > 47 && c < 58) { // 0-9
+                    // noop
+                } else if (c > 64 && c < 91) { // A-Z
+                    curCh = (char) (c + 32); // Lowercase this character
+                } else if (c > 96 && c < 123) { // a-z
+                    // noop
+                } else { // whitespace
+                    start = true;
+                }
+            }
+            newS.append(curCh);
+        } // for each character in s
+        return newS.toString();
+    }
 }
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/SqlJsonUtils.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/SqlJsonUtils.java
index 40641ee..7602a19 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/SqlJsonUtils.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/SqlJsonUtils.java
@@ -19,8 +19,14 @@
 package org.apache.flink.table.runtime.functions;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.JsonExistsOnError;
+import org.apache.flink.table.api.JsonQueryOnEmptyOrError;
+import org.apache.flink.table.api.JsonQueryWrapper;
+import org.apache.flink.table.api.JsonValueOnEmptyOrError;
 import org.apache.flink.table.api.TableException;
+import org.apache.flink.util.FlinkRuntimeException;
 
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonValue;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonFactory;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
@@ -30,6 +36,23 @@ import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.Arra
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.JsonNodeFactory;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
 
+import com.jayway.jsonpath.Configuration;
+import com.jayway.jsonpath.DocumentContext;
+import com.jayway.jsonpath.InvalidPathException;
+import com.jayway.jsonpath.JsonPath;
+import com.jayway.jsonpath.Option;
+import com.jayway.jsonpath.spi.json.JacksonJsonProvider;
+import com.jayway.jsonpath.spi.mapper.JacksonMappingProvider;
+import com.jayway.jsonpath.spi.mapper.MappingProvider;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
 /**
  * Utilities for JSON functions.
  *
@@ -42,6 +65,14 @@ public class SqlJsonUtils {
     private static final ObjectMapper MAPPER =
             new ObjectMapper(JSON_FACTORY)
                     .configure(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS, 
true);
+    private static final Pattern JSON_PATH_BASE =
+            Pattern.compile(
+                    "^\\s*(?<mode>strict|lax)\\s+(?<spec>.+)$",
+                    Pattern.CASE_INSENSITIVE | Pattern.DOTALL | 
Pattern.MULTILINE);
+    private static final JacksonJsonProvider JSON_PATH_JSON_PROVIDER = new 
JacksonJsonProvider();
+    private static final MappingProvider JSON_PATH_MAPPING_PROVIDER = new 
JacksonMappingProvider();
+
+    private SqlJsonUtils() {}
 
     /** Returns the {@link JsonNodeFactory} for creating nodes. */
     public static JsonNodeFactory getNodeFactory() {
@@ -71,5 +102,489 @@ public class SqlJsonUtils {
         }
     }
 
-    private SqlJsonUtils() {}
+    public static Boolean jsonExists(String input, String pathSpec) {
+        return jsonExists(jsonApiCommonSyntax(input, pathSpec), 
JsonExistsOnError.FALSE);
+    }
+
+    public static Boolean jsonExists(
+            String input, String pathSpec, JsonExistsOnError errorBehavior) {
+        return jsonExists(jsonApiCommonSyntax(input, pathSpec), errorBehavior);
+    }
+
+    private static Boolean jsonExists(JsonPathContext context, 
JsonExistsOnError errorBehavior) {
+        if (context.hasException()) {
+            switch (errorBehavior) {
+                case TRUE:
+                    return Boolean.TRUE;
+                case FALSE:
+                    return Boolean.FALSE;
+                case ERROR:
+                    throw toUnchecked(context.exc);
+                case UNKNOWN:
+                    return null;
+                default:
+                    throw 
illegalErrorBehaviorInJsonExistsFunc(errorBehavior.toString());
+            }
+        } else {
+            return context.obj != null;
+        }
+    }
+
+    public static Object jsonValue(
+            String input,
+            String pathSpec,
+            JsonValueOnEmptyOrError emptyBehavior,
+            Object defaultValueOnEmpty,
+            JsonValueOnEmptyOrError errorBehavior,
+            Object defaultValueOnError) {
+        return jsonValue(
+                jsonApiCommonSyntax(input, pathSpec),
+                emptyBehavior,
+                defaultValueOnEmpty,
+                errorBehavior,
+                defaultValueOnError);
+    }
+
+    private static Object jsonValue(
+            JsonPathContext context,
+            JsonValueOnEmptyOrError emptyBehavior,
+            Object defaultValueOnEmpty,
+            JsonValueOnEmptyOrError errorBehavior,
+            Object defaultValueOnError) {
+        final Exception exc;
+        if (context.hasException()) {
+            exc = context.exc;
+        } else {
+            Object value = context.obj;
+            if (value == null || context.mode == PathMode.LAX && 
!isScalarObject(value)) {
+                switch (emptyBehavior) {
+                    case ERROR:
+                        throw emptyResultOfJsonValueFuncNotAllowed();
+                    case NULL:
+                        return null;
+                    case DEFAULT:
+                        return defaultValueOnEmpty;
+                    default:
+                        throw 
illegalEmptyBehaviorInJsonValueFunc(emptyBehavior.toString());
+                }
+            } else if (context.mode == PathMode.STRICT && 
!isScalarObject(value)) {
+                exc = 
scalarValueRequiredInStrictModeOfJsonValueFunc(value.toString());
+            } else {
+                return value;
+            }
+        }
+        switch (errorBehavior) {
+            case ERROR:
+                throw toUnchecked(exc);
+            case NULL:
+                return null;
+            case DEFAULT:
+                return defaultValueOnError;
+            default:
+                throw 
illegalErrorBehaviorInJsonValueFunc(errorBehavior.toString());
+        }
+    }
+
+    public static String jsonQuery(
+            String input,
+            String pathSpec,
+            JsonQueryWrapper wrapperBehavior,
+            JsonQueryOnEmptyOrError emptyBehavior,
+            JsonQueryOnEmptyOrError errorBehavior) {
+        return jsonQuery(
+                jsonApiCommonSyntax(input, pathSpec),
+                wrapperBehavior,
+                emptyBehavior,
+                errorBehavior);
+    }
+
+    private static String jsonQuery(
+            JsonPathContext context,
+            JsonQueryWrapper wrapperBehavior,
+            JsonQueryOnEmptyOrError emptyBehavior,
+            JsonQueryOnEmptyOrError errorBehavior) {
+        final Exception exc;
+        if (context.hasException()) {
+            exc = context.exc;
+        } else {
+            Object value;
+            if (context.obj == null) {
+                value = null;
+            } else {
+                switch (wrapperBehavior) {
+                    case WITHOUT_ARRAY:
+                        value = context.obj;
+                        break;
+                    case UNCONDITIONAL_ARRAY:
+                        value = Collections.singletonList(context.obj);
+                        break;
+                    case CONDITIONAL_ARRAY:
+                        if (context.obj instanceof Collection) {
+                            value = context.obj;
+                        } else {
+                            value = Collections.singletonList(context.obj);
+                        }
+                        break;
+                    default:
+                        throw 
illegalWrapperBehaviorInJsonQueryFunc(wrapperBehavior.toString());
+                }
+            }
+            if (value == null || context.mode == PathMode.LAX && 
isScalarObject(value)) {
+                switch (emptyBehavior) {
+                    case ERROR:
+                        throw emptyResultOfJsonQueryFuncNotAllowed();
+                    case NULL:
+                        return null;
+                    case EMPTY_ARRAY:
+                        return "[]";
+                    case EMPTY_OBJECT:
+                        return "{}";
+                    default:
+                        throw 
illegalEmptyBehaviorInJsonQueryFunc(emptyBehavior.toString());
+                }
+            } else if (context.mode == PathMode.STRICT && 
isScalarObject(value)) {
+                exc = 
arrayOrObjectValueRequiredInStrictModeOfJsonQueryFunc(value.toString());
+            } else {
+                try {
+                    return jsonize(value);
+                } catch (Exception e) {
+                    exc = e;
+                }
+            }
+        }
+        switch (errorBehavior) {
+            case ERROR:
+                throw toUnchecked(exc);
+            case NULL:
+                return null;
+            case EMPTY_ARRAY:
+                return "[]";
+            case EMPTY_OBJECT:
+                return "{}";
+            default:
+                throw 
illegalErrorBehaviorInJsonQueryFunc(errorBehavior.toString());
+        }
+    }
+
+    public static boolean isJsonValue(String input) {
+        try {
+            dejsonize(input);
+            return true;
+        } catch (Exception e) {
+            return false;
+        }
+    }
+
+    public static boolean isJsonObject(String input) {
+        try {
+            Object o = dejsonize(input);
+            return o instanceof Map;
+        } catch (Exception e) {
+            return false;
+        }
+    }
+
+    public static boolean isJsonArray(String input) {
+        try {
+            Object o = dejsonize(input);
+            return o instanceof Collection;
+        } catch (Exception e) {
+            return false;
+        }
+    }
+
+    public static boolean isJsonScalar(String input) {
+        try {
+            Object o = dejsonize(input);
+            return !(o instanceof Map) && !(o instanceof Collection);
+        } catch (Exception e) {
+            return false;
+        }
+    }
+
+    private static boolean isScalarObject(Object obj) {
+        if (obj instanceof Collection) {
+            return false;
+        }
+        if (obj instanceof Map) {
+            return false;
+        }
+        return true;
+    }
+
+    private static String jsonize(Object input) {
+        return JSON_PATH_JSON_PROVIDER.toJson(input);
+    }
+
+    private static Object dejsonize(String input) {
+        return JSON_PATH_JSON_PROVIDER.parse(input);
+    }
+
+    private static JsonValueContext jsonValueExpression(String input) {
+        try {
+            return JsonValueContext.withJavaObj(dejsonize(input));
+        } catch (Exception e) {
+            return JsonValueContext.withException(e);
+        }
+    }
+
+    private static JsonPathContext jsonApiCommonSyntax(String input, String 
pathSpec) {
+        return jsonApiCommonSyntax(jsonValueExpression(input), pathSpec);
+    }
+
+    private static JsonPathContext jsonApiCommonSyntax(JsonValueContext input, 
String pathSpec) {
+        PathMode mode;
+        String pathStr;
+        try {
+            Matcher matcher = JSON_PATH_BASE.matcher(pathSpec);
+            if (!matcher.matches()) {
+                mode = PathMode.STRICT;
+                pathStr = pathSpec;
+            } else {
+                mode = 
PathMode.valueOf(matcher.group(1).toUpperCase(Locale.ROOT));
+                pathStr = matcher.group(2);
+            }
+            DocumentContext ctx;
+            switch (mode) {
+                case STRICT:
+                    if (input.hasException()) {
+                        return JsonPathContext.withStrictException(pathSpec, 
input.exc);
+                    }
+                    ctx =
+                            JsonPath.parse(
+                                    input.obj,
+                                    Configuration.builder()
+                                            
.jsonProvider(JSON_PATH_JSON_PROVIDER)
+                                            
.mappingProvider(JSON_PATH_MAPPING_PROVIDER)
+                                            .build());
+                    break;
+                case LAX:
+                    if (input.hasException()) {
+                        return JsonPathContext.withJavaObj(PathMode.LAX, null);
+                    }
+                    ctx =
+                            JsonPath.parse(
+                                    input.obj,
+                                    Configuration.builder()
+                                            
.options(Option.SUPPRESS_EXCEPTIONS)
+                                            
.jsonProvider(JSON_PATH_JSON_PROVIDER)
+                                            
.mappingProvider(JSON_PATH_MAPPING_PROVIDER)
+                                            .build());
+                    break;
+                default:
+                    throw illegalJsonPathModeInPathSpec(mode.toString(), 
pathSpec);
+            }
+            try {
+                return JsonPathContext.withJavaObj(mode, ctx.read(pathStr));
+            } catch (Exception e) {
+                return JsonPathContext.withStrictException(pathSpec, e);
+            }
+        } catch (Exception e) {
+            return JsonPathContext.withUnknownException(e);
+        }
+    }
+
+    private static RuntimeException toUnchecked(Exception e) {
+        if (e instanceof RuntimeException) {
+            return (RuntimeException) e;
+        }
+        return new RuntimeException(e);
+    }
+
+    private static RuntimeException illegalJsonPathModeInPathSpec(
+            String pathMode, String pathSpec) {
+        return new FlinkRuntimeException(
+                String.format(
+                        "Illegal jsonpath mode ''%s'' in jsonpath spec: 
''%s''",
+                        pathMode, pathSpec));
+    }
+
+    private static RuntimeException illegalJsonPathMode(String pathMode) {
+        return new FlinkRuntimeException(String.format("Illegal jsonpath mode 
''%s''", pathMode));
+    }
+
+    private static RuntimeException illegalJsonPathSpec(String pathSpec) {
+        return new FlinkRuntimeException(
+                String.format(
+                        "Illegal jsonpath spec ''%s'', format of the spec 
should be: ''<lax|strict> ${expr}''",
+                        pathSpec));
+    }
+
+    private static RuntimeException strictPathModeRequiresNonEmptyValue() {
+        return new FlinkRuntimeException(
+                "Strict jsonpath mode requires a non empty returned value, but 
is null");
+    }
+
+    private static RuntimeException 
illegalErrorBehaviorInJsonExistsFunc(String errorBehavior) {
+        return new FlinkRuntimeException(
+                String.format(
+                        "Illegal error behavior ''{0}'' specified in 
JSON_EXISTS function",
+                        errorBehavior));
+    }
+
+    private static RuntimeException emptyResultOfJsonValueFuncNotAllowed() {
+        return new FlinkRuntimeException("Empty result of JSON_VALUE function 
is not allowed");
+    }
+
+    private static RuntimeException illegalEmptyBehaviorInJsonValueFunc(String 
emptyBehavior) {
+        return new FlinkRuntimeException(
+                String.format(
+                        "Illegal empty behavior ''%s'' specified in 
JSON_VALUE function",
+                        emptyBehavior));
+    }
+
+    private static RuntimeException illegalErrorBehaviorInJsonValueFunc(String 
errorBehavior) {
+        return new FlinkRuntimeException(
+                String.format(
+                        "Illegal error behavior ''%s'' specified in JSON_VALUE 
function",
+                        errorBehavior));
+    }
+
+    private static RuntimeException 
scalarValueRequiredInStrictModeOfJsonValueFunc(String value) {
+        return new FlinkRuntimeException(
+                String.format(
+                        "Strict jsonpath mode requires scalar value, and the 
actual value is: ''%s''",
+                        value));
+    }
+
+    private static RuntimeException 
illegalWrapperBehaviorInJsonQueryFunc(String wrapperBehavior) {
+        return new FlinkRuntimeException(
+                String.format(
+                        "Illegal wrapper behavior ''%s'' specified in 
JSON_QUERY function",
+                        wrapperBehavior));
+    }
+
+    private static RuntimeException emptyResultOfJsonQueryFuncNotAllowed() {
+        return new FlinkRuntimeException("Empty result of JSON_QUERY function 
is not allowed");
+    }
+
+    private static RuntimeException illegalEmptyBehaviorInJsonQueryFunc(String 
emptyBehavior) {
+        return new FlinkRuntimeException(
+                String.format(
+                        "Illegal empty behavior ''%s'' specified in JSON_QUERY 
function",
+                        emptyBehavior));
+    }
+
+    private static RuntimeException 
arrayOrObjectValueRequiredInStrictModeOfJsonQueryFunc(
+            String value) {
+        return new FlinkRuntimeException(
+                String.format(
+                        "Strict jsonpath mode requires array or object value, 
and the actual value is: ''%s''",
+                        value));
+    }
+
+    private static RuntimeException illegalErrorBehaviorInJsonQueryFunc(String 
errorBehavior) {
+        return new FlinkRuntimeException(
+                String.format(
+                        "Illegal error behavior ''%s'' specified in JSON_VALUE 
function",
+                        errorBehavior));
+    }
+
+    /**
+     * Path spec has two different modes: lax mode and strict mode. Lax mode 
suppresses any thrown
+     * exception and returns null, whereas strict mode throws exceptions.
+     */
+    public enum PathMode {
+        LAX,
+        STRICT,
+        UNKNOWN,
+        NONE
+    }
+
+    /** Returned path context of JsonApiCommonSyntax. */
+    private static class JsonPathContext {
+        public final PathMode mode;
+        public final Object obj;
+        public final Exception exc;
+
+        private JsonPathContext(Object obj, Exception exc) {
+            this(PathMode.NONE, obj, exc);
+        }
+
+        private JsonPathContext(PathMode mode, Object obj, Exception exc) {
+            assert obj == null || exc == null;
+            this.mode = mode;
+            this.obj = obj;
+            this.exc = exc;
+        }
+
+        public boolean hasException() {
+            return exc != null;
+        }
+
+        public static JsonPathContext withUnknownException(Exception exc) {
+            return new JsonPathContext(PathMode.UNKNOWN, null, exc);
+        }
+
+        public static JsonPathContext withStrictException(Exception exc) {
+            return new JsonPathContext(PathMode.STRICT, null, exc);
+        }
+
+        public static JsonPathContext withStrictException(String pathSpec, 
Exception exc) {
+            if (exc.getClass() == InvalidPathException.class) {
+                exc = illegalJsonPathSpec(pathSpec);
+            }
+            return withStrictException(exc);
+        }
+
+        public static JsonPathContext withJavaObj(PathMode mode, Object obj) {
+            if (mode == PathMode.UNKNOWN) {
+                throw illegalJsonPathMode(mode.toString());
+            }
+            if (mode == PathMode.STRICT && obj == null) {
+                throw strictPathModeRequiresNonEmptyValue();
+            }
+            return new JsonPathContext(mode, obj, null);
+        }
+
+        @Override
+        public String toString() {
+            return "JsonPathContext{" + "mode=" + mode + ", obj=" + obj + ", 
exc=" + exc + '}';
+        }
+    }
+
+    private static class JsonValueContext {
+        @JsonValue public final Object obj;
+        public final Exception exc;
+
+        private JsonValueContext(Object obj, Exception exc) {
+            assert obj == null || exc == null;
+            this.obj = obj;
+            this.exc = exc;
+        }
+
+        public static JsonValueContext withJavaObj(Object obj) {
+            return new JsonValueContext(obj, null);
+        }
+
+        public static JsonValueContext withException(Exception exc) {
+            return new JsonValueContext(null, exc);
+        }
+
+        public boolean hasException() {
+            return exc != null;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            JsonValueContext jsonValueContext = (JsonValueContext) o;
+            return Objects.equals(obj, jsonValueContext.obj);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(obj);
+        }
+
+        @Override
+        public String toString() {
+            return Objects.toString(obj);
+        }
+    }
 }
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedWatermarkGeneratorSupplier.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedWatermarkGeneratorSupplier.java
new file mode 100644
index 0000000..583b3c2
--- /dev/null
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedWatermarkGeneratorSupplier.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.generated;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
+import org.apache.flink.api.common.eventtime.WatermarkOutput;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.data.RowData;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Wrapper of the {@link GeneratedWatermarkGenerator} that is used to create {@link
+ * org.apache.flink.api.common.eventtime.WatermarkGenerator}. The {@link
+ * GeneratedWatermarkGeneratorSupplier} uses the {@link Context} to init the generated watermark
+ * generator.
+ */
+@Internal
+public class GeneratedWatermarkGeneratorSupplier implements WatermarkGeneratorSupplier<RowData> {
+    private static final long serialVersionUID = 1L;
+
+    private final Configuration configuration;
+    private final GeneratedWatermarkGenerator generatedWatermarkGenerator;
+
+    /**
+     * @param configuration configuration passed both to the re-created {@link
+     *     GeneratedWatermarkGenerator} and to {@code open} of the instantiated generator.
+     * @param generatedWatermarkGenerator source of the class name, code and references used to
+     *     instantiate the generator.
+     */
+    public GeneratedWatermarkGeneratorSupplier(
+            Configuration configuration, GeneratedWatermarkGenerator generatedWatermarkGenerator) {
+        this.configuration = configuration;
+        this.generatedWatermarkGenerator = generatedWatermarkGenerator;
+    }
+
+    /**
+     * Instantiates and opens the generated watermark generator and wraps it in a {@link
+     * DefaultWatermarkGenerator}.
+     *
+     * @param context supplier context; appended as the last entry of the references handed to the
+     *     generated class.
+     * @return a generator that delegates to the generated code.
+     * @throws RuntimeException if {@code open} of the generated generator throws.
+     */
+    @Override
+    public org.apache.flink.api.common.eventtime.WatermarkGenerator<RowData>
+            createWatermarkGenerator(Context context) {
+
+        // Copy the original references into a growable list so the context can be appended
+        // without touching the array owned by the original GeneratedWatermarkGenerator.
+        List<Object> references =
+                new ArrayList<>(Arrays.asList(generatedWatermarkGenerator.getReferences()));
+        references.add(context);
+
+        WatermarkGenerator innerWatermarkGenerator =
+                new GeneratedWatermarkGenerator(
+                                generatedWatermarkGenerator.getClassName(),
+                                generatedWatermarkGenerator.getCode(),
+                                references.toArray(),
+                                configuration)
+                        .newInstance(Thread.currentThread().getContextClassLoader());
+
+        try {
+            innerWatermarkGenerator.open(configuration);
+        } catch (Exception e) {
+            throw new RuntimeException("Fail to instantiate generated watermark generator.", e);
+        }
+        return new DefaultWatermarkGenerator(innerWatermarkGenerator);
+    }
+
+    /** Wrapper of the code-generated {@link WatermarkGenerator}. */
+    public static class DefaultWatermarkGenerator
+            implements org.apache.flink.api.common.eventtime.WatermarkGenerator<RowData> {
+        private static final long serialVersionUID = 1L;
+
+        private final WatermarkGenerator innerWatermarkGenerator;
+
+        // Primitive on purpose: the value is never null, so a boxed Long would only add
+        // boxing on every event and unboxing on every periodic emit.
+        private long currentWatermark = Long.MIN_VALUE;
+
+        /** @param watermarkGenerator the generated generator to delegate to. */
+        public DefaultWatermarkGenerator(WatermarkGenerator watermarkGenerator) {
+            this.innerWatermarkGenerator = watermarkGenerator;
+        }
+
+        /**
+         * Asks the generated generator for the watermark of {@code event} and remembers it when it
+         * is non-null. Nothing is emitted here; {@code eventTimestamp} and {@code output} are not
+         * used.
+         *
+         * @throws RuntimeException wrapping any exception thrown by the generated generator.
+         */
+        @Override
+        public void onEvent(RowData event, long eventTimestamp, WatermarkOutput output) {
+            try {
+                Long watermark = innerWatermarkGenerator.currentWatermark(event);
+                // A null result leaves the previously remembered watermark untouched.
+                if (watermark != null) {
+                    currentWatermark = watermark;
+                }
+            } catch (Exception e) {
+                throw new RuntimeException(
+                        String.format(
+                                "Generated WatermarkGenerator fails to generate for row: %s.",
+                                event),
+                        e);
+            }
+        }
+
+        /** Emits the most recently remembered watermark ({@code Long.MIN_VALUE} initially). */
+        @Override
+        public void onPeriodicEmit(WatermarkOutput output) {
+            output.emitWatermark(new Watermark(currentWatermark));
+        }
+    }
+}
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/interval/FilterAllFlatMapFunction.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/interval/FilterAllFlatMapFunction.java
new file mode 100644
index 0000000..a379b2b
--- /dev/null
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/interval/FilterAllFlatMapFunction.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.join.interval;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.util.Collector;
+
+/**
+ * Function filtering out all the input records.
+ *
+ * <p>{@link #flatMap} has an empty body, so no record is ever handed to the collector. The type
+ * information supplied at construction is reported through {@link #getProducedType()}.
+ */
+@Internal
+public class FilterAllFlatMapFunction
+        implements FlatMapFunction<RowData, RowData>, 
ResultTypeQueryable<RowData> {
+    private static final long serialVersionUID = 1L;
+
+    private final InternalTypeInfo<RowData> outputTypeInfo;
+
+    /** @param inputTypeInfo type information returned as-is by {@link #getProducedType()}. */
+    public FilterAllFlatMapFunction(InternalTypeInfo<RowData> inputTypeInfo) {
+        this.outputTypeInfo = inputTypeInfo;
+    }
+
+    /** Intentionally empty: every input record is dropped. */
+    @Override
+    public void flatMap(RowData value, Collector<RowData> out) {}
+
+    /** Returns the type information passed to the constructor. */
+    @Override
+    public TypeInformation<RowData> getProducedType() {
+        return outputTypeInfo;
+    }
+}
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/interval/PaddingLeftMapFunction.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/interval/PaddingLeftMapFunction.java
new file mode 100644
index 0000000..ecd84cb
--- /dev/null
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/interval/PaddingLeftMapFunction.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.join.interval;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.operators.join.OuterJoinPaddingUtil;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+
+/**
+ * Function performing left padding.
+ *
+ * <p>Each input row is passed to {@link OuterJoinPaddingUtil#padLeft} and the result is returned
+ * unchanged.
+ */
+@Internal
+public class PaddingLeftMapFunction
+        implements MapFunction<RowData, RowData>, ResultTypeQueryable<RowData> 
{
+    private static final long serialVersionUID = 1L;
+
+    private final OuterJoinPaddingUtil paddingUtil;
+    private final InternalTypeInfo<RowData> outputTypeInfo;
+
+    /**
+     * @param paddingUtil utility whose {@code padLeft} is applied to every input row.
+     * @param returnType type information reported by {@link #getProducedType()}.
+     */
+    public PaddingLeftMapFunction(
+            OuterJoinPaddingUtil paddingUtil, InternalTypeInfo<RowData> 
returnType) {
+        this.paddingUtil = paddingUtil;
+        this.outputTypeInfo = returnType;
+    }
+
+    /** Returns {@code paddingUtil.padLeft(value)}. */
+    @Override
+    public RowData map(RowData value) {
+        return paddingUtil.padLeft(value);
+    }
+
+    /** Returns the type information passed to the constructor. */
+    @Override
+    public TypeInformation<RowData> getProducedType() {
+        return outputTypeInfo;
+    }
+}
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/interval/PaddingRightMapFunction.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/interval/PaddingRightMapFunction.java
new file mode 100644
index 0000000..5cb02dd
--- /dev/null
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/interval/PaddingRightMapFunction.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.join.interval;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.operators.join.OuterJoinPaddingUtil;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+
+/**
+ * Function performing right padding.
+ *
+ * <p>Each input row is passed to {@link OuterJoinPaddingUtil#padRight} and the result is returned
+ * unchanged.
+ */
+@Internal
+public class PaddingRightMapFunction
+        implements MapFunction<RowData, RowData>, ResultTypeQueryable<RowData> 
{
+    private static final long serialVersionUID = 1L;
+
+    private final OuterJoinPaddingUtil paddingUtil;
+    private final InternalTypeInfo<RowData> outputTypeInfo;
+
+    /**
+     * @param paddingUtil utility whose {@code padRight} is applied to every input row.
+     * @param returnType type information reported by {@link #getProducedType()}.
+     */
+    public PaddingRightMapFunction(
+            OuterJoinPaddingUtil paddingUtil, InternalTypeInfo<RowData> 
returnType) {
+        this.paddingUtil = paddingUtil;
+        this.outputTypeInfo = returnType;
+    }
+
+    /** Returns {@code paddingUtil.padRight(value)}. */
+    @Override
+    public RowData map(RowData value) {
+        return paddingUtil.padRight(value);
+    }
+
+    /** Returns the type information passed to the constructor. */
+    @Override
+    public TypeInformation<RowData> getProducedType() {
+        return outputTypeInfo;
+    }
+}
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/PeriodicWatermarkAssignerWrapper.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/PeriodicWatermarkAssignerWrapper.java
new file mode 100644
index 0000000..3ed9556
--- /dev/null
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/PeriodicWatermarkAssignerWrapper.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.wmassigners;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.sources.wmstrategies.PeriodicWatermarkAssigner;
+
+import javax.annotation.Nullable;
+
+/**
+ * Generates periodic watermarks based on a {@link PeriodicWatermarkAssigner}.
+ *
+ * <p>The rowtime of every row is forwarded to the wrapped assigner in {@link #extractTimestamp},
+ * and {@link #getCurrentWatermark()} returns whatever the assigner currently reports.
+ */
+@Internal
+public class PeriodicWatermarkAssignerWrapper implements AssignerWithPeriodicWatermarks<RowData> {
+    private static final long serialVersionUID = 1L;
+    private final PeriodicWatermarkAssigner assigner;
+    private final int timeFieldIdx;
+
+    /**
+     * @param assigner the watermark assigner.
+     * @param timeFieldIdx the index of the rowtime attribute.
+     */
+    public PeriodicWatermarkAssignerWrapper(PeriodicWatermarkAssigner assigner, int timeFieldIdx) {
+        this.timeFieldIdx = timeFieldIdx;
+        this.assigner = assigner;
+    }
+
+    /** Returns the watermark currently reported by the wrapped assigner. */
+    @Nullable
+    @Override
+    public Watermark getCurrentWatermark() {
+        return assigner.getWatermark();
+    }
+
+    /**
+     * Reads the rowtime field (precision 3) of {@code row} in milliseconds and feeds it to the
+     * wrapped assigner. The returned timestamp is always {@code 0}; {@code recordTimestamp} is
+     * not used.
+     */
+    @Override
+    public long extractTimestamp(RowData row, long recordTimestamp) {
+        assigner.nextTimestamp(row.getTimestamp(timeFieldIdx, 3).getMillisecond());
+        return 0;
+    }
+}
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/PunctuatedWatermarkAssignerWrapper.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/PunctuatedWatermarkAssignerWrapper.java
new file mode 100644
index 0000000..a3f99e2
--- /dev/null
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/PunctuatedWatermarkAssignerWrapper.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.wmassigners;
+
+import org.apache.flink.annotation.Internal;
+import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.util.DataFormatConverters;
+import org.apache.flink.table.sources.wmstrategies.PunctuatedWatermarkAssigner;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.FieldsDataType;
+import org.apache.flink.types.Row;
+
+import javax.annotation.Nullable;
+
+/**
+ * Generates punctuated watermarks based on a {@link PunctuatedWatermarkAssigner}.
+ *
+ * <p>For every row, the rowtime is read as a {@code long}, the row is converted to an external
+ * {@link Row}, and both are handed to the wrapped assigner, whose result is returned.
+ */
+@Internal
+public class PunctuatedWatermarkAssignerWrapper
+        implements AssignerWithPunctuatedWatermarks<RowData> {
+    private static final long serialVersionUID = 1L;
+    private final PunctuatedWatermarkAssigner assigner;
+    private final int timeFieldIdx;
+    private final DataFormatConverters.DataFormatConverter<RowData, Row> converter;
+
+    /**
+     * @param assigner the watermark assigner.
+     * @param timeFieldIdx the index of the rowtime attribute.
+     * @param sourceType the type of source; unless it is a {@link FieldsDataType} it is wrapped
+     *     into a single-field row type whose field is named {@code f0}.
+     */
+    @SuppressWarnings("unchecked")
+    public PunctuatedWatermarkAssignerWrapper(
+            PunctuatedWatermarkAssigner assigner, int timeFieldIdx, DataType sourceType) {
+        this.assigner = assigner;
+        this.timeFieldIdx = timeFieldIdx;
+        final DataType rowType =
+                sourceType instanceof FieldsDataType
+                        ? sourceType
+                        : DataTypes.ROW(DataTypes.FIELD("f0", sourceType));
+        this.converter =
+                DataFormatConverters.getConverterForDataType(rowType.bridgedTo(Row.class));
+    }
+
+    /**
+     * Asks the wrapped assigner for a watermark for {@code row}. {@code extractedTimestamp} is
+     * not used; the timestamp passed to the assigner is read from the row itself.
+     */
+    @Nullable
+    @Override
+    public Watermark checkAndGetNextWatermark(RowData row, long extractedTimestamp) {
+        // Read the rowtime before converting, matching the original evaluation order.
+        final long rowtime = row.getLong(timeFieldIdx);
+        final Row externalRow = converter.toExternal(row);
+        return assigner.getWatermark(externalRow, rowtime);
+    }
+
+    /** Always returns {@code 0}; both arguments are ignored. */
+    @Override
+    public long extractTimestamp(RowData element, long recordTimestamp) {
+        return 0;
+    }
+}
diff --git a/flink-table/flink-table-runtime/src/main/resources/META-INF/NOTICE 
b/flink-table/flink-table-runtime/src/main/resources/META-INF/NOTICE
new file mode 100644
index 0000000..33fe52a
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/main/resources/META-INF/NOTICE
@@ -0,0 +1,9 @@
+flink-table-runtime
+Copyright 2014-2021 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+This project bundles the following dependencies under the Apache Software 
License 2.0. (http://www.apache.org/licenses/LICENSE-2.0.txt)
+
+- com.jayway.jsonpath:json-path:2.6.0
diff --git a/flink-table/pom.xml b/flink-table/pom.xml
index 63f6371..850931e 100644
--- a/flink-table/pom.xml
+++ b/flink-table/pom.xml
@@ -72,9 +72,10 @@ under the License.
        </dependencyManagement>
 
        <properties>
-               <!-- When updating Janino, make sure that Calcite supports it 
as well. -->
-               <janino.version>3.0.11</janino.version>
                <calcite.version>1.26.0</calcite.version>
+               <!-- Keep Janino in sync with calcite. -->
+               <janino.version>3.0.11</janino.version>
+               <jsonpath.version>2.6.0</jsonpath.version>
                <guava.version>29.0-jre</guava.version>
        </properties>
 </project>

Reply via email to