This is an automated email from the ASF dual-hosted git repository.

comphead pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new eef5f28a0 chore: check `missingInput` for Comet plan nodes (#2795)
eef5f28a0 is described below

commit eef5f28a0727d9aef043fa2b87d6747ff68b827a
Author: Oleks V <[email protected]>
AuthorDate: Tue Nov 18 17:05:18 2025 -0800

    chore: check `missingInput` for Comet plan nodes (#2795)
    
    * chore: check `missingInput` for Comet plan nodes
    
    ---------
    
    Co-authored-by: Martin Grigorov <[email protected]>
---
 .../sql/comet/CometTakeOrderedAndProjectExec.scala |  7 ++++++-
 .../q79.native_iceberg_compat/simplified.txt       |  2 +-
 .../q79/simplified.txt                             |  2 +-
 .../q84.native_iceberg_compat/simplified.txt       |  2 +-
 .../q84/simplified.txt                             |  2 +-
 .../q84.native_iceberg_compat/simplified.txt       |  2 +-
 .../q84/simplified.txt                             |  2 +-
 .../q79.native_iceberg_compat/simplified.txt       |  2 +-
 .../approved-plans-v1_4/q79/simplified.txt         |  2 +-
 .../q84.native_iceberg_compat/simplified.txt       |  2 +-
 .../approved-plans-v1_4/q84/simplified.txt         |  2 +-
 .../q6.native_iceberg_compat/simplified.txt        |  2 +-
 .../approved-plans-v2_7-spark3_5/q6/simplified.txt |  2 +-
 .../q6.native_iceberg_compat/simplified.txt        |  2 +-
 .../approved-plans-v2_7/q6/simplified.txt          |  2 +-
 .../scala/org/apache/spark/sql/CometTestBase.scala | 24 ++++++++++++++++++++++
 16 files changed, 44 insertions(+), 15 deletions(-)

diff --git 
a/spark/src/main/scala/org/apache/spark/sql/comet/CometTakeOrderedAndProjectExec.scala
 
b/spark/src/main/scala/org/apache/spark/sql/comet/CometTakeOrderedAndProjectExec.scala
index 65ae84d21..2517c19f2 100644
--- 
a/spark/src/main/scala/org/apache/spark/sql/comet/CometTakeOrderedAndProjectExec.scala
+++ 
b/spark/src/main/scala/org/apache/spark/sql/comet/CometTakeOrderedAndProjectExec.scala
@@ -22,7 +22,7 @@ package org.apache.spark.sql.comet
 import org.apache.spark.TaskContext
 import org.apache.spark.rdd.{ParallelCollectionRDD, RDD}
 import org.apache.spark.serializer.Serializer
-import org.apache.spark.sql.catalyst.expressions.{Attribute, NamedExpression, 
SortOrder}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, 
NamedExpression, SortOrder}
 import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.comet.execution.shuffle.{CometShuffledBatchRDD, 
CometShuffleExchangeExec}
 import org.apache.spark.sql.execution.{SparkPlan, TakeOrderedAndProjectExec, 
UnaryExecNode, UnsafeRowSerializer}
@@ -98,10 +98,15 @@ case class CometTakeOrderedAndProjectExec(
     child: SparkPlan)
     extends CometExec
     with UnaryExecNode {
+
+  override def producedAttributes: AttributeSet = outputSet ++ 
AttributeSet(projectList) // NOTE(review): presumably keeps the projected attributes out of `missingInput` - confirm
+
   private lazy val writeMetrics =
     SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext)
+
   private lazy val readMetrics =
     SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext)
