[ 
https://issues.apache.org/jira/browse/FLINK-20314?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

bugboy updated FLINK-20314:
---------------------------
    Description: 
My DDL:
{code:java}
create table if not exists t_order(
id int PRIMARY KEY comment '订单id',
timestamps bigint comment '订单创建时间',
orderInformationId string comment '订单信息ID',
userId string comment '用户ID',
categoryId int comment '商品类别',
productId int comment '商品ID',
price decimal(10,2) comment '单价',
productCount int comment '购买数量',
priceSum decimal(10,2) comment '订单总价',
shipAddress string comment '商家地址',
receiverAddress string comment '收货地址',
ts AS TO_TIMESTAMP(FROM_UNIXTIME(timestamps/1000)),
WATERMARK FOR ts AS ts - INTERVAL '3' SECOND
)with(
'connector' = 'kafka',
'format' = 'debezium-avro-confluent', 
'debezium-avro-confluent.schema-registry.url' = 'http://hostname:8081',
'topic' = 'ods.userAnalysis.order',
'properties.bootstrap.servers' = 'hostname:9092',
'properties.group.id' = 'flink-analysis',
'scan.startup.mode' = 'latest-offset'
)

{code}
 

query is ok  when using the following SQLs:
{code:java}
select * from t_order{code}
{code:java}
select receiverAddress from t_order{code}
{code:java}
select
id,
timestamps,
orderInformationId,
userId,
categoryId,
productId,
price,
productCount,
priceSum,
shipAddress
from t_order{code}
but when I add the *receiveraddress* field to the third sql like:
{code:java}
select
id,
timestamps,
orderInformationId,
userId,
categoryId,
productId,
price,
productCount,
priceSum,
shipAddress,
receiverAddress
from t_order
{code}
it throws an exception:
{code:java}
Exception in thread "main" org.apache.flink.table.api.TableException: This calc 
has no useful projection and no filter. It should be removed by 
CalcRemoveRule.Exception in thread "main" 
org.apache.flink.table.api.TableException: This calc has no useful projection 
and no filter. It should be removed by CalcRemoveRule. at 
org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateProcessCode(CalcCodeGenerator.scala:166)
 at 
org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateCalcOperator(CalcCodeGenerator.scala:59)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:84)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
 at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59)
 at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToTransformation(StreamExecLegacySink.scala:158)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:82)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:48)
 at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59)
 at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlan(StreamExecLegacySink.scala:48)
 at 
org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:66)
 at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) 
at scala.collection.Iterator.foreach(Iterator.scala:937) at 
scala.collection.Iterator.foreach$(Iterator.scala:937) at 
scala.collection.AbstractIterator.foreach(Iterator.scala:1425) at 
scala.collection.IterableLike.foreach(IterableLike.scala:70) at 
scala.collection.IterableLike.foreach$(IterableLike.scala:69) at 
scala.collection.AbstractIterable.foreach(Iterable.scala:54) at 
scala.collection.TraversableLike.map(TraversableLike.scala:233) at 
scala.collection.TraversableLike.map$(TraversableLike.scala:226) at 
scala.collection.AbstractTraversable.map(Traversable.scala:104) at 
org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:65)
 at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:167)
 at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1261)
 at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:702)
 at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:1065)
 at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:664)
 at com.bugboy.analysis.AnalysisCase$.main(AnalysisCase.scala:161) at 
com.bugboy.analysis.AnalysisCase.main(AnalysisCase.scala){code}
 

 
 
 

  was:
DDL语句:
{code:java}

create table if not exists t_order(
id int PRIMARY KEY comment '订单id',
timestamps bigint comment '订单创建时间',
orderInformationId string comment '订单信息ID',
userId string comment '用户ID',
categoryId int comment '商品类别',
productId int comment '商品ID',
price decimal(10,2) comment '单价',
productCount int comment '购买数量',
priceSum decimal(10,2) comment '订单总价',
shipAddress string comment '商家地址',
receiverAddress string comment '收货地址',
ts AS TO_TIMESTAMP(FROM_UNIXTIME(timestamps/1000)),
WATERMARK FOR ts AS ts - INTERVAL '3' SECOND
)with(
'connector' = 'kafka',
'format' = 'debezium-avro-confluent', 
'debezium-avro-confluent.schema-registry.url' = 'http://hostname:8081',
'topic' = 'ods.userAnalysis.order',
'properties.bootstrap.servers' = 'hostname:9092',
'properties.group.id' = 'flink-analysis',
'scan.startup.mode' = 'latest-offset'
)

{code}
 

使用以下SQL查询时可以正常查询
{code:java}
select * from t_order{code}
{code:java}
select receiverAddress from t_order{code}
{code:java}
select
id,
timestamps,
orderInformationId,
userId,
categoryId,
productId,
price,
productCount,
priceSum,
shipAddress
from t_order{code}

但是我在第三条语句中加上receiverAddress字段时,就查询不出来了,sql如下:
{code:java}
select
id,
timestamps,
orderInformationId,
userId,
categoryId,
productId,
price,
productCount,
priceSum,
shipAddress,
receiverAddress
from t_order
{code}
报错信息如下:
{code:java}
Exception in thread "main" org.apache.flink.table.api.TableException: This calc 
has no useful projection and no filter. It should be removed by 
CalcRemoveRule.Exception in thread "main" 
org.apache.flink.table.api.TableException: This calc has no useful projection 
and no filter. It should be removed by CalcRemoveRule. at 
org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateProcessCode(CalcCodeGenerator.scala:166)
 at 
