[GitHub] spark pull request: [Streaming][Kafka][SPARK-8127] KafkaRDD optimi...

2015-06-19 Thread tdas
Github user tdas commented on the pull request:

https://github.com/apache/spark/pull/6632#issuecomment-113690514
  
Wait, oh, the title: please fix the order :/


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [Streaming][Kafka][SPARK-8127] KafkaRDD optimi...

2015-06-19 Thread tdas
Github user tdas commented on the pull request:

https://github.com/apache/spark/pull/6632#issuecomment-113690492
  
Merging this to master





[GitHub] spark pull request: [Streaming][Kafka][SPARK-8127] KafkaRDD optimi...

2015-06-19 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/6632#issuecomment-113682475
  
Merged build finished. Test PASSed.





[GitHub] spark pull request: [Streaming][Kafka][SPARK-8127] KafkaRDD optimi...

2015-06-19 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/6632#issuecomment-113682450
  
[Test build #35331 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/35331/console) for PR 6632 at commit [`321340d`](https://github.com/apache/spark/commit/321340d6e88bd424d62c1417d2f2a2111e7ac986).
 * This patch **passes all tests**.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `  // class ParentClass(parentField: Int)`
  * `  // class ChildClass(childField: Int) extends ParentClass(1)`
  * `  // If the class type corresponding to current slot has writeObject() defined,`
  * `  // then its not obvious which fields of the class will be serialized as the writeObject()`
  * `abstract class GeneratedClass `
  * `case class Bin(child: Expression)`
  * `case class Md5(child: Expression)`






[GitHub] spark pull request: [Streaming][Kafka][SPARK-8127] KafkaRDD optimi...

2015-06-19 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/6632#issuecomment-113676707
  
Merged build finished. Test PASSed.





[GitHub] spark pull request: [Streaming][Kafka][SPARK-8127] KafkaRDD optimi...

2015-06-19 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/6632#issuecomment-113676455
  
[Test build #35329 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/35329/console) for PR 6632 at commit [`5a05d0f`](https://github.com/apache/spark/commit/5a05d0f633b66ffe42f8e7bb8f4e09308d79fa29).
 * This patch **passes all tests**.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `  // class ParentClass(parentField: Int)`
  * `  // class ChildClass(childField: Int) extends ParentClass(1)`
  * `  // If the class type corresponding to current slot has writeObject() defined,`
  * `  // then its not obvious which fields of the class will be serialized as the writeObject()`
  * `abstract class GeneratedClass `
  * `case class Bin(child: Expression)`
  * `case class Md5(child: Expression)`






[GitHub] spark pull request: [Streaming][Kafka][SPARK-8127] KafkaRDD optimi...

2015-06-19 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/6632#issuecomment-113657139
  
[Test build #35331 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/35331/consoleFull) for PR 6632 at commit [`321340d`](https://github.com/apache/spark/commit/321340d6e88bd424d62c1417d2f2a2111e7ac986).





[GitHub] spark pull request: [Streaming][Kafka][SPARK-8127] KafkaRDD optimi...

2015-06-19 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/6632#issuecomment-113656771
  
Merged build started.





[GitHub] spark pull request: [Streaming][Kafka][SPARK-8127] KafkaRDD optimi...

2015-06-19 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/6632#issuecomment-113656747
  
 Merged build triggered.





[GitHub] spark pull request: [Streaming][Kafka][SPARK-8127] KafkaRDD optimi...

2015-06-19 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/6632#issuecomment-113656191
  
[Test build #35329 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/35329/consoleFull) for PR 6632 at commit [`5a05d0f`](https://github.com/apache/spark/commit/5a05d0f633b66ffe42f8e7bb8f4e09308d79fa29).





[GitHub] spark pull request: [Streaming][Kafka][SPARK-8127] KafkaRDD optimi...

2015-06-19 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/6632#issuecomment-113656030
  
Merged build started.





[GitHub] spark pull request: [Streaming][Kafka][SPARK-8127] KafkaRDD optimi...

2015-06-19 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/6632#issuecomment-113656013
  
 Merged build triggered.





[GitHub] spark pull request: [Streaming][Kafka][SPARK-8127] KafkaRDD optimi...

2015-06-19 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/6632#discussion_r32872739
  
--- Diff: external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala ---
@@ -68,6 +68,21 @@ class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll {
 
 val received = rdd.map(_._2).collect.toSet
 assert(received === messages)
+
+// size-related method optimizations return sane results
+assert(rdd.count === messages.size)
+assert(rdd.countApprox(0).getFinalValue.mean === messages.size)
+assert(! rdd.isEmpty)
+assert(rdd.take(1).size === 1)
+assert(messages(rdd.take(1).head._2))
--- End diff --

Shouldn't the test be stronger, checking that it returns the expected message from the right offset and not just any of the messages? Basically, if there is a bug in the code where take(1) returns the last message in the offset range rather than the first message, it won't be caught.
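To make the point concrete: a membership check passes even when `take(1)` returns the wrong element, while a positional check does not. This is a minimal Spark-free sketch; `TakeCheckSketch`, `buggyTake` and the in-memory `log` are hypothetical stand-ins, not part of `KafkaRDD` or its test suite.

```scala
// Hypothetical in-memory model of one Kafka partition; this is not Spark's KafkaRDD.
object TakeCheckSketch {
  // log(i) is the message stored at offset i.
  val log: IndexedSeq[String] = IndexedSeq("m0", "m1", "m2", "m3", "m4")

  // Read the half-open offset range [fromOffset, untilOffset) in offset order.
  def read(fromOffset: Int, untilOffset: Int): Seq[String] =
    (fromOffset until untilOffset).map(i => log(i))

  // Correct take: the first n messages of the range.
  def take(fromOffset: Int, untilOffset: Int, n: Int): Seq[String] =
    read(fromOffset, untilOffset).take(n)

  // Deliberately buggy take: the last n messages of the range.
  def buggyTake(fromOffset: Int, untilOffset: Int, n: Int): Seq[String] =
    read(fromOffset, untilOffset).takeRight(n)

  // Weak check (as in the diff): the taken message is one of the messages sent.
  def membershipOk(taken: Seq[String], sent: Set[String]): Boolean =
    taken.size == 1 && sent(taken.head)

  // Stronger check: the taken message is exactly the one stored at fromOffset.
  def positionalOk(taken: Seq[String], fromOffset: Int): Boolean =
    taken == Seq(log(fromOffset))

  def main(args: Array[String]): Unit = {
    val (fromOff, untilOff) = (1, 4)
    val sent = read(fromOff, untilOff).toSet
    val candidates = Seq("take" -> take(fromOff, untilOff, 1), "buggyTake" -> buggyTake(fromOff, untilOff, 1))
    for ((label, taken) <- candidates)
      println(s"$label: membership=${membershipOk(taken, sent)} positional=${positionalOk(taken, fromOff)}")
  }
}
```

Only the positional check distinguishes the two implementations; the membership check is satisfied by both.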





[GitHub] spark pull request: [Streaming][Kafka][SPARK-8127] KafkaRDD optimi...

2015-06-19 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/6632#discussion_r32872490
  
--- Diff: external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala ---
@@ -68,6 +68,21 @@ class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll {
 
 val received = rdd.map(_._2).collect.toSet
 assert(received === messages)
+
+// size-related method optimizations return sane results
+assert(rdd.count === messages.size)
+assert(rdd.countApprox(0).getFinalValue.mean === messages.size)
+assert(! rdd.isEmpty)
+assert(rdd.take(1).size === 1)
+assert(messages(rdd.take(1).head._2))
--- End diff --

It's asserting that the item taken from the RDD is a member of the set of messages sent.

On Fri, Jun 19, 2015 at 4:07 PM, Tathagata Das wrote:

> In external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala:
>
> > @@ -68,6 +68,21 @@ class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll {
> >
> >  val received = rdd.map(_._2).collect.toSet
> >  assert(received === messages)
> > +
> > +// size-related method optimizations return sane results
> > +assert(rdd.count === messages.size)
> > +assert(rdd.countApprox(0).getFinalValue.mean === messages.size)
> > +assert(! rdd.isEmpty)
> > +assert(rdd.take(1).size === 1)
> > +assert(messages(rdd.take(1).head._2))
>
> What does this check? Shouldn't it check that rdd.take(1) === "the" //
> whatever is expected
>






[GitHub] spark pull request: [Streaming][Kafka][SPARK-8127] KafkaRDD optimi...

2015-06-19 Thread tdas
Github user tdas commented on the pull request:

https://github.com/apache/spark/pull/6632#issuecomment-113645504
  
Just a couple of more comments on the tests.





[GitHub] spark pull request: [Streaming][Kafka][SPARK-8127] KafkaRDD optimi...

2015-06-19 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/6632#discussion_r32869403
  
--- Diff: external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala ---
@@ -68,6 +68,21 @@ class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll {
 
 val received = rdd.map(_._2).collect.toSet
 assert(received === messages)
+
+// size-related method optimizations return sane results
+assert(rdd.count === messages.size)
+assert(rdd.countApprox(0).getFinalValue.mean === messages.size)
+assert(! rdd.isEmpty)
--- End diff --

There is no check whether isEmpty is successful.
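One reading of this comment is that the test only covers the non-empty case, so an `isEmpty` that always returned `false` would still pass. A minimal sketch that exercises both outcomes, assuming `count` and `isEmpty` are derived purely from offsets (one message per offset, no compaction gaps); the `OffsetRange` case class here is a simplified hypothetical stand-in for the Spark class of the same name.

```scala
// Hypothetical model: count and isEmpty derived from offset ranges alone, assuming
// every offset in [fromOffset, untilOffset) holds exactly one message.
object SizeFromOffsetsSketch {
  final case class OffsetRange(topic: String, partition: Int, fromOffset: Long, untilOffset: Long) {
    def count: Long = untilOffset - fromOffset
  }

  // Total number of messages across all ranges, computed without reading any data.
  def count(ranges: Seq[OffsetRange]): Long = ranges.map(_.count).sum

  // Empty exactly when every range has fromOffset == untilOffset
  // (assuming untilOffset >= fromOffset for each range).
  def isEmpty(ranges: Seq[OffsetRange]): Boolean = count(ranges) == 0L

  def main(args: Array[String]): Unit = {
    val nonEmpty = Seq(OffsetRange("events", 0, 0L, 3L), OffsetRange("events", 1, 5L, 5L))
    val empty = Seq(OffsetRange("events", 0, 7L, 7L))
    // Exercise both outcomes so a broken isEmpty cannot pass by always returning false.
    println(s"nonEmpty: count=${count(nonEmpty)} isEmpty=${isEmpty(nonEmpty)}")
    println(s"empty: count=${count(empty)} isEmpty=${isEmpty(empty)}")
  }
}
```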





[GitHub] spark pull request: [Streaming][Kafka][SPARK-8127] KafkaRDD optimi...

2015-06-19 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/6632#discussion_r32869380
  
--- Diff: external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala ---
@@ -68,6 +68,21 @@ class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll {
 
 val received = rdd.map(_._2).collect.toSet
 assert(received === messages)
+
+// size-related method optimizations return sane results
+assert(rdd.count === messages.size)
+assert(rdd.countApprox(0).getFinalValue.mean === messages.size)
+assert(! rdd.isEmpty)
+assert(rdd.take(1).size === 1)
+assert(messages(rdd.take(1).head._2))
--- End diff --

What does this check? Shouldn't it check that `rdd.take(1) === "the"  // whatever is expected`?
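For context on what the `take(1)` assertions exercise: one way a size-related optimization of `take(n)` could work is to read only as many offset ranges as needed to collect `n` messages. The sketch below is a hypothetical illustration of that planning step under stated assumptions (one message per offset, ranges indexed by partition position), not necessarily the code in this patch.

```scala
// Hypothetical sketch of planning a size-aware take(n); not the code in this patch.
object TakePlanSketch {
  // Simplified stand-in: assumes one message per offset in [fromOffset, untilOffset).
  final case class OffsetRange(fromOffset: Long, untilOffset: Long) {
    def count: Long = untilOffset - fromOffset
  }

  // Map each partition index to the number of messages take(num) must read from it.
  // Non-empty ranges are visited in partition order; once num is covered, the
  // remaining partitions are left out of the plan.
  def plan(ranges: IndexedSeq[OffsetRange], num: Int): Map[Int, Int] = {
    val nonEmpty = ranges.zipWithIndex.filter { case (r, _) => r.count > 0 }
    nonEmpty.foldLeft(Map.empty[Int, Int]) { case (acc, (r, index)) =>
      val remain = num - acc.values.sum
      if (remain > 0) acc + (index -> math.min(remain.toLong, r.count).toInt)
      else acc
    }
  }

  def main(args: Array[String]): Unit = {
    val ranges = IndexedSeq(OffsetRange(0L, 0L), OffsetRange(10L, 12L), OffsetRange(3L, 8L))
    println(plan(ranges, 1)) // only partition 1 is read, for a single message
    println(plan(ranges, 4)) // two messages from partition 1, two from partition 2
  }
}
```

With a plan like this, `take(1)` should return the message at `fromOffset` of the first non-empty range, which is what a positional assertion would check.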






[GitHub] spark pull request: [Streaming][Kafka][SPARK-8127] KafkaRDD optimi...

2015-06-19 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/6632#issuecomment-113644879
  
[Test build #35310 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/35310/console) for PR 6632 at commit [`f68bd32`](https://github.com/apache/spark/commit/f68bd3266df27fc8238195ac443c3e2cdb37803a).
 * This patch **passes all tests**.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `  // class ParentClass(parentField: Int)`
  * `  // class ChildClass(childField: Int) extends ParentClass(1)`
  * `  // If the class type corresponding to current slot has writeObject() defined,`
  * `  // then its not obvious which fields of the class will be serialized as the writeObject()`
  * `abstract class GeneratedClass `
  * `case class Bin(child: Expression)`
  * `case class Md5(child: Expression)`






[GitHub] spark pull request: [Streaming][Kafka][SPARK-8127] KafkaRDD optimi...

2015-06-19 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/6632#discussion_r32869331
  
--- Diff: external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala ---
@@ -68,6 +68,21 @@ class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll {
 
 val received = rdd.map(_._2).collect.toSet
 assert(received === messages)
+
+// size-related method optimizations return sane results
+assert(rdd.count === messages.size)
+assert(rdd.countApprox(0).getFinalValue.mean === messages.size)
+assert(! rdd.isEmpty)
--- End diff --

nit: extra space





[GitHub] spark pull request: [Streaming][Kafka][SPARK-8127] KafkaRDD optimi...

2015-06-19 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/6632#issuecomment-113644969
  
Merged build finished. Test PASSed.





[GitHub] spark pull request: [Streaming][Kafka][SPARK-8127] KafkaRDD optimi...

2015-06-19 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/6632#discussion_r32869239
  
--- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala ---
@@ -158,15 +158,30 @@ object KafkaUtils {
 
   /** get leaders for the given offset ranges, or throw an exception */
   private def leadersForRanges(
-  kafkaParams: Map[String, String],
+  kc: KafkaCluster,
   offsetRanges: Array[OffsetRange]): Map[TopicAndPartition, (String, Int)] = {
-val kc = new KafkaCluster(kafkaParams)
 val topics = offsetRanges.map(o => TopicAndPartition(o.topic, o.partition)).toSet
-val leaders = kc.findLeaders(topics).fold(
-  errs => throw new SparkException(errs.mkString("\n")),
-  ok => ok
-)
-leaders
+val leaders = kc.findLeaders(topics)
+KafkaCluster.checkErrors(leaders)
+  }
+
+  /** Make sure offsets are available in kafka, or throw an exception */
+  private def checkOffsets(
--- End diff --

Oh, I felt it was a fairly general utility function for Kafka, to check whether any offset range is valid.
Nonetheless, I am good to go with this patch.





[GitHub] spark pull request: [Streaming][Kafka][SPARK-8127] KafkaRDD optimi...

2015-06-19 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/6632#issuecomment-113611260
  
[Test build #35310 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/35310/consoleFull) for PR 6632 at commit [`f68bd32`](https://github.com/apache/spark/commit/f68bd3266df27fc8238195ac443c3e2cdb37803a).





[GitHub] spark pull request: [Streaming][Kafka][SPARK-8127] KafkaRDD optimi...

2015-06-19 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/6632#issuecomment-113610636
  
 Merged build triggered.





[GitHub] spark pull request: [Streaming][Kafka][SPARK-8127] KafkaRDD optimi...

2015-06-19 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/6632#issuecomment-113610662
  
Merged build started.





[GitHub] spark pull request: [Streaming][Kafka][SPARK-8127] KafkaRDD optimi...

2015-06-19 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/6632#discussion_r32859841
  
--- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala ---
@@ -399,7 +418,7 @@ object KafkaUtils {
 val kc = new KafkaCluster(kafkaParams)
 val reset = kafkaParams.get("auto.offset.reset").map(_.toLowerCase)
 
-(for {
+KafkaCluster.checkErrors(for {
--- End diff --

That checkErrors method just gives a name to the repeated pattern of folding over the Either returned from the Kafka cluster API, to either unwrap it to a result or throw an exception.
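A minimal sketch of such a helper, assuming errors arrive as an `ArrayBuffer[Throwable]` on the `Left` side; a plain `RuntimeException` stands in for `SparkException` so the example has no Spark dependency. It is the same fold as the removed lines in `leadersForRanges`, just given a name.

```scala
import scala.collection.mutable.ArrayBuffer

// Minimal sketch of a checkErrors-style helper (not Spark's actual source).
object CheckErrorsSketch {
  // Assumption: a failed Kafka call reports all of its errors on the Left.
  type Err = ArrayBuffer[Throwable]

  // Unwrap the Right value, or throw one exception whose message lists every error.
  def checkErrors[T](result: Either[Err, T]): T =
    result.fold(
      errs => throw new RuntimeException(errs.mkString("\n")),
      ok => ok
    )

  def main(args: Array[String]): Unit = {
    val found: Either[Err, Int] = Right(42)
    val failed: Either[Err, Int] = Left(ArrayBuffer[Throwable](new Exception("no leader for partition 0")))
    println(checkErrors(found))
    try checkErrors(failed)
    catch { case e: RuntimeException => println(s"failed: ${e.getMessage}") }
  }
}
```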





[GitHub] spark pull request: [Streaming][Kafka][SPARK-8127] KafkaRDD optimi...

2015-06-19 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/6632#discussion_r32859740
  
--- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala ---
@@ -158,15 +158,30 @@ object KafkaUtils {
 
   /** get leaders for the given offset ranges, or throw an exception */
   private def leadersForRanges(
-  kafkaParams: Map[String, String],
+  kc: KafkaCluster,
   offsetRanges: Array[OffsetRange]): Map[TopicAndPartition, (String, Int)] = {
-val kc = new KafkaCluster(kafkaParams)
 val topics = offsetRanges.map(o => TopicAndPartition(o.topic, o.partition)).toSet
-val leaders = kc.findLeaders(topics).fold(
-  errs => throw new SparkException(errs.mkString("\n")),
-  ok => ok
-)
-leaders
+val leaders = kc.findLeaders(topics)
+KafkaCluster.checkErrors(leaders)
+  }
+
+  /** Make sure offsets are available in kafka, or throw an exception */
+  private def checkOffsets(
--- End diff --

Those are just repeated blocks of code in KafkaUtils, because it has duplicate constructors that do similar things. I don't really feel they're generally useful utilities (unlike the folding / checkErrors, which happens in several places).





[GitHub] spark pull request: [Streaming][Kafka][SPARK-8127] KafkaRDD optimi...

2015-06-19 Thread tdas
Github user tdas commented on the pull request:

https://github.com/apache/spark/pull/6632#issuecomment-113597869
  
I think it's almost good to go. A few minor points.





[GitHub] spark pull request: [Streaming][Kafka][SPARK-8127] KafkaRDD optimi...

2015-06-19 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/6632#discussion_r32855579
  
--- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala ---
@@ -399,7 +418,7 @@ object KafkaUtils {
 val kc = new KafkaCluster(kafkaParams)
 val reset = kafkaParams.get("auto.offset.reset").map(_.toLowerCase)
 
-(for {
+KafkaCluster.checkErrors(for {
--- End diff --

What is this for? And the same comment as above applies: reduce nesting by putting the `for-yield` in a separate val.
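A sketch of the suggested style, under stated assumptions: the `for`/`yield` over `Either` gets its own `val`, and the fold-or-throw helper is applied afterwards instead of wrapping the whole comprehension. `partitionsFor` and `offsetsFor` are hypothetical stand-ins, not `KafkaCluster` methods, and the code assumes Scala 2.12+, where `Either` is right-biased (on 2.11 the generators need `.right`, as in the `checkOffsets` diff in this thread).

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical sketch of the suggested style; assumes Scala 2.12+ (right-biased Either).
object ForYieldStyleSketch {
  type Err = ArrayBuffer[Throwable]

  // A checkErrors-style fold-or-throw helper, with RuntimeException as a stand-in.
  def checkErrors[T](result: Either[Err, T]): T =
    result.fold(errs => throw new RuntimeException(errs.mkString("\n")), ok => ok)

  // Stand-in lookups that return Either, in the spirit of the Kafka cluster API.
  def partitionsFor(topic: String): Either[Err, Set[Int]] =
    if (topic.nonEmpty) Right(Set(0, 1))
    else Left(ArrayBuffer[Throwable](new Exception("empty topic name")))

  def offsetsFor(partitions: Set[Int]): Either[Err, Map[Int, Long]] =
    Right(partitions.map(p => p -> 0L).toMap)

  def startingOffsets(topic: String): Map[Int, Long] = {
    // Name the comprehension first instead of nesting it inside checkErrors(for { ... }).
    val result = for {
      partitions <- partitionsFor(topic)
      offsets <- offsetsFor(partitions)
    } yield offsets
    checkErrors(result)
  }

  def main(args: Array[String]): Unit = {
    println(startingOffsets("events"))
    try startingOffsets("")
    catch { case e: RuntimeException => println(s"failed: ${e.getMessage}") }
  }
}
```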





[GitHub] spark pull request: [Streaming][Kafka][SPARK-8127] KafkaRDD optimi...

2015-06-19 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/6632#discussion_r32854988
  
--- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala ---
@@ -158,15 +158,30 @@ object KafkaUtils {
 
   /** get leaders for the given offset ranges, or throw an exception */
   private def leadersForRanges(
-  kafkaParams: Map[String, String],
+  kc: KafkaCluster,
   offsetRanges: Array[OffsetRange]): Map[TopicAndPartition, (String, Int)] = {
-val kc = new KafkaCluster(kafkaParams)
 val topics = offsetRanges.map(o => TopicAndPartition(o.topic, o.partition)).toSet
-val leaders = kc.findLeaders(topics).fold(
-  errs => throw new SparkException(errs.mkString("\n")),
-  ok => ok
-)
-leaders
+val leaders = kc.findLeaders(topics)
+KafkaCluster.checkErrors(leaders)
+  }
+
+  /** Make sure offsets are available in kafka, or throw an exception */
+  private def checkOffsets(
--- End diff --

Shouldn't methods like checkOffsets be put inside KafkaCluster? I don't like the fact that we are cluttering the KafkaUtils class with so many private utility functions. Putting them in KafkaCluster may be cleaner.





[GitHub] spark pull request: [Streaming][Kafka][SPARK-8127] KafkaRDD optimi...

2015-06-19 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/6632#discussion_r32854375
  
--- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala ---
@@ -158,15 +158,30 @@ object KafkaUtils {
 
   /** get leaders for the given offset ranges, or throw an exception */
   private def leadersForRanges(
-  kafkaParams: Map[String, String],
+  kc: KafkaCluster,
   offsetRanges: Array[OffsetRange]): Map[TopicAndPartition, (String, Int)] = {
-val kc = new KafkaCluster(kafkaParams)
 val topics = offsetRanges.map(o => TopicAndPartition(o.topic, o.partition)).toSet
-val leaders = kc.findLeaders(topics).fold(
-  errs => throw new SparkException(errs.mkString("\n")),
-  ok => ok
-)
-leaders
+val leaders = kc.findLeaders(topics)
+KafkaCluster.checkErrors(leaders)
+  }
+
+  /** Make sure offsets are available in kafka, or throw an exception */
+  private def checkOffsets(
+  kc: KafkaCluster,
+  offsetRanges: Array[OffsetRange]): Unit = {
+val topics = offsetRanges.map(_.topicAndPartition).toSet
+val badRanges = KafkaCluster.checkErrors(for {
+  low <- kc.getEarliestLeaderOffsets(topics).right
+  high <- kc.getLatestLeaderOffsets(topics).right
+} yield {
+  offsetRanges.filterNot { o =>
+low(o.topicAndPartition).offset <= o.fromOffset &&
+o.untilOffset <= high(o.topicAndPartition).offset
+  }
+})
+if (! badRanges.isEmpty) {
--- End diff --

nit: Extra space after the `!`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [Streaming][Kafka][SPARK-8127] KafkaRDD optimi...

2015-06-19 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/6632#discussion_r32854351
  
--- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala ---
@@ -158,15 +158,30 @@ object KafkaUtils {
 
   /** get leaders for the given offset ranges, or throw an exception */
   private def leadersForRanges(
-  kafkaParams: Map[String, String],
+  kc: KafkaCluster,
   offsetRanges: Array[OffsetRange]): Map[TopicAndPartition, (String, Int)] = {
-val kc = new KafkaCluster(kafkaParams)
 val topics = offsetRanges.map(o => TopicAndPartition(o.topic, o.partition)).toSet
-val leaders = kc.findLeaders(topics).fold(
-  errs => throw new SparkException(errs.mkString("\n")),
-  ok => ok
-)
-leaders
+val leaders = kc.findLeaders(topics)
+KafkaCluster.checkErrors(leaders)
+  }
+
+  /** Make sure offsets are available in kafka, or throw an exception */
+  private def checkOffsets(
+  kc: KafkaCluster,
+  offsetRanges: Array[OffsetRange]): Unit = {
+val topics = offsetRanges.map(_.topicAndPartition).toSet
+val badRanges = KafkaCluster.checkErrors(for {
--- End diff --

@koeninger I know this is probably idiomatic Scala, but the nesting makes it
hard to read. Could you assign the for-yield to a separate variable and then
check that for errors?
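A minimal sketch of the restructuring requested here, not the merged code: the for-yield is bound to its own `result` variable and only then passed to the error check. It assumes Scala 2.12+ (where `Either` is right-biased, so the `.right` projections in the diff are unnecessary), simplifies `LeaderOffset` to a plain `Long`, and takes hypothetical `earliest`/`latest` lookup functions in place of `kc.getEarliestLeaderOffsets` and `kc.getLatestLeaderOffsets`. `OffsetCheck` is a hypothetical name and `RuntimeException` stands in for `SparkException`.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical, simplified stand-ins for the Kafka types used in the diff.
final case class TopicAndPartition(topic: String, partition: Int)
final case class OffsetRange(topic: String, partition: Int, fromOffset: Long, untilOffset: Long) {
  def topicAndPartition: TopicAndPartition = TopicAndPartition(topic, partition)
}

object OffsetCheck {
  type Err = ArrayBuffer[Throwable]
  type Lookup = Set[TopicAndPartition] => Either[Err, Map[TopicAndPartition, Long]]

  def checkErrors[T](result: Either[Err, T]): T =
    result.fold(errs => throw new RuntimeException(errs.mkString("\n")), ok => ok)

  /** Throw if any range falls outside the [earliest, latest] offsets the broker reports. */
  def checkOffsets(earliest: Lookup, latest: Lookup, offsetRanges: Array[OffsetRange]): Unit = {
    val topics = offsetRanges.map(_.topicAndPartition).toSet
    // Step 1: compute the out-of-bounds ranges as a plain value, not nested in checkErrors.
    val result: Either[Err, Array[OffsetRange]] = for {
      low <- earliest(topics)
      high <- latest(topics)
    } yield offsetRanges.filterNot { o =>
      low(o.topicAndPartition) <= o.fromOffset && o.untilOffset <= high(o.topicAndPartition)
    }
    // Step 2: surface any lookup errors, then reject the ranges that are out of bounds.
    val badRanges = checkErrors(result)
    if (badRanges.nonEmpty) {
      throw new RuntimeException("Offsets not available: " + badRanges.mkString(","))
    }
  }
}
```

Separating the two steps also makes it easy to see that a failed leader lookup and an out-of-range offset are reported through different paths.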


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [Streaming][Kafka][SPARK-8127] KafkaRDD optimi...

2015-06-19 Thread koeninger
Github user koeninger commented on the pull request:

https://github.com/apache/spark/pull/6632#issuecomment-113562935
  
@tdas is there anything else you feel needs to be done on this?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [Streaming][Kafka][SPARK-8127] KafkaRDD optimi...

2015-06-06 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/6632#discussion_r31868393
  
--- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala ---
@@ -60,6 +62,49 @@ class KafkaRDD[
 }.toArray
   }
 
+  override def count(): Long = offsetRanges.map(_.count).sum
--- End diff --

Now checking offset ranges in the createRdd method
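The `count()` override in this hunk derives the total from the offset ranges instead of reading any messages, which is why the ranges have to be validated up front. A minimal sketch of that arithmetic, assuming `untilOffset` is exclusive so each range holds `untilOffset - fromOffset` messages. The simplified `OffsetRange`, `RangeCount`, and `countFromRanges` are hypothetical names for illustration.

```scala
// Hypothetical, simplified OffsetRange: untilOffset is exclusive, so the
// number of messages in a range is untilOffset - fromOffset.
final case class OffsetRange(topic: String, partition: Int, fromOffset: Long, untilOffset: Long) {
  def count: Long = untilOffset - fromOffset
}

object RangeCount {
  // Mirrors `offsetRanges.map(_.count).sum`: the total comes from metadata alone.
  // It is only accurate if every range was checked against the offsets Kafka
  // actually holds; an unchecked range would be counted even if its data is gone.
  def countFromRanges(offsetRanges: Array[OffsetRange]): Long =
    offsetRanges.map(_.count).sum
}
```

For example, ranges `[0, 10)` and `[5, 8)` give a count of 13 without touching the brokers.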


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [Streaming][Kafka][SPARK-8127] KafkaRDD optimi...

2015-06-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/6632#issuecomment-109514581
  
Merged build finished. Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [Streaming][Kafka][SPARK-8127] KafkaRDD optimi...

2015-06-05 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/6632#issuecomment-109514579
  
  [Test build #34348 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/34348/console) for PR 6632 at commit [`253031d`](https://github.com/apache/spark/commit/253031d5c9b8025a432715c2a53724e11263f8e9).
 * This patch **passes all tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [Streaming][Kafka][SPARK-8127] KafkaRDD optimi...

2015-06-05 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/6632#issuecomment-109495283
  
  [Test build #34348 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/34348/consoleFull) for PR 6632 at commit [`253031d`](https://github.com/apache/spark/commit/253031d5c9b8025a432715c2a53724e11263f8e9).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [Streaming][Kafka][SPARK-8127] KafkaRDD optimi...

2015-06-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/6632#issuecomment-109495173
  
Merged build started.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [Streaming][Kafka][SPARK-8127] KafkaRDD optimi...

2015-06-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/6632#issuecomment-109495167
  
 Merged build triggered.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [Streaming][Kafka][SPARK-8127] KafkaRDD optimi...

2015-06-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/6632#issuecomment-109463869
  
Merged build finished. Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [Streaming][Kafka][SPARK-8127] KafkaRDD optimi...

2015-06-05 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/6632#issuecomment-109463823
  
  [Test build #34323 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/34323/console) for PR 6632 at commit [`8974b9e`](https://github.com/apache/spark/commit/8974b9e32bb38bfd7300f3e67ed89bb3a269b9a0).
 * This patch **fails MiMa tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [Streaming][Kafka][SPARK-8127] KafkaRDD optimi...

2015-06-05 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/6632#issuecomment-109457043
  
  [Test build #34323 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/34323/consoleFull) for PR 6632 at commit [`8974b9e`](https://github.com/apache/spark/commit/8974b9e32bb38bfd7300f3e67ed89bb3a269b9a0).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [Streaming][Kafka][SPARK-8127] KafkaRDD optimi...

2015-06-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/6632#issuecomment-109456903
  
 Merged build triggered.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [Streaming][Kafka][SPARK-8127] KafkaRDD optimi...

2015-06-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/6632#issuecomment-109456939
  
Merged build started.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org