This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 69dcea2  [SPARK-30992][DSTREAMS] Arrange scattered config of streaming module
69dcea2 is described below

commit 69dcea284961668b28d702e90e9068a3b80cbc8a
Author: beliefer <belie...@163.com>
AuthorDate: Tue Mar 10 18:04:09 2020 +0900

    [SPARK-30992][DSTREAMS] Arrange scattered config of streaming module
    
    ### What changes were proposed in this pull request?
    I found a lot of scattered configs in `Streaming`. I think these configs should be arranged in one unified place.
    
    ### Why are the changes needed?
    Consolidate the scattered configs into a single location.
    
    ### Does this PR introduce any user-facing change?
    No
    
    ### How was this patch tested?
    Existing unit tests.
    
    Closes #27744 from beliefer/arrange-scattered-streaming-config.
    
    Authored-by: beliefer <belie...@163.com>
    Signed-off-by: HyukjinKwon <gurwls...@apache.org>
    (cherry picked from commit 8ee41f3576689f3d164131d1e6041bd347394364)
    Signed-off-by: HyukjinKwon <gurwls...@apache.org>
---
 .../org/apache/spark/streaming/StreamingConf.scala | 161 +++++++++++++++++++++
 .../apache/spark/streaming/StreamingContext.scala  |   3 +-
 .../apache/spark/streaming/dstream/DStream.scala   |   3 +-
 .../spark/streaming/receiver/BlockGenerator.scala  |   5 +-
 .../spark/streaming/receiver/RateLimiter.scala     |   5 +-
 .../spark/streaming/scheduler/JobGenerator.scala   |   6 +-
 .../spark/streaming/scheduler/JobScheduler.scala   |   2 +-
 .../spark/streaming/scheduler/RateController.scala |   3 +-
 .../streaming/scheduler/rate/RateEstimator.scala   |  11 +-
 .../ui/StreamingJobProgressListener.scala          |   4 +-
 .../org/apache/spark/streaming/util/StateMap.scala |   4 +-
 .../spark/streaming/util/WriteAheadLogUtils.scala  |  42 ++----
 .../streaming/ReceiverInputDStreamSuite.scala      |   3 +-
 13 files changed, 201 insertions(+), 51 deletions(-)

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingConf.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingConf.scala
new file mode 100644
index 0000000..71aefd6
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingConf.scala
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.spark.internal.config.ConfigBuilder
+import org.apache.spark.streaming.util.OpenHashMapBasedStateMap.DELTA_CHAIN_LENGTH_THRESHOLD
+
+object StreamingConf {
+
+  private[streaming] val BACKPRESSURE_ENABLED =
+    ConfigBuilder("spark.streaming.backpressure.enabled")
+      .booleanConf
+      .createWithDefault(false)
+
+  private[streaming] val RECEIVER_MAX_RATE =
+    ConfigBuilder("spark.streaming.receiver.maxRate")
+      .longConf
+      .createWithDefault(Long.MaxValue)
+
+  private[streaming] val BACKPRESSURE_INITIAL_RATE =
+    ConfigBuilder("spark.streaming.backpressure.initialRate")
+      .fallbackConf(RECEIVER_MAX_RATE)
+
+  private[streaming] val BLOCK_INTERVAL =
+    ConfigBuilder("spark.streaming.blockInterval")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createWithDefaultString("200ms")
+
+  private[streaming] val RECEIVER_WAL_ENABLE_CONF_KEY =
+    ConfigBuilder("spark.streaming.receiver.writeAheadLog.enable")
+      .booleanConf
+      .createWithDefault(false)
+
+  private[streaming] val RECEIVER_WAL_CLASS_CONF_KEY =
+    ConfigBuilder("spark.streaming.receiver.writeAheadLog.class")
+      .stringConf
+      .createOptional
+
+  private[streaming] val RECEIVER_WAL_ROLLING_INTERVAL_CONF_KEY =
+    ConfigBuilder("spark.streaming.receiver.writeAheadLog.rollingIntervalSecs")
+      .intConf
+      .createWithDefault(60)
+
+  private[streaming] val RECEIVER_WAL_MAX_FAILURES_CONF_KEY =
+    ConfigBuilder("spark.streaming.receiver.writeAheadLog.maxFailures")
+      .intConf
+      .createWithDefault(3)
+
+  private[streaming] val RECEIVER_WAL_CLOSE_AFTER_WRITE_CONF_KEY =
+    ConfigBuilder("spark.streaming.receiver.writeAheadLog.closeFileAfterWrite")
+      .booleanConf
+      .createWithDefault(false)
+
+  private[streaming] val DRIVER_WAL_CLASS_CONF_KEY =
+    ConfigBuilder("spark.streaming.driver.writeAheadLog.class")
+      .stringConf
+      .createOptional
+
+  private[streaming] val DRIVER_WAL_ROLLING_INTERVAL_CONF_KEY =
+    ConfigBuilder("spark.streaming.driver.writeAheadLog.rollingIntervalSecs")
+      .intConf
+      .createWithDefault(60)
+
+  private[streaming] val DRIVER_WAL_MAX_FAILURES_CONF_KEY =
+    ConfigBuilder("spark.streaming.driver.writeAheadLog.maxFailures")
+      .intConf
+      .createWithDefault(3)
+
+  private[streaming] val DRIVER_WAL_CLOSE_AFTER_WRITE_CONF_KEY =
+    ConfigBuilder("spark.streaming.driver.writeAheadLog.closeFileAfterWrite")
+      .booleanConf
+      .createWithDefault(false)
+
+  private[streaming] val DRIVER_WAL_BATCHING_CONF_KEY =
+    ConfigBuilder("spark.streaming.driver.writeAheadLog.allowBatching")
+      .booleanConf
+      .createWithDefault(true)
+
+  private[streaming] val DRIVER_WAL_BATCHING_TIMEOUT_CONF_KEY =
+    ConfigBuilder("spark.streaming.driver.writeAheadLog.batchingTimeout")
+      .longConf
+      .createWithDefault(5000)
+
+  private[streaming] val STREAMING_UNPERSIST =
+    ConfigBuilder("spark.streaming.unpersist")
+      .booleanConf
+      .createWithDefault(true)
+
+  private[streaming] val STOP_GRACEFULLY_ON_SHUTDOWN =
+    ConfigBuilder("spark.streaming.stopGracefullyOnShutdown")
+      .booleanConf
+      .createWithDefault(false)
+
+  private[streaming] val UI_RETAINED_BATCHES =
+    ConfigBuilder("spark.streaming.ui.retainedBatches")
+      .intConf
+      .createWithDefault(1000)
+
+  private[streaming] val SESSION_BY_KEY_DELTA_CHAIN_THRESHOLD =
+    ConfigBuilder("spark.streaming.sessionByKey.deltaChainThreshold")
+      .intConf
+      .createWithDefault(DELTA_CHAIN_LENGTH_THRESHOLD)
+
+  private[streaming] val BACKPRESSURE_RATE_ESTIMATOR =
+    ConfigBuilder("spark.streaming.backpressure.rateEstimator")
+      .stringConf
+      .createWithDefault("pid")
+
+  private[streaming] val BACKPRESSURE_PID_PROPORTIONAL =
+    ConfigBuilder("spark.streaming.backpressure.pid.proportional")
+      .doubleConf
+      .createWithDefault(1.0)
+
+  private[streaming] val BACKPRESSURE_PID_INTEGRAL =
+    ConfigBuilder("spark.streaming.backpressure.pid.integral")
+      .doubleConf
+      .createWithDefault(0.2)
+
+  private[streaming] val BACKPRESSURE_PID_DERIVED =
+    ConfigBuilder("spark.streaming.backpressure.pid.derived")
+      .doubleConf
+      .createWithDefault(0.0)
+
+  private[streaming] val BACKPRESSURE_PID_MIN_RATE =
+    ConfigBuilder("spark.streaming.backpressure.pid.minRate")
+      .doubleConf
+      .createWithDefault(100)
+
+  private[streaming] val CONCURRENT_JOBS =
+    ConfigBuilder("spark.streaming.concurrentJobs")
+      .intConf
+      .createWithDefault(1)
+
+  private[streaming] val GRACEFUL_STOP_TIMEOUT =
+    ConfigBuilder("spark.streaming.gracefulStopTimeout")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createOptional
+
+  private[streaming] val MANUAL_CLOCK_JUMP =
+    ConfigBuilder("spark.streaming.manualClock.jump")
+      .longConf
+      .createWithDefault(0)
+
+}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 440b653..e3459c9 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -41,6 +41,7 @@ import org.apache.spark.rdd.{RDD, RDDOperationScope}
 import org.apache.spark.scheduler.LiveListenerBus
 import org.apache.spark.serializer.SerializationDebugger
 import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.StreamingConf.STOP_GRACEFULLY_ON_SHUTDOWN
 import org.apache.spark.streaming.StreamingContextState._
 import org.apache.spark.streaming.dstream._
 import org.apache.spark.streaming.receiver.Receiver
