[jira] [Commented] (SPARK-12775) Couldn't find leader offsets exception when hostname can't be resolved
[ https://issues.apache.org/jira/browse/SPARK-12775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15096550#comment-15096550 ] Cody Koeninger commented on SPARK-12775:

Couldn't find leader offsets ... because you couldn't resolve the leader hostname... doesn't seem that misleading to me. If you want to be able to improve the error message, can you post the full stack trace?

> Couldn't find leader offsets exception when hostname can't be resolved
> --
>
> Key: SPARK-12775
> URL: https://issues.apache.org/jira/browse/SPARK-12775
> Project: Spark
> Issue Type: Bug
> Components: Streaming
> Affects Versions: 1.6.0
> Reporter: Sebastian Piu
> Priority: Minor
>
> When hostname resolution fails for a broker, an unclear/misleading error is
> shown:
> org.apache.spark.SparkException: java.nio.channels.ClosedChannelException
> org.apache.spark.SparkException: Couldn't find leader offsets for
> Set([mytopic,0], [mytopic,18], [mytopic,12], [mytopic,6])
> at
> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
> at
> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
> The error above occurred when a broker was added to the cluster and my machine
> could not resolve its hostname

-- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-12693) OffsetOutOfRangeException caused by retention
[ https://issues.apache.org/jira/browse/SPARK-12693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15089306#comment-15089306 ] Cody Koeninger commented on SPARK-12693:

If an executor is given a range of kafka offsets that no longer exist in kafka, it's going to error. It doesn't matter whether this is caused by you setting kafka retention parameters to several orders of magnitude smaller than default, or deleting offsets from kafka between the time when the driver created an rdd and the executor started to compute it, or trying to restart from a checkpoint that has old offsets, or manually creating an RDD for offsets that don't exist, or whatever else. If the data doesn't exist any more, the only reasonable thing to do is error.

You don't really need to experiment with all the different ways you can cause this to happen; the documentation pretty explicitly says you need adequate kafka retention.

Regarding your most recent comment, as the documentation says, 'If __not__ starting from a checkpoint, "auto.offset.reset" may be set to "largest" or "smallest" to determine where the stream starts'. You're starting from a checkpoint, you're already specifying where you want the stream to start from, and if those offsets no longer exist, it's not going to work.

As I said, changing this would impact correctness (automatically retrying with different offsets would mean silently losing data) and performance (the count optimizations assume that the messages specified in the offset range are actually the messages that will be processed), so I don't see this being likely to change. It's your job to make sure kafka has the data you're asking for.
> OffsetOutOfRangeException caused by retention
> -
>
> Key: SPARK-12693
> URL: https://issues.apache.org/jira/browse/SPARK-12693
> Project: Spark
> Issue Type: Bug
> Components: Streaming
> Affects Versions: 1.6.0
> Environment: Ubuntu 64bit, Intel i7
> Reporter: Rado Buransky
> Priority: Minor
> Labels: kafka
> Attachments: kafka-log.txt, log.txt
>
> I am running a Kafka server locally with an extremely low retention of 3 seconds
> and with 1 second segmentation. I create a direct Kafka stream with
> auto.offset.reset = smallest.
> In case of bad luck (which happens quite often in my case), the smallest
> offset retrieved during stream initialization no longer exists when
> streaming actually starts.
> Complete source code of the Spark Streaming application is here:
> https://github.com/pygmalios/spark-checkpoint-experience/blob/cb27ab83b7a29e619386b56e68a755d7bd73fc46/src/main/scala/com/pygmalios/sparkCheckpointExperience/spark/SparkApp.scala
> The application ends in an endless loop trying to get that non-existing
> offset and has to be killed. Check the attached logs from Spark and also from
> the Kafka server.
[jira] [Commented] (SPARK-12693) OffsetOutOfRangeException caused by retention
[ https://issues.apache.org/jira/browse/SPARK-12693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15088725#comment-15088725 ] Cody Koeninger commented on SPARK-12693:

What is your actual use case for changing the log retention check from the default 5 minutes down to 100 millis? As I said originally, if you have such a short retention time, and want to avoid the issue, use offset reset largest rather than smallest. Even then you need to make sure your batch time is small enough and you never lag behind on processing.
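The "use offset reset largest" advice above can be sketched as follows. This is a minimal, hedged sketch assuming Spark 1.6's spark-streaming-kafka module; the broker address and topic name are placeholders, and it requires a running Kafka cluster:

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Hypothetical app setup; broker list and topic are placeholders.
val conf = new SparkConf().setAppName("reset-largest-example")
val ssc = new StreamingContext(conf, Seconds(1))

val kafkaParams = Map[String, String](
  "metadata.broker.list" -> "broker1:9092",
  // "largest" starts the stream at the newest offsets, so a short
  // retention window can't hand the stream offsets that have already
  // been deleted. Only applies when NOT restoring from a checkpoint.
  "auto.offset.reset" -> "largest"
)

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("mytopic"))
```

As the comment notes, even with "largest" you still need batch times short enough that processing never lags behind retention.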
[jira] [Commented] (SPARK-10963) Make KafkaCluster api public
[ https://issues.apache.org/jira/browse/SPARK-10963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15076536#comment-15076536 ] Cody Koeninger commented on SPARK-10963:

There's a nonzero chance that the 0.9 integration will be a totally separate subproject.

> Make KafkaCluster api public
>
> Key: SPARK-10963
> URL: https://issues.apache.org/jira/browse/SPARK-10963
> Project: Spark
> Issue Type: Improvement
> Components: Streaming
> Reporter: Cody Koeninger
> Priority: Minor
>
> Per mailing list discussion, there's enough interest in people using
> KafkaCluster (e.g. to access latest offsets) to justify making it public
[jira] [Commented] (SPARK-12524) Group by key in a pairrdd without any shuffle
[ https://issues.apache.org/jira/browse/SPARK-12524?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15071884#comment-15071884 ] Cody Koeninger commented on SPARK-12524:

mapPartitions is just giving you an iterator; what you do with it is up to you. Have you actually tried it for your use case?

> Group by key in a pairrdd without any shuffle
> -
>
> Key: SPARK-12524
> URL: https://issues.apache.org/jira/browse/SPARK-12524
> Project: Spark
> Issue Type: Improvement
> Components: Build, Java API
> Affects Versions: 1.5.2
> Reporter: Shushant Arora
> Original Estimate: 504h
> Remaining Estimate: 504h
>
> In a PairRDD, when all values of the same key are in the same partition
> and we want to perform a group by key locally, with no reduce/aggregation
> operation afterwards, just further transformation on the grouped rdd, there is
> no facility for that. We have to perform a shuffle, which is costly.
[jira] [Commented] (SPARK-12524) Group by key in a pairrdd without any shuffle
[ https://issues.apache.org/jira/browse/SPARK-12524?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15071872#comment-15071872 ] Cody Koeninger commented on SPARK-12524:

Why can't you just use mapPartitions?
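The mapPartitions suggestion above could look roughly like this. A hedged sketch: `pairRdd` is a hypothetical `RDD[(K, V)]`, and it assumes (as the reporter states) that all values for a given key already live in the same partition:

```scala
// Group values by key within each partition, without a shuffle.
// Caveat: toSeq materializes the whole partition in memory, so this
// only makes sense when individual partitions fit comfortably in RAM.
val grouped = pairRdd.mapPartitions { iter =>
  iter.toSeq
    .groupBy(_._1)                               // local group-by, this partition only
    .iterator
    .map { case (k, kvs) => (k, kvs.map(_._2)) } // (key, all values for key)
}
```

If the same key appears in multiple partitions, this produces one group per partition rather than one global group, which is exactly why the no-shuffle version is only valid under the reporter's co-partitioning assumption.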
[jira] [Commented] (SPARK-11045) Contributing Receiver based Low Level Kafka Consumer from Spark-Packages to Apache Spark Project
[ https://issues.apache.org/jira/browse/SPARK-11045?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15069101#comment-15069101 ] Cody Koeninger commented on SPARK-11045:

Direct stream isn't doing any caching unless you specifically ask for it, in which case you can set a storage level.

> Contributing Receiver based Low Level Kafka Consumer from Spark-Packages to
> Apache Spark Project
>
> Key: SPARK-11045
> URL: https://issues.apache.org/jira/browse/SPARK-11045
> Project: Spark
> Issue Type: New Feature
> Components: Streaming
> Reporter: Dibyendu Bhattacharya
>
> This JIRA is to track the progress of contributing the Receiver based Low Level
> Kafka Consumer from spark-packages
> (http://spark-packages.org/package/dibbhatt/kafka-spark-consumer)
> back to the Apache Spark Project.
> This Kafka consumer has been around for more than a year and has matured over
> time. I see there are many adoptions of this package, and I receive
> positive feedback that this consumer gives better performance and fault
> tolerance.
> The primary intent of this JIRA is to give the community a better
> alternative if they want to use the Receiver Based model.
> If this consumer makes it into Spark Core, it will definitely see more adoption
> and support from the community, and help many who still prefer the Receiver Based
> model of Kafka Consumer.
> I understand the Direct Stream is the consumer which can give Exactly Once
> semantics and uses the Kafka Low Level API, which is good. But Direct Stream
> has concerns around recovering a checkpoint on driver code change. Application
> developers need to manage their own offsets, which is complex. Even if someone
> does manage their own offsets, it limits the parallelism Spark Streaming can
> achieve.
> If someone wants more parallelism and wants
> spark.streaming.concurrentJobs greater than 1, you can no longer rely on
> storing offsets externally, as you have no control over which batch will run in
> which sequence.
> Furthermore, the Direct Stream has higher latency, as it fetches messages
> from Kafka during the RDD action. Also, the number of RDD partitions is limited
> to the topic partition count. So unless your Kafka topic has enough partitions,
> you have limited parallelism while processing the RDD.
> Due to the above mentioned concerns, many people who do not want Exactly Once
> semantics still prefer the Receiver based model. Unfortunately, when customers
> fall back to the KafkaUtil.CreateStream approach, which uses the Kafka High Level
> Consumer, there are other issues around the reliability of the Kafka High Level
> API. The Kafka High Level API is buggy and has a serious issue around Consumer
> Re-balance. Hence I do not think it is correct to advise people to use
> KafkaUtil.CreateStream in production.
> The better option presently is to use the Consumer from
> spark-packages. It uses the Kafka Low Level Consumer API, stores offsets
> in Zookeeper, and can recover from any failure. Below are a few highlights of
> this consumer:
> 1. It has an inbuilt PID Controller for dynamic rate limiting.
> 2. In this consumer, rate limiting is done by controlling the size of the
> messages pulled from Kafka, and hence the size of the blocks. In Spark, by
> contrast, rate limiting is done by controlling the number of messages. The
> issue with throttling by number of messages is that if message sizes vary,
> block size will also vary. Say your Kafka has messages of different sizes,
> from 10KB to 500KB. Throttling by number of messages can never give a
> deterministic size for your block, hence there is no guarantee that Memory
> Back-Pressure can really take effect.
> 3. This consumer uses the Kafka low level API, which gives better performance
> than the KafkaUtils.createStream based High Level API.
> 4. This consumer can give an end to end no-data-loss channel if enabled with WAL.
> By accepting this low level kafka consumer from spark-packages into the apache
> spark project, we will give the community better options for Kafka
> connectivity, both for the Receiver-less and Receiver based models.
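The "you can set storage level" point in the comment above amounts to a one-liner. A hedged sketch, where `stream` is a hypothetical DStream created via `KafkaUtils.createDirectStream`:

```scala
import org.apache.spark.storage.StorageLevel

// Opt in to caching the direct stream's RDDs across actions.
// Without this, the direct stream does no caching at all.
stream.persist(StorageLevel.MEMORY_ONLY_SER)
```

This only matters if the same batch's RDD is acted on more than once; otherwise the data is simply read from Kafka once per action.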
[jira] [Commented] (SPARK-12177) Update KafkaDStreams to new Kafka 0.9 Consumer API
[ https://issues.apache.org/jira/browse/SPARK-12177?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15062313#comment-15062313 ] Cody Koeninger commented on SPARK-12177:

Honestly, SSL / auth is the only compelling thing about 0.9; not sure it makes sense to have 0.9 without it.

> Update KafkaDStreams to new Kafka 0.9 Consumer API
> --
>
> Key: SPARK-12177
> URL: https://issues.apache.org/jira/browse/SPARK-12177
> Project: Spark
> Issue Type: Improvement
> Components: Streaming
> Affects Versions: 1.6.0
> Reporter: Nikita Tarasenko
> Labels: consumer, kafka
>
> Kafka 0.9 is already released, and it introduces a new consumer API that is not
> compatible with the old one. So, I added the new consumer api. I made separate
> classes in package org.apache.spark.streaming.kafka.v09 with the changed API. I
> didn't remove the old classes, for backward compatibility. Users will not need
> to change their old spark applications when they upgrade to a new Spark version.
> Please review my changes
[jira] [Commented] (SPARK-9059) Update Python Direct Kafka Word count examples to show the use of HasOffsetRanges
[ https://issues.apache.org/jira/browse/SPARK-9059?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15048938#comment-15048938 ] Cody Koeninger commented on SPARK-9059:

To be clear, I think SPARK-8389 should make it possible to write an equivalent python snippet, there's just not one in the docs. jerryshao would know better.

> Update Python Direct Kafka Word count examples to show the use of
> HasOffsetRanges
> -
>
> Key: SPARK-9059
> URL: https://issues.apache.org/jira/browse/SPARK-9059
> Project: Spark
> Issue Type: Sub-task
> Components: Streaming
> Reporter: Tathagata Das
> Priority: Minor
> Labels: starter
>
> Update Python examples of Direct Kafka word count to access the offset ranges
> using HasOffsetRanges and print them. For example in Scala,
>
> {code}
> var offsetRanges: Array[OffsetRange] = _
> ...
> directKafkaDStream.foreachRDD { rdd =>
>   offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
> }
> ...
> transformedDStream.foreachRDD { rdd =>
>   // some operation
>   println("Processed ranges: " + offsetRanges)
> }
> {code}
> See https://spark.apache.org/docs/latest/streaming-kafka-integration.html for
> more info, and the master source code for more updated information on python.
[jira] [Comment Edited] (SPARK-9059) Update Python Direct Kafka Word count examples to show the use of HasOffsetRanges
[ https://issues.apache.org/jira/browse/SPARK-9059?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15048904#comment-15048904 ] Cody Koeninger edited comment on SPARK-9059 at 12/9/15 4:19 PM:

HasOffsetRanges is explained here http://spark.apache.org/docs/latest/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers There's not an equivalent python code snippet like the java and scala ones.

was (Author: c...@koeninger.org): HasOffsetRanges is explained here http://spark.apache.org/docs/latest/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers There's not an equivalent python code snippet for the java and scala ones.
[jira] [Commented] (SPARK-9059) Update Python Direct Kafka Word count examples to show the use of HasOffsetRanges
[ https://issues.apache.org/jira/browse/SPARK-9059?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15048904#comment-15048904 ] Cody Koeninger commented on SPARK-9059:

HasOffsetRanges is explained here http://spark.apache.org/docs/latest/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers There's not an equivalent python code snippet for the java and scala ones.
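Since the comment says SPARK-8389 should make a Python equivalent possible, a hedged sketch of what it might look like with pyspark.streaming.kafka (broker address and topic are placeholders; requires a Spark/Kafka environment, untested here):

```python
from pyspark.streaming.kafka import KafkaUtils

# ssc is a hypothetical StreamingContext; broker/topic are placeholders.
directKafkaStream = KafkaUtils.createDirectStream(
    ssc, ["mytopic"], {"metadata.broker.list": "broker1:9092"})

offsetRanges = []

def store_offset_ranges(rdd):
    global offsetRanges
    offsetRanges = rdd.offsetRanges()  # per-batch ranges, exposed by SPARK-8389
    return rdd

def print_offset_ranges(rdd):
    for o in offsetRanges:
        print("%s %s %s %s" % (o.topic, o.partition, o.fromOffset, o.untilOffset))

directKafkaStream \
    .transform(store_offset_ranges) \
    .foreachRDD(print_offset_ranges)
```

Note the transform has to run before any operation that loses the KafkaRDD type, mirroring the Scala requirement that the `asInstanceOf[HasOffsetRanges]` cast happen on the directly-created stream.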
[jira] [Commented] (SPARK-6051) Add an option for DirectKafkaInputDStream to commit the offsets into ZK
[ https://issues.apache.org/jira/browse/SPARK-6051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15048895#comment-15048895 ] Cody Koeninger commented on SPARK-6051:

Responded on the mailing list, but for posterity's sake:

Which version of spark are you on? I thought that was added to the spark UI in recent versions. Direct api doesn't have any inherent interaction with zookeeper. If you need the number of messages per batch and aren't on a recent enough version of spark to see them in the ui, you can get them programmatically from the offset ranges. See the definition of count() in recent versions of KafkaRDD for an example.

> Add an option for DirectKafkaInputDStream to commit the offsets into ZK
> ---
>
> Key: SPARK-6051
> URL: https://issues.apache.org/jira/browse/SPARK-6051
> Project: Spark
> Issue Type: Improvement
> Components: Streaming
> Affects Versions: 1.3.0
> Reporter: Saisai Shao
>
> Currently in DirectKafkaInputDStream, offsets are managed by Spark Streaming
> itself without ZK or Kafka involved, which makes several third-party
> offset monitoring tools fail to monitor the status of the Kafka consumer. So this
> adds an option to commit the offsets to ZK when each job is finished. The process
> is implemented in an asynchronous way, so the main processing flow will not
> be blocked; already tested with the KafkaOffsetMonitor tool.
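Getting per-batch message counts from the offset ranges, as the comment suggests, can be sketched like this (a hedged sketch assuming the Spark 1.x spark-streaming-kafka module; `directKafkaStream` is a hypothetical stream from `KafkaUtils.createDirectStream`):

```scala
import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

directKafkaStream.foreachRDD { rdd =>
  // The cast must be done on the stream's RDD directly, before any
  // transformation changes its concrete type away from KafkaRDD.
  val ranges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // Messages per batch = sum over partitions of (untilOffset - fromOffset),
  // which mirrors what KafkaRDD.count() computes in recent versions.
  val messageCount = ranges.map(r => r.untilOffset - r.fromOffset).sum
  println(s"Batch message count: $messageCount")
}
```

This works without touching Kafka again, because the direct stream already knows the exact offset range it was assigned for the batch.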
[jira] [Commented] (SPARK-12203) Add KafkaDirectInputDStream that directly pulls messages from Kafka Brokers using receivers
[ https://issues.apache.org/jira/browse/SPARK-12203?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15048879#comment-15048879 ] Cody Koeninger commented on SPARK-12203:

Commented on the PR. I don't think this makes sense for inclusion in spark, at least in its current state. I think efforts towards minimizing latency of the direct stream (assuming that just tuning your batch sizes smaller isn't sufficient) would be better spent pursuing pre-fetching / caching on the executors... but that's a noticeable increase in complexity.

> Add KafkaDirectInputDStream that directly pulls messages from Kafka Brokers
> using receivers
> ---
>
> Key: SPARK-12203
> URL: https://issues.apache.org/jira/browse/SPARK-12203
> Project: Spark
> Issue Type: New Feature
> Components: Streaming
> Reporter: Liang-Chi Hsieh
>
> Currently, we have DirectKafkaInputDStream, which directly pulls messages
> from Kafka Brokers without any receivers, and KafkaInputDStream, which pulls
> messages from a Kafka Broker using a receiver with zookeeper.
> As we observed, because DirectKafkaInputDStream retrieves messages from Kafka
> after each batch finishes, it incurs latency compared with KafkaInputDStream,
> which continues to pull messages during each batch window.
> So we try to add KafkaDirectInputDStream, which directly pulls messages from
> Kafka Brokers as DirectKafkaInputDStream does, but uses receivers as
> KafkaInputDStream does, and pulls messages during each batch window.
[jira] [Commented] (SPARK-12177) Update KafkaDStreams to new Kafka 0.9 Consumer API
[ https://issues.apache.org/jira/browse/SPARK-12177?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15046975#comment-15046975 ] Cody Koeninger commented on SPARK-12177:

I really think this needs to be handled as a separate subproject, or otherwise in a fully backwards compatible way. The new consumer api will require upgrading kafka brokers, which is a big ask just in order for people to upgrade spark versions.
[jira] [Reopened] (SPARK-12103) KafkaUtils createStream with multiple topics -- does not work as expected
[ https://issues.apache.org/jira/browse/SPARK-12103?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cody Koeninger reopened SPARK-12103:

PR for doc change: https://github.com/apache/spark/pull/10132

> KafkaUtils createStream with multiple topics -- does not work as expected
> -
>
> Key: SPARK-12103
> URL: https://issues.apache.org/jira/browse/SPARK-12103
> Project: Spark
> Issue Type: Improvement
> Components: Documentation, Streaming
> Affects Versions: 1.4.1
> Reporter: Dan Dutrow
> Priority: Minor
> Fix For: 1.4.2
>
> (Note: yes, there is a Direct API that may be better, but it's not the
> easiest thing to get started with. The Kafka Receiver API still needs to
> work, especially for newcomers)
> When creating a receiver stream using KafkaUtils, there is a valid use case
> where you would want to use one (or a few) Kafka Streaming Receivers to pool
> resources. I have 10+ topics and don't want to dedicate 10 cores to
> processing all of them. However, when reading the data produced by
> KafkaUtils.createStream, the DStream[(String,String)] does not properly
> insert the topic name into the tuple. The left key is always null, making it
> impossible to know what topic the data came from, other than stashing your
> key into the value. Is there a way around that problem?
> CODE
> val topics = Map("topicA" -> 1, "topicB" -> 1, "topicC" -> 1, "topicD" -> 1,
>   "topicE" -> 1, "topicF" -> 1, ...)
> val streams: IndexedSeq[ReceiverInputDStream[(String, String)]] = (1 to 3).map(
>   i =>
>     KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
>       ssc, consumerProperties,
>       topics,
>       StorageLevel.MEMORY_ONLY_SER))
> val unioned: DStream[(String, String)] = ssc.union(streams)
> unioned.flatMap(x => {
>   val (key, value) = x
>   // key is always null!
>   // value has data from any one of my topics
>   key match ... {
>     ..
>   }
> })
> END CODE
[jira] [Commented] (SPARK-12103) KafkaUtils createStream with multiple topics -- does not work as expected
[ https://issues.apache.org/jira/browse/SPARK-12103?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15038084#comment-15038084 ] Cody Koeninger commented on SPARK-12103:

On the off chance you're acting in good faith, actually did read the Kafka documentation and Spark API documentation, and were still somehow unclear that K meant the type of the message Key and V meant the type of the message Value... I'll submit a pr to add @tparam docs for every type in the KafkaUtils api
[jira] [Commented] (SPARK-12103) KafkaUtils createStream with multiple topics -- does not work as expected
[ https://issues.apache.org/jira/browse/SPARK-12103?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15037986#comment-15037986 ] Cody Koeninger commented on SPARK-12103: Knowing that kafka messages have a key and value isn't an "Kafka expert" thing. Serious question - did you read http://kafka.apache.org/documentation.html before posting this? The api documentation for createStream at http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.kafka.KafkaUtils$ keyTypeClass Key type of DStream valueTypeClass value type of Dstream keyDecoderClass Type of kafka key decoder valueDecoderClass Type of kafka value decoder How is this not clear? If you have a specific improvement to docs, send a PR > KafkaUtils createStream with multiple topics -- does not work as expected > - > > Key: SPARK-12103 > URL: https://issues.apache.org/jira/browse/SPARK-12103 > Project: Spark > Issue Type: Improvement > Components: Documentation, Streaming >Affects Versions: 1.4.1 >Reporter: Dan Dutrow >Priority: Minor > Fix For: 1.4.2 > > > (Note: yes, there is a Direct API that may be better, but it's not the > easiest thing to get started with. The Kafka Receiver API still needs to > work, especially for newcomers) > When creating a receiver stream using KafkaUtils, there is a valid use case > where you would want to use one (or a few) Kafka Streaming Receiver to pool > resources. I have 10+ topics and don't want to dedicate 10 cores to > processing all of them. However, when reading the data procuced by > KafkaUtils.createStream, the DStream[(String,String)] does not properly > insert the topic name into the tuple. The left-key always null, making it > impossible to know what topic that data came from other than stashing your > key into the value. Is there a way around that problem? > CODE > val topics = Map("topicA" -> 1, "topicB" -> 1, "topicC" -> 1, "topicD" -> 1, > "topicE" -> 1, "topicF" -> 1, ...) 
> val streams: IndexedSeq[ReceiverInputDStream[(String, String)]] = (1 to 3).map( i =>
>   KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
>     ssc, consumerProperties,
>     topics,
>     StorageLevel.MEMORY_ONLY_SER))
> val unioned: DStream[(String, String)] = ssc.union(streams)
> unioned.flatMap(x => {
>   val (key, value) = x
>   // key is always null!
>   // value has data from any one of my topics
>   key match { ... }
> })
> END CODE -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-12103) KafkaUtils createStream with multiple topics -- does not work as expected
[ https://issues.apache.org/jira/browse/SPARK-12103?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15037773#comment-15037773 ] Cody Koeninger commented on SPARK-12103: A cursory review of the Kafka project documentation should reveal that messages have a key (used for distribution among partitions) and a value. Why would one reasonably expect that Spark documentation referring to a Kafka message key was instead supposed to be the message topic? If you really want the topic name in each item of the rdd, create your separate streams, map over them to add the topic name, then union them together into a single stream.
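The workaround suggested in the comment above — map over each per-topic stream to add the topic name, then union — can be sketched with plain lists standing in for DStreams. All names here are illustrative, not the Spark API:

```python
# Simulate one receiver stream per topic as a list of (key, value) records;
# in the real job these would be DStream[(String, String)] with null keys.
topic_streams = {
    "topicA": [(None, "a1"), (None, "a2")],
    "topicB": [(None, "b1")],
}

# Map over each stream to replace the (null) Kafka key with its topic name,
# then union: the left element of each tuple now identifies the source topic.
unioned = [
    (topic, value)
    for topic, records in topic_streams.items()
    for _key, value in records
]
```

In Spark the same shape would be one `createStream` per topic, a `map` tagging each record, and `ssc.union` over the tagged streams.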
[jira] [Commented] (SPARK-12002) offsetRanges attribute missing in Kafka RDD when resuming from checkpoint
[ https://issues.apache.org/jira/browse/SPARK-12002?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15029058#comment-15029058 ] Cody Koeninger commented on SPARK-12002: Just to be clear, is this only an issue with Python? > offsetRanges attribute missing in Kafka RDD when resuming from checkpoint > - > > Key: SPARK-12002 > URL: https://issues.apache.org/jira/browse/SPARK-12002 > Project: Spark > Issue Type: Bug > Components: PySpark, Streaming >Reporter: Amit Ramesh > > SPARK-8389 added offsetRanges to Kafka direct streams. And SPARK-10122 fixed > the issue of not ending up with non-Kafka RDDs when chaining transforms to > Kafka RDDs. It appears that this issue remains for the case where a streaming > application using Kafka direct streams is initialized from the checkpoint > directory. The following is a representative example where everything works > as expected during the first run, but exceptions are thrown on a subsequent > run when the context is being initialized from the checkpoint directory.
> {code:title=test_checkpoint.py|language=python}
> from pyspark import SparkContext
> from pyspark.streaming import StreamingContext
> from pyspark.streaming.kafka import KafkaUtils
>
> def attach_kafka_metadata(kafka_rdd):
>     offset_ranges = kafka_rdd.offsetRanges()
>     return kafka_rdd
>
> def create_context():
>     sc = SparkContext(appName='kafka-test')
>     ssc = StreamingContext(sc, 10)
>     ssc.checkpoint(CHECKPOINT_URI)
>     kafka_stream = KafkaUtils.createDirectStream(
>         ssc,
>         [TOPIC],
>         kafkaParams={
>             'metadata.broker.list': BROKERS,
>         },
>     )
>     kafka_stream.transform(attach_kafka_metadata).count().pprint()
>     return ssc
>
> if __name__ == "__main__":
>     ssc = StreamingContext.getOrCreate(CHECKPOINT_URI, create_context)
>     ssc.start()
[jira] [Commented] (SPARK-11693) spark kafka direct streaming exception
[ https://issues.apache.org/jira/browse/SPARK-11693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15004072#comment-15004072 ] Cody Koeninger commented on SPARK-11693: You've under-provisioned Kafka storage and / or Spark compute capacity. The result is that data is being deleted before it has been processed. I personally think the proper response to a system being broken is for it to break obviously and noticeably, rather than silently giving the wrong result. My recommended way to handle this would be to monitor your stream, and have a restart policy that's appropriate for your situation. If you want to modify the area of the code you noted to silently catch the exception and start at the next available offset, you can do so pretty straightforwardly (streaming-kafka is an external module, so you shouldn't have to re-deploy all of spark). I don't think that's a modification that makes sense for the general use case, however. > spark kafka direct streaming exception > -- > > Key: SPARK-11693 > URL: https://issues.apache.org/jira/browse/SPARK-11693 > Project: Spark > Issue Type: Question > Components: Streaming >Affects Versions: 1.5.1 >Reporter: xiaoxiaoluo >Priority: Minor > > We are using spark kafka direct streaming in our test environment. We have > limited the kafka partition size to avoid exhausting the disk space. So when > data is written to kafka faster than spark streaming can read it, an > exception is thrown in spark streaming and the application shuts down. 
> {noformat}
> 15/11/11 10:17:35 ERROR Executor: Exception in task 0.3 in stage 1626659.0 (TID 1134180)
> kafka.common.OffsetOutOfRangeException
>         at sun.reflect.GeneratedConstructorAccessor32.newInstance(Unknown Source)
>         at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>         at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
>         at java.lang.Class.newInstance(Class.java:442)
>         at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:86)
>         at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.handleFetchErr(KafkaRDD.scala:184)
>         at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:193)
>         at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:208)
>         at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>         at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>         at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:209)
>         at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:73)
>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>         at org.apache.spark.scheduler.Task.run(Task.scala:88)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)
> 15/11/11 10:17:42 ERROR CoarseGrainedExecutorBackend: Driver 10.1.92.44:49939 disassociated! Shutting down.
> {noformat}
> Could streaming get the current smallest offset from this partition?
> and go on to process streaming data?
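The "monitor your stream and have a restart policy" advice in the comment above can be sketched as a bounded restart loop. `run_job` and the retry limit are hypothetical stand-ins for a real deployment's supervisor; this is a sketch, not the streaming-kafka module's behavior:

```python
def run_with_restarts(run_job, max_restarts):
    """Run the streaming job; on failure, restart up to max_restarts times.

    In a real deployment the except branch would log, alert, and rebuild the
    stream from the currently available Kafka offsets before retrying.
    """
    attempts = 0
    while True:
        try:
            run_job()
            return attempts  # number of restarts it took to finish cleanly
        except RuntimeError:
            attempts += 1
            if attempts > max_restarts:
                raise  # give up and surface the failure, as Cody recommends
```

A job whose first two runs hit OffsetOutOfRange-style errors would succeed on the third run after two restarts; past the limit, the failure propagates instead of being silently swallowed.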
[jira] [Commented] (SPARK-10320) Kafka Support new topic subscriptions without requiring restart of the streaming context
[ https://issues.apache.org/jira/browse/SPARK-10320?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15004052#comment-15004052 ] Cody Koeninger commented on SPARK-10320: This is for practical purposes blocked on SPARK-10963. > Kafka Support new topic subscriptions without requiring restart of the > streaming context > > > Key: SPARK-10320 > URL: https://issues.apache.org/jira/browse/SPARK-10320 > Project: Spark > Issue Type: New Feature > Components: Streaming >Reporter: Sudarshan Kadambi > > Spark Streaming lacks the ability to subscribe to new topics or unsubscribe > from current ones once the streaming context has been started. Restarting the > streaming context increases the latency of update handling. > Consider a streaming application subscribed to n topics. Let's say 1 of the > topics is no longer needed in streaming analytics and hence should be > dropped. We could do this by stopping the streaming context, removing that > topic from the topic list and restarting the streaming context. Since with > some DStreams such as DirectKafkaStream, the per-partition offsets are > maintained by Spark, we should be able to resume uninterrupted (I think?) > from where we left off with a minor delay. However, in instances where > expensive state initialization (from an external datastore) may be needed for > datasets published to all topics before streaming updates can be applied, > it is more convenient to subscribe or unsubscribe to only the incremental > changes to the topic list. Without such a feature, updates go unprocessed for > longer than they need to, thus affecting QoS.
[jira] [Commented] (SPARK-11698) Add option to ignore kafka messages that are out of limit rate
[ https://issues.apache.org/jira/browse/SPARK-11698?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15004050#comment-15004050 ] Cody Koeninger commented on SPARK-11698: That looks like a reasonable way to solve your particular use case. I think my preference would be to make the strategy for generating the next batch of offsets user-configurable in a more general way, rather than adding knobs for each possible use case. Have you seen the discussion in https://issues.apache.org/jira/browse/SPARK-10320, and would that approach accommodate this modification? Outside of the specifics of the direct stream, this is also related to having different strategies for dealing with backpressure, and I'm not sure what the long-term plan there is. > Add option to ignore kafka messages that are out of limit rate > -- > > Key: SPARK-11698 > URL: https://issues.apache.org/jira/browse/SPARK-11698 > Project: Spark > Issue Type: Improvement > Components: Streaming >Reporter: Liang-Chi Hsieh > > With spark.streaming.kafka.maxRatePerPartition, we can control the max rate > limit. However, we cannot ignore the messages that exceed the limit; those > messages will be consumed in the next iteration. We have a use case where we > need to ignore these messages and process the latest messages in the next > iteration. > In other words, we simply want to consume part of the messages in each > iteration and ignore the remaining messages that are not consumed. > We add an option for this purpose.
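The difference between the current behavior and the proposed option can be illustrated as two offset-range strategies. This is a simplified model, not the actual DirectKafkaInputDStream code: the current approach clamps each batch at the rate limit and resumes where it left off, while the proposal would skip the backlog and keep only the newest messages.

```python
MAX_MESSAGES = 100  # illustrative per-partition cap for one batch

def clamped_range(from_offset, latest_offset):
    """Current behavior: consume at most MAX_MESSAGES from where we left off;
    leftovers wait for the next batch, so nothing is skipped."""
    return (from_offset, min(latest_offset, from_offset + MAX_MESSAGES))

def skipping_range(from_offset, latest_offset):
    """Proposed option: always end at the latest offset, starting no more than
    MAX_MESSAGES back, deliberately dropping the older backlog."""
    return (max(from_offset, latest_offset - MAX_MESSAGES), latest_offset)
```

With a large backlog (say offsets 0..1000), the clamped strategy reads 0..100 and leaves the rest for later batches; the skipping strategy reads 900..1000 and never processes the rest, which is exactly the data-loss trade-off discussed above.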
[jira] [Commented] (SPARK-11211) Kafka - offsetOutOfRange forces to largest
[ https://issues.apache.org/jira/browse/SPARK-11211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14971834#comment-14971834 ] Cody Koeninger commented on SPARK-11211: In an attempt to replicate, I used the following:
- spark 1.5.1
- a kafka broker with pathologically small retention and frequent log check / deletion:
  log.retention.bytes=2048
  log.segment.bytes=1024
  log.retention.check.interval.ms=3
- the IdempotentExample job from https://github.com/koeninger/kafka-exactly-once, which has a batch size of 60 seconds
- a simple 1 msg / sec feed sent from the console producer, e.g.
  for i in {1..1}; do echo "`date +%s`"; sleep 1; done | bin/kafka-console-producer.sh --broker-list 'localhost:9092' --topic test
This results in (at least the first batch) failing as I'd expect, since kafka messages are being deleted before they can be processed. I can see the failed batch in the master ui, and there actually is an exception visible on the driver (although it's a classNotFound exception caused by deserializing the OffsetOutOfRangeException; there's a separate ticket SPARK-11195 related to that). This is what I see in the logs on the executor:
15/10/23 15:16:00 INFO VerifiableProperties: Property auto.offset.reset is overridden to smallest
15/10/23 15:16:00 INFO KafkaRDD: Computing topic test, partition 0 offsets 4727 -> 4867
15/10/23 15:16:00 INFO KafkaRDD: Computing topic test, partition 1 offsets 2900 -> 2973
15/10/23 15:16:00 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
kafka.common.OffsetOutOfRangeException
// (repeats total of 4 times, then that job fails)
15/10/23 15:20:31 INFO KafkaRDD: Computing topic test, partition 0 offsets 4867 -> 4926
15/10/23 15:20:31 INFO KafkaRDD: Beginning offset 2973 is the same as ending offset skipping test 1
You'll notice there isn't any gap in offsets between the batch that failed and the next batch. 
It's not that the consumer is suddenly skipping to the largest value on the kafka topic, it's that the first batch failed. This is a situation where the underlying assumption (i.e. that you have enough kafka retention) has gone wrong, so the best thing to do in my opinion is fail. I'm not sure exactly what you'd expect to happen in this case. Silently submitting another batch that duplicates some smaller portion of the first batch seems like a really bad idea. I'd like to know when I've lost data, and I'd like separate batches to contain strictly separate groups of messages. If you want to reduce the scope of the number of messages that can be lost in this kind of situation, use maxMessagesPerPartition and / or smaller batch sizes. But in any case, you've lost data because Kafka deleted it. Changing spark behavior won't get you your data back. If I'm misunderstanding the nature of the problem, please report the version of spark you're using, an actual observation of behavior (e.g. log output), and the behavior you expect. What you're reporting is more of a (probably inaccurate) theory as to the root cause, as opposed to observed behavior. > Kafka - offsetOutOfRange forces to largest > -- > > Key: SPARK-11211 > URL: https://issues.apache.org/jira/browse/SPARK-11211 > Project: Spark > Issue Type: Bug > Components: Streaming >Affects Versions: 1.3.1, 1.5.1 >Reporter: Daniel Strassler > > This problem relates to how DStreams using the Direct Approach of connecting > to a Kafka topic behave when they request an offset that does not exist on > the topic. Currently it appears the "auto.offset.reset" configuration value > is being ignored and the default value of "largest" is always being used. 
> > When using the Direct Approach of connecting to a Kafka topic using a > DStream, even if you have the Kafka configuration "auto.offset.reset" set to > smallest, the behavior in the event of a > kafka.common.OffsetOutOfRangeException exception is to move the > next-offset-to-consume value to the largest offset on the Kafka topic. It > appears that the exception is being eaten and not propagated up to the driver > as well, so a workaround triggered by propagation of the error cannot be > implemented either. > > The current behavior of setting to largest means that any data on the Kafka > topic at the time of the exception being thrown is skipped (lost) to > consumption, and only data produced to the topic after the exception will be > consumed. Two possible fixes are listed below. > > Fix 1: When "auto.offset.reset" is set to "smallest", the DStream should set > the next consumed offset to be the smallest offset value on the Kafka topic. > > Fix 2: Propagate the error to the Driver to allow it to react as it deems > appropriate.
[jira] [Commented] (SPARK-11195) Exception thrown on executor throws ClassNotFound on driver
[ https://issues.apache.org/jira/browse/SPARK-11195?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14971783#comment-14971783 ] Cody Koeninger commented on SPARK-11195: I'm seeing this on 1.5.1 as well. > Exception thrown on executor throws ClassNotFound on driver > --- > > Key: SPARK-11195 > URL: https://issues.apache.org/jira/browse/SPARK-11195 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 1.5.1 >Reporter: Hurshal Patel > > I have a minimal repro job
> {code:title=Repro.scala}
> package repro
> import org.apache.spark.SparkContext
> import org.apache.spark.SparkConf
> import org.apache.spark.SparkException
>
> class MyException(message: String) extends Exception(message: String)
>
> object Repro {
>   def main(args: Array[String]) {
>     val conf = new SparkConf().setAppName("MyException ClassNotFound Repro")
>     val sc = new SparkContext(conf)
>     sc.parallelize(List(1)).map { x =>
>       throw new repro.MyException("this is a failure")
>       true
>     }.collect()
>   }
> }
> {code}
> On Spark 1.4.1, I get a task failure with the reason correctly set to MyException.
> On Spark 1.5.1, I _expect_ the same behavior, but instead I get a task failure with an UnknownReason caused by ClassNotFoundException.
> here is the job on vanilla Spark 1.4.1:
> {code:title=spark_1.5.1_log}
> $ ./bin/spark-submit --master local --deploy-mode client --class repro.Repro /home/nix/repro/target/scala-2.10/repro-assembly-0.0.1.jar
> Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
> 15/10/19 11:55:20 INFO SparkContext: Running Spark version 1.4.1
> 15/10/19 11:55:21 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... 
using builtin-java classes where applicable
> 15/10/19 11:55:22 WARN Utils: Your hostname, choochootrain resolves to a loopback address: 127.0.1.1; using 10.0.1.97 instead (on interface wlan0)
> 15/10/19 11:55:22 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
> 15/10/19 11:55:22 INFO SecurityManager: Changing view acls to: root
> 15/10/19 11:55:22 INFO SecurityManager: Changing modify acls to: root
> 15/10/19 11:55:22 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); users with modify permissions: Set(root)
> 15/10/19 11:55:24 INFO Slf4jLogger: Slf4jLogger started
> 15/10/19 11:55:24 INFO Remoting: Starting remoting
> 15/10/19 11:55:24 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@10.0.1.97:46683]
> 15/10/19 11:55:24 INFO Utils: Successfully started service 'sparkDriver' on port 46683.
> 15/10/19 11:55:24 INFO SparkEnv: Registering MapOutputTracker
> 15/10/19 11:55:24 INFO SparkEnv: Registering BlockManagerMaster
> 15/10/19 11:55:24 INFO DiskBlockManager: Created local directory at /tmp/spark-0348a320-0ca3-4528-9ab5-9ba37d3c2e07/blockmgr-08496143-1d9d-41c8-a581-b6220edf00d5
> 15/10/19 11:55:24 INFO MemoryStore: MemoryStore started with capacity 265.4 MB
> 15/10/19 11:55:25 INFO HttpFileServer: HTTP File server directory is /tmp/spark-0348a320-0ca3-4528-9ab5-9ba37d3c2e07/httpd-52c396d2-b47f-45a5-bb76-d10aa864e6d5
> 15/10/19 11:55:25 INFO HttpServer: Starting HTTP Server
> 15/10/19 11:55:25 INFO Utils: Successfully started service 'HTTP file server' on port 47915.
> 15/10/19 11:55:25 INFO SparkEnv: Registering OutputCommitCoordinator
> 15/10/19 11:55:25 INFO Utils: Successfully started service 'SparkUI' on port 4040. 
> 15/10/19 11:55:25 INFO SparkUI: Started SparkUI at http://10.0.1.97:4040
> 15/10/19 11:55:25 INFO SparkContext: Added JAR file:/home/nix/repro/target/scala-2.10/repro-assembly-0.0.1.jar at http://10.0.1.97:47915/jars/repro-assembly-0.0.1.jar with timestamp 1445280925969
> 15/10/19 11:55:26 INFO Executor: Starting executor ID driver on host localhost
> 15/10/19 11:55:26 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 46569.
> 15/10/19 11:55:26 INFO NettyBlockTransferService: Server created on 46569
> 15/10/19 11:55:26 INFO BlockManagerMaster: Trying to register BlockManager
> 15/10/19 11:55:26 INFO BlockManagerMasterEndpoint: Registering block manager localhost:46569 with 265.4 MB RAM, BlockManagerId(driver, localhost, 46569)
> 15/10/19 11:55:26 INFO BlockManagerMaster: Registered BlockManager
> 15/10/19 11:55:27 INFO SparkContext: Starting job: collect at repro.scala:18
> 15/10/19 11:55:27 INFO DAGScheduler: Got job 0 (collect at repro.scala:18) with 1 output partitions (allowLocal=false)
> 15/10/19 11:55:27 INFO DAGScheduler: Final stage: ResultStage 0(collect at repro.scala:18)
> 15/10/19 11:55:27 INFO DAGScheduler: Parents of f
[jira] [Commented] (SPARK-11045) Contributing Receiver based Low Level Kafka Consumer from Spark-Packages to Apache Spark Project
[ https://issues.apache.org/jira/browse/SPARK-11045?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14952444#comment-14952444 ] Cody Koeninger commented on SPARK-11045: Direct stream partitions are 1:1 with kafka topicpartitions. Thus the ordering semantics are pretty much what you'd expect from Kafka. You have to proactively do something to change that, like a shuffle, or concurrent jobs, or speculative execution. As far as failure semantics go, clearly if you don't have exactly once you're going to either lose or repeat messages. > Contributing Receiver based Low Level Kafka Consumer from Spark-Packages to > Apache Spark Project > > > Key: SPARK-11045 > URL: https://issues.apache.org/jira/browse/SPARK-11045 > Project: Spark > Issue Type: New Feature > Components: Streaming >Reporter: Dibyendu Bhattacharya > > This JIRA is to track the progress of making the Receiver based Low Level > Kafka Consumer from spark-packages > (http://spark-packages.org/package/dibbhatt/kafka-spark-consumer) to be > contributed back to the Apache Spark Project. > This Kafka consumer has been around for more than a year and has matured over > time. There are many adoptions of this package, and I receive positive > feedback that this consumer gives better performance and fault tolerance > capabilities. > The primary intent of this JIRA is to give the community a better > alternative if they want to use the Receiver based model. > If this consumer makes it to Spark Core, it will definitely see more adoption > and support from the community, and help many who still prefer the Receiver > based model of Kafka Consumer. > I understand the Direct Stream is the consumer which can give Exactly Once > semantics and uses the Kafka Low Level API, which is good. But the Direct > Stream has concerns around recovering a checkpoint on driver code change. > Application developers need to manage their own offsets, which is complex. 
Even if someone > does manage their own offsets, it limits the parallelism Spark Streaming can > achieve. If someone wants more parallelism and wants > spark.streaming.concurrentJobs more than 1, you can no longer rely on > storing offsets externally, as you have no control over which batch will run > in which sequence. > Furthermore, the Direct Stream has higher latency, as it fetches messages > from Kafka during the RDD action. Also, the number of RDD partitions is > limited to the number of topic partitions. So unless your Kafka topic has > enough partitions, you have limited parallelism while processing the RDD. > Due to the above mentioned concerns, many people who do not want Exactly Once > semantics still prefer the Receiver based model. Unfortunately, when > customers fall back to the KafkaUtil.CreateStream approach, which uses the > Kafka High Level Consumer, there are other issues around the reliability of > the Kafka High Level API. The Kafka High Level API is buggy and has a serious > issue around Consumer Re-balance. Hence I do not think it is correct to > advise people to use KafkaUtil.CreateStream in production. > The better option presently is to use the Consumer from spark-packages. It > is using the Kafka Low Level Consumer API, stores offsets in Zookeeper, and > can recover from any failure. Below are a few highlights of this consumer: > 1. It has an inbuilt PID Controller for dynamic rate limiting. > 2. In this consumer, rate limiting is done by modifying the block size, by > controlling the size of messages pulled from Kafka. Whereas, in Spark, rate > limiting is done by controlling the number of messages. The issue with > throttling by number of messages is that if message size varies, block size > will also vary. Let's say your Kafka has messages of different sizes, from > 10KB to 500KB. Throttling by number of messages can never give any > deterministic size of your block, hence there is no guarantee that Memory > Back-Pressure can really take effect. > 3. 
This consumer is using the Kafka low level API, which gives better performance > than the KafkaUtils.createStream based High Level API. > 4. This consumer can give an end to end no-data-loss channel if enabled with WAL. > By accepting this low level kafka consumer from spark-packages into the > apache spark project, we will give the community a better option for Kafka > connectivity, both for the Receiver-less and Receiver based models.
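The 1:1 mapping Cody describes above can be sketched with a simplified stand-in for the direct stream's per-partition offset metadata. The field names loosely mirror Spark's OffsetRange class, but this is an illustration, not the Spark API:

```python
from collections import namedtuple

# One entry per kafka (topic, partition): a contiguous, ordered offset range.
OffsetRange = namedtuple("OffsetRange", "topic partition from_offset until_offset")

offset_ranges = [
    OffsetRange("events", 0, 100, 150),
    OffsetRange("events", 1, 200, 230),
]

# A direct-stream RDD has exactly one spark partition per offset range, so
# messages within a spark partition keep kafka's per-partition order.
spark_partition_count = len(offset_ranges)
messages_per_partition = [r.until_offset - r.from_offset for r in offset_ranges]
```

This is why ordering is only lost if you proactively change it (a shuffle, concurrent jobs, speculative execution): nothing in the batch itself interleaves two kafka partitions, or splits one.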
[jira] [Commented] (SPARK-11045) Contributing Receiver based Low Level Kafka Consumer from Spark-Packages to Apache Spark Project
[ https://issues.apache.org/jira/browse/SPARK-11045?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14952373#comment-14952373 ] Cody Koeninger commented on SPARK-11045: What you're saying doesn't address my point regarding parallelism. It's certainly possible to take advantage of Kafka ordering guarantees without sticking everything in one partition. E.g. I want events for a given customer to have a defined ordering, so I hash partition my topic by customer id. As a kafka user, I'd expect those messages to be consumed in the correct order, not some unpredictable order, unless I specifically ask for it by shuffling. Even in the case of 1 topicpartition, your suggestions regarding parallelism would result in broken semantics. As soon as you split a given kafka topicpartition into multiple spark partitions for a given batch, you allow for messages from a single topicpartition to be processed simultaneously, and thus out of order. Saying your approach gives "much better parallelism" because it's semantically broken by default isn't much of a feature claim.
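The customer-id example above — hash partitioning so that one customer's events keep a defined order — can be illustrated with a toy partitioner. The byte-sum-modulus scheme is an assumption for illustration; real Kafka producers use their own key hashing:

```python
NUM_PARTITIONS = 4

def partition_for(customer_id):
    # Toy stand-in for a producer's key hash: same key -> same partition.
    return sum(customer_id.encode()) % NUM_PARTITIONS

events = [("cust1", "login"), ("cust2", "view"), ("cust1", "purchase")]

# Group events by partition while preserving arrival order within each one:
# every cust1 event lands in the same partition, in its original order.
partitions = {}
for customer_id, action in events:
    partitions.setdefault(partition_for(customer_id), []).append((customer_id, action))
```

Splitting one of these partitions across multiple concurrent spark partitions is exactly what would break that per-customer ordering, which is Cody's objection.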
[jira] [Commented] (SPARK-11045) Contributing Receiver based Low Level Kafka Consumer from Spark-Packages to Apache Spark Project
[ https://issues.apache.org/jira/browse/SPARK-11045?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14952299#comment-14952299 ] Cody Koeninger commented on SPARK-11045: "in Receiver based model, the number of partitions is dictated by Block Intervals and Batch Interval. If your block interval is 200 Ms, and Batch interval is 10 seconds , your RDD will have 50 partitions !" Note that if your receiver actually works this way, then you've broken Kafka per-partition ordering guarantees, which is kind of a big problem. "Regarding the state of spark-packages code, your comment is not at good taste" Don't misunderstand me, I am not attacking you personally. A +1 on merging code into spark is basically saying "I'd be ok with maintaining this if it was added to spark". I would not be comfortable doing that (to the extent my opinion counts). That has nothing to do with which companies are using the code in production.
> Contributing Receiver based Low Level Kafka Consumer from Spark-Packages to Apache Spark Project
>
> Key: SPARK-11045
> URL: https://issues.apache.org/jira/browse/SPARK-11045
> Project: Spark
> Issue Type: New Feature
> Components: Streaming
> Reporter: Dibyendu Bhattacharya
>
> This JIRA is to track the progress of contributing the receiver-based low-level Kafka consumer from spark-packages (http://spark-packages.org/package/dibbhatt/kafka-spark-consumer) back to the Apache Spark project.
> This Kafka consumer has been around for more than a year and has matured over time. I see there are many adoptions of this package, and I receive positive feedback that this consumer gives better performance and fault-tolerance capabilities.
> The primary intent of this JIRA is to give the community a better alternative if they want to use the receiver-based model.
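The receiver-model partition count quoted in the comment above (200 ms blocks in a 10 s batch giving 50 partitions) is just a ratio of intervals; a trivial sketch:

```scala
// Receiver model: Spark cuts the received stream into one block per block
// interval, and each batch's RDD gets one partition per block, so:
//   partitions per RDD = batch interval / block interval
def receiverPartitions(batchIntervalMs: Long, blockIntervalMs: Long): Long =
  batchIntervalMs / blockIntervalMs
```

With the figures from the comment, `receiverPartitions(10000, 200)` gives the 50 partitions mentioned, independent of how many Kafka partitions the topic actually has, which is exactly why per-partition ordering is not preserved.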
[jira] [Commented] (SPARK-11045) Contributing Receiver based Low Level Kafka Consumer from Spark-Packages to Apache Spark Project
[ https://issues.apache.org/jira/browse/SPARK-11045?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14951877#comment-14951877 ] Cody Koeninger commented on SPARK-11045: The comments regarding parallelism are not accurate. Your read parallelism from Kafka is ultimately limited by the number of Kafka partitions, regardless of which consumer you use. Spark checkpoint recovery is a problem, again regardless of what consumer you use. Zookeeper as an offset store also has its own problems. At least the direct stream allows you to choose what works for you. It seems like the main substantive complaint is the rebalance behavior of the high level consumer. Frankly, given the state of the spark-packages code the last time I looked at it, I'd rather see effort spent on addressing problems with the existing receiver than on incorporating the spark-packages code as-is into Spark.
[jira] [Created] (SPARK-10963) Make KafkaCluster api public
Cody Koeninger created SPARK-10963: -- Summary: Make KafkaCluster api public Key: SPARK-10963 URL: https://issues.apache.org/jira/browse/SPARK-10963 Project: Spark Issue Type: Improvement Components: Streaming Reporter: Cody Koeninger Priority: Minor Per mailing list discussion, there's enough interest in people using KafkaCluster (e.g. to access latest offsets) to justify making it public. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-5569) Checkpoints cannot reference classes defined outside of Spark's assembly
[ https://issues.apache.org/jira/browse/SPARK-5569?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14945091#comment-14945091 ] Cody Koeninger commented on SPARK-5569: --- The gist I originally posted, linked at the top of this ticket, has a 3 line change that should reproduce the issue. > Checkpoints cannot reference classes defined outside of Spark's assembly > > > Key: SPARK-5569 > URL: https://issues.apache.org/jira/browse/SPARK-5569 > Project: Spark > Issue Type: Bug > Components: Streaming >Reporter: Patrick Wendell > > Not sure if this is a bug or a feature, but it's not obvious, so wanted to > create a JIRA to make sure we document this behavior. > First documented by Cody Koeninger: > https://gist.github.com/koeninger/561a61482cd1b5b3600c > {code} > 15/01/12 16:07:07 INFO CheckpointReader: Attempting to load checkpoint from > file file:/var/tmp/cp/checkpoint-142110041.bk > 15/01/12 16:07:07 WARN CheckpointReader: Error reading checkpoint from file > file:/var/tmp/cp/checkpoint-142110041.bk > java.io.IOException: java.lang.ClassNotFoundException: > org.apache.spark.rdd.kafka.KafkaRDDPartition > at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1043) > at > org.apache.spark.streaming.dstream.DStreamCheckpointData.readObject(DStreamCheckpointData.scala:146) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:606) > at > java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017) > at > java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) > at > 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) > at > java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) > at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1706) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1344) > at > java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) > at > java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) > at > java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) > at > java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:500) > at > org.apache.spark.streaming.DStreamGraph$$anonfun$readObject$1.apply$mcV$sp(DStreamGraph.scala:180) > at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1040) > at > org.apache.spark.streaming.DStreamGraph.readObject(DStreamGraph.scala:176) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:606) > at > java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017) > at > java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) > at > java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) > at > java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) > at > 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) > at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) > at > org.apache.spark.streaming.CheckpointReader$$anonfun$read$2.apply(Checkpoint.scala:251) > at > org.apache.spark.streaming.CheckpointReader$$anonfun$read$2.apply(Checkpoint.scala:239) > at > scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) > at > scala.collection.mutable.WrappedArray.fo
[jira] [Commented] (SPARK-9472) Consistent hadoop config for streaming
[ https://issues.apache.org/jira/browse/SPARK-9472?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14937483#comment-14937483 ] Cody Koeninger commented on SPARK-9472: --- I'd suggest patching your distribution then; I doubt there's going to be much interest in officially backporting it. > Consistent hadoop config for streaming > -- > > Key: SPARK-9472 > URL: https://issues.apache.org/jira/browse/SPARK-9472 > Project: Spark > Issue Type: Sub-task > Components: Streaming >Reporter: Cody Koeninger >Assignee: Cody Koeninger >Priority: Minor > Fix For: 1.5.0 > >
[jira] [Commented] (SPARK-9472) Consistent hadoop config for streaming
[ https://issues.apache.org/jira/browse/SPARK-9472?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14936866#comment-14936866 ] Cody Koeninger commented on SPARK-9472: --- You can get around this by passing the hadoop configuration you want as an argument to getOrCreate: StreamingContext.getOrCreate(somePath, () => someFunction, SparkHadoopUtil.get.newConfiguration(someSparkConf))
[jira] [Comment Edited] (SPARK-10732) Starting spark streaming from a specific point in time.
[ https://issues.apache.org/jira/browse/SPARK-10732?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14903231#comment-14903231 ] Cody Koeninger edited comment on SPARK-10732 at 9/22/15 7:02 PM: - Yeah, even if that gets implemented it will likely be at 1 minute granularity, which might be ok for some failure recovery situations but is unlikely to work for your SPARK-10734 ticket was (Author: c...@koeninger.org): Yeah, even if that gets implemented it will likely be at 1 minute granularity, which might be ok for some failure recovery situations but is unlikely to work for your SPARK-1074 ticket > Starting spark streaming from a specific point in time. > --- > > Key: SPARK-10732 > URL: https://issues.apache.org/jira/browse/SPARK-10732 > Project: Spark > Issue Type: Improvement > Components: Streaming >Affects Versions: 1.5.0 >Reporter: Bijay Singh Bisht > > Currently, spark streaming either starts from current time or from the latest > checkpoint. It would be extremely useful to start from any arbitrary point. > This would be useful in replay scenarios or in running regression tests. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-10732) Starting spark streaming from a specific point in time.
[ https://issues.apache.org/jira/browse/SPARK-10732?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14903110#comment-14903110 ] Cody Koeninger commented on SPARK-10732: As I already said, kafka's implementation of getOffsetsBefore is based on log file timestamps, not a granular index, so it really isn't accurate enough for this.
[jira] [Commented] (SPARK-10734) DirectKafkaInputDStream uses the OffsetRequest.LatestTime to find the latest offset, however using the batch time would be more desireable.
[ https://issues.apache.org/jira/browse/SPARK-10734?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14903106#comment-14903106 ] Cody Koeninger commented on SPARK-10734: as I explained in SPARK-10732 , kafka's getOffsetsBefore api is limited to the timestamps on log file segments, so its granularity is quite poor and doesn't really behave as one might expect. > DirectKafkaInputDStream uses the OffsetRequest.LatestTime to find the latest > offset, however using the batch time would be more desireable. > --- > > Key: SPARK-10734 > URL: https://issues.apache.org/jira/browse/SPARK-10734 > Project: Spark > Issue Type: Improvement > Components: Input/Output >Reporter: Bijay Singh Bisht > > DirectKafkaInputDStream uses the OffsetRequest.LatestTime to find the latest > offset, however since OffsetRequest.LatestTime is a relative thing, its > depends on when the batch is scheduled. One would imagine that given an input > data set the data in the batches should be predictable, irrespective of the > system conditions. Using the batch time implies that the stream processing > will have the same batches irrespective of whether when the processing was > started and the load conditions on the system. > This along with [SPARK-10732] provides for a nice regression scenarios. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
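The segment-level granularity Cody describes can be modeled in a few lines. This is a toy simulation of the old offset API's observable behavior, not Kafka's actual code, and `Segment` / `offsetBefore` are hypothetical names: because the broker only tracks a timestamp per log segment, a time query resolves to a segment boundary, never to an offset inside a segment.

```scala
// Toy model of Kafka's old time-to-offset lookup: each log segment exposes
// only (baseOffset, lastModifiedMs), so the answer is always a segment
// boundary, which may be arbitrarily far from the requested time.
case class Segment(baseOffset: Long, lastModifiedMs: Long)

// Largest base offset among segments last modified at or before targetMs,
// i.e. the coarse answer the old API could give.
def offsetBefore(segments: Seq[Segment], targetMs: Long): Option[Long] =
  segments.filter(_.lastModifiedMs <= targetMs).map(_.baseOffset).sorted.lastOption

val segments = Seq(
  Segment(0L, 1000L),      // covers offsets 0 to 99999
  Segment(100000L, 2000L), // covers offsets 100000 to 199999
  Segment(200000L, 3000L))

// A query for time 1500, which falls inside the second segment, can only
// come back as offset 0, the boundary of the previous segment; with large
// segments that can mean replaying hours of extra data.
```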
[jira] [Commented] (SPARK-5569) Checkpoints cannot reference classes defined outside of Spark's assembly
[ https://issues.apache.org/jira/browse/SPARK-5569?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14902692#comment-14902692 ] Cody Koeninger commented on SPARK-5569: --- I'm still not clear on what you're doing in "captureOffsetRanges(..)" that is leading to the exception you first mentioned. But regarding the question of whether transform / foreachRDD etc. runs on the driver or the executor, http://spark.apache.org/docs/latest/streaming-programming-guide.html#design-patterns-for-using-foreachrdd should make it pretty clear which portions run where.
[jira] [Commented] (SPARK-10732) Starting spark streaming from a specific point in time.
[ https://issues.apache.org/jira/browse/SPARK-10732?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14902648#comment-14902648 ] Cody Koeninger commented on SPARK-10732: It doesn't make sense to decouple this from the implementation. Not all stream implementations have the ability to replay starting at a given point. In order to actually do this for Kafka, you need an index from time to offset. Spark has no built in storage that would be appropriate for maintaining this index. Kafka has a time-based api, but it is based on log file timestamps, not a granular index, so is totally inaccurate for most use cases. For now, you'll need to keep the index yourself, and then provide the corresponding starting offsets as Sean mentioned.
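The "keep the index yourself" suggestion can be sketched as a floor lookup over a sorted time-to-offset map. Everything here is hypothetical scaffolding; how the index gets populated (e.g. recording the current offset each time a batch is committed) is up to the application.

```scala
import scala.collection.immutable.TreeMap

// Application-maintained index for one topic-partition: wall-clock time
// (ms) -> the Kafka offset that was current at that time.
// Floor lookup: the latest recorded offset at or before the requested
// replay time, falling back to the earliest known offset.
def startingOffset(index: TreeMap[Long, Long], replayFromMs: Long): Long = {
  val atOrBefore = index.takeWhile { case (ts, _) => ts <= replayFromMs }
  if (atOrBefore.nonEmpty) atOrBefore.last._2 else index.head._2
}
```

The resulting offset is what you would supply as the starting offset for that partition when creating the direct stream or RDD.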
[jira] [Commented] (SPARK-5569) Checkpoints cannot reference classes defined outside of Spark's assembly
[ https://issues.apache.org/jira/browse/SPARK-5569?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14901435#comment-14901435 ] Cody Koeninger commented on SPARK-5569: --- Yeah, spark-streaming can be marked as provided. Try to get a small reproducible case that demonstrates the issue.
[jira] [Commented] (SPARK-5569) Checkpoints cannot reference classes defined outside of Spark's assembly
[ https://issues.apache.org/jira/browse/SPARK-5569?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14901364#comment-14901364 ] Cody Koeninger commented on SPARK-5569: --- You can typically use sbt's merge strategy to deal with that kind of thing. Example: https://github.com/databricks/learning-spark/blob/master/build.sbt On Mon, Sep 21, 2015 at 3:23 PM, Jon Buffington (JIRA)
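The merge-strategy approach mentioned in the comment can be sketched as a build.sbt fragment. This is an assumption-laden illustration, not the linked learning-spark build verbatim: it assumes the sbt-assembly plugin is enabled, the setting key name varies across sbt-assembly versions, and the version numbers are placeholders.

```scala
// Illustrative build.sbt fragment (assumes sbt-assembly is enabled in
// project/plugins.sbt); versions below are placeholders, not recommendations.
libraryDependencies ++= Seq(
  // spark-streaming can be marked provided, since the cluster supplies it
  "org.apache.spark" %% "spark-streaming" % "1.5.0" % "provided",
  "org.apache.spark" %% "spark-streaming-kafka" % "1.5.0"
)

// Resolve duplicate-file conflicts when building the assembly jar;
// discarding all of META-INF is a blunt but common starting point.
assemblyMergeStrategy in assembly := {
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case _                             => MergeStrategy.first
}
```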
[jira] [Commented] (SPARK-5569) Checkpoints cannot reference classes defined outside of Spark's assembly
[ https://issues.apache.org/jira/browse/SPARK-5569?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14901106#comment-14901106 ] Cody Koeninger commented on SPARK-5569: --- The direct stream doesn't save offset ranges, it saves tuples and converts them to / from offset ranges. That workaround has been in place since the original release. You're probably running into something else; can you make a minimal reproducible code example?
[jira] [Commented] (SPARK-10320) Kafka Support new topic subscriptions without requiring restart of the streaming context
[ https://issues.apache.org/jira/browse/SPARK-10320?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14790843#comment-14790843 ] Cody Koeninger commented on SPARK-10320: I don't think there's much benefit to multiple dstreams with the direct api, because it's straightforward to filter or match on the topic on a per-partition basis. I'm not sure that adding entirely new dstreams after the streaming context has been started makes sense. As far as defaults go... I don't see a clearly reasonable default like messageHandler has. Maybe an example implementation of a function that maintains just a list of topic names and handles the offset lookups. The other thing is, in order to get much use out of this, the api for communicating with the kafka cluster would need to be made public, and there had been some reluctance on that point previously. [~tdas] Any thoughts on making the KafkaCluster api public? > Kafka Support new topic subscriptions without requiring restart of the > streaming context > > > Key: SPARK-10320 > URL: https://issues.apache.org/jira/browse/SPARK-10320 > Project: Spark > Issue Type: New Feature > Components: Streaming >Reporter: Sudarshan Kadambi > > Spark Streaming lacks the ability to subscribe to newer topics or unsubscribe > to current ones once the streaming context has been started. Restarting the > streaming context increases the latency of update handling. > Consider a streaming application subscribed to n topics. Let's say 1 of the > topics is no longer needed in streaming analytics and hence should be > dropped. We could do this by stopping the streaming context, removing that > topic from the topic list and restarting the streaming context. Since with > some DStreams such as DirectKafkaStream, the per-partition offsets are > maintained by Spark, we should be able to resume uninterrupted (I think?) > from where we left off with a minor delay. 
However, in instances where > expensive state initialization (from an external datastore) may be needed for > datasets published to all topics, before streaming updates can be applied to > it, it is more convenient to only subscribe or unsubscribe to the incremental > changes to the topic list. Without such a feature, updates go unprocessed for > longer than they need to be, thus affecting QoS. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
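The per-partition topic matching mentioned in the comment can be sketched in plain Scala. `OffsetRange` below is a simplified stand-in for Spark's `org.apache.spark.streaming.kafka.OffsetRange` (with the direct stream, RDD partition i corresponds to `offsetRanges(i)`), and the topic names are hypothetical:

```scala
// Stand-in for Spark's OffsetRange: with the direct stream, the range at index i
// describes exactly the Kafka data in RDD partition i, so the topic can be
// recovered per partition without needing separate dstreams.
case class OffsetRange(topic: String, partition: Int, fromOffset: Long, untilOffset: Long)

// Roughly what one would do inside rdd.mapPartitionsWithIndex { (i, iter) => ... }
// after capturing offsetRanges from rdd.asInstanceOf[HasOffsetRanges].
def processPartition(ranges: Array[OffsetRange], index: Int,
                     records: Iterator[String]): Iterator[String] =
  ranges(index).topic match {
    case "orders" => records.map(r => s"handled-order:$r") // hypothetical topic
    case other    => records.map(r => s"$other:$r")        // fallthrough handling
  }
```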
[jira] [Commented] (SPARK-10320) Kafka Support new topic subscriptions without requiring restart of the streaming context
[ https://issues.apache.org/jira/browse/SPARK-10320?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14726076#comment-14726076 ] Cody Koeninger commented on SPARK-10320: You would supply a function, similar to the way createDirectStream currently takes a messageHandler: MessageAndMetadata[K, V] => R. The type of that function would be (Time, Map[TopicAndPartition, Long], Map[TopicAndPartition, LeaderOffset]) => (Map[TopicAndPartition, Long], Map[TopicAndPartition, LeaderOffset]), in other words (time, fromOffsets, untilOffsets) => (fromOffsets, untilOffsets). Your function would be called in the compute() method of the dstream, after contacting the leaders and before making the rdd for the next batch. That would let you make arbitrary modifications to the topics / partitions / offsets. As far as the desire for a general solution, I think this is a kafka-specific concern. Not all streams have topics.
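The callback shape described above can be illustrated in plain Scala. The case classes are simplified stand-ins for `kafka.common.TopicAndPartition` and `KafkaCluster.LeaderOffset`, and `dropTopic` is a hypothetical example callback, not an API that exists in Spark:

```scala
// Simplified stand-ins for the real Kafka/Spark classes named in the comment.
case class TopicAndPartition(topic: String, partition: Int)
case class LeaderOffset(host: String, port: Int, offset: Long)

// The proposed callback: (time, fromOffsets, untilOffsets) => (fromOffsets, untilOffsets),
// with batch time simplified to a Long here.
type OffsetsCallback =
  (Long, Map[TopicAndPartition, Long], Map[TopicAndPartition, LeaderOffset]) =>
    (Map[TopicAndPartition, Long], Map[TopicAndPartition, LeaderOffset])

// Example: unsubscribe from one topic by filtering it out of both maps
// before the dstream builds the next batch's RDD.
def dropTopic(topic: String): OffsetsCallback =
  (time, fromOffsets, untilOffsets) =>
    (fromOffsets.filterNot { case (tp, _) => tp.topic == topic },
     untilOffsets.filterNot { case (tp, _) => tp.topic == topic })
```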
[jira] [Commented] (SPARK-10320) Kafka Support new topic subscriptions without requiring restart of the streaming context
[ https://issues.apache.org/jira/browse/SPARK-10320?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14725719#comment-14725719 ] Cody Koeninger commented on SPARK-10320: So if you're changing topics in an event handler, it's almost certainly not the same thread the spark context was started from... At any rate, do you think a callback along the lines of what I described would be sufficient for your use case?
[jira] [Commented] (SPARK-10320) Kafka Support new topic subscriptions without requiring restart of the streaming context
[ https://issues.apache.org/jira/browse/SPARK-10320?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14724113#comment-14724113 ] Cody Koeninger commented on SPARK-10320: It's possible this might be solvable with a user-supplied callback of the form (Map[TopicAndPartition, LeaderOffset]) => Map[TopicAndPartition, LeaderOffset], or maybe (Time, Map[TopicAndPartition, LeaderOffset]) => Map[TopicAndPartition, LeaderOffset], that got called in the compute method of DirectKafkaInputDStream. That would avoid threading issues, and also allow for more-or-less arbitrary modification of the topics and offsets, including some use cases that people have to subclass for currently. Actually, that just handles the ending offsets of the batch, so it'd need to be a pair of maps, one for the beginning and the other for the ending. But the basic idea remains.
[jira] [Updated] (SPARK-10320) Kafka Support new topic subscriptions without requiring restart of the streaming context
[ https://issues.apache.org/jira/browse/SPARK-10320?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cody Koeninger updated SPARK-10320: --- Summary: Kafka Support new topic subscriptions without requiring restart of the streaming context (was: Support new topic subscriptions without requiring restart of the streaming context)
[jira] [Commented] (SPARK-10320) Support new topic subscriptions without requiring restart of the streaming context
[ https://issues.apache.org/jira/browse/SPARK-10320?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14724074#comment-14724074 ] Cody Koeninger commented on SPARK-10320: "If you restart the job and specify a new offset, that is where consumption should start, in effect overriding any saved offsets." That's not the way checkpoints work. You're either restarting from a checkpoint, or you're not restarting from a checkpoint; the decision is up to you. If you want to specify a new offset, start the job clean. "The topic changes happen in the same thread of execution where the initial list of topics was provided before starting the streaming context." Can you say a little more about what you're actually doing here? How do you know when topics need to be modified? Typically streaming jobs just call ssc.awaitTermination in their main thread, which seems incompatible with what you're describing.
[jira] [Commented] (SPARK-10320) Support new topic subscriptions without requiring restart of the streaming context
[ https://issues.apache.org/jira/browse/SPARK-10320?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14717342#comment-14717342 ] Cody Koeninger commented on SPARK-10320: As I said on the list, the best way to deal with this currently is to start a new app with your new code before stopping the old app. In terms of a potential feature addition, I think there are a number of questions that would need to be cleared up, e.g.:
- when would you change topics? During a streaming listener onBatchCompleted handler? From a separate thread?
- when adding a topic, what would the expectations around starting offset be? As in the current api, provide explicit offsets per partition, start at beginning, or start at end?
- if you add partitions for topics that currently exist, and specify a starting offset that's different from where the job is currently, what would the expectation be?
- if you add, later remove, then later re-add a topic, what would the expectation regarding saved checkpoints be?
[jira] [Commented] (SPARK-6249) Get Kafka offsets from consumer group in ZK when using direct stream
[ https://issues.apache.org/jira/browse/SPARK-6249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14697188#comment-14697188 ] Cody Koeninger commented on SPARK-6249: --- If you want an api that has imprecise semantics and stores stuff in ZK, use the receiver based stream. This ticket has been closed for a while; I'd suggest further discussion would be better suited for the mailing list. > Get Kafka offsets from consumer group in ZK when using direct stream > > > Key: SPARK-6249 > URL: https://issues.apache.org/jira/browse/SPARK-6249 > Project: Spark > Issue Type: Improvement > Components: Streaming >Reporter: Tathagata Das > > This is the proposal. > The simpler direct API (the one that does not take explicit offsets) can be > modified to also pick up the initial offset from ZK if group.id is specified. > This is exactly similar to how we find the latest or earliest offset in that > API, just that instead of the latest/earliest offset of the topic we want to find > the offset from the consumer group. The group offsets in ZK are not used at > all for any further processing and restarting, so the exactly-once semantics > are not broken. > The use case where this is useful is simplified code upgrade. If the user > wants to upgrade the code, he/she can stop the context gracefully, which will > ensure the ZK consumer group offset will be updated with the last offsets > processed. Then the new code, started (not restarted from checkpoint), can > pick up the consumer group offset from ZK and continue where the previous > code had left off. > Without the functionality of picking up consumer group offsets to start (that > is, currently) the only way to do this is for the users to save the offsets > somewhere (file, database, etc.) and manage the offsets themselves. I just > want to simplify this process.
[jira] [Commented] (SPARK-9947) Separate Metadata and State Checkpoint Data
[ https://issues.apache.org/jira/browse/SPARK-9947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14697193#comment-14697193 ] Cody Koeninger commented on SPARK-9947: --- Didn't you already say that you were saving updateStateByKey state yourself? > Separate Metadata and State Checkpoint Data > --- > > Key: SPARK-9947 > URL: https://issues.apache.org/jira/browse/SPARK-9947 > Project: Spark > Issue Type: Improvement > Components: Streaming >Affects Versions: 1.4.1 >Reporter: Dan Dutrow > Original Estimate: 168h > Remaining Estimate: 168h > > Problem: When updating an application that has checkpointing enabled to > support the updateStateByKey and 24/7 operation functionality, you encounter > the problem where you might like to maintain state data between restarts but > delete the metadata containing execution state. > If checkpoint data exists between code redeployment, the program may not > execute properly or at all. My current workaround for this issue is to wrap > updateStateByKey with my own function that persists the state after every > update to my own separate directory. (That allows me to delete the checkpoint > with its metadata before redeploying) Then, when I restart the application, I > initialize the state with this persisted data. This incurs additional > overhead due to persisting of the same data twice: once in the checkpoint and > once in my persisted data folder. > If Kafka Direct API offsets could be stored in another separate checkpoint > directory, that would help address the problem of having to blow that away > between code redeployment as well.
[jira] [Commented] (SPARK-9947) Separate Metadata and State Checkpoint Data
[ https://issues.apache.org/jira/browse/SPARK-9947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14697166#comment-14697166 ] Cody Koeninger commented on SPARK-9947: --- You can't re-use checkpoint data across application upgrades anyway, so that honestly seems kind of pointless.
[jira] [Commented] (SPARK-9947) Separate Metadata and State Checkpoint Data
[ https://issues.apache.org/jira/browse/SPARK-9947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14697098#comment-14697098 ] Cody Koeninger commented on SPARK-9947: --- You already have access to offsets and can save them however you want. You can provide those offsets on restart, regardless of whether checkpointing was enabled. > Separate Metadata and State Checkpoint Data > --- > > Key: SPARK-9947 > URL: https://issues.apache.org/jira/browse/SPARK-9947 > Project: Spark > Issue Type: Improvement > Components: Streaming >Affects Versions: 1.4.1 >Reporter: Dan Dutrow > Original Estimate: 168h > Remaining Estimate: 168h > > Problem: When updating an application that has checkpointing enabled to > support the updateStateByKey and 24/7 operation functionality, you encounter > the problem where you might like to maintain state data between restarts but > delete the metadata containing execution state. > If checkpoint data exists between code redeployment, the program may not > execute properly or at all. My current workaround for this issue is to wrap > updateStateByKey with my own function that persists the state after every > update to my own separate directory. (That allows me to delete the checkpoint > with its metadata before redeploying) Then, when I restart the application, I > initialize the state with this persisted data. This incurs additional > overhead due to persisting of the same data twice: once in the checkpoint and > once in my persisted data folder. > If Kafka Direct API offsets could be stored in another separate checkpoint > directory, that would help address the problem of having to blow that away > between code redeployment as well. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
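The pattern Cody points at — grab the offset ranges from each batch, store them yourself, and hand them back as the starting offsets on restart — only needs a small amount of bookkeeping outside Spark. A minimal sketch of the "save them however you want" half, using a JSON file (the file format and helper names are my own, not part of any Spark or Kafka API; the stored value is the batch's untilOffset, i.e. the next offset to read):

```python
import json
import os
import tempfile

def save_offsets(path, offsets):
    """Atomically persist {(topic, partition): next_offset} to a JSON file.

    Writing to a temp file and renaming avoids leaving a half-written
    file behind if the driver dies mid-write (rename is atomic on POSIX).
    """
    # JSON keys must be strings, so encode (topic, partition) as "topic:partition".
    payload = {"%s:%d" % (t, p): o for (t, p), o in offsets.items()}
    fd, tmp = tempfile.mkstemp(dir=os.path.dirname(path) or ".")
    with os.fdopen(fd, "w") as f:
        json.dump(payload, f)
    os.rename(tmp, path)

def load_offsets(path):
    """Load offsets saved by save_offsets; empty dict if no file yet."""
    if not os.path.exists(path):
        return {}
    with open(path) as f:
        payload = json.load(f)
    out = {}
    for key, offset in payload.items():
        topic, _, partition = key.rpartition(":")
        out[(topic, int(partition))] = offset
    return out
```

The dict returned by load_offsets maps directly onto the per-partition starting offsets that the direct stream's explicit-offset constructor accepts, which is what makes restart-from-where-you-left-off possible without checkpointing.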
[jira] [Commented] (SPARK-6249) Get Kafka offsets from consumer group in ZK when using direct stream
[ https://issues.apache.org/jira/browse/SPARK-6249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14695939#comment-14695939 ] Cody Koeninger commented on SPARK-6249: --- Regarding streaming stats, those should be available in the current release > Get Kafka offsets from consumer group in ZK when using direct stream > > > Key: SPARK-6249 > URL: https://issues.apache.org/jira/browse/SPARK-6249 > Project: Spark > Issue Type: Improvement > Components: Streaming >Reporter: Tathagata Das > > This is the proposal. > The simpler direct API (the one that does not take explicit offsets) can be > modified to also pick up the initial offset from ZK if group.id is specified. > This is exactly similar to how we find the latest or earliest offset in that > API, just that instead of the latest/earliest offset of the topic we want to find > the offset from the consumer group. The group offsets in ZK are not used at > all for any further processing and restarting, so the exactly-once semantics > are not broken. > The use case where this is useful is simplified code upgrade. If the user > wants to upgrade the code, he/she can stop the context gracefully, which will > ensure the ZK consumer group offset will be updated with the last offsets > processed. Then the new code that is started (not restarted from checkpoint) can > pick up the consumer group offset from ZK and continue where the previous > code had left off. > Without the functionality of picking up consumer group offsets to start (that > is, currently), the only way to do this is for users to save the offsets > somewhere (file, database, etc.) and manage the offsets themselves. I just > want to simplify this process. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-6249) Get Kafka offsets from consumer group in ZK when using direct stream
[ https://issues.apache.org/jira/browse/SPARK-6249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14695929#comment-14695929 ] Cody Koeninger commented on SPARK-6249: --- https://github.com/koeninger/kafka-exactly-once If you've tried the code, read the blog post, and watched the presentation linked from that repo, and still have specific questions, feel free to ask. > Get Kafka offsets from consumer group in ZK when using direct stream > > > Key: SPARK-6249 > URL: https://issues.apache.org/jira/browse/SPARK-6249 > Project: Spark > Issue Type: Improvement > Components: Streaming >Reporter: Tathagata Das > > This is the proposal. > The simpler direct API (the one that does not take explicit offsets) can be > modified to also pick up the initial offset from ZK if group.id is specified. > This is exactly similar to how we find the latest or earliest offset in that > API, just that instead of the latest/earliest offset of the topic we want to find > the offset from the consumer group. The group offsets in ZK are not used at > all for any further processing and restarting, so the exactly-once semantics > are not broken. > The use case where this is useful is simplified code upgrade. If the user > wants to upgrade the code, he/she can stop the context gracefully, which will > ensure the ZK consumer group offset will be updated with the last offsets > processed. Then the new code that is started (not restarted from checkpoint) can > pick up the consumer group offset from ZK and continue where the previous > code had left off. > Without the functionality of picking up consumer group offsets to start (that > is, currently), the only way to do this is for users to save the offsets > somewhere (file, database, etc.) and manage the offsets themselves. I just > want to simplify this process. 
-- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-9780) In case of invalid initialization of KafkaDirectStream, NPE is thrown
[ https://issues.apache.org/jira/browse/SPARK-9780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14681886#comment-14681886 ] Cody Koeninger commented on SPARK-9780: --- Makes sense, traveling currently but I'll put in a PR > In case of invalid initialization of KafkaDirectStream, NPE is thrown > - > > Key: SPARK-9780 > URL: https://issues.apache.org/jira/browse/SPARK-9780 > Project: Spark > Issue Type: Bug > Components: Streaming >Affects Versions: 1.3.1, 1.4.1 >Reporter: Grigory Turunov >Priority: Minor > > [o.a.s.streaming.kafka.KafkaRDD.scala#L143|https://github.com/apache/spark/blob/master/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala#L143] > In the initialization of KafkaRDDIterator, there is an addition of a > TaskCompletionListener to the context, which calls close() on the consumer, > which is not initialized yet (and will be initialized 12 lines after that). > If something happens in these 12 lines (in my case there was a private > constructor for valueDecoder), the exception that is thrown triggers > context.markTaskCompleted() in > [o.a.s.scheduler.Task.scala#L90|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/Task.scala#L90] > which throws a NullPointerException when it tries to call close() on the > non-initialized consumer. > This masks the original exception, so it is very hard to understand what is > happening. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-9476) Kafka stream loses leader after 2h of operation
[ https://issues.apache.org/jira/browse/SPARK-9476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14679380#comment-14679380 ] Cody Koeninger commented on SPARK-9476: --- The stacktraces you posted don't contain any Spark code, they look like they're all from Kafka producer code. I assume it's from the part of the job where you're feeding results back to Kafka. I'd like to help on general principles, but without a stacktrace that involves Spark or a minimal reproducible example without your proprietary code, there's not much I can do. I'd suggest trying to catch the Kafka exception when producing and adding more logging to see what's going on. > Kafka stream loses leader after 2h of operation > > > Key: SPARK-9476 > URL: https://issues.apache.org/jira/browse/SPARK-9476 > Project: Spark > Issue Type: Bug > Components: Streaming >Affects Versions: 1.4.0 > Environment: Docker, Centos, Spark standalone, core i7, 8Gb >Reporter: Ruben Ramalho > > This seems to happen every 2h, it happens both with the direct stream and > regular stream, I'm doing window operations over a 1h period (if that can > help). 
> Here's part of the error message: > 2015-07-30 13:27:23 WARN ClientUtils$:89 - Fetching topic metadata with > correlation id 10 for topics [Set(updates)] from broker > [id:0,host:192.168.3.23,port:3000] failed > java.nio.channels.ClosedChannelException > at kafka.network.BlockingChannel.send(BlockingChannel.scala:100) > at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73) > at > kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72) > at kafka.producer.SyncProducer.send(SyncProducer.scala:113) > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58) > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93) > at > kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66) > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60) > 2015-07-30 13:27:23 INFO SyncProducer:68 - Disconnecting from > 192.168.3.23:3000 > 2015-07-30 13:27:23 WARN ConsumerFetcherManager$LeaderFinderThread:89 - > [spark-group_81563e123e9f-1438259236988-fc3d82bf-leader-finder-thread], > Failed to find leader for Set([updates,0]) > kafka.common.KafkaException: fetching topic metadata for topics > [Set(oversight-updates)] from broker > [ArrayBuffer(id:0,host:192.168.3.23,port:3000)] failed > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72) > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93) > at > kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66) > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60) > Caused by: java.nio.channels.ClosedChannelException > at kafka.network.BlockingChannel.send(BlockingChannel.scala:100) > at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73) > at > kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72) > at kafka.producer.SyncProducer.send(SyncProducer.scala:113) > at 
kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58) > After the crash I tried to communicate with kafka with a simple scala > consumer and producer and had no problem at all. Spark though needs a kafka > container restart to resume normal operation. There are no errors in the kafka > log, apart from an improperly closed connection. > I have been trying to solve this problem for days, and I suspect it has > something to do with spark. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-9476) Kafka stream loses leader after 2h of operation
[ https://issues.apache.org/jira/browse/SPARK-9476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14649123#comment-14649123 ] Cody Koeninger commented on SPARK-9476: --- Those all look like info or warn level log lines, are there error level lines? It'd be helpful to have the relevant code. Have you tried running a simpler job in the same environment to see if it has similar issues? Also, what version of Kafka are you using on the brokers? > Kafka stream loses leader after 2h of operation > > > Key: SPARK-9476 > URL: https://issues.apache.org/jira/browse/SPARK-9476 > Project: Spark > Issue Type: Bug > Components: Streaming >Affects Versions: 1.4.1 > Environment: Docker, Centos, Spark standalone, core i7, 8Gb >Reporter: Ruben Ramalho > > This seems to happen every 2h, it happens both with the direct stream and > regular stream, I'm doing window operations over a 1h period (if that can > help). > Here's part of the error message: > 2015-07-30 13:27:23 WARN ClientUtils$:89 - Fetching topic metadata with > correlation id 10 for topics [Set(updates)] from broker > [id:0,host:192.168.3.23,port:3000] failed > java.nio.channels.ClosedChannelException > at kafka.network.BlockingChannel.send(BlockingChannel.scala:100) > at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73) > at > kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72) > at kafka.producer.SyncProducer.send(SyncProducer.scala:113) > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58) > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93) > at > kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66) > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60) > 2015-07-30 13:27:23 INFO SyncProducer:68 - Disconnecting from > 192.168.3.23:3000 > 2015-07-30 13:27:23 WARN ConsumerFetcherManager$LeaderFinderThread:89 - > 
[spark-group_81563e123e9f-1438259236988-fc3d82bf-leader-finder-thread], > Failed to find leader for Set([updates,0]) > kafka.common.KafkaException: fetching topic metadata for topics > [Set(oversight-updates)] from broker > [ArrayBuffer(id:0,host:192.168.3.23,port:3000)] failed > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72) > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93) > at > kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66) > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60) > Caused by: java.nio.channels.ClosedChannelException > at kafka.network.BlockingChannel.send(BlockingChannel.scala:100) > at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73) > at > kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72) > at kafka.producer.SyncProducer.send(SyncProducer.scala:113) > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58) > After the crash I tried to communicate with kafka with a simple scala > consumer and producer and had no problem at all. Spark though needs a kafka > container restart to resume normal operation. There are no errors in the kafka > log, apart from an improperly closed connection. > I have been trying to solve this problem for days, and I suspect it has > something to do with spark. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-9475) Consistent hadoop config for external/*
Cody Koeninger created SPARK-9475: - Summary: Consistent hadoop config for external/* Key: SPARK-9475 URL: https://issues.apache.org/jira/browse/SPARK-9475 Project: Spark Issue Type: Sub-task Reporter: Cody Koeninger Priority: Minor -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-9473) Consistent hadoop config for SQL
Cody Koeninger created SPARK-9473: - Summary: Consistent hadoop config for SQL Key: SPARK-9473 URL: https://issues.apache.org/jira/browse/SPARK-9473 Project: Spark Issue Type: Sub-task Components: SQL Reporter: Cody Koeninger -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-9474) Consistent hadoop config for core
Cody Koeninger created SPARK-9474: - Summary: Consistent hadoop config for core Key: SPARK-9474 URL: https://issues.apache.org/jira/browse/SPARK-9474 Project: Spark Issue Type: Sub-task Components: Spark Core Reporter: Cody Koeninger -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-9472) Consistent hadoop config for streaming
Cody Koeninger created SPARK-9472: - Summary: Consistent hadoop config for streaming Key: SPARK-9472 URL: https://issues.apache.org/jira/browse/SPARK-9472 Project: Spark Issue Type: Sub-task Components: Streaming Reporter: Cody Koeninger Priority: Minor -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-9434) Need how-to for resuming direct Kafka streaming consumers where they had left off before getting terminated, OR actual support for that mode in the Streaming API
[ https://issues.apache.org/jira/browse/SPARK-9434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14646113#comment-14646113 ] Cody Koeninger commented on SPARK-9434: --- If you want to submit a PR with doc changes to add another link between the two, go for it. > Need how-to for resuming direct Kafka streaming consumers where they had left > off before getting terminated, OR actual support for that mode in the > Streaming API > - > > Key: SPARK-9434 > URL: https://issues.apache.org/jira/browse/SPARK-9434 > Project: Spark > Issue Type: Improvement > Components: Documentation, Examples, Streaming >Affects Versions: 1.4.1 >Reporter: Dmitry Goldenberg > > We've been getting some mixed information regarding how to cause our direct > streaming consumers to resume processing from where they left off in terms of > the Kafka offsets. > On the one hand side, we're hearing "If you are restarting the streaming app > with Direct kafka from the checkpoint information (that is, restarting), then > the last read offsets are automatically recovered, and the data will start > processing from that offset. All the N records added in T will stay buffered > in Kafka." (where T is the interval of time during which the consumer was > down). > On the other hand, there are tickets such as SPARK-6249 and SPARK-8833 which > are marked as "won't fix" which seem to ask for the functionality we need, > with comments like "I don't want to add more config options with confusing > semantics around what is being used for the system of record for offsets, I'd > rather make it easy for people to explicitly do what they need." > The use-case is actually very clear and doesn't ask for confusing semantics. > An API option to resume reading where you left off, in addition to the > smallest or greatest auto.offset.reset should be *very* useful, probably for > quite a few folks. > We're asking for this as an enhancement request. 
SPARK-8833 states " I am > waiting for getting enough usecase to float in before I take a final call." > We're adding to that. > In the meantime, can you clarify the confusion? Does direct streaming > persist the progress information into "DStream checkpoints" or does it not? > If it does, why is it that we're not seeing that happen? Our consumers start > with auto.offset.reset=greatest and that causes them to read from the first > offset of data that is written to Kafka *after* the consumer has been > restarted, meaning we're missing data that had come in while the consumer was > down. > If the progress is stored in "DStream checkpoints", we want to know a) how to > cause that to work for us and b) where the said checkpointing data is stored > physically. > Conversely, if this is not accurate, then is our only choice to manually > persist the offsets into Zookeeper? If that is the case then a) we'd like a > clear, more complete code sample to be published, since the one in the Kafka > streaming guide is incomplete (it lacks the actual lines of code persisting > the offsets) and b) we'd like to request that SPARK-8833 be revisited as a > feature worth implementing in the API. > Thanks. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-9434) Need how-to for resuming direct Kafka streaming consumers where they had left off before getting terminated, OR actual support for that mode in the Streaming API
[ https://issues.apache.org/jira/browse/SPARK-9434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14646043#comment-14646043 ] Cody Koeninger commented on SPARK-9434: --- Dmitry, the nabble link you posted indicates your message was never accepted by the mailing list. Looking through my email archives, there are only 3 discussions related to Kafka that you successfully sent to the mailing list, and all of them got answers. As sean said, I think that's a better forum for questions. To try and address what I think your question is... no, if you don't turn on checkpointing, you won't have any offsets saved in a checkpoint. Checkpointing isn't necessary for every job, so it is not turned on by default. The docs for KafkaUtils already explicitly say "To recover from driver failures, you have to enable checkpointing", with a link to the guide on how to do that. The closed tickets you're linking to relate to a desire for some kind of automated committing to, and/or recovery from, Zookeeper based offsets. As I stated in those tickets, I think the correct way to make this easier for people is just to expose an easier api for interacting with ZK. That already exists in the code, it's just private. > Need how-to for resuming direct Kafka streaming consumers where they had left > off before getting terminated, OR actual support for that mode in the > Streaming API > - > > Key: SPARK-9434 > URL: https://issues.apache.org/jira/browse/SPARK-9434 > Project: Spark > Issue Type: Improvement > Components: Documentation, Examples, Streaming >Affects Versions: 1.4.1 >Reporter: Dmitry Goldenberg > > We've been getting some mixed information regarding how to cause our direct > streaming consumers to resume processing from where they left off in terms of > the Kafka offsets. 
> On the one hand side, we're hearing "If you are restarting the streaming app > with Direct kafka from the checkpoint information (that is, restarting), then > the last read offsets are automatically recovered, and the data will start > processing from that offset. All the N records added in T will stay buffered > in Kafka." (where T is the interval of time during which the consumer was > down). > On the other hand, there are tickets such as SPARK-6249 and SPARK-8833 which > are marked as "won't fix" which seem to ask for the functionality we need, > with comments like "I don't want to add more config options with confusing > semantics around what is being used for the system of record for offsets, I'd > rather make it easy for people to explicitly do what they need." > The use-case is actually very clear and doesn't ask for confusing semantics. > An API option to resume reading where you left off, in addition to the > smallest or greatest auto.offset.reset should be *very* useful, probably for > quite a few folks. > We're asking for this as an enhancement request. SPARK-8833 states " I am > waiting for getting enough usecase to float in before I take a final call." > We're adding to that. > In the meantime, can you clarify the confusion? Does direct streaming > persist the progress information into "DStream checkpoints" or does it not? > If it does, why is it that we're not seeing that happen? Our consumers start > with auto.offset.reset=greatest and that causes them to read from the first > offset of data that is written to Kafka *after* the consumer has been > restarted, meaning we're missing data that had come in while the consumer was > down. > If the progress is stored in "DStream checkpoints", we want to know a) how to > cause that to work for us and b) where the said checkpointing data is stored > physically. > Conversely, if this is not accurate, then is our only choice to manually > persist the offsets into Zookeeper? 
If that is the case then a) we'd like a > clear, more complete code sample to be published, since the one in the Kafka > streaming guide is incomplete (it lacks the actual lines of code persisting > the offsets) and b) we'd like to request that SPARK-8833 be revisited as a > feature worth implementing in the API. > Thanks. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-9215) Implement WAL-free Kinesis receiver that give at-least once guarantee
[ https://issues.apache.org/jira/browse/SPARK-9215?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14635179#comment-14635179 ] Cody Koeninger commented on SPARK-9215: --- I left some comments in the doc. Is Chris Fregly looking at this as well? > Implement WAL-free Kinesis receiver that give at-least once guarantee > - > > Key: SPARK-9215 > URL: https://issues.apache.org/jira/browse/SPARK-9215 > Project: Spark > Issue Type: Improvement > Components: Streaming >Affects Versions: 1.4.1 >Reporter: Tathagata Das >Assignee: Tathagata Das > > https://docs.google.com/document/d/1k0dl270EnK7uExrsCE7jYw7PYx0YC935uBcxn3p0f58/edit?usp=sharing -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-9059) Update Direct Kafka Word count examples to show the use of HasOffsetRanges
[ https://issues.apache.org/jira/browse/SPARK-9059?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14632476#comment-14632476 ] Cody Koeninger commented on SPARK-9059: --- How is this different from SPARK-8390? I thought the idea there was to keep word count simple, and have separate examples for offsets. Restarting from specific offsets is a good idea, but requires storage. If we want to move the examples from my external repo https://github.com/koeninger/kafka-exactly-once into spark, it would probably require switching from postgres to an in-memory database. > Update Direct Kafka Word count examples to show the use of HasOffsetRanges > -- > > Key: SPARK-9059 > URL: https://issues.apache.org/jira/browse/SPARK-9059 > Project: Spark > Issue Type: Sub-task > Components: Streaming >Reporter: Tathagata Das > Labels: starter > > Update Scala, Java and Python examples of Direct Kafka word count to access > the offset ranges using HasOffsetRanges and print them. For example in Scala, > > {code} > var offsetRanges: Array[OffsetRange] = _ > ... > directKafkaDStream.foreachRDD { rdd => > offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges > } > ... > transformedDStream.foreachRDD { rdd => > // some operation > println("Processed ranges: " + offsetRanges) > } > {code} > See https://spark.apache.org/docs/latest/streaming-kafka-integration.html for > more info, and the master source code for more updated information on python. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
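The kafka-exactly-once repo's central trick is committing each batch's results and its offset ranges in the same database transaction, so a crash can never persist one without the other. Swapping its postgres backing store for an in-memory database, as suggested above, could look roughly like this sketch (sqlite stands in for the database; the table and column names and the commit_batch helper are illustrative, not code from that repo):

```python
import sqlite3

# One connection; one transaction per batch.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE wordcount (word TEXT PRIMARY KEY, count INTEGER)")
conn.execute(
    "CREATE TABLE offsets (topic TEXT, part INTEGER, until_offset INTEGER, "
    "PRIMARY KEY (topic, part))"
)

def commit_batch(counts, offset_ranges):
    """counts: {word: n} for this batch; offset_ranges: [(topic, partition, until_offset)].

    The `with conn:` block opens a transaction, commits on success, and
    rolls back on error, so results and offsets always move together.
    """
    with conn:
        for word, n in counts.items():
            row = conn.execute(
                "SELECT count FROM wordcount WHERE word = ?", (word,)
            ).fetchone()
            if row:
                conn.execute(
                    "UPDATE wordcount SET count = ? WHERE word = ?",
                    (row[0] + n, word),
                )
            else:
                conn.execute("INSERT INTO wordcount VALUES (?, ?)", (word, n))
        for topic, partition, until in offset_ranges:
            conn.execute(
                "INSERT OR REPLACE INTO offsets VALUES (?, ?, ?)",
                (topic, partition, until),
            )
```

On restart, the driver would read the offsets table to build the starting offsets for the direct stream; replaying a batch that already committed is then a no-op at the transaction level rather than a double count.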
[jira] [Commented] (SPARK-8833) Kafka Direct API support offset in zookeeper
[ https://issues.apache.org/jira/browse/SPARK-8833?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14615056#comment-14615056 ] Cody Koeninger commented on SPARK-8833: --- this basic idea has been discussed before and rejected: https://issues.apache.org/jira/browse/SPARK-6249 > Kafka Direct API support offset in zookeeper > > > Key: SPARK-8833 > URL: https://issues.apache.org/jira/browse/SPARK-8833 > Project: Spark > Issue Type: Improvement > Components: Streaming >Affects Versions: 1.4.0 >Reporter: guowei >Priority: Minor > > Kafka Direct API only support consume the topic from latest or earliest. > but user usually need to consume message from last offset when restart > stream app . -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-8389) Expose KafkaRDDs offsetRange in Java and Python
[ https://issues.apache.org/jira/browse/SPARK-8389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14594196#comment-14594196 ] Cody Koeninger commented on SPARK-8389: --- Have you guys been looking at SPARK-8337? The idea in there is to provide a method that returns a stream of python dicts with all of the (currently available) fields in MessageAndMetadata > Expose KafkaRDDs offsetRange in Java and Python > --- > > Key: SPARK-8389 > URL: https://issues.apache.org/jira/browse/SPARK-8389 > Project: Spark > Issue Type: New Feature > Components: Streaming >Affects Versions: 1.4.0 >Reporter: Tathagata Das >Assignee: Cody Koeninger >Priority: Critical > > Probably requires creating a JavaKafkaPairRDD and also use that in the python > APIs -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-8389) Expose KafkaRDDs offsetRange in Java and Python
[ https://issues.apache.org/jira/browse/SPARK-8389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14591170#comment-14591170 ] Cody Koeninger commented on SPARK-8389: --- Static type doesn't really matter since it's typecast anyway, as long as you can follow the chain of wrapped object back to the original. The java api does a lot of wrapping too, but you can still unwrap it with rdd.rdd() as I mentioned. I'm not clear on why something comparable isn't possible for the python api. > Expose KafkaRDDs offsetRange in Java and Python > --- > > Key: SPARK-8389 > URL: https://issues.apache.org/jira/browse/SPARK-8389 > Project: Spark > Issue Type: New Feature > Components: Streaming >Affects Versions: 1.4.0 >Reporter: Tathagata Das >Assignee: Cody Koeninger >Priority: Critical > > Probably requires creating a JavaKafkaPairRDD and also use that in the python > APIs -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-8390) Update DirectKafkaWordCount examples to show how offset ranges can be used
[ https://issues.apache.org/jira/browse/SPARK-8390?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14590720#comment-14590720 ] Cody Koeninger commented on SPARK-8390: --- Did we actually want to update the wordcount examples (might confuse, since you don't need offset ranges for minimal wordcount usage)... or just fix the part of the docs about the offset ranges? PR is just fixing the docs for now. I'd personally prefer to link to the talk / slides about direct stream once it's available... not sure how you feel about external links in the doc. > Update DirectKafkaWordCount examples to show how offset ranges can be used > -- > > Key: SPARK-8390 > URL: https://issues.apache.org/jira/browse/SPARK-8390 > Project: Spark > Issue Type: Improvement > Components: Streaming >Affects Versions: 1.4.0 >Reporter: Tathagata Das >Assignee: Cody Koeninger > -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-8337) KafkaUtils.createDirectStream for python is lacking API/feature parity with the Scala/Java version
[ https://issues.apache.org/jira/browse/SPARK-8337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14588517#comment-14588517 ] Cody Koeninger commented on SPARK-8337: --- So one thing to keep in mind is that if the Kafka project ends up adding more fields to MessageAndMetadata, the Scala interface is going to continue to give users access to those fields, without changing anything other than the Kafka version. If you go with the approach of building a Python dict, someone's going to have to remember to go manually change the code to give access to the new fields. I don't have enough Python knowledge to comment on whether the approach of passing a messageHandler function is feasible... I can try to get up to speed on it. It may be worth trying to get the attention of Davies Liu after the Spark conference hubbub has died down. > KafkaUtils.createDirectStream for python is lacking API/feature parity with > the Scala/Java version > -- > > Key: SPARK-8337 > URL: https://issues.apache.org/jira/browse/SPARK-8337 > Project: Spark > Issue Type: Bug > Components: PySpark >Affects Versions: 1.4.0 >Reporter: Amit Ramesh >Priority: Critical > > See the following thread for context. > http://apache-spark-developers-list.1001551.n3.nabble.com/Re-Spark-1-4-Python-API-for-getting-Kafka-offsets-in-direct-mode-tt12714.html -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-8389) Expose KafkaRDDs offsetRange in Java and Python
[ https://issues.apache.org/jira/browse/SPARK-8389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14588404#comment-14588404 ] Cody Koeninger commented on SPARK-8389: --- So on the java side, just so I'm clear, are we talking about the difference between people writing OffsetRange[] offsets = ((HasOffsetRanges)rdd.rdd()).offsetRanges(); which, as far as I can tell, they can do currently (see attached PR with test change) versus OffsetRange[] offsets = ((HasOffsetRanges)rdd).offsetRanges(); I can see how the second is definitely a nicer api... but I don't know that it's a critical bugfix, and I also don't know that it's worth introducing additional JavaKafkaRDD and JavaDirectKafkaInputDStream wrappers. The typecast is kind of an ugly hack to begin with, there's only so much we can do to make it nicer... short of higher kinded return type parameters for rdd methods in Spark 2.0 :) > Expose KafkaRDDs offsetRange in Java and Python > --- > > Key: SPARK-8389 > URL: https://issues.apache.org/jira/browse/SPARK-8389 > Project: Spark > Issue Type: Bug > Components: Streaming >Affects Versions: 1.4.0 >Reporter: Tathagata Das >Assignee: Cody Koeninger >Priority: Critical > > Probably requires creating a JavaKafkaPairRDD and also use that in the python > APIs -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-7122) KafkaUtils.createDirectStream - unreasonable processing time in absence of load
[ https://issues.apache.org/jira/browse/SPARK-7122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14588211#comment-14588211 ] Cody Koeninger commented on SPARK-7122: --- It's certainly your prerogative to wait for an official release. However, keep in mind that the patch in question is just a performance optimization, not necessarily a bug fix targeted at whatever your issue is. Without a minimal reproducible case of your problem, or testing patches against your workload, there's no way of knowing if the performance optimization solves your problem. If it doesn't, you're looking at waiting for yet another release after 1.4.1. > KafkaUtils.createDirectStream - unreasonable processing time in absence of > load > --- > > Key: SPARK-7122 > URL: https://issues.apache.org/jira/browse/SPARK-7122 > Project: Spark > Issue Type: Question > Components: Streaming >Affects Versions: 1.3.1 > Environment: Spark Streaming 1.3.1, standalone mode running on just 1 > box: Ubuntu 14.04.2 LTS, 4 cores, 8GB RAM, java version "1.8.0_40" >Reporter: Platon Potapov >Priority: Minor > Attachments: 10.second.window.fast.job.txt, > 5.second.window.slow.job.txt, SparkStreamingJob.scala > > > attached is the complete source code of a test spark job. no external data > generators are run - just the presence of a kafka topic named "raw" suffices. > the spark job is run with no load whatsoever. http://localhost:4040/streaming > is checked to obtain job processing duration. 
> * in case the test contains the following transformation: > {code} > // dummy transformation > val temperature = bytes.filter(_._1 == "abc") > val abc = temperature.window(Seconds(40), Seconds(5)) > abc.print() > {code} > the median processing time is 3 seconds 80 ms > * in case the test contains the following transformation: > {code} > // dummy transformation > val temperature = bytes.filter(_._1 == "abc") > val abc = temperature.map(x => (1, x)) > abc.print() > {code} > the median processing time is just 50 ms > please explain why does the "window" transformation introduce such a growth > of job duration? > note: the result is the same regardless of the number of kafka topic > partitions (I've tried 1 and 8) > note2: the result is the same regardless of the window parameters (I've tried > (20, 2) and (40, 5)) -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-8389) Expose KafkaRDDs offsetRange in Java and Python
[ https://issues.apache.org/jira/browse/SPARK-8389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14588202#comment-14588202 ] Cody Koeninger commented on SPARK-8389: --- There's already a ticket for the Python side of things, SPARK-8337. Not sure if you want to combine them. I'll look at the java side of things to start. > Expose KafkaRDDs offsetRange in Java and Python > --- > > Key: SPARK-8389 > URL: https://issues.apache.org/jira/browse/SPARK-8389 > Project: Spark > Issue Type: Bug > Components: Streaming >Affects Versions: 1.4.0 >Reporter: Tathagata Das >Assignee: Cody Koeninger >Priority: Critical > > Probably requires creating a JavaKafkaPairRDD and also use that in the python > APIs -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-8127) KafkaRDD optimize count() take() isEmpty()
Cody Koeninger created SPARK-8127: - Summary: KafkaRDD optimize count() take() isEmpty() Key: SPARK-8127 URL: https://issues.apache.org/jira/browse/SPARK-8127 Project: Spark Issue Type: Improvement Components: Streaming Reporter: Cody Koeninger Priority: Minor KafkaRDD can use its offset range to avoid doing extra work. Possibly related to SPARK-7122 -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
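The proposed optimization can be sketched like this, as a hypothetical override inside KafkaRDD (names follow the OffsetRange fields quoted elsewhere in this thread; this is an illustration of the idea, not the merged patch):

```scala
// Sketch: each KafkaRDD partition is defined by a fixed
// [fromOffset, untilOffset) range, so the message count is known up front
// and no job reading from Kafka needs to be scheduled at all.
override def count(): Long =
  offsetRanges.map(r => r.untilOffset - r.fromOffset).sum
```

take() and isEmpty() can be built on the same observation, since the ranges tell you exactly how many messages each partition holds before any work is done.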
[jira] [Commented] (SPARK-7122) KafkaUtils.createDirectStream - unreasonable processing time in absence of load
[ https://issues.apache.org/jira/browse/SPARK-7122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14572059#comment-14572059 ] Cody Koeninger commented on SPARK-7122: --- If you can try out this patch https://github.com/apache/spark/pull/6632 please do so > KafkaUtils.createDirectStream - unreasonable processing time in absence of > load > --- > > Key: SPARK-7122 > URL: https://issues.apache.org/jira/browse/SPARK-7122 > Project: Spark > Issue Type: Question > Components: Streaming >Affects Versions: 1.3.1 > Environment: Spark Streaming 1.3.1, standalone mode running on just 1 > box: Ubuntu 14.04.2 LTS, 4 cores, 8GB RAM, java version "1.8.0_40" >Reporter: Platon Potapov >Priority: Minor > Attachments: 10.second.window.fast.job.txt, > 5.second.window.slow.job.txt, SparkStreamingJob.scala > > > attached is the complete source code of a test spark job. no external data > generators are run - just the presence of a kafka topic named "raw" suffices. > the spark job is run with no load whatsoever. http://localhost:4040/streaming > is checked to obtain job processing duration. > * in case the test contains the following transformation: > {code} > // dummy transformation > val temperature = bytes.filter(_._1 == "abc") > val abc = temperature.window(Seconds(40), Seconds(5)) > abc.print() > {code} > the median processing time is 3 seconds 80 ms > * in case the test contains the following transformation: > {code} > // dummy transformation > val temperature = bytes.filter(_._1 == "abc") > val abc = temperature.map(x => (1, x)) > abc.print() > {code} > the median processing time is just 50 ms > please explain why does the "window" transformation introduce such a growth > of job duration? 
> note: the result is the same regardless of the number of kafka topic > partitions (I've tried 1 and 8) > note2: the result is the same regardless of the window parameters (I've tried > (20, 2) and (40, 5)) -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-7122) KafkaUtils.createDirectStream - unreasonable processing time in absence of load
[ https://issues.apache.org/jira/browse/SPARK-7122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14571675#comment-14571675 ] Cody Koeninger commented on SPARK-7122: --- Try doing something very straightforward inside of foreachRDD (println, increment a counter, whatever), instead of the current code. You're calling isEmpty, which implies a take(1). Then saveJsonToEs is internally calling take(1) again at least once. Take is going to successively schedule jobs, quadrupling the number of partitions used every time. It's possible that this starts producing pathological behavior on RDDs that are empty or nearly empty yet still have 400 partitions. I could pretty straightforwardly override the definition of isEmpty in KafkaRDD to just look at the offset ranges instead of doing any work. Take would be a little trickier but still doable. But let's figure out if that's the issue first. > KafkaUtils.createDirectStream - unreasonable processing time in absence of > load > --- > > Key: SPARK-7122 > URL: https://issues.apache.org/jira/browse/SPARK-7122 > Project: Spark > Issue Type: Question > Components: Streaming >Affects Versions: 1.3.1 > Environment: Spark Streaming 1.3.1, standalone mode running on just 1 > box: Ubuntu 14.04.2 LTS, 4 cores, 8GB RAM, java version "1.8.0_40" >Reporter: Platon Potapov >Priority: Minor > Attachments: 10.second.window.fast.job.txt, > 5.second.window.slow.job.txt, SparkStreamingJob.scala > > > attached is the complete source code of a test spark job. no external data > generators are run - just the presence of a kafka topic named "raw" suffices. > the spark job is run with no load whatsoever. http://localhost:4040/streaming > is checked to obtain job processing duration. 
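Overriding isEmpty as described would amount to something like the following inside KafkaRDD (a sketch of the idea, not the actual patch):

```scala
// Sketch: an RDD whose offset ranges span zero messages is empty, so
// isEmpty can answer from metadata instead of scheduling a take(1) job
// across potentially hundreds of near-empty partitions.
override def isEmpty(): Boolean =
  offsetRanges.forall(r => r.untilOffset == r.fromOffset)
```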
> * in case the test contains the following transformation: > {code} > // dummy transformation > val temperature = bytes.filter(_._1 == "abc") > val abc = temperature.window(Seconds(40), Seconds(5)) > abc.print() > {code} > the median processing time is 3 seconds 80 ms > * in case the test contains the following transformation: > {code} > // dummy transformation > val temperature = bytes.filter(_._1 == "abc") > val abc = temperature.map(x => (1, x)) > abc.print() > {code} > the median processing time is just 50 ms > please explain why does the "window" transformation introduce such a growth > of job duration? > note: the result is the same regardless of the number of kafka topic > partitions (I've tried 1 and 8) > note2: the result is the same regardless of the window parameters (I've tried > (20, 2) and (40, 5)) -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-7122) KafkaUtils.createDirectStream - unreasonable processing time in absence of load
[ https://issues.apache.org/jira/browse/SPARK-7122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14571454#comment-14571454 ] Cody Koeninger commented on SPARK-7122: --- Nicolas, do you have an actual compilable project you can share a link to? I'm not sure what EsSpark.saveJsonToEs is doing, for instance. Also keep in mind that for the KafkaUtils.createStream case you're using a storage level of MEMORY_AND_DISK, whereas the directStream isn't going to do any caching by default - it's not a receiver. > KafkaUtils.createDirectStream - unreasonable processing time in absence of > load > --- > > Key: SPARK-7122 > URL: https://issues.apache.org/jira/browse/SPARK-7122 > Project: Spark > Issue Type: Question > Components: Streaming >Affects Versions: 1.3.1 > Environment: Spark Streaming 1.3.1, standalone mode running on just 1 > box: Ubuntu 14.04.2 LTS, 4 cores, 8GB RAM, java version "1.8.0_40" >Reporter: Platon Potapov >Priority: Minor > Attachments: 10.second.window.fast.job.txt, > 5.second.window.slow.job.txt, SparkStreamingJob.scala > > > attached is the complete source code of a test spark job. no external data > generators are run - just the presence of a kafka topic named "raw" suffices. > the spark job is run with no load whatsoever. http://localhost:4040/streaming > is checked to obtain job processing duration. > * in case the test contains the following transformation: > {code} > // dummy transformation > val temperature = bytes.filter(_._1 == "abc") > val abc = temperature.window(Seconds(40), Seconds(5)) > abc.print() > {code} > the median processing time is 3 seconds 80 ms > * in case the test contains the following transformation: > {code} > // dummy transformation > val temperature = bytes.filter(_._1 == "abc") > val abc = temperature.map(x => (1, x)) > abc.print() > {code} > the median processing time is just 50 ms > please explain why does the "window" transformation introduce such a growth > of job duration? 
> note: the result is the same regardless of the number of kafka topic > partitions (I've tried 1 and 8) > note2: the result is the same regardless of the window parameters (I've tried > (20, 2) and (40, 5)) -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-7122) KafkaUtils.createDirectStream - unreasonable processing time in absence of load
[ https://issues.apache.org/jira/browse/SPARK-7122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14569254#comment-14569254 ] Cody Koeninger commented on SPARK-7122: --- Am I reading this correctly that you're only taking the first 11 messages per rdd? If that's forcing the whole rdd in the direct stream case but not the receiver-based stream case, I can see why there'd be a difference. Other thing to investigate would be removing checkpointing, but I doubt that's the issue. > KafkaUtils.createDirectStream - unreasonable processing time in absence of > load > --- > > Key: SPARK-7122 > URL: https://issues.apache.org/jira/browse/SPARK-7122 > Project: Spark > Issue Type: Question > Components: Streaming >Affects Versions: 1.3.1 > Environment: Spark Streaming 1.3.1, standalone mode running on just 1 > box: Ubuntu 14.04.2 LTS, 4 cores, 8GB RAM, java version "1.8.0_40" >Reporter: Platon Potapov >Priority: Minor > Attachments: 10.second.window.fast.job.txt, > 5.second.window.slow.job.txt, SparkStreamingJob.scala > > > attached is the complete source code of a test spark job. no external data > generators are run - just the presence of a kafka topic named "raw" suffices. > the spark job is run with no load whatsoever. http://localhost:4040/streaming > is checked to obtain job processing duration. 
> * in case the test contains the following transformation: > {code} > // dummy transformation > val temperature = bytes.filter(_._1 == "abc") > val abc = temperature.window(Seconds(40), Seconds(5)) > abc.print() > {code} > the median processing time is 3 seconds 80 ms > * in case the test contains the following transformation: > {code} > // dummy transformation > val temperature = bytes.filter(_._1 == "abc") > val abc = temperature.map(x => (1, x)) > abc.print() > {code} > the median processing time is just 50 ms > please explain why does the "window" transformation introduce such a growth > of job duration? > note: the result is the same regardless of the number of kafka topic > partitions (I've tried 1 and 8) > note2: the result is the same regardless of the window parameters (I've tried > (20, 2) and (40, 5)) -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-7122) KafkaUtils.createDirectStream - unreasonable processing time in absence of load
[ https://issues.apache.org/jira/browse/SPARK-7122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14569125#comment-14569125 ] Cody Koeninger commented on SPARK-7122: --- That number of tasks sounds normal for that many partitions, since the direct stream partitions are 1:1 with kafka partitions. Task overhead might explain a little bit of the difference, but I doubt it's 6 seconds worth. I'm not sure there's much else I can say without being able to compare the code for the two. > KafkaUtils.createDirectStream - unreasonable processing time in absence of > load > --- > > Key: SPARK-7122 > URL: https://issues.apache.org/jira/browse/SPARK-7122 > Project: Spark > Issue Type: Question > Components: Streaming >Affects Versions: 1.3.1 > Environment: Spark Streaming 1.3.1, standalone mode running on just 1 > box: Ubuntu 14.04.2 LTS, 4 cores, 8GB RAM, java version "1.8.0_40" >Reporter: Platon Potapov >Priority: Minor > Attachments: 10.second.window.fast.job.txt, > 5.second.window.slow.job.txt, SparkStreamingJob.scala > > > attached is the complete source code of a test spark job. no external data > generators are run - just the presence of a kafka topic named "raw" suffices. > the spark job is run with no load whatsoever. http://localhost:4040/streaming > is checked to obtain job processing duration. 
> * in case the test contains the following transformation: > {code} > // dummy transformation > val temperature = bytes.filter(_._1 == "abc") > val abc = temperature.window(Seconds(40), Seconds(5)) > abc.print() > {code} > the median processing time is 3 seconds 80 ms > * in case the test contains the following transformation: > {code} > // dummy transformation > val temperature = bytes.filter(_._1 == "abc") > val abc = temperature.map(x => (1, x)) > abc.print() > {code} > the median processing time is just 50 ms > please explain why does the "window" transformation introduce such a growth > of job duration? > note: the result is the same regardless of the number of kafka topic > partitions (I've tried 1 and 8) > note2: the result is the same regardless of the window parameters (I've tried > (20, 2) and (40, 5)) -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-7122) KafkaUtils.createDirectStream - unreasonable processing time in absence of load
[ https://issues.apache.org/jira/browse/SPARK-7122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14567917#comment-14567917 ] Cody Koeninger commented on SPARK-7122: --- So if you're setting up a 2 second batch stream, how long do batches take to complete? > KafkaUtils.createDirectStream - unreasonable processing time in absence of > load > --- > > Key: SPARK-7122 > URL: https://issues.apache.org/jira/browse/SPARK-7122 > Project: Spark > Issue Type: Question > Components: Streaming >Affects Versions: 1.3.1 > Environment: Spark Streaming 1.3.1, standalone mode running on just 1 > box: Ubuntu 14.04.2 LTS, 4 cores, 8GB RAM, java version "1.8.0_40" >Reporter: Platon Potapov >Priority: Minor > Attachments: 10.second.window.fast.job.txt, > 5.second.window.slow.job.txt, SparkStreamingJob.scala > > > attached is the complete source code of a test spark job. no external data > generators are run - just the presence of a kafka topic named "raw" suffices. > the spark job is run with no load whatsoever. http://localhost:4040/streaming > is checked to obtain job processing duration. > * in case the test contains the following transformation: > {code} > // dummy transformation > val temperature = bytes.filter(_._1 == "abc") > val abc = temperature.window(Seconds(40), Seconds(5)) > abc.print() > {code} > the median processing time is 3 seconds 80 ms > * in case the test contains the following transformation: > {code} > // dummy transformation > val temperature = bytes.filter(_._1 == "abc") > val abc = temperature.map(x => (1, x)) > abc.print() > {code} > the median processing time is just 50 ms > please explain why does the "window" transformation introduce such a growth > of job duration? 
> note: the result is the same regardless of the number of kafka topic > partitions (I've tried 1 and 8) > note2: the result is the same regardless of the window parameters (I've tried > (20, 2) and (40, 5)) -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-7122) KafkaUtils.createDirectStream - unreasonable processing time in absence of load
[ https://issues.apache.org/jira/browse/SPARK-7122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14567865#comment-14567865 ] Cody Koeninger commented on SPARK-7122: --- When you say it's getting behind, how are you measuring that? > KafkaUtils.createDirectStream - unreasonable processing time in absence of > load > --- > > Key: SPARK-7122 > URL: https://issues.apache.org/jira/browse/SPARK-7122 > Project: Spark > Issue Type: Question > Components: Streaming >Affects Versions: 1.3.1 > Environment: Spark Streaming 1.3.1, standalone mode running on just 1 > box: Ubuntu 14.04.2 LTS, 4 cores, 8GB RAM, java version "1.8.0_40" >Reporter: Platon Potapov >Priority: Minor > Attachments: 10.second.window.fast.job.txt, > 5.second.window.slow.job.txt, SparkStreamingJob.scala > > > attached is the complete source code of a test spark job. no external data > generators are run - just the presence of a kafka topic named "raw" suffices. > the spark job is run with no load whatsoever. http://localhost:4040/streaming > is checked to obtain job processing duration. > * in case the test contains the following transformation: > {code} > // dummy transformation > val temperature = bytes.filter(_._1 == "abc") > val abc = temperature.window(Seconds(40), Seconds(5)) > abc.print() > {code} > the median processing time is 3 seconds 80 ms > * in case the test contains the following transformation: > {code} > // dummy transformation > val temperature = bytes.filter(_._1 == "abc") > val abc = temperature.map(x => (1, x)) > abc.print() > {code} > the median processing time is just 50 ms > please explain why does the "window" transformation introduce such a growth > of job duration? 
> note: the result is the same regardless of the number of kafka topic > partitions (I've tried 1 and 8) > note2: the result is the same regardless of the window parameters (I've tried > (20, 2) and (40, 5)) -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-7827) kafka streaming NotLeaderForPartition Exception could not be handled normally
[ https://issues.apache.org/jira/browse/SPARK-7827?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14556631#comment-14556631 ] Cody Koeninger commented on SPARK-7827: --- As TD said, it looks like it did retry 4 times. You can set refresh.leader.backoff.ms in kafka params to affect the sleep duration before throwing the exception. Kafka default is 200ms. Can you tell how long Kafka took to rebalance? > kafka streaming NotLeaderForPartition Exception could not be handled normally > - > > Key: SPARK-7827 > URL: https://issues.apache.org/jira/browse/SPARK-7827 > Project: Spark > Issue Type: Bug > Components: Streaming >Affects Versions: 1.3.1 > Environment: spark 1.3.1, on yarn, hadoop 2.6.0 >Reporter: hotdog > Original Estimate: 120h > Remaining Estimate: 120h > > This is my Cluster 's log, once the partition's leader could not be found, > the total streaming task will fail... > org.apache.spark.SparkException: Job aborted due to stage failure: Task 11 in > stage 3549.0 failed 4 times, most recent failure: Lost task 11.3 in stage > 3549.0 (TID 385491, td-h85.cp): kafka.common.NotLeaderForPartitionException > at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) > at > sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) > at > sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) > at java.lang.reflect.Constructor.newInstance(Constructor.java:526) > at java.lang.Class.newInstance(Class.java:374) > at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:73) > at > org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.handleFetchErr(KafkaRDD.scala:142) > at > org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:151) > at > org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:162) > at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71) > at 
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) > at > org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:217) > at > org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) > at org.apache.spark.scheduler.Task.run(Task.scala:64) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) > at java.lang.Thread.run(Thread.java:745) > Driver stacktrace: > at > org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) > at > org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693) > at scala.Option.foreach(Option.scala:236) > at > org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354) > at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) > I read the source code 
of KafkaRDD.scala: > private def handleFetchErr(resp: FetchResponse) { > if (resp.hasError) { > val err = resp.errorCode(part.topic, part.partition) > if (err == ErrorMapping.LeaderNotAvailableCode || > err == ErrorMapping.NotLeaderForPartitionCode) { > log.error(s"Lost leader for topic ${part.topic} partition > ${part.partition}, " + > s" sleeping for ${kc.config.refreshLeaderBackoffMs}ms") > Thread.sleep(kc.config.refreshLeaderBackoffMs) > } > // Let normal rdd retry
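Passing the backoff setting through kafkaParams would look roughly like this (broker addresses and the 1000ms value are illustrative, not recommendations):

```scala
// refresh.leader.backoff.ms is a standard Kafka consumer config that the
// direct stream honors when handling leader errors.
val kafkaParams = Map[String, String](
  "metadata.broker.list" -> "broker1:9092,broker2:9092",
  // sleep 1s (the Kafka default is 200ms) before refreshing leader metadata
  // after a NotLeaderForPartition / LeaderNotAvailable error
  "refresh.leader.backoff.ms" -> "1000"
)
```

A larger backoff gives the cluster more time to finish its leader rebalance before the task's retries are exhausted.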
[jira] [Commented] (SPARK-7255) spark.streaming.kafka.maxRetries not documented
[ https://issues.apache.org/jira/browse/SPARK-7255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14520403#comment-14520403 ] Cody Koeninger commented on SPARK-7255: --- I think that should be fine to document, thanks for noticing. Although actually, it's the maximum number of times in a row that the driver will attempt to contact the leader, not the number of attempts after the first. Which means it's somewhat unfortunately named. :) > spark.streaming.kafka.maxRetries not documented > --- > > Key: SPARK-7255 > URL: https://issues.apache.org/jira/browse/SPARK-7255 > Project: Spark > Issue Type: Documentation > Components: Documentation, Streaming >Affects Versions: 1.3.1 >Reporter: Benjamin Fradet >Priority: Minor > Fix For: 1.4.0 > > > I noticed that > [spark.streaming.kafka.maxRetries|https://github.com/apache/spark/blob/master/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala#L66] > was not documented in the [configuration > page|http://spark.apache.org/docs/latest/configuration.html#spark-streaming]. > Is this on purpose? -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
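Setting the property looks like any other Spark conf entry (the app name and the value 3 are illustrative):

```scala
import org.apache.spark.SparkConf

// spark.streaming.kafka.maxRetries caps the number of consecutive attempts
// the driver makes to contact the current leader; the default is 1.
val conf = new SparkConf()
  .setAppName("DirectStreamExample")
  .set("spark.streaming.kafka.maxRetries", "3")
```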
[jira] [Commented] (SPARK-7122) KafkaUtils.createDirectStream - unreasonable processing time in absence of load
[ https://issues.apache.org/jira/browse/SPARK-7122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14511092#comment-14511092 ] Cody Koeninger commented on SPARK-7122: --- Does this actually have anything to do with direct stream? I.e. is the behavior the same with the receiver based stream? > KafkaUtils.createDirectStream - unreasonable processing time in absence of > load > --- > > Key: SPARK-7122 > URL: https://issues.apache.org/jira/browse/SPARK-7122 > Project: Spark > Issue Type: Question > Components: Streaming >Affects Versions: 1.3.1 > Environment: Spark Streaming 1.3.1, standalone mode running on just 1 > box: Ubuntu 14.04.2 LTS, 4 cores, 8GB RAM, java version "1.8.0_40" >Reporter: Platon Potapov >Priority: Minor > Attachments: 10.second.window.fast.job.txt, > 5.second.window.slow.job.txt, SparkStreamingJob.scala > > > attached is the complete source code of a test spark job. no external data > generators are run - just the presence of a kafka topic named "raw" suffices. > the spark job is run with no load whatsoever. http://localhost:4040/streaming > is checked to obtain job processing duration. > * in case the test contains the following transformation: > {code} > // dummy transformation > val temperature = bytes.filter(_._1 == "abc") > val abc = temperature.window(Seconds(40), Seconds(5)) > abc.print() > {code} > the median processing time is 3 seconds 80 ms > * in case the test contains the following transformation: > {code} > // dummy transformation > val temperature = bytes.filter(_._1 == "abc") > val abc = temperature.map(x => (1, x)) > abc.print() > {code} > the median processing time is just 50 ms > please explain why does the "window" transformation introduce such a growth > of job duration? 
> note: the result is the same regardless of the number of kafka topic > partitions (I've tried 1 and 8) > note2: the result is the same regardless of the window parameters (I've tried > (20, 2) and (40, 5))
[jira] [Commented] (SPARK-6431) Couldn't find leader offsets exception when creating KafkaDirectStream
[ https://issues.apache.org/jira/browse/SPARK-6431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14481266#comment-14481266 ] Cody Koeninger commented on SPARK-6431: --- I think this got mis-diagnosed on the mailing list, sorry for the confusion. The only way I've been able to reproduce that exception is by trying to start a stream for a topic that doesn't exist at all. Alberto, did you actually run kafka-topics.sh --create before starting the job, or in some other way create the topic? Pretty sure what happened here is that your topic didn't exist the first time you ran the job. Your brokers were set to auto-create topics, so it did exist the next time you ran the job. Putting a message into the topic didn't have anything to do with it. Here's why I think that's what happened. The following console session is an example, where the "empty" topic existed prior to starting the console, but had no messages. Topic "hasonemessage" existed and had one message in it. Topic "doesntexistyet" didn't exist at the beginning of the console. 
The metadata apis return the same info for existing-but-empty topics as they do for topics with messages in them: scala> kc.getPartitions(Set("empty")).right res0: scala.util.Either.RightProjection[org.apache.spark.streaming.kafka.KafkaCluster.Err,Set[kafka.common.TopicAndPartition]] = RightProjection(Right( Set([empty,0], [empty,1]))) scala> kc.getPartitions(Set("hasonemessage")).right res1: scala.util.Either.RightProjection[org.apache.spark.streaming.kafka.KafkaCluster.Err,Set[kafka.common.TopicAndPartition]] = RightProjection(Right(Set([hasonemessage,0], [hasonemessage,1]))) Leader offsets are both 0 for the empty topic, as you'd expect: scala> kc.getLatestLeaderOffsets(kc.getPartitions(Set("empty")).right.get) res5: Either[org.apache.spark.streaming.kafka.KafkaCluster.Err,Map[kafka.common.TopicAndPartition,org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset]] = Right(Map([empty,1] -> LeaderOffset(localhost,9094,0), [empty,0] -> LeaderOffset(localhost,9093,0))) And one of the leader offsets is 1 for the topic with one message: scala> kc.getLatestLeaderOffsets(kc.getPartitions(Set("hasonemessage")).right.get) res6: Either[org.apache.spark.streaming.kafka.KafkaCluster.Err,Map[kafka.common.TopicAndPartition,org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset]] = Right(Map([hasonemessage,0] -> LeaderOffset(localhost,9092,1), [hasonemessage,1] -> LeaderOffset(localhost,9093,0))) The first time a metadata request is made against the non-existing topic, it returns empty: kc.getPartitions(Set("doesntexistyet")).right res2: scala.util.Either.RightProjection[org.apache.spark.streaming.kafka.KafkaCluster.Err,Set[kafka.common.TopicAndPartition]] = RightProjection(Right(Set())) But if your brokers are configured with auto.create.topics.enable set to true, that metadata request alone is enough to trigger creation of the topic. 
Requesting it again shows that the topic has been created: scala> kc.getPartitions(Set("doesntexistyet")).right res3: scala.util.Either.RightProjection[org.apache.spark.streaming.kafka.KafkaCluster.Err,Set[kafka.common.TopicAndPartition]] = RightProjection(Right(Set([doesntexistyet,0], [doesntexistyet,1]))) If you don't think that explains what happened, please let me know if you have a way of reproducing that exception against an existing-but-empty topic, because I can't. As far as what to do about this, my instinct is to just improve the error handling for the getPartitions call. If the topic doesn't exist yet, it shouldn't be returning an empty set, it should be returning an error. > Couldn't find leader offsets exception when creating KafkaDirectStream > -- > > Key: SPARK-6431 > URL: https://issues.apache.org/jira/browse/SPARK-6431 > Project: Spark > Issue Type: Bug > Components: Streaming >Affects Versions: 1.3.0 >Reporter: Alberto > > When I try to create an InputDStream using the createDirectStream method of > the KafkaUtils class and the kafka topic does not have any messages yet I am > getting the following error: > org.apache.spark.SparkException: Couldn't find leader offsets for Set() > org.apache.spark.SparkException: org.apache.spark.SparkException: Couldn't > find leader offsets for Set() > at > org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$createDirectStream$2.apply(KafkaUtils.scala:413) > If I put a message in the topic before creating the DirectStream everything > works fine.
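The fix Cody suggests, returning an error instead of an empty set, might look roughly like the following. This is a hypothetical, simplified sketch, not KafkaCluster's actual code: the Err case class and the string-keyed lookup function stand in for the real metadata types.

```scala
// Simplified stand-in for KafkaCluster's error type
case class Err(msgs: Seq[String])

// Hypothetical checked variant of getPartitions: an empty metadata result
// becomes a descriptive Left instead of Right(Set()).
def checkedGetPartitions(
    topics: Set[String],
    lookup: Set[String] => Set[String] // stand-in for the metadata request
): Either[Err, Set[String]] = {
  val found = lookup(topics)
  if (found.isEmpty)
    Left(Err(Seq(s"Couldn't find any partitions for topics ${topics.mkString(", ")}; do the topics exist?")))
  else
    Right(found)
}

// An existing topic yields Right; a nonexistent one yields an explanatory Left
val existing = checkedGetPartitions(Set("hasonemessage"), _ => Set("hasonemessage-0", "hasonemessage-1"))
val missing  = checkedGetPartitions(Set("doesntexistyet"), _ => Set.empty)
```

With this shape, the "Couldn't find leader offsets for Set()" message would be replaced by an error naming the missing topic at the point of failure.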
[jira] [Commented] (SPARK-6569) Kafka directInputStream logs what appear to be incorrect warnings
[ https://issues.apache.org/jira/browse/SPARK-6569?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14394457#comment-14394457 ] Cody Koeninger commented on SPARK-6569: --- Keep in mind that this message is in KafkaRDD, which is designed to be used on its own outside of a streaming job. If someone asks for a 0 length offset range, I'd want the system to do something other than silently do nothing. Platon's point about monitoring makes sense, and we do monitor our jobs. I see that as orthogonal to logging - a graphite monitor might tell me a job is running abnormally, Spark logs might tell me that's because a producer is misconfigured and a particular topic is only getting written for one partition. I wouldn't expect to have to run my system at debug log level to get that. Honestly, given the incredible volume of Spark logs at info level, worrying about one more (potentially useful) log at info level doesn't make sense to me. > Kafka directInputStream logs what appear to be incorrect warnings > - > > Key: SPARK-6569 > URL: https://issues.apache.org/jira/browse/SPARK-6569 > Project: Spark > Issue Type: Bug > Components: Streaming >Affects Versions: 1.3.0 > Environment: Spark 1.3.0 >Reporter: Platon Potapov >Priority: Minor > > During what appears to be normal operation of streaming from a Kafka topic, > the following log records are observed, logged periodically: > {code} > [Stage 391:==> (3 + 0) / > 4] > 2015-03-27 12:49:54 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the > same as ending offset skipping raw 0 > 2015-03-27 12:49:54 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the > same as ending offset skipping raw 0 > 2015-03-27 12:49:54 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the > same as ending offset skipping raw 0 > {code} > * the part.fromOffset placeholder is not correctly substituted to a value > * is the condition really mandates a warning being logged? 
[jira] [Commented] (SPARK-6569) Kafka directInputStream logs what appear to be incorrect warnings
[ https://issues.apache.org/jira/browse/SPARK-6569?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14384038#comment-14384038 ] Cody Koeninger commented on SPARK-6569: --- I set it as warn because an empty batch can be the source of non-obvious problems that would be obscured if it was at the info level. Streams that don't get even one item during a batch are relatively rare for my use cases. I don't feel super strongly about it, though, if there's a reason to reduce the log level. > Kafka directInputStream logs what appear to be incorrect warnings > - > > Key: SPARK-6569 > URL: https://issues.apache.org/jira/browse/SPARK-6569 > Project: Spark > Issue Type: Bug > Components: Streaming >Affects Versions: 1.3.0 > Environment: Spark 1.3.0 >Reporter: Platon Potapov >Priority: Minor > > During what appears to be normal operation of streaming from a Kafka topic, > the following log records are observed, logged periodically: > {code} > [Stage 391:==> (3 + 0) / > 4] > 2015-03-27 12:49:54 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the > same as ending offset skipping raw 0 > 2015-03-27 12:49:54 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the > same as ending offset skipping raw 0 > 2015-03-27 12:49:54 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the > same as ending offset skipping raw 0 > {code} > * the part.fromOffset placeholder is not correctly substituted to a value > * is the condition really mandates a warning being logged? -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
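The literal, unsubstituted "${part.fromOffset}" in the logged warning is consistent with a plain Scala string literal that is missing the s interpolator prefix. A minimal self-contained illustration (fromOffset here is just a local value standing in for part.fromOffset):

```scala
val fromOffset = 42L

// Without the s prefix, the placeholder is emitted verbatim -- this is the
// shape of the bug reported in the warning message
val broken = "Beginning offset ${fromOffset} is the same as ending offset skipping raw 0"

// With the s prefix, the value is actually substituted
val fixed = s"Beginning offset ${fromOffset} is the same as ending offset skipping raw 0"
```

Fixing the interpolation is independent of the separate question of whether the condition deserves WARN level at all.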
[jira] [Commented] (SPARK-6434) [Streaming][Kafka] CreateDirectStream for empty topics
[ https://issues.apache.org/jira/browse/SPARK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14371330#comment-14371330 ] Cody Koeninger commented on SPARK-6434: --- Yeah, I didn't notice Alberto had also created a ticket, we can mark one as duplicate. The thing is, I definitely would expect the stream to fail if I typo the name of the topic. I'm not 100% certain without trying it out whether you can tell the difference between a topic that doesn't exist and a topic that doesn't have messages, solely through the metadata api. > [Streaming][Kafka] CreateDirectStream for empty topics > -- > > Key: SPARK-6434 > URL: https://issues.apache.org/jira/browse/SPARK-6434 > Project: Spark > Issue Type: Bug > Components: Streaming >Affects Versions: 1.3.0 >Reporter: Cody Koeninger >Priority: Minor > Fix For: 1.3.1 > > > http://apache-spark-developers-list.1001551.n3.nabble.com/Exception-using-the-new-createDirectStream-util-method-td8.html > Looks like kafka metadata apis don't return leaders for topics that don't > have messages yet. > Should either try to work around this, or document that topics need messages > before starting the stream.
[jira] [Created] (SPARK-6434) [Streaming][Kafka] CreateDirectStream for empty topics
Cody Koeninger created SPARK-6434: - Summary: [Streaming][Kafka] CreateDirectStream for empty topics Key: SPARK-6434 URL: https://issues.apache.org/jira/browse/SPARK-6434 Project: Spark Issue Type: Bug Components: Streaming Affects Versions: 1.3.0 Reporter: Cody Koeninger Priority: Minor Fix For: 1.3.1 http://apache-spark-developers-list.1001551.n3.nabble.com/Exception-using-the-new-createDirectStream-util-method-td8.html Looks like kafka metadata apis don't return leaders for topics that don't have messages yet. Should either try to work around this, or document that topics need messages before starting the stream.
[jira] [Commented] (SPARK-6249) Get Kafka offsets from consumer group in ZK when using direct stream
[ https://issues.apache.org/jira/browse/SPARK-6249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14355561#comment-14355561 ] Cody Koeninger commented on SPARK-6249: --- First, I don't think setting group.id should have any effect on whether offsets are pulled from / saved to ZK. They're two different things. Second, I think this is another case where exposing an API to make it easy to get / set kafka's managed consumer offsets would allow people to solve this problem for themselves, in the way that makes sense for them. We already have a way to specify the beginning offsets of the stream. We already have code to get / set consumer offsets from Kafka, it's just not exposed. By the time 1.4 is ready to release, hopefully we'll know whether we're ok exposing it. I don't want to add more config options with confusing semantics around what is being used for the system of record for offsets, I'd rather make it easy for people to explicitly do what they need. > Get Kafka offsets from consumer group in ZK when using direct stream > > > Key: SPARK-6249 > URL: https://issues.apache.org/jira/browse/SPARK-6249 > Project: Spark > Issue Type: Improvement > Components: Streaming >Reporter: Tathagata Das > > This is the proposal. > The simpler direct API (the one that does not take explicit offsets) can be > modified to also pick up the initial offset from ZK if group.id is specified. > This is exactly similar to how we find the latest or earliest offset in that > API, just that instead of latest/earliest offset of the topic we want to find > the offset from the consumer group. The group offsets in ZK are not used at > all for any further processing and restarting, so the exactly-once semantics > are not broken. > The use case where this is useful is simplified code upgrade. 
If the user > wants to upgrade the code, he/she can stop the context gracefully, which will > ensure the ZK consumer group offset will be updated with the last offsets > processed. Then the new code, started (not restarted from checkpoint), can > pick up the consumer group offset from ZK and continue where the previous > code had left off. > Without the functionality of picking up consumer group offsets to start (that > is, currently) the only way to do this is for the users to save the offsets > somewhere (file, database, etc.) and manage the offsets themselves. I just > want to simplify this process.
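The save-your-own-offsets workflow Cody prefers can be sketched in a few lines. This is an illustrative sketch only: the in-memory mutable map stands in for whatever store the user chooses (ZK, a database, a file), and the (topic, partition) tuple stands in for Kafka's TopicAndPartition.

```scala
import scala.collection.mutable

// Stand-in offset store; in practice this would be ZK, a DB, or a file
val offsetStore = mutable.Map.empty[(String, Int), Long]

// After a batch is processed, persist the offsets to resume from next time
def commitOffsets(processed: Map[(String, Int), Long]): Unit =
  offsetStore ++= processed

// On (re)start, read the stored offsets to pass as the stream's fromOffsets
def loadOffsets(): Map[(String, Int), Long] = offsetStore.toMap

commitOffsets(Map(("mytopic", 0) -> 105L, ("mytopic", 1) -> 98L))
val fromOffsets = loadOffsets()
```

The point of the comment is that with explicit get/set calls like these exposed, no extra group.id-driven config semantics are needed: the user decides the system of record.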
[jira] [Commented] (SPARK-2808) update kafka to version 0.8.2
[ https://issues.apache.org/jira/browse/SPARK-2808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14316937#comment-14316937 ] Cody Koeninger commented on SPARK-2808: --- I'm also kind of curious what the motivation is - I'd assume it's just for scala 2.11 compatibility? The topic-backed consumer offset stuff still sounds promising-but-sketchy to me, honestly > update kafka to version 0.8.2 > - > > Key: SPARK-2808 > URL: https://issues.apache.org/jira/browse/SPARK-2808 > Project: Spark > Issue Type: Sub-task > Components: Build, Spark Core >Reporter: Anand Avati > > First kafka_2.11 0.8.1 has to be released
[jira] [Commented] (SPARK-4964) Exactly-once semantics for Kafka
[ https://issues.apache.org/jira/browse/SPARK-4964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14290747#comment-14290747 ] Cody Koeninger commented on SPARK-4964: --- Design doc at https://docs.google.com/a/databricks.com/document/d/1IuvZhg9cOueTf1mq4qwc1fhPb5FVcaRLcyjrtG4XU1k/edit?usp=sharing > Exactly-once semantics for Kafka > > > Key: SPARK-4964 > URL: https://issues.apache.org/jira/browse/SPARK-4964 > Project: Spark > Issue Type: Improvement > Components: Streaming >Reporter: Cody Koeninger > > for background, see > http://apache-spark-developers-list.1001551.n3.nabble.com/Which-committers-care-about-Kafka-td9827.html > Requirements: > - allow client code to implement exactly-once end-to-end semantics for Kafka > messages, in cases where their output storage is either idempotent or > transactional > - allow client code access to Kafka offsets, rather than automatically > committing them > - do not assume Zookeeper as a repository for offsets (for the transactional > case, offsets need to be stored in the same store as the data) > - allow failure recovery without lost or duplicated messages, even in cases > where a checkpoint cannot be restored (for instance, because code must be > updated) > Design: > The basic idea is to make an rdd where each partition corresponds to a given > Kafka topic, partition, starting offset, and ending offset. That allows for > deterministic replay of data from Kafka (as long as there is enough log > retention). > Client code is responsible for committing offsets, either transactionally to > the same store that data is being written to, or in the case of idempotent > data, after data has been written. > PR of a sample implementation for both the batch and dstream case is > forthcoming. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-4960) Interceptor pattern in receivers
[ https://issues.apache.org/jira/browse/SPARK-4960?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14265570#comment-14265570 ] Cody Koeninger commented on SPARK-4960: --- At that point, it sounds like you're talking about an early filter rather than an early flatmap. Should it just be T => Option[T]? Since this ticket no longer solves the problem raised by SPARK-3146, and it seems unlikely that patch will ever get merged, what is the concrete plan for giving users of receiver-based kafka implementations early access to MessageAndMetadata? > Interceptor pattern in receivers > > > Key: SPARK-4960 > URL: https://issues.apache.org/jira/browse/SPARK-4960 > Project: Spark > Issue Type: New Feature > Components: Streaming >Reporter: Tathagata Das > > Sometimes it is good to intercept a message received through a receiver and > modify / do something with the message before it is stored into Spark. This > is often referred to as the interceptor pattern. There should be general way > to specify an interceptor function that gets applied to all receivers.
[jira] [Commented] (SPARK-4960) Interceptor pattern in receivers
[ https://issues.apache.org/jira/browse/SPARK-4960?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14260144#comment-14260144 ] Cody Koeninger commented on SPARK-4960: --- You're saying for an implementation that currently extends e.g. Receiver[T] that a user can provide a function T => Iterable[M] But in the case of Kafka, T is currently fixed to (K, V). So for kafka we don't need a user provided function of type (K, V) => Iterable[M] We need a user provided function of type MessageAndMetadata => Iterable[M] In other words, a third type parameter. I'm also not clear on how your proposed solution deals with the 9 different overloads of store(), including the one that takes raw serialized bytes. At that point I'm not sure that having an interceptor setter defined on a parent class makes a lot of sense, because it's the particular subclass that knows what its intermediate third type is (MessageAndMetadata in this case), as well as which store method(s) it cares about. Thats part of why I think constructor arguments are actually a cleaner way to handle this - kafka can have an "interceptor" argument that defaults to a function MessageAndMetadata => Iterable[(K,V)], other implementations can have a type signature for the interceptor that makes sense for them. As an aside, I think it should actually be TraversableOnce, not Iterable. All we care about is being able to call foreach on it once, and the classes that implement TraversableOnce are a superset of those that implement iterable. > Interceptor pattern in receivers > > > Key: SPARK-4960 > URL: https://issues.apache.org/jira/browse/SPARK-4960 > Project: Spark > Issue Type: New Feature > Components: Streaming >Reporter: Tathagata Das > > Sometimes it is good to intercept a message received through a receiver and > modify / do something with the message before it is stored into Spark. This > is often referred to as the interceptor pattern. 
There should be general way > to specify an interceptor function that gets applied to all receivers.
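The interceptor shape discussed in this thread, a user-supplied function applied as a flatMap so that one incoming message can produce zero, one, or many stored records, can be sketched as follows. MessageAndMetadata here is a simplified stand-in for Kafka's class, and storeWithInterceptor is a hypothetical name, not the real receiver API.

```scala
// Simplified stand-in for kafka.message.MessageAndMetadata
case class MessageAndMetadata[K, V](topic: String, partition: Int, offset: Long, key: K, message: V)

// Applying the interceptor as a flatMap: TraversableOnce covers Option,
// Seq, Iterator, etc., so filtering and expansion both work
def storeWithInterceptor[K, V, R](
    messages: Seq[MessageAndMetadata[K, V]],
    interceptor: MessageAndMetadata[K, V] => TraversableOnce[R]
): Seq[R] =
  messages.flatMap(interceptor)

val msgs = Seq(
  MessageAndMetadata("raw", 0, 0L, "abc", "keep"),
  MessageAndMetadata("raw", 0, 1L, "xyz", "drop")
)

// A filter-style interceptor: keep only key "abc", retaining the offset,
// which is exactly the metadata the plain (K, V) stream discards
val kept = storeWithInterceptor(msgs, m => if (m.key == "abc") Some((m.offset, m.message)) else None)
```

Returning Option here shows why TraversableOnce (rather than Iterable) is a convenient bound: a T => Option[T] early filter is just a special case of the flatMap form.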
[jira] [Commented] (SPARK-4964) Exactly-once semantics for Kafka
[ https://issues.apache.org/jira/browse/SPARK-4964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14258670#comment-14258670 ] Cody Koeninger commented on SPARK-4964: --- Usage example of the dstream for the transactional and idempotent cases: https://github.com/koeninger/kafka-exactly-once/tree/master > Exactly-once semantics for Kafka > > > Key: SPARK-4964 > URL: https://issues.apache.org/jira/browse/SPARK-4964 > Project: Spark > Issue Type: Improvement > Components: Streaming >Reporter: Cody Koeninger > > for background, see > http://apache-spark-developers-list.1001551.n3.nabble.com/Which-committers-care-about-Kafka-td9827.html > Requirements: > - allow client code to implement exactly-once end-to-end semantics for Kafka > messages, in cases where their output storage is either idempotent or > transactional > - allow client code access to Kafka offsets, rather than automatically > committing them > - do not assume Zookeeper as a repository for offsets (for the transactional > case, offsets need to be stored in the same store as the data) > - allow failure recovery without lost or duplicated messages, even in cases > where a checkpoint cannot be restored (for instance, because code must be > updated) > Design: > The basic idea is to make an rdd where each partition corresponds to a given > Kafka topic, partition, starting offset, and ending offset. That allows for > deterministic replay of data from Kafka (as long as there is enough log > retention). > Client code is responsible for committing offsets, either transactionally to > the same store that data is being written to, or in the case of idempotent > data, after data has been written. > PR of a sample implementation for both the batch and dstream case is > forthcoming. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-4964) Exactly-once semantics for Kafka
Cody Koeninger created SPARK-4964: - Summary: Exactly-once semantics for Kafka Key: SPARK-4964 URL: https://issues.apache.org/jira/browse/SPARK-4964 Project: Spark Issue Type: Improvement Components: Streaming Reporter: Cody Koeninger for background, see http://apache-spark-developers-list.1001551.n3.nabble.com/Which-committers-care-about-Kafka-td9827.html Requirements: - allow client code to implement exactly-once end-to-end semantics for Kafka messages, in cases where their output storage is either idempotent or transactional - allow client code access to Kafka offsets, rather than automatically committing them - do not assume Zookeeper as a repository for offsets (for the transactional case, offsets need to be stored in the same store as the data) - allow failure recovery without lost or duplicated messages, even in cases where a checkpoint cannot be restored (for instance, because code must be updated) Design: The basic idea is to make an rdd where each partition corresponds to a given Kafka topic, partition, starting offset, and ending offset. That allows for deterministic replay of data from Kafka (as long as there is enough log retention). Client code is responsible for committing offsets, either transactionally to the same store that data is being written to, or in the case of idempotent data, after data has been written. PR of a sample implementation for both the batch and dstream case is forthcoming.
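The design's core idea, each RDD partition defined by a topic, partition, and half-open offset range, can be modeled in a few lines. This is a simplified sketch: Spark's eventual OffsetRange type and its fields may differ in detail.

```scala
// One RDD partition corresponds to one offset range; replay is deterministic
// because the range fully identifies the data (given sufficient retention)
case class OffsetRange(topic: String, partition: Int, fromOffset: Long, untilOffset: Long) {
  // The message count is known without reading any data
  def count: Long = untilOffset - fromOffset
}

val ranges = Seq(
  OffsetRange("mytopic", 0, 100L, 150L),
  OffsetRange("mytopic", 1, 200L, 230L)
)

// The batch's total size follows directly from the offset ranges
val total = ranges.map(_.count).sum
```

This also explains the correctness concern raised elsewhere in this digest: count optimizations assume the messages in the range are exactly the messages processed, so silently substituting different offsets would break both counts and exactly-once semantics.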
[jira] [Commented] (SPARK-3146) Improve the flexibility of Spark Streaming Kafka API to offer user the ability to process message before storing into BM
[ https://issues.apache.org/jira/browse/SPARK-3146?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14254100#comment-14254100 ] Cody Koeninger commented on SPARK-3146: --- This is a real problem for production use, not an abstract concern about what users might or might not do if you gave them full access to power that they already should have had. The patch is a working implementation that solves the problem, we've been using it in production. I think the patch needs to be improved to take a function MessageAndMetadata => Iterable[R] instead of the current MessageAndMetadata => R (in other words it needs to be a flatMap operation instead of the current map). If for some reason this needs to be added to all receivers, and it needs to be done via a setter instead of a constructor argument, I don't understand why, but it will solve the problem, so I'm all for it. If it really needs to be a function (Any, Any) => Iterable[R], I think that's worse from a user interface point of view, but again it will solve the problem, so I'm all for it. If someone with the power to merge PRs will say which of those options they would be willing to accept, I'll happily implement it. What I definitely don't want to see is for this ticket to languish for another 6 months, during which time I keep having to do manual maintenance of a patched Spark. > Improve the flexibility of Spark Streaming Kafka API to offer user the > ability to process message before storing into BM > > > Key: SPARK-3146 > URL: https://issues.apache.org/jira/browse/SPARK-3146 > Project: Spark > Issue Type: Improvement > Components: Streaming >Affects Versions: 1.0.2, 1.1.0 >Reporter: Saisai Shao > > Currently Spark Streaming Kafka API stores the key and value of each message > into BM for processing, potentially this may lose the flexibility for > different requirements: > 1. currently topic/partition/offset information for each message is discarded > by KafkaInputDStream. 
In some scenarios people may need this information to > better filter the message, like SPARK-2388 described. > 2. People may need to add timestamp for each message when feeding into Spark > Streaming, which can better measure the system latency. > 3. Checkpointing the partition/offsets or others... > So here we add a messageHandler in interface to give people the flexibility > to preprocess the message before storing into BM. In the meantime this > improvement keeps compatibility with the current API.
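The map-versus-flatMap distinction Cody argues for can be illustrated with a self-contained sketch. The handler-applying function names are hypothetical, not the patch's API; the point is only the difference in handler signature.

```scala
// The patch's current shape: a handler returning exactly one R can only
// transform each message
def applyMapHandler[M, R](msgs: Seq[M], handler: M => R): Seq[R] =
  msgs.map(handler)

// The requested shape: a handler returning Iterable[R] can also drop or
// expand messages
def applyFlatMapHandler[M, R](msgs: Seq[M], handler: M => Iterable[R]): Seq[R] =
  msgs.flatMap(handler)

val offsets = Seq(1L, 2L, 3L)

// The flatMap form subsumes the map form (wrap the result in a Seq)...
val mapped = applyFlatMapHandler[Long, Long](offsets, m => Seq(m * 10))

// ...and also expresses filtering, which the map form cannot
val filtered = applyFlatMapHandler[Long, Long](offsets, m => if (m % 2 == 1) Seq(m) else Seq.empty)
```

Since the flatMap signature strictly generalizes the map one, upgrading the messageHandler to MessageAndMetadata => Iterable[R] would not lose any existing capability.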
[jira] [Commented] (SPARK-3146) Improve the flexibility of Spark Streaming Kafka API to offer user the ability to process message before storing into BM
[ https://issues.apache.org/jira/browse/SPARK-3146?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14252397#comment-14252397 ] Cody Koeninger commented on SPARK-3146: --- ((K, V), (topicAndPartition, offset)) is all of the public information in a MessageAndMetadata object, aside from the decoders, so that's probably ok. If you're trying to come up with a uniform interface (Any, Any) => Iterable[R] doesn't seem much better than Any => Iterable[R] though. Also, just to be clear, the particular example calls you're talking about only exist in ReliableKafkaReceiver, not KafkaInputDStream, and we need something that works for both. > Improve the flexibility of Spark Streaming Kafka API to offer user the > ability to process message before storing into BM > > > Key: SPARK-3146 > URL: https://issues.apache.org/jira/browse/SPARK-3146 > Project: Spark > Issue Type: Improvement > Components: Streaming >Affects Versions: 1.0.2, 1.1.0 >Reporter: Saisai Shao > > Currently Spark Streaming Kafka API stores the key and value of each message > into BM for processing, potentially this may lose the flexibility for > different requirements: > 1. currently topic/partition/offset information for each message is discarded > by KafkaInputDStream. In some scenarios people may need this information to > better filter the message, like SPARK-2388 described. > 2. People may need to add timestamp for each message when feeding into Spark > Streaming, which can better measure the system latency. > 3. Checkpointing the partition/offsets or others... > So here we add a messageHandler in interface to give people the flexibility > to preprocess the message before storing into BM. In the meantime time this > improvement keep compatible with current API. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-3146) Improve the flexibility of Spark Streaming Kafka API to offer user the ability to process message before storing into BM
[ https://issues.apache.org/jira/browse/SPARK-3146?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14252299#comment-14252299 ] Cody Koeninger commented on SPARK-3146: --- Yes, for the specific case of kafka, regardless of the name of the api, the user supplied function needs to be of type MessageAndMetadata => Iterable[R] > Improve the flexibility of Spark Streaming Kafka API to offer user the > ability to process message before storing into BM > > > Key: SPARK-3146 > URL: https://issues.apache.org/jira/browse/SPARK-3146 > Project: Spark > Issue Type: Improvement > Components: Streaming >Affects Versions: 1.0.2, 1.1.0 >Reporter: Saisai Shao > > Currently Spark Streaming Kafka API stores the key and value of each message > into BM for processing, potentially this may lose the flexibility for > different requirements: > 1. currently topic/partition/offset information for each message is discarded > by KafkaInputDStream. In some scenarios people may need this information to > better filter the message, like SPARK-2388 described. > 2. People may need to add timestamp for each message when feeding into Spark > Streaming, which can better measure the system latency. > 3. Checkpointing the partition/offsets or others... > So here we add a messageHandler in interface to give people the flexibility > to preprocess the message before storing into BM. In the meantime time this > improvement keep compatible with current API. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-4122) Add library to write data back to Kafka
[ https://issues.apache.org/jira/browse/SPARK-4122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14252261#comment-14252261 ] Cody Koeninger commented on SPARK-4122: --- +1 for the idea of making this write from both DStreams and normal RDDs > Add library to write data back to Kafka > --- > > Key: SPARK-4122 > URL: https://issues.apache.org/jira/browse/SPARK-4122 > Project: Spark > Issue Type: Bug >Reporter: Hari Shreedharan >Assignee: Hari Shreedharan >