org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateCalcOperator(CalcCodeGenerator.scala:59)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:84)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
 at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59)
 at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToTransformation(StreamExecLegacySink.scala:158)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:82)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:48)
 at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59)
 at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlan(StreamExecLegacySink.scala:48)
 at 
org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:66)
 at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) 
at scala.collection.Iterator.foreach(Iterator.scala:937) at 
scala.collection.Iterator.foreach$(Iterator.scala:937) at 
scala.collection.AbstractIterator.foreach(Iterator.scala:1425) at 
scala.collection.IterableLike.foreach(IterableLike.scala:70) at 
scala.collection.IterableLike.foreach$(IterableLike.scala:69) at 
scala.collection.AbstractIterable.foreach(Iterable.scala:54) at 
scala.collection.TraversableLike.map(TraversableLike.scala:233) at 
scala.collection.TraversableLike.map$(TraversableLike.scala:226) at 
scala.collection.AbstractTraversable.map(Traversable.scala:104) at 
org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:65)
 at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:167)
 at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1261)
 at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:702)
 at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:1065)
 at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:664)
 at com.bugboy.analysis.AnalysisCase$.main(AnalysisCase.scala:161) at 
com.bugboy.analysis.AnalysisCase.main(AnalysisCase.scala){code}
 

 


> Empty Calc is not removed by CalcRemoveRule
> -------------------------------------------
>
>                 Key: FLINK-20314
>                 URL: https://issues.apache.org/jira/browse/FLINK-20314
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API, Table SQL / Planner
>    Affects Versions: 1.12.0
>            Reporter: bugboy
>            Priority: Major
>
> My DDL:
> {code:java}
> create table if not exists t_order(
> id int PRIMARY KEY comment '订单id',
> timestamps bigint comment '订单创建时间',
> orderInformationId string comment '订单信息ID',
> userId string comment '用户ID',
> categoryId int comment '商品类别',
> productId int comment '商品ID',
> price decimal(10,2) comment '单价',
> productCount int comment '购买数量',
> priceSum decimal(10,2) comment '订单总价',
> shipAddress string comment '商家地址',
> receiverAddress string comment '收货地址',
> ts AS TO_TIMESTAMP(FROM_UNIXTIME(timestamps/1000)),
> WATERMARK FOR ts AS ts - INTERVAL '3' SECOND
> )with(
> 'connector' = 'kafka',
> 'format' = 'debezium-avro-confluent', 
> 'debezium-avro-confluent.schema-registry.url' = 'http://hostname:8081',
> 'topic' = 'ods.userAnalysis.order',
> 'properties.bootstrap.servers' = 'hostname:9092',
> 'properties.group.id' = 'flink-analysis',
> 'scan.startup.mode' = 'latest-offset'
> )
> {code}
>  
> query is ok  when using the following SQLs:
> {code:java}
> select * from t_order{code}
> {code:java}
> select receiverAddress from t_order{code}
> {code:java}
> select
> id,
> timestamps,
> orderInformationId,
> userId,
> categoryId,
> productId,
> price,
> productCount,
> priceSum,
> shipAddress
> from t_order{code}
> but when I add the *receiveraddress* field to the third sql like:
> {code:java}
> select
> id,
> timestamps,
> orderInformationId,
> userId,
> categoryId,
> productId,
> price,
> productCount,
> priceSum,
> shipAddress,
> receiverAddress
> from t_order
> {code}
> it throws an exception:
> {code:java}
> Exception in thread "main" org.apache.flink.table.api.TableException: This 
> calc has no useful projection and no filter. It should be removed by 
> CalcRemoveRule.Exception in thread "main" 
> org.apache.flink.table.api.TableException: This calc has no useful projection 
> and no filter. It should be removed by CalcRemoveRule. at 
> org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateProcessCode(CalcCodeGenerator.scala:166)
>  at 
> org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateCalcOperator(CalcCodeGenerator.scala:59)
>  at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:84)
>  at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
>  at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59)
>  at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57)
>  at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
>  at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToTransformation(StreamExecLegacySink.scala:158)
>  at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:82)
>  at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:48)
>  at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59)
>  at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57)
>  at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlan(StreamExecLegacySink.scala:48)
>  at 
> org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:66)
>  at 
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) at 
> scala.collection.Iterator.foreach(Iterator.scala:937) at 
> scala.collection.Iterator.foreach$(Iterator.scala:937) at 
> scala.collection.AbstractIterator.foreach(Iterator.scala:1425) at 
> scala.collection.IterableLike.foreach(IterableLike.scala:70) at 
> scala.collection.IterableLike.foreach$(IterableLike.scala:69) at 
> scala.collection.AbstractIterable.foreach(Iterable.scala:54) at 
> scala.collection.TraversableLike.map(TraversableLike.scala:233) at 
> scala.collection.TraversableLike.map$(TraversableLike.scala:226) at 
> scala.collection.AbstractTraversable.map(Traversable.scala:104) at 
> org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:65)
>  at 
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:167)
>  at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1261)
>  at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:702)
>  at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:1065)
>  at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:664)
>  at com.bugboy.analysis.AnalysisCase$.main(AnalysisCase.scala:161) at 
> com.bugboy.analysis.AnalysisCase.main(AnalysisCase.scala){code}
>  
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to