This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 8329e7d  [SPARK-27649][SS] Unify the way use 'spark.network.timeout'
8329e7d is described below

commit 8329e7debdaf6db9f3a52094bbc5dc4c1e2771ea
Author: gengjiaan <gengji...@360.cn>
AuthorDate: Wed May 8 11:43:03 2019 +0800

    [SPARK-27649][SS] Unify the way use 'spark.network.timeout'
    
    ## What changes were proposed in this pull request?
    
    For historical reasons, Structured Streaming still reads
    `spark.network.timeout`
    the old way (by string key with a hard-coded default), even though
    `org.apache.spark.internal.config.Network.NETWORK_TIMEOUT`
    is now available.
    
    ## How was this patch tested?
    Existing unit tests.
    
    Closes #24545 from beliefer/unify-spark-network-timeout.
    
    Authored-by: gengjiaan <gengji...@360.cn>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala  | 3 ++-
 .../src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala | 5 ++---
 .../src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala   | 3 ++-
 3 files changed, 6 insertions(+), 5 deletions(-)

diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
index 6972f39..76c7b5d 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
@@ -26,6 +26,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig
 
 import org.apache.spark.SparkEnv
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.Network.NETWORK_TIMEOUT
 import org.apache.spark.scheduler.ExecutorCacheTaskLocation
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
@@ -64,7 +65,7 @@ private[kafka010] class KafkaMicroBatchStream(
 
   private val pollTimeoutMs = options.getLong(
     "kafkaConsumer.pollTimeoutMs",
-    SparkEnv.get.conf.getTimeAsSeconds("spark.network.timeout", "120s") * 1000L)
+    SparkEnv.get.conf.get(NETWORK_TIMEOUT) * 1000L)
 
   private val maxOffsetsPerTrigger = Option(options.get("maxOffsetsPerTrigger")).map(_.toLong)
 
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala
index 9effa29..48cc089 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala
@@ -22,6 +22,7 @@ import java.util.UUID
 import org.apache.kafka.common.TopicPartition
 
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.Network.NETWORK_TIMEOUT
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{Row, SQLContext}
 import org.apache.spark.sql.catalyst.InternalRow
@@ -47,9 +48,7 @@ private[kafka010] class KafkaRelation(
 
   private val pollTimeoutMs = sourceOptions.getOrElse(
     "kafkaConsumer.pollTimeoutMs",
-    (sqlContext.sparkContext.conf.getTimeAsSeconds(
-      "spark.network.timeout",
-      "120s") * 1000L).toString
+    (sqlContext.sparkContext.conf.get(NETWORK_TIMEOUT) * 1000L).toString
   ).toLong
 
   override def schema: StructType = KafkaOffsetReader.kafkaSchema
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
index 82d746e..fa93e8f 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
@@ -27,6 +27,7 @@ import org.apache.kafka.common.TopicPartition
 
 import org.apache.spark.SparkContext
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.Network.NETWORK_TIMEOUT
 import org.apache.spark.scheduler.ExecutorCacheTaskLocation
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.InternalRow
@@ -85,7 +86,7 @@ private[kafka010] class KafkaSource(
 
   private val pollTimeoutMs = sourceOptions.getOrElse(
     "kafkaConsumer.pollTimeoutMs",
-    (sc.conf.getTimeAsSeconds("spark.network.timeout", "120s") * 1000L).toString
+    (sc.conf.get(NETWORK_TIMEOUT) * 1000L).toString
   ).toLong
 
   private val maxOffsetsPerTrigger =


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to