[jira] [Commented] (SPARK-2629) Improved state management for Spark Streaming (mapWithState)

2018-12-05 Thread Dan Dutrow (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-2629?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16710144#comment-16710144
 ] 

Dan Dutrow commented on SPARK-2629:
---

This PR should not reference SPARK-2629





> Improved state management for Spark Streaming (mapWithState)
> 
>
> Key: SPARK-2629
> URL: https://issues.apache.org/jira/browse/SPARK-2629
> Project: Spark
>  Issue Type: Epic
>  Components: DStreams
>Affects Versions: 0.9.2, 1.0.2, 1.2.2, 1.3.1, 1.4.1, 1.5.1
>Reporter: Tathagata Das
>Assignee: Tathagata Das
>Priority: Major
> Fix For: 1.6.0
>
>
>  Current updateStateByKey provides stateful processing in Spark Streaming. It 
> allows the user to maintain per-key state and manage that state using an 
> updateFunction. The updateFunction is called for each key, and it uses new 
> data and existing state of the key, to generate an updated state. However, 
> based on community feedback, we have learnt the following lessons.
> - Need for more optimized state management that does not scan every key
> - Need to make it easier to implement common use cases - (a) timeout of idle 
> data, (b) returning items other than state
> The high-level idea that I am proposing is:
> - Introduce a new API -trackStateByKey- *mapWithState* that allows the user 
> to update per-key state and emit arbitrary records. The new API is necessary 
> as this will have significantly different semantics than the existing 
> updateStateByKey API. This API will have direct support for timeouts.
> - Internally, the system will keep the state data as a map/list within the 
> partitions of the state RDDs. The new data RDDs will be partitioned 
> appropriately, and for all the key-value data, it will look up the map/list in 
> the state RDD partition and create a new list/map of updated state data. The 
> new state RDD partition will be created based on the updated state data and, 
> if necessary, the old data. 
> Here is the detailed design doc (*outdated, to be updated*). Please take a 
> look and provide feedback as comments.
> https://docs.google.com/document/d/1NoALLyd83zGs1hNGMm0Pc5YOVgiPpMHugGMk6COqxxE/edit#heading=h.ph3w0clkd4em
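For illustration, a minimal sketch of the mapWithState API described above as it shipped in 
Spark 1.6; the "pairs" DStream[(String, Int)] and the 30-minute timeout are assumptions for 
illustration, not from this ticket:

  import org.apache.spark.streaming.{Minutes, State, StateSpec}

  // The mapping function sees the new value and the existing per-key state,
  // updates the state, and emits an arbitrary record (here the running count).
  val mappingFunc = (key: String, value: Option[Int], state: State[Int]) => {
    val sum = value.getOrElse(0) + state.getOption.getOrElse(0)
    if (!state.isTimingOut()) state.update(sum)  // a timing-out key cannot be updated
    (key, sum)
  }

  // timeout() illustrates the direct support for expiring idle keys mentioned above.
  val stateDStream = pairs.mapWithState(
    StateSpec.function(mappingFunc).timeout(Minutes(30)))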



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-21065) Spark Streaming concurrentJobs + StreamingJobProgressListener conflict

2017-06-13 Thread Dan Dutrow (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21065?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dan Dutrow updated SPARK-21065:
---
Component/s: Web UI

> Spark Streaming concurrentJobs + StreamingJobProgressListener conflict
> --
>
> Key: SPARK-21065
> URL: https://issues.apache.org/jira/browse/SPARK-21065
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams, Scheduler, Web UI
>Affects Versions: 2.1.0
>Reporter: Dan Dutrow
>
> My streaming application has 200+ output operations, many of them stateful 
> and several of them windowed. In an attempt to reduce the processing times, I 
> set "spark.streaming.concurrentJobs" to 2+. Initial results are very 
> positive, cutting our processing time from ~3 minutes to ~1 minute, but 
> eventually we encounter an exception as follows:
> (Note that 149697756 ms is 2017-06-09 03:06:00, so it's trying to get a 
> batch from 45 minutes before the exception is thrown.)
> 2017-06-09 03:50:28,259 [Spark Listener Bus] ERROR 
> org.apache.spark.streaming.scheduler.StreamingListenerBus - Listener 
> StreamingJobProgressListener threw an exception
> java.util.NoSuchElementException: key not found: 149697756 ms
> at scala.collection.MapLike$class.default(MapLike.scala:228)
> at scala.collection.AbstractMap.default(Map.scala:59)
> at scala.collection.mutable.HashMap.apply(HashMap.scala:65)
> at 
> org.apache.spark.streaming.ui.StreamingJobProgressListener.onOutputOperationCompleted(StreamingJobProgressListener.scala:128)
> at 
> org.apache.spark.streaming.scheduler.StreamingListenerBus.doPostEvent(StreamingListenerBus.scala:67)
> at 
> org.apache.spark.streaming.scheduler.StreamingListenerBus.doPostEvent(StreamingListenerBus.scala:29)
> at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:63)
> at 
> org.apache.spark.streaming.scheduler.StreamingListenerBus.postToAll(StreamingListenerBus.scala:29)
> at 
> org.apache.spark.streaming.scheduler.StreamingListenerBus.onOtherEvent(StreamingListenerBus.scala:43)
> ...
> The Spark code causing the exception is here:
> https://github.com/apache/spark/blob/branch-2.1/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala#LC125
>   override def onOutputOperationCompleted(
>   outputOperationCompleted: StreamingListenerOutputOperationCompleted): 
> Unit = synchronized {
> // This method is called before onBatchCompleted
> {color:red}runningBatchUIData(outputOperationCompleted.outputOperationInfo.batchTime).{color}
>   updateOutputOperationInfo(outputOperationCompleted.outputOperationInfo)
> }
> It seems to me that it may be caused by that batch being removed earlier.
> https://github.com/apache/spark/blob/branch-2.1/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala#LC102
>   override def onBatchCompleted(batchCompleted: 
> StreamingListenerBatchCompleted): Unit = {
> synchronized {
>   waitingBatchUIData.remove(batchCompleted.batchInfo.batchTime)
>   
> {color:red}runningBatchUIData.remove(batchCompleted.batchInfo.batchTime){color}
>   val batchUIData = BatchUIData(batchCompleted.batchInfo)
>   completedBatchUIData.enqueue(batchUIData)
>   if (completedBatchUIData.size > batchUIDataLimit) {
> val removedBatch = completedBatchUIData.dequeue()
> batchTimeToOutputOpIdSparkJobIdPair.remove(removedBatch.batchTime)
>   }
>   totalCompletedBatches += 1L
>   totalProcessedRecords += batchUIData.numRecords
> }
> }
> What is the solution here? Should I give my Spark streaming context a much longer 
> remember duration, e.g. ssc.remember(batchDuration * rememberMultiple)?
> Otherwise, it seems like there should be some kind of existence check on 
> runningBatchUIData before dereferencing it.
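For illustration, one possible shape of that existence check, mirroring the 
onOutputOperationCompleted method quoted above (an untested sketch, not a patch from this 
ticket):

  override def onOutputOperationCompleted(
      outputOperationCompleted: StreamingListenerOutputOperationCompleted): Unit = synchronized {
    // Look the batch up with get() so a batch that onBatchCompleted has already
    // removed is skipped instead of throwing NoSuchElementException.
    runningBatchUIData.get(outputOperationCompleted.outputOperationInfo.batchTime).foreach {
      _.updateOutputOperationInfo(outputOperationCompleted.outputOperationInfo)
    }
  }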



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-21065) Spark Streaming concurrentJobs + StreamingJobProgressListener conflict

2017-06-12 Thread Dan Dutrow (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21065?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dan Dutrow updated SPARK-21065:
---
Description: 
My streaming application has 200+ output operations, many of them stateful and 
several of them windowed. In an attempt to reduce the processing times, I set 
"spark.streaming.concurrentJobs" to 2+. Initial results are very positive, 
cutting our processing time from ~3 minutes to ~1 minute, but eventually we 
encounter an exception as follows:

(Note that 149697756 ms is 2017-06-09 03:06:00, so it's trying to get a 
batch from 45 minutes before the exception is thrown.)

2017-06-09 03:50:28,259 [Spark Listener Bus] ERROR 
org.apache.spark.streaming.scheduler.StreamingListenerBus - Listener 
StreamingJobProgressListener threw an exception
java.util.NoSuchElementException: key not found: 149697756 ms
at scala.collection.MapLike$class.default(MapLike.scala:228)
at scala.collection.AbstractMap.default(Map.scala:59)
at scala.collection.mutable.HashMap.apply(HashMap.scala:65)
at 
org.apache.spark.streaming.ui.StreamingJobProgressListener.onOutputOperationCompleted(StreamingJobProgressListener.scala:128)
at 
org.apache.spark.streaming.scheduler.StreamingListenerBus.doPostEvent(StreamingListenerBus.scala:67)
at 
org.apache.spark.streaming.scheduler.StreamingListenerBus.doPostEvent(StreamingListenerBus.scala:29)
at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:63)
at 
org.apache.spark.streaming.scheduler.StreamingListenerBus.postToAll(StreamingListenerBus.scala:29)
at 
org.apache.spark.streaming.scheduler.StreamingListenerBus.onOtherEvent(StreamingListenerBus.scala:43)
...


The Spark code causing the exception is here:

https://github.com/apache/spark/blob/branch-2.1/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala#LC125
  override def onOutputOperationCompleted(
  outputOperationCompleted: StreamingListenerOutputOperationCompleted): 
Unit = synchronized {
// This method is called before onBatchCompleted
{color:red}runningBatchUIData(outputOperationCompleted.outputOperationInfo.batchTime).{color}
  updateOutputOperationInfo(outputOperationCompleted.outputOperationInfo)
}

It seems to me that it may be caused by that batch being removed earlier.
https://github.com/apache/spark/blob/branch-2.1/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala#LC102

  override def onBatchCompleted(batchCompleted: 
StreamingListenerBatchCompleted): Unit = {
synchronized {
  waitingBatchUIData.remove(batchCompleted.batchInfo.batchTime)
  
{color:red}runningBatchUIData.remove(batchCompleted.batchInfo.batchTime){color}
  val batchUIData = BatchUIData(batchCompleted.batchInfo)
  completedBatchUIData.enqueue(batchUIData)
  if (completedBatchUIData.size > batchUIDataLimit) {
val removedBatch = completedBatchUIData.dequeue()
batchTimeToOutputOpIdSparkJobIdPair.remove(removedBatch.batchTime)
  }
  totalCompletedBatches += 1L

  totalProcessedRecords += batchUIData.numRecords
}
}

What is the solution here? Should I give my Spark streaming context a much longer 
remember duration, e.g. ssc.remember(batchDuration * rememberMultiple)?

Otherwise, it seems like there should be some kind of existence check on 
runningBatchUIData before dereferencing it.


  was:
My streaming application has 200+ output operations, many of them stateful and 
several of them windowed. In an attempt to reduce the processing times, I set 
"spark.streaming.concurrentJobs" to 2+. Initial results are very positive, 
cutting our processing time from ~3 minutes to ~1 minute, but eventually we 
encounter an exception as follows:

Note that 149697756 ms is 2017-06-09 03:06:00, so it's trying to get a 
batch from 45 minutes before the exception is thrown.

2017-06-09 03:50:28,259 [Spark Listener Bus] ERROR 
org.apache.spark.streaming.scheduler.StreamingListenerBus - Listener 
StreamingJobProgressListener threw an exception
java.util.NoSuchElementException: key not found: 149697756 ms
at scala.collection.MapLike$class.default(MapLike.scala:228)
at scala.collection.AbstractMap.default(Map.scala:59)
at scala.collection.mutable.HashMap.apply(HashMap.scala:65)
at 
org.apache.spark.streaming.ui.StreamingJobProgressListener.onOutputOperationCompleted(StreamingJobProgressListener.scala:128)
at 
org.apache.spark.streaming.scheduler.StreamingListenerBus.doPostEvent(StreamingListenerBus.scala:67)
at 
org.apache.spark.streaming.scheduler.StreamingListenerBus.doPostEvent(StreamingListenerBus.scala:29)
at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:63)
at 
org.apache.spark.streaming.scheduler.StreamingListenerBus.postToAll(StreamingListenerBus.scala:29)
at 
org.apache.spark.streaming.scheduler.StreamingListenerBus.onOtherEvent(StreamingListenerBus.scala:43)
...


The Spark code 

[jira] [Commented] (SPARK-21065) Spark Streaming concurrentJobs + StreamingJobProgressListener conflict

2017-06-12 Thread Dan Dutrow (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21065?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16046649#comment-16046649
 ] 

Dan Dutrow commented on SPARK-21065:


Something to note: If one batch's processing time exceeds the batch interval, 
then a second batch could begin before the first is complete. This is fine 
behavior for us, and offers the ability to catch up if something gets delayed, 
but may be confusing for the scheduler.
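For reference, a minimal sketch of the configuration being discussed (the application name 
and the 60-second batch interval are assumptions for illustration):

  import org.apache.spark.SparkConf
  import org.apache.spark.streaming.{Seconds, StreamingContext}

  // Allow the jobs of two batches to run at once, as described above.
  val conf = new SparkConf()
    .setAppName("streaming-concurrent-jobs")
    .set("spark.streaming.concurrentJobs", "2")
  val ssc = new StreamingContext(conf, Seconds(60))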

> Spark Streaming concurrentJobs + StreamingJobProgressListener conflict
> --
>
> Key: SPARK-21065
> URL: https://issues.apache.org/jira/browse/SPARK-21065
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams, Scheduler
>Affects Versions: 2.1.0
>Reporter: Dan Dutrow
>
> My streaming application has 200+ output operations, many of them stateful 
> and several of them windowed. In an attempt to reduce the processing times, I 
> set "spark.streaming.concurrentJobs" to 2+. Initial results are very 
> positive, cutting our processing time from ~3 minutes to ~1 minute, but 
> eventually we encounter an exception as follows:
> Note that 149697756 ms is 2017-06-09 03:06:00, so it's trying to get a 
> batch from 45 minutes before the exception is thrown.
> 2017-06-09 03:50:28,259 [Spark Listener Bus] ERROR 
> org.apache.spark.streaming.scheduler.StreamingListenerBus - Listener 
> StreamingJobProgressListener threw an exception
> java.util.NoSuchElementException: key not found: 149697756 ms
> at scala.collection.MapLike$class.default(MapLike.scala:228)
> at scala.collection.AbstractMap.default(Map.scala:59)
> at scala.collection.mutable.HashMap.apply(HashMap.scala:65)
> at 
> org.apache.spark.streaming.ui.StreamingJobProgressListener.onOutputOperationCompleted(StreamingJobProgressListener.scala:128)
> at 
> org.apache.spark.streaming.scheduler.StreamingListenerBus.doPostEvent(StreamingListenerBus.scala:67)
> at 
> org.apache.spark.streaming.scheduler.StreamingListenerBus.doPostEvent(StreamingListenerBus.scala:29)
> at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:63)
> at 
> org.apache.spark.streaming.scheduler.StreamingListenerBus.postToAll(StreamingListenerBus.scala:29)
> at 
> org.apache.spark.streaming.scheduler.StreamingListenerBus.onOtherEvent(StreamingListenerBus.scala:43)
> ...
> The Spark code causing the exception is here:
> https://github.com/apache/spark/blob/branch-2.1/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala#LC125
>   override def onOutputOperationCompleted(
>   outputOperationCompleted: StreamingListenerOutputOperationCompleted): 
> Unit = synchronized {
> // This method is called before onBatchCompleted
> {color:red}runningBatchUIData(outputOperationCompleted.outputOperationInfo.batchTime).{color}
>   updateOutputOperationInfo(outputOperationCompleted.outputOperationInfo)
> }
> It seems to me that it may be caused by that batch being removed earlier.
> https://github.com/apache/spark/blob/branch-2.1/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala#LC102
>   override def onBatchCompleted(batchCompleted: 
> StreamingListenerBatchCompleted): Unit = {
> synchronized {
>   waitingBatchUIData.remove(batchCompleted.batchInfo.batchTime)
>   
> {color:red}runningBatchUIData.remove(batchCompleted.batchInfo.batchTime){color}
>   val batchUIData = BatchUIData(batchCompleted.batchInfo)
>   completedBatchUIData.enqueue(batchUIData)
>   if (completedBatchUIData.size > batchUIDataLimit) {
> val removedBatch = completedBatchUIData.dequeue()
> batchTimeToOutputOpIdSparkJobIdPair.remove(removedBatch.batchTime)
>   }
>   totalCompletedBatches += 1L
>   totalProcessedRecords += batchUIData.numRecords
> }
> }
> What is the solution here? Should I give my Spark streaming context a much longer 
> remember duration, e.g. ssc.remember(batchDuration * rememberMultiple)?
> Otherwise, it seems like there should be some kind of existence check on 
> runningBatchUIData before dereferencing it.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-21065) Spark Streaming concurrentJobs + StreamingJobProgressListener conflict

2017-06-12 Thread Dan Dutrow (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21065?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dan Dutrow updated SPARK-21065:
---
Component/s: DStreams

> Spark Streaming concurrentJobs + StreamingJobProgressListener conflict
> --
>
> Key: SPARK-21065
> URL: https://issues.apache.org/jira/browse/SPARK-21065
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams, Scheduler
>Affects Versions: 2.1.0
>Reporter: Dan Dutrow
>
> My streaming application has 200+ output operations, many of them stateful 
> and several of them windowed. In an attempt to reduce the processing times, I 
> set "spark.streaming.concurrentJobs" to 2+. Initial results are very 
> positive, cutting our processing time from ~3 minutes to ~1 minute, but 
> eventually we encounter an exception as follows:
> Note that 149697756 ms is 2017-06-09 03:06:00, so it's trying to get a 
> batch from 45 minutes before the exception is thrown.
> 2017-06-09 03:50:28,259 [Spark Listener Bus] ERROR 
> org.apache.spark.streaming.scheduler.StreamingListenerBus - Listener 
> StreamingJobProgressListener threw an exception
> java.util.NoSuchElementException: key not found: 149697756 ms
> at scala.collection.MapLike$class.default(MapLike.scala:228)
> at scala.collection.AbstractMap.default(Map.scala:59)
> at scala.collection.mutable.HashMap.apply(HashMap.scala:65)
> at 
> org.apache.spark.streaming.ui.StreamingJobProgressListener.onOutputOperationCompleted(StreamingJobProgressListener.scala:128)
> at 
> org.apache.spark.streaming.scheduler.StreamingListenerBus.doPostEvent(StreamingListenerBus.scala:67)
> at 
> org.apache.spark.streaming.scheduler.StreamingListenerBus.doPostEvent(StreamingListenerBus.scala:29)
> at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:63)
> at 
> org.apache.spark.streaming.scheduler.StreamingListenerBus.postToAll(StreamingListenerBus.scala:29)
> at 
> org.apache.spark.streaming.scheduler.StreamingListenerBus.onOtherEvent(StreamingListenerBus.scala:43)
> ...
> The Spark code causing the exception is here:
> https://github.com/apache/spark/blob/branch-2.1/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala#LC125
>   override def onOutputOperationCompleted(
>   outputOperationCompleted: StreamingListenerOutputOperationCompleted): 
> Unit = synchronized {
> // This method is called before onBatchCompleted
> {color:red}runningBatchUIData(outputOperationCompleted.outputOperationInfo.batchTime).{color}
>   updateOutputOperationInfo(outputOperationCompleted.outputOperationInfo)
> }
> It seems to me that it may be caused by that batch being removed earlier.
> https://github.com/apache/spark/blob/branch-2.1/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala#LC102
>   override def onBatchCompleted(batchCompleted: 
> StreamingListenerBatchCompleted): Unit = {
> synchronized {
>   waitingBatchUIData.remove(batchCompleted.batchInfo.batchTime)
>   
> {color:red}runningBatchUIData.remove(batchCompleted.batchInfo.batchTime){color}
>   val batchUIData = BatchUIData(batchCompleted.batchInfo)
>   completedBatchUIData.enqueue(batchUIData)
>   if (completedBatchUIData.size > batchUIDataLimit) {
> val removedBatch = completedBatchUIData.dequeue()
> batchTimeToOutputOpIdSparkJobIdPair.remove(removedBatch.batchTime)
>   }
>   totalCompletedBatches += 1L
>   totalProcessedRecords += batchUIData.numRecords
> }
> }
> What is the solution here? Should I give my Spark streaming context a much longer 
> remember duration, e.g. ssc.remember(batchDuration * rememberMultiple)?
> Otherwise, it seems like there should be some kind of existence check on 
> runningBatchUIData before dereferencing it.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-21065) Spark Streaming concurrentJobs + StreamingJobProgressListener conflict

2017-06-12 Thread Dan Dutrow (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21065?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dan Dutrow updated SPARK-21065:
---
Description: 
My streaming application has 200+ output operations, many of them stateful and 
several of them windowed. In an attempt to reduce the processing times, I set 
"spark.streaming.concurrentJobs" to 2+. Initial results are very positive, 
cutting our processing time from ~3 minutes to ~1 minute, but eventually we 
encounter an exception as follows:

Note that 149697756 ms is 2017-06-09 03:06:00, so it's trying to get a 
batch from 45 minutes before the exception is thrown.

2017-06-09 03:50:28,259 [Spark Listener Bus] ERROR 
org.apache.spark.streaming.scheduler.StreamingListenerBus - Listener 
StreamingJobProgressListener threw an exception
java.util.NoSuchElementException: key not found: 149697756 ms
at scala.collection.MapLike$class.default(MapLike.scala:228)
at scala.collection.AbstractMap.default(Map.scala:59)
at scala.collection.mutable.HashMap.apply(HashMap.scala:65)
at 
org.apache.spark.streaming.ui.StreamingJobProgressListener.onOutputOperationCompleted(StreamingJobProgressListener.scala:128)
at 
org.apache.spark.streaming.scheduler.StreamingListenerBus.doPostEvent(StreamingListenerBus.scala:67)
at 
org.apache.spark.streaming.scheduler.StreamingListenerBus.doPostEvent(StreamingListenerBus.scala:29)
at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:63)
at 
org.apache.spark.streaming.scheduler.StreamingListenerBus.postToAll(StreamingListenerBus.scala:29)
at 
org.apache.spark.streaming.scheduler.StreamingListenerBus.onOtherEvent(StreamingListenerBus.scala:43)
...


The Spark code causing the exception is here:

https://github.com/apache/spark/blob/branch-2.1/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala#LC125
  override def onOutputOperationCompleted(
  outputOperationCompleted: StreamingListenerOutputOperationCompleted): 
Unit = synchronized {
// This method is called before onBatchCompleted
{color:red}runningBatchUIData(outputOperationCompleted.outputOperationInfo.batchTime).{color}
  updateOutputOperationInfo(outputOperationCompleted.outputOperationInfo)
}

It seems to me that it may be caused by that batch being removed earlier.
https://github.com/apache/spark/blob/branch-2.1/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala#LC102

  override def onBatchCompleted(batchCompleted: 
StreamingListenerBatchCompleted): Unit = {
synchronized {
  waitingBatchUIData.remove(batchCompleted.batchInfo.batchTime)
  
{color:red}runningBatchUIData.remove(batchCompleted.batchInfo.batchTime){color}
  val batchUIData = BatchUIData(batchCompleted.batchInfo)
  completedBatchUIData.enqueue(batchUIData)
  if (completedBatchUIData.size > batchUIDataLimit) {
val removedBatch = completedBatchUIData.dequeue()
batchTimeToOutputOpIdSparkJobIdPair.remove(removedBatch.batchTime)
  }
  totalCompletedBatches += 1L

  totalProcessedRecords += batchUIData.numRecords
}
}

What is the solution here? Should I give my Spark streaming context a much longer 
remember duration, e.g. ssc.remember(batchDuration * rememberMultiple)?

Otherwise, it seems like there should be some kind of existence check on 
runningBatchUIData before dereferencing it.


  was:
My streaming application has 200+ output operations, many of them stateful and 
several of them windowed. In an attempt to reduce the processing times, I set 
"spark.streaming.concurrentJobs" to 2+. Initial results are very positive, 
cutting our processing time from ~3 minutes to ~1 minute, but eventually we 
encounter an exception as follows:

Note that 149697756 ms is 2017-06-09 03:06:00, so it's trying to get a 
batch from 45 minutes before the exception is thrown.

2017-06-09 03:50:28,259 [Spark Listener Bus] ERROR 
org.apache.spark.streaming.scheduler.StreamingListenerBus - Listener 
StreamingJobProgressListener threw an exception
java.util.NoSuchElementException: key not found: 149697756 ms
at scala.collection.MapLike$class.default(MapLike.scala:228)
at scala.collection.AbstractMap.default(Map.scala:59)
at scala.collection.mutable.HashMap.apply(HashMap.scala:65)
at 
org.apache.spark.streaming.ui.StreamingJobProgressListener.onOutputOperationCompleted(StreamingJobProgressListener.scala:128)
at 
org.apache.spark.streaming.scheduler.StreamingListenerBus.doPostEvent(StreamingListenerBus.scala:67)
at 
org.apache.spark.streaming.scheduler.StreamingListenerBus.doPostEvent(StreamingListenerBus.scala:29)
at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:63)
at 
org.apache.spark.streaming.scheduler.StreamingListenerBus.postToAll(StreamingListenerBus.scala:29)
at 
org.apache.spark.streaming.scheduler.StreamingListenerBus.onOtherEvent(StreamingListenerBus.scala:43)
...


The Spark code 

[jira] [Created] (SPARK-21065) Spark Streaming concurrentJobs + StreamingJobProgressListener conflict

2017-06-12 Thread Dan Dutrow (JIRA)
Dan Dutrow created SPARK-21065:
--

 Summary: Spark Streaming concurrentJobs + 
StreamingJobProgressListener conflict
 Key: SPARK-21065
 URL: https://issues.apache.org/jira/browse/SPARK-21065
 Project: Spark
  Issue Type: Bug
  Components: Scheduler
Affects Versions: 2.1.0
Reporter: Dan Dutrow


My streaming application has 200+ output operations, many of them stateful and 
several of them windowed. In an attempt to reduce the processing times, I set 
"spark.streaming.concurrentJobs" to 2+. Initial results are very positive, 
cutting our processing time from ~3 minutes to ~1 minute, but eventually we 
encounter an exception as follows:

Note that 149697756 ms is 2017-06-09 03:06:00, so it's trying to get a 
batch from 45 minutes before the exception is thrown.

2017-06-09 03:50:28,259 [Spark Listener Bus] ERROR 
org.apache.spark.streaming.scheduler.StreamingListenerBus - Listener 
StreamingJobProgressListener threw an exception
java.util.NoSuchElementException: key not found: 149697756 ms
at scala.collection.MapLike$class.default(MapLike.scala:228)
at scala.collection.AbstractMap.default(Map.scala:59)
at scala.collection.mutable.HashMap.apply(HashMap.scala:65)
at 
org.apache.spark.streaming.ui.StreamingJobProgressListener.onOutputOperationCompleted(StreamingJobProgressListener.scala:128)
at 
org.apache.spark.streaming.scheduler.StreamingListenerBus.doPostEvent(StreamingListenerBus.scala:67)
at 
org.apache.spark.streaming.scheduler.StreamingListenerBus.doPostEvent(StreamingListenerBus.scala:29)
at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:63)
at 
org.apache.spark.streaming.scheduler.StreamingListenerBus.postToAll(StreamingListenerBus.scala:29)
at 
org.apache.spark.streaming.scheduler.StreamingListenerBus.onOtherEvent(StreamingListenerBus.scala:43)
...


The Spark code causing the exception is here:

https://github.com/apache/spark/blob/branch-2.1/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala#LC125
  override def onOutputOperationCompleted(
  outputOperationCompleted: StreamingListenerOutputOperationCompleted): 
Unit = synchronized {
// This method is called before onBatchCompleted
{color:red}runningBatchUIData(outputOperationCompleted.outputOperationInfo.batchTime).{color}
  updateOutputOperationInfo(outputOperationCompleted.outputOperationInfo)
}

It seems to me that it may be caused by that batch being removed earlier.
https://github.com/apache/spark/blob/branch-2.1/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala#LC102

  override def onBatchCompleted(batchCompleted: 
StreamingListenerBatchCompleted): Unit = {
synchronized {
  waitingBatchUIData.remove(batchCompleted.batchInfo.batchTime)
  
{color:red}runningBatchUIData.remove(batchCompleted.batchInfo.batchTime){color}
  val batchUIData = BatchUIData(batchCompleted.batchInfo)
  completedBatchUIData.enqueue(batchUIData)
  if (completedBatchUIData.size > batchUIDataLimit) {
val removedBatch = completedBatchUIData.dequeue()
batchTimeToOutputOpIdSparkJobIdPair.remove(removedBatch.batchTime)
  }
  totalCompletedBatches += 1L

  totalProcessedRecords += batchUIData.numRecords
}
}

What is the solution here? Should I give my Spark streaming context a much longer 
remember duration, e.g. ssc.remember(batchDuration * rememberMultiple)?

Otherwise, it seems like there should be some kind of existence check on 
runningBatchUIData before dereferencing it.




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-6305) Add support for log4j 2.x to Spark

2017-04-11 Thread Dan Dutrow (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15964554#comment-15964554
 ] 

Dan Dutrow edited comment on SPARK-6305 at 4/11/17 4:18 PM:


Any update on the current status of this ticket? It would be particularly 
beneficial to our program, and presumably many others, if the KafkaAppender 
(available in log4j 2.x) could be provided to spark workers for real-time log 
aggregation. 
https://logging.apache.org/log4j/2.x/manual/appenders.html#KafkaAppender This 
is not an easy thing to override in a Spark application because log4j is loaded 
natively by the CoarseGrainedExecutorBackend before the application starts.


was (Author: dutrow):
Please provide an update on the current status of this ticket? It would be 
particularly beneficial to our program, and presumably many others, if the 
KafkaAppender (available in log4j 2.x) could be provided to spark workers for 
real-time log aggregation. 
https://logging.apache.org/log4j/2.x/manual/appenders.html#KafkaAppender This 
is not an easy thing to override in a spark application due to native log4j 
loading with the CoarseGrainedExecutorBackend before the application.

> Add support for log4j 2.x to Spark
> --
>
> Key: SPARK-6305
> URL: https://issues.apache.org/jira/browse/SPARK-6305
> Project: Spark
>  Issue Type: Improvement
>  Components: Build
>Reporter: Tal Sliwowicz
>Priority: Minor
>
> log4j 2 requires replacing the slf4j binding and adding the log4j jars to the 
> classpath. Since there are shaded jars, it must be done during the build.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-6305) Add support for log4j 2.x to Spark

2017-04-11 Thread Dan Dutrow (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15964554#comment-15964554
 ] 

Dan Dutrow commented on SPARK-6305:
---

Could you provide an update on the current status of this ticket? It would be 
particularly beneficial to our program, and presumably many others, if the 
KafkaAppender (available in log4j 2.x) could be provided to spark workers for 
real-time log aggregation. 
https://logging.apache.org/log4j/2.x/manual/appenders.html#KafkaAppender This 
is not an easy thing to override in a Spark application because log4j is loaded 
natively by the CoarseGrainedExecutorBackend before the application starts.

> Add support for log4j 2.x to Spark
> --
>
> Key: SPARK-6305
> URL: https://issues.apache.org/jira/browse/SPARK-6305
> Project: Spark
>  Issue Type: Improvement
>  Components: Build
>Reporter: Tal Sliwowicz
>Priority: Minor
>
> log4j 2 requires replacing the slf4j binding and adding the log4j jars to the 
> classpath. Since there are shaded jars, it must be done during the build.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-11045) Contributing Receiver based Low Level Kafka Consumer from Spark-Packages to Apache Spark Project

2016-01-21 Thread Dan Dutrow (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-11045?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15111044#comment-15111044
 ] 

Dan Dutrow commented on SPARK-11045:


+1 to Dibyendu's comment that "Being at spark-packages, many [people do] not 
even consider using it [and] use the whatever Receiver Based model which is 
documented with Spark." Having hit limitations in both of the Receiver and 
Direct APIs, it would have been nice to have been pointed to the availability 
of alternatives.

> Contributing Receiver based Low Level Kafka Consumer from Spark-Packages to 
> Apache Spark Project
> 
>
> Key: SPARK-11045
> URL: https://issues.apache.org/jira/browse/SPARK-11045
> Project: Spark
>  Issue Type: New Feature
>  Components: Streaming
>Reporter: Dibyendu Bhattacharya
>
> This JIRA is to track the progress of contributing the Receiver-based Low Level 
> Kafka Consumer from spark-packages 
> (http://spark-packages.org/package/dibbhatt/kafka-spark-consumer) back to the 
> Apache Spark Project.
> This Kafka consumer has been around for more than a year and has matured over 
> time. I see there are many adoptions of this package, and I receive 
> positive feedback that this consumer gives better performance and fault 
> tolerance. 
> The primary intent of this JIRA is to give the community a better 
> alternative if they want to use the Receiver-based model. 
> If this consumer makes it into Spark Core, it will definitely see more adoption 
> and support from the community and help the many who still prefer the 
> Receiver-based model of Kafka consumer. 
> I understand the Direct Stream is the consumer that can give exactly-once 
> semantics and uses the Kafka low-level API, which is good. But the Direct Stream 
> has concerns around recovering checkpoints on driver code changes. Application 
> developers need to manage their own offsets, which is complex. Even if someone 
> does manage their own offsets, it limits the parallelism Spark Streaming can 
> achieve. If someone wants more parallelism and sets 
> spark.streaming.concurrentJobs to more than 1, you can no longer rely on 
> storing offsets externally, as you have no control over which batch will run in 
> which sequence. 
> Furthermore, the Direct Stream has higher latency, as it fetches messages 
> from Kafka during the RDD action. Also, the number of RDD partitions is limited 
> to the number of topic partitions, so unless your Kafka topic has enough 
> partitions, you have limited parallelism while processing the RDD. 
> Due to the above concerns, many people who do not need exactly-once 
> semantics still prefer the Receiver-based model. Unfortunately, when customers 
> fall back to the KafkaUtils.createStream approach, which uses the Kafka High 
> Level Consumer, there are other issues around the reliability of the Kafka High 
> Level API. The Kafka High Level API is buggy and has serious issues around 
> consumer rebalance, so I do not think it is correct to advise people to use 
> KafkaUtils.createStream in production. 
> The better option presently is to use the consumer from 
> spark-packages. It uses the Kafka Low Level Consumer API, stores offsets 
> in Zookeeper, and can recover from any failure. Below are a few highlights of 
> this consumer:
> 1. It has an inbuilt PID controller for dynamic rate limiting.
> 2. In this consumer, rate limiting is done by controlling the size of the 
> messages pulled from Kafka, and hence the size of the generated blocks. In 
> Spark, by contrast, rate limiting is done by controlling the number of messages. 
> The issue with throttling by number of messages is that if message sizes vary, 
> block sizes will also vary. Say your Kafka topic has messages of different 
> sizes, from 10KB to 500KB: throttling by number of messages can never give a 
> deterministic block size, so there is no guarantee that memory back-pressure 
> can really take effect. 
> 3. This consumer uses the Kafka low-level API, which gives better performance 
> than the KafkaUtils.createStream-based High Level API.
> 4. This consumer can give an end-to-end no-data-loss channel if enabled with 
> the WAL.
> By accepting this low-level Kafka consumer from spark-packages into the Apache 
> Spark project, we will give the community better options for Kafka 
> connectivity, both for the receiver-less and the receiver-based models. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-2388) Streaming from multiple different Kafka topics is problematic

2015-12-09 Thread Dan Dutrow (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15048885#comment-15048885
 ] 

Dan Dutrow commented on SPARK-2388:
---

The Kafka Direct API lets you insert a callback function that gives you access 
to the topic name and other metadata besides the key.
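For illustration, a sketch of that callback with the spark-streaming-kafka (0.8) direct API; 
ssc, kafkaParams, and fromOffsets are assumed to be defined elsewhere:

  import kafka.message.MessageAndMetadata
  import kafka.serializer.StringDecoder
  import org.apache.spark.streaming.kafka.KafkaUtils

  // The messageHandler sees each record's metadata, so the topic name
  // (and partition/offset) can be kept alongside the value.
  val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder,
      (String, String)](
    ssc, kafkaParams, fromOffsets,
    (mmd: MessageAndMetadata[String, String]) => (mmd.topic, mmd.message()))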

> Streaming from multiple different Kafka topics is problematic
> -
>
> Key: SPARK-2388
> URL: https://issues.apache.org/jira/browse/SPARK-2388
> Project: Spark
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.0.0
>Reporter: Sergey
> Fix For: 1.0.1
>
>
> The default way of creating a stream out of a Kafka source would be
> val stream = KafkaUtils.createStream(ssc,"localhost:2181","logs", 
> Map("retarget" -> 2,"datapair" -> 2))
> However, if two topics - in this case "retarget" and "datapair" - are very 
> different, there is no way to set up different filters, mapping functions, 
> etc., as they are effectively merged.
> However, the instance of KafkaInputDStream created by this call internally 
> calls ConsumerConnector.createMessageStream(), which returns a *map* of 
> KafkaStreams keyed by topic. It would be great if this map were exposed 
> somehow, so that the aforementioned call 
> val streamS = KafkaUtils.createStreamS(...)
> returned a map of streams.
> Regards,
> Sergey Malov
> Collective Media



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-6051) Add an option for DirectKafkaInputDStream to commit the offsets into ZK

2015-12-09 Thread Dan Dutrow (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15048833#comment-15048833
 ] 

Dan Dutrow commented on SPARK-6051:
---

Is there documentation for how to update the metrics (#messages per batch) in 
the Spark Streaming tab when using the Direct API? Does the Streaming tab get 
its information from Zookeeper or something else internally?

> Add an option for DirectKafkaInputDStream to commit the offsets into ZK
> ---
>
> Key: SPARK-6051
> URL: https://issues.apache.org/jira/browse/SPARK-6051
> Project: Spark
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.3.0
>Reporter: Saisai Shao
>
> Currently in DirectKafkaInputDStream, offsets are managed by Spark Streaming 
> itself without ZK or Kafka involved, which makes several third-party 
> offset monitoring tools fail to monitor the status of the Kafka consumer. So 
> here is an option to commit the offsets to ZK when each job is finished; the 
> commit is implemented in an asynchronous way, so the main processing flow will 
> not be blocked. Already tested with the KafkaOffsetMonitor tool.
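For illustration, a sketch of how the consumed offsets can be read from a direct stream so 
they could be written to ZK; directStream is assumed to be the result of 
KafkaUtils.createDirectStream, and the ZK write itself is left out:

  import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

  directStream.foreachRDD { rdd =>
    // Each batch's RDD carries the Kafka offset ranges it was built from.
    val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    offsetRanges.foreach { range =>
      // range.topic, range.partition, range.fromOffset and range.untilOffset would be
      // written to ZK here so external monitoring tools can see consumer progress.
    }
  }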



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-12103) KafkaUtils createStream with multiple topics -- does not work as expected

2015-12-03 Thread Dan Dutrow (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12103?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15038106#comment-15038106
 ] 

Dan Dutrow commented on SPARK-12103:


Thanks. I don't mean to be a pain. I did make an honest attempt to figure it 
out and I'm not completely numb, just inexperienced with Kafka. I was trying to 
find a better way to pool resources and not have to dedicate a core to each 
receiver. Having the key be the topic name was wishful thinking and nothing in 
the documentation corrected that assumption.


> KafkaUtils createStream with multiple topics -- does not work as expected
> -
>
> Key: SPARK-12103
> URL: https://issues.apache.org/jira/browse/SPARK-12103
> Project: Spark
>  Issue Type: Improvement
>  Components: Documentation, Streaming
>Affects Versions: 1.4.1
>Reporter: Dan Dutrow
>Priority: Minor
> Fix For: 1.4.2
>
>
> (Note: yes, there is a Direct API that may be better, but it's not the 
> easiest thing to get started with. The Kafka Receiver API still needs to 
> work, especially for newcomers)
> When creating a receiver stream using KafkaUtils, there is a valid use case 
> where you would want to use one (or a few) Kafka Streaming Receiver to pool 
> resources. I have 10+ topics and don't want to dedicate 10 cores to 
> processing all of them. However, when reading the data produced by 
> KafkaUtils.createStream, the DStream[(String,String)] does not properly 
> insert the topic name into the tuple. The left key is always null, making it 
> impossible to know what topic that data came from other than stashing your 
> key into the value.  Is there a way around that problem?
>  CODE
> val topics = Map("topicA" -> 1, "topicB" -> 1, "topicC" -> 1, "topicD" -> 1, 
> "topicE" -> 1, "topicF" -> 1, ...)
> val streams: IndexedSeq[ReceiverInputDStream[(String, String)]] = (1 to 3).map( 
> i =>
>   KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
> ssc, consumerProperties,
> topics,
> StorageLevel.MEMORY_ONLY_SER))
> val unioned :DStream[(String,String)] = ssc.union(streams)
> unioned.flatMap(x => {
>val (key, value) = x
>   // key is always null!
>   // value has data from any one of my topics
>   key match ... {
>   ..
>   }
> }
>  END CODE



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-12103) KafkaUtils createStream with multiple topics -- does not work as expected

2015-12-03 Thread Dan Dutrow (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12103?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15037955#comment-15037955
 ] 

Dan Dutrow edited comment on SPARK-12103 at 12/3/15 4:16 PM:
-

I'm sure all of this is obvious to Kafka experts. Nowhere in the Spark scala 
function documentation does it say that the Left value is the Kafka key, except 
if you make a guess about what K is. You have to trace through Spark and Kafka 
code to figure it out. Please update the KafkaWordCount.scala and 
DirectKafkaWordCount.scala to include a comment about what is in the ._1 field.


was (Author: dutrow):
I'm sure all of this is obvious to Kafka experts. Nowhere in the Scala Spark 
documentation does it say that the Left value is the Kafka key, except if you 
make a guess about what K is. You have to trace through Spark and Kafka code to 
figure it out. Please update the KafkaWordCount.scala and 
DirectKafkaWordCount.scala to include a comment about what is in the ._1 field.

> KafkaUtils createStream with multiple topics -- does not work as expected
> -
>
> Key: SPARK-12103
> URL: https://issues.apache.org/jira/browse/SPARK-12103
> Project: Spark
>  Issue Type: Improvement
>  Components: Documentation, Streaming
>Affects Versions: 1.4.1
>Reporter: Dan Dutrow
>Priority: Minor
> Fix For: 1.4.2
>
>
> (Note: yes, there is a Direct API that may be better, but it's not the 
> easiest thing to get started with. The Kafka Receiver API still needs to 
> work, especially for newcomers)
> When creating a receiver stream using KafkaUtils, there is a valid use case 
> where you would want to use one (or a few) Kafka Streaming Receiver to pool 
> resources. I have 10+ topics and don't want to dedicate 10 cores to 
> processing all of them. However, when reading the data produced by 
> KafkaUtils.createStream, the DStream[(String,String)] does not properly 
> insert the topic name into the tuple. The left key is always null, making it 
> impossible to know what topic that data came from other than stashing your 
> key into the value.  Is there a way around that problem?
>  CODE
> val topics = Map("topicA" -> 1, "topicB" -> 1, "topicC" -> 1, "topicD" -> 1, 
> "topicE" -> 1, "topicF" -> 1, ...)
> val streams: IndexedSeq[ReceiverInputDStream[(String, String)]] = (1 to 3).map( 
> i =>
>   KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
> ssc, consumerProperties,
> topics,
> StorageLevel.MEMORY_ONLY_SER))
> val unioned :DStream[(String,String)] = ssc.union(streams)
> unioned.flatMap(x => {
>val (key, value) = x
>   // key is always null!
>   // value has data from any one of my topics
>   key match ... {
>   ..
>   }
> }
>  END CODE



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-12103) KafkaUtils createStream with multiple topics -- does not work as expected

2015-12-03 Thread Dan Dutrow (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12103?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15037955#comment-15037955
 ] 

Dan Dutrow edited comment on SPARK-12103 at 12/3/15 4:15 PM:
-

I'm sure all of this is obvious to Kafka experts. Nowhere in the Scala Spark 
documentation does it say that the Left value is the Kafka key, except if you 
make a guess about what K is. You have to trace through Spark and Kafka code to 
figure it out. Please update the KafkaWordCount.scala and 
DirectKafkaWordCount.scala to include a comment about what is in the ._1 field.


was (Author: dutrow):
I'm sure all of this is obvious to Kafka experts. Nowhere in the spark 
documentation does it say that the Left value is the Kafka key, except if you 
make a guess about what K is. You have to trace through Spark and Kafka code to 
figure it out. Please update the KafkaWordCount.scala and 
DirectKafkaWordCount.scala to include a comment about what is in the ._1 field.

> KafkaUtils createStream with multiple topics -- does not work as expected
> -
>
> Key: SPARK-12103
> URL: https://issues.apache.org/jira/browse/SPARK-12103
> Project: Spark
>  Issue Type: Improvement
>  Components: Documentation, Streaming
>Affects Versions: 1.4.1
>Reporter: Dan Dutrow
>Priority: Minor
> Fix For: 1.4.2
>
>
> (Note: yes, there is a Direct API that may be better, but it's not the 
> easiest thing to get started with. The Kafka Receiver API still needs to 
> work, especially for newcomers)
> When creating a receiver stream using KafkaUtils, there is a valid use case 
> where you would want to use one (or a few) Kafka Streaming Receiver to pool 
> resources. I have 10+ topics and don't want to dedicate 10 cores to 
> processing all of them. However, when reading the data produced by 
> KafkaUtils.createStream, the DStream[(String,String)] does not properly 
> insert the topic name into the tuple. The left key is always null, making it 
> impossible to know what topic that data came from other than stashing your 
> key into the value.  Is there a way around that problem?
>  CODE
> val topics = Map("topicA" -> 1, "topicB" -> 1, "topicC" -> 1, "topicD" -> 1, 
> "topicE" -> 1, "topicF" -> 1, ...)
> val streams: IndexedSeq[ReceiverInputDStream[(String, String)]] = (1 to 3).map( 
> i =>
>   KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
> ssc, consumerProperties,
> topics,
> StorageLevel.MEMORY_ONLY_SER))
> val unioned :DStream[(String,String)] = ssc.union(streams)
> unioned.flatMap(x => {
>val (key, value) = x
>   // key is always null!
>   // value has data from any one of my topics
>   key match ... {
>   ..
>   }
> }
>  END CODE



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Reopened] (SPARK-12103) KafkaUtils createStream with multiple topics -- does not work as expected

2015-12-03 Thread Dan Dutrow (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-12103?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dan Dutrow reopened SPARK-12103:


Please update the Kafka word count examples to comment on what the key field 
is. The current examples ignore the key without explanation.

> KafkaUtils createStream with multiple topics -- does not work as expected
> -
>
> Key: SPARK-12103
> URL: https://issues.apache.org/jira/browse/SPARK-12103
> Project: Spark
>  Issue Type: Improvement
>  Components: Documentation, Streaming
>Affects Versions: 1.4.1
>Reporter: Dan Dutrow
>Priority: Minor
> Fix For: 1.4.2
>
>
> (Note: yes, there is a Direct API that may be better, but it's not the 
> easiest thing to get started with. The Kafka Receiver API still needs to 
> work, especially for newcomers)
> When creating a receiver stream using KafkaUtils, there is a valid use case 
> where you would want to use one (or a few) Kafka Streaming Receiver to pool 
> resources. I have 10+ topics and don't want to dedicate 10 cores to 
> processing all of them. However, when reading the data produced by 
> KafkaUtils.createStream, the DStream[(String,String)] does not properly 
> insert the topic name into the tuple. The left key is always null, making it 
> impossible to know what topic that data came from other than stashing your 
> key into the value.  Is there a way around that problem?
>  CODE
> val topics = Map("topicA" -> 1, "topicB" -> 1, "topicC" -> 1, "topicD" -> 1, 
> "topicE" -> 1, "topicF" -> 1, ...)
> val streams: IndexedSeq[ReceiverInputDStream[(String, String)]] = (1 to 3).map( 
> i =>
>   KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
> ssc, consumerProperties,
> topics,
> StorageLevel.MEMORY_ONLY_SER))
> val unioned :DStream[(String,String)] = ssc.union(streams)
> unioned.flatMap(x => {
>val (key, value) = x
>   // key is always null!
>   // value has data from any one of my topics
>   key match ... {
>   ..
>   }
> }
>  END CODE



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-12103) KafkaUtils createStream with multiple topics -- does not work as expected

2015-12-03 Thread Dan Dutrow (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12103?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15037955#comment-15037955
 ] 

Dan Dutrow edited comment on SPARK-12103 at 12/3/15 3:53 PM:
-

I'm sure all of this is obvious to Kafka experts. Nowhere in the spark 
documentation does it say that the Left value is the Kafka key, except if you 
make a guess about what K is. You have to trace through Spark and Kafka code to 
figure it out. Please update the KafkaWordCount.scala and 
DirectKafkaWordCount.scala to include a comment about what is in the ._1 field.


was (Author: dutrow):
I'm sure all of this is obvious to Kafka experts. Nowhere in the spark 
documentation does it say that the Left value is the Kafka key, except if you 
make a guess about what K is. You have to trace through Spark and Kafka code to 
figure it out. Please update the DirectKafkaWordCount.scala to include a 
comment about what is in the ._1 field.

> KafkaUtils createStream with multiple topics -- does not work as expected
> -
>
> Key: SPARK-12103
> URL: https://issues.apache.org/jira/browse/SPARK-12103
> Project: Spark
>  Issue Type: Improvement
>  Components: Documentation, Streaming
>Affects Versions: 1.4.1
>Reporter: Dan Dutrow
>Priority: Minor
> Fix For: 1.4.2
>
>
> (Note: yes, there is a Direct API that may be better, but it's not the 
> easiest thing to get started with. The Kafka Receiver API still needs to 
> work, especially for newcomers)
> When creating a receiver stream using KafkaUtils, there is a valid use case 
> where you would want to use one (or a few) Kafka Streaming Receiver to pool 
> resources. I have 10+ topics and don't want to dedicate 10 cores to 
> processing all of them. However, when reading the data produced by 
> KafkaUtils.createStream, the DStream[(String,String)] does not properly 
> insert the topic name into the tuple. The left key is always null, making it 
> impossible to know what topic that data came from other than stashing your 
> key into the value.  Is there a way around that problem?
>  CODE
> val topics = Map("topicA" -> 1, "topicB" -> 1, "topicC" -> 1, "topicD" -> 1, 
> "topicE" -> 1, "topicF" -> 1, ...)
> val streams: IndexedSeq[ReceiverInputDStream[(String, String)]] = (1 to 3).map( 
> i =>
>   KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
> ssc, consumerProperties,
> topics,
> StorageLevel.MEMORY_ONLY_SER))
> val unioned :DStream[(String,String)] = ssc.union(streams)
> unioned.flatMap(x => {
>val (key, value) = x
>   // key is always null!
>   // value has data from any one of my topics
>   key match ... {
>   ..
>   }
> }
>  END CODE



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-12103) KafkaUtils createStream with multiple topics -- does not work as expected

2015-12-03 Thread Dan Dutrow (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12103?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15037955#comment-15037955
 ] 

Dan Dutrow commented on SPARK-12103:


I'm sure all of this is obvious to Kafka experts. Nowhere in the spark 
documentation does it say that the Left value is the Kafka key, except if you 
make a guess about what K is. You have to trace through Spark and Kafka code to 
figure it out. Please update the DirectKafkaWordCount.scala to include a 
comment about what is in the ._1 field.

> KafkaUtils createStream with multiple topics -- does not work as expected
> -
>
> Key: SPARK-12103
> URL: https://issues.apache.org/jira/browse/SPARK-12103
> Project: Spark
>  Issue Type: Improvement
>  Components: Documentation, Streaming
>Affects Versions: 1.4.1
>Reporter: Dan Dutrow
>Priority: Minor
> Fix For: 1.4.2
>
>
> (Note: yes, there is a Direct API that may be better, but it's not the 
> easiest thing to get started with. The Kafka Receiver API still needs to 
> work, especially for newcomers)
> When creating a receiver stream using KafkaUtils, there is a valid use case 
> where you would want to use one (or a few) Kafka Streaming Receiver to pool 
> resources. I have 10+ topics and don't want to dedicate 10 cores to 
> processing all of them. However, when reading the data produced by 
> KafkaUtils.createStream, the DStream[(String,String)] does not properly 
> insert the topic name into the tuple. The left key is always null, making it 
> impossible to know what topic that data came from other than stashing your 
> key into the value.  Is there a way around that problem?
>  CODE
> val topics = Map("topicA" -> 1, "topicB" -> 1, "topicC" -> 1, "topicD" -> 1, 
> "topicE" -> 1, "topicF" -> 1, ...)
> val streams: IndexedSeq[ReceiverInputDStream[(String, String)]] = (1 to 3).map( 
> i =>
>   KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
> ssc, consumerProperties,
> topics,
> StorageLevel.MEMORY_ONLY_SER))
> val unioned :DStream[(String,String)] = ssc.union(streams)
> unioned.flatMap(x => {
>val (key, value) = x
>   // key is always null!
>   // value has data from any one of my topics
>   key match ... {
>   ..
>   }
> }
>  END CODE



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-12103) KafkaUtils createStream with multiple topics -- does not work as expected

2015-12-03 Thread Dan Dutrow (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12103?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15038007#comment-15038007
 ] 

Dan Dutrow commented on SPARK-12103:


Aha! I finally found where it's documented! It's in the Java API javadoc! I'm 
not in the habit of looking at the API for functions and languages I'm not 
using, but this will teach me!

> KafkaUtils createStream with multiple topics -- does not work as expected
> -
>
> Key: SPARK-12103
> URL: https://issues.apache.org/jira/browse/SPARK-12103
> Project: Spark
>  Issue Type: Improvement
>  Components: Documentation, Streaming
>Affects Versions: 1.4.1
>Reporter: Dan Dutrow
>Priority: Minor
> Fix For: 1.4.2
>
>
> (Note: yes, there is a Direct API that may be better, but it's not the 
> easiest thing to get started with. The Kafka Receiver API still needs to 
> work, especially for newcomers)
> When creating a receiver stream using KafkaUtils, there is a valid use case 
> where you would want to use one (or a few) Kafka Streaming Receiver to pool 
> resources. I have 10+ topics and don't want to dedicate 10 cores to 
> processing all of them. However, when reading the data produced by 
> KafkaUtils.createStream, the DStream[(String,String)] does not properly 
> insert the topic name into the tuple. The left key is always null, making it 
> impossible to know what topic that data came from other than stashing your 
> key into the value.  Is there a way around that problem?
>  CODE
> val topics = Map("topicA" -> 1, "topicB" -> 1, "topicC" -> 1, "topicD" -> 1, 
> "topicE" -> 1, "topicF" -> 1, ...)
> val streams: IndexedSeq[ReceiverInputDStream[(String, String)]] = (1 to 3).map( 
> i =>
>   KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
> ssc, consumerProperties,
> topics,
> StorageLevel.MEMORY_ONLY_SER))
> val unioned :DStream[(String,String)] = ssc.union(streams)
> unioned.flatMap(x => {
>val (key, value) = x
>   // key is always null!
>   // value has data from any one of my topics
>   key match ... {
>   ..
>   }
> }
>  END CODE



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Closed] (SPARK-12103) KafkaUtils createStream with multiple topics -- does not work as expected

2015-12-03 Thread Dan Dutrow (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-12103?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dan Dutrow closed SPARK-12103.
--
Resolution: Won't Fix

> KafkaUtils createStream with multiple topics -- does not work as expected
> -
>
> Key: SPARK-12103
> URL: https://issues.apache.org/jira/browse/SPARK-12103
> Project: Spark
>  Issue Type: Improvement
>  Components: Documentation, Streaming
>Affects Versions: 1.4.1
>Reporter: Dan Dutrow
>Priority: Minor
> Fix For: 1.4.2
>
>
> (Note: yes, there is a Direct API that may be better, but it's not the 
> easiest thing to get started with. The Kafka Receiver API still needs to 
> work, especially for newcomers)
> When creating a receiver stream using KafkaUtils, there is a valid use case 
> where you would want to use one (or a few) Kafka Streaming Receiver to pool 
> resources. I have 10+ topics and don't want to dedicate 10 cores to 
> processing all of them. However, when reading the data produced by 
> KafkaUtils.createStream, the DStream[(String,String)] does not properly 
> insert the topic name into the tuple. The left key is always null, making it 
> impossible to know what topic that data came from other than stashing your 
> key into the value.  Is there a way around that problem?
>  CODE
> val topics = Map("topicA" -> 1, "topicB" -> 1, "topicC" -> 1, "topicD" -> 1, 
> "topicE" -> 1, "topicF" -> 1, ...)
> val streams: IndexedSeq[ReceiverInputDStream[(String, String)]] = (1 to 3).map( 
> i =>
>   KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
> ssc, consumerProperties,
> topics,
> StorageLevel.MEMORY_ONLY_SER))
> val unioned :DStream[(String,String)] = ssc.union(streams)
> unioned.flatMap(x => {
>val (key, value) = x
>   // key is always null!
>   // value has data from any one of my topics
>   key match ... {
>   ..
>   }
> }
>  END CODE



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-12103) KafkaUtils createStream with multiple topics -- does not work as expected

2015-12-02 Thread Dan Dutrow (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-12103?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dan Dutrow updated SPARK-12103:
---
Description: 
(Note: yes, there is a Direct API that may be better, but it's not the easiest 
thing to get started with. The Kafka Receiver API still needs to work, 
especially for newcomers)

When creating a receiver stream using KafkaUtils, there is a valid use case 
where you would want to pool resources. I have 10+ topics and don't want to 
dedicate 10 cores to processing all of them. However, when reading the data 
produced by KafkaUtils.createStream, the DStream[(String,String)] does not 
properly insert the topic name into the tuple. The left key is always null, making 
it impossible to know what topic that data came from other than stashing your 
key into the value.  Is there a way around that problem?

 CODE

val topics = Map("topicA" -> 1, "topicB" -> 1, "topicC" -> 1, "topicD" -> 1, 
"topicE" -> 1, "topicF" -> 1, ...)

val streams: IndexedSeq[ReceiverInputDStream[(String, String)]] = (1 to 3).map( i 
=>
  KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
ssc, consumerProperties,
topics,
StorageLevel.MEMORY_ONLY_SER((

val unioned :DStream[(String,String)] = ssc.union(streams)

unioned.flatMap(x => {

   val (key, value) = x
  // key is always null!
  // value has data from any one of my topics

  key match ... {
  ..
  }

}

 END CODE




  was:
(Note: yes, there is a Direct API that may be better, but it's not the easiest 
thing to get started with. The Kafka Receiver API still needs to work, 
especially for newcomers)

When creating a receiver stream using KafkaUtils, there is a valid use case 
where you would want to pool resources. I have 10+ topics and don't want to 
dedicate 10 cores to processing all of them. However, when reading the data 
procuced by KafkaUtils.createStream, the DStream[(String,String)] does not 
properly insert the topic name into the tuple. The left-key always null, making 
it impossible to know what topic that data came from other than stashing your 
key into the value.  Is there a way around that problem?

 CODE

val topics = Map("topicA" -> 1, "topicB" -> 1, "topicC" -> 1, "topicD" -> 1, 
"topicE" -> 1, "topicF" -> 1, ...)

val streams : IndexedSeq[ReceiverInputDStream[(String,String] = (1 to 3).map( i 
=>
  KafkaUtils.createStream[STring, STring, StringDecoder, StringDecoder](
ssc, consumerProperties,
topics,
StorageLevel.MEMORY_ONLY_SER((

val unioned :DStream[(String,String)] = ssc.union(streams)

unioned.flatMap(x => {

   val (key, value) = x
  // key is always null!
  // value has data from any one of my topics

  key match ... {
  ..
  }

}

 END CODE





> KafkaUtils createStream with multiple topics -- does not work as expected
> -
>
> Key: SPARK-12103
> URL: https://issues.apache.org/jira/browse/SPARK-12103
> Project: Spark
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.0.0
>Reporter: Dan Dutrow
> Fix For: 1.0.1
>
>
> (Note: yes, there is a Direct API that may be better, but it's not the 
> easiest thing to get started with. The Kafka Receiver API still needs to 
> work, especially for newcomers)
> When creating a receiver stream using KafkaUtils, there is a valid use case 
> where you would want to pool resources. I have 10+ topics and don't want to 
> dedicate 10 cores to processing all of them. However, when reading the data 
> procuced by KafkaUtils.createStream, the DStream[(String,String)] does not 
> properly insert the topic name into the tuple. The left-key always null, 
> making it impossible to know what topic that data came from other than 
> stashing your key into the value.  Is there a way around that problem?
>  CODE
> val topics = Map("topicA" -> 1, "topicB" -> 1, "topicC" -> 1, "topicD" -> 1, 
> "topicE" -> 1, "topicF" -> 1, ...)
> val streams: IndexedSeq[ReceiverInputDStream[(String, String)]] = (1 to 3).map( 
> i =>
>   KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
> ssc, consumerProperties,
> topics,
> StorageLevel.MEMORY_ONLY_SER((
> val unioned :DStream[(String,String)] = ssc.union(streams)
> unioned.flatMap(x => {
>val (key, value) = x
>   // key is always null!
>   // value has data from any one of my topics
>   key match ... {
>   ..
>   }
> }
>  END CODE



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-12103) KafkaUtils createStream with multiple topics -- does not work as expected

2015-12-02 Thread Dan Dutrow (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-12103?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dan Dutrow updated SPARK-12103:
---
Fix Version/s: (was: 1.0.1)
   1.4.2

> KafkaUtils createStream with multiple topics -- does not work as expected
> -
>
> Key: SPARK-12103
> URL: https://issues.apache.org/jira/browse/SPARK-12103
> Project: Spark
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.0.0, 1.1.0, 1.2.0, 1.3.0, 1.4.0, 1.4.1
>Reporter: Dan Dutrow
> Fix For: 1.4.2
>
>
> (Note: yes, there is a Direct API that may be better, but it's not the 
> easiest thing to get started with. The Kafka Receiver API still needs to 
> work, especially for newcomers)
> When creating a receiver stream using KafkaUtils, there is a valid use case 
> where you would want to pool resources. I have 10+ topics and don't want to 
> dedicate 10 cores to processing all of them. However, when reading the data 
> produced by KafkaUtils.createStream, the DStream[(String,String)] does not 
> properly insert the topic name into the tuple. The left key is always null, 
> making it impossible to know what topic that data came from other than 
> stashing your key into the value.  Is there a way around that problem?
>  CODE
> val topics = Map("topicA" -> 1, "topicB" -> 1, "topicC" -> 1, "topicD" -> 1, 
> "topicE" -> 1, "topicF" -> 1, ...)
> val streams: IndexedSeq[ReceiverInputDStream[(String, String)]] = (1 to 3).map( 
> i =>
>   KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
> ssc, consumerProperties,
> topics,
> StorageLevel.MEMORY_ONLY_SER))
> val unioned :DStream[(String,String)] = ssc.union(streams)
> unioned.flatMap(x => {
>val (key, value) = x
>   // key is always null!
>   // value has data from any one of my topics
>   key match ... {
>   ..
>   }
> }
>  END CODE



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-12103) KafkaUtils createStream with multiple topics -- does not work as expected

2015-12-02 Thread Dan Dutrow (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-12103?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dan Dutrow updated SPARK-12103:
---
Affects Version/s: 1.1.0
   1.2.0
   1.3.0
   1.4.0
   1.4.1

> KafkaUtils createStream with multiple topics -- does not work as expected
> -
>
> Key: SPARK-12103
> URL: https://issues.apache.org/jira/browse/SPARK-12103
> Project: Spark
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.0.0, 1.1.0, 1.2.0, 1.3.0, 1.4.0, 1.4.1
>Reporter: Dan Dutrow
> Fix For: 1.4.2
>
>
> (Note: yes, there is a Direct API that may be better, but it's not the 
> easiest thing to get started with. The Kafka Receiver API still needs to 
> work, especially for newcomers)
> When creating a receiver stream using KafkaUtils, there is a valid use case 
> where you would want to pool resources. I have 10+ topics and don't want to 
> dedicate 10 cores to processing all of them. However, when reading the data 
> produced by KafkaUtils.createStream, the DStream[(String,String)] does not 
> properly insert the topic name into the tuple. The left key is always null, 
> making it impossible to know what topic that data came from other than 
> stashing your key into the value.  Is there a way around that problem?
>  CODE
> val topics = Map("topicA" -> 1, "topicB" -> 1, "topicC" -> 1, "topicD" -> 1, 
> "topicE" -> 1, "topicF" -> 1, ...)
> val streams: IndexedSeq[ReceiverInputDStream[(String, String)]] = (1 to 3).map( 
> i =>
>   KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
> ssc, consumerProperties,
> topics,
> StorageLevel.MEMORY_ONLY_SER))
> val unioned :DStream[(String,String)] = ssc.union(streams)
> unioned.flatMap(x => {
>val (key, value) = x
>   // key is always null!
>   // value has data from any one of my topics
>   key match ... {
>   ..
>   }
> }
>  END CODE



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-12103) KafkaUtils createStream with multiple topics -- does not work as expected

2015-12-02 Thread Dan Dutrow (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-12103?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dan Dutrow updated SPARK-12103:
---
Description: 
(Note: yes, there is a Direct API that may be better, but it's not the easiest 
thing to get started with. The Kafka Receiver API still needs to work, 
especially for newcomers)

When creating a receiver stream using KafkaUtils, there is a valid use case 
where you would want to pool resources. I have 10+ topics and don't want to 
dedicate 10 cores to processing all of them. However, when reading the data 
produced by KafkaUtils.createStream, the DStream[(String,String)] does not 
properly insert the topic name into the tuple. The left key is always null, making 
it impossible to know what topic that data came from other than stashing your 
key into the value.  Is there a way around that problem?

 CODE

val topics = Map("topicA" -> 1, "topicB" -> 1, "topicC" -> 1, "topicD" -> 1, 
"topicE" -> 1, "topicF" -> 1, ...)

val streams: IndexedSeq[ReceiverInputDStream[(String, String)]] = (1 to 3).map( i 
=>
  KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
ssc, consumerProperties,
topics,
StorageLevel.MEMORY_ONLY_SER))

val unioned :DStream[(String,String)] = ssc.union(streams)

unioned.flatMap(x => {

   val (key, value) = x
  // key is always null!
  // value has data from any one of my topics

  key match ... {
  ..
  }

}

 END CODE




  was:
(Note: yes, there is a Direct API that may be better, but it's not the easiest 
thing to get started with. The Kafka Receiver API still needs to work, 
especially for newcomers)

When creating a receiver stream using KafkaUtils, there is a valid use case 
where you would want to pool resources. I have 10+ topics and don't want to 
dedicate 10 cores to processing all of them. However, when reading the data 
procuced by KafkaUtils.createStream, the DStream[(String,String)] does not 
properly insert the topic name into the tuple. The left-key always null, making 
it impossible to know what topic that data came from other than stashing your 
key into the value.  Is there a way around that problem?

 CODE

val topics = Map("topicA" -> 1, "topicB" -> 1, "topicC" -> 1, "topicD" -> 1, 
"topicE" -> 1, "topicF" -> 1, ...)

val streams : IndexedSeq[ReceiverInputDStream[(String,String] = (1 to 3).map( i 
=>
  KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
ssc, consumerProperties,
topics,
StorageLevel.MEMORY_ONLY_SER((

val unioned :DStream[(String,String)] = ssc.union(streams)

unioned.flatMap(x => {

   val (key, value) = x
  // key is always null!
  // value has data from any one of my topics

  key match ... {
  ..
  }

}

 END CODE





> KafkaUtils createStream with multiple topics -- does not work as expected
> -
>
> Key: SPARK-12103
> URL: https://issues.apache.org/jira/browse/SPARK-12103
> Project: Spark
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.0.0
>Reporter: Dan Dutrow
> Fix For: 1.0.1
>
>
> (Note: yes, there is a Direct API that may be better, but it's not the 
> easiest thing to get started with. The Kafka Receiver API still needs to 
> work, especially for newcomers)
> When creating a receiver stream using KafkaUtils, there is a valid use case 
> where you would want to pool resources. I have 10+ topics and don't want to 
> dedicate 10 cores to processing all of them. However, when reading the data 
> produced by KafkaUtils.createStream, the DStream[(String,String)] does not 
> properly insert the topic name into the tuple. The left key is always null, 
> making it impossible to know what topic that data came from other than 
> stashing your key into the value.  Is there a way around that problem?
>  CODE
> val topics = Map("topicA" -> 1, "topicB" -> 1, "topicC" -> 1, "topicD" -> 1, 
> "topicE" -> 1, "topicF" -> 1, ...)
> val streams: IndexedSeq[ReceiverInputDStream[(String, String)]] = (1 to 3).map( 
> i =>
>   KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
> ssc, consumerProperties,
> topics,
> StorageLevel.MEMORY_ONLY_SER))
> val unioned :DStream[(String,String)] = ssc.union(streams)
> unioned.flatMap(x => {
>val (key, value) = x
>   // key is always null!
>   // value has data from any one of my topics
>   key match ... {
>   ..
>   }
> }
>  END CODE



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-12103) KafkaUtils createStream with multiple topics -- does not work as expected

2015-12-02 Thread Dan Dutrow (JIRA)
Dan Dutrow created SPARK-12103:
--

 Summary: KafkaUtils createStream with multiple topics -- does not 
work as expected
 Key: SPARK-12103
 URL: https://issues.apache.org/jira/browse/SPARK-12103
 Project: Spark
  Issue Type: Improvement
  Components: Streaming
Affects Versions: 1.0.0
Reporter: Dan Dutrow
 Fix For: 1.0.1


Default way of creating stream out of Kafka source would be as

val stream = KafkaUtils.createStream(ssc,"localhost:2181","logs", 
Map("retarget" -> 2,"datapair" -> 2))

However, if two topics - in this case "retarget" and "datapair" - are very 
different, there is no way to set up different filters, mapping functions, etc., 
as they are effectively merged.

However, the instance of KafkaInputDStream created with this call internally calls 
ConsumerConnector.createMessageStreams(), which returns a *map* of KafkaStreams 
keyed by topic. It would be great if this map were exposed somehow, so that the 
aforementioned call 

val streamS = KafkaUtils.createStreamS(...)

returned a map of streams.

Regards,
Sergey Malov
Collective Media



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-12103) KafkaUtils createStream with multiple topics -- does not work as expected

2015-12-02 Thread Dan Dutrow (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-12103?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dan Dutrow updated SPARK-12103:
---
Description: 
(Note: yes, there is a Direct API that may be better, but it's not the easiest 
thing to get started with. The Kafka Receiver API still needs to work, 
especially for newcomers)

When creating a receiver stream using KafkaUtils, there is a valid use case 
where you would want to pool resources. I have 10+ topics and don't want to 
dedicate 10 cores to processing all of them. However, when reading the data 
produced by KafkaUtils.createStream, the DStream[(String,String)] does not 
properly insert the topic name into the tuple. The left key is always null, making 
it impossible to know what topic that data came from other than stashing your 
key into the value.  Is there a way around that problem?

 CODE

val topics = Map("topicA" -> 1, "topicB" -> 1, "topicC" -> 1, "topicD" -> 1, 
"topicE" -> 1, "topicF" -> 1, ...)

val streams: IndexedSeq[ReceiverInputDStream[(String, String)]] = (1 to 3).map( i 
=>
  KafkaUtils.createStream[STring, STring, StringDecoder, StringDecoder](
ssc, consumerProperties,
topics,
StorageLevel.MEMORY_ONLY_SER((

val unioned :DStream[(String,String)] = ssc.union(streams)

unioned.flatMap(x => {

   val (key, value) = x
  // key is always null!
  // value has data from any one of my topics

  key match ... {
  ..
  }

}

 END CODE




  was:
Default way of creating stream out of Kafka source would be as

val stream = KafkaUtils.createStream(ssc,"localhost:2181","logs", 
Map("retarget" -> 2,"datapair" -> 2))

However, if two topics - in this case "retarget" and "datapair" - are very 
different, there is no way to set up different filter, mapping functions, etc), 
as they are effectively merged.

However, instance of KafkaInputDStream, created with this call internally calls 
ConsumerConnector.createMessageStream() which returns *map* of KafkaStreams, 
keyed by topic. It would be great if this map would be exposed somehow, so 
aforementioned call 

val streamS = KafkaUtils.createStreamS(...)

returned map of streams.

Regards,
Sergey Malov
Collective Media


> KafkaUtils createStream with multiple topics -- does not work as expected
> -
>
> Key: SPARK-12103
> URL: https://issues.apache.org/jira/browse/SPARK-12103
> Project: Spark
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.0.0
>Reporter: Dan Dutrow
> Fix For: 1.0.1
>
>
> (Note: yes, there is a Direct API that may be better, but it's not the 
> easiest thing to get started with. The Kafka Receiver API still needs to 
> work, especially for newcomers)
> When creating a receiver stream using KafkaUtils, there is a valid use case 
> where you would want to pool resources. I have 10+ topics and don't want to 
> dedicate 10 cores to processing all of them. However, when reading the data 
> produced by KafkaUtils.createStream, the DStream[(String,String)] does not 
> properly insert the topic name into the tuple. The left key is always null, 
> making it impossible to know what topic that data came from other than 
> stashing your key into the value.  Is there a way around that problem?
>  CODE
> val topics = Map("topicA" -> 1, "topicB" -> 1, "topicC" -> 1, "topicD" -> 1, 
> "topicE" -> 1, "topicF" -> 1, ...)
> val streams: IndexedSeq[ReceiverInputDStream[(String, String)]] = (1 to 3).map( 
> i =>
>   KafkaUtils.createStream[STring, STring, StringDecoder, StringDecoder](
> ssc, consumerProperties,
> topics,
> StorageLevel.MEMORY_ONLY_SER((
> val unioned :DStream[(String,String)] = ssc.union(streams)
> unioned.flatMap(x => {
>val (key, value) = x
>   // key is always null!
>   // value has data from any one of my topics
>   key match ... {
>   ..
>   }
> }
>  END CODE



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-12103) KafkaUtils createStream with multiple topics -- does not work as expected

2015-12-02 Thread Dan Dutrow (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-12103?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dan Dutrow updated SPARK-12103:
---
Target Version/s: 1.4.2, 1.6.1  (was: 1.0.1)

> KafkaUtils createStream with multiple topics -- does not work as expected
> -
>
> Key: SPARK-12103
> URL: https://issues.apache.org/jira/browse/SPARK-12103
> Project: Spark
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.0.0, 1.1.0, 1.2.0, 1.3.0, 1.4.0, 1.4.1
>Reporter: Dan Dutrow
> Fix For: 1.4.2
>
>
> (Note: yes, there is a Direct API that may be better, but it's not the 
> easiest thing to get started with. The Kafka Receiver API still needs to 
> work, especially for newcomers)
> When creating a receiver stream using KafkaUtils, there is a valid use case 
> where you would want to pool resources. I have 10+ topics and don't want to 
> dedicate 10 cores to processing all of them. However, when reading the data 
> produced by KafkaUtils.createStream, the DStream[(String,String)] does not 
> properly insert the topic name into the tuple. The left key is always null, 
> making it impossible to know what topic that data came from other than 
> stashing your key into the value.  Is there a way around that problem?
>  CODE
> val topics = Map("topicA" -> 1, "topicB" -> 1, "topicC" -> 1, "topicD" -> 1, 
> "topicE" -> 1, "topicF" -> 1, ...)
> val streams: IndexedSeq[ReceiverInputDStream[(String, String)]] = (1 to 3).map( 
> i =>
>   KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
> ssc, consumerProperties,
> topics,
> StorageLevel.MEMORY_ONLY_SER))
> val unioned :DStream[(String,String)] = ssc.union(streams)
> unioned.flatMap(x => {
>val (key, value) = x
>   // key is always null!
>   // value has data from any one of my topics
>   key match ... {
>   ..
>   }
> }
>  END CODE



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-12103) KafkaUtils createStream with multiple topics -- does not work as expected

2015-12-02 Thread Dan Dutrow (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-12103?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dan Dutrow updated SPARK-12103:
---
Description: 
(Note: yes, there is a Direct API that may be better, but it's not the easiest 
thing to get started with. The Kafka Receiver API still needs to work, 
especially for newcomers)

When creating a receiver stream using KafkaUtils, there is a valid use case 
where you would want to use one (or a few) Kafka Streaming Receiver to pool 
resources. I have 10+ topics and don't want to dedicate 10 cores to processing 
all of them. However, when reading the data produced by 
KafkaUtils.createStream, the DStream[(String,String)] does not properly insert 
the topic name into the tuple. The left key is always null, making it impossible 
to know what topic that data came from other than stashing your key into the 
value.  Is there a way around that problem?

 CODE

val topics = Map("topicA" -> 1, "topicB" -> 1, "topicC" -> 1, "topicD" -> 1, 
"topicE" -> 1, "topicF" -> 1, ...)

val streams: IndexedSeq[ReceiverInputDStream[(String, String)]] = (1 to 3).map( i 
=>
  KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
ssc, consumerProperties,
topics,
StorageLevel.MEMORY_ONLY_SER))

val unioned :DStream[(String,String)] = ssc.union(streams)

unioned.flatMap(x => {

   val (key, value) = x
  // key is always null!
  // value has data from any one of my topics

  key match ... {
  ..
  }

}

 END CODE




  was:
(Note: yes, there is a Direct API that may be better, but it's not the easiest 
thing to get started with. The Kafka Receiver API still needs to work, 
especially for newcomers)

When creating a receiver stream using KafkaUtils, there is a valid use case 
where you would want to pool resources. I have 10+ topics and don't want to 
dedicate 10 cores to processing all of them. However, when reading the data 
procuced by KafkaUtils.createStream, the DStream[(String,String)] does not 
properly insert the topic name into the tuple. The left-key always null, making 
it impossible to know what topic that data came from other than stashing your 
key into the value.  Is there a way around that problem?

 CODE

val topics = Map("topicA" -> 1, "topicB" -> 1, "topicC" -> 1, "topicD" -> 1, 
"topicE" -> 1, "topicF" -> 1, ...)

val streams : IndexedSeq[ReceiverInputDStream[(String,String] = (1 to 3).map( i 
=>
  KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
ssc, consumerProperties,
topics,
StorageLevel.MEMORY_ONLY_SER))

val unioned :DStream[(String,String)] = ssc.union(streams)

unioned.flatMap(x => {

   val (key, value) = x
  // key is always null!
  // value has data from any one of my topics

  key match ... {
  ..
  }

}

 END CODE





> KafkaUtils createStream with multiple topics -- does not work as expected
> -
>
> Key: SPARK-12103
> URL: https://issues.apache.org/jira/browse/SPARK-12103
> Project: Spark
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.0.0, 1.1.0, 1.2.0, 1.3.0, 1.4.0, 1.4.1
>Reporter: Dan Dutrow
> Fix For: 1.4.2
>
>
> (Note: yes, there is a Direct API that may be better, but it's not the 
> easiest thing to get started with. The Kafka Receiver API still needs to 
> work, especially for newcomers)
> When creating a receiver stream using KafkaUtils, there is a valid use case 
> where you would want to use one (or a few) Kafka Streaming Receiver to pool 
> resources. I have 10+ topics and don't want to dedicate 10 cores to 
> processing all of them. However, when reading the data produced by 
> KafkaUtils.createStream, the DStream[(String,String)] does not properly 
> insert the topic name into the tuple. The left key is always null, making it 
> impossible to know what topic that data came from other than stashing your 
> key into the value.  Is there a way around that problem?
>  CODE
> val topics = Map("topicA" -> 1, "topicB" -> 1, "topicC" -> 1, "topicD" -> 1, 
> "topicE" -> 1, "topicF" -> 1, ...)
> val streams: IndexedSeq[ReceiverInputDStream[(String, String)]] = (1 to 3).map( 
> i =>
>   KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
> ssc, consumerProperties,
> topics,
> StorageLevel.MEMORY_ONLY_SER))
> val unioned :DStream[(String,String)] = ssc.union(streams)
> unioned.flatMap(x => {
>val (key, value) = x
>   // key is always null!
>   // value has data from any one of my topics
>   key match ... {
>   ..
>   }
> }
>  END CODE



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: 

[jira] [Commented] (SPARK-12103) KafkaUtils createStream with multiple topics -- does not work as expected

2015-12-02 Thread Dan Dutrow (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12103?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15036741#comment-15036741
 ] 

Dan Dutrow commented on SPARK-12103:


One possible way around this problem would be to stick the topic name into the 
message key. Unless the message key is supposed to be unique, this would 
allow you to retrieve the topic name from the data.
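
As a rough sketch of that workaround on the producer side (using the old Kafka 
0.8 Scala producer; the broker address, topic, and payload are placeholders), 
publishing the topic name as the message key would make it visible in ._1 on 
the Spark side:

 CODE
import java.util.Properties
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

object TopicInKeyProducerSketch {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("metadata.broker.list", "localhost:9092") // placeholder broker
    props.put("serializer.class", "kafka.serializer.StringEncoder")

    val producer = new Producer[String, String](new ProducerConfig(props))
    val topic = "topicA"

    // Put the topic name in the key so the receiver-based consumer can read it
    // back from ._1 of each (key, value) tuple.
    producer.send(new KeyedMessage[String, String](topic, topic, "some payload"))
    producer.close()
  }
}
 END CODE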

> KafkaUtils createStream with multiple topics -- does not work as expected
> -
>
> Key: SPARK-12103
> URL: https://issues.apache.org/jira/browse/SPARK-12103
> Project: Spark
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.0.0, 1.1.0, 1.2.0, 1.3.0, 1.4.0, 1.4.1
>Reporter: Dan Dutrow
> Fix For: 1.4.2
>
>
> (Note: yes, there is a Direct API that may be better, but it's not the 
> easiest thing to get started with. The Kafka Receiver API still needs to 
> work, especially for newcomers)
> When creating a receiver stream using KafkaUtils, there is a valid use case 
> where you would want to use one (or a few) Kafka Streaming Receiver to pool 
> resources. I have 10+ topics and don't want to dedicate 10 cores to 
> processing all of them. However, when reading the data produced by 
> KafkaUtils.createStream, the DStream[(String,String)] does not properly 
> insert the topic name into the tuple. The left key is always null, making it 
> impossible to know what topic that data came from other than stashing your 
> key into the value.  Is there a way around that problem?
>  CODE
> val topics = Map("topicA" -> 1, "topicB" -> 1, "topicC" -> 1, "topicD" -> 1, 
> "topicE" -> 1, "topicF" -> 1, ...)
> val streams: IndexedSeq[ReceiverInputDStream[(String, String)]] = (1 to 3).map( 
> i =>
>   KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
> ssc, consumerProperties,
> topics,
> StorageLevel.MEMORY_ONLY_SER))
> val unioned :DStream[(String,String)] = ssc.union(streams)
> unioned.flatMap(x => {
>val (key, value) = x
>   // key is always null!
>   // value has data from any one of my topics
>   key match ... {
>   ..
>   }
> }
>  END CODE



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-12103) KafkaUtils createStream with multiple topics -- does not work as expected

2015-12-02 Thread Dan Dutrow (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12103?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15036625#comment-15036625
 ] 

Dan Dutrow commented on SPARK-12103:


After digging into the Kafka code some more (specifically 
kafka.consumer.KafkaStream, kafka.consumer.ConsumerIterator and 
kafka.message.MessageAndMetadata), it appears that the Left value of the tuple 
is not the topic name but rather a key that Kafka puts on each message. See 
http://kafka.apache.org/documentation.html#compaction

I don't see a way around this without hacking KafkaStream and ConsumerIterator 
to return the topic name instead of the message key.

The return value should probably be clarified in the documentation.
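
If per-record topic names are what is needed, the direct API variant that 
exposes MessageAndMetadata can supply them. A rough sketch (broker address, 
topics, and starting offsets are placeholders; a real job would restore saved 
offsets instead of starting at 0):

 CODE
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object TopicPerRecordSketch {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(new SparkConf().setAppName("topic-per-record"), Seconds(10))
    val kafkaParams = Map("metadata.broker.list" -> "localhost:9092") // placeholder broker

    val fromOffsets = Map(
      TopicAndPartition("topicA", 0) -> 0L,
      TopicAndPartition("topicB", 0) -> 0L)

    // The messageHandler sees the full MessageAndMetadata, so the topic name can be
    // emitted alongside the payload -- something the receiver-based tuple cannot do.
    val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.topic, mmd.message())

    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
      ssc, kafkaParams, fromOffsets, messageHandler)

    stream.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
 END CODE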

> KafkaUtils createStream with multiple topics -- does not work as expected
> -
>
> Key: SPARK-12103
> URL: https://issues.apache.org/jira/browse/SPARK-12103
> Project: Spark
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.0.0, 1.1.0, 1.2.0, 1.3.0, 1.4.0, 1.4.1
>Reporter: Dan Dutrow
> Fix For: 1.4.2
>
>
> (Note: yes, there is a Direct API that may be better, but it's not the 
> easiest thing to get started with. The Kafka Receiver API still needs to 
> work, especially for newcomers)
> When creating a receiver stream using KafkaUtils, there is a valid use case 
> where you would want to pool resources. I have 10+ topics and don't want to 
> dedicate 10 cores to processing all of them. However, when reading the data 
> produced by KafkaUtils.createStream, the DStream[(String,String)] does not 
> properly insert the topic name into the tuple. The left key is always null, 
> making it impossible to know what topic that data came from other than 
> stashing your key into the value.  Is there a way around that problem?
>  CODE
> val topics = Map("topicA" -> 1, "topicB" -> 1, "topicC" -> 1, "topicD" -> 1, 
> "topicE" -> 1, "topicF" -> 1, ...)
> val streams: IndexedSeq[ReceiverInputDStream[(String, String)]] = (1 to 3).map( 
> i =>
>   KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
> ssc, consumerProperties,
> topics,
> StorageLevel.MEMORY_ONLY_SER))
> val unioned :DStream[(String,String)] = ssc.union(streams)
> unioned.flatMap(x => {
>val (key, value) = x
>   // key is always null!
>   // value has data from any one of my topics
>   key match ... {
>   ..
>   }
> }
>  END CODE



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-12103) KafkaUtils createStream with multiple topics -- does not work as expected

2015-12-02 Thread Dan Dutrow (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12103?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15036625#comment-15036625
 ] 

Dan Dutrow edited comment on SPARK-12103 at 12/2/15 10:38 PM:
--

After digging into the Kafka code some more (specifically 
kafka.consumer.KafkaStream, kafka.consumer.ConsumerIterator and 
kafka.message.MessageAndMetadata), it appears that the Left value of the tuple 
is not the topic name but rather a key that Kafka puts on each message. See 
http://kafka.apache.org/documentation.html#impl_producer

I don't see a way around this without hacking KafkaStream and ConsumerIterator 
to return the topic name instead of the message key.

The return value should probably be clarified in the documentation.


was (Author: dutrow):
After digging into the Kafka code some more (specifically 
kafka.consumer.KafkaStream, kafka.consumer.ConsumerIterator and 
kafka.message.MessageAndMetadata), it appears that the Left value of the tuple 
is not the topic name but rather a key that Kafka puts on each message. See 
http://kafka.apache.org/documentation.html#compaction

I don't see a way around this without hacking KafkaStream and ConsumerIterator 
to return the topic name instead of the message key.

The return value should probably be clarified in the documentation.

> KafkaUtils createStream with multiple topics -- does not work as expected
> -
>
> Key: SPARK-12103
> URL: https://issues.apache.org/jira/browse/SPARK-12103
> Project: Spark
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.0.0, 1.1.0, 1.2.0, 1.3.0, 1.4.0, 1.4.1
>Reporter: Dan Dutrow
> Fix For: 1.4.2
>
>
> (Note: yes, there is a Direct API that may be better, but it's not the 
> easiest thing to get started with. The Kafka Receiver API still needs to 
> work, especially for newcomers)
> When creating a receiver stream using KafkaUtils, there is a valid use case 
> where you would want to use one (or a few) Kafka Streaming Receiver to pool 
> resources. I have 10+ topics and don't want to dedicate 10 cores to 
> processing all of them. However, when reading the data produced by 
> KafkaUtils.createStream, the DStream[(String,String)] does not properly 
> insert the topic name into the tuple. The left key is always null, making it 
> impossible to know what topic that data came from other than stashing your 
> key into the value.  Is there a way around that problem?
>  CODE
> val topics = Map("topicA" -> 1, "topicB" -> 1, "topicC" -> 1, "topicD" -> 1, 
> "topicE" -> 1, "topicF" -> 1, ...)
> val streams: IndexedSeq[ReceiverInputDStream[(String, String)]] = (1 to 3).map( 
> i =>
>   KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
> ssc, consumerProperties,
> topics,
> StorageLevel.MEMORY_ONLY_SER))
> val unioned :DStream[(String,String)] = ssc.union(streams)
> unioned.flatMap(x => {
>val (key, value) = x
>   // key is always null!
>   // value has data from any one of my topics
>   key match ... {
>   ..
>   }
> }
>  END CODE



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-9947) Separate Metadata and State Checkpoint Data

2015-08-14 Thread Dan Dutrow (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-9947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14697157#comment-14697157
 ] 

Dan Dutrow commented on SPARK-9947:
---

The desire is to continue using checkpointing for everything but allow 
selective deleting of certain types of checkpoint data. Using an external 
database to duplicate that functionality is not desired.

 Separate Metadata and State Checkpoint Data
 ---

 Key: SPARK-9947
 URL: https://issues.apache.org/jira/browse/SPARK-9947
 Project: Spark
  Issue Type: Improvement
  Components: Streaming
Affects Versions: 1.4.1
Reporter: Dan Dutrow
   Original Estimate: 168h
  Remaining Estimate: 168h

 Problem: When updating an application that has checkpointing enabled to 
 support the updateStateByKey and 24/7 operation functionality, you encounter 
 the problem where you might like to maintain state data between restarts but 
 delete the metadata containing execution state. 
 If checkpoint data exists between code redeployment, the program may not 
 execute properly or at all. My current workaround for this issue is to wrap 
 updateStateByKey with my own function that persists the state after every 
 update to my own separate directory. (That allows me to delete the checkpoint 
 with its metadata before redeploying) Then, when I restart the application, I 
 initialize the state with this persisted data. This incurs additional 
 overhead due to persisting of the same data twice: once in the checkpoint and 
 once in my persisted data folder. 
 If Kafka Direct API offsets could be stored in another separate checkpoint 
 directory, that would help address the problem of having to blow that away 
 between code redeployment as well.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-9947) Separate Metadata and State Checkpoint Data

2015-08-14 Thread Dan Dutrow (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-9947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14697180#comment-14697180
 ] 

Dan Dutrow commented on SPARK-9947:
---

I want to maintain the data in updateStateByKey between upgrades. This data can 
be recovered between upgrades so long as objects don't change.

 Separate Metadata and State Checkpoint Data
 ---

 Key: SPARK-9947
 URL: https://issues.apache.org/jira/browse/SPARK-9947
 Project: Spark
  Issue Type: Improvement
  Components: Streaming
Affects Versions: 1.4.1
Reporter: Dan Dutrow
   Original Estimate: 168h
  Remaining Estimate: 168h

 Problem: When updating an application that has checkpointing enabled to 
 support the updateStateByKey and 24/7 operation functionality, you encounter 
 the problem where you might like to maintain state data between restarts but 
 delete the metadata containing execution state. 
 If checkpoint data exists between code redeployment, the program may not 
 execute properly or at all. My current workaround for this issue is to wrap 
 updateStateByKey with my own function that persists the state after every 
 update to my own separate directory. (That allows me to delete the checkpoint 
 with its metadata before redeploying) Then, when I restart the application, I 
 initialize the state with this persisted data. This incurs additional 
 overhead due to persisting of the same data twice: once in the checkpoint and 
 once in my persisted data folder. 
 If Kafka Direct API offsets could be stored in another separate checkpoint 
 directory, that would help address the problem of having to blow that away 
 between code redeployment as well.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-9947) Separate Metadata and State Checkpoint Data

2015-08-14 Thread Dan Dutrow (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-9947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14697220#comment-14697220
 ] 

Dan Dutrow commented on SPARK-9947:
---

OK, right, we're saving the same state that is output from updateStateByKey to 
HDFS. The thought is that updateStateByKey may be saving the exact same data in 
the checkpoint as I have to save in my own function. Separating the different 
types of data stored in the checkpoint might let me avoid saving the same state 
data twice.
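
As a rough illustration of the workaround being discussed (writing the state a 
second time outside the checkpoint and feeding it back in as the initial state 
on restart), a sketch along these lines, with placeholder paths and a socket 
source standing in for the real input:

 CODE
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StateOutsideCheckpointSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("state-outside-checkpoint"))
    val ssc = new StreamingContext(sc, Seconds(10))
    ssc.checkpoint("/tmp/checkpoint") // placeholder paths
    val statePath = "/tmp/state-snapshots"

    // Snapshot left by the previous run (path passed in by the operator); empty on first run.
    val initialState: RDD[(String, Long)] =
      if (args.nonEmpty) sc.objectFile[(String, Long)](args(0))
      else sc.emptyRDD[(String, Long)]

    // Socket source stands in for the real input in this sketch.
    val counts = ssc.socketTextStream("localhost", 9999)
      .map(word => (word, 1L))
      .updateStateByKey(
        (values: Seq[Long], state: Option[Long]) => Some(state.getOrElse(0L) + values.sum),
        new HashPartitioner(sc.defaultParallelism),
        initialState)

    // Persist the same state a second time, outside the checkpoint, so the checkpoint
    // (with its execution metadata) can be deleted before a code upgrade.
    counts.foreachRDD { (rdd, time) =>
      rdd.saveAsObjectFile(s"$statePath/state-${time.milliseconds}")
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
 END CODE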

 Separate Metadata and State Checkpoint Data
 ---

 Key: SPARK-9947
 URL: https://issues.apache.org/jira/browse/SPARK-9947
 Project: Spark
  Issue Type: Improvement
  Components: Streaming
Affects Versions: 1.4.1
Reporter: Dan Dutrow
   Original Estimate: 168h
  Remaining Estimate: 168h

 Problem: When updating an application that has checkpointing enabled to 
 support the updateStateByKey and 24/7 operation functionality, you encounter 
 the problem where you might like to maintain state data between restarts but 
 delete the metadata containing execution state. 
 If checkpoint data exists between code redeployment, the program may not 
 execute properly or at all. My current workaround for this issue is to wrap 
 updateStateByKey with my own function that persists the state after every 
 update to my own separate directory. (That allows me to delete the checkpoint 
 with its metadata before redeploying) Then, when I restart the application, I 
 initialize the state with this persisted data. This incurs additional 
 overhead due to persisting of the same data twice: once in the checkpoint and 
 once in my persisted data folder. 
 If Kafka Direct API offsets could be stored in another separate checkpoint 
 directory, that would help address the problem of having to blow that away 
 between code redeployment as well.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-9947) Separate Metadata and State Checkpoint Data

2015-08-13 Thread Dan Dutrow (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-9947?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dan Dutrow updated SPARK-9947:
--
Priority: Major  (was: Minor)

 Separate Metadata and State Checkpoint Data
 ---

 Key: SPARK-9947
 URL: https://issues.apache.org/jira/browse/SPARK-9947
 Project: Spark
  Issue Type: Improvement
  Components: Streaming
Affects Versions: 1.4.1
Reporter: Dan Dutrow
   Original Estimate: 168h
  Remaining Estimate: 168h

 Problem: When updating an application that has checkpointing enabled to 
 support the updateStateByKey functionality, you encounter the problem where 
 you might like to maintain state data between restarts but delete the 
 metadata containing execution state. 
 If checkpoint data exists between code redeployment, the program may not 
 execute properly or at all. My current workaround for this issue is to wrap 
 updateStateByKey with my own function that persists the state after every 
 update to my own separate directory. (That allows me to delete the checkpoint 
 with its metadata before redeploying) Then, when I restart the application, I 
 initialize the state with this persisted data. This incurs additional 
 overhead due to persisting of the same data twice: once in the checkpoint and 
 once in my persisted data folder. 
 If Kafka Direct API offsets could be stored in another separate checkpoint 
 directory, that would help address the problem of having to blow that away 
 between code redeployment as well.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-9947) Separate Metadata and State Checkpoint Data

2015-08-13 Thread Dan Dutrow (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-9947?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dan Dutrow updated SPARK-9947:
--
Description: 
Problem: When updating an application that has checkpointing enabled to support 
the updateStateByKey and 24/7 operation functionality, you encounter the 
problem where you might like to maintain state data between restarts but delete 
the metadata containing execution state. 

If checkpoint data exists between code redeployment, the program may not 
execute properly or at all. My current workaround for this issue is to wrap 
updateStateByKey with my own function that persists the state after every 
update to my own separate directory. (That allows me to delete the checkpoint 
with its metadata before redeploying) Then, when I restart the application, I 
initialize the state with this persisted data. This incurs additional overhead 
due to persisting of the same data twice: once in the checkpoint and once in my 
persisted data folder. 

If Kafka Direct API offsets could be stored in another separate checkpoint 
directory, that would help address the problem of having to blow that away 
between code redeployment as well.

  was:
Problem: When updating an application that has checkpointing enabled to support 
the updateStateByKey functionality, you encounter the problem where you might 
like to maintain state data between restarts but delete the metadata containing 
execution state. 

If checkpoint data exists between code redeployment, the program may not 
execute properly or at all. My current workaround for this issue is to wrap 
updateStateByKey with my own function that persists the state after every 
update to my own separate directory. (That allows me to delete the checkpoint 
with its metadata before redeploying) Then, when I restart the application, I 
initialize the state with this persisted data. This incurs additional overhead 
due to persisting of the same data twice: once in the checkpoint and once in my 
persisted data folder. 

If Kafka Direct API offsets could be stored in another separate checkpoint 
directory, that would help address the problem of having to blow that away 
between code redeployment as well.


 Separate Metadata and State Checkpoint Data
 ---

 Key: SPARK-9947
 URL: https://issues.apache.org/jira/browse/SPARK-9947
 Project: Spark
  Issue Type: Improvement
  Components: Streaming
Affects Versions: 1.4.1
Reporter: Dan Dutrow
   Original Estimate: 168h
  Remaining Estimate: 168h

 Problem: When updating an application that has checkpointing enabled to 
 support the updateStateByKey and 24/7 operation functionality, you encounter 
 the problem where you might like to maintain state data between restarts but 
 delete the metadata containing execution state. 
 If checkpoint data exists between code redeployment, the program may not 
 execute properly or at all. My current workaround for this issue is to wrap 
 updateStateByKey with my own function that persists the state after every 
 update to my own separate directory. (That allows me to delete the checkpoint 
 with its metadata before redeploying) Then, when I restart the application, I 
 initialize the state with this persisted data. This incurs additional 
 overhead due to persisting of the same data twice: once in the checkpoint and 
 once in my persisted data folder. 
 If Kafka Direct API offsets could be stored in another separate checkpoint 
 directory, that would help address the problem of having to blow that away 
 between code redeployment as well.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-9947) Separate Metadata and State Checkpoint Data

2015-08-13 Thread Dan Dutrow (JIRA)
Dan Dutrow created SPARK-9947:
-

 Summary: Separate Metadata and State Checkpoint Data
 Key: SPARK-9947
 URL: https://issues.apache.org/jira/browse/SPARK-9947
 Project: Spark
  Issue Type: Improvement
  Components: Streaming
Reporter: Dan Dutrow


This is the proposal. 

The simpler direct API (the one that does not take explicit offsets) can be 
modified to also pick up the initial offset from ZK if group.id is specified. 
This is exactly similar to how we find the latest or earliest offset in that 
API, just that instead of latest/earliest offset of the topic we want to find 
the offset from the consumer group. The group offsets in ZK are not used at all 
for any further processing or restarting, so the exactly-once semantics are not 
broken. 

The use case where this is useful is a simplified code upgrade. If the user wants 
to upgrade the code, he/she can stop the context gracefully, which will ensure 
the ZK consumer group offset is updated with the last offsets processed. 
Then the new code, started fresh (not restarted from checkpoint), can pick up the 
consumer group offset from ZK and continue where the previous code left 
off. 

Without the functionality of picking up consumer group offsets to start (that 
is, currently) the only way to do this is for the users to save the offsets 
somewhere (file, database, etc.) and manage the offsets themselves. I just want 
to simplify this process. 
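
For reference, a rough sketch of that manual offset bookkeeping with the direct 
API (broker address, topic, and starting offset are placeholders; a real job 
would persist the offset ranges somewhere durable rather than print them):

 CODE
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object ManualOffsetSketch {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(new SparkConf().setAppName("manual-offsets"), Seconds(10))
    val kafkaParams = Map("metadata.broker.list" -> "localhost:9092") // placeholder broker

    // Offsets saved by the previous run (file, database, ZK...); hardcoded for the sketch.
    val fromOffsets = Map(TopicAndPartition("topicA", 0) -> 42L)

    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
      ssc, kafkaParams, fromOffsets,
      (mmd: MessageAndMetadata[String, String]) => (mmd.key(), mmd.message()))

    stream.foreachRDD { rdd =>
      // After processing, record where this batch ended so the next deployment can
      // resume from here instead of relying on the checkpoint.
      val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      ranges.foreach(r => println(s"${r.topic}/${r.partition} -> ${r.untilOffset}"))
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
 END CODE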



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-6249) Get Kafka offsets from consumer group in ZK when using direct stream

2015-08-13 Thread Dan Dutrow (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14695887#comment-14695887
 ] 

Dan Dutrow edited comment on SPARK-6249 at 8/13/15 8:34 PM:


I have read many of the tickets, documentation, and source code relating to 
this subject, but haven't found a solid answer to what appears to be the common 
question of how to reinitialize a Kafka Direct API consumer after a restart. I 
have a rapidly evolving application that uses checkpoints and the like for 24/7 
operations. However, when we deploy a new jar, we have to blow away the 
checkpoint because otherwise the changes won't take effect cleanly. We have an 
approach to maintain the data between restarts, but not the Kafka offset, and 
doing the Kafka bookkeeping in our application requires a deeper knowledge and 
proficiency to make sure that is done correctly. Not being familiar with the 
ZooKeeper API, I was hoping for a Spark-abstracted approach where I can 
configure something in the API or with a parameter like: auto.offset.reset - 
remember to do the best it can to resume processing at the previous offset 
location (for that consumer group). Short of that, a link to a blog, document, 
or example in GitHub that describes the ZK API to accomplish this functionality 
in detail would be greatly appreciated.

On a related topic, it would be awesome if the Streaming Statistics page could 
be updated so that you can review the message rates while using the Kafka 
Direct API. If making that work is another Zookeeper trick, some example code 
would be immensely helpful.

http://apache-spark-user-list.1001560.n3.nabble.com/Maintaining-Kafka-Direct-API-Offsets-tt24246.html


was (Author: dutrow):
I have read many of the tickets, documentation, and source code relating to 
this subject, but haven't found a solid answer to what appears to be the common 
question of how to reinitialize a Kafka Direct API consumer after a restart. I 
have a rapidly evolving application that uses checkpoints and the like for 24/7 
operations. However, when we deploy a new jar, we have to blow away the 
checkpoint because otherwise the changes won't take affect cleanly. We have an 
approach to maintain the data between restarts, but not the Kafka offset, and 
doing the Kafka bookkeeping in our application requires a deeper knowledge and 
proficiency to make sure that is done correctly. Not being familiar with the 
ZooKeeper API, I was hoping for a Spark-abstracted approach where I can 
configure something in the API or with a parameter like: auto.offset.reset - 
remember to do the best it can to resume processing at the previous offset 
location (for that consumer group). Short of that, a link to a blog, document, 
or example in GitHub that describes the ZK API to accomplish this functionality 
in detail would be greatly appreciated.

On a related topic, it would be awesome if the Streaming Statistics page could 
be updated so that you can review the message rates while using the Kafka 
Direct API. If making that work is another Zookeeper trick, some example code 
would be immensely helpful.

 Get Kafka offsets from consumer group in ZK when using direct stream
 

 Key: SPARK-6249
 URL: https://issues.apache.org/jira/browse/SPARK-6249
 Project: Spark
  Issue Type: Improvement
  Components: Streaming
Reporter: Tathagata Das

 This is the proposal. 
 The simpler direct API (the one that does not take explicit offsets) can be 
 modified to also pick up the initial offset from ZK if group.id is specified. 
 This is exactly similar to how we find the latest or earliest offset in that 
 API, just that instead of latest/earliest offset of the topic we want to find 
 the offset from the consumer group. The group offsets in ZK are not used at 
 all for any further processing or restarting, so the exactly-once semantics 
 are not broken. 
 The use case where this is useful is a simplified code upgrade. If the user 
 wants to upgrade the code, he/she can stop the context gracefully, which will 
 ensure the ZK consumer group offset is updated with the last offsets 
 processed. Then the new code, started fresh (not restarted from checkpoint), can 
 pick up the consumer group offset from ZK and continue where the previous 
 code left off. 
 Without the functionality of picking up consumer group offsets to start (that 
 is, currently) the only way to do this is for the users to save the offsets 
 somewhere (file, database, etc.) and manage the offsets themselves. I just 
 want to simplify this process. 
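
 (As a rough sketch of the "stop gracefully" step in the upgrade flow above; the 
 StreamingContext ssc is assumed to exist elsewhere and the config option noted 
 in the comment is an alternative, not part of this proposal:)

{code:scala}
// Sketch of the graceful-stop step of the upgrade flow described above,
// assuming an already-running StreamingContext `ssc` built elsewhere.
// stopGracefully = true lets received/queued batches finish processing, so the
// last processed offsets can be recorded before the old jar exits.
ssc.stop(stopSparkContext = true, stopGracefully = true)

// (Alternatively, spark.streaming.stopGracefullyOnShutdown=true asks Spark to do
// the same on JVM shutdown. The new jar is then started fresh, without
// StreamingContext.getOrCreate / checkpoint recovery, and seeded from the saved
// or ZK-recorded offsets.)
{code}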



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

[jira] [Updated] (SPARK-9947) Separate Metadata and State Checkpoint Data

2015-08-13 Thread Dan Dutrow (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-9947?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dan Dutrow updated SPARK-9947:
--
 Flags: Important
 Affects Version/s: 1.4.1
  Target Version/s: 1.5.0  (was: 1.4.0)
Remaining Estimate: 168h
 Original Estimate: 168h
  Priority: Minor  (was: Major)
   Description: 
Problem: When updating an application that has checkpointing enabled to support 
the updateStateByKey functionality, you run into the problem that you might like 
to maintain state data between restarts while deleting the metadata containing 
execution state. 

If checkpoint data persists across a code redeployment, the program may not 
execute properly, or at all. My current workaround for this issue is to wrap 
updateStateByKey with my own function that persists the state after every 
update to my own separate directory. (That allows me to delete the checkpoint, 
with its metadata, before redeploying.) Then, when I restart the application, I 
initialize the state with this persisted data. This incurs additional overhead 
from persisting the same data twice: once in the checkpoint and once in my 
persisted data folder. 

If Kafka Direct API offsets could be stored in another, separate checkpoint 
directory, that would help address the problem of having to blow that away 
between code redeployments as well.
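
(To make the workaround concrete, here is a minimal sketch of the wrap-and-reload 
pattern; the paths, key/value types, socket source and update function are 
hypothetical placeholders, not the actual application code.)

{code:scala}
// Sketch of the workaround described above: keep the normal checkpoint for
// updateStateByKey, but also persist the state to a separate directory every
// batch and reload it as the initial state after a redeployment.
import java.net.URI
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.{HashPartitioner, SparkConf}
import org.apache.spark.streaming.{Seconds, StreamingContext}

val stateDir = "hdfs:///app/state-backup"          // separate from the checkpoint dir
val conf = new SparkConf().setAppName("state-backup-sketch")
val ssc = new StreamingContext(conf, Seconds(10))
ssc.checkpoint("hdfs:///app/checkpoint")           // still required by updateStateByKey

val sc = ssc.sparkContext
val fs = FileSystem.get(new URI(stateDir), sc.hadoopConfiguration)
val latest = new Path(stateDir + "/latest")

// Reload previously persisted state if it exists; start empty otherwise.
val initialState =
  if (fs.exists(latest)) sc.objectFile[(String, Long)](latest.toString)
  else sc.emptyRDD[(String, Long)]

val updateFunc: (Seq[Long], Option[Long]) => Option[Long] =
  (values, state) => Some(values.sum + state.getOrElse(0L))

val events = ssc.socketTextStream("localhost", 9999).map(line => (line, 1L))
val state = events.updateStateByKey(
  updateFunc, new HashPartitioner(sc.defaultParallelism), initialState)

// Persist the newest state copy every batch (write to a temp dir, then rename),
// so the checkpoint directory can be deleted before redeploying.
state.foreachRDD { rdd =>
  val tmp = new Path(stateDir + "/latest_tmp")
  fs.delete(tmp, true)
  rdd.saveAsObjectFile(stateDir + "/latest_tmp")
  fs.delete(latest, true)
  fs.rename(tmp, latest)
}

ssc.start()
ssc.awaitTermination()
{code}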

  was:
This is the proposal. 

The simpler direct API (the one that does not take explicit offsets) can be 
modified to also pick up the initial offset from ZK if group.id is specified. 
This is exactly analogous to how we find the latest or earliest offset in that 
API, except that instead of the latest/earliest offset of the topic we want to 
find the offset recorded for the consumer group. The group offsets in ZK are not 
used at all for any further processing or restarting, so the exactly-once 
semantics are not broken. 

The use case where this is useful is a simplified code upgrade. If the user wants 
to upgrade the code, he/she can stop the context gracefully, which will ensure 
the ZK consumer group offsets are updated with the last offsets processed. Then 
the new code, started fresh (not restarted from a checkpoint), can pick up the 
consumer group offsets from ZK and continue where the previous code left off. 

Without the ability to pick up consumer group offsets at startup (that is, 
currently), the only way to do this is for users to save the offsets somewhere 
(file, database, etc.) and manage the offsets themselves. I just want to 
simplify this process. 


 Separate Metadata and State Checkpoint Data
 ---

 Key: SPARK-9947
 URL: https://issues.apache.org/jira/browse/SPARK-9947
 Project: Spark
  Issue Type: Improvement
  Components: Streaming
Affects Versions: 1.4.1
Reporter: Dan Dutrow
Priority: Minor
   Original Estimate: 168h
  Remaining Estimate: 168h

 Problem: When updating an application that has checkpointing enabled to 
 support the updateStateByKey functionality, you run into the problem that you 
 might like to maintain state data between restarts while deleting the metadata 
 containing execution state. 
 If checkpoint data persists across a code redeployment, the program may not 
 execute properly, or at all. My current workaround for this issue is to wrap 
 updateStateByKey with my own function that persists the state after every 
 update to my own separate directory. (That allows me to delete the checkpoint, 
 with its metadata, before redeploying.) Then, when I restart the application, I 
 initialize the state with this persisted data. This incurs additional overhead 
 from persisting the same data twice: once in the checkpoint and once in my 
 persisted data folder. 
 If Kafka Direct API offsets could be stored in another, separate checkpoint 
 directory, that would help address the problem of having to blow that away 
 between code redeployments as well.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-6249) Get Kafka offsets from consumer group in ZK when using direct stream

2015-08-13 Thread Dan Dutrow (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14695887#comment-14695887
 ] 

Dan Dutrow commented on SPARK-6249:
---

I have read many of the tickets, documentation, and source code relating to 
this subject, but haven't found a solid answer to what appears to be the common 
question of how to reinitialize a Kafka Direct API consumer after a restart. I 
have a rapidly evolving application that uses checkpoints and the like for 24/7 
operations. However, when we deploy a new jar, we have to blow away the 
checkpoint because otherwise the changes won't take effect cleanly. We have an 
approach to maintain the data between restarts, but not the Kafka offset, and 
doing the Kafka bookkeeping in our application requires a deeper knowledge and 
proficiency to make sure that is done correctly. Not being familiar with the 
ZooKeeper API, I was hoping for a Spark-abstracted approach where I can 
configure something in the API, or set a parameter like auto.offset.reset, that 
tells Spark to do the best it can to resume processing at the previous offset 
location (for that consumer group). Short of that, a link to a blog, document, 
or example in GitHub that describes the ZK API to accomplish this functionality 
in detail would be greatly appreciated.

On a related topic, it would be awesome if the Streaming Statistics page could 
be updated so that you can review the message rates while using the Kafka 
Direct API. If making that work is another ZooKeeper trick, some example code 
would be immensely helpful.

 Get Kafka offsets from consumer group in ZK when using direct stream
 

 Key: SPARK-6249
 URL: https://issues.apache.org/jira/browse/SPARK-6249
 Project: Spark
  Issue Type: Improvement
  Components: Streaming
Reporter: Tathagata Das

 This is the proposal. 
 The simpler direct API (the one that does not take explicit offsets) can be 
 modified to also pick up the initial offset from ZK if group.id is specified. 
 This is exactly analogous to how we find the latest or earliest offset in that 
 API, except that instead of the latest/earliest offset of the topic we want to 
 find the offset recorded for the consumer group. The group offsets in ZK are 
 not used at all for any further processing or restarting, so the exactly-once 
 semantics are not broken. 
 The use case where this is useful is a simplified code upgrade. If the user 
 wants to upgrade the code, he/she can stop the context gracefully, which will 
 ensure the ZK consumer group offsets are updated with the last offsets 
 processed. Then the new code, started fresh (not restarted from a checkpoint), 
 can pick up the consumer group offsets from ZK and continue where the previous 
 code left off. 
 Without the ability to pick up consumer group offsets at startup (that is, 
 currently), the only way to do this is for users to save the offsets somewhere 
 (file, database, etc.) and manage the offsets themselves. I just want to 
 simplify this process. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org