Hi

We're using multi sink in sql with view, the TestCase is

"""java
  @Test
  def testJoinTemporalTableWithViewWithFilterPushDown(): Unit = {
    createLookupTable("LookupTableAsync1", new AsyncTableFunction1)

    util.addTable(
      """
        |CREATE TEMPORARY VIEW v_vvv AS
        |SELECT * FROM MyTable AS T
        |JOIN LookupTableAsync1 FOR SYSTEM_TIME AS OF T.proctime AS D
        |ON T.a = D.id
        |""".stripMargin)

    val stmtSet = util.tableEnv.createStatementSet()

    val appendSink1 = util.createRetractTableSink(
      Array("a", "b","id","name"),
      Array(INT, STRING, INT, STRING))

util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal(
      "appendSink1", appendSink1)
    stmtSet.addInsert("appendSink1", util.tableEnv.sqlQuery("""
                                                              |SELECT
a,b,id,name FROM v_vvv
                                                              |WHERE age =
10
                         """.stripMargin))

    stmtSet.addInsert("appendSink1", util.tableEnv.sqlQuery("""
                                                              |SELECT
a,b,id,name FROM v_vvv
                                                              |WHERE age =
30

""".stripMargin))

//    util.verifyPlan(stmtSet)
    util.verifyExecPlan(stmtSet)
  }

class AsyncTableFunction1 extends AsyncTableFunction[RowData] {
  def eval(resultFuture: CompletableFuture[JCollection[RowData]], a:
Integer): Unit = {
  }
}
"""

the optimized exec plan is

"""
Calc(select=[a, b])(reuse_id=[1])
+- DataStreamScan(table=[[default_catalog, default_database, MyTable]],
fields=[a, b, c, proctime, rowtime])

LegacySink(name=[`default_catalog`.`default_database`.`appendSink1`],
fields=[a, b, id, name])
+- LookupJoin(table=[default_catalog.default_database.LookupTableAsync1],
joinType=[InnerJoin], async=[true], lookup=[age=10, id=a], where=[(age =
10)], select=[a, b, id, name])
   +- Reused(reference_id=[1])

LegacySink(name=[`default_catalog`.`default_database`.`appendSink2`],
fields=[a, b, id, name])
+- LookupJoin(table=[default_catalog.default_database.LookupTableAsync1],
joinType=[InnerJoin], async=[true], lookup=[age=30, id=a], where=[(age =
30)], select=[a, b, id, name])
   +- Reused(reference_id=[1])
"""

*I have 2 questions*
*1. The lookup table function execute twice, which is very expensive*
*2. the age filter is push down to LookupJoin with lookup=[age=10, id=a],
which result to function signature mismatch (exception follows blow)*

org.apache.flink.table.api.ValidationException: Could not find an
implementation method 'eval' in class
'org.apache.flink.table.planner.plan.utils.AsyncTableFunction1' for
function 'default_catalog.default_database.LookupTableAsync1, source:
[TestInvalidTemporalTable(id, name, age, ts)]' that matches the following
signature:
void eval(java.util.concurrent.CompletableFuture, java.lang.Integer,
java.lang.Integer)

Is the optimizer wrong or I'm wrong ?

Cooper.Luan

Reply via email to