This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new dcd8c74 [hotfix][table-api] Remove deprecated table function code dcd8c74 is described below commit dcd8c74b504046802cebf278b718e4753928a260 Author: sunjincheng121 <sunjincheng...@gmail.com> AuthorDate: Wed Feb 20 20:09:46 2019 +0800 [hotfix][table-api] Remove deprecated table function code This commit removes the table constructor (deprecated in FLINK-11447) for lateral table function joins and simplifies related code. --- .../flink/table/api/scala/expressionDsl.scala | 8 -- .../scala/org/apache/flink/table/api/table.scala | 143 +++------------------ .../expressions/PlannerExpressionConverter.scala | 8 +- .../org/apache/flink/table/expressions/call.scala | 65 +--------- .../functions/utils/UserDefinedFunctionUtils.scala | 29 ++++- .../api/batch/table/TemporalTableJoinTest.scala | 11 -- .../stringexpr/CorrelateStringExpressionTest.scala | 62 --------- .../table/validation/CorrelateValidationTest.scala | 2 +- .../api/stream/table/TemporalTableJoinTest.scala | 14 +- .../stringexpr/CorrelateStringExpressionTest.scala | 62 --------- .../table/validation/CorrelateValidationTest.scala | 89 +------------ .../table/plan/TimeIndicatorConversionTest.scala | 6 +- .../table/runtime/batch/table/JoinITCase.scala | 2 +- 13 files changed, 55 insertions(+), 446 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala index ed0f4c1..23d54a5 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala @@ -1071,14 +1071,6 @@ trait ImplicitExpressionConversions { } } - @deprecated("Please use Table.joinLateral() or Table.leftOuterJoinLateral() instead.", "1.8") - implicit def 
tableFunctionCall2Table(tfc: TableFunctionCall): Table = { - new Table( - tableEnv = null, // table environment will be set later. - tfc.toLogicalTableFunctionCall(child = null) // child will be set later. - ) - } - implicit def symbol2FieldExpression(sym: Symbol): Expression = UnresolvedFieldReference(sym.name) implicit def byte2Literal(b: Byte): Expression = Literal(b) implicit def short2Literal(s: Short): Expression = Literal(s) diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/table.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/table.scala index e2f1161..44dac79 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/table.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/table.scala @@ -21,7 +21,7 @@ import org.apache.calcite.rel.RelNode import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.operators.join.JoinType import org.apache.flink.table.calcite.{FlinkRelBuilder, FlinkTypeFactory} -import org.apache.flink.table.expressions.{Alias, Asc, Expression, ExpressionParser, Ordering, ResolvedFieldReference, UnresolvedAlias, UnresolvedFieldReference, WindowProperty} +import org.apache.flink.table.expressions.{Alias, Asc, Expression, ExpressionParser, Ordering, ResolvedFieldReference, UnresolvedAlias, WindowProperty} import org.apache.flink.table.functions.TemporalTableFunction import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils import org.apache.flink.table.plan.ProjectionTranslator._ @@ -65,44 +65,13 @@ class Table( private[flink] val tableEnv: TableEnvironment, private[flink] val logicalPlan: LogicalNode) { - // Check if the plan has an unbounded TableFunctionCall as child node. - // A TableFunctionCall is tolerated as root node because the Table holds the initial call. 
- if (containsUnboundedUDTFCall(logicalPlan) && - !logicalPlan.isInstanceOf[LogicalTableFunctionCall]) { - throw new ValidationException( - "Table functions can only be used in table.joinLateral() and table.leftOuterJoinLateral().") - } - - /** - * Creates a [[Table]] for a TableFunction call from a String expression. - * - * @param tableEnv The TableEnvironment in which the call is created. - * @param tableFunctionCall A string expression of a table function call. - * - * @deprecated This constructor will be removed. Use table.joinLateral() or - * table.leftOuterJoinLateral() instead. - */ - @Deprecated - @deprecated( - "This constructor will be removed. Use table.joinLateral() or " + - "table.leftOuterJoinLateral() instead.", - "1.8") - def this(tableEnv: TableEnvironment, tableFunctionCall: String) { - this(tableEnv, UserDefinedFunctionUtils - .createLogicalFunctionCall(tableEnv, ExpressionParser.parseExpression(tableFunctionCall))) - } - private lazy val tableSchema: TableSchema = new TableSchema( logicalPlan.output.map(_.name).toArray, logicalPlan.output.map(_.resultType).toArray) def relBuilder: FlinkRelBuilder = tableEnv.getRelBuilder - def getRelNode: RelNode = if (containsUnboundedUDTFCall(logicalPlan)) { - throw new ValidationException("Cannot translate a query with an unbounded table function call.") - } else { - logicalPlan.toRelNode(relBuilder) - } + def getRelNode: RelNode = logicalPlan.toRelNode(relBuilder) /** * Returns the schema of this table. @@ -246,34 +215,7 @@ class Table( * }}} */ def as(fields: Expression*): Table = { - - logicalPlan match { - case functionCall: LogicalTableFunctionCall if functionCall.child == null => - // If the logical plan is a TableFunctionCall, we replace its field names to avoid special - // cases during the validation. 
- if (fields.length != functionCall.output.length) { - throw new ValidationException( - "List of column aliases must have same degree as TableFunction's output") - } - if (!fields.forall(_.isInstanceOf[UnresolvedFieldReference])) { - throw new ValidationException( - "Alias field must be an instance of UnresolvedFieldReference" - ) - } - new Table( - tableEnv, - LogicalTableFunctionCall( - functionCall.functionName, - functionCall.tableFunction, - functionCall.parameters, - functionCall.resultType, - fields.map(_.asInstanceOf[UnresolvedFieldReference].name).toArray, - functionCall.child) - ) - case _ => - // prepend an AliasNode - new Table(tableEnv, AliasNode(fields, logicalPlan).validate(tableEnv)) - } + new Table(tableEnv, AliasNode(fields, logicalPlan).validate(tableEnv)) } /** @@ -564,46 +506,16 @@ class Table( } private def join(right: Table, joinPredicate: Option[Expression], joinType: JoinType): Table = { - - // check if we join with a table or a table function - if (!containsUnboundedUDTFCall(right.logicalPlan)) { - // regular table-table join - - // check that the TableEnvironment of right table is not null - // and right table belongs to the same TableEnvironment - if (right.tableEnv != this.tableEnv) { - throw new ValidationException("Only tables from the same TableEnvironment can be joined.") - } - - new Table( - tableEnv, - Join(this.logicalPlan, right.logicalPlan, joinType, joinPredicate, correlated = false) - .validate(tableEnv)) - - } else { - // join with a table function - - // check join type - if (joinType != JoinType.INNER && joinType != JoinType.LEFT_OUTER) { - throw new ValidationException( - "TableFunctions are currently supported for join and leftOuterJoin.") - } - - val udtf = right.logicalPlan.asInstanceOf[LogicalTableFunctionCall] - val udtfCall = LogicalTableFunctionCall( - udtf.functionName, - udtf.tableFunction, - udtf.parameters, - udtf.resultType, - udtf.fieldNames, - this.logicalPlan - ).validate(tableEnv) - - new Table( - 
tableEnv, - Join(this.logicalPlan, udtfCall, joinType, joinPredicate, correlated = true) - .validate(tableEnv)) + // check that the TableEnvironment of right table is not null + // and right table belongs to the same TableEnvironment + if (right.tableEnv != this.tableEnv) { + throw new ValidationException("Only tables from the same TableEnvironment can be joined.") } + + new Table( + tableEnv, + Join(this.logicalPlan, right.logicalPlan, joinType, joinPredicate, correlated = false) + .validate(tableEnv)) } /** @@ -799,21 +711,16 @@ class Table( "Table functions are currently only supported for inner and left outer lateral joins.") } - val logicalCall = UserDefinedFunctionUtils.createLogicalFunctionCall(tableEnv, callExpr) - - val validatedLogicalCall = LogicalTableFunctionCall( - logicalCall.functionName, - logicalCall.tableFunction, - logicalCall.parameters, - logicalCall.resultType, - logicalCall.fieldNames, - this.logicalPlan - ).validate(tableEnv) + val logicalCall = UserDefinedFunctionUtils.createLogicalFunctionCall( + tableEnv, + callExpr, + logicalPlan) + val validatedLogicalCall = logicalCall.validate(tableEnv) new Table( tableEnv, Join( - this.logicalPlan, + logicalPlan, validatedLogicalCall, joinType, joinPredicate, @@ -1221,20 +1128,6 @@ class Table( } tableName } - - /** - * Checks if the plan represented by a [[LogicalNode]] contains an unbounded UDTF call. - * @param n the node to check - * @return true if the plan contains an unbounded UDTF call, false otherwise. 
- */ - private def containsUnboundedUDTFCall(n: LogicalNode): Boolean = { - n match { - case functionCall: LogicalTableFunctionCall if functionCall.child == null => true - case u: UnaryNode => containsUnboundedUDTFCall(u.child) - case b: BinaryNode => containsUnboundedUDTFCall(b.left) || containsUnboundedUDTFCall(b.right) - case _: LeafNode => false - } - } } /** diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala index 628ef8c..2afc5f6 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala @@ -66,13 +66,7 @@ class PlannerExpressionConverter private extends ExpressionVisitor[PlannerExpres val extraNames = args .drop(2) .map(e => e.asInstanceOf[ValueLiteralExpression].getValue.asInstanceOf[String]) - val plannerExpression = args.head - plannerExpression match { - case tfc: TableFunctionCall => - tfc.setAliases(name +: extraNames) - case _ => - Alias(plannerExpression, name, extraNames) - } + Alias(args.head, name, extraNames) case FLATTEN => assert(args.size == 1) diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/call.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/call.scala index de423c9..3f0597f 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/call.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/call.scala @@ -29,11 +29,10 @@ import org.apache.calcite.tools.RelBuilder import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.table.api._ import 
org.apache.flink.table.calcite.FlinkTypeFactory -import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._ import org.apache.flink.table.functions._ -import org.apache.flink.table.plan.logical.{LogicalNode, LogicalTableFunctionCall} -import org.apache.flink.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess} +import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._ import org.apache.flink.table.typeutils.{RowIntervalTypeInfo, TimeIntervalTypeInfo} +import org.apache.flink.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess} import _root_.scala.collection.JavaConverters._ @@ -316,68 +315,8 @@ case class TableFunctionCall( resultType: TypeInformation[_]) extends PlannerExpression { - private var aliases: Option[Seq[String]] = None - override private[flink] def children: Seq[Expression] = parameters - /** - * Assigns an alias for this table function's returned fields that the following operator - * can refer to. - * - * @param aliasList alias for this table function's returned fields - * @return this table function call - */ - private[flink] def setAliases(aliasList: Seq[String]): TableFunctionCall = { - this.aliases = Some(aliasList) - this - } - - /** - * Specifies the field names for a join with a table function. - * - * @param name name for one field - * @param extraNames additional names if the expression expands to multiple fields - * @return field with an alias - */ - def as(name: Symbol, extraNames: Symbol*): TableFunctionCall = { - // NOTE: this method is only a temporary solution until we - // remove the deprecated table constructor. Otherwise Scala would be confused - // about Table.as() and Expression.as(). In the future, we can rely on Expression.as() only. - this.aliases = Some(name.name +: extraNames.map(_.name)) - this - } - - /** - * Converts an API class to a logical node for planning. 
- */ - private[flink] def toLogicalTableFunctionCall(child: LogicalNode): LogicalTableFunctionCall = { - val originNames = getFieldInfo(resultType)._1 - - // determine the final field names - val fieldNames = if (aliases.isDefined) { - val aliasList = aliases.get - if (aliasList.length != originNames.length) { - throw new ValidationException( - s"List of column aliases must have same degree as table; " + - s"the returned table of function '$functionName' has ${originNames.length} " + - s"columns (${originNames.mkString(",")}), " + - s"whereas alias list has ${aliasList.length} columns") - } else { - aliasList.toArray - } - } else { - originNames - } - - LogicalTableFunctionCall( - functionName, - tableFunction, - parameters, - resultType, - fieldNames, - child) - } - override def toString = s"${tableFunction.getClass.getCanonicalName}(${parameters.mkString(", ")})" } diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala index 4711dc3..456fa96 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala @@ -771,11 +771,13 @@ object UserDefinedFunctionUtils { * * @param tableEnv The table environment to lookup the function. * @param callExpr an expression of a TableFunctionCall, such as "split(c)" + * @param logicalNode child logical node * @return A LogicalTableFunctionCall. 
*/ def createLogicalFunctionCall( tableEnv: TableEnvironment, - callExpr: Expression) + callExpr: Expression, + logicalNode: LogicalNode) : LogicalTableFunctionCall = { var alias: Option[Seq[String]] = None @@ -797,10 +799,29 @@ object UserDefinedFunctionUtils { val tableFunctionCall = unwrap(callExpr) - // aliases defined in an expression have highest precedence - alias.foreach(a => tableFunctionCall.setAliases(a)) + val originNames = getFieldInfo(tableFunctionCall.resultType)._1 + + // determine the final field names + val fieldNames = alias match { + case Some(aliasList) if aliasList.length != originNames.length => + throw new ValidationException( + s"List of column aliases must have same degree as table; " + + s"the returned table of function '${tableFunctionCall.functionName}' has " + + s"${originNames.length} columns (${originNames.mkString(",")}), " + + s"whereas alias list has ${aliasList.length} columns") + case Some(aliasList) => + aliasList.toArray + case _ => + originNames + } - tableFunctionCall.toLogicalTableFunctionCall(child = null) + LogicalTableFunctionCall( + tableFunctionCall.functionName, + tableFunctionCall.tableFunction, + tableFunctionCall.parameters, + tableFunctionCall.resultType, + fieldNames, + logicalNode) } def getOperandTypeInfo(callBinding: SqlCallBinding): Seq[TypeInformation[_]] = { diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/TemporalTableJoinTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/TemporalTableJoinTest.scala index 032a0a2..04823ea 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/TemporalTableJoinTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/TemporalTableJoinTest.scala @@ -65,15 +65,4 @@ class TemporalTableJoinTest extends TableTestBase { util.printTable(result) } - - @Test - def testTemporalTableFunctionScan(): 
Unit = { - expectedException.expect(classOf[ValidationException]) - expectedException.expectMessage( - "Cannot translate a query with an unbounded table function call.") - - val result = rates(java.sql.Timestamp.valueOf("2016-06-27 10:10:42.123")) - - util.printTable(result) - } } diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/stringexpr/CorrelateStringExpressionTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/stringexpr/CorrelateStringExpressionTest.scala index a7466ab..4d4abaa 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/stringexpr/CorrelateStringExpressionTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/stringexpr/CorrelateStringExpressionTest.scala @@ -30,68 +30,6 @@ import org.junit.Test class CorrelateStringExpressionTest extends TableTestBase { @Test - @deprecated("Test only verifies the deprecated table constructor.") - def testCorrelateJoins(): Unit = { - val util = batchTestUtil() - - val typeInfo = new RowTypeInfo(Seq(Types.INT, Types.LONG, Types.STRING): _*) - val sTab = util.addTable[(Int, Long, String)]("Table1", 'a, 'b, 'c) - val jTab = util.addJavaTable[Row](typeInfo, "Table2", "a, b, c") - - // test cross join - val func1 = new TableFunc1 - util.javaTableEnv.registerFunction("func1", func1) - var scalaTable = sTab.join(func1('c) as 's).select('c, 's) - var javaTable = jTab.join(new Table(util.javaTableEnv, "func1(c).as(s)")).select("c, s") - verifyTableEquals(scalaTable, javaTable) - - // test left outer join - scalaTable = sTab.leftOuterJoin(func1('c) as 's).select('c, 's) - javaTable = jTab.leftOuterJoin(new Table(util.javaTableEnv, "as(func1(c), s)")).select("c, s") - verifyTableEquals(scalaTable, javaTable) - - // test overloading - scalaTable = sTab.join(func1('c, "$") as 's).select('c, 's) - javaTable = jTab.join(new 
Table(util.javaTableEnv, "func1(c, '$') as (s)")).select("c, s") - verifyTableEquals(scalaTable, javaTable) - - // test custom result type - val func2 = new TableFunc2 - util.javaTableEnv.registerFunction("func2", func2) - scalaTable = sTab.join(func2('c) as('name, 'len)).select('c, 'name, 'len) - javaTable = jTab.join( - new Table(util.javaTableEnv, "func2(c).as(name, len)")).select("c, name, len") - verifyTableEquals(scalaTable, javaTable) - - // test hierarchy generic type - val hierarchy = new HierarchyTableFunction - util.javaTableEnv.registerFunction("hierarchy", hierarchy) - scalaTable = sTab.join(hierarchy('c) as('name, 'adult, 'len)).select('c, 'name, 'len, 'adult) - javaTable = jTab.join(new Table(util.javaTableEnv, "AS(hierarchy(c), name, adult, len)")) - .select("c, name, len, adult") - verifyTableEquals(scalaTable, javaTable) - - // test pojo type - val pojo = new PojoTableFunc - util.javaTableEnv.registerFunction("pojo", pojo) - scalaTable = sTab.join(pojo('c)).select('c, 'name, 'age) - javaTable = jTab.join(new Table(util.javaTableEnv, "pojo(c)")).select("c, name, age") - verifyTableEquals(scalaTable, javaTable) - - // test with filter - scalaTable = sTab.join(func2('c) as('name, 'len)).select('c, 'name, 'len).filter('len > 2) - javaTable = jTab.join(new Table(util.javaTableEnv, "func2(c) as (name, len)")) - .select("c, name, len").filter("len > 2") - verifyTableEquals(scalaTable, javaTable) - - // test with scalar function - scalaTable = sTab.join(func1('c.substring(2)) as 's).select('a, 'c, 's) - javaTable = jTab.join( - new Table(util.javaTableEnv, "func1(substring(c, 2)) as (s)")).select("a, c, s") - verifyTableEquals(scalaTable, javaTable) - } - - @Test def testCorrelateJoinsWithJoinLateral(): Unit = { val util = batchTestUtil() diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/validation/CorrelateValidationTest.scala 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/validation/CorrelateValidationTest.scala index 5564731..ce22b0e 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/validation/CorrelateValidationTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/validation/CorrelateValidationTest.scala @@ -37,7 +37,7 @@ class CorrelateValidationTest extends TableTestBase { val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c) val function = util.addFunction("func1", new TableFunc1) val result = table - .leftOuterJoin(function('c) as 's, 'c === 's) + .leftOuterJoinLateral(function('c) as 's, 'c === 's) .select('c, 's) util.verifyTable(result, "") } diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/TemporalTableJoinTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/TemporalTableJoinTest.scala index 8ae98ea..d98248d 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/TemporalTableJoinTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/TemporalTableJoinTest.scala @@ -132,25 +132,15 @@ class TemporalTableJoinTest extends TableTestBase { expectedException.expectMessage(startsWith("Unsupported argument")) val result = orders - .join(rates( + .joinLateral(rates( java.sql.Timestamp.valueOf("2016-06-27 10:10:42.123")), - "o_currency = currency") + 'o_currency === 'currency) .select("o_amount * rate") util.printTable(result) } @Test - def testTemporalTableFunctionScan(): Unit = { - expectedException.expect(classOf[ValidationException]) - expectedException.expectMessage( - "Cannot translate a query with an unbounded table function call") - - val result = rates(java.sql.Timestamp.valueOf("2016-06-27 10:10:42.123")) - util.printTable(result) - } - - 
@Test def testProcessingTimeIndicatorVersion(): Unit = { assertRatesFunction(proctimeRatesHistory.getSchema, proctimeRates, true) } diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/CorrelateStringExpressionTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/CorrelateStringExpressionTest.scala index 9303963..3954a44 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/CorrelateStringExpressionTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/CorrelateStringExpressionTest.scala @@ -29,68 +29,6 @@ import org.junit.Test class CorrelateStringExpressionTest extends TableTestBase { @Test - @deprecated("Test only verifies the deprecated table constructor.") - def testCorrelateJoins(): Unit = { - - val util = streamTestUtil() - val sTab = util.addTable[(Int, Long, String)]('a, 'b, 'c) - val typeInfo = new RowTypeInfo(Seq(Types.INT, Types.LONG, Types.STRING): _*) - val jTab = util.addJavaTable[Row](typeInfo,"MyTab","a, b, c") - - // test cross join - val func1 = new TableFunc1 - util.javaTableEnv.registerFunction("func1", func1) - var scalaTable = sTab.join(func1('c) as 's).select('c, 's) - var javaTable = jTab.join(new Table(util.javaTableEnv, "func1(c).as(s)")).select("c, s") - verifyTableEquals(scalaTable, javaTable) - - // test left outer join - scalaTable = sTab.leftOuterJoin(func1('c) as 's).select('c, 's) - javaTable = jTab.leftOuterJoin(new Table(util.javaTableEnv, "func1(c)").as("s")).select("c, s") - verifyTableEquals(scalaTable, javaTable) - - // test overloading - scalaTable = sTab.join(func1('c, "$") as 's).select('c, 's) - javaTable = jTab.join(new Table(util.javaTableEnv, "func1(c, '$') as (s)")).select("c, s") - verifyTableEquals(scalaTable, javaTable) - - // test custom result type - val func2 = new TableFunc2 - 
util.javaTableEnv.registerFunction("func2", func2) - scalaTable = sTab.join(func2('c) as ('name, 'len)).select('c, 'name, 'len) - javaTable = jTab.join( - new Table(util.javaTableEnv, "func2(c).as(name, len)")).select("c, name, len") - verifyTableEquals(scalaTable, javaTable) - - // test hierarchy generic type - val hierarchy = new HierarchyTableFunction - util.javaTableEnv.registerFunction("hierarchy", hierarchy) - scalaTable = sTab.join(hierarchy('c) as ('name, 'adult, 'len)).select('c, 'name, 'len, 'adult) - javaTable = jTab.join(new Table(util.javaTableEnv, "AS(hierarchy(c), name, adult, len)")) - .select("c, name, len, adult") - verifyTableEquals(scalaTable, javaTable) - - // test pojo type - val pojo = new PojoTableFunc - util.javaTableEnv.registerFunction("pojo", pojo) - scalaTable = sTab.join(pojo('c)).select('c, 'name, 'age) - javaTable = jTab.join(new Table(util.javaTableEnv, "pojo(c)")).select("c, name, age") - verifyTableEquals(scalaTable, javaTable) - - // test with filter - scalaTable = sTab.join(func2('c) as ('name, 'len)).select('c, 'name, 'len).filter('len > 2) - javaTable = jTab.join(new Table(util.javaTableEnv, "func2(c) as (name, len)")) - .select("c, name, len").filter("len > 2") - verifyTableEquals(scalaTable, javaTable) - - // test with scalar function - scalaTable = sTab.join(func1('c.substring(2)) as 's).select('a, 'c, 's) - javaTable = jTab.join( - new Table(util.javaTableEnv, "func1(substring(c, 2)) as (s)")).select("a, c, s") - verifyTableEquals(scalaTable, javaTable) - } - - @Test def testCorrelateJoinsWithJoinLateral(): Unit = { val util = streamTestUtil() diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/validation/CorrelateValidationTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/validation/CorrelateValidationTest.scala index a01ae78..dac2236 100644 --- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/validation/CorrelateValidationTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/validation/CorrelateValidationTest.scala @@ -17,12 +17,10 @@ */ package org.apache.flink.table.api.stream.table.validation -import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.scala._ import org.apache.flink.table.api._ import org.apache.flink.table.api.scala._ import org.apache.flink.table.expressions.utils._ -import org.apache.flink.table.runtime.stream.table.TestAppendSink import org.apache.flink.table.utils.{ObjectTableFunction, TableFunc1, TableFunc2, TableTestBase} import org.junit.Assert.{assertTrue, fail} import org.junit.Test @@ -44,90 +42,6 @@ class CorrelateValidationTest extends TableTestBase { } @Test - def testInvalidTableFunctions(): Unit = { - val util = streamTestUtil() - - val func1 = new TableFunc1 - util.javaTableEnv.registerFunction("func1", func1) - util.javaTableEnv.registerTableSink( - "testSink", new TestAppendSink().configure( - Array[String]("f"), Array[TypeInformation[_]](Types.INT))) - - // table function call select - expectExceptionThrown( - func1('c).select("f0"), - "Table functions can only be used in table.joinLateral() and table.leftOuterJoinLateral()." - ) - - // table function call select - expectExceptionThrown( - func1('c).select('f0), - "Table functions can only be used in table.joinLateral() and table.leftOuterJoinLateral()." - ) - - // table function call insertInto - expectExceptionThrown( - func1('c).insertInto("testSink"), - "Table functions can only be used in table.joinLateral() and table.leftOuterJoinLateral()." - ) - - // table function call distinct - expectExceptionThrown( - func1('c).distinct(), - "Table functions can only be used in table.joinLateral() and table.leftOuterJoinLateral()." 
- ) - - // table function call filter - expectExceptionThrown( - func1('c).filter('f0 === "?"), - "Table functions can only be used in table.joinLateral() and table.leftOuterJoinLateral()." - ) - - // table function call filter - expectExceptionThrown( - func1('c).filter("f0 = '?'"), - "Table functions can only be used in table.joinLateral() and table.leftOuterJoinLateral()." - ) - - // table function call limit - expectExceptionThrown( - func1('c).orderBy('f0).offset(3), - "Table functions can only be used in table.joinLateral() and table.leftOuterJoinLateral()." - ) - - // table function call limit - expectExceptionThrown( - func1('c).orderBy('f0).fetch(3), - "Table functions can only be used in table.joinLateral() and table.leftOuterJoinLateral()." - ) - - // table function call orderBy - expectExceptionThrown( - func1('c).orderBy("f0"), - "Table functions can only be used in table.joinLateral() and table.leftOuterJoinLateral()." - ) - - // table function call orderBy - expectExceptionThrown( - func1('c).orderBy('f0), - "Table functions can only be used in table.joinLateral() and table.leftOuterJoinLateral()." - ) - - // table function call where - expectExceptionThrown( - func1('c).where("f0 = '?'"), - "Table functions can only be used in table.joinLateral() and table.leftOuterJoinLateral()." - ) - - // table function call where - expectExceptionThrown( - func1('c).where('f0 === "?"), - "Table functions can only be used in table.joinLateral() and table.leftOuterJoinLateral()." 
- ) - - } - - @Test def testInvalidTableFunction(): Unit = { val util = streamTestUtil() val t = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c) @@ -190,7 +104,8 @@ class CorrelateValidationTest extends TableTestBase { val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c) val function = util.addFunction("func1", new TableFunc1) - val result = table.leftOuterJoin(function('c) as 's, 'c === 's).select('c, 's).where('a > 10) + val result = table.leftOuterJoinLateral(function('c) as 's, 'c === 's) + .select('c, 's).where('a > 10) util.verifyTable(result, "") } diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala index 095cd04..b7087d9 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala @@ -407,7 +407,7 @@ class TimeIndicatorConversionTest extends TableTestBase { val proctimeRates = proctimeRatesHistory.createTemporalTableFunction('proctime, 'currency) val result = proctimeOrders - .join(proctimeRates('o_proctime), "currency = o_currency") + .joinLateral(proctimeRates('o_proctime), 'currency === 'o_currency) .select("o_amount * rate, currency, proctime").as("converted_amount") .window(Tumble over 1.second on 'proctime as 'w) .groupBy('w, 'currency) @@ -444,7 +444,7 @@ class TimeIndicatorConversionTest extends TableTestBase { val proctimeRates = proctimeRatesHistory.createTemporalTableFunction('proctime, 'currency) val result = proctimeOrders - .join(proctimeRates('o_proctime), "currency = o_currency") + .joinLateral(proctimeRates('o_proctime), 'currency === 'o_currency) .select("o_amount * rate, currency, o_proctime").as("converted_amount") .window(Tumble over 1.second on 'o_proctime as 
'w) .groupBy('w, 'currency) @@ -481,7 +481,7 @@ class TimeIndicatorConversionTest extends TableTestBase { val proctimeRates = proctimeRatesHistory.createTemporalTableFunction('proctime, 'currency) val result = proctimeOrders - .join(proctimeRates('o_proctime), "currency = o_currency") + .joinLateral(proctimeRates('o_proctime), 'currency === 'o_currency) .select("o_amount * rate, currency, o_proctime, o_rowtime").as("converted_amount") .window(Tumble over 1.second on 'o_rowtime as 'w) .groupBy('w, 'currency) diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/table/JoinITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/table/JoinITCase.scala index 6a5944d..dffe59f 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/table/JoinITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/table/JoinITCase.scala @@ -466,7 +466,7 @@ class JoinITCase( val ds1 = env.fromCollection(data).toTable(tEnv, 'a) val func2 = new TableFunc2 - val joinDs = ds1.join(func2('a) as ('name, 'len)) + val joinDs = ds1.joinLateral(func2('a) as ('name, 'len)) val results = joinDs.toDataSet[Row].collect() val expected = Seq(