OK I have just been working on  a Databricks engineering question raised by
a user

Monitoring structure streaming in external sink
<https://community.databricks.com/t5/data-engineering/monitoring-structure-streaming-in-externar-sink/td-p/63069>

In practice there is an option to use *StreamingQueryListener* from
*pyspark.sql.streaming
import DataStreamWriter, StreamingQueryListene*r to get the matrix out
fpreachBatch

For example
onQueryProgress
microbatch_data received
{
"id" : "941e4cb6-f4ee-41f8-b662-af6dda61dc66",
"runId" : "691d5eb2-140e-48c0-949a-7efbe0fa0967",
"name" : null,
"timestamp" : "2024-03-10T09:21:27.233Z",
"batchId" : 21,
"numInputRows" : 1,
"inputRowsPerSecond" : 100.0,
"processedRowsPerSecond" : 5.347593582887701,
"durationMs" : {
"addBatch" : 37,
"commitOffsets" : 41,
"getBatch" : 0,
"latestOffset" : 0,
"queryPlanning" : 5,
"triggerExecution" : 187,
"walCommit" : 104
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "RateStreamV2[rowsPerSecond=1, rampUpTimeSeconds=0,
numPartitions=default",
etc

Will that help?

Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom


   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed . It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions (Werner  <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von
Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>)".


On Fri, 9 Feb 2024 at 22:39, Neil Ramaswamy <neil.ramasw...@databricks.com>
wrote:

> Thanks for the comments, Anish and Jerry. To summarize so far, we are in
> agreement that:
>
> 1. Enhanced console sink is a good tool for new users to understand
> Structured Streaming semantics
> 2. It should be opt-in via an option (unlike my original proposal)
> 3. Out of the 2 modes of verbosity I proposed, we're fine with the first
> mode for now (print sink data with event-time metadata and state data for
> stateful queries, with duration-rendered timestamps, with just the
> KeyWithIndexToValue state store for joins, and with a state table for every
> stateful operator, if there are multiple).
>
> I think the last pending suggestion (from Raghu, Anish, and Jerry) is how
> to structure the output so that it's clear what is data and what is
> metadata. Here's my proposal:
>
> ------------------------------------------
> BATCH: 1
> ------------------------------------------
>
> +----------------------------------------+
> |           ROWS WRITTEN TO SINK         |
> +--------------------------+-------------+
> |          window          |   count     |
> +--------------------------+-------------+
> | {10 seconds, 20 seconds} |      2      |
> +--------------------------+-------------+
>
> +----------------------------------------+
> |           EVENT TIME METADATA          |
> +----------------------------------------+
> | watermark -> 21 seconds                |
> | numDroppedRows -> 0                    |
> +----------------------------------------+
>
> +----------------------------------------+
> |          ROWS IN STATE STORE           |
> +--------------------------+-------------+
> |           key            |    value    |
> +--------------------------+-------------+
> | {30 seconds, 40 seconds} |     {1}     |
> +--------------------------+-------------+
>
> If there are no more major concerns, I think we can discuss smaller
> details in the JIRA ticket or PR itself. I don't think a SPIP is needed for
> a flag-gated benign change like this, but please let me know if you
> disagree.
>
> Best,
> Neil
>
> On Thu, Feb 8, 2024 at 5:37 PM Jerry Peng <jerry.boyang.p...@gmail.com>
> wrote:
>
>> I am generally a +1 on this as we can use this information in our docs to
>> demonstrate certains concepts to potential users.
>>
>> I am in agreement with other reviewers that we should keep the existing
>> default behavior of the console sink.  This new style of output should be
>> enabled behind a flag.
>>
>> As for the output of this "new mode" in the console sink, can we be more
>> explicit about what is the actual output and what is the metadata?  It is
>> not clear from the logged output.
>>
>> On Tue, Feb 6, 2024 at 11:08 AM Neil Ramaswamy
>> <neil.ramasw...@databricks.com.invalid> wrote:
>>
>>> Jungtaek and Raghu, thanks for the input. I'm happy with the verbose
>>> mode being off by default.
>>>
>>> I think it's reasonable to have 1 or 2 levels of verbosity:
>>>
>>>    1. The first verbose mode could target new users, and take a highly
>>>    opinionated view on what's important to understand streaming semantics.
>>>    This would include printing the sink rows, watermark, number of dropped
>>>    rows (if any), and state data. For state data, we should print for all
>>>    state stores (for multiple stateful operators), but for joins, I think
>>>    rendering just the KeyWithIndexToValueStore(s) is reasonable. Timestamps
>>>    would render as durations (see original message) to make small examples
>>>    easy to understand.
>>>    2. The second verbose mode could target more advanced users trying
>>>    to create a reproduction. In addition to the first verbose mode, it would
>>>    also print the other join state store, the number of evicted rows due to
>>>    the watermark, and print timestamps as extended ISO 8601 strings (same as
>>>    today).
>>>
>>> Rather than implementing both, I would prefer to implement the first
>>> level, and evaluate later if the second would be useful.
>>>
>>> Mich, can you elaborate on why you don't think it's useful? To
>>> reiterate, this proposal is to bring to light certain metrics/values that
>>> are essential for understanding SS micro-batching semantics. It's to help
>>> users go from 0 to 1, not 1 to 100. (And the Spark UI can't be the place
>>> for rendering sink data or state store values—there should be no sensitive
>>> user data there.)
>>>
>>> On Mon, Feb 5, 2024 at 11:32 PM Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
>>>> I don't think adding this to the streaming flow (at micro level) will
>>>> be that useful
>>>>
>>>> However, this can be added to Spark UI as an enhancement to
>>>> the Streaming Query Statistics page.
>>>>
>>>> HTH
>>>>
>>>> Mich Talebzadeh,
>>>> Dad | Technologist | Solutions Architect | Engineer
>>>> London
>>>> United Kingdom
>>>>
>>>>
>>>>    view my Linkedin profile
>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>
>>>>
>>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>>
>>>>
>>>>
>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>>> any loss, damage or destruction of data or any other property which may
>>>> arise from relying on this email's technical content is explicitly
>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>> arising from such loss, damage or destruction.
>>>>
>>>>
>>>>
>>>>
>>>> On Tue, 6 Feb 2024 at 03:49, Raghu Angadi <raghu.ang...@databricks.com>
>>>> wrote:
>>>>
>>>>> Agree, the default behavior does not need to change.
>>>>>
>>>>> Neil, how about separating it into two sections:
>>>>>
>>>>>    - Actual rows in the sink (same as current output)
>>>>>    - Followed by metadata data
>>>>>
>>>>>

Reply via email to