Repository: spark
Updated Branches:
  refs/heads/master e84815dc3 -> b127ff8a0


[SPARK-2808] [STREAMING] [KAFKA] cleanup tests from

see if requiring producer acks eliminates the need for waitUntilLeaderOffset calls in tests

Author: cody koeninger <c...@koeninger.org>

Closes #5921 from koeninger/kafka-0.8.2-test-cleanup and squashes the following commits:

1e89dc8 [cody koeninger] Merge branch 'master' into kafka-0.8.2-test-cleanup
4662828 [cody koeninger] [Streaming][Kafka] filter mima issue for removal of method from private test class
af1e083 [cody koeninger] Merge branch 'master' into kafka-0.8.2-test-cleanup
4298ac2 [cody koeninger] [Streaming][Kafka] update comment to trigger jenkins attempt
1274afb [cody koeninger] [Streaming][Kafka] see if requiring producer acks eliminates the need for waitUntilLeaderOffset calls in tests


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b127ff8a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b127ff8a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b127ff8a

Branch: refs/heads/master
Commit: b127ff8a0c5fb704da574d101a2d0e27ac5f463a
Parents: e84815d
Author: cody koeninger <c...@koeninger.org>
Authored: Sun Jun 7 21:42:45 2015 +0100
Committer: Sean Owen <so...@cloudera.com>
Committed: Sun Jun 7 21:42:45 2015 +0100

----------------------------------------------------------------------
 .../spark/streaming/kafka/KafkaTestUtils.scala     | 17 ++---------------
 .../spark/streaming/kafka/JavaKafkaRDDSuite.java   |  3 ---
 .../spark/streaming/kafka/KafkaRDDSuite.scala      |  4 ----
 project/MimaExcludes.scala                         |  3 +++
 python/pyspark/streaming/tests.py                  |  5 -----
 5 files changed, 5 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b127ff8a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala
index 6dc4e95..b608b75 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala
@@ -195,6 +195,8 @@ private class KafkaTestUtils extends Logging {
     val props = new Properties()
     props.put("metadata.broker.list", brokerAddress)
     props.put("serializer.class", classOf[StringEncoder].getName)
+    // wait for all in-sync replicas to ack sends
+    props.put("request.required.acks", "-1")
     props
   }
 
@@ -229,21 +231,6 @@ private class KafkaTestUtils extends Logging {
     tryAgain(1)
   }
 
