I haven’t tried with 1.4 but I tried with 1.3 a while ago and I could not
get the serialized behavior by using default scheduler when there is
failure and retry
so I created a customized stream like this.

class EachSeqRDD[T: ClassTag] (
    parent: DStream[T], eachSeqFunc: (RDD[T], Time) => Unit
  ) extends DStream[Unit](parent.ssc) {

  override def slideDuration: Duration = parent.slideDuration

  override def dependencies: List[DStream[_]] = List(parent)

  override def compute(validTime: Time): Option[RDD[Unit]] = None

  override private[streaming] def generateJob(time: Time): Option[Job] = {
    val pendingJobs = ssc.scheduler.getPendingTimes().size
    logInfo("%d job(s) is(are) pending at %s".format(pendingJobs, time))
    // do not generate new RDD if there is pending job
    if (pendingJobs == 0) {
      parent.getOrCompute(time) match {
        case Some(rdd) => {
          val jobFunc = () => {
            ssc.sparkContext.setCallSite(creationSite)
            eachSeqFunc(rdd, time)
          }
          Some(new Job(time, jobFunc))
        }
        case None => None
      }
    }
    else {
      None
    }
  }
}
object DStreamEx {
  implicit class EDStream[T: ClassTag](dStream: DStream[T]) {
    def eachSeqRDD(func: (RDD[T], Time) => Unit) = {
      // because the DStream is reachable from the outer object here,
and because
      // DStreams can't be serialized with closures, we can't proactively check
      // it for serializability and so we pass the optional false to
SparkContext.clean
      new EachSeqRDD(dStream, dStream.context.sparkContext.clean(func,
false)).register()
    }
  }
}

-Binh
​

On Thu, Jun 18, 2015 at 10:49 AM, Michal Čizmazia <mici...@gmail.com> wrote:

