Re: practical usage of the new "exactly-once" supporting DirectKafkaInputDStream
Sorry, I realized I probably didn't fully answer your question about my blog post, as opposed to Michael Noll's.

The direct stream is really blunt: a given RDD partition is just a Kafka topic/partition plus a lower and upper bound for the range of offsets. When an executor computes the partition, it connects to Kafka, pulls only those messages, then closes the connection. There's no long-running receiver at all and no caching of connections (I found caching sockets didn't matter much).

You get much better cluster utilization that way, because if a partition is relatively small compared to the others in the RDD, the executor gets done with it and gets scheduled another one to work on. With long-running receivers, Spark acts as if the receiver takes up a core even if it isn't doing much. Look at the CPU graph on slide 13 of the link I posted.

On Thu, May 14, 2015 at 4:21 PM, Cody Koeninger wrote:
> If the transformation you're trying to do really is per-partition, it
> shouldn't matter whether you're using scala methods or spark methods. [...]
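To make the "topic/partition plus offset bounds" description concrete, here is a minimal sketch in plain Scala. `PartitionRange` is a hypothetical stand-in (it is not Spark's `OffsetRange` class) and the offsets are made up; the point is only that each partition's workload is fully described by its bounds, so a tiny or empty range is cheap to finish.

```scala
// Hypothetical stand-in for the information a direct-stream RDD partition carries.
final case class PartitionRange(topic: String, partition: Int, fromOffset: Long, untilOffset: Long) {
  // Number of messages an executor has to pull for this partition.
  def size: Long = untilOffset - fromOffset
}

// One batch: one range per Kafka partition (made-up offsets).
val batch: Seq[PartitionRange] = Seq(
  PartitionRange("testmulti", 0, 26000L, 26100L),
  PartitionRange("testmulti", 1, 23600L, 23602L),
  PartitionRange("testmulti", 2, 32000L, 32503L),
  PartitionRange("testmulti", 3, 20900L, 20900L) // nothing new in this partition
)

// Work per partition; an empty range means there is nothing to fetch.
val sizes: Map[Int, Long] = batch.map(r => r.partition -> r.size).toMap
val totalMessages: Long = batch.map(_.size).sum
```

In this made-up batch the task for partition 3 has nothing to fetch, so its executor slot is free for another task almost immediately, whereas a long-running receiver would hold its core regardless.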
Re: practical usage of the new "exactly-once" supporting DirectKafkaInputDStream
If the transformation you're trying to do really is per-partition, it shouldn't matter whether you're using Scala methods or Spark methods. The parallel speedup you're getting is all from doing the work on multiple machines, and shuffle or caching or other benefits of Spark aren't a factor.

If using Scala methods bothers you, do all of your transformation using Spark methods, collect the results back to the driver, and save them with the offsets there:

    stream.foreachRDD { rdd =>
      // offset ranges covered by this batch, one per Kafka partition
      val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      // placeholder for whatever chain of Spark transformations you need
      val results = rdd.some.chain.of.spark.calls.collect
      // save results and offsets together, on the driver
      save(offsets, results)
    }

My work-in-progress slides for my talk at the upcoming Spark conference are here:

http://koeninger.github.io/kafka-exactly-once/

if that clarifies that point a little bit (slides 20 vs 21).

The direct stream doesn't use long-running receivers, so the concerns that blog post is trying to address don't really apply.

Under normal operation a given partition of an RDD is only going to be handled by a single executor at a time (as long as you don't turn on speculative execution... or I suppose it might be possible in some kind of network partition situation). Transactionality should save you even if something weird happens, though.

On Thu, May 14, 2015 at 3:44 PM, will-ob wrote:
> Hey Cody (et al.),
>
> Few more questions related to this. It sounds like our missing data issues
> appear fixed with this approach. [...]
Re: practical usage of the new "exactly-once" supporting DirectKafkaInputDStream
Hey Cody (et al.),

Few more questions related to this. It sounds like our missing data issues appear fixed with this approach. Could you shed some light on a few questions that came up?

- Processing our data inside a single foreachPartition function appears to be very different from the pattern seen in the programming guide. Does this become problematic with additional, interleaved reduce/filter/map steps?

```
// typical?
rdd
  .map { ... }
  .reduce { ... }
  .filter { ... }
  .reduce { ... }
  .foreachRDD { writeToDb }

// with foreachPartition
rdd.foreachPartition { case (iter) =>
  iter
    .map { ... }
    .reduce { ... }
    .filter { ... }
    .reduce { ... }
}
```

- Could the above be simplified by having

      one kafka partition per DStream, rather than
      one kafka partition per RDD partition

  ?

  That way, we wouldn't need to do our processing inside each partition as there would only be one set of kafka metadata to commit.

  Presumably, one could `join` DStreams when topic-level aggregates were needed.

  It seems this was the approach of Michael Noll in his blog post (http://www.michael-noll.com/blog/2014/10/01/kafka-spark-streaming-integration-example-tutorial/), although his primary motivation appears to be maintaining high throughput / parallelism rather than kafka metadata.

- From the blog post:

  "... there is no long-running receiver task that occupies a core per stream regardless of what the message volume is."

  Is this because data is retrieved by polling rather than maintaining a socket? Is it still the case that there is only one receiver process per DStream? If so, maybe it is wise to keep DStreams and Kafka partitions 1:1 .. else discover the machine's NIC limit?

  Can you think of a reason not to do this? Cluster utilization, or the like, perhaps?

And seems a silly question, but does `foreachPartition` guarantee that a single worker will process the passed function? Or might two workers split the work?

E.g. foreachPartition(f)

    Worker 1: f( Iterator[partition 1 records 1 - 50] )
    Worker 2: f( Iterator[partition 1 records 51 - 100] )

It is unclear from the scaladocs (https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD). But you can imagine, if it is critical that this data be committed in a single transaction, that two workers will have issues.

-- Will O

--
View this message in context: http://apache-spark-developers-list.1001551.n3.nabble.com/practical-usage-of-the-new-exactly-once-supporting-DirectKafkaInputDStream-tp11916p12257.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org
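For reference on the first question: the function passed to `foreachPartition` receives an ordinary Scala `Iterator`, so a map/filter/reduce chain inside it uses the standard collection methods rather than RDD methods. Below is a minimal sketch in plain Scala with made-up records; `processPartition` and the data are hypothetical, chosen only to show the shape of such a chain.

```scala
// Hypothetical per-partition function: takes one partition's records as a plain Iterator.
def processPartition(iter: Iterator[(String, Int)]): Map[String, Int] =
  iter
    .filter { case (_, v) => v >= 0 }            // drop invalid records
    .map { case (k, v) => (k.toLowerCase, v) }   // normalize keys
    .foldLeft(Map.empty[String, Int]) { case (acc, (k, v)) =>
      acc.updated(k, acc.getOrElse(k, 0) + v)    // local "reduce by key", no shuffle involved
    }

// Made-up records standing in for the contents of one partition.
val partitionRecords = Iterator(("A", 1), ("b", 2), ("a", 3), ("B", -1))
val aggregated: Map[String, Int] = processPartition(partitionRecords)
```

Everything here runs inside a single task on a single partition's data, so any aggregate it produces is per-partition, not per-topic.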
Re: practical usage of the new "exactly-once" supporting DirectKafkaInputDStream
Glad that worked out for you. I updated the post on my GitHub to hopefully clarify the issue.

On Tue, May 5, 2015 at 9:36 AM, Mark Stewart wrote:
> In case anyone else was having similar issues, the reordering and dropping
> of the reduceByKey solved the issues we were having. Thank you kindly, Mr.
> Koeninger.
Re: practical usage of the new "exactly-once" supporting DirectKafkaInputDStream
In case anyone else was having similar issues, the reordering and dropping of the reduceByKey solved the issues we were having. Thank you kindly, Mr. Koeninger.

On Thu, Apr 30, 2015 at 3:06 PM, Cody Koeninger wrote:
> In fact, you're using the 2 arg form of reduce by key to shrink it down to
> 1 partition
>
>     reduceByKey(sumFunc, 1)
>
> But you started with 4 kafka partitions? So they're definitely no longer
> 1:1
Re: practical usage of the new "exactly-once" supporting DirectKafkaInputDStream
Cody Koeninger-2 wrote
> In fact, you're using the 2 arg form of reduce by key to shrink it down to
> 1 partition
>
>     reduceByKey(sumFunc, 1)
>
> But you started with 4 kafka partitions? So they're definitely no longer
> 1:1

True. I added the second arg because we were seeing multiple threads attempting to update the same offset. Setting it to 1 prevented that but doesn't fix the core issue.

Cody Koeninger-2 wrote
> This is what I'm suggesting, in pseudocode
>
>     rdd.mapPartitionsWithIndex { case (i, iter) =>
>       offset = offsets(i)
>       result = yourReductionFunction(iter)
>       transaction {
>         save(result)
>         save(offset)
>       }
>     }.foreach { (_: Nothing) => () }
>
> where yourReductionFunction is just normal scala code.

I'll give this a try. Thanks, Cody.
Re: practical usage of the new "exactly-once" supporting DirectKafkaInputDStream
In fact, you're using the two-argument form of reduceByKey to shrink it down to 1 partition:

    reduceByKey(sumFunc, 1)

But you started with 4 Kafka partitions? So they're definitely no longer 1:1.

On Thu, Apr 30, 2015 at 1:58 PM, Cody Koeninger wrote:
> This is what I'm suggesting, in pseudocode [...]
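To illustrate why the 1:1 correspondence is lost, here is a sketch that imitates hash partitioning in plain Scala. It is not Spark's code: `shuffle` is a hypothetical helper that routes each record to a target partition by key hash, which is roughly what happens to records during a `reduceByKey`, and the records are made up.

```scala
// A record tagged with the Kafka partition it was read from.
final case class Rec(kafkaPartition: Int, key: String, value: Int)

// Hypothetical stand-in for a shuffle: route each record by a non-negative key hash.
def shuffle(records: Seq[Rec], numPartitions: Int): Map[Int, Seq[Rec]] =
  records.groupBy(r => Math.floorMod(r.key.hashCode, numPartitions))

// Made-up input: one record from each of four Kafka partitions.
val records = Seq(
  Rec(0, "cpu", 1), Rec(1, "cpu", 2), Rec(2, "mem", 3), Rec(3, "mem", 4)
)

// With a single target partition, records from all four Kafka partitions land together.
val afterShuffle: Map[Int, Seq[Rec]] = shuffle(records, 1)
val sourcesInPartition0: Set[Int] = afterShuffle(0).map(_.kafkaPartition).toSet
```

After the simulated shuffle, the one remaining partition holds data from four different Kafka partitions, so no single offset range describes what that task processed.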
Re: practical usage of the new "exactly-once" supporting DirectKafkaInputDStream
This is what I'm suggesting, in pseudocode:

    rdd.mapPartitionsWithIndex { case (i, iter) =>
      val offset = offsets(i)                   // offset range for Kafka partition i
      val result = yourReductionFunction(iter)  // plain Scala, no shuffle
      transaction {                             // results and offset are saved together
        save(result)
        save(offset)
      }
      Iterator.empty                            // nothing to pass downstream
    }.foreach { (_: Nothing) => () }            // action that forces the partitions to run

where yourReductionFunction is just normal Scala code.

The code you posted looks like you're only saving offsets once per partition, but you're doing it after reduceByKey. Reduction steps in Spark imply a shuffle. After a shuffle you no longer have a guaranteed 1:1 correspondence between Spark partition and Kafka partition. If you want to verify that's what the problem is, log the value of currentOffset whenever it changes.

On Thu, Apr 30, 2015 at 1:38 PM, badgerpants wrote:
> Cody Koeninger-2 wrote
> > What's your schema for the offset table, and what's the definition of
> > writeOffset ?
>
> The schema is the same as the one in your post: topic | partition| offset
> [...]
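The per-partition pattern can be tried out without Spark or a database. The following is a minimal sketch under stated assumptions: `Store` is a hypothetical in-memory stand-in for the database, its `transaction` method fakes atomicity by working on copies, and `sumByKey` plays the role of `yourReductionFunction`. None of these names come from Spark or from the blog post.

```scala
import scala.collection.mutable
import scala.util.Try

// Hypothetical in-memory stand-in for a database holding results and offsets.
final class Store {
  val results: mutable.Map[String, Long] = mutable.Map.empty
  val offsets: mutable.Map[Int, Long] = mutable.Map.empty

  // Apply both writes or neither: work on copies, publish only if the body succeeds.
  def transaction(body: (mutable.Map[String, Long], mutable.Map[Int, Long]) => Unit): Unit = {
    val r = results.clone()
    val o = offsets.clone()
    body(r, o) // an exception here leaves `results` and `offsets` untouched
    results.clear(); results ++= r
    offsets.clear(); offsets ++= o
  }
}

// Plain Scala reduction over one partition's records.
def sumByKey(iter: Iterator[(String, Long)]): Map[String, Long] =
  iter.foldLeft(Map.empty[String, Long]) { case (acc, (k, v)) =>
    acc.updated(k, acc.getOrElse(k, 0L) + v)
  }

// Process one Kafka partition: reduce locally, then save results and the new offset together.
def saveOnePartition(store: Store, partition: Int, untilOffset: Long,
                     iter: Iterator[(String, Long)]): Map[String, Long] = {
  val result = sumByKey(iter)
  store.transaction { (res, offs) =>
    result.foreach { case (k, v) => res(k) = res.getOrElse(k, 0L) + v }
    offs(partition) = untilOffset
  }
  result
}

val store = new Store
val saved = saveOnePartition(store, 0, 3L, Iterator(("cpu", 1L), ("cpu", 2L), ("mem", 5L)))

// A failing transaction must not leave partial writes behind.
val failed = Try(store.transaction { (res, _) => res("cpu") = 99L; sys.error("boom") })
```

Because the reduction happens before any shuffle, each call sees exactly one Kafka partition's records, and the offset it stores describes exactly the data it saved.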
Re: practical usage of the new "exactly-once" supporting DirectKafkaInputDStream
Cody Koeninger-2 wrote
> What's your schema for the offset table, and what's the definition of
> writeOffset ?

The schema is the same as the one in your post: topic | partition| offset
The writeOffset is nearly identical:

    def writeOffset(osr: OffsetRange)(implicit session: DBSession): Unit = {
      logWarning(Thread.currentThread().toString + "writeOffset: " + osr)
      if (osr == null) {
        logWarning("no offset provided")
        return
      }

      val updated = sql"""
        update txn_offsets set off = ${osr.untilOffset}
          where topic = ${osr.topic} and part = ${osr.partition} and off = ${osr.fromOffset}
        """.update.apply()
      if (updated != 1) {
        throw new Exception(Thread.currentThread().toString +
          s"""failed to write offset: ${osr.topic},${osr.partition},${osr.fromOffset}-${osr.untilOffset}""")
      } else {
        logWarning(Thread.currentThread().toString + "offsets updated to " + osr.untilOffset)
      }
    }

Cody Koeninger-2 wrote
> What key are you reducing on? Maybe I'm misreading the code, but it looks
> like the per-partition offset is part of the key. If that's true then you
> could just do your reduction on each partition, rather than after the fact
> on the whole stream.

Yes, the key is a duple comprised of a case class called Key and the partition's OffsetRange. We piggybacked the OffsetRange in this way so it would be available within the scope of the partition.

I have tried moving the reduceByKey from the end of the .transform block into the partition level (at the end of the mapPartitionsWithIndex block.) This is what you're suggesting, yes? The results didn't correct the offset update behavior; they still get out of sync pretty quickly.

Some details: I'm using the kafka-console-producer.sh tool to drive the process, calling it three or four times in succession and piping in 100-1000 messages in each call. Once all the messages have been processed I wait for the output of the printOffsets method to stop changing and compare it to the txn_offsets table. (When no data is getting processed the printOffsets method yields something like the following:

    [ OffsetRange(topic: 'testmulti', partition: 1, range: [23602 -> 23602]
      OffsetRange(topic: 'testmulti', partition: 2, range: [32503 -> 32503]
      OffsetRange(topic: 'testmulti', partition: 0, range: [26100 -> 26100]
      OffsetRange(topic: 'testmulti', partition: 3, range: [20900 -> 20900]])

Thanks,
Mark
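The `where ... and off = ${osr.fromOffset}` clause in `writeOffset` makes the update conditional: it only succeeds when the stored offset equals the start of the range being committed. Here is a minimal in-memory sketch of that compare-and-set behaviour. `txnOffsets` and `tryAdvance` are hypothetical names standing in for the table and the SQL update, and the offsets are made up.

```scala
import scala.collection.mutable

// Hypothetical in-memory model of the txn_offsets table: (topic, partition) -> stored offset.
val txnOffsets = mutable.Map(("testmulti", 0) -> 100L)

// Mirrors the conditional UPDATE: succeed only if the stored offset equals fromOffset.
// Returns the number of "rows" updated (0 or 1), like the SQL update count.
def tryAdvance(topic: String, part: Int, fromOffset: Long, untilOffset: Long): Int =
  txnOffsets.get((topic, part)) match {
    case Some(current) if current == fromOffset =>
      txnOffsets((topic, part)) = untilOffset
      1
    case _ => 0
  }

val first  = tryAdvance("testmulti", 0, 100L, 150L) // matches the stored offset
val replay = tryAdvance("testmulti", 0, 100L, 150L) // same range again: rejected
val gap    = tryAdvance("testmulti", 0, 200L, 250L) // range that skips ahead: rejected
```

In `writeOffset`, an update count other than 1 raises an exception; in this model that corresponds to `replay` and `gap` returning 0, i.e. a range that was already committed or that does not start where the stored offset left off.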
Re: practical usage of the new "exactly-once" supporting DirectKafkaInputDStream
What's your schema for the offset table, and what's the definition of writeOffset?

What key are you reducing on? Maybe I'm misreading the code, but it looks like the per-partition offset is part of the key. If that's true then you could just do your reduction on each partition, rather than after the fact on the whole stream.

On Thu, Apr 30, 2015 at 12:10 PM, badgerpants wrote:
> We're a group of experienced backend developers who are fairly new to Spark
> Streaming (and Scala) and very interested in using the new (in 1.3)
> DirectKafkaInputDStream impl as part of the metrics reporting service we're
> building.
>
> Our flow involves reading in metric events, lightly modifying some of the
> data values, and then creating aggregates via reduceByKey. We're following
> the approach in Cody Koeninger's blog on exactly-once streaming
> (https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md) in
> which the Kafka OffsetRanges are grabbed from the RDD and persisted to a
> tracking table within the same db transaction as the data within said
> ranges.
>
> Within a short time frame the offsets in the table fall out of synch with
> the offsets. It appears that the writeOffsets method (see code below)
> occasionally doesn't get called, which also indicates that some blocks of
> data aren't being processed either; the aggregate operation makes this
> difficult to eyeball from the data that's written to the db.
>
> Note that we do understand that the reduce operation alters the
> size/boundaries of the partitions we end up processing. Indeed, without the
> reduceByKey operation our code seems to work perfectly. But without the
> reduceByKey operation the db has to perform *a lot* more updates. It's
> certainly a significant restriction to place on what is such a promising
> approach. I'm hoping there's simply something we're missing.
>
> Any workarounds or thoughts are welcome. Here's the code we've got:
>
>     def run(stream: DStream[Input], conf: Config, args: List[String]): Unit = {
>       ...
>       val sumFunc: (BigDecimal, BigDecimal) => BigDecimal = (_ + _)
>
>       val transformStream = stream.transform { rdd =>
>         val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>         printOffsets(offsets) // just prints out the offsets for reference
>         rdd.mapPartitionsWithIndex { case (i, iter) =>
>           iter.flatMap { case (name, msg) => extractMetrics(msg) }
>             .map { case (k, v) => ((keyWithFlooredTimestamp(k), offsets(i)), v) }
>         }
>       }.reduceByKey(sumFunc, 1)
>
>       transformStream.foreachRDD { rdd =>
>         rdd.foreachPartition { partition =>
>           val conn = DriverManager.getConnection(dbUrl, dbUser, dbPass)
>           val db = DB(conn)
>           db.autoClose(false)
>
>           db.autoCommit { implicit session =>
>             var currentOffset: OffsetRange = null
>             partition.foreach { case (key, value) =>
>               currentOffset = key._2
>               writeMetrics(key._1, value, table)
>             }
>             writeOffset(currentOffset) // updates the offset positions
>           }
>           db.close()
>         }
>       }
>     }
>
> Thanks,
> Mark