[GitHub] spark pull request #22062: [SPARK-25081][Core]Nested spill in ShuffleExterna...
Github user hvanhovell commented on a diff in the pull request:

  https://github.com/apache/spark/pull/22062#discussion_r209372979

  --- Diff: core/src/test/scala/org/apache/spark/shuffle/sort/ShuffleExternalSorterSuite.scala ---
  @@ -0,0 +1,111 @@
  +package org.apache.spark.shuffle.sort
  +
  +import java.lang.{Long => JLong}
  +
  +import org.mockito.Mockito.when
  +import org.scalatest.mockito.MockitoSugar
  +
  +import org.apache.spark._
  +import org.apache.spark.executor.{ShuffleWriteMetrics, TaskMetrics}
  +import org.apache.spark.memory._
  +import org.apache.spark.unsafe.Platform
  +
  +class ShuffleExternalSorterSuite extends SparkFunSuite with LocalSparkContext with MockitoSugar {
  +
  +  test("nested spill should be no-op") {
  +    val conf = new SparkConf()
  +      .setMaster("local[1]")
  +      .setAppName("ShuffleExternalSorterSuite")
  +      .set("spark.testing", "true")
  +      .set("spark.testing.memory", "1600")
  +      .set("spark.memory.fraction", "1")
  +    sc = new SparkContext(conf)
  +
  +    val memoryManager = UnifiedMemoryManager(conf, 1)
  +
  +    var shouldAllocate = false
  +
  +    // Mock `TaskMemoryManager` to allocate free memory when `shouldAllocate` is true.
  +    // This will trigger a nested spill and expose issues if we don't handle this case properly.
  +    val taskMemoryManager = new TaskMemoryManager(memoryManager, 0) {
  +      override def acquireExecutionMemory(required: Long, consumer: MemoryConsumer): Long = {
  +        // ExecutionMemoryPool.acquireMemory will wait until there are 400 bytes for a task
  +        // to use. So we leave 400 bytes for the task.
  +        if (shouldAllocate &&
  +            memoryManager.maxHeapMemory - memoryManager.executionMemoryUsed > 400) {
  +          val acquireExecutionMemoryMethod =
  +            memoryManager.getClass.getMethods.filter(_.getName == "acquireExecutionMemory").head
  +          acquireExecutionMemoryMethod.invoke(
  +            memoryManager,
  +            JLong.valueOf(
  +              memoryManager.maxHeapMemory - memoryManager.executionMemoryUsed - 400),
  +            JLong.valueOf(1L), // taskAttemptId
  +            MemoryMode.ON_HEAP
  +          ).asInstanceOf[java.lang.Long]
  +        }
  +        super.acquireExecutionMemory(required, consumer)
  +      }
  +    }
  +    val taskContext = mock[TaskContext]
  --- End diff --

  lol
[GitHub] spark pull request #22062: [SPARK-25081][Core]Nested spill in ShuffleExterna...
Github user asfgit closed the pull request at:

  https://github.com/apache/spark/pull/22062
[GitHub] spark pull request #22062: [SPARK-25081][Core]Nested spill in ShuffleExterna...
Github user zsxwing commented on a diff in the pull request:

  https://github.com/apache/spark/pull/22062#discussion_r209337943

  --- Diff: core/src/test/scala/org/apache/spark/shuffle/sort/ShuffleExternalSorterSuite.scala ---
  +    val taskContext = mock[TaskContext]
  --- End diff --

  > We can also create a `TaskContextImpl` by hand right?

  I can. Just to save several lines :)
[GitHub] spark pull request #22062: [SPARK-25081][Core]Nested spill in ShuffleExterna...
Github user zsxwing commented on a diff in the pull request:

  https://github.com/apache/spark/pull/22062#discussion_r209338026

  --- Diff: core/src/test/scala/org/apache/spark/shuffle/sort/ShuffleExternalSorterSuite.scala ---
  +    // Allocate memory so that the next "insertRecord" call triggers a spill.
  +    val bytes = new Array[Byte](1)
  +    while (inMemSorter.hasSpaceForAnotherRecord) {
  --- End diff --

  > Access to the `hasSpaceForAnotherRecord` is the only reason why we need reflection right?

  Yes.
[GitHub] spark pull request #22062: [SPARK-25081][Core]Nested spill in ShuffleExterna...
Github user zsxwing commented on a diff in the pull request:

  https://github.com/apache/spark/pull/22062#discussion_r209337484

  --- Diff: core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java ---
  @@ -94,12 +94,20 @@ public int numRecords() {
     }

     public void reset() {
  +    // Reset `pos` here so that `spill` triggered by the below `allocateArray` will be no-op.
  +    pos = 0;
  --- End diff --

  We also need to set `usableCapacity` to `0`. Otherwise,
  https://github.com/apache/spark/blob/9b8521e53e56a53b44c02366a99f8a8ee1307bbf/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java#L343
  will not rethrow SparkOutOfMemoryError; ShuffleExternalSorter will keep running and finally touch `array`. Setting `array` to `null` is just for safety, so that any incorrect use will fail fast with an NPE.
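Putting the two review comments together, the reset() ordering being proposed looks roughly like the sketch below. The field and method names (consumer, array, pos, usableCapacity, initialSize, getUsableCapacity) come from ShuffleInMemorySorter.java, but this is an illustrative reconstruction of the discussion, not necessarily the exact committed patch:

    public void reset() {
      // 1. With pos == 0, a `spill` nested inside the `allocateArray` call
      //    below sees an empty sorter and becomes a no-op.
      pos = 0;
      if (consumer != null) {
        consumer.freeArray(array);
        // 2. The freed page may already belong to another task; nulling
        //    `array` makes any stale access fail fast with an NPE instead of
        //    silently reading that task's data.
        array = null;
        // 3. With usableCapacity == 0, hasSpaceForAnotherRecord() returns
        //    false, so ShuffleExternalSorter rethrows SparkOutOfMemoryError
        //    instead of continuing to run against the released array.
        usableCapacity = 0;
        // 4. Only now re-allocate; if this triggers a nested spill,
        //    steps 1-3 guarantee it is harmless.
        array = consumer.allocateArray(initialSize);
        usableCapacity = getUsableCapacity();
      }
    }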
[GitHub] spark pull request #22062: [SPARK-25081][Core]Nested spill in ShuffleExterna...
Github user hvanhovell commented on a diff in the pull request:

  https://github.com/apache/spark/pull/22062#discussion_r209292284

  --- Diff: core/src/test/scala/org/apache/spark/shuffle/sort/ShuffleExternalSorterSuite.scala ---
  +    val taskContext = mock[TaskContext]
  --- End diff --

  Do we need mockito here? We can also create a `TaskContextImpl` by hand right?
[GitHub] spark pull request #22062: [SPARK-25081][Core]Nested spill in ShuffleExterna...
Github user hvanhovell commented on a diff in the pull request:

  https://github.com/apache/spark/pull/22062#discussion_r209291439

  --- Diff: core/src/test/scala/org/apache/spark/shuffle/sort/ShuffleExternalSorterSuite.scala ---
  +    val taskContext = mock[TaskContext]
  +    val taskMetrics = new TaskMetrics
  +    when(taskContext.taskMetrics()).thenReturn(taskMetrics)
  +    val sorter = new ShuffleExternalSorter(
  +      taskMemoryManager,
  +      sc.env.blockManager,
  +      taskContext,
  +      100, // initialSize - requires ShuffleInMemorySorter to acquire at least 800 bytes
  +      1, // numPartitions
  +      conf,
  +      new ShuffleWriteMetrics)
  +    val inMemSorter = {
  +      val field = sorter.getClass.getDeclaredField("inMemSorter")
  +      field.setAccessible(true)
  +      field.get(sorter).asInstanceOf[ShuffleInMemorySorter]
  +    }
  +    // Allocate memory so that the next "insertRecord" call triggers a spill.
  +    val bytes = new Array[Byte](1)
  +    while (inMemSorter.hasSpaceForAnotherRecord) {
  --- End diff --

  Access to the `hasSpaceForAnotherRecord` is the only reason why we need reflection right?
[GitHub] spark pull request #22062: [SPARK-25081][Core]Nested spill in ShuffleExterna...
Github user hvanhovell commented on a diff in the pull request:

  https://github.com/apache/spark/pull/22062#discussion_r209262151

  --- Diff: core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java ---
  +    // Reset `pos` here so that `spill` triggered by the below `allocateArray` will be no-op.
  +    pos = 0;
  --- End diff --

  For my understanding: this is enough to fix the actual issue here right?
[GitHub] spark pull request #22062: [SPARK-25081][Core]Nested spill in ShuffleExterna...
GitHub user zsxwing opened a pull request:

  https://github.com/apache/spark/pull/22062

  [SPARK-25081][Core] Nested spill in ShuffleExternalSorter should not access released memory page

  ## What changes were proposed in this pull request?

  This issue is pretty similar to [SPARK-21907](https://issues.apache.org/jira/browse/SPARK-21907).

  "allocateArray" in [ShuffleInMemorySorter.reset](https://github.com/apache/spark/blob/9b8521e53e56a53b44c02366a99f8a8ee1307bbf/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java#L99) may trigger a spill and cause ShuffleInMemorySorter to access the released `array`. Another task may then get the same memory page from the pool, so two tasks end up accessing the same page. When a task reads memory written by another task, many types of failures may happen. Here are some examples I have seen:

  - JVM crash. (This is easy to reproduce in a unit test, as we fill newly allocated and deallocated memory with 0xa5 and 0x5a bytes, which usually forms an invalid memory address.)
  - java.lang.IllegalArgumentException: Comparison method violates its general contract!
  - java.lang.NullPointerException at org.apache.spark.memory.TaskMemoryManager.getPage(TaskMemoryManager.java:384)
  - java.lang.UnsupportedOperationException: Cannot grow BufferHolder by size -536870912 because the size after growing exceeds size limitation 2147483632

  This PR resets the state in `ShuffleInMemorySorter.reset` before calling `allocateArray` to fix the issue.

  ## How was this patch tested?

  The new unit test will make the JVM crash without the fix.

  You can merge this pull request into a Git repository by running:

      $ git pull https://github.com/zsxwing/spark SPARK-25081

  Alternatively you can review and apply these changes as the patch at:

      https://github.com/apache/spark/pull/22062.patch

  To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

      This closes #22062

  commit 54799cae8ef0727988bbb863d326ea61b4d9ae72
  Author: Shixiong Zhu
  Date:   2018-08-10T00:02:33Z

      Nested spill in ShuffleExternalSorter should not access released memory page
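To make the hazard concrete, below is a minimal, self-contained Java sketch of the re-entrancy pattern the PR description outlines. Every name in it is a hypothetical stand-in (NestedSpillDemo, allocate, memoryTight; none of this is Spark's API); it only shows why state must be cleared before an allocation that can call back into spill():

    /**
     * Hypothetical demo: `allocate` may re-enter `spill()` while `reset()` is
     * still in flight, mirroring what TaskMemoryManager can do during
     * ShuffleInMemorySorter.reset().
     */
    class NestedSpillDemo {
      private long[] array;  // stands in for the pointer array backed by a memory page
      private int pos;       // number of records currently buffered

      // Like an allocator under memory pressure: ask the consumer to spill first.
      private long[] allocate(int size, boolean memoryTight) {
        if (memoryTight) {
          spill();  // the nested spill
        }
        return new long[size];
      }

      void spill() {
        if (pos == 0) {
          return;  // with the fix: a nested spill sees an empty sorter and is a no-op
        }
        for (int i = 0; i < pos; i++) {
          long record = array[i];  // without the fix: `array` may already be freed here
        }
        reset(false);
      }

      void reset(boolean memoryTight) {
        pos = 0;      // the fix: clear state BEFORE the allocation that may spill
        array = null;
        array = allocate(4, memoryTight);
      }

      public static void main(String[] args) {
        NestedSpillDemo demo = new NestedSpillDemo();
        demo.reset(false);  // initial allocation
        demo.pos = 3;       // pretend three records were buffered
        demo.reset(true);   // forces a nested spill; harmless because pos is cleared first
        System.out.println("nested spill was a no-op");
      }
    }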