Repository: spark
Updated Branches:
  refs/heads/master bc8890b35 -> 140ddef37


[SPARK-10963][STREAMING][KAFKA] make KafkaCluster public

Author: cody koeninger <c...@koeninger.org>

Closes #9007 from koeninger/SPARK-10963.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/140ddef3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/140ddef3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/140ddef3

Branch: refs/heads/master
Commit: 140ddef373680cb08a3948a883b172dc80814170
Parents: bc8890b
Author: cody koeninger <c...@koeninger.org>
Authored: Sun Feb 7 12:52:00 2016 +0000
Committer: Sean Owen <so...@cloudera.com>
Committed: Sun Feb 7 12:52:00 2016 +0000

----------------------------------------------------------------------
 .../spark/streaming/kafka/KafkaCluster.scala     | 19 ++++++++++---------
 1 file changed, 10 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/140ddef3/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala
 
b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala
index d7885d7..8a66621 100644
--- 
a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala
+++ 
b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala
@@ -29,15 +29,19 @@ import kafka.common.{ErrorMapping, OffsetAndMetadata, 
OffsetMetadataAndError, To
 import kafka.consumer.{ConsumerConfig, SimpleConsumer}
 
 import org.apache.spark.SparkException
+import org.apache.spark.annotation.DeveloperApi
 
 /**
+ * :: DeveloperApi ::
  * Convenience methods for interacting with a Kafka cluster.
+ * See <a href="https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol">
+ * A Guide To The Kafka Protocol</a> for more details on individual api calls.
  * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
  * configuration parameters</a>.
  *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with 
Kafka broker(s),
  *   NOT zookeeper servers, specified in host1:port1,host2:port2 form
  */
-private[spark]
+@DeveloperApi
 class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
   import KafkaCluster.{Err, LeaderOffset, SimpleConsumerConfig}
 
@@ -227,7 +231,7 @@ class KafkaCluster(val kafkaParams: Map[String, String]) 
extends Serializable {
   // this 0 here indicates api version, in this case the original ZK backed 
api.
   private def defaultConsumerApiVersion: Short = 0
 
-  /** Requires Kafka >= 0.8.1.1 */
+  /** Requires Kafka >= 0.8.1.1.  Defaults to the original ZooKeeper backed 
api version. */
   def getConsumerOffsets(
       groupId: String,
       topicAndPartitions: Set[TopicAndPartition]
@@ -246,7 +250,7 @@ class KafkaCluster(val kafkaParams: Map[String, String]) 
extends Serializable {
     }
   }
 
-  /** Requires Kafka >= 0.8.1.1 */
+  /** Requires Kafka >= 0.8.1.1.  Defaults to the original ZooKeeper backed 
api version. */
   def getConsumerOffsetMetadata(
       groupId: String,
       topicAndPartitions: Set[TopicAndPartition]
@@ -283,7 +287,7 @@ class KafkaCluster(val kafkaParams: Map[String, String]) 
extends Serializable {
     Left(errs)
   }
 
-  /** Requires Kafka >= 0.8.1.1 */
+  /** Requires Kafka >= 0.8.1.1.  Defaults to the original ZooKeeper backed 
api version. */
   def setConsumerOffsets(
       groupId: String,
       offsets: Map[TopicAndPartition, Long]
@@ -301,7 +305,7 @@ class KafkaCluster(val kafkaParams: Map[String, String]) 
extends Serializable {
     setConsumerOffsetMetadata(groupId, meta, consumerApiVersion)
   }
 
-  /** Requires Kafka >= 0.8.1.1 */
+  /** Requires Kafka >= 0.8.1.1.  Defaults to the original ZooKeeper backed 
api version. */
   def setConsumerOffsetMetadata(
       groupId: String,
       metadata: Map[TopicAndPartition, OffsetAndMetadata]
@@ -359,7 +363,7 @@ class KafkaCluster(val kafkaParams: Map[String, String]) 
extends Serializable {
   }
 }
 
-private[spark]
+@DeveloperApi
 object KafkaCluster {
   type Err = ArrayBuffer[Throwable]
 
@@ -371,7 +375,6 @@ object KafkaCluster {
     )
   }
 
-  private[spark]
   case class LeaderOffset(host: String, port: Int, offset: Long)
 
   /**
@@ -379,7 +382,6 @@ object KafkaCluster {
    * Simple consumers connect directly to brokers, but need many of the same 
configs.
    * This subclass won't warn about missing ZK params, or presence of broker 
params.
    */
-  private[spark]
   class SimpleConsumerConfig private(brokers: String, originalProps: 
Properties)
       extends ConsumerConfig(originalProps) {
     val seedBrokers: Array[(String, Int)] = brokers.split(",").map { hp =>
@@ -391,7 +393,6 @@ object KafkaCluster {
     }
   }
 
-  private[spark]
   object SimpleConsumerConfig {
     /**
      * Make a consumer config without requiring group.id or zookeeper.connect,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to