[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/3798#issuecomment-72893619

[Test build #26763 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26763/consoleFull) for PR 3798 at commit [`1dc2941`](https://github.com/apache/spark/commit/1dc29415e3c0ac23a4207513686dfe5ee5ab2725).

* This patch merges cleanly.
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user koeninger commented on the pull request: https://github.com/apache/spark/pull/3798#issuecomment-72893647

Here's a solution for subclassing ConsumerConfig while still silencing the warning. My son is doing ok(ish) now, thanks for the concern.
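A sketch of the shape that solution takes, assuming Kafka 0.8's `ConsumerConfig` (whose auxiliary `Properties` constructor is visible to subclasses); the class name `SimpleConsumerConfig` and the empty-string placeholders are illustrative, not necessarily the exact merged code:

```scala
import java.util.Properties

import kafka.consumer.ConsumerConfig

import org.apache.spark.SparkException

/**
 * ConsumerConfig assumes the high-level (ZooKeeper-based) consumer: it
 * requires zookeeper.connect/group.id and warns about broker-list keys it
 * doesn't recognize. Handing it sanitized properties lets a simple consumer
 * reuse its defaults (timeouts, buffer sizes, client id) without the warning.
 */
class SimpleConsumerConfig private (val brokers: String, originalProps: Properties)
  extends ConsumerConfig(originalProps)

object SimpleConsumerConfig {
  /** Build a config without requiring group.id or zookeeper.connect. */
  def apply(kafkaParams: Map[String, String]): SimpleConsumerConfig = {
    // Accept either of Kafka's two existing broker-list keys.
    val brokers = kafkaParams.get("metadata.broker.list")
      .orElse(kafkaParams.get("bootstrap.servers"))
      .getOrElse(throw new SparkException(
        "Must specify metadata.broker.list or bootstrap.servers"))

    val props = new Properties()
    kafkaParams.foreach { case (key, value) =>
      // Withhold the broker-list keys so VerifiableProperties won't warn about them.
      if (key != "metadata.broker.list" && key != "bootstrap.servers") {
        props.put(key, value)
      }
    }
    // Satisfy ConsumerConfig's required keys with harmless placeholders.
    Seq("zookeeper.connect", "group.id").foreach { k =>
      if (!props.containsKey(k)) props.setProperty(k, "")
    }
    new SimpleConsumerConfig(brokers, props)
  }
}
```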
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user tdas commented on the pull request: https://github.com/apache/spark/pull/3798#issuecomment-72902696

That's a nifty solution :) I like it. Let's merge this as soon as the tests pass. Smaller changes like docs, etc., we can do in the next PR. @jerryshao I will add the example in a different PR.
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user tdas commented on the pull request: https://github.com/apache/spark/pull/3798#issuecomment-72902389

Glad to hear that your son is doing ok, hope he gets better (okish -> great) real real soon. :)
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/3798#issuecomment-72907295

[Test build #26763 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26763/consoleFull) for PR 3798 at commit [`1dc2941`](https://github.com/apache/spark/commit/1dc29415e3c0ac23a4207513686dfe5ee5ab2725).

* This patch **passes all tests**.
* This patch merges cleanly.
* This patch adds the following public classes _(experimental)_:
  * `class DirectKafkaInputDStreamCheckpointData extends DStreamCheckpointData(this)`
  * `class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable`
  * `case class LeaderOffset(host: String, port: Int, offset: Long)`
  * `class KafkaRDDPartition(`
  * `trait HasOffsetRanges`
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user harishreedharan commented on the pull request: https://github.com/apache/spark/pull/3798#issuecomment-72926364

Yay!
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/3798#issuecomment-72907304

Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/26763/
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user tdas commented on the pull request: https://github.com/apache/spark/pull/3798#issuecomment-72925200

Merging this. Thanks so much Cody! There will be a PR to fix a few things here and there soon.
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/3798
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user tdas commented on the pull request: https://github.com/apache/spark/pull/3798#issuecomment-72779779

Why did you choose the parameters metadata.broker.list and bootstrap.servers as the required kafka params? I looked at the Kafka docs, and it says that for consumers the necessary properties are zookeeper.connect and group.id. And intuitively the application is consuming, so the consumer configs should apply (not group.id, but zookeeper.connect). So our interface should also require zookeeper.connect and not the other two. Isn't it?
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/3798#issuecomment-72782334

[Test build #26701 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26701/consoleFull) for PR 3798 at commit [`8c31855`](https://github.com/apache/spark/commit/8c31855cf6b7327c6b6611e715457ba15bb79355).

* This patch **passes all tests**.
* This patch merges cleanly.
* This patch adds the following public classes _(experimental)_:
  * `class DeterministicKafkaInputDStreamCheckpointData extends DStreamCheckpointData(this)`
  * `class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable`
  * `case class LeaderOffset(host: String, port: Int, offset: Long)`
  * `class KafkaRDDPartition(`
  * `trait HasOffsetRanges`
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/3798#issuecomment-72782343

Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/26701/
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/3798#issuecomment-72775833

[Test build #26701 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26701/consoleFull) for PR 3798 at commit [`8c31855`](https://github.com/apache/spark/commit/8c31855cf6b7327c6b6611e715457ba15bb79355).

* This patch merges cleanly.
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user tdas commented on the pull request: https://github.com/apache/spark/pull/3798#issuecomment-72777123

Ohh, I meant createStream -> createDirectStream. I would have preferred something like createReceiverLessStream, but that's a mouthful. I think "direct" is something that comes close without being a mouthful. Had not occurred to me until Patrick suggested it.

And the underlying assumptions, I confess, are not super concrete. Some things like binary compatibility issues (e.g., do not use Scala traits with implemented methods) are fairly concrete, whereas things about API elegance (e.g., rdd.asInstanceOf[KafkaRDD] vs rdd.asInstanceOf[HasOffsetRanges]) are a little fuzzy, and opinions vary from person to person. Often what seems intuitive to me is not intuitive to someone else, even among the key committers like Patrick, Michael, Matei, etc. We usually argue about this in design docs, get as many eyeballs as possible, and try to reach a consensus. It is indeed a bit fuzzy, but it's all towards making the API that we *think* will be the best for the developers.
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/3798#issuecomment-72778614

[Test build #26706 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26706/consoleFull) for PR 3798 at commit [`59e29f6`](https://github.com/apache/spark/commit/59e29f61cd6a730eeea4e47a5316cbbe47615618).

* This patch merges cleanly.
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user koeninger commented on the pull request: https://github.com/apache/spark/pull/3798#issuecomment-72780349

High-level consumers connect to ZK. Simple consumers (which is what this is using) connect to brokers directly instead. See https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example

I chose to accept either of the two existing means in Kafka of specifying a list of seed brokers, rather than making up yet a third way.
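To make the two configuration styles concrete, a sketch (broker and ZooKeeper addresses are placeholders):

```scala
// Receiver-based (high-level consumer) stream: configured via ZooKeeper,
// which is why the Kafka consumer docs list these as the required properties.
val receiverParams = Map[String, String](
  "zookeeper.connect" -> "zkhost1:2181,zkhost2:2181",
  "group.id" -> "my-consumer-group")

// Direct (simple consumer) stream: configured with a seed list of brokers,
// using either of Kafka's two existing broker-list keys.
val directParams = Map[String, String](
  "metadata.broker.list" -> "broker1:9092,broker2:9092")
// equivalently: "bootstrap.servers" -> "broker1:9092,broker2:9092"
```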
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/3798#issuecomment-72784748

Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/26706/
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/3798#issuecomment-72784745

[Test build #26706 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26706/consoleFull) for PR 3798 at commit [`59e29f6`](https://github.com/apache/spark/commit/59e29f61cd6a730eeea4e47a5316cbbe47615618).

* This patch **passes all tests**.
* This patch merges cleanly.
* This patch adds the following public classes _(experimental)_:
  * `class DirectKafkaInputDStreamCheckpointData extends DStreamCheckpointData(this)`
  * `class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable`
  * `case class LeaderOffset(host: String, port: Int, offset: Long)`
  * `class KafkaRDDPartition(`
  * `trait HasOffsetRanges`
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user tdas commented on the pull request: https://github.com/apache/spark/pull/3798#issuecomment-72787965

I think the simplest solution is to assign zookeeper.connect. But you are assigning it in KafkaCluster lines 338 - 345. So why is this warning being thrown?
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user jerryshao commented on the pull request: https://github.com/apache/spark/pull/3798#issuecomment-72789850

Hi @tdas, should we add an example to show users how to use this new Kafka API correctly?
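Pending that follow-up PR, a sketch of what such an example might look like against the API in this patch (application name, broker list, and topic are placeholders):

```scala
import kafka.serializer.StringDecoder

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(2))

// Only a broker list is required -- no ZooKeeper quorum, no receiver.
val kafkaParams = Map[String, String](
  "metadata.broker.list" -> "broker1:9092,broker2:9092")
val topicsSet = Set("myTopic")

val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topicsSet)

// Classic word count over the message values.
messages.map(_._2).flatMap(_.split(" ")).map((_, 1L)).reduceByKey(_ + _).print()

ssc.start()
ssc.awaitTermination()
```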
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user tdas commented on the pull request: https://github.com/apache/spark/pull/3798#issuecomment-72791220

Holy crap! Don't bother about this at all. This can wait. I hope everything is okay. Take care and all the best!
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user koeninger commented on the pull request: https://github.com/apache/spark/pull/3798#issuecomment-72779615

Yeah, there's a weird distinction in Kafka between simple consumers and high-level consumers, in that they have a lot of common configuration parameters, but one of them talks directly to brokers and the other goes through ZK. I'll see if I can make a private subclass of ConsumerConfig to shut that warning up.
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user koeninger commented on the pull request: https://github.com/apache/spark/pull/3798#issuecomment-72790044

The warning is for metadata.broker.list, since it's not expected by the existing ConsumerConfig (it's used by other config classes). Couldn't get subclassing to work; the VerifiableProperties class it uses is very dependent on order of operations during construction. I think the simplest thing is a class that is constructed using kafkaParams and uses the static defaults from the ConsumerConfig object.

I'm currently waiting in an ER with my child with a 105 fever, so won't be getting to it for a few hours, to say the least.
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user koeninger commented on the pull request: https://github.com/apache/spark/pull/3798#issuecomment-72783141

Yeah, more importantly it's so defaults for things like connection timeouts match what Kafka provides. It's possible to assign a fake zookeeper.connect and have it pass verification; that's what the existing code does. Unfortunately, ConsumerConfig has a private constructor, so subclassing it in order for the broker list to pass verification without that warning may prove to be tricky. Worst case scenario, I'll re-implement a config that uses the Kafka defaults.
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user tdas commented on the pull request: https://github.com/apache/spark/pull/3798#issuecomment-72779120

Hey Cody, I was trying it and I found an odd behavior. It was printing this repeatedly.

```
15/02/03 18:22:08 WARN VerifiableProperties: Property metadata.broker.list is not valid
```

I was using this code.

```scala
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokerList)
val lines = KafkaUtils.createNewStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topicsSet)
```

I chose metadata.broker.list from the code in KafkaCluster, because without that I was getting an exception from the KafkaCluster.
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user tdas commented on the pull request: https://github.com/apache/spark/pull/3798#issuecomment-72782434

I see. ConsumerConfig is really necessary only for the high-level consumer, but you are using it to configure stuff in the low-level consumer as well. That is so that you don't have to introduce parameter strings to configure them yourselves. Is it possible to assign a fake but verified zookeeper.connect?
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user dibbhatt commented on a diff in the pull request: https://github.com/apache/spark/pull/3798#discussion_r24010956

--- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala ---
```diff
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import scala.util.control.NonFatal
+import scala.util.Random
+import scala.collection.mutable.ArrayBuffer
+import java.util.Properties
+import kafka.api._
+import kafka.common.{ErrorMapping, OffsetMetadataAndError, TopicAndPartition}
+import kafka.consumer.{ConsumerConfig, SimpleConsumer}
+import org.apache.spark.SparkException
+
+/**
+ * Convenience methods for interacting with a Kafka cluster.
+ * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+ * configuration parameters</a>.
+ *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+ *   NOT zookeeper servers, specified in host1:port1,host2:port2 form
+ */
+private[spark]
+class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
+  import KafkaCluster.{Err, LeaderOffset}
+
+  val seedBrokers: Array[(String, Int)] =
+    kafkaParams.get("metadata.broker.list")
+      .orElse(kafkaParams.get("bootstrap.servers"))
+      .getOrElse(throw new SparkException("Must specify metadata.broker.list or bootstrap.servers"))
+      .split(",").map { hp =>
+        val hpa = hp.split(":")
+        (hpa(0), hpa(1).toInt)
+      }
+
+  // ConsumerConfig isn't serializable
+  @transient private var _config: ConsumerConfig = null
+
+  def config: ConsumerConfig = this.synchronized {
+    if (_config == null) {
+      _config = KafkaCluster.consumerConfig(kafkaParams)
+    }
+    _config
+  }
+
+  def connect(host: String, port: Int): SimpleConsumer =
+    new SimpleConsumer(host, port, config.socketTimeoutMs,
+      config.socketReceiveBufferBytes, config.clientId)
+
+  def connectLeader(topic: String, partition: Int): Either[Err, SimpleConsumer] =
+    findLeader(topic, partition).right.map(hp => connect(hp._1, hp._2))
+
+  // Metadata api
+  // scalastyle:off
+  // https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-MetadataAPI
+  // scalastyle:on
+
+  def findLeader(topic: String, partition: Int): Either[Err, (String, Int)] = {
+    val req = TopicMetadataRequest(TopicMetadataRequest.CurrentVersion,
+      0, config.clientId, Seq(topic))
+    val errs = new Err
+    withBrokers(Random.shuffle(seedBrokers), errs) { consumer =>
+      val resp: TopicMetadataResponse = consumer.send(req)
+      resp.topicsMetadata.find(_.topic == topic).flatMap { tm: TopicMetadata =>
+        tm.partitionsMetadata.find(_.partitionId == partition)
+      }.foreach { pm: PartitionMetadata =>
+        pm.leader.foreach { leader =>
+          return Right((leader.host, leader.port))
+        }
+      }
+    }
+    Left(errs)
+  }
+
+  def findLeaders(
+      topicAndPartitions: Set[TopicAndPartition]
+    ): Either[Err, Map[TopicAndPartition, (String, Int)]] = {
+    val topics = topicAndPartitions.map(_.topic)
+    val response = getPartitionMetadata(topics).right
+    val answer = response.flatMap { tms: Set[TopicMetadata] =>
+      val leaderMap = tms.flatMap { tm: TopicMetadata =>
+        tm.partitionsMetadata.flatMap { pm: PartitionMetadata =>
+          val tp = TopicAndPartition(tm.topic, pm.partitionId)
+          if (topicAndPartitions(tp)) {
+            pm.leader.map { l =>
+              tp -> (l.host -> l.port)
+            }
+          } else {
+            None
+          }
+        }
+      }.toMap
+
+      if (leaderMap.keys.size == topicAndPartitions.size) {
+        Right(leaderMap)
+      } else {
+        val
```
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user pwendell commented on a diff in the pull request: https://github.com/apache/spark/pull/3798#discussion_r23988445

--- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala ---
```diff
@@ -144,4 +150,174 @@ object KafkaUtils {
     createStream[K, V, U, T](
       jssc.ssc, kafkaParams.toMap, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
   }
+
+  /** A batch-oriented interface for consuming from Kafka.
+   * Starting and ending offsets are specified in advance,
+   * so that you can control exactly-once semantics.
+   * @param sc SparkContext object
+   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+   * configuration parameters</a>.
+   *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   * @param offsetRanges Each OffsetRange in the batch corresponds to a
+   *   range of offsets for a given Kafka topic/partition
+   */
+  @Experimental
+  def createRDD[
+    K: ClassTag,
+    V: ClassTag,
+    U <: Decoder[_]: ClassTag,
+    T <: Decoder[_]: ClassTag] (
+      sc: SparkContext,
+      kafkaParams: Map[String, String],
+      offsetRanges: Array[OffsetRange]
+    ): RDD[(K, V)] with HasOffsetRanges = {
```
--- End diff --

I noticed that KafkaRDD isn't exposed, so maybe this is why. Not sure I see a big issue with exposing KafkaRDD and its constructor given that it's basically the same level of visibility as this static factory function.
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user pwendell commented on a diff in the pull request: https://github.com/apache/spark/pull/3798#discussion_r23988871

--- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala ---
```diff
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import kafka.common.TopicAndPartition
+
+/** Something that has a collection of OffsetRanges */
+trait HasOffsetRanges {
+  def offsetRanges: Array[OffsetRange]
+}
+
+/** Represents a range of offsets from a single Kafka TopicAndPartition */
+final class OffsetRange private(
+    /** kafka topic name */
+    val topic: String,
+    /** kafka partition id */
+    val partition: Int,
+    /** inclusive starting offset */
+    val fromOffset: Long,
+    /** exclusive ending offset */
+    val untilOffset: Long) extends Serializable {
+  import OffsetRange.OffsetRangeTuple
+
+  /** this is to avoid ClassNotFoundException during checkpoint restore */
+  private[streaming]
+  def toTuple: OffsetRangeTuple = (topic, partition, fromOffset, untilOffset)
+}
+
+object OffsetRange {
+  private[spark]
+  type OffsetRangeTuple = (String, Int, Long, Long)
+
+  def create(topic: String, partition: Int, fromOffset: Long, untilOffset: Long): OffsetRange =
```
--- End diff --

It's confusing to have both `create` and the apply methods here. Why not just have one way of creating these?
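A sketch of what a single construction path might look like, with `apply` as the one public entry point (whether to keep `create` for Java callers is exactly the open question here):

```scala
object OffsetRange {
  // One way to build an OffsetRange; Java callers can invoke it
  // explicitly as OffsetRange.apply(...).
  def apply(topic: String, partition: Int, fromOffset: Long, untilOffset: Long): OffsetRange =
    new OffsetRange(topic, partition, fromOffset, untilOffset)
}
```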
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user pwendell commented on a diff in the pull request: https://github.com/apache/spark/pull/3798#discussion_r23989829

--- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala ---
```diff
@@ -144,4 +150,174 @@ object KafkaUtils {
     createStream[K, V, U, T](
       jssc.ssc, kafkaParams.toMap, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
   }
+
+  /** A batch-oriented interface for consuming from Kafka.
+   * Starting and ending offsets are specified in advance,
+   * so that you can control exactly-once semantics.
+   * @param sc SparkContext object
+   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+   * configuration parameters</a>.
+   *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   * @param offsetRanges Each OffsetRange in the batch corresponds to a
+   *   range of offsets for a given Kafka topic/partition
+   */
+  @Experimental
+  def createRDD[
+    K: ClassTag,
+    V: ClassTag,
+    U <: Decoder[_]: ClassTag,
+    T <: Decoder[_]: ClassTag] (
+      sc: SparkContext,
+      kafkaParams: Map[String, String],
+      offsetRanges: Array[OffsetRange]
+    ): RDD[(K, V)] with HasOffsetRanges = {
+    val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)
+    val kc = new KafkaCluster(kafkaParams)
+    val topics = offsetRanges.map(o => TopicAndPartition(o.topic, o.partition)).toSet
+    val leaders = kc.findLeaders(topics).fold(
+      errs => throw new SparkException(errs.mkString("\n")),
+      ok => ok
+    )
+    new KafkaRDD[K, V, U, T, (K, V)](sc, kafkaParams, offsetRanges, leaders, messageHandler)
+  }
+
+  /** A batch-oriented interface for consuming from Kafka.
+   * Starting and ending offsets are specified in advance,
+   * so that you can control exactly-once semantics.
+   * @param sc SparkContext object
+   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+   * configuration parameters</a>.
+   *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   * @param offsetRanges Each OffsetRange in the batch corresponds to a
+   *   range of offsets for a given Kafka topic/partition
+   * @param leaders Kafka leaders for each offset range in batch
+   * @param messageHandler function for translating each message into the desired type
```
--- End diff --

How is this message handler different than having the user just call a map function on a returned RDD? It seems a little risky because this is exposing a Kafka class in the byte code signature, which they could relocate in a future release in a way that causes this to break for callers.
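For context on the difference: the handler runs while each record's `MessageAndMetadata` is still in scope, so per-message metadata (topic, partition, offset) can be captured, whereas a `map` on the returned `RDD[(K, V)]` sees only keys and values. A sketch, assuming `sc`, `kafkaParams`, `offsetRanges`, and `leaders` as in the surrounding diff:

```scala
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder

// Default form: metadata is already gone by the time you can call map.
val pairs = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
  sc, kafkaParams, offsetRanges) // RDD[(String, String)]

// Handler form: each record carries its own partition and offset.
val handler = (mmd: MessageAndMetadata[String, String]) =>
  (mmd.topic, mmd.partition, mmd.offset, mmd.message())
val withMeta = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder,
  (String, Int, Long, String)](sc, kafkaParams, offsetRanges, leaders, handler)
```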
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user pwendell commented on a diff in the pull request: https://github.com/apache/spark/pull/3798#discussion_r23989786

--- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala ---
```diff
@@ -144,4 +150,174 @@ object KafkaUtils {
     createStream[K, V, U, T](
       jssc.ssc, kafkaParams.toMap, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
   }
+
+  /** A batch-oriented interface for consuming from Kafka.
+   * Starting and ending offsets are specified in advance,
+   * so that you can control exactly-once semantics.
+   * @param sc SparkContext object
+   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+   * configuration parameters</a>.
+   *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   * @param offsetRanges Each OffsetRange in the batch corresponds to a
+   *   range of offsets for a given Kafka topic/partition
+   */
+  @Experimental
+  def createRDD[
+    K: ClassTag,
+    V: ClassTag,
+    U <: Decoder[_]: ClassTag,
+    T <: Decoder[_]: ClassTag] (
+      sc: SparkContext,
+      kafkaParams: Map[String, String],
+      offsetRanges: Array[OffsetRange]
+    ): RDD[(K, V)] with HasOffsetRanges = {
+    val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)
+    val kc = new KafkaCluster(kafkaParams)
+    val topics = offsetRanges.map(o => TopicAndPartition(o.topic, o.partition)).toSet
+    val leaders = kc.findLeaders(topics).fold(
+      errs => throw new SparkException(errs.mkString("\n")),
+      ok => ok
+    )
+    new KafkaRDD[K, V, U, T, (K, V)](sc, kafkaParams, offsetRanges, leaders, messageHandler)
+  }
+
+  /** A batch-oriented interface for consuming from Kafka.
+   * Starting and ending offsets are specified in advance,
+   * so that you can control exactly-once semantics.
+   * @param sc SparkContext object
+   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+   * configuration parameters</a>.
+   *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   * @param offsetRanges Each OffsetRange in the batch corresponds to a
+   *   range of offsets for a given Kafka topic/partition
+   * @param leaders Kafka leaders for each offset range in batch
+   * @param messageHandler function for translating each message into the desired type
+   */
+  @Experimental
+  def createRDD[
+    K: ClassTag,
+    V: ClassTag,
+    U <: Decoder[_]: ClassTag,
+    T <: Decoder[_]: ClassTag,
+    R: ClassTag] (
+      sc: SparkContext,
+      kafkaParams: Map[String, String],
+      offsetRanges: Array[OffsetRange],
+      leaders: Array[Leader],
```
--- End diff --

Is this version of the constructor assuming that the caller has their own code for finding the leaders? From what I can tell we've locked down the utility function for doing that.
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user pwendell commented on a diff in the pull request: https://github.com/apache/spark/pull/3798#discussion_r23990370

--- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala ---
```diff
@@ -144,4 +150,174 @@ object KafkaUtils {
     createStream[K, V, U, T](
       jssc.ssc, kafkaParams.toMap, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
   }
+
+  /** A batch-oriented interface for consuming from Kafka.
+   * Starting and ending offsets are specified in advance,
+   * so that you can control exactly-once semantics.
+   * @param sc SparkContext object
+   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+   * configuration parameters</a>.
+   *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   * @param offsetRanges Each OffsetRange in the batch corresponds to a
+   *   range of offsets for a given Kafka topic/partition
+   */
+  @Experimental
+  def createRDD[
+    K: ClassTag,
+    V: ClassTag,
+    U <: Decoder[_]: ClassTag,
+    T <: Decoder[_]: ClassTag] (
+      sc: SparkContext,
+      kafkaParams: Map[String, String],
+      offsetRanges: Array[OffsetRange]
+    ): RDD[(K, V)] with HasOffsetRanges = {
+    val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)
+    val kc = new KafkaCluster(kafkaParams)
+    val topics = offsetRanges.map(o => TopicAndPartition(o.topic, o.partition)).toSet
+    val leaders = kc.findLeaders(topics).fold(
+      errs => throw new SparkException(errs.mkString("\n")),
+      ok => ok
+    )
+    new KafkaRDD[K, V, U, T, (K, V)](sc, kafkaParams, offsetRanges, leaders, messageHandler)
+  }
+
+  /** A batch-oriented interface for consuming from Kafka.
+   * Starting and ending offsets are specified in advance,
+   * so that you can control exactly-once semantics.
+   * @param sc SparkContext object
+   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+   * configuration parameters</a>.
+   *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   * @param offsetRanges Each OffsetRange in the batch corresponds to a
+   *   range of offsets for a given Kafka topic/partition
+   * @param leaders Kafka leaders for each offset range in batch
+   * @param messageHandler function for translating each message into the desired type
+   */
+  @Experimental
+  def createRDD[
+    K: ClassTag,
+    V: ClassTag,
+    U <: Decoder[_]: ClassTag,
+    T <: Decoder[_]: ClassTag,
+    R: ClassTag] (
+      sc: SparkContext,
+      kafkaParams: Map[String, String],
+      offsetRanges: Array[OffsetRange],
+      leaders: Array[Leader],
+      messageHandler: MessageAndMetadata[K, V] => R
+    ): RDD[R] with HasOffsetRanges = {
+
+    val leaderMap = leaders
+      .map(l => TopicAndPartition(l.topic, l.partition) -> (l.host, l.port))
+      .toMap
+    new KafkaRDD[K, V, U, T, R](sc, kafkaParams, offsetRanges, leaderMap, messageHandler)
+  }
+
+  /**
+   * This stream can guarantee that each message from Kafka is included in transformations
+   * (as opposed to output actions) exactly once, even in most failure situations.
+   *
+   * Points to note:
+   *
+   * Failure Recovery - You must checkpoint this stream, or save offsets yourself and provide them
+   * as the fromOffsets parameter on restart.
+   * Kafka must have sufficient log retention to obtain messages after failure.
+   *
+   * Getting offsets from the stream - see programming guide
+   *
+   * Zookeeper - This does not use Zookeeper to store offsets.  For interop with Kafka monitors
+   * that depend on Zookeeper, you must store offsets in ZK yourself.
+   *
+   * End-to-end semantics - This does not guarantee that any output operation will push each record
+   * exactly once. To ensure end-to-end exactly-once semantics (that is, receiving exactly once and
+   * outputting exactly once), you have to either ensure that the output operation is
+   * idempotent, or transactionally store offsets with the output. See the programming guide for
+   * more details.
+   *
+   * @param ssc StreamingContext object
+   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+   * configuration parameters</a>.
+   *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+   *   NOT zookeeper servers, specified in
```
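The "transactionally store offsets with the output" point in the quoted scaladoc is easiest to see in code. A sketch of that pattern, assuming `directStream` is the DStream returned by the new API; `saveOffsetsAndResults` is a hypothetical helper standing in for an atomic write to your own store:

```scala
import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

directStream.foreachRDD { rdd =>
  // Surfacing offsets via the HasOffsetRanges cast discussed in this thread.
  val ranges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  val results = rdd.map(_._2).countByValue() // some per-batch output

  // Hypothetical helper: commit results and offsets in one transaction,
  // so a replayed batch either fully happened or did not.
  saveOffsetsAndResults(ranges, results)
}
```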
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user pwendell commented on a diff in the pull request: https://github.com/apache/spark/pull/3798#discussion_r23988318

--- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala ---
```diff
@@ -144,4 +150,174 @@ object KafkaUtils {
     createStream[K, V, U, T](
       jssc.ssc, kafkaParams.toMap, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
   }
+
+  /** A batch-oriented interface for consuming from Kafka.
+   * Starting and ending offsets are specified in advance,
+   * so that you can control exactly-once semantics.
+   * @param sc SparkContext object
+   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+   * configuration parameters</a>.
+   *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   * @param offsetRanges Each OffsetRange in the batch corresponds to a
+   *   range of offsets for a given Kafka topic/partition
+   */
+  @Experimental
+  def createRDD[
+    K: ClassTag,
+    V: ClassTag,
+    U <: Decoder[_]: ClassTag,
+    T <: Decoder[_]: ClassTag] (
+      sc: SparkContext,
+      kafkaParams: Map[String, String],
+      offsetRanges: Array[OffsetRange]
+    ): RDD[(K, V)] with HasOffsetRanges = {
```
--- End diff --

I've never seen a trait mixin in a return type. What does this actually mean? I looked at the compiled byte code and the byte code signature is still RDD. Can we just return a `KafkaRDD` here?

If this is enforced somehow by the scala compiler, returning an interface here ties our hands in the future, because we can't add functionality to the returned type without breaking binary compatibility. For instance, we may want to return an RDD that has additional methods beyond just accessing its offset ranges.

I ran a simple example and I couldn't see any byte code reference to the mixed in trait:

```
trait Trait {}
class Class extends Trait {}
object Object {
  def getTrait: Class with Trait = { new Class() }
}

javap -v Object

public static Class getTrait();
  flags: ACC_PUBLIC, ACC_STATIC
  Code:
    stack=1, locals=0, args_size=0
       0: getstatic     #16  // Field Object$.MODULE$:LObject$;
       3: invokevirtual #18  // Method Object$.getTrait:()LClass;
       6: areturn
```
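For what it's worth, callers of this API shape can still reach the trait after the refinement is erased, via a runtime cast; a sketch:

```scala
val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
  sc, kafkaParams, offsetRanges)

// Statically rdd is RDD[(String, String)] with HasOffsetRanges,
// so this compiles without a cast:
rdd.offsetRanges.foreach(println)

// Once the value is passed around as a plain RDD (which is all the
// bytecode signature promises), the trait is recovered by casting:
val plain: org.apache.spark.rdd.RDD[(String, String)] = rdd
val ranges = plain.asInstanceOf[HasOffsetRanges].offsetRanges
```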
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user pwendell commented on a diff in the pull request: https://github.com/apache/spark/pull/3798#discussion_r23988994

--- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/Leader.scala ---
```diff
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import kafka.common.TopicAndPartition
+
+/** Host info for the leader of a Kafka TopicAndPartition */
+final class Leader private(
+    /** kafka topic name */
+    val topic: String,
+    /** kafka partition id */
+    val partition: Int,
+    /** kafka hostname */
+    val host: String,
+    /** kafka host's port */
+    val port: Int) extends Serializable
+
+object Leader {
+  def create(topic: String, partition: Int, host: String, port: Int): Leader =
```
--- End diff --

Similar with offset ranges, can't we just have a single way to construct these?
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user pwendell commented on a diff in the pull request: https://github.com/apache/spark/pull/3798#discussion_r23988976

--- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala ---
```diff
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import kafka.common.TopicAndPartition
+
+/** Something that has a collection of OffsetRanges */
+trait HasOffsetRanges {
+  def offsetRanges: Array[OffsetRange]
+}
+
+/** Represents a range of offsets from a single Kafka TopicAndPartition */
+final class OffsetRange private(
+    /** kafka topic name */
+    val topic: String,
+    /** kafka partition id */
+    val partition: Int,
+    /** inclusive starting offset */
+    val fromOffset: Long,
+    /** exclusive ending offset */
+    val untilOffset: Long) extends Serializable {
+  import OffsetRange.OffsetRangeTuple
+
+  /** this is to avoid ClassNotFoundException during checkpoint restore */
```
--- End diff --

This comment might be more helpful to include where `OffsetRangeTuple` is defined rather than here. I spent a long time trying to figure out why this extra class existed. Also, can you give a bit more detail? Not sure I see why you can't recover from a checkpoint safely provided that the recovering JVM has the class `OffsetRangeTuple` defined.
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user pwendell commented on a diff in the pull request: https://github.com/apache/spark/pull/3798#discussion_r23989107

--- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala ---
```diff
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import kafka.common.TopicAndPartition
+
+/** Something that has a collection of OffsetRanges */
+trait HasOffsetRanges {
+  def offsetRanges: Array[OffsetRange]
+}
+
+/** Represents a range of offsets from a single Kafka TopicAndPartition */
+final class OffsetRange private(
+    /** kafka topic name */
+    val topic: String,
+    /** kafka partition id */
+    val partition: Int,
+    /** inclusive starting offset */
+    val fromOffset: Long,
+    /** exclusive ending offset */
+    val untilOffset: Long) extends Serializable {
+  import OffsetRange.OffsetRangeTuple
+
+  /** this is to avoid ClassNotFoundException during checkpoint restore */
+  private[streaming]
+  def toTuple: OffsetRangeTuple = (topic, partition, fromOffset, untilOffset)
+}
+
+object OffsetRange {
+  private[spark]
+  type OffsetRangeTuple = (String, Int, Long, Long)
```
--- End diff --

Can you group this at the bottom with the related `apply` method?
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user pwendell commented on a diff in the pull request: https://github.com/apache/spark/pull/3798#discussion_r23989943

--- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala ---
@@ -144,4 +150,249 @@
+  /** A batch-oriented interface for consuming from Kafka.
+   * Starting and ending offsets are specified in advance,
+   * so that you can control exactly-once semantics.
+   * @param sc SparkContext object
+   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+   * configuration parameters</a>.
+   * Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+   * NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   * @param batch Each OffsetRange in the batch corresponds to a
+   * range of offsets for a given Kafka topic/partition
+   */
+  @Experimental
+  def createRDD[
+    K: ClassTag,
+    V: ClassTag,
+    U <: Decoder[_]: ClassTag,
+    T <: Decoder[_]: ClassTag,
+    R: ClassTag] (
--- End diff --

Isn't the returned RDD of type `RDD[R]`?
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user tdas commented on the pull request: https://github.com/apache/spark/pull/3798#issuecomment-72749763

@koeninger Thank you very much for all the changes. They are looking good. Unfortunately the issue with `createRDD` returning `RDD[R] with HasOffsetRanges` (i.e., the issue that @pwendell raised) could be a problem in the future in terms of binary compatibility. Basically, we have not used such things in the rest of Spark, to keep things as Java-friendly and binary compatible as possible. Also, in the generated Java doc this looks scary. So here is an alternate suggestion that I am trying to implement on top of your PR branch. How about this: we effectively combine KafkaRDD and HasOffsetRanges into an abstract class.

```
abstract class KafkaRDD[T](val offsetRanges: Array[OffsetRange], sc: SparkContext)
  extends RDD[T](sc, Nil)

private[kafka] class KafkaRDDImpl[K, V, KD, VD, R] extends KafkaRDD[R] {
  ...
}

KafkaUtils.createRDD(...simple one without messageHandler...): KafkaRDD[(K, V)] = {
  // return KafkaRDDImpl[K, V, KD, VD, (K, V)]
}

KafkaUtils.createRDD(...simple one WITH messageHandler...): KafkaRDD[R] = {
  // return KafkaRDDImpl[K, V, KD, VD, R]
}
```

Advantages:
- No binary compatibility issues
- Easy to read from Java
- KafkaRDD implementation and constructor all hidden as before
- Can still extend KafkaRDD to expose more methods in future

What do you think?
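To make the trade-off concrete, a self-contained toy sketch of the pattern TD is proposing (the names `Handle`, `HandleImpl`, `Factory` are illustrative only, not the PR's API): the extra member lives on a public abstract class, the implementation stays private, and the factory's return type mentions no intersection types, so it reads plainly from Java and stays stable under binary compatibility checks.

```
// Toy sketch of "abstract class instead of RDD[T] with Trait"
abstract class Handle[T](val offsetRanges: Array[(String, Int, Long, Long)])

private class HandleImpl[T](
    ranges: Array[(String, Int, Long, Long)],
    val data: Seq[T]) extends Handle[T](ranges)

object Factory {
  // Java-friendly signature: a plain class, no `with` in the return type
  def create[T](ranges: Array[(String, Int, Long, Long)], data: Seq[T]): Handle[T] =
    new HandleImpl(ranges, data)
}
```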
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/3798#discussion_r24019631

--- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala ---
@@ -144,4 +150,174 @@
+  @Experimental
+  def createRDD[
+    K: ClassTag,
+    V: ClassTag,
+    U <: Decoder[_]: ClassTag,
+    T <: Decoder[_]: ClassTag,
+    R: ClassTag] (
+      sc: SparkContext,
+      kafkaParams: Map[String, String],
+      offsetRanges: Array[OffsetRange],
+      leaders: Array[Leader],
--- End diff --

That's correct on both counts. If you don't provide a way for clients to supply offset ranges and leaders at the same time, you're forcing twice the number of remote calls (because the usual way to get the end of the offset range is to talk to the leader). Yes, there's no way for people to actually use this currently unless they have their own copy of the functionality provided by KafkaCluster. In my case, I'm just going to remove SparkException from KafkaCluster, since it's the only Spark dependency, and distribute it as a separate jar under a different namespace.
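A hedged sketch of the double-round-trip point: a client that has already asked the leaders for their latest offsets has the leader map in hand, so an overload accepting it lets createRDD skip a second round of leader discovery. (KafkaCluster is `private[spark]` in this PR, so code like this is Spark-internal or uses a copied class, per the note above.)

```
// One metadata pass yields both the leaders and the offsets,
// so both can be handed to createRDD together.
val kc = new KafkaCluster(Map("metadata.broker.list" -> "host1:9092,host2:9092"))
val parts = kc.getPartitions(Set("events")).fold(
  errs => throw new SparkException(errs.mkString("\n")), ok => ok)
val leaders = kc.findLeaders(parts).fold(
  errs => throw new SparkException(errs.mkString("\n")), ok => ok)
// getLatestLeaderOffsets talks to exactly those leaders; createRDD
// should not have to find them a second time.
val latest = kc.getLatestLeaderOffsets(parts).fold(
  errs => throw new SparkException(errs.mkString("\n")), ok => ok)
```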
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/3798#discussion_r24018460

--- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala ---
@@ -144,4 +150,174 @@
+  def createRDD[
+    K: ClassTag,
+    V: ClassTag,
+    U <: Decoder[_]: ClassTag,
+    T <: Decoder[_]: ClassTag] (
+      sc: SparkContext,
+      kafkaParams: Map[String, String],
+      offsetRanges: Array[OffsetRange]
+  ): RDD[(K, V)] with HasOffsetRanges = {
--- End diff --

There is definitely a bytecode difference; try running diff on the class files. It statically guarantees you can call `.offsetRanges` on the thing returned from createRDD. Without it, you'd have to cast at runtime. If you add e.g. a `.chzburger` method to KafkaRDD, you won't be able to call it without asInstanceOf. If you then made a Chzburger interface, implemented it on KafkaRDD, and changed the return type to `RDD[(K, V)] with HasOffsetRanges with Chzburger`, you would. I hear your concern about binary compatibility. As far as exposing KafkaRDD instead... that's the way I originally designed things. The current design is the result of a compromise between TD's desire as a maintainer to hide as much as possible, and my desire as a user to expose what's necessary to get my job done. You can usually tell it's a good compromise if no one is happy :)
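The static-versus-runtime distinction in that comment, as a self-contained sketch with toy types (not the PR's classes):

```
// With the intersection return type the member is statically visible;
// with the erased return type a runtime cast is needed.
trait HasRanges { def offsetRanges: Array[(String, Int, Long, Long)] }

class Plain[T](val items: Seq[T])
class WithRanges[T](items: Seq[T], val offsetRanges: Array[(String, Int, Long, Long)])
  extends Plain[T](items) with HasRanges

def createStatic[T](r: WithRanges[T]): Plain[T] with HasRanges = r
def createErased[T](r: WithRanges[T]): Plain[T] = r

val w = new WithRanges(Seq(1, 2), Array(("topic", 0, 0L, 10L)))
val a = createStatic(w).offsetRanges                          // compiles as-is
val b = createErased(w).asInstanceOf[HasRanges].offsetRanges  // cast required
```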
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user koeninger commented on the pull request: https://github.com/apache/spark/pull/3798#issuecomment-72691392

Regarding naming, I agree. The name has been a point of discussion for a month; how do we get some consensus? Regarding Java wrappers, there have already been a number of changes directed at Java compatibility (an array of Leader instead of a Map[topic, broker], .create in addition to .apply). I wonder how relevant those are if we're doing separate Java wrappers (which, yes, I agree should be in a follow-on PR).

On Tue, Feb 3, 2015 at 3:13 AM, Patrick Wendell notificati...@github.com wrote:

I took a pass through the public API. I'm not very familiar with Kafka so it was somewhat slow going. However, some reactions:

1. We should try to tighten, simplify, and clarify the way we name and document everything in this public API. Most of the comments were about this. The most important, IMO, is coming up with a good name for the new streams returned and clearly explaining how they differ from the old Kafka stream. To me, the main differences seem to be in the way we (a) decide what goes into which batch and (b) actually ingest the data. I proposed a javadoc and naming scheme that emphasizes that distinction.

2. Are there plans to add Java and Python wrappers here next? Those are straightforward and it would be good to have them. Maybe in a follow-on PR?

-- Reply to this email directly or view it on GitHub https://github.com/apache/spark/pull/3798#issuecomment-72617088.
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/3798#discussion_r24017652

--- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala ---
@@ -144,4 +150,174 @@
+  /**
+   * This stream can guarantee that each message from Kafka is included in transformations
+   * (as opposed to output actions) exactly once, even in most failure situations.
+   *
+   * Points to note:
+   *
+   * Failure Recovery - You must checkpoint this stream, or save offsets yourself and provide them
+   * as the fromOffsets parameter on restart.
+   * Kafka must have sufficient log retention to obtain messages after failure.
+   *
+   * Getting offsets from the stream - see programming guide
+   *
+   * Zookeeper - This does not use Zookeeper to store offsets. For interop with Kafka monitors
+   * that depend on Zookeeper, you must store offsets in ZK yourself.
+   *
+   * End-to-end semantics - This does not guarantee that any output operation will push each record
+   * exactly once. To ensure end-to-end exactly-once semantics (that is, receiving exactly once and
+   * outputting exactly once), you have to either ensure that the output operation is
+   * idempotent, or transactionally store offsets with the output. See the programming guide for
+   * more details.
+   *
+   * @param ssc StreamingContext object
+   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+   * configuration parameters</a>.
+   * Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+   * NOT zookeeper servers, specified in
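The "transactionally store offsets with the output" point is the crux of end-to-end exactly-once. A hedged sketch of the shape of that pattern: `saveResultsAndOffsets` is a hypothetical helper standing in for the user's storage layer, and the cast assumes the stream's RDDs implement HasOffsetRanges, as this PR arranges.

```
stream.foreachRDD { rdd =>
  // offsetRanges(i) corresponds to partition i of this batch's RDD
  val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd.foreachPartition { iter =>
    val range = ranges(TaskContext.get.partitionId)
    // Hypothetical helper: commit this partition's results together with
    // range.untilOffset in ONE transaction, so a replayed partition after
    // failure either redoes both or neither.
    saveResultsAndOffsets(iter, range)
  }
}
```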
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/3798#discussion_r24019904

--- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala ---
@@ -144,4 +150,174 @@
+  /**
+   * This stream can guarantee that each message from Kafka is included in transformations
--- End diff --

I think we want the first sentence of the doc to convey why someone would choose this method.
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/3798#discussion_r24019256

--- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala ---
@@ -0,0 +1,69 @@
+  /** this is to avoid ClassNotFoundException during checkpoint restore */
--- End diff --

There was discussion of this earlier that has since gotten buried. Here's the gist: https://gist.github.com/koeninger/561a61482cd1b5b3600c

The classloader being used for restoring the checkpoint doesn't have that class, probably because it's in external (and thus included in the user assembly) rather than in one of the Spark jars that's on the default classpath. I went ahead and duplicated that comment.
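In other words, checkpoint data is round-tripped through plain tuples, whose classes always load, rather than through OffsetRange, which lives in the external/kafka artifact. A minimal sketch of that round trip, assuming the PR's `OffsetRange.create` factory (`fromTuple` below is a hypothetical helper, not in the PR):

```
type OffsetRangeTuple = (String, Int, Long, Long)

// serialize for the checkpoint: only Tuple4/String/Int/Long are involved,
// all present on the default classpath of the recovering JVM
def toTuple(o: OffsetRange): OffsetRangeTuple =
  (o.topic, o.partition, o.fromOffset, o.untilOffset)

// rebuild after restore, once code with OffsetRange on its classpath runs
def fromTuple(t: OffsetRangeTuple): OffsetRange =
  OffsetRange.create(t._1, t._2, t._3, t._4)
```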
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user dibbhatt commented on a diff in the pull request: https://github.com/apache/spark/pull/3798#discussion_r24015154

--- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala ---
@@ -0,0 +1,344 @@
+package org.apache.spark.streaming.kafka
+
+import scala.util.control.NonFatal
+import scala.util.Random
+import scala.collection.mutable.ArrayBuffer
+import java.util.Properties
+import kafka.api._
+import kafka.common.{ErrorMapping, OffsetMetadataAndError, TopicAndPartition}
+import kafka.consumer.{ConsumerConfig, SimpleConsumer}
+import org.apache.spark.SparkException
+
+/**
+ * Convenience methods for interacting with a Kafka cluster.
+ * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+ * configuration parameters</a>.
+ * Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+ * NOT zookeeper servers, specified in host1:port1,host2:port2 form
+ */
+private[spark]
+class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
+  import KafkaCluster.{Err, LeaderOffset}
+
+  val seedBrokers: Array[(String, Int)] =
+    kafkaParams.get("metadata.broker.list")
+      .orElse(kafkaParams.get("bootstrap.servers"))
+      .getOrElse(throw new SparkException("Must specify metadata.broker.list or bootstrap.servers"))
+      .split(",").map { hp =>
+        val hpa = hp.split(":")
+        (hpa(0), hpa(1).toInt)
+      }
+
+  // ConsumerConfig isn't serializable
+  @transient private var _config: ConsumerConfig = null
+
+  def config: ConsumerConfig = this.synchronized {
+    if (_config == null) {
+      _config = KafkaCluster.consumerConfig(kafkaParams)
+    }
+    _config
+  }
+
+  def connect(host: String, port: Int): SimpleConsumer =
+    new SimpleConsumer(host, port, config.socketTimeoutMs,
+      config.socketReceiveBufferBytes, config.clientId)
+
+  def connectLeader(topic: String, partition: Int): Either[Err, SimpleConsumer] =
+    findLeader(topic, partition).right.map(hp => connect(hp._1, hp._2))
+
+  // Metadata api
+  // scalastyle:off
+  // https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-MetadataAPI
+  // scalastyle:on
+
+  def findLeader(topic: String, partition: Int): Either[Err, (String, Int)] = {
+    val req = TopicMetadataRequest(TopicMetadataRequest.CurrentVersion,
+      0, config.clientId, Seq(topic))
+    val errs = new Err
+    withBrokers(Random.shuffle(seedBrokers), errs) { consumer =>
+      val resp: TopicMetadataResponse = consumer.send(req)
+      resp.topicsMetadata.find(_.topic == topic).flatMap { tm: TopicMetadata =>
+        tm.partitionsMetadata.find(_.partitionId == partition)
+      }.foreach { pm: PartitionMetadata =>
+        pm.leader.foreach { leader =>
+          return Right((leader.host, leader.port))
+        }
+      }
+    }
+    Left(errs)
+  }
+
+  def findLeaders(
+      topicAndPartitions: Set[TopicAndPartition]
+    ): Either[Err, Map[TopicAndPartition, (String, Int)]] = {
+    val topics = topicAndPartitions.map(_.topic)
+    val response = getPartitionMetadata(topics).right
+    val answer = response.flatMap { tms: Set[TopicMetadata] =>
+      val leaderMap = tms.flatMap { tm: TopicMetadata =>
+        tm.partitionsMetadata.flatMap { pm: PartitionMetadata =>
+          val tp = TopicAndPartition(tm.topic, pm.partitionId)
+          if (topicAndPartitions(tp)) {
+            pm.leader.map { l =>
+              tp -> (l.host -> l.port)
+            }
+          } else {
+            None
+          }
+        }
+      }.toMap
+
+      if (leaderMap.keys.size == topicAndPartitions.size) {
+        Right(leaderMap)
+      } else {
+        val
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/3798#discussion_r24017456

--- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala ---
@@ -144,4 +150,249 @@
+  @Experimental
+  def createRDD[
+    K: ClassTag,
+    V: ClassTag,
+    U <: Decoder[_]: ClassTag,
+    T <: Decoder[_]: ClassTag,
+    R: ClassTag] (
--- End diff --

There are 2 methods: one that takes a messageHandler (and thus returns RDD[R]), and one that doesn't take a messageHandler as an argument but provides a default one, so instead returning RDD[(K, V)].
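Condensed from the diffs quoted above, the two overload shapes side by side (signatures abridged, ClassTag context bounds omitted, bodies elided with `...` in the thread's own style):

```
// default handler (mmd => (mmd.key, mmd.message)) fixes the element type
def createRDD[K, V, U <: Decoder[_], T <: Decoder[_]](
    sc: SparkContext,
    kafkaParams: Map[String, String],
    offsetRanges: Array[OffsetRange]): RDD[(K, V)] with HasOffsetRanges = { ... }

// caller-supplied handler makes the element type R
def createRDD[K, V, U <: Decoder[_], T <: Decoder[_], R](
    sc: SparkContext,
    kafkaParams: Map[String, String],
    offsetRanges: Array[OffsetRange],
    leaders: Array[Leader],
    messageHandler: MessageAndMetadata[K, V] => R): RDD[R] with HasOffsetRanges = { ... }
```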
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/3798#discussion_r24019815 --- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala --- (quotes the same createRDD/createDirectStream hunk reproduced above; the comment body is cut off in the archive)
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/3798#discussion_r24018579

--- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala ---
@@ -0,0 +1,69 @@
+object OffsetRange {
+  private[spark]
+  type OffsetRangeTuple = (String, Int, Long, Long)
+
+  def create(topic: String, partition: Int, fromOffset: Long, untilOffset: Long): OffsetRange =
--- End diff --

TD thought that a static method named .create was more idiomatic for Java. It's obviously more idiomatic for Scala to have an .apply method, since the syntax sugar for it is baked into the language.
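The two spellings in use, for comparison (topic name and offsets here are made up):

```
val fromScala = OffsetRange("events", 0, 0L, 100L)         // apply, via Scala sugar
val fromJava  = OffsetRange.create("events", 0, 0L, 100L)  // static-factory style for Java
```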
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/3798#discussion_r24020676 --- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala --- (quotes the same KafkaCluster.scala hunk reproduced above; the comment body is cut off in the archive)
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user dibbhatt commented on a diff in the pull request: https://github.com/apache/spark/pull/3798#discussion_r23999633 --- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala --- (quotes the same KafkaCluster.scala hunk reproduced above; the comment body is cut off in the archive)
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user dibbhatt commented on a diff in the pull request: https://github.com/apache/spark/pull/3798#discussion_r24001371

--- Diff: external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala ---
@@ -0,0 +1,99 @@
+    val kc = new KafkaCluster(kafkaParams)
+
+    val rdd = getRdd(kc, Set(topic))
+    // this is the "lots of messages" case
+    // make sure we get all of them
+    assert(rdd.isDefined)
+    assert(rdd.get.count === sent.values.sum)
+
+    kc.setConsumerOffsets(
--- End diff --

Here you are committing the RDD's offsets to ZK via the SimpleConsumer commitOffset call, for the next getRdd call. What should happen if the commitOffset call fails? Isn't that breaking the logic of keeping the offsets along with the RDD, as described in the design document?
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/3798#issuecomment-72709829

[Test build #26658 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26658/consoleFull) for PR 3798 at commit [`0df3ebe`](https://github.com/apache/spark/commit/0df3ebe1eed5b149c03a828db621fbc60edf).

* This patch **passes all tests**.
* This patch merges cleanly.
* This patch adds the following public classes _(experimental)_:
  * `class DeterministicKafkaInputDStreamCheckpointData extends DStreamCheckpointData(this)`
  * `class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable`
  * `case class LeaderOffset(host: String, port: Int, offset: Long)`
  * `class KafkaRDDPartition(`
  * `trait HasOffsetRanges`
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/3798#issuecomment-72709851

Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/26658/
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/3798#discussion_r24021621 --- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala --- (quotes the same KafkaCluster.scala hunk reproduced above; the comment body is cut off in the archive)
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/3798#issuecomment-72696610

[Test build #26658 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26658/consoleFull) for PR 3798 at commit [`0df3ebe`](https://github.com/apache/spark/commit/0df3ebe1eed5b149c03a828db621fbc60edf).

* This patch merges cleanly.
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/3798#discussion_r24020938

--- Diff: external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala ---
@@ -0,0 +1,73 @@
+  test("consumer offset apis") {
+    val group = "kcsuitegroup" + Random.nextInt(1)
+
+    val offset = Random.nextInt(1)
+
+    val set = kc.setConsumerOffsets(group, Map(topicAndPartition -> offset))
+    assert(set.isRight, "didn't set consumer offsets")
--- End diff --

I'm not sure exactly what the question here is, but this test is just verifying that the consumer offset APIs work. They aren't publicly exposed, so the question of how people might misuse them is somewhat premature. That being said, the reason you'd typically want to use this API would be for interop with existing Kafka monitoring tools that expect offsets in ZK.
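A hedged sketch of that interop use, assuming the `setConsumerOffsets(group, Map[TopicAndPartition, Long])` shape seen in the test; the group name and offset are made up, and the API is `private[spark]` in this PR, so this is Spark-internal or copied-class code:

```
// After a batch is processed, mirror its final offsets into ZK so that
// ZK-based Kafka monitoring tools see the consumer group's progress.
val kc = new KafkaCluster(kafkaParams)
val result = kc.setConsumerOffsets("dashboard-group",
  Map(TopicAndPartition("events", 0) -> 12345L))
result.left.foreach(errs => throw new SparkException(errs.mkString("\n")))
```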
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/3798#discussion_r24021039

--- Diff: external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala ---
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import scala.util.Random
+
+import kafka.serializer.StringDecoder
+import kafka.common.TopicAndPartition
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark._
+import org.apache.spark.SparkContext._
+
+class KafkaRDDSuite extends KafkaStreamSuiteBase with BeforeAndAfter {
+  var sc: SparkContext = _
+  before {
+    setupKafka()
+  }
+
+  after {
+    if (sc != null) {
+      sc.stop
+      sc = null
+    }
+    tearDownKafka()
+  }
+
+  test("Kafka RDD") {
+    val sparkConf = new SparkConf().setMaster("local[4]").setAppName(this.getClass.getSimpleName)
+    sc = new SparkContext(sparkConf)
+    val topic = "topic1"
+    val sent = Map("a" -> 5, "b" -> 3, "c" -> 10)
+    createTopic(topic)
+    produceAndSendMessage(topic, sent)
+
+    val kafkaParams = Map("metadata.broker.list" -> s"localhost:$brokerPort",
+      "group.id" -> s"test-consumer-${Random.nextInt(1)}")
+
+    val kc = new KafkaCluster(kafkaParams)
+
+    val rdd = getRdd(kc, Set(topic))
+    // this is the "lots of messages" case
+    // make sure we get all of them
+    assert(rdd.isDefined)
+    assert(rdd.get.count === sent.values.sum)
+
+    kc.setConsumerOffsets(
--- End diff --

See previous answer. Also, there's nothing inherently wrong with keeping offsets in ZK for the idempotent case.
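Since "the idempotent case" carries a lot of weight in that sentence, here is a tiny self-contained illustration of what it means, with a `ConcurrentHashMap` standing in for an external store that upserts on a unique key (assumed setup, not code from the PR):

```scala
import java.util.concurrent.ConcurrentHashMap

// Stand-in for a sink with upsert semantics (e.g. a table with a unique key).
// Writing the same key twice leaves exactly one entry, so replaying a Kafka
// batch after a failure cannot create duplicates -- which is why it doesn't
// matter for correctness whether the offsets were tracked in ZK or elsewhere.
val store = new ConcurrentHashMap[String, String]()

def save(records: Iterator[(String, String)]): Unit =
  records.foreach { case (k, v) => store.put(k, v) } // replay-safe overwrite

save(Iterator("a" -> "1", "b" -> "2"))
save(Iterator("a" -> "1", "b" -> "2")) // a retry: store still has two entries
```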
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user pwendell commented on a diff in the pull request: https://github.com/apache/spark/pull/3798#discussion_r24029555

--- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala ---
@@ -144,4 +150,174 @@ object KafkaUtils {
     createStream[K, V, U, T](
       jssc.ssc, kafkaParams.toMap, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
   }
+
+  /** A batch-oriented interface for consuming from Kafka.
+   * Starting and ending offsets are specified in advance,
+   * so that you can control exactly-once semantics.
+   * @param sc SparkContext object
+   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+   * configuration parameters</a>.
+   * Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+   * NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   * @param offsetRanges Each OffsetRange in the batch corresponds to a
+   * range of offsets for a given Kafka topic/partition
+   */
+  @Experimental
+  def createRDD[
+    K: ClassTag,
+    V: ClassTag,
+    U <: Decoder[_]: ClassTag,
+    T <: Decoder[_]: ClassTag] (
+      sc: SparkContext,
+      kafkaParams: Map[String, String],
+      offsetRanges: Array[OffsetRange]
+  ): RDD[(K, V)] with HasOffsetRanges = {
+    val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)
+    val kc = new KafkaCluster(kafkaParams)
+    val topics = offsetRanges.map(o => TopicAndPartition(o.topic, o.partition)).toSet
+    val leaders = kc.findLeaders(topics).fold(
+      errs => throw new SparkException(errs.mkString("\n")),
+      ok => ok
+    )
+    new KafkaRDD[K, V, U, T, (K, V)](sc, kafkaParams, offsetRanges, leaders, messageHandler)
+  }
+
+  /** A batch-oriented interface for consuming from Kafka.
+   * Starting and ending offsets are specified in advance,
+   * so that you can control exactly-once semantics.
+   * @param sc SparkContext object
+   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+   * configuration parameters</a>.
+   * Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+   * NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   * @param offsetRanges Each OffsetRange in the batch corresponds to a
+   * range of offsets for a given Kafka topic/partition
+   * @param leaders Kafka leaders for each offset range in batch
+   * @param messageHandler function for translating each message into the desired type
+   */
+  @Experimental
+  def createRDD[
+    K: ClassTag,
+    V: ClassTag,
+    U <: Decoder[_]: ClassTag,
+    T <: Decoder[_]: ClassTag,
+    R: ClassTag] (
+      sc: SparkContext,
+      kafkaParams: Map[String, String],
+      offsetRanges: Array[OffsetRange],
+      leaders: Array[Leader],
+      messageHandler: MessageAndMetadata[K, V] => R
+  ): RDD[R] with HasOffsetRanges = {
+
+    val leaderMap = leaders
+      .map(l => TopicAndPartition(l.topic, l.partition) -> (l.host, l.port))
+      .toMap
+    new KafkaRDD[K, V, U, T, R](sc, kafkaParams, offsetRanges, leaderMap, messageHandler)
+  }
+
+  /**
+   * This stream can guarantee that each message from Kafka is included in transformations
+   * (as opposed to output actions) exactly once, even in most failure situations.
+   *
+   * Points to note:
+   *
+   * Failure Recovery - You must checkpoint this stream, or save offsets yourself and provide them
+   * as the fromOffsets parameter on restart.
+   * Kafka must have sufficient log retention to obtain messages after failure.
+   *
+   * Getting offsets from the stream - see programming guide
+   *
+   * Zookeeper - This does not use Zookeeper to store offsets. For interop with Kafka monitors
+   * that depend on Zookeeper, you must store offsets in ZK yourself.
+   *
+   * End-to-end semantics - This does not guarantee that any output operation will push each record
+   * exactly once. To ensure end-to-end exactly-once semantics (that is, receiving exactly once and
+   * outputting exactly once), you have to either ensure that the output operation is
+   * idempotent, or transactionally store offsets with the output. See the programming guide for
+   * more details.
+   *
+   * @param ssc StreamingContext object
+   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+   * configuration parameters</a>.
+   * Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+   * NOT zookeeper servers, specified in
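For readers skimming the thread, a short usage sketch of the batch API quoted above. The broker address, topic name, and offsets are made up, and `OffsetRange.create` is assumed as the public factory, since the class constructor is private:

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.kafka._

// Assumed setup: a broker on localhost:9092 and a topic "events" whose
// partition 0 holds at least 100 messages.
val sc = new SparkContext(
  new SparkConf().setMaster("local[2]").setAppName("kafka-batch-read"))
val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
val ranges = Array(OffsetRange.create("events", 0, 0, 100))

val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
  sc, kafkaParams, ranges)
println(s"read ${rdd.count()} messages") // exactly 100, per the range above
```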
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user pwendell commented on a diff in the pull request: https://github.com/apache/spark/pull/3798#discussion_r24030535

--- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala ---
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import kafka.common.TopicAndPartition
+
+/** Something that has a collection of OffsetRanges */
+trait HasOffsetRanges {
+  def offsetRanges: Array[OffsetRange]
+}
+
+/** Represents a range of offsets from a single Kafka TopicAndPartition */
+final class OffsetRange private(
+    /** kafka topic name */
+    val topic: String,
+    /** kafka partition id */
+    val partition: Int,
+    /** inclusive starting offset */
+    val fromOffset: Long,
+    /** exclusive ending offset */
+    val untilOffset: Long) extends Serializable {
+  import OffsetRange.OffsetRangeTuple
+
+  /** this is to avoid ClassNotFoundException during checkpoint restore */
--- End diff --

Created this so we don't lose track: https://issues.apache.org/jira/browse/SPARK-5569
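The comment under discussion is terse, so here is a hedged sketch of the pattern it refers to: offset ranges are flattened to plain tuples (types the JVM can always resolve during checkpoint restore) and rebuilt afterwards. The helper names and the factory call are illustrative, not necessarily the PR's exact code:

```scala
object OffsetRangeCheckpointing {
  // Matches the alias imported in the quoted file.
  type OffsetRangeTuple = (String, Int, Long, Long)

  def toTuple(o: OffsetRange): OffsetRangeTuple =
    (o.topic, o.partition, o.fromOffset, o.untilOffset)

  // Rebuild after restore; assumes an OffsetRange.create-style factory,
  // since the constructor is private.
  def fromTuple(t: OffsetRangeTuple): OffsetRange =
    OffsetRange.create(t._1, t._2, t._3, t._4)
}
```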
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user pwendell commented on a diff in the pull request: https://github.com/apache/spark/pull/3798#discussion_r24029025

--- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala ---
@@ -144,4 +150,174 @@ (the createRDD hunk quoted in full above)

--- End diff --

It's just confusing though because the bytecode return type does not reflect the presence of the trait. I guess it is embedded in the ScalaSig for the function. In any case, let me talk to TD, because this is an issue wrt binary compatibility.
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/3798#discussion_r24030347

--- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala ---
@@ -144,4 +150,174 @@ (the createRDD hunk quoted in full above)
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/3798#discussion_r24031550

--- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala ---
@@ -144,4 +150,249 @@ (a later revision of the createRDD hunk quoted above; this version names the parameter `batch`)

--- End diff --

The comment about unused R was referring to a prior version of the PR that had a copy-pasted type-level R parameter even for the version that returned RDD[(K, V)]. GitHub probably just got confused because the comment wasn't attached to the particular line in question. Pretty sure things are correct at this point.

On Tue, Feb 3, 2015 at 1:05 PM, Patrick Wendell notificati...@github.com wrote: Yeah, makes sense. But the comment here suggests R is not used, however, I see R being used in the return type. So that was my point.
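To see why `R` is in fact used, note that the message handler determines the element type of the returned RDD; a small sketch (the handler and types are examples, not the PR's defaults):

```scala
import kafka.message.MessageAndMetadata

// R is whatever the messageHandler produces. Here the handler discards the
// key and decodes the payload, so R = String and the overload that takes a
// handler would return RDD[String].
val messageHandler: MessageAndMetadata[Array[Byte], Array[Byte]] => String =
  mmd => new String(mmd.message(), "UTF-8")
```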
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/3798#discussion_r24028967

--- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala ---
@@ -144,4 +150,174 @@ (the createRDD hunk quoted in full above)

--- End diff --

That applies to people who already know what createStream does. But think about the people who are looking at KafkaUtils for the first time, and happen to look at this method first. Unless the first sentence conveys what this does, they will get extremely confused. As a developer, the question I need answered first is "what does this do". "What does this do differently from the other one" comes second, and only after I know what this one does.
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user pwendell commented on a diff in the pull request: https://github.com/apache/spark/pull/3798#discussion_r24029286

--- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala ---
@@ -144,4 +150,174 @@ (the createRDD hunk quoted in full above)
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/3798#discussion_r24033509

--- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala ---
@@ -144,4 +150,174 @@ (the createRDD hunk quoted in full above)

--- End diff --

The documentation has already been changed several times since your previous comments. The current version of it doesn't make any comparison to existing createStream calls.
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/3798#discussion_r24028643

--- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala ---
@@ -144,4 +150,174 @@ (the createRDD hunk quoted in full above)
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user pwendell commented on a diff in the pull request: https://github.com/apache/spark/pull/3798#discussion_r24029463

--- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala ---
@@ -144,4 +150,249 @@ (the `@param batch` revision of the createRDD hunk, as quoted above)

--- End diff --

Yeah, makes sense. But the comment here suggests `R` is not used, however, I see `R` being used in the return type. So that was my point.
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user pwendell commented on a diff in the pull request: https://github.com/apache/spark/pull/3798#discussion_r24029652

--- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala ---
@@ -0,0 +1,69 @@ (the OffsetRange.scala hunk quoted in full above)

--- End diff --

Thanks cody - I am going to create a JIRA to document this since it's not obvious, not sure if it's a bug or a feature.
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/3798#discussion_r24031208

--- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala ---
@@ -144,4 +150,174 @@ (the createRDD hunk quoted in full above)
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user koeninger commented on the pull request: https://github.com/apache/spark/pull/3798#issuecomment-72758821

Besides introducing two classes where one would do, it implies that there are (or could be) multiple implementations of the abstract class. You're not using it because you're actually planning for subclassing; you're using it as a workaround for returning a slightly less complicated type from a single method, where there's an alternative... just return RDD[(K, V)] for that one method. This really is a situation where there's only one implementation for the foreseeable future, and a single final concrete class would be cleaner.

On Tue, Feb 3, 2015 at 5:21 PM, Tathagata Das notificati...@github.com wrote: I don't get it, what's the complication with abstract classes?
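A sketch of the shape being argued for here: one final concrete class with a package-private constructor, so callers see a single exposed type but can neither instantiate nor subclass it. The names are hypothetical, not the PR's:

```scala
package org.apache.spark.streaming.kafka

// Hypothetical stand-in for exposing KafkaRDD directly: `final` prevents user
// subclassing, `private[kafka]` hides the constructor outside this package.
final class ExampleKafkaHandle private[kafka] (
    val offsetRanges: Array[OffsetRange])

private[kafka] object ExampleKafkaHandle {
  def apply(ranges: Array[OffsetRange]): ExampleKafkaHandle =
    new ExampleKafkaHandle(ranges)
}
```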
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user tdas commented on the pull request: https://github.com/apache/spark/pull/3798#issuecomment-72759859

But of course there can be multiple implementations! For example, there is both KafkaReceiver and ReliableKafkaReceiver. The second was introduced so that the code path for existing uses is not disturbed when we are introducing experimental code paths that are optionally enabled with flags. We never envisioned that happening, but when it did occur, we could do this because the KafkaReceiver was not exposed; only the Receiver interface was exposed.
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user tdas commented on the pull request: https://github.com/apache/spark/pull/3798#issuecomment-72758029

I don't get it, what's the complication with abstract classes?
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user koeninger commented on the pull request: https://github.com/apache/spark/pull/3798#issuecomment-72760900

To put it another way, the type you return has to be public. If you return a public abstract class, what are you going to do when someone else subclasses it? Making it a final concrete class doesn't have that issue.

On Feb 3, 2015 5:34 PM, Tathagata Das notificati...@github.com wrote: But of course there can be multiple implementations! ...
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user koeninger commented on the pull request: https://github.com/apache/spark/pull/3798#issuecomment-72754442

Like patrick said, I really don't see any reason not to just expose KafkaRDD. You can still hide its constructor without making a superfluous abstract class, and you can still make another subclass of KafkaRDD later if you need to. Even if you don't want the static createRDD method to return a KafkaRDD, we can just take the "with HasOffsetRanges" off and people who care about getting to the offsets can cast it (they'll have to cast it for the stream case anyway).

On Tue, Feb 3, 2015 at 4:30 PM, Tathagata Das notificati...@github.com wrote:

@koeninger Thank you very much for all the changes. They are looking good. Unfortunately the issue with createRDD returning RDD[...] with OffsetRange (i.e., the issue that @pwendell raised) could be a problem in the future in terms of binary compatibility. Basically, we have not used such things in the rest of Spark to keep things as Java-friendly and binary compatible as possible. Also, in the generated Java doc this looks scary. So here is an alternate suggestion that I am trying to implement on top of your PR branch. How about this. We effectively combine KafkaRDD and HasOffsetRanges into an abstract class.

```
abstract class KafkaRDD[T](val offsetRanges: Array[OffsetRange], sc: SparkContext)
  extends RDD[T](sc, Nil)

private[kafka] class KafkaRDDImpl[K, V, KD, VD, R] extends KafkaRDD[R] { ... }

KafkaUtils.createRDD(...simple one without messageHandler...): KafkaRDD[(K, V)] = {
  // return KafkaRDDImpl[K, V, KD, VD, (K, V)]
}

KafkaUtils.createRDD(...simple one WITH messageHandler...): KafkaRDD[R] = {
  // return KafkaRDDImpl[K, V, KD, VD, R]
}
```

Advantages:
- No binary compatibility issues
- Easy to read from Java
- KafkaRDD implementation and constructor all hidden as before
- Can still extend KafkaRDD to expose more methods in future

What do you think?
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user tdas commented on the pull request: https://github.com/apache/spark/pull/3798#issuecomment-72756440

I spent some time talking to Patrick offline about this. If we expose the KafkaRDD as is (while keeping its constructor private), then the simplified createRDD would be

```
KafkaUtils.createRDD[K, V, KD, VD](...): KafkaRDD[K, V, KD, VD, (K, V)]
```

Imagine how one would use it in Java.

```
KafkaRDD<String, String, StringDecoder, StringDecoder, Product2<String, String>> rdd =
  KafkaUtils.createRDD(...)
```

That's not very Java-friendly if you ask a Java developer. And a huge fraction of the community are Java developers. Furthermore, we want to add a Python API as well, and that also requires the interfaces to be Java-friendly. Here is the alternative (I think) with what I proposed.

```
KafkaRDD<String, String> rdd = KafkaUtils.createRDD(...)
```

Much simpler. Regarding casting, there are two cases:
1. Casting the RDD generated from createRDD - If we take off `HasOffsetRanges` (`KafkaUtils.createRDD` returns only RDD), then users have to cast. But if we return the abstract class KafkaRDD, then no casting is necessary.
2. Casting the RDD in `DStream.foreachRDD` - Casting is necessary either way. But isn't it more intuitive to write `rdd.asInstanceOf[KafkaRDD]` than `rdd.asInstanceOf[HasOffsetRanges]`?
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user koeninger commented on the pull request: https://github.com/apache/spark/pull/3798#issuecomment-72757731

Just make the simplified createRDD return a static type of RDD[(K, V)], that's what I'm saying. You're already going to have to deal with those other type parameters in Java if you want to call a more complex version of createRDD, because you have to know about the serializers and message handler. Make the more complex version return KafkaRDD. I agree that casting to KafkaRDD is better than casting to HasOffsetRanges. The abstract class is an unnecessary complication, though.

On Tue, Feb 3, 2015 at 5:11 PM, Tathagata Das notificati...@github.com wrote: I spent some time talking to Patrick offline about this. ...
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user tdas commented on the pull request: https://github.com/apache/spark/pull/3798#issuecomment-72761134

In terms of the number of classes, the abstract KafkaRDD is essentially replacing HasOffsetRanges. There is no need for this HasOffsetRanges trait that gets used only (assuming createRDD returns RDD[(K,V)]) inside

```
DStream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
}
```
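For context, a slightly fuller version of that fragment, as one might write it against the direct stream; the stream construction and the save step are assumed, not shown in this thread:

```scala
import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

// `stream` is assumed to be the DStream returned by the new direct Kafka API.
// Capture the ranges as the first step, while the RDD still carries
// HasOffsetRanges; downstream transformations return plain RDDs that don't.
stream.foreachRDD { rdd =>
  val ranges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // ... run the output action, then persist `ranges` atomically with it ...
}
```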
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user tdas commented on the pull request: https://github.com/apache/spark/pull/3798#issuecomment-72770451

Okay, here are the two options.

1. createRDD returns RDD[(K,V)] or RDD[R], and DStream.foreachRDD uses rdd.asInstanceOf[HasOffsetRanges]
2. createRDD returns KafkaRDD[(K,V)] or KafkaRDD[R], and DStream.foreachRDD uses rdd.asInstanceOf[KafkaRDD[_]]

I think I am okay with either one. Stepping back, my original concern was returning something that had no binary compatibility issues. Both solutions suffice. Between these two, since you feel so strongly against (2), let's go with (1).
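As a recap of option (1) from the user's side, a hedged sketch; `sc`, `kafkaParams`, and `ranges` are assumed to be set up as in the earlier examples:

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}

// Batch side: the static return type is a plain RDD[(K, V)] ...
val rdd: RDD[(String, String)] =
  KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
    sc, kafkaParams, ranges)

// ... and anyone who needs the ranges opts in with a cast, exactly as in
// the foreachRDD case above.
val readRanges: Array[OffsetRange] =
  rdd.asInstanceOf[HasOffsetRanges].offsetRanges
```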
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user pwendell commented on the pull request: https://github.com/apache/spark/pull/3798#issuecomment-72617088 I took a pass through the public API. I'm not very familiar with Kafka, so it was somewhat slow going. However, some reactions: 1. We should try to tighten, simplify, and clarify the way we name and document everything in this public API. Most of the comments were about this. The most important, IMO, is coming up with a good name for the new streams returned and clearly explaining how they differ from the old Kafka stream. To me, the main differences seem to be in the way we (a) decide what goes into which batch and (b) actually ingest the data. I proposed a javadoc and naming scheme that emphasizes that distinction. 2. Are there plans to add Java and Python wrappers here next? Those are straightforward, and it would be good to have them. Maybe in a follow-on PR? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user pwendell commented on a diff in the pull request: https://github.com/apache/spark/pull/3798#discussion_r23990592 --- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala --- @@ -144,4 +150,174 @@ object KafkaUtils { createStream[K, V, U, T]( jssc.ssc, kafkaParams.toMap, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel) } + + /** A batch-oriented interface for consuming from Kafka. + * Starting and ending offsets are specified in advance, + * so that you can control exactly-once semantics. + * @param sc SparkContext object + * @param kafkaParams Kafka a href=http://kafka.apache.org/documentation.html#configuration; + * configuration parameters/a. + * Requires metadata.broker.list or bootstrap.servers to be set with Kafka broker(s), + * NOT zookeeper servers, specified in host1:port1,host2:port2 form. + * @param offsetRanges Each OffsetRange in the batch corresponds to a + * range of offsets for a given Kafka topic/partition + */ + @Experimental + def createRDD[ +K: ClassTag, +V: ClassTag, +U : Decoder[_]: ClassTag, +T : Decoder[_]: ClassTag] ( + sc: SparkContext, + kafkaParams: Map[String, String], + offsetRanges: Array[OffsetRange] + ): RDD[(K, V)] with HasOffsetRanges = { +val messageHandler = (mmd: MessageAndMetadata[K, V]) = (mmd.key, mmd.message) +val kc = new KafkaCluster(kafkaParams) +val topics = offsetRanges.map(o = TopicAndPartition(o.topic, o.partition)).toSet +val leaders = kc.findLeaders(topics).fold( + errs = throw new SparkException(errs.mkString(\n)), + ok = ok +) +new KafkaRDD[K, V, U, T, (K, V)](sc, kafkaParams, offsetRanges, leaders, messageHandler) + } + + /** A batch-oriented interface for consuming from Kafka. + * Starting and ending offsets are specified in advance, + * so that you can control exactly-once semantics. + * @param sc SparkContext object + * @param kafkaParams Kafka a href=http://kafka.apache.org/documentation.html#configuration; + * configuration parameters/a. + * Requires metadata.broker.list or bootstrap.servers to be set with Kafka broker(s), + * NOT zookeeper servers, specified in host1:port1,host2:port2 form. + * @param offsetRanges Each OffsetRange in the batch corresponds to a + * range of offsets for a given Kafka topic/partition + * @param leaders Kafka leaders for each offset range in batch + * @param messageHandler function for translating each message into the desired type + */ + @Experimental + def createRDD[ +K: ClassTag, +V: ClassTag, +U : Decoder[_]: ClassTag, +T : Decoder[_]: ClassTag, +R: ClassTag] ( + sc: SparkContext, + kafkaParams: Map[String, String], + offsetRanges: Array[OffsetRange], + leaders: Array[Leader], + messageHandler: MessageAndMetadata[K, V] = R + ): RDD[R] with HasOffsetRanges = { + +val leaderMap = leaders + .map(l = TopicAndPartition(l.topic, l.partition) - (l.host, l.port)) + .toMap +new KafkaRDD[K, V, U, T, R](sc, kafkaParams, offsetRanges, leaderMap, messageHandler) + } + + /** + * This stream can guarantee that each message from Kafka is included in transformations --- End diff -- For the top level doc here isn't it something like: ``` Returns a Kafka stream that computes a specific offset range for each partition, then reads those offsets directly from Kafka without the use of receivers. Because this stream deterministically assigns offset ranges to specific batches, it can support exactly once semantics (as defined in the programming guide). 
Specifically, if a streaming program experiences task failures or restarts after a job failure, output batches appear as if each record was ingested and processed exactly once. ``` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user koeninger commented on the pull request: https://github.com/apache/spark/pull/3798#issuecomment-72773257 To be clear, I'm ok with any solution that gives me access to what I need, which in this case is the offsets. What's coming across as me feeling strongly about it is really, I think, that I'm not clear on what your principles with regard to the interface are ... well, that and frustration because it's been changed 3 times already :) For instance, why would you be willing to take on the fragile-base-class maintenance problem of exposing KafkaRDD as something that could be subclassed, but not be ok with exposing the DStream (so that people could override the batch generation policy)? In the interests of moving this forward, if we're really just talking about changing KafkaUtils' use of RDD[..] with HasOffsetRanges to RDD[..], I can make that change. On Tue, Feb 3, 2015 at 7:02 PM, Tathagata Das notificati...@github.com wrote: Okay, here are the two options. 1. createRDD returns RDD[(K,V)] or RDD[R], and DStream.foreachRDD uses rdd.asInstanceOf[HasOffsetRanges] 2. createRDD returns KafkaRDD[(K,V)] or KafkaRDD[R], and DStream.foreachRDD uses rdd.asInstanceOf[KafkaRDD[_]] I think I am okay with either one. Stepping back, my original concern was returning something that had no binary compatibility issues; both solutions suffice. Between these two, since you feel so strongly against (2), let's go with (1). Reply to this email directly or view it on GitHub https://github.com/apache/spark/pull/3798#issuecomment-72770451. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user tdas commented on the pull request: https://github.com/apache/spark/pull/3798#issuecomment-72775013 Yes, Cody, I agree that this has changed a number of times. But that is not unusual when adding such a significant, publicly visible API. Usually this level of API argument occurs over design docs over a period of time; however, since we were short on time in this matter and not all implementation complexities were evident from the beginning, it had to be done over the code. I can't thank you enough for your cooperation!!! So the two main changes are: 1. sc.createNewStream (maybe rename DeterministicKafkaStream to Direct) 2. createRDD returns RDD There are smaller suggestions and issues regarding documentation, indentation, etc.; I am willing to address them in another PR. Let's merge this after you have made these two changes. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user koeninger commented on the pull request: https://github.com/apache/spark/pull/3798#issuecomment-72775631 Hey man, I'd rather talk about the code anyway. I think there's just something I'm missing as far as your underlying assumptions about interfaces go :) Thanks for your help on this. Just made the createRDD change. Not clear on what createNewStream change you mean. Rename it to createStream, or something else? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/3798#discussion_r23971703 --- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DeterministicKafkaInputDStream.scala --- @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka + + +import scala.annotation.tailrec +import scala.collection.mutable +import scala.reflect.{classTag, ClassTag} + +import kafka.common.TopicAndPartition +import kafka.message.MessageAndMetadata +import kafka.serializer.Decoder + +import org.apache.spark.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset +import org.apache.spark.streaming.{StreamingContext, Time} +import org.apache.spark.streaming.dstream._ + +/** A stream of {@link org.apache.spark.rdd.kafka.KafkaRDD} where + * each given Kafka topic/partition corresponds to an RDD partition. + * The spark configuration spark.streaming.receiver.maxRate gives the maximum number of messages + * per second that each '''partition''' will accept. + * Starting offsets are specified in advance, + * and this DStream is not responsible for committing offsets, + * so that you can control exactly-once semantics. + * For an easy interface to Kafka-managed offsets, + * see {@link org.apache.spark.rdd.kafka.KafkaCluster} + * @param kafkaParams Kafka a href=http://kafka.apache.org/documentation.html#configuration; + * configuration parameters/a. + * Requires metadata.broker.list or bootstrap.servers to be set with Kafka broker(s), + * NOT zookeeper servers, specified in host1:port1,host2:port2 form. + * @param fromOffsets per-topic/partition Kafka offsets defining the (inclusive) + * starting point of the stream + * @param messageHandler function for translating each message into the desired type + * @param maxRetries maximum number of times in a row to retry getting leaders' offsets + */ +private[streaming] +class DeterministicKafkaInputDStream[ + K: ClassTag, + V: ClassTag, + U : Decoder[_]: ClassTag, + T : Decoder[_]: ClassTag, + R: ClassTag]( +@transient ssc_ : StreamingContext, +val kafkaParams: Map[String, String], +val fromOffsets: Map[TopicAndPartition, Long], +messageHandler: MessageAndMetadata[K, V] = R, +maxRetries: Int +) extends InputDStream[R](ssc_) with Logging { + + protected[streaming] override val checkpointData = +new DeterministicKafkaInputDStreamCheckpointData + + protected val kc = new KafkaCluster(kafkaParams) + + protected val maxMessagesPerPartition: Option[Long] = { +val ratePerSec = context.sparkContext.getConf.getInt(spark.streaming.receiver.maxRate, 0) --- End diff -- Lets rename this configuration. 
It's very confusing to overload this configuration, because the system does not behave in the same way: `receiver.maxRate` applies to receivers, which this stream does not use. In fact, the mechanism used here is specific to this input stream and applies to no other input stream. So let's rename it to something like `spark.streaming.kafka.maxRatePerPartition` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
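Assuming the rename lands as suggested, the per-partition cap would be set like any other Spark configuration key; the value below is an arbitrary example:
```
import org.apache.spark.SparkConf

// Caps each Kafka partition at 1000 messages per second for the direct
// stream, under the key name proposed above; receiver.maxRate would no
// longer apply to this stream.
val conf = new SparkConf()
  .setAppName("DirectKafkaExample")
  .set("spark.streaming.kafka.maxRatePerPartition", "1000")
```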
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/3798#discussion_r23972368 --- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala --- @@ -0,0 +1,338 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka + +import scala.util.control.NonFatal +import scala.util.Random +import scala.collection.mutable.ArrayBuffer +import java.util.Properties +import kafka.api._ +import kafka.common.{ErrorMapping, OffsetMetadataAndError, TopicAndPartition} +import kafka.consumer.{ConsumerConfig, SimpleConsumer} + +/** + * Convenience methods for interacting with a Kafka cluster. + * @param kafkaParams Kafka a href=http://kafka.apache.org/documentation.html#configuration; + * configuration parameters/a. + * Requires metadata.broker.list or bootstrap.servers to be set with Kafka broker(s), + * NOT zookeeper servers, specified in host1:port1,host2:port2 form + */ +private[spark] +class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable { + import KafkaCluster.{Err, LeaderOffset} + + val seedBrokers: Array[(String, Int)] = +kafkaParams.get(metadata.broker.list) + .orElse(kafkaParams.get(bootstrap.servers)) + .getOrElse(throw new Exception(Must specify metadata.broker.list or bootstrap.servers)) + .split(,).map { hp = +val hpa = hp.split(:) +(hpa(0), hpa(1).toInt) + } + + // ConsumerConfig isn't serializable + @transient private var _config: ConsumerConfig = null + + def config: ConsumerConfig = this.synchronized { +if (_config == null) { + _config = KafkaCluster.consumerConfig(kafkaParams) +} +_config + } + + def connect(host: String, port: Int): SimpleConsumer = +new SimpleConsumer(host, port, config.socketTimeoutMs, + config.socketReceiveBufferBytes, config.clientId) + + def connectLeader(topic: String, partition: Int): Either[Err, SimpleConsumer] = +findLeader(topic, partition).right.map(hp = connect(hp._1, hp._2)) + + // Metadata api + // scalastyle:off + // https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-MetadataAPI + // scalastyle:on + + def findLeader(topic: String, partition: Int): Either[Err, (String, Int)] = { +val req = TopicMetadataRequest(TopicMetadataRequest.CurrentVersion, + 0, config.clientId, Seq(topic)) +val errs = new Err +withBrokers(Random.shuffle(seedBrokers), errs) { consumer = + val resp: TopicMetadataResponse = consumer.send(req) + resp.topicsMetadata.find(_.topic == topic).flatMap { tm: TopicMetadata = +tm.partitionsMetadata.find(_.partitionId == partition) + }.foreach { pm: PartitionMetadata = +pm.leader.foreach { leader = + return Right((leader.host, leader.port)) +} + } +} +Left(errs) + } + + def findLeaders( +topicAndPartitions: 
Set[TopicAndPartition] + ): Either[Err, Map[TopicAndPartition, (String, Int)]] = { +val topics = topicAndPartitions.map(_.topic) --- End diff -- ``` def findLeaders( topicAndPartitions: Set[TopicAndPartition] // indent of 4 ): Either[Err, Map[TopicAndPartition, (String, Int)]] = { // indent of 2 ``` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/3798#issuecomment-72575053 [Test build #26571 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26571/consoleFull) for PR 3798 at commit [`4354bce`](https://github.com/apache/spark/commit/4354bced65a7f37a51bde9081d8d19dc9b9316cd). * This patch merges cleanly. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/3798#discussion_r23967901 --- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DeterministicKafkaInputDStream.scala --- @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka + + +import scala.annotation.tailrec +import scala.collection.mutable +import scala.reflect.{classTag, ClassTag} + +import kafka.common.TopicAndPartition +import kafka.message.MessageAndMetadata +import kafka.serializer.Decoder + +import org.apache.spark.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset +import org.apache.spark.streaming.{StreamingContext, Time} +import org.apache.spark.streaming.dstream._ + +/** A stream of {@link org.apache.spark.rdd.kafka.KafkaRDD} where + * each given Kafka topic/partition corresponds to an RDD partition. + * The spark configuration spark.streaming.receiver.maxRate gives the maximum number of messages + * per second that each '''partition''' will accept. + * Starting offsets are specified in advance, + * and this DStream is not responsible for committing offsets, + * so that you can control exactly-once semantics. + * For an easy interface to Kafka-managed offsets, + * see {@link org.apache.spark.rdd.kafka.KafkaCluster} + * @param kafkaParams Kafka a href=http://kafka.apache.org/documentation.html#configuration; + * configuration parameters/a. + * Requires metadata.broker.list or bootstrap.servers to be set with Kafka broker(s), + * NOT zookeeper servers, specified in host1:port1,host2:port2 form. 
+ * @param fromOffsets per-topic/partition Kafka offsets defining the (inclusive) + * starting point of the stream + * @param messageHandler function for translating each message into the desired type + * @param maxRetries maximum number of times in a row to retry getting leaders' offsets + */ +private[streaming] +class DeterministicKafkaInputDStream[ + K: ClassTag, + V: ClassTag, + U : Decoder[_]: ClassTag, + T : Decoder[_]: ClassTag, + R: ClassTag]( +@transient ssc_ : StreamingContext, +val kafkaParams: Map[String, String], +val fromOffsets: Map[TopicAndPartition, Long], +messageHandler: MessageAndMetadata[K, V] = R, +maxRetries: Int +) extends InputDStream[R](ssc_) with Logging { + + protected[streaming] override val checkpointData = +new DeterministicKafkaInputDStreamCheckpointData + + protected val kc = new KafkaCluster(kafkaParams) + + protected val maxMessagesPerPartition: Option[Long] = { +val ratePerSec = context.sparkContext.getConf.getInt(spark.streaming.receiver.maxRate, 0) +if (ratePerSec 0) { + val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000 + Some((secsPerBatch * ratePerSec).toLong) +} else { + None +} + } + + protected var currentOffsets = fromOffsets + + @tailrec + protected final def latestLeaderOffsets(retries: Int): Map[TopicAndPartition, LeaderOffset] = { +val o = kc.getLatestLeaderOffsets(currentOffsets.keySet) +// Either.fold would confuse @tailrec, do it manually +if (o.isLeft) { + val err = o.left.get.toString + if (retries = 0) { +throw new Exception(err) + } else { +log.error(err) +Thread.sleep(kc.config.refreshLeaderBackoffMs) +latestLeaderOffsets(retries - 1) + } +} else { + o.right.get +} + } + + protected def clamp( +leaderOffsets: Map[TopicAndPartition, LeaderOffset]): Map[TopicAndPartition, LeaderOffset] = { +maxMessagesPerPartition.map { mmp = + leaderOffsets.map { case (tp, lo) = +tp - lo.copy(offset = Math.min(currentOffsets(tp) + mmp,
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/3798#discussion_r23972007 --- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DeterministicKafkaInputDStream.scala --- @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka + + +import scala.annotation.tailrec +import scala.collection.mutable +import scala.reflect.{classTag, ClassTag} + +import kafka.common.TopicAndPartition +import kafka.message.MessageAndMetadata +import kafka.serializer.Decoder + +import org.apache.spark.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset +import org.apache.spark.streaming.{StreamingContext, Time} +import org.apache.spark.streaming.dstream._ + +/** A stream of {@link org.apache.spark.rdd.kafka.KafkaRDD} where + * each given Kafka topic/partition corresponds to an RDD partition. + * The spark configuration spark.streaming.receiver.maxRate gives the maximum number of messages + * per second that each '''partition''' will accept. + * Starting offsets are specified in advance, + * and this DStream is not responsible for committing offsets, + * so that you can control exactly-once semantics. + * For an easy interface to Kafka-managed offsets, + * see {@link org.apache.spark.rdd.kafka.KafkaCluster} + * @param kafkaParams Kafka a href=http://kafka.apache.org/documentation.html#configuration; + * configuration parameters/a. + * Requires metadata.broker.list or bootstrap.servers to be set with Kafka broker(s), + * NOT zookeeper servers, specified in host1:port1,host2:port2 form. 
+ * @param fromOffsets per-topic/partition Kafka offsets defining the (inclusive) + * starting point of the stream + * @param messageHandler function for translating each message into the desired type + * @param maxRetries maximum number of times in a row to retry getting leaders' offsets + */ +private[streaming] +class DeterministicKafkaInputDStream[ + K: ClassTag, + V: ClassTag, + U : Decoder[_]: ClassTag, + T : Decoder[_]: ClassTag, + R: ClassTag]( +@transient ssc_ : StreamingContext, +val kafkaParams: Map[String, String], +val fromOffsets: Map[TopicAndPartition, Long], +messageHandler: MessageAndMetadata[K, V] = R, +maxRetries: Int +) extends InputDStream[R](ssc_) with Logging { + + protected[streaming] override val checkpointData = +new DeterministicKafkaInputDStreamCheckpointData + + protected val kc = new KafkaCluster(kafkaParams) + + protected val maxMessagesPerPartition: Option[Long] = { +val ratePerSec = context.sparkContext.getConf.getInt(spark.streaming.receiver.maxRate, 0) +if (ratePerSec 0) { + val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000 + Some((secsPerBatch * ratePerSec).toLong) +} else { + None +} + } + + protected var currentOffsets = fromOffsets + + @tailrec + protected final def latestLeaderOffsets(retries: Int): Map[TopicAndPartition, LeaderOffset] = { +val o = kc.getLatestLeaderOffsets(currentOffsets.keySet) +// Either.fold would confuse @tailrec, do it manually +if (o.isLeft) { + val err = o.left.get.toString + if (retries = 0) { +throw new Exception(err) + } else { +log.error(err) +Thread.sleep(kc.config.refreshLeaderBackoffMs) +latestLeaderOffsets(retries - 1) + } +} else { + o.right.get +} + } + + protected def clamp( --- End diff -- Please add some documentation on what this method does. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so,
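One way to address this comment, with the `=>` arrows the mail archive stripped restored and a short doc comment added; the body follows the diff above, and the `getOrElse` fallthrough is inferred from `maxMessagesPerPartition` being an Option, so it may differ slightly from the actual patch:
```
/**
 * Caps the per-partition ending offsets for the next batch. When a max
 * rate is configured, each partition's leader offset is clamped to the
 * current offset plus maxMessagesPerPartition; otherwise the leader
 * offsets pass through unchanged.
 */
protected def clamp(
    leaderOffsets: Map[TopicAndPartition, LeaderOffset]): Map[TopicAndPartition, LeaderOffset] = {
  maxMessagesPerPartition.map { mmp =>
    leaderOffsets.map { case (tp, lo) =>
      tp -> lo.copy(offset = Math.min(currentOffsets(tp) + mmp, lo.offset))
    }
  }.getOrElse(leaderOffsets)
}
```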
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/3798#discussion_r23971976 --- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DeterministicKafkaInputDStream.scala --- @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka + + +import scala.annotation.tailrec +import scala.collection.mutable +import scala.reflect.{classTag, ClassTag} + +import kafka.common.TopicAndPartition +import kafka.message.MessageAndMetadata +import kafka.serializer.Decoder + +import org.apache.spark.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset +import org.apache.spark.streaming.{StreamingContext, Time} +import org.apache.spark.streaming.dstream._ + +/** A stream of {@link org.apache.spark.rdd.kafka.KafkaRDD} where + * each given Kafka topic/partition corresponds to an RDD partition. + * The spark configuration spark.streaming.receiver.maxRate gives the maximum number of messages + * per second that each '''partition''' will accept. + * Starting offsets are specified in advance, + * and this DStream is not responsible for committing offsets, + * so that you can control exactly-once semantics. + * For an easy interface to Kafka-managed offsets, + * see {@link org.apache.spark.rdd.kafka.KafkaCluster} + * @param kafkaParams Kafka a href=http://kafka.apache.org/documentation.html#configuration; + * configuration parameters/a. + * Requires metadata.broker.list or bootstrap.servers to be set with Kafka broker(s), + * NOT zookeeper servers, specified in host1:port1,host2:port2 form. 
+ * @param fromOffsets per-topic/partition Kafka offsets defining the (inclusive) + * starting point of the stream + * @param messageHandler function for translating each message into the desired type + * @param maxRetries maximum number of times in a row to retry getting leaders' offsets + */ +private[streaming] +class DeterministicKafkaInputDStream[ + K: ClassTag, + V: ClassTag, + U : Decoder[_]: ClassTag, + T : Decoder[_]: ClassTag, + R: ClassTag]( +@transient ssc_ : StreamingContext, +val kafkaParams: Map[String, String], +val fromOffsets: Map[TopicAndPartition, Long], +messageHandler: MessageAndMetadata[K, V] = R, +maxRetries: Int +) extends InputDStream[R](ssc_) with Logging { + + protected[streaming] override val checkpointData = +new DeterministicKafkaInputDStreamCheckpointData + + protected val kc = new KafkaCluster(kafkaParams) + + protected val maxMessagesPerPartition: Option[Long] = { +val ratePerSec = context.sparkContext.getConf.getInt(spark.streaming.receiver.maxRate, 0) +if (ratePerSec 0) { + val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000 + Some((secsPerBatch * ratePerSec).toLong) +} else { + None +} + } + + protected var currentOffsets = fromOffsets + + @tailrec + protected final def latestLeaderOffsets(retries: Int): Map[TopicAndPartition, LeaderOffset] = { --- End diff -- This function should be part of KafkaCluster. The getLatestLeaderOffset could take an optional parameter of retries. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
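A rough shape for the suggested refactor, moving the retry loop into KafkaCluster as an overload with a retries parameter; the overload itself, and the assumption that KafkaCluster mixes in Logging, are guesses rather than the actual patch:
```
import scala.annotation.tailrec
import org.apache.spark.SparkException

// Hypothetical overload on KafkaCluster. Err, LeaderOffset, TopicAndPartition,
// config, and the Either-returning getLatestLeaderOffsets are the ones from
// the diff above; logError assumes a Logging mixin.
@tailrec
final def getLatestLeaderOffsets(
    topicAndPartitions: Set[TopicAndPartition],
    retries: Int): Map[TopicAndPartition, LeaderOffset] = {
  getLatestLeaderOffsets(topicAndPartitions) match {
    case Right(offsets) => offsets
    case Left(errs) if retries <= 0 =>
      throw new SparkException(errs.mkString("\n"))
    case Left(errs) =>
      logError(errs.mkString("\n"))
      Thread.sleep(config.refreshLeaderBackoffMs)
      getLatestLeaderOffsets(topicAndPartitions, retries - 1)
  }
}
```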
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/3798#discussion_r23972917 --- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala --- @@ -0,0 +1,338 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka + +import scala.util.control.NonFatal +import scala.util.Random +import scala.collection.mutable.ArrayBuffer +import java.util.Properties +import kafka.api._ +import kafka.common.{ErrorMapping, OffsetMetadataAndError, TopicAndPartition} +import kafka.consumer.{ConsumerConfig, SimpleConsumer} + +/** + * Convenience methods for interacting with a Kafka cluster. + * @param kafkaParams Kafka a href=http://kafka.apache.org/documentation.html#configuration; + * configuration parameters/a. + * Requires metadata.broker.list or bootstrap.servers to be set with Kafka broker(s), + * NOT zookeeper servers, specified in host1:port1,host2:port2 form + */ +private[spark] +class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable { + import KafkaCluster.{Err, LeaderOffset} + + val seedBrokers: Array[(String, Int)] = +kafkaParams.get(metadata.broker.list) + .orElse(kafkaParams.get(bootstrap.servers)) + .getOrElse(throw new Exception(Must specify metadata.broker.list or bootstrap.servers)) + .split(,).map { hp = +val hpa = hp.split(:) +(hpa(0), hpa(1).toInt) + } + + // ConsumerConfig isn't serializable + @transient private var _config: ConsumerConfig = null + + def config: ConsumerConfig = this.synchronized { +if (_config == null) { + _config = KafkaCluster.consumerConfig(kafkaParams) +} +_config + } + + def connect(host: String, port: Int): SimpleConsumer = +new SimpleConsumer(host, port, config.socketTimeoutMs, + config.socketReceiveBufferBytes, config.clientId) + + def connectLeader(topic: String, partition: Int): Either[Err, SimpleConsumer] = +findLeader(topic, partition).right.map(hp = connect(hp._1, hp._2)) + + // Metadata api + // scalastyle:off + // https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-MetadataAPI + // scalastyle:on + + def findLeader(topic: String, partition: Int): Either[Err, (String, Int)] = { +val req = TopicMetadataRequest(TopicMetadataRequest.CurrentVersion, + 0, config.clientId, Seq(topic)) +val errs = new Err +withBrokers(Random.shuffle(seedBrokers), errs) { consumer = + val resp: TopicMetadataResponse = consumer.send(req) + resp.topicsMetadata.find(_.topic == topic).flatMap { tm: TopicMetadata = +tm.partitionsMetadata.find(_.partitionId == partition) + }.foreach { pm: PartitionMetadata = +pm.leader.foreach { leader = + return Right((leader.host, leader.port)) +} + } +} +Left(errs) + } + + def findLeaders( +topicAndPartitions: 
Set[TopicAndPartition] + ): Either[Err, Map[TopicAndPartition, (String, Int)]] = { +val topics = topicAndPartitions.map(_.topic) +getPartitionMetadata(topics).right.flatMap { tms: Set[TopicMetadata] => + val result = tms.flatMap { tm: TopicMetadata => --- End diff -- Can you break this up into multiple steps
```
val x = getPartitionMetadata(topics).right
val y = x.flatMap { tms: ... }
```
where `x`, `y` are replaced with semantically meaningful names. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
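One possible decomposition along those lines; the intermediate names and the for-comprehension are illustrative, the SparkException import is assumed, and the error message is a guess at what the truncated tail of the diff reports:
```
def findLeaders(
    topicAndPartitions: Set[TopicAndPartition]
  ): Either[Err, Map[TopicAndPartition, (String, Int)]] = {
  val topics = topicAndPartitions.map(_.topic)
  getPartitionMetadata(topics).right.flatMap { topicMetadata =>
    // Collect a leader (host, port) for every requested topic/partition.
    val leaderMap = (for {
      tm <- topicMetadata.toSeq
      pm <- tm.partitionsMetadata
      tp = TopicAndPartition(tm.topic, pm.partitionId)
      if topicAndPartitions(tp)
      leader <- pm.leader
    } yield tp -> (leader.host, leader.port)).toMap
    if (leaderMap.keySet == topicAndPartitions) {
      Right(leaderMap)
    } else {
      val missing = topicAndPartitions.diff(leaderMap.keySet)
      val errs = new Err
      errs.append(new SparkException("Couldn't find leaders for " + missing))
      Left(errs)
    }
  }
}
```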
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/3798#discussion_r23974964 --- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DeterministicKafkaInputDStream.scala --- @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka + + +import scala.annotation.tailrec +import scala.collection.mutable +import scala.reflect.{classTag, ClassTag} + +import kafka.common.TopicAndPartition +import kafka.message.MessageAndMetadata +import kafka.serializer.Decoder + +import org.apache.spark.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset +import org.apache.spark.streaming.{StreamingContext, Time} +import org.apache.spark.streaming.dstream._ + +/** A stream of {@link org.apache.spark.rdd.kafka.KafkaRDD} where + * each given Kafka topic/partition corresponds to an RDD partition. + * The spark configuration spark.streaming.receiver.maxRate gives the maximum number of messages + * per second that each '''partition''' will accept. + * Starting offsets are specified in advance, + * and this DStream is not responsible for committing offsets, + * so that you can control exactly-once semantics. + * For an easy interface to Kafka-managed offsets, + * see {@link org.apache.spark.rdd.kafka.KafkaCluster} + * @param kafkaParams Kafka a href=http://kafka.apache.org/documentation.html#configuration; + * configuration parameters/a. + * Requires metadata.broker.list or bootstrap.servers to be set with Kafka broker(s), + * NOT zookeeper servers, specified in host1:port1,host2:port2 form. 
+ * @param fromOffsets per-topic/partition Kafka offsets defining the (inclusive) + * starting point of the stream + * @param messageHandler function for translating each message into the desired type + * @param maxRetries maximum number of times in a row to retry getting leaders' offsets + */ +private[streaming] +class DeterministicKafkaInputDStream[ + K: ClassTag, + V: ClassTag, + U : Decoder[_]: ClassTag, + T : Decoder[_]: ClassTag, + R: ClassTag]( +@transient ssc_ : StreamingContext, +val kafkaParams: Map[String, String], +val fromOffsets: Map[TopicAndPartition, Long], +messageHandler: MessageAndMetadata[K, V] = R, +maxRetries: Int +) extends InputDStream[R](ssc_) with Logging { + + protected[streaming] override val checkpointData = +new DeterministicKafkaInputDStreamCheckpointData + + protected val kc = new KafkaCluster(kafkaParams) + + protected val maxMessagesPerPartition: Option[Long] = { +val ratePerSec = context.sparkContext.getConf.getInt(spark.streaming.receiver.maxRate, 0) +if (ratePerSec 0) { + val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000 + Some((secsPerBatch * ratePerSec).toLong) +} else { + None +} + } + + protected var currentOffsets = fromOffsets + + @tailrec + protected final def latestLeaderOffsets(retries: Int): Map[TopicAndPartition, LeaderOffset] = { --- End diff -- All of the methods in kafka cluster are currently based on the idea of trying (at most) all of the brokers, then giving up and letting the caller establish an error handling policy. Sleeping and retrying may not in general be the correct error handling policy. I know it is for the input dstream's usage right here, but that doesn't make sense to bake into KafkaCluster. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
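The division of responsibility Cody describes (KafkaCluster reports failure through Either, and each caller picks its own policy) can be sketched as follows; `fetchOffsets` is a made-up stand-in for any KafkaCluster method, not part of the real class:
```
import scala.annotation.tailrec
import scala.collection.mutable.ArrayBuffer

object ErrorPolicySketch {
  type Err = ArrayBuffer[Throwable]

  // Stand-in for an Either-based KafkaCluster call: fails on the first
  // attempt, succeeds afterwards, purely for demonstration.
  def fetchOffsets(attempt: Int): Either[Err, Map[Int, Long]] =
    if (attempt > 0) Right(Map(0 -> 42L))
    else Left(ArrayBuffer(new Exception("leader not found")))

  // Policy A: fail fast, as a one-shot batch job might prefer.
  def failFast(): Map[Int, Long] =
    fetchOffsets(0).fold(errs => throw errs.head, identity)

  // Policy B: sleep and retry, as the input DStream does internally.
  @tailrec
  def withRetries(attempt: Int = 0, maxRetries: Int = 3): Map[Int, Long] =
    fetchOffsets(attempt) match {
      case Right(offsets) => offsets
      case Left(_) if attempt < maxRetries =>
        Thread.sleep(200)
        withRetries(attempt + 1, maxRetries)
      case Left(errs) => throw errs.head
    }
}
```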
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/3798#discussion_r23972944 --- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala --- @@ -0,0 +1,338 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka + +import scala.util.control.NonFatal +import scala.util.Random +import scala.collection.mutable.ArrayBuffer +import java.util.Properties +import kafka.api._ +import kafka.common.{ErrorMapping, OffsetMetadataAndError, TopicAndPartition} +import kafka.consumer.{ConsumerConfig, SimpleConsumer} + +/** + * Convenience methods for interacting with a Kafka cluster. + * @param kafkaParams Kafka a href=http://kafka.apache.org/documentation.html#configuration; + * configuration parameters/a. + * Requires metadata.broker.list or bootstrap.servers to be set with Kafka broker(s), + * NOT zookeeper servers, specified in host1:port1,host2:port2 form + */ +private[spark] +class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable { + import KafkaCluster.{Err, LeaderOffset} + + val seedBrokers: Array[(String, Int)] = +kafkaParams.get(metadata.broker.list) + .orElse(kafkaParams.get(bootstrap.servers)) + .getOrElse(throw new Exception(Must specify metadata.broker.list or bootstrap.servers)) + .split(,).map { hp = +val hpa = hp.split(:) +(hpa(0), hpa(1).toInt) + } + + // ConsumerConfig isn't serializable + @transient private var _config: ConsumerConfig = null + + def config: ConsumerConfig = this.synchronized { +if (_config == null) { + _config = KafkaCluster.consumerConfig(kafkaParams) +} +_config + } + + def connect(host: String, port: Int): SimpleConsumer = +new SimpleConsumer(host, port, config.socketTimeoutMs, + config.socketReceiveBufferBytes, config.clientId) + + def connectLeader(topic: String, partition: Int): Either[Err, SimpleConsumer] = +findLeader(topic, partition).right.map(hp = connect(hp._1, hp._2)) + + // Metadata api + // scalastyle:off + // https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-MetadataAPI + // scalastyle:on + + def findLeader(topic: String, partition: Int): Either[Err, (String, Int)] = { +val req = TopicMetadataRequest(TopicMetadataRequest.CurrentVersion, + 0, config.clientId, Seq(topic)) +val errs = new Err +withBrokers(Random.shuffle(seedBrokers), errs) { consumer = + val resp: TopicMetadataResponse = consumer.send(req) + resp.topicsMetadata.find(_.topic == topic).flatMap { tm: TopicMetadata = +tm.partitionsMetadata.find(_.partitionId == partition) + }.foreach { pm: PartitionMetadata = +pm.leader.foreach { leader = + return Right((leader.host, leader.port)) +} + } +} +Left(errs) + } + + def findLeaders( +topicAndPartitions: 
Set[TopicAndPartition] + ): Either[Err, Map[TopicAndPartition, (String, Int)]] = { +val topics = topicAndPartitions.map(_.topic) +getPartitionMetadata(topics).right.flatMap { tms: Set[TopicMetadata] => + val result = tms.flatMap { tm: TopicMetadata => --- End diff -- By loop do you mean the findLeaders method? Yeah, it's returning the leaders as a map of TopicAndPartition to (host, port). The nesting directly maps to the nesting of the Kafka API's return values... pretty much all of the nesting in this class is just grabbing stuff out of Kafka data structures. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/3798#discussion_r23972979 --- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala --- @@ -0,0 +1,338 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka + +import scala.util.control.NonFatal +import scala.util.Random +import scala.collection.mutable.ArrayBuffer +import java.util.Properties +import kafka.api._ +import kafka.common.{ErrorMapping, OffsetMetadataAndError, TopicAndPartition} +import kafka.consumer.{ConsumerConfig, SimpleConsumer} + +/** + * Convenience methods for interacting with a Kafka cluster. + * @param kafkaParams Kafka a href=http://kafka.apache.org/documentation.html#configuration; + * configuration parameters/a. + * Requires metadata.broker.list or bootstrap.servers to be set with Kafka broker(s), + * NOT zookeeper servers, specified in host1:port1,host2:port2 form + */ +private[spark] +class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable { + import KafkaCluster.{Err, LeaderOffset} + + val seedBrokers: Array[(String, Int)] = +kafkaParams.get(metadata.broker.list) + .orElse(kafkaParams.get(bootstrap.servers)) + .getOrElse(throw new Exception(Must specify metadata.broker.list or bootstrap.servers)) + .split(,).map { hp = +val hpa = hp.split(:) +(hpa(0), hpa(1).toInt) + } + + // ConsumerConfig isn't serializable + @transient private var _config: ConsumerConfig = null + + def config: ConsumerConfig = this.synchronized { +if (_config == null) { + _config = KafkaCluster.consumerConfig(kafkaParams) +} +_config + } + + def connect(host: String, port: Int): SimpleConsumer = +new SimpleConsumer(host, port, config.socketTimeoutMs, + config.socketReceiveBufferBytes, config.clientId) + + def connectLeader(topic: String, partition: Int): Either[Err, SimpleConsumer] = +findLeader(topic, partition).right.map(hp = connect(hp._1, hp._2)) + + // Metadata api + // scalastyle:off + // https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-MetadataAPI + // scalastyle:on + + def findLeader(topic: String, partition: Int): Either[Err, (String, Int)] = { +val req = TopicMetadataRequest(TopicMetadataRequest.CurrentVersion, + 0, config.clientId, Seq(topic)) +val errs = new Err +withBrokers(Random.shuffle(seedBrokers), errs) { consumer = + val resp: TopicMetadataResponse = consumer.send(req) + resp.topicsMetadata.find(_.topic == topic).flatMap { tm: TopicMetadata = +tm.partitionsMetadata.find(_.partitionId == partition) + }.foreach { pm: PartitionMetadata = +pm.leader.foreach { leader = + return Right((leader.host, leader.port)) +} + } +} +Left(errs) + } + + def findLeaders( +topicAndPartitions: 
Set[TopicAndPartition] + ): Either[Err, Map[TopicAndPartition, (String, Int)]] = { +val topics = topicAndPartitions.map(_.topic) +getPartitionMetadata(topics).right.flatMap { tms: Set[TopicMetadata] = + val result = tms.flatMap { tm: TopicMetadata = +tm.partitionsMetadata.flatMap { pm: PartitionMetadata = + val tp = TopicAndPartition(tm.topic, pm.partitionId) + if (topicAndPartitions(tp)) { +pm.leader.map { l = + tp - (l.host - l.port) +} + } else { +None + } +} + }.toMap + if (result.keys.size == topicAndPartitions.size) { +Right(result) + } else { +val missing = topicAndPartitions.diff(result.keySet) +val err = new Err +err.append(new
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/3798#discussion_r23971788 --- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DeterministicKafkaInputDStream.scala --- @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka + + +import scala.annotation.tailrec +import scala.collection.mutable +import scala.reflect.{classTag, ClassTag} + +import kafka.common.TopicAndPartition +import kafka.message.MessageAndMetadata +import kafka.serializer.Decoder + +import org.apache.spark.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset +import org.apache.spark.streaming.{StreamingContext, Time} +import org.apache.spark.streaming.dstream._ + +/** A stream of {@link org.apache.spark.rdd.kafka.KafkaRDD} where + * each given Kafka topic/partition corresponds to an RDD partition. + * The spark configuration spark.streaming.receiver.maxRate gives the maximum number of messages + * per second that each '''partition''' will accept. + * Starting offsets are specified in advance, + * and this DStream is not responsible for committing offsets, + * so that you can control exactly-once semantics. + * For an easy interface to Kafka-managed offsets, + * see {@link org.apache.spark.rdd.kafka.KafkaCluster} + * @param kafkaParams Kafka a href=http://kafka.apache.org/documentation.html#configuration; + * configuration parameters/a. + * Requires metadata.broker.list or bootstrap.servers to be set with Kafka broker(s), + * NOT zookeeper servers, specified in host1:port1,host2:port2 form. 
+ * @param fromOffsets per-topic/partition Kafka offsets defining the (inclusive) + * starting point of the stream + * @param messageHandler function for translating each message into the desired type + * @param maxRetries maximum number of times in a row to retry getting leaders' offsets + */ +private[streaming] +class DeterministicKafkaInputDStream[ + K: ClassTag, + V: ClassTag, + U : Decoder[_]: ClassTag, + T : Decoder[_]: ClassTag, + R: ClassTag]( +@transient ssc_ : StreamingContext, +val kafkaParams: Map[String, String], +val fromOffsets: Map[TopicAndPartition, Long], +messageHandler: MessageAndMetadata[K, V] = R, +maxRetries: Int +) extends InputDStream[R](ssc_) with Logging { + + protected[streaming] override val checkpointData = +new DeterministicKafkaInputDStreamCheckpointData + + protected val kc = new KafkaCluster(kafkaParams) + + protected val maxMessagesPerPartition: Option[Long] = { +val ratePerSec = context.sparkContext.getConf.getInt(spark.streaming.receiver.maxRate, 0) +if (ratePerSec 0) { + val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000 + Some((secsPerBatch * ratePerSec).toLong) +} else { + None +} + } + + protected var currentOffsets = fromOffsets + + @tailrec + protected final def latestLeaderOffsets(retries: Int): Map[TopicAndPartition, LeaderOffset] = { +val o = kc.getLatestLeaderOffsets(currentOffsets.keySet) +// Either.fold would confuse @tailrec, do it manually +if (o.isLeft) { + val err = o.left.get.toString + if (retries = 0) { +throw new Exception(err) --- End diff -- Exception -- SparkException --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/3798#discussion_r23972523

--- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala ---
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import scala.util.control.NonFatal
+import scala.util.Random
+import scala.collection.mutable.ArrayBuffer
+import java.util.Properties
+import kafka.api._
+import kafka.common.{ErrorMapping, OffsetMetadataAndError, TopicAndPartition}
+import kafka.consumer.{ConsumerConfig, SimpleConsumer}
+
+/**
+ * Convenience methods for interacting with a Kafka cluster.
+ * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+ * configuration parameters</a>.
+ *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+ *   NOT zookeeper servers, specified in host1:port1,host2:port2 form
+ */
+private[spark]
+class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
+  import KafkaCluster.{Err, LeaderOffset}
+
+  val seedBrokers: Array[(String, Int)] =
+    kafkaParams.get("metadata.broker.list")
+      .orElse(kafkaParams.get("bootstrap.servers"))
+      .getOrElse(throw new Exception("Must specify metadata.broker.list or bootstrap.servers"))
+      .split(",").map { hp =>
+        val hpa = hp.split(":")
+        (hpa(0), hpa(1).toInt)
+      }
+
+  // ConsumerConfig isn't serializable
+  @transient private var _config: ConsumerConfig = null
+
+  def config: ConsumerConfig = this.synchronized {
+    if (_config == null) {
+      _config = KafkaCluster.consumerConfig(kafkaParams)
+    }
+    _config
+  }
+
+  def connect(host: String, port: Int): SimpleConsumer =
+    new SimpleConsumer(host, port, config.socketTimeoutMs,
+      config.socketReceiveBufferBytes, config.clientId)
+
+  def connectLeader(topic: String, partition: Int): Either[Err, SimpleConsumer] =
+    findLeader(topic, partition).right.map(hp => connect(hp._1, hp._2))
+
+  // Metadata api
+  // scalastyle:off
+  // https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-MetadataAPI
+  // scalastyle:on
+
+  def findLeader(topic: String, partition: Int): Either[Err, (String, Int)] = {
+    val req = TopicMetadataRequest(TopicMetadataRequest.CurrentVersion,
+      0, config.clientId, Seq(topic))
+    val errs = new Err
+    withBrokers(Random.shuffle(seedBrokers), errs) { consumer =>
+      val resp: TopicMetadataResponse = consumer.send(req)
+      resp.topicsMetadata.find(_.topic == topic).flatMap { tm: TopicMetadata =>
+        tm.partitionsMetadata.find(_.partitionId == partition)
+      }.foreach { pm: PartitionMetadata =>
+        pm.leader.foreach { leader =>
+          return Right((leader.host, leader.port))
+        }
+      }
+    }
+    Left(errs)
+  }
+
+  def findLeaders(
+      topicAndPartitions: Set[TopicAndPartition]
+    ): Either[Err, Map[TopicAndPartition, (String, Int)]] = {
+    val topics = topicAndPartitions.map(_.topic)
+    getPartitionMetadata(topics).right.flatMap { tms: Set[TopicMetadata] =>
+      val result = tms.flatMap { tm: TopicMetadata =>
--- End diff --

It's not obvious what this loop is doing. What is the result? Leaders? The loop is also nested quite deeply to follow.

--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
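[Editor's note] The `@transient private var _config` plus synchronized getter in the diff above is the standard idiom for carrying a non-serializable field (here `ConsumerConfig`) inside a `Serializable` class: the field is skipped during serialization and rebuilt lazily on whichever JVM deserializes the object. A minimal, self-contained sketch of the idiom, using a hypothetical stand-in `Config` class rather than the Kafka types:

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Stand-in for a non-serializable dependency such as kafka.consumer.ConsumerConfig.
// (Hypothetical class, not part of the PR.)
class Config(val params: Map[String, String]) // deliberately NOT Serializable

class Holder(val params: Map[String, String]) extends Serializable {
  // Skipped by Java serialization; comes back as null after deserialization.
  @transient private var _config: Config = null

  // Rebuilt on first access in each JVM; synchronized so concurrent tasks
  // on one executor don't race to initialize it.
  def config: Config = this.synchronized {
    if (_config == null) {
      _config = new Config(params)
    }
    _config
  }
}

object TransientDemo {
  def main(args: Array[String]): Unit = {
    val before = new Holder(Map("metadata.broker.list" -> "host1:9092"))
    before.config // force initialization on this side

    // Round-trip through Java serialization, as Spark does with task closures.
    val buf = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(buf)
    oos.writeObject(before)
    oos.close()
    val after = new ObjectInputStream(new ByteArrayInputStream(buf.toByteArray))
      .readObject().asInstanceOf[Holder]

    // _config was dropped in transit and is rebuilt lazily here.
    assert(after.config.params("metadata.broker.list") == "host1:9092")
  }
}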
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/3798#discussion_r23973072

--- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala ---
+  def findLeaders(
+      topicAndPartitions: Set[TopicAndPartition]
+    ): Either[Err, Map[TopicAndPartition, (String, Int)]] = {
+    val topics = topicAndPartitions.map(_.topic)
+    getPartitionMetadata(topics).right.flatMap { tms: Set[TopicMetadata] =>
+      val result = tms.flatMap { tm: TopicMetadata =>
+        tm.partitionsMetadata.flatMap { pm: PartitionMetadata =>
+          val tp = TopicAndPartition(tm.topic, pm.partitionId)
+          if (topicAndPartitions(tp)) {
+            pm.leader.map { l =>
+              tp -> (l.host -> l.port)
+            }
+          } else {
+            None
+          }
+        }
+      }.toMap
+      if (result.keys.size == topicAndPartitions.size) {
+        Right(result)
+      } else {
+        val missing = topicAndPartitions.diff(result.keySet)
+        val err = new Err
+        err.append(new
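[Editor's note] To the question raised on this hunk: the nested flatMap is computing leaders. For each requested TopicAndPartition whose metadata reports a live leader it emits tp -> (host, port), `.toMap` collects the pairs, and the size check afterwards detects requested partitions with no discoverable leader. A hedged restructuring with named steps, assuming the Kafka 0.8 kafka.api types quoted above; this is illustrative only, not the code in the PR:

import kafka.api.{PartitionMetadata, TopicMetadata}
import kafka.common.TopicAndPartition

// Illustrative restructuring of the nested flatMap in findLeaders:
// one (TopicAndPartition -> (host, port)) entry per requested partition
// whose metadata reports a live leader.
def leadersFor(
    requested: Set[TopicAndPartition],
    tms: Set[TopicMetadata]): Map[TopicAndPartition, (String, Int)] = {
  val entries = for {
    tm <- tms.toSeq                  // every topic we got metadata for
    pm <- tm.partitionsMetadata      // every partition of that topic
    tp = TopicAndPartition(tm.topic, pm.partitionId)
    if requested(tp)                 // only partitions the caller asked about
    leader <- pm.leader              // skip partitions with no current leader
  } yield tp -> (leader.host, leader.port)
  entries.toMap                      // requested partitions may be missing here
}

As in the original, a caller can recover the unresolved partitions afterwards with requested.diff(leadersFor(requested, tms).keySet).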
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/3798#discussion_r23973375

--- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala ---
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/3798#discussion_r23973371

--- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala ---
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/3798#discussion_r23973357

--- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala ---
[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/3798#discussion_r23973351 --- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala --- @@ -0,0 +1,338 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka + +import scala.util.control.NonFatal +import scala.util.Random +import scala.collection.mutable.ArrayBuffer +import java.util.Properties +import kafka.api._ +import kafka.common.{ErrorMapping, OffsetMetadataAndError, TopicAndPartition} +import kafka.consumer.{ConsumerConfig, SimpleConsumer} + +/** + * Convenience methods for interacting with a Kafka cluster. + * @param kafkaParams Kafka a href=http://kafka.apache.org/documentation.html#configuration; + * configuration parameters/a. + * Requires metadata.broker.list or bootstrap.servers to be set with Kafka broker(s), + * NOT zookeeper servers, specified in host1:port1,host2:port2 form + */ +private[spark] +class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable { + import KafkaCluster.{Err, LeaderOffset} + + val seedBrokers: Array[(String, Int)] = +kafkaParams.get(metadata.broker.list) + .orElse(kafkaParams.get(bootstrap.servers)) + .getOrElse(throw new Exception(Must specify metadata.broker.list or bootstrap.servers)) + .split(,).map { hp = +val hpa = hp.split(:) +(hpa(0), hpa(1).toInt) + } + + // ConsumerConfig isn't serializable + @transient private var _config: ConsumerConfig = null + + def config: ConsumerConfig = this.synchronized { +if (_config == null) { + _config = KafkaCluster.consumerConfig(kafkaParams) +} +_config + } + + def connect(host: String, port: Int): SimpleConsumer = +new SimpleConsumer(host, port, config.socketTimeoutMs, + config.socketReceiveBufferBytes, config.clientId) + + def connectLeader(topic: String, partition: Int): Either[Err, SimpleConsumer] = +findLeader(topic, partition).right.map(hp = connect(hp._1, hp._2)) + + // Metadata api + // scalastyle:off + // https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-MetadataAPI + // scalastyle:on + + def findLeader(topic: String, partition: Int): Either[Err, (String, Int)] = { +val req = TopicMetadataRequest(TopicMetadataRequest.CurrentVersion, + 0, config.clientId, Seq(topic)) +val errs = new Err +withBrokers(Random.shuffle(seedBrokers), errs) { consumer = + val resp: TopicMetadataResponse = consumer.send(req) + resp.topicsMetadata.find(_.topic == topic).flatMap { tm: TopicMetadata = +tm.partitionsMetadata.find(_.partitionId == partition) + }.foreach { pm: PartitionMetadata = +pm.leader.foreach { leader = + return Right((leader.host, leader.port)) +} + } +} +Left(errs) + } + + def findLeaders( +topicAndPartitions: 
Set[TopicAndPartition] + ): Either[Err, Map[TopicAndPartition, (String, Int)]] = { +val topics = topicAndPartitions.map(_.topic) +getPartitionMetadata(topics).right.flatMap { tms: Set[TopicMetadata] = + val result = tms.flatMap { tm: TopicMetadata = +tm.partitionsMetadata.flatMap { pm: PartitionMetadata = + val tp = TopicAndPartition(tm.topic, pm.partitionId) + if (topicAndPartitions(tp)) { +pm.leader.map { l = + tp - (l.host - l.port) +} + } else { +None + } +} + }.toMap + if (result.keys.size == topicAndPartitions.size) { +Right(result) + } else { +val missing = topicAndPartitions.diff(result.keySet) +val err = new Err +err.append(new