Repository: spark Updated Branches: refs/heads/master c035c0f2d -> c25ca7c5a
SPARK-3276 Added a new configuration spark.streaming.minRememberDuration SPARK-3276 Added a new configuration parameter ``spark.streaming.minRememberDuration``, with a default value of 1 minute. So that when a Spark Streaming application is started, an arbitrary number of minutes can be taken as threshold for remembering. Author: emres <emre.sev...@gmail.com> Closes #5438 from emres/SPARK-3276 and squashes the following commits: 766f938 [emres] SPARK-3276 Switched to using newly added getTimeAsSeconds method. affee1d [emres] SPARK-3276 Changed the property name and variable name for minRememberDuration c9d58ca [emres] SPARK-3276 Minor code re-formatting. 1c53ba9 [emres] SPARK-3276 Started to use ssc.conf rather than ssc.sparkContext.getConf, and also getLong method directly. bfe0acb [emres] SPARK-3276 Moved the minRememberDurationMin to the class daccc82 [emres] SPARK-3276 Changed the property name to reflect the unit of value and reduced number of fields. 43cc1ce [emres] SPARK-3276 Added a new configuration parameter spark.streaming.minRemember duration, with a default value of 1 minute. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c25ca7c5 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c25ca7c5 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c25ca7c5 Branch: refs/heads/master Commit: c25ca7c5a1f2a4f88f40b0c5cdbfa927c186cfa8 Parents: c035c0f Author: emres <emre.sev...@gmail.com> Authored: Tue Apr 21 16:39:56 2015 -0400 Committer: Sean Owen <so...@cloudera.com> Committed: Tue Apr 21 16:39:56 2015 -0400 ---------------------------------------------------------------------- .../streaming/dstream/FileInputDStream.scala | 30 +++++++++++--------- 1 file changed, 17 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/c25ca7c5/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala index 66d5191..eca69f0 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala @@ -26,7 +26,7 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path, PathFilter} import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} -import org.apache.spark.SerializableWritable +import org.apache.spark.{SparkConf, SerializableWritable} import org.apache.spark.rdd.{RDD, UnionRDD} import org.apache.spark.streaming._ import org.apache.spark.util.{TimeStampedHashMap, Utils} @@ -63,7 +63,7 @@ import org.apache.spark.util.{TimeStampedHashMap, Utils} * the streaming app. * - If a file is to be visible in the directory listings, it must be visible within a certain * duration of the mod time of the file. This duration is the "remember window", which is set to - * 1 minute (see `FileInputDStream.MIN_REMEMBER_DURATION`). Otherwise, the file will never be + * 1 minute (see `FileInputDStream.minRememberDuration`). Otherwise, the file will never be * selected as the mod time will be less than the ignore threshold when it becomes visible. * - Once a file is visible, the mod time cannot change. If it does due to appends, then the * processing semantics are undefined. @@ -80,6 +80,15 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]]( private val serializableConfOpt = conf.map(new SerializableWritable(_)) + /** + * Minimum duration of remembering the information of selected files. Defaults to 60 seconds. + * + * Files with mod times older than this "window" of remembering will be ignored. So if new + * files are visible within this window, then the file will get selected in the next batch. + */ + private val minRememberDurationS = + Seconds(ssc.conf.getTimeAsSeconds("spark.streaming.minRememberDuration", "60s")) + // This is a def so that it works during checkpoint recovery: private def clock = ssc.scheduler.clock @@ -95,7 +104,8 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]]( * This would allow us to filter away not-too-old files which have already been recently * selected and processed. */ - private val numBatchesToRemember = FileInputDStream.calculateNumBatchesToRemember(slideDuration) + private val numBatchesToRemember = FileInputDStream + .calculateNumBatchesToRemember(slideDuration, minRememberDurationS) private val durationToRemember = slideDuration * numBatchesToRemember remember(durationToRemember) @@ -330,20 +340,14 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]]( private[streaming] object FileInputDStream { - /** - * Minimum duration of remembering the information of selected files. Files with mod times - * older than this "window" of remembering will be ignored. So if new files are visible - * within this window, then the file will get selected in the next batch. - */ - private val MIN_REMEMBER_DURATION = Minutes(1) - def defaultFilter(path: Path): Boolean = !path.getName().startsWith(".") /** * Calculate the number of last batches to remember, such that all the files selected in - * at least last MIN_REMEMBER_DURATION duration can be remembered. + * at least last minRememberDurationS duration can be remembered. */ - def calculateNumBatchesToRemember(batchDuration: Duration): Int = { - math.ceil(MIN_REMEMBER_DURATION.milliseconds.toDouble / batchDuration.milliseconds).toInt + def calculateNumBatchesToRemember(batchDuration: Duration, + minRememberDurationS: Duration): Int = { + math.ceil(minRememberDurationS.milliseconds.toDouble / batchDuration.milliseconds).toInt } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org