Repository: spark
Updated Branches:
  refs/heads/master 67d753516 -> 06694f1c6


[MINOR] Typo fixes

## What changes were proposed in this pull request?

Typo fixes. No functional changes.

## How was this patch tested?

Built the sources and ran with samples.

Author: Jacek Laskowski <ja...@japila.pl>

Closes #11802 from jaceklaskowski/typo-fixes.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/06694f1c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/06694f1c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/06694f1c

Branch: refs/heads/master
Commit: 06694f1c68cb752ea311144f0dbe50e92e1393cf
Parents: 67d7535
Author: Jacek Laskowski <ja...@japila.pl>
Authored: Sat Apr 2 08:12:04 2016 -0700
Committer: Sean Owen <so...@cloudera.com>
Committed: Sat Apr 2 08:12:04 2016 -0700

----------------------------------------------------------------------
 .../streaming/RecoverableNetworkWordCount.scala |  2 +-
 .../scala/org/apache/spark/ml/Pipeline.scala    |  2 +-
 .../spark/ml/regression/LinearRegression.scala  |  2 +-
 .../catalyst/plans/logical/LogicalPlan.scala    |  4 ++--
 .../apache/spark/sql/ExperimentalMethods.scala  |  2 +-
 .../sql/execution/joins/BroadcastHashJoin.scala |  2 +-
 .../scala/org/apache/spark/sql/functions.scala  | 12 +++++------
 .../spark/streaming/StreamingContext.scala      | 13 ++++++------
 .../dstream/ConstantInputDStream.scala          |  2 +-
 .../spark/streaming/dstream/DStream.scala       |  8 +++----
 .../dstream/DStreamCheckpointData.scala         |  6 +++---
 .../spark/streaming/dstream/InputDStream.scala  |  6 +++---
 .../dstream/ReducedWindowedDStream.scala        |  2 +-
 .../spark/streaming/dstream/StateDStream.scala  | 12 +++++------
 .../scheduler/ReceivedBlockTracker.scala        |  4 ++--
 .../scheduler/rate/RateEstimator.scala          | 22 +++++++++++---------
 16 files changed, 52 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/06694f1c/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
----------------------------------------------------------------------
diff --git 
a/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
 
