This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 8c40849  [FLINK-25075][table] Refactored PlannerExpressionParser to avoid instantiation through reflections
8c40849 is described below

commit 8c40849c5249d96532305d6b88302ee863371541
Author: slinkydeveloper <francescogu...@gmail.com>
AuthorDate: Fri Nov 26 11:44:26 2021 +0100

    [FLINK-25075][table] Refactored PlannerExpressionParser to avoid instantiation through reflections
    
    This closes #17931.
---
 .../java/internal/StreamTableEnvironmentImpl.java  |  4 +-
 .../main/java/org/apache/flink/table/api/Over.java |  5 +-
 .../flink/table/api/OverWindowPartitioned.java     |  4 +-
 .../table/api/OverWindowPartitionedOrdered.java    |  6 +-
 .../api/OverWindowPartitionedOrderedPreceding.java |  6 +-
 .../java/org/apache/flink/table/api/Session.java   |  4 +-
 .../org/apache/flink/table/api/SessionWithGap.java |  4 +-
 .../flink/table/api/SessionWithGapOnTime.java      |  4 +-
 .../java/org/apache/flink/table/api/Slide.java     |  4 +-
 .../org/apache/flink/table/api/SlideWithSize.java  |  4 +-
 .../flink/table/api/SlideWithSizeAndSlide.java     |  4 +-
 .../table/api/SlideWithSizeAndSlideOnTime.java     |  4 +-
 .../java/org/apache/flink/table/api/Tumble.java    |  4 +-
 .../org/apache/flink/table/api/TumbleWithSize.java |  4 +-
 .../flink/table/api/TumbleWithSizeOnTime.java      |  4 +-
 .../apache/flink/table/api/internal/TableImpl.java | 93 +++++++++++++---------
 .../ExpressionParser.java                          | 24 +++---
 .../ExpressionParserFactory.java}                  | 29 ++++---
 .../table/delegation/PlannerExpressionParser.java  | 73 -----------------
 .../table/api/WindowCreationValidationTest.java    | 57 -------------
 .../delegation/DefaultExpressionParserFactory.java | 53 ++++++++++++
 .../org.apache.flink.table.factories.Factory       |  1 +
 ...ParserImpl.scala => ExpressionParserImpl.scala} | 19 +++--
 .../planner/expressions/KeywordParseTest.scala     | 13 ++-
 .../planner/expressions/ScalarFunctionsTest.scala  | 10 +--
 .../expressions/utils/ExpressionTestBase.scala     |  7 +-
 .../planner/plan/utils/RexNodeExtractorTest.scala  | 48 +++++------
 .../planner/runtime/utils/BatchTableEnvUtil.scala  |  6 +-
 .../runtime/utils/CollectionBatchExecTable.scala   |  5 +-
 .../planner/runtime/utils/StreamTableEnvUtil.scala |  5 +-
 30 files changed, 228 insertions(+), 280 deletions(-)

diff --git 
a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java
 
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java
index 72a92d0..b0957f6 100644
--- 
a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java
+++ 
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java
@@ -49,10 +49,10 @@ import org.apache.flink.table.catalog.UnresolvedIdentifier;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.delegation.Executor;
 import org.apache.flink.table.delegation.ExecutorFactory;
+import org.apache.flink.table.delegation.ExpressionParser;
 import org.apache.flink.table.delegation.Planner;
 import org.apache.flink.table.expressions.ApiExpressionUtils;
 import org.apache.flink.table.expressions.Expression;
-import org.apache.flink.table.expressions.ExpressionParser;
 import org.apache.flink.table.factories.FactoryUtil;
 import org.apache.flink.table.factories.PlannerFactoryUtil;
 import org.apache.flink.table.functions.AggregateFunction;
@@ -458,7 +458,7 @@ public final class StreamTableEnvironmentImpl extends 
TableEnvironmentImpl
 
     @Override
     public <T> Table fromDataStream(DataStream<T> dataStream, String fields) {
-        List<Expression> expressions = 
ExpressionParser.parseExpressionList(fields);
+        List<Expression> expressions = 
ExpressionParser.INSTANCE.parseExpressionList(fields);
         return fromDataStream(dataStream, expressions.toArray(new 
Expression[0]));
     }
 
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Over.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Over.java
index 7862816..97a1eec 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Over.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Over.java
@@ -19,8 +19,8 @@
 package org.apache.flink.table.api;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.delegation.ExpressionParser;
 import org.apache.flink.table.expressions.Expression;
-import org.apache.flink.table.expressions.ExpressionParser;
 
 import java.util.Arrays;
 
