That's correct, I have a 10-second batch interval.
The actual problem is the processing time: it increases constantly, no
matter how small or large my window duration is.
I am trying to prepare some example code to clarify my use case.
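
The point in the quoted reply below (a 10-second batch interval with about 11 seconds of processing per batch) can be sketched with a little arithmetic. This is only an illustration under simplifying assumptions: `BatchDelay` and `schedulingDelays` are hypothetical names, not Spark API, and the model assumes a fixed processing time per batch, whereas this thread describes a processing time that itself keeps growing.

```scala
// Hypothetical helper, not part of Spark: models how scheduling delay
// accumulates when each batch takes longer to process than the batch interval.
object BatchDelay {

  /** Scheduling delay (in seconds) seen by each of the first `batches` batches.
    *
    * Assumes batch i becomes ready at i * batchSec, batches are processed one
    * at a time in order, and every batch takes exactly processingSec seconds.
    */
  def schedulingDelays(batchSec: Double, processingSec: Double, batches: Int): Seq[Double] = {
    // Each batch adds this much backlog; zero if processing keeps up.
    val backlogPerBatch = math.max(0.0, processingSec - batchSec)
    (0 until batches).map(i => i * backlogPerBatch)
  }
}
```

For example, `BatchDelay.schedulingDelays(10, 11, 5)` gives delays of 0, 1, 2, 3 and 4 seconds: each batch starts one second later than the one before it, and the backlog never shrinks unless the processing time drops below the batch interval.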


09.09.2015, 17:04, "Cody Koeninger" <c...@koeninger.org>:
> It looked like from your graphs that you had a 10 second batch time, but that 
> your processing time was consistently 11 seconds.  If that's correct, then 
> yes your delay is going to keep growing.  You'd need to either increase your 
> batch time, or get your processing time down (either by adding more resources 
> or changing your code).
>
> I'd expect adding a repartition / shuffle to increase processing time, not 
> decrease it.  What are you seeing after adding the partitionBy call?
>
> On Tue, Sep 8, 2015 at 5:33 PM, Понькин Алексей <alexey.pon...@ya.ru> wrote:
>> Oh my, I implemented a single directStream instead of a union of three, but
>> it still grows exponentially with the window method.
>>
>> 08.09.2015, 23:53, "Cody Koeninger" <c...@koeninger.org>:
>>
>>> Yeah, that's the general idea.
>>>
>>> When you say hard code topic name, do you mean Set(topicA, topicB, topicC)?
>>> You should be able to use a variable for that: read it from a config
>>> file, whatever.
>>>
>>> If you're talking about the match statement, yeah you'd need to hardcode 
>>> your cases.
>>>
>>> On Tue, Sep 8, 2015 at 3:49 PM, Понькин Алексей <alexey.pon...@ya.ru> wrote:
>>>> OK, I got it!
>>>> But it seems that I need to hard-code the topic names.
>>>>
>>>> Something like this?
>>>>
>>>> val source = KafkaUtils.createDirectStream[Array[Byte], Array[Byte],
>>>>   DefaultDecoder, DefaultDecoder](
>>>>   ssc, kafkaParams, Set(topicA, topicB, topicC))
>>>>   .transform { rdd =>
>>>>     // Partition i of the direct stream's RDD corresponds to offsetRanges(i).
>>>>     val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>>>>     rdd.mapPartitionsWithIndex(
>>>>       (idx: Int, itr: Iterator[(Array[Byte], Array[Byte])]) =>
>>>>         // Choose the deserialization logic based on the source topic.
>>>>         offsetRanges(idx).topic match {
>>>>           case "topicA" => ...
>>>>           case "topicB" => ...
>>>>           case _ => ...
>>>>         }
>>>>     )
>>>>   }
>>>>
>>>> 08.09.2015, 19:21, "Cody Koeninger" <c...@koeninger.org>:
>>>>> That doesn't really matter.  With the direct stream you'll get all
>>>>> objects for a given topic-partition in the same Spark partition.  You know
>>>>> what topic it's from via HasOffsetRanges.  Then you can deserialize
>>>>> appropriately based on topic.
>>>>>
>>>>> On Tue, Sep 8, 2015 at 11:16 AM, Понькин Алексей <alexey.pon...@ya.ru> 
>>>>> wrote:
>>>>>> The thing is that these topics contain completely different Avro
>>>>>> objects (as Array[Byte]) that I need to deserialize into different
>>>>>> Java (Scala) objects, filter, and then map to (String, String) tuples.
>>>>>> So I have 3 streams, each carrying a different Avro object. I need to
>>>>>> convert them (using some business rules) to pairs and union them.
>>>>>>
>>>>>> 08.09.2015, 19:11, "Cody Koeninger" <c...@koeninger.org>:
>>>>>>
>>>>>>> I'm not 100% sure what's going on there, but why are you doing a union 
>>>>>>> in the first place?
>>>>>>>
>>>>>>> If you want multiple topics in a stream, just pass them all in the set 
>>>>>>> of topics to one call to createDirectStream
>>>>>>>
>>>>>>> On Tue, Sep 8, 2015 at 10:52 AM, Alexey Ponkin <alexey.pon...@ya.ru> 
>>>>>>> wrote:
>>>>>>>> OK.
>>>>>>>> Spark 1.4.1 on YARN.
>>>>>>>>
>>>>>>>> Here is my application.
>>>>>>>> I have 4 different Kafka topics (different object streams).
>>>>>>>>
>>>>>>>> type Edge = (String,String)
>>>>>>>>
>>>>>>>> // One direct stream per topic, each mapped to an Edge
>>>>>>>> val a = KafkaUtils.createDirectStream[...](ssc, params, Set("A"))
>>>>>>>>   .filter(nonEmpty).map(toEdge)
>>>>>>>> val b = KafkaUtils.createDirectStream[...](ssc, params, Set("B"))
>>>>>>>>   .filter(nonEmpty).map(toEdge)
>>>>>>>> val c = KafkaUtils.createDirectStream[...](ssc, params, Set("C"))
>>>>>>>>   .filter(nonEmpty).map(toEdge)
>>>>>>>>
>>>>>>>> val u = a union b union c
>>>>>>>>
>>>>>>>> // 600-second window, sliding every 10 seconds
>>>>>>>> val source = u.window(Seconds(600), Seconds(10))
>>>>>>>>
>>>>>>>> val z = KafkaUtils.createDirectStream[...](ssc, params, Set("Z"))
>>>>>>>>   .filter(nonEmpty).map(toEdge)
>>>>>>>>
>>>>>>>> val joinResult = source.rightOuterJoin(z)
>>>>>>>> joinResult.foreachRDD { rdd =>
>>>>>>>>   rdd.foreachPartition { partition =>
>>>>>>>>     ... // save to result topic in Kafka
>>>>>>>>   }
>>>>>>>> }
>>>>>>>>
>>>>>>>> The processing time of the 'window' operation in the code above grows
>>>>>>>> constantly, no matter how many events appear in the corresponding Kafka
>>>>>>>> topics,
>>>>>>>>
>>>>>>>> but if I change one line from
>>>>>>>>
>>>>>>>> val source = u.window(Seconds(600), Seconds(10))
>>>>>>>>
>>>>>>>> to
>>>>>>>>
>>>>>>>> val partitioner = ssc.sparkContext.broadcast(new HashPartitioner(8))
>>>>>>>>
>>>>>>>> val source = u.transform(_.partitionBy(partitioner.value))
>>>>>>>>   .window(Seconds(600), Seconds(10))
>>>>>>>>
>>>>>>>> Everything works perfectly.
>>>>>>>>
>>>>>>>> Perhaps the problem is in WindowedDStream:
>>>>>>>> by applying partitionBy with the same partitioner, I forced it to use
>>>>>>>> PartitionerAwareUnionRDD instead of UnionRDD.
>>>>>>>>
>>>>>>>> Nonetheless, I did not see any hints about such behaviour in the docs.
>>>>>>>> Is it a bug or absolutely normal behaviour?
>>>>>>>>
>>>>>>>> 08.09.2015, 17:03, "Cody Koeninger" <c...@koeninger.org>:
>>>>>>>>
>>>>>>>>>  Can you provide more info (what version of Spark, code example)?
>>>>>>>>>
>>>>>>>>>  On Tue, Sep 8, 2015 at 8:18 AM, Alexey Ponkin <alexey.pon...@ya.ru> 
>>>>>>>>> wrote:
>>>>>>>>>>  Hi,
>>>>>>>>>>
>>>>>>>>>>  I have an application with 2 streams, which are joined together.
>>>>>>>>>>  Stream1 is a simple DStream (relatively small batch chunks).
>>>>>>>>>>  Stream2 is a windowed DStream (with a duration of, for example, 60
>>>>>>>>>>  seconds).
>>>>>>>>>>
>>>>>>>>>>  Stream1 and Stream2 are both Kafka direct streams.
>>>>>>>>>>  The problem is that, according to the logs, the window operation is
>>>>>>>>>>  taking constantly longer (screenshot:
>>>>>>>>>>  http://en.zimagez.com/zimage/screenshot2015-09-0816-06-55.php).
>>>>>>>>>>  I also see a gap in the processing of the window (screenshot:
>>>>>>>>>>  http://en.zimagez.com/zimage/screenshot2015-09-0816-10-22.php);
>>>>>>>>>>  there are no events in the logs for that period.
>>>>>>>>>>  So what is happening in that gap, and why is the window constantly
>>>>>>>>>>  increasing?
>>>>>>>>>>
>>>>>>>>>>  Thank you in advance
>>>>>>>>>>
>>>>>>>>>>  
>>>>>>>>>> ---------------------------------------------------------------------
>>>>>>>>>>  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>>>>>>>>  For additional commands, e-mail: user-h...@spark.apache.org