@@ -717,7 +718,7 @@ class StreamingContext private[streaming] (
   }
 
   private def stopOnShutdown(): Unit = {
-    val stopGracefully = conf.getBoolean("spark.streaming.stopGracefullyOnShutdown", false)
+    val stopGracefully = conf.get(STOP_GRACEFULLY_ON_SHUTDOWN)
     logInfo(s"Invoking stop(stopGracefully=$stopGracefully) from shutdown hook")
     // Do not stop SparkContext, let its own shutdown hook stop it
     stop(stopSparkContext = false, stopGracefully = stopGracefully)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index 6c981b2..e037f26 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -31,6 +31,7 @@ import org.apache.spark.internal.io.SparkHadoopWriterUtils
 import org.apache.spark.rdd.{BlockRDD, RDD, RDDOperationScope}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming._
+import org.apache.spark.streaming.StreamingConf.STREAMING_UNPERSIST
 import org.apache.spark.streaming.StreamingContext.rddToFileName
 import org.apache.spark.streaming.scheduler.Job
 import org.apache.spark.ui.{UIUtils => SparkUIUtils}
@@ -447,7 +448,7 @@ abstract class DStream[T: ClassTag] (
    * this to clear their own metadata along with the generated RDDs.
    */
   private[streaming] def clearMetadata(time: Time): Unit = {
-    val unpersistData = ssc.conf.getBoolean("spark.streaming.unpersist", true)
+    val unpersistData = ssc.conf.get(STREAMING_UNPERSIST)
     val oldRDDs = generatedRDDs.filter(_._1 <= (time - rememberDuration))
     logDebug("Clearing references to old RDDs: [" +
       oldRDDs.map(x => s"${x._1} -> ${x._2.id}").mkString(", ") + "]")
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
index 2533c53..d641f55 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
@@ -24,6 +24,7 @@ import scala.collection.mutable.ArrayBuffer
 import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.internal.Logging
 import org.apache.spark.storage.StreamBlockId
+import org.apache.spark.streaming.StreamingConf.BLOCK_INTERVAL
 import org.apache.spark.streaming.util.RecurringTimer
 import org.apache.spark.util.{Clock, SystemClock}
 
@@ -100,8 +101,8 @@ private[streaming] class BlockGenerator(
   }
   import GeneratorState._
 
-  private val blockIntervalMs = conf.getTimeAsMs("spark.streaming.blockInterval", "200ms")
-  require(blockIntervalMs > 0, s"'spark.streaming.blockInterval' should be a positive value")
+  private val blockIntervalMs = conf.get(BLOCK_INTERVAL)
+  require(blockIntervalMs > 0, s"'${BLOCK_INTERVAL.key}' should be a positive value")
 
   private val blockIntervalTimer =
     new RecurringTimer(clock, blockIntervalMs, updateCurrentBuffer, "BlockGenerator")
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala
index c620074..f77ca3e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala
@@ -21,6 +21,7 @@ import com.google.common.util.concurrent.{RateLimiter => GuavaRateLimiter}
 
 import org.apache.spark.SparkConf
 import org.apache.spark.internal.Logging
+import org.apache.spark.streaming.StreamingConf.{BACKPRESSURE_INITIAL_RATE, RECEIVER_MAX_RATE}
 
 /**
  * Provides waitToPush() method to limit the rate at which receivers consume data.
@@ -37,7 +38,7 @@ import org.apache.spark.internal.Logging
 private[receiver] abstract class RateLimiter(conf: SparkConf) extends Logging {
 
   // treated as an upper limit
-  private val maxRateLimit = conf.getLong("spark.streaming.receiver.maxRate", Long.MaxValue)
+  private val maxRateLimit = conf.get(RECEIVER_MAX_RATE)
   private lazy val rateLimiter = GuavaRateLimiter.create(getInitialRateLimit().toDouble)
 
   def waitToPush(): Unit = {
@@ -68,6 +69,6 @@ private[receiver] abstract class RateLimiter(conf: SparkConf) extends Logging {
    * Get the initial rateLimit to initial rateLimiter
    */
   private def getInitialRateLimit(): Long = {
-    math.min(conf.getLong("spark.streaming.backpressure.initialRate", maxRateLimit), maxRateLimit)
+    math.min(conf.get(BACKPRESSURE_INITIAL_RATE), maxRateLimit)
   }
 }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
index 7e8449e..8008a5c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
@@ -23,7 +23,7 @@ import scala.util.{Failure, Success, Try}
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
-import org.apache.spark.streaming.{Checkpoint, CheckpointWriter, Time}
+import org.apache.spark.streaming.{Checkpoint, CheckpointWriter, StreamingConf, Time}
 import org.apache.spark.streaming.api.python.PythonDStream
 import org.apache.spark.streaming.util.RecurringTimer
 import org.apache.spark.util.{Clock, EventLoop, ManualClock, Utils}
@@ -115,7 +115,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
       logInfo("Stopping JobGenerator gracefully")
       val timeWhenStopStarted = System.nanoTime()
       val stopTimeoutMs = conf.getTimeAsMs(
-        "spark.streaming.gracefulStopTimeout", s"${10 * ssc.graph.batchDuration.milliseconds}ms")
+        StreamingConf.GRACEFUL_STOP_TIMEOUT.key, s"${10 * ssc.graph.batchDuration.milliseconds}ms")
       val pollTime = 100
 
       // To prevent graceful stop to get stuck permanently
@@ -206,7 +206,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
     // or if the property is defined set it to that time
     if (clock.isInstanceOf[ManualClock]) {
       val lastTime = ssc.initialCheckpoint.checkpointTime.milliseconds
-      val jumpTime = ssc.sc.conf.getLong("spark.streaming.manualClock.jump", 0)
+      val jumpTime = ssc.sc.conf.get(StreamingConf.MANUAL_CLOCK_JUMP)
       clock.asInstanceOf[ManualClock].setTime(lastTime + jumpTime)
     }
 
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index 7eea57c..a6d8dcc 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -47,7 +47,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
   // Use of ConcurrentHashMap.keySet later causes an odd runtime problem due to Java 7/8 diff
   // https://gist.github.com/AlainODea/1375759b8720a3f9f094
   private val jobSets: java.util.Map[Time, JobSet] = new ConcurrentHashMap[Time, JobSet]
-  private val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1)
+  private val numConcurrentJobs = ssc.conf.get(StreamingConf.CONCURRENT_JOBS)
   private val jobExecutor =
     ThreadUtils.newDaemonFixedThreadPool(numConcurrentJobs, "streaming-job-executor")
   private[streaming] val jobGenerator = new JobGenerator(this)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/RateController.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/RateController.scala