@@ -53,7 +53,8 @@ public final class Over {
      * @return an over window with defined partitioning
      */
     public static OverWindowPartitioned partitionBy(String partitionBy) {
-        return new 
OverWindowPartitioned(ExpressionParser.parseExpressionList(partitionBy));
+        return new OverWindowPartitioned(
+                ExpressionParser.INSTANCE.parseExpressionList(partitionBy));
     }
 
     /**
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/OverWindowPartitioned.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/OverWindowPartitioned.java
index 7c47444..2d35689 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/OverWindowPartitioned.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/OverWindowPartitioned.java
@@ -19,8 +19,8 @@
 package org.apache.flink.table.api;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.delegation.ExpressionParser;
 import org.apache.flink.table.expressions.Expression;
-import org.apache.flink.table.expressions.ExpressionParser;
 
 import java.util.List;
 
@@ -49,7 +49,7 @@ public final class OverWindowPartitioned {
      */
     @Deprecated
     public OverWindowPartitionedOrdered orderBy(String orderBy) {
-        return this.orderBy(ExpressionParser.parseExpression(orderBy));
+        return 
this.orderBy(ExpressionParser.INSTANCE.parseExpression(orderBy));
     }
 
     /**
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/OverWindowPartitionedOrdered.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/OverWindowPartitionedOrdered.java
index d1d82a8..92a2360 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/OverWindowPartitionedOrdered.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/OverWindowPartitionedOrdered.java
@@ -19,8 +19,8 @@
 package org.apache.flink.table.api;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.delegation.ExpressionParser;
 import org.apache.flink.table.expressions.Expression;
-import org.apache.flink.table.expressions.ExpressionParser;
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
 
 import java.util.List;
@@ -49,7 +49,7 @@ public final class OverWindowPartitionedOrdered {
      */
     @Deprecated
     public OverWindowPartitionedOrderedPreceding preceding(String preceding) {
-        return this.preceding(ExpressionParser.parseExpression(preceding));
+        return 
this.preceding(ExpressionParser.INSTANCE.parseExpression(preceding));
     }
 
     /**
@@ -69,7 +69,7 @@ public final class OverWindowPartitionedOrdered {
      * @return the fully defined over window
      */
     public OverWindow as(String alias) {
-        return as(ExpressionParser.parseExpression(alias));
+        return as(ExpressionParser.INSTANCE.parseExpression(alias));
     }
 
     /**
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/OverWindowPartitionedOrderedPreceding.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/OverWindowPartitionedOrderedPreceding.java
index 885306e..32511fb 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/OverWindowPartitionedOrderedPreceding.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/OverWindowPartitionedOrderedPreceding.java
@@ -19,8 +19,8 @@
 package org.apache.flink.table.api;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.delegation.ExpressionParser;
 import org.apache.flink.table.expressions.Expression;
-import org.apache.flink.table.expressions.ExpressionParser;
 
 import java.util.List;
 import java.util.Optional;
@@ -48,7 +48,7 @@ public final class OverWindowPartitionedOrderedPreceding {
      * @return the fully defined over window
      */
     public OverWindow as(String alias) {
-        return as(ExpressionParser.parseExpression(alias));
+        return as(ExpressionParser.INSTANCE.parseExpression(alias));
     }
 
     /**
@@ -70,7 +70,7 @@ public final class OverWindowPartitionedOrderedPreceding {
      */
     @Deprecated
     public OverWindowPartitionedOrderedPreceding following(String following) {
-        return this.following(ExpressionParser.parseExpression(following));
+        return 
this.following(ExpressionParser.INSTANCE.parseExpression(following));
     }
 
     /**
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Session.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Session.java
index 50cb165..e075c6d 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Session.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Session.java
@@ -19,8 +19,8 @@
 package org.apache.flink.table.api;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.delegation.ExpressionParser;
 import org.apache.flink.table.expressions.Expression;
-import org.apache.flink.table.expressions.ExpressionParser;
 
 /**
  * Helper class for creating a session window. The boundary of session windows 
are defined by
@@ -53,7 +53,7 @@ public final class Session {
      */
     @Deprecated
     public static SessionWithGap withGap(String gap) {
-        return withGap(ExpressionParser.parseExpression(gap));
+        return withGap(ExpressionParser.INSTANCE.parseExpression(gap));
     }
 
     /**
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SessionWithGap.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SessionWithGap.java
index 49521bb..0859cd2 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SessionWithGap.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SessionWithGap.java
@@ -19,9 +19,9 @@
 package org.apache.flink.table.api;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.delegation.ExpressionParser;
 import org.apache.flink.table.expressions.ApiExpressionUtils;
 import org.apache.flink.table.expressions.Expression;
-import org.apache.flink.table.expressions.ExpressionParser;
 
 /**
  * Session window.
@@ -54,7 +54,7 @@ public final class SessionWithGap {
      */
     @Deprecated
     public SessionWithGapOnTime on(String timeField) {
-        return on(ExpressionParser.parseExpression(timeField));
+        return on(ExpressionParser.INSTANCE.parseExpression(timeField));
     }
 
     /**
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SessionWithGapOnTime.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SessionWithGapOnTime.java
index 202a940..d53c1ed 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SessionWithGapOnTime.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SessionWithGapOnTime.java
@@ -19,9 +19,9 @@
 package org.apache.flink.table.api;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.delegation.ExpressionParser;
 import org.apache.flink.table.expressions.ApiExpressionUtils;
 import org.apache.flink.table.expressions.Expression;
-import org.apache.flink.table.expressions.ExpressionParser;
 
 /** Session window on time. */
 @PublicEvolving
@@ -44,7 +44,7 @@ public final class SessionWithGapOnTime {
      * @return this window
      */
     public SessionWithGapOnTimeWithAlias as(String alias) {
-        return as(ExpressionParser.parseExpression(alias));
+        return as(ExpressionParser.INSTANCE.parseExpression(alias));
     }
 
     /**
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Slide.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Slide.java
index 6c4673d..4d1bb81 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Slide.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Slide.java
@@ -19,8 +19,8 @@
 package org.apache.flink.table.api;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.delegation.ExpressionParser;
 import org.apache.flink.table.expressions.Expression;
-import org.apache.flink.table.expressions.ExpressionParser;
 
 /**
  * Helper class for creating a sliding window. Sliding windows have a fixed 
size and slide by a
@@ -61,7 +61,7 @@ public final class Slide {
      */
     @Deprecated
     public static SlideWithSize over(String size) {
-        return over(ExpressionParser.parseExpression(size));
+        return over(ExpressionParser.INSTANCE.parseExpression(size));
     }
 
     /**
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSize.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSize.java
index 1e29478..7b0227f 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSize.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSize.java
@@ -19,9 +19,9 @@
 package org.apache.flink.table.api;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.delegation.ExpressionParser;
 import org.apache.flink.table.expressions.ApiExpressionUtils;
 import org.apache.flink.table.expressions.Expression;
-import org.apache.flink.table.expressions.ExpressionParser;
 
 /**
  * Partially specified sliding window. The size of the window either as time 
or row-count interval.
@@ -52,7 +52,7 @@ public final class SlideWithSize {
      */
     @Deprecated
     public SlideWithSizeAndSlide every(String slide) {
-        return every(ExpressionParser.parseExpression(slide));
+        return every(ExpressionParser.INSTANCE.parseExpression(slide));
     }
 
     /**
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSizeAndSlide.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSizeAndSlide.java
index 3713500..d101d6e 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSizeAndSlide.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSizeAndSlide.java
@@ -19,9 +19,9 @@
 package org.apache.flink.table.api;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.delegation.ExpressionParser;
 import org.apache.flink.table.expressions.ApiExpressionUtils;
 import org.apache.flink.table.expressions.Expression;
-import org.apache.flink.table.expressions.ExpressionParser;
 
 /**
  * Sliding window. The size of the window either as time or row-count interval.
@@ -57,7 +57,7 @@ public final class SlideWithSizeAndSlide {
      */
     @Deprecated
     public SlideWithSizeAndSlideOnTime on(String timeField) {
-        return on(ExpressionParser.parseExpression(timeField));
+        return on(ExpressionParser.INSTANCE.parseExpression(timeField));
     }
 
     /**
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSizeAndSlideOnTime.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSizeAndSlideOnTime.java
index 4a57bdb..d21a01c 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSizeAndSlideOnTime.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSizeAndSlideOnTime.java
@@ -19,9 +19,9 @@
 package org.apache.flink.table.api;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.delegation.ExpressionParser;
 import org.apache.flink.table.expressions.ApiExpressionUtils;
 import org.apache.flink.table.expressions.Expression;
-import org.apache.flink.table.expressions.ExpressionParser;
 
 /** Sliding window on time. */
 @PublicEvolving
@@ -46,7 +46,7 @@ public final class SlideWithSizeAndSlideOnTime {
      * @return this window
      */
     public SlideWithSizeAndSlideOnTimeWithAlias as(String alias) {
-        return as(ExpressionParser.parseExpression(alias));
+        return as(ExpressionParser.INSTANCE.parseExpression(alias));
     }
 
     /**
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Tumble.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Tumble.java
index 87743c9..9a3489b 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Tumble.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Tumble.java
@@ -19,8 +19,8 @@
 package org.apache.flink.table.api;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.delegation.ExpressionParser;
 import org.apache.flink.table.expressions.Expression;
-import org.apache.flink.table.expressions.ExpressionParser;
 
 /**
  * Helper class for creating a tumbling window. Tumbling windows are 
consecutive, non-overlapping
@@ -53,7 +53,7 @@ public final class Tumble {
      */
     @Deprecated
     public static TumbleWithSize over(String size) {
-        return over(ExpressionParser.parseExpression(size));
+        return over(ExpressionParser.INSTANCE.parseExpression(size));
     }
 
     /**
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TumbleWithSize.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TumbleWithSize.java
index 563d0c6..20e34c5 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TumbleWithSize.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TumbleWithSize.java
@@ -19,9 +19,9 @@
 package org.apache.flink.table.api;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.delegation.ExpressionParser;
 import org.apache.flink.table.expressions.ApiExpressionUtils;
 import org.apache.flink.table.expressions.Expression;
-import org.apache.flink.table.expressions.ExpressionParser;
 
 /**
  * Tumbling window.
@@ -69,6 +69,6 @@ public final class TumbleWithSize {
      */
     @Deprecated
     public TumbleWithSizeOnTime on(String timeField) {
-        return on(ExpressionParser.parseExpression(timeField));
+        return on(ExpressionParser.INSTANCE.parseExpression(timeField));
     }
 }
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TumbleWithSizeOnTime.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TumbleWithSizeOnTime.java
index 8f06421..083a835 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TumbleWithSizeOnTime.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TumbleWithSizeOnTime.java
@@ -19,9 +19,9 @@
 package org.apache.flink.table.api;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.delegation.ExpressionParser;
 import org.apache.flink.table.expressions.ApiExpressionUtils;
 import org.apache.flink.table.expressions.Expression;
-import org.apache.flink.table.expressions.ExpressionParser;
 
 /** Tumbling window on time. */
 @PublicEvolving
@@ -56,6 +56,6 @@ public final class TumbleWithSizeOnTime {
      * @return this window
      */
     public TumbleWithSizeOnTimeWithAlias as(String alias) {
-        return as(ExpressionParser.parseExpression(alias));
+        return as(ExpressionParser.INSTANCE.parseExpression(alias));
     }
 }
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java
index 682fd93..a1030b26c 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java
@@ -39,8 +39,8 @@ import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.catalog.SchemaTranslator;
 import org.apache.flink.table.catalog.UnresolvedIdentifier;
+import org.apache.flink.table.delegation.ExpressionParser;
 import org.apache.flink.table.expressions.Expression;
-import org.apache.flink.table.expressions.ExpressionParser;
 import org.apache.flink.table.expressions.UnresolvedReferenceExpression;
 import org.apache.flink.table.expressions.resolver.LookupCallResolver;
 import org.apache.flink.table.functions.TemporalTableFunction;
@@ -120,7 +120,8 @@ public class TableImpl implements Table {
 
     @Override
     public Table select(String fields) {
-        return select(ExpressionParser.parseExpressionList(fields).toArray(new 
Expression[0]));
+        return select(
+                
ExpressionParser.INSTANCE.parseExpressionList(fields).toArray(new 
Expression[0]));
     }
 
     @Override
@@ -151,8 +152,8 @@ public class TableImpl implements Table {
     public TemporalTableFunction createTemporalTableFunction(
             String timeAttribute, String primaryKey) {
         return createTemporalTableFunction(
-                ExpressionParser.parseExpression(timeAttribute),
-                ExpressionParser.parseExpression(primaryKey));
+                ExpressionParser.INSTANCE.parseExpression(timeAttribute),
+                ExpressionParser.INSTANCE.parseExpression(primaryKey));
     }
 
     @Override
@@ -171,7 +172,7 @@ public class TableImpl implements Table {
     public Table as(String field, String... fields) {
         final List<Expression> fieldsExprs;
         if (fields.length == 0 && 
operationTree.getResolvedSchema().getColumnCount() > 1) {
-            fieldsExprs = ExpressionParser.parseExpressionList(field);
+            fieldsExprs = ExpressionParser.INSTANCE.parseExpressionList(field);
         } else {
             fieldsExprs = new ArrayList<>();
             fieldsExprs.add(lit(field));
@@ -189,7 +190,7 @@ public class TableImpl implements Table {
 
     @Override
     public Table filter(String predicate) {
-        return filter(ExpressionParser.parseExpression(predicate));
+        return filter(ExpressionParser.INSTANCE.parseExpression(predicate));
     }
 
     @Override
@@ -210,7 +211,7 @@ public class TableImpl implements Table {
 
     @Override
     public GroupedTable groupBy(String fields) {
-        return new GroupedTableImpl(this, 
ExpressionParser.parseExpressionList(fields));
+        return new GroupedTableImpl(this, 
ExpressionParser.INSTANCE.parseExpressionList(fields));
     }
 
     @Override
@@ -230,7 +231,7 @@ public class TableImpl implements Table {
 
     @Override
     public Table join(Table right, String joinPredicate) {
-        return join(right, ExpressionParser.parseExpression(joinPredicate));
+        return join(right, 
ExpressionParser.INSTANCE.parseExpression(joinPredicate));
     }
 
     @Override
@@ -245,7 +246,7 @@ public class TableImpl implements Table {
 
     @Override
     public Table leftOuterJoin(Table right, String joinPredicate) {
-        return leftOuterJoin(right, 
ExpressionParser.parseExpression(joinPredicate));
+        return leftOuterJoin(right, 
ExpressionParser.INSTANCE.parseExpression(joinPredicate));
     }
 
     @Override
@@ -255,7 +256,7 @@ public class TableImpl implements Table {
 
     @Override
     public Table rightOuterJoin(Table right, String joinPredicate) {
-        return rightOuterJoin(right, 
ExpressionParser.parseExpression(joinPredicate));
+        return rightOuterJoin(right, 
ExpressionParser.INSTANCE.parseExpression(joinPredicate));
     }
 
     @Override
@@ -265,7 +266,7 @@ public class TableImpl implements Table {
 
     @Override
     public Table fullOuterJoin(Table right, String joinPredicate) {
-        return fullOuterJoin(right, 
ExpressionParser.parseExpression(joinPredicate));
+        return fullOuterJoin(right, 
ExpressionParser.INSTANCE.parseExpression(joinPredicate));
     }
 
     @Override
@@ -297,7 +298,7 @@ public class TableImpl implements Table {
 
     @Override
     public Table joinLateral(String tableFunctionCall) {
-        return 
joinLateral(ExpressionParser.parseExpression(tableFunctionCall));
+        return 
joinLateral(ExpressionParser.INSTANCE.parseExpression(tableFunctionCall));
     }
 
     @Override
@@ -308,8 +309,8 @@ public class TableImpl implements Table {
     @Override
     public Table joinLateral(String tableFunctionCall, String joinPredicate) {
         return joinLateral(
-                ExpressionParser.parseExpression(tableFunctionCall),
-                ExpressionParser.parseExpression(joinPredicate));
+                ExpressionParser.INSTANCE.parseExpression(tableFunctionCall),
+                ExpressionParser.INSTANCE.parseExpression(joinPredicate));
     }
 
     @Override
@@ -319,7 +320,7 @@ public class TableImpl implements Table {
 
     @Override
     public Table leftOuterJoinLateral(String tableFunctionCall) {
-        return 
leftOuterJoinLateral(ExpressionParser.parseExpression(tableFunctionCall));
+        return 
leftOuterJoinLateral(ExpressionParser.INSTANCE.parseExpression(tableFunctionCall));
     }
 
     @Override
@@ -330,8 +331,8 @@ public class TableImpl implements Table {
     @Override
     public Table leftOuterJoinLateral(String tableFunctionCall, String 
joinPredicate) {
         return leftOuterJoinLateral(
-                ExpressionParser.parseExpression(tableFunctionCall),
-                ExpressionParser.parseExpression(joinPredicate));
+                ExpressionParser.INSTANCE.parseExpression(tableFunctionCall),
+                ExpressionParser.INSTANCE.parseExpression(joinPredicate));
     }
 
     @Override
@@ -406,7 +407,7 @@ public class TableImpl implements Table {
     public Table orderBy(String fields) {
         return createTable(
                 operationTreeBuilder.sort(
-                        ExpressionParser.parseExpressionList(fields), 
operationTree));
+                        ExpressionParser.INSTANCE.parseExpressionList(fields), 
operationTree));
     }
 
     @Override
@@ -449,7 +450,7 @@ public class TableImpl implements Table {
 
     @Override
     public Table addColumns(String fields) {
-        return addColumnsOperation(false, 
ExpressionParser.parseExpressionList(fields));
+        return addColumnsOperation(false, 
ExpressionParser.INSTANCE.parseExpressionList(fields));
     }
 
     @Override
@@ -459,7 +460,7 @@ public class TableImpl implements Table {
 
     @Override
     public Table addOrReplaceColumns(String fields) {
-        return addColumnsOperation(true, 
ExpressionParser.parseExpressionList(fields));
+        return addColumnsOperation(true, 
ExpressionParser.INSTANCE.parseExpressionList(fields));
     }
 
     @Override
@@ -490,7 +491,7 @@ public class TableImpl implements Table {
     public Table renameColumns(String fields) {
         return createTable(
                 operationTreeBuilder.renameColumns(
-                        ExpressionParser.parseExpressionList(fields), 
operationTree));
+                        ExpressionParser.INSTANCE.parseExpressionList(fields), 
operationTree));
     }
 
     @Override
@@ -503,7 +504,7 @@ public class TableImpl implements Table {
     public Table dropColumns(String fields) {
         return createTable(
                 operationTreeBuilder.dropColumns(
-                        ExpressionParser.parseExpressionList(fields), 
operationTree));
+                        ExpressionParser.INSTANCE.parseExpressionList(fields), 
operationTree));
     }
 
     @Override
@@ -513,7 +514,7 @@ public class TableImpl implements Table {
 
     @Override
     public Table map(String mapFunction) {
-        return map(ExpressionParser.parseExpression(mapFunction));
+        return map(ExpressionParser.INSTANCE.parseExpression(mapFunction));
     }
 
     @Override
@@ -523,7 +524,7 @@ public class TableImpl implements Table {
 
     @Override
     public Table flatMap(String tableFunction) {
-        return flatMap(ExpressionParser.parseExpression(tableFunction));
+        return 
flatMap(ExpressionParser.INSTANCE.parseExpression(tableFunction));
     }
 
     @Override
@@ -533,7 +534,7 @@ public class TableImpl implements Table {
 
     @Override
     public AggregatedTable aggregate(String aggregateFunction) {
-        return aggregate(ExpressionParser.parseExpression(aggregateFunction));
+        return 
aggregate(ExpressionParser.INSTANCE.parseExpression(aggregateFunction));
     }
 
     @Override
@@ -642,7 +643,10 @@ public class TableImpl implements Table {
 
         @Override
         public Table select(String fields) {
-            return 
select(ExpressionParser.parseExpressionList(fields).toArray(new Expression[0]));
+            return select(
+                    ExpressionParser.INSTANCE
+                            .parseExpressionList(fields)
+                            .toArray(new Expression[0]));
         }
 
         @Override
@@ -666,7 +670,7 @@ public class TableImpl implements Table {
 
         @Override
         public AggregatedTable aggregate(String aggregateFunction) {
-            return 
aggregate(ExpressionParser.parseExpression(aggregateFunction));
+            return 
aggregate(ExpressionParser.INSTANCE.parseExpression(aggregateFunction));
         }
 
         @Override
@@ -676,7 +680,7 @@ public class TableImpl implements Table {
 
         @Override
         public FlatAggregateTable flatAggregate(String tableAggFunction) {
-            return 
flatAggregate(ExpressionParser.parseExpression(tableAggFunction));
+            return 
flatAggregate(ExpressionParser.INSTANCE.parseExpression(tableAggFunction));
         }
 
         @Override
@@ -699,7 +703,10 @@ public class TableImpl implements Table {
 
         @Override
         public Table select(String fields) {
-            return 
select(ExpressionParser.parseExpressionList(fields).toArray(new Expression[0]));
+            return select(
+                    ExpressionParser.INSTANCE
+                            .parseExpressionList(fields)
+                            .toArray(new Expression[0]));
         }
 
         @Override
@@ -729,7 +736,7 @@ public class TableImpl implements Table {
         public Table select(String fields) {
             return table.createTable(
                     table.operationTreeBuilder.project(
-                            ExpressionParser.parseExpressionList(fields),
+                            
ExpressionParser.INSTANCE.parseExpressionList(fields),
                             table.operationTreeBuilder.tableAggregate(
                                     groupKey,
                                     
tableAggregateFunction.accept(table.lookupResolver),
@@ -759,7 +766,10 @@ public class TableImpl implements Table {
 
         @Override
         public WindowGroupedTable groupBy(String fields) {
-            return 
groupBy(ExpressionParser.parseExpressionList(fields).toArray(new 
Expression[0]));
+            return groupBy(
+                    ExpressionParser.INSTANCE
+                            .parseExpressionList(fields)
+                            .toArray(new Expression[0]));
         }
 
         @Override
@@ -791,7 +801,10 @@ public class TableImpl implements Table {
 
         @Override
         public Table select(String fields) {
-            return 
select(ExpressionParser.parseExpressionList(fields).toArray(new Expression[0]));
+            return select(
+                    ExpressionParser.INSTANCE
+                            .parseExpressionList(fields)
+                            .toArray(new Expression[0]));
         }
 
         @Override
@@ -816,7 +829,7 @@ public class TableImpl implements Table {
 
         @Override
         public AggregatedTable aggregate(String aggregateFunction) {
-            return 
aggregate(ExpressionParser.parseExpression(aggregateFunction));
+            return 
aggregate(ExpressionParser.INSTANCE.parseExpression(aggregateFunction));
         }
 
         @Override
@@ -826,7 +839,7 @@ public class TableImpl implements Table {
 
         @Override
         public FlatAggregateTable flatAggregate(String tableAggregateFunction) 
{
-            return 
flatAggregate(ExpressionParser.parseExpression(tableAggregateFunction));
+            return 
flatAggregate(ExpressionParser.INSTANCE.parseExpression(tableAggregateFunction));
         }
 
         @Override
@@ -855,7 +868,10 @@ public class TableImpl implements Table {
 
         @Override
         public Table select(String fields) {
-            return 
select(ExpressionParser.parseExpressionList(fields).toArray(new Expression[0]));
+            return select(
+                    ExpressionParser.INSTANCE
+                            .parseExpressionList(fields)
+                            .toArray(new Expression[0]));
         }
 
         @Override
@@ -914,7 +930,10 @@ public class TableImpl implements Table {
 
         @Override
         public Table select(String fields) {
-            return 
select(ExpressionParser.parseExpressionList(fields).toArray(new Expression[0]));
+            return select(
+                    ExpressionParser.INSTANCE
+                            .parseExpressionList(fields)
+                            .toArray(new Expression[0]));
         }
 
         @Override
@@ -969,7 +988,7 @@ public class TableImpl implements Table {
         public Table select(String fields) {
             return table.createTable(
                     table.operationTreeBuilder.project(
-                            ExpressionParser.parseExpressionList(fields),
+                            
ExpressionParser.INSTANCE.parseExpressionList(fields),
                             table.operationTree,
                             overWindows));
         }
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ExpressionParser.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/ExpressionParser.java
similarity index 56%
copy from 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ExpressionParser.java
copy to 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/ExpressionParser.java
index 2fbe87d..efd424a 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ExpressionParser.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/ExpressionParser.java
@@ -16,27 +16,27 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.expressions;
+package org.apache.flink.table.delegation;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.table.delegation.PlannerExpressionParser;
+import org.apache.flink.table.expressions.Expression;
 
 import java.util.List;
 
 /**
- * Parser for expressions inside a String. This parses exactly the same 
expressions that would be
- * accepted by the Scala Expression DSL.
+ * {@link Expression} parser used by the Table API to parse strings into AST 
expressions. This parses
+ * exactly the same expressions that would be accepted by the Scala Expression 
DSL.
  *
- * <p>{@link ExpressionParser} use {@link PlannerExpressionParser} to parse 
expressions.
+ * @deprecated The Java String Expression DSL is deprecated.
  */
 @Internal
-public final class ExpressionParser {
+@Deprecated
+public interface ExpressionParser {
 
-    public static Expression parseExpression(String exprString) {
-        return PlannerExpressionParser.create().parseExpression(exprString);
-    }
+    /** Default instance of the {@link ExpressionParser}. */
+    ExpressionParser INSTANCE = ExpressionParserFactory.getDefault().create();
 
-    public static List<Expression> parseExpressionList(String expression) {
-        return 
PlannerExpressionParser.create().parseExpressionList(expression);
-    }
+    Expression parseExpression(String exprString);
+
+    List<Expression> parseExpressionList(String expression);
 }
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ExpressionParser.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/ExpressionParserFactory.java
similarity index 55%
rename from 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ExpressionParser.java
rename to 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/ExpressionParserFactory.java
index 2fbe87d..a61c1cb 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ExpressionParser.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/ExpressionParserFactory.java
@@ -16,27 +16,30 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.expressions;
+package org.apache.flink.table.delegation;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.table.delegation.PlannerExpressionParser;
-
-import java.util.List;
+import org.apache.flink.table.factories.Factory;
+import org.apache.flink.table.factories.FactoryUtil;
 
 /**
- * Parser for expressions inside a String. This parses exactly the same 
expressions that would be
- * accepted by the Scala Expression DSL.
+ * Factory for {@link ExpressionParser}.
  *
- * <p>{@link ExpressionParser} use {@link PlannerExpressionParser} to parse 
expressions.
+ * @deprecated The Java String Expression DSL is deprecated.
  */
 @Internal
-public final class ExpressionParser {
+@Deprecated
+public interface ExpressionParserFactory extends Factory {
 
-    public static Expression parseExpression(String exprString) {
-        return PlannerExpressionParser.create().parseExpression(exprString);
-    }
+    /** {@link #factoryIdentifier()} for the default {@link 
ExpressionParserFactory}. */
+    String DEFAULT_IDENTIFIER = "default";
+
+    ExpressionParser create();
 
-    public static List<Expression> parseExpressionList(String expression) {
-        return 
PlannerExpressionParser.create().parseExpressionList(expression);
+    static ExpressionParserFactory getDefault() {
+        return FactoryUtil.discoverFactory(
+                Thread.currentThread().getContextClassLoader(),
+                ExpressionParserFactory.class,
+                DEFAULT_IDENTIFIER);
     }
 }
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/PlannerExpressionParser.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/PlannerExpressionParser.java
deleted file mode 100644
index 9b0e6e9..0000000
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/PlannerExpressionParser.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.delegation;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.table.api.TableException;
-import org.apache.flink.table.expressions.Expression;
-import org.apache.flink.table.expressions.ExpressionParser;
-
-import java.lang.reflect.Constructor;
-import java.util.List;
-
-/**
- * Temporary utility for parsing expressions inside a String. This parses 
exactly the same
- * expressions that would be accepted by the Scala Expression DSL.
- *
- * <p>{@link PlannerExpressionParser} is used by {@link ExpressionParser} to 
parse expressions.
- */
-@Internal
-public interface PlannerExpressionParser {
-
-    static PlannerExpressionParser create() {
-        return SingletonPlannerExpressionParser.getExpressionParser();
-    }
-
-    Expression parseExpression(String exprString);
-
-    List<Expression> parseExpressionList(String expression);
-
-    /**
-     * Util class to create {@link PlannerExpressionParser} instance. Use 
singleton pattern to avoid
-     * creating many {@link PlannerExpressionParser}.
-     */
-    class SingletonPlannerExpressionParser {
-
-        private static volatile PlannerExpressionParser expressionParser;
-
-        private SingletonPlannerExpressionParser() {}
-
-        public static PlannerExpressionParser getExpressionParser() {
-
-            if (expressionParser == null) {
-                try {
-                    Class<?> clazz =
-                            Class.forName(
-                                    
"org.apache.flink.table.expressions.PlannerExpressionParserImpl");
-                    Constructor<?> con = clazz.getConstructor();
-                    expressionParser = (PlannerExpressionParser) 
con.newInstance();
-                } catch (Throwable t) {
-                    throw new TableException(
-                            "Construction of PlannerExpressionParserImpl class 
failed.", t);
-                }
-            }
-            return expressionParser;
-        }
-    }
-}
diff --git 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/WindowCreationValidationTest.java
 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/WindowCreationValidationTest.java
deleted file mode 100644
index ef1e1f0..0000000
--- 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/WindowCreationValidationTest.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api;
-
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-
-/** Test failures for the creation of window. */
-public class WindowCreationValidationTest {
-
-    @Rule public final ExpectedException exception = ExpectedException.none();
-
-    @Test
-    public void testTumbleOverForString() {
-        exception.expect(TableException.class);
-        exception.expectMessage("Construction of PlannerExpressionParserImpl 
class failed.");
-        Tumble.over("4.hours");
-    }
-
-    @Test
-    public void testSlideOverForString() {
-        exception.expect(TableException.class);
-        exception.expectMessage("Construction of PlannerExpressionParserImpl 
class failed.");
-        Slide.over("4.hours");
-    }
-
-    @Test
-    public void testSessionWithGapForString() {
-        exception.expect(TableException.class);
-        exception.expectMessage("Construction of PlannerExpressionParserImpl 
class failed.");
-        Session.withGap("4.hours");
-    }
-
-    @Test
-    public void testOverWithPartitionByForString() {
-        exception.expect(TableException.class);
-        exception.expectMessage("Construction of PlannerExpressionParserImpl 
class failed.");
-        Over.partitionBy("a");
-    }
-}
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/delegation/DefaultExpressionParserFactory.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/delegation/DefaultExpressionParserFactory.java
new file mode 100644
index 0000000..fe1753b
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/delegation/DefaultExpressionParserFactory.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.delegation;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.table.delegation.ExpressionParser;
+import org.apache.flink.table.delegation.ExpressionParserFactory;
+import org.apache.flink.table.expressions.ExpressionParserImpl;
+
+import java.util.Collections;
+import java.util.Set;
+
+/** Default factory for {@link ExpressionParser}. */
+@Internal
+public final class DefaultExpressionParserFactory implements 
ExpressionParserFactory {
+
+    @Override
+    public ExpressionParser create() {
+        return new ExpressionParserImpl();
+    }
+
+    @Override
+    public String factoryIdentifier() {
+        return ExpressionParserFactory.DEFAULT_IDENTIFIER;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> requiredOptions() {
+        return Collections.emptySet();
+    }
+
+    @Override
+    public Set<ConfigOption<?>> optionalOptions() {
+        return Collections.emptySet();
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
 
b/flink-table/flink-table-planner/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
index ac5f9b6..6d9eff6 100644
--- 
a/flink-table/flink-table-planner/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
+++ 
b/flink-table/flink-table-planner/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -16,3 +16,4 @@
 org.apache.flink.table.planner.delegation.DefaultExecutorFactory
 org.apache.flink.table.planner.delegation.DefaultParserFactory
 org.apache.flink.table.planner.delegation.DefaultPlannerFactory
+org.apache.flink.table.planner.delegation.DefaultExpressionParserFactory
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionParserImpl.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/ExpressionParserImpl.scala
similarity index 97%
rename from 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionParserImpl.scala
rename to 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/ExpressionParserImpl.scala
index 811e4f3..56a688b 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionParserImpl.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/ExpressionParserImpl.scala
@@ -19,32 +19,31 @@ package org.apache.flink.table.expressions
 
 import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation}
 import org.apache.flink.table.api._
-import org.apache.flink.table.delegation.PlannerExpressionParser
 import ApiExpressionUtils._
+import org.apache.flink.table.delegation.ExpressionParser
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions
 import 
org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType
 
 import _root_.java.math.{BigDecimal => JBigDecimal}
 import _root_.java.util.{List => JList}
-
 import _root_.scala.collection.JavaConversions._
 import _root_.scala.language.implicitConversions
 import _root_.scala.util.parsing.combinator.{JavaTokenParsers, PackratParsers}
 
 /**
-  * The implementation of a [[PlannerExpressionParser]] which parsers 
expressions inside a String.
+  * The implementation of an [[ExpressionParser]] which parses expressions 
inside a String.
   *
   * <p><strong>WARNING</strong>: please keep this class in sync with 
PlannerExpressionParserImpl
   * variant in flink-table-planner module.
   */
-class PlannerExpressionParserImpl extends PlannerExpressionParser {
+class ExpressionParserImpl extends ExpressionParser {
 
-  def parseExpression(exprString: String): Expression = {
-    PlannerExpressionParserImpl.parseExpression(exprString)
+  override def parseExpression(exprString: String): Expression = {
+    ExpressionParserImpl.parseExpression(exprString)
   }
 
   override def parseExpressionList(expression: String): JList[Expression] = {
-    PlannerExpressionParserImpl.parseExpressionList(expression)
+    ExpressionParserImpl.parseExpressionList(expression)
   }
 }
 
@@ -56,9 +55,9 @@ class PlannerExpressionParserImpl extends 
PlannerExpressionParser {
  * available in the Scala Expression DSL. This parser must be kept in sync 
with the Scala DSL
  * lazy valined in the above files.
  */
-object PlannerExpressionParserImpl extends JavaTokenParsers
+object ExpressionParserImpl extends JavaTokenParsers
   with PackratParsers
-  with PlannerExpressionParser {
+  with ExpressionParser {
 
   case class Keyword(key: String)
 
@@ -140,7 +139,7 @@ object PlannerExpressionParserImpl extends JavaTokenParsers
   lazy val TRIM_MODE_BOTH: Keyword = Keyword("BOTH")
   lazy val TO: Keyword = Keyword("TO")
 
-  def functionIdent: PlannerExpressionParserImpl.Parser[String] = super.ident
+  def functionIdent: ExpressionParserImpl.Parser[String] = super.ident
 
   // symbols
 
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/KeywordParseTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/KeywordParseTest.scala
index f9a6ced..2e613f9 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/KeywordParseTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/KeywordParseTest.scala
@@ -18,10 +18,9 @@
 
 package org.apache.flink.table.planner.expressions
 
-import org.apache.flink.table.expressions.ExpressionParser
+import org.apache.flink.table.delegation.ExpressionParser
 import org.apache.flink.table.expressions.ApiExpressionUtils.{lookupCall, 
unresolvedCall, unresolvedRef}
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions
-
 import org.junit.Assert.assertEquals
 import org.junit.Test
 
@@ -34,30 +33,30 @@ class KeywordParseTest {
   def testKeyword(): Unit = {
     assertEquals(
       unresolvedCall(BuiltInFunctionDefinitions.ORDER_ASC, 
unresolvedRef("f0")),
-      ExpressionParser.parseExpression("f0.asc"))
+      ExpressionParser.INSTANCE.parseExpression("f0.asc"))
     assertEquals(
       unresolvedCall(BuiltInFunctionDefinitions.ORDER_ASC, 
unresolvedRef("f0")),
-      ExpressionParser.parseExpression("f0.asc()"))
+      ExpressionParser.INSTANCE.parseExpression("f0.asc()"))
   }
 
   @Test
   def testKeywordAsPrefixInFunctionName(): Unit = {
     assertEquals(
       lookupCall("ascii", unresolvedRef("f0")),
-      ExpressionParser.parseExpression("f0.ascii()"))
+      ExpressionParser.INSTANCE.parseExpression("f0.ascii()"))
   }
 
   @Test
   def testKeywordAsInfixInFunctionName(): Unit = {
     assertEquals(
       lookupCall("iiascii", unresolvedRef("f0")),
-      ExpressionParser.parseExpression("f0.iiascii()"))
+      ExpressionParser.INSTANCE.parseExpression("f0.iiascii()"))
   }
 
   @Test
   def testKeywordAsSuffixInFunctionName(): Unit = {
     assertEquals(
       lookupCall("iiasc", unresolvedRef("f0")),
-      ExpressionParser.parseExpression("f0.iiasc()"))
+      ExpressionParser.INSTANCE.parseExpression("f0.iiasc()"))
   }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala
index d6b7101..396ebca 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala
@@ -19,9 +19,9 @@
 package org.apache.flink.table.planner.expressions
 
 import org.apache.flink.table.api._
-import org.apache.flink.table.expressions.{Expression, ExpressionParser, 
TimeIntervalUnit, TimePointUnit}
+import org.apache.flink.table.delegation.ExpressionParser
+import org.apache.flink.table.expressions.{Expression, TimeIntervalUnit, 
TimePointUnit}
 import org.apache.flink.table.planner.expressions.utils.ScalarTypesTestBase
-
 import org.junit.Test
 
 class ScalarFunctionsTest extends ScalarTypesTestBase {
@@ -112,7 +112,7 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
 
     cases.foreach(x => {
       testAllApis(
-        ExpressionParser.parseExpression(x._1),
+        ExpressionParser.INSTANCE.parseExpression(x._1),
         x._1,
         x._2,
         x._3
@@ -3959,7 +3959,7 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
       val tableApiString = x._1.format(field)
       val sqlApiString = x._2.format(field)
       testAllApis(
-        ExpressionParser.parseExpression(tableApiString),
+        ExpressionParser.INSTANCE.parseExpression(tableApiString),
         tableApiString,
         sqlApiString,
         "null"
@@ -3985,7 +3985,7 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
           .format(x)
 
       testAllApis(
-        ExpressionParser.parseExpression(tableApiString),
+        ExpressionParser.INSTANCE.parseExpression(tableApiString),
         tableApiString,
         sqlApiString,
         "null"
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala
index 7cec8b0..5204b94 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala
@@ -39,7 +39,8 @@ import org.apache.flink.table.data.binary.BinaryRowData
 import org.apache.flink.table.data.conversion.{DataStructureConverter, 
DataStructureConverters}
 import org.apache.flink.table.data.util.DataFormatConverters
 import 
org.apache.flink.table.data.util.DataFormatConverters.DataFormatConverter
-import org.apache.flink.table.expressions.{Expression, ExpressionParser}
+import org.apache.flink.table.delegation.ExpressionParser
+import org.apache.flink.table.expressions.Expression
 import org.apache.flink.table.functions.ScalarFunction
 import org.apache.flink.table.planner.codegen.{CodeGeneratorContext, 
ExprCodeGenerator, FunctionCodeGenerator}
 import org.apache.flink.table.planner.delegation.PlannerBase
@@ -55,7 +56,6 @@ import org.junit.{After, Before, Rule}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
-import scala.collection.mutable.ListBuffer
 
 abstract class ExpressionTestBase {
 
@@ -276,7 +276,8 @@ abstract class ExpressionTestBase {
   }
 
   private def testTableApiTestExpr(tableApiString: String, expected: String): 
Unit = {
-    addTableApiTestExpr(ExpressionParser.parseExpression(tableApiString), 
expected, validExprs)
+    addTableApiTestExpr(
+      ExpressionParser.INSTANCE.parseExpression(tableApiString), expected, 
validExprs)
   }
 
   private def addSqlTestExpr(
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractorTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractorTest.scala
index 0388d32..6658c36 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractorTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractorTest.scala
@@ -22,7 +22,7 @@ import org.apache.flink.api.common.typeinfo.Types
 import org.apache.flink.table.api.{DataTypes, TableConfig}
 import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog}
 import org.apache.flink.table.expressions.ApiExpressionUtils.{unresolvedCall, 
unresolvedRef, valueLiteral}
-import org.apache.flink.table.expressions.{Expression, ExpressionParser}
+import org.apache.flink.table.expressions.Expression
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions.{EQUALS, 
GREATER_THAN, LESS_THAN, LESS_THAN_OR_EQUAL}
 import org.apache.flink.table.functions.{AggregateFunctionDefinition, 
FunctionIdentifier}
 import org.apache.flink.table.module.ModuleManager
@@ -32,13 +32,13 @@ import 
org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable
 import org.apache.flink.table.planner.functions.utils.ScalarSqlFunction
 import org.apache.flink.table.planner.utils.{DateTimeTestUtil, 
IntSumAggFunction}
 import org.apache.flink.table.utils.CatalogManagerMocks
-
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rex.{RexBuilder, RexNode}
 import org.apache.calcite.sql.SqlPostfixOperator
 import org.apache.calcite.sql.`type`.SqlTypeName
 import org.apache.calcite.sql.fun.{SqlStdOperatorTable, SqlTrimFunction}
 import org.apache.calcite.util.{DateString, TimeString, TimestampString}
+import org.apache.flink.table.delegation.ExpressionParser
 import org.hamcrest.CoreMatchers.is
 import org.junit.Assert.{assertArrayEquals, assertEquals, assertThat, 
assertTrue}
 import org.junit.Test
@@ -46,7 +46,6 @@ import org.junit.Test
 import java.math.BigDecimal
 import java.time.ZoneId
 import java.util.{Arrays, TimeZone, List => JList}
-
 import scala.collection.JavaConverters._
 
 /**
@@ -112,8 +111,8 @@ class RexNodeExtractorTest extends RexNodeTestBase {
     val builder: RexBuilder = new FlinkRexBuilder(typeFactory)
     val expr = buildConditionExpr()
 
-    val firstExp = ExpressionParser.parseExpression("id > 6")
-    val secondExp = ExpressionParser.parseExpression("amount * price < 100")
+    val firstExp = ExpressionParser.INSTANCE.parseExpression("id > 6")
+    val secondExp = ExpressionParser.INSTANCE.parseExpression("amount * price 
< 100")
     val expected: Array[Expression] = Array(firstExp, secondExp)
 
     val (convertedExpressions, unconvertedRexNodes) =
@@ -147,7 +146,8 @@ class RexNodeExtractorTest extends RexNodeTestBase {
         relBuilder,
         functionCatalog)
 
-    val expected: Array[Expression] = 
Array(ExpressionParser.parseExpression("amount >= id"))
+    val expected: Array[Expression] = Array(
+      ExpressionParser.INSTANCE.parseExpression("amount >= id"))
     assertExpressionArrayEquals(expected, convertedExpressions)
     assertEquals(0, unconvertedRexNodes.length)
   }
@@ -197,9 +197,9 @@ class RexNodeExtractorTest extends RexNodeTestBase {
         functionCatalog)
 
     val expected: Array[Expression] = Array(
-      ExpressionParser.parseExpression("amount < 100 || price == 100 || price 
=== 200"),
-      ExpressionParser.parseExpression("id > 100 || price == 100 || price === 
200"),
-      ExpressionParser.parseExpression("!(amount <= id)"))
+      ExpressionParser.INSTANCE.parseExpression("amount < 100 || price == 100 
|| price === 200"),
+      ExpressionParser.INSTANCE.parseExpression("id > 100 || price == 100 || 
price === 200"),
+      ExpressionParser.INSTANCE.parseExpression("!(amount <= id)"))
     assertExpressionArrayEquals(expected, convertedExpressions)
     assertEquals(0, unconvertedRexNodes.length)
   }
@@ -237,10 +237,10 @@ class RexNodeExtractorTest extends RexNodeTestBase {
         functionCatalog)
 
     val expected: Array[Expression] = Array(
-      ExpressionParser.parseExpression("amount < 100"),
-      ExpressionParser.parseExpression("amount <= id"),
-      ExpressionParser.parseExpression("id > 100"),
-      ExpressionParser.parseExpression("price === 100")
+      ExpressionParser.INSTANCE.parseExpression("amount < 100"),
+      ExpressionParser.INSTANCE.parseExpression("amount <= id"),
+      ExpressionParser.INSTANCE.parseExpression("id > 100"),
+      ExpressionParser.INSTANCE.parseExpression("price === 100")
     )
 
     assertExpressionArrayEquals(expected, convertedExpressions)
@@ -393,16 +393,16 @@ class RexNodeExtractorTest extends RexNodeTestBase {
         functionCatalog)
 
     val expected: Array[Expression] = Array(
-      ExpressionParser.parseExpression("amount < id"),
-      ExpressionParser.parseExpression("amount <= id"),
-      ExpressionParser.parseExpression("amount <> id"),
-      ExpressionParser.parseExpression("amount == id"),
-      ExpressionParser.parseExpression("amount >= id"),
-      ExpressionParser.parseExpression("amount > id"),
-      ExpressionParser.parseExpression("amount + id == 100"),
-      ExpressionParser.parseExpression("amount - id == 100"),
-      ExpressionParser.parseExpression("amount * id == 100"),
-      ExpressionParser.parseExpression("amount / id == 100")
+      ExpressionParser.INSTANCE.parseExpression("amount < id"),
+      ExpressionParser.INSTANCE.parseExpression("amount <= id"),
+      ExpressionParser.INSTANCE.parseExpression("amount <> id"),
+      ExpressionParser.INSTANCE.parseExpression("amount == id"),
+      ExpressionParser.INSTANCE.parseExpression("amount >= id"),
+      ExpressionParser.INSTANCE.parseExpression("amount > id"),
+      ExpressionParser.INSTANCE.parseExpression("amount + id == 100"),
+      ExpressionParser.INSTANCE.parseExpression("amount - id == 100"),
+      ExpressionParser.INSTANCE.parseExpression("amount * id == 100"),
+      ExpressionParser.INSTANCE.parseExpression("amount / id == 100")
     )
     assertExpressionArrayEquals(expected, convertedExpressions)
     assertEquals(0, unconvertedRexNodes.length)
@@ -508,7 +508,7 @@ class RexNodeExtractorTest extends RexNodeTestBase {
     assertEquals(0, unconvertedRexNodes.length)
 
     assertExpressionArrayEquals(
-      Array(ExpressionParser.parseExpression("amount <= id")),
+      Array(ExpressionParser.INSTANCE.parseExpression("amount <= id")),
       Array(convertedExpressions(1)))
   }
 
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/BatchTableEnvUtil.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/BatchTableEnvUtil.scala
index d801362..f389897 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/BatchTableEnvUtil.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/BatchTableEnvUtil.scala
@@ -24,14 +24,13 @@ import org.apache.flink.api.java.io.CollectionInputFormat
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.table.api.internal.TableEnvironmentImpl
 import org.apache.flink.table.api.{Table, TableEnvironment}
-import org.apache.flink.table.expressions.ExpressionParser
+import org.apache.flink.table.delegation.ExpressionParser
 import org.apache.flink.table.planner.delegation.PlannerBase
 import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil
 import org.apache.flink.table.planner.plan.stats.FlinkStatistic
 import org.apache.flink.table.planner.utils.TableTestUtil
 
 import _root_.java.util.UUID
-
 import _root_.scala.collection.JavaConverters._
 import scala.reflect.ClassTag
 
@@ -222,7 +221,8 @@ object BatchTableEnvUtil {
       fieldNames: Option[Array[String]],
       fieldNullables: Option[Array[Boolean]],
       statistic: Option[FlinkStatistic]): Unit = {
-    val fields = fieldNames.map((f: Array[String]) => f.map(ExpressionParser.parseExpression))
+    val fields = fieldNames.map((f: Array[String]) =>
+      f.map(ExpressionParser.INSTANCE.parseExpression))
     // for tests we know that this stream is definitely bounded
     ExecNodeUtil.makeLegacySourceTransformationsBounded(boundedStream.getTransformation)
     TableTestUtil.createTemporaryView(
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/CollectionBatchExecTable.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/CollectionBatchExecTable.scala
index b5bc5b8..c30a090 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/CollectionBatchExecTable.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/CollectionBatchExecTable.scala
@@ -19,7 +19,8 @@ package org.apache.flink.table.planner.runtime.utils
 
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api.{Table, TableEnvironment}
-import org.apache.flink.table.expressions.{Expression, ExpressionParser}
+import org.apache.flink.table.delegation.ExpressionParser
+import org.apache.flink.table.expressions.Expression
 
 import scala.collection.mutable
 import scala.util.Random
@@ -284,7 +285,7 @@ object CollectionBatchExecTable {
     if (fields == null) {
       null
     } else {
-      ExpressionParser.parseExpressionList(fields).toArray(Array[Expression]())
+      ExpressionParser.INSTANCE.parseExpressionList(fields).toArray(Array[Expression]())
     }
   }
 
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamTableEnvUtil.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamTableEnvUtil.scala
index 27dd611..1bebf6d 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamTableEnvUtil.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamTableEnvUtil.scala
@@ -21,7 +21,8 @@ package org.apache.flink.table.planner.runtime.utils
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.table.api.TableEnvironment
 import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
-import org.apache.flink.table.expressions.{Expression, ExpressionParser}
+import org.apache.flink.table.delegation.ExpressionParser
+import org.apache.flink.table.expressions.Expression
 import org.apache.flink.table.planner.plan.stats.FlinkStatistic
 import org.apache.flink.table.planner.utils.TableTestUtil
 
@@ -44,7 +45,7 @@ object StreamTableEnvUtil {
       fieldNullables: Option[Array[Boolean]],
       statistic: Option[FlinkStatistic]): Unit = {
     val fields: Option[Array[Expression]] = fieldNames match {
-      case Some(names) => Some(names.map(ExpressionParser.parseExpression))
+      case Some(names) => Some(names.map(ExpressionParser.INSTANCE.parseExpression))
       case _ => None
     }
     TableTestUtil.createTemporaryView(tEnv, name, dataStream, fields, fieldNullables, statistic)

Reply via email to