What's your schema for the offset table, and what's the definition of
writeOffset?

What key are you reducing on?  Maybe I'm misreading the code, but it looks
like the per-partition offset is part of the key.  If that's true then you
could just do your reduction on each partition, rather than after the fact
on the whole stream.
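
Something along these lines is what I mean (untested sketch, reusing your
extractMetrics and keyWithFlooredTimestamp helpers) -- aggregate inside
mapPartitionsWithIndex so each output partition still corresponds to exactly
one OffsetRange:

    import org.apache.spark.streaming.kafka.HasOffsetRanges

    // Local reduction: no shuffle, so output partition i still maps 1:1 to offsets(i).
    // Note this buffers each partition's aggregates in memory.
    val perPartition = stream.transform { rdd =>
      val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd.mapPartitionsWithIndex { case (i, iter) =>
        val sums = iter
          .flatMap { case (name, msg) => extractMetrics(msg) }
          .map { case (k, v) => (keyWithFlooredTimestamp(k), v) }
          .toSeq
          .groupBy(_._1)
          .map { case (key, kvs) => (key, kvs.map(_._2).sum) }
        // one record per Kafka partition: its OffsetRange plus its local sums
        Iterator((offsets(i), sums))
      }
    }

    perPartition.foreachRDD { rdd =>
      rdd.foreachPartition { partition =>
        partition.foreach { case (offsetRange, sums) =>
          // write sums and then offsetRange here, in the same transaction,
          // exactly as in your foreachPartition block below
          ()
        }
      }
    }

Then there's exactly one OffsetRange per partition, and you know it covers
exactly the rows you just wrote.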

On Thu, Apr 30, 2015 at 12:10 PM, badgerpants <mark.stew...@tapjoy.com>
wrote:

> We're a group of experienced backend developers who are fairly new to Spark
> Streaming (and Scala) and very interested in using the new (in 1.3)
> DirectKafkaInputDStream impl as part of the metrics reporting service we're
> building.
>
> Our flow involves reading in metric events, lightly modifying some of the
> data values, and then creating aggregates via reduceByKey. We're following
> the approach in Cody Koeninger's blog on exactly-once streaming
> (https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md)
> in
> which the Kafka OffsetRanges are grabbed from the RDD and persisted to a
> tracking table within the same db transaction as the data within said
> ranges.
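>
> For anyone unfamiliar with the post, the core of that pattern is roughly the
> following (a simplified sketch only -- the table and column names here are
> illustrative, not our actual schema):
>
>     import scalikejdbc._
>     import org.apache.spark.streaming.kafka.OffsetRange
>
>     // Sketch: data and offsets committed in one transaction, so a replayed
>     // batch either updates both or neither. kafka_offsets is assumed to hold
>     // one row per (topic, partition).
>     def writeBatch(rows: Seq[(String, BigDecimal)], osr: OffsetRange, db: DB): Unit =
>       db.localTx { implicit session =>
>         rows.foreach { case (metricKey, value) =>
>           sql"insert into metrics (k, v) values ($metricKey, $value)".update.apply()
>         }
>         sql"""update kafka_offsets set until_offset = ${osr.untilOffset}
>               where topic = ${osr.topic} and part = ${osr.partition}""".update.apply()
>       }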
>
> Within a short time frame the offsets in the tracking table fall out of sync
> with the offsets of the data we've actually processed. It appears that the
> writeOffset method (see code below) occasionally doesn't get called, which
> also suggests that some blocks of data aren't being processed either; the
> aggregate operation makes this difficult to eyeball from the data that's
> written to the db.
>
> Note that we do understand that the reduce operation alters the
> size/boundaries of the partitions we end up processing. Indeed, without the
> reduceByKey operation our code seems to work perfectly. But without the
> reduceByKey operation the db has to perform *a lot* more updates. It's
> certainly a significant restriction to place on what is such a promising
> approach. I'm hoping there's simply something we're missing.
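>
> For reference, the non-aggregating variant that does work looks roughly like
> this (one db update per raw metric instead of one per aggregated key):
>
>     val rawStream = stream.transform { rdd =>
>       val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>       rdd.mapPartitionsWithIndex { case (i, iter) =>
>         iter.flatMap { case (name, msg) => extractMetrics(msg) }
>           .map { case (k, v) => ((keyWithFlooredTimestamp(k), offsets(i)), v) }
>       }
>     }
>     // ...then the same foreachRDD / foreachPartition block as below, just
>     // without the reduceByKey step; offsets(i) still lines up with the
>     // partition we write.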
>
> Any workarounds or thoughts are welcome. Here's the code we've got:
>
> def run(stream: DStream[Input], conf: Config, args: List[String]): Unit = {
>     ...
>     val sumFunc: (BigDecimal, BigDecimal) => BigDecimal = (_ + _)
>
>     val transformStream = stream.transform { rdd =>
>       val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>       printOffsets(offsets) // just prints out the offsets for reference
>       rdd.mapPartitionsWithIndex { case (i, iter) =>
>         iter.flatMap { case (name, msg) => extractMetrics(msg) }
>           .map { case (k, v) => ((keyWithFlooredTimestamp(k), offsets(i)), v) }
>       }
>     }.reduceByKey(sumFunc, 1)
>
>     transformStream.foreachRDD { rdd =>
>       rdd.foreachPartition { partition =>
>         val conn = DriverManager.getConnection(dbUrl, dbUser, dbPass)
>         val db = DB(conn)
>         db.autoClose(false)
>
>         db.autoCommit { implicit session =>
>           var currentOffset: OffsetRange = null
>           partition.foreach { case (key, value) =>
>             currentOffset = key._2
>             writeMetrics(key._1, value, table)
>           }
>           writeOffset(currentOffset) // updates the offset positions
>         }
>         db.close()
>       }
>     }
> }
>
> Thanks,
> Mark
>
