Repository: spark
Updated Branches:
  refs/heads/master aaa2c3b62 -> 39ae04e6b


[SPARK-12692][BUILD][STREAMING] Scala style: Fix the style violation (Space before "," or ":")

Fix the style violation (space before "," and ":").
This PR is a followup for #10643.

Author: Kousuke Saruta <saru...@oss.nttdata.co.jp>

Closes #10685 from sarutak/SPARK-12692-followup-streaming.
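
For readers unfamiliar with the rule: the checker flags any whitespace
immediately before a comma or a colon. A minimal before/after sketch
(hypothetical code, not taken from this patch):

    // Flagged: space before ":" and before ","
    def lookup(key : String , default : Int) : Int = default

    // Clean: whitespace goes after the comma or colon, never before
    def lookup(key: String, default: Int): Int = default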


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/39ae04e6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/39ae04e6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/39ae04e6

Branch: refs/heads/master
Commit: 39ae04e6b714e085a1341aa84d8fc5fc827d5f35
Parents: aaa2c3b
Author: Kousuke Saruta <saru...@oss.nttdata.co.jp>
Authored: Mon Jan 11 21:06:22 2016 -0800
Committer: Reynold Xin <r...@databricks.com>
Committed: Mon Jan 11 21:06:22 2016 -0800

----------------------------------------------------------------------
 .../clickstream/PageViewGenerator.scala         | 14 ++++----
 .../spark/streaming/flume/sink/Logging.scala    |  8 ++---
 .../streaming/flume/FlumeInputDStream.scala     | 18 +++++-----
 .../kafka/DirectKafkaInputDStream.scala         |  4 +--
 .../streaming/kafka/KafkaInputDStream.scala     |  4 +--
 .../kafka/ReliableKafkaStreamSuite.scala        |  2 +-
 .../spark/streaming/mqtt/MQTTInputDStream.scala |  4 +--
 .../streaming/twitter/TwitterInputDStream.scala |  4 +--
 project/MimaExcludes.scala                      | 12 +++++++
 .../org/apache/spark/streaming/Checkpoint.scala | 12 +++----
 .../spark/streaming/StreamingContext.scala      | 36 ++++++++++----------
 .../streaming/api/java/JavaDStreamLike.scala    |  2 +-
 .../dstream/ConstantInputDStream.scala          |  4 +--
 .../dstream/DStreamCheckpointData.scala         |  2 +-
 .../streaming/dstream/FileInputDStream.scala    | 18 +++++-----
 .../spark/streaming/dstream/InputDStream.scala  |  6 ++--
 .../dstream/PluggableInputDStream.scala         |  4 +--
 .../streaming/dstream/RawInputDStream.scala     |  4 +--
 .../dstream/ReceiverInputDStream.scala          |  6 ++--
 .../streaming/dstream/SocketInputDStream.scala  |  4 +--
 .../spark/streaming/dstream/StateDStream.scala  |  6 ++--
 .../spark/streaming/receiver/Receiver.scala     |  8 ++---
 .../spark/streaming/BasicOperationsSuite.scala  |  2 +-
 .../spark/streaming/CheckpointSuite.scala       |  2 +-
 .../spark/streaming/MasterFailureTest.scala     |  4 +--
 .../apache/spark/streaming/StateMapSuite.scala  |  2 +-
 .../spark/streaming/StreamingContextSuite.scala |  2 +-
 .../apache/spark/streaming/TestSuiteBase.scala  |  4 +--
 .../scheduler/ReceiverTrackerSuite.scala        |  4 +--
 .../streaming/util/WriteAheadLogSuite.scala     |  2 +-
 30 files changed, 108 insertions(+), 96 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/39ae04e6/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewGenerator.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewGenerator.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewGenerator.scala
index ce1a620..50216b9 100644
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewGenerator.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewGenerator.scala
@@ -23,15 +23,15 @@ import java.net.ServerSocket
 import java.util.Random
 
 /** Represents a page view on a website with associated dimension data. */
-class PageView(val url : String, val status : Int, val zipCode : Int, val userID : Int)
+class PageView(val url: String, val status: Int, val zipCode: Int, val userID: Int)
     extends Serializable {
-  override def toString() : String = {
+  override def toString(): String = {
     "%s\t%s\t%s\t%s\n".format(url, status, zipCode, userID)
   }
 }
 
 object PageView extends Serializable {
-  def fromString(in : String) : PageView = {
+  def fromString(in: String): PageView = {
     val parts = in.split("\t")
     new PageView(parts(0), parts(1).toInt, parts(2).toInt, parts(3).toInt)
   }
@@ -58,9 +58,9 @@ object PageViewGenerator {
                        404 -> .05)
   val userZipCode = Map(94709 -> .5,
                         94117 -> .5)
-  val userID = Map((1 to 100).map(_ -> .01) : _*)
+  val userID = Map((1 to 100).map(_ -> .01): _*)
 
-  def pickFromDistribution[T](inputMap : Map[T, Double]) : T = {
+  def pickFromDistribution[T](inputMap: Map[T, Double]): T = {
     val rand = new Random().nextDouble()
     var total = 0.0
     for ((item, prob) <- inputMap) {
@@ -72,7 +72,7 @@ object PageViewGenerator {
     inputMap.take(1).head._1 // Shouldn't get here if probabilities add up to 1.0
   }
 
-  def getNextClickEvent() : String = {
+  def getNextClickEvent(): String = {
     val id = pickFromDistribution(userID)
     val page = pickFromDistribution(pages)
     val status = pickFromDistribution(httpStatus)
@@ -80,7 +80,7 @@ object PageViewGenerator {
     new PageView(page, status, zipCode, id).toString()
   }
 
-  def main(args : Array[String]) {
+  def main(args: Array[String]) {
     if (args.length != 2) {
       System.err.println("Usage: PageViewGenerator <port> <viewsPerSecond>")
       System.exit(1)
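
The `Map((1 to 100).map(_ -> .01): _*)` change above keeps the space after
the colon: `: _*` is the varargs ascription, which splices a sequence into
Map's variadic apply(), and only the space before the colon violates the
rule. A standalone sketch of the construction and the weighted-sampling
loop it feeds (plain Scala, no Spark dependencies assumed):

    import scala.util.Random

    // Splice 100 (key -> weight) pairs into Map's varargs apply().
    val userID: Map[Int, Double] = Map((1 to 100).map(_ -> 0.01): _*)

    // Accumulate probability mass until the random draw is covered.
    def pickFromDistribution[T](inputMap: Map[T, Double]): T = {
      val rand = new Random().nextDouble()
      var total = 0.0
      for ((item, prob) <- inputMap) {
        total += prob
        if (total >= rand) {
          return item
        }
      }
      inputMap.take(1).head._1 // fallback when weights sum to just under 1.0
    }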

http://git-wip-us.apache.org/repos/asf/spark/blob/39ae04e6/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/Logging.scala
----------------------------------------------------------------------
diff --git a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/Logging.scala b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/Logging.scala
index d87b869..aa530a7 100644
--- a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/Logging.scala
+++ b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/Logging.scala
@@ -26,20 +26,20 @@ import org.slf4j.{Logger, LoggerFactory}
 private[sink] trait Logging {
   // Make the log field transient so that objects with Logging can
   // be serialized and used on another machine
-  @transient private var log_ : Logger = null
+  @transient private var _log: Logger = null
 
   // Method to get or create the logger for this object
   protected def log: Logger = {
-    if (log_ == null) {
+    if (_log == null) {
       initializeIfNecessary()
       var className = this.getClass.getName
       // Ignore trailing $'s in the class names for Scala objects
       if (className.endsWith("$")) {
         className = className.substring(0, className.length - 1)
       }
-      log_ = LoggerFactory.getLogger(className)
+      _log = LoggerFactory.getLogger(className)
     }
-    log_
+    _log
   }
 
   // Log methods that take only a String
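
The field renamed here (log_ to _log) implements the transient lazy-logger
idiom: the logger is excluded from serialization so instances mixing in the
trait can be shipped to another JVM, and is re-created on first use after
deserialization. A condensed sketch of the idiom (slf4j assumed on the
classpath; the trait name is illustrative):

    import org.slf4j.{Logger, LoggerFactory}

    trait LazyLogging extends Serializable {
      // Not serialized; rebuilt on first access in the remote JVM.
      @transient private var _log: Logger = null

      protected def log: Logger = {
        if (_log == null) {
          // Drop the trailing '$' Scala appends to object class names.
          val className = this.getClass.getName.stripSuffix("$")
          _log = LoggerFactory.getLogger(className)
        }
        _log
      }
    }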

http://git-wip-us.apache.org/repos/asf/spark/blob/39ae04e6/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
----------------------------------------------------------------------
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
index 1bfa35a..74bd016 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
@@ -41,12 +41,12 @@ import org.apache.spark.util.Utils
 
 private[streaming]
 class FlumeInputDStream[T: ClassTag](
-  ssc_ : StreamingContext,
+  _ssc: StreamingContext,
   host: String,
   port: Int,
   storageLevel: StorageLevel,
   enableDecompression: Boolean
-) extends ReceiverInputDStream[SparkFlumeEvent](ssc_) {
+) extends ReceiverInputDStream[SparkFlumeEvent](_ssc) {
 
   override def getReceiver(): Receiver[SparkFlumeEvent] = {
     new FlumeReceiver(host, port, storageLevel, enableDecompression)
@@ -60,7 +60,7 @@ class FlumeInputDStream[T: ClassTag](
  * which are not serializable.
  */
 class SparkFlumeEvent() extends Externalizable {
-  var event : AvroFlumeEvent = new AvroFlumeEvent()
+  var event: AvroFlumeEvent = new AvroFlumeEvent()
 
   /* De-serialize from bytes. */
   def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
@@ -75,12 +75,12 @@ class SparkFlumeEvent() extends Externalizable {
       val keyLength = in.readInt()
       val keyBuff = new Array[Byte](keyLength)
       in.readFully(keyBuff)
-      val key : String = Utils.deserialize(keyBuff)
+      val key: String = Utils.deserialize(keyBuff)
 
       val valLength = in.readInt()
       val valBuff = new Array[Byte](valLength)
       in.readFully(valBuff)
-      val value : String = Utils.deserialize(valBuff)
+      val value: String = Utils.deserialize(valBuff)
 
       headers.put(key, value)
     }
@@ -109,7 +109,7 @@ class SparkFlumeEvent() extends Externalizable {
 }
 
 private[streaming] object SparkFlumeEvent {
-  def fromAvroFlumeEvent(in : AvroFlumeEvent) : SparkFlumeEvent = {
+  def fromAvroFlumeEvent(in: AvroFlumeEvent): SparkFlumeEvent = {
     val event = new SparkFlumeEvent
     event.event = in
     event
@@ -118,13 +118,13 @@ private[streaming] object SparkFlumeEvent {
 
 /** A simple server that implements Flume's Avro protocol. */
 private[streaming]
-class FlumeEventServer(receiver : FlumeReceiver) extends AvroSourceProtocol {
-  override def append(event : AvroFlumeEvent) : Status = {
+class FlumeEventServer(receiver: FlumeReceiver) extends AvroSourceProtocol {
+  override def append(event: AvroFlumeEvent): Status = {
     receiver.store(SparkFlumeEvent.fromAvroFlumeEvent(event))
     Status.OK
   }
 
-  override def appendBatch(events : java.util.List[AvroFlumeEvent]) : Status = {
+  override def appendBatch(events: java.util.List[AvroFlumeEvent]): Status = {
     events.asScala.foreach(event => receiver.store(SparkFlumeEvent.fromAvroFlumeEvent(event)))
     Status.OK
   }
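
A note on why these parameters are renamed instead of just losing the space:
a Scala identifier may end in an underscore, and that underscore fuses with
the operator character that follows, so `ssc_:` lexes as one identifier
rather than `ssc_` followed by `:`. The space in `ssc_ : StreamingContext`
was therefore mandatory; moving the underscore to the front removes the need
for it. An illustration (class name hypothetical):

    import org.apache.spark.streaming.StreamingContext

    // ssc_: StreamingContext   // does not parse: `ssc_:` is a single token
    // ssc_ : StreamingContext  // old workaround: forced space before colon
    // _ssc: StreamingContext   // after this patch: colon follows directly
    class MyInputDStream(_ssc: StreamingContext) {
      def context: StreamingContext = _ssc
    }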

http://git-wip-us.apache.org/repos/asf/spark/blob/39ae04e6/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
index 8a08747..54d8c8b 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
@@ -58,11 +58,11 @@ class DirectKafkaInputDStream[
   U <: Decoder[K]: ClassTag,
   T <: Decoder[V]: ClassTag,
   R: ClassTag](
-    ssc_ : StreamingContext,
+    _ssc: StreamingContext,
     val kafkaParams: Map[String, String],
     val fromOffsets: Map[TopicAndPartition, Long],
     messageHandler: MessageAndMetadata[K, V] => R
-  ) extends InputDStream[R](ssc_) with Logging {
+  ) extends InputDStream[R](_ssc) with Logging {
   val maxRetries = context.sparkContext.getConf.getInt(
     "spark.streaming.kafka.maxRetries", 1)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/39ae04e6/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
index 67f2360..89d1811 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
@@ -48,12 +48,12 @@ class KafkaInputDStream[
   V: ClassTag,
   U <: Decoder[_]: ClassTag,
   T <: Decoder[_]: ClassTag](
-    ssc_ : StreamingContext,
+    _ssc: StreamingContext,
     kafkaParams: Map[String, String],
     topics: Map[String, Int],
     useReliableReceiver: Boolean,
     storageLevel: StorageLevel
-  ) extends ReceiverInputDStream[(K, V)](ssc_) with Logging {
+  ) extends ReceiverInputDStream[(K, V)](_ssc) with Logging {
 
   def getReceiver(): Receiver[(K, V)] = {
     if (!useReliableReceiver) {

http://git-wip-us.apache.org/repos/asf/spark/blob/39ae04e6/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala
index 80e2df6..7b9aee3 100644
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala
@@ -50,7 +50,7 @@ class ReliableKafkaStreamSuite extends SparkFunSuite
   private var ssc: StreamingContext = _
   private var tempDirectory: File = null
 
-  override def beforeAll() : Unit = {
+  override def beforeAll(): Unit = {
     kafkaTestUtils = new KafkaTestUtils
     kafkaTestUtils.setup()
 

http://git-wip-us.apache.org/repos/asf/spark/blob/39ae04e6/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
----------------------------------------------------------------------
diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
index 116c170..079bd8a 100644
--- a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
+++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
@@ -38,11 +38,11 @@ import org.apache.spark.streaming.receiver.Receiver
 
 private[streaming]
 class MQTTInputDStream(
-    ssc_ : StreamingContext,
+    _ssc: StreamingContext,
     brokerUrl: String,
     topic: String,
     storageLevel: StorageLevel
-  ) extends ReceiverInputDStream[String](ssc_) {
+  ) extends ReceiverInputDStream[String](_ssc) {
 
   private[streaming] override def name: String = s"MQTT stream [$id]"
 

http://git-wip-us.apache.org/repos/asf/spark/blob/39ae04e6/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
----------------------------------------------------------------------
diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
index a48eec7..bdd57fd 100644
--- a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
+++ b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
@@ -39,11 +39,11 @@ import org.apache.spark.streaming.receiver.Receiver
 */
 private[streaming]
 class TwitterInputDStream(
-    ssc_ : StreamingContext,
+    _ssc: StreamingContext,
     twitterAuth: Option[Authorization],
     filters: Seq[String],
     storageLevel: StorageLevel
-  ) extends ReceiverInputDStream[Status](ssc_)  {
+  ) extends ReceiverInputDStream[Status](_ssc)  {
 
   private def createOAuthAuthorization(): Authorization = {
     new OAuthAuthorization(new ConfigurationBuilder().build())

http://git-wip-us.apache.org/repos/asf/spark/blob/39ae04e6/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 0d5f938..4206d1f 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -135,6 +135,18 @@ object MimaExcludes {
       ) ++ Seq(
         // SPARK-12510 Refactor ActorReceiver to support Java
         ProblemFilters.exclude[AbstractClassProblem]("org.apache.spark.streaming.receiver.ActorReceiver")
+      ) ++ Seq(
+        // SPARK-12692 Scala style: Fix the style violation (Space before "," or ":")
+        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.flume.sink.SparkSink.org$apache$spark$streaming$flume$sink$Logging$$log_"),
+        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.flume.sink.SparkSink.org$apache$spark$streaming$flume$sink$Logging$$log__="),
+        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.flume.sink.SparkAvroCallbackHandler.org$apache$spark$streaming$flume$sink$Logging$$log_"),
+        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.flume.sink.SparkAvroCallbackHandler.org$apache$spark$streaming$flume$sink$Logging$$log__="),
+        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.flume.sink.Logging.org$apache$spark$streaming$flume$sink$Logging$$log__="),
+        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.flume.sink.Logging.org$apache$spark$streaming$flume$sink$Logging$$log_"),
+        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.flume.sink.Logging.org$apache$spark$streaming$flume$sink$Logging$$_log"),
+        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.flume.sink.Logging.org$apache$spark$streaming$flume$sink$Logging$$_log_="),
+        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.flume.sink.TransactionProcessor.org$apache$spark$streaming$flume$sink$Logging$$log_"),
+        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.flume.sink.TransactionProcessor.org$apache$spark$streaming$flume$sink$Logging$$log__=")
       )
     case v if v.startsWith("1.6") =>
       Seq(
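
These MiMa exclusions are needed because a private var in a Scala trait is
not private at the bytecode level: the compiler synthesizes public accessor
methods, name-mangled with the trait's full path, on every class that mixes
the trait in. Renaming log_ to _log therefore changes JVM method signatures
even though the field never escapes the source. Roughly (mangled names
abbreviated; the exact forms appear in the exclusions above):

    trait Logging {
      private var _log: String = null  // trait-private in source only
    }
    // Compiles to public JVM methods on each implementing class, e.g.
    //   <pkg>$Logging$$_log()             // synthesized getter
    //   <pkg>$Logging$$_log_$eq(String)   // synthesized setter
    // so MiMa reports the old log_ accessors as MissingMethodProblems.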

http://git-wip-us.apache.org/repos/asf/spark/blob/39ae04e6/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 86f01d2..298cdc0 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -183,7 +183,7 @@ class CheckpointWriter(
   val executor = Executors.newFixedThreadPool(1)
   val compressionCodec = CompressionCodec.createCodec(conf)
   private var stopped = false
-  private var fs_ : FileSystem = _
+  private var _fs: FileSystem = _
 
   @volatile private var latestCheckpointTime: Time = null
 
@@ -298,12 +298,12 @@ class CheckpointWriter(
   }
 
   private def fs = synchronized {
-    if (fs_ == null) fs_ = new Path(checkpointDir).getFileSystem(hadoopConf)
-    fs_
+    if (_fs == null) _fs = new Path(checkpointDir).getFileSystem(hadoopConf)
+    _fs
   }
 
   private def reset() = synchronized {
-    fs_ = null
+    _fs = null
   }
 }
 
@@ -370,8 +370,8 @@ object CheckpointReader extends Logging {
 }
 
 private[streaming]
-class ObjectInputStreamWithLoader(inputStream_ : InputStream, loader: ClassLoader)
-  extends ObjectInputStream(inputStream_) {
+class ObjectInputStreamWithLoader(_inputStream: InputStream, loader: ClassLoader)
+  extends ObjectInputStream(_inputStream) {
 
   override def resolveClass(desc: ObjectStreamClass): Class[_] = {
     try {

http://git-wip-us.apache.org/repos/asf/spark/blob/39ae04e6/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index ba509a1..157ee92 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -58,9 +58,9 @@ import org.apache.spark.util.{AsynchronousListenerBus, CallSite, ShutdownHookMan
  * of the context by `stop()` or by an exception.
  */
 class StreamingContext private[streaming] (
-    sc_ : SparkContext,
-    cp_ : Checkpoint,
-    batchDur_ : Duration
+    _sc: SparkContext,
+    _cp: Checkpoint,
+    _batchDur: Duration
   ) extends Logging {
 
   /**
@@ -126,18 +126,18 @@ class StreamingContext private[streaming] (
   }
 
 
-  if (sc_ == null && cp_ == null) {
+  if (_sc == null && _cp == null) {
     throw new Exception("Spark Streaming cannot be initialized with " +
       "both SparkContext and checkpoint as null")
   }
 
-  private[streaming] val isCheckpointPresent = (cp_ != null)
+  private[streaming] val isCheckpointPresent = (_cp != null)
 
   private[streaming] val sc: SparkContext = {
-    if (sc_ != null) {
-      sc_
+    if (_sc != null) {
+      _sc
     } else if (isCheckpointPresent) {
-      SparkContext.getOrCreate(cp_.createSparkConf())
+      SparkContext.getOrCreate(_cp.createSparkConf())
     } else {
       throw new SparkException("Cannot create StreamingContext without a SparkContext")
     }
@@ -154,13 +154,13 @@ class StreamingContext private[streaming] (
 
   private[streaming] val graph: DStreamGraph = {
     if (isCheckpointPresent) {
-      cp_.graph.setContext(this)
-      cp_.graph.restoreCheckpointData()
-      cp_.graph
+      _cp.graph.setContext(this)
+      _cp.graph.restoreCheckpointData()
+      _cp.graph
     } else {
-      require(batchDur_ != null, "Batch duration for StreamingContext cannot be null")
+      require(_batchDur != null, "Batch duration for StreamingContext cannot be null")
       val newGraph = new DStreamGraph()
-      newGraph.setBatchDuration(batchDur_)
+      newGraph.setBatchDuration(_batchDur)
       newGraph
     }
   }
@@ -169,15 +169,15 @@ class StreamingContext private[streaming] (
 
   private[streaming] var checkpointDir: String = {
     if (isCheckpointPresent) {
-      sc.setCheckpointDir(cp_.checkpointDir)
-      cp_.checkpointDir
+      sc.setCheckpointDir(_cp.checkpointDir)
+      _cp.checkpointDir
     } else {
       null
     }
   }
 
   private[streaming] val checkpointDuration: Duration = {
-    if (isCheckpointPresent) cp_.checkpointDuration else graph.batchDuration
+    if (isCheckpointPresent) _cp.checkpointDuration else graph.batchDuration
   }
 
   private[streaming] val scheduler = new JobScheduler(this)
@@ -246,7 +246,7 @@ class StreamingContext private[streaming] (
   }
 
   private[streaming] def initialCheckpoint: Checkpoint = {
-    if (isCheckpointPresent) cp_ else null
+    if (isCheckpointPresent) _cp else null
   }
 
   private[streaming] def getNewInputStreamId() = nextInputStreamId.getAndIncrement()
@@ -460,7 +460,7 @@ class StreamingContext private[streaming] (
   def binaryRecordsStream(
       directory: String,
      recordLength: Int): DStream[Array[Byte]] = withNamedScope("binary records stream") {
-    val conf = sc_.hadoopConfiguration
+    val conf = _sc.hadoopConfiguration
     conf.setInt(FixedLengthBinaryInputFormat.RECORD_LENGTH_PROPERTY, recordLength)
     val br = fileStream[LongWritable, BytesWritable, FixedLengthBinaryInputFormat](
       directory, FileInputDStream.defaultFilter: Path => Boolean, newFilesOnly = true, conf)
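
For reference, the _sc/_cp pair renamed throughout this file backs the
standard checkpoint-recovery entry point: the private constructor takes
either a live SparkContext or a Checkpoint and materializes whichever half
is missing. A hedged usage sketch (the directory path, app name, and batch
interval are placeholders):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val checkpointDir = "/tmp/checkpoint"  // placeholder path

    // Fresh start: exercises the (_sc, null, _batchDur) constructor path.
    def createContext(): StreamingContext = {
      val ssc = new StreamingContext(new SparkConf().setAppName("example"), Seconds(1))
      ssc.checkpoint(checkpointDir)
      ssc
    }

    // Restart: if a checkpoint exists, the (null, _cp, null) path is taken.
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)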

http://git-wip-us.apache.org/repos/asf/spark/blob/39ae04e6/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
index 733147f..a791a47 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
@@ -101,7 +101,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
    * of elements in a window over this DStream. windowDuration and slideDuration are as defined in
    * the window() operation. This is equivalent to window(windowDuration, slideDuration).count()
    */
-  def countByWindow(windowDuration: Duration, slideDuration: Duration) : JavaDStream[jl.Long] = {
+  def countByWindow(windowDuration: Duration, slideDuration: Duration): JavaDStream[jl.Long] = {
     dstream.countByWindow(windowDuration, slideDuration)
   }
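
A brief usage sketch of the method above (the source stream is assumed to
exist elsewhere; durations are illustrative and must be multiples of the
batch interval):

    import org.apache.spark.streaming.Seconds

    // Assuming lines: DStream[String] built on a 10-second batch interval:
    // count of elements seen in the last 30s, recomputed every 10s.
    val counts = lines.countByWindow(Seconds(30), Seconds(10))

Since countByWindow is implemented as an incremental reduce with an inverse
function, the context also needs a checkpoint directory set before this runs.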
 

http://git-wip-us.apache.org/repos/asf/spark/blob/39ae04e6/streaming/src/main/scala/org/apache/spark/streaming/dstream/ConstantInputDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ConstantInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ConstantInputDStream.scala
index 695384d..b5f86fe 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ConstantInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ConstantInputDStream.scala
@@ -25,8 +25,8 @@ import org.apache.spark.streaming.{StreamingContext, Time}
 /**
  * An input stream that always returns the same RDD on each timestep. Useful for testing.
  */
-class ConstantInputDStream[T: ClassTag](ssc_ : StreamingContext, rdd: RDD[T])
-  extends InputDStream[T](ssc_) {
+class ConstantInputDStream[T: ClassTag](_ssc: StreamingContext, rdd: RDD[T])
+  extends InputDStream[T](_ssc) {
 
   require(rdd != null,
     "parameter rdd null is illegal, which will lead to NPE in the following 
transformation")

http://git-wip-us.apache.org/repos/asf/spark/blob/39ae04e6/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
index 3eff174..a9ce113 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
@@ -39,7 +39,7 @@ class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T])
   // in that batch's checkpoint data
   @transient private var timeToOldestCheckpointFileTime = new HashMap[Time, Time]
 
-  @transient private var fileSystem : FileSystem = null
+  @transient private var fileSystem: FileSystem = null
   protected[streaming] def currentCheckpointFiles = data.asInstanceOf[HashMap[Time, String]]
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/39ae04e6/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index cb5b1f2..1c23254 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -73,13 +73,13 @@ import org.apache.spark.util.{SerializableConfiguration, TimeStampedHashMap, Uti
  */
 private[streaming]
 class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
-    ssc_ : StreamingContext,
+    _ssc: StreamingContext,
     directory: String,
     filter: Path => Boolean = FileInputDStream.defaultFilter,
     newFilesOnly: Boolean = true,
     conf: Option[Configuration] = None)
     (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F])
-  extends InputDStream[(K, V)](ssc_) {
+  extends InputDStream[(K, V)](_ssc) {
 
   private val serializableConfOpt = conf.map(new SerializableConfiguration(_))
 
@@ -128,8 +128,8 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
   // Timestamp of the last round of finding files
   @transient private var lastNewFileFindingTime = 0L
 
-  @transient private var path_ : Path = null
-  @transient private var fs_ : FileSystem = null
+  @transient private var _path: Path = null
+  @transient private var _fs: FileSystem = null
 
   override def start() { }
 
@@ -289,17 +289,17 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
   }
 
   private def directoryPath: Path = {
-    if (path_ == null) path_ = new Path(directory)
-    path_
+    if (_path == null) _path = new Path(directory)
+    _path
   }
 
   private def fs: FileSystem = {
-    if (fs_ == null) fs_ = directoryPath.getFileSystem(ssc.sparkContext.hadoopConfiguration)
-    fs_
+    if (_fs == null) _fs = directoryPath.getFileSystem(ssc.sparkContext.hadoopConfiguration)
+    _fs
   }
 
   private def reset()  {
-    fs_ = null
+    _fs = null
   }
 
   @throws(classOf[IOException])

http://git-wip-us.apache.org/repos/asf/spark/blob/39ae04e6/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
index d60f418..76f6230 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
@@ -38,10 +38,10 @@ import org.apache.spark.util.Utils
  * that requires running a receiver on the worker nodes, use
  * [[org.apache.spark.streaming.dstream.ReceiverInputDStream]] as the parent class.
  *
- * @param ssc_ Streaming context that will execute this input stream
+ * @param _ssc Streaming context that will execute this input stream
  */
-abstract class InputDStream[T: ClassTag] (ssc_ : StreamingContext)
-  extends DStream[T](ssc_) {
+abstract class InputDStream[T: ClassTag] (_ssc: StreamingContext)
+  extends DStream[T](_ssc) {
 
   private[streaming] var lastValidTime: Time = null
 

http://git-wip-us.apache.org/repos/asf/spark/blob/39ae04e6/streaming/src/main/scala/org/apache/spark/streaming/dstream/PluggableInputDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PluggableInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PluggableInputDStream.scala
index 2442e4c..e003ddb 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PluggableInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PluggableInputDStream.scala
@@ -24,8 +24,8 @@ import org.apache.spark.streaming.receiver.Receiver
 
 private[streaming]
 class PluggableInputDStream[T: ClassTag](
-  ssc_ : StreamingContext,
-  receiver: Receiver[T]) extends ReceiverInputDStream[T](ssc_) {
+  _ssc: StreamingContext,
+  receiver: Receiver[T]) extends ReceiverInputDStream[T](_ssc) {
 
   def getReceiver(): Receiver[T] = {
     receiver

http://git-wip-us.apache.org/repos/asf/spark/blob/39ae04e6/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala
index ac73dca..409c565 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala
@@ -38,11 +38,11 @@ import org.apache.spark.streaming.receiver.Receiver
  */
 private[streaming]
 class RawInputDStream[T: ClassTag](
-    ssc_ : StreamingContext,
+    _ssc: StreamingContext,
     host: String,
     port: Int,
     storageLevel: StorageLevel
-  ) extends ReceiverInputDStream[T](ssc_ ) with Logging {
+  ) extends ReceiverInputDStream[T](_ssc) with Logging {
 
   def getReceiver(): Receiver[T] = {
     new RawNetworkReceiver(host, port, storageLevel).asInstanceOf[Receiver[T]]

http://git-wip-us.apache.org/repos/asf/spark/blob/39ae04e6/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
index 565b137..49d8f14 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
@@ -35,11 +35,11 @@ import org.apache.spark.streaming.util.WriteAheadLogUtils
  * define [[getReceiver]] function that gets the receiver object of type
  * [[org.apache.spark.streaming.receiver.Receiver]] that will be sent
  * to the workers to receive data.
- * @param ssc_ Streaming context that will execute this input stream
+ * @param _ssc Streaming context that will execute this input stream
  * @tparam T Class type of the object of this stream
  */
-abstract class ReceiverInputDStream[T: ClassTag](ssc_ : StreamingContext)
-  extends InputDStream[T](ssc_) {
+abstract class ReceiverInputDStream[T: ClassTag](_ssc: StreamingContext)
+  extends InputDStream[T](_ssc) {
 
   /**
    * Asynchronously maintains & sends new rate limits to the receiver through the receiver tracker.

http://git-wip-us.apache.org/repos/asf/spark/blob/39ae04e6/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala
index e70fc87..4414774 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala
@@ -31,12 +31,12 @@ import org.apache.spark.util.NextIterator
 
 private[streaming]
 class SocketInputDStream[T: ClassTag](
-    ssc_ : StreamingContext,
+    _ssc: StreamingContext,
     host: String,
     port: Int,
     bytesToObjects: InputStream => Iterator[T],
     storageLevel: StorageLevel
-  ) extends ReceiverInputDStream[T](ssc_) {
+  ) extends ReceiverInputDStream[T](_ssc) {
 
   def getReceiver(): Receiver[T] = {
     new SocketReceiver(host, port, bytesToObjects, storageLevel)

http://git-wip-us.apache.org/repos/asf/spark/blob/39ae04e6/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
index ebbe139..fedffb2 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
@@ -31,7 +31,7 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag](
     updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],
     partitioner: Partitioner,
     preservePartitioning: Boolean,
-    initialRDD : Option[RDD[(K, S)]]
+    initialRDD: Option[RDD[(K, S)]]
   ) extends DStream[(K, S)](parent.ssc) {
 
   super.persist(StorageLevel.MEMORY_ONLY_SER)
@@ -43,7 +43,7 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag](
   override val mustCheckpoint = true
 
   private [this] def computeUsingPreviousRDD (
-    parentRDD : RDD[(K, V)], prevStateRDD : RDD[(K, S)]) = {
+    parentRDD: RDD[(K, V)], prevStateRDD: RDD[(K, S)]) = {
     // Define the function for the mapPartition operation on cogrouped RDD;
     // first map the cogrouped tuple to tuples of required type,
     // and then apply the update function
@@ -98,7 +98,7 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag](
                 // first map the grouped tuple to tuples of required type,
                 // and then apply the update function
                 val updateFuncLocal = updateFunc
-                val finalFunc = (iterator : Iterator[(K, Iterable[V])]) => {
+                val finalFunc = (iterator: Iterator[(K, Iterable[V])]) => {
                   updateFuncLocal (iterator.map (tuple => (tuple._1, tuple._2.toSeq, None)))
                 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/39ae04e6/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala
index 639f425..3376cd5 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala
@@ -108,7 +108,7 @@ abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable
   def onStop()
 
   /** Override this to specify a preferred location (hostname). */
-  def preferredLocation : Option[String] = None
+  def preferredLocation: Option[String] = None
 
   /**
    * Store a single item of received data to Spark's memory.
@@ -257,11 +257,11 @@ abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable
   private var id: Int = -1
 
   /** Handler object that runs the receiver. This is instantiated lazily in the worker. */
-  @transient private var _supervisor : ReceiverSupervisor = null
+  @transient private var _supervisor: ReceiverSupervisor = null
 
   /** Set the ID of the DStream that this receiver is associated with. */
-  private[streaming] def setReceiverId(id_ : Int) {
-    id = id_
+  private[streaming] def setReceiverId(_id: Int) {
+    id = _id
   }
 
   /** Attach Network Receiver executor to this receiver. */

http://git-wip-us.apache.org/repos/asf/spark/blob/39ae04e6/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
index 9d296c6..25e7ae8 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
@@ -186,7 +186,7 @@ class BasicOperationsSuite extends TestSuiteBase {
     val output = Seq(1 to 8, 101 to 108, 201 to 208)
     testOperation(
       input,
-      (s: DStream[Int]) => s.union(s.map(_ + 4)) ,
+      (s: DStream[Int]) => s.union(s.map(_ + 4)),
       output
     )
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/39ae04e6/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 4d04138..4a6b91f 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -44,7 +44,7 @@ import org.apache.spark.util.{Clock, ManualClock, MutableURLClassLoader, ResetSy
  * A input stream that records the times of restore() invoked
  */
 private[streaming]
-class CheckpointInputDStream(ssc_ : StreamingContext) extends InputDStream[Int](ssc_) {
+class CheckpointInputDStream(_ssc: StreamingContext) extends InputDStream[Int](_ssc) {
   protected[streaming] override val checkpointData = new FileInputDStreamCheckpointData
   override def start(): Unit = { }
   override def stop(): Unit = { }

http://git-wip-us.apache.org/repos/asf/spark/blob/39ae04e6/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala b/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala
index 4e56dfb..7bbbdeb 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala
@@ -200,12 +200,12 @@ object MasterFailureTest extends Logging {
    * the last expected output is generated. Finally, return
    */
   private def runStreams[T: ClassTag](
-      ssc_ : StreamingContext,
+      _ssc: StreamingContext,
       lastExpectedOutput: T,
       maxTimeToRun: Long
    ): Seq[T] = {
 
-    var ssc = ssc_
+    var ssc = _ssc
     var totalTimeRan = 0L
     var isLastOutputGenerated = false
     var isTimedOut = false

http://git-wip-us.apache.org/repos/asf/spark/blob/39ae04e6/streaming/src/test/scala/org/apache/spark/streaming/StateMapSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StateMapSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StateMapSuite.scala
index da0430e..7a76caf 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StateMapSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StateMapSuite.scala
@@ -280,7 +280,7 @@ class StateMapSuite extends SparkFunSuite {
     testSerialization(new KryoSerializer(conf), map, msg)
   }
 
-  private def testSerialization[T : ClassTag](
+  private def testSerialization[T: ClassTag](
       serializer: Serializer,
       map: OpenHashMapBasedStateMap[T, T],
       msg: String): OpenHashMapBasedStateMap[T, T] = {

http://git-wip-us.apache.org/repos/asf/spark/blob/39ae04e6/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index 0ae4c45..197b3d1 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -896,7 +896,7 @@ object SlowTestReceiver {
 package object testPackage extends Assertions {
   def test() {
     val conf = new SparkConf().setMaster("local").setAppName("CreationSite test")
-    val ssc = new StreamingContext(conf , Milliseconds(100))
+    val ssc = new StreamingContext(conf, Milliseconds(100))
     try {
       val inputStream = ssc.receiverStream(new TestReceiver)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/39ae04e6/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
index 54eff2b..239b108 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
@@ -58,8 +58,8 @@ private[streaming] class DummyInputDStream(ssc: StreamingContext) extends InputD
  * replayable, reliable message queue like Kafka. It requires a sequence as input, and
  * returns the i_th element at the i_th batch unde manual clock.
  */
-class TestInputStream[T: ClassTag](ssc_ : StreamingContext, input: Seq[Seq[T]], numPartitions: Int)
-  extends InputDStream[T](ssc_) {
+class TestInputStream[T: ClassTag](_ssc: StreamingContext, input: Seq[Seq[T]], numPartitions: Int)
+  extends InputDStream[T](_ssc) {
 
   def start() {}
 

http://git-wip-us.apache.org/repos/asf/spark/blob/39ae04e6/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
index 3bd8d08..b67189f 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
@@ -107,8 +107,8 @@ class ReceiverTrackerSuite extends TestSuiteBase {
 }
 
 /** An input DStream with for testing rate controlling */
-private[streaming] class RateTestInputDStream(@transient ssc_ : StreamingContext)
-  extends ReceiverInputDStream[Int](ssc_) {
+private[streaming] class RateTestInputDStream(@transient _ssc: StreamingContext)
+  extends ReceiverInputDStream[Int](_ssc) {
 
   override def getReceiver(): Receiver[Int] = new RateTestReceiver(id)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/39ae04e6/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
index b5d6a24..734dd93 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
@@ -154,7 +154,7 @@ abstract class CommonWriteAheadLogTests(
     // Recover old files and generate a second set of log files
     val dataToWrite2 = generateRandomData()
     manualClock.advance(100000)
-    writeDataUsingWriteAheadLog(testDir, dataToWrite2, closeFileAfterWrite, allowBatching ,
+    writeDataUsingWriteAheadLog(testDir, dataToWrite2, closeFileAfterWrite, allowBatching,
       manualClock)
     val logFiles2 = getLogFilesInDirectory(testDir)
     assert(logFiles2.size > logFiles1.size)

