This is an automated email from the ASF dual-hosted git repository. zsxwing pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new f9b8637  [SPARK-29543][SS][FOLLOWUP] Move `spark.sql.streaming.ui.*` configs to StaticSQLConf
f9b8637 is described below

commit f9b86370cb04b72a4f00cbd4d60873960aa2792c
Author: Yuanjian Li <xyliyuanj...@gmail.com>
AuthorDate: Sun Feb 2 23:37:13 2020 -0800

    [SPARK-29543][SS][FOLLOWUP] Move `spark.sql.streaming.ui.*` configs to StaticSQLConf

    ### What changes were proposed in this pull request?
    Put the configs below needed by Structured Streaming UI into StaticSQLConf:

    - spark.sql.streaming.ui.enabled
    - spark.sql.streaming.ui.retainedProgressUpdates
    - spark.sql.streaming.ui.retainedQueries

    ### Why are the changes needed?
    Make all SS UI configs consistent with other similar configs in usage and naming.

    ### Does this PR introduce any user-facing change?
    Yes, add new static config `spark.sql.streaming.ui.retainedProgressUpdates`.

    ### How was this patch tested?
    Existing UT.

    Closes #27425 from xuanyuanking/SPARK-29543-follow.
Authored-by: Yuanjian Li <xyliyuanj...@gmail.com> Signed-off-by: Shixiong Zhu <zsxw...@gmail.com> (cherry picked from commit a4912cee615314e9578e6ab4eae25f147feacbd5) Signed-off-by: Shixiong Zhu <zsxw...@gmail.com> --- .../org/apache/spark/sql/internal/SQLConf.scala | 16 ---------------- .../apache/spark/sql/internal/StaticSQLConf.scala | 20 ++++++++++++++++++++ .../org/apache/spark/sql/internal/SharedState.scala | 15 ++++++++------- .../streaming/ui/StreamingQueryStatusListener.scala | 10 ++++++---- .../spark/sql/streaming/ui/StreamingQueryTab.scala | 2 +- .../ui/StreamingQueryStatusListenerSuite.scala | 4 ++-- 6 files changed, 37 insertions(+), 30 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 04572c3..3ad3416 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1150,18 +1150,6 @@ object SQLConf { .booleanConf .createWithDefault(true) - val STREAMING_UI_ENABLED = - buildConf("spark.sql.streaming.ui.enabled") - .doc("Whether to run the structured streaming UI for the Spark application.") - .booleanConf - .createWithDefault(true) - - val STREAMING_UI_INACTIVE_QUERY_RETENTION = - buildConf("spark.sql.streaming.ui.numInactiveQueries") - .doc("The number of inactive queries to retain for structured streaming ui.") - .intConf - .createWithDefault(100) - val VARIABLE_SUBSTITUTE_ENABLED = buildConf("spark.sql.variable.substitute") .doc("This enables substitution using syntax like ${var} ${system:var} and ${env:var}.") @@ -2284,10 +2272,6 @@ class SQLConf extends Serializable with Logging { def isUnsupportedOperationCheckEnabled: Boolean = getConf(UNSUPPORTED_OPERATION_CHECK_ENABLED) - def isStreamingUIEnabled: Boolean = getConf(STREAMING_UI_ENABLED) - - def streamingUIInactiveQueryRetention: Int = 
getConf(STREAMING_UI_INACTIVE_QUERY_RETENTION) - def streamingFileCommitProtocolClass: String = getConf(STREAMING_FILE_COMMIT_PROTOCOL_CLASS) def fileSinkLogDeletion: Boolean = getConf(FILE_SINK_LOG_DELETION) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala index 66ac9ddb..6bc7522 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala @@ -176,4 +176,24 @@ object StaticSQLConf { .internal() .booleanConf .createWithDefault(true) + + val STREAMING_UI_ENABLED = + buildStaticConf("spark.sql.streaming.ui.enabled") + .doc("Whether to run the Structured Streaming Web UI for the Spark application when the " + + "Spark Web UI is enabled.") + .booleanConf + .createWithDefault(true) + + val STREAMING_UI_RETAINED_PROGRESS_UPDATES = + buildStaticConf("spark.sql.streaming.ui.retainedProgressUpdates") + .doc("The number of progress updates to retain for a streaming query for Structured " + + "Streaming UI.") + .intConf + .createWithDefault(100) + + val STREAMING_UI_RETAINED_QUERIES = + buildStaticConf("spark.sql.streaming.ui.retainedQueries") + .doc("The number of inactive queries to retain for Structured Streaming UI.") + .intConf + .createWithDefault(100) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala index fefd72d..5347264 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala @@ -145,13 +145,14 @@ private[sql] class SharedState( * data to show. 
*/ lazy val streamingQueryStatusListener: Option[StreamingQueryStatusListener] = { - val sqlConf = SQLConf.get - if (sqlConf.isStreamingUIEnabled) { - val statusListener = new StreamingQueryStatusListener(sqlConf) - sparkContext.ui.foreach(new StreamingQueryTab(statusListener, _)) - Some(statusListener) - } else { - None + sparkContext.ui.flatMap { ui => + if (conf.get(STREAMING_UI_ENABLED)) { + val statusListener = new StreamingQueryStatusListener(conf) + new StreamingQueryTab(statusListener, ui) + Some(statusListener) + } else { + None + } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala index db085db..9181511 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala @@ -24,8 +24,9 @@ import java.util.concurrent.ConcurrentHashMap import scala.collection.JavaConverters._ import scala.collection.mutable +import org.apache.spark.SparkConf import org.apache.spark.sql.catalyst.util.DateTimeUtils -import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.StaticSQLConf import org.apache.spark.sql.streaming.{StreamingQueryListener, StreamingQueryProgress} /** @@ -33,7 +34,7 @@ import org.apache.spark.sql.streaming.{StreamingQueryListener, StreamingQueryPro * UI data for both active and inactive query. * TODO: Add support for history server. 
*/ -private[sql] class StreamingQueryStatusListener(sqlConf: SQLConf) extends StreamingQueryListener { +private[sql] class StreamingQueryStatusListener(conf: SparkConf) extends StreamingQueryListener { private val timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601 timestampFormat.setTimeZone(DateTimeUtils.getTimeZone("UTC")) @@ -45,8 +46,9 @@ private[sql] class StreamingQueryStatusListener(sqlConf: SQLConf) extends Stream private[ui] val activeQueryStatus = new ConcurrentHashMap[UUID, StreamingQueryUIData]() private[ui] val inactiveQueryStatus = new mutable.Queue[StreamingQueryUIData]() - private val streamingProgressRetention = sqlConf.streamingProgressRetention - private val inactiveQueryStatusRetention = sqlConf.streamingUIInactiveQueryRetention + private val streamingProgressRetention = + conf.get(StaticSQLConf.STREAMING_UI_RETAINED_PROGRESS_UPDATES) + private val inactiveQueryStatusRetention = conf.get(StaticSQLConf.STREAMING_UI_RETAINED_QUERIES) override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = { activeQueryStatus.putIfAbsent(event.runId, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryTab.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryTab.scala index f909cfd..bb097ff 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryTab.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryTab.scala @@ -34,6 +34,6 @@ private[sql] class StreamingQueryTab( parent.addStaticHandler(StreamingQueryTab.STATIC_RESOURCE_DIR, "/static/sql") } -object StreamingQueryTab { +private[sql] object StreamingQueryTab { private val STATIC_RESOURCE_DIR = "org/apache/spark/sql/execution/ui/static" } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListenerSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListenerSuite.scala index bd74ed3..adbb501 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListenerSuite.scala @@ -27,7 +27,7 @@ import org.apache.spark.sql.streaming class StreamingQueryStatusListenerSuite extends StreamTest { test("onQueryStarted, onQueryProgress, onQueryTerminated") { - val listener = new StreamingQueryStatusListener(spark.sqlContext.conf) + val listener = new StreamingQueryStatusListener(spark.sparkContext.conf) // hanlde query started event val id = UUID.randomUUID() @@ -73,7 +73,7 @@ class StreamingQueryStatusListenerSuite extends StreamTest { } test("same query start multiple times") { - val listener = new StreamingQueryStatusListener(spark.sqlContext.conf) + val listener = new StreamingQueryStatusListener(spark.sparkContext.conf) // handle first time start val id = UUID.randomUUID() --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org