Re: practical usage of the new "exactly-once" supporting DirectKafkaInputDStream

2015-05-14 Thread Cody Koeninger
Sorry, I realized I probably didn't fully answer your question about my blog
post, as opposed to Michael Noll's.

The direct stream is really blunt: a given RDD partition is just a kafka
topic/partition plus an upper and lower bound for the range of offsets.  When
an executor computes the partition, it connects to kafka, pulls only those
messages, and then closes the connection.  There's no long-running receiver
at all and no caching of connections (I found caching sockets didn't matter
much).

You get much better cluster utilization that way, because if a partition is
relatively small compared to the others in the RDD, the executor finishes it
and gets scheduled another one to work on.  With long-running receivers,
spark acts as if the receiver takes up a core even when it isn't doing much.
Look at the CPU graph on slide 13 of the link I posted.
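
For reference, setting up the direct stream and looking at those per-partition
offset ranges is roughly the following (a minimal sketch, not from the blog
post; the broker address and topic name are placeholders):

```
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

val ssc = new StreamingContext(new SparkConf().setAppName("direct-example"), Seconds(5))

// Placeholder broker and topic.  No receiver is started; the driver just asks
// kafka for the latest offsets when it schedules each batch.
val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("sometopic"))

stream.foreachRDD { rdd =>
  // Each RDD partition is exactly one kafka topic/partition plus an offset range.
  rdd.asInstanceOf[HasOffsetRanges].offsetRanges.foreach { o =>
    println(s"${o.topic} ${o.partition} ${o.fromOffset} -> ${o.untilOffset}")
  }
}

ssc.start()
ssc.awaitTermination()
```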

On Thu, May 14, 2015 at 4:21 PM, Cody Koeninger  wrote:

> If the transformation you're trying to do really is per-partition, it
> shouldn't matter whether you're using scala methods or spark methods.  The
> parallel speedup you're getting is all from doing the work on multiple
> machines, and shuffle or caching or other benefits of spark aren't a factor.
>
> If using scala methods bothers you, do all of your transformation using
> spark methods, collect the results back to the driver, and save them with
> the offsets there:
>
> stream.foreachRDD { rdd =>
>   val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>   val results = rdd.some.chain.of.spark.calls.collect
>   save(offsets, results)
> }
>
> My work-in-progress slides for my talk at the upcoming spark conference
> are here
>
> http://koeninger.github.io/kafka-exactly-once/
>
> See if that clarifies that point a little bit (slides 20 vs 21).
>
> The direct stream doesn't use long-running receivers, so the concerns that
> blog post is trying to address don't really apply.
>
> Under normal operation a given partition of an rdd is only going to be
> handled by a single executor at a time (as long as you don't turn on
> speculative execution... or I suppose it might be possible in some kind of
> network partition situation).  Transactionality should save you even if
> something weird happens though.
>
> On Thu, May 14, 2015 at 3:44 PM, will-ob  wrote:
>
>> Hey Cody (et. al.),
>>
>> Few more questions related to this. It sounds like our missing data issues
>> appear fixed with this approach. Could you shed some light on a few
>> questions that came up?
>>
>> -
>>
>> Processing our data inside a single foreachPartition function appears to
>> be
>> very different from the pattern seen in the programming guide. Does this
>> become problematic with additional, interleaved reduce/filter/map steps?
>>
>> ```
>> # typical?
>> rdd
>>   .map { ... }
>>   .reduce { ... }
>>   .filter { ... }
>>   .reduce { ... }
>>   .foreachRdd { writeToDb }
>>
>> # with foreachPartition
>> rdd.foreachPartition { case (iter) =>
>>   iter
>> .map { ... }
>> .reduce { ... }
>> .filter { ... }
>> .reduce { ... }
>> }
>>
>> ```
>> -
>>
>> Could the above be simplified by having
>>
>> one kafka partition per DStream, rather than
>> one kafka partition per RDD partition
>>
>> ?
>>
>> That way, we wouldn't need to do our processing inside each partition as
>> there would only be one set of kafka metadata to commit.
>>
>> Presumably, one could `join` DStreams when topic-level aggregates were
>> needed.
>>
>> It seems this was the approach of Michael Noll in his blog post.
>> (
>> http://www.michael-noll.com/blog/2014/10/01/kafka-spark-streaming-integration-example-tutorial/
>> )
>> Although, his primary motivation appears to be maintaining
>> high-throughput /
>> parallelism rather than kafka metadata.
>>
>> -
>>
>> From the blog post:
>>
>> "... there is no long-running receiver task that occupies a core per
>> stream
>> regardless of what the message volume is."
>>
>> Is this because data is retrieved by polling rather than maintaining a
>> socket? Is it still the case that there is only one receiver process per
>> DStream? If so, maybe it is wise to keep DStreams and Kafka partitions 1:1
>> .. else discover the machine's NIC limit?
>>
>> Can you think of a reason not to do this? Cluster utilization, or the
>> like,
>> perhaps?
>>
>> 
>>
>> And seems a silly question, but does `foreachPartition` guarantee that a
>> single worker will process the passed function? Or might two workers split
>> the work?
>>
>> Eg. foreachPartition(f)
>>
>> Worker 1: f( Iterator[partition 1 records 1 - 50] )
>> Worker 2: f( Iterator[partition 1 records 51 - 100] )
>>
>> It is unclear from the scaladocs
>> (
>> https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD
>> ).
>> But you can imagine, if it is critical that this data be committed in a
>> single transaction, that two workers will have issues.
>>
>

Re: practical usage of the new "exactly-once" supporting DirectKafkaInputDStream

2015-05-14 Thread Cody Koeninger
If the transformation you're trying to do really is per-partition, it
shouldn't matter whether you're using scala methods or spark methods.  The
parallel speedup you're getting is all from doing the work on multiple
machines, and shuffle or caching or other benefits of spark aren't a factor.

If using scala methods bothers you, do all of your transformation using
spark methods, collect the results back to the driver, and save them with
the offsets there:

stream.foreachRDD { rdd =>
  val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  val results = rdd.some.chain.of.spark.calls.collect
  save(offsets, results)
}
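
To make that concrete, a sketch of that driver-side pattern (ScalikeJDBC is
used here only because it shows up elsewhere in this thread; the rollups table,
the word-count style aggregation, and the save helper are placeholders, and a
connection pool is assumed to be configured on the driver):

```
import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}
import scalikejdbc._

// Hypothetical driver-side save: the collected results and the offsets are
// written in one local transaction, so they commit or fail together.
def save(offsets: Array[OffsetRange], results: Array[(String, Long)]): Unit =
  DB.localTx { implicit session =>
    results.foreach { case (word, count) =>
      sql"insert into rollups (k, cnt) values ($word, $count)".update.apply()
    }
    offsets.foreach { o =>
      sql"""update txn_offsets set off = ${o.untilOffset}
            where topic = ${o.topic} and part = ${o.partition} and off = ${o.fromOffset}
         """.update.apply()
    }
  }

stream.foreachRDD { rdd =>
  val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // Any chain of spark calls is fine here, because nothing is saved on the
  // executors; results come back to the driver and are saved with the offsets.
  val results = rdd.map(_._2).map(msg => (msg, 1L)).reduceByKey(_ + _).collect()
  save(offsets, results)
}
```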

My work-in-progress slides for my talk at the upcoming spark conference are
here

http://koeninger.github.io/kafka-exactly-once/

See if that clarifies that point a little bit (slides 20 vs 21).

The direct stream doesn't use long-running receivers, so the concerns that
blog post is trying to address don't really apply.

Under normal operation a given partition of an rdd is only going to be
handled by a single executor at a time (as long as you don't turn on
speculative execution... or I suppose it might be possible in some kind of
network partition situation).  Transactionality should save you even if
something weird happens though.
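
The reason the transactional approach covers that case: the offset update in
the blog post is conditional on the offset you started from, so an attempt to
process the same range twice updates zero rows and the whole transaction rolls
back.  A compressed sketch (the results table is a placeholder; txn_offsets
follows the post):

```
import org.apache.spark.streaming.kafka.OffsetRange
import scalikejdbc._

// Call this inside DB.localTx so the data insert and the offset update
// commit or roll back together.  The `results` table is a placeholder.
def saveAtomically(osr: OffsetRange, rows: Seq[(String, BigDecimal)])
                  (implicit session: DBSession): Unit = {
  rows.foreach { case (k, v) =>
    sql"insert into results (k, v) values ($k, $v)".update.apply()
  }
  val updated = sql"""
    update txn_offsets set off = ${osr.untilOffset}
      where topic = ${osr.topic} and part = ${osr.partition} and off = ${osr.fromOffset}
  """.update.apply()
  // A replayed attempt finds off != fromOffset, updates zero rows, and the
  // exception aborts the surrounding transaction, so nothing lands twice.
  if (updated != 1) sys.error(s"offsets out of sync for ${osr.topic}-${osr.partition}")
}
```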

On Thu, May 14, 2015 at 3:44 PM, will-ob  wrote:

> Hey Cody (et. al.),
>
> Few more questions related to this. It sounds like our missing data issues
> appear fixed with this approach. Could you shed some light on a few
> questions that came up?
>
> -
>
> Processing our data inside a single foreachPartition function appears to be
> very different from the pattern seen in the programming guide. Does this
> become problematic with additional, interleaved reduce/filter/map steps?
>
> ```
> # typical?
> rdd
>   .map { ... }
>   .reduce { ... }
>   .filter { ... }
>   .reduce { ... }
>   .foreachRdd { writeToDb }
>
> # with foreachPartition
> rdd.foreachPartition { case (iter) =>
>   iter
> .map { ... }
> .reduce { ... }
> .filter { ... }
> .reduce { ... }
> }
>
> ```
> -
>
> Could the above be simplified by having
>
> one kafka partition per DStream, rather than
> one kafka partition per RDD partition
>
> ?
>
> That way, we wouldn't need to do our processing inside each partition as
> there would only be one set of kafka metadata to commit.
>
> Presumably, one could `join` DStreams when topic-level aggregates were
> needed.
>
> It seems this was the approach of Michael Noll in his blog post.
> (
> http://www.michael-noll.com/blog/2014/10/01/kafka-spark-streaming-integration-example-tutorial/
> )
> Although, his primary motivation appears to be maintaining high-throughput
> /
> parallelism rather than kafka metadata.
>
> -
>
> From the blog post:
>
> "... there is no long-running receiver task that occupies a core per stream
> regardless of what the message volume is."
>
> Is this because data is retrieved by polling rather than maintaining a
> socket? Is it still the case that there is only one receiver process per
> DStream? If so, maybe it is wise to keep DStreams and Kafka partitions 1:1
> .. else discover the machine's NIC limit?
>
> Can you think of a reason not to do this? Cluster utilization, or the like,
> perhaps?
>
> 
>
> And seems a silly question, but does `foreachPartition` guarantee that a
> single worker will process the passed function? Or might two workers split
> the work?
>
> Eg. foreachPartition(f)
>
> Worker 1: f( Iterator[partition 1 records 1 - 50] )
> Worker 2: f( Iterator[partition 1 records 51 - 100] )
>
> It is unclear from the scaladocs
> (
> https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD
> ).
> But you can imagine, if it is critical that this data be committed in a
> single transaction, that two workers will have issues.
>
>
>
> -- Will O
>
>
>
>


Re: practical usage of the new "exactly-once" supporting DirectKafkaInputDStream

2015-05-14 Thread will-ob
Hey Cody (et. al.),

Few more questions related to this. It sounds like our missing data issues
appear fixed with this approach. Could you shed some light on a few
questions that came up?

-

Processing our data inside a single foreachPartition function appears to be
very different from the pattern seen in the programming guide. Does this
become problematic with additional, interleaved reduce/filter/map steps?

```
# typical?
rdd
  .map { ... }
  .reduce { ... }
  .filter { ... }
  .reduce { ... }
  .foreachRdd { writeToDb }

# with foreachPartition
rdd.foreachPartition { case (iter) =>
  iter
.map { ... }
.reduce { ... }
.filter { ... }
.reduce { ... }
}

```
-

Could the above be simplified by having

one kafka partition per DStream, rather than
one kafka partition per RDD partition

?

That way, we wouldn't need to do our processing inside each partition as
there would only be one set of kafka metadata to commit.

Presumably, one could `join` DStreams when topic-level aggregates were
needed.

It seems this was the approach of Michael Noll in his blog post.
(http://www.michael-noll.com/blog/2014/10/01/kafka-spark-streaming-integration-example-tutorial/)
Although, his primary motivation appears to be maintaining high-throughput /
parallelism rather than kafka metadata.

-

From the blog post:

"... there is no long-running receiver task that occupies a core per stream
regardless of what the message volume is."

Is this because data is retrieved by polling rather than maintaining a
socket? Is it still the case that there is only one receiver process per
DStream? If so, maybe it is wise to keep DStreams and Kafka partitions 1:1
.. else discover the machine's NIC limit?

Can you think of a reason not to do this? Cluster utilization, or the like,
perhaps?



And seems a silly question, but does `foreachPartition` guarantee that a
single worker will process the passed function? Or might two workers split
the work?

Eg. foreachPartition(f)

Worker 1: f( Iterator[partition 1 records 1 - 50] )
Worker 2: f( Iterator[partition 1 records 51 - 100] )

It is unclear from the scaladocs
(https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD).
But you can imagine, if it is critical that this data be committed in a
single transaction, that two workers will have issues.



-- Will O







Re: practical usage of the new "exactly-once" supporting DirectKafkaInputDStream

2015-05-05 Thread Cody Koeninger
Glad that worked out for you.  I updated the post on my github to hopefully
clarify the issue.

On Tue, May 5, 2015 at 9:36 AM, Mark Stewart 
wrote:

> In case anyone else was having similar issues, the reordering and dropping
> of the reduceByKey solved the issues we were having. Thank you kindly, Mr.
> Koeninger.
>
> On Thu, Apr 30, 2015 at 3:06 PM, Cody Koeninger 
> wrote:
>
>> In fact, you're using the 2 arg form of reduce by key to shrink it down
>> to 1 partition
>>
>>  reduceByKey(sumFunc, 1)
>>
>> But you started with 4 kafka partitions?  So they're definitely no longer
>> 1:1
>>
>>
>> On Thu, Apr 30, 2015 at 1:58 PM, Cody Koeninger 
>> wrote:
>>
>>> This is what I'm suggesting, in pseudocode
>>>
>>> rdd.mapPartitionsWithIndex { case (i, iter) =>
>>>offset = offsets(i)
>>>result = yourReductionFunction(iter)
>>>transaction {
>>>   save(result)
>>>   save(offset)
>>>}
>>> }.foreach { (_: Nothing) => () }
>>>
>>> where yourReductionFunction is just normal scala code.
>>>
>>> The code you posted looks like you're only saving offsets once per
>>> partition, but you're doing it after reduceByKey.  Reduction steps in spark
>>> imply a shuffle.  After a shuffle you no longer have a guaranteed 1:1
>>> correspondence between spark partition and kafka partition.  If you want to
>>> verify that's what the problem is, log the value of currentOffset whenever
>>> it changes.
>>>
>>>
>>>
>>> On Thu, Apr 30, 2015 at 1:38 PM, badgerpants 
>>> wrote:
>>>
 Cody Koeninger-2 wrote
 > What's your schema for the offset table, and what's the definition of
 > writeOffset ?

 The schema is the same as the one in your post: topic | partition|
 offset
 The writeOffset is nearly identical:

   def writeOffset(osr: OffsetRange)(implicit session: DBSession): Unit
 = {
 logWarning(Thread.currentThread().toString + "writeOffset: " + osr)
 if(osr==null) {
   logWarning("no offset provided")
   return
 }

 val updated = sql"""
 update txn_offsets set off = ${osr.untilOffset}
   where topic = ${osr.topic} and part = ${osr.partition} and off =
 ${osr.fromOffset}
 """.update.apply()
 if (updated != 1) {
   throw new Exception( Thread.currentThread().toString + s"""failed
 to
 write offset:
 ${osr.topic},${osr.partition},${osr.fromOffset}-${osr.untilOffset}""")
 } else {
   logWarning(Thread.currentThread().toString + "offsets updated to
 " +
 osr.untilOffset)
 }

   }


 Cody Koeninger-2 wrote
 > What key are you reducing on?  Maybe I'm misreading the code, but it
 looks
 > like the per-partition offset is part of the key.  If that's true
 then you
 > could just do your reduction on each partition, rather than after the
 fact
 > on the whole stream.

 Yes, the key is a duple comprised of a case class called Key and the
 partition's OffsetRange. We piggybacked the OffsetRange in this way so
 it
 would be available within the scope of the partition.

 I have tried moving the reduceByKey from the end of the .transform block
 into the partition level (at the end of the mapPartitionsWithIndex
 block.)
 This is what you're suggesting, yes? The results didn't correct the
 offset
 update behavior; they still get out of sync pretty quickly.

 Some details: I'm using the kafka-console-producer.sh tool to drive the
 process, calling it three or four times in succession and piping in
 100-1000
 messages in each call. Once all the messages have been processed I wait
 for
 the output of the printOffsets method to stop changing and compare it
 to the
 txn_offsets table. (When no data is getting processed the printOffsets
 method yields something like the following: [ OffsetRange(topic:
 'testmulti', partition: 1, range: [23602 -> 23602] OffsetRange(topic:
 'testmulti', partition: 2, range: [32503 -> 32503] OffsetRange(topic:
 'testmulti', partition: 0, range: [26100 -> 26100] OffsetRange(topic:
 'testmulti', partition: 3, range: [20900 -> 20900]])

 Thanks,
 Mark








>>>
>>
>


Re: practical usage of the new "exactly-once" supporting DirectKafkaInputDStream

2015-05-05 Thread Mark Stewart
In case anyone else was having similar issues, the reordering and dropping
of the reduceByKey solved the issues we were having. Thank you kindly, Mr.
Koeninger.

On Thu, Apr 30, 2015 at 3:06 PM, Cody Koeninger  wrote:

> In fact, you're using the 2 arg form of reduce by key to shrink it down to
> 1 partition
>
>  reduceByKey(sumFunc, 1)
>
> But you started with 4 kafka partitions?  So they're definitely no longer
> 1:1
>
>
> On Thu, Apr 30, 2015 at 1:58 PM, Cody Koeninger 
> wrote:
>
>> This is what I'm suggesting, in pseudocode
>>
>> rdd.mapPartitionsWithIndex { case (i, iter) =>
>>offset = offsets(i)
>>result = yourReductionFunction(iter)
>>transaction {
>>   save(result)
>>   save(offset)
>>}
>> }.foreach { (_: Nothing) => () }
>>
>> where yourReductionFunction is just normal scala code.
>>
>> The code you posted looks like you're only saving offsets once per
>> partition, but you're doing it after reduceByKey.  Reduction steps in spark
>> imply a shuffle.  After a shuffle you no longer have a guaranteed 1:1
>> correspondence between spark partition and kafka partition.  If you want to
>> verify that's what the problem is, log the value of currentOffset whenever
>> it changes.
>>
>>
>>
>> On Thu, Apr 30, 2015 at 1:38 PM, badgerpants 
>> wrote:
>>
>>> Cody Koeninger-2 wrote
>>> > What's your schema for the offset table, and what's the definition of
>>> > writeOffset ?
>>>
>>> The schema is the same as the one in your post: topic | partition| offset
>>> The writeOffset is nearly identical:
>>>
>>>   def writeOffset(osr: OffsetRange)(implicit session: DBSession): Unit =
>>> {
>>> logWarning(Thread.currentThread().toString + "writeOffset: " + osr)
>>> if(osr==null) {
>>>   logWarning("no offset provided")
>>>   return
>>> }
>>>
>>> val updated = sql"""
>>> update txn_offsets set off = ${osr.untilOffset}
>>>   where topic = ${osr.topic} and part = ${osr.partition} and off =
>>> ${osr.fromOffset}
>>> """.update.apply()
>>> if (updated != 1) {
>>>   throw new Exception( Thread.currentThread().toString + s"""failed
>>> to
>>> write offset:
>>> ${osr.topic},${osr.partition},${osr.fromOffset}-${osr.untilOffset}""")
>>> } else {
>>>   logWarning(Thread.currentThread().toString + "offsets updated to "
>>> +
>>> osr.untilOffset)
>>> }
>>>
>>>   }
>>>
>>>
>>> Cody Koeninger-2 wrote
>>> > What key are you reducing on?  Maybe I'm misreading the code, but it
>>> looks
>>> > like the per-partition offset is part of the key.  If that's true then
>>> you
>>> > could just do your reduction on each partition, rather than after the
>>> fact
>>> > on the whole stream.
>>>
>>> Yes, the key is a duple comprised of a case class called Key and the
>>> partition's OffsetRange. We piggybacked the OffsetRange in this way so it
>>> would be available within the scope of the partition.
>>>
>>> I have tried moving the reduceByKey from the end of the .transform block
>>> into the partition level (at the end of the mapPartitionsWithIndex
>>> block.)
>>> This is what you're suggesting, yes? The results didn't correct the
>>> offset
>>> update behavior; they still get out of sync pretty quickly.
>>>
>>> Some details: I'm using the kafka-console-producer.sh tool to drive the
>>> process, calling it three or four times in succession and piping in
>>> 100-1000
>>> messages in each call. Once all the messages have been processed I wait
>>> for
>>> the output of the printOffsets method to stop changing and compare it to
>>> the
>>> txn_offsets table. (When no data is getting processed the printOffsets
>>> method yields something like the following: [ OffsetRange(topic:
>>> 'testmulti', partition: 1, range: [23602 -> 23602] OffsetRange(topic:
>>> 'testmulti', partition: 2, range: [32503 -> 32503] OffsetRange(topic:
>>> 'testmulti', partition: 0, range: [26100 -> 26100] OffsetRange(topic:
>>> 'testmulti', partition: 3, range: [20900 -> 20900]])
>>>
>>> Thanks,
>>> Mark
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>
>


Re: practical usage of the new "exactly-once" supporting DirectKafkaInputDStream

2015-04-30 Thread badgerpants
Cody Koeninger-2 wrote
> In fact, you're using the 2 arg form of reduce by key to shrink it down to
> 1 partition
> 
>  reduceByKey(sumFunc, 1)
> 
> But you started with 4 kafka partitions?  So they're definitely no longer
> 1:1

True. I added the second arg because we were seeing multiple threads
attempting to update the same offset. Setting it to 1 prevented that but
doesn't fix the core issue.


Cody Koeninger-2 wrote
>> This is what I'm suggesting, in pseudocode
>>
>> rdd.mapPartitionsWithIndex { case (i, iter) =>
>>offset = offsets(i)
>>result = yourReductionFunction(iter)
>>transaction {
>>   save(result)
>>   save(offset)
>>}
>> }.foreach { (_: Nothing) => () }
>>
>> where yourReductionFunction is just normal scala code.
>>

I'll give this a try. Thanks, Cody.








Re: practical usage of the new "exactly-once" supporting DirectKafkaInputDStream

2015-04-30 Thread Cody Koeninger
In fact, you're using the 2 arg form of reduce by key to shrink it down to
1 partition

 reduceByKey(sumFunc, 1)

But you started with 4 kafka partitions?  So they're definitely no longer
1:1


On Thu, Apr 30, 2015 at 1:58 PM, Cody Koeninger  wrote:

> This is what I'm suggesting, in pseudocode
>
> rdd.mapPartitionsWithIndex { case (i, iter) =>
>offset = offsets(i)
>result = yourReductionFunction(iter)
>transaction {
>   save(result)
>   save(offset)
>}
> }.foreach { (_: Nothing) => () }
>
> where yourReductionFunction is just normal scala code.
>
> The code you posted looks like you're only saving offsets once per
> partition, but you're doing it after reduceByKey.  Reduction steps in spark
> imply a shuffle.  After a shuffle you no longer have a guaranteed 1:1
> correspondence between spark partition and kafka partition.  If you want to
> verify that's what the problem is, log the value of currentOffset whenever
> it changes.
>
>
>
> On Thu, Apr 30, 2015 at 1:38 PM, badgerpants 
> wrote:
>
>> Cody Koeninger-2 wrote
>> > What's your schema for the offset table, and what's the definition of
>> > writeOffset ?
>>
>> The schema is the same as the one in your post: topic | partition| offset
>> The writeOffset is nearly identical:
>>
>>   def writeOffset(osr: OffsetRange)(implicit session: DBSession): Unit = {
>> logWarning(Thread.currentThread().toString + "writeOffset: " + osr)
>> if(osr==null) {
>>   logWarning("no offset provided")
>>   return
>> }
>>
>> val updated = sql"""
>> update txn_offsets set off = ${osr.untilOffset}
>>   where topic = ${osr.topic} and part = ${osr.partition} and off =
>> ${osr.fromOffset}
>> """.update.apply()
>> if (updated != 1) {
>>   throw new Exception( Thread.currentThread().toString + s"""failed to
>> write offset:
>> ${osr.topic},${osr.partition},${osr.fromOffset}-${osr.untilOffset}""")
>> } else {
>>   logWarning(Thread.currentThread().toString + "offsets updated to " +
>> osr.untilOffset)
>> }
>>
>>   }
>>
>>
>> Cody Koeninger-2 wrote
>> > What key are you reducing on?  Maybe I'm misreading the code, but it
>> looks
>> > like the per-partition offset is part of the key.  If that's true then
>> you
>> > could just do your reduction on each partition, rather than after the
>> fact
>> > on the whole stream.
>>
>> Yes, the key is a duple comprised of a case class called Key and the
>> partition's OffsetRange. We piggybacked the OffsetRange in this way so it
>> would be available within the scope of the partition.
>>
>> I have tried moving the reduceByKey from the end of the .transform block
>> into the partition level (at the end of the mapPartitionsWithIndex block.)
>> This is what you're suggesting, yes? The results didn't correct the offset
>> update behavior; they still get out of sync pretty quickly.
>>
>> Some details: I'm using the kafka-console-producer.sh tool to drive the
>> process, calling it three or four times in succession and piping in
>> 100-1000
>> messages in each call. Once all the messages have been processed I wait
>> for
>> the output of the printOffsets method to stop changing and compare it to
>> the
>> txn_offsets table. (When no data is getting processed the printOffsets
>> method yields something like the following: [ OffsetRange(topic:
>> 'testmulti', partition: 1, range: [23602 -> 23602] OffsetRange(topic:
>> 'testmulti', partition: 2, range: [32503 -> 32503] OffsetRange(topic:
>> 'testmulti', partition: 0, range: [26100 -> 26100] OffsetRange(topic:
>> 'testmulti', partition: 3, range: [20900 -> 20900]])
>>
>> Thanks,
>> Mark
>>
>>
>>
>>
>>
>>
>>
>>
>


Re: practical usage of the new "exactly-once" supporting DirectKafkaInputDStream

2015-04-30 Thread Cody Koeninger
This is what I'm suggesting, in pseudocode

rdd.mapPartitionsWithIndex { case (i, iter) =>
   offset = offsets(i)
   result = yourReductionFunction(iter)
   transaction {
  save(result)
  save(offset)
   }
}.foreach { (_: Nothing) => () }

where yourReductionFunction is just normal scala code.

The code you posted looks like you're only saving offsets once per
partition, but you're doing it after reduceByKey.  Reduction steps in spark
imply a shuffle.  After a shuffle you no longer have a guaranteed 1:1
correspondence between spark partition and kafka partition.  If you want to
verify that's what the problem is, log the value of currentOffset whenever
it changes.
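
Fleshed out a bit, that pseudocode might look like the following (a sketch
only: ScalikeJDBC is used because it appears elsewhere in this thread, the
results table is a placeholder, yourReductionFunction is stubbed out as a
count, and stream is the direct stream):

```
import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}
import scalikejdbc._

// Stand-in for "just normal scala code" over the partition's iterator.
def yourReductionFunction(iter: Iterator[(String, String)]): Long = iter.size

stream.foreachRDD { rdd =>
  val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  rdd.mapPartitionsWithIndex { case (i, iter) =>
    // No shuffle has happened yet, so spark partition i is still 1:1 with
    // the kafka partition described by offsets(i).
    val osr: OffsetRange = offsets(i)
    val result = yourReductionFunction(iter)

    // Assumes ConnectionPool.singleton(...) has been set up on each executor.
    DB.localTx { implicit session =>
      sql"insert into results (part, cnt) values (${osr.partition}, $result)".update.apply()
      val updated = sql"""
        update txn_offsets set off = ${osr.untilOffset}
          where topic = ${osr.topic} and part = ${osr.partition} and off = ${osr.fromOffset}
      """.update.apply()
      // If this range was already processed, the conditional update hits zero
      // rows and the exception rolls back the insert as well.
      if (updated != 1) sys.error(s"offsets out of sync for ${osr.topic}-${osr.partition}")
    }
    Iterator.empty
  }.foreach((_: Nothing) => ())  // mapPartitions is lazy; this forces it to run
}
```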



On Thu, Apr 30, 2015 at 1:38 PM, badgerpants 
wrote:

> Cody Koeninger-2 wrote
> > What's your schema for the offset table, and what's the definition of
> > writeOffset ?
>
> The schema is the same as the one in your post: topic | partition| offset
> The writeOffset is nearly identical:
>
>   def writeOffset(osr: OffsetRange)(implicit session: DBSession): Unit = {
> logWarning(Thread.currentThread().toString + "writeOffset: " + osr)
> if(osr==null) {
>   logWarning("no offset provided")
>   return
> }
>
> val updated = sql"""
> update txn_offsets set off = ${osr.untilOffset}
>   where topic = ${osr.topic} and part = ${osr.partition} and off =
> ${osr.fromOffset}
> """.update.apply()
> if (updated != 1) {
>   throw new Exception( Thread.currentThread().toString + s"""failed to
> write offset:
> ${osr.topic},${osr.partition},${osr.fromOffset}-${osr.untilOffset}""")
> } else {
>   logWarning(Thread.currentThread().toString + "offsets updated to " +
> osr.untilOffset)
> }
>
>   }
>
>
> Cody Koeninger-2 wrote
> > What key are you reducing on?  Maybe I'm misreading the code, but it
> looks
> > like the per-partition offset is part of the key.  If that's true then
> you
> > could just do your reduction on each partition, rather than after the
> fact
> > on the whole stream.
>
> Yes, the key is a duple comprised of a case class called Key and the
> partition's OffsetRange. We piggybacked the OffsetRange in this way so it
> would be available within the scope of the partition.
>
> I have tried moving the reduceByKey from the end of the .transform block
> into the partition level (at the end of the mapPartitionsWithIndex block.)
> This is what you're suggesting, yes? The results didn't correct the offset
> update behavior; they still get out of sync pretty quickly.
>
> Some details: I'm using the kafka-console-producer.sh tool to drive the
> process, calling it three or four times in succession and piping in
> 100-1000
> messages in each call. Once all the messages have been processed I wait for
> the output of the printOffsets method to stop changing and compare it to
> the
> txn_offsets table. (When no data is getting processed the printOffsets
> method yields something like the following: [ OffsetRange(topic:
> 'testmulti', partition: 1, range: [23602 -> 23602] OffsetRange(topic:
> 'testmulti', partition: 2, range: [32503 -> 32503] OffsetRange(topic:
> 'testmulti', partition: 0, range: [26100 -> 26100] OffsetRange(topic:
> 'testmulti', partition: 3, range: [20900 -> 20900]])
>
> Thanks,
> Mark
>
>
>
>
>
>
>
>


Re: practical usage of the new "exactly-once" supporting DirectKafkaInputDStream

2015-04-30 Thread badgerpants
Cody Koeninger-2 wrote
> What's your schema for the offset table, and what's the definition of
> writeOffset ?

The schema is the same as the one in your post: topic | partition| offset
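
(For context, a guess at the DDL for that table; only the column names come
from the update statement below, the types and primary key are assumptions.)

```
import scalikejdbc._

implicit val session: DBSession = AutoSession

sql"""
  create table if not exists txn_offsets (
    topic varchar(255) not null,
    part  int          not null,
    off   bigint       not null,
    primary key (topic, part)
  )
""".execute.apply()
```
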
The writeOffset is nearly identical:

  def writeOffset(osr: OffsetRange)(implicit session: DBSession): Unit = {
    logWarning(Thread.currentThread().toString + "writeOffset: " + osr)
    if (osr == null) {
      logWarning("no offset provided")
      return
    }

    val updated = sql"""
      update txn_offsets set off = ${osr.untilOffset}
        where topic = ${osr.topic} and part = ${osr.partition} and off = ${osr.fromOffset}
    """.update.apply()
    if (updated != 1) {
      throw new Exception(Thread.currentThread().toString +
        s"""failed to write offset: ${osr.topic},${osr.partition},${osr.fromOffset}-${osr.untilOffset}""")
    } else {
      logWarning(Thread.currentThread().toString + "offsets updated to " + osr.untilOffset)
    }
  }


Cody Koeninger-2 wrote
> What key are you reducing on?  Maybe I'm misreading the code, but it looks
> like the per-partition offset is part of the key.  If that's true then you
> could just do your reduction on each partition, rather than after the fact
> on the whole stream.

Yes, the key is a tuple consisting of a case class called Key and the
partition's OffsetRange. We piggybacked the OffsetRange in this way so it
would be available within the scope of the partition.

I have tried moving the reduceByKey from the end of the .transform block
into the partition level (at the end of the mapPartitionsWithIndex block.)
This is what you're suggesting, yes? The results didn't correct the offset
update behavior; they still get out of sync pretty quickly.

Some details: I'm using the kafka-console-producer.sh tool to drive the
process, calling it three or four times in succession and piping in 100-1000
messages in each call. Once all the messages have been processed I wait for
the output of the printOffsets method to stop changing and compare it to the
txn_offsets table. (When no data is getting processed the printOffsets
method yields something like the following: [ OffsetRange(topic:
'testmulti', partition: 1, range: [23602 -> 23602] OffsetRange(topic:
'testmulti', partition: 2, range: [32503 -> 32503] OffsetRange(topic:
'testmulti', partition: 0, range: [26100 -> 26100] OffsetRange(topic:
'testmulti', partition: 3, range: [20900 -> 20900]])

Thanks,
Mark









Re: practical usage of the new "exactly-once" supporting DirectKafkaInputDStream

2015-04-30 Thread Cody Koeninger
What's your schema for the offset table, and what's the definition of
writeOffset ?

What key are you reducing on?  Maybe I'm misreading the code, but it looks
like the per-partition offset is part of the key.  If that's true then you
could just do your reduction on each partition, rather than after the fact
on the whole stream.
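
For example, something along these lines (a sketch only; extractMetrics and
keyWithFlooredTimestamp are the helpers from your code, stubbed here with
assumed signatures, and stream is the direct stream):

```
import org.apache.spark.streaming.kafka.HasOffsetRanges

// Stubs for the poster's helpers, just so the sketch is self-contained;
// the real signatures may differ.
case class Key(name: String, ts: Long)
def extractMetrics(msg: String): Seq[(Key, BigDecimal)] = ???
def keyWithFlooredTimestamp(k: Key): Key = ???

val transformed = stream.transform { rdd =>
  val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd.mapPartitionsWithIndex { case (i, iter) =>
    // Reduce with plain scala inside the partition, so the spark partition
    // (and its OffsetRange) stays 1:1 with the kafka partition.
    val summed = iter
      .flatMap { case (_, msg) => extractMetrics(msg) }
      .toSeq
      .groupBy { case (k, _) => keyWithFlooredTimestamp(k) }
      .map { case (key, kvs) => key -> kvs.map { case (_, v) => v }.sum }
    // One record per kafka partition: its offset range plus its local aggregates.
    Iterator((offsets(i), summed))
  }
}
```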

On Thu, Apr 30, 2015 at 12:10 PM, badgerpants 
wrote:

> We're a group of experienced backend developers who are fairly new to Spark
> Streaming (and Scala) and very interested in using the new (in 1.3)
> DirectKafkaInputDStream impl as part of the metrics reporting service we're
> building.
>
> Our flow involves reading in metric events, lightly modifying some of the
> data values, and then creating aggregates via reduceByKey. We're following
> the approach in Cody Koeninger's blog on exactly-once streaming
> (https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md)
> in
> which the Kakfa OffsetRanges are grabbed from the RDD and persisted to a
> tracking table within the same db transaction as the data within said
> ranges.
>
> Within a short time frame the offsets in the table fall out of sync with
> the offsets in the stream. It appears that the writeOffset method (see code
> below) occasionally doesn't get called, which also indicates that some blocks of
> data aren't being processed either; the aggregate operation makes this
> difficult to eyeball from the data that's written to the db.
>
> Note that we do understand that the reduce operation alters the
> size/boundaries of the partitions we end up processing. Indeed, without the
> reduceByKey operation our code seems to work perfectly. But without the
> reduceByKey operation the db has to perform *a lot* more updates. It's
> certainly a significant restriction to place on what is such a promising
> approach. I'm hoping there's simply something we're missing.
>
> Any workarounds or thoughts are welcome. Here's the code we've got:
>
> def run(stream: DStream[Input], conf: Config, args: List[String]): Unit = {
>   ...
>   val sumFunc: (BigDecimal, BigDecimal) => BigDecimal = (_ + _)
>
>   val transformStream = stream.transform { rdd =>
>     val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>     printOffsets(offsets) // just prints out the offsets for reference
>     rdd.mapPartitionsWithIndex { case (i, iter) =>
>       iter.flatMap { case (name, msg) => extractMetrics(msg) }
>         .map { case (k, v) => ((keyWithFlooredTimestamp(k), offsets(i)), v) }
>     }
>   }.reduceByKey(sumFunc, 1)
>
>   transformStream.foreachRDD { rdd =>
>     rdd.foreachPartition { partition =>
>       val conn = DriverManager.getConnection(dbUrl, dbUser, dbPass)
>       val db = DB(conn)
>       db.autoClose(false)
>
>       db.autoCommit { implicit session =>
>         var currentOffset: OffsetRange = null
>         partition.foreach { case (key, value) =>
>           currentOffset = key._2
>           writeMetrics(key._1, value, table)
>         }
>         writeOffset(currentOffset) // updates the offset positions
>       }
>       db.close()
>     }
>   }
>
> Thanks,
> Mark
>
>
>
>
>
>