[GitHub] spark pull request #16125: [SPARK-18694][SS]Add StreamingQuery.explain and e...
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/16125

---

If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.

---

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/16125#discussion_r90767030

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala ---
    @@ -24,32 +24,42 @@ import org.apache.spark.sql.execution.streaming.{Offset, OffsetSeq, StreamExecut
      * :: Experimental ::
      * Exception that stopped a [[StreamingQuery]]. Use `cause` get the actual exception
      * that caused the failure.
    - * @param query Query that caused the exception
      * @param message Message of this exception
      * @param cause Internal cause of this exception
    - * @param startOffset Starting offset (if known) of the range of data in which exception occurred
    - * @param endOffset Ending offset (if known) of the range of data in exception occurred
    + * @param startOffset Starting offset in json of the range of data in which exception occurred
    + * @param endOffset Ending offset in json of the range of data in exception occurred
      * @since 2.0.0
      */
     @Experimental
    -class StreamingQueryException private[sql](
    -    @transient val query: StreamingQuery,
    +class StreamingQueryException private(
    +    causeString: String,
         val message: String,
         val cause: Throwable,
    -    val startOffset: Option[OffsetSeq] = None,
    -    val endOffset: Option[OffsetSeq] = None)
    +    val startOffset: String,
    +    val endOffset: String)
       extends Exception(message, cause) {

    +  private[sql] def this(
    +      query: StreamingQuery,
    +      message: String,
    +      cause: Throwable,
    +      startOffset: String,
    +      endOffset: String) {
    +    this(
    +      // scalastyle:off
    +      s"""${classOf[StreamingQueryException].getName}: ${cause.getMessage} ${cause.getStackTrace.take(10).mkString("", "\n|\t", "\n")}
    --- End diff --

The class name here allows [capture_sql_exception](https://github.com/apache/spark/blob/4a3c09601ba69f7d49d1946bb6f20f5cfe453031/python/pyspark/sql/utils.py#L60) to convert a Java `StreamingQueryException` into a Python `StreamingQueryException`.
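To make the role of the class name concrete, here is a minimal sketch of prefix-based exception translation, modeled loosely on the idea behind `capture_sql_exception`. It is not PySpark's code: `FakeJavaError`, `run_query`, and `capture_streaming_exception` are hypothetical stand-ins (no py4j or JVM involved), and it assumes the Java exception's `toString()` begins with the fully qualified class name followed by `": "`, as `Throwable.toString()` does by default.

```python
import functools

# Assumption: the Java-side string starts with the fully qualified class name
# followed by ": ", which is what the Python side dispatches on.
JAVA_STREAMING_PREFIX = "org.apache.spark.sql.streaming.StreamingQueryException: "


class StreamingQueryException(Exception):
    """Python-side exception carrying the Java message and stack trace."""

    def __init__(self, desc, stack_trace):
        super().__init__(desc)
        self.desc = desc
        self.stack_trace = stack_trace


class FakeJavaError(Exception):
    """Hypothetical stand-in for py4j's Py4JJavaError; holds the Java toString()."""

    def __init__(self, java_string, stack_trace=""):
        super().__init__(java_string)
        self.java_string = java_string
        self.stack_trace = stack_trace


def capture_streaming_exception(f):
    """Re-raise Java StreamingQueryExceptions as the Python class of the same name."""

    @functools.wraps(f)
    def deco(*args, **kwargs):
        try:
            return f(*args, **kwargs)
        except FakeJavaError as e:
            s = e.java_string
            # Dispatch on the class-name prefix; this only works if the Java
            # string starts with the exact fully qualified class name.
            if s.startswith(JAVA_STREAMING_PREFIX):
                message = s.split(": ", 1)[1]
                raise StreamingQueryException(message, e.stack_trace) from None
            # Anything unrecognized propagates unchanged.
            raise

    return deco


@capture_streaming_exception
def run_query(java_string):
    """Hypothetical JVM call that always fails with the given Java string."""
    raise FakeJavaError(java_string, "at org.example.Query.run(Query.scala:1)")
```

If the Java string began with some other text (for example, the cause's class name), the prefix check would fail and the raw error would propagate, which is why the class name is placed first.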
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/16125#discussion_r90748623

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala ---
    @@ -103,10 +103,12 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
           TestAwaitTermination(ExpectException[SparkException]),
           TestAwaitTermination(ExpectException[SparkException], timeoutMs = 2000),
           TestAwaitTermination(ExpectException[SparkException], timeoutMs = 10),
    -      AssertOnQuery(
    -        q => q.exception.get.startOffset.get.offsets ===
    -          q.committedOffsets.toOffsetSeq(Seq(inputData), "{}").offsets,
    -        "incorrect start offset on exception")
    +      AssertOnQuery(q => {
    +        q.exception.get.startOffset ===
    +          q.committedOffsets.toOffsetSeq(Seq(inputData), "{}").toString &&
    --- End diff --

This line will conflict with my PR #16113; you may have to fix the conflicts after my PR is merged.
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/16125#discussion_r90748582

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala ---
    @@ -38,6 +38,13 @@ import org.apache.spark.annotation.Experimental
     class StateOperatorProgress private[sql](
         val numRowsTotal: Long,
         val numRowsUpdated: Long) {
    +
    +  /** The compact JSON representation of this progress. */
    +  def json: String = compact(render(jsonValue))
    --- End diff --

is this needed?
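For readers weighing the question, a rough Python analogy (not Spark's API) of what a compact JSON helper produces versus an indented one, using the two field names from the diff. The helper names `compact_json` and `pretty_json` are hypothetical, and the numeric values are made up for illustration; in the Scala code the equivalent rendering goes through json4s `compact(render(...))`.

```python
import json


def compact_json(value):
    """Compact JSON: no whitespace after separators (analogous to json4s compact)."""
    return json.dumps(value, separators=(",", ":"))


def pretty_json(value):
    """Indented, human-readable JSON (analogous to json4s pretty)."""
    return json.dumps(value, indent=2)


# Field names taken from the diff; the values are hypothetical.
state_operator_progress = {"numRowsTotal": 10, "numRowsUpdated": 3}

print(compact_json(state_operator_progress))  # prints {"numRowsTotal":10,"numRowsUpdated":3}
```

The compact form suits logs and machine consumption; the indented form is easier to read by eye. Whether a small nested progress object needs its own standalone helper is exactly the judgment call the reviewer raises.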
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/16125#discussion_r90748529

    --- Diff: python/pyspark/sql/streaming.py ---
    @@ -132,6 +133,61 @@ def stop(self):
             """
             self._jsq.stop()

    +    @since(2.1)
    +    def explain(self, extended=False):
    +        """Prints the (logical and physical) plans to the console for debugging purpose.
    +
    +        :param extended: boolean, default ``False``. If ``False``, prints only the physical plan.
    +
    +        >>> sq = sdf.writeStream.format('memory').queryName('query_explain').start()
    +        >>> sq.processAllAvailable() # Wait a bit to generate the runtime plans.
    +        >>> sq.explain()
    +        == Physical Plan ==
    +        ...
    +        >>> sq.explain(True)
    +        == Parsed Logical Plan ==
    +        ...
    +        == Analyzed Logical Plan ==
    +        ...
    +        == Optimized Logical Plan ==
    +        ...
    +        == Physical Plan ==
    +        ...
    +        >>> sq.stop()
    +        """
    +        # Cannot call `_jsq.explain(...)` because it will print in the JVM process.
    +        # We should print it in the Python process.
    +        print(self._jsq.explainInternal(extended))
    +
    +    @since(2.1)
    +    def exception(self):
    +        """
    +        :return: the StreamingQueryException if the query was terminated by an exception, or None.
    +
    +        >>> sq = sdf.writeStream.format('memory').queryName('query_exception').start()
    +        >>> str(sq.exception())
    +        'None'
    +        >>> sq.stop()
    +        >>> from pyspark.sql.functions import col, udf
    +        >>> bad_udf = udf(lambda x: x / 0)
    +        >>> sq = sdf.select(bad_udf(col("value"))).writeStream.format('memory')\
    +            .queryName('this_query').start()
    +        >>> try:
    +        ...     sq.processAllAvailable() # Process some data to fail the query
    +        ... except:
    +        ...     pass # Ignore the error as we want to test the "exception" method
    +        >>> sq.exception()
    +        StreamingQueryException()
    +        >>> sq.stop()
    --- End diff --

I am not sure such a big test belongs in the doctests. Please move it to sql/tests.py.
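Both methods in the diff follow the same pattern: the Python object only holds a handle (`_jsq`) to the JVM query, so results have to be brought back as strings and surfaced in the Python process. The sketch below illustrates that pattern under stated assumptions. `StreamingQuerySketch` and `FakeJsq` are hypothetical names, not PySpark classes; `explainInternal(extended)` mirrors the call in the diff, while the fake `exception()` returning `None` or a `(java_string, stack_trace)` pair is a simplification (the real JVM method returns a Scala `Option`).

```python
class StreamingQueryException(Exception):
    """Python-side exception carrying the Java message and stack trace."""

    def __init__(self, desc, stack_trace):
        super().__init__(desc)
        self.desc = desc
        self.stack_trace = stack_trace


class StreamingQuerySketch:
    """Hypothetical Python wrapper around a JVM-side query handle `jsq`."""

    def __init__(self, jsq):
        self._jsq = jsq

    def explain(self, extended=False):
        # Fetch the plan as a string and print it here, in the Python process;
        # a JVM-side explain() would write to the JVM's stdout instead.
        print(self._jsq.explainInternal(extended))

    def exception(self):
        # Assumption: the handle returns None, or a (java_string, stack_trace)
        # pair. The real JVM method returns a Scala Option instead.
        failure = self._jsq.exception()
        if failure is None:
            return None
        java_string, stack_trace = failure
        # Drop the leading "fully.qualified.ClassName: " type information.
        return StreamingQueryException(java_string.split(": ", 1)[1], stack_trace)


class FakeJsq:
    """Hypothetical stand-in for the py4j handle, for local experimentation."""

    def __init__(self, failure=None):
        self._failure = failure

    def explainInternal(self, extended):
        physical = "== Physical Plan ==\n..."
        if not extended:
            return physical
        return "\n".join([
            "== Parsed Logical Plan ==\n...",
            "== Analyzed Logical Plan ==\n...",
            "== Optimized Logical Plan ==\n...",
            physical,
        ])

    def exception(self):
        return self._failure
```

A fake handle like this also hints at why the reviewer prefers `sql/tests.py`: the failure path can be exercised in an ordinary unit test without packing a long, stateful scenario into a docstring.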
GitHub user zsxwing opened a pull request:

https://github.com/apache/spark/pull/16125

[SPARK-18694][SS]Add StreamingQuery.explain and exception to Python and fix StreamingQueryException

## What changes were proposed in this pull request?

- Add StreamingQuery.explain and exception to Python.
- Fix StreamingQueryException to not expose `OffsetSeq`.

## How was this patch tested?

Jenkins

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zsxwing/spark py-streaming-explain

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/16125.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #16125

    commit 757cf2f31b267a665e711a3196de59484acb5268
    Author: Shixiong Zhu
    Date:   2016-12-03T00:21:32Z

        Add StreamingQuery.explain and exception to Python and fix StreamingQueryException