Repository: spark
Updated Branches:
  refs/heads/master a78a91f4d -> 499ac3e69


[SPARK-12091] [PYSPARK] Deprecate the JAVA-specific deserialized storage levels

The current default storage level of the Python persist API is MEMORY_ONLY_SER.
This differs from the default level, MEMORY_ONLY, given in the official
documentation and used by the RDD APIs.

davies, is this inconsistency intentional? Thanks!

Updates: Since the data is always serialized on the Python side, the
Java-specific deserialized storage levels, such as MEMORY_ONLY, are not
removed. Their constants are kept and now use the serialized format.

Updates (based on the reviewers' feedback): In Python, stored objects are
always serialized with the
[Pickle](https://docs.python.org/2/library/pickle.html) library, so it does not
matter whether you choose a serialized level. The available storage levels in
Python include `MEMORY_ONLY`, `MEMORY_ONLY_2`, `MEMORY_AND_DISK`,
`MEMORY_AND_DISK_2`, `DISK_ONLY`, `DISK_ONLY_2` and `OFF_HEAP`.
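
A minimal sketch of the resulting aliasing, for illustration only. It uses a
stand-in class rather than the real `pyspark` module, so it runs without Spark.
The constructor signature and the constant definitions are copied from the
storagelevel.py diff in this commit; the stored attributes and the printed
checks are assumptions added for the example.

```python
class StorageLevel(object):
    """Stand-in for pyspark.StorageLevel (assumption: not the real class)."""

    def __init__(self, useDisk, useMemory, useOffHeap, deserialized, replication=1):
        self.useDisk = useDisk
        self.useMemory = useMemory
        self.useOffHeap = useOffHeap
        self.deserialized = deserialized
        self.replication = replication


# Constants as defined after this commit: the deserialized flag is False,
# because Python data is always pickled before it is stored.
StorageLevel.MEMORY_ONLY = StorageLevel(False, True, False, False)
StorageLevel.MEMORY_AND_DISK = StorageLevel(True, True, False, False)

# The deprecated *_SER names are plain aliases of the constants above.
StorageLevel.MEMORY_ONLY_SER = StorageLevel.MEMORY_ONLY
StorageLevel.MEMORY_AND_DISK_SER = StorageLevel.MEMORY_AND_DISK

# Both names refer to the same object, so either one selects the same level.
print(StorageLevel.MEMORY_ONLY_SER is StorageLevel.MEMORY_ONLY)
print(StorageLevel.MEMORY_ONLY.deserialized)
```

With these definitions, code that still passes `MEMORY_ONLY_SER` gets the same
level as code that passes `MEMORY_ONLY`.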

Author: gatorsmile <gatorsm...@gmail.com>

Closes #10092 from gatorsmile/persistStorageLevel.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/499ac3e6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/499ac3e6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/499ac3e6

Branch: refs/heads/master
Commit: 499ac3e69a102f9b10a1d7e14382fa191516f7b5
Parents: a78a91f
Author: gatorsmile <gatorsm...@gmail.com>
Authored: Fri Dec 18 20:06:05 2015 -0800
Committer: Davies Liu <davies....@gmail.com>
Committed: Fri Dec 18 20:06:05 2015 -0800

----------------------------------------------------------------------
 docs/configuration.md               |  7 ++++---
 docs/programming-guide.md           | 10 ++++++----
 python/pyspark/rdd.py               |  8 ++++----
 python/pyspark/sql/dataframe.py     |  6 +++---
 python/pyspark/storagelevel.py      | 31 +++++++++++++++++++++----------
 python/pyspark/streaming/context.py |  2 +-
 python/pyspark/streaming/dstream.py |  4 ++--
 python/pyspark/streaming/flume.py   |  4 ++--
 python/pyspark/streaming/kafka.py   |  2 +-
 python/pyspark/streaming/mqtt.py    |  2 +-
 10 files changed, 45 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/499ac3e6/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index 38d3d05..85e7d12 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -687,9 +687,10 @@ Apart from these, the following properties are also available, and may be useful
   <td><code>spark.rdd.compress</code></td>
   <td>false</td>
   <td>
-    Whether to compress serialized RDD partitions (e.g. for
-    <code>StorageLevel.MEMORY_ONLY_SER</code>). Can save substantial space at the cost of some
-    extra CPU time.
+    Whether to compress serialized RDD partitions (e.g. for 
+    <code>StorageLevel.MEMORY_ONLY_SER</code> in Java 
+    and Scala or <code>StorageLevel.MEMORY_ONLY</code> in Python). 
+    Can save substantial space at the cost of some extra CPU time. 
   </td>
 </tr>
 <tr>

http://git-wip-us.apache.org/repos/asf/spark/blob/499ac3e6/docs/programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/programming-guide.md b/docs/programming-guide.md
index f823b89..c5e2a1c 100644
--- a/docs/programming-guide.md
+++ b/docs/programming-guide.md
@@ -1196,14 +1196,14 @@ storage levels is:
     partitions that don't fit on disk, and read them from there when they're needed. </td>
 </tr>
 <tr>
-  <td> MEMORY_ONLY_SER </td>
+  <td> MEMORY_ONLY_SER <br /> (Java and Scala) </td>
   <td> Store RDD as <i>serialized</i> Java objects (one byte array per partition).
     This is generally more space-efficient than deserialized objects, especially when using a
     <a href="tuning.html">fast serializer</a>, but more CPU-intensive to read.
   </td>
 </tr>
 <tr>
-  <td> MEMORY_AND_DISK_SER </td>
+  <td> MEMORY_AND_DISK_SER <br /> (Java and Scala) </td>
   <td> Similar to MEMORY_ONLY_SER, but spill partitions that don't fit in memory to disk instead of
     recomputing them on the fly each time they're needed. </td>
 </tr>
@@ -1230,7 +1230,9 @@ storage levels is:
 </tr>
 </table>
 
-**Note:** *In Python, stored objects will always be serialized with the [Pickle](https://docs.python.org/2/library/pickle.html) library, so it does not matter whether you choose a serialized level.*
+**Note:** *In Python, stored objects will always be serialized with the [Pickle](https://docs.python.org/2/library/pickle.html) library,
+so it does not matter whether you choose a serialized level. The available storage levels in Python include `MEMORY_ONLY`, `MEMORY_ONLY_2`,
+`MEMORY_AND_DISK`, `MEMORY_AND_DISK_2`, `DISK_ONLY`, `DISK_ONLY_2` and `OFF_HEAP`.*
 
 Spark also automatically persists some intermediate data in shuffle operations (e.g. `reduceByKey`), even without users calling `persist`. This is done to avoid recomputing the entire input if a node fails during the shuffle. We still recommend users call `persist` on the resulting RDD if they plan to reuse it.
 
@@ -1243,7 +1245,7 @@ efficiency. We recommend going through the following process to select one:
   This is the most CPU-efficient option, allowing operations on the RDDs to run as fast as possible.
 
 * If not, try using `MEMORY_ONLY_SER` and [selecting a fast serialization library](tuning.html) to
-make the objects much more space-efficient, but still reasonably fast to access.
+make the objects much more space-efficient, but still reasonably fast to access. (Java and Scala)
 
 * Don't spill to disk unless the functions that computed your datasets are expensive, or they filter
 a large amount of the data. Otherwise, recomputing a partition may be as fast as reading it from

http://git-wip-us.apache.org/repos/asf/spark/blob/499ac3e6/python/pyspark/rdd.py
----------------------------------------------------------------------
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 00bb9a6..a019c05 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -220,18 +220,18 @@ class RDD(object):
 
     def cache(self):
         """
-        Persist this RDD with the default storage level (C{MEMORY_ONLY_SER}).
+        Persist this RDD with the default storage level (C{MEMORY_ONLY}).
         """
         self.is_cached = True
-        self.persist(StorageLevel.MEMORY_ONLY_SER)
+        self.persist(StorageLevel.MEMORY_ONLY)
         return self
 
-    def persist(self, storageLevel=StorageLevel.MEMORY_ONLY_SER):
+    def persist(self, storageLevel=StorageLevel.MEMORY_ONLY):
         """
         Set this RDD's storage level to persist its values across operations
         after the first time it is computed. This can only be used to assign
         a new storage level if the RDD does not have a storage level set yet.
-        If no storage level is specified defaults to (C{MEMORY_ONLY_SER}).
+        If no storage level is specified defaults to (C{MEMORY_ONLY}).
 
         >>> rdd = sc.parallelize(["b", "a", "c"])
         >>> rdd.persist().is_cached

http://git-wip-us.apache.org/repos/asf/spark/blob/499ac3e6/python/pyspark/sql/dataframe.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 78ab475..24fc291 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -371,18 +371,18 @@ class DataFrame(object):
 
     @since(1.3)
     def cache(self):
-        """ Persists with the default storage level (C{MEMORY_ONLY_SER}).
+        """ Persists with the default storage level (C{MEMORY_ONLY}).
         """
         self.is_cached = True
         self._jdf.cache()
         return self
 
     @since(1.3)
-    def persist(self, storageLevel=StorageLevel.MEMORY_ONLY_SER):
+    def persist(self, storageLevel=StorageLevel.MEMORY_ONLY):
         """Sets the storage level to persist its values across operations
         after the first time it is computed. This can only be used to assign
         a new storage level if the RDD does not have a storage level set yet.
-        If no storage level is specified defaults to (C{MEMORY_ONLY_SER}).
+        If no storage level is specified defaults to (C{MEMORY_ONLY}).
         """
         self.is_cached = True
         javaStorageLevel = self._sc._getJavaStorageLevel(storageLevel)

http://git-wip-us.apache.org/repos/asf/spark/blob/499ac3e6/python/pyspark/storagelevel.py
----------------------------------------------------------------------
diff --git a/python/pyspark/storagelevel.py b/python/pyspark/storagelevel.py
index 676aa0f..d4f184a 100644
--- a/python/pyspark/storagelevel.py
+++ b/python/pyspark/storagelevel.py
@@ -23,8 +23,10 @@ class StorageLevel(object):
     """
     Flags for controlling the storage of an RDD. Each StorageLevel records whether to use memory,
     whether to drop the RDD to disk if it falls out of memory, whether to keep the data in memory
-    in a serialized format, and whether to replicate the RDD partitions on multiple nodes.
-    Also contains static constants for some commonly used storage levels, such as MEMORY_ONLY.
+    in a JAVA-specific serialized format, and whether to replicate the RDD partitions on multiple
+    nodes. Also contains static constants for some commonly used storage levels, MEMORY_ONLY.
+    Since the data is always serialized on the Python side, all the constants use the serialized
+    formats.
     """
 
     def __init__(self, useDisk, useMemory, useOffHeap, deserialized, replication=1):
@@ -49,12 +51,21 @@ class StorageLevel(object):
 
 StorageLevel.DISK_ONLY = StorageLevel(True, False, False, False)
 StorageLevel.DISK_ONLY_2 = StorageLevel(True, False, False, False, 2)
-StorageLevel.MEMORY_ONLY = StorageLevel(False, True, False, True)
-StorageLevel.MEMORY_ONLY_2 = StorageLevel(False, True, False, True, 2)
-StorageLevel.MEMORY_ONLY_SER = StorageLevel(False, True, False, False)
-StorageLevel.MEMORY_ONLY_SER_2 = StorageLevel(False, True, False, False, 2)
-StorageLevel.MEMORY_AND_DISK = StorageLevel(True, True, False, True)
-StorageLevel.MEMORY_AND_DISK_2 = StorageLevel(True, True, False, True, 2)
-StorageLevel.MEMORY_AND_DISK_SER = StorageLevel(True, True, False, False)
-StorageLevel.MEMORY_AND_DISK_SER_2 = StorageLevel(True, True, False, False, 2)
+StorageLevel.MEMORY_ONLY = StorageLevel(False, True, False, False)
+StorageLevel.MEMORY_ONLY_2 = StorageLevel(False, True, False, False, 2)
+StorageLevel.MEMORY_AND_DISK = StorageLevel(True, True, False, False)
+StorageLevel.MEMORY_AND_DISK_2 = StorageLevel(True, True, False, False, 2)
 StorageLevel.OFF_HEAP = StorageLevel(False, False, True, False, 1)
+
+"""
+.. note:: The following four storage level constants are deprecated in 2.0, since the records \
+will always be serialized in Python.
+"""
+StorageLevel.MEMORY_ONLY_SER = StorageLevel.MEMORY_ONLY
+""".. note:: Deprecated in 2.0, use ``StorageLevel.MEMORY_ONLY`` instead."""
+StorageLevel.MEMORY_ONLY_SER_2 = StorageLevel.MEMORY_ONLY_2
+""".. note:: Deprecated in 2.0, use ``StorageLevel.MEMORY_ONLY_2`` instead."""
+StorageLevel.MEMORY_AND_DISK_SER = StorageLevel.MEMORY_AND_DISK
+""".. note:: Deprecated in 2.0, use ``StorageLevel.MEMORY_AND_DISK`` instead."""
+StorageLevel.MEMORY_AND_DISK_SER_2 = StorageLevel.MEMORY_AND_DISK_2
+""".. note:: Deprecated in 2.0, use ``StorageLevel.MEMORY_AND_DISK_2`` instead."""

http://git-wip-us.apache.org/repos/asf/spark/blob/499ac3e6/python/pyspark/streaming/context.py
----------------------------------------------------------------------
diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py
index 1388b6d..3deed52 100644
--- a/python/pyspark/streaming/context.py
+++ b/python/pyspark/streaming/context.py
@@ -258,7 +258,7 @@ class StreamingContext(object):
         """
         self._jssc.checkpoint(directory)
 
-    def socketTextStream(self, hostname, port, storageLevel=StorageLevel.MEMORY_AND_DISK_SER_2):
+    def socketTextStream(self, hostname, port, storageLevel=StorageLevel.MEMORY_AND_DISK_2):
         """
         Create an input from TCP source hostname:port. Data is received using
         a TCP socket and receive byte is interpreted as UTF8 encoded ``\\n`` delimited

http://git-wip-us.apache.org/repos/asf/spark/blob/499ac3e6/python/pyspark/streaming/dstream.py
----------------------------------------------------------------------
diff --git a/python/pyspark/streaming/dstream.py b/python/pyspark/streaming/dstream.py
index b994a53..adc2651 100644
--- a/python/pyspark/streaming/dstream.py
+++ b/python/pyspark/streaming/dstream.py
@@ -208,10 +208,10 @@ class DStream(object):
     def cache(self):
         """
         Persist the RDDs of this DStream with the default storage level
-        (C{MEMORY_ONLY_SER}).
+        (C{MEMORY_ONLY}).
         """
         self.is_cached = True
-        self.persist(StorageLevel.MEMORY_ONLY_SER)
+        self.persist(StorageLevel.MEMORY_ONLY)
         return self
 
     def persist(self, storageLevel):

http://git-wip-us.apache.org/repos/asf/spark/blob/499ac3e6/python/pyspark/streaming/flume.py
----------------------------------------------------------------------
diff --git a/python/pyspark/streaming/flume.py b/python/pyspark/streaming/flume.py
index b3d1905..b1fff0a 100644
--- a/python/pyspark/streaming/flume.py
+++ b/python/pyspark/streaming/flume.py
@@ -40,7 +40,7 @@ class FlumeUtils(object):
 
     @staticmethod
     def createStream(ssc, hostname, port,
-                     storageLevel=StorageLevel.MEMORY_AND_DISK_SER_2,
+                     storageLevel=StorageLevel.MEMORY_AND_DISK_2,
                      enableDecompression=False,
                      bodyDecoder=utf8_decoder):
         """
@@ -70,7 +70,7 @@ class FlumeUtils(object):
 
     @staticmethod
     def createPollingStream(ssc, addresses,
-                            storageLevel=StorageLevel.MEMORY_AND_DISK_SER_2,
+                            storageLevel=StorageLevel.MEMORY_AND_DISK_2,
                             maxBatchSize=1000,
                             parallelism=5,
                             bodyDecoder=utf8_decoder):

http://git-wip-us.apache.org/repos/asf/spark/blob/499ac3e6/python/pyspark/streaming/kafka.py
----------------------------------------------------------------------
diff --git a/python/pyspark/streaming/kafka.py b/python/pyspark/streaming/kafka.py
index cdf97ec..13f8f95 100644
--- a/python/pyspark/streaming/kafka.py
+++ b/python/pyspark/streaming/kafka.py
@@ -40,7 +40,7 @@ class KafkaUtils(object):
 
     @staticmethod
     def createStream(ssc, zkQuorum, groupId, topics, kafkaParams=None,
-                     storageLevel=StorageLevel.MEMORY_AND_DISK_SER_2,
+                     storageLevel=StorageLevel.MEMORY_AND_DISK_2,
                      keyDecoder=utf8_decoder, valueDecoder=utf8_decoder):
         """
         Create an input stream that pulls messages from a Kafka Broker.

http://git-wip-us.apache.org/repos/asf/spark/blob/499ac3e6/python/pyspark/streaming/mqtt.py
----------------------------------------------------------------------
diff --git a/python/pyspark/streaming/mqtt.py b/python/pyspark/streaming/mqtt.py
index 1ce4093..3a515ea 100644
--- a/python/pyspark/streaming/mqtt.py
+++ b/python/pyspark/streaming/mqtt.py
@@ -28,7 +28,7 @@ class MQTTUtils(object):
 
     @staticmethod
     def createStream(ssc, brokerUrl, topic,
-                     storageLevel=StorageLevel.MEMORY_AND_DISK_SER_2):
+                     storageLevel=StorageLevel.MEMORY_AND_DISK_2):
         """
         Create an input stream that pulls messages from a Mqtt Broker.
 

