Ahmad Humayun created FLINK-38637:
-------------------------------------

             Summary: explain() crashes when sort default limit is set
                 Key: FLINK-38637
                 URL: https://issues.apache.org/jira/browse/FLINK-38637
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Planner
    Affects Versions: 2.0.0
            Reporter: Ahmad Humayun


The following is a standalone example that demonstrates the error:

{code:python}
from pyflink.table import EnvironmentSettings, TableEnvironment
from pyflink.table.expressions import col
from pyflink.table.udf import udf
from pyflink.table.types import DataTypes
from pyflink.common import Configuration


cfg = Configuration()


cfg.set_string("table.exec.sort.default-limit", "2")




env_settings = (
   EnvironmentSettings.new_instance()
   .in_batch_mode()
   .with_configuration(cfg)
   .build()
)


table_env = TableEnvironment.create(env_settings)


# Generate minimal data
data = [
   (1, 10, 100.50),
   (2, 15, 200.75),
   (3, 20, 300.25),
   (4, 5, 400.00),
   (5, 18, 500.50),
   (6, 12, 600.75),
   (7, 25, 700.25)
]


schema = [
   "wr_item_sk",
   "wr_returning_hdemo_sk",
   "wr_reversed_charge"
]


# Create source table
source_table = table_env.from_elements(data, schema=schema)
table_env.create_temporary_view("web_returns", source_table)


# Query that causes the crash
autonode_7 = table_env.from_path("web_returns")
autonode_6 = autonode_7.filter(col('wr_returning_hdemo_sk') <= 16)
autonode_5 = autonode_6.select(col('wr_reversed_charge'))
autonode_4 = autonode_5.distinct()
autonode_3 = autonode_4.order_by(col('wr_reversed_charge'))
autonode_2 = autonode_3.group_by(col('wr_reversed_charge')).select(
   col('wr_reversed_charge').max.alias('wr_reversed_charge')
)
autonode_1 = autonode_2.limit(57)
sink = autonode_1.group_by(col('wr_reversed_charge')).select(
   col('wr_reversed_charge').sum.alias('wr_reversed_charge')
)


print(sink.explain())

{code}


The following is the error output:

{code}
py4j.protocol.Py4JJavaError: An error occurred while calling o85.explain.
: java.lang.RuntimeException: Error while applying rule 
SortProjectTransposeRule, args 
[rel#614:LogicalSort.NONE.any.[](input=RelSubset#582,fetch=57), 
rel#610:LogicalProject.NONE.any.[1](input=RelSubset#501,inputs=0,exprs=[$0])]
        at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:250)
        at 
org.apache.calcite.plan.volcano.IterativeRuleDriver.drive(IterativeRuleDriver.java:59)
        at 
org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:523)
        at 
org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:317)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:62)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59)
        at 
scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
        at 
scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
        at scala.collection.Iterator.foreach(Iterator.scala:943)
        at scala.collection.Iterator.foreach$(Iterator.scala:943)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
        at scala.collection.IterableLike.foreach(IterableLike.scala:74)
        at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
        at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
        at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
        at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:55)
        at 
org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeTree(BatchCommonSubGraphBasedOptimizer.scala:93)
        at 
org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeBlock(BatchCommonSubGraphBasedOptimizer.scala:58)
        at 
org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.$anonfun$doOptimize$1(BatchCommonSubGraphBasedOptimizer.scala:45)
        at 
org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.$anonfun$doOptimize$1$adapted(BatchCommonSubGraphBasedOptimizer.scala:45)
        at scala.collection.immutable.List.foreach(List.scala:431)
        at 
org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.doOptimize(BatchCommonSubGraphBasedOptimizer.scala:45)
        at 
org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:87)
        at 
org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:390)
        at 
org.apache.flink.table.planner.delegation.PlannerBase.getExplainGraphs(PlannerBase.scala:625)
        at 
org.apache.flink.table.planner.delegation.BatchPlanner.explain(BatchPlanner.scala:149)

{code}






--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to