[jira] [Commented] (SPARK-24815) Structured Streaming should support dynamic allocation
[ https://issues.apache.org/jira/browse/SPARK-24815?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17822722#comment-17822722 ] Pavan Kotikalapudi commented on SPARK-24815: Thanks a lot for mentoring and driving this effort Mich. As you suggested I will update the benefits and challenges in the SPIP doc. That can outline the scope of the current work and possibility of any future work for other use cases. Re: > Pluggable Dynamic Allocation , Separate Algorithm for Structured Streaming I really like the idea. I started off with that but limited it to only core module as it serves at primitive level of evaluation (that current dra is already doing). but this idea is better as you said design wise and also for different kinds of workloads. > Warning for Enabled Core Dynamic Allocation Right now we need normal DRA because structured streaming DRA is built on top of it. I have added another flag `spark.dynamicAllocation.streaming.enabled` so that this particular pieces of streaming algo would kick in on top of traditional DRA. This approach also makes it backwards compatible especially when users have to upgrade spark. > Structured Streaming should support dynamic allocation > -- > > Key: SPARK-24815 > URL: https://issues.apache.org/jira/browse/SPARK-24815 > Project: Spark > Issue Type: Improvement > Components: Scheduler, Spark Core, Structured Streaming >Affects Versions: 2.3.1 >Reporter: Karthik Palaniappan >Priority: Minor > Labels: pull-request-available > > For batch jobs, dynamic allocation is very useful for adding and removing > containers to match the actual workload. On multi-tenant clusters, it ensures > that a Spark job is taking no more resources than necessary. In cloud > environments, it enables autoscaling. > However, if you set spark.dynamicAllocation.enabled=true and run a structured > streaming job, the batch dynamic allocation algorithm kicks in. It requests > more executors if the task backlog is a certain size, and removes executors > if they idle for a certain period of time. > Quick thoughts: > 1) Dynamic allocation should be pluggable, rather than hardcoded to a > particular implementation in SparkContext.scala (this should be a separate > JIRA). > 2) We should make a structured streaming algorithm that's separate from the > batch algorithm. Eventually, continuous processing might need its own > algorithm. > 3) Spark should print a warning if you run a structured streaming job when > Core's dynamic allocation is enabled -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24815) Structured Streaming should support dynamic allocation
[ https://issues.apache.org/jira/browse/SPARK-24815?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17821433#comment-17821433 ] Mich Talebzadeh commented on SPARK-24815: - some thoughts on this if I may This enhancement request provides a solid foundation for improving dynamic allocation in Structured Streaming. Adding more specific details, outlining potential benefits, and addressing potential challenges can further strengthen the proposal and increase its chances of being implemented. So these are my thoughts: - Pluggable Dynamic Allocation: This suggestion shows good design principles, allowing for flexibility and future improvements. We should elaborate benefits of a pluggable approach, like customization and integration with external resource management tools. - Separate Algorithm for Structured Streaming: This is crucial for adapting allocation strategies to the unique nature of streaming workloads Also outlining how a separate algorithm might differ from the batch counterpart could be useful - Warning for Enabled Core Dynamic Allocation: This is a valuable warning to prevent accidental misuse and raise awareness among users. Also consider suggesting the warning level (e.g. info, warning, error) and potential content to provide clarity. - Briefly mention potential challenges or trade-offs associated with implementing these proposals. Suggesting relevant discussions, resources, or alternative approaches could strengthen the request for enhancement > Structured Streaming should support dynamic allocation > -- > > Key: SPARK-24815 > URL: https://issues.apache.org/jira/browse/SPARK-24815 > Project: Spark > Issue Type: Improvement > Components: Scheduler, Spark Core, Structured Streaming >Affects Versions: 2.3.1 >Reporter: Karthik Palaniappan >Priority: Minor > Labels: pull-request-available > > For batch jobs, dynamic allocation is very useful for adding and removing > containers to match the actual workload. On multi-tenant clusters, it ensures > that a Spark job is taking no more resources than necessary. In cloud > environments, it enables autoscaling. > However, if you set spark.dynamicAllocation.enabled=true and run a structured > streaming job, the batch dynamic allocation algorithm kicks in. It requests > more executors if the task backlog is a certain size, and removes executors > if they idle for a certain period of time. > Quick thoughts: > 1) Dynamic allocation should be pluggable, rather than hardcoded to a > particular implementation in SparkContext.scala (this should be a separate > JIRA). > 2) We should make a structured streaming algorithm that's separate from the > batch algorithm. Eventually, continuous processing might need its own > algorithm. > 3) Spark should print a warning if you run a structured streaming job when > Core's dynamic allocation is enabled -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24815) Structured Streaming should support dynamic allocation
[ https://issues.apache.org/jira/browse/SPARK-24815?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17820915#comment-17820915 ] Mich Talebzadeh commented on SPARK-24815: - Now that the ticket is reopened let us review the submitted documents. This has got 6 votes for now. I volunteered to mentor it until a committer comes forward to it. Hope this helps to speed up the process and time to delivery. > Structured Streaming should support dynamic allocation > -- > > Key: SPARK-24815 > URL: https://issues.apache.org/jira/browse/SPARK-24815 > Project: Spark > Issue Type: Improvement > Components: Scheduler, Spark Core, Structured Streaming >Affects Versions: 2.3.1 >Reporter: Karthik Palaniappan >Priority: Minor > Labels: pull-request-available > > For batch jobs, dynamic allocation is very useful for adding and removing > containers to match the actual workload. On multi-tenant clusters, it ensures > that a Spark job is taking no more resources than necessary. In cloud > environments, it enables autoscaling. > However, if you set spark.dynamicAllocation.enabled=true and run a structured > streaming job, the batch dynamic allocation algorithm kicks in. It requests > more executors if the task backlog is a certain size, and removes executors > if they idle for a certain period of time. > Quick thoughts: > 1) Dynamic allocation should be pluggable, rather than hardcoded to a > particular implementation in SparkContext.scala (this should be a separate > JIRA). > 2) We should make a structured streaming algorithm that's separate from the > batch algorithm. Eventually, continuous processing might need its own > algorithm. > 3) Spark should print a warning if you run a structured streaming job when > Core's dynamic allocation is enabled -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24815) Structured Streaming should support dynamic allocation
[ https://issues.apache.org/jira/browse/SPARK-24815?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17763940#comment-17763940 ] Krystal Mitchell commented on SPARK-24815: -- Thank you [~pavan0831]. This draft PR will impact some of the projects we are currently working on. > Structured Streaming should support dynamic allocation > -- > > Key: SPARK-24815 > URL: https://issues.apache.org/jira/browse/SPARK-24815 > Project: Spark > Issue Type: Improvement > Components: Scheduler, Spark Core, Structured Streaming >Affects Versions: 2.3.1 >Reporter: Karthik Palaniappan >Priority: Minor > Labels: pull-request-available > > For batch jobs, dynamic allocation is very useful for adding and removing > containers to match the actual workload. On multi-tenant clusters, it ensures > that a Spark job is taking no more resources than necessary. In cloud > environments, it enables autoscaling. > However, if you set spark.dynamicAllocation.enabled=true and run a structured > streaming job, the batch dynamic allocation algorithm kicks in. It requests > more executors if the task backlog is a certain size, and removes executors > if they idle for a certain period of time. > Quick thoughts: > 1) Dynamic allocation should be pluggable, rather than hardcoded to a > particular implementation in SparkContext.scala (this should be a separate > JIRA). > 2) We should make a structured streaming algorithm that's separate from the > batch algorithm. Eventually, continuous processing might need its own > algorithm. > 3) Spark should print a warning if you run a structured streaming job when > Core's dynamic allocation is enabled -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24815) Structured Streaming should support dynamic allocation
[ https://issues.apache.org/jira/browse/SPARK-24815?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17751237#comment-17751237 ] Pavan Kotikalapudi commented on SPARK-24815: Here is the draft PR with initial implementation [https://github.com/apache/spark/pull/42352] and the design doc: [https://docs.google.com/document/d/1_YmfCsQQb9XhRdKh0ijbc-j8JKGtGBxYsk_30NVSTWo/edit?usp=sharing |https://docs.google.com/document/d/1_YmfCsQQb9XhRdKh0ijbc-j8JKGtGBxYsk_30NVSTWo/edit?usp=sharing]. Thanks for the review :) > Structured Streaming should support dynamic allocation > -- > > Key: SPARK-24815 > URL: https://issues.apache.org/jira/browse/SPARK-24815 > Project: Spark > Issue Type: Improvement > Components: Scheduler, Spark Core, Structured Streaming >Affects Versions: 2.3.1 >Reporter: Karthik Palaniappan >Priority: Minor > > For batch jobs, dynamic allocation is very useful for adding and removing > containers to match the actual workload. On multi-tenant clusters, it ensures > that a Spark job is taking no more resources than necessary. In cloud > environments, it enables autoscaling. > However, if you set spark.dynamicAllocation.enabled=true and run a structured > streaming job, the batch dynamic allocation algorithm kicks in. It requests > more executors if the task backlog is a certain size, and removes executors > if they idle for a certain period of time. > Quick thoughts: > 1) Dynamic allocation should be pluggable, rather than hardcoded to a > particular implementation in SparkContext.scala (this should be a separate > JIRA). > 2) We should make a structured streaming algorithm that's separate from the > batch algorithm. Eventually, continuous processing might need its own > algorithm. > 3) Spark should print a warning if you run a structured streaming job when > Core's dynamic allocation is enabled -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24815) Structured Streaming should support dynamic allocation
[ https://issues.apache.org/jira/browse/SPARK-24815?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17741449#comment-17741449 ] Sandish Kumar HN commented on SPARK-24815: -- [~pavan0831] I'm excited to see what you come up with. You can create a pull request with the design details, or you can attach a Google design doc to the pull request. Either way, I'm looking forward to reviewing your work. > Structured Streaming should support dynamic allocation > -- > > Key: SPARK-24815 > URL: https://issues.apache.org/jira/browse/SPARK-24815 > Project: Spark > Issue Type: Improvement > Components: Scheduler, Spark Core, Structured Streaming >Affects Versions: 2.3.1 >Reporter: Karthik Palaniappan >Priority: Minor > > For batch jobs, dynamic allocation is very useful for adding and removing > containers to match the actual workload. On multi-tenant clusters, it ensures > that a Spark job is taking no more resources than necessary. In cloud > environments, it enables autoscaling. > However, if you set spark.dynamicAllocation.enabled=true and run a structured > streaming job, the batch dynamic allocation algorithm kicks in. It requests > more executors if the task backlog is a certain size, and removes executors > if they idle for a certain period of time. > Quick thoughts: > 1) Dynamic allocation should be pluggable, rather than hardcoded to a > particular implementation in SparkContext.scala (this should be a separate > JIRA). > 2) We should make a structured streaming algorithm that's separate from the > batch algorithm. Eventually, continuous processing might need its own > algorithm. > 3) Spark should print a warning if you run a structured streaming job when > Core's dynamic allocation is enabled -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24815) Structured Streaming should support dynamic allocation
[ https://issues.apache.org/jira/browse/SPARK-24815?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17741448#comment-17741448 ] Pavan Kotikalapudi commented on SPARK-24815: Hi, I added some features to traditional DRA to work for structured streaming usecase based on the heuristics of trigger interval. It has been working as expected for the last few months in my company. I would like to contribute back to the community, Should I directly send a pull request for review or a document explaining how the dra is modified for structured streaming usecase? > Structured Streaming should support dynamic allocation > -- > > Key: SPARK-24815 > URL: https://issues.apache.org/jira/browse/SPARK-24815 > Project: Spark > Issue Type: Improvement > Components: Scheduler, Spark Core, Structured Streaming >Affects Versions: 2.3.1 >Reporter: Karthik Palaniappan >Priority: Minor > > For batch jobs, dynamic allocation is very useful for adding and removing > containers to match the actual workload. On multi-tenant clusters, it ensures > that a Spark job is taking no more resources than necessary. In cloud > environments, it enables autoscaling. > However, if you set spark.dynamicAllocation.enabled=true and run a structured > streaming job, the batch dynamic allocation algorithm kicks in. It requests > more executors if the task backlog is a certain size, and removes executors > if they idle for a certain period of time. > Quick thoughts: > 1) Dynamic allocation should be pluggable, rather than hardcoded to a > particular implementation in SparkContext.scala (this should be a separate > JIRA). > 2) We should make a structured streaming algorithm that's separate from the > batch algorithm. Eventually, continuous processing might need its own > algorithm. > 3) Spark should print a warning if you run a structured streaming job when > Core's dynamic allocation is enabled -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24815) Structured Streaming should support dynamic allocation
[ https://issues.apache.org/jira/browse/SPARK-24815?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17564614#comment-17564614 ] Santokh Singh commented on SPARK-24815: --- Pretty much interested in this feature. With {{mapGroupsWithState}} api in structured streaming, or generic state management and sharing state across executors, would externalizing state help? I am aware rocksDB being one way. > Structured Streaming should support dynamic allocation > -- > > Key: SPARK-24815 > URL: https://issues.apache.org/jira/browse/SPARK-24815 > Project: Spark > Issue Type: Improvement > Components: Scheduler, Spark Core, Structured Streaming >Affects Versions: 2.3.1 >Reporter: Karthik Palaniappan >Priority: Minor > > For batch jobs, dynamic allocation is very useful for adding and removing > containers to match the actual workload. On multi-tenant clusters, it ensures > that a Spark job is taking no more resources than necessary. In cloud > environments, it enables autoscaling. > However, if you set spark.dynamicAllocation.enabled=true and run a structured > streaming job, the batch dynamic allocation algorithm kicks in. It requests > more executors if the task backlog is a certain size, and removes executors > if they idle for a certain period of time. > Quick thoughts: > 1) Dynamic allocation should be pluggable, rather than hardcoded to a > particular implementation in SparkContext.scala (this should be a separate > JIRA). > 2) We should make a structured streaming algorithm that's separate from the > batch algorithm. Eventually, continuous processing might need its own > algorithm. > 3) Spark should print a warning if you run a structured streaming job when > Core's dynamic allocation is enabled -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24815) Structured Streaming should support dynamic allocation
[ https://issues.apache.org/jira/browse/SPARK-24815?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17553867#comment-17553867 ] Ramiz Mehran commented on SPARK-24815: -- Guys, is this thread still alive? I think SSS for structure-streaming should be taken from spark-streaming itself. The logic of "processing/batch duration ratio" makes sense and removes any other dependency from the calculation. Also, there should be a moving average to calculate and this moving average batch count can be configurable. > Structured Streaming should support dynamic allocation > -- > > Key: SPARK-24815 > URL: https://issues.apache.org/jira/browse/SPARK-24815 > Project: Spark > Issue Type: Improvement > Components: Scheduler, Spark Core, Structured Streaming >Affects Versions: 2.3.1 >Reporter: Karthik Palaniappan >Priority: Minor > > For batch jobs, dynamic allocation is very useful for adding and removing > containers to match the actual workload. On multi-tenant clusters, it ensures > that a Spark job is taking no more resources than necessary. In cloud > environments, it enables autoscaling. > However, if you set spark.dynamicAllocation.enabled=true and run a structured > streaming job, the batch dynamic allocation algorithm kicks in. It requests > more executors if the task backlog is a certain size, and removes executors > if they idle for a certain period of time. > Quick thoughts: > 1) Dynamic allocation should be pluggable, rather than hardcoded to a > particular implementation in SparkContext.scala (this should be a separate > JIRA). > 2) We should make a structured streaming algorithm that's separate from the > batch algorithm. Eventually, continuous processing might need its own > algorithm. > 3) Spark should print a warning if you run a structured streaming job when > Core's dynamic allocation is enabled -- This message was sent by Atlassian Jira (v8.20.7#820007) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24815) Structured Streaming should support dynamic allocation
[ https://issues.apache.org/jira/browse/SPARK-24815?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17394265#comment-17394265 ] Holden Karau commented on SPARK-24815: -- cc [~tdas] for thoughts? > Structured Streaming should support dynamic allocation > -- > > Key: SPARK-24815 > URL: https://issues.apache.org/jira/browse/SPARK-24815 > Project: Spark > Issue Type: Improvement > Components: Scheduler, Spark Core, Structured Streaming >Affects Versions: 2.3.1 >Reporter: Karthik Palaniappan >Priority: Minor > > For batch jobs, dynamic allocation is very useful for adding and removing > containers to match the actual workload. On multi-tenant clusters, it ensures > that a Spark job is taking no more resources than necessary. In cloud > environments, it enables autoscaling. > However, if you set spark.dynamicAllocation.enabled=true and run a structured > streaming job, the batch dynamic allocation algorithm kicks in. It requests > more executors if the task backlog is a certain size, and removes executors > if they idle for a certain period of time. > Quick thoughts: > 1) Dynamic allocation should be pluggable, rather than hardcoded to a > particular implementation in SparkContext.scala (this should be a separate > JIRA). > 2) We should make a structured streaming algorithm that's separate from the > batch algorithm. Eventually, continuous processing might need its own > algorithm. > 3) Spark should print a warning if you run a structured streaming job when > Core's dynamic allocation is enabled -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24815) Structured Streaming should support dynamic allocation
[ https://issues.apache.org/jira/browse/SPARK-24815?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17263876#comment-17263876 ] Rajat Goel commented on SPARK-24815: Is there any updates/progress on this feature especially for cases where Kafka is input source? > Structured Streaming should support dynamic allocation > -- > > Key: SPARK-24815 > URL: https://issues.apache.org/jira/browse/SPARK-24815 > Project: Spark > Issue Type: Improvement > Components: Scheduler, Spark Core, Structured Streaming >Affects Versions: 2.3.1 >Reporter: Karthik Palaniappan >Priority: Minor > > For batch jobs, dynamic allocation is very useful for adding and removing > containers to match the actual workload. On multi-tenant clusters, it ensures > that a Spark job is taking no more resources than necessary. In cloud > environments, it enables autoscaling. > However, if you set spark.dynamicAllocation.enabled=true and run a structured > streaming job, the batch dynamic allocation algorithm kicks in. It requests > more executors if the task backlog is a certain size, and removes executors > if they idle for a certain period of time. > Quick thoughts: > 1) Dynamic allocation should be pluggable, rather than hardcoded to a > particular implementation in SparkContext.scala (this should be a separate > JIRA). > 2) We should make a structured streaming algorithm that's separate from the > batch algorithm. Eventually, continuous processing might need its own > algorithm. > 3) Spark should print a warning if you run a structured streaming job when > Core's dynamic allocation is enabled -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24815) Structured Streaming should support dynamic allocation
[ https://issues.apache.org/jira/browse/SPARK-24815?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17160820#comment-17160820 ] Gerard Alexander commented on SPARK-24815: -- Thanks, but I get that; but it is hard to follow here. That article states SSS is being run with dynamic resources. But if I click thru here and read the comments I get the impression there in no batch algorithm support either for SSS. What is the status? > Structured Streaming should support dynamic allocation > -- > > Key: SPARK-24815 > URL: https://issues.apache.org/jira/browse/SPARK-24815 > Project: Spark > Issue Type: Improvement > Components: Scheduler, Spark Core, Structured Streaming >Affects Versions: 2.3.1 >Reporter: Karthik Palaniappan >Priority: Minor > > For batch jobs, dynamic allocation is very useful for adding and removing > containers to match the actual workload. On multi-tenant clusters, it ensures > that a Spark job is taking no more resources than necessary. In cloud > environments, it enables autoscaling. > However, if you set spark.dynamicAllocation.enabled=true and run a structured > streaming job, the batch dynamic allocation algorithm kicks in. It requests > more executors if the task backlog is a certain size, and removes executors > if they idle for a certain period of time. > Quick thoughts: > 1) Dynamic allocation should be pluggable, rather than hardcoded to a > particular implementation in SparkContext.scala (this should be a separate > JIRA). > 2) We should make a structured streaming algorithm that's separate from the > batch algorithm. Eventually, continuous processing might need its own > algorithm. > 3) Spark should print a warning if you run a structured streaming job when > Core's dynamic allocation is enabled -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24815) Structured Streaming should support dynamic allocation
[ https://issues.apache.org/jira/browse/SPARK-24815?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17160818#comment-17160818 ] Jungtaek Lim commented on SPARK-24815: -- I don't think the goal of the issue is only about totally idle vs query running. The goal is to reduce down the necessary resource if there's less input data being ingested and bring up more if there's more input data being ingested, smoothly. If the input data is ingested from the real product most probably it should have peak time for the day or expected/unexpected peak on some events. > Structured Streaming should support dynamic allocation > -- > > Key: SPARK-24815 > URL: https://issues.apache.org/jira/browse/SPARK-24815 > Project: Spark > Issue Type: Improvement > Components: Scheduler, Spark Core, Structured Streaming >Affects Versions: 2.3.1 >Reporter: Karthik Palaniappan >Priority: Minor > > For batch jobs, dynamic allocation is very useful for adding and removing > containers to match the actual workload. On multi-tenant clusters, it ensures > that a Spark job is taking no more resources than necessary. In cloud > environments, it enables autoscaling. > However, if you set spark.dynamicAllocation.enabled=true and run a structured > streaming job, the batch dynamic allocation algorithm kicks in. It requests > more executors if the task backlog is a certain size, and removes executors > if they idle for a certain period of time. > Quick thoughts: > 1) Dynamic allocation should be pluggable, rather than hardcoded to a > particular implementation in SparkContext.scala (this should be a separate > JIRA). > 2) We should make a structured streaming algorithm that's separate from the > batch algorithm. Eventually, continuous processing might need its own > algorithm. > 3) Spark should print a warning if you run a structured streaming job when > Core's dynamic allocation is enabled -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24815) Structured Streaming should support dynamic allocation
[ https://issues.apache.org/jira/browse/SPARK-24815?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17160816#comment-17160816 ] Gerard Alexander commented on SPARK-24815: -- When I read this, I am not sure what is being said. This [https://dzone.com/articles/spark-dynamic-allocation] states and shows that Spark Structured Streaming does this capability of relinquishing or getting new resources up till maximum setting that has been set with the job. Can you clarify please? The issue seems to be that the batch approach is applied to a non-batch situation. > Structured Streaming should support dynamic allocation > -- > > Key: SPARK-24815 > URL: https://issues.apache.org/jira/browse/SPARK-24815 > Project: Spark > Issue Type: Improvement > Components: Scheduler, Spark Core, Structured Streaming >Affects Versions: 2.3.1 >Reporter: Karthik Palaniappan >Priority: Minor > > For batch jobs, dynamic allocation is very useful for adding and removing > containers to match the actual workload. On multi-tenant clusters, it ensures > that a Spark job is taking no more resources than necessary. In cloud > environments, it enables autoscaling. > However, if you set spark.dynamicAllocation.enabled=true and run a structured > streaming job, the batch dynamic allocation algorithm kicks in. It requests > more executors if the task backlog is a certain size, and removes executors > if they idle for a certain period of time. > Quick thoughts: > 1) Dynamic allocation should be pluggable, rather than hardcoded to a > particular implementation in SparkContext.scala (this should be a separate > JIRA). > 2) We should make a structured streaming algorithm that's separate from the > batch algorithm. Eventually, continuous processing might need its own > algorithm. > 3) Spark should print a warning if you run a structured streaming job when > Core's dynamic allocation is enabled -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24815) Structured Streaming should support dynamic allocation
[ https://issues.apache.org/jira/browse/SPARK-24815?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16853955#comment-16853955 ] Stavros Kontopoulos commented on SPARK-24815: - Yes its 1-1. What I am trying to say is that if you have N Spark tasks/partitions processed in parallel and you want to do dynamic re-partitioning (Spark side) because these N tasks got "fatter" you need the processing/batch_duration ratio metric, as your backlog task number is zero anyway, and you have no idea otherwise what is happening (unless you count the number of offsets/partition). After re-partitioning is done at Spark side you can fallback to the backlog metric (as now you will have tasks queued), but no reason, you could just keep the processing/batch_duration ratio. Regarding references there was a discussion last year: [http://apache-spark-developers-list.1001551.n3.nabble.com/Discussion-Clarification-regarding-Stateful-Aggregations-over-Structured-Streaming-td25941.html|http://apache-spark-developers-list.1001551.n3.nabble.com/Discussion-Clarification-regarding-Stateful-Aggregations-over-Structured-Streaming-td25941.html#a25942] There is this project that shows how the APIs can be extended: [https://github.com/chermenin/spark-states/tree/master/src/main/scala/ru/chermenin/spark/sql/execution/streaming/state.] In general the source of truth is the code as you know. The related Jira for the state backend is here: https://issues.apache.org/jira/browse/SPARK-13809 and related doc is here: https://docs.google.com/document/d/1-ncawFx8JS5Zyfq1HAEGBx56RDet9wfVp_hDM8ZL254 > Structured Streaming should support dynamic allocation > -- > > Key: SPARK-24815 > URL: https://issues.apache.org/jira/browse/SPARK-24815 > Project: Spark > Issue Type: Improvement > Components: Scheduler, Structured Streaming >Affects Versions: 2.3.1 >Reporter: Karthik Palaniappan >Priority: Minor > > For batch jobs, dynamic allocation is very useful for adding and removing > containers to match the actual workload. On multi-tenant clusters, it ensures > that a Spark job is taking no more resources than necessary. In cloud > environments, it enables autoscaling. > However, if you set spark.dynamicAllocation.enabled=true and run a structured > streaming job, the batch dynamic allocation algorithm kicks in. It requests > more executors if the task backlog is a certain size, and removes executors > if they idle for a certain period of time. > Quick thoughts: > 1) Dynamic allocation should be pluggable, rather than hardcoded to a > particular implementation in SparkContext.scala (this should be a separate > JIRA). > 2) We should make a structured streaming algorithm that's separate from the > batch algorithm. Eventually, continuous processing might need its own > algorithm. > 3) Spark should print a warning if you run a structured streaming job when > Core's dynamic allocation is enabled -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24815) Structured Streaming should support dynamic allocation
[ https://issues.apache.org/jira/browse/SPARK-24815?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16853745#comment-16853745 ] Karthik Palaniappan commented on SPARK-24815: - Just to clarify: does Spark allow having multiple tasks per Kafka partition? This doc implies that they are 1:1: [https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html]. Batch dynamic allocation gives you enough executors to process all tasks in running stage in parallel. I assume you end up with too much data per partition, your only recourse would be to increase the number of kafka partitions. Also, does Spark not handle state rebalancing today? In other words, is SS already fault tolerant to node failures? Do you have any good references (JIRAs, design docs) on how Spark stores streaming state internally? I see plenty of blogs and articles on how to do stateful processing, but not on how it works under the hood. > Structured Streaming should support dynamic allocation > -- > > Key: SPARK-24815 > URL: https://issues.apache.org/jira/browse/SPARK-24815 > Project: Spark > Issue Type: Improvement > Components: Scheduler, Structured Streaming >Affects Versions: 2.3.1 >Reporter: Karthik Palaniappan >Priority: Minor > > For batch jobs, dynamic allocation is very useful for adding and removing > containers to match the actual workload. On multi-tenant clusters, it ensures > that a Spark job is taking no more resources than necessary. In cloud > environments, it enables autoscaling. > However, if you set spark.dynamicAllocation.enabled=true and run a structured > streaming job, the batch dynamic allocation algorithm kicks in. It requests > more executors if the task backlog is a certain size, and removes executors > if they idle for a certain period of time. > Quick thoughts: > 1) Dynamic allocation should be pluggable, rather than hardcoded to a > particular implementation in SparkContext.scala (this should be a separate > JIRA). > 2) We should make a structured streaming algorithm that's separate from the > batch algorithm. Eventually, continuous processing might need its own > algorithm. > 3) Spark should print a warning if you run a structured streaming job when > Core's dynamic allocation is enabled -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24815) Structured Streaming should support dynamic allocation
[ https://issues.apache.org/jira/browse/SPARK-24815?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16852874#comment-16852874 ] Stavros Kontopoulos commented on SPARK-24815: - @[~Karthik Palaniappan] I can help with the design. Btw there is a refactoring happen in here: [https://github.com/apache/spark/pull/24704] My concern with batch mode dynamic allocation is task list may not tell the whole story, what if the number of tasks stays the same and load changes per task/partition eg. Kafka source? As for state I think you need to rebalance it which translates to dynamic re-partitioning for the micro-batch mode in structured streaming. For continuous streaming it is harder I think, but maybe a unified approach could solve it for both batch and continuous streaming as in Flink, https://flink.apache.org/features/2017/07/04/flink-rescalable-state.html. > Structured Streaming should support dynamic allocation > -- > > Key: SPARK-24815 > URL: https://issues.apache.org/jira/browse/SPARK-24815 > Project: Spark > Issue Type: Improvement > Components: Scheduler, Structured Streaming >Affects Versions: 2.3.1 >Reporter: Karthik Palaniappan >Priority: Minor > > For batch jobs, dynamic allocation is very useful for adding and removing > containers to match the actual workload. On multi-tenant clusters, it ensures > that a Spark job is taking no more resources than necessary. In cloud > environments, it enables autoscaling. > However, if you set spark.dynamicAllocation.enabled=true and run a structured > streaming job, the batch dynamic allocation algorithm kicks in. It requests > more executors if the task backlog is a certain size, and removes executors > if they idle for a certain period of time. > Quick thoughts: > 1) Dynamic allocation should be pluggable, rather than hardcoded to a > particular implementation in SparkContext.scala (this should be a separate > JIRA). > 2) We should make a structured streaming algorithm that's separate from the > batch algorithm. Eventually, continuous processing might need its own > algorithm. > 3) Spark should print a warning if you run a structured streaming job when > Core's dynamic allocation is enabled -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24815) Structured Streaming should support dynamic allocation
[ https://issues.apache.org/jira/browse/SPARK-24815?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16852541#comment-16852541 ] Karthik Palaniappan commented on SPARK-24815: - I was starting to update the JIRA description with a problem statement, then realized I am unfamiliar with some of the challenges you guys mentioned in the comments, in particular how state is managed in structured streaming. I was imagining that processing rate was the correct heuristic, assuming the goal is to keep up with the input. Continuous processing seems to solve the separate case where you need ultra low latency. [~skonto] [~kabhwan] [~gsomogyi] if you guys help with a design, I'd be happy to help with the implementation, but for now I will drop this JIRA. > Structured Streaming should support dynamic allocation > -- > > Key: SPARK-24815 > URL: https://issues.apache.org/jira/browse/SPARK-24815 > Project: Spark > Issue Type: Improvement > Components: Scheduler, Structured Streaming >Affects Versions: 2.3.1 >Reporter: Karthik Palaniappan >Priority: Minor > > For batch jobs, dynamic allocation is very useful for adding and removing > containers to match the actual workload. On multi-tenant clusters, it ensures > that a Spark job is taking no more resources than necessary. In cloud > environments, it enables autoscaling. > However, if you set spark.dynamicAllocation.enabled=true and run a structured > streaming job, the batch dynamic allocation algorithm kicks in. It requests > more executors if the task backlog is a certain size, and removes executors > if they idle for a certain period of time. > Quick thoughts: > 1) Dynamic allocation should be pluggable, rather than hardcoded to a > particular implementation in SparkContext.scala (this should be a separate > JIRA). > 2) We should make a structured streaming algorithm that's separate from the > batch algorithm. Eventually, continuous processing might need its own > algorithm. > 3) Spark should print a warning if you run a structured streaming job when > Core's dynamic allocation is enabled -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24815) Structured Streaming should support dynamic allocation
[ https://issues.apache.org/jira/browse/SPARK-24815?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16850879#comment-16850879 ] Stavros Kontopoulos commented on SPARK-24815: - I agree with [~gsomogyi] on stating the problem first. For me state management and the heuristics applied are the major concerns. I described one case on dev list. > Structured Streaming should support dynamic allocation > -- > > Key: SPARK-24815 > URL: https://issues.apache.org/jira/browse/SPARK-24815 > Project: Spark > Issue Type: Improvement > Components: Scheduler, Structured Streaming >Affects Versions: 2.3.1 >Reporter: Karthik Palaniappan >Priority: Minor > > Dynamic allocation is very useful for adding and removing containers to match > the actual workload. On multi-tenant clusters, it ensures that a Spark job is > taking no more resources than necessary. In cloud environments, it enables > autoscaling. > However, if you set spark.dynamicAllocation.enabled=true and run a structured > streaming job, Core's dynamic allocation algorithm kicks in. It requests > executors if the task backlog is a certain size, and remove executors if they > idle for a certain period of time. > This does not make sense for streaming jobs, as outlined in > https://issues.apache.org/jira/browse/SPARK-12133, which introduced dynamic > allocation for the old streaming API. > First, Spark should print a warning if you run a structured streaming job > when Core's dynamic allocation is enabled > Second, structured streaming should have support for dynamic allocation. It > would be convenient if it were the same set of properties as Core's dynamic > allocation, but I don't have a strong opinion on that. > If somebody can give me pointers on how to add dynamic allocation support, > I'd be happy to take a stab. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24815) Structured Streaming should support dynamic allocation
[ https://issues.apache.org/jira/browse/SPARK-24815?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16850596#comment-16850596 ] Gabor Somogyi commented on SPARK-24815: --- {quote}If somebody can give me pointers on how to add dynamic allocation support, I'd be happy to take a stab.{quote} Keen on see what's the problem first because the mentioned 3 bullet points in https://issues.apache.org/jira/browse/SPARK-12133 are not true for Structured Streaming. > Structured Streaming should support dynamic allocation > -- > > Key: SPARK-24815 > URL: https://issues.apache.org/jira/browse/SPARK-24815 > Project: Spark > Issue Type: Improvement > Components: Scheduler, Structured Streaming >Affects Versions: 2.3.1 >Reporter: Karthik Palaniappan >Priority: Minor > > Dynamic allocation is very useful for adding and removing containers to match > the actual workload. On multi-tenant clusters, it ensures that a Spark job is > taking no more resources than necessary. In cloud environments, it enables > autoscaling. > However, if you set spark.dynamicAllocation.enabled=true and run a structured > streaming job, Core's dynamic allocation algorithm kicks in. It requests > executors if the task backlog is a certain size, and remove executors if they > idle for a certain period of time. > This does not make sense for streaming jobs, as outlined in > https://issues.apache.org/jira/browse/SPARK-12133, which introduced dynamic > allocation for the old streaming API. > First, Spark should print a warning if you run a structured streaming job > when Core's dynamic allocation is enabled > Second, structured streaming should have support for dynamic allocation. It > would be convenient if it were the same set of properties as Core's dynamic > allocation, but I don't have a strong opinion on that. > If somebody can give me pointers on how to add dynamic allocation support, > I'd be happy to take a stab. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24815) Structured Streaming should support dynamic allocation
[ https://issues.apache.org/jira/browse/SPARK-24815?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16850235#comment-16850235 ] Jungtaek Lim commented on SPARK-24815: -- I'm also interested on the design doc, as I'd like to see whether the fact is considered as well: there're some points which dynamic allocation could hurt the performance of streaming query. Executors running structured streaming query is stateful, at least for stateful queries, and query leveraging Kafka source. > Structured Streaming should support dynamic allocation > -- > > Key: SPARK-24815 > URL: https://issues.apache.org/jira/browse/SPARK-24815 > Project: Spark > Issue Type: Improvement > Components: Scheduler, Structured Streaming >Affects Versions: 2.3.1 >Reporter: Karthik Palaniappan >Priority: Minor > > Dynamic allocation is very useful for adding and removing containers to match > the actual workload. On multi-tenant clusters, it ensures that a Spark job is > taking no more resources than necessary. In cloud environments, it enables > autoscaling. > However, if you set spark.dynamicAllocation.enabled=true and run a structured > streaming job, Core's dynamic allocation algorithm kicks in. It requests > executors if the task backlog is a certain size, and remove executors if they > idle for a certain period of time. > This does not make sense for streaming jobs, as outlined in > https://issues.apache.org/jira/browse/SPARK-12133, which introduced dynamic > allocation for the old streaming API. > First, Spark should print a warning if you run a structured streaming job > when Core's dynamic allocation is enabled > Second, structured streaming should have support for dynamic allocation. It > would be convenient if it were the same set of properties as Core's dynamic > allocation, but I don't have a strong opinion on that. > If somebody can give me pointers on how to add dynamic allocation support, > I'd be happy to take a stab. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24815) Structured Streaming should support dynamic allocation
[ https://issues.apache.org/jira/browse/SPARK-24815?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16850137#comment-16850137 ] Stavros Kontopoulos commented on SPARK-24815: - That is great news [~Karthik Palaniappan], I was thinking the same but missed this ticket. So please make sure you add a section with current drawbacks of using the batch mode dynamic allocation and why this feature is needed. > Structured Streaming should support dynamic allocation > -- > > Key: SPARK-24815 > URL: https://issues.apache.org/jira/browse/SPARK-24815 > Project: Spark > Issue Type: Improvement > Components: Scheduler, Structured Streaming >Affects Versions: 2.3.1 >Reporter: Karthik Palaniappan >Priority: Minor > > Dynamic allocation is very useful for adding and removing containers to match > the actual workload. On multi-tenant clusters, it ensures that a Spark job is > taking no more resources than necessary. In cloud environments, it enables > autoscaling. > However, if you set spark.dynamicAllocation.enabled=true and run a structured > streaming job, Core's dynamic allocation algorithm kicks in. It requests > executors if the task backlog is a certain size, and remove executors if they > idle for a certain period of time. > This does not make sense for streaming jobs, as outlined in > https://issues.apache.org/jira/browse/SPARK-12133, which introduced dynamic > allocation for the old streaming API. > First, Spark should print a warning if you run a structured streaming job > when Core's dynamic allocation is enabled > Second, structured streaming should have support for dynamic allocation. It > would be convenient if it were the same set of properties as Core's dynamic > allocation, but I don't have a strong opinion on that. > If somebody can give me pointers on how to add dynamic allocation support, > I'd be happy to take a stab. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24815) Structured Streaming should support dynamic allocation
[ https://issues.apache.org/jira/browse/SPARK-24815?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16849968#comment-16849968 ] Karthik Palaniappan commented on SPARK-24815: - Since there's discussion about this FR on the dev list ([https://lists.apache.org/thread.html/c1854305794596d13c2c560762c0031c57a90519ddae51488736d314@%3Cdev.spark.apache.org%3E]), I'll prioritize working on it and attach a design doc in the next few weeks. > Structured Streaming should support dynamic allocation > -- > > Key: SPARK-24815 > URL: https://issues.apache.org/jira/browse/SPARK-24815 > Project: Spark > Issue Type: Improvement > Components: Scheduler, Structured Streaming >Affects Versions: 2.3.1 >Reporter: Karthik Palaniappan >Priority: Minor > > Dynamic allocation is very useful for adding and removing containers to match > the actual workload. On multi-tenant clusters, it ensures that a Spark job is > taking no more resources than necessary. In cloud environments, it enables > autoscaling. > However, if you set spark.dynamicAllocation.enabled=true and run a structured > streaming job, Core's dynamic allocation algorithm kicks in. It requests > executors if the task backlog is a certain size, and remove executors if they > idle for a certain period of time. > This does not make sense for streaming jobs, as outlined in > https://issues.apache.org/jira/browse/SPARK-12133, which introduced dynamic > allocation for the old streaming API. > First, Spark should print a warning if you run a structured streaming job > when Core's dynamic allocation is enabled > Second, structured streaming should have support for dynamic allocation. It > would be convenient if it were the same set of properties as Core's dynamic > allocation, but I don't have a strong opinion on that. > If somebody can give me pointers on how to add dynamic allocation support, > I'd be happy to take a stab. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org