For advanced users, it's certainly an option to look at the streaming query
progress and use the state store reader to look at your state. However, the
goal of this Enhanced Console Sink is to improve the experience for
*new *users,
i.e. it should work mostly out of the box.

Let's move discussion to the JIRA (SPARK-47362
<https://issues.apache.org/jira/browse/SPARK-47362>), since most
participants here are in alignment.

On Tue, Mar 12, 2024 at 7:25 AM Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

> OK I have just been working on  a Databricks engineering question raised
> by a user
>
> Monitoring structure streaming in external sink
> <https://community.databricks.com/t5/data-engineering/monitoring-structure-streaming-in-externar-sink/td-p/63069>
>
> In practice there is an option to use *StreamingQueryListener* from 
> *pyspark.sql.streaming
> import DataStreamWriter, StreamingQueryListene*r to get the matrix out
> fpreachBatch
>
> For example
> onQueryProgress
> microbatch_data received
> {
> "id" : "941e4cb6-f4ee-41f8-b662-af6dda61dc66",
> "runId" : "691d5eb2-140e-48c0-949a-7efbe0fa0967",
> "name" : null,
> "timestamp" : "2024-03-10T09:21:27.233Z",
> "batchId" : 21,
> "numInputRows" : 1,
> "inputRowsPerSecond" : 100.0,
> "processedRowsPerSecond" : 5.347593582887701,
> "durationMs" : {
> "addBatch" : 37,
> "commitOffsets" : 41,
> "getBatch" : 0,
> "latestOffset" : 0,
> "queryPlanning" : 5,
> "triggerExecution" : 187,
> "walCommit" : 104
> },
> "stateOperators" : [ ],
> "sources" : [ {
> "description" : "RateStreamV2[rowsPerSecond=1, rampUpTimeSeconds=0,
> numPartitions=default",
> etc
>
> Will that help?
>
> Mich Talebzadeh,
> Dad | Technologist | Solutions Architect | Engineer
> London
> United Kingdom
>
>
>    view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* The information provided is correct to the best of my
> knowledge but of course cannot be guaranteed . It is essential to note
> that, as with any advice, quote "one test result is worth one-thousand
> expert opinions (Werner  <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von
> Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>)".
>
>
> On Fri, 9 Feb 2024 at 22:39, Neil Ramaswamy <neil.ramasw...@databricks.com>
> wrote:
>
>> Thanks for the comments, Anish and Jerry. To summarize so far, we are in
>> agreement that:
>>
>> 1. Enhanced console sink is a good tool for new users to understand
>> Structured Streaming semantics
>> 2. It should be opt-in via an option (unlike my original proposal)
>> 3. Out of the 2 modes of verbosity I proposed, we're fine with the first
>> mode for now (print sink data with event-time metadata and state data for
>> stateful queries, with duration-rendered timestamps, with just the
>> KeyWithIndexToValue state store for joins, and with a state table for every
>> stateful operator, if there are multiple).
>>
>> I think the last pending suggestion (from Raghu, Anish, and Jerry) is how
>> to structure the output so that it's clear what is data and what is
>> metadata. Here's my proposal:
>>
>> ------------------------------------------
>> BATCH: 1
>> ------------------------------------------
>>
>> +----------------------------------------+
>> |           ROWS WRITTEN TO SINK         |
>> +--------------------------+-------------+
>> |          window          |   count     |
>> +--------------------------+-------------+
>> | {10 seconds, 20 seconds} |      2      |
>> +--------------------------+-------------+
>>
>> +----------------------------------------+
>> |           EVENT TIME METADATA          |
>> +----------------------------------------+
>> | watermark -> 21 seconds                |
>> | numDroppedRows -> 0                    |
>> +----------------------------------------+
>>
>> +----------------------------------------+
>> |          ROWS IN STATE STORE           |
>> +--------------------------+-------------+
>> |           key            |    value    |
>> +--------------------------+-------------+
>> | {30 seconds, 40 seconds} |     {1}     |
>> +--------------------------+-------------+
>>
>> If there are no more major concerns, I think we can discuss smaller
>> details in the JIRA ticket or PR itself. I don't think a SPIP is needed for
>> a flag-gated benign change like this, but please let me know if you
>> disagree.
>>
>> Best,
>> Neil
>>
>> On Thu, Feb 8, 2024 at 5:37 PM Jerry Peng <jerry.boyang.p...@gmail.com>
>> wrote:
>>
>>> I am generally a +1 on this as we can use this information in our docs
>>> to demonstrate certains concepts to potential users.
>>>
>>> I am in agreement with other reviewers that we should keep the existing
>>> default behavior of the console sink.  This new style of output should be
>>> enabled behind a flag.
>>>
>>> As for the output of this "new mode" in the console sink, can we be more
>>> explicit about what is the actual output and what is the metadata?  It is
>>> not clear from the logged output.
>>>
>>> On Tue, Feb 6, 2024 at 11:08 AM Neil Ramaswamy
>>> <neil.ramasw...@databricks.com.invalid> wrote:
>>>
>>>> Jungtaek and Raghu, thanks for the input. I'm happy with the verbose
>>>> mode being off by default.
>>>>
>>>> I think it's reasonable to have 1 or 2 levels of verbosity:
>>>>
>>>>    1. The first verbose mode could target new users, and take a highly
>>>>    opinionated view on what's important to understand streaming semantics.
>>>>    This would include printing the sink rows, watermark, number of dropped
>>>>    rows (if any), and state data. For state data, we should print for all
>>>>    state stores (for multiple stateful operators), but for joins, I think
>>>>    rendering just the KeyWithIndexToValueStore(s) is reasonable. Timestamps
>>>>    would render as durations (see original message) to make small examples
>>>>    easy to understand.
>>>>    2. The second verbose mode could target more advanced users trying
>>>>    to create a reproduction. In addition to the first verbose mode, it 
>>>> would
>>>>    also print the other join state store, the number of evicted rows due to
>>>>    the watermark, and print timestamps as extended ISO 8601 strings (same 
>>>> as
>>>>    today).
>>>>
>>>> Rather than implementing both, I would prefer to implement the first
>>>> level, and evaluate later if the second would be useful.
>>>>
>>>> Mich, can you elaborate on why you don't think it's useful? To
>>>> reiterate, this proposal is to bring to light certain metrics/values that
>>>> are essential for understanding SS micro-batching semantics. It's to help
>>>> users go from 0 to 1, not 1 to 100. (And the Spark UI can't be the place
>>>> for rendering sink data or state store values—there should be no sensitive
>>>> user data there.)
>>>>
>>>> On Mon, Feb 5, 2024 at 11:32 PM Mich Talebzadeh <
>>>> mich.talebza...@gmail.com> wrote:
>>>>
>>>>> I don't think adding this to the streaming flow (at micro level) will
>>>>> be that useful
>>>>>
>>>>> However, this can be added to Spark UI as an enhancement to
>>>>> the Streaming Query Statistics page.
>>>>>
>>>>> HTH
>>>>>
>>>>> Mich Talebzadeh,
>>>>> Dad | Technologist | Solutions Architect | Engineer
>>>>> London
>>>>> United Kingdom
>>>>>
>>>>>
>>>>>    view my Linkedin profile
>>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>>
>>>>>
>>>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>>>
>>>>>
>>>>>
>>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>>>> any loss, damage or destruction of data or any other property which may
>>>>> arise from relying on this email's technical content is explicitly
>>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>>> arising from such loss, damage or destruction.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Tue, 6 Feb 2024 at 03:49, Raghu Angadi <raghu.ang...@databricks.com>
>>>>> wrote:
>>>>>
>>>>>> Agree, the default behavior does not need to change.
>>>>>>
>>>>>> Neil, how about separating it into two sections:
>>>>>>
>>>>>>    - Actual rows in the sink (same as current output)
>>>>>>    - Followed by metadata data
>>>>>>
>>>>>>

Reply via email to