Ahmad Humayun created FLINK-38366:
-------------------------------------
Summary: Pyflink planner throws exception in batch mode when there is no limit on order_by()
Key: FLINK-38366
URL: https://issues.apache.org/jira/browse/FLINK-38366
Project: Flink
Issue Type: Bug
Components: Table SQL / Planner
Affects Versions: 2.0.0
Environment: Pyflink 2.0.0
Reporter: Ahmad Humayun
The following is a minimized program that causes an exception in the planner:
```
from pyflink.table import EnvironmentSettings, TableEnvironment, Table
from pyflink.common import Configuration
from pyflink.table.expressions import col

cfg = Configuration()
# The program runs fine if I set any default limit
# cfg.set_string("table.exec.sort.default-limit", "10000")
settings = (
    EnvironmentSettings.new_instance()
    .in_batch_mode()
    .with_configuration(cfg)
    .build()
)
table_env = TableEnvironment.create(settings)

data = [
    ("Los Angeles", 9.50, "Premium", 1),
    ("Houston", 8.25, "Basic", 2),
    ("New York", 10.00, "Standard", 1),
    ("Chicago", 8.25, "Premium", 1)
]
source_table = table_env.from_elements(
    data,
    schema=["cc_city", "cc_tax_percentage", "cc_mkt_class", "extra"]
)

ordered = source_table.order_by(col('cc_city'))
result = ordered.group_by(col('cc_tax_percentage')).select(
    col('cc_mkt_class').min.alias('cc_mkt_class')
)
print(result.to_pandas())
```
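For reference, the commented-out workaround above corresponds to the following table configuration entry (a sketch; the value 10000 is arbitrary, any positive limit appears to avoid the exception):

```yaml
# Equivalent configuration-file form of the workaround
# shown commented out in the repro above:
table.exec.sort.default-limit: 10000
```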
The following is the exception thrown:
```
org.apache.flink.table.api.TableException: Cannot generate a valid execution plan for the given query:

FlinkLogicalCalc(select=[EXPR$0 AS cc_mkt_class])
+- FlinkLogicalAggregate(group=[{1}], EXPR$0=[MIN($2)])
   +- FlinkLogicalSort(sort0=[$0], dir0=[ASC])
      +- FlinkLogicalTableSourceScan(table=[[*anonymous_python-input-format$1*]], fields=[cc_city, cc_tax_percentage, cc_mkt_class, dummy])

This exception indicates that the query uses an unsupported SQL feature.
Please check the documentation for the set of currently supported SQL features.
    at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:70)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59)
    at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
    at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
    at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
    at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
    at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:55)
    at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeTree(BatchCommonSubGraphBasedOptimizer.scala:93)
    at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeBlock(BatchCommonSubGraphBasedOptimizer.scala:58)
    at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.$anonfun$doOptimize$1(BatchCommonSubGraphBasedOptimizer.scala:45)
    at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.$anonfun$doOptimize$1$adapted(BatchCommonSubGraphBasedOptimizer.scala:45)
    at scala.collection.immutable.List.foreach(List.scala:431)
    at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.doOptimize(BatchCommonSubGraphBasedOptimizer.scala:45)
    at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:87)
    at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:390)
    at org.apache.flink.table.planner.delegation.PlannerBase.getExplainGraphs(PlannerBase.scala:625)
    at org.apache.flink.table.planner.delegation.BatchPlanner.explain(BatchPlanner.scala:149)
    at org.apache.flink.table.planner.delegation.BatchPlanner.explain(BatchPlanner.scala:49)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.explainInternal(TableEnvironmentImpl.java:753)
    at org.apache.flink.table.api.internal.TableImpl.explain(TableImpl.java:482)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:569)
    at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
    at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
    at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
    at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: org.apache.calcite.plan.RelOptPlanner$CannotPlanException: There are not enough rules to produce a node with desired properties: convention=BATCH_PHYSICAL, FlinkRelDistributionTraitDef=any, sort=[].
Missing conversions are FlinkLogicalSort[convention: LOGICAL -> BATCH_PHYSICAL, FlinkRelDistributionTraitDef: any -> hash[1]false, sort: [0] -> [1 ASC-nulls-first]], FlinkLogicalSort[convention: LOGICAL -> BATCH_PHYSICAL, FlinkRelDistributionTraitDef: any -> hash[1]true, sort: [0] -> [1 ASC-nulls-first]], FlinkLogicalSort[convention: LOGICAL -> BATCH_PHYSICAL, sort: [0] -> [1 ASC-nulls-first]]
There are 3 empty subsets:
Empty subset 0: rel#240:RelSubset#5.BATCH_PHYSICAL.hash[1]false.[1 ASC-nulls-first], the relevant part of the original plan is as follows
201:FlinkLogicalSort(sort0=[$0], dir0=[ASC])
  178:FlinkLogicalTableSourceScan(subset=[rel#200:RelSubset#4.LOGICAL.any.[]], table=[[*anonymous_python-input-format$1*]], fields=[cc_city, cc_tax_percentage, cc_mkt_class, dummy])
Empty subset 1: rel#215:RelSubset#5.BATCH_PHYSICAL.any.[1 ASC-nulls-first], the relevant part of the original plan is as follows
201:FlinkLogicalSort(sort0=[$0], dir0=[ASC])
  178:FlinkLogicalTableSourceScan(subset=[rel#200:RelSubset#4.LOGICAL.any.[]], table=[[*anonymous_python-input-format$1*]], fields=[cc_city, cc_tax_percentage, cc_mkt_class, dummy])
... several more lines
```
I don't understand why this happens. Furthermore, the program runs fine if I apply min on the "extra" column instead of "cc_mkt_class", which suggests the issue only arises when performing a min on a string column. If this is not a bug, can someone explain this behaviour?
--
This message was sent by Atlassian Jira
(v8.20.10#820010)