This is what I'm suggesting, in pseudocode:

rdd.mapPartitionsWithIndex { case (i, iter) =>
   val offset = offsets(i)                  // offset range for this partition, read before any shuffle
   val result = yourReductionFunction(iter) // plain local reduction over this partition's records
   transaction {                            // results and offset commit (or roll back) together
      save(result)
      save(offset)
   }
   Iterator.empty
}.foreach { (_: Nothing) => () }            // force evaluation; we only care about the side effects
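
To make the transaction { } part concrete: since you're already using
scalikejdbc (judging by the DBSession in your code below), one local
transaction per partition could look something like the sketch below.
writeResult is a made-up placeholder for however you persist your
aggregates, I'm assuming the reduction produces a Map[Key, Long], and
writeOffset is your existing method:

import scalikejdbc._
import org.apache.spark.streaming.kafka.OffsetRange

// Sketch only: both writes share one session, so the aggregates and the
// offset either commit together or roll back together.
def saveInTransaction(result: Map[Key, Long], osr: OffsetRange): Unit = {
  DB.localTx { implicit session =>
    writeResult(result) // hypothetical placeholder for saving your aggregates
    writeOffset(osr)    // your existing method; it picks up the implicit session
  }
}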

In the pseudocode above, yourReductionFunction is just normal Scala code, run locally against each partition's iterator.
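
For example, here's a minimal sketch of such a function, assuming (purely
for illustration) that each record is a (Key, Long) pair and you want
per-key sums:

import scala.collection.mutable

// Illustrative only: a local, shuffle-free reduction over one partition's
// records. Key is your case class; the (Key, Long) value type is assumed.
def yourReductionFunction(iter: Iterator[(Key, Long)]): Map[Key, Long] = {
  val acc = mutable.Map.empty[Key, Long]
  iter.foreach { case (k, v) => acc(k) = acc.getOrElse(k, 0L) + v }
  acc.toMap
}

Because this runs inside a single task, there's no shuffle, so the 1:1
mapping between the spark partition and the kafka partition (and its offset
range) still holds.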

The code you posted looks like you're only saving offsets once per
partition, but you're doing it after reduceByKey.  Reduction steps in Spark
imply a shuffle, and after a shuffle you no longer have a guaranteed 1:1
correspondence between Spark partition and Kafka partition.  If you want to
verify that's the problem, log the value of currentOffset whenever it
changes.
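
For reference, here's roughly how the pieces could be wired together, using
the placeholder names from the sketches above: grab the offset ranges from
the un-shuffled RDD (where spark partition i still corresponds to
offsets(i)), and log what each partition is about to commit.  stream is
assumed to be the direct Kafka stream.

import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

stream.foreachRDD { rdd =>
  // Read the offsets before anything shuffles; at this point spark
  // partition i corresponds exactly to offsets(i)'s kafka topic/partition.
  val offsets: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  rdd.mapPartitionsWithIndex { case (i, iter) =>
    val osr = offsets(i)
    // Logging this per partition makes it obvious if a task ever picks up
    // the "wrong" offset range.
    println(s"partition $i committing ${osr.topic}/${osr.partition} " +
      s"[${osr.fromOffset} -> ${osr.untilOffset}]")
    val result = yourReductionFunction(iter)
    saveInTransaction(result, osr)
    Iterator.empty
  }.foreach { (_: Nothing) => () }
}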



On Thu, Apr 30, 2015 at 1:38 PM, badgerpants <mark.stew...@tapjoy.com>
wrote:

> Cody Koeninger-2 wrote
> > What's your schema for the offset table, and what's the definition of
> > writeOffset ?
>
> The schema is the same as the one in your post: topic | partition | offset
> The writeOffset is nearly identical:
>
>   def writeOffset(osr: OffsetRange)(implicit session: DBSession): Unit = {
>     logWarning(Thread.currentThread().toString + "writeOffset: " + osr)
>     if (osr == null) {
>       logWarning("no offset provided")
>       return
>     }
>
>     val updated = sql"""
>       update txn_offsets set off = ${osr.untilOffset}
>       where topic = ${osr.topic} and part = ${osr.partition} and off = ${osr.fromOffset}
>     """.update.apply()
>
>     if (updated != 1) {
>       throw new Exception(Thread.currentThread().toString + s"""failed to write offset:
>         ${osr.topic},${osr.partition},${osr.fromOffset}-${osr.untilOffset}""")
>     } else {
>       logWarning(Thread.currentThread().toString + "offsets updated to " + osr.untilOffset)
>     }
>   }
>
>
> Cody Koeninger-2 wrote
> > What key are you reducing on?  Maybe I'm misreading the code, but it looks
> > like the per-partition offset is part of the key.  If that's true then you
> > could just do your reduction on each partition, rather than after the fact
> > on the whole stream.
>
> Yes, the key is a tuple consisting of a case class called Key and the
> partition's OffsetRange. We piggybacked the OffsetRange in this way so it
> would be available within the scope of the partition.
>
> I have tried moving the reduceByKey from the end of the .transform block
> into the partition level (at the end of the mapPartitionsWithIndex block).
> Is this what you're suggesting? It didn't correct the offset update
> behavior; the offsets still get out of sync pretty quickly.
>
> Some details: I'm using the kafka-console-producer.sh tool to drive the
> process, calling it three or four times in succession and piping in
> 100-1000 messages in each call. Once all the messages have been processed I
> wait for the output of the printOffsets method to stop changing and compare
> it to the txn_offsets table. When no data is getting processed, printOffsets
> yields something like the following:
>
> [ OffsetRange(topic: 'testmulti', partition: 1, range: [23602 -> 23602])
>   OffsetRange(topic: 'testmulti', partition: 2, range: [32503 -> 32503])
>   OffsetRange(topic: 'testmulti', partition: 0, range: [26100 -> 26100])
>   OffsetRange(topic: 'testmulti', partition: 3, range: [20900 -> 20900]) ]
>
> Thanks,
> Mark
>
