This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 53e31e2  [SPARK-27399][STREAMING][KAFKA] Consolidate scattered configs and reduce hardcoding for Kafka 0.10
53e31e2 is described below

commit 53e31e2ca1f0aec1d19f1528cdd1a5317a4e0a32
Author: gengjiaan <gengji...@360.cn>
AuthorDate: Sat Apr 6 18:05:15 2019 -0500

    [SPARK-27399][STREAMING][KAFKA] Consolidate scattered configs and reduce hardcoding for Kafka 0.10
    
    ## What changes were proposed in this pull request?
    
    I found a lot of scattered, hardcoded config keys in the `Kafka` streaming
    integration. This change gathers them into one unified place as typed config entries.
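
    As an illustration of the pattern only (a minimal sketch, not code from this
    patch; `ConfigBuilder` and `SparkConf.get(entry)` are Spark-internal APIs, so
    the sketch assumes a hypothetical subpackage of `org.apache.spark`):

    ```scala
    // Hypothetical package, only so the sketch can reach private[spark] APIs.
    package org.apache.spark.streaming.kafka010.sketch

    import org.apache.spark.SparkConf
    import org.apache.spark.internal.config.ConfigBuilder

    object ConfigSketch {
      // One ConfigEntry constant owns the key, the value type and the default.
      val CONSUMER_CACHE_ENABLED =
        ConfigBuilder("spark.streaming.kafka.consumer.cache.enabled")
          .booleanConf
          .createWithDefault(true)

      // Call sites read the entry instead of repeating the string key and default.
      def useConsumerCache(conf: SparkConf): Boolean = conf.get(CONSUMER_CACHE_ENABLED)
    }
    ```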
    
    ## How was this patch tested?
    
    No unit test needed; this change only refactors how existing configs are read.
    
    Closes #24267 from beliefer/arrange-scattered-streaming-kafka-config.
    
    Authored-by: gengjiaan <gengji...@360.cn>
    Signed-off-by: Sean Owen <sean.o...@databricks.com>
---
 .../kafka010/DirectKafkaInputDStream.scala         |  3 +-
 .../apache/spark/streaming/kafka010/KafkaRDD.scala | 16 +++-----
 .../streaming/kafka010/PerPartitionConfig.scala    |  4 +-
 .../apache/spark/streaming/kafka010/package.scala  | 47 +++++++++++++++++++++-
 4 files changed, 55 insertions(+), 15 deletions(-)

diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
index 224f41a..88d6d0e 100644
--- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
@@ -229,8 +229,7 @@ private[spark] class DirectKafkaInputDStream[K, V](
       val fo = currentOffsets(tp)
       OffsetRange(tp.topic, tp.partition, fo, uo)
     }
-    val useConsumerCache = context.conf.getBoolean("spark.streaming.kafka.consumer.cache.enabled",
-      true)
+    val useConsumerCache = context.conf.get(CONSUMER_CACHE_ENABLED)
     val rdd = new KafkaRDD[K, V](context.sparkContext, executorKafkaParams, offsetRanges.toArray,
       getPreferredHosts, useConsumerCache)
 
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala
index 4513dca..bd2e7e1 100644
--- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala
@@ -24,6 +24,7 @@ import org.apache.kafka.common.TopicPartition
 
 import org.apache.spark.{Partition, SparkContext, TaskContext}
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.Network._
 import org.apache.spark.partial.{BoundedDouble, PartialResult}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.scheduler.ExecutorCacheTaskLocation
@@ -64,16 +65,11 @@ private[spark] class KafkaRDD[K, V](
       " must be set to false for executor kafka params, else offsets may 
commit before processing")
 
   // TODO is it necessary to have separate configs for initial poll time vs ongoing poll time?
-  private val pollTimeout = conf.getLong("spark.streaming.kafka.consumer.poll.ms",
-    conf.getTimeAsSeconds("spark.network.timeout", "120s") * 1000L)
-  private val cacheInitialCapacity =
-    conf.getInt("spark.streaming.kafka.consumer.cache.initialCapacity", 16)
-  private val cacheMaxCapacity =
-    conf.getInt("spark.streaming.kafka.consumer.cache.maxCapacity", 64)
-  private val cacheLoadFactor =
-    conf.getDouble("spark.streaming.kafka.consumer.cache.loadFactor", 0.75).toFloat
-  private val compacted =
-    conf.getBoolean("spark.streaming.kafka.allowNonConsecutiveOffsets", false)
+  private val pollTimeout = conf.get(CONSUMER_POLL_MS).getOrElse(conf.get(NETWORK_TIMEOUT) * 1000L)
+  private val cacheInitialCapacity = conf.get(CONSUMER_CACHE_INITIAL_CAPACITY)
+  private val cacheMaxCapacity = conf.get(CONSUMER_CACHE_MAX_CAPACITY)
+  private val cacheLoadFactor = conf.get(CONSUMER_CACHE_LOAD_FACTOR).toFloat
+  private val compacted = conf.get(ALLOW_NON_CONSECUTIVE_OFFSETS)
 
   override def persist(newLevel: StorageLevel): this.type = {
     logError("Kafka ConsumerRecord is not serializable. " +
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/PerPartitionConfig.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/PerPartitionConfig.scala
index 77193e2..b261500 100644
--- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/PerPartitionConfig.scala
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/PerPartitionConfig.scala
@@ -39,8 +39,8 @@ abstract class PerPartitionConfig extends Serializable {
  */
 private class DefaultPerPartitionConfig(conf: SparkConf)
     extends PerPartitionConfig {
-  val maxRate = conf.getLong("spark.streaming.kafka.maxRatePerPartition", 0)
-  val minRate = conf.getLong("spark.streaming.kafka.minRatePerPartition", 1)
+  val maxRate = conf.get(MAX_RATE_PER_PARTITION)
+  val minRate = conf.get(MIN_RATE_PER_PARTITION)
 
   def maxRatePerPartition(topicPartition: TopicPartition): Long = maxRate
   override def minRatePerPartition(topicPartition: TopicPartition): Long = minRate
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/package.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/package.scala
index 09db6d6..3d2921f 100644
--- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/package.scala
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/package.scala
@@ -17,7 +17,52 @@
 
 package org.apache.spark.streaming
 
+import org.apache.spark.internal.config.ConfigBuilder
+
 /**
  * Spark Integration for Kafka 0.10
  */
-package object kafka010 //scalastyle:ignore
+package object kafka010 { //scalastyle:ignore
+
+  private[spark] val CONSUMER_CACHE_ENABLED =
+    ConfigBuilder("spark.streaming.kafka.consumer.cache.enabled")
+      .booleanConf
+      .createWithDefault(true)
+
+  private[spark] val CONSUMER_POLL_MS =
+    ConfigBuilder("spark.streaming.kafka.consumer.poll.ms")
+      .longConf
+      .createOptional
+
+  private[spark] val CONSUMER_CACHE_INITIAL_CAPACITY =
+    ConfigBuilder("spark.streaming.kafka.consumer.cache.initialCapacity")
+      .intConf
+      .createWithDefault(16)
+
+  private[spark] val CONSUMER_CACHE_MAX_CAPACITY =
+    ConfigBuilder("spark.streaming.kafka.consumer.cache.maxCapacity")
+      .intConf
+      .createWithDefault(64)
+
+  private[spark] val CONSUMER_CACHE_LOAD_FACTOR =
+    ConfigBuilder("spark.streaming.kafka.consumer.cache.loadFactor")
+      .doubleConf
+      .createWithDefault(0.75)
+
+  private[spark] val MAX_RATE_PER_PARTITION =
+    ConfigBuilder("spark.streaming.kafka.maxRatePerPartition")
+      .longConf
+      .createWithDefault(0)
+
+  private[spark] val MIN_RATE_PER_PARTITION =
+    ConfigBuilder("spark.streaming.kafka.minRatePerPartition")
+      .longConf
+      .createWithDefault(1)
+
+  private[spark] val ALLOW_NON_CONSECUTIVE_OFFSETS =
+    ConfigBuilder("spark.streaming.kafka.allowNonConsecutiveOffsets")
+      .booleanConf
+      .createWithDefault(false)
+
+}
+
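
The user-facing keys are unchanged by this refactoring, so existing jobs keep
working. For example, they can still be set on a `SparkConf` as before (the
values below are illustrative only):

```scala
import org.apache.spark.SparkConf

// Same keys as before; only the internal plumbing moved to ConfigEntry constants.
val conf = new SparkConf()
  .set("spark.streaming.kafka.consumer.cache.maxCapacity", "128")
  .set("spark.streaming.kafka.consumer.poll.ms", "512")
  .set("spark.streaming.kafka.allowNonConsecutiveOffsets", "true")
```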


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
