Glad that worked out for you. I updated the post on my github to hopefully clarify the issue.
On Tue, May 5, 2015 at 9:36 AM, Mark Stewart <mark.stew...@tapjoy.com> wrote:

> In case anyone else was having similar issues, the reordering and dropping
> of the reduceByKey solved the issues we were having. Thank you kindly, Mr.
> Koeninger.
>
> On Thu, Apr 30, 2015 at 3:06 PM, Cody Koeninger <c...@koeninger.org> wrote:
>
>> In fact, you're using the 2-arg form of reduceByKey to shrink it down
>> to 1 partition:
>>
>> reduceByKey(sumFunc, 1)
>>
>> But you started with 4 Kafka partitions? So they're definitely no longer
>> 1:1.
>>
>> On Thu, Apr 30, 2015 at 1:58 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>
>>> This is what I'm suggesting, in pseudocode:
>>>
>>> rdd.mapPartitionsWithIndex { case (i, iter) =>
>>>   offset = offsets(i)
>>>   result = yourReductionFunction(iter)
>>>   transaction {
>>>     save(result)
>>>     save(offset)
>>>   }
>>> }.foreach { (_: Nothing) => () }
>>>
>>> where yourReductionFunction is just normal Scala code.
>>>
>>> The code you posted looks like you're only saving offsets once per
>>> partition, but you're doing it after reduceByKey. Reduction steps in Spark
>>> imply a shuffle. After a shuffle you no longer have a guaranteed 1:1
>>> correspondence between Spark partition and Kafka partition. If you want to
>>> verify that's what the problem is, log the value of currentOffset whenever
>>> it changes.
>>>
>>> On Thu, Apr 30, 2015 at 1:38 PM, badgerpants <mark.stew...@tapjoy.com> wrote:
>>>
>>>> Cody Koeninger-2 wrote
>>>> > What's your schema for the offset table, and what's the definition of
>>>> > writeOffset ?
>>>>
>>>> The schema is the same as the one in your post: topic | partition | offset
>>>> The writeOffset is nearly identical:
>>>>
>>>> def writeOffset(osr: OffsetRange)(implicit session: DBSession): Unit = {
>>>>   logWarning(Thread.currentThread().toString + " writeOffset: " + osr)
>>>>   if (osr == null) {
>>>>     logWarning("no offset provided")
>>>>     return
>>>>   }
>>>>
>>>>   val updated = sql"""
>>>>     update txn_offsets set off = ${osr.untilOffset}
>>>>     where topic = ${osr.topic} and part = ${osr.partition} and off = ${osr.fromOffset}
>>>>   """.update.apply()
>>>>   if (updated != 1) {
>>>>     throw new Exception(Thread.currentThread().toString + s"""failed to write offset:
>>>>       ${osr.topic},${osr.partition},${osr.fromOffset}-${osr.untilOffset}""")
>>>>   } else {
>>>>     logWarning(Thread.currentThread().toString + " offsets updated to " + osr.untilOffset)
>>>>   }
>>>> }
>>>>
>>>> Cody Koeninger-2 wrote
>>>> > What key are you reducing on? Maybe I'm misreading the code, but it looks
>>>> > like the per-partition offset is part of the key. If that's true then you
>>>> > could just do your reduction on each partition, rather than after the fact
>>>> > on the whole stream.
>>>>
>>>> Yes, the key is a tuple comprised of a case class called Key and the
>>>> partition's OffsetRange. We piggybacked the OffsetRange in this way so it
>>>> would be available within the scope of the partition.
>>>>
>>>> I have tried moving the reduceByKey from the end of the .transform block
>>>> into the partition level (at the end of the mapPartitionsWithIndex block).
>>>> This is what you're suggesting, yes? The results didn't correct the offset
>>>> update behavior; they still get out of sync pretty quickly.
>>>>
>>>> Some details: I'm using the kafka-console-producer.sh tool to drive the
>>>> process, calling it three or four times in succession and piping in 100-1000
>>>> messages in each call.
>>>> Once all the messages have been processed I wait for
>>>> the output of the printOffsets method to stop changing and compare it to
>>>> the txn_offsets table. (When no data is getting processed the printOffsets
>>>> method yields something like the following:
>>>> [ OffsetRange(topic: 'testmulti', partition: 1, range: [23602 -> 23602])
>>>>   OffsetRange(topic: 'testmulti', partition: 2, range: [32503 -> 32503])
>>>>   OffsetRange(topic: 'testmulti', partition: 0, range: [26100 -> 26100])
>>>>   OffsetRange(topic: 'testmulti', partition: 3, range: [20900 -> 20900]] )
>>>>
>>>> Thanks,
>>>> Mark
>>>>
>>>> --
>>>> View this message in context:
>>>> http://apache-spark-developers-list.1001551.n3.nabble.com/practical-usage-of-the-new-exactly-once-supporting-DirectKafkaInputDStream-tp11916p11921.html
>>>> Sent from the Apache Spark Developers List mailing list archive at Nabble.com.
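[Editor's note] The approach the thread converges on — reduce *within* each partition before any shuffle can occur, then commit the results and the Kafka offset in a single transaction — can be sketched as follows. This is an illustrative sketch, not code from the thread: `saveResult` is a hypothetical helper, the `(String, Long)` record type is assumed, and ScalikeJDBC's `DB.localTx` (matching the `sql"""..."""` / implicit `DBSession` style in the quoted `writeOffset`) stands in for whatever transaction wrapper the real job uses.

```scala
// Sketch only: assumes the Spark 1.3 direct Kafka stream API and ScalikeJDBC,
// with hypothetical saveResult and the writeOffset quoted earlier in the thread.
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}
import scalikejdbc._

def processStream(stream: DStream[(String, Long)]): Unit = {
  stream.foreachRDD { rdd =>
    // Capture offset ranges *before* any transformation; at this point
    // Spark partitions still map 1:1 onto Kafka partitions.
    val offsets: Array[OffsetRange] =
      rdd.asInstanceOf[HasOffsetRanges].offsetRanges

    rdd.mapPartitionsWithIndex { case (i, iter) =>
      val osr = offsets(i) // safe: no shuffle has occurred yet

      // Reduce with plain Scala inside the partition instead of reduceByKey,
      // so no shuffle breaks the partition/offset correspondence.
      val summed: Map[String, Long] =
        iter.foldLeft(Map.empty[String, Long].withDefaultValue(0L)) {
          case (acc, (k, v)) => acc.updated(k, acc(k) + v)
        }

      // Results and the offset update commit (or roll back) together.
      DB.localTx { implicit session =>
        summed.foreach { case (k, v) => saveResult(k, v) } // hypothetical helper
        writeOffset(osr)                                   // as quoted above
      }
      Iterator.empty
    }.foreach((_: Any) => ())
  }
}
```

Because the per-partition fold replaces `reduceByKey`, the OffsetRange no longer needs to be smuggled into the key; it is simply looked up by partition index.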