Cody Koeninger-2 wrote
> What's your schema for the offset table, and what's the definition of
> writeOffset ?
The schema is the same as the one in your post: topic | partition | offset.
(A rough sketch of the corresponding DDL is appended below the signature.)
The writeOffset is nearly identical:

def writeOffset(osr: OffsetRange)(implicit session: DBSession): Unit = {
  logWarning(Thread.currentThread().toString + " writeOffset: " + osr)
  if (osr == null) {
    logWarning("no offset provided")
    return
  }
  // conditional update: only succeeds if the stored offset still matches
  // the range's fromOffset, so a replayed or out-of-order batch fails fast
  val updated = sql"""
    update txn_offsets set off = ${osr.untilOffset}
    where topic = ${osr.topic} and part = ${osr.partition}
      and off = ${osr.fromOffset}
  """.update.apply()
  if (updated != 1) {
    throw new Exception(
      Thread.currentThread().toString +
        s""" failed to write offset: ${osr.topic},${osr.partition},${osr.fromOffset}-${osr.untilOffset}""")
  } else {
    logWarning(Thread.currentThread().toString +
      " offsets updated to " + osr.untilOffset)
  }
}

Cody Koeninger-2 wrote
> What key are you reducing on?  Maybe I'm misreading the code, but it
> looks like the per-partition offset is part of the key.  If that's true
> then you could just do your reduction on each partition, rather than
> after the fact on the whole stream.

Yes, the key is a pair consisting of a case class called Key and the
partition's OffsetRange. We piggybacked the OffsetRange in this way so it
would be available within the scope of the partition.

I have tried moving the reduceByKey from the end of the .transform block
down to the partition level (at the end of the mapPartitionsWithIndex
block); this is what you're suggesting, yes? (A sketch of that
per-partition pattern is also appended below.) It didn't correct the
offset-update behavior: the offsets still get out of sync pretty quickly.

Some details: I'm using the kafka-console-producer.sh tool to drive the
process, calling it three or four times in succession and piping 100-1000
messages into each call. Once all the messages have been processed, I wait
for the output of the printOffsets method to stop changing and compare it
to the txn_offsets table. When no data is being processed, printOffsets
yields something like the following:

[OffsetRange(topic: 'testmulti', partition: 1, range: [23602 -> 23602]),
 OffsetRange(topic: 'testmulti', partition: 2, range: [32503 -> 32503]),
 OffsetRange(topic: 'testmulti', partition: 0, range: [26100 -> 26100]),
 OffsetRange(topic: 'testmulti', partition: 3, range: [20900 -> 20900])]

Thanks,
Mark
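For reference, a minimal DDL sketch matching the column names used in
writeOffset (txn_offsets with topic/part/off); the column types and the
primary key are assumptions, not taken from the original post:

import scalikejdbc._

DB.autoCommit { implicit session =>
  sql"""
    create table txn_offsets (
      topic varchar(255) not null,
      part  int          not null,
      off   bigint       not null,
      primary key (topic, part)
    )
  """.execute.apply()
}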
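And a minimal sketch of the per-partition variant under discussion,
assuming the spark-streaming-kafka direct stream and ScalikeJDBC; here
stream, Key, parse, and writeResult are hypothetical stand-ins for the
job's own stream, key type, and helpers:

import org.apache.spark.TaskContext
import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}
import scalikejdbc._

stream.foreachRDD { rdd =>
  // capture the ranges before any transformation loses HasOffsetRanges
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd.foreachPartition { iter =>
    // the direct stream maps RDD partitions 1:1 to Kafka partitions, so
    // the range can be looked up by partition id instead of being
    // piggybacked in the reduce key
    val osr: OffsetRange = offsetRanges(TaskContext.get.partitionId)
    // reduce locally within the partition; parse is assumed to yield
    // (Key, Long) pairs
    val counts: Map[Key, Long] =
      iter.map(parse).toSeq.groupBy(_._1).mapValues(_.map(_._2).sum)
    // results and the offset update commit or roll back together
    DB.localTx { implicit session =>
      counts.foreach { case (k, v) => writeResult(k, v) }
      writeOffset(osr) // the conditional update shown above
    }
  }
}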