Repository: spark
Updated Branches:
  refs/heads/master 46395db80 -> c62263340


[SPARK-16212][STREAMING][KAFKA] code cleanup from review feedback

## What changes were proposed in this pull request?
Code cleanup in the kafka-0-8 module to match the changes suggested during review of the kafka-0-10 branch.

## How was this patch tested?
unit tests

Author: cody koeninger <c...@koeninger.org>

Closes #13908 from koeninger/kafka-0-8-cleanup.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c6226334
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c6226334
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c6226334

Branch: refs/heads/master
Commit: c62263340edb6976a10f274e716fde6cd2c5bf34
Parents: 46395db
Author: cody koeninger <c...@koeninger.org>
Authored: Thu Jun 30 13:16:58 2016 -0700
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Thu Jun 30 13:16:58 2016 -0700

----------------------------------------------------------------------
 .../spark/streaming/kafka/DirectKafkaInputDStream.scala | 12 ++++++------
 .../org/apache/spark/streaming/kafka/KafkaRDD.scala     |  9 ++++++---
 .../streaming/kafka/JavaDirectKafkaStreamSuite.java     |  5 -----
 3 files changed, 12 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c6226334/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
index fb58ed7..c3c7993 100644
--- a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
+++ b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
@@ -34,7 +34,7 @@ import org.apache.spark.streaming.scheduler.{RateController, StreamInputInfo}
 import org.apache.spark.streaming.scheduler.rate.RateEstimator
 
 /**
- *  A stream of {@link org.apache.spark.streaming.kafka.KafkaRDD} where
+ *  A stream of [[KafkaRDD]] where
  * each given Kafka topic/partition corresponds to an RDD partition.
  * The spark configuration spark.streaming.kafka.maxRatePerPartition gives the maximum number
  *  of messages
@@ -43,7 +43,7 @@ import org.apache.spark.streaming.scheduler.rate.RateEstimator
  * and this DStream is not responsible for committing offsets,
  * so that you can control exactly-once semantics.
  * For an easy interface to Kafka-managed offsets,
- *  see {@link org.apache.spark.streaming.kafka.KafkaCluster}
+ *  see [[KafkaCluster]]
  * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
  * configuration parameters</a>.
  *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
@@ -132,7 +132,7 @@ class DirectKafkaInputDStream[
       if (retries <= 0) {
         throw new SparkException(err)
       } else {
-        log.error(err)
+        logError(err)
         Thread.sleep(kc.config.refreshLeaderBackoffMs)
         latestLeaderOffsets(retries - 1)
       }
@@ -194,7 +194,7 @@ class DirectKafkaInputDStream[
      data.asInstanceOf[mutable.HashMap[Time, Array[OffsetRange.OffsetRangeTuple]]]
     }
 
-    override def update(time: Time) {
+    override def update(time: Time): Unit = {
       batchForTime.clear()
       generatedRDDs.foreach { kv =>
        val a = kv._2.asInstanceOf[KafkaRDD[K, V, U, T, R]].offsetRanges.map(_.toTuple).toArray
@@ -202,9 +202,9 @@ class DirectKafkaInputDStream[
       }
     }
 
-    override def cleanup(time: Time) { }
+    override def cleanup(time: Time): Unit = { }
 
-    override def restore() {
+    override def restore(): Unit = {
      // this is assuming that the topics don't change during execution, which is true currently
       val topics = fromOffsets.keySet
       val leaders = KafkaCluster.checkErrors(kc.findLeaders(topics))
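
For orientation, the direct stream documented in the hunk above is created through KafkaUtils.createDirectStream. A minimal sketch of that usage, assuming an existing StreamingContext `ssc` and using a placeholder broker address and topic name:

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

// Broker address and topic are placeholders; `ssc` is an existing
// StreamingContext created elsewhere.
val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("myTopic"))

stream.foreachRDD { rdd =>
  // Each batch RDD exposes the Kafka offsets it covers; because the
  // DStream does not commit offsets itself, the application can store
  // these to control exactly-once semantics.
  for (o <- rdd.asInstanceOf[HasOffsetRanges].offsetRanges) {
    println(s"${o.topic} ${o.partition} ${o.fromOffset} -> ${o.untilOffset}")
  }
}
```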

http://git-wip-us.apache.org/repos/asf/spark/blob/c6226334/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala
index d4881b1..2b92577 100644
--- a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala
+++ b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala
@@ -129,7 +129,7 @@ class KafkaRDD[
     val part = thePart.asInstanceOf[KafkaRDDPartition]
     assert(part.fromOffset <= part.untilOffset, errBeginAfterEnd(part))
     if (part.fromOffset == part.untilOffset) {
-      log.info(s"Beginning offset ${part.fromOffset} is the same as ending offset " +
+      logInfo(s"Beginning offset ${part.fromOffset} is the same as ending offset " +
         s"skipping ${part.topic} ${part.partition}")
       Iterator.empty
     } else {
@@ -137,13 +137,16 @@ class KafkaRDD[
     }
   }
 
+  /**
+   * An iterator that fetches messages directly from Kafka for the offsets in partition.
+   */
   private class KafkaRDDIterator(
       part: KafkaRDDPartition,
       context: TaskContext) extends NextIterator[R] {
 
     context.addTaskCompletionListener{ context => closeIfNeeded() }
 
-    log.info(s"Computing topic ${part.topic}, partition ${part.partition} " +
+    logInfo(s"Computing topic ${part.topic}, partition ${part.partition} " +
       s"offsets ${part.fromOffset} -> ${part.untilOffset}")
 
     val kc = new KafkaCluster(kafkaParams)
@@ -177,7 +180,7 @@ class KafkaRDD[
         val err = resp.errorCode(part.topic, part.partition)
         if (err == ErrorMapping.LeaderNotAvailableCode ||
           err == ErrorMapping.NotLeaderForPartitionCode) {
-          log.error(s"Lost leader for topic ${part.topic} partition ${part.partition}, " +
+          logError(s"Lost leader for topic ${part.topic} partition ${part.partition}, " +
             s" sleeping for ${kc.config.refreshLeaderBackoffMs}ms")
           Thread.sleep(kc.config.refreshLeaderBackoffMs)
         }
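
The hunk above keeps the existing leader-failure handling: log the error, sleep for the configured refreshLeaderBackoffMs, and let the enclosing retry loop (as in latestLeaderOffsets in DirectKafkaInputDStream) try again with a decremented counter. A generic sketch of that retry-with-backoff pattern, with illustrative names that are not Spark API:

```scala
// Retry a fallible action a bounded number of times, sleeping between
// attempts. Illustrative only; not part of the Spark codebase.
@annotation.tailrec
def retryWithBackoff[T](retries: Int, backoffMs: Long)(attempt: () => Either[String, T]): T =
  attempt() match {
    case Right(value) => value
    case Left(err) if retries <= 0 => throw new RuntimeException(err)
    case Left(err) =>
      System.err.println(err) // stands in for logError(err)
      Thread.sleep(backoffMs)
      retryWithBackoff(retries - 1, backoffMs)(attempt)
  }
```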

http://git-wip-us.apache.org/repos/asf/spark/blob/c6226334/external/kafka-0-8/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java
----------------------------------------------------------------------
diff --git a/external/kafka-0-8/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java b/external/kafka-0-8/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java
index fa6b0db..71404a7 100644
--- a/external/kafka-0-8/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java
+++ b/external/kafka-0-8/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java
@@ -135,11 +135,6 @@ public class JavaDirectKafkaStreamSuite implements Serializable {
           @Override
           public void call(JavaRDD<String> rdd) {
             result.addAll(rdd.collect());
-            for (OffsetRange o : offsetRanges.get()) {
-              System.out.println(
-                o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset()
-              );
-            }
           }
         }
     );
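
The deleted loop only echoed each batch's offset ranges to stdout; the suite still records them for its assertions. The capture pattern the test relies on looks roughly like this in Scala (the AtomicReference mirrors the Java suite's field; `stream` is a placeholder direct stream as in the earlier sketch):

```scala
import java.util.concurrent.atomic.AtomicReference
import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

// Record each batch's offset ranges as a side effect of transform(),
// so a later assertion can read them. `stream` is assumed from context.
val offsetRanges = new AtomicReference[Array[OffsetRange]]()
val withOffsets = stream.transform { rdd =>
  offsetRanges.set(rdd.asInstanceOf[HasOffsetRanges].offsetRanges)
  rdd
}
```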

