[jira] [Commented] (SPARK-12103) KafkaUtils createStream with multiple topics -- does not work as expected
[ https://issues.apache.org/jira/browse/SPARK-12103?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15041301#comment-15041301 ] Saisai Shao commented on SPARK-12103: - I once had a proposal for a message handler (receiver interceptor) for the receiver-based Kafka stream that would support your scenario, but the proposal sat idle for a long time and the idea was only adopted in the direct Kafka stream. Adding this feature back to the receiver-based Kafka stream now would break a lot of things, and it is not very worthwhile (I think lots of users are shifting to the direct API now).

> KafkaUtils createStream with multiple topics -- does not work as expected
> -------------------------------------------------------------------------
>
>                 Key: SPARK-12103
>                 URL: https://issues.apache.org/jira/browse/SPARK-12103
>             Project: Spark
>          Issue Type: Improvement
>          Components: Documentation, Streaming
>    Affects Versions: 1.4.1
>            Reporter: Dan Dutrow
>            Priority: Minor
>             Fix For: 1.4.2
>
> (Note: yes, there is a Direct API that may be better, but it's not the
> easiest thing to get started with. The Kafka Receiver API still needs to
> work, especially for newcomers.)
> When creating a receiver stream using KafkaUtils, there is a valid use case
> where you would want to use one (or a few) Kafka streaming receivers to pool
> resources. I have 10+ topics and don't want to dedicate 10 cores to
> processing all of them. However, when reading the data produced by
> KafkaUtils.createStream, the DStream[(String, String)] does not properly
> insert the topic name into the tuple. The left key is always null, making it
> impossible to know which topic the data came from, other than stashing the
> key into the value. Is there a way around that problem?
> CODE
> val topics = Map("topicA" -> 1, "topicB" -> 1, "topicC" -> 1, "topicD" -> 1,
>                  "topicE" -> 1, "topicF" -> 1, ...)
> val streams: IndexedSeq[ReceiverInputDStream[(String, String)]] = (1 to 3).map { i =>
>   KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
>     ssc, consumerProperties,
>     topics,
>     StorageLevel.MEMORY_ONLY_SER)
> }
> val unioned: DStream[(String, String)] = ssc.union(streams)
> unioned.flatMap { case (key, value) =>
>   // key is always null!
>   // value has data from any one of my topics
>   key match {
>     ...
>   }
> }
> END CODE

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
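[Editor's note: the direct-stream message handler Saisai mentions does expose the topic. A minimal sketch, assuming `ssc` and `kafkaParams` are already defined elsewhere and with placeholder starting offsets, might look like this (Spark 1.3+):]

```scala
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// Starting offsets per topic/partition -- the 0L values are placeholders;
// real code would look these up or restore them from checkpointed state.
val fromOffsets: Map[TopicAndPartition, Long] = Map(
  TopicAndPartition("topicA", 0) -> 0L,
  TopicAndPartition("topicB", 0) -> 0L)

// The message handler sees the full MessageAndMetadata, so it can emit
// (topic, message) pairs instead of the default (key, message) pairs.
val stream = KafkaUtils.createDirectStream[
    String, String, StringDecoder, StringDecoder, (String, String)](
  ssc, kafkaParams, fromOffsets,
  (mmd: MessageAndMetadata[String, String]) => (mmd.topic, mmd.message))
```

[This is the `createDirectStream` overload that takes explicit offsets plus a message handler; the simpler overload that takes a topic set returns only (key, value) tuples.]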
[jira] [Commented] (SPARK-12103) KafkaUtils createStream with multiple topics -- does not work as expected
[ https://issues.apache.org/jira/browse/SPARK-12103?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15037773#comment-15037773 ] Cody Koeninger commented on SPARK-12103: A cursory review of the Kafka project documentation should reveal that messages have a key (used for distribution among partitions) and a value. Why would one reasonably expect that Spark documentation referring to a Kafka message key instead meant the message topic? If you really want the topic name in each item of the RDD, create your separate streams, map over each to add its topic name, then union them together into a single stream.
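[Editor's note: Cody's map-then-union suggestion can be sketched as follows, reusing `ssc` and `consumerProperties` from the reporter's snippet; the topic names are illustrative:]

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils

// One receiver stream per topic; tag each record with its topic name,
// then union the tagged streams into one DStream.
val topicNames = Seq("topicA", "topicB", "topicC")
val perTopic: Seq[DStream[(String, String)]] = topicNames.map { topic =>
  KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
      ssc, consumerProperties, Map(topic -> 1), StorageLevel.MEMORY_ONLY_SER)
    .map { case (_, value) => (topic, value) } // replace the (null) key with the topic
}
val tagged: DStream[(String, String)] = ssc.union(perTopic)
```

[Note this still costs one receiver, and hence one core, per topic -- the resource cost the reporter was trying to avoid.]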
[jira] [Commented] (SPARK-12103) KafkaUtils createStream with multiple topics -- does not work as expected
[ https://issues.apache.org/jira/browse/SPARK-12103?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15038084#comment-15038084 ] Cody Koeninger commented on SPARK-12103: On the off chance you're acting in good faith, actually did read the Kafka documentation and the Spark API documentation, and were still somehow unclear that K meant the type of the message key and V meant the type of the message value... I'll submit a PR to add @tparam docs for every type parameter in the KafkaUtils API.
[jira] [Commented] (SPARK-12103) KafkaUtils createStream with multiple topics -- does not work as expected
[ https://issues.apache.org/jira/browse/SPARK-12103?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15038106#comment-15038106 ] Dan Dutrow commented on SPARK-12103: Thanks. I don't mean to be a pain. I made an honest attempt to figure it out, and I'm not completely numb, just inexperienced with Kafka. I was trying to find a better way to pool resources so I wouldn't have to dedicate a core to each receiver. Having the key be the topic name was wishful thinking, and nothing in the documentation corrected that assumption.
[jira] [Commented] (SPARK-12103) KafkaUtils createStream with multiple topics -- does not work as expected
[ https://issues.apache.org/jira/browse/SPARK-12103?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15038115#comment-15038115 ] Apache Spark commented on SPARK-12103: -- User 'koeninger' has created a pull request for this issue: https://github.com/apache/spark/pull/10132
[jira] [Commented] (SPARK-12103) KafkaUtils createStream with multiple topics -- does not work as expected
[ https://issues.apache.org/jira/browse/SPARK-12103?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15037986#comment-15037986 ] Cody Koeninger commented on SPARK-12103: Knowing that Kafka messages have a key and a value isn't a "Kafka expert" thing. Serious question - did you read http://kafka.apache.org/documentation.html before posting this? The API documentation for createStream at http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.kafka.KafkaUtils$ says:

  keyTypeClass: key type of DStream
  valueTypeClass: value type of DStream
  keyDecoderClass: type of Kafka key decoder
  valueDecoderClass: type of Kafka value decoder

How is this not clear? If you have a specific improvement to the docs, send a PR.
[jira] [Commented] (SPARK-12103) KafkaUtils createStream with multiple topics -- does not work as expected
[ https://issues.apache.org/jira/browse/SPARK-12103?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15037955#comment-15037955 ] Dan Dutrow commented on SPARK-12103: I'm sure all of this is obvious to Kafka experts. Nowhere in the Spark documentation does it say that the left value is the Kafka key, unless you make a guess about what K stands for. You have to trace through Spark and Kafka code to figure it out. Please update DirectKafkaWordCount.scala to include a comment about what is in the ._1 field.
[jira] [Commented] (SPARK-12103) KafkaUtils createStream with multiple topics -- does not work as expected
[ https://issues.apache.org/jira/browse/SPARK-12103?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15038007#comment-15038007 ] Dan Dutrow commented on SPARK-12103: Aha! I finally found where it's documented - in the Java API javadoc! I'm not in the habit of looking at the API docs for functions and languages I'm not using, but this will teach me!
[jira] [Commented] (SPARK-12103) KafkaUtils createStream with multiple topics -- does not work as expected
[ https://issues.apache.org/jira/browse/SPARK-12103?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15036741#comment-15036741 ] Dan Dutrow commented on SPARK-12103: One possible way around this problem would be to stick the topic name into the message key at produce time. Unless the message key is supposed to be unique, this would let you retrieve the topic name from the data itself.
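[Editor's note: if the producers are under your control, Dan's workaround of stashing the topic name in the message key could be sketched with the era-appropriate Kafka 0.8 producer API; the broker address and topic name are illustrative:]

```scala
import java.util.Properties
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

val props = new Properties()
props.put("metadata.broker.list", "localhost:9092") // illustrative broker
props.put("serializer.class", "kafka.serializer.StringEncoder")
props.put("key.serializer.class", "kafka.serializer.StringEncoder")
val producer = new Producer[String, String](new ProducerConfig(props))

// KeyedMessage(topic, key, message): use the topic name itself as the key,
// so the receiver-based consumer sees it in the tuple's left slot.
val topic = "topicA"
producer.send(new KeyedMessage[String, String](topic, topic, "some payload"))
producer.close()
```

[Caveat: under the default key-based partitioner, keying every message with the same value routes all of a topic's messages to a single partition, which can hurt parallelism.]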
[jira] [Commented] (SPARK-12103) KafkaUtils createStream with multiple topics -- does not work as expected
[ https://issues.apache.org/jira/browse/SPARK-12103?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15036625#comment-15036625 ] Dan Dutrow commented on SPARK-12103: After digging into the Kafka code some more (specifically kafka.consumer.KafkaStream, kafka.consumer.ConsumerIterator, and kafka.message.MessageAndMetadata), it appears that the left value of the tuple is not the topic name but rather a key that Kafka puts on each message. See http://kafka.apache.org/documentation.html#compaction I don't see a way around this without hacking KafkaStream and ConsumerIterator to return the topic name instead of the message key. The return value should probably be clarified in the documentation.