This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 8c40849 [FLINK-25075][table] Refactored PlannerExpressionParser to avoid instantiation through reflections 8c40849 is described below commit 8c40849c5249d96532305d6b88302ee863371541 Author: slinkydeveloper <francescogu...@gmail.com> AuthorDate: Fri Nov 26 11:44:26 2021 +0100 [FLINK-25075][table] Refactored PlannerExpressionParser to avoid instantiation through reflections This closes #17931. --- .../java/internal/StreamTableEnvironmentImpl.java | 4 +- .../main/java/org/apache/flink/table/api/Over.java | 5 +- .../flink/table/api/OverWindowPartitioned.java | 4 +- .../table/api/OverWindowPartitionedOrdered.java | 6 +- .../api/OverWindowPartitionedOrderedPreceding.java | 6 +- .../java/org/apache/flink/table/api/Session.java | 4 +- .../org/apache/flink/table/api/SessionWithGap.java | 4 +- .../flink/table/api/SessionWithGapOnTime.java | 4 +- .../java/org/apache/flink/table/api/Slide.java | 4 +- .../org/apache/flink/table/api/SlideWithSize.java | 4 +- .../flink/table/api/SlideWithSizeAndSlide.java | 4 +- .../table/api/SlideWithSizeAndSlideOnTime.java | 4 +- .../java/org/apache/flink/table/api/Tumble.java | 4 +- .../org/apache/flink/table/api/TumbleWithSize.java | 4 +- .../flink/table/api/TumbleWithSizeOnTime.java | 4 +- .../apache/flink/table/api/internal/TableImpl.java | 93 +++++++++++++--------- .../ExpressionParser.java | 24 +++--- .../ExpressionParserFactory.java} | 29 ++++--- .../table/delegation/PlannerExpressionParser.java | 73 ----------------- .../table/api/WindowCreationValidationTest.java | 57 ------------- .../delegation/DefaultExpressionParserFactory.java | 53 ++++++++++++ .../org.apache.flink.table.factories.Factory | 1 + ...ParserImpl.scala => ExpressionParserImpl.scala} | 19 +++-- .../planner/expressions/KeywordParseTest.scala | 13 ++- .../planner/expressions/ScalarFunctionsTest.scala | 10 +-- .../expressions/utils/ExpressionTestBase.scala | 7 +- .../planner/plan/utils/RexNodeExtractorTest.scala | 48 +++++------ .../planner/runtime/utils/BatchTableEnvUtil.scala | 6 +- .../runtime/utils/CollectionBatchExecTable.scala | 5 +- .../planner/runtime/utils/StreamTableEnvUtil.scala | 5 +- 30 files changed, 228 insertions(+), 280 deletions(-) diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java index 72a92d0..b0957f6 100644 --- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java +++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java @@ -49,10 +49,10 @@ import org.apache.flink.table.catalog.UnresolvedIdentifier; import org.apache.flink.table.connector.ChangelogMode; import org.apache.flink.table.delegation.Executor; import org.apache.flink.table.delegation.ExecutorFactory; +import org.apache.flink.table.delegation.ExpressionParser; import org.apache.flink.table.delegation.Planner; import org.apache.flink.table.expressions.ApiExpressionUtils; import org.apache.flink.table.expressions.Expression; -import org.apache.flink.table.expressions.ExpressionParser; import org.apache.flink.table.factories.FactoryUtil; import org.apache.flink.table.factories.PlannerFactoryUtil; import org.apache.flink.table.functions.AggregateFunction; @@ -458,7 +458,7 @@ public final class StreamTableEnvironmentImpl extends TableEnvironmentImpl @Override public <T> Table fromDataStream(DataStream<T> dataStream, String fields) { - List<Expression> expressions = ExpressionParser.parseExpressionList(fields); + List<Expression> expressions = ExpressionParser.INSTANCE.parseExpressionList(fields); return fromDataStream(dataStream, expressions.toArray(new Expression[0])); } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Over.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Over.java index 7862816..97a1eec 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Over.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Over.java @@ -19,8 +19,8 @@ package org.apache.flink.table.api; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.delegation.ExpressionParser; import org.apache.flink.table.expressions.Expression; -import org.apache.flink.table.expressions.ExpressionParser; import java.util.Arrays; @@ -53,7 +53,8 @@ public final class Over { * @return an over window with defined partitioning */ public static OverWindowPartitioned partitionBy(String partitionBy) { - return new OverWindowPartitioned(ExpressionParser.parseExpressionList(partitionBy)); + return new OverWindowPartitioned( + ExpressionParser.INSTANCE.parseExpressionList(partitionBy)); } /** diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/OverWindowPartitioned.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/OverWindowPartitioned.java index 7c47444..2d35689 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/OverWindowPartitioned.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/OverWindowPartitioned.java @@ -19,8 +19,8 @@ package org.apache.flink.table.api; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.delegation.ExpressionParser; import org.apache.flink.table.expressions.Expression; -import org.apache.flink.table.expressions.ExpressionParser; import java.util.List; @@ -49,7 +49,7 @@ public final class OverWindowPartitioned { */ @Deprecated public OverWindowPartitionedOrdered orderBy(String orderBy) { - return this.orderBy(ExpressionParser.parseExpression(orderBy)); + return this.orderBy(ExpressionParser.INSTANCE.parseExpression(orderBy)); } /** diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/OverWindowPartitionedOrdered.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/OverWindowPartitionedOrdered.java index d1d82a8..92a2360 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/OverWindowPartitionedOrdered.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/OverWindowPartitionedOrdered.java @@ -19,8 +19,8 @@ package org.apache.flink.table.api; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.delegation.ExpressionParser; import org.apache.flink.table.expressions.Expression; -import org.apache.flink.table.expressions.ExpressionParser; import org.apache.flink.table.functions.BuiltInFunctionDefinitions; import java.util.List; @@ -49,7 +49,7 @@ public final class OverWindowPartitionedOrdered { */ @Deprecated public OverWindowPartitionedOrderedPreceding preceding(String preceding) { - return this.preceding(ExpressionParser.parseExpression(preceding)); + return this.preceding(ExpressionParser.INSTANCE.parseExpression(preceding)); } /** @@ -69,7 +69,7 @@ public final class OverWindowPartitionedOrdered { * @return the fully defined over window */ public OverWindow as(String alias) { - return as(ExpressionParser.parseExpression(alias)); + return as(ExpressionParser.INSTANCE.parseExpression(alias)); } /** diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/OverWindowPartitionedOrderedPreceding.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/OverWindowPartitionedOrderedPreceding.java index 885306e..32511fb 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/OverWindowPartitionedOrderedPreceding.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/OverWindowPartitionedOrderedPreceding.java @@ -19,8 +19,8 @@ package org.apache.flink.table.api; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.delegation.ExpressionParser; import org.apache.flink.table.expressions.Expression; -import org.apache.flink.table.expressions.ExpressionParser; import java.util.List; import java.util.Optional; @@ -48,7 +48,7 @@ public final class OverWindowPartitionedOrderedPreceding { * @return the fully defined over window */ public OverWindow as(String alias) { - return as(ExpressionParser.parseExpression(alias)); + return as(ExpressionParser.INSTANCE.parseExpression(alias)); } /** @@ -70,7 +70,7 @@ public final class OverWindowPartitionedOrderedPreceding { */ @Deprecated public OverWindowPartitionedOrderedPreceding following(String following) { - return this.following(ExpressionParser.parseExpression(following)); + return this.following(ExpressionParser.INSTANCE.parseExpression(following)); } /** diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Session.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Session.java index 50cb165..e075c6d 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Session.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Session.java @@ -19,8 +19,8 @@ package org.apache.flink.table.api; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.delegation.ExpressionParser; import org.apache.flink.table.expressions.Expression; -import org.apache.flink.table.expressions.ExpressionParser; /** * Helper class for creating a session window. The boundary of session windows are defined by @@ -53,7 +53,7 @@ public final class Session { */ @Deprecated public static SessionWithGap withGap(String gap) { - return withGap(ExpressionParser.parseExpression(gap)); + return withGap(ExpressionParser.INSTANCE.parseExpression(gap)); } /** diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SessionWithGap.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SessionWithGap.java index 49521bb..0859cd2 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SessionWithGap.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SessionWithGap.java @@ -19,9 +19,9 @@ package org.apache.flink.table.api; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.delegation.ExpressionParser; import org.apache.flink.table.expressions.ApiExpressionUtils; import org.apache.flink.table.expressions.Expression; -import org.apache.flink.table.expressions.ExpressionParser; /** * Session window. @@ -54,7 +54,7 @@ public final class SessionWithGap { */ @Deprecated public SessionWithGapOnTime on(String timeField) { - return on(ExpressionParser.parseExpression(timeField)); + return on(ExpressionParser.INSTANCE.parseExpression(timeField)); } /** diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SessionWithGapOnTime.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SessionWithGapOnTime.java index 202a940..d53c1ed 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SessionWithGapOnTime.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SessionWithGapOnTime.java @@ -19,9 +19,9 @@ package org.apache.flink.table.api; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.delegation.ExpressionParser; import org.apache.flink.table.expressions.ApiExpressionUtils; import org.apache.flink.table.expressions.Expression; -import org.apache.flink.table.expressions.ExpressionParser; /** Session window on time. */ @PublicEvolving @@ -44,7 +44,7 @@ public final class SessionWithGapOnTime { * @return this window */ public SessionWithGapOnTimeWithAlias as(String alias) { - return as(ExpressionParser.parseExpression(alias)); + return as(ExpressionParser.INSTANCE.parseExpression(alias)); } /** diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Slide.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Slide.java index 6c4673d..4d1bb81 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Slide.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Slide.java @@ -19,8 +19,8 @@ package org.apache.flink.table.api; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.delegation.ExpressionParser; import org.apache.flink.table.expressions.Expression; -import org.apache.flink.table.expressions.ExpressionParser; /** * Helper class for creating a sliding window. Sliding windows have a fixed size and slide by a @@ -61,7 +61,7 @@ public final class Slide { */ @Deprecated public static SlideWithSize over(String size) { - return over(ExpressionParser.parseExpression(size)); + return over(ExpressionParser.INSTANCE.parseExpression(size)); } /** diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSize.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSize.java index 1e29478..7b0227f 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSize.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSize.java @@ -19,9 +19,9 @@ package org.apache.flink.table.api; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.delegation.ExpressionParser; import org.apache.flink.table.expressions.ApiExpressionUtils; import org.apache.flink.table.expressions.Expression; -import org.apache.flink.table.expressions.ExpressionParser; /** * Partially specified sliding window. The size of the window either as time or row-count interval. @@ -52,7 +52,7 @@ public final class SlideWithSize { */ @Deprecated public SlideWithSizeAndSlide every(String slide) { - return every(ExpressionParser.parseExpression(slide)); + return every(ExpressionParser.INSTANCE.parseExpression(slide)); } /** diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSizeAndSlide.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSizeAndSlide.java index 3713500..d101d6e 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSizeAndSlide.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSizeAndSlide.java @@ -19,9 +19,9 @@ package org.apache.flink.table.api; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.delegation.ExpressionParser; import org.apache.flink.table.expressions.ApiExpressionUtils; import org.apache.flink.table.expressions.Expression; -import org.apache.flink.table.expressions.ExpressionParser; /** * Sliding window. The size of the window either as time or row-count interval. @@ -57,7 +57,7 @@ public final class SlideWithSizeAndSlide { */ @Deprecated public SlideWithSizeAndSlideOnTime on(String timeField) { - return on(ExpressionParser.parseExpression(timeField)); + return on(ExpressionParser.INSTANCE.parseExpression(timeField)); } /** diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSizeAndSlideOnTime.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSizeAndSlideOnTime.java index 4a57bdb..d21a01c 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSizeAndSlideOnTime.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSizeAndSlideOnTime.java @@ -19,9 +19,9 @@ package org.apache.flink.table.api; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.delegation.ExpressionParser; import org.apache.flink.table.expressions.ApiExpressionUtils; import org.apache.flink.table.expressions.Expression; -import org.apache.flink.table.expressions.ExpressionParser; /** Sliding window on time. */ @PublicEvolving @@ -46,7 +46,7 @@ public final class SlideWithSizeAndSlideOnTime { * @return this window */ public SlideWithSizeAndSlideOnTimeWithAlias as(String alias) { - return as(ExpressionParser.parseExpression(alias)); + return as(ExpressionParser.INSTANCE.parseExpression(alias)); } /** diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Tumble.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Tumble.java index 87743c9..9a3489b 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Tumble.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Tumble.java @@ -19,8 +19,8 @@ package org.apache.flink.table.api; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.delegation.ExpressionParser; import org.apache.flink.table.expressions.Expression; -import org.apache.flink.table.expressions.ExpressionParser; /** * Helper class for creating a tumbling window. Tumbling windows are consecutive, non-overlapping @@ -53,7 +53,7 @@ public final class Tumble { */ @Deprecated public static TumbleWithSize over(String size) { - return over(ExpressionParser.parseExpression(size)); + return over(ExpressionParser.INSTANCE.parseExpression(size)); } /** diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TumbleWithSize.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TumbleWithSize.java index 563d0c6..20e34c5 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TumbleWithSize.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TumbleWithSize.java @@ -19,9 +19,9 @@ package org.apache.flink.table.api; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.delegation.ExpressionParser; import org.apache.flink.table.expressions.ApiExpressionUtils; import org.apache.flink.table.expressions.Expression; -import org.apache.flink.table.expressions.ExpressionParser; /** * Tumbling window. @@ -69,6 +69,6 @@ public final class TumbleWithSize { */ @Deprecated public TumbleWithSizeOnTime on(String timeField) { - return on(ExpressionParser.parseExpression(timeField)); + return on(ExpressionParser.INSTANCE.parseExpression(timeField)); } } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TumbleWithSizeOnTime.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TumbleWithSizeOnTime.java index 8f06421..083a835 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TumbleWithSizeOnTime.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TumbleWithSizeOnTime.java @@ -19,9 +19,9 @@ package org.apache.flink.table.api; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.delegation.ExpressionParser; import org.apache.flink.table.expressions.ApiExpressionUtils; import org.apache.flink.table.expressions.Expression; -import org.apache.flink.table.expressions.ExpressionParser; /** Tumbling window on time. */ @PublicEvolving @@ -56,6 +56,6 @@ public final class TumbleWithSizeOnTime { * @return this window */ public TumbleWithSizeOnTimeWithAlias as(String alias) { - return as(ExpressionParser.parseExpression(alias)); + return as(ExpressionParser.INSTANCE.parseExpression(alias)); } } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java index 682fd93..a1030b26c 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java @@ -39,8 +39,8 @@ import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.catalog.SchemaTranslator; import org.apache.flink.table.catalog.UnresolvedIdentifier; +import org.apache.flink.table.delegation.ExpressionParser; import org.apache.flink.table.expressions.Expression; -import org.apache.flink.table.expressions.ExpressionParser; import org.apache.flink.table.expressions.UnresolvedReferenceExpression; import org.apache.flink.table.expressions.resolver.LookupCallResolver; import org.apache.flink.table.functions.TemporalTableFunction; @@ -120,7 +120,8 @@ public class TableImpl implements Table { @Override public Table select(String fields) { - return select(ExpressionParser.parseExpressionList(fields).toArray(new Expression[0])); + return select( + ExpressionParser.INSTANCE.parseExpressionList(fields).toArray(new Expression[0])); } @Override @@ -151,8 +152,8 @@ public class TableImpl implements Table { public TemporalTableFunction createTemporalTableFunction( String timeAttribute, String primaryKey) { return createTemporalTableFunction( - ExpressionParser.parseExpression(timeAttribute), - ExpressionParser.parseExpression(primaryKey)); + ExpressionParser.INSTANCE.parseExpression(timeAttribute), + ExpressionParser.INSTANCE.parseExpression(primaryKey)); } @Override @@ -171,7 +172,7 @@ public class TableImpl implements Table { public Table as(String field, String... fields) { final List<Expression> fieldsExprs; if (fields.length == 0 && operationTree.getResolvedSchema().getColumnCount() > 1) { - fieldsExprs = ExpressionParser.parseExpressionList(field); + fieldsExprs = ExpressionParser.INSTANCE.parseExpressionList(field); } else { fieldsExprs = new ArrayList<>(); fieldsExprs.add(lit(field)); @@ -189,7 +190,7 @@ public class TableImpl implements Table { @Override public Table filter(String predicate) { - return filter(ExpressionParser.parseExpression(predicate)); + return filter(ExpressionParser.INSTANCE.parseExpression(predicate)); } @Override @@ -210,7 +211,7 @@ public class TableImpl implements Table { @Override public GroupedTable groupBy(String fields) { - return new GroupedTableImpl(this, ExpressionParser.parseExpressionList(fields)); + return new GroupedTableImpl(this, ExpressionParser.INSTANCE.parseExpressionList(fields)); } @Override @@ -230,7 +231,7 @@ public class TableImpl implements Table { @Override public Table join(Table right, String joinPredicate) { - return join(right, ExpressionParser.parseExpression(joinPredicate)); + return join(right, ExpressionParser.INSTANCE.parseExpression(joinPredicate)); } @Override @@ -245,7 +246,7 @@ public class TableImpl implements Table { @Override public Table leftOuterJoin(Table right, String joinPredicate) { - return leftOuterJoin(right, ExpressionParser.parseExpression(joinPredicate)); + return leftOuterJoin(right, ExpressionParser.INSTANCE.parseExpression(joinPredicate)); } @Override @@ -255,7 +256,7 @@ public class TableImpl implements Table { @Override public Table rightOuterJoin(Table right, String joinPredicate) { - return rightOuterJoin(right, ExpressionParser.parseExpression(joinPredicate)); + return rightOuterJoin(right, ExpressionParser.INSTANCE.parseExpression(joinPredicate)); } @Override @@ -265,7 +266,7 @@ public class TableImpl implements Table { @Override public Table fullOuterJoin(Table right, String joinPredicate) { - return fullOuterJoin(right, ExpressionParser.parseExpression(joinPredicate)); + return fullOuterJoin(right, ExpressionParser.INSTANCE.parseExpression(joinPredicate)); } @Override @@ -297,7 +298,7 @@ public class TableImpl implements Table { @Override public Table joinLateral(String tableFunctionCall) { - return joinLateral(ExpressionParser.parseExpression(tableFunctionCall)); + return joinLateral(ExpressionParser.INSTANCE.parseExpression(tableFunctionCall)); } @Override @@ -308,8 +309,8 @@ public class TableImpl implements Table { @Override public Table joinLateral(String tableFunctionCall, String joinPredicate) { return joinLateral( - ExpressionParser.parseExpression(tableFunctionCall), - ExpressionParser.parseExpression(joinPredicate)); + ExpressionParser.INSTANCE.parseExpression(tableFunctionCall), + ExpressionParser.INSTANCE.parseExpression(joinPredicate)); } @Override @@ -319,7 +320,7 @@ public class TableImpl implements Table { @Override public Table leftOuterJoinLateral(String tableFunctionCall) { - return leftOuterJoinLateral(ExpressionParser.parseExpression(tableFunctionCall)); + return leftOuterJoinLateral(ExpressionParser.INSTANCE.parseExpression(tableFunctionCall)); } @Override @@ -330,8 +331,8 @@ public class TableImpl implements Table { @Override public Table leftOuterJoinLateral(String tableFunctionCall, String joinPredicate) { return leftOuterJoinLateral( - ExpressionParser.parseExpression(tableFunctionCall), - ExpressionParser.parseExpression(joinPredicate)); + ExpressionParser.INSTANCE.parseExpression(tableFunctionCall), + ExpressionParser.INSTANCE.parseExpression(joinPredicate)); } @Override @@ -406,7 +407,7 @@ public class TableImpl implements Table { public Table orderBy(String fields) { return createTable( operationTreeBuilder.sort( - ExpressionParser.parseExpressionList(fields), operationTree)); + ExpressionParser.INSTANCE.parseExpressionList(fields), operationTree)); } @Override @@ -449,7 +450,7 @@ public class TableImpl implements Table { @Override public Table addColumns(String fields) { - return addColumnsOperation(false, ExpressionParser.parseExpressionList(fields)); + return addColumnsOperation(false, ExpressionParser.INSTANCE.parseExpressionList(fields)); } @Override @@ -459,7 +460,7 @@ public class TableImpl implements Table { @Override public Table addOrReplaceColumns(String fields) { - return addColumnsOperation(true, ExpressionParser.parseExpressionList(fields)); + return addColumnsOperation(true, ExpressionParser.INSTANCE.parseExpressionList(fields)); } @Override @@ -490,7 +491,7 @@ public class TableImpl implements Table { public Table renameColumns(String fields) { return createTable( operationTreeBuilder.renameColumns( - ExpressionParser.parseExpressionList(fields), operationTree)); + ExpressionParser.INSTANCE.parseExpressionList(fields), operationTree)); } @Override @@ -503,7 +504,7 @@ public class TableImpl implements Table { public Table dropColumns(String fields) { return createTable( operationTreeBuilder.dropColumns( - ExpressionParser.parseExpressionList(fields), operationTree)); + ExpressionParser.INSTANCE.parseExpressionList(fields), operationTree)); } @Override @@ -513,7 +514,7 @@ public class TableImpl implements Table { @Override public Table map(String mapFunction) { - return map(ExpressionParser.parseExpression(mapFunction)); + return map(ExpressionParser.INSTANCE.parseExpression(mapFunction)); } @Override @@ -523,7 +524,7 @@ public class TableImpl implements Table { @Override public Table flatMap(String tableFunction) { - return flatMap(ExpressionParser.parseExpression(tableFunction)); + return flatMap(ExpressionParser.INSTANCE.parseExpression(tableFunction)); } @Override @@ -533,7 +534,7 @@ public class TableImpl implements Table { @Override public AggregatedTable aggregate(String aggregateFunction) { - return aggregate(ExpressionParser.parseExpression(aggregateFunction)); + return aggregate(ExpressionParser.INSTANCE.parseExpression(aggregateFunction)); } @Override @@ -642,7 +643,10 @@ public class TableImpl implements Table { @Override public Table select(String fields) { - return select(ExpressionParser.parseExpressionList(fields).toArray(new Expression[0])); + return select( + ExpressionParser.INSTANCE + .parseExpressionList(fields) + .toArray(new Expression[0])); } @Override @@ -666,7 +670,7 @@ public class TableImpl implements Table { @Override public AggregatedTable aggregate(String aggregateFunction) { - return aggregate(ExpressionParser.parseExpression(aggregateFunction)); + return aggregate(ExpressionParser.INSTANCE.parseExpression(aggregateFunction)); } @Override @@ -676,7 +680,7 @@ public class TableImpl implements Table { @Override public FlatAggregateTable flatAggregate(String tableAggFunction) { - return flatAggregate(ExpressionParser.parseExpression(tableAggFunction)); + return flatAggregate(ExpressionParser.INSTANCE.parseExpression(tableAggFunction)); } @Override @@ -699,7 +703,10 @@ public class TableImpl implements Table { @Override public Table select(String fields) { - return select(ExpressionParser.parseExpressionList(fields).toArray(new Expression[0])); + return select( + ExpressionParser.INSTANCE + .parseExpressionList(fields) + .toArray(new Expression[0])); } @Override @@ -729,7 +736,7 @@ public class TableImpl implements Table { public Table select(String fields) { return table.createTable( table.operationTreeBuilder.project( - ExpressionParser.parseExpressionList(fields), + ExpressionParser.INSTANCE.parseExpressionList(fields), table.operationTreeBuilder.tableAggregate( groupKey, tableAggregateFunction.accept(table.lookupResolver), @@ -759,7 +766,10 @@ public class TableImpl implements Table { @Override public WindowGroupedTable groupBy(String fields) { - return groupBy(ExpressionParser.parseExpressionList(fields).toArray(new Expression[0])); + return groupBy( + ExpressionParser.INSTANCE + .parseExpressionList(fields) + .toArray(new Expression[0])); } @Override @@ -791,7 +801,10 @@ public class TableImpl implements Table { @Override public Table select(String fields) { - return select(ExpressionParser.parseExpressionList(fields).toArray(new Expression[0])); + return select( + ExpressionParser.INSTANCE + .parseExpressionList(fields) + .toArray(new Expression[0])); } @Override @@ -816,7 +829,7 @@ public class TableImpl implements Table { @Override public AggregatedTable aggregate(String aggregateFunction) { - return aggregate(ExpressionParser.parseExpression(aggregateFunction)); + return aggregate(ExpressionParser.INSTANCE.parseExpression(aggregateFunction)); } @Override @@ -826,7 +839,7 @@ public class TableImpl implements Table { @Override public FlatAggregateTable flatAggregate(String tableAggregateFunction) { - return flatAggregate(ExpressionParser.parseExpression(tableAggregateFunction)); + return flatAggregate(ExpressionParser.INSTANCE.parseExpression(tableAggregateFunction)); } @Override @@ -855,7 +868,10 @@ public class TableImpl implements Table { @Override public Table select(String fields) { - return select(ExpressionParser.parseExpressionList(fields).toArray(new Expression[0])); + return select( + ExpressionParser.INSTANCE + .parseExpressionList(fields) + .toArray(new Expression[0])); } @Override @@ -914,7 +930,10 @@ public class TableImpl implements Table { @Override public Table select(String fields) { - return select(ExpressionParser.parseExpressionList(fields).toArray(new Expression[0])); + return select( + ExpressionParser.INSTANCE + .parseExpressionList(fields) + .toArray(new Expression[0])); } @Override @@ -969,7 +988,7 @@ public class TableImpl implements Table { public Table select(String fields) { return table.createTable( table.operationTreeBuilder.project( - ExpressionParser.parseExpressionList(fields), + ExpressionParser.INSTANCE.parseExpressionList(fields), table.operationTree, overWindows)); } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ExpressionParser.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/ExpressionParser.java similarity index 56% copy from flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ExpressionParser.java copy to flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/ExpressionParser.java index 2fbe87d..efd424a 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ExpressionParser.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/ExpressionParser.java @@ -16,27 +16,27 @@ * limitations under the License. */ -package org.apache.flink.table.expressions; +package org.apache.flink.table.delegation; import org.apache.flink.annotation.Internal; -import org.apache.flink.table.delegation.PlannerExpressionParser; +import org.apache.flink.table.expressions.Expression; import java.util.List; /** - * Parser for expressions inside a String. This parses exactly the same expressions that would be - * accepted by the Scala Expression DSL. + * {@link Expression} parser used by Table API to parse strings in AST expression. This parses + * exactly the same expressions that would be accepted by the Scala Expression DSL. * - * <p>{@link ExpressionParser} use {@link PlannerExpressionParser} to parse expressions. + * @deprecated The Java String Expression DSL is deprecated. */ @Internal -public final class ExpressionParser { +@Deprecated +public interface ExpressionParser { - public static Expression parseExpression(String exprString) { - return PlannerExpressionParser.create().parseExpression(exprString); - } + /** Default instance of the {@link ExpressionParser}. */ + ExpressionParser INSTANCE = ExpressionParserFactory.getDefault().create(); - public static List<Expression> parseExpressionList(String expression) { - return PlannerExpressionParser.create().parseExpressionList(expression); - } + Expression parseExpression(String exprString); + + List<Expression> parseExpressionList(String expression); } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ExpressionParser.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/ExpressionParserFactory.java similarity index 55% rename from flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ExpressionParser.java rename to flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/ExpressionParserFactory.java index 2fbe87d..a61c1cb 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ExpressionParser.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/ExpressionParserFactory.java @@ -16,27 +16,30 @@ * limitations under the License. */ -package org.apache.flink.table.expressions; +package org.apache.flink.table.delegation; import org.apache.flink.annotation.Internal; -import org.apache.flink.table.delegation.PlannerExpressionParser; - -import java.util.List; +import org.apache.flink.table.factories.Factory; +import org.apache.flink.table.factories.FactoryUtil; /** - * Parser for expressions inside a String. This parses exactly the same expressions that would be - * accepted by the Scala Expression DSL. + * Factory for {@link ExpressionParser}. * - * <p>{@link ExpressionParser} use {@link PlannerExpressionParser} to parse expressions. + * @deprecated The Java String Expression DSL is deprecated. */ @Internal -public final class ExpressionParser { +@Deprecated +public interface ExpressionParserFactory extends Factory { - public static Expression parseExpression(String exprString) { - return PlannerExpressionParser.create().parseExpression(exprString); - } + /** {@link #factoryIdentifier()} for the default {@link ExpressionParserFactory}. */ + String DEFAULT_IDENTIFIER = "default"; + + ExpressionParser create(); - public static List<Expression> parseExpressionList(String expression) { - return PlannerExpressionParser.create().parseExpressionList(expression); + static ExpressionParserFactory getDefault() { + return FactoryUtil.discoverFactory( + Thread.currentThread().getContextClassLoader(), + ExpressionParserFactory.class, + DEFAULT_IDENTIFIER); } } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/PlannerExpressionParser.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/PlannerExpressionParser.java deleted file mode 100644 index 9b0e6e9..0000000 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/PlannerExpressionParser.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.delegation; - -import org.apache.flink.annotation.Internal; -import org.apache.flink.table.api.TableException; -import org.apache.flink.table.expressions.Expression; -import org.apache.flink.table.expressions.ExpressionParser; - -import java.lang.reflect.Constructor; -import java.util.List; - -/** - * Temporary utility for parsing expressions inside a String. This parses exactly the same - * expressions that would be accepted by the Scala Expression DSL. - * - * <p>{@link PlannerExpressionParser} is used by {@link ExpressionParser} to parse expressions. - */ -@Internal -public interface PlannerExpressionParser { - - static PlannerExpressionParser create() { - return SingletonPlannerExpressionParser.getExpressionParser(); - } - - Expression parseExpression(String exprString); - - List<Expression> parseExpressionList(String expression); - - /** - * Util class to create {@link PlannerExpressionParser} instance. Use singleton pattern to avoid - * creating many {@link PlannerExpressionParser}. - */ - class SingletonPlannerExpressionParser { - - private static volatile PlannerExpressionParser expressionParser; - - private SingletonPlannerExpressionParser() {} - - public static PlannerExpressionParser getExpressionParser() { - - if (expressionParser == null) { - try { - Class<?> clazz = - Class.forName( - "org.apache.flink.table.expressions.PlannerExpressionParserImpl"); - Constructor<?> con = clazz.getConstructor(); - expressionParser = (PlannerExpressionParser) con.newInstance(); - } catch (Throwable t) { - throw new TableException( - "Construction of PlannerExpressionParserImpl class failed.", t); - } - } - return expressionParser; - } - } -} diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/WindowCreationValidationTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/WindowCreationValidationTest.java deleted file mode 100644 index ef1e1f0..0000000 --- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/WindowCreationValidationTest.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.api; - -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; - -/** Test failures for the creation of window. */ -public class WindowCreationValidationTest { - - @Rule public final ExpectedException exception = ExpectedException.none(); - - @Test - public void testTumbleOverForString() { - exception.expect(TableException.class); - exception.expectMessage("Construction of PlannerExpressionParserImpl class failed."); - Tumble.over("4.hours"); - } - - @Test - public void testSlideOverForString() { - exception.expect(TableException.class); - exception.expectMessage("Construction of PlannerExpressionParserImpl class failed."); - Slide.over("4.hours"); - } - - @Test - public void testSessionWithGapForString() { - exception.expect(TableException.class); - exception.expectMessage("Construction of PlannerExpressionParserImpl class failed."); - Session.withGap("4.hours"); - } - - @Test - public void testOverWithPartitionByForString() { - exception.expect(TableException.class); - exception.expectMessage("Construction of PlannerExpressionParserImpl class failed."); - Over.partitionBy("a"); - } -} diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/delegation/DefaultExpressionParserFactory.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/delegation/DefaultExpressionParserFactory.java new file mode 100644 index 0000000..fe1753b --- /dev/null +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/delegation/DefaultExpressionParserFactory.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.delegation; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.table.delegation.ExpressionParser; +import org.apache.flink.table.delegation.ExpressionParserFactory; +import org.apache.flink.table.expressions.ExpressionParserImpl; + +import java.util.Collections; +import java.util.Set; + +/** Default factory for {@link ExpressionParser}. */ +@Internal +public final class DefaultExpressionParserFactory implements ExpressionParserFactory { + + @Override + public ExpressionParser create() { + return new ExpressionParserImpl(); + } + + @Override + public String factoryIdentifier() { + return ExpressionParserFactory.DEFAULT_IDENTIFIER; + } + + @Override + public Set<ConfigOption<?>> requiredOptions() { + return Collections.emptySet(); + } + + @Override + public Set<ConfigOption<?>> optionalOptions() { + return Collections.emptySet(); + } +} diff --git a/flink-table/flink-table-planner/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink-table/flink-table-planner/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory index ac5f9b6..6d9eff6 100644 --- a/flink-table/flink-table-planner/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory +++ b/flink-table/flink-table-planner/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory @@ -16,3 +16,4 @@ org.apache.flink.table.planner.delegation.DefaultExecutorFactory org.apache.flink.table.planner.delegation.DefaultParserFactory org.apache.flink.table.planner.delegation.DefaultPlannerFactory +org.apache.flink.table.planner.delegation.DefaultExpressionParserFactory diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionParserImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/ExpressionParserImpl.scala similarity index 97% rename from flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionParserImpl.scala rename to flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/ExpressionParserImpl.scala index 811e4f3..56a688b 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionParserImpl.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/ExpressionParserImpl.scala @@ -19,32 +19,31 @@ package org.apache.flink.table.expressions import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation} import org.apache.flink.table.api._ -import org.apache.flink.table.delegation.PlannerExpressionParser import ApiExpressionUtils._ +import org.apache.flink.table.delegation.ExpressionParser import org.apache.flink.table.functions.BuiltInFunctionDefinitions import org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType import _root_.java.math.{BigDecimal => JBigDecimal} import _root_.java.util.{List => JList} - import _root_.scala.collection.JavaConversions._ import _root_.scala.language.implicitConversions import _root_.scala.util.parsing.combinator.{JavaTokenParsers, PackratParsers} /** - * The implementation of a [[PlannerExpressionParser]] which parsers expressions inside a String. + * The implementation of a [[ExpressionParser]] which parsers expressions inside a String. * * <p><strong>WARNING</strong>: please keep this class in sync with PlannerExpressionParserImpl * variant in flink-table-planner module. */ -class PlannerExpressionParserImpl extends PlannerExpressionParser { +class ExpressionParserImpl extends ExpressionParser { - def parseExpression(exprString: String): Expression = { - PlannerExpressionParserImpl.parseExpression(exprString) + override def parseExpression(exprString: String): Expression = { + ExpressionParserImpl.parseExpression(exprString) } override def parseExpressionList(expression: String): JList[Expression] = { - PlannerExpressionParserImpl.parseExpressionList(expression) + ExpressionParserImpl.parseExpressionList(expression) } } @@ -56,9 +55,9 @@ class PlannerExpressionParserImpl extends PlannerExpressionParser { * available in the Scala Expression DSL. This parser must be kept in sync with the Scala DSL * lazy valined in the above files. */ -object PlannerExpressionParserImpl extends JavaTokenParsers +object ExpressionParserImpl extends JavaTokenParsers with PackratParsers - with PlannerExpressionParser { + with ExpressionParser { case class Keyword(key: String) @@ -140,7 +139,7 @@ object PlannerExpressionParserImpl extends JavaTokenParsers lazy val TRIM_MODE_BOTH: Keyword = Keyword("BOTH") lazy val TO: Keyword = Keyword("TO") - def functionIdent: PlannerExpressionParserImpl.Parser[String] = super.ident + def functionIdent: ExpressionParserImpl.Parser[String] = super.ident // symbols diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/KeywordParseTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/KeywordParseTest.scala index f9a6ced..2e613f9 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/KeywordParseTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/KeywordParseTest.scala @@ -18,10 +18,9 @@ package org.apache.flink.table.planner.expressions -import org.apache.flink.table.expressions.ExpressionParser +import org.apache.flink.table.delegation.ExpressionParser import org.apache.flink.table.expressions.ApiExpressionUtils.{lookupCall, unresolvedCall, unresolvedRef} import org.apache.flink.table.functions.BuiltInFunctionDefinitions - import org.junit.Assert.assertEquals import org.junit.Test @@ -34,30 +33,30 @@ class KeywordParseTest { def testKeyword(): Unit = { assertEquals( unresolvedCall(BuiltInFunctionDefinitions.ORDER_ASC, unresolvedRef("f0")), - ExpressionParser.parseExpression("f0.asc")) + ExpressionParser.INSTANCE.parseExpression("f0.asc")) assertEquals( unresolvedCall(BuiltInFunctionDefinitions.ORDER_ASC, unresolvedRef("f0")), - ExpressionParser.parseExpression("f0.asc()")) + ExpressionParser.INSTANCE.parseExpression("f0.asc()")) } @Test def testKeywordAsPrefixInFunctionName(): Unit = { assertEquals( lookupCall("ascii", unresolvedRef("f0")), - ExpressionParser.parseExpression("f0.ascii()")) + ExpressionParser.INSTANCE.parseExpression("f0.ascii()")) } @Test def testKeywordAsInfixInFunctionName(): Unit = { assertEquals( lookupCall("iiascii", unresolvedRef("f0")), - ExpressionParser.parseExpression("f0.iiascii()")) + ExpressionParser.INSTANCE.parseExpression("f0.iiascii()")) } @Test def testKeywordAsSuffixInFunctionName(): Unit = { assertEquals( lookupCall("iiasc", unresolvedRef("f0")), - ExpressionParser.parseExpression("f0.iiasc()")) + ExpressionParser.INSTANCE.parseExpression("f0.iiasc()")) } } diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala index d6b7101..396ebca 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala @@ -19,9 +19,9 @@ package org.apache.flink.table.planner.expressions import org.apache.flink.table.api._ -import org.apache.flink.table.expressions.{Expression, ExpressionParser, TimeIntervalUnit, TimePointUnit} +import org.apache.flink.table.delegation.ExpressionParser +import org.apache.flink.table.expressions.{Expression, TimeIntervalUnit, TimePointUnit} import org.apache.flink.table.planner.expressions.utils.ScalarTypesTestBase - import org.junit.Test class ScalarFunctionsTest extends ScalarTypesTestBase { @@ -112,7 +112,7 @@ class ScalarFunctionsTest extends ScalarTypesTestBase { cases.foreach(x => { testAllApis( - ExpressionParser.parseExpression(x._1), + ExpressionParser.INSTANCE.parseExpression(x._1), x._1, x._2, x._3 @@ -3959,7 +3959,7 @@ class ScalarFunctionsTest extends ScalarTypesTestBase { val tableApiString = x._1.format(field) val sqlApiString = x._2.format(field) testAllApis( - ExpressionParser.parseExpression(tableApiString), + ExpressionParser.INSTANCE.parseExpression(tableApiString), tableApiString, sqlApiString, "null" @@ -3985,7 +3985,7 @@ class ScalarFunctionsTest extends ScalarTypesTestBase { .format(x) testAllApis( - ExpressionParser.parseExpression(tableApiString), + ExpressionParser.INSTANCE.parseExpression(tableApiString), tableApiString, sqlApiString, "null" diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala index 7cec8b0..5204b94 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala @@ -39,7 +39,8 @@ import org.apache.flink.table.data.binary.BinaryRowData import org.apache.flink.table.data.conversion.{DataStructureConverter, DataStructureConverters} import org.apache.flink.table.data.util.DataFormatConverters import org.apache.flink.table.data.util.DataFormatConverters.DataFormatConverter -import org.apache.flink.table.expressions.{Expression, ExpressionParser} +import org.apache.flink.table.delegation.ExpressionParser +import org.apache.flink.table.expressions.Expression import org.apache.flink.table.functions.ScalarFunction import org.apache.flink.table.planner.codegen.{CodeGeneratorContext, ExprCodeGenerator, FunctionCodeGenerator} import org.apache.flink.table.planner.delegation.PlannerBase @@ -55,7 +56,6 @@ import org.junit.{After, Before, Rule} import scala.collection.JavaConverters._ import scala.collection.mutable -import scala.collection.mutable.ListBuffer abstract class ExpressionTestBase { @@ -276,7 +276,8 @@ abstract class ExpressionTestBase { } private def testTableApiTestExpr(tableApiString: String, expected: String): Unit = { - addTableApiTestExpr(ExpressionParser.parseExpression(tableApiString), expected, validExprs) + addTableApiTestExpr( + ExpressionParser.INSTANCE.parseExpression(tableApiString), expected, validExprs) } private def addSqlTestExpr( diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractorTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractorTest.scala index 0388d32..6658c36 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractorTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractorTest.scala @@ -22,7 +22,7 @@ import org.apache.flink.api.common.typeinfo.Types import org.apache.flink.table.api.{DataTypes, TableConfig} import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog} import org.apache.flink.table.expressions.ApiExpressionUtils.{unresolvedCall, unresolvedRef, valueLiteral} -import org.apache.flink.table.expressions.{Expression, ExpressionParser} +import org.apache.flink.table.expressions.Expression import org.apache.flink.table.functions.BuiltInFunctionDefinitions.{EQUALS, GREATER_THAN, LESS_THAN, LESS_THAN_OR_EQUAL} import org.apache.flink.table.functions.{AggregateFunctionDefinition, FunctionIdentifier} import org.apache.flink.table.module.ModuleManager @@ -32,13 +32,13 @@ import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable import org.apache.flink.table.planner.functions.utils.ScalarSqlFunction import org.apache.flink.table.planner.utils.{DateTimeTestUtil, IntSumAggFunction} import org.apache.flink.table.utils.CatalogManagerMocks - import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rex.{RexBuilder, RexNode} import org.apache.calcite.sql.SqlPostfixOperator import org.apache.calcite.sql.`type`.SqlTypeName import org.apache.calcite.sql.fun.{SqlStdOperatorTable, SqlTrimFunction} import org.apache.calcite.util.{DateString, TimeString, TimestampString} +import org.apache.flink.table.delegation.ExpressionParser import org.hamcrest.CoreMatchers.is import org.junit.Assert.{assertArrayEquals, assertEquals, assertThat, assertTrue} import org.junit.Test @@ -46,7 +46,6 @@ import org.junit.Test import java.math.BigDecimal import java.time.ZoneId import java.util.{Arrays, TimeZone, List => JList} - import scala.collection.JavaConverters._ /** @@ -112,8 +111,8 @@ class RexNodeExtractorTest extends RexNodeTestBase { val builder: RexBuilder = new FlinkRexBuilder(typeFactory) val expr = buildConditionExpr() - val firstExp = ExpressionParser.parseExpression("id > 6") - val secondExp = ExpressionParser.parseExpression("amount * price < 100") + val firstExp = ExpressionParser.INSTANCE.parseExpression("id > 6") + val secondExp = ExpressionParser.INSTANCE.parseExpression("amount * price < 100") val expected: Array[Expression] = Array(firstExp, secondExp) val (convertedExpressions, unconvertedRexNodes) = @@ -147,7 +146,8 @@ class RexNodeExtractorTest extends RexNodeTestBase { relBuilder, functionCatalog) - val expected: Array[Expression] = Array(ExpressionParser.parseExpression("amount >= id")) + val expected: Array[Expression] = Array( + ExpressionParser.INSTANCE.parseExpression("amount >= id")) assertExpressionArrayEquals(expected, convertedExpressions) assertEquals(0, unconvertedRexNodes.length) } @@ -197,9 +197,9 @@ class RexNodeExtractorTest extends RexNodeTestBase { functionCatalog) val expected: Array[Expression] = Array( - ExpressionParser.parseExpression("amount < 100 || price == 100 || price === 200"), - ExpressionParser.parseExpression("id > 100 || price == 100 || price === 200"), - ExpressionParser.parseExpression("!(amount <= id)")) + ExpressionParser.INSTANCE.parseExpression("amount < 100 || price == 100 || price === 200"), + ExpressionParser.INSTANCE.parseExpression("id > 100 || price == 100 || price === 200"), + ExpressionParser.INSTANCE.parseExpression("!(amount <= id)")) assertExpressionArrayEquals(expected, convertedExpressions) assertEquals(0, unconvertedRexNodes.length) } @@ -237,10 +237,10 @@ class RexNodeExtractorTest extends RexNodeTestBase { functionCatalog) val expected: Array[Expression] = Array( - ExpressionParser.parseExpression("amount < 100"), - ExpressionParser.parseExpression("amount <= id"), - ExpressionParser.parseExpression("id > 100"), - ExpressionParser.parseExpression("price === 100") + ExpressionParser.INSTANCE.parseExpression("amount < 100"), + ExpressionParser.INSTANCE.parseExpression("amount <= id"), + ExpressionParser.INSTANCE.parseExpression("id > 100"), + ExpressionParser.INSTANCE.parseExpression("price === 100") ) assertExpressionArrayEquals(expected, convertedExpressions) @@ -393,16 +393,16 @@ class RexNodeExtractorTest extends RexNodeTestBase { functionCatalog) val expected: Array[Expression] = Array( - ExpressionParser.parseExpression("amount < id"), - ExpressionParser.parseExpression("amount <= id"), - ExpressionParser.parseExpression("amount <> id"), - ExpressionParser.parseExpression("amount == id"), - ExpressionParser.parseExpression("amount >= id"), - ExpressionParser.parseExpression("amount > id"), - ExpressionParser.parseExpression("amount + id == 100"), - ExpressionParser.parseExpression("amount - id == 100"), - ExpressionParser.parseExpression("amount * id == 100"), - ExpressionParser.parseExpression("amount / id == 100") + ExpressionParser.INSTANCE.parseExpression("amount < id"), + ExpressionParser.INSTANCE.parseExpression("amount <= id"), + ExpressionParser.INSTANCE.parseExpression("amount <> id"), + ExpressionParser.INSTANCE.parseExpression("amount == id"), + ExpressionParser.INSTANCE.parseExpression("amount >= id"), + ExpressionParser.INSTANCE.parseExpression("amount > id"), + ExpressionParser.INSTANCE.parseExpression("amount + id == 100"), + ExpressionParser.INSTANCE.parseExpression("amount - id == 100"), + ExpressionParser.INSTANCE.parseExpression("amount * id == 100"), + ExpressionParser.INSTANCE.parseExpression("amount / id == 100") ) assertExpressionArrayEquals(expected, convertedExpressions) assertEquals(0, unconvertedRexNodes.length) @@ -508,7 +508,7 @@ class RexNodeExtractorTest extends RexNodeTestBase { assertEquals(0, unconvertedRexNodes.length) assertExpressionArrayEquals( - Array(ExpressionParser.parseExpression("amount <= id")), + Array(ExpressionParser.INSTANCE.parseExpression("amount <= id")), Array(convertedExpressions(1))) } diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/BatchTableEnvUtil.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/BatchTableEnvUtil.scala index d801362..f389897 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/BatchTableEnvUtil.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/BatchTableEnvUtil.scala @@ -24,14 +24,13 @@ import org.apache.flink.api.java.io.CollectionInputFormat import org.apache.flink.streaming.api.datastream.DataStream import org.apache.flink.table.api.internal.TableEnvironmentImpl import org.apache.flink.table.api.{Table, TableEnvironment} -import org.apache.flink.table.expressions.ExpressionParser +import org.apache.flink.table.delegation.ExpressionParser import org.apache.flink.table.planner.delegation.PlannerBase import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil import org.apache.flink.table.planner.plan.stats.FlinkStatistic import org.apache.flink.table.planner.utils.TableTestUtil import _root_.java.util.UUID - import _root_.scala.collection.JavaConverters._ import scala.reflect.ClassTag @@ -222,7 +221,8 @@ object BatchTableEnvUtil { fieldNames: Option[Array[String]], fieldNullables: Option[Array[Boolean]], statistic: Option[FlinkStatistic]): Unit = { - val fields = fieldNames.map((f: Array[String]) => f.map(ExpressionParser.parseExpression)) + val fields = fieldNames.map((f: Array[String]) => + f.map(ExpressionParser.INSTANCE.parseExpression)) // for tests we know that this stream is definitely bounded ExecNodeUtil.makeLegacySourceTransformationsBounded(boundedStream.getTransformation) TableTestUtil.createTemporaryView( diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/CollectionBatchExecTable.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/CollectionBatchExecTable.scala index b5bc5b8..c30a090 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/CollectionBatchExecTable.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/CollectionBatchExecTable.scala @@ -19,7 +19,8 @@ package org.apache.flink.table.planner.runtime.utils import org.apache.flink.api.scala._ import org.apache.flink.table.api.{Table, TableEnvironment} -import org.apache.flink.table.expressions.{Expression, ExpressionParser} +import org.apache.flink.table.delegation.ExpressionParser +import org.apache.flink.table.expressions.Expression import scala.collection.mutable import scala.util.Random @@ -284,7 +285,7 @@ object CollectionBatchExecTable { if (fields == null) { null } else { - ExpressionParser.parseExpressionList(fields).toArray(Array[Expression]()) + ExpressionParser.INSTANCE.parseExpressionList(fields).toArray(Array[Expression]()) } } diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamTableEnvUtil.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamTableEnvUtil.scala index 27dd611..1bebf6d 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamTableEnvUtil.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamTableEnvUtil.scala @@ -21,7 +21,8 @@ package org.apache.flink.table.planner.runtime.utils import org.apache.flink.streaming.api.datastream.DataStream import org.apache.flink.table.api.TableEnvironment import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment -import org.apache.flink.table.expressions.{Expression, ExpressionParser} +import org.apache.flink.table.delegation.ExpressionParser +import org.apache.flink.table.expressions.Expression import org.apache.flink.table.planner.plan.stats.FlinkStatistic import org.apache.flink.table.planner.utils.TableTestUtil @@ -44,7 +45,7 @@ object StreamTableEnvUtil { fieldNullables: Option[Array[Boolean]], statistic: Option[FlinkStatistic]): Unit = { val fields: Option[Array[Expression]] = fieldNames match { - case Some(names) => Some(names.map(ExpressionParser.parseExpression)) + case Some(names) => Some(names.map(ExpressionParser.INSTANCE.parseExpression)) case _ => None } TableTestUtil.createTemporaryView(tEnv, name, dataStream, fields, fieldNullables, statistic)