You can probably tune writes to Elasticsearch by:
1. Increasing the number of partitions, so that you write smaller batches of rows to Elasticsearch
2. Using Elasticsearch’s bulk API
3. Scaling up the number of hot nodes on the Elasticsearch cluster to support writing in parallel
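As a rough illustration of points 1 and 2, here is a minimal sketch of sizing partitions and grouping rows into bulk request bodies under a byte budget. The function names (partitions_for, bulk_batches) and the index name are hypothetical, and the 10 MB default is only the rule-of-thumb figure from this thread, so treat it as a starting value to tune rather than a recommendation for your cluster.

```python
import json
import math
from typing import Dict, Iterable, Iterator, List

# Assumed starting point taken from the rule of thumb in this thread; tune it.
TARGET_BYTES = 10 * 1024 * 1024


def partitions_for(total_bytes: int, target_bytes: int = TARGET_BYTES) -> int:
    """Number of partitions so that each task writes roughly target_bytes."""
    return max(1, math.ceil(total_bytes / target_bytes))


def bulk_batches(
    rows: Iterable[Dict], index: str, max_bytes: int = TARGET_BYTES
) -> Iterator[str]:
    """Yield NDJSON bodies for the _bulk endpoint, each at most max_bytes.

    A single document larger than max_bytes is still emitted, alone.
    """
    action = json.dumps({"index": {"_index": index}})
    lines: List[str] = []
    size = 0
    for row in rows:
        # One action line plus one source line, each newline-terminated.
        entry = action + "\n" + json.dumps(row, default=str) + "\n"
        entry_size = len(entry.encode("utf-8"))
        if lines and size + entry_size > max_bytes:
            yield "".join(lines)
            lines, size = [], 0
        lines.append(entry)
        size += entry_size
    if lines:
        yield "".join(lines)
```

In a PySpark job, something like bulk_batches could run inside the function passed to foreachPartition, with each yielded body POSTed to the cluster's _bulk endpoint (Content-Type: application/x-ndjson) by whatever HTTP client you already use. The value from partitions_for could feed df.repartition before the write.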
You want to minimize long-running tasks, and not just to avoid the “thread dump”. A large number of shorter-running tasks is better than a small number of long-running tasks, because you can scale up your processing by throwing hardware at it. This is subject to the law of diminishing returns; i.e., at some point making your tasks smaller will start slowing you down. You need to find the sweet spot.

Generally for Elasticsearch, the sweet spot is that each task writes around 10MB of data using the bulk API. Writing 10MB of data per task should take on the order of a few seconds. You won’t get the dreaded thread dump if your tasks take only a few seconds.

From: Maksim Grinman <m...@resolute.ai>
Date: Thursday, February 10, 2022 at 7:21 PM
To: "Lalwani, Jayesh" <jlalw...@amazon.com>
Cc: Mich Talebzadeh <mich.talebza...@gmail.com>, Holden Karau <hol...@pigscanfly.ca>, Sean Owen <sro...@gmail.com>, "user @spark" <user@spark.apache.org>
Subject: RE: [EXTERNAL] Spark 3.1.2 full thread dumps

CAUTION: This email originated from outside of the organization. Do not click links or open attachments unless you can confirm the sender and know the content is safe.

That's fair, but I do get the same thread dump at the last step of the Spark job, where we write the final dataframe into an Elasticsearch index. It's a df.rdd.map(lambda r: r.asDict(True)).foreachPartition operation which takes a while, and we usually get a thread dump during that as well.

On Mon, Feb 7, 2022 at 11:24 AM Lalwani, Jayesh <jlalw...@amazon.com> wrote:

Probably not the answer you are looking for, but the best thing to do is to avoid making Spark code sleep. Is there a way you can predict how big your autoscaling group needs to be without looking at all the data? Are you using a fixed number of Spark executors, or do you have some way of scaling your executors? I am guessing that the size of your autoscaling group is proportional to the number of Spark executors.
You can probably measure how many executors each node can support. Then you can tie the size of your autoscaling group to the number of executors.

Alternatively, you can build your service so that it a) autoscales as load increases and b) throttles requests when the load is higher than it can currently manage. This means that when Spark executors start hitting your nodes, your service will throttle many of the requests and start autoscaling up. Note that this is an established pattern in the cloud; it is how most services on AWS work. The end result is that there will initially be higher latency due to cold start, but the system will catch up eventually.

From: Maksim Grinman <m...@resolute.ai>
Date: Friday, February 4, 2022 at 9:35 PM
To: Mich Talebzadeh <mich.talebza...@gmail.com>
Cc: Holden Karau <hol...@pigscanfly.ca>, Sean Owen <sro...@gmail.com>, "user @spark" <user@spark.apache.org>
Subject: RE: [EXTERNAL] Spark 3.1.2 full thread dumps

Not that this discussion is not interesting (it is), but this has strayed pretty far from my original question, which was: how do I prevent Spark from dumping huge Java full thread dumps when an executor appears to not be doing anything (in my case, there's a loop where it sleeps waiting for a service to come up)? The service happens to be set up using an auto-scaling group, a coincidental and unimportant detail that seems to have derailed the conversation.
On Fri, Feb 4, 2022 at 7:18 PM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

OK, basically, do we have a scenario where Spark, or for that matter any cluster manager, can deploy a new node (after the loss of an existing node) with a view to running the failed tasks on the new executor(s) deployed on that newly spun-up node?

View my LinkedIn profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>

Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.

On Sat, 5 Feb 2022 at 00:00, Holden Karau <hol...@pigscanfly.ca> wrote:

We don’t block scaling up after node failure in classic Spark if that’s the question.

On Fri, Feb 4, 2022 at 6:30 PM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

From what I can see in the autoscaling setup, you will always need a minimum of two worker nodes as primary. It also states, and I quote, "Scaling primary workers is not recommended due to HDFS limitations which result in instability while scaling. These limitations do not exist for secondary workers". So the scaling comes from the secondary workers, by specifying the min and max instances. It also defaults to 2 minutes for the so-called autoscaling cooldown duration, hence the delay observed. I presume task allocation to the new executors is FIFO for new tasks.
This link <https://docs.qubole.com/en/latest/admin-guide/engine-admin/spark-admin/autoscale-spark.html#:~:text=dynamic%20allocation%20configurations.-,Autoscaling%20in%20Spark%20Clusters,scales%20down%20towards%20the%20minimum.&text=By%20default%2C%20Spark%20uses%20a%20static%20allocation%20of%20resources.> gives some explanation of autoscaling.

Handling Spot Node Loss and Spot Blocks in Spark Clusters

"When the Spark AM receives the spot loss (Spot Node Loss or Spot Blocks) notification from the RM, it notifies the Spark driver. The driver then performs the following actions:

1. Identifies all the executors affected by the upcoming node loss.
2. Moves all of the affected executors to a decommissioning state, and no new tasks are scheduled on these executors.
3. Kills all the executors after reaching 50% of the termination time.
4. Starts the failed tasks (if any) on other executors.
5. For these nodes, it removes all the entries of the shuffle data from the map output tracker on driver after reaching 90% of the termination time. This helps in preventing the shuffle-fetch failures due to spot loss.
6. Recomputes the shuffle data from the lost node by stage resubmission and at the time shuffles data of spot node if required."

So basically, as I read it, when a node fails, classic Spark comes into play: no new nodes are added (no rescaling) and tasks are redistributed among the existing executors?
On Fri, 4 Feb 2022 at 13:55, Sean Owen <sro...@gmail.com> wrote:

I have not seen stack traces under autoscaling, so I'm not even sure what the error in question is. There is always a delay in acquiring a whole new executor in the cloud, as it usually means a new VM is provisioned. Spark treats the new executor like any other, available for executing tasks.

On Fri, Feb 4, 2022 at 4:28 AM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

Thanks for the info. My concern has always been how Spark handles autoscaling (adding new executors) when the load pattern changes. I have tried to test this by setting the following parameters (Spark 3.1.2 on GCP):

    spark-submit --verbose \
      .......
      --conf spark.dynamicAllocation.enabled="true" \
      --conf spark.shuffle.service.enabled="true" \
      --conf spark.dynamicAllocation.minExecutors=2 \
      --conf spark.dynamicAllocation.maxExecutors=10 \
      --conf spark.dynamicAllocation.initialExecutors=4 \

It is not very clear to me how Spark distributes tasks on the added executors, or what the source of the delay is. As you have observed, there is a delay in adding new resources and allocating tasks. Is that process efficient?

Thanks

On Fri, 4 Feb 2022 at 03:04, Maksim Grinman <m...@resolute.ai> wrote:

It's actually on AWS EMR. The job bootstraps and runs fine -- the autoscaling group is to bring up a service that Spark will be calling.
Some code waits for the autoscaling group to come up before continuing processing in Spark, since the Spark cluster will need to make requests to the service in the autoscaling group. It takes several minutes for the service to come up, and during the wait, Spark starts to show these thread dumps, presumably because it thinks something is wrong since the executor is busy waiting and not doing anything. The previous version of Spark (2.4.4) did not do this.

On Thu, Feb 3, 2022 at 6:59 PM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

Sounds like you are running this on a Google Dataproc cluster (Spark 3.1.2) with an autoscaling policy? Can you describe whether this happens before Spark starts a new job on the cluster or somehow halfway through processing an existing job? Also, does the job involve Spark Structured Streaming?

HTH

On Thu, 3 Feb 2022 at 21:29, Maksim Grinman <m...@resolute.ai> wrote:

We've got a Spark task that, after some processing, starts an autoscaling group and waits for it to be up before continuing processing. While waiting for the autoscaling group, Spark starts throwing full thread dumps, presumably at the spark.executor.heartbeatInterval. Is there a way to prevent the thread dumps?
--
Maksim Grinman
VP Engineering
Resolute AI

--
Twitter: https://twitter.com/holdenkarau
Books (Learning Spark, High Performance Spark, etc.): https://amzn.to/2MaRAG9
YouTube Live Streams: https://www.youtube.com/user/holdenkarau