This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new d2e8c1cb60e3 [SPARK-48195][CORE] Save and reuse RDD/Broadcast created by SparkPlan
d2e8c1cb60e3 is described below

commit d2e8c1cb60e34a1c7e92374c07d682aa5ca79145
Author: Julek Sompolski <Juliusz Sompolski>
AuthorDate: Mon Sep 23 12:39:02 2024 +0900

    [SPARK-48195][CORE] Save and reuse RDD/Broadcast created by SparkPlan

    ### What changes were proposed in this pull request?

    Save the RDD created by `doExecute`, instead of creating a new one in `execute` each time.

    Currently, many types of SparkPlans already save the RDD they create. For example, shuffle exchanges save `lazy val inputRDD: RDD[InternalRow] = child.execute()`. This creates inconsistencies when an action (e.g. a repeated `df.collect()`) is executed on a DataFrame twice:
    * The SparkPlan will be reused, since the same `df.queryExecution.executedPlan` will be used.
    * Any non-result stage will be reused, as the shuffle operators will just have their `inputRDD` reused.
    * However, for the result stage, `execute()` will call `doExecute()` again, and the logic that generates the actual execution RDD will be re-executed. For example, whole-stage codegen (WSCG) will generate and compile new code for the result stage and create a new RDD out of it.

    Generation of execution RDDs is also often influenced by configuration: staying with the WSCG example, configs like `spark.sql.codegen.hugeMethodLimit` or `spark.sql.codegen.methodSplitThreshold` affect the generated code. The fact that upon re-execution these are evaluated anew for the result stage, but not for earlier stages, creates inconsistencies in which config changes become visible.

    By saving the result of `doExecute` and reusing the RDD in `execute`, we make sure that the work of creating that RDD is not duplicated, and it is more consistent that all RDDs of the plan are reused, just like the `executedPlan` itself. Note that while the results of earlier shuffle stages are also reused, the result stage still gets executed again, as its results are not saved in the BlockManager and available for reuse.

    We also add a `LazyTry` utility instead of using `lazy val`, to deal with shortcomings of Scala's `lazy val` (see the illustrative sketch below).

    ### Why are the changes needed?

    Resolves subtle inconsistencies coming from object reuse vs. recreating objects from scratch.

    ### Does this PR introduce _any_ user-facing change?

    Subtle changes caused by the RDD being reused, e.g. whether a config change gets picked up. However, it makes things more consistent. Spark 4.0.0 might be a good candidate for making such a change.

    ### How was this patch tested?

    Existing SQL execution tests validate that the change in SparkPlan works. Tests were added for the new `LazyTry` utility.

    ### Was this patch authored or co-authored using generative AI tooling?

    Generated-by: GitHub Copilot (trivial code completion suggestions)

    Closes #48037 from juliuszsompolski/SPARK-48195-rdd.
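A minimal usage sketch of the non-retrying semantics that `LazyTry` provides (illustrative only, not part of the patch; since `LazyTry` is `private[spark]`, it assumes compilation inside Spark under the `org.apache.spark.util` package, and `LazyTryIllustration` is a made-up name):

```
// Assumes this file lives inside the Spark build, because LazyTry is private[spark].
package org.apache.spark.util

object LazyTryIllustration {
  def main(args: Array[String]): Unit = {
    var attempts = 0

    // A plain Scala `lazy val` whose initializer throws stays uninitialized and is
    // retried on the next access; LazyTry runs the block once and caches the Failure.
    val value = LazyTry {
      attempts += 1
      if (attempts == 1) throw new RuntimeException("first attempt fails")
      42
    }

    // Both accesses re-throw the cached exception; the initializer is not retried.
    try value.get catch { case e: RuntimeException => println(s"1st get: ${e.getMessage}") }
    try value.get catch { case e: RuntimeException => println(s"2nd get: ${e.getMessage}") }
    println(s"initializer ran $attempts time(s)") // prints: initializer ran 1 time(s)
  }
}
```

A plain `lazy val` with the same initializer would re-run the block on the second access (bumping `attempts` to 2), which is exactly the retry-on-failure behavior the wrapper avoids; the locking difference is exercised by the new `LazyTrySuite` in the diff below.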
Lead-authored-by: Julek Sompolski <Juliusz Sompolski> Co-authored-by: Hyukjin Kwon <gurwls...@apache.org> Co-authored-by: Wenchen Fan <cloud0...@gmail.com> Signed-off-by: Hyukjin Kwon <gurwls...@apache.org> --- .../main/scala/org/apache/spark/util/LazyTry.scala | 70 ++++++++++ .../main/scala/org/apache/spark/util/Utils.scala | 80 +++++++++++ .../scala/org/apache/spark/util/LazyTrySuite.scala | 151 +++++++++++++++++++++ .../scala/org/apache/spark/util/UtilsSuite.scala | 112 ++++++++++++++- .../spark/sql/execution/CollectMetricsExec.scala | 5 + .../org/apache/spark/sql/execution/SparkPlan.scala | 21 ++- .../execution/columnar/InMemoryTableScanExec.scala | 82 ++++++----- .../execution/exchange/ShuffleExchangeExec.scala | 13 +- 8 files changed, 475 insertions(+), 59 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/LazyTry.scala b/core/src/main/scala/org/apache/spark/util/LazyTry.scala new file mode 100644 index 000000000000..7edc08672c26 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/util/LazyTry.scala @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.util + +import scala.util.Try + +/** + * Wrapper utility for a lazy val, with two differences compared to scala behavior: + * + * 1. Non-retrying in case of failure. This wrapper stores the exception in a Try, and will re-throw + * it on the access to `get`. + * In scala, when a `lazy val` field initialization throws an exception, the field remains + * uninitialized, and initialization will be re-attempted on the next access. This also can lead + * to performance issues, needlessly computing something towards a failure, and also can lead to + * duplicated side effects. + * + * 2. Resolving locking issues. + * In scala, when a `lazy val` field is initialized, it grabs the synchronized lock on the + * enclosing object instance. This can lead both to performance issues, and deadlocks. + * For example: + * a) Thread 1 entered a synchronized method, grabbing a coarse lock on the parent object. + * b) Thread 2 get spawned off, and tries to initialize a lazy value on the same parent object + * This causes scala to also try to grab a lock on the parent object. + * c) If thread 1 waits for thread 2 to join, a deadlock occurs. + * This wrapper will only grab a lock on the wrapper itself, and not the parent object. + * + * @param initialize The block of code to initialize the lazy value. + * @tparam T type of the lazy value. + */ +private[spark] class LazyTry[T](initialize: => T) extends Serializable { + private lazy val tryT: Try[T] = Utils.doTryWithCallerStacktrace { initialize } + + /** + * Get the lazy value. If the initialization block threw an exception, it will be re-thrown here. 
+ * The exception will be re-thrown with the current caller's stacktrace. + * An exception with stack trace from when the exception was first thrown can be accessed with + * ``` + * ex.getSuppressed.find { e => + * e.getMessage == org.apache.spark.util.Utils.TRY_WITH_CALLER_STACKTRACE_FULL_STACKTRACE + * } + * ``` + */ + def get: T = Utils.getTryWithCallerStacktrace(tryT) +} + +private[spark] object LazyTry { + /** + * Create a new LazyTry instance. + * + * @param initialize The block of code to initialize the lazy value. + * @tparam T type of the lazy value. + * @return a new LazyTry instance. + */ + def apply[T](initialize: => T): LazyTry[T] = new LazyTry(initialize) +} diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index d8392cd8043d..52213f36a2cd 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -1351,6 +1351,86 @@ private[spark] object Utils } } + val TRY_WITH_CALLER_STACKTRACE_FULL_STACKTRACE = + "Full stacktrace of original doTryWithCallerStacktrace caller" + + val TRY_WITH_CALLER_STACKTRACE_TRY_STACKTRACE = + "Stacktrace under doTryWithCallerStacktrace" + + /** + * Use Try with stacktrace substitution for the caller retrieving the error. + * + * Normally in case of failure, the exception would have the stacktrace of the caller that + * originally called doTryWithCallerStacktrace. However, we want to replace the part above + * this function with the stacktrace of the caller who calls getTryWithCallerStacktrace. + * So here we save the part of the stacktrace below doTryWithCallerStacktrace, and + * getTryWithCallerStacktrace will stitch it with the new stack trace of the caller. + * The full original stack trace is kept in ex.getSuppressed. + * + * @param f Code block to be wrapped in Try + * @return Try with Success or Failure of the code block. Use with getTryWithCallerStacktrace. + */ + def doTryWithCallerStacktrace[T](f: => T): Try[T] = { + val t = Try { + f + } + t match { + case Failure(ex) => + // Note: we remove the common suffix instead of e.g. finding the call to this function, to + // account for recursive calls with multiple doTryWithCallerStacktrace on the stack trace. + val origStackTrace = ex.getStackTrace + val currentStackTrace = Thread.currentThread().getStackTrace + val commonSuffixLen = origStackTrace.reverse.zip(currentStackTrace.reverse).takeWhile { + case (exElem, currentElem) => exElem == currentElem + }.length + val belowEx = new Exception(TRY_WITH_CALLER_STACKTRACE_TRY_STACKTRACE) + belowEx.setStackTrace(origStackTrace.dropRight(commonSuffixLen)) + ex.addSuppressed(belowEx) + + // keep the full original stack trace in a suppressed exception. + val fullEx = new Exception(TRY_WITH_CALLER_STACKTRACE_FULL_STACKTRACE) + fullEx.setStackTrace(origStackTrace) + ex.addSuppressed(fullEx) + case Success(_) => // nothing + } + t + } + + /** + * Retrieve the result of Try that was created by doTryWithCallerStacktrace. + * + * In case of failure, the resulting exception has a stack trace that combines the stack trace + * below the original doTryWithCallerStacktrace which triggered it, with the caller stack trace + * of the current caller of getTryWithCallerStacktrace. 
+ * + * Full stack trace of the original doTryWithCallerStacktrace caller can be retrieved with + * ``` + * ex.getSuppressed.find { e => + * e.getMessage == Utils.TRY_WITH_CALLER_STACKTRACE_FULL_STACKTRACE + * } + * ``` + * + * + * @param t Try from doTryWithCallerStacktrace + * @return Result of the Try or rethrows the failure exception with modified stacktrace. + */ + def getTryWithCallerStacktrace[T](t: Try[T]): T = t match { + case Failure(ex) => + val belowStacktrace = ex.getSuppressed.find { e => + // added in doTryWithCallerStacktrace + e.getMessage == TRY_WITH_CALLER_STACKTRACE_TRY_STACKTRACE + }.getOrElse { + // If we don't have the expected stacktrace information, just rethrow + throw ex + }.getStackTrace + // We are modifying and throwing the original exception. It would be better if we could + // return a copy, but we can't easily clone it and preserve. If this is accessed from + // multiple threads that then look at the stack trace, this could break. + ex.setStackTrace(belowStacktrace ++ Thread.currentThread().getStackTrace.drop(1)) + throw ex + case Success(s) => s + } + // A regular expression to match classes of the internal Spark API's // that we want to skip when finding the call site of a method. private val SPARK_CORE_CLASS_REGEX = diff --git a/core/src/test/scala/org/apache/spark/util/LazyTrySuite.scala b/core/src/test/scala/org/apache/spark/util/LazyTrySuite.scala new file mode 100644 index 000000000000..79c07f8fbfea --- /dev/null +++ b/core/src/test/scala/org/apache/spark/util/LazyTrySuite.scala @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.util + +import org.apache.spark.SparkFunSuite + +class LazyTrySuite extends SparkFunSuite{ + test("LazyTry should initialize only once") { + var count = 0 + val lazyVal = LazyTry { + count += 1 + count + } + assert(count == 0) + assert(lazyVal.get == 1) + assert(count == 1) + assert(lazyVal.get == 1) + assert(count == 1) + } + + test("LazyTry should re-throw exceptions") { + val lazyVal = LazyTry { + throw new RuntimeException("test") + } + intercept[RuntimeException] { + lazyVal.get + } + intercept[RuntimeException] { + lazyVal.get + } + } + + test("LazyTry should re-throw exceptions with current caller stack-trace") { + val fileName = Thread.currentThread().getStackTrace()(1).getFileName + val lineNo = Thread.currentThread().getStackTrace()(1).getLineNumber + val lazyVal = LazyTry { + throw new RuntimeException("test") + } + + val e1 = intercept[RuntimeException] { + lazyVal.get // lineNo + 6 + } + assert(e1.getStackTrace + .exists(elem => elem.getFileName == fileName && elem.getLineNumber == lineNo + 6)) + + val e2 = intercept[RuntimeException] { + lazyVal.get // lineNo + 12 + } + assert(e2.getStackTrace + .exists(elem => elem.getFileName == fileName && elem.getLineNumber == lineNo + 12)) + } + + test("LazyTry does not lock containing object") { + class LazyContainer() { + @volatile var aSet = 0 + + val a: LazyTry[Int] = LazyTry { + aSet = 1 + aSet + } + + val b: LazyTry[Int] = LazyTry { + val t = new Thread(new Runnable { + override def run(): Unit = { + assert(a.get == 1) + } + }) + t.start() + t.join() + aSet + } + } + val container = new LazyContainer() + // Nothing is lazy initialized yet + assert(container.aSet == 0) + // This will not deadlock, thread t will initialize a, and update aSet + assert(container.b.get == 1) + assert(container.aSet == 1) + } + + // Scala lazy val tests are added to test for potential changes in the semantics of scala lazy val + + test("Scala lazy val initializing multiple times on error") { + class LazyValError() { + var counter = 0 + lazy val a = { + counter += 1 + throw new RuntimeException("test") + } + } + val lazyValError = new LazyValError() + intercept[RuntimeException] { + lazyValError.a + } + assert(lazyValError.counter == 1) + intercept[RuntimeException] { + lazyValError.a + } + assert(lazyValError.counter == 2) + } + + test("Scala lazy val locking containing object and deadlocking") { + // Note: this will change in scala 3, with different lazy vals not deadlocking with each other. + // https://docs.scala-lang.org/scala3/reference/changed-features/lazy-vals-init.html + class LazyValContainer() { + @volatile var aSet = 0 + @volatile var t: Thread = _ + + lazy val a = { + aSet = 1 + aSet + } + + lazy val b = { + t = new Thread(new Runnable { + override def run(): Unit = { + assert(a == 1) + } + }) + t.start() + t.join(1000) + aSet + } + } + val container = new LazyValContainer() + // Nothing is lazy initialized yet + assert(container.aSet == 0) + // This will deadlock, because b will take monitor on LazyValContainer, and then thread t + // will wait on that monitor, not able to initialize a. + // b will therefore see aSet == 0. + assert(container.b == 0) + // However, after b finishes initializing, the monitor will be released, and then thread t + // will finish initializing a, and set aSet to 1. 
+ container.t.join() + assert(container.aSet == 1) + } +} diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala index 4fe6fcf17f49..a694e08def89 100644 --- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala @@ -28,7 +28,7 @@ import java.util.concurrent.TimeUnit import java.util.zip.GZIPOutputStream import scala.collection.mutable.ListBuffer -import scala.util.Random +import scala.util.{Random, Try} import com.google.common.io.Files import org.apache.commons.io.IOUtils @@ -1523,6 +1523,116 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties { conf.set(SERIALIZER, "org.apache.spark.serializer.JavaSerializer") assert(Utils.isPushBasedShuffleEnabled(conf, isDriver = true) === false) } + + + private def throwException(): String = { + throw new Exception("test") + } + + private def callDoTry(): Try[String] = { + Utils.doTryWithCallerStacktrace { + throwException() + } + } + + private def callGetTry(t: Try[String]): String = { + Utils.getTryWithCallerStacktrace(t) + } + + private def callGetTryAgain(t: Try[String]): String = { + Utils.getTryWithCallerStacktrace(t) + } + + test("doTryWithCallerStacktrace and getTryWithCallerStacktrace") { + val t = callDoTry() + + val e1 = intercept[Exception] { + callGetTry(t) + } + // Uncomment for manual inspection + // e1.printStackTrace() + // Example: + // java.lang.Exception: test + // at org.apache.spark.util.UtilsSuite.throwException(UtilsSuite.scala:1640) + // at org.apache.spark.util.UtilsSuite.$anonfun$callDoTry$1(UtilsSuite.scala:1645) + // at scala.util.Try$.apply(Try.scala:213) + // at org.apache.spark.util.Utils$.doTryWithCallerStacktrace(Utils.scala:1586) + // at org.apache.spark.util.Utils$.getTryWithCallerStacktrace(Utils.scala:1639) + // at org.apache.spark.util.UtilsSuite.callGetTry(UtilsSuite.scala:1650) + // at org.apache.spark.util.UtilsSuite.$anonfun$new$165(UtilsSuite.scala:1661) + // <- callGetTry is seen as calling getTryWithCallerStacktrace + + val st1 = e1.getStackTrace + // throwException should be on the stack trace + assert(st1.exists(_.getMethodName == "throwException")) + // callDoTry shouldn't be on the stack trace, but callGetTry should be. + assert(!st1.exists(_.getMethodName == "callDoTry")) + assert(st1.exists(_.getMethodName == "callGetTry")) + + // The original stack trace with callDoTry should be in the suppressed exceptions. + // Example: + // scalastyle:off line.size.limit + // Suppressed: java.lang.Exception: Full stacktrace of original doTryWithCallerStacktrace caller + // at org.apache.spark.util.UtilsSuite.throwException(UtilsSuite.scala:1640) + // at org.apache.spark.util.UtilsSuite.$anonfun$callDoTry$1(UtilsSuite.scala:1645) + // at scala.util.Try$.apply(Try.scala:213) + // at org.apache.spark.util.Utils$.doTryWithCallerStacktrace(Utils.scala:1586) + // at org.apache.spark.util.UtilsSuite.callDoTry(UtilsSuite.scala:1645) + // at org.apache.spark.util.UtilsSuite.$anonfun$new$165(UtilsSuite.scala:1658) + // ... 56 more + // scalastyle:on line.size.limit + val origSt = e1.getSuppressed.find( + _.getMessage == Utils.TRY_WITH_CALLER_STACKTRACE_FULL_STACKTRACE) + assert(origSt.isDefined) + assert(origSt.get.getStackTrace.exists(_.getMethodName == "throwException")) + assert(origSt.get.getStackTrace.exists(_.getMethodName == "callDoTry")) + + // The stack trace under Try should be in the suppressed exceptions. 
+ // Example: + // Suppressed: java.lang.Exception: Stacktrace under doTryWithCallerStacktrace + // at org.apache.spark.util.UtilsSuite.throwException(UtilsSuite.scala: 1640) + // at org.apache.spark.util.UtilsSuite.$anonfun$callDoTry$1(UtilsSuite.scala: 1645) + // at scala.util.Try$.apply(Try.scala: 213) + // at org.apache.spark.util.Utils$.doTryWithCallerStacktrace(Utils.scala: 1586) + val trySt = e1.getSuppressed.find( + _.getMessage == Utils.TRY_WITH_CALLER_STACKTRACE_TRY_STACKTRACE) + assert(trySt.isDefined) + // calls under callDoTry should be present. + assert(trySt.get.getStackTrace.exists(_.getMethodName == "throwException")) + // callDoTry should be removed. + assert(!trySt.get.getStackTrace.exists(_.getMethodName == "callDoTry")) + + val e2 = intercept[Exception] { + callGetTryAgain(t) + } + // Uncomment for manual inspection + // e2.printStackTrace() + // Example: + // java.lang.Exception: test + // at org.apache.spark.util.UtilsSuite.throwException(UtilsSuite.scala:1640) + // at org.apache.spark.util.UtilsSuite.$anonfun$callDoTry$1(UtilsSuite.scala:1645) + // at scala.util.Try$.apply(Try.scala:213) + // at org.apache.spark.util.Utils$.doTryWithCallerStacktrace(Utils.scala:1586) + // at org.apache.spark.util.Utils$.getTryWithCallerStacktrace(Utils.scala:1639) + // at org.apache.spark.util.UtilsSuite.callGetTryAgain(UtilsSuite.scala:1654) + // at org.apache.spark.util.UtilsSuite.$anonfun$new$165(UtilsSuite.scala:1711) + // <- callGetTryAgain is seen as calling getTryWithCallerStacktrace + + val st2 = e2.getStackTrace + // throwException should be on the stack trace + assert(st2.exists(_.getMethodName == "throwException")) + // callDoTry shouldn't be on the stack trace, but callGetTryAgain should be. + assert(!st2.exists(_.getMethodName == "callDoTry")) + assert(st2.exists(_.getMethodName == "callGetTryAgain")) + // callGetTry that we called before shouldn't be on the stack trace. + assert(!st2.exists(_.getMethodName == "callGetTry")) + + // Unfortunately, this utility is not able to clone the exception, but modifies it in place, + // so now e1 is also pointing to "callGetTryAgain" instead of "callGetTry". 
+ val st1Again = e1.getStackTrace + assert(st1Again.exists(_.getMethodName == "callGetTryAgain")) + assert(!st1Again.exists(_.getMethodName == "callGetTry")) + } } private class SimpleExtension diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CollectMetricsExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CollectMetricsExec.scala index dc918e51d055..2115e21f81d7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CollectMetricsExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CollectMetricsExec.scala @@ -60,6 +60,11 @@ case class CollectMetricsExec( override def outputOrdering: Seq[SortOrder] = child.outputOrdering + override def resetMetrics(): Unit = { + accumulator.reset() + super.resetMetrics() + } + override protected def doExecute(): RDD[InternalRow] = { val collector = accumulator collector.reset() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala index 7bc770a0c9e3..fb3ec3ad4181 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala @@ -40,7 +40,7 @@ import org.apache.spark.sql.execution.datasources.WriteFilesSpec import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.vectorized.ColumnarBatch -import org.apache.spark.util.NextIterator +import org.apache.spark.util.{LazyTry, NextIterator} import org.apache.spark.util.io.{ChunkedByteBuffer, ChunkedByteBufferOutputStream} object SparkPlan { @@ -182,6 +182,11 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ /** Specifies sort order for each partition requirements on the input data for this operator. */ def requiredChildOrdering: Seq[Seq[SortOrder]] = Seq.fill(children.size)(Nil) + @transient + private val executeRDD = LazyTry { + doExecute() + } + /** * Returns the result of this query as an RDD[InternalRow] by delegating to `doExecute` after * preparations. 
@@ -192,7 +197,11 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ if (isCanonicalizedPlan) { throw SparkException.internalError("A canonicalized plan is not supposed to be executed.") } - doExecute() + executeRDD.get + } + + private val executeBroadcastBcast = LazyTry { + doExecuteBroadcast() } /** @@ -205,7 +214,11 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ if (isCanonicalizedPlan) { throw SparkException.internalError("A canonicalized plan is not supposed to be executed.") } - doExecuteBroadcast() + executeBroadcastBcast.get.asInstanceOf[broadcast.Broadcast[T]] + } + + private val executeColumnarRDD = LazyTry { + doExecuteColumnar() } /** @@ -219,7 +232,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ if (isCanonicalizedPlan) { throw SparkException.internalError("A canonicalized plan is not supposed to be executed.") } - doExecuteColumnar() + executeColumnarRDD.get } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala index cfcfd282e548..cbd60804b27e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala @@ -99,48 +99,6 @@ case class InMemoryTableScanExec( relation.cacheBuilder.serializer.supportsColumnarOutput(relation.schema) } - private lazy val columnarInputRDD: RDD[ColumnarBatch] = { - val numOutputRows = longMetric("numOutputRows") - val buffers = filteredCachedBatches() - relation.cacheBuilder.serializer.convertCachedBatchToColumnarBatch( - buffers, - relation.output, - attributes, - conf).map { cb => - numOutputRows += cb.numRows() - cb - } - } - - private lazy val inputRDD: RDD[InternalRow] = { - if (enableAccumulatorsForTest) { - readPartitions.setValue(0) - readBatches.setValue(0) - } - - val numOutputRows = longMetric("numOutputRows") - // Using these variables here to avoid serialization of entire objects (if referenced - // directly) within the map Partitions closure. - val relOutput = relation.output - val serializer = relation.cacheBuilder.serializer - - // update SQL metrics - val withMetrics = - filteredCachedBatches().mapPartitionsInternal { iter => - if (enableAccumulatorsForTest && iter.hasNext) { - readPartitions.add(1) - } - iter.map { batch => - if (enableAccumulatorsForTest) { - readBatches.add(1) - } - numOutputRows += batch.numRows - batch - } - } - serializer.convertCachedBatchToInternalRow(withMetrics, relOutput, attributes, conf) - } - override def output: Seq[Attribute] = attributes private def cachedPlan = relation.cachedPlan match { @@ -191,11 +149,47 @@ case class InMemoryTableScanExec( } protected override def doExecute(): RDD[InternalRow] = { - inputRDD + // Resulting RDD is cached and reused by SparkPlan.executeRDD + if (enableAccumulatorsForTest) { + readPartitions.setValue(0) + readBatches.setValue(0) + } + + val numOutputRows = longMetric("numOutputRows") + // Using these variables here to avoid serialization of entire objects (if referenced + // directly) within the map Partitions closure. 
+ val relOutput = relation.output + val serializer = relation.cacheBuilder.serializer + + // update SQL metrics + val withMetrics = + filteredCachedBatches().mapPartitionsInternal { iter => + if (enableAccumulatorsForTest && iter.hasNext) { + readPartitions.add(1) + } + iter.map { batch => + if (enableAccumulatorsForTest) { + readBatches.add(1) + } + numOutputRows += batch.numRows + batch + } + } + serializer.convertCachedBatchToInternalRow(withMetrics, relOutput, attributes, conf) } protected override def doExecuteColumnar(): RDD[ColumnarBatch] = { - columnarInputRDD + // Resulting RDD is cached and reused by SparkPlan.executeColumnarRDD + val numOutputRows = longMetric("numOutputRows") + val buffers = filteredCachedBatches() + relation.cacheBuilder.serializer.convertCachedBatchToColumnarBatch( + buffers, + relation.output, + attributes, + conf).map { cb => + numOutputRows += cb.numRows() + cb + } } override def isMaterialized: Boolean = relation.cacheBuilder.isCachedColumnBuffersLoaded diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala index 90f00a5035e1..ae11229cd516 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala @@ -249,17 +249,10 @@ case class ShuffleExchangeExec( dep } - /** - * Caches the created ShuffleRowRDD so we can reuse that. - */ - private var cachedShuffleRDD: ShuffledRowRDD = null - protected override def doExecute(): RDD[InternalRow] = { - // Returns the same ShuffleRowRDD if this plan is used by multiple plans. - if (cachedShuffleRDD == null) { - cachedShuffleRDD = new ShuffledRowRDD(shuffleDependency, readMetrics) - } - cachedShuffleRDD + // The ShuffleRowRDD will be cached in SparkPlan.executeRDD and reused if this plan is used by + // multiple plans. + new ShuffledRowRDD(shuffleDependency, readMetrics) } override protected def withNewChildInternal(newChild: SparkPlan): ShuffleExchangeExec =
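For completeness, a minimal sketch of how the two new `Utils` helpers used by `LazyTry` compose (illustrative only, not part of the patch; like `LazyTry`, `Utils` is `private[spark]`, so this assumes compilation inside Spark, and `CallerStacktraceIllustration`, `failingWork`, and `consumer` are made-up names). The `Try` is produced once by `doTryWithCallerStacktrace`; each caller that later unwraps it via `getTryWithCallerStacktrace` sees the failure with its own call site stitched onto the stack, while the full original stack trace remains available as a suppressed exception.

```
// Assumes this file lives inside the Spark build, because Utils is private[spark].
package org.apache.spark.util

import scala.util.Try

object CallerStacktraceIllustration {
  private def failingWork(): Int = throw new IllegalStateException("boom")

  // Computed once, e.g. inside a lazy initializer; the failure is captured in the Try.
  private val cached: Try[Int] = Utils.doTryWithCallerStacktrace { failingWork() }

  def consumer(): Int = Utils.getTryWithCallerStacktrace(cached)

  def main(args: Array[String]): Unit = {
    try consumer() catch {
      case e: IllegalStateException =>
        // The stack trace points at consumer(), the caller that unwrapped the Try ...
        assert(e.getStackTrace.exists(_.getMethodName == "consumer"))
        // ... while the original full stack trace is kept as a suppressed exception.
        assert(e.getSuppressed.exists(
          _.getMessage == Utils.TRY_WITH_CALLER_STACKTRACE_FULL_STACKTRACE))
    }
  }
}
```

As the new `UtilsSuite` test above notes, the exception is modified in place rather than cloned, so if several consumers unwrap the same `Try`, exceptions captured earlier end up pointing at the most recent caller's stack trace.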