spark git commit: [SPARK-21113][CORE] Read ahead input stream to amortize disk IO cost …
Repository: spark
Updated Branches: refs/heads/master 7c7266208 -> 1e978b17d

[SPARK-21113][CORE] Read ahead input stream to amortize disk IO cost …

Profiling some of our big jobs, we see that around 30% of the time is being spent reading the spill files from disk. In order to amortize the disk I/O cost, the idea is to implement a read-ahead input stream which asynchronously reads ahead from the underlying input stream when a specified amount of data has been read from the current buffer. It does this by maintaining two buffers: an active buffer and a read-ahead buffer. The active buffer contains data which should be returned when a read() call is issued. The read-ahead buffer is used to asynchronously read from the underlying input stream; once the active buffer is exhausted, we flip the two buffers so that we can start reading from the read-ahead buffer without being blocked on disk I/O.

## How was this patch tested?

Tested by running a job on the cluster; we saw up to an 8% CPU improvement.

Author: Sital Kedia
Author: Shixiong Zhu
Author: Sital Kedia

Closes #18317 from sitalkedia/read_ahead_buffer.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1e978b17
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1e978b17
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1e978b17
Branch: refs/heads/master
Commit: 1e978b17d63d7ba20368057aa4e65f5ef6e87369
Parents: 7c72662
Author: Sital Kedia
Authored: Sun Sep 17 23:15:08 2017 -0700
Committer: Shixiong Zhu
Committed: Sun Sep 17 23:15:08 2017 -0700
--
 .../apache/spark/io/ReadAheadInputStream.java   | 408 +++
 .../unsafe/sort/UnsafeSorterSpillReader.java    |  21 +-
 .../spark/io/GenericFileInputStreamSuite.java   | 130 ++
 .../io/NioBufferedFileInputStreamSuite.java     | 135 --
 .../spark/io/NioBufferedInputStreamSuite.java   |  33 ++
 .../spark/io/ReadAheadInputStreamSuite.java     |  33 ++
 6 files changed, 621 insertions(+), 139 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/1e978b17/core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java
--
diff --git a/core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java b/core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java
new file mode 100644
index 000..618bd42
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java
@@ -0,0 +1,408 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.io;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import org.apache.spark.util.ThreadUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InterruptedIOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * {@link InputStream} implementation which asynchronously reads ahead from the underlying input
+ * stream when a specified amount of data has been read from the current buffer. It does this by
+ * maintaining two buffers: an active buffer and a read-ahead buffer. The active buffer contains
+ * data which should be returned when a read() call is issued. The read-ahead buffer is used to
+ * asynchronously read from the underlying input stream; once the current active buffer is
+ * exhausted, we flip the two buffers so that we can start reading from the read-ahead buffer
+ * without being blocked on disk I/O.
+ */
+public class ReadAheadInputStream extends InputStream {
+
+  private static final Logger logger = LoggerFactory.getLogger(ReadAheadInputStream.class);
+
+  private ReentrantLock stateChangeLock = new ReentrantLock();
+
+  @GuardedBy("stateChangeLock")
+  private ByteBuffer activeBuffer;
+
+  @GuardedBy("stateChangeLock")
+  private ByteBuffer readAheadBuffer;
+
+  @GuardedBy("stateChangeLock")
+  private boolean endOfStream;
+
+  @GuardedBy("stateChangeLock")
+  // true if async read is in progress
+  pri
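The double-buffering scheme described above is easy to visualize outside of Spark. Below is a minimal Python sketch of the same idea, not Spark's API: a wrapper that serves read() calls from an active buffer while a background thread fills a read-ahead buffer, flipping the two when the active one drains. Names such as `ReadAheadStream` and `_flip` are illustrative only; the real implementation is the Java class in this commit, which uses an executor service and lock/condition coordination rather than one thread per block.

```python
import threading


class ReadAheadStream:
    """Illustrative double-buffered read-ahead wrapper (a sketch, not Spark's API)."""

    BLOCK = 64 * 1024  # prefetch granularity; Spark's buffer size is configurable

    def __init__(self, underlying):
        self._in = underlying
        self._active = b""   # buffer that read() is currently served from
        self._pos = 0        # cursor into the active buffer
        self._ahead = b""    # buffer filled asynchronously by the worker
        self._worker = None
        self._start_read_ahead()  # kick off the first asynchronous read

    def _start_read_ahead(self):
        # Fill the read-ahead buffer on a background thread so the consumer
        # is not blocked on disk I/O when the active buffer drains.
        def fill():
            self._ahead = self._in.read(self.BLOCK)
        self._worker = threading.Thread(target=fill)
        self._worker.start()

    def _flip(self):
        # Active buffer exhausted: wait for the in-flight read, promote the
        # read-ahead buffer to active, and immediately start the next read.
        self._worker.join()
        self._active, self._pos = self._ahead, 0
        if self._active:
            self._start_read_ahead()

    def read(self, n):
        out = bytearray()
        while n > 0:
            if self._pos == len(self._active):
                self._flip()
                if not self._active:   # underlying stream is exhausted
                    break
            chunk = self._active[self._pos:self._pos + n]
            out += chunk
            self._pos += len(chunk)
            n -= len(chunk)
        return bytes(out)


# Example: wrap a file and read it in small chunks while the next block
# is fetched in the background ("/tmp/spill.bin" is a hypothetical path).
with open("/tmp/spill.bin", "rb") as f:
    stream = ReadAheadStream(f)
    data = stream.read(1024)
```

The key property is that the join() in `_flip` only blocks when the consumer outpaces the disk; otherwise the next block is already in memory by the time the active buffer runs out.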
spark git commit: [SPARK-22043][PYTHON] Improves error message for show_profiles and dump_profiles
Repository: spark
Updated Branches: refs/heads/branch-2.2 309c401a5 -> a86831d61

[SPARK-22043][PYTHON] Improves error message for show_profiles and dump_profiles

## What changes were proposed in this pull request?

This PR proposes to improve error message from:

```
>>> sc.show_profiles()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File ".../spark/python/pyspark/context.py", line 1000, in show_profiles
    self.profiler_collector.show_profiles()
AttributeError: 'NoneType' object has no attribute 'show_profiles'
>>> sc.dump_profiles("/tmp/abc")
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File ".../spark/python/pyspark/context.py", line 1005, in dump_profiles
    self.profiler_collector.dump_profiles(path)
AttributeError: 'NoneType' object has no attribute 'dump_profiles'
```

to

```
>>> sc.show_profiles()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File ".../spark/python/pyspark/context.py", line 1003, in show_profiles
    raise RuntimeError("'spark.python.profile' configuration must be set "
RuntimeError: 'spark.python.profile' configuration must be set to 'true' to enable Python profile.
>>> sc.dump_profiles("/tmp/abc")
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File ".../spark/python/pyspark/context.py", line 1012, in dump_profiles
    raise RuntimeError("'spark.python.profile' configuration must be set "
RuntimeError: 'spark.python.profile' configuration must be set to 'true' to enable Python profile.
```

## How was this patch tested?

Unit tests added in `python/pyspark/tests.py` and manual tests.

Author: hyukjinkwon

Closes #19260 from HyukjinKwon/profile-errors.

(cherry picked from commit 7c7266208a3be984ac1ce53747dc0c3640f4ecac)
Signed-off-by: hyukjinkwon

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a86831d6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a86831d6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a86831d6
Branch: refs/heads/branch-2.2
Commit: a86831d618b05c789c2cea0afe5488c3234a14bc
Parents: 309c401
Author: hyukjinkwon
Authored: Mon Sep 18 13:20:11 2017 +0900
Committer: hyukjinkwon
Committed: Mon Sep 18 13:20:29 2017 +0900
--
 python/pyspark/context.py | 12 ++--
 python/pyspark/tests.py   | 16 
 2 files changed, 26 insertions(+), 2 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/a86831d6/python/pyspark/context.py
--
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 49be76e..ea58b3a 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -994,12 +994,20 @@ class SparkContext(object):

     def show_profiles(self):
         """ Print the profile stats to stdout """
-        self.profiler_collector.show_profiles()
+        if self.profiler_collector is not None:
+            self.profiler_collector.show_profiles()
+        else:
+            raise RuntimeError("'spark.python.profile' configuration must be set "
+                               "to 'true' to enable Python profile.")

     def dump_profiles(self, path):
         """ Dump the profile stats into directory `path` """
-        self.profiler_collector.dump_profiles(path)
+        if self.profiler_collector is not None:
+            self.profiler_collector.dump_profiles(path)
+        else:
+            raise RuntimeError("'spark.python.profile' configuration must be set "
+                               "to 'true' to enable Python profile.")

     def getConf(self):
         conf = SparkConf()

http://git-wip-us.apache.org/repos/asf/spark/blob/a86831d6/python/pyspark/tests.py
--
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 9f47798..6a96aaf 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -1288,6 +1288,22 @@ class ProfilerTests(PySparkTestCase):
         rdd.foreach(heavy_foo)


+class ProfilerTests2(unittest.TestCase):
+    def test_profiler_disabled(self):
+        sc = SparkContext(conf=SparkConf().set("spark.python.profile", "false"))
+        try:
+            self.assertRaisesRegexp(
+                RuntimeError,
+                "'spark.python.profile' configuration must be set",
+                lambda: sc.show_profiles())
+            self.assertRaisesRegexp(
+                RuntimeError,
+                "'spark.python.profile' configuration must be set",
+                lambda: sc.dump_profiles("/tmp/abc"))
+        finally:
+            sc.stop()
+
+
 class InputFormatTests(ReusedPySparkTestCase):

     @classmethod
spark git commit: [SPARK-22043][PYTHON] Improves error message for show_profiles and dump_profiles
Repository: spark
Updated Branches: refs/heads/branch-2.1 99de4b8f5 -> b35136a9e

[SPARK-22043][PYTHON] Improves error message for show_profiles and dump_profiles

## What changes were proposed in this pull request?

This PR proposes to improve error message from:

```
>>> sc.show_profiles()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File ".../spark/python/pyspark/context.py", line 1000, in show_profiles
    self.profiler_collector.show_profiles()
AttributeError: 'NoneType' object has no attribute 'show_profiles'
>>> sc.dump_profiles("/tmp/abc")
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File ".../spark/python/pyspark/context.py", line 1005, in dump_profiles
    self.profiler_collector.dump_profiles(path)
AttributeError: 'NoneType' object has no attribute 'dump_profiles'
```

to

```
>>> sc.show_profiles()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File ".../spark/python/pyspark/context.py", line 1003, in show_profiles
    raise RuntimeError("'spark.python.profile' configuration must be set "
RuntimeError: 'spark.python.profile' configuration must be set to 'true' to enable Python profile.
>>> sc.dump_profiles("/tmp/abc")
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File ".../spark/python/pyspark/context.py", line 1012, in dump_profiles
    raise RuntimeError("'spark.python.profile' configuration must be set "
RuntimeError: 'spark.python.profile' configuration must be set to 'true' to enable Python profile.
```

## How was this patch tested?

Unit tests added in `python/pyspark/tests.py` and manual tests.

Author: hyukjinkwon

Closes #19260 from HyukjinKwon/profile-errors.

(cherry picked from commit 7c7266208a3be984ac1ce53747dc0c3640f4ecac)
Signed-off-by: hyukjinkwon

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b35136a9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b35136a9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b35136a9
Branch: refs/heads/branch-2.1
Commit: b35136a9e4b72b403434c991e111e667cfe9177d
Parents: 99de4b8
Author: hyukjinkwon
Authored: Mon Sep 18 13:20:11 2017 +0900
Committer: hyukjinkwon
Committed: Mon Sep 18 13:20:48 2017 +0900
--
 python/pyspark/context.py | 12 ++--
 python/pyspark/tests.py   | 16 
 2 files changed, 26 insertions(+), 2 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/b35136a9/python/pyspark/context.py
--
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 5a4c2fa..c091882 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -970,12 +970,20 @@ class SparkContext(object):

     def show_profiles(self):
         """ Print the profile stats to stdout """
-        self.profiler_collector.show_profiles()
+        if self.profiler_collector is not None:
+            self.profiler_collector.show_profiles()
+        else:
+            raise RuntimeError("'spark.python.profile' configuration must be set "
+                               "to 'true' to enable Python profile.")

     def dump_profiles(self, path):
         """ Dump the profile stats into directory `path` """
-        self.profiler_collector.dump_profiles(path)
+        if self.profiler_collector is not None:
+            self.profiler_collector.dump_profiles(path)
+        else:
+            raise RuntimeError("'spark.python.profile' configuration must be set "
+                               "to 'true' to enable Python profile.")

     def getConf(self):
         conf = SparkConf()

http://git-wip-us.apache.org/repos/asf/spark/blob/b35136a9/python/pyspark/tests.py
--
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index bd21029..61272fe 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -1223,6 +1223,22 @@ class ProfilerTests(PySparkTestCase):
         rdd.foreach(heavy_foo)


+class ProfilerTests2(unittest.TestCase):
+    def test_profiler_disabled(self):
+        sc = SparkContext(conf=SparkConf().set("spark.python.profile", "false"))
+        try:
+            self.assertRaisesRegexp(
+                RuntimeError,
+                "'spark.python.profile' configuration must be set",
+                lambda: sc.show_profiles())
+            self.assertRaisesRegexp(
+                RuntimeError,
+                "'spark.python.profile' configuration must be set",
+                lambda: sc.dump_profiles("/tmp/abc"))
+        finally:
+            sc.stop()
+
+
 class InputFormatTests(ReusedPySparkTestCase):

     @classmethod
spark git commit: [SPARK-22043][PYTHON] Improves error message for show_profiles and dump_profiles
Repository: spark
Updated Branches: refs/heads/master 6308c65f0 -> 7c7266208

[SPARK-22043][PYTHON] Improves error message for show_profiles and dump_profiles

## What changes were proposed in this pull request?

This PR proposes to improve error message from:

```
>>> sc.show_profiles()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File ".../spark/python/pyspark/context.py", line 1000, in show_profiles
    self.profiler_collector.show_profiles()
AttributeError: 'NoneType' object has no attribute 'show_profiles'
>>> sc.dump_profiles("/tmp/abc")
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File ".../spark/python/pyspark/context.py", line 1005, in dump_profiles
    self.profiler_collector.dump_profiles(path)
AttributeError: 'NoneType' object has no attribute 'dump_profiles'
```

to

```
>>> sc.show_profiles()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File ".../spark/python/pyspark/context.py", line 1003, in show_profiles
    raise RuntimeError("'spark.python.profile' configuration must be set "
RuntimeError: 'spark.python.profile' configuration must be set to 'true' to enable Python profile.
>>> sc.dump_profiles("/tmp/abc")
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File ".../spark/python/pyspark/context.py", line 1012, in dump_profiles
    raise RuntimeError("'spark.python.profile' configuration must be set "
RuntimeError: 'spark.python.profile' configuration must be set to 'true' to enable Python profile.
```

## How was this patch tested?

Unit tests added in `python/pyspark/tests.py` and manual tests.

Author: hyukjinkwon

Closes #19260 from HyukjinKwon/profile-errors.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7c726620
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7c726620
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7c726620
Branch: refs/heads/master
Commit: 7c7266208a3be984ac1ce53747dc0c3640f4ecac
Parents: 6308c65
Author: hyukjinkwon
Authored: Mon Sep 18 13:20:11 2017 +0900
Committer: hyukjinkwon
Committed: Mon Sep 18 13:20:11 2017 +0900
--
 python/pyspark/context.py | 12 ++--
 python/pyspark/tests.py   | 16 
 2 files changed, 26 insertions(+), 2 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/7c726620/python/pyspark/context.py
--
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index a704604..a33f6dc 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -997,12 +997,20 @@ class SparkContext(object):

     def show_profiles(self):
         """ Print the profile stats to stdout """
-        self.profiler_collector.show_profiles()
+        if self.profiler_collector is not None:
+            self.profiler_collector.show_profiles()
+        else:
+            raise RuntimeError("'spark.python.profile' configuration must be set "
+                               "to 'true' to enable Python profile.")

     def dump_profiles(self, path):
         """ Dump the profile stats into directory `path` """
-        self.profiler_collector.dump_profiles(path)
+        if self.profiler_collector is not None:
+            self.profiler_collector.dump_profiles(path)
+        else:
+            raise RuntimeError("'spark.python.profile' configuration must be set "
+                               "to 'true' to enable Python profile.")

     def getConf(self):
         conf = SparkConf()

http://git-wip-us.apache.org/repos/asf/spark/blob/7c726620/python/pyspark/tests.py
--
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 3c108ec..da99872 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -1296,6 +1296,22 @@ class ProfilerTests(PySparkTestCase):
         rdd.foreach(heavy_foo)


+class ProfilerTests2(unittest.TestCase):
+    def test_profiler_disabled(self):
+        sc = SparkContext(conf=SparkConf().set("spark.python.profile", "false"))
+        try:
+            self.assertRaisesRegexp(
+                RuntimeError,
+                "'spark.python.profile' configuration must be set",
+                lambda: sc.show_profiles())
+            self.assertRaisesRegexp(
+                RuntimeError,
+                "'spark.python.profile' configuration must be set",
+                lambda: sc.dump_profiles("/tmp/abc"))
+        finally:
+            sc.stop()
+
+
 class InputFormatTests(ReusedPySparkTestCase):

     @classmethod
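For context on all three of these commits: the profiler collector is only instantiated when profiling is enabled, which is why `self.profiler_collector` is `None` by default and the old code died with an `AttributeError`. A minimal session that exercises the profiler APIs without hitting the new `RuntimeError` (the workload and dump path are illustrative):

```python
from pyspark import SparkConf, SparkContext

# The profiler collector only exists when this setting is true; otherwise
# show_profiles()/dump_profiles() now raise the RuntimeError shown above.
conf = SparkConf().set("spark.python.profile", "true")
sc = SparkContext(conf=conf)

# Run some Python work so there are cProfile stats to collect.
sc.parallelize(range(1000)).map(lambda x: x * 2).count()

sc.show_profiles()                  # print per-RDD profile stats to stdout
sc.dump_profiles("/tmp/profiles")   # or dump them into a directory
sc.stop()
```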
spark git commit: [SPARK-21953] Show both memory and disk bytes spilled if either is present
Repository: spark
Updated Branches: refs/heads/branch-2.1 3ae7ab8e8 -> 99de4b8f5

[SPARK-21953] Show both memory and disk bytes spilled if either is present

As written now, there must be both memory and disk bytes spilled to show either of them. If there is only one of those types of spill recorded, it will be hidden.

Author: Andrew Ash

Closes #19164 from ash211/patch-3.

(cherry picked from commit 6308c65f08b507408033da1f1658144ea8c1491f)
Signed-off-by: Wenchen Fan

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/99de4b8f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/99de4b8f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/99de4b8f
Branch: refs/heads/branch-2.1
Commit: 99de4b8f55ea2f700ad4ac32620217fa43a2cbdb
Parents: 3ae7ab8
Author: Andrew Ash
Authored: Mon Sep 18 10:42:24 2017 +0800
Committer: Wenchen Fan
Committed: Mon Sep 18 10:43:21 2017 +0800
--
 core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/99de4b8f/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
--
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
index 78113ac..cd7b396 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
@@ -104,7 +104,7 @@ private[spark] object UIData {
     def hasOutput: Boolean = outputBytes > 0
     def hasShuffleRead: Boolean = shuffleReadTotalBytes > 0
     def hasShuffleWrite: Boolean = shuffleWriteBytes > 0
-    def hasBytesSpilled: Boolean = memoryBytesSpilled > 0 && diskBytesSpilled > 0
+    def hasBytesSpilled: Boolean = memoryBytesSpilled > 0 || diskBytesSpilled > 0
 }

 /**
spark git commit: [SPARK-21953] Show both memory and disk bytes spilled if either is present
Repository: spark
Updated Branches: refs/heads/branch-2.2 42852bb17 -> 309c401a5

[SPARK-21953] Show both memory and disk bytes spilled if either is present

As written now, there must be both memory and disk bytes spilled to show either of them. If there is only one of those types of spill recorded, it will be hidden.

Author: Andrew Ash

Closes #19164 from ash211/patch-3.

(cherry picked from commit 6308c65f08b507408033da1f1658144ea8c1491f)
Signed-off-by: Wenchen Fan

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/309c401a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/309c401a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/309c401a
Branch: refs/heads/branch-2.2
Commit: 309c401a5b3c76cc1b6b5aef97d03034fe4e1ce4
Parents: 42852bb
Author: Andrew Ash
Authored: Mon Sep 18 10:42:24 2017 +0800
Committer: Wenchen Fan
Committed: Mon Sep 18 10:42:41 2017 +0800
--
 core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/309c401a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
--
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
index 8bedd07..25aa504 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
@@ -108,7 +108,7 @@ private[spark] object UIData {
     def hasOutput: Boolean = outputBytes > 0
     def hasShuffleRead: Boolean = shuffleReadTotalBytes > 0
     def hasShuffleWrite: Boolean = shuffleWriteBytes > 0
-    def hasBytesSpilled: Boolean = memoryBytesSpilled > 0 && diskBytesSpilled > 0
+    def hasBytesSpilled: Boolean = memoryBytesSpilled > 0 || diskBytesSpilled > 0
 }

 /**
spark git commit: [SPARK-21953] Show both memory and disk bytes spilled if either is present
Repository: spark
Updated Branches: refs/heads/master 6adf67dd1 -> 6308c65f0

[SPARK-21953] Show both memory and disk bytes spilled if either is present

As written now, there must be both memory and disk bytes spilled to show either of them. If there is only one of those types of spill recorded, it will be hidden.

Author: Andrew Ash

Closes #19164 from ash211/patch-3.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6308c65f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6308c65f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6308c65f
Branch: refs/heads/master
Commit: 6308c65f08b507408033da1f1658144ea8c1491f
Parents: 6adf67d
Author: Andrew Ash
Authored: Mon Sep 18 10:42:24 2017 +0800
Committer: Wenchen Fan
Committed: Mon Sep 18 10:42:24 2017 +0800
--
 core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/6308c65f/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
--
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
index d9c87f6..5acec0d 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
@@ -110,7 +110,7 @@ private[spark] object UIData {
     def hasOutput: Boolean = outputBytes > 0
     def hasShuffleRead: Boolean = shuffleReadTotalBytes > 0
     def hasShuffleWrite: Boolean = shuffleWriteBytes > 0
-    def hasBytesSpilled: Boolean = memoryBytesSpilled > 0 && diskBytesSpilled > 0
+    def hasBytesSpilled: Boolean = memoryBytesSpilled > 0 || diskBytesSpilled > 0
 }

 /**
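The one-character change from `&&` to `||` is easy to check in isolation: with `&&`, a task that spilled only to disk (memory counter zero) reported no spill at all. A quick sketch of the old and new predicates, in plain Python just to illustrate the boolean fix (the real code is the Scala one-liner above):

```python
def old(mem, disk):
    # Pre-patch predicate: && hides spills recorded on only one side.
    return mem > 0 and disk > 0

def new(mem, disk):
    # Post-patch predicate: || shows a spill if either counter is nonzero.
    return mem > 0 or disk > 0

# A task that spilled 512 bytes to disk but recorded no memory spill:
assert not old(0, 512)                  # pre-patch: the spill was hidden in the UI
assert new(0, 512)                      # post-patch: the spill is shown
assert not old(0, 0) and not new(0, 0)  # no spill at all: both agree
```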
spark git commit: [SPARK-21985][PYSPARK] PairDeserializer is broken for double-zipped RDDs
Repository: spark
Updated Branches: refs/heads/branch-2.1 e49c997fe -> 3ae7ab8e8

[SPARK-21985][PYSPARK] PairDeserializer is broken for double-zipped RDDs

## What changes were proposed in this pull request?

(edited) Fixes a bug introduced in #16121. In PairDeserializer, convert each batch of keys and values to lists (if they do not have `__len__` already) so that we can check that they are the same size. Normally they already are lists, so this should not have a performance impact, but it is needed when repeated `zip`s are done.

## How was this patch tested?

Additional unit test

Author: Andrew Ray

Closes #19226 from aray/SPARK-21985.

(cherry picked from commit 6adf67dd14b0ece342bb91adf800df0a7101e038)
Signed-off-by: hyukjinkwon

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3ae7ab8e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3ae7ab8e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3ae7ab8e
Branch: refs/heads/branch-2.1
Commit: 3ae7ab8e82446e6d299a3e344beebb76ebf9dc4c
Parents: e49c997
Author: Andrew Ray
Authored: Mon Sep 18 02:46:27 2017 +0900
Committer: hyukjinkwon
Committed: Mon Sep 18 02:47:06 2017 +0900
--
 python/pyspark/serializers.py |  6 +-
 python/pyspark/tests.py       | 12 
 2 files changed, 17 insertions(+), 1 deletion(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/3ae7ab8e/python/pyspark/serializers.py
--
diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py
index ea5e00e..9bd4e55 100644
--- a/python/pyspark/serializers.py
+++ b/python/pyspark/serializers.py
@@ -97,7 +97,7 @@ class Serializer(object):

     def _load_stream_without_unbatching(self, stream):
         """
-        Return an iterator of deserialized batches (lists) of objects from the input stream.
+        Return an iterator of deserialized batches (iterable) of objects from the input stream.
         if the serializer does not operate on batches the default implementation returns an
         iterator of single element lists.
         """
@@ -326,6 +326,10 @@ class PairDeserializer(Serializer):
         key_batch_stream = self.key_ser._load_stream_without_unbatching(stream)
         val_batch_stream = self.val_ser._load_stream_without_unbatching(stream)
         for (key_batch, val_batch) in zip(key_batch_stream, val_batch_stream):
+            # For double-zipped RDDs, the batches can be iterators from other PairDeserializer,
+            # instead of lists. We need to convert them to lists if needed.
+            key_batch = key_batch if hasattr(key_batch, '__len__') else list(key_batch)
+            val_batch = val_batch if hasattr(val_batch, '__len__') else list(val_batch)
             if len(key_batch) != len(val_batch):
                 raise ValueError("Can not deserialize PairRDD with different number of items"
                                  " in batches: (%d, %d)" % (len(key_batch), len(val_batch)))

http://git-wip-us.apache.org/repos/asf/spark/blob/3ae7ab8e/python/pyspark/tests.py
--
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 25ed127..bd21029 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -579,6 +579,18 @@ class RDDTests(ReusedPySparkTestCase):
             set([(x, (y, y)) for x in range(10) for y in range(10)])
         )

+    def test_zip_chaining(self):
+        # Tests for SPARK-21985
+        rdd = self.sc.parallelize('abc', 2)
+        self.assertSetEqual(
+            set(rdd.zip(rdd).zip(rdd).collect()),
+            set([((x, x), x) for x in 'abc'])
+        )
+        self.assertSetEqual(
+            set(rdd.zip(rdd.zip(rdd)).collect()),
+            set([(x, (x, x)) for x in 'abc'])
+        )
+
     def test_deleting_input_files(self):
         # Regression test for SPARK-1025
         tempFile = tempfile.NamedTemporaryFile(delete=False)
spark git commit: [SPARK-21985][PYSPARK] PairDeserializer is broken for double-zipped RDDs
Repository: spark
Updated Branches: refs/heads/master f4073020a -> 6adf67dd1

[SPARK-21985][PYSPARK] PairDeserializer is broken for double-zipped RDDs

## What changes were proposed in this pull request?

(edited) Fixes a bug introduced in #16121. In PairDeserializer, convert each batch of keys and values to lists (if they do not have `__len__` already) so that we can check that they are the same size. Normally they already are lists, so this should not have a performance impact, but it is needed when repeated `zip`s are done.

## How was this patch tested?

Additional unit test

Author: Andrew Ray

Closes #19226 from aray/SPARK-21985.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6adf67dd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6adf67dd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6adf67dd
Branch: refs/heads/master
Commit: 6adf67dd14b0ece342bb91adf800df0a7101e038
Parents: f407302
Author: Andrew Ray
Authored: Mon Sep 18 02:46:27 2017 +0900
Committer: hyukjinkwon
Committed: Mon Sep 18 02:46:27 2017 +0900
--
 python/pyspark/serializers.py |  6 +-
 python/pyspark/tests.py       | 12 
 2 files changed, 17 insertions(+), 1 deletion(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/6adf67dd/python/pyspark/serializers.py
--
diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py
index d5c2a75..660b19a 100644
--- a/python/pyspark/serializers.py
+++ b/python/pyspark/serializers.py
@@ -97,7 +97,7 @@ class Serializer(object):

     def _load_stream_without_unbatching(self, stream):
         """
-        Return an iterator of deserialized batches (lists) of objects from the input stream.
+        Return an iterator of deserialized batches (iterable) of objects from the input stream.
         if the serializer does not operate on batches the default implementation returns an
         iterator of single element lists.
         """
@@ -343,6 +343,10 @@ class PairDeserializer(Serializer):
         key_batch_stream = self.key_ser._load_stream_without_unbatching(stream)
         val_batch_stream = self.val_ser._load_stream_without_unbatching(stream)
         for (key_batch, val_batch) in zip(key_batch_stream, val_batch_stream):
+            # For double-zipped RDDs, the batches can be iterators from other PairDeserializer,
+            # instead of lists. We need to convert them to lists if needed.
+            key_batch = key_batch if hasattr(key_batch, '__len__') else list(key_batch)
+            val_batch = val_batch if hasattr(val_batch, '__len__') else list(val_batch)
             if len(key_batch) != len(val_batch):
                 raise ValueError("Can not deserialize PairRDD with different number of items"
                                  " in batches: (%d, %d)" % (len(key_batch), len(val_batch)))

http://git-wip-us.apache.org/repos/asf/spark/blob/6adf67dd/python/pyspark/tests.py
--
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 000dd1e..3c108ec 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -644,6 +644,18 @@ class RDDTests(ReusedPySparkTestCase):
             set([(x, (y, y)) for x in range(10) for y in range(10)])
         )

+    def test_zip_chaining(self):
+        # Tests for SPARK-21985
+        rdd = self.sc.parallelize('abc', 2)
+        self.assertSetEqual(
+            set(rdd.zip(rdd).zip(rdd).collect()),
+            set([((x, x), x) for x in 'abc'])
+        )
+        self.assertSetEqual(
+            set(rdd.zip(rdd.zip(rdd)).collect()),
+            set([(x, (x, x)) for x in 'abc'])
+        )
+
     def test_deleting_input_files(self):
         # Regression test for SPARK-1025
         tempFile = tempfile.NamedTemporaryFile(delete=False)
spark git commit: [SPARK-21985][PYSPARK] PairDeserializer is broken for double-zipped RDDs
Repository: spark
Updated Branches: refs/heads/branch-2.2 51e5a821d -> 42852bb17

[SPARK-21985][PYSPARK] PairDeserializer is broken for double-zipped RDDs

## What changes were proposed in this pull request?

(edited) Fixes a bug introduced in #16121. In PairDeserializer, convert each batch of keys and values to lists (if they do not have `__len__` already) so that we can check that they are the same size. Normally they already are lists, so this should not have a performance impact, but it is needed when repeated `zip`s are done.

## How was this patch tested?

Additional unit test

Author: Andrew Ray

Closes #19226 from aray/SPARK-21985.

(cherry picked from commit 6adf67dd14b0ece342bb91adf800df0a7101e038)
Signed-off-by: hyukjinkwon

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/42852bb1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/42852bb1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/42852bb1
Branch: refs/heads/branch-2.2
Commit: 42852bb17121fb8067a4aea3e56d56f76a2e0d1d
Parents: 51e5a82
Author: Andrew Ray
Authored: Mon Sep 18 02:46:27 2017 +0900
Committer: hyukjinkwon
Committed: Mon Sep 18 02:46:47 2017 +0900
--
 python/pyspark/serializers.py |  6 +-
 python/pyspark/tests.py       | 12 
 2 files changed, 17 insertions(+), 1 deletion(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/42852bb1/python/pyspark/serializers.py
--
diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py
index ea5e00e..9bd4e55 100644
--- a/python/pyspark/serializers.py
+++ b/python/pyspark/serializers.py
@@ -97,7 +97,7 @@ class Serializer(object):

     def _load_stream_without_unbatching(self, stream):
         """
-        Return an iterator of deserialized batches (lists) of objects from the input stream.
+        Return an iterator of deserialized batches (iterable) of objects from the input stream.
         if the serializer does not operate on batches the default implementation returns an
         iterator of single element lists.
         """
@@ -326,6 +326,10 @@ class PairDeserializer(Serializer):
         key_batch_stream = self.key_ser._load_stream_without_unbatching(stream)
         val_batch_stream = self.val_ser._load_stream_without_unbatching(stream)
         for (key_batch, val_batch) in zip(key_batch_stream, val_batch_stream):
+            # For double-zipped RDDs, the batches can be iterators from other PairDeserializer,
+            # instead of lists. We need to convert them to lists if needed.
+            key_batch = key_batch if hasattr(key_batch, '__len__') else list(key_batch)
+            val_batch = val_batch if hasattr(val_batch, '__len__') else list(val_batch)
             if len(key_batch) != len(val_batch):
                 raise ValueError("Can not deserialize PairRDD with different number of items"
                                  " in batches: (%d, %d)" % (len(key_batch), len(val_batch)))

http://git-wip-us.apache.org/repos/asf/spark/blob/42852bb1/python/pyspark/tests.py
--
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 20a933e..9f47798 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -644,6 +644,18 @@ class RDDTests(ReusedPySparkTestCase):
             set([(x, (y, y)) for x in range(10) for y in range(10)])
         )

+    def test_zip_chaining(self):
+        # Tests for SPARK-21985
+        rdd = self.sc.parallelize('abc', 2)
+        self.assertSetEqual(
+            set(rdd.zip(rdd).zip(rdd).collect()),
+            set([((x, x), x) for x in 'abc'])
+        )
+        self.assertSetEqual(
+            set(rdd.zip(rdd.zip(rdd)).collect()),
+            set([(x, (x, x)) for x in 'abc'])
+        )
+
     def test_deleting_input_files(self):
         # Regression test for SPARK-1025
         tempFile = tempfile.NamedTemporaryFile(delete=False)
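The failure mode fixed by these three commits is straightforward to reproduce: zipping an RDD with the result of another zip hands `PairDeserializer` batches that are themselves iterators produced by an inner `PairDeserializer`, and calling `len()` on an iterator raises a `TypeError`. Below is a sketch of the repro (it assumes an active `SparkContext` bound to `sc`, e.g. in a pyspark shell) plus the guard the patch adds, pulled out as a standalone helper; `safe_len_batch` is an illustrative name, not PySpark API:

```python
# Repro: fails before this patch, passes after.
rdd = sc.parallelize('abc', 2)
assert set(rdd.zip(rdd).zip(rdd).collect()) == \
    {(('a', 'a'), 'a'), (('b', 'b'), 'b'), (('c', 'c'), 'c')}

# The guard in isolation: materialize a batch only when it does not already
# support len(), so ordinary list batches pass through untouched.
def safe_len_batch(batch):
    return batch if hasattr(batch, '__len__') else list(batch)

assert safe_len_batch([1, 2, 3]) == [1, 2, 3]        # list: returned unchanged
assert safe_len_batch(iter([1, 2, 3])) == [1, 2, 3]  # iterator: materialized
```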
spark git commit: [SPARK-22032][PYSPARK] Speed up StructType conversion
Repository: spark
Updated Branches: refs/heads/master 73d906722 -> f4073020a

[SPARK-22032][PYSPARK] Speed up StructType conversion

## What changes were proposed in this pull request?

StructType.fromInternal is calling f.fromInternal(v) for every field. We can use precalculated information about the field types to limit the number of function calls. (It is calculated once per StructType and used in per-record calculations.)

Benchmarks (Python profiler)

```
df = spark.range(1000).selectExpr("id as id0", "id as id1", "id as id2", "id as id3", "id as id4", "id as id5", "id as id6", "id as id7", "id as id8", "id as id9", "struct(id) as s").cache()
df.count()
df.rdd.map(lambda x: x).count()
```

Before
```
310274584 function calls (300272456 primitive calls) in 1320.684 seconds

Ordered by: internal time, cumulative time

 ncalls  tottime  percall  cumtime  percall filename:lineno(function)
   1000  253.417    0.000  486.991    0.000 types.py:619()
   3000  192.272    0.000 1009.986    0.000 types.py:612(fromInternal)
      1  176.140    0.000  176.140    0.000 types.py:88(fromInternal)
   2000  156.832    0.000  328.093    0.000 types.py:1471(_create_row)
  14000  107.206    0.008 1237.917    0.088 {built-in method loads}
   2000   80.176    0.000 1090.162    0.000 types.py:1468()
```

After
```
210274584 function calls (200272456 primitive calls) in 1035.974 seconds

Ordered by: internal time, cumulative time

 ncalls  tottime  percall  cumtime  percall filename:lineno(function)
   3000  215.845    0.000  698.748    0.000 types.py:612(fromInternal)
   2000  165.042    0.000  351.572    0.000 types.py:1471(_create_row)
  14000  116.834    0.008  946.791    0.068 {built-in method loads}
   2000   87.326    0.000  786.073    0.000 types.py:1468()
   2000   85.477    0.000  134.607    0.000 types.py:1519(__new__)
   1000   65.777    0.000  126.712    0.000 types.py:619()
```

The main difference is types.py:619() and types.py:88(fromInternal) (which is removed in After). The number of function calls is 100 million less, and performance is 20% better.

Benchmark (worst-case scenario)

```
df = spark.range(100).selectExpr("current_timestamp as id0", "current_timestamp as id1", "current_timestamp as id2", "current_timestamp as id3", "current_timestamp as id4", "current_timestamp as id5", "current_timestamp as id6", "current_timestamp as id7", "current_timestamp as id8", "current_timestamp as id9").cache()
df.count()
df.rdd.map(lambda x: x).count()
```

Before
```
31166064 function calls (31163984 primitive calls) in 150.882 seconds
```

After
```
31166064 function calls (31163984 primitive calls) in 153.220 seconds
```

IMPORTANT: The benchmark was done on top of https://github.com/apache/spark/pull/19246. Without https://github.com/apache/spark/pull/19246 the performance improvement will be even greater.

## How was this patch tested?

Existing tests. Performance benchmark.

Author: Maciej Bryński

Closes #19249 from maver1ck/spark_22032.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f4073020
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f4073020
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f4073020
Branch: refs/heads/master
Commit: f4073020adf9752c7d7b39631ec3fa36d6345902
Parents: 73d9067
Author: Maciej Bryński
Authored: Mon Sep 18 02:34:44 2017 +0900
Committer: hyukjinkwon
Committed: Mon Sep 18 02:34:44 2017 +0900
--
 python/pyspark/sql/types.py | 22 --
 1 file changed, 16 insertions(+), 6 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/f4073020/python/pyspark/sql/types.py
--
diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py
index 920cf00..aaf520f 100644
--- a/python/pyspark/sql/types.py
+++ b/python/pyspark/sql/types.py
@@ -483,7 +483,9 @@ class StructType(DataType):
         self.names = [f.name for f in fields]
         assert all(isinstance(f, StructField) for f in fields),\
             "fields should be a list of StructField"
-        self._needSerializeAnyField = any(f.needConversion() for f in self)
+        # Precalculated list of fields that need conversion with fromInternal/toInternal functions
+        self._needConversion = [f.needConversion() for f in self]
+        self._needSerializeAnyField = any(self._needConversion)

     def add(self, field, data_type=None, nullable=True, metadata=None):
         """
@@ -528,7 +530,9 @@ class StructType(DataType):
             data_type_f = data_type
         self.fields.append(StructField(field, data_type_f, nullable, metadata))
         self.names.append(field)
-        self._needSerializeAnyField = any(f.needConversion() for f in se
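The optimization pattern here is general: hoist a per-field predicate out of the per-row hot loop and pay for it once at schema-construction time. A simplified standalone sketch of the before/after shape follows; the `Field` class is a stand-in for `StructField`, not PySpark's actual types:

```python
class Field:
    """Stand-in for StructField: knows whether its values need converting."""

    def __init__(self, needs_conversion):
        self._needs = needs_conversion

    def needConversion(self):
        return self._needs

    def fromInternal(self, v):
        return v  # stand-in for the real per-value conversion

fields = [Field(i == 0) for i in range(10)]  # only one field needs conversion

# Before: one needConversion() call per field, per row.
def from_internal_slow(row):
    return [f.fromInternal(v) if f.needConversion() else v
            for f, v in zip(fields, row)]

# After: the boolean mask is computed once, at schema-construction time,
# and each row only pays a list lookup per field.
need_conversion = [f.needConversion() for f in fields]  # precalculated once

def from_internal_fast(row):
    return [f.fromInternal(v) if c else v
            for f, v, c in zip(fields, row, need_conversion)]

row = list(range(10))
assert from_internal_slow(row) == from_internal_fast(row)
```

With ten million rows and ten fields, the precalculated mask removes roughly one hundred million `needConversion()` calls, which matches the call-count drop in the profiler output above.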