b/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
index 05f8e65..b6b8bc3 100644
--- 
a/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
+++ 
b/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
@@ -141,7 +141,7 @@ object RecoverableNetworkWordCount {
 
   def main(args: Array[String]) {
     if (args.length != 4) {
-      System.err.println("You arguments were " + args.mkString("[", ", ", "]"))
+      System.err.println("Your arguments were " + args.mkString("[", ", ", "]"))
       System.err.println(
         """
           |Usage: RecoverableNetworkWordCount <hostname> <port> <checkpoint-directory>

http://git-wip-us.apache.org/repos/asf/spark/blob/06694f1c/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala 
b/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala
index 3a99979..afefaaa 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala
@@ -147,7 +147,7 @@ class Pipeline @Since("1.4.0") (
             t
           case _ =>
             throw new IllegalArgumentException(
-              s"Do not support stage $stage of type ${stage.getClass}")
+              s"Does not support stage $stage of type ${stage.getClass}")
         }
         if (index < indexOfLastEstimator) {
           curDataset = transformer.transform(curDataset)

http://git-wip-us.apache.org/repos/asf/spark/blob/06694f1c/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
----------------------------------------------------------------------
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala 
b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
index ba5ad4c..2633c06 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
@@ -58,7 +58,7 @@ private[regression] trait LinearRegressionParams extends PredictorParams
  * The specific squared error loss function used is:
  *   L = 1/2n ||A coefficients - y||^2^
  *
- * This support multiple types of regularization:
+ * This supports multiple types of regularization:
  *  - none (a.k.a. ordinary least squares)
  *  - L2 (ridge regression)
  *  - L1 (Lasso)

http://git-wip-us.apache.org/repos/asf/spark/blob/06694f1c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index ecf4285..aceeb8a 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -79,13 +79,13 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
 
   /**
    * Computes [[Statistics]] for this plan. The default implementation assumes the output
-   * cardinality is the product of of all child plan's cardinality, i.e. applies in the case
+   * cardinality is the product of all child plan's cardinality, i.e. applies in the case
    * of cartesian joins.
    *
    * [[LeafNode]]s must override this.
    */
   def statistics: Statistics = {
-    if (children.size == 0) {
+    if (children.isEmpty) {
       throw new UnsupportedOperationException(s"LeafNode $nodeName must implement statistics.")
     }
     Statistics(sizeInBytes = children.map(_.statistics.sizeInBytes).product)

http://git-wip-us.apache.org/repos/asf/spark/blob/06694f1c/sql/core/src/main/scala/org/apache/spark/sql/ExperimentalMethods.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/ExperimentalMethods.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/ExperimentalMethods.scala
index d7cd84f..c5df028 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/ExperimentalMethods.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/ExperimentalMethods.scala
@@ -37,7 +37,7 @@ class ExperimentalMethods private[sql]() {
 
   /**
    * Allows extra strategies to be injected into the query planner at runtime.  Note this API
-   * should be consider experimental and is not intended to be stable across releases.
+   * should be considered experimental and is not intended to be stable across releases.
    *
    * @since 1.3.0
    */

http://git-wip-us.apache.org/repos/asf/spark/blob/06694f1c/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
index f5b083c..0ed1ed4 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
@@ -32,7 +32,7 @@ import org.apache.spark.util.collection.CompactBuffer
 /**
  * Performs an inner hash join of two child relations.  When the output RDD of this operator is
  * being constructed, a Spark job is asynchronously started to calculate the values for the
- * broadcasted relation.  This data is then placed in a Spark broadcast variable.  The streamed
+ * broadcast relation.  This data is then placed in a Spark broadcast variable.  The streamed
  * relation is not shuffled.
  */
 case class BroadcastHashJoin(

http://git-wip-us.apache.org/repos/asf/spark/blob/06694f1c/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index 7490605..baf947d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -2232,7 +2232,7 @@ object functions {
 
   /**
    * Splits str around pattern (pattern is a regular expression).
-   * NOTE: pattern is a string represent the regular expression.
+   * NOTE: pattern is a string representation of the regular expression.
    *
    * @group string_funcs
    * @since 1.5.0
@@ -2267,9 +2267,9 @@ object functions {
 
   /**
    * Translate any character in the src by a character in replaceString.
-   * The characters in replaceString is corresponding to the characters in matchingString.
-   * The translate will happen when any character in the string matching with the character
-   * in the matchingString.
+   * The characters in replaceString correspond to the characters in matchingString.
+   * The translate will happen when any character in the string matches the character
+   * in the `matchingString`.
    *
    * @group string_funcs
    * @since 1.5.0
@@ -2692,7 +2692,7 @@ object functions {
   
//////////////////////////////////////////////////////////////////////////////////////////////
 
   /**
-   * Returns true if the array contain the value
+   * Returns true if the array contains `value`
    * @group collection_funcs
    * @since 1.5.0
    */
@@ -2920,7 +2920,7 @@ object functions {
 
   /**
    * Defines a user-defined function (UDF) using a Scala closure. For this variant, the caller must
-   * specifcy the output data type, and there is no automatic input type coercion.
+   * specify the output data type, and there is no automatic input type coercion.
    *
    * @param f  A closure in Scala
    * @param dataType  The output data type of the UDF

http://git-wip-us.apache.org/repos/asf/spark/blob/06694f1c/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 3a664c4..c1e151d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -132,7 +132,7 @@ class StreamingContext private[streaming] (
       "both SparkContext and checkpoint as null")
   }
 
-  private[streaming] val isCheckpointPresent = (_cp != null)
+  private[streaming] val isCheckpointPresent: Boolean = _cp != null
 
   private[streaming] val sc: SparkContext = {
     if (_sc != null) {
@@ -213,8 +213,8 @@ class StreamingContext private[streaming] (
   def sparkContext: SparkContext = sc
 
   /**
-   * Set each DStreams in this context to remember RDDs it generated in the last given duration.
-   * DStreams remember RDDs only for a limited duration of time and releases them for garbage
+   * Set each DStream in this context to remember RDDs it generated in the last given duration.
+   * DStreams remember RDDs only for a limited duration of time and release them for garbage
    * collection. This method allows the developer to specify how long to remember the RDDs (
    * if the developer wishes to query old data outside the DStream computation).
    * @param duration Minimum duration that each DStream should remember its RDDs
@@ -282,13 +282,14 @@ class StreamingContext private[streaming] (
   }
 
   /**
-   * Create a input stream from TCP source hostname:port. Data is received using
+   * Creates an input stream from TCP source hostname:port. Data is received using
    * a TCP socket and the receive bytes is interpreted as UTF8 encoded `\n` delimited
    * lines.
    * @param hostname      Hostname to connect to for receiving data
    * @param port          Port to connect to for receiving data
    * @param storageLevel  Storage level to use for storing the received objects
    *                      (default: StorageLevel.MEMORY_AND_DISK_SER_2)
+   * @see [[socketStream]]
    */
   def socketTextStream(
       hostname: String,
@@ -299,7 +300,7 @@ class StreamingContext private[streaming] (
   }
 
   /**
-   * Create a input stream from TCP source hostname:port. Data is received using
+   * Creates an input stream from TCP source hostname:port. Data is received using
    * a TCP socket and the receive bytes it interpreted as object using the given
    * converter.
    * @param hostname      Hostname to connect to for receiving data
@@ -860,7 +861,7 @@ private class StreamingContextPythonHelper {
    */
   def tryRecoverFromCheckpoint(checkpointPath: String): Option[StreamingContext] = {
     val checkpointOption = CheckpointReader.read(
-      checkpointPath, new SparkConf(), SparkHadoopUtil.get.conf, false)
+      checkpointPath, new SparkConf(), SparkHadoopUtil.get.conf, ignoreReadError = false)
     checkpointOption.map(new StreamingContext(null, _, null))
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/06694f1c/streaming/src/main/scala/org/apache/spark/streaming/dstream/ConstantInputDStream.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ConstantInputDStream.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ConstantInputDStream.scala
index b5f86fe..995470e 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ConstantInputDStream.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ConstantInputDStream.scala
@@ -23,7 +23,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.streaming.{StreamingContext, Time}
 
 /**
- * An input stream that always returns the same RDD on each timestep. Useful 
for testing.
+ * An input stream that always returns the same RDD on each time step. Useful 
for testing.
  */
 class ConstantInputDStream[T: ClassTag](_ssc: StreamingContext, rdd: RDD[T])
   extends InputDStream[T](_ssc) {

http://git-wip-us.apache.org/repos/asf/spark/blob/06694f1c/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index eb7b64e..c40beef 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -83,7 +83,7 @@ abstract class DStream[T: ClassTag] (
 
   // RDDs generated, marked as private[streaming] so that testsuites can access it
   @transient
-  private[streaming] var generatedRDDs = new HashMap[Time, RDD[T]] ()
+  private[streaming] var generatedRDDs = new HashMap[Time, RDD[T]]()
 
   // Time zero for the DStream
   private[streaming] var zeroTime: Time = null
@@ -269,7 +269,7 @@ abstract class DStream[T: ClassTag] (
       checkpointDuration == null || rememberDuration > checkpointDuration,
       s"The remember duration for ${this.getClass.getSimpleName} has been set to " +
         s" $rememberDuration which is not more than the checkpoint interval" +
-        s" ($checkpointDuration). Please set it to higher than $checkpointDuration."
+        s" ($checkpointDuration). Please set it to a value higher than $checkpointDuration."
     )
 
     dependencies.foreach(_.validateAtStart())
@@ -277,7 +277,7 @@ abstract class DStream[T: ClassTag] (
     logInfo(s"Slide time = $slideDuration")
     logInfo(s"Storage level = ${storageLevel.description}")
     logInfo(s"Checkpoint interval = $checkpointDuration")
-    logInfo(s"Remember duration = $rememberDuration")
+    logInfo(s"Remember interval = $rememberDuration")
     logInfo(s"Initialized and validated $this")
   }
 
@@ -535,7 +535,7 @@ abstract class DStream[T: ClassTag] (
   private def readObject(ois: ObjectInputStream): Unit = Utils.tryOrIOException {
     logDebug(s"${this.getClass().getSimpleName}.readObject used")
     ois.defaultReadObject()
-    generatedRDDs = new HashMap[Time, RDD[T]] ()
+    generatedRDDs = new HashMap[Time, RDD[T]]()
   }
 
   // =======================================================================

http://git-wip-us.apache.org/repos/asf/spark/blob/06694f1c/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
index 365a6bc..431c9db 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
@@ -29,7 +29,7 @@ import org.apache.spark.streaming.Time
 import org.apache.spark.util.Utils
 
 private[streaming]
-class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T])
+class DStreamCheckpointData[T: ClassTag](dstream: DStream[T])
   extends Serializable with Logging {
   protected val data = new HashMap[Time, AnyRef]()
 
@@ -45,7 +45,7 @@ class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T])
   /**
    * Updates the checkpoint data of the DStream. This gets called every time
    * the graph checkpoint is initiated. Default implementation records the
-   * checkpoint files to which the generate RDDs of the DStream has been saved.
+   * checkpoint files at which the generated RDDs of the DStream have been saved.
    */
   def update(time: Time) {
 
@@ -103,7 +103,7 @@ class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T])
 
   /**
    * Restore the checkpoint data. This gets called once when the DStream graph
-   * (along with its DStreams) are being restored from a graph checkpoint file.
+   * (along with its output DStreams) is being restored from a graph checkpoint file.
    * Default implementation restores the RDDs from their checkpoint files.
    */
   def restore() {

http://git-wip-us.apache.org/repos/asf/spark/blob/06694f1c/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
index 0b6b191..dc88349 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
@@ -39,7 +39,7 @@ import org.apache.spark.util.Utils
  *
  * @param _ssc Streaming context that will execute this input stream
  */
-abstract class InputDStream[T: ClassTag] (_ssc: StreamingContext)
+abstract class InputDStream[T: ClassTag](_ssc: StreamingContext)
   extends DStream[T](_ssc) {
 
   private[streaming] var lastValidTime: Time = null
@@ -90,8 +90,8 @@ abstract class InputDStream[T: ClassTag] (_ssc: StreamingContext)
     } else {
       // Time is valid, but check it it is more than lastValidTime
       if (lastValidTime != null && time < lastValidTime) {
-        logWarning("isTimeValid called with " + time + " where as last valid time is " +
-          lastValidTime)
+        logWarning(s"isTimeValid called with $time whereas the last valid time " +
+          s"is $lastValidTime")
       }
       lastValidTime = time
       true

http://git-wip-us.apache.org/repos/asf/spark/blob/06694f1c/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
index a9be2f2..a9e9383 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
@@ -87,7 +87,7 @@ class ReducedWindowedDStream[K: ClassTag, V: ClassTag](
 
     logDebug("Window time = " + windowDuration)
     logDebug("Slide time = " + slideDuration)
-    logDebug("ZeroTime = " + zeroTime)
+    logDebug("Zero time = " + zeroTime)
     logDebug("Current window = " + currentWindow)
     logDebug("Previous window = " + previousWindow)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/06694f1c/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
index 68eff89..0379957 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
@@ -70,7 +70,7 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag](
         // Try to get the parent RDD
         parent.getOrCompute(validTime) match {
           case Some(parentRDD) => {   // If parent RDD exists, then compute as usual
-            computeUsingPreviousRDD (parentRDD, prevStateRDD)
+            computeUsingPreviousRDD(parentRDD, prevStateRDD)
           }
           case None => {    // If parent RDD does not exist
 
@@ -98,15 +98,15 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag](
                 // and then apply the update function
                 val updateFuncLocal = updateFunc
                 val finalFunc = (iterator: Iterator[(K, Iterable[V])]) => {
-                  updateFuncLocal (iterator.map (tuple => (tuple._1, tuple._2.toSeq, None)))
+                  updateFuncLocal(iterator.map(tuple => (tuple._1, tuple._2.toSeq, None)))
                 }
 
-                val groupedRDD = parentRDD.groupByKey (partitioner)
-                val sessionRDD = groupedRDD.mapPartitions (finalFunc, preservePartitioning)
+                val groupedRDD = parentRDD.groupByKey(partitioner)
+                val sessionRDD = groupedRDD.mapPartitions(finalFunc, preservePartitioning)
                 // logDebug("Generating state RDD for time " + validTime + " (first)")
-                Some (sessionRDD)
+                Some(sessionRDD)
               }
-              case Some (initialStateRDD) => {
+              case Some(initialStateRDD) => {
                 computeUsingPreviousRDD(parentRDD, initialStateRDD)
               }
             }

http://git-wip-us.apache.org/repos/asf/spark/blob/06694f1c/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
index 9c8e68b..5d9a8ac 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
@@ -119,7 +119,7 @@ private[streaming] class ReceivedBlockTracker(
         timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
         lastAllocatedBatchTime = batchTime
       } else {
-        logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery")
+        logInfo(s"Possibly processed batch $batchTime needs to be processed again in WAL recovery")
       }
     } else {
       // This situation occurs when:
@@ -129,7 +129,7 @@ private[streaming] class ReceivedBlockTracker(
       // 2. Slow checkpointing makes recovered batch time older than WAL recovered
       // lastAllocatedBatchTime.
       // This situation will only occurs in recovery time.
-      logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery")
+      logInfo(s"Possibly processed batch $batchTime needs to be processed again in WAL recovery")
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/06694f1c/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/RateEstimator.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/RateEstimator.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/RateEstimator.scala
index d7210f6..7b2ef68 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/RateEstimator.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/RateEstimator.scala
@@ -21,18 +21,20 @@ import org.apache.spark.SparkConf
 import org.apache.spark.streaming.Duration
 
 /**
- * A component that estimates the rate at wich an InputDStream should ingest
- * elements, based on updates at every batch completion.
+ * A component that estimates the rate at which an `InputDStream` should ingest
+ * records, based on updates at every batch completion.
+ *
+ * @see [[org.apache.spark.streaming.scheduler.RateController]]
  */
 private[streaming] trait RateEstimator extends Serializable {
 
   /**
-   * Computes the number of elements the stream attached to this `RateEstimator`
+   * Computes the number of records the stream attached to this `RateEstimator`
    * should ingest per second, given an update on the size and completion
    * times of the latest batch.
    *
-   * @param time The timetamp of the current batch interval that just finished
-   * @param elements The number of elements that were processed in this batch
+   * @param time The timestamp of the current batch interval that just finished
+   * @param elements The number of records that were processed in this batch
    * @param processingDelay The time in ms that took for the job to complete
    * @param schedulingDelay The time in ms that the job spent in the scheduling queue
    */
@@ -46,13 +48,13 @@ private[streaming] trait RateEstimator extends Serializable {
 object RateEstimator {
 
   /**
-   * Return a new RateEstimator based on the value of `spark.streaming.RateEstimator`.
+   * Return a new `RateEstimator` based on the value of
+   * `spark.streaming.backpressure.rateEstimator`.
    *
-   * The only known estimator right now is `pid`.
+   * The only known and acceptable estimator right now is `pid`.
    *
    * @return An instance of RateEstimator
-   * @throws IllegalArgumentException if there is a configured RateEstimator that doesn't match any
-   *         known estimators.
+   * @throws IllegalArgumentException if the configured RateEstimator is not `pid`.
    */
   def create(conf: SparkConf, batchInterval: Duration): RateEstimator =
     conf.get("spark.streaming.backpressure.rateEstimator", "pid") match {
@@ -64,6 +66,6 @@ object RateEstimator {
         new PIDRateEstimator(batchInterval.milliseconds, proportional, integral, derived, minRate)
 
       case estimator =>
-        throw new IllegalArgumentException(s"Unkown rate estimator: $estimator")
+        throw new IllegalArgumentException(s"Unknown rate estimator: $estimator")
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to