Re: Scheduling jobs using FAIR pool
Hi Hussein,

Thanks for clarifying my doubts.

So, whether I configure two separate pools for the two jobs or submit both jobs to the same pool, the submission time only comes into play when both jobs are actually running in parallel (i.e. if job 1 gets all the resources, job 2 has to wait unless pool 2 has been assigned a minimum number of executors).

However, separate pools (a small set of static, predefined pools is preferred over dynamic ones) give more control: jobs can be weighted when several of them compete for resources, and a minimum number of executors can be assigned to each pool.

Regards,
Varun Shah

On Mon, Apr 1, 2024, 18:50 Hussein Awala wrote:

> IMO the questions are not limited to Databricks.
>
> > The Round-Robin distribution of executors only works when there are
> > empty executors (achievable by enabling dynamic allocation). If the jobs
> > (part of the same pool) require all executors, the second job will still
> > need to wait.
>
> This feature in Spark allows for optimal resource utilization. Consider a
> scenario with two stages, each with 500 tasks (500 partitions), generated
> by two threads, and a total of 100 Spark executors available in the fair
> pool.
> The first thread may be instantiated microseconds ahead of the second,
> resulting in the fair scheduler allocating 100 tasks to the first stage
> initially. Once some of the tasks are complete, the scheduler dynamically
> redistributes resources, ultimately splitting the capacity equally between
> both stages. This works the same way if you have a single stage, but
> without splitting the capacity.
>
> Regarding the other three questions, dynamically creating pools may not be
> advisable due to several considerations (cleanup issues, mixing application
> and infrastructure management, plus a lot of unexpected issues).
>
> For scenarios involving stages with few long-running tasks like yours,
> it's recommended to enable dynamic allocation to let Spark add executors as
> needed.
>
> In the context of streaming workloads, streaming dynamic allocation is
> preferred to address specific issues detailed in SPARK-12133
> <https://issues.apache.org/jira/browse/SPARK-12133>. Although the
> configurations for this feature are not documented, they can be found in
> the source code
> <https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/internal/config/Streaming.scala>.
>
> But for Structured Streaming (your case), you should use the batch one
> (spark.dynamicAllocation.*), as SPARK-24815
> <https://issues.apache.org/jira/browse/SPARK-24815> is not ready yet (it
> was accepted and will be ready soon), but it has some issues in the
> downscale step; you can check the JIRA issue for more details.
>
> On Mon, Apr 1, 2024 at 2:07 PM Varun Shah wrote:
>
>> Hi Mich,
>>
>> I did not post in the databricks community, as most of the questions were
>> related to spark itself.
>>
>> But let me also post the question on databricks community.
>>
>> Thanks,
>> Varun Shah
>>
>> On Mon, Apr 1, 2024, 16:28 Mich Talebzadeh wrote:
>>
>>> Hi,
>>>
>>> Have you put this question to the Databricks forum?
>>>
>>> Data Engineering - Databricks
>>> <https://community.databricks.com/t5/data-engineering/bd-p/data-engineering>
>>>
>>> Mich Talebzadeh,
>>> Technologist | Solutions Architect | Data Engineer | Generative AI
>>> London
>>> United Kingdom
>>>
>>> view my Linkedin profile
>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>
>>> https://en.everybodywiki.com/Mich_Talebzadeh
>>>
>>> *Disclaimer:* The information provided is correct to the best of my
>>> knowledge but of course cannot be guaranteed. It is essential to note
>>> that, as with any advice, quote "one test result is worth one-thousand
>>> expert opinions" (Wernher von Braun
>>> <https://en.wikipedia.org/wiki/Wernher_von_Braun>).
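The redistribution Hussein describes can be illustrated with a toy model. The sketch below is not Spark's scheduler. It assumes both stages sit in a pool whose scheduling mode is FAIR with equal weight and no minShare, that a free slot always goes to the stage with the fewest running tasks, and that task durations cycle through 1, 2 and 3 ticks. The function name and everything except the 100 slots and 500 tasks per stage are made up for illustration.

```python
def simulate_fair_pool(total_slots=100, tasks_per_stage=500, ticks=6):
    """Toy model of two stages sharing task slots; returns (tick, running_A, running_B)."""
    pending = {"A": tasks_per_stage, "B": 0}   # stage B has not been submitted yet
    running = {"A": [], "B": []}               # remaining ticks of each running task
    launched = {"A": 0, "B": 0}
    history = []
    for tick in range(ticks):
        if tick == 1:
            pending["B"] = tasks_per_stage     # the second thread submits its stage
        # Tasks with one tick left have finished; the rest move one tick closer.
        for stage in running:
            running[stage] = [t - 1 for t in running[stage] if t > 1]
        free = total_slots - len(running["A"]) - len(running["B"])
        while free > 0:
            candidates = [s for s in ("A", "B") if pending[s] > 0]
            if not candidates:
                break
            # Assumed fairness rule: the stage with the fewest running tasks goes first.
            stage = min(candidates, key=lambda s: len(running[s]))
            running[stage].append(1 + launched[stage] % 3)  # assumed task duration
            launched[stage] += 1
            pending[stage] -= 1
            free -= 1
        history.append((tick, len(running["A"]), len(running["B"])))
    return history


history = simulate_fair_pool()
print(history[:3])  # [(0, 100, 0), (1, 66, 34), (2, 50, 50)]
```

In this model the first stage takes every slot at tick 0, and the split converges to 50/50 as soon as enough of its tasks have finished, which matches the behaviour described in the reply.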
Re: Scheduling jobs using FAIR pool
Hi Mich,

I did not post in the databricks community, as most of the questions were related to spark itself.

But let me also post the question on databricks community.

Thanks,
Varun Shah

On Mon, Apr 1, 2024, 16:28 Mich Talebzadeh wrote:

> Hi,
>
> Have you put this question to Databricks forum
>
> Data Engineering - Databricks
> <https://community.databricks.com/t5/data-engineering/bd-p/data-engineering>
>
> Mich Talebzadeh
Scheduling jobs using FAIR pool
Hi Community,

I am currently exploring the best use of "Scheduler Pools" for executing jobs in parallel, and need clarification and suggestions on a few points.

The implementation consists of executing "Structured Streaming" jobs on Databricks using AutoLoader. Each stream is executed with trigger = 'AvailableNow', so that the streams do not keep running against the source. (We have ~4000 such streams, with no continuous stream from the source, hence we do not keep the streams running indefinitely using other triggers.)

One way to achieve parallelism in the jobs is to use multithreading, with all threads using the same SparkContext, as quoted from the official docs: "Inside a given Spark application (SparkContext instance), multiple parallel jobs can run simultaneously if they were submitted from separate threads."

There is also the "FAIR Scheduler", which, unlike the default FIFO scheduler, assigns executors in Round-Robin fashion, ensuring that smaller jobs submitted later do not starve because bigger jobs submitted earlier are consuming all the resources.

Here are my questions:

1. The Round-Robin distribution of executors only works when there are empty executors (achievable by enabling dynamic allocation). If the jobs (part of the same pool) require all executors, the second job will still need to wait.
2. If we create dynamic pools for submitting each stream (by setting the Spark property "spark.scheduler.pool" to a dynamic value, as in spark.sparkContext.setLocalProperty("spark.scheduler.pool", "")), how does executor allocation happen? Since all the pools are created dynamically, they share equal weight. Does this work the same way as submitting the streams to a single pool under the FAIR scheduler?
3. The official docs say "inside each pool, jobs run in FIFO order." Is this also true for the FAIR scheduler? By definition it does not seem right, but it is confusing. The docs say "by default", so does that refer to the FIFO scheduler only, or is it the default for both scheduling types?
4. Is there any overhead for the Spark driver when creating / using a dynamically created pool vs. pre-defined pools?

Apart from these, any suggestions or ways you have implemented auto-scaling for such loads? We are currently trying to auto-scale the resources based on requests, but scaling down is an issue (already known, and an SPIP for it is under discussion, but it does not cater to submitting multiple streams in a single cluster).

Thanks for reading! Looking forward to your suggestions.

Regards,
Varun Shah
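For reference on questions 2 and 3: the Spark job scheduling docs describe an allocation file (pointed to by spark.scheduler.allocation.file) in which each pool gets a schedulingMode, a weight and a minShare, and they state that pools not listed in the file fall back to FIFO, weight 1 and minShare 0. A minimal sketch of generating such a file follows. The pool names and values are hypothetical, and the helper function is not part of Spark.

```python
import xml.etree.ElementTree as ET


def build_fair_scheduler_xml(pools):
    """Render an allocation file for the given {pool_name: settings} mapping."""
    root = ET.Element("allocations")
    for name, settings in pools.items():
        pool = ET.SubElement(root, "pool", name=name)
        # Defaults mirror the documented ones: FIFO inside the pool, weight 1, minShare 0.
        ET.SubElement(pool, "schedulingMode").text = settings.get("schedulingMode", "FIFO")
        ET.SubElement(pool, "weight").text = str(settings.get("weight", 1))
        ET.SubElement(pool, "minShare").text = str(settings.get("minShare", 0))
    if hasattr(ET, "indent"):  # pretty-printing helper, available from Python 3.9
        ET.indent(root)
    return ET.tostring(root, encoding="unicode")


# Hypothetical static pools: jobs inside each pool share resources fairly,
# and the second pool gets twice the weight plus a guaranteed minimum share.
example_pools = {
    "small_streams": {"schedulingMode": "FAIR", "weight": 1, "minShare": 2},
    "large_streams": {"schedulingMode": "FAIR", "weight": 2, "minShare": 4},
}
print(build_fair_scheduler_xml(example_pools))
```

Note that the docs express minShare as a number of CPU cores rather than executors, so the values above would need to be sized accordingly.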
Re: A proposal for creating a Knowledge Sharing Hub for Apache Spark Community
+1

Great initiative.

QQ: Stack Overflow has a similar feature called "Collectives", but I am not sure of the expense of creating one for Apache Spark. With SO being widely used (at least before ChatGPT became the norm for searching questions), it already has a lot of questions asked and answered by the community over time. If possible, we could therefore leverage it as the starting point for building a community before creating a completely new website from scratch.

Any thoughts on this?

Regards,
Varun Shah

On Mon, Mar 18, 2024, 16:29 Mich Talebzadeh wrote:

> Some of you may be aware that the Databricks community has just launched
> a knowledge sharing hub. I thought it would be a good idea for the Apache
> Spark user group to have the same, especially for repeat questions on
> Spark core, Spark SQL, Spark Structured Streaming, Spark MLlib and so
> forth.
>
> Apache Spark user and dev groups have been around for a good while.
> They are serving their purpose. We went through creating a Slack
> community that managed to create more heat than light. This is what the
> Databricks community came up with, and I quote:
>
> "Knowledge Sharing Hub
> Dive into a collaborative space where members like YOU can exchange
> knowledge, tips, and best practices. Join the conversation today and
> unlock a wealth of collective wisdom to enhance your experience and
> drive success."
>
> I don't know the logistics of setting it up, but I am sure that should
> not be that difficult. If anyone is supportive of this proposal, let
> the usual +1, 0, -1 decide.
>
> HTH
>
> Mich Talebzadeh,
> Dad | Technologist | Solutions Architect | Engineer
> London
> United Kingdom
>
> https://en.everybodywiki.com/Mich_Talebzadeh
Re: Pyspark Write Batch Streaming Data to Snowflake Fails with more columns
Hi Mich,

Thanks for the suggestions. I checked the documentation regarding data type issues and found that the cause was the different time zone settings used in Spark and Snowflake. Specifying the time zone in the Spark options while writing the data to Snowflake worked 😁

Documentation link: https://docs.snowflake.com/en/user-guide/spark-connector-use#working-with-timestamps-and-time-zones

Thank you once again for your help.

Regards,
Varun Shah

On Sat, Feb 10, 2024, 04:01 Mich Talebzadeh wrote:

> Hi Varun,
>
> I am no expert on Snowflake; however, the issue you are facing,
> particularly if it involves data trimming in a COPY statement and potential
> data mismatch, is likely related to how Snowflake handles data ingestion
> rather than being directly tied to PySpark. The COPY command in Snowflake
> is used to load data from external files (like those in s3) into Snowflake
> tables. Possible causes for data truncation or mismatch could include
> differences in data types, column lengths, or encoding between your source
> data and the Snowflake table schema. It could also be related to the way
> your PySpark application is formatting or providing data to Snowflake.
>
> Check these:
>
> - Schema Matching: Ensure that the data types, lengths, and encoding of
>   the columns in your Snowflake table match the corresponding columns in
>   your PySpark DataFrame.
> - Column Mapping: Explicitly map the columns in your PySpark DataFrame to
>   the corresponding columns in the Snowflake table during the write
>   operation. This can help avoid any implicit mappings that might be
>   causing issues.
>
> HTH
>
> Mich Talebzadeh,
> Dad | Technologist | Solutions Architect | Engineer
> London
> United Kingdom
>
> view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
> https://en.everybodywiki.com/Mich_Talebzadeh
>
> *Disclaimer:* Use it at your own risk.
> Any and all responsibility for any loss, damage or destruction of data or
> any other property which may arise from relying on this email's technical
> content is explicitly disclaimed. The author will in no case be liable for
> any monetary damages arising from such loss, damage or destruction.
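The reply above does not say which option was set. The linked Snowflake page documents an sfTimezone connector option (with values such as "spark", "snowflake", "sf_default" or an explicit time zone name), so a minimal sketch, assuming that is the option in question, could look like the following. The helper name and all connection values are placeholders.

```python
def with_snowflake_timezone(connection_options, timezone="spark"):
    """Return a copy of the connector options with the sfTimezone option set."""
    options = dict(connection_options)
    options["sfTimezone"] = timezone  # assumed to be the option used in the fix
    return options


# Placeholder connection settings; every value here is hypothetical.
base_options = {
    "sfURL": "<account>.snowflakecomputing.com",
    "sfUser": "<user>",
    "sfPassword": "<password>",
    "sfDatabase": "<database>",
    "sfSchema": "<schema>",
    "sfWarehouse": "<warehouse>",
}
sf_options = with_snowflake_timezone(base_options)

# Inside a foreachBatch handler the options would then be passed to the writer:
#   (batch_df.write.format("net.snowflake.spark.snowflake")
#        .options(**sf_options)
#        .option("dbtable", "<table>")
#        .mode("append")
#        .save())
```

Keeping the time zone in one shared options dictionary makes it harder for individual writers in the same application to drift apart.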
Pyspark Write Batch Streaming Data to Snowflake Fails with more columns
Hi Team,

We have implemented a PySpark Spark Streaming application on Databricks, where we read data from S3 and write to a Snowflake table using the Snowflake connector jars (net.snowflake:snowflake-jdbc v3.14.5 and net.snowflake:spark-snowflake v2.12:2.14.0-spark_3.3).

We are currently facing an issue: when we write a large number of columns, the data is trimmed in the COPY statement, and the write to Snowflake fails because of the resulting data mismatch.

We are using Databricks 11.3 LTS with Spark 3.3.0 and Scala 2.12.

Can you please help me resolve this issue? I tried searching online, but did not find any relevant articles.

Looking forward to hearing from you.

Regards,
Varun Shah
Re: Spark Scala SBT Local build fails
++ DEV community

On Mon, Jul 17, 2023 at 4:14 PM Varun Shah wrote:

> Resending this message with a proper Subject line
>
> Hi Spark Community,
>
> I am trying to set up my forked apache/spark project locally for my first
> open source contribution, by building and creating a package as mentioned
> here under Running Individual Tests
> <https://spark.apache.org/developer-tools.html#running-individual-tests>.
> Here are the steps I have followed:
>
>> ./build/sbt  # this opens an sbt console
>> test         # to execute all tests
>
> I am getting the following errors and the tests are failing. Even the
> compile / package sbt commands fail with the same errors.
>
>> [info] compiling 19 Java sources to forked/spark/common/network-shuffle/target/scala-2.12/test-classes ...
>> [info] compiling 330 Scala sources and 29 Java sources to forked/spark/core/target/scala-2.12/test-classes ...
>> [error] forked/spark/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala:21:0: There should at least one a single empty line separating groups 3rdParty and spark.
>> [error] forked/spark/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala:32:0: org.json4s.JsonAST.JValue should be in group 3rdParty, not spark.
>> [error] forked/spark/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala:33:0: org.json4s.JsonDSL._ should be in group 3rdParty, not spark.
>> [error] forked/spark/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala:34:0: org.json4s._ should be in group 3rdParty, not spark.
>> [error] forked/spark/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala:35:0: org.json4s.jackson.JsonMethods._ should be in group 3rdParty, not spark.
>> [error] forked/spark/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala:37:0: java.util.Locale should be in group java, not spark.
>> [error] forked/spark/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala:38:0: scala.util.control.NonFatal should be in group scala, not spark.
>> [error] forked/spark/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala:226: File line length exceeds 100 characters
>> [error] stack trace is suppressed; run last catalyst / scalaStyleOnCompile for the full output
>> [error] stack trace is suppressed; run last scalaStyleOnTest for the full output
>> [error] (catalyst / scalaStyleOnCompile) Failing because of negative scalastyle result
>> [error] (scalaStyleOnTest) Failing because of negative scalastyle result
>
> Can you please guide me if I am doing something wrong?
>
> Regards,
> Varun Shah
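The scalastyle errors above refer to four import groups: java, scala, 3rdParty and spark. As a rough sketch of what the checker appears to expect, the snippet below classifies the imports named in the errors and lists them group by group. The regular expressions are an assumption based on the group names (they are believed to resemble the import-order settings in Spark's scalastyle configuration, but that is not confirmed by this thread), and the helper names are hypothetical.

```python
import re

# Assumed group patterns, in the order the error messages imply.
GROUPS = [
    ("java", re.compile(r"javax?\..*")),
    ("scala", re.compile(r"scala\..*")),
    ("3rdParty", re.compile(r"(?!org\.apache\.spark\.).*")),
    ("spark", re.compile(r"org\.apache\.spark\..*")),
]


def import_group(name):
    """Return the first group whose pattern matches the whole import name."""
    for group, pattern in GROUPS:
        if pattern.fullmatch(name):
            return group
    raise ValueError(f"no group matches {name!r}")


def grouped_imports(names):
    """Order imports by group (keeping their relative order), with blank lines between groups."""
    lines = []
    for group, _ in GROUPS:
        members = [n for n in names if import_group(n) == group]
        if members:
            if lines:
                lines.append("")  # one empty line separates consecutive groups
            lines.extend(f"import {n}" for n in members)
    return lines


reported = [
    "org.json4s.JsonAST.JValue",
    "org.apache.spark.sql.internal.SQLConf",  # hypothetical Spark import for contrast
    "java.util.Locale",
    "scala.util.control.NonFatal",
]
print("\n".join(grouped_imports(reported)))
```

Under these assumed patterns, java.util.Locale and scala.util.control.NonFatal come before the json4s imports, and the org.apache.spark imports come last.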