[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-03 Thread koeninger
Github user koeninger commented on the pull request:

https://github.com/apache/spark/pull/3798#issuecomment-72790044
  
The warning is for metadata.broker.list, since it's not expected by the
existing ConsumerConfig (it's used by other config classes).

Couldn't get subclassing to work; the VerifiedProperties class it uses is
very dependent on the order of operations during construction.

I think the simplest thing is a class that is constructed from
kafkaParams and uses the static defaults from the ConsumerConfig object.
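
For illustration, a minimal sketch of that idea, assuming Kafka 0.8's
ConsumerConfig companion-object constants; the class and field names here are
placeholders, not the PR code:

    import kafka.consumer.ConsumerConfig

    // Illustrative config holder: read values from kafkaParams and fall back
    // to the static defaults on ConsumerConfig's companion object, without
    // ever constructing a ConsumerConfig (so no VerifiedProperties warning).
    case class SimpleConsumerParams(kafkaParams: Map[String, String]) {
      val socketTimeoutMs: Int =
        kafkaParams.get("socket.timeout.ms").map(_.toInt)
          .getOrElse(ConsumerConfig.SocketTimeout)
      val socketReceiveBufferBytes: Int =
        kafkaParams.get("socket.receive.buffer.bytes").map(_.toInt)
          .getOrElse(ConsumerConfig.SocketBufferSize)
      val clientId: String =
        kafkaParams.getOrElse("client.id", ConsumerConfig.DefaultClientId)
    }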

I'm currently waiting in an ER with my child with a 105 fever, so won't be
getting to it for a few hours to say the least.
On Feb 3, 2015 10:15 PM, Tathagata Das notificati...@github.com wrote:

 I think the simplest solution is to assign zookeeper.connect. But you are
 assigning it in KafkaCluster lines 338 - 345. So why is this warning being
 thrown?

 —
 Reply to this email directly or view it on GitHub
 https://github.com/apache/spark/pull/3798#issuecomment-72787965.







[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-03 Thread koeninger
Github user koeninger commented on the pull request:

https://github.com/apache/spark/pull/3798#issuecomment-72783141
  
Yeah, more importantly it's so that defaults for things like connection
timeouts match what Kafka provides.

It's possible to assign a fake zookeeper.connect and have it pass
verification; that's what the existing code does.
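
A minimal sketch of that workaround (placeholder values, not the PR code):

    import java.util.Properties
    import kafka.consumer.ConsumerConfig

    val kafkaParams = Map("metadata.broker.list" -> "host1:9092,host2:9092")
    val props = new Properties()
    kafkaParams.foreach { case (k, v) => props.setProperty(k, v) }
    // ConsumerConfig requires these even though the direct API never talks to
    // ZK; empty placeholders are enough to pass verification.  Note it will
    // still warn that metadata.broker.list is unused.
    props.setProperty("zookeeper.connect", "")
    props.setProperty("group.id", "")
    val config = new ConsumerConfig(props)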

Unfortunately ConsumerConfig has a private constructor, so subclassing it so
that the broker list passes verification without that warning may prove to be
tricky.  Worst case, I'll re-implement a config that uses the Kafka defaults.

On Tue, Feb 3, 2015 at 9:05 PM, Tathagata Das notificati...@github.com
wrote:

 I see. ConsumerConfig is really necessary only for the high-level consumer,
 but you are using it to configure stuff in the low-level consumer as well.
 That is so that you don't have to introduce parameter strings to configure
 them yourself.

 Is it possible to assign fake but verified zookeeper.connect ?

 —
 Reply to this email directly or view it on GitHub
 https://github.com/apache/spark/pull/3798#issuecomment-72782434.







[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-03 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/3798#discussion_r24019631
  
--- Diff: 
external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala 
---
@@ -144,4 +150,174 @@ object KafkaUtils {
 createStream[K, V, U, T](
   jssc.ssc, kafkaParams.toMap, 
Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
   }
+
+  /** A batch-oriented interface for consuming from Kafka.
+   * Starting and ending offsets are specified in advance,
+   * so that you can control exactly-once semantics.
+   * @param sc SparkContext object
+   * @param kafkaParams Kafka a 
href=http://kafka.apache.org/documentation.html#configuration;
+   * configuration parameters/a.
+   *   Requires metadata.broker.list or bootstrap.servers to be set 
with Kafka broker(s),
+   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   * @param offsetRanges Each OffsetRange in the batch corresponds to a
+   *   range of offsets for a given Kafka topic/partition
+   */
+  @Experimental
+  def createRDD[
+K: ClassTag,
+V: ClassTag,
+U : Decoder[_]: ClassTag,
+T : Decoder[_]: ClassTag] (
+  sc: SparkContext,
+  kafkaParams: Map[String, String],
+  offsetRanges: Array[OffsetRange]
+  ): RDD[(K, V)] with HasOffsetRanges = {
+val messageHandler = (mmd: MessageAndMetadata[K, V]) = (mmd.key, 
mmd.message)
+val kc = new KafkaCluster(kafkaParams)
+val topics = offsetRanges.map(o = TopicAndPartition(o.topic, 
o.partition)).toSet
+val leaders = kc.findLeaders(topics).fold(
+  errs = throw new SparkException(errs.mkString(\n)),
+  ok = ok
+)
+new KafkaRDD[K, V, U, T, (K, V)](sc, kafkaParams, offsetRanges, 
leaders, messageHandler)
+  }
+
+  /** A batch-oriented interface for consuming from Kafka.
+   * Starting and ending offsets are specified in advance,
+   * so that you can control exactly-once semantics.
+   * @param sc SparkContext object
+   * @param kafkaParams Kafka a 
href=http://kafka.apache.org/documentation.html#configuration;
+   * configuration parameters/a.
+   *   Requires metadata.broker.list or bootstrap.servers to be set 
with Kafka broker(s),
+   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   * @param offsetRanges Each OffsetRange in the batch corresponds to a
+   *   range of offsets for a given Kafka topic/partition
+   * @param leaders Kafka leaders for each offset range in batch
+   * @param messageHandler function for translating each message into the 
desired type
+   */
+  @Experimental
+  def createRDD[
+K: ClassTag,
+V: ClassTag,
+U : Decoder[_]: ClassTag,
+T : Decoder[_]: ClassTag,
+R: ClassTag] (
+  sc: SparkContext,
+  kafkaParams: Map[String, String],
+  offsetRanges: Array[OffsetRange],
+  leaders: Array[Leader],
--- End diff --

That's correct on both counts.

If you don't provide a way for clients to supply offset ranges and leaders
at the same time, you're forcing twice the number of remote calls (because the
usual way to get the end of the offset range is to talk to the leader).
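
Roughly, the caller-side pattern that implies, using the KafkaCluster methods
from this PR (which is exactly the functionality that isn't public yet; see the
next paragraph). Topic and params are placeholders:

    import kafka.common.TopicAndPartition
    import org.apache.spark.streaming.kafka.KafkaCluster

    val kafkaParams = Map("metadata.broker.list" -> "host1:9092")
    val kc = new KafkaCluster(kafkaParams)
    val parts = Set(TopicAndPartition("mytopic", 0))
    // One pass of metadata requests finds the leaders...
    val leaders = kc.findLeaders(parts).right.get
    // ...and the end offsets come from talking to those same leaders,
    // which is why supplying both to createRDD avoids repeating the lookups.
    val untilOffsets = kc.getLatestLeaderOffsets(parts).right.get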

Yes, there's no way for people to actually use this currently unless they 
have their own copy of the functionality provided by KafkaCluster.  In my case, 
I'm just going to remove SparkException from KafkaCluster, since it's the only 
spark dependency, and distribute it as a separate jar under a different 
namespace.





[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-03 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/3798#discussion_r24018460
  
--- Diff: 
external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala 
---
@@ -144,4 +150,174 @@ object KafkaUtils {
 createStream[K, V, U, T](
   jssc.ssc, kafkaParams.toMap, 
Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
   }
+
+  /** A batch-oriented interface for consuming from Kafka.
+   * Starting and ending offsets are specified in advance,
+   * so that you can control exactly-once semantics.
+   * @param sc SparkContext object
+   * @param kafkaParams Kafka a 
href=http://kafka.apache.org/documentation.html#configuration;
+   * configuration parameters/a.
+   *   Requires metadata.broker.list or bootstrap.servers to be set 
with Kafka broker(s),
+   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   * @param offsetRanges Each OffsetRange in the batch corresponds to a
+   *   range of offsets for a given Kafka topic/partition
+   */
+  @Experimental
+  def createRDD[
+K: ClassTag,
+V: ClassTag,
+U : Decoder[_]: ClassTag,
+T : Decoder[_]: ClassTag] (
+  sc: SparkContext,
+  kafkaParams: Map[String, String],
+  offsetRanges: Array[OffsetRange]
+  ): RDD[(K, V)] with HasOffsetRanges = {
--- End diff --

There is definitely a bytecode difference; try running diff on the class
files.  It statically guarantees you can call .offsetRanges on the thing
returned from createRDD.  Without it, you'd have to cast at runtime.  If you
add e.g. a .chzburger method to KafkaRDD, you won't be able to call it without
asInstanceOf.  If you then made a Chzburger interface, implemented it on
KafkaRDD, and changed the return type to RDD with HasOffsetRanges with
Chzburger, you would.  I hear your concern about binary compatibility.
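
A small usage sketch of that guarantee (sc, kafkaParams and offsetRanges are
assumed to be in scope; StringDecoder stands in for whatever decoders apply):

    import kafka.serializer.StringDecoder
    import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

    // Static type is RDD[(String, String)] with HasOffsetRanges, so no cast:
    val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
      sc, kafkaParams, offsetRanges)
    rdd.offsetRanges.foreach { o =>
      println(s"${o.topic} ${o.partition}: ${o.fromOffset} until ${o.untilOffset}")
    }
    // With a plain RDD[(K, V)] return type the same thing needs a runtime cast:
    // rdd.asInstanceOf[HasOffsetRanges].offsetRanges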

As far as exposing KafkaRDD instead... that's the way I originally designed 
things.

The current design is the result of a compromise between TD's desire as a 
maintainer to hide as much as possible, and my desire as a user to expose 
what's necessary to get my job done.  You can usually tell it's a good 
compromise if no one is happy :)





[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-03 Thread koeninger
Github user koeninger commented on the pull request:

https://github.com/apache/spark/pull/3798#issuecomment-72691392
  
Regarding naming, I agree.  The name has been a point of discussion for a
month; how do we get some consensus?

Regarding Java wrappers, there have already been a number of changes
directed at Java compatibility (an Array of Leader instead of a Map[topic,
broker], .create in addition to .apply).  I wonder how relevant those are
if we're doing separate Java wrappers (which, yes, I agree should be in a
follow-on PR).

On Tue, Feb 3, 2015 at 3:13 AM, Patrick Wendell notificati...@github.com
wrote:

 I took a pass through the public API. I'm not very familiar with Kafka, so
 it was somewhat slow going. However, some reactions:

 1. We should try to tighten, simplify, and clarify the way we name and
 document everything in this public API. Most of the comments were about
 this. The most important, IMO, is coming up with a good name for the new
 streams returned and clearly explaining how they differ from the old Kafka
 stream. To me, the main differences seem to be in the way we (a) decide
 what goes into which batch and (b) actually ingest the data. I proposed a
 javadoc and naming scheme that emphasizes that distinction.
 2. Are there plans to add Java and Python wrappers here next? Those are
 straightforward and it would be good to have them. Maybe in a follow-on PR?

 —
 Reply to this email directly or view it on GitHub
 https://github.com/apache/spark/pull/3798#issuecomment-72617088.






[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-03 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/3798#discussion_r24017652
  
--- Diff: 
external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala 
---
@@ -144,4 +150,174 @@ object KafkaUtils {
 createStream[K, V, U, T](
   jssc.ssc, kafkaParams.toMap, 
Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
   }
+
+  /** A batch-oriented interface for consuming from Kafka.
+   * Starting and ending offsets are specified in advance,
+   * so that you can control exactly-once semantics.
+   * @param sc SparkContext object
+   * @param kafkaParams Kafka a 
href=http://kafka.apache.org/documentation.html#configuration;
+   * configuration parameters/a.
+   *   Requires metadata.broker.list or bootstrap.servers to be set 
with Kafka broker(s),
+   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   * @param offsetRanges Each OffsetRange in the batch corresponds to a
+   *   range of offsets for a given Kafka topic/partition
+   */
+  @Experimental
+  def createRDD[
+K: ClassTag,
+V: ClassTag,
+U : Decoder[_]: ClassTag,
+T : Decoder[_]: ClassTag] (
+  sc: SparkContext,
+  kafkaParams: Map[String, String],
+  offsetRanges: Array[OffsetRange]
+  ): RDD[(K, V)] with HasOffsetRanges = {
+val messageHandler = (mmd: MessageAndMetadata[K, V]) = (mmd.key, 
mmd.message)
+val kc = new KafkaCluster(kafkaParams)
+val topics = offsetRanges.map(o = TopicAndPartition(o.topic, 
o.partition)).toSet
+val leaders = kc.findLeaders(topics).fold(
+  errs = throw new SparkException(errs.mkString(\n)),
+  ok = ok
+)
+new KafkaRDD[K, V, U, T, (K, V)](sc, kafkaParams, offsetRanges, 
leaders, messageHandler)
+  }
+
+  /** A batch-oriented interface for consuming from Kafka.
+   * Starting and ending offsets are specified in advance,
+   * so that you can control exactly-once semantics.
+   * @param sc SparkContext object
+   * @param kafkaParams Kafka a 
href=http://kafka.apache.org/documentation.html#configuration;
+   * configuration parameters/a.
+   *   Requires metadata.broker.list or bootstrap.servers to be set 
with Kafka broker(s),
+   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   * @param offsetRanges Each OffsetRange in the batch corresponds to a
+   *   range of offsets for a given Kafka topic/partition
+   * @param leaders Kafka leaders for each offset range in batch
+   * @param messageHandler function for translating each message into the 
desired type
+   */
+  @Experimental
+  def createRDD[
+K: ClassTag,
+V: ClassTag,
+U : Decoder[_]: ClassTag,
+T : Decoder[_]: ClassTag,
+R: ClassTag] (
+  sc: SparkContext,
+  kafkaParams: Map[String, String],
+  offsetRanges: Array[OffsetRange],
+  leaders: Array[Leader],
+  messageHandler: MessageAndMetadata[K, V] = R
+  ): RDD[R] with HasOffsetRanges = {
+
+val leaderMap = leaders
+  .map(l = TopicAndPartition(l.topic, l.partition) - (l.host, 
l.port))
+  .toMap
+new KafkaRDD[K, V, U, T, R](sc, kafkaParams, offsetRanges, leaderMap, 
messageHandler)
+  }
+
+  /**
+   * This stream can guarantee that each message from Kafka is included in 
transformations
+   * (as opposed to output actions) exactly once, even in most failure 
situations.
+   *
+   * Points to note:
+   *
+   * Failure Recovery - You must checkpoint this stream, or save offsets 
yourself and provide them
+   * as the fromOffsets parameter on restart.
+   * Kafka must have sufficient log retention to obtain messages after 
failure.
+   *
+   * Getting offsets from the stream - see programming guide
+   *
+.  * Zookeeper - This does not use Zookeeper to store offsets.  For 
interop with Kafka monitors
+   * that depend on Zookeeper, you must store offsets in ZK yourself.
+   *
+   * End-to-end semantics - This does not guarantee that any output 
operation will push each record
+   * exactly once. To ensure end-to-end exactly-once semantics (that is, 
receiving exactly once and
+   * outputting exactly once), you have to either ensure that the output 
operation is
+   * idempotent, or transactionally store offsets with the output. See the 
programming guide for
+   * more details.
+   *
+   * @param ssc StreamingContext object
+   * @param kafkaParams Kafka a 
href=http://kafka.apache.org/documentation.html#configuration;
+   * configuration parameters/a.
+   *   Requires metadata.broker.list or bootstrap.servers to be set 
with Kafka broker(s),
+   *   NOT zookeeper servers, specified

[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-03 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/3798#discussion_r24019904
  
--- Diff: 
external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala 
---
@@ -144,4 +150,174 @@ object KafkaUtils {
 createStream[K, V, U, T](
   jssc.ssc, kafkaParams.toMap, 
Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
   }
+
+  /** A batch-oriented interface for consuming from Kafka.
+   * Starting and ending offsets are specified in advance,
+   * so that you can control exactly-once semantics.
+   * @param sc SparkContext object
+   * @param kafkaParams Kafka a 
href=http://kafka.apache.org/documentation.html#configuration;
+   * configuration parameters/a.
+   *   Requires metadata.broker.list or bootstrap.servers to be set 
with Kafka broker(s),
+   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   * @param offsetRanges Each OffsetRange in the batch corresponds to a
+   *   range of offsets for a given Kafka topic/partition
+   */
+  @Experimental
+  def createRDD[
+K: ClassTag,
+V: ClassTag,
+U : Decoder[_]: ClassTag,
+T : Decoder[_]: ClassTag] (
+  sc: SparkContext,
+  kafkaParams: Map[String, String],
+  offsetRanges: Array[OffsetRange]
+  ): RDD[(K, V)] with HasOffsetRanges = {
+val messageHandler = (mmd: MessageAndMetadata[K, V]) = (mmd.key, 
mmd.message)
+val kc = new KafkaCluster(kafkaParams)
+val topics = offsetRanges.map(o = TopicAndPartition(o.topic, 
o.partition)).toSet
+val leaders = kc.findLeaders(topics).fold(
+  errs = throw new SparkException(errs.mkString(\n)),
+  ok = ok
+)
+new KafkaRDD[K, V, U, T, (K, V)](sc, kafkaParams, offsetRanges, 
leaders, messageHandler)
+  }
+
+  /** A batch-oriented interface for consuming from Kafka.
+   * Starting and ending offsets are specified in advance,
+   * so that you can control exactly-once semantics.
+   * @param sc SparkContext object
+   * @param kafkaParams Kafka a 
href=http://kafka.apache.org/documentation.html#configuration;
+   * configuration parameters/a.
+   *   Requires metadata.broker.list or bootstrap.servers to be set 
with Kafka broker(s),
+   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   * @param offsetRanges Each OffsetRange in the batch corresponds to a
+   *   range of offsets for a given Kafka topic/partition
+   * @param leaders Kafka leaders for each offset range in batch
+   * @param messageHandler function for translating each message into the 
desired type
+   */
+  @Experimental
+  def createRDD[
+K: ClassTag,
+V: ClassTag,
+U : Decoder[_]: ClassTag,
+T : Decoder[_]: ClassTag,
+R: ClassTag] (
+  sc: SparkContext,
+  kafkaParams: Map[String, String],
+  offsetRanges: Array[OffsetRange],
+  leaders: Array[Leader],
+  messageHandler: MessageAndMetadata[K, V] = R
+  ): RDD[R] with HasOffsetRanges = {
+
+val leaderMap = leaders
+  .map(l = TopicAndPartition(l.topic, l.partition) - (l.host, 
l.port))
+  .toMap
+new KafkaRDD[K, V, U, T, R](sc, kafkaParams, offsetRanges, leaderMap, 
messageHandler)
+  }
+
+  /**
+   * This stream can guarantee that each message from Kafka is included in 
transformations
--- End diff --

I think we want the first sentence of the doc to convey why someone would 
choose this method.





[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-03 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/3798#discussion_r24019256
  
--- Diff: 
external/kafka/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala
 ---
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import kafka.common.TopicAndPartition
+
+/** Something that has a collection of OffsetRanges */
+trait HasOffsetRanges {
+  def offsetRanges: Array[OffsetRange]
+}
+
+/** Represents a range of offsets from a single Kafka TopicAndPartition */
+final class OffsetRange private(
+/** kafka topic name */
+val topic: String,
+/** kafka partition id */
+val partition: Int,
+/** inclusive starting offset */
+val fromOffset: Long,
+/** exclusive ending offset */
+val untilOffset: Long) extends Serializable {
+  import OffsetRange.OffsetRangeTuple
+
+  /** this is to avoid ClassNotFoundException during checkpoint restore */
--- End diff --

There was discussion of this earlier that has since gotten buried.  Here's 
the gist:

https://gist.github.com/koeninger/561a61482cd1b5b3600c

The classloader being used for restoring the checkpoint doesn't have that
class, probably because it's in external (and thus included in the user
assembly) rather than in one of the Spark jars that's on the default classpath.
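
For reference, the shape of the workaround in the quoted diff: checkpoint the
ranges as plain tuples so deserialization only needs classes on the default
classpath.  The fromTuple helper and the wrapping object here are illustrative,
not part of the diff:

    import org.apache.spark.streaming.kafka.OffsetRange

    object OffsetRangeCheckpointing {
      // (topic, partition, fromOffset, untilOffset): only standard classes
      type OffsetRangeTuple = (String, Int, Long, Long)

      def toTuple(o: OffsetRange): OffsetRangeTuple =
        (o.topic, o.partition, o.fromOffset, o.untilOffset)

      // on restore, rebuild the OffsetRange once the class is loadable again
      def fromTuple(t: OffsetRangeTuple): OffsetRange =
        OffsetRange.create(t._1, t._2, t._3, t._4)
    }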

I went ahead and duplicated that comment.





[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-03 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/3798#discussion_r24017456
  
--- Diff: 
external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala 
---
@@ -144,4 +150,249 @@ object KafkaUtils {
 createStream[K, V, U, T](
   jssc.ssc, kafkaParams.toMap, 
Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
   }
+
+  /** A batch-oriented interface for consuming from Kafka.
+   * Starting and ending offsets are specified in advance,
+   * so that you can control exactly-once semantics.
+   * @param sc SparkContext object
+   * @param kafkaParams Kafka a 
href=http://kafka.apache.org/documentation.html#configuration;
+   * configuration parameters/a.
+   *   Requires metadata.broker.list or bootstrap.servers to be set 
with Kafka broker(s),
+   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   * @param batch Each OffsetRange in the batch corresponds to a
+   *   range of offsets for a given Kafka topic/partition
+   */
+  @Experimental
+  def createRDD[
+K: ClassTag,
+V: ClassTag,
+U : Decoder[_]: ClassTag,
+T : Decoder[_]: ClassTag,
+R: ClassTag] (
--- End diff --

There are 2 methods: one that takes a messageHandler (and thus returns
RDD[R]), and one that doesn't take a messageHandler as an argument but
provides a default one, and so returns RDD[(K, V)] instead.
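
Condensed, the two signatures from the quoted diff (with the angle brackets and
arrows that the archive stripped restored):

    def createRDD[
        K: ClassTag, V: ClassTag,
        U <: Decoder[_]: ClassTag, T <: Decoder[_]: ClassTag](
      sc: SparkContext,
      kafkaParams: Map[String, String],
      offsetRanges: Array[OffsetRange]): RDD[(K, V)] with HasOffsetRanges

    def createRDD[
        K: ClassTag, V: ClassTag,
        U <: Decoder[_]: ClassTag, T <: Decoder[_]: ClassTag, R: ClassTag](
      sc: SparkContext,
      kafkaParams: Map[String, String],
      offsetRanges: Array[OffsetRange],
      leaders: Array[Leader],
      messageHandler: MessageAndMetadata[K, V] => R): RDD[R] with HasOffsetRanges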





[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-03 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/3798#discussion_r24019815
  
--- Diff: 
external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala 
---
@@ -144,4 +150,174 @@ object KafkaUtils {
 createStream[K, V, U, T](
   jssc.ssc, kafkaParams.toMap, 
Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
   }
+
+  /** A batch-oriented interface for consuming from Kafka.
+   * Starting and ending offsets are specified in advance,
+   * so that you can control exactly-once semantics.
+   * @param sc SparkContext object
+   * @param kafkaParams Kafka a 
href=http://kafka.apache.org/documentation.html#configuration;
+   * configuration parameters/a.
+   *   Requires metadata.broker.list or bootstrap.servers to be set 
with Kafka broker(s),
+   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   * @param offsetRanges Each OffsetRange in the batch corresponds to a
+   *   range of offsets for a given Kafka topic/partition
+   */
+  @Experimental
+  def createRDD[
+K: ClassTag,
+V: ClassTag,
+U : Decoder[_]: ClassTag,
+T : Decoder[_]: ClassTag] (
+  sc: SparkContext,
+  kafkaParams: Map[String, String],
+  offsetRanges: Array[OffsetRange]
+  ): RDD[(K, V)] with HasOffsetRanges = {
+val messageHandler = (mmd: MessageAndMetadata[K, V]) = (mmd.key, 
mmd.message)
+val kc = new KafkaCluster(kafkaParams)
+val topics = offsetRanges.map(o = TopicAndPartition(o.topic, 
o.partition)).toSet
+val leaders = kc.findLeaders(topics).fold(
+  errs = throw new SparkException(errs.mkString(\n)),
+  ok = ok
+)
+new KafkaRDD[K, V, U, T, (K, V)](sc, kafkaParams, offsetRanges, 
leaders, messageHandler)
+  }
+
+  /** A batch-oriented interface for consuming from Kafka.
+   * Starting and ending offsets are specified in advance,
+   * so that you can control exactly-once semantics.
+   * @param sc SparkContext object
+   * @param kafkaParams Kafka a 
href=http://kafka.apache.org/documentation.html#configuration;
+   * configuration parameters/a.
+   *   Requires metadata.broker.list or bootstrap.servers to be set 
with Kafka broker(s),
+   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   * @param offsetRanges Each OffsetRange in the batch corresponds to a
+   *   range of offsets for a given Kafka topic/partition
+   * @param leaders Kafka leaders for each offset range in batch
+   * @param messageHandler function for translating each message into the 
desired type
+   */
+  @Experimental
+  def createRDD[
+K: ClassTag,
+V: ClassTag,
+U : Decoder[_]: ClassTag,
+T : Decoder[_]: ClassTag,
+R: ClassTag] (
+  sc: SparkContext,
+  kafkaParams: Map[String, String],
+  offsetRanges: Array[OffsetRange],
+  leaders: Array[Leader],
+  messageHandler: MessageAndMetadata[K, V] = R
+  ): RDD[R] with HasOffsetRanges = {
+
+val leaderMap = leaders
+  .map(l = TopicAndPartition(l.topic, l.partition) - (l.host, 
l.port))
+  .toMap
+new KafkaRDD[K, V, U, T, R](sc, kafkaParams, offsetRanges, leaderMap, 
messageHandler)
+  }
+
+  /**
+   * This stream can guarantee that each message from Kafka is included in 
transformations
+   * (as opposed to output actions) exactly once, even in most failure 
situations.
+   *
+   * Points to note:
+   *
+   * Failure Recovery - You must checkpoint this stream, or save offsets 
yourself and provide them
+   * as the fromOffsets parameter on restart.
+   * Kafka must have sufficient log retention to obtain messages after 
failure.
+   *
+   * Getting offsets from the stream - see programming guide
+   *
+.  * Zookeeper - This does not use Zookeeper to store offsets.  For 
interop with Kafka monitors
+   * that depend on Zookeeper, you must store offsets in ZK yourself.
+   *
+   * End-to-end semantics - This does not guarantee that any output 
operation will push each record
+   * exactly once. To ensure end-to-end exactly-once semantics (that is, 
receiving exactly once and
+   * outputting exactly once), you have to either ensure that the output 
operation is
+   * idempotent, or transactionally store offsets with the output. See the 
programming guide for
+   * more details.
+   *
+   * @param ssc StreamingContext object
+   * @param kafkaParams Kafka a 
href=http://kafka.apache.org/documentation.html#configuration;
+   * configuration parameters/a.
+   *   Requires metadata.broker.list or bootstrap.servers to be set 
with Kafka broker(s),
+   *   NOT zookeeper servers, specified

[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-03 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/3798#discussion_r24018579
  
--- Diff: 
external/kafka/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala
 ---
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import kafka.common.TopicAndPartition
+
+/** Something that has a collection of OffsetRanges */
+trait HasOffsetRanges {
+  def offsetRanges: Array[OffsetRange]
+}
+
+/** Represents a range of offsets from a single Kafka TopicAndPartition */
+final class OffsetRange private(
+/** kafka topic name */
+val topic: String,
+/** kafka partition id */
+val partition: Int,
+/** inclusive starting offset */
+val fromOffset: Long,
+/** exclusive ending offset */
+val untilOffset: Long) extends Serializable {
+  import OffsetRange.OffsetRangeTuple
+
+  /** this is to avoid ClassNotFoundException during checkpoint restore */
+  private[streaming]
+  def toTuple: OffsetRangeTuple = (topic, partition, fromOffset, 
untilOffset)
+}
+
+object OffsetRange {
+  private[spark]
+  type OffsetRangeTuple = (String, Int, Long, Long)
+
+  def create(topic: String, partition: Int, fromOffset: Long, untilOffset: 
Long): OffsetRange =
--- End diff --

TD thought that a static method named .create was more idiomatic for Java.
It's obviously more idiomatic for Scala to have an .apply method, since the
syntax sugar for it is baked into the language.
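
Side by side, the two call styles being weighed (assuming the companion keeps
both .apply and .create, as mentioned earlier in the thread; topic and offsets
are placeholders):

    // Scala-idiomatic: the companion apply, via the language's syntax sugar
    val a = OffsetRange("mytopic", 0, 0L, 100L)
    // Java-friendly: an explicit static-style factory
    val b = OffsetRange.create("mytopic", 0, 0L, 100L)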





[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-03 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/3798#discussion_r24020676
  
--- Diff: 
external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala
 ---
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import scala.util.control.NonFatal
+import scala.util.Random
+import scala.collection.mutable.ArrayBuffer
+import java.util.Properties
+import kafka.api._
+import kafka.common.{ErrorMapping, OffsetMetadataAndError, 
TopicAndPartition}
+import kafka.consumer.{ConsumerConfig, SimpleConsumer}
+import org.apache.spark.SparkException
+
+/**
+ * Convenience methods for interacting with a Kafka cluster.
+ * @param kafkaParams Kafka a 
href=http://kafka.apache.org/documentation.html#configuration;
+ * configuration parameters/a.
+ *   Requires metadata.broker.list or bootstrap.servers to be set with 
Kafka broker(s),
+ *   NOT zookeeper servers, specified in host1:port1,host2:port2 form
+ */
+private[spark]
+class KafkaCluster(val kafkaParams: Map[String, String]) extends 
Serializable {
+  import KafkaCluster.{Err, LeaderOffset}
+
+  val seedBrokers: Array[(String, Int)] =
+kafkaParams.get(metadata.broker.list)
+  .orElse(kafkaParams.get(bootstrap.servers))
+  .getOrElse(throw new SparkException(Must specify 
metadata.broker.list or bootstrap.servers))
+  .split(,).map { hp =
+val hpa = hp.split(:)
+(hpa(0), hpa(1).toInt)
+  }
+
+  // ConsumerConfig isn't serializable
+  @transient private var _config: ConsumerConfig = null
+
+  def config: ConsumerConfig = this.synchronized {
+if (_config == null) {
+  _config = KafkaCluster.consumerConfig(kafkaParams)
+}
+_config
+  }
+
+  def connect(host: String, port: Int): SimpleConsumer =
+new SimpleConsumer(host, port, config.socketTimeoutMs,
+  config.socketReceiveBufferBytes, config.clientId)
+
+  def connectLeader(topic: String, partition: Int): Either[Err, 
SimpleConsumer] =
+findLeader(topic, partition).right.map(hp = connect(hp._1, hp._2))
+
+  // Metadata api
+  // scalastyle:off
+  // 
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-MetadataAPI
+  // scalastyle:on
+
+  def findLeader(topic: String, partition: Int): Either[Err, (String, 
Int)] = {
+val req = TopicMetadataRequest(TopicMetadataRequest.CurrentVersion,
+  0, config.clientId, Seq(topic))
+val errs = new Err
+withBrokers(Random.shuffle(seedBrokers), errs) { consumer =
+  val resp: TopicMetadataResponse = consumer.send(req)
+  resp.topicsMetadata.find(_.topic == topic).flatMap { tm: 
TopicMetadata =
+tm.partitionsMetadata.find(_.partitionId == partition)
+  }.foreach { pm: PartitionMetadata =
+pm.leader.foreach { leader =
+  return Right((leader.host, leader.port))
+}
+  }
+}
+Left(errs)
+  }
+
+  def findLeaders(
+  topicAndPartitions: Set[TopicAndPartition]
+): Either[Err, Map[TopicAndPartition, (String, Int)]] = {
+val topics = topicAndPartitions.map(_.topic)
+val response = getPartitionMetadata(topics).right
+val answer = response.flatMap { tms: Set[TopicMetadata] =
+  val leaderMap = tms.flatMap { tm: TopicMetadata =
+tm.partitionsMetadata.flatMap { pm: PartitionMetadata =
+  val tp = TopicAndPartition(tm.topic, pm.partitionId)
+  if (topicAndPartitions(tp)) {
+pm.leader.map { l =
+  tp - (l.host - l.port)
+}
+  } else {
+None
+  }
+}
+  }.toMap
+
+  if (leaderMap.keys.size == topicAndPartitions.size) {
+Right(leaderMap)
+  } else {
+val

[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-03 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/3798#discussion_r24021621
  
--- Diff: 
external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala
 ---
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import scala.util.control.NonFatal
+import scala.util.Random
+import scala.collection.mutable.ArrayBuffer
+import java.util.Properties
+import kafka.api._
+import kafka.common.{ErrorMapping, OffsetMetadataAndError, 
TopicAndPartition}
+import kafka.consumer.{ConsumerConfig, SimpleConsumer}
+import org.apache.spark.SparkException
+
+/**
+ * Convenience methods for interacting with a Kafka cluster.
+ * @param kafkaParams Kafka a 
href=http://kafka.apache.org/documentation.html#configuration;
+ * configuration parameters/a.
+ *   Requires metadata.broker.list or bootstrap.servers to be set with 
Kafka broker(s),
+ *   NOT zookeeper servers, specified in host1:port1,host2:port2 form
+ */
+private[spark]
+class KafkaCluster(val kafkaParams: Map[String, String]) extends 
Serializable {
+  import KafkaCluster.{Err, LeaderOffset}
+
+  val seedBrokers: Array[(String, Int)] =
+kafkaParams.get(metadata.broker.list)
+  .orElse(kafkaParams.get(bootstrap.servers))
+  .getOrElse(throw new SparkException(Must specify 
metadata.broker.list or bootstrap.servers))
+  .split(,).map { hp =
+val hpa = hp.split(:)
+(hpa(0), hpa(1).toInt)
+  }
+
+  // ConsumerConfig isn't serializable
+  @transient private var _config: ConsumerConfig = null
+
+  def config: ConsumerConfig = this.synchronized {
+if (_config == null) {
+  _config = KafkaCluster.consumerConfig(kafkaParams)
+}
+_config
+  }
+
+  def connect(host: String, port: Int): SimpleConsumer =
+new SimpleConsumer(host, port, config.socketTimeoutMs,
+  config.socketReceiveBufferBytes, config.clientId)
+
+  def connectLeader(topic: String, partition: Int): Either[Err, 
SimpleConsumer] =
+findLeader(topic, partition).right.map(hp = connect(hp._1, hp._2))
+
+  // Metadata api
+  // scalastyle:off
+  // 
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-MetadataAPI
+  // scalastyle:on
+
+  def findLeader(topic: String, partition: Int): Either[Err, (String, 
Int)] = {
+val req = TopicMetadataRequest(TopicMetadataRequest.CurrentVersion,
+  0, config.clientId, Seq(topic))
+val errs = new Err
+withBrokers(Random.shuffle(seedBrokers), errs) { consumer =
+  val resp: TopicMetadataResponse = consumer.send(req)
+  resp.topicsMetadata.find(_.topic == topic).flatMap { tm: 
TopicMetadata =
+tm.partitionsMetadata.find(_.partitionId == partition)
+  }.foreach { pm: PartitionMetadata =
+pm.leader.foreach { leader =
+  return Right((leader.host, leader.port))
+}
+  }
+}
+Left(errs)
+  }
+
+  def findLeaders(
+  topicAndPartitions: Set[TopicAndPartition]
+): Either[Err, Map[TopicAndPartition, (String, Int)]] = {
+val topics = topicAndPartitions.map(_.topic)
+val response = getPartitionMetadata(topics).right
+val answer = response.flatMap { tms: Set[TopicMetadata] =
+  val leaderMap = tms.flatMap { tm: TopicMetadata =
+tm.partitionsMetadata.flatMap { pm: PartitionMetadata =
+  val tp = TopicAndPartition(tm.topic, pm.partitionId)
+  if (topicAndPartitions(tp)) {
+pm.leader.map { l =
+  tp - (l.host - l.port)
+}
+  } else {
+None
+  }
+}
+  }.toMap
+
+  if (leaderMap.keys.size == topicAndPartitions.size) {
+Right(leaderMap)
+  } else {
+val

[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-03 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/3798#discussion_r24020938
  
--- Diff: 
external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala
 ---
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import scala.util.Random
+
+import org.scalatest.BeforeAndAfter
+import kafka.common.TopicAndPartition
+
+class KafkaClusterSuite extends KafkaStreamSuiteBase with BeforeAndAfter {
+  val brokerHost = localhost
+
+  val kafkaParams = Map(metadata.broker.list - 
s$brokerHost:$brokerPort)
+
+  val kc = new KafkaCluster(kafkaParams)
+
+  val topic = kcsuitetopic + Random.nextInt(1)
+
+  val topicAndPartition = TopicAndPartition(topic, 0)
+
+  before {
+setupKafka()
+createTopic(topic)
+produceAndSendMessage(topic, Map(a - 1))
+  }
+
+  after {
+tearDownKafka()
+  }
+
+  test(metadata apis) {
+val leader = kc.findLeaders(Set(topicAndPartition)).right.get
+assert(leader(topicAndPartition) === (brokerHost, brokerPort), didn't 
get leader)
+
+val parts = kc.getPartitions(Set(topic)).right.get
+assert(parts(topicAndPartition), didn't get partitions)
+  }
+
+  test(leader offset apis) {
+val earliest = 
kc.getEarliestLeaderOffsets(Set(topicAndPartition)).right.get
+assert(earliest(topicAndPartition).offset === 0, didn't get earliest)
+
+val latest = 
kc.getLatestLeaderOffsets(Set(topicAndPartition)).right.get
+assert(latest(topicAndPartition).offset === 1, didn't get latest)
+  }
+
+  test(consumer offset apis) {
+val group = kcsuitegroup + Random.nextInt(1)
+
+val offset = Random.nextInt(1)
+
+val set = kc.setConsumerOffsets(group, Map(topicAndPartition - 
offset))
+assert(set.isRight, didn't set consumer offsets)
+
--- End diff --

I'm not sure exactly what the question here is, but this test is just
verifying that the consumer offset APIs work.  They aren't publicly exposed, so
the question of how people might misuse them is somewhat premature.

That being said, the reason you'd typically want to use this API would be
for interop with existing Kafka monitoring tools that expect offsets in ZK.
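
A rough sketch of that interop pattern, using the setConsumerOffsets call this
test exercises (group, topic and offset are placeholders; KafkaCluster isn't
public in this PR, so this is illustrative only):

    import kafka.common.TopicAndPartition
    import org.apache.spark.streaming.kafka.KafkaCluster

    val kc = new KafkaCluster(kafkaParams)  // kafkaParams assumed in scope
    // After processing a batch, record how far we got so that ZK-based
    // monitoring tools (e.g. consumer lag dashboards) can see progress.
    kc.setConsumerOffsets("my-monitoring-group",
      Map(TopicAndPartition("mytopic", 0) -> 12345L))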





[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-03 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/3798#discussion_r24021039
  
--- Diff: 
external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala
 ---
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import scala.util.Random
+
+import kafka.serializer.StringDecoder
+import kafka.common.TopicAndPartition
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark._
+import org.apache.spark.SparkContext._
+
+class KafkaRDDSuite extends KafkaStreamSuiteBase with BeforeAndAfter {
+  var sc: SparkContext = _
+  before {
+setupKafka()
+  }
+
+  after {
+if (sc != null) {
+  sc.stop
+  sc = null
+}
+tearDownKafka()
+  }
+
+  test(Kafka RDD) {
+val sparkConf = new 
SparkConf().setMaster(local[4]).setAppName(this.getClass.getSimpleName)
+sc = new SparkContext(sparkConf)
+val topic = topic1
+val sent = Map(a - 5, b - 3, c - 10)
+createTopic(topic)
+produceAndSendMessage(topic, sent)
+
+val kafkaParams = Map(metadata.broker.list - 
slocalhost:$brokerPort,
+  group.id - stest-consumer-${Random.nextInt(1)})
+
+val kc = new KafkaCluster(kafkaParams)
+
+val rdd = getRdd(kc, Set(topic))
+// this is the lots of messages case
+// make sure we get all of them
+assert(rdd.isDefined)
+assert(rdd.get.count === sent.values.sum)
+
+kc.setConsumerOffsets(
--- End diff --

See previous answer.

Also, there's nothing inherently wrong with keeping offsets in ZK for the 
idempotent case.





[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-03 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/3798#discussion_r24030347
  
--- Diff: 
external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala 
---
@@ -144,4 +150,174 @@ object KafkaUtils {
 createStream[K, V, U, T](
   jssc.ssc, kafkaParams.toMap, 
Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
   }
+
+  /** A batch-oriented interface for consuming from Kafka.
+   * Starting and ending offsets are specified in advance,
+   * so that you can control exactly-once semantics.
+   * @param sc SparkContext object
+   * @param kafkaParams Kafka a 
href=http://kafka.apache.org/documentation.html#configuration;
+   * configuration parameters/a.
+   *   Requires metadata.broker.list or bootstrap.servers to be set 
with Kafka broker(s),
+   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   * @param offsetRanges Each OffsetRange in the batch corresponds to a
+   *   range of offsets for a given Kafka topic/partition
+   */
+  @Experimental
+  def createRDD[
+K: ClassTag,
+V: ClassTag,
+U : Decoder[_]: ClassTag,
+T : Decoder[_]: ClassTag] (
+  sc: SparkContext,
+  kafkaParams: Map[String, String],
+  offsetRanges: Array[OffsetRange]
+  ): RDD[(K, V)] with HasOffsetRanges = {
+val messageHandler = (mmd: MessageAndMetadata[K, V]) = (mmd.key, 
mmd.message)
+val kc = new KafkaCluster(kafkaParams)
+val topics = offsetRanges.map(o = TopicAndPartition(o.topic, 
o.partition)).toSet
+val leaders = kc.findLeaders(topics).fold(
+  errs = throw new SparkException(errs.mkString(\n)),
+  ok = ok
+)
+new KafkaRDD[K, V, U, T, (K, V)](sc, kafkaParams, offsetRanges, 
leaders, messageHandler)
+  }
+
+  /** A batch-oriented interface for consuming from Kafka.
+   * Starting and ending offsets are specified in advance,
+   * so that you can control exactly-once semantics.
+   * @param sc SparkContext object
+   * @param kafkaParams Kafka a 
href=http://kafka.apache.org/documentation.html#configuration;
+   * configuration parameters/a.
+   *   Requires metadata.broker.list or bootstrap.servers to be set 
with Kafka broker(s),
+   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   * @param offsetRanges Each OffsetRange in the batch corresponds to a
+   *   range of offsets for a given Kafka topic/partition
+   * @param leaders Kafka leaders for each offset range in batch
+   * @param messageHandler function for translating each message into the 
desired type
+   */
+  @Experimental
+  def createRDD[
+K: ClassTag,
+V: ClassTag,
+U : Decoder[_]: ClassTag,
+T : Decoder[_]: ClassTag,
+R: ClassTag] (
+  sc: SparkContext,
+  kafkaParams: Map[String, String],
+  offsetRanges: Array[OffsetRange],
+  leaders: Array[Leader],
+  messageHandler: MessageAndMetadata[K, V] = R
+  ): RDD[R] with HasOffsetRanges = {
+
+val leaderMap = leaders
+  .map(l = TopicAndPartition(l.topic, l.partition) - (l.host, 
l.port))
+  .toMap
+new KafkaRDD[K, V, U, T, R](sc, kafkaParams, offsetRanges, leaderMap, 
messageHandler)
+  }
+
+  /**
+   * This stream can guarantee that each message from Kafka is included in 
transformations
+   * (as opposed to output actions) exactly once, even in most failure 
situations.
+   *
+   * Points to note:
+   *
+   * Failure Recovery - You must checkpoint this stream, or save offsets 
yourself and provide them
+   * as the fromOffsets parameter on restart.
+   * Kafka must have sufficient log retention to obtain messages after 
failure.
+   *
+   * Getting offsets from the stream - see programming guide
+   *
+.  * Zookeeper - This does not use Zookeeper to store offsets.  For 
interop with Kafka monitors
+   * that depend on Zookeeper, you must store offsets in ZK yourself.
+   *
+   * End-to-end semantics - This does not guarantee that any output 
operation will push each record
+   * exactly once. To ensure end-to-end exactly-once semantics (that is, 
receiving exactly once and
+   * outputting exactly once), you have to either ensure that the output 
operation is
+   * idempotent, or transactionally store offsets with the output. See the 
programming guide for
+   * more details.
+   *
+   * @param ssc StreamingContext object
+   * @param kafkaParams Kafka a 
href=http://kafka.apache.org/documentation.html#configuration;
+   * configuration parameters/a.
+   *   Requires metadata.broker.list or bootstrap.servers to be set 
with Kafka broker(s),
+   *   NOT zookeeper servers, specified

[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-03 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/3798#discussion_r24031550
  
--- Diff: 
external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala 
---
@@ -144,4 +150,249 @@ object KafkaUtils {
 createStream[K, V, U, T](
   jssc.ssc, kafkaParams.toMap, 
Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
   }
+
+  /** A batch-oriented interface for consuming from Kafka.
+   * Starting and ending offsets are specified in advance,
+   * so that you can control exactly-once semantics.
+   * @param sc SparkContext object
+   * @param kafkaParams Kafka a 
href=http://kafka.apache.org/documentation.html#configuration;
+   * configuration parameters/a.
+   *   Requires metadata.broker.list or bootstrap.servers to be set 
with Kafka broker(s),
+   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   * @param batch Each OffsetRange in the batch corresponds to a
+   *   range of offsets for a given Kafka topic/partition
+   */
+  @Experimental
+  def createRDD[
+K: ClassTag,
+V: ClassTag,
+U : Decoder[_]: ClassTag,
+T : Decoder[_]: ClassTag,
+R: ClassTag] (
--- End diff --

The comment about unused R was referring to a prior version of the PR that
had a copy-pasted type-level R parameter even for the version that returned
RDD[(K, V)].  GitHub probably just got confused because the comment wasn't
attached to the particular line in question.

Pretty sure things are correct at this point.
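
For what it's worth, R is exactly the messageHandler's result type; a sketch,
with sc, kafkaParams, offsetRanges and leaders assumed to be in scope:

    import kafka.message.MessageAndMetadata
    import kafka.serializer.StringDecoder
    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.kafka.KafkaUtils

    // R is inferred from the handler's result type; here R = String,
    // so the result is an RDD[String] (with HasOffsetRanges).
    val rdd: RDD[String] = KafkaUtils.createRDD[
        String, String, StringDecoder, StringDecoder, String](
      sc, kafkaParams, offsetRanges, leaders,
      (mmd: MessageAndMetadata[String, String]) => mmd.message)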

On Tue, Feb 3, 2015 at 1:05 PM, Patrick Wendell notificati...@github.com
wrote:

 In
 
external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
 https://github.com/apache/spark/pull/3798#discussion_r24029463:

  +   * so that you can control exactly-once semantics.
  +   * @param sc SparkContext object
  +   * @param kafkaParams Kafka a 
href=http://kafka.apache.org/documentation.html#configuration;
  +   * configuration parameters/a.
  +   *   Requires metadata.broker.list or bootstrap.servers to be 
set with Kafka broker(s),
  +   *   NOT zookeeper servers, specified in host1:port1,host2:port2 
form.
  +   * @param batch Each OffsetRange in the batch corresponds to a
  +   *   range of offsets for a given Kafka topic/partition
  +   */
  +  @Experimental
  +  def createRDD[
  +K: ClassTag,
  +V: ClassTag,
  +U : Decoder[_]: ClassTag,
  +T : Decoder[_]: ClassTag,
  +R: ClassTag] (

 Yeah, makes sense. But the comment here suggests R is not used; however,
 I see R being used in the return type. So that was my point.

 —
 Reply to this email directly or view it on GitHub
 https://github.com/apache/spark/pull/3798/files#r24029463.






[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-03 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/3798#discussion_r24033509
  
--- Diff: 
external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala 
---
@@ -144,4 +150,174 @@ object KafkaUtils {
 createStream[K, V, U, T](
   jssc.ssc, kafkaParams.toMap, 
Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
   }
+
+  /** A batch-oriented interface for consuming from Kafka.
+   * Starting and ending offsets are specified in advance,
+   * so that you can control exactly-once semantics.
+   * @param sc SparkContext object
+   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+   * configuration parameters</a>.
+   *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   * @param offsetRanges Each OffsetRange in the batch corresponds to a
+   *   range of offsets for a given Kafka topic/partition
+   */
+  @Experimental
+  def createRDD[
+    K: ClassTag,
+    V: ClassTag,
+    U <: Decoder[_]: ClassTag,
+    T <: Decoder[_]: ClassTag] (
+      sc: SparkContext,
+      kafkaParams: Map[String, String],
+      offsetRanges: Array[OffsetRange]
+  ): RDD[(K, V)] with HasOffsetRanges = {
+    val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)
+    val kc = new KafkaCluster(kafkaParams)
+    val topics = offsetRanges.map(o => TopicAndPartition(o.topic, o.partition)).toSet
+    val leaders = kc.findLeaders(topics).fold(
+      errs => throw new SparkException(errs.mkString("\n")),
+      ok => ok
+    )
+    new KafkaRDD[K, V, U, T, (K, V)](sc, kafkaParams, offsetRanges, leaders, messageHandler)
+  }
+
+  /** A batch-oriented interface for consuming from Kafka.
+   * Starting and ending offsets are specified in advance,
+   * so that you can control exactly-once semantics.
+   * @param sc SparkContext object
+   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+   * configuration parameters</a>.
+   *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   * @param offsetRanges Each OffsetRange in the batch corresponds to a
+   *   range of offsets for a given Kafka topic/partition
+   * @param leaders Kafka leaders for each offset range in batch
+   * @param messageHandler function for translating each message into the desired type
+   */
+  @Experimental
+  def createRDD[
+    K: ClassTag,
+    V: ClassTag,
+    U <: Decoder[_]: ClassTag,
+    T <: Decoder[_]: ClassTag,
+    R: ClassTag] (
+      sc: SparkContext,
+      kafkaParams: Map[String, String],
+      offsetRanges: Array[OffsetRange],
+      leaders: Array[Leader],
+      messageHandler: MessageAndMetadata[K, V] => R
+  ): RDD[R] with HasOffsetRanges = {
+
+    val leaderMap = leaders
+      .map(l => TopicAndPartition(l.topic, l.partition) -> (l.host, l.port))
+      .toMap
+    new KafkaRDD[K, V, U, T, R](sc, kafkaParams, offsetRanges, leaderMap, messageHandler)
+  }
+
+  /**
+   * This stream can guarantee that each message from Kafka is included in 
transformations
--- End diff --

The documentation has already been changed several times since your 
previous comments.  The current version of it doesn't make any comparison to 
existing createStream calls.





[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-03 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/3798#discussion_r24031208
  
--- Diff: 
external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala 
---
@@ -144,4 +150,174 @@ object KafkaUtils {
 createStream[K, V, U, T](
   jssc.ssc, kafkaParams.toMap, 
Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
   }
+
+  /** A batch-oriented interface for consuming from Kafka.
+   * Starting and ending offsets are specified in advance,
+   * so that you can control exactly-once semantics.
+   * @param sc SparkContext object
+   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+   * configuration parameters</a>.
+   *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   * @param offsetRanges Each OffsetRange in the batch corresponds to a
+   *   range of offsets for a given Kafka topic/partition
+   */
+  @Experimental
+  def createRDD[
+    K: ClassTag,
+    V: ClassTag,
+    U <: Decoder[_]: ClassTag,
+    T <: Decoder[_]: ClassTag] (
+      sc: SparkContext,
+      kafkaParams: Map[String, String],
+      offsetRanges: Array[OffsetRange]
+  ): RDD[(K, V)] with HasOffsetRanges = {
+    val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)
+    val kc = new KafkaCluster(kafkaParams)
+    val topics = offsetRanges.map(o => TopicAndPartition(o.topic, o.partition)).toSet
+    val leaders = kc.findLeaders(topics).fold(
+      errs => throw new SparkException(errs.mkString("\n")),
+      ok => ok
+    )
+    new KafkaRDD[K, V, U, T, (K, V)](sc, kafkaParams, offsetRanges, leaders, messageHandler)
+  }
+
+  /** A batch-oriented interface for consuming from Kafka.
+   * Starting and ending offsets are specified in advance,
+   * so that you can control exactly-once semantics.
+   * @param sc SparkContext object
+   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+   * configuration parameters</a>.
+   *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   * @param offsetRanges Each OffsetRange in the batch corresponds to a
+   *   range of offsets for a given Kafka topic/partition
+   * @param leaders Kafka leaders for each offset range in batch
+   * @param messageHandler function for translating each message into the desired type
+   */
+  @Experimental
+  def createRDD[
+    K: ClassTag,
+    V: ClassTag,
+    U <: Decoder[_]: ClassTag,
+    T <: Decoder[_]: ClassTag,
+    R: ClassTag] (
+      sc: SparkContext,
+      kafkaParams: Map[String, String],
+      offsetRanges: Array[OffsetRange],
+      leaders: Array[Leader],
+      messageHandler: MessageAndMetadata[K, V] => R
+  ): RDD[R] with HasOffsetRanges = {
+
+    val leaderMap = leaders
+      .map(l => TopicAndPartition(l.topic, l.partition) -> (l.host, l.port))
+      .toMap
+    new KafkaRDD[K, V, U, T, R](sc, kafkaParams, offsetRanges, leaderMap, messageHandler)
+  }
+
+  /**
+   * This stream can guarantee that each message from Kafka is included in 
transformations
+   * (as opposed to output actions) exactly once, even in most failure 
situations.
+   *
+   * Points to note:
+   *
+   * Failure Recovery - You must checkpoint this stream, or save offsets 
yourself and provide them
+   * as the fromOffsets parameter on restart.
+   * Kafka must have sufficient log retention to obtain messages after 
failure.
+   *
+   * Getting offsets from the stream - see programming guide
+   *
+.  * Zookeeper - This does not use Zookeeper to store offsets.  For 
interop with Kafka monitors
+   * that depend on Zookeeper, you must store offsets in ZK yourself.
+   *
+   * End-to-end semantics - This does not guarantee that any output 
operation will push each record
+   * exactly once. To ensure end-to-end exactly-once semantics (that is, 
receiving exactly once and
+   * outputting exactly once), you have to either ensure that the output 
operation is
+   * idempotent, or transactionally store offsets with the output. See the 
programming guide for
+   * more details.
+   *
+   * @param ssc StreamingContext object
+   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+   * configuration parameters</a>.
+   *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+   *   NOT zookeeper servers, specified

[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-03 Thread koeninger
Github user koeninger commented on the pull request:

https://github.com/apache/spark/pull/3798#issuecomment-72758821
  
Besides introducing 2 classes where 1 would do, it implies that there are
(or could be) multiple implementations of the abstract class.  You're not
using it because you're actually planning for subclassing; you're using it
as a workaround for returning a slightly less complicated type from a
single method, when there's an alternative... just return RDD[(K, V)] for
that one method.

This really is a situation where there's only 1 implementation for the
foreseeable future, and a single final concrete class would be cleaner.


On Tue, Feb 3, 2015 at 5:21 PM, Tathagata Das notificati...@github.com
wrote:

 I dont get it, what's the complication with abstract classes?

 —
 Reply to this email directly or view it on GitHub
 https://github.com/apache/spark/pull/3798#issuecomment-72758029.






[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-03 Thread koeninger
Github user koeninger commented on the pull request:

https://github.com/apache/spark/pull/3798#issuecomment-72760900
  
To put it another way, the type you return has to be public.

If you return a public abstract class, what are you going to do when
someone else subclasses it?

Making it a final concrete class doesn't have that issue.
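
A rough sketch of what I mean (hypothetical names, not the PR's actual classes): a single concrete class whose constructor is private and is exposed only through a factory, so callers can use the type but can neither construct nor subclass it.

    // Hypothetical sketch, not the PR's code: one concrete class, constructor hidden.
    final class ExampleRDDWithOffsets private (
        val offsetRanges: Array[(String, Int, Long, Long)]) // (topic, partition, from, until)

    object ExampleRDDWithOffsets {
      // the only way user code obtains an instance (stands in for KafkaUtils.createRDD here)
      def create(ranges: Array[(String, Int, Long, Long)]): ExampleRDDWithOffsets =
        new ExampleRDDWithOffsets(ranges)
    }
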
On Feb 3, 2015 5:34 PM, Tathagata Das notificati...@github.com wrote:

 But of course there can be multiple implementations! For example, there is
 both KafkaReceiver and ReliableKafkaReceiver. The second was introduced so
 that the code path for existing uses is not disturbed when we are
 introducing experimental code paths that are optionally enabled with 
flags.
 We never envisioned that happening, but when it occurred, we could do this
 because the KafkaReceiver was not exposed, only the Receiver interface was
 exposed.

 —
 Reply to this email directly or view it on GitHub
 https://github.com/apache/spark/pull/3798#issuecomment-72759859.






[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-03 Thread koeninger
Github user koeninger commented on the pull request:

https://github.com/apache/spark/pull/3798#issuecomment-72754442
  
Like Patrick said, I really don't see any reason not to just expose
KafkaRDD.  You can still hide its constructor without making a superfluous
abstract class, and you can still make another subclass of KafkaRDD later
if you need to.

Even if you don't want the static createRDD method to return a KafkaRDD, we
can just take the "with HasOffsetRanges" off, and people who care about
getting to the offsets can cast it (they'll have to cast it for the stream
case anyway).
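
As a rough sketch of what that cast looks like from user code (stand-in definitions, assuming a trait shaped like the HasOffsetRanges discussed in this thread):

    // Sketch only: recovering offsets by casting/matching on a HasOffsetRanges-style trait.
    case class OffsetRange(topic: String, partition: Int, fromOffset: Long, untilOffset: Long)

    trait HasOffsetRanges {
      def offsetRanges: Array[OffsetRange]
    }

    def recordOffsets(rdd: AnyRef): Unit = rdd match {
      case withOffsets: HasOffsetRanges =>
        withOffsets.offsetRanges.foreach { r =>
          println(s"${r.topic}-${r.partition}: ${r.fromOffset} to ${r.untilOffset}")
        }
      case _ =>
        println("this RDD does not carry Kafka offset information")
    }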

On Tue, Feb 3, 2015 at 4:30 PM, Tathagata Das notificati...@github.com
wrote:

 @koeninger https://github.com/koeninger Thank you very much for all the
 changes. They are looking good. Unfortunately the issue with createRDD
 returning RDD[] with OffsetRange (i.e., the issue that @pwendell
 https://github.com/pwendell raised) could be a problem in the future in
 terms of binary compatibility. Basically, we have not used such things in
 the rest of Spark to keep things as Java-friendly and binary compatible as
 possible. Also in the generated Java doc this looks scary. So here is an
 alternate suggestion that I am trying to implement on top of your PR
 branch. How about this. We effectively combine KafkaRDD and 
HasOffsetRanges
 into a abstract class.

 abstract class KafkaRDD[T] (val offsetRanges: Array[OffsetRanges], sc: 
SparkContext)
extends RDD[T](sc, Nil)

 private[kafka]
 class KafkaRDDImpl[K, V, KD, VD, R] extends KafkaRDD[R] {
   ...
 }

 KafkaUtils.createRDD(...simple one without messageHandler...): 
KafkaRDD[(K, V)] = {
// return KafkaRDDImpl[K, V, KD, VD, (K, V)]
 }

 KafkaUtils.createRDD(...simple one WITH messageHandler...): KafkaRDD[R] = 
{
// return KafkaRDDImpl[K, V, KD, VD, R]
 }


 Advantages

- No binary compatibility issues
- Easy to read from Java
- KafkaRDD implementation and constructor all hidden as before
- Can still extend KafkaRDD to expose more methods in future.

 What do you think?

 —
 Reply to this email directly or view it on GitHub
 https://github.com/apache/spark/pull/3798#issuecomment-72749763.






[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-03 Thread koeninger
Github user koeninger commented on the pull request:

https://github.com/apache/spark/pull/3798#issuecomment-72757731
  
Just make the simplified createRDD return a static type of RDD[(K, V)];
that's what I'm saying.

You're already going to have to deal with those other type parameters in
Java if you want to call a more complex version of createRDD, because you
have to know about the serializers and message handler.  Make the more
complex version return KafkaRDD.

I agree that casting to KafkaRDD is better than casting to
HasOffsetRanges.  The abstract class is an unnecessary complication, though.
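
A sketch of the shape I'm arguing for, with stand-in types rather than the real Spark and Kafka classes: the simple overload fixes the handler to (key, value) and can return a plain RDD type, while the full overload takes a messageHandler and returns the concrete Kafka-aware type.

    // Stand-in types for illustration only.
    class RDDLike[A]
    class KafkaRDDLike[A] extends RDDLike[A]
    case class OffsetRange(topic: String, partition: Int, fromOffset: Long, untilOffset: Long)
    case class MessageAndMetadataLike[K, V](key: K, message: V)

    object ExampleKafkaUtils {
      // simple overload: handler fixed to (key, value), a plain RDD[(K, V)] is enough
      def createRDD[K, V](kafkaParams: Map[String, String],
          offsetRanges: Array[OffsetRange]): RDDLike[(K, V)] =
        new KafkaRDDLike[(K, V)]

      // full overload: caller supplies the handler, return the Kafka-aware type
      def createRDD[K, V, R](kafkaParams: Map[String, String],
          offsetRanges: Array[OffsetRange],
          messageHandler: MessageAndMetadataLike[K, V] => R): KafkaRDDLike[R] =
        new KafkaRDDLike[R]
    }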


On Tue, Feb 3, 2015 at 5:11 PM, Tathagata Das notificati...@github.com
wrote:

 I spent some time talking to Patrick offline about this. If we expose the
 KafkaRDD as is (while keeping its constructor private), then the 
simplified
 createRDD would be

 KafkaUtils.createRDD[K, V, KD, VD](): KafkaRDD[K, V, KD, VD, (K, V)]

 Imagine how one would use it in Java.

 KafkaRDD<String, String, StringDecoder, StringDecoder, Product2<String, String>> rdd = KafkaUtils.createRDD()

 That's not very Java friendly if you ask a Java developer. And we have a huge
 fraction of the community as Java developers. Furthermore, we want to add
 Python API as well, and that also requires the interfaces to be
 Java-friendly. Here is the alternative (I think) with what I proposed.

 KafkaRDD<String, String> rdd = KafkaUtils.createRDD()

 Much simpler.

 Regarding casting, there are two cases,
 1. casting RDD generated from createRDD - If we take off HasOffsetRanges (
 KafkaUtils.createRDD returns only RDD), then user have to cast. But if we
 return abstract class KafkaRDD, then no casting necessary.
 2. casting RDD in DStream.foreachRDD - Casting is necessary either ways.
 But isnt it more intuitive to write rdd.asInstanceOf[KafkaRDD] than
 rdd.asInstanceOf[HasOffsetRanges]?

 —
 Reply to this email directly or view it on GitHub
 https://github.com/apache/spark/pull/3798#issuecomment-72756440.






[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-03 Thread koeninger
Github user koeninger commented on the pull request:

https://github.com/apache/spark/pull/3798#issuecomment-72773257
  
To be clear, I'm ok with any solution that gives me access to what I need,
which in this case are offsets.

What's coming across as me feeling strongly about it is really, I think,
that I'm not clear on what your principles with regard to the interface
are ... well, that and frustration because it's been changed 3 times
already :)

For instance, why would you be willing to take on the fragile base class
maintenance problem in exposing KafkaRDD as something that could be
subclassed... but not ok with exposing the DStream (so that people could
override the batch generation policy)?

In the interests of moving this forward, if we're really just talking about
changing KafkaUtils' use of

RDD[..] with HasOffsetRanges
to
RDD[..]

I can make that change


On Tue, Feb 3, 2015 at 7:02 PM, Tathagata Das notificati...@github.com
wrote:

 Okay here are the two options.

1. createRDD returns RDD[(K,V)] or RDD[R], and DStream.foreachRDD uses
rdd.asInstanceOf[HasOffsetRanges]
2. createRDD returns KafkaRDD[(K,V)] or KafkaRDD[R] and
DStream.foreachRDD uses rdd.asInstanceOf[KafkaRDD[_]]

 I think I am okay with either one. Stepping back, my original concern was
 returning something that had no binary compatibility issues. Both solution
 suffices. Between these two, since you feel so strongly against (2), lets
 go with (1).

 —
 Reply to this email directly or view it on GitHub
 https://github.com/apache/spark/pull/3798#issuecomment-72770451.







[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-03 Thread koeninger
Github user koeninger commented on the pull request:

https://github.com/apache/spark/pull/3798#issuecomment-72775631
  
Hey man, I'd rather talk about the code anyway.  I think there's just 
something I'm missing as far as your underlying assumptions about interfaces go 
:)  Thanks for your help on this.

Just made the createRDD change.  Not clear on what createNewStream change 
you mean. Rename it to createStream, or something else?





[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-02 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/3798#discussion_r23974964
  
--- Diff: 
external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DeterministicKafkaInputDStream.scala
 ---
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+
+import scala.annotation.tailrec
+import scala.collection.mutable
+import scala.reflect.{classTag, ClassTag}
+
+import kafka.common.TopicAndPartition
+import kafka.message.MessageAndMetadata
+import kafka.serializer.Decoder
+
+import org.apache.spark.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset
+import org.apache.spark.streaming.{StreamingContext, Time}
+import org.apache.spark.streaming.dstream._
+
+/** A stream of {@link org.apache.spark.rdd.kafka.KafkaRDD} where
+  * each given Kafka topic/partition corresponds to an RDD partition.
+  * The spark configuration spark.streaming.receiver.maxRate gives the 
maximum number of messages
+  * per second that each '''partition''' will accept.
+  * Starting offsets are specified in advance,
+  * and this DStream is not responsible for committing offsets,
+  * so that you can control exactly-once semantics.
+  * For an easy interface to Kafka-managed offsets,
+  *  see {@link org.apache.spark.rdd.kafka.KafkaCluster}
+  * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+  * configuration parameters</a>.
+  *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+  *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+  * @param fromOffsets per-topic/partition Kafka offsets defining the 
(inclusive)
+  *  starting point of the stream
+  * @param messageHandler function for translating each message into the 
desired type
+  * @param maxRetries maximum number of times in a row to retry getting 
leaders' offsets
+  */
+private[streaming]
+class DeterministicKafkaInputDStream[
+  K: ClassTag,
+  V: ClassTag,
+  U <: Decoder[_]: ClassTag,
+  T <: Decoder[_]: ClassTag,
+  R: ClassTag](
+@transient ssc_ : StreamingContext,
+val kafkaParams: Map[String, String],
+val fromOffsets: Map[TopicAndPartition, Long],
+messageHandler: MessageAndMetadata[K, V] => R,
+maxRetries: Int
+) extends InputDStream[R](ssc_) with Logging {
+
+  protected[streaming] override val checkpointData =
+new DeterministicKafkaInputDStreamCheckpointData
+
+  protected val kc = new KafkaCluster(kafkaParams)
+
+  protected val maxMessagesPerPartition: Option[Long] = {
+val ratePerSec = context.sparkContext.getConf.getInt("spark.streaming.receiver.maxRate", 0)
+if (ratePerSec > 0) {
+  val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble 
/ 1000
+  Some((secsPerBatch * ratePerSec).toLong)
+} else {
+  None
+}
+  }
+
+  protected var currentOffsets = fromOffsets
+
+  @tailrec
+  protected final def latestLeaderOffsets(retries: Int): 
Map[TopicAndPartition, LeaderOffset] = {
--- End diff --

All of the methods in KafkaCluster are currently based on the idea of 
trying (at most) all of the brokers, then giving up and letting the caller 
establish an error-handling policy.

Sleeping and retrying may not in general be the correct error-handling 
policy.  I know it is for the input DStream's usage right here, but that 
doesn't make sense to bake into KafkaCluster.
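
As a rough sketch of that division of responsibility (illustrative names only, not the PR's code): the cluster helper reports failure with an Either of accumulated errors, and the caller decides whether to sleep and retry.

    import scala.annotation.tailrec

    object RetryPolicySketch {
      // Sketch: the helper reports failure via Either instead of throwing or retrying,
      // so each caller can choose its own policy (fail fast, retry N times, ...).
      type Err = Seq[Throwable]

      // stands in for "ask every broker, accumulate errors, then give up"
      def fetchLatestOffsets(brokers: Seq[String]): Either[Err, Map[Int, Long]] =
        Left(Seq(new RuntimeException("all brokers failed")))

      // the caller (here, something like the input DStream) owns sleep-and-retry
      @tailrec
      def latestOffsetsWithRetry(brokers: Seq[String], retries: Int): Map[Int, Long] =
        fetchLatestOffsets(brokers) match {
          case Right(offsets) => offsets
          case Left(errs) if retries <= 0 =>
            throw new RuntimeException(errs.map(_.getMessage).mkString("\n"))
          case Left(_) =>
            Thread.sleep(200)
            latestOffsetsWithRetry(brokers, retries - 1)
        }
    }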



[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-02 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/3798#discussion_r23972944
  
--- Diff: 
external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala
 ---
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import scala.util.control.NonFatal
+import scala.util.Random
+import scala.collection.mutable.ArrayBuffer
+import java.util.Properties
+import kafka.api._
+import kafka.common.{ErrorMapping, OffsetMetadataAndError, 
TopicAndPartition}
+import kafka.consumer.{ConsumerConfig, SimpleConsumer}
+
+/**
+ * Convenience methods for interacting with a Kafka cluster.
+ * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+ * configuration parameters</a>.
+ *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+ *   NOT zookeeper servers, specified in host1:port1,host2:port2 form
+ */
+private[spark]
+class KafkaCluster(val kafkaParams: Map[String, String]) extends 
Serializable {
+  import KafkaCluster.{Err, LeaderOffset}
+
+  val seedBrokers: Array[(String, Int)] =
+kafkaParams.get("metadata.broker.list")
+  .orElse(kafkaParams.get("bootstrap.servers"))
+  .getOrElse(throw new Exception("Must specify metadata.broker.list or bootstrap.servers"))
+  .split(",").map { hp =>
+val hpa = hp.split(":")
+(hpa(0), hpa(1).toInt)
+  }
+
+  // ConsumerConfig isn't serializable
+  @transient private var _config: ConsumerConfig = null
+
+  def config: ConsumerConfig = this.synchronized {
+if (_config == null) {
+  _config = KafkaCluster.consumerConfig(kafkaParams)
+}
+_config
+  }
+
+  def connect(host: String, port: Int): SimpleConsumer =
+new SimpleConsumer(host, port, config.socketTimeoutMs,
+  config.socketReceiveBufferBytes, config.clientId)
+
+  def connectLeader(topic: String, partition: Int): Either[Err, 
SimpleConsumer] =
+findLeader(topic, partition).right.map(hp = connect(hp._1, hp._2))
+
+  // Metadata api
+  // scalastyle:off
+  // 
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-MetadataAPI
+  // scalastyle:on
+
+  def findLeader(topic: String, partition: Int): Either[Err, (String, 
Int)] = {
+val req = TopicMetadataRequest(TopicMetadataRequest.CurrentVersion,
+  0, config.clientId, Seq(topic))
+val errs = new Err
+withBrokers(Random.shuffle(seedBrokers), errs) { consumer =>
+  val resp: TopicMetadataResponse = consumer.send(req)
+  resp.topicsMetadata.find(_.topic == topic).flatMap { tm: TopicMetadata =>
+tm.partitionsMetadata.find(_.partitionId == partition)
+  }.foreach { pm: PartitionMetadata =>
+pm.leader.foreach { leader =>
+  return Right((leader.host, leader.port))
+}
+  }
+}
+Left(errs)
+  }
+
+  def findLeaders(
+topicAndPartitions: Set[TopicAndPartition]
+  ): Either[Err, Map[TopicAndPartition, (String, Int)]] = {
+val topics = topicAndPartitions.map(_.topic)
+getPartitionMetadata(topics).right.flatMap { tms: Set[TopicMetadata] =>
+  val result = tms.flatMap { tm: TopicMetadata =>
--- End diff --

By loop do you mean the findLeaders method?  Yeah, it's returning the 
leaders as a map of TopicAndPartition to (host, port).

The nesting directly maps to the nesting of the Kafka API's return 
values... pretty much all of the nesting in this class is just grabbing stuff 
out of Kafka data structures.
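
Roughly, with simplified stand-in types instead of the actual Kafka API classes, the flattening looks like this:

    // Sketch with stand-in types: flatten nested topic -> partition -> leader metadata
    // into a Map of (topic, partition) -> (host, port), mirroring findLeaders' result shape.
    case class BrokerInfo(host: String, port: Int)
    case class PartitionMeta(partitionId: Int, leader: Option[BrokerInfo])
    case class TopicMeta(topic: String, partitions: Seq[PartitionMeta])

    def leaderMap(metadata: Set[TopicMeta]): Map[(String, Int), (String, Int)] =
      metadata.flatMap { tm =>
        tm.partitions.flatMap { pm =>
          pm.leader.map { b => (tm.topic, pm.partitionId) -> (b.host, b.port) }
        }
      }.toMap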




[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-02 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/3798#discussion_r23972610
  
--- Diff: 
external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DeterministicKafkaInputDStream.scala
 ---
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+
+import scala.annotation.tailrec
+import scala.collection.mutable
+import scala.reflect.{classTag, ClassTag}
+
+import kafka.common.TopicAndPartition
+import kafka.message.MessageAndMetadata
+import kafka.serializer.Decoder
+
+import org.apache.spark.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset
+import org.apache.spark.streaming.{StreamingContext, Time}
+import org.apache.spark.streaming.dstream._
+
+/** A stream of {@link org.apache.spark.rdd.kafka.KafkaRDD} where
+  * each given Kafka topic/partition corresponds to an RDD partition.
+  * The spark configuration spark.streaming.receiver.maxRate gives the 
maximum number of messages
+  * per second that each '''partition''' will accept.
+  * Starting offsets are specified in advance,
+  * and this DStream is not responsible for committing offsets,
+  * so that you can control exactly-once semantics.
+  * For an easy interface to Kafka-managed offsets,
+  *  see {@link org.apache.spark.rdd.kafka.KafkaCluster}
+  * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+  * configuration parameters</a>.
+  *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+  *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+  * @param fromOffsets per-topic/partition Kafka offsets defining the 
(inclusive)
+  *  starting point of the stream
+  * @param messageHandler function for translating each message into the 
desired type
+  * @param maxRetries maximum number of times in a row to retry getting 
leaders' offsets
+  */
+private[streaming]
+class DeterministicKafkaInputDStream[
+  K: ClassTag,
+  V: ClassTag,
+  U <: Decoder[_]: ClassTag,
+  T <: Decoder[_]: ClassTag,
+  R: ClassTag](
+@transient ssc_ : StreamingContext,
+val kafkaParams: Map[String, String],
+val fromOffsets: Map[TopicAndPartition, Long],
+messageHandler: MessageAndMetadata[K, V] => R,
+maxRetries: Int
+) extends InputDStream[R](ssc_) with Logging {
+
+  protected[streaming] override val checkpointData =
+new DeterministicKafkaInputDStreamCheckpointData
+
+  protected val kc = new KafkaCluster(kafkaParams)
+
+  protected val maxMessagesPerPartition: Option[Long] = {
+val ratePerSec = context.sparkContext.getConf.getInt("spark.streaming.receiver.maxRate", 0)
+if (ratePerSec > 0) {
+  val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble 
/ 1000
+  Some((secsPerBatch * ratePerSec).toLong)
+} else {
+  None
+}
+  }
+
+  protected var currentOffsets = fromOffsets
+
+  @tailrec
+  protected final def latestLeaderOffsets(retries: Int): 
Map[TopicAndPartition, LeaderOffset] = {
--- End diff --

I don't want KafkaCluster throwing exceptions though

On Mon, Feb 2, 2015 at 6:12 PM, Tathagata Das notificati...@github.com
wrote:

 In
 
external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DeterministicKafkaInputDStream.scala
 https://github.com/apache/spark/pull/3798#discussion_r23971976:

  +  protected val kc = new KafkaCluster(kafkaParams)
  +
  +  protected val maxMessagesPerPartition: Option[Long] = {
  +val ratePerSec = context.sparkContext.getConf.getInt("spark.streaming.receiver.maxRate", 0)
  +if (ratePerSec > 0) {
  +  val secsPerBatch = 
context.graph.batchDuration.milliseconds.toDouble / 1000
  +  Some((secsPerBatch * ratePerSec).toLong

[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-01 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/3798#discussion_r23904918
  
--- Diff: 
external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala 
---
@@ -144,4 +150,249 @@ object KafkaUtils {
 createStream[K, V, U, T](
   jssc.ssc, kafkaParams.toMap, 
Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
   }
+
+  /** A batch-oriented interface for consuming from Kafka.
+   * Starting and ending offsets are specified in advance,
+   * so that you can control exactly-once semantics.
+   * @param sc SparkContext object
+   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+   * configuration parameters</a>.
+   *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   * @param batch Each OffsetRange in the batch corresponds to a
+   *   range of offsets for a given Kafka topic/partition
+   */
+  @Experimental
+  def createRDD[
+K: ClassTag,
+V: ClassTag,
+U <: Decoder[_]: ClassTag,
+T <: Decoder[_]: ClassTag,
+R: ClassTag] (
+  sc: SparkContext,
+  kafkaParams: Map[String, String],
+  batch: Array[OffsetRange]
+  ): RDD[(K, V)] with HasOffsetRanges = {
+val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)
+val kc = new KafkaCluster(kafkaParams)
+val topics = batch.map(o => TopicAndPartition(o.topic, o.partition)).toSet
+val leaderMap = kc.findLeaders(topics).fold(
+  errs => throw new SparkException(errs.mkString("\n")),
+  ok => ok
+)
+val rddParts = batch.zipWithIndex.map { case (o, i) =>
+val tp = TopicAndPartition(o.topic, o.partition)
+val (host, port) = leaderMap(tp)
+new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, 
o.untilOffset, host, port)
+}.toArray
+new KafkaRDD[K, V, U, T, (K, V)](sc, kafkaParams, rddParts, 
messageHandler)
+  }
+
+  /** A batch-oriented interface for consuming from Kafka.
+   * Starting and ending offsets are specified in advance,
+   * so that you can control exactly-once semantics.
+   * @param sc SparkContext object
+   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+   * configuration parameters</a>.
+   *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   * @param batch Each OffsetRange in the batch corresponds to a
+   *   range of offsets for a given Kafka topic/partition
+   * @param leaders Kafka leaders for each offset range in batch
+   * @param messageHandler function for translating each message into the 
desired type
+   */
+  @Experimental
+  def createRDD[
+K: ClassTag,
+V: ClassTag,
+U <: Decoder[_]: ClassTag,
+T <: Decoder[_]: ClassTag,
+R: ClassTag] (
+  sc: SparkContext,
+  kafkaParams: Map[String, String],
+  batch: Array[OffsetRange],
+  leaders: Array[Leader],
+  messageHandler: MessageAndMetadata[K, V] => R
+  ): RDD[R] with HasOffsetRanges = {
+val leaderMap = leaders.map(l => (l.topic, l.partition) -> (l.host, l.port)).toMap
+val rddParts = batch.zipWithIndex.map { case (o, i) =>
+val (host, port) = leaderMap((o.topic, o.partition))
+new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, 
o.untilOffset, host, port)
+}.toArray
+
+new KafkaRDD[K, V, U, T, R](sc, kafkaParams, rddParts, messageHandler)
+  }
+
+  /**
+   * This stream can guarantee that each message from Kafka is included in 
transformations
+   * (as opposed to output actions) exactly once, even in most failure 
situations.
+   *
+   * Points to note:
+   *
+   * Failure Recovery - You must checkpoint this stream, or save offsets 
yourself and provide them
+   * as the fromOffsets parameter on restart.
+   * Kafka must have sufficient log retention to obtain messages after 
failure.
+   *
+   * Getting offsets from the stream - see programming guide
+   *
+.  * Zookeeper - This does not use Zookeeper to store offsets.  For 
interop with Kafka monitors
+   * that depend on Zookeeper, you must store offsets in ZK yourself.
+   *
+   * End-to-end semantics - This does not guarantee that any output 
operation will push each record
+   * exactly once. To ensure end-to-end exactly-once semantics (that is, 
receiving exactly once and
+   * outputting exactly once), you have to either ensure that the output 
operation

[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-02-01 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/3798#discussion_r23904616
  
--- Diff: 
external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala 
---
@@ -144,4 +150,249 @@ object KafkaUtils {
 createStream[K, V, U, T](
   jssc.ssc, kafkaParams.toMap, 
Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
   }
+
+  /** A batch-oriented interface for consuming from Kafka.
+   * Starting and ending offsets are specified in advance,
+   * so that you can control exactly-once semantics.
+   * @param sc SparkContext object
+   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+   * configuration parameters</a>.
+   *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   * @param batch Each OffsetRange in the batch corresponds to a
+   *   range of offsets for a given Kafka topic/partition
+   */
+  @Experimental
+  def createRDD[
+K: ClassTag,
+V: ClassTag,
+U <: Decoder[_]: ClassTag,
+T <: Decoder[_]: ClassTag,
+R: ClassTag] (
+  sc: SparkContext,
+  kafkaParams: Map[String, String],
+  batch: Array[OffsetRange]
+  ): RDD[(K, V)] with HasOffsetRanges = {
+val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)
+val kc = new KafkaCluster(kafkaParams)
+val topics = batch.map(o => TopicAndPartition(o.topic, o.partition)).toSet
+val leaderMap = kc.findLeaders(topics).fold(
+  errs => throw new SparkException(errs.mkString("\n")),
+  ok => ok
+)
+val rddParts = batch.zipWithIndex.map { case (o, i) =>
+val tp = TopicAndPartition(o.topic, o.partition)
+val (host, port) = leaderMap(tp)
+new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, 
o.untilOffset, host, port)
+}.toArray
+new KafkaRDD[K, V, U, T, (K, V)](sc, kafkaParams, rddParts, 
messageHandler)
+  }
+
+  /** A batch-oriented interface for consuming from Kafka.
+   * Starting and ending offsets are specified in advance,
+   * so that you can control exactly-once semantics.
+   * @param sc SparkContext object
+   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+   * configuration parameters</a>.
+   *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   * @param batch Each OffsetRange in the batch corresponds to a
+   *   range of offsets for a given Kafka topic/partition
+   * @param leaders Kafka leaders for each offset range in batch
+   * @param messageHandler function for translating each message into the 
desired type
+   */
+  @Experimental
+  def createRDD[
+K: ClassTag,
+V: ClassTag,
+U <: Decoder[_]: ClassTag,
+T <: Decoder[_]: ClassTag,
+R: ClassTag] (
+  sc: SparkContext,
+  kafkaParams: Map[String, String],
+  batch: Array[OffsetRange],
+  leaders: Array[Leader],
+  messageHandler: MessageAndMetadata[K, V] => R
+  ): RDD[R] with HasOffsetRanges = {
+val leaderMap = leaders.map(l => (l.topic, l.partition) -> (l.host, l.port)).toMap
+val rddParts = batch.zipWithIndex.map { case (o, i) =>
+val (host, port) = leaderMap((o.topic, o.partition))
+new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, 
o.untilOffset, host, port)
+}.toArray
+
+new KafkaRDD[K, V, U, T, R](sc, kafkaParams, rddParts, messageHandler)
+  }
+
+  /**
+   * This stream can guarantee that each message from Kafka is included in 
transformations
+   * (as opposed to output actions) exactly once, even in most failure 
situations.
+   *
+   * Points to note:
+   *
+   * Failure Recovery - You must checkpoint this stream, or save offsets 
yourself and provide them
+   * as the fromOffsets parameter on restart.
+   * Kafka must have sufficient log retention to obtain messages after 
failure.
+   *
+   * Getting offsets from the stream - see programming guide
+   *
+.  * Zookeeper - This does not use Zookeeper to store offsets.  For 
interop with Kafka monitors
+   * that depend on Zookeeper, you must store offsets in ZK yourself.
+   *
+   * End-to-end semantics - This does not guarantee that any output 
operation will push each record
+   * exactly once. To ensure end-to-end exactly-once semantics (that is, 
receiving exactly once and
+   * outputting exactly once), you have to either ensure that the output 
operation

[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-01-31 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/3798#discussion_r23889820
  
--- Diff: 
external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala 
---
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import scala.reflect.{classTag, ClassTag}
+
+import org.apache.spark.{Logging, Partition, SparkContext, SparkException, 
TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.util.NextIterator
+
+import java.util.Properties
+import kafka.api.{FetchRequestBuilder, FetchResponse}
+import kafka.common.{ErrorMapping, TopicAndPartition}
+import kafka.consumer.{ConsumerConfig, SimpleConsumer}
+import kafka.message.{MessageAndMetadata, MessageAndOffset}
+import kafka.serializer.Decoder
+import kafka.utils.VerifiableProperties
+
+/**
+ * A batch-oriented interface for consuming from Kafka.
+ * Starting and ending offsets are specified in advance,
+ * so that you can control exactly-once semantics.
+ * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+ * configuration parameters</a>.
+ *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+ *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+ * @param batch Each KafkaRDDPartition in the batch corresponds to a
+ *   range of offsets for a given Kafka topic/partition
+ * @param messageHandler function for translating each message into the desired type
+ */
+private[spark]
+class KafkaRDD[
+  K: ClassTag,
+  V: ClassTag,
+  U <: Decoder[_]: ClassTag,
+  T <: Decoder[_]: ClassTag,
+  R: ClassTag] private[spark] (
+sc: SparkContext,
+kafkaParams: Map[String, String],
+private[spark] val batch: Array[KafkaRDDPartition],
+messageHandler: MessageAndMetadata[K, V] => R
+  ) extends RDD[R](sc, Nil) with Logging with HasOffsetRanges {
+
+  def offsetRanges: Array[OffsetRange] = 
batch.asInstanceOf[Array[OffsetRange]]
+
+  override def getPartitions: Array[Partition] = 
batch.asInstanceOf[Array[Partition]]
+
+  override def getPreferredLocations(thePart: Partition): Seq[String] = {
+val part = thePart.asInstanceOf[KafkaRDDPartition]
+// TODO is additional hostname resolution necessary here
+Seq(part.host)
+  }
+
+  private def errBeginAfterEnd(part: KafkaRDDPartition): String =
+s"Beginning offset ${part.fromOffset} is after the ending offset ${part.untilOffset} " +
+  s"for topic ${part.topic} partition ${part.partition}. " +
+  "You either provided an invalid fromOffset, or the Kafka topic has been damaged"
+
+  private def errRanOutBeforeEnd(part: KafkaRDDPartition): String =
+s"Ran out of messages before reaching ending offset ${part.untilOffset} " +
+  s"for topic ${part.topic} partition ${part.partition} start ${part.fromOffset}." +
+  " This should not happen, and indicates that messages may have been lost"
+
+  private def errOvershotEnd(itemOffset: Long, part: KafkaRDDPartition): String =
+s"Got ${itemOffset} > ending offset ${part.untilOffset} " +
+  s"for topic ${part.topic} partition ${part.partition} start ${part.fromOffset}." +
+  " This should not happen, and indicates a message may have been skipped"
+
+  override def compute(thePart: Partition, context: TaskContext): Iterator[R] = {
+val part = thePart.asInstanceOf[KafkaRDDPartition]
+assert(part.fromOffset <= part.untilOffset, errBeginAfterEnd(part))
+if (part.fromOffset == part.untilOffset) {
+  log.warn(s"Beginning offset ${part.fromOffset} is the same as ending offset " +
+    s"skipping ${part.topic} ${part.partition}")
+  Iterator.empty
+} else {
+  new KafkaRDDIterator(part, context)
+}
+  }
+
+  private class KafkaRDDIterator(
+  part

[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-01-31 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/3798#discussion_r23890348
  
--- Diff: 
external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala 
---
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import scala.reflect.{classTag, ClassTag}
+
+import org.apache.spark.{Logging, Partition, SparkContext, SparkException, 
TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.util.NextIterator
+
+import java.util.Properties
+import kafka.api.{FetchRequestBuilder, FetchResponse}
+import kafka.common.{ErrorMapping, TopicAndPartition}
+import kafka.consumer.{ConsumerConfig, SimpleConsumer}
+import kafka.message.{MessageAndMetadata, MessageAndOffset}
+import kafka.serializer.Decoder
+import kafka.utils.VerifiableProperties
+
+/**
+ * A batch-oriented interface for consuming from Kafka.
+ * Starting and ending offsets are specified in advance,
+ * so that you can control exactly-once semantics.
+ * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+ * configuration parameters</a>.
+ *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+ *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+ * @param batch Each KafkaRDDPartition in the batch corresponds to a
+ *   range of offsets for a given Kafka topic/partition
+ * @param messageHandler function for translating each message into the desired type
+ */
+private[spark]
+class KafkaRDD[
+  K: ClassTag,
+  V: ClassTag,
+  U <: Decoder[_]: ClassTag,
+  T <: Decoder[_]: ClassTag,
+  R: ClassTag] private[spark] (
+sc: SparkContext,
+kafkaParams: Map[String, String],
+private[spark] val batch: Array[KafkaRDDPartition],
+messageHandler: MessageAndMetadata[K, V] => R
+  ) extends RDD[R](sc, Nil) with Logging with HasOffsetRanges {
+
+  def offsetRanges: Array[OffsetRange] = 
batch.asInstanceOf[Array[OffsetRange]]
+
+  override def getPartitions: Array[Partition] = 
batch.asInstanceOf[Array[Partition]]
+
+  override def getPreferredLocations(thePart: Partition): Seq[String] = {
+val part = thePart.asInstanceOf[KafkaRDDPartition]
+// TODO is additional hostname resolution necessary here
+Seq(part.host)
+  }
+
+  private def errBeginAfterEnd(part: KafkaRDDPartition): String =
+s"Beginning offset ${part.fromOffset} is after the ending offset ${part.untilOffset} " +
+  s"for topic ${part.topic} partition ${part.partition}. " +
+  "You either provided an invalid fromOffset, or the Kafka topic has been damaged"
+
+  private def errRanOutBeforeEnd(part: KafkaRDDPartition): String =
+s"Ran out of messages before reaching ending offset ${part.untilOffset} " +
+  s"for topic ${part.topic} partition ${part.partition} start ${part.fromOffset}." +
+  " This should not happen, and indicates that messages may have been lost"
+
+  private def errOvershotEnd(itemOffset: Long, part: KafkaRDDPartition): String =
+s"Got ${itemOffset} > ending offset ${part.untilOffset} " +
+  s"for topic ${part.topic} partition ${part.partition} start ${part.fromOffset}." +
+  " This should not happen, and indicates a message may have been skipped"
+
+  override def compute(thePart: Partition, context: TaskContext): Iterator[R] = {
+val part = thePart.asInstanceOf[KafkaRDDPartition]
+assert(part.fromOffset <= part.untilOffset, errBeginAfterEnd(part))
+if (part.fromOffset == part.untilOffset) {
+  log.warn(s"Beginning offset ${part.fromOffset} is the same as ending offset " +
+    s"skipping ${part.topic} ${part.partition}")
+  Iterator.empty
+} else {
+  new KafkaRDDIterator(part, context)
+}
+  }
+
+  private class KafkaRDDIterator(
+  part

[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-01-31 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/3798#discussion_r23890594
  
--- Diff: 
external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala 
---
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import scala.reflect.{classTag, ClassTag}
+
+import org.apache.spark.{Logging, Partition, SparkContext, SparkException, 
TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.util.NextIterator
+
+import java.util.Properties
+import kafka.api.{FetchRequestBuilder, FetchResponse}
+import kafka.common.{ErrorMapping, TopicAndPartition}
+import kafka.consumer.{ConsumerConfig, SimpleConsumer}
+import kafka.message.{MessageAndMetadata, MessageAndOffset}
+import kafka.serializer.Decoder
+import kafka.utils.VerifiableProperties
+
+/**
+ * A batch-oriented interface for consuming from Kafka.
+ * Starting and ending offsets are specified in advance,
+ * so that you can control exactly-once semantics.
+ * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+ * configuration parameters</a>.
+ *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+ *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+ * @param batch Each KafkaRDDPartition in the batch corresponds to a
+ *   range of offsets for a given Kafka topic/partition
+ * @param messageHandler function for translating each message into the desired type
+ */
+private[spark]
+class KafkaRDD[
+  K: ClassTag,
+  V: ClassTag,
+  U <: Decoder[_]: ClassTag,
+  T <: Decoder[_]: ClassTag,
+  R: ClassTag] private[spark] (
+sc: SparkContext,
+kafkaParams: Map[String, String],
+private[spark] val batch: Array[KafkaRDDPartition],
+messageHandler: MessageAndMetadata[K, V] => R
+  ) extends RDD[R](sc, Nil) with Logging with HasOffsetRanges {
+
+  def offsetRanges: Array[OffsetRange] = 
batch.asInstanceOf[Array[OffsetRange]]
+
+  override def getPartitions: Array[Partition] = 
batch.asInstanceOf[Array[Partition]]
+
+  override def getPreferredLocations(thePart: Partition): Seq[String] = {
+val part = thePart.asInstanceOf[KafkaRDDPartition]
+// TODO is additional hostname resolution necessary here
+Seq(part.host)
+  }
+
+  private def errBeginAfterEnd(part: KafkaRDDPartition): String =
+s"Beginning offset ${part.fromOffset} is after the ending offset ${part.untilOffset} " +
+  s"for topic ${part.topic} partition ${part.partition}. " +
+  "You either provided an invalid fromOffset, or the Kafka topic has been damaged"
+
+  private def errRanOutBeforeEnd(part: KafkaRDDPartition): String =
+s"Ran out of messages before reaching ending offset ${part.untilOffset} " +
+  s"for topic ${part.topic} partition ${part.partition} start ${part.fromOffset}." +
+  " This should not happen, and indicates that messages may have been lost"
+
+  private def errOvershotEnd(itemOffset: Long, part: KafkaRDDPartition): String =
+s"Got ${itemOffset} > ending offset ${part.untilOffset} " +
+  s"for topic ${part.topic} partition ${part.partition} start ${part.fromOffset}." +
+  " This should not happen, and indicates a message may have been skipped"
+
+  override def compute(thePart: Partition, context: TaskContext): Iterator[R] = {
+val part = thePart.asInstanceOf[KafkaRDDPartition]
+assert(part.fromOffset <= part.untilOffset, errBeginAfterEnd(part))
+if (part.fromOffset == part.untilOffset) {
+  log.warn(s"Beginning offset ${part.fromOffset} is the same as ending offset " +
+    s"skipping ${part.topic} ${part.partition}")
+  Iterator.empty
+} else {
+  new KafkaRDDIterator(part, context)
+}
+  }
+
+  private class KafkaRDDIterator(
+  part

[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-01-28 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/3798#discussion_r23726952
  
--- Diff: 
external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala 
---
@@ -144,4 +150,116 @@ object KafkaUtils {
 createStream[K, V, U, T](
   jssc.ssc, kafkaParams.toMap, 
Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
   }
+
+  /** A batch-oriented interface for consuming from Kafka.
+   * Starting and ending offsets are specified in advance,
+   * so that you can control exactly-once semantics.
+   * @param sc SparkContext object
+   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+   * configuration parameters</a>.
+   *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   * @param batch Each OffsetRange in the batch corresponds to a
+   *   range of offsets for a given Kafka topic/partition
+   * @param messageHandler function for translating each message into the 
desired type
+   */
+  def createRDD[
+K: ClassTag,
+V: ClassTag,
+U <: Decoder[_]: ClassTag,
+T <: Decoder[_]: ClassTag,
+R: ClassTag] (
+  sc: SparkContext,
+  kafkaParams: Map[String, String],
+  batch: Array[OffsetRange],
+  messageHandler: MessageAndMetadata[K, V] => R
+  ): RDD[R] = {
+val parts = batch.zipWithIndex.map { case (o, i) =>
+new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, 
o.untilOffset, o.host, o.port)
+}.toArray
+new KafkaRDD[K, V, U, T, R](sc, kafkaParams, parts, messageHandler)
+  }
+
+  /**
+   * This DOES NOT guarantee that side-effects of an action will see each 
message exactly once.
--- End diff --

I mentioned this in the design docs, but I'm not comfortable with naming 
something exactlyOnce when it doesn't actually guarantee that for the usual 
meaning of exactly once in a messaging system.  Hence the all-caps warning.

I think a better option would be to rename the method; I suggested a couple 
of possibilities in the design doc.





[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-01-28 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/3798#discussion_r23737948
  
--- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala ---
@@ -144,4 +150,116 @@ object KafkaUtils {
     createStream[K, V, U, T](
       jssc.ssc, kafkaParams.toMap, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
   }
+
+  /** A batch-oriented interface for consuming from Kafka.
+   * Starting and ending offsets are specified in advance,
+   * so that you can control exactly-once semantics.
+   * @param sc SparkContext object
+   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+   * configuration parameters</a>.
+   *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   * @param batch Each OffsetRange in the batch corresponds to a
+   *   range of offsets for a given Kafka topic/partition
+   * @param messageHandler function for translating each message into the desired type
+   */
+  def createRDD[
+    K: ClassTag,
+    V: ClassTag,
+    U <: Decoder[_]: ClassTag,
+    T <: Decoder[_]: ClassTag,
+    R: ClassTag] (
+      sc: SparkContext,
+      kafkaParams: Map[String, String],
+      batch: Array[OffsetRange],
+      messageHandler: MessageAndMetadata[K, V] => R
+  ): RDD[R] = {
+    val parts = batch.zipWithIndex.map { case (o, i) =>
+      new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, o.untilOffset, o.host, o.port)
+    }.toArray
+    new KafkaRDD[K, V, U, T, R](sc, kafkaParams, parts, messageHandler)
+  }
+
+  /**
+   * This DOES NOT guarantee that side-effects of an action will see each message exactly once.
--- End diff --

If you're willing to add whatever is necessary, what I'm suggesting is 
necessary is a different method name.

What's the harm in picking a more accurate method name?  

Why open the possibility of someone looking at code completion in their 
IDE, seeing a method named exactlyOnce, thinking it actually means exactly 
once, then losing money because their messages got duplicated?

Many of the existing input stream classes would meet your definition of
exactly once, but nowhere else in Spark Streaming is there a method named
exactlyOnce.  Why is this method special?

The distinction you are drawing between receiving exactly once and 
outputting exactly once may be clear to you, but from talking with average 
users at conferences and meetups, it is not clear to them.  Judging from the 
comments on this thread and the design doc, it's not even perfectly clear to 
dedicated members of the community.
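
To make the distinction concrete, the kind of client code needed for
genuinely exactly-once output looks roughly like the sketch below.  This is
only a sketch: storeAtomically is a made-up placeholder for whatever
transactional store the user has, and the import path just reflects the
packaging being discussed in this PR.

import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

// storeAtomically is a hypothetical user-supplied function that writes a partition's
// records AND its untilOffset in a single transaction, so a replayed batch can be
// detected (offset already stored) and skipped.
def saveExactlyOnce[T](
    stream: DStream[T],
    storeAtomically: (Iterator[T], OffsetRange) => Unit): Unit = {
  stream.foreachRDD { rdd =>
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    // foreachPartitionWithIndex is the helper this PR adds to RDD
    rdd.foreachPartitionWithIndex { (i, iter) =>
      storeAtomically(iter, offsetRanges(i))
    }
  }
}

The stream by itself can't provide that guarantee; it has to be done at the
output, which is exactly why the name matters.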





[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-01-28 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/3798#discussion_r23729727
  
--- Diff: external/kafka/src/main/scala/org/apache/spark/rdd/kafka/OffsetRange.scala ---
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd.kafka
+
+/** Represents a range of offsets from a single Kafka TopicAndPartition */
+trait OffsetRange {
--- End diff --

It's a trait with no implemented methods, so it will get compiled to a 
single class file with the same bytecode as a java interface (plus a scala 
signature annotation).  It won't make a separate OffsetRange$class.class file 
as you may have seen for scala traits with default method implementations.

The point of the trait/interface is that, as far as I understood, you were 
concerned about publicly exposing KafkaRDDPartition (which already is just a 
simple class, not a case class).  If you want one common supertype for both 
KafkaRDDPartition and whatever people pass into public methods to construct a 
KafkaRDD, your choices are an interface or a (possibly abstract) class.  I 
think an interface is cleaner.

TLDR 
- if you're fine with exposing KafkaRDDPartition, let's just do that.
- if you're super concerned that a trait with no implementation can't be 
used from java, i'll move it to java code and change trait for interface
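
To make the shape concrete, the public supertypes being discussed amount to
something like this (a sketch; the names follow the usage examples in this
thread, and whether leader host/port belongs on OffsetRange is a separate
question):

// no implemented members, so this compiles to a plain Java interface
// (no OffsetRange$class.class file is generated)
trait OffsetRange {
  def topic: String
  def partition: Int
  def fromOffset: Long
  def untilOffset: Long
}

// anything that can report the offset ranges it was built from, e.g. KafkaRDD
trait HasOffsetRanges {
  def offsetRanges: Array[OffsetRange]
}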






[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-01-28 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/3798#discussion_r23729934
  
--- Diff: external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala ---
@@ -130,7 +130,7 @@ abstract class KafkaStreamSuiteBase extends FunSuite with Eventually with Logging
   }
 
   def createTopic(topic: String) {
-    CreateTopicCommand.createTopic(zkClient, topic, 1, 1, 0)
--- End diff --

Change in the way the kafka api is namespaced.





[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-01-28 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/3798#discussion_r23729871
  
--- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala ---
@@ -144,4 +150,116 @@ object KafkaUtils {
     createStream[K, V, U, T](
       jssc.ssc, kafkaParams.toMap, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
   }
+
+  /** A batch-oriented interface for consuming from Kafka.
+   * Starting and ending offsets are specified in advance,
+   * so that you can control exactly-once semantics.
+   * @param sc SparkContext object
+   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+   * configuration parameters</a>.
+   *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   * @param batch Each OffsetRange in the batch corresponds to a
+   *   range of offsets for a given Kafka topic/partition
+   * @param messageHandler function for translating each message into the desired type
+   */
+  def createRDD[
+    K: ClassTag,
+    V: ClassTag,
+    U <: Decoder[_]: ClassTag,
+    T <: Decoder[_]: ClassTag,
+    R: ClassTag] (
+      sc: SparkContext,
+      kafkaParams: Map[String, String],
+      batch: Array[OffsetRange],
+      messageHandler: MessageAndMetadata[K, V] => R
+  ): RDD[R] = {
+    val parts = batch.zipWithIndex.map { case (o, i) =>
+      new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, o.untilOffset, o.host, o.port)
+    }.toArray
+    new KafkaRDD[K, V, U, T, R](sc, kafkaParams, parts, messageHandler)
+  }
+
+  /**
+   * This DOES NOT guarantee that side-effects of an action will see each message exactly once.
+   * If you need that guarantee, get the offsets from this stream and store them with your output.
+   * Nor does this store offsets in Kafka / Zookeeper.
+   * If checkpointed, it will store offset ranges in the checkpoint, such that each message
+   * will be transformed effectively exactly once even after failure,
+   * provided you have sufficient Kafka log retention.
+   *
+   * @param ssc StreamingContext object
+   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+   * configuration parameters</a>.
+   *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   * @param messageHandler function for translating each message into the desired type
+   * @param fromOffsets per-topic/partition Kafka offsets defining the (inclusive)
+   *  starting point of the stream
+   * @param maxRetries maximum number of times in a row to retry getting leaders' offsets
+   */
+  def createExactlyOnceStream[
+    K: ClassTag,
+    V: ClassTag,
+    U <: Decoder[_]: ClassTag,
+    T <: Decoder[_]: ClassTag,
+    R: ClassTag] (
+      ssc: StreamingContext,
+      kafkaParams: Map[String, String],
+      fromOffsets: Map[TopicAndPartition, Long],
+      messageHandler: MessageAndMetadata[K, V] => R,
+      maxRetries: Int
+  ): InputDStream[R] = {
+    new DeterministicKafkaInputDStream[K, V, U, T, R](
+      ssc, kafkaParams, fromOffsets, messageHandler, maxRetries)
+  }
+
+  /**
+   * This DOES NOT guarantee that side-effects of an action will see each message exactly once.
+   * If you need that guarantee, get the offsets from this stream and store them with your output.
+   * Nor does this store offsets in Kafka / Zookeeper.
+   * If checkpointed, it will store offset ranges in the checkpoint, such that each message
+   * will be transformed effectively exactly once even after failure,
+   * provided you have sufficient Kafka log retention.
+   *
+   * @param ssc StreamingContext object
+   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+   * configuration parameters</a>.
+   *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   *   If starting without a checkpoint, "auto.offset.reset" may be set to "largest" or "smallest"
+   *   to determine where the stream starts (defaults to "largest")
+   * @param topics names of the topics to consume
+   */
+  def createExactlyOnceStream[
+    K: ClassTag,
+    V: ClassTag,
+    U <: Decoder[_]: ClassTag,
+    T <: Decoder[_]: ClassTag] (
+      ssc: StreamingContext,
+      kafkaParams: Map[String, String],
+      topics: Set[String

[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-01-28 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/3798#discussion_r23746235
  
--- Diff: external/kafka/src/main/scala/org/apache/spark/rdd/kafka/OffsetRange.scala ---
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd.kafka
+
+/** Represents a range of offsets from a single Kafka TopicAndPartition */
+trait OffsetRange {
--- End diff --

Ok, so there are a couple of different concerns here.

First, the easy one.  Case classes.  KafkaRDDPartition isn't a case class.  
The only case class in the entire PR is LeaderOffset, which isn't public and 
probably doesn't need to be a case class anyway.  No worries.

Second, the question of whether OffsetRange needs to have a host and port.  
The issue here is that in order to get a meaningful endpoint for the range, you 
have to make a remote call to find the kafka leader anyway.  So if you give 
people a constructor that allows them to specify an ending offset, but don't 
allow them to specify a preferred leader, you are forcing an interface that 
requires 2x the number of remote calls.

Third, clients need to not only define offset ranges, they need to obtain 
offsets from the stream (for those that need them for exactly-once, or 
zookeeper interop, or whatever).  The idea of the interface is to provide 
limited access to the offsets without exposing any concrete implementation 
classes, so that you can change them later if need be.  That allows clients to 
do

stream.foreachRDD { rdd =>
  rdd.foreachPartitionWithIndex { (i, iter) =>
    val offsetRange = rdd.partitions(i).asInstanceOf[OffsetRange]

or

stream.foreachRDD { rdd =>
  val allOffsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

without knowing anything at all about KafkaRDD or its partition class (or 
any concrete classes for that matter).  I'm pretty sure the same cannot be done 
with your suggestion, because there's nothing public to cast the RDD or the 
partition to.  I updated the usage examples to show how this works.


https://github.com/koeninger/kafka-exactly-once/commit/d1641718807fc97f46e729e28acaba96ebc94c33

The asInstanceOf is unfortunate, but because of the way DStream is
implemented, we cannot say anything at compile time about what the RDDs
returned in a DStream are capable of.  By this I mean we can make
KafkaUtils.createRDD return an RDD[R] with HasOffsetRanges instead of
KafkaRDD, but we cannot make a corresponding change to
KafkaUtils.createNewStream, because foreachRDD just hands you an RDD, not a
parameterized type.







[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-01-28 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/3798#discussion_r23746262
  
--- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala ---
@@ -144,4 +150,116 @@ object KafkaUtils {
     createStream[K, V, U, T](
       jssc.ssc, kafkaParams.toMap, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
   }
+
+  /** A batch-oriented interface for consuming from Kafka.
+   * Starting and ending offsets are specified in advance,
+   * so that you can control exactly-once semantics.
+   * @param sc SparkContext object
+   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+   * configuration parameters</a>.
+   *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   * @param batch Each OffsetRange in the batch corresponds to a
+   *   range of offsets for a given Kafka topic/partition
+   * @param messageHandler function for translating each message into the desired type
+   */
+  def createRDD[
+    K: ClassTag,
+    V: ClassTag,
+    U <: Decoder[_]: ClassTag,
+    T <: Decoder[_]: ClassTag,
+    R: ClassTag] (
+      sc: SparkContext,
+      kafkaParams: Map[String, String],
+      batch: Array[OffsetRange],
+      messageHandler: MessageAndMetadata[K, V] => R
+  ): RDD[R] = {
+    val parts = batch.zipWithIndex.map { case (o, i) =>
+      new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, o.untilOffset, o.host, o.port)
+    }.toArray
+    new KafkaRDD[K, V, U, T, R](sc, kafkaParams, parts, messageHandler)
+  }
+
+  /**
+   * This DOES NOT guarantee that side-effects of an action will see each message exactly once.
--- End diff --

Cool, changing it to createNewStream for now





[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-01-28 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/3798#discussion_r23746302
  
--- Diff: external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala ---
@@ -130,7 +130,7 @@ abstract class KafkaStreamSuiteBase extends FunSuite with Eventually with Logging
   }
 
   def createTopic(topic: String) {
-    CreateTopicCommand.createTopic(zkClient, topic, 1, 1, 0)
--- End diff --

Yeah, they just shuffled some of their classes around.  You'll notice the 
PR floating around for writing to kafka does the same thing, because it also 
upgrades kafka





[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-01-28 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/3798#discussion_r23746360
  
--- Diff: external/kafka/src/main/scala/org/apache/spark/rdd/kafka/KafkaCluster.scala ---
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd.kafka
+
+import scala.util.control.NonFatal
+import scala.util.Random
+import scala.collection.mutable.ArrayBuffer
+import java.util.Properties
+import kafka.api._
+import kafka.common.{ErrorMapping, OffsetMetadataAndError, TopicAndPartition}
+import kafka.consumer.{ConsumerConfig, SimpleConsumer}
+
+/**
+  * Convenience methods for interacting with a Kafka cluster.
+  * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+  * configuration parameters</a>.
+  *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+  *   NOT zookeeper servers, specified in host1:port1,host2:port2 form
+  */
+private[spark]
+class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
--- End diff --

As I recall, it's because the stream has a kafka cluster as a member value, 
and it needs to be able to be checkpointed.  The current design of KafkaCluster 
is essentially stateless aside from configuration.





[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-01-28 Thread koeninger
Github user koeninger commented on the pull request:

https://github.com/apache/spark/pull/3798#issuecomment-71968759
  
packaging, makes sense

method name, agreed, named it createNewStream for now

offset range, see my explanation of the interface above.  I think this is
the last substantive / non-style issue to get worked out

unit tests for the stream / kafkacluster, will see what I can do

On Wed, Jan 28, 2015 at 8:49 PM, Tathagata Das notificati...@github.com
wrote:

 At a high level these are the high level design issues that are still
 pending

 - Packaging - I think all the classes should be org.apache.spark.streaming.kafka
   (even KafkaRDD), because (i) all of them are published in the spark-streaming-kafka
   artifact, (ii) just importing one path (o.a.spark.streaming.kafka.KafkaUtils._) is
   sufficient to get all the relevant classes.
 - KafkaUtils method name and scala doc - Lets keep thinking of names, but lets not
   stay blocked on this and continue focussing on other issues.
 - OffsetRange - I have proposed a design as well. Either we will go for a Java
   interface or the simple class that I proposed.
 - Unit tests - No unit test for the new stream. Also is it possible to include a few
   unit tests for KafkaCluster?

 Other than these, I will start commenting on the code styles, etc very
 soon.

 —
 Reply to this email directly or view it on GitHub
 https://github.com/apache/spark/pull/3798#issuecomment-71960076.






[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-01-28 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/3798#discussion_r23748806
  
--- Diff: external/kafka/src/main/scala/org/apache/spark/rdd/kafka/OffsetRange.scala ---
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd.kafka
+
+/** Represents a range of offsets from a single Kafka TopicAndPartition */
+trait OffsetRange {
--- End diff --

No, it's not attaching the offset to every record, that's what i'm trying
to avoid.  It's dealing with the offsets either once per rdd, or once per
partition, depending on what is necessary from a client semantics point of
view.

Hopefully accessing the RDD from inside foreachRDD isn't contentious?

As for accessing the partition, yeah it's pretty weird to have to go index
the rdd to get the partition... but it's also pretty weird that the
existing apis named 'mapPartition*' don't actually give you access to the
partition... after all, the partition is serializable.



On Wed, Jan 28, 2015 at 11:47 PM, Tathagata Das notificati...@github.com
wrote:

 In
 external/kafka/src/main/scala/org/apache/spark/rdd/kafka/OffsetRange.scala
 https://github.com/apache/spark/pull/3798#discussion_r23747942:

  + * (the License); you may not use this file except in compliance with
  + * the License.  You may obtain a copy of the License at
  + *
  + *http://www.apache.org/licenses/LICENSE-2.0
  + *
  + * Unless required by applicable law or agreed to in writing, software
  + * distributed under the License is distributed on an AS IS BASIS,
  + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 
implied.
  + * See the License for the specific language governing permissions and
  + * limitations under the License.
  + */
  +
  +package org.apache.spark.rdd.kafka
  +
  +/** Represents a range of offsets from a single Kafka 
TopicAndPartition */
  +trait OffsetRange {

 Right, I get your point. Though I thought about the usage based on the
 example, and I think we need to think this a little bit more. From what I
 understood, you are attaching the offset in every records, and shuffling
 everything with that offset attached. That is quite a loss of efficiency.
 Also, accessing the RDD and its partition object from within the
 mapPartition function is very confusing, and ... does it actually work
 If at all this works, thats not even the recommended RDD operation!

 We really need to come up with a better way to expose offsets.
 Brainstorming a little more on this.

 —
 Reply to this email directly or view it on GitHub
 https://github.com/apache/spark/pull/3798/files#r23747942.






[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-01-27 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/3798#discussion_r23616474
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -788,6 +788,20 @@ abstract class RDD[T: ClassTag](
   }
 
   /**
+   * Applies a function to each partition of this RDD, while tracking the index
+   * of the original partition.
+   */
+  def foreachPartitionWithIndex(
--- End diff --

Yeah, I was originally doing a map followed by an empty foreach and thought
it looked confusing.

I think it's really a question of what's easier to explain; it's just a
syntactic-sugar issue, not a correctness issue, so no problem either way.
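
Concretely, the sugar in question is roughly the following (a sketch, not
necessarily the exact implementation in this PR):

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

object ForeachPartitionWithIndexDemo {
  def doSomething(i: Int, iter: Iterator[String]): Unit =
    iter.foreach(s => println(s"partition $i: $s"))

  def demo(sc: SparkContext): Unit = {
    val rdd: RDD[String] = sc.parallelize(Seq("a", "b", "c"), 2)

    // without the helper: do the work inside mapPartitionsWithIndex,
    // then force evaluation with an empty foreach
    rdd.mapPartitionsWithIndex { (i, iter) =>
      doSomething(i, iter)
      Iterator.empty: Iterator[Unit]
    }.foreach(_ => ())

    // with the helper proposed in this PR (same behavior, clearer intent)
    // rdd.foreachPartitionWithIndex(doSomething)
  }
}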





[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-01-27 Thread koeninger
Github user koeninger commented on the pull request:

https://github.com/apache/spark/pull/3798#issuecomment-71667244
  
Most of Either's problems can be fixed with a one-line implicit conversion
to RightProjection.  I've seen scalactic before, seems like overkill by
comparison.
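
The one-liner I mean is something along these lines (a sketch; Err mirrors
the ArrayBuffer[Throwable] alias used in KafkaCluster, and parsePort is just
a made-up example):

import scala.collection.mutable.ArrayBuffer
import scala.language.implicitConversions

object RightBiasedEither {
  type Err = ArrayBuffer[Throwable]

  // treat Either as right-biased so map/flatMap/for-comprehensions just work
  implicit def rightBias[L, R](e: Either[L, R]): Either.RightProjection[L, R] = e.right

  def parsePort(s: String): Either[Err, Int] =
    try Right(s.toInt)
    catch { case e: NumberFormatException => Left(ArrayBuffer[Throwable](e)) }

  // composes without sprinkling .right everywhere
  def sumPorts(a: String, b: String): Either[Err, Int] =
    for {
      x <- parsePort(a)
      y <- parsePort(b)
    } yield x + y
}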

On Mon, Jan 26, 2015 at 8:08 PM, Imran Rashid notificati...@github.com
wrote:

 @koeninger https://github.com/koeninger
 I doubt that we want to go this route in this case, but just in case
 you're interested, I think a much better way to handle multiple errors
 gracefully is with scalactic's Or
 http://www.scalactic.org/user_guide/OrAndEvery. Its much better than
 Either for this case of building up a set of errors to report back to the
 user. And scalactic is a nicely designed, small library (eg. you're not
 pulling scalaz). Probably not worth it for this one case, but thought you
 might find it interesting :)

 —
 Reply to this email directly or view it on GitHub
 https://github.com/apache/spark/pull/3798#issuecomment-71577982.






[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-01-26 Thread koeninger
Github user koeninger commented on the pull request:

https://github.com/apache/spark/pull/3798#issuecomment-71549132
  
Just updated it

On Mon, Jan 26, 2015 at 4:06 PM, Hari Shreedharan notificati...@github.com
wrote:

 @koeninger https://github.com/koeninger Can you run sbt scalastyle and
 fix the style violations? There are like 2 instances where there are lines
  100 chars.

 —
 Reply to this email directly or view it on GitHub
 https://github.com/apache/spark/pull/3798#issuecomment-71548572.






[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-01-26 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/3798#discussion_r23576495
  
--- Diff: external/kafka/src/main/scala/org/apache/spark/rdd/kafka/KafkaRDD.scala ---
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd.kafka
+
+import scala.reflect.{classTag, ClassTag}
+
+import org.apache.spark.{Logging, Partition, SparkContext, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.util.NextIterator
+
+import java.util.Properties
+import kafka.api.{FetchRequestBuilder, FetchResponse}
+import kafka.common.{ErrorMapping, TopicAndPartition}
+import kafka.consumer.{ConsumerConfig, SimpleConsumer}
+import kafka.message.{MessageAndMetadata, MessageAndOffset}
+import kafka.serializer.Decoder
+import kafka.utils.VerifiableProperties
+
+/** A batch-oriented interface for consuming from Kafka.
+  * Starting and ending offsets are specified in advance,
+  * so that you can control exactly-once semantics.
+  * For an easy interface to Kafka-managed offsets,
+  *  see {@link org.apache.spark.rdd.kafka.KafkaCluster}
+  * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+  * configuration parameters</a>.
+  *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+  *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+  * @param batch Each KafkaRDDPartition in the batch corresponds to a
+  *   range of offsets for a given Kafka topic/partition
+  * @param messageHandler function for translating each message into the desired type
+  */
+class KafkaRDD[
+  K: ClassTag,
+  V: ClassTag,
+  U <: Decoder[_]: ClassTag,
+  T <: Decoder[_]: ClassTag,
+  R: ClassTag](
+    sc: SparkContext,
+    val kafkaParams: Map[String, String],
+    val batch: Array[KafkaRDDPartition],
+    messageHandler: MessageAndMetadata[K, V] => R
+  ) extends RDD[R](sc, Nil) with Logging {
+
+  override def getPartitions: Array[Partition] = batch.asInstanceOf[Array[Partition]]
+
+  override def getPreferredLocations(thePart: Partition): Seq[String] = {
+    val part = thePart.asInstanceOf[KafkaRDDPartition]
+    // TODO is additional hostname resolution necessary here
+    Seq(part.host)
+  }
+
+  override def compute(thePart: Partition, context: TaskContext): Iterator[R] = {
+    val part = thePart.asInstanceOf[KafkaRDDPartition]
+    if (part.fromOffset >= part.untilOffset) {
+      log.warn("Beginning offset is same or after ending offset " +
+        s"skipping ${part.topic} ${part.partition}")
+      Iterator.empty
+    } else {
+      new NextIterator[R] {
+        context.addTaskCompletionListener{ context => closeIfNeeded() }

+        log.info(s"Computing topic ${part.topic}, partition ${part.partition} " +
+          s"offsets ${part.fromOffset} -> ${part.untilOffset}")
+
+        val kc = new KafkaCluster(kafkaParams)
+        val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
+          .newInstance(kc.config.props)
+          .asInstanceOf[Decoder[K]]
+        val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
+          .newInstance(kc.config.props)
+          .asInstanceOf[Decoder[V]]
+        val consumer = connectLeader
+        var requestOffset = part.fromOffset
+        var iter: Iterator[MessageAndOffset] = null
+
+        // The idea is to use the provided preferred host, except on task retry attempts,
+        // to minimize number of kafka metadata requests
+        private def connectLeader: SimpleConsumer = {
+          if (context.attemptNumber > 0) {
+            kc.connectLeader(part.topic, part.partition).fold(
+              errs => throw new Exception(
+                s"Couldn't connect to leader for topic

[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-01-26 Thread koeninger
Github user koeninger commented on the pull request:

https://github.com/apache/spark/pull/3798#issuecomment-71564796
  
I'm not a big fan of either either :)

The issue here is that KafkaCluster is potentially dealing with multiple
exceptions due to multiple brokers.  As a user of library code, I'd
potentially want access to all of the exceptions, not just the first or
last one.

It sounds like TD is leaning towards not making KafkaCluster exposed, in
which case we can do whatever makes sense for the internal usage of it.
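
To be concrete about the multiple-broker case, the pattern is roughly this
(a sketch; firstSuccess is a stand-in for the withBrokers helper in
KafkaCluster):

import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

object BrokerErrors {
  type Err = ArrayBuffer[Throwable]

  // Try each seed broker in turn, keeping every failure rather than only the last,
  // so the caller can see why all of the brokers failed instead of just one of them.
  def firstSuccess[T](brokers: Seq[(String, Int)])(query: ((String, Int)) => T): Either[Err, T] = {
    val errs = new Err
    brokers.foreach { hp =>
      try {
        return Right(query(hp))
      } catch {
        case NonFatal(e) => errs.append(e)
      }
    }
    Left(errs)
  }
}

Just throwing would force picking a single exception to surface.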

On Mon, Jan 26, 2015 at 5:03 PM, Reynold Xin notificati...@github.com
wrote:

 BTW one other thing - Either is really dangerous and very complicated to
 use. It almost always leads to downstream code becoming a mess.

 You are mostly just using it for exception propagation. Why not just throw
 exceptions?

 —
 Reply to this email directly or view it on GitHub
 https://github.com/apache/spark/pull/3798#issuecomment-71557370.






[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-01-26 Thread koeninger
Github user koeninger commented on the pull request:

https://github.com/apache/spark/pull/3798#issuecomment-71565526
  
I think as long as offsets are available for advanced users that want them,
relying on checkpointing for the happy path should be ok.  Will probably be
some design doc discussion on that shortly.

On Mon, Jan 26, 2015 at 4:29 PM, Hari Shreedharan notificati...@github.com
wrote:

 @koeninger https://github.com/koeninger - One general question: Since
 each checkpoint has the last offset of each batch - could we not start
 pulling data from Kafka from that offset, rather than having the user
 save the offset and restart from there?

 —
 Reply to this email directly or view it on GitHub
 https://github.com/apache/spark/pull/3798#issuecomment-71552262.






[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-01-22 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/3798#discussion_r23418815
  
--- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DeterministicKafkaInputDStream.scala ---
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import scala.annotation.tailrec
+import scala.reflect.{classTag, ClassTag}
+
+import kafka.common.TopicAndPartition
+import kafka.message.MessageAndMetadata
+import kafka.serializer.Decoder
+
+import org.apache.spark.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.rdd.kafka.{KafkaCluster, KafkaRDD}
+import org.apache.spark.streaming.{StreamingContext, Time}
+import org.apache.spark.streaming.dstream._
+
+/** A stream of {@link org.apache.spark.rdd.kafka.KafkaRDD} where
+  * each given Kafka topic/partition corresponds to an RDD partition.
+  * The spark configuration spark.streaming.receiver.maxRate gives the maximum number of messages
+  * per second that each '''partition''' will accept.
+  * Starting offsets are specified in advance,
+  * and this DStream is not responsible for committing offsets,
+  * so that you can control exactly-once semantics.
+  * For an easy interface to Kafka-managed offsets,
+  *  see {@link org.apache.spark.rdd.kafka.KafkaCluster}
+  * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+  * configuration parameters</a>.
+  *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+  *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+  * @param fromOffsets per-topic/partition Kafka offsets defining the (inclusive)
+  *  starting point of the stream
+  * @param messageHandler function for translating each message into the desired type
+  * @param maxRetries maximum number of times in a row to retry getting leaders' offsets
+  */
+class DeterministicKafkaInputDStream[
+  K: ClassTag,
+  V: ClassTag,
+  U <: Decoder[_]: ClassTag,
+  T <: Decoder[_]: ClassTag,
+  R: ClassTag](
+    @transient ssc_ : StreamingContext,
+    val kafkaParams: Map[String, String],
+    val fromOffsets: Map[TopicAndPartition, Long],
+    messageHandler: MessageAndMetadata[K, V] => R,
+    maxRetries: Int = 1
+) extends InputDStream[R](ssc_) with Logging {
+
+  private val kc = new KafkaCluster(kafkaParams)
+
+  private val maxMessagesPerPartition: Option[Long] = {
+    val ratePerSec = ssc.sparkContext.getConf.getInt("spark.streaming.receiver.maxRate", 0)
+    if (ratePerSec > 0) {
+      val secsPerBatch = ssc.graph.batchDuration.milliseconds.toDouble / 1000
+      Some((secsPerBatch * ratePerSec).toLong)
+    } else {
+      None
+    }
+  }
+
+  // TODO based on the design of InputDStream's lastValidTime, it appears there isn't a
+  // thread safety concern with private mutable state, but is this certain?
+  private var currentOffsets = fromOffsets
+
+  @tailrec
+  private def latestLeaderOffsets(retries: Int): Map[TopicAndPartition, Long] = {
+    val o = kc.getLatestLeaderOffsets(currentOffsets.keys.toSet)
+    // Either.fold would confuse @tailrec, do it manually
+    if (o.isLeft) {
+      val err = o.left.get.toString
+      if (retries <= 0) {
+        throw new Exception(err)
+      } else {
+        log.error(err)
+        Thread.sleep(kc.config.refreshLeaderBackoffMs)
+        latestLeaderOffsets(retries - 1)
+      }
+    } else {
+      o.right.get
+    }
+  }
+
+  private def clamp(leaderOffsets: Map[TopicAndPartition, Long]): Map[TopicAndPartition, Long] = {
+    maxMessagesPerPartition.map { mmp =>
+      leaderOffsets.map { kv =>
+        val (k, v) = kv
+        val curr = currentOffsets(k)
+        val diff = v - curr
+        if (diff > mmp) (k, curr + mmp

[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-01-22 Thread koeninger
Github user koeninger commented on the pull request:

https://github.com/apache/spark/pull/3798#issuecomment-71122072
  
I need to know, perhaps even at the driver, what the ending offset is in
order to be able to commit it.

I also have several use cases where I want to end a batch at a specific
point which may or may not be now.
On Jan 22, 2015 5:33 PM, Hari Shreedharan notificati...@github.com
wrote:

 OK.

 Just a thought: Do you think there might be a way to avoid the spikes?
 Once the current RDD is checkpointed, create a new pending RDD, which
 continuously receives data, until the compute method is called. When
 compute gets called, the last offset we received can be considered to be
 the upper bound, and the data is now available for transformations. That
 way, we could spread out network transfers from Kafka over a larger 
period.

 Not sure if there are holes in that algorithm, but it looks almost
 equivalent to the current model, no?

 —
 Reply to this email directly or view it on GitHub
 https://github.com/apache/spark/pull/3798#issuecomment-71121466.






[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-01-22 Thread koeninger
Github user koeninger commented on the pull request:

https://github.com/apache/spark/pull/3798#issuecomment-71120030
  
Yeah, it's pulled down every batch interval.  That way you know exactly
what the upper and lower bounds of the offsets are.

On Thu, Jan 22, 2015 at 5:15 PM, Hari Shreedharan notificati...@github.com
wrote:

 I like this! I didn't try building it, but the logic is great!

 So, to sum up the idea - the key detail here is that the checkpoint
 contains the metadata to regenerate the RDDs, thus original order and
 batches are recovered. That looks good - it was the same thing I was 
trying
 to see if we could do in the Kafka receiver, but it would be difficult
 without some API changes.

 That brings me to a question - so in this PR, is the data pulled down from
 Kafka only once every batch interval - say every 2 seconds, or is there a
 way to generate it continuously rather than have spikes?

 —
 Reply to this email directly or view it on GitHub
 https://github.com/apache/spark/pull/3798#issuecomment-71119114.






[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-01-22 Thread koeninger
Github user koeninger commented on the pull request:

https://github.com/apache/spark/pull/3798#issuecomment-71131380
  
Point is, it's up to client code to commit, so that it can implement
exactly-once semantics if necessary.  Committing automatically at the end
of compute would get you something like at-most-once semantics.
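
In code terms it's just a question of where the commit happens relative to
the output.  A sketch (commitOffset and sendSomewhere are placeholders for
the client's own offset store and output; the import path reflects the
packaging still under discussion):

import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

def outputThenCommit[T](
    stream: DStream[T],
    sendSomewhere: T => Unit,
    commitOffset: OffsetRange => Unit): Unit = {
  stream.foreachRDD { rdd =>
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    // 1. produce this batch's side effects
    rdd.foreachPartition(iter => iter.foreach(sendSomewhere))
    // 2. only then record the untilOffsets at the driver.  Failing between 1 and 2
    //    means the batch gets replayed (at-least-once).  Committing before step 1,
    //    which is what an automatic commit at the end of compute amounts to, risks
    //    dropping the batch instead (at-most-once).
    offsetRanges.foreach(commitOffset)
  }
}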

Regarding the spikiness, in practice, I think you're just going to end up
doing what you do with any other streaming job, namely tuning the batch
size down until it's just comfortably above the processing time.

On Thu, Jan 22, 2015 at 6:02 PM, Hari Shreedharan notificati...@github.com
wrote:

 Say we start pulling down info in real time, wouldn't it be possible to
 say get me only n messages -- that should take care of the second point.

 I am not sure how the ending offset part is a problem. Wouldn't it make
 sense to do the commits at the end of the compute call? (You don't 
actually
 commit the offsets to Kafka anyway, correct?)

 —
 Reply to this email directly or view it on GitHub
 https://github.com/apache/spark/pull/3798#issuecomment-71124795.






[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-01-13 Thread koeninger
Github user koeninger commented on the pull request:

https://github.com/apache/spark/pull/3798#issuecomment-69754083
  
1.  Yes, I removed ivy and maven cache, verified the example app failed to
locate the dependency, re-published from the spark dev version, verified
the example app now found it

2.  Yes, I've tried spark-streaming both provided and included in the
assembly.  The real issue is probably that spark-streaming-kafka can't be
marked as provided and must be included in the assembly (more on this in a
second; see the build sketch after this list).

3. Yes, it's spark-submit to an instance of spark running out of the same
spark dev version.
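
For reference, the dependency setup I'm testing with is roughly this (a
build.sbt sketch, assuming sbt-assembly; the version numbers are
placeholders for the locally published dev build):

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"            % "1.3.0-SNAPSHOT" % "provided",
  "org.apache.spark" %% "spark-streaming"       % "1.3.0-SNAPSHOT" % "provided",
  // spark-streaming-kafka is not on the cluster's classpath, so it can't be "provided";
  // it has to be bundled into the application assembly jar
  "org.apache.spark" %% "spark-streaming-kafka" % "1.3.0-SNAPSHOT"
)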

So like I was saying about #2, the class that is failing to load,
 KafkaRDDPartition, is in the spark-streaming-kafka jar, not spark or
spark-streaming, so it's not available by default.  It clearly will end up
on the classpath when it's included in an application jar, because the
committed working version of the code that checkpoints tuples can
successfully convert to KafkaRDDPartition in restore().  It's just not
available in the classloader that's reading the checkpoint.  Further
evidence of this is that if I move only KafkaRDDPartition into the
spark-streaming artifact, KafkaRDDPartition can be successfully read from
the checkpoint.

KafkaRDDPartition doesn't actually have any dependencies on anything other
than Partition, so moving it into spark-streaming might be a solution...
your call on whether you think that's uglier than saving to / from tuples,
or if you want to dig further into the classloader issue.
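
For reference, KafkaRDDPartition is only something like this (a sketch
matching how it's constructed elsewhere in this thread), which is why moving
it wouldn't drag anything else along:

import org.apache.spark.Partition

// a simple serializable holder: one Kafka topic/partition, its offset range,
// and a preferred leader host/port
class KafkaRDDPartition(
    val index: Int,
    val topic: String,
    val partition: Int,
    val fromOffset: Long,
    val untilOffset: Long,
    val host: String,
    val port: Int
  ) extends Partition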

On Mon, Jan 12, 2015 at 10:53 PM, Tathagata Das notificati...@github.com
wrote:

 Can you confirm the following.
 1. In your SBT/maven app used for testing, you are using your development
 Spark version to compile? That is, the dev version is locally publish and
 you are compiling your app against spark version 1.3.0-SNAPSHOT?
 2. Do you have spark-streaming dependency as provided scope or the
 default compile scope? And then are you creating uber jar of the app?
 3. Are you submitting the app through spark-submit to the same development
 Spark version to compile?

 On Mon, Jan 12, 2015 at 2:13 PM, Cody Koeninger notificati...@github.com

 wrote:

  Yeah, this is on a local development version, after assembly / publish
  local.
 
  Here's a gist of the exception and the diff that causes it (using
  KafkaRDDPartition instead of a tuple)
 
  https://gist.github.com/koeninger/561a61482cd1b5b3600c
 
  —
  Reply to this email directly or view it on GitHub
  https://github.com/apache/spark/pull/3798#issuecomment-69656800.
 

 —
 Reply to this email directly or view it on GitHub
 https://github.com/apache/spark/pull/3798#issuecomment-69695353.






[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-01-12 Thread koeninger
Github user koeninger commented on the pull request:

https://github.com/apache/spark/pull/3798#issuecomment-69651872
  
The classloader issue was when reading from the checkpoint.

If we want to rely on subclassing, some of the implementation (e.g.
currentOffsets and latestLeaderOffsets) should probably be made protected.





[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-01-12 Thread koeninger
Github user koeninger commented on the pull request:

https://github.com/apache/spark/pull/3798#issuecomment-69656800
  
Yeah, this is on a local development version, after assembly / publish
local.

Here's a gist of the exception and the diff that causes it (using
KafkaRDDPartition instead of a tuple)

https://gist.github.com/koeninger/561a61482cd1b5b3600c





[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-01-09 Thread koeninger
Github user koeninger commented on the pull request:

https://github.com/apache/spark/pull/3798#issuecomment-69446001
  
I went ahead and implemented locality and checkpointing of generated rdds.
Couple of points

- still depends on SPARK-4014 eventually being merged, for efficiency's
sake.

- I ran into classloader / class not found issues trying to checkpoint
KafkaRDDPartition directly.  Current solution is to transform them to/from
tuples, ugly but it works.  If you know what the issue is there, let me
know.

- I've got a use case that requires overriding the compute method on the
DStream (basically, modifying offsets to a fixed delay rather than now).
I'm assuming you'd prefer a user supplied function to do the transformation
rather than subclassing, but let me know.

On Mon, Jan 5, 2015 at 7:59 PM, Tathagata Das notificati...@github.com
wrote:

 Great! Keep me posted.

 —
 Reply to this email directly or view it on GitHub
 https://github.com/apache/spark/pull/3798#issuecomment-68815205.






[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-01-08 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/3798#discussion_r22653565
  
--- Diff: 
external/kafka/src/main/scala/org/apache/spark/rdd/kafka/KafkaCluster.scala ---
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd.kafka
+
+import scala.util.control.NonFatal
+import scala.collection.mutable.ArrayBuffer
+import java.util.Properties
+import kafka.api._
+import kafka.common.{ErrorMapping, OffsetMetadataAndError, 
TopicAndPartition}
+import kafka.consumer.{ConsumerConfig, SimpleConsumer}
+
+/**
+  * Convenience methods for interacting with a Kafka cluster.
+  * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+  * configuration parameters</a>.
+  *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+  *   NOT zookeeper servers, specified in host1:port1,host2:port2 form
+  */
+class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
+  import KafkaCluster.Err
+
+  val seedBrokers: Array[(String, Int)] =
+    kafkaParams.get("metadata.broker.list")
+      .orElse(kafkaParams.get("bootstrap.servers"))
+      .getOrElse(throw new Exception("Must specify metadata.broker.list or bootstrap.servers"))
+      .split(",").map { hp =>
+        val hpa = hp.split(":")
+        (hpa(0), hpa(1).toInt)
+      }
+
+  // ConsumerConfig isn't serializable
+  @transient private var _config: ConsumerConfig = null
+
+  def config: ConsumerConfig = this.synchronized {
+    if (_config == null) {
+      _config = KafkaCluster.consumerConfig(kafkaParams)
+    }
+    _config
+  }
+
+  def connect(host: String, port: Int): SimpleConsumer =
+    new SimpleConsumer(host, port, config.socketTimeoutMs,
+      config.socketReceiveBufferBytes, config.clientId)
+
+  def connect(hostAndPort: (String, Int)): SimpleConsumer =
+    connect(hostAndPort._1, hostAndPort._2)
+
+  def connectLeader(topic: String, partition: Int): Either[Err, SimpleConsumer] =
+    findLeader(topic, partition).right.map(connect)
+
+  def findLeader(topic: String, partition: Int): Either[Err, (String, Int)] = {
+    val req = TopicMetadataRequest(TopicMetadataRequest.CurrentVersion,
+      0, config.clientId, Seq(topic))
+    val errs = new Err
+    withBrokers(seedBrokers, errs) { consumer =>
+      val resp: TopicMetadataResponse = consumer.send(req)
+      resp.topicsMetadata.find(_.topic == topic).flatMap { t =>
+        t.partitionsMetadata.find(_.partitionId == partition)
+      }.foreach { partitionMeta =>
+        partitionMeta.leader.foreach { leader =>
+          return Right((leader.host, leader.port))
+        }
+      }
+    }
+    Left(errs)
+  }
+
+  def findLeaders(
+    topicAndPartitions: Set[TopicAndPartition]
+  ): Either[Err, Map[TopicAndPartition, (String, Int)]] = {
+    getPartitionMetadata(topicAndPartitions.map(_.topic)).right.flatMap { tms =>
+      val result = tms.flatMap { tm: TopicMetadata =>
+        tm.partitionsMetadata.flatMap { pm =>
+          val tp = TopicAndPartition(tm.topic, pm.partitionId)
+          if (topicAndPartitions(tp)) {
+            pm.leader.map { l =>
+              tp -> (l.host -> l.port)
+            }
+          } else {
+            None
+          }
+        }
+      }.toMap
+      if (result.keys.size == topicAndPartitions.size) {
+        Right(result)
+      } else {
+        val missing = topicAndPartitions.diff(result.keys.toSet)
+        val err = new Err
+        err.append(new Exception(s"Couldn't find leaders for ${missing}"))
+        Left(err)
+      }
+    }
+  }
+
+  def getPartitions(topics: Set[String]): Either[Err, Set[TopicAndPartition]] =
+    getPartitionMetadata(topics).right.map { r =>
+      r.flatMap { tm

[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-01-08 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/3798#discussion_r22653442
  
--- Diff: 
external/kafka/src/main/scala/org/apache/spark/rdd/kafka/KafkaCluster.scala ---
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd.kafka
+
+import scala.util.control.NonFatal
+import scala.collection.mutable.ArrayBuffer
+import java.util.Properties
+import kafka.api._
+import kafka.common.{ErrorMapping, OffsetMetadataAndError, 
TopicAndPartition}
+import kafka.consumer.{ConsumerConfig, SimpleConsumer}
+
+/**
+  * Convenience methods for interacting with a Kafka cluster.
+  * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+  * configuration parameters</a>.
+  *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+  *   NOT zookeeper servers, specified in host1:port1,host2:port2 form
+  */
+class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
+  import KafkaCluster.Err
+
+  val seedBrokers: Array[(String, Int)] =
+    kafkaParams.get("metadata.broker.list")
+      .orElse(kafkaParams.get("bootstrap.servers"))
+      .getOrElse(throw new Exception("Must specify metadata.broker.list or bootstrap.servers"))
+      .split(",").map { hp =>
+        val hpa = hp.split(":")
+        (hpa(0), hpa(1).toInt)
+      }
+
+  // ConsumerConfig isn't serializable
+  @transient private var _config: ConsumerConfig = null
+
+  def config: ConsumerConfig = this.synchronized {
+    if (_config == null) {
+      _config = KafkaCluster.consumerConfig(kafkaParams)
+    }
+    _config
+  }
+
+  def connect(host: String, port: Int): SimpleConsumer =
+    new SimpleConsumer(host, port, config.socketTimeoutMs,
+      config.socketReceiveBufferBytes, config.clientId)
+
+  def connect(hostAndPort: (String, Int)): SimpleConsumer =
+    connect(hostAndPort._1, hostAndPort._2)
+
+  def connectLeader(topic: String, partition: Int): Either[Err, SimpleConsumer] =
+    findLeader(topic, partition).right.map(connect)
+
+  def findLeader(topic: String, partition: Int): Either[Err, (String, Int)] = {
+    val req = TopicMetadataRequest(TopicMetadataRequest.CurrentVersion,
+      0, config.clientId, Seq(topic))
+    val errs = new Err
+    withBrokers(seedBrokers, errs) { consumer =>
+      val resp: TopicMetadataResponse = consumer.send(req)
+      resp.topicsMetadata.find(_.topic == topic).flatMap { t =>
+        t.partitionsMetadata.find(_.partitionId == partition)
+      }.foreach { partitionMeta =>
+        partitionMeta.leader.foreach { leader =>
+          return Right((leader.host, leader.port))
+        }
+      }
+    }
+    Left(errs)
+  }
+
+  def findLeaders(
+    topicAndPartitions: Set[TopicAndPartition]
+  ): Either[Err, Map[TopicAndPartition, (String, Int)]] = {
+    getPartitionMetadata(topicAndPartitions.map(_.topic)).right.flatMap { tms =>
+      val result = tms.flatMap { tm: TopicMetadata =>
+        tm.partitionsMetadata.flatMap { pm =>
+          val tp = TopicAndPartition(tm.topic, pm.partitionId)
+          if (topicAndPartitions(tp)) {
+            pm.leader.map { l =>
+              tp -> (l.host -> l.port)
+            }
+          } else {
+            None
+          }
+        }
+      }.toMap
+      if (result.keys.size == topicAndPartitions.size) {
+        Right(result)
+      } else {
+        val missing = topicAndPartitions.diff(result.keys.toSet)
+        val err = new Err
+        err.append(new Exception(s"Couldn't find leaders for ${missing}"))
+        Left(err)
+      }
+    }
+  }
+
+  def getPartitions(topics: Set[String]): Either[Err, Set[TopicAndPartition]] =
+    getPartitionMetadata(topics).right.map { r =>
+      r.flatMap { tm

[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-01-08 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/3798#discussion_r22653905
  
--- Diff: 
external/kafka/src/main/scala/org/apache/spark/rdd/kafka/KafkaRDD.scala ---
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd.kafka
+
+import scala.reflect.{classTag, ClassTag}
+
+import org.apache.spark.{Logging, Partition, SparkContext, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.util.NextIterator
+
+import java.util.Properties
+import kafka.api.FetchRequestBuilder
+import kafka.common.{ErrorMapping, TopicAndPartition}
+import kafka.consumer.{ConsumerConfig, SimpleConsumer}
+import kafka.message.{MessageAndMetadata, MessageAndOffset}
+import kafka.serializer.Decoder
+import kafka.utils.VerifiableProperties
+
+case class KafkaRDDPartition(
+  override val index: Int,
+  topic: String,
+  partition: Int,
+  fromOffset: Long,
+  untilOffset: Long
+) extends Partition
+
+/** A batch-oriented interface for consuming from Kafka.
+  * Each given Kafka topic/partition corresponds to an RDD partition.
+  * Starting and ending offsets are specified in advance,
+  * so that you can control exactly-once semantics.
+  * For an easy interface to Kafka-managed offsets,
+  *  see {@link org.apache.spark.rdd.kafka.KafkaCluster}
+  * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+  * configuration parameters</a>.
+  *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+  *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+  * @param fromOffsets per-topic/partition Kafka offsets defining the (inclusive)
+  *  starting point of the batch
+  * @param untilOffsets per-topic/partition Kafka offsets defining the (exclusive)
+  *  ending point of the batch
+  * @param messageHandler function for translating each message into the desired type
+  */
+class KafkaRDD[
+  K: ClassTag,
+  V: ClassTag,
+  U <: Decoder[_]: ClassTag,
+  T <: Decoder[_]: ClassTag,
+  R: ClassTag](
+    sc: SparkContext,
+    val kafkaParams: Map[String, String],
+    val fromOffsets: Map[TopicAndPartition, Long],
+    val untilOffsets: Map[TopicAndPartition, Long],
+    messageHandler: MessageAndMetadata[K, V] => R
+  ) extends RDD[R](sc, Nil) with Logging {
+
+  assert(fromOffsets.keys == untilOffsets.keys,
+    "Must provide both from and until offsets for each topic/partition")
+
+  override def getPartitions: Array[Partition] = fromOffsets.zipWithIndex.map { kvi =>
+    val ((tp, from), index) = kvi
+    new KafkaRDDPartition(index, tp.topic, tp.partition, from, untilOffsets(tp))
+  }.toArray
+
+  override def compute(thePart: Partition, context: TaskContext) = {
+    val part = thePart.asInstanceOf[KafkaRDDPartition]
+    if (part.fromOffset >= part.untilOffset) {
+      log.warn("Beginning offset is same or after ending offset " +
+        s"skipping ${part.topic} ${part.partition}")
+      Iterator.empty
+    } else {
+      new NextIterator[R] {
+        context.addTaskCompletionListener{ context => closeIfNeeded() }
+
+        val kc = new KafkaCluster(kafkaParams)
+        log.info(s"Computing topic ${part.topic}, partition ${part.partition} " +
+          s"offsets ${part.fromOffset} -> ${part.untilOffset}")
+        val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
+          .newInstance(kc.config.props)
+          .asInstanceOf[Decoder[K]]
+        val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
+          .newInstance(kc.config.props)
+          .asInstanceOf[Decoder[V]]
+        val consumer: SimpleConsumer = kc.connectLeader(part.topic, part.partition).fold(
+          errs => throw new Exception

[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-01-05 Thread koeninger
Github user koeninger commented on the pull request:

https://github.com/apache/spark/pull/3798#issuecomment-68804547
  
I'm hopeful that SPARK-4014 will be finalized soon, waiting on that before
doing the refactor for preferred locations.  That will involve changing the
partition fields to add leader host and port, and not looking up leaders on
the executors until after a failure.

After that, I can take your suggestion and emulate the approach taken by
FileInputDStream to restore generatedRDDs from a map of Time ->
Array[KafkaRDDPartition].
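
As a rough illustration (not the PR's restore code), rebuilding one checkpointed
batch would only need the KafkaRDD constructor quoted elsewhere in this thread;
sc, kafkaParams, messageHandler, and the K/V/U/T/R type parameters are assumed
to be in scope:

    import kafka.common.TopicAndPartition

    // Illustrative only: turn the checkpointed partitions for one batch back into
    // the from/until offset maps that the KafkaRDD constructor expects.
    def rddForBatch(parts: Array[KafkaRDDPartition]): KafkaRDD[K, V, U, T, R] = {
      val from  = parts.map(p => TopicAndPartition(p.topic, p.partition) -> p.fromOffset).toMap
      val until = parts.map(p => TopicAndPartition(p.topic, p.partition) -> p.untilOffset).toMap
      new KafkaRDD[K, V, U, T, R](sc, kafkaParams, from, until, messageHandler)
    }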


On Mon, Jan 5, 2015 at 5:43 PM, Tathagata Das notificati...@github.com
wrote:

 Any thoughts on my comments on driver fault-tolerance?

 —
 Reply to this email directly or view it on GitHub
 https://github.com/apache/spark/pull/3798#issuecomment-68802901.






[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-01-05 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/3798#discussion_r22498572
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -788,6 +788,20 @@ abstract class RDD[T: ClassTag](
   }
 
   /**
+   * Applies a function to each partition of this RDD, while tracking the index
+   * of the original partition.
+   */
+  def foreachPartitionWithIndex(
--- End diff --

Are you saying that you would prefer for users of this class to have to use 
mapPartitionsWithIndex with a side-effect of storing data, and then add an 
empty foreach in order to trigger the job to actually be scheduled?
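
To spell out the workaround being described, versus the proposed method
(saveWithOffsets is a placeholder for the user's own storage code, not anything
from this PR):

    // Workaround: side-effect inside mapPartitionsWithIndex, then force the job
    // to run with an empty foreach.
    rdd.mapPartitionsWithIndex { (i, iter) =>
      saveWithOffsets(i, iter)   // placeholder for the user's idempotent/transactional store
      Iterator.empty
    }.foreach(_ => ())

    // Proposed method: the same thing, stated directly.
    rdd.foreachPartitionWithIndex { (i, iter) => saveWithOffsets(i, iter) }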





[GitHub] spark pull request: [SPARK-4229] Create hadoop configuration in a ...

2015-01-04 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/3543#discussion_r22446683
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
 ---
@@ -789,7 +790,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, 
V)])(
   keyClass: Class[_],
   valueClass: Class[_],
  outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
-  conf: Configuration = new Configuration) {
--- End diff --

Based on what Marcelo Vanzin said on the dev list when I brought this issue
up, the only reason the problem was still around for me to run into is
because he changed some of the uses of new Configuration but not all of
them.

I agree it's used in a lot of different places, but I'm not sure how
piecemeal fixes to only some of the places is helpful to users. Were there
still specific concerns about particular classes?

On Sun, Jan 4, 2015 at 6:28 AM, Tathagata Das notificati...@github.com
wrote:

 In
 
streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
 https://github.com/apache/spark/pull/3543#discussion-diff-22438855:

  @@ -789,7 +790,7 @@ class JavaPairDStream[K, V](val dstream: 
DStream[(K, V)])(
 keyClass: Class[_],
 valueClass: Class[_],
 outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
  -  conf: Configuration = new Configuration) {

 The scope of this PR is pretty wide in terms of the number of classes it
 touches, causing issues as different places needs to be handled
 differently. If you considered moving this sort of changes (new
 Configuration to sparkContext.hadoopConfiguration) into a different PR
 that might be easier to get in.

 —
 Reply to this email directly or view it on GitHub
 https://github.com/apache/spark/pull/3543/files#r22438855.






[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-01-04 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/3798#discussion_r22446804
  
--- Diff: 
external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DeterministicKafkaInputDStream.scala
 ---
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import scala.annotation.tailrec
+import scala.reflect.{classTag, ClassTag}
+
+import kafka.common.TopicAndPartition
+import kafka.message.MessageAndMetadata
+import kafka.serializer.Decoder
+
+import org.apache.spark.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.rdd.kafka.{KafkaCluster, KafkaRDD}
+import org.apache.spark.streaming.{StreamingContext, Time}
+import org.apache.spark.streaming.dstream._
+
+/** A stream of {@link org.apache.spark.rdd.kafka.KafkaRDD} where
+  * each given Kafka topic/partition corresponds to an RDD partition.
+  * The spark configuration spark.streaming.receiver.maxRate gives the maximum number of messages
+  * per second that each '''partition''' will accept.
+  * Starting offsets are specified in advance,
+  * and this DStream is not responsible for committing offsets,
+  * so that you can control exactly-once semantics.
+  * For an easy interface to Kafka-managed offsets,
+  *  see {@link org.apache.spark.rdd.kafka.KafkaCluster}
+  * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+  * configuration parameters</a>.
+  *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+  *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+  * @param fromOffsets per-topic/partition Kafka offsets defining the (inclusive)
+  *  starting point of the stream
+  * @param messageHandler function for translating each message into the desired type
+  * @param maxRetries maximum number of times in a row to retry getting leaders' offsets
+  */
+class DeterministicKafkaInputDStream[
+  K: ClassTag,
+  V: ClassTag,
+  U <: Decoder[_]: ClassTag,
+  T <: Decoder[_]: ClassTag,
+  R: ClassTag](
+    @transient ssc_ : StreamingContext,
+    val kafkaParams: Map[String, String],
+    val fromOffsets: Map[TopicAndPartition, Long],
+    messageHandler: MessageAndMetadata[K, V] => R,
+    maxRetries: Int = 1
+) extends InputDStream[R](ssc_) with Logging {
+
+  private val kc = new KafkaCluster(kafkaParams)
+
+  private val maxMessagesPerPartition: Option[Long] = {
+    val ratePerSec = ssc.sparkContext.getConf.getInt("spark.streaming.receiver.maxRate", 0)
--- End diff --

I'm not sure specifically what you mean by window operations that require
past data which needs to be pulled from Kafka every time. The current
KafkaRDD code has a log every time compute() is called on the rdd to pull
data from kafka, and for a job with a window operation, I only see compute
called once for a given offset range, not repeatedly every time.

Regarding the bigger question of how this approach stacks up to the two
existing approaches... they're all different approaches with different
tradeoffs, I don't think one has to win.  I'd still have a use for the
original receiver based class (not the WAL one), especially if SPARK-3146
or SPARK-4960 ever get merged.

On Sat, Jan 3, 2015 at 8:57 PM, Tathagata Das notificati...@github.com
wrote:

 In
 
external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DeterministicKafkaInputDStream.scala
 https://github.com/apache/spark/pull/3798#discussion-diff-22436219:

  +  K: ClassTag,
  +  V: ClassTag,
  +  U <: Decoder[_]: ClassTag,
  +  T <: Decoder[_]: ClassTag,
  +  R: ClassTag](
  +    @transient ssc_ : StreamingContext,
  +    val kafkaParams: Map[String, String],
  +    val fromOffsets: Map[TopicAndPartition, Long],
  +    messageHandler: MessageAndMetadata[K, V] => R

[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2014-12-30 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/3798#discussion_r22362375
  
--- Diff: 
external/kafka/src/main/scala/org/apache/spark/rdd/kafka/KafkaRDD.scala ---
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd.kafka
+
+import scala.reflect.{classTag, ClassTag}
+
+import org.apache.spark.{Logging, Partition, SparkContext, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.util.NextIterator
+
+import java.util.Properties
+import kafka.api.FetchRequestBuilder
+import kafka.common.{ErrorMapping, TopicAndPartition}
+import kafka.consumer.{ConsumerConfig, SimpleConsumer}
+import kafka.message.{MessageAndMetadata, MessageAndOffset}
+import kafka.serializer.Decoder
+import kafka.utils.VerifiableProperties
+
+case class KafkaRDDPartition(
+  override val index: Int,
+  topic: String,
+  partition: Int,
+  fromOffset: Long,
+  untilOffset: Long
+) extends Partition
+
+/** A batch-oriented interface for consuming from Kafka.
+  * Each given Kafka topic/partition corresponds to an RDD partition.
+  * Starting and ending offsets are specified in advance,
+  * so that you can control exactly-once semantics.
+  * For an easy interface to Kafka-managed offsets,
+  *  see {@link org.apache.spark.rdd.kafka.KafkaCluster}
+  * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+  * configuration parameters</a>.
+  *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+  *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+  * @param fromOffsets per-topic/partition Kafka offsets defining the (inclusive)
+  *  starting point of the batch
+  * @param untilOffsets per-topic/partition Kafka offsets defining the (exclusive)
+  *  ending point of the batch
+  * @param messageHandler function for translating each message into the desired type
+  */
+class KafkaRDD[
+  K: ClassTag,
+  V: ClassTag,
+  U <: Decoder[_]: ClassTag,
+  T <: Decoder[_]: ClassTag,
+  R: ClassTag](
+    sc: SparkContext,
+    val kafkaParams: Map[String, String],
+    val fromOffsets: Map[TopicAndPartition, Long],
+    val untilOffsets: Map[TopicAndPartition, Long],
+    messageHandler: MessageAndMetadata[K, V] => R
+  ) extends RDD[R](sc, Nil) with Logging {
+
+  assert(fromOffsets.keys == untilOffsets.keys,
+    "Must provide both from and until offsets for each topic/partition")
+
+  override def getPartitions: Array[Partition] = fromOffsets.zipWithIndex.map { kvi =>
+    val ((tp, from), index) = kvi
+    new KafkaRDDPartition(index, tp.topic, tp.partition, from, untilOffsets(tp))
+  }.toArray
+
+  override def compute(thePart: Partition, context: TaskContext) = {
+    val part = thePart.asInstanceOf[KafkaRDDPartition]
+    if (part.fromOffset >= part.untilOffset) {
+      log.warn("Beginning offset is same or after ending offset " +
+        s"skipping ${part.topic} ${part.partition}")
+      Iterator.empty
+    } else {
+      new NextIterator[R] {
+        context.addTaskCompletionListener{ context => closeIfNeeded() }
+
+        val kc = new KafkaCluster(kafkaParams)
+        log.info(s"Computing topic ${part.topic}, partition ${part.partition} " +
+          s"offsets ${part.fromOffset} -> ${part.untilOffset}")
+        val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
+          .newInstance(kc.config.props)
+          .asInstanceOf[Decoder[K]]
+        val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
+          .newInstance(kc.config.props)
+          .asInstanceOf[Decoder[V]]
+        val consumer: SimpleConsumer = kc.connectLeader(part.topic, part.partition).fold(
+          errs => throw new Exception

[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2014-12-30 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/3798#discussion_r22364279
  
--- Diff: 
external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DeterministicKafkaInputDStream.scala
 ---
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import scala.annotation.tailrec
+import scala.reflect.{classTag, ClassTag}
+
+import kafka.common.TopicAndPartition
+import kafka.message.MessageAndMetadata
+import kafka.serializer.Decoder
+
+import org.apache.spark.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.rdd.kafka.{KafkaCluster, KafkaRDD}
+import org.apache.spark.streaming.{StreamingContext, Time}
+import org.apache.spark.streaming.dstream._
+
+/** A stream of {@link org.apache.spark.rdd.kafka.KafkaRDD} where
+  * each given Kafka topic/partition corresponds to an RDD partition.
+  * The spark configuration spark.streaming.receiver.maxRate gives the maximum number of messages
+  * per second that each '''partition''' will accept.
+  * Starting offsets are specified in advance,
+  * and this DStream is not responsible for committing offsets,
+  * so that you can control exactly-once semantics.
+  * For an easy interface to Kafka-managed offsets,
+  *  see {@link org.apache.spark.rdd.kafka.KafkaCluster}
+  * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+  * configuration parameters</a>.
+  *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+  *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+  * @param fromOffsets per-topic/partition Kafka offsets defining the (inclusive)
+  *  starting point of the stream
+  * @param messageHandler function for translating each message into the desired type
+  * @param maxRetries maximum number of times in a row to retry getting leaders' offsets
+  */
+class DeterministicKafkaInputDStream[
+  K: ClassTag,
+  V: ClassTag,
+  U <: Decoder[_]: ClassTag,
+  T <: Decoder[_]: ClassTag,
+  R: ClassTag](
+    @transient ssc_ : StreamingContext,
+    val kafkaParams: Map[String, String],
+    val fromOffsets: Map[TopicAndPartition, Long],
+    messageHandler: MessageAndMetadata[K, V] => R,
+    maxRetries: Int = 1
+) extends InputDStream[R](ssc_) with Logging {
+
+  private val kc = new KafkaCluster(kafkaParams)
+
+  private val maxMessagesPerPartition: Option[Long] = {
+    val ratePerSec = ssc.sparkContext.getConf.getInt("spark.streaming.receiver.maxRate", 0)
--- End diff --

You can have multiple receivers per topic, and the closest receiver-based
analogue to my approach would be 1 receiver per partition, which is why I set it
up that way.  The semantics are documented in the scaladoc for the class.

If we want people to be able to configure a very granular 
per-topic-per-partition maximum per batch we can, but it should probably be 
done as an (optional) argument rather than a configuration property.
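
For concreteness, a worked example of the clamping math in the diff above, with
made-up numbers:

    // Made-up numbers: spark.streaming.receiver.maxRate = 1000 messages/sec per
    // partition, 2 second batches.
    val ratePerSec = 1000
    val secsPerBatch = 2000.toDouble / 1000                            // 2.0
    val maxMessagesPerPartition = (secsPerBatch * ratePerSec).toLong   // 2000 per partition per batch
    // clamp(): a partition whose leader is 10000 messages ahead gets cut back to
    // currentOffset + 2000 for this batch; the rest is picked up in later batches.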







[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2014-12-30 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/3798#discussion_r22364358
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -788,6 +788,20 @@ abstract class RDD[T: ClassTag](
   }
 
   /**
+   * Applies a function to each partition of this RDD, while tracking the index
+   * of the original partition.
+   */
+  def foreachPartitionWithIndex(
--- End diff --

The code for actually using the rdd and committing offsets transactionally 
is quite awkward without that method, see


https://github.com/koeninger/kafka-exactly-once/commit/cb812c918f724b3ae5e57c66618276b6947ed30d
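
In outline, the pattern in that commit looks something like the following; db,
saveResults, updateOffset, and partitions are placeholders standing in for the
example's own storage code, not Spark or Kafka APIs:

    rdd.foreachPartitionWithIndex { (i, iter) =>
      val part = partitions(i)   // placeholder: the offset range for this partition, captured on the driver
      db.withTransaction { tx => // placeholder transaction API
        saveResults(tx, iter)                                            // write the data...
        updateOffset(tx, part.topic, part.partition, part.untilOffset)   // ...and its ending offset, atomically
      }
    }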







[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2014-12-30 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/3798#discussion_r22366068
  
--- Diff: 
external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DeterministicKafkaInputDStream.scala
 ---
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import scala.annotation.tailrec
+import scala.reflect.{classTag, ClassTag}
+
+import kafka.common.TopicAndPartition
+import kafka.message.MessageAndMetadata
+import kafka.serializer.Decoder
+
+import org.apache.spark.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.rdd.kafka.{KafkaCluster, KafkaRDD}
+import org.apache.spark.streaming.{StreamingContext, Time}
+import org.apache.spark.streaming.dstream._
+
+/** A stream of {@link org.apache.spark.rdd.kafka.KafkaRDD} where
+  * each given Kafka topic/partition corresponds to an RDD partition.
+  * The spark configuration spark.streaming.receiver.maxRate gives the maximum number of messages
+  * per second that each '''partition''' will accept.
+  * Starting offsets are specified in advance,
+  * and this DStream is not responsible for committing offsets,
+  * so that you can control exactly-once semantics.
+  * For an easy interface to Kafka-managed offsets,
+  *  see {@link org.apache.spark.rdd.kafka.KafkaCluster}
+  * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+  * configuration parameters</a>.
+  *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+  *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+  * @param fromOffsets per-topic/partition Kafka offsets defining the (inclusive)
+  *  starting point of the stream
+  * @param messageHandler function for translating each message into the desired type
+  * @param maxRetries maximum number of times in a row to retry getting leaders' offsets
+  */
+class DeterministicKafkaInputDStream[
+  K: ClassTag,
+  V: ClassTag,
+  U <: Decoder[_]: ClassTag,
+  T <: Decoder[_]: ClassTag,
+  R: ClassTag](
+    @transient ssc_ : StreamingContext,
+    val kafkaParams: Map[String, String],
+    val fromOffsets: Map[TopicAndPartition, Long],
+    messageHandler: MessageAndMetadata[K, V] => R,
+    maxRetries: Int = 1
+) extends InputDStream[R](ssc_) with Logging {
+
+  private val kc = new KafkaCluster(kafkaParams)
+
+  private val maxMessagesPerPartition: Option[Long] = {
+    val ratePerSec = ssc.sparkContext.getConf.getInt("spark.streaming.receiver.maxRate", 0)
+    if (ratePerSec > 0) {
+      val secsPerBatch = ssc.graph.batchDuration.milliseconds.toDouble / 1000
+      Some((secsPerBatch * ratePerSec).toLong)
+    } else {
+      None
+    }
+  }
+
+  // TODO based on the design of InputDStream's lastValidTime, it appears there isn't a
+  // thread safety concern with private mutable state, but is this certain?
+  private var currentOffsets = fromOffsets
+
+  @tailrec
+  private def latestLeaderOffsets(retries: Int): Map[TopicAndPartition, Long] = {
+    val o = kc.getLatestLeaderOffsets(currentOffsets.keys.toSet)
+    // Either.fold would confuse @tailrec, do it manually
+    if (o.isLeft) {
+      val err = o.left.get.toString
+      if (retries <= 0) {
+        throw new Exception(err)
+      } else {
+        log.error(err)
+        Thread.sleep(kc.config.refreshLeaderBackoffMs)
+        latestLeaderOffsets(retries - 1)
+      }
+    } else {
+      o.right.get
+    }
+  }
+
+  private def clamp(leaderOffsets: Map[TopicAndPartition, Long]): Map[TopicAndPartition, Long] = {
+    maxMessagesPerPartition.map { mmp =>
+      leaderOffsets.map { kv =>
+        val (k, v) = kv
+        val curr = currentOffsets(k)
+        val diff = v - curr
+        if (diff > mmp) (k, curr + mmp

[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2014-12-30 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/3798#discussion_r22366140
  
--- Diff: 
external/kafka/src/main/scala/org/apache/spark/rdd/kafka/KafkaCluster.scala ---
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd.kafka
+
+import scala.util.control.NonFatal
+import scala.collection.mutable.ArrayBuffer
+import java.util.Properties
+import kafka.api._
+import kafka.common.{ErrorMapping, OffsetMetadataAndError, 
TopicAndPartition}
+import kafka.consumer.{ConsumerConfig, SimpleConsumer}
+
+/**
+  * Convenience methods for interacting with a Kafka cluster.
+  * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+  * configuration parameters</a>.
+  *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+  *   NOT zookeeper servers, specified in host1:port1,host2:port2 form
+  */
+class KafkaCluster(val kafkaParams: Map[String, String]) {
--- End diff --

The rdd would be really unpleasant to actually use without the convenience 
methods exposed by KafkaCluster, especially if you're keeping your offsets in 
zookeeper and doing idempotent writes.
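
As a hedged sketch of the kind of glue this saves: only getLatestLeaderOffsets
comes from code quoted elsewhere in this thread; loadOffsetsFromZk and the
surrounding wiring (sc, kafkaParams, messageHandler, type parameters) are
placeholders, not code from the PR:

    val kc = new KafkaCluster(kafkaParams)
    // placeholder: however you persist consumer offsets (zookeeper, a database, ...)
    val fromOffsets: Map[TopicAndPartition, Long] = loadOffsetsFromZk()
    val untilOffsets = kc.getLatestLeaderOffsets(fromOffsets.keys.toSet) match {
      case Right(offsets) => offsets
      case Left(errs)     => throw new Exception(errs.toString)
    }
    val rdd = new KafkaRDD[K, V, U, T, R](sc, kafkaParams, fromOffsets, untilOffsets, messageHandler)
    // do idempotent writes keyed by offset, then persist untilOffsets back to zookeeper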





[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2014-12-30 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/3798#discussion_r22366683
  
--- Diff: 
external/kafka/src/main/scala/org/apache/spark/rdd/kafka/KafkaCluster.scala ---
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd.kafka
+
+import scala.util.control.NonFatal
+import scala.collection.mutable.ArrayBuffer
+import java.util.Properties
+import kafka.api._
+import kafka.common.{ErrorMapping, OffsetMetadataAndError, 
TopicAndPartition}
+import kafka.consumer.{ConsumerConfig, SimpleConsumer}
+
+/**
+  * Convenience methods for interacting with a Kafka cluster.
+  * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+  * configuration parameters</a>.
+  *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+  *   NOT zookeeper servers, specified in host1:port1,host2:port2 form
+  */
+class KafkaCluster(val kafkaParams: Map[String, String]) {
--- End diff --

for example


https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/IdempotentExample.scala#L60

We also use it for doing things like starting a stream at the leader
offsets before a given time.





[GitHub] spark pull request: [SPARK-4014] Change TaskContext.attemptId to r...

2014-12-30 Thread koeninger
Github user koeninger commented on the pull request:

https://github.com/apache/spark/pull/3849#issuecomment-68405794
  
Thanks for this.  Most of the uses of attemptId I've seen look like they 
were assuming it meant the 0-based attempt number.





[GitHub] spark pull request: [SPARK-4014] Change TaskContext.attemptId to r...

2014-12-30 Thread koeninger
Github user koeninger commented on the pull request:

https://github.com/apache/spark/pull/3849#issuecomment-68407388
  
The flip side is that it's already documented as doing the right thing:


http://spark.apache.org/docs/1.1.1/api/scala/index.html#org.apache.spark.TaskContext

val attemptId: Long

the number of attempts to execute this task

On Tue, Dec 30, 2014 at 4:38 PM, Patrick Wendell notificati...@github.com
wrote:

 So personally I don't think we should change the semantics of attemptId
 because this has been exposed to user applications and they could silently
 break if we modify the meaning of the field (my original JIRA referred to
 an internal use of this). What it means right now is a global GUID over
 all attempts - that is a bit of an awkward definition, but I don't think
 it's fair to call this a bug - it was just a weird definition.

 So I'd be in favor of deprecating this in favor of taskAttemptId (a new
 field) and say that it was renamed to avoid confusion. Then we can add
 another field, attemptCount or attemptNum or something to convey the more
 intuitive thing.

 It will be slightly awkward, but if anyone reads the docs it should be
 obvious. In fact, we should probably spruce up the docs here for things
 like partitionID which right now are probably not super clear to users.

 —
 Reply to this email directly or view it on GitHub
 https://github.com/apache/spark/pull/3849#issuecomment-68406594.






[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2014-12-29 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/3798#discussion_r22330325
  
--- Diff: external/kafka/pom.xml ---
@@ -44,7 +44,7 @@
 <dependency>
   <groupId>org.apache.kafka</groupId>
   <artifactId>kafka_${scala.binary.version}</artifactId>
-  <version>0.8.0</version>
--- End diff --

It's due to the consumer offset management api only being available in 0.8.1


https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI







[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2014-12-29 Thread koeninger
Github user koeninger commented on the pull request:

https://github.com/apache/spark/pull/3798#issuecomment-68307147
  
I got some good feedback from Koert Kuipers at Tresata regarding location 
awareness, so I'll be doing some refactoring to add that.





[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2014-12-26 Thread koeninger
Github user koeninger commented on the pull request:

https://github.com/apache/spark/pull/3798#issuecomment-68149432
  
Hi @jerryshao 

I'd politely ask that anyone with questions read at least KafkaRDD.scala 
and the example usage linked from the jira ticket (it's only about 50 
significant lines of  code):

https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/TransactionalExample.scala

I'll try to address your points.

1.  Yes, each RDD partition maps directly to a Kafka (topic, partition, 
inclusive starting offset, exclusive ending offset)

2.  It's a pull model, not a receiver push model.  All the InputDStream 
implementation is doing is checking the leaders' highest offsets and defining 
an RDD based on that.  When the RDD is run, its iterator makes a connection to 
kafka and pulls the data (a rough sketch of this flow is at the end of this 
message).  This is done because it's simpler, and because using existing 
network receiver code would require dedicating 1 core per kafka partition, 
which is unacceptable from an ops standpoint.

3. Yes.  The fault tolerance model is that it should be safe for any or all 
of the spark machines to be completely destroyed at any point in the job, and 
the job should be able to be safely restarted.  I don't think you can do better 
than this.  This is achieved because all important state, especially the 
storage of offsets, is controlled by client code, not spark.  In both the 
transactional and idempotent client code approaches, offsets aren't stored 
until data is stored, so restart should be safe.

Regarding your approach that you link, the problem there is (a) it's not a 
part of the spark distribution so people won't know about it, and (b) it 
assumes control of kafka offsets and storage in zookeeper, which makes it 
impossible for client code to control exactly once semantics.

Regarding the possible semantic disconnect between spark streaming and 
treating kafka as a durable store of data from the past (assuming that's what 
you meant)...  I agree there is a disconnect there.  But it's a fundamental 
problem with spark streaming in that it implicitly depends on now rather than 
a time embedded in the data stream.  I don't think we're fixing that with this 
ticket.
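
As promised above, a rough sketch of that pull-model flow, pieced together from 
the DeterministicKafkaInputDStream code quoted elsewhere in this thread (not a 
verbatim excerpt; the Option return shape and exact wiring are assumptions):

    override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]] = {
      // decide the batch purely from metadata: the leaders' highest offsets, rate-limited
      val untilOffsets = clamp(latestLeaderOffsets(maxRetries))
      val rdd = new KafkaRDD[K, V, U, T, R](
        context.sparkContext, kafkaParams, currentOffsets, untilOffsets, messageHandler)
      // nothing is fetched here; executors pull from kafka only when the RDD is computed
      currentOffsets = untilOffsets
      Some(rdd)
    }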





[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2014-12-24 Thread koeninger
GitHub user koeninger opened a pull request:

https://github.com/apache/spark/pull/3798

[SPARK-4964] [Streaming] Exactly-once semantics for Kafka



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/koeninger/spark-1 kafkaRdd

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/3798.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3798


commit 76913e23179228481c98fbba36a54ca32fe20aed
Author: cody koeninger c...@koeninger.org
Date:   2014-11-23T03:15:30Z

Batch oriented kafka rdd, WIP. todo: cluster metadata / finding leader

commit 1d706257ac848d37caeaff0409bf60b080d66e48
Author: cody koeninger c...@koeninger.org
Date:   2014-11-23T06:10:56Z

WIP on kafka cluster

commit 0b94b3363cbc97c5d99e78c42f9be1c08a974fb1
Author: cody koeninger c...@koeninger.org
Date:   2014-11-24T14:49:24Z

use dropWhile rather than filter to trim beginning of fetch response

commit 4dafd1b0d58215cb27218e569cb5bea9d5146815
Author: cody koeninger c...@koeninger.org
Date:   2014-11-24T16:45:40Z

method to get leader offsets, switch rdd bound to being exclusive start, 
inclusive end to match offsets typically returned from cluster

commit ce91c591569b8ac4e91dd29d013961fe0ee5c316
Author: cody koeninger c...@koeninger.org
Date:   2014-11-24T18:07:24Z

method to get consumer offsets, explicit error handling

commit 7d050bcb0bcacfbd4a7b858cffae809fd2af8e9d
Author: cody koeninger c...@koeninger.org
Date:   2014-11-24T22:11:24Z

methods to set consumer offsets and get topic metadata, switch back to 
inclusive start / exclusive end to match typical kafka consumer behavior

commit 783b4775c89dbcbde9172d34653eab2718eee494
Author: cody koeninger c...@koeninger.org
Date:   2014-11-25T14:29:20Z

update tests for kafka 8.1.1

commit 29c6b430cc6bf5e2354b397289c4445f4993fc5b
Author: cody koeninger c...@koeninger.org
Date:   2014-11-25T15:33:45Z

cleanup logging

commit 3c2a96af2322754e8c76000b083ec3630a03e8c8
Author: cody koeninger c...@koeninger.org
Date:   2014-11-25T20:02:37Z

fix scalastyle errors

commit 4b078bf1e71745a6bc160c0836b54cc7b0d4171d
Author: cody koeninger c...@koeninger.org
Date:   2014-11-25T20:48:32Z

differentiate between leader and consumer offsets in error message

commit 8d7de4ab5a447a53f65be852702ca90512b2a639
Author: cody koeninger c...@koeninger.org
Date:   2014-11-25T23:54:40Z

make sure leader offsets can be found even for leaders that arent in the 
seed brokers

commit 979da25f4d48e5ffccf13ba1ff66eb2527ff01f9
Author: cody koeninger c...@koeninger.org
Date:   2014-11-26T15:31:38Z

dont allow empty leader offsets to be returned

commit 38bb727cf31744fa625248c86c2a666920e83c36
Author: cody koeninger c...@koeninger.org
Date:   2014-12-03T21:42:25Z

give easy access to the parameters of a KafkaRDD

commit 326ff3cbda37066ebef7492241276754164d2879
Author: cody koeninger c...@koeninger.org
Date:   2014-12-16T21:27:44Z

add some tests

commit 6bf14f2850f9f40f53b4c1eec373214e1b6d3465
Author: cody koeninger c...@koeninger.org
Date:   2014-12-24T17:38:52Z

first attempt at a Kafka dstream that allows for exactly-once semantics

commit bcca8a4b69f73b48f71b9558adf718b5324ed933
Author: cody koeninger c...@koeninger.org
Date:   2014-12-24T20:35:43Z

Merge branch 'master' of https://github.com/apache/spark into kafkaRdd

commit 37d305320e72de1ee6ffcd42f6a45d331a4d465d
Author: cody koeninger c...@koeninger.org
Date:   2014-12-25T04:41:40Z

make KafkaRDDPartition available to users so offsets can be committed per 
partition

commit cac63eec4a0bee6b662c4577404622a08904f0cb
Author: cody koeninger c...@koeninger.org
Date:   2014-12-25T07:11:58Z

additional testing, fix fencepost error







[GitHub] spark pull request: [SPARK-4229] Create hadoop configuration in a ...

2014-12-17 Thread koeninger
Github user koeninger commented on the pull request:

https://github.com/apache/spark/pull/3543#issuecomment-67337516
  
Jenkins is failing

org.apache.spark.scheduler.SparkListenerSuite.local metrics
org.apache.spark.streaming.flume.FlumeStreamSuite.flume input compressed 
stream

I can't reproduce those test failures locally.





[GitHub] spark pull request: [SPARK-4229] Create hadoop configuration in a ...

2014-12-10 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/3543#discussion_r21610025
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala ---
@@ -262,7 +263,7 @@ class SQLContext(@transient val sparkContext: 
SparkContext)
   def createParquetFile[A <: Product : TypeTag](
   path: String,
   allowExisting: Boolean = true,
-  conf: Configuration = new Configuration()): SchemaRDD = {
--- End diff --

I seem to recall there being potential thread safety issues related to
hadoop configuration objects, resulting in the need to create / clone them.

Quick search turned up e.g.

https://issues.apache.org/jira/browse/SPARK-2546

I'm not sure how relevant that is to all of these existing situations where
new Configuration() is being called.
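
As a hedged illustration of the create/clone point: Hadoop's Configuration has a
copy constructor, so handing each caller its own copy avoids sharing one mutable
instance across threads (baseConf is assumed to come from the SparkContext):

    import org.apache.hadoop.conf.Configuration

    // Give each caller its own Configuration rather than sharing one mutable instance.
    def confForThisCall(baseConf: Configuration): Configuration =
      new Configuration(baseConf)   // defensive copy; callers can then mutate it freely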

On Tue, Dec 9, 2014 at 5:07 PM, Tathagata Das notificati...@github.com
wrote:

 In sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
 https://github.com/apache/spark/pull/3543#discussion-diff-21571141:

  @@ -262,7 +263,7 @@ class SQLContext(@transient val sparkContext: 
SparkContext)
 def createParquetFile[A <: Product : TypeTag](
 path: String,
 allowExisting: Boolean = true,
  -  conf: Configuration = new Configuration()): SchemaRDD = {

 I think this should be using the hadoopConfiguration object in the
 SparkContext. That has all the hadoop related configuration already setup
 and should be what is automatically used. @marmbrus
 https://github.com/marmbrus should have a better idea.

 —
 Reply to this email directly or view it on GitHub
 https://github.com/apache/spark/pull/3543/files#r21571141.






[GitHub] spark pull request: [SPARK-4229] Create hadoop configuration in a ...

2014-12-10 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/3543#discussion_r21638810
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala ---
@@ -262,7 +263,7 @@ class SQLContext(@transient val sparkContext: 
SparkContext)
   def createParquetFile[A <: Product : TypeTag](
   path: String,
   allowExisting: Boolean = true,
-  conf: Configuration = new Configuration()): SchemaRDD = {
--- End diff --

So let me see if I have things straight

- Currently, the code is using new Configuration() as a default, which may 
have some thread safety issues due to the constructor

- My original patch uses SparkHadoopUtil.get.conf, which is a singleton, so it 
should decrease the constructor thread safety problem, but increase the 
problems if the hadoop configuration is modified.  It also won't do the right 
thing for people who have altered the sparkConf, which makes it no good (I 
haven't run into this in personal usage of the patched version, because I 
always pass in a complete sparkConf via properties rather than setting it in 
code).

- @tdas suggested using this.sparkContext.hadoopConfiguration.  This will 
use the right spark config, but may have thread safety issues both at 
construction time, when the spark context is created, and later if the 
configuration is modified.

So

Use tdas' suggestion, add a 
HadoopRDD.CONFIGURATION_INSTANTIATION_LOCK.synchronized block to 
SparkHadoopUtil.newConfiguration?  And people are out of luck if they have code 
that used to work because they were modifying new blank instances of 
Configuration, rather than the now-shared one? 
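
For illustration, a rough sketch of what that last option could look like (my own rendering, not the PR's code; the lock here stands in for HadoopRDD.CONFIGURATION_INSTANTIATION_LOCK, which is private[spark] in the real source):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkConf

object NewConfigurationSketch {
  // Stand-in for HadoopRDD.CONFIGURATION_INSTANTIATION_LOCK: serialize
  // Configuration construction, which is not thread safe.
  private val configurationLock = new Object

  def newConfiguration(conf: SparkConf): Configuration = configurationLock.synchronized {
    val hadoopConf = new Configuration()
    // Mirror the existing spark.hadoop.* copying so hadoop settings supplied
    // through the spark config still end up in the returned Configuration.
    conf.getAll.foreach { case (key, value) =>
      if (key.startsWith("spark.hadoop.")) {
        hadoopConf.set(key.stripPrefix("spark.hadoop."), value)
      }
    }
    hadoopConf
  }
}
```

Callers that used to mutate their own blank Configuration instances would see different behavior, which is exactly the trade-off described above.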






[GitHub] spark pull request: [SPARK-4229] Create hadoop configuration in a ...

2014-12-02 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/3543#discussion_r21192361
  
--- Diff: docs/configuration.md ---
@@ -664,6 +665,24 @@ Apart from these, the following properties are also available, and may be useful
   </td>
 </tr>
 <tr>
+<td><code>spark.executor.heartbeatInterval</code></td>
--- End diff --

Pretty sure that's just the diff getting confused by where the hadoop doc 
changes were inserted; the same lines are marked as removed lower in the diff.





[GitHub] spark pull request: Closes SPARK-4229 Create hadoop configuration ...

2014-12-01 Thread koeninger
GitHub user koeninger opened a pull request:

https://github.com/apache/spark/pull/3543

Closes SPARK-4229 Create hadoop configuration in a consistent way



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/koeninger/spark-1 SPARK-4229-master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/3543.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3543


commit c41a4b4d0752b1a5b057611c796e367c5a806be6
Author: cody koeninger c...@koeninger.org
Date:   2014-11-04T22:40:17Z

SPARK-4229 use SparkHadoopUtil.get.conf so that hadoop properties are 
copied from spark config
Resolved conflicts in favor of master.

Conflicts:

streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala

streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala

commit b48ad63fb9c31de90c8b5b0541129e2c71bd3478
Author: cody koeninger c...@koeninger.org
Date:   2014-11-04T22:41:07Z

SPARK-4229 document handling of spark.hadoop.* properties

commit 413f916bafc5b218ab334cb9d66b67f3dbc117f7
Author: cody koeninger c...@koeninger.org
Date:   2014-11-05T03:26:26Z

SPARK-4229 fix broken table in documentation, make hadoop doc formatting 
match that of runtime env







[GitHub] spark pull request: Spark 4229 Create hadoop configuration in a co...

2014-12-01 Thread koeninger
Github user koeninger commented on the pull request:

https://github.com/apache/spark/pull/3102#issuecomment-65176731
  
Yes, the new hadoop config documentation is just documenting the behavior 
of SparkHadoopUtil.scala lines 95-100

Sorry about the branch situation, I was unclear on what the plan for 1.2 
merges was.
Opened a new PR that should merge cleanly into master

https://github.com/apache/spark/pull/3543
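
For anyone reading along, a hedged example of the documented behavior (the property name here is just an example, not something the PR touches): any spark.hadoop.* key set on the SparkConf is copied into the Hadoop Configuration with the prefix stripped.

```scala
import org.apache.spark.SparkConf

// Setting this on the spark config is equivalent to setting
// mapreduce.input.fileinputformat.split.minsize in a Hadoop Configuration,
// because the "spark.hadoop." prefix is stripped when the conf is copied over.
val sparkConf = new SparkConf()
  .setAppName("hadoop-conf-example")
  .set("spark.hadoop.mapreduce.input.fileinputformat.split.minsize", "134217728")
```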





[GitHub] spark pull request: Spark 4229 Create hadoop configuration in a co...

2014-12-01 Thread koeninger
Github user koeninger closed the pull request at:

https://github.com/apache/spark/pull/3102





[GitHub] spark pull request: Spark 4229 Create hadoop configuration in a co...

2014-11-04 Thread koeninger
GitHub user koeninger opened a pull request:

https://github.com/apache/spark/pull/3102

Spark 4229 Create hadoop configuration in a consistent way



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/koeninger/spark-1 SPARK-4229

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/3102.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3102


commit 3cd384f77ba9505fe7c94c82980e07044f6b128c
Author: cody koeninger c...@koeninger.org
Date:   2014-11-04T22:40:17Z

SPARK-4229 use SparkHadoopUtil.get.conf so that hadoop properties are 
copied from spark config

commit f2ee4f9f1ed717d54fb7916ff2cf3ae85468eab0
Author: cody koeninger c...@koeninger.org
Date:   2014-11-04T22:41:07Z

SPARK-4229 document handling of spark.hadoop.* properties

commit eebbdcc53caa214079612732d3a4a13e57cecffe
Author: cody koeninger c...@koeninger.org
Date:   2014-11-05T03:26:26Z

SPARK-4229 fix broken table in documentation, make hadoop doc formatting 
match that of runtime env







[GitHub] spark pull request: SPARK-3462 push down filters and projections i...

2014-09-11 Thread koeninger
Github user koeninger commented on the pull request:

https://github.com/apache/spark/pull/2345#issuecomment-55336261
  
@marmbrus I see what you mean.  Updated to basically what you suggested, 
aside from building the map once.  Let me know; once it's finalized I can try 
to test one more time on live data.
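
As a toy illustration of the transformation itself (a simplified stand-in, not the Catalyst rule in the PR, and it skips the attribute-map rewriting mentioned above):

```scala
// Simplified model of pushing a filter below a union: a Filter over a Union is
// rewritten into a Union of per-child Filters, so each input prunes rows before
// the union runs. The real rule also rewrites expressions against each child's
// output attributes, which is what the map mentioned above is for.
sealed trait Plan
case class Scan(table: String) extends Plan
case class Union(children: Seq[Plan]) extends Plan
case class Filter(predicate: String, child: Plan) extends Plan

def pushFilterIntoUnion(plan: Plan): Plan = plan match {
  case Filter(p, Union(children)) => Union(children.map(child => Filter(p, child)))
  case other => other
}

// pushFilterIntoUnion(Filter("x > 1", Union(Seq(Scan("a"), Scan("b")))))
//   == Union(Seq(Filter("x > 1", Scan("a")), Filter("x > 1", Scan("b"))))
```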





[GitHub] spark pull request: SPARK-3462 push down filters and projections i...

2014-09-10 Thread koeninger
GitHub user koeninger opened a pull request:

https://github.com/apache/spark/pull/2345

SPARK-3462 push down filters and projections into Unions



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mediacrossinginc/spark SPARK-3462

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/2345.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2345


commit ef47b3b80dd92f4652947ccffa5c9fea97adffb0
Author: Cody Koeninger cody.koenin...@mediacrossing.com
Date:   2014-09-10T05:07:58Z

SPARK-3462 push down filters and projections into Unions







[GitHub] spark pull request: [SPARK-2710] [SQL] Build SchemaRDD from a Jdbc...

2014-09-08 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/1612#discussion_r17279539
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/jdbc/JdbcResultSetRDDSuite.scala ---
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.jdbc
+
+import java.sql._
+
+import org.scalatest.BeforeAndAfter
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.test.TestSQLContext._
+
+class JdbcResultSetRDDSuite extends QueryTest with BeforeAndAfter {
+
+  before {
+    Class.forName("org.apache.derby.jdbc.EmbeddedDriver")
+    val conn = DriverManager.getConnection("jdbc:derby:target/JdbcSchemaRDDSuiteDb;create=true")
+    try {
+      val create = conn.createStatement
+      create.execute("""
+        CREATE TABLE FOO(
+          ID INTEGER NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1),
+          DATA INTEGER
+        )""")
+      create.close()
+      val insert = conn.prepareStatement("INSERT INTO FOO(DATA) VALUES(?)")
+      (1 to 100).foreach { i =>
+        insert.setInt(1, i * 2)
+        insert.executeUpdate
+      }
+      insert.close()
+    } catch {
+      case e: SQLException if e.getSQLState == "X0Y32" =>
+        // table exists
+    } finally {
+      conn.close()
+    }
+  }
+
+  test("basic functionality") {
+    val jdbcResultSetRDD =
+      jdbcResultSet("jdbc:derby:target/JdbcSchemaRDDSuiteDb", "SELECT DATA FROM FOO")
+    jdbcResultSetRDD.registerAsTable("foo")
+
+    checkAnswer(
+      sql("select count(*) from foo"),
+      100
+    )
+    checkAnswer(
+      sql("select sum(DATA) from foo"),
+      10100
+    )
+  }
+
+  after {
+    try {
+      DriverManager.getConnection("jdbc:derby:;shutdown=true")
+    } catch {
+      case se: SQLException if se.getSQLState == "XJ015" =>
--- End diff --

http://db.apache.org/derby/papers/DerbyTut/embedded_intro.html

 A clean shutdown always throws SQL exception XJ015, which can be ignored. 






[GitHub] spark pull request: [SPARK-2710] [SQL] Build SchemaRDD from a Jdbc...

2014-09-08 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/1612#discussion_r17279806
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala ---
@@ -81,8 +113,14 @@ class JdbcRDD[T: ClassTag](
       logInfo("statement fetch size set to: " + stmt.getFetchSize + " to force MySQL streaming ")
     }
 
-    stmt.setLong(1, part.lower)
-    stmt.setLong(2, part.upper)
+    val parameterCount = stmt.getParameterMetaData.getParameterCount
+    if (parameterCount > 0) {
--- End diff --

Not that there's anything wrong with backwards compatible 
fixes/enhancements, but a few things I noticed here:

1.  If it's a sufficiently small table that a user is only using 1 
partition, why not encourage them to query it from the driver and broadcast it? 
(A rough sketch of that approach follows below.)

2.  As it stands, it looks like you allow 0, 1, 2, or more ? placeholders, 
but the doc comment change only describes the 0 or 2 case.
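
Regarding point 1, here is a hedged sketch of the driver-side alternative (table, column, and helper names are made up for illustration):

```scala
import java.sql.DriverManager
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast

// Read a small table once on the driver over plain JDBC and broadcast the rows,
// instead of spinning up a single-partition JdbcRDD.
def broadcastSmallTable(sc: SparkContext, url: String): Broadcast[Seq[(Int, Int)]] = {
  val conn = DriverManager.getConnection(url)
  try {
    val rs = conn.createStatement().executeQuery("SELECT ID, DATA FROM FOO")
    val rows = new ArrayBuffer[(Int, Int)]()
    while (rs.next()) {
      rows += ((rs.getInt(1), rs.getInt(2)))
    }
    sc.broadcast(rows.toSeq)
  } finally {
    conn.close()
  }
}
```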





[GitHub] spark pull request: [SPARK-2710] [SQL] Build SchemaRDD from a Jdbc...

2014-09-08 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/1612#discussion_r17280404
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala ---
@@ -67,6 +73,32 @@ class JdbcRDD[T: ClassTag](
 }).toArray
   }
 
+  def getSchema: Seq[(String, Int, Boolean)] = {
+    if (null != schema) {
+      return schema
+    }
+
+    val conn = getConnection()
--- End diff --

Is this connection guaranteed to get closed?  It won't benefit from the 
addOnCompleteCallback below, for instance.
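
For what it's worth, a minimal sketch of a shape that would guarantee closure (a hypothetical helper, not code from the PR):

```scala
import java.sql.Connection

// Close the connection in a finally block so it is released even if preparing
// the statement or reading metadata throws; the loan-pattern helper keeps that
// guarantee in one place.
def withConnection[T](getConnection: () => Connection)(body: Connection => T): T = {
  val conn = getConnection()
  try {
    body(conn)
  } finally {
    conn.close()
  }
}
```

getSchema could then wrap its metadata read in withConnection(getConnection) { conn => ... } and the connection would always be released.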





[GitHub] spark pull request: [SPARK-2710] [SQL] Build SchemaRDD from a Jdbc...

2014-09-08 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/1612#discussion_r17280425
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala ---
@@ -67,6 +73,32 @@ class JdbcRDD[T: ClassTag](
 }).toArray
   }
 
+  def getSchema: Seq[(String, Int, Boolean)] = {
+    if (null != schema) {
+      return schema
+    }
+
+    val conn = getConnection()
+    val stmt = conn.prepareStatement(sql)
+    val metadata = stmt.getMetaData
+    try {
+      if (null != stmt && ! stmt.isClosed()) {
+        stmt.close()
+      }
+    } catch {
+      case e: Exception => logWarning("Exception closing statement", e)
+    }
+    schema = Seq[(String, Int, Boolean)]()
+    for(i <- 1 to metadata.getColumnCount) {
+      schema :+= (
--- End diff --

Are there any thread safety concerns regarding mutating schema here?
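
One hedged way to avoid the question entirely (a sketch, not a claim about what this PR should do): build the schema once in a lazy val, whose initialization the compiler synchronizes, rather than mutating a var from getSchema.

```scala
import java.sql.{Connection, ResultSetMetaData}

// lazy val initialization is synchronized, so concurrent first callers cannot
// interleave while the schema is built, and afterwards it is read-only.
// getConnection and sql stand in for the RDD's constructor arguments.
class SchemaHolderSketch(getConnection: () => Connection, sql: String) {
  lazy val schema: Seq[(String, Int, Boolean)] = {
    val conn = getConnection()
    try {
      val metadata = conn.prepareStatement(sql).getMetaData
      (1 to metadata.getColumnCount).map { i =>
        (metadata.getColumnName(i),
         metadata.getColumnType(i),
         metadata.isNullable(i) != ResultSetMetaData.columnNoNulls)
      }
    } finally {
      conn.close()
    }
  }
}
```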





[GitHub] spark pull request: SPARK-1097: Do not introduce deadlock while fi...

2014-07-15 Thread koeninger
Github user koeninger commented on the pull request:

https://github.com/apache/spark/pull/1409#issuecomment-49066268
  
Testing that patch, it seems to have fixed the deadlock we were seeing in 
production.

