It looked like from your graphs that you had a 10 second batch time, but
that your processing time was consistently 11 seconds.  If that's correct,
then yes your delay is going to keep growing.  You'd need to either
increase your batch time, or get your processing time down (either by
adding more resources or changing your code).

I'd expect adding a repartition / shuffle to increase processing time, not
decrease it.  What are you seeing after adding the partitionBy call?

On Tue, Sep 8, 2015 at 5:33 PM, Понькин Алексей <alexey.pon...@ya.ru> wrote:

> Oh my, I implemented one directStream instead of union of three but it is
> still growing exponential with window method.
>
> --
> Яндекс.Почта — надёжная почта
> http://mail.yandex.ru/neo2/collect/?exp=1&t=1
>
>
> 08.09.2015, 23:53, "Cody Koeninger" <c...@koeninger.org>:
> > Yeah, that's the general idea.
> >
> > When you say hard code topic name, do you mean  Set(topicA, topicB,
> topicB) ?  You should be able to use a variable for that - read it from a
> config file, whatever.
> >
> > If you're talking about the match statement, yeah you'd need to hardcode
> your cases.
> >
> > On Tue, Sep 8, 2015 at 3:49 PM, Понькин Алексей <alexey.pon...@ya.ru>
> wrote:
> >> Ok. I got it!
> >> But it seems that I need to hard code topic name.
> >>
> >> something like that?
> >>
> >> val source = KafkaUtils.createDirectStream[Array[Byte], Array[Byte],
> DefaultDecoder, DefaultDecoder](
> >>   ssc, kafkaParams, Set(topicA, topicB, topicB))
> >>   .transform{ rdd =>
> >>     val offsetRange = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
> >>     rdd.mapPartitionsWithIndex(
> >>       (idx: Int, itr: Iterator[(Array[Byte], Array[Byte])]) =>
> >>         offsetRange(idx).topic match {
> >>           case "topicA" => ...
> >>           case "topicB" => ...
> >>           case _ => ....
> >>         }
> >>      )
> >>     }
> >>
> >> 08.09.2015, 19:21, "Cody Koeninger" <c...@koeninger.org>:
> >>> That doesn't really matter.  With the direct stream you'll get all
> objects for a given topicpartition in the same spark partition.  You know
> what topic it's from via hasOffsetRanges.  Then you can deserialize
> appropriately based on topic.
> >>>
> >>> On Tue, Sep 8, 2015 at 11:16 AM, Понькин Алексей <alexey.pon...@ya.ru>
> wrote:
> >>>> The thing is, that these topics contain absolutely different AVRO
> objects(Array[Byte]) that I need to deserialize to different Java(Scala)
> objects, filter and then map to tuple (String, String). So i have 3 streams
> with different avro object in there. I need to cast them(using some
> business rules) to pairs and unite.
> >>>>
> >>>> --
> >>>> Яндекс.Почта — надёжная почта
> >>>> http://mail.yandex.ru/neo2/collect/?exp=1&t=1
> >>>>
> >>>> 08.09.2015, 19:11, "Cody Koeninger" <c...@koeninger.org>:
> >>>>
> >>>>> I'm not 100% sure what's going on there, but why are you doing a
> union in the first place?
> >>>>>
> >>>>> If you want multiple topics in a stream, just pass them all in the
> set of topics to one call to createDirectStream
> >>>>>
> >>>>> On Tue, Sep 8, 2015 at 10:52 AM, Alexey Ponkin <alexey.pon...@ya.ru>
> wrote:
> >>>>>> Ok.
> >>>>>> Spark 1.4.1 on yarn
> >>>>>>
> >>>>>> Here is my application
> >>>>>> I have 4 different Kafka topics(different object streams)
> >>>>>>
> >>>>>> type Edge = (String,String)
> >>>>>>
> >>>>>> val a = KafkaUtils.createDirectStream[...](sc,"A",params).filter(
> nonEmpty ).map( toEdge )
> >>>>>> val b = KafkaUtils.createDirectStream[...](sc,"B",params).filter(
> nonEmpty ).map( toEdge )
> >>>>>> val c = KafkaUtils.createDirectStream[...](sc,"C",params).filter(
> nonEmpty ).map( toEdge )
> >>>>>>
> >>>>>> val u = a union b union c
> >>>>>>
> >>>>>> val source = u.window(Seconds(600), Seconds(10))
> >>>>>>
> >>>>>> val z = KafkaUtils.createDirectStream[...](sc,"Z",params).filter(
> nonEmpty ).map( toEdge )
> >>>>>>
> >>>>>> val joinResult = source.rightOuterJoin( z )
> >>>>>> joinResult.foreachRDD { rdd=>
> >>>>>>   rdd.foreachPartition { partition =>
> >>>>>>      .... // save to result topic in kafka
> >>>>>>    }
> >>>>>>  }
> >>>>>>
> >>>>>> The 'window' function in the code above is constantly growing,
> >>>>>> no matter how many events appeared in corresponding kafka topics
> >>>>>>
> >>>>>> but if I change one line from
> >>>>>>
> >>>>>> val source = u.window(Seconds(600), Seconds(10))
> >>>>>>
> >>>>>> to
> >>>>>>
> >>>>>> val partitioner = ssc.sparkContext.broadcast(new HashPartitioner(8))
> >>>>>>
> >>>>>> val source = u.transform(_.partitionBy(partitioner.value)
> ).window(Seconds(600), Seconds(10))
> >>>>>>
> >>>>>> Everything works perfect.
> >>>>>>
> >>>>>> Perhaps the problem was in WindowedDStream
> >>>>>>
> >>>>>> I forced to use PartitionerAwareUnionRDD( partitionBy the same
> partitioner ) instead of UnionRDD.
> >>>>>>
> >>>>>> Nonetheless I did not see any hints about such a bahaviour in doc.
> >>>>>> Is it a bug or absolutely normal behaviour?
> >>>>>>
> >>>>>> 08.09.2015, 17:03, "Cody Koeninger" <c...@koeninger.org>:
> >>>>>>
> >>>>>>>  Can you provide more info (what version of spark, code example)?
> >>>>>>>
> >>>>>>>  On Tue, Sep 8, 2015 at 8:18 AM, Alexey Ponkin <
> alexey.pon...@ya.ru> wrote:
> >>>>>>>>  Hi,
> >>>>>>>>
> >>>>>>>>  I have an application with 2 streams, which are joined together.
> >>>>>>>>  Stream1 - is simple DStream(relativly small size batch chunks)
> >>>>>>>>  Stream2 - is a windowed DStream(with duration for example 60
> seconds)
> >>>>>>>>
> >>>>>>>>  Stream1 and Stream2 are Kafka direct stream.
> >>>>>>>>  The problem is that according to logs window operation is
> constantly increasing(<a href="
> http://en.zimagez.com/zimage/screenshot2015-09-0816-06-55.php
> ">screen</a>).
> >>>>>>>>  And also I see gap in pocessing window(<a href="
> http://en.zimagez.com/zimage/screenshot2015-09-0816-10-22.php";>screen</a>)
> in logs there are no events in that period.
> >>>>>>>>  So what is happen in that gap and why window is constantly
> insreasing?
> >>>>>>>>
> >>>>>>>>  Thank you in advance
> >>>>>>>>
> >>>>>>>>
>  ---------------------------------------------------------------------
> >>>>>>>>  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> >>>>>>>>  For additional commands, e-mail: user-h...@spark.apache.org
>

Reply via email to