Ahmad Humayun created FLINK-38366:
-------------------------------------

             Summary: Pyflink planner throws exception in batch mode when there is no limit on order_by()
                 Key: FLINK-38366
                 URL: https://issues.apache.org/jira/browse/FLINK-38366
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Planner
    Affects Versions: 2.0.0
         Environment: Pyflink 2.0.0
            Reporter: Ahmad Humayun


The following is a minimized program that causes an exception in the planner:

```
from pyflink.table import EnvironmentSettings, TableEnvironment, Table
from pyflink.common import Configuration
from pyflink.table.expressions import col

cfg = Configuration()

# The program runs fine if I set any default limit
# cfg.set_string("table.exec.sort.default-limit", "10000")

settings = (
    EnvironmentSettings.new_instance()
    .in_batch_mode()
    .with_configuration(cfg)
    .build()
)

table_env = TableEnvironment.create(settings)

data = [
    ("Los Angeles", 9.50, "Premium", 1),
    ("Houston", 8.25, "Basic", 2),
    ("New York", 10.00, "Standard", 1),
    ("Chicago", 8.25, "Premium", 1)
]

source_table = table_env.from_elements(
    data,
    schema=["cc_city", "cc_tax_percentage", "cc_mkt_class", "extra"]
)

ordered = source_table.order_by(col('cc_city'))
result = ordered.group_by(col('cc_tax_percentage')).select(
    col('cc_mkt_class').min.alias('cc_mkt_class')
)

print(result.to_pandas())
```

The following is the exception thrown:

```
org.apache.flink.table.api.TableException: Cannot generate a valid execution plan for the given query:

FlinkLogicalCalc(select=[EXPR$0 AS cc_mkt_class])
+- FlinkLogicalAggregate(group=[{1}], EXPR$0=[MIN($2)])
   +- FlinkLogicalSort(sort0=[$0], dir0=[ASC])
      +- FlinkLogicalTableSourceScan(table=[[*anonymous_python-input-format$1*]], fields=[cc_city, cc_tax_percentage, cc_mkt_class, dummy])

This exception indicates that the query uses an unsupported SQL feature.
Please check the documentation for the set of currently supported SQL features.
	at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:70)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59)
	at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
	at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
	at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
	at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:55)
	at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeTree(BatchCommonSubGraphBasedOptimizer.scala:93)
	at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeBlock(BatchCommonSubGraphBasedOptimizer.scala:58)
	at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.$anonfun$doOptimize$1(BatchCommonSubGraphBasedOptimizer.scala:45)
	at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.$anonfun$doOptimize$1$adapted(BatchCommonSubGraphBasedOptimizer.scala:45)
	at scala.collection.immutable.List.foreach(List.scala:431)
	at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.doOptimize(BatchCommonSubGraphBasedOptimizer.scala:45)
	at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:87)
	at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:390)
	at org.apache.flink.table.planner.delegation.PlannerBase.getExplainGraphs(PlannerBase.scala:625)
	at org.apache.flink.table.planner.delegation.BatchPlanner.explain(BatchPlanner.scala:149)
	at org.apache.flink.table.planner.delegation.BatchPlanner.explain(BatchPlanner.scala:49)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.explainInternal(TableEnvironmentImpl.java:753)
	at org.apache.flink.table.api.internal.TableImpl.explain(TableImpl.java:482)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:569)
	at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
	at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
	at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: org.apache.calcite.plan.RelOptPlanner$CannotPlanException: There are not enough rules to produce a node with desired properties: convention=BATCH_PHYSICAL, FlinkRelDistributionTraitDef=any, sort=[].
Missing conversions are FlinkLogicalSort[convention: LOGICAL -> BATCH_PHYSICAL, FlinkRelDistributionTraitDef: any -> hash[1]false, sort: [0] -> [1 ASC-nulls-first]], FlinkLogicalSort[convention: LOGICAL -> BATCH_PHYSICAL, FlinkRelDistributionTraitDef: any -> hash[1]true, sort: [0] -> [1 ASC-nulls-first]], FlinkLogicalSort[convention: LOGICAL -> BATCH_PHYSICAL, sort: [0] -> [1 ASC-nulls-first]]

There are 3 empty subsets:

Empty subset 0: rel#240:RelSubset#5.BATCH_PHYSICAL.hash[1]false.[1 ASC-nulls-first], the relevant part of the original plan is as follows
201:FlinkLogicalSort(sort0=[$0], dir0=[ASC])
 178:FlinkLogicalTableSourceScan(subset=[rel#200:RelSubset#4.LOGICAL.any.[]], table=[[*anonymous_python-input-format$1*]], fields=[cc_city, cc_tax_percentage, cc_mkt_class, dummy])

Empty subset 1: rel#215:RelSubset#5.BATCH_PHYSICAL.any.[1 ASC-nulls-first], the relevant part of the original plan is as follows
201:FlinkLogicalSort(sort0=[$0], dir0=[ASC])
 178:FlinkLogicalTableSourceScan(subset=[rel#200:RelSubset#4.LOGICAL.any.[]], table=[[*anonymous_python-input-format$1*]], fields=[cc_city, cc_tax_percentage, cc_mkt_class, dummy])
... several more lines
```

I don't understand why this happens. Furthermore, the program runs fine if I apply min to the "extra" column instead of "cc_mkt_class", which suggests the issue only arises when performing a min on a string column.

If this is not a bug, can someone explain this behaviour?
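For reference, the workarounds I found can be sketched as follows. This is only a sketch against the program above: the `10000` bound is arbitrary, and the `fetch()` variant is my own untested guess at bounding the sort explicitly through the Table API rather than through configuration.

```python
# Workaround 1: set a default limit for sorts in batch mode
# (any value avoids the exception; 10000 is arbitrary)
cfg.set_string("table.exec.sort.default-limit", "10000")

# Workaround 2 (untested guess): bound the sort explicitly with fetch()
ordered = source_table.order_by(col('cc_city')).fetch(10000)
```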



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
