In fact, you're using the two-arg form of reduceByKey to shrink it down to a single partition:

    reduceByKey(sumFunc, 1)

But you started with 4 Kafka partitions, so the Spark partitions are definitely no longer 1:1 with the Kafka partitions.
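For illustration, a minimal standalone sketch of that collapse, using made-up data in place of the direct Kafka stream (here _ + _ stands in for sumFunc; all names are hypothetical):

    import org.apache.spark.{SparkConf, SparkContext}

    object PartitionCollapse {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setAppName("partition-collapse").setMaster("local[4]"))

        // Stand-in for a direct stream RDD: 4 partitions, one per Kafka partition.
        val keyed = sc.parallelize(
          Seq(("a", 1), ("b", 2), ("a", 3), ("b", 4)), 4)
        println(keyed.partitions.length)   // 4 -- still 1:1 with the source

        // The two-arg form of reduceByKey shuffles everything into one partition.
        val reduced = keyed.reduceByKey(_ + _, 1)
        println(reduced.partitions.length) // 1 -- the 1:1 correspondence is gone

        sc.stop()
      }
    }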
On Thu, Apr 30, 2015 at 1:58 PM, Cody Koeninger <c...@koeninger.org> wrote:

> This is what I'm suggesting, in pseudocode:
>
>   rdd.mapPartitionsWithIndex { case (i, iter) =>
>     offset = offsets(i)
>     result = yourReductionFunction(iter)
>     transaction {
>       save(result)
>       save(offset)
>     }
>   }.foreach { (_: Nothing) => () }
>
> where yourReductionFunction is just normal Scala code.
>
> The code you posted looks like you're only saving offsets once per
> partition, but you're doing it after reduceByKey. Reduction steps in
> Spark imply a shuffle. After a shuffle you no longer have a guaranteed
> 1:1 correspondence between Spark partition and Kafka partition. If you
> want to verify that's what the problem is, log the value of
> currentOffset whenever it changes.
>
> On Thu, Apr 30, 2015 at 1:38 PM, badgerpants <mark.stew...@tapjoy.com> wrote:
>
>> Cody Koeninger-2 wrote:
>> > What's your schema for the offset table, and what's the definition of
>> > writeOffset?
>>
>> The schema is the same as the one in your post: topic | partition | offset.
>> writeOffset is nearly identical:
>>
>>   def writeOffset(osr: OffsetRange)(implicit session: DBSession): Unit = {
>>     logWarning(Thread.currentThread().toString + " writeOffset: " + osr)
>>     if (osr == null) {
>>       logWarning("no offset provided")
>>       return
>>     }
>>
>>     val updated = sql"""
>>       update txn_offsets set off = ${osr.untilOffset}
>>       where topic = ${osr.topic} and part = ${osr.partition}
>>         and off = ${osr.fromOffset}
>>     """.update.apply()
>>     if (updated != 1) {
>>       throw new Exception(Thread.currentThread().toString +
>>         s"""failed to write offset:
>>            ${osr.topic},${osr.partition},${osr.fromOffset}-${osr.untilOffset}""")
>>     } else {
>>       logWarning(Thread.currentThread().toString +
>>         " offsets updated to " + osr.untilOffset)
>>     }
>>   }
>>
>> Cody Koeninger-2 wrote:
>> > What key are you reducing on? Maybe I'm misreading the code, but it
>> > looks like the per-partition offset is part of the key. If that's true
>> > then you could just do your reduction on each partition, rather than
>> > after the fact on the whole stream.
>>
>> Yes, the key is a two-element tuple: a case class called Key and the
>> partition's OffsetRange. We piggybacked the OffsetRange in this way so
>> it would be available within the scope of the partition.
>>
>> I have tried moving the reduceByKey from the end of the .transform block
>> to the partition level (at the end of the mapPartitionsWithIndex block).
>> This is what you're suggesting, yes? It didn't correct the offset update
>> behavior; the offsets still get out of sync pretty quickly.
>>
>> Some details: I'm using the kafka-console-producer.sh tool to drive the
>> process, calling it three or four times in succession and piping in
>> 100-1000 messages in each call. Once all the messages have been
>> processed, I wait for the output of the printOffsets method to stop
>> changing and compare it to the txn_offsets table. (When no data is
>> getting processed, the printOffsets method yields something like the
>> following:
>>
>>   OffsetRange(topic: 'testmulti', partition: 1, range: [23602 -> 23602])
>>   OffsetRange(topic: 'testmulti', partition: 2, range: [32503 -> 32503])
>>   OffsetRange(topic: 'testmulti', partition: 0, range: [26100 -> 26100])
>>   OffsetRange(topic: 'testmulti', partition: 3, range: [20900 -> 20900]) )
>>
>> Thanks,
>> Mark
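For reference, a minimal sketch of what the txn_offsets table and its seed rows presumably look like, using ScalikeJDBC as in the thread. The DDL, driver, connection settings, and partition count are assumptions; only the topic/part/off column names come from the writeOffset snippet above:

    import scalikejdbc._

    object TxnOffsetsSetup {
      def main(args: Array[String]): Unit = {
        // Assumed JDBC settings -- substitute your own driver, URL, credentials.
        Class.forName("org.postgresql.Driver")
        ConnectionPool.singleton("jdbc:postgresql://localhost/test", "user", "pass")

        DB.autoCommit { implicit session =>
          // Assumed DDL for the topic | partition | offset schema under discussion.
          sql"""
            create table if not exists txn_offsets (
              topic varchar(255) not null,
              part  int          not null,
              off   bigint       not null,
              primary key (topic, part)
            )
          """.execute.apply()

          // One row per Kafka partition. writeOffset's update only matches when
          // the stored off still equals fromOffset, so a replayed or misrouted
          // batch fails loudly instead of silently corrupting the offsets.
          (0 until 4).foreach { p =>
            sql"insert into txn_offsets (topic, part, off) values ('testmulti', $p, 0)"
              .update.apply()
          }
        }
      }
    }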
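To make the pseudocode from Cody's 1:58 PM message concrete, a hypothetical end-to-end sketch against the Spark 1.3-era direct Kafka API: reduce each partition's records with plain Scala, then save the result and the offset in a single transaction. The word-count reduction, the results table, and the broker address are assumptions, not code from the thread:

    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}
    import scalikejdbc._

    object PerPartitionExactlyOnce {
      def main(args: Array[String]): Unit = {
        val ssc = new StreamingContext(
          new SparkConf().setAppName("per-partition-exactly-once"), Seconds(10))

        val stream = KafkaUtils
          .createDirectStream[String, String, StringDecoder, StringDecoder](
            ssc,
            Map("metadata.broker.list" -> "localhost:9092"),
            Set("testmulti"))

        stream.foreachRDD { rdd =>
          // Read the offset ranges before any shuffle: partition i of this RDD
          // still corresponds 1:1 to offsets(i) and to Kafka partition i.
          val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

          rdd.mapPartitionsWithIndex { (i, iter) =>
            val osr = offsets(i)
            // "yourReductionFunction": ordinary Scala, no Spark shuffle involved.
            val counts = iter.foldLeft(Map.empty[String, Int]) {
              case (acc, (k, _)) => acc.updated(k, acc.getOrElse(k, 0) + 1)
            }
            // Results and offset committed atomically, per Kafka partition.
            DB.localTx { implicit session =>
              counts.foreach { case (k, n) =>
                sql"insert into results (k, n) values ($k, $n)".update.apply()
              }
              val updated = sql"""
                update txn_offsets set off = ${osr.untilOffset}
                where topic = ${osr.topic} and part = ${osr.partition}
                  and off = ${osr.fromOffset}
              """.update.apply()
              if (updated != 1) sys.error(s"failed to write offset: $osr")
            }
            Iterator.empty
          }.foreach((_: Nothing) => ())
        }

        ssc.start()
        ssc.awaitTermination()
      }
    }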