Re: [DISCUSS] SPIP: Structured Streaming - Arbitrary State API v2

2024-01-05 Thread Burak Yavuz
I'm also a +1 on the newer APIs. We learned a lot from using
flatMapGroupsWithState, and I believe we can make the APIs a lot easier
to use.

On Wed, Nov 29, 2023 at 6:43 PM Anish Shrigondekar
 wrote:

> Hi dev,
>
> Addressed the comments that Jungtaek had on the doc. Bumping the thread
> once again to see if other folks have any feedback on the proposal.
>
> Thanks,
> Anish
>
> On Mon, Nov 27, 2023 at 8:15 PM Jungtaek Lim 
> wrote:
>
>> Bumping this thread for better reach after the long holiday. Please
>> review the proposal, which opens up the chance to address complex
>> streaming use cases. Thanks!
>>
>> On Thu, Nov 23, 2023 at 8:19 AM Jungtaek Lim <
>> kabhwan.opensou...@gmail.com> wrote:
>>
>>> Thanks, Anish, for proposing the SPIP and initiating this thread! I believe
>>> this SPIP will help with a bunch of complex streaming use cases.
>>>
>>> dev@: We are coincidentally initiating this discussion during the
>>> Thanksgiving holidays. We understand people in the US may not have time
>>> to review the SPIP, and we plan to bump this thread early next week. We
>>> are open to any feedback from outside the US during the holiday. We can
>>> either address feedback all together after the holiday (Anish is in the
>>> US), or I can answer if the feedback is more of a question. Thanks!
>>>
>>> On Thu, Nov 23, 2023 at 5:27 AM Anish Shrigondekar <
>>> anish.shrigonde...@databricks.com> wrote:
>>>
 Hi dev,

 I would like to start a discussion on "Structured Streaming - Arbitrary
 State API v2". This proposal aims to address a bunch of limitations we see
 today when using the mapGroupsWithState/flatMapGroupsWithState operators.
 The detailed set of limitations is described in the SPIP doc.

 We propose to support various features such as multiple state variables
 (flexible data modeling), composite types, enhanced timer functionality,
 support for chaining operators after the new operator, handling of initial
 state along with the state data source, schema evolution, etc. This will
 allow users to write more powerful streaming state management logic,
 primarily used in operational use cases. Other built-in stateful operators
 could also benefit from such changes in the future.

 JIRA: https://issues.apache.org/jira/browse/SPARK-45939
 SPIP:
 https://docs.google.com/document/d/1QtC5qd4WQEia9kl1Qv74WE0TiXYy3x6zeTykygwPWig/edit?usp=sharing
 Design Doc:
 https://docs.google.com/document/d/1QjZmNZ-fHBeeCYKninySDIoOEWfX6EmqXs2lK097u9o/edit?usp=sharing

 cc - @Jungtaek Lim, who has graciously
 agreed to be the shepherd for this project

 Looking forward to your feedback!

 Thanks,
 Anish

>>>
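To make two items from the proposal's feature list more concrete (multiple state variables per grouping key, and enhanced timers), here is a toy, pure-Python model. It keeps value-, list- and map-like state for each key and expires a key once an event-time watermark passes that key's timer. Every class and method name below (`KeyState`, `ToyStatefulProcessor`, `handle_inputs`, `fire_timers`) is hypothetical and only illustrates the concepts; it is not the API proposed in the SPIP or shipped in Spark.

```python
import heapq
from dataclasses import dataclass, field
from typing import Dict, List, Tuple


@dataclass
class KeyState:
    """Several independent state variables kept for one grouping key."""
    count: int = 0                                           # value-like state
    recent: List[str] = field(default_factory=list)          # list-like state
    last_seen: Dict[str, int] = field(default_factory=dict)  # map-like state


class ToyStatefulProcessor:
    """Hypothetical toy model; NOT the SPIP's or Spark's actual API."""

    def __init__(self, timeout_ms: int) -> None:
        self.timeout_ms = timeout_ms
        self.state: Dict[str, KeyState] = {}
        self.deadline: Dict[str, int] = {}       # current timer per key
        self.timers: List[Tuple[int, str]] = []  # min-heap of (fire_at_ms, key)

    def handle_inputs(self, key: str, events: List[Tuple[str, int]]) -> List[str]:
        """Update all state variables for `key`, then (re)register its timer."""
        if not events:
            return []
        st = self.state.setdefault(key, KeyState())
        for name, ts in events:
            st.count += 1
            st.recent.append(name)
            st.last_seen[name] = max(ts, st.last_seen.get(name, ts))
        # Push the timer out to timeout_ms after the latest event seen so far.
        fire_at = max(self.deadline.get(key, 0),
                      max(ts for _, ts in events) + self.timeout_ms)
        self.deadline[key] = fire_at
        heapq.heappush(self.timers, (fire_at, key))
        return [f"{key}:count={st.count}"]

    def fire_timers(self, watermark_ms: int) -> List[str]:
        """Expire every key whose current timer is at or before the watermark."""
        out = []
        while self.timers and self.timers[0][0] <= watermark_ms:
            fire_at, key = heapq.heappop(self.timers)
            if self.deadline.get(key) != fire_at:
                continue  # stale entry superseded by a later registration
            del self.deadline[key]
            st = self.state.pop(key)
            out.append(f"{key}:expired after {st.count} events")
        return out
```

For example, with `p = ToyStatefulProcessor(timeout_ms=1000)`, calling `p.handle_inputs("u1", [("click", 0), ("view", 500)])` and then `p.handle_inputs("u1", [("click", 1200)])` moves the key's timer from 1500 to 2200, so `p.fire_timers(1600)` returns `[]` and `p.fire_timers(2200)` returns `["u1:expired after 3 events"]`. Keeping one authoritative deadline per key and skipping stale heap entries is a simple way to let later input postpone a timer without searching the heap.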


Re: Dynamic resource allocation for structured streaming [SPARK-24815]

2024-01-05 Thread Mich Talebzadeh
Hi Pavan,

Thanks for your answers.

Given these responses, it seems like you have already taken a
comprehensive approach to addressing the challenges associated with dynamic
scaling in Spark Structured Streaming. IMO, it would also be beneficial to
engage with other members as well, or gather additional feedback and
perspectives, especially from those with experience in dynamic resource
allocation in Spark. Having said that, the discussion above demonstrates a
good understanding of the challenges involved in enhancing Spark Structured
Streaming's resource management capabilities.

HTH

Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Fri, 5 Jan 2024 at 13:43, Pavan Kotikalapudi 
wrote:


Re: [DISCUSS] SPIP: Structured Streaming - Arbitrary State API v2

2024-01-05 Thread Shixiong Zhu
+1. Looking forward to seeing how the new API brings in new streaming use
cases!

Best Regards,
Shixiong Zhu


On Wed, Nov 29, 2023 at 6:42 PM Anish Shrigondekar
 wrote:



Re: Dynamic resource allocation for structured streaming [SPARK-24815]

2024-01-05 Thread Pavan Kotikalapudi
Hi Mich,

As always, thanks for looking closely at the design; I really appreciate your
inputs on this ticket. I would love to improve it further and cover more
edge cases, if there are any.

I have answered your concerns below. I believe I covered some of them in
the proposal, but please let me know if I missed anything.


   - Implementation Complexity: Integrating dynamic scaling into Spark's
   resource management framework requires careful design and implementation to
   ensure effectiveness and stability.
   I have drafted a PR with an initial implementation
   (https://github.com/apache/spark/pull/42352) and made sure that it only
   uses Spark's stable resource management framework for batch jobs,
   extended to work for our streaming use cases. Since Structured Streaming
   is micro-batch based at the lowest level, I tuned the scaling actions to
   micro-batches.
   I would appreciate it if anyone in the dev community who has worked on the
   dynamic resource allocation (DRA) implementation could take a look at it
   as well.

   - Heuristic Accuracy: This proposal's effectiveness depends heavily on the
   accuracy of the trigger interval heuristics used to guide scaling decisions.
   Yes. Although the app's scaling guidelines are determined by the trigger
   interval, the guidelines only provide values to the request/remove policy
   of the existing DRA solution.

   The current DRA targets batch use cases: it constantly scales out/back
   per stage of the job, which makes it unstable for streaming jobs. I have
   tweaked it to scale per micro-batch. That said, I am still looking for
   suggestions on other stats that would help scale streaming apps
   effectively.

   - Overhead: Monitoring and scaling processes themselves introduce some
   overhead, which needs to be balanced against the potential performance
   gains. For example, how we can utilise Input Rate, Process Rate and
   Operation Duration from the Streaming Query Statistics page, etc.
   We already have all of the events we need on Spark's listener bus. We are
   making sure we don't add anything more to the framework, but rather just
   consume that information to scale. So the solution shouldn't add any
   performance overhead, and it should yield better resource utilization for
   traffic patterns that are uneven over the day.
   Using the `Streaming Query Statistics` would fall under the spark-sql
   sub-module of the project, which would steer towards creating a new
   algorithm in that module, separate from the current DRA implementation.
   Since the current design doesn't require any of those stats, I kept it to
   the core module stats, but if other stats like input rate would help build
   better scaling accuracy, I would definitely look into them.

   - We ought to consider the potential impact on latency. Scaling
   operations, especially scaling up, may introduce some latency. Ensuring
   minimal impact on the processing time is crucial.
   Since Structured Streaming apps can be latency sensitive, the scaling
   algorithm scales out aggressively to add more resources. Scale-out usually
   happens when the processing rate of a micro-batch increases, so the
   scale-out should help reduce the processing time (or keep it in check) and
   the latency of the events passing through.

   - Implementing mechanisms for graceful scaling operations, avoiding
   abrupt changes, can contribute to a smoother user experience.
   Totally! While the scale-out is aggressive, the scale-back typically
   involves a de-allocation ratio (0-100%) and a timeout period, so that only
   a few resources leave the application's executor pool. This also keeps the
   processing times fairly stable throughout the day.
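The micro-batch-driven policy described in the answers above (decisions keyed to the trigger interval, aggressive scale-out, and a gradual scale-back governed by a de-allocation ratio and a timeout) can be sketched as a small pure-Python state machine. This is not the code in PR 42352: the thresholds, the doubling rule for scale-out, the single input signal (each micro-batch's duration), and every name below (`ScalingPolicy`, `MicroBatchScaler`, `on_batch_completed`) are hypothetical, chosen only to show the shape of such a policy.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ScalingPolicy:
    """Hypothetical knobs; names and defaults are illustrative only."""
    trigger_interval_ms: int
    scale_out_ratio: float = 0.9         # batch took >= 90% of the trigger interval
    scale_back_ratio: float = 0.3        # batch took <= 30% of the trigger interval
    deallocation_ratio: float = 0.2      # share of executors released per step (0-1)
    scale_back_timeout_ms: int = 60_000  # how long batches must stay light first
    min_executors: int = 1
    max_executors: int = 50


class MicroBatchScaler:
    """Decides a target executor count once per completed micro-batch."""

    def __init__(self, policy: ScalingPolicy, executors: int) -> None:
        self.policy = policy
        self.executors = executors
        self.light_since_ms: Optional[int] = None  # start of the current light run

    def on_batch_completed(self, batch_duration_ms: int, now_ms: int) -> int:
        p = self.policy
        load = batch_duration_ms / p.trigger_interval_ms
        if load >= p.scale_out_ratio:
            # Aggressive scale-out (assumed rule: double, capped at the maximum).
            self.executors = min(p.max_executors, self.executors * 2)
            self.light_since_ms = None
        elif load <= p.scale_back_ratio:
            if self.light_since_ms is None:
                self.light_since_ms = now_ms
            elif now_ms - self.light_since_ms >= p.scale_back_timeout_ms:
                # Gradual scale-back: release only a fraction of the executors,
                # then restart the timeout before the next step.
                release = max(1, int(self.executors * p.deallocation_ratio))
                self.executors = max(p.min_executors, self.executors - release)
                self.light_since_ms = now_ms
        else:
            self.light_since_ms = None  # steady load: no change, reset the timer
        return self.executors
```

For example, with a 10-second trigger interval and 4 executors, a 9.5-second batch doubles the count to 8; after batches stay under 3 seconds for the full 60-second timeout, one step releases `int(8 * 0.2) = 1` executor, leaving 7. Evaluating once per micro-batch, rather than per stage as the stock DRA does, is what keeps the executor count from oscillating within a single batch.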


Please let me know if you have any more concerns. I will also add these to
the end of the doc for future reference.

Thank you,

Pavan


On Wed, Jan 3, 2024 at 4:58 AM Mich Talebzadeh 
wrote:

> Hi Pavan,
>
> Thanks for putting this request forward.
>
> I am generally supportive of it. In a nutshell, I believe this proposal
> holds significant promise for optimizing resource utilization and
> enhancing performance in Spark Structured Streaming.
>
> Having said that, from my experience of Spark Structured Streaming (SSS),
> there are potential challenges and considerations. To summarise:
>
>- Implementation Complexity: Integrating dynamic scaling into Spark's
>resource management framework requires careful design and implementation to
>ensure effectiveness and stability.
>- Heuristic Accuracy: This proposal's effectiveness depends heavily on
>the accuracy of the trigger interval heuristics