[jira] [Commented] (SPARK-12775) Couldn't find leader offsets exception when hostname can't be resolved

2016-01-13 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15096550#comment-15096550
 ] 

Cody Koeninger commented on SPARK-12775:


Couldn't find leader offsets ... because you couldn't resolve the leader 
hostname... doesn't seem that misleading to me.

If you want to be able to improve the error message, can you post the full 
stacktrace?

> Couldn't find leader offsets exception when hostname can't be resolved
> --
>
> Key: SPARK-12775
> URL: https://issues.apache.org/jira/browse/SPARK-12775
> Project: Spark
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.6.0
>Reporter: Sebastian Piu
>Priority: Minor
>
> When hostname resolution fails for a broker an unclear/misleading error is 
> shown:
> org.apache.spark.SparkException: java.nio.channels.ClosedChannelException
> org.apache.spark.SparkException: Couldn't find leader offsets for 
> Set([mytopic,0], [mytopic,18], [mytopic,12], [mytopic,6])
>   at 
> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
>   at 
> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
> The error above occurred when a broker was added to the cluster and my machine 
> could not resolve its hostname.






[jira] [Commented] (SPARK-12693) OffsetOutOfRangeException caused by retention

2016-01-08 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15089306#comment-15089306
 ] 

Cody Koeninger commented on SPARK-12693:


If an executor is given a range of kafka offsets that no longer exist in kafka, 
it's going to error.

It doesn't matter whether this is caused by you setting kafka retention 
parameters several orders of magnitude smaller than the default, deleting 
offsets from kafka between the time the driver created an rdd and the 
executor started to compute it, trying to restart from a checkpoint that has 
old offsets, manually creating an RDD for offsets that don't exist, or 
whatever else.  If the data doesn't exist any more, the only reasonable thing 
to do is error.  You don't really need to experiment with all the different 
ways you can cause this to happen; the documentation pretty explicitly says you 
need adequate kafka retention.

Regarding your most recent comment, as the documentation says, 'If __not__ 
starting from a checkpoint, "auto.offset.reset" may be set to "largest" or 
"smallest" to determine where the stream starts'.  Since you're starting from a 
checkpoint, you're already specifying where you want the stream to start from, 
and if those offsets no longer exist, it's not going to work.

As I said, changing this would impact correctness (automatically retrying with 
different offsets would mean silently losing data) and performance (the count 
optimizations assume that the messages specified in the offset range are 
actually the messages that will be processed), so I don't see this being likely 
to change.  It's your job to make sure kafka has the data you're asking for.
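
For reference, a minimal sketch (not from this ticket) of the documented knob when you are not resuming from a checkpoint; the broker list and topic name are placeholders:

{code}
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val ssc = new StreamingContext(new SparkConf().setAppName("direct-example"), Seconds(5))
val kafkaParams = Map(
  "metadata.broker.list" -> "broker1:9092",  // placeholder broker list
  "auto.offset.reset" -> "smallest")         // or "largest"; only read when there is no checkpoint to resume from
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("mytopic"))          // placeholder topic
{code}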



> OffsetOutOfRangeException caused by retention
> -
>
> Key: SPARK-12693
> URL: https://issues.apache.org/jira/browse/SPARK-12693
> Project: Spark
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.6.0
> Environment: Ubuntu 64bit, Intel i7
>Reporter: Rado Buransky
>Priority: Minor
>  Labels: kafka
> Attachments: kafka-log.txt, log.txt
>
>
> I am running a Kafka server locally with an extremely low retention of 3 seconds 
> and 1 second segmentation. I create a direct Kafka stream with 
> auto.offset.reset = smallest. 
> In case of bad luck (which actually happens quite often in my case), the smallest 
> offset retrieved during stream initialization no longer exists when 
> streaming actually starts.
> Complete source code of the Spark Streaming application is here:
> https://github.com/pygmalios/spark-checkpoint-experience/blob/cb27ab83b7a29e619386b56e68a755d7bd73fc46/src/main/scala/com/pygmalios/sparkCheckpointExperience/spark/SparkApp.scala
> The application ends in an endless loop trying to get that non-existing 
> offset and has to be killed. Check attached logs from Spark and also from 
> Kafka server.






[jira] [Commented] (SPARK-12693) OffsetOutOfRangeException caused by retention

2016-01-07 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15088725#comment-15088725
 ] 

Cody Koeninger commented on SPARK-12693:


What is your actual use case for changing the log retention check from the
default 5 minutes down to 100 millis?

As I said originally, if you have such a short retention time, and want to
avoid the issue, use offset reset largest rather than smallest.  Even then
you need to make sure your batch time is small enough and you never lag
behind on processing.

On Thu, Jan 7, 2016 at 10:51 PM, Rado Buransky (JIRA) 



> OffsetOutOfRangeException caused by retention
> 
>
> Key: SPARK-12693
> URL: https://issues.apache.org/jira/browse/SPARK-12693
> Project: Spark
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.6.0
> Environment: Ubuntu 64bit, Intel i7
>Reporter: Rado Buransky
>Priority: Minor
>  Labels: kafka
> Attachments: kafka-log.txt, log.txt
>
>
> I am running a Kafka server locally with an extremely low retention of 3 seconds 
> and 1 second segmentation. I create a direct Kafka stream with 
> auto.offset.reset = smallest. 
> In case of bad luck (which actually happens quite often in my case), the smallest 
> offset retrieved during stream initialization no longer exists when 
> streaming actually starts.
> Complete source code of the Spark Streaming application is here:
> https://github.com/pygmalios/spark-checkpoint-experience/blob/cb27ab83b7a29e619386b56e68a755d7bd73fc46/src/main/scala/com/pygmalios/sparkCheckpointExperience/spark/SparkApp.scala
> The application ends in an endless loop trying to get that non-existing 
> offset and has to be killed. Check attached logs from Spark and also from 
> Kafka server.






[jira] [Commented] (SPARK-10963) Make KafkaCluster api public

2016-01-02 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-10963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15076536#comment-15076536
 ] 

Cody Koeninger commented on SPARK-10963:


There's a nonzero chance that the 0.9 integration will be a totally
separate subproject.



> Make KafkaCluster api public
> 
>
> Key: SPARK-10963
> URL: https://issues.apache.org/jira/browse/SPARK-10963
> Project: Spark
>  Issue Type: Improvement
>  Components: Streaming
>Reporter: Cody Koeninger
>Priority: Minor
>
> per mailing list discussion, there's enough interest in people using 
> KafkaCluster (e.g. to access latest offsets) to justify making it public
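
For context, a rough sketch (not from this ticket) of the kind of usage the description refers to, fetching latest offsets, assuming KafkaCluster were public; the method names and Either-based error handling follow the then-current private[spark] class, so treat the exact signatures as an assumption:

{code}
import org.apache.spark.streaming.kafka.KafkaCluster

val kc = new KafkaCluster(Map("metadata.broker.list" -> "broker1:9092")) // placeholder broker
// Both calls return Either: Left carries errors, Right carries the result
for {
  partitions    <- kc.getPartitions(Set("mytopic")).right               // placeholder topic
  leaderOffsets <- kc.getLatestLeaderOffsets(partitions).right
} {
  leaderOffsets.foreach { case (tp, lo) => println(s"$tp -> latest offset ${lo.offset}") }
}
{code}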






[jira] [Commented] (SPARK-12524) Group by key in a pairrdd without any shuffle

2015-12-26 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12524?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15071884#comment-15071884
 ] 

Cody Koeninger commented on SPARK-12524:


mapPartitions is just giving you an iterator, what you do with it is up to
you. Have you actually tried it for your use case?
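
For what it's worth, a minimal sketch (not from this ticket) of a per-partition group-by with mapPartitions, assuming each key's values are already co-located in one partition:

{code}
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

object LocalGroupBy {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("local-group-by"))
    // Assume the data was already partitioned so all values for a key share a partition
    val pairRdd = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3)))
      .partitionBy(new HashPartitioner(2))
    // Group within each partition; no shuffle is triggered
    val grouped = pairRdd.mapPartitions({ iter =>
      iter.toList.groupBy(_._1).iterator.map { case (k, vs) => (k, vs.map(_._2)) }
    }, preservesPartitioning = true)
    grouped.collect().foreach(println)
    sc.stop()
  }
}
{code}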



> Group by key in a pairrdd without any shuffle
> -
>
> Key: SPARK-12524
> URL: https://issues.apache.org/jira/browse/SPARK-12524
> Project: Spark
>  Issue Type: Improvement
>  Components: Build, Java API
>Affects Versions: 1.5.2
>Reporter: Shushant Arora
>   Original Estimate: 504h
>  Remaining Estimate: 504h
>
> In a PairRDD, when all values for the same key are in the same partition 
> and we want to group by key locally, with no reduce/aggregation operation 
> afterwards, just further transformations on the grouped rdd, there is no facility 
> for that. We have to perform a shuffle, which is costly.






[jira] [Commented] (SPARK-12524) Group by key in a pairrdd without any shuffle

2015-12-26 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12524?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15071872#comment-15071872
 ] 

Cody Koeninger commented on SPARK-12524:


Why can't you just use mapPartitions?

> Group by key in a pairrdd without any shuffle
> -
>
> Key: SPARK-12524
> URL: https://issues.apache.org/jira/browse/SPARK-12524
> Project: Spark
>  Issue Type: Improvement
>  Components: Build, Java API
>Affects Versions: 1.5.2
>Reporter: Shushant Arora
>   Original Estimate: 504h
>  Remaining Estimate: 504h
>
> In a PairRDD, when all values for the same key are in the same partition 
> and we want to group by key locally, with no reduce/aggregation operation 
> afterwards, just further transformations on the grouped rdd, there is no facility 
> for that. We have to perform a shuffle, which is costly.






[jira] [Commented] (SPARK-11045) Contributing Receiver based Low Level Kafka Consumer from Spark-Packages to Apache Spark Project

2015-12-22 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-11045?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15069101#comment-15069101
 ] 

Cody Koeninger commented on SPARK-11045:


Direct stream isn't doing any caching unless you specifically ask for it,
in which case you can set storage level
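
For illustration, a hedged sketch of opting in to caching on a direct stream by setting a storage level explicitly; ssc, kafkaParams and the topic name are placeholders:

{code}
import kafka.serializer.StringDecoder
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("mytopic"))          // placeholders: ssc, kafkaParams, topic
// No caching happens unless you ask for it, e.g. before reusing the stream more than once:
stream.persist(StorageLevel.MEMORY_ONLY_SER)
{code}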
On Dec 22, 2015 9:30 PM, "Balaji Ramamoorthy (JIRA)" 



> Contributing Receiver based Low Level Kafka Consumer from Spark-Packages to 
> Apache Spark Project
> 
>
> Key: SPARK-11045
> URL: https://issues.apache.org/jira/browse/SPARK-11045
> Project: Spark
>  Issue Type: New Feature
>  Components: Streaming
>Reporter: Dibyendu Bhattacharya
>
> This JIRA is to track the progress of contributing the Receiver based Low Level 
> Kafka Consumer from spark-packages 
> (http://spark-packages.org/package/dibbhatt/kafka-spark-consumer) back to the 
> Apache Spark Project.
> This Kafka consumer has been around for more than a year and has matured over 
> time. I see there are many adoptions of this package, and I receive 
> positive feedback that this consumer gives better performance and fault 
> tolerance. 
> The primary intent of this JIRA is to give the community a better 
> alternative if they want to use the Receiver based model. 
> If this consumer makes it into Spark Core, it will definitely see more adoption 
> and support from the community and help many who still prefer the Receiver based 
> model of Kafka consumer. 
> I understand the Direct Stream is the consumer which can give exactly-once 
> semantics and uses the Kafka low level API, which is good. But the Direct Stream 
> has concerns around recovering checkpoints on driver code change. Application 
> developers need to manage their own offsets, which is complex. Even if someone 
> does manage their own offsets, it limits the parallelism Spark Streaming can 
> achieve. If someone wants more parallelism and sets 
> spark.streaming.concurrentJobs to more than 1, you can no longer rely on 
> storing offsets externally, as you have no control over which batch will run in 
> which sequence. 
> Furthermore, the Direct Stream has higher latency, as it fetches messages 
> from Kafka during the RDD action. Also, the number of RDD partitions is limited 
> to the number of topic partitions, so unless your Kafka topic has enough partitions, 
> you have limited parallelism while processing the RDD. 
> Due to the above mentioned concerns, many people who do not want exactly-once 
> semantics still prefer the Receiver based model. Unfortunately, when customers 
> fall back to the KafkaUtils.createStream approach, which uses the Kafka High Level 
> Consumer, there are other issues around the reliability of the Kafka High Level 
> API.  The Kafka High Level API is buggy and has a serious issue around consumer 
> re-balance. Hence I do not think it is correct to advise people to use 
> KafkaUtils.createStream in production. 
> The better option presently is to use the consumer from 
> spark-packages. It uses the Kafka Low Level Consumer API, stores offsets 
> in Zookeeper, and can recover from any failure. Below are a few highlights of 
> this consumer:
> 1. It has an inbuilt PID controller for dynamic rate limiting.
> 2. In this consumer, rate limiting is done by modifying the size of the blocks, 
> by controlling the size of messages pulled from Kafka, whereas in Spark the 
> rate limiting is done by controlling the number of messages. The issue with 
> throttling by number of messages is that if message sizes vary, block sizes will 
> also vary. Say your Kafka has messages of different sizes, from 10KB to 
> 500KB; throttling by number of messages can never give a deterministic 
> block size, hence there is no guarantee that memory back-pressure can 
> really take effect. 
> 3. This consumer uses the Kafka low level API, which gives better performance 
> than the KafkaUtils.createStream based High Level API.
> 4. This consumer can give an end to end no-data-loss channel if enabled with the WAL.
> By accepting this low level Kafka consumer from spark-packages into the Apache 
> Spark project, we will give the community better options for Kafka 
> connectivity, for both the receiver-less and Receiver based models. 






[jira] [Commented] (SPARK-12177) Update KafkaDStreams to new Kafka 0.9 Consumer API

2015-12-17 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12177?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15062313#comment-15062313
 ] 

Cody Koeninger commented on SPARK-12177:


Honestly, SSL / auth is the only compelling thing about 0.9; not sure it
makes sense to have 0.9 without it.

On Wed, Dec 16, 2015 at 3:16 AM, Nikita Tarasenko (JIRA) 



> Update KafkaDStreams to new Kafka 0.9 Consumer API
> --
>
> Key: SPARK-12177
> URL: https://issues.apache.org/jira/browse/SPARK-12177
> Project: Spark
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.6.0
>Reporter: Nikita Tarasenko
>  Labels: consumer, kafka
>
> Kafka 0.9 has already been released and it introduces a new consumer API that is not 
> compatible with the old one. So, I added the new consumer API in separate 
> classes in the package org.apache.spark.streaming.kafka.v09 with the changed API. I 
> didn't remove the old classes, for backward compatibility: users will not need 
> to change their old Spark applications when they upgrade to a new Spark version.
> Please review my changes.






[jira] [Commented] (SPARK-9059) Update Python Direct Kafka Word count examples to show the use of HasOffsetRanges

2015-12-09 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-9059?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15048938#comment-15048938
 ] 

Cody Koeninger commented on SPARK-9059:
---

To be clear, I think SPARK-8389 should make it possible to write an equivalent 
python snippet, there's just not one in the docs.  jerryshao would know better.

> Update Python Direct Kafka Word count examples to show the use of 
> HasOffsetRanges
> -
>
> Key: SPARK-9059
> URL: https://issues.apache.org/jira/browse/SPARK-9059
> Project: Spark
>  Issue Type: Sub-task
>  Components: Streaming
>Reporter: Tathagata Das
>Priority: Minor
>  Labels: starter
>
> Update Python examples of Direct Kafka word count to access the offset ranges 
> using HasOffsetRanges and print it. For example in Scala,
>  
> {code}
> var offsetRanges: Array[OffsetRange] = _
> ...
> directKafkaDStream.foreachRDD { rdd =>
>   offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
> }
> ...
> transformedDStream.foreachRDD { rdd =>
>   // some operation
>   println("Processed ranges: " + offsetRanges.mkString(", "))
> }
> {code}
> See https://spark.apache.org/docs/latest/streaming-kafka-integration.html for 
> more info, and the master source code for more updated information on python. 






[jira] [Comment Edited] (SPARK-9059) Update Python Direct Kafka Word count examples to show the use of HasOffsetRanges

2015-12-09 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-9059?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15048904#comment-15048904
 ] 

Cody Koeninger edited comment on SPARK-9059 at 12/9/15 4:19 PM:


HasOffsetRanges is explained here

http://spark.apache.org/docs/latest/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers

There's not an equivalent python code snippet like the java and scala ones.



was (Author: c...@koeninger.org):
HasOffsetRanges is explained here

http://spark.apache.org/docs/latest/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers

There's not an equivalent python code snippet for the java and scala ones.


> Update Python Direct Kafka Word count examples to show the use of 
> HasOffsetRanges
> -
>
> Key: SPARK-9059
> URL: https://issues.apache.org/jira/browse/SPARK-9059
> Project: Spark
>  Issue Type: Sub-task
>  Components: Streaming
>Reporter: Tathagata Das
>Priority: Minor
>  Labels: starter
>
> Update Python examples of Direct Kafka word count to access the offset ranges 
> using HasOffsetRanges and print it. For example in Scala,
>  
> {code}
> var offsetRanges: Array[OffsetRange] = _
> ...
> directKafkaDStream.foreachRDD { rdd =>
>   offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
> }
> ...
> transformedDStream.foreachRDD { rdd =>
>   // some operation
>   println("Processed ranges: " + offsetRanges.mkString(", "))
> }
> {code}
> See https://spark.apache.org/docs/latest/streaming-kafka-integration.html for 
> more info, and the master source code for more updated information on python. 






[jira] [Commented] (SPARK-9059) Update Python Direct Kafka Word count examples to show the use of HasOffsetRanges

2015-12-09 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-9059?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15048904#comment-15048904
 ] 

Cody Koeninger commented on SPARK-9059:
---

HasOffsetRanges is explained here

http://spark.apache.org/docs/latest/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers

There's not an equivalent python code snippet for the java and scala ones.


> Update Python Direct Kafka Word count examples to show the use of 
> HasOffsetRanges
> -
>
> Key: SPARK-9059
> URL: https://issues.apache.org/jira/browse/SPARK-9059
> Project: Spark
>  Issue Type: Sub-task
>  Components: Streaming
>Reporter: Tathagata Das
>Priority: Minor
>  Labels: starter
>
> Update Python examples of Direct Kafka word count to access the offset ranges 
> using HasOffsetRanges and print it. For example in Scala,
>  
> {code}
> var offsetRanges: Array[OffsetRange] = _
> ...
> directKafkaDStream.foreachRDD { rdd =>
>   offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
> }
> ...
> transformedDStream.foreachRDD { rdd =>
>   // some operation
>   println("Processed ranges: " + offsetRanges.mkString(", "))
> }
> {code}
> See https://spark.apache.org/docs/latest/streaming-kafka-integration.html for 
> more info, and the master source code for more updated information on python. 






[jira] [Commented] (SPARK-6051) Add an option for DirectKafkaInputDStream to commit the offsets into ZK

2015-12-09 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15048895#comment-15048895
 ] 

Cody Koeninger commented on SPARK-6051:
---

Responded on the mailing list, but for posterity's sake:

Which version of spark are you on?  I thought that was added to the spark UI in 
recent versions.

Direct api doesn't have any inherent interaction with zookeeper.  If you need 
number of messages per batch and aren't on a recent enough version of spark to 
see them in the ui, you can get them programmatically from the offset ranges.  
See the definition of count() in recent versions of KafkaRDD for an example.
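
As a rough sketch (assuming the usual direct stream setup), this is the kind of programmatic count you can derive from the offset ranges; directStream is a placeholder for your KafkaUtils.createDirectStream result:

{code}
import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

directStream.foreachRDD { rdd =>
  val ranges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // Essentially the arithmetic the KafkaRDD count() optimization relies on:
  // messages per partition, summed per batch
  val messagesThisBatch = ranges.map(r => r.untilOffset - r.fromOffset).sum
  println(s"messages in this batch: $messagesThisBatch")
}
{code}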

> Add an option for DirectKafkaInputDStream to commit the offsets into ZK
> ---
>
> Key: SPARK-6051
> URL: https://issues.apache.org/jira/browse/SPARK-6051
> Project: Spark
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.3.0
>Reporter: Saisai Shao
>
> Currently in DirectKafkaInputDStream, offsets are managed by Spark Streaming 
> itself without ZK or Kafka involved, which makes several third-party 
> offset monitoring tools fail to monitor the status of the Kafka consumer. So here 
> is an option to commit the offsets to ZK when each job is finished. The process 
> is implemented in an asynchronous way, so the main processing flow will not 
> be blocked; already tested with the KafkaOffsetMonitor tool.






[jira] [Commented] (SPARK-12203) Add KafkaDirectInputDStream that directly pulls messages from Kafka Brokers using receivers

2015-12-09 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12203?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15048879#comment-15048879
 ] 

Cody Koeninger commented on SPARK-12203:


Commented on the PR.  I don't think this makes sense for inclusion in spark, at 
least in its current state.

I think efforts towards minimizing latency of the direct stream (assuming that 
just tuning your batch sizes smaller isn't sufficient) would be better spent 
pursuing pre-fetching / caching on the executors... but that's a noticeable 
increase in complexity.

> Add KafkaDirectInputDStream that directly pulls messages from Kafka Brokers 
> using receivers
> ---
>
> Key: SPARK-12203
> URL: https://issues.apache.org/jira/browse/SPARK-12203
> Project: Spark
>  Issue Type: New Feature
>  Components: Streaming
>Reporter: Liang-Chi Hsieh
>
> Currently, we have DirectKafkaInputDStream, which directly pulls messages 
> from Kafka Brokers without any receivers, and KafkaInputDStream, which pulls 
> messages from a Kafka Broker using a receiver with zookeeper.
> As we observed, because DirectKafkaInputDStream retrieves messages from Kafka 
> after each batch finishes, it incurs extra latency compared with KafkaInputDStream, 
> which continues to pull messages during each batch window.
> So we propose adding a KafkaDirectInputDStream that directly pulls messages from 
> Kafka Brokers like DirectKafkaInputDStream, but uses receivers like 
> KafkaInputDStream and pulls messages during each batch window.






[jira] [Commented] (SPARK-12177) Update KafkaDStreams to new Kafka 0.9 Consumer API

2015-12-08 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12177?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15046975#comment-15046975
 ] 

Cody Koeninger commented on SPARK-12177:


I really think this needs to be handled as a separate subproject, or otherwise 
in a fully backwards compatible way.  The new consumer api will require 
upgrading kafka brokers, which is a big ask just in order for people to upgrade 
spark versions.

> Update KafkaDStreams to new Kafka 0.9 Consumer API
> --
>
> Key: SPARK-12177
> URL: https://issues.apache.org/jira/browse/SPARK-12177
> Project: Spark
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.6.0
>Reporter: Nikita Tarasenko
>  Labels: consumer, kafka
>
> Kafka 0.9 has already been released and it introduces a new consumer API that is not 
> compatible with the old one. So, I added the new consumer API in separate 
> classes in the package org.apache.spark.streaming.kafka.v09 with the changed API. I 
> didn't remove the old classes, for backward compatibility: users will not need 
> to change their old Spark applications when they upgrade to a new Spark version.
> Please review my changes.






[jira] [Reopened] (SPARK-12103) KafkaUtils createStream with multiple topics -- does not work as expected

2015-12-03 Thread Cody Koeninger (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-12103?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cody Koeninger reopened SPARK-12103:


PR for doc change:

https://github.com/apache/spark/pull/10132

> KafkaUtils createStream with multiple topics -- does not work as expected
> -
>
> Key: SPARK-12103
> URL: https://issues.apache.org/jira/browse/SPARK-12103
> Project: Spark
>  Issue Type: Improvement
>  Components: Documentation, Streaming
>Affects Versions: 1.4.1
>Reporter: Dan Dutrow
>Priority: Minor
> Fix For: 1.4.2
>
>
> (Note: yes, there is a Direct API that may be better, but it's not the 
> easiest thing to get started with. The Kafka Receiver API still needs to 
> work, especially for newcomers)
> When creating a receiver stream using KafkaUtils, there is a valid use case 
> where you would want to use one (or a few) Kafka Streaming Receiver to pool 
> resources. I have 10+ topics and don't want to dedicate 10 cores to 
> processing all of them. However, when reading the data produced by 
> KafkaUtils.createStream, the DStream[(String,String)] does not properly 
> insert the topic name into the tuple. The left key is always null, making it 
> impossible to know what topic that data came from other than stashing your 
> key into the value.  Is there a way around that problem?
>  CODE
> val topics = Map("topicA" -> 1, "topicB" -> 1, "topicC" -> 1, "topicD" -> 1, 
> "topicE" -> 1, "topicF" -> 1, ...)
> val streams: IndexedSeq[ReceiverInputDStream[(String, String)]] = (1 to 3).map( 
> i =>
>   KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
>     ssc, consumerProperties,
>     topics,
>     StorageLevel.MEMORY_ONLY_SER))
> val unioned: DStream[(String, String)] = ssc.union(streams)
> unioned.flatMap(x => {
>   val (key, value) = x
>   // key is always null!
>   // value has data from any one of my topics
>   key match ... {
>   ..
>   }
> }
>  END CODE






[jira] [Commented] (SPARK-12103) KafkaUtils createStream with multiple topics -- does not work as expected

2015-12-03 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12103?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15038084#comment-15038084
 ] 

Cody Koeninger commented on SPARK-12103:


On the off chance you're acting in good faith, actually did read the Kafka 
documentation and Spark API documentation, and were still somehow unclear that 
K meant the type of the message Key and V meant the type of the message 
Value... 

I'll submit a pr to add @tparam docs for every type in the KafkaUtils api

> KafkaUtils createStream with multiple topics -- does not work as expected
> -
>
> Key: SPARK-12103
> URL: https://issues.apache.org/jira/browse/SPARK-12103
> Project: Spark
>  Issue Type: Improvement
>  Components: Documentation, Streaming
>Affects Versions: 1.4.1
>Reporter: Dan Dutrow
>Priority: Minor
> Fix For: 1.4.2
>
>
> (Note: yes, there is a Direct API that may be better, but it's not the 
> easiest thing to get started with. The Kafka Receiver API still needs to 
> work, especially for newcomers)
> When creating a receiver stream using KafkaUtils, there is a valid use case 
> where you would want to use one (or a few) Kafka Streaming Receiver to pool 
> resources. I have 10+ topics and don't want to dedicate 10 cores to 
> processing all of them. However, when reading the data produced by 
> KafkaUtils.createStream, the DStream[(String,String)] does not properly 
> insert the topic name into the tuple. The left key is always null, making it 
> impossible to know what topic that data came from other than stashing your 
> key into the value.  Is there a way around that problem?
>  CODE
> val topics = Map("topicA" -> 1, "topicB" -> 1, "topicC" -> 1, "topicD" -> 1, 
> "topicE" -> 1, "topicF" -> 1, ...)
> val streams: IndexedSeq[ReceiverInputDStream[(String, String)]] = (1 to 3).map( 
> i =>
>   KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
>     ssc, consumerProperties,
>     topics,
>     StorageLevel.MEMORY_ONLY_SER))
> val unioned: DStream[(String, String)] = ssc.union(streams)
> unioned.flatMap(x => {
>   val (key, value) = x
>   // key is always null!
>   // value has data from any one of my topics
>   key match ... {
>   ..
>   }
> }
>  END CODE






[jira] [Commented] (SPARK-12103) KafkaUtils createStream with multiple topics -- does not work as expected

2015-12-03 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12103?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15037986#comment-15037986
 ] 

Cody Koeninger commented on SPARK-12103:


Knowing that kafka messages have a key and value isn't a "Kafka expert" thing. 
 Serious question - did you read http://kafka.apache.org/documentation.html  
before posting this?

The api documentation for createStream at 
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.kafka.KafkaUtils$

keyTypeClass Key type of DStream
valueTypeClass value type of Dstream
keyDecoderClass Type of kafka key decoder
valueDecoderClass Type of kafka value decoder

How is this not clear?

If you have a specific improvement to docs, send a PR

> KafkaUtils createStream with multiple topics -- does not work as expected
> -
>
> Key: SPARK-12103
> URL: https://issues.apache.org/jira/browse/SPARK-12103
> Project: Spark
>  Issue Type: Improvement
>  Components: Documentation, Streaming
>Affects Versions: 1.4.1
>Reporter: Dan Dutrow
>Priority: Minor
> Fix For: 1.4.2
>
>
> (Note: yes, there is a Direct API that may be better, but it's not the 
> easiest thing to get started with. The Kafka Receiver API still needs to 
> work, especially for newcomers)
> When creating a receiver stream using KafkaUtils, there is a valid use case 
> where you would want to use one (or a few) Kafka Streaming Receiver to pool 
> resources. I have 10+ topics and don't want to dedicate 10 cores to 
> processing all of them. However, when reading the data produced by 
> KafkaUtils.createStream, the DStream[(String,String)] does not properly 
> insert the topic name into the tuple. The left key is always null, making it 
> impossible to know what topic that data came from other than stashing your 
> key into the value.  Is there a way around that problem?
>  CODE
> val topics = Map("topicA" -> 1, "topicB" -> 1, "topicC" -> 1, "topicD" -> 1, 
> "topicE" -> 1, "topicF" -> 1, ...)
> val streams: IndexedSeq[ReceiverInputDStream[(String, String)]] = (1 to 3).map( 
> i =>
>   KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
>     ssc, consumerProperties,
>     topics,
>     StorageLevel.MEMORY_ONLY_SER))
> val unioned: DStream[(String, String)] = ssc.union(streams)
> unioned.flatMap(x => {
>   val (key, value) = x
>   // key is always null!
>   // value has data from any one of my topics
>   key match ... {
>   ..
>   }
> }
>  END CODE






[jira] [Commented] (SPARK-12103) KafkaUtils createStream with multiple topics -- does not work as expected

2015-12-03 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12103?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15037773#comment-15037773
 ] 

Cody Koeninger commented on SPARK-12103:


A cursory review of the Kafka project documentation should reveal that messages 
have a key (used for distribution among partitions) and a value. Why would one 
reasonably expect that Spark documentation referring to a Kafka message key was 
instead supposed to be the message topic?

If you really want the topic name in each item of the rdd, create your separate 
streams, map over them to add the topic name, then union them together into a 
single stream.
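
A hedged sketch of that workaround; names like ssc and consumerProperties are placeholders carried over from the description below, and the topic list is illustrative:

{code}
import kafka.serializer.StringDecoder
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils

val topicNames = Seq("topicA", "topicB", "topicC")   // placeholder topics
val perTopic: Seq[DStream[(String, String)]] = topicNames.map { topic =>
  KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
    ssc, consumerProperties, Map(topic -> 1), StorageLevel.MEMORY_ONLY_SER)
    .map { case (_, value) => (topic, value) }       // tag with the topic name instead of the (often null) key
}
val tagged: DStream[(String, String)] = ssc.union(perTopic)
{code}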

> KafkaUtils createStream with multiple topics -- does not work as expected
> -
>
> Key: SPARK-12103
> URL: https://issues.apache.org/jira/browse/SPARK-12103
> Project: Spark
>  Issue Type: Improvement
>  Components: Documentation, Streaming
>Affects Versions: 1.4.1
>Reporter: Dan Dutrow
>Priority: Minor
> Fix For: 1.4.2
>
>
> (Note: yes, there is a Direct API that may be better, but it's not the 
> easiest thing to get started with. The Kafka Receiver API still needs to 
> work, especially for newcomers)
> When creating a receiver stream using KafkaUtils, there is a valid use case 
> where you would want to use one (or a few) Kafka Streaming Receiver to pool 
> resources. I have 10+ topics and don't want to dedicate 10 cores to 
> processing all of them. However, when reading the data produced by 
> KafkaUtils.createStream, the DStream[(String,String)] does not properly 
> insert the topic name into the tuple. The left key is always null, making it 
> impossible to know what topic that data came from other than stashing your 
> key into the value.  Is there a way around that problem?
>  CODE
> val topics = Map("topicA" -> 1, "topicB" -> 1, "topicC" -> 1, "topicD" -> 1, 
> "topicE" -> 1, "topicF" -> 1, ...)
> val streams: IndexedSeq[ReceiverInputDStream[(String, String)]] = (1 to 3).map( 
> i =>
>   KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
>     ssc, consumerProperties,
>     topics,
>     StorageLevel.MEMORY_ONLY_SER))
> val unioned: DStream[(String, String)] = ssc.union(streams)
> unioned.flatMap(x => {
>   val (key, value) = x
>   // key is always null!
>   // value has data from any one of my topics
>   key match ... {
>   ..
>   }
> }
>  END CODE






[jira] [Commented] (SPARK-12002) offsetRanges attribute missing in Kafka RDD when resuming from checkpoint

2015-11-26 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12002?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15029058#comment-15029058
 ] 

Cody Koeninger commented on SPARK-12002:


Just to be clear, this is only an issue with python?

> offsetRanges attribute missing in Kafka RDD when resuming from checkpoint
> -
>
> Key: SPARK-12002
> URL: https://issues.apache.org/jira/browse/SPARK-12002
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark, Streaming
>Reporter: Amit Ramesh
>
> SPARK-8389 added offsetRanges to Kafka direct streams, and SPARK-10122 fixed 
> the issue of ending up with plain non-Kafka RDDs (losing offsetRanges) when 
> chaining transforms to Kafka RDDs. It appears that this issue remains for the case where a streaming 
> application using Kafka direct streams is initialized from the checkpoint 
> directory. The following is a representative example where everything works 
> as expected during the first run, but exceptions are thrown on a subsequent 
> run when the context is being initialized from the checkpoint directory.
> {code:title=test_checkpoint.py|language=python}
> from pyspark import SparkContext
> from pyspark.streaming import StreamingContext
> from pyspark.streaming.kafka import KafkaUtils
> 
> 
> def attach_kafka_metadata(kafka_rdd):
>     offset_ranges = kafka_rdd.offsetRanges()
>     return kafka_rdd
> 
> 
> def create_context():
>     sc = SparkContext(appName='kafka-test')
>     ssc = StreamingContext(sc, 10)
>     ssc.checkpoint(CHECKPOINT_URI)
> 
>     kafka_stream = KafkaUtils.createDirectStream(
>         ssc,
>         [TOPIC],
>         kafkaParams={
>             'metadata.broker.list': BROKERS,
>         },
>     )
>     kafka_stream.transform(attach_kafka_metadata).count().pprint()
> 
>     return ssc
> 
> 
> if __name__ == "__main__":
>     ssc = StreamingContext.getOrCreate(CHECKPOINT_URI, create_context)
>     ssc.start()

[jira] [Commented] (SPARK-11693) spark kafka direct streaming exception

2015-11-13 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-11693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15004072#comment-15004072
 ] 

Cody Koeninger commented on SPARK-11693:


You've under-provisioned Kafka storage and / or Spark compute capacity.
The result is that data is being deleted before it has been processed.
I personally think the proper response to a system being broken is for it to 
obviously break in a noticeable way, rather than silently giving the wrong 
result.
My recommended way to handle this would be to monitor your stream, and have a 
restart policy that's appropriate for your situation.

If you want to modify the area of the code you noted to silently catch the 
exception and start at the next available offset, you can do so pretty 
straightforwardly (streaming-kafka is an external module so you shouldn't have 
to re-deploy all of spark).  I don't think that's a modification that makes 
sense for the general use case however.
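
One hedged sketch of the "monitor your stream" approach, using the StreamingListener hook; the lag threshold and the action taken are placeholders for whatever restart policy fits your situation:

{code}
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

class BatchLagMonitor(maxDelayMs: Long) extends StreamingListener {
  override def onBatchCompleted(batch: StreamingListenerBatchCompleted): Unit = {
    val info = batch.batchInfo
    // totalDelay is the end-to-end delay for the batch, when available
    if (info.totalDelay.exists(_ > maxDelayMs)) {
      // Placeholder: alert, or trigger your own restart tooling
      println(s"Batch ${info.batchTime} lagging: total delay ${info.totalDelay} ms")
    }
  }
}

// ssc.addStreamingListener(new BatchLagMonitor(maxDelayMs = 60000L))
{code}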

> spark kafka direct streaming exception
> --
>
> Key: SPARK-11693
> URL: https://issues.apache.org/jira/browse/SPARK-11693
> Project: Spark
>  Issue Type: Question
>  Components: Streaming
>Affects Versions: 1.5.1
>Reporter: xiaoxiaoluo
>Priority: Minor
>
> We are using spark kafka direct streaming in our test environment. We have 
> limited the kafka partition size to avoid exhausting the disk space. So when 
> data is written to kafka faster than spark streaming 
> reads it, there will be some exceptions in spark streaming, and the 
> application will be shut down.
> {noformat}
> 15/11/11 10:17:35 ERROR Executor: Exception in task 0.3 in stage 1626659.0 
> (TID 1134180)
> kafka.common.OffsetOutOfRangeException
>   at sun.reflect.GeneratedConstructorAccessor32.newInstance(Unknown 
> Source)
>   at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>   at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
>   at java.lang.Class.newInstance(Class.java:442)
>   at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:86)
>   at 
> org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.handleFetchErr(KafkaRDD.scala:184)
>   at 
> org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:193)
>   at 
> org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:208)
>   at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>   at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>   at 
> org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:209)
>   at 
> org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:73)
>   at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>   at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>   at org.apache.spark.scheduler.Task.run(Task.scala:88)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> 15/11/11 10:17:42 ERROR CoarseGrainedExecutorBackend: Driver 10.1.92.44:49939 
> disassociated! Shutting down.
> {noformat}
> Could streaming get the current smallest offset from this partition and go 
> on processing streaming data?






[jira] [Commented] (SPARK-10320) Kafka Support new topic subscriptions without requiring restart of the streaming context

2015-11-13 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-10320?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15004052#comment-15004052
 ] 

Cody Koeninger commented on SPARK-10320:


This is for practical purposes blocked on SPARK-10963

> Kafka Support new topic subscriptions without requiring restart of the 
> streaming context
> 
>
> Key: SPARK-10320
> URL: https://issues.apache.org/jira/browse/SPARK-10320
> Project: Spark
>  Issue Type: New Feature
>  Components: Streaming
>Reporter: Sudarshan Kadambi
>
> Spark Streaming lacks the ability to subscribe to newer topics or unsubscribe 
> from current ones once the streaming context has been started. Restarting the 
> streaming context increases the latency of update handling.
> Consider a streaming application subscribed to n topics. Let's say 1 of the 
> topics is no longer needed in streaming analytics and hence should be 
> dropped. We could do this by stopping the streaming context, removing that 
> topic from the topic list and restarting the streaming context. Since with 
> some DStreams such as DirectKafkaStream, the per-partition offsets are 
> maintained by Spark, we should be able to resume uninterrupted (I think?) 
> from where we left off with a minor delay. However, in instances where 
> expensive state initialization (from an external datastore) may be needed for 
> datasets published to all topics, before streaming updates can be applied to 
> it, it is more convenient to only subscribe to or unsubscribe from the incremental 
> changes to the topic list. Without such a feature, updates go unprocessed for 
> longer than they need to be, thus affecting QoS.






[jira] [Commented] (SPARK-11698) Add option to ignore kafka messages that are out of limit rate

2015-11-13 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-11698?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15004050#comment-15004050
 ] 

Cody Koeninger commented on SPARK-11698:


That looks like a reasonable way to solve your particular use case.

I think my preference would be to make the strategy for generating the next 
batch of offsets user-configurable in a more general way, rather than adding 
knobs for each possible use case.

Have you seen the discussion in
https://issues.apache.org/jira/browse/SPARK-10320
and would it likely accommodate this modification?

Outside of the specifics of the direct stream, this is also related to having 
different strategies for dealing with backpressure, and I'm not sure what the 
long-term plan there is.

> Add option to ignore kafka messages that are out of limit rate
> --
>
> Key: SPARK-11698
> URL: https://issues.apache.org/jira/browse/SPARK-11698
> Project: Spark
>  Issue Type: Improvement
>  Components: Streaming
>Reporter: Liang-Chi Hsieh
>
> With spark.streaming.kafka.maxRatePerPartition, we can control the max rate 
> limit. However, we cannot ignore the messages that fall outside the limit; those 
> messages will be consumed in the next iteration. We have a use case where we need 
> to ignore those messages and process only the latest messages in the next iteration.
> In other words, we simply want to consume part of the messages in each iteration 
> and ignore the remaining messages that are not consumed.
> We add an option for this purpose.






[jira] [Commented] (SPARK-11211) Kafka - offsetOutOfRange forces to largest

2015-10-23 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-11211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14971834#comment-14971834
 ] 

Cody Koeninger commented on SPARK-11211:


In an attempt to replicate, I used the following:
- spark 1.5.1
- a kafka broker with pathologically small retention and frequent log check / 
deletion
log.retention.bytes=2048
log.segment.bytes=1024
log.retention.check.interval.ms=3
- the IdempotentExample job from 
https://github.com/koeninger/kafka-exactly-once, which has a batch size of 60 
seconds
- simple 1 msg / sec sent from the console producer, e.g.
for i in {1..1}; do echo "`date +%s`"; sleep 1; done |  
bin/kafka-console-producer.sh --broker-list 'localhost:9092' --topic test


This results in (at least) the first batch failing as I'd expect, since kafka 
messages are being deleted before they can be processed. I can see the failed 
batch in the master ui, and there actually is an exception visible on the 
driver (although it's a classNotFound exception caused by deserializing the 
OffsetOutOfRangeException; there's a separate ticket  SPARK-11195 related to 
that).  

This is what I see in the logs on the executor:

15/10/23 15:16:00 INFO VerifiableProperties: Property auto.offset.reset is 
overridden to smallest
15/10/23 15:16:00 INFO KafkaRDD: Computing topic test, partition 0 offsets 4727 
-> 4867
15/10/23 15:16:00 INFO KafkaRDD: Computing topic test, partition 1 offsets 2900 
-> 2973
15/10/23 15:16:00 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0) 
kafka.common.OffsetOutOfRangeException

// (repeats total of 4 times, then that job fails)

15/10/23 15:20:31 INFO KafkaRDD: Computing topic test, partition 0 offsets 4867 
-> 4926
15/10/23 15:20:31 INFO KafkaRDD: Beginning offset 2973 is the same as ending 
offset skipping test 1

You'll notice there isn't any gap in offsets between the batch that failed and 
the next batch.  It's not that the consumer is suddenly skipping to the largest 
value on the kafka topic, it's that the first batch failed.

This is a situation where the underlying assumption (ie that you have enough 
kafka retention) has gone wrong, so the best thing to do in my opinion is fail.

I'm not sure exactly what you'd expect to happen in this case.  Silently 
submitting another batch that duplicates some smaller portion of the first 
batch seems like a really bad idea. I'd like to know when I've lost data, and 
I'd like separate batches to contain strictly separate groups of messages.  

If you want to reduce the scope of the number of messages that can be lost in 
this kind of situation, use maxMessagesPerPartition and / or smaller batch 
sizes.  But in any case, you've lost data because Kafka deleted it.  Changing 
spark behavior won't get you your data back.
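
A hedged sketch of the rate-capping knob referred to above; the exposed configuration is spark.streaming.kafka.maxRatePerPartition, and the value here is an arbitrary placeholder:

{code}
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("capped-direct-stream")
  // Cap messages per second per Kafka partition so one batch can't span a huge offset range
  .set("spark.streaming.kafka.maxRatePerPartition", "1000")
{code}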

If I'm misunderstanding the nature of the problem, please report the version of 
spark you're using, an actual observation of behavior (e.g. log output), and 
the behavior you expect.  What you're reporting is more of a (probably 
inaccurate) theory as to the root cause, as opposed to observed behavior.

> Kafka - offsetOutOfRange forces to largest
> --
>
> Key: SPARK-11211
> URL: https://issues.apache.org/jira/browse/SPARK-11211
> Project: Spark
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.3.1, 1.5.1
>Reporter: Daniel Strassler
>
> This problem relates to how DStreams using the Direct Approach of connecting 
> to a Kafka topic behave when they request an offset that does not exist on 
> the topic.  Currently it appears the "auto.offset.reset" configuration value 
> is being ignored and the default value of “largest” is always being used.
>  
> When using the Direct Approach of connecting to a Kafka topic using a 
> DStream, even if you have the Kafka configuration "auto.offset.reset" set to 
> smallest, the behavior in the event of a 
> kafka.common.OffsetOutOfRangeException is to move the next offset to be 
> consumed to the largest value on the Kafka topic.  It appears 
> that the exception is being eaten and not propagated up to the driver as 
> well, so a work around triggered by the propagation of the error can not be 
> implemented either.
>  
> The current behavior of setting to largest means that any data on the Kafka 
> topic at the time of the exception being thrown is skipped(lost) to 
> consumption and only data produced to the topic after the exception will be 
> consumed.  Two possible fixes are listed below.
>  
> Fix 1:  When “auto.offset.reset" is set to “smallest”, the DStream should set 
> the next consumed offset to be the smallest offset value on the Kafka topic.
>  
> Fix 2:  Propagate the error to the Driver to allow it to react as it deems 
> appropriate.




[jira] [Commented] (SPARK-11195) Exception thrown on executor throws ClassNotFound on driver

2015-10-23 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-11195?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14971783#comment-14971783
 ] 

Cody Koeninger commented on SPARK-11195:


I'm seeing this on 1.5.1 as well

> Exception thrown on executor throws ClassNotFound on driver
> ---
>
> Key: SPARK-11195
> URL: https://issues.apache.org/jira/browse/SPARK-11195
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.5.1
>Reporter: Hurshal Patel
>
> I have a minimal repro job
> {code:title=Repro.scala}
> package repro
> import org.apache.spark.SparkContext
> import org.apache.spark.SparkConf
> import org.apache.spark.SparkException
> class MyException(message: String) extends Exception(message: String)
> object Repro {
>   def main(args: Array[String]) {
> val conf = new SparkConf().setAppName("MyException ClassNotFound Repro")
> val sc = new SparkContext(conf)
> sc.parallelize(List(1)).map { x =>
>   throw new repro.MyException("this is a failure")
>   true
> }.collect()
>   }
> }
> {code}
> On Spark 1.4.1, I get a task failure with the reason correctly set to 
> MyException.
> On Spark 1.5.1, I _expect_ the same behavior, but instead I get a task 
> failure with an UnknownReason caused by ClassNotFoundException.
>  
> here is the job on vanilla Spark 1.4.1:
> {code:title=spark_1.4.1_log}
> $ ./bin/spark-submit --master local --deploy-mode client --class repro.Repro 
> /home/nix/repro/target/scala-2.10/repro-assembly-0.0.1.jar
> Using Spark's default log4j profile: 
> org/apache/spark/log4j-defaults.properties
> 15/10/19 11:55:20 INFO SparkContext: Running Spark version 1.4.1
> 15/10/19 11:55:21 WARN NativeCodeLoader: Unable to load native-hadoop library 
> for your platform... using builtin-java classes where applicable
> 15/10/19 11:55:22 WARN Utils: Your hostname, choochootrain resolves to a 
> loopback address: 127.0.1.1; using 10.0.1.97 instead (on interface wlan0)
> 15/10/19 11:55:22 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to 
> another address
> 15/10/19 11:55:22 INFO SecurityManager: Changing view acls to: root
> 15/10/19 11:55:22 INFO SecurityManager: Changing modify acls to: root
> 15/10/19 11:55:22 INFO SecurityManager: SecurityManager: authentication 
> disabled; ui acls disabled; users with view permissions: Set(root); users 
> with modify permissions: Set(root)
> 15/10/19 11:55:24 INFO Slf4jLogger: Slf4jLogger started
> 15/10/19 11:55:24 INFO Remoting: Starting remoting
> 15/10/19 11:55:24 INFO Remoting: Remoting started; listening on addresses 
> :[akka.tcp://sparkDriver@10.0.1.97:46683]
> 15/10/19 11:55:24 INFO Utils: Successfully started service 'sparkDriver' on 
> port 46683.
> 15/10/19 11:55:24 INFO SparkEnv: Registering MapOutputTracker
> 15/10/19 11:55:24 INFO SparkEnv: Registering BlockManagerMaster
> 15/10/19 11:55:24 INFO DiskBlockManager: Created local directory at 
> /tmp/spark-0348a320-0ca3-4528-9ab5-9ba37d3c2e07/blockmgr-08496143-1d9d-41c8-a581-b6220edf00d5
> 15/10/19 11:55:24 INFO MemoryStore: MemoryStore started with capacity 265.4 MB
> 15/10/19 11:55:25 INFO HttpFileServer: HTTP File server directory is 
> /tmp/spark-0348a320-0ca3-4528-9ab5-9ba37d3c2e07/httpd-52c396d2-b47f-45a5-bb76-d10aa864e6d5
> 15/10/19 11:55:25 INFO HttpServer: Starting HTTP Server
> 15/10/19 11:55:25 INFO Utils: Successfully started service 'HTTP file server' 
> on port 47915.
> 15/10/19 11:55:25 INFO SparkEnv: Registering OutputCommitCoordinator
> 15/10/19 11:55:25 INFO Utils: Successfully started service 'SparkUI' on port 
> 4040.
> 15/10/19 11:55:25 INFO SparkUI: Started SparkUI at http://10.0.1.97:4040
> 15/10/19 11:55:25 INFO SparkContext: Added JAR 
> file:/home/nix/repro/target/scala-2.10/repro-assembly-0.0.1.jar at 
> http://10.0.1.97:47915/jars/repro-assembly-0.0.1.jar with timestamp 
> 1445280925969
> 15/10/19 11:55:26 INFO Executor: Starting executor ID driver on host localhost
> 15/10/19 11:55:26 INFO Utils: Successfully started service 
> 'org.apache.spark.network.netty.NettyBlockTransferService' on port 46569.
> 15/10/19 11:55:26 INFO NettyBlockTransferService: Server created on 46569
> 15/10/19 11:55:26 INFO BlockManagerMaster: Trying to register BlockManager
> 15/10/19 11:55:26 INFO BlockManagerMasterEndpoint: Registering block manager 
> localhost:46569 with 265.4 MB RAM, BlockManagerId(driver, localhost, 46569)
> 15/10/19 11:55:26 INFO BlockManagerMaster: Registered BlockManager
> 15/10/19 11:55:27 INFO SparkContext: Starting job: collect at repro.scala:18
> 15/10/19 11:55:27 INFO DAGScheduler: Got job 0 (collect at repro.scala:18) 
> with 1 output partitions (allowLocal=false)
> 15/10/19 11:55:27 INFO DAGScheduler: Final stage: ResultStage 0(collect at 
> repro.scala:18)
> 15/10/19 11:55:27 INFO DAGScheduler: Parents of f

[jira] [Commented] (SPARK-11045) Contributing Receiver based Low Level Kafka Consumer from Spark-Packages to Apache Spark Project

2015-10-11 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-11045?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14952444#comment-14952444
 ] 

Cody Koeninger commented on SPARK-11045:


Direct stream partitions are 1:1 with kafka topicpartitions.  Thus the ordering 
semantics are pretty much what you'd expect from Kafka.  You have to 
proactively do something to change that, like a shuffle, or concurrent jobs, or 
speculative execution.  

As far as failure semantics go, clearly if you don't have exactly once you're 
going to either lose or repeat messages.
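
As a concrete illustration of the 1:1 mapping (a minimal sketch, assuming a stream 
built with KafkaUtils.createDirectStream against an existing StreamingContext ssc; 
the broker address and topic name are placeholders):

{code}
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, Map("metadata.broker.list" -> "broker:9092"), Set("mytopic"))

stream.foreachRDD { rdd =>
  // each rdd partition i is backed by exactly one kafka topicpartition
  rdd.asInstanceOf[HasOffsetRanges].offsetRanges.zipWithIndex.foreach { case (o, i) =>
    println(s"rdd partition $i <- ${o.topic}-${o.partition} offsets [${o.fromOffset}, ${o.untilOffset})")
  }
}
{code}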

> Contributing Receiver based Low Level Kafka Consumer from Spark-Packages to 
> Apache Spark Project
> 
>
> Key: SPARK-11045
> URL: https://issues.apache.org/jira/browse/SPARK-11045
> Project: Spark
>  Issue Type: New Feature
>  Components: Streaming
>Reporter: Dibyendu Bhattacharya
>
> This JIRA is to track the progress of contributing the Receiver based Low 
> Level Kafka Consumer from spark-packages 
> (http://spark-packages.org/package/dibbhatt/kafka-spark-consumer) back to the 
> Apache Spark project.
> This Kafka consumer has been around for more than a year and has matured over 
> time. I see many adoptions of this package, and I receive positive feedback 
> that this consumer gives better performance and fault tolerance. 
> The primary intent of this JIRA is to give the community a better alternative 
> if they want to use the Receiver based model. 
> If this consumer makes it into Spark Core, it will definitely see more 
> adoption and support from the community, and help the many who still prefer 
> the Receiver based model of Kafka consumer. 
> I understand that the Direct Stream is the consumer which can give Exactly 
> Once semantics and uses the Kafka low level API, which is good. But the 
> Direct Stream has concerns around recovering a checkpoint after a driver code 
> change. Application developers need to manage their own offsets, which is 
> complex. Even if someone does manage their own offsets, it limits the 
> parallelism Spark Streaming can achieve: if someone wants more parallelism 
> and sets spark.streaming.concurrentJobs higher than 1, they can no longer 
> rely on storing offsets externally, as they have no control over which batch 
> will run in which sequence. 
> Furthermore, the Direct Stream has higher latency, as it fetches messages 
> from Kafka during the RDD action. Also, the number of RDD partitions is 
> limited to the number of topic partitions, so unless your Kafka topic has 
> enough partitions, you have limited parallelism while processing the RDD. 
> Due to the above mentioned concerns, many people who do not need Exactly Once 
> semantics still prefer the Receiver based model. Unfortunately, when users 
> fall back to the KafkaUtils.createStream approach, which uses the Kafka High 
> Level Consumer, there are other issues around the reliability of the Kafka 
> High Level API: it is buggy and has a serious issue around consumer 
> re-balance. Hence I do not think it is correct to advise people to use 
> KafkaUtils.createStream in production. 
> The better option presently is to use the consumer from spark-packages. It 
> uses the Kafka Low Level Consumer API, stores offsets in Zookeeper, and can 
> recover from any failure. Below are a few highlights of this consumer:
> 1. It has an inbuilt PID Controller for dynamic rate limiting.
> 2. In this consumer, rate limiting is done by controlling the size of the 
> blocks, i.e. by controlling the size of the messages pulled from Kafka, 
> whereas in Spark rate limiting is done by controlling the number of messages. 
> The issue with throttling by number of messages is that if the message size 
> varies, the block size will also vary. Say your Kafka has messages of 
> different sizes, from 10KB to 500KB: throttling by number of messages can 
> never give a deterministic block size, so there is no guarantee that memory 
> back-pressure can really take effect. 
> 3. This consumer uses the Kafka low level API, which gives better performance 
> than the KafkaUtils.createStream based High Level API.
> 4. This consumer can give an end to end no data loss channel if enabled with 
> WAL.
> By accepting this low level Kafka consumer from spark-packages into the 
> Apache Spark project, we will give the community better options for Kafka 
> connectivity, both for the receiver-less and Receiver based models. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-11045) Contributing Receiver based Low Level Kafka Consumer from Spark-Packages to Apache Spark Project

2015-10-11 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-11045?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14952373#comment-14952373
 ] 

Cody Koeninger commented on SPARK-11045:


What you're saying doesn't address my point regarding parallelism.  It's 
certainly possible to take advantage of Kafka ordering guarantees without 
sticking everything in one partition.  E.g. I want events for a given customer 
to have a defined ordering, so I hash partition my topic by customer id.  As a 
kafka user, I'd expect those messages to be consumed in the correct order, not 
some unpredictable order, unless I specifically ask for it by shuffling.

Even in the case of 1 topicpartition, your suggestions regarding parallelism 
would result in broken semantics.  As soon as you split a given kafka 
topicpartition into multiple spark partitions for a given batch, you allow for 
messages from a single topicpartition to be processed simultaneously, and thus 
out of order.

Saying your approach gives "much better parallelism" because it's semantically 
broken by default isn't much of a feature claim.
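
To make the hash-partitioning example above concrete, a hedged sketch of the 
producer side (new producer API; the broker address, topic, and sample record are 
placeholders). Keying by customer id means all events for that customer land in 
one topicpartition, in order:

{code}
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

val props = new Properties()
props.put("bootstrap.servers", "broker:9092")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

val producer = new KafkaProducer[String, String](props)
// hash partitioning on the key: every event for this customer goes to the same
// topicpartition, so per-customer order is preserved unless the consumer shuffles
val customerId = "customer-42"          // placeholder
val eventJson = """{"type":"click"}"""  // placeholder
producer.send(new ProducerRecord[String, String]("events", customerId, eventJson))
producer.close()
{code}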

> Contributing Receiver based Low Level Kafka Consumer from Spark-Packages to 
> Apache Spark Project
> 
>
> Key: SPARK-11045
> URL: https://issues.apache.org/jira/browse/SPARK-11045
> Project: Spark
>  Issue Type: New Feature
>  Components: Streaming
>Reporter: Dibyendu Bhattacharya
>
> This JIRA is to track the progress of contributing the Receiver based Low 
> Level Kafka Consumer from spark-packages 
> (http://spark-packages.org/package/dibbhatt/kafka-spark-consumer) back to the 
> Apache Spark project.
> This Kafka consumer has been around for more than a year and has matured over 
> time. I see many adoptions of this package, and I receive positive feedback 
> that this consumer gives better performance and fault tolerance. 
> The primary intent of this JIRA is to give the community a better alternative 
> if they want to use the Receiver based model. 
> If this consumer makes it into Spark Core, it will definitely see more 
> adoption and support from the community, and help the many who still prefer 
> the Receiver based model of Kafka consumer. 
> I understand that the Direct Stream is the consumer which can give Exactly 
> Once semantics and uses the Kafka low level API, which is good. But the 
> Direct Stream has concerns around recovering a checkpoint after a driver code 
> change. Application developers need to manage their own offsets, which is 
> complex. Even if someone does manage their own offsets, it limits the 
> parallelism Spark Streaming can achieve: if someone wants more parallelism 
> and sets spark.streaming.concurrentJobs higher than 1, they can no longer 
> rely on storing offsets externally, as they have no control over which batch 
> will run in which sequence. 
> Furthermore, the Direct Stream has higher latency, as it fetches messages 
> from Kafka during the RDD action. Also, the number of RDD partitions is 
> limited to the number of topic partitions, so unless your Kafka topic has 
> enough partitions, you have limited parallelism while processing the RDD. 
> Due to the above mentioned concerns, many people who do not need Exactly Once 
> semantics still prefer the Receiver based model. Unfortunately, when users 
> fall back to the KafkaUtils.createStream approach, which uses the Kafka High 
> Level Consumer, there are other issues around the reliability of the Kafka 
> High Level API: it is buggy and has a serious issue around consumer 
> re-balance. Hence I do not think it is correct to advise people to use 
> KafkaUtils.createStream in production. 
> The better option presently is to use the consumer from spark-packages. It 
> uses the Kafka Low Level Consumer API, stores offsets in Zookeeper, and can 
> recover from any failure. Below are a few highlights of this consumer:
> 1. It has an inbuilt PID Controller for dynamic rate limiting.
> 2. In this consumer, rate limiting is done by controlling the size of the 
> blocks, i.e. by controlling the size of the messages pulled from Kafka, 
> whereas in Spark rate limiting is done by controlling the number of messages. 
> The issue with throttling by number of messages is that if the message size 
> varies, the block size will also vary. Say your Kafka has messages of 
> different sizes, from 10KB to 500KB: throttling by number of messages can 
> never give a deterministic block size, so there is no guarantee that memory 
> back-pressure can really take effect. 
> 3. This consumer uses the Kafka low level API, which gives better performance 
> than the KafkaUtils.createStream based High Level API.
> 4. This consumer can give an end to end no data loss channel if enabled with 
> WAL.
> By accepting this low level kafka cons

[jira] [Commented] (SPARK-11045) Contributing Receiver based Low Level Kafka Consumer from Spark-Packages to Apache Spark Project

2015-10-11 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-11045?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14952299#comment-14952299
 ] 

Cody Koeninger commented on SPARK-11045:


"in Receiver based model, the number of partitions is dictated by Block 
Intervals and Batch Interval. If your block interval is 200 Ms, and Batch 
interval is 10 seconds , your RDD will have 50 partitions !"

Note that if your receiver actually works this way, then you've broken Kafka 
per-partition ordering guarantees, which is kind of a big problem.

"Regarding the state of spark-packages code, your comment is not at good taste"

Don't misunderstand me, I am not attacking you personally.  A +1 on merging 
code into spark is basically saying "I'd be ok with maintaining this if it was 
added to spark".  I would not be comfortable doing that (to the extent my 
opinion counts).  That has nothing to do with which companies are using the 
code in production.

> Contributing Receiver based Low Level Kafka Consumer from Spark-Packages to 
> Apache Spark Project
> 
>
> Key: SPARK-11045
> URL: https://issues.apache.org/jira/browse/SPARK-11045
> Project: Spark
>  Issue Type: New Feature
>  Components: Streaming
>Reporter: Dibyendu Bhattacharya
>
> This JIRA is to track the progress of contributing the Receiver based Low 
> Level Kafka Consumer from spark-packages 
> (http://spark-packages.org/package/dibbhatt/kafka-spark-consumer) back to the 
> Apache Spark project.
> This Kafka consumer has been around for more than a year and has matured over 
> time. I see many adoptions of this package, and I receive positive feedback 
> that this consumer gives better performance and fault tolerance. 
> The primary intent of this JIRA is to give the community a better alternative 
> if they want to use the Receiver based model. 
> If this consumer makes it into Spark Core, it will definitely see more 
> adoption and support from the community, and help the many who still prefer 
> the Receiver based model of Kafka consumer. 
> I understand that the Direct Stream is the consumer which can give Exactly 
> Once semantics and uses the Kafka low level API, which is good. But the 
> Direct Stream has concerns around recovering a checkpoint after a driver code 
> change. Application developers need to manage their own offsets, which is 
> complex. Even if someone does manage their own offsets, it limits the 
> parallelism Spark Streaming can achieve: if someone wants more parallelism 
> and sets spark.streaming.concurrentJobs higher than 1, they can no longer 
> rely on storing offsets externally, as they have no control over which batch 
> will run in which sequence. 
> Furthermore, the Direct Stream has higher latency, as it fetches messages 
> from Kafka during the RDD action. Also, the number of RDD partitions is 
> limited to the number of topic partitions, so unless your Kafka topic has 
> enough partitions, you have limited parallelism while processing the RDD. 
> Due to the above mentioned concerns, many people who do not need Exactly Once 
> semantics still prefer the Receiver based model. Unfortunately, when users 
> fall back to the KafkaUtils.createStream approach, which uses the Kafka High 
> Level Consumer, there are other issues around the reliability of the Kafka 
> High Level API: it is buggy and has a serious issue around consumer 
> re-balance. Hence I do not think it is correct to advise people to use 
> KafkaUtils.createStream in production. 
> The better option presently is to use the consumer from spark-packages. It 
> uses the Kafka Low Level Consumer API, stores offsets in Zookeeper, and can 
> recover from any failure. Below are a few highlights of this consumer:
> 1. It has an inbuilt PID Controller for dynamic rate limiting.
> 2. In this consumer, rate limiting is done by controlling the size of the 
> blocks, i.e. by controlling the size of the messages pulled from Kafka, 
> whereas in Spark rate limiting is done by controlling the number of messages. 
> The issue with throttling by number of messages is that if the message size 
> varies, the block size will also vary. Say your Kafka has messages of 
> different sizes, from 10KB to 500KB: throttling by number of messages can 
> never give a deterministic block size, so there is no guarantee that memory 
> back-pressure can really take effect. 
> 3. This consumer uses the Kafka low level API, which gives better performance 
> than the KafkaUtils.createStream based High Level API.
> 4. This consumer can give an end to end no data loss channel if enabled with 
> WAL.
> By accepting this low level Kafka consumer from spark-packages into the 
> Apache Spark project, we will give the community better options for Kafka 
> connectivity both for Receiver less and Rec

[jira] [Commented] (SPARK-11045) Contributing Receiver based Low Level Kafka Consumer from Spark-Packages to Apache Spark Project

2015-10-10 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-11045?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14951877#comment-14951877
 ] 

Cody Koeninger commented on SPARK-11045:


The comments regarding parallelism are not accurate.  Your read parallelism 
from Kafka is ultimately limited by number of Kafka partitions regardless of 
which consumer you use.

Spark checkpoint recovery is a problem, again regardless of what consumer you 
use.  Zookeeper as an offset store also has its own problems. At least the 
direct stream allows you to choose what works for you.

It seems like the main substantive complaint is rebalance behavior of the high 
level consumer. Frankly, given the state of the spark packages code the last 
time I looked at it, I'd rather effort be spent on addressing problems with the 
existing receiver rather than incorporating the spark packages code as is into 
spark.

> Contributing Receiver based Low Level Kafka Consumer from Spark-Packages to 
> Apache Spark Project
> 
>
> Key: SPARK-11045
> URL: https://issues.apache.org/jira/browse/SPARK-11045
> Project: Spark
>  Issue Type: New Feature
>  Components: Streaming
>Reporter: Dibyendu Bhattacharya
>
> This JIRA is to track the progress of contributing the Receiver based Low 
> Level Kafka Consumer from spark-packages 
> (http://spark-packages.org/package/dibbhatt/kafka-spark-consumer) back to the 
> Apache Spark project.
> This Kafka consumer has been around for more than a year and has matured over 
> time. I see many adoptions of this package, and I receive positive feedback 
> that this consumer gives better performance and fault tolerance. 
> The primary intent of this JIRA is to give the community a better alternative 
> if they want to use the Receiver based model. 
> If this consumer makes it into Spark Core, it will definitely see more 
> adoption and support from the community, and help the many who still prefer 
> the Receiver based model of Kafka consumer. 
> I understand that the Direct Stream is the consumer which can give Exactly 
> Once semantics and uses the Kafka low level API, which is good. But the 
> Direct Stream has concerns around recovering a checkpoint after a driver code 
> change. Application developers need to manage their own offsets, which is 
> complex. Even if someone does manage their own offsets, it limits the 
> parallelism Spark Streaming can achieve: if someone wants more parallelism 
> and sets spark.streaming.concurrentJobs higher than 1, they can no longer 
> rely on storing offsets externally, as they have no control over which batch 
> will run in which sequence. 
> Furthermore, the Direct Stream has higher latency, as it fetches messages 
> from Kafka during the RDD action. Also, the number of RDD partitions is 
> limited to the number of topic partitions, so unless your Kafka topic has 
> enough partitions, you have limited parallelism while processing the RDD. 
> Due to the above mentioned concerns, many people who do not need Exactly Once 
> semantics still prefer the Receiver based model. Unfortunately, when users 
> fall back to the KafkaUtils.createStream approach, which uses the Kafka High 
> Level Consumer, there are other issues around the reliability of the Kafka 
> High Level API: it is buggy and has a serious issue around consumer 
> re-balance. Hence I do not think it is correct to advise people to use 
> KafkaUtils.createStream in production. 
> The better option presently is to use the consumer from spark-packages. It 
> uses the Kafka Low Level Consumer API, stores offsets in Zookeeper, and can 
> recover from any failure. Below are a few highlights of this consumer:
> 1. It has an inbuilt PID Controller for dynamic rate limiting.
> 2. In this consumer, rate limiting is done by controlling the size of the 
> blocks, i.e. by controlling the size of the messages pulled from Kafka, 
> whereas in Spark rate limiting is done by controlling the number of messages. 
> The issue with throttling by number of messages is that if the message size 
> varies, the block size will also vary. Say your Kafka has messages of 
> different sizes, from 10KB to 500KB: throttling by number of messages can 
> never give a deterministic block size, so there is no guarantee that memory 
> back-pressure can really take effect. 
> 3. This consumer uses the Kafka low level API, which gives better performance 
> than the KafkaUtils.createStream based High Level API.
> 4. This consumer can give an end to end no data loss channel if enabled with 
> WAL.
> By accepting this low level Kafka consumer from spark-packages into the 
> Apache Spark project, we will give the community better options for Kafka 
> connectivity, both for the receiver-less and Receiver based models. 



--
This message was sent b

[jira] [Created] (SPARK-10963) Make KafkaCluster api public

2015-10-06 Thread Cody Koeninger (JIRA)
Cody Koeninger created SPARK-10963:
--

 Summary: Make KafkaCluster api public
 Key: SPARK-10963
 URL: https://issues.apache.org/jira/browse/SPARK-10963
 Project: Spark
  Issue Type: Improvement
  Components: Streaming
Reporter: Cody Koeninger
Priority: Minor


per mailing list discussion, there's enough interest from people using 
KafkaCluster (e.g. to access latest offsets) to justify making it public
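
Roughly the kind of usage being asked for, if the api were public as-is (a 
sketch; the broker address and topic are placeholders, and error handling on 
the Either results is elided):

{code}
import kafka.common.TopicAndPartition
import org.apache.spark.streaming.kafka.KafkaCluster

val kc = new KafkaCluster(Map("metadata.broker.list" -> "broker:9092"))
val partitions: Set[TopicAndPartition] = kc.getPartitions(Set("mytopic")).right.get
// LeaderOffset carries the leader host and port along with the offset itself
val latest = kc.getLatestLeaderOffsets(partitions).right.get
latest.foreach { case (tp, lo) =>
  println(s"${tp.topic}-${tp.partition} latest offset ${lo.offset}")
}
{code}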



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-5569) Checkpoints cannot reference classes defined outside of Spark's assembly

2015-10-06 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-5569?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14945091#comment-14945091
 ] 

Cody Koeninger commented on SPARK-5569:
---

The gist I originally posted, linked at the top of this ticket, has a 3 line 
change that should reproduce the issue.

> Checkpoints cannot reference classes defined outside of Spark's assembly
> 
>
> Key: SPARK-5569
> URL: https://issues.apache.org/jira/browse/SPARK-5569
> Project: Spark
>  Issue Type: Bug
>  Components: Streaming
>Reporter: Patrick Wendell
>
> Not sure if this is a bug or a feature, but it's not obvious, so wanted to 
> create a JIRA to make sure we document this behavior.
> First documented by Cody Koeninger:
> https://gist.github.com/koeninger/561a61482cd1b5b3600c
> {code}
> 15/01/12 16:07:07 INFO CheckpointReader: Attempting to load checkpoint from 
> file file:/var/tmp/cp/checkpoint-142110041.bk
> 15/01/12 16:07:07 WARN CheckpointReader: Error reading checkpoint from file 
> file:/var/tmp/cp/checkpoint-142110041.bk
> java.io.IOException: java.lang.ClassNotFoundException: 
> org.apache.spark.rdd.kafka.KafkaRDDPartition
> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1043)
> at 
> org.apache.spark.streaming.dstream.DStreamCheckpointData.readObject(DStreamCheckpointData.scala:146)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at 
> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
> at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
> at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1706)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1344)
> at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> at 
> java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:500)
> at 
> org.apache.spark.streaming.DStreamGraph$$anonfun$readObject$1.apply$mcV$sp(DStreamGraph.scala:180)
> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1040)
> at 
> org.apache.spark.streaming.DStreamGraph.readObject(DStreamGraph.scala:176)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at 
> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
> at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
> at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
> at 
> org.apache.spark.streaming.CheckpointReader$$anonfun$read$2.apply(Checkpoint.scala:251)
> at 
> org.apache.spark.streaming.CheckpointReader$$anonfun$read$2.apply(Checkpoint.scala:239)
> at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> at 
> scala.collection.mutable.WrappedArray.fo

[jira] [Commented] (SPARK-9472) Consistent hadoop config for streaming

2015-09-30 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-9472?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14937483#comment-14937483
 ] 

Cody Koeninger commented on SPARK-9472:
---

I'd suggest patching your distribution then; I doubt there's going to be
much interest in officially backporting it.

On Wed, Sep 30, 2015 at 11:27 AM, Russell Alexander Spitzer (JIRA) <



> Consistent hadoop config for streaming
> --
>
> Key: SPARK-9472
> URL: https://issues.apache.org/jira/browse/SPARK-9472
> Project: Spark
>  Issue Type: Sub-task
>  Components: Streaming
>Reporter: Cody Koeninger
>Assignee: Cody Koeninger
>Priority: Minor
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-9472) Consistent hadoop config for streaming

2015-09-30 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-9472?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14936866#comment-14936866
 ] 

Cody Koeninger commented on SPARK-9472:
---

You can get around this by passing in the hadoop configuration that you want as
an argument to getOrCreate:

StreamingContext.getOrCreate(somePath, () => someFunction, SparkHadoopUtil
  .get.newConfiguration(someSparkConf))
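
Spelled out a bit more (a sketch; the app name, checkpoint directory and batch 
interval are placeholders, and the dstream setup is elided):

{code}
import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.streaming.{Seconds, StreamingContext}

val sparkConf = new SparkConf().setAppName("my-stream")
val checkpointDir = "hdfs:///tmp/my-stream-checkpoint"

def createContext(): StreamingContext = {
  val newSsc = new StreamingContext(sparkConf, Seconds(10))
  newSsc.checkpoint(checkpointDir)
  // ... set up dstreams here ...
  newSsc
}

// pass the hadoop configuration you actually want instead of the default one
val ssc = StreamingContext.getOrCreate(
  checkpointDir,
  () => createContext(),
  SparkHadoopUtil.get.newConfiguration(sparkConf))
ssc.start()
ssc.awaitTermination()
{code}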

On Tue, Sep 29, 2015 at 10:00 PM, Russell Alexander Spitzer (JIRA) <



> Consistent hadoop config for streaming
> --
>
> Key: SPARK-9472
> URL: https://issues.apache.org/jira/browse/SPARK-9472
> Project: Spark
>  Issue Type: Sub-task
>  Components: Streaming
>Reporter: Cody Koeninger
>Assignee: Cody Koeninger
>Priority: Minor
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-10732) Starting spark streaming from a specific point in time.

2015-09-22 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-10732?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14903231#comment-14903231
 ] 

Cody Koeninger edited comment on SPARK-10732 at 9/22/15 7:02 PM:
-

Yeah, even if that gets implemented it will likely be at 1 minute granularity, 
which might be ok for some failure recovery situations but is unlikely to work 
for your SPARK-10734 ticket


was (Author: c...@koeninger.org):
Yeah, even if that gets implemented it will likely be at 1 minute granularity, 
which might be ok for some failure recovery situations but is unlikely to work 
for your SPARK-1074 ticket

> Starting spark streaming from a specific point in time.
> ---
>
> Key: SPARK-10732
> URL: https://issues.apache.org/jira/browse/SPARK-10732
> Project: Spark
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.5.0
>Reporter: Bijay Singh Bisht
>
> Currently, spark streaming either starts from current time or from the latest 
> checkpoint. It would be extremely useful to start from any arbitrary point. 
> This would be useful in replay scenarios or in running regression tests.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-10732) Starting spark streaming from a specific point in time.

2015-09-22 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-10732?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14903231#comment-14903231
 ] 

Cody Koeninger commented on SPARK-10732:


Yeah, even if that gets implemented it will likely be at 1 minute granularity, 
which might be ok for some failure recovery situations but is unlikely to work 
for your SPARK-1074 ticket

> Starting spark streaming from a specific point in time.
> ---
>
> Key: SPARK-10732
> URL: https://issues.apache.org/jira/browse/SPARK-10732
> Project: Spark
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.5.0
>Reporter: Bijay Singh Bisht
>
> Currently, spark streaming either starts from current time or from the latest 
> checkpoint. It would be extremely useful to start from any arbitrary point. 
> This would be useful in replay scenarios or in running regression tests.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-10732) Starting spark streaming from a specific point in time.

2015-09-22 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-10732?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14903110#comment-14903110
 ] 

Cody Koeninger commented on SPARK-10732:


As I already said, kafka's implementation of getOffsetsBefore is based on log 
file timestamps, not a granular index, so it really isn't accurate enough for 
this.

> Starting spark streaming from a specific point in time.
> ---
>
> Key: SPARK-10732
> URL: https://issues.apache.org/jira/browse/SPARK-10732
> Project: Spark
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.5.0
>Reporter: Bijay Singh Bisht
>
> Currently, spark streaming either starts from current time or from the latest 
> checkpoint. It would be extremely useful to start from any arbitrary point. 
> This would be useful in replay scenarios or in running regression tests.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-10734) DirectKafkaInputDStream uses the OffsetRequest.LatestTime to find the latest offset, however using the batch time would be more desirable.

2015-09-22 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-10734?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14903106#comment-14903106
 ] 

Cody Koeninger commented on SPARK-10734:


As I explained in SPARK-10732, kafka's getOffsetsBefore api is limited to the 
timestamps on log file segments, so its granularity is quite poor and doesn't 
really behave as one might expect.


> DirectKafkaInputDStream uses the OffsetRequest.LatestTime to find the latest 
> offset, however using the batch time would be more desirable.
> ---
>
> Key: SPARK-10734
> URL: https://issues.apache.org/jira/browse/SPARK-10734
> Project: Spark
>  Issue Type: Improvement
>  Components: Input/Output
>Reporter: Bijay Singh Bisht
>
> DirectKafkaInputDStream uses OffsetRequest.LatestTime to find the latest 
> offset; however, since OffsetRequest.LatestTime is a relative thing, it 
> depends on when the batch is scheduled. One would imagine that, given an input 
> data set, the data in the batches should be predictable irrespective of the 
> system conditions. Using the batch time implies that the stream processing 
> will have the same batches irrespective of when the processing was 
> started and of the load conditions on the system.
> This, along with [SPARK-10732], provides for nice regression scenarios.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-5569) Checkpoints cannot reference classes defined outside of Spark's assembly

2015-09-22 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-5569?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14902692#comment-14902692
 ] 

Cody Koeninger commented on SPARK-5569:
---

I'm still not clear on what you're doing in "captureOffsetRanges(..)" that
is leading to the exception you first mentioned.

But regarding the question of whether transform / foreachRDD etc runs on
the driver or the executor,

http://spark.apache.org/docs/latest/streaming-programming-guide.html#design-patterns-for-using-foreachrdd

should make it pretty clear which portions run where
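
In short, the pattern looks like this (a sketch following that guide; stream is 
assumed to come from createDirectStream and to yield (key, message) pairs, and 
process(...) is a placeholder):

{code}
import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

def process(record: (String, String)): Unit = println(record) // placeholder

var offsetRanges = Array.empty[OffsetRange]

stream.transform { rdd =>
  // this block runs on the driver, once per batch
  offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd
}.foreachRDD { rdd =>
  rdd.foreachPartition { iter =>
    // this block runs on the executors, once per partition
    iter.foreach(record => process(record))
  }
  // back on the driver: offsetRanges for this batch can be saved here
}
{code}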

On Mon, Sep 21, 2015 at 6:29 PM, Jon Buffington (JIRA) 



> Checkpoints cannot reference classes defined outside of Spark's assembly
> 
>
> Key: SPARK-5569
> URL: https://issues.apache.org/jira/browse/SPARK-5569
> Project: Spark
>  Issue Type: Bug
>  Components: Streaming
>Reporter: Patrick Wendell
>
> Not sure if this is a bug or a feature, but it's not obvious, so wanted to 
> create a JIRA to make sure we document this behavior.
> First documented by Cody Koeninger:
> https://gist.github.com/koeninger/561a61482cd1b5b3600c
> {code}
> 15/01/12 16:07:07 INFO CheckpointReader: Attempting to load checkpoint from 
> file file:/var/tmp/cp/checkpoint-142110041.bk
> 15/01/12 16:07:07 WARN CheckpointReader: Error reading checkpoint from file 
> file:/var/tmp/cp/checkpoint-142110041.bk
> java.io.IOException: java.lang.ClassNotFoundException: 
> org.apache.spark.rdd.kafka.KafkaRDDPartition
> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1043)
> at 
> org.apache.spark.streaming.dstream.DStreamCheckpointData.readObject(DStreamCheckpointData.scala:146)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at 
> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
> at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
> at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1706)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1344)
> at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> at 
> java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:500)
> at 
> org.apache.spark.streaming.DStreamGraph$$anonfun$readObject$1.apply$mcV$sp(DStreamGraph.scala:180)
> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1040)
> at 
> org.apache.spark.streaming.DStreamGraph.readObject(DStreamGraph.scala:176)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at 
> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
> at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
> at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
> at 
> org.apache.spark.

[jira] [Commented] (SPARK-10732) Starting spark streaming from a specific point in time.

2015-09-22 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-10732?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14902648#comment-14902648
 ] 

Cody Koeninger commented on SPARK-10732:


It doesn't make sense to decouple this from the implementation.  Not all stream 
implementations have the ability to replay starting at a given point.

In order to actually do this for Kafka, you need an index from time to offset.

Spark has no built in storage that would be appropriate for maintaining this 
index.  Kafka has a time-based api, but it is based on log file timestamps, not 
a granular index, so is totally inaccurate for most use cases.

For now, you'll need to keep the index yourself, and then provide the 
corresponding starting offsets as Sean mentioned.
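
For example, with the direct stream you would pass the starting offsets 
explicitly (a sketch; the broker address, topic, and offsets are placeholders 
looked up from your own index, and ssc is an existing StreamingContext):

{code}
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// offsets looked up from whatever time -> offset index you maintain yourself
val fromOffsets = Map(
  TopicAndPartition("mytopic", 0) -> 1234L,
  TopicAndPartition("mytopic", 1) -> 5678L)

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
  ssc,
  Map("metadata.broker.list" -> "broker:9092"),
  fromOffsets,
  (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message))
{code}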


> Starting spark streaming from a specific point in time.
> ---
>
> Key: SPARK-10732
> URL: https://issues.apache.org/jira/browse/SPARK-10732
> Project: Spark
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.5.0
>Reporter: Bijay Singh Bisht
>
> Currently, spark streaming either starts from current time or from the latest 
> checkpoint. It would be extremely useful to start from any arbitrary point. 
> This would be useful in replay scenarios or in running regression tests.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-5569) Checkpoints cannot reference classes defined outside of Spark's assembly

2015-09-21 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-5569?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14901435#comment-14901435
 ] 

Cody Koeninger commented on SPARK-5569:
---

Yeah, spark-streaming can be marked as provided.

Try to get a small reproducible case that demonstrates the issue.

On Mon, Sep 21, 2015 at 3:58 PM, Jon Buffington (JIRA) 



> Checkpoints cannot reference classes defined outside of Spark's assembly
> 
>
> Key: SPARK-5569
> URL: https://issues.apache.org/jira/browse/SPARK-5569
> Project: Spark
>  Issue Type: Bug
>  Components: Streaming
>Reporter: Patrick Wendell
>
> Not sure if this is a bug or a feature, but it's not obvious, so wanted to 
> create a JIRA to make sure we document this behavior.
> First documented by Cody Koeninger:
> https://gist.github.com/koeninger/561a61482cd1b5b3600c
> {code}
> 15/01/12 16:07:07 INFO CheckpointReader: Attempting to load checkpoint from 
> file file:/var/tmp/cp/checkpoint-142110041.bk
> 15/01/12 16:07:07 WARN CheckpointReader: Error reading checkpoint from file 
> file:/var/tmp/cp/checkpoint-142110041.bk
> java.io.IOException: java.lang.ClassNotFoundException: 
> org.apache.spark.rdd.kafka.KafkaRDDPartition
> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1043)
> at 
> org.apache.spark.streaming.dstream.DStreamCheckpointData.readObject(DStreamCheckpointData.scala:146)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at 
> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
> at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
> at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1706)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1344)
> at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> at 
> java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:500)
> at 
> org.apache.spark.streaming.DStreamGraph$$anonfun$readObject$1.apply$mcV$sp(DStreamGraph.scala:180)
> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1040)
> at 
> org.apache.spark.streaming.DStreamGraph.readObject(DStreamGraph.scala:176)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at 
> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
> at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
> at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
> at 
> org.apache.spark.streaming.CheckpointReader$$anonfun$read$2.apply(Checkpoint.scala:251)
> at 
> org.apache.spark.streaming.CheckpointReader$$anonfun$read$2.apply(Checkpoint.scala:239)
> at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)

[jira] [Commented] (SPARK-5569) Checkpoints cannot reference classes defined outside of Spark's assembly

2015-09-21 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-5569?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14901364#comment-14901364
 ] 

Cody Koeninger commented on SPARK-5569:
---

You can typically use sbt's merge strategy to deal with that kind of thing

example:

https://github.com/databricks/learning-spark/blob/master/build.sbt
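
Roughly, in build.sbt (a sketch assuming a recent sbt-assembly; the version 
numbers and merge cases are placeholders to adjust for whatever is actually 
conflicting):

{code}
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming" % "1.5.0" % "provided",
  "org.apache.spark" %% "spark-streaming-kafka" % "1.5.0"
)

assemblyMergeStrategy in assembly := {
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case _                             => MergeStrategy.first
}
{code}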

On Mon, Sep 21, 2015 at 3:23 PM, Jon Buffington (JIRA) 



> Checkpoints cannot reference classes defined outside of Spark's assembly
> 
>
> Key: SPARK-5569
> URL: https://issues.apache.org/jira/browse/SPARK-5569
> Project: Spark
>  Issue Type: Bug
>  Components: Streaming
>Reporter: Patrick Wendell
>
> Not sure if this is a bug or a feature, but it's not obvious, so wanted to 
> create a JIRA to make sure we document this behavior.
> First documented by Cody Koeninger:
> https://gist.github.com/koeninger/561a61482cd1b5b3600c
> {code}
> 15/01/12 16:07:07 INFO CheckpointReader: Attempting to load checkpoint from 
> file file:/var/tmp/cp/checkpoint-142110041.bk
> 15/01/12 16:07:07 WARN CheckpointReader: Error reading checkpoint from file 
> file:/var/tmp/cp/checkpoint-142110041.bk
> java.io.IOException: java.lang.ClassNotFoundException: 
> org.apache.spark.rdd.kafka.KafkaRDDPartition
> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1043)
> at 
> org.apache.spark.streaming.dstream.DStreamCheckpointData.readObject(DStreamCheckpointData.scala:146)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at 
> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
> at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
> at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1706)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1344)
> at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> at 
> java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:500)
> at 
> org.apache.spark.streaming.DStreamGraph$$anonfun$readObject$1.apply$mcV$sp(DStreamGraph.scala:180)
> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1040)
> at 
> org.apache.spark.streaming.DStreamGraph.readObject(DStreamGraph.scala:176)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at 
> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
> at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
> at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
> at 
> org.apache.spark.streaming.CheckpointReader$$anonfun$read$2.apply(Checkpoint.scala:251)
> at 
> org.apache.spark.streaming.CheckpointReader$$anonfun$read$2.apply(Checkpoint.scala:239)
> at 
> scala.collection.IndexedSeqOptimized$class.

[jira] [Commented] (SPARK-5569) Checkpoints cannot reference classes defined outside of Spark's assembly

2015-09-21 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-5569?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14901106#comment-14901106
 ] 

Cody Koeninger commented on SPARK-5569:
---

The direct stream doesn't save offset ranges, it saves tuples and converts them 
to / from offset ranges.  That workaround has been in place since the original 
release.

You're probably running into something else, can you make a minimal 
reproducible code example?

> Checkpoints cannot reference classes defined outside of Spark's assembly
> 
>
> Key: SPARK-5569
> URL: https://issues.apache.org/jira/browse/SPARK-5569
> Project: Spark
>  Issue Type: Bug
>  Components: Streaming
>Reporter: Patrick Wendell
>
> Not sure if this is a bug or a feature, but it's not obvious, so wanted to 
> create a JIRA to make sure we document this behavior.
> First documented by Cody Koeninger:
> https://gist.github.com/koeninger/561a61482cd1b5b3600c
> {code}
> 15/01/12 16:07:07 INFO CheckpointReader: Attempting to load checkpoint from 
> file file:/var/tmp/cp/checkpoint-142110041.bk
> 15/01/12 16:07:07 WARN CheckpointReader: Error reading checkpoint from file 
> file:/var/tmp/cp/checkpoint-142110041.bk
> java.io.IOException: java.lang.ClassNotFoundException: 
> org.apache.spark.rdd.kafka.KafkaRDDPartition
> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1043)
> at 
> org.apache.spark.streaming.dstream.DStreamCheckpointData.readObject(DStreamCheckpointData.scala:146)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at 
> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
> at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
> at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1706)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1344)
> at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> at 
> java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:500)
> at 
> org.apache.spark.streaming.DStreamGraph$$anonfun$readObject$1.apply$mcV$sp(DStreamGraph.scala:180)
> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1040)
> at 
> org.apache.spark.streaming.DStreamGraph.readObject(DStreamGraph.scala:176)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at 
> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
> at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
> at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
> at 
> org.apache.spark.streaming.CheckpointReader$$anonfun$read$2.apply(Checkpoint.scala:251)
> at 
> org.apache.spark.streaming.CheckpointReader$$anonfun$read$2.apply(Checkpoint.scala:239)
>   

[jira] [Commented] (SPARK-10320) Kafka Support new topic subscriptions without requiring restart of the streaming context

2015-09-16 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-10320?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14790843#comment-14790843
 ] 

Cody Koeninger commented on SPARK-10320:


I don't think there's much benefit to multiple dstreams with the direct api, 
because it's straightforward to filter or match on the topic on a per-partition 
basis.  I'm not sure that adding entirely new dstreams after the streaming 
context has been started makes sense.
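
For example (a sketch; the topic name and the two handlers are placeholders, 
and the stream is assumed to be a (key, message) direct stream):

{code}
import org.apache.spark.streaming.kafka.HasOffsetRanges

val handleOrders = (kv: (String, String)) => kv._2.toUpperCase // placeholder
val handleOther  = (kv: (String, String)) => kv._2             // placeholder

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd.mapPartitionsWithIndex { (i, iter) =>
    // rdd partition i corresponds exactly to offsetRanges(i) in kafka
    if (offsetRanges(i).topic == "orders") iter.map(handleOrders)
    else iter.map(handleOther)
  }.count()
}
{code}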

As far as defaults go... I don't see a clearly reasonable default like 
messageHandler has.  Maybe an example implementation of a function that 
maintains just a list of topic names and handles the offset lookups.

The other thing is, in order to get much use out of this, the api for 
communicating with the kafka cluster would need to be made public, and there 
had been some reluctance on that point previously.

[~tdas] Any thoughts on making the KafkaCluster api public?

> Kafka Support new topic subscriptions without requiring restart of the 
> streaming context
> 
>
> Key: SPARK-10320
> URL: https://issues.apache.org/jira/browse/SPARK-10320
> Project: Spark
>  Issue Type: New Feature
>  Components: Streaming
>Reporter: Sudarshan Kadambi
>
> Spark Streaming lacks the ability to subscribe to newer topics or unsubscribe 
> from current ones once the streaming context has been started. Restarting the 
> streaming context increases the latency of update handling.
> Consider a streaming application subscribed to n topics. Let's say 1 of the 
> topics is no longer needed in streaming analytics and hence should be 
> dropped. We could do this by stopping the streaming context, removing that 
> topic from the topic list and restarting the streaming context. Since with 
> some DStreams such as DirectKafkaStream, the per-partition offsets are 
> maintained by Spark, we should be able to resume uninterrupted (I think?) 
> from where we left off with a minor delay. However, in instances where 
> expensive state initialization (from an external datastore) may be needed for 
> datasets published to all topics, before streaming updates can be applied to 
> it, it is more convenient to only subscribe or unsubscribe to the incremental 
> changes to the topic list. Without such a feature, updates go unprocessed for 
> longer than they need to be, thus affecting QoS.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-10320) Kafka Support new topic subscriptions without requiring restart of the streaming context

2015-09-01 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-10320?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14726076#comment-14726076
 ] 

Cody Koeninger commented on SPARK-10320:


You would supply a function, similar to the way createDirectStream currently 
takes a messageHandler: MessageAndMetadata[K, V] => R

The type of that function would be 

(Time, Map[TopicAndPartition, Long], Map[TopicAndPartition, LeaderOffset]) => 
(Map[TopicAndPartition, Long], Map[TopicAndPartition, LeaderOffset])

in other words

(time, fromOffsets, untilOffsets) => (fromOffsets, untilOffsets)

Your function would be called in the compute() method of the dstream, after 
contacting the leaders and before making the rdd for the next batch.
That would let you make arbitrary modifications to the topics / partitions / 
offsets.

As far as the desire for a general solution, I think this is a kafka-specific 
concern.  Not all streams have topics.

> Kafka Support new topic subscriptions without requiring restart of the 
> streaming context
> 
>
> Key: SPARK-10320
> URL: https://issues.apache.org/jira/browse/SPARK-10320
> Project: Spark
>  Issue Type: New Feature
>  Components: Streaming
>Reporter: Sudarshan Kadambi
>
> Spark Streaming lacks the ability to subscribe to newer topics or unsubscribe 
> from current ones once the streaming context has been started. Restarting the 
> streaming context increases the latency of update handling.
> Consider a streaming application subscribed to n topics. Let's say 1 of the 
> topics is no longer needed in streaming analytics and hence should be 
> dropped. We could do this by stopping the streaming context, removing that 
> topic from the topic list and restarting the streaming context. Since with 
> some DStreams such as DirectKafkaStream, the per-partition offsets are 
> maintained by Spark, we should be able to resume uninterrupted (I think?) 
> from where we left off with a minor delay. However, in instances where 
> expensive state initialization (from an external datastore) may be needed for 
> datasets published to all topics, before streaming updates can be applied to 
> it, it is more convenient to only subscribe or unsubscribe to the incremental 
> changes to the topic list. Without such a feature, updates go unprocessed for 
> longer than they need to be, thus affecting QoS.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-10320) Kafka Support new topic subscriptions without requiring restart of the streaming context

2015-09-01 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-10320?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14725719#comment-14725719
 ] 

Cody Koeninger commented on SPARK-10320:


So if you're changing topics in an event handler, it's almost certainly not the 
same thread the spark context was started from...

At any rate, do you think a callback along the lines of what I described would 
be sufficient for your use case?  

> Kafka Support new topic subscriptions without requiring restart of the 
> streaming context
> 
>
> Key: SPARK-10320
> URL: https://issues.apache.org/jira/browse/SPARK-10320
> Project: Spark
>  Issue Type: New Feature
>  Components: Streaming
>Reporter: Sudarshan Kadambi
>
> Spark Streaming lacks the ability to subscribe to newer topics or unsubscribe 
> to current ones once the streaming context has been started. Restarting the 
> streaming context increases the latency of update handling.
> Consider a streaming application subscribed to n topics. Let's say 1 of the 
> topics is no longer needed in streaming analytics and hence should be 
> dropped. We could do this by stopping the streaming context, removing that 
> topic from the topic list and restarting the streaming context. Since with 
> some DStreams such as DirectKafkaStream, the per-partition offsets are 
> maintained by Spark, we should be able to resume uninterrupted (I think?) 
> from where we left off with a minor delay. However, in instances where 
> expensive state initialization (from an external datastore) may be needed for 
> datasets published to all topics, before streaming updates can be applied to 
> it, it is more convenient to only subscribe or unsubscribe to the incremental 
> changes to the topic list. Without such a feature, updates go unprocessed for 
> longer than they need to be, thus affecting QoS.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-10320) Kafka Support new topic subscriptions without requiring restart of the streaming context

2015-08-31 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-10320?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14724113#comment-14724113
 ] 

Cody Koeninger commented on SPARK-10320:


It's possible this might be solvable with a user-supplied callback of the form

(Map[TopicAndPartition, LeaderOffset]) => Map[TopicAndPartition, LeaderOffset]
or maybe
(Time, Map[TopicAndPartition, LeaderOffset]) => Map[TopicAndPartition, 
LeaderOffset]

that got called in the compute method of DirectKafkaInputDStream.  That would 
avoid threading issues, and also allow for more-or-less arbitrary modification 
of the topics and offsets, including some use cases that people have to 
subclass for currently.

Actually, that just handles the ending offsets of the batch, so it'd need to be 
a pair of maps, one for the beginning and the other for the ending.  But the 
basic idea remains.



> Kafka Support new topic subscriptions without requiring restart of the 
> streaming context
> 
>
> Key: SPARK-10320
> URL: https://issues.apache.org/jira/browse/SPARK-10320
> Project: Spark
>  Issue Type: New Feature
>  Components: Streaming
>Reporter: Sudarshan Kadambi
>
> Spark Streaming lacks the ability to subscribe to newer topics or unsubscribe 
> to current ones once the streaming context has been started. Restarting the 
> streaming context increases the latency of update handling.
> Consider a streaming application subscribed to n topics. Let's say 1 of the 
> topics is no longer needed in streaming analytics and hence should be 
> dropped. We could do this by stopping the streaming context, removing that 
> topic from the topic list and restarting the streaming context. Since with 
> some DStreams such as DirectKafkaStream, the per-partition offsets are 
> maintained by Spark, we should be able to resume uninterrupted (I think?) 
> from where we left off with a minor delay. However, in instances where 
> expensive state initialization (from an external datastore) may be needed for 
> datasets published to all topics, before streaming updates can be applied to 
> it, it is more convenient to only subscribe or unsubscribe to the incremental 
> changes to the topic list. Without such a feature, updates go unprocessed for 
> longer than they need to be, thus affecting QoS.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-10320) Kafka Support new topic subscriptions without requiring restart of the streaming context

2015-08-31 Thread Cody Koeninger (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-10320?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cody Koeninger updated SPARK-10320:
---
Summary: Kafka Support new topic subscriptions without requiring restart of 
the streaming context  (was: Support new topic subscriptions without requiring 
restart of the streaming context)

> Kafka Support new topic subscriptions without requiring restart of the 
> streaming context
> 
>
> Key: SPARK-10320
> URL: https://issues.apache.org/jira/browse/SPARK-10320
> Project: Spark
>  Issue Type: New Feature
>  Components: Streaming
>Reporter: Sudarshan Kadambi
>
> Spark Streaming lacks the ability to subscribe to newer topics or unsubscribe 
> to current ones once the streaming context has been started. Restarting the 
> streaming context increases the latency of update handling.
> Consider a streaming application subscribed to n topics. Let's say 1 of the 
> topics is no longer needed in streaming analytics and hence should be 
> dropped. We could do this by stopping the streaming context, removing that 
> topic from the topic list and restarting the streaming context. Since with 
> some DStreams such as DirectKafkaStream, the per-partition offsets are 
> maintained by Spark, we should be able to resume uninterrupted (I think?) 
> from where we left off with a minor delay. However, in instances where 
> expensive state initialization (from an external datastore) may be needed for 
> datasets published to all topics, before streaming updates can be applied to 
> it, it is more convenient to only subscribe or unsubscribe to the incremental 
> changes to the topic list. Without such a feature, updates go unprocessed for 
> longer than they need to be, thus affecting QoS.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-10320) Support new topic subscriptions without requiring restart of the streaming context

2015-08-31 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-10320?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14724074#comment-14724074
 ] 

Cody Koeninger commented on SPARK-10320:


" If you restart the job and specify a new offset, that is where consumption 
should start, in effect overriding any saved offsets."

That's not the way checkpoints work.  You're either restarting from a 
checkpoint or you're not; the decision is up to you.  If you want to specify a 
new offset, start the job clean.
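To make the distinction concrete, here is a minimal sketch of the two choices, 
using the createDirectStream overload that takes explicit fromOffsets. The broker 
address, topic, offsets and checkpoint path are all placeholders.

{code}
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val checkpointDir = "/tmp/checkpoint"  // placeholder

def createContext(): StreamingContext = {
  val ssc = new StreamingContext(new SparkConf().setAppName("direct-kafka-example"), Seconds(10))
  ssc.checkpoint(checkpointDir)
  // Starting clean: these offsets are only honored when no checkpoint exists.
  val fromOffsets = Map(TopicAndPartition("mytopic", 0) -> 12345L)
  val messageHandler =
    (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)
  val stream = KafkaUtils.createDirectStream[
    String, String, StringDecoder, StringDecoder, (String, String)](
    ssc, Map("metadata.broker.list" -> "broker:9092"), fromOffsets, messageHandler)
  stream.print()  // placeholder for the real job
  ssc
}

// Restarting from an existing checkpoint ignores createContext entirely;
// to start from new offsets, remove (or point away from) the checkpoint dir.
val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
ssc.start()
ssc.awaitTermination()
{code}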

"The topic changes happen in the same thread of execution where the initial 
list of topics was provided before starting the streaming context."

Can you say a little more about what you're actually doing here?  How do you 
know when topics need to be modified?  Typically streaming jobs just call 
ssc.awaitTermination in their main thread, which seems incompatible with what 
you're describing.

> Support new topic subscriptions without requiring restart of the streaming 
> context
> --
>
> Key: SPARK-10320
> URL: https://issues.apache.org/jira/browse/SPARK-10320
> Project: Spark
>  Issue Type: New Feature
>  Components: Streaming
>Reporter: Sudarshan Kadambi
>
> Spark Streaming lacks the ability to subscribe to newer topics or unsubscribe 
> to current ones once the streaming context has been started. Restarting the 
> streaming context increases the latency of update handling.
> Consider a streaming application subscribed to n topics. Let's say 1 of the 
> topics is no longer needed in streaming analytics and hence should be 
> dropped. We could do this by stopping the streaming context, removing that 
> topic from the topic list and restarting the streaming context. Since with 
> some DStreams such as DirectKafkaStream, the per-partition offsets are 
> maintained by Spark, we should be able to resume uninterrupted (I think?) 
> from where we left off with a minor delay. However, in instances where 
> expensive state initialization (from an external datastore) may be needed for 
> datasets published to all topics, before streaming updates can be applied to 
> it, it is more convenient to only subscribe or unsubscribe to the incremental 
> changes to the topic list. Without such a feature, updates go unprocessed for 
> longer than they need to be, thus affecting QoS.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-10320) Support new topic subscriptions without requiring restart of the streaming context

2015-08-27 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-10320?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14717342#comment-14717342
 ] 

Cody Koeninger commented on SPARK-10320:


As I said on the list, the best way to deal with this currently is start a new 
app with your new code, before stopping the old app.

In terms of a potential feature addition, I think there are a number of 
questions that would need to be cleared up... e.g.

- when would you change topics?  During a streaming listener onBatchCompleted 
handler?  From a separate thread?

- when adding a topic, what would the expectations around starting offset be?  
As in the current api, provide explicit offsets per partition, start at 
beginning, or start at end?

- if you add partitions for topics that currently exist, and specify a starting 
offset that's different from where the job is currently, what would the 
expectation be?
- if you add, later remove, then later re-add a topic, what would the 
expectation regarding saved checkpoints be?

> Support new topic subscriptions without requiring restart of the streaming 
> context
> --
>
> Key: SPARK-10320
> URL: https://issues.apache.org/jira/browse/SPARK-10320
> Project: Spark
>  Issue Type: New Feature
>  Components: Streaming
>Reporter: Sudarshan Kadambi
>
> Spark Streaming lacks the ability to subscribe to newer topics or unsubscribe 
> to current ones once the streaming context has been started. Restarting the 
> streaming context increases the latency of update handling.
> Consider a streaming application subscribed to n topics. Let's say 1 of the 
> topics is no longer needed in streaming analytics and hence should be 
> dropped. We could do this by stopping the streaming context, removing that 
> topic from the topic list and restarting the streaming context. Since with 
> some DStreams such as DirectKafkaStream, the per-partition offsets are 
> maintained by Spark, we should be able to resume uninterrupted (I think?) 
> from where we left off with a minor delay. However, in instances where 
> expensive state initialization (from an external datastore) may be needed for 
> datasets published to all topics, before streaming updates can be applied to 
> it, it is more convenient to only subscribe or unsubscribe to the incremental 
> changes to the topic list. Without such a feature, updates go unprocessed for 
> longer than they need to be, thus affecting QoS.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-6249) Get Kafka offsets from consumer group in ZK when using direct stream

2015-08-14 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14697188#comment-14697188
 ] 

Cody Koeninger commented on SPARK-6249:
---

If you want an api that has imprecise semantics and stores stuff in ZK, use
the receiver based stream.

This ticket has been closed for a while, I'd suggest further discussion
would be better suited for the mailing list.




> Get Kafka offsets from consumer group in ZK when using direct stream
> 
>
> Key: SPARK-6249
> URL: https://issues.apache.org/jira/browse/SPARK-6249
> Project: Spark
>  Issue Type: Improvement
>  Components: Streaming
>Reporter: Tathagata Das
>
> This is the proposal. 
> The simpler direct API (the one that does not take explicit offsets) can be 
> modified to also pick up the initial offset from ZK if group.id is specified. 
> This is exactly similar to how we find the latest or earliest offset in that 
> API, just that instead of latest/earliest offset of the topic we want to find 
> the offset from the consumer group. The group offsets in ZK are not used at 
> all for any further processing and restarting, so the exactly-once semantics 
> are not broken. 
> The use case where this is useful is simplified code upgrade. If the user 
> wants to upgrade the code, he/she can stop the context gracefully, which will 
> ensure the ZK consumer group offset will be updated with the last offsets 
> processed. Then the new code, started fresh (not restarted from checkpoint), can 
> pick up the consumer group offset from ZK and continue where the previous 
> code had left off. 
> Without the functionality of picking up consumer group offsets to start (that 
> is, currently) the only way to do this is for the users to save the offsets 
> somewhere (file, database, etc.) and manage the offsets themselves. I just 
> want to simplify this process. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-9947) Separate Metadata and State Checkpoint Data

2015-08-14 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-9947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14697193#comment-14697193
 ] 

Cody Koeninger commented on SPARK-9947:
---

Didn't you already say that you were saving updateStateByKey state yourself?




> Separate Metadata and State Checkpoint Data
> ---
>
> Key: SPARK-9947
> URL: https://issues.apache.org/jira/browse/SPARK-9947
> Project: Spark
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.4.1
>Reporter: Dan Dutrow
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> Problem: When updating an application that has checkpointing enabled to 
> support the updateStateByKey and 24/7 operation functionality, you encounter 
> the problem where you might like to maintain state data between restarts but 
> delete the metadata containing execution state. 
> If checkpoint data exists between code redeployment, the program may not 
> execute properly or at all. My current workaround for this issue is to wrap 
> updateStateByKey with my own function that persists the state after every 
> update to my own separate directory. (That allows me to delete the checkpoint 
> with its metadata before redeploying) Then, when I restart the application, I 
> initialize the state with this persisted data. This incurs additional 
> overhead due to persisting of the same data twice: once in the checkpoint and 
> once in my persisted data folder. 
> If Kafka Direct API offsets could be stored in another separate checkpoint 
> directory, that would help address the problem of having to blow that away 
> between code redeployment as well.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-9947) Separate Metadata and State Checkpoint Data

2015-08-14 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-9947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14697166#comment-14697166
 ] 

Cody Koeninger commented on SPARK-9947:
---

You can't re-use checkpoint data across application upgrades anyway, so
that honestly seems kind of pointless.







> Separate Metadata and State Checkpoint Data
> ---
>
> Key: SPARK-9947
> URL: https://issues.apache.org/jira/browse/SPARK-9947
> Project: Spark
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.4.1
>Reporter: Dan Dutrow
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> Problem: When updating an application that has checkpointing enabled to 
> support the updateStateByKey and 24/7 operation functionality, you encounter 
> the problem where you might like to maintain state data between restarts but 
> delete the metadata containing execution state. 
> If checkpoint data exists between code redeployment, the program may not 
> execute properly or at all. My current workaround for this issue is to wrap 
> updateStateByKey with my own function that persists the state after every 
> update to my own separate directory. (That allows me to delete the checkpoint 
> with its metadata before redeploying) Then, when I restart the application, I 
> initialize the state with this persisted data. This incurs additional 
> overhead due to persisting of the same data twice: once in the checkpoint and 
> once in my persisted data folder. 
> If Kafka Direct API offsets could be stored in another separate checkpoint 
> directory, that would help address the problem of having to blow that away 
> between code redeployment as well.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-9947) Separate Metadata and State Checkpoint Data

2015-08-14 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-9947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14697098#comment-14697098
 ] 

Cody Koeninger commented on SPARK-9947:
---

You already have access to offsets and can save them however you want.  You can 
provide those offsets on restart, regardless of whether checkpointing was 
enabled.
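A minimal sketch of that, assuming the Scala direct stream; saveOffsets here is a 
stand-in for whatever external store you pick (database, ZK, a file).

{code}
import kafka.common.TopicAndPartition
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

// Stand-in for writing to your store of choice.
def saveOffsets(ranges: Array[OffsetRange]): Unit =
  ranges.foreach(r => println(s"${r.topic}-${r.partition}: ${r.fromOffset} -> ${r.untilOffset}"))

def trackOffsets(stream: InputDStream[(String, String)]): Unit =
  stream.foreachRDD { rdd =>
    // The cast only works on the RDD handed to you directly by the direct
    // stream, before any transformations.
    val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    // ... process rdd here ...
    saveOffsets(ranges)
  }

// On restart, turn the saved ranges back into the fromOffsets argument of the
// createDirectStream overload that takes explicit per-partition offsets.
def toFromOffsets(ranges: Array[OffsetRange]): Map[TopicAndPartition, Long] =
  ranges.map(r => TopicAndPartition(r.topic, r.partition) -> r.untilOffset).toMap
{code}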

> Separate Metadata and State Checkpoint Data
> ---
>
> Key: SPARK-9947
> URL: https://issues.apache.org/jira/browse/SPARK-9947
> Project: Spark
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.4.1
>Reporter: Dan Dutrow
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> Problem: When updating an application that has checkpointing enabled to 
> support the updateStateByKey and 24/7 operation functionality, you encounter 
> the problem where you might like to maintain state data between restarts but 
> delete the metadata containing execution state. 
> If checkpoint data exists between code redeployment, the program may not 
> execute properly or at all. My current workaround for this issue is to wrap 
> updateStateByKey with my own function that persists the state after every 
> update to my own separate directory. (That allows me to delete the checkpoint 
> with its metadata before redeploying) Then, when I restart the application, I 
> initialize the state with this persisted data. This incurs additional 
> overhead due to persisting of the same data twice: once in the checkpoint and 
> once in my persisted data folder. 
> If Kafka Direct API offsets could be stored in another separate checkpoint 
> directory, that would help address the problem of having to blow that away 
> between code redeployment as well.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-6249) Get Kafka offsets from consumer group in ZK when using direct stream

2015-08-13 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14695939#comment-14695939
 ] 

Cody Koeninger commented on SPARK-6249:
---

Regarding streaming stats, those should be available in the current release



> Get Kafka offsets from consumer group in ZK when using direct stream
> 
>
> Key: SPARK-6249
> URL: https://issues.apache.org/jira/browse/SPARK-6249
> Project: Spark
>  Issue Type: Improvement
>  Components: Streaming
>Reporter: Tathagata Das
>
> This is the proposal. 
> The simpler direct API (the one that does not take explicit offsets) can be 
> modified to also pick up the initial offset from ZK if group.id is specified. 
> This is exactly similar to how we find the latest or earliest offset in that 
> API, just that instead of latest/earliest offset of the topic we want to find 
> the offset from the consumer group. The group offsets in ZK are not used at 
> all for any further processing and restarting, so the exactly-once semantics 
> are not broken. 
> The use case where this is useful is simplified code upgrade. If the user 
> wants to upgrade the code, he/she can stop the context gracefully, which will 
> ensure the ZK consumer group offset will be updated with the last offsets 
> processed. Then the new code, started fresh (not restarted from checkpoint), can 
> pick up the consumer group offset from ZK and continue where the previous 
> code had left off. 
> Without the functionality of picking up consumer group offsets to start (that 
> is, currently) the only way to do this is for the users to save the offsets 
> somewhere (file, database, etc.) and manage the offsets themselves. I just 
> want to simplify this process. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-6249) Get Kafka offsets from consumer group in ZK when using direct stream

2015-08-13 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14695929#comment-14695929
 ] 

Cody Koeninger commented on SPARK-6249:
---

https://github.com/koeninger/kafka-exactly-once

If you've tried the code, read the blog post, and watched the presentation
linked from that repo, and still have specific questions, feel free to ask.



> Get Kafka offsets from consumer group in ZK when using direct stream
> 
>
> Key: SPARK-6249
> URL: https://issues.apache.org/jira/browse/SPARK-6249
> Project: Spark
>  Issue Type: Improvement
>  Components: Streaming
>Reporter: Tathagata Das
>
> This is the proposal. 
> The simpler direct API (the one that does not take explicit offsets) can be 
> modified to also pick up the initial offset from ZK if group.id is specified. 
> This is exactly similar to how we find the latest or earliest offset in that 
> API, just that instead of latest/earliest offset of the topic we want to find 
> the offset from the consumer group. The group offsets in ZK are not used at 
> all for any further processing and restarting, so the exactly-once semantics 
> are not broken. 
> The use case where this is useful is simplified code upgrade. If the user 
> wants to upgrade the code, he/she can stop the context gracefully, which will 
> ensure the ZK consumer group offset will be updated with the last offsets 
> processed. Then the new code, started fresh (not restarted from checkpoint), can 
> pick up the consumer group offset from ZK and continue where the previous 
> code had left off. 
> Without the functionality of picking up consumer group offsets to start (that 
> is, currently) the only way to do this is for the users to save the offsets 
> somewhere (file, database, etc.) and manage the offsets themselves. I just 
> want to simplify this process. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-9780) In case of invalid initialization of KafkaDirectStream, NPE is thrown

2015-08-11 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-9780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14681886#comment-14681886
 ] 

Cody Koeninger commented on SPARK-9780:
---

Makes sense, traveling currently but I'll put in a PR

> In case of invalid initialization of KafkaDirectStream, NPE is thrown
> -
>
> Key: SPARK-9780
> URL: https://issues.apache.org/jira/browse/SPARK-9780
> Project: Spark
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.3.1, 1.4.1
>Reporter: Grigory Turunov
>Priority: Minor
>
> [o.a.s.streaming.kafka.KafkaRDD.scala#L143|https://github.com/apache/spark/blob/master/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala#L143]
> In initialization of KafkaRDDIterator, there is an addition of 
> TaskCompletionListener to the context, which calls close() to the consumer, 
> which is not initialized yet (and will be initialized 12 lines after that).
> If something happens in this 12 lines (in my case there was a private 
> constructor for valueDecoder), an Exception, which is thrown, triggers 
> context.markTaskCompleted() in
> [o.a.s.scheduler.Task.scala#L90|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/Task.scala#L90]
> which throws NullPointerException, when tries to call close() for 
> non-initialized consumer.
> This masks original exception - so it is very hard to understand, what is 
> happening.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-9476) Kafka stream loses leader after 2h of operation

2015-08-09 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-9476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14679380#comment-14679380
 ] 

Cody Koeninger commented on SPARK-9476:
---

The stacktraces you posted don't contain any Spark code; they look like they're 
all from Kafka producer code.  I assume it's from the part of the job where 
you're feeding results back to Kafka.

I'd like to help on general principles, but without a stacktrace that involves 
Spark or a minimal reproducible example without your proprietary code, there's 
not much I can do.

I'd suggest trying to catch the Kafka exception when producing and adding more 
logging to see what's going on.
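For example, a sketch against the old Scala producer API that shows up in the 
stack trace; the broker address comes from the trace above, and the topic and 
payload are placeholders.

{code}
import java.util.Properties
import kafka.common.FailedToSendMessageException
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

val props = new Properties()
props.put("metadata.broker.list", "192.168.3.23:3000")
props.put("serializer.class", "kafka.serializer.StringEncoder")
val producer = new Producer[String, String](new ProducerConfig(props))

try {
  producer.send(new KeyedMessage[String, String]("updates", "some-key", "some-value"))
} catch {
  case e: FailedToSendMessageException =>
    // Log enough context (broker list, topic) to see which send fails and when,
    // instead of letting the exception kill the task silently.
    System.err.println(s"send to topic 'updates' via ${props.get("metadata.broker.list")} failed: $e")
    throw e
} finally {
  producer.close()
}
{code}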

> Kafka stream loses leader after 2h of operation 
> 
>
> Key: SPARK-9476
> URL: https://issues.apache.org/jira/browse/SPARK-9476
> Project: Spark
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.4.0
> Environment: Docker, Centos, Spark standalone, core i7, 8Gb
>Reporter: Ruben Ramalho
>
> This seems to happen every 2h, it happens both with the direct stream and 
> regular stream, I'm doing window operations over a 1h period (if that can 
> help).
> Here's part of the error message:
> 2015-07-30 13:27:23 WARN  ClientUtils$:89 - Fetching topic metadata with 
> correlation id 10 for topics [Set(updates)] from broker 
> [id:0,host:192.168.3.23,port:3000] failed
> java.nio.channels.ClosedChannelException
>   at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
>   at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
>   at 
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
>   at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
>   at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
>   at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)
>   at 
> kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
>   at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
> 2015-07-30 13:27:23 INFO  SyncProducer:68 - Disconnecting from 
> 192.168.3.23:3000
> 2015-07-30 13:27:23 WARN  ConsumerFetcherManager$LeaderFinderThread:89 - 
> [spark-group_81563e123e9f-1438259236988-fc3d82bf-leader-finder-thread], 
> Failed to find leader for Set([updates,0])
> kafka.common.KafkaException: fetching topic metadata for topics 
> [Set(oversight-updates)] from broker 
> [ArrayBuffer(id:0,host:192.168.3.23,port:3000)] failed
>   at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72)
>   at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)
>   at 
> kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
>   at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
> Caused by: java.nio.channels.ClosedChannelException
>   at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
>   at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
>   at 
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
>   at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
>   at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
> After the crash I tried to communicate with kafka with a simple scala 
> consumer and producer and had no problem at all. Spark, though, needs a kafka 
> container restart to resume normal operation. There are no errors in the kafka 
> log, apart from an improperly closed connection.
> I have been trying to solve this problem for days, I suspect this has 
> something to do with spark.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-9476) Kafka stream loses leader after 2h of operation

2015-07-31 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-9476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14649123#comment-14649123
 ] 

Cody Koeninger commented on SPARK-9476:
---

Those all look like info or warn level log lines; are there any error level lines?

It'd be helpful to have the relevant code. Have you tried running a simpler job 
in the same environment to see if it has similar issues?

Also, what version of Kafka are you using on the brokers?

> Kafka stream loses leader after 2h of operation 
> 
>
> Key: SPARK-9476
> URL: https://issues.apache.org/jira/browse/SPARK-9476
> Project: Spark
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.4.1
> Environment: Docker, Centos, Spark standalone, core i7, 8Gb
>Reporter: Ruben Ramalho
>
> This seems to happen every 2h, it happens both with the direct stream and 
> regular stream, I'm doing window operations over a 1h period (if that can 
> help).
> Here's part of the error message:
> 2015-07-30 13:27:23 WARN  ClientUtils$:89 - Fetching topic metadata with 
> correlation id 10 for topics [Set(updates)] from broker 
> [id:0,host:192.168.3.23,port:3000] failed
> java.nio.channels.ClosedChannelException
>   at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
>   at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
>   at 
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
>   at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
>   at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
>   at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)
>   at 
> kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
>   at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
> 2015-07-30 13:27:23 INFO  SyncProducer:68 - Disconnecting from 
> 192.168.3.23:3000
> 2015-07-30 13:27:23 WARN  ConsumerFetcherManager$LeaderFinderThread:89 - 
> [spark-group_81563e123e9f-1438259236988-fc3d82bf-leader-finder-thread], 
> Failed to find leader for Set([updates,0])
> kafka.common.KafkaException: fetching topic metadata for topics 
> [Set(oversight-updates)] from broker 
> [ArrayBuffer(id:0,host:192.168.3.23,port:3000)] failed
>   at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72)
>   at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)
>   at 
> kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
>   at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
> Caused by: java.nio.channels.ClosedChannelException
>   at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
>   at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
>   at 
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
>   at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
>   at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
> After the crash I tried to communicate with kafka with a simple scala 
> consumer and producer and had no problem at all. Spark, though, needs a kafka 
> container restart to resume normal operation. There are no errors in the kafka 
> log, apart from an improperly closed connection.
> I have been trying to solve this problem for days, I suspect this has 
> something to do with spark.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-9475) Consistent hadoop config for external/*

2015-07-30 Thread Cody Koeninger (JIRA)
Cody Koeninger created SPARK-9475:
-

 Summary: Consistent hadoop config for external/*
 Key: SPARK-9475
 URL: https://issues.apache.org/jira/browse/SPARK-9475
 Project: Spark
  Issue Type: Sub-task
Reporter: Cody Koeninger
Priority: Minor






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-9473) Consistent hadoop config for SQL

2015-07-30 Thread Cody Koeninger (JIRA)
Cody Koeninger created SPARK-9473:
-

 Summary: Consistent hadoop config for SQL
 Key: SPARK-9473
 URL: https://issues.apache.org/jira/browse/SPARK-9473
 Project: Spark
  Issue Type: Sub-task
  Components: SQL
Reporter: Cody Koeninger






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-9474) Consistent hadoop config for core

2015-07-30 Thread Cody Koeninger (JIRA)
Cody Koeninger created SPARK-9474:
-

 Summary: Consistent hadoop config for core
 Key: SPARK-9474
 URL: https://issues.apache.org/jira/browse/SPARK-9474
 Project: Spark
  Issue Type: Sub-task
  Components: Spark Core
Reporter: Cody Koeninger






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-9472) Consistent hadoop config for streaming

2015-07-30 Thread Cody Koeninger (JIRA)
Cody Koeninger created SPARK-9472:
-

 Summary: Consistent hadoop config for streaming
 Key: SPARK-9472
 URL: https://issues.apache.org/jira/browse/SPARK-9472
 Project: Spark
  Issue Type: Sub-task
  Components: Streaming
Reporter: Cody Koeninger
Priority: Minor






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-9434) Need how-to for resuming direct Kafka streaming consumers where they had left off before getting terminated, OR actual support for that mode in the Streaming API

2015-07-29 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-9434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14646113#comment-14646113
 ] 

Cody Koeninger commented on SPARK-9434:
---

If you want to submit a PR with doc changes to add another link between the 
two, go for it.

> Need how-to for resuming direct Kafka streaming consumers where they had left 
> off before getting terminated, OR actual support for that mode in the 
> Streaming API
> -
>
> Key: SPARK-9434
> URL: https://issues.apache.org/jira/browse/SPARK-9434
> Project: Spark
>  Issue Type: Improvement
>  Components: Documentation, Examples, Streaming
>Affects Versions: 1.4.1
>Reporter: Dmitry Goldenberg
>
> We've been getting some mixed information regarding how to cause our direct 
> streaming consumers to resume processing from where they left off in terms of 
> the Kafka offsets.
> On the one hand, we're hearing "If you are restarting the streaming app 
> with Direct kafka from the checkpoint information (that is, restarting), then 
> the last read offsets are automatically recovered, and the data will start 
> processing from that offset. All the N records added in T will stay buffered 
> in Kafka." (where T is the interval of time during which the consumer was 
> down).
> On the other hand, there are tickets such as SPARK-6249 and SPARK-8833 which 
> are marked as "won't fix" which seem to ask for the functionality we need, 
> with comments like "I don't want to add more config options with confusing 
> semantics around what is being used for the system of record for offsets, I'd 
> rather make it easy for people to explicitly do what they need."
> The use-case is actually very clear and doesn't ask for confusing semantics. 
> An API option to resume reading where you left off, in addition to the 
> smallest or greatest auto.offset.reset should be *very* useful, probably for 
> quite a few folks.
> We're asking for this as an enhancement request. SPARK-8833 states " I am 
> waiting for getting enough usecase to float in before I take a final call." 
> We're adding to that.
> In the meantime, can you clarify the confusion?  Does direct streaming 
> persist the progress information into "DStream checkpoints" or does it not?  
> If it does, why is it that we're not seeing that happen? Our consumers start 
> with auto.offset.reset=greatest and that causes them to read from the first 
> offset of data that is written to Kafka *after* the consumer has been 
> restarted, meaning we're missing data that had come in while the consumer was 
> down.
> If the progress is stored in "DStream checkpoints", we want to know a) how to 
> cause that to work for us and b) where the said checkpointing data is stored 
> physically.
> Conversely, if this is not accurate, then is our only choice to manually 
> persist the offsets into Zookeeper? If that is the case then a) we'd like a 
> clear, more complete code sample to be published, since the one in the Kafka 
> streaming guide is incomplete (it lacks the actual lines of code persisting 
> the offsets) and b) we'd like to request that SPARK-8833 be revisited as a 
> feature worth implementing in the API.
> Thanks.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-9434) Need how-to for resuming direct Kafka streaming consumers where they had left off before getting terminated, OR actual support for that mode in the Streaming API

2015-07-29 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-9434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14646043#comment-14646043
 ] 

Cody Koeninger commented on SPARK-9434:
---

Dmitry, the nabble link you posted indicates your message was never accepted by 
the mailing list.

Looking through my email archives, there are only 3 discussions related to 
Kafka that you successfully sent to the mailing list, and all of them got 
answers.  As sean said, I think that's a better forum for questions.

To try and address what I think your question is... no, if you don't turn on 
checkpointing, you won't have any offsets saved in a checkpoint.  Checkpointing 
isn't necessary for every job, so it is not turned on by default.  The docs for 
KafkaUtils already explicitly say "To recover from driver failures, you have to 
enable checkpointing", with a link to the guide on how to do that.

The closed tickets you're linking to relate to a desire for some kind of 
automated committing to, and/or recovery from, Zookeeper based offsets.  As I 
stated in those tickets, I think the correct way to make this easier for people 
is just to expose an easier api for interacting with ZK.  That already exists 
in the code, it's just private.

> Need how-to for resuming direct Kafka streaming consumers where they had left 
> off before getting terminated, OR actual support for that mode in the 
> Streaming API
> -
>
> Key: SPARK-9434
> URL: https://issues.apache.org/jira/browse/SPARK-9434
> Project: Spark
>  Issue Type: Improvement
>  Components: Documentation, Examples, Streaming
>Affects Versions: 1.4.1
>Reporter: Dmitry Goldenberg
>
> We've been getting some mixed information regarding how to cause our direct 
> streaming consumers to resume processing from where they left off in terms of 
> the Kafka offsets.
> On the one hand, we're hearing "If you are restarting the streaming app 
> with Direct kafka from the checkpoint information (that is, restarting), then 
> the last read offsets are automatically recovered, and the data will start 
> processing from that offset. All the N records added in T will stay buffered 
> in Kafka." (where T is the interval of time during which the consumer was 
> down).
> On the other hand, there are tickets such as SPARK-6249 and SPARK-8833 which 
> are marked as "won't fix" which seem to ask for the functionality we need, 
> with comments like "I don't want to add more config options with confusing 
> semantics around what is being used for the system of record for offsets, I'd 
> rather make it easy for people to explicitly do what they need."
> The use-case is actually very clear and doesn't ask for confusing semantics. 
> An API option to resume reading where you left off, in addition to the 
> smallest or greatest auto.offset.reset should be *very* useful, probably for 
> quite a few folks.
> We're asking for this as an enhancement request. SPARK-8833 states " I am 
> waiting for getting enough usecase to float in before I take a final call." 
> We're adding to that.
> In the meantime, can you clarify the confusion?  Does direct streaming 
> persist the progress information into "DStream checkpoints" or does it not?  
> If it does, why is it that we're not seeing that happen? Our consumers start 
> with auto.offset.reset=greatest and that causes them to read from the first 
> offset of data that is written to Kafka *after* the consumer has been 
> restarted, meaning we're missing data that had come in while the consumer was 
> down.
> If the progress is stored in "DStream checkpoints", we want to know a) how to 
> cause that to work for us and b) where the said checkpointing data is stored 
> physically.
> Conversely, if this is not accurate, then is our only choice to manually 
> persist the offsets into Zookeeper? If that is the case then a) we'd like a 
> clear, more complete code sample to be published, since the one in the Kafka 
> streaming guide is incomplete (it lacks the actual lines of code persisting 
> the offsets) and b) we'd like to request that SPARK-8833 be revisited as a 
> feature worth implementing in the API.
> Thanks.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-9215) Implement WAL-free Kinesis receiver that give at-least once guarantee

2015-07-21 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-9215?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14635179#comment-14635179
 ] 

Cody Koeninger commented on SPARK-9215:
---

I left some comments in the doc.

Is Chris Fregly looking at this as well?

> Implement WAL-free Kinesis receiver that give at-least once guarantee
> -
>
> Key: SPARK-9215
> URL: https://issues.apache.org/jira/browse/SPARK-9215
> Project: Spark
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.4.1
>Reporter: Tathagata Das
>Assignee: Tathagata Das
>
> https://docs.google.com/document/d/1k0dl270EnK7uExrsCE7jYw7PYx0YC935uBcxn3p0f58/edit?usp=sharing



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-9059) Update Direct Kafka Word count examples to show the use of HasOffsetRanges

2015-07-18 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-9059?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14632476#comment-14632476
 ] 

Cody Koeninger commented on SPARK-9059:
---

How is this different from SPARK-8390 ?

I thought the idea there was to keep word count simple, and have separate 
examples for offsets.

Restarting from specific offsets is a good idea, but requires storage. If we 
want to move the examples from my external repo 
https://github.com/koeninger/kafka-exactly-once into spark, it would probably 
require switching from postgres to an in memory database.

> Update Direct Kafka Word count examples to show the use of HasOffsetRanges
> --
>
> Key: SPARK-9059
> URL: https://issues.apache.org/jira/browse/SPARK-9059
> Project: Spark
>  Issue Type: Sub-task
>  Components: Streaming
>Reporter: Tathagata Das
>  Labels: starter
>
> Update Scala, Java and Python examples of Direct Kafka word count to access 
> the offset ranges using HasOffsetRanges and print it. For example in Scala,
>  
> {code}
> var offsetRanges: Array[OffsetRange] = _
> ...
> directKafkaDStream.foreachRDD { rdd => 
> offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
> }
> ...
> transformedDStream.foreachRDD { rdd => 
> // some operation
> println("Processed ranges: " + offsetRanges)
> }
> {code}
> See https://spark.apache.org/docs/latest/streaming-kafka-integration.html for 
> more info, and the master source code for more updated information on python. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-8833) Kafka Direct API support offset in zookeeper

2015-07-06 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-8833?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14615056#comment-14615056
 ] 

Cody Koeninger commented on SPARK-8833:
---

this basic idea has been discussed before and rejected:

https://issues.apache.org/jira/browse/SPARK-6249

> Kafka Direct API support offset in zookeeper
> 
>
> Key: SPARK-8833
> URL: https://issues.apache.org/jira/browse/SPARK-8833
> Project: Spark
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.4.0
>Reporter: guowei
>Priority: Minor
>
> Kafka Direct API only support consume the topic from latest or earliest.
> but user usually need to consume message from last offset  when restart 
> stream app .



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-8389) Expose KafkaRDDs offsetRange in Java and Python

2015-06-19 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-8389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14594196#comment-14594196
 ] 

Cody Koeninger commented on SPARK-8389:
---

Have you guys been looking at SPARK-8337 , the idea in there is to provide
a method that returns a stream of python dicts with all of the (currently
available) fields in MessageAndMetadata




> Expose KafkaRDDs offsetRange in Java and Python
> ---
>
> Key: SPARK-8389
> URL: https://issues.apache.org/jira/browse/SPARK-8389
> Project: Spark
>  Issue Type: New Feature
>  Components: Streaming
>Affects Versions: 1.4.0
>Reporter: Tathagata Das
>Assignee: Cody Koeninger
>Priority: Critical
>
> Probably requires creating a JavaKafkaPairRDD and also use that in the python 
> APIs



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-8389) Expose KafkaRDDs offsetRange in Java and Python

2015-06-17 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-8389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14591170#comment-14591170
 ] 

Cody Koeninger commented on SPARK-8389:
---

Static type doesn't really matter since it's typecast anyway, as long as you 
can follow the chain of wrapped object back to the original.  The java api does 
a lot of wrapping too, but you can still unwrap it with rdd.rdd() as I 
mentioned.  

I'm not clear on why something comparable isn't possible for the python api.

> Expose KafkaRDDs offsetRange in Java and Python
> ---
>
> Key: SPARK-8389
> URL: https://issues.apache.org/jira/browse/SPARK-8389
> Project: Spark
>  Issue Type: New Feature
>  Components: Streaming
>Affects Versions: 1.4.0
>Reporter: Tathagata Das
>Assignee: Cody Koeninger
>Priority: Critical
>
> Probably requires creating a JavaKafkaPairRDD and also use that in the python 
> APIs



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-8390) Update DirectKafkaWordCount examples to show how offset ranges can be used

2015-06-17 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-8390?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14590720#comment-14590720
 ] 

Cody Koeninger commented on SPARK-8390:
---

Did we actually want to update the wordcount examples (might confuse, since you 
don't need offset ranges for minimal wordcount usage)... or just fix the part of 
the docs about the offset ranges?

PR is just fixing the docs for now.

I'd personally prefer to link to the talk / slides about direct stream once its 
available... not sure how you feel about external links in the doc.

> Update DirectKafkaWordCount examples to show how offset ranges can be used
> --
>
> Key: SPARK-8390
> URL: https://issues.apache.org/jira/browse/SPARK-8390
> Project: Spark
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.4.0
>Reporter: Tathagata Das
>Assignee: Cody Koeninger
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-8337) KafkaUtils.createDirectStream for python is lacking API/feature parity with the Scala/Java version

2015-06-16 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-8337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14588517#comment-14588517
 ] 

Cody Koeninger commented on SPARK-8337:
---

So one thing to keep in mind is that if the Kafka project ends up adding more 
fields to MessageAndMetadata, the scala interface is going to continue to give 
users access to those fields, without changing anything other than the Kafka 
version.

If you go with the approach of building a Python dict, someone's going to have 
to remember to go manually change the code to give access to the new fields.

I don't have enough Python knowledge to comment on whether the approach of 
passing a messageHandler function is feasible... I can try to get up to speed 
on it.  It may be worth trying to get the attention of Davies Liu after the 
spark conference hubbub has died down.

> KafkaUtils.createDirectStream for python is lacking API/feature parity with 
> the Scala/Java version
> --
>
> Key: SPARK-8337
> URL: https://issues.apache.org/jira/browse/SPARK-8337
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 1.4.0
>Reporter: Amit Ramesh
>Priority: Critical
>
> See the following thread for context.
> http://apache-spark-developers-list.1001551.n3.nabble.com/Re-Spark-1-4-Python-API-for-getting-Kafka-offsets-in-direct-mode-tt12714.html



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-8389) Expose KafkaRDDs offsetRange in Java and Python

2015-06-16 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-8389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14588404#comment-14588404
 ] 

Cody Koeninger commented on SPARK-8389:
---

So on the java side, just so I'm clear, are we talking about the difference 
between people writing

OffsetRange[] offsets = ((HasOffsetRanges)rdd.rdd()).offsetRanges();

which, as far as I can tell, they can do currently (see attached PR with test 
change) 
versus

OffsetRange[] offsets = ((HasOffsetRanges)rdd).offsetRanges();

I can see how the second is definitely a nicer api...  but I don't know that 
it's a critical bugfix, and I also don't know that it's worth introducing 
additional JavaKafkaRDD and JavaDirectKafkaInputDStream wrappers.  The typecast 
is kind of an ugly hack to begin with, there's only so much we can do to make 
it nicer... short of higher kinded return type parameters for rdd methods in 
Spark 2.0  :)

> Expose KafkaRDDs offsetRange in Java and Python
> ---
>
> Key: SPARK-8389
> URL: https://issues.apache.org/jira/browse/SPARK-8389
> Project: Spark
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.4.0
>Reporter: Tathagata Das
>Assignee: Cody Koeninger
>Priority: Critical
>
> Probably requires creating a JavaKafkaPairRDD and also use that in the python 
> APIs



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-7122) KafkaUtils.createDirectStream - unreasonable processing time in absence of load

2015-06-16 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-7122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14588211#comment-14588211
 ] 

Cody Koeninger commented on SPARK-7122:
---

It's certainly your prerogative to wait for an official release.  However, keep 
in mind that the patch in question is just a performance optimization, not 
necessarily a bug fix targeted at whatever your issue is.  Without a minimal 
reproducible case of your problem, or testing patches against your workload, 
there's no way of knowing if the performance optimization solves your problem.  
If it doesn't, you're looking at waiting for yet another release after 1.4.1.

> KafkaUtils.createDirectStream - unreasonable processing time in absence of 
> load
> ---
>
> Key: SPARK-7122
> URL: https://issues.apache.org/jira/browse/SPARK-7122
> Project: Spark
>  Issue Type: Question
>  Components: Streaming
>Affects Versions: 1.3.1
> Environment: Spark Streaming 1.3.1, standalone mode running on just 1 
> box: Ubuntu 14.04.2 LTS, 4 cores, 8GB RAM, java version "1.8.0_40"
>Reporter: Platon Potapov
>Priority: Minor
> Attachments: 10.second.window.fast.job.txt, 
> 5.second.window.slow.job.txt, SparkStreamingJob.scala
>
>
> attached is the complete source code of a test spark job. no external data 
> generators are run - just the presence of a kafka topic named "raw" suffices.
> the spark job is run with no load whatsoever. http://localhost:4040/streaming 
> is checked to obtain job processing duration.
> * in case the test contains the following transformation:
> {code}
> // dummy transformation
> val temperature = bytes.filter(_._1 == "abc")
> val abc = temperature.window(Seconds(40), Seconds(5))
> abc.print()
> {code}
> the median processing time is 3 seconds 80 ms
> * in case the test contains the following transformation:
> {code}
> // dummy transformation
> val temperature = bytes.filter(_._1 == "abc")
> val abc = temperature.map(x => (1, x))
> abc.print()
> {code}
> the median processing time is just 50 ms
> please explain why does the "window" transformation introduce such a growth 
> of job duration?
> note: the result is the same regardless of the number of kafka topic 
> partitions (I've tried 1 and 8)
> note2: the result is the same regardless of the window parameters (I've tried 
> (20, 2) and (40, 5))



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-8389) Expose KafkaRDDs offsetRange in Java and Python

2015-06-16 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-8389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14588202#comment-14588202
 ] 

Cody Koeninger commented on SPARK-8389:
---

There's already a ticket for the Python side of things, SPARK-8337.  Not sure 
if you want to combine them.

I'll look at the Java side of things to start.

> Expose KafkaRDDs offsetRange in Java and Python
> ---
>
> Key: SPARK-8389
> URL: https://issues.apache.org/jira/browse/SPARK-8389
> Project: Spark
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.4.0
>Reporter: Tathagata Das
>Assignee: Cody Koeninger
>Priority: Critical
>
> Probably requires creating a JavaKafkaPairRDD and also use that in the python 
> APIs



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-8127) KafkaRDD optimize count() take() isEmpty()

2015-06-05 Thread Cody Koeninger (JIRA)
Cody Koeninger created SPARK-8127:
-

 Summary: KafkaRDD optimize count() take() isEmpty()
 Key: SPARK-8127
 URL: https://issues.apache.org/jira/browse/SPARK-8127
 Project: Spark
  Issue Type: Improvement
  Components: Streaming
Reporter: Cody Koeninger
Priority: Minor


KafkaRDD can use its offset ranges to avoid doing extra work.

Possibly related to SPARK-7122.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-7122) KafkaUtils.createDirectStream - unreasonable processing time in absence of load

2015-06-03 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-7122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14572059#comment-14572059
 ] 

Cody Koeninger commented on SPARK-7122:
---

If you can try out this patch, https://github.com/apache/spark/pull/6632, 
please do so.

> KafkaUtils.createDirectStream - unreasonable processing time in absence of 
> load
> ---
>
> Key: SPARK-7122
> URL: https://issues.apache.org/jira/browse/SPARK-7122
> Project: Spark
>  Issue Type: Question
>  Components: Streaming
>Affects Versions: 1.3.1
> Environment: Spark Streaming 1.3.1, standalone mode running on just 1 
> box: Ubuntu 14.04.2 LTS, 4 cores, 8GB RAM, java version "1.8.0_40"
>Reporter: Platon Potapov
>Priority: Minor
> Attachments: 10.second.window.fast.job.txt, 
> 5.second.window.slow.job.txt, SparkStreamingJob.scala
>
>
> attached is the complete source code of a test spark job. no external data 
> generators are run - just the presence of a kafka topic named "raw" suffices.
> the spark job is run with no load whatsoever. http://localhost:4040/streaming 
> is checked to obtain job processing duration.
> * in case the test contains the following transformation:
> {code}
> // dummy transformation
> val temperature = bytes.filter(_._1 == "abc")
> val abc = temperature.window(Seconds(40), Seconds(5))
> abc.print()
> {code}
> the median processing time is 3 seconds 80 ms
> * in case the test contains the following transformation:
> {code}
> // dummy transformation
> val temperature = bytes.filter(_._1 == "abc")
> val abc = temperature.map(x => (1, x))
> abc.print()
> {code}
> the median processing time is just 50 ms
> please explain why does the "window" transformation introduce such a growth 
> of job duration?
> note: the result is the same regardless of the number of kafka topic 
> partitions (I've tried 1 and 8)
> note2: the result is the same regardless of the window parameters (I've tried 
> (20, 2) and (40, 5))



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-7122) KafkaUtils.createDirectStream - unreasonable processing time in absence of load

2015-06-03 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-7122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14571675#comment-14571675
 ] 

Cody Koeninger commented on SPARK-7122:
---

Try doing something very straightforward inside of foreachRDD (println, 
increment a counter, whatever), instead of the current code.

You're calling isEmpty, which implies a take(1).

Then saveJsonToEs is internally calling take(1) again, at least once.

Take is going to successively schedule jobs, quadrupling the number of 
partitions used every time.  It's possible that this starts hitting some 
pathological behavior on RDDs that are empty or nearly empty yet still have 400 
partitions.

I could pretty straightforwardly override the definition of isEmpty in KafkaRDD 
to just look at the offset ranges instead of doing any work. Take would be a 
little trickier but still doable.  But let's figure out if that's the issue 
first.
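
For illustration, roughly what I mean by looking at the offset ranges (a 
sketch, not the actual patch):

{code}
import org.apache.spark.streaming.kafka.OffsetRange

// Sketch: an RDD defined by offset ranges is empty exactly when every range
// is zero-length, so no Kafka fetch or Spark job is needed to answer isEmpty.
def isEmptyFromRanges(ranges: Array[OffsetRange]): Boolean =
  ranges.forall(r => r.fromOffset == r.untilOffset)

// Count falls out of the ranges the same way.
def countFromRanges(ranges: Array[OffsetRange]): Long =
  ranges.map(r => r.untilOffset - r.fromOffset).sum
{code}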



> KafkaUtils.createDirectStream - unreasonable processing time in absence of 
> load
> ---
>
> Key: SPARK-7122
> URL: https://issues.apache.org/jira/browse/SPARK-7122
> Project: Spark
>  Issue Type: Question
>  Components: Streaming
>Affects Versions: 1.3.1
> Environment: Spark Streaming 1.3.1, standalone mode running on just 1 
> box: Ubuntu 14.04.2 LTS, 4 cores, 8GB RAM, java version "1.8.0_40"
>Reporter: Platon Potapov
>Priority: Minor
> Attachments: 10.second.window.fast.job.txt, 
> 5.second.window.slow.job.txt, SparkStreamingJob.scala
>
>
> attached is the complete source code of a test spark job. no external data 
> generators are run - just the presence of a kafka topic named "raw" suffices.
> the spark job is run with no load whatsoever. http://localhost:4040/streaming 
> is checked to obtain job processing duration.
> * in case the test contains the following transformation:
> {code}
> // dummy transformation
> val temperature = bytes.filter(_._1 == "abc")
> val abc = temperature.window(Seconds(40), Seconds(5))
> abc.print()
> {code}
> the median processing time is 3 seconds 80 ms
> * in case the test contains the following transformation:
> {code}
> // dummy transformation
> val temperature = bytes.filter(_._1 == "abc")
> val abc = temperature.map(x => (1, x))
> abc.print()
> {code}
> the median processing time is just 50 ms
> please explain why does the "window" transformation introduce such a growth 
> of job duration?
> note: the result is the same regardless of the number of kafka topic 
> partitions (I've tried 1 and 8)
> note2: the result is the same regardless of the window parameters (I've tried 
> (20, 2) and (40, 5))



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-7122) KafkaUtils.createDirectStream - unreasonable processing time in absence of load

2015-06-03 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-7122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14571454#comment-14571454
 ] 

Cody Koeninger commented on SPARK-7122:
---

Nicolas, do you have an actual compilable project you can share a link to?  I'm 
not sure what EsSpark.saveJsonToEs is doing, for instance.

Also keep in mind that for the KafkaUtils.createStream case you're using a 
storage level of MEMORY_AND_DISK, whereas the directStream isn't going to do 
any caching by default - it's not a receiver.
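
If you want the direct stream to cache comparably, you'd have to opt in 
explicitly; a one-line sketch, assuming your stream value is called 
directStream:

{code}
import org.apache.spark.storage.StorageLevel

// The direct stream doesn't cache by default, so request it explicitly
// if you want behavior comparable to the receiver's MEMORY_AND_DISK.
directStream.persist(StorageLevel.MEMORY_AND_DISK)
{code}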

> KafkaUtils.createDirectStream - unreasonable processing time in absence of 
> load
> ---
>
> Key: SPARK-7122
> URL: https://issues.apache.org/jira/browse/SPARK-7122
> Project: Spark
>  Issue Type: Question
>  Components: Streaming
>Affects Versions: 1.3.1
> Environment: Spark Streaming 1.3.1, standalone mode running on just 1 
> box: Ubuntu 14.04.2 LTS, 4 cores, 8GB RAM, java version "1.8.0_40"
>Reporter: Platon Potapov
>Priority: Minor
> Attachments: 10.second.window.fast.job.txt, 
> 5.second.window.slow.job.txt, SparkStreamingJob.scala
>
>
> attached is the complete source code of a test spark job. no external data 
> generators are run - just the presence of a kafka topic named "raw" suffices.
> the spark job is run with no load whatsoever. http://localhost:4040/streaming 
> is checked to obtain job processing duration.
> * in case the test contains the following transformation:
> {code}
> // dummy transformation
> val temperature = bytes.filter(_._1 == "abc")
> val abc = temperature.window(Seconds(40), Seconds(5))
> abc.print()
> {code}
> the median processing time is 3 seconds 80 ms
> * in case the test contains the following transformation:
> {code}
> // dummy transformation
> val temperature = bytes.filter(_._1 == "abc")
> val abc = temperature.map(x => (1, x))
> abc.print()
> {code}
> the median processing time is just 50 ms
> please explain why does the "window" transformation introduce such a growth 
> of job duration?
> note: the result is the same regardless of the number of kafka topic 
> partitions (I've tried 1 and 8)
> note2: the result is the same regardless of the window parameters (I've tried 
> (20, 2) and (40, 5))



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-7122) KafkaUtils.createDirectStream - unreasonable processing time in absence of load

2015-06-02 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-7122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14569254#comment-14569254
 ] 

Cody Koeninger commented on SPARK-7122:
---

Am I reading this correctly that you're only taking the first 11 messages
per RDD?  If that's forcing the whole RDD in the direct stream case but not
the receiver-based stream case, I can see why there'd be a difference.

The other thing to investigate would be removing checkpointing, but I doubt
that's the issue.

On Tue, Jun 2, 2015 at 9:24 AM, Nicolas PHUNG (JIRA) 



> KafkaUtils.createDirectStream - unreasonable processing time in absence of 
> load
> ---
>
> Key: SPARK-7122
> URL: https://issues.apache.org/jira/browse/SPARK-7122
> Project: Spark
>  Issue Type: Question
>  Components: Streaming
>Affects Versions: 1.3.1
> Environment: Spark Streaming 1.3.1, standalone mode running on just 1 
> box: Ubuntu 14.04.2 LTS, 4 cores, 8GB RAM, java version "1.8.0_40"
>Reporter: Platon Potapov
>Priority: Minor
> Attachments: 10.second.window.fast.job.txt, 
> 5.second.window.slow.job.txt, SparkStreamingJob.scala
>
>
> attached is the complete source code of a test spark job. no external data 
> generators are run - just the presence of a kafka topic named "raw" suffices.
> the spark job is run with no load whatsoever. http://localhost:4040/streaming 
> is checked to obtain job processing duration.
> * in case the test contains the following transformation:
> {code}
> // dummy transformation
> val temperature = bytes.filter(_._1 == "abc")
> val abc = temperature.window(Seconds(40), Seconds(5))
> abc.print()
> {code}
> the median processing time is 3 seconds 80 ms
> * in case the test contains the following transformation:
> {code}
> // dummy transformation
> val temperature = bytes.filter(_._1 == "abc")
> val abc = temperature.map(x => (1, x))
> abc.print()
> {code}
> the median processing time is just 50 ms
> please explain why does the "window" transformation introduce such a growth 
> of job duration?
> note: the result is the same regardless of the number of kafka topic 
> partitions (I've tried 1 and 8)
> note2: the result is the same regardless of the window parameters (I've tried 
> (20, 2) and (40, 5))



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-7122) KafkaUtils.createDirectStream - unreasonable processing time in absence of load

2015-06-02 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-7122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14569125#comment-14569125
 ] 

Cody Koeninger commented on SPARK-7122:
---

That number of tasks sounds normal for that many partitions, since the
direct stream partitions are 1:1 with Kafka partitions.

Task overhead might explain a little bit of the difference, but I doubt
it's 6 seconds' worth.

I'm not sure there's much else I can say without being able to compare the
code for the two.



On Tue, Jun 2, 2015 at 4:46 AM, Nicolas PHUNG (JIRA) 



> KafkaUtils.createDirectStream - unreasonable processing time in absence of 
> load
> ---
>
> Key: SPARK-7122
> URL: https://issues.apache.org/jira/browse/SPARK-7122
> Project: Spark
>  Issue Type: Question
>  Components: Streaming
>Affects Versions: 1.3.1
> Environment: Spark Streaming 1.3.1, standalone mode running on just 1 
> box: Ubuntu 14.04.2 LTS, 4 cores, 8GB RAM, java version "1.8.0_40"
>Reporter: Platon Potapov
>Priority: Minor
> Attachments: 10.second.window.fast.job.txt, 
> 5.second.window.slow.job.txt, SparkStreamingJob.scala
>
>
> attached is the complete source code of a test spark job. no external data 
> generators are run - just the presence of a kafka topic named "raw" suffices.
> the spark job is run with no load whatsoever. http://localhost:4040/streaming 
> is checked to obtain job processing duration.
> * in case the test contains the following transformation:
> {code}
> // dummy transformation
> val temperature = bytes.filter(_._1 == "abc")
> val abc = temperature.window(Seconds(40), Seconds(5))
> abc.print()
> {code}
> the median processing time is 3 seconds 80 ms
> * in case the test contains the following transformation:
> {code}
> // dummy transformation
> val temperature = bytes.filter(_._1 == "abc")
> val abc = temperature.map(x => (1, x))
> abc.print()
> {code}
> the median processing time is just 50 ms
> please explain why does the "window" transformation introduce such a growth 
> of job duration?
> note: the result is the same regardless of the number of kafka topic 
> partitions (I've tried 1 and 8)
> note2: the result is the same regardless of the window parameters (I've tried 
> (20, 2) and (40, 5))



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-7122) KafkaUtils.createDirectStream - unreasonable processing time in absence of load

2015-06-01 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-7122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14567917#comment-14567917
 ] 

Cody Koeninger commented on SPARK-7122:
---

So if you're setting up a 2-second batch stream, how long do batches take
to complete?

On Mon, Jun 1, 2015 at 2:50 PM, Nicolas PHUNG (JIRA) 



> KafkaUtils.createDirectStream - unreasonable processing time in absence of 
> load
> ---
>
> Key: SPARK-7122
> URL: https://issues.apache.org/jira/browse/SPARK-7122
> Project: Spark
>  Issue Type: Question
>  Components: Streaming
>Affects Versions: 1.3.1
> Environment: Spark Streaming 1.3.1, standalone mode running on just 1 
> box: Ubuntu 14.04.2 LTS, 4 cores, 8GB RAM, java version "1.8.0_40"
>Reporter: Platon Potapov
>Priority: Minor
> Attachments: 10.second.window.fast.job.txt, 
> 5.second.window.slow.job.txt, SparkStreamingJob.scala
>
>
> attached is the complete source code of a test spark job. no external data 
> generators are run - just the presence of a kafka topic named "raw" suffices.
> the spark job is run with no load whatsoever. http://localhost:4040/streaming 
> is checked to obtain job processing duration.
> * in case the test contains the following transformation:
> {code}
> // dummy transformation
> val temperature = bytes.filter(_._1 == "abc")
> val abc = temperature.window(Seconds(40), Seconds(5))
> abc.print()
> {code}
> the median processing time is 3 seconds 80 ms
> * in case the test contains the following transformation:
> {code}
> // dummy transformation
> val temperature = bytes.filter(_._1 == "abc")
> val abc = temperature.map(x => (1, x))
> abc.print()
> {code}
> the median processing time is just 50 ms
> please explain why does the "window" transformation introduce such a growth 
> of job duration?
> note: the result is the same regardless of the number of kafka topic 
> partitions (I've tried 1 and 8)
> note2: the result is the same regardless of the window parameters (I've tried 
> (20, 2) and (40, 5))



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-7122) KafkaUtils.createDirectStream - unreasonable processing time in absence of load

2015-06-01 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-7122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14567865#comment-14567865
 ] 

Cody Koeninger commented on SPARK-7122:
---

When you say it's getting behind, how are you measuring that?


On Mon, Jun 1, 2015 at 2:33 PM, Nicolas PHUNG (JIRA) 



> KafkaUtils.createDirectStream - unreasonable processing time in absence of 
> load
> ---
>
> Key: SPARK-7122
> URL: https://issues.apache.org/jira/browse/SPARK-7122
> Project: Spark
>  Issue Type: Question
>  Components: Streaming
>Affects Versions: 1.3.1
> Environment: Spark Streaming 1.3.1, standalone mode running on just 1 
> box: Ubuntu 14.04.2 LTS, 4 cores, 8GB RAM, java version "1.8.0_40"
>Reporter: Platon Potapov
>Priority: Minor
> Attachments: 10.second.window.fast.job.txt, 
> 5.second.window.slow.job.txt, SparkStreamingJob.scala
>
>
> attached is the complete source code of a test spark job. no external data 
> generators are run - just the presence of a kafka topic named "raw" suffices.
> the spark job is run with no load whatsoever. http://localhost:4040/streaming 
> is checked to obtain job processing duration.
> * in case the test contains the following transformation:
> {code}
> // dummy transformation
> val temperature = bytes.filter(_._1 == "abc")
> val abc = temperature.window(Seconds(40), Seconds(5))
> abc.print()
> {code}
> the median processing time is 3 seconds 80 ms
> * in case the test contains the following transformation:
> {code}
> // dummy transformation
> val temperature = bytes.filter(_._1 == "abc")
> val abc = temperature.map(x => (1, x))
> abc.print()
> {code}
> the median processing time is just 50 ms
> please explain why does the "window" transformation introduce such a growth 
> of job duration?
> note: the result is the same regardless of the number of kafka topic 
> partitions (I've tried 1 and 8)
> note2: the result is the same regardless of the window parameters (I've tried 
> (20, 2) and (40, 5))



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-7827) kafka streaming NotLeaderForPartition Exception could not be handled normally

2015-05-22 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-7827?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14556631#comment-14556631
 ] 

Cody Koeninger commented on SPARK-7827:
---

As TD said, it looks like it did retry 4 times.

You can set refresh.leader.backoff.ms in the Kafka params to affect the sleep 
duration before the exception is thrown; the Kafka default is 200ms.  Can you 
tell how long Kafka took to rebalance?
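
Something along these lines is what I mean; the broker list and backoff value 
are just placeholders:

{code}
// Placeholder values: give Kafka more time to finish leader election before
// the fetch error is rethrown and the task retried.
val kafkaParams = Map(
  "metadata.broker.list" -> "broker1:9092,broker2:9092",
  "refresh.leader.backoff.ms" -> "2000"
)
{code}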



> kafka streaming NotLeaderForPartition Exception could not be handled normally
> -
>
> Key: SPARK-7827
> URL: https://issues.apache.org/jira/browse/SPARK-7827
> Project: Spark
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.3.1
> Environment: spark 1.3.1,  on yarn,   hadoop 2.6.0  
>Reporter: hotdog
>   Original Estimate: 120h
>  Remaining Estimate: 120h
>
> This is my Cluster 's log, once the partition's leader could not be found, 
> the total streaming task will fail...
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 11 in 
> stage 3549.0 failed 4 times, most recent failure: Lost task 11.3 in stage 
> 3549.0 (TID 385491, td-h85.cp): kafka.common.NotLeaderForPartitionException
>   at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
>   at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>   at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
>   at java.lang.Class.newInstance(Class.java:374)
>   at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:73)
>   at 
> org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.handleFetchErr(KafkaRDD.scala:142)
>   at 
> org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:151)
>   at 
> org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:162)
>   at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>   at 
> org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:217)
>   at 
> org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
>   at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
>   at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>   at org.apache.spark.scheduler.Task.run(Task.scala:64)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>   at java.lang.Thread.run(Thread.java:745)
> Driver stacktrace:
>   at 
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
>   at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>   at 
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192)
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
>   at scala.Option.foreach(Option.scala:236)
>   at 
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
>   at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
>   at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
>   at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> I read the source code of KafkaRDD.scala:
>   private def handleFetchErr(resp: FetchResponse) {
>   if (resp.hasError) {
> val err = resp.errorCode(part.topic, part.partition)
> if (err == ErrorMapping.LeaderNotAvailableCode ||
>   err == ErrorMapping.NotLeaderForPartitionCode) {
>   log.error(s"Lost leader for topic ${part.topic} partition 
> ${part.partition}, " +
> s" sleeping for ${kc.config.refreshLeaderBackoffMs}ms")
>   Thread.sleep(kc.config.refreshLeaderBackoffMs)
> }
> // Let normal rdd retry 

[jira] [Commented] (SPARK-7255) spark.streaming.kafka.maxRetries not documented

2015-04-29 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-7255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14520403#comment-14520403
 ] 

Cody Koeninger commented on SPARK-7255:
---

I think that should be fine to document, thanks for noticing.

Although actually, it's the maximum number of times in a row that the driver 
will attempt to contact the leader, not the number of attempts after the first, 
which means it's somewhat unfortunately named. :)
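
For anyone finding this later, setting it is just a normal Spark conf entry; a 
sketch with a placeholder value:

{code}
import org.apache.spark.SparkConf

// Placeholder value: raise the number of consecutive attempts the driver
// makes to contact the leader before giving up.
val conf = new SparkConf()
  .setAppName("my-streaming-job")
  .set("spark.streaming.kafka.maxRetries", "3")
{code}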

> spark.streaming.kafka.maxRetries not documented
> ---
>
> Key: SPARK-7255
> URL: https://issues.apache.org/jira/browse/SPARK-7255
> Project: Spark
>  Issue Type: Documentation
>  Components: Documentation, Streaming
>Affects Versions: 1.3.1
>Reporter: Benjamin Fradet
>Priority: Minor
> Fix For: 1.4.0
>
>
> I noticed that 
> [spark.streaming.kafka.maxRetries|https://github.com/apache/spark/blob/master/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala#L66]
>  was not documented in the [configuration 
> page|http://spark.apache.org/docs/latest/configuration.html#spark-streaming].
> Is this on purpose?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-7122) KafkaUtils.createDirectStream - unreasonable processing time in absence of load

2015-04-24 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-7122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14511092#comment-14511092
 ] 

Cody Koeninger commented on SPARK-7122:
---

Does this actually have anything to do with the direct stream?  I.e. is the 
behavior the same with the receiver-based stream?

> KafkaUtils.createDirectStream - unreasonable processing time in absence of 
> load
> ---
>
> Key: SPARK-7122
> URL: https://issues.apache.org/jira/browse/SPARK-7122
> Project: Spark
>  Issue Type: Question
>  Components: Streaming
>Affects Versions: 1.3.1
> Environment: Spark Streaming 1.3.1, standalone mode running on just 1 
> box: Ubuntu 14.04.2 LTS, 4 cores, 8GB RAM, java version "1.8.0_40"
>Reporter: Platon Potapov
>Priority: Minor
> Attachments: 10.second.window.fast.job.txt, 
> 5.second.window.slow.job.txt, SparkStreamingJob.scala
>
>
> attached is the complete source code of a test spark job. no external data 
> generators are run - just the presence of a kafka topic named "raw" suffices.
> the spark job is run with no load whatsoever. http://localhost:4040/streaming 
> is checked to obtain job processing duration.
> * in case the test contains the following transformation:
> {code}
> // dummy transformation
> val temperature = bytes.filter(_._1 == "abc")
> val abc = temperature.window(Seconds(40), Seconds(5))
> abc.print()
> {code}
> the median processing time is 3 seconds 80 ms
> * in case the test contains the following transformation:
> {code}
> // dummy transformation
> val temperature = bytes.filter(_._1 == "abc")
> val abc = temperature.map(x => (1, x))
> abc.print()
> {code}
> the median processing time is just 50 ms
> please explain why does the "window" transformation introduce such a growth 
> of job duration?
> note: the result is the same regardless of the number of kafka topic 
> partitions (I've tried 1 and 8)
> note2: the result is the same regardless of the window parameters (I've tried 
> (20, 2) and (40, 5))



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-6431) Couldn't find leader offsets exception when creating KafkaDirectStream

2015-04-06 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14481266#comment-14481266
 ] 

Cody Koeninger commented on SPARK-6431:
---

I think this got mis-diagnosed on the mailing list, sorry for the confusion.

The only way I've been able to reproduce that exception is by trying to start a 
stream for a topic that doesn't exist at all.  Alberto, did you actually run 
kafka-topics.sh --create before starting the job, or in some other way create 
the topic?  Pretty sure what happened here is that your topic didn't exist the 
first time you ran the job.  Your brokers were set to auto-create topics, so it 
did exist the next time you ran the job.  Putting a message into the topic 
didn't have anything to do with it.

Here's why I think that's what happened.  The following console session is an 
example, where the "empty" topic existed prior to starting the console but had 
no messages.  Topic "hasonemessage" existed and had one message in it.  Topic 
"doesntexistyet" didn't exist at the beginning of the console.

The metadata apis return the same info for existing-but-empty topics as they do 
for topics with messages in them:

scala> kc.getPartitions(Set("empty")).right
res0: 
scala.util.Either.RightProjection[org.apache.spark.streaming.kafka.KafkaCluster.Err,Set[kafka.common.TopicAndPartition]]
 = RightProjection(Right(
Set([empty,0], [empty,1])))

scala> kc.getPartitions(Set("hasonemessage")).right
res1: 
scala.util.Either.RightProjection[org.apache.spark.streaming.kafka.KafkaCluster.Err,Set[kafka.common.TopicAndPartition]]
 = RightProjection(Right(Set([hasonemessage,0], [hasonemessage,1])))


Leader offsets are both 0 for the empty topic, as you'd expect:

scala> kc.getLatestLeaderOffsets(kc.getPartitions(Set("empty")).right.get)
res5: 
Either[org.apache.spark.streaming.kafka.KafkaCluster.Err,Map[kafka.common.TopicAndPartition,org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset]]
 = Right(Map([empty,1] -> LeaderOffset(localhost,9094,0), [empty,0] -> 
LeaderOffset(localhost,9093,0)))

And one of the leader offsets is 1 for the topic with one message:

scala> 
kc.getLatestLeaderOffsets(kc.getPartitions(Set("hasonemessage")).right.get)
res6: 
Either[org.apache.spark.streaming.kafka.KafkaCluster.Err,Map[kafka.common.TopicAndPartition,org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset]]
 = Right(Map([hasonemessage,0] -> LeaderOffset(localhost,9092,1), 
[hasonemessage,1] -> LeaderOffset(localhost,9093,0)))


The first time a metadata request is made against the non-existing topic, it 
returns empty:

kc.getPartitions(Set("doesntexistyet")).right
res2: 
scala.util.Either.RightProjection[org.apache.spark.streaming.kafka.KafkaCluster.Err,Set[kafka.common.TopicAndPartition]]
 = RightProjection(Right(Set()))


But if your brokers are configured with auto.create.topics.enable set to true, 
that metadata request alone is enough to trigger creation of the topic.  
Requesting it again shows that the topic has been created:

scala> kc.getPartitions(Set("doesntexistyet")).right
res3: 
scala.util.Either.RightProjection[org.apache.spark.streaming.kafka.KafkaCluster.Err,Set[kafka.common.TopicAndPartition]]
 = RightProjection(Right(Set([doesntexistyet,0], [doesntexistyet,1])))


If you don't think that explains what happened, please let me know if you have 
a way of reproducing that exception against an existing-but-empty topic, 
because I can't.

As far as what to do about this, my instinct is to just improve the error 
handling for the getPartitions call.  If the topic doesn't exist yet, it 
shouldn't be returning an empty set, it should be returning an error.
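
Roughly the kind of check I have in mind, reusing kc from the console session 
above, with topics standing in for the requested topic set (a sketch, not a 
patch):

{code}
// Sketch: fail fast when the metadata API returns no partitions at all,
// which almost certainly means the topic doesn't exist yet.
val partitions = kc.getPartitions(topics).fold(
  errs => throw new org.apache.spark.SparkException(errs.mkString("\n")),
  parts => parts
)
if (partitions.isEmpty) {
  throw new org.apache.spark.SparkException(
    s"Couldn't find any partitions for $topics, does the topic exist?")
}
{code}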


> Couldn't find leader offsets exception when creating KafkaDirectStream
> --
>
> Key: SPARK-6431
> URL: https://issues.apache.org/jira/browse/SPARK-6431
> Project: Spark
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.3.0
>Reporter: Alberto
>
> When I try to create an InputDStream using the createDirectStream method of 
> the KafkaUtils class and the kafka topic does not have any messages yet am 
> getting the following error:
> org.apache.spark.SparkException: Couldn't find leader offsets for Set()
> org.apache.spark.SparkException: org.apache.spark.SparkException: Couldn't 
> find leader offsets for Set()
>   at 
> org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$createDirectStream$2.apply(KafkaUtils.scala:413)
> If I put a message in the topic before creating the DirectStream everything 
> works fine.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-6569) Kafka directInputStream logs what appear to be incorrect warnings

2015-04-03 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6569?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14394457#comment-14394457
 ] 

Cody Koeninger commented on SPARK-6569:
---

Keep in mind that this message is in KafkaRDD, which is designed to be used on 
its own outside of a streaming job.  If someone asks for a 0 length offset 
range, I'd want the system to do something other than silently do nothing.

Platon's point about monitoring makes sense, and we do monitor our jobs.  I see 
that as orthogonal to logging - a graphite monitor might tell me a job is 
running abnormally, Spark logs might tell me that's because a producer is 
misconfigured and a particular topic is only getting written for one partition. 
 I wouldn't expect to have to run my system at debug log level to get that.

Honestly, given the incredible volume of Spark logs at info level, worrying 
about one more (potentially useful) log at info level doesn't make sense to me.


> Kafka directInputStream logs what appear to be incorrect warnings
> -
>
> Key: SPARK-6569
> URL: https://issues.apache.org/jira/browse/SPARK-6569
> Project: Spark
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.3.0
> Environment: Spark 1.3.0
>Reporter: Platon Potapov
>Priority: Minor
>
> During what appears to be normal operation of streaming from a Kafka topic, 
> the following log records are observed, logged periodically:
> {code}
> [Stage 391:==>  (3 + 0) / 
> 4]
> 2015-03-27 12:49:54 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the 
> same as ending offset skipping raw 0
> 2015-03-27 12:49:54 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the 
> same as ending offset skipping raw 0
> 2015-03-27 12:49:54 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the 
> same as ending offset skipping raw 0
> {code}
> * the part.fromOffset placeholder is not correctly substituted to a value
> * is the condition really mandates a warning being logged?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-6569) Kafka directInputStream logs what appear to be incorrect warnings

2015-03-27 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6569?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14384038#comment-14384038
 ] 

Cody Koeninger commented on SPARK-6569:
---

I set it as warn because an empty batch can be the source of non-obvious 
problems that would be obscured if it was at the info level. Streams that don't 
get even one item during a batch are relatively rare for my use cases.
 
I don't feel super strongly about it, though, if there's a reason to reduce the 
log level.

> Kafka directInputStream logs what appear to be incorrect warnings
> -
>
> Key: SPARK-6569
> URL: https://issues.apache.org/jira/browse/SPARK-6569
> Project: Spark
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.3.0
> Environment: Spark 1.3.0
>Reporter: Platon Potapov
>Priority: Minor
>
> During what appears to be normal operation of streaming from a Kafka topic, 
> the following log records are observed, logged periodically:
> {code}
> [Stage 391:==>  (3 + 0) / 
> 4]
> 2015-03-27 12:49:54 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the 
> same as ending offset skipping raw 0
> 2015-03-27 12:49:54 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the 
> same as ending offset skipping raw 0
> 2015-03-27 12:49:54 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the 
> same as ending offset skipping raw 0
> {code}
> * the part.fromOffset placeholder is not correctly substituted to a value
> * is the condition really mandates a warning being logged?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-6434) [Streaming][Kafka] CreateDirectStream for empty topics

2015-03-20 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14371330#comment-14371330
 ] 

Cody Koeninger commented on SPARK-6434:
---

Yeah, I didn't notice Alberto had also created a ticket; we can mark one as a 
duplicate.

The thing is, I definitely would expect the stream to fail if I typo the name 
of the topic.  I'm not 100% certain, without trying it out, whether you can 
tell the difference between a topic that doesn't exist and a topic that doesn't 
have messages solely through the metadata API.

> [Streaming][Kafka] CreateDirectStream for empty topics
> --
>
> Key: SPARK-6434
> URL: https://issues.apache.org/jira/browse/SPARK-6434
> Project: Spark
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.3.0
>Reporter: Cody Koeninger
>Priority: Minor
> Fix For: 1.3.1
>
>
> http://apache-spark-developers-list.1001551.n3.nabble.com/Exception-using-the-new-createDirectStream-util-method-td8.html
> Looks like kafka metadata apis don't return leaders for topics that don't 
> have messages yet.
> Should either try to work around this, or document that topics need messages 
> before starting the stream.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-6434) [Streaming][Kafka] CreateDirectStream for empty topics

2015-03-20 Thread Cody Koeninger (JIRA)
Cody Koeninger created SPARK-6434:
-

 Summary: [Streaming][Kafka] CreateDirectStream for empty topics
 Key: SPARK-6434
 URL: https://issues.apache.org/jira/browse/SPARK-6434
 Project: Spark
  Issue Type: Bug
  Components: Streaming
Affects Versions: 1.3.0
Reporter: Cody Koeninger
Priority: Minor
 Fix For: 1.3.1


http://apache-spark-developers-list.1001551.n3.nabble.com/Exception-using-the-new-createDirectStream-util-method-td8.html

Looks like kafka metadata apis don't return leaders for topics that don't have 
messages yet.

Should either try to work around this, or document that topics need messages 
before starting the stream.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-6249) Get Kafka offsets from consumer group in ZK when using direct stream

2015-03-10 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14355561#comment-14355561
 ] 

Cody Koeninger commented on SPARK-6249:
---

First, I don't think setting group.id should have any effect on whether offsets 
are pulled from / saved to ZK.  They're two different things.

Second, I think this is another case where exposing an API to make it easy to 
get / set Kafka's managed consumer offsets would allow people to solve this 
problem for themselves, in the way that makes sense for them.

We already have a way to specify the beginning offsets of the stream.  We 
already have code to get / set consumer offsets from Kafka, it's just not 
exposed.  By the time 1.4 is ready to release, hopefully we'll know whether 
we're ok exposing it.

I don't want to add more config options with confusing semantics around what is 
being used for the system of record for offsets, I'd rather make it easy for 
people to explicitly do what they need.
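
To be concrete about "we already have a way to specify the beginning offsets", 
here's a sketch with placeholder topic names and offsets, assuming ssc and 
kafkaParams are in scope:

{code}
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// Placeholder offsets: start the stream from offsets you fetched yourself,
// whether they came from ZK, a database, or anywhere else.
val fromOffsets = Map(
  TopicAndPartition("mytopic", 0) -> 1234L,
  TopicAndPartition("mytopic", 1) -> 5678L
)

val stream = KafkaUtils.createDirectStream[
  String, String, StringDecoder, StringDecoder, (String, String)](
  ssc, kafkaParams, fromOffsets,
  (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message))
{code}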

> Get Kafka offsets from consumer group in ZK when using direct stream
> 
>
> Key: SPARK-6249
> URL: https://issues.apache.org/jira/browse/SPARK-6249
> Project: Spark
>  Issue Type: Improvement
>  Components: Streaming
>Reporter: Tathagata Das
>
> This is the proposal. 
> The simpler direct API (the one that does not take explicit offsets) can be 
> modified to also pick up the initial offset from ZK if group.id is specified. 
> This is exactly similar to how we find the latest or earliest offset in that 
> API, just that instead of latest/earliest offset of the topic we want to find 
> the offset from the consumer group. The group offsets is ZK is not used at 
> all for any further processing and restarting, so the exactly-once semantics 
> is not broken. 
> The use case where this is useful is simplified code upgrade. If the user 
> wants to upgrade the code, he/she can the context stop gracefully which will 
> ensure the ZK consumer group offset will be updated with the last offsets 
> processed. Then the new code is started (not restarted from checkpoint) can 
> pickup  the consumer group offset from ZK and continue where the previous 
> code had left off. 
> Without the functionality of picking up consumer group offsets to start (that 
> is, currently) the only way to do this is for the users to save the offsets 
> somewhere (file, database, etc.) and manage the offsets themselves. I just 
> want to simplify this process. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-2808) update kafka to version 0.8.2

2015-02-11 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14316937#comment-14316937
 ] 

Cody Koeninger commented on SPARK-2808:
---

I'm also kind of curious what the motivation is - I'd assume it's just for 
Scala 2.11 compatibility?

The topic-backed consumer offset stuff still sounds promising-but-sketchy to 
me, honestly.

> update kafka to version 0.8.2
> -
>
> Key: SPARK-2808
> URL: https://issues.apache.org/jira/browse/SPARK-2808
> Project: Spark
>  Issue Type: Sub-task
>  Components: Build, Spark Core
>Reporter: Anand Avati
>
> First kafka_2.11 0.8.1 has to be released



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-4964) Exactly-once semantics for Kafka

2015-01-24 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14290747#comment-14290747
 ] 

Cody Koeninger commented on SPARK-4964:
---

Design doc at

https://docs.google.com/a/databricks.com/document/d/1IuvZhg9cOueTf1mq4qwc1fhPb5FVcaRLcyjrtG4XU1k/edit?usp=sharing

> Exactly-once semantics for Kafka
> 
>
> Key: SPARK-4964
> URL: https://issues.apache.org/jira/browse/SPARK-4964
> Project: Spark
>  Issue Type: Improvement
>  Components: Streaming
>Reporter: Cody Koeninger
>
> for background, see 
> http://apache-spark-developers-list.1001551.n3.nabble.com/Which-committers-care-about-Kafka-td9827.html
> Requirements:
> - allow client code to implement exactly-once end-to-end semantics for Kafka 
> messages, in cases where their output storage is either idempotent or 
> transactional
> - allow client code access to Kafka offsets, rather than automatically 
> committing them
> - do not assume Zookeeper as a repository for offsets (for the transactional 
> case, offsets need to be stored in the same store as the data)
> - allow failure recovery without lost or duplicated messages, even in cases 
> where a checkpoint cannot be restored (for instance, because code must be 
> updated)
> Design:
> The basic idea is to make an rdd where each partition corresponds to a given 
> Kafka topic, partition, starting offset, and ending offset.  That allows for 
> deterministic replay of data from Kafka (as long as there is enough log 
> retention).
> Client code is responsible for committing offsets, either transactionally to 
> the same store that data is being written to, or in the case of idempotent 
> data, after data has been written.
> PR of a sample implementation for both the batch and dstream case is 
> forthcoming.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-4960) Interceptor pattern in receivers

2015-01-05 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4960?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14265570#comment-14265570
 ] 

Cody Koeninger commented on SPARK-4960:
---

At that point, it sounds like you're talking about an early filter rather
than an early flatMap.  Should it just be T => Option[T]?

Since this ticket no longer solves the problem raised by SPARK-3146, and it
seems unlikely that patch will ever get merged, what is the concrete plan
for giving users of receiver-based Kafka implementations early access to
MessageAndMetadata?

On Mon, Jan 5, 2015 at 7:48 PM, Tathagata Das (JIRA) 



> Interceptor pattern in receivers
> 
>
> Key: SPARK-4960
> URL: https://issues.apache.org/jira/browse/SPARK-4960
> Project: Spark
>  Issue Type: New Feature
>  Components: Streaming
>Reporter: Tathagata Das
>
> Sometimes it is good to intercept a message received through a receiver and 
> modify / do something with the message before it is stored into Spark. This 
> is often referred to as the interceptor pattern. There should be general way 
> to specify an interceptor function that gets applied to all receivers. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-4960) Interceptor pattern in receivers

2014-12-29 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4960?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14260144#comment-14260144
 ] 

Cody Koeninger commented on SPARK-4960:
---

You're saying for an implementation that currently extends e.g.

Receiver[T]

that a user can provide a function

T => Iterable[M]

But in the case of Kafka, T is currently fixed to (K, V). So for Kafka we don't 
need a user-provided function of type 

(K, V) => Iterable[M] 

We need a user-provided function of type 

MessageAndMetadata => Iterable[M] 

In other words, a third type parameter.

I'm also not clear on how your proposed solution deals with the 9 different 
overloads of store(), including the one that takes raw serialized bytes.  

At that point I'm not sure that having an interceptor setter defined on a 
parent class makes a lot of sense, because it's the particular subclass that 
knows what its intermediate third type is (MessageAndMetadata in this case), as 
well as which store method(s) it cares about.

That's part of why I think constructor arguments are actually a cleaner way to 
handle this - Kafka can have an "interceptor" argument that defaults to a 
function MessageAndMetadata => Iterable[(K,V)], and other implementations can 
have a type signature for the interceptor that makes sense for them.


As an aside, I think it should actually be TraversableOnce, not Iterable.  All 
we care about is being able to call foreach on it once, and the classes that 
implement TraversableOnce are a superset of those that implement Iterable.
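
To make the constructor-argument idea concrete, a hypothetical sketch (none of 
these names exist in the codebase):

{code}
import kafka.message.MessageAndMetadata

// Hypothetical: each receiver subclass exposes an interceptor over the
// intermediate type it actually has access to; for Kafka that's
// MessageAndMetadata, and TraversableOnce keeps the output flexible.
class KafkaReceiverSketch[K, V, R](
    kafkaParams: Map[String, String],
    topics: Map[String, Int],
    interceptor: MessageAndMetadata[K, V] => TraversableOnce[R]
)
{code}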

> Interceptor pattern in receivers
> 
>
> Key: SPARK-4960
> URL: https://issues.apache.org/jira/browse/SPARK-4960
> Project: Spark
>  Issue Type: New Feature
>  Components: Streaming
>Reporter: Tathagata Das
>
> Sometimes it is good to intercept a message received through a receiver and 
> modify / do something with the message before it is stored into Spark. This 
> is often referred to as the interceptor pattern. There should be general way 
> to specify an interceptor function that gets applied to all receivers. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-4964) Exactly-once semantics for Kafka

2014-12-24 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14258670#comment-14258670
 ] 

Cody Koeninger commented on SPARK-4964:
---

Usage example of the dstream for the transactional and idempotent cases:

https://github.com/koeninger/kafka-exactly-once/tree/master



> Exactly-once semantics for Kafka
> 
>
> Key: SPARK-4964
> URL: https://issues.apache.org/jira/browse/SPARK-4964
> Project: Spark
>  Issue Type: Improvement
>  Components: Streaming
>Reporter: Cody Koeninger
>
> for background, see 
> http://apache-spark-developers-list.1001551.n3.nabble.com/Which-committers-care-about-Kafka-td9827.html
> Requirements:
> - allow client code to implement exactly-once end-to-end semantics for Kafka 
> messages, in cases where their output storage is either idempotent or 
> transactional
> - allow client code access to Kafka offsets, rather than automatically 
> committing them
> - do not assume Zookeeper as a repository for offsets (for the transactional 
> case, offsets need to be stored in the same store as the data)
> - allow failure recovery without lost or duplicated messages, even in cases 
> where a checkpoint cannot be restored (for instance, because code must be 
> updated)
> Design:
> The basic idea is to make an rdd where each partition corresponds to a given 
> Kafka topic, partition, starting offset, and ending offset.  That allows for 
> deterministic replay of data from Kafka (as long as there is enough log 
> retention).
> Client code is responsible for committing offsets, either transactionally to 
> the same store that data is being written to, or in the case of idempotent 
> data, after data has been written.
> PR of a sample implementation for both the batch and dstream case is 
> forthcoming.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-4964) Exactly-once semantics for Kafka

2014-12-24 Thread Cody Koeninger (JIRA)
Cody Koeninger created SPARK-4964:
-

 Summary: Exactly-once semantics for Kafka
 Key: SPARK-4964
 URL: https://issues.apache.org/jira/browse/SPARK-4964
 Project: Spark
  Issue Type: Improvement
  Components: Streaming
Reporter: Cody Koeninger


for background, see 
http://apache-spark-developers-list.1001551.n3.nabble.com/Which-committers-care-about-Kafka-td9827.html

Requirements:

- allow client code to implement exactly-once end-to-end semantics for Kafka 
messages, in cases where their output storage is either idempotent or 
transactional
- allow client code access to Kafka offsets, rather than automatically 
committing them
- do not assume Zookeeper as a repository for offsets (for the transactional 
case, offsets need to be stored in the same store as the data)
- allow failure recovery without lost or duplicated messages, even in cases 
where a checkpoint cannot be restored (for instance, because code must be 
updated)

Design:

The basic idea is to make an rdd where each partition corresponds to a given 
Kafka topic, partition, starting offset, and ending offset.  That allows for 
deterministic replay of data from Kafka (as long as there is enough log 
retention).

Client code is responsible for committing offsets, either transactionally to 
the same store that data is being written to, or in the case of idempotent 
data, after data has been written.

PR of a sample implementation for both the batch and dstream case is 
forthcoming.
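
A minimal sketch of the partition definition the design implies (names are 
illustrative, not the final API):

{code}
import org.apache.spark.Partition

// Illustrative: each RDD partition pins down exactly which Kafka messages it
// covers, so recomputing the partition replays the identical range.
case class KafkaRDDPartitionSketch(
    index: Int,
    topic: String,
    partition: Int,
    fromOffset: Long,   // inclusive starting offset
    untilOffset: Long   // exclusive ending offset
) extends Partition
{code}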




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-3146) Improve the flexibility of Spark Streaming Kafka API to offer user the ability to process message before storing into BM

2014-12-19 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3146?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14254100#comment-14254100
 ] 

Cody Koeninger commented on SPARK-3146:
---

This is a real problem for production use, not an abstract concern about what 
users might or might not do if you gave them full access to power that they 
already should have had.

The patch is a working implementation that solves the problem; we've been using 
it in production.

I think the patch needs to be improved to take a function MessageAndMetadata => 
Iterable[R] instead of the current MessageAndMetadata => R (in other words it 
needs to be a flatMap operation instead of the current map).
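
To make the difference concrete, a sketch of the two handler shapes (not code 
from the patch; Enriched is a hypothetical output type used only for 
illustration):

import kafka.message.MessageAndMetadata

object HandlerShapes {
  // Hypothetical output record.
  case class Enriched(topic: String, partition: Int, offset: Long, value: String)

  // Map-shaped handler, as in the current patch: exactly one output per message.
  val mapHandler: MessageAndMetadata[String, String] => Enriched =
    mmd => Enriched(mmd.topic, mmd.partition, mmd.offset, mmd.message)

  // flatMap-shaped handler: zero or more outputs per message, so the handler
  // can also drop or expand messages before they are stored.
  val flatMapHandler: MessageAndMetadata[String, String] => Iterable[Enriched] =
    mmd =>
      if (mmd.message == null) Nil
      else List(Enriched(mmd.topic, mmd.partition, mmd.offset, mmd.message))
}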

If for some reason this needs to be added to all receivers, and it needs to be 
done via a setter instead of a constructor argument, I don't understand why, 
but it will solve the problem, so I'm all for it.

If it really needs to be a function (Any, Any) => Iterable[R], I think that's 
worse from a user interface point of view, but again it will solve the problem, 
so I'm all for it.

If someone with the power to merge PRs will say which of those options they 
would be willing to accept, I'll happily implement it.

What I definitely don't want to see is for this ticket to languish for another 
6 months, during which time I keep having to do manual maintenance of a patched 
Spark.

> Improve the flexibility of Spark Streaming Kafka API to offer user the 
> ability to process message before storing into BM
> 
>
> Key: SPARK-3146
> URL: https://issues.apache.org/jira/browse/SPARK-3146
> Project: Spark
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.0.2, 1.1.0
>Reporter: Saisai Shao
>
> Currently the Spark Streaming Kafka API stores only the key and value of each 
> message into BM for processing, which potentially loses flexibility for 
> different requirements:
> 1. Currently, topic/partition/offset information for each message is discarded 
> by KafkaInputDStream. In some scenarios people may need this information to 
> better filter the message, as SPARK-2388 described.
> 2. People may need to add a timestamp to each message when feeding it into Spark 
> Streaming, which can better measure the system latency.
> 3. Checkpointing the partition/offsets or others...
> So here we add a messageHandler to the interface to give people the flexibility 
> to preprocess the message before storing it into BM. In the meantime, this 
> improvement stays compatible with the current API.






[jira] [Commented] (SPARK-3146) Improve the flexibility of Spark Streaming Kafka API to offer user the ability to process message before storing into BM

2014-12-18 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3146?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14252397#comment-14252397
 ] 

Cody Koeninger commented on SPARK-3146:
---

((K, V), (topicAndPartition, offset)) covers all of the public information in a 
MessageAndMetadata object aside from the decoders, so that's probably OK.  If 
you're trying to come up with a uniform interface, though, (Any, Any) => 
Iterable[R] doesn't seem much better than Any => Iterable[R].
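
Written out as signatures, the two candidate shapes under discussion might look 
like this (a sketch, not code from any patch):

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata

object HandlerSignatures {
  // The "uniform" shape: everything public in MessageAndMetadata except the
  // decoders, packed into nested tuples.
  type TupleHandler[K, V, R] = ((K, V), (TopicAndPartition, Long)) => Iterable[R]

  // The Kafka-specific shape argued for elsewhere in this thread.
  type MessageHandler[K, V, R] = MessageAndMetadata[K, V] => Iterable[R]
}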

Also, just to be clear, the particular example calls you're talking about only 
exist in ReliableKafkaReceiver, not KafkaInputDStream, and we need something 
that works for both.

> Improve the flexibility of Spark Streaming Kafka API to offer user the 
> ability to process message before storing into BM
> 
>
> Key: SPARK-3146
> URL: https://issues.apache.org/jira/browse/SPARK-3146
> Project: Spark
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.0.2, 1.1.0
>Reporter: Saisai Shao
>
> Currently the Spark Streaming Kafka API stores only the key and value of each 
> message into BM for processing, which potentially loses flexibility for 
> different requirements:
> 1. Currently, topic/partition/offset information for each message is discarded 
> by KafkaInputDStream. In some scenarios people may need this information to 
> better filter the message, as SPARK-2388 described.
> 2. People may need to add a timestamp to each message when feeding it into Spark 
> Streaming, which can better measure the system latency.
> 3. Checkpointing the partition/offsets or others...
> So here we add a messageHandler to the interface to give people the flexibility 
> to preprocess the message before storing it into BM. In the meantime, this 
> improvement stays compatible with the current API.






[jira] [Commented] (SPARK-3146) Improve the flexibility of Spark Streaming Kafka API to offer user the ability to process message before storing into BM

2014-12-18 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3146?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14252299#comment-14252299
 ] 

Cody Koeninger commented on SPARK-3146:
---

Yes, for the specific case of Kafka, regardless of the name of the API, the 
user-supplied function needs to be of type

MessageAndMetadata => Iterable[R]





> Improve the flexibility of Spark Streaming Kafka API to offer user the 
> ability to process message before storing into BM
> 
>
> Key: SPARK-3146
> URL: https://issues.apache.org/jira/browse/SPARK-3146
> Project: Spark
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.0.2, 1.1.0
>Reporter: Saisai Shao
>
> Currently the Spark Streaming Kafka API stores only the key and value of each 
> message into BM for processing, which potentially loses flexibility for 
> different requirements:
> 1. Currently, topic/partition/offset information for each message is discarded 
> by KafkaInputDStream. In some scenarios people may need this information to 
> better filter the message, as SPARK-2388 described.
> 2. People may need to add a timestamp to each message when feeding it into Spark 
> Streaming, which can better measure the system latency.
> 3. Checkpointing the partition/offsets or others...
> So here we add a messageHandler to the interface to give people the flexibility 
> to preprocess the message before storing it into BM. In the meantime, this 
> improvement stays compatible with the current API.






[jira] [Commented] (SPARK-4122) Add library to write data back to Kafka

2014-12-18 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14252261#comment-14252261
 ] 

Cody Koeninger commented on SPARK-4122:
---

+1 for the idea of making this write from both DStreams and normal RDDs
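
For what it's worth, the RDD half can already be approximated with the plain 
Kafka 0.8 producer; a rough sketch (the broker address and topic are 
placeholders, and this is not the proposed library):

import java.util.Properties

import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
import org.apache.spark.rdd.RDD

object KafkaWriterSketch {
  // One producer per partition, so nothing non-serializable is closed over
  // on the driver side.
  def writeToKafka(rdd: RDD[String], topic: String): Unit = {
    rdd.foreachPartition { records =>
      val props = new Properties()
      props.put("metadata.broker.list", "broker1:9092")
      props.put("serializer.class", "kafka.serializer.StringEncoder")
      val producer = new Producer[String, String](new ProducerConfig(props))
      try {
        records.foreach(msg => producer.send(new KeyedMessage[String, String](topic, msg)))
      } finally {
        producer.close()
      }
    }
  }
}

The DStream case would just call the same helper from foreachRDD.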

> Add library to write data back to Kafka
> ---
>
> Key: SPARK-4122
> URL: https://issues.apache.org/jira/browse/SPARK-4122
> Project: Spark
>  Issue Type: Bug
>Reporter: Hari Shreedharan
>Assignee: Hari Shreedharan
>






