Repository: spark
Updated Branches:
  refs/heads/master 5aa2710c1 -> 8d4940092


[SPARK-12353][STREAMING][PYSPARK] Fix countByValue inconsistent output in Python API

The semantics of countByValue in the Python API differ from the Scala API: the 
Python version behaves more like a countDistinctValue, returning the number of 
distinct values in each RDD rather than the count of each value. This change 
makes it consistent with the Scala/Java API, which returns (value, count) pairs.
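
For reference, a minimal sketch of the corrected behavior (illustrative only, 
not part of this commit; the local context setup and the queue-backed input 
stream are assumptions used for demonstration):

    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext

    sc = SparkContext("local[2]", "countByValueDemo")
    ssc = StreamingContext(sc, 1)  # 1-second batches

    # Feed one RDD per batch interval from a queue (a common testing pattern).
    stream = ssc.queueStream([sc.parallelize([1, 1, 2, 3, 3, 3])])

    # Before this fix: each output RDD held a single number, the count of
    # distinct values, i.e. 3. After: it holds the (value, count) pairs
    # (1, 2), (2, 1), (3, 3), matching the Scala/Java API.
    stream.countByValue().pprint()

    ssc.start()
    ssc.awaitTerminationOrTimeout(5)
    ssc.stop(stopSparkContext=True, stopGraceFully=False)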

Author: jerryshao <ss...@hortonworks.com>

Closes #10350 from jerryshao/SPARK-12353.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8d494009
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8d494009
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8d494009

Branch: refs/heads/master
Commit: 8d4940092141c0909b673607f393cdd53f093ed6
Parents: 5aa2710
Author: jerryshao <ss...@hortonworks.com>
Authored: Mon Dec 28 10:43:23 2015 +0000
Committer: Sean Owen <so...@cloudera.com>
Committed: Mon Dec 28 10:43:23 2015 +0000

----------------------------------------------------------------------
 python/pyspark/streaming/dstream.py |  4 ++--
 python/pyspark/streaming/tests.py   | 17 ++++++++++++++---
 2 files changed, 16 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8d494009/python/pyspark/streaming/dstream.py
----------------------------------------------------------------------
diff --git a/python/pyspark/streaming/dstream.py b/python/pyspark/streaming/dstream.py
index adc2651..86447f5 100644
--- a/python/pyspark/streaming/dstream.py
+++ b/python/pyspark/streaming/dstream.py
@@ -247,7 +247,7 @@ class DStream(object):
         Return a new DStream in which each RDD contains the counts of each
         distinct value in each RDD of this DStream.
         """
-        return self.map(lambda x: (x, None)).reduceByKey(lambda x, y: None).count()
+        return self.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x+y)
 
     def saveAsTextFiles(self, prefix, suffix=None):
         """
@@ -493,7 +493,7 @@ class DStream(object):
         keyed = self.map(lambda x: (x, 1))
         counted = keyed.reduceByKeyAndWindow(operator.add, operator.sub,
                                              windowDuration, slideDuration, numPartitions)
-        return counted.filter(lambda kv: kv[1] > 0).count()
+        return counted.filter(lambda kv: kv[1] > 0)
 
     def groupByKeyAndWindow(self, windowDuration, slideDuration, numPartitions=None):
         """

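The countByValueAndWindow change above is analogous: it now returns the 
(value, count) pairs in each window rather than the number of distinct values. 
The filter on kv[1] > 0 is needed because reduceByKeyAndWindow with an inverse 
function retains keys whose windowed count has decayed to zero. A minimal 
sketch mirroring the updated test below (illustrative only; it assumes a fresh 
context with a 0.5-second batch interval and a checkpoint directory, which the 
inverse-reduce window requires):

    import tempfile

    ssc = StreamingContext(sc, 0.5)  # 0.5-second batches
    ssc.checkpoint(tempfile.mkdtemp())

    # Batch n carries the values 0..n-1, as in the updated test below.
    stream = ssc.queueStream([sc.parallelize(range(n)) for n in range(1, 6)])

    # Each output holds the (value, count) pairs currently in the 2.5s
    # window: [(0, 1)], then [(0, 2), (1, 1)], and so on.
    stream.countByValueAndWindow(2.5, 0.5).pprint()
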
http://git-wip-us.apache.org/repos/asf/spark/blob/8d494009/python/pyspark/streaming/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index 4949cd6..86b05d9 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -279,8 +279,10 @@ class BasicOperationTests(PySparkStreamingTestCase):
 
         def func(dstream):
             return dstream.countByValue()
-        expected = [[4], [4], [3]]
-        self._test_func(input, func, expected)
+        expected = [[(1, 2), (2, 2), (3, 2), (4, 2)],
+                    [(5, 2), (6, 2), (7, 1), (8, 1)],
+                    [("a", 2), ("b", 1), ("", 1)]]
+        self._test_func(input, func, expected, sort=True)
 
     def test_groupByKey(self):
         """Basic operation test for DStream.groupByKey."""
@@ -651,7 +653,16 @@ class WindowFunctionTests(PySparkStreamingTestCase):
         def func(dstream):
             return dstream.countByValueAndWindow(2.5, .5)
 
-        expected = [[1], [2], [3], [4], [5], [6], [6], [6], [6], [6]]
+        expected = [[(0, 1)],
+                    [(0, 2), (1, 1)],
+                    [(0, 3), (1, 2), (2, 1)],
+                    [(0, 4), (1, 3), (2, 2), (3, 1)],
+                    [(0, 5), (1, 4), (2, 3), (3, 2), (4, 1)],
+                    [(0, 5), (1, 5), (2, 4), (3, 3), (4, 2), (5, 1)],
+                    [(0, 4), (1, 4), (2, 4), (3, 3), (4, 2), (5, 1)],
+                    [(0, 3), (1, 3), (2, 3), (3, 3), (4, 2), (5, 1)],
+                    [(0, 2), (1, 2), (2, 2), (3, 2), (4, 2), (5, 1)],
+                    [(0, 1), (1, 1), (2, 1), (3, 1), (4, 1), (5, 1)]]
         self._test_func(input, func, expected)
 
     def test_group_by_key_and_window(self):

