Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/15307#discussion_r82716902 --- Diff: python/pyspark/sql/streaming.py --- @@ -189,6 +189,282 @@ def resetTerminated(self): self._jsqm.resetTerminated() +class StreamingQueryStatus(object): + """A class used to report information about the progress of a StreamingQuery. + + .. note:: Experimental + + .. versionadded:: 2.1 + """ + + def __init__(self, jsqs): + self._jsqs = jsqs + + def __str__(self): + """ + Pretty string of this query status. + + >>> print(sqs) + StreamingQueryStatus: + Query name: query + Query id: 1 + Status timestamp: 123 + Input rate: 1.0 rows/sec + Processing rate 2.0 rows/sec + Latency: 345.0 ms + Trigger status: + key: value + Source statuses [1 source]: + Source 1: MySource1 + Available offset: #0 + Input rate: 4.0 rows/sec + Processing rate: 5.0 rows/sec + Trigger status: + key: value + Sink status: MySink + Committed offsets: [#1, -] + """ + return self._jsqs.toString() + + @property + @ignore_unicode_prefix + @since(2.1) + def name(self): + """ + Name of the query. This name is unique across all active queries. + + >>> sqs.name + u'query' + """ + return self._jsqs.name() + + @property + @since(2.1) + def id(self): + """ + Id of the query. This id is unique across all queries that have been started in + the current process. + + >>> int(sqs.id) + 1 + """ + return self._jsqs.id() + + @property + @since(2.1) + def timestamp(self): + """ + Timestamp (ms) of when this query was generated. + + >>> int(sqs.timestamp) + 123 + """ + return self._jsqs.timestamp() + + @property + @since(2.1) + def inputRate(self): + """ + Current rate (rows/sec) at which data is being generated by all the sources. + + >>> sqs.inputRate + 1.0 + """ + return self._jsqs.inputRate() + + @property + @since(2.1) + def processingRate(self): + """ + Current rate (rows/sec) at which the query is processing data from all the sources. + + >>> sqs.processingRate + 2.0 + """ + return self._jsqs.processingRate() + + @property + @since(2.1) + def latency(self): + """ + Current average latency between the data being available in source and the sink + writing the corresponding output. + + >>> sqs.latency + 345.0 + """ + if (self._jsqs.latency().nonEmpty()): + return self._jsqs.latency().get() + else: + return None + + @property + @since(2.1) + def sourceStatuses(self): + """ + Current statuses of the sources. + + >>> len(sqs.sourceStatuses) + 1 + >>> sqs.sourceStatuses[0].description + u'MySource1' + """ + return [SourceStatus(ss) for ss in self._jsqs.sourceStatuses()] + + @property + @since(2.1) + def sinkStatus(self): + """ + Current status of the sink. + + >>> sqs.sinkStatus.description + u'MySink' + """ + return SinkStatus(self._jsqs.sinkStatus()) + + @property + @since(2.1) + def triggerStatus(self): + """ + Low-level detailed status of the last completed/currently active trigger. + + >>> sqs.triggerStatus + {u'key': u'value'} + """ + return self._jsqs.triggerStatus() + + +class SourceStatus(object): + """ + Status and metrics of a streaming Source. + + .. note:: Experimental + + .. versionadded:: 2.1 + """ + + def __init__(self, jss): + self._jss = jss + + def __str__(self): + """ + Pretty string of this source status. + + >>> print(sqs.sourceStatuses[0]) + SourceStatus: MySource1 + Available offset: #0 + Input rate: 4.0 rows/sec + Processing rate: 5.0 rows/sec + Trigger status: + key: value + """ + return self._jss.toString() + + @property + @ignore_unicode_prefix + @since(2.1) + def description(self): + """ + Description of the source corresponding to this status. + + >>> sqs.sourceStatuses[0].description + u'MySource1' + """ + return self._jss.description() + + @property + @ignore_unicode_prefix + @since(2.1) + def offsetDesc(self): + """ + Description of the current offset if known. + + >>> sqs.sourceStatuses[0].offsetDesc + u'#0' + """ + return self._jss.offsetDesc() + + @property + @since(2.1) + def inputRate(self): + """ + Current rate (rows/sec) at which data is being generated by the source. + + >>> sqs.sourceStatuses[0].inputRate + 4.0 + """ + return self._jss.inputRate() + + @property + @since(2.1) + def processingRate(self): + """ + Current rate (rows/sec) at which the query is processing data from the source. + + >>> sqs.sourceStatuses[0].processingRate + 5.0 + """ + return self._jss.processingRate() + + @property + @since(2.1) + def triggerStatus(self): + """ + Low-level detailed status of the last completed/currently active trigger. + + >>> sqs.sourceStatuses[0].triggerStatus + {u'key': u'value'} + """ + return self._jss.triggerStatus() + + +class SinkStatus(object): --- End diff -- yeah. no output rate.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org