[jira] [Commented] (SPARK-2629) Improved state management for Spark Streaming (mapWithState)
[ https://issues.apache.org/jira/browse/SPARK-2629?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16710144#comment-16710144 ] Dan Dutrow commented on SPARK-2629: --- This PR should not reference SPARK-2629 > Improved state management for Spark Streaming (mapWithState) > > > Key: SPARK-2629 > URL: https://issues.apache.org/jira/browse/SPARK-2629 > Project: Spark > Issue Type: Epic > Components: DStreams >Affects Versions: 0.9.2, 1.0.2, 1.2.2, 1.3.1, 1.4.1, 1.5.1 >Reporter: Tathagata Das >Assignee: Tathagata Das >Priority: Major > Fix For: 1.6.0 > > > The current updateStateByKey provides stateful processing in Spark Streaming. It > allows the user to maintain per-key state and manage that state using an > updateFunction. The updateFunction is called for each key, and it uses new > data and the existing state of the key to generate an updated state. However, > based on community feedback, we have learned the following lessons: > - Need for more optimized state management that does not scan every key > - Need to make it easier to implement common use cases: (a) timeout of idle > data, (b) returning items other than state > The high-level idea I am proposing is: > - Introduce a new API -trackStateByKey- *mapWithState* that allows the user > to update per-key state and emit arbitrary records. The new API is necessary > because it will have significantly different semantics than the existing > updateStateByKey API. This API will have direct support for timeouts. > - Internally, the system will keep the state data as a map/list within the > partitions of the state RDDs. The new data RDDs will be partitioned > appropriately, and for all the key-value data, it will look up the map/list in > the state RDD partition and create a new list/map of updated state data. The > new state RDD partition will be created based on the updated data and, if > necessary, the old data. > Here is the detailed design doc (*outdated, to be updated*). 
Please take a > look and provide feedback as comments. > https://docs.google.com/document/d/1NoALLyd83zGs1hNGMm0Pc5YOVgiPpMHugGMk6COqxxE/edit#heading=h.ph3w0clkd4em -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
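The per-key update semantics being proposed can be illustrated with plain Scala collections. This is only a minimal sketch of the idea (running word counts) with illustrative names, not the actual Spark `mapWithState` API: the update function sees the new value plus the existing state for a key and returns both the updated state and a record to emit, unlike `updateStateByKey`, which can only return state.

```scala
// Minimal sketch of mapWithState-style per-key updates using plain Scala
// collections (no Spark dependency); all names here are illustrative.
object MapWithStateSketch {
  // New value + existing state => (state to keep, record to emit).
  def updateWordCount(word: String, count: Int, state: Option[Int]): (Int, (String, Int)) = {
    val newTotal = state.getOrElse(0) + count
    (newTotal, (word, newTotal))
  }

  // Apply one "batch" of (key, value) pairs against the current state map,
  // returning the new state map and the emitted records.
  def applyBatch(state: Map[String, Int],
                 batch: Seq[(String, Int)]): (Map[String, Int], Seq[(String, Int)]) =
    batch.foldLeft((state, Vector.empty[(String, Int)])) {
      case ((st, emitted), (k, v)) =>
        val (newState, record) = updateWordCount(k, v, st.get(k))
        (st.updated(k, newState), emitted :+ record)
    }
}
```

In the real design, the state map lives inside the partitions of the state RDD rather than on one node, but the lookup-and-update step per key is the same shape.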
[jira] [Updated] (SPARK-21065) Spark Streaming concurrentJobs + StreamingJobProgressListener conflict
[ https://issues.apache.org/jira/browse/SPARK-21065?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dan Dutrow updated SPARK-21065: --- Component/s: Web UI > Spark Streaming concurrentJobs + StreamingJobProgressListener conflict > -- > > Key: SPARK-21065 > URL: https://issues.apache.org/jira/browse/SPARK-21065 > Project: Spark > Issue Type: Bug > Components: DStreams, Scheduler, Web UI >Affects Versions: 2.1.0 >Reporter: Dan Dutrow > > My streaming application has 200+ output operations, many of them stateful > and several of them windowed. In an attempt to reduce the processing times, I > set "spark.streaming.concurrentJobs" to 2+. Initial results are very > positive, cutting our processing time from ~3 minutes to ~1 minute, but > eventually we encounter an exception as follows: > (Note that 149697756 ms is 2017-06-09 03:06:00, so it's trying to get a > batch from 45 minutes before the exception is thrown.) > 2017-06-09 03:50:28,259 [Spark Listener Bus] ERROR > org.apache.spark.streaming.scheduler.StreamingListenerBus - Listener > StreamingJobProgressListener threw an exception > java.util.NoSuchElementException: key not found: 149697756 ms > at scala.collection.MapLike$class.default(MapLike.scala:228) > at scala.collection.AbstractMap.default(Map.scala:59) > at scala.collection.mutable.HashMap.apply(HashMap.scala:65) > at > org.apache.spark.streaming.ui.StreamingJobProgressListener.onOutputOperationCompleted(StreamingJobProgressListener.scala:128) > at > org.apache.spark.streaming.scheduler.StreamingListenerBus.doPostEvent(StreamingListenerBus.scala:67) > at > org.apache.spark.streaming.scheduler.StreamingListenerBus.doPostEvent(StreamingListenerBus.scala:29) > at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:63) > at > org.apache.spark.streaming.scheduler.StreamingListenerBus.postToAll(StreamingListenerBus.scala:29) > at > 
org.apache.spark.streaming.scheduler.StreamingListenerBus.onOtherEvent(StreamingListenerBus.scala:43) > ... > The Spark code causing the exception is here: > https://github.com/apache/spark/blob/branch-2.1/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala#LC125 > override def onOutputOperationCompleted( > outputOperationCompleted: StreamingListenerOutputOperationCompleted): > Unit = synchronized { > // This method is called before onBatchCompleted > {color:red}runningBatchUIData(outputOperationCompleted.outputOperationInfo.batchTime).{color} > updateOutputOperationInfo(outputOperationCompleted.outputOperationInfo) > } > It seems to me that it may be caused by that batch having been removed earlier. > https://github.com/apache/spark/blob/branch-2.1/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala#LC102 > override def onBatchCompleted(batchCompleted: > StreamingListenerBatchCompleted): Unit = { > synchronized { > waitingBatchUIData.remove(batchCompleted.batchInfo.batchTime) > > {color:red}runningBatchUIData.remove(batchCompleted.batchInfo.batchTime){color} > val batchUIData = BatchUIData(batchCompleted.batchInfo) > completedBatchUIData.enqueue(batchUIData) > if (completedBatchUIData.size > batchUIDataLimit) { > val removedBatch = completedBatchUIData.dequeue() > batchTimeToOutputOpIdSparkJobIdPair.remove(removedBatch.batchTime) > } > totalCompletedBatches += 1L > totalProcessedRecords += batchUIData.numRecords > } > } > What is the solution here? Should I make my Spark streaming context's remember > duration a lot longer? ssc.remember(batchDuration * rememberMultiple) > Otherwise, it seems like there should be some kind of existence check on > runningBatchUIData before dereferencing it.
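The failure mode and the suggested existence check can be reduced to a few lines of plain Scala with no Spark dependency. The names below mirror the listener code but are otherwise illustrative: `HashMap.apply` on a missing key throws `NoSuchElementException`, which is exactly the stack trace in the report, while a lookup via `Option` turns the late event into a no-op.

```scala
import scala.collection.mutable

// Plain-Scala reduction of the listener race: one "thread" removes a batch
// (as onBatchCompleted does) before another dereferences it
// (as onOutputOperationCompleted does). Nothing here depends on Spark.
object ListenerRaceSketch {
  val runningBatchUIData = mutable.HashMap[Long, String]()

  // Unsafe: apply() on a missing key throws NoSuchElementException,
  // as in onOutputOperationCompleted at line 128.
  def unsafeLookup(batchTime: Long): String = runningBatchUIData(batchTime)

  // Safe: the suggested existence check -- a no-op when the batch has
  // already been removed by onBatchCompleted.
  def safeUpdate(batchTime: Long)(update: String => String): Unit =
    runningBatchUIData.get(batchTime).foreach { data =>
      runningBatchUIData(batchTime) = update(data)
    }
}
```

Extending `ssc.remember(...)` only widens the window; the `Option`-based guard removes the crash regardless of how late the output-operation event arrives.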
[jira] [Updated] (SPARK-21065) Spark Streaming concurrentJobs + StreamingJobProgressListener conflict
[ https://issues.apache.org/jira/browse/SPARK-21065?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dan Dutrow updated SPARK-21065: --- Description: (updated; body identical to the issue description quoted above, so it is omitted here)
[jira] [Commented] (SPARK-21065) Spark Streaming concurrentJobs + StreamingJobProgressListener conflict
[ https://issues.apache.org/jira/browse/SPARK-21065?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16046649#comment-16046649 ] Dan Dutrow commented on SPARK-21065: Something to note: If one batch's processing time exceeds the batch interval, then a second batch could begin before the first is complete. This is fine behavior for us, and offers the ability to catch up if something gets delayed, but may be confusing for the scheduler. > Spark Streaming concurrentJobs + StreamingJobProgressListener conflict > -- > > Key: SPARK-21065 > URL: https://issues.apache.org/jira/browse/SPARK-21065 > (issue description quoted above; omitted)
[jira] [Updated] (SPARK-21065) Spark Streaming concurrentJobs + StreamingJobProgressListener conflict
[ https://issues.apache.org/jira/browse/SPARK-21065?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dan Dutrow updated SPARK-21065: --- Component/s: DStreams > Spark Streaming concurrentJobs + StreamingJobProgressListener conflict > -- > > Key: SPARK-21065 > URL: https://issues.apache.org/jira/browse/SPARK-21065 > (issue description quoted above; omitted)
[jira] [Updated] (SPARK-21065) Spark Streaming concurrentJobs + StreamingJobProgressListener conflict
[ https://issues.apache.org/jira/browse/SPARK-21065?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dan Dutrow updated SPARK-21065: --- Description: (updated; body identical to the issue description quoted above, so it is omitted here)
[jira] [Created] (SPARK-21065) Spark Streaming concurrentJobs + StreamingJobProgressListener conflict
Dan Dutrow created SPARK-21065: -- Summary: Spark Streaming concurrentJobs + StreamingJobProgressListener conflict Key: SPARK-21065 URL: https://issues.apache.org/jira/browse/SPARK-21065 Project: Spark Issue Type: Bug Components: Scheduler Affects Versions: 2.1.0 Reporter: Dan Dutrow (original report; body identical to the issue description quoted above, so it is omitted here)
[jira] [Comment Edited] (SPARK-6305) Add support for log4j 2.x to Spark
[ https://issues.apache.org/jira/browse/SPARK-6305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15964554#comment-15964554 ] Dan Dutrow edited comment on SPARK-6305 at 4/11/17 4:18 PM: Any update on the current status of this ticket? It would be particularly beneficial to our program, and presumably many others, if the KafkaAppender (available in log4j 2.x) could be provided to Spark workers for real-time log aggregation. https://logging.apache.org/log4j/2.x/manual/appenders.html#KafkaAppender This is not an easy thing to override in a Spark application because the native log4j loads with the CoarseGrainedExecutorBackend before the application. was (Author: dutrow): (same comment, with a slightly different opening sentence) > Add support for log4j 2.x to Spark > -- > > Key: SPARK-6305 > URL: https://issues.apache.org/jira/browse/SPARK-6305 > Project: Spark > Issue Type: Improvement > Components: Build >Reporter: Tal Sliwowicz >Priority: Minor > > log4j 2 requires replacing the slf4j binding and adding the log4j jars in the > classpath. Since there are shaded jars, this must be done during the build.
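For reference, a minimal log4j 2.x configuration using the KafkaAppender might look like the sketch below. The topic name, broker address, and layout pattern are placeholders; and, as the comment notes, the hard part is getting such a file onto the executor classpath before CoarseGrainedExecutorBackend initializes logging, which this fragment does not solve.

```xml
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="warn">
  <Appenders>
    <!-- Ships each log event to a Kafka topic (topic and broker are placeholders) -->
    <Kafka name="KafkaLog" topic="spark-app-logs">
      <PatternLayout pattern="%d{ISO8601} %-5p %c{1} - %m%n"/>
      <Property name="bootstrap.servers">kafka-broker:9092</Property>
    </Kafka>
  </Appenders>
  <Loggers>
    <Root level="info">
      <AppenderRef ref="KafkaLog"/>
    </Root>
    <!-- Keep the Kafka producer's own logs off the Kafka appender to avoid recursion -->
    <Logger name="org.apache.kafka" level="warn"/>
  </Loggers>
</Configuration>
```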
[jira] [Commented] (SPARK-6305) Add support for log4j 2.x to Spark
[ https://issues.apache.org/jira/browse/SPARK-6305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15964554#comment-15964554 ] Dan Dutrow commented on SPARK-6305: --- Could you provide an update on the current status of this ticket? It would be particularly beneficial to our program, and presumably many others, if the KafkaAppender (available in log4j 2.x) could be provided to Spark workers for real-time log aggregation. https://logging.apache.org/log4j/2.x/manual/appenders.html#KafkaAppender This is not an easy thing to override in a Spark application because the native log4j loads with the CoarseGrainedExecutorBackend before the application. > Add support for log4j 2.x to Spark > -- > > Key: SPARK-6305 > URL: https://issues.apache.org/jira/browse/SPARK-6305 > (issue description quoted above; omitted)
[jira] [Commented] (SPARK-11045) Contributing Receiver based Low Level Kafka Consumer from Spark-Packages to Apache Spark Project
[ https://issues.apache.org/jira/browse/SPARK-11045?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15111044#comment-15111044 ] Dan Dutrow commented on SPARK-11045: +1 to Dibyendu's comment that "Being at spark-packages, many [people do] not even consider using it [and] use the whatever Receiver Based model which is documented with Spark." Having hit limitations in both of the Receiver and Direct APIs, it would have been nice to have been pointed to the availability of alternatives. > Contributing Receiver based Low Level Kafka Consumer from Spark-Packages to > Apache Spark Project > > > Key: SPARK-11045 > URL: https://issues.apache.org/jira/browse/SPARK-11045 > Project: Spark > Issue Type: New Feature > Components: Streaming >Reporter: Dibyendu Bhattacharya > > This JIRA is to track the progress of making the Receiver based Low Level > Kafka Consumer from spark-packages > (http://spark-packages.org/package/dibbhatt/kafka-spark-consumer) to be > contributed back to Apache Spark Project. > This Kafka consumer has been around for more than year and has matured over > the time . I see there are many adoptions of this package . I receive > positive feedbacks that this consumer gives better performance and fault > tolerant capabilities. > This is the primary intent of this JIRA to give community a better > alternative if they want to use Receiver Base model. > If this consumer make it to Spark Core, it will definitely see more adoption > and support from community and help many who still prefer the Receiver Based > model of Kafka Consumer. > I understand the Direct Stream is the consumer which can give Exact Once > semantics and uses Kafka Low Level API , which is good . But Direct Stream > has concerns around recovering checkpoint on driver code change . Application > developer need to manage their own offset which complex . Even if some one > does manages their own offset , it limits the parallelism Spark Streaming can > achieve. 
If someone wants more parallelism and sets spark.streaming.concurrentJobs higher than 1, they can no longer rely on storing offsets externally, as they have no control over the sequence in which batches run. > Furthermore, the Direct Stream has higher latency, as it fetches messages from Kafka during the RDD action. Also, the number of RDD partitions is limited to the number of topic partitions, so unless your Kafka topic has enough partitions, you have limited parallelism during RDD processing. > Due to the above concerns, many people who do not need exactly-once semantics still prefer the Receiver-based model. Unfortunately, when customers fall back to the KafkaUtils.createStream approach, which uses the Kafka high-level consumer, there are other issues around the reliability of the Kafka high-level API: it is buggy and has a serious issue around consumer re-balance. Hence I do not think it is correct to advise people to use KafkaUtils.createStream in production. > The better option presently is to use the consumer from spark-packages. It uses the Kafka low-level consumer API, stores offsets in ZooKeeper, and can recover from any failure. Below are a few highlights of this consumer: > 1. It has an inbuilt PID controller for dynamic rate limiting. > 2. In this consumer, rate limiting is done by controlling the size of the blocks, i.e. the volume of bytes pulled from Kafka, whereas in Spark rate limiting is done by controlling the number of messages. The issue with throttling by number of messages is that if message sizes vary, block sizes will also vary. Say your Kafka topic has messages of different sizes, from 10 KB to 500 KB: throttling by message count can never give a deterministic block size, hence there is no guarantee that memory back-pressure can really take effect. > 3. This consumer uses the Kafka low-level API, which gives better performance than the KafkaUtils.createStream-based high-level API. > 4. This consumer can give an end-to-end no-data-loss channel if enabled with the WAL. > By accepting this low-level Kafka consumer from spark-packages into the Apache Spark project, we will give the community better options for Kafka connectivity, for both the receiver-less and Receiver-based models. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
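The count-versus-size throttling argument in point 2 can be made concrete with a small self-contained sketch. The names below are purely illustrative, not APIs from Spark or the kafka-spark-consumer package:

```scala
// Illustrative only: contrasts throttling by message count (as in Spark's
// receiver rate limit) with throttling by a byte budget (as described for
// the spark-packages consumer).
object ThrottlingSketch {
  // Count-based: take up to maxMessages, regardless of their size.
  def takeByCount(sizes: Seq[Int], maxMessages: Int): Seq[Int] =
    sizes.take(maxMessages)

  // Size-based: take messages until a byte budget is exhausted.
  def takeByBytes(sizes: Seq[Int], maxBytes: Int): Seq[Int] = {
    var remaining = maxBytes
    sizes.takeWhile { s =>
      val fits = s <= remaining
      if (fits) remaining -= s
      fits
    }
  }

  def main(args: Array[String]): Unit = {
    val kb = 1024
    val small = Seq.fill(1000)(10 * kb)   // 10 KB messages
    val large = Seq.fill(1000)(500 * kb)  // 500 KB messages
    // Same message-count limit, wildly different block sizes:
    println(takeByCount(small, 50).sum / kb)  // 500   (a 0.5 MB block)
    println(takeByCount(large, 50).sum / kb)  // 25000 (a ~25 MB block)
    // A byte budget keeps block size bounded whatever the message size:
    println(takeByBytes(small, 5120 * kb).sum / kb)  // 5120
    println(takeByBytes(large, 5120 * kb).sum / kb)  // 5000
  }
}
```

With a 5 MB byte budget the block size stays bounded whether messages are 10 KB or 500 KB, while the fixed message-count limit yields blocks anywhere from 0.5 MB to 25 MB, which is exactly the non-determinism the comment describes.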
[jira] [Commented] (SPARK-2388) Streaming from multiple different Kafka topics is problematic
[ https://issues.apache.org/jira/browse/SPARK-2388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15048885#comment-15048885 ] Dan Dutrow commented on SPARK-2388: --- The Kafka Direct API lets you insert a callback function that gives you access to the topic name and other metadata besides the key. > Streaming from multiple different Kafka topics is problematic > - > > Key: SPARK-2388 > URL: https://issues.apache.org/jira/browse/SPARK-2388 > Project: Spark > Issue Type: Improvement > Components: Streaming >Affects Versions: 1.0.0 >Reporter: Sergey > Fix For: 1.0.1 > > > The default way of creating a stream out of a Kafka source would be: > val stream = KafkaUtils.createStream(ssc, "localhost:2181", "logs", > Map("retarget" -> 2, "datapair" -> 2)) > However, if the two topics - in this case "retarget" and "datapair" - are very different, there is no way to set up different filters, mapping functions, etc., as they are effectively merged. > However, the instance of KafkaInputDStream created with this call internally calls ConsumerConnector.createMessageStreams(), which returns a *map* of KafkaStreams keyed by topic. It would be great if this map were exposed somehow, so that the aforementioned call > val streamS = KafkaUtils.createStreamS(...) > returned a map of streams. > Regards, > Sergey Malov > Collective Media
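A sketch of the callback Dan describes, against the Spark 1.x Kafka direct API (unverified here; note that the messageHandler overload of KafkaUtils.createDirectStream requires explicit starting offsets, and that ssc, kafkaParams, and the zero offsets below are assumptions for illustration):

```scala
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// Hypothetical starting offsets; in practice you would read these from
// your own offset store rather than hard-coding zero.
val fromOffsets = Map(
  TopicAndPartition("retarget", 0) -> 0L,
  TopicAndPartition("datapair", 0) -> 0L)

// The messageHandler callback sees topic, partition, and offset as well as
// the key, so each record can carry its topic name downstream.
val stream = KafkaUtils.createDirectStream[
    String, String, StringDecoder, StringDecoder, (String, String)](
  ssc, kafkaParams, fromOffsets,
  (mmd: MessageAndMetadata[String, String]) => (mmd.topic, mmd.message))
```

Each element of the resulting DStream is then a (topic, value) pair, so the per-topic filters and mapping functions the reporter asks for can be applied after the fact.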
[jira] [Commented] (SPARK-6051) Add an option for DirectKafkaInputDStream to commit the offsets into ZK
[ https://issues.apache.org/jira/browse/SPARK-6051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15048833#comment-15048833 ] Dan Dutrow commented on SPARK-6051: --- Is there documentation for how to update the metrics (# messages per batch) in the Spark Streaming tab when using the Direct API? Does the Streaming tab get its information from Zookeeper or something else internally? > Add an option for DirectKafkaInputDStream to commit the offsets into ZK > --- > > Key: SPARK-6051 > URL: https://issues.apache.org/jira/browse/SPARK-6051 > Project: Spark > Issue Type: Improvement > Components: Streaming >Affects Versions: 1.3.0 >Reporter: Saisai Shao > > Currently in DirectKafkaInputDStream, offsets are managed by Spark Streaming itself, without ZK or Kafka involved, which makes several third-party offset-monitoring tools fail to monitor the status of the Kafka consumer. So here is an option to commit the offsets to ZK when each job is finished. The commit is implemented asynchronously, so the main processing flow is not blocked; this has already been tested with the KafkaOffsetMonitor tool.
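The asynchronous commit described above boils down to handing the finished offsets to a background task so the processing flow never waits on ZooKeeper I/O. A minimal self-contained sketch, with commitToZk as a stand-in for a real ZooKeeper client call (not a Spark or Curator API):

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Sketch: after a batch finishes, fire off the offset commit on a background
// thread. A failure here only affects monitoring tools, not processing.
object AsyncOffsetCommit {
  def commitAsync(offsets: Map[Int, Long],
                  commitToZk: Map[Int, Long] => Unit): Future[Unit] =
    Future(commitToZk(offsets)) // fire and forget; caller is not blocked

  def main(args: Array[String]): Unit = {
    var committed = Map.empty[Int, Long]
    // Stand-in "ZK write" just records what it was given.
    val f = commitAsync(Map(0 -> 42L, 1 -> 99L), o => committed = o)
    Await.result(f, 5.seconds) // only for the demo; real code would not wait
    println(s"committed: $committed")
  }
}
```

The real patch would extract the per-partition offset ranges from each completed batch; the Map[Int, Long] here is a simplified (partition -> offset) stand-in.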
[jira] [Commented] (SPARK-12103) KafkaUtils createStream with multiple topics -- does not work as expected
[ https://issues.apache.org/jira/browse/SPARK-12103?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15038106#comment-15038106 ] Dan Dutrow commented on SPARK-12103: Thanks. I don't mean to be a pain. I did make an honest attempt to figure it out and I'm not completely numb, just inexperienced with Kafka. I was trying to find a better way to pool resources and not have to dedicate a core to each receiver. Having the key be the topic name was wishful thinking, and nothing in the documentation corrected that assumption. > KafkaUtils createStream with multiple topics -- does not work as expected > - > > Key: SPARK-12103 > URL: https://issues.apache.org/jira/browse/SPARK-12103 > Project: Spark > Issue Type: Improvement > Components: Documentation, Streaming >Affects Versions: 1.4.1 >Reporter: Dan Dutrow >Priority: Minor > Fix For: 1.4.2 > > > (Note: yes, there is a Direct API that may be better, but it's not the > easiest thing to get started with. The Kafka Receiver API still needs to > work, especially for newcomers) > When creating a receiver stream using KafkaUtils, there is a valid use case > where you would want to use one (or a few) Kafka Streaming Receivers to pool > resources. I have 10+ topics and don't want to dedicate 10 cores to > processing all of them. However, when reading the data produced by > KafkaUtils.createStream, the DStream[(String,String)] does not properly > insert the topic name into the tuple. The left key is always null, making it > impossible to know what topic the data came from, other than stashing your > key into the value. Is there a way around that problem? > CODE > val topics = Map("topicA" -> 1, "topicB" -> 1, "topicC" -> 1, "topicD" -> 1, > "topicE" -> 1, "topicF" -> 1, ...)
> val streams : IndexedSeq[ReceiverInputDStream[(String, String)]] = (1 to 3).map( > i => > KafkaUtils.createStream[String, String, StringDecoder, StringDecoder]( > ssc, consumerProperties, > topics, > StorageLevel.MEMORY_ONLY_SER)) > val unioned : DStream[(String, String)] = ssc.union(streams) > unioned.flatMap(x => { >val (key, value) = x > // key is always null! > // value has data from any one of my topics > key match ... { > .. > } > }) > END CODE
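One workaround for the null key, sketched against the Spark 1.x receiver API (untested; assumes the ssc, consumerProperties, and topics values from the report): create one receiver stream per topic and tag each record with its topic name before the union. Note this gives up the receiver pooling the report asks for, since each receiver still occupies a core.

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils

// One receiver per topic: each stream statically knows its topic, so it can
// put the topic name into the left slot that otherwise arrives null.
val perTopic: Seq[DStream[(String, String)]] = topics.keys.toSeq.map { topic =>
  KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
    ssc, consumerProperties, Map(topic -> 1), StorageLevel.MEMORY_ONLY_SER)
    .map { case (_, value) => (topic, value) } // left slot now carries topic
}
val tagged: DStream[(String, String)] = ssc.union(perTopic)
```

Downstream code can then match on the topic in the first tuple slot, at the cost of one dedicated core per topic.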
[jira] [Comment Edited] (SPARK-12103) KafkaUtils createStream with multiple topics -- does not work as expected
[ https://issues.apache.org/jira/browse/SPARK-12103?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15037955#comment-15037955 ] Dan Dutrow edited comment on SPARK-12103 at 12/3/15 4:16 PM: - I'm sure all of this is obvious to Kafka experts. Nowhere in the Spark scala function documentation does it say that the Left value is the Kafka key, except if you make a guess about what K is. You have to trace through Spark and Kafka code to figure it out. Please update the KafkaWordCount.scala and DirectKafkaWordCount.scala to include a comment about what is in the ._1 field. was (Author: dutrow): I'm sure all of this is obvious to Kafka experts. Nowhere in the Scala Spark documentation does it say that the Left value is the Kafka key, except if you make a guess about what K is. You have to trace through Spark and Kafka code to figure it out. Please update the KafkaWordCount.scala and DirectKafkaWordCount.scala to include a comment about what is in the ._1 field. > KafkaUtils createStream with multiple topics -- does not work as expected > - > > Key: SPARK-12103 > URL: https://issues.apache.org/jira/browse/SPARK-12103 > Project: Spark > Issue Type: Improvement > Components: Documentation, Streaming >Affects Versions: 1.4.1 >Reporter: Dan Dutrow >Priority: Minor > Fix For: 1.4.2 > > > (Note: yes, there is a Direct API that may be better, but it's not the > easiest thing to get started with. The Kafka Receiver API still needs to > work, especially for newcomers) > When creating a receiver stream using KafkaUtils, there is a valid use case > where you would want to use one (or a few) Kafka Streaming Receiver to pool > resources. I have 10+ topics and don't want to dedicate 10 cores to > processing all of them. However, when reading the data procuced by > KafkaUtils.createStream, the DStream[(String,String)] does not properly > insert the topic name into the tuple. 
The left-key always null, making it > impossible to know what topic that data came from other than stashing your > key into the value. Is there a way around that problem? > CODE > val topics = Map("topicA" -> 1, "topicB" -> 1, "topicC" -> 1, "topicD" -> 1, > "topicE" -> 1, "topicF" -> 1, ...) > val streams : IndexedSeq[ReceiverInputDStream[(String,String] = (1 to 3).map( > i => > KafkaUtils.createStream[String, String, StringDecoder, StringDecoder]( > ssc, consumerProperties, > topics, > StorageLevel.MEMORY_ONLY_SER)) > val unioned :DStream[(String,String)] = ssc.union(streams) > unioned.flatMap(x => { >val (key, value) = x > // key is always null! > // value has data from any one of my topics > key match ... { > .. > } > } > END CODE -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-12103) KafkaUtils createStream with multiple topics -- does not work as expected
[ https://issues.apache.org/jira/browse/SPARK-12103?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15037955#comment-15037955 ] Dan Dutrow edited comment on SPARK-12103 at 12/3/15 4:15 PM: - I'm sure all of this is obvious to Kafka experts. Nowhere in the Scala Spark documentation does it say that the Left value is the Kafka key, except if you make a guess about what K is. You have to trace through Spark and Kafka code to figure it out. Please update the KafkaWordCount.scala and DirectKafkaWordCount.scala to include a comment about what is in the ._1 field. was (Author: dutrow): I'm sure all of this is obvious to Kafka experts. Nowhere in the spark documentation does it say that the Left value is the Kafka key, except if you make a guess about what K is. You have to trace through Spark and Kafka code to figure it out. Please update the KafkaWordCount.scala and DirectKafkaWordCount.scala to include a comment about what is in the ._1 field. > KafkaUtils createStream with multiple topics -- does not work as expected > - > > Key: SPARK-12103 > URL: https://issues.apache.org/jira/browse/SPARK-12103 > Project: Spark > Issue Type: Improvement > Components: Documentation, Streaming >Affects Versions: 1.4.1 >Reporter: Dan Dutrow >Priority: Minor > Fix For: 1.4.2 > > > (Note: yes, there is a Direct API that may be better, but it's not the > easiest thing to get started with. The Kafka Receiver API still needs to > work, especially for newcomers) > When creating a receiver stream using KafkaUtils, there is a valid use case > where you would want to use one (or a few) Kafka Streaming Receiver to pool > resources. I have 10+ topics and don't want to dedicate 10 cores to > processing all of them. However, when reading the data procuced by > KafkaUtils.createStream, the DStream[(String,String)] does not properly > insert the topic name into the tuple. 
The left-key always null, making it > impossible to know what topic that data came from other than stashing your > key into the value. Is there a way around that problem? > CODE > val topics = Map("topicA" -> 1, "topicB" -> 1, "topicC" -> 1, "topicD" -> 1, > "topicE" -> 1, "topicF" -> 1, ...) > val streams : IndexedSeq[ReceiverInputDStream[(String,String] = (1 to 3).map( > i => > KafkaUtils.createStream[String, String, StringDecoder, StringDecoder]( > ssc, consumerProperties, > topics, > StorageLevel.MEMORY_ONLY_SER)) > val unioned :DStream[(String,String)] = ssc.union(streams) > unioned.flatMap(x => { >val (key, value) = x > // key is always null! > // value has data from any one of my topics > key match ... { > .. > } > } > END CODE -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Reopened] (SPARK-12103) KafkaUtils createStream with multiple topics -- does not work as expected
[ https://issues.apache.org/jira/browse/SPARK-12103?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dan Dutrow reopened SPARK-12103: Please update the Kafka word count examples to comment on what the key field is. The current examples ignore the key without explanation. > KafkaUtils createStream with multiple topics -- does not work as expected > - > > Key: SPARK-12103 > URL: https://issues.apache.org/jira/browse/SPARK-12103 > Project: Spark > Issue Type: Improvement > Components: Documentation, Streaming >Affects Versions: 1.4.1 >Reporter: Dan Dutrow >Priority: Minor > Fix For: 1.4.2 > > > (Note: yes, there is a Direct API that may be better, but it's not the > easiest thing to get started with. The Kafka Receiver API still needs to > work, especially for newcomers) > When creating a receiver stream using KafkaUtils, there is a valid use case > where you would want to use one (or a few) Kafka Streaming Receiver to pool > resources. I have 10+ topics and don't want to dedicate 10 cores to > processing all of them. However, when reading the data procuced by > KafkaUtils.createStream, the DStream[(String,String)] does not properly > insert the topic name into the tuple. The left-key always null, making it > impossible to know what topic that data came from other than stashing your > key into the value. Is there a way around that problem? > CODE > val topics = Map("topicA" -> 1, "topicB" -> 1, "topicC" -> 1, "topicD" -> 1, > "topicE" -> 1, "topicF" -> 1, ...) > val streams : IndexedSeq[ReceiverInputDStream[(String,String] = (1 to 3).map( > i => > KafkaUtils.createStream[String, String, StringDecoder, StringDecoder]( > ssc, consumerProperties, > topics, > StorageLevel.MEMORY_ONLY_SER)) > val unioned :DStream[(String,String)] = ssc.union(streams) > unioned.flatMap(x => { >val (key, value) = x > // key is always null! > // value has data from any one of my topics > key match ... { > .. 
> } > } > END CODE
[jira] [Comment Edited] (SPARK-12103) KafkaUtils createStream with multiple topics -- does not work as expected
[ https://issues.apache.org/jira/browse/SPARK-12103?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15037955#comment-15037955 ] Dan Dutrow edited comment on SPARK-12103 at 12/3/15 3:53 PM: - I'm sure all of this is obvious to Kafka experts. Nowhere in the spark documentation does it say that the Left value is the Kafka key, except if you make a guess about what K is. You have to trace through Spark and Kafka code to figure it out. Please update the KafkaWordCount.scala and DirectKafkaWordCount.scala to include a comment about what is in the ._1 field. was (Author: dutrow): I'm sure all of this is obvious to Kafka experts. Nowhere in the spark documentation does it say that the Left value is the Kafka key, except if you make a guess about what K is. You have to trace through Spark and Kafka code to figure it out. Please update the DirectKafkaWordCount.scala to include a comment about what is in the ._1 field. > KafkaUtils createStream with multiple topics -- does not work as expected > - > > Key: SPARK-12103 > URL: https://issues.apache.org/jira/browse/SPARK-12103 > Project: Spark > Issue Type: Improvement > Components: Documentation, Streaming >Affects Versions: 1.4.1 >Reporter: Dan Dutrow >Priority: Minor > Fix For: 1.4.2 > > > (Note: yes, there is a Direct API that may be better, but it's not the > easiest thing to get started with. The Kafka Receiver API still needs to > work, especially for newcomers) > When creating a receiver stream using KafkaUtils, there is a valid use case > where you would want to use one (or a few) Kafka Streaming Receiver to pool > resources. I have 10+ topics and don't want to dedicate 10 cores to > processing all of them. However, when reading the data procuced by > KafkaUtils.createStream, the DStream[(String,String)] does not properly > insert the topic name into the tuple. The left-key always null, making it > impossible to know what topic that data came from other than stashing your > key into the value. 
Is there a way around that problem? > CODE > val topics = Map("topicA" -> 1, "topicB" -> 1, "topicC" -> 1, "topicD" -> 1, > "topicE" -> 1, "topicF" -> 1, ...) > val streams : IndexedSeq[ReceiverInputDStream[(String,String] = (1 to 3).map( > i => > KafkaUtils.createStream[String, String, StringDecoder, StringDecoder]( > ssc, consumerProperties, > topics, > StorageLevel.MEMORY_ONLY_SER)) > val unioned :DStream[(String,String)] = ssc.union(streams) > unioned.flatMap(x => { >val (key, value) = x > // key is always null! > // value has data from any one of my topics > key match ... { > .. > } > } > END CODE -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-12103) KafkaUtils createStream with multiple topics -- does not work as expected
[ https://issues.apache.org/jira/browse/SPARK-12103?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15037955#comment-15037955 ] Dan Dutrow commented on SPARK-12103: I'm sure all of this is obvious to Kafka experts. Nowhere in the spark documentation does it say that the Left value is the Kafka key, except if you make a guess about what K is. You have to trace through Spark and Kafka code to figure it out. Please update the DirectKafkaWordCount.scala to include a comment about what is in the ._1 field. > KafkaUtils createStream with multiple topics -- does not work as expected > - > > Key: SPARK-12103 > URL: https://issues.apache.org/jira/browse/SPARK-12103 > Project: Spark > Issue Type: Improvement > Components: Documentation, Streaming >Affects Versions: 1.4.1 >Reporter: Dan Dutrow >Priority: Minor > Fix For: 1.4.2 > > > (Note: yes, there is a Direct API that may be better, but it's not the > easiest thing to get started with. The Kafka Receiver API still needs to > work, especially for newcomers) > When creating a receiver stream using KafkaUtils, there is a valid use case > where you would want to use one (or a few) Kafka Streaming Receiver to pool > resources. I have 10+ topics and don't want to dedicate 10 cores to > processing all of them. However, when reading the data procuced by > KafkaUtils.createStream, the DStream[(String,String)] does not properly > insert the topic name into the tuple. The left-key always null, making it > impossible to know what topic that data came from other than stashing your > key into the value. Is there a way around that problem? > CODE > val topics = Map("topicA" -> 1, "topicB" -> 1, "topicC" -> 1, "topicD" -> 1, > "topicE" -> 1, "topicF" -> 1, ...) 
> val streams : IndexedSeq[ReceiverInputDStream[(String,String] = (1 to 3).map( > i => > KafkaUtils.createStream[String, String, StringDecoder, StringDecoder]( > ssc, consumerProperties, > topics, > StorageLevel.MEMORY_ONLY_SER)) > val unioned :DStream[(String,String)] = ssc.union(streams) > unioned.flatMap(x => { >val (key, value) = x > // key is always null! > // value has data from any one of my topics > key match ... { > .. > } > } > END CODE -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-12103) KafkaUtils createStream with multiple topics -- does not work as expected
[ https://issues.apache.org/jira/browse/SPARK-12103?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15038007#comment-15038007 ] Dan Dutrow commented on SPARK-12103: Aha! I finally found where it's documented! It's in the Java API javadoc! I'm not in the habit of looking at the API for functions and languages I'm not using, but this will teach me! > KafkaUtils createStream with multiple topics -- does not work as expected > - > > Key: SPARK-12103 > URL: https://issues.apache.org/jira/browse/SPARK-12103 > Project: Spark > Issue Type: Improvement > Components: Documentation, Streaming >Affects Versions: 1.4.1 >Reporter: Dan Dutrow >Priority: Minor > Fix For: 1.4.2 > > > (Note: yes, there is a Direct API that may be better, but it's not the > easiest thing to get started with. The Kafka Receiver API still needs to > work, especially for newcomers) > When creating a receiver stream using KafkaUtils, there is a valid use case > where you would want to use one (or a few) Kafka Streaming Receiver to pool > resources. I have 10+ topics and don't want to dedicate 10 cores to > processing all of them. However, when reading the data procuced by > KafkaUtils.createStream, the DStream[(String,String)] does not properly > insert the topic name into the tuple. The left-key always null, making it > impossible to know what topic that data came from other than stashing your > key into the value. Is there a way around that problem? > CODE > val topics = Map("topicA" -> 1, "topicB" -> 1, "topicC" -> 1, "topicD" -> 1, > "topicE" -> 1, "topicF" -> 1, ...) > val streams : IndexedSeq[ReceiverInputDStream[(String,String] = (1 to 3).map( > i => > KafkaUtils.createStream[String, String, StringDecoder, StringDecoder]( > ssc, consumerProperties, > topics, > StorageLevel.MEMORY_ONLY_SER)) > val unioned :DStream[(String,String)] = ssc.union(streams) > unioned.flatMap(x => { >val (key, value) = x > // key is always null! 
> // value has data from any one of my topics > key match ... { > .. > } > } > END CODE
[jira] [Closed] (SPARK-12103) KafkaUtils createStream with multiple topics -- does not work as expected
[ https://issues.apache.org/jira/browse/SPARK-12103?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dan Dutrow closed SPARK-12103. -- Resolution: Won't Fix > KafkaUtils createStream with multiple topics -- does not work as expected > - > > Key: SPARK-12103 > URL: https://issues.apache.org/jira/browse/SPARK-12103 > Project: Spark > Issue Type: Improvement > Components: Documentation, Streaming >Affects Versions: 1.4.1 >Reporter: Dan Dutrow >Priority: Minor > Fix For: 1.4.2 > > > (Note: yes, there is a Direct API that may be better, but it's not the > easiest thing to get started with. The Kafka Receiver API still needs to > work, especially for newcomers) > When creating a receiver stream using KafkaUtils, there is a valid use case > where you would want to use one (or a few) Kafka Streaming Receiver to pool > resources. I have 10+ topics and don't want to dedicate 10 cores to > processing all of them. However, when reading the data procuced by > KafkaUtils.createStream, the DStream[(String,String)] does not properly > insert the topic name into the tuple. The left-key always null, making it > impossible to know what topic that data came from other than stashing your > key into the value. Is there a way around that problem? > CODE > val topics = Map("topicA" -> 1, "topicB" -> 1, "topicC" -> 1, "topicD" -> 1, > "topicE" -> 1, "topicF" -> 1, ...) > val streams : IndexedSeq[ReceiverInputDStream[(String,String] = (1 to 3).map( > i => > KafkaUtils.createStream[String, String, StringDecoder, StringDecoder]( > ssc, consumerProperties, > topics, > StorageLevel.MEMORY_ONLY_SER)) > val unioned :DStream[(String,String)] = ssc.union(streams) > unioned.flatMap(x => { >val (key, value) = x > // key is always null! > // value has data from any one of my topics > key match ... { > .. 
> } > } > END CODE
[jira] [Updated] (SPARK-12103) KafkaUtils createStream with multiple topics -- does not work as expected
[ https://issues.apache.org/jira/browse/SPARK-12103?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dan Dutrow updated SPARK-12103: --- Description: (Note: yes, there is a Direct API that may be better, but it's not the easiest thing to get started with. The Kafka Receiver API still needs to work, especially for newcomers) When creating a receiver stream using KafkaUtils, there is a valid use case where you would want to pool resources. I have 10+ topics and don't want to dedicate 10 cores to processing all of them. However, when reading the data procuced by KafkaUtils.createStream, the DStream[(String,String)] does not properly insert the topic name into the tuple. The left-key always null, making it impossible to know what topic that data came from other than stashing your key into the value. Is there a way around that problem? CODE val topics = Map("topicA" -> 1, "topicB" -> 1, "topicC" -> 1, "topicD" -> 1, "topicE" -> 1, "topicF" -> 1, ...) val streams : IndexedSeq[ReceiverInputDStream[(String,String] = (1 to 3).map( i => KafkaUtils.createStream[String, String, StringDecoder, StringDecoder]( ssc, consumerProperties, topics, StorageLevel.MEMORY_ONLY_SER(( val unioned :DStream[(String,String)] = ssc.union(streams) unioned.flatMap(x => { val (key, value) = x // key is always null! // value has data from any one of my topics key match ... { .. } } END CODE was: (Note: yes, there is a Direct API that may be better, but it's not the easiest thing to get started with. The Kafka Receiver API still needs to work, especially for newcomers) When creating a receiver stream using KafkaUtils, there is a valid use case where you would want to pool resources. I have 10+ topics and don't want to dedicate 10 cores to processing all of them. However, when reading the data procuced by KafkaUtils.createStream, the DStream[(String,String)] does not properly insert the topic name into the tuple. 
The left-key always null, making it impossible to know what topic that data came from other than stashing your key into the value. Is there a way around that problem? CODE val topics = Map("topicA" -> 1, "topicB" -> 1, "topicC" -> 1, "topicD" -> 1, "topicE" -> 1, "topicF" -> 1, ...) val streams : IndexedSeq[ReceiverInputDStream[(String,String] = (1 to 3).map( i => KafkaUtils.createStream[STring, STring, StringDecoder, StringDecoder]( ssc, consumerProperties, topics, StorageLevel.MEMORY_ONLY_SER(( val unioned :DStream[(String,String)] = ssc.union(streams) unioned.flatMap(x => { val (key, value) = x // key is always null! // value has data from any one of my topics key match ... { .. } } END CODE > KafkaUtils createStream with multiple topics -- does not work as expected > - > > Key: SPARK-12103 > URL: https://issues.apache.org/jira/browse/SPARK-12103 > Project: Spark > Issue Type: Improvement > Components: Streaming >Affects Versions: 1.0.0 >Reporter: Dan Dutrow > Fix For: 1.0.1 > > > (Note: yes, there is a Direct API that may be better, but it's not the > easiest thing to get started with. The Kafka Receiver API still needs to > work, especially for newcomers) > When creating a receiver stream using KafkaUtils, there is a valid use case > where you would want to pool resources. I have 10+ topics and don't want to > dedicate 10 cores to processing all of them. However, when reading the data > procuced by KafkaUtils.createStream, the DStream[(String,String)] does not > properly insert the topic name into the tuple. The left-key always null, > making it impossible to know what topic that data came from other than > stashing your key into the value. Is there a way around that problem? > CODE > val topics = Map("topicA" -> 1, "topicB" -> 1, "topicC" -> 1, "topicD" -> 1, > "topicE" -> 1, "topicF" -> 1, ...) 
> val streams : IndexedSeq[ReceiverInputDStream[(String, String)]] = (1 to 3).map( > i => > KafkaUtils.createStream[String, String, StringDecoder, StringDecoder]( > ssc, consumerProperties, > topics, > StorageLevel.MEMORY_ONLY_SER)) > val unioned : DStream[(String, String)] = ssc.union(streams) > unioned.flatMap(x => { >val (key, value) = x > // key is always null! > // value has data from any one of my topics > key match ... { > .. > } > }) > END CODE
[jira] [Updated] (SPARK-12103) KafkaUtils createStream with multiple topics -- does not work as expected
[ https://issues.apache.org/jira/browse/SPARK-12103?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dan Dutrow updated SPARK-12103: --- Fix Version/s: (was: 1.0.1) 1.4.2 > KafkaUtils createStream with multiple topics -- does not work as expected > - > > Key: SPARK-12103 > URL: https://issues.apache.org/jira/browse/SPARK-12103 > Project: Spark > Issue Type: Improvement > Components: Streaming >Affects Versions: 1.0.0, 1.1.0, 1.2.0, 1.3.0, 1.4.0, 1.4.1 >Reporter: Dan Dutrow > Fix For: 1.4.2 > > > (Note: yes, there is a Direct API that may be better, but it's not the > easiest thing to get started with. The Kafka Receiver API still needs to > work, especially for newcomers) > When creating a receiver stream using KafkaUtils, there is a valid use case > where you would want to pool resources. I have 10+ topics and don't want to > dedicate 10 cores to processing all of them. However, when reading the data > procuced by KafkaUtils.createStream, the DStream[(String,String)] does not > properly insert the topic name into the tuple. The left-key always null, > making it impossible to know what topic that data came from other than > stashing your key into the value. Is there a way around that problem? > CODE > val topics = Map("topicA" -> 1, "topicB" -> 1, "topicC" -> 1, "topicD" -> 1, > "topicE" -> 1, "topicF" -> 1, ...) > val streams : IndexedSeq[ReceiverInputDStream[(String,String] = (1 to 3).map( > i => > KafkaUtils.createStream[String, String, StringDecoder, StringDecoder]( > ssc, consumerProperties, > topics, > StorageLevel.MEMORY_ONLY_SER)) > val unioned :DStream[(String,String)] = ssc.union(streams) > unioned.flatMap(x => { >val (key, value) = x > // key is always null! > // value has data from any one of my topics > key match ... { > .. > } > } > END CODE -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-12103) KafkaUtils createStream with multiple topics -- does not work as expected
[ https://issues.apache.org/jira/browse/SPARK-12103?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dan Dutrow updated SPARK-12103: --- Affects Version/s: 1.1.0 1.2.0 1.3.0 1.4.0 1.4.1

> KafkaUtils createStream with multiple topics -- does not work as expected
[jira] [Updated] (SPARK-12103) KafkaUtils createStream with multiple topics -- does not work as expected
[ https://issues.apache.org/jira/browse/SPARK-12103?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dan Dutrow updated SPARK-12103: --- Description: (Note: yes, there is a Direct API that may be better, but it's not the easiest thing to get started with. The Kafka Receiver API still needs to work, especially for newcomers.) When creating a receiver stream using KafkaUtils, there is a valid use case where you would want to pool resources. I have 10+ topics and don't want to dedicate 10 cores to processing all of them. However, when reading the data produced by KafkaUtils.createStream, the DStream[(String,String)] does not properly insert the topic name into the tuple. The left key is always null, making it impossible to know which topic the data came from, other than stashing the topic name into the value. Is there a way around that problem?

CODE
val topics = Map("topicA" -> 1, "topicB" -> 1, "topicC" -> 1, "topicD" -> 1, "topicE" -> 1, "topicF" -> 1, ...)
val streams: IndexedSeq[ReceiverInputDStream[(String, String)]] = (1 to 3).map(i =>
  KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
    ssc, consumerProperties, topics, StorageLevel.MEMORY_ONLY_SER))
val unioned: DStream[(String, String)] = ssc.union(streams)
unioned.flatMap { case (key, value) =>
  // key is always null!
  // value has data from any one of my topics
  key match { ... }
}
END CODE
[jira] [Created] (SPARK-12103) KafkaUtils createStream with multiple topics -- does not work as expected
Dan Dutrow created SPARK-12103: -- Summary: KafkaUtils createStream with multiple topics -- does not work as expected Key: SPARK-12103 URL: https://issues.apache.org/jira/browse/SPARK-12103 Project: Spark Issue Type: Improvement Components: Streaming Affects Versions: 1.0.0 Reporter: Dan Dutrow Fix For: 1.0.1

The default way of creating a stream from a Kafka source would be:

val stream = KafkaUtils.createStream(ssc, "localhost:2181", "logs", Map("retarget" -> 2, "datapair" -> 2))

However, if the two topics -- in this case "retarget" and "datapair" -- are very different, there is no way to set up different filters, mapping functions, etc., as they are effectively merged. The KafkaInputDStream created by this call internally invokes ConsumerConnector.createMessageStreams(), which returns a *map* of KafkaStreams, keyed by topic. It would be great if this map were exposed somehow, so that a call such as

val streamS = KafkaUtils.createStreamS(...)

returned a map of streams. Regards, Sergey Malov Collective Media
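The per-topic keying asked for above can be approximated on the receiver API by creating one stream per topic and tagging records before the union. A minimal, untested sketch (the helper name `taggedUnion` is invented here; the `createStream` signature assumed is the Spark 1.x receiver-based Kafka API):

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils

// Sketch: one receiver per topic, replacing the (often null) message key
// with the topic name so the unioned stream retains topic identity.
def taggedUnion(ssc: StreamingContext,
                consumerProperties: Map[String, String],
                topicNames: Seq[String]): DStream[(String, String)] = {
  val perTopic: Seq[DStream[(String, String)]] = topicNames.map { topic =>
    KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
        ssc, consumerProperties, Map(topic -> 1), StorageLevel.MEMORY_ONLY_SER)
      .map { case (_, value) => (topic, value) }
  }
  ssc.union(perTopic)
}
```

Each receiver still occupies a core, so this trades the pooling benefit for topic visibility; it only helps when the topic count is small.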
[jira] [Updated] (SPARK-12103) KafkaUtils createStream with multiple topics -- does not work as expected
[ https://issues.apache.org/jira/browse/SPARK-12103?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dan Dutrow updated SPARK-12103: --- Description: (Note: yes, there is a Direct API that may be better, but it's not the easiest thing to get started with. The Kafka Receiver API still needs to work, especially for newcomers.) When creating a receiver stream using KafkaUtils, there is a valid use case where you would want to pool resources. I have 10+ topics and don't want to dedicate 10 cores to processing all of them. However, when reading the data produced by KafkaUtils.createStream, the DStream[(String,String)] does not properly insert the topic name into the tuple. The left key is always null, making it impossible to know which topic the data came from, other than stashing the topic name into the value. Is there a way around that problem?

CODE
val topics = Map("topicA" -> 1, "topicB" -> 1, "topicC" -> 1, "topicD" -> 1, "topicE" -> 1, "topicF" -> 1, ...)
val streams: IndexedSeq[ReceiverInputDStream[(String, String)]] = (1 to 3).map(i =>
  KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
    ssc, consumerProperties, topics, StorageLevel.MEMORY_ONLY_SER))
val unioned: DStream[(String, String)] = ssc.union(streams)
unioned.flatMap { case (key, value) =>
  // key is always null!
  // value has data from any one of my topics
  key match { ... }
}
END CODE
[jira] [Updated] (SPARK-12103) KafkaUtils createStream with multiple topics -- does not work as expected
[ https://issues.apache.org/jira/browse/SPARK-12103?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dan Dutrow updated SPARK-12103: --- Target Version/s: 1.4.2, 1.6.1 (was: 1.0.1)

> KafkaUtils createStream with multiple topics -- does not work as expected
[jira] [Updated] (SPARK-12103) KafkaUtils createStream with multiple topics -- does not work as expected
[ https://issues.apache.org/jira/browse/SPARK-12103?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dan Dutrow updated SPARK-12103: --- Description: (Note: yes, there is a Direct API that may be better, but it's not the easiest thing to get started with. The Kafka Receiver API still needs to work, especially for newcomers.) When creating a receiver stream using KafkaUtils, there is a valid use case where you would want to use one (or a few) Kafka Streaming Receivers to pool resources. I have 10+ topics and don't want to dedicate 10 cores to processing all of them. However, when reading the data produced by KafkaUtils.createStream, the DStream[(String,String)] does not properly insert the topic name into the tuple. The left key is always null, making it impossible to know which topic the data came from, other than stashing the topic name into the value. Is there a way around that problem?

CODE
val topics = Map("topicA" -> 1, "topicB" -> 1, "topicC" -> 1, "topicD" -> 1, "topicE" -> 1, "topicF" -> 1, ...)
val streams: IndexedSeq[ReceiverInputDStream[(String, String)]] = (1 to 3).map(i =>
  KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
    ssc, consumerProperties, topics, StorageLevel.MEMORY_ONLY_SER))
val unioned: DStream[(String, String)] = ssc.union(streams)
unioned.flatMap { case (key, value) =>
  // key is always null!
  // value has data from any one of my topics
  key match { ... }
}
END CODE
[jira] [Commented] (SPARK-12103) KafkaUtils createStream with multiple topics -- does not work as expected
[ https://issues.apache.org/jira/browse/SPARK-12103?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15036741#comment-15036741 ] Dan Dutrow commented on SPARK-12103: One possible way around this problem would be to stick the topic name into the message key. Unless the message key is supposed to be unique, this would allow you to retrieve the topic name from the data.
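On the producer side, that suggestion could look like the following sketch (illustrative only; it assumes the Kafka 0.8.x `kafka.producer` API that the receiver stream consumes, and `brokers` is a placeholder):

```scala
import java.util.Properties
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

// Sketch: publish each record with the topic name as the message key, so the
// receiver's DStream[(String, String)] tuples carry the topic on the left.
def sendTagged(brokers: String, topic: String, payload: String): Unit = {
  val props = new Properties()
  props.put("metadata.broker.list", brokers)
  props.put("serializer.class", "kafka.serializer.StringEncoder")
  val producer = new Producer[String, String](new ProducerConfig(props))
  // KeyedMessage(topic, key, message): the key is set to the topic name
  producer.send(new KeyedMessage[String, String](topic, topic, payload))
  producer.close()
}
```

This only works when the application controls the producers and does not already rely on the message key for partitioning or log compaction.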
[jira] [Commented] (SPARK-12103) KafkaUtils createStream with multiple topics -- does not work as expected
[ https://issues.apache.org/jira/browse/SPARK-12103?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15036625#comment-15036625 ] Dan Dutrow commented on SPARK-12103: After digging into the Kafka code some more (specifically kafka.consumer.KafkaStream, kafka.consumer.ConsumerIterator and kafka.message.MessageAndMetadata), it appears that the left value of the tuple is not the topic name but rather the key that Kafka puts on each message. See http://kafka.apache.org/documentation.html#compaction I don't see a way around this without hacking KafkaStream and ConsumerIterator to return the topic name instead of the message key. The return value should probably be clarified in the documentation.
[jira] [Comment Edited] (SPARK-12103) KafkaUtils createStream with multiple topics -- does not work as expected
[ https://issues.apache.org/jira/browse/SPARK-12103?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15036625#comment-15036625 ] Dan Dutrow edited comment on SPARK-12103 at 12/2/15 10:38 PM: -- After digging into the Kafka code some more (specifically kafka.consumer.KafkaStream, kafka.consumer.ConsumerIterator and kafka.message.MessageAndMetadata), it appears that the left value of the tuple is not the topic name but rather the key that Kafka puts on each message. See http://kafka.apache.org/documentation.html#impl_producer I don't see a way around this without hacking KafkaStream and ConsumerIterator to return the topic name instead of the message key. The return value should probably be clarified in the documentation. was (Author: dutrow): After digging into the Kafka code some more (specifically kafka.consumer.KafkaStream, kafka.consumer.ConsumerIterator and kafka.message.MessageAndMetadata), it appears that the left value of the tuple is not the topic name but rather the key that Kafka puts on each message. See http://kafka.apache.org/documentation.html#compaction I don't see a way around this without hacking KafkaStream and ConsumerIterator to return the topic name instead of the message key. The return value should probably be clarified in the documentation.
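For reference, the Direct API mentioned at the top of the ticket does expose the topic without touching the message key: the `createDirectStream` overload that takes a `messageHandler` receives `MessageAndMetadata`, which carries `topic`. A sketch assuming Spark 1.3+ and Kafka 0.8 (the zero starting offsets in `fromOffsets` are placeholders; a real job would supply stored offsets and the actual partition counts):

```scala
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.KafkaUtils

// Sketch: emit (topic, message) pairs via the messageHandler, so neither a
// receiver nor a message-key hack is needed to recover the topic name.
def topicTagged(ssc: StreamingContext,
                kafkaParams: Map[String, String],
                topics: Set[String]): InputDStream[(String, String)] = {
  // Placeholder: start every topic's partition 0 at offset 0.
  val fromOffsets: Map[TopicAndPartition, Long] =
    topics.map(t => TopicAndPartition(t, 0) -> 0L).toMap
  KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder,
                                (String, String)](
    ssc, kafkaParams, fromOffsets,
    (mmd: MessageAndMetadata[String, String]) => (mmd.topic, mmd.message()))
}
```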
[jira] [Commented] (SPARK-9947) Separate Metadata and State Checkpoint Data
[ https://issues.apache.org/jira/browse/SPARK-9947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14697157#comment-14697157 ] Dan Dutrow commented on SPARK-9947: --- The desire is to continue using checkpointing for everything but to allow selective deletion of certain types of checkpoint data. Using an external database to duplicate that functionality is not desired.

Separate Metadata and State Checkpoint Data
-------------------------------------------

Key: SPARK-9947 URL: https://issues.apache.org/jira/browse/SPARK-9947 Project: Spark Issue Type: Improvement Components: Streaming Affects Versions: 1.4.1 Reporter: Dan Dutrow Original Estimate: 168h Remaining Estimate: 168h

Problem: When updating an application that has checkpointing enabled to support updateStateByKey and 24/7 operation, you encounter the problem where you might like to maintain state data between restarts but delete the metadata containing execution state. If checkpoint data exists between code redeployments, the program may not execute properly, or at all. My current workaround for this issue is to wrap updateStateByKey with my own function that persists the state after every update to my own separate directory. (That allows me to delete the checkpoint with its metadata before redeploying.) Then, when I restart the application, I initialize the state with this persisted data. This incurs additional overhead due to persisting the same data twice: once in the checkpoint and once in my persisted data folder. If Kafka Direct API offsets could be stored in another separate checkpoint directory, that would help address the problem of having to blow that away between code redeployments as well.
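The workaround described in the issue body, persisting state next to the checkpoint and re-seeding it on restart, can be sketched roughly as follows (illustrative assumptions: a `Long` count as the state type, `objectFile` snapshots, invented names, and the `initialRDD` overload of `updateStateByKey` available from Spark 1.3):

```scala
import org.apache.spark.HashPartitioner
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream

// Sketch: keep state recoverable across redeployments even after the
// checkpoint directory is deleted, by snapshotting it every batch and
// seeding updateStateByKey from the newest snapshot on restart.
def countsWithRecoverableState(events: DStream[(String, Long)],
                               stateDir: String,
                               previousSnapshot: Option[String]): DStream[(String, Long)] = {
  val sc = events.context.sparkContext
  val initial: RDD[(String, Long)] = previousSnapshot
    .map(path => sc.objectFile[(String, Long)](path))
    .getOrElse(sc.emptyRDD[(String, Long)])

  val updated = events.updateStateByKey[Long](
    (values: Seq[Long], state: Option[Long]) =>
      Some(state.getOrElse(0L) + values.sum),
    new HashPartitioner(sc.defaultParallelism),
    initial)

  // Snapshot under a timestamped path; the newest one seeds the next deploy.
  updated.foreachRDD { (rdd, time) =>
    rdd.saveAsObjectFile(s"$stateDir/state-${time.milliseconds}")
  }
  updated
}
```

The double-write cost the reporter mentions remains; this sketch only makes the second copy explicit and survivable.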
[jira] [Commented] (SPARK-9947) Separate Metadata and State Checkpoint Data
[ https://issues.apache.org/jira/browse/SPARK-9947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14697180#comment-14697180 ] Dan Dutrow commented on SPARK-9947: --- I want to maintain the data in updateStateByKey between upgrades. This data can be recovered between upgrades so long as the objects don't change.
[jira] [Commented] (SPARK-9947) Separate Metadata and State Checkpoint Data
[ https://issues.apache.org/jira/browse/SPARK-9947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14697220#comment-14697220 ] Dan Dutrow commented on SPARK-9947: --- Ok, right, we're saving the same state that is output from updateStateByKey to HDFS. The thought is that updateStateByKey may be saving exactly the same data in the checkpoint as I have to save in my own function. Allowing separation of the different types of data stored in the checkpoint might let me avoid saving the same state data twice.
[jira] [Updated] (SPARK-9947) Separate Metadata and State Checkpoint Data
[ https://issues.apache.org/jira/browse/SPARK-9947?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dan Dutrow updated SPARK-9947:
--
Priority: Major (was: Minor)

> Separate Metadata and State Checkpoint Data
> -------------------------------------------
[jira] [Updated] (SPARK-9947) Separate Metadata and State Checkpoint Data
[ https://issues.apache.org/jira/browse/SPARK-9947?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dan Dutrow updated SPARK-9947:
--
Description:
Problem: When updating an application that has checkpointing enabled to support updateStateByKey and 24/7 operation, you encounter the problem that you might like to maintain state data between restarts but delete the metadata containing execution state. If checkpoint data exists between code redeployments, the program may not execute properly, or at all.
My current workaround for this issue is to wrap updateStateByKey with my own function that persists the state after every update to my own separate directory. (That allows me to delete the checkpoint, with its metadata, before redeploying.) Then, when I restart the application, I initialize the state with this persisted data. This incurs additional overhead because the same data is persisted twice: once in the checkpoint and once in my persisted data folder.
If Kafka Direct API offsets could be stored in another separate checkpoint directory, that would also help address the problem of having to blow the checkpoint away between code redeployments.

was:
Problem: When updating an application that has checkpointing enabled to support updateStateByKey, you encounter the problem that you might like to maintain state data between restarts but delete the metadata containing execution state. If checkpoint data exists between code redeployments, the program may not execute properly, or at all.
My current workaround for this issue is to wrap updateStateByKey with my own function that persists the state after every update to my own separate directory. (That allows me to delete the checkpoint, with its metadata, before redeploying.) Then, when I restart the application, I initialize the state with this persisted data.
This incurs additional overhead because the same data is persisted twice: once in the checkpoint and once in my persisted data folder. If Kafka Direct API offsets could be stored in another separate checkpoint directory, that would also help address the problem of having to blow the checkpoint away between code redeployments.

> Separate Metadata and State Checkpoint Data
> -------------------------------------------
[jira] [Created] (SPARK-9947) Separate Metadata and State Checkpoint Data
Dan Dutrow created SPARK-9947:
--
Summary: Separate Metadata and State Checkpoint Data
Key: SPARK-9947
URL: https://issues.apache.org/jira/browse/SPARK-9947
Project: Spark
Issue Type: Improvement
Components: Streaming
Reporter: Dan Dutrow

This is the proposal. The simpler direct API (the one that does not take explicit offsets) can be modified to also pick up the initial offset from ZK if group.id is specified. This is exactly analogous to how we find the latest or earliest offset in that API, except that instead of the latest/earliest offset of the topic we want to find the offset of the consumer group. The group offsets in ZK are not used at all for any further processing or restarting, so the exactly-once semantics are not broken.

The use case where this is useful is simplified code upgrade. If users want to upgrade the code, they can stop the context gracefully, which ensures the ZK consumer group offset is updated with the last offsets processed. Then the new code, started fresh (not restarted from checkpoint), can pick up the consumer group offset from ZK and continue where the previous code left off. Without the ability to pick up consumer group offsets at startup (that is, currently), the only way to do this is for users to save the offsets somewhere (file, database, etc.) and manage the offsets themselves. I just want to simplify this process.
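The manual bookkeeping this proposal wants to make unnecessary — saving offsets "somewhere (file, database, etc.)" after each batch and restoring them at startup — looks roughly like this when the "somewhere" is a plain CSV file. This is a Spark-free sketch under stated assumptions: the OffsetStore name and file format are invented for illustration, and in a real job the loaded map would be passed to the offset-taking variant of the direct stream API.

```scala
import java.io.{File, PrintWriter}
import scala.io.Source

// Hypothetical offset bookkeeping: record (topic, partition) -> offset to a
// plain file after each batch, and read it back on startup to resume where
// the previous run left off.
object OffsetStore {
  // Overwrite the offset file with the latest processed offsets.
  def save(offsets: Map[(String, Int), Long], file: File): Unit = {
    val pw = new PrintWriter(file)
    try offsets.foreach { case ((topic, part), off) => pw.println(s"$topic,$part,$off") }
    finally pw.close()
  }

  // Load saved offsets; empty if no file exists yet (first run).
  def load(file: File): Map[(String, Int), Long] = {
    if (!file.exists()) Map.empty
    else {
      val src = Source.fromFile(file)
      try {
        src.getLines().map { line =>
          val Array(topic, part, off) = line.split(",")
          (topic, part.toInt) -> off.toLong
        }.toMap
      } finally src.close()
    }
  }
}
```

Note this sketch ignores the failure cases (a crash between processing a batch and saving its offsets) that make hand-rolled bookkeeping error-prone, which is precisely the motivation for letting Spark or ZK handle it.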
[jira] [Comment Edited] (SPARK-6249) Get Kafka offsets from consumer group in ZK when using direct stream
[ https://issues.apache.org/jira/browse/SPARK-6249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14695887#comment-14695887 ] Dan Dutrow edited comment on SPARK-6249 at 8/13/15 8:34 PM:
---
I have read many of the tickets, documentation, and source code relating to this subject, but haven't found a solid answer to what appears to be the common question of how to reinitialize a Kafka Direct API consumer after a restart.

I have a rapidly evolving application that uses checkpoints and the like for 24/7 operation. However, when we deploy a new jar, we have to blow away the checkpoint because otherwise the changes won't take effect cleanly. We have an approach to maintain the data between restarts, but not the Kafka offsets, and doing the Kafka bookkeeping in our application requires deeper knowledge and proficiency to make sure it is done correctly.

Not being familiar with the ZooKeeper API, I was hoping for a Spark-abstracted approach where I could configure something in the API, or with a parameter like auto.offset.reset, to do the best it can to resume processing at the previous offset location (for that consumer group). Short of that, a link to a blog, document, or example on GitHub that describes the ZK API needed to accomplish this functionality in detail would be greatly appreciated.

On a related topic, it would be awesome if the Streaming Statistics page could be updated so that you can review the message rates while using the Kafka Direct API. If making that work is another ZooKeeper trick, some example code would be immensely helpful.

http://apache-spark-user-list.1001560.n3.nabble.com/Maintaining-Kafka-Direct-API-Offsets-tt24246.html

was (Author: dutrow):
I have read many of the tickets, documentation, and source code relating to this subject, but haven't found a solid answer to what appears to be the common question of how to reinitialize a Kafka Direct API consumer after a restart.

I have a rapidly evolving application that uses checkpoints and the like for 24/7 operation. However, when we deploy a new jar, we have to blow away the checkpoint because otherwise the changes won't take effect cleanly. We have an approach to maintain the data between restarts, but not the Kafka offsets, and doing the Kafka bookkeeping in our application requires deeper knowledge and proficiency to make sure it is done correctly.

Not being familiar with the ZooKeeper API, I was hoping for a Spark-abstracted approach where I could configure something in the API, or with a parameter like auto.offset.reset, to do the best it can to resume processing at the previous offset location (for that consumer group). Short of that, a link to a blog, document, or example on GitHub that describes the ZK API needed to accomplish this functionality in detail would be greatly appreciated.

On a related topic, it would be awesome if the Streaming Statistics page could be updated so that you can review the message rates while using the Kafka Direct API. If making that work is another ZooKeeper trick, some example code would be immensely helpful.

> Get Kafka offsets from consumer group in ZK when using direct stream
> --------------------------------------------------------------------
> Key: SPARK-6249
> URL: https://issues.apache.org/jira/browse/SPARK-6249
> Project: Spark
> Issue Type: Improvement
> Components: Streaming
> Reporter: Tathagata Das
[jira] [Updated] (SPARK-9947) Separate Metadata and State Checkpoint Data
[ https://issues.apache.org/jira/browse/SPARK-9947?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dan Dutrow updated SPARK-9947:
--
Flags: Important
Affects Version/s: 1.4.1
Target Version/s: 1.5.0 (was: 1.4.0)
Remaining Estimate: 168h
Original Estimate: 168h
Priority: Minor (was: Major)
Description:
Problem: When updating an application that has checkpointing enabled to support updateStateByKey, you encounter the problem that you might like to maintain state data between restarts but delete the metadata containing execution state. If checkpoint data exists between code redeployments, the program may not execute properly, or at all.
My current workaround for this issue is to wrap updateStateByKey with my own function that persists the state after every update to my own separate directory. (That allows me to delete the checkpoint, with its metadata, before redeploying.) Then, when I restart the application, I initialize the state with this persisted data. This incurs additional overhead because the same data is persisted twice: once in the checkpoint and once in my persisted data folder.
If Kafka Direct API offsets could be stored in another separate checkpoint directory, that would also help address the problem of having to blow the checkpoint away between code redeployments.

was:
This is the proposal. The simpler direct API (the one that does not take explicit offsets) can be modified to also pick up the initial offset from ZK if group.id is specified. This is exactly analogous to how we find the latest or earliest offset in that API, except that instead of the latest/earliest offset of the topic we want to find the offset of the consumer group. The group offsets in ZK are not used at all for any further processing or restarting, so the exactly-once semantics are not broken.
The use case where this is useful is simplified code upgrade. If users want to upgrade the code, they can stop the context gracefully, which ensures the ZK consumer group offset is updated with the last offsets processed. Then the new code, started fresh (not restarted from checkpoint), can pick up the consumer group offset from ZK and continue where the previous code left off. Without the ability to pick up consumer group offsets at startup (that is, currently), the only way to do this is for users to save the offsets somewhere (file, database, etc.) and manage the offsets themselves. I just want to simplify this process.

> Separate Metadata and State Checkpoint Data
> -------------------------------------------
[jira] [Commented] (SPARK-6249) Get Kafka offsets from consumer group in ZK when using direct stream
[ https://issues.apache.org/jira/browse/SPARK-6249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14695887#comment-14695887 ] Dan Dutrow commented on SPARK-6249:
---
I have read many of the tickets, documentation, and source code relating to this subject, but haven't found a solid answer to what appears to be the common question of how to reinitialize a Kafka Direct API consumer after a restart.

I have a rapidly evolving application that uses checkpoints and the like for 24/7 operation. However, when we deploy a new jar, we have to blow away the checkpoint because otherwise the changes won't take effect cleanly. We have an approach to maintain the data between restarts, but not the Kafka offsets, and doing the Kafka bookkeeping in our application requires deeper knowledge and proficiency to make sure it is done correctly.

Not being familiar with the ZooKeeper API, I was hoping for a Spark-abstracted approach where I could configure something in the API, or with a parameter like auto.offset.reset, to do the best it can to resume processing at the previous offset location (for that consumer group). Short of that, a link to a blog, document, or example on GitHub that describes the ZK API needed to accomplish this functionality in detail would be greatly appreciated.

On a related topic, it would be awesome if the Streaming Statistics page could be updated so that you can review the message rates while using the Kafka Direct API. If making that work is another ZooKeeper trick, some example code would be immensely helpful.

> Get Kafka offsets from consumer group in ZK when using direct stream
> --------------------------------------------------------------------