spark git commit: [SPARK-21113][CORE] Read ahead input stream to amortize disk IO cost …

2017-09-17 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master 7c7266208 -> 1e978b17d


[SPARK-21113][CORE] Read ahead input stream to amortize disk IO cost …

Profiling some of our big jobs, we see that around 30% of the time is being 
spent reading the spill files from disk. To amortize the disk IO cost, the 
idea is to implement a read-ahead input stream which asynchronously reads 
ahead from the underlying input stream once a specified amount of data has 
been read from the current buffer. It does this by maintaining two buffers - 
an active buffer and a read-ahead buffer. The active buffer contains the data 
that should be returned when a read() call is issued. The read-ahead buffer is 
used to asynchronously read from the underlying input stream, and once the 
active buffer is exhausted, we flip the two buffers so that reading can 
continue from the read-ahead buffer without blocking on disk I/O.
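
For illustration, a minimal Python sketch of the two-buffer scheme described 
above (the actual implementation is the Java class `ReadAheadInputStream` added 
below; the class name, simplified locking, and single-consumer assumption here 
are mine):

```python
import threading

class ReadAheadSketch:
    """Toy double-buffered reader: while the caller drains the active buffer,
    a background thread fills the read-ahead buffer from the underlying stream."""

    def __init__(self, underlying, buffer_size=1 << 16):
        self.underlying = underlying      # any object with .read(n) -> bytes
        self.buffer_size = buffer_size
        self.active = b""                 # data returned by read() calls
        self.pos = 0
        self.read_ahead = b""             # filled asynchronously
        self._start_read_ahead()

    def _start_read_ahead(self):
        def fill():
            self.read_ahead = self.underlying.read(self.buffer_size)
        self._reader = threading.Thread(target=fill)
        self._reader.start()

    def read(self, n):
        if self.pos >= len(self.active):
            self._reader.join()           # wait for the pending async read
            # Flip buffers so the caller keeps reading without blocking on disk I/O.
            self.active, self.read_ahead = self.read_ahead, b""
            self.pos = 0
            if not self.active:
                return b""                # end of stream
            self._start_read_ahead()      # immediately kick off the next async read
        end = min(self.pos + n, len(self.active))
        chunk = self.active[self.pos:end]
        self.pos = end
        return chunk
```

With a file opened in binary mode, `ReadAheadSketch(open(path, "rb")).read(4096)` 
behaves like a plain buffered read, except that the next chunk is already being 
fetched in the background.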

## How was this patch tested?

Tested by running a job on the cluster and could see up to 8% CPU improvement.

Author: Sital Kedia 
Author: Shixiong Zhu 
Author: Sital Kedia 

Closes #18317 from sitalkedia/read_ahead_buffer.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1e978b17
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1e978b17
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1e978b17

Branch: refs/heads/master
Commit: 1e978b17d63d7ba20368057aa4e65f5ef6e87369
Parents: 7c72662
Author: Sital Kedia 
Authored: Sun Sep 17 23:15:08 2017 -0700
Committer: Shixiong Zhu 
Committed: Sun Sep 17 23:15:08 2017 -0700

--
 .../apache/spark/io/ReadAheadInputStream.java   | 408 +++
 .../unsafe/sort/UnsafeSorterSpillReader.java|  21 +-
 .../spark/io/GenericFileInputStreamSuite.java   | 130 ++
 .../io/NioBufferedFileInputStreamSuite.java | 135 --
 .../spark/io/NioBufferedInputStreamSuite.java   |  33 ++
 .../spark/io/ReadAheadInputStreamSuite.java |  33 ++
 6 files changed, 621 insertions(+), 139 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/1e978b17/core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java
--
diff --git a/core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java 
b/core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java
new file mode 100644
index 000..618bd42
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java
@@ -0,0 +1,408 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.io;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import org.apache.spark.util.ThreadUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InterruptedIOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * {@link InputStream} implementation which asynchronously reads ahead from the underlying input
+ * stream when specified amount of data has been read from the current buffer. It does it by maintaining
+ * two buffer - active buffer and read ahead buffer. Active buffer contains data which should be returned
+ * when a read() call is issued. The read ahead buffer is used to asynchronously read from the underlying
+ * input stream and once the current active buffer is exhausted, we flip the two buffers so that we can
+ * start reading from the read ahead buffer without being blocked in disk I/O.
+ */
+public class ReadAheadInputStream extends InputStream {
+
+  private static final Logger logger = LoggerFactory.getLogger(ReadAheadInputStream.class);
+
+  private ReentrantLock stateChangeLock = new ReentrantLock();
+
+  @GuardedBy("stateChangeLock")
+  private ByteBuffer activeBuffer;
+
+  @GuardedBy("stateChangeLock")
+  private ByteBuffer readAheadBuffer;
+
+  @GuardedBy("stateChangeLock")
+  private boolean endOfStream;
+
+  @GuardedBy("stateChangeLock")
+  // true if async read is in progress
+  pri

spark git commit: [SPARK-22043][PYTHON] Improves error message for show_profiles and dump_profiles

2017-09-17 Thread gurwls223
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 309c401a5 -> a86831d61


[SPARK-22043][PYTHON] Improves error message for show_profiles and dump_profiles

## What changes were proposed in this pull request?

This PR proposes to improve the error message from:

```
>>> sc.show_profiles()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File ".../spark/python/pyspark/context.py", line 1000, in show_profiles
    self.profiler_collector.show_profiles()
AttributeError: 'NoneType' object has no attribute 'show_profiles'
>>> sc.dump_profiles("/tmp/abc")
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File ".../spark/python/pyspark/context.py", line 1005, in dump_profiles
    self.profiler_collector.dump_profiles(path)
AttributeError: 'NoneType' object has no attribute 'dump_profiles'
```

to

```
>>> sc.show_profiles()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File ".../spark/python/pyspark/context.py", line 1003, in show_profiles
    raise RuntimeError("'spark.python.profile' configuration must be set "
RuntimeError: 'spark.python.profile' configuration must be set to 'true' to enable Python profile.
>>> sc.dump_profiles("/tmp/abc")
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File ".../spark/python/pyspark/context.py", line 1012, in dump_profiles
    raise RuntimeError("'spark.python.profile' configuration must be set "
RuntimeError: 'spark.python.profile' configuration must be set to 'true' to enable Python profile.
```
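
For context (not part of this patch), the profiler methods only do something 
useful when Python profiling is enabled before the SparkContext is created; a 
rough usage sketch:

```python
from pyspark import SparkConf, SparkContext

# Profiling must be enabled up front via this configuration.
conf = SparkConf().set("spark.python.profile", "true")
sc = SparkContext(conf=conf)

sc.parallelize(range(1000)).map(lambda x: x * x).count()
sc.show_profiles()            # print the collected profile stats to stdout
sc.dump_profiles("/tmp/abc")  # or dump them into a directory
```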

## How was this patch tested?

Unit tests added in `python/pyspark/tests.py` and manual tests.

Author: hyukjinkwon 

Closes #19260 from HyukjinKwon/profile-errors.

(cherry picked from commit 7c7266208a3be984ac1ce53747dc0c3640f4ecac)
Signed-off-by: hyukjinkwon 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a86831d6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a86831d6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a86831d6

Branch: refs/heads/branch-2.2
Commit: a86831d618b05c789c2cea0afe5488c3234a14bc
Parents: 309c401
Author: hyukjinkwon 
Authored: Mon Sep 18 13:20:11 2017 +0900
Committer: hyukjinkwon 
Committed: Mon Sep 18 13:20:29 2017 +0900

--
 python/pyspark/context.py | 12 ++--
 python/pyspark/tests.py   | 16 
 2 files changed, 26 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/a86831d6/python/pyspark/context.py
--
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 49be76e..ea58b3a 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -994,12 +994,20 @@ class SparkContext(object):
 
 def show_profiles(self):
 """ Print the profile stats to stdout """
-self.profiler_collector.show_profiles()
+if self.profiler_collector is not None:
+self.profiler_collector.show_profiles()
+else:
+raise RuntimeError("'spark.python.profile' configuration must be set "
+   "to 'true' to enable Python profile.")
 
 def dump_profiles(self, path):
 """ Dump the profile stats into directory `path`
 """
-self.profiler_collector.dump_profiles(path)
+if self.profiler_collector is not None:
+self.profiler_collector.dump_profiles(path)
+else:
+raise RuntimeError("'spark.python.profile' configuration must be set "
+   "to 'true' to enable Python profile.")
 
 def getConf(self):
 conf = SparkConf()

http://git-wip-us.apache.org/repos/asf/spark/blob/a86831d6/python/pyspark/tests.py
--
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 9f47798..6a96aaf 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -1288,6 +1288,22 @@ class ProfilerTests(PySparkTestCase):
 rdd.foreach(heavy_foo)
 
 
+class ProfilerTests2(unittest.TestCase):
+def test_profiler_disabled(self):
+sc = SparkContext(conf=SparkConf().set("spark.python.profile", "false"))
+try:
+self.assertRaisesRegexp(
+RuntimeError,
+"'spark.python.profile' configuration must be set",
+lambda: sc.show_profiles())
+self.assertRaisesRegexp(
+RuntimeError,
+"'spark.python.profile' configuration must be set",
+lambda: sc.dump_profiles("/tmp/abc"))
+finally:
+sc.stop()
+
+
 class InputFormatTests(ReusedPySparkTestCase):
 
 @classmethod


-
To unsubscribe, e-mail: commits-unsubscr...@s

spark git commit: [SPARK-22043][PYTHON] Improves error message for show_profiles and dump_profiles

2017-09-17 Thread gurwls223
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 99de4b8f5 -> b35136a9e


[SPARK-22043][PYTHON] Improves error message for show_profiles and dump_profiles

## What changes were proposed in this pull request?

This PR proposes to improve the error message from:

```
>>> sc.show_profiles()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File ".../spark/python/pyspark/context.py", line 1000, in show_profiles
    self.profiler_collector.show_profiles()
AttributeError: 'NoneType' object has no attribute 'show_profiles'
>>> sc.dump_profiles("/tmp/abc")
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File ".../spark/python/pyspark/context.py", line 1005, in dump_profiles
    self.profiler_collector.dump_profiles(path)
AttributeError: 'NoneType' object has no attribute 'dump_profiles'
```

to

```
>>> sc.show_profiles()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File ".../spark/python/pyspark/context.py", line 1003, in show_profiles
    raise RuntimeError("'spark.python.profile' configuration must be set "
RuntimeError: 'spark.python.profile' configuration must be set to 'true' to enable Python profile.
>>> sc.dump_profiles("/tmp/abc")
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File ".../spark/python/pyspark/context.py", line 1012, in dump_profiles
    raise RuntimeError("'spark.python.profile' configuration must be set "
RuntimeError: 'spark.python.profile' configuration must be set to 'true' to enable Python profile.
```

## How was this patch tested?

Unit tests added in `python/pyspark/tests.py` and manual tests.

Author: hyukjinkwon 

Closes #19260 from HyukjinKwon/profile-errors.

(cherry picked from commit 7c7266208a3be984ac1ce53747dc0c3640f4ecac)
Signed-off-by: hyukjinkwon 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b35136a9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b35136a9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b35136a9

Branch: refs/heads/branch-2.1
Commit: b35136a9e4b72b403434c991e111e667cfe9177d
Parents: 99de4b8
Author: hyukjinkwon 
Authored: Mon Sep 18 13:20:11 2017 +0900
Committer: hyukjinkwon 
Committed: Mon Sep 18 13:20:48 2017 +0900

--
 python/pyspark/context.py | 12 ++--
 python/pyspark/tests.py   | 16 
 2 files changed, 26 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/b35136a9/python/pyspark/context.py
--
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 5a4c2fa..c091882 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -970,12 +970,20 @@ class SparkContext(object):
 
 def show_profiles(self):
 """ Print the profile stats to stdout """
-self.profiler_collector.show_profiles()
+if self.profiler_collector is not None:
+self.profiler_collector.show_profiles()
+else:
+raise RuntimeError("'spark.python.profile' configuration must be set "
+   "to 'true' to enable Python profile.")
 
 def dump_profiles(self, path):
 """ Dump the profile stats into directory `path`
 """
-self.profiler_collector.dump_profiles(path)
+if self.profiler_collector is not None:
+self.profiler_collector.dump_profiles(path)
+else:
+raise RuntimeError("'spark.python.profile' configuration must be set "
+   "to 'true' to enable Python profile.")
 
 def getConf(self):
 conf = SparkConf()

http://git-wip-us.apache.org/repos/asf/spark/blob/b35136a9/python/pyspark/tests.py
--
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index bd21029..61272fe 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -1223,6 +1223,22 @@ class ProfilerTests(PySparkTestCase):
 rdd.foreach(heavy_foo)
 
 
+class ProfilerTests2(unittest.TestCase):
+def test_profiler_disabled(self):
+sc = SparkContext(conf=SparkConf().set("spark.python.profile", "false"))
+try:
+self.assertRaisesRegexp(
+RuntimeError,
+"'spark.python.profile' configuration must be set",
+lambda: sc.show_profiles())
+self.assertRaisesRegexp(
+RuntimeError,
+"'spark.python.profile' configuration must be set",
+lambda: sc.dump_profiles("/tmp/abc"))
+finally:
+sc.stop()
+
+
 class InputFormatTests(ReusedPySparkTestCase):
 
 @classmethod


-
To unsubscribe, e-mail: commits-unsubscr...@s

spark git commit: [SPARK-22043][PYTHON] Improves error message for show_profiles and dump_profiles

2017-09-17 Thread gurwls223
Repository: spark
Updated Branches:
  refs/heads/master 6308c65f0 -> 7c7266208


[SPARK-22043][PYTHON] Improves error message for show_profiles and dump_profiles

## What changes were proposed in this pull request?

This PR proposes to improve the error message from:

```
>>> sc.show_profiles()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File ".../spark/python/pyspark/context.py", line 1000, in show_profiles
    self.profiler_collector.show_profiles()
AttributeError: 'NoneType' object has no attribute 'show_profiles'
>>> sc.dump_profiles("/tmp/abc")
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File ".../spark/python/pyspark/context.py", line 1005, in dump_profiles
    self.profiler_collector.dump_profiles(path)
AttributeError: 'NoneType' object has no attribute 'dump_profiles'
```

to

```
>>> sc.show_profiles()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File ".../spark/python/pyspark/context.py", line 1003, in show_profiles
    raise RuntimeError("'spark.python.profile' configuration must be set "
RuntimeError: 'spark.python.profile' configuration must be set to 'true' to enable Python profile.
>>> sc.dump_profiles("/tmp/abc")
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File ".../spark/python/pyspark/context.py", line 1012, in dump_profiles
    raise RuntimeError("'spark.python.profile' configuration must be set "
RuntimeError: 'spark.python.profile' configuration must be set to 'true' to enable Python profile.
```

## How was this patch tested?

Unit tests added in `python/pyspark/tests.py` and manual tests.

Author: hyukjinkwon 

Closes #19260 from HyukjinKwon/profile-errors.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7c726620
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7c726620
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7c726620

Branch: refs/heads/master
Commit: 7c7266208a3be984ac1ce53747dc0c3640f4ecac
Parents: 6308c65
Author: hyukjinkwon 
Authored: Mon Sep 18 13:20:11 2017 +0900
Committer: hyukjinkwon 
Committed: Mon Sep 18 13:20:11 2017 +0900

--
 python/pyspark/context.py | 12 ++--
 python/pyspark/tests.py   | 16 
 2 files changed, 26 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/7c726620/python/pyspark/context.py
--
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index a704604..a33f6dc 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -997,12 +997,20 @@ class SparkContext(object):
 
 def show_profiles(self):
 """ Print the profile stats to stdout """
-self.profiler_collector.show_profiles()
+if self.profiler_collector is not None:
+self.profiler_collector.show_profiles()
+else:
+raise RuntimeError("'spark.python.profile' configuration must be set "
+   "to 'true' to enable Python profile.")
 
 def dump_profiles(self, path):
 """ Dump the profile stats into directory `path`
 """
-self.profiler_collector.dump_profiles(path)
+if self.profiler_collector is not None:
+self.profiler_collector.dump_profiles(path)
+else:
+raise RuntimeError("'spark.python.profile' configuration must be set "
+   "to 'true' to enable Python profile.")
 
 def getConf(self):
 conf = SparkConf()

http://git-wip-us.apache.org/repos/asf/spark/blob/7c726620/python/pyspark/tests.py
--
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 3c108ec..da99872 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -1296,6 +1296,22 @@ class ProfilerTests(PySparkTestCase):
 rdd.foreach(heavy_foo)
 
 
+class ProfilerTests2(unittest.TestCase):
+def test_profiler_disabled(self):
+sc = SparkContext(conf=SparkConf().set("spark.python.profile", "false"))
+try:
+self.assertRaisesRegexp(
+RuntimeError,
+"'spark.python.profile' configuration must be set",
+lambda: sc.show_profiles())
+self.assertRaisesRegexp(
+RuntimeError,
+"'spark.python.profile' configuration must be set",
+lambda: sc.dump_profiles("/tmp/abc"))
+finally:
+sc.stop()
+
+
 class InputFormatTests(ReusedPySparkTestCase):
 
 @classmethod


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-21953] Show both memory and disk bytes spilled if either is present

2017-09-17 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 3ae7ab8e8 -> 99de4b8f5


[SPARK-21953] Show both memory and disk bytes spilled if either is present

As written now, there must be both memory and disk bytes spilled to show either 
of them. If there is only one of those types of spill recorded, it will be 
hidden.
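
Illustrative only (the real change is the one-line Scala predicate in 
UIData.scala below), the before/after behaviour expressed in Python terms:

```python
def has_bytes_spilled_before(memory_bytes_spilled, disk_bytes_spilled):
    # Old behaviour: both kinds of spill had to be non-zero to show the columns.
    return memory_bytes_spilled > 0 and disk_bytes_spilled > 0

def has_bytes_spilled_after(memory_bytes_spilled, disk_bytes_spilled):
    # New behaviour: either kind of spill is enough.
    return memory_bytes_spilled > 0 or disk_bytes_spilled > 0

# A stage that spilled only to disk used to be hidden, and is now shown.
assert has_bytes_spilled_before(0, 1024) is False
assert has_bytes_spilled_after(0, 1024) is True
```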

Author: Andrew Ash 

Closes #19164 from ash211/patch-3.

(cherry picked from commit 6308c65f08b507408033da1f1658144ea8c1491f)
Signed-off-by: Wenchen Fan 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/99de4b8f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/99de4b8f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/99de4b8f

Branch: refs/heads/branch-2.1
Commit: 99de4b8f55ea2f700ad4ac32620217fa43a2cbdb
Parents: 3ae7ab8
Author: Andrew Ash 
Authored: Mon Sep 18 10:42:24 2017 +0800
Committer: Wenchen Fan 
Committed: Mon Sep 18 10:43:21 2017 +0800

--
 core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/99de4b8f/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
--
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala 
b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
index 78113ac..cd7b396 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
@@ -104,7 +104,7 @@ private[spark] object UIData {
 def hasOutput: Boolean = outputBytes > 0
 def hasShuffleRead: Boolean = shuffleReadTotalBytes > 0
 def hasShuffleWrite: Boolean = shuffleWriteBytes > 0
-def hasBytesSpilled: Boolean = memoryBytesSpilled > 0 && diskBytesSpilled > 0
+def hasBytesSpilled: Boolean = memoryBytesSpilled > 0 || diskBytesSpilled > 0
   }
 
   /**


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-21953] Show both memory and disk bytes spilled if either is present

2017-09-17 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 42852bb17 -> 309c401a5


[SPARK-21953] Show both memory and disk bytes spilled if either is present

As written now, there must be both memory and disk bytes spilled to show either 
of them. If there is only one of those types of spill recorded, it will be 
hidden.

Author: Andrew Ash 

Closes #19164 from ash211/patch-3.

(cherry picked from commit 6308c65f08b507408033da1f1658144ea8c1491f)
Signed-off-by: Wenchen Fan 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/309c401a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/309c401a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/309c401a

Branch: refs/heads/branch-2.2
Commit: 309c401a5b3c76cc1b6b5aef97d03034fe4e1ce4
Parents: 42852bb
Author: Andrew Ash 
Authored: Mon Sep 18 10:42:24 2017 +0800
Committer: Wenchen Fan 
Committed: Mon Sep 18 10:42:41 2017 +0800

--
 core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/309c401a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
--
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala 
b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
index 8bedd07..25aa504 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
@@ -108,7 +108,7 @@ private[spark] object UIData {
 def hasOutput: Boolean = outputBytes > 0
 def hasShuffleRead: Boolean = shuffleReadTotalBytes > 0
 def hasShuffleWrite: Boolean = shuffleWriteBytes > 0
-def hasBytesSpilled: Boolean = memoryBytesSpilled > 0 && diskBytesSpilled > 0
+def hasBytesSpilled: Boolean = memoryBytesSpilled > 0 || diskBytesSpilled > 0
   }
 
   /**


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-21953] Show both memory and disk bytes spilled if either is present

2017-09-17 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master 6adf67dd1 -> 6308c65f0


[SPARK-21953] Show both memory and disk bytes spilled if either is present

As written now, there must be both memory and disk bytes spilled to show either 
of them. If there is only one of those types of spill recorded, it will be 
hidden.

Author: Andrew Ash 

Closes #19164 from ash211/patch-3.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6308c65f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6308c65f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6308c65f

Branch: refs/heads/master
Commit: 6308c65f08b507408033da1f1658144ea8c1491f
Parents: 6adf67d
Author: Andrew Ash 
Authored: Mon Sep 18 10:42:24 2017 +0800
Committer: Wenchen Fan 
Committed: Mon Sep 18 10:42:24 2017 +0800

--
 core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/6308c65f/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
--
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala 
b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
index d9c87f6..5acec0d 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
@@ -110,7 +110,7 @@ private[spark] object UIData {
 def hasOutput: Boolean = outputBytes > 0
 def hasShuffleRead: Boolean = shuffleReadTotalBytes > 0
 def hasShuffleWrite: Boolean = shuffleWriteBytes > 0
-def hasBytesSpilled: Boolean = memoryBytesSpilled > 0 && diskBytesSpilled > 0
+def hasBytesSpilled: Boolean = memoryBytesSpilled > 0 || diskBytesSpilled > 0
   }
 
   /**


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-21985][PYSPARK] PairDeserializer is broken for double-zipped RDDs

2017-09-17 Thread gurwls223
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 e49c997fe -> 3ae7ab8e8


[SPARK-21985][PYSPARK] PairDeserializer is broken for double-zipped RDDs

## What changes were proposed in this pull request?
(edited)
Fixes a bug introduced in #16121

In PairDeserializer, convert each batch of keys and values to lists (if they do 
not already have `__len__`) so that we can check that they are the same size. 
Normally they are already lists, so this should not have a performance impact, 
but it is needed when repeated `zip`s are done.
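
As a quick reference, the double-zip pattern that hits this code path (assuming 
an existing SparkContext `sc`; it mirrors the new unit test below):

```python
rdd = sc.parallelize("abc", 2)

# Each zip adds a PairDeserializer; before this fix the inner batches could be
# iterators without __len__, breaking the size check.
print(rdd.zip(rdd).zip(rdd).collect())   # e.g. [(('a', 'a'), 'a'), (('b', 'b'), 'b'), (('c', 'c'), 'c')]
print(rdd.zip(rdd.zip(rdd)).collect())   # e.g. [('a', ('a', 'a')), ('b', ('b', 'b')), ('c', ('c', 'c'))]
```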

## How was this patch tested?

Additional unit test

Author: Andrew Ray 

Closes #19226 from aray/SPARK-21985.

(cherry picked from commit 6adf67dd14b0ece342bb91adf800df0a7101e038)
Signed-off-by: hyukjinkwon 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3ae7ab8e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3ae7ab8e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3ae7ab8e

Branch: refs/heads/branch-2.1
Commit: 3ae7ab8e82446e6d299a3e344beebb76ebf9dc4c
Parents: e49c997
Author: Andrew Ray 
Authored: Mon Sep 18 02:46:27 2017 +0900
Committer: hyukjinkwon 
Committed: Mon Sep 18 02:47:06 2017 +0900

--
 python/pyspark/serializers.py |  6 +-
 python/pyspark/tests.py   | 12 
 2 files changed, 17 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/3ae7ab8e/python/pyspark/serializers.py
--
diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py
index ea5e00e..9bd4e55 100644
--- a/python/pyspark/serializers.py
+++ b/python/pyspark/serializers.py
@@ -97,7 +97,7 @@ class Serializer(object):
 
 def _load_stream_without_unbatching(self, stream):
 """
-Return an iterator of deserialized batches (lists) of objects from the input stream.
+Return an iterator of deserialized batches (iterable) of objects from the input stream.
if the serializer does not operate on batches the default implementation returns an
iterator of single element lists.
"""
@@ -326,6 +326,10 @@ class PairDeserializer(Serializer):
 key_batch_stream = self.key_ser._load_stream_without_unbatching(stream)
 val_batch_stream = self.val_ser._load_stream_without_unbatching(stream)
 for (key_batch, val_batch) in zip(key_batch_stream, val_batch_stream):
+# For double-zipped RDDs, the batches can be iterators from other PairDeserializer,
+# instead of lists. We need to convert them to lists if needed.
+key_batch = key_batch if hasattr(key_batch, '__len__') else list(key_batch)
+val_batch = val_batch if hasattr(val_batch, '__len__') else list(val_batch)
if len(key_batch) != len(val_batch):
raise ValueError("Can not deserialize PairRDD with different number of items"
 " in batches: (%d, %d)" % (len(key_batch), len(val_batch)))

http://git-wip-us.apache.org/repos/asf/spark/blob/3ae7ab8e/python/pyspark/tests.py
--
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 25ed127..bd21029 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -579,6 +579,18 @@ class RDDTests(ReusedPySparkTestCase):
 set([(x, (y, y)) for x in range(10) for y in range(10)])
 )
 
+def test_zip_chaining(self):
+# Tests for SPARK-21985
+rdd = self.sc.parallelize('abc', 2)
+self.assertSetEqual(
+set(rdd.zip(rdd).zip(rdd).collect()),
+set([((x, x), x) for x in 'abc'])
+)
+self.assertSetEqual(
+set(rdd.zip(rdd.zip(rdd)).collect()),
+set([(x, (x, x)) for x in 'abc'])
+)
+
 def test_deleting_input_files(self):
 # Regression test for SPARK-1025
 tempFile = tempfile.NamedTemporaryFile(delete=False)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-21985][PYSPARK] PairDeserializer is broken for double-zipped RDDs

2017-09-17 Thread gurwls223
Repository: spark
Updated Branches:
  refs/heads/master f4073020a -> 6adf67dd1


[SPARK-21985][PYSPARK] PairDeserializer is broken for double-zipped RDDs

## What changes were proposed in this pull request?
(edited)
Fixes a bug introduced in #16121

In PairDeserializer, convert each batch of keys and values to lists (if they do 
not already have `__len__`) so that we can check that they are the same size. 
Normally they are already lists, so this should not have a performance impact, 
but it is needed when repeated `zip`s are done.

## How was this patch tested?

Additional unit test

Author: Andrew Ray 

Closes #19226 from aray/SPARK-21985.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6adf67dd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6adf67dd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6adf67dd

Branch: refs/heads/master
Commit: 6adf67dd14b0ece342bb91adf800df0a7101e038
Parents: f407302
Author: Andrew Ray 
Authored: Mon Sep 18 02:46:27 2017 +0900
Committer: hyukjinkwon 
Committed: Mon Sep 18 02:46:27 2017 +0900

--
 python/pyspark/serializers.py |  6 +-
 python/pyspark/tests.py   | 12 
 2 files changed, 17 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/6adf67dd/python/pyspark/serializers.py
--
diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py
index d5c2a75..660b19a 100644
--- a/python/pyspark/serializers.py
+++ b/python/pyspark/serializers.py
@@ -97,7 +97,7 @@ class Serializer(object):
 
 def _load_stream_without_unbatching(self, stream):
 """
-Return an iterator of deserialized batches (lists) of objects from the input stream.
+Return an iterator of deserialized batches (iterable) of objects from the input stream.
if the serializer does not operate on batches the default implementation returns an
iterator of single element lists.
"""
@@ -343,6 +343,10 @@ class PairDeserializer(Serializer):
 key_batch_stream = self.key_ser._load_stream_without_unbatching(stream)
 val_batch_stream = self.val_ser._load_stream_without_unbatching(stream)
 for (key_batch, val_batch) in zip(key_batch_stream, val_batch_stream):
+# For double-zipped RDDs, the batches can be iterators from other PairDeserializer,
+# instead of lists. We need to convert them to lists if needed.
+key_batch = key_batch if hasattr(key_batch, '__len__') else list(key_batch)
+val_batch = val_batch if hasattr(val_batch, '__len__') else list(val_batch)
if len(key_batch) != len(val_batch):
raise ValueError("Can not deserialize PairRDD with different number of items"
 " in batches: (%d, %d)" % (len(key_batch), len(val_batch)))

http://git-wip-us.apache.org/repos/asf/spark/blob/6adf67dd/python/pyspark/tests.py
--
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 000dd1e..3c108ec 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -644,6 +644,18 @@ class RDDTests(ReusedPySparkTestCase):
 set([(x, (y, y)) for x in range(10) for y in range(10)])
 )
 
+def test_zip_chaining(self):
+# Tests for SPARK-21985
+rdd = self.sc.parallelize('abc', 2)
+self.assertSetEqual(
+set(rdd.zip(rdd).zip(rdd).collect()),
+set([((x, x), x) for x in 'abc'])
+)
+self.assertSetEqual(
+set(rdd.zip(rdd.zip(rdd)).collect()),
+set([(x, (x, x)) for x in 'abc'])
+)
+
 def test_deleting_input_files(self):
 # Regression test for SPARK-1025
 tempFile = tempfile.NamedTemporaryFile(delete=False)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-21985][PYSPARK] PairDeserializer is broken for double-zipped RDDs

2017-09-17 Thread gurwls223
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 51e5a821d -> 42852bb17


[SPARK-21985][PYSPARK] PairDeserializer is broken for double-zipped RDDs

## What changes were proposed in this pull request?
(edited)
Fixes a bug introduced in #16121

In PairDeserializer, convert each batch of keys and values to lists (if they do 
not already have `__len__`) so that we can check that they are the same size. 
Normally they are already lists, so this should not have a performance impact, 
but it is needed when repeated `zip`s are done.

## How was this patch tested?

Additional unit test

Author: Andrew Ray 

Closes #19226 from aray/SPARK-21985.

(cherry picked from commit 6adf67dd14b0ece342bb91adf800df0a7101e038)
Signed-off-by: hyukjinkwon 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/42852bb1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/42852bb1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/42852bb1

Branch: refs/heads/branch-2.2
Commit: 42852bb17121fb8067a4aea3e56d56f76a2e0d1d
Parents: 51e5a82
Author: Andrew Ray 
Authored: Mon Sep 18 02:46:27 2017 +0900
Committer: hyukjinkwon 
Committed: Mon Sep 18 02:46:47 2017 +0900

--
 python/pyspark/serializers.py |  6 +-
 python/pyspark/tests.py   | 12 
 2 files changed, 17 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/42852bb1/python/pyspark/serializers.py
--
diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py
index ea5e00e..9bd4e55 100644
--- a/python/pyspark/serializers.py
+++ b/python/pyspark/serializers.py
@@ -97,7 +97,7 @@ class Serializer(object):
 
 def _load_stream_without_unbatching(self, stream):
 """
-Return an iterator of deserialized batches (lists) of objects from the input stream.
+Return an iterator of deserialized batches (iterable) of objects from the input stream.
if the serializer does not operate on batches the default implementation returns an
iterator of single element lists.
"""
@@ -326,6 +326,10 @@ class PairDeserializer(Serializer):
 key_batch_stream = self.key_ser._load_stream_without_unbatching(stream)
 val_batch_stream = self.val_ser._load_stream_without_unbatching(stream)
 for (key_batch, val_batch) in zip(key_batch_stream, val_batch_stream):
+# For double-zipped RDDs, the batches can be iterators from other PairDeserializer,
+# instead of lists. We need to convert them to lists if needed.
+key_batch = key_batch if hasattr(key_batch, '__len__') else list(key_batch)
+val_batch = val_batch if hasattr(val_batch, '__len__') else list(val_batch)
if len(key_batch) != len(val_batch):
raise ValueError("Can not deserialize PairRDD with different number of items"
 " in batches: (%d, %d)" % (len(key_batch), len(val_batch)))

http://git-wip-us.apache.org/repos/asf/spark/blob/42852bb1/python/pyspark/tests.py
--
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 20a933e..9f47798 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -644,6 +644,18 @@ class RDDTests(ReusedPySparkTestCase):
 set([(x, (y, y)) for x in range(10) for y in range(10)])
 )
 
+def test_zip_chaining(self):
+# Tests for SPARK-21985
+rdd = self.sc.parallelize('abc', 2)
+self.assertSetEqual(
+set(rdd.zip(rdd).zip(rdd).collect()),
+set([((x, x), x) for x in 'abc'])
+)
+self.assertSetEqual(
+set(rdd.zip(rdd.zip(rdd)).collect()),
+set([(x, (x, x)) for x in 'abc'])
+)
+
 def test_deleting_input_files(self):
 # Regression test for SPARK-1025
 tempFile = tempfile.NamedTemporaryFile(delete=False)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-22032][PYSPARK] Speed up StructType conversion

2017-09-17 Thread gurwls223
Repository: spark
Updated Branches:
  refs/heads/master 73d906722 -> f4073020a


[SPARK-22032][PYSPARK] Speed up StructType conversion

## What changes were proposed in this pull request?

StructType.fromInternal calls f.fromInternal(v) for every field.
We can use precalculated type information to limit the number of function 
calls (it is calculated once per StructType and reused in per-record conversions).
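
Conceptually (a simplified sketch, not the exact Spark code; see the diff below 
for the real change), the idea is to cache the per-field conversion flags once 
per StructType instead of recomputing them per record:

```python
class StructTypeSketch:
    def __init__(self, fields):
        self.fields = fields
        # Computed once per StructType, not once per record.
        self._needConversion = [f.needConversion() for f in fields]
        self._needSerializeAnyField = any(self._needConversion)

    def fromInternal(self, obj):
        if not self._needSerializeAnyField:
            return obj  # fast path: no field needs any conversion
        # Only pay for fromInternal on the fields that actually need it.
        return tuple(f.fromInternal(v) if need else v
                     for f, v, need in zip(self.fields, obj, self._needConversion))
```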

Benchmarks (Python profiler)
```
df = spark.range(1000).selectExpr("id as id0", "id as id1", "id as id2", "id as id3", "id as id4", "id as id5", "id as id6", "id as id7", "id as id8", "id as id9", "struct(id) as s").cache()
df.count()
df.rdd.map(lambda x: x).count()
```

Before
```
310274584 function calls (300272456 primitive calls) in 1320.684 seconds

Ordered by: internal time, cumulative time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
 1000  253.417    0.000  486.991    0.000 types.py:619()
 3000  192.272    0.000 1009.986    0.000 types.py:612(fromInternal)
1  176.140    0.000  176.140    0.000 types.py:88(fromInternal)
 2000  156.832    0.000  328.093    0.000 types.py:1471(_create_row)
14000  107.206    0.008 1237.917    0.088 {built-in method loads}
 2000   80.176    0.000 1090.162    0.000 types.py:1468()
```

After
```
210274584 function calls (200272456 primitive calls) in 1035.974 seconds

Ordered by: internal time, cumulative time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
 3000  215.845    0.000  698.748    0.000 types.py:612(fromInternal)
 2000  165.042    0.000  351.572    0.000 types.py:1471(_create_row)
14000  116.834    0.008  946.791    0.068 {built-in method loads}
 2000   87.326    0.000  786.073    0.000 types.py:1468()
 2000   85.477    0.000  134.607    0.000 types.py:1519(__new__)
 1000   65.777    0.000  126.712    0.000 types.py:619()
```

The main difference is types.py:619() and types.py:88(fromInternal) 
(which is removed in the After run).
The number of function calls is 100 million lower, and performance is about 20% better.

Benchmark (worst case scenario.)

Test
```
df = spark.range(100).selectExpr("current_timestamp as id0", "current_timestamp as id1", "current_timestamp as id2", "current_timestamp as id3", "current_timestamp as id4", "current_timestamp as id5", "current_timestamp as id6", "current_timestamp as id7", "current_timestamp as id8", "current_timestamp as id9").cache()
df.count()
df.rdd.map(lambda x: x).count()
```

Before
```
31166064 function calls (31163984 primitive calls) in 150.882 seconds
```

After
```
31166064 function calls (31163984 primitive calls) in 153.220 seconds
```

IMPORTANT:
The benchmark was done on top of https://github.com/apache/spark/pull/19246.
Without https://github.com/apache/spark/pull/19246 the performance improvement 
will be even greater.

## How was this patch tested?

Existing tests.
Performance benchmark.

Author: Maciej Bryński 

Closes #19249 from maver1ck/spark_22032.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f4073020
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f4073020
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f4073020

Branch: refs/heads/master
Commit: f4073020adf9752c7d7b39631ec3fa36d6345902
Parents: 73d9067
Author: Maciej Bryński 
Authored: Mon Sep 18 02:34:44 2017 +0900
Committer: hyukjinkwon 
Committed: Mon Sep 18 02:34:44 2017 +0900

--
 python/pyspark/sql/types.py | 22 --
 1 file changed, 16 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/f4073020/python/pyspark/sql/types.py
--
diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py
index 920cf00..aaf520f 100644
--- a/python/pyspark/sql/types.py
+++ b/python/pyspark/sql/types.py
@@ -483,7 +483,9 @@ class StructType(DataType):
 self.names = [f.name for f in fields]
 assert all(isinstance(f, StructField) for f in fields),\
 "fields should be a list of StructField"
-self._needSerializeAnyField = any(f.needConversion() for f in self)
+# Precalculated list of fields that need conversion with fromInternal/toInternal functions
+self._needConversion = [f.needConversion() for f in self]
+self._needSerializeAnyField = any(self._needConversion)
 
 def add(self, field, data_type=None, nullable=True, metadata=None):
 """
@@ -528,7 +530,9 @@ class StructType(DataType):
 data_type_f = data_type
self.fields.append(StructField(field, data_type_f, nullable, metadata))
 self.names.append(field)
-self._needSerializeAnyField = any(f.needConversion() for f in se