[GitHub] spark pull request #16125: [SPARK-18694][SS]Add StreamingQuery.explain and e...

2016-12-05 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/16125


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16125: [SPARK-18694][SS]Add StreamingQuery.explain and e...

2016-12-03 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/16125#discussion_r90767030
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala ---
@@ -24,32 +24,42 @@ import org.apache.spark.sql.execution.streaming.{Offset, OffsetSeq, StreamExecut
  * :: Experimental ::
  * Exception that stopped a [[StreamingQuery]]. Use `cause` get the actual exception
  * that caused the failure.
- * @param query   Query that caused the exception
  * @param message Message of this exception
  * @param cause   Internal cause of this exception
- * @param startOffset Starting offset (if known) of the range of data in which exception occurred
- * @param endOffset   Ending offset (if known) of the range of data in exception occurred
+ * @param startOffset Starting offset in json of the range of data in which exception occurred
+ * @param endOffset   Ending offset in json of the range of data in exception occurred
  * @since 2.0.0
  */
 @Experimental
-class StreamingQueryException private[sql](
-@transient val query: StreamingQuery,
+class StreamingQueryException private(
+causeString: String,
 val message: String,
 val cause: Throwable,
-val startOffset: Option[OffsetSeq] = None,
-val endOffset: Option[OffsetSeq] = None)
+val startOffset: String,
+val endOffset: String)
   extends Exception(message, cause) {
 
+  private[sql] def this(
+  query: StreamingQuery,
+  message: String,
+  cause: Throwable,
+  startOffset: String,
+  endOffset: String) {
+this(
  // scalastyle:off
  s"""${classOf[StreamingQueryException].getName}: ${cause.getMessage} ${cause.getStackTrace.take(10).mkString("", "\n|\t", "\n")}
--- End diff --

Putting the class name at the start of this string allows [capture_sql_exception](https://github.com/apache/spark/blob/4a3c09601ba69f7d49d1946bb6f20f5cfe453031/python/pyspark/sql/utils.py#L60) to convert a Java StreamingQueryException into a Python StreamingQueryException.
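The following minimal Python sketch illustrates the kind of conversion this comment describes: exceptions are identified by prefix matching on a string. It assumes, as the diff suggests, that the JVM-side string starts with the fully qualified class name followed by `: `. The exception classes and the `convert_java_error` helper are hypothetical stand-ins, not pyspark's actual code.

```python
class CapturedException(Exception):
    """Hypothetical base class for errors converted from the JVM."""

    def __init__(self, desc):
        super().__init__(desc)
        self.desc = desc


class AnalysisException(CapturedException):
    pass


class StreamingQueryException(CapturedException):
    pass


# Fully qualified JVM class-name prefixes mapped to Python exception types.
_PREFIX_TO_CLASS = {
    "org.apache.spark.sql.AnalysisException: ": AnalysisException,
    "org.apache.spark.sql.streaming.StreamingQueryException: ": StreamingQueryException,
}


def convert_java_error(java_str):
    """Return a Python exception for a JVM error string, or None if unrecognized.

    Assumes `java_str` looks like "<fully.qualified.ClassName>: <message>".
    """
    for prefix, cls in _PREFIX_TO_CLASS.items():
        if java_str.startswith(prefix):
            # Drop the class-name prefix and keep only the message text.
            return cls(java_str[len(prefix):])
    return None
```

In a scheme like this, a string without the class-name prefix has nothing to match. The error would then fall through unconverted, which is why the prefix in `causeString` matters.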





[GitHub] spark pull request #16125: [SPARK-18694][SS]Add StreamingQuery.explain and e...

2016-12-02 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/16125#discussion_r90748623
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala ---
@@ -103,10 +103,12 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
   TestAwaitTermination(ExpectException[SparkException]),
   TestAwaitTermination(ExpectException[SparkException], timeoutMs = 2000),
   TestAwaitTermination(ExpectException[SparkException], timeoutMs = 10),
-  AssertOnQuery(
-q => q.exception.get.startOffset.get.offsets ===
-  q.committedOffsets.toOffsetSeq(Seq(inputData), "{}").offsets,
-"incorrect start offset on exception")
+  AssertOnQuery(q => {
+q.exception.get.startOffset ===
+  q.committedOffsets.toOffsetSeq(Seq(inputData), "{}").toString &&
--- End diff --

This line will conflict with my PR #16113; you may have to fix it after my PR is merged.





[GitHub] spark pull request #16125: [SPARK-18694][SS]Add StreamingQuery.explain and e...

2016-12-02 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/16125#discussion_r90748582
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala ---
@@ -38,6 +38,13 @@ import org.apache.spark.annotation.Experimental
 class StateOperatorProgress private[sql](
 val numRowsTotal: Long,
 val numRowsUpdated: Long) {
+
+  /** The compact JSON representation of this progress. */
+  def json: String = compact(render(jsonValue))
--- End diff --

Is this needed?





[GitHub] spark pull request #16125: [SPARK-18694][SS]Add StreamingQuery.explain and e...

2016-12-02 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/16125#discussion_r90748529
  
--- Diff: python/pyspark/sql/streaming.py ---
@@ -132,6 +133,61 @@ def stop(self):
 """
 self._jsq.stop()
 
+@since(2.1)
+def explain(self, extended=False):
+"""Prints the (logical and physical) plans to the console for debugging purpose.
+
+:param extended: boolean, default ``False``. If ``False``, prints only the physical plan.
+
+>>> sq = sdf.writeStream.format('memory').queryName('query_explain').start()
+>>> sq.processAllAvailable() # Wait a bit to generate the runtime plans.
+>>> sq.explain()
+== Physical Plan ==
+...
+>>> sq.explain(True)
+== Parsed Logical Plan ==
+...
+== Analyzed Logical Plan ==
+...
+== Optimized Logical Plan ==
+...
+== Physical Plan ==
+...
+>>> sq.stop()
+"""
+# Cannot call `_jsq.explain(...)` because it will print in the JVM process.
+# We should print it in the Python process.
+print(self._jsq.explainInternal(extended))
+
+@since(2.1)
+def exception(self):
+"""
+:return: the StreamingQueryException if the query was terminated by an exception, or None.
+
+>>> sq = sdf.writeStream.format('memory').queryName('query_exception').start()
+>>> str(sq.exception())
+'None'
+>>> sq.stop()
+>>> from pyspark.sql.functions import col, udf
+>>> bad_udf = udf(lambda x: x / 0)
+>>> sq = sdf.select(bad_udf(col("value"))).writeStream.format('memory')\
+  .queryName('this_query').start()
+>>> try:
+... sq.processAllAvailable() # Process some data to fail the query
+... except:
+... pass # Ignore the error as we want to test the "exception" method
+>>> sq.exception()
+StreamingQueryException()
+>>> sq.stop()
--- End diff --

I am not sure such a big test belongs in the doctests. Please add it to sql/tests.py instead.
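The following structural sketch shows roughly what moving the doctest into a unit test might look like. `FakeStreamingQuery` is a hypothetical stand-in that lets the example run without Spark. Its method names only mirror those used in the doctest. A real test in sql/tests.py would start an actual streaming query with a failing UDF instead.

```python
import unittest


class FakeStreamingQuery:
    """Hypothetical stand-in for a streaming query; no Spark required."""

    def __init__(self, fail=False):
        self._fail = fail
        self._exception = None

    def processAllAvailable(self):
        if self._fail:
            # Record the failure so exception() can report it, then raise it.
            self._exception = RuntimeError("ZeroDivisionError: division by zero")
            raise self._exception

    def exception(self):
        return self._exception

    def stop(self):
        pass


class StreamingQueryExceptionTests(unittest.TestCase):
    def test_no_exception_when_healthy(self):
        sq = FakeStreamingQuery()
        try:
            sq.processAllAvailable()
            self.assertIsNone(sq.exception())
        finally:
            sq.stop()  # always stop the query, even if an assertion fails

    def test_exception_after_failure(self):
        sq = FakeStreamingQuery(fail=True)
        try:
            with self.assertRaises(RuntimeError):
                sq.processAllAvailable()
        finally:
            sq.stop()
        # After the failure, exception() returns the recorded error.
        self.assertIn("ZeroDivisionError", str(sq.exception()))
```

The `try`/`finally` blocks stop each query even when an assertion fails. This is harder to express cleanly in a doctest.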





[GitHub] spark pull request #16125: [SPARK-18694][SS]Add StreamingQuery.explain and e...

2016-12-02 Thread zsxwing
GitHub user zsxwing opened a pull request:

https://github.com/apache/spark/pull/16125

[SPARK-18694][SS]Add StreamingQuery.explain and exception to Python and fix StreamingQueryException

## What changes were proposed in this pull request?

- Add StreamingQuery.explain and exception to Python.
- Fix StreamingQueryException to not expose `OffsetSeq`. 
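The second change replaces internal offset objects in a public exception with opaque JSON strings. The following minimal Python sketch illustrates that design, assuming JSON-encoded offsets as the updated Scaladoc describes. The `StreamingQueryError` class, the `offsets_of` helper, and any sample offset values are hypothetical.

```python
import json


class StreamingQueryError(Exception):
    """Hypothetical stand-in: carries offsets as JSON strings, not internal objects."""

    def __init__(self, message, cause, start_offset, end_offset):
        super().__init__(message)
        self.message = message
        self.cause = cause
        # Opaque JSON strings, so no internal offset type leaks into the public API.
        self.start_offset = start_offset
        self.end_offset = end_offset


def offsets_of(err):
    """Decode the JSON offset strings into plain Python structures."""
    return json.loads(err.start_offset), json.loads(err.end_offset)
```

Keeping the offsets as strings means the public class no longer depends on an internal type such as `OffsetSeq`. Callers that need structure can still decode them.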

## How was this patch tested?

Jenkins

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zsxwing/spark py-streaming-explain

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/16125.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #16125


commit 757cf2f31b267a665e711a3196de59484acb5268
Author: Shixiong Zhu 
Date:   2016-12-03T00:21:32Z

Add StreamingQuery.explain and exception to Python and fix StreamingQueryException



