This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new dcd8c74  [hotfix][table-api] Remove deprecated table function code
dcd8c74 is described below

commit dcd8c74b504046802cebf278b718e4753928a260
Author: sunjincheng121 <sunjincheng...@gmail.com>
AuthorDate: Wed Feb 20 20:09:46 2019 +0800

    [hotfix][table-api] Remove deprecated table function code
    
    This commit removes the table constructor (deprecated in
    FLINK-11447) for lateral table function joins and simplifies
    related code.
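    
    A minimal migration sketch in the Scala Table API (the table `orders`, the
    TableFunction instance `split`, and the field names below are illustrative
    placeholders, not part of this commit):
    
      // Before (removed): implicit conversion of the table function call
      //   val result = orders.join(split('c) as 's).select('c, 's)
      // After: explicit lateral join API introduced with FLINK-11447
      val result = orders.joinLateral(split('c) as 's).select('c, 's)
      // leftOuterJoin() calls migrate to leftOuterJoinLateral() analogously.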
---
 .../flink/table/api/scala/expressionDsl.scala      |   8 --
 .../scala/org/apache/flink/table/api/table.scala   | 143 +++------------------
 .../expressions/PlannerExpressionConverter.scala   |   8 +-
 .../org/apache/flink/table/expressions/call.scala  |  65 +---------
 .../functions/utils/UserDefinedFunctionUtils.scala |  29 ++++-
 .../api/batch/table/TemporalTableJoinTest.scala    |  11 --
 .../stringexpr/CorrelateStringExpressionTest.scala |  62 ---------
 .../table/validation/CorrelateValidationTest.scala |   2 +-
 .../api/stream/table/TemporalTableJoinTest.scala   |  14 +-
 .../stringexpr/CorrelateStringExpressionTest.scala |  62 ---------
 .../table/validation/CorrelateValidationTest.scala |  89 +------------
 .../table/plan/TimeIndicatorConversionTest.scala   |   6 +-
 .../table/runtime/batch/table/JoinITCase.scala     |   2 +-
 13 files changed, 55 insertions(+), 446 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
index ed0f4c1..23d54a5 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
@@ -1071,14 +1071,6 @@ trait ImplicitExpressionConversions {
     }
   }
 
-  @deprecated("Please use Table.joinLateral() or Table.leftOuterJoinLateral() instead.", "1.8")
-  implicit def tableFunctionCall2Table(tfc: TableFunctionCall): Table = {
-    new Table(
-      tableEnv = null, // table environment will be set later.
-      tfc.toLogicalTableFunctionCall(child = null) // child will be set later.
-    )
-  }
-
   implicit def symbol2FieldExpression(sym: Symbol): Expression = UnresolvedFieldReference(sym.name)
   implicit def byte2Literal(b: Byte): Expression = Literal(b)
   implicit def short2Literal(s: Short): Expression = Literal(s)
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/table.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/table.scala
index e2f1161..44dac79 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/table.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/table.scala
@@ -21,7 +21,7 @@ import org.apache.calcite.rel.RelNode
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.operators.join.JoinType
 import org.apache.flink.table.calcite.{FlinkRelBuilder, FlinkTypeFactory}
-import org.apache.flink.table.expressions.{Alias, Asc, Expression, ExpressionParser, Ordering, ResolvedFieldReference, UnresolvedAlias, UnresolvedFieldReference, WindowProperty}
+import org.apache.flink.table.expressions.{Alias, Asc, Expression, ExpressionParser, Ordering, ResolvedFieldReference, UnresolvedAlias, WindowProperty}
 import org.apache.flink.table.functions.TemporalTableFunction
 import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
 import org.apache.flink.table.plan.ProjectionTranslator._
@@ -65,44 +65,13 @@ class Table(
     private[flink] val tableEnv: TableEnvironment,
     private[flink] val logicalPlan: LogicalNode) {
 
-  // Check if the plan has an unbounded TableFunctionCall as child node.
-  //   A TableFunctionCall is tolerated as root node because the Table holds the initial call.
-  if (containsUnboundedUDTFCall(logicalPlan) &&
-    !logicalPlan.isInstanceOf[LogicalTableFunctionCall]) {
-    throw new ValidationException(
-      "Table functions can only be used in table.joinLateral() and table.leftOuterJoinLateral().")
-  }
-
-  /**
-    * Creates a [[Table]] for a TableFunction call from a String expression.
-    *
-    * @param tableEnv The TableEnvironment in which the call is created.
-    * @param tableFunctionCall A string expression of a table function call.
-    *
-    * @deprecated This constructor will be removed. Use table.joinLateral() or
-    *             table.leftOuterJoinLateral() instead.
-    */
-  @Deprecated
-  @deprecated(
-    "This constructor will be removed. Use table.joinLateral() or " +
-      "table.leftOuterJoinLateral() instead.",
-    "1.8")
-  def this(tableEnv: TableEnvironment, tableFunctionCall: String) {
-    this(tableEnv, UserDefinedFunctionUtils
-      .createLogicalFunctionCall(tableEnv, ExpressionParser.parseExpression(tableFunctionCall)))
-  }
-
   private lazy val tableSchema: TableSchema = new TableSchema(
     logicalPlan.output.map(_.name).toArray,
     logicalPlan.output.map(_.resultType).toArray)
 
   def relBuilder: FlinkRelBuilder = tableEnv.getRelBuilder
 
-  def getRelNode: RelNode = if (containsUnboundedUDTFCall(logicalPlan)) {
-    throw new ValidationException("Cannot translate a query with an unbounded table function call.")
-  } else {
-    logicalPlan.toRelNode(relBuilder)
-  }
+  def getRelNode: RelNode = logicalPlan.toRelNode(relBuilder)
 
   /**
     * Returns the schema of this table.
@@ -246,34 +215,7 @@ class Table(
     * }}}
     */
   def as(fields: Expression*): Table = {
-
-    logicalPlan match {
-      case functionCall: LogicalTableFunctionCall if functionCall.child == null =>
-        // If the logical plan is a TableFunctionCall, we replace its field names to avoid special
-        //   cases during the validation.
-        if (fields.length != functionCall.output.length) {
-          throw new ValidationException(
-            "List of column aliases must have same degree as TableFunction's output")
-        }
-        if (!fields.forall(_.isInstanceOf[UnresolvedFieldReference])) {
-          throw new ValidationException(
-            "Alias field must be an instance of UnresolvedFieldReference"
-          )
-        }
-        new Table(
-          tableEnv,
-          LogicalTableFunctionCall(
-            functionCall.functionName,
-            functionCall.tableFunction,
-            functionCall.parameters,
-            functionCall.resultType,
-            fields.map(_.asInstanceOf[UnresolvedFieldReference].name).toArray,
-            functionCall.child)
-        )
-      case _ =>
-        // prepend an AliasNode
-        new Table(tableEnv, AliasNode(fields, logicalPlan).validate(tableEnv))
-    }
+    new Table(tableEnv, AliasNode(fields, logicalPlan).validate(tableEnv))
   }
 
   /**
@@ -564,46 +506,16 @@ class Table(
   }
 
   private def join(right: Table, joinPredicate: Option[Expression], joinType: JoinType): Table = {
-
-    // check if we join with a table or a table function
-    if (!containsUnboundedUDTFCall(right.logicalPlan)) {
-      // regular table-table join
-
-      // check that the TableEnvironment of right table is not null
-      // and right table belongs to the same TableEnvironment
-      if (right.tableEnv != this.tableEnv) {
-        throw new ValidationException("Only tables from the same TableEnvironment can be joined.")
-      }
-
-      new Table(
-        tableEnv,
-        Join(this.logicalPlan, right.logicalPlan, joinType, joinPredicate, correlated = false)
-          .validate(tableEnv))
-
-    } else {
-      // join with a table function
-
-      // check join type
-      if (joinType != JoinType.INNER && joinType != JoinType.LEFT_OUTER) {
-        throw new ValidationException(
-          "TableFunctions are currently supported for join and leftOuterJoin.")
-      }
-
-      val udtf = right.logicalPlan.asInstanceOf[LogicalTableFunctionCall]
-      val udtfCall = LogicalTableFunctionCall(
-        udtf.functionName,
-        udtf.tableFunction,
-        udtf.parameters,
-        udtf.resultType,
-        udtf.fieldNames,
-        this.logicalPlan
-      ).validate(tableEnv)
-
-      new Table(
-        tableEnv,
-        Join(this.logicalPlan, udtfCall, joinType, joinPredicate, correlated = true)
-          .validate(tableEnv))
+    // check that the TableEnvironment of right table is not null
+    // and right table belongs to the same TableEnvironment
+    if (right.tableEnv != this.tableEnv) {
+      throw new ValidationException("Only tables from the same TableEnvironment can be joined.")
     }
+
+    new Table(
+      tableEnv,
+      Join(this.logicalPlan, right.logicalPlan, joinType, joinPredicate, correlated = false)
+        .validate(tableEnv))
   }
 
   /**
@@ -799,21 +711,16 @@ class Table(
         "Table functions are currently only supported for inner and left outer lateral joins.")
     }
 
-    val logicalCall = UserDefinedFunctionUtils.createLogicalFunctionCall(tableEnv, callExpr)
-
-    val validatedLogicalCall = LogicalTableFunctionCall(
-      logicalCall.functionName,
-      logicalCall.tableFunction,
-      logicalCall.parameters,
-      logicalCall.resultType,
-      logicalCall.fieldNames,
-      this.logicalPlan
-    ).validate(tableEnv)
+    val logicalCall = UserDefinedFunctionUtils.createLogicalFunctionCall(
+      tableEnv,
+      callExpr,
+      logicalPlan)
+    val validatedLogicalCall = logicalCall.validate(tableEnv)
 
     new Table(
       tableEnv,
       Join(
-        this.logicalPlan,
+        logicalPlan,
         validatedLogicalCall,
         joinType,
         joinPredicate,
@@ -1221,20 +1128,6 @@ class Table(
     }
     tableName
   }
-
-  /**
-    * Checks if the plan represented by a [[LogicalNode]] contains an unbounded UDTF call.
-    * @param n the node to check
-    * @return true if the plan contains an unbounded UDTF call, false otherwise.
-    */
-  private def containsUnboundedUDTFCall(n: LogicalNode): Boolean = {
-    n match {
-      case functionCall: LogicalTableFunctionCall if functionCall.child == null => true
-      case u: UnaryNode => containsUnboundedUDTFCall(u.child)
-      case b: BinaryNode => containsUnboundedUDTFCall(b.left) || containsUnboundedUDTFCall(b.right)
-      case _: LeafNode => false
-    }
-  }
 }
 
 /**
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala
index 628ef8c..2afc5f6 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala
@@ -66,13 +66,7 @@ class PlannerExpressionConverter private extends ExpressionVisitor[PlannerExpres
             val extraNames = args
               .drop(2)
               .map(e => e.asInstanceOf[ValueLiteralExpression].getValue.asInstanceOf[String])
-            val plannerExpression = args.head
-            plannerExpression match {
-              case tfc: TableFunctionCall =>
-                tfc.setAliases(name +: extraNames)
-              case _ =>
-                Alias(plannerExpression, name, extraNames)
-            }
+            Alias(args.head, name, extraNames)
 
           case FLATTEN =>
             assert(args.size == 1)
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/call.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/call.scala
index de423c9..3f0597f 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/call.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/call.scala
@@ -29,11 +29,10 @@ import org.apache.calcite.tools.RelBuilder
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.table.api._
 import org.apache.flink.table.calcite.FlinkTypeFactory
-import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._
 import org.apache.flink.table.functions._
-import org.apache.flink.table.plan.logical.{LogicalNode, LogicalTableFunctionCall}
-import org.apache.flink.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess}
+import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._
 import org.apache.flink.table.typeutils.{RowIntervalTypeInfo, TimeIntervalTypeInfo}
+import org.apache.flink.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess}
 
 import _root_.scala.collection.JavaConverters._
 
@@ -316,68 +315,8 @@ case class TableFunctionCall(
     resultType: TypeInformation[_])
   extends PlannerExpression {
 
-  private var aliases: Option[Seq[String]] = None
-
   override private[flink] def children: Seq[Expression] = parameters
 
-  /**
-    * Assigns an alias for this table function's returned fields that the following operator
-    * can refer to.
-    *
-    * @param aliasList alias for this table function's returned fields
-    * @return this table function call
-    */
-  private[flink] def setAliases(aliasList: Seq[String]): TableFunctionCall = {
-    this.aliases = Some(aliasList)
-    this
-  }
-
-  /**
-    * Specifies the field names for a join with a table function.
-    *
-    * @param name name for one field
-    * @param extraNames additional names if the expression expands to multiple fields
-    * @return field with an alias
-    */
-  def as(name: Symbol, extraNames: Symbol*): TableFunctionCall = {
-    // NOTE: this method is only a temporary solution until we
-    // remove the deprecated table constructor. Otherwise Scala would be confused
-    // about Table.as() and Expression.as(). In the future, we can rely on Expression.as() only.
-    this.aliases = Some(name.name +: extraNames.map(_.name))
-    this
-  }
-
-  /**
-    * Converts an API class to a logical node for planning.
-    */
-  private[flink] def toLogicalTableFunctionCall(child: LogicalNode): LogicalTableFunctionCall = {
-    val originNames = getFieldInfo(resultType)._1
-
-    // determine the final field names
-    val fieldNames = if (aliases.isDefined) {
-      val aliasList = aliases.get
-      if (aliasList.length != originNames.length) {
-        throw new ValidationException(
-          s"List of column aliases must have same degree as table; " +
-            s"the returned table of function '$functionName' has ${originNames.length} " +
-            s"columns (${originNames.mkString(",")}), " +
-            s"whereas alias list has ${aliasList.length} columns")
-      } else {
-        aliasList.toArray
-      }
-    } else {
-      originNames
-    }
-
-    LogicalTableFunctionCall(
-      functionName,
-      tableFunction,
-      parameters,
-      resultType,
-      fieldNames,
-      child)
-  }
-
   override def toString =
     s"${tableFunction.getClass.getCanonicalName}(${parameters.mkString(", ")})"
 }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
index 4711dc3..456fa96 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
@@ -771,11 +771,13 @@ object UserDefinedFunctionUtils {
     *
     * @param tableEnv The table environment to lookup the function.
     * @param callExpr an expression of a TableFunctionCall, such as "split(c)"
+    * @param logicalNode child logical node
     * @return A LogicalTableFunctionCall.
     */
   def createLogicalFunctionCall(
       tableEnv: TableEnvironment,
-      callExpr: Expression)
+      callExpr: Expression,
+      logicalNode: LogicalNode)
     : LogicalTableFunctionCall = {
 
     var alias: Option[Seq[String]] = None
@@ -797,10 +799,29 @@ object UserDefinedFunctionUtils {
 
     val tableFunctionCall = unwrap(callExpr)
 
-    // aliases defined in an expression have highest precedence
-    alias.foreach(a => tableFunctionCall.setAliases(a))
+    val originNames = getFieldInfo(tableFunctionCall.resultType)._1
+
+    // determine the final field names
+    val fieldNames = alias match {
+      case Some(aliasList) if aliasList.length != originNames.length =>
+        throw new ValidationException(
+          s"List of column aliases must have same degree as table; " +
+            s"the returned table of function '${tableFunctionCall.functionName}' has " +
+            s"${originNames.length} columns (${originNames.mkString(",")}), " +
+            s"whereas alias list has ${aliasList.length} columns")
+      case Some(aliasList) =>
+        aliasList.toArray
+      case _ =>
+        originNames
+    }
 
-    tableFunctionCall.toLogicalTableFunctionCall(child = null)
+    LogicalTableFunctionCall(
+      tableFunctionCall.functionName,
+      tableFunctionCall.tableFunction,
+      tableFunctionCall.parameters,
+      tableFunctionCall.resultType,
+      fieldNames,
+      logicalNode)
   }
 
   def getOperandTypeInfo(callBinding: SqlCallBinding): Seq[TypeInformation[_]] = {
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/TemporalTableJoinTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/TemporalTableJoinTest.scala
index 032a0a2..04823ea 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/TemporalTableJoinTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/TemporalTableJoinTest.scala
@@ -65,15 +65,4 @@ class TemporalTableJoinTest extends TableTestBase {
 
     util.printTable(result)
   }
-
-  @Test
-  def testTemporalTableFunctionScan(): Unit = {
-    expectedException.expect(classOf[ValidationException])
-    expectedException.expectMessage(
-      "Cannot translate a query with an unbounded table function call.")
-
-    val result = rates(java.sql.Timestamp.valueOf("2016-06-27 10:10:42.123"))
-
-    util.printTable(result)
-  }
 }
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/stringexpr/CorrelateStringExpressionTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/stringexpr/CorrelateStringExpressionTest.scala
index a7466ab..4d4abaa 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/stringexpr/CorrelateStringExpressionTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/stringexpr/CorrelateStringExpressionTest.scala
@@ -30,68 +30,6 @@ import org.junit.Test
 class CorrelateStringExpressionTest extends TableTestBase {
 
   @Test
-  @deprecated("Test only verifies the deprecated table constructor.")
-  def testCorrelateJoins(): Unit = {
-    val util = batchTestUtil()
-
-    val typeInfo = new RowTypeInfo(Seq(Types.INT, Types.LONG, Types.STRING): _*)
-    val sTab = util.addTable[(Int, Long, String)]("Table1", 'a, 'b, 'c)
-    val jTab = util.addJavaTable[Row](typeInfo, "Table2", "a, b, c")
-
-    // test cross join
-    val func1 = new TableFunc1
-    util.javaTableEnv.registerFunction("func1", func1)
-    var scalaTable = sTab.join(func1('c) as 's).select('c, 's)
-    var javaTable = jTab.join(new Table(util.javaTableEnv, "func1(c).as(s)")).select("c, s")
-    verifyTableEquals(scalaTable, javaTable)
-
-    // test left outer join
-    scalaTable = sTab.leftOuterJoin(func1('c) as 's).select('c, 's)
-    javaTable = jTab.leftOuterJoin(new Table(util.javaTableEnv, "as(func1(c), s)")).select("c, s")
-    verifyTableEquals(scalaTable, javaTable)
-
-    // test overloading
-    scalaTable = sTab.join(func1('c, "$") as 's).select('c, 's)
-    javaTable = jTab.join(new Table(util.javaTableEnv, "func1(c, '$') as (s)")).select("c, s")
-    verifyTableEquals(scalaTable, javaTable)
-
-    // test custom result type
-    val func2 = new TableFunc2
-    util.javaTableEnv.registerFunction("func2", func2)
-    scalaTable = sTab.join(func2('c) as('name, 'len)).select('c, 'name, 'len)
-    javaTable = jTab.join(
-      new Table(util.javaTableEnv, "func2(c).as(name, len)")).select("c, name, len")
-    verifyTableEquals(scalaTable, javaTable)
-
-    // test hierarchy generic type
-    val hierarchy = new HierarchyTableFunction
-    util.javaTableEnv.registerFunction("hierarchy", hierarchy)
-    scalaTable = sTab.join(hierarchy('c) as('name, 'adult, 'len)).select('c, 'name, 'len, 'adult)
-    javaTable = jTab.join(new Table(util.javaTableEnv, "AS(hierarchy(c), name, adult, len)"))
-      .select("c, name, len, adult")
-    verifyTableEquals(scalaTable, javaTable)
-
-    // test pojo type
-    val pojo = new PojoTableFunc
-    util.javaTableEnv.registerFunction("pojo", pojo)
-    scalaTable = sTab.join(pojo('c)).select('c, 'name, 'age)
-    javaTable = jTab.join(new Table(util.javaTableEnv, "pojo(c)")).select("c, name, age")
-    verifyTableEquals(scalaTable, javaTable)
-
-    // test with filter
-    scalaTable = sTab.join(func2('c) as('name, 'len)).select('c, 'name, 'len).filter('len > 2)
-    javaTable = jTab.join(new Table(util.javaTableEnv, "func2(c) as (name, len)"))
-      .select("c, name, len").filter("len > 2")
-    verifyTableEquals(scalaTable, javaTable)
-
-    // test with scalar function
-    scalaTable = sTab.join(func1('c.substring(2)) as 's).select('a, 'c, 's)
-    javaTable = jTab.join(
-      new Table(util.javaTableEnv, "func1(substring(c, 2)) as (s)")).select("a, c, s")
-    verifyTableEquals(scalaTable, javaTable)
-  }
-
-  @Test
   def testCorrelateJoinsWithJoinLateral(): Unit = {
     val util = batchTestUtil()
 
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/validation/CorrelateValidationTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/validation/CorrelateValidationTest.scala
index 5564731..ce22b0e 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/validation/CorrelateValidationTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/validation/CorrelateValidationTest.scala
@@ -37,7 +37,7 @@ class CorrelateValidationTest extends TableTestBase {
     val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
     val function = util.addFunction("func1", new TableFunc1)
     val result = table
-      .leftOuterJoin(function('c) as 's, 'c === 's)
+      .leftOuterJoinLateral(function('c) as 's, 'c === 's)
       .select('c, 's)
     util.verifyTable(result, "")
   }
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/TemporalTableJoinTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/TemporalTableJoinTest.scala
index 8ae98ea..d98248d 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/TemporalTableJoinTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/TemporalTableJoinTest.scala
@@ -132,25 +132,15 @@ class TemporalTableJoinTest extends TableTestBase {
     expectedException.expectMessage(startsWith("Unsupported argument"))
 
     val result = orders
-      .join(rates(
+      .joinLateral(rates(
         java.sql.Timestamp.valueOf("2016-06-27 10:10:42.123")),
-        "o_currency = currency")
+        'o_currency === 'currency)
       .select("o_amount * rate")
 
     util.printTable(result)
   }
 
   @Test
-  def testTemporalTableFunctionScan(): Unit = {
-    expectedException.expect(classOf[ValidationException])
-    expectedException.expectMessage(
-      "Cannot translate a query with an unbounded table function call")
-
-    val result = rates(java.sql.Timestamp.valueOf("2016-06-27 10:10:42.123"))
-    util.printTable(result)
-  }
-
-  @Test
   def testProcessingTimeIndicatorVersion(): Unit = {
     assertRatesFunction(proctimeRatesHistory.getSchema, proctimeRates, true)
   }
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/CorrelateStringExpressionTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/CorrelateStringExpressionTest.scala
index 9303963..3954a44 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/CorrelateStringExpressionTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/CorrelateStringExpressionTest.scala
@@ -29,68 +29,6 @@ import org.junit.Test
 class CorrelateStringExpressionTest extends TableTestBase {
 
   @Test
-  @deprecated("Test only verifies the deprecated table constructor.")
-  def testCorrelateJoins(): Unit = {
-
-    val util = streamTestUtil()
-    val sTab = util.addTable[(Int, Long, String)]('a, 'b, 'c)
-    val typeInfo = new RowTypeInfo(Seq(Types.INT, Types.LONG, Types.STRING): _*)
-    val jTab = util.addJavaTable[Row](typeInfo,"MyTab","a, b, c")
-
-    // test cross join
-    val func1 = new TableFunc1
-    util.javaTableEnv.registerFunction("func1", func1)
-    var scalaTable = sTab.join(func1('c) as 's).select('c, 's)
-    var javaTable = jTab.join(new Table(util.javaTableEnv, "func1(c).as(s)")).select("c, s")
-    verifyTableEquals(scalaTable, javaTable)
-
-    // test left outer join
-    scalaTable = sTab.leftOuterJoin(func1('c) as 's).select('c, 's)
-    javaTable = jTab.leftOuterJoin(new Table(util.javaTableEnv, "func1(c)").as("s")).select("c, s")
-    verifyTableEquals(scalaTable, javaTable)
-
-    // test overloading
-    scalaTable = sTab.join(func1('c, "$") as 's).select('c, 's)
-    javaTable = jTab.join(new Table(util.javaTableEnv, "func1(c, '$') as (s)")).select("c, s")
-    verifyTableEquals(scalaTable, javaTable)
-
-    // test custom result type
-    val func2 = new TableFunc2
-    util.javaTableEnv.registerFunction("func2", func2)
-    scalaTable = sTab.join(func2('c) as ('name, 'len)).select('c, 'name, 'len)
-    javaTable = jTab.join(
-      new Table(util.javaTableEnv, "func2(c).as(name, len)")).select("c, name, len")
-    verifyTableEquals(scalaTable, javaTable)
-
-    // test hierarchy generic type
-    val hierarchy = new HierarchyTableFunction
-    util.javaTableEnv.registerFunction("hierarchy", hierarchy)
-    scalaTable = sTab.join(hierarchy('c) as ('name, 'adult, 'len)).select('c, 'name, 'len, 'adult)
-    javaTable = jTab.join(new Table(util.javaTableEnv, "AS(hierarchy(c), name, adult, len)"))
-      .select("c, name, len, adult")
-    verifyTableEquals(scalaTable, javaTable)
-
-    // test pojo type
-    val pojo = new PojoTableFunc
-    util.javaTableEnv.registerFunction("pojo", pojo)
-    scalaTable = sTab.join(pojo('c)).select('c, 'name, 'age)
-    javaTable = jTab.join(new Table(util.javaTableEnv, "pojo(c)")).select("c, name, age")
-    verifyTableEquals(scalaTable, javaTable)
-
-    // test with filter
-    scalaTable = sTab.join(func2('c) as ('name, 'len)).select('c, 'name, 'len).filter('len > 2)
-    javaTable = jTab.join(new Table(util.javaTableEnv, "func2(c) as (name, len)"))
-      .select("c, name, len").filter("len > 2")
-    verifyTableEquals(scalaTable, javaTable)
-
-    // test with scalar function
-    scalaTable = sTab.join(func1('c.substring(2)) as 's).select('a, 'c, 's)
-    javaTable = jTab.join(
-      new Table(util.javaTableEnv, "func1(substring(c, 2)) as (s)")).select("a, c, s")
-    verifyTableEquals(scalaTable, javaTable)
-  }
-
-  @Test
   def testCorrelateJoinsWithJoinLateral(): Unit = {
 
     val util = streamTestUtil()
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/validation/CorrelateValidationTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/validation/CorrelateValidationTest.scala
index a01ae78..dac2236 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/validation/CorrelateValidationTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/validation/CorrelateValidationTest.scala
@@ -17,12 +17,10 @@
  */
 package org.apache.flink.table.api.stream.table.validation
 
-import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api._
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.expressions.utils._
-import org.apache.flink.table.runtime.stream.table.TestAppendSink
 import org.apache.flink.table.utils.{ObjectTableFunction, TableFunc1, TableFunc2, TableTestBase}
 import org.junit.Assert.{assertTrue, fail}
 import org.junit.Test
@@ -44,90 +42,6 @@ class CorrelateValidationTest extends TableTestBase {
   }
 
   @Test
-  def testInvalidTableFunctions(): Unit = {
-    val util = streamTestUtil()
-
-    val func1 = new TableFunc1
-    util.javaTableEnv.registerFunction("func1", func1)
-    util.javaTableEnv.registerTableSink(
-      "testSink", new TestAppendSink().configure(
-        Array[String]("f"), Array[TypeInformation[_]](Types.INT)))
-
-    // table function call select
-    expectExceptionThrown(
-      func1('c).select("f0"),
-      "Table functions can only be used in table.joinLateral() and table.leftOuterJoinLateral()."
-    )
-
-    // table function call select
-    expectExceptionThrown(
-      func1('c).select('f0),
-      "Table functions can only be used in table.joinLateral() and table.leftOuterJoinLateral()."
-    )
-
-    // table function call insertInto
-    expectExceptionThrown(
-      func1('c).insertInto("testSink"),
-      "Table functions can only be used in table.joinLateral() and table.leftOuterJoinLateral()."
-    )
-
-    // table function call distinct
-    expectExceptionThrown(
-      func1('c).distinct(),
-      "Table functions can only be used in table.joinLateral() and table.leftOuterJoinLateral()."
-    )
-
-    // table function call filter
-    expectExceptionThrown(
-      func1('c).filter('f0 === "?"),
-      "Table functions can only be used in table.joinLateral() and table.leftOuterJoinLateral()."
-    )
-
-    // table function call filter
-    expectExceptionThrown(
-      func1('c).filter("f0 = '?'"),
-      "Table functions can only be used in table.joinLateral() and table.leftOuterJoinLateral()."
-    )
-
-    // table function call limit
-    expectExceptionThrown(
-      func1('c).orderBy('f0).offset(3),
-      "Table functions can only be used in table.joinLateral() and table.leftOuterJoinLateral()."
-    )
-
-    // table function call limit
-    expectExceptionThrown(
-      func1('c).orderBy('f0).fetch(3),
-      "Table functions can only be used in table.joinLateral() and table.leftOuterJoinLateral()."
-    )
-
-    // table function call orderBy
-    expectExceptionThrown(
-      func1('c).orderBy("f0"),
-      "Table functions can only be used in table.joinLateral() and table.leftOuterJoinLateral()."
-    )
-
-    // table function call orderBy
-    expectExceptionThrown(
-      func1('c).orderBy('f0),
-      "Table functions can only be used in table.joinLateral() and table.leftOuterJoinLateral()."
-    )
-
-    // table function call where
-    expectExceptionThrown(
-      func1('c).where("f0 = '?'"),
-      "Table functions can only be used in table.joinLateral() and table.leftOuterJoinLateral()."
-    )
-
-    // table function call where
-    expectExceptionThrown(
-      func1('c).where('f0 === "?"),
-      "Table functions can only be used in table.joinLateral() and table.leftOuterJoinLateral()."
-    )
-
-  }
-
-  @Test
   def testInvalidTableFunction(): Unit = {
     val util = streamTestUtil()
     val t = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
@@ -190,7 +104,8 @@ class CorrelateValidationTest extends TableTestBase {
     val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
     val function = util.addFunction("func1", new TableFunc1)
 
-    val result = table.leftOuterJoin(function('c) as 's, 'c === 's).select('c, 's).where('a > 10)
+    val result = table.leftOuterJoinLateral(function('c) as 's, 'c === 's)
+      .select('c, 's).where('a > 10)
 
     util.verifyTable(result, "")
   }
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala
index 095cd04..b7087d9 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala
@@ -407,7 +407,7 @@ class TimeIndicatorConversionTest extends TableTestBase {
     val proctimeRates = proctimeRatesHistory.createTemporalTableFunction('proctime, 'currency)
 
     val result = proctimeOrders
-      .join(proctimeRates('o_proctime), "currency = o_currency")
+      .joinLateral(proctimeRates('o_proctime), 'currency === 'o_currency)
       .select("o_amount * rate, currency, proctime").as("converted_amount")
       .window(Tumble over 1.second on 'proctime as 'w)
       .groupBy('w, 'currency)
@@ -444,7 +444,7 @@ class TimeIndicatorConversionTest extends TableTestBase {
     val proctimeRates = proctimeRatesHistory.createTemporalTableFunction('proctime, 'currency)
 
     val result = proctimeOrders
-      .join(proctimeRates('o_proctime), "currency = o_currency")
+      .joinLateral(proctimeRates('o_proctime), 'currency === 'o_currency)
       .select("o_amount * rate, currency, o_proctime").as("converted_amount")
       .window(Tumble over 1.second on 'o_proctime as 'w)
       .groupBy('w, 'currency)
@@ -481,7 +481,7 @@ class TimeIndicatorConversionTest extends TableTestBase {
     val proctimeRates = proctimeRatesHistory.createTemporalTableFunction('proctime, 'currency)
 
     val result = proctimeOrders
-      .join(proctimeRates('o_proctime), "currency = o_currency")
+      .joinLateral(proctimeRates('o_proctime), 'currency === 'o_currency)
       .select("o_amount * rate, currency, o_proctime, o_rowtime").as("converted_amount")
       .window(Tumble over 1.second on 'o_rowtime as 'w)
       .groupBy('w, 'currency)
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/table/JoinITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/table/JoinITCase.scala
index 6a5944d..dffe59f 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/table/JoinITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/table/JoinITCase.scala
@@ -466,7 +466,7 @@ class JoinITCase(
     val ds1 = env.fromCollection(data).toTable(tEnv, 'a)
     val func2 = new TableFunc2
 
-    val joinDs = ds1.join(func2('a) as ('name, 'len))
+    val joinDs = ds1.joinLateral(func2('a) as ('name, 'len))
 
     val results = joinDs.toDataSet[Row].collect()
     val expected = Seq(
