[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user koeninger commented on the pull request:

    https://github.com/apache/spark/pull/3798#issuecomment-72790044

The warning is for metadata.broker.list, since it's not expected by the existing ConsumerConfig (it's used by other config classes).

Couldn't get subclassing to work; the VerifiableProperties class it uses is very dependent on the order of operations during construction. I think the simplest thing is a class that is constructed using kafkaParams and uses the static defaults from the ConsumerConfig object.

I'm currently waiting in an ER with my child with a 105 fever, so won't be getting to it for a few hours to say the least.

On Feb 3, 2015 10:15 PM, Tathagata Das notificati...@github.com wrote:

> I think the simplest solution is to assign zookeeper.connect. But you are assigning it in KafkaCluster lines 338 - 345. So why is this warning being thrown?
>
> Reply to this email directly or view it on GitHub https://github.com/apache/spark/pull/3798#issuecomment-72787965.

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.
---

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user koeninger commented on the pull request:

    https://github.com/apache/spark/pull/3798#issuecomment-72783141

Yeah, more importantly it's so that defaults for things like connection timeouts match what Kafka provides.

It's possible to assign a fake zookeeper.connect and have it pass verification; that's what the existing code does. Unfortunately ConsumerConfig has a private constructor, so subclassing it in order for the broker list to pass verification without that warning may prove to be tricky. Worst case scenario, I'll re-implement a config that uses the Kafka defaults.

On Tue, Feb 3, 2015 at 9:05 PM, Tathagata Das notificati...@github.com wrote:

> I see. ConsumerConfig is really necessary only for the high-level consumer, but you are using it to configure stuff in the low-level consumer as well. That is so that you don't have to introduce parameter strings to configure them yourselves. Is it possible to assign a fake but verified zookeeper.connect?
>
> Reply to this email directly or view it on GitHub https://github.com/apache/spark/pull/3798#issuecomment-72782434.
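Editor's sketch of the "worst case" option described above, i.e. a small config class built from kafkaParams that falls back to defaults. This is not the code from the PR. The names SketchKafkaConfig and SketchConsumerDefaults are hypothetical, and the default values are placeholders assumed for illustration; a real version would read the static defaults from Kafka's ConsumerConfig object instead of hard-coding them.

```scala
// Hypothetical stand-in for the static defaults on Kafka's ConsumerConfig object.
// The values below are placeholders assumed for this sketch.
object SketchConsumerDefaults {
  val SocketTimeoutMs: Int = 30 * 1000
  val SocketReceiveBufferBytes: Int = 64 * 1024
  val ClientId: String = ""
}

// Hypothetical config class: built directly from kafkaParams, so extra keys
// such as metadata.broker.list are simply ignored rather than warned about.
final class SketchKafkaConfig(kafkaParams: Map[String, String]) extends Serializable {
  // Parse an integer parameter, falling back to the given default when absent.
  private def intParam(key: String, default: Int): Int =
    kafkaParams.get(key).map(_.trim.toInt).getOrElse(default)

  val socketTimeoutMs: Int =
    intParam("socket.timeout.ms", SketchConsumerDefaults.SocketTimeoutMs)

  val socketReceiveBufferBytes: Int =
    intParam("socket.receive.buffer.bytes", SketchConsumerDefaults.SocketReceiveBufferBytes)

  val clientId: String =
    kafkaParams.getOrElse("client.id", SketchConsumerDefaults.ClientId)
}
```

Because the class only reads the keys it knows about, it needs no placeholder zookeeper.connect entry, which is the trade-off being weighed in the thread.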
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user koeninger commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3798#discussion_r24019631

--- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala ---
@@ -144,4 +150,174 @@ object KafkaUtils {
     createStream[K, V, U, T](
       jssc.ssc, kafkaParams.toMap, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
   }
+
+  /** A batch-oriented interface for consuming from Kafka.
+   * Starting and ending offsets are specified in advance,
+   * so that you can control exactly-once semantics.
+   * @param sc SparkContext object
+   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+   * configuration parameters</a>.
+   *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   * @param offsetRanges Each OffsetRange in the batch corresponds to a
+   *   range of offsets for a given Kafka topic/partition
+   */
+  @Experimental
+  def createRDD[
+    K: ClassTag,
+    V: ClassTag,
+    U <: Decoder[_]: ClassTag,
+    T <: Decoder[_]: ClassTag] (
+      sc: SparkContext,
+      kafkaParams: Map[String, String],
+      offsetRanges: Array[OffsetRange]
+  ): RDD[(K, V)] with HasOffsetRanges = {
+    val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)
+    val kc = new KafkaCluster(kafkaParams)
+    val topics = offsetRanges.map(o => TopicAndPartition(o.topic, o.partition)).toSet
+    val leaders = kc.findLeaders(topics).fold(
+      errs => throw new SparkException(errs.mkString("\n")),
+      ok => ok
+    )
+    new KafkaRDD[K, V, U, T, (K, V)](sc, kafkaParams, offsetRanges, leaders, messageHandler)
+  }
+
+  /** A batch-oriented interface for consuming from Kafka.
+   * Starting and ending offsets are specified in advance,
+   * so that you can control exactly-once semantics.
+   * @param sc SparkContext object
+   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+   * configuration parameters</a>.
+   *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   * @param offsetRanges Each OffsetRange in the batch corresponds to a
+   *   range of offsets for a given Kafka topic/partition
+   * @param leaders Kafka leaders for each offset range in batch
+   * @param messageHandler function for translating each message into the desired type
+   */
+  @Experimental
+  def createRDD[
+    K: ClassTag,
+    V: ClassTag,
+    U <: Decoder[_]: ClassTag,
+    T <: Decoder[_]: ClassTag,
+    R: ClassTag] (
+      sc: SparkContext,
+      kafkaParams: Map[String, String],
+      offsetRanges: Array[OffsetRange],
+      leaders: Array[Leader],
--- End diff --

That's correct on both counts.

If you don't provide a way for clients to supply offset ranges and leaders at the same time, you're forcing twice the number of remote calls (because the usual way to get the end of the offset range is to talk to the leader).

Yes, there's no way for people to actually use this currently unless they have their own copy of the functionality provided by KafkaCluster. In my case, I'm just going to remove SparkException from KafkaCluster, since it's the only Spark dependency, and distribute it as a separate jar under a different namespace.
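Editor's sketch of the leader handling discussed above: a caller who already knows the leaders passes them in as an array, and the array is turned into a lookup keyed by topic and partition, mirroring the leaderMap conversion in the quoted diff. All type names here (TopicAndPartitionSketch, LeaderSketch, OffsetRangeSketch, LeaderLookupSketch) are hypothetical stand-ins so that the example runs without Spark or Kafka on the classpath; missingLeaders is an extra helper added only for illustration.

```scala
// Hypothetical stand-ins for kafka.common.TopicAndPartition and the PR's Leader / OffsetRange.
final case class TopicAndPartitionSketch(topic: String, partition: Int)
final case class LeaderSketch(topic: String, partition: Int, host: String, port: Int)
final case class OffsetRangeSketch(topic: String, partition: Int, fromOffset: Long, untilOffset: Long)

object LeaderLookupSketch {
  // Same shape as the conversion in the diff: Array[Leader] => Map[TopicAndPartition, (host, port)].
  def toLeaderMap(leaders: Array[LeaderSketch]): Map[TopicAndPartitionSketch, (String, Int)] =
    leaders.map(l => TopicAndPartitionSketch(l.topic, l.partition) -> ((l.host, l.port))).toMap

  // Illustrative helper (an assumption, not in the PR): which ranges lack a known leader,
  // and would therefore still need a metadata request.
  def missingLeaders(
      ranges: Array[OffsetRangeSketch],
      leaderMap: Map[TopicAndPartitionSketch, (String, Int)]): Seq[TopicAndPartitionSketch] =
    ranges
      .map(r => TopicAndPartitionSketch(r.topic, r.partition))
      .filterNot(leaderMap.contains)
      .distinct
      .toSeq
}
```

When every range has a leader in the map, no additional lookup is needed, which is the saving in remote calls that the comment describes.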
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user koeninger commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3798#discussion_r24018460

--- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala ---
@@ -144,4 +150,174 @@ object KafkaUtils {
     createStream[K, V, U, T](
       jssc.ssc, kafkaParams.toMap, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
   }
+
+  /** A batch-oriented interface for consuming from Kafka.
+   * Starting and ending offsets are specified in advance,
+   * so that you can control exactly-once semantics.
+   * @param sc SparkContext object
+   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+   * configuration parameters</a>.
+   *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   * @param offsetRanges Each OffsetRange in the batch corresponds to a
+   *   range of offsets for a given Kafka topic/partition
+   */
+  @Experimental
+  def createRDD[
+    K: ClassTag,
+    V: ClassTag,
+    U <: Decoder[_]: ClassTag,
+    T <: Decoder[_]: ClassTag] (
+      sc: SparkContext,
+      kafkaParams: Map[String, String],
+      offsetRanges: Array[OffsetRange]
+  ): RDD[(K, V)] with HasOffsetRanges = {
--- End diff --

There is definitely a bytecode difference; try running diff on the class files.

It statically guarantees you can call .offsetRanges on the thing returned from createRDD. Without it, you'd have to cast at runtime.

If you add e.g. a .chzburger method to KafkaRDD, you won't be able to call it without asInstanceOf. If you then made a Chzburger interface, implemented it on KafkaRDD, and changed the return type to RDD with HasOffsetRanges with Chzburger, you would.

I hear your concern about binary compatibility. As far as exposing KafkaRDD instead... that's the way I originally designed things. The current design is the result of a compromise between TD's desire as a maintainer to hide as much as possible, and my desire as a user to expose what's necessary to get my job done. You can usually tell it's a good compromise if no one is happy :)
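Editor's sketch of the point about the compound return type. It uses hypothetical stand-in types nested in CompoundTypeSketch (Rdd, HasRanges, OffsetSpan, HiddenRdd) rather than Spark's RDD and the PR's HasOffsetRanges, so it runs on its own. The concrete class stays private, yet the compound return type still lets callers reach offsetRanges without a cast.

```scala
object CompoundTypeSketch {
  // Hypothetical stand-ins for RDD, OffsetRange and HasOffsetRanges.
  trait Rdd[T] { def collect(): Seq[T] }
  final case class OffsetSpan(topic: String, partition: Int, fromOffset: Long, untilOffset: Long)
  trait HasRanges { def offsetRanges: Array[OffsetSpan] }

  // The concrete class is hidden from callers, as the maintainers wanted for KafkaRDD.
  private final class HiddenRdd(data: Seq[(String, String)], val offsetRanges: Array[OffsetSpan])
      extends Rdd[(String, String)] with HasRanges {
    def collect(): Seq[(String, String)] = data
  }

  private def build(): HiddenRdd =
    new HiddenRdd(Seq("k" -> "v"), Array(OffsetSpan("events", 0, 0L, 1L)))

  // Plain return type: callers only see Rdd, so offsetRanges needs a runtime check or cast.
  def createPlain(): Rdd[(String, String)] = build()

  // Compound return type: offsetRanges is available statically, without exposing HiddenRdd.
  def createCompound(): Rdd[(String, String)] with HasRanges = build()

  // What a caller of createPlain() is reduced to: a runtime type test.
  def rangesOf(rdd: Rdd[_]): Option[Array[OffsetSpan]] = rdd match {
    case h: HasRanges => Some(h.offsetRanges)
    case _ => None
  }
}
```

Usage: `CompoundTypeSketch.createCompound().offsetRanges` compiles directly, while `CompoundTypeSketch.createPlain().offsetRanges` does not and has to go through something like `rangesOf`.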
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user koeninger commented on the pull request:

    https://github.com/apache/spark/pull/3798#issuecomment-72691392

Regarding naming, I agree. The name has been a point of discussion for a month; how do we get some consensus?

Regarding Java wrappers, there have already been a number of changes directed at Java compatibility (array of Leader instead of a map[topic, broker], .create in addition to .apply). I wonder how relevant those are if we're doing separate Java wrappers (which yes, I agree should be in a follow-on PR).

On Tue, Feb 3, 2015 at 3:13 AM, Patrick Wendell notificati...@github.com wrote:

> I took a pass through the public API. I'm not very familiar with Kafka so it was somewhat slow going. However, some reactions:
>
> 1. We should try to tighten, simplify, and clarify the way we name and document everything in this public API. Most of the comments were about this. The most important IMO is coming up with a good name for the new streams returned and clearly explaining how they differ from the old Kafka stream. To me, the main differences seem to be in the way we (a) decide what goes into which batch and (b) actually ingest the data. I proposed javadoc and a naming scheme that emphasize that distinction.
>
> 2. Are there plans to add Java and Python wrappers here next? Those are straightforward and it would be good to have them. Maybe in a follow-on PR?
>
> Reply to this email directly or view it on GitHub https://github.com/apache/spark/pull/3798#issuecomment-72617088.
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user koeninger commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3798#discussion_r24017652

--- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala ---
@@ -144,4 +150,174 @@ object KafkaUtils {
     createStream[K, V, U, T](
       jssc.ssc, kafkaParams.toMap, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
   }
+
+  /** A batch-oriented interface for consuming from Kafka.
+   * Starting and ending offsets are specified in advance,
+   * so that you can control exactly-once semantics.
+   * @param sc SparkContext object
+   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+   * configuration parameters</a>.
+   *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   * @param offsetRanges Each OffsetRange in the batch corresponds to a
+   *   range of offsets for a given Kafka topic/partition
+   */
+  @Experimental
+  def createRDD[
+    K: ClassTag,
+    V: ClassTag,
+    U <: Decoder[_]: ClassTag,
+    T <: Decoder[_]: ClassTag] (
+      sc: SparkContext,
+      kafkaParams: Map[String, String],
+      offsetRanges: Array[OffsetRange]
+  ): RDD[(K, V)] with HasOffsetRanges = {
+    val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)
+    val kc = new KafkaCluster(kafkaParams)
+    val topics = offsetRanges.map(o => TopicAndPartition(o.topic, o.partition)).toSet
+    val leaders = kc.findLeaders(topics).fold(
+      errs => throw new SparkException(errs.mkString("\n")),
+      ok => ok
+    )
+    new KafkaRDD[K, V, U, T, (K, V)](sc, kafkaParams, offsetRanges, leaders, messageHandler)
+  }
+
+  /** A batch-oriented interface for consuming from Kafka.
+   * Starting and ending offsets are specified in advance,
+   * so that you can control exactly-once semantics.
+   * @param sc SparkContext object
+   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+   * configuration parameters</a>.
+   *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   * @param offsetRanges Each OffsetRange in the batch corresponds to a
+   *   range of offsets for a given Kafka topic/partition
+   * @param leaders Kafka leaders for each offset range in batch
+   * @param messageHandler function for translating each message into the desired type
+   */
+  @Experimental
+  def createRDD[
+    K: ClassTag,
+    V: ClassTag,
+    U <: Decoder[_]: ClassTag,
+    T <: Decoder[_]: ClassTag,
+    R: ClassTag] (
+      sc: SparkContext,
+      kafkaParams: Map[String, String],
+      offsetRanges: Array[OffsetRange],
+      leaders: Array[Leader],
+      messageHandler: MessageAndMetadata[K, V] => R
+  ): RDD[R] with HasOffsetRanges = {
+
+    val leaderMap = leaders
+      .map(l => TopicAndPartition(l.topic, l.partition) -> (l.host, l.port))
+      .toMap
+    new KafkaRDD[K, V, U, T, R](sc, kafkaParams, offsetRanges, leaderMap, messageHandler)
+  }
+
+  /**
+   * This stream can guarantee that each message from Kafka is included in transformations
+   * (as opposed to output actions) exactly once, even in most failure situations.
+   *
+   * Points to note:
+   *
+   * Failure Recovery - You must checkpoint this stream, or save offsets yourself and provide them
+   * as the fromOffsets parameter on restart.
+   * Kafka must have sufficient log retention to obtain messages after failure.
+   *
+   * Getting offsets from the stream - see programming guide
+   *
+.  * Zookeeper - This does not use Zookeeper to store offsets. For interop with Kafka monitors
+   * that depend on Zookeeper, you must store offsets in ZK yourself.
+   *
+   * End-to-end semantics - This does not guarantee that any output operation will push each record
+   * exactly once. To ensure end-to-end exactly-once semantics (that is, receiving exactly once and
+   * outputting exactly once), you have to either ensure that the output operation is
+   * idempotent, or transactionally store offsets with the output. See the programming guide for
+   * more details.
+   *
+   * @param ssc StreamingContext object
+   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+   * configuration parameters</a>.
+   *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+   *   NOT zookeeper servers, specified
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user koeninger commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3798#discussion_r24019904

--- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala ---
@@ -144,4 +150,174 @@ object KafkaUtils {
     createStream[K, V, U, T](
       jssc.ssc, kafkaParams.toMap, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
   }
+
+  /** A batch-oriented interface for consuming from Kafka.
+   * Starting and ending offsets are specified in advance,
+   * so that you can control exactly-once semantics.
+   * @param sc SparkContext object
+   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+   * configuration parameters</a>.
+   *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   * @param offsetRanges Each OffsetRange in the batch corresponds to a
+   *   range of offsets for a given Kafka topic/partition
+   */
+  @Experimental
+  def createRDD[
+    K: ClassTag,
+    V: ClassTag,
+    U <: Decoder[_]: ClassTag,
+    T <: Decoder[_]: ClassTag] (
+      sc: SparkContext,
+      kafkaParams: Map[String, String],
+      offsetRanges: Array[OffsetRange]
+  ): RDD[(K, V)] with HasOffsetRanges = {
+    val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)
+    val kc = new KafkaCluster(kafkaParams)
+    val topics = offsetRanges.map(o => TopicAndPartition(o.topic, o.partition)).toSet
+    val leaders = kc.findLeaders(topics).fold(
+      errs => throw new SparkException(errs.mkString("\n")),
+      ok => ok
+    )
+    new KafkaRDD[K, V, U, T, (K, V)](sc, kafkaParams, offsetRanges, leaders, messageHandler)
+  }
+
+  /** A batch-oriented interface for consuming from Kafka.
+   * Starting and ending offsets are specified in advance,
+   * so that you can control exactly-once semantics.
+   * @param sc SparkContext object
+   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+   * configuration parameters</a>.
+   *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   * @param offsetRanges Each OffsetRange in the batch corresponds to a
+   *   range of offsets for a given Kafka topic/partition
+   * @param leaders Kafka leaders for each offset range in batch
+   * @param messageHandler function for translating each message into the desired type
+   */
+  @Experimental
+  def createRDD[
+    K: ClassTag,
+    V: ClassTag,
+    U <: Decoder[_]: ClassTag,
+    T <: Decoder[_]: ClassTag,
+    R: ClassTag] (
+      sc: SparkContext,
+      kafkaParams: Map[String, String],
+      offsetRanges: Array[OffsetRange],
+      leaders: Array[Leader],
+      messageHandler: MessageAndMetadata[K, V] => R
+  ): RDD[R] with HasOffsetRanges = {
+
+    val leaderMap = leaders
+      .map(l => TopicAndPartition(l.topic, l.partition) -> (l.host, l.port))
+      .toMap
+    new KafkaRDD[K, V, U, T, R](sc, kafkaParams, offsetRanges, leaderMap, messageHandler)
+  }
+
+  /**
+   * This stream can guarantee that each message from Kafka is included in transformations
--- End diff --

I think we want the first sentence of the doc to convey why someone would choose this method.
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user koeninger commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3798#discussion_r24019256

--- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala ---
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import kafka.common.TopicAndPartition
+
+/** Something that has a collection of OffsetRanges */
+trait HasOffsetRanges {
+  def offsetRanges: Array[OffsetRange]
+}
+
+/** Represents a range of offsets from a single Kafka TopicAndPartition */
+final class OffsetRange private(
+    /** kafka topic name */
+    val topic: String,
+    /** kafka partition id */
+    val partition: Int,
+    /** inclusive starting offset */
+    val fromOffset: Long,
+    /** exclusive ending offset */
+    val untilOffset: Long) extends Serializable {
+  import OffsetRange.OffsetRangeTuple
+
+  /** this is to avoid ClassNotFoundException during checkpoint restore */
--- End diff --

There was discussion of this earlier that has since gotten buried. Here's the gist:

    https://gist.github.com/koeninger/561a61482cd1b5b3600c

The classloader being used for restoring the checkpoint doesn't have that class, probably because it's in external (and thus included in the user assembly), rather than in one of the Spark jars that's on the default classpath.

I went ahead and duplicated that comment.
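Editor's sketch of the workaround being discussed: before checkpointing, each range is flattened into a tuple of standard-library types, so restoring the checkpoint only needs classes every classloader already has. The names below (CheckpointTupleSketch, OffsetRangeSketch, fromTuple, roundTrip) are hypothetical, and plain Java serialization stands in for Spark's checkpoint mechanism as an assumption of this example.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object CheckpointTupleSketch {
  // Same tuple shape as OffsetRangeTuple in the quoted diff.
  type OffsetRangeTuple = (String, Int, Long, Long)

  // Hypothetical stand-in for OffsetRange. It is deliberately NOT Serializable here,
  // to show that only the tuples are ever written out.
  final class OffsetRangeSketch(
      val topic: String,
      val partition: Int,
      val fromOffset: Long,
      val untilOffset: Long) {
    def toTuple: OffsetRangeTuple = (topic, partition, fromOffset, untilOffset)
  }

  def fromTuple(t: OffsetRangeTuple): OffsetRangeSketch =
    new OffsetRangeSketch(t._1, t._2, t._3, t._4)

  // Write the ranges as tuples, read them back, and rebuild the ranges.
  def roundTrip(ranges: Seq[OffsetRangeSketch]): Seq[OffsetRangeSketch] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(ranges.map(_.toTuple).toArray)
    out.close()

    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[Array[OffsetRangeTuple]].toSeq.map(fromTuple)
    finally in.close()
  }
}
```

The serialized bytes mention only scala.Tuple4 and boxed primitives, so a restoring classloader that cannot see the user-assembly class can still deserialize them.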
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user koeninger commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3798#discussion_r24017456

--- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala ---
@@ -144,4 +150,249 @@ object KafkaUtils {
     createStream[K, V, U, T](
       jssc.ssc, kafkaParams.toMap, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
   }
+
+  /** A batch-oriented interface for consuming from Kafka.
+   * Starting and ending offsets are specified in advance,
+   * so that you can control exactly-once semantics.
+   * @param sc SparkContext object
+   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+   * configuration parameters</a>.
+   *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   * @param batch Each OffsetRange in the batch corresponds to a
+   *   range of offsets for a given Kafka topic/partition
+   */
+  @Experimental
+  def createRDD[
+    K: ClassTag,
+    V: ClassTag,
+    U <: Decoder[_]: ClassTag,
+    T <: Decoder[_]: ClassTag,
+    R: ClassTag] (
--- End diff --

There are 2 methods: one that takes a messageHandler (and thus returns RDD[R]), and one that doesn't take a messageHandler as an argument but provides a default one, and so returns RDD[(K, V)] instead.
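Editor's sketch of the two-overload shape described above, using hypothetical stand-ins (OverloadSketch, Message, readRange) instead of createRDD and MessageAndMetadata so that it runs without Spark or Kafka. One overload takes a messageHandler and returns whatever the handler produces; the other supplies a default handler that yields (key, value) pairs.

```scala
object OverloadSketch {
  // Hypothetical stand-in for kafka.message.MessageAndMetadata.
  final case class Message[K, V](key: K, value: V, offset: Long)

  // Overload 1: the caller chooses the result type R via messageHandler.
  def readRange[K, V, R](messages: Seq[Message[K, V]], messageHandler: Message[K, V] => R): Seq[R] =
    messages.map(messageHandler)

  // Overload 2: no handler argument; a default handler yields (key, value) pairs.
  def readRange[K, V](messages: Seq[Message[K, V]]): Seq[(K, V)] =
    readRange[K, V, (K, V)](messages, (m: Message[K, V]) => (m.key, m.value))
}
```

Usage: `OverloadSketch.readRange(msgs)` gives key/value pairs, while passing a handler such as `(m: OverloadSketch.Message[String, Int]) => m.offset` gives a sequence of offsets instead.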
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user koeninger commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3798#discussion_r24019815

--- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala ---
@@ -144,4 +150,174 @@ object KafkaUtils {
     createStream[K, V, U, T](
       jssc.ssc, kafkaParams.toMap, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
   }
+
+  /** A batch-oriented interface for consuming from Kafka.
+   * Starting and ending offsets are specified in advance,
+   * so that you can control exactly-once semantics.
+   * @param sc SparkContext object
+   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+   * configuration parameters</a>.
+   *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   * @param offsetRanges Each OffsetRange in the batch corresponds to a
+   *   range of offsets for a given Kafka topic/partition
+   */
+  @Experimental
+  def createRDD[
+    K: ClassTag,
+    V: ClassTag,
+    U <: Decoder[_]: ClassTag,
+    T <: Decoder[_]: ClassTag] (
+      sc: SparkContext,
+      kafkaParams: Map[String, String],
+      offsetRanges: Array[OffsetRange]
+  ): RDD[(K, V)] with HasOffsetRanges = {
+    val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)
+    val kc = new KafkaCluster(kafkaParams)
+    val topics = offsetRanges.map(o => TopicAndPartition(o.topic, o.partition)).toSet
+    val leaders = kc.findLeaders(topics).fold(
+      errs => throw new SparkException(errs.mkString("\n")),
+      ok => ok
+    )
+    new KafkaRDD[K, V, U, T, (K, V)](sc, kafkaParams, offsetRanges, leaders, messageHandler)
+  }
+
+  /** A batch-oriented interface for consuming from Kafka.
+   * Starting and ending offsets are specified in advance,
+   * so that you can control exactly-once semantics.
+   * @param sc SparkContext object
+   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+   * configuration parameters</a>.
+   *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   * @param offsetRanges Each OffsetRange in the batch corresponds to a
+   *   range of offsets for a given Kafka topic/partition
+   * @param leaders Kafka leaders for each offset range in batch
+   * @param messageHandler function for translating each message into the desired type
+   */
+  @Experimental
+  def createRDD[
+    K: ClassTag,
+    V: ClassTag,
+    U <: Decoder[_]: ClassTag,
+    T <: Decoder[_]: ClassTag,
+    R: ClassTag] (
+      sc: SparkContext,
+      kafkaParams: Map[String, String],
+      offsetRanges: Array[OffsetRange],
+      leaders: Array[Leader],
+      messageHandler: MessageAndMetadata[K, V] => R
+  ): RDD[R] with HasOffsetRanges = {
+
+    val leaderMap = leaders
+      .map(l => TopicAndPartition(l.topic, l.partition) -> (l.host, l.port))
+      .toMap
+    new KafkaRDD[K, V, U, T, R](sc, kafkaParams, offsetRanges, leaderMap, messageHandler)
+  }
+
+  /**
+   * This stream can guarantee that each message from Kafka is included in transformations
+   * (as opposed to output actions) exactly once, even in most failure situations.
+   *
+   * Points to note:
+   *
+   * Failure Recovery - You must checkpoint this stream, or save offsets yourself and provide them
+   * as the fromOffsets parameter on restart.
+   * Kafka must have sufficient log retention to obtain messages after failure.
+   *
+   * Getting offsets from the stream - see programming guide
+   *
+.  * Zookeeper - This does not use Zookeeper to store offsets. For interop with Kafka monitors
+   * that depend on Zookeeper, you must store offsets in ZK yourself.
+   *
+   * End-to-end semantics - This does not guarantee that any output operation will push each record
+   * exactly once. To ensure end-to-end exactly-once semantics (that is, receiving exactly once and
+   * outputting exactly once), you have to either ensure that the output operation is
+   * idempotent, or transactionally store offsets with the output. See the programming guide for
+   * more details.
+   *
+   * @param ssc StreamingContext object
+   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+   * configuration parameters</a>.
+   *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+   *   NOT zookeeper servers, specified
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user koeninger commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3798#discussion_r24018579

--- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala ---
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import kafka.common.TopicAndPartition
+
+/** Something that has a collection of OffsetRanges */
+trait HasOffsetRanges {
+  def offsetRanges: Array[OffsetRange]
+}
+
+/** Represents a range of offsets from a single Kafka TopicAndPartition */
+final class OffsetRange private(
+    /** kafka topic name */
+    val topic: String,
+    /** kafka partition id */
+    val partition: Int,
+    /** inclusive starting offset */
+    val fromOffset: Long,
+    /** exclusive ending offset */
+    val untilOffset: Long) extends Serializable {
+  import OffsetRange.OffsetRangeTuple
+
+  /** this is to avoid ClassNotFoundException during checkpoint restore */
+  private[streaming]
+  def toTuple: OffsetRangeTuple = (topic, partition, fromOffset, untilOffset)
+}
+
+object OffsetRange {
+  private[spark]
+  type OffsetRangeTuple = (String, Int, Long, Long)
+
+  def create(topic: String, partition: Int, fromOffset: Long, untilOffset: Long): OffsetRange =
--- End diff --

TD thought that a static method named .create was more idiomatic for Java. It's obviously more idiomatic for Scala to have an .apply method, since the syntax sugar for it is baked into the language.
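Editor's sketch of the .create and .apply pairing mentioned above, using a hypothetical FactoryRangeSketch class in place of OffsetRange. The constructor is private, as in the quoted diff, and both factory methods live on the companion object: create reads naturally as a static call from Java, while apply enables the Scala call syntax.

```scala
// Hypothetical stand-in for OffsetRange with a private constructor.
final class FactoryRangeSketch private (
    val topic: String,
    val partition: Int,
    val fromOffset: Long,
    val untilOffset: Long) extends Serializable

object FactoryRangeSketch {
  // Java-friendly named factory; from Java this is FactoryRangeSketch.create(...).
  def create(topic: String, partition: Int, fromOffset: Long, untilOffset: Long): FactoryRangeSketch =
    new FactoryRangeSketch(topic, partition, fromOffset, untilOffset)

  // Scala-friendly factory; enables FactoryRangeSketch("events", 0, 0L, 10L).
  def apply(topic: String, partition: Int, fromOffset: Long, untilOffset: Long): FactoryRangeSketch =
    create(topic, partition, fromOffset, untilOffset)
}
```

Java callers can technically invoke apply as well, since it compiles to an ordinary static forwarder, so the choice between the two names is about what reads idiomatically in each language.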
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/3798#discussion_r24020676
--- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala ---
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import scala.util.control.NonFatal
+import scala.util.Random
+import scala.collection.mutable.ArrayBuffer
+import java.util.Properties
+import kafka.api._
+import kafka.common.{ErrorMapping, OffsetMetadataAndError, TopicAndPartition}
+import kafka.consumer.{ConsumerConfig, SimpleConsumer}
+import org.apache.spark.SparkException
+
+/**
+ * Convenience methods for interacting with a Kafka cluster.
+ * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+ * configuration parameters</a>.
+ *   Requires metadata.broker.list or bootstrap.servers to be set with Kafka broker(s),
+ *   NOT zookeeper servers, specified in host1:port1,host2:port2 form
+ */
+private[spark]
+class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
+  import KafkaCluster.{Err, LeaderOffset}
+
+  val seedBrokers: Array[(String, Int)] =
+    kafkaParams.get("metadata.broker.list")
+      .orElse(kafkaParams.get("bootstrap.servers"))
+      .getOrElse(throw new SparkException("Must specify metadata.broker.list or bootstrap.servers"))
+      .split(",").map { hp =>
+        val hpa = hp.split(":")
+        (hpa(0), hpa(1).toInt)
+      }
+
+  // ConsumerConfig isn't serializable
+  @transient private var _config: ConsumerConfig = null
+
+  def config: ConsumerConfig = this.synchronized {
+    if (_config == null) {
+      _config = KafkaCluster.consumerConfig(kafkaParams)
+    }
+    _config
+  }
+
+  def connect(host: String, port: Int): SimpleConsumer =
+    new SimpleConsumer(host, port, config.socketTimeoutMs,
+      config.socketReceiveBufferBytes, config.clientId)
+
+  def connectLeader(topic: String, partition: Int): Either[Err, SimpleConsumer] =
+    findLeader(topic, partition).right.map(hp => connect(hp._1, hp._2))
+
+  // Metadata api
+  // scalastyle:off
+  // https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-MetadataAPI
+  // scalastyle:on
+
+  def findLeader(topic: String, partition: Int): Either[Err, (String, Int)] = {
+    val req = TopicMetadataRequest(TopicMetadataRequest.CurrentVersion,
+      0, config.clientId, Seq(topic))
+    val errs = new Err
+    withBrokers(Random.shuffle(seedBrokers), errs) { consumer =>
+      val resp: TopicMetadataResponse = consumer.send(req)
+      resp.topicsMetadata.find(_.topic == topic).flatMap { tm: TopicMetadata =>
+        tm.partitionsMetadata.find(_.partitionId == partition)
+      }.foreach { pm: PartitionMetadata =>
+        pm.leader.foreach { leader =>
+          return Right((leader.host, leader.port))
+        }
+      }
+    }
+    Left(errs)
+  }
+
+  def findLeaders(
+      topicAndPartitions: Set[TopicAndPartition]
+    ): Either[Err, Map[TopicAndPartition, (String, Int)]] = {
+    val topics = topicAndPartitions.map(_.topic)
+    val response = getPartitionMetadata(topics).right
+    val answer = response.flatMap { tms: Set[TopicMetadata] =>
+      val leaderMap = tms.flatMap { tm: TopicMetadata =>
+        tm.partitionsMetadata.flatMap { pm: PartitionMetadata =>
+          val tp = TopicAndPartition(tm.topic, pm.partitionId)
+          if (topicAndPartitions(tp)) {
+            pm.leader.map { l =>
+              tp -> (l.host -> l.port)
+            }
+          } else {
+            None
+          }
+        }
+      }.toMap
+
+      if (leaderMap.keys.size == topicAndPartitions.size) {
+        Right(leaderMap)
+      } else {
+        val
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/3798#discussion_r24021621
--- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala ---
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import scala.util.control.NonFatal
+import scala.util.Random
+import scala.collection.mutable.ArrayBuffer
+import java.util.Properties
+import kafka.api._
+import kafka.common.{ErrorMapping, OffsetMetadataAndError, TopicAndPartition}
+import kafka.consumer.{ConsumerConfig, SimpleConsumer}
+import org.apache.spark.SparkException
+
+/**
+ * Convenience methods for interacting with a Kafka cluster.
+ * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+ * configuration parameters</a>.
+ *   Requires metadata.broker.list or bootstrap.servers to be set with Kafka broker(s),
+ *   NOT zookeeper servers, specified in host1:port1,host2:port2 form
+ */
+private[spark]
+class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
+  import KafkaCluster.{Err, LeaderOffset}
+
+  val seedBrokers: Array[(String, Int)] =
+    kafkaParams.get("metadata.broker.list")
+      .orElse(kafkaParams.get("bootstrap.servers"))
+      .getOrElse(throw new SparkException("Must specify metadata.broker.list or bootstrap.servers"))
+      .split(",").map { hp =>
+        val hpa = hp.split(":")
+        (hpa(0), hpa(1).toInt)
+      }
+
+  // ConsumerConfig isn't serializable
+  @transient private var _config: ConsumerConfig = null
+
+  def config: ConsumerConfig = this.synchronized {
+    if (_config == null) {
+      _config = KafkaCluster.consumerConfig(kafkaParams)
+    }
+    _config
+  }
+
+  def connect(host: String, port: Int): SimpleConsumer =
+    new SimpleConsumer(host, port, config.socketTimeoutMs,
+      config.socketReceiveBufferBytes, config.clientId)
+
+  def connectLeader(topic: String, partition: Int): Either[Err, SimpleConsumer] =
+    findLeader(topic, partition).right.map(hp => connect(hp._1, hp._2))
+
+  // Metadata api
+  // scalastyle:off
+  // https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-MetadataAPI
+  // scalastyle:on
+
+  def findLeader(topic: String, partition: Int): Either[Err, (String, Int)] = {
+    val req = TopicMetadataRequest(TopicMetadataRequest.CurrentVersion,
+      0, config.clientId, Seq(topic))
+    val errs = new Err
+    withBrokers(Random.shuffle(seedBrokers), errs) { consumer =>
+      val resp: TopicMetadataResponse = consumer.send(req)
+      resp.topicsMetadata.find(_.topic == topic).flatMap { tm: TopicMetadata =>
+        tm.partitionsMetadata.find(_.partitionId == partition)
+      }.foreach { pm: PartitionMetadata =>
+        pm.leader.foreach { leader =>
+          return Right((leader.host, leader.port))
+        }
+      }
+    }
+    Left(errs)
+  }
+
+  def findLeaders(
+      topicAndPartitions: Set[TopicAndPartition]
+    ): Either[Err, Map[TopicAndPartition, (String, Int)]] = {
+    val topics = topicAndPartitions.map(_.topic)
+    val response = getPartitionMetadata(topics).right
+    val answer = response.flatMap { tms: Set[TopicMetadata] =>
+      val leaderMap = tms.flatMap { tm: TopicMetadata =>
+        tm.partitionsMetadata.flatMap { pm: PartitionMetadata =>
+          val tp = TopicAndPartition(tm.topic, pm.partitionId)
+          if (topicAndPartitions(tp)) {
+            pm.leader.map { l =>
+              tp -> (l.host -> l.port)
+            }
+          } else {
+            None
+          }
+        }
+      }.toMap
+
+      if (leaderMap.keys.size == topicAndPartitions.size) {
+        Right(leaderMap)
+      } else {
+        val
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/3798#discussion_r24020938
--- Diff: external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala ---
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import scala.util.Random
+
+import org.scalatest.BeforeAndAfter
+import kafka.common.TopicAndPartition
+
+class KafkaClusterSuite extends KafkaStreamSuiteBase with BeforeAndAfter {
+  val brokerHost = "localhost"
+
+  val kafkaParams = Map("metadata.broker.list" -> s"$brokerHost:$brokerPort")
+
+  val kc = new KafkaCluster(kafkaParams)
+
+  val topic = "kcsuitetopic" + Random.nextInt(1)
+
+  val topicAndPartition = TopicAndPartition(topic, 0)
+
+  before {
+    setupKafka()
+    createTopic(topic)
+    produceAndSendMessage(topic, Map("a" -> 1))
+  }
+
+  after {
+    tearDownKafka()
+  }
+
+  test("metadata apis") {
+    val leader = kc.findLeaders(Set(topicAndPartition)).right.get
+    assert(leader(topicAndPartition) === (brokerHost, brokerPort), "didn't get leader")
+
+    val parts = kc.getPartitions(Set(topic)).right.get
+    assert(parts(topicAndPartition), "didn't get partitions")
+  }
+
+  test("leader offset apis") {
+    val earliest = kc.getEarliestLeaderOffsets(Set(topicAndPartition)).right.get
+    assert(earliest(topicAndPartition).offset === 0, "didn't get earliest")
+
+    val latest = kc.getLatestLeaderOffsets(Set(topicAndPartition)).right.get
+    assert(latest(topicAndPartition).offset === 1, "didn't get latest")
+  }
+
+  test("consumer offset apis") {
+    val group = "kcsuitegroup" + Random.nextInt(1)
+
+    val offset = Random.nextInt(1)
+
+    val set = kc.setConsumerOffsets(group, Map(topicAndPartition -> offset))
+    assert(set.isRight, "didn't set consumer offsets")
+
--- End diff --

I'm not sure exactly what the question here is, but this test is just verifying that the consumer offset APIs work. They aren't publicly exposed, so the question of how people might misuse them is somewhat premature. That being said, the reason you'd typically want to use this API would be for interop with existing Kafka monitoring tools that expect offsets in ZK.
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/3798#discussion_r24021039
--- Diff: external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala ---
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import scala.util.Random
+
+import kafka.serializer.StringDecoder
+import kafka.common.TopicAndPartition
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark._
+import org.apache.spark.SparkContext._
+
+class KafkaRDDSuite extends KafkaStreamSuiteBase with BeforeAndAfter {
+  var sc: SparkContext = _
+  before {
+    setupKafka()
+  }
+
+  after {
+    if (sc != null) {
+      sc.stop
+      sc = null
+    }
+    tearDownKafka()
+  }
+
+  test("Kafka RDD") {
+    val sparkConf = new SparkConf().setMaster("local[4]").setAppName(this.getClass.getSimpleName)
+    sc = new SparkContext(sparkConf)
+    val topic = "topic1"
+    val sent = Map("a" -> 5, "b" -> 3, "c" -> 10)
+    createTopic(topic)
+    produceAndSendMessage(topic, sent)
+
+    val kafkaParams = Map("metadata.broker.list" -> s"localhost:$brokerPort",
+      "group.id" -> s"test-consumer-${Random.nextInt(1)}")
+
+    val kc = new KafkaCluster(kafkaParams)
+
+    val rdd = getRdd(kc, Set(topic))
+    // this is the lots of messages case
+    // make sure we get all of them
+    assert(rdd.isDefined)
+    assert(rdd.get.count === sent.values.sum)
+
+    kc.setConsumerOffsets(
--- End diff --

See previous answer. Also, there's nothing inherently wrong with keeping offsets in ZK for the idempotent case.
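As a rough illustration of the idempotent case mentioned above: if each output row is keyed by its Kafka coordinates, replaying a batch after a failure overwrites the same rows instead of duplicating them, so where the offsets are stored matters less. Everything in this sketch (Record, IdempotentSink, the in-memory map standing in for a real store) is a hypothetical stand-in, not code from the PR.

```scala
import scala.collection.mutable

// Hypothetical record shape: a message plus the Kafka coordinates it came from.
final case class Record(topic: String, partition: Int, offset: Long, value: String)

// Hypothetical sink: an in-memory map standing in for a key-value store or a
// table with a unique key on (topic, partition, offset).
final class IdempotentSink {
  private val rows = mutable.Map.empty[(String, Int, Long), String]

  /** Upsert by Kafka coordinates, so writing the same record twice has no extra effect. */
  def write(r: Record): Unit = rows((r.topic, r.partition, r.offset)) = r.value

  def size: Int = rows.size
}

object IdempotentDemo {
  /** Writes a batch twice, as a replay after failure would, and returns the row count. */
  def replayTwice(batch: Seq[Record]): Int = {
    val sink = new IdempotentSink
    batch.foreach(sink.write)
    batch.foreach(sink.write) // simulated redelivery of the same offsets
    sink.size
  }
}
```

With a non-idempotent sink (for example, an append-only log), the same replay would double the output, which is the situation where offsets need to be stored transactionally with the results.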
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/3798#discussion_r24030347
--- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala ---
@@ -144,4 +150,174 @@ object KafkaUtils {
     createStream[K, V, U, T](
       jssc.ssc, kafkaParams.toMap, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
   }
+
+  /** A batch-oriented interface for consuming from Kafka.
+   * Starting and ending offsets are specified in advance,
+   * so that you can control exactly-once semantics.
+   * @param sc SparkContext object
+   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+   * configuration parameters</a>.
+   *   Requires metadata.broker.list or bootstrap.servers to be set with Kafka broker(s),
+   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   * @param offsetRanges Each OffsetRange in the batch corresponds to a
+   *   range of offsets for a given Kafka topic/partition
+   */
+  @Experimental
+  def createRDD[
+    K: ClassTag,
+    V: ClassTag,
+    U <: Decoder[_]: ClassTag,
+    T <: Decoder[_]: ClassTag] (
+      sc: SparkContext,
+      kafkaParams: Map[String, String],
+      offsetRanges: Array[OffsetRange]
+  ): RDD[(K, V)] with HasOffsetRanges = {
+    val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)
+    val kc = new KafkaCluster(kafkaParams)
+    val topics = offsetRanges.map(o => TopicAndPartition(o.topic, o.partition)).toSet
+    val leaders = kc.findLeaders(topics).fold(
+      errs => throw new SparkException(errs.mkString("\n")),
+      ok => ok
+    )
+    new KafkaRDD[K, V, U, T, (K, V)](sc, kafkaParams, offsetRanges, leaders, messageHandler)
+  }
+
+  /** A batch-oriented interface for consuming from Kafka.
+   * Starting and ending offsets are specified in advance,
+   * so that you can control exactly-once semantics.
+   * @param sc SparkContext object
+   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+   * configuration parameters</a>.
+   *   Requires metadata.broker.list or bootstrap.servers to be set with Kafka broker(s),
+   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   * @param offsetRanges Each OffsetRange in the batch corresponds to a
+   *   range of offsets for a given Kafka topic/partition
+   * @param leaders Kafka leaders for each offset range in batch
+   * @param messageHandler function for translating each message into the desired type
+   */
+  @Experimental
+  def createRDD[
+    K: ClassTag,
+    V: ClassTag,
+    U <: Decoder[_]: ClassTag,
+    T <: Decoder[_]: ClassTag,
+    R: ClassTag] (
+      sc: SparkContext,
+      kafkaParams: Map[String, String],
+      offsetRanges: Array[OffsetRange],
+      leaders: Array[Leader],
+      messageHandler: MessageAndMetadata[K, V] => R
+  ): RDD[R] with HasOffsetRanges = {
+
+    val leaderMap = leaders
+      .map(l => TopicAndPartition(l.topic, l.partition) -> (l.host, l.port))
+      .toMap
+    new KafkaRDD[K, V, U, T, R](sc, kafkaParams, offsetRanges, leaderMap, messageHandler)
+  }
+
+  /**
+   * This stream can guarantee that each message from Kafka is included in transformations
+   * (as opposed to output actions) exactly once, even in most failure situations.
+   *
+   * Points to note:
+   *
+   * Failure Recovery - You must checkpoint this stream, or save offsets yourself and provide them
+   * as the fromOffsets parameter on restart.
+   * Kafka must have sufficient log retention to obtain messages after failure.
+   *
+   * Getting offsets from the stream - see programming guide
+   *
+.  * Zookeeper - This does not use Zookeeper to store offsets.  For interop with Kafka monitors
+   * that depend on Zookeeper, you must store offsets in ZK yourself.
+   *
+   * End-to-end semantics - This does not guarantee that any output operation will push each record
+   * exactly once. To ensure end-to-end exactly-once semantics (that is, receiving exactly once and
+   * outputting exactly once), you have to either ensure that the output operation is
+   * idempotent, or transactionally store offsets with the output. See the programming guide for
+   * more details.
+   *
+   * @param ssc StreamingContext object
+   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+   * configuration parameters</a>.
+   *   Requires metadata.broker.list or bootstrap.servers to be set with Kafka broker(s),
+   *   NOT zookeeper servers, specified
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/3798#discussion_r24031550
--- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala ---
@@ -144,4 +150,249 @@ object KafkaUtils {
     createStream[K, V, U, T](
       jssc.ssc, kafkaParams.toMap, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
   }
+
+  /** A batch-oriented interface for consuming from Kafka.
+   * Starting and ending offsets are specified in advance,
+   * so that you can control exactly-once semantics.
+   * @param sc SparkContext object
+   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+   * configuration parameters</a>.
+   *   Requires metadata.broker.list or bootstrap.servers to be set with Kafka broker(s),
+   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   * @param batch Each OffsetRange in the batch corresponds to a
+   *   range of offsets for a given Kafka topic/partition
+   */
+  @Experimental
+  def createRDD[
+    K: ClassTag,
+    V: ClassTag,
+    U <: Decoder[_]: ClassTag,
+    T <: Decoder[_]: ClassTag,
+    R: ClassTag] (
--- End diff --

The comment about unused R was referring to a prior version of the PR that had a copy-pasted type-level R parameter even for the version that returned RDD[(K, V)]. GitHub probably just got confused because the comment wasn't attached to the particular line in question. Pretty sure things are correct at this point.

On Tue, Feb 3, 2015 at 1:05 PM, Patrick Wendell notificati...@github.com wrote:

In external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala https://github.com/apache/spark/pull/3798#discussion_r24029463:

+   * so that you can control exactly-once semantics.
+   * @param sc SparkContext object
+   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+   * configuration parameters</a>.
+   *   Requires metadata.broker.list or bootstrap.servers to be set with Kafka broker(s),
+   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   * @param batch Each OffsetRange in the batch corresponds to a
+   *   range of offsets for a given Kafka topic/partition
+   */
+  @Experimental
+  def createRDD[
+    K: ClassTag,
+    V: ClassTag,
+    U <: Decoder[_]: ClassTag,
+    T <: Decoder[_]: ClassTag,
+    R: ClassTag] (

Yeah, makes sense. But the comment here suggests R is not used; however, I see R being used in the return type. So that was my point.

Reply to this email directly or view it on GitHub https://github.com/apache/spark/pull/3798/files#r24029463.
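The role of the R type parameter discussed above can be sketched outside Spark. In this illustration, MessageAndMetadata is a reduced hypothetical stand-in for the Kafka class, and consumeWith / consumePairs are made-up names that mirror the two createRDD overloads: only the variant that accepts a message handler needs R.

```scala
// Hypothetical stand-in for kafka.message.MessageAndMetadata, reduced to a few fields.
final case class MessageAndMetadata[K, V](
    topic: String, partition: Int, offset: Long, key: K, message: V)

object HandlerDemo {
  /** Generic variant: R is required because the handler decides the element type. */
  def consumeWith[K, V, R](
      msgs: Seq[MessageAndMetadata[K, V]],
      messageHandler: MessageAndMetadata[K, V] => R): Seq[R] =
    msgs.map(messageHandler)

  /** Simplified variant: the element type is fixed to (K, V), so an R parameter would be unused. */
  def consumePairs[K, V](msgs: Seq[MessageAndMetadata[K, V]]): Seq[(K, V)] =
    consumeWith[K, V, (K, V)](msgs, (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message))
}
```

A declared but unused R on the simplified variant would still compile, which is presumably how the copy-pasted parameter went unnoticed in the earlier revision.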
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/3798#discussion_r24033509
--- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala ---
@@ -144,4 +150,174 @@ object KafkaUtils {
     createStream[K, V, U, T](
       jssc.ssc, kafkaParams.toMap, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
   }
+
+  /** A batch-oriented interface for consuming from Kafka.
+   * Starting and ending offsets are specified in advance,
+   * so that you can control exactly-once semantics.
+   * @param sc SparkContext object
+   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+   * configuration parameters</a>.
+   *   Requires metadata.broker.list or bootstrap.servers to be set with Kafka broker(s),
+   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   * @param offsetRanges Each OffsetRange in the batch corresponds to a
+   *   range of offsets for a given Kafka topic/partition
+   */
+  @Experimental
+  def createRDD[
+    K: ClassTag,
+    V: ClassTag,
+    U <: Decoder[_]: ClassTag,
+    T <: Decoder[_]: ClassTag] (
+      sc: SparkContext,
+      kafkaParams: Map[String, String],
+      offsetRanges: Array[OffsetRange]
+  ): RDD[(K, V)] with HasOffsetRanges = {
+    val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)
+    val kc = new KafkaCluster(kafkaParams)
+    val topics = offsetRanges.map(o => TopicAndPartition(o.topic, o.partition)).toSet
+    val leaders = kc.findLeaders(topics).fold(
+      errs => throw new SparkException(errs.mkString("\n")),
+      ok => ok
+    )
+    new KafkaRDD[K, V, U, T, (K, V)](sc, kafkaParams, offsetRanges, leaders, messageHandler)
+  }
+
+  /** A batch-oriented interface for consuming from Kafka.
+   * Starting and ending offsets are specified in advance,
+   * so that you can control exactly-once semantics.
+   * @param sc SparkContext object
+   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+   * configuration parameters</a>.
+   *   Requires metadata.broker.list or bootstrap.servers to be set with Kafka broker(s),
+   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   * @param offsetRanges Each OffsetRange in the batch corresponds to a
+   *   range of offsets for a given Kafka topic/partition
+   * @param leaders Kafka leaders for each offset range in batch
+   * @param messageHandler function for translating each message into the desired type
+   */
+  @Experimental
+  def createRDD[
+    K: ClassTag,
+    V: ClassTag,
+    U <: Decoder[_]: ClassTag,
+    T <: Decoder[_]: ClassTag,
+    R: ClassTag] (
+      sc: SparkContext,
+      kafkaParams: Map[String, String],
+      offsetRanges: Array[OffsetRange],
+      leaders: Array[Leader],
+      messageHandler: MessageAndMetadata[K, V] => R
+  ): RDD[R] with HasOffsetRanges = {
+
+    val leaderMap = leaders
+      .map(l => TopicAndPartition(l.topic, l.partition) -> (l.host, l.port))
+      .toMap
+    new KafkaRDD[K, V, U, T, R](sc, kafkaParams, offsetRanges, leaderMap, messageHandler)
+  }
+
+  /**
+   * This stream can guarantee that each message from Kafka is included in transformations
--- End diff --

The documentation has already been changed several times since your previous comments. The current version of it doesn't make any comparison to existing createStream calls.
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/3798#discussion_r24031208
--- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala ---
@@ -144,4 +150,174 @@ object KafkaUtils {
     createStream[K, V, U, T](
       jssc.ssc, kafkaParams.toMap, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
   }
+
+  /** A batch-oriented interface for consuming from Kafka.
+   * Starting and ending offsets are specified in advance,
+   * so that you can control exactly-once semantics.
+   * @param sc SparkContext object
+   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+   * configuration parameters</a>.
+   *   Requires metadata.broker.list or bootstrap.servers to be set with Kafka broker(s),
+   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   * @param offsetRanges Each OffsetRange in the batch corresponds to a
+   *   range of offsets for a given Kafka topic/partition
+   */
+  @Experimental
+  def createRDD[
+    K: ClassTag,
+    V: ClassTag,
+    U <: Decoder[_]: ClassTag,
+    T <: Decoder[_]: ClassTag] (
+      sc: SparkContext,
+      kafkaParams: Map[String, String],
+      offsetRanges: Array[OffsetRange]
+  ): RDD[(K, V)] with HasOffsetRanges = {
+    val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)
+    val kc = new KafkaCluster(kafkaParams)
+    val topics = offsetRanges.map(o => TopicAndPartition(o.topic, o.partition)).toSet
+    val leaders = kc.findLeaders(topics).fold(
+      errs => throw new SparkException(errs.mkString("\n")),
+      ok => ok
+    )
+    new KafkaRDD[K, V, U, T, (K, V)](sc, kafkaParams, offsetRanges, leaders, messageHandler)
+  }
+
+  /** A batch-oriented interface for consuming from Kafka.
+   * Starting and ending offsets are specified in advance,
+   * so that you can control exactly-once semantics.
+   * @param sc SparkContext object
+   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+   * configuration parameters</a>.
+   *   Requires metadata.broker.list or bootstrap.servers to be set with Kafka broker(s),
+   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   * @param offsetRanges Each OffsetRange in the batch corresponds to a
+   *   range of offsets for a given Kafka topic/partition
+   * @param leaders Kafka leaders for each offset range in batch
+   * @param messageHandler function for translating each message into the desired type
+   */
+  @Experimental
+  def createRDD[
+    K: ClassTag,
+    V: ClassTag,
+    U <: Decoder[_]: ClassTag,
+    T <: Decoder[_]: ClassTag,
+    R: ClassTag] (
+      sc: SparkContext,
+      kafkaParams: Map[String, String],
+      offsetRanges: Array[OffsetRange],
+      leaders: Array[Leader],
+      messageHandler: MessageAndMetadata[K, V] => R
+  ): RDD[R] with HasOffsetRanges = {
+
+    val leaderMap = leaders
+      .map(l => TopicAndPartition(l.topic, l.partition) -> (l.host, l.port))
+      .toMap
+    new KafkaRDD[K, V, U, T, R](sc, kafkaParams, offsetRanges, leaderMap, messageHandler)
+  }
+
+  /**
+   * This stream can guarantee that each message from Kafka is included in transformations
+   * (as opposed to output actions) exactly once, even in most failure situations.
+   *
+   * Points to note:
+   *
+   * Failure Recovery - You must checkpoint this stream, or save offsets yourself and provide them
+   * as the fromOffsets parameter on restart.
+   * Kafka must have sufficient log retention to obtain messages after failure.
+   *
+   * Getting offsets from the stream - see programming guide
+   *
+.  * Zookeeper - This does not use Zookeeper to store offsets.  For interop with Kafka monitors
+   * that depend on Zookeeper, you must store offsets in ZK yourself.
+   *
+   * End-to-end semantics - This does not guarantee that any output operation will push each record
+   * exactly once. To ensure end-to-end exactly-once semantics (that is, receiving exactly once and
+   * outputting exactly once), you have to either ensure that the output operation is
+   * idempotent, or transactionally store offsets with the output. See the programming guide for
+   * more details.
+   *
+   * @param ssc StreamingContext object
+   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+   * configuration parameters</a>.
+   *   Requires metadata.broker.list or bootstrap.servers to be set with Kafka broker(s),
+   *   NOT zookeeper servers, specified
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user koeninger commented on the pull request: https://github.com/apache/spark/pull/3798#issuecomment-72758821

Besides introducing 2 classes where 1 would do, it implies that there are (or could be) multiple implementations of the abstract class. You're not using it because you're actually planning for subclassing; you're using it as a workaround for returning a slightly less complicated type from a single method, where there's an alternative... just return RDD[(K, V)] for that one method. This really is a situation where there's only 1 implementation for the foreseeable future, and a single final concrete class would be cleaner.

On Tue, Feb 3, 2015 at 5:21 PM, Tathagata Das notificati...@github.com wrote:

I don't get it, what's the complication with abstract classes?

Reply to this email directly or view it on GitHub https://github.com/apache/spark/pull/3798#issuecomment-72758029.
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user koeninger commented on the pull request: https://github.com/apache/spark/pull/3798#issuecomment-72760900

To put it another way, the type you return has to be public. If you return a public abstract class, what are you going to do when someone else subclasses it? Making it a final concrete class doesn't have that issue.

On Feb 3, 2015 5:34 PM, Tathagata Das notificati...@github.com wrote:

But of course there can be multiple implementations! For example, there is both KafkaReceiver and ReliableKafkaReceiver. The second was introduced so that the code path for existing uses is not disturbed when we are introducing experimental code paths that are optionally enabled with flags. We never envisioned that happening, but when it occurred, we could do this because the KafkaReceiver was not exposed; only the Receiver interface was exposed.

Reply to this email directly or view it on GitHub https://github.com/apache/spark/pull/3798#issuecomment-72759859.
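The trade-off argued above can be sketched with two toy types. All names here are hypothetical and unrelated to the actual KafkaRDD; the sketch only shows that a public abstract class can be extended by anyone, while a final class with a private constructor can be neither constructed nor subclassed outside its companion.

```scala
// Hypothetical names throughout; this only illustrates the API-design point.

// Option A: a public abstract class. Code outside the library can extend it,
// so its overridable members become part of the public contract.
abstract class AbstractResult {
  def offsets: Seq[Long]
}

// Option B: a final concrete class with a private constructor. Callers can hold
// and use the type, but cannot construct or subclass it.
final class ConcreteResult private (val offsets: Seq[Long])

object ConcreteResult {
  /** The only way to obtain an instance. */
  def create(offsets: Seq[Long]): ConcreteResult = new ConcreteResult(offsets)
}

object SubclassDemo {
  // Compiles: third-party code supplying its own implementation of the abstract type.
  final class ThirdPartyResult extends AbstractResult {
    def offsets: Seq[Long] = Seq(-1L)
  }
  // Would not compile: class Nope extends ConcreteResult(Seq.empty)
}
```

If a second implementation is ever needed, Option B can still be relaxed later, whereas tightening Option A after release would break any external subclasses.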
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user koeninger commented on the pull request: https://github.com/apache/spark/pull/3798#issuecomment-72754442

Like Patrick said, I really don't see any reason not to just expose KafkaRDD. You can still hide its constructor without making a superfluous abstract class, and you can still make another subclass of KafkaRDD later if you need to. Even if you don't want the static createRDD method to return a KafkaRDD, we can just take the with HasOffsetRanges off, and people who care about getting to the offsets can cast it (they'll have to cast it for the stream case anyway).

On Tue, Feb 3, 2015 at 4:30 PM, Tathagata Das notificati...@github.com wrote:

@koeninger https://github.com/koeninger Thank you very much for all the changes. They are looking good. Unfortunately the issue with createRDD returning RDD[] with OffsetRange (i.e., the issue that @pwendell https://github.com/pwendell raised) could be a problem in the future in terms of binary compatibility. Basically, we have not used such things in the rest of Spark to keep things as Java-friendly and binary compatible as possible. Also in the generated Java doc this looks scary. So here is an alternate suggestion that I am trying to implement on top of your PR branch.

How about this. We effectively combine KafkaRDD and HasOffsetRanges into an abstract class.

abstract class KafkaRDD[T] (val offsetRanges: Array[OffsetRanges], sc: SparkContext) extends RDD[T](sc, Nil)

private[kafka] class KafkaRDDImpl[K, V, KD, VD, R] extends KafkaRDD[R] { ... }

KafkaUtils.createRDD(...simple one without messageHandler...): KafkaRDD[(K, V)] = {
  // return KafkaRDDImpl[K, V, KD, VD, (K, V)]
}

KafkaUtils.createRDD(...simple one WITH messageHandler...): KafkaRDD[R] = {
  // return KafkaRDDImpl[K, V, KD, VD, R]
}

Advantages
- No binary compatibility issues
- Easy to read from Java
- KafkaRDD implementation and constructor all hidden as before
- Can still extend KafkaRDD to expose more methods in future.

What do you think?

Reply to this email directly or view it on GitHub https://github.com/apache/spark/pull/3798#issuecomment-72749763.
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user koeninger commented on the pull request: https://github.com/apache/spark/pull/3798#issuecomment-72757731

Just make the simplified createRDD return a static type of RDD[(K, V)], that's what I'm saying. You're already going to have to deal with those other type parameters in Java if you want to call a more complex version of createRDD, because you have to know about the serializers and message handler. Make the more complex version return KafkaRDD. I agree that casting to KafkaRDD is better than casting to HasOffsetRanges. The abstract class is an unnecessary complication though.

On Tue, Feb 3, 2015 at 5:11 PM, Tathagata Das notificati...@github.com wrote:

I spent some time talking to Patrick offline about this. If we expose the KafkaRDD as is (while keeping its constructor private), then the simplified createRDD would be

    KafkaUtils.createRDD[K, V, KD, VD](): KafkaRDD[K, V, KD, VD, (K, V)]

Imagine how one would use it in Java.

    KafkaRDD<String, String, StringDecoder, StringDecoder, Product2<String, String>> rdd = KafkaUtils.createRDD()

That's not very Java-friendly if you ask a Java developer. And we have a huge fraction of the community as Java developers. Furthermore, we want to add a Python API as well, and that also requires the interfaces to be Java-friendly. Here is the alternative (I think) with what I proposed.

    KafkaRDD<String, String> rdd = KafkaUtils.createRDD()

Much simpler. Regarding casting, there are two cases:

1. Casting the RDD generated from createRDD - If we take off HasOffsetRanges (KafkaUtils.createRDD returns only RDD), then users have to cast. But if we return the abstract class KafkaRDD, then no casting is necessary.
2. Casting the RDD in DStream.foreachRDD - Casting is necessary either way. But isn't it more intuitive to write rdd.asInstanceOf[KafkaRDD] than rdd.asInstanceOf[HasOffsetRanges]?
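The abstract-class proposal quoted in this message can be sketched without Spark on the classpath. This is only an illustration under stated assumptions: `MiniRDD` is a hypothetical stand-in for Spark's RDD, `OffsetRange` is a simplified case class, and `KafkaRDDImpl` fabricates its data instead of reading from Kafka. It shows the shape being debated: a public abstract type that exposes offsets, with the implementation and its constructor hidden.

```scala
object AbstractClassSketch {
  // Hypothetical stand-ins; none of these are the real Spark or Kafka classes.
  final case class OffsetRange(topic: String, partition: Int, fromOffset: Long, untilOffset: Long)

  // Stand-in for org.apache.spark.rdd.RDD, reduced to what the sketch needs.
  abstract class MiniRDD[T] {
    def collect(): Seq[T]
  }

  // Public abstract type: callers see offsetRanges without any cast.
  abstract class KafkaRDD[T](val offsetRanges: Array[OffsetRange]) extends MiniRDD[T]

  // Hidden implementation; the extra type parameters of the real class would live here.
  private final class KafkaRDDImpl[T](ranges: Array[OffsetRange], data: Seq[T])
    extends KafkaRDD[T](ranges) {
    def collect(): Seq[T] = data
  }

  // Factory in the role of KafkaUtils.createRDD; the data is made up for the sketch.
  def createRDD(ranges: Array[OffsetRange]): KafkaRDD[(String, String)] = {
    val data = ranges.toSeq.map(r => (r.topic, s"${r.fromOffset}-${r.untilOffset}"))
    new KafkaRDDImpl[(String, String)](ranges, data)
  }
}
```

The trade-off raised in the reply still applies: a public base class that others may subclass has to be kept stable, which is the maintenance cost the abstract class brings with it.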
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user koeninger commented on the pull request: https://github.com/apache/spark/pull/3798#issuecomment-72773257

To be clear, I'm ok with any solution that gives me access to what I need, which in this case are offsets. What's coming across as me feeling strongly about it is, I think, really because I'm not clear on what your principles with regard to the interface are ... well, that and frustration because it's been changed 3 times already :)

For instance, why would you be willing to take on the fragile base class maintenance problem in exposing KafkaRDD as something that could be subclassed... but not ok with exposing the DStream (so that people could override the batch generation policy)?

In the interests of moving this forward, if we're really just talking about changing KafkaUtils' use of RDD[..] with HasOffsetRanges to RDD[..], I can make that change.

On Tue, Feb 3, 2015 at 7:02 PM, Tathagata Das notificati...@github.com wrote:

Okay, here are the two options.

1. createRDD returns RDD[(K,V)] or RDD[R], and DStream.foreachRDD uses rdd.asInstanceOf[HasOffsetRanges]
2. createRDD returns KafkaRDD[(K,V)] or KafkaRDD[R], and DStream.foreachRDD uses rdd.asInstanceOf[KafkaRDD[_]]

I think I am okay with either one. Stepping back, my original concern was returning something that had no binary compatibility issues. Both solutions suffice. Between these two, since you feel so strongly against (2), let's go with (1).
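Option (1) from this exchange can be sketched as follows. It is a minimal illustration, not the PR's code: `MiniRDD`, `OffsetRange`, and `KafkaBackedRDD` are hypothetical stand-ins, and only the name `HasOffsetRanges` and the cast come from the thread. The factory's static return type is the plain RDD type, and callers who need offsets cast to the trait.

```scala
object CastingSketch {
  // Hypothetical stand-ins for the types named in the thread.
  final case class OffsetRange(topic: String, partition: Int, fromOffset: Long, untilOffset: Long)

  trait MiniRDD[T] {
    def collect(): Seq[T]
  }

  trait HasOffsetRanges {
    def offsetRanges: Array[OffsetRange]
  }

  // Hidden concrete class that mixes in the offsets trait.
  private final class KafkaBackedRDD[T](val offsetRanges: Array[OffsetRange], data: Seq[T])
    extends MiniRDD[T] with HasOffsetRanges {
    def collect(): Seq[T] = data
  }

  // Option (1): the signature mentions only the plain RDD type.
  def createRDD(ranges: Array[OffsetRange]): MiniRDD[(String, String)] = {
    val data = ranges.toSeq.map(r => (r.topic, r.partition.toString))
    new KafkaBackedRDD[(String, String)](ranges, data)
  }

  // Callers that care about offsets cast, as they would inside DStream.foreachRDD.
  def offsetsOf(rdd: MiniRDD[_]): Array[OffsetRange] =
    rdd.asInstanceOf[HasOffsetRanges].offsetRanges
}
```

The cast fails at runtime with a ClassCastException if the RDD did not come from the Kafka factory, which is the price of keeping the intersection type out of the public signature.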
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user koeninger commented on the pull request: https://github.com/apache/spark/pull/3798#issuecomment-72775631

Hey man, I'd rather talk about the code anyway. I think there's just something I'm missing as far as your underlying assumptions about interfaces go :) Thanks for your help on this.

Just made the createRDD change. Not clear on what createNewStream change you mean. Rename it to createStream, or something else?
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/3798#discussion_r23974964

--- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DeterministicKafkaInputDStream.scala ---
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+
+import scala.annotation.tailrec
+import scala.collection.mutable
+import scala.reflect.{classTag, ClassTag}
+
+import kafka.common.TopicAndPartition
+import kafka.message.MessageAndMetadata
+import kafka.serializer.Decoder
+
+import org.apache.spark.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset
+import org.apache.spark.streaming.{StreamingContext, Time}
+import org.apache.spark.streaming.dstream._
+
+/** A stream of {@link org.apache.spark.rdd.kafka.KafkaRDD} where
+ * each given Kafka topic/partition corresponds to an RDD partition.
+ * The spark configuration spark.streaming.receiver.maxRate gives the maximum number of messages
+ * per second that each '''partition''' will accept.
+ * Starting offsets are specified in advance,
+ * and this DStream is not responsible for committing offsets,
+ * so that you can control exactly-once semantics.
+ * For an easy interface to Kafka-managed offsets,
+ *  see {@link org.apache.spark.rdd.kafka.KafkaCluster}
+ * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+ * configuration parameters</a>.
+ *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+ *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+ * @param fromOffsets per-topic/partition Kafka offsets defining the (inclusive)
+ *  starting point of the stream
+ * @param messageHandler function for translating each message into the desired type
+ * @param maxRetries maximum number of times in a row to retry getting leaders' offsets
+ */
+private[streaming]
+class DeterministicKafkaInputDStream[
+  K: ClassTag,
+  V: ClassTag,
+  U <: Decoder[_]: ClassTag,
+  T <: Decoder[_]: ClassTag,
+  R: ClassTag](
+    @transient ssc_ : StreamingContext,
+    val kafkaParams: Map[String, String],
+    val fromOffsets: Map[TopicAndPartition, Long],
+    messageHandler: MessageAndMetadata[K, V] => R,
+    maxRetries: Int
+) extends InputDStream[R](ssc_) with Logging {
+
+  protected[streaming] override val checkpointData =
+    new DeterministicKafkaInputDStreamCheckpointData
+
+  protected val kc = new KafkaCluster(kafkaParams)
+
+  protected val maxMessagesPerPartition: Option[Long] = {
+    val ratePerSec = context.sparkContext.getConf.getInt("spark.streaming.receiver.maxRate", 0)
+    if (ratePerSec > 0) {
+      val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000
+      Some((secsPerBatch * ratePerSec).toLong)
+    } else {
+      None
+    }
+  }
+
+  protected var currentOffsets = fromOffsets
+
+  @tailrec
+  protected final def latestLeaderOffsets(retries: Int): Map[TopicAndPartition, LeaderOffset] = {
--- End diff --

All of the methods in KafkaCluster are currently based on the idea of trying (at most) all of the brokers, then giving up and letting the caller establish an error handling policy. Sleeping and retrying may not in general be the correct error handling policy. I know it is for the input DStream's usage right here, but that doesn't make sense to bake into KafkaCluster.
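The division of responsibility described in this comment can be sketched as below. This is an illustration under assumptions rather than the PR's code: `firstSuccess` and `withRetries` are hypothetical names, brokers are plain strings, and `Err` is modelled as a vector of throwables. The cluster-level call tries each broker at most once and returns the collected errors as a `Left`; sleeping and retrying belongs to the caller.

```scala
import scala.annotation.tailrec
import scala.util.{Failure, Success, Try}

object RetryPolicySketch {
  // Assumption: errors are accumulated as values rather than thrown.
  type Err = Vector[Throwable]

  // Cluster-side helper: try each broker at most once, then give up with all errors.
  def firstSuccess[A](brokers: Seq[String])(attempt: String => A): Either[Err, A] = {
    @tailrec
    def loop(remaining: List[String], errs: Err): Either[Err, A] = remaining match {
      case Nil => Left(errs)
      case broker :: rest =>
        Try(attempt(broker)) match {
          case Success(a) => Right(a)
          case Failure(e) => loop(rest, errs :+ e)
        }
    }
    loop(brokers.toList, Vector.empty)
  }

  // Caller-side policy: retry up to `retries` more times, sleeping between attempts.
  @tailrec
  def withRetries[A](retries: Int, sleepMs: Long)(call: () => Either[Err, A]): Either[Err, A] =
    call() match {
      case ok @ Right(_) => ok
      case Left(_) if retries > 0 =>
        Thread.sleep(sleepMs)
        withRetries(retries - 1, sleepMs)(call)
      case failed => failed
    }
}
```

A different caller could map the `Left` to an exception or fail fast instead, without any change to the cluster-side helper.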
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/3798#discussion_r23972944

--- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala ---
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import scala.util.control.NonFatal
+import scala.util.Random
+import scala.collection.mutable.ArrayBuffer
+import java.util.Properties
+import kafka.api._
+import kafka.common.{ErrorMapping, OffsetMetadataAndError, TopicAndPartition}
+import kafka.consumer.{ConsumerConfig, SimpleConsumer}
+
+/**
+ * Convenience methods for interacting with a Kafka cluster.
+ * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+ * configuration parameters</a>.
+ *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+ *   NOT zookeeper servers, specified in host1:port1,host2:port2 form
+ */
+private[spark]
+class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
+  import KafkaCluster.{Err, LeaderOffset}
+
+  val seedBrokers: Array[(String, Int)] =
+    kafkaParams.get("metadata.broker.list")
+      .orElse(kafkaParams.get("bootstrap.servers"))
+      .getOrElse(throw new Exception("Must specify metadata.broker.list or bootstrap.servers"))
+      .split(",").map { hp =>
+        val hpa = hp.split(":")
+        (hpa(0), hpa(1).toInt)
+      }
+
+  // ConsumerConfig isn't serializable
+  @transient private var _config: ConsumerConfig = null
+
+  def config: ConsumerConfig = this.synchronized {
+    if (_config == null) {
+      _config = KafkaCluster.consumerConfig(kafkaParams)
+    }
+    _config
+  }
+
+  def connect(host: String, port: Int): SimpleConsumer =
+    new SimpleConsumer(host, port, config.socketTimeoutMs,
+      config.socketReceiveBufferBytes, config.clientId)
+
+  def connectLeader(topic: String, partition: Int): Either[Err, SimpleConsumer] =
+    findLeader(topic, partition).right.map(hp => connect(hp._1, hp._2))
+
+  // Metadata api
+  // scalastyle:off
+  // https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-MetadataAPI
+  // scalastyle:on
+
+  def findLeader(topic: String, partition: Int): Either[Err, (String, Int)] = {
+    val req = TopicMetadataRequest(TopicMetadataRequest.CurrentVersion,
+      0, config.clientId, Seq(topic))
+    val errs = new Err
+    withBrokers(Random.shuffle(seedBrokers), errs) { consumer =>
+      val resp: TopicMetadataResponse = consumer.send(req)
+      resp.topicsMetadata.find(_.topic == topic).flatMap { tm: TopicMetadata =>
+        tm.partitionsMetadata.find(_.partitionId == partition)
+      }.foreach { pm: PartitionMetadata =>
+        pm.leader.foreach { leader =>
+          return Right((leader.host, leader.port))
+        }
+      }
+    }
+    Left(errs)
+  }
+
+  def findLeaders(
+    topicAndPartitions: Set[TopicAndPartition]
+  ): Either[Err, Map[TopicAndPartition, (String, Int)]] = {
+    val topics = topicAndPartitions.map(_.topic)
+    getPartitionMetadata(topics).right.flatMap { tms: Set[TopicMetadata] =>
+      val result = tms.flatMap { tm: TopicMetadata =>
--- End diff --

By loop do you mean the findLeaders method? Yeah, it's returning the leaders as a map of TopicAndPartition to (host, port).

The nesting directly maps to the nesting of the Kafka API's return values... pretty much all of the nesting in this class is just grabbing stuff out of Kafka data structures.
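The point about nesting can be illustrated with simplified data shapes. The case classes below are hypothetical mirrors of the Kafka metadata types named in the diff (topic metadata containing partition metadata containing an optional leader), and `Err` is reduced to a vector of strings. This is a sketch of the flattening idea, not the body of the real findLeaders, which is cut off in the quoted diff.

```scala
object LeaderLookupSketch {
  // Hypothetical, simplified mirrors of Kafka's nested metadata shapes.
  final case class Broker(host: String, port: Int)
  final case class PartitionMetadata(partitionId: Int, leader: Option[Broker])
  final case class TopicMetadata(topic: String, partitionsMetadata: Seq[PartitionMetadata])
  final case class TopicAndPartition(topic: String, partition: Int)

  type Err = Vector[String]

  def findLeaders(
      wanted: Set[TopicAndPartition],
      metadata: Set[TopicMetadata]): Either[Err, Map[TopicAndPartition, (String, Int)]] = {
    // One flatMap per level of nesting: topic, then partition, then optional leader.
    val found: Map[TopicAndPartition, (String, Int)] = metadata.toSeq.flatMap { tm =>
      tm.partitionsMetadata.flatMap { pm =>
        val tp = TopicAndPartition(tm.topic, pm.partitionId)
        if (wanted(tp)) pm.leader.map(l => (tp, (l.host, l.port))) else None
      }
    }.toMap

    // Report every requested partition that has no known leader.
    val missing = wanted.diff(found.keySet)
    if (missing.isEmpty) Right(found)
    else Left(missing.toVector.map(tp => s"no leader for ${tp.topic}-${tp.partition}"))
  }
}
```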
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/3798#discussion_r23972610

--- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DeterministicKafkaInputDStream.scala ---
@@ -0,0 +1,150 @@
[...]
+  protected var currentOffsets = fromOffsets
+
+  @tailrec
+  protected final def latestLeaderOffsets(retries: Int): Map[TopicAndPartition, LeaderOffset] = {
--- End diff --

I don't want KafkaCluster throwing exceptions though.

On Mon, Feb 2, 2015 at 6:12 PM, Tathagata Das notificati...@github.com wrote:

In external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DeterministicKafkaInputDStream.scala https://github.com/apache/spark/pull/3798#discussion_r23971976:

+  protected val kc = new KafkaCluster(kafkaParams)
+
+  protected val maxMessagesPerPartition: Option[Long] = {
+    val ratePerSec = context.sparkContext.getConf.getInt("spark.streaming.receiver.maxRate", 0)
+    if (ratePerSec > 0) {
+      val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000
+      Some((secsPerBatch * ratePerSec).toLong
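The rate-limit arithmetic in the quoted snippet can be checked in isolation. The first function below repeats that calculation with the configuration value and batch duration passed in as plain arguments. The second, `clampUntilOffset`, is a hypothetical helper that is not in the quoted code; it only shows one way such a limit might cap the ending offset of a batch.

```scala
object RateLimitSketch {
  // Same arithmetic as the quoted diff: a per-partition rate in messages per second,
  // scaled by the batch duration. A non-positive rate means "no limit".
  def maxMessagesPerPartition(ratePerSec: Int, batchDurationMs: Long): Option[Long] =
    if (ratePerSec > 0) {
      val secsPerBatch = batchDurationMs.toDouble / 1000
      Some((secsPerBatch * ratePerSec).toLong)
    } else {
      None
    }

  // Hypothetical helper: never read past the leader's latest offset,
  // and never read more than the limit allows in one batch.
  def clampUntilOffset(currentOffset: Long, leaderOffset: Long, limit: Option[Long]): Long =
    limit.map(max => math.min(currentOffset + max, leaderOffset)).getOrElse(leaderOffset)
}
```

For example, a rate of 1000 messages per second with a 2000 ms batch allows 2000 messages per partition per batch.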
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/3798#discussion_r23904918

--- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala ---
@@ -144,4 +150,249 @@ object KafkaUtils {
     createStream[K, V, U, T](
       jssc.ssc, kafkaParams.toMap, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
   }
+
+  /** A batch-oriented interface for consuming from Kafka.
+   * Starting and ending offsets are specified in advance,
+   * so that you can control exactly-once semantics.
+   * @param sc SparkContext object
+   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+   * configuration parameters</a>.
+   *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   * @param batch Each OffsetRange in the batch corresponds to a
+   *   range of offsets for a given Kafka topic/partition
+   */
+  @Experimental
+  def createRDD[
+    K: ClassTag,
+    V: ClassTag,
+    U <: Decoder[_]: ClassTag,
+    T <: Decoder[_]: ClassTag,
+    R: ClassTag] (
+      sc: SparkContext,
+      kafkaParams: Map[String, String],
+      batch: Array[OffsetRange]
+  ): RDD[(K, V)] with HasOffsetRanges = {
+    val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)
+    val kc = new KafkaCluster(kafkaParams)
+    val topics = batch.map(o => TopicAndPartition(o.topic, o.partition)).toSet
+    val leaderMap = kc.findLeaders(topics).fold(
+      errs => throw new SparkException(errs.mkString("\n")),
+      ok => ok
+    )
+    val rddParts = batch.zipWithIndex.map { case (o, i) =>
+        val tp = TopicAndPartition(o.topic, o.partition)
+        val (host, port) = leaderMap(tp)
+        new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, o.untilOffset, host, port)
+    }.toArray
+    new KafkaRDD[K, V, U, T, (K, V)](sc, kafkaParams, rddParts, messageHandler)
+  }
+
+  /** A batch-oriented interface for consuming from Kafka.
+   * Starting and ending offsets are specified in advance,
+   * so that you can control exactly-once semantics.
+   * @param sc SparkContext object
+   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+   * configuration parameters</a>.
+   *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   * @param batch Each OffsetRange in the batch corresponds to a
+   *   range of offsets for a given Kafka topic/partition
+   * @param leaders Kafka leaders for each offset range in batch
+   * @param messageHandler function for translating each message into the desired type
+   */
+  @Experimental
+  def createRDD[
+    K: ClassTag,
+    V: ClassTag,
+    U <: Decoder[_]: ClassTag,
+    T <: Decoder[_]: ClassTag,
+    R: ClassTag] (
+      sc: SparkContext,
+      kafkaParams: Map[String, String],
+      batch: Array[OffsetRange],
+      leaders: Array[Leader],
+      messageHandler: MessageAndMetadata[K, V] => R
+  ): RDD[R] with HasOffsetRanges = {
+    val leaderMap = leaders.map(l => (l.topic, l.partition) -> (l.host, l.port)).toMap
+    val rddParts = batch.zipWithIndex.map { case (o, i) =>
+        val (host, port) = leaderMap((o.topic, o.partition))
+        new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, o.untilOffset, host, port)
+    }.toArray
+
+    new KafkaRDD[K, V, U, T, R](sc, kafkaParams, rddParts, messageHandler)
+  }
+
+  /**
+   * This stream can guarantee that each message from Kafka is included in transformations
+   * (as opposed to output actions) exactly once, even in most failure situations.
+   *
+   * Points to note:
+   *
+   * Failure Recovery - You must checkpoint this stream, or save offsets yourself and provide them
+   * as the fromOffsets parameter on restart.
+   * Kafka must have sufficient log retention to obtain messages after failure.
+   *
+   * Getting offsets from the stream - see programming guide
+   *
+.  * Zookeeper - This does not use Zookeeper to store offsets. For interop with Kafka monitors
+   * that depend on Zookeeper, you must store offsets in ZK yourself.
+   *
+   * End-to-end semantics - This does not guarantee that any output operation will push each record
+   * exactly once. To ensure end-to-end exactly-once semantics (that is, receiving exactly once and
+   * outputting exactly once), you have to either ensure that the output operation
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/3798#discussion_r23904616

--- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala ---
@@ -144,4 +150,249 @@ object KafkaUtils {
[...]
+  /**
+   * This stream can guarantee that each message from Kafka is included in transformations
+   * (as opposed to output actions) exactly once, even in most failure situations.
+   *
+   * Points to note:
+   *
+   * Failure Recovery - You must checkpoint this stream, or save offsets yourself and provide them
+   * as the fromOffsets parameter on restart.
+   * Kafka must have sufficient log retention to obtain messages after failure.
+   *
+   * Getting offsets from the stream - see programming guide
+   *
+.  * Zookeeper - This does not use Zookeeper to store offsets. For interop with Kafka monitors
+   * that depend on Zookeeper, you must store offsets in ZK yourself.
+   *
+   * End-to-end semantics - This does not guarantee that any output operation will push each record
+   * exactly once. To ensure end-to-end exactly-once semantics (that is, receiving exactly once and
+   * outputting exactly once), you have to either ensure that the output operation
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/3798#discussion_r23889820

--- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala ---
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import scala.reflect.{classTag, ClassTag}
+
+import org.apache.spark.{Logging, Partition, SparkContext, SparkException, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.util.NextIterator
+
+import java.util.Properties
+import kafka.api.{FetchRequestBuilder, FetchResponse}
+import kafka.common.{ErrorMapping, TopicAndPartition}
+import kafka.consumer.{ConsumerConfig, SimpleConsumer}
+import kafka.message.{MessageAndMetadata, MessageAndOffset}
+import kafka.serializer.Decoder
+import kafka.utils.VerifiableProperties
+
+/**
+ * A batch-oriented interface for consuming from Kafka.
+ * Starting and ending offsets are specified in advance,
+ * so that you can control exactly-once semantics.
+ * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+ * configuration parameters</a>.
+ *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+ *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+ * @param batch Each KafkaRDDPartition in the batch corresponds to a
+ *   range of offsets for a given Kafka topic/partition
+ * @param messageHandler function for translating each message into the desired type
+ */
+private[spark]
+class KafkaRDD[
+  K: ClassTag,
+  V: ClassTag,
+  U <: Decoder[_]: ClassTag,
+  T <: Decoder[_]: ClassTag,
+  R: ClassTag] private[spark] (
+    sc: SparkContext,
+    kafkaParams: Map[String, String],
+    private[spark] val batch: Array[KafkaRDDPartition],
+    messageHandler: MessageAndMetadata[K, V] => R
+  ) extends RDD[R](sc, Nil) with Logging with HasOffsetRanges {
+
+  def offsetRanges: Array[OffsetRange] = batch.asInstanceOf[Array[OffsetRange]]
+
+  override def getPartitions: Array[Partition] = batch.asInstanceOf[Array[Partition]]
+
+  override def getPreferredLocations(thePart: Partition): Seq[String] = {
+    val part = thePart.asInstanceOf[KafkaRDDPartition]
+    // TODO is additional hostname resolution necessary here
+    Seq(part.host)
+  }
+
+  private def errBeginAfterEnd(part: KafkaRDDPartition): String =
+    s"Beginning offset ${part.fromOffset} is after the ending offset ${part.untilOffset} " +
+      s"for topic ${part.topic} partition ${part.partition}. " +
+      "You either provided an invalid fromOffset, or the Kafka topic has been damaged"
+
+  private def errRanOutBeforeEnd(part: KafkaRDDPartition): String =
+    s"Ran out of messages before reaching ending offset ${part.untilOffset} " +
+    s"for topic ${part.topic} partition ${part.partition} start ${part.fromOffset}." +
+    " This should not happen, and indicates that messages may have been lost"
+
+  private def errOvershotEnd(itemOffset: Long, part: KafkaRDDPartition): String =
+    s"Got ${itemOffset} > ending offset ${part.untilOffset} " +
+    s"for topic ${part.topic} partition ${part.partition} start ${part.fromOffset}." +
+    " This should not happen, and indicates a message may have been skipped"
+
+  override def compute(thePart: Partition, context: TaskContext): Iterator[R] = {
+    val part = thePart.asInstanceOf[KafkaRDDPartition]
+    assert(part.fromOffset <= part.untilOffset, errBeginAfterEnd(part))
+    if (part.fromOffset == part.untilOffset) {
+      log.warn("Beginning offset ${part.fromOffset} is the same as ending offset " +
+        s"skipping ${part.topic} ${part.partition}")
+      Iterator.empty
+    } else {
+      new KafkaRDDIterator(part, context)
+    }
+  }
+
+  private class KafkaRDDIterator(
+      part
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user koeninger commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3798#discussion_r23890348

    --- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala ---
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user koeninger commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3798#discussion_r23890594

    --- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala ---
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user koeninger commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3798#discussion_r23726952

    --- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala ---
    @@ -144,4 +150,116 @@ object KafkaUtils {
         createStream[K, V, U, T](
           jssc.ssc, kafkaParams.toMap, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
       }
    +
    +  /** A batch-oriented interface for consuming from Kafka.
    +   * Starting and ending offsets are specified in advance,
    +   * so that you can control exactly-once semantics.
    +   * @param sc SparkContext object
    +   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
    +   * configuration parameters</a>.
    +   *   Requires metadata.broker.list or bootstrap.servers to be set with Kafka broker(s),
    +   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
    +   * @param batch Each OffsetRange in the batch corresponds to a
    +   *   range of offsets for a given Kafka topic/partition
    +   * @param messageHandler function for translating each message into the desired type
    +   */
    +  def createRDD[
    +    K: ClassTag,
    +    V: ClassTag,
    +    U <: Decoder[_]: ClassTag,
    +    T <: Decoder[_]: ClassTag,
    +    R: ClassTag] (
    +      sc: SparkContext,
    +      kafkaParams: Map[String, String],
    +      batch: Array[OffsetRange],
    +      messageHandler: MessageAndMetadata[K, V] => R
    +  ): RDD[R] = {
    +    val parts = batch.zipWithIndex.map { case (o, i) =>
    +        new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, o.untilOffset, o.host, o.port)
    +    }.toArray
    +    new KafkaRDD[K, V, U, T, R](sc, kafkaParams, parts, messageHandler)
    +  }
    +
    +  /**
    +   * This DOES NOT guarantee that side-effects of an action will see each message exactly once.
    --- End diff --

    I mentioned this in the design docs, but I'm not comfortable with naming something exactlyOnce when it doesn't actually guarantee that for the usual meaning of exactly once in a messaging system. Hence the all-caps warning.

    I think a better option would be to rename the method; I suggested a couple of possibilities in the design doc.
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user koeninger commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3798#discussion_r23737948

    --- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala ---
    +  /**
    +   * This DOES NOT guarantee that side-effects of an action will see each message exactly once.
    --- End diff --

    If you're willing to add whatever is necessary, then what I'm suggesting is necessary is a different method name. What's the harm in picking a more accurate method name?

    Why open the possibility of someone looking at code completion in their IDE, seeing a method named exactlyOnce, thinking it actually means exactly once, then losing money because their messages got duplicated?

    Many of the existing input stream classes would meet your definition of exactly once, but nowhere else in Spark Streaming is there a method named exactlyOnce. Why is this method special?

    The distinction you are drawing between receiving exactly once and outputting exactly once may be clear to you, but from talking with average users at conferences and meetups, it is not clear to them. Judging from the comments on this thread and the design doc, it's not even perfectly clear to dedicated members of the community.
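The difference between receiving each message exactly once and producing output exactly once can be illustrated with a small, Spark-free sketch. Everything below is hypothetical (Batch, NaiveSink and OffsetTrackingSink are not part of the PR); it only assumes that a replayed batch covers the same offset range as the original, and it shows why storing offsets together with the output is what prevents duplicates when a batch is processed twice after a failure.

```scala
object ExactlyOnceSketch {
  /** Hypothetical batch: messages plus the start of the offset range they came from. */
  final case class Batch(topic: String, partition: Int, fromOffset: Long, messages: Seq[String]) {
    def untilOffset: Long = fromOffset + messages.size
  }

  /** Appends whatever it is given, so a replayed batch shows up twice in the output. */
  final class NaiveSink {
    private var rows = Vector.empty[String]
    def write(batch: Batch): Unit = { rows = rows ++ batch.messages }
    def output: Vector[String] = rows
  }

  /**
   * Stores the last committed offset alongside the output in one synchronized step
   * (standing in for a single database transaction) and skips ranges already written.
   * Assumes a replay reuses exactly the same offset range as the first attempt.
   */
  final class OffsetTrackingSink {
    private var rows = Vector.empty[String]
    private var committed = Map.empty[(String, Int), Long]

    def write(batch: Batch): Unit = synchronized {
      val key = (batch.topic, batch.partition)
      val alreadyCommitted = committed.getOrElse(key, Long.MinValue)
      if (alreadyCommitted < batch.untilOffset) {
        rows = rows ++ batch.messages
        committed = committed.updated(key, batch.untilOffset)
      }
    }

    def output: Vector[String] = rows
  }

  def main(args: Array[String]): Unit = {
    val batch = Batch("events", 0, 0L, Seq("a", "b"))
    val naive = new NaiveSink
    val tracked = new OffsetTrackingSink
    // Simulate a failure followed by a replay of the same batch.
    Seq(batch, batch).foreach { b => naive.write(b); tracked.write(b) }
    println(s"naive=${naive.output} tracked=${tracked.output}")
  }
}
```

In both cases the consumer sees each message deterministically; only the second sink makes the side effect idempotent, which is the guarantee a user would likely expect from a method named exactlyOnce.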
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user koeninger commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3798#discussion_r23729727

    --- Diff: external/kafka/src/main/scala/org/apache/spark/rdd/kafka/OffsetRange.scala ---
    @@ -0,0 +1,70 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.rdd.kafka
    +
    +/** Represents a range of offsets from a single Kafka TopicAndPartition */
    +trait OffsetRange {
    --- End diff --

    It's a trait with no implemented methods, so it will get compiled to a single class file with the same bytecode as a java interface (plus a scala signature annotation). It won't make a separate OffsetRange$class.class file as you may have seen for scala traits with default method implementations.

    The point of the trait/interface is that, as far as I understood, you were concerned about publicly exposing KafkaRDDPartition (which already is just a simple class, not a case class). If you want one common supertype for both KafkaRDDPartition and whatever people pass into public methods to construct a KafkaRDD, your choices are an interface or a (possibly abstract) class. I think an interface is cleaner.

    TLDR
    - if you're fine with exposing KafkaRDDPartition, let's just do that.
    - if you're super concerned that a trait with no implementation can't be used from java, i'll move it to java code and change trait for interface
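The claim that a trait with only abstract members looks like a plain Java interface on the JVM can be checked with reflection. The sketch below is a stand-in, not the PR's code: the member names are taken from the fields that createRDD reads from each OffsetRange, and SimpleRange is a hypothetical implementation added only for illustration.

```scala
import java.lang.reflect.Modifier

object TraitAsInterface {
  /** Stand-in for the OffsetRange trait: abstract members only, no implementations. */
  trait OffsetRange {
    def topic: String
    def partition: Int
    def fromOffset: Long
    def untilOffset: Long
    def host: String
    def port: Int
  }

  /** Hypothetical concrete class; a library could keep a class like this non-public. */
  final class SimpleRange(
      val topic: String,
      val partition: Int,
      val fromOffset: Long,
      val untilOffset: Long,
      val host: String,
      val port: Int) extends OffsetRange

  private val traitClass = classOf[OffsetRange]

  /** True when the JVM sees the trait as an interface. */
  val isInterface: Boolean = traitClass.isInterface

  /** True when every method declared on the trait is abstract. */
  val allAbstract: Boolean =
    traitClass.getDeclaredMethods.forall(m => Modifier.isAbstract(m.getModifiers))

  def main(args: Array[String]): Unit = {
    val range: OffsetRange = new SimpleRange("events", 0, 10L, 20L, "broker1", 9092)
    println(s"${range.topic}/${range.partition}: ${range.untilOffset - range.fromOffset} messages")
    println(s"interface=$isInterface allAbstract=$allAbstract")
  }
}
```

A Java caller could implement or consume such a type like any other interface, which is the interoperability point being argued above.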
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user koeninger commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3798#discussion_r23729934

    --- Diff: external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala ---
    @@ -130,7 +130,7 @@ abstract class KafkaStreamSuiteBase extends FunSuite with Eventually with Loggin
       }
     
       def createTopic(topic: String) {
    -    CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "0")
    --- End diff --

    Change in the way the kafka api is namespaced.
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user koeninger commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3798#discussion_r23729871

    --- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala ---
    @@ -144,4 +150,116 @@ object KafkaUtils {
         createStream[K, V, U, T](
           jssc.ssc, kafkaParams.toMap, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
       }
    +
    +  /** A batch-oriented interface for consuming from Kafka.
    +   * Starting and ending offsets are specified in advance,
    +   * so that you can control exactly-once semantics.
    +   * @param sc SparkContext object
    +   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
    +   * configuration parameters</a>.
    +   *   Requires metadata.broker.list or bootstrap.servers to be set with Kafka broker(s),
    +   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
    +   * @param batch Each OffsetRange in the batch corresponds to a
    +   *   range of offsets for a given Kafka topic/partition
    +   * @param messageHandler function for translating each message into the desired type
    +   */
    +  def createRDD[
    +    K: ClassTag,
    +    V: ClassTag,
    +    U <: Decoder[_]: ClassTag,
    +    T <: Decoder[_]: ClassTag,
    +    R: ClassTag] (
    +      sc: SparkContext,
    +      kafkaParams: Map[String, String],
    +      batch: Array[OffsetRange],
    +      messageHandler: MessageAndMetadata[K, V] => R
    +  ): RDD[R] = {
    +    val parts = batch.zipWithIndex.map { case (o, i) =>
    +        new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, o.untilOffset, o.host, o.port)
    +    }.toArray
    +    new KafkaRDD[K, V, U, T, R](sc, kafkaParams, parts, messageHandler)
    +  }
    +
    +  /**
    +   * This DOES NOT guarantee that side-effects of an action will see each message exactly once.
    +   * If you need that guarantee, get the offsets from this stream and store them with your output.
    +   * Nor does this store offsets in Kafka / Zookeeper.
    +   * If checkpointed, it will store offset ranges in the checkpoint, such that each message
    +   * will be transformed effectively exactly once even after failure,
    +   * provided you have sufficient Kafka log retention.
    +   *
    +   * @param ssc StreamingContext object
    +   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
    +   * configuration parameters</a>.
    +   *   Requires metadata.broker.list or bootstrap.servers to be set with Kafka broker(s),
    +   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
    +   * @param messageHandler function for translating each message into the desired type
    +   * @param fromOffsets per-topic/partition Kafka offsets defining the (inclusive)
    +   *  starting point of the stream
    +   * @param maxRetries maximum number of times in a row to retry getting leaders' offsets
    +   */
    +  def createExactlyOnceStream[
    +    K: ClassTag,
    +    V: ClassTag,
    +    U <: Decoder[_]: ClassTag,
    +    T <: Decoder[_]: ClassTag,
    +    R: ClassTag] (
    +      ssc: StreamingContext,
    +      kafkaParams: Map[String, String],
    +      fromOffsets: Map[TopicAndPartition, Long],
    +      messageHandler: MessageAndMetadata[K, V] => R,
    +      maxRetries: Int
    +  ): InputDStream[R] = {
    +    new DeterministicKafkaInputDStream[K, V, U, T, R](
    +      ssc, kafkaParams, fromOffsets, messageHandler, maxRetries)
    +  }
    +
    +  /**
    +   * This DOES NOT guarantee that side-effects of an action will see each message exactly once.
    +   * If you need that guarantee, get the offsets from this stream and store them with your output.
    +   * Nor does this store offsets in Kafka / Zookeeper.
    +   * If checkpointed, it will store offset ranges in the checkpoint, such that each message
    +   * will be transformed effectively exactly once even after failure,
    +   * provided you have sufficient Kafka log retention.
    +   *
    +   * @param ssc StreamingContext object
    +   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
    +   * configuration parameters</a>.
    +   *   Requires metadata.broker.list or bootstrap.servers to be set with Kafka broker(s),
    +   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
    +   *   If starting without a checkpoint, auto.offset.reset may be set to largest or smallest
    +   *   to determine where the stream starts (defaults to largest)
    +   * @param topics names of the topics to consume
    +   */
    +  def createExactlyOnceStream[
    +    K: ClassTag,
    +    V: ClassTag,
    +    U <: Decoder[_]: ClassTag,
    +    T <: Decoder[_]: ClassTag] (
    +      ssc: StreamingContext,
    +      kafkaParams: Map[String, String],
    +      topics: Set[String
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user koeninger commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3798#discussion_r23746235

    --- Diff: external/kafka/src/main/scala/org/apache/spark/rdd/kafka/OffsetRange.scala ---
    +/** Represents a range of offsets from a single Kafka TopicAndPartition */
    +trait OffsetRange {
    --- End diff --

    Ok, so there are a couple of different concerns here.

    First, the easy one: case classes. KafkaRDDPartition isn't a case class. The only case class in the entire PR is LeaderOffset, which isn't public and probably doesn't need to be a case class anyway. No worries.

    Second, the question of whether OffsetRange needs to have a host and port. The issue here is that in order to get a meaningful endpoint for the range, you have to make a remote call to find the kafka leader anyway. So if you give people a constructor that allows them to specify an ending offset, but don't allow them to specify a preferred leader, you are forcing an interface that requires 2x the number of remote calls.

    Third, clients need to not only define offset ranges, they need to obtain offsets from the stream (for those that need them for exactly-once, or zookeeper interop, or whatever). The idea of the interface is to provide limited access to the offsets without exposing any concrete implementation classes, so that you can change them later if need be.

    That allows clients to do

        stream.foreachRDD { rdd =>
          rdd.foreachPartitionWithIndex { (i, iter) =>
            val offsetRange = rdd.partitions(i).asInstanceOf[OffsetRange]

    or

        stream.foreachRDD { rdd =>
          val allOffsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

    without knowing anything at all about KafkaRDD or its partition class (or any concrete classes for that matter). I'm pretty sure the same cannot be done with your suggestion, because there's nothing public to cast the RDD or the partition to.

    I updated the usage examples to show how this works.

    https://github.com/koeninger/kafka-exactly-once/commit/d1641718807fc97f46e729e28acaba96ebc94c33

    The asInstanceOf is unfortunate, but because of the way DStream is implemented, we cannot say anything at compile time about what the RDDs returned in a DStream are capable of. By this I mean we can make KafkaUtils.createRDD return a RDD[R] with HasOffsetRanges instead of KafkaRDD, but we cannot make a corresponding change to KafkaUtils.createNewStream, because foreachRDD just returns RDD, not a parameterized type.
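The access pattern described above, where callers cast to a small public trait while the concrete classes stay hidden, can be sketched without Spark. The names FakeRDD, FakeKafkaRDD, RangeImpl and describeOffsets are hypothetical stand-ins introduced here; only OffsetRange and HasOffsetRanges mirror names from the PR, and their members are assumed for illustration.

```scala
object OffsetAccessSketch {
  /** Public view of an offset range (members assumed for this sketch). */
  trait OffsetRange {
    def topic: String
    def partition: Int
    def fromOffset: Long
    def untilOffset: Long
  }

  /** Public capability that an RDD-like object may or may not provide. */
  trait HasOffsetRanges {
    def offsetRanges: Array[OffsetRange]
  }

  /** Hypothetical stand-in for a plain RDD: it knows nothing about offsets. */
  class FakeRDD[T](val data: Seq[T])

  // Concrete implementations are private, so they can change without breaking callers.
  private final class RangeImpl(
      val topic: String,
      val partition: Int,
      val fromOffset: Long,
      val untilOffset: Long) extends OffsetRange

  private final class FakeKafkaRDD[T](items: Seq[T], ranges: Array[OffsetRange])
      extends FakeRDD[T](items) with HasOffsetRanges {
    def offsetRanges: Array[OffsetRange] = ranges
  }

  /** Returns only the base type, much as foreachRDD hands callers a plain RDD. */
  def createRDD(messages: Seq[String], topic: String, partition: Int, fromOffset: Long): FakeRDD[String] = {
    val range: OffsetRange = new RangeImpl(topic, partition, fromOffset, fromOffset + messages.size)
    new FakeKafkaRDD[String](messages, Array(range))
  }

  /** The caller casts to the public trait and never names a concrete class. */
  def describeOffsets(rdd: FakeRDD[_]): Seq[String] =
    rdd.asInstanceOf[HasOffsetRanges].offsetRanges.toSeq.map { r =>
      s"${r.topic}-${r.partition}: [${r.fromOffset}, ${r.untilOffset})"
    }

  def main(args: Array[String]): Unit = {
    val rdd = createRDD(Seq("a", "b"), "events", 0, 5L)
    describeOffsets(rdd).foreach(println)
  }
}
```

The cast fails at runtime with a ClassCastException if the object does not mix in HasOffsetRanges, which is the compile-time gap the comment calls unfortunate.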
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user koeninger commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3798#discussion_r23746262

    --- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala ---
    +  /**
    +   * This DOES NOT guarantee that side-effects of an action will see each message exactly once.
    --- End diff --

    Cool, changing it to createNewStream for now
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user koeninger commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3798#discussion_r23746302

    --- Diff: external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala ---
    @@ -130,7 +130,7 @@ abstract class KafkaStreamSuiteBase extends FunSuite with Eventually with Loggin
       }
     
       def createTopic(topic: String) {
    -    CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "0")
    --- End diff --

    Yeah, they just shuffled some of their classes around. You'll notice the PR floating around for writing to kafka does the same thing, because it also upgrades kafka
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user koeninger commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3798#discussion_r23746360

    --- Diff: external/kafka/src/main/scala/org/apache/spark/rdd/kafka/KafkaCluster.scala ---
    @@ -0,0 +1,320 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.rdd.kafka
    +
    +import scala.util.control.NonFatal
    +import scala.util.Random
    +import scala.collection.mutable.ArrayBuffer
    +import java.util.Properties
    +import kafka.api._
    +import kafka.common.{ErrorMapping, OffsetMetadataAndError, TopicAndPartition}
    +import kafka.consumer.{ConsumerConfig, SimpleConsumer}
    +
    +/**
    + * Convenience methods for interacting with a Kafka cluster.
    + * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
    + * configuration parameters</a>.
    + *   Requires metadata.broker.list or bootstrap.servers to be set with Kafka broker(s),
    + *   NOT zookeeper servers, specified in host1:port1,host2:port2 form
    + */
    +private[spark]
    +class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
    --- End diff --

    As I recall, it's because the stream has a kafka cluster as a member value, and it needs to be able to be checkpointed. The current design of KafkaCluster is essentially stateless aside from configuration.
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user koeninger commented on the pull request: https://github.com/apache/spark/pull/3798#issuecomment-71968759

Packaging: makes sense.

Method name: agreed, named it createNewStream for now.

Offset range: see my explanation of the interface above. I think this is the last substantive / non-style issue to get worked out.

Unit tests for the stream / KafkaCluster: will see what I can do.

On Wed, Jan 28, 2015 at 8:49 PM, Tathagata Das notificati...@github.com wrote:

At a high level, these are the design issues that are still pending:

- Packaging - I think all the classes should be in org.apache.spark.streaming.kafka (even KafkaRDD), because (i) all of them are published in the spark-streaming-kafka artifact, and (ii) importing just one path (o.a.spark.streaming.kafka.KafkaUtils._) is sufficient to get all the relevant classes.
- KafkaUtils method name and scala doc - Let's keep thinking of names, but let's not stay blocked on this and continue focusing on other issues.
- OffsetRange - I have proposed a design as well. Either we will go for the Java interface or the simple class that I proposed.
- Unit tests - There is no unit test for the new stream. Also, is it possible to include a few unit tests for KafkaCluster?

Other than these, I will start commenting on the code style, etc. very soon.
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/3798#discussion_r23748806

--- Diff: external/kafka/src/main/scala/org/apache/spark/rdd/kafka/OffsetRange.scala ---
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd.kafka
+
+/** Represents a range of offsets from a single Kafka TopicAndPartition */
+trait OffsetRange {
--- End diff --

No, it's not attaching the offset to every record; that's what I'm trying to avoid. It's dealing with the offsets either once per RDD or once per partition, depending on what is necessary from a client semantics point of view.

Hopefully accessing the RDD from inside foreachRDD isn't contentious? As for accessing the partition, yeah, it's pretty weird to have to index the RDD to get the partition... but it's also pretty weird that the existing APIs named 'mapPartition*' don't actually give you access to the partition... after all, the partition is serializable.

On Wed, Jan 28, 2015 at 11:47 PM, Tathagata Das notificati...@github.com wrote:

In external/kafka/src/main/scala/org/apache/spark/rdd/kafka/OffsetRange.scala https://github.com/apache/spark/pull/3798#discussion_r23747942:

+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd.kafka
+
+/** Represents a range of offsets from a single Kafka TopicAndPartition */
+trait OffsetRange {

Right, I get your point. Though I thought about the usage based on the example, and I think we need to think about this a little bit more. From what I understood, you are attaching the offset to every record, and shuffling everything with that offset attached. That is quite a loss of efficiency. Also, accessing the RDD and its partition object from within the mapPartition function is very confusing, and... does it actually work? Even if this works, that's not the recommended RDD operation! We really need to come up with a better way to expose offsets. Brainstorming a little more on this.
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/3798#discussion_r23616474

--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -788,6 +788,20 @@ abstract class RDD[T: ClassTag](
   }

   /**
+   * Applies a function to each partition of this RDD, while tracking the index
+   * of the original partition.
+   */
+  def foreachPartitionWithIndex(
--- End diff --

Yeah, I was originally doing a map followed by an empty foreach and thought it looked confusing. I think it's really a question of what's easier to explain; it's a syntactic sugar issue, not a correctness issue, so no problem either way.
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user koeninger commented on the pull request: https://github.com/apache/spark/pull/3798#issuecomment-71667244

Most of Either's problems can be fixed with a one-line implicit conversion to RightProjection. I've seen scalactic before; it seems like overkill by comparison.

On Mon, Jan 26, 2015 at 8:08 PM, Imran Rashid notificati...@github.com wrote:

@koeninger https://github.com/koeninger I doubt that we want to go this route in this case, but just in case you're interested, I think a much better way to handle multiple errors gracefully is with scalactic's Or http://www.scalactic.org/user_guide/OrAndEvery. It's much better than Either for this case of building up a set of errors to report back to the user. And scalactic is a nicely designed, small library (e.g. you're not pulling in scalaz). Probably not worth it for this one case, but thought you might find it interesting :)
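For readers unfamiliar with the idiom, the one-line conversion mentioned in the comment above might look roughly like the sketch below. It assumes the Scala 2.10/2.11 behaviour that was current for this thread, where Either was unbiased; since Scala 2.12 Either is right-biased, so the conversion is unnecessary there, and `.right` is deprecated as of 2.13. The object name `RightBias` and the helpers `parsePort` and `hostAndPort` are hypothetical names used only for illustration.

```scala
import scala.language.implicitConversions

object RightBias {
  // The "one-line implicit conversion": view any Either as its right projection,
  // so map/flatMap (and therefore for-comprehensions) operate on the Right value.
  implicit def rightBias[L, R](e: Either[L, R]): Either.RightProjection[L, R] = e.right

  // Hypothetical helper, only here to have something to chain.
  def parsePort(s: String): Either[String, Int] =
    try Right(s.toInt) catch { case _: NumberFormatException => Left("bad port: " + s) }

  // A Left short-circuits the chain; a Right flows through to the yield.
  def hostAndPort(hp: String): Either[String, (String, Int)] = {
    val parts = hp.split(":")
    if (parts.length != 2) Left("expected host:port, got: " + hp)
    else for (port <- parsePort(parts(1))) yield (parts(0), port)
  }
}
```

With the conversion in scope, downstream code reads like ordinary Option or Try chaining, which is the ergonomic complaint the conversion is meant to address.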
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user koeninger commented on the pull request: https://github.com/apache/spark/pull/3798#issuecomment-71549132

Just updated it.

On Mon, Jan 26, 2015 at 4:06 PM, Hari Shreedharan notificati...@github.com wrote:

@koeninger https://github.com/koeninger Can you run sbt scalastyle and fix the style violations? There are about 2 instances of lines longer than 100 chars.
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/3798#discussion_r23576495

--- Diff: external/kafka/src/main/scala/org/apache/spark/rdd/kafka/KafkaRDD.scala ---
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd.kafka
+
+import scala.reflect.{classTag, ClassTag}
+
+import org.apache.spark.{Logging, Partition, SparkContext, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.util.NextIterator
+
+import java.util.Properties
+import kafka.api.{FetchRequestBuilder, FetchResponse}
+import kafka.common.{ErrorMapping, TopicAndPartition}
+import kafka.consumer.{ConsumerConfig, SimpleConsumer}
+import kafka.message.{MessageAndMetadata, MessageAndOffset}
+import kafka.serializer.Decoder
+import kafka.utils.VerifiableProperties
+
+/** A batch-oriented interface for consuming from Kafka.
+ * Starting and ending offsets are specified in advance,
+ * so that you can control exactly-once semantics.
+ * For an easy interface to Kafka-managed offsets,
+ * see {@link org.apache.spark.rdd.kafka.KafkaCluster}
+ * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+ * configuration parameters</a>.
+ * Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+ * NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+ * @param batch Each KafkaRDDPartition in the batch corresponds to a
+ * range of offsets for a given Kafka topic/partition
+ * @param messageHandler function for translating each message into the desired type
+ */
+class KafkaRDD[
+  K: ClassTag,
+  V: ClassTag,
+  U <: Decoder[_]: ClassTag,
+  T <: Decoder[_]: ClassTag,
+  R: ClassTag](
+    sc: SparkContext,
+    val kafkaParams: Map[String, String],
+    val batch: Array[KafkaRDDPartition],
+    messageHandler: MessageAndMetadata[K, V] => R
+  ) extends RDD[R](sc, Nil) with Logging {
+
+  override def getPartitions: Array[Partition] = batch.asInstanceOf[Array[Partition]]
+
+  override def getPreferredLocations(thePart: Partition): Seq[String] = {
+    val part = thePart.asInstanceOf[KafkaRDDPartition]
+    // TODO is additional hostname resolution necessary here
+    Seq(part.host)
+  }
+
+  override def compute(thePart: Partition, context: TaskContext): Iterator[R] = {
+    val part = thePart.asInstanceOf[KafkaRDDPartition]
+    if (part.fromOffset >= part.untilOffset) {
+      log.warn("Beginning offset is same or after ending offset " +
+        s"skipping ${part.topic} ${part.partition}")
+      Iterator.empty
+    } else {
+      new NextIterator[R] {
+        context.addTaskCompletionListener{ context => closeIfNeeded() }
+
+        log.info(s"Computing topic ${part.topic}, partition ${part.partition} " +
+          s"offsets ${part.fromOffset} -> ${part.untilOffset}")
+
+        val kc = new KafkaCluster(kafkaParams)
+        val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
+          .newInstance(kc.config.props)
+          .asInstanceOf[Decoder[K]]
+        val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
+          .newInstance(kc.config.props)
+          .asInstanceOf[Decoder[V]]
+        val consumer = connectLeader
+        var requestOffset = part.fromOffset
+        var iter: Iterator[MessageAndOffset] = null
+
+        // The idea is to use the provided preferred host, except on task retry attempts,
+        // to minimize number of kafka metadata requests
+        private def connectLeader: SimpleConsumer = {
+          if (context.attemptNumber > 0) {
+            kc.connectLeader(part.topic, part.partition).fold(
+              errs => throw new Exception(
+                s"Couldn't connect to leader for topic
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user koeninger commented on the pull request: https://github.com/apache/spark/pull/3798#issuecomment-71564796

I'm not a big fan of either either :)

The issue here is that KafkaCluster is potentially dealing with multiple exceptions due to multiple brokers. As a user of library code, I'd potentially want access to all of the exceptions, not just the first or last one.

It sounds like TD is leaning towards not making KafkaCluster exposed, in which case we can do whatever makes sense for the internal usage of it.

On Mon, Jan 26, 2015 at 5:03 PM, Reynold Xin notificati...@github.com wrote:

BTW one other thing - Either is really dangerous and very complicated to use. It almost always leads to downstream code becoming a mess. You are mostly just using it for exception propagation. Why not just throw exceptions?
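The trade-off discussed above (one thrown exception versus access to every per-broker failure) can be illustrated with a small sketch. This is not the PR's `withBrokers` implementation; it is a simplified stand-in that assumes brokers are plain `(host, port)` pairs and that `attempt` is whatever call might fail against a broker. The names `BrokerErrors` and `firstSuccess` are hypothetical.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

object BrokerErrors {
  // Alias in the spirit of the `Err` type used in the diff: every failure seen so far.
  type Err = ArrayBuffer[Throwable]

  /** Try each broker in order; return the first success, or all collected errors. */
  def firstSuccess[A](brokers: Seq[(String, Int)])(attempt: ((String, Int)) => A): Either[Err, A] = {
    val errs = ArrayBuffer.empty[Throwable]
    // The iterator is lazy, so brokers after the first success are never contacted.
    val result = brokers.iterator.map { broker =>
      try Some(attempt(broker)) catch { case NonFatal(e) => errs += e; None }
    }.collectFirst { case Some(a) => a }
    result.toRight(errs)
  }
}
```

A caller that only wants an exception can still fold the Left into a single throw, while a caller that cares which brokers failed keeps the full list.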
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user koeninger commented on the pull request: https://github.com/apache/spark/pull/3798#issuecomment-71565526

I think as long as offsets are available for advanced users that want them, relying on checkpointing for the happy path should be OK. There will probably be some design doc discussion on that shortly.

On Mon, Jan 26, 2015 at 4:29 PM, Hari Shreedharan notificati...@github.com wrote:

@koeninger https://github.com/koeninger - One general question: Since each checkpoint has the last offset of each batch, could we not start pulling data from Kafka from that offset, rather than having the user save the offset and restart from there?
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/3798#discussion_r23418815

--- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DeterministicKafkaInputDStream.scala ---
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import scala.annotation.tailrec
+import scala.reflect.{classTag, ClassTag}
+
+import kafka.common.TopicAndPartition
+import kafka.message.MessageAndMetadata
+import kafka.serializer.Decoder
+
+import org.apache.spark.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.rdd.kafka.{KafkaCluster, KafkaRDD}
+import org.apache.spark.streaming.{StreamingContext, Time}
+import org.apache.spark.streaming.dstream._
+
+/** A stream of {@link org.apache.spark.rdd.kafka.KafkaRDD} where
+ * each given Kafka topic/partition corresponds to an RDD partition.
+ * The spark configuration spark.streaming.receiver.maxRate gives the maximum number of messages
+ * per second that each '''partition''' will accept.
+ * Starting offsets are specified in advance,
+ * and this DStream is not responsible for committing offsets,
+ * so that you can control exactly-once semantics.
+ * For an easy interface to Kafka-managed offsets,
+ * see {@link org.apache.spark.rdd.kafka.KafkaCluster}
+ * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+ * configuration parameters</a>.
+ * Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+ * NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+ * @param fromOffsets per-topic/partition Kafka offsets defining the (inclusive)
+ * starting point of the stream
+ * @param messageHandler function for translating each message into the desired type
+ * @param maxRetries maximum number of times in a row to retry getting leaders' offsets
+ */
+class DeterministicKafkaInputDStream[
+  K: ClassTag,
+  V: ClassTag,
+  U <: Decoder[_]: ClassTag,
+  T <: Decoder[_]: ClassTag,
+  R: ClassTag](
+    @transient ssc_ : StreamingContext,
+    val kafkaParams: Map[String, String],
+    val fromOffsets: Map[TopicAndPartition, Long],
+    messageHandler: MessageAndMetadata[K, V] => R,
+    maxRetries: Int = 1
+) extends InputDStream[R](ssc_) with Logging {
+
+  private val kc = new KafkaCluster(kafkaParams)
+
+  private val maxMessagesPerPartition: Option[Long] = {
+    val ratePerSec = ssc.sparkContext.getConf.getInt("spark.streaming.receiver.maxRate", 0)
+    if (ratePerSec > 0) {
+      val secsPerBatch = ssc.graph.batchDuration.milliseconds.toDouble / 1000
+      Some((secsPerBatch * ratePerSec).toLong)
+    } else {
+      None
+    }
+  }
+
+  // TODO based on the design of InputDStream's lastValidTime, it appears there isn't a
+  // thread safety concern with private mutable state, but is this certain?
+  private var currentOffsets = fromOffsets
+
+  @tailrec
+  private def latestLeaderOffsets(retries: Int): Map[TopicAndPartition, Long] = {
+    val o = kc.getLatestLeaderOffsets(currentOffsets.keys.toSet)
+    // Either.fold would confuse @tailrec, do it manually
+    if (o.isLeft) {
+      val err = o.left.get.toString
+      if (retries <= 0) {
+        throw new Exception(err)
+      } else {
+        log.error(err)
+        Thread.sleep(kc.config.refreshLeaderBackoffMs)
+        latestLeaderOffsets(retries - 1)
+      }
+    } else {
+      o.right.get
+    }
+  }
+
+  private def clamp(leaderOffsets: Map[TopicAndPartition, Long]): Map[TopicAndPartition, Long] = {
+    maxMessagesPerPartition.map { mmp =>
+      leaderOffsets.map { kv =>
+        val (k, v) = kv
+        val curr = currentOffsets(k)
+        val diff = v - curr
+        if (diff > mmp) (k, curr + mmp
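The quoted diff is cut off in the middle of `clamp`, so the following is only a sketch of the rate-limiting idea it appears to implement, with no Spark or Kafka dependency. It assumes partitions are keyed by a plain `(topic, partition)` pair instead of `TopicAndPartition`, and that the stripped comparison operators were `>`; the object name `OffsetClamp` is hypothetical.

```scala
object OffsetClamp {
  type TopicPartition = (String, Int)

  /** Messages allowed per partition per batch, or None when no rate limit is set. */
  def maxMessagesPerPartition(ratePerSec: Int, batchMillis: Long): Option[Long] =
    if (ratePerSec > 0) Some((batchMillis.toDouble / 1000 * ratePerSec).toLong)
    else None

  /**
   * Cap each partition's ending offset at (current offset + limit).
   * Without a limit, the leader's latest offsets are used unchanged.
   */
  def clamp(
      current: Map[TopicPartition, Long],
      leader: Map[TopicPartition, Long],
      limit: Option[Long]): Map[TopicPartition, Long] =
    limit match {
      case Some(mmp) =>
        leader.map { case (tp, latest) => tp -> math.min(latest, current(tp) + mmp) }
      case None => leader
    }
}
```

For example, with a rate of 100 messages per second and a 2-second batch, each partition advances by at most 200 offsets per batch, even if the leader is much further ahead.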
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user koeninger commented on the pull request: https://github.com/apache/spark/pull/3798#issuecomment-71122072

I need to know, perhaps even at the driver, what the ending offset is in order to be able to commit it. I also have several use cases where I want to end a batch at a specific point which may or may not be now.

On Jan 22, 2015 5:33 PM, Hari Shreedharan notificati...@github.com wrote:

OK. Just a thought: Do you think there might be a way to avoid the spikes? Once the current RDD is checkpointed, create a new pending RDD, which continuously receives data, until the compute method is called. When compute gets called, the last offset we received can be considered to be the upper bound, and the data is now available for transformations. That way, we could spread out network transfers from Kafka over a larger period. Not sure if there are holes in that algorithm, but it looks almost equivalent to the current model, no?
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user koeninger commented on the pull request: https://github.com/apache/spark/pull/3798#issuecomment-71120030

Yeah, it's pulled down every batch interval. That way you know exactly what the upper and lower bounds of the offsets are.

On Thu, Jan 22, 2015 at 5:15 PM, Hari Shreedharan notificati...@github.com wrote:

I like this! I didn't try building it, but the logic is great! So, to sum up the idea - the key detail here is that the checkpoint contains the metadata to regenerate the RDDs, thus original order and batches are recovered. That looks good - it was the same thing I was trying to see if we could do in the Kafka receiver, but it would be difficult without some API changes.

That brings me to a question - so in this PR, is the data pulled down from Kafka only once every batch interval - say every 2 seconds, or is there a way to generate it continuously rather than have spikes?
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user koeninger commented on the pull request: https://github.com/apache/spark/pull/3798#issuecomment-71131380

The point is, it's up to client code to commit, so that it can implement exactly-once semantics if necessary. Committing automatically at the end of compute would get you something like at-most-once semantics.

Regarding the spikiness: in practice, I think you're just going to end up doing what you do with any other streaming job, namely tuning the batch size down until it's just comfortably above the processing time.

On Thu, Jan 22, 2015 at 6:02 PM, Hari Shreedharan notificati...@github.com wrote:

Say we start pulling down info in real time, wouldn't it be possible to say "get me only n messages" -- that should take care of the second point. I am not sure how the ending offset part is a problem. Wouldn't it make sense to do the commits at the end of the compute call? (You don't actually commit the offsets to Kafka anyway, correct?)
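To make the commit-ordering argument above concrete: if offsets are committed before the output is durably stored, a failure in between loses the batch (at-most-once); if output is stored first and the offset commit fails, the batch is replayed (at-least-once). One common way for client code to get exactly-once is to store results and offsets in a single atomic step. The sketch below simulates that with an in-memory store; `ExactlyOnceSketch` and `Store` are hypothetical names, and a real job would use a transactional datastore rather than this stand-in.

```scala
object ExactlyOnceSketch {
  type TopicPartition = (String, Int)

  // In-memory stand-in for a transactional store that keeps both the job's
  // output (a running total) and the last committed offset per partition.
  final class Store {
    private var total = 0L
    private var offsets = Map.empty[TopicPartition, Long]

    def committedOffset(tp: TopicPartition): Long = synchronized(offsets.getOrElse(tp, 0L))
    def result: Long = synchronized(total)

    /**
     * Apply a batch covering offsets [from, until) only if it starts exactly
     * where the previous commit ended. Result and offset change together,
     * so a replayed batch is rejected instead of being counted twice.
     */
    def commit(tp: TopicPartition, from: Long, until: Long, batchSum: Long): Boolean =
      synchronized {
        if (offsets.getOrElse(tp, 0L) != from) false
        else {
          total += batchSum
          offsets = offsets.updated(tp, until)
          true
        }
      }
  }
}
```

This only works because the batch's starting and ending offsets are known up front, which is the property the PR's design is built around.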
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user koeninger commented on the pull request: https://github.com/apache/spark/pull/3798#issuecomment-69754083

1. Yes, I removed the ivy and maven caches, verified the example app failed to locate the dependency, re-published from the spark dev version, and verified the example app now found it.
2. Yes, I've tried spark-streaming both as provided and included in the assembly. The real issue is probably that spark-streaming-kafka can't be marked as provided and must be included in the assembly (more on this in a second).
3. Yes, it's spark-submit to an instance of spark running out of the same spark dev version.

So like I was saying about #2, the class that is failing to load, KafkaRDDPartition, is in the spark-streaming-kafka jar, not spark or spark-streaming, so it's not available by default. It clearly will end up on the classpath when it's included in an application jar, because the committed working version of the code that checkpoints tuples can successfully convert to KafkaRDDPartition in restore(). It's just not available in the classloader that's reading the checkpoint.

Further evidence of this is that if I move only KafkaRDDPartition into the spark-streaming artifact, KafkaRDDPartition can be successfully read from the checkpoint. KafkaRDDPartition doesn't actually have any dependencies on anything other than Partition, so moving it into spark-streaming might be a solution... your call on whether you think that's uglier than saving to / from tuples, or if you want to dig further into the classloader issue.

On Mon, Jan 12, 2015 at 10:53 PM, Tathagata Das notificati...@github.com wrote:

Can you confirm the following?

1. In your SBT/maven app used for testing, you are using your development Spark version to compile? That is, the dev version is locally published and you are compiling your app against spark version 1.3.0-SNAPSHOT?
2. Do you have the spark-streaming dependency in provided scope or the default compile scope? And then are you creating an uber jar of the app?
3. Are you submitting the app through spark-submit to the same development Spark version?

On Mon, Jan 12, 2015 at 2:13 PM, Cody Koeninger notificati...@github.com wrote:

Yeah, this is on a local development version, after assembly / publish local. Here's a gist of the exception and the diff that causes it (using KafkaRDDPartition instead of a tuple): https://gist.github.com/koeninger/561a61482cd1b5b3600c
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user koeninger commented on the pull request: https://github.com/apache/spark/pull/3798#issuecomment-69651872

The classloader issue was when reading from the checkpoint.

If we want to rely on subclassing, some of the implementation (e.g. currentOffsets and latestLeaderOffsets) should probably be made protected.
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user koeninger commented on the pull request: https://github.com/apache/spark/pull/3798#issuecomment-69656800

Yeah, this is on a local development version, after assembly / publish local. Here's a gist of the exception and the diff that causes it (using KafkaRDDPartition instead of a tuple): https://gist.github.com/koeninger/561a61482cd1b5b3600c
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user koeninger commented on the pull request: https://github.com/apache/spark/pull/3798#issuecomment-69446001

I went ahead and implemented locality and checkpointing of generated RDDs. A couple of points:

- It still depends on SPARK-4014 eventually being merged, for efficiency's sake.
- I ran into classloader / class-not-found issues trying to checkpoint KafkaRDDPartition directly. The current solution is to transform them to/from tuples; ugly, but it works. If you know what the issue is there, let me know.
- I've got a use case that requires overriding the compute method on the DStream (basically, modifying offsets to a fixed delay rather than now). I'm assuming you'd prefer a user-supplied function to do the transformation rather than subclassing, but let me know.

On Mon, Jan 5, 2015 at 7:59 PM, Tathagata Das notificati...@github.com wrote:

Great! Keep me posted.
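The tuple workaround described above can be sketched as follows. This is a guess at its shape, not the PR's code: `OffsetPartition` is a hypothetical stand-in for KafkaRDDPartition, and its field list is an assumption based on the fields referenced elsewhere in the quoted diffs (topic, partition, fromOffset, untilOffset, host) plus an index and port. The appeal of tuples here is that they contain only standard-library types, so reading them back does not require the application's own classes.

```scala
object PartitionTuples {
  // Hypothetical stand-in for KafkaRDDPartition; the field list is assumed.
  final case class OffsetPartition(
      index: Int,
      topic: String,
      partition: Int,
      fromOffset: Long,
      untilOffset: Long,
      host: String,
      port: Int)

  // Checkpoint-friendly representation built only from standard-library types.
  type Repr = (Int, String, Int, Long, Long, String, Int)

  def toTuple(p: OffsetPartition): Repr =
    (p.index, p.topic, p.partition, p.fromOffset, p.untilOffset, p.host, p.port)

  def fromTuple(t: Repr): OffsetPartition = t match {
    case (index, topic, partition, from, until, host, port) =>
      OffsetPartition(index, topic, partition, from, until, host, port)
  }
}
```

The conversion is lossless in both directions, so the stream can save tuples at checkpoint time and rebuild the partition objects on restore.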
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/3798#discussion_r22653565

--- Diff: external/kafka/src/main/scala/org/apache/spark/rdd/kafka/KafkaCluster.scala ---
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd.kafka
+
+import scala.util.control.NonFatal
+import scala.collection.mutable.ArrayBuffer
+import java.util.Properties
+import kafka.api._
+import kafka.common.{ErrorMapping, OffsetMetadataAndError, TopicAndPartition}
+import kafka.consumer.{ConsumerConfig, SimpleConsumer}
+
+/**
+  * Convenience methods for interacting with a Kafka cluster.
+  * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+  * configuration parameters</a>.
+  *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+  *   NOT zookeeper servers, specified in host1:port1,host2:port2 form
+  */
+class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
+  import KafkaCluster.Err
+
+  val seedBrokers: Array[(String, Int)] =
+    kafkaParams.get("metadata.broker.list")
+      .orElse(kafkaParams.get("bootstrap.servers"))
+      .getOrElse(throw new Exception("Must specify metadata.broker.list or bootstrap.servers"))
+      .split(",").map { hp =>
+        val hpa = hp.split(":")
+        (hpa(0), hpa(1).toInt)
+      }
+
+  // ConsumerConfig isn't serializable
+  @transient private var _config: ConsumerConfig = null
+
+  def config: ConsumerConfig = this.synchronized {
+    if (_config == null) {
+      _config = KafkaCluster.consumerConfig(kafkaParams)
+    }
+    _config
+  }
+
+  def connect(host: String, port: Int): SimpleConsumer =
+    new SimpleConsumer(host, port, config.socketTimeoutMs,
+      config.socketReceiveBufferBytes, config.clientId)
+
+  def connect(hostAndPort: (String, Int)): SimpleConsumer =
+    connect(hostAndPort._1, hostAndPort._2)
+
+  def connectLeader(topic: String, partition: Int): Either[Err, SimpleConsumer] =
+    findLeader(topic, partition).right.map(connect)
+
+  def findLeader(topic: String, partition: Int): Either[Err, (String, Int)] = {
+    val req = TopicMetadataRequest(TopicMetadataRequest.CurrentVersion,
+      0, config.clientId, Seq(topic))
+    val errs = new Err
+    withBrokers(seedBrokers, errs) { consumer =>
+      val resp: TopicMetadataResponse = consumer.send(req)
+      resp.topicsMetadata.find(_.topic == topic).flatMap { t =>
+        t.partitionsMetadata.find(_.partitionId == partition)
+      }.foreach { partitionMeta =>
+        partitionMeta.leader.foreach { leader =>
+          return Right((leader.host, leader.port))
+        }
+      }
+    }
+    Left(errs)
+  }
+
+  def findLeaders(
+    topicAndPartitions: Set[TopicAndPartition]
+  ): Either[Err, Map[TopicAndPartition, (String, Int)]] = {
+    getPartitionMetadata(topicAndPartitions.map(_.topic)).right.flatMap { tms =>
+      val result = tms.flatMap { tm: TopicMetadata =>
+        tm.partitionsMetadata.flatMap { pm =>
+          val tp = TopicAndPartition(tm.topic, pm.partitionId)
+          if (topicAndPartitions(tp)) {
+            pm.leader.map { l =>
+              tp -> (l.host -> l.port)
+            }
+          } else {
+            None
+          }
+        }
+      }.toMap
+      if (result.keys.size == topicAndPartitions.size) {
+        Right(result)
+      } else {
+        val missing = topicAndPartitions.diff(result.keys.toSet)
+        val err = new Err
+        err.append(new Exception(s"Couldn't find leaders for ${missing}"))
+        Left(err)
+      }
+    }
+  }
+
+  def getPartitions(topics: Set[String]): Either[Err, Set[TopicAndPartition]] =
+    getPartitionMetadata(topics).right.map { r =>
+      r.flatMap { tm
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/3798#discussion_r22653442

--- Diff: external/kafka/src/main/scala/org/apache/spark/rdd/kafka/KafkaCluster.scala ---
@@ -0,0 +1,313 @@
[...]
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/3798#discussion_r22653905

--- Diff: external/kafka/src/main/scala/org/apache/spark/rdd/kafka/KafkaRDD.scala ---
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd.kafka
+
+import scala.reflect.{classTag, ClassTag}
+
+import org.apache.spark.{Logging, Partition, SparkContext, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.util.NextIterator
+
+import java.util.Properties
+import kafka.api.FetchRequestBuilder
+import kafka.common.{ErrorMapping, TopicAndPartition}
+import kafka.consumer.{ConsumerConfig, SimpleConsumer}
+import kafka.message.{MessageAndMetadata, MessageAndOffset}
+import kafka.serializer.Decoder
+import kafka.utils.VerifiableProperties
+
+case class KafkaRDDPartition(
+  override val index: Int,
+  topic: String,
+  partition: Int,
+  fromOffset: Long,
+  untilOffset: Long
+) extends Partition
+
+/** A batch-oriented interface for consuming from Kafka.
+  * Each given Kafka topic/partition corresponds to an RDD partition.
+  * Starting and ending offsets are specified in advance,
+  * so that you can control exactly-once semantics.
+  * For an easy interface to Kafka-managed offsets,
+  *  see {@link org.apache.spark.rdd.kafka.KafkaCluster}
+  * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+  * configuration parameters</a>.
+  *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+  *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+  * @param fromOffsets per-topic/partition Kafka offsets defining the (inclusive)
+  *  starting point of the batch
+  * @param untilOffsets per-topic/partition Kafka offsets defining the (exclusive)
+  *  ending point of the batch
+  * @param messageHandler function for translating each message into the desired type
+  */
+class KafkaRDD[
+  K: ClassTag,
+  V: ClassTag,
+  U <: Decoder[_]: ClassTag,
+  T <: Decoder[_]: ClassTag,
+  R: ClassTag](
+    sc: SparkContext,
+    val kafkaParams: Map[String, String],
+    val fromOffsets: Map[TopicAndPartition, Long],
+    val untilOffsets: Map[TopicAndPartition, Long],
+    messageHandler: MessageAndMetadata[K, V] => R
+  ) extends RDD[R](sc, Nil) with Logging {
+
+  assert(fromOffsets.keys == untilOffsets.keys,
+    "Must provide both from and until offsets for each topic/partition")
+
+  override def getPartitions: Array[Partition] = fromOffsets.zipWithIndex.map { kvi =>
+    val ((tp, from), index) = kvi
+    new KafkaRDDPartition(index, tp.topic, tp.partition, from, untilOffsets(tp))
+  }.toArray
+
+  override def compute(thePart: Partition, context: TaskContext) = {
+    val part = thePart.asInstanceOf[KafkaRDDPartition]
+    if (part.fromOffset >= part.untilOffset) {
+      log.warn("Beginning offset is same or after ending offset " +
+        s"skipping ${part.topic} ${part.partition}")
+      Iterator.empty
+    } else {
+      new NextIterator[R] {
+        context.addTaskCompletionListener{ context => closeIfNeeded() }
+
+        val kc = new KafkaCluster(kafkaParams)
+        log.info(s"Computing topic ${part.topic}, partition ${part.partition} " +
+          s"offsets ${part.fromOffset} -> ${part.untilOffset}")
+        val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
+          .newInstance(kc.config.props)
+          .asInstanceOf[Decoder[K]]
+        val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
+          .newInstance(kc.config.props)
+          .asInstanceOf[Decoder[V]]
+        val consumer: SimpleConsumer = kc.connectLeader(part.topic, part.partition).fold(
+          errs => throw new Exception
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user koeninger commented on the pull request: https://github.com/apache/spark/pull/3798#issuecomment-68804547

I'm hopeful that SPARK-4014 will be finalized soon; I'm waiting on that before doing the refactor for preferred locations. That will involve changing the partition fields to add leader host and port, and not looking up leaders on the executors until after a failure. After that, I can take your suggestion and emulate the approach taken by FileInputDStream to restore generatedRDDs from a map of Time -> Array[KafkaRDDPartition].

On Mon, Jan 5, 2015 at 5:43 PM, Tathagata Das notificati...@github.com wrote:

Any thoughts on my comments on driver fault-tolerance?
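As a rough illustration of restoring generated RDDs from a map of batch time to partition descriptions, the idea might look like the sketch below. Everything in it is an assumption made for illustration: batch times are plain Long milliseconds instead of Spark's Time, OffsetRange and FakeKafkaRDD are hypothetical stand-ins, and nothing here reflects how FileInputDStream or the PR actually implement it.

```scala
object RestoreSketch {
  // Hypothetical description of what one RDD partition reads from Kafka.
  final case class OffsetRange(topic: String, partition: Int, fromOffset: Long, untilOffset: Long)

  // Stand-in for a KafkaRDD: it only remembers which offset ranges it would read.
  final case class FakeKafkaRDD(ranges: Vector[OffsetRange])

  // Checkpoint side: keep only the offset ranges per batch time, never the RDD itself.
  def snapshot(generated: Map[Long, FakeKafkaRDD]): Map[Long, Array[OffsetRange]] =
    generated.map { case (time, rdd) => time -> rdd.ranges.toArray }

  // Recovery side: rebuild an equivalent RDD for every checkpointed batch time.
  def restore(checkpoint: Map[Long, Array[OffsetRange]]): Map[Long, FakeKafkaRDD] =
    checkpoint.map { case (time, ranges) => time -> FakeKafkaRDD(ranges.toVector) }
}
```

Because the offset ranges fully determine which messages a batch contains, rebuilding the RDD from them after a driver restart should yield the same data, provided Kafka still retains those offsets.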
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/3798#discussion_r22498572

--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -788,6 +788,20 @@ abstract class RDD[T: ClassTag](
   }

   /**
+   * Applies a function to each partition of this RDD, while tracking the index
+   * of the original partition.
+   */
+  def foreachPartitionWithIndex(
--- End diff --

Are you saying that you would prefer for users of this class to have to use mapPartitionsWithIndex with a side-effect of storing data, and then add an empty foreach in order to trigger the job to actually be scheduled?
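The difference between the two styles in that question can be sketched without Spark. The toy classes below are assumptions made purely for illustration (ToyRDD and ToyLazy are hypothetical and only mimic lazy transformations versus eager actions); they are not Spark's RDD API.

```scala
import scala.collection.mutable.ArrayBuffer

object PartitionIndexSketch {
  /** Toy stand-in for an RDD: a fixed set of partitions held in memory. */
  final class ToyRDD[T](partitions: Vector[Vector[T]]) {
    // Transformation: lazy, nothing runs until foreach is called on the result.
    def mapPartitionsWithIndex[U](f: (Int, Iterator[T]) => Iterator[U]): ToyLazy[U] =
      new ToyLazy[U](() =>
        partitions.zipWithIndex.map { case (part, idx) => f(idx, part.iterator).toVector })

    // Action in the style proposed by the diff: runs f once per partition, right away.
    def foreachPartitionWithIndex(f: (Int, Iterator[T]) => Unit): Unit =
      partitions.zipWithIndex.foreach { case (part, idx) => f(idx, part.iterator) }
  }

  final class ToyLazy[U](run: () => Vector[Vector[U]]) {
    def foreach(f: U => Unit): Unit = run().foreach(_.foreach(f))
  }

  // Workaround: side effect inside a transformation, then an empty foreach to trigger it.
  def viaWorkaround(rdd: ToyRDD[String]): Vector[(Int, Int)] = {
    val seen = ArrayBuffer.empty[(Int, Int)]
    rdd.mapPartitionsWithIndex[Unit] { (idx, it) =>
      seen += ((idx, it.size))
      Iterator.empty
    }.foreach(_ => ())
    seen.toVector
  }

  // Direct form: the side effect is the whole point, so an action expresses it plainly.
  def viaProposedMethod(rdd: ToyRDD[String]): Vector[(Int, Int)] = {
    val seen = ArrayBuffer.empty[(Int, Int)]
    rdd.foreachPartitionWithIndex { (idx, it) => seen += ((idx, it.size)); () }
    seen.toVector
  }
}
```

Both functions record the same (partition index, element count) pairs; the workaround just needs an extra, otherwise meaningless foreach to make the side effect happen.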
[GitHub] spark pull request: [SPARK-4229] Create hadoop configuration in a ...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/3543#discussion_r22446683

--- Diff: streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala ---
@@ -789,7 +790,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
       keyClass: Class[_],
       valueClass: Class[_],
       outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
-      conf: Configuration = new Configuration) {
--- End diff --

Based on what Marcelo Vanzin said on the dev list when I brought this issue up, the only reason the problem was still around for me to run into is that he changed some of the uses of new Configuration but not all of them.

I agree it's used in a lot of different places, but I'm not sure how piecemeal fixes to only some of the places are helpful to users. Were there still specific concerns about particular classes?

On Sun, Jan 4, 2015 at 6:28 AM, Tathagata Das notificati...@github.com wrote:

In streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala https://github.com/apache/spark/pull/3543#discussion-diff-22438855:

@@ -789,7 +790,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
       keyClass: Class[_],
       valueClass: Class[_],
       outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
-      conf: Configuration = new Configuration) {

The scope of this PR is pretty wide in terms of the number of classes it touches, causing issues as different places needs to be handled differently. If you considered moving this sort of changes (new Configuration to sparkContext.hadoopConfiguration) into a different PR that might be easier to get in.
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/3798#discussion_r22446804

--- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DeterministicKafkaInputDStream.scala ---
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import scala.annotation.tailrec
+import scala.reflect.{classTag, ClassTag}
+
+import kafka.common.TopicAndPartition
+import kafka.message.MessageAndMetadata
+import kafka.serializer.Decoder
+
+import org.apache.spark.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.rdd.kafka.{KafkaCluster, KafkaRDD}
+import org.apache.spark.streaming.{StreamingContext, Time}
+import org.apache.spark.streaming.dstream._
+
+/** A stream of {@link org.apache.spark.rdd.kafka.KafkaRDD} where
+  * each given Kafka topic/partition corresponds to an RDD partition.
+  * The spark configuration spark.streaming.receiver.maxRate gives the maximum number of messages
+  * per second that each '''partition''' will accept.
+  * Starting offsets are specified in advance,
+  * and this DStream is not responsible for committing offsets,
+  * so that you can control exactly-once semantics.
+  * For an easy interface to Kafka-managed offsets,
+  *  see {@link org.apache.spark.rdd.kafka.KafkaCluster}
+  * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+  * configuration parameters</a>.
+  *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+  *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+  * @param fromOffsets per-topic/partition Kafka offsets defining the (inclusive)
+  *  starting point of the stream
+  * @param messageHandler function for translating each message into the desired type
+  * @param maxRetries maximum number of times in a row to retry getting leaders' offsets
+  */
+class DeterministicKafkaInputDStream[
+  K: ClassTag,
+  V: ClassTag,
+  U <: Decoder[_]: ClassTag,
+  T <: Decoder[_]: ClassTag,
+  R: ClassTag](
+    @transient ssc_ : StreamingContext,
+    val kafkaParams: Map[String, String],
+    val fromOffsets: Map[TopicAndPartition, Long],
+    messageHandler: MessageAndMetadata[K, V] => R,
+    maxRetries: Int = 1
+) extends InputDStream[R](ssc_) with Logging {
+
+  private val kc = new KafkaCluster(kafkaParams)
+
+  private val maxMessagesPerPartition: Option[Long] = {
+    val ratePerSec = ssc.sparkContext.getConf.getInt("spark.streaming.receiver.maxRate", 0)
--- End diff --

I'm not sure specifically what you mean by window operations that require past data which needs to be pulled from Kafka every time. The current KafkaRDD code has a log every time compute() is called on the RDD to pull data from Kafka, and for a job with a window operation, I only see compute called once for a given offset range, not repeatedly every time.

Regarding the bigger question of how this approach stacks up to the two existing approaches... they're all different approaches with different tradeoffs; I don't think one has to win. I'd still have a use for the original receiver-based class (not the WAL one), especially if SPARK-3146 or SPARK-4960 ever get merged.

On Sat, Jan 3, 2015 at 8:57 PM, Tathagata Das notificati...@github.com wrote:

In external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DeterministicKafkaInputDStream.scala https://github.com/apache/spark/pull/3798#discussion-diff-22436219:

+  K: ClassTag,
+  V: ClassTag,
+  U <: Decoder[_]: ClassTag,
+  T <: Decoder[_]: ClassTag,
+  R: ClassTag](
+    @transient ssc_ : StreamingContext,
+    val kafkaParams: Map[String, String],
+    val fromOffsets: Map[TopicAndPartition, Long],
+    messageHandler: MessageAndMetadata[K, V] => R
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/3798#discussion_r22362375

--- Diff: external/kafka/src/main/scala/org/apache/spark/rdd/kafka/KafkaRDD.scala ---
@@ -0,0 +1,157 @@
[...]
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/3798#discussion_r22364279

--- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DeterministicKafkaInputDStream.scala ---
@@ -0,0 +1,123 @@
[...]
+  private val kc = new KafkaCluster(kafkaParams)
+
+  private val maxMessagesPerPartition: Option[Long] = {
+    val ratePerSec = ssc.sparkContext.getConf.getInt("spark.streaming.receiver.maxRate", 0)
--- End diff --

You can have multiple receivers per topic, and the closest receiver-based analogue to my approach would be one receiver per partition, which is why I set it up that way. The semantics are documented in the scaladoc for the class. If we want people to be able to configure a very granular per-topic-per-partition maximum per batch we can, but it should probably be done as an (optional) argument rather than a configuration property.
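To make the per-partition semantics concrete, here is a small sketch of the arithmetic. The maxMessagesPerPartition function follows the calculation visible in the diff under review (rate per second times batch length in seconds); the clampUntil helper and the object name RateLimitSketch are hypothetical additions showing one way such a cap could be applied to a partition's ending offset, not code from the PR.

```scala
object RateLimitSketch {
  // Mirrors the diff: a rate of 0 (the default) means "no limit".
  def maxMessagesPerPartition(ratePerSec: Int, batchMillis: Long): Option[Long] =
    if (ratePerSec > 0) {
      val secsPerBatch = batchMillis.toDouble / 1000
      Some((secsPerBatch * ratePerSec).toLong)
    } else {
      None
    }

  // Hypothetical helper: cap one partition's ending offset for the next batch.
  // Without a limit, read up to the leader's latest offset.
  def clampUntil(fromOffset: Long, leaderOffset: Long, max: Option[Long]): Long =
    max.map(m => math.min(fromOffset + m, leaderOffset)).getOrElse(leaderOffset)
}
```

With spark.streaming.receiver.maxRate set to 1000 and a 2-second batch, each partition would contribute at most 2000 messages per batch, so a topic with 4 partitions could contribute up to 8000.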
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/3798#discussion_r22364358

--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -788,6 +788,20 @@ abstract class RDD[T: ClassTag](
   }

   /**
+   * Applies a function to each partition of this RDD, while tracking the index
+   * of the original partition.
+   */
+  def foreachPartitionWithIndex(
--- End diff --

The code for actually using the RDD and committing offsets transactionally is quite awkward without that method; see https://github.com/koeninger/kafka-exactly-once/commit/cb812c918f724b3ae5e57c66618276b6947ed30d
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user koeninger commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3798#discussion_r22366068

    --- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DeterministicKafkaInputDStream.scala ---
    @@ -0,0 +1,123 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.streaming.kafka
    +
    +import scala.annotation.tailrec
    +import scala.reflect.{classTag, ClassTag}
    +
    +import kafka.common.TopicAndPartition
    +import kafka.message.MessageAndMetadata
    +import kafka.serializer.Decoder
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.rdd.kafka.{KafkaCluster, KafkaRDD}
    +import org.apache.spark.streaming.{StreamingContext, Time}
    +import org.apache.spark.streaming.dstream._
    +
    +/** A stream of {@link org.apache.spark.rdd.kafka.KafkaRDD} where
    + * each given Kafka topic/partition corresponds to an RDD partition.
    + * The spark configuration spark.streaming.receiver.maxRate gives the maximum number of messages
    + * per second that each '''partition''' will accept.
    + * Starting offsets are specified in advance,
    + * and this DStream is not responsible for committing offsets,
    + * so that you can control exactly-once semantics.
    + * For an easy interface to Kafka-managed offsets,
    + *  see {@link org.apache.spark.rdd.kafka.KafkaCluster}
    + * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
    + * configuration parameters</a>.
    + *   Requires metadata.broker.list or bootstrap.servers to be set with Kafka broker(s),
    + *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
    + * @param fromOffsets per-topic/partition Kafka offsets defining the (inclusive)
    + *  starting point of the stream
    + * @param messageHandler function for translating each message into the desired type
    + * @param maxRetries maximum number of times in a row to retry getting leaders' offsets
    + */
    +class DeterministicKafkaInputDStream[
    +  K: ClassTag,
    +  V: ClassTag,
    +  U <: Decoder[_]: ClassTag,
    +  T <: Decoder[_]: ClassTag,
    +  R: ClassTag](
    +    @transient ssc_ : StreamingContext,
    +    val kafkaParams: Map[String, String],
    +    val fromOffsets: Map[TopicAndPartition, Long],
    +    messageHandler: MessageAndMetadata[K, V] => R,
    +    maxRetries: Int = 1
    +) extends InputDStream[R](ssc_) with Logging {
    +
    +  private val kc = new KafkaCluster(kafkaParams)
    +
    +  private val maxMessagesPerPartition: Option[Long] = {
    +    val ratePerSec = ssc.sparkContext.getConf.getInt("spark.streaming.receiver.maxRate", 0)
    +    if (ratePerSec > 0) {
    +      val secsPerBatch = ssc.graph.batchDuration.milliseconds.toDouble / 1000
    +      Some((secsPerBatch * ratePerSec).toLong)
    +    } else {
    +      None
    +    }
    +  }
    +
    +  // TODO based on the design of InputDStream's lastValidTime, it appears there isn't a
    +  // thread safety concern with private mutable state, but is this certain?
    +  private var currentOffsets = fromOffsets
    +
    +  @tailrec
    +  private def latestLeaderOffsets(retries: Int): Map[TopicAndPartition, Long] = {
    +    val o = kc.getLatestLeaderOffsets(currentOffsets.keys.toSet)
    +    // Either.fold would confuse @tailrec, do it manually
    +    if (o.isLeft) {
    +      val err = o.left.get.toString
    +      if (retries <= 0) {
    +        throw new Exception(err)
    +      } else {
    +        log.error(err)
    +        Thread.sleep(kc.config.refreshLeaderBackoffMs)
    +        latestLeaderOffsets(retries - 1)
    +      }
    +    } else {
    +      o.right.get
    +    }
    +  }
    +
    +  private def clamp(leaderOffsets: Map[TopicAndPartition, Long]): Map[TopicAndPartition, Long] = {
    +    maxMessagesPerPartition.map { mmp =>
    +      leaderOffsets.map { kv =>
    +        val (k, v) = kv
    +        val curr = currentOffsets(k)
    +        val diff = v - curr
    +        if (diff > mmp) (k, curr + mmp
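The two helpers in the quoted diff (retrying the leader-offset lookup, then clamping each partition to a per-batch maximum) can be sketched in plain Scala as below. This is not the PR's code: the `(String, Int)` key is a stand-in for `TopicAndPartition`, `fetch` is a hypothetical callback replacing the Kafka call, and because the quoted `clamp` is cut off, the `math.min` form here is an assumption about its intent.

```scala
import scala.annotation.tailrec

object LeaderOffsetSketch {
  type TopicPartition = (String, Int) // stand-in for kafka.common.TopicAndPartition

  /** Call `fetch` until it returns a Right, sleeping between failures.
    * Throws once a failure occurs with no retries left. Pattern matching
    * keeps the recursive call in tail position, so @tailrec is satisfied. */
  @tailrec
  def withRetries[A](retries: Int, backoffMs: Long)(fetch: () => Either[String, A]): A =
    fetch() match {
      case Right(value) => value
      case Left(err) if retries <= 0 => throw new RuntimeException(err)
      case Left(_) =>
        Thread.sleep(backoffMs)
        withRetries(retries - 1, backoffMs)(fetch)
    }

  /** Limit how far each partition may advance past its current offset in one batch. */
  def clamp(
      current: Map[TopicPartition, Long],
      leader: Map[TopicPartition, Long],
      maxPerPartition: Option[Long]): Map[TopicPartition, Long] =
    maxPerPartition match {
      case Some(max) =>
        leader.map { case (tp, latest) => tp -> math.min(latest, current(tp) + max) }
      case None => leader
    }

  def main(args: Array[String]): Unit = {
    val current = Map(("events", 0) -> 100L)
    val leader = Map(("events", 0) -> 500L)
    println(clamp(current, leader, Some(50L)))
    println(clamp(current, leader, None))
  }
}
```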
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user koeninger commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3798#discussion_r22366140

    --- Diff: external/kafka/src/main/scala/org/apache/spark/rdd/kafka/KafkaCluster.scala ---
    @@ -0,0 +1,305 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.rdd.kafka
    +
    +import scala.util.control.NonFatal
    +import scala.collection.mutable.ArrayBuffer
    +import java.util.Properties
    +import kafka.api._
    +import kafka.common.{ErrorMapping, OffsetMetadataAndError, TopicAndPartition}
    +import kafka.consumer.{ConsumerConfig, SimpleConsumer}
    +
    +/**
    + * Convenience methods for interacting with a Kafka cluster.
    + * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
    + * configuration parameters</a>.
    + *   Requires metadata.broker.list or bootstrap.servers to be set with Kafka broker(s),
    + *   NOT zookeeper servers, specified in host1:port1,host2:port2 form
    + */
    +class KafkaCluster(val kafkaParams: Map[String, String]) {
    --- End diff --

    The rdd would be really unpleasant to actually use without the convenience methods exposed by KafkaCluster, especially if you're keeping your offsets in zookeeper and doing idempotent writes.
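As a rough illustration of the "idempotent writes" pattern mentioned above, the sketch below keys every stored row by (topic, partition, offset), so replaying a batch after a failure overwrites rows instead of duplicating them. Everything here is hypothetical: `KeyedStore` stands in for a real datastore with unique keys, and `saveOffset` stands in for wherever offsets are kept (for example ZooKeeper via `KafkaCluster`). The batch is assumed to come from a single topic/partition.

```scala
import scala.collection.mutable

object IdempotentWriteSketch {
  final case class Message(topic: String, partition: Int, offset: Long, payload: String)

  // Toy store keyed by (topic, partition, offset): writing the same message twice leaves one row.
  final class KeyedStore {
    private val rows = mutable.Map.empty[(String, Int, Long), String]
    def upsert(m: Message): Unit = rows.update((m.topic, m.partition, m.offset), m.payload)
    def rowCount: Int = rows.size
  }

  // Store the data first, then record the next offset to read.
  // A crash between the two steps only causes a harmless replay of the same batch.
  def processBatch(store: KeyedStore, batch: Seq[Message], saveOffset: Long => Unit): Unit = {
    batch.foreach(store.upsert)
    batch.lastOption.foreach(m => saveOffset(m.offset + 1))
  }

  def main(args: Array[String]): Unit = {
    val store = new KeyedStore
    val batch = Seq(Message("events", 0, 0L, "a"), Message("events", 0, 1L, "b"))
    processBatch(store, batch, offset => println(s"saved offset $offset"))
    processBatch(store, batch, offset => println(s"saved offset $offset")) // simulated replay
    println(store.rowCount)
  }
}
```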
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user koeninger commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3798#discussion_r22366683

    --- Diff: external/kafka/src/main/scala/org/apache/spark/rdd/kafka/KafkaCluster.scala ---
    @@ -0,0 +1,305 @@
    +/**
    + * Convenience methods for interacting with a Kafka cluster.
    + * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
    + * configuration parameters</a>.
    + *   Requires metadata.broker.list or bootstrap.servers to be set with Kafka broker(s),
    + *   NOT zookeeper servers, specified in host1:port1,host2:port2 form
    + */
    +class KafkaCluster(val kafkaParams: Map[String, String]) {
    --- End diff --

    For example: https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/IdempotentExample.scala#L60

    We also use it for things like starting a stream at the leader offsets before a given time.
[GitHub] spark pull request: [SPARK-4014] Change TaskContext.attemptId to r...
Github user koeninger commented on the pull request:

    https://github.com/apache/spark/pull/3849#issuecomment-68405794

    Thanks for this. Most of the uses of attemptId I've seen look like they were assuming it meant the 0-based attempt number.
[GitHub] spark pull request: [SPARK-4014] Change TaskContext.attemptId to r...
Github user koeninger commented on the pull request:

    https://github.com/apache/spark/pull/3849#issuecomment-68407388

    The flip side is that it's already documented as doing the right thing:

    http://spark.apache.org/docs/1.1.1/api/scala/index.html#org.apache.spark.TaskContext

    val attemptId: Long
        the number of attempts to execute this task

    On Tue, Dec 30, 2014 at 4:38 PM, Patrick Wendell notificati...@github.com wrote:

    So personally I don't think we should change the semantics of attemptId because this has been exposed to user applications and they could silently break if we modify the meaning of the field (my original JIRA referred to an internal use of this). What it means right now is a global GUID over all attempts - that is a bit of an awkward definition, but I don't think it's fair to call this a bug - it was just a weird definition.

    So I'd be in favor of deprecating this in favor of taskAttemptId (a new field) and say that it was renamed to avoid confusion. Then we can add another field, attemptCount or attemptNum or something to convey the more intuitive thing.

    It will be slightly awkward, but if anyone reads the docs it should be obvious. In fact, we should probably spruce up the docs here for things like partitionID which right now are probably not super clear to users.
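The two meanings under discussion can be contrasted with a toy scheduler in plain Scala. This is only an illustration, not Spark's scheduler: `taskAttemptId` and `attemptNumber` are illustrative field names (the thread had not settled on names), with the former unique across every attempt of every task and the latter counting from 0 within one partition's task.

```scala
import scala.collection.mutable

object AttemptIdSketch {
  final case class Attempt(partitionId: Int, taskAttemptId: Long, attemptNumber: Int)

  // Toy scheduler that hands out both kinds of identifier.
  final class ToyScheduler {
    private var nextTaskAttemptId = 0L
    private val launchesPerPartition = mutable.Map.empty[Int, Int]

    def launch(partitionId: Int): Attempt = {
      val attemptNumber = launchesPerPartition.getOrElse(partitionId, 0)
      launchesPerPartition(partitionId) = attemptNumber + 1
      val attempt = Attempt(partitionId, nextTaskAttemptId, attemptNumber)
      nextTaskAttemptId += 1 // globally unique, never reused
      attempt
    }
  }

  def main(args: Array[String]): Unit = {
    val scheduler = new ToyScheduler
    println(scheduler.launch(0)) // first attempt for partition 0
    println(scheduler.launch(1)) // first attempt for partition 1
    println(scheduler.launch(0)) // retry of partition 0: new global id, attempt number 1
  }
}
```

Code that treats the global id as a retry counter would see the retry of partition 0 as "attempt 2" rather than "attempt 1", which is the kind of silent mismatch the thread is concerned about.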
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user koeninger commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3798#discussion_r22330325

    --- Diff: external/kafka/pom.xml ---
    @@ -44,7 +44,7 @@
         <dependency>
           <groupId>org.apache.kafka</groupId>
           <artifactId>kafka_${scala.binary.version}</artifactId>
    -      <version>0.8.0</version>
    --- End diff --

    It's due to the consumer offset management api only being available in 0.8.1: https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user koeninger commented on the pull request:

    https://github.com/apache/spark/pull/3798#issuecomment-68307147

    I got some good feedback from Koert Kuipers at Tresata regarding location awareness, so I'll be doing some refactoring to add that.
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user koeninger commented on the pull request:

    https://github.com/apache/spark/pull/3798#issuecomment-68149432

    Hi @jerryshao

    I'd politely ask that anyone with questions read at least KafkaRDD.scala and the example usage linked from the jira ticket (it's only about 50 significant lines of code):

    https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/TransactionalExample.scala

    I'll try to address your points.

    1. Yes, each RDD partition maps directly to a Kafka (topic, partition, inclusive starting offset, exclusive ending offset).

    2. It's a pull model, not a receiver push model. All the InputDStream implementation is doing is checking the leaders' highest offsets and defining an RDD based on that. When the RDD is run, its iterator makes a connection to kafka and pulls the data. This is done because it's simpler, and because using existing network receiver code would require dedicating 1 core per kafka partition, which is unacceptable from an ops standpoint.

    3. Yes. The fault tolerance model is that it should be safe for any or all of the spark machines to be completely destroyed at any point in the job, and the job should be able to be safely restarted. I don't think you can do better than this. This is achieved because all important state, especially the storage of offsets, is controlled by client code, not spark. In both the transactional and idempotent client code approaches, offsets aren't stored until data is stored, so restart should be safe.

    Regarding the approach that you link, the problem there is (a) it's not a part of the spark distribution so people won't know about it, and (b) it assumes control of kafka offsets and storage in zookeeper, which makes it impossible for client code to control exactly-once semantics.

    Regarding the possible semantic disconnect between spark streaming and treating kafka as a durable store of data from the past (assuming that's what you meant)... I agree there is a disconnect there. But it's a fundamental problem with spark streaming in that it implicitly depends on "now" rather than a time embedded in the data stream. I don't think we're fixing that with this ticket.
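Points 1 to 3 can be sketched as a small model in plain Scala. None of these names come from the PR: `OffsetRange`, `nextBatch` and `TxStore` are hypothetical. A batch is defined purely by offset ranges (inclusive start, exclusive end), and the toy store accepts results and the new offset together or rejects both, which stands in for a database transaction in the transactional approach.

```scala
object OffsetRangeSketch {
  // One RDD partition: a Kafka topic/partition plus the half-open range [fromOffset, untilOffset).
  final case class OffsetRange(topic: String, partition: Int, fromOffset: Long, untilOffset: Long) {
    def count: Long = untilOffset - fromOffset
  }

  // Define the next batch from the stream's current position and the leaders' latest offsets.
  def nextBatch(
      current: Map[(String, Int), Long],
      leader: Map[(String, Int), Long]): Seq[OffsetRange] =
    current.toSeq.sortBy(_._1).map { case ((topic, partition), from) =>
      OffsetRange(topic, partition, from, leader((topic, partition)))
    }

  // Toy transactional store: results and offsets change together or not at all.
  final class TxStore {
    private var results = Vector.empty[String]
    private var offsets = Map.empty[(String, Int), Long]

    def commit(range: OffsetRange, rows: Seq[String]): Boolean = synchronized {
      val key = (range.topic, range.partition)
      val expected = offsets.getOrElse(key, range.fromOffset)
      if (expected != range.fromOffset) {
        false // stale or replayed range: leave the stored state untouched
      } else {
        results = results ++ rows
        offsets = offsets.updated(key, range.untilOffset)
        true
      }
    }

    def storedResults: Vector[String] = results
    def storedOffsets: Map[(String, Int), Long] = offsets
  }

  def main(args: Array[String]): Unit = {
    val ranges = nextBatch(Map(("events", 0) -> 5L), Map(("events", 0) -> 9L))
    val store = new TxStore
    ranges.foreach(r => println(store.commit(r, Seq("row")))) // first commit is accepted
    ranges.foreach(r => println(store.commit(r, Seq("row")))) // replay after a restart is rejected
  }
}
```

Because the offsets live in the same store as the results, a restarted job can read `storedOffsets` and resume from exactly where the stored data ends.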
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
GitHub user koeninger opened a pull request:

    https://github.com/apache/spark/pull/3798

    [SPARK-4964] [Streaming] Exactly-once semantics for Kafka

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/koeninger/spark-1 kafkaRdd

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/3798.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #3798

commit 76913e23179228481c98fbba36a54ca32fe20aed
Author: cody koeninger <c...@koeninger.org>
Date:   2014-11-23T03:15:30Z

    Batch oriented kafka rdd, WIP. todo: cluster metadata / finding leader

commit 1d706257ac848d37caeaff0409bf60b080d66e48
Author: cody koeninger <c...@koeninger.org>
Date:   2014-11-23T06:10:56Z

    WIP on kafka cluster

commit 0b94b3363cbc97c5d99e78c42f9be1c08a974fb1
Author: cody koeninger <c...@koeninger.org>
Date:   2014-11-24T14:49:24Z

    use dropWhile rather than filter to trim beginning of fetch response

commit 4dafd1b0d58215cb27218e569cb5bea9d5146815
Author: cody koeninger <c...@koeninger.org>
Date:   2014-11-24T16:45:40Z

    method to get leader offsets, switch rdd bound to being exclusive start, inclusive end to match offsets typically returned from cluster

commit ce91c591569b8ac4e91dd29d013961fe0ee5c316
Author: cody koeninger <c...@koeninger.org>
Date:   2014-11-24T18:07:24Z

    method to get consumer offsets, explicit error handling

commit 7d050bcb0bcacfbd4a7b858cffae809fd2af8e9d
Author: cody koeninger <c...@koeninger.org>
Date:   2014-11-24T22:11:24Z

    methods to set consumer offsets and get topic metadata, switch back to inclusive start / exclusive end to match typical kafka consumer behavior

commit 783b4775c89dbcbde9172d34653eab2718eee494
Author: cody koeninger <c...@koeninger.org>
Date:   2014-11-25T14:29:20Z

    update tests for kafka 8.1.1

commit 29c6b430cc6bf5e2354b397289c4445f4993fc5b
Author: cody koeninger <c...@koeninger.org>
Date:   2014-11-25T15:33:45Z

    cleanup logging

commit 3c2a96af2322754e8c76000b083ec3630a03e8c8
Author: cody koeninger <c...@koeninger.org>
Date:   2014-11-25T20:02:37Z

    fix scalastyle errors

commit 4b078bf1e71745a6bc160c0836b54cc7b0d4171d
Author: cody koeninger <c...@koeninger.org>
Date:   2014-11-25T20:48:32Z

    differentiate between leader and consumer offsets in error message

commit 8d7de4ab5a447a53f65be852702ca90512b2a639
Author: cody koeninger <c...@koeninger.org>
Date:   2014-11-25T23:54:40Z

    make sure leader offsets can be found even for leaders that arent in the seed brokers

commit 979da25f4d48e5ffccf13ba1ff66eb2527ff01f9
Author: cody koeninger <c...@koeninger.org>
Date:   2014-11-26T15:31:38Z

    dont allow empty leader offsets to be returned

commit 38bb727cf31744fa625248c86c2a666920e83c36
Author: cody koeninger <c...@koeninger.org>
Date:   2014-12-03T21:42:25Z

    give easy access to the parameters of a KafkaRDD

commit 326ff3cbda37066ebef7492241276754164d2879
Author: cody koeninger <c...@koeninger.org>
Date:   2014-12-16T21:27:44Z

    add some tests

commit 6bf14f2850f9f40f53b4c1eec373214e1b6d3465
Author: cody koeninger <c...@koeninger.org>
Date:   2014-12-24T17:38:52Z

    first attempt at a Kafka dstream that allows for exactly-once semantics

commit bcca8a4b69f73b48f71b9558adf718b5324ed933
Author: cody koeninger <c...@koeninger.org>
Date:   2014-12-24T20:35:43Z

    Merge branch 'master' of https://github.com/apache/spark into kafkaRdd

commit 37d305320e72de1ee6ffcd42f6a45d331a4d465d
Author: cody koeninger <c...@koeninger.org>
Date:   2014-12-25T04:41:40Z

    make KafkaRDDPartition available to users so offsets can be committed per partition

commit cac63eec4a0bee6b662c4577404622a08904f0cb
Author: cody koeninger <c...@koeninger.org>
Date:   2014-12-25T07:11:58Z

    additional testing, fix fencepost error
[GitHub] spark pull request: [SPARK-4229] Create hadoop configuration in a ...
Github user koeninger commented on the pull request:

    https://github.com/apache/spark/pull/3543#issuecomment-67337516

    Jenkins is failing

    org.apache.spark.scheduler.SparkListenerSuite.local metrics
    org.apache.spark.streaming.flume.FlumeStreamSuite.flume input compressed stream

    I can't reproduce those test failures locally.
[GitHub] spark pull request: [SPARK-4229] Create hadoop configuration in a ...
Github user koeninger commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3543#discussion_r21610025

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala ---
    @@ -262,7 +263,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
       def createParquetFile[A <: Product : TypeTag](
           path: String,
           allowExisting: Boolean = true,
    -      conf: Configuration = new Configuration()): SchemaRDD = {
    --- End diff --

    I seem to recall there being potential thread safety issues related to hadoop configuration objects, resulting in the need to create / clone them.

    Quick search turned up e.g. https://issues.apache.org/jira/browse/SPARK-2546

    I'm not sure how relevant that is to all of these existing situations where new Configuration() is being called.

    On Tue, Dec 9, 2014 at 5:07 PM, Tathagata Das notificati...@github.com wrote:

    In sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala https://github.com/apache/spark/pull/3543#discussion-diff-21571141:

    I think this should be using the hadoopConfiguration object in the SparkContext. That has all the hadoop related configuration already setup and should be what is automatically used. @marmbrus https://github.com/marmbrus should have a better idea.
[GitHub] spark pull request: [SPARK-4229] Create hadoop configuration in a ...
Github user koeninger commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3543#discussion_r21638810

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala ---
    @@ -262,7 +263,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
       def createParquetFile[A <: Product : TypeTag](
           path: String,
           allowExisting: Boolean = true,
    -      conf: Configuration = new Configuration()): SchemaRDD = {
    --- End diff --

    So let me see if I have things straight:

    - Currently, the code is using new Configuration() as a default, which may have some thread safety issues due to the constructor.

    - My original patch uses SparkHadoopUtil.get.conf, which is a singleton, so it should decrease the constructor thread safety problem, but increase the problems if the hadoop configuration is modified. It also won't do the right thing for people who have altered the sparkConf, which makes it no good (I haven't run into this in personal usage of the patched version, because I always pass in a complete sparkConf via properties rather than setting it in code).

    - @tdas suggested using this.sparkContext.hadoopConfiguration. This will use the right spark config, but may have thread safety issues both at construction, when the spark context is created, and if the configuration is modified.

    So: use tdas' suggestion, and add a HadoopRDD.CONFIGURATION_INSTANTIATION_LOCK.synchronized block to SparkHadoopUtil.newConfiguration?

    And people are out of luck if they have code that used to work because they were modifying new blank instances of Configuration, rather than the now-shared one?
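The trade-off in that list (a fresh object per caller versus one shared instance, and construction guarded by a lock) can be modelled without Hadoop. In this sketch `ToyConfiguration` is a hypothetical stand-in for `org.apache.hadoop.conf.Configuration`, and `instantiationLock` plays the role that `HadoopRDD.CONFIGURATION_INSTANTIATION_LOCK` would play in the proposal; none of it is Spark's actual code.

```scala
object ConfigLockSketch {
  // Stand-in for a configuration class whose constructor is not safe to run concurrently.
  final class ToyConfiguration(initial: Map[String, String]) {
    private val props = new java.util.HashMap[String, String]()
    initial.foreach { case (k, v) => props.put(k, v) }

    def get(key: String): Option[String] = Option(props.get(key))
    def set(key: String, value: String): Unit = { props.put(key, value); () }
  }

  // Single lock guarding construction, so only one constructor runs at a time.
  private val instantiationLock = new Object

  // Each caller gets its own instance: later modifications do not leak to other callers.
  def newConfiguration(base: Map[String, String]): ToyConfiguration =
    instantiationLock.synchronized { new ToyConfiguration(base) }

  def main(args: Array[String]): Unit = {
    val base = Map("some.key" -> "original")
    val a = newConfiguration(base)
    val b = newConfiguration(base)
    a.set("some.key", "changed")
    println(a.get("some.key")) // the modified copy
    println(b.get("some.key")) // unaffected by the change to a
  }
}
```

With a shared instance instead, the `set` call on `a` would be visible through `b`, which is the behavior change the last question in the comment is pointing at.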
[GitHub] spark pull request: [SPARK-4229] Create hadoop configuration in a ...
Github user koeninger commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3543#discussion_r21192361

    --- Diff: docs/configuration.md ---
    @@ -664,6 +665,24 @@ Apart from these, the following properties are also available, and may be useful
       </td>
     </tr>
     <tr>
    +  <td><code>spark.executor.heartbeatInterval</code></td>
    --- End diff --

    Pretty sure that's just diff getting confused based on where the hadoop doc changes were inserted; the same lines are marked as removed lower in the diff.
[GitHub] spark pull request: Closes SPARK-4229 Create hadoop configuration ...
GitHub user koeninger opened a pull request:

    https://github.com/apache/spark/pull/3543

    Closes SPARK-4229 Create hadoop configuration in a consistent way

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/koeninger/spark-1 SPARK-4229-master

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/3543.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #3543

commit c41a4b4d0752b1a5b057611c796e367c5a806be6
Author: cody koeninger <c...@koeninger.org>
Date:   2014-11-04T22:40:17Z

    SPARK-4229 use SparkHadoopUtil.get.conf so that hadoop properties are copied from spark config

    Resolved conflicts in favor of master.

    Conflicts:
    	streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
    	streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala

commit b48ad63fb9c31de90c8b5b0541129e2c71bd3478
Author: cody koeninger <c...@koeninger.org>
Date:   2014-11-04T22:41:07Z

    SPARK-4229 document handling of spark.hadoop.* properties

commit 413f916bafc5b218ab334cb9d66b67f3dbc117f7
Author: cody koeninger <c...@koeninger.org>
Date:   2014-11-05T03:26:26Z

    SPARK-4229 fix broken table in documentation, make hadoop doc formatting match that of runtime env
[GitHub] spark pull request: Spark 4229 Create hadoop configuration in a co...
Github user koeninger commented on the pull request:

    https://github.com/apache/spark/pull/3102#issuecomment-65176731

    Yes, the new hadoop config documentation is just documenting the behavior of SparkHadoopUtil.scala lines 95-100.

    Sorry about the branch situation, I was unclear on what the plan for 1.2 merges was. Opened a new PR that should merge cleanly into master: https://github.com/apache/spark/pull/3543
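For readers unfamiliar with the behavior being documented, here is a sketch of how `spark.hadoop.*` properties might be carried over into a Hadoop configuration, using plain maps in place of `SparkConf` and `Configuration`. The prefix stripping reflects my understanding of what `SparkHadoopUtil` does and is not copied from the referenced lines; `hadoopProps` is a hypothetical helper name and the keys in `main` are made up.

```scala
object HadoopPropsSketch {
  private val Prefix = "spark.hadoop."

  /** Keep only the spark.hadoop.* entries and strip the prefix,
    * e.g. spark.hadoop.foo.bar=1 becomes foo.bar=1. Other Spark settings are ignored. */
  def hadoopProps(sparkConf: Map[String, String]): Map[String, String] =
    sparkConf.collect {
      case (key, value) if key.startsWith(Prefix) => key.substring(Prefix.length) -> value
    }

  def main(args: Array[String]): Unit = {
    val conf = Map(
      "spark.app.name" -> "demo",
      "spark.hadoop.foo.bar" -> "1")
    println(hadoopProps(conf))
  }
}
```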
[GitHub] spark pull request: Spark 4229 Create hadoop configuration in a co...
Github user koeninger closed the pull request at:

    https://github.com/apache/spark/pull/3102
[GitHub] spark pull request: Spark 4229 Create hadoop configuration in a co...
GitHub user koeninger opened a pull request:

    https://github.com/apache/spark/pull/3102

    Spark 4229 Create hadoop configuration in a consistent way

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/koeninger/spark-1 SPARK-4229

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/3102.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #3102

commit 3cd384f77ba9505fe7c94c82980e07044f6b128c
Author: cody koeninger <c...@koeninger.org>
Date:   2014-11-04T22:40:17Z

    SPARK-4229 use SparkHadoopUtil.get.conf so that hadoop properties are copied from spark config

commit f2ee4f9f1ed717d54fb7916ff2cf3ae85468eab0
Author: cody koeninger <c...@koeninger.org>
Date:   2014-11-04T22:41:07Z

    SPARK-4229 document handling of spark.hadoop.* properties

commit eebbdcc53caa214079612732d3a4a13e57cecffe
Author: cody koeninger <c...@koeninger.org>
Date:   2014-11-05T03:26:26Z

    SPARK-4229 fix broken table in documentation, make hadoop doc formatting match that of runtime env
[GitHub] spark pull request: SPARK-3462 push down filters and projections i...
Github user koeninger commented on the pull request: https://github.com/apache/spark/pull/2345#issuecomment-55336261 @marbrus I see what you mean. Updated to basically what you suggested, aside from building the map once. Let me know, once it's finalized I can try to test one more time on live data.
[GitHub] spark pull request: SPARK-3462 push down filters and projections i...
GitHub user koeninger opened a pull request:
    https://github.com/apache/spark/pull/2345
    SPARK-3462 push down filters and projections into Unions
You can merge this pull request into a Git repository by running:
    $ git pull https://github.com/mediacrossinginc/spark SPARK-3462
Alternatively you can review and apply these changes as the patch at:
    https://github.com/apache/spark/pull/2345.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:
    This closes #2345
commit ef47b3b80dd92f4652947ccffa5c9fea97adffb0
Author: Cody Koeninger cody.koenin...@mediacrossing.com
Date: 2014-09-10T05:07:58Z
    SPARK-3462 push down filters and projections into Unions
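As a rough illustration of what "push down filters and projections into Unions" means, the sketch below rewrites a toy query plan so that a filter or projection sitting on top of a union is applied to each side of the union instead. The plan classes here are hypothetical stand-ins, not Catalyst's, and conditions are plain strings; a real rule would also have to remap column references from the left child to the right child, which this sketch deliberately ignores.

```scala
object UnionPushdownSketch {
  // Toy plan algebra (hypothetical, for illustration only).
  sealed trait Plan
  case class Relation(name: String) extends Plan
  case class Union(left: Plan, right: Plan) extends Plan
  case class Filter(condition: String, child: Plan) extends Plan
  case class Project(columns: Seq[String], child: Plan) extends Plan

  // Move Filter and Project nodes below any Union they sit on.
  def pushDown(plan: Plan): Plan = plan match {
    case Filter(c, Union(l, r)) =>
      Union(pushDown(Filter(c, l)), pushDown(Filter(c, r)))
    case Project(cols, Union(l, r)) =>
      Union(pushDown(Project(cols, l)), pushDown(Project(cols, r)))
    case Filter(c, child) =>
      // Rewrite the child first; if that exposes a Union, push again.
      pushDown(child) match {
        case u: Union => pushDown(Filter(c, u))
        case other    => Filter(c, other)
      }
    case Project(cols, child) =>
      pushDown(child) match {
        case u: Union => pushDown(Project(cols, u))
        case other    => Project(cols, other)
      }
    case Union(l, r) => Union(pushDown(l), pushDown(r))
    case r: Relation => r
  }
}
```

The benefit is that each side of the union can then be filtered or pruned at its own source before the rows are combined.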
[GitHub] spark pull request: [SPARK-2710] [SQL] Build SchemaRDD from a Jdbc...
Github user koeninger commented on a diff in the pull request:
    https://github.com/apache/spark/pull/1612#discussion_r17279539
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/jdbc/JdbcResultSetRDDSuite.scala ---
    @@ -0,0 +1,75 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.jdbc
    +
    +import java.sql._
    +
    +import org.scalatest.BeforeAndAfter
    +import org.apache.spark.sql.QueryTest
    +import org.apache.spark.sql.test.TestSQLContext._
    +
    +class JdbcResultSetRDDSuite extends QueryTest with BeforeAndAfter {
    +
    +  before {
    +    Class.forName("org.apache.derby.jdbc.EmbeddedDriver")
    +    val conn = DriverManager.getConnection("jdbc:derby:target/JdbcSchemaRDDSuiteDb;create=true")
    +    try {
    +      val create = conn.createStatement
    +      create.execute("""
    +        CREATE TABLE FOO(
    +          ID INTEGER NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1),
    +          DATA INTEGER
    +        )""")
    +      create.close()
    +      val insert = conn.prepareStatement("INSERT INTO FOO(DATA) VALUES(?)")
    +      (1 to 100).foreach { i =>
    +        insert.setInt(1, i * 2)
    +        insert.executeUpdate
    +      }
    +      insert.close()
    +    } catch {
    +      case e: SQLException if e.getSQLState == "X0Y32" =>
    +        // table exists
    +    } finally {
    +      conn.close()
    +    }
    +  }
    +
    +  test("basic functionality") {
    +    val jdbcResultSetRDD = jdbcResultSet("jdbc:derby:target/JdbcSchemaRDDSuiteDb", "SELECT DATA FROM FOO")
    +    jdbcResultSetRDD.registerAsTable("foo")
    +
    +    checkAnswer(
    +      sql("select count(*) from foo"),
    +      100
    +    )
    +    checkAnswer(
    +      sql("select sum(DATA) from foo"),
    +      10100
    +    )
    +  }
    +
    +  after {
    +    try {
    +      DriverManager.getConnection("jdbc:derby:;shutdown=true")
    +    } catch {
    +      case se: SQLException if se.getSQLState == "XJ015" =>
    --- End diff --
http://db.apache.org/derby/papers/DerbyTut/embedded_intro.html
A clean shutdown always throws SQL exception XJ015, which can be ignored.
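The pattern in the `after` block above (treat one specific SQLState as success, let everything else propagate) can be factored into a small helper. This is only a sketch: `ExpectedSqlState` and `ignoring` are hypothetical names, and the helper uses nothing beyond `java.sql.SQLException`, so it runs without Derby on the classpath.

```scala
import java.sql.SQLException

object ExpectedSqlState {
  // Runs `action` and swallows only a SQLException whose SQLState equals
  // `expected`. Returns true if that exception was seen, false if nothing
  // was thrown. Any other exception propagates to the caller.
  def ignoring(expected: String)(action: => Unit): Boolean =
    try {
      action
      false
    } catch {
      case e: SQLException if e.getSQLState == expected => true
    }
}
```

In the test suite this would wrap the shutdown call, along the lines of `ExpectedSqlState.ignoring("XJ015") { DriverManager.getConnection("jdbc:derby:;shutdown=true") }`, so that an unexpected SQLState still fails the test instead of being silently dropped.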
[GitHub] spark pull request: [SPARK-2710] [SQL] Build SchemaRDD from a Jdbc...
Github user koeninger commented on a diff in the pull request:
    https://github.com/apache/spark/pull/1612#discussion_r17279806
    --- Diff: core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala ---
    @@ -81,8 +113,14 @@ class JdbcRDD[T: ClassTag](
           logInfo("statement fetch size set to: " + stmt.getFetchSize + " to force MySQL streaming ")
         }
     
    -    stmt.setLong(1, part.lower)
    -    stmt.setLong(2, part.upper)
    +    val parameterCount = stmt.getParameterMetaData.getParameterCount
    +    if (parameterCount > 0) {
    --- End diff --
Not that there's anything wrong with backwards compatible fixes/enhancements, but a few things I noticed here:
1. If it's a sufficiently small table that a user is only using 1 partition, why not encourage them to query it from the driver and broadcast it?
2. As it stands, it looks like you allow 0, 1, 2, or more ? placeholders, but the doc comment change only describes the 0 or 2 case.
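One way to make the code agree with a doc comment that only describes the 0 and 2 placeholder cases is to reject every other count explicitly. The sketch below is an assumption about how that could look, not the change under review; `BoundBinding`, `requiresBounds` and `bindBounds` are hypothetical names. The JDBC calls used (`getParameterMetaData`, `getParameterCount`, `setLong`) are standard `java.sql` API, though how well a driver supports parameter metadata varies.

```scala
import java.sql.PreparedStatement

object BoundBinding {
  // Decide whether partition bounds should be bound, given the number of
  // '?' placeholders in the query. Only the two documented cases pass.
  def requiresBounds(parameterCount: Int): Boolean = parameterCount match {
    case 0 => false
    case 2 => true
    case n =>
      throw new IllegalArgumentException(
        s"expected 0 or 2 '?' placeholders, found $n")
  }

  // Bind lower/upper to positions 1 and 2 only when the query has them.
  def bindBounds(stmt: PreparedStatement, lower: Long, upper: Long): Unit =
    if (requiresBounds(stmt.getParameterMetaData.getParameterCount)) {
      stmt.setLong(1, lower)
      stmt.setLong(2, upper)
    }
}
```

Failing fast on 1 or 3+ placeholders surfaces a malformed query at bind time rather than as a driver error during execution.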
[GitHub] spark pull request: [SPARK-2710] [SQL] Build SchemaRDD from a Jdbc...
Github user koeninger commented on a diff in the pull request:
    https://github.com/apache/spark/pull/1612#discussion_r17280404
    --- Diff: core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala ---
    @@ -67,6 +73,32 @@ class JdbcRDD[T: ClassTag](
         }).toArray
       }
     
    +  def getSchema: Seq[(String, Int, Boolean)] = {
    +    if (null != schema) {
    +      return schema
    +    }
    +
    +    val conn = getConnection()
    --- End diff --
Is this connection guaranteed to get closed? It won't benefit from the addOnCompleteCallback below, for instance.
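A common way to guarantee the close being asked about here is a small loan-pattern helper built on try/finally. The sketch below is an illustration under that assumption; `Loan` and `withResource` are hypothetical names, and it relies only on `java.lang.AutoCloseable`, which `java.sql.Connection` extends.

```scala
object Loan {
  // Opens a resource, passes it to `use`, and always closes it afterwards,
  // whether `use` returns normally or throws.
  def withResource[R <: AutoCloseable, A](open: => R)(use: R => A): A = {
    val resource = open
    try use(resource)
    finally resource.close()
  }
}
```

Applied to the quoted method, the metadata lookup would run inside something like `Loan.withResource(getConnection()) { conn => ... }`, so the connection is released even if `prepareStatement` or `getMetaData` fails.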
[GitHub] spark pull request: [SPARK-2710] [SQL] Build SchemaRDD from a Jdbc...
Github user koeninger commented on a diff in the pull request:
    https://github.com/apache/spark/pull/1612#discussion_r17280425
    --- Diff: core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala ---
    @@ -67,6 +73,32 @@ class JdbcRDD[T: ClassTag](
         }).toArray
       }
     
    +  def getSchema: Seq[(String, Int, Boolean)] = {
    +    if (null != schema) {
    +      return schema
    +    }
    +
    +    val conn = getConnection()
    +    val stmt = conn.prepareStatement(sql)
    +    val metadata = stmt.getMetaData
    +    try {
    +      if (null != stmt && ! stmt.isClosed()) {
    +        stmt.close()
    +      }
    +    } catch {
    +      case e: Exception => logWarning("Exception closing statement", e)
    +    }
    +    schema = Seq[(String, Int, Boolean)]()
    +    for(i <- 1 to metadata.getColumnCount) {
    +      schema :+= (
    --- End diff --
Are there any thread safety concerns regarding mutating schema here?
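One way to sidestep the concern raised here is to avoid mutating a shared `var` at all and compute the schema once through a `lazy val`, whose initialization Scala synchronizes. The sketch below assumes that approach; `SchemaHolder` and its `load` parameter are hypothetical, and the tuple is read as (column name, JDBC type code, nullable), which is an assumption about the quoted signature.

```scala
// Holds a schema that is computed at most once, even under concurrent access.
class SchemaHolder(load: () => Seq[(String, Int, Boolean)]) {
  // Readers either trigger the single initialization or see its finished
  // result; they never observe a partially built sequence.
  lazy val schema: Seq[(String, Int, Boolean)] = load()
}

object SchemaHolderDemo {
  def main(args: Array[String]): Unit = {
    val holder = new SchemaHolder(() =>
      Seq(("ID", java.sql.Types.INTEGER, false), ("DATA", java.sql.Types.INTEGER, true)))
    // Several threads read the schema; `load` still runs only once.
    val threads = (1 to 4).map(_ => new Thread(new Runnable {
      def run(): Unit = println(holder.schema.map(_._1).mkString(","))
    }))
    threads.foreach(_.start())
    threads.foreach(_.join())
  }
}
```

Building the full sequence inside `load` and publishing it in one step also removes the window in which another thread could see the empty or half-filled `schema` that the repeated `:+=` in the diff would expose.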
[GitHub] spark pull request: SPARK-1097: Do not introduce deadlock while fi...
Github user koeninger commented on the pull request: https://github.com/apache/spark/pull/1409#issuecomment-49066268 Testing that patch, it seems to have fixed the deadlock we were seeing in production.