Thanks Tathagata! I will use *foreachRDD*/*foreachPartition*() instead of *transform*() then.
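
In other words, the time-consuming per-key work moves out of the transform
function and into the output operation. Roughly (a quick Scala sketch of the
intent; my actual code is Java, wordCounts stands for the reduced-by-key
stream from my test code further down, and doHeavyWorkFor() is just a
placeholder for the real per-key work):

  wordCounts.foreachRDD { (rdd, time) =>
    // runs once per batch, when the job for that batch actually executes
    rdd.foreachPartition { partition =>
      // runs on the executors, so long-running work here does not hold up
      // driver-side job generation for the next batch interval
      partition.foreach { case (key, count) =>
        doHeavyWorkFor(key, count)   // placeholder for the time-consuming operation
      }
    }
  }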
Does the default scheduler start executing *batch X+1* only after *batch X*
has completed, even when tasks for *batch X* need to be *retried due to
failures*? If not, please could you suggest workarounds and point me to the
relevant code? One more thing that was not 100% clear to me from the
documentation: is there exactly *1 RDD* published *per batch interval* in a
DStream?

On 19 June 2015 at 16:58, Tathagata Das <t...@databricks.com> wrote:

> I see what the problem is. You are adding a sleep in the transform
> operation. The transform function is called at the time of preparing the
> Spark jobs for a batch. It should not run any time-consuming operation
> like an RDD action or a sleep. Since this function needs to run every
> batch interval, a blocking long-running operation inside it interferes
> with keeping up with the batch interval.
>
> I will try to make this clearer in the guide. I had not seen anyone do
> something like this before, and therefore it did not occur to me that
> this could happen. As long as you don't do time-consuming blocking
> operations in the transform function, the batches will be generated,
> scheduled and executed in serial order by default.
>
> On Fri, Jun 19, 2015 at 11:33 AM, Michal Čizmazia <mici...@gmail.com> wrote:
>
>> Binh, thank you very much for your comment and code. Please could you
>> outline an example use of your stream? I am a newbie to Spark. Thanks
>> again!
>>
>> On 18 June 2015 at 14:29, Binh Nguyen Van <binhn...@gmail.com> wrote:
>>
>>> I haven’t tried with 1.4, but I tried with 1.3 a while ago and I could
>>> not get the serialized behavior from the default scheduler when there
>>> are failures and retries, so I created a customized stream like this:
>>>
>>> class EachSeqRDD[T: ClassTag] (
>>>     parent: DStream[T], eachSeqFunc: (RDD[T], Time) => Unit
>>>   ) extends DStream[Unit](parent.ssc) {
>>>
>>>   override def slideDuration: Duration = parent.slideDuration
>>>
>>>   override def dependencies: List[DStream[_]] = List(parent)
>>>
>>>   override def compute(validTime: Time): Option[RDD[Unit]] = None
>>>
>>>   override private[streaming] def generateJob(time: Time): Option[Job] = {
>>>     val pendingJobs = ssc.scheduler.getPendingTimes().size
>>>     logInfo("%d job(s) is(are) pending at %s".format(pendingJobs, time))
>>>     // do not generate new RDD if there is pending job
>>>     if (pendingJobs == 0) {
>>>       parent.getOrCompute(time) match {
>>>         case Some(rdd) => {
>>>           val jobFunc = () => {
>>>             ssc.sparkContext.setCallSite(creationSite)
>>>             eachSeqFunc(rdd, time)
>>>           }
>>>           Some(new Job(time, jobFunc))
>>>         }
>>>         case None => None
>>>       }
>>>     } else {
>>>       None
>>>     }
>>>   }
>>> }
>>>
>>> object DStreamEx {
>>>   implicit class EDStream[T: ClassTag](dStream: DStream[T]) {
>>>     def eachSeqRDD(func: (RDD[T], Time) => Unit) = {
>>>       // because the DStream is reachable from the outer object here,
>>>       // and because DStreams can't be serialized with closures, we
>>>       // can't proactively check it for serializability and so we pass
>>>       // the optional false to SparkContext.clean
>>>       new EachSeqRDD(dStream, dStream.context.sparkContext.clean(func, false)).register()
>>>     }
>>>   }
>>> }
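>>>
>>> Usage would be along these lines (just a sketch, not tested; because
>>> EachSeqRDD reaches into private[streaming] members, it needs to be
>>> compiled inside the org.apache.spark.streaming package):
>>>
>>>   import org.apache.spark.SparkConf
>>>   import org.apache.spark.streaming.{Seconds, StreamingContext}
>>>   import DStreamEx._   // brings eachSeqRDD into scope
>>>
>>>   val conf = new SparkConf().setMaster("local[4]").setAppName("EachSeqRDDExample")
>>>   val ssc  = new StreamingContext(conf, Seconds(10))
>>>
>>>   val counts = ssc.socketTextStream("localhost", 9999)
>>>     .flatMap(_.split(" "))
>>>     .map((_, 1))
>>>     .reduceByKey(_ + _)
>>>
>>>   counts.eachSeqRDD { (rdd, time) =>
>>>     // generateJob above creates a job for this batch only when no earlier
>>>     // job is pending, so this block never runs for two batches at once
>>>     println(s"batch $time: ${rdd.count()} distinct words")
>>>   }
>>>
>>>   ssc.start()
>>>   ssc.awaitTermination()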
>>>
>>> -Binh
>>>
>>> On Thu, Jun 18, 2015 at 10:49 AM, Michal Čizmazia <mici...@gmail.com> wrote:
>>>
>>>> Tathagata, thanks for your response. You are right! Everything seems
>>>> to work as expected.
>>>>
>>>> Please could you help me understand why the time to process all jobs
>>>> for a batch is always less than 4 seconds?
>>>>
>>>> Please see my playground code below.
>>>>
>>>> The last-modified times of the input (lines) RDD dump files seem to
>>>> match the Thread.sleep delays (20s or 5s) in the transform operation
>>>> or the batching interval (10s): 20s, 5s, 10s.
>>>>
>>>> However, neither the batch processing time in the Streaming tab nor
>>>> the last-modified times of the output (words) RDD dump files reflect
>>>> the Thread.sleep delays.
>>>>
>>>> 07:20  3240 001_lines_...
>>>> 07:21   117 001_words_...
>>>> 07:41 37224 002_lines_...
>>>> 07:43   252 002_words_...
>>>> 08:00 37728 003_lines_...
>>>> 08:02   504 003_words_...
>>>> 08:20 38952 004_lines_...
>>>> 08:22   756 004_words_...
>>>> 08:40 38664 005_lines_...
>>>> 08:42   999 005_words_...
>>>> 08:45 38160 006_lines_...
>>>> 08:47  1134 006_words_...
>>>> 08:50  9720 007_lines_...
>>>> 08:51  1260 007_words_...
>>>> 08:55  9864 008_lines_...
>>>> 08:56  1260 008_words_...
>>>> 09:00 10656 009_lines_...
>>>> 09:01  1395 009_words_...
>>>> 09:05 11664 010_lines_...
>>>> 09:06  1395 010_words_...
>>>> 09:11 10935 011_lines_...
>>>> 09:11  1521 011_words_...
>>>> 09:16 11745 012_lines_...
>>>> 09:16  1530 012_words_...
>>>> 09:21 12069 013_lines_...
>>>> 09:22  1656 013_words_...
>>>> 09:27 10692 014_lines_...
>>>> 09:27  1665 014_words_...
>>>> 09:32 10449 015_lines_...
>>>> 09:32  1791 015_words_...
>>>> 09:37 11178 016_lines_...
>>>> 09:37  1800 016_words_...
>>>> 09:45 17496 017_lines_...
>>>> 09:45  1926 017_words_...
>>>> 09:55 22032 018_lines_...
>>>> 09:56  2061 018_words_...
>>>> 10:05 21951 019_lines_...
>>>> 10:06  2196 019_words_...
>>>> 10:15 21870 020_lines_...
>>>> 10:16  2322 020_words_...
>>>> 10:25 21303 021_lines_...
>>>> 10:26  2340 021_words_...
>>>>
>>>> final SparkConf conf =
>>>>     new SparkConf().setMaster("local[4]").setAppName("WordCount");
>>>> try (final JavaStreamingContext context =
>>>>         new JavaStreamingContext(conf, Durations.seconds(10))) {
>>>>
>>>>     context.checkpoint("/tmp/checkpoint");
>>>>
>>>>     final JavaDStream<String> lines = context.union(
>>>>         context.receiverStream(new GeneratorReceiver()),
>>>>         ImmutableList.of(
>>>>             context.receiverStream(new GeneratorReceiver()),
>>>>             context.receiverStream(new GeneratorReceiver())));
>>>>
>>>>     lines.print();
>>>>
>>>>     final Accumulator<Integer> lineRddIndex =
>>>>         context.sparkContext().accumulator(0);
>>>>     lines.foreachRDD(rdd -> {
>>>>         lineRddIndex.add(1);
>>>>         final String prefix = "/tmp/"
>>>>             + String.format("%03d", lineRddIndex.localValue()) + "_lines_";
>>>>         try (final PrintStream out = new PrintStream(prefix + UUID.randomUUID())) {
>>>>             rdd.collect().forEach(s -> out.println(s));
>>>>         }
>>>>         return null;
>>>>     });
>>>>
>>>>     final JavaDStream<String> words =
>>>>         lines.flatMap(x -> Arrays.asList(x.split(" ")));
>>>>     final JavaPairDStream<String, Integer> pairs =
>>>>         words.mapToPair(s -> new Tuple2<String, Integer>(s, 1));
>>>>     final JavaPairDStream<String, Integer> wordCounts =
>>>>         pairs.reduceByKey((i1, i2) -> i1 + i2);
>>>>
>>>>     final Accumulator<Integer> sleep = context.sparkContext().accumulator(0);
>>>>     final JavaPairDStream<String, Integer> wordCounts2 =
>>>>         JavaPairDStream.fromJavaDStream(
>>>>             wordCounts.transform((rdd) -> {
>>>>                 sleep.add(1);
>>>>                 Thread.sleep(sleep.localValue() < 6 ? 20000 : 5000);
>>>>                 return JavaRDD.fromRDD(JavaPairRDD.toRDD(rdd), rdd.classTag());
>>>>             }));
>>>>
>>>>     final Function2<List<Integer>, Optional<Integer>, Optional<Integer>> updateFunction =
>>>>         (values, state) -> {
>>>>             Integer newSum = state.or(0);
>>>>             for (final Integer value : values) {
>>>>                 newSum += value;
>>>>             }
>>>>             return Optional.of(newSum);
>>>>         };
>>>>
>>>>     final List<Tuple2<String, Integer>> tuples =
>>>>         ImmutableList.<Tuple2<String, Integer>> of();
>>>>     final JavaPairRDD<String, Integer> initialRDD =
>>>>         context.sparkContext().parallelizePairs(tuples);
>>>>
>>>>     final JavaPairDStream<String, Integer> wordCountsState =
>>>>         wordCounts2.updateStateByKey(
>>>>             updateFunction,
>>>>             new HashPartitioner(context.sparkContext().defaultParallelism()),
>>>>             initialRDD);
>>>>
>>>>     wordCountsState.print();
>>>>
>>>>     final Accumulator<Integer> rddIndex = context.sparkContext().accumulator(0);
>>>>     wordCountsState.foreachRDD(rdd -> {
>>>>         rddIndex.add(1);
>>>>         final String prefix = "/tmp/"
>>>>             + String.format("%03d", rddIndex.localValue()) + "_words_";
>>>>         try (final PrintStream out = new PrintStream(prefix + UUID.randomUUID())) {
>>>>             rdd.collect().forEach(s -> out.println(s));
>>>>         }
>>>>         return null;
>>>>     });
>>>>
>>>>     context.start();
>>>>     context.awaitTermination();
>>>> }
>>>>
>>>> On 17 June 2015 at 17:25, Tathagata Das <t...@databricks.com> wrote:
>>>>
>>>> > The default behavior should be that batch X + 1 starts processing only
>>>> > after batch X completes. If you are using Spark 1.4.0, could you show
>>>> > us a screenshot of the streaming tab, especially the list of batches?
>>>> > And could you also tell us if you are setting any SparkConf
>>>> > configurations?
>>>> >
>>>> > On Wed, Jun 17, 2015 at 12:22 PM, Michal Čizmazia <mici...@gmail.com> wrote:
>>>> >
>>>> >> Is it possible to achieve serial batching with Spark Streaming?
>>>> >>
>>>> >> Example:
>>>> >>
>>>> >> I configure the Streaming Context to create a batch every 3 seconds.
>>>> >>
>>>> >> Processing of batch #2 takes longer than 3 seconds and creates a
>>>> >> backlog of batches:
>>>> >>
>>>> >> batch #1 takes 2s
>>>> >> batch #2 takes 10s
>>>> >> batch #3 takes 2s
>>>> >> batch #4 takes 2s
>>>> >>
>>>> >> When testing locally, it seems that processing of multiple batches
>>>> >> finishes at the same time:
>>>> >>
>>>> >> batch #1 finished at 2s
>>>> >> batch #2 finished at 12s
>>>> >> batch #3 finished at 12s (processed in parallel)
>>>> >> batch #4 finished at 15s
>>>> >>
>>>> >> How can I delay processing of the next batch, so that it is processed
>>>> >> only after processing of the previous batch has completed?
>>>> >>
>>>> >> batch #1 finished at 2s
>>>> >> batch #2 finished at 12s
>>>> >> batch #3 finished at 14s (processed serially)
>>>> >> batch #4 finished at 16s
>>>> >>
>>>> >> I want to perform a transformation for every key only once in a given
>>>> >> period of time (e.g. the batch duration). I find all unique keys in a
>>>> >> batch and perform the transformation on each key. To ensure that the
>>>> >> transformation is done for every key only once, only one batch can be
>>>> >> processed at a time. At the same time, I want that single batch to be
>>>> >> processed in parallel.
>>>> >>
>>>> >> context = new JavaStreamingContext(conf, Durations.seconds(10));
>>>> >> stream = context.receiverStream(...);
>>>> >> stream
>>>> >>     .reduceByKey(...)
>>>> >>     .transform(...)
>>>> >>     .foreachRDD(output);
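>>>> >>
>>>> >> In other words, something along these lines (a rough Scala sketch of
>>>> >> the idea; the socket source, the key extraction and the println are
>>>> >> only placeholders for my real receiver and per-key transformation):
>>>> >>
>>>> >>   import org.apache.spark.SparkConf
>>>> >>   import org.apache.spark.streaming.{Seconds, StreamingContext}
>>>> >>
>>>> >>   val conf = new SparkConf().setMaster("local[4]").setAppName("SerialBatches")
>>>> >>   val ssc  = new StreamingContext(conf, Seconds(3))
>>>> >>
>>>> >>   ssc.socketTextStream("localhost", 9999)        // stand-in for my receiver stream
>>>> >>     .map(line => (line.split(",")(0), line))     // first field as the key (illustrative)
>>>> >>     .reduceByKey((a, b) => a)                    // keep one record per key for this batch
>>>> >>     .foreachRDD { rdd =>
>>>> >>       // if only one batch is processed at a time, this fires at most once
>>>> >>       // per key per batch interval, while the keys within the batch are
>>>> >>       // still handled in parallel across partitions
>>>> >>       rdd.foreach { case (key, record) =>
>>>> >>         println(s"transforming key $key using $record")   // placeholder
>>>> >>       }
>>>> >>     }
>>>> >>
>>>> >>   ssc.start()
>>>> >>   ssc.awaitTermination()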
>>>> >>
>>>> >> Any ideas or pointers are very welcome.
>>>> >>
>>>> >> Thanks!