[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-04 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/3798#issuecomment-72893619
  
  [Test build #26763 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26763/consoleFull)
 for   PR 3798 at commit 
[`1dc2941`](https://github.com/apache/spark/commit/1dc29415e3c0ac23a4207513686dfe5ee5ab2725).
 * This patch merges cleanly.





[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-04 Thread koeninger
Github user koeninger commented on the pull request:

https://github.com/apache/spark/pull/3798#issuecomment-72893647
  
Here's a solution for subclassing ConsumerConfig while still silencing the 
warning.
My son is doing ok(ish) now, thanks for the concern.
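A rough sketch of that kind of subclass (the exact code is in the linked commit; the class name and the placeholder handling below are assumptions):

```
import java.util.Properties

import kafka.consumer.ConsumerConfig

// Sketch: accept the user's kafkaParams, keep the broker-list keys (which
// ConsumerConfig does not recognize) out of the validated properties, and fill
// required-but-unused consumer keys with placeholders so validation passes.
class SimpleConsumerConfig private (originalProps: Properties)
  extends ConsumerConfig(originalProps)

object SimpleConsumerConfig {
  def apply(kafkaParams: Map[String, String]): SimpleConsumerConfig = {
    val props = new Properties()
    kafkaParams.foreach { case (k, v) =>
      // passing these through is what triggers the "not valid" warning
      if (k != "metadata.broker.list" && k != "bootstrap.servers") {
        props.put(k, v)
      }
    }
    // high-level-consumer-only keys, never used on the simple-consumer path
    Seq("zookeeper.connect", "group.id").foreach { k =>
      if (!props.containsKey(k)) {
        props.setProperty(k, "")
      }
    }
    new SimpleConsumerConfig(props)
  }
}
```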





[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-04 Thread tdas
Github user tdas commented on the pull request:

https://github.com/apache/spark/pull/3798#issuecomment-72902696
  
That's a nifty solution :) I like it.
Let's merge this as soon as the tests pass. Smaller changes like docs, etc.,
we can do in the next PR.
@jerryshao I will add the example in a different PR. 





[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-04 Thread tdas
Github user tdas commented on the pull request:

https://github.com/apache/spark/pull/3798#issuecomment-72902389
  
Glad to hear that your son is doing ok, hope he gets better (okish -->
great) real real soon. :)





[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-04 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/3798#issuecomment-72907295
  
  [Test build #26763 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26763/consoleFull)
 for   PR 3798 at commit 
[`1dc2941`](https://github.com/apache/spark/commit/1dc29415e3c0ac23a4207513686dfe5ee5ab2725).
 * This patch **passes all tests**.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `class DirectKafkaInputDStreamCheckpointData extends DStreamCheckpointData(this)`
  * `class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable`
  * `case class LeaderOffset(host: String, port: Int, offset: Long)`
  * `class KafkaRDDPartition(`
  * `trait HasOffsetRanges`






[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-04 Thread harishreedharan
Github user harishreedharan commented on the pull request:

https://github.com/apache/spark/pull/3798#issuecomment-72926364
  
Yay!





[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/3798#issuecomment-72907304
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/26763/
Test PASSed.





[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-04 Thread tdas
Github user tdas commented on the pull request:

https://github.com/apache/spark/pull/3798#issuecomment-72925200
  
Merging this. Thanks so much Cody! 
There will be a PR to fix a few things here and there soon. 





[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-04 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/3798





[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-03 Thread tdas
Github user tdas commented on the pull request:

https://github.com/apache/spark/pull/3798#issuecomment-72779779
  
Why did you choose the parameters metadata.broker.list and
bootstrap.servers as the required kafka params? I looked at the Kafka docs,
and they say that for consumers, the necessary properties are
zookeeper.connect and group.id. And intuitively the application is
consuming, so the consumer configs should apply (not group.id, but
zookeeper.connect). So shouldn't our interface also require zookeeper.connect
and not the other two?





[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-03 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/3798#issuecomment-72782334
  
  [Test build #26701 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26701/consoleFull)
 for   PR 3798 at commit 
[`8c31855`](https://github.com/apache/spark/commit/8c31855cf6b7327c6b6611e715457ba15bb79355).
 * This patch **passes all tests**.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `class DeterministicKafkaInputDStreamCheckpointData extends DStreamCheckpointData(this)`
  * `class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable`
  * `case class LeaderOffset(host: String, port: Int, offset: Long)`
  * `class KafkaRDDPartition(`
  * `trait HasOffsetRanges`






[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-03 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/3798#issuecomment-72782343
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/26701/
Test PASSed.





[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-03 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/3798#issuecomment-72775833
  
  [Test build #26701 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26701/consoleFull)
 for   PR 3798 at commit 
[`8c31855`](https://github.com/apache/spark/commit/8c31855cf6b7327c6b6611e715457ba15bb79355).
 * This patch merges cleanly.





[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-03 Thread tdas
Github user tdas commented on the pull request:

https://github.com/apache/spark/pull/3798#issuecomment-72777123
  
Ohh, I meant createStream --> createDirectStream. I would have preferred
something like createReceiverLessStream but that's a mouthful. I think "direct"
is something that comes close without being a mouthful. It had not occurred to me
until Patrick suggested it.

As for the underlying assumptions, I confess they are not super concrete.
Some things, like binary compatibility issues (e.g., do not use Scala traits with
implemented methods), are fairly concrete, whereas things about API elegance
(e.g. rdd.asInstanceOf[KafkaRDD] vs rdd.asInstanceOf[HasOffsetRanges]) are a
little fuzzy, and opinions vary from person to person. Often what seems
intuitive to me is not intuitive to someone else, even among the key
committers like Patrick, Michael, Matei, etc. We usually argue about this in
design docs, get as many eyeballs as possible, and try to reach a consensus.
It is indeed a bit fuzzy, but it is all towards making the API that we *think*
will be the best for developers.
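For context, the HasOffsetRanges pattern under discussion looks roughly like this (a sketch; `directStream` stands in for a stream created with the new API):

```
directStream.foreachRDD { rdd =>
  // access offsets through the trait rather than the concrete KafkaRDD class
  val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  ranges.foreach { o =>
    println(s"${o.topic} ${o.partition}: ${o.fromOffset} to ${o.untilOffset}")
  }
}
```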





[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-03 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/3798#issuecomment-72778614
  
  [Test build #26706 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26706/consoleFull)
 for   PR 3798 at commit 
[`59e29f6`](https://github.com/apache/spark/commit/59e29f61cd6a730eeea4e47a5316cbbe47615618).
 * This patch merges cleanly.





[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-03 Thread koeninger
Github user koeninger commented on the pull request:

https://github.com/apache/spark/pull/3798#issuecomment-72780349
  
High level consumers connect to ZK.

Simple consumers (which is what this is using) connect to brokers directly
instead.  See

https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example

I chose to accept either of the two existing means in Kafka of specifying a
list of seed brokers, rather than making up yet a third way.









[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-03 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/3798#issuecomment-72784748
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/26706/
Test PASSed.





[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-03 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/3798#issuecomment-72784745
  
  [Test build #26706 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26706/consoleFull)
 for   PR 3798 at commit 
[`59e29f6`](https://github.com/apache/spark/commit/59e29f61cd6a730eeea4e47a5316cbbe47615618).
 * This patch **passes all tests**.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `class DirectKafkaInputDStreamCheckpointData extends DStreamCheckpointData(this)`
  * `class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable`
  * `case class LeaderOffset(host: String, port: Int, offset: Long)`
  * `class KafkaRDDPartition(`
  * `trait HasOffsetRanges`






[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-03 Thread tdas
Github user tdas commented on the pull request:

https://github.com/apache/spark/pull/3798#issuecomment-72787965
  
I think the simplest solution is to assign zookeeper.connect. But you are 
assigning it in KafkaCluster lines 338 - 345. So why is this warning being 
thrown?





[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-03 Thread jerryshao
Github user jerryshao commented on the pull request:

https://github.com/apache/spark/pull/3798#issuecomment-72789850
  
Hi @tdas, should we add an example to show users how to use this new Kafka
API correctly?
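For reference, the example being requested might look roughly like this (a sketch under assumptions: made-up broker and topic values, an existing StreamingContext `ssc`, and the direct-stream method name discussed in this PR):

```
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")
val topics = Set("myTopic")
// receiver-less direct stream; offsets are tracked by the stream, not ZooKeeper
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topics)
```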





[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-03 Thread tdas
Github user tdas commented on the pull request:

https://github.com/apache/spark/pull/3798#issuecomment-72791220
  
Holy crap! Don't bother about this at all. This can wait. I hope everything
is okay. Take care and all the best!







[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-03 Thread koeninger
Github user koeninger commented on the pull request:

https://github.com/apache/spark/pull/3798#issuecomment-72779615
  
Yeah, there's a weird distinction in Kafka between simple consumers and
high level consumers in that they have a lot of common configuration
parameters, but one of them talks directly to brokers and the other goes
through zk.

I'll see if I can make a private subclass of ConsumerConfig to shut that
warning up.








[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-03 Thread koeninger
Github user koeninger commented on the pull request:

https://github.com/apache/spark/pull/3798#issuecomment-72790044
  
The warning is for metadata.broker.list, since it's not expected by the
existing ConsumerConfig (it's used by other config classes).

Couldn't get subclassing to work; the VerifiableProperties class it uses is
very dependent on order of operations during construction.

I think the simplest thing is a class that is constructed using
kafkaParams, and uses the static defaults from the ConsumerConfig object.

I'm currently waiting in an ER with my child with a 105 fever, so won't be
getting to it for a few hours to say the least.







[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-03 Thread koeninger
Github user koeninger commented on the pull request:

https://github.com/apache/spark/pull/3798#issuecomment-72783141
  
Yeah, more importantly it's so defaults for things like connection timeouts
match what Kafka provides.

It's possible to assign a fake zookeeper.connect and have it pass
verification; that's what the existing code does.

Unfortunately ConsumerConfig has a private constructor, so subclassing it in
order for the broker list to pass verification without that warning may
prove to be tricky. Worst case scenario, I'll re-implement a config that
uses the kafka defaults.
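The workaround being discussed, roughly (a sketch, not the code in this PR; `kafkaParams` is assumed to be the user-supplied map):

```
import java.util.Properties

// give ConsumerConfig a placeholder zookeeper.connect so its validation passes,
// even though the simple-consumer path never talks to ZK
val props = new Properties()
kafkaParams.foreach { case (k, v) => props.put(k, v) }
if (!props.containsKey("zookeeper.connect")) {
  props.setProperty("zookeeper.connect", "")
}
```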








[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-03 Thread tdas
Github user tdas commented on the pull request:

https://github.com/apache/spark/pull/3798#issuecomment-72779120
  
Hey Cody, I was trying it and I found an odd behavior. It was printing this
repeatedly.
```
15/02/03 18:22:08 WARN VerifiableProperties: Property metadata.broker.list is not valid
```

I was using this code.
```
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokerList)
val lines = KafkaUtils.createNewStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topicsSet)
```
I chose metadata.broker.list from the code in KafkaCluster, because
without that I was getting an exception from KafkaCluster.







[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-03 Thread tdas
Github user tdas commented on the pull request:

https://github.com/apache/spark/pull/3798#issuecomment-72782434
  
I see. ConsumerConfig is really necessary only for the high-level consumer, but
you are using it to configure stuff in the low-level consumer as well. That is so
that you don't have to introduce parameter strings to configure them yourself.

Is it possible to assign a fake but verified zookeeper.connect?





[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-03 Thread dibbhatt
Github user dibbhatt commented on a diff in the pull request:

https://github.com/apache/spark/pull/3798#discussion_r24010956
  
--- Diff: 
external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala
 ---
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import scala.util.control.NonFatal
+import scala.util.Random
+import scala.collection.mutable.ArrayBuffer
+import java.util.Properties
+import kafka.api._
+import kafka.common.{ErrorMapping, OffsetMetadataAndError, 
TopicAndPartition}
+import kafka.consumer.{ConsumerConfig, SimpleConsumer}
+import org.apache.spark.SparkException
+
+/**
+ * Convenience methods for interacting with a Kafka cluster.
+ * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+ * configuration parameters</a>.
+ *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+ *   NOT zookeeper servers, specified in host1:port1,host2:port2 form
+ */
+private[spark]
+class KafkaCluster(val kafkaParams: Map[String, String]) extends 
Serializable {
+  import KafkaCluster.{Err, LeaderOffset}
+
+  val seedBrokers: Array[(String, Int)] =
+    kafkaParams.get("metadata.broker.list")
+      .orElse(kafkaParams.get("bootstrap.servers"))
+      .getOrElse(throw new SparkException("Must specify metadata.broker.list or bootstrap.servers"))
+      .split(",").map { hp =>
+        val hpa = hp.split(":")
+        (hpa(0), hpa(1).toInt)
+      }
+
+  // ConsumerConfig isn't serializable
+  @transient private var _config: ConsumerConfig = null
+
+  def config: ConsumerConfig = this.synchronized {
+if (_config == null) {
+  _config = KafkaCluster.consumerConfig(kafkaParams)
+}
+_config
+  }
+
+  def connect(host: String, port: Int): SimpleConsumer =
+new SimpleConsumer(host, port, config.socketTimeoutMs,
+  config.socketReceiveBufferBytes, config.clientId)
+
+  def connectLeader(topic: String, partition: Int): Either[Err, 
SimpleConsumer] =
+    findLeader(topic, partition).right.map(hp => connect(hp._1, hp._2))
+
+  // Metadata api
+  // scalastyle:off
+  // 
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-MetadataAPI
+  // scalastyle:on
+
+  def findLeader(topic: String, partition: Int): Either[Err, (String, 
Int)] = {
+val req = TopicMetadataRequest(TopicMetadataRequest.CurrentVersion,
+  0, config.clientId, Seq(topic))
+val errs = new Err
+    withBrokers(Random.shuffle(seedBrokers), errs) { consumer =>
+      val resp: TopicMetadataResponse = consumer.send(req)
+      resp.topicsMetadata.find(_.topic == topic).flatMap { tm: TopicMetadata =>
+        tm.partitionsMetadata.find(_.partitionId == partition)
+      }.foreach { pm: PartitionMetadata =>
+        pm.leader.foreach { leader =>
+          return Right((leader.host, leader.port))
+        }
+      }
+    }
+Left(errs)
+  }
+
+  def findLeaders(
+  topicAndPartitions: Set[TopicAndPartition]
+): Either[Err, Map[TopicAndPartition, (String, Int)]] = {
+val topics = topicAndPartitions.map(_.topic)
+val response = getPartitionMetadata(topics).right
+    val answer = response.flatMap { tms: Set[TopicMetadata] =>
+      val leaderMap = tms.flatMap { tm: TopicMetadata =>
+        tm.partitionsMetadata.flatMap { pm: PartitionMetadata =>
+          val tp = TopicAndPartition(tm.topic, pm.partitionId)
+          if (topicAndPartitions(tp)) {
+            pm.leader.map { l =>
+              tp -> (l.host -> l.port)
+            }
+          } else {
+            None
+          }
+        }
+      }.toMap
+
+  if (leaderMap.keys.size == topicAndPartitions.size) {
+Right(leaderMap)
+  } else {
+val 

[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-03 Thread pwendell
Github user pwendell commented on a diff in the pull request:

https://github.com/apache/spark/pull/3798#discussion_r23988445
  
--- Diff: 
external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala 
---
@@ -144,4 +150,174 @@ object KafkaUtils {
 createStream[K, V, U, T](
   jssc.ssc, kafkaParams.toMap, 
Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
   }
+
+  /** A batch-oriented interface for consuming from Kafka.
+   * Starting and ending offsets are specified in advance,
+   * so that you can control exactly-once semantics.
+   * @param sc SparkContext object
+   * @param kafkaParams Kafka a 
href=http://kafka.apache.org/documentation.html#configuration;
+   * configuration parameters/a.
+   *   Requires metadata.broker.list or bootstrap.servers to be set 
with Kafka broker(s),
+   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   * @param offsetRanges Each OffsetRange in the batch corresponds to a
+   *   range of offsets for a given Kafka topic/partition
+   */
+  @Experimental
+  def createRDD[
+K: ClassTag,
+V: ClassTag,
+    U <: Decoder[_]: ClassTag,
+    T <: Decoder[_]: ClassTag] (
+  sc: SparkContext,
+  kafkaParams: Map[String, String],
+  offsetRanges: Array[OffsetRange]
+  ): RDD[(K, V)] with HasOffsetRanges = {
--- End diff --

I noticed that KafkaRDD isn't exposed, so maybe this is why. Not sure I see 
a big issue with exposing KafkaRDD and its constructor given that it's 
basically the same level of visibility as this static factory function.





[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-03 Thread pwendell
Github user pwendell commented on a diff in the pull request:

https://github.com/apache/spark/pull/3798#discussion_r23988871
  
--- Diff: 
external/kafka/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala
 ---
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import kafka.common.TopicAndPartition
+
+/** Something that has a collection of OffsetRanges */
+trait HasOffsetRanges {
+  def offsetRanges: Array[OffsetRange]
+}
+
+/** Represents a range of offsets from a single Kafka TopicAndPartition */
+final class OffsetRange private(
+/** kafka topic name */
+val topic: String,
+/** kafka partition id */
+val partition: Int,
+/** inclusive starting offset */
+val fromOffset: Long,
+/** exclusive ending offset */
+val untilOffset: Long) extends Serializable {
+  import OffsetRange.OffsetRangeTuple
+
+  /** this is to avoid ClassNotFoundException during checkpoint restore */
+  private[streaming]
+  def toTuple: OffsetRangeTuple = (topic, partition, fromOffset, 
untilOffset)
+}
+
+object OffsetRange {
+  private[spark]
+  type OffsetRangeTuple = (String, Int, Long, Long)
+
+  def create(topic: String, partition: Int, fromOffset: Long, untilOffset: 
Long): OffsetRange =
--- End diff --

It's confusing to have both `create` and the apply methods here. Why not 
just have one way of creating these?
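For illustration, the two coexisting construction paths look roughly like this (a sketch; argument values are made up, and an `apply` with the same signature as `create` is assumed):

```
val a = OffsetRange.create("myTopic", 0, 100L, 200L)
val b = OffsetRange("myTopic", 0, 100L, 200L)
```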





[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-03 Thread pwendell
Github user pwendell commented on a diff in the pull request:

https://github.com/apache/spark/pull/3798#discussion_r23989829
  
--- Diff: 
external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala 
---
@@ -144,4 +150,174 @@ object KafkaUtils {
 createStream[K, V, U, T](
   jssc.ssc, kafkaParams.toMap, 
Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
   }
+
+  /** A batch-oriented interface for consuming from Kafka.
+   * Starting and ending offsets are specified in advance,
+   * so that you can control exactly-once semantics.
+   * @param sc SparkContext object
+   * @param kafkaParams Kafka a 
href=http://kafka.apache.org/documentation.html#configuration;
+   * configuration parameters/a.
+   *   Requires metadata.broker.list or bootstrap.servers to be set 
with Kafka broker(s),
+   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   * @param offsetRanges Each OffsetRange in the batch corresponds to a
+   *   range of offsets for a given Kafka topic/partition
+   */
+  @Experimental
+  def createRDD[
+K: ClassTag,
+V: ClassTag,
+    U <: Decoder[_]: ClassTag,
+    T <: Decoder[_]: ClassTag] (
+      sc: SparkContext,
+      kafkaParams: Map[String, String],
+      offsetRanges: Array[OffsetRange]
+  ): RDD[(K, V)] with HasOffsetRanges = {
+    val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)
+    val kc = new KafkaCluster(kafkaParams)
+    val topics = offsetRanges.map(o => TopicAndPartition(o.topic, o.partition)).toSet
+    val leaders = kc.findLeaders(topics).fold(
+      errs => throw new SparkException(errs.mkString("\n")),
+      ok => ok
+    )
+    new KafkaRDD[K, V, U, T, (K, V)](sc, kafkaParams, offsetRanges, leaders, messageHandler)
+  }
+
+  /** A batch-oriented interface for consuming from Kafka.
+   * Starting and ending offsets are specified in advance,
+   * so that you can control exactly-once semantics.
+   * @param sc SparkContext object
+   * @param kafkaParams Kafka a 
href=http://kafka.apache.org/documentation.html#configuration;
+   * configuration parameters/a.
+   *   Requires metadata.broker.list or bootstrap.servers to be set 
with Kafka broker(s),
+   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   * @param offsetRanges Each OffsetRange in the batch corresponds to a
+   *   range of offsets for a given Kafka topic/partition
+   * @param leaders Kafka leaders for each offset range in batch
+   * @param messageHandler function for translating each message into the 
desired type
--- End diff --

How is this message handler different than having the user just call a map 
function on a returned RDD? It seems a little risky because this is exposing a 
Kafka class in the byte code signature, which they could relocate in a future 
release in a way that causes this to break for callers.
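The alternative being suggested would look something like this (a sketch; assumes the simpler `createRDD` overload that returns `(K, V)` pairs, plus existing `sc`, `kafkaParams`, and `offsetRanges` values):

```
// instead of passing a messageHandler, transform the returned RDD afterwards
val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
  sc, kafkaParams, offsetRanges)
val values = rdd.map { case (_, value) => value }
```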





[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-03 Thread pwendell
Github user pwendell commented on a diff in the pull request:

https://github.com/apache/spark/pull/3798#discussion_r23989786
  
--- Diff: 
external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala 
---
@@ -144,4 +150,174 @@ object KafkaUtils {
 createStream[K, V, U, T](
   jssc.ssc, kafkaParams.toMap, 
Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
   }
+
+  /** A batch-oriented interface for consuming from Kafka.
+   * Starting and ending offsets are specified in advance,
+   * so that you can control exactly-once semantics.
+   * @param sc SparkContext object
+   * @param kafkaParams Kafka a 
href=http://kafka.apache.org/documentation.html#configuration;
+   * configuration parameters/a.
+   *   Requires metadata.broker.list or bootstrap.servers to be set 
with Kafka broker(s),
+   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   * @param offsetRanges Each OffsetRange in the batch corresponds to a
+   *   range of offsets for a given Kafka topic/partition
+   */
+  @Experimental
+  def createRDD[
+K: ClassTag,
+V: ClassTag,
+    U <: Decoder[_]: ClassTag,
+    T <: Decoder[_]: ClassTag] (
+      sc: SparkContext,
+      kafkaParams: Map[String, String],
+      offsetRanges: Array[OffsetRange]
+  ): RDD[(K, V)] with HasOffsetRanges = {
+    val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)
+    val kc = new KafkaCluster(kafkaParams)
+    val topics = offsetRanges.map(o => TopicAndPartition(o.topic, o.partition)).toSet
+    val leaders = kc.findLeaders(topics).fold(
+      errs => throw new SparkException(errs.mkString("\n")),
+      ok => ok
+    )
+    new KafkaRDD[K, V, U, T, (K, V)](sc, kafkaParams, offsetRanges, leaders, messageHandler)
+  }
+
+  /** A batch-oriented interface for consuming from Kafka.
+   * Starting and ending offsets are specified in advance,
+   * so that you can control exactly-once semantics.
+   * @param sc SparkContext object
+   * @param kafkaParams Kafka a 
href=http://kafka.apache.org/documentation.html#configuration;
+   * configuration parameters/a.
+   *   Requires metadata.broker.list or bootstrap.servers to be set 
with Kafka broker(s),
+   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   * @param offsetRanges Each OffsetRange in the batch corresponds to a
+   *   range of offsets for a given Kafka topic/partition
+   * @param leaders Kafka leaders for each offset range in batch
+   * @param messageHandler function for translating each message into the 
desired type
+   */
+  @Experimental
+  def createRDD[
+K: ClassTag,
+V: ClassTag,
+    U <: Decoder[_]: ClassTag,
+    T <: Decoder[_]: ClassTag,
+R: ClassTag] (
+  sc: SparkContext,
+  kafkaParams: Map[String, String],
+  offsetRanges: Array[OffsetRange],
+  leaders: Array[Leader],
--- End diff --

Is this version of the constructor assuming that the caller has their own
code for finding the leaders? From what I can tell we've locked down the
utility function for doing that.





[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-03 Thread pwendell
Github user pwendell commented on a diff in the pull request:

https://github.com/apache/spark/pull/3798#discussion_r23990370
  
--- Diff: 
external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala 
---
@@ -144,4 +150,174 @@ object KafkaUtils {
 createStream[K, V, U, T](
   jssc.ssc, kafkaParams.toMap, 
Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
   }
+
+  /** A batch-oriented interface for consuming from Kafka.
+   * Starting and ending offsets are specified in advance,
+   * so that you can control exactly-once semantics.
+   * @param sc SparkContext object
+   * @param kafkaParams Kafka a 
href=http://kafka.apache.org/documentation.html#configuration;
+   * configuration parameters/a.
+   *   Requires metadata.broker.list or bootstrap.servers to be set 
with Kafka broker(s),
+   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   * @param offsetRanges Each OffsetRange in the batch corresponds to a
+   *   range of offsets for a given Kafka topic/partition
+   */
+  @Experimental
+  def createRDD[
+K: ClassTag,
+V: ClassTag,
+    U <: Decoder[_]: ClassTag,
+    T <: Decoder[_]: ClassTag] (
+      sc: SparkContext,
+      kafkaParams: Map[String, String],
+      offsetRanges: Array[OffsetRange]
+  ): RDD[(K, V)] with HasOffsetRanges = {
+    val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)
+    val kc = new KafkaCluster(kafkaParams)
+    val topics = offsetRanges.map(o => TopicAndPartition(o.topic, o.partition)).toSet
+    val leaders = kc.findLeaders(topics).fold(
+      errs => throw new SparkException(errs.mkString("\n")),
+      ok => ok
+    )
+    new KafkaRDD[K, V, U, T, (K, V)](sc, kafkaParams, offsetRanges, leaders, messageHandler)
+  }
+
+  /** A batch-oriented interface for consuming from Kafka.
+   * Starting and ending offsets are specified in advance,
+   * so that you can control exactly-once semantics.
+   * @param sc SparkContext object
+   * @param kafkaParams Kafka a 
href=http://kafka.apache.org/documentation.html#configuration;
+   * configuration parameters/a.
+   *   Requires metadata.broker.list or bootstrap.servers to be set 
with Kafka broker(s),
+   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   * @param offsetRanges Each OffsetRange in the batch corresponds to a
+   *   range of offsets for a given Kafka topic/partition
+   * @param leaders Kafka leaders for each offset range in batch
+   * @param messageHandler function for translating each message into the 
desired type
+   */
+  @Experimental
+  def createRDD[
+K: ClassTag,
+V: ClassTag,
+    U <: Decoder[_]: ClassTag,
+    T <: Decoder[_]: ClassTag,
+    R: ClassTag] (
+      sc: SparkContext,
+      kafkaParams: Map[String, String],
+      offsetRanges: Array[OffsetRange],
+      leaders: Array[Leader],
+      messageHandler: MessageAndMetadata[K, V] => R
+  ): RDD[R] with HasOffsetRanges = {
+
+    val leaderMap = leaders
+      .map(l => TopicAndPartition(l.topic, l.partition) -> (l.host, l.port))
+      .toMap
+    new KafkaRDD[K, V, U, T, R](sc, kafkaParams, offsetRanges, leaderMap, messageHandler)
+  }
+
+  /**
+   * This stream can guarantee that each message from Kafka is included in 
transformations
+   * (as opposed to output actions) exactly once, even in most failure 
situations.
+   *
+   * Points to note:
+   *
+   * Failure Recovery - You must checkpoint this stream, or save offsets 
yourself and provide them
+   * as the fromOffsets parameter on restart.
+   * Kafka must have sufficient log retention to obtain messages after 
failure.
+   *
+   * Getting offsets from the stream - see programming guide
+   *
+.  * Zookeeper - This does not use Zookeeper to store offsets.  For 
interop with Kafka monitors
+   * that depend on Zookeeper, you must store offsets in ZK yourself.
+   *
+   * End-to-end semantics - This does not guarantee that any output 
operation will push each record
+   * exactly once. To ensure end-to-end exactly-once semantics (that is, 
receiving exactly once and
+   * outputting exactly once), you have to either ensure that the output 
operation is
+   * idempotent, or transactionally store offsets with the output. See the 
programming guide for
+   * more details.
+   *
+   * @param ssc StreamingContext object
+   * @param kafkaParams Kafka a 
href=http://kafka.apache.org/documentation.html#configuration;
+   * configuration parameters/a.
+   *   Requires metadata.broker.list or bootstrap.servers to be set 
with Kafka broker(s),
+   *   NOT zookeeper servers, specified in 

[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-03 Thread pwendell
Github user pwendell commented on a diff in the pull request:

https://github.com/apache/spark/pull/3798#discussion_r23988318
  
--- Diff: 
external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala 
---
@@ -144,4 +150,174 @@ object KafkaUtils {
 createStream[K, V, U, T](
   jssc.ssc, kafkaParams.toMap, 
Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
   }
+
+  /** A batch-oriented interface for consuming from Kafka.
+   * Starting and ending offsets are specified in advance,
+   * so that you can control exactly-once semantics.
+   * @param sc SparkContext object
+   * @param kafkaParams Kafka a 
href=http://kafka.apache.org/documentation.html#configuration;
+   * configuration parameters/a.
+   *   Requires metadata.broker.list or bootstrap.servers to be set 
with Kafka broker(s),
+   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   * @param offsetRanges Each OffsetRange in the batch corresponds to a
+   *   range of offsets for a given Kafka topic/partition
+   */
+  @Experimental
+  def createRDD[
+K: ClassTag,
+V: ClassTag,
+    U <: Decoder[_]: ClassTag,
+    T <: Decoder[_]: ClassTag] (
+  sc: SparkContext,
+  kafkaParams: Map[String, String],
+  offsetRanges: Array[OffsetRange]
+  ): RDD[(K, V)] with HasOffsetRanges = {
--- End diff --

I've never seen a trait mixin in a return type. What does this actually 
mean? I looked at the compiled byte code and the byte code signature is still 
RDD.

Can we just return a `KafkaRDD` here? If this is enforced somehow by the 
scala compiler, returning an interface here ties our hands in the future, 
because we can't add functionality to the returned type without breaking binary 
compatibility. For instance, we may want to return an RDD that has additional 
methods beyond just accessing its offset ranges.

I ran a simple example and I couldn't see any byte code reference to the 
mixed in trait:

```
trait Trait {}

class Class extends Trait {}

object Object {
  def getTrait: Class with Trait = {new Class()}
}

 javap -v Object
  public static Class getTrait();
flags: ACC_PUBLIC, ACC_STATIC
Code:
  stack=1, locals=0, args_size=0
 0: getstatic #16 // Field 
Object$.MODULE$:LObject$;
 3: invokevirtual #18 // Method 
Object$.getTrait:()LClass;
 6: areturn   
```







[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-03 Thread pwendell
Github user pwendell commented on a diff in the pull request:

https://github.com/apache/spark/pull/3798#discussion_r23988994
  
--- Diff: 
external/kafka/src/main/scala/org/apache/spark/streaming/kafka/Leader.scala ---
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import kafka.common.TopicAndPartition
+
+/** Host info for the leader of a Kafka TopicAndPartition */
+final class Leader private(
+/** kafka topic name */
+val topic: String,
+/** kafka partition id */
+val partition: Int,
+/** kafka hostname */
+val host: String,
+/** kafka host's port */
+val port: Int) extends Serializable
+
+object Leader {
+  def create(topic: String, partition: Int, host: String, port: Int): 
Leader =
--- End diff --

As with the offset ranges, can't we just have a single way to construct
these?





[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-03 Thread pwendell
Github user pwendell commented on a diff in the pull request:

https://github.com/apache/spark/pull/3798#discussion_r23988976
  
--- Diff: 
external/kafka/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala
 ---
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import kafka.common.TopicAndPartition
+
+/** Something that has a collection of OffsetRanges */
+trait HasOffsetRanges {
+  def offsetRanges: Array[OffsetRange]
+}
+
+/** Represents a range of offsets from a single Kafka TopicAndPartition */
+final class OffsetRange private(
+/** kafka topic name */
+val topic: String,
+/** kafka partition id */
+val partition: Int,
+/** inclusive starting offset */
+val fromOffset: Long,
+/** exclusive ending offset */
+val untilOffset: Long) extends Serializable {
+  import OffsetRange.OffsetRangeTuple
+
+  /** this is to avoid ClassNotFoundException during checkpoint restore */
--- End diff --

This comment might be more helpful to include where `OffsetRangeTuple` is 
defined rather than here. I spent a long time trying to figure out why this 
extra class existed.

Also, can you give a bit more detail? I'm not sure I see why you can't recover 
from a checkpoint safely, provided that the recovering JVM has the class 
`OffsetRangeTuple` defined.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-03 Thread pwendell
Github user pwendell commented on a diff in the pull request:

https://github.com/apache/spark/pull/3798#discussion_r23989107
  
--- Diff: 
external/kafka/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala
 ---
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import kafka.common.TopicAndPartition
+
+/** Something that has a collection of OffsetRanges */
+trait HasOffsetRanges {
+  def offsetRanges: Array[OffsetRange]
+}
+
+/** Represents a range of offsets from a single Kafka TopicAndPartition */
+final class OffsetRange private(
+/** kafka topic name */
+val topic: String,
+/** kafka partition id */
+val partition: Int,
+/** inclusive starting offset */
+val fromOffset: Long,
+/** exclusive ending offset */
+val untilOffset: Long) extends Serializable {
+  import OffsetRange.OffsetRangeTuple
+
+  /** this is to avoid ClassNotFoundException during checkpoint restore */
+  private[streaming]
+  def toTuple: OffsetRangeTuple = (topic, partition, fromOffset, 
untilOffset)
+}
+
+object OffsetRange {
+  private[spark]
+  type OffsetRangeTuple = (String, Int, Long, Long)
--- End diff --

Can you group this at the bottom with the related `apply` method? 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-03 Thread pwendell
Github user pwendell commented on a diff in the pull request:

https://github.com/apache/spark/pull/3798#discussion_r23989943
  
--- Diff: 
external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala 
---
@@ -144,4 +150,249 @@ object KafkaUtils {
 createStream[K, V, U, T](
   jssc.ssc, kafkaParams.toMap, 
Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
   }
+
+  /** A batch-oriented interface for consuming from Kafka.
+   * Starting and ending offsets are specified in advance,
+   * so that you can control exactly-once semantics.
+   * @param sc SparkContext object
+   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+   * configuration parameters</a>.
+   *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   * @param batch Each OffsetRange in the batch corresponds to a
+   *   range of offsets for a given Kafka topic/partition
+   */
+  @Experimental
+  def createRDD[
+K: ClassTag,
+V: ClassTag,
+U <: Decoder[_]: ClassTag,
+T <: Decoder[_]: ClassTag,
+R: ClassTag] (
--- End diff --

Isn't the returned RDD of type `RDD[R]`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-03 Thread tdas
Github user tdas commented on the pull request:

https://github.com/apache/spark/pull/3798#issuecomment-72749763
  
@koeninger Thank you very much for all the changes. They are looking good. 
Unfortunately, the issue with `createRDD` returning `RDD[...] with HasOffsetRanges` 
(i.e., the issue that @pwendell raised) could be a problem in the future in 
terms of binary compatibility. Basically, we have not used such things in the 
rest of Spark, to keep things as Java-friendly and binary compatible as 
possible. Also, in the generated Javadoc this looks scary. So here is an 
alternate suggestion that I am trying to implement on top of your PR branch. 
How about this: we effectively combine KafkaRDD and HasOffsetRanges into an 
abstract class. 


```
abstract class KafkaRDD[T](val offsetRanges: Array[OffsetRange], sc: SparkContext)
   extends RDD[T](sc, Nil)

private[kafka]
class KafkaRDDImpl[K, V, KD, VD, R] extends KafkaRDD[R] {
  ...
}

KafkaUtils.createRDD(...simple one without messageHandler...): KafkaRDD[(K, 
V)] = {
   // return KafkaRDDImpl[K, V, KD, VD, (K, V)]
}

KafkaUtils.createRDD(...simple one WITH messageHandler...): KafkaRDD[R] = {
   // return KafkaRDDImpl[K, V, KD, VD, R]
}

```

Advantages
- No binary compatibility issues
- Easy to read from Java
- KafkaRDD implementation and constructor all hidden as before
- Can still extend KafkaRDD to expose more methods in future.  

What do you think?
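
For illustration, here is a rough sketch of how a caller might use the shape proposed above (the exact type parameters and helper signatures are assumptions taken from the outline, not a final API):

```
// Hypothetical usage of the proposed abstract-class design (sketch only).
import org.apache.spark.SparkContext
import kafka.serializer.StringDecoder

def example(sc: SparkContext, kafkaParams: Map[String, String],
    ranges: Array[OffsetRange]): Unit = {
  // Under the proposal, the simple overload would return the abstract
  // KafkaRDD[(K, V)] rather than RDD[(K, V)] with HasOffsetRanges.
  val rdd: KafkaRDD[(String, String)] =
    KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
      sc, kafkaParams, ranges)

  // offsetRanges is a plain member of the abstract class, so no cast is
  // needed and the signature stays Java-friendly.
  rdd.offsetRanges.foreach { r =>
    println(s"${r.topic}-${r.partition}: ${r.fromOffset} to ${r.untilOffset}")
  }
}
```

The point is that the return type is an ordinary class, so `offsetRanges` is reachable without a compound type in the public signature.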


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-03 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/3798#discussion_r24019631
  
--- Diff: 
external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala 
---
@@ -144,4 +150,174 @@ object KafkaUtils {
 createStream[K, V, U, T](
   jssc.ssc, kafkaParams.toMap, 
Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
   }
+
+  /** A batch-oriented interface for consuming from Kafka.
+   * Starting and ending offsets are specified in advance,
+   * so that you can control exactly-once semantics.
+   * @param sc SparkContext object
+   * @param kafkaParams Kafka a 
href=http://kafka.apache.org/documentation.html#configuration;
+   * configuration parameters/a.
+   *   Requires metadata.broker.list or bootstrap.servers to be set 
with Kafka broker(s),
+   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   * @param offsetRanges Each OffsetRange in the batch corresponds to a
+   *   range of offsets for a given Kafka topic/partition
+   */
+  @Experimental
+  def createRDD[
+K: ClassTag,
+V: ClassTag,
+U <: Decoder[_]: ClassTag,
+T <: Decoder[_]: ClassTag] (
+  sc: SparkContext,
+  kafkaParams: Map[String, String],
+  offsetRanges: Array[OffsetRange]
+  ): RDD[(K, V)] with HasOffsetRanges = {
+val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)
+val kc = new KafkaCluster(kafkaParams)
+val topics = offsetRanges.map(o => TopicAndPartition(o.topic, o.partition)).toSet
+val leaders = kc.findLeaders(topics).fold(
+  errs => throw new SparkException(errs.mkString("\n")),
+  ok => ok
+)
+new KafkaRDD[K, V, U, T, (K, V)](sc, kafkaParams, offsetRanges, leaders, messageHandler)
+  }
+
+  /** A batch-oriented interface for consuming from Kafka.
+   * Starting and ending offsets are specified in advance,
+   * so that you can control exactly-once semantics.
+   * @param sc SparkContext object
+   * @param kafkaParams Kafka a 
href=http://kafka.apache.org/documentation.html#configuration;
+   * configuration parameters/a.
+   *   Requires metadata.broker.list or bootstrap.servers to be set 
with Kafka broker(s),
+   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   * @param offsetRanges Each OffsetRange in the batch corresponds to a
+   *   range of offsets for a given Kafka topic/partition
+   * @param leaders Kafka leaders for each offset range in batch
+   * @param messageHandler function for translating each message into the 
desired type
+   */
+  @Experimental
+  def createRDD[
+K: ClassTag,
+V: ClassTag,
+U <: Decoder[_]: ClassTag,
+T <: Decoder[_]: ClassTag,
+R: ClassTag] (
+  sc: SparkContext,
+  kafkaParams: Map[String, String],
+  offsetRanges: Array[OffsetRange],
+  leaders: Array[Leader],
--- End diff --

That's correct on both counts.

If you don't provide a way for clients to supply offset ranges and leaders 
at the same time, you're forcing twice the number of remote calls (because the usual 
way to get the end of the offset range is to talk to the leader).

Yes, there's no way for people to actually use this currently unless they 
have their own copy of the functionality provided by KafkaCluster.  In my case, 
I'm just going to remove SparkException from KafkaCluster, since it's the only 
Spark dependency, and distribute it as a separate jar under a different 
namespace.
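
To make the round-trip point concrete, here is a sketch of the client-side pattern being described; `latestLeaderOffsets` is a hypothetical stand-in for whatever code the client already uses to ask each partition leader for its latest offset:

```
// Sketch only: reuse the leader lookup the client already performed to find
// the end offsets, instead of making createRDD repeat the metadata calls.
import kafka.common.TopicAndPartition

// Hypothetical helper: (host, port, latestOffset) per partition.
def latestLeaderOffsets(
    tps: Set[TopicAndPartition]): Map[TopicAndPartition, (String, Int, Long)] = ???

def buildBatch(
    tps: Set[TopicAndPartition],
    fromOffsets: Map[TopicAndPartition, Long]): (Array[OffsetRange], Array[Leader]) = {
  val latest = latestLeaderOffsets(tps)
  // Offset ranges end at the latest offset each leader reported.
  val ranges = latest.map { case (tp, (_, _, until)) =>
    OffsetRange.create(tp.topic, tp.partition, fromOffsets(tp), until)
  }.toArray
  // The same lookup already told us who the leaders are, so pass them along.
  val leaders = latest.map { case (tp, (host, port, _)) =>
    Leader.create(tp.topic, tp.partition, host, port)
  }.toArray
  (ranges, leaders)
}
```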


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-03 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/3798#discussion_r24018460
  
--- Diff: 
external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala 
---
@@ -144,4 +150,174 @@ object KafkaUtils {
 createStream[K, V, U, T](
   jssc.ssc, kafkaParams.toMap, 
Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
   }
+
+  /** A batch-oriented interface for consuming from Kafka.
+   * Starting and ending offsets are specified in advance,
+   * so that you can control exactly-once semantics.
+   * @param sc SparkContext object
+   * @param kafkaParams Kafka a 
href=http://kafka.apache.org/documentation.html#configuration;
+   * configuration parameters/a.
+   *   Requires metadata.broker.list or bootstrap.servers to be set 
with Kafka broker(s),
+   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   * @param offsetRanges Each OffsetRange in the batch corresponds to a
+   *   range of offsets for a given Kafka topic/partition
+   */
+  @Experimental
+  def createRDD[
+K: ClassTag,
+V: ClassTag,
+U <: Decoder[_]: ClassTag,
+T <: Decoder[_]: ClassTag] (
+  sc: SparkContext,
+  kafkaParams: Map[String, String],
+  offsetRanges: Array[OffsetRange]
+  ): RDD[(K, V)] with HasOffsetRanges = {
--- End diff --

There is definitely a bytecode difference; try running diff on the class 
files.  It statically guarantees you can call .offsetRanges on the thing 
returned from createRDD.  Without it, you'd have to cast at runtime.  If you 
add e.g. a .chzburger method to KafkaRDD, you won't be able to call it without 
asInstanceOf.  If you then made a Chzburger interface, implemented it on 
KafkaRDD, and changed the return type to RDD[...] with HasOffsetRanges with Chzburger, 
you would.  I hear your concern about binary compatibility.

As far as exposing KafkaRDD instead... that's the way I originally designed 
things.

The current design is the result of a compromise between TD's desire as a 
maintainer to hide as much as possible, and my desire as a user to expose 
what's necessary to get my job done.  You can usually tell it's a good 
compromise if no one is happy :)
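
A small sketch of the difference being described, assuming the two candidate return types:

```
import org.apache.spark.rdd.RDD

// Compound return type: offset ranges are statically visible, no cast needed.
def withCompoundType(rdd: RDD[(String, String)] with HasOffsetRanges): Unit = {
  val ranges: Array[OffsetRange] = rdd.offsetRanges  // compiles as-is
}

// Plain RDD return type: the caller must downcast at runtime.
def withPlainType(rdd: RDD[(String, String)]): Unit = {
  val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges  // runtime cast
}
```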


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-03 Thread koeninger
Github user koeninger commented on the pull request:

https://github.com/apache/spark/pull/3798#issuecomment-72691392
  
Regarding naming, I agree.  The name has been a point of discussion for a
month; how do we get some consensus?

Regarding Java wrappers, there have already been a number of changes
directed at Java compatibility (an array of Leader instead of a Map[topic,
broker], .create in addition to .apply).  I wonder how relevant those are
if we're doing separate Java wrappers (which, yes, I agree should be in a
follow-on PR).

On Tue, Feb 3, 2015 at 3:13 AM, Patrick Wendell notificati...@github.com
wrote:

 I took a pass through the public API. I'm not very familiar with Kafka so
 it was somewhat slow going. However, some reactions:

1. We should try to tighten, simplify, and clarify the way we name and
document everything in this public API. Most of the comments were about
this. The most important IMO is coming up with a good name for the new
streams returned and clearly explaining how they differ from the old Kafka
stream. To me, the main differences seem to be in the way we (a) decide
what goes into which batch and (b) actually ingest the data. I proposed a
javadoc and naming scheme that emphasizes that distinction.
2. Are there plans to add Java and Python wrappers here next? Those
are straightforward and it would be good to have them. Maybe in a
follow-on PR?

 —
 Reply to this email directly or view it on GitHub
 https://github.com/apache/spark/pull/3798#issuecomment-72617088.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-03 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/3798#discussion_r24017652
  
--- Diff: 
external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala 
---
@@ -144,4 +150,174 @@ object KafkaUtils {
 createStream[K, V, U, T](
   jssc.ssc, kafkaParams.toMap, 
Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
   }
+
+  /** A batch-oriented interface for consuming from Kafka.
+   * Starting and ending offsets are specified in advance,
+   * so that you can control exactly-once semantics.
+   * @param sc SparkContext object
+   * @param kafkaParams Kafka a 
href=http://kafka.apache.org/documentation.html#configuration;
+   * configuration parameters/a.
+   *   Requires metadata.broker.list or bootstrap.servers to be set 
with Kafka broker(s),
+   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   * @param offsetRanges Each OffsetRange in the batch corresponds to a
+   *   range of offsets for a given Kafka topic/partition
+   */
+  @Experimental
+  def createRDD[
+K: ClassTag,
+V: ClassTag,
+U : Decoder[_]: ClassTag,
+T : Decoder[_]: ClassTag] (
+  sc: SparkContext,
+  kafkaParams: Map[String, String],
+  offsetRanges: Array[OffsetRange]
+  ): RDD[(K, V)] with HasOffsetRanges = {
+val messageHandler = (mmd: MessageAndMetadata[K, V]) = (mmd.key, 
mmd.message)
+val kc = new KafkaCluster(kafkaParams)
+val topics = offsetRanges.map(o = TopicAndPartition(o.topic, 
o.partition)).toSet
+val leaders = kc.findLeaders(topics).fold(
+  errs = throw new SparkException(errs.mkString(\n)),
+  ok = ok
+)
+new KafkaRDD[K, V, U, T, (K, V)](sc, kafkaParams, offsetRanges, 
leaders, messageHandler)
+  }
+
+  /** A batch-oriented interface for consuming from Kafka.
+   * Starting and ending offsets are specified in advance,
+   * so that you can control exactly-once semantics.
+   * @param sc SparkContext object
+   * @param kafkaParams Kafka a 
href=http://kafka.apache.org/documentation.html#configuration;
+   * configuration parameters/a.
+   *   Requires metadata.broker.list or bootstrap.servers to be set 
with Kafka broker(s),
+   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   * @param offsetRanges Each OffsetRange in the batch corresponds to a
+   *   range of offsets for a given Kafka topic/partition
+   * @param leaders Kafka leaders for each offset range in batch
+   * @param messageHandler function for translating each message into the 
desired type
+   */
+  @Experimental
+  def createRDD[
+K: ClassTag,
+V: ClassTag,
+U : Decoder[_]: ClassTag,
+T : Decoder[_]: ClassTag,
+R: ClassTag] (
+  sc: SparkContext,
+  kafkaParams: Map[String, String],
+  offsetRanges: Array[OffsetRange],
+  leaders: Array[Leader],
+  messageHandler: MessageAndMetadata[K, V] => R
+  ): RDD[R] with HasOffsetRanges = {
+
+val leaderMap = leaders
+  .map(l => TopicAndPartition(l.topic, l.partition) -> (l.host, l.port))
+  .toMap
+new KafkaRDD[K, V, U, T, R](sc, kafkaParams, offsetRanges, leaderMap, messageHandler)
+  }
+
+  /**
+   * This stream can guarantee that each message from Kafka is included in 
transformations
+   * (as opposed to output actions) exactly once, even in most failure 
situations.
+   *
+   * Points to note:
+   *
+   * Failure Recovery - You must checkpoint this stream, or save offsets 
yourself and provide them
+   * as the fromOffsets parameter on restart.
+   * Kafka must have sufficient log retention to obtain messages after 
failure.
+   *
+   * Getting offsets from the stream - see programming guide
+   *
+.  * Zookeeper - This does not use Zookeeper to store offsets.  For 
interop with Kafka monitors
+   * that depend on Zookeeper, you must store offsets in ZK yourself.
+   *
+   * End-to-end semantics - This does not guarantee that any output 
operation will push each record
+   * exactly once. To ensure end-to-end exactly-once semantics (that is, 
receiving exactly once and
+   * outputting exactly once), you have to either ensure that the output 
operation is
+   * idempotent, or transactionally store offsets with the output. See the 
programming guide for
+   * more details.
+   *
+   * @param ssc StreamingContext object
+   * @param kafkaParams Kafka a 
href=http://kafka.apache.org/documentation.html#configuration;
+   * configuration parameters/a.
+   *   Requires metadata.broker.list or bootstrap.servers to be set 
with Kafka broker(s),
+   *   NOT zookeeper servers, specified in 

[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-03 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/3798#discussion_r24019904
  
--- Diff: 
external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala 
---
@@ -144,4 +150,174 @@ object KafkaUtils {
 createStream[K, V, U, T](
   jssc.ssc, kafkaParams.toMap, 
Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
   }
+
+  /** A batch-oriented interface for consuming from Kafka.
+   * Starting and ending offsets are specified in advance,
+   * so that you can control exactly-once semantics.
+   * @param sc SparkContext object
+   * @param kafkaParams Kafka a 
href=http://kafka.apache.org/documentation.html#configuration;
+   * configuration parameters/a.
+   *   Requires metadata.broker.list or bootstrap.servers to be set 
with Kafka broker(s),
+   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   * @param offsetRanges Each OffsetRange in the batch corresponds to a
+   *   range of offsets for a given Kafka topic/partition
+   */
+  @Experimental
+  def createRDD[
+K: ClassTag,
+V: ClassTag,
+U : Decoder[_]: ClassTag,
+T : Decoder[_]: ClassTag] (
+  sc: SparkContext,
+  kafkaParams: Map[String, String],
+  offsetRanges: Array[OffsetRange]
+  ): RDD[(K, V)] with HasOffsetRanges = {
+val messageHandler = (mmd: MessageAndMetadata[K, V]) = (mmd.key, 
mmd.message)
+val kc = new KafkaCluster(kafkaParams)
+val topics = offsetRanges.map(o = TopicAndPartition(o.topic, 
o.partition)).toSet
+val leaders = kc.findLeaders(topics).fold(
+  errs = throw new SparkException(errs.mkString(\n)),
+  ok = ok
+)
+new KafkaRDD[K, V, U, T, (K, V)](sc, kafkaParams, offsetRanges, 
leaders, messageHandler)
+  }
+
+  /** A batch-oriented interface for consuming from Kafka.
+   * Starting and ending offsets are specified in advance,
+   * so that you can control exactly-once semantics.
+   * @param sc SparkContext object
+   * @param kafkaParams Kafka a 
href=http://kafka.apache.org/documentation.html#configuration;
+   * configuration parameters/a.
+   *   Requires metadata.broker.list or bootstrap.servers to be set 
with Kafka broker(s),
+   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   * @param offsetRanges Each OffsetRange in the batch corresponds to a
+   *   range of offsets for a given Kafka topic/partition
+   * @param leaders Kafka leaders for each offset range in batch
+   * @param messageHandler function for translating each message into the 
desired type
+   */
+  @Experimental
+  def createRDD[
+K: ClassTag,
+V: ClassTag,
+U : Decoder[_]: ClassTag,
+T : Decoder[_]: ClassTag,
+R: ClassTag] (
+  sc: SparkContext,
+  kafkaParams: Map[String, String],
+  offsetRanges: Array[OffsetRange],
+  leaders: Array[Leader],
+  messageHandler: MessageAndMetadata[K, V] = R
+  ): RDD[R] with HasOffsetRanges = {
+
+val leaderMap = leaders
+  .map(l = TopicAndPartition(l.topic, l.partition) - (l.host, 
l.port))
+  .toMap
+new KafkaRDD[K, V, U, T, R](sc, kafkaParams, offsetRanges, leaderMap, 
messageHandler)
+  }
+
+  /**
+   * This stream can guarantee that each message from Kafka is included in 
transformations
--- End diff --

I think we want the first sentence of the doc to convey why someone would 
choose this method.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-03 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/3798#discussion_r24019256
  
--- Diff: 
external/kafka/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala
 ---
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import kafka.common.TopicAndPartition
+
+/** Something that has a collection of OffsetRanges */
+trait HasOffsetRanges {
+  def offsetRanges: Array[OffsetRange]
+}
+
+/** Represents a range of offsets from a single Kafka TopicAndPartition */
+final class OffsetRange private(
+/** kafka topic name */
+val topic: String,
+/** kafka partition id */
+val partition: Int,
+/** inclusive starting offset */
+val fromOffset: Long,
+/** exclusive ending offset */
+val untilOffset: Long) extends Serializable {
+  import OffsetRange.OffsetRangeTuple
+
+  /** this is to avoid ClassNotFoundException during checkpoint restore */
--- End diff --

There was discussion of this earlier that has since gotten buried.  Here's 
the gist:

https://gist.github.com/koeninger/561a61482cd1b5b3600c

The classloader being used for restoring the checkpoint doesn't have that 
class, probably because it's in external (and thus included in the user 
assembly), rather than one of the Spark jars that's on the default classpath.

I went ahead and duplicated that comment.
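
In other words, only plain tuples (classes the restoring classloader always has) go into the checkpoint, and OffsetRange objects are rebuilt afterwards. A rough sketch of that round trip (the helper names here are made up for illustration):

```
// Sketch: checkpoint plain tuples, rebuild OffsetRange objects on restore.
object CheckpointSketch {
  type OffsetRangeTuple = (String, Int, Long, Long)

  // Tuples serialize with no reference to classes from the external/kafka jar.
  def toCheckpoint(ranges: Array[OffsetRange]): Array[OffsetRangeTuple] =
    ranges.map(r => (r.topic, r.partition, r.fromOffset, r.untilOffset))

  // After restore, turn the tuples back into OffsetRange objects.
  def fromCheckpoint(tuples: Array[OffsetRangeTuple]): Array[OffsetRange] =
    tuples.map { case (topic, partition, from, until) =>
      OffsetRange.create(topic, partition, from, until)
    }
}
```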


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-03 Thread dibbhatt
Github user dibbhatt commented on a diff in the pull request:

https://github.com/apache/spark/pull/3798#discussion_r24015154
  
--- Diff: 
external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala
 ---
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import scala.util.control.NonFatal
+import scala.util.Random
+import scala.collection.mutable.ArrayBuffer
+import java.util.Properties
+import kafka.api._
+import kafka.common.{ErrorMapping, OffsetMetadataAndError, 
TopicAndPartition}
+import kafka.consumer.{ConsumerConfig, SimpleConsumer}
+import org.apache.spark.SparkException
+
+/**
+ * Convenience methods for interacting with a Kafka cluster.
+ * @param kafkaParams Kafka a 
href=http://kafka.apache.org/documentation.html#configuration;
+ * configuration parameters/a.
+ *   Requires metadata.broker.list or bootstrap.servers to be set with 
Kafka broker(s),
+ *   NOT zookeeper servers, specified in host1:port1,host2:port2 form
+ */
+private[spark]
+class KafkaCluster(val kafkaParams: Map[String, String]) extends 
Serializable {
+  import KafkaCluster.{Err, LeaderOffset}
+
+  val seedBrokers: Array[(String, Int)] =
+kafkaParams.get("metadata.broker.list")
+  .orElse(kafkaParams.get("bootstrap.servers"))
+  .getOrElse(throw new SparkException("Must specify metadata.broker.list or bootstrap.servers"))
+  .split(",").map { hp =>
+val hpa = hp.split(":")
+(hpa(0), hpa(1).toInt)
+  }
+
+  // ConsumerConfig isn't serializable
+  @transient private var _config: ConsumerConfig = null
+
+  def config: ConsumerConfig = this.synchronized {
+if (_config == null) {
+  _config = KafkaCluster.consumerConfig(kafkaParams)
+}
+_config
+  }
+
+  def connect(host: String, port: Int): SimpleConsumer =
+new SimpleConsumer(host, port, config.socketTimeoutMs,
+  config.socketReceiveBufferBytes, config.clientId)
+
+  def connectLeader(topic: String, partition: Int): Either[Err, 
SimpleConsumer] =
+findLeader(topic, partition).right.map(hp = connect(hp._1, hp._2))
+
+  // Metadata api
+  // scalastyle:off
+  // 
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-MetadataAPI
+  // scalastyle:on
+
+  def findLeader(topic: String, partition: Int): Either[Err, (String, 
Int)] = {
+val req = TopicMetadataRequest(TopicMetadataRequest.CurrentVersion,
+  0, config.clientId, Seq(topic))
+val errs = new Err
+withBrokers(Random.shuffle(seedBrokers), errs) { consumer =>
+  val resp: TopicMetadataResponse = consumer.send(req)
+  resp.topicsMetadata.find(_.topic == topic).flatMap { tm: TopicMetadata =>
+tm.partitionsMetadata.find(_.partitionId == partition)
+  }.foreach { pm: PartitionMetadata =>
+pm.leader.foreach { leader =>
+  return Right((leader.host, leader.port))
+}
+  }
+}
+Left(errs)
+  }
+
+  def findLeaders(
+  topicAndPartitions: Set[TopicAndPartition]
+): Either[Err, Map[TopicAndPartition, (String, Int)]] = {
+val topics = topicAndPartitions.map(_.topic)
+val response = getPartitionMetadata(topics).right
+val answer = response.flatMap { tms: Set[TopicMetadata] =>
+  val leaderMap = tms.flatMap { tm: TopicMetadata =>
+tm.partitionsMetadata.flatMap { pm: PartitionMetadata =>
+  val tp = TopicAndPartition(tm.topic, pm.partitionId)
+  if (topicAndPartitions(tp)) {
+pm.leader.map { l =>
+  tp -> (l.host -> l.port)
+}
+  } else {
+None
+  }
+}
+  }.toMap
+
+  if (leaderMap.keys.size == topicAndPartitions.size) {
+Right(leaderMap)
+  } else {
+val 

[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-03 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/3798#discussion_r24017456
  
--- Diff: 
external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala 
---
@@ -144,4 +150,249 @@ object KafkaUtils {
 createStream[K, V, U, T](
   jssc.ssc, kafkaParams.toMap, 
Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
   }
+
+  /** A batch-oriented interface for consuming from Kafka.
+   * Starting and ending offsets are specified in advance,
+   * so that you can control exactly-once semantics.
+   * @param sc SparkContext object
+   * @param kafkaParams Kafka a 
href=http://kafka.apache.org/documentation.html#configuration;
+   * configuration parameters/a.
+   *   Requires metadata.broker.list or bootstrap.servers to be set 
with Kafka broker(s),
+   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   * @param batch Each OffsetRange in the batch corresponds to a
+   *   range of offsets for a given Kafka topic/partition
+   */
+  @Experimental
+  def createRDD[
+K: ClassTag,
+V: ClassTag,
+U : Decoder[_]: ClassTag,
+T : Decoder[_]: ClassTag,
+R: ClassTag] (
--- End diff --

There are 2 methods: one that takes a messageHandler (and thus returns 
RDD[R]), and one that doesn't take a messageHandler as an argument but provides 
a default one, so it instead returns RDD[(K, V)].
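
A short usage sketch of the two overloads as described (a SparkContext `sc` is assumed to be in scope; topic names, broker addresses, and offsets are placeholders):

```
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder

val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")
val ranges = Array(OffsetRange.create("mytopic", 0, 100L, 200L))

// Overload without a messageHandler: the default handler yields (key, message) pairs.
val kvRdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
  sc, kafkaParams, ranges)

// Overload with a messageHandler: yields whatever R the handler returns.
val leaders = Array(Leader.create("mytopic", 0, "broker1", 9092))
val msgRdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder, String](
  sc, kafkaParams, ranges, leaders,
  (mmd: MessageAndMetadata[String, String]) => mmd.message)
```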


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-03 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/3798#discussion_r24019815
  
--- Diff: 
external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala 
---
@@ -144,4 +150,174 @@ object KafkaUtils {
 createStream[K, V, U, T](
   jssc.ssc, kafkaParams.toMap, 
Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
   }
+
+  /** A batch-oriented interface for consuming from Kafka.
+   * Starting and ending offsets are specified in advance,
+   * so that you can control exactly-once semantics.
+   * @param sc SparkContext object
+   * @param kafkaParams Kafka a 
href=http://kafka.apache.org/documentation.html#configuration;
+   * configuration parameters/a.
+   *   Requires metadata.broker.list or bootstrap.servers to be set 
with Kafka broker(s),
+   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   * @param offsetRanges Each OffsetRange in the batch corresponds to a
+   *   range of offsets for a given Kafka topic/partition
+   */
+  @Experimental
+  def createRDD[
+K: ClassTag,
+V: ClassTag,
+U : Decoder[_]: ClassTag,
+T : Decoder[_]: ClassTag] (
+  sc: SparkContext,
+  kafkaParams: Map[String, String],
+  offsetRanges: Array[OffsetRange]
+  ): RDD[(K, V)] with HasOffsetRanges = {
+val messageHandler = (mmd: MessageAndMetadata[K, V]) = (mmd.key, 
mmd.message)
+val kc = new KafkaCluster(kafkaParams)
+val topics = offsetRanges.map(o = TopicAndPartition(o.topic, 
o.partition)).toSet
+val leaders = kc.findLeaders(topics).fold(
+  errs = throw new SparkException(errs.mkString(\n)),
+  ok = ok
+)
+new KafkaRDD[K, V, U, T, (K, V)](sc, kafkaParams, offsetRanges, 
leaders, messageHandler)
+  }
+
+  /** A batch-oriented interface for consuming from Kafka.
+   * Starting and ending offsets are specified in advance,
+   * so that you can control exactly-once semantics.
+   * @param sc SparkContext object
+   * @param kafkaParams Kafka a 
href=http://kafka.apache.org/documentation.html#configuration;
+   * configuration parameters/a.
+   *   Requires metadata.broker.list or bootstrap.servers to be set 
with Kafka broker(s),
+   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   * @param offsetRanges Each OffsetRange in the batch corresponds to a
+   *   range of offsets for a given Kafka topic/partition
+   * @param leaders Kafka leaders for each offset range in batch
+   * @param messageHandler function for translating each message into the 
desired type
+   */
+  @Experimental
+  def createRDD[
+K: ClassTag,
+V: ClassTag,
+U : Decoder[_]: ClassTag,
+T : Decoder[_]: ClassTag,
+R: ClassTag] (
+  sc: SparkContext,
+  kafkaParams: Map[String, String],
+  offsetRanges: Array[OffsetRange],
+  leaders: Array[Leader],
+  messageHandler: MessageAndMetadata[K, V] = R
+  ): RDD[R] with HasOffsetRanges = {
+
+val leaderMap = leaders
+  .map(l = TopicAndPartition(l.topic, l.partition) - (l.host, 
l.port))
+  .toMap
+new KafkaRDD[K, V, U, T, R](sc, kafkaParams, offsetRanges, leaderMap, 
messageHandler)
+  }
+
+  /**
+   * This stream can guarantee that each message from Kafka is included in 
transformations
+   * (as opposed to output actions) exactly once, even in most failure 
situations.
+   *
+   * Points to note:
+   *
+   * Failure Recovery - You must checkpoint this stream, or save offsets 
yourself and provide them
+   * as the fromOffsets parameter on restart.
+   * Kafka must have sufficient log retention to obtain messages after 
failure.
+   *
+   * Getting offsets from the stream - see programming guide
+   *
+.  * Zookeeper - This does not use Zookeeper to store offsets.  For 
interop with Kafka monitors
+   * that depend on Zookeeper, you must store offsets in ZK yourself.
+   *
+   * End-to-end semantics - This does not guarantee that any output 
operation will push each record
+   * exactly once. To ensure end-to-end exactly-once semantics (that is, 
receiving exactly once and
+   * outputting exactly once), you have to either ensure that the output 
operation is
+   * idempotent, or transactionally store offsets with the output. See the 
programming guide for
+   * more details.
+   *
+   * @param ssc StreamingContext object
+   * @param kafkaParams Kafka a 
href=http://kafka.apache.org/documentation.html#configuration;
+   * configuration parameters/a.
+   *   Requires metadata.broker.list or bootstrap.servers to be set 
with Kafka broker(s),
+   *   NOT zookeeper servers, specified in 

[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-03 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/3798#discussion_r24018579
  
--- Diff: 
external/kafka/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala
 ---
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import kafka.common.TopicAndPartition
+
+/** Something that has a collection of OffsetRanges */
+trait HasOffsetRanges {
+  def offsetRanges: Array[OffsetRange]
+}
+
+/** Represents a range of offsets from a single Kafka TopicAndPartition */
+final class OffsetRange private(
+/** kafka topic name */
+val topic: String,
+/** kafka partition id */
+val partition: Int,
+/** inclusive starting offset */
+val fromOffset: Long,
+/** exclusive ending offset */
+val untilOffset: Long) extends Serializable {
+  import OffsetRange.OffsetRangeTuple
+
+  /** this is to avoid ClassNotFoundException during checkpoint restore */
+  private[streaming]
+  def toTuple: OffsetRangeTuple = (topic, partition, fromOffset, 
untilOffset)
+}
+
+object OffsetRange {
+  private[spark]
+  type OffsetRangeTuple = (String, Int, Long, Long)
+
+  def create(topic: String, partition: Int, fromOffset: Long, untilOffset: 
Long): OffsetRange =
--- End diff --

TD thought that a static method named .create was more idiomatic for Java.  
It's obviously more idiomatic for Scala to have an .apply method, since the 
syntactic sugar for it is baked into the language.
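
The pattern under discussion, roughly (a sketch, not the exact code in the PR): both the Scala-idiomatic `apply` and the Java-friendly `create` can delegate to the same private constructor.

```
final class OffsetRange private(
    val topic: String,
    val partition: Int,
    val fromOffset: Long,
    val untilOffset: Long) extends Serializable

object OffsetRange {
  // Scala callers: OffsetRange("t", 0, 0L, 100L)
  def apply(topic: String, partition: Int, fromOffset: Long, untilOffset: Long): OffsetRange =
    new OffsetRange(topic, partition, fromOffset, untilOffset)

  // Java callers: OffsetRange.create("t", 0, 0L, 100L)
  def create(topic: String, partition: Int, fromOffset: Long, untilOffset: Long): OffsetRange =
    new OffsetRange(topic, partition, fromOffset, untilOffset)
}
```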


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-03 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/3798#discussion_r24020676
  
--- Diff: 
external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala
 ---
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import scala.util.control.NonFatal
+import scala.util.Random
+import scala.collection.mutable.ArrayBuffer
+import java.util.Properties
+import kafka.api._
+import kafka.common.{ErrorMapping, OffsetMetadataAndError, 
TopicAndPartition}
+import kafka.consumer.{ConsumerConfig, SimpleConsumer}
+import org.apache.spark.SparkException
+
+/**
+ * Convenience methods for interacting with a Kafka cluster.
+ * @param kafkaParams Kafka a 
href=http://kafka.apache.org/documentation.html#configuration;
+ * configuration parameters/a.
+ *   Requires metadata.broker.list or bootstrap.servers to be set with 
Kafka broker(s),
+ *   NOT zookeeper servers, specified in host1:port1,host2:port2 form
+ */
+private[spark]
+class KafkaCluster(val kafkaParams: Map[String, String]) extends 
Serializable {
+  import KafkaCluster.{Err, LeaderOffset}
+
+  val seedBrokers: Array[(String, Int)] =
+kafkaParams.get(metadata.broker.list)
+  .orElse(kafkaParams.get(bootstrap.servers))
+  .getOrElse(throw new SparkException(Must specify 
metadata.broker.list or bootstrap.servers))
+  .split(,).map { hp =
+val hpa = hp.split(:)
+(hpa(0), hpa(1).toInt)
+  }
+
+  // ConsumerConfig isn't serializable
+  @transient private var _config: ConsumerConfig = null
+
+  def config: ConsumerConfig = this.synchronized {
+if (_config == null) {
+  _config = KafkaCluster.consumerConfig(kafkaParams)
+}
+_config
+  }
+
+  def connect(host: String, port: Int): SimpleConsumer =
+new SimpleConsumer(host, port, config.socketTimeoutMs,
+  config.socketReceiveBufferBytes, config.clientId)
+
+  def connectLeader(topic: String, partition: Int): Either[Err, 
SimpleConsumer] =
+findLeader(topic, partition).right.map(hp = connect(hp._1, hp._2))
+
+  // Metadata api
+  // scalastyle:off
+  // 
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-MetadataAPI
+  // scalastyle:on
+
+  def findLeader(topic: String, partition: Int): Either[Err, (String, 
Int)] = {
+val req = TopicMetadataRequest(TopicMetadataRequest.CurrentVersion,
+  0, config.clientId, Seq(topic))
+val errs = new Err
+withBrokers(Random.shuffle(seedBrokers), errs) { consumer =
+  val resp: TopicMetadataResponse = consumer.send(req)
+  resp.topicsMetadata.find(_.topic == topic).flatMap { tm: 
TopicMetadata =
+tm.partitionsMetadata.find(_.partitionId == partition)
+  }.foreach { pm: PartitionMetadata =
+pm.leader.foreach { leader =
+  return Right((leader.host, leader.port))
+}
+  }
+}
+Left(errs)
+  }
+
+  def findLeaders(
+  topicAndPartitions: Set[TopicAndPartition]
+): Either[Err, Map[TopicAndPartition, (String, Int)]] = {
+val topics = topicAndPartitions.map(_.topic)
+val response = getPartitionMetadata(topics).right
+val answer = response.flatMap { tms: Set[TopicMetadata] =
+  val leaderMap = tms.flatMap { tm: TopicMetadata =
+tm.partitionsMetadata.flatMap { pm: PartitionMetadata =
+  val tp = TopicAndPartition(tm.topic, pm.partitionId)
+  if (topicAndPartitions(tp)) {
+pm.leader.map { l =
+  tp - (l.host - l.port)
+}
+  } else {
+None
+  }
+}
+  }.toMap
+
+  if (leaderMap.keys.size == topicAndPartitions.size) {
+Right(leaderMap)
+  } else {
+val 

[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-03 Thread dibbhatt
Github user dibbhatt commented on a diff in the pull request:

https://github.com/apache/spark/pull/3798#discussion_r23999633
  
--- Diff: 
external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala
 ---
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import scala.util.control.NonFatal
+import scala.util.Random
+import scala.collection.mutable.ArrayBuffer
+import java.util.Properties
+import kafka.api._
+import kafka.common.{ErrorMapping, OffsetMetadataAndError, 
TopicAndPartition}
+import kafka.consumer.{ConsumerConfig, SimpleConsumer}
+import org.apache.spark.SparkException
+
+/**
+ * Convenience methods for interacting with a Kafka cluster.
+ * @param kafkaParams Kafka a 
href=http://kafka.apache.org/documentation.html#configuration;
+ * configuration parameters/a.
+ *   Requires metadata.broker.list or bootstrap.servers to be set with 
Kafka broker(s),
+ *   NOT zookeeper servers, specified in host1:port1,host2:port2 form
+ */
+private[spark]
+class KafkaCluster(val kafkaParams: Map[String, String]) extends 
Serializable {
+  import KafkaCluster.{Err, LeaderOffset}
+
+  val seedBrokers: Array[(String, Int)] =
+kafkaParams.get(metadata.broker.list)
+  .orElse(kafkaParams.get(bootstrap.servers))
+  .getOrElse(throw new SparkException(Must specify 
metadata.broker.list or bootstrap.servers))
+  .split(,).map { hp =
+val hpa = hp.split(:)
+(hpa(0), hpa(1).toInt)
+  }
+
+  // ConsumerConfig isn't serializable
+  @transient private var _config: ConsumerConfig = null
+
+  def config: ConsumerConfig = this.synchronized {
+if (_config == null) {
+  _config = KafkaCluster.consumerConfig(kafkaParams)
+}
+_config
+  }
+
+  def connect(host: String, port: Int): SimpleConsumer =
+new SimpleConsumer(host, port, config.socketTimeoutMs,
+  config.socketReceiveBufferBytes, config.clientId)
+
+  def connectLeader(topic: String, partition: Int): Either[Err, 
SimpleConsumer] =
+findLeader(topic, partition).right.map(hp = connect(hp._1, hp._2))
+
+  // Metadata api
+  // scalastyle:off
+  // 
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-MetadataAPI
+  // scalastyle:on
+
+  def findLeader(topic: String, partition: Int): Either[Err, (String, 
Int)] = {
+val req = TopicMetadataRequest(TopicMetadataRequest.CurrentVersion,
+  0, config.clientId, Seq(topic))
+val errs = new Err
+withBrokers(Random.shuffle(seedBrokers), errs) { consumer =
+  val resp: TopicMetadataResponse = consumer.send(req)
+  resp.topicsMetadata.find(_.topic == topic).flatMap { tm: 
TopicMetadata =
+tm.partitionsMetadata.find(_.partitionId == partition)
+  }.foreach { pm: PartitionMetadata =
+pm.leader.foreach { leader =
+  return Right((leader.host, leader.port))
+}
+  }
+}
+Left(errs)
+  }
+
+  def findLeaders(
+  topicAndPartitions: Set[TopicAndPartition]
+): Either[Err, Map[TopicAndPartition, (String, Int)]] = {
+val topics = topicAndPartitions.map(_.topic)
+val response = getPartitionMetadata(topics).right
+val answer = response.flatMap { tms: Set[TopicMetadata] =
+  val leaderMap = tms.flatMap { tm: TopicMetadata =
+tm.partitionsMetadata.flatMap { pm: PartitionMetadata =
+  val tp = TopicAndPartition(tm.topic, pm.partitionId)
+  if (topicAndPartitions(tp)) {
+pm.leader.map { l =
+  tp - (l.host - l.port)
+}
+  } else {
+None
+  }
+}
+  }.toMap
+
+  if (leaderMap.keys.size == topicAndPartitions.size) {
+Right(leaderMap)
+  } else {
+val 

[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-03 Thread dibbhatt
Github user dibbhatt commented on a diff in the pull request:

https://github.com/apache/spark/pull/3798#discussion_r24001371
  
--- Diff: 
external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala
 ---
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import scala.util.Random
+
+import kafka.serializer.StringDecoder
+import kafka.common.TopicAndPartition
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark._
+import org.apache.spark.SparkContext._
+
+class KafkaRDDSuite extends KafkaStreamSuiteBase with BeforeAndAfter {
+  var sc: SparkContext = _
+  before {
+setupKafka()
+  }
+
+  after {
+if (sc != null) {
+  sc.stop
+  sc = null
+}
+tearDownKafka()
+  }
+
+  test("Kafka RDD") {
+val sparkConf = new SparkConf().setMaster("local[4]").setAppName(this.getClass.getSimpleName)
+sc = new SparkContext(sparkConf)
+val topic = "topic1"
+val sent = Map("a" -> 5, "b" -> 3, "c" -> 10)
+createTopic(topic)
+produceAndSendMessage(topic, sent)
+
+val kafkaParams = Map("metadata.broker.list" -> s"localhost:$brokerPort",
+  "group.id" -> s"test-consumer-${Random.nextInt(1)}")
+
+val kc = new KafkaCluster(kafkaParams)
+
+val rdd = getRdd(kc, Set(topic))
+// this is the lots of messages case
+// make sure we get all of them
+assert(rdd.isDefined)
+assert(rdd.get.count === sent.values.sum)
+
+kc.setConsumerOffsets(
--- End diff --

Here you are committing the RDD offsets to ZK via the SimpleConsumer 
commitOffset call for the next getRdd call. If the commitOffset call fails, what 
should we do? Isn't that breaking the concept of keeping the offsets along with 
the RDD, as mentioned in the design document?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-03 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/3798#issuecomment-72709829
  
  [Test build #26658 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26658/consoleFull)
 for   PR 3798 at commit 
[`0df3ebe`](https://github.com/apache/spark/commit/0df3ebe1eed5b149c03a828db621fbc60edf).
 * This patch **passes all tests**.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `  class DeterministicKafkaInputDStreamCheckpointData extends 
DStreamCheckpointData(this) `
  * `class KafkaCluster(val kafkaParams: Map[String, String]) extends 
Serializable `
  * `  case class LeaderOffset(host: String, port: Int, offset: Long)`
  * `class KafkaRDDPartition(`
  * `trait HasOffsetRanges `



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-03 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/3798#issuecomment-72709851
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/26658/
Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-03 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/3798#discussion_r24021621
  
--- Diff: 
external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala
 ---
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import scala.util.control.NonFatal
+import scala.util.Random
+import scala.collection.mutable.ArrayBuffer
+import java.util.Properties
+import kafka.api._
+import kafka.common.{ErrorMapping, OffsetMetadataAndError, 
TopicAndPartition}
+import kafka.consumer.{ConsumerConfig, SimpleConsumer}
+import org.apache.spark.SparkException
+
+/**
+ * Convenience methods for interacting with a Kafka cluster.
+ * @param kafkaParams Kafka a 
href=http://kafka.apache.org/documentation.html#configuration;
+ * configuration parameters/a.
+ *   Requires metadata.broker.list or bootstrap.servers to be set with 
Kafka broker(s),
+ *   NOT zookeeper servers, specified in host1:port1,host2:port2 form
+ */
+private[spark]
+class KafkaCluster(val kafkaParams: Map[String, String]) extends 
Serializable {
+  import KafkaCluster.{Err, LeaderOffset}
+
+  val seedBrokers: Array[(String, Int)] =
+    kafkaParams.get("metadata.broker.list")
+      .orElse(kafkaParams.get("bootstrap.servers"))
+      .getOrElse(throw new SparkException("Must specify metadata.broker.list or bootstrap.servers"))
+      .split(",").map { hp =>
+        val hpa = hp.split(":")
+        (hpa(0), hpa(1).toInt)
+      }
+
+  // ConsumerConfig isn't serializable
+  @transient private var _config: ConsumerConfig = null
+
+  def config: ConsumerConfig = this.synchronized {
+    if (_config == null) {
+      _config = KafkaCluster.consumerConfig(kafkaParams)
+    }
+    _config
+  }
+
+  def connect(host: String, port: Int): SimpleConsumer =
+    new SimpleConsumer(host, port, config.socketTimeoutMs,
+      config.socketReceiveBufferBytes, config.clientId)
+
+  def connectLeader(topic: String, partition: Int): Either[Err, SimpleConsumer] =
+    findLeader(topic, partition).right.map(hp => connect(hp._1, hp._2))
+
+  // Metadata api
+  // scalastyle:off
+  // 
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-MetadataAPI
+  // scalastyle:on
+
+  def findLeader(topic: String, partition: Int): Either[Err, (String, Int)] = {
+    val req = TopicMetadataRequest(TopicMetadataRequest.CurrentVersion,
+      0, config.clientId, Seq(topic))
+    val errs = new Err
+    withBrokers(Random.shuffle(seedBrokers), errs) { consumer =>
+      val resp: TopicMetadataResponse = consumer.send(req)
+      resp.topicsMetadata.find(_.topic == topic).flatMap { tm: TopicMetadata =>
+        tm.partitionsMetadata.find(_.partitionId == partition)
+      }.foreach { pm: PartitionMetadata =>
+        pm.leader.foreach { leader =>
+          return Right((leader.host, leader.port))
+        }
+      }
+    }
+    Left(errs)
+  }
+
+  def findLeaders(
+      topicAndPartitions: Set[TopicAndPartition]
+    ): Either[Err, Map[TopicAndPartition, (String, Int)]] = {
+    val topics = topicAndPartitions.map(_.topic)
+    val response = getPartitionMetadata(topics).right
+    val answer = response.flatMap { tms: Set[TopicMetadata] =>
+      val leaderMap = tms.flatMap { tm: TopicMetadata =>
+        tm.partitionsMetadata.flatMap { pm: PartitionMetadata =>
+          val tp = TopicAndPartition(tm.topic, pm.partitionId)
+          if (topicAndPartitions(tp)) {
+            pm.leader.map { l =>
+              tp -> (l.host -> l.port)
+            }
+          } else {
+            None
+          }
+        }
+      }.toMap
+
+      if (leaderMap.keys.size == topicAndPartitions.size) {
+        Right(leaderMap)
+      } else {
+        val 

[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-03 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/3798#issuecomment-72696610
  
  [Test build #26658 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26658/consoleFull)
 for   PR 3798 at commit 
[`0df3ebe`](https://github.com/apache/spark/commit/0df3ebe1eed5b149c03a828db621fbc60edf).
 * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-03 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/3798#discussion_r24020938
  
--- Diff: 
external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala
 ---
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import scala.util.Random
+
+import org.scalatest.BeforeAndAfter
+import kafka.common.TopicAndPartition
+
+class KafkaClusterSuite extends KafkaStreamSuiteBase with BeforeAndAfter {
+  val brokerHost = "localhost"
+
+  val kafkaParams = Map("metadata.broker.list" -> s"$brokerHost:$brokerPort")
+
+  val kc = new KafkaCluster(kafkaParams)
+
+  val topic = "kcsuitetopic" + Random.nextInt(1)
+
+  val topicAndPartition = TopicAndPartition(topic, 0)
+
+  before {
+    setupKafka()
+    createTopic(topic)
+    produceAndSendMessage(topic, Map("a" -> 1))
+  }
+
+  after {
+    tearDownKafka()
+  }
+
+  test("metadata apis") {
+    val leader = kc.findLeaders(Set(topicAndPartition)).right.get
+    assert(leader(topicAndPartition) === (brokerHost, brokerPort), "didn't get leader")
+
+    val parts = kc.getPartitions(Set(topic)).right.get
+    assert(parts(topicAndPartition), "didn't get partitions")
+  }
+
+  test("leader offset apis") {
+    val earliest = kc.getEarliestLeaderOffsets(Set(topicAndPartition)).right.get
+    assert(earliest(topicAndPartition).offset === 0, "didn't get earliest")
+
+    val latest = kc.getLatestLeaderOffsets(Set(topicAndPartition)).right.get
+    assert(latest(topicAndPartition).offset === 1, "didn't get latest")
+  }
+
+  test("consumer offset apis") {
+    val group = "kcsuitegroup" + Random.nextInt(1)
+
+    val offset = Random.nextInt(1)
+
+    val set = kc.setConsumerOffsets(group, Map(topicAndPartition -> offset))
+    assert(set.isRight, "didn't set consumer offsets")
+
+
--- End diff --

I'm not sure exactly what the question here is, but this test is just 
verifying that the consumer offset apis work.  They aren't publicly exposed, so 
the question of how people might misuse them is somewhat premature.

That being said, the reason you'd typically want to use this api would be
for interop with existing Kafka monitoring tools that expect offsets in ZK.
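
As a rough sketch of that interop point (an editor's illustration, not code from this PR): once an idempotent or transactional write has succeeded, the just-processed offsets could be pushed back through these apis so that ZK-based lag monitors see progress. It assumes the private[spark] KafkaCluster and the setConsumerOffsets signature exercised in the test above; the group name and offset values are made up.

```
val kc = new KafkaCluster(kafkaParams)          // kafkaParams as in the suite above
val group = "monitoring-group"                  // hypothetical consumer group the monitors watch
val done = Map(TopicAndPartition("sometopic", 0) -> 42L)  // offsets just processed (illustrative)
kc.setConsumerOffsets(group, done) match {
  case Left(errs) => throw new SparkException(errs.mkString("\n"))
  case Right(_)   => ()   // progress is now visible to ZK-based monitoring tools
}
```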


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-03 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/3798#discussion_r24021039
  
--- Diff: 
external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala
 ---
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import scala.util.Random
+
+import kafka.serializer.StringDecoder
+import kafka.common.TopicAndPartition
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark._
+import org.apache.spark.SparkContext._
+
+class KafkaRDDSuite extends KafkaStreamSuiteBase with BeforeAndAfter {
+  var sc: SparkContext = _
+  before {
+    setupKafka()
+  }
+
+  after {
+    if (sc != null) {
+      sc.stop
+      sc = null
+    }
+    tearDownKafka()
+  }
+
+  test("Kafka RDD") {
+    val sparkConf = new SparkConf().setMaster("local[4]").setAppName(this.getClass.getSimpleName)
+    sc = new SparkContext(sparkConf)
+    val topic = "topic1"
+    val sent = Map("a" -> 5, "b" -> 3, "c" -> 10)
+    createTopic(topic)
+    produceAndSendMessage(topic, sent)
+
+    val kafkaParams = Map("metadata.broker.list" -> s"localhost:$brokerPort",
+      "group.id" -> s"test-consumer-${Random.nextInt(1)}")
+
+    val kc = new KafkaCluster(kafkaParams)
+
+    val rdd = getRdd(kc, Set(topic))
+    // this is the "lots of messages" case
+    // make sure we get all of them
+    assert(rdd.isDefined)
+    assert(rdd.get.count === sent.values.sum)
+
+    kc.setConsumerOffsets(
--- End diff --

See previous answer.

Also, there's nothing inherently wrong with keeping offsets in ZK for the 
idempotent case.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-03 Thread pwendell
Github user pwendell commented on a diff in the pull request:

https://github.com/apache/spark/pull/3798#discussion_r24029555
  
--- Diff: 
external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala 
---
@@ -144,4 +150,174 @@ object KafkaUtils {
 createStream[K, V, U, T](
   jssc.ssc, kafkaParams.toMap, 
Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
   }
+
+  /** A batch-oriented interface for consuming from Kafka.
+   * Starting and ending offsets are specified in advance,
+   * so that you can control exactly-once semantics.
+   * @param sc SparkContext object
+   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+   * configuration parameters</a>.
+   *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   * @param offsetRanges Each OffsetRange in the batch corresponds to a
+   *   range of offsets for a given Kafka topic/partition
+   */
+  @Experimental
+  def createRDD[
+    K: ClassTag,
+    V: ClassTag,
+    U <: Decoder[_]: ClassTag,
+    T <: Decoder[_]: ClassTag] (
+      sc: SparkContext,
+      kafkaParams: Map[String, String],
+      offsetRanges: Array[OffsetRange]
+  ): RDD[(K, V)] with HasOffsetRanges = {
+    val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)
+    val kc = new KafkaCluster(kafkaParams)
+    val topics = offsetRanges.map(o => TopicAndPartition(o.topic, o.partition)).toSet
+    val leaders = kc.findLeaders(topics).fold(
+      errs => throw new SparkException(errs.mkString("\n")),
+      ok => ok
+    )
+    new KafkaRDD[K, V, U, T, (K, V)](sc, kafkaParams, offsetRanges, leaders, messageHandler)
+  }
+
+  /** A batch-oriented interface for consuming from Kafka.
+   * Starting and ending offsets are specified in advance,
+   * so that you can control exactly-once semantics.
+   * @param sc SparkContext object
+   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+   * configuration parameters</a>.
+   *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   * @param offsetRanges Each OffsetRange in the batch corresponds to a
+   *   range of offsets for a given Kafka topic/partition
+   * @param leaders Kafka leaders for each offset range in batch
+   * @param messageHandler function for translating each message into the 
desired type
+   */
+  @Experimental
+  def createRDD[
+    K: ClassTag,
+    V: ClassTag,
+    U <: Decoder[_]: ClassTag,
+    T <: Decoder[_]: ClassTag,
+    R: ClassTag] (
+      sc: SparkContext,
+      kafkaParams: Map[String, String],
+      offsetRanges: Array[OffsetRange],
+      leaders: Array[Leader],
+      messageHandler: MessageAndMetadata[K, V] => R
+  ): RDD[R] with HasOffsetRanges = {
+
+    val leaderMap = leaders
+      .map(l => TopicAndPartition(l.topic, l.partition) -> (l.host, l.port))
+      .toMap
+    new KafkaRDD[K, V, U, T, R](sc, kafkaParams, offsetRanges, leaderMap, messageHandler)
+  }
+
+  /**
+   * This stream can guarantee that each message from Kafka is included in 
transformations
+   * (as opposed to output actions) exactly once, even in most failure 
situations.
+   *
+   * Points to note:
+   *
+   * Failure Recovery - You must checkpoint this stream, or save offsets 
yourself and provide them
+   * as the fromOffsets parameter on restart.
+   * Kafka must have sufficient log retention to obtain messages after 
failure.
+   *
+   * Getting offsets from the stream - see programming guide
+   *
+   * Zookeeper - This does not use Zookeeper to store offsets.  For interop with Kafka monitors
+   * that depend on Zookeeper, you must store offsets in ZK yourself.
+   *
+   * End-to-end semantics - This does not guarantee that any output 
operation will push each record
+   * exactly once. To ensure end-to-end exactly-once semantics (that is, 
receiving exactly once and
+   * outputting exactly once), you have to either ensure that the output 
operation is
+   * idempotent, or transactionally store offsets with the output. See the 
programming guide for
+   * more details.
+   *
+   * @param ssc StreamingContext object
+   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+   * configuration parameters</a>.
+   *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+   *   NOT zookeeper servers, specified in 

[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-03 Thread pwendell
Github user pwendell commented on a diff in the pull request:

https://github.com/apache/spark/pull/3798#discussion_r24030535
  
--- Diff: 
external/kafka/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala
 ---
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import kafka.common.TopicAndPartition
+
+/** Something that has a collection of OffsetRanges */
+trait HasOffsetRanges {
+  def offsetRanges: Array[OffsetRange]
+}
+
+/** Represents a range of offsets from a single Kafka TopicAndPartition */
+final class OffsetRange private(
+    /** kafka topic name */
+    val topic: String,
+    /** kafka partition id */
+    val partition: Int,
+    /** inclusive starting offset */
+    val fromOffset: Long,
+    /** exclusive ending offset */
+    val untilOffset: Long) extends Serializable {
+  import OffsetRange.OffsetRangeTuple
+
+  /** this is to avoid ClassNotFoundException during checkpoint restore */
--- End diff --

Created this so we don't lose track: 
https://issues.apache.org/jira/browse/SPARK-5569


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-03 Thread pwendell
Github user pwendell commented on a diff in the pull request:

https://github.com/apache/spark/pull/3798#discussion_r24029025
  
--- Diff: 
external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala 
---
@@ -144,4 +150,174 @@ object KafkaUtils {
 createStream[K, V, U, T](
   jssc.ssc, kafkaParams.toMap, 
Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
   }
+
+  /** A batch-oriented interface for consuming from Kafka.
+   * Starting and ending offsets are specified in advance,
+   * so that you can control exactly-once semantics.
+   * @param sc SparkContext object
+   * @param kafkaParams Kafka a 
href=http://kafka.apache.org/documentation.html#configuration;
+   * configuration parameters/a.
+   *   Requires metadata.broker.list or bootstrap.servers to be set 
with Kafka broker(s),
+   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   * @param offsetRanges Each OffsetRange in the batch corresponds to a
+   *   range of offsets for a given Kafka topic/partition
+   */
+  @Experimental
+  def createRDD[
+K: ClassTag,
+V: ClassTag,
+U : Decoder[_]: ClassTag,
+T : Decoder[_]: ClassTag] (
+  sc: SparkContext,
+  kafkaParams: Map[String, String],
+  offsetRanges: Array[OffsetRange]
+  ): RDD[(K, V)] with HasOffsetRanges = {
--- End diff --

It's just confusing though because the bytecode return type does not
reflect the presence of the trait. I guess it is embedded in the ScalaSig for
the function. In any case, let me talk to TD, because this is an issue wrt
binary compatibility.
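
To make the concern concrete (editor's sketch, not code from the PR): with the refined return type, a Scala caller can use the trait directly, while a Java caller only sees the erased RDD and has to cast. Type arguments and variable names below are illustrative; it assumes StringDecoder for both key and value.

```
// Scala: the refined type RDD[(String, String)] with HasOffsetRanges is visible,
// so offsetRanges is available without a cast.
val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
  sc, kafkaParams, offsetRanges)
rdd.offsetRanges.foreach { r =>
  println(s"${r.topic}-${r.partition}: ${r.fromOffset} to ${r.untilOffset}")
}

// Java: the erased signature is just an RDD, so the same information needs a cast,
// e.g. ((HasOffsetRanges) rdd).offsetRanges(), which is the compatibility worry here.
```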


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-03 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/3798#discussion_r24030347
  
--- Diff: 
external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala 
---
@@ -144,4 +150,174 @@ object KafkaUtils {
 createStream[K, V, U, T](
   jssc.ssc, kafkaParams.toMap, 
Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
   }
+
+  /** A batch-oriented interface for consuming from Kafka.
+   * Starting and ending offsets are specified in advance,
+   * so that you can control exactly-once semantics.
+   * @param sc SparkContext object
+   * @param kafkaParams Kafka a 
href=http://kafka.apache.org/documentation.html#configuration;
+   * configuration parameters/a.
+   *   Requires metadata.broker.list or bootstrap.servers to be set 
with Kafka broker(s),
+   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   * @param offsetRanges Each OffsetRange in the batch corresponds to a
+   *   range of offsets for a given Kafka topic/partition
+   */
+  @Experimental
+  def createRDD[
+K: ClassTag,
+V: ClassTag,
+U : Decoder[_]: ClassTag,
+T : Decoder[_]: ClassTag] (
+  sc: SparkContext,
+  kafkaParams: Map[String, String],
+  offsetRanges: Array[OffsetRange]
+  ): RDD[(K, V)] with HasOffsetRanges = {
+val messageHandler = (mmd: MessageAndMetadata[K, V]) = (mmd.key, 
mmd.message)
+val kc = new KafkaCluster(kafkaParams)
+val topics = offsetRanges.map(o = TopicAndPartition(o.topic, 
o.partition)).toSet
+val leaders = kc.findLeaders(topics).fold(
+  errs = throw new SparkException(errs.mkString(\n)),
+  ok = ok
+)
+new KafkaRDD[K, V, U, T, (K, V)](sc, kafkaParams, offsetRanges, 
leaders, messageHandler)
+  }
+
+  /** A batch-oriented interface for consuming from Kafka.
+   * Starting and ending offsets are specified in advance,
+   * so that you can control exactly-once semantics.
+   * @param sc SparkContext object
+   * @param kafkaParams Kafka a 
href=http://kafka.apache.org/documentation.html#configuration;
+   * configuration parameters/a.
+   *   Requires metadata.broker.list or bootstrap.servers to be set 
with Kafka broker(s),
+   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   * @param offsetRanges Each OffsetRange in the batch corresponds to a
+   *   range of offsets for a given Kafka topic/partition
+   * @param leaders Kafka leaders for each offset range in batch
+   * @param messageHandler function for translating each message into the 
desired type
+   */
+  @Experimental
+  def createRDD[
+K: ClassTag,
+V: ClassTag,
+U : Decoder[_]: ClassTag,
+T : Decoder[_]: ClassTag,
+R: ClassTag] (
+  sc: SparkContext,
+  kafkaParams: Map[String, String],
+  offsetRanges: Array[OffsetRange],
+  leaders: Array[Leader],
+  messageHandler: MessageAndMetadata[K, V] => R
+  ): RDD[R] with HasOffsetRanges = {
+
+val leaderMap = leaders
+  .map(l = TopicAndPartition(l.topic, l.partition) - (l.host, 
l.port))
+  .toMap
+new KafkaRDD[K, V, U, T, R](sc, kafkaParams, offsetRanges, leaderMap, 
messageHandler)
+  }
+
+  /**
+   * This stream can guarantee that each message from Kafka is included in 
transformations
+   * (as opposed to output actions) exactly once, even in most failure 
situations.
+   *
+   * Points to note:
+   *
+   * Failure Recovery - You must checkpoint this stream, or save offsets 
yourself and provide them
+   * as the fromOffsets parameter on restart.
+   * Kafka must have sufficient log retention to obtain messages after 
failure.
+   *
+   * Getting offsets from the stream - see programming guide
+   *
+.  * Zookeeper - This does not use Zookeeper to store offsets.  For 
interop with Kafka monitors
+   * that depend on Zookeeper, you must store offsets in ZK yourself.
+   *
+   * End-to-end semantics - This does not guarantee that any output 
operation will push each record
+   * exactly once. To ensure end-to-end exactly-once semantics (that is, 
receiving exactly once and
+   * outputting exactly once), you have to either ensure that the output 
operation is
+   * idempotent, or transactionally store offsets with the output. See the 
programming guide for
+   * more details.
+   *
+   * @param ssc StreamingContext object
+   * @param kafkaParams Kafka a 
href=http://kafka.apache.org/documentation.html#configuration;
+   * configuration parameters/a.
+   *   Requires metadata.broker.list or bootstrap.servers to be set 
with Kafka broker(s),
+   *   NOT zookeeper servers, specified in 

[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-03 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/3798#discussion_r24031550
  
--- Diff: 
external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala 
---
@@ -144,4 +150,249 @@ object KafkaUtils {
 createStream[K, V, U, T](
   jssc.ssc, kafkaParams.toMap, 
Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
   }
+
+  /** A batch-oriented interface for consuming from Kafka.
+   * Starting and ending offsets are specified in advance,
+   * so that you can control exactly-once semantics.
+   * @param sc SparkContext object
+   * @param kafkaParams Kafka a 
href=http://kafka.apache.org/documentation.html#configuration;
+   * configuration parameters/a.
+   *   Requires metadata.broker.list or bootstrap.servers to be set 
with Kafka broker(s),
+   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   * @param batch Each OffsetRange in the batch corresponds to a
+   *   range of offsets for a given Kafka topic/partition
+   */
+  @Experimental
+  def createRDD[
+K: ClassTag,
+V: ClassTag,
+U : Decoder[_]: ClassTag,
+T : Decoder[_]: ClassTag,
+R: ClassTag] (
--- End diff --

The comment about unused R was referring to a prior version of the PR that
had a copy-pasted type-level R parameter even for the version that returned
RDD[(K, V)].  Github probably just got confused because the comment wasn't
attached to the particular line in question.

Pretty sure things are correct at this point.

On Tue, Feb 3, 2015 at 1:05 PM, Patrick Wendell notificati...@github.com
wrote:

 In
 
external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
 https://github.com/apache/spark/pull/3798#discussion_r24029463:

  +   * so that you can control exactly-once semantics.
  +   * @param sc SparkContext object
  +   * @param kafkaParams Kafka a 
href=http://kafka.apache.org/documentation.html#configuration;
  +   * configuration parameters/a.
  +   *   Requires metadata.broker.list or bootstrap.servers to be 
set with Kafka broker(s),
  +   *   NOT zookeeper servers, specified in host1:port1,host2:port2 
form.
  +   * @param batch Each OffsetRange in the batch corresponds to a
  +   *   range of offsets for a given Kafka topic/partition
  +   */
  +  @Experimental
  +  def createRDD[
  +K: ClassTag,
  +V: ClassTag,
  +U : Decoder[_]: ClassTag,
  +T : Decoder[_]: ClassTag,
  +R: ClassTag] (

 Yeah, makes sense. But the comment here suggests R is not used, however,
 I see R being used in the return type. So that was my point.

 —
 Reply to this email directly or view it on GitHub
 https://github.com/apache/spark/pull/3798/files#r24029463.
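
To spell out the role of R as the overloads now stand (editor's sketch, with illustrative type arguments): without a messageHandler the element type is fixed to (K, V), while with a messageHandler the extra R parameter is whatever the handler produces. It assumes StringDecoder for key and value and a `leaders` array built elsewhere.

```
// Without a messageHandler: R does not appear, elements are (key, value) pairs.
val pairs = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
  sc, kafkaParams, offsetRanges)                               // RDD[(String, String)]

// With a messageHandler: R is whatever the handler returns, here just the payload.
val values = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder, String](
  sc, kafkaParams, offsetRanges, leaders,
  (mmd: MessageAndMetadata[String, String]) => mmd.message)    // RDD[String]
```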



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-03 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/3798#discussion_r24028967
  
--- Diff: 
external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala 
---
@@ -144,4 +150,174 @@ object KafkaUtils {
 createStream[K, V, U, T](
   jssc.ssc, kafkaParams.toMap, 
Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
   }
+
+  /** A batch-oriented interface for consuming from Kafka.
+   * Starting and ending offsets are specified in advance,
+   * so that you can control exactly-once semantics.
+   * @param sc SparkContext object
+   * @param kafkaParams Kafka a 
href=http://kafka.apache.org/documentation.html#configuration;
+   * configuration parameters/a.
+   *   Requires metadata.broker.list or bootstrap.servers to be set 
with Kafka broker(s),
+   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   * @param offsetRanges Each OffsetRange in the batch corresponds to a
+   *   range of offsets for a given Kafka topic/partition
+   */
+  @Experimental
+  def createRDD[
+K: ClassTag,
+V: ClassTag,
+U : Decoder[_]: ClassTag,
+T : Decoder[_]: ClassTag] (
+  sc: SparkContext,
+  kafkaParams: Map[String, String],
+  offsetRanges: Array[OffsetRange]
+  ): RDD[(K, V)] with HasOffsetRanges = {
+val messageHandler = (mmd: MessageAndMetadata[K, V]) = (mmd.key, 
mmd.message)
+val kc = new KafkaCluster(kafkaParams)
+val topics = offsetRanges.map(o = TopicAndPartition(o.topic, 
o.partition)).toSet
+val leaders = kc.findLeaders(topics).fold(
+  errs = throw new SparkException(errs.mkString(\n)),
+  ok = ok
+)
+new KafkaRDD[K, V, U, T, (K, V)](sc, kafkaParams, offsetRanges, 
leaders, messageHandler)
+  }
+
+  /** A batch-oriented interface for consuming from Kafka.
+   * Starting and ending offsets are specified in advance,
+   * so that you can control exactly-once semantics.
+   * @param sc SparkContext object
+   * @param kafkaParams Kafka a 
href=http://kafka.apache.org/documentation.html#configuration;
+   * configuration parameters/a.
+   *   Requires metadata.broker.list or bootstrap.servers to be set 
with Kafka broker(s),
+   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   * @param offsetRanges Each OffsetRange in the batch corresponds to a
+   *   range of offsets for a given Kafka topic/partition
+   * @param leaders Kafka leaders for each offset range in batch
+   * @param messageHandler function for translating each message into the 
desired type
+   */
+  @Experimental
+  def createRDD[
+K: ClassTag,
+V: ClassTag,
+U : Decoder[_]: ClassTag,
+T : Decoder[_]: ClassTag,
+R: ClassTag] (
+  sc: SparkContext,
+  kafkaParams: Map[String, String],
+  offsetRanges: Array[OffsetRange],
+  leaders: Array[Leader],
+  messageHandler: MessageAndMetadata[K, V] => R
+  ): RDD[R] with HasOffsetRanges = {
+
+val leaderMap = leaders
+  .map(l = TopicAndPartition(l.topic, l.partition) - (l.host, 
l.port))
+  .toMap
+new KafkaRDD[K, V, U, T, R](sc, kafkaParams, offsetRanges, leaderMap, 
messageHandler)
+  }
+
+  /**
+   * This stream can guarantee that each message from Kafka is included in 
transformations
--- End diff --

That applies to people who already know what createStream does. But think
about the people who are looking at KafkaUtils for the first time and happen
to look at this method first. Unless the first sentence conveys what this
does, they will get extremely confused.

As a developer, the first question I need answered is "what does this do".
"What does this do differently from the other one" comes second, and only
after I know what this one does.
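
For instance, a first sentence along the following lines would answer "what does this do" before making any comparison (an editor's illustration of the point, not the wording that ended up in the PR):

```
/**
 * Create an input stream that pulls messages directly from Kafka brokers,
 * tracking the offset range consumed in each batch so that every message is
 * included in transformations exactly once, even after most failures.
 */
```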


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-03 Thread pwendell
Github user pwendell commented on a diff in the pull request:

https://github.com/apache/spark/pull/3798#discussion_r24029286
  
--- Diff: 
external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala 
---
@@ -144,4 +150,174 @@ object KafkaUtils {
 createStream[K, V, U, T](
   jssc.ssc, kafkaParams.toMap, 
Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
   }
+
+  /** A batch-oriented interface for consuming from Kafka.
+   * Starting and ending offsets are specified in advance,
+   * so that you can control exactly-once semantics.
+   * @param sc SparkContext object
+   * @param kafkaParams Kafka a 
href=http://kafka.apache.org/documentation.html#configuration;
+   * configuration parameters/a.
+   *   Requires metadata.broker.list or bootstrap.servers to be set 
with Kafka broker(s),
+   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   * @param offsetRanges Each OffsetRange in the batch corresponds to a
+   *   range of offsets for a given Kafka topic/partition
+   */
+  @Experimental
+  def createRDD[
+K: ClassTag,
+V: ClassTag,
+U : Decoder[_]: ClassTag,
+T : Decoder[_]: ClassTag] (
+  sc: SparkContext,
+  kafkaParams: Map[String, String],
+  offsetRanges: Array[OffsetRange]
+  ): RDD[(K, V)] with HasOffsetRanges = {
+val messageHandler = (mmd: MessageAndMetadata[K, V]) = (mmd.key, 
mmd.message)
+val kc = new KafkaCluster(kafkaParams)
+val topics = offsetRanges.map(o = TopicAndPartition(o.topic, 
o.partition)).toSet
+val leaders = kc.findLeaders(topics).fold(
+  errs = throw new SparkException(errs.mkString(\n)),
+  ok = ok
+)
+new KafkaRDD[K, V, U, T, (K, V)](sc, kafkaParams, offsetRanges, 
leaders, messageHandler)
+  }
+
+  /** A batch-oriented interface for consuming from Kafka.
+   * Starting and ending offsets are specified in advance,
+   * so that you can control exactly-once semantics.
+   * @param sc SparkContext object
+   * @param kafkaParams Kafka a 
href=http://kafka.apache.org/documentation.html#configuration;
+   * configuration parameters/a.
+   *   Requires metadata.broker.list or bootstrap.servers to be set 
with Kafka broker(s),
+   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   * @param offsetRanges Each OffsetRange in the batch corresponds to a
+   *   range of offsets for a given Kafka topic/partition
+   * @param leaders Kafka leaders for each offset range in batch
+   * @param messageHandler function for translating each message into the 
desired type
+   */
+  @Experimental
+  def createRDD[
+K: ClassTag,
+V: ClassTag,
+U : Decoder[_]: ClassTag,
+T : Decoder[_]: ClassTag,
+R: ClassTag] (
+  sc: SparkContext,
+  kafkaParams: Map[String, String],
+  offsetRanges: Array[OffsetRange],
+  leaders: Array[Leader],
+  messageHandler: MessageAndMetadata[K, V] => R
+  ): RDD[R] with HasOffsetRanges = {
+
+val leaderMap = leaders
+  .map(l = TopicAndPartition(l.topic, l.partition) - (l.host, 
l.port))
+  .toMap
+new KafkaRDD[K, V, U, T, R](sc, kafkaParams, offsetRanges, leaderMap, 
messageHandler)
+  }
+
+  /**
+   * This stream can guarantee that each message from Kafka is included in 
transformations
+   * (as opposed to output actions) exactly once, even in most failure 
situations.
+   *
+   * Points to note:
+   *
+   * Failure Recovery - You must checkpoint this stream, or save offsets 
yourself and provide them
+   * as the fromOffsets parameter on restart.
+   * Kafka must have sufficient log retention to obtain messages after 
failure.
+   *
+   * Getting offsets from the stream - see programming guide
+   *
+.  * Zookeeper - This does not use Zookeeper to store offsets.  For 
interop with Kafka monitors
+   * that depend on Zookeeper, you must store offsets in ZK yourself.
+   *
+   * End-to-end semantics - This does not guarantee that any output 
operation will push each record
+   * exactly once. To ensure end-to-end exactly-once semantics (that is, 
receiving exactly once and
+   * outputting exactly once), you have to either ensure that the output 
operation is
+   * idempotent, or transactionally store offsets with the output. See the 
programming guide for
+   * more details.
+   *
+   * @param ssc StreamingContext object
+   * @param kafkaParams Kafka a 
href=http://kafka.apache.org/documentation.html#configuration;
+   * configuration parameters/a.
+   *   Requires metadata.broker.list or bootstrap.servers to be set 
with Kafka broker(s),
+   *   NOT zookeeper servers, specified in 

[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-03 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/3798#discussion_r24033509
  
--- Diff: 
external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala 
---
@@ -144,4 +150,174 @@ object KafkaUtils {
 createStream[K, V, U, T](
   jssc.ssc, kafkaParams.toMap, 
Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
   }
+
+  /** A batch-oriented interface for consuming from Kafka.
+   * Starting and ending offsets are specified in advance,
+   * so that you can control exactly-once semantics.
+   * @param sc SparkContext object
+   * @param kafkaParams Kafka a 
href=http://kafka.apache.org/documentation.html#configuration;
+   * configuration parameters/a.
+   *   Requires metadata.broker.list or bootstrap.servers to be set 
with Kafka broker(s),
+   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   * @param offsetRanges Each OffsetRange in the batch corresponds to a
+   *   range of offsets for a given Kafka topic/partition
+   */
+  @Experimental
+  def createRDD[
+K: ClassTag,
+V: ClassTag,
+U : Decoder[_]: ClassTag,
+T : Decoder[_]: ClassTag] (
+  sc: SparkContext,
+  kafkaParams: Map[String, String],
+  offsetRanges: Array[OffsetRange]
+  ): RDD[(K, V)] with HasOffsetRanges = {
+val messageHandler = (mmd: MessageAndMetadata[K, V]) = (mmd.key, 
mmd.message)
+val kc = new KafkaCluster(kafkaParams)
+val topics = offsetRanges.map(o = TopicAndPartition(o.topic, 
o.partition)).toSet
+val leaders = kc.findLeaders(topics).fold(
+  errs = throw new SparkException(errs.mkString(\n)),
+  ok = ok
+)
+new KafkaRDD[K, V, U, T, (K, V)](sc, kafkaParams, offsetRanges, 
leaders, messageHandler)
+  }
+
+  /** A batch-oriented interface for consuming from Kafka.
+   * Starting and ending offsets are specified in advance,
+   * so that you can control exactly-once semantics.
+   * @param sc SparkContext object
+   * @param kafkaParams Kafka a 
href=http://kafka.apache.org/documentation.html#configuration;
+   * configuration parameters/a.
+   *   Requires metadata.broker.list or bootstrap.servers to be set 
with Kafka broker(s),
+   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   * @param offsetRanges Each OffsetRange in the batch corresponds to a
+   *   range of offsets for a given Kafka topic/partition
+   * @param leaders Kafka leaders for each offset range in batch
+   * @param messageHandler function for translating each message into the 
desired type
+   */
+  @Experimental
+  def createRDD[
+K: ClassTag,
+V: ClassTag,
+U : Decoder[_]: ClassTag,
+T : Decoder[_]: ClassTag,
+R: ClassTag] (
+  sc: SparkContext,
+  kafkaParams: Map[String, String],
+  offsetRanges: Array[OffsetRange],
+  leaders: Array[Leader],
+  messageHandler: MessageAndMetadata[K, V] => R
+  ): RDD[R] with HasOffsetRanges = {
+
+val leaderMap = leaders
+  .map(l = TopicAndPartition(l.topic, l.partition) - (l.host, 
l.port))
+  .toMap
+new KafkaRDD[K, V, U, T, R](sc, kafkaParams, offsetRanges, leaderMap, 
messageHandler)
+  }
+
+  /**
+   * This stream can guarantee that each message from Kafka is included in 
transformations
--- End diff --

The documentation has already been changed several times since your 
previous comments.  The current version of it doesn't make any comparison to 
existing createStream calls.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-03 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/3798#discussion_r24028643
  
--- Diff: 
external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala 
---
@@ -144,4 +150,174 @@ object KafkaUtils {
 createStream[K, V, U, T](
   jssc.ssc, kafkaParams.toMap, 
Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
   }
+
+  /** A batch-oriented interface for consuming from Kafka.
+   * Starting and ending offsets are specified in advance,
+   * so that you can control exactly-once semantics.
+   * @param sc SparkContext object
+   * @param kafkaParams Kafka a 
href=http://kafka.apache.org/documentation.html#configuration;
+   * configuration parameters/a.
+   *   Requires metadata.broker.list or bootstrap.servers to be set 
with Kafka broker(s),
+   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   * @param offsetRanges Each OffsetRange in the batch corresponds to a
+   *   range of offsets for a given Kafka topic/partition
+   */
+  @Experimental
+  def createRDD[
+K: ClassTag,
+V: ClassTag,
+U : Decoder[_]: ClassTag,
+T : Decoder[_]: ClassTag] (
+  sc: SparkContext,
+  kafkaParams: Map[String, String],
+  offsetRanges: Array[OffsetRange]
+  ): RDD[(K, V)] with HasOffsetRanges = {
+val messageHandler = (mmd: MessageAndMetadata[K, V]) = (mmd.key, 
mmd.message)
+val kc = new KafkaCluster(kafkaParams)
+val topics = offsetRanges.map(o = TopicAndPartition(o.topic, 
o.partition)).toSet
+val leaders = kc.findLeaders(topics).fold(
+  errs = throw new SparkException(errs.mkString(\n)),
+  ok = ok
+)
+new KafkaRDD[K, V, U, T, (K, V)](sc, kafkaParams, offsetRanges, 
leaders, messageHandler)
+  }
+
+  /** A batch-oriented interface for consuming from Kafka.
+   * Starting and ending offsets are specified in advance,
+   * so that you can control exactly-once semantics.
+   * @param sc SparkContext object
+   * @param kafkaParams Kafka a 
href=http://kafka.apache.org/documentation.html#configuration;
+   * configuration parameters/a.
+   *   Requires metadata.broker.list or bootstrap.servers to be set 
with Kafka broker(s),
+   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   * @param offsetRanges Each OffsetRange in the batch corresponds to a
+   *   range of offsets for a given Kafka topic/partition
+   * @param leaders Kafka leaders for each offset range in batch
+   * @param messageHandler function for translating each message into the 
desired type
+   */
+  @Experimental
+  def createRDD[
+K: ClassTag,
+V: ClassTag,
+U : Decoder[_]: ClassTag,
+T : Decoder[_]: ClassTag,
+R: ClassTag] (
+  sc: SparkContext,
+  kafkaParams: Map[String, String],
+  offsetRanges: Array[OffsetRange],
+  leaders: Array[Leader],
+  messageHandler: MessageAndMetadata[K, V] => R
+  ): RDD[R] with HasOffsetRanges = {
+
+val leaderMap = leaders
+  .map(l = TopicAndPartition(l.topic, l.partition) - (l.host, 
l.port))
+  .toMap
+new KafkaRDD[K, V, U, T, R](sc, kafkaParams, offsetRanges, leaderMap, 
messageHandler)
+  }
+
+  /**
+   * This stream can guarantee that each message from Kafka is included in 
transformations
+   * (as opposed to output actions) exactly once, even in most failure 
situations.
+   *
+   * Points to note:
+   *
+   * Failure Recovery - You must checkpoint this stream, or save offsets 
yourself and provide them
+   * as the fromOffsets parameter on restart.
+   * Kafka must have sufficient log retention to obtain messages after 
failure.
+   *
+   * Getting offsets from the stream - see programming guide
+   *
+.  * Zookeeper - This does not use Zookeeper to store offsets.  For 
interop with Kafka monitors
+   * that depend on Zookeeper, you must store offsets in ZK yourself.
+   *
+   * End-to-end semantics - This does not guarantee that any output 
operation will push each record
+   * exactly once. To ensure end-to-end exactly-once semantics (that is, 
receiving exactly once and
+   * outputting exactly once), you have to either ensure that the output 
operation is
+   * idempotent, or transactionally store offsets with the output. See the 
programming guide for
+   * more details.
+   *
+   * @param ssc StreamingContext object
+   * @param kafkaParams Kafka a 
href=http://kafka.apache.org/documentation.html#configuration;
+   * configuration parameters/a.
+   *   Requires metadata.broker.list or bootstrap.servers to be set 
with Kafka broker(s),
+   *   NOT zookeeper servers, specified in 

[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-03 Thread pwendell
Github user pwendell commented on a diff in the pull request:

https://github.com/apache/spark/pull/3798#discussion_r24029463
  
--- Diff: 
external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala 
---
@@ -144,4 +150,249 @@ object KafkaUtils {
 createStream[K, V, U, T](
   jssc.ssc, kafkaParams.toMap, 
Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
   }
+
+  /** A batch-oriented interface for consuming from Kafka.
+   * Starting and ending offsets are specified in advance,
+   * so that you can control exactly-once semantics.
+   * @param sc SparkContext object
+   * @param kafkaParams Kafka a 
href=http://kafka.apache.org/documentation.html#configuration;
+   * configuration parameters/a.
+   *   Requires metadata.broker.list or bootstrap.servers to be set 
with Kafka broker(s),
+   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   * @param batch Each OffsetRange in the batch corresponds to a
+   *   range of offsets for a given Kafka topic/partition
+   */
+  @Experimental
+  def createRDD[
+K: ClassTag,
+V: ClassTag,
+U : Decoder[_]: ClassTag,
+T : Decoder[_]: ClassTag,
+R: ClassTag] (
--- End diff --

Yeah, makes sense. But the comment here suggests `R` is not used, however, 
I see `R` being used in the return type. So that was my point.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-03 Thread pwendell
Github user pwendell commented on a diff in the pull request:

https://github.com/apache/spark/pull/3798#discussion_r24029652
  
--- Diff: 
external/kafka/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala
 ---
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import kafka.common.TopicAndPartition
+
+/** Something that has a collection of OffsetRanges */
+trait HasOffsetRanges {
+  def offsetRanges: Array[OffsetRange]
+}
+
+/** Represents a range of offsets from a single Kafka TopicAndPartition */
+final class OffsetRange private(
+/** kafka topic name */
+val topic: String,
+/** kafka partition id */
+val partition: Int,
+/** inclusive starting offset */
+val fromOffset: Long,
+/** exclusive ending offset */
+val untilOffset: Long) extends Serializable {
+  import OffsetRange.OffsetRangeTuple
+
+  /** this is to avoid ClassNotFoundException during checkpoint restore */
--- End diff --

Thanks cody - I am going to create a JIRA to document this since it's not 
obvious, not sure if it's a bug or a feature.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-03 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/3798#discussion_r24031208
  
--- Diff: 
external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala 
---
@@ -144,4 +150,174 @@ object KafkaUtils {
 createStream[K, V, U, T](
   jssc.ssc, kafkaParams.toMap, 
Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
   }
+
+  /** A batch-oriented interface for consuming from Kafka.
+   * Starting and ending offsets are specified in advance,
+   * so that you can control exactly-once semantics.
+   * @param sc SparkContext object
+   * @param kafkaParams Kafka a 
href=http://kafka.apache.org/documentation.html#configuration;
+   * configuration parameters/a.
+   *   Requires metadata.broker.list or bootstrap.servers to be set 
with Kafka broker(s),
+   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   * @param offsetRanges Each OffsetRange in the batch corresponds to a
+   *   range of offsets for a given Kafka topic/partition
+   */
+  @Experimental
+  def createRDD[
+K: ClassTag,
+V: ClassTag,
+U : Decoder[_]: ClassTag,
+T : Decoder[_]: ClassTag] (
+  sc: SparkContext,
+  kafkaParams: Map[String, String],
+  offsetRanges: Array[OffsetRange]
+  ): RDD[(K, V)] with HasOffsetRanges = {
+val messageHandler = (mmd: MessageAndMetadata[K, V]) = (mmd.key, 
mmd.message)
+val kc = new KafkaCluster(kafkaParams)
+val topics = offsetRanges.map(o = TopicAndPartition(o.topic, 
o.partition)).toSet
+val leaders = kc.findLeaders(topics).fold(
+  errs = throw new SparkException(errs.mkString(\n)),
+  ok = ok
+)
+new KafkaRDD[K, V, U, T, (K, V)](sc, kafkaParams, offsetRanges, 
leaders, messageHandler)
+  }
+
+  /** A batch-oriented interface for consuming from Kafka.
+   * Starting and ending offsets are specified in advance,
+   * so that you can control exactly-once semantics.
+   * @param sc SparkContext object
+   * @param kafkaParams Kafka a 
href=http://kafka.apache.org/documentation.html#configuration;
+   * configuration parameters/a.
+   *   Requires metadata.broker.list or bootstrap.servers to be set 
with Kafka broker(s),
+   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   * @param offsetRanges Each OffsetRange in the batch corresponds to a
+   *   range of offsets for a given Kafka topic/partition
+   * @param leaders Kafka leaders for each offset range in batch
+   * @param messageHandler function for translating each message into the 
desired type
+   */
+  @Experimental
+  def createRDD[
+K: ClassTag,
+V: ClassTag,
+U : Decoder[_]: ClassTag,
+T : Decoder[_]: ClassTag,
+R: ClassTag] (
+  sc: SparkContext,
+  kafkaParams: Map[String, String],
+  offsetRanges: Array[OffsetRange],
+  leaders: Array[Leader],
+  messageHandler: MessageAndMetadata[K, V] => R
+  ): RDD[R] with HasOffsetRanges = {
+
+val leaderMap = leaders
+  .map(l = TopicAndPartition(l.topic, l.partition) - (l.host, 
l.port))
+  .toMap
+new KafkaRDD[K, V, U, T, R](sc, kafkaParams, offsetRanges, leaderMap, 
messageHandler)
+  }
+
+  /**
+   * This stream can guarantee that each message from Kafka is included in 
transformations
+   * (as opposed to output actions) exactly once, even in most failure 
situations.
+   *
+   * Points to note:
+   *
+   * Failure Recovery - You must checkpoint this stream, or save offsets 
yourself and provide them
+   * as the fromOffsets parameter on restart.
+   * Kafka must have sufficient log retention to obtain messages after 
failure.
+   *
+   * Getting offsets from the stream - see programming guide
+   *
+.  * Zookeeper - This does not use Zookeeper to store offsets.  For 
interop with Kafka monitors
+   * that depend on Zookeeper, you must store offsets in ZK yourself.
+   *
+   * End-to-end semantics - This does not guarantee that any output 
operation will push each record
+   * exactly once. To ensure end-to-end exactly-once semantics (that is, 
receiving exactly once and
+   * outputting exactly once), you have to either ensure that the output 
operation is
+   * idempotent, or transactionally store offsets with the output. See the 
programming guide for
+   * more details.
+   *
+   * @param ssc StreamingContext object
+   * @param kafkaParams Kafka a 
href=http://kafka.apache.org/documentation.html#configuration;
+   * configuration parameters/a.
+   *   Requires metadata.broker.list or bootstrap.servers to be set 
with Kafka broker(s),
+   *   NOT zookeeper servers, specified in 

[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-03 Thread koeninger
Github user koeninger commented on the pull request:

https://github.com/apache/spark/pull/3798#issuecomment-72758821
  
Besides introducing 2 classes where 1 would do, it implies that there are
(or could be) multiple implementations of the abstract class.  You're not
using it because you're actually planning for subclassing, you're using it
as a workaround for returning a slightly less complicated type from a
single method, where there's an alternative... just return RDD[(K, V)] for
that one method.

This really is a situation where there's only 1 implementation for the
foreseeable future, and a single final concrete class would be cleaner.


On Tue, Feb 3, 2015 at 5:21 PM, Tathagata Das notificati...@github.com
wrote:

 I dont get it, what's the complication with abstract classes?

 —
 Reply to this email directly or view it on GitHub
 https://github.com/apache/spark/pull/3798#issuecomment-72758029.
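
For readers following along, the shape being argued for here looks roughly like the sketch below (an editor's illustration with made-up names, not the PR's actual class): one final concrete class whose constructor is package-private, so callers can name the type but can neither instantiate nor subclass it.

```
package org.apache.spark.streaming.kafka

import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

// Single concrete class: `final` rules out user subclasses, and `private[kafka]`
// on the constructor means instances can only come from factory methods
// such as KafkaUtils.createRDD.
final class ExampleKafkaRDD private[kafka] (
    sc: SparkContext,
    val offsetRanges: Array[OffsetRange])
  extends RDD[(String, String)](sc, Nil) with HasOffsetRanges {

  // Partition planning and the actual Kafka fetch are elided; only the
  // class shape matters for this discussion.
  override protected def getPartitions: Array[Partition] = Array.empty
  override def compute(split: Partition, context: TaskContext): Iterator[(String, String)] =
    Iterator.empty
}
```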



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-03 Thread tdas
Github user tdas commented on the pull request:

https://github.com/apache/spark/pull/3798#issuecomment-72759859
  
But of course there can be multiple implementations! For example, there is
both KafkaReceiver and ReliableKafkaReceiver. The second was introduced so that
the code path for existing uses is not disturbed when we are introducing
experimental code paths that are optionally enabled with flags. We never
envisioned that happening, but when it occurred, we could do this because the
KafkaReceiver was not exposed, only the Receiver interface was exposed.
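
As a point of reference (editor's note, best recollection rather than something stated in this thread): the reliable receiver path is not a separate public API but is selected by the receiver write-ahead-log setting, roughly:

```
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("kafka-wal-example")
  // With the WAL flag on, KafkaUtils.createStream goes through the reliable
  // receiver path; with it off (the default), the original KafkaReceiver is used.
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")
```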


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-03 Thread tdas
Github user tdas commented on the pull request:

https://github.com/apache/spark/pull/3798#issuecomment-72758029
  
I dont get it, what's the complication with abstract classes?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-03 Thread koeninger
Github user koeninger commented on the pull request:

https://github.com/apache/spark/pull/3798#issuecomment-72760900
  
To put it another way, the type you return has to be public.

If you return a public abstract class, what are you going to do when
someone else subclasses it?

Making it a final concrete class doesn't have that issue.
On Feb 3, 2015 5:34 PM, Tathagata Das notificati...@github.com wrote:

 But of course there can be multiple implementations! For example, there is
 both KafkaReceiver and ReliableKafkaReceiver. The second was introduced so
 that the code path for existing uses is not disturbed when we are
 introducing experimental code paths that are optionally enabled with 
flags.
 We never envisioned that happening, but when it occur, we could do this
 because the KafkaReceiver was not exposed, only the Receiver interface was
 exposed.

 —
 Reply to this email directly or view it on GitHub
 https://github.com/apache/spark/pull/3798#issuecomment-72759859.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-03 Thread koeninger
Github user koeninger commented on the pull request:

https://github.com/apache/spark/pull/3798#issuecomment-72754442
  
Like Patrick said, I really don't see any reason not to just expose
KafkaRDD.  You can still hide its constructor without making a superfluous
abstract class, and you can still make another subclass of KafkaRDD later
if you need to.

Even if you don't want the static createRDD method to return a KafkaRDD, we
can just take the `with HasOffsetRanges` off, and people who care about
getting to the offsets can cast it (they'll have to cast it for the stream
case anyway).
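Rough sketch of that shape (stand-in types so it compiles on its own; the real
signatures carry more type parameters):
```
package org.apache.spark.streaming.kafka

// Stand-in for the PR's OffsetRange, just to keep the sketch self-contained.
case class OffsetRange(topic: String, partition: Int, fromOffset: Long, untilOffset: Long)

// Public and final, but with a constructor only the kafka package can call:
// users can name the type and cast to it, yet cannot construct it themselves.
final class KafkaRDD[K, V] private[kafka] (val offsetRanges: Array[OffsetRange])

object KafkaUtils {
  def createRDD[K, V](offsetRanges: Array[OffsetRange]): KafkaRDD[K, V] =
    new KafkaRDD[K, V](offsetRanges)
}
```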

On Tue, Feb 3, 2015 at 4:30 PM, Tathagata Das notificati...@github.com
wrote:

 @koeninger https://github.com/koeninger Thank you very much for all the
 changes. They are looking good. Unfortunately the issue with createRDD
 returning RDD[] with OffsetRange (i.e., the issue that @pwendell
 https://github.com/pwendell raised) could be a problem in the future in
 terms of binary compatibility. Basically, we have not used such things in
 the rest of Spark to keep things as Java-friendly and binary compatible as
 possible. Also in the generated Java doc this looks scary. So here is an
 alternate suggestion that I am trying to implement on top of your PR
  branch. How about this: we effectively combine KafkaRDD and HasOffsetRanges
  into an abstract class.

  abstract class KafkaRDD[T] (val offsetRanges: Array[OffsetRanges], sc: SparkContext)
    extends RDD[T](sc, Nil)

  private[kafka]
  class KafkaRDDImpl[K, V, KD, VD, R] extends KafkaRDD[R] {
    ...
  }

  KafkaUtils.createRDD(...simple one without messageHandler...): KafkaRDD[(K, V)] = {
    // return KafkaRDDImpl[K, V, KD, VD, (K, V)]
  }

  KafkaUtils.createRDD(...simple one WITH messageHandler...): KafkaRDD[R] = {
    // return KafkaRDDImpl[K, V, KD, VD, R]
  }


 Advantages

- No binary compatibility issues
- Easy to read from Java
- KafkaRDD implementation and constructor all hidden as before
- Can still extend KafkaRDD to expose more methods in future.

 What do you think?

 —
 Reply to this email directly or view it on GitHub
 https://github.com/apache/spark/pull/3798#issuecomment-72749763.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-03 Thread tdas
Github user tdas commented on the pull request:

https://github.com/apache/spark/pull/3798#issuecomment-72756440
  
I spent some time talking to Patrick offline about this. If we expose the 
KafkaRDD as is (while keeping its constructor private), then the simplified 
createRDD would be 
```
KafkaUtils.createRDD[K, V, KD, VD](): KafkaRDD[K, V, KD, VD, (K, V)]
```
Imagine how one would use it in Java. 
```
KafkaRDD<String, String, StringDecoder, StringDecoder, Product2<String, String>> rdd =
  KafkaUtils.createRDD()
```
That's not very Java friendly if you ask a Java developer. And we have a huge 
fraction of the community as Java developers. Furthermore, we want to add a 
Python API as well, and that also requires the interfaces to be Java-friendly. 
Here is the alternative (I think) with what I proposed.
```
KafkaRDD<String, String> rdd = KafkaUtils.createRDD()
```
Much simpler. 


Regarding casting, there are two cases: 
1. Casting the RDD generated from createRDD - if we take off `HasOffsetRanges` 
(`KafkaUtils.createRDD` returns only RDD), then the user has to cast. But if we 
return the abstract class KafkaRDD, then no casting is necessary.
2. Casting the RDD in `DStream.foreachRDD` - casting is necessary either way. 
But isn't it more intuitive to write `rdd.asInstanceOf[KafkaRDD]` than 
`rdd.asInstanceOf[HasOffsetRanges]`?







---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-03 Thread koeninger
Github user koeninger commented on the pull request:

https://github.com/apache/spark/pull/3798#issuecomment-72757731
  
Just make the simplified createRDD return a static type of RDD[(K, V)],
that's what I'm saying.

You're already going to have to deal with those other type parameters in
Java if you want to call a more complex version of createRDD, because you
have to know about the serializers and message handler.  Make the more
complex version return KafkaRDD.

I agree that casting to KafkaRDD is better than casting to
HasOffsetRanges.  The abstract class is an unnecessary complication though.
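Something like this, with the type parameters trimmed down (illustrative
signatures only; OffsetRange and KafkaRDD here mean the PR's types, assumed
to be on the classpath):
```
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import kafka.message.MessageAndMetadata

object CreateRDDSketch {
  // Simple variant: callers who only want (key, value) pairs see a plain RDD
  // and never meet the extra type parameters.
  def createRDD[K, V](
      sc: SparkContext,
      kafkaParams: Map[String, String],
      offsetRanges: Array[OffsetRange]): RDD[(K, V)] = ???

  // Full variant: anyone supplying a messageHandler already deals with the
  // decoder/result types, so returning the concrete KafkaRDD costs little.
  def createRDD[K, V, R](
      sc: SparkContext,
      kafkaParams: Map[String, String],
      offsetRanges: Array[OffsetRange],
      messageHandler: MessageAndMetadata[K, V] => R): KafkaRDD[R] = ???
}
```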


On Tue, Feb 3, 2015 at 5:11 PM, Tathagata Das notificati...@github.com
wrote:

 I spent some time talking to Patrick offline about this. If we expose the
 KafkaRDD as is (while keeping its constructor private), then the 
simplified
 createRDD would be

 KafkaUtils.createRDD[K, V, KD, VD](): KafkaRDD[K, V, KD, VD, (K, V)]

 Imagine how one would use it in Java.

 KafkaRDD<String, String, StringDecoder, StringDecoder, Product2<String, String>> rdd = KafkaUtils.createRDD()

 That's not very Java friendly if you ask a Java developer. And we a huge
 fraction of the community as Java developers. Furthemore, we want to add
 Python API as well, and that also requires the interfaces to be
 Java-friendly. Here is the alternative (I think) with what I proposed.

 KafkaRDD<String, String> rdd = KafkaUtils.createRDD()

 Much simpler.

 Regarding casting, there are two cases,
 1. casting RDD generated from createRDD - If we take off HasOffsetRanges (
 KafkaUtils.createRDD returns only RDD), then user have to cast. But if we
 return abstract class KafkaRDD, then no casting necessary.
 2. casting RDD in DStream.foreachRDD - Casting is necessary either ways.
 But isnt it more intuitive to write rdd.asInstanceOf[KafkaRDD] than
 rdd.asInstanceOf[HasOffsetRanges]?

 —
 Reply to this email directly or view it on GitHub
 https://github.com/apache/spark/pull/3798#issuecomment-72756440.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-03 Thread tdas
Github user tdas commented on the pull request:

https://github.com/apache/spark/pull/3798#issuecomment-72761134
  
In terms of the number of classes, the abstract KafkaRDD is essentially 
replacing HasOffsetRanges. There is no need for this HasOffsetRanges trait that 
gets used only (assuming createRDD returns RDD[(K,V)]) inside 
```
DStream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].getOffsetRanges()
  ...
}
```
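Spelled out a bit more, the user side of that would look roughly like this
sketch (the trait and accessor names are stand-ins for whatever we end up
calling them):
```
import org.apache.spark.streaming.dstream.DStream

// Stand-ins so the sketch is self-contained.
case class OffsetRange(topic: String, partition: Int, fromOffset: Long, untilOffset: Long)
trait HasOffsetRanges { def getOffsetRanges(): Array[OffsetRange] }

object OffsetTracking {
  // The stream's static element type stays RDD[(K, V)]; the cast is how the
  // caller reaches the per-partition offsets for its own bookkeeping.
  def logOffsets[K, V](stream: DStream[(K, V)]): Unit =
    stream.foreachRDD { rdd =>
      val ranges = rdd.asInstanceOf[HasOffsetRanges].getOffsetRanges()
      ranges.foreach { r =>
        println(s"${r.topic}-${r.partition}: ${r.fromOffset} to ${r.untilOffset}")
      }
    }
}
```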



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-03 Thread tdas
Github user tdas commented on the pull request:

https://github.com/apache/spark/pull/3798#issuecomment-72770451
  
Okay, here are the two options.

1. createRDD returns RDD[(K,V)] or RDD[R], and DStream.foreachRDD uses 
rdd.asInstanceOf[HasOffsetRanges]
2. createRDD returns KafkaRDD[(K,V)] or KafkaRDD[R], and DStream.foreachRDD 
uses rdd.asInstanceOf[KafkaRDD[_]]

I think I am okay with either one. Stepping back, my original concern was 
returning something that had no binary compatibility issues. Both solutions 
suffice. Between these two, since you feel so strongly against (2), let's go 
with (1). 
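For comparison, the only user-visible difference between the two options is the
cast target (fragment sketch, using the names from the proposals above):
```
// Option (1): cast to the trait
stream.foreachRDD { rdd =>
  val ranges = rdd.asInstanceOf[HasOffsetRanges].getOffsetRanges()
}

// Option (2): cast to the abstract KafkaRDD
stream.foreachRDD { rdd =>
  val ranges = rdd.asInstanceOf[KafkaRDD[_]].offsetRanges
}
```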






---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-03 Thread pwendell
Github user pwendell commented on the pull request:

https://github.com/apache/spark/pull/3798#issuecomment-72617088
  
I took a pass through the public API. I'm not very familiar with Kafka so 
it was somewhat slow going. However, some reactions:

1. We should try to tighten, simplify, and clarify the way we name and 
document everything in this public API. Most of the comments were about this. 
The most important IMO is coming up with a good name for the new streams 
returned and clearly explaining how they differ from the old Kafka stream. To 
me, the main differences seem to be in the way we (a) decide what goes into 
which batch and (b) actually ingest the data. I proposed a javadoc and naming 
scheme that emphasizes that distinction.
2. Are there plans to add Java and Python wrappers here next? Those are 
straightforward and it would be good to have them. Maybe in a follow-on PR?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-03 Thread pwendell
Github user pwendell commented on a diff in the pull request:

https://github.com/apache/spark/pull/3798#discussion_r23990592
  
--- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala ---
@@ -144,4 +150,174 @@ object KafkaUtils {
 createStream[K, V, U, T](
   jssc.ssc, kafkaParams.toMap, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
   }
+
+  /** A batch-oriented interface for consuming from Kafka.
+   * Starting and ending offsets are specified in advance,
+   * so that you can control exactly-once semantics.
+   * @param sc SparkContext object
+   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+   *   configuration parameters</a>.
+   *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   * @param offsetRanges Each OffsetRange in the batch corresponds to a
+   *   range of offsets for a given Kafka topic/partition
+   */
+  @Experimental
+  def createRDD[
+    K: ClassTag,
+    V: ClassTag,
+    U <: Decoder[_]: ClassTag,
+    T <: Decoder[_]: ClassTag] (
+      sc: SparkContext,
+      kafkaParams: Map[String, String],
+      offsetRanges: Array[OffsetRange]
+  ): RDD[(K, V)] with HasOffsetRanges = {
+    val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)
+    val kc = new KafkaCluster(kafkaParams)
+    val topics = offsetRanges.map(o => TopicAndPartition(o.topic, o.partition)).toSet
+    val leaders = kc.findLeaders(topics).fold(
+      errs => throw new SparkException(errs.mkString("\n")),
+      ok => ok
+    )
+    new KafkaRDD[K, V, U, T, (K, V)](sc, kafkaParams, offsetRanges, leaders, messageHandler)
+  }
+
+  /** A batch-oriented interface for consuming from Kafka.
+   * Starting and ending offsets are specified in advance,
+   * so that you can control exactly-once semantics.
+   * @param sc SparkContext object
+   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+   *   configuration parameters</a>.
+   *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   * @param offsetRanges Each OffsetRange in the batch corresponds to a
+   *   range of offsets for a given Kafka topic/partition
+   * @param leaders Kafka leaders for each offset range in batch
+   * @param messageHandler function for translating each message into the desired type
+   */
+  @Experimental
+  def createRDD[
+    K: ClassTag,
+    V: ClassTag,
+    U <: Decoder[_]: ClassTag,
+    T <: Decoder[_]: ClassTag,
+    R: ClassTag] (
+      sc: SparkContext,
+      kafkaParams: Map[String, String],
+      offsetRanges: Array[OffsetRange],
+      leaders: Array[Leader],
+      messageHandler: MessageAndMetadata[K, V] => R
+  ): RDD[R] with HasOffsetRanges = {
+
+    val leaderMap = leaders
+      .map(l => TopicAndPartition(l.topic, l.partition) -> (l.host, l.port))
+      .toMap
+    new KafkaRDD[K, V, U, T, R](sc, kafkaParams, offsetRanges, leaderMap, messageHandler)
+  }
+
+  /**
+   * This stream can guarantee that each message from Kafka is included in transformations
--- End diff --

For the top-level doc here, isn't it something like:

```
Returns a Kafka stream that computes a specific offset range for each
partition, then reads those offsets directly from Kafka without the use of
receivers.

Because this stream deterministically assigns offset ranges to specific
batches, it can support exactly-once semantics (as defined in the programming
guide). Specifically, if a streaming program experiences task failures or
restarts from a job failure, output batches appear as if each record was
ingested and processed exactly once.
```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-03 Thread koeninger
Github user koeninger commented on the pull request:

https://github.com/apache/spark/pull/3798#issuecomment-72773257
  
To be clear, I'm ok with any solution that gives me access to what I need,
which in this case are offsets.

What's coming across as me feeling strongly about it is, I think, really
because I'm not clear on what your principles with regard to the interface
are ... well, that and frustration because it's been changed 3 times
already :)

For instance, why would you be willing to take on the fragile base class
maintenance problem in exposing KafkaRDD as something that could be
subclassed... but not ok with exposing the DStream (so that people could
override the batch generation policy)?

In the interests of moving this forward, if we're really just talking about
changing KafkaUtils' use of

RDD[..] with HasOffsetRanges
to
RDD[..]

I can make that change.


On Tue, Feb 3, 2015 at 7:02 PM, Tathagata Das notificati...@github.com
wrote:

 Okay here are the two options.

1. createRDD returns RDD[(K,V)] or RDD[R], and DStream.foreachRDD uses
rdd.asInstanceOf[HasOffsetRanges]
2. createRDD returns KafkaRDD[(K,V)] or KafkaRDD[R] and
DStream.foreachRDD uses rdd.asInstanceOf[KafkaRDD[_]]

 I think I am okay with either one. Stepping back, my original concern was
 returning something that had no binary compatibility issues. Both solution
 suffices. Between these two, since you feel so strongly against (2), lets
 go with (1).

 —
 Reply to this email directly or view it on GitHub
 https://github.com/apache/spark/pull/3798#issuecomment-72770451.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-03 Thread tdas
Github user tdas commented on the pull request:

https://github.com/apache/spark/pull/3798#issuecomment-72775013
  
Yes, Cody, I agree that this has changed a number of times. But that is not 
unusual when adding such a significant, publicly visible API. Usually this level 
of API argument happens over design docs over a period of time. However, since 
we were short on time in this matter and not all implementation complexities 
were evident from the beginning, this had to be done over the code. I can't 
thank you enough for your cooperation!!!

So the two main changes are:

1. sc.createNewStream (maybe rename DeterministicKafkaStream to Direct)
2. createRDD returns RDD

There are smaller suggestions and issues regarding documentation, indentation, 
etc. However, I am willing to address them in another PR. Let's merge this 
after you have made these two changes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-03 Thread koeninger
Github user koeninger commented on the pull request:

https://github.com/apache/spark/pull/3798#issuecomment-72775631
  
Hey man, I'd rather talk about the code anyway.  I think there's just 
something I'm missing as far as your underlying assumptions about interfaces go 
:)  Thanks for your help on this.

Just made the createRDD change.  Not clear on what createNewStream change 
you mean. Rename it to createStream, or something else?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-02 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/3798#discussion_r23971703
  
--- Diff: 
external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DeterministicKafkaInputDStream.scala
 ---
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+
+import scala.annotation.tailrec
+import scala.collection.mutable
+import scala.reflect.{classTag, ClassTag}
+
+import kafka.common.TopicAndPartition
+import kafka.message.MessageAndMetadata
+import kafka.serializer.Decoder
+
+import org.apache.spark.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset
+import org.apache.spark.streaming.{StreamingContext, Time}
+import org.apache.spark.streaming.dstream._
+
+/** A stream of {@link org.apache.spark.rdd.kafka.KafkaRDD} where
+  * each given Kafka topic/partition corresponds to an RDD partition.
+  * The spark configuration spark.streaming.receiver.maxRate gives the 
maximum number of messages
+  * per second that each '''partition''' will accept.
+  * Starting offsets are specified in advance,
+  * and this DStream is not responsible for committing offsets,
+  * so that you can control exactly-once semantics.
+  * For an easy interface to Kafka-managed offsets,
+  *  see {@link org.apache.spark.rdd.kafka.KafkaCluster}
+  * @param kafkaParams Kafka a 
href=http://kafka.apache.org/documentation.html#configuration;
+  * configuration parameters/a.
+  *   Requires metadata.broker.list or bootstrap.servers to be set 
with Kafka broker(s),
+  *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+  * @param fromOffsets per-topic/partition Kafka offsets defining the 
(inclusive)
+  *  starting point of the stream
+  * @param messageHandler function for translating each message into the 
desired type
+  * @param maxRetries maximum number of times in a row to retry getting 
leaders' offsets
+  */
+private[streaming]
+class DeterministicKafkaInputDStream[
+  K: ClassTag,
+  V: ClassTag,
+  U : Decoder[_]: ClassTag,
+  T : Decoder[_]: ClassTag,
+  R: ClassTag](
+@transient ssc_ : StreamingContext,
+val kafkaParams: Map[String, String],
+val fromOffsets: Map[TopicAndPartition, Long],
+messageHandler: MessageAndMetadata[K, V] = R,
+maxRetries: Int
+) extends InputDStream[R](ssc_) with Logging {
+
+  protected[streaming] override val checkpointData =
+new DeterministicKafkaInputDStreamCheckpointData
+
+  protected val kc = new KafkaCluster(kafkaParams)
+
+  protected val maxMessagesPerPartition: Option[Long] = {
+val ratePerSec = 
context.sparkContext.getConf.getInt(spark.streaming.receiver.maxRate, 0)
--- End diff --

Let's rename this configuration. It's very confusing to overload this 
configuration because the system does not behave in the same way. 
`receiver.maxRate` applies to receivers, which are not used by this stream. In 
fact, the mechanism used here is very specific to this input stream and applies 
to no other input stream. So let's rename it to something like 
`spark.streaming.kafka.maxRatePerPartition`.
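If we go that way, usage would look roughly like this (sketch; the key below is
only the suggestion above, not an existing setting at this point):
```
import org.apache.spark.SparkConf

object RateLimitSketch {
  val conf = new SparkConf()
    // hypothetical key: maximum messages per second, per Kafka partition
    .set("spark.streaming.kafka.maxRatePerPartition", "1000")

  // and the stream would read this instead of spark.streaming.receiver.maxRate
  val ratePerSec: Int = conf.getInt("spark.streaming.kafka.maxRatePerPartition", 0)
}
```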


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-02 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/3798#discussion_r23972368
  
--- Diff: 
external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala
 ---
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import scala.util.control.NonFatal
+import scala.util.Random
+import scala.collection.mutable.ArrayBuffer
+import java.util.Properties
+import kafka.api._
+import kafka.common.{ErrorMapping, OffsetMetadataAndError, 
TopicAndPartition}
+import kafka.consumer.{ConsumerConfig, SimpleConsumer}
+
+/**
+ * Convenience methods for interacting with a Kafka cluster.
+ * @param kafkaParams Kafka a 
href=http://kafka.apache.org/documentation.html#configuration;
+ * configuration parameters/a.
+ *   Requires metadata.broker.list or bootstrap.servers to be set with 
Kafka broker(s),
+ *   NOT zookeeper servers, specified in host1:port1,host2:port2 form
+ */
+private[spark]
+class KafkaCluster(val kafkaParams: Map[String, String]) extends 
Serializable {
+  import KafkaCluster.{Err, LeaderOffset}
+
+  val seedBrokers: Array[(String, Int)] =
+kafkaParams.get(metadata.broker.list)
+  .orElse(kafkaParams.get(bootstrap.servers))
+  .getOrElse(throw new Exception(Must specify metadata.broker.list or 
bootstrap.servers))
+  .split(,).map { hp =
+val hpa = hp.split(:)
+(hpa(0), hpa(1).toInt)
+  }
+
+  // ConsumerConfig isn't serializable
+  @transient private var _config: ConsumerConfig = null
+
+  def config: ConsumerConfig = this.synchronized {
+if (_config == null) {
+  _config = KafkaCluster.consumerConfig(kafkaParams)
+}
+_config
+  }
+
+  def connect(host: String, port: Int): SimpleConsumer =
+new SimpleConsumer(host, port, config.socketTimeoutMs,
+  config.socketReceiveBufferBytes, config.clientId)
+
+  def connectLeader(topic: String, partition: Int): Either[Err, 
SimpleConsumer] =
+findLeader(topic, partition).right.map(hp = connect(hp._1, hp._2))
+
+  // Metadata api
+  // scalastyle:off
+  // 
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-MetadataAPI
+  // scalastyle:on
+
+  def findLeader(topic: String, partition: Int): Either[Err, (String, 
Int)] = {
+val req = TopicMetadataRequest(TopicMetadataRequest.CurrentVersion,
+  0, config.clientId, Seq(topic))
+val errs = new Err
+withBrokers(Random.shuffle(seedBrokers), errs) { consumer =
+  val resp: TopicMetadataResponse = consumer.send(req)
+  resp.topicsMetadata.find(_.topic == topic).flatMap { tm: 
TopicMetadata =
+tm.partitionsMetadata.find(_.partitionId == partition)
+  }.foreach { pm: PartitionMetadata =
+pm.leader.foreach { leader =
+  return Right((leader.host, leader.port))
+}
+  }
+}
+Left(errs)
+  }
+
+  def findLeaders(
+topicAndPartitions: Set[TopicAndPartition]
+  ): Either[Err, Map[TopicAndPartition, (String, Int)]] = {
+val topics = topicAndPartitions.map(_.topic)
--- End diff --

```
def findLeaders(
 topicAndPartitions: Set[TopicAndPartition]  // indent of 4
  ): Either[Err, Map[TopicAndPartition, (String, Int)]] = {   // indent of 2
```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-02 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/3798#issuecomment-72575053
  
  [Test build #26571 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26571/consoleFull)
 for   PR 3798 at commit 
[`4354bce`](https://github.com/apache/spark/commit/4354bced65a7f37a51bde9081d8d19dc9b9316cd).
 * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-02 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/3798#discussion_r23967901
  
--- Diff: 
external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DeterministicKafkaInputDStream.scala
 ---
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+
+import scala.annotation.tailrec
+import scala.collection.mutable
+import scala.reflect.{classTag, ClassTag}
+
+import kafka.common.TopicAndPartition
+import kafka.message.MessageAndMetadata
+import kafka.serializer.Decoder
+
+import org.apache.spark.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset
+import org.apache.spark.streaming.{StreamingContext, Time}
+import org.apache.spark.streaming.dstream._
+
+/** A stream of {@link org.apache.spark.rdd.kafka.KafkaRDD} where
+  * each given Kafka topic/partition corresponds to an RDD partition.
+  * The spark configuration spark.streaming.receiver.maxRate gives the 
maximum number of messages
+  * per second that each '''partition''' will accept.
+  * Starting offsets are specified in advance,
+  * and this DStream is not responsible for committing offsets,
+  * so that you can control exactly-once semantics.
+  * For an easy interface to Kafka-managed offsets,
+  *  see {@link org.apache.spark.rdd.kafka.KafkaCluster}
+  * @param kafkaParams Kafka a 
href=http://kafka.apache.org/documentation.html#configuration;
+  * configuration parameters/a.
+  *   Requires metadata.broker.list or bootstrap.servers to be set 
with Kafka broker(s),
+  *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+  * @param fromOffsets per-topic/partition Kafka offsets defining the 
(inclusive)
+  *  starting point of the stream
+  * @param messageHandler function for translating each message into the 
desired type
+  * @param maxRetries maximum number of times in a row to retry getting 
leaders' offsets
+  */
+private[streaming]
+class DeterministicKafkaInputDStream[
+  K: ClassTag,
+  V: ClassTag,
+  U : Decoder[_]: ClassTag,
+  T : Decoder[_]: ClassTag,
+  R: ClassTag](
+@transient ssc_ : StreamingContext,
+val kafkaParams: Map[String, String],
+val fromOffsets: Map[TopicAndPartition, Long],
+messageHandler: MessageAndMetadata[K, V] = R,
+maxRetries: Int
+) extends InputDStream[R](ssc_) with Logging {
+
+  protected[streaming] override val checkpointData =
+new DeterministicKafkaInputDStreamCheckpointData
+
+  protected val kc = new KafkaCluster(kafkaParams)
+
+  protected val maxMessagesPerPartition: Option[Long] = {
+val ratePerSec = 
context.sparkContext.getConf.getInt(spark.streaming.receiver.maxRate, 0)
+if (ratePerSec  0) {
+  val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble 
/ 1000
+  Some((secsPerBatch * ratePerSec).toLong)
+} else {
+  None
+}
+  }
+
+  protected var currentOffsets = fromOffsets
+
+  @tailrec
+  protected final def latestLeaderOffsets(retries: Int): 
Map[TopicAndPartition, LeaderOffset] = {
+val o = kc.getLatestLeaderOffsets(currentOffsets.keySet)
+// Either.fold would confuse @tailrec, do it manually
+if (o.isLeft) {
+  val err = o.left.get.toString
+  if (retries = 0) {
+throw new Exception(err)
+  } else {
+log.error(err)
+Thread.sleep(kc.config.refreshLeaderBackoffMs)
+latestLeaderOffsets(retries - 1)
+  }
+} else {
+  o.right.get
+}
+  }
+
+  protected def clamp(
+leaderOffsets: Map[TopicAndPartition, LeaderOffset]): 
Map[TopicAndPartition, LeaderOffset] = {
+maxMessagesPerPartition.map { mmp =
+  leaderOffsets.map { case (tp, lo) =
+tp - lo.copy(offset = Math.min(currentOffsets(tp) + mmp, 

[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-02 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/3798#discussion_r23972007
  
--- Diff: 
external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DeterministicKafkaInputDStream.scala
 ---
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+
+import scala.annotation.tailrec
+import scala.collection.mutable
+import scala.reflect.{classTag, ClassTag}
+
+import kafka.common.TopicAndPartition
+import kafka.message.MessageAndMetadata
+import kafka.serializer.Decoder
+
+import org.apache.spark.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset
+import org.apache.spark.streaming.{StreamingContext, Time}
+import org.apache.spark.streaming.dstream._
+
+/** A stream of {@link org.apache.spark.rdd.kafka.KafkaRDD} where
+  * each given Kafka topic/partition corresponds to an RDD partition.
+  * The spark configuration spark.streaming.receiver.maxRate gives the 
maximum number of messages
+  * per second that each '''partition''' will accept.
+  * Starting offsets are specified in advance,
+  * and this DStream is not responsible for committing offsets,
+  * so that you can control exactly-once semantics.
+  * For an easy interface to Kafka-managed offsets,
+  *  see {@link org.apache.spark.rdd.kafka.KafkaCluster}
+  * @param kafkaParams Kafka a 
href=http://kafka.apache.org/documentation.html#configuration;
+  * configuration parameters/a.
+  *   Requires metadata.broker.list or bootstrap.servers to be set 
with Kafka broker(s),
+  *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+  * @param fromOffsets per-topic/partition Kafka offsets defining the 
(inclusive)
+  *  starting point of the stream
+  * @param messageHandler function for translating each message into the 
desired type
+  * @param maxRetries maximum number of times in a row to retry getting 
leaders' offsets
+  */
+private[streaming]
+class DeterministicKafkaInputDStream[
+  K: ClassTag,
+  V: ClassTag,
+  U : Decoder[_]: ClassTag,
+  T : Decoder[_]: ClassTag,
+  R: ClassTag](
+@transient ssc_ : StreamingContext,
+val kafkaParams: Map[String, String],
+val fromOffsets: Map[TopicAndPartition, Long],
+messageHandler: MessageAndMetadata[K, V] = R,
+maxRetries: Int
+) extends InputDStream[R](ssc_) with Logging {
+
+  protected[streaming] override val checkpointData =
+new DeterministicKafkaInputDStreamCheckpointData
+
+  protected val kc = new KafkaCluster(kafkaParams)
+
+  protected val maxMessagesPerPartition: Option[Long] = {
+val ratePerSec = 
context.sparkContext.getConf.getInt(spark.streaming.receiver.maxRate, 0)
+if (ratePerSec  0) {
+  val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble 
/ 1000
+  Some((secsPerBatch * ratePerSec).toLong)
+} else {
+  None
+}
+  }
+
+  protected var currentOffsets = fromOffsets
+
+  @tailrec
+  protected final def latestLeaderOffsets(retries: Int): 
Map[TopicAndPartition, LeaderOffset] = {
+val o = kc.getLatestLeaderOffsets(currentOffsets.keySet)
+// Either.fold would confuse @tailrec, do it manually
+if (o.isLeft) {
+  val err = o.left.get.toString
+  if (retries = 0) {
+throw new Exception(err)
+  } else {
+log.error(err)
+Thread.sleep(kc.config.refreshLeaderBackoffMs)
+latestLeaderOffsets(retries - 1)
+  }
+} else {
+  o.right.get
+}
+  }
+
+  protected def clamp(
--- End diff --

Please add some documentation on what this method does. 
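For example, something along these lines (a sketch reconstructed from the
surrounding diff as it would sit in the DStream, not copied from the PR):
```
/**
 * Caps the proposed "until" offsets so that, per topic/partition, at most
 * maxMessagesPerPartition messages beyond the current offset fall into this
 * batch. When no rate limit is configured, the leader offsets are returned
 * unchanged.
 */
protected def clamp(
    leaderOffsets: Map[TopicAndPartition, LeaderOffset]): Map[TopicAndPartition, LeaderOffset] = {
  maxMessagesPerPartition.map { mmp =>
    leaderOffsets.map { case (tp, lo) =>
      tp -> lo.copy(offset = math.min(currentOffsets(tp) + mmp, lo.offset))
    }
  }.getOrElse(leaderOffsets)
}
```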


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, 

[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-02 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/3798#discussion_r23971976
  
--- Diff: 
external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DeterministicKafkaInputDStream.scala
 ---
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+
+import scala.annotation.tailrec
+import scala.collection.mutable
+import scala.reflect.{classTag, ClassTag}
+
+import kafka.common.TopicAndPartition
+import kafka.message.MessageAndMetadata
+import kafka.serializer.Decoder
+
+import org.apache.spark.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset
+import org.apache.spark.streaming.{StreamingContext, Time}
+import org.apache.spark.streaming.dstream._
+
+/** A stream of {@link org.apache.spark.rdd.kafka.KafkaRDD} where
+  * each given Kafka topic/partition corresponds to an RDD partition.
+  * The spark configuration spark.streaming.receiver.maxRate gives the 
maximum number of messages
+  * per second that each '''partition''' will accept.
+  * Starting offsets are specified in advance,
+  * and this DStream is not responsible for committing offsets,
+  * so that you can control exactly-once semantics.
+  * For an easy interface to Kafka-managed offsets,
+  *  see {@link org.apache.spark.rdd.kafka.KafkaCluster}
+  * @param kafkaParams Kafka a 
href=http://kafka.apache.org/documentation.html#configuration;
+  * configuration parameters/a.
+  *   Requires metadata.broker.list or bootstrap.servers to be set 
with Kafka broker(s),
+  *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+  * @param fromOffsets per-topic/partition Kafka offsets defining the 
(inclusive)
+  *  starting point of the stream
+  * @param messageHandler function for translating each message into the 
desired type
+  * @param maxRetries maximum number of times in a row to retry getting 
leaders' offsets
+  */
+private[streaming]
+class DeterministicKafkaInputDStream[
+  K: ClassTag,
+  V: ClassTag,
+  U : Decoder[_]: ClassTag,
+  T : Decoder[_]: ClassTag,
+  R: ClassTag](
+@transient ssc_ : StreamingContext,
+val kafkaParams: Map[String, String],
+val fromOffsets: Map[TopicAndPartition, Long],
+messageHandler: MessageAndMetadata[K, V] = R,
+maxRetries: Int
+) extends InputDStream[R](ssc_) with Logging {
+
+  protected[streaming] override val checkpointData =
+new DeterministicKafkaInputDStreamCheckpointData
+
+  protected val kc = new KafkaCluster(kafkaParams)
+
+  protected val maxMessagesPerPartition: Option[Long] = {
+val ratePerSec = 
context.sparkContext.getConf.getInt(spark.streaming.receiver.maxRate, 0)
+if (ratePerSec  0) {
+  val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble 
/ 1000
+  Some((secsPerBatch * ratePerSec).toLong)
+} else {
+  None
+}
+  }
+
+  protected var currentOffsets = fromOffsets
+
+  @tailrec
+  protected final def latestLeaderOffsets(retries: Int): 
Map[TopicAndPartition, LeaderOffset] = {
--- End diff --

This function should be part of KafkaCluster. getLatestLeaderOffsets could take 
an optional retries parameter. 
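i.e. a shape roughly like this (signature sketch only, using KafkaCluster's
existing Err and LeaderOffset types):
```
// Sketch: the Either-returning lookup stays, but the retry count becomes an
// optional argument so the DStream no longer needs its own retry loop.
def getLatestLeaderOffsets(
    topicAndPartitions: Set[TopicAndPartition],
    retries: Int = 0): Either[Err, Map[TopicAndPartition, LeaderOffset]] = ???
```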


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-02 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/3798#discussion_r23972917
  
--- Diff: 
external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala
 ---
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import scala.util.control.NonFatal
+import scala.util.Random
+import scala.collection.mutable.ArrayBuffer
+import java.util.Properties
+import kafka.api._
+import kafka.common.{ErrorMapping, OffsetMetadataAndError, 
TopicAndPartition}
+import kafka.consumer.{ConsumerConfig, SimpleConsumer}
+
+/**
+ * Convenience methods for interacting with a Kafka cluster.
+ * @param kafkaParams Kafka a 
href=http://kafka.apache.org/documentation.html#configuration;
+ * configuration parameters/a.
+ *   Requires metadata.broker.list or bootstrap.servers to be set with 
Kafka broker(s),
+ *   NOT zookeeper servers, specified in host1:port1,host2:port2 form
+ */
+private[spark]
+class KafkaCluster(val kafkaParams: Map[String, String]) extends 
Serializable {
+  import KafkaCluster.{Err, LeaderOffset}
+
+  val seedBrokers: Array[(String, Int)] =
+kafkaParams.get(metadata.broker.list)
+  .orElse(kafkaParams.get(bootstrap.servers))
+  .getOrElse(throw new Exception(Must specify metadata.broker.list or 
bootstrap.servers))
+  .split(,).map { hp =
+val hpa = hp.split(:)
+(hpa(0), hpa(1).toInt)
+  }
+
+  // ConsumerConfig isn't serializable
+  @transient private var _config: ConsumerConfig = null
+
+  def config: ConsumerConfig = this.synchronized {
+if (_config == null) {
+  _config = KafkaCluster.consumerConfig(kafkaParams)
+}
+_config
+  }
+
+  def connect(host: String, port: Int): SimpleConsumer =
+new SimpleConsumer(host, port, config.socketTimeoutMs,
+  config.socketReceiveBufferBytes, config.clientId)
+
+  def connectLeader(topic: String, partition: Int): Either[Err, 
SimpleConsumer] =
+findLeader(topic, partition).right.map(hp = connect(hp._1, hp._2))
+
+  // Metadata api
+  // scalastyle:off
+  // 
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-MetadataAPI
+  // scalastyle:on
+
+  def findLeader(topic: String, partition: Int): Either[Err, (String, 
Int)] = {
+val req = TopicMetadataRequest(TopicMetadataRequest.CurrentVersion,
+  0, config.clientId, Seq(topic))
+val errs = new Err
+withBrokers(Random.shuffle(seedBrokers), errs) { consumer =
+  val resp: TopicMetadataResponse = consumer.send(req)
+  resp.topicsMetadata.find(_.topic == topic).flatMap { tm: 
TopicMetadata =
+tm.partitionsMetadata.find(_.partitionId == partition)
+  }.foreach { pm: PartitionMetadata =
+pm.leader.foreach { leader =
+  return Right((leader.host, leader.port))
+}
+  }
+}
+Left(errs)
+  }
+
+  def findLeaders(
+topicAndPartitions: Set[TopicAndPartition]
+  ): Either[Err, Map[TopicAndPartition, (String, Int)]] = {
+val topics = topicAndPartitions.map(_.topic)
+getPartitionMetadata(topics).right.flatMap { tms: Set[TopicMetadata] =
+  val result = tms.flatMap { tm: TopicMetadata =
--- End diff --

Can you break this up into multiple steps?
```
val x = getPartitionMetadata(topics).right
val y = x.flatMap { tms: ...
}
```
where `x`, `y` are semantically meaningful names.
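i.e. roughly this shape (stand-in names and a hypothetical helper, not the PR's
actual code):
```
// name each stage of the lookup instead of nesting the flatMaps inline
val partitionMeta = getPartitionMetadata(topics).right
val leadersByPartition = partitionMeta.flatMap { topicMetadatas =>
  val found = leadersFromMetadata(topicMetadatas)   // hypothetical helper
  if (found.keySet == topicAndPartitions) Right(found) else Left(errs)
}
```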
 



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-02 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/3798#discussion_r23974964
  
--- Diff: 
external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DeterministicKafkaInputDStream.scala
 ---
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+
+import scala.annotation.tailrec
+import scala.collection.mutable
+import scala.reflect.{classTag, ClassTag}
+
+import kafka.common.TopicAndPartition
+import kafka.message.MessageAndMetadata
+import kafka.serializer.Decoder
+
+import org.apache.spark.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset
+import org.apache.spark.streaming.{StreamingContext, Time}
+import org.apache.spark.streaming.dstream._
+
+/** A stream of {@link org.apache.spark.rdd.kafka.KafkaRDD} where
+  * each given Kafka topic/partition corresponds to an RDD partition.
+  * The spark configuration spark.streaming.receiver.maxRate gives the 
maximum number of messages
+  * per second that each '''partition''' will accept.
+  * Starting offsets are specified in advance,
+  * and this DStream is not responsible for committing offsets,
+  * so that you can control exactly-once semantics.
+  * For an easy interface to Kafka-managed offsets,
+  *  see {@link org.apache.spark.rdd.kafka.KafkaCluster}
+  * @param kafkaParams Kafka a 
href=http://kafka.apache.org/documentation.html#configuration;
+  * configuration parameters/a.
+  *   Requires metadata.broker.list or bootstrap.servers to be set 
with Kafka broker(s),
+  *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+  * @param fromOffsets per-topic/partition Kafka offsets defining the 
(inclusive)
+  *  starting point of the stream
+  * @param messageHandler function for translating each message into the 
desired type
+  * @param maxRetries maximum number of times in a row to retry getting 
leaders' offsets
+  */
+private[streaming]
+class DeterministicKafkaInputDStream[
+  K: ClassTag,
+  V: ClassTag,
+  U : Decoder[_]: ClassTag,
+  T : Decoder[_]: ClassTag,
+  R: ClassTag](
+@transient ssc_ : StreamingContext,
+val kafkaParams: Map[String, String],
+val fromOffsets: Map[TopicAndPartition, Long],
+messageHandler: MessageAndMetadata[K, V] = R,
+maxRetries: Int
+) extends InputDStream[R](ssc_) with Logging {
+
+  protected[streaming] override val checkpointData =
+new DeterministicKafkaInputDStreamCheckpointData
+
+  protected val kc = new KafkaCluster(kafkaParams)
+
+  protected val maxMessagesPerPartition: Option[Long] = {
+val ratePerSec = 
context.sparkContext.getConf.getInt(spark.streaming.receiver.maxRate, 0)
+if (ratePerSec  0) {
+  val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble 
/ 1000
+  Some((secsPerBatch * ratePerSec).toLong)
+} else {
+  None
+}
+  }
+
+  protected var currentOffsets = fromOffsets
+
+  @tailrec
+  protected final def latestLeaderOffsets(retries: Int): 
Map[TopicAndPartition, LeaderOffset] = {
--- End diff --

All of the methods in KafkaCluster are currently based on the idea of trying 
(at most) all of the brokers, then giving up and letting the caller establish 
an error handling policy.

Sleeping and retrying may not in general be the correct error handling policy. 
I know it is for the input dstream's usage right here, but that doesn't make 
sense to bake into KafkaCluster.
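A sketch of keeping that policy on the caller side (made-up helper name;
KafkaCluster keeps returning Either, and the caller decides whether to sleep
and retry):
```
object RetrySketch {
  // Generic caller-side retry: run `attempt` up to `retries` extra times,
  // sleeping between attempts, and only then give up and throw.
  @annotation.tailrec
  def withRetries[A](retries: Int, backoffMs: Long)(attempt: => Either[String, A]): A =
    attempt match {
      case Right(a) => a
      case Left(_) if retries > 0 =>
        Thread.sleep(backoffMs)
        withRetries(retries - 1, backoffMs)(attempt)
      case Left(err) => throw new Exception(err)
    }
}
```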


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-02 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/3798#discussion_r23972944
  
--- Diff: 
external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala
 ---
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import scala.util.control.NonFatal
+import scala.util.Random
+import scala.collection.mutable.ArrayBuffer
+import java.util.Properties
+import kafka.api._
+import kafka.common.{ErrorMapping, OffsetMetadataAndError, 
TopicAndPartition}
+import kafka.consumer.{ConsumerConfig, SimpleConsumer}
+
+/**
+ * Convenience methods for interacting with a Kafka cluster.
+ * @param kafkaParams Kafka a 
href=http://kafka.apache.org/documentation.html#configuration;
+ * configuration parameters/a.
+ *   Requires metadata.broker.list or bootstrap.servers to be set with 
Kafka broker(s),
+ *   NOT zookeeper servers, specified in host1:port1,host2:port2 form
+ */
+private[spark]
+class KafkaCluster(val kafkaParams: Map[String, String]) extends 
Serializable {
+  import KafkaCluster.{Err, LeaderOffset}
+
+  val seedBrokers: Array[(String, Int)] =
+kafkaParams.get(metadata.broker.list)
+  .orElse(kafkaParams.get(bootstrap.servers))
+  .getOrElse(throw new Exception(Must specify metadata.broker.list or 
bootstrap.servers))
+  .split(,).map { hp =
+val hpa = hp.split(:)
+(hpa(0), hpa(1).toInt)
+  }
+
+  // ConsumerConfig isn't serializable
+  @transient private var _config: ConsumerConfig = null
+
+  def config: ConsumerConfig = this.synchronized {
+if (_config == null) {
+  _config = KafkaCluster.consumerConfig(kafkaParams)
+}
+_config
+  }
+
+  def connect(host: String, port: Int): SimpleConsumer =
+new SimpleConsumer(host, port, config.socketTimeoutMs,
+  config.socketReceiveBufferBytes, config.clientId)
+
+  def connectLeader(topic: String, partition: Int): Either[Err, 
SimpleConsumer] =
+findLeader(topic, partition).right.map(hp = connect(hp._1, hp._2))
+
+  // Metadata api
+  // scalastyle:off
+  // 
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-MetadataAPI
+  // scalastyle:on
+
+  def findLeader(topic: String, partition: Int): Either[Err, (String, 
Int)] = {
+val req = TopicMetadataRequest(TopicMetadataRequest.CurrentVersion,
+  0, config.clientId, Seq(topic))
+val errs = new Err
+withBrokers(Random.shuffle(seedBrokers), errs) { consumer =
+  val resp: TopicMetadataResponse = consumer.send(req)
+  resp.topicsMetadata.find(_.topic == topic).flatMap { tm: 
TopicMetadata =
+tm.partitionsMetadata.find(_.partitionId == partition)
+  }.foreach { pm: PartitionMetadata =
+pm.leader.foreach { leader =
+  return Right((leader.host, leader.port))
+}
+  }
+}
+Left(errs)
+  }
+
+  def findLeaders(
+topicAndPartitions: Set[TopicAndPartition]
+  ): Either[Err, Map[TopicAndPartition, (String, Int)]] = {
+val topics = topicAndPartitions.map(_.topic)
+getPartitionMetadata(topics).right.flatMap { tms: Set[TopicMetadata] =
+  val result = tms.flatMap { tm: TopicMetadata =
--- End diff --

By loop do you mean the findLeaders method?  Yeah, it's returning the 
leaders as a map of TopicAndPartition to (host, port).

The nesting directly maps to the nesting of the Kafka API's return 
values... pretty much all of the nesting in this class is just grabbing stuff 
out of Kafka data structures.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at 

[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-02 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/3798#discussion_r23972979
  
--- Diff: 
external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala
 ---
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import scala.util.control.NonFatal
+import scala.util.Random
+import scala.collection.mutable.ArrayBuffer
+import java.util.Properties
+import kafka.api._
+import kafka.common.{ErrorMapping, OffsetMetadataAndError, 
TopicAndPartition}
+import kafka.consumer.{ConsumerConfig, SimpleConsumer}
+
+/**
+ * Convenience methods for interacting with a Kafka cluster.
+ * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+ * configuration parameters</a>.
+ *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+ *   NOT zookeeper servers, specified in host1:port1,host2:port2 form
+ */
+private[spark]
+class KafkaCluster(val kafkaParams: Map[String, String]) extends 
Serializable {
+  import KafkaCluster.{Err, LeaderOffset}
+
+  val seedBrokers: Array[(String, Int)] =
+kafkaParams.get("metadata.broker.list")
+  .orElse(kafkaParams.get("bootstrap.servers"))
+  .getOrElse(throw new Exception("Must specify metadata.broker.list or bootstrap.servers"))
+  .split(",").map { hp =>
+val hpa = hp.split(":")
+(hpa(0), hpa(1).toInt)
+  }
+
+  // ConsumerConfig isn't serializable
+  @transient private var _config: ConsumerConfig = null
+
+  def config: ConsumerConfig = this.synchronized {
+if (_config == null) {
+  _config = KafkaCluster.consumerConfig(kafkaParams)
+}
+_config
+  }
+
+  def connect(host: String, port: Int): SimpleConsumer =
+new SimpleConsumer(host, port, config.socketTimeoutMs,
+  config.socketReceiveBufferBytes, config.clientId)
+
+  def connectLeader(topic: String, partition: Int): Either[Err, 
SimpleConsumer] =
+findLeader(topic, partition).right.map(hp => connect(hp._1, hp._2))
+
+  // Metadata api
+  // scalastyle:off
+  // 
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-MetadataAPI
+  // scalastyle:on
+
+  def findLeader(topic: String, partition: Int): Either[Err, (String, 
Int)] = {
+val req = TopicMetadataRequest(TopicMetadataRequest.CurrentVersion,
+  0, config.clientId, Seq(topic))
+val errs = new Err
+withBrokers(Random.shuffle(seedBrokers), errs) { consumer =>
+  val resp: TopicMetadataResponse = consumer.send(req)
+  resp.topicsMetadata.find(_.topic == topic).flatMap { tm: TopicMetadata =>
+tm.partitionsMetadata.find(_.partitionId == partition)
+  }.foreach { pm: PartitionMetadata =>
+pm.leader.foreach { leader =>
+  return Right((leader.host, leader.port))
+}
+  }
+}
+Left(errs)
+  }
+
+  def findLeaders(
+topicAndPartitions: Set[TopicAndPartition]
+  ): Either[Err, Map[TopicAndPartition, (String, Int)]] = {
+val topics = topicAndPartitions.map(_.topic)
+getPartitionMetadata(topics).right.flatMap { tms: Set[TopicMetadata] =>
+  val result = tms.flatMap { tm: TopicMetadata =>
+tm.partitionsMetadata.flatMap { pm: PartitionMetadata =>
+  val tp = TopicAndPartition(tm.topic, pm.partitionId)
+  if (topicAndPartitions(tp)) {
+pm.leader.map { l =>
+  tp -> (l.host -> l.port)
+}
+  } else {
+None
+  }
+}
+  }.toMap
+  if (result.keys.size == topicAndPartitions.size) {
+Right(result)
+  } else {
+val missing = topicAndPartitions.diff(result.keySet)
+val err = new Err
+err.append(new 

[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-02 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/3798#discussion_r23971788
  
--- Diff: 
external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DeterministicKafkaInputDStream.scala
 ---
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+
+import scala.annotation.tailrec
+import scala.collection.mutable
+import scala.reflect.{classTag, ClassTag}
+
+import kafka.common.TopicAndPartition
+import kafka.message.MessageAndMetadata
+import kafka.serializer.Decoder
+
+import org.apache.spark.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset
+import org.apache.spark.streaming.{StreamingContext, Time}
+import org.apache.spark.streaming.dstream._
+
+/** A stream of {@link org.apache.spark.rdd.kafka.KafkaRDD} where
+  * each given Kafka topic/partition corresponds to an RDD partition.
+  * The spark configuration spark.streaming.receiver.maxRate gives the 
maximum number of messages
+  * per second that each '''partition''' will accept.
+  * Starting offsets are specified in advance,
+  * and this DStream is not responsible for committing offsets,
+  * so that you can control exactly-once semantics.
+  * For an easy interface to Kafka-managed offsets,
+  *  see {@link org.apache.spark.rdd.kafka.KafkaCluster}
+  * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+  * configuration parameters</a>.
+  *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+  *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+  * @param fromOffsets per-topic/partition Kafka offsets defining the 
(inclusive)
+  *  starting point of the stream
+  * @param messageHandler function for translating each message into the 
desired type
+  * @param maxRetries maximum number of times in a row to retry getting 
leaders' offsets
+  */
+private[streaming]
+class DeterministicKafkaInputDStream[
+  K: ClassTag,
+  V: ClassTag,
+  U <: Decoder[_]: ClassTag,
+  T <: Decoder[_]: ClassTag,
+  R: ClassTag](
+@transient ssc_ : StreamingContext,
+val kafkaParams: Map[String, String],
+val fromOffsets: Map[TopicAndPartition, Long],
+messageHandler: MessageAndMetadata[K, V] => R,
+maxRetries: Int
+) extends InputDStream[R](ssc_) with Logging {
+
+  protected[streaming] override val checkpointData =
+new DeterministicKafkaInputDStreamCheckpointData
+
+  protected val kc = new KafkaCluster(kafkaParams)
+
+  protected val maxMessagesPerPartition: Option[Long] = {
+val ratePerSec = context.sparkContext.getConf.getInt("spark.streaming.receiver.maxRate", 0)
+if (ratePerSec > 0) {
+  val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble 
/ 1000
+  Some((secsPerBatch * ratePerSec).toLong)
+} else {
+  None
+}
+  }
+
+  protected var currentOffsets = fromOffsets
+
+  @tailrec
+  protected final def latestLeaderOffsets(retries: Int): 
Map[TopicAndPartition, LeaderOffset] = {
+val o = kc.getLatestLeaderOffsets(currentOffsets.keySet)
+// Either.fold would confuse @tailrec, do it manually
+if (o.isLeft) {
+  val err = o.left.get.toString
+  if (retries <= 0) {
+throw new Exception(err)
--- End diff --

Exception --> SparkException
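
Presumably that just means swapping the exception type thrown when retries are
exhausted in latestLeaderOffsets; a hedged sketch of the change (names stand in
for the values computed in the diff above, assuming org.apache.spark.SparkException):

    import org.apache.spark.SparkException

    // Sketch only: the retry guard with Spark's own exception type.
    // `retries` and `err` mirror the local values in latestLeaderOffsets.
    def failIfRetriesExhausted(retries: Int, err: String): Unit =
      if (retries <= 0) {
        throw new SparkException(err)  // previously: throw new Exception(err)
      }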


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional 

[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-02 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/3798#discussion_r23972523
  
--- Diff: 
external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala
 ---
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import scala.util.control.NonFatal
+import scala.util.Random
+import scala.collection.mutable.ArrayBuffer
+import java.util.Properties
+import kafka.api._
+import kafka.common.{ErrorMapping, OffsetMetadataAndError, 
TopicAndPartition}
+import kafka.consumer.{ConsumerConfig, SimpleConsumer}
+
+/**
+ * Convenience methods for interacting with a Kafka cluster.
+ * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+ * configuration parameters</a>.
+ *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+ *   NOT zookeeper servers, specified in host1:port1,host2:port2 form
+ */
+private[spark]
+class KafkaCluster(val kafkaParams: Map[String, String]) extends 
Serializable {
+  import KafkaCluster.{Err, LeaderOffset}
+
+  val seedBrokers: Array[(String, Int)] =
+kafkaParams.get("metadata.broker.list")
+  .orElse(kafkaParams.get("bootstrap.servers"))
+  .getOrElse(throw new Exception("Must specify metadata.broker.list or bootstrap.servers"))
+  .split(",").map { hp =>
+val hpa = hp.split(":")
+(hpa(0), hpa(1).toInt)
+  }
+
+  // ConsumerConfig isn't serializable
+  @transient private var _config: ConsumerConfig = null
+
+  def config: ConsumerConfig = this.synchronized {
+if (_config == null) {
+  _config = KafkaCluster.consumerConfig(kafkaParams)
+}
+_config
+  }
+
+  def connect(host: String, port: Int): SimpleConsumer =
+new SimpleConsumer(host, port, config.socketTimeoutMs,
+  config.socketReceiveBufferBytes, config.clientId)
+
+  def connectLeader(topic: String, partition: Int): Either[Err, 
SimpleConsumer] =
+findLeader(topic, partition).right.map(hp => connect(hp._1, hp._2))
+
+  // Metadata api
+  // scalastyle:off
+  // 
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-MetadataAPI
+  // scalastyle:on
+
+  def findLeader(topic: String, partition: Int): Either[Err, (String, 
Int)] = {
+val req = TopicMetadataRequest(TopicMetadataRequest.CurrentVersion,
+  0, config.clientId, Seq(topic))
+val errs = new Err
+withBrokers(Random.shuffle(seedBrokers), errs) { consumer =>
+  val resp: TopicMetadataResponse = consumer.send(req)
+  resp.topicsMetadata.find(_.topic == topic).flatMap { tm: TopicMetadata =>
+tm.partitionsMetadata.find(_.partitionId == partition)
+  }.foreach { pm: PartitionMetadata =>
+pm.leader.foreach { leader =>
+  return Right((leader.host, leader.port))
+}
+  }
+}
+Left(errs)
+  }
+
+  def findLeaders(
+topicAndPartitions: Set[TopicAndPartition]
+  ): Either[Err, Map[TopicAndPartition, (String, Int)]] = {
+val topics = topicAndPartitions.map(_.topic)
+getPartitionMetadata(topics).right.flatMap { tms: Set[TopicMetadata] =>
+  val result = tms.flatMap { tm: TopicMetadata =>
--- End diff --

It's not obvious what this loop is doing. What is the result? Leaders?
The loop is also nested quite deeply, which makes it hard to follow.
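
One possible way to flatten it (purely illustrative, not part of this PR) is to
pull the innermost step into a named helper, so the body reads as "for each topic,
for each partition, keep the leader if it was asked for":

    // Hypothetical helper; assumes the kafka.api / kafka.common imports from the diff.
    def leaderFor(
        tm: TopicMetadata,
        pm: PartitionMetadata,
        wanted: Set[TopicAndPartition]): Option[(TopicAndPartition, (String, Int))] = {
      val tp = TopicAndPartition(tm.topic, pm.partitionId)
      if (wanted(tp)) pm.leader.map(l => tp -> (l.host -> l.port)) else None
    }

    // findLeaders' core could then be written as a flat for-comprehension:
    // val result = (for {
    //   tm    <- tms
    //   pm    <- tm.partitionsMetadata
    //   entry <- leaderFor(tm, pm, topicAndPartitions)
    // } yield entry).toMap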


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For 

[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-02 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/3798#discussion_r23973072
  
--- Diff: 
external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala
 ---

[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-02 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/3798#discussion_r23973375
  
--- Diff: 
external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala
 ---

[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-02 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/3798#discussion_r23973371
  
--- Diff: 
external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala
 ---

[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-02 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/3798#discussion_r23973357
  
--- Diff: 
external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala
 ---

[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-02 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/3798#discussion_r23973351
  
--- Diff: 
external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala
 ---
