Re: Enhanced Console Sink for Structured Streaming

2024-02-05 Thread Raghu Angadi
Agree, the default behavior does not need to change.

Neil, how about separating it into two sections:

   - Actual rows in the sink (same as current output)
   - Followed by the metadata
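
For illustration, a rough sketch of what the two-section output could look
like (the exact fields and layout here are hypothetical and only meant to show
the idea of rows followed by metadata; the metadata field names are borrowed
from StreamingQueryProgress):

-------------------------------------------
Batch: 1
-------------------------------------------
+---+-------------------+
|id |eventTime          |
+---+-------------------+
|7  |2024-02-05 10:15:00|
+---+-------------------+

-------------------------------------------
Batch: 1 (metadata)
-------------------------------------------
numInputRows: 1
eventTime.watermark: 2024-02-05T10:05:00.000Z
stateOperators: []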


Re: [EXTERNAL] Re: Spark Kafka Rack Aware Consumer

2024-01-26 Thread Raghu Angadi
Overall, the proposal to make this an option for the Kafka source SGTM.
You can address the doc review comments and send the PR (in parallel with, or
after, the review).
Note that executors currently cache the client connection to Kafka and reuse
the connection and buffered records for the next micro-batch.
Your proposal would ideally keep that affinity as well (both can be done).
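
For context, a minimal sketch of the status quo: `client.rack` can currently
only be set to one static value through the source options, since options
prefixed with "kafka." are passed straight to the underlying consumer (the
servers, topic, and rack id below are made up for illustration):

// Status-quo sketch: a single static client.rack value for every executor.
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker-1:9092")
  .option("subscribe", "events")
  .option("kafka.client.rack", "use1-az1") // same rack value everywhere
  .load()

The proposal (SPARK-46798) instead makes the driver's partition-to-executor
assignment rack-aware, so executors can fetch from replicas in their own rack.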

On Fri, Jan 26, 2024 at 8:21 AM Schwager, Randall <
randall.schwa...@charter.com> wrote:

> Granted. Thanks for bearing with me. I’ve also opened up permissions to
> allow anyone with the link to edit the document. Thank you!
>
>
>
> *From: *Mich Talebzadeh 
> *Date: *Friday, January 26, 2024 at 09:19
> *To: *"Schwager, Randall" 
> *Cc: *"dev@spark.apache.org" 
> *Subject: *Re: [EXTERNAL] Re: Spark Kafka Rack Aware Consumer
>
>
>
>
> Ok I made a request to access this document
>
> Thanks
>
>
> Mich Talebzadeh,
>
> Dad | Technologist | Solutions Architect | Engineer
>
> London
>
> United Kingdom
>
>
>
>  view my Linkedin profile
>
>
>
>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
>
>
>
> On Fri, 26 Jan 2024 at 15:48, Schwager, Randall <
> randall.schwa...@charter.com> wrote:
>
> Hi Mich,
>
>
>
> Thanks for responding. In the JIRA issue, the design doc you’re referring
> to describes the prior work.
>
>
>
> This is the design doc for the proposed change:
> https://docs.google.com/document/d/1RoEk_mt8AUh9sTQZ1NfzIuuYKf1zx6BP1K3IlJ2b8iM/edit#heading=h.pbt6pdb2jt5c
>
>
>
> I’ll re-word the description to make that distinction more clear.
>
>
>
> Sincerely,
>
>
>
> Randall
>
>
>
> *From: *Mich Talebzadeh 
> *Date: *Friday, January 26, 2024 at 04:30
> *To: *"Schwager, Randall" 
> *Cc: *"dev@spark.apache.org" 
> *Subject: *[EXTERNAL] Re: Spark Kafka Rack Aware Consumer
>
>
>
>
> Your design doc
>
> Structured Streaming Kafka Source - Design Doc - Google Docs
> 
>
>
>
> seems to have been around since 2016. Reading the comments, it appears it was
> decided not to progress with it. What has changed since then, please?
>
>
>
> Are you asking whether this doc is still relevant?
>
>
>
> HTH
>
>
>
>
> Mich Talebzadeh,
>
> Dad | Technologist | Solutions Architect | Engineer
>
> London
>
> United Kingdom
>
>
>
>  view my Linkedin profile
> 
>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
>
>
>
> On Thu, 25 Jan 2024 at 20:10, Schwager, Randall <
> randall.schwa...@charter.com> wrote:
>
> Bump.
>
> Am I asking these questions in the wrong place? Or should I forego design
> input and just write the PR?
>
>
>
> *From: *"Schwager, Randall" 
> *Date: *Monday, January 22, 2024 at 17:02
> *To: *"dev@spark.apache.org" 
> *Subject: *Re: Spark Kafka Rack Aware Consumer
>
>
>
> Hello Spark Devs!
>
>
>
> After doing some detective work, I’d like to revisit this idea in earnest.
> My understanding now is that setting `client.rack` dynamically on the
> executor will do nothing. This is because the driver assigns Kafka
> partitions to executors. I’ve summarized a design to enable rack awareness
> and other location assignment patterns more generally in SPARK-46798
> .
>
>
>
> Since this is my first go at contributing to Spark, could I ask for a
> committer to help shepherd this JIRA issue along?
>
>
>
> Sincerely,
>
>
>
> Randall
>
>
>
> *From: *"Schwager, Randall" 
> *Date: *Wednesday, January 10, 2024 at 19:39
> *To: *"dev@spark.apache.org" 
> *Subject: *Spark Kafka Rack Aware Consumer
>
>
>
> Hello Spark Devs!
>
>
>
> Has there been discussion around adding the ability to dynamically set the
> ‘client.rack’ Kafka parameter at the executor?
>
> The Kafka SQL connector code on master doesn’t seem to support this
> feature. One can easily set the ‘client.rack’

Re: [VOTE] SPIP: Structured Streaming - Arbitrary State API v2

2024-01-09 Thread Raghu Angadi
+1. This is a major improvement to the state API.

Raghu.

On Tue, Jan 9, 2024 at 1:42 AM Mich Talebzadeh 
wrote:

> +1 for me as well
>
>
> Mich Talebzadeh,
> Dad | Technologist | Solutions Architect | Engineer
> London
> United Kingdom
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Tue, 9 Jan 2024 at 03:24, Anish Shrigondekar
>  wrote:
>
>> Thanks Jungtaek for creating the Vote thread.
>>
>> +1 (non-binding) from my side too.
>>
>> Thanks,
>> Anish
>>
>> On Tue, Jan 9, 2024 at 6:09 AM Jungtaek Lim 
>> wrote:
>>
>>> Starting with my +1 (non-binding). Thanks!
>>>
>>> On Tue, Jan 9, 2024 at 9:37 AM Jungtaek Lim <
>>> kabhwan.opensou...@gmail.com> wrote:
>>>
 Hi all,

 I'd like to start the vote for SPIP: Structured Streaming - Arbitrary
 State API v2.

 References:

- JIRA ticket 
- SPIP doc

 
- Discussion thread


 Please vote on the SPIP for the next 72 hours:

 [ ] +1: Accept the proposal as an official SPIP
 [ ] +0
 [ ] -1: I don’t think this is a good idea because …

 Thanks!
 Jungtaek Lim (HeartSaVioR)

>>>


Re: [DISCUSS] SPIP: State Data Source - Reader

2023-10-18 Thread Raghu Angadi
+1 overall and a big +1 to keeping offline state-rebalancing as a primary
use case.

Raghu.
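
For reference, a minimal sketch of how such an offline reader could be used
from a batch query (the checkpoint path is made up, and the actual API surface
is the one defined in the SPIP, not this sketch):

// Hypothetical usage of the proposed "statestore" batch source.
val stateDf = spark.read
  .format("statestore")
  .load("/checkpoints/my-stateful-query") // checkpoint root of the streaming query

// Inspect the operator's key/value state rows offline, e.g. in unit tests
// or while debugging incorrect output, as described in the SPIP.
stateDf.show(truncate = false)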

On Mon, Oct 16, 2023 at 11:25 AM Bartosz Konieczny 
wrote:

> Thank you, Jungtaek, for your answers! It's clear now.
>
> +1 for me. It seems like a prerequisite for further ops-related
> improvements to state store management. I am thinking especially of the
> state rebalancing that could rely on this read+write state store API. I
> don't mean dynamic state rebalancing, which could probably be implemented
> with lower latency directly in the stateful API. Instead, I'm thinking of
> an offline job that rebalances the state so the stateful pipeline can later
> be restarted with a changed number of shuffle partitions.
>
> Best,
> Bartosz.
>
> On Mon, Oct 16, 2023 at 6:19 PM Jungtaek Lim 
> wrote:
>
>> bump for better reach
>>
>> On Thu, Oct 12, 2023 at 4:26 PM Jungtaek Lim <
>> kabhwan.opensou...@gmail.com> wrote:
>>
>>> Sorry, please use this link instead for SPIP doc:
>>> https://docs.google.com/document/d/1_iVf_CIu2RZd3yWWF6KoRNlBiz5NbSIK0yThqG0EvPY/edit?usp=sharing
>>>
>>>
>>> On Thu, Oct 12, 2023 at 3:58 PM Jungtaek Lim <
>>> kabhwan.opensou...@gmail.com> wrote:
>>>
 Hi dev,

 I'd like to start a discussion on "State Data Source - Reader".

 This proposal aims to introduce a new data source, "statestore", which
 enables reading state rows from an existing checkpoint via an offline (batch)
 query. This will enable users to 1) create unit tests against a stateful
 query to verify the state values (especially for flatMapGroupsWithState), and
 2) gather more context on the status when an incident occurs, especially for
 incorrect output.

 *SPIP*:
 https://docs.google.com/document/d/1HjEupRv8TRFeULtJuxRq_tEG1Wq-9UNu-ctGgCYRke0/edit?usp=sharing
 *JIRA*: https://issues.apache.org/jira/browse/SPARK-45511

 Looking forward to your feedback!

 Thanks,
 Jungtaek Lim (HeartSaVioR)

 ps. The scope of the project is narrowed to the reader in this SPIP,
 since the writer requires us to consider more cases. We are planning on it.

>>>
>
> --
> Bartosz Konieczny
> freelance data engineer
> https://www.waitingforcode.com
> https://github.com/bartosz25/
> https://twitter.com/waitingforcode
>
>


Re: Watermark on late data only

2023-10-10 Thread Raghu Angadi
I would like some way to expose watermarks to the user. The watermark does
affect the processing of records, so it is relevant to users.
`current_watermark()` is a good option.
The implementation of this might be engine specific, but it is a very
relevant concept for authors of streaming pipelines.
Ideally I would like the engine to drop late records (or write them to a side
output) even for stateless pipelines, for consistency.

On Tue, Oct 10, 2023 at 2:27 AM Bartosz Konieczny 
wrote:

> Thank you for the clarification, Jungtaek 🙏 Indeed, it doesn't sound like
> a highly demanded feature from end users; I haven't seen it come up much on
> StackOverflow or the mailing lists. I was just curious about the reasons.
>
> Using arbitrary stateful processing could indeed be a workaround! But IMHO
> it would be easier to expose the watermark value from a function like
> current_watermark() and let users do whatever they want with the data. It
> also wouldn't require dealing with the state store overhead. The function
> could simplify implementing the *side output pattern*, where we process the
> on-time data differently from the late data, e.g. write late data to a
> dedicated space in the lake and facilitate backfilling for the batch
> pipelines?
>
> With a current_watermark() function it could be expressed simply as:
>
> streamDataset.foreachBatch((dataframe, batchVersion) => {
>   dataframe.cache()
>   dataframe.filter(current_watermark() > event_time_from_dataframe)
>     .writeTo("late_data")
>   dataframe.filter(current_watermark() <= event_time_from_dataframe)
>     .writeTo("on_time_data")
> })
>
> A little bit as you can do with Apache Flink in fact:
>
> https://github.com/immerok/recipes/blob/main/late-data-to-sink/src/main/java/com/immerok/cookbook/LateDataToSeparateSink.java#L81
>
> WDYT?
>
> Best,
> Bartosz.
>
> PS. Will be happy to contribute on that if the feature does make sense ;)
>
> On Tue, Oct 10, 2023 at 3:23 AM Jungtaek Lim 
> wrote:
>
>> Technically speaking, "late data" refers to data which cannot be processed
>> because the engine has already thrown out the state associated with it.
>>
>> That said, the only reason the watermark exists in streaming is to handle
>> stateful operators. From the engine's point of view, there is no concept of
>> "late data" for a stateless query. It's something users have to handle
>> themselves with a "filter", without relying on the value of the watermark.
>> I guess someone may see a benefit in the automatic tracking of the event-time
>> trend and want to define late data based on the watermark even in a stateless
>> query, but personally I haven't heard such a request so far.
>>
>> As a workaround you can leverage flatMapGroupsWithState, which provides the
>> value of the watermark for you, but I'd agree it's too heavyweight just for
>> this. If we see consistent demand, we could probably look into it and maybe
>> introduce a new SQL function for it (one that works only on streaming, which
>> is probably a major blocker to introducing it).
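>>
>> For illustration, a rough sketch of that workaround using the existing
>> GroupState.getCurrentWatermarkMs() API (the Click type, grouping key, and
>> clicksWithWatermark dataset are taken from the example further down in the
>> thread; the Int state is unused and only satisfies the API):
>>
>> import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}
>>
>> val tagged = clicksWithWatermark.as[Click]
>>   .groupByKey(_.id)
>>   .flatMapGroupsWithState[Int, (Click, Boolean)](
>>       OutputMode.Append(), GroupStateTimeout.NoTimeout()) {
>>     (id: Int, clicks: Iterator[Click], state: GroupState[Int]) =>
>>       // Mark each click as late when its event time is behind the watermark.
>>       val watermarkMs = state.getCurrentWatermarkMs()
>>       clicks.map(c => (c, c.clickTime.getTime < watermarkMs))
>>   }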
>>
>> On Mon, Oct 9, 2023 at 11:03 AM Bartosz Konieczny <
>> bartkoniec...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I've been analyzing the watermark propagation added in 3.5.0 recently and
>>> had to return to the basics of watermarks. One question is still
>>> unanswered in my head.
>>>
>>> Why are watermarks reserved for stateful queries? Can't they also apply to
>>> just filtering late data out?
>>>
>>> Is the reason only historical, since the initial design doc
>>> 
>>> mentions aggregated queries exclusively? Or are there any technical
>>> limitations why jobs like the one below don't drop late data
>>> automatically?
>>>
>>> // Assumed imports and event type, added so the snippet compiles:
>>> import java.sql.Timestamp
>>> import org.apache.spark.sql.execution.streaming.MemoryStream
>>> case class Click(id: Int, clickTime: Timestamp)
>>>
>>> import sparkSession.implicits._
>>> implicit val sparkContext = sparkSession.sqlContext
>>> val clicksStream = MemoryStream[Click]
>>> val clicksWithWatermark = clicksStream.toDF
>>>   .withWatermark("clickTime", "10 minutes")
>>> val query =
>>> clicksWithWatermark.writeStream.format("console").option("truncate", false)
>>>   .start()
>>>
>>> clicksStream.addData(Seq(
>>>   Click(1, Timestamp.valueOf("2023-06-10 10:10:00")),
>>>   Click(2, Timestamp.valueOf("2023-06-10 10:12:00")),
>>>   Click(3, Timestamp.valueOf("2023-06-10 10:14:00"))
>>> ))
>>>
>>>
>>> query.processAllAvailable()
>>>
>>> clicksStream.addData(Seq(
>>>   Click(4, Timestamp.valueOf("2023-06-10 11:00:40")),
>>>   Click(5, Timestamp.valueOf("2023-06-10 11:00:30")),
>>>   Click(6, Timestamp.valueOf("2023-06-10 11:00:10")),
>>>   Click(10, Timestamp.valueOf("2023-06-10 10:00:10"))
>>> ))
>>> query.processAllAvailable()
>>>
>>> One quick implementation could be adding a new physical plan rule to the
>>> IncrementalExecution
>>> 
>>> for the Ev

Re: Spark 3.5 Branch Cut

2023-07-17 Thread Raghu Angadi
Thanks Yuanjian for accepting these for warmfix.

Raghu.

On Mon, Jul 17, 2023 at 1:04 PM Yuanjian Li  wrote:

> Hi, all
>
> FYI, I cut branch-3.5 as https://github.com/apache/spark/tree/branch-3.5
>
> Here is the complete list of exception merge requests received before the
> cut:
>
>-
>
>SPARK-44421: Reattach to existing execute in Spark Connect (server
>mechanism)
>-
>
>SPARK-44423:  Reattach to existing execute in Spark Connect (scala
>client)
>-
>
>SPARK-44424:  Reattach to existing execute in Spark Connect (python
>client)
>-
>
>Sub-tasks in epic SPARK-42938
>: Structured
>Streaming with Spark Connect
>-
>
>   SPARK-42944 : (Will mostly hit Monday deadline, just in case)
>   Python foreachBatch
>   -
>
>   SPARK-42941 : (WIP, but might slip Monday deadline): Python
>   streaming listener
>   -
>
>   SPARK-44400 : Improve session access in connect Scala
>   StreamingQueryListener
>   -
>
>   SPARK-44432 : Allow timeout of sessions when client disconnects and
>   terminate queries
>   -
>
>   SPARK-44433 : Improve termination logic for Python processes for
>   foreachBatch & query listener
>   -
>
>   SPARK-44434 : More Scala tests for foreachBatch & query listener
>   -
>
>   SPARK-44435 : More Python tests for foreachBatch & query listener
>   -
>
>   SPARK-44436 : Use Connect DataFrame for Scala foreachBatch in
>   Connect
>   -
>
>Sub-task in epic SPARK-43754
>: Spark Connect
>Session & Query lifecycle
>-
>
>   SPARK-44422: Fine grained interrupt in Spark Connect
>   -
>
>SPARK-43923: [CONNECT] Post listenerBus events during
>ExecutePlanRequest
>-
>
>SPARK-44394: Add a new Spark UI page for Spark Connect
>-
>
>SPARK-44262: JdbcUtils hardcodes some SQL statements
>-
>
>SPARK-38200: Spark JDBC Savemode Supports Upsert
>-
>
>SPARK-44396  :
>Direct Arrow Deserialization
>-
>
>SPARK-9  :
>Upcasting for Arrow Deserialization
>-
>
>SPARK-44450  : Make
>direct Arrow encoding work with SQL/API.
>
>
> Best,
>
> Yuanjian
>
>


Re: [Reminder] Spark 3.5 Branch Cut

2023-07-14 Thread Raghu Angadi
Thank you. We plan to land the remaining major pieces of Streaming Spark
Connect (Epic SPARK-42938
).
I would like to request a warmfix exception for the following tweaks and
improvements over the next two weeks (all in the same epic).

   - SPARK-42944 : (Will mostly hit Monday deadline, just in case) Python
   foreachBatch
   - SPARK-42941 : (WIP, but might slip Monday deadline): Python streaming
   listener
   - SPARK-44400 : Improve session access in connect Scala
   StreamingQueryListener
   - SPARK-44432 : Allow timeout of sessions when client disconnects and
   terminate queries
   - SPARK-44433 : Improve termination logic for Python processes for
   foreachBatch & query listener
   - SPARK-44434 : More Scala tests for foreachBatch & query listener
   - SPARK-44435 : More Python tests for foreachBatch & query listener
   - SPARK-44436 : Use Connect DataFrame for Scala foreachBatch in Connect


On Fri, Jul 14, 2023 at 10:33 AM Yuanjian Li  wrote:

> Hi everyone,
> As discussed earlier in "Time for Spark v3.5.0 release", I will cut
> branch-3.5 on *Monday, July 17th at 1 pm PST* as scheduled.
>
> Please plan your PR merges accordingly, given this timeline. Currently,
> we have received the following exception merge requests:
>
>- SPARK-44421: Reattach to existing execute in Spark Connect (server
>mechanism)
>- SPARK-44423:  Reattach to existing execute in Spark Connect (scala
>client)
>- SPARK-44424:  Reattach to existing execute in Spark Connect (python
>client)
>
> If there are any other exception feature requests, please reply to this
> email. We will not merge any new features in 3.5 after the branch cut.
>
> Best,
> Yuanjian
>


Re: Time for Spark v3.5.0 release

2023-07-14 Thread Raghu Angadi
We have a bunch of work in progress for Spark Connect trying to meet the
branch cut deadline.

Moving to the 17th is certainly welcome.

Is it feasible to extend it by a couple more days?
Alternatively, we could have a relaxed warmfix process for Spark Connect
code for a week or two since it does not affect core Spark.

Thank you.
Raghu.

On Tue, Jul 4, 2023 at 3:42 PM Xinrong Meng  wrote:

> +1
>
> Thank you!
>
> On Tue, Jul 4, 2023 at 3:04 PM Jungtaek Lim 
> wrote:
>
>> +1
>>
>> On Wed, Jul 5, 2023 at 2:23 AM L. C. Hsieh  wrote:
>>
>>> +1
>>>
>>> Thanks Yuanjian.
>>>
>>> On Tue, Jul 4, 2023 at 7:45 AM yangjie01  wrote:
>>> >
>>> > +1
>>> >
>>> >
>>> >
>>> > From: Maxim Gekk 
>>> > Date: Tuesday, July 4, 2023, 17:24
>>> > To: Kent Yao 
>>> > Cc: "dev@spark.apache.org" 
>>> > Subject: Re: Time for Spark v3.5.0 release
>>> >
>>> >
>>> >
>>> > +1
>>> >
>>> > On Tue, Jul 4, 2023 at 11:55 AM Kent Yao  wrote:
>>> >
>>> > +1, thank you
>>> >
>>> > Kent
>>> >
>>> > On 2023/07/04 05:32:52 Dongjoon Hyun wrote:
>>> > > +1
>>> > >
>>> > > Thank you, Yuanjian
>>> > >
>>> > > Dongjoon
>>> > >
>>> > > On Tue, Jul 4, 2023 at 1:03 AM Hyukjin Kwon 
>>> wrote:
>>> > >
>>> > > > Yeah, postponing by one day shouldn't be a big deal.
>>> > > >
>>> > > > On Tue, Jul 4, 2023 at 7:10 AM Yuanjian Li 
>>> wrote:
>>> > > >
>>> > > >> Hi All,
>>> > > >>
>>> > > >> According to the Spark versioning policy at
>>> > > >> https://spark.apache.org/versioning-policy.html, should we cut
>>> > > >> *branch-3.5* on *July 17th, 2023*? (We initially proposed July 16th,
>>> > > >> but since it's a Sunday, I suggest we postpone it by one day).
>>> > > >>
>>> > > >> I would like to volunteer as the release manager for Apache Spark
>>> 3.5.0.
>>> > > >>
>>> > > >> Best,
>>> > > >> Yuanjian
>>> > > >>
>>> > > >
>>> > >
>>> >
>>> > -
>>> > To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>>>
>>> -
>>> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>>>
>>>