I haven't tried with 1.4, but I tried with 1.3 a while ago and I could not
get serialized behavior from the default scheduler when there is a
failure and retry,
so I created a customized stream like this:
class EachSeqRDD[T: ClassTag](
    parent: DStream[T], eachSeqFunc: (RDD[T], Time) => Unit
  ) extends DStream[Unit](parent.ssc) {
  override def slideDuration: Duration = parent.slideDuration
  override def dependencies: List[DStream[_]] = List(parent)
  override def compute(validTime: Time): Option[RDD[Unit]] = None
  override private[streaming] def generateJob(time: Time): Option[Job] = {
    val pendingJobs = ssc.scheduler.getPendingTimes().size
    logInfo("%d job(s) is(are) pending at %s".format(pendingJobs, time))
    // do not generate new RDD if there is a pending job
    if (pendingJobs == 0) {
      parent.getOrCompute(time) match {
        case Some(rdd) => {
          val jobFunc = () => {
            ssc.sparkContext.setCallSite(creationSite)
            eachSeqFunc(rdd, time)
          }
          Some(new Job(time, jobFunc))
        }
        case None => None
      }
    } else {
      None
    }
  }
}
object DStreamEx {
  implicit class EDStream[T: ClassTag](dStream: DStream[T]) {
    def eachSeqRDD(func: (RDD[T], Time) => Unit) = {
      // because the DStream is reachable from the outer object here, and because
      // DStreams can't be serialized with closures, we can't proactively check
      // it for serializability and so we pass the optional false to SparkContext.clean
      new EachSeqRDD(dStream, dStream.context.sparkContext.clean(func, false)).register()
    }
  }
}
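With the implicit in scope, usage looks like this (a minimal sketch;
`stream` stands for whatever DStream you already have, and the function
body is just a placeholder):

import DStreamEx._

// stream: any DStream[T] you already have
stream.eachSeqRDD { (rdd, time) =>
  // the function sees each batch's RDD strictly in order, because a
  // new job is generated only when no earlier job is still pending
  println("batch at %s has %d records".format(time, rdd.count()))
}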
-Binh
On Thu, Jun 18, 2015 at 10:49 AM, Michal Čizmazia <mici...@gmail.com> wrote:
Tathagata, thanks for your response. You are right! Everything seems
to work as expected.
Could you please help me understand why the processing time for all
the jobs of a batch is always less than 4 seconds?
Please see my playground code below.
The last-modified times of the input (lines) RDD dump files seem to
match the Thread.sleep delays (20s or 5s) in the transform operation
or the batching interval (10s): 20s, 5s, 10s.
However, neither the batch processing time in the Streaming tab nor
the last-modified times of the output (words) RDD dump files reflect
the Thread.sleep delays.
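For reference, the delay is injected roughly like this (a simplified
Scala sketch; my actual code is the Java program below, and the sleep
value was varied between runs):

lines.transform { rdd =>
  Thread.sleep(20000) // 20s in the earlier runs, later changed to 5s
  rdd
}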
mtime size  file
07:20 3240  001_lines_...
07:21 117 001_words_...
07:41 37224 002_lines_...
07:43 252 002_words_...
08:00 37728 003_lines_...
08:02 504 003_words_...
08:20 38952 004_lines_...
08:22 756 004_words_...
08:40 38664 005_lines_...
08:42 999 005_words_...
08:45 38160 006_lines_...
08:47 1134 006_words_...
08:50 9720 007_lines_...
08:51 1260 007_words_...
08:55 9864 008_lines_...
08:56 1260 008_words_...
09:00 10656 009_lines_...
09:01 1395 009_words_...
09:05 11664 010_lines_...
09:06 1395 010_words_...
09:11 10935 011_lines_...
09:11 1521 011_words_...
09:16 11745 012_lines_...
09:16 1530 012_words_...
09:21 12069 013_lines_...
09:22 1656 013_words_...
09:27 10692 014_lines_...
09:27 1665 014_words_...
09:32 10449 015_lines_...
09:32 1791 015_words_...
09:37 11178 016_lines_...
09:37 1800 016_words_...
09:45 17496 017_lines_...
09:45 1926 017_words_...
09:55 22032 018_lines_...
09:56 2061 018_words_...
10:05 21951 019_lines_...
10:06 2196 019_words_...
10:15 21870 020_lines_...
10:16 2322 020_words_...
10:25 21303 021_lines_...
10:26 2340 021_words_...
final SparkConf conf = new SparkConf().setMaster("local[4]").setAppName("WordCount");
try (final JavaStreamingContext context = new JavaStreamingContext(conf, Durations.seconds(10))) {
    context.checkpoint("/tmp/checkpoint");
    final JavaDStream<String> lines = context.union(
        context.receiverStream(new GeneratorReceiver()),
        ImmutableList.of(
            context.receiverStream(new GeneratorReceiver()),
            context.receiverStream(new GeneratorReceiver())));
    lines.print();
    final Accumulator<Integer> lineRddIndex = context.sparkContext().accumulator(0);
    lines.foreachRDD(rdd -> {
        lineRddIndex.add(1);
        final String prefix = "/tmp/" + String.format("%03d", lineRddIndex.localValue()) + "_lines_";
        try (final PrintStream out = new PrintStream(prefix + UUID.randomUUID())) {
            rdd.collect().forEach(s -> out.println(s));
        }
        return null;
    });
    final JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(x.split(" ")));
    final JavaPairDStream<String, Integer> pairs = words.mapToPair(s -> new Tuple2<String, Integer>(s, 1));
    final JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey((i1, i2) -> i1 + i2);
    final Accumulator<Integer> sleep =