spark git commit: [SPARK-25081][CORE] Nested spill in ShuffleExternalSorter should not access released memory page (branch-2.2)
Repository: spark Updated Branches: refs/heads/branch-2.2 051ea3a62 -> 1e73ee248 [SPARK-25081][CORE] Nested spill in ShuffleExternalSorter should not access released memory page (branch-2.2) ## What changes were proposed in this pull request? Backport https://github.com/apache/spark/pull/22062 to branch-2.2. Just two minor differences in the test: - branch-2.2 doesn't have `SparkOutOfMemoryError`. It's using `OutOfMemoryError` directly. - MockitoSugar is in a different package in old scalatest. ## How was this patch tested? Jenkins Closes #22072 from zsxwing/SPARK-25081-2.2. Authored-by: Shixiong Zhu Signed-off-by: Xiao Li Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1e73ee24 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1e73ee24 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1e73ee24 Branch: refs/heads/branch-2.2 Commit: 1e73ee2482c89b819d457f564bd2847102d789fe Parents: 051ea3a Author: Shixiong Zhu Authored: Sat Aug 11 22:54:07 2018 -0700 Committer: Xiao Li Committed: Sat Aug 11 22:54:07 2018 -0700 -- .../shuffle/sort/ShuffleInMemorySorter.java | 12 +- .../sort/ShuffleExternalSorterSuite.scala | 111 +++ 2 files changed, 121 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/1e73ee24/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java -- diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java index dc36809..ad948e1 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java @@ -66,7 +66,7 @@ final class ShuffleInMemorySorter { */ private int usableCapacity = 0; - private int initialSize; + private final int initialSize; ShuffleInMemorySorter(MemoryConsumer consumer, int initialSize, boolean useRadixSort) { this.consumer = consumer; @@ -95,12 +95,20 @@ final class ShuffleInMemorySorter { } public void reset() { +// Reset `pos` here so that `spill` triggered by the below `allocateArray` will be no-op. +pos = 0; if (consumer != null) { consumer.freeArray(array); + // As `array` has been released, we should set it to `null` to avoid accessing it before + // `allocateArray` returns. `usableCapacity` is also set to `0` to avoid any codes writing + // data to `ShuffleInMemorySorter` when `array` is `null` (e.g., in + // ShuffleExternalSorter.growPointerArrayIfNecessary, we may try to access + // `ShuffleInMemorySorter` when `allocateArray` throws OutOfMemoryError). + array = null; + usableCapacity = 0; array = consumer.allocateArray(initialSize); usableCapacity = getUsableCapacity(); } -pos = 0; } public void expandPointerArray(LongArray newArray) { http://git-wip-us.apache.org/repos/asf/spark/blob/1e73ee24/core/src/test/scala/org/apache/spark/shuffle/sort/ShuffleExternalSorterSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/ShuffleExternalSorterSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/ShuffleExternalSorterSuite.scala new file mode 100644 index 000..0cf0d7e --- /dev/null +++ b/core/src/test/scala/org/apache/spark/shuffle/sort/ShuffleExternalSorterSuite.scala @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.shuffle.sort + +import java.lang.{Long => JLong} + +import org.mockito.Mockito.when +import org.scalatest.mock.MockitoSugar + +import org.apache.spark._ +import org.apache.spark.executor.{ShuffleWriteMetrics, TaskMetrics} +import org.apache.spark.memory._ +import org.apache.spark.unsafe.Platform + +class ShuffleExternalSorterSuite extends
spark git commit: [SPARK-25081][CORE] Nested spill in ShuffleExternalSorter should not access released memory page
Repository: spark Updated Branches: refs/heads/branch-2.3 7306ac71d -> 04c652064 [SPARK-25081][CORE] Nested spill in ShuffleExternalSorter should not access released memory page ## What changes were proposed in this pull request? This issue is pretty similar to [SPARK-21907](https://issues.apache.org/jira/browse/SPARK-21907). "allocateArray" in [ShuffleInMemorySorter.reset](https://github.com/apache/spark/blob/9b8521e53e56a53b44c02366a99f8a8ee1307bbf/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java#L99) may trigger a spill and cause ShuffleInMemorySorter access the released `array`. Another task may get the same memory page from the pool. This will cause two tasks access the same memory page. When a task reads memory written by another task, many types of failures may happen. Here are some examples I have seen: - JVM crash. (This is easy to reproduce in a unit test as we fill newly allocated and deallocated memory with 0xa5 and 0x5a bytes which usually points to an invalid memory address) - java.lang.IllegalArgumentException: Comparison method violates its general contract! - java.lang.NullPointerException at org.apache.spark.memory.TaskMemoryManager.getPage(TaskMemoryManager.java:384) - java.lang.UnsupportedOperationException: Cannot grow BufferHolder by size -536870912 because the size after growing exceeds size limitation 2147483632 This PR resets states in `ShuffleInMemorySorter.reset` before calling `allocateArray` to fix the issue. ## How was this patch tested? The new unit test will make JVM crash without the fix. Closes #22062 from zsxwing/SPARK-25081. Authored-by: Shixiong Zhu Signed-off-by: Shixiong Zhu (cherry picked from commit f5aba657396bd4e2e03dd06491a2d169a99592a7) Signed-off-by: Shixiong Zhu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/04c65206 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/04c65206 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/04c65206 Branch: refs/heads/branch-2.3 Commit: 04c652064861720d991675b7f5b53f2bbca9d14d Parents: 7306ac7 Author: Shixiong Zhu Authored: Fri Aug 10 10:53:44 2018 -0700 Committer: Shixiong Zhu Committed: Fri Aug 10 10:54:03 2018 -0700 -- .../shuffle/sort/ShuffleInMemorySorter.java | 12 +- .../sort/ShuffleExternalSorterSuite.scala | 111 +++ 2 files changed, 121 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/04c65206/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java -- diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java index dc36809..0d06912 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java @@ -66,7 +66,7 @@ final class ShuffleInMemorySorter { */ private int usableCapacity = 0; - private int initialSize; + private final int initialSize; ShuffleInMemorySorter(MemoryConsumer consumer, int initialSize, boolean useRadixSort) { this.consumer = consumer; @@ -95,12 +95,20 @@ final class ShuffleInMemorySorter { } public void reset() { +// Reset `pos` here so that `spill` triggered by the below `allocateArray` will be no-op. +pos = 0; if (consumer != null) { consumer.freeArray(array); + // As `array` has been released, we should set it to `null` to avoid accessing it before + // `allocateArray` returns. `usableCapacity` is also set to `0` to avoid any codes writing + // data to `ShuffleInMemorySorter` when `array` is `null` (e.g., in + // ShuffleExternalSorter.growPointerArrayIfNecessary, we may try to access + // `ShuffleInMemorySorter` when `allocateArray` throws SparkOutOfMemoryError). + array = null; + usableCapacity = 0; array = consumer.allocateArray(initialSize); usableCapacity = getUsableCapacity(); } -pos = 0; } public void expandPointerArray(LongArray newArray) { http://git-wip-us.apache.org/repos/asf/spark/blob/04c65206/core/src/test/scala/org/apache/spark/shuffle/sort/ShuffleExternalSorterSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/ShuffleExternalSorterSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/ShuffleExternalSorterSuite.scala new file mode 100644 index 000..b9f0e87 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/shuffle/sort/ShuffleExternalSorterSuite.scala @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache
spark git commit: [SPARK-25081][CORE] Nested spill in ShuffleExternalSorter should not access released memory page
Repository: spark Updated Branches: refs/heads/master 91cdab51c -> f5aba6573 [SPARK-25081][CORE] Nested spill in ShuffleExternalSorter should not access released memory page ## What changes were proposed in this pull request? This issue is pretty similar to [SPARK-21907](https://issues.apache.org/jira/browse/SPARK-21907). "allocateArray" in [ShuffleInMemorySorter.reset](https://github.com/apache/spark/blob/9b8521e53e56a53b44c02366a99f8a8ee1307bbf/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java#L99) may trigger a spill and cause ShuffleInMemorySorter access the released `array`. Another task may get the same memory page from the pool. This will cause two tasks access the same memory page. When a task reads memory written by another task, many types of failures may happen. Here are some examples I have seen: - JVM crash. (This is easy to reproduce in a unit test as we fill newly allocated and deallocated memory with 0xa5 and 0x5a bytes which usually points to an invalid memory address) - java.lang.IllegalArgumentException: Comparison method violates its general contract! - java.lang.NullPointerException at org.apache.spark.memory.TaskMemoryManager.getPage(TaskMemoryManager.java:384) - java.lang.UnsupportedOperationException: Cannot grow BufferHolder by size -536870912 because the size after growing exceeds size limitation 2147483632 This PR resets states in `ShuffleInMemorySorter.reset` before calling `allocateArray` to fix the issue. ## How was this patch tested? The new unit test will make JVM crash without the fix. Closes #22062 from zsxwing/SPARK-25081. Authored-by: Shixiong Zhu Signed-off-by: Shixiong Zhu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f5aba657 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f5aba657 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f5aba657 Branch: refs/heads/master Commit: f5aba657396bd4e2e03dd06491a2d169a99592a7 Parents: 91cdab5 Author: Shixiong Zhu Authored: Fri Aug 10 10:53:44 2018 -0700 Committer: Shixiong Zhu Committed: Fri Aug 10 10:53:44 2018 -0700 -- .../shuffle/sort/ShuffleInMemorySorter.java | 12 +- .../sort/ShuffleExternalSorterSuite.scala | 111 +++ 2 files changed, 121 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f5aba657/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java -- diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java index 8f49859..4b48599 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java @@ -65,7 +65,7 @@ final class ShuffleInMemorySorter { */ private int usableCapacity = 0; - private int initialSize; + private final int initialSize; ShuffleInMemorySorter(MemoryConsumer consumer, int initialSize, boolean useRadixSort) { this.consumer = consumer; @@ -94,12 +94,20 @@ final class ShuffleInMemorySorter { } public void reset() { +// Reset `pos` here so that `spill` triggered by the below `allocateArray` will be no-op. +pos = 0; if (consumer != null) { consumer.freeArray(array); + // As `array` has been released, we should set it to `null` to avoid accessing it before + // `allocateArray` returns. `usableCapacity` is also set to `0` to avoid any codes writing + // data to `ShuffleInMemorySorter` when `array` is `null` (e.g., in + // ShuffleExternalSorter.growPointerArrayIfNecessary, we may try to access + // `ShuffleInMemorySorter` when `allocateArray` throws SparkOutOfMemoryError). + array = null; + usableCapacity = 0; array = consumer.allocateArray(initialSize); usableCapacity = getUsableCapacity(); } -pos = 0; } public void expandPointerArray(LongArray newArray) { http://git-wip-us.apache.org/repos/asf/spark/blob/f5aba657/core/src/test/scala/org/apache/spark/shuffle/sort/ShuffleExternalSorterSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/ShuffleExternalSorterSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/ShuffleExternalSorterSuite.scala new file mode 100644 index 000..b9f0e87 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/shuffle/sort/ShuffleExternalSorterSuite.scala @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file