In case anyone else was having similar issues: reordering the operations and
dropping the reduceByKey solved the problems we were having. Thank you
kindly, Mr. Koeninger.

On Thu, Apr 30, 2015 at 3:06 PM, Cody Koeninger <c...@koeninger.org> wrote:

> In fact, you're using the 2-arg form of reduceByKey to shrink it down to
> 1 partition:
>
>   reduceByKey(sumFunc, 1)
>
> But you started with 4 Kafka partitions, so they're definitely no longer
> 1:1.
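>
> To illustrate (a minimal sketch; sc is a SparkContext, sumFunc stands in
> for your actual reduction, and this assumes spark.default.parallelism
> isn't set):
>
>   val sumFunc = (a: Long, b: Long) => a + b
>   val keyed = sc.parallelize(Seq(("a", 1L), ("b", 2L), ("a", 3L)), 4)
>   keyed.reduceByKey(sumFunc).partitions.size    // 4: parent partition count kept
>   keyed.reduceByKey(sumFunc, 1).partitions.size // 1: shrunk by the second arg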
>
>
> On Thu, Apr 30, 2015 at 1:58 PM, Cody Koeninger <c...@koeninger.org>
> wrote:
>
>> This is what I'm suggesting, in pseudocode
>>
>> rdd.mapPartitionsWithIndex { case (i, iter) =>
>>    val offset = offsets(i)                  // OffsetRange for kafka partition i
>>    val result = yourReductionFunction(iter) // plain local scala, no shuffle
>>    transaction {                            // placeholder for your DB transaction
>>       save(result)
>>       save(offset)                          // commits atomically with the results
>>    }
>>    Iterator.empty
>> }.foreach { (_: Nothing) => () }            // just forces evaluation
>>
>> where yourReductionFunction is just normal Scala code.
>>
>> The code you posted looks like you're only saving offsets once per
>> partition, but you're doing it after reduceByKey.  Reduction steps in Spark
>> imply a shuffle.  After a shuffle you no longer have a guaranteed 1:1
>> correspondence between Spark partition and Kafka partition.  If you want to
>> verify that's what the problem is, log the value of currentOffset whenever
>> it changes.
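>>
>> For reference, a minimal sketch of where the offsets have to be captured
>> (assuming stream is the DStream returned by createDirectStream;
>> HasOffsetRanges is in org.apache.spark.streaming.kafka):
>>
>>   stream.foreachRDD { rdd =>
>>     // grab offsets here, before any shuffle, while the RDD's partitions
>>     // are still 1:1 with the kafka partitions
>>     val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>>     offsets.foreach(osr => println(s"batch offsets: $osr"))
>>   }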
>>
>>
>>
>> On Thu, Apr 30, 2015 at 1:38 PM, badgerpants <mark.stew...@tapjoy.com>
>> wrote:
>>
>>> Cody Koeninger-2 wrote
>>> > What's your schema for the offset table, and what's the definition of
>>> > writeOffset ?
>>>
>>> The schema is the same as the one in your post: topic | partition | offset
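>>> Roughly this, as DDL (the exact column types here are approximate):
>>>
>>>   sql"""
>>>     create table txn_offsets (
>>>       topic varchar(255) not null,
>>>       part int not null,
>>>       off bigint not null,
>>>       primary key (topic, part)
>>>     )
>>>   """.execute.apply()
>>>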
>>> The writeOffset is nearly identical:
>>>
>>>   def writeOffset(osr: OffsetRange)(implicit session: DBSession): Unit = {
>>>     logWarning(Thread.currentThread().toString + " writeOffset: " + osr)
>>>     if (osr == null) {
>>>       logWarning("no offset provided")
>>>       return
>>>     }
>>>     // conditional update: succeeds only if the stored offset still equals
>>>     // fromOffset, so a repeated or out-of-order write updates 0 rows
>>>     val updated = sql"""
>>>       update txn_offsets set off = ${osr.untilOffset}
>>>         where topic = ${osr.topic} and part = ${osr.partition}
>>>           and off = ${osr.fromOffset}
>>>     """.update.apply()
>>>     if (updated != 1) {
>>>       throw new Exception(Thread.currentThread().toString +
>>>         s" failed to write offset: " +
>>>         s"${osr.topic},${osr.partition},${osr.fromOffset}-${osr.untilOffset}")
>>>     } else {
>>>       logWarning(Thread.currentThread().toString +
>>>         " offsets updated to " + osr.untilOffset)
>>>     }
>>>   }
>>>
>>>
>>> Cody Koeninger-2 wrote
>>> > What key are you reducing on?  Maybe I'm misreading the code, but it
>>> looks
>>> > like the per-partition offset is part of the key.  If that's true then
>>> you
>>> > could just do your reduction on each partition, rather than after the
>>> fact
>>> > on the whole stream.
>>>
>>> Yes, the key is a pair consisting of a case class called Key and the
>>> partition's OffsetRange. We piggybacked the OffsetRange in this way so it
>>> would be available within the scope of the partition.
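>>>
>>> Roughly like this (the Key fields are simplified here, and this assumes
>>> a stream of strings):
>>>
>>>   case class Key(name: String)
>>>   val keyed = rdd.mapPartitionsWithIndex { case (i, iter) =>
>>>     val osr = offsets(i)
>>>     // the OffsetRange rides along as part of the key
>>>     iter.map(msg => ((Key(msg), osr), 1L))
>>>   }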
>>>
>>> I have tried moving the reduceByKey from the end of the .transform block
>>> to the partition level (at the end of the mapPartitionsWithIndex block).
>>> That's what you're suggesting, yes? It didn't correct the offset update
>>> behavior; the offsets still get out of sync pretty quickly.
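>>>
>>> By "partition level" I mean reducing locally over the iterator instead of
>>> calling reduceByKey, roughly like this (simplified):
>>>
>>>   rdd.mapPartitionsWithIndex { case (i, iter) =>
>>>     val osr = offsets(i)
>>>     // fold the partition's records into a local map; no shuffle involved
>>>     val summed = iter.foldLeft(Map.empty[Key, Long]) { case (acc, (k, v)) =>
>>>       acc + (k -> (acc.getOrElse(k, 0L) + v))
>>>     }
>>>     Iterator((osr, summed))
>>>   }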
>>>
>>> Some details: I'm using the kafka-console-producer.sh tool to drive the
>>> process, calling it three or four times in succession and piping in
>>> 100-1000 messages in each call. Once all the messages have been processed
>>> I wait for the output of the printOffsets method to stop changing and
>>> compare it to the txn_offsets table. When no data is being processed the
>>> printOffsets method yields something like the following:
>>>
>>>   OffsetRange(topic: 'testmulti', partition: 1, range: [23602 -> 23602])
>>>   OffsetRange(topic: 'testmulti', partition: 2, range: [32503 -> 32503])
>>>   OffsetRange(topic: 'testmulti', partition: 0, range: [26100 -> 26100])
>>>   OffsetRange(topic: 'testmulti', partition: 3, range: [20900 -> 20900])
>>>
>>> Thanks,
>>> Mark
>>>
>>>
>>>
>>>
>>
>
