In fact, you're using the two-argument form of reduceByKey, which shrinks the
result down to 1 partition:

 reduceByKey(sumFunc, 1)

But you started with 4 Kafka partitions, so they're definitely no longer
1:1.
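
To make the point concrete, here is a minimal plain-Scala sketch (no Spark
dependency) that only mimics hash partitioning. The names Record, shuffle and
sources are hypothetical and exist purely for illustration; this is a model of
the behavior, not Spark's actual shuffle code.

```scala
// Plain-Scala model of why a shuffle breaks the 1:1 mapping between
// Spark partitions and Kafka partitions. All names are hypothetical.
object ShuffleBreaksMapping {
  // A record tagged with the Kafka partition it was read from.
  final case class Record(kafkaPartition: Int, key: String, value: Long)

  // Before any shuffle: Spark partition i holds exactly Kafka partition i.
  val input: Vector[Vector[Record]] = Vector.tabulate(4) { p =>
    Vector(Record(p, "a", 1L), Record(p, "b", 2L))
  }

  // Mimics hash partitioning: non-negative key hash modulo numPartitions.
  def shuffle(parts: Vector[Vector[Record]], numPartitions: Int): Vector[Vector[Record]] = {
    val grouped = parts.flatten.groupBy { r =>
      val m = r.key.hashCode % numPartitions
      if (m < 0) m + numPartitions else m
    }
    Vector.tabulate(numPartitions)(i => grouped.getOrElse(i, Vector.empty[Record]))
  }

  // Which Kafka partitions contributed records to each output partition.
  def sources(parts: Vector[Vector[Record]]): Vector[Set[Int]] =
    parts.map(_.map(_.kafkaPartition).toSet)

  def main(args: Array[String]): Unit = {
    println(sources(input))             // one Kafka partition per Spark partition
    println(sources(shuffle(input, 1))) // a single partition fed by all four
  }
}
```

In this model, the single output partition contains records from all four
Kafka partitions, so there is no longer one offset range that describes it.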


On Thu, Apr 30, 2015 at 1:58 PM, Cody Koeninger <c...@koeninger.org> wrote:

> This is what I'm suggesting, in pseudocode
>
> rdd.mapPartitionsWithIndex { case (i, iter) =>
>    val offset = offsets(i)
>    val result = yourReductionFunction(iter)
>    transaction {
>       save(result)
>       save(offset)
>    }
>    Iterator.empty
> }.foreach { (_: Nothing) => () }
>
> where yourReductionFunction is just normal scala code.
>
> The code you posted looks like you're only saving offsets once per
> partition, but you're doing it after reduceByKey.  Reduction steps in Spark
> imply a shuffle.  After a shuffle you no longer have a guaranteed 1:1
> correspondence between Spark partition and Kafka partition.  If you want to
> verify that's what the problem is, log the value of currentOffset whenever
> it changes.
>
>
>
> On Thu, Apr 30, 2015 at 1:38 PM, badgerpants <mark.stew...@tapjoy.com>
> wrote:
>
>> Cody Koeninger-2 wrote
>> > What's your schema for the offset table, and what's the definition of
>> > writeOffset ?
>>
>> The schema is the same as the one in your post: topic | partition| offset
>> The writeOffset is nearly identical:
>>
>>   def writeOffset(osr: OffsetRange)(implicit session: DBSession): Unit = {
>>     logWarning(Thread.currentThread().toString + "writeOffset: " + osr)
>>     if(osr==null) {
>>       logWarning("no offset provided")
>>       return
>>     }
>>
>>     val updated = sql"""
>> update txn_offsets set off = ${osr.untilOffset}
>>   where topic = ${osr.topic} and part = ${osr.partition} and off = ${osr.fromOffset}
>> """.update.apply()
>>     if (updated != 1) {
>>       throw new Exception( Thread.currentThread().toString + s"""failed to write offset: ${osr.topic},${osr.partition},${osr.fromOffset}-${osr.untilOffset}""")
>>     } else {
>>       logWarning(Thread.currentThread().toString + "offsets updated to " + osr.untilOffset)
>>     }
>>
>>   }
>>
>>
>> Cody Koeninger-2 wrote
>> > What key are you reducing on?  Maybe I'm misreading the code, but it
>> > looks like the per-partition offset is part of the key.  If that's true
>> > then you could just do your reduction on each partition, rather than
>> > after the fact on the whole stream.
>>
>> Yes, the key is a tuple composed of a case class called Key and the
>> partition's OffsetRange. We piggybacked the OffsetRange in this way so it
>> would be available within the scope of the partition.
>>
>> I have tried moving the reduceByKey from the end of the .transform block
>> to the partition level (at the end of the mapPartitionsWithIndex block).
>> This is what you're suggesting, yes? That didn't fix the offset update
>> behavior; the offsets still get out of sync pretty quickly.
>>
>> Some details: I'm using the kafka-console-producer.sh tool to drive the
>> process, calling it three or four times in succession and piping in
>> 100-1000 messages in each call. Once all the messages have been processed
>> I wait for the output of the printOffsets method to stop changing and
>> compare it to the txn_offsets table. (When no data is getting processed
>> the printOffsets method yields something like the following:
>>
>> [ OffsetRange(topic: 'testmulti', partition: 1, range: [23602 -> 23602]
>>   OffsetRange(topic: 'testmulti', partition: 2, range: [32503 -> 32503]
>>   OffsetRange(topic: 'testmulti', partition: 0, range: [26100 -> 26100]
>>   OffsetRange(topic: 'testmulti', partition: 3, range: [20900 -> 20900]])
>>
>> Thanks,
>> Mark
>>
>> --
>> View this message in context:
>> http://apache-spark-developers-list.1001551.n3.nabble.com/practical-usage-of-the-new-exactly-once-supporting-DirectKafkaInputDStream-tp11916p11921.html
>> Sent from the Apache Spark Developers List mailing list archive at
>> Nabble.com.
>
