This is an automated email from the ASF dual-hosted git repository. rongr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new ef29f30 [FLINK-13347][table-planner] should handle SEMI/ANTI JoinRelType in switch case ef29f30 is described below commit ef29f305cd3d907d7c445c271b314ea643baaeeb Author: godfreyhe <godfre...@163.com> AuthorDate: Thu Jul 25 17:06:50 2019 +0800 [FLINK-13347][table-planner] should handle SEMI/ANTI JoinRelType in switch case This closes #9227. --- .../scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala | 2 +- .../main/scala/org/apache/flink/table/plan/nodes/CommonJoin.scala | 2 ++ .../org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala | 1 + .../nodes/datastream/DataStreamJoinToCoProcessTranslator.scala | 8 +++++--- .../flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala | 2 ++ .../org/apache/flink/table/runtime/join/WindowJoinUtil.scala | 4 ++-- 6 files changed, 13 insertions(+), 6 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala index bd2e8fa..923307c 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala @@ -100,7 +100,7 @@ trait CommonCorrelate { |} |""".stripMargin } else if (joinType != JoinRelType.INNER) { - throw new TableException(s"Unsupported SemiJoinType: $joinType for correlate join.") + throw new TableException(s"Unsupported JoinRelType: $joinType for correlate join.") } functionGenerator.generateFunction( diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/CommonJoin.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/CommonJoin.scala index 3d98a4d..753ec40 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/CommonJoin.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/CommonJoin.scala @@ -45,6 +45,8 @@ trait CommonJoin { case JoinRelType.LEFT=> "LeftOuterJoin" case JoinRelType.RIGHT => "RightOuterJoin" case JoinRelType.FULL => "FullOuterJoin" + case JoinRelType.SEMI => "SemiJoin" + case JoinRelType.ANTI => "AntiJoin" } } diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala index 1df75e6..2e319f4 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala @@ -199,6 +199,7 @@ class DataSetJoin( rightKeys.toArray, returnType, config) + case _ => throw new TableException(s"$joinType is not supported.") } } diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoinToCoProcessTranslator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoinToCoProcessTranslator.scala index 846e452..8c418a3 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoinToCoProcessTranslator.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoinToCoProcessTranslator.scala @@ -18,13 +18,11 @@ package org.apache.flink.table.plan.nodes.datastream -import org.apache.calcite.rel.core.{JoinInfo, JoinRelType} -import org.apache.calcite.rex.{RexBuilder, RexNode} import org.apache.flink.api.common.functions.FlatJoinFunction import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.streaming.api.operators.TwoInputStreamOperator import org.apache.flink.streaming.api.operators.co.LegacyKeyedCoProcessOperator -import org.apache.flink.table.api.{StreamQueryConfig, TableConfig} +import org.apache.flink.table.api.{StreamQueryConfig, TableConfig, ValidationException} import org.apache.flink.table.codegen.{FunctionCodeGenerator, GeneratedFunction} import org.apache.flink.table.plan.schema.RowSchema import org.apache.flink.table.runtime.CRowKeySelector @@ -32,6 +30,9 @@ import org.apache.flink.table.runtime.join._ import org.apache.flink.table.runtime.types.CRow import org.apache.flink.types.Row +import org.apache.calcite.rel.core.{JoinInfo, JoinRelType} +import org.apache.calcite.rex.{RexBuilder, RexNode} + class DataStreamJoinToCoProcessTranslator( config: TableConfig, returnType: TypeInformation[Row], @@ -145,6 +146,7 @@ class DataStreamJoinToCoProcessTranslator( genFunction.name, genFunction.code, queryConfig) + case _ => throw new ValidationException(s"$joinType is not supported.") } new LegacyKeyedCoProcessOperator(joinFunction) } diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala index 8c06265..0b34df9 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala @@ -150,6 +150,7 @@ class DataStreamWindowJoin( case JoinRelType.FULL => JoinType.FULL_OUTER case JoinRelType.LEFT => JoinType.LEFT_OUTER case JoinRelType.RIGHT => JoinType.RIGHT_OUTER + case _ => throw new TableException(s"$joinType is not supported.") } if (relativeWindowSize < 0) { @@ -232,6 +233,7 @@ class DataStreamWindowJoin( case JoinType.FULL_OUTER => leftDataStream.map(leftPadder).name("Full Outer Join").setParallelism(leftP) .union(rightDataStream.map(rightPadder).name("Full Outer Join").setParallelism(rightP)) + case _ => throw new TableException(s"$joinType is not supported.") } } diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala index 3e355e8..84d45f1 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala @@ -18,7 +18,6 @@ package org.apache.flink.table.runtime.join import java.util - import org.apache.calcite.plan.RelOptUtil import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rel.core.JoinRelType @@ -27,7 +26,7 @@ import org.apache.calcite.sql.fun.SqlStdOperatorTable import org.apache.calcite.sql.{SqlKind, SqlOperatorTable} import org.apache.flink.api.common.functions.FlatJoinFunction import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.table.api.TableConfig +import org.apache.flink.table.api.{TableConfig, TableException} import org.apache.flink.table.calcite.{FlinkTypeFactory, RelTimeIndicatorConverter} import org.apache.flink.table.codegen.{ExpressionReducer, FunctionCodeGenerator, GeneratedFunction} import org.apache.flink.table.functions.sql.ProctimeSqlFunction @@ -444,6 +443,7 @@ object WindowJoinUtil { case JoinRelType.LEFT => true case JoinRelType.RIGHT => true case JoinRelType.FULL => true + case _ => throw new TableException(s"$joinType is not supported.") } // generate other non-equi function code