> Tathagata, thanks for your response. You are right! Everything seems
> to work as expected.
>
> Please could help me understand why the time for processing of all
> jobs for a batch is always less than 4 seconds?
>
> Please see my playground code below.
>
> The last modified time of the input (lines) RDD dump files seems to
> match the Thread.sleep delays (20s or 5s) in the transform operation
> or the batching interval (10s): 20s, 5s, 10s.
>
> However, neither the batch processing time in the Streaming tab nor
> the last modified time of the output (words) RDD dump files reflect
> the Thread.sleep delays.
>
> 07:20       3240  001_lines_...
>       07:21 117   001_words_...
> 07:41       37224 002_lines_...
>       07:43 252   002_words_...
> 08:00       37728 003_lines_...
>       08:02 504   003_words_...
> 08:20       38952 004_lines_...
>       08:22 756   004_words_...
> 08:40       38664 005_lines_...
>       08:42 999   005_words_...
> 08:45       38160 006_lines_...
>       08:47 1134  006_words_...
> 08:50       9720  007_lines_...
>       08:51 1260  007_words_...
> 08:55       9864  008_lines_...
>       08:56 1260  008_words_...
> 09:00       10656 009_lines_...
>       09:01 1395  009_words_...
> 09:05       11664 010_lines_...
>       09:06 1395  010_words_...
> 09:11       10935 011_lines_...
>       09:11 1521  011_words_...
> 09:16       11745 012_lines_...
>       09:16 1530  012_words_...
> 09:21       12069 013_lines_...
>       09:22 1656  013_words_...
> 09:27       10692 014_lines_...
>       09:27 1665  014_words_...
> 09:32       10449 015_lines_...
>       09:32 1791  015_words_...
> 09:37       11178 016_lines_...
>       09:37 1800  016_words_...
> 09:45       17496 017_lines_...
>       09:45 1926  017_words_...
> 09:55       22032 018_lines_...
>       09:56 2061  018_words_...
> 10:05       21951 019_lines_...
>       10:06 2196  019_words_...
> 10:15       21870 020_lines_...
>       10:16 2322  020_words_...
> 10:25       21303 021_lines_...
>       10:26 2340  021_words_...
>
>
> final SparkConf conf = new
> SparkConf().setMaster("local[4]").setAppName("WordCount");
> try (final JavaStreamingContext context = new
> JavaStreamingContext(conf, Durations.seconds(10))) {
>
>     context.checkpoint("/tmp/checkpoint");
>
>     final JavaDStream<String> lines = context.union(
>         context.receiverStream(new GeneratorReceiver()),
>         ImmutableList.of(
>             context.receiverStream(new GeneratorReceiver()),
>             context.receiverStream(new GeneratorReceiver())));
>
>     lines.print();
>
>     final Accumulator<Integer> lineRddIndex =
> context.sparkContext().accumulator(0);
>     lines.foreachRDD( rdd -> {
>         lineRddIndex.add(1);
>         final String prefix = "/tmp/" + String.format("%03d",
> lineRddIndex.localValue()) + "_lines_";
>         try (final PrintStream out = new PrintStream(prefix +
> UUID.randomUUID())) {
>             rdd.collect().forEach(s -> out.println(s));
>         }
>         return null;
>     });
>
>     final JavaDStream<String> words =
>         lines.flatMap(x -> Arrays.asList(x.split(" ")));
>     final JavaPairDStream<String, Integer> pairs =
>         words.mapToPair(s -> new Tuple2<String, Integer>(s, 1));
>     final JavaPairDStream<String, Integer> wordCounts =
>         pairs.reduceByKey((i1, i2) -> i1 + i2);
>
>     final Accumulator<Integer> sleep =
> context.sparkContext().accumulator(0);
>     final JavaPairDStream<String, Integer> wordCounts2 =
> JavaPairDStream.fromJavaDStream(
>         wordCounts.transform( (rdd) -> {
>             sleep.add(1);
>             Thread.sleep(sleep.localValue() < 6 ? 20000 : 5000);
>             return JavaRDD.fromRDD(JavaPairRDD.toRDD(rdd), rdd.classTag());
>         }));
>
>     final Function2<List<Integer>, Optional<Integer>,
> Optional<Integer>> updateFunction =
>         (values, state) -> {
>             Integer newSum = state.or(0);
>             for (final Integer value : values) {
>                 newSum += value;
>             }
>             return Optional.of(newSum);
>         };
>
>     final List<Tuple2<String, Integer>> tuples =
> ImmutableList.<Tuple2<String, Integer>> of();
>     final JavaPairRDD<String, Integer> initialRDD =
> context.sparkContext().parallelizePairs(tuples);
>
>     final JavaPairDStream<String, Integer> wordCountsState =
>         wordCounts2.updateStateByKey(
>              updateFunction,
>              new
> HashPartitioner(context.sparkContext().defaultParallelism()),
> initialRDD);
>
>     wordCountsState.print();
>
>     final Accumulator<Integer> rddIndex =
> context.sparkContext().accumulator(0);
>     wordCountsState.foreachRDD( rdd -> {
>         rddIndex.add(1);
>         final String prefix = "/tmp/" + String.format("%03d",
> rddIndex.localValue()) + "_words_";
>         try (final PrintStream out = new PrintStream(prefix +
> UUID.randomUUID())) {
>             rdd.collect().forEach(s -> out.println(s));
>         }
>         return null;
>     });
>
>     context.start();
>     context.awaitTermination();
> }
>
>
> On 17 June 2015 at 17:25, Tathagata Das <t...@databricks.com> wrote:
> > The default behavior should be that batch X + 1 starts processing only
> after
> > batch X completes. If you are using Spark 1.4.0, could you show us a
> > screenshot of the streaming tab, especially the list of batches? And
> could
> > you also tell us if you are setting any SparkConf configurations?
> >
> > On Wed, Jun 17, 2015 at 12:22 PM, Michal Čizmazia <mici...@gmail.com>
> wrote:
> >>
> >> Is it possible to achieve serial batching with Spark Streaming?
> >>
> >> Example:
> >>
> >> I configure the Streaming Context for creating a batch every 3 seconds.
> >>
> >> Processing of the batch #2 takes longer than 3 seconds and creates a
> >> backlog of batches:
> >>
> >> batch #1 takes 2s
> >> batch #2 takes 10s
> >> batch #3 takes 2s
> >> batch #4 takes 2s
> >>
> >> Whet testing locally, it seems that processing of multiple batches is
> >> finished at the same time:
> >>
> >> batch #1 finished at 2s
> >> batch #2 finished at 12s
> >> batch #3 finished at 12s (processed in parallel)
> >> batch #4 finished at 15s
> >>
> >> How can I delay processing of the next batch, so that is processed
> >> only after processing of the previous batch has been completed?
> >>
> >> batch #1 finished at 2s
> >> batch #2 finished at 12s
> >> batch #3 finished at 14s (processed serially)
> >> batch #4 finished at 16s
> >>
> >> I want to perform a transformation for every key only once in a given
> >> period of time (e.g. batch duration). I find all unique keys in a
> >> batch and perform the transformation on each key. To ensure that the
> >> transformation is done for every key only once, only one batch can be
> >> processed at a time. At the same time, I want that single batch to be
> >> processed in parallel.
> >>
> >> context = new JavaStreamingContext(conf, Durations.seconds(10));
> >> stream = context.receiverStream(...);
> >> stream
> >>     .reduceByKey(...)
> >>     .transform(...)
> >>     .foreachRDD(output);
> >>
> >> Any ideas or pointers are very welcome.
> >>
> >> Thanks!
> >>
> >> ---------------------------------------------------------------------
> >> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> >> For additional commands, e-mail: user-h...@spark.apache.org
> >>
> >
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>

Reply via email to