+
   override lazy val metrics: Map[String, SQLMetric] = Map(
     "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"),
     "numPartitions" -> SQLMetrics.createMetric(
diff --git 
a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q79.native_iceberg_compat/simplified.txt
 
b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q79.native_iceberg_compat/simplified.txt
index 8e244d6c9..5b252a906 100644
--- 
a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q79.native_iceberg_compat/simplified.txt
+++ 
b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q79.native_iceberg_compat/simplified.txt
@@ -1,7 +1,7 @@
 WholeStageCodegen (1)
   CometColumnarToRow
     InputAdapter
-      CometTakeOrderedAndProject [c_last_name,c_first_name,substr(s_city, 1, 
30),ss_ticket_number,amt,profit,s_city]
+      CometTakeOrderedAndProject [s_city] 
[c_last_name,c_first_name,substr(s_city, 1, 30),ss_ticket_number,amt,profit]
         CometProject [c_last_name,c_first_name,substr(s_city, 1, 
30),ss_ticket_number,amt,profit,s_city]
           CometBroadcastHashJoin 
[ss_ticket_number,ss_customer_sk,s_city,amt,profit,c_customer_sk,c_first_name,c_last_name]
             CometHashAggregate [ss_addr_sk,sum,sum] 
[ss_ticket_number,ss_customer_sk,s_city,amt,profit,sum(UnscaledValue(ss_coupon_amt)),sum(UnscaledValue(ss_net_profit))]
diff --git 
a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q79/simplified.txt
 
b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q79/simplified.txt
index 8e244d6c9..5b252a906 100644
--- 
a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q79/simplified.txt
+++ 
b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q79/simplified.txt
@@ -1,7 +1,7 @@
 WholeStageCodegen (1)
   CometColumnarToRow
     InputAdapter
-      CometTakeOrderedAndProject [c_last_name,c_first_name,substr(s_city, 1, 
30),ss_ticket_number,amt,profit,s_city]
+      CometTakeOrderedAndProject [s_city] 
[c_last_name,c_first_name,substr(s_city, 1, 30),ss_ticket_number,amt,profit]
         CometProject [c_last_name,c_first_name,substr(s_city, 1, 
30),ss_ticket_number,amt,profit,s_city]
           CometBroadcastHashJoin 
[ss_ticket_number,ss_customer_sk,s_city,amt,profit,c_customer_sk,c_first_name,c_last_name]
             CometHashAggregate [ss_addr_sk,sum,sum] 
[ss_ticket_number,ss_customer_sk,s_city,amt,profit,sum(UnscaledValue(ss_coupon_amt)),sum(UnscaledValue(ss_net_profit))]
diff --git 
a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q84.native_iceberg_compat/simplified.txt
 
b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q84.native_iceberg_compat/simplified.txt
index fd8d1864e..e43557c27 100644
--- 
a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q84.native_iceberg_compat/simplified.txt
+++ 
b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q84.native_iceberg_compat/simplified.txt
@@ -1,7 +1,7 @@
 WholeStageCodegen (1)
   CometColumnarToRow
     InputAdapter
-      CometTakeOrderedAndProject [customer_id,customername,c_customer_id]
+      CometTakeOrderedAndProject [c_customer_id] [customer_id,customername]
         CometProject [c_last_name,c_first_name] 
[customer_id,customername,c_customer_id]
           CometBroadcastHashJoin 
[c_customer_id,c_first_name,c_last_name,cd_demo_sk,sr_cdemo_sk]
             CometBroadcastExchange 
[c_customer_id,c_first_name,c_last_name,cd_demo_sk] #1
diff --git 
a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q84/simplified.txt
 
b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q84/simplified.txt
index fd8d1864e..e43557c27 100644
--- 
a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q84/simplified.txt
+++ 
b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q84/simplified.txt
@@ -1,7 +1,7 @@
 WholeStageCodegen (1)
   CometColumnarToRow
     InputAdapter
-      CometTakeOrderedAndProject [customer_id,customername,c_customer_id]
+      CometTakeOrderedAndProject [c_customer_id] [customer_id,customername]
         CometProject [c_last_name,c_first_name] 
[customer_id,customername,c_customer_id]
           CometBroadcastHashJoin 
[c_customer_id,c_first_name,c_last_name,cd_demo_sk,sr_cdemo_sk]
             CometBroadcastExchange 
[c_customer_id,c_first_name,c_last_name,cd_demo_sk] #1
diff --git 
a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q84.native_iceberg_compat/simplified.txt
 
b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q84.native_iceberg_compat/simplified.txt
index fd8d1864e..e43557c27 100644
--- 
a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q84.native_iceberg_compat/simplified.txt
+++ 
b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q84.native_iceberg_compat/simplified.txt
@@ -1,7 +1,7 @@
 WholeStageCodegen (1)
   CometColumnarToRow
     InputAdapter
-      CometTakeOrderedAndProject [customer_id,customername,c_customer_id]
+      CometTakeOrderedAndProject [c_customer_id] [customer_id,customername]
         CometProject [c_last_name,c_first_name] 
[customer_id,customername,c_customer_id]
           CometBroadcastHashJoin 
[c_customer_id,c_first_name,c_last_name,cd_demo_sk,sr_cdemo_sk]
             CometBroadcastExchange 
[c_customer_id,c_first_name,c_last_name,cd_demo_sk] #1
diff --git 
a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q84/simplified.txt
 
b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q84/simplified.txt
index fd8d1864e..e43557c27 100644
--- 
a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q84/simplified.txt
+++ 
b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q84/simplified.txt
@@ -1,7 +1,7 @@
 WholeStageCodegen (1)
   CometColumnarToRow
     InputAdapter
-      CometTakeOrderedAndProject [customer_id,customername,c_customer_id]
+      CometTakeOrderedAndProject [c_customer_id] [customer_id,customername]
         CometProject [c_last_name,c_first_name] 
[customer_id,customername,c_customer_id]
           CometBroadcastHashJoin 
[c_customer_id,c_first_name,c_last_name,cd_demo_sk,sr_cdemo_sk]
             CometBroadcastExchange 
[c_customer_id,c_first_name,c_last_name,cd_demo_sk] #1
diff --git 
a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q79.native_iceberg_compat/simplified.txt
 
b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q79.native_iceberg_compat/simplified.txt
index 8e244d6c9..5b252a906 100644
--- 
a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q79.native_iceberg_compat/simplified.txt
+++ 
b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q79.native_iceberg_compat/simplified.txt
@@ -1,7 +1,7 @@
 WholeStageCodegen (1)
   CometColumnarToRow
     InputAdapter
-      CometTakeOrderedAndProject [c_last_name,c_first_name,substr(s_city, 1, 
30),ss_ticket_number,amt,profit,s_city]
+      CometTakeOrderedAndProject [s_city] 
[c_last_name,c_first_name,substr(s_city, 1, 30),ss_ticket_number,amt,profit]
         CometProject [c_last_name,c_first_name,substr(s_city, 1, 
30),ss_ticket_number,amt,profit,s_city]
           CometBroadcastHashJoin 
[ss_ticket_number,ss_customer_sk,s_city,amt,profit,c_customer_sk,c_first_name,c_last_name]
             CometHashAggregate [ss_addr_sk,sum,sum] 
[ss_ticket_number,ss_customer_sk,s_city,amt,profit,sum(UnscaledValue(ss_coupon_amt)),sum(UnscaledValue(ss_net_profit))]
diff --git 
a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q79/simplified.txt
 
b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q79/simplified.txt
index 8e244d6c9..5b252a906 100644
--- 
a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q79/simplified.txt
+++ 
b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q79/simplified.txt
@@ -1,7 +1,7 @@
 WholeStageCodegen (1)
   CometColumnarToRow
     InputAdapter
-      CometTakeOrderedAndProject [c_last_name,c_first_name,substr(s_city, 1, 
30),ss_ticket_number,amt,profit,s_city]
+      CometTakeOrderedAndProject [s_city] 
[c_last_name,c_first_name,substr(s_city, 1, 30),ss_ticket_number,amt,profit]
         CometProject [c_last_name,c_first_name,substr(s_city, 1, 
30),ss_ticket_number,amt,profit,s_city]
           CometBroadcastHashJoin 
[ss_ticket_number,ss_customer_sk,s_city,amt,profit,c_customer_sk,c_first_name,c_last_name]
             CometHashAggregate [ss_addr_sk,sum,sum] 
[ss_ticket_number,ss_customer_sk,s_city,amt,profit,sum(UnscaledValue(ss_coupon_amt)),sum(UnscaledValue(ss_net_profit))]
diff --git 
a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q84.native_iceberg_compat/simplified.txt
 
b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q84.native_iceberg_compat/simplified.txt
index fd8d1864e..e43557c27 100644
--- 
a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q84.native_iceberg_compat/simplified.txt
+++ 
b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q84.native_iceberg_compat/simplified.txt
@@ -1,7 +1,7 @@
 WholeStageCodegen (1)
   CometColumnarToRow
     InputAdapter
-      CometTakeOrderedAndProject [customer_id,customername,c_customer_id]
+      CometTakeOrderedAndProject [c_customer_id] [customer_id,customername]
         CometProject [c_last_name,c_first_name] 
[customer_id,customername,c_customer_id]
           CometBroadcastHashJoin 
[c_customer_id,c_first_name,c_last_name,cd_demo_sk,sr_cdemo_sk]
             CometBroadcastExchange 
[c_customer_id,c_first_name,c_last_name,cd_demo_sk] #1
diff --git 
a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q84/simplified.txt
 
b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q84/simplified.txt
index fd8d1864e..e43557c27 100644
--- 
a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q84/simplified.txt
+++ 
b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q84/simplified.txt
@@ -1,7 +1,7 @@
 WholeStageCodegen (1)
   CometColumnarToRow
     InputAdapter
-      CometTakeOrderedAndProject [customer_id,customername,c_customer_id]
+      CometTakeOrderedAndProject [c_customer_id] [customer_id,customername]
         CometProject [c_last_name,c_first_name] 
[customer_id,customername,c_customer_id]
           CometBroadcastHashJoin 
[c_customer_id,c_first_name,c_last_name,cd_demo_sk,sr_cdemo_sk]
             CometBroadcastExchange 
[c_customer_id,c_first_name,c_last_name,cd_demo_sk] #1
diff --git 
a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7-spark3_5/q6.native_iceberg_compat/simplified.txt
 
b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7-spark3_5/q6.native_iceberg_compat/simplified.txt
index cbaf71ab0..2978e30c1 100644
--- 
a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7-spark3_5/q6.native_iceberg_compat/simplified.txt
+++ 
b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7-spark3_5/q6.native_iceberg_compat/simplified.txt
@@ -1,7 +1,7 @@
 WholeStageCodegen (1)
   CometColumnarToRow
     InputAdapter
-      CometTakeOrderedAndProject [state,cnt,ca_state]
+      CometTakeOrderedAndProject [ca_state] [state,cnt]
         CometFilter [state,cnt,ca_state]
           CometHashAggregate [count] [state,cnt,ca_state,count(1)]
             CometExchange [ca_state] #1
diff --git 
a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7-spark3_5/q6/simplified.txt
 
b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7-spark3_5/q6/simplified.txt
index cbaf71ab0..2978e30c1 100644
--- 
a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7-spark3_5/q6/simplified.txt
+++ 
b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7-spark3_5/q6/simplified.txt
@@ -1,7 +1,7 @@
 WholeStageCodegen (1)
   CometColumnarToRow
     InputAdapter
-      CometTakeOrderedAndProject [state,cnt,ca_state]
+      CometTakeOrderedAndProject [ca_state] [state,cnt]
         CometFilter [state,cnt,ca_state]
           CometHashAggregate [count] [state,cnt,ca_state,count(1)]
             CometExchange [ca_state] #1
diff --git 
a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6.native_iceberg_compat/simplified.txt
 
b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6.native_iceberg_compat/simplified.txt
index cbaf71ab0..2978e30c1 100644
--- 
a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6.native_iceberg_compat/simplified.txt
+++ 
b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6.native_iceberg_compat/simplified.txt
@@ -1,7 +1,7 @@
 WholeStageCodegen (1)
   CometColumnarToRow
     InputAdapter
-      CometTakeOrderedAndProject [state,cnt,ca_state]
+      CometTakeOrderedAndProject [ca_state] [state,cnt]
         CometFilter [state,cnt,ca_state]
           CometHashAggregate [count] [state,cnt,ca_state,count(1)]
             CometExchange [ca_state] #1
diff --git 
a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6/simplified.txt
 
b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6/simplified.txt
index cbaf71ab0..2978e30c1 100644
--- 
a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6/simplified.txt
+++ 
b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6/simplified.txt
@@ -1,7 +1,7 @@
 WholeStageCodegen (1)
   CometColumnarToRow
     InputAdapter
-      CometTakeOrderedAndProject [state,cnt,ca_state]
+      CometTakeOrderedAndProject [ca_state] [state,cnt]
         CometFilter [state,cnt,ca_state]
           CometHashAggregate [count] [state,cnt,ca_state,count(1)]
             CometExchange [ca_state] #1
diff --git a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala 
b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
index 591f75acb..3f7ce5b4f 100644
--- a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
@@ -385,6 +385,8 @@ abstract class CometTestBase
             s"plan: ${new ExtendedExplainInfo().generateExtendedInfo(plan)}")
       case _ =>
     }
