[GitHub] spark pull request #21504: [SPARK-24479][SS] Added config for registering st...
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/21504

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21504: [SPARK-24479][SS] Added config for registering st...
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21504#discussion_r194950151

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenersConfSuite.scala ---
    @@ -0,0 +1,66 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.streaming
    +
    +import scala.language.reflectiveCalls
    +
    +import org.scalatest.BeforeAndAfter
    +
    +import org.apache.spark.SparkConf
    +import org.apache.spark.sql.execution.streaming._
    +import org.apache.spark.sql.streaming.StreamingQueryListener._
    +
    +
    +class StreamingQueryListenersConfSuite extends StreamTest with BeforeAndAfter {
    +
    +  import testImplicits._
    +
    +
    +  override protected def sparkConf: SparkConf =
    +    super.sparkConf.set("spark.sql.streamingQueryListeners",
    --- End diff --

    Seems like a mistake here.
[GitHub] spark pull request #21504: [SPARK-24479][SS] Added config for registering st...
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21504#discussion_r194817758

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala ---
    @@ -96,6 +96,14 @@ object StaticSQLConf {
           .toSequence
           .createOptional
     
    +  val STREAMING_QUERY_LISTENERS = buildStaticConf("spark.sql.streamingQueryListeners")
    --- End diff --

    OK, makes sense. Renamed.
[GitHub] spark pull request #21504: [SPARK-24479][SS] Added config for registering st...
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21504#discussion_r194688537

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala ---
    @@ -96,6 +96,14 @@ object StaticSQLConf {
           .toSequence
           .createOptional
     
    +  val STREAMING_QUERY_LISTENERS = buildStaticConf("spark.sql.streamingQueryListeners")
    --- End diff --

    Maybe rename this to `spark.sql.streaming.streamingQueryListeners` for consistency.
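
For context, here is a rough sketch of how a user might register a listener through this setting, assuming the key ends up as `spark.sql.streaming.streamingQueryListeners` as suggested above. `ConsoleQueryListener` and `ListenerConfExample` are hypothetical names used only for illustration. The sketch also assumes the listener class has a zero-argument constructor and that the value is set before the `SparkContext` is created, since the diff reads it from `sparkContext.conf`.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.{
  QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}

// Hypothetical listener, not part of the pull request.
class ConsoleQueryListener extends StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit =
    println(s"Query started: ${event.id}")

  override def onQueryProgress(event: QueryProgressEvent): Unit =
    println(s"Input rows in last trigger: ${event.progress.numInputRows}")

  override def onQueryTerminated(event: QueryTerminatedEvent): Unit =
    println(s"Query terminated: ${event.id}")
}

object ListenerConfExample {
  def main(args: Array[String]): Unit = {
    // Assumption: the key below is the renamed one proposed in the review.
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("listener-conf-example")
      .config("spark.sql.streaming.streamingQueryListeners",
        classOf[ConsoleQueryListener].getName)
      .getOrCreate()

    // Streaming queries started on this session would now notify the listener.
    spark.stop()
  }
}
```

The same value could presumably be passed on the command line with `--conf`, which is the main appeal of a config-based registration: no application code has to call `addListener`.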
[GitHub] spark pull request #21504: [SPARK-24479][SS] Added config for registering st...
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21504#discussion_r194101270

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala ---
    @@ -55,6 +57,19 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
       @GuardedBy("awaitTerminationLock")
       private var lastTerminatedQuery: StreamingQuery = null
     
    +  try {
    +    sparkSession.sparkContext.conf.get(STREAMING_QUERY_LISTENERS).foreach { classNames =>
    +      Utils.loadExtensions(classOf[StreamingQueryListener], classNames,
    +        sparkSession.sparkContext.conf).foreach(listener => {
    +        addListener(listener)
    +        logInfo(s"Registered listener ${listener.getClass.getName}")
    +      })
    +    }
    +  } catch {
    +    case e: Exception =>
    +      throw new SparkException(s"Exception when registering StreamingQueryListener", e)
    --- End diff --

    Addressed.
[GitHub] spark pull request #21504: [SPARK-24479][SS] Added config for registering st...
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21504#discussion_r194100709

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala ---
    @@ -55,6 +57,19 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
       @GuardedBy("awaitTerminationLock")
       private var lastTerminatedQuery: StreamingQuery = null
     
    +  try {
    +    sparkSession.sparkContext.conf.get(STREAMING_QUERY_LISTENERS).foreach { classNames =>
    +      Utils.loadExtensions(classOf[StreamingQueryListener], classNames,
    +        sparkSession.sparkContext.conf).foreach(listener => {
    +        addListener(listener)
    +        logInfo(s"Registered listener ${listener.getClass.getName}")
    --- End diff --

    Since it is logged only once and gives the user useful information, I guess info is fine. There is a similar pattern here: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkContext.scala#L2359
[GitHub] spark pull request #21504: [SPARK-24479][SS] Added config for registering st...
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21504#discussion_r193945288

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala ---
    @@ -55,6 +57,19 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
       @GuardedBy("awaitTerminationLock")
       private var lastTerminatedQuery: StreamingQuery = null
     
    +  try {
    +    sparkSession.sparkContext.conf.get(STREAMING_QUERY_LISTENERS).foreach { classNames =>
    +      Utils.loadExtensions(classOf[StreamingQueryListener], classNames,
    +        sparkSession.sparkContext.conf).foreach(listener => {
    +        addListener(listener)
    +        logInfo(s"Registered listener ${listener.getClass.getName}")
    --- End diff --

    Either debug or info is fine with me, since it would add just a couple of log lines, and only once.
[GitHub] spark pull request #21504: [SPARK-24479][SS] Added config for registering st...
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21504#discussion_r193936818

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala ---
    @@ -55,6 +57,19 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
       @GuardedBy("awaitTerminationLock")
       private var lastTerminatedQuery: StreamingQuery = null
     
    +  try {
    +    sparkSession.sparkContext.conf.get(STREAMING_QUERY_LISTENERS).foreach { classNames =>
    +      Utils.loadExtensions(classOf[StreamingQueryListener], classNames,
    +        sparkSession.sparkContext.conf).foreach(listener => {
    +        addListener(listener)
    +        logInfo(s"Registered listener ${listener.getClass.getName}")
    +      })
    +    }
    +  } catch {
    +    case e: Exception =>
    +      throw new SparkException(s"Exception when registering StreamingQueryListener", e)
    --- End diff --

    nit: the `s` interpolator seems unnecessary here.
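
To illustrate the nit: a string literal with no `$` placeholders produces the same value with or without the `s` prefix, so the prefix on the exception message is only noise. A minimal sketch follows; `InterpolatorNit` and the class name `com.example.MyListener` are hypothetical and not taken from the pull request.

```scala
object InterpolatorNit {
  // No placeholders: the interpolator does nothing useful here.
  val withInterpolator: String = s"Exception when registering StreamingQueryListener"
  val plain: String = "Exception when registering StreamingQueryListener"

  // The interpolator is needed only when the literal embeds a value,
  // as in the logInfo call from the same diff.
  val className: String = "com.example.MyListener" // hypothetical class name
  val interpolated: String = s"Registered listener $className"
}
```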
[GitHub] spark pull request #21504: [SPARK-24479][SS] Added config for registering st...
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21504#discussion_r193936698

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala ---
    @@ -55,6 +57,19 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
       @GuardedBy("awaitTerminationLock")
       private var lastTerminatedQuery: StreamingQuery = null
     
    +  try {
    +    sparkSession.sparkContext.conf.get(STREAMING_QUERY_LISTENERS).foreach { classNames =>
    +      Utils.loadExtensions(classOf[StreamingQueryListener], classNames,
    +        sparkSession.sparkContext.conf).foreach(listener => {
    +        addListener(listener)
    +        logInfo(s"Registered listener ${listener.getClass.getName}")
    --- End diff --

    I would log this at debug level.
[GitHub] spark pull request #21504: [SPARK-24479][SS] Added config for registering st...
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21504#discussion_r193923588

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala ---
    @@ -55,6 +56,11 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
       @GuardedBy("awaitTerminationLock")
       private var lastTerminatedQuery: StreamingQuery = null
     
    +  sparkSession.sparkContext.conf.get(STREAMING_QUERY_LISTENERS).foreach { classNames =>
    +    Utils.loadExtensions(classOf[StreamingQueryListener], classNames,
    +      sparkSession.sparkContext.conf).foreach(addListener)
    +  }
    +
    --- End diff --

    Good point. Addressed, please check.
[GitHub] spark pull request #21504: [SPARK-24479][SS] Added config for registering st...
Github user merlintang commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21504#discussion_r193911087

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala ---
    @@ -55,6 +56,11 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
       @GuardedBy("awaitTerminationLock")
       private var lastTerminatedQuery: StreamingQuery = null
     
    +  sparkSession.sparkContext.conf.get(STREAMING_QUERY_LISTENERS).foreach { classNames =>
    +    Utils.loadExtensions(classOf[StreamingQueryListener], classNames,
    +      sparkSession.sparkContext.conf).foreach(addListener)
    +  }
    +
    --- End diff --

    Two comments here:
    1. We need to log the registration here.
    2. We need to wrap this in a try/catch. Registration can fail, and that would break the job.
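
The two requests above (log each registration, and guard against a failing registration) can be illustrated with a Spark-free sketch. Everything in it is a hypothetical stand-in: `Listener` plays the role of `StreamingQueryListener`, `RegistrationException` the role of a wrapping exception, and plain reflection replaces `Utils.loadExtensions`, whose actual behavior is not reproduced here. `println` stands in for a real logger.

```scala
import scala.collection.mutable.ListBuffer
import scala.util.control.NonFatal

// Hypothetical stand-in for StreamingQueryListener.
trait Listener { def name: String }

// Listeners are instantiated reflectively, so this sketch assumes a
// public zero-argument constructor.
class AuditListener extends Listener { val name: String = "audit" }

// Hypothetical wrapper exception that gives failures a clear message.
class RegistrationException(msg: String, cause: Throwable)
  extends RuntimeException(msg, cause)

object ListenerRegistry {
  private val listeners = ListBuffer.empty[Listener]

  def registered: List[Listener] = listeners.toList

  // Load each class by name, register it, and log the registration.
  // Any failure is wrapped so the caller sees which step went wrong.
  def registerAll(classNames: Seq[String]): Unit = {
    try {
      classNames.foreach { className =>
        val instance = Class.forName(className).getDeclaredConstructor().newInstance()
        val listener = instance.asInstanceOf[Listener]
        listeners += listener
        println(s"Registered listener ${listener.getClass.getName}")
      }
    } catch {
      case NonFatal(e) =>
        throw new RegistrationException("Exception when registering listener", e)
    }
  }
}
```

Note that wrapping and rethrowing still fails startup when a class cannot be loaded. It only makes the cause explicit. Whether a bad class name should be fatal or merely logged is a design choice this sketch does not settle.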