This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new d2e8c1cb60e3 [SPARK-48195][CORE] Save and reuse RDD/Broadcast created by SparkPlan
d2e8c1cb60e3 is described below

commit d2e8c1cb60e34a1c7e92374c07d682aa5ca79145
Author: Julek Sompolski <Juliusz Sompolski>
AuthorDate: Mon Sep 23 12:39:02 2024 +0900

    [SPARK-48195][CORE] Save and reuse RDD/Broadcast created by SparkPlan
    
    ### What changes were proposed in this pull request?
    
    Save the RDD created by `doExecute`, instead of creating a new one in `execute` each time.
    
    Currently, many types of SparkPlans already save the RDD they create. For example, shuffle simply saves `lazy val inputRDD: RDD[InternalRow] = child.execute()`. This creates inconsistencies when an action (e.g. a repeated `df.collect()`) is executed on a DataFrame twice:
    * The SparkPlan will be reused, since the same `df.queryExecution.executedPlan` will be used.
    * Any non-result stage will be reused, as the shuffle operators will just have their `inputRDD` reused.
    * However, for the result stage, `execute()` will call `doExecute()` again, and the logic of generating the actual execution RDD will be re-executed.
    
    This means that, for example, for the result stage WSCG codegen will generate and compile new code and create a new RDD out of it. Generation of execution RDDs is also often influenced by config: staying with the WSCG example, configs like `spark.sql.codegen.hugeMethodLimit` or `spark.sql.codegen.methodSplitThreshold` affect the generated code. The fact that upon re-execution these are evaluated anew for the result stage, but not for earlier stages, creates inconsistencies in which config changes are visible.
    
    By saving the result of `doExecute` and reusing the RDD in `execute`, we make sure that the work of creating that RDD is not duplicated, and it is more consistent that all RDDs of the plan are reused, just like the `executedPlan` itself.
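
    A simplified sketch of the new behavior in `SparkPlan` (the full change is in the diff below; the canonicalized-plan check and surrounding wrapping are elided here):

    ```scala
    // doExecute() now runs at most once per plan node; its result (or failure)
    // is cached in a LazyTry and reused by every subsequent execute() call.
    @transient private val executeRDD = LazyTry {
      doExecute()
    }

    final def execute(): RDD[InternalRow] = {
      // ... canonicalized-plan check elided ...
      executeRDD.get
    }
    ```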
    
    Note that while the results of earlier shuffle stages are also reused, the result stage still gets executed again, as its results are not saved and available for reuse in the BlockManager.
    
    We also add a `LazyTry` utility instead of using `lazy val`, to deal with the shortcomings of Scala's `lazy val`.
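
    Usage sketch of the `LazyTry` utility (API as added in `LazyTry.scala` below). Unlike `lazy val`, a failed initialization is cached and re-thrown instead of being retried, and only the wrapper itself is locked, not the enclosing object:

    ```scala
    import org.apache.spark.util.LazyTry

    // expensiveInit() is a hypothetical initializer used for illustration.
    val value = LazyTry { expensiveInit() }
    value.get  // runs the block on first access
    value.get  // returns the cached value, or re-throws the cached failure
    ```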
    
    ### Why are the changes needed?
    
    Resolves subtle inconsistencies coming from object reuse vs. recreating objects from scratch.
    
    ### Does this PR introduce _any_ user-facing change?
    
    There are subtle changes caused by the RDD being reused, e.g. in when a config change gets picked up. However, this makes things more consistent. Spark 4.0.0 might be a good candidate for making such a change.
    
    ### How was this patch tested?
    
    Existing SQL execution tests validate that the change in SparkPlan works.
    Tests were added for the new LazyTry utility.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Generated-by: Github Copilot
    (trivial code completion suggestions)
    
    Closes #48037 from juliuszsompolski/SPARK-48195-rdd.
    
    Lead-authored-by: Julek Sompolski <Juliusz Sompolski>
    Co-authored-by: Hyukjin Kwon <gurwls...@apache.org>
    Co-authored-by: Wenchen Fan <cloud0...@gmail.com>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 .../main/scala/org/apache/spark/util/LazyTry.scala |  70 ++++++++++
 .../main/scala/org/apache/spark/util/Utils.scala   |  80 +++++++++++
 .../scala/org/apache/spark/util/LazyTrySuite.scala | 151 +++++++++++++++++++++
 .../scala/org/apache/spark/util/UtilsSuite.scala   | 112 ++++++++++++++-
 .../spark/sql/execution/CollectMetricsExec.scala   |   5 +
 .../org/apache/spark/sql/execution/SparkPlan.scala |  21 ++-
 .../execution/columnar/InMemoryTableScanExec.scala |  82 ++++++-----
 .../execution/exchange/ShuffleExchangeExec.scala   |  13 +-
 8 files changed, 475 insertions(+), 59 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/LazyTry.scala b/core/src/main/scala/org/apache/spark/util/LazyTry.scala
new file mode 100644
index 000000000000..7edc08672c26
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/LazyTry.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import scala.util.Try
+
+/**
+ * Wrapper utility for a lazy val, with two differences compared to scala 
behavior:
+ *
+ * 1. Non-retrying in case of failure. This wrapper stores the exception in a 
Try, and will re-throw
+ *    it on the access to `get`.
+ *    In scala, when a `lazy val` field initialization throws an exception, 
the field remains
+ *    uninitialized, and initialization will be re-attempted on the next 
access. This also can lead
+ *    to performance issues, needlessly computing something towards a failure, 
and also can lead to
+ *    duplicated side effects.
+ *
+ * 2. Resolving locking issues.
+ *    In scala, when a `lazy val` field is initialized, it grabs the 
synchronized lock on the
+ *    enclosing object instance. This can lead both to performance issues, and 
deadlocks.
+ *    For example:
+ *     a) Thread 1 entered a synchronized method, grabbing a coarse lock on 
the parent object.
+ *     b) Thread 2 get spawned off, and tries to initialize a lazy value on 
the same parent object
+ *        This causes scala to also try to grab a lock on the parent object.
+ *     c) If thread 1 waits for thread 2 to join, a deadlock occurs.
+ *    This wrapper will only grab a lock on the wrapper itself, and not the 
parent object.
+ *
+ * @param initialize The block of code to initialize the lazy value.
+ * @tparam T type of the lazy value.
+ */
+private[spark] class LazyTry[T](initialize: => T) extends Serializable {
+  private lazy val tryT: Try[T] = Utils.doTryWithCallerStacktrace { initialize 
}
+
+  /**
+   * Get the lazy value. If the initialization block threw an exception, it 
will be re-thrown here.
+   * The exception will be re-thrown with the current caller's stacktrace.
+   * An exception with stack trace from when the exception was first thrown 
can be accessed with
+   * ```
+   * ex.getSuppressed.find { e =>
+   *   e.getMessage == 
org.apache.spark.util.Utils.TRY_WITH_CALLER_STACKTRACE_FULL_STACKTRACE
+   * }
+   * ```
+   */
+  def get: T = Utils.getTryWithCallerStacktrace(tryT)
+}
+
+private[spark] object LazyTry {
+  /**
+   * Create a new LazyTry instance.
+   *
+   * @param initialize The block of code to initialize the lazy value.
+   * @tparam T type of the lazy value.
+   * @return a new LazyTry instance.
+   */
+  def apply[T](initialize: => T): LazyTry[T] = new LazyTry(initialize)
+}
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index d8392cd8043d..52213f36a2cd 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1351,6 +1351,86 @@ private[spark] object Utils
     }
   }
 
+  val TRY_WITH_CALLER_STACKTRACE_FULL_STACKTRACE =
+    "Full stacktrace of original doTryWithCallerStacktrace caller"
+
+  val TRY_WITH_CALLER_STACKTRACE_TRY_STACKTRACE =
+    "Stacktrace under doTryWithCallerStacktrace"
+
+  /**
+   * Use Try with stacktrace substitution for the caller retrieving the error.
+   *
+   * Normally in case of failure, the exception would have the stacktrace of 
the caller that
+   * originally called doTryWithCallerStacktrace. However, we want to replace 
the part above
+   * this function with the stacktrace of the caller who calls 
getTryWithCallerStacktrace.
+   * So here we save the part of the stacktrace below 
doTryWithCallerStacktrace, and
+   * getTryWithCallerStacktrace will stitch it with the new stack trace of the 
caller.
+   * The full original stack trace is kept in ex.getSuppressed.
+   *
+   * @param f Code block to be wrapped in Try
+   * @return Try with Success or Failure of the code block. Use with 
getTryWithCallerStacktrace.
+   */
+  def doTryWithCallerStacktrace[T](f: => T): Try[T] = {
+    val t = Try {
+      f
+    }
+    t match {
+      case Failure(ex) =>
+        // Note: we remove the common suffix instead of e.g. finding the call 
to this function, to
+        // account for recursive calls with multiple doTryWithCallerStacktrace 
on the stack trace.
+        val origStackTrace = ex.getStackTrace
+        val currentStackTrace = Thread.currentThread().getStackTrace
+        val commonSuffixLen = 
origStackTrace.reverse.zip(currentStackTrace.reverse).takeWhile {
+          case (exElem, currentElem) => exElem == currentElem
+        }.length
+        val belowEx = new Exception(TRY_WITH_CALLER_STACKTRACE_TRY_STACKTRACE)
+        belowEx.setStackTrace(origStackTrace.dropRight(commonSuffixLen))
+        ex.addSuppressed(belowEx)
+
+        // keep the full original stack trace in a suppressed exception.
+        val fullEx = new Exception(TRY_WITH_CALLER_STACKTRACE_FULL_STACKTRACE)
+        fullEx.setStackTrace(origStackTrace)
+        ex.addSuppressed(fullEx)
+      case Success(_) => // nothing
+    }
+    t
+  }
+
+  /**
+   * Retrieve the result of Try that was created by doTryWithCallerStacktrace.
+   *
+   * In case of failure, the resulting exception has a stack trace that 
combines the stack trace
+   * below the original doTryWithCallerStacktrace which triggered it, with the 
caller stack trace
+   * of the current caller of getTryWithCallerStacktrace.
+   *
+   * Full stack trace of the original doTryWithCallerStacktrace caller can be 
retrieved with
+   * ```
+   * ex.getSuppressed.find { e =>
+   * e.getMessage == Utils.TRY_WITH_CALLER_STACKTRACE_FULL_STACKTRACE
+   * }
+   * ```
+   *
+   *
+   * @param t Try from doTryWithCallerStacktrace
+   * @return Result of the Try or rethrows the failure exception with modified 
stacktrace.
+   */
+  def getTryWithCallerStacktrace[T](t: Try[T]): T = t match {
+    case Failure(ex) =>
+      val belowStacktrace = ex.getSuppressed.find { e =>
+        // added in doTryWithCallerStacktrace
+        e.getMessage == TRY_WITH_CALLER_STACKTRACE_TRY_STACKTRACE
+      }.getOrElse {
+        // If we don't have the expected stacktrace information, just rethrow
+        throw ex
+      }.getStackTrace
+      // We are modifying and throwing the original exception. It would be 
better if we could
+      // return a copy, but we can't easily clone it and preserve. If this is 
accessed from
+      // multiple threads that then look at the stack trace, this could break.
+      ex.setStackTrace(belowStacktrace ++ 
Thread.currentThread().getStackTrace.drop(1))
+      throw ex
+    case Success(s) => s
+  }
+
   // A regular expression to match classes of the internal Spark API's
   // that we want to skip when finding the call site of a method.
   private val SPARK_CORE_CLASS_REGEX =
diff --git a/core/src/test/scala/org/apache/spark/util/LazyTrySuite.scala b/core/src/test/scala/org/apache/spark/util/LazyTrySuite.scala
new file mode 100644
index 000000000000..79c07f8fbfea
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/util/LazyTrySuite.scala
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.util
+
+import org.apache.spark.SparkFunSuite
+
+class LazyTrySuite extends SparkFunSuite{
+  test("LazyTry should initialize only once") {
+    var count = 0
+    val lazyVal = LazyTry {
+      count += 1
+      count
+    }
+    assert(count == 0)
+    assert(lazyVal.get == 1)
+    assert(count == 1)
+    assert(lazyVal.get == 1)
+    assert(count == 1)
+  }
+
+  test("LazyTry should re-throw exceptions") {
+    val lazyVal = LazyTry {
+      throw new RuntimeException("test")
+    }
+    intercept[RuntimeException] {
+      lazyVal.get
+    }
+    intercept[RuntimeException] {
+      lazyVal.get
+    }
+  }
+
+  test("LazyTry should re-throw exceptions with current caller stack-trace") {
+    val fileName = Thread.currentThread().getStackTrace()(1).getFileName
+    val lineNo = Thread.currentThread().getStackTrace()(1).getLineNumber
+    val lazyVal = LazyTry {
+      throw new RuntimeException("test")
+    }
+
+    val e1 = intercept[RuntimeException] {
+      lazyVal.get // lineNo + 6
+    }
+    assert(e1.getStackTrace
+      .exists(elem => elem.getFileName == fileName && elem.getLineNumber == 
lineNo + 6))
+
+    val e2 = intercept[RuntimeException] {
+      lazyVal.get // lineNo + 12
+    }
+    assert(e2.getStackTrace
+      .exists(elem => elem.getFileName == fileName && elem.getLineNumber == 
lineNo + 12))
+  }
+
+  test("LazyTry does not lock containing object") {
+    class LazyContainer() {
+      @volatile var aSet = 0
+
+      val a: LazyTry[Int] = LazyTry {
+        aSet = 1
+        aSet
+      }
+
+      val b: LazyTry[Int] = LazyTry {
+        val t = new Thread(new Runnable {
+          override def run(): Unit = {
+            assert(a.get == 1)
+          }
+        })
+        t.start()
+        t.join()
+        aSet
+      }
+    }
+    val container = new LazyContainer()
+    // Nothing is lazy initialized yet
+    assert(container.aSet == 0)
+    // This will not deadlock, thread t will initialize a, and update aSet
+    assert(container.b.get == 1)
+    assert(container.aSet == 1)
+  }
+
+  // Scala lazy val tests are added to test for potential changes in the 
semantics of scala lazy val
+
+  test("Scala lazy val initializing multiple times on error") {
+    class LazyValError() {
+      var counter = 0
+      lazy val a = {
+        counter += 1
+        throw new RuntimeException("test")
+      }
+    }
+    val lazyValError = new LazyValError()
+    intercept[RuntimeException] {
+      lazyValError.a
+    }
+    assert(lazyValError.counter == 1)
+    intercept[RuntimeException] {
+      lazyValError.a
+    }
+    assert(lazyValError.counter == 2)
+  }
+
+  test("Scala lazy val locking containing object and deadlocking") {
+    // Note: this will change in scala 3, with different lazy vals not 
deadlocking with each other.
+    // 
https://docs.scala-lang.org/scala3/reference/changed-features/lazy-vals-init.html
+    class LazyValContainer() {
+      @volatile var aSet = 0
+      @volatile var t: Thread = _
+
+      lazy val a = {
+        aSet = 1
+        aSet
+      }
+
+      lazy val b = {
+        t = new Thread(new Runnable {
+          override def run(): Unit = {
+            assert(a == 1)
+          }
+        })
+        t.start()
+        t.join(1000)
+        aSet
+      }
+    }
+    val container = new LazyValContainer()
+    // Nothing is lazy initialized yet
+    assert(container.aSet == 0)
+    // This will deadlock, because b will take monitor on LazyValContainer, 
and then thread t
+    // will wait on that monitor, not able to initialize a.
+    // b will therefore see aSet == 0.
+    assert(container.b == 0)
+    // However, after b finishes initializing, the monitor will be released, 
and then thread t
+    // will finish initializing a, and set aSet to 1.
+    container.t.join()
+    assert(container.aSet == 1)
+  }
+}
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index 4fe6fcf17f49..a694e08def89 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -28,7 +28,7 @@ import java.util.concurrent.TimeUnit
 import java.util.zip.GZIPOutputStream
 
 import scala.collection.mutable.ListBuffer
-import scala.util.Random
+import scala.util.{Random, Try}
 
 import com.google.common.io.Files
 import org.apache.commons.io.IOUtils
@@ -1523,6 +1523,116 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties {
     conf.set(SERIALIZER, "org.apache.spark.serializer.JavaSerializer")
     assert(Utils.isPushBasedShuffleEnabled(conf, isDriver = true) === false)
   }
+
+
+  private def throwException(): String = {
+    throw new Exception("test")
+  }
+
+  private def callDoTry(): Try[String] = {
+    Utils.doTryWithCallerStacktrace {
+      throwException()
+    }
+  }
+
+  private def callGetTry(t: Try[String]): String = {
+    Utils.getTryWithCallerStacktrace(t)
+  }
+
+  private def callGetTryAgain(t: Try[String]): String = {
+    Utils.getTryWithCallerStacktrace(t)
+  }
+
+  test("doTryWithCallerStacktrace and getTryWithCallerStacktrace") {
+    val t = callDoTry()
+
+    val e1 = intercept[Exception] {
+      callGetTry(t)
+    }
+    // Uncomment for manual inspection
+    // e1.printStackTrace()
+    // Example:
+    // java.lang.Exception: test
+    //   at 
org.apache.spark.util.UtilsSuite.throwException(UtilsSuite.scala:1640)
+    //   at 
org.apache.spark.util.UtilsSuite.$anonfun$callDoTry$1(UtilsSuite.scala:1645)
+    //   at scala.util.Try$.apply(Try.scala:213)
+    //   at 
org.apache.spark.util.Utils$.doTryWithCallerStacktrace(Utils.scala:1586)
+    //   at 
org.apache.spark.util.Utils$.getTryWithCallerStacktrace(Utils.scala:1639)
+    //   at org.apache.spark.util.UtilsSuite.callGetTry(UtilsSuite.scala:1650)
+    //   at 
org.apache.spark.util.UtilsSuite.$anonfun$new$165(UtilsSuite.scala:1661)
+    // <- callGetTry is seen as calling getTryWithCallerStacktrace
+
+    val st1 = e1.getStackTrace
+    // throwException should be on the stack trace
+    assert(st1.exists(_.getMethodName == "throwException"))
+    // callDoTry shouldn't be on the stack trace, but callGetTry should be.
+    assert(!st1.exists(_.getMethodName == "callDoTry"))
+    assert(st1.exists(_.getMethodName == "callGetTry"))
+
+    // The original stack trace with callDoTry should be in the suppressed 
exceptions.
+    // Example:
+    // scalastyle:off line.size.limit
+    // Suppressed: java.lang.Exception: Full stacktrace of original 
doTryWithCallerStacktrace caller
+    //   at 
org.apache.spark.util.UtilsSuite.throwException(UtilsSuite.scala:1640)
+    //   at 
org.apache.spark.util.UtilsSuite.$anonfun$callDoTry$1(UtilsSuite.scala:1645)
+    //   at scala.util.Try$.apply(Try.scala:213)
+    //   at 
org.apache.spark.util.Utils$.doTryWithCallerStacktrace(Utils.scala:1586)
+    //   at org.apache.spark.util.UtilsSuite.callDoTry(UtilsSuite.scala:1645)
+    //   at 
org.apache.spark.util.UtilsSuite.$anonfun$new$165(UtilsSuite.scala:1658)
+    //   ... 56 more
+    // scalastyle:on line.size.limit
+    val origSt = e1.getSuppressed.find(
+      _.getMessage == Utils.TRY_WITH_CALLER_STACKTRACE_FULL_STACKTRACE)
+    assert(origSt.isDefined)
+    assert(origSt.get.getStackTrace.exists(_.getMethodName == 
"throwException"))
+    assert(origSt.get.getStackTrace.exists(_.getMethodName == "callDoTry"))
+
+    // The stack trace under Try should be in the suppressed exceptions.
+    // Example:
+    // Suppressed: java.lang.Exception: Stacktrace under 
doTryWithCallerStacktrace
+    //   at org.apache.spark.util.UtilsSuite.throwException(UtilsSuite.scala: 
1640)
+    //   at 
org.apache.spark.util.UtilsSuite.$anonfun$callDoTry$1(UtilsSuite.scala: 1645)
+    //   at scala.util.Try$.apply(Try.scala: 213)
+    //   at 
org.apache.spark.util.Utils$.doTryWithCallerStacktrace(Utils.scala: 1586)
+    val trySt = e1.getSuppressed.find(
+      _.getMessage == Utils.TRY_WITH_CALLER_STACKTRACE_TRY_STACKTRACE)
+    assert(trySt.isDefined)
+    // calls under callDoTry should be present.
+    assert(trySt.get.getStackTrace.exists(_.getMethodName == "throwException"))
+    // callDoTry should be removed.
+    assert(!trySt.get.getStackTrace.exists(_.getMethodName == "callDoTry"))
+
+    val e2 = intercept[Exception] {
+      callGetTryAgain(t)
+    }
+    // Uncomment for manual inspection
+    // e2.printStackTrace()
+    // Example:
+    // java.lang.Exception: test
+    //   at 
org.apache.spark.util.UtilsSuite.throwException(UtilsSuite.scala:1640)
+    //   at 
org.apache.spark.util.UtilsSuite.$anonfun$callDoTry$1(UtilsSuite.scala:1645)
+    //   at scala.util.Try$.apply(Try.scala:213)
+    //   at 
org.apache.spark.util.Utils$.doTryWithCallerStacktrace(Utils.scala:1586)
+    //   at 
org.apache.spark.util.Utils$.getTryWithCallerStacktrace(Utils.scala:1639)
+    //   at 
org.apache.spark.util.UtilsSuite.callGetTryAgain(UtilsSuite.scala:1654)
+    //   at 
org.apache.spark.util.UtilsSuite.$anonfun$new$165(UtilsSuite.scala:1711)
+    // <- callGetTryAgain is seen as calling getTryWithCallerStacktrace
+
+    val st2 = e2.getStackTrace
+    // throwException should be on the stack trace
+    assert(st2.exists(_.getMethodName == "throwException"))
+    // callDoTry shouldn't be on the stack trace, but callGetTryAgain should 
be.
+    assert(!st2.exists(_.getMethodName == "callDoTry"))
+    assert(st2.exists(_.getMethodName == "callGetTryAgain"))
+    // callGetTry that we called before shouldn't be on the stack trace.
+    assert(!st2.exists(_.getMethodName == "callGetTry"))
+
+    // Unfortunately, this utility is not able to clone the exception, but 
modifies it in place,
+    // so now e1 is also pointing to "callGetTryAgain" instead of "callGetTry".
+    val st1Again = e1.getStackTrace
+    assert(st1Again.exists(_.getMethodName == "callGetTryAgain"))
+    assert(!st1Again.exists(_.getMethodName == "callGetTry"))
+  }
 }
 
 private class SimpleExtension
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CollectMetricsExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CollectMetricsExec.scala
index dc918e51d055..2115e21f81d7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CollectMetricsExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CollectMetricsExec.scala
@@ -60,6 +60,11 @@ case class CollectMetricsExec(
 
   override def outputOrdering: Seq[SortOrder] = child.outputOrdering
 
+  override def resetMetrics(): Unit = {
+    accumulator.reset()
+    super.resetMetrics()
+  }
+
   override protected def doExecute(): RDD[InternalRow] = {
     val collector = accumulator
     collector.reset()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index 7bc770a0c9e3..fb3ec3ad4181 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -40,7 +40,7 @@ import org.apache.spark.sql.execution.datasources.WriteFilesSpec
 import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.vectorized.ColumnarBatch
-import org.apache.spark.util.NextIterator
+import org.apache.spark.util.{LazyTry, NextIterator}
 import org.apache.spark.util.io.{ChunkedByteBuffer, 
ChunkedByteBufferOutputStream}
 
 object SparkPlan {
@@ -182,6 +182,11 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
   /** Specifies sort order for each partition requirements on the input data 
for this operator. */
   def requiredChildOrdering: Seq[Seq[SortOrder]] = Seq.fill(children.size)(Nil)
 
+  @transient
+  private val executeRDD = LazyTry {
+    doExecute()
+  }
+
   /**
    * Returns the result of this query as an RDD[InternalRow] by delegating to 
`doExecute` after
    * preparations.
@@ -192,7 +197,11 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
     if (isCanonicalizedPlan) {
       throw SparkException.internalError("A canonicalized plan is not supposed 
to be executed.")
     }
-    doExecute()
+    executeRDD.get
+  }
+
+  private val executeBroadcastBcast = LazyTry {
+    doExecuteBroadcast()
   }
 
   /**
@@ -205,7 +214,11 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
     if (isCanonicalizedPlan) {
       throw SparkException.internalError("A canonicalized plan is not supposed 
to be executed.")
     }
-    doExecuteBroadcast()
+    executeBroadcastBcast.get.asInstanceOf[broadcast.Broadcast[T]]
+  }
+
+  private val executeColumnarRDD = LazyTry {
+    doExecuteColumnar()
   }
 
   /**
@@ -219,7 +232,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
     if (isCanonicalizedPlan) {
       throw SparkException.internalError("A canonicalized plan is not supposed 
to be executed.")
     }
-    doExecuteColumnar()
+    executeColumnarRDD.get
   }
 
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
index cfcfd282e548..cbd60804b27e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
@@ -99,48 +99,6 @@ case class InMemoryTableScanExec(
         
relation.cacheBuilder.serializer.supportsColumnarOutput(relation.schema)
   }
 
-  private lazy val columnarInputRDD: RDD[ColumnarBatch] = {
-    val numOutputRows = longMetric("numOutputRows")
-    val buffers = filteredCachedBatches()
-    relation.cacheBuilder.serializer.convertCachedBatchToColumnarBatch(
-      buffers,
-      relation.output,
-      attributes,
-      conf).map { cb =>
-      numOutputRows += cb.numRows()
-      cb
-    }
-  }
-
-  private lazy val inputRDD: RDD[InternalRow] = {
-    if (enableAccumulatorsForTest) {
-      readPartitions.setValue(0)
-      readBatches.setValue(0)
-    }
-
-    val numOutputRows = longMetric("numOutputRows")
-    // Using these variables here to avoid serialization of entire objects (if 
referenced
-    // directly) within the map Partitions closure.
-    val relOutput = relation.output
-    val serializer = relation.cacheBuilder.serializer
-
-    // update SQL metrics
-    val withMetrics =
-      filteredCachedBatches().mapPartitionsInternal { iter =>
-        if (enableAccumulatorsForTest && iter.hasNext) {
-          readPartitions.add(1)
-        }
-        iter.map { batch =>
-          if (enableAccumulatorsForTest) {
-            readBatches.add(1)
-          }
-          numOutputRows += batch.numRows
-          batch
-        }
-      }
-    serializer.convertCachedBatchToInternalRow(withMetrics, relOutput, 
attributes, conf)
-  }
-
   override def output: Seq[Attribute] = attributes
 
   private def cachedPlan = relation.cachedPlan match {
@@ -191,11 +149,47 @@ case class InMemoryTableScanExec(
   }
 
   protected override def doExecute(): RDD[InternalRow] = {
-    inputRDD
+    // Resulting RDD is cached and reused by SparkPlan.executeRDD
+    if (enableAccumulatorsForTest) {
+      readPartitions.setValue(0)
+      readBatches.setValue(0)
+    }
+
+    val numOutputRows = longMetric("numOutputRows")
+    // Using these variables here to avoid serialization of entire objects (if 
referenced
+    // directly) within the map Partitions closure.
+    val relOutput = relation.output
+    val serializer = relation.cacheBuilder.serializer
+
+    // update SQL metrics
+    val withMetrics =
+      filteredCachedBatches().mapPartitionsInternal { iter =>
+        if (enableAccumulatorsForTest && iter.hasNext) {
+          readPartitions.add(1)
+        }
+        iter.map { batch =>
+          if (enableAccumulatorsForTest) {
+            readBatches.add(1)
+          }
+          numOutputRows += batch.numRows
+          batch
+        }
+      }
+    serializer.convertCachedBatchToInternalRow(withMetrics, relOutput, 
attributes, conf)
   }
 
   protected override def doExecuteColumnar(): RDD[ColumnarBatch] = {
-    columnarInputRDD
+    // Resulting RDD is cached and reused by SparkPlan.executeColumnarRDD
+    val numOutputRows = longMetric("numOutputRows")
+    val buffers = filteredCachedBatches()
+    relation.cacheBuilder.serializer.convertCachedBatchToColumnarBatch(
+      buffers,
+      relation.output,
+      attributes,
+      conf).map { cb =>
+      numOutputRows += cb.numRows()
+      cb
+    }
   }
 
   override def isMaterialized: Boolean = 
relation.cacheBuilder.isCachedColumnBuffersLoaded
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
index 90f00a5035e1..ae11229cd516 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
@@ -249,17 +249,10 @@ case class ShuffleExchangeExec(
     dep
   }
 
-  /**
-   * Caches the created ShuffleRowRDD so we can reuse that.
-   */
-  private var cachedShuffleRDD: ShuffledRowRDD = null
-
   protected override def doExecute(): RDD[InternalRow] = {
-    // Returns the same ShuffleRowRDD if this plan is used by multiple plans.
-    if (cachedShuffleRDD == null) {
-      cachedShuffleRDD = new ShuffledRowRDD(shuffleDependency, readMetrics)
-    }
-    cachedShuffleRDD
+    // The ShuffleRowRDD will be cached in SparkPlan.executeRDD and reused if 
this plan is used by
+    // multiple plans.
+    new ShuffledRowRDD(shuffleDependency, readMetrics)
   }
 
   override protected def withNewChildInternal(newChild: SparkPlan): 
ShuffleExchangeExec =


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
