[ https://issues.apache.org/jira/browse/FLINK-22954?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

hehuiyuan updated FLINK-22954:
------------------------------
    Description: 
{code:java}
Exception in thread "main" org.apache.flink.table.api.TableException: Table sink 'default_catalog.default_database.kafkaTableSink' doesn't support consuming update and delete changes which is produced by node Join(joinType=[LeftOuterJoin], where=[true], select=[name, word], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
	at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.createNewNode(FlinkChangelogModeInferenceProgram.scala:382)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:265)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.org$apache$flink$table$planner$plan$optimize$program$FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$visitChild(FlinkChangelogModeInferenceProgram.scala:341)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$anonfun$3.apply(FlinkChangelogModeInferenceProgram.scala:330)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$anonfun$3.apply(FlinkChangelogModeInferenceProgram.scala:329)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.immutable.Range.foreach(Range.scala:160)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChildren(FlinkChangelogModeInferenceProgram.scala:329)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:279)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.org$apache$flink$table$planner$plan$optimize$program$FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$visitChild(FlinkChangelogModeInferenceProgram.scala:341)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$anonfun$3.apply(FlinkChangelogModeInferenceProgram.scala:330)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$anonfun$3.apply(FlinkChangelogModeInferenceProgram.scala:329)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.immutable.Range.foreach(Range.scala:160)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChildren(FlinkChangelogModeInferenceProgram.scala:329)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:125)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram.optimize(FlinkChangelogModeInferenceProgram.scala:50)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram.optimize(FlinkChangelogModeInferenceProgram.scala:39)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:63)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:60)
	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
	at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
	at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:60)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:55)
	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
	at scala.collection.immutable.Range.foreach(Range.scala:160)
	at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
	at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:55)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
	at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
	at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
	at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163)
	at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:79)
	at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
	at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:287)
	at org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:100)
	at org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:42)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.explainInternal(TableEnvironmentImpl.java:630)
	at org.apache.flink.table.api.internal.StatementSetImpl.explain(StatementSetImpl.java:92)
{code}
 

UDF code:
{code:java}
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.api.Types;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

/** Emits one row per integer in [from, to). */
@FunctionHint(output = @DataTypeHint("ROW<word INT>"))
public class GenerateSeriesUdf extends TableFunction<Row> {

    public void eval(int from, int to) {
        for (int i = from; i < to; i++) {
            Row row = new Row(1);
            row.setField(0, i);
            collect(row);
        }
    }

    // Legacy type inference; redundant next to @FunctionHint but kept as reported.
    @Override
    public TypeInformation<Row> getResultType() {
        return Types.ROW(Types.INT());
    }
}
{code}
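
The report does not show how the function is registered. Presumably something like the following (an assumed step, not from the report; a fuller driver sketch follows the SQL below):
{code:java}
// Assumed registration (not shown in the report): the SQL below refers to the
// function by the name GENERATE_SERIES.
tableEnv.createTemporarySystemFunction("GENERATE_SERIES", GenerateSeriesUdf.class);
{code}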
 

An inner `JOIN` works, but `LEFT JOIN` fails with the exception above: when the table function is invoked with constant arguments only, the planner rewrites the lateral left join into a regular streaming LeftOuterJoin over a Values node (see the plans below), and that join produces update and delete changes which the append-only Kafka sink cannot consume.

{code:java}
CREATE TABLE kafkaTableSource (
    name string,
    age int,
    sex string,
    address string,
    pt as PROCTIME()
) WITH (
    'connector' = 'kafka',
    'topic' = 'hehuiyuan1',
    'scan.startup.mode' = 'latest-offset',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.client.id' = 'test-consumer-group',
    'properties.group.id' = 'test-consumer-group',
    'format' = 'json'
);

CREATE TABLE kafkaTableSink (
    name string,
    sname string,
    sno string,
    sclass string,
    address string
) WITH (
    'connector' = 'kafka',
    'topic' = 'hehuiyuan2',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'json'
);

INSERT INTO kafkaTableSink
SELECT name, name, name, name, word
FROM kafkaTableSource
LEFT JOIN LATERAL TABLE(GENERATE_SERIES(1,5)) AS T(word) ON TRUE;
{code}
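
The stack trace ends in StatementSetImpl.explain, so the failure is presumably triggered roughly as follows (a self-contained reproduction sketch; the reporter's actual driver code is not in the report, and the DDL strings are elided for brevity):
{code:java}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class LeftJoinLateralRepro {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        tableEnv.createTemporarySystemFunction("GENERATE_SERIES", GenerateSeriesUdf.class);
        // Run the two CREATE TABLE statements from the SQL above, e.g.:
        // tableEnv.executeSql("CREATE TABLE kafkaTableSource (...)");
        // tableEnv.executeSql("CREATE TABLE kafkaTableSink (...)");

        StatementSet stmtSet = tableEnv.createStatementSet();
        stmtSet.addInsertSql(
                "INSERT INTO kafkaTableSink "
                        + "SELECT name, name, name, name, word "
                        + "FROM kafkaTableSource "
                        + "LEFT JOIN LATERAL TABLE(GENERATE_SERIES(1,5)) AS T(word) ON TRUE");

        // Planning happens here and throws the TableException quoted above;
        // the inner-join variant (JOIN ... ON TRUE) is reported to plan fine.
        System.out.println(stmtSet.explain());
    }
}
{code}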
 

 
{code:java}
// Case 1: table function invoked with constant arguments only,
// e.g. GENERATE_SERIES(1, 5) with two constant arguments.
// Optimized plan:
Sink(table=[default_catalog.default_database.kafkaTableSink], fields=[name, 
name0, name1, name2, word])
+- Calc(select=[name, name AS name0, name AS name1, name AS name2, word])
   +- Join(joinType=[LeftOuterJoin], where=[true], select=[name, word], 
leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
      :- Exchange(distribution=[single])
      :  +- Calc(select=[name])
      :     +- TableSourceScan(table=[[default_catalog, default_database, 
kafkaTableSource]], fields=[name, age, sex, address])
      +- Exchange(distribution=[single])
         +- Correlate(invocation=[GENERATE_SERIES(1, 5)], 
correlate=[table(GENERATE_SERIES(1,5))], select=[word], 
rowType=[RecordType:peek_no_expand(INTEGER word)], joinType=[INNER])
            +- Values(tuples=[[{  }]])

// Case 2: table function invoked with a table field,
// e.g. JSON_TUPLE(address, 'province').
// Optimized plan:
 Sink(table=[default_catalog.default_database.kafkaTableSink], fields=[name, 
name0, name1, name2, province])
+- Calc(select=[name, name AS name0, name AS name1, name AS name2, word AS 
province])
   +- Correlate(invocation=[JSON_TUPLE($cor0.address, _UTF-16LE'province')], 
correlate=[table(JSON_TUPLE($cor0.address,_UTF-16LE'province'))], 
select=[name,age,sex,address,pt,word], rowType=[RecordType(VARCHAR(2147483647) 
name, INTEGER age, VARCHAR(2147483647) sex, VARCHAR(2147483647) address, TIME 
ATTRIBUTE(PROCTIME) pt, VARCHAR(2147483647) word)], joinType=[LEFT])
      +- Calc(select=[name, age, sex, address, PROCTIME() AS pt])
         +- TableSourceScan(table=[[default_catalog, default_database, 
kafkaTableSource]], fields=[name, age, sex, address])
{code}
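
Consistent with the two plans above, the rewrite into a plain LeftOuterJoin only happens when the lateral call references no table field; a hypothetical variant that passes a field (here age) should instead keep the Correlate with joinType=[LEFT], continuing the sketch above:
{code:java}
// Hypothetical variant (not from the report): passing the table field `age`
// instead of a constant keeps the lateral call correlated, so the planner
// should produce an append-only Correlate(joinType=[LEFT]) as in case 2 above.
stmtSet.addInsertSql(
        "INSERT INTO kafkaTableSink "
                + "SELECT name, name, name, name, word "
                + "FROM kafkaTableSource "
                + "LEFT JOIN LATERAL TABLE(GENERATE_SERIES(1, age)) AS T(word) ON TRUE");
{code}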
 

> Don't support consuming update and delete changes when using a table function 
> that does not reference a table field
> -----------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-22954
>                 URL: https://issues.apache.org/jira/browse/FLINK-22954
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>            Reporter: hehuiyuan
>            Priority: Major
>


--
This message was sent by Atlassian Jira
(v8.3.4#803005)
