[GitHub] spark pull request #21424: [SPARK-24379] BroadcastExchangeExec should catch ...
Github user jinxing64 closed the pull request at: https://github.com/apache/spark/pull/21424

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21424: [SPARK-24379] BroadcastExchangeExec should catch ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21424#discussion_r191118494

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala ---

@@ -106,11 +108,20 @@ private[execution] object HashedRelation {
           1),
         0)
     }
-
-    if (key.length == 1 && key.head.dataType == LongType) {
-      LongHashedRelation(input, key, sizeEstimate, mm)
-    } else {
-      UnsafeHashedRelation(input, key, sizeEstimate, mm)
+    try {
+      if (key.length == 1 && key.head.dataType == LongType) {
+        LongHashedRelation(input, key, sizeEstimate, mm)
+      } else {
+        UnsafeHashedRelation(input, key, sizeEstimate, mm)
+      }
+    } catch {
+      case oe: SparkOutOfMemoryError =>
+        throw new SparkOutOfMemoryError(s"If this SparkOutOfMemoryError happens in Spark driver," +

--- End diff --

It seems we don't need to change anything; maybe just add some comments saying where the OOM can occur, i.e. `RDD#collect` and `BroadcastMode#transform`.
[GitHub] spark pull request #21424: [SPARK-24379] BroadcastExchangeExec should catch ...
Github user jinxing64 commented on a diff in the pull request: https://github.com/apache/spark/pull/21424#discussion_r191107018

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala ---

@@ -106,11 +108,20 @@ private[execution] object HashedRelation {
           1),
         0)
     }
-
-    if (key.length == 1 && key.head.dataType == LongType) {
-      LongHashedRelation(input, key, sizeEstimate, mm)
-    } else {
-      UnsafeHashedRelation(input, key, sizeEstimate, mm)
+    try {
+      if (key.length == 1 && key.head.dataType == LongType) {
+        LongHashedRelation(input, key, sizeEstimate, mm)
+      } else {
+        UnsafeHashedRelation(input, key, sizeEstimate, mm)
+      }
+    } catch {
+      case oe: SparkOutOfMemoryError =>
+        throw new SparkOutOfMemoryError(s"If this SparkOutOfMemoryError happens in Spark driver," +

--- End diff --

So, should I change it back?
[GitHub] spark pull request #21424: [SPARK-24379] BroadcastExchangeExec should catch ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21424#discussion_r191104413

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala ---

@@ -106,11 +108,20 @@ private[execution] object HashedRelation {
           1),
         0)
     }
-
-    if (key.length == 1 && key.head.dataType == LongType) {
-      LongHashedRelation(input, key, sizeEstimate, mm)
-    } else {
-      UnsafeHashedRelation(input, key, sizeEstimate, mm)
+    try {
+      if (key.length == 1 && key.head.dataType == LongType) {
+        LongHashedRelation(input, key, sizeEstimate, mm)
+      } else {
+        UnsafeHashedRelation(input, key, sizeEstimate, mm)
+      }
+    } catch {
+      case oe: SparkOutOfMemoryError =>
+        throw new SparkOutOfMemoryError(s"If this SparkOutOfMemoryError happens in Spark driver," +

--- End diff --

Ah, I see. So the `SparkOutOfMemoryError` is thrown by `BytesToBytesMap`; we need to catch and rethrow it to attach the error message anyway. I also found that we may throw an OOM when calling `child.executeCollectIterator`, which calls `RDD#collect`, so it seems the previous code was correct.
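The catch-and-rethrow idea being discussed (intercepting the `SparkOutOfMemoryError` raised while building the hash relation and attaching a more helpful message) can be sketched as below. This is a minimal, stand-alone illustration: the stand-in classes, messages, and `buildRelation` helper are hypothetical, not Spark's actual implementation (the real `SparkOutOfMemoryError` lives in `org.apache.spark.memory`, and the real allocation failure originates in `BytesToBytesMap`).

```scala
// Stand-in for org.apache.spark.memory.SparkOutOfMemoryError (illustrative only).
class SparkOutOfMemoryError(msg: String) extends OutOfMemoryError(msg)

object HashedRelationSketch {
  // Hypothetical stand-in for LongHashedRelation / UnsafeHashedRelation
  // construction; in Spark, the underlying BytesToBytesMap throws
  // SparkOutOfMemoryError when it cannot acquire memory.
  private def buildRelation(): Nothing =
    throw new SparkOutOfMemoryError("unable to acquire memory")

  // Mirrors the try/catch in the diff: catch the error thrown while building
  // the relation and re-throw it with a driver-side hint attached.
  def apply(): Any =
    try buildRelation()
    catch {
      case oe: SparkOutOfMemoryError =>
        throw new SparkOutOfMemoryError(
          "Not enough memory to build the hashed relation " +
          "(if this happens in the driver, consider increasing driver memory): " +
          oe.getMessage)
    }

  def main(args: Array[String]): Unit =
    try apply()
    catch { case oe: SparkOutOfMemoryError => println(oe.getMessage) }
}
```

Because the re-thrown error is still a `SparkOutOfMemoryError`, upstream handlers that match on the type (such as the one in `BroadcastExchangeExec`) keep working while users see the enriched message.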
[GitHub] spark pull request #21424: [SPARK-24379] BroadcastExchangeExec should catch ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21424#discussion_r190577287

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala ---

@@ -115,9 +116,9 @@ case class BroadcastExchangeExec(
             // SPARK-24294: To bypass scala bug: https://github.com/scala/bug/issues/9554, we throw
             // SparkFatalException, which is a subclass of Exception. ThreadUtils.awaitResult
             // will catch this exception and re-throw the wrapped fatal throwable.
-            case oe: OutOfMemoryError =>
+            case oe: SparkOutOfMemoryError =>
               throw new SparkFatalException(
-                new OutOfMemoryError(s"Not enough memory to build and broadcast the table to " +
+                new SparkOutOfMemoryError(s"Not enough memory to build and broadcast the table to " +

--- End diff --

Since we fully control the creation of `SparkOutOfMemoryError`, can we move the error message to where we throw `SparkOutOfMemoryError` when building the hash relation?
[GitHub] spark pull request #21424: [SPARK-24379] BroadcastExchangeExec should catch ...
GitHub user jinxing64 opened a pull request: https://github.com/apache/spark/pull/21424

[SPARK-24379] BroadcastExchangeExec should catch SparkOutOfMemoryError and re-throw SparkFatalException, which wraps the SparkOutOfMemoryError inside.

## What changes were proposed in this pull request?

After https://github.com/apache/spark/pull/20014, Spark no longer fails the entire executor but only fails the task that suffers a `SparkOutOfMemoryError`. After https://github.com/apache/spark/pull/21342, `BroadcastExchangeExec` try-catches `OutOfMemoryError`. Consider the scenario below:
1. A `SparkOutOfMemoryError` (subclass of `OutOfMemoryError`) is thrown inside `scala.concurrent.Future`;
2. The `SparkOutOfMemoryError` is caught, and a new `OutOfMemoryError` is wrapped in `SparkFatalException` and re-thrown;
3. `ThreadUtils.awaitResult` catches the `SparkFatalException`, and the wrapped `OutOfMemoryError` is thrown;
4. The `OutOfMemoryError` goes to `SparkUncaughtExceptionHandler.uncaughtException` and the executor fails.

So it makes more sense to catch `SparkOutOfMemoryError` and re-throw a `SparkFatalException` which wraps the `SparkOutOfMemoryError` inside.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/jinxing64/spark SPARK-24379

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21424.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #21424

commit aa10470b7b09a100ee80afedb29b24548fbe5512
Author: jinxing
Date: 2018-05-24T11:51:40Z

    [SPARK-24379] BroadcastExchangeExec should catch SparkOutOfMemoryError and re-throw SparkFatalException, which wraps the SparkOutOfMemoryError inside.
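The four-step scenario in the description can be sketched end-to-end. The sketch below is a simplified illustration, not Spark's code: the two exception classes stand in for `org.apache.spark.memory.SparkOutOfMemoryError` and `org.apache.spark.util.SparkFatalException`, and `awaitResult` is a bare-bones version of `ThreadUtils.awaitResult`. It shows why wrapping the original `SparkOutOfMemoryError` (rather than constructing a fresh plain `OutOfMemoryError`) lets the caller observe the same Spark-specific error type after unwrapping, so it can be handled instead of killing the executor.

```scala
// Illustrative stand-ins for Spark's classes.
class SparkOutOfMemoryError(msg: String) extends OutOfMemoryError(msg)
class SparkFatalException(val throwable: Throwable) extends Exception(throwable)

object BroadcastSketch {
  // Simulates the body of the broadcast Future. A fatal Throwable is wrapped
  // in SparkFatalException (a plain Exception) to bypass scala/bug#9554,
  // where fatal errors escape the Future machinery's handling.
  def runBroadcastJob(): Int =
    try {
      throw new SparkOutOfMemoryError(
        "Not enough memory to build and broadcast the table")
    } catch {
      case oe: SparkOutOfMemoryError =>
        // Wrap the ORIGINAL SparkOutOfMemoryError, so the unwrapped error
        // keeps its Spark-specific type for downstream handlers.
        throw new SparkFatalException(oe)
    }

  // Simplified ThreadUtils.awaitResult: unwrap SparkFatalException and
  // re-throw the fatal throwable it carries.
  def awaitResult[T](body: => T): T =
    try body
    catch { case e: SparkFatalException => throw e.throwable }

  def main(args: Array[String]): Unit = {
    val outcome =
      try { awaitResult(runBroadcastJob()); "no error" }
      catch { case _: SparkOutOfMemoryError => "caught SparkOutOfMemoryError" }
    println(outcome)
  }
}
```

Had step 2 wrapped a fresh plain `OutOfMemoryError`, the catch in `main` (matching the Spark-specific type) would not fire, and the error would fall through to the uncaught-exception handler, which is the failure mode the PR description walks through.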