I haven't tried 1.4, but when I tried 1.3 a while ago I could not get serialized batch behavior out of the default scheduler when there were failures and retries, so I created a customized stream for this, shown below.
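With the DStreamEx implicit (defined further down) in scope, wiring it into a job looks roughly like this. This is only a sketch: the socket source, the word-count pipeline, and the 10-second batch interval are placeholders rather than part of the helper itself.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.DStreamEx._

val conf = new SparkConf().setMaster("local[4]").setAppName("SerialBatches")
val ssc  = new StreamingContext(conf, Seconds(10))

ssc.socketTextStream("localhost", 9999)
  .flatMap(_.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)
  .eachSeqRDD { (rdd, time) =>
    // eachSeqRDD only generates a job for this batch when no earlier job is
    // still pending, so batches run one after another even when some run long
    rdd.foreachPartition(_.foreach(println))
  }

ssc.start()
ssc.awaitTermination()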
The implementation (note that it has to sit in the org.apache.spark.streaming package, because generateJob, Job, creationSite, and SparkContext.clean are private to Spark's packages):

package org.apache.spark.streaming

import scala.reflect.ClassTag

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.scheduler.Job

class EachSeqRDD[T: ClassTag](
    parent: DStream[T],
    eachSeqFunc: (RDD[T], Time) => Unit
  ) extends DStream[Unit](parent.ssc) {

  override def slideDuration: Duration = parent.slideDuration

  override def dependencies: List[DStream[_]] = List(parent)

  override def compute(validTime: Time): Option[RDD[Unit]] = None

  override private[streaming] def generateJob(time: Time): Option[Job] = {
    val pendingJobs = ssc.scheduler.getPendingTimes().size
    logInfo("%d job(s) is(are) pending at %s".format(pendingJobs, time))
    // do not generate a new job while any previous batch is still pending
    if (pendingJobs == 0) {
      parent.getOrCompute(time) match {
        case Some(rdd) =>
          val jobFunc = () => {
            ssc.sparkContext.setCallSite(creationSite)
            eachSeqFunc(rdd, time)
          }
          Some(new Job(time, jobFunc))
        case None => None
      }
    } else {
      None
    }
  }
}

object DStreamEx {

  implicit class EDStream[T: ClassTag](dStream: DStream[T]) {

    def eachSeqRDD(func: (RDD[T], Time) => Unit) = {
      // because the DStream is reachable from the outer object here, and because
      // DStreams can't be serialized with closures, we can't proactively check
      // it for serializability, so we pass false for the optional
      // checkSerializable argument to SparkContext.clean
      new EachSeqRDD(dStream, dStream.context.sparkContext.clean(func, false)).register()
    }
  }
}

-Binh

On Thu, Jun 18, 2015 at 10:49 AM, Michal Čizmazia <mici...@gmail.com> wrote:
> Tathagata, thanks for your response. You are right! Everything seems
> to work as expected.
>
> Could you please help me understand why the processing time of all
> jobs for a batch is always less than 4 seconds?
>
> Please see my playground code below.
>
> The last modified time of the input (lines) RDD dump files seems to
> match the Thread.sleep delays (20s or 5s) in the transform operation
> or the batching interval (10s): 20s, 5s, 10s.
>
> However, neither the batch processing time in the Streaming tab nor
> the last modified time of the output (words) RDD dump files reflects
> the Thread.sleep delays.
>
> 07:20  3240 001_lines_...
> 07:21   117 001_words_...
> 07:41 37224 002_lines_...
> 07:43   252 002_words_...
> 08:00 37728 003_lines_...
> 08:02   504 003_words_...
> 08:20 38952 004_lines_...
> 08:22   756 004_words_...
> 08:40 38664 005_lines_...
> 08:42   999 005_words_...
> 08:45 38160 006_lines_...
> 08:47  1134 006_words_...
> 08:50  9720 007_lines_...
> 08:51  1260 007_words_...
> 08:55  9864 008_lines_...
> 08:56  1260 008_words_...
> 09:00 10656 009_lines_...
> 09:01  1395 009_words_...
> 09:05 11664 010_lines_...
> 09:06  1395 010_words_...
> 09:11 10935 011_lines_...
> 09:11  1521 011_words_...
> 09:16 11745 012_lines_...
> 09:16  1530 012_words_...
> 09:21 12069 013_lines_...
> 09:22  1656 013_words_...
> 09:27 10692 014_lines_...
> 09:27  1665 014_words_...
> 09:32 10449 015_lines_...
> 09:32  1791 015_words_...
> 09:37 11178 016_lines_...
> 09:37  1800 016_words_...
> 09:45 17496 017_lines_...
> 09:45  1926 017_words_...
> 09:55 22032 018_lines_...
> 09:56  2061 018_words_...
> 10:05 21951 019_lines_...
> 10:06  2196 019_words_...
> 10:15 21870 020_lines_...
> 10:16  2322 020_words_...
> 10:25 21303 021_lines_...
> 10:26  2340 021_words_...
>
>
> final SparkConf conf = new SparkConf().setMaster("local[4]").setAppName("WordCount");
> try (final JavaStreamingContext context =
>         new JavaStreamingContext(conf, Durations.seconds(10))) {
>
>     context.checkpoint("/tmp/checkpoint");
>
>     final JavaDStream<String> lines = context.union(
>         context.receiverStream(new GeneratorReceiver()),
>         ImmutableList.of(
>             context.receiverStream(new GeneratorReceiver()),
>             context.receiverStream(new GeneratorReceiver())));
>
>     lines.print();
>
>     final Accumulator<Integer> lineRddIndex = context.sparkContext().accumulator(0);
>     lines.foreachRDD(rdd -> {
>         lineRddIndex.add(1);
>         final String prefix =
>             "/tmp/" + String.format("%03d", lineRddIndex.localValue()) + "_lines_";
>         try (final PrintStream out = new PrintStream(prefix + UUID.randomUUID())) {
>             rdd.collect().forEach(s -> out.println(s));
>         }
>         return null;
>     });
>
>     final JavaDStream<String> words =
>         lines.flatMap(x -> Arrays.asList(x.split(" ")));
>     final JavaPairDStream<String, Integer> pairs =
>         words.mapToPair(s -> new Tuple2<String, Integer>(s, 1));
>     final JavaPairDStream<String, Integer> wordCounts =
>         pairs.reduceByKey((i1, i2) -> i1 + i2);
>
>     final Accumulator<Integer> sleep = context.sparkContext().accumulator(0);
>     final JavaPairDStream<String, Integer> wordCounts2 =
>         JavaPairDStream.fromJavaDStream(
>             wordCounts.transform(rdd -> {
>                 sleep.add(1);
>                 Thread.sleep(sleep.localValue() < 6 ? 20000 : 5000);
>                 return JavaRDD.fromRDD(JavaPairRDD.toRDD(rdd), rdd.classTag());
>             }));
>
>     final Function2<List<Integer>, Optional<Integer>, Optional<Integer>> updateFunction =
>         (values, state) -> {
>             Integer newSum = state.or(0);
>             for (final Integer value : values) {
>                 newSum += value;
>             }
>             return Optional.of(newSum);
>         };
>
>     final List<Tuple2<String, Integer>> tuples =
>         ImmutableList.<Tuple2<String, Integer>>of();
>     final JavaPairRDD<String, Integer> initialRDD =
>         context.sparkContext().parallelizePairs(tuples);
>
>     final JavaPairDStream<String, Integer> wordCountsState =
>         wordCounts2.updateStateByKey(
>             updateFunction,
>             new HashPartitioner(context.sparkContext().defaultParallelism()),
>             initialRDD);
>
>     wordCountsState.print();
>
>     final Accumulator<Integer> rddIndex = context.sparkContext().accumulator(0);
>     wordCountsState.foreachRDD(rdd -> {
>         rddIndex.add(1);
>         final String prefix =
>             "/tmp/" + String.format("%03d", rddIndex.localValue()) + "_words_";
>         try (final PrintStream out = new PrintStream(prefix + UUID.randomUUID())) {
>             rdd.collect().forEach(s -> out.println(s));
>         }
>         return null;
>     });
>
>     context.start();
>     context.awaitTermination();
> }
>
>
> On 17 June 2015 at 17:25, Tathagata Das <t...@databricks.com> wrote:
> > The default behavior should be that batch X + 1 starts processing only
> > after batch X completes. If you are using Spark 1.4.0, could you show us
> > a screenshot of the streaming tab, especially the list of batches? And
> > could you also tell us if you are setting any SparkConf configurations?
> >
> > On Wed, Jun 17, 2015 at 12:22 PM, Michal Čizmazia <mici...@gmail.com> wrote:
> >>
> >> Is it possible to achieve serial batching with Spark Streaming?
> >>
> >> Example:
> >>
> >> I configure the Streaming Context to create a batch every 3 seconds.
> >>
> >> Processing of batch #2 takes longer than 3 seconds and creates a
> >> backlog of batches:
> >>
> >> batch #1 takes 2s
> >> batch #2 takes 10s
> >> batch #3 takes 2s
> >> batch #4 takes 2s
> >>
> >> When testing locally, it seems that processing of multiple batches
> >> finishes at the same time:
> >>
> >> batch #1 finished at 2s
> >> batch #2 finished at 12s
> >> batch #3 finished at 12s (processed in parallel)
> >> batch #4 finished at 15s
> >>
> >> How can I delay processing of the next batch, so that it is processed
> >> only after processing of the previous batch has completed?
> >>
> >> batch #1 finished at 2s
> >> batch #2 finished at 12s
> >> batch #3 finished at 14s (processed serially)
> >> batch #4 finished at 16s
> >>
> >> I want to perform a transformation for every key only once in a given
> >> period of time (e.g. the batch duration). I find all unique keys in a
> >> batch and perform the transformation on each key. To ensure that the
> >> transformation is done for every key only once, only one batch can be
> >> processed at a time. At the same time, I want that single batch to be
> >> processed in parallel.
> >>
> >> context = new JavaStreamingContext(conf, Durations.seconds(10));
> >> stream = context.receiverStream(...);
> >> stream
> >>     .reduceByKey(...)
> >>     .transform(...)
> >>     .foreachRDD(output);
> >>
> >> Any ideas or pointers are very welcome.
> >>
> >> Thanks!