Repository: spark
Updated Branches:
  refs/heads/branch-1.5 8a6e63c78 -> abb0ca7a9


[SPARK-11270][STREAMING] Add improved equality testing for TopicAndPartition 
from the Kafka Streaming API

jerryshao tdas

I know this is kind of minor, and I know you all are busy, but this brings this 
class in line with the `OffsetRange` class, and makes tests a little more 
concise.

Instead of doing something like:
```
assert topic_and_partition_instance._topic == "foo"
assert topic_and_partition_instance._partition == 0
```

You can do something like:
```
assert topic_and_partition_instance == TopicAndPartition("foo", 0)
```

Before:
```
>>> from pyspark.streaming.kafka import TopicAndPartition
>>> TopicAndPartition("foo", 0) == TopicAndPartition("foo", 0)
False
```

After:
```
>>> from pyspark.streaming.kafka import TopicAndPartition
>>> TopicAndPartition("foo", 0) == TopicAndPartition("foo", 0)
True
```

I couldn't find any tests - am I missing something?

Author: Nick Evans <m...@nicolasevans.org>

Closes #9236 from manygrams/topic_and_partition_equality.

(cherry picked from commit 8f888eea1aef5a28916ec406a99fc19648681ecf)
Signed-off-by: Tathagata Das <tathagata.das1...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/abb0ca7a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/abb0ca7a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/abb0ca7a

Branch: refs/heads/branch-1.5
Commit: abb0ca7a947f4803f3d16d65d1f6c53930890dee
Parents: 8a6e63c
Author: Nick Evans <m...@nicolasevans.org>
Authored: Tue Oct 27 01:29:06 2015 -0700
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Tue Oct 27 01:30:29 2015 -0700

----------------------------------------------------------------------
 python/pyspark/streaming/kafka.py | 10 ++++++++++
 python/pyspark/streaming/tests.py | 10 ++++++++++
 2 files changed, 20 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/abb0ca7a/python/pyspark/streaming/kafka.py
----------------------------------------------------------------------
diff --git a/python/pyspark/streaming/kafka.py 
b/python/pyspark/streaming/kafka.py
index 8a814c6..f7b59d6 100644
--- a/python/pyspark/streaming/kafka.py
+++ b/python/pyspark/streaming/kafka.py
@@ -254,6 +254,16 @@ class TopicAndPartition(object):
     def _jTopicAndPartition(self, helper):
         return helper.createTopicAndPartition(self._topic, self._partition)
 
+    def __eq__(self, other):
+        if isinstance(other, self.__class__):
+            return (self._topic == other._topic
+                    and self._partition == other._partition)
+        else:
+            return False
+
+    def __ne__(self, other):
+        return not self.__eq__(other)
+
 
 class Broker(object):
     """

http://git-wip-us.apache.org/repos/asf/spark/blob/abb0ca7a/python/pyspark/streaming/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/streaming/tests.py 
b/python/pyspark/streaming/tests.py
index cfea95b..a8c7b51 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -887,6 +887,16 @@ class KafkaStreamTests(PySparkStreamingTestCase):
 
         self.assertEqual(offsetRanges, [OffsetRange(topic, 0, long(0), 
long(6))])
 
+    def test_topic_and_partition_equality(self):
+        topic_and_partition_a = TopicAndPartition("foo", 0)
+        topic_and_partition_b = TopicAndPartition("foo", 0)
+        topic_and_partition_c = TopicAndPartition("bar", 0)
+        topic_and_partition_d = TopicAndPartition("foo", 1)
+
+        self.assertEqual(topic_and_partition_a, topic_and_partition_b)
+        self.assertNotEqual(topic_and_partition_a, topic_and_partition_c)
+        self.assertNotEqual(topic_and_partition_a, topic_and_partition_d)
+
 
 class FlumeStreamTests(PySparkStreamingTestCase):
     timeout = 20  # seconds


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to