[GitHub] spark pull request #16272: [SPARK-18850][SS]Make StreamExecution serializabl...
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16272#discussion_r92709390

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala ---
    @@ -137,12 +174,12 @@ object StreamingQueryStatusAndProgressSuite {
     name = "myName",
     timestamp = "2016-12-05T20:54:20.827Z",
     batchId = 2L,
    -durationMs = Map("total" -> 0L).mapValues(long2Long).asJava,
    -eventTime = Map(
    +durationMs = new java.util.HashMap(Map("total" -> 0L).mapValues(long2Long).asJava),
    --- End diff --

    Changed them to use the same approach as `ProgressReporter` to create the Java Map.

---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16272: [SPARK-18850][SS]Make StreamExecution serializabl...
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16272#discussion_r92702425

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala ---
    @@ -137,12 +146,13 @@ object StreamingQueryStatusAndProgressSuite {
     name = "myName",
     timestamp = "2016-12-05T20:54:20.827Z",
     batchId = 2L,
    -durationMs = Map("total" -> 0L).mapValues(long2Long).asJava,
    -eventTime = Map(
    - "max" -> "2016-12-05T20:54:20.827Z",
    - "min" -> "2016-12-05T20:54:20.827Z",
    - "avg" -> "2016-12-05T20:54:20.827Z",
    - "watermark" -> "2016-12-05T20:54:20.827Z").asJava,
    +durationMs = Collections.singletonMap("total", 0L),
    --- End diff --

    Wait ... does this mean that the maps we are generating in the StreamingQuery are NOT serializable? Because we are probably doing the same conversion in ProgressReporter.
[GitHub] spark pull request #16272: [SPARK-18850][SS]Make StreamExecution serializabl...
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16272#discussion_r92690628

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala ---
    @@ -137,12 +146,13 @@ object StreamingQueryStatusAndProgressSuite {
     name = "myName",
     timestamp = "2016-12-05T20:54:20.827Z",
     batchId = 2L,
    -durationMs = Map("total" -> 0L).mapValues(long2Long).asJava,
    -eventTime = Map(
    - "max" -> "2016-12-05T20:54:20.827Z",
    - "min" -> "2016-12-05T20:54:20.827Z",
    - "avg" -> "2016-12-05T20:54:20.827Z",
    - "watermark" -> "2016-12-05T20:54:20.827Z").asJava,
    +durationMs = Collections.singletonMap("total", 0L),
    --- End diff --

    I changed these two maps to pure Java maps, as the map created by `asJava` is not serializable.
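A minimal sketch of the "pure Java map" idea, assuming nothing about Spark itself: the Scala map is copied into a `java.util.HashMap` (the form that also appears in this thread as `new java.util.HashMap(...asJava)`), and the copy is round-tripped through plain Java serialization. The names `JavaMapCopy`, `roundTrip`, and `durationMs` are hypothetical helpers for illustration, not part of the PR. Whether the wrapper returned by `asJava` serializes on its own can depend on the Scala version and the underlying collection, so the copy avoids relying on it.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import scala.collection.JavaConverters._

object JavaMapCopy {
  /** Serializes and deserializes a value with plain Java serialization. */
  def roundTrip[T <: AnyRef](value: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(value) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  /** Copies a Scala map into a java.util.HashMap with boxed values (hypothetical helper). */
  def durationMs(values: Map[String, Long]): java.util.Map[String, java.lang.Long] = {
    val boxed = values.map { case (k, v) => k -> java.lang.Long.valueOf(v) }
    // The HashMap copy holds no reference to the Scala wrapper, only to the entries.
    new java.util.HashMap[String, java.lang.Long](boxed.asJava)
  }

  def main(args: Array[String]): Unit = {
    val restored = roundTrip(durationMs(Map("total" -> 0L)))
    println(restored)
  }
}
```

The copy costs one extra allocation per map, which is negligible for small maps like these.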
[GitHub] spark pull request #16272: [SPARK-18850][SS]Make StreamExecution serializabl...
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16272#discussion_r92687106

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala ---
    @@ -439,6 +440,37 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
     }
     }
    +
    + test("progress classes should be Serializable") {
    --- End diff --

    This should be in the StreamingQueryStatusAndProgressSuite.
[GitHub] spark pull request #16272: [SPARK-18850][SS]Make StreamExecution serializabl...
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16272#discussion_r92687025

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala ---
    @@ -300,6 +302,48 @@ class StreamSuite extends StreamTest {
     q.stop()
     }
     }
    +
    + test("StreamingQuery should be Serializable but cannot be used in executors") {
    --- End diff --

    Why is this in StreamSuite? It would be better in StreamingQuerySuite, as it's related to the StreamingQuery.
[GitHub] spark pull request #16272: [SPARK-18850][SS]Make StreamExecution serializabl...
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16272#discussion_r92530950

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala ---
    @@ -83,7 +83,7 @@ class StreamingQueryProgress private[sql](
     val currentWatermark: Long,
     val stateOperators: Array[StateOperatorProgress],
     val sources: Array[SourceProgress],
    - val sink: SinkProgress) {
    + val sink: SinkProgress) extends Serializable {
    --- End diff --

    There should be new tests for verifying serializability.
[GitHub] spark pull request #16272: [SPARK-18850][SS]Make StreamExecution serializabl...
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16272#discussion_r92530788

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryWrapper.scala ---
    @@ -0,0 +1,113 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.sql.execution.streaming
    +
    +import java.util.UUID
    +
    +import org.apache.spark.sql.SparkSession
    +import org.apache.spark.sql.streaming.{StreamingQuery, StreamingQueryException, StreamingQueryProgress, StreamingQueryStatus}
    +
    +/**
    + * Wrap non-serializable StreamExecution to make the query serializable as it's easy to for it to
    + * get captured with normal usage. It's safe to capture the query but not use it in executors.
    + * However, if the user tries to call its methods, it will throw `IllegalStateException`.
    + */
    +class StreamingQueryWrapper(
    +@transient val streamingQuery: StreamExecution) extends StreamingQuery with Serializable {
    --- End diff --

    streamingQuery -> @transient private val _streamExecution

        def streamExecution: StreamExecution = {
          if (_streamExecution == null) {
            throw new IllegalStateException("StreamingQuery cannot be used in executors")
          }
          _streamExecution
        }
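The accessor suggested above can be tried outside Spark. The following is a sketch under stated assumptions: `Engine` is a hypothetical stand-in for a driver-only, non-serializable object such as `StreamExecution`, and `EngineHandle`, `EngineHandleDemo`, and `roundTrip` are illustrative names, not code from the PR. A `@transient` field is skipped by Java serialization and comes back as `null` after deserialization, which is what the null check relies on.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for a driver-only, non-serializable object.
class Engine(val name: String)

// Serializable handle: the engine itself is never written to the stream.
class EngineHandle(@transient private val _engine: Engine) extends Serializable {
  // After deserialization the transient field is null, so fail with a clear message
  // instead of a bare NullPointerException.
  def engine: Engine = {
    if (_engine == null) {
      throw new IllegalStateException("Engine cannot be used in executors")
    }
    _engine
  }

  def name: String = engine.name
}

object EngineHandleDemo {
  /** Serializes and deserializes a value, simulating shipping it to another JVM. */
  def roundTrip[T <: AnyRef](value: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(value) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val onDriver = new EngineHandle(new Engine("myName"))
    println(onDriver.name)            // works where the engine exists
    val shipped = roundTrip(onDriver) // serialization succeeds without Engine being Serializable
    try shipped.name
    catch { case e: IllegalStateException => println(e.getMessage) }
  }
}
```

Routing every method through the checked accessor keeps the failure message in one place, rather than repeating the null check in each delegating method.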
[GitHub] spark pull request #16272: [SPARK-18850][SS]Make StreamExecution serializabl...
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16272#discussion_r92530604

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala ---
    @@ -24,7 +24,7 @@ import scala.collection.{immutable, GenTraversableOnce}
     */
     class StreamProgress(
     val baseMap: immutable.Map[Source, Offset] = new immutable.HashMap[Source, Offset])
    - extends scala.collection.immutable.Map[Source, Offset] {
    + extends scala.collection.immutable.Map[Source, Offset] with Serializable {
    --- End diff --

    I don't think this is needed any more. This is purely internal.
[GitHub] spark pull request #16272: [SPARK-18850][SS]Make StreamExecution serializabl...
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16272#discussion_r92481457

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala ---
    @@ -38,7 +38,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils
     @Experimental
     class StateOperatorProgress private[sql](
     val numRowsTotal: Long,
    -val numRowsUpdated: Long) {
    +val numRowsUpdated: Long) extends Serializable {
    --- End diff --

    +1
[GitHub] spark pull request #16272: [SPARK-18850][SS]Make StreamExecution serializabl...
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16272#discussion_r92279982

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala ---
    @@ -38,7 +38,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils
     @Experimental
     class StateOperatorProgress private[sql](
     val numRowsTotal: Long,
    -val numRowsUpdated: Long) {
    +val numRowsUpdated: Long) extends Serializable {
    --- End diff --

    Add `Serializable` to these progress classes so that the user can collect them and use them in Spark jobs, e.g., `sc.parallelize(query.recentProgress)...`
[GitHub] spark pull request #16272: [SPARK-18850][SS]Make StreamExecution serializabl...
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16272#discussion_r92279247

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala ---
    @@ -71,7 +71,7 @@ trait ProgressReporter extends Logging {
     private var metricWarningLogged: Boolean = false
     /** Holds the most recent query progress updates. Accesses must lock on the queue itself. */
    - private val progressBuffer = new mutable.Queue[StreamingQueryProgress]()
    + @transient private val progressBuffer = new mutable.Queue[StreamingQueryProgress]()
    --- End diff --

    Mark `progressBuffer` as `@transient` instead of making it serializable, because it may contain a lot of progress updates and we should avoid sending them to executors.
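To illustrate the trade-off described here, a small sketch with hypothetical names (`Reporter`, `TransientBufferDemo`, `serializedSize`; none of them come from the PR): a `@transient` buffer is left out of the serialized form, so the payload does not grow with the number of buffered updates. One caveat worth keeping in mind: on a deserialized copy the transient field is `null`, not an empty queue, so methods that touch it must not be called there.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import scala.collection.mutable

// Hypothetical reporter; only `name` is part of the serialized form.
class Reporter(val name: String) extends Serializable {
  // Skipped by Java serialization; accesses lock on the queue itself.
  @transient private val buffer = new mutable.Queue[String]()

  def record(update: String): Unit = buffer.synchronized { buffer.enqueue(update) }

  def buffered: Int = buffer.synchronized { buffer.size }
}

object TransientBufferDemo {
  /** Returns the number of bytes Java serialization writes for the value. */
  def serializedSize(value: AnyRef): Int = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(value) finally out.close()
    bytes.size()
  }

  def main(args: Array[String]): Unit = {
    val empty = new Reporter("q")
    val full = new Reporter("q")
    (1 to 1000).foreach(i => full.record(s"update-$i"))
    // Both payloads have the same size because the buffer is never written.
    println(serializedSize(empty) == serializedSize(full))
  }
}
```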
[GitHub] spark pull request #16272: [SPARK-18850][SS]Make StreamExecution serializabl...
GitHub user zsxwing opened a pull request:

    https://github.com/apache/spark/pull/16272

    [SPARK-18850][SS]Make StreamExecution serializable

    ## What changes were proposed in this pull request?

    This PR makes StreamExecution serializable because it is too easy for it to get captured with normal usage. If StreamExecution gets captured in a closure but nothing calls its methods, it should not fail the Spark tasks. If its methods are called, this PR makes it throw an exception with a clearer message.

    ## How was this patch tested?

    `test("StreamExecution should be Serializable but cannot be used in executors")`

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zsxwing/spark SPARK-18850

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/16272.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #16272

commit 3d22be8137002ac95cc0926ba4c70c8a8136e3f0
Author: Shixiong Zhu
Date: 2016-12-13T22:08:50Z

    Make StreamExecution serializable
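The capture problem motivating this PR can be reproduced without Spark. This is a rough sketch, assuming only plain Java serialization: `Query` is a hypothetical non-serializable driver-side object, and `ClosureCaptureDemo`, `canSerialize`, `capturing`, and `independent` are illustrative names. A function literal that references such an object carries it along when serialized, so serialization fails even though the function itself is serializable.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical non-serializable driver-side object.
class Query(val batchSize: Int)

object ClosureCaptureDemo {
  /** Reports whether Java serialization can write the value. */
  def canSerialize(value: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try { out.writeObject(value); true }
    catch { case _: NotSerializableException => false }
    finally out.close()
  }

  /** A closure that references the query, so the query is captured with it. */
  def capturing(): Int => Int = {
    val query = new Query(10)
    (x: Int) => x + query.batchSize
  }

  /** A closure with no captured state. */
  def independent(): Int => Int = (x: Int) => x + 10

  def main(args: Array[String]): Unit = {
    println(canSerialize(capturing()))
    println(canSerialize(independent()))
  }
}
```

Making the captured object serializable, while keeping its heavy or driver-only state transient, turns the first case from a task failure into a harmless capture.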