Repository: spark Updated Branches: refs/heads/master bc8890b35 -> 140ddef37
[SPARK-10963][STREAMING][KAFKA] make KafkaCluster public Author: cody koeninger <c...@koeninger.org> Closes #9007 from koeninger/SPARK-10963. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/140ddef3 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/140ddef3 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/140ddef3 Branch: refs/heads/master Commit: 140ddef373680cb08a3948a883b172dc80814170 Parents: bc8890b Author: cody koeninger <c...@koeninger.org> Authored: Sun Feb 7 12:52:00 2016 +0000 Committer: Sean Owen <so...@cloudera.com> Committed: Sun Feb 7 12:52:00 2016 +0000 ---------------------------------------------------------------------- .../spark/streaming/kafka/KafkaCluster.scala | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/140ddef3/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala index d7885d7..8a66621 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala @@ -29,15 +29,19 @@ import kafka.common.{ErrorMapping, OffsetAndMetadata, OffsetMetadataAndError, To import kafka.consumer.{ConsumerConfig, SimpleConsumer} import org.apache.spark.SparkException +import org.apache.spark.annotation.DeveloperApi /** + * :: DeveloperApi :: * Convenience methods for interacting with a Kafka cluster. + * See <a href="https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol"> + * A Guide To The Kafka Protocol</a> for more details on individual api calls. * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration"> * configuration parameters</a>. * Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s), * NOT zookeeper servers, specified in host1:port1,host2:port2 form */ -private[spark] +@DeveloperApi class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable { import KafkaCluster.{Err, LeaderOffset, SimpleConsumerConfig} @@ -227,7 +231,7 @@ class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable { // this 0 here indicates api version, in this case the original ZK backed api. private def defaultConsumerApiVersion: Short = 0 - /** Requires Kafka >= 0.8.1.1 */ + /** Requires Kafka >= 0.8.1.1. Defaults to the original ZooKeeper backed api version. */ def getConsumerOffsets( groupId: String, topicAndPartitions: Set[TopicAndPartition] @@ -246,7 +250,7 @@ class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable { } } - /** Requires Kafka >= 0.8.1.1 */ + /** Requires Kafka >= 0.8.1.1. Defaults to the original ZooKeeper backed api version. */ def getConsumerOffsetMetadata( groupId: String, topicAndPartitions: Set[TopicAndPartition] @@ -283,7 +287,7 @@ class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable { Left(errs) } - /** Requires Kafka >= 0.8.1.1 */ + /** Requires Kafka >= 0.8.1.1. Defaults to the original ZooKeeper backed api version. */ def setConsumerOffsets( groupId: String, offsets: Map[TopicAndPartition, Long] @@ -301,7 +305,7 @@ class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable { setConsumerOffsetMetadata(groupId, meta, consumerApiVersion) } - /** Requires Kafka >= 0.8.1.1 */ + /** Requires Kafka >= 0.8.1.1. Defaults to the original ZooKeeper backed api version. */ def setConsumerOffsetMetadata( groupId: String, metadata: Map[TopicAndPartition, OffsetAndMetadata] @@ -359,7 +363,7 @@ class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable { } } -private[spark] +@DeveloperApi object KafkaCluster { type Err = ArrayBuffer[Throwable] @@ -371,7 +375,6 @@ object KafkaCluster { ) } - private[spark] case class LeaderOffset(host: String, port: Int, offset: Long) /** @@ -379,7 +382,6 @@ object KafkaCluster { * Simple consumers connect directly to brokers, but need many of the same configs. * This subclass won't warn about missing ZK params, or presence of broker params. */ - private[spark] class SimpleConsumerConfig private(brokers: String, originalProps: Properties) extends ConsumerConfig(originalProps) { val seedBrokers: Array[(String, Int)] = brokers.split(",").map { hp => @@ -391,7 +393,6 @@ object KafkaCluster { } } - private[spark] object SimpleConsumerConfig { /** * Make a consumer config without requiring group.id or zookeeper.connect, --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org