Glad that worked out for you.  I updated the post on my github to hopefully
clarify the issue.

On Tue, May 5, 2015 at 9:36 AM, Mark Stewart <mark.stew...@tapjoy.com>
wrote:

> In case anyone else was having similar issues, the reordering and dropping
> of the reduceByKey solved the issues we were having. Thank you kindly, Mr.
> Koeninger.
>
> On Thu, Apr 30, 2015 at 3:06 PM, Cody Koeninger <c...@koeninger.org>
> wrote:
>
>> In fact, you're using the 2 arg form of reduce by key to shrink it down
>> to 1 partition
>>
>>  reduceByKey(sumFunc, 1)
>>
>> But you started with 4 kafka partitions?  So they're definitely no longer
>> 1:1
>>
>>
>> On Thu, Apr 30, 2015 at 1:58 PM, Cody Koeninger <c...@koeninger.org>
>> wrote:
>>
>>> This is what I'm suggesting, in pseudocode
>>>
>>> rdd.mapPartitionsWithIndex { case (i, iter) =>
>>>    offset = offsets(i)
>>>    result = yourReductionFunction(iter)
>>>    transaction {
>>>       save(result)
>>>       save(offset)
>>>    }
>>> }.foreach { (_: Nothing) => () }
>>>
>>> where yourReductionFunction is just normal scala code.
>>>
>>> The code you posted looks like you're only saving offsets once per
>>> partition, but you're doing it after reduceByKey.  Reduction steps in spark
>>> imply a shuffle.  After a shuffle you no longer have a guaranteed 1:1
>>> correspondence between spark partiion and kafka partition.  If you want to
>>> verify that's what the problem is, log the value of currentOffset whenever
>>> it changes.
>>>
>>>
>>>
>>> On Thu, Apr 30, 2015 at 1:38 PM, badgerpants <mark.stew...@tapjoy.com>
>>> wrote:
>>>
>>>> Cody Koeninger-2 wrote
>>>> > What's your schema for the offset table, and what's the definition of
>>>> > writeOffset ?
>>>>
>>>> The schema is the same as the one in your post: topic | partition|
>>>> offset
>>>> The writeOffset is nearly identical:
>>>>
>>>>   def writeOffset(osr: OffsetRange)(implicit session: DBSession): Unit
>>>> = {
>>>>     logWarning(Thread.currentThread().toString + "writeOffset: " + osr)
>>>>     if(osr==null) {
>>>>       logWarning("no offset provided")
>>>>       return
>>>>     }
>>>>
>>>>     val updated = sql"""
>>>> update txn_offsets set off = ${osr.untilOffset}
>>>>   where topic = ${osr.topic} and part = ${osr.partition} and off =
>>>> ${osr.fromOffset}
>>>> """.update.apply()
>>>>     if (updated != 1) {
>>>>       throw new Exception( Thread.currentThread().toString + s"""failed
>>>> to
>>>> write offset:
>>>> ${osr.topic},${osr.partition},${osr.fromOffset}-${osr.untilOffset}""")
>>>>     } else {
>>>>       logWarning(Thread.currentThread().toString + "offsets updated to
>>>> " +
>>>> osr.untilOffset)
>>>>     }
>>>>
>>>>   }
>>>>
>>>>
>>>> Cody Koeninger-2 wrote
>>>> > What key are you reducing on?  Maybe I'm misreading the code, but it
>>>> looks
>>>> > like the per-partition offset is part of the key.  If that's true
>>>> then you
>>>> > could just do your reduction on each partition, rather than after the
>>>> fact
>>>> > on the whole stream.
>>>>
>>>> Yes, the key is a duple comprised of a case class called Key and the
>>>> partition's OffsetRange. We piggybacked the OffsetRange in this way so
>>>> it
>>>> would be available within the scope of the partition.
>>>>
>>>> I have tried moving the reduceByKey from the end of the .transform block
>>>> into the partition level (at the end of the mapPartitionsWithIndex
>>>> block.)
>>>> This is what you're suggesting, yes? The results didn't correct the
>>>> offset
>>>> update behavior; they still get out of sync pretty quickly.
>>>>
>>>> Some details: I'm using the kafka-console-producer.sh tool to drive the
>>>> process, calling it three or four times in succession and piping in
>>>> 100-1000
>>>> messages in each call. Once all the messages have been processed I wait
>>>> for
>>>> the output of the printOffsets method to stop changing and compare it
>>>> to the
>>>> txn_offsets table. (When no data is getting processed the printOffsets
>>>> method yields something like the following: [ OffsetRange(topic:
>>>> 'testmulti', partition: 1, range: [23602 -> 23602] OffsetRange(topic:
>>>> 'testmulti', partition: 2, range: [32503 -> 32503] OffsetRange(topic:
>>>> 'testmulti', partition: 0, range: [26100 -> 26100] OffsetRange(topic:
>>>> 'testmulti', partition: 3, range: [20900 -> 20900]])
>>>>
>>>> Thanks,
>>>> Mark
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> View this message in context:
>>>> http://apache-spark-developers-list.1001551.n3.nabble.com/practical-usage-of-the-new-exactly-once-supporting-DirectKafkaInputDStream-tp11916p11921.html
>>>> Sent from the Apache Spark Developers List mailing list archive at
>>>> Nabble.com.
>>>>
>>>> ---------------------------------------------------------------------
>>>> To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
>>>> For additional commands, e-mail: dev-h...@spark.apache.org
>>>>
>>>>
>>>
>>
>

Reply via email to