[ https://issues.apache.org/jira/browse/FLINK-5827?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
kaibo.zhou reassigned FLINK-5827: --------------------------------- Assignee: kaibo.zhou > Exception when do filter after join a udtf which returns a POJO type > -------------------------------------------------------------------- > > Key: FLINK-5827 > URL: https://issues.apache.org/jira/browse/FLINK-5827 > Project: Flink > Issue Type: Bug > Components: Table API & SQL > Reporter: kaibo.zhou > Assignee: kaibo.zhou > > The test case: > {code:title=testFilterUdtfOutputPojo} > @Test > def testFilterUdtfOutputPojo(): Unit = { > val env = ExecutionEnvironment.getExecutionEnvironment > val tEnv = TableEnvironment.getTableEnvironment(env) > val pojoFunc1 = new PojoTableFunc() > tEnv.registerFunction("pojo1", pojoFunc1) > val result = CollectionDataSets.getSmall3TupleDataSet(env) > .toTable(tEnv, 'a, 'b, 'c) > .join(pojoFunc1('c)) > .where(('age > 0) && ('name !== "")) > .select('a, 'b, 'c, 'age, 'name) > val results = result.toDataSet[Row].collect() > } > {code} > It throws the following exception: > {code} > org.apache.flink.table.codegen.CodeGenException: No input mapping is > specified for input1 of type POJO. 
> at > org.apache.flink.table.codegen.CodeGenerator$$anonfun$1.apply(CodeGenerator.scala:80) > at > org.apache.flink.table.codegen.CodeGenerator$$anonfun$1.apply(CodeGenerator.scala:80) > at scala.Option.getOrElse(Option.scala:120) > at > org.apache.flink.table.codegen.CodeGenerator.<init>(CodeGenerator.scala:79) > at > org.apache.flink.table.plan.nodes.CommonCorrelate$class.generateCollector(CommonCorrelate.scala:191) > at > org.apache.flink.table.plan.nodes.dataset.DataSetCorrelate.generateCollector(DataSetCorrelate.scala:37) > at > org.apache.flink.table.plan.nodes.CommonCorrelate$class.correlateMapFunction(CommonCorrelate.scala:70) > at > org.apache.flink.table.plan.nodes.dataset.DataSetCorrelate.correlateMapFunction(DataSetCorrelate.scala:37) > at > org.apache.flink.table.plan.nodes.dataset.DataSetCorrelate.translateToPlan(DataSetCorrelate.scala:101) > at > org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:277) > at > org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:256) > at > org.apache.flink.table.api.scala.BatchTableEnvironment.toDataSet(BatchTableEnvironment.scala:140) > at > org.apache.flink.table.api.scala.TableConversions.toDataSet(TableConversions.scala:40) > at > org.apache.flink.table.api.scala.stream.table.UserDefinedTableFunctionTest.testFilterUdtfOutputPojo(UserDefinedTableFunctionTest.scala:399) > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346)