This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 8329e7d [SPARK-27649][SS] Unify the way to use 'spark.network.timeout' 8329e7d is described below commit 8329e7debdaf6db9f3a52094bbc5dc4c1e2771ea Author: gengjiaan <gengji...@360.cn> AuthorDate: Wed May 8 11:43:03 2019 +0800 [SPARK-27649][SS] Unify the way to use 'spark.network.timeout' ## What changes were proposed in this pull request? For historical reasons, structured streaming still has some old ways of using `spark.network.timeout`, even though `org.apache.spark.internal.config.Network.NETWORK_TIMEOUT` is now available. ## How was this patch tested? Existing UTs. Closes #24545 from beliefer/unify-spark-network-timeout. Authored-by: gengjiaan <gengji...@360.cn> Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala | 3 ++- .../src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala | 5 ++--- .../src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala | 3 ++- 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala index 6972f39..76c7b5d 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala @@ -26,6 +26,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.spark.SparkEnv import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.Network.NETWORK_TIMEOUT import org.apache.spark.scheduler.ExecutorCacheTaskLocation import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow @@ -64,7 +65,7 @@ private[kafka010] class KafkaMicroBatchStream( private val pollTimeoutMs = options.getLong( 
"kafkaConsumer.pollTimeoutMs", - SparkEnv.get.conf.getTimeAsSeconds("spark.network.timeout", "120s") * 1000L) + SparkEnv.get.conf.get(NETWORK_TIMEOUT) * 1000L) private val maxOffsetsPerTrigger = Option(options.get("maxOffsetsPerTrigger")).map(_.toLong) diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala index 9effa29..48cc089 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala @@ -22,6 +22,7 @@ import java.util.UUID import org.apache.kafka.common.TopicPartition import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.Network.NETWORK_TIMEOUT import org.apache.spark.rdd.RDD import org.apache.spark.sql.{Row, SQLContext} import org.apache.spark.sql.catalyst.InternalRow @@ -47,9 +48,7 @@ private[kafka010] class KafkaRelation( private val pollTimeoutMs = sourceOptions.getOrElse( "kafkaConsumer.pollTimeoutMs", - (sqlContext.sparkContext.conf.getTimeAsSeconds( - "spark.network.timeout", - "120s") * 1000L).toString + (sqlContext.sparkContext.conf.get(NETWORK_TIMEOUT) * 1000L).toString ).toLong override def schema: StructType = KafkaOffsetReader.kafkaSchema diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala index 82d746e..fa93e8f 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala @@ -27,6 +27,7 @@ import org.apache.kafka.common.TopicPartition import org.apache.spark.SparkContext import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.Network.NETWORK_TIMEOUT import 
org.apache.spark.scheduler.ExecutorCacheTaskLocation import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.InternalRow @@ -85,7 +86,7 @@ private[kafka010] class KafkaSource( private val pollTimeoutMs = sourceOptions.getOrElse( "kafkaConsumer.pollTimeoutMs", - (sc.conf.getTimeAsSeconds("spark.network.timeout", "120s") * 1000L).toString + (sc.conf.get(NETWORK_TIMEOUT) * 1000L).toString ).toLong private val maxOffsetsPerTrigger = --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org