Repository: spark
Updated Branches:
  refs/heads/master c035c0f2d -> c25ca7c5a


SPARK-3276 Added a new configuration spark.streaming.minRememberDuration

SPARK-3276 Added a new configuration parameter 
``spark.streaming.minRememberDuration``, with a default value of 1 minute.

So that when a Spark Streaming application is started, an arbitrary number of 
minutes can be taken as threshold for remembering.

Author: emres <emre.sev...@gmail.com>

Closes #5438 from emres/SPARK-3276 and squashes the following commits:

766f938 [emres] SPARK-3276 Switched to using newly added getTimeAsSeconds 
method.
affee1d [emres] SPARK-3276 Changed the property name and variable name for 
minRememberDuration
c9d58ca [emres] SPARK-3276 Minor code re-formatting.
1c53ba9 [emres] SPARK-3276 Started to use ssc.conf rather than 
ssc.sparkContext.getConf,  and also getLong method directly.
bfe0acb [emres] SPARK-3276 Moved the minRememberDurationMin to the class
daccc82 [emres] SPARK-3276 Changed the property name to reflect the unit of 
value and reduced number of fields.
43cc1ce [emres] SPARK-3276 Added a new configuration parameter 
spark.streaming.minRemember duration, with a default value of 1 minute.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c25ca7c5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c25ca7c5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c25ca7c5

Branch: refs/heads/master
Commit: c25ca7c5a1f2a4f88f40b0c5cdbfa927c186cfa8
Parents: c035c0f
Author: emres <emre.sev...@gmail.com>
Authored: Tue Apr 21 16:39:56 2015 -0400
Committer: Sean Owen <so...@cloudera.com>
Committed: Tue Apr 21 16:39:56 2015 -0400

----------------------------------------------------------------------
 .../streaming/dstream/FileInputDStream.scala    | 30 +++++++++++---------
 1 file changed, 17 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c25ca7c5/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index 66d5191..eca69f0 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -26,7 +26,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
 
-import org.apache.spark.SerializableWritable
+import org.apache.spark.{SparkConf, SerializableWritable}
 import org.apache.spark.rdd.{RDD, UnionRDD}
 import org.apache.spark.streaming._
 import org.apache.spark.util.{TimeStampedHashMap, Utils}
@@ -63,7 +63,7 @@ import org.apache.spark.util.{TimeStampedHashMap, Utils}
  *   the streaming app.
  * - If a file is to be visible in the directory listings, it must be visible 
within a certain
  *   duration of the mod time of the file. This duration is the "remember 
window", which is set to
- *   1 minute (see `FileInputDStream.MIN_REMEMBER_DURATION`). Otherwise, the 
file will never be
+ *   1 minute (see `FileInputDStream.minRememberDuration`). Otherwise, the 
file will never be
  *   selected as the mod time will be less than the ignore threshold when it 
becomes visible.
  * - Once a file is visible, the mod time cannot change. If it does due to 
appends, then the
  *   processing semantics are undefined.
@@ -80,6 +80,15 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]](
 
   private val serializableConfOpt = conf.map(new SerializableWritable(_))
 
+  /**
+   * Minimum duration of remembering the information of selected files. 
Defaults to 60 seconds.
+   *
+   * Files with mod times older than this "window" of remembering will be 
ignored. So if new
+   * files are visible within this window, then the file will get selected in 
the next batch.
+   */
+  private val minRememberDurationS =
+    Seconds(ssc.conf.getTimeAsSeconds("spark.streaming.minRememberDuration", 
"60s"))
+
   // This is a def so that it works during checkpoint recovery:
   private def clock = ssc.scheduler.clock
 
@@ -95,7 +104,8 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]](
    * This would allow us to filter away not-too-old files which have already 
been recently
    * selected and processed.
    */
-  private val numBatchesToRemember = 
FileInputDStream.calculateNumBatchesToRemember(slideDuration)
+  private val numBatchesToRemember = FileInputDStream
+    .calculateNumBatchesToRemember(slideDuration, minRememberDurationS)
   private val durationToRemember = slideDuration * numBatchesToRemember
   remember(durationToRemember)
 
@@ -330,20 +340,14 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]](
 private[streaming]
 object FileInputDStream {
 
-  /**
-   * Minimum duration of remembering the information of selected files. Files 
with mod times
-   * older than this "window" of remembering will be ignored. So if new files 
are visible
-   * within this window, then the file will get selected in the next batch.
-   */
-  private val MIN_REMEMBER_DURATION = Minutes(1)
-
   def defaultFilter(path: Path): Boolean = !path.getName().startsWith(".")
 
   /**
    * Calculate the number of last batches to remember, such that all the files 
selected in
-   * at least last MIN_REMEMBER_DURATION duration can be remembered.
+   * at least last minRememberDurationS duration can be remembered.
    */
-  def calculateNumBatchesToRemember(batchDuration: Duration): Int = {
-    math.ceil(MIN_REMEMBER_DURATION.milliseconds.toDouble / 
batchDuration.milliseconds).toInt
+  def calculateNumBatchesToRemember(batchDuration: Duration,
+                                    minRememberDurationS: Duration): Int = {
+    math.ceil(minRememberDurationS.milliseconds.toDouble / 
batchDuration.milliseconds).toInt
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to