[GitHub] spark pull request #16272: [SPARK-18850][SS]Make StreamExecution serializabl...

2016-12-15 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/16272#discussion_r92709390
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
 ---
@@ -137,12 +174,12 @@ object StreamingQueryStatusAndProgressSuite {
 name = "myName",
 timestamp = "2016-12-05T20:54:20.827Z",
 batchId = 2L,
-durationMs = Map("total" -> 0L).mapValues(long2Long).asJava,
-eventTime = Map(
+durationMs = new java.util.HashMap(Map("total" -> 
0L).mapValues(long2Long).asJava),
--- End diff --

Changed them to use the same approach as `ProgressReporter` to create Java 
Map.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16272: [SPARK-18850][SS]Make StreamExecution serializabl...

2016-12-15 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/16272#discussion_r92702425
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
 ---
@@ -137,12 +146,13 @@ object StreamingQueryStatusAndProgressSuite {
 name = "myName",
 timestamp = "2016-12-05T20:54:20.827Z",
 batchId = 2L,
-durationMs = Map("total" -> 0L).mapValues(long2Long).asJava,
-eventTime = Map(
-  "max" -> "2016-12-05T20:54:20.827Z",
-  "min" -> "2016-12-05T20:54:20.827Z",
-  "avg" -> "2016-12-05T20:54:20.827Z",
-  "watermark" -> "2016-12-05T20:54:20.827Z").asJava,
+durationMs = Collections.singletonMap("total", 0L),
--- End diff --

Wait ... does this mean, that the maps we are generating in the 
StreamingQuery is NOT serializable?? Because we are probably doing the same 
conversion in ProgressReporter


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16272: [SPARK-18850][SS]Make StreamExecution serializabl...

2016-12-15 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/16272#discussion_r92690628
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
 ---
@@ -137,12 +146,13 @@ object StreamingQueryStatusAndProgressSuite {
 name = "myName",
 timestamp = "2016-12-05T20:54:20.827Z",
 batchId = 2L,
-durationMs = Map("total" -> 0L).mapValues(long2Long).asJava,
-eventTime = Map(
-  "max" -> "2016-12-05T20:54:20.827Z",
-  "min" -> "2016-12-05T20:54:20.827Z",
-  "avg" -> "2016-12-05T20:54:20.827Z",
-  "watermark" -> "2016-12-05T20:54:20.827Z").asJava,
+durationMs = Collections.singletonMap("total", 0L),
--- End diff --

I changed these two maps to pure Java maps, as the map created by `asJava` 
is not serializable.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16272: [SPARK-18850][SS]Make StreamExecution serializabl...

2016-12-15 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/16272#discussion_r92687106
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
 ---
@@ -439,6 +440,37 @@ class StreamingQuerySuite extends StreamTest with 
BeforeAndAfter with Logging {
 }
   }
 
+  test("progress classes should be Serializable") {
--- End diff --

This should be in the StreamingStatusAndPRogressSuite


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16272: [SPARK-18850][SS]Make StreamExecution serializabl...

2016-12-15 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/16272#discussion_r92687025
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala ---
@@ -300,6 +302,48 @@ class StreamSuite extends StreamTest {
   q.stop()
 }
   }
+
+  test("StreamingQuery should be Serializable but cannot be used in 
executors") {
--- End diff --

Why is this in StreamSuite. This is better to be in StreamingQuerySuite as 
its related to the StreamingQuery.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16272: [SPARK-18850][SS]Make StreamExecution serializabl...

2016-12-14 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/16272#discussion_r92530950
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala ---
@@ -83,7 +83,7 @@ class StreamingQueryProgress private[sql](
   val currentWatermark: Long,
   val stateOperators: Array[StateOperatorProgress],
   val sources: Array[SourceProgress],
-  val sink: SinkProgress) {
+  val sink: SinkProgress) extends Serializable {
--- End diff --

There should be new tests for verifying serializability


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16272: [SPARK-18850][SS]Make StreamExecution serializabl...

2016-12-14 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/16272#discussion_r92530788
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryWrapper.scala
 ---
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.streaming
+
+import java.util.UUID
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.streaming.{StreamingQuery, 
StreamingQueryException, StreamingQueryProgress, StreamingQueryStatus}
+
+/**
+ * Wrap non-serializable StreamExecution to make the query serializable as 
it's easy to for it to
+ * get captured with normal usage. It's safe to capture the query but not 
use it in executors.
+ * However, if the user tries to call its methods, it will throw 
`IllegalStateException`.
+ */
+class StreamingQueryWrapper(
+@transient val streamingQuery: StreamExecution) extends StreamingQuery 
with Serializable {
--- End diff --

streamingQuery -> @transient private val _streamExecution
def streamExecution: StreamExecution = {
if (_streamExecution == null) {
   throw new IllegalStateException("StreamingQuery cannot be used in 
executors")
}
_streamExecution
}



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16272: [SPARK-18850][SS]Make StreamExecution serializabl...

2016-12-14 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/16272#discussion_r92530604
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala
 ---
@@ -24,7 +24,7 @@ import scala.collection.{immutable, GenTraversableOnce}
  */
 class StreamProgress(
 val baseMap: immutable.Map[Source, Offset] = new 
immutable.HashMap[Source, Offset])
-  extends scala.collection.immutable.Map[Source, Offset] {
+  extends scala.collection.immutable.Map[Source, Offset] with Serializable 
{
--- End diff --

I dont think this is needed any more. This is purely internal


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16272: [SPARK-18850][SS]Make StreamExecution serializabl...

2016-12-14 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/16272#discussion_r92481457
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala ---
@@ -38,7 +38,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils
 @Experimental
 class StateOperatorProgress private[sql](
 val numRowsTotal: Long,
-val numRowsUpdated: Long) {
+val numRowsUpdated: Long) extends Serializable {
--- End diff --

+1


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16272: [SPARK-18850][SS]Make StreamExecution serializabl...

2016-12-13 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/16272#discussion_r92279982
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala ---
@@ -38,7 +38,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils
 @Experimental
 class StateOperatorProgress private[sql](
 val numRowsTotal: Long,
-val numRowsUpdated: Long) {
+val numRowsUpdated: Long) extends Serializable {
--- End diff --

Add `Serializable` to these progress classes so that the user can collect 
them and use them in Spark jobs, e.g., ` 
sc.parallelize(query.recentProgress)...` 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16272: [SPARK-18850][SS]Make StreamExecution serializabl...

2016-12-13 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/16272#discussion_r92279247
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
 ---
@@ -71,7 +71,7 @@ trait ProgressReporter extends Logging {
   private var metricWarningLogged: Boolean = false
 
   /** Holds the most recent query progress updates.  Accesses must lock on 
the queue itself. */
-  private val progressBuffer = new mutable.Queue[StreamingQueryProgress]()
+  @transient private val progressBuffer = new 
mutable.Queue[StreamingQueryProgress]()
--- End diff --

mark `progressBuffer` `@transient` instead of changing it to serializable 
because it may contain a lot of progresses and we should try to avoid sending 
them to executors.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16272: [SPARK-18850][SS]Make StreamExecution serializabl...

2016-12-13 Thread zsxwing
GitHub user zsxwing opened a pull request:

https://github.com/apache/spark/pull/16272

[SPARK-18850][SS]Make StreamExecution serializable

## What changes were proposed in this pull request?

This PR makes StreamExecution serializable because it is too easy for it to 
get captured with normal usage. If StreamExecution gets captured in a closure 
but no place calls its methods, it should not fail the Spark tasks. If its 
methods are called, then this PR will throw a better message.

## How was this patch tested?

`test("StreamExecution should be Serializable but cannot be used in 
executors")`

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zsxwing/spark SPARK-18850

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/16272.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #16272


commit 3d22be8137002ac95cc0926ba4c70c8a8136e3f0
Author: Shixiong Zhu 
Date:   2016-12-13T22:08:50Z

Make StreamExecution serializable




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org