peter-toth commented on a change in pull request #28885:
URL: https://github.com/apache/spark/pull/28885#discussion_r459273107



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/ReuseExchangeAndSubquerySuite.scala
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.execution.exchange.{Exchange, ReusedExchangeExec}
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.test.SharedSparkSession
+
+class ReuseExchangeAndSubquerySuite extends SparkPlanTest with SharedSparkSession {
+
+  val tableFormat: String = "parquet"
+
+  test("SPARK-32041: No reuse interference inside ReuseExchange") {
+    withTable("df1", "df2") {
+      spark.range(100)
+        .select(col("id"), col("id").as("k"))
+        .write
+        .partitionBy("k")
+        .format(tableFormat)
+        .mode("overwrite")
+        .saveAsTable("df1")
+
+      spark.range(10)
+        .select(col("id"), col("id").as("k"))
+        .write
+        .format(tableFormat)
+        .mode("overwrite")
+        .saveAsTable("df2")
+
+      val df = sql(
+        """
+          |WITH t AS (
+          |  SELECT df1.id, df2.k
+          |  FROM df1 JOIN df2 ON df1.k = df2.k
+          |  WHERE df2.id < 2
+          |)
+          |SELECT * FROM t AS a JOIN t AS b ON a.id = b.id
+          |""".stripMargin)
+
+      val plan = df.queryExecution.executedPlan
+
+      val exchangeIds = plan.collectWithSubqueries { case e: Exchange => e.id }
+      val reusedExchangeIds = plan.collectWithSubqueries {
+        case re: ReusedExchangeExec => re.child.id
+      }
+
+      assert(reusedExchangeIds.forall(exchangeIds.contains(_)),
+        "ReusedExchangeExec should reuse an existing exchange")

Review comment:
   I don't know how to categorize this issue. Currently, without this PR, the
query runs and returns the expected result, but reuse doesn't happen, which
can have a serious negative impact on performance. See the TPCDS Q14a benchmark
results: https://github.com/apache/spark/pull/28885#issuecomment-647097876
   So I don't know whether this PR is a bugfix or an improvement.
@dongjoon-hyun any idea?
   
   When a reuse node points to an exchange or subquery instance that doesn't
appear in the plan, that reuse node simply doesn't make any sense. The
plan will work and produce the expected results, but the point of reuse is lost.
   
   To answer your question: without the PR, the test case fails due to the
assert, but the query itself still runs.
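
   To illustrate what the assert checks, here is a minimal, self-contained sketch. It uses toy node classes (`Node`, `Exchange`, `ReusedExchange`, `Scan`, `Join`) and a helper `danglingReuseIds` that are hypothetical stand-ins invented for this example, not Spark's real classes. It only models the idea that every reuse node should point at an exchange that is actually present in the plan.

```scala
// Toy stand-ins for physical plan nodes. These are NOT Spark's classes;
// all names here are assumptions made for illustration only.
sealed trait Node { def children: Seq[Node] }

final case class Scan(table: String) extends Node {
  def children: Seq[Node] = Nil
}
final case class Exchange(id: Int, child: Node) extends Node {
  def children: Seq[Node] = Seq(child)
}
// A reuse node only references an exchange; it does not own it as a child.
final case class ReusedExchange(target: Exchange) extends Node {
  def children: Seq[Node] = Nil
}
final case class Join(left: Node, right: Node) extends Node {
  def children: Seq[Node] = Seq(left, right)
}

object ReuseCheck {
  // Pre-order traversal that collects every node matched by `pf`.
  def collect[A](plan: Node)(pf: PartialFunction[Node, A]): Seq[A] =
    pf.lift(plan).toSeq ++ plan.children.flatMap(collect(_)(pf))

  // Ids referenced by reuse nodes that match no exchange in the plan.
  def danglingReuseIds(plan: Node): Seq[Int] = {
    val exchangeIds = collect(plan) { case e: Exchange => e.id }.toSet
    collect(plan) { case r: ReusedExchange => r.target.id }
      .filterNot(exchangeIds)
  }
}

object ReuseCheckDemo {
  def main(args: Array[String]): Unit = {
    val original = Exchange(1, Scan("df1"))

    // Healthy plan: the reuse node points at an exchange that is in the plan.
    val healthy = Join(original, ReusedExchange(original))
    assert(ReuseCheck.danglingReuseIds(healthy).isEmpty)

    // Stale plan: the exchange was replaced by a new instance (id 2),
    // but the reuse node still references the old one (id 1).
    val stale = Join(Exchange(2, Scan("df1")), ReusedExchange(original))
    assert(ReuseCheck.danglingReuseIds(stale) == Seq(1))
  }
}
```

   In the stale case the toy plan is still well formed and could still be evaluated, which mirrors the situation described above: the result is unaffected, and only a check like this one notices that the reuse reference is dangling.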