-  /** Wait until the leader offset for the given topic/partition equals the specified offset */
-  def waitUntilLeaderOffset(
-      topic: String,
-      partition: Int,
-      offset: Long): Unit = {
-    eventually(Time(10000), Time(100)) {
-      val kc = new KafkaCluster(Map("metadata.broker.list" -> brokerAddress))
-      val tp = TopicAndPartition(topic, partition)
-      val llo = kc.getLatestLeaderOffsets(Set(tp)).right.get.apply(tp).offset
-      assert(
-        llo == offset,
-        s"$topic $partition $offset not reached after timeout")
-    }
-  }
-
   private def waitUntilMetadataIsPropagated(topic: String, partition: Int): Unit = {
     def isPropagated = server.apis.metadataCache.getPartitionInfo(topic, partition) match {
       case Some(partitionState) =>

http://git-wip-us.apache.org/repos/asf/spark/blob/b127ff8a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java
index 5cf3796..a9dc6e5 100644
--- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java
+++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java
@@ -72,9 +72,6 @@ public class JavaKafkaRDDSuite implements Serializable {
     HashMap<String, String> kafkaParams = new HashMap<String, String>();
     kafkaParams.put("metadata.broker.list", kafkaTestUtils.brokerAddress());
 
-    kafkaTestUtils.waitUntilLeaderOffset(topic1, 0, topic1data.length);
-    kafkaTestUtils.waitUntilLeaderOffset(topic2, 0, topic2data.length);
-
     OffsetRange[] offsetRanges = {
       OffsetRange.create(topic1, 0, 0, 1),
       OffsetRange.create(topic2, 0, 0, 1)

http://git-wip-us.apache.org/repos/asf/spark/blob/b127ff8a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala
index 0544872..d5baf5f 100644
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala
@@ -61,8 +61,6 @@ class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll {
     val kafkaParams = Map("metadata.broker.list" -> kafkaTestUtils.brokerAddress,
       "group.id" -> s"test-consumer-${Random.nextInt}")
 
-    kafkaTestUtils.waitUntilLeaderOffset(topic, 0, messages.size)
-
     val offsetRanges = Array(OffsetRange(topic, 0, 0, messages.size))
 
     val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
@@ -86,7 +84,6 @@ class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll {
     // this is the "lots of messages" case
     kafkaTestUtils.sendMessages(topic, sent)
     val sentCount = sent.values.sum
-    kafkaTestUtils.waitUntilLeaderOffset(topic, 0, sentCount)
 
     // rdd defined from leaders after sending messages, should get the number sent
     val rdd = getRdd(kc, Set(topic))
@@ -113,7 +110,6 @@ class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll {
     val sentOnlyOne = Map("d" -> 1)
 
     kafkaTestUtils.sendMessages(topic, sentOnlyOne)
-    kafkaTestUtils.waitUntilLeaderOffset(topic, 0, sentCount + 1)
 
     assert(rdd2.isDefined)
     assert(rdd2.get.count === 0, "got messages when there shouldn't be any")

http://git-wip-us.apache.org/repos/asf/spark/blob/b127ff8a/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 73e4bfd..8a93ca2 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -47,6 +47,9 @@ object MimaExcludes {
             // Mima false positive (was a private[spark] class)
             ProblemFilters.exclude[MissingClassProblem](
               "org.apache.spark.util.collection.PairIterator"),
+            // Removing a testing method from a private class
+            ProblemFilters.exclude[MissingMethodProblem](
+              "org.apache.spark.streaming.kafka.KafkaTestUtils.waitUntilLeaderOffset"),
             // SQL execution is considered private.
             excludePackage("org.apache.spark.sql.execution")
           )

http://git-wip-us.apache.org/repos/asf/spark/blob/b127ff8a/python/pyspark/streaming/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index 46cb18b..57049be 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -615,7 +615,6 @@ class KafkaStreamTests(PySparkStreamingTestCase):
 
         self._kafkaTestUtils.createTopic(topic)
         self._kafkaTestUtils.sendMessages(topic, sendData)
-        self._kafkaTestUtils.waitUntilLeaderOffset(topic, 0, sum(sendData.values()))
 
         stream = KafkaUtils.createStream(self.ssc, self._kafkaTestUtils.zkAddress(),
                                          "test-streaming-consumer", {topic: 1},
@@ -631,7 +630,6 @@ class KafkaStreamTests(PySparkStreamingTestCase):
 
         self._kafkaTestUtils.createTopic(topic)
         self._kafkaTestUtils.sendMessages(topic, sendData)
-        self._kafkaTestUtils.waitUntilLeaderOffset(topic, 0, sum(sendData.values()))
 
         stream = KafkaUtils.createDirectStream(self.ssc, [topic], kafkaParams)
         self._validateStreamResult(sendData, stream)
@@ -646,7 +644,6 @@ class KafkaStreamTests(PySparkStreamingTestCase):
 
         self._kafkaTestUtils.createTopic(topic)
         self._kafkaTestUtils.sendMessages(topic, sendData)
-        self._kafkaTestUtils.waitUntilLeaderOffset(topic, 0, sum(sendData.values()))
 
         stream = KafkaUtils.createDirectStream(self.ssc, [topic], kafkaParams, fromOffsets)
         self._validateStreamResult(sendData, stream)
@@ -661,7 +658,6 @@ class KafkaStreamTests(PySparkStreamingTestCase):
 
         self._kafkaTestUtils.createTopic(topic)
         self._kafkaTestUtils.sendMessages(topic, sendData)
-        self._kafkaTestUtils.waitUntilLeaderOffset(topic, 0, sum(sendData.values()))
         rdd = KafkaUtils.createRDD(self.sc, kafkaParams, offsetRanges)
         self._validateRddResult(sendData, rdd)
 
@@ -677,7 +673,6 @@ class KafkaStreamTests(PySparkStreamingTestCase):
 
         self._kafkaTestUtils.createTopic(topic)
         self._kafkaTestUtils.sendMessages(topic, sendData)
-        self._kafkaTestUtils.waitUntilLeaderOffset(topic, 0, sum(sendData.values()))
         rdd = KafkaUtils.createRDD(self.sc, kafkaParams, offsetRanges, leaders)
         self._validateRddResult(sendData, rdd)
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to