index 7774e85..88f191f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/RateController.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/RateController.scala
@@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicLong
 import scala.concurrent.{ExecutionContext, Future}
 
 import org.apache.spark.SparkConf
+import org.apache.spark.streaming.StreamingConf.BACKPRESSURE_ENABLED
 import org.apache.spark.streaming.scheduler.rate.RateEstimator
 import org.apache.spark.util.{ThreadUtils, Utils}
 
@@ -86,5 +87,5 @@ private[streaming] abstract class RateController(val streamUID: Int, rateEstimat
 
 object RateController {
   def isBackPressureEnabled(conf: SparkConf): Boolean =
-    conf.getBoolean("spark.streaming.backpressure.enabled", false)
+    conf.get(BACKPRESSURE_ENABLED)
 }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/RateEstimator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/RateEstimator.scala
index e4b9dff..7f4d0f2 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/RateEstimator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/RateEstimator.scala
@@ -19,6 +19,7 @@ package org.apache.spark.streaming.scheduler.rate
 
 import org.apache.spark.SparkConf
 import org.apache.spark.streaming.Duration
+import org.apache.spark.streaming.StreamingConf._
 
 /**
  * A component that estimates the rate at which an `InputDStream` should ingest
@@ -57,12 +58,12 @@ object RateEstimator {
    * @throws IllegalArgumentException if the configured RateEstimator is not `pid`.
    */
   def create(conf: SparkConf, batchInterval: Duration): RateEstimator =
-    conf.get("spark.streaming.backpressure.rateEstimator", "pid") match {
+    conf.get(BACKPRESSURE_RATE_ESTIMATOR) match {
       case "pid" =>
-        val proportional = conf.getDouble("spark.streaming.backpressure.pid.proportional", 1.0)
-        val integral = conf.getDouble("spark.streaming.backpressure.pid.integral", 0.2)
-        val derived = conf.getDouble("spark.streaming.backpressure.pid.derived", 0.0)
-        val minRate = conf.getDouble("spark.streaming.backpressure.pid.minRate", 100)
+        val proportional = conf.get(BACKPRESSURE_PID_PROPORTIONAL)
+        val integral = conf.get(BACKPRESSURE_PID_INTEGRAL)
+        val derived = conf.get(BACKPRESSURE_PID_DERIVED)
+        val minRate = conf.get(BACKPRESSURE_PID_MIN_RATE)
         new PIDRateEstimator(batchInterval.milliseconds, proportional, integral, derived, minRate)
 
       case estimator =>
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
index de73762..da351ec 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
@@ -24,7 +24,7 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable.{HashMap, Queue}
 
 import org.apache.spark.scheduler._
-import org.apache.spark.streaming.{StreamingContext, Time}
+import org.apache.spark.streaming.{StreamingConf, StreamingContext, Time}
 import org.apache.spark.streaming.scheduler._
 
 private[spark] class StreamingJobProgressListener(ssc: StreamingContext)
@@ -33,7 +33,7 @@ private[spark] class StreamingJobProgressListener(ssc: StreamingContext)
   private val waitingBatchUIData = new HashMap[Time, BatchUIData]
   private val runningBatchUIData = new HashMap[Time, BatchUIData]
   private val completedBatchUIData = new Queue[BatchUIData]
-  private val batchUIDataLimit = ssc.conf.getInt("spark.streaming.ui.retainedBatches", 1000)
+  private val batchUIDataLimit = ssc.conf.get(StreamingConf.UI_RETAINED_BATCHES)
   private var totalCompletedBatches = 0L
   private var totalReceivedRecords = 0L
   private var totalProcessedRecords = 0L
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/StateMap.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/StateMap.scala
index 618c036..4224cef 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/StateMap.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/StateMap.scala
@@ -26,6 +26,7 @@ import com.esotericsoftware.kryo.io.{Input, Output}
 
 import org.apache.spark.SparkConf
 import org.apache.spark.serializer.{KryoInputObjectInputBridge, KryoOutputObjectOutputBridge}
+import org.apache.spark.streaming.StreamingConf.SESSION_BY_KEY_DELTA_CHAIN_THRESHOLD
 import org.apache.spark.streaming.util.OpenHashMapBasedStateMap._
 import org.apache.spark.util.collection.OpenHashMap
 
@@ -61,8 +62,7 @@ private[streaming] object StateMap {
   def empty[K, S]: StateMap[K, S] = new EmptyStateMap[K, S]
 
   def create[K: ClassTag, S: ClassTag](conf: SparkConf): StateMap[K, S] = {
-    val deltaChainThreshold = conf.getInt("spark.streaming.sessionByKey.deltaChainThreshold",
-      DELTA_CHAIN_LENGTH_THRESHOLD)
+    val deltaChainThreshold = conf.get(SESSION_BY_KEY_DELTA_CHAIN_THRESHOLD)
     new OpenHashMapBasedStateMap[K, S](deltaChainThreshold)
   }
 }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogUtils.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogUtils.scala
index b0a4c98..224e782 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogUtils.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogUtils.scala
@@ -23,52 +23,34 @@ import org.apache.hadoop.conf.Configuration
 
 import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.internal.Logging
+import org.apache.spark.streaming.StreamingConf._
 import org.apache.spark.util.Utils
 
 /** A helper class with utility functions related to the WriteAheadLog interface */
 private[streaming] object WriteAheadLogUtils extends Logging {
-  val RECEIVER_WAL_ENABLE_CONF_KEY = "spark.streaming.receiver.writeAheadLog.enable"
-  val RECEIVER_WAL_CLASS_CONF_KEY = "spark.streaming.receiver.writeAheadLog.class"
-  val RECEIVER_WAL_ROLLING_INTERVAL_CONF_KEY =
-    "spark.streaming.receiver.writeAheadLog.rollingIntervalSecs"
-  val RECEIVER_WAL_MAX_FAILURES_CONF_KEY = "spark.streaming.receiver.writeAheadLog.maxFailures"
-  val RECEIVER_WAL_CLOSE_AFTER_WRITE_CONF_KEY =
-    "spark.streaming.receiver.writeAheadLog.closeFileAfterWrite"
-
-  val DRIVER_WAL_CLASS_CONF_KEY = "spark.streaming.driver.writeAheadLog.class"
-  val DRIVER_WAL_ROLLING_INTERVAL_CONF_KEY =
-    "spark.streaming.driver.writeAheadLog.rollingIntervalSecs"
-  val DRIVER_WAL_MAX_FAILURES_CONF_KEY = "spark.streaming.driver.writeAheadLog.maxFailures"
-  val DRIVER_WAL_BATCHING_CONF_KEY = "spark.streaming.driver.writeAheadLog.allowBatching"
-  val DRIVER_WAL_BATCHING_TIMEOUT_CONF_KEY = "spark.streaming.driver.writeAheadLog.batchingTimeout"
-  val DRIVER_WAL_CLOSE_AFTER_WRITE_CONF_KEY =
-    "spark.streaming.driver.writeAheadLog.closeFileAfterWrite"
-
-  val DEFAULT_ROLLING_INTERVAL_SECS = 60
-  val DEFAULT_MAX_FAILURES = 3
 
   def enableReceiverLog(conf: SparkConf): Boolean = {
-    conf.getBoolean(RECEIVER_WAL_ENABLE_CONF_KEY, false)
+    conf.get(RECEIVER_WAL_ENABLE_CONF_KEY)
   }
 
   def getRollingIntervalSecs(conf: SparkConf, isDriver: Boolean): Int = {
     if (isDriver) {
-      conf.getInt(DRIVER_WAL_ROLLING_INTERVAL_CONF_KEY, DEFAULT_ROLLING_INTERVAL_SECS)
+      conf.get(DRIVER_WAL_ROLLING_INTERVAL_CONF_KEY)
     } else {
-      conf.getInt(RECEIVER_WAL_ROLLING_INTERVAL_CONF_KEY, DEFAULT_ROLLING_INTERVAL_SECS)
+      conf.get(RECEIVER_WAL_ROLLING_INTERVAL_CONF_KEY)
     }
   }
 
   def getMaxFailures(conf: SparkConf, isDriver: Boolean): Int = {
     if (isDriver) {
-      conf.getInt(DRIVER_WAL_MAX_FAILURES_CONF_KEY, DEFAULT_MAX_FAILURES)
+      conf.get(DRIVER_WAL_MAX_FAILURES_CONF_KEY)
     } else {
-      conf.getInt(RECEIVER_WAL_MAX_FAILURES_CONF_KEY, DEFAULT_MAX_FAILURES)
+      conf.get(RECEIVER_WAL_MAX_FAILURES_CONF_KEY)
     }
   }
 
   def isBatchingEnabled(conf: SparkConf, isDriver: Boolean): Boolean = {
-    isDriver && conf.getBoolean(DRIVER_WAL_BATCHING_CONF_KEY, defaultValue = true)
+    isDriver && conf.get(DRIVER_WAL_BATCHING_CONF_KEY)
   }
 
   /**
@@ -76,14 +58,14 @@ private[streaming] object WriteAheadLogUtils extends Logging {
    * before we fail the write attempt to unblock receivers.
    */
   def getBatchingTimeout(conf: SparkConf): Long = {
-    conf.getLong(DRIVER_WAL_BATCHING_TIMEOUT_CONF_KEY, defaultValue = 5000)
+    conf.get(DRIVER_WAL_BATCHING_TIMEOUT_CONF_KEY)
   }
 
   def shouldCloseFileAfterWrite(conf: SparkConf, isDriver: Boolean): Boolean = {
     if (isDriver) {
-      conf.getBoolean(DRIVER_WAL_CLOSE_AFTER_WRITE_CONF_KEY, defaultValue = false)
+      conf.get(DRIVER_WAL_CLOSE_AFTER_WRITE_CONF_KEY)
     } else {
-      conf.getBoolean(RECEIVER_WAL_CLOSE_AFTER_WRITE_CONF_KEY, defaultValue = false)
+      conf.get(RECEIVER_WAL_CLOSE_AFTER_WRITE_CONF_KEY)
     }
   }
 
@@ -126,9 +108,9 @@ private[streaming] object WriteAheadLogUtils extends Logging {
     ): WriteAheadLog = {
 
     val classNameOption = if (isDriver) {
-      sparkConf.getOption(DRIVER_WAL_CLASS_CONF_KEY)
+      sparkConf.get(DRIVER_WAL_CLASS_CONF_KEY)
     } else {
-      sparkConf.getOption(RECEIVER_WAL_CLASS_CONF_KEY)
+      sparkConf.get(RECEIVER_WAL_CLASS_CONF_KEY)
     }
     val wal = classNameOption.map { className =>
       try {
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverInputDStreamSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverInputDStreamSuite.scala
index 5e2ce25..6b33220 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverInputDStreamSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverInputDStreamSuite.scala
@@ -22,6 +22,7 @@ import scala.util.Random
 import org.apache.spark.{SparkConf, SparkEnv}
 import org.apache.spark.rdd.BlockRDD
 import org.apache.spark.storage.{StorageLevel, StreamBlockId}
+import org.apache.spark.streaming.StreamingConf.RECEIVER_WAL_ENABLE_CONF_KEY
 import org.apache.spark.streaming.dstream.ReceiverInputDStream
 import org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD
 import org.apache.spark.streaming.receiver.{BlockManagerBasedStoreResult, Receiver, WriteAheadLogBasedStoreResult}
@@ -117,7 +118,7 @@ class ReceiverInputDStreamSuite
   private def runTest(enableWAL: Boolean, body: ReceiverInputDStream[_] => Unit): Unit = {
     val conf = new SparkConf()
     conf.setMaster("local[4]").setAppName("ReceiverInputDStreamSuite")
-    conf.set(WriteAheadLogUtils.RECEIVER_WAL_ENABLE_CONF_KEY, enableWAL.toString)
+    conf.set(StreamingConf.RECEIVER_WAL_ENABLE_CONF_KEY.key, enableWAL.toString)
     require(WriteAheadLogUtils.enableReceiverLog(conf) === enableWAL)
     ssc = new StreamingContext(conf, Seconds(1))
     val receiverStream = new ReceiverInputDStream[Int](ssc) {

