Cody Koeninger-2 wrote
> What's your schema for the offset table, and what's the definition of
> writeOffset ?

The schema is the same as the one in your post: topic | partition| offset
The writeOffset is nearly identical:

  def writeOffset(osr: OffsetRange)(implicit session: DBSession): Unit = {
    logWarning(Thread.currentThread().toString + "writeOffset: " + osr)
    if(osr==null) {
      logWarning("no offset provided")
      return
    }

    val updated = sql"""
update txn_offsets set off = ${osr.untilOffset}
  where topic = ${osr.topic} and part = ${osr.partition} and off =
${osr.fromOffset}
""".update.apply()
    if (updated != 1) {
      throw new Exception( Thread.currentThread().toString + s"""failed to
write offset:
${osr.topic},${osr.partition},${osr.fromOffset}-${osr.untilOffset}""")
    } else {
      logWarning(Thread.currentThread().toString + "offsets updated to " +
osr.untilOffset)
    }

  }


Cody Koeninger-2 wrote
> What key are you reducing on?  Maybe I'm misreading the code, but it looks
> like the per-partition offset is part of the key.  If that's true then you
> could just do your reduction on each partition, rather than after the fact
> on the whole stream.

Yes, the key is a duple comprised of a case class called Key and the
partition's OffsetRange. We piggybacked the OffsetRange in this way so it
would be available within the scope of the partition.

I have tried moving the reduceByKey from the end of the .transform block
into the partition level (at the end of the mapPartitionsWithIndex block.)
This is what you're suggesting, yes? The results didn't correct the offset
update behavior; they still get out of sync pretty quickly.

Some details: I'm using the kafka-console-producer.sh tool to drive the
process, calling it three or four times in succession and piping in 100-1000
messages in each call. Once all the messages have been processed I wait for
the output of the printOffsets method to stop changing and compare it to the
txn_offsets table. (When no data is getting processed the printOffsets
method yields something like the following: [ OffsetRange(topic:
'testmulti', partition: 1, range: [23602 -> 23602] OffsetRange(topic:
'testmulti', partition: 2, range: [32503 -> 32503] OffsetRange(topic:
'testmulti', partition: 0, range: [26100 -> 26100] OffsetRange(topic:
'testmulti', partition: 3, range: [20900 -> 20900]])

Thanks,
Mark






--
View this message in context: 
http://apache-spark-developers-list.1001551.n3.nabble.com/practical-usage-of-the-new-exactly-once-supporting-DirectKafkaInputDStream-tp11916p11921.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org

Reply via email to