[ https://issues.apache.org/jira/browse/FLINK-38366?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ahmad Humayun updated FLINK-38366:
----------------------------------
    Description: 
The following is a minimized program that causes an exception in the planner:
{code:python}
from pyflink.table import EnvironmentSettings, TableEnvironment, Table
from pyflink.common import Configuration
from pyflink.table.expressions import col
 
cfg = Configuration()

# The program runs fine if I set any default limit 
# cfg.set_string("table.exec.sort.default-limit", "10000")
 
settings = (
   EnvironmentSettings.new_instance()
   .in_batch_mode()
   .with_configuration(cfg)
   .build()
)
 
table_env = TableEnvironment.create(settings)
 
data = [
   ("Los Angeles", 9.50, "Premium", 1),
   ("Houston", 8.25, "Basic", 2),
   ("New York", 10.00, "Standard", 1),
   ("Chicago", 8.25, "Premium", 1)
]
 
source_table = table_env.from_elements(
   data,
   schema=["cc_city", "cc_tax_percentage", "cc_mkt_class", "extra"]
)
 
ordered = source_table.order_by(col('cc_city'))
result = ordered.group_by(col('cc_tax_percentage')).select(col('cc_mkt_class').min.alias('cc_mkt_class'))
 
print(result.to_pandas())
{code}

The following is the exception thrown:

{code:java}
org.apache.flink.table.api.TableException: Cannot generate a valid execution plan for the given query:

FlinkLogicalCalc(select=[EXPR$0 AS cc_mkt_class])
+- FlinkLogicalAggregate(group=[{1}], EXPR$0=[MIN($2)])
   +- FlinkLogicalSort(sort0=[$0], dir0=[ASC])
      +- FlinkLogicalTableSourceScan(table=[[*anonymous_python-input-format$1*]], fields=[cc_city, cc_tax_percentage, cc_mkt_class, dummy])
 
This exception indicates that the query uses an unsupported SQL feature.
Please check the documentation for the set of currently supported SQL features.
   at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:70)
   at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59)
   at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
   at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
   at scala.collection.Iterator.foreach(Iterator.scala:943)
   at scala.collection.Iterator.foreach$(Iterator.scala:943)
   at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
   at scala.collection.IterableLike.foreach(IterableLike.scala:74)
   at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
   at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
   at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
   at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
   at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)
   at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:55)
   at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeTree(BatchCommonSubGraphBasedOptimizer.scala:93)
   at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeBlock(BatchCommonSubGraphBasedOptimizer.scala:58)
   at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.$anonfun$doOptimize$1(BatchCommonSubGraphBasedOptimizer.scala:45)
   at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.$anonfun$doOptimize$1$adapted(BatchCommonSubGraphBasedOptimizer.scala:45)
   at scala.collection.immutable.List.foreach(List.scala:431)
   at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.doOptimize(BatchCommonSubGraphBasedOptimizer.scala:45)
   at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:87)
   at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:390)
   at org.apache.flink.table.planner.delegation.PlannerBase.getExplainGraphs(PlannerBase.scala:625)
   at org.apache.flink.table.planner.delegation.BatchPlanner.explain(BatchPlanner.scala:149)
   at org.apache.flink.table.planner.delegation.BatchPlanner.explain(BatchPlanner.scala:49)
   at org.apache.flink.table.api.internal.TableEnvironmentImpl.explainInternal(TableEnvironmentImpl.java:753)
   at org.apache.flink.table.api.internal.TableImpl.explain(TableImpl.java:482)
   at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
   at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
   at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   at java.base/java.lang.reflect.Method.invoke(Method.java:569)
   at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
   at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
   at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
   at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
   at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
   at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
   at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: org.apache.calcite.plan.RelOptPlanner$CannotPlanException: There are not enough rules to produce a node with desired properties: convention=BATCH_PHYSICAL, FlinkRelDistributionTraitDef=any, sort=[].
Missing conversions are FlinkLogicalSort[convention: LOGICAL -> BATCH_PHYSICAL, FlinkRelDistributionTraitDef: any -> hash[1]false, sort: [0] -> [1 ASC-nulls-first]], FlinkLogicalSort[convention: LOGICAL -> BATCH_PHYSICAL, FlinkRelDistributionTraitDef: any -> hash[1]true, sort: [0] -> [1 ASC-nulls-first]], FlinkLogicalSort[convention: LOGICAL -> BATCH_PHYSICAL, sort: [0] -> [1 ASC-nulls-first]]
There are 3 empty subsets:
Empty subset 0: rel#240:RelSubset#5.BATCH_PHYSICAL.hash[1]false.[1 ASC-nulls-first], the relevant part of the original plan is as follows
201:FlinkLogicalSort(sort0=[$0], dir0=[ASC])
 178:FlinkLogicalTableSourceScan(subset=rel#200:RelSubset#4.LOGICAL.any.[], table=[[*anonymous_python-input-format$1*]], fields=[cc_city, cc_tax_percentage, cc_mkt_class, dummy])

Empty subset 1: rel#215:RelSubset#5.BATCH_PHYSICAL.any.[1 ASC-nulls-first], the relevant part of the original plan is as follows
201:FlinkLogicalSort(sort0=[$0], dir0=[ASC])
 178:FlinkLogicalTableSourceScan(subset=rel#200:RelSubset#4.LOGICAL.any.[], table=[[*anonymous_python-input-format$1*]], fields=[cc_city, cc_tax_percentage, cc_mkt_class, dummy])
... several more lines 
{code}
I don't understand why a plan cannot be generated for this program. 
Furthermore, the program runs fine if I apply min to the "extra" column 
instead of "cc_mkt_class", which suggests that the issue only arises when 
performing a min on a string column.
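
For reference, the ordering should not change what min returns. The following is a minimal plain-Python sketch (not Flink code; the helper name min_per_group is hypothetical) that runs the same aggregation over the same rows with and without the sort, assuming min behaves as an ordinary order-independent aggregate:

```python
# Same rows as the reproducer: (cc_city, cc_tax_percentage, cc_mkt_class, extra)
data = [
    ("Los Angeles", 9.50, "Premium", 1),
    ("Houston", 8.25, "Basic", 2),
    ("New York", 10.00, "Standard", 1),
    ("Chicago", 8.25, "Premium", 1),
]


def min_per_group(rows, key_idx, value_idx):
    """Hypothetical helper: return {group key: smallest value in that group}."""
    result = {}
    for row in rows:
        key, value = row[key_idx], row[value_idx]
        if key not in result or value < result[key]:
            result[key] = value
    return result


# Group by cc_tax_percentage (index 1) and take the min of cc_mkt_class (index 2).
unsorted_result = min_per_group(data, 1, 2)

# Same aggregation after ordering the rows by cc_city (index 0), mirroring order_by().
sorted_result = min_per_group(sorted(data, key=lambda r: r[0]), 1, 2)

print(unsorted_result == sorted_result)
print(sorted_result)
```

If the same holds inside Flink, removing the order_by() before the group_by() would presumably give the same result and avoid the failing plan, but this has not been verified against the planner.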

If this is not a bug, can someone explain this behaviour?

  was:
The following is a minimized program that causes an exception in the planner
{code:java}
from pyflink.table import EnvironmentSettings, TableEnvironment, Table
from pyflink.common import Configuration
from pyflink.table.expressions import col
 
cfg = Configuration()

The program runs fine if I set any default limit 
cfg.set_string("table.exec.sort.default-limit", "10000")
 
settings = (
   EnvironmentSettings.new_instance()
   .in_batch_mode()
   .with_configuration(cfg)
   .build()
)
 
table_env = TableEnvironment.create(settings)
 
data = [
   ("Los Angeles", 9.50, "Premium", 1),
   ("Houston", 8.25, "Basic", 2),
   ("New York", 10.00, "Standard", 1),
   ("Chicago", 8.25, "Premium", 1)
]
 
source_table = table_env.from_elements(
   data,
   schema=["cc_city", "cc_tax_percentage", "cc_mkt_class", "extra"]
)
 
ordered = source_table.order_by(col('cc_city'))
result = 
ordered.group_by(col('cc_tax_percentage')).select(col('cc_mkt_class').min.alias('cc_mkt_class'))
 
print(result.to_pandas())
{code}
 

 

The following is the exception thrown:

 

 
{code:java}
org.apache.flink.table.api.TableException: Cannot generate a valid execution 
plan for the given query:
 
FlinkLogicalCalc(select=[EXPR$0 AS cc_mkt_class])
+- FlinkLogicalAggregate(group=[\{1}], EXPR$0=[MIN($2)])
  +- FlinkLogicalSort(sort0=[$0], dir0=[ASC])
     +- 
FlinkLogicalTableSourceScan(table=[[*anonymous_python-input-format$1*]], 
fields=[cc_city, cc_tax_percentage, cc_mkt_class, dummy])
 
This exception indicates that the query uses an unsupported SQL feature.
Please check the documentation for the set of currently supported SQL features.
   at 
org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:70)
   at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59)
   at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
   at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
   at scala.collection.Iterator.foreach(Iterator.scala:943)
   at scala.collection.Iterator.foreach$(Iterator.scala:943)
   at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
   at scala.collection.IterableLike.foreach(IterableLike.scala:74)
   at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
   at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
   at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
   at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
   at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)
   at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:55)
   at 
org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeTree(BatchCommonSubGraphBasedOptimizer.scala:93)
   at 
org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeBlock(BatchCommonSubGraphBasedOptimizer.scala:58)
   at 
org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.$anonfun$doOptimize$1(BatchCommonSubGraphBasedOptimizer.scala:45)
   at 
org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.$anonfun$doOptimize$1$adapted(BatchCommonSubGraphBasedOptimizer.scala:45)
   at scala.collection.immutable.List.foreach(List.scala:431)
   at 
org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.doOptimize(BatchCommonSubGraphBasedOptimizer.scala:45)
   at 
org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:87)
   at 
org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:390)
   at 
org.apache.flink.table.planner.delegation.PlannerBase.getExplainGraphs(PlannerBase.scala:625)
   at 
org.apache.flink.table.planner.delegation.BatchPlanner.explain(BatchPlanner.scala:149)
   at 
org.apache.flink.table.planner.delegation.BatchPlanner.explain(BatchPlanner.scala:49)
   at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.explainInternal(TableEnvironmentImpl.java:753)
   at org.apache.flink.table.api.internal.TableImpl.explain(TableImpl.java:482)
   at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method)
   at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
   at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   at java.base/java.lang.reflect.Method.invoke(Method.java:569)
   at 
org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
   at 
org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
   at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
   at 
org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
   at 
org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
   at 
org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
   at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: org.apache.calcite.plan.RelOptPlanner$CannotPlanException: There are 
not enough rules to produce a node with desired properties: 
convention=BATCH_PHYSICAL, FlinkRelDistributionTraitDef=any, sort=[].
Missing conversions are FlinkLogicalSort[convention: LOGICAL -> BATCH_PHYSICAL, 
FlinkRelDistributionTraitDef: any -> hash[1]false, sort: [0] -> [1 
ASC-nulls-first]], FlinkLogicalSort[convention: LOGICAL -> BATCH_PHYSICAL, 
FlinkRelDistributionTraitDef: any -> hash[1]true, sort: [0] -> [1 
ASC-nulls-first]], FlinkLogicalSort[convention: LOGICAL -> BATCH_PHYSICAL, 
sort: [0] -> [1 ASC-nulls-first]]
There are 3 empty subsets:
Empty subset 0: rel#240:RelSubset#5.BATCH_PHYSICAL.hash[1]false.[1 
ASC-nulls-first], the relevant part of the original plan is as follows
201:FlinkLogicalSort(sort0=[$0], dir0=[ASC])
 178:FlinkLogicalTableSourceScan(subset=rel#200:RelSubset#4.LOGICAL.any.[], 
table=[[*anonymous_python-input-format$1*]], fields=[cc_city, 
cc_tax_percentage, cc_mkt_class, dummy])
 
Empty subset 1: rel#215:RelSubset#5.BATCH_PHYSICAL.any.[1 ASC-nulls-first], the 
relevant part of the original plan is as follows
201:FlinkLogicalSort(sort0=[$0], dir0=[ASC])
 178:FlinkLogicalTableSourceScan(subset=rel#200:RelSubset#4.LOGICAL.any.[], 
table=[[*anonymous_python-input-format$1*]], fields=[cc_city, 
cc_tax_percentage, cc_mkt_class, dummy])
... several more lines 
{code}
I don't understand why a plan cannot be generated for this program. 
Furthermore, this program will run fine if I apply min on the "extra" column 
instead of "cc_mkt_class". Which means its just an issue when dealing with 
performing a min on a string column.

If this is not a bug, can someone explain this behaviour?


> Pyflink planner throws exception in batch mode when there is no limit on 
> order_by()
> -----------------------------------------------------------------------------------
>
>                 Key: FLINK-38366
>                 URL: https://issues.apache.org/jira/browse/FLINK-38366
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 2.0.0
>         Environment: Pyflink 2.0.0
>            Reporter: Ahmad Humayun
>            Priority: Major



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
