spark git commit: [SPARK-25081][CORE] Nested spill in ShuffleExternalSorter should not access released memory page (branch-2.2)

2018-08-11 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 051ea3a62 -> 1e73ee248


[SPARK-25081][CORE] Nested spill in ShuffleExternalSorter should not access 
released memory page (branch-2.2)

## What changes were proposed in this pull request?

Backport https://github.com/apache/spark/pull/22062 to branch-2.2. Just two 
minor differences in the test:

- branch-2.2 doesn't have `SparkOutOfMemoryError`. It's using 
`OutOfMemoryError` directly.
- MockitoSugar is in a different package in old scalatest.

## How was this patch tested?

Jenkins

Closes #22072 from zsxwing/SPARK-25081-2.2.

Authored-by: Shixiong Zhu 
Signed-off-by: Xiao Li 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1e73ee24
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1e73ee24
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1e73ee24

Branch: refs/heads/branch-2.2
Commit: 1e73ee2482c89b819d457f564bd2847102d789fe
Parents: 051ea3a
Author: Shixiong Zhu 
Authored: Sat Aug 11 22:54:07 2018 -0700
Committer: Xiao Li 
Committed: Sat Aug 11 22:54:07 2018 -0700

--
 .../shuffle/sort/ShuffleInMemorySorter.java |  12 +-
 .../sort/ShuffleExternalSorterSuite.scala   | 111 +++
 2 files changed, 121 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/1e73ee24/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java
--
diff --git 
a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java 
b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java
index dc36809..ad948e1 100644
--- 
a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java
+++ 
b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java
@@ -66,7 +66,7 @@ final class ShuffleInMemorySorter {
*/
   private int usableCapacity = 0;
 
-  private int initialSize;
+  private final int initialSize;
 
   ShuffleInMemorySorter(MemoryConsumer consumer, int initialSize, boolean useRadixSort) {
     this.consumer = consumer;
@@ -95,12 +95,20 @@ final class ShuffleInMemorySorter {
   }
 
   public void reset() {
+    // Reset `pos` here so that `spill` triggered by the below `allocateArray` will be no-op.
+    pos = 0;
     if (consumer != null) {
       consumer.freeArray(array);
+      // As `array` has been released, we should set it to  `null` to avoid accessing it before
+      // `allocateArray` returns. `usableCapacity` is also set to `0` to avoid any codes writing
+      // data to `ShuffleInMemorySorter` when `array` is `null` (e.g., in
+      // ShuffleExternalSorter.growPointerArrayIfNecessary, we may try to access
+      // `ShuffleInMemorySorter` when `allocateArray` throws OutOfMemoryError).
+      array = null;
+      usableCapacity = 0;
       array = consumer.allocateArray(initialSize);
       usableCapacity = getUsableCapacity();
     }
-    pos = 0;
   }
 
   public void expandPointerArray(LongArray newArray) {

http://git-wip-us.apache.org/repos/asf/spark/blob/1e73ee24/core/src/test/scala/org/apache/spark/shuffle/sort/ShuffleExternalSorterSuite.scala
--
diff --git 
a/core/src/test/scala/org/apache/spark/shuffle/sort/ShuffleExternalSorterSuite.scala
 
b/core/src/test/scala/org/apache/spark/shuffle/sort/ShuffleExternalSorterSuite.scala
new file mode 100644
index 000..0cf0d7e
--- /dev/null
+++ 
b/core/src/test/scala/org/apache/spark/shuffle/sort/ShuffleExternalSorterSuite.scala
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.sort
+
+import java.lang.{Long => JLong}
+
+import org.mockito.Mockito.when
+import org.scalatest.mock.MockitoSugar
+
+import org.apache.spark._
+import org.apache.spark.executor.{ShuffleWriteMetrics, TaskMetrics}
+import org.apache.spark.memory._
+import org.apache.spark.unsafe.Platform
+
+class ShuffleExternalSorterSuite extends 

spark git commit: [SPARK-25081][CORE] Nested spill in ShuffleExternalSorter should not access released memory page

2018-08-10 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-2.3 7306ac71d -> 04c652064


[SPARK-25081][CORE] Nested spill in ShuffleExternalSorter should not access 
released memory page

## What changes were proposed in this pull request?

This issue is pretty similar to 
[SPARK-21907](https://issues.apache.org/jira/browse/SPARK-21907).

"allocateArray" in 
[ShuffleInMemorySorter.reset](https://github.com/apache/spark/blob/9b8521e53e56a53b44c02366a99f8a8ee1307bbf/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java#L99)
may trigger a spill and cause ShuffleInMemorySorter to access the released 
`array`. Another task may get the same memory page from the pool, so two 
tasks can end up accessing the same memory page. When a task reads memory 
written by another task, many types of failures may happen. Here are some 
examples I have seen:

- JVM crash. (This is easy to reproduce in a unit test, because we fill newly 
allocated and deallocated memory with 0xa5 and 0x5a bytes, so a stale pointer 
read from that memory usually points to an invalid memory address.)
- java.lang.IllegalArgumentException: Comparison method violates its general 
contract!
- java.lang.NullPointerException at 
org.apache.spark.memory.TaskMemoryManager.getPage(TaskMemoryManager.java:384)
- java.lang.UnsupportedOperationException: Cannot grow BufferHolder by size 
-536870912 because the size after growing exceeds size limitation 2147483632

This PR resets the sorter's state in `ShuffleInMemorySorter.reset` before 
calling `allocateArray` to fix the issue.
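
The ordering can be illustrated with a minimal, self-contained sketch. It is 
not Spark's code: `ToySorter`, its `allocator` callback, and `spilledCounts` 
are hypothetical names made up for this illustration, and the allocator simply 
calls `spill()` on the same sorter to imitate an allocation that triggers a 
nested spill. Because `reset()` clears `pos` and drops `array` before 
allocating, the nested spill finds nothing to write and never touches the 
released array.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntFunction;

public class NestedSpillSketch {

  /** Toy stand-in for a pointer-array sorter; hypothetical, not Spark's class. */
  static final class ToySorter {
    private long[] array;
    private int pos = 0;
    private final int initialSize;
    private final IntFunction<long[]> allocator;
    final List<Integer> spilledCounts = new ArrayList<>();

    ToySorter(int initialSize, IntFunction<long[]> allocator) {
      this.initialSize = initialSize;
      this.allocator = allocator;
      this.array = new long[initialSize];
    }

    void insert(long value) {
      array[pos++] = value;
    }

    /** Records how many entries were written out; a no-op when nothing is buffered. */
    void spill() {
      if (pos == 0) {
        return;
      }
      long checksum = 0;
      for (int i = 0; i < pos; i++) {
        checksum += array[i]; // would fail if `array` had already been released
      }
      spilledCounts.add(pos);
    }

    void reset() {
      // Clear the bookkeeping first, so a spill triggered by the allocation
      // below sees an empty sorter.
      pos = 0;
      array = null;
      array = allocator.apply(initialSize); // may call spill() re-entrantly
    }
  }

  /** Runs the scenario and returns the entry counts seen by each real spill. */
  static List<Integer> run() {
    ToySorter[] holder = new ToySorter[1];
    // Assumed behaviour: the allocator is under memory pressure and asks the
    // same sorter to spill before handing out a new array.
    ToySorter sorter = new ToySorter(4, size -> {
      if (holder[0] != null) {
        holder[0].spill();
      }
      return new long[size];
    });
    holder[0] = sorter;

    sorter.insert(1L);
    sorter.insert(2L);
    sorter.spill(); // regular spill of two entries
    sorter.reset(); // nested spill inside the allocator is a no-op
    return sorter.spilledCounts;
  }

  public static void main(String[] args) {
    System.out.println(run()); // prints [2]
  }
}
```

If `pos = 0` were moved back to the end of `reset()`, the nested `spill()` in 
this sketch would still see two buffered entries and try to read them from an 
array that is no longer valid.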

## How was this patch tested?

Without the fix, the new unit test crashes the JVM.

Closes #22062 from zsxwing/SPARK-25081.

Authored-by: Shixiong Zhu 
Signed-off-by: Shixiong Zhu 
(cherry picked from commit f5aba657396bd4e2e03dd06491a2d169a99592a7)
Signed-off-by: Shixiong Zhu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/04c65206
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/04c65206
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/04c65206

Branch: refs/heads/branch-2.3
Commit: 04c652064861720d991675b7f5b53f2bbca9d14d
Parents: 7306ac7
Author: Shixiong Zhu 
Authored: Fri Aug 10 10:53:44 2018 -0700
Committer: Shixiong Zhu 
Committed: Fri Aug 10 10:54:03 2018 -0700

--
 .../shuffle/sort/ShuffleInMemorySorter.java |  12 +-
 .../sort/ShuffleExternalSorterSuite.scala   | 111 +++
 2 files changed, 121 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/04c65206/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java
--
diff --git 
a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java 
b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java
index dc36809..0d06912 100644
--- 
a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java
+++ 
b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java
@@ -66,7 +66,7 @@ final class ShuffleInMemorySorter {
*/
   private int usableCapacity = 0;
 
-  private int initialSize;
+  private final int initialSize;
 
   ShuffleInMemorySorter(MemoryConsumer consumer, int initialSize, boolean useRadixSort) {
     this.consumer = consumer;
@@ -95,12 +95,20 @@ final class ShuffleInMemorySorter {
   }
 
   public void reset() {
+    // Reset `pos` here so that `spill` triggered by the below `allocateArray` will be no-op.
+    pos = 0;
     if (consumer != null) {
       consumer.freeArray(array);
+      // As `array` has been released, we should set it to  `null` to avoid accessing it before
+      // `allocateArray` returns. `usableCapacity` is also set to `0` to avoid any codes writing
+      // data to `ShuffleInMemorySorter` when `array` is `null` (e.g., in
+      // ShuffleExternalSorter.growPointerArrayIfNecessary, we may try to access
+      // `ShuffleInMemorySorter` when `allocateArray` throws SparkOutOfMemoryError).
+      array = null;
+      usableCapacity = 0;
       array = consumer.allocateArray(initialSize);
       usableCapacity = getUsableCapacity();
     }
-    pos = 0;
   }
 
   public void expandPointerArray(LongArray newArray) {

http://git-wip-us.apache.org/repos/asf/spark/blob/04c65206/core/src/test/scala/org/apache/spark/shuffle/sort/ShuffleExternalSorterSuite.scala
--
diff --git 
a/core/src/test/scala/org/apache/spark/shuffle/sort/ShuffleExternalSorterSuite.scala
 
b/core/src/test/scala/org/apache/spark/shuffle/sort/ShuffleExternalSorterSuite.scala
new file mode 100644
index 000..b9f0e87
--- /dev/null
+++ 
b/core/src/test/scala/org/apache/spark/shuffle/sort/ShuffleExternalSorterSuite.scala
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache 

spark git commit: [SPARK-25081][CORE] Nested spill in ShuffleExternalSorter should not access released memory page

2018-08-10 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master 91cdab51c -> f5aba6573


[SPARK-25081][CORE] Nested spill in ShuffleExternalSorter should not access 
released memory page

## What changes were proposed in this pull request?

This issue is pretty similar to 
[SPARK-21907](https://issues.apache.org/jira/browse/SPARK-21907).

"allocateArray" in 
[ShuffleInMemorySorter.reset](https://github.com/apache/spark/blob/9b8521e53e56a53b44c02366a99f8a8ee1307bbf/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java#L99)
may trigger a spill and cause ShuffleInMemorySorter to access the released 
`array`. Another task may get the same memory page from the pool, so two 
tasks can end up accessing the same memory page. When a task reads memory 
written by another task, many types of failures may happen. Here are some 
examples I have seen:

- JVM crash. (This is easy to reproduce in a unit test, because we fill newly 
allocated and deallocated memory with 0xa5 and 0x5a bytes, so a stale pointer 
read from that memory usually points to an invalid memory address.)
- java.lang.IllegalArgumentException: Comparison method violates its general 
contract!
- java.lang.NullPointerException at 
org.apache.spark.memory.TaskMemoryManager.getPage(TaskMemoryManager.java:384)
- java.lang.UnsupportedOperationException: Cannot grow BufferHolder by size 
-536870912 because the size after growing exceeds size limitation 2147483632
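
To see why debug-filled memory tends to fail loudly, the following sketch 
reads eight bytes of a "freed" buffer as if they were a pointer. It only 
imitates the fill pattern described above with a plain `byte[]`; the class 
name and the `FREED_FILL` constant are hypothetical and do not refer to 
Spark's allocator.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

public class PoisonFillSketch {

  // Assumed fill byte for released memory, taken from the description above.
  static final byte FREED_FILL = (byte) 0x5a;

  /** Fills a toy page with the freed pattern and reads its first 8 bytes as a long. */
  static long readStalePointer() {
    byte[] page = new byte[16];
    Arrays.fill(page, FREED_FILL);
    return ByteBuffer.wrap(page).getLong(0);
  }

  public static void main(String[] args) {
    // Every byte is 0x5a, so the value is the same in either byte order.
    System.out.println(Long.toHexString(readStalePointer())); // prints 5a5a5a5a5a5a5a5a
  }
}
```

A value such as `0x5a5a5a5a5a5a5a5a` is very unlikely to be a valid address or 
a sensible record length, which is consistent with the crashes and size errors 
listed above.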

This PR resets the sorter's state in `ShuffleInMemorySorter.reset` before 
calling `allocateArray` to fix the issue.

## How was this patch tested?

Without the fix, the new unit test crashes the JVM.

Closes #22062 from zsxwing/SPARK-25081.

Authored-by: Shixiong Zhu 
Signed-off-by: Shixiong Zhu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f5aba657
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f5aba657
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f5aba657

Branch: refs/heads/master
Commit: f5aba657396bd4e2e03dd06491a2d169a99592a7
Parents: 91cdab5
Author: Shixiong Zhu 
Authored: Fri Aug 10 10:53:44 2018 -0700
Committer: Shixiong Zhu 
Committed: Fri Aug 10 10:53:44 2018 -0700

--
 .../shuffle/sort/ShuffleInMemorySorter.java |  12 +-
 .../sort/ShuffleExternalSorterSuite.scala   | 111 +++
 2 files changed, 121 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/f5aba657/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java
--
diff --git 
a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java 
b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java
index 8f49859..4b48599 100644
--- 
a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java
+++ 
b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java
@@ -65,7 +65,7 @@ final class ShuffleInMemorySorter {
*/
   private int usableCapacity = 0;
 
-  private int initialSize;
+  private final int initialSize;
 
   ShuffleInMemorySorter(MemoryConsumer consumer, int initialSize, boolean useRadixSort) {
     this.consumer = consumer;
@@ -94,12 +94,20 @@ final class ShuffleInMemorySorter {
   }
 
   public void reset() {
+    // Reset `pos` here so that `spill` triggered by the below `allocateArray` will be no-op.
+    pos = 0;
     if (consumer != null) {
       consumer.freeArray(array);
+      // As `array` has been released, we should set it to  `null` to avoid accessing it before
+      // `allocateArray` returns. `usableCapacity` is also set to `0` to avoid any codes writing
+      // data to `ShuffleInMemorySorter` when `array` is `null` (e.g., in
+      // ShuffleExternalSorter.growPointerArrayIfNecessary, we may try to access
+      // `ShuffleInMemorySorter` when `allocateArray` throws SparkOutOfMemoryError).
+      array = null;
+      usableCapacity = 0;
       array = consumer.allocateArray(initialSize);
       usableCapacity = getUsableCapacity();
     }
-    pos = 0;
   }
 
   public void expandPointerArray(LongArray newArray) {

http://git-wip-us.apache.org/repos/asf/spark/blob/f5aba657/core/src/test/scala/org/apache/spark/shuffle/sort/ShuffleExternalSorterSuite.scala
--
diff --git 
a/core/src/test/scala/org/apache/spark/shuffle/sort/ShuffleExternalSorterSuite.scala
 
b/core/src/test/scala/org/apache/spark/shuffle/sort/ShuffleExternalSorterSuite.scala
new file mode 100644
index 000..b9f0e87
--- /dev/null
+++ 
b/core/src/test/scala/org/apache/spark/shuffle/sort/ShuffleExternalSorterSuite.scala
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file