+
+    checkPlanNotMissingInput(plan)
   }
 
   protected def findFirstNonCometOperator(
@@ -406,6 +408,28 @@ abstract class CometTestBase
     None
   }
 
+  // Asserts that no Comet node in `plan` (checked recursively) has missing inputs;
+  // nodes without children are exempt. Per the original note, such nodes are shown
+  // in the plan with a leading exclamation mark, for example: !CometWindowExec
+  protected def checkPlanNotMissingInput(plan: SparkPlan): Unit = {
+    def hasMissingInput(node: SparkPlan): Boolean = {
+      node.missingInput.nonEmpty && node.children.nonEmpty
+    }
+
+    val isCometNode = plan.nodeName.startsWith("Comet")
+
+    if (isCometNode && hasMissingInput(plan)) {
+      assert(
+        false,
+        s"Plan node `${plan.nodeName}` has invalid missingInput: 
${plan.missingInput}")
+    }
+
+    // This node passed the check (or is not a Comet node): recurse into its children
+    plan.children.foreach { child =>
+      checkPlanNotMissingInput(child)
+    }
+  }
+
   private def checkPlanContains(plan: SparkPlan, includePlans: Class[_]*): 
Unit = {
     includePlans.foreach { case planClass =>
       if (plan.find(op => planClass.isAssignableFrom(op.getClass)).isEmpty) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to