I'll sketch out a PR so we can talk code and move the discussion there.



On 18.03.21 at 14:55, Wenchen Fan wrote:
I think a listener-based API makes sense for streaming (since you need to keep watching the result), but may not be very reasonable for batch queries (you only get the result once). The idea of Observation looks good, but we should define what happens if `observation.get` is called before the batch query finishes.

Can we have a PR for it so that we can have more detailed discussions?

On Tue, Mar 16, 2021 at 3:59 PM Jungtaek Lim <kabhwan.opensou...@gmail.com> wrote:

    Please follow up on the discussion in the original PR.
    https://github.com/apache/spark/pull/26127

    Dataset.observe() relies on the query listener for batch queries,
    which is an "unstable" API - that's why we decided not to add an
    example for batch queries. For streaming queries, it relies on the
    streaming query listener, which is a stable API. That said,
    personally I'd first consider whether the new API fits streaming
    queries, and then see whether it fits batch queries as well.

    If we find Dataset.observe() to be useful enough for batch
    queries, we'd probably be better off discussing how to provide
    these metrics through a stable API (so that Scala users could
    leverage it), and look at PySpark later. That looks to be the
    first thing to do if we reach consensus on the usefulness of
    observable metrics for batch queries.


    On Tue, Mar 16, 2021 at 4:17 PM Enrico Minack
    <m...@enrico.minack.dev> wrote:

        I am focusing on batch mode, not streaming mode. I would argue
        that Dataset.observe() is equally useful for large batch
        processing. If you need some motivating use cases, please let
        me know.

        Anyhow, the documentation of observe states that it works for
        both batch and streaming. And in batch mode, the helper class
        Observation helps reduce code and avoid repetition.

        The PySpark implementation of the Observation class can
        implement *all* methods by merely calling into their JVM
        counterpart, where the locking, listening, registration and
        un-registration happens. I think this qualifies as: "all the
        logic happens in the JVM". All that is transferred to Python
        is a row's data. No listeners needed.

        Enrico



        On 16.03.21 at 00:13, Jungtaek Lim wrote:
        If I remember correctly, the major audience of the "observe"
        API is Structured Streaming in micro-batch mode. Judging from
        the example, the abstraction in 2 doesn't work with
        Structured Streaming. It could still be done with a callback,
        but the question remains how much complexity the abstraction
        actually hides.

        I see you're focusing on PySpark - I'm not sure whether the
        query listener / streaming query listener were intentionally
        not exposed to PySpark, but if there's a valid reason for
        that, I'm not sure we'd want to expose them to PySpark
        in any way. 2 doesn't make sense to me with PySpark - how do
        you ensure all the logic happens in the JVM while still being
        able to use these values from PySpark?
        (I see there's support for listeners with DStream in PySpark,
        so there might be reasons not to add the same for SQL/SS.
        Probably a lesson learned?)


        On Mon, Mar 15, 2021 at 6:59 PM Enrico Minack
        <m...@enrico.minack.dev> wrote:

            Hi Spark-Devs,

            the observable metrics that have been added to the
            Dataset API in 3.0.0 are a great improvement over the
            Accumulator APIs that seem to have much weaker
            guarantees. I have two questions regarding follow-up
            contributions:

            *1. Add observe to Python DataFrame*

            As far as I can see on the master branch, there is no
            equivalent in the Python API. Is this planned, or is it
            missing because there are no listeners in PySpark, which
            would render this method useless in Python? I would be
            happy to contribute here.

            *2. Add Observation class to simplify result access*

            The Dataset.observe method requires users to register
            listeners
            <https://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/Dataset.html#observe(name:String,expr:org.apache.spark.sql.Column,exprs:org.apache.spark.sql.Column*):org.apache.spark.sql.Dataset[T]>
            with QueryExecutionListener or StreamingQueryListener to
            obtain the result. I think for simple setups, this could
            be hidden behind a common helper class, here called
            Observation, which reduces the usage of observe to three
            lines of code:

            // our Dataset (this does not count as a line of code)
            val df = Seq((1, "a"), (2, "b"), (4, "c"), (8, "d")).toDF("id", "value")

            // define the observation we want to make
            val observation = Observation("stats", count($"id"), sum($"id"))

            // add the observation to the Dataset and execute an action on it
            val cnt = df.observe(observation).count()

            // retrieve the result
            assert(observation.get === Row(4, 15))

            The Observation class can handle the registration and
            de-registration of the listener, as well as properly
            accessing the result across thread boundaries.
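
A minimal sketch of how such a helper might be built on the existing batch listener API, purely for illustration. The class name ObservationSketch and its method on are hypothetical and are not the proposed Observation API; the sketch assumes Spark 3.0+ on the classpath and uses only QueryExecutionListener, QueryExecution.observedMetrics, SparkSession.listenerManager and Dataset.observe. In this variant, get simply blocks until the metrics arrive, which is only one possible behaviour.

```scala
import java.util.concurrent.CountDownLatch

import org.apache.spark.sql.{Column, Dataset, Row, SparkSession}
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.QueryExecutionListener

// Hypothetical helper, for illustration only (not the API proposed in this thread).
class ObservationSketch(name: String, expr: Column, exprs: Column*) {
  // Released once the metrics row for `name` has been received.
  private val done = new CountDownLatch(1)
  @volatile private var result: Option[Row] = None
  @volatile private var session: Option[SparkSession] = None

  private val listener = new QueryExecutionListener {
    override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit =
      // The listener sees every query of the session, so filter by observation name.
      qe.observedMetrics.get(name).foreach { row =>
        result = Some(row)
        // De-register so the listener does not outlive the observation.
        session.foreach(_.listenerManager.unregister(this))
        done.countDown()
      }

    // Assumption: failures are ignored here, so `get` keeps blocking if the
    // observed query fails. A real implementation would have to handle this.
    override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = ()
  }

  // Registers the listener and attaches the observation to the Dataset.
  def on[T](ds: Dataset[T]): Dataset[T] = {
    session = Some(ds.sparkSession)
    ds.sparkSession.listenerManager.register(listener)
    ds.observe(name, expr, exprs: _*)
  }

  // Blocks the calling thread until the listener thread has delivered the row.
  def get: Row = {
    done.await()
    result.get
  }
}
```

With this sketch, usage would read: val obs = new ObservationSketch("stats", count($"id"), sum($"id")), then obs.on(df).count(), then obs.get. The latch is what carries the result from the listener thread to the caller's thread.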

            With *2.*, the observe method can be added to PySpark
            without introducing listeners there at all. All the logic
            is happening in the JVM.

            Thanks for your thoughts on this.

            Regards,
            Enrico
