Re: Serial batching with Spark Streaming

2015-06-18 Thread Binh Nguyen Van
I haven't tried with 1.4, but I tried with 1.3 a while ago and I could not
get the serialized behavior from the default scheduler when there is a
failure and retry, so I created a customized stream like this.

import scala.reflect.ClassTag

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Duration, Time}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.scheduler.Job

// Note: this class has to live under the org.apache.spark.streaming package,
// because Job, generateJob, and creationSite are private[streaming].
class EachSeqRDD[T: ClassTag](
    parent: DStream[T], eachSeqFunc: (RDD[T], Time) => Unit
  ) extends DStream[Unit](parent.ssc) {

  override def slideDuration: Duration = parent.slideDuration

  override def dependencies: List[DStream[_]] = List(parent)

  override def compute(validTime: Time): Option[RDD[Unit]] = None

  override private[streaming] def generateJob(time: Time): Option[Job] = {
    val pendingJobs = ssc.scheduler.getPendingTimes().size
    logInfo("%d job(s) is(are) pending at %s".format(pendingJobs, time))
    // do not generate a new job if there are still pending jobs
    if (pendingJobs == 0) {
      parent.getOrCompute(time) match {
        case Some(rdd) =>
          val jobFunc = () => {
            ssc.sparkContext.setCallSite(creationSite)
            eachSeqFunc(rdd, time)
          }
          Some(new Job(time, jobFunc))
        case None => None
      }
    } else {
      None
    }
  }
}

object DStreamEx {
  implicit class EDStream[T: ClassTag](dStream: DStream[T]) {
    def eachSeqRDD(func: (RDD[T], Time) => Unit) = {
      // Because the DStream is reachable from the outer object here, and because
      // DStreams can't be serialized with closures, we can't proactively check
      // it for serializability and so we pass the optional false to
      // SparkContext.clean.
      new EachSeqRDD(dStream, dStream.context.sparkContext.clean(func, false)).register()
    }
  }
}
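
For reference, a minimal usage sketch of the extension above (the import, the
socket source, and the body of the function are illustrative, not from the
original message; `ssc` is an existing StreamingContext):

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.Time
import DStreamEx._  // brings eachSeqRDD into scope via the implicit class

val lines = ssc.socketTextStream("localhost", 9999)

lines.eachSeqRDD { (rdd: RDD[String], time: Time) =>
  // Runs only when no earlier batch is still pending, so batches are
  // handled one at a time even after a failure and retry.
  println("Batch " + time + " has " + rdd.count() + " lines")
}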

-Binh

On Thu, Jun 18, 2015 at 10:49 AM, Michal Čizmazia mici...@gmail.com wrote:

 Tathagata, thanks for your response. You are right! Everything seems
 to work as expected.

 Could you please help me understand why the time for processing all
 jobs of a batch is always less than 4 seconds?

 Please see my playground code below.

 The last modified times of the input (lines) RDD dump files seem to
 match the Thread.sleep delays (20s or 5s) in the transform operation
 or the batching interval (10s): 20s, 5s, 10s.

 However, neither the batch processing time in the Streaming tab nor
 the last modified times of the output (words) RDD dump files reflect
 the Thread.sleep delays.

 07:20   3240  001_lines_...
   07:21 117   001_words_...
 07:41   37224 002_lines_...
   07:43 252   002_words_...
 08:00   37728 003_lines_...
   08:02 504   003_words_...
 08:20   38952 004_lines_...
   08:22 756   004_words_...
 08:40   38664 005_lines_...
   08:42 999   005_words_...
 08:45   38160 006_lines_...
   08:47 1134  006_words_...
 08:50   9720  007_lines_...
   08:51 1260  007_words_...
 08:55   9864  008_lines_...
   08:56 1260  008_words_...
 09:00   10656 009_lines_...
   09:01 1395  009_words_...
 09:05   11664 010_lines_...
   09:06 1395  010_words_...
 09:11   10935 011_lines_...
   09:11 1521  011_words_...
 09:16   11745 012_lines_...
   09:16 1530  012_words_...
 09:21   12069 013_lines_...
   09:22 1656  013_words_...
 09:27   10692 014_lines_...
   09:27 1665  014_words_...
 09:32   10449 015_lines_...
   09:32 1791  015_words_...
 09:37   11178 016_lines_...
   09:37 1800  016_words_...
 09:45   17496 017_lines_...
   09:45 1926  017_words_...
 09:55   22032 018_lines_...
   09:56 2061  018_words_...
 10:05   21951 019_lines_...
   10:06 2196  019_words_...
 10:15   21870 020_lines_...
   10:16 2322  020_words_...
 10:25   21303 021_lines_...
   10:26 2340  021_words_...


 final SparkConf conf = new
     SparkConf().setMaster("local[4]").setAppName("WordCount");
 try (final JavaStreamingContext context = new
     JavaStreamingContext(conf, Durations.seconds(10))) {

     context.checkpoint("/tmp/checkpoint");

     final JavaDStream<String> lines = context.union(
         context.receiverStream(new GeneratorReceiver()),
         ImmutableList.of(
             context.receiverStream(new GeneratorReceiver()),
             context.receiverStream(new GeneratorReceiver())));

     lines.print();

     final Accumulator<Integer> lineRddIndex =
         context.sparkContext().accumulator(0);
     lines.foreachRDD(rdd -> {
         lineRddIndex.add(1);
         final String prefix = "/tmp/" + String.format("%03d",
             lineRddIndex.localValue()) + "_lines_";
         try (final PrintStream out = new PrintStream(prefix +
             UUID.randomUUID())) {
             rdd.collect().forEach(s -> out.println(s));
         }
         return null;
     });

     final JavaDStream<String> words =
         lines.flatMap(x -> Arrays.asList(x.split(" ")));
     final JavaPairDStream<String, Integer> pairs =
         words.mapToPair(s -> new Tuple2<String, Integer>(s, 1));
     final JavaPairDStream<String, Integer> wordCounts =
         pairs.reduceByKey((i1, i2) -> i1 + i2);

     final Accumulator<Integer> sleep =

Re: Idempotent count

2015-03-18 Thread Binh Nguyen Van
Hi Arush,

Thank you for answering!
When you say checkpoints hold metadata and data, what is the data? Is it
the data that is pulled from the input source, or is it the state?
If it is the state, then does it contain everything I have aggregated
since the beginning or only a subset of it? How can I limit the size of
the state that is kept in the checkpoint?

Thank you
-Binh
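
For reference, one common way to keep the checkpointed state from growing
without bound is to drop idle keys by returning None from the
updateStateByKey update function; a minimal sketch, assuming word-count-style
(word, count) state (the socket source, the 10-batch expiry rule, and the
paths are illustrative):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object BoundedStateCount {
  // State per word: (running count, batches since the word was last seen).
  // Returning None drops the key, so it no longer has to be checkpointed.
  // The 10-idle-batch expiry rule is purely illustrative.
  def update(newValues: Seq[Int], state: Option[(Long, Int)]): Option[(Long, Int)] = {
    val (count, idle) = state.getOrElse((0L, 0))
    if (newValues.isEmpty) {
      if (idle >= 10) None else Some((count, idle + 1))
    } else {
      Some((count + newValues.sum, 0))
    }
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("BoundedStateCount")
    val ssc = new StreamingContext(conf, Seconds(10))
    ssc.checkpoint("/tmp/checkpoint")   // required by updateStateByKey
    val words = ssc.socketTextStream("localhost", 9999).flatMap(_.split(" "))
    val counts = words.map((_, 1)).updateStateByKey[(Long, Int)](update)
    counts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}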

On Tue, Mar 17, 2015 at 11:47 PM, Arush Kharbanda 
ar...@sigmoidanalytics.com wrote:

 Hi

 Yes, Spark Streaming is capable of stateful stream processing. With or
 without state is a way of classifying stream processing.
 Checkpoints hold metadata and data.

 Thanks
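
 For context, the metadata part of a checkpoint is what lets a restarted
 driver rebuild the streaming job; a minimal sketch of the usual getOrCreate
 recovery pattern (the path, batch interval, and app name are illustrative):

 import org.apache.spark.SparkConf
 import org.apache.spark.streaming.{Seconds, StreamingContext}

 object Recoverable {
   val checkpointDir = "/tmp/checkpoint"   // illustrative path

   def createContext(): StreamingContext = {
     val conf = new SparkConf().setAppName("Recoverable")
     val ssc = new StreamingContext(conf, Seconds(10))
     ssc.checkpoint(checkpointDir)          // metadata (and checkpointed data) go here
     // define the DStream graph here, before the context is started
     ssc
   }

   def main(args: Array[String]): Unit = {
     // On a clean start this calls createContext(); after a driver restart it
     // rebuilds the context and its DStream graph from the checkpointed metadata.
     val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
     ssc.start()
     ssc.awaitTermination()
   }
 }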


 On Wed, Mar 18, 2015 at 4:00 AM, Binh Nguyen Van binhn...@gmail.com
 wrote:

 Hi all,

 I am new to Spark so please forgive me if my questions are stupid.
 I am trying to use Spark Streaming in an application that reads data
 from a queue (Kafka), does some aggregation (sum, count, ...), and
 then persists the result to an external storage system (MySQL, VoltDB, ...).

 From my understanding of Spark Streaming, I can do aggregation
 in two ways:

- Stateless: I don't have to keep state and just apply new delta
values to the external system. From my understanding, doing it this way I
may end up over-counting when there is a failure and replay.
- Stateful: Use a checkpoint to keep state and blindly save the new state
to the external system. Doing it this way I get a correct aggregation
result, but I have to keep data in two places (the state and the
external system).

 My questions are:

- Is my understanding of stateless and stateful aggregation correct?
If not, please correct me!
- For stateful aggregation, what does Spark Streaming keep when
it saves a checkpoint?

 Please kindly help!

 Thanks
 -Binh




 --

 *Arush Kharbanda* || Technical Teamlead

 ar...@sigmoidanalytics.com || www.sigmoidanalytics.com



Idempotent count

2015-03-17 Thread Binh Nguyen Van
Hi all,

I am new to Spark so please forgive me if my questions are stupid.
I am trying to use Spark Streaming in an application that reads data
from a queue (Kafka), does some aggregation (sum, count, ...), and
then persists the result to an external storage system (MySQL, VoltDB, ...).

From my understanding of Spark Streaming, I can do aggregation
in two ways:

   - Stateless: I don't have to keep state and just apply new delta values
   to the external system. From my understanding, doing it this way I may end
   up over-counting when there is a failure and replay (a rough sketch of one
   idempotent-write workaround follows this list).
   - Stateful: Use a checkpoint to keep state and blindly save the new state to
   the external system. Doing it this way I get a correct aggregation result,
   but I have to keep data in two places (the state and the external system).
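
One way to avoid the over-counting mentioned in the stateless case is to make
the external write idempotent, for example by keying each write on the batch
time so that a replayed batch overwrites the same rows instead of adding to
them. A rough sketch (the upsertBatch helper and the DStream of per-batch
deltas are hypothetical):

import org.apache.spark.streaming.Time
import org.apache.spark.streaming.dstream.DStream

// `counts` is assumed to be a DStream[(String, Long)] of per-batch deltas.
// `upsertBatch` is a hypothetical helper that writes rows keyed by
// (batch time, word), e.g. with INSERT ... ON DUPLICATE KEY UPDATE in MySQL,
// so a replayed batch rewrites the same rows instead of adding to them.
def writeIdempotently(counts: DStream[(String, Long)],
                      upsertBatch: (Time, Iterator[(String, Long)]) => Unit): Unit = {
  counts.foreachRDD { (rdd, time) =>
    rdd.foreachPartition { partition =>
      upsertBatch(time, partition)
    }
  }
}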

My questions are:

   - Is my understanding of stateless and stateful aggregation correct? If
   not, please correct me!
   - For stateful aggregation, what does Spark Streaming keep when it
   saves a checkpoint?

Please kindly help!

Thanks
-Binh