Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/15307#discussion_r82716902
  
    --- Diff: python/pyspark/sql/streaming.py ---
    @@ -189,6 +189,282 @@ def resetTerminated(self):
             self._jsqm.resetTerminated()
     
     
    +class StreamingQueryStatus(object):
    +    """A class used to report information about the progress of a StreamingQuery.
    +
    +    .. note:: Experimental
    +
    +    .. versionadded:: 2.1
    +    """
    +
    +    def __init__(self, jsqs):
    +        self._jsqs = jsqs
    +
    +    def __str__(self):
    +        """
    +        Pretty string of this query status.
    +
    +        >>> print(sqs)
    +        StreamingQueryStatus:
    +            Query name: query
    +            Query id: 1
    +            Status timestamp: 123
    +            Input rate: 1.0 rows/sec
    +            Processing rate 2.0 rows/sec
    +            Latency: 345.0 ms
    +            Trigger status:
    +                key: value
    +            Source statuses [1 source]:
    +                Source 1:    MySource1
    +                    Available offset: #0
    +                    Input rate: 4.0 rows/sec
    +                    Processing rate: 5.0 rows/sec
    +                    Trigger status:
    +                        key: value
    +            Sink status:     MySink
    +                Committed offsets: [#1, -]
    +        """
    +        return self._jsqs.toString()
    +
    +    @property
    +    @ignore_unicode_prefix
    +    @since(2.1)
    +    def name(self):
    +        """
    +        Name of the query. This name is unique across all active queries.
    +
    +        >>> sqs.name
    +        u'query'
    +        """
    +        return self._jsqs.name()
    +
    +    @property
    +    @since(2.1)
    +    def id(self):
    +        """
    +        Id of the query. This id is unique across all queries that have been started in
    +        the current process.
    +
    +        >>> int(sqs.id)
    +        1
    +        """
    +        return self._jsqs.id()
    +
    +    @property
    +    @since(2.1)
    +    def timestamp(self):
    +        """
    +        Timestamp (ms) of when this query was generated.
    +
    +        >>> int(sqs.timestamp)
    +        123
    +        """
    +        return self._jsqs.timestamp()
    +
    +    @property
    +    @since(2.1)
    +    def inputRate(self):
    +        """
    +        Current rate (rows/sec) at which data is being generated by all the sources.
    +
    +        >>> sqs.inputRate
    +        1.0
    +        """
    +        return self._jsqs.inputRate()
    +
    +    @property
    +    @since(2.1)
    +    def processingRate(self):
    +        """
    +        Current rate (rows/sec) at which the query is processing data from all the sources.
    +
    +        >>> sqs.processingRate
    +        2.0
    +        """
    +        return self._jsqs.processingRate()
    +
    +    @property
    +    @since(2.1)
    +    def latency(self):
    +        """
    +        Current average latency between the data being available in source and the sink
    +        writing the corresponding output.
    +
    +        >>> sqs.latency
    +        345.0
    +        """
    +        if (self._jsqs.latency().nonEmpty()):
    +            return self._jsqs.latency().get()
    +        else:
    +            return None
    +
    +    @property
    +    @since(2.1)
    +    def sourceStatuses(self):
    +        """
    +        Current statuses of the sources.
    +
    +        >>> len(sqs.sourceStatuses)
    +        1
    +        >>> sqs.sourceStatuses[0].description
    +        u'MySource1'
    +        """
    +        return [SourceStatus(ss) for ss in self._jsqs.sourceStatuses()]
    +
    +    @property
    +    @since(2.1)
    +    def sinkStatus(self):
    +        """
    +        Current status of the sink.
    +
    +        >>> sqs.sinkStatus.description
    +        u'MySink'
    +        """
    +        return SinkStatus(self._jsqs.sinkStatus())
    +
    +    @property
    +    @since(2.1)
    +    def triggerStatus(self):
    +        """
    +        Low-level detailed status of the last completed/currently active trigger.
    +
    +        >>> sqs.triggerStatus
    +        {u'key': u'value'}
    +        """
    +        return self._jsqs.triggerStatus()
    +
    +
    +class SourceStatus(object):
    +    """
    +    Status and metrics of a streaming Source.
    +
    +    .. note:: Experimental
    +
    +    .. versionadded:: 2.1
    +    """
    +
    +    def __init__(self, jss):
    +        self._jss = jss
    +
    +    def __str__(self):
    +        """
    +        Pretty string of this source status.
    +
    +        >>> print(sqs.sourceStatuses[0])
    +        SourceStatus:    MySource1
    +            Available offset: #0
    +            Input rate: 4.0 rows/sec
    +            Processing rate: 5.0 rows/sec
    +            Trigger status:
    +                key: value
    +        """
    +        return self._jss.toString()
    +
    +    @property
    +    @ignore_unicode_prefix
    +    @since(2.1)
    +    def description(self):
    +        """
    +        Description of the source corresponding to this status.
    +
    +        >>> sqs.sourceStatuses[0].description
    +        u'MySource1'
    +        """
    +        return self._jss.description()
    +
    +    @property
    +    @ignore_unicode_prefix
    +    @since(2.1)
    +    def offsetDesc(self):
    +        """
    +        Description of the current offset if known.
    +
    +        >>> sqs.sourceStatuses[0].offsetDesc
    +        u'#0'
    +        """
    +        return self._jss.offsetDesc()
    +
    +    @property
    +    @since(2.1)
    +    def inputRate(self):
    +        """
    +        Current rate (rows/sec) at which data is being generated by the source.
    +
    +        >>> sqs.sourceStatuses[0].inputRate
    +        4.0
    +        """
    +        return self._jss.inputRate()
    +
    +    @property
    +    @since(2.1)
    +    def processingRate(self):
    +        """
    +        Current rate (rows/sec) at which the query is processing data from the source.
    +
    +        >>> sqs.sourceStatuses[0].processingRate
    +        5.0
    +        """
    +        return self._jss.processingRate()
    +
    +    @property
    +    @since(2.1)
    +    def triggerStatus(self):
    +        """
    +        Low-level detailed status of the last completed/currently active trigger.
    +
    +        >>> sqs.sourceStatuses[0].triggerStatus
    +        {u'key': u'value'}
    +       """
    +        return self._jss.triggerStatus()
    +
    +
    +class SinkStatus(object):
    --- End diff --
    
    yeah. no output rate. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to