Re: pyspark dataframe join with two different data type
You can use a combination of explode and distinct before joining.

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode

# Create a SparkSession
spark = SparkSession.builder \
    .appName("JoinExample") \
    .getOrCreate()

sc = spark.sparkContext
# Set the log level to ERROR to reduce verbosity
sc.setLogLevel("ERROR")

# Create DataFrame df
data = [
    ["a"],
    ["b"],
    ["d"],
]
column_names = ["column1"]
df = spark.createDataFrame(data, column_names)
print("df:")
df.show()

# Create DataFrame df_1
df_1 = spark.createDataFrame([(["a", "b", "c"],), ([],)], ['data'])
print("df_1:")
df_1.show()

# Explode the array column in df_1
exploded_df_1 = df_1.select(explode("data").alias("data"))

# Join with df using the exploded column
final_df = exploded_df_1.join(df, exploded_df_1.data == df.column1)

# Distinct to ensure only unique rows are returned from df_1
final_df = final_df.select("data").distinct()
print("Result:")
final_df.show()

Results in

df:
+-------+
|column1|
+-------+
|      a|
|      b|
|      d|
+-------+

df_1:
+---------+
|     data|
+---------+
|[a, b, c]|
|       []|
+---------+

Result:
+----+
|data|
+----+
|   a|
|   b|
+----+

HTH

Mich Talebzadeh,
Technologist | Architect | Data Engineer | Generative AI | FinCrime
London
United Kingdom

   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>

https://en.everybodywiki.com/Mich_Talebzadeh

*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed. It is essential to note that,
as with any advice, quote "one test result is worth one-thousand expert
opinions (Wernher von Braun
<https://en.wikipedia.org/wiki/Wernher_von_Braun>)".

On Tue, 14 May 2024 at 13:19, Karthick Nk wrote:

> Hi All,
>
> Could anyone have any idea or suggestion of any alternate way to achieve
> this scenario?
>
> Thanks.
>
> On Sat, May 11, 2024 at 6:55 AM Damien Hawes wrote:
>
>> Right now, with the structure of your data, it isn't possible.
>>
>> The rows aren't duplicates of each other. "a" and "b" both exist in the
>> array. So Spark is correctly performing the join. It looks like you need
>> to find another way to model this data to get what you want to achieve.
>>
>> Are the values of "a" and "b" related to each other in any way?
>>
>> - Damien
>>
>> On Fri, 10 May 2024 at 18:08, Karthick Nk wrote:
>>
>>> Hi Mich,
>>>
>>> Thanks for the solution, but I am getting duplicate results by using
>>> array_contains. I have explained the scenario below; could you help me
>>> with how we can achieve this? I have tried different ways but could not
>>> achieve it.
>>>
>>> For example
>>>
>>> data = [
>>>     ["a"],
>>>     ["b"],
>>>     ["d"],
>>> ]
>>> column_names = ["column1"]
>>> df = spark.createDataFrame(data, column_names)
>>> df.display()
>>>
>>> [image: image.png]
>>>
>>> df_1 = spark.createDataFrame([(["a", "b", "c"],), ([],)], ['data'])
>>> df_1.display()
>>> [image: image.png]
>>>
>>> final_df = df_1.join(df, expr("array_contains(data, column1)"))
>>> final_df.display()
>>>
>>> Result:
>>> [image: image.png]
>>>
>>> But I need the result like below:
>>>
>>> [image: image.png]
>>>
>>> Why? Because in df_1 I have only two records, and only the first record
>>> has matching values. Since both records from df, i.e. *a, b*, are present
>>> in that first record, the join returns two records as the result, but my
>>> expectation is to return only one record: if any of the records from df
>>> is present in a record of df_1, it should return only that one record
>>> from df_1.
>>>
>>> Note:
>>> 1. Here we are able to filter out the duplicate records by using
>>> distinct on the ID field in the resultant df, but I am thinking that
>>> wouldn't be an effective way; rather, I am thinking of handling it in
>>> the array_contains step itself.
>>>
>>> Thanks.
>>>
>>> On Fri, Mar 1, 2024 at 4:11 AM Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
>>>> This is w
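A follow-up note on the expected output above: if the goal is to return each matching row of df_1 at most once (rather than the distinct matched values), a left semi join on the same array_contains condition may be worth trying. This is a minimal sketch only, reusing the toy DataFrames from the example above:

from pyspark.sql import SparkSession
from pyspark.sql.functions import expr

spark = SparkSession.builder.appName("SemiJoinExample").getOrCreate()

df = spark.createDataFrame([["a"], ["b"], ["d"]], ["column1"])
df_1 = spark.createDataFrame([(["a", "b", "c"],), ([],)], ["data"])

# A left semi join keeps a df_1 row as soon as any element of its array
# matches a value in df, and it never duplicates that row.
result = df_1.join(df, expr("array_contains(data, column1)"), "left_semi")
result.show(truncate=False)

# Expected output (one row, no duplicates):
# +---------+
# |data     |
# +---------+
# |[a, b, c]|
# +---------+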
Re: [Spark Streaming]: Save the records that are dropped by watermarking in spark structured streaming
You may consider:

- Increase Watermark Retention: Consider increasing the watermark retention
duration. This allows keeping records for a longer period before dropping
them. However, this might increase processing latency and violate
at-least-once semantics if the watermark lags behind real time.

OR

- Use a separate stream for dropped records: Create a separate streaming
pipeline to process the dropped records.

Try:

- Filter: Filter out records older than the watermark in the main pipeline,
say

resultC = streamingDataFrame.select( \
       col("parsed_value.rowkey").alias("rowkey") \
     , col("parsed_value.timestamp").alias("timestamp") \
     , col("parsed_value.temperature").alias("temperature"))

"""
We work out the window and the AVG(temperature) in the window's timeframe
below. This should return the following DataFrame as struct:

root
 |-- window: struct (nullable = false)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- avg(temperature): double (nullable = true)
"""
resultM = resultC. \
          withWatermark("timestamp", "5 minutes"). \
          groupBy(window(resultC.timestamp, "5 minutes", "5 minutes")). \
          avg('temperature')

- Write to Sink: Write the filtered records (dropped records) to a separate
Kafka topic.

- Consume and Store: Consume the dropped-records topic with another
streaming job and store the records in a Postgres table or S3 using an
appropriate connector library.

HTH

Mich Talebzadeh,
Technologist | Architect | Data Engineer | Generative AI | FinCrime
London
United Kingdom

   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>

https://en.everybodywiki.com/Mich_Talebzadeh

*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed. It is essential to note that,
as with any advice, quote "one test result is worth one-thousand expert
opinions (Wernher von Braun
<https://en.wikipedia.org/wiki/Wernher_von_Braun>)".

On Wed, 8 May 2024 at 05:13, Nandha Kumar wrote:

> Hi Team,
>    We are trying to use *spark structured streaming* for our use case.
> We will be joining 2 streaming sources (from kafka topics) with
> watermarks. As time progresses, the records that are prior to the
> watermark timestamp are removed from the state. For our use case, we want
> to *store these dropped records* in some postgres table or s3.
>
> When searching, we found a similar question
> <https://stackoverflow.com/questions/60418632/how-to-save-the-records-that-are-dropped-by-watermarking-in-spark-structured-str>
> in StackOverflow which is unanswered.
> *We would like to know how to store these dropped records due to the
> watermark.*
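To make the "Filter" step above concrete: Spark does not expose the current watermark to the running query, so a side channel for soon-to-be-dropped records can only approximate it, for example with processing time minus the watermark delay. A minimal sketch under that assumption; the streamingDataFrame, the 5-minute delay, the broker address and the late_records topic are all illustrative, not taken from the original question:

from pyspark.sql import functions as F

# Approximate the watermark as "now minus the watermark delay".
late_cutoff = F.current_timestamp() - F.expr("INTERVAL 5 MINUTES")

main_stream = streamingDataFrame.filter(F.col("timestamp") >= late_cutoff)
late_stream = streamingDataFrame.filter(F.col("timestamp") < late_cutoff)

# Write the records that would likely be dropped by the watermark to a
# separate Kafka topic, to be consumed and persisted by another job.
late_query = (late_stream
    .select(F.to_json(F.struct("*")).alias("value"))
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", "late_records")
    .option("checkpointLocation", "/tmp/checkpoints/late_records")
    .start())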
Re: ********Spark streaming issue to Elastic data**********
Hi Kartrick, Unfortunately Materialised views are not available in Spark as yet. I raised Jira [SPARK-48117] Spark Materialized Views: Improve Query Performance and Data Management - ASF JIRA (apache.org) <https://issues.apache.org/jira/browse/SPARK-48117> as a feature request. Let me think about another way and revert HTH Mich Talebzadeh, Technologist | Architect | Data Engineer | Generative AI | FinCrime London United Kingdom view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> https://en.everybodywiki.com/Mich_Talebzadeh *Disclaimer:* The information provided is correct to the best of my knowledge but of course cannot be guaranteed . It is essential to note that, as with any advice, quote "one test result is worth one-thousand expert opinions (Werner <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>)". On Mon, 6 May 2024 at 07:54, Karthick Nk wrote: > Thanks Mich, > > can you please confirm me is my understanding correct? > > First, we have to create the materialized view based on the mapping > details we have by using multiple tables as source(since we have multiple > join condition from different tables). From the materialised view we can > stream the view data into elastic index by using cdc? > > Thanks in advance. > > On Fri, May 3, 2024 at 3:39 PM Mich Talebzadeh > wrote: > >> My recommendation! is using materialized views (MVs) created in Hive with >> Spark Structured Streaming and Change Data Capture (CDC) is a good >> combination for efficiently streaming view data updates in your scenario. >> >> HTH >> >> Mich Talebzadeh, >> Technologist | Architect | Data Engineer | Generative AI | FinCrime >> London >> United Kingdom >> >> >>view my Linkedin profile >> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> >> >> >> https://en.everybodywiki.com/Mich_Talebzadeh >> >> >> >> *Disclaimer:* The information provided is correct to the best of my >> knowledge but of course cannot be guaranteed . It is essential to note >> that, as with any advice, quote "one test result is worth one-thousand >> expert opinions (Werner >> <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von Braun >> <https://en.wikipedia.org/wiki/Wernher_von_Braun>)". >> >> >> On Thu, 2 May 2024 at 21:25, Karthick Nk wrote: >> >>> Hi All, >>> >>> Requirements: >>> I am working on the data flow, which will use the view definition(view >>> definition already defined in schema), there are multiple tables used in >>> the view definition. Here we want to stream the view data into elastic >>> index based on if any of the table(used in the view definition) data got >>> changed. >>> >>> >>> Current flow: >>> 1. we are inserting id's from the table(which used in the view >>> definition) into the common table. >>> 2. From the common table by using the id, we will be streaming the view >>> data (by using if any of the incomming id is present in the collective id >>> of all tables used from view definition) by using spark structured >>> streaming. >>> >>> >>> Issue: >>> 1. Here we are facing issue - For each incomming id here we running view >>> definition(so it will read all the data from all the data) and check if any >>> of the incomming id is present in the collective id's of view result, Due >>> to which it is taking more memory in the cluster driver and taking more >>> time to process. 
>>>
>>> I am expecting an alternate solution where we can avoid the full scan
>>> of the view definition every time. If you have any alternate design
>>> flow for how we can achieve the result, please suggest the same.
>>>
>>> Note: Also, it would be helpful if you could share details of a
>>> community forum or platform to discuss this kind of design-related
>>> topic.
>>
Spark Materialized Views: Improve Query Performance and Data Management
Hi,

I have raised a ticket SPARK-48117
<https://issues.apache.org/jira/browse/SPARK-48117> for enhancing Spark
capabilities with Materialised Views (MVs). Currently both Hive and
Databricks support this. I have added these potential benefits to the
ticket:

- *Improved Query Performance (especially for Streaming Data):*
Materialized Views can significantly improve query performance,
particularly for use cases involving Spark Structured Streaming. When
dealing with continuous data streams, materialized views can pre-compute
and store frequently accessed aggregations or transformations. Subsequent
queries on the materialized view can retrieve the results much faster
compared to continuously processing the entire streaming data. This is
crucial for real-time analytics where low latency is essential.
- *Enhancing Data Management:* They offer a way to pre-aggregate or
transform data, making complex queries more efficient.
- *Reduced Data Movement:* Materialized Views can be materialized on
specific clusters or storage locations closer to where the data will be
consumed. This minimizes data movement across the network, further
improving query performance and reducing overall processing time.
- *Simplified Workflows:* Developers and analysts can leverage pre-defined
Materialized Views that represent specific business logic or data subsets.
This simplifies data access, reduces development time for queries that
rely on these views, and fosters code reuse.

Please have a look at the ticket and add your comments.

Thanks

Mich Talebzadeh,
Technologist | Architect | Data Engineer | Generative AI | FinCrime
London
United Kingdom

   view my Linkedin profile

https://en.everybodywiki.com/Mich_Talebzadeh

Disclaimer: The information provided is correct to the best of my knowledge
but of course cannot be guaranteed. It is essential to note that, as with
any advice, quote "one test result is worth one-thousand expert opinions
(Wernher von Braun)".
Re: Issue with Materialized Views in Spark SQL
Sadly, Apache Spark sounds like it has nothing to do with materialised
views. I was hoping it could read them!

>>> spark.sql("SELECT * FROM test.mv").show()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/opt/spark/python/pyspark/sql/session.py", line 1440, in sql
    return DataFrame(self._jsparkSession.sql(sqlQuery, litArgs), self)
  File "/usr/src/Python-3.9.16/venv/venv3.9/lib/python3.9/site-packages/py4j/java_gateway.py", line 1321, in __call__
    return_value = get_return_value(
  File "/opt/spark/python/pyspark/errors/exceptions/captured.py", line 175, in deco
    raise converted from None
pyspark.errors.exceptions.captured.AnalysisException: Hive materialized view is not supported.

HTH

Mich Talebzadeh,
Technologist | Architect | Data Engineer | Generative AI | FinCrime
London
United Kingdom

   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>

https://en.everybodywiki.com/Mich_Talebzadeh

*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed. It is essential to note that,
as with any advice, quote "one test result is worth one-thousand expert
opinions (Wernher von Braun
<https://en.wikipedia.org/wiki/Wernher_von_Braun>)".

On Fri, 3 May 2024 at 11:03, Mich Talebzadeh wrote:

> Thanks for the comments I received.
>
> So in summary, Apache Spark itself doesn't directly manage materialized
> views (MVs), but it can work with them through integration with the
> underlying data storage systems like Hive or through Iceberg. I believe
> Databricks, through Unity Catalog, supports MVs as well.
>
> Moreover, there is a case for supporting MVs. However, Spark can utilize
> materialized views even though it doesn't directly manage them. This came
> about because someone in the Spark user forum enquired about "Spark
> streaming issue to Elastic data". One option I thought of was that using
> materialized views with Spark Structured Streaming and Change Data
> Capture (CDC) is a potential solution for efficiently streaming view data
> updates in this scenario.
>
> Mich Talebzadeh,
> Technologist | Architect | Data Engineer | Generative AI | FinCrime
> London
> United Kingdom
>
>    view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
> https://en.everybodywiki.com/Mich_Talebzadeh
>
> *Disclaimer:* The information provided is correct to the best of my
> knowledge but of course cannot be guaranteed. It is essential to note
> that, as with any advice, quote "one test result is worth one-thousand
> expert opinions (Wernher von Braun
> <https://en.wikipedia.org/wiki/Wernher_von_Braun>)".
>
> On Fri, 3 May 2024 at 00:54, Mich Talebzadeh wrote:
>
>> An issue I encountered while working with Materialized Views in Spark
>> SQL. It appears that there is an inconsistency between the behavior of
>> Materialized Views in Spark SQL and Hive.
>>
>> When attempting to execute a statement like DROP MATERIALIZED VIEW IF
>> EXISTS test.mv in Spark SQL, I encountered a syntax error indicating
>> that the keyword MATERIALIZED is not recognized. However, the same
>> statement executes successfully in Hive without any errors.
>> >> pyspark.errors.exceptions.captured.ParseException: >> [PARSE_SYNTAX_ERROR] Syntax error at or near 'MATERIALIZED'.(line 1, pos >> 5) >> >> == SQL == >> DROP MATERIALIZED VIEW IF EXISTS test.mv >> -^^^ >> >> Here are the versions I am using: >> >> >> >> *Hive: 3.1.1Spark: 3.4* >> my Spark session: >> >> spark = SparkSession.builder \ >> .appName("test") \ >> .enableHiveSupport() \ >> .getOrCreate() >> >> Has anyone seen this behaviour or encountered a similar issue or if there >> are any insights into why this discrepancy exists between Spark SQL and >> Hive. >> >> Thanks >> >> Mich Talebzadeh, >> >> Technologist | Architect | Data Engineer | Generative AI | FinCrime >> >> London >> United Kingdom >> >> >>view my Linkedin profile >> >> >> https://en.everybodywiki.com/Mich_Talebzadeh >> >> >> >> Disclaimer: The information provided is correct to the best of my >> knowledge but of course cannot be guaranteed . It is essential to note >> that, as with any advice, quote "one test result is worth one-thousand >> expert opinions (Werner Von Braun)". >> >
Re: ********Spark streaming issue to Elastic data**********
My recommendation is that using materialized views (MVs) created in Hive
with Spark Structured Streaming and Change Data Capture (CDC) is a good
combination for efficiently streaming view data updates in your scenario.

HTH

Mich Talebzadeh,
Technologist | Architect | Data Engineer | Generative AI | FinCrime
London
United Kingdom

   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>

https://en.everybodywiki.com/Mich_Talebzadeh

*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed. It is essential to note that,
as with any advice, quote "one test result is worth one-thousand expert
opinions (Wernher von Braun
<https://en.wikipedia.org/wiki/Wernher_von_Braun>)".

On Thu, 2 May 2024 at 21:25, Karthick Nk wrote:

> Hi All,
>
> Requirements:
> I am working on a data flow which will use a view definition (the view
> definition is already defined in the schema); there are multiple tables
> used in the view definition. Here we want to stream the view data into an
> elastic index whenever data in any of the tables used in the view
> definition has changed.
>
> Current flow:
> 1. We are inserting ids from the tables (which are used in the view
> definition) into a common table.
> 2. From the common table, by using the id, we stream the view data (by
> checking if any of the incoming ids is present in the collective ids of
> all tables used in the view definition) using Spark Structured Streaming.
>
> Issue:
> 1. Here we are facing an issue - for each incoming id we run the view
> definition (so it reads all the data) and check if any of the incoming
> ids is present in the collective ids of the view result. Due to this it
> is taking more memory in the cluster driver and taking more time to
> process.
>
> I am expecting an alternate solution where we can avoid the full scan of
> the view definition every time. If you have any alternate design flow for
> how we can achieve the result, please suggest the same.
>
> Note: Also, it would be helpful if you could share details of a community
> forum or platform to discuss this kind of design-related topic.
Re: Issue with Materialized Views in Spark SQL
Thanks for the comments I received.

So in summary, Apache Spark itself doesn't directly manage materialized
views (MVs), but it can work with them through integration with the
underlying data storage systems like Hive or through Iceberg. I believe
Databricks, through Unity Catalog, supports MVs as well.

Moreover, there is a case for supporting MVs. However, Spark can utilize
materialized views even though it doesn't directly manage them. This came
about because someone in the Spark user forum enquired about "Spark
streaming issue to Elastic data". One option I thought of was that using
materialized views with Spark Structured Streaming and Change Data Capture
(CDC) is a potential solution for efficiently streaming view data updates
in this scenario.

Mich Talebzadeh,
Technologist | Architect | Data Engineer | Generative AI | FinCrime
London
United Kingdom

   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>

https://en.everybodywiki.com/Mich_Talebzadeh

*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed. It is essential to note that,
as with any advice, quote "one test result is worth one-thousand expert
opinions (Wernher von Braun
<https://en.wikipedia.org/wiki/Wernher_von_Braun>)".

On Fri, 3 May 2024 at 00:54, Mich Talebzadeh wrote:

> An issue I encountered while working with Materialized Views in Spark
> SQL. It appears that there is an inconsistency between the behavior of
> Materialized Views in Spark SQL and Hive.
>
> When attempting to execute a statement like DROP MATERIALIZED VIEW IF
> EXISTS test.mv in Spark SQL, I encountered a syntax error indicating that
> the keyword MATERIALIZED is not recognized. However, the same statement
> executes successfully in Hive without any errors.
>
> pyspark.errors.exceptions.captured.ParseException:
> [PARSE_SYNTAX_ERROR] Syntax error at or near 'MATERIALIZED'.(line 1, pos 5)
>
> == SQL ==
> DROP MATERIALIZED VIEW IF EXISTS test.mv
> -----^^^
>
> Here are the versions I am using:
>
> Hive: 3.1.1
> Spark: 3.4
>
> my Spark session:
>
> spark = SparkSession.builder \
>     .appName("test") \
>     .enableHiveSupport() \
>     .getOrCreate()
>
> Has anyone seen this behaviour or encountered a similar issue, or are
> there any insights into why this discrepancy exists between Spark SQL and
> Hive?
>
> Thanks
>
> Mich Talebzadeh,
>
> Technologist | Architect | Data Engineer | Generative AI | FinCrime
>
> London
> United Kingdom
>
>    view my Linkedin profile
>
> https://en.everybodywiki.com/Mich_Talebzadeh
>
> Disclaimer: The information provided is correct to the best of my
> knowledge but of course cannot be guaranteed. It is essential to note
> that, as with any advice, quote "one test result is worth one-thousand
> expert opinions (Wernher von Braun)".
Issue with Materialized Views in Spark SQL
An issue I encountered while working with Materialized Views in Spark SQL.
It appears that there is an inconsistency between the behavior of
Materialized Views in Spark SQL and Hive.

When attempting to execute a statement like DROP MATERIALIZED VIEW IF
EXISTS test.mv in Spark SQL, I encountered a syntax error indicating that
the keyword MATERIALIZED is not recognized. However, the same statement
executes successfully in Hive without any errors.

pyspark.errors.exceptions.captured.ParseException:
[PARSE_SYNTAX_ERROR] Syntax error at or near 'MATERIALIZED'.(line 1, pos 5)

== SQL ==
DROP MATERIALIZED VIEW IF EXISTS test.mv
-----^^^

Here are the versions I am using:

Hive: 3.1.1
Spark: 3.4

my Spark session:

spark = SparkSession.builder \
    .appName("test") \
    .enableHiveSupport() \
    .getOrCreate()

Has anyone seen this behaviour or encountered a similar issue, or are there
any insights into why this discrepancy exists between Spark SQL and Hive?

Thanks

Mich Talebzadeh,
Technologist | Architect | Data Engineer | Generative AI | FinCrime
London
United Kingdom

   view my Linkedin profile

https://en.everybodywiki.com/Mich_Talebzadeh

Disclaimer: The information provided is correct to the best of my knowledge
but of course cannot be guaranteed. It is essential to note that, as with
any advice, quote "one test result is worth one-thousand expert opinions
(Wernher von Braun)".
Re: [spark-graphframes]: Generating incorrect edges
Hi Steve, Thanks for your statement. I tend to use uuid myself to avoid collisions. This built-in function generates random IDs that are highly likely to be unique across systems. My concerns are on edge so to speak. If the Spark application runs for a very long time or encounters restarts, the monotonically_increasing_id() sequence might restart from the beginning. This could again cause duplicate IDs if other Spark applications are running concurrently or if data is processed across multiple runs of the same application.. HTH Mich Talebzadeh, Technologist | Architect | Data Engineer | Generative AI | FinCrime London United Kingdom view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> https://en.everybodywiki.com/Mich_Talebzadeh *Disclaimer:* The information provided is correct to the best of my knowledge but of course cannot be guaranteed . It is essential to note that, as with any advice, quote "one test result is worth one-thousand expert opinions (Werner <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>)". On Wed, 1 May 2024 at 01:22, Stephen Coy wrote: > Hi Mich, > > I was just reading random questions on the user list when I noticed that > you said: > > On 25 Apr 2024, at 2:12 AM, Mich Talebzadeh > wrote: > > 1) You are using monotonically_increasing_id(), which is not > collision-resistant in distributed environments like Spark. Multiple hosts >can generate the same ID. I suggest switching to UUIDs (e.g., > uuid.uuid4()) for guaranteed uniqueness. > > > It’s my understanding that the *Spark* `monotonically_increasing_id()` > function exists for the exact purpose of generating a collision-resistant > unique id across nodes on different hosts. > We use it extensively for this purpose and have never encountered an issue. > > Are we wrong or are you thinking of a different (not Spark) function? > > Cheers, > > Steve C > > > > > This email contains confidential information of and is the copyright of > Infomedia. It must not be forwarded, amended or disclosed without consent > of the sender. If you received this message by mistake, please advise the > sender and delete all copies. Security of transmission on the internet > cannot be guaranteed, could be infected, intercepted, or corrupted and you > should ensure you have suitable antivirus protection in place. By sending > us your or any third party personal details, you consent to (or confirm you > have obtained consent from such third parties) to Infomedia’s privacy > policy. http://www.infomedia.com.au/privacy-policy/ >
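For completeness, a minimal PySpark sketch of the UUID approach discussed above; the DataFrame and column names are illustrative, not taken from the original thread:

import uuid
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import StringType

spark = SparkSession.builder.appName("UuidExample").getOrCreate()
df = spark.createDataFrame([("row1",), ("row2",)], ["payload"])

# Option 1: Spark's built-in uuid() SQL function, generated per row on the
# executors with no driver round trip.
df_builtin = df.withColumn("id", F.expr("uuid()"))

# Option 2: a Python UDF wrapping uuid.uuid4(), if the standard library
# form is preferred.
uuid_udf = F.udf(lambda: str(uuid.uuid4()), StringType())
df_udf = df.withColumn("id", uuid_udf())

df_builtin.show(truncate=False)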
Re: spark.sql.shuffle.partitions=auto
spark.sql.shuffle.partitions=auto

This setting is not recognised by vanilla Apache Spark; the configuration
option is specific to Databricks and its managed Spark offering. It allows
Databricks to automatically determine an optimal number of shuffle
partitions for your workload.

HTH

Mich Talebzadeh,
Technologist | Architect | Data Engineer | Generative AI | FinCrime
London
United Kingdom

   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>

https://en.everybodywiki.com/Mich_Talebzadeh

*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed. It is essential to note that,
as with any advice, quote "one test result is worth one-thousand expert
opinions (Wernher von Braun
<https://en.wikipedia.org/wiki/Wernher_von_Braun>)".

On Tue, 30 Apr 2024 at 11:51, second_co...@yahoo.com.INVALID wrote:

> May I know, is
>
> spark.sql.shuffle.partitions=auto
>
> only available on Databricks? What about on vanilla Spark? When I set
> this, it gives an error saying it needs to be an int. Is there any open
> source library that automatically finds the best partition and block size
> for a dataframe?
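On the vanilla Spark side, the closest equivalent is Adaptive Query Execution, which coalesces shuffle partitions at runtime based on the actual data size. A minimal sketch of the relevant settings; the values shown are illustrative starting points, not recommendations for any particular workload:

from pyspark.sql import SparkSession

# AQE re-optimises the number of shuffle partitions at runtime based on the
# observed size of the shuffled data.
spark = (SparkSession.builder
    .appName("AqeExample")
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
    # Target size per post-shuffle partition; tune for your workload.
    .config("spark.sql.adaptive.advisoryPartitionSizeInBytes", "64m")
    # In vanilla Spark this still has to be an integer, not "auto".
    .config("spark.sql.shuffle.partitions", "200")
    .getOrCreate())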
Re: Spark on Kubernetes
Hi, In k8s the driver is responsible for executor creation. The likelihood of your problem is that Insufficient memory allocated for executors in the K8s cluster. Even with dynamic allocation, k8s won't schedule executor pods if there is not enough free memory to fulfill their resource requests. My suggestions - Increase Executor Memory: Allocate more memory per executor (e.g., 2GB or 3GB) to allow for multiple executors within available cluster memory. - Adjust Driver Pod Resources: Ensure the driver pod has enough memory to run Spark and manage executors. - Optimize Resource Management: Explore on-demand allocation or adjusting allocation granularity for better resource utilization. For example look at documents for Executor On-Demand Allocation (spark.executor.cores=0): and spark.dynamicAllocation.minExecutors & spark.dynamicAllocation.maxExecutors HTH Mich Talebzadeh, Technologist | Architect | Data Engineer | Generative AI | FinCrime London United Kingdom view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> https://en.everybodywiki.com/Mich_Talebzadeh *Disclaimer:* The information provided is correct to the best of my knowledge but of course cannot be guaranteed . It is essential to note that, as with any advice, quote "one test result is worth one-thousand expert opinions (Werner <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>)". On Tue, 30 Apr 2024 at 04:29, Tarun raghav wrote: > Respected Sir/Madam, > I am Tarunraghav. I have a query regarding spark on kubernetes. > > We have an eks cluster, within which we have spark installed in the pods. > We set the executor memory as 1GB and set the executor instances as 2, I > have also set dynamic allocation as true. So when I try to read a 3 GB CSV > file or parquet file, it is supposed to increase the number of pods by 2. > But the number of executor pods is zero. > I don't know why executor pods aren't being created, even though I set > executor instance as 2. Please suggest a solution for this. > > Thanks & Regards, > Tarunraghav > >
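As an illustration of the suggestions above, a minimal PySpark configuration sketch for dynamic allocation on Kubernetes; the API server URL, container image, namespace, service account and memory figures are placeholders to adapt, not values from the original question:

from pyspark.sql import SparkSession

spark = (SparkSession.builder
    .appName("k8s-dynamic-allocation-example")
    .master("k8s://https://<k8s-apiserver>:6443")
    .config("spark.kubernetes.container.image", "<registry>/spark-py:3.5.0")
    .config("spark.kubernetes.namespace", "spark")
    .config("spark.kubernetes.authenticate.driver.serviceAccountName", "spark")
    # Give the driver and executors enough headroom to be schedulable.
    .config("spark.driver.memory", "2g")
    .config("spark.executor.memory", "2g")
    .config("spark.executor.cores", "2")
    # Dynamic allocation on k8s needs shuffle tracking, since there is no
    # external shuffle service.
    .config("spark.dynamicAllocation.enabled", "true")
    .config("spark.dynamicAllocation.shuffleTracking.enabled", "true")
    .config("spark.dynamicAllocation.minExecutors", "1")
    .config("spark.dynamicAllocation.maxExecutors", "6")
    .getOrCreate())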
Re: [spark-graphframes]: Generating incorrect edges
OK let us have a look at these 1) You are using monotonically_increasing_id(), which is not collision-resistant in distributed environments like Spark. Multiple hosts can generate the same ID. I suggest switching to UUIDs (e.g., uuid.uuid4()) for guaranteed uniqueness. 2) Missing values in the Origin column lead to null IDs, potentially causing problems downstream. You can handle missing values appropriately, say a) Filter out rows with missing origins or b) impute missing values with a strategy that preserves relationships (if applicable). 3) With join code, you mentioned left joining on the same column used for ID creation, not very clear! 4) Edge Issue, it appears to me the issue seems to occur with larger datasets (>100K records). Possible causes could be a) Resource Constraints as data size increases, PySpark might struggle with joins or computations if resources are limited (memory, CPU). b) Data Skew: Uneven distribution of values in certain columns could lead to imbalanced processing across machines. Check Spark UI (4040) on staging and execution tabs HTH Mich Talebzadeh, Technologist | Architect | Data Engineer | Generative AI | FinCrime London United Kingdom view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> https://en.everybodywiki.com/Mich_Talebzadeh *Disclaimer:* The information provided is correct to the best of my knowledge but of course cannot be guaranteed . It is essential to note that, as with any advice, quote "one test result is worth one-thousand expert opinions (Werner <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>)". On Wed, 24 Apr 2024 at 16:44, Nijland, J.G.W. (Jelle, Student M-CS) < j.g.w.nijl...@student.utwente.nl> wrote: > Hi Mich, > > Thanks for your reply, > 1) ID generation is done using monotonically_increasing_id() > <https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.monotonically_increasing_id.html> > this > is then prefixed with "p_", "m_", "o_" or "org_" depending on the type of > the value it identifies. > 2) There are some missing values in the Origin column, these will result > in a Null ID > 3) The join code is present in [1], I join "left" on the same column > I create the ID on > 4) I dont think the issue is in ID or edge generation, if i limit my input > dataframe and union it with my Utwente data row, I can verify those edges > are created correctly up to 100K records. > Once I go past that amount of records the results become inconsistent and > incorrect. > > Kind regards, > Jelle Nijland > > > -- > *From:* Mich Talebzadeh > *Sent:* Wednesday, April 24, 2024 4:40 PM > *To:* Nijland, J.G.W. (Jelle, Student M-CS) < > j.g.w.nijl...@student.utwente.nl> > *Cc:* user@spark.apache.org > *Subject:* Re: [spark-graphframes]: Generating incorrect edges > > OK few observations > > 1) ID Generation Method: How are you generating unique IDs (UUIDs, > sequential numbers, etc.)? > 2) Data Inconsistencies: Have you checked for missing values impacting ID > generation? > 3) Join Verification: If relevant, can you share the code for joining data > points during ID creation? Are joins matching columns correctly? > 4) Specific Edge Issues: Can you share examples of vertex IDs with > incorrect connections? Is this related to ID generation or edge creation > logic? 
> > HTH > Mich Talebzadeh, > Technologist | Architect | Data Engineer | Generative AI, FinCrime > London > United Kingdom > > >view my Linkedin profile > <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> > > > https://en.everybodywiki.com/Mich_Talebzadeh > > > > *Disclaimer:* The information provided is correct to the best of my > knowledge but of course cannot be guaranteed . It is essential to note > that, as with any advice, quote "one test result is worth one-thousand > expert opinions (Werner <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von > Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>)". > > > On Wed, 24 Apr 2024 at 12:24, Nijland, J.G.W. (Jelle, Student M-CS) < > j.g.w.nijl...@student.utwente.nl> wrote: > > tags: pyspark,spark-graphframes > > Hello, > > I am running pyspark in a podman container and I have issues with > incorrect edges when I build my graph. > I start with loading a source dataframe from a parquet directory on my > server. The source dataframe has the following columns: > > +-+---+-+-+--+-+--+---+ > |created |descr |last_modified
Re: [spark-graphframes]: Generating incorrect edges
OK few observations 1) ID Generation Method: How are you generating unique IDs (UUIDs, sequential numbers, etc.)? 2) Data Inconsistencies: Have you checked for missing values impacting ID generation? 3) Join Verification: If relevant, can you share the code for joining data points during ID creation? Are joins matching columns correctly? 4) Specific Edge Issues: Can you share examples of vertex IDs with incorrect connections? Is this related to ID generation or edge creation logic? HTH Mich Talebzadeh, Technologist | Architect | Data Engineer | Generative AI, FinCrime London United Kingdom view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> https://en.everybodywiki.com/Mich_Talebzadeh *Disclaimer:* The information provided is correct to the best of my knowledge but of course cannot be guaranteed . It is essential to note that, as with any advice, quote "one test result is worth one-thousand expert opinions (Werner <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>)". On Wed, 24 Apr 2024 at 12:24, Nijland, J.G.W. (Jelle, Student M-CS) < j.g.w.nijl...@student.utwente.nl> wrote: > tags: pyspark,spark-graphframes > > Hello, > > I am running pyspark in a podman container and I have issues with > incorrect edges when I build my graph. > I start with loading a source dataframe from a parquet directory on my > server. The source dataframe has the following columns: > > +-+---+-+-+--+-+--+---+ > |created |descr |last_modified|mnt_by |origin|start_address|prefix > |external_origin| > > +-+---+-+-+--+-+--+---+ > > I aim to build a graph connecting prefix, mnt_by, origin and descr with > edges storing the created and last_modified values. > I start with generating IDs for the prefix, mnt_by, origin and descr using > monotonically_increasing_id() [1] > These IDs are prefixed with "m_", "p_", "o_" or "org_" to ensure they are > unique IDs across the dataframe. > > Then I construct the vertices dataframe by collecting the ID, value and > whether they are external for each vertex. [2] > These vertices are then unioned together. > Following the vertices, I construct the edges dataframe by selecting the > IDs that I want to be the src and the dst and union those together. [3] > These edges store the created and last_modified. > > Now I am ready to construct the graph. Here is where I run into my issue. > > When verifying my graph, I looked at a couple of vertices to see if they > have the correct edges. > I looked at the Utwente prefix, origin, descr and mnt_by and found that it > generates incorrect edges. > > I saw edges going out to vertices that are not associated with the utwente > values at all. > The methods to find the vertices, edges and the output can be found in [4] > We can already observe inconsistencies by viewing the prefix->maintainer > and origin -> prefix edges. [5] > Depending on what column I filter on the results are inconsistent. > To make matters worse some edges contain IDs that are not connected to the > original values in the source dataframe at all. > > What I have tried to resolve my issue: > >- Write a checker that verifies edges created against the source >dataframe. [6] >The aim of this checker was to determine where the inconsistency comes >fro, to locate the bug and resolve it. >I ran this checker a limited graphs from n=10 upwards to n=100 (or >1m). >This felt close enough as there are only ~6.5m records in my source >dataframe. 
>This ran correctly, near the 1m it did experience significant slowdown >at the full dataframe it errors/times out. >I blamed this on the large joins that it performs on the source >dataframe. >- I found a github issue of someone with significantly larger graphs >have similar issues. >One suggestion there blamed indexing using strings rather than ints or >longs. >I rewrote my system to use int for IDs but I ran into the same issue. >The amount of incorrect edges was the same, the link to which >incorrects vertices it links to was the same too. >- I re-ordered my source dataframe to see what the impact was. >This results in considerably more incorrect edges using the checker in >[4] >If helpful I can post the output of this checker as well. > > > Can you give me any pointers in what I can try or what I can do to clarify > my situation better? > Thanks in advance for your time. > > Kind regards, >
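One further option, not raised in the thread but perhaps worth testing: derive deterministic IDs from the vertex values themselves, for example a hash of the string, instead of monotonically_increasing_id(), so the same value always maps to the same ID regardless of partitioning, ordering or restarts. A minimal sketch with illustrative column values; the column names mirror the thread but the data is made up:

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("StableIdExample").getOrCreate()

# Toy stand-in for the source dataframe described in the thread.
df = spark.createDataFrame(
    [("UTWENTE-MNT", "AS1133", "130.89.0.0/16")],
    ["mnt_by", "origin", "prefix"],
)

# Value-derived IDs: identical inputs always hash to identical IDs, so the
# edges built from separate selections of the same column stay consistent.
df_ids = (df
    .withColumn("mnt_id", F.concat(F.lit("m_"), F.sha2(F.col("mnt_by"), 256)))
    .withColumn("origin_id", F.concat(F.lit("o_"), F.sha2(F.col("origin"), 256)))
    .withColumn("prefix_id", F.concat(F.lit("p_"), F.sha2(F.col("prefix"), 256))))

df_ids.show(truncate=False)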
Re: Spark streaming job for kafka transaction does not consume read_committed messages correctly.
Interesting My concern is infinite Loop in* foreachRDD*: The *while(true)* loop within foreachRDD creates an infinite loop within each Spark executor. This might not be the most efficient approach, especially since offsets are committed asynchronously.? HTH Mich Talebzadeh, Technologist | Solutions Architect | Data Engineer | Generative AI London United Kingdom view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> https://en.everybodywiki.com/Mich_Talebzadeh *Disclaimer:* The information provided is correct to the best of my knowledge but of course cannot be guaranteed . It is essential to note that, as with any advice, quote "one test result is worth one-thousand expert opinions (Werner <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>)". On Sun, 14 Apr 2024 at 13:40, Kidong Lee wrote: > > Because spark streaming for kafk transaction does not work correctly to > suit my need, I moved to another approach using raw kafka consumer which > handles read_committed messages from kafka correctly. > > My codes look like the following. > > JavaDStream stream = ssc.receiverStream(new CustomReceiver()); // > CustomReceiver does nothing special except awaking foreach task. > > stream.foreachRDD(rdd -> { > > KafkaConsumer consumer = new > KafkaConsumer<>(consumerProperties); > > consumer.subscribe(Arrays.asList(topic)); > > while(true){ > > ConsumerRecords records = > consumer.poll(java.time.Duration.ofMillis(intervalMs)); > > Map offsetMap = new HashMap<>(); > > List someList = new ArrayList<>(); > > for (ConsumerRecord consumerRecord : records) { > > // add something to list. > > // put offset to offsetMap. > > } > > // process someList. > > // commit offset. > > consumer.commitAsync(offsetMap, null); > > } > > }); > > > In addition, I increased max.poll.records to 10. > > Even if this raw kafka consumer approach is not so scalable, it consumes > read_committed messages from kafka correctly and is enough for me at the > moment. > > - Kidong. > > > > 2024년 4월 12일 (금) 오후 9:19, Kidong Lee 님이 작성: > >> Hi, >> >> I have a kafka producer which sends messages transactionally to kafka and >> spark streaming job which should consume read_committed messages from kafka. >> But there is a problem for spark streaming to consume read_committed >> messages. >> The count of messages sent by kafka producer transactionally is not the >> same to the count of the read_committed messages consumed by spark >> streaming. >> >> Some consumer properties of my spark streaming job are as follows. >> >> auto.offset.reset=earliest >> enable.auto.commit=false >> isolation.level=read_committed >> >> >> I also added the following spark streaming configuration. >> >> sparkConf.set("spark.streaming.kafka.allowNonConsecutiveOffsets", "true"); >> sparkConf.set("spark.streaming.kafka.consumer.poll.ms", String.valueOf(2 * >> 60 * 1000)); >> >> >> My spark streaming is using DirectStream like this. >> >> JavaInputDStream> stream = >> KafkaUtils.createDirectStream( >> ssc, >> LocationStrategies.PreferConsistent(), >> ConsumerStrategies.Subscribe(topics, >> kafkaParams) >> ); >> >> >> stream.foreachRDD(rdd -> O >> >>// get offset ranges. >> >>OffsetRange[] offsetRanges = ((HasOffsetRanges) >> rdd.rdd()).offsetRanges(); >> >>// process something. >> >> >>// commit offset. 
>>((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges); >> >> } >> ); >> >> >> >> I have tested with a kafka consumer written with raw kafka-clients jar >> library without problem that it consumes read_committed messages correctly, >> and the count of consumed read_committed messages is equal to the count of >> messages sent by kafka producer. >> >> >> And sometimes, I got the following exception. >> >> Job aborted due to stage failure: Task 0 in stage 324.0 failed 1 times, >> most recent failure: Lost task 0.0 in stage 324.0 (TID 1674) >> (chango-private-1.chango.private executor driver): >> java.lang.IllegalArgumentException: requirement failed: Failed to get >> records for compacted spark-executor-school-student-group school-student-7 >> afte
Re: Spark streaming job for kafka transaction does not consume read_committed messages correctly.
Hi Kidong, There may be few potential reasons why the message counts from your Kafka producer and Spark Streaming consumer might not match, especially with transactional messages and read_committed isolation level. 1) Just ensure that both your Spark Streaming job and the Kafka consumer written with raw kafka-clients use the same consumer group. Messages are delivered to specific consumer groups, and if they differ, Spark Streaming might miss messages consumed by the raw consumer. 2) Your Spark Streaming configuration sets *enable.auto.commit=false* and uses *commitAsync manually*. However, I noted *spark.streaming.kafka.allowNonConsecutiveOffsets=true* which may be causing the problem. This setting allows Spark Streaming to read offsets that are not strictly increasing, which can happen with transactional reads. Generally recommended to set this to* false *for transactional reads to ensure Spark Streaming only reads committed messages. 3) Missed messages, in transactional messages, Kafka guarantees *delivery only after the transaction commits successfully. *There could be a slight delay between the producer sending the message and it becoming visible to consumers under read_committed isolation level. Spark Streaming could potentially miss messages during this window. 4) The exception Lost task 0.0 in stage 324.0, suggests a problem fetching records for a specific topic partition. Review your code handling of potential exceptions during rdd.foreachRDD processing. Ensure retries or appropriate error handling if encountering issues with specific partitions. 5) Try different configurations for *spark.streaming.kafka.consumer.poll.ms <http://spark.streaming.kafka.consumer.poll.ms>* to adjust polling frequency and potentially improve visibility into committed messages. HTH Mich Talebzadeh, Technologist | Solutions Architect | Data Engineer | Generative AI London United Kingdom view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> https://en.everybodywiki.com/Mich_Talebzadeh *Disclaimer:* The information provided is correct to the best of my knowledge but of course cannot be guaranteed . It is essential to note that, as with any advice, quote "one test result is worth one-thousand expert opinions (Werner <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>)". On Fri, 12 Apr 2024 at 21:38, Kidong Lee wrote: > Hi, > > I have a kafka producer which sends messages transactionally to kafka and > spark streaming job which should consume read_committed messages from kafka. > But there is a problem for spark streaming to consume read_committed > messages. > The count of messages sent by kafka producer transactionally is not the > same to the count of the read_committed messages consumed by spark > streaming. > > Some consumer properties of my spark streaming job are as follows. > > auto.offset.reset=earliest > enable.auto.commit=false > isolation.level=read_committed > > > I also added the following spark streaming configuration. > > sparkConf.set("spark.streaming.kafka.allowNonConsecutiveOffsets", "true"); > sparkConf.set("spark.streaming.kafka.consumer.poll.ms", String.valueOf(2 * 60 > * 1000)); > > > My spark streaming is using DirectStream like this. > > JavaInputDStream> stream = > KafkaUtils.createDirectStream( > ssc, > LocationStrategies.PreferConsistent(), > ConsumerStrategies.Subscribe(topics, > kafkaParams) > ); > > > stream.foreachRDD(rdd -> O > >// get offset ranges. 
> >OffsetRange[] offsetRanges = ((HasOffsetRanges) > rdd.rdd()).offsetRanges(); > >// process something. > > >// commit offset. >((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges); > > } > ); > > > > I have tested with a kafka consumer written with raw kafka-clients jar > library without problem that it consumes read_committed messages correctly, > and the count of consumed read_committed messages is equal to the count of > messages sent by kafka producer. > > > And sometimes, I got the following exception. > > Job aborted due to stage failure: Task 0 in stage 324.0 failed 1 times, > most recent failure: Lost task 0.0 in stage 324.0 (TID 1674) > (chango-private-1.chango.private executor driver): > java.lang.IllegalArgumentException: requirement failed: Failed to get > records for compacted spark-executor-school-student-group school-student-7 > after polling for 12 > > at scala.Predef$.require(Predef.scala:281) > > at > org.apache.spark.streaming.kafka010.InternalKafkaConsumer.compactedNext(KafkaDataConsume
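A related note: if moving from the DStream API to Structured Streaming is an option, the Kafka source forwards options prefixed with kafka. to the underlying consumer, so read_committed isolation can be requested there as well. A minimal sketch; the broker, topic and checkpoint path are placeholders, and the kafka.isolation.level pass-through should be checked against the Kafka integration guide for your Spark version:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("ReadCommittedExample").getOrCreate()

# Options prefixed with "kafka." are handed to the underlying Kafka consumer.
df = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "my_transactional_topic")
    .option("startingOffsets", "earliest")
    .option("kafka.isolation.level", "read_committed")
    .load())

query = (df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    .writeStream
    .format("console")
    .option("checkpointLocation", "/tmp/checkpoints/read_committed_demo")
    .start())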
Spark column headings, camelCase or snake case?
I know this is a bit of a silly question, but what is the norm for Spark
column headings? Is it camelCase or snake_case?

For example, someone suggested, and I quote: "SumTotalInMillionGBP"
accurately conveys the meaning but is a bit long and uses camelCase, which
is not the standard convention for Spark DataFrames (usually snake_case).
Use snake_case for better readability, like "total_price_in_millions_gbp".

So this is the gist:

+----------------------+---------------------+---------------------------+
|district              |NumberOfOffshoreOwned|total_price_in_millions_gbp|
+----------------------+---------------------+---------------------------+
|CITY OF WESTMINSTER   |4452                 |21472.5                    |
|KENSINGTON AND CHELSEA|2403                 |6544.8                     |
|CAMDEN                |1023                 |4275.9                     |
|SOUTHWARK             |1080                 |3938.0                     |
|ISLINGTON             |627                  |3062.0                     |
|TOWER HAMLETS         |1715                 |3008.0                     |
|HAMMERSMITH AND FULHAM|765                  |2137.2                     |

Now I recently saw a note (if I recall correctly) that Spark should be
using camelCase in new Spark-related documents. What are the accepted
views, or does it matter?

Thanks

Mich Talebzadeh,
Technologist | Solutions Architect | Data Engineer | Generative AI
London
United Kingdom

   view my Linkedin profile

https://en.everybodywiki.com/Mich_Talebzadeh

Disclaimer: The information provided is correct to the best of my knowledge
but of course cannot be guaranteed. It is essential to note that, as with
any advice, quote "one test result is worth one-thousand expert opinions
(Wernher von Braun)".
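If the data arrives with camelCase headings and snake_case is preferred downstream, a small rename pass is easy to add. A minimal sketch; the column names are illustrative:

import re
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("SnakeCaseExample").getOrCreate()
df = spark.createDataFrame(
    [("CITY OF WESTMINSTER", 4452, 21472.5)],
    ["district", "NumberOfOffshoreOwned", "TotalPriceInMillionsGBP"],
)

def to_snake_case(name: str) -> str:
    # Insert an underscore before each upper-case letter that follows a
    # lower-case letter or digit, then lower-case the whole name.
    return re.sub(r"(?<=[a-z0-9])(?=[A-Z])", "_", name).lower()

renamed = df.toDF(*[to_snake_case(c) for c in df.columns])
print(renamed.columns)
# ['district', 'number_of_offshore_owned', 'total_price_in_millions_gbp']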
Re: Re: [Spark SQL] How can I use .sql() in conjunction with watermarks?
Interesting. So below should be the corrected code with the suggestion in
[SPARK-47718] .sql() does not recognize watermark defined upstream - ASF
JIRA (apache.org) <https://issues.apache.org/jira/browse/SPARK-47718>

# Define schema for parsing Kafka messages
schema = StructType([
    StructField('createTime', TimestampType(), True),
    StructField('orderId', LongType(), True),
    StructField('payAmount', DoubleType(), True),
    StructField('payPlatform', IntegerType(), True),
    StructField('provinceId', IntegerType(), True),
])

# Read streaming data from Kafka source
streaming_df = session.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "payment_msg") \
    .option("startingOffsets", "earliest") \
    .load() \
    .select(from_json(col("value").cast("string"), schema).alias("parsed_value")) \
    .select("parsed_value.*") \
    .withWatermark("createTime", "10 seconds")

# Create temporary view for SQL queries
streaming_df.createOrReplaceTempView("streaming_df")

# Define SQL query with correct window function usage
query = """
SELECT
    window(createTime, '1 hour', '30 minutes') as window,
    provinceId,
    sum(payAmount) as totalPayAmount
FROM streaming_df
GROUP BY provinceId, window(createTime, '1 hour', '30 minutes')
ORDER BY window.start
"""

# Write the aggregated results to Kafka sink
stream = session.sql(query) \
    .writeStream \
    .format("kafka") \
    .option("checkpointLocation", "checkpoint") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("topic", "sink") \
    .start()

Mich Talebzadeh,
Technologist | Solutions Architect | Data Engineer | Generative AI
London
United Kingdom

   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>

https://en.everybodywiki.com/Mich_Talebzadeh

*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed. It is essential to note that,
as with any advice, quote "one test result is worth one-thousand expert
opinions (Wernher von Braun
<https://en.wikipedia.org/wiki/Wernher_von_Braun>)".

On Tue, 9 Apr 2024 at 21:45, 刘唯 wrote:

> Sorry this is not a bug but essentially a user error. Spark throws a
> really confusing error and I'm also confused. Please see the reply in the
> ticket for how to make things correct.
> https://issues.apache.org/jira/browse/SPARK-47718
>
> 刘唯 wrote on Sat, 6 Apr 2024 at 11:41:
>
>> This indeed looks like a bug. I will take some time to look into it.
>>
>> Mich Talebzadeh wrote on Wed, 3 Apr 2024 at 01:55:
>>
>>> hm. you are getting below
>>>
>>> AnalysisException: Append output mode not supported when there are
>>> streaming aggregations on streaming DataFrames/DataSets without
>>> watermark;
>>>
>>> The problem seems to be that you are using the append output mode when
>>> writing the streaming query results to Kafka. This mode is designed for
>>> scenarios where you want to append new data to an existing dataset at
>>> the sink (in this case, the "sink" topic in Kafka). However, your query
>>> involves a streaming aggregation: group by provinceId,
>>> window('createTime', '1 hour', '30 minutes'). The problem is that Spark
>>> Structured Streaming requires a watermark to ensure exactly-once
>>> processing when using aggregations with append mode. Your code already
>>> defines a watermark on the "createTime" column with a delay of 10
>>> seconds (withWatermark("createTime", "10 seconds")). However, the error
>>> message indicates it is missing on the start column.
Try adding watermark to "start" Column: Modify your code as >>> below to include a watermark on the "start" column generated by the >>> window function: >>> >>> from pyspark.sql.functions import col, from_json, explode, window, sum, >>> watermark >>> >>> streaming_df = session.readStream \ >>> .format("kafka") \ >>> .option("kafka.bootstrap.servers", "localhost:9092") \ >>> .option("subscribe", "payment_msg") \ >>> .option("startingOffsets", "earliest") \ >>> .load() \ >>> .select(from_json(col("value").cast("string"), >>> schema).alias("parsed_value")) \ >>> .select("pa
Re: [Spark SQL]: Source code for PartitionedFile
Hi,

I believe this is the package

https://raw.githubusercontent.com/apache/spark/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FilePartition.scala

And the code

case class FilePartition(index: Int, files: Array[PartitionedFile])
  extends Partition with InputPartition {

  override def preferredLocations(): Array[String] = {
    // Computes total number of bytes that can be retrieved from each host.
    val hostToNumBytes = mutable.HashMap.empty[String, Long]
    files.foreach { file =>
      file.locations.filter(_ != "localhost").foreach { host =>
        hostToNumBytes(host) = hostToNumBytes.getOrElse(host, 0L) + file.length
      }
    }

    // Selects the first 3 hosts with the most data to be retrieved.
    hostToNumBytes.toSeq.sortBy {
      case (host, numBytes) => numBytes
    }.reverse.take(3).map {
      case (host, numBytes) => host
    }.toArray
  }
}

HTH

Mich Talebzadeh,
Technologist | Solutions Architect | Data Engineer | Generative AI
London
United Kingdom

   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>

https://en.everybodywiki.com/Mich_Talebzadeh

*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed. It is essential to note that,
as with any advice, quote "one test result is worth one-thousand expert
opinions (Wernher von Braun
<https://en.wikipedia.org/wiki/Wernher_von_Braun>)".

On Mon, 8 Apr 2024 at 20:31, Ashley McManamon <
ashley.mcmana...@quantcast.com> wrote:

> Hi All,
>
> I've been diving into the source code to get a better understanding of
> how file splitting works from a user perspective. I've hit a dead end at
> `PartitionedFile`, for which I cannot seem to find a definition? It
> appears though it should be found at
> org.apache.spark.sql.execution.datasources but I find no definition in
> the entire source code. Am I missing something?
>
> I appreciate there may be an obvious answer here, apologies if I'm being
> naive.
>
> Thanks,
> Ashley McManamon
Re: How to get db related metrics when use spark jdbc to read db table?
Well you can do a fair bit with the available tools The Spark UI, particularly the Staging and Executors tabs, do provide some valuable insights related to database health metrics for applications using a JDBC source. Stage Overview: This section provides a summary of all the stages executed during the application's lifetime. It includes details such as the stage ID, description, submission time, duration, and number of tasks. Each Stage represents a set of tasks that perform the same computation, typically applied to a partition of the input data. The Stages tab offers insights into how these stages are executed and their associated metrics. This tab may include a directed acyclic graph (DAG) visualization, illustrating the logical and physical execution plan of the Spark application. Executors Tab: The Executors tab provides detailed information about the executors running in the Spark application. Executors are responsible for executing tasks on behalf of the Spark application. The "Executors" tab offers insights into the current state and resource usage of each executor. In addition, the underlying database will have some instrumentation to assist you with your work. say with Oracle (as an example), utilise tools like OEM, VM StatPack, SQL*Plus scripts etc or third-party monitoring tools to collect detailed database health metrics directly from the Oracle database server. HTH Mich Talebzadeh, Technologist | Solutions Architect | Data Engineer | Generative AI London United Kingdom view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> https://en.everybodywiki.com/Mich_Talebzadeh *Disclaimer:* The information provided is correct to the best of my knowledge but of course cannot be guaranteed . It is essential to note that, as with any advice, quote "one test result is worth one-thousand expert opinions (Werner <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>)". On Mon, 8 Apr 2024 at 19:35, casel.chen wrote: > Hello, I have a spark application with jdbc source and do some > calculation. > To monitor application healthy, I need db related metrics per database > like number of connections, sql execution time and sql fired time > distribution etc. > Does anybody know how to get them? Thanks! > >
Re: External Spark shuffle service for k8s
Hi, First, thanks everyone for their contributions. I was going to reply to @Enrico Minack but noticed additional info. As I understand it, for example, Apache Uniffle is an incubating project aimed at providing a pluggable shuffle service for Spark. So basically, what all these "external shuffle services" have in common is offloading shuffle data management to external services, thus reducing the memory and CPU overhead on Spark executors. That is great. While Uniffle and others enhance shuffle performance and scalability, it would be great to integrate them with the Spark UI. This may require additional development effort. I suppose the interest would be to have these external metrics incorporated into Spark with one look and feel. This may require customizing the UI to fetch and display metrics or statistics from the external shuffle services. Has any project done this? Thanks Mich Talebzadeh, Technologist | Solutions Architect | Data Engineer | Generative AI London United Kingdom view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> https://en.everybodywiki.com/Mich_Talebzadeh *Disclaimer:* The information provided is correct to the best of my knowledge but of course cannot be guaranteed . It is essential to note that, as with any advice, quote "one test result is worth one-thousand expert opinions (Werner <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>)". On Mon, 8 Apr 2024 at 14:19, Vakaris Baškirov wrote: > I see that both Uniffle and Celebron support S3/HDFS backends which is > great. > In the case someone is using S3/HDFS, I wonder what would be the > advantages of using Celebron or Uniffle vs IBM shuffle service plugin > <https://github.com/IBM/spark-s3-shuffle> or Cloud Shuffle Storage Plugin > from AWS > <https://docs.aws.amazon.com/glue/latest/dg/cloud-shuffle-storage-plugin.html> > ? > > These plugins do not require deploying a separate service. Are there any > advantages to using Uniffle/Celebron in the case of using S3 backend, which > would require deploying a separate service? > > Thanks > Vakaris > > On Mon, Apr 8, 2024 at 10:03 AM roryqi wrote: > >> Apache Uniffle (incubating) may be another solution. >> You can see >> https://github.com/apache/incubator-uniffle >> >> https://uniffle.apache.org/blog/2023/07/21/Uniffle%20-%20New%20chapter%20for%20the%20shuffle%20in%20the%20cloud%20native%20era >> >> Mich Talebzadeh wrote on Mon, 8 Apr 2024 at 07:15: >> >>> Splendid >>> >>> The configurations below can be used with k8s deployments of Spark. >>> Spark applications running on k8s can utilize these configurations to >>> seamlessly access data stored in Google Cloud Storage (GCS) and Amazon S3.
>>> >>> For Google GCS we may have >>> >>> spark_config_gcs = { >>> "spark.kubernetes.authenticate.driver.serviceAccountName": >>> "service_account_name", >>> "spark.hadoop.fs.gs.impl": >>> "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem", >>> "spark.hadoop.google.cloud.auth.service.account.enable": "true", >>> "spark.hadoop.google.cloud.auth.service.account.json.keyfile": >>> "/path/to/keyfile.json", >>> } >>> >>> For Amazon S3 similar >>> >>> spark_config_s3 = { >>> "spark.kubernetes.authenticate.driver.serviceAccountName": >>> "service_account_name", >>> "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem", >>> "spark.hadoop.fs.s3a.access.key": "s3_access_key", >>> "spark.hadoop.fs.s3a.secret.key": "secret_key", >>> } >>> >>> >>> To implement these configurations and enable Spark applications to >>> interact with GCS and S3, I guess we can approach it this way >>> >>> 1) Spark Repository Integration: These configurations need to be added >>> to the Spark repository as part of the supported configuration options for >>> k8s deployments. >>> >>> 2) Configuration Settings: Users need to specify these configurations >>> when submitting Spark applications to a Kubernetes cluster. They can >>> include these configurations in the Spark application code or pass them as >>> command-line arguments or environment variables during application >>> submission. >>> >>> HTH >>> >>> Mich Talebzadeh, >>> >>> Technologist | Solutions Architect | D
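To make the quoted configuration notes above concrete, a sketch of how such a dictionary might be applied when building the SparkSession at submission time (the service account name, key file path and bucket are placeholders):

```
from pyspark.sql import SparkSession

spark_config_gcs = {
    "spark.kubernetes.authenticate.driver.serviceAccountName": "service_account_name",
    "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
    "spark.hadoop.google.cloud.auth.service.account.enable": "true",
    "spark.hadoop.google.cloud.auth.service.account.json.keyfile": "/path/to/keyfile.json",
}

builder = SparkSession.builder.appName("k8s-gcs-demo")
for key, value in spark_config_gcs.items():
    builder = builder.config(key, value)

spark = builder.getOrCreate()
df = spark.read.parquet("gs://my-bucket/some/path")   # placeholder bucket
```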
Re: Idiomatic way to rate-limit streaming sources to avoid OutOfMemoryError?
OK. This is a common issue in Spark Structured Streaming (SSS), where the source generates data faster than Spark can process it. SSS doesn't have a built-in mechanism for directly rate-limiting the incoming data stream itself. However, consider the following: - Limit the rate at which data is produced. This can involve configuring the data source itself to emit data at a controlled rate or implementing rate-limiting mechanisms in the application or system that produces the data. - Note that the backpressure properties spark.streaming.backpressure.enabled and spark.streaming.backpressure.initialRate belong to the older DStream API, which dynamically adjusts the ingestion rate based on the processing capacity of the system. Structured Streaming does not use them; the equivalent lever is a per-trigger cap on sources that support it, such as maxOffsetsPerTrigger for Kafka or maxFilesPerTrigger for the file source (a short sketch follows at the end of this thread). - Consider adjusting the micro-batch interval to control the rate at which data is processed. Increasing the micro-batch interval reduces the frequency of processing, allowing more time for each batch to be processed and reducing the likelihood of out-of-memory errors. The trigger interval is set on the query itself, e.g. .trigger(processingTime="60 seconds") on the writeStream. - Dynamic Resource Allocation (DRA), not implemented yet for Structured Streaming. DRA will automatically adjust allocated resources based on workload. This ensures Spark has enough resources to process incoming data within the trigger interval, preventing backlogs and potential OOM issues. From the Spark UI, look at the streaming tab. There are various statistics there. In general your Processing Time has to be less than your batch interval. The Scheduling Delay and Total Delay are additional indicators of health. HTH Mich Talebzadeh, Technologist | Solutions Architect | Data Engineer | Generative AI London United Kingdom view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> https://en.everybodywiki.com/Mich_Talebzadeh *Disclaimer:* The information provided is correct to the best of my knowledge but of course cannot be guaranteed . It is essential to note that, as with any advice, quote "one test result is worth one-thousand expert opinions (Werner <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>)". On Sun, 7 Apr 2024 at 15:11, Baran, Mert wrote: > Hi Spark community, > > I have a Spark Structured Streaming application that reads data from a > socket source (implemented very similarly to the > TextSocketMicroBatchStream). The issue is that the source can generate > data faster than Spark can process it, eventually leading to an > OutOfMemoryError when Spark runs out of memory trying to queue up all > the pending data. > > I'm looking for advice on the most idiomatic/recommended way in Spark to > rate-limit data ingestion to avoid overwhelming the system. > > Approaches I've considered: > > 1. Using a BlockingQueue with a fixed size to throttle the data. > However, this requires careful tuning of the queue size. If too small, > it limits throughput; if too large, you risk batches taking too long. > > 2. Fetching a limited number of records in the PartitionReader's next(), > adding the records into a queue and checking if the queue is empty. > However, I'm not sure if there is a built-in way to dynamically scale > the number of records fetched (i.e., dynamically calculating the offset) > based on the system load and capabilities. 
> > So in summary, what is the recommended way to dynamically rate-limit a > streaming source to match Spark's processing capacity and avoid > out-of-memory issues? Are there any best practices or configuration > options I should look at? > Any guidance would be much appreciated! Let me know if you need any > other details. > > Thanks, > Mert > > > - > To unsubscribe e-mail: user-unsubscr...@spark.apache.org > >
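To make the source-side limiting concrete: for sources that expose a per-trigger cap (Kafka, the file source), combining that cap with an explicit processing-time trigger keeps each micro-batch bounded. A sketch with placeholder broker, topic and checkpoint location; a custom socket-style source, as in the original question, has to enforce a similar limit itself inside its partition reader:

```
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("RateLimitedIngest").getOrCreate()

df = (spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker:9092")   # placeholder
      .option("subscribe", "events")                      # placeholder topic
      # cap the number of records pulled into each micro-batch
      .option("maxOffsetsPerTrigger", 100000)
      .load())

query = (df.writeStream
         .format("console")
         # process at most one batch every 30 seconds
         .trigger(processingTime="30 seconds")
         .option("checkpointLocation", "/tmp/rate-limit-demo")  # placeholder
         .start())
query.awaitTermination()
```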
Re: External Spark shuffle service for k8s
Thanks Cheng for the heads up. I will have a look. Cheers Mich Talebzadeh, Technologist | Solutions Architect | Data Engineer | Generative AI London United Kingdom view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> https://en.everybodywiki.com/Mich_Talebzadeh *Disclaimer:* The information provided is correct to the best of my knowledge but of course cannot be guaranteed . It is essential to note that, as with any advice, quote "one test result is worth one-thousand expert opinions (Werner <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>)". On Sun, 7 Apr 2024 at 15:08, Cheng Pan wrote: > Instead of External Shuffle Shufle, Apache Celeborn might be a good option > as a Remote Shuffle Service for Spark on K8s. > > There are some useful resources you might be interested in. > > [1] https://celeborn.apache.org/ > [2] https://www.youtube.com/watch?v=s5xOtG6Venw > [3] https://github.com/aws-samples/emr-remote-shuffle-service > [4] https://github.com/apache/celeborn/issues/2140 > > Thanks, > Cheng Pan > > > > On Apr 6, 2024, at 21:41, Mich Talebzadeh > wrote: > > > > I have seen some older references for shuffle service for k8s, > > although it is not clear they are talking about a generic shuffle > > service for k8s. > > > > Anyhow with the advent of genai and the need to allow for a larger > > volume of data, I was wondering if there has been any more work on > > this matter. Specifically larger and scalable file systems like HDFS, > > GCS , S3 etc, offer significantly larger storage capacity than local > > disks on individual worker nodes in a k8s cluster, thus allowing > > handling much larger datasets more efficiently. Also the degree of > > parallelism and fault tolerance with these files systems come into > > it. I will be interested in hearing more about any progress on this. > > > > Thanks > > . > > > > Mich Talebzadeh, > > > > Technologist | Solutions Architect | Data Engineer | Generative AI > > > > London > > United Kingdom > > > > > > view my Linkedin profile > > > > > > https://en.everybodywiki.com/Mich_Talebzadeh > > > > > > > > Disclaimer: The information provided is correct to the best of my > > knowledge but of course cannot be guaranteed . It is essential to note > > that, as with any advice, quote "one test result is worth one-thousand > > expert opinions (Werner Von Braun)". > > > > - > > To unsubscribe e-mail: dev-unsubscr...@spark.apache.org > > > >
Re: External Spark shuffle service for k8s
Splendid The configurations below can be used with k8s deployments of Spark. Spark applications running on k8s can utilize these configurations to seamlessly access data stored in Google Cloud Storage (GCS) and Amazon S3. For Google GCS we may have spark_config_gcs = { "spark.kubernetes.authenticate.driver.serviceAccountName": "service_account_name", "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem", "spark.hadoop.google.cloud.auth.service.account.enable": "true", "spark.hadoop.google.cloud.auth.service.account.json.keyfile": "/path/to/keyfile.json", } For Amazon S3 similar spark_config_s3 = { "spark.kubernetes.authenticate.driver.serviceAccountName": "service_account_name", "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem", "spark.hadoop.fs.s3a.access.key": "s3_access_key", "spark.hadoop.fs.s3a.secret.key": "secret_key", } To implement these configurations and enable Spark applications to interact with GCS and S3, I guess we can approach it this way 1) Spark Repository Integration: These configurations need to be added to the Spark repository as part of the supported configuration options for k8s deployments. 2) Configuration Settings: Users need to specify these configurations when submitting Spark applications to a Kubernetes cluster. They can include these configurations in the Spark application code or pass them as command-line arguments or environment variables during application submission. HTH Mich Talebzadeh, Technologist | Solutions Architect | Data Engineer | Generative AI London United Kingdom view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> https://en.everybodywiki.com/Mich_Talebzadeh *Disclaimer:* The information provided is correct to the best of my knowledge but of course cannot be guaranteed . It is essential to note that, as with any advice, quote "one test result is worth one-thousand expert opinions (Werner <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>)". On Sun, 7 Apr 2024 at 13:31, Vakaris Baškirov wrote: > There is an IBM shuffle service plugin that supports S3 > https://github.com/IBM/spark-s3-shuffle > > Though I would think a feature like this could be a part of the main Spark > repo. Trino already has out-of-box support for s3 exchange (shuffle) and > it's very useful. > > Vakaris > > On Sun, Apr 7, 2024 at 12:27 PM Mich Talebzadeh > wrote: > >> >> Thanks for your suggestion that I take it as a workaround. Whilst this >> workaround can potentially address storage allocation issues, I was more >> interested in exploring solutions that offer a more seamless integration >> with large distributed file systems like HDFS, GCS, or S3. This would >> ensure better performance and scalability for handling larger datasets >> efficiently. >> >> >> Mich Talebzadeh, >> Technologist | Solutions Architect | Data Engineer | Generative AI >> London >> United Kingdom >> >> >>view my Linkedin profile >> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> >> >> >> https://en.everybodywiki.com/Mich_Talebzadeh >> >> >> >> *Disclaimer:* The information provided is correct to the best of my >> knowledge but of course cannot be guaranteed . It is essential to note >> that, as with any advice, quote "one test result is worth one-thousand >> expert opinions (Werner >> <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von Braun >> <https://en.wikipedia.org/wiki/Wernher_von_Braun>)". 
>> >> >> On Sat, 6 Apr 2024 at 21:28, Bjørn Jørgensen >> wrote: >> >>> You can make a PVC on K8S call it 300GB >>> >>> make a folder in yours dockerfile >>> WORKDIR /opt/spark/work-dir >>> RUN chmod g+w /opt/spark/work-dir >>> >>> start spark with adding this >>> >>> .config("spark.kubernetes.driver.volumes.persistentVolumeClaim.300gb.options.claimName", >>> "300gb") \ >>> >>> .config("spark.kubernetes.driver.volumes.persistentVolumeClaim.300gb.mount.path", >>> "/opt/spark/work-dir") \ >>> >>> .config("spark.kubernetes.driver.volumes.persistentVolumeClaim.300gb.mount.readOnly", >>> "False") \ >>> >>> .config("spark.kubernetes.executor.volumes.persistentVolumeClaim.300gb.options.claimName", >>> "300gb"
Re: External Spark shuffle service for k8s
Thanks for your suggestion that I take it as a workaround. Whilst this workaround can potentially address storage allocation issues, I was more interested in exploring solutions that offer a more seamless integration with large distributed file systems like HDFS, GCS, or S3. This would ensure better performance and scalability for handling larger datasets efficiently. Mich Talebzadeh, Technologist | Solutions Architect | Data Engineer | Generative AI London United Kingdom view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> https://en.everybodywiki.com/Mich_Talebzadeh *Disclaimer:* The information provided is correct to the best of my knowledge but of course cannot be guaranteed . It is essential to note that, as with any advice, quote "one test result is worth one-thousand expert opinions (Werner <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>)". On Sat, 6 Apr 2024 at 21:28, Bjørn Jørgensen wrote: > You can make a PVC on K8S call it 300GB > > make a folder in yours dockerfile > WORKDIR /opt/spark/work-dir > RUN chmod g+w /opt/spark/work-dir > > start spark with adding this > > .config("spark.kubernetes.driver.volumes.persistentVolumeClaim.300gb.options.claimName", > "300gb") \ > > .config("spark.kubernetes.driver.volumes.persistentVolumeClaim.300gb.mount.path", > "/opt/spark/work-dir") \ > > .config("spark.kubernetes.driver.volumes.persistentVolumeClaim.300gb.mount.readOnly", > "False") \ > > .config("spark.kubernetes.executor.volumes.persistentVolumeClaim.300gb.options.claimName", > "300gb") \ > > .config("spark.kubernetes.executor.volumes.persistentVolumeClaim.300gb.mount.path", > "/opt/spark/work-dir") \ > > .config("spark.kubernetes.executor.volumes.persistentVolumeClaim.300gb.mount.readOnly", > "False") \ > .config("spark.local.dir", "/opt/spark/work-dir") > > > > > lør. 6. apr. 2024 kl. 15:45 skrev Mich Talebzadeh < > mich.talebza...@gmail.com>: > >> I have seen some older references for shuffle service for k8s, >> although it is not clear they are talking about a generic shuffle >> service for k8s. >> >> Anyhow with the advent of genai and the need to allow for a larger >> volume of data, I was wondering if there has been any more work on >> this matter. Specifically larger and scalable file systems like HDFS, >> GCS , S3 etc, offer significantly larger storage capacity than local >> disks on individual worker nodes in a k8s cluster, thus allowing >> handling much larger datasets more efficiently. Also the degree of >> parallelism and fault tolerance with these files systems come into >> it. I will be interested in hearing more about any progress on this. >> >> Thanks >> . >> >> Mich Talebzadeh, >> >> Technologist | Solutions Architect | Data Engineer | Generative AI >> >> London >> United Kingdom >> >> >>view my Linkedin profile >> >> >> https://en.everybodywiki.com/Mich_Talebzadeh >> >> >> >> Disclaimer: The information provided is correct to the best of my >> knowledge but of course cannot be guaranteed . It is essential to note >> that, as with any advice, quote "one test result is worth one-thousand >> expert opinions (Werner Von Braun)". >> >> - >> To unsubscribe e-mail: user-unsubscr...@spark.apache.org >> >> > > -- > Bjørn Jørgensen > Vestre Aspehaug 4, 6010 Ålesund > Norge > > +47 480 94 297 >
External Spark shuffle service for k8s
I have seen some older references for shuffle service for k8s, although it is not clear they are talking about a generic shuffle service for k8s. Anyhow with the advent of genai and the need to allow for a larger volume of data, I was wondering if there has been any more work on this matter. Specifically larger and scalable file systems like HDFS, GCS , S3 etc, offer significantly larger storage capacity than local disks on individual worker nodes in a k8s cluster, thus allowing handling much larger datasets more efficiently. Also the degree of parallelism and fault tolerance with these files systems come into it. I will be interested in hearing more about any progress on this. Thanks . Mich Talebzadeh, Technologist | Solutions Architect | Data Engineer | Generative AI London United Kingdom view my Linkedin profile https://en.everybodywiki.com/Mich_Talebzadeh Disclaimer: The information provided is correct to the best of my knowledge but of course cannot be guaranteed . It is essential to note that, as with any advice, quote "one test result is worth one-thousand expert opinions (Werner Von Braun)". - To unsubscribe e-mail: user-unsubscr...@spark.apache.org
Re: Re: [Spark SQL] How can I use .sql() in conjunction with watermarks?
hm. you are getting below AnalysisException: Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark; The problem seems to be that you are using the append output mode when writing the streaming query results to Kafka. This mode is designed for scenarios where you want to append new data to an existing dataset at the sink (in this case, the "sink" topic in Kafka). However, your query involves a streaming aggregation: group by provinceId, window('createTime', '1 hour', '30 minutes'). The problem is that Spark Structured Streaming requires a watermark to ensure exactly-once processing when using aggregations with append mode. Your code already defines a watermark on the "createTime" column with a delay of 10 seconds (withWatermark("createTime", "10 seconds")). However, the error message indicates it is missing on the start column. Try adding watermark to "start" Column: Modify your code as below to include a watermark on the "start" column generated by the window function: from pyspark.sql.functions import col, from_json, explode, window, sum, watermark streaming_df = session.readStream \ .format("kafka") \ .option("kafka.bootstrap.servers", "localhost:9092") \ .option("subscribe", "payment_msg") \ .option("startingOffsets", "earliest") \ .load() \ .select(from_json(col("value").cast("string"), schema).alias("parsed_value")) \ .select("parsed_value.*") \ .withWatermark("createTime", "10 seconds") # Existing watermark on createTime *# Modified section with watermark on 'start' column* streaming_df = streaming_df.groupBy( col("provinceId"), window(col("createTime"), "1 hour", "30 minutes") ).agg( sum(col("payAmount")).alias("totalPayAmount") ).withWatermark(expr("start"), "10 seconds") # Watermark on window-generated 'start' # Rest of the code remains the same streaming_df.createOrReplaceTempView("streaming_df") spark.sql(""" SELECT window.start, window.end, provinceId, totalPayAmount FROM streaming_df ORDER BY window.start """) \ .writeStream \ .format("kafka") \ .option("checkpointLocation", "checkpoint") \ .option("kafka.bootstrap.servers", "localhost:9092") \ .option("topic", "sink") \ .start() Try and see how it goes HTH Mich Talebzadeh, Technologist | Solutions Architect | Data Engineer | Generative AI London United Kingdom view my Linkedin profile https://en.everybodywiki.com/Mich_Talebzadeh Disclaimer: The information provided is correct to the best of my knowledge but of course cannot be guaranteed . It is essential to note that, as with any advice, quote "one test result is worth one-thousand expert opinions (Werner Von Braun)". Mich Talebzadeh, Technologist | Solutions Architect | Data Engineer | Generative AI London United Kingdom view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> https://en.everybodywiki.com/Mich_Talebzadeh *Disclaimer:* The information provided is correct to the best of my knowledge but of course cannot be guaranteed . It is essential to note that, as with any advice, quote "one test result is worth one-thousand expert opinions (Werner <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>)". On Tue, 2 Apr 2024 at 22:43, Chloe He wrote: > Hi Mich, > > Thank you so much for your response. I really appreciate your help! > > You mentioned "defining the watermark using the withWatermark function on > the streaming_df before creating the temporary view” - I believe this is > what I’m doing and it’s not working for me. 
Here is the exact code snippet > that I’m running: > > ``` > >>> streaming_df = session.readStream\ > .format("kafka")\ > .option("kafka.bootstrap.servers", "localhost:9092")\ > .option("subscribe", "payment_msg")\ > .option("startingOffsets","earliest")\ > .load()\ > .select(from_json(col("value").cast("string"), > schema).alias("parsed_value"))\ > .select("parsed_value.*")\ > .withWatermark("createTime", "10 seconds") > > >>> streaming_df.createOrReplaceTempView("streaming_df”) > > >>> spark.sql(""" > SELECT > window.start, window.end, provinceId, sum(payAmount) as totalPayAmount > FROM streaming_df > GROUP BY provinceId, window('cre
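For reference, a pattern that runs end to end is to keep the watermark on the raw event-time column before the aggregation and to drop the ORDER BY, since sorting is not supported on streaming queries in append mode; the Kafka sink also needs an explicit value column. This is a sketch only, reusing the placeholder topic names from the thread and an assumed message schema:

```
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, window, sum as sum_
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType, TimestampType

spark = SparkSession.builder.appName("WatermarkedSQL").getOrCreate()

# assumed payload schema; adjust to the real payment_msg format
schema = StructType([
    StructField("provinceId", IntegerType()),
    StructField("payAmount", DoubleType()),
    StructField("createTime", TimestampType()),
])

parsed = (spark.readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", "localhost:9092")
          .option("subscribe", "payment_msg")
          .option("startingOffsets", "earliest")
          .load()
          .select(from_json(col("value").cast("string"), schema).alias("v"))
          .select("v.*")
          .withWatermark("createTime", "10 seconds"))   # watermark BEFORE the aggregation

windowed = (parsed
            .groupBy(col("provinceId"),
                     window(col("createTime"), "1 hour", "30 minutes"))
            .agg(sum_("payAmount").alias("totalPayAmount")))

# the watermark travels with the DataFrame into the temp view
windowed.createOrReplaceTempView("windowed_view")

(spark.sql("""
    SELECT window.start AS window_start, window.end AS window_end,
           provinceId, totalPayAmount
    FROM windowed_view
 """)                                       # no ORDER BY in append mode
 .selectExpr("CAST(provinceId AS STRING) AS key",
             "to_json(struct(*)) AS value")  # Kafka sink expects key/value columns
 .writeStream
 .format("kafka")
 .option("kafka.bootstrap.servers", "localhost:9092")
 .option("topic", "sink")
 .option("checkpointLocation", "checkpoint")
 .outputMode("append")
 .start())
```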
Re: [Spark SQL] How can I use .sql() in conjunction with watermarks?
ire transformation and aggression #import this and anything else needed from pyspark.sql.functions import from_json, col, window from pyspark.sql.types import StructType, StringType,IntegerType, FloatType, TimestampType # Define the schema for the JSON data schema = ... # Replace with your schema definition # construct a streaming dataframe 'streamingDataFrame' that subscribes to topic temperature streamingDataFrame = self.spark \ .readStream \ .format("kafka") \ .option("kafka.bootstrap.servers", config['MDVariables']['bootstrapServers'],) \ .option("schema.registry.url", config['MDVariables']['schemaRegistryURL']) \ .option("group.id", config['common']['appName']) \ .option("zookeeper.connection.timeout.ms", config['MDVariables']['zookeeperConnectionTimeoutMs']) \ .option("rebalance.backoff.ms", config['MDVariables']['rebalanceBackoffMS']) \ .option("zookeeper.session.timeout.ms", config['MDVariables']['zookeeperSessionTimeOutMs']) \ .option("auto.commit.interval.ms", config['MDVariables']['autoCommitIntervalMS']) \ .option("subscribe", "temperature") \ .option("failOnDataLoss", "false") \ .option("includeHeaders", "true") \ .option("startingOffsets", "earliest") \ .load() \ .select(from_json(col("value").cast("string"), schema).alias("parsed_value")) .select("parsed_value.*") .withWatermark("createTime", "10 seconds")) # Define the watermark here # Create a temporary view from the streaming DataFrame with watermark streaming_df.createOrReplaceTempView("michboy") # Execute SQL queries on the temporary view result_df = (spark.sql(""" SELECT window.start, window.end, provinceId, sum(payAmount) as totalPayAmount FROM michboy GROUP BY provinceId, window('createTime', '1 hour', '30 minutes') ORDER BY window.start """) .writeStream .format("kafka") .option("checkpointLocation", "checkpoint") .option("kafka.bootstrap.servers", "localhost:9092") .option("topic", "sink") .start()) Note that the watermark is defined using the withWatermark function on the streaming_df before creating the temporary view (michboy ). This way, the watermark information is correctly propagated to the temporary view, allowing you to execute SQL queries with window functions and aggregations on the streaming data. Note that by defining the watermark on the streaming DataFrame before creating the temporary view, Spark will recognize the watermark and allow streaming aggregations and window operations in your SQL queries. HTH Mich Talebzadeh, Technologist | Solutions Architect | Data Engineer | Generative AI London United Kingdom view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> https://en.everybodywiki.com/Mich_Talebzadeh *Disclaimer:* The information provided is correct to the best of my knowledge but of course cannot be guaranteed . It is essential to note that, as with any advice, quote "one test result is worth one-thousand expert opinions (Werner <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>)". On Tue, 2 Apr 2024 at 20:24, Chloe He wrote: > Hello! > > I am attempting to write a streaming pipeline that would consume data from > a Kafka source, manipulate the data, and then write results to a downstream > sink (Kafka, Redis, etc). I want to write fully formed SQL instead of using > the function API that Spark offers. I read a few guides on how to do this > and my understanding is that I need to create a temp view in order to > execute my raw SQL queries via spark.sql(). 
> > However, I’m having trouble defining watermarks on my source. It doesn’t > seem like there is a way to introduce watermark in the raw SQL that Spark > supports, so I’m using the .withWatermark() function. However, this > watermark does not work on the temp view. > > Example code: > ``` > streaming_df.select(from_json(col("value").cast("string"), > schema).alias("parsed_value")).select("parsed_value.*").withWatermark("createTime", > "10 seconds”) > > json_df.createOrReplaceTempView("json_df”) > > session.sql(""" > SELECT > window.start, window.end, provinceId, sum(payAmount) as totalPayAmount > FROM json_df > GROUP BY provinceId, window('createTime', '1 hour', '30 minutes') > ORDER BY window.start > """)\ > .writeStream\ > .format("kafka") \ > .option("checkpointLocation", "checkpoint") \ > .option("kafka.bootstrap.servers", "localhost:9092") \ > .option("topic", "sink") \ > .start() > ``` > This throws > ``` > AnalysisException: Append output mode not supported when there are > streaming aggregations on streaming DataFrames/DataSets without watermark; > ``` > > If I switch out the SQL query and write it in the function API instead, > everything seems to work fine. > > How can I use .sql() in conjunction with watermarks? > > Best, > Chloe >
Re: Feature article: Leveraging Generative AI with Apache Spark: Transforming Data Engineering
Sorry from this link Leveraging Generative AI with Apache Spark: Transforming Data Engineering | LinkedIn <https://www.linkedin.com/pulse/leveraging-generative-ai-apache-spark-transforming-mich-lxbte/?trackingId=aqZMBOg4O1KYRB4Una7NEg%3D%3D> Mich Talebzadeh, Technologist | Data | Generative AI | Financial Fraud London United Kingdom view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> https://en.everybodywiki.com/Mich_Talebzadeh *Disclaimer:* The information provided is correct to the best of my knowledge but of course cannot be guaranteed . It is essential to note that, as with any advice, quote "one test result is worth one-thousand expert opinions (Werner <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>)". On Fri, 22 Mar 2024 at 16:16, Mich Talebzadeh wrote: > You may find this link of mine in Linkedin for the said article. We > can use Linkedin for now. > > Leveraging Generative AI with Apache Spark: Transforming Data > Engineering | LinkedIn > > > Mich Talebzadeh, > > Technologist | Data | Generative AI | Financial Fraud > > London > United Kingdom > > >view my Linkedin profile > > > https://en.everybodywiki.com/Mich_Talebzadeh > > > > Disclaimer: The information provided is correct to the best of my > knowledge but of course cannot be guaranteed . It is essential to note > that, as with any advice, quote "one test result is worth one-thousand > expert opinions (Werner Von Braun)". >
Feature article: Leveraging Generative AI with Apache Spark: Transforming Data Engineering
You may find this link of mine in Linkedin for the said article. We can use Linkedin for now. Leveraging Generative AI with Apache Spark: Transforming Data Engineering | LinkedIn Mich Talebzadeh, Technologist | Data | Generative AI | Financial Fraud London United Kingdom view my Linkedin profile https://en.everybodywiki.com/Mich_Talebzadeh Disclaimer: The information provided is correct to the best of my knowledge but of course cannot be guaranteed . It is essential to note that, as with any advice, quote "one test result is worth one-thousand expert opinions (Werner Von Braun)". - To unsubscribe e-mail: user-unsubscr...@spark.apache.org
Re:
You can try this val kafkaReadStream = spark .readStream .format("kafka") .option("kafka.bootstrap.servers", broker) .option("subscribe", topicName) .option("startingOffsets", startingOffsetsMode) .option("maxOffsetsPerTrigger", maxOffsetsPerTrigger) .load() kafkaReadStream .writeStream .foreachBatch((df: DataFrame, batchId: Long) => sendToSink(df, batchId)) .trigger(Trigger.ProcessingTime(s"${triggerProcessingTime} seconds")) .option("checkpointLocation", checkpoint_path) .start() .awaitTermination() Notice the function sendToSink The foreachBatch method ensures that the sendToSink function is called for each micro-batch, regardless of whether the DataFrame contains data or not. Let us look at that function import org.apache.spark.sql.functions._ def sendToSink(df: DataFrame, batchId: Long): Unit = { if (!df.isEmpty) { println(s"From sendToSink, batchId is $batchId, at ${java.time.LocalDateTime.now()}") // df.show(100, false) df.persist() // Write to BigQuery batch table // s.writeTableToBQ(df, "append", config.getString("MDVariables.targetDataset"), config.getString("MDVariables.targetTable")) df.unpersist() // println("wrote to DB") } else { println("DataFrame df is empty") } } If the DataFrame is empty, it prints a message indicating that the DataFrame is empty. You can of course adapt it for your case HTH Mich Talebzadeh, Dad | Technologist | Solutions Architect | Engineer London United Kingdom view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> https://en.everybodywiki.com/Mich_Talebzadeh *Disclaimer:* The information provided is correct to the best of my knowledge but of course cannot be guaranteed . It is essential to note that, as with any advice, quote "one test result is worth one-thousand expert opinions (Werner <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>)". On Thu, 21 Mar 2024 at 23:14, Рамик И wrote: > > Hi! > I want to exucute code inside forEachBatch that will trigger regardless of > whether there is data in the batch or not. > > > val kafkaReadStream = spark > .readStream > .format("kafka") > .option("kafka.bootstrap.servers", broker) > .option("subscribe", topicName) > .option("startingOffsets", startingOffsetsMode) > .option("maxOffsetsPerTrigger", maxOffsetsPerTrigger) > .load() > > > kafkaReadStream > .writeStream > .trigger(Trigger.ProcessingTime(s"$triggerProcessingTime seconds")) > .foreachBatch { > > > } > .start() > .awaitTermination() >
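For PySpark users, the same empty-batch-aware foreachBatch pattern looks roughly like this (a sketch; broker, topic, checkpoint path and the actual sink write are placeholders):

```
from pyspark.sql import SparkSession, DataFrame

spark = SparkSession.builder.appName("ForeachBatchDemo").getOrCreate()

def send_to_sink(df: DataFrame, batch_id: int) -> None:
    # called once per micro-batch, whether or not the batch contains data
    if df.isEmpty():                 # Spark >= 3.3; otherwise use len(df.take(1)) == 0
        print(f"batch {batch_id}: DataFrame is empty")
        return
    df.persist()
    print(f"batch {batch_id}: {df.count()} rows")
    # ... write df to the real sink (BigQuery, JDBC, ...) here ...
    df.unpersist()

stream = (spark.readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", "broker:9092")   # placeholder
          .option("subscribe", "topic_name")                  # placeholder
          .option("maxOffsetsPerTrigger", 10000)
          .load())

(stream.writeStream
 .foreachBatch(send_to_sink)
 .trigger(processingTime="30 seconds")
 .option("checkpointLocation", "/tmp/foreach-batch-demo")     # placeholder
 .start()
 .awaitTermination())
```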
Re: A proposal for creating a Knowledge Sharing Hub for Apache Spark Community
One option that comes to my mind, is that given the cyclic nature of these types of proposals in these two forums, we should be able to use Databricks's existing knowledge sharing hub Knowledge Sharing Hub - Databricks <https://community.databricks.com/t5/knowledge-sharing-hub/bd-p/Knowledge-Sharing-Hub> as well. The majority of topics will be of interest to their audience as well. In addition, they seem to invite everyone to contribute. Unless you have an overriding concern why we should not take this approach, I can enquire from Databricks community managers whether they can entertain this idea. They seem to have a well defined structure for hosting topics. Let me know your thoughts Thanks <https://community.databricks.com/t5/knowledge-sharing-hub/bd-p/Knowledge-Sharing-Hub> Mich Talebzadeh, Dad | Technologist | Solutions Architect | Engineer London United Kingdom view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> https://en.everybodywiki.com/Mich_Talebzadeh *Disclaimer:* The information provided is correct to the best of my knowledge but of course cannot be guaranteed . It is essential to note that, as with any advice, quote "one test result is worth one-thousand expert opinions (Werner <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>)". On Tue, 19 Mar 2024 at 08:25, Joris Billen wrote: > +1 > > > On 18 Mar 2024, at 21:53, Mich Talebzadeh > wrote: > > Well as long as it works. > > Please all check this link from Databricks and let us know your thoughts. > Will something similar work for us?. Of course Databricks have much deeper > pockets than our ASF community. Will it require moderation in our side to > block spams and nutcases. > > Knowledge Sharing Hub - Databricks > <https://community.databricks.com/t5/knowledge-sharing-hub/bd-p/Knowledge-Sharing-Hub> > > > Mich Talebzadeh, > Dad | Technologist | Solutions Architect | Engineer > London > United Kingdom > >view my Linkedin profile > <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> > > > https://en.everybodywiki.com/Mich_Talebzadeh > > > > *Disclaimer:* The information provided is correct to the best of my > knowledge but of course cannot be guaranteed . It is essential to note > that, as with any advice, quote "one test result is worth one-thousand > expert opinions (Werner <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von > Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>)". > > > On Mon, 18 Mar 2024 at 20:31, Bjørn Jørgensen > wrote: > >> something like this Spark community · GitHub >> <https://github.com/Spark-community> >> >> >> man. 18. mars 2024 kl. 17:26 skrev Parsian, Mahmoud >> : >> >>> Good idea. Will be useful >>> >>> >>> >>> +1 >>> >>> >>> >>> >>> >>> >>> >>> *From: *ashok34...@yahoo.com.INVALID >>> *Date: *Monday, March 18, 2024 at 6:36 AM >>> *To: *user @spark , Spark dev list < >>> d...@spark.apache.org>, Mich Talebzadeh >>> *Cc: *Matei Zaharia >>> *Subject: *Re: A proposal for creating a Knowledge Sharing Hub for >>> Apache Spark Community >>> >>> External message, be mindful when clicking links or attachments >>> >>> >>> >>> Good idea. Will be useful >>> >>> >>> >>> +1 >>> >>> >>> >>> On Monday, 18 March 2024 at 11:00:40 GMT, Mich Talebzadeh < >>> mich.talebza...@gmail.com> wrote: >>> >>> >>> >>> >>> >>> Some of you may be aware that Databricks community Home | Databricks >>> >>> have just launched a knowledge sharing hub. 
I thought it would be a >>> >>> good idea for the Apache Spark user group to have the same, especially >>> >>> for repeat questions on Spark core, Spark SQL, Spark Structured >>> >>> Streaming, Spark Mlib and so forth. >>> >>> >>> >>> Apache Spark user and dev groups have been around for a good while. >>> >>> They are serving their purpose . We went through creating a slack >>> >>> community that managed to create more more heat than light.. This is >>> >>> what Databricks community came up with and I quote >>> >>> >>> >>> "Knowledge Sharing Hub >>> >>> Dive into a collaborative space where members like YOU can ex
Re: A proposal for creating a Knowledge Sharing Hub for Apache Spark Community
OK thanks for the update. What does officially blessed signify here? Can we have and run it as a sister site? The reason this comes to my mind is that the interested parties should have easy access to this site (from ISUG Spark sites) as a reference repository. I guess the advice would be that the information (topics) are provided as best efforts and cannot be guaranteed. Mich Talebzadeh, Dad | Technologist | Solutions Architect | Engineer London United Kingdom view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> https://en.everybodywiki.com/Mich_Talebzadeh *Disclaimer:* The information provided is correct to the best of my knowledge but of course cannot be guaranteed . It is essential to note that, as with any advice, quote "one test result is worth one-thousand expert opinions (Werner <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>)". On Mon, 18 Mar 2024 at 21:04, Reynold Xin wrote: > One of the problem in the past when something like this was brought up was > that the ASF couldn't have officially blessed venues beyond the already > approved ones. So that's something to look into. > > Now of course you are welcome to run unofficial things unblessed as long > as they follow trademark rules. > > > > On Mon, Mar 18, 2024 at 1:53 PM, Mich Talebzadeh < > mich.talebza...@gmail.com> wrote: > >> Well as long as it works. >> >> Please all check this link from Databricks and let us know your thoughts. >> Will something similar work for us?. Of course Databricks have much deeper >> pockets than our ASF community. Will it require moderation in our side to >> block spams and nutcases. >> >> Knowledge Sharing Hub - Databricks >> <https://community.databricks.com/t5/knowledge-sharing-hub/bd-p/Knowledge-Sharing-Hub> >> >> >> Mich Talebzadeh, >> Dad | Technologist | Solutions Architect | Engineer >> London >> United Kingdom >> >> >>view my Linkedin profile >> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> >> >> >> https://en.everybodywiki.com/Mich_Talebzadeh >> >> >> >> *Disclaimer:* The information provided is correct to the best of my >> knowledge but of course cannot be guaranteed . It is essential to note >> that, as with any advice, quote "one test result is worth one-thousand >> expert opinions (Werner >> <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von Braun >> <https://en.wikipedia.org/wiki/Wernher_von_Braun>)". >> >> >> On Mon, 18 Mar 2024 at 20:31, Bjørn Jørgensen >> wrote: >> >>> something like this Spark community · GitHub >>> <https://github.com/Spark-community> >>> >>> >>> man. 18. mars 2024 kl. 17:26 skrev Parsian, Mahmoud < >>> mpars...@illumina.com.invalid>: >>> >>>> Good idea. Will be useful >>>> >>>> >>>> >>>> +1 >>>> >>>> >>>> >>>> >>>> >>>> >>>> >>>> *From: *ashok34...@yahoo.com.INVALID >>>> *Date: *Monday, March 18, 2024 at 6:36 AM >>>> *To: *user @spark , Spark dev list < >>>> d...@spark.apache.org>, Mich Talebzadeh >>>> *Cc: *Matei Zaharia >>>> *Subject: *Re: A proposal for creating a Knowledge Sharing Hub for >>>> Apache Spark Community >>>> >>>> External message, be mindful when clicking links or attachments >>>> >>>> >>>> >>>> Good idea. Will be useful >>>> >>>> >>>> >>>> +1 >>>> >>>> >>>> >>>> On Monday, 18 March 2024 at 11:00:40 GMT, Mich Talebzadeh < >>>> mich.talebza...@gmail.com> wrote: >>>> >>>> >>>> >>>> >>>> >>>> Some of you may be aware that Databricks community Home | Databricks >>>> >>>> have just launched a knowledge sharing hub. 
I thought it would be a >>>> >>>> good idea for the Apache Spark user group to have the same, especially >>>> >>>> for repeat questions on Spark core, Spark SQL, Spark Structured >>>> >>>> Streaming, Spark Mlib and so forth. >>>> >>>> >>>> >>>> Apache Spark user and dev groups have been around for a good while. >>>> >>>> They are serving their purpose . We went through c
Re: A proposal for creating a Knowledge Sharing Hub for Apache Spark Community
Well as long as it works. Please all check this link from Databricks and let us know your thoughts. Will something similar work for us?. Of course Databricks have much deeper pockets than our ASF community. Will it require moderation in our side to block spams and nutcases. Knowledge Sharing Hub - Databricks <https://community.databricks.com/t5/knowledge-sharing-hub/bd-p/Knowledge-Sharing-Hub> Mich Talebzadeh, Dad | Technologist | Solutions Architect | Engineer London United Kingdom view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> https://en.everybodywiki.com/Mich_Talebzadeh *Disclaimer:* The information provided is correct to the best of my knowledge but of course cannot be guaranteed . It is essential to note that, as with any advice, quote "one test result is worth one-thousand expert opinions (Werner <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>)". On Mon, 18 Mar 2024 at 20:31, Bjørn Jørgensen wrote: > something like this Spark community · GitHub > <https://github.com/Spark-community> > > > man. 18. mars 2024 kl. 17:26 skrev Parsian, Mahmoud > : > >> Good idea. Will be useful >> >> >> >> +1 >> >> >> >> >> >> >> >> *From: *ashok34...@yahoo.com.INVALID >> *Date: *Monday, March 18, 2024 at 6:36 AM >> *To: *user @spark , Spark dev list < >> d...@spark.apache.org>, Mich Talebzadeh >> *Cc: *Matei Zaharia >> *Subject: *Re: A proposal for creating a Knowledge Sharing Hub for >> Apache Spark Community >> >> External message, be mindful when clicking links or attachments >> >> >> >> Good idea. Will be useful >> >> >> >> +1 >> >> >> >> On Monday, 18 March 2024 at 11:00:40 GMT, Mich Talebzadeh < >> mich.talebza...@gmail.com> wrote: >> >> >> >> >> >> Some of you may be aware that Databricks community Home | Databricks >> >> have just launched a knowledge sharing hub. I thought it would be a >> >> good idea for the Apache Spark user group to have the same, especially >> >> for repeat questions on Spark core, Spark SQL, Spark Structured >> >> Streaming, Spark Mlib and so forth. >> >> >> >> Apache Spark user and dev groups have been around for a good while. >> >> They are serving their purpose . We went through creating a slack >> >> community that managed to create more more heat than light.. This is >> >> what Databricks community came up with and I quote >> >> >> >> "Knowledge Sharing Hub >> >> Dive into a collaborative space where members like YOU can exchange >> >> knowledge, tips, and best practices. Join the conversation today and >> >> unlock a wealth of collective wisdom to enhance your experience and >> >> drive success." >> >> >> >> I don't know the logistics of setting it up.but I am sure that should >> >> not be that difficult. If anyone is supportive of this proposal, let >> >> the usual +1, 0, -1 decide >> >> >> >> HTH >> >> >> >> Mich Talebzadeh, >> >> Dad | Technologist | Solutions Architect | Engineer >> >> London >> >> United Kingdom >> >> >> >> >> >> view my Linkedin profile >> >> >> >> >> >> https://en.everybodywiki.com/Mich_Talebzadeh >> <https://urldefense.com/v3/__https:/en.everybodywiki.com/Mich_Talebzadeh__;!!HrbR-XT-OQ!Wu9fFP8RFJW2N_YUvwl9yctGHxtM-CFPe6McqOJDrxGBjIaRoF8vRwpjT9WzHojwI2R09Nbg8YE9ggB4FtocU8cQFw$> >> >> >> >> >> >> >> >> Disclaimer: The information provided is correct to the best of my >> >> knowledge but of course cannot be guaranteed . 
It is essential to note >> >> that, as with any advice, quote "one test result is worth one-thousand >> >> expert opinions (Werner Von Braun)". >> >> >> >> - >> >> To unsubscribe e-mail: user-unsubscr...@spark.apache.org >> >> >> > > > -- > Bjørn Jørgensen > Vestre Aspehaug 4, 6010 Ålesund > Norge > > +47 480 94 297 >
Re: A proposal for creating a Knowledge Sharing Hub for Apache Spark Community
+1 for me Mich Talebzadeh, Dad | Technologist | Solutions Architect | Engineer London United Kingdom view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> https://en.everybodywiki.com/Mich_Talebzadeh *Disclaimer:* The information provided is correct to the best of my knowledge but of course cannot be guaranteed . It is essential to note that, as with any advice, quote "one test result is worth one-thousand expert opinions (Werner <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>)". On Mon, 18 Mar 2024 at 16:23, Parsian, Mahmoud wrote: > Good idea. Will be useful > > > > +1 > > > > > > > > *From: *ashok34...@yahoo.com.INVALID > *Date: *Monday, March 18, 2024 at 6:36 AM > *To: *user @spark , Spark dev list < > d...@spark.apache.org>, Mich Talebzadeh > *Cc: *Matei Zaharia > *Subject: *Re: A proposal for creating a Knowledge Sharing Hub for Apache > Spark Community > > External message, be mindful when clicking links or attachments > > > > Good idea. Will be useful > > > > +1 > > > > On Monday, 18 March 2024 at 11:00:40 GMT, Mich Talebzadeh < > mich.talebza...@gmail.com> wrote: > > > > > > Some of you may be aware that Databricks community Home | Databricks > > have just launched a knowledge sharing hub. I thought it would be a > > good idea for the Apache Spark user group to have the same, especially > > for repeat questions on Spark core, Spark SQL, Spark Structured > > Streaming, Spark Mlib and so forth. > > > > Apache Spark user and dev groups have been around for a good while. > > They are serving their purpose . We went through creating a slack > > community that managed to create more more heat than light.. This is > > what Databricks community came up with and I quote > > > > "Knowledge Sharing Hub > > Dive into a collaborative space where members like YOU can exchange > > knowledge, tips, and best practices. Join the conversation today and > > unlock a wealth of collective wisdom to enhance your experience and > > drive success." > > > > I don't know the logistics of setting it up.but I am sure that should > > not be that difficult. If anyone is supportive of this proposal, let > > the usual +1, 0, -1 decide > > > > HTH > > > > Mich Talebzadeh, > > Dad | Technologist | Solutions Architect | Engineer > > London > > United Kingdom > > > > > > view my Linkedin profile > > > > > > https://en.everybodywiki.com/Mich_Talebzadeh > <https://urldefense.com/v3/__https:/en.everybodywiki.com/Mich_Talebzadeh__;!!HrbR-XT-OQ!Wu9fFP8RFJW2N_YUvwl9yctGHxtM-CFPe6McqOJDrxGBjIaRoF8vRwpjT9WzHojwI2R09Nbg8YE9ggB4FtocU8cQFw$> > > > > > > > > Disclaimer: The information provided is correct to the best of my > > knowledge but of course cannot be guaranteed . It is essential to note > > that, as with any advice, quote "one test result is worth one-thousand > > expert opinions (Werner Von Braun)". > > > > - > > To unsubscribe e-mail: user-unsubscr...@spark.apache.org > > >
Re: [GraphX]: Prevent recomputation of DAG
Hi, I must admit I don't know much about this Fruchterman-Reingold (call it FR) visualization using GraphX and Kubernetes, but you are suggesting this slowdown issue starts after the second iteration, and caching/persisting the graph after each iteration does not help. FR involves many computations between vertex pairs. In MapReduce (or shuffle) steps, data might be shuffled across the network, impacting performance for large graphs. The usual way to verify this is through the Spark UI: the Stages, SQL and Executors tabs show the time taken for each step and the amount of read/write etc. Also, repeatedly creating and destroying GraphX graphs in each iteration may lead to garbage collection (GC) overhead. So you should consider profiling your application to identify bottlenecks and pinpoint which part of the code is causing the slowdown. As I mentioned, Spark offers profiling tools like the Spark UI or third-party libraries for this purpose. HTH Mich Talebzadeh, Dad | Technologist | Solutions Architect | Engineer London United Kingdom view my Linkedin profile https://en.everybodywiki.com/Mich_Talebzadeh Disclaimer: The information provided is correct to the best of my knowledge but of course cannot be guaranteed . It is essential to note that, as with any advice, quote "one test result is worth one-thousand expert opinions (Werner Von Braun)". On Sun, 17 Mar 2024 at 18:45, Marek Berith wrote: > > Dear community, > for my diploma thesis, we are implementing a distributed version of > Fruchterman-Reingold visualization algorithm, using GraphX and Kubernetes. Our > solution is a backend that continously computes new positions of vertices in a > graph and sends them via RabbitMQ to a consumer. Fruchterman-Reingold is an > iterative algorithm, meaning that in each iteration repulsive and attractive > forces between vertices are computed and then new positions of vertices based > on those forces are computed. Graph vertices and edges are stored in a GraphX > graph structure. Forces between vertices are computed using MapReduce(between > each pair of vertices) and aggregateMessages(for vertices connected via > edges). After an iteration of the algorithm, the recomputed positions from the > RDD are serialized using collect and sent to the RabbitMQ queue. > > Here comes the issue. The first two iterations of the algorithm seem to be > quick, but at the third iteration, the algorithm is very slow until it reaches > a point at which it cannot finish an iteration in real time. It seems like > caching of the graph may be an issue, because if we serialize the graph after > each iteration in an array and create new graph from the array in the new > iteration, we get a constant usage of memory and each iteration takes the same > amount of time. We had already tried to cache/persist/checkpoint the graph > after each iteration but it didn't help, so maybe we are doing something > wrong. We do not think that serializing the graph into an array should be the > solution for such a complex library like Apache Spark. 
I'm also not very > confident how this fix will affect performance for large graphs or in parallel > environment. We are attaching a short example of code that shows doing an > iteration of the algorithm, input and output example. > > We would appreciate if you could help us fix this issue or give us any > meaningful ideas, as we had tried everything that came to mind. > > We look forward to your reply. > Thank you, Marek Berith > > - > To unsubscribe e-mail: user-unsubscr...@spark.apache.org - To unsubscribe e-mail: user-unsubscr...@spark.apache.org
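One thing worth trying for any iterative Spark algorithm that slows down after a few rounds is to truncate the lineage explicitly every few iterations rather than only caching; in Scala the GraphX Graph has its own checkpoint() for this. The idea is sketched here in PySpark on a plain DataFrame, since that is what most of this digest uses, and carries over directly; the checkpoint directory and the toy update step are placeholders:

```
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("IterativeLineageDemo").getOrCreate()
spark.sparkContext.setCheckpointDir("/tmp/checkpoints")      # placeholder

# toy stand-in for the vertex positions recomputed every iteration
positions = spark.range(0, 100000).select(
    "id", F.rand(seed=1).alias("x"), F.rand(seed=2).alias("y"))

for i in range(20):
    # one "iteration": derive new positions from the previous ones
    positions = positions.select(
        "id", (F.col("x") * 0.95).alias("x"), (F.col("y") * 0.95).alias("y"))

    if i % 5 == 0:
        # materialise and cut the lineage so the plan (and GC pressure)
        # does not keep growing with every iteration
        positions = positions.checkpoint(eager=True)

    positions.count()   # action standing in for the collect() sent to RabbitMQ
```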
A proposal for creating a Knowledge Sharing Hub for Apache Spark Community
Some of you may be aware that Databricks community Home | Databricks have just launched a knowledge sharing hub. I thought it would be a good idea for the Apache Spark user group to have the same, especially for repeat questions on Spark core, Spark SQL, Spark Structured Streaming, Spark Mlib and so forth. Apache Spark user and dev groups have been around for a good while. They are serving their purpose . We went through creating a slack community that managed to create more more heat than light.. This is what Databricks community came up with and I quote "Knowledge Sharing Hub Dive into a collaborative space where members like YOU can exchange knowledge, tips, and best practices. Join the conversation today and unlock a wealth of collective wisdom to enhance your experience and drive success." I don't know the logistics of setting it up.but I am sure that should not be that difficult. If anyone is supportive of this proposal, let the usual +1, 0, -1 decide HTH Mich Talebzadeh, Dad | Technologist | Solutions Architect | Engineer London United Kingdom view my Linkedin profile https://en.everybodywiki.com/Mich_Talebzadeh Disclaimer: The information provided is correct to the best of my knowledge but of course cannot be guaranteed . It is essential to note that, as with any advice, quote "one test result is worth one-thousand expert opinions (Werner Von Braun)". - To unsubscribe e-mail: user-unsubscr...@spark.apache.org
Re: pyspark - Where are Dataframes created from Python objects stored?
Yes, transformations are indeed executed on the worker nodes, but they are only performed when necessary, usually when an action is called. This lazy evaluation helps in optimizing the execution of Spark jobs by allowing Spark to optimize the execution plan and perform optimizations such as pipelining transformations and removing unnecessary computations. "I may need something like that for synthetic data for testing. Any way to do that ?" Have a look at this. https://github.com/joke2k/faker <https://github.com/joke2k/faker>HTH Mich Talebzadeh, Dad | Technologist | Solutions Architect | Engineer London United Kingdom view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> https://en.everybodywiki.com/Mich_Talebzadeh *Disclaimer:* The information provided is correct to the best of my knowledge but of course cannot be guaranteed . It is essential to note that, as with any advice, quote "one test result is worth one-thousand expert opinions (Werner <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>)". On Mon, 18 Mar 2024 at 07:16, Sreyan Chakravarty wrote: > > On Fri, Mar 15, 2024 at 3:10 AM Mich Talebzadeh > wrote: > >> >> No Data Transfer During Creation: --> Data transfer occurs only when an >> action is triggered. >> Distributed Processing: --> DataFrames are distributed for parallel >> execution, not stored entirely on the driver node. >> Lazy Evaluation Optimization: --> Delaying data transfer until necessary >> enhances performance. >> Shuffle vs. Partitioning: --> Data movement during partitioning is not >> considered a shuffle in Spark terminology. >> Shuffles involve more complex data rearrangement. >> > > So just to be clear the transformations are always executed on the worker > node but it is just transferred until an action on the dataframe is > triggered. > > Am I correct ? > > If so, then how do I generate a large dataset ? > > I may need something like that for synthetic data for testing. Any way to > do that ? > > > -- > Regards, > Sreyan Chakravarty >
Python library that generates fake data using Faker
I came across this a few weeks ago. In a nutshell, you can use it for generating test data and other scenarios where you need realistic-looking but not necessarily real data. With so many regulations, copyrights etc., it is a viable alternative to using real data. I used it to generate 1,000 rows of mixed genuine and fraudulent transactions to build a machine learning model to detect fraudulent transactions using PySpark's MLlib library. You can install it via pip install Faker. Details at https://github.com/joke2k/faker HTH Mich Talebzadeh, Dad | Technologist | Solutions Architect | Engineer London United Kingdom view my Linkedin profile https://en.everybodywiki.com/Mich_Talebzadeh Disclaimer: The information provided is correct to the best of my knowledge but of course cannot be guaranteed . It is essential to note that, as with any advice, quote "one test result is worth one-thousand expert opinions (Werner Von Braun)". - To unsubscribe e-mail: user-unsubscr...@spark.apache.org
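A minimal sketch of the kind of thing described above, assuming Faker is installed; the field names and the fraud rate are made up for illustration:

```
import random
from faker import Faker
from pyspark.sql import SparkSession, Row

spark = SparkSession.builder.appName("FakerDemo").getOrCreate()
fake = Faker()
Faker.seed(42)
random.seed(42)

rows = [
    Row(transaction_id=fake.uuid4(),
        customer=fake.name(),
        amount=round(random.uniform(5.0, 5000.0), 2),
        created_at=fake.date_time_this_year(),
        is_fraud=random.random() < 0.05)      # roughly 5% labelled fraudulent
    for _ in range(1000)
]

df = spark.createDataFrame(rows)
df.show(5, truncate=False)
```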
Re: pyspark - Where are Dataframes created from Python objects stored?
Hi, When you create a DataFrame from Python objects using spark.createDataFrame, here it goes: *Initial Local Creation:* The DataFrame is initially created in the memory of the driver node. The data is not yet distributed to executors at this point. *The role of lazy Evaluation:* Spark applies lazy evaluation, *meaning transformations are not executed immediately*. It constructs a logical plan describing the operations, but data movement does not occur yet. *Action Trigger:* When you initiate an action (things like show(), collect(), etc), Spark triggers the execution. *When partitioning and distribution come in:Spark partitions the DataFrame into logical chunks for parallel processing*. It divides the data based on a partitioning scheme (default is hash partitioning). Each partition is sent to different executor nodes for distributed execution. This stage involves data transfer across the cluster, but it is not that expensive shuffle you have heard of. Shuffles happen within repartitioning or certain join operations. *Storage on Executors:* Executors receive their assigned partitions and store them in their memory. If memory is limited, Spark spills partitions to disk. look at stages tab in UI (4040) *In summary:* No Data Transfer During Creation: --> Data transfer occurs only when an action is triggered. Distributed Processing: --> DataFrames are distributed for parallel execution, not stored entirely on the driver node. Lazy Evaluation Optimization: --> Delaying data transfer until necessary enhances performance. Shuffle vs. Partitioning: --> Data movement during partitioning is not considered a shuffle in Spark terminology. Shuffles involve more complex data rearrangement. *Considerations: * Large DataFrames: For very large DataFrames - manage memory carefully to avoid out-of-memory errors. Consider options like: - Increasing executor memory - Using partitioning strategies to optimize memory usage - Employing techniques like checkpointing to persistent storage (hard disks) or caching for memory efficiency - You can get additional info from Spark UI default port 4040 tabs like SQL and executors - Spark uses Catalyst optimiser for efficient execution plans. df.explain("extended") shows both logical and physical plans HTH Mich Talebzadeh, Dad | Technologist | Solutions Architect | Engineer London United Kingdom view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> https://en.everybodywiki.com/Mich_Talebzadeh *Disclaimer:* The information provided is correct to the best of my knowledge but of course cannot be guaranteed . It is essential to note that, as with any advice, quote "one test result is worth one-thousand expert opinions (Werner <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>)". On Thu, 14 Mar 2024 at 19:46, Sreyan Chakravarty wrote: > I am trying to understand Spark Architecture. > > For Dataframes that are created from python objects ie. 
that are *created > in memory where are they stored ?* > > Take following example: > > from pyspark.sql import Rowimport datetime > courses = [ > { > 'course_id': 1, > 'course_title': 'Mastering Python', > 'course_published_dt': datetime.date(2021, 1, 14), > 'is_active': True, > 'last_updated_ts': datetime.datetime(2021, 2, 18, 16, 57, 25) > } > > ] > > > courses_df = spark.createDataFrame([Row(**course) for course in courses]) > > > Where is the dataframe stored when I invoke the call: > > courses_df = spark.createDataFrame([Row(**course) for course in courses]) > > Does it: > >1. Send the data to a random executor ? > > >- Does this mean this counts as a shuffle ? > > >1. Or does it stay on the driver node ? > > >- That does not make sense when the dataframe grows large. > > > -- > Regards, > Sreyan Chakravarty >
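To make the lazy-evaluation point above concrete, here is a small sketch (assuming a local SparkSession; not part of the original reply):

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.master("local[4]").appName("lazyDemo").getOrCreate()

# Building the DataFrame only records a plan on the driver; no job runs yet
df = spark.range(0, 1_000_000).withColumn("doubled", col("id") * 2)
df.explain("extended")   # parsed, analyzed, optimized and physical plans, still no execution

# Only an action such as count() actually ships work to the executors
print(df.count())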
Re: Bug in How to Monitor Streaming Queries in PySpark
Thanks for the clarification. That makes sense.. In the code below, we can see def onQueryProgress(self, event): print("onQueryProgress") # Access micro-batch data microbatch_data = event.progress #print("microbatch_data received") # Check if data is received #print(microbatch_data) print(f"Type of microbatch_data is {type(microbatch_data)}") #processedRowsPerSecond = microbatch_data.get("processedRowsPerSecond") incorrect processedRowsPerSecond = microbatch_data.processedRowsPerSecond if processedRowsPerSecond is not None: # Check if value exists print("processedRowsPerSecond retrieved") print(f"Processed rows per second is -> {processedRowsPerSecond}") else: print("processedRowsPerSecond not retrieved!") The output onQueryProgress Type of microbatch_data is processedRowsPerSecond retrieved Processed rows per second is -> 2.570694087403599 So we are dealing with the attribute of the class and NOT the dictionary. The line (processedRowsPerSecond = microbatch_data.get("processedRowsPerSecond")) fails because it uses the .get() method, while the second line (processedRowsPerSecond = microbatch_data.processedRowsPerSecond) accesses the attribute directly. In short, they need to ensure that that event.progress* returns a dictionary * Cheers Mich Talebzadeh, Dad | Technologist | Solutions Architect | Engineer London United Kingdom view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> https://en.everybodywiki.com/Mich_Talebzadeh *Disclaimer:* The information provided is correct to the best of my knowledge but of course cannot be guaranteed . It is essential to note that, as with any advice, quote "one test result is worth one-thousand expert opinions (Werner <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>)". On Tue, 12 Mar 2024 at 04:04, 刘唯 wrote: > Oh I see why the confusion. > > microbatch_data = event.progress > > means that microbatch_data is a StreamingQueryProgress instance, it's not > a dictionary, so you should use ` microbatch_data.processedRowsPerSecond`, > instead of the `get` method which is used for dictionaries. > > But weirdly, for query.lastProgress and query.recentProgress, they should > return StreamingQueryProgress but instead they returned a dict. So the > `get` method works there. > > I think PySpark should improve on this part. > > Mich Talebzadeh 于2024年3月11日周一 05:51写道: > >> Hi, >> >> Thank you for your advice >> >> This is the amended code >> >>def onQueryProgress(self, event): >> print("onQueryProgress") >> # Access micro-batch data >> microbatch_data = event.progress >> #print("microbatch_data received") # Check if data is received >> #print(microbatch_data) >> #processed_rows_per_second = >> microbatch_data.get("processed_rows_per_second") >> processed_rows_per_second = >> microbatch_data.get("processedRowsPerSecond") >> print("CPC", processed_rows_per_second) >> if processed_rows_per_second is not None: # Check if value exists >>print("ocessed_rows_per_second retrieved") >>print(f"Processed rows per second: >> {processed_rows_per_second}") >> else: >>print("processed_rows_per_second not retrieved!") >> >> This is the output >> >> onQueryStarted >> 'None' [c1a910e6-41bb-493f-b15b-7863d07ff3fe] got started! >> SLF4J: Failed to load class "org.slf4j.impl.StaticMDCBinder". >> SLF4J: Defaulting to no-operation MDCAdapter implementation. >> SLF4J: See http://www.slf4j.org/codes.html#no_static_mdc_binder for >> further details. 
>> --- >> Batch: 0 >> --- >> +---+-+---+---+ >> |key|doubled_value|op_type|op_time| >> +---+-+---+---+ >> +---+-+---+---+ >> >> onQueryProgress >> --- >> Batch: 1 >> --- >> ++-+---++ >> | key|doubled_value|op_type| op_time| >> ++-+---++ >> |a960f663-d13a-49c...|0| 1|2024-03-11 12:17:...| >> +-
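A small helper along these lines (a sketch, not from the thread) can hide the difference between the object and dictionary forms discussed above:

def processed_rows_per_second(progress):
    # event.progress is a StreamingQueryProgress object (attribute access),
    # whereas query.lastProgress / query.recentProgress currently return dicts,
    # so support both forms defensively
    if isinstance(progress, dict):
        return progress.get("processedRowsPerSecond")
    return getattr(progress, "processedRowsPerSecond", None)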
Re: Bug in How to Monitor Streaming Queries in PySpark
Hi, Thank you for your advice This is the amended code def onQueryProgress(self, event): print("onQueryProgress") # Access micro-batch data microbatch_data = event.progress #print("microbatch_data received") # Check if data is received #print(microbatch_data) #processed_rows_per_second = microbatch_data.get("processed_rows_per_second") processed_rows_per_second = microbatch_data.get("processedRowsPerSecond") print("CPC", processed_rows_per_second) if processed_rows_per_second is not None: # Check if value exists print("ocessed_rows_per_second retrieved") print(f"Processed rows per second: {processed_rows_per_second}") else: print("processed_rows_per_second not retrieved!") This is the output onQueryStarted 'None' [c1a910e6-41bb-493f-b15b-7863d07ff3fe] got started! SLF4J: Failed to load class "org.slf4j.impl.StaticMDCBinder". SLF4J: Defaulting to no-operation MDCAdapter implementation. SLF4J: See http://www.slf4j.org/codes.html#no_static_mdc_binder for further details. --- Batch: 0 --- +---+-+---+---+ |key|doubled_value|op_type|op_time| +---+-+---+---+ +---+-+---+---+ onQueryProgress --- Batch: 1 --- ++-+---++ | key|doubled_value|op_type| op_time| ++-+---++ |a960f663-d13a-49c...|0| 1|2024-03-11 12:17:...| ++-+---++ onQueryProgress --- Batch: 2 --- ++-+---++ | key|doubled_value|op_type| op_time| ++-+---++ |a960f663-d13a-49c...|2| 1|2024-03-11 12:17:...| ++-+---+----+ I am afraid it is not working. Not even printing anything Cheers Mich Talebzadeh, Dad | Technologist | Solutions Architect | Engineer London United Kingdom view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> https://en.everybodywiki.com/Mich_Talebzadeh *Disclaimer:* The information provided is correct to the best of my knowledge but of course cannot be guaranteed . It is essential to note that, as with any advice, quote "one test result is worth one-thousand expert opinions (Werner <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>)". On Mon, 11 Mar 2024 at 05:07, 刘唯 wrote: > *now -> not > > 刘唯 于2024年3月10日周日 22:04写道: > >> Have you tried using microbatch_data.get("processedRowsPerSecond")? >> Camel case now snake case >> >> Mich Talebzadeh 于2024年3月10日周日 11:46写道: >> >>> >>> There is a paper from Databricks on this subject >>> >>> >>> https://www.databricks.com/blog/2022/05/27/how-to-monitor-streaming-queries-in-pyspark.html >>> >>> But having tested it, there seems to be a bug there that I reported to >>> Databricks forum as well (in answer to a user question) >>> >>> I have come to a conclusion that this is a bug. In general there is a >>> bug in obtaining individual values from the dictionary. For example, a bug >>> in the way Spark Streaming is populating the processed_rows_per_second key >>> within the microbatch_data -> microbatch_data = event.progres dictionary or >>> any other key. I have explored various debugging steps, and even though the >>> key seems to exist, the value might not be getting set. Note that the >>> dictionary itself prints the elements correctly. 
This is with regard to >>> method onQueryProgress(self, event) in class >>> MyListener(StreamingQueryListener): >>> >>> For example with print(microbatch_data), you get all printed as below >>> >>> onQueryProgress >>> microbatch_data received >>> { >>> "id" : "941e4cb6-f4ee-41f8-b662-af6dda61dc66", >>> "runId" : "691d5eb2-140e-48c0-949a-7efbe0fa0967", >>> "name" : null, >>> "timestamp" : "2024-03-10T09:21:27.233Z", >>> "batchId" : 21, >>> "numInputRows" : 1, >>> "inputRowsPerSecond" : 10
Bug in How to Monitor Streaming Queries in PySpark
There is a paper from Databricks on this subject https://www.databricks.com/blog/2022/05/27/how-to-monitor-streaming-queries-in-pyspark.html But having tested it, there seems to be a bug there that I reported to the Databricks forum as well (in answer to a user question). I have come to the conclusion that this is a bug. In general there is a bug in obtaining individual values from the dictionary. For example, a bug in the way Spark Streaming is populating the processed_rows_per_second key within the microbatch_data -> microbatch_data = event.progress dictionary, or any other key. I have explored various debugging steps, and even though the key seems to exist, the value might not be getting set. Note that the dictionary itself prints the elements correctly. This is with regard to method onQueryProgress(self, event) in class MyListener(StreamingQueryListener):

For example with print(microbatch_data), you get it all printed as below

onQueryProgress
microbatch_data received
{
  "id" : "941e4cb6-f4ee-41f8-b662-af6dda61dc66",
  "runId" : "691d5eb2-140e-48c0-949a-7efbe0fa0967",
  "name" : null,
  "timestamp" : "2024-03-10T09:21:27.233Z",
  "batchId" : 21,
  "numInputRows" : 1,
  "inputRowsPerSecond" : 100.0,
  "processedRowsPerSecond" : 5.347593582887701,
  "durationMs" : { "addBatch" : 37, "commitOffsets" : 41, "getBatch" : 0, "latestOffset" : 0, "queryPlanning" : 5, "triggerExecution" : 187, "walCommit" : 104 },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "RateStreamV2[rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=default]",
    "startOffset" : 20,
    "endOffset" : 21,
    "latestOffset" : 21,
    "numInputRows" : 1,
    "inputRowsPerSecond" : 100.0,
    "processedRowsPerSecond" : 5.347593582887701
  } ],
  "sink" : { "description" : "org.apache.spark.sql.execution.streaming.ConsoleTable$@430a977c", "numOutputRows" : 1 }
}

However, the observed behaviour is that processed_rows_per_second is either None or not being updated correctly. The Spark version I used for my test is 3.4. The sample code uses format=rate for simulating a streaming process. You can test the code yourself, all in one:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.streaming import DataStreamWriter, StreamingQueryListener
from pyspark.sql.functions import col, round, current_timestamp, lit
import uuid

def process_data(df):
    processed_df = df.withColumn("key", lit(str(uuid.uuid4()))). \
        withColumn("doubled_value", col("value") * 2). \
        withColumn("op_type", lit(1)). \
        withColumn("op_time", current_timestamp())
    return processed_df

# Create a Spark session
appName = "testListener"
spark = SparkSession.builder.appName(appName).getOrCreate()

# Define the schema for the streaming data
schema = "key string, timestamp timestamp, value long"

# Define my listener.
class MyListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        print("onQueryStarted")
        print(f"'{event.name}' [{event.id}] got started!")

    def onQueryProgress(self, event):
        print("onQueryProgress")
        # Access micro-batch data
        microbatch_data = event.progress
        print("microbatch_data received")  # Check if data is received
        print(microbatch_data)
        processed_rows_per_second = microbatch_data.get("processed_rows_per_second")
        if processed_rows_per_second is not None:  # Check if value exists
            print("processed_rows_per_second retrieved")
            print(f"Processed rows per second: {processed_rows_per_second}")
        else:
            print("processed_rows_per_second not retrieved!")

    def onQueryTerminated(self, event):
        print("onQueryTerminated")
        if event.exception:
            print(f"Query terminated with exception: {event.exception}")
        else:
            print("Query successfully terminated.")

# Add my listener.
listener_instance = MyListener()
spark.streams.addListener(listener_instance)

# Create a streaming DataFrame with the rate source
streaming_df = (
    spark.readStream
    .format("rate")
    .option("rowsPerSecond", 1)
    .load()
)

# Apply processing function to the streaming DataFrame
processed_streaming_df = process_data(streaming_df)

# Define the output sink (for example, console sink)
query = (
    processed_streaming_df.select( \
        col("key").alias("key") \
        , col("doubled_va
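The sample is cut off above in the archive; a plausible sketch of the remaining sink definition (an assumption on my part, not the original code) would be:

query = (processed_streaming_df
         .select(col("key"), col("doubled_value"), col("op_type"), col("op_time"))
         .writeStream
         .outputMode("append")
         .format("console")
         .option("truncate", "false")
         .start())

query.awaitTermination()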
Re: Creating remote tables using PySpark
The error message shows a mismatch between the configured warehouse directory and the actual location accessible by the Spark application running in the container. You have configured the SparkSession with spark.sql.warehouse.dir="file:/data/hive/warehouse". This tells Spark where to store temporary and intermediate data during operations like saving DataFrames as tables. When running the application remotely, the container cannot access the directory /data/hive/warehouse on your local machine. This directory path may exist on the container's host system, but not within the container itself. You can set spark.sql.warehouse.dir to a directory within the container's file system. This directory should be accessible by the Spark application running inside the container. For example:

# Change "/tmp/spark-warehouse" to anything suitable within the container
spark = SparkSession.builder \
    .appName("testme") \
    .master("spark://192.168.1.245:7077") \
    .config("spark.sql.warehouse.dir", "/tmp/spark-warehouse") \
    .config("hive.metastore.uris", "thrift://192.168.1.245:9083") \
    .enableHiveSupport() \
    .getOrCreate()

Use spark.conf.get("spark.sql.warehouse.dir") to print the configured warehouse directory after creating the SparkSession to confirm all is OK. HTH Mich Talebzadeh, Dad | Technologist | Solutions Architect | Engineer London United Kingdom view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> https://en.everybodywiki.com/Mich_Talebzadeh *Disclaimer:* The information provided is correct to the best of my knowledge but of course cannot be guaranteed . It is essential to note that, as with any advice, quote "one test result is worth one-thousand expert opinions (Werner <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>)". On Fri, 8 Mar 2024 at 06:01, Tom Barber wrote: > Okay interesting, maybe my assumption was incorrect, although I'm still > confused. > > I tried to mount a central mount point that would be the same on my local > machine and the container. Same error although I moved the path to > /tmp/hive/data/hive/ but when I rerun the test code to save a table, > the complaint is still for > > Warehouse Dir: file:/tmp/hive/data/hive/warehouse > Metastore URIs: thrift://192.168.1.245:9083 > Warehouse Dir: file:/tmp/hive/data/hive/warehouse > Metastore URIs: thrift://192.168.1.245:9083 > ERROR FileOutputCommitter: Mkdirs failed to create > file:/data/hive/warehouse/input.db/accounts_20240307_232110_1_0_6_post21_g4fdc321_d20240307/_temporary/0 > > so what is /data/hive even referring to when I print out the spark conf > values and neither now refer to /data/hive/ > > On Thu, Mar 7, 2024 at 9:49 PM Tom Barber wrote: > >> Wonder if anyone can just sort my brain out here as to whats possible or >> not. >> >> I have a container running Spark, with Hive and a ThriftServer. I want to >> run code against it remotely. 
>> >> If I take something simple like this >> >> from pyspark.sql import SparkSession >> from pyspark.sql.types import StructType, StructField, IntegerType, >> StringType >> >> # Initialize SparkSession >> spark = SparkSession.builder \ >> .appName("ShowDatabases") \ >> .master("spark://192.168.1.245:7077") \ >> .config("spark.sql.warehouse.dir", "file:/data/hive/warehouse") \ >> .config("hive.metastore.uris","thrift://192.168.1.245:9083")\ >> .enableHiveSupport() \ >> .getOrCreate() >> >> # Define schema of the DataFrame >> schema = StructType([ >> StructField("id", IntegerType(), True), >> StructField("name", StringType(), True) >> ]) >> >> # Data to be converted into a DataFrame >> data = [(1, "John Doe"), (2, "Jane Doe"), (3, "Mike Johnson")] >> >> # Create DataFrame >> df = spark.createDataFrame(data, schema) >> >> # Show the DataFrame (optional, for verification) >> df.show() >> >> # Save the DataFrame to a table named "my_table" >> df.write.mode("overwrite").saveAsTable("my_table") >> >> # Stop the SparkSession >> spark.stop() >> >> When I run it in the container it runs fine, but when I run it remotely >> it says: >> >> : java.io.FileNotFoundException: File >> file:/data/hive/warehouse/my_table/_temporary/0 does not exist >> at >> org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:597) >>
Re: It seems --py-files only takes the first two arguments. Can someone please confirm?
Sorry, I forgot. The below is catered for YARN mode. If your application code primarily consists of Python files and does not require a separate virtual environment with specific dependencies, you can use the --py-files argument in spark-submit:

spark-submit --verbose \
    --master yarn \
    --deploy-mode cluster \
    --name $APPNAME \
    --driver-memory 1g \      # Adjust memory as needed
    --executor-memory 1g \    # Adjust memory as needed
    --num-executors 2 \       # Adjust executors as needed
    --py-files ${build_directory}/source_code.zip \
    $CODE_DIRECTORY_CLOUD/my_application_entry_point.py  # Path to your main application script

For application code with a separate virtual environment: if your application code has specific dependencies that you manage in a separate virtual environment, you can leverage the --conf spark.yarn.dist.archives argument.

spark-submit --verbose \
    --master yarn \
    --deploy-mode cluster \
    --name $APPNAME \
    --driver-memory 1g \      # Adjust memory as needed
    --executor-memory 1g \    # Adjust memory as needed
    --num-executors 2 \       # Adjust executors as needed
    --conf "spark.yarn.dist.archives"=${pyspark_venv}.tar.gz#pyspark_venv \
    $CODE_DIRECTORY_CLOUD/my_application_entry_point.py  # Path to your main application script

Explanation:
- --conf "spark.yarn.dist.archives"=${pyspark_venv}.tar.gz#pyspark_venv: This configures Spark to distribute your virtual environment archive (pyspark_venv.tar.gz) to the Yarn cluster nodes. The #pyspark_venv part defines a symbolic link name within the container.
- You do not need --py-files here because the virtual environment archive will contain all necessary dependencies.

Choosing the best approach: The choice depends on your project setup:
- No Separate Virtual Environment: Use --py-files if your application code consists mainly of Python files and doesn't require a separate virtual environment.
- Separate Virtual Environment: Use --conf spark.yarn.dist.archives if you manage dependencies in a separate virtual environment archive.

HTH Mich Talebzadeh, Dad | Technologist | Solutions Architect | Engineer London United Kingdom view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> https://en.everybodywiki.com/Mich_Talebzadeh *Disclaimer:* The information provided is correct to the best of my knowledge but of course cannot be guaranteed . It is essential to note that, as with any advice, quote "one test result is worth one-thousand expert opinions (Werner <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>)". On Tue, 5 Mar 2024 at 17:28, Mich Talebzadeh wrote: > > > > I use zip file personally and pass the application name (in your case > main.py) as the last input line like below > > APPLICATION is your main.py. It does not need to be called main.py. 
It > could be anything like testpython.py > > CODE_DIRECTORY_CLOUD="gs://spark-on-k8s/codes" ## replace gs with s3 > # zip needs to be done at root directory of code > zip -rq ${source_code}.zip ${source_code} > gsutil cp ${source_code}.zip $CODE_DIRECTORY_CLOUD ## replace gsutil with > aws s3 > gsutil cp /${source_code}/src/${APPLICATION} $CODE_DIRECTORY_CLOUD > > your spark job > > spark-submit --verbose \ >--properties-file ${property_file} \ >--master k8s://https://$KUBERNETES_MASTER_IP:443 \ >--deploy-mode cluster \ >--name $APPNAME \ > * --py-files $CODE_DIRECTORY_CLOUD/spark_on_gke.zip \* >--conf spark.kubernetes.namespace=$NAMESPACE \ >--conf spark.network.timeout=300 \ >--conf spark.kubernetes.allocation.batch.size=3 \ >--conf spark.kubernetes.allocation.batch.delay=1 \ >--conf spark.kubernetes.driver.container.image=${IMAGEDRIVER} \ >--conf spark.kubernetes.executor.container.image=${IMAGEDRIVER} > \ >--conf spark.kubernetes.driver.pod.name=$APPNAME \ >--conf > spark.kubernetes.authenticate.driver.serviceAccountName=spark-bq \ >--conf > spark.driver.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" \ >--conf > spark.executor.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" > \ >--conf spark.dynamicAllocation.enabled=true \ >--conf spark.dynamicAllocation.shuffleTracking.enabled=true \ >--conf spark.dynamicAllocation.shuffleTracking.timeout=20s \ >--conf spark.dynamicAllocation.executorIdleTimeout=30s \ >--conf spark.dynamicAllocation.cachedExecutorIdleTimeout=40s \ >--conf spark.dynamicAllocation.minExecutors=0 \ >--conf spark.dyn
Re: It seems --py-files only takes the first two arguments. Can someone please confirm?
I use zip file personally and pass the application name (in your case main.py) as the last input line like below APPLICATION is your main.py. It does not need to be called main.py. It could be anything like testpython.py CODE_DIRECTORY_CLOUD="gs://spark-on-k8s/codes" ## replace gs with s3 # zip needs to be done at root directory of code zip -rq ${source_code}.zip ${source_code} gsutil cp ${source_code}.zip $CODE_DIRECTORY_CLOUD ## replace gsutil with aws s3 gsutil cp /${source_code}/src/${APPLICATION} $CODE_DIRECTORY_CLOUD your spark job spark-submit --verbose \ --properties-file ${property_file} \ --master k8s://https://$KUBERNETES_MASTER_IP:443 \ --deploy-mode cluster \ --name $APPNAME \ * --py-files $CODE_DIRECTORY_CLOUD/spark_on_gke.zip \* --conf spark.kubernetes.namespace=$NAMESPACE \ --conf spark.network.timeout=300 \ --conf spark.kubernetes.allocation.batch.size=3 \ --conf spark.kubernetes.allocation.batch.delay=1 \ --conf spark.kubernetes.driver.container.image=${IMAGEDRIVER} \ --conf spark.kubernetes.executor.container.image=${IMAGEDRIVER} \ --conf spark.kubernetes.driver.pod.name=$APPNAME \ --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark-bq \ --conf spark.driver.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" \ --conf spark.executor.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" \ --conf spark.dynamicAllocation.enabled=true \ --conf spark.dynamicAllocation.shuffleTracking.enabled=true \ --conf spark.dynamicAllocation.shuffleTracking.timeout=20s \ --conf spark.dynamicAllocation.executorIdleTimeout=30s \ --conf spark.dynamicAllocation.cachedExecutorIdleTimeout=40s \ --conf spark.dynamicAllocation.minExecutors=0 \ --conf spark.dynamicAllocation.maxExecutors=20 \ --conf spark.driver.cores=3 \ --conf spark.executor.cores=3 \ --conf spark.driver.memory=1024m \ --conf spark.executor.memory=1024m \ * $CODE_DIRECTORY_CLOUD/${APPLICATION}* HTH Mich Talebzadeh, Dad | Technologist | Solutions Architect | Engineer London United Kingdom view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> https://en.everybodywiki.com/Mich_Talebzadeh *Disclaimer:* The information provided is correct to the best of my knowledge but of course cannot be guaranteed . It is essential to note that, as with any advice, quote "one test result is worth one-thousand expert opinions (Werner <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>)". On Tue, 5 Mar 2024 at 16:15, Pedro, Chuck wrote: > Hi all, > > > > I am working in Databricks. When I submit a spark job with the –py-files > argument, it seems the first two are read in but the third is ignored. > > > > "--py-files", > > "s3://some_path/appl_src.py", > > "s3://some_path/main.py", > > "s3://a_different_path/common.py", > > > > I can see the first two acknowledged in the Log4j but not the third. > > > > 24/02/28 21:41:00 INFO Utils: Fetching s3://some_path/appl_src.py to ... > > 24/02/28 21:41:00 INFO Utils: Fetching s3://some_path/main.py to ... > > > > As a result, the job fails because appl_src.py is importing from common.py > but can’t find it. > > > > I posted to both Databricks community here > <https://community.databricks.com/t5/data-engineering/spark-submit-not-reading-one-of-my-py-files-arguments/m-p/62361#M31953> > and Stack Overflow here > <https://stackoverflow.com/questions/78077822/databricks-spark-submit-getting-error-with-py-files> > but did not get a response. 
> > > > I’m aware that we could use a .zip file, so I tried zipping the first two > arguments but then got a totally different error: > > > > “Exception in thread "main" org.apache.spark.SparkException: Failed to get > main class in JAR with error 'null'. Please specify one with --class.” > > > > Basically I just want the application code in one s3 path and a “common” > utilities package in another path. Thanks for your help. > > > > > > > > *Kind regards,* > > Chuck Pedro > > > > > -- > This message (including any attachments) may contain confidential, > proprietary, privileged and/or private information. The information is > intended to be for the use of the individual or entity designated above. If > you are not the intended recipient of this message, please notify the > sender immediately, and delete the message and any attachments. Any > disclosure, reproduction, distribution or other use of this message or any > attachments by an individual or entity other than the intended recipient is > prohibited. > > TRVDiscDefault::1201 >
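As a rough sketch of the zip-based packaging discussed in this thread (module names are taken from the question above; the bundle name and paths are made up for illustration):

import zipfile

# Package the shared modules into a single archive that --py-files can distribute;
# the modules sit at the root of the zip so "import common" works on the executors
with zipfile.ZipFile("app_bundle.zip", "w") as zf:
    zf.write("appl_src.py")
    zf.write("common.py")

# main.py then remains the single entry point passed to spark-submit, e.g.
# spark-submit --py-files s3://some_path/app_bundle.zip s3://some_path/main.py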
Working with a text file that is both compressed by bz2 followed by zip in PySpark
I have downloaded Amazon reviews for sentiment analysis from here. The file is not particularly large (just over 500MB) but comes in the following format

test.ft.txt.bz2.zip

So it is a text file that is compressed by bz2 followed by zip. Now I would like to do all these operations in PySpark. In PySpark a file cannot have both .bz2 and .zip simultaneously. The way I do it is to place the downloaded file in a local directory and then do some operations that are simple but messy. I try to unzip the file using the zipfile package. This works with a bash-style filename, as opposed to a Python-style filename "file:///..". This necessitates using two different styles, an OS-style path for the zip and the Python style to read the bz2 file directly into a df in PySpark.

import os
import zipfile
from pyspark.sql.functions import expr

data_path = "file:///d4T/hduser/sentiments/"
input_file_path = os.path.join(data_path, "test.ft.txt.bz2")
output_file_path = os.path.join(data_path, "review_text_file")

dir_name = "/d4T/hduser/sentiments/"
zipped_file = os.path.join(dir_name, "test.ft.txt.bz2.zip")
bz2_file = os.path.join(dir_name, "test.ft.txt.bz2")

try:
    # Unzip the file
    with zipfile.ZipFile(zipped_file, 'r') as zip_ref:
        zip_ref.extractall(os.path.dirname(bz2_file))
    # Now bz2_file should contain the path to the unzipped file
    print(f"Unzipped file: {bz2_file}")
except Exception as e:
    print(f"Error during unzipping: {str(e)}")

# Load the bz2 file into a DataFrame
df = spark.read.text(input_file_path)
# Remove the '__label__1' and '__label__2' prefixes
df = df.withColumn("review_text", expr("regexp_replace(value, '__label__[12] ', '')"))

Then the rest is just spark-ml. Once I have finished I remove the bz2 file to clean up:

if os.path.exists(bz2_file):  # Check if bz2 file exists
    try:
        os.remove(bz2_file)
        print(f"Successfully deleted {bz2_file}")
    except OSError as e:
        print(f"Error deleting {bz2_file}: {e}")
else:
    print(f"bz2 file {bz2_file} could not be found")

My question is: can these operations be done more efficiently in PySpark itself, ideally with one df operation reading the original file (.bz2.zip)?

Thanks Mich Talebzadeh, Dad | Technologist | Solutions Architect | Engineer London United Kingdom view my Linkedin profile https://en.everybodywiki.com/Mich_Talebzadeh Disclaimer: The information provided is correct to the best of my knowledge but of course cannot be guaranteed. It is essential to note that, as with any advice, quote "one test result is worth one-thousand expert opinions (Werner Von Braun)".
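For reference, a compact sketch of the Spark side of the approach above (assuming the outer .zip layer has already been removed with zipfile, since Spark decompresses .bz2 text files natively but does not read .zip):

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, regexp_extract, regexp_replace

spark = SparkSession.builder.appName("amazonReviews").getOrCreate()

# Spark handles the bz2 decompression transparently
df = spark.read.text("file:///d4T/hduser/sentiments/test.ft.txt.bz2")

# Split the fastText-style "__label__1 <review>" lines into a label and the review text
df = (df.withColumn("label", regexp_extract(col("value"), r"__label__(\d)", 1).cast("int"))
        .withColumn("review_text", regexp_replace(col("value"), r"__label__\d ", ""))
        .drop("value"))

df.show(5, truncate=False)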
Re: pyspark dataframe join with two different data type
This is what you want: how to join two DFs with a string column in one and an array of strings in the other, keeping only rows where the string is present in the array.

from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.functions import expr

spark = SparkSession.builder.appName("joins").getOrCreate()

data1 = [Row(combined_id=[1, 2, 3]), Row(combined_id=[4, 5, 6])]  # this one has a column combined_id as an array of integers
data2 = [Row(mr_id=2), Row(mr_id=5)]  # this one has column mr_id with single integers

df1 = spark.createDataFrame(data1)
df2 = spark.createDataFrame(data2)

df1.printSchema()
df2.printSchema()

# Perform the join with array_contains. It takes two arguments: an array and a value.
# It returns True if the value exists as an element within the array, otherwise False.
joined_df = df1.join(df2, expr("array_contains(combined_id, mr_id)"))

# Show the result
joined_df.show()

root
 |-- combined_id: array (nullable = true)
 |    |-- element: long (containsNull = true)

root
 |-- mr_id: long (nullable = true)

+-----------+-----+
|combined_id|mr_id|
+-----------+-----+
|  [1, 2, 3]|    2|
|  [4, 5, 6]|    5|
+-----------+-----+

HTH Mich Talebzadeh, Dad | Technologist | Solutions Architect | Engineer London United Kingdom view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> https://en.everybodywiki.com/Mich_Talebzadeh *Disclaimer:* The information provided is correct to the best of my knowledge but of course cannot be guaranteed . It is essential to note that, as with any advice, quote "one test result is worth one-thousand expert opinions (Werner <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>)". On Thu, 29 Feb 2024 at 20:50, Karthick Nk wrote: > Hi All, > > I have two dataframe with below structure, i have to join these two > dataframe - the scenario is one column is string in one dataframe and in > other df join column is array of string, so we have to inner join two df > and get the data if string value is present in any of the array of string > value in another dataframe, > > > df1 = spark.sql(""" > SELECT > mr.id as mr_id, > pv.id as pv_id, > array(mr.id, pv.id) as combined_id > FROM > table1 mr > INNER JOIN table2 pv ON pv.id = Mr.recordid > where > pv.id = '35122806-4cd2-4916-a149-24ea55c2dc36' > or pv.id = 'a5f03625-6cc5-49df-95eb-df741fe9139b' > """) > > # df1.display() > > # Your second query > df2 = spark.sql(""" > SELECT > id > FROM > table2 > WHERE > id = '35122806-4cd2-4916-a149-24ea55c2dc36' > > """) > > > > Result data: > 35122806-4cd2-4916-a149-24ea55c2dc36 only, because this records alone is > common between string and array of string value. > > Can you share the sample snippet, how we can do the join for this two > different datatype in the dataframe. > > if any clarification needed, pls feel free to ask. > > Thanks > >
Re: [Spark Core] Potential bug in JavaRDD#countByValue
Hi, Quick observations from what you have provided - The observed discrepancy between rdd.count() and rdd.map(Item::getType).countByValue()in distributed mode suggests a potential aggregation issue with countByValue(). The correct results in local mode give credence to this theory. - Workarounds using mapToPair() and reduceByKey() produce identical results, indicating a broader pattern rather than method specific behaviour. - Dataset.groupBy().count()yields accurate results, but this method incurs overhead for RDD-to-Dataset conversion. Your expected total count of 75187 is around 7 times larger than the observed count of 10519, mapping to the number of your executors 7. This suggests potentially incorrect aggregation or partial aggregation across executors. Now before raising red flag, these could be the culprit - Data Skew, uneven distribution of data across executors could cause partial aggregation if a single executor processes most items of a particular type. - Partial Aggregations, Spark might be combining partial counts from executors incorrectly, leading to inaccuracies. - Finally a bug in 3.5 is possible. HTH Mich Talebzadeh, Dad | Technologist | Solutions Architect | Engineer London United Kingdom view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> https://en.everybodywiki.com/Mich_Talebzadeh *Disclaimer:* The information provided is correct to the best of my knowledge but of course cannot be guaranteed . It is essential to note that, as with any advice, quote "one test result is worth one-thousand expert opinions (Werner <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>)". On Tue, 27 Feb 2024 at 19:02, Stuart Fehr wrote: > Hello, I recently encountered a bug with the results from > JavaRDD#countByValue that does not reproduce when running locally. For > background, we are running a Spark 3.5.0 job on AWS EMR 7.0.0. > > The code in question is something like this: > > JavaRDD rdd = // ... >> rdd.count(); // 75187 > > > > // Get the count broken down by type >> rdd.map(Item::getType).countByValue(); > > > Which gives these results from the resulting Map: > > TypeA: 556 > TypeB: 9168 > TypeC: 590 > TypeD: 205 > (total: 10519) > > These values are incorrect, since every item has a type defined, so the > total of all the types should be 75187. When I inspected this stage in the > Spark UI, I found that it was using 7 executors. Since the value here is > about 1/7th of the actual expected value, I suspect that there is some > issue with the way that the executors report their results back to the > driver. These results for the same code are correct when I run the job in > local mode ("local[4]"), so it may also have something to do with how data > is shared across processes. > > For workarounds, I have also tried: > > rdd.mapToPair(item -> Tuple2.apply(item.getType(), 1)).countByKey(); >> rdd.mapToPair(item -> Tuple2.apply(item.getType(), >> 1L)).reduceByKey(Long::sum).collectAsMap(); > > > These yielded the same (incorrect) result. > > I did find that using Dataset.groupBy().count() did yield the correct > results: > > TypeA: 3996 > TypeB: 65490 > TypeC: 4224 > TypeD: 1477 > > So, I have an immediate workaround, but it is somewhat awkward since I > have to create a Dataframe from a JavaRDD each time. > > Am I doing something wrong? Do these methods not work the way that I > expected them to from reading the documentation? Is this a legitimate bug? 
> > I would be happy to provide more details if that would help in debugging > this scenario. > > Thank you for your time, > ~Stuart Fehr >
Re: Issue of spark with antlr version
Hi, You have provided little information about where Spark fits in here. So I am guessing :) Data Source (JSON, XML, log file, etc.) --> Preprocessing (Spark jobs for filtering, cleaning, etc.)? --> Antlr Parser (Generated tool) --> Extracted Data (Mapped to model) --> Spring Data Model (Java objects) --> Spring Application Logic (Controllers, Services, Repositories) etc. Is this a good guess? HTH Mich Talebzadeh, Dad | Technologist | Solutions Architect | Engineer London United Kingdom view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> https://en.everybodywiki.com/Mich_Talebzadeh *Disclaimer:* The information provided is correct to the best of my knowledge but of course cannot be guaranteed . It is essential to note that, as with any advice, quote "one test result is worth one-thousand expert opinions (Werner <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>)". On Tue, 27 Feb 2024 at 12:25, Sahni, Ashima wrote: > Hi Team, > > > > Can you please let us know the update on below. > > > > Thanks, > > Ashima > > > > *From:* Chawla, Parul > *Sent:* Sunday, February 25, 2024 11:57 PM > *To:* user@spark.apache.org > *Cc:* Sahni, Ashima ; Misra Parashar, Jyoti < > jyoti.misra.paras...@accenture.com> > *Subject:* Issue of spark with antlr version > > > > Hi Spark Team, > > > > > > Our application is currently using spring framrwork 5.3.31 .To upgrade it > to 6.x , as per application dependency we must upgrade Spark and > Hibernate jars as well . > > With Hibernate compatible upgrade, the dependent Antlr4 jar version has > been upgraded to 4.10.1 but there’s no Spark version available with the > upgraded Antlr4 jar. > > Can u please update when we can have updated version with upgraded antl4 > version.. > > > > > > Regards, > > Parul > > -- > > This message is for the designated recipient only and may contain > privileged, proprietary, or otherwise confidential information. If you have > received it in error, please notify the sender immediately and delete the > original. Any other use of the e-mail by you is prohibited. Where allowed > by local law, electronic communications with Accenture and its affiliates, > including e-mail and instant messaging (including content), may be scanned > by our systems for the purposes of information security, AI-powered support > capabilities, and assessment of internal compliance with Accenture policy. > Your privacy is important to us. Accenture uses your personal data only in > compliance with data protection laws. For further information on how > Accenture processes your personal data, please see our privacy statement at > https://www.accenture.com/us-en/privacy-policy. > > __ > > www.accenture.com >
Re: Bugs with joins and SQL in Structured Streaming
Hi, These are all on spark 3.5, correct? Mich Talebzadeh, Dad | Technologist | Solutions Architect | Engineer London United Kingdom view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> https://en.everybodywiki.com/Mich_Talebzadeh *Disclaimer:* The information provided is correct to the best of my knowledge but of course cannot be guaranteed . It is essential to note that, as with any advice, quote "one test result is worth one-thousand expert opinions (Werner <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>)". On Mon, 26 Feb 2024 at 22:18, Andrzej Zera wrote: > Hey all, > > I've been using Structured Streaming in production for almost a year > already and I want to share the bugs I found in this time. I created a test > for each of the issues and put them all here: > https://github.com/andrzejzera/spark-bugs/tree/main/spark-3.5/src/test/scala > > I split the issues into three groups: outer joins on event time, interval > joins and Spark SQL. > > Issues related to outer joins: > >- When joining three or more input streams on event time, if two or >more streams don't contain an event for a join key (which is event time), >no row will be output even if other streams contain an event for this join >key. Tests that check for this: > > https://github.com/andrzejzera/spark-bugs/blob/abae7a3839326a8eafc7516a51aca5e0c79282a6/spark-3.5/src/test/scala/OuterJoinTest.scala#L86 >and > > https://github.com/andrzejzera/spark-bugs/blob/abae7a3839326a8eafc7516a51aca5e0c79282a6/spark-3.5/src/test/scala/OuterJoinTest.scala#L169 >- When joining aggregated stream with raw events with a stream with >already aggregated events (aggregation made outside of Spark), then no row >will be output if that second stream don't contain a corresponding event. >Test that checks for this: > > https://github.com/andrzejzera/spark-bugs/blob/abae7a3839326a8eafc7516a51aca5e0c79282a6/spark-3.5/src/test/scala/OuterJoinTest.scala#L266 >- When joining two aggregated streams (aggregated in Spark), no result >is produced. Test that checks for this: > > https://github.com/andrzejzera/spark-bugs/blob/abae7a3839326a8eafc7516a51aca5e0c79282a6/spark-3.5/src/test/scala/OuterJoinTest.scala#L341. >I've already reported this one here: >https://issues.apache.org/jira/browse/SPARK-45637 but it hasn't been >handled yet. > > Issues related to interval joins: > >- When joining three streams (A, B, C) using interval join on event >time, in the way that B.eventTime is conditioned on A.eventTime and >C.eventTime is also conditioned on A.eventTime, and then doing window >aggregation based on A's event time, the result is output only after >watermark crosses the window end + interval(A, B) + interval (A, C). >However, I'd expect results to be output faster, i.e. when the watermark >crosses window end + MAX(interval(A, B) + interval (A, C)). If our case is >that event B can happen 3 minutes after event A and event C can happen 5 >minutes after A, there is no point to suspend reporting output for 8 >minutes (3+5) after the end of the window if we know that no more event can >be matched after 5 min from the window end (assuming window end is based on >A's event time). Test that checks for this: > > https://github.com/andrzejzera/spark-bugs/blob/abae7a3839326a8eafc7516a51aca5e0c79282a6/spark-3.5/src/test/scala/IntervalJoinTest.scala#L32 > > SQL issues: > >- WITH clause (in contrast to subquery) seems to create a static >DataFrame that can't be used in streaming joins. 
Test that checks for this: > > https://github.com/andrzejzera/spark-bugs/blob/abae7a3839326a8eafc7516a51aca5e0c79282a6/spark-3.5/src/test/scala/SqlSyntaxTest.scala#L31 >- Two subqueries, each aggregating data using window() functio, breaks >the output schema. Test that checks for this: > > https://github.com/andrzejzera/spark-bugs/blob/abae7a3839326a8eafc7516a51aca5e0c79282a6/spark-3.5/src/test/scala/SqlSyntaxTest.scala#L122 > > I'm a beginner with Scala (I'm using Structured Streaming with PySpark) so > won't be able to provide fixes. But I hope the test cases I provided can be > of some help. > > Regards, > Andrzej >
Re: [Beginner Debug]: Executor OutOfMemoryError
Seems like you are having memory issues. Examine your settings. 1. It appears that your driver memory setting is too high. It should be a fraction of total memy provided by YARN 2. Use the Spark UI to monitor the job's memory consumption. Check the Storage tab to see how memory is being utilized across caches, data, and shuffle. 3. Check the Executors tab to identify tasks or executors that are experiencing memory issues. Look for tasks with high input sizes or shuffle spills. 4. In YARN mode, consider setting spark.executor.memoryOverhead property to handle executor overhead. This is important for tasks that require additional memory beyond the executor memory setting. Example 5. --conf spark.executor.memoryOverhead=1000 HTH Mich Talebzadeh, Dad | Technologist | Solutions Architect | Engineer London United Kingdom view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> https://en.everybodywiki.com/Mich_Talebzadeh *Disclaimer:* The information provided is correct to the best of my knowledge but of course cannot be guaranteed . It is essential to note that, as with any advice, quote "one test result is worth one-thousand expert opinions (Werner <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>)". On Fri, 23 Feb 2024 at 02:42, Shawn Ligocki wrote: > Hi I'm new to Spark and I'm running into a lot of OOM issues while trying > to scale up my first Spark application. I am running into these issues with > only 1% of the final expected data size. Can anyone help me understand how > to properly configure Spark to use limited memory or how to debug which > part of my application is causing so much memory trouble? > > My logs end up with tons of messages like: > > 24/02/22 10:51:01 WARN TaskMemoryManager: Failed to allocate a page >> (134217728 bytes), try again. >> 24/02/22 10:51:01 WARN RowBasedKeyValueBatch: Calling spill() on >> RowBasedKeyValueBatch. Will not spill but return 0. >> 24/02/22 10:52:28 WARN Executor: Issue communicating with driver in >> heartbeater >> org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [1 >> milliseconds]. This timeout is controlled by >> spark.executor.heartbeatInterval >> ... >> 24/02/22 10:58:17 WARN NettyRpcEnv: Ignored message: >> HeartbeatResponse(false) >> 24/02/22 10:58:17 WARN HeartbeatReceiver: Removing executor driver with >> no recent heartbeats: 207889 ms exceeds timeout 12 ms >> 24/02/22 10:58:17 ERROR Executor: Exception in task 175.0 in stage 2.0 >> (TID 676) >> java.lang.OutOfMemoryError: Java heap space >> ... > > > Background: The goal of this application is to load a large number of > parquet files, group by a couple fields and compute some summarization > metrics for each group and write the result out. In Python basically: > > from pyspark.sql import SparkSession >> import pyspark.sql.functions as func > > >> spark = SparkSession.builder.getOrCreate() >> df = spark.read.parquet(*pred_paths) >> df = df.groupBy("point_id", "species_code").agg( >> func.count("pred_occ").alias("ensemble_support")) >> df.write.parquet(output_path) > > > And I am launching it with: > > spark-submit \ >> --name ensemble \ >> --driver-memory 64g --executor-memory 64g \ >> stem/ensemble_spark.py > > > I noticed that increasing --driver-memory and --executor-memory did help > me scale up somewhat, but I cannot increase those forever. 
> > Some details: > >- All my tests are currently on a single cluster node (with 128GB RAM >& 64 CPU cores) or locally on my laptop (32GB RAM & 12 CPU cores). >Eventually, I expect to run this in parallel on the cluster. >- This is running on Spark 3.0.1 (in the cluster), I'm seeing the same >issues with 3.5 on my laptop. >- The input data is tons of parquet files stored on NFS. For the final >application it will be about 50k parquet files ranging in size up to 15GB >each. Total size of 100TB, 4 trillion rows, 5 columns. I am currently >testing with ~1% this size: 500 files, 1TB total, 40B rows total. >- There should only be a max of 100 rows per group. So I expect an >output size somewhere in the range 1-5TB, 40-200B rows. For the test: 50GB, >2B rows. These output files are also written to NFS. >- The rows for the same groups are not near each other. Ex: no single >parquet file will have any two rows for the same group. > > Here are some questions I have: > >1. Does Spark know how much memory is available? Do I need to tell it >somehow? Is t
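As a rough sketch of how the settings discussed above might be applied to this aggregation job (all values are illustrative assumptions to be tuned for the cluster, and the input/output paths are placeholders):

from pyspark.sql import SparkSession
import pyspark.sql.functions as func

spark = (SparkSession.builder
         .appName("ensemble")
         .config("spark.executor.memory", "16g")           # keep well below the node's total RAM
         .config("spark.executor.memoryOverhead", "2g")    # headroom for off-heap / shuffle buffers
         .config("spark.sql.shuffle.partitions", "2000")   # smaller shuffle blocks per task
         .getOrCreate())

pred_paths = ["/path/to/preds/*.parquet"]   # placeholder input paths
output_path = "/path/to/output"             # placeholder output path

df = spark.read.parquet(*pred_paths)
result = (df.groupBy("point_id", "species_code")
            .agg(func.count("pred_occ").alias("ensemble_support")))
result.write.parquet(output_path)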
Re: Spark 4.0 Query Analyzer Bug Report
Indeed valid points raised including the potential typo in the new spark version. I suggest, in the meantime, you should look for the so called alternative debugging methods - - Simpler explain(), try basic explain() or explain("extended"). This might provide a less detailed, but potentially functional, explanation. - Manual Analysis*, *analyze the query structure and logical steps yourself - Spark UI, review the Spark UI (accessible through your Spark application on 4040) for delving into query execution and potential bottlenecks. HTH Mich Talebzadeh, Dad | Technologist | Solutions Architect | Engineer London United Kingdom view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> https://en.everybodywiki.com/Mich_Talebzadeh *Disclaimer:* The information provided is correct to the best of my knowledge but of course cannot be guaranteed . It is essential to note that, as with any advice, quote "one test result is worth one-thousand expert opinions (Werner <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>)". On Wed, 21 Feb 2024 at 08:37, Holden Karau wrote: > Do you mean Spark 3.4? 4.0 is very much not released yet. > > Also it would help if you could share your query & more of the logs > leading up to the error. > > On Tue, Feb 20, 2024 at 3:07 PM Sharma, Anup > wrote: > >> Hi Spark team, >> >> >> >> We ran into a dataframe issue after upgrading from spark 3.1 to 4. >> >> >> >> query_result.explain(extended=True)\n File >> \"…/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py\" >> >> raise Py4JJavaError(\npy4j.protocol.Py4JJavaError: An error occurred while >> calling z:org.apache.spark.sql.api.python.PythonSQLUtils.explainString.\n: >> java.lang.IllegalStateException: You hit a query analyzer bug. Please report >> your query to Spark user mailing list.\n\tat >> org.apache.spark.sql.execution.SparkStrategies$Aggregation$.apply(SparkStrategies.scala:516)\n\tat >> >> org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$1(QueryPlanner.scala:63)\n\tat >> scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)\n\tat >> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)\n\tat >> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)\n\tat >> org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)\n\tat >> >> org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:72)\n\tat >> >> org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$3(QueryPlanner.scala:78)\n\tat >> >> scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)\n\tat >> >> scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)\n\tat >> scala.collection.Iterator.foreach(Iterator.scala:943)\n\tat >> scala.collection.Iterator.foreach$(Iterator.scala:943)\n\tat >> scala.collection.AbstractIterator.foreach(Iterator.scala:1431)\n\tat >> scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)\n\tat >> scala.collect... >> >> >> >> >> >> Could you please let us know if this is already being looked at? >> >> >> >> Thanks, >> >> Anup >> > > > -- > Cell : 425-233-8271 >
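For instance, a minimal sketch of the first suggestion (assuming an existing SparkSession named spark; problem_query is a placeholder for the SQL that triggers the analyzer error):

problem_query = "SELECT ..."    # placeholder for the failing query
df = spark.sql(problem_query)
df.explain()                    # physical plan only
df.explain("extended")          # parsed, analyzed, optimized and physical plans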
Kafka-based Spark Streaming and Vertex AI for Sentiment Analysis
I am working on a pet project to implement a real-time sentiment analysis system for analyzing customer reviews. It leverages Kafka for data ingestion, Spark Structured Streaming (SSS) for real-time processing, and Vertex AI for sentiment analysis and potential action triggers. *Features* - Real-time processing of customer reviews using SSS. - Sentiment analysis using pre-assigned labels or Vertex AI <https://cloud.google.com/vertex-ai?hl=en> models. - Integration with Vertex AI for model deployment and prediction serving. - Potential actions based on sentiment analysis results (e.g., notifications, database updates). *Tech stack* - Kafka: Stream processing platform for data ingestion. - SSS for real-time data processing on incoming messages with cleansing. - Vertex AI: Machine learning platform for model training. I have created sample JSON data with relevant attributes for a product review as shown below

{
  "rowkey": "7de43681-0e4a-45cb-ad40-5f14f5678333",
  "product_id": "product-id-1616",
  "timereported": "2024-02-21T08:46:40",
  "description": "Easy to use and setup, perfect for beginners.",
  "price": "GBP507",
  "sentiment": "negative",
  "product_category": "Electronics",
  "customer_id": "customer4",
  "location": "UK",
  "rating": 6,
  "review_text": "Sleek and modern design, but lacking some features.",
  "user_feedback": "Negative",
  "review_source": "online",
  "sentiment_confidence": 0.33,
  "product_features": "user-friendly",
  "timestamp": "",
  "language": "English"
}

I also attached a high level diagram. There has recently been demand for Gemini usage. Your views are appreciated. Thanks Mich Talebzadeh, Dad | Technologist | Solutions Architect | Engineer London United Kingdom view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> https://en.everybodywiki.com/Mich_Talebzadeh *Disclaimer:* I am an architect and not a data scientist. The information provided is correct to the best of my knowledge but of course cannot be guaranteed. It is essential to note that, as with any advice, quote "one test result is worth one-thousand expert opinions (Werner <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>)".
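A minimal sketch of the Kafka ingestion and parsing leg of this design (the broker address, topic name and the schema subset below are assumptions for illustration, not part of the original design; the spark-sql-kafka connector package must be on the classpath):

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

spark = SparkSession.builder.appName("sentimentStream").getOrCreate()

# Schema covering a subset of the review attributes shown above
schema = StructType([
    StructField("rowkey", StringType()),
    StructField("product_id", StringType()),
    StructField("timereported", StringType()),
    StructField("review_text", StringType()),
    StructField("sentiment", StringType()),
    StructField("sentiment_confidence", DoubleType()),
    StructField("rating", IntegerType()),
])

# Read the review messages from Kafka (placeholder broker and topic)
raw = (spark.readStream
       .format("kafka")
       .option("kafka.bootstrap.servers", "localhost:9092")
       .option("subscribe", "customer_reviews")
       .option("startingOffsets", "latest")
       .load())

# Parse the JSON payload into columns for downstream cleansing/scoring
reviews = (raw.select(from_json(col("value").cast("string"), schema).alias("r"))
              .select("r.*"))

query = (reviews.writeStream
         .format("console")
         .outputMode("append")
         .start())
query.awaitTermination()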
Re: [Spark on Kubernetes]: Seeking Guidance on Handling Persistent Executor Failures
Thanks for your kind words Sri Well it is true that as yet spark on kubernetes is not on-par with spark on YARN in maturity and essentially spark on kubernetes is still work in progress.* So in the first place IMO one needs to think why executors are failing. What causes this behaviour? Is it the code or some inadequate set-up? *These things come to my mind - Resource Allocation: Insufficient resources (CPU, memory) can lead to executor failures. - Mis-configuration Issues: Verify that the configurations are appropriate for your workload. - External Dependencies: If your Spark job relies on external services or data sources, ensure they are accessible. Issues such as network problems or unavailability of external services can lead to executor failures. - Data Skew: Uneven distribution of data across partitions can lead to data skew and cause some executors to process significantly more data than others. This can lead to resource exhaustion on specific executors. - Spark Version and Kubernetes Compatibility: Is Spark running on EKS or GKE -- that you are using a Spark version that is compatible with your Kubernetes environment. These vendors normally run older, more stable versions of Spark. Compatibility issues can arise when using your newer version of Spark. - How up-to-date are your docker images on container registries (ECR, GCR).Is there any incompatibility between docker images built on a Spark version and the host spark version you are submitting your spark-submit from? HTH Mich Talebzadeh, Dad | Technologist | Solutions Architect | Engineer London United Kingdom view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> https://en.everybodywiki.com/Mich_Talebzadeh *Disclaimer:* The information provided is correct to the best of my knowledge but of course cannot be guaranteed . It is essential to note that, as with any advice, quote "one test result is worth one-thousand expert opinions (Werner <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>)". On Mon, 19 Feb 2024 at 23:18, Sri Potluri wrote: > Dear Mich, > > Thank you for your detailed response and the suggested approach to > handling retry logic. I appreciate you taking the time to outline the > method of embedding custom retry mechanisms directly into the application > code. > > While the solution of wrapping the main logic of the Spark job in a loop > for controlling the number of retries is technically sound and offers a > workaround, it may not be the most efficient or maintainable solution for > organizations running a large number of Spark jobs. Modifying each > application to include custom retry logic can be a significant undertaking, > introducing variability in how retries are handled across different jobs, > and require additional testing and maintenance. > > Ideally, operational concerns like retry behavior in response to > infrastructure failures should be decoupled from the business logic of > Spark applications. This separation allows data engineers and scientists to > focus on the application logic without needing to implement and test > infrastructure resilience mechanisms. > > Thank you again for your time and assistance. 
> > Best regards, > Sri Potluri > > On Mon, Feb 19, 2024 at 5:03 PM Mich Talebzadeh > wrote: > >> Went through your issue with the code running on k8s >> >> When an executor of a Spark application fails, the system attempts to >> maintain the desired level of parallelism by automatically recreating a new >> executor to replace the failed one. While this behavior is beneficial for >> transient errors, ensuring that the application continues to run, it >> becomes problematic in cases where the failure is due to a persistent issue >> (such as misconfiguration, inaccessible external resources, or incompatible >> environment settings). In such scenarios, the application enters a loop, >> continuously trying to recreate executors, which leads to resource wastage >> and complicates application management. >> >> Well fault tolerance is built especially in k8s cluster. You can >> implement your own logic to control the retry attempts. You can do this >> by wrapping the main logic of your Spark job in a loop and controlling the >> number of retries. If a persistent issue is detected, you can choose to >> stop the job. Today is the third time that looping control has come up :) >> >> Take this code >> >> import time >> max_retries = 5 retries = 0 while retries < max_retries: try: # Your >> Spark job logic here except Exception as e: # Log the exception >> print(f"Exception in Spark job:
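On the resource-allocation point in the reply above, the sketch below shows how executor sizing for Spark on Kubernetes could be expressed. The image name, namespace and sizes are placeholders rather than recommendations, and in practice these settings are usually passed as --conf flags to spark-submit rather than set in the builder.

from pyspark.sql import SparkSession

# Illustrative executor sizing for Spark on Kubernetes (values are placeholders)
spark = (SparkSession.builder
         .appName("k8s-resource-example")
         .config("spark.executor.instances", "4")
         .config("spark.executor.cores", "2")
         .config("spark.executor.memory", "4g")
         .config("spark.executor.memoryOverhead", "1g")
         .config("spark.kubernetes.namespace", "spark-apps")
         .config("spark.kubernetes.container.image", "my-registry/spark:3.5.0")
         .getOrCreate())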
Re: [Spark on Kubernetes]: Seeking Guidance on Handling Persistent Executor Failures
Went through your issue with the code running on k8s When an executor of a Spark application fails, the system attempts to maintain the desired level of parallelism by automatically recreating a new executor to replace the failed one. While this behavior is beneficial for transient errors, ensuring that the application continues to run, it becomes problematic in cases where the failure is due to a persistent issue (such as misconfiguration, inaccessible external resources, or incompatible environment settings). In such scenarios, the application enters a loop, continuously trying to recreate executors, which leads to resource wastage and complicates application management. Well fault tolerance is built especially in k8s cluster. You can implement your own logic to control the retry attempts. You can do this by wrapping the main logic of your Spark job in a loop and controlling the number of retries. If a persistent issue is detected, you can choose to stop the job. Today is the third time that looping control has come up :) Take this code import time max_retries = 5 retries = 0 while retries < max_retries: try: # Your Spark job logic here except Exception as e: # Log the exception print(f"Exception in Spark job: {str(e)}") # Increment the retry count retries += 1 # Sleep time.sleep(60) else: # Break out of the loop if the job completes successfully break HTH Mich Talebzadeh, Dad | Technologist | Solutions Architect | Engineer London United Kingdom view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> https://en.everybodywiki.com/Mich_Talebzadeh *Disclaimer:* The information provided is correct to the best of my knowledge but of course cannot be guaranteed . It is essential to note that, as with any advice, quote "one test result is worth one-thousand expert opinions (Werner <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>)". On Mon, 19 Feb 2024 at 19:21, Mich Talebzadeh wrote: > Not that I am aware of any configuration parameter in Spark classic to > limit executor creation. Because of fault tolerance Spark will try to > recreate failed executors. Not really that familiar with the Spark operator > for k8s. There may be something there. > > Have you considered custom monitoring and handling within Spark itself > using max_retries = 5 etc? > > HTH > > HTH > > Mich Talebzadeh, > Dad | Technologist | Solutions Architect | Engineer > London > United Kingdom > > >view my Linkedin profile > <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> > > > https://en.everybodywiki.com/Mich_Talebzadeh > > > > *Disclaimer:* The information provided is correct to the best of my > knowledge but of course cannot be guaranteed . It is essential to note > that, as with any advice, quote "one test result is worth one-thousand > expert opinions (Werner <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von > Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>)". > > > On Mon, 19 Feb 2024 at 18:34, Sri Potluri wrote: > >> Hello Spark Community, >> >> I am currently leveraging Spark on Kubernetes, managed by the Spark >> Operator, for running various Spark applications. While the system >> generally works well, I've encountered a challenge related to how Spark >> applications handle executor failures, specifically in scenarios where >> executors enter an error state due to persistent issues. 
>> >> *Problem Description* >> >> When an executor of a Spark application fails, the system attempts to >> maintain the desired level of parallelism by automatically recreating a new >> executor to replace the failed one. While this behavior is beneficial for >> transient errors, ensuring that the application continues to run, it >> becomes problematic in cases where the failure is due to a persistent issue >> (such as misconfiguration, inaccessible external resources, or incompatible >> environment settings). In such scenarios, the application enters a loop, >> continuously trying to recreate executors, which leads to resource wastage >> and complicates application management. >> >> *Desired Behavior* >> >> Ideally, I would like to have a mechanism to limit the number of retries >> for executor recreation. If the system fails to successfully create an >> executor more than a specified number of times (e.g., 5 attempts), the >> entire Spark application should fail and stop trying to recreate the >> executor. This behavior would help in efficiently managing resources and >> avoiding prolonged failure states. >> >> *Questions for the Community* >> >> 1.
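A tidied-up, runnable version of the retry loop sketched in the reply above might look like this. The job body is a placeholder, and the retry count and sleep interval are simply the example values from the thread.

import time
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("retry-example").getOrCreate()

def run_spark_job(spark):
    # Placeholder for the real Spark job logic
    spark.range(1000).count()

max_retries = 5
retries = 0
succeeded = False
while retries < max_retries:
    try:
        run_spark_job(spark)
        succeeded = True
        break  # job completed successfully
    except Exception as e:
        print(f"Exception in Spark job: {e}")
        retries += 1
        time.sleep(60)  # back off before the next attempt

if not succeeded:
    print(f"Giving up after {max_retries} failed attempts")
spark.stop()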
Re: Introducing Comet, a plugin to accelerate Spark execution via DataFusion and Arrow
Ok thanks for your clarifications Mich Talebzadeh, Dad | Technologist | Solutions Architect | Engineer London United Kingdom view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> https://en.everybodywiki.com/Mich_Talebzadeh *Disclaimer:* The information provided is correct to the best of my knowledge but of course cannot be guaranteed . It is essential to note that, as with any advice, quote "one test result is worth one-thousand expert opinions (Werner <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>)". On Mon, 19 Feb 2024 at 17:24, Chao Sun wrote: > Hi Mich, > > > Also have you got some benchmark results from your tests that you can > possibly share? > > We only have some partial benchmark results internally so far. Once > shuffle and better memory management have been introduced, we plan to > publish the benchmark results (at least TPC-H) in the repo. > > > Compared to standard Spark, what kind of performance gains can be > expected with Comet? > > Currently, users could benefit from Comet in a few areas: > - Parquet read: a few improvements have been made against reading from S3 > in particular, so users can expect better scan performance in this scenario > - Hash aggregation > - Columnar shuffle > - Decimals (Java's BigDecimal is pretty slow) > > > Can one use Comet on k8s in conjunction with something like a Volcano > addon? > > I think so. Comet is mostly orthogonal to the Spark scheduler framework. > > Chao > > > > > > > On Fri, Feb 16, 2024 at 5:39 AM Mich Talebzadeh > wrote: > >> Hi Chao, >> >> As a cool feature >> >> >>- Compared to standard Spark, what kind of performance gains can be >>expected with Comet? >>- Can one use Comet on k8s in conjunction with something like a >>Volcano addon? >> >> >> HTH >> >> Mich Talebzadeh, >> Dad | Technologist | Solutions Architect | Engineer >> London >> United Kingdom >> >> >>view my Linkedin profile >> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> >> >> >> https://en.everybodywiki.com/Mich_Talebzadeh >> >> >> >> *Disclaimer:* The information provided is correct to the best of my >> knowledge, sourced from both personal expertise and other resources but of >> course cannot be guaranteed . It is essential to note that, as with any >> advice, one verified and tested result holds more weight than a thousand >> expert opinions. >> >> >> On Tue, 13 Feb 2024 at 20:42, Chao Sun wrote: >> >>> Hi all, >>> >>> We are very happy to announce that Project Comet, a plugin to >>> accelerate Spark query execution via leveraging DataFusion and Arrow, >>> has now been open sourced under the Apache Arrow umbrella. Please >>> check the project repo >>> https://github.com/apache/arrow-datafusion-comet for more details if >>> you are interested. We'd love to collaborate with people from the open >>> source community who share similar goals. >>> >>> Thanks, >>> Chao >>> >>> - >>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org >>> >>>
Re: [Spark on Kubernetes]: Seeking Guidance on Handling Persistent Executor Failures
Not that I am aware of any configuration parameter in Spark classic to limit executor creation. Because of fault tolerance Spark will try to recreate failed executors. Not really that familiar with the Spark operator for k8s. There may be something there. Have you considered custom monitoring and handling within Spark itself using max_retries = 5 etc? HTH HTH Mich Talebzadeh, Dad | Technologist | Solutions Architect | Engineer London United Kingdom view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> https://en.everybodywiki.com/Mich_Talebzadeh *Disclaimer:* The information provided is correct to the best of my knowledge but of course cannot be guaranteed . It is essential to note that, as with any advice, quote "one test result is worth one-thousand expert opinions (Werner <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>)". On Mon, 19 Feb 2024 at 18:34, Sri Potluri wrote: > Hello Spark Community, > > I am currently leveraging Spark on Kubernetes, managed by the Spark > Operator, for running various Spark applications. While the system > generally works well, I've encountered a challenge related to how Spark > applications handle executor failures, specifically in scenarios where > executors enter an error state due to persistent issues. > > *Problem Description* > > When an executor of a Spark application fails, the system attempts to > maintain the desired level of parallelism by automatically recreating a new > executor to replace the failed one. While this behavior is beneficial for > transient errors, ensuring that the application continues to run, it > becomes problematic in cases where the failure is due to a persistent issue > (such as misconfiguration, inaccessible external resources, or incompatible > environment settings). In such scenarios, the application enters a loop, > continuously trying to recreate executors, which leads to resource wastage > and complicates application management. > > *Desired Behavior* > > Ideally, I would like to have a mechanism to limit the number of retries > for executor recreation. If the system fails to successfully create an > executor more than a specified number of times (e.g., 5 attempts), the > entire Spark application should fail and stop trying to recreate the > executor. This behavior would help in efficiently managing resources and > avoiding prolonged failure states. > > *Questions for the Community* > > 1. Is there an existing configuration or method within Spark or the Spark > Operator to limit executor recreation attempts and fail the job after > reaching a threshold? > > 2. Has anyone else encountered similar challenges and found workarounds or > solutions that could be applied in this context? > > > *Additional Context* > > I have explored Spark's task and stage retry configurations > (`spark.task.maxFailures`, `spark.stage.maxConsecutiveAttempts`), but these > do not directly address the issue of limiting executor creation retries. > Implementing a custom monitoring solution to track executor failures and > manually stop the application is a potential workaround, but it would be > preferable to have a more integrated solution. > > I appreciate any guidance, insights, or feedback you can provide on this > matter. > > Thank you for your time and support. > > Best regards, > Sri P >
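For reference, the retry-related settings mentioned in this thread can be set as below. As noted in the discussion, they cap task and stage retries rather than executor recreation, so on their own they will not fail an application whose executors keep being recreated; the values shown are only examples.

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("retry-config-example")
         .config("spark.task.maxFailures", "4")              # task-level retry cap
         .config("spark.stage.maxConsecutiveAttempts", "4")  # stage-level retry cap
         .getOrCreate())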
Re: Regarding Spark on Kubernetes(EKS)
Sure but first it would be beneficial to understand the way Spark works on Kubernetes and the concept.s Have a look at this article of mine Spark on Kubernetes, A Practitioner’s Guide <https://www.linkedin.com/pulse/spark-kubernetes-practitioners-guide-mich-talebzadeh-ph-d-%3FtrackingId=Wsu3lkoPaCWqGemYHe8%252BLQ%253D%253D/?trackingId=Wsu3lkoPaCWqGemYHe8%2BLQ%3D%3D> HTH Mich Talebzadeh, Dad | Technologist | Solutions Architect | Engineer London United Kingdom view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> https://en.everybodywiki.com/Mich_Talebzadeh *Disclaimer:* The information provided is correct to the best of my knowledge but of course cannot be guaranteed . It is essential to note that, as with any advice, quote "one test result is worth one-thousand expert opinions (Werner <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>)". On Mon, 19 Feb 2024 at 15:09, Jagannath Majhi < jagannath.ma...@cloud.cbnits.com> wrote: > Yes > > On Mon, Feb 19, 2024, 8:35 PM Mich Talebzadeh > wrote: > >> OK you have a jar file that you want to work with when running using >> Spark on k8s as the execution engine (EKS) as opposed to YARN on EMR as >> the execution engine? >> >> >> Mich Talebzadeh, >> Dad | Technologist | Solutions Architect | Engineer >> London >> United Kingdom >> >> >>view my Linkedin profile >> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> >> >> >> https://en.everybodywiki.com/Mich_Talebzadeh >> >> >> >> *Disclaimer:* The information provided is correct to the best of my >> knowledge but of course cannot be guaranteed . It is essential to note >> that, as with any advice, quote "one test result is worth one-thousand >> expert opinions (Werner >> <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von Braun >> <https://en.wikipedia.org/wiki/Wernher_von_Braun>)". >> >> >> On Mon, 19 Feb 2024 at 14:38, Jagannath Majhi < >> jagannath.ma...@cloud.cbnits.com> wrote: >> >>> I am not using any private docker image. Only I am running the jar file >>> in EMR using spark-submit command so now I want to run this jar file in eks >>> so can you please tell me how can I set-up for this ?? >>> >>> On Mon, Feb 19, 2024, 8:06 PM Jagannath Majhi < >>> jagannath.ma...@cloud.cbnits.com> wrote: >>> >>>> Can we connect over Google meet?? >>>> >>>> On Mon, Feb 19, 2024, 8:03 PM Mich Talebzadeh < >>>> mich.talebza...@gmail.com> wrote: >>>> >>>>> Where is your docker file? In ECR container registry. >>>>> If you are going to use EKS, then it need to be accessible to all >>>>> nodes of cluster >>>>> >>>>> When you build your docker image, put your jar under the $SPARK_HOME >>>>> directory. Then add a line to your docker build file as below >>>>> Here I am accessing Google BigQuery DW from EKS >>>>> # Add a BigQuery connector jar. 
>>>>> ENV SPARK_EXTRA_JARS_DIR=/opt/spark/jars/ >>>>> ENV SPARK_EXTRA_CLASSPATH='/opt/spark/jars/*' >>>>> RUN mkdir -p "${SPARK_EXTRA_JARS_DIR}" \ >>>>> && chown spark:spark "${SPARK_EXTRA_JARS_DIR}" >>>>> COPY --chown=spark:spark \ >>>>> spark-bigquery-with-dependencies_2.12-0.22.2.jar >>>>> "${SPARK_EXTRA_JARS_DIR}" >>>>> >>>>> Here I am accessing Google BigQuery DW from EKS cluster >>>>> >>>>> HTH >>>>> >>>>> Mich Talebzadeh, >>>>> Dad | Technologist | Solutions Architect | Engineer >>>>> London >>>>> United Kingdom >>>>> >>>>> >>>>>view my Linkedin profile >>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> >>>>> >>>>> >>>>> https://en.everybodywiki.com/Mich_Talebzadeh >>>>> >>>>> >>>>> >>>>> *Disclaimer:* The information provided is correct to the best of my >>>>> knowledge but of course cannot be guaranteed . It is essential to note >>>>> that, as with any advice, quote "one test result is worth one-thousand >>>>> expert opinions (Werner >>>>> <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von Braun >>>>>
Re: Regarding Spark on Kubernetes(EKS)
OK you have a jar file that you want to work with when running using Spark on k8s as the execution engine (EKS) as opposed to YARN on EMR as the execution engine? Mich Talebzadeh, Dad | Technologist | Solutions Architect | Engineer London United Kingdom view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> https://en.everybodywiki.com/Mich_Talebzadeh *Disclaimer:* The information provided is correct to the best of my knowledge but of course cannot be guaranteed . It is essential to note that, as with any advice, quote "one test result is worth one-thousand expert opinions (Werner <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>)". On Mon, 19 Feb 2024 at 14:38, Jagannath Majhi < jagannath.ma...@cloud.cbnits.com> wrote: > I am not using any private docker image. Only I am running the jar file in > EMR using spark-submit command so now I want to run this jar file in eks so > can you please tell me how can I set-up for this ?? > > On Mon, Feb 19, 2024, 8:06 PM Jagannath Majhi < > jagannath.ma...@cloud.cbnits.com> wrote: > >> Can we connect over Google meet?? >> >> On Mon, Feb 19, 2024, 8:03 PM Mich Talebzadeh >> wrote: >> >>> Where is your docker file? In ECR container registry. >>> If you are going to use EKS, then it need to be accessible to all nodes >>> of cluster >>> >>> When you build your docker image, put your jar under the $SPARK_HOME >>> directory. Then add a line to your docker build file as below >>> Here I am accessing Google BigQuery DW from EKS >>> # Add a BigQuery connector jar. >>> ENV SPARK_EXTRA_JARS_DIR=/opt/spark/jars/ >>> ENV SPARK_EXTRA_CLASSPATH='/opt/spark/jars/*' >>> RUN mkdir -p "${SPARK_EXTRA_JARS_DIR}" \ >>> && chown spark:spark "${SPARK_EXTRA_JARS_DIR}" >>> COPY --chown=spark:spark \ >>> spark-bigquery-with-dependencies_2.12-0.22.2.jar >>> "${SPARK_EXTRA_JARS_DIR}" >>> >>> Here I am accessing Google BigQuery DW from EKS cluster >>> >>> HTH >>> >>> Mich Talebzadeh, >>> Dad | Technologist | Solutions Architect | Engineer >>> London >>> United Kingdom >>> >>> >>>view my Linkedin profile >>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> >>> >>> >>> https://en.everybodywiki.com/Mich_Talebzadeh >>> >>> >>> >>> *Disclaimer:* The information provided is correct to the best of my >>> knowledge but of course cannot be guaranteed . It is essential to note >>> that, as with any advice, quote "one test result is worth one-thousand >>> expert opinions (Werner >>> <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von Braun >>> <https://en.wikipedia.org/wiki/Wernher_von_Braun>)". >>> >>> >>> On Mon, 19 Feb 2024 at 13:42, Jagannath Majhi < >>> jagannath.ma...@cloud.cbnits.com> wrote: >>> >>>> Dear Spark Community, >>>> >>>> I hope this email finds you well. I am reaching out to seek assistance >>>> and guidance regarding a task I'm currently working on involving Apache >>>> Spark. >>>> >>>> I have developed a JAR file that contains some Spark applications and >>>> functionality, and I need to run this JAR file within a Spark cluster. >>>> However, the JAR file is located in an AWS S3 bucket. I'm facing some >>>> challenges in configuring Spark to access and execute this JAR file >>>> directly from the S3 bucket. >>>> >>>> I would greatly appreciate any advice, best practices, or pointers on >>>> how to achieve this integration effectively. Specifically, I'm looking for >>>> insights on: >>>> >>>>1. Configuring Spark to access and retrieve the JAR file from an >>>>AWS S3 bucket. 
>>>>2. Setting up the necessary permissions and authentication >>>>mechanisms to ensure seamless access to the S3 bucket. >>>>3. Any potential performance considerations or optimizations when >>>>running Spark applications with dependencies stored in remote storage >>>> like >>>>AWS S3. >>>> >>>> If anyone in the community has prior experience or knowledge in this >>>> area, I would be extremely grateful for your guidance. Additionally, if >>>> there are any relevant resources, documentation, or tutorials that you >>>> could recommend, it would be incredibly helpful. >>>> >>>> Thank you very much for considering my request. I look forward to >>>> hearing from you and benefiting from the collective expertise of the Spark >>>> community. >>>> >>>> Best regards, Jagannath Majhi >>>> >>>
Re: Regarding Spark on Kubernetes(EKS)
Where is your docker file? In ECR container registry. If you are going to use EKS, then it need to be accessible to all nodes of cluster When you build your docker image, put your jar under the $SPARK_HOME directory. Then add a line to your docker build file as below Here I am accessing Google BigQuery DW from EKS # Add a BigQuery connector jar. ENV SPARK_EXTRA_JARS_DIR=/opt/spark/jars/ ENV SPARK_EXTRA_CLASSPATH='/opt/spark/jars/*' RUN mkdir -p "${SPARK_EXTRA_JARS_DIR}" \ && chown spark:spark "${SPARK_EXTRA_JARS_DIR}" COPY --chown=spark:spark \ spark-bigquery-with-dependencies_2.12-0.22.2.jar "${SPARK_EXTRA_JARS_DIR}" Here I am accessing Google BigQuery DW from EKS cluster HTH Mich Talebzadeh, Dad | Technologist | Solutions Architect | Engineer London United Kingdom view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> https://en.everybodywiki.com/Mich_Talebzadeh *Disclaimer:* The information provided is correct to the best of my knowledge but of course cannot be guaranteed . It is essential to note that, as with any advice, quote "one test result is worth one-thousand expert opinions (Werner <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>)". On Mon, 19 Feb 2024 at 13:42, Jagannath Majhi < jagannath.ma...@cloud.cbnits.com> wrote: > Dear Spark Community, > > I hope this email finds you well. I am reaching out to seek assistance and > guidance regarding a task I'm currently working on involving Apache Spark. > > I have developed a JAR file that contains some Spark applications and > functionality, and I need to run this JAR file within a Spark cluster. > However, the JAR file is located in an AWS S3 bucket. I'm facing some > challenges in configuring Spark to access and execute this JAR file > directly from the S3 bucket. > > I would greatly appreciate any advice, best practices, or pointers on how > to achieve this integration effectively. Specifically, I'm looking for > insights on: > >1. Configuring Spark to access and retrieve the JAR file from an AWS >S3 bucket. >2. Setting up the necessary permissions and authentication mechanisms >to ensure seamless access to the S3 bucket. >3. Any potential performance considerations or optimizations when >running Spark applications with dependencies stored in remote storage like >AWS S3. > > If anyone in the community has prior experience or knowledge in this area, > I would be extremely grateful for your guidance. Additionally, if there are > any relevant resources, documentation, or tutorials that you could > recommend, it would be incredibly helpful. > > Thank you very much for considering my request. I look forward to hearing > from you and benefiting from the collective expertise of the Spark > community. > > Best regards, Jagannath Majhi >
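On points 1 and 2 of the question (S3 access and credentials), a minimal PySpark sketch is shown below. The credentials provider and bucket are assumptions: on EKS with IAM roles for service accounts the web-identity provider is common, but your set-up may differ, and the hadoop-aws and AWS SDK jars must already be present in the image. The application jar named on spark-submit is typically fetched by Spark itself, provided the same s3a libraries and credentials are available.

from pyspark.sql import SparkSession

# Illustrative S3A settings for reading data from S3 on EKS; provider and bucket are assumptions.
spark = (SparkSession.builder
         .appName("s3a-access-example")
         .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
         .config("spark.hadoop.fs.s3a.aws.credentials.provider",
                 "com.amazonaws.auth.WebIdentityTokenCredentialsProvider")
         .getOrCreate())

df = spark.read.parquet("s3a://my-bucket/some/path/")  # placeholder bucket and path
df.show(5)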
Re: Re-create SparkContext of SparkSession inside long-lived Spark app
OK got it Someone asked a similar but not related to shuffle question in Spark slack channel.. This is a simple Python code that creates shuffle files in shuffle_directory = "/tmp/spark_shuffles" and simulates working examples using a loop and periodically cleans up shuffle files older than 1 second.. Take it for a spin import os import glob import time from datetime import datetime, timedelta import shutil from pyspark.sql import SparkSession def generate_shuffle_data(spark, shuffle_directory): # Generate some micky mouse data data = [("A", 1), ("B", 2), ("A", 3), ("C", 4), ("B", 5)] columns = ["column_to_check", "partition_column"] df = spark.createDataFrame(data, columns) df.printSchema() # Write DataFrame with shuffle to the specified output path df.write.mode("overwrite").partitionBy("partition_column").parquet(shuffle_directory) def simulate_long_lived_spark_app(): shuffle_directory = "/tmp/spark_shuffles" # Remove the directory if it exists if os.path.exists(shuffle_directory): shutil.rmtree(shuffle_directory) # Create the directory os.makedirs(shuffle_directory) spark = SparkSession.builder.appName("shuffleCleanupExample").getOrCreate() try: for iteration in range(1, 6): # Simulating 5 iterations of the long-lived Spark app print(f"Iteration {iteration}") # Generate and write shuffle data generate_shuffle_data(spark, shuffle_directory) # Perform some Spark operations (simulated processing) # your code # Periodically clean up shuffle files older than 1 second try: cleanup_unnecessary_shuffles(shuffle_directory, max_age_seconds=1) print("Shuffle cleanup successful.") except Exception as e: print(f"Error during shuffle cleanup: {str(e)}") # Simulate some delay between iterations time.sleep(2) finally: # Stop the Spark session spark.stop() def cleanup_unnecessary_shuffles(shuffle_directory, max_age_seconds): current_time = datetime.now() # Iterate through shuffle files in the directory for shuffle_file in os.listdir(shuffle_directory): shuffle_file_path = os.path.join(shuffle_directory, shuffle_file) # Check if it's a file and not a directory if os.path.isfile(shuffle_file_path): # Get the creation time of the file file_creation_time = datetime.fromtimestamp(os.path.getctime(shuffle_file_path)) # Calculate the age of the file in seconds age_seconds = (current_time - file_creation_time).total_seconds() try: # Check if the file is older than the specified max_age_seconds if age_seconds > max_age_seconds: # Perform cleanup (delete the file) os.remove(shuffle_file_path) print(f"Deleted old shuffle file: {shuffle_file_path}") except Exception as e: print(f"Error during cleanup: {str(e)}") # Run the simulation simulate_long_lived_spark_app() .and the output Iteration 1 root |-- column_to_check: string (nullable = true) |-- partition_column: long (nullable = true) Shuffle cleanup successful. Iteration 2 root |-- column_to_check: string (nullable = true) |-- partition_column: long (nullable = true) Shuffle cleanup successful. Iteration 3 root |-- column_to_check: string (nullable = true) |-- partition_column: long (nullable = true) Shuffle cleanup successful. Iteration 4 root |-- column_to_check: string (nullable = true) |-- partition_column: long (nullable = true) Shuffle cleanup successful. Iteration 5 root |-- column_to_check: string (nullable = true) |-- partition_column: long (nullable = true) Shuffle cleanup successful. 
HTH Mich Talebzadeh, Dad | Technologist | Solutions Architect | Engineer London United Kingdom view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> https://en.everybodywiki.com/Mich_Talebzadeh *Disclaimer:* The information provided is correct to the best of my knowledge but of course cannot be guaranteed . It is essential to note that, as with any advice, quote "one test result is worth one-thousand expert opinions (Werner <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>)". On Mon, 19 Feb 2024 at 08:27, Saha, Daniel wrote: > Thanks for the suggestions Mich, Jörn, and Adam. > > > > The rationale for long-lived app with loop versus submitting multiple yarn > applications is mainly for simplicity. Plan to run app on an multi-tenant > EMR cluster alongsi
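For readability, here is the age-based cleanup helper from the script above with its indentation restored; it simply removes plain files older than a threshold from the given directory.

import os
from datetime import datetime

def cleanup_unnecessary_shuffles(shuffle_directory, max_age_seconds):
    """Delete plain files in shuffle_directory older than max_age_seconds."""
    current_time = datetime.now()
    for shuffle_file in os.listdir(shuffle_directory):
        shuffle_file_path = os.path.join(shuffle_directory, shuffle_file)
        # Only consider files, not sub-directories
        if os.path.isfile(shuffle_file_path):
            file_creation_time = datetime.fromtimestamp(os.path.getctime(shuffle_file_path))
            age_seconds = (current_time - file_creation_time).total_seconds()
            try:
                if age_seconds > max_age_seconds:
                    os.remove(shuffle_file_path)
                    print(f"Deleted old shuffle file: {shuffle_file_path}")
            except Exception as e:
                print(f"Error during cleanup: {e}")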
Re: Re-create SparkContext of SparkSession inside long-lived Spark app
Hi, What do you propose or you think will help when these spark jobs are independent of each other --> So once a job/iterator is complete, there is no need to retain these shuffle files. You have a number of options to consider starting from spark configuration parameters and so forth https://spark.apache.org/docs/latest/configuration.html#shuffle-behavior Aside, have you turned on dynamic resource allocation and the relevant parameters. Can you up executor memory -> spark.storage.,memoryFraction and spark.shuffle.spillThreshold as well? You can of course use brute force with shutil.rmtree(path) to remove these files. HTH Mich Talebzadeh, Dad | Technologist | Solutions Architect | Engineer London United Kingdom view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> https://en.everybodywiki.com/Mich_Talebzadeh *Disclaimer:* The information provided is correct to the best of my knowledge but of course cannot be guaranteed . It is essential to note that, as with any advice, one verified and tested result holds more weight than a thousand expert opinions. On Sat, 17 Feb 2024 at 23:40, Saha, Daniel wrote: > Hi, > > > > *Background*: I am running into executor disk space issues when running a > long-lived Spark 3.3 app with YARN on AWS EMR. The app performs > back-to-back spark jobs in a sequential loop with each iteration performing > 100gb+ shuffles. The files taking up the space are related to shuffle > blocks [1]. Disk is only cleared when restarting the YARN app. For all > intents and purposes, each job is independent. So once a job/iterator is > complete, there is no need to retain these shuffle files. I want to try > stopping and recreating the Spark context between loop iterations/jobs to > indicate to Spark DiskBlockManager that these intermediate results are no > longer needed [2]. > > > > *Questions*: > >- Are there better ways to remove/clean the directory containing these >old, no longer used, shuffle results (aside from cron or restarting yarn >app)? >- How to recreate the spark context within a single application? I see >no methods in Spark Session for doing this, and each new Spark session >re-uses the existing spark context. After stopping the SparkContext, >SparkSession does not re-create a new one. Further, creating a new >SparkSession via constructor and passing in a new SparkContext is not >allowed as it is a protected/private method. > > > > Thanks > > Daniel > > > > [1] > /mnt/yarn/usercache/hadoop/appcache/application_1706835946137_0110/blockmgr-eda47882-56d6-4248-8e30-a959ddb912c5 > > [2] https://stackoverflow.com/a/38791921 >
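As a concrete illustration of the dynamic resource allocation settings referred to above, something along these lines could be tried; the values are placeholders, and shuffle tracking is only needed when no external shuffle service is available.

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("dynamic-allocation-example")
         .config("spark.dynamicAllocation.enabled", "true")
         .config("spark.dynamicAllocation.shuffleTracking.enabled", "true")
         .config("spark.dynamicAllocation.minExecutors", "1")
         .config("spark.dynamicAllocation.maxExecutors", "10")
         .getOrCreate())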
Re: job uuid not unique
As a bare minimum you will need to add some error trapping and exception handling! scala> import org.apache.hadoop.fs.FileAlreadyExistsException import org.apache.hadoop.fs.FileAlreadyExistsException and try your code try { df .coalesce(1) .write .option("fs.s3a.committer.require.uuid", "true") .option("fs.s3a.committer.generate.uuid", "true") .option("fs.s3a.committer.name", "magic") .option("fs.s3a.committer.magic.enabled", "true") .option("orc.compress", "zlib") .mode(SaveMode.Append) .orc(path) } catch { case e: FileAlreadyExistsException => println("File already exists. Handling it...") // other catch blocks for the other exceptions? } FileAlreadyExistsException allows you to continue without crashing etc. Another brute force is that instead of *SaveMode.Append*, you can try using *SaveMode.Overwrite**.* This will overwrite the existing data if it already exists. HTH Mich Talebzadeh, Dad | Technologist | Solutions Architect | Engineer London United Kingdom view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> https://en.everybodywiki.com/Mich_Talebzadeh *Disclaimer:* The information provided is correct to the best of my knowledge but of course cannot be guaranteed . It is essential to note that, as with any advice, one verified and tested result holds more weight than a thousand expert opinions. On Fri, 16 Feb 2024 at 21:25, Рамик И wrote: > > Hi > I'm using Spark Streaming to read from Kafka and write to S3. Sometimes I > get errors when writing org.apache.hadoop.fs.FileAlreadyExistsException. > > Spark version: 3.5.0 > scala version : 2.13.8 > Cluster: k8s > > libraryDependencies > org.apache.hadoop.hadoop-aws3.3.4 > com.amazonaws.aws-java-sdk-s31.12.600 > > > > code: > df > .coalesce(1) > .write > .option("fs.s3a.committer.require.uuid", "true") > .option("fs.s3a.committer.generate.uuid", "true") > .option("fs.s3a.committer.name", "magic") > .option("fs.s3a.committer.magic.enabled", "true") > .option("orc.compress", "zlib") > .mode(SaveMode.Append) > .orc(path) > > > > executor 9 > > 24/02/16 13:05:25 INFO AbstractS3ACommitter: Job UUID > 6188aaf6-78a2-4c5a-bafc-0e285d8b89f3 source fs.s3a.committer.uuid > 24/02/16 13:05:25 INFO AbstractS3ACommitterFactory: Using committer magic > to output data to s3a://mybucket/test > 24/02/16 13:05:25 INFO AbstractS3ACommitterFactory: Using Committer > MagicCommitter{AbstractS3ACommitter{role=Task committer > attempt_202402161305112153373254688311399_0367_m_00_13217, name=magic, > outputPath=s3a://mybucket/test, > workPath=s3a://mybucket/test/__magic/job-6188aaf6-78a2-4c5a-bafc-0e285d8b89f3/tasks/attempt_202402161305112153373254688311399_0367_m_00_13217/__base, > uuid='6188aaf6-78a2-4c5a-bafc-0e285d8b89f3', uuid > source=JobUUIDSource{text='fs.s3a.committer.uuid'}}} for s3a://mybucket/test > 24/02/16 13:05:25 INFO SQLHadoopMapReduceCommitProtocol: Using output > committer class org.apache.hadoop.fs.s3a.commit.magic.MagicS3GuardCommitter > 24/02/16 13:05:25 INFO AbstractS3ACommitter: Starting: Setup Task > attempt_202402161305112153373254688311399_0367_m_00_13217 > 24/02/16 13:05:25 INFO AbstractS3ACommitter: Setup Task > attempt_202402161305112153373254688311399_0367_m_00_13217: duration > 0:00.061s > 24/02/16 13:05:25 ERROR Executor: Exception in task 0.2 in stage 367.1 > (TID 13217) > org.apache.hadoop.fs.FileAlreadyExistsException: > s3a://mybucket/test/part-0-bce21fe2-4e56-4075-aafe-6160b3b0334a-c000.zlib.orc > already exists > > > executor 10 > 24/02/16 13:05:24 INFO AbstractS3ACommitter: Job UUID > 
6188aaf6-78a2-4c5a-bafc-0e285d8b89f3 source fs.s3a.committer.uuid > 24/02/16 13:05:24 INFO AbstractS3ACommitterFactory: Using committer magic > to output data to s3a://mybucket/test > 24/02/16 13:05:24 INFO AbstractS3ACommitterFactory: Using Committer > MagicCommitter{AbstractS3ACommitter{role=Task committer > attempt_202402161305112153373254688311399_0367_m_00_13216, name=magic, > outputPath=s3a://mybucket/test, > workPath=s3a://mybucket/test/__magic/job-6188aaf6-78a2-4c5a-bafc-0e285d8b89f3/tasks/attempt_202402161305112153373254688311399_0367_m_00_13216/__base, > uuid='6188aaf6-78a2-4c5a-bafc-0e285d8b89f3', uuid > source=JobUUIDSource{text='fs.s3a.committer.uuid'}}} for s3a://mybucket/test > 24/02/16 13:05:24 INFO SQLHadoopMapReduceCommitProtocol: Using output > committer class org.apache.hadoop.fs.s3a.commit.magic.MagicS3GuardCommitter > 24/02/16 13:05:24 INFO AbstractS3ACommitter: Starting: Setup
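For anyone following this thread from PySpark, the same write with the committer options quoted above would look roughly like this. The DataFrame and target path are placeholders, and as with the original Scala code the write should be wrapped in error handling for FileAlreadyExistsException (which surfaces in PySpark as a Py4J error).

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("orc-write-example").getOrCreate()
df = spark.range(10)              # stand-in for the real DataFrame
path = "s3a://mybucket/test"      # placeholder path from the thread

(df.coalesce(1)
   .write
   .option("fs.s3a.committer.require.uuid", "true")
   .option("fs.s3a.committer.generate.uuid", "true")
   .option("fs.s3a.committer.name", "magic")
   .option("fs.s3a.committer.magic.enabled", "true")
   .option("orc.compress", "zlib")
   .mode("append")
   .orc(path))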
Re: Introducing Comet, a plugin to accelerate Spark execution via DataFusion and Arrow
Hi Chao, As a cool feature - Compared to standard Spark, what kind of performance gains can be expected with Comet? - Can one use Comet on k8s in conjunction with something like a Volcano addon? HTH Mich Talebzadeh, Dad | Technologist | Solutions Architect | Engineer London United Kingdom view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> https://en.everybodywiki.com/Mich_Talebzadeh *Disclaimer:* The information provided is correct to the best of my knowledge, sourced from both personal expertise and other resources but of course cannot be guaranteed . It is essential to note that, as with any advice, one verified and tested result holds more weight than a thousand expert opinions. On Tue, 13 Feb 2024 at 20:42, Chao Sun wrote: > Hi all, > > We are very happy to announce that Project Comet, a plugin to > accelerate Spark query execution via leveraging DataFusion and Arrow, > has now been open sourced under the Apache Arrow umbrella. Please > check the project repo > https://github.com/apache/arrow-datafusion-comet for more details if > you are interested. We'd love to collaborate with people from the open > source community who share similar goals. > > Thanks, > Chao > > - > To unsubscribe e-mail: user-unsubscr...@spark.apache.org > >
Re: Introducing Comet, a plugin to accelerate Spark execution via DataFusion and Arrow
Hi,I gather from the replies that the plugin is not currently available in the form expected although I am aware of the shell script. Also have you got some benchmark results from your tests that you can possibly share? Thanks, Mich Talebzadeh, Dad | Technologist | Solutions Architect | Engineer London United Kingdom view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> https://en.everybodywiki.com/Mich_Talebzadeh *Disclaimer:* The information provided is correct to the best of my knowledge, sourced from both personal expertise and other resources but of course cannot be guaranteed . It is essential to note that, as with any advice, one verified and tested result holds more weight than a thousand expert opinions. On Thu, 15 Feb 2024 at 01:18, Chao Sun wrote: > Hi Praveen, > > We will add a "Getting Started" section in the README soon, but basically > comet-spark-shell > <https://github.com/apache/arrow-datafusion-comet/blob/main/bin/comet-spark-shell> > in > the repo should provide a basic tool to build Comet and launch a Spark > shell with it. > > Note that we haven't open sourced several features yet including shuffle > support, which the aggregate operation depends on. Please stay tuned! > > Chao > > > On Wed, Feb 14, 2024 at 2:44 PM praveen sinha > wrote: > >> Hi Chao, >> >> Is there any example app/gist/repo which can help me use this plugin. I >> wanted to try out some realtime aggregate performance on top of parquet and >> spark dataframes. >> >> Thanks and Regards >> Praveen >> >> >> On Wed, Feb 14, 2024 at 9:20 AM Chao Sun wrote: >> >>> > Out of interest what are the differences in the approach between this >>> and Glutten? >>> >>> Overall they are similar, although Gluten supports multiple backends >>> including Velox and Clickhouse. One major difference is (obviously) >>> Comet is based on DataFusion and Arrow, and written in Rust, while >>> Gluten is mostly C++. >>> I haven't looked very deep into Gluten yet, but there could be other >>> differences such as how strictly the engine follows Spark's semantics, >>> table format support (Iceberg, Delta, etc), fallback mechanism >>> (coarse-grained fallback on stage level or more fine-grained fallback >>> within stages), UDF support (Comet hasn't started on this yet), >>> shuffle support, memory management, etc. >>> >>> Both engines are backed by very strong and vibrant open source >>> communities (Velox, Clickhouse, Arrow & DataFusion) so it's very >>> exciting to see how the projects will grow in future. >>> >>> Best, >>> Chao >>> >>> On Tue, Feb 13, 2024 at 10:06 PM John Zhuge wrote: >>> > >>> > Congratulations! Excellent work! >>> > >>> > On Tue, Feb 13, 2024 at 8:04 PM Yufei Gu wrote: >>> >> >>> >> Absolutely thrilled to see the project going open-source! Huge >>> congrats to Chao and the entire team on this milestone! >>> >> >>> >> Yufei >>> >> >>> >> >>> >> On Tue, Feb 13, 2024 at 12:43 PM Chao Sun wrote: >>> >>> >>> >>> Hi all, >>> >>> >>> >>> We are very happy to announce that Project Comet, a plugin to >>> >>> accelerate Spark query execution via leveraging DataFusion and Arrow, >>> >>> has now been open sourced under the Apache Arrow umbrella. Please >>> >>> check the project repo >>> >>> https://github.com/apache/arrow-datafusion-comet for more details if >>> >>> you are interested. We'd love to collaborate with people from the >>> open >>> >>> source community who share similar goals. 
>>> >>> >>> >>> Thanks, >>> >>> Chao >>> >>> >>> >>> - >>> >>> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org >>> >>> >>> > >>> > >>> > -- >>> > John Zhuge >>> >>> - >>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org >>> >>>
Re: Facing Error org.apache.hadoop.util.DiskChecker$DiskErrorException: Could not find any valid local directory for s3ablock-0001-
You are getting a DiskChecker$DiskErrorException error when no new records are published to Kafka for a few days. The error indicates that the Spark application could not find a valid local directory to create temporary files for data processing. This might be due to any of these:

- If no records are published to Kafka for a prolonged period, the S3 partition cleanup logic, enabled by default in S3AFileSystem <https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html>, might have removed the temporary directories used for writing batch data. When processing resumes, a new temporary directory is needed, but the error occurs due to insufficient space or permission issues (see below).
- Limited local disk space: ensure sufficient free space on the worker nodes where the Spark executors are running.
- Incorrectly configured spark.local.dir: verify that the spark.local.dir property in your Spark configuration points to a writable directory with enough space.
- Permission issues: check that the directories listed in spark.local.dir are accessible by the Spark user with read/write permissions.

HTH

Mich Talebzadeh, Dad | Technologist | Solutions Architect | Engineer London United Kingdom view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> https://en.everybodywiki.com/Mich_Talebzadeh

*Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.

On Tue, 13 Feb 2024 at 22:06, Abhishek Singla wrote: > Hi Team, > > Could someone provide some insights into this issue? > > Regards, > Abhishek Singla > > On Wed, Jan 17, 2024 at 11:45 PM Abhishek Singla < > abhisheksingla...@gmail.com> wrote: > >> Hi Team, >> >> Version: 3.2.2 >> Java Version: 1.8.0_211 >> Scala Version: 2.12.15 >> Cluster: Standalone >> >> I am using Spark Streaming to read from Kafka and write to S3. The job >> fails with below error if there are no records published to Kafka for a few >> days and then there are some records published. Could someone help me in >> identifying the root cause of this job failure.
>> >> 24/01/17 10:49:22 ERROR MicroBatchExecution: Query [id = >> 72ee1070-7e05-4999-8b55-2a99e216ec51, runId = >> 0919e548-9706-4757-be94-359848100070] terminated with error >> org.apache.hadoop.util.DiskChecker$DiskErrorException: Could not find any >> valid local directory for s3ablock-0001- >> at >> org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:462) >> at >> org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:165) >> at >> org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:146) >> at >> org.apache.hadoop.fs.s3a.S3AFileSystem.createTmpFileForWrite(S3AFileSystem.java:1019) >> at >> org.apache.hadoop.fs.s3a.S3ADataBlocks$DiskBlockFactory.create(S3ADataBlocks.java:816) >> at >> org.apache.hadoop.fs.s3a.S3ABlockOutputStream.createBlockIfNeeded(S3ABlockOutputStream.java:204) >> at >> org.apache.hadoop.fs.s3a.S3ABlockOutputStream.(S3ABlockOutputStream.java:182) >> at >> org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:1369) >> at org.apache.hadoop.fs.FileSystem.primitiveCreate(FileSystem.java:1305) >> at >> org.apache.hadoop.fs.DelegateToFileSystem.createInternal(DelegateToFileSystem.java:102) >> at >> org.apache.hadoop.fs.AbstractFileSystem.create(AbstractFileSystem.java:626) >> at org.apache.hadoop.fs.FileContext$3.next(FileContext.java:701) >> at org.apache.hadoop.fs.FileContext$3.next(FileContext.java:697) >> at org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90) >> at org.apache.hadoop.fs.FileContext.create(FileContext.java:703) >> at >> org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.createTempFile(CheckpointFileManager.scala:327) >> at >> org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.(CheckpointFileManager.scala:140) >> at >> org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.(CheckpointFileManager.scala:143) >> at >> org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.createAtomic(CheckpointFileManager.scala:333) >> at >> org.apache
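A small sketch of the spark.local.dir advice above: the directory names are assumptions and must already exist on the nodes, and on a standalone cluster the workers' SPARK_LOCAL_DIRS environment variable may take precedence for executors.

import shutil
from pyspark.sql import SparkSession

# Assumed scratch directories with enough free space; they must already exist.
local_dirs = "/data1/spark-tmp,/data2/spark-tmp"

for d in local_dirs.split(","):
    usage = shutil.disk_usage(d)
    print(f"{d}: {usage.free / 1024**3:.1f} GiB free")

spark = (SparkSession.builder
         .appName("local-dir-example")
         .config("spark.local.dir", local_dirs)
         .getOrCreate())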
Re: Null pointer exception while replaying WAL
OK Getting Null pointer exception while replying WAL! One possible reason is that the messages RDD might contain null elements, and attempting to read JSON from null values can result in an NPE. To handle this, you can add a filter before processing the RDD to remove null elements. msgs.foreachRDD { rdd => if (rdd.take(1).nonEmpty) { val messages: RDD[String] = rdd .map { sr => Option(sr).getOrElse("NO records found") } .filter(_ != "NO records found") try { val messagesJson = spark.read.json(messages) messagesJson.write.mode("append").parquet(data) } catch { case ex: Exception => ex.printStackTrace() } } } This modification uses Option *t*o handle potential null values in the rdd and filters out any elements that are still "NO records found" after the mapping operation. HTH Mich Talebzadeh, Dad | Technologist | Solutions Architect | Engineer London United Kingdom view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> https://en.everybodywiki.com/Mich_Talebzadeh *Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction. On Mon, 12 Feb 2024 at 14:22, nayan sharma wrote: > > Please find below code > > def main(args: Array[String]): Unit = { > val config: Config = ConfigFactory.load() > val streamC = StreamingContext.getOrCreate( > checkpointDirectory, > () => functionToCreateContext(config, checkpointDirectory) > ) > > streamC.start() > streamC.awaitTermination() > } > > def functionToCreateContext(config: Config, checkpointDirectory: > String): StreamingContext = { > > val brokerUrl = config.getString("streaming.solace.brokerURL") > val username = config.getString("streaming.solace.userName") > val passwordSol = config.getString("streaming.solace.password") > val vpn = config.getString("streaming.solace.vpn") > val queue = config.getString("streaming.solace.queueName") > val connectionFactory = > config.getString("streaming.solace.connectionFactory") > > > > val spark = SparkSession > .builder() > .appName("rem-Streaming-Consumer") > .config("spark.streaming.receiver.writeAheadLog.enable", "true") > .config("spark.streaming.blockInterval", blockInterval) > .config("spark.serializer", > "org.apache.spark.serializer.KryoSerializer") > .config("spark.streaming.receiver.writeAheadLog.enable", "true") >.enableHiveSupport > .getOrCreate() > val sc = spark.sparkContext > val ssc = new StreamingContext(sc, Seconds(batchInterval)) > ssc.checkpoint(checkpointDirectory) > > val converter: Message => Option[String] = { > case msg: TextMessage => > Some(msg.getText) > case _ => > None > } > > val props = new Properties() > props.setProperty( > Context.INITIAL_CONTEXT_FACTORY, > "com.solacesystems.jndi.SolJNDIInitialContextFactory" > ) > props.setProperty(Context.PROVIDER_URL, brokerUrl) > props.setProperty(Context.SECURITY_PRINCIPAL, username) > props.setProperty(Context.SECURITY_PRINCIPAL, passwordSol) > props.setProperty(SupportedProperty.SOLACE_JMS_VPN, vpn) > > val msgs = JmsStreamUtils.createSynchronousJmsQueueStream( > ssc, > > JndiMessageConsumerFactory(props,QueueJmsDestinationInfo(queue), > connectionFactoryName > = connectionFactory,messageSelector = > ""),converter,1000, 1.second,10.second,StorageLevel.MEMORY_AND_DISK_SER_2 ) > > msgs.foreachRDD(rdd => > if (rdd.take(1).length > 0) { > val 
messages: RDD[String] = rdd.map { sr => > if (sr == null) { > println("NO records found") > "NO records found" > } else { > println("Input Records from Solace queue : " + sr.toString) > sr.toString > } > } > Thread.sleep(12) > try{ > * val messagesJson = spark.read.json(messages) ===> getting NPE > here after restarting using WAL* > messagesJson.write.mode("append").parquet(data) > } > catch { > case ex => ex.printStackTrace(
Re: Null pointer exception while replaying WAL
Hi, It is challenging to make a recommendation without further details. I am guessing you are trying to build a fault-tolerant spark application (spark structured streaming) that consumes messages from Solace? To address *NullPointerException* in the context of the provided information, you need to review the part of the code where the exception is thrown and identifying which object or method call is resulting in *null* can help the debugging process plus checking the logs. HTH Mich Talebzadeh, Dad | Technologist | Solutions Architect | Engineer London United Kingdom view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> https://en.everybodywiki.com/Mich_Talebzadeh *Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction. On Sat, 10 Feb 2024 at 05:29, nayan sharma wrote: > Hi Users, > > I am trying to build fault tolerant spark solace consumer. > > Issue :- we have to take restart of the job due to multiple issue load > average is one of them. At that time whatever spark is processing or > batches in the queue is lost. We can't replay it because we already had > send ack while calling store(). > > Solution:- I have tried implementing WAL and checkpointing in the > solution. Job is able to identify the lost batches, records are not being > written in the log file but throwing NPE. > > We are creating sparkcontext using sc.getorcreate() > > > Thanks, > Nayan >
Re: Building an Event-Driven Real-Time Data Processor with Spark Structured Streaming and API Integration
The full code is available from the link below https://github.com/michTalebzadeh/Event_Driven_Real_Time_data_processor_with_SSS_and_API_integration Mich Talebzadeh, Dad | Technologist | Solutions Architect | Engineer London United Kingdom view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> https://en.everybodywiki.com/Mich_Talebzadeh *Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction. On Fri, 9 Feb 2024 at 16:16, Mich Talebzadeh wrote: > Appreciate your thoughts on this, Personally I think Spark Structured > Streaming can be used effectively in an Event Driven Architecture as well > as continuous streaming) > > From the link here > <https://www.linkedin.com/posts/activity-7161748945801617409-v29V?utm_source=share_medium=member_desktop> > > HTH, > > Mich Talebzadeh, > Dad | Technologist | Solutions Architect | Engineer > London > United Kingdom > > >view my Linkedin profile > <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> > > > https://en.everybodywiki.com/Mich_Talebzadeh > > > > *Disclaimer:* Use it at your own risk. Any and all responsibility for any > loss, damage or destruction of data or any other property which may arise > from relying on this email's technical content is explicitly disclaimed. > The author will in no case be liable for any monetary damages arising from > such loss, damage or destruction. > > >
Building an Event-Driven Real-Time Data Processor with Spark Structured Streaming and API Integration
Appreciate your thoughts on this, Personally I think Spark Structured Streaming can be used effectively in an Event Driven Architecture as well as continuous streaming) >From the link here <https://www.linkedin.com/posts/activity-7161748945801617409-v29V?utm_source=share_medium=member_desktop> HTH, Mich Talebzadeh, Dad | Technologist | Solutions Architect | Engineer London United Kingdom view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> https://en.everybodywiki.com/Mich_Talebzadeh *Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.
Re: Issue in Creating Temp_view in databricks and using spark.sql().
I agree with what is stated. This is the gist of my understanding having tested it. When working with Spark Structured Streaming, each streaming query runs in its own separate Spark session to ensure isolation and avoid conflicts between different queries. So here I have: def process_data(self, df: F.DataFrame, batchId: int) -> None: if(len(df.take(1))) > 0: df.select(col("timestamp"), col("value"), col("rowkey"), col("ID"), col("CLUSTERED"), col("op_time")).show(1, False) df.createOrReplaceTempView("tmp_view") try: rows = *df.sparkSession.sq*l("SELECT COUNT(1) FROM tmp_view").collect()[0][0] print(f"Number of rows: {rows}") except Exception as e: logging.error(f"Error counting rows: {e}") else: logging.warning("DataFrame is empty") Here, df.sparkSession accesses the rows associated with the streaming DataFrame 'df' +---++++-+---+ |timestamp |value |rowkey |ID |CLUSTERED|op_time| +---++++-+---+ |2024-01-31 20:31:24.152|25754740|df4d864d-517d-4f59-8f9e-bd1e7cd9678f|25754740|2575473.9|2024-01-31 20:31:30| +---++--------++-+---+ only showing top 1 row rows is 50 HTH Mich Talebzadeh, Dad | Technologist | Solutions Architect | Engineer London United Kingdom view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> https://en.everybodywiki.com/Mich_Talebzadeh *Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction. On Wed, 31 Jan 2024 at 13:30, Karthick Nk wrote: > Hi Team, > > I am using structered streaming in pyspark in azure Databricks, in that I > am creating temp_view from dataframe > (df.createOrReplaceTempView('temp_view')) for performing spark sql query > transformation. > In that I am facing the issue that temp_view not found, so that as a > workaround i have created global temp_view to use. > But same when i have tried to create without streaming, i am able to > perform the temp_view. > > > write_to_final_table = > > (spark.readStream.format('delta').option('ignoreChanges',True).table(f"{delta_table_name}")).writeStream.queryName(f"{query_name}").format("org.elasticsearch.spark.sql").trigger(processingTime=f'1 > minutes').outputMode("append").foreachBatch(process_micro_batch).option("checkpointLocation",checkpointdirectory_path).option("mergeSchema", > "true").option("failOnDataLoss", "false").start() > > > def process_micro_batch(micro_batch_df, batchId) : > micro_batch_df.createOrReplaceTempView("temp_view") > df = spark.sql(f"select * from temp_view") > return df > > Here, I am getting error, while reading data from temp_view that temp_view > not found error. > > > I need to perform or create temp_view (*Not global temp_view)based on the > dataframe, and need to perform the spark sql transformation in structered > streaming. > > I have few question in my hand? > 1. is strucutered streaming and spark.sql will have different > spark.context within same databricks notebook? > 2. If i want to create temp_view based on the dataframe and need to > perform the spark sql operation, how can i create the tempview (Not global > tempview, Since global temp view will be available in the cluster level > across all the notebook)? > > Thanks & Regards >
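Applied to the original question, a minimal foreachBatch handler along the lines above could look like this. The point is that the temporary view is registered on, and queried through, the micro-batch DataFrame's own session (df.sparkSession, available on recent PySpark versions) rather than the notebook-level spark object.

import logging
from pyspark.sql import DataFrame

def process_micro_batch(micro_batch_df: DataFrame, batch_id: int) -> None:
    # Register the view and query it through the batch DataFrame's own session
    if len(micro_batch_df.take(1)) > 0:
        micro_batch_df.createOrReplaceTempView("temp_view")
        rows = micro_batch_df.sparkSession.sql(
            "SELECT COUNT(1) FROM temp_view").collect()[0][0]
        print(f"Batch {batch_id}: {rows} rows")
    else:
        logging.warning("DataFrame is empty")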
Re: Issue in Creating Temp_view in databricks and using spark.sql().
hm. In your logic here def process_micro_batch(micro_batch_df, batchId) : micro_batch_df.createOrReplaceTempView("temp_view") df = spark.sql(f"select * from temp_view") return df Is this function called and if so do you check if micro_batch_df contains rows -> if len(micro_batch_df.take(1)) > 0: something like # Modified process_data function to check for external trigger def process_data(batch_df: F.DataFrame, batchId: int) -> None: *if len(batch_df.take(1)) > 0:* # Check for external event trigger if listen_for_external_event(): # Assuming 'data' is a list of dictionaries obtained from the API in each batch api_data = get_api_data() if api_data: dfAPI = spark_session.createDataFrame(api_data, schema=data_schema) dfAPI = dfAPI \ .withColumn("op_type", lit(udfs.op_type_api_udf())) \ .withColumn("op_time", udfs.timestamp_udf(current_timestamp())) dfAPI.show(10, False) else: logging.warning("Error getting API data.") else: logging.info("No external trigger received.") *else:* *logging.warning("DataFrame is empty")* # Streaming DataFrame Creation: # construct a streaming dataframe that subscribes to topic rate for data """ This creates a streaming DataFrame by subscribing to a rate source. It simulates a stream by generating data at a specified rate (rowsPerSecond). """ streamingDataFrame = spark_session.readStream.format("rate") \ .option("rowsPerSecond", 100) \ .option("subscribe", "rate") \ .option("failOnDataLoss", "false") \ .option("includeHeaders", "true") \ .option("startingOffsets", "latest") \ .load() # Generate a unique query name by appending a timestamp query_name = f"{appName}_{int(time.time())}" logging.info(query_name) # Main loop to continuously check for events while True: # Start the streaming query only if an external event is received if listen_for_external_event(): query_name = f"{appName}_{int(time.time())}" logging.info(query_name) result_query = ( streamingDataFrame.writeStream .outputMode('append') .option("truncate", "false") .foreachBatch(lambda df, batchId: process_data(df, batchId)) .trigger(processingTime=f'{processingTime} seconds') .option('checkpointLocation', checkpoint_path) .queryName(f"{query_name}") .start() ) break # Exit the loop after starting the streaming query else: time.sleep(5) # Sleep for a while before checking for the next event HTH Mich Talebzadeh, Dad | Technologist | Solutions Architect | Engineer London United Kingdom view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> https://en.everybodywiki.com/Mich_Talebzadeh *Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction. On Wed, 31 Jan 2024 at 13:30, Karthick Nk wrote: > Hi Team, > > I am using structered streaming in pyspark in azure Databricks, in that I > am creating temp_view from dataframe > (df.createOrReplaceTempView('temp_view')) for performing spark sql query > transformation. > In that I am facing the issue that temp_view not found, so that as a > workaround i have created global temp_view to use. > But same when i have tried to create without streaming, i am able to > perform the temp_view. 
> > > write_to_final_table = > > (spark.readStream.format('delta').option('ignoreChanges',True).table(f"{delta_table_name}")).writeStream.queryName(f"{query_name}").format("org.elasticsearch.spark.sql").trigger(processingTime=f'1 > minutes').outputMode("append").foreachBatch(process_micro_batch).option("checkpointLocation",checkpointdirectory_path).option("mergeSchema", > "true").option("failOnDataLoss", "false").start() > > > def process_micro_batch(micro_batch_df, batchId) : > micro_batch_df.createOrReplaceTempView("temp_view") > df = spark.sql(f"select * from temp_view") > return df > > Here, I am getting error, while reading data from temp_view that temp_view > not found error. > > > I need to perform or create temp_view (*Not global temp_view)
Re: startTimestamp doesn't work when using rate-micro-batch format
As I stated earlier on, there are alternatives that you might explore, such as the socket source for testing purposes.

from pyspark.sql import SparkSession
from pyspark.sql.functions import expr, when
from pyspark.sql.types import StructType, StructField, LongType

spark = SparkSession.builder \
    .master("local[*]") \
    .appName("StreamingSparkPartitioned") \
    .getOrCreate()

expression = when(expr("value % 3 = 1"), "stupid_event") \
    .otherwise(when(expr("value % 3 = 2"), "smart_event") \
    .otherwise("neutral_event"))

# Define the schema to match the socket source data
schema = StructType([StructField("value", LongType())])
checkpoint_path = "file:///ssd/hduser/randomdata/chkpt"

# Start a socket source for testing
socket_streamingDF = spark.readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load() \
    .withColumn("value", expr("CAST(value AS LONG)")) \
    .withColumn("event_type", expression)

query = (
    socket_streamingDF.writeStream
    .outputMode("append")
    .format("console")
    .trigger(processingTime="1 second")
    .option("checkpointLocation", checkpoint_path)
    .start()
)

query.awaitTermination()

In this example, it listens to a socket on localhost:9999 (the port number here is just an example; pick any free port) and expects a single integer value per line. You can use tools like netcat to send data to this socket for testing. echo "1" | nc -lk 9999 HTH Mich Talebzadeh, Dad | Technologist | Solutions Architect | Engineer London United Kingdom view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> https://en.everybodywiki.com/Mich_Talebzadeh *Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction. On Mon, 29 Jan 2024 at 11:33, Perfect Stranger wrote: > Yes, there's definitely an issue, can someone fix it? I'm not familiar > with apache jira, do I need to make a bug report or what?
> > On Mon, Jan 29, 2024 at 2:57 AM Mich Talebzadeh > wrote: > >> OK >> >> This is the equivalent Python code >> >> from pyspark.sql import SparkSession >> from pyspark.sql.functions import expr, when >> from pyspark.sql.types import StructType, StructField, LongType >> from datetime import datetime >> >> spark = SparkSession.builder \ >> .master("local[*]") \ >> .appName("StreamingSparkPartitioned") \ >> .getOrCreate() >> >> expression = when(expr("value % 3 = 1"), "stupid_event") \ >> .otherwise(when(expr("value % 3 = 2"), >> "smart_event").otherwise("neutral_event")) >> >> # Define the schema to match the rate-micro-batch data source >> schema = StructType([StructField("timestamp", LongType()), >> StructField("value", LongType())]) >> checkpoint_path = "file:///ssd/hduser/randomdata/chkpt" >> >> # Convert human-readable timestamp to Unix timestamp in milliseconds >> start_timestamp = int(datetime(2024, 1, 28).timestamp() * 1000) >> >> streamingDF = spark.readStream \ >> .format("rate-micro-batch") \ >> .option("rowsPerBatch", "100") \ >> .option("startTimestamp", start_timestamp) \ >> .option("numPartitions", 1) \ >> .load() \ >> .withColumn("event_type", expression) >> >> query = ( >> streamingDF.writeStream >> .outputMode("append") >> .format("console") >> .trigger(processingTime="1 second") >> .option("checkpointLocation", checkpoint_path) >> .start() >> ) >> >> query.awaitTermination() >> >> This is the error I am getting >> File >> "/home/hduser/dba/bin/python/DSBQ/src/StreamingSparkPartitioned.py", line >> 38, in >> query.awaitTermination() >> File >> "/opt/spark/python/lib/pyspark.zip/pyspark/sql/streaming/query.py", line >> 201, in awaitTermination >> File >> "/opt/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line >> 1322, in __call__ >> File >> "/opt/spark/python/lib/pyspark.zip/pyspark/errors/exceptions/captured.py", >> line 175, in deco >> pyspark.errors.exceptions.ca
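If netcat is not handy, a small plain-Python feeder can play the same role for the socket example above. This is a rough, untested sketch that assumes the same example port 9999:

import socket
import time

# Minimal test-data feeder for the socket source example above.
# It listens on localhost:9999 and writes one integer per line once Spark connects.
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind(("localhost", 9999))
    server.listen(1)
    conn, _ = server.accept()      # Spark's socket source connects as a client
    with conn:
        for value in range(1000):
            conn.sendall(f"{value}\n".encode("utf-8"))
            time.sleep(0.1)        # throttle so rows arrive across several micro-batches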
Re: startTimestamp doesn't work when using rate-micro-batch format
OK This is the equivalent Python code from pyspark.sql import SparkSession from pyspark.sql.functions import expr, when from pyspark.sql.types import StructType, StructField, LongType from datetime import datetime spark = SparkSession.builder \ .master("local[*]") \ .appName("StreamingSparkPartitioned") \ .getOrCreate() expression = when(expr("value % 3 = 1"), "stupid_event") \ .otherwise(when(expr("value % 3 = 2"), "smart_event").otherwise("neutral_event")) # Define the schema to match the rate-micro-batch data source schema = StructType([StructField("timestamp", LongType()), StructField("value", LongType())]) checkpoint_path = "file:///ssd/hduser/randomdata/chkpt" # Convert human-readable timestamp to Unix timestamp in milliseconds start_timestamp = int(datetime(2024, 1, 28).timestamp() * 1000) streamingDF = spark.readStream \ .format("rate-micro-batch") \ .option("rowsPerBatch", "100") \ .option("startTimestamp", start_timestamp) \ .option("numPartitions", 1) \ .load() \ .withColumn("event_type", expression) query = ( streamingDF.writeStream .outputMode("append") .format("console") .trigger(processingTime="1 second") .option("checkpointLocation", checkpoint_path) .start() ) query.awaitTermination() This is the error I am getting File "/home/hduser/dba/bin/python/DSBQ/src/StreamingSparkPartitioned.py", line 38, in query.awaitTermination() File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/streaming/query.py", line 201, in awaitTermination File "/opt/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__ File "/opt/spark/python/lib/pyspark.zip/pyspark/errors/exceptions/captured.py", line 175, in deco pyspark.errors.exceptions.captured.StreamingQueryException: [STREAM_FAILED] Query [id = db2fd1cc-cc72-439e-9dcb-8acfd2e9a61e, runId = f71cd26f-7e86-491c-bcf2-ab2e3dc63eca] terminated with exception: No usable value for offset Did not find value which can be converted into long Seems like there might be an issue with the *rate-micro-batch* source when using the *startTimestamp* option. You can try using socket source for testing purposes HTH Mich Talebzadeh, Dad | Technologist | Solutions Architect | Engineer London United Kingdom view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> https://en.everybodywiki.com/Mich_Talebzadeh *Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction. On Sun, 28 Jan 2024 at 22:00, Perfect Stranger wrote: > I described the issue here: > > > https://stackoverflow.com/questions/77893939/how-does-starttimestamp-option-work-for-the-rate-micro-batch-format > > Could someone please respond? > > The rate-micro-batch format doesn't seem to respect the startTimestamp > option. > > Thanks. >
Re: [Structured Streaming] Keeping checkpointing cost under control
Catching up a bit late on this. I mentioned optimising RocksDB as below in my earlier thread, specifically:

# Add RocksDB configurations here
spark.conf.set("spark.sql.streaming.stateStore.providerClass", "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
spark.conf.set("spark.sql.streaming.stateStore.rocksdb.changelog", "true")
spark.conf.set("spark.sql.streaming.stateStore.rocksdb.writeBufferSizeMB", "64")  # Example configuration
spark.conf.set("spark.sql.streaming.stateStore.rocksdb.compaction.style", "level")
spark.conf.set("spark.sql.streaming.stateStore.rocksdb.compaction.level.targetFileSizeBase", "67108864")

Maybe a bit more clarity will be useful, although these points can be subjective and somewhat debatable.

1. spark.sql.streaming.stateStore.rocksdb.changelog - Enable Changelog: - Benefit: Changelog is essential for maintaining state consistency and fault tolerance. Enabling it ensures that changes are logged and can be replayed in case of failures. - Drawback: The changelog can consume additional storage, especially when dealing with frequent state updates.
2. spark.sql.streaming.stateStore.rocksdb.writeBufferSizeMB - Write Buffer Size: - Benefit: A larger write buffer can improve write performance and reduce write amplification. - Drawback: It may lead to increased memory usage. The optimal size depends on the characteristics of your workload and available resources.
3. spark.sql.streaming.stateStore.rocksdb.compaction.style - Compaction Style: - Benefit: Choosing an appropriate compaction style (e.g., level) can impact read and write performance. - Drawback: Different compaction styles have trade-offs, and the optimal choice depends on factors like read vs. write workload and available storage.
4. spark.sql.streaming.stateStore.rocksdb.compaction.level.targetFileSizeBase - Target File Size for Level-based Compaction: ** There is another compaction style called "uniform" which can be beneficial in scenarios with a heavy write workload **

Borrowing from colloquial English, your mileage varies as usual. However, since you said you may consider tuning RocksDB, I add the following if I may.

- The choice of RocksDB and its configurations is often dependent on the specific requirements and characteristics of your streaming application. RocksDB can provide good performance for stateful streaming applications with proper tuning. However, memory usage is a crucial consideration, especially when dealing with large state sizes. Also note that the changelog is essential for achieving fault tolerance but may introduce additional storage overhead.
- The optimal configuration depends on factors like the volume and characteristics of the streaming data, the frequency of state updates, and available resources.

In summary, what works well for one workload might not be optimal for another.

HTH Mich Talebzadeh, Dad | Technologist | Solutions Architect | Engineer London United Kingdom view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> https://en.everybodywiki.com/Mich_Talebzadeh *Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction. On Wed, 10 Jan 2024 at 15:42, Andrzej Zera wrote: > Yes, I agree.
But apart from maintaining this state internally (in memory > or in memory+disk as in case of RocksDB), every trigger it saves some > information about this state in a checkpoint location. I'm afraid we can't > do much about this checkpointing operation. I'll continue looking for > information on how I can decrease the number of LIST requests (ListBucket > operations) made in this process. > > Thank you for your input so far! > Andrzej > > śr., 10 sty 2024 o 16:33 Mich Talebzadeh > napisał(a): > >> Hi, >> >> You may have a point on scenario 2. >> >> Caching Streaming DataFrames: In Spark Streaming, each batch of data is >> processed incrementally, and it may not fit the typical caching we >> discussed. Instead, Spark Streaming has its mechanisms to manage and >> optimize the processing of streaming data. Case in point for caching >> partial results, one often relies on maintaining state by using stateful >> operations (see below) on Structured Streaming DataFrames. In such >> scenarios, Spark maintains state internally based on the operations >> performed. For example, if you are doing a groupBy
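As a rough way to check whether RocksDB settings such as the above actually help, the streaming query's progress report exposes per-operator state store metrics. A minimal sketch follows; the exact field names can vary between Spark versions, and 'query' is assumed to be an already started StreamingQuery:

progress = query.lastProgress   # dict describing the most recent micro-batch, or None
if progress and progress.get("stateOperators"):
    for op in progress["stateOperators"]:
        # Typical fields include numRowsTotal and memoryUsedBytes; RocksDB-specific
        # counters appear under customMetrics when the RocksDB provider is enabled.
        print(op.get("numRowsTotal"), op.get("memoryUsedBytes"), op.get("customMetrics"))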
Re: Structured Streaming Process Each Records Individually
Hi, Let us visit the approach, as some fellow members correctly highlighted the use case for Spark Structured Streaming, and the key concepts that I will mention:

- foreach: A method for applying custom write logic to each individual row in a streaming DataFrame or Dataset.
- foreachBatch: A method for applying custom write logic to entire micro-batches of data, providing more flexibility for complex operations.
- sendToSink (my chosen name here for the custom logic): A user-defined function that encapsulates the logic for writing a micro-batch to a sink (in my case the Google BigQuery DW).

Let us create some pseudo code (in Python) for the sendToSink function used in foreachBatch(sendToSink):

def sendToSink(df, batchId):
    if len(df.take(1)) > 0:  # Check for empty DataFrame
        try:
            # Extract table names from the "@table" column
            table_names = df.select("@table").rdd.flatMap(lambda row: row).collect()
            # Iterate through each table name
            for table_name in table_names:
                # Filter the DataFrame for rows belonging to the current table
                table_df = df.filter(col("@table") == table_name)
                # Handle nested structures for specific tables
                if table_name in ["product_zones", "product_devices"]:
                    # Extract nested data (e.g., "zones" or "device" columns)
                    nested_data = table_df.select("zones", "device").rdd.flatMap(lambda row: row)
                    # Create a separate DataFrame for nested data
                    nested_df = spark.createDataFrame(nested_data, schema=nested_data.first().asDict())
                    # Write nested DataFrame to its corresponding table
                    write_to_sink(nested_df, table_name)
                # Write the main DataFrame to its table
                write_to_sink(table_df, table_name)
        except Exception as e:
            # Log errors gracefully
            log_error(f"Error processing table {table_name}: {e}")
    else:
        print("DataFrame is empty")  # Handle empty DataFrame

HTH Mich Talebzadeh, Dad | Technologist | Solutions Architect | Engineer London United Kingdom view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> https://en.everybodywiki.com/Mich_Talebzadeh *Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction. On Wed, 10 Jan 2024 at 18:51, PRASHANT L wrote: > Hi > I have a use case where I need to process json payloads coming from Kafka > using structured streaming , but thing is json can have different formats , > schema is not fixed > and each json will have a @type tag so based on tag , json has to be > parsed and loaded to table with tag name , and if a json has nested sub > tags , those tags shd go to different table > so I need to process each json record individually , and determine > destination tables what would be the best approach > > >> *{* >> *"os": "andriod",* >> *"type": "mobile",* >> *"device": {* >> *"warrenty": "3 years",* >> *"replace": "yes"* >> *},* >> *"zones": [* >> *{* >> *"city": "Bangalore",* >> *"state": "KA",* >> *"pin": "577401"* >> *},* >> *{* >> *"city": "Mumbai",* >> *"state": "MH",* >> *"pin": "576003"* >> *}* >> *],* >> *"@table": "product"**}* > > > so for the above json , there are 3 tables created > 1. Product (@type) THis is a parent table > 2. poduct_zones and product_devices , child table >
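For completeness, here is a minimal, hedged sketch of how a function like sendToSink above could be wired into the streaming query. The names kafka_servers, kafka_topic and checkpoint_path are placeholders, and parsing the JSON payload into columns such as "@table", "zones" and "device" (e.g. with from_json and a suitable schema) is assumed to happen before or inside sendToSink:

raw_df = (spark.readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", kafka_servers)
          .option("subscribe", kafka_topic)
          .load()
          .selectExpr("CAST(value AS STRING) AS value"))

query = (raw_df.writeStream
         .foreachBatch(sendToSink)   # apply the per-table routing to each micro-batch
         .option("checkpointLocation", checkpoint_path)
         .start())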
Re: Structured Streaming Process Each Records Individually
Use an intermediate work table to put json data streaming in there in the first place and then according to the tag store the data in the correct table HTH Mich Talebzadeh, Dad | Technologist | Solutions Architect | Engineer London United Kingdom view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> https://en.everybodywiki.com/Mich_Talebzadeh *Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction. On Wed, 10 Jan 2024 at 18:51, PRASHANT L wrote: > Hi > I have a use case where I need to process json payloads coming from Kafka > using structured streaming , but thing is json can have different formats , > schema is not fixed > and each json will have a @type tag so based on tag , json has to be > parsed and loaded to table with tag name , and if a json has nested sub > tags , those tags shd go to different table > so I need to process each json record individually , and determine > destination tables what would be the best approach > > >> *{* >> *"os": "andriod",* >> *"type": "mobile",* >> *"device": {* >> *"warrenty": "3 years",* >> *"replace": "yes"* >> *},* >> *"zones": [* >> *{* >> *"city": "Bangalore",* >> *"state": "KA",* >> *"pin": "577401"* >> *},* >> *{* >> *"city": "Mumbai",* >> *"state": "MH",* >> *"pin": "576003"* >> *}* >> *],* >> *"@table": "product"**}* > > > so for the above json , there are 3 tables created > 1. Product (@type) THis is a parent table > 2. poduct_zones and product_devices , child table >
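A rough, untested sketch of that staging (work) table idea, assuming Delta tables and placeholder names such as kafka_df, staging_checkpoint and raw_events:

from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType

# 1) Land the raw JSON stream in a work table as-is, together with its routing tag
tag_schema = StructType([StructField("@table", StringType())])
(kafka_df.selectExpr("CAST(value AS STRING) AS payload")
    .withColumn("tag", from_json(col("payload"), tag_schema)["@table"])
    .writeStream
    .format("delta")
    .option("checkpointLocation", staging_checkpoint)
    .toTable("raw_events"))

# 2) In a follow-up step, route rows from the work table to per-tag tables
staged = spark.read.table("raw_events")
for tag in [r["tag"] for r in staged.select("tag").distinct().collect()]:
    staged.filter(col("tag") == tag).write.mode("append").saveAsTable(f"{tag}_raw")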
Re: [Structured Streaming] Keeping checkpointing cost under control
Hi, You may have a point on scenario 2. Caching Streaming DataFrames: In Spark Streaming, each batch of data is processed incrementally, and it may not fit the typical caching we discussed. Instead, Spark Streaming has its mechanisms to manage and optimize the processing of streaming data. Case in point for caching partial results, one often relies on maintaining state by using stateful operations (see below) on Structured Streaming DataFrames. In such scenarios, Spark maintains state internally based on the operations performed. For example, if you are doing a groupBy followed by an aggregation, Spark Streaming will manage the state of the keys and update them incrementally. Just to clarify, in the context of Spark Structured Streaming stateful operation refers to an operation that maintains and updates some form of state across batches of streaming data. Unlike stateless operations, which process each batch independently, stateful operations retain information from previous batches and use it to produce results for the current batch. So, bottom line, while one may not explicitly cache a streaming data frame, Spark internally optimizes the processing by maintaining the necessary state. HTH Mich Talebzadeh, Dad | Technologist | Solutions Architect | Engineer London United Kingdom view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> https://en.everybodywiki.com/Mich_Talebzadeh *Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction. On Wed, 10 Jan 2024 at 14:20, Andrzej Zera wrote: > Hey, > > Yes, that's how I understood it (scenario 1). However, I'm not sure if > scenario 2 is possible. I think cache on streaming DataFrame is supported > only in forEachBatch (in which it's actually no longer a streaming DF). > > śr., 10 sty 2024 o 15:01 Mich Talebzadeh > napisał(a): > >> Hi, >> >> With regard to your point >> >> - Caching: Can you please explain what you mean by caching? I know that >> when you have batch and streaming sources in a streaming query, then you >> can try to cache batch ones to save on reads. But I'm not sure if it's what >> you mean, and I don't know how to apply what you suggest to streaming data. >> >> Let us visit this >> >> Caching purpose in Structured Streaming is to store frequently accessed >> data in memory or disk for faster retrieval, reducing repeated reads from >> sources. >> >> - Types: >> >>- Memory Caching: Stores data in memory for extremely fast access. >>- Disk Caching: Stores data on disk for larger datasets or >>persistence across triggers >> >> >> - Scenarios: >> >> Joining Streaming Data with Static Data: Cache static datasets >> (e.g., reference tables) to avoid repeated reads for each micro-batch. >> >>- >>- Reusing Intermediate Results: Cache intermediate dataframes that >>are expensive to compute and used multiple times within the query. >>- Window Operations: Cache data within a window to avoid re-reading >>for subsequent aggregations or calculations within that window. >> >> - Benefits: >> >>- Performance: Faster query execution by reducing I/O operations and >>computation overhead. >>- Cost Optimization: Reduced reads from external sources can lower >>costs, especially for cloud-based sources. 
>>- Scalability: Helps handle higher data volumes and throughput by >>minimizing expensive re-computations. >> >> >> Example codec >> >> scenario 1 >> >> static_data = spark.read.load("path/to/static/data") static_data.cache() >> streaming_data = spark.readStream.format("...").load() joined_data = >> streaming_data.join(static_data, ...) # Static data is cached for >> efficient joins >> >> scenario 2 >> >> intermediate_df = streaming_data.groupBy(...).count() >> intermediate_df.cache() >> # Use cached intermediate_df for further transformations or actions >> >> HTH >> >> Mich Talebzadeh, >> Dad | Technologist | Solutions Architect | Engineer >> London >> United Kingdom >> >> >>view my Linkedin profile >> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> >> >> >> https://en.everybodywiki.com/Mich_Talebzadeh >> >> >> >> *Disclaimer:* Use it at your own risk. Any and all responsib
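To make the 'stateful operation' point above concrete, here is a small illustrative sketch of a windowed aggregation where Spark itself maintains the per-window state across micro-batches; events_df, event_time, event_type and checkpoint_path are assumed names:

from pyspark.sql.functions import window, col

windowed_counts = (events_df
    .withWatermark("event_time", "10 minutes")   # bounds how long state is retained
    .groupBy(window(col("event_time"), "5 minutes"), col("event_type"))
    .count())

query = (windowed_counts.writeStream
         .outputMode("update")
         .format("console")
         .option("checkpointLocation", checkpoint_path)
         .start())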
Re: [Structured Streaming] Keeping checkpointing cost under control
Hi, With regard to your point - Caching: Can you please explain what you mean by caching? I know that when you have batch and streaming sources in a streaming query, then you can try to cache batch ones to save on reads. But I'm not sure if it's what you mean, and I don't know how to apply what you suggest to streaming data. Let us visit this Caching purpose in Structured Streaming is to store frequently accessed data in memory or disk for faster retrieval, reducing repeated reads from sources. - Types: - Memory Caching: Stores data in memory for extremely fast access. - Disk Caching: Stores data on disk for larger datasets or persistence across triggers - Scenarios: Joining Streaming Data with Static Data: Cache static datasets (e.g., reference tables) to avoid repeated reads for each micro-batch. - - Reusing Intermediate Results: Cache intermediate dataframes that are expensive to compute and used multiple times within the query. - Window Operations: Cache data within a window to avoid re-reading for subsequent aggregations or calculations within that window. - Benefits: - Performance: Faster query execution by reducing I/O operations and computation overhead. - Cost Optimization: Reduced reads from external sources can lower costs, especially for cloud-based sources. - Scalability: Helps handle higher data volumes and throughput by minimizing expensive re-computations. Example codec scenario 1 static_data = spark.read.load("path/to/static/data") static_data.cache() streaming_data = spark.readStream.format("...").load() joined_data = streaming_data.join(static_data, ...) # Static data is cached for efficient joins scenario 2 intermediate_df = streaming_data.groupBy(...).count() intermediate_df.cache() # Use cached intermediate_df for further transformations or actions HTH Mich Talebzadeh, Dad | Technologist | Solutions Architect | Engineer London United Kingdom view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> https://en.everybodywiki.com/Mich_Talebzadeh *Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction. On Wed, 10 Jan 2024 at 13:10, Andrzej Zera wrote: > Thank you very much for your suggestions. Yes, my main concern is > checkpointing costs. > > I went through your suggestions and here're my comments: > > - Caching: Can you please explain what you mean by caching? I know that > when you have batch and streaming sources in a streaming query, then you > can try to cache batch ones to save on reads. But I'm not sure if it's what > you mean, and I don't know how to apply what you suggest to streaming data. > > - Optimize Checkpointing Frequency: I'm already using changelog > checkpointing with RocksDB and increased trigger interval to a maximum > acceptable value. > > - Minimize LIST Request: That's where I can get most savings. My LIST > requests account for ~70% of checkpointing costs. From what I see, LIST > requests are ~2.5x the number of PUT requests. Unfortunately, when I > changed to checkpoting location DBFS, it didn't help with minimizing LIST > requests. They are roughly at the same level. From what I see, S3 Optimized > Committer is EMR-specific so I can't use it in Databricks. 
The fact that I > don't see a difference between S3 and DBFS checkpoint location suggests > that both must implement the same or similar committer. > > - Optimizing RocksDB: I still need to do this but I don't suspect it will > help much. From what I understand, these settings shouldn't have a > significant impact on the number of requests to S3. > > Any other ideas how to limit the number of LIST requests are appreciated > > niedz., 7 sty 2024 o 15:38 Mich Talebzadeh > napisał(a): > >> OK I assume that your main concern is checkpointing costs. >> >> - Caching: If your queries read the same data multiple times, caching >> the data might reduce the amount of data that needs to be checkpointed. >> >> - Optimize Checkpointing Frequency i.e >> >>- Consider Changelog Checkpointing with RocksDB. This can >>potentially reduce checkpoint size and duration by only storing state >>changes, rather than the entire state. >>- Adjust Trigger Interval (if possible): While not ideal for your >>near-real time requirement, even a slight increase in the trigger interval >>(e.g., to 7-8 seconds) can reduce checkpoint frequency and costs. >> >> - Minimize LIST Requests: >> >>- Enable S3 Optimized Committe
Re: Spark Structured Streaming and Flask REST API for Real-Time Data Ingestion and Analytics.
Hi Ashok, Thanks for pointing out the databricks article Scalable Spark Structured Streaming for REST API Destinations | Databricks Blog <https://www.databricks.com/blog/scalable-spark-structured-streaming-rest-api-destinations> I browsed it and it is basically similar to many of us involved with spark structure streaming with *foreachBatch. *This article and mine both mention REST API as part of the architecture. However, there are notable differences I believe. In my proposed approach: 1. Event-Driven Model: - Spark Streaming waits until Flask REST API makes a request for events to be generated within PySpark. - Messages are generated and then fed into any sink based on the Flask REST API's request. - This creates a more event-driven model where Spark generates data when prompted by external requests. In the Databricks article scenario: Continuous Data Stream: - There is an incoming stream of data from sources like Kafka, AWS Kinesis, or Azure Event Hub handled by foreachBatch - As messages flow off this stream, calls are made to a REST API with some or all of the message data. - This suggests a continuous flow of data where messages are sent to a REST API as soon as they are available in the streaming source. *Benefits of Event-Driven Model:* 1. Responsiveness: Ideal for scenarios where data generation needs to be aligned with specific events or user actions. 2. Resource Optimization: Can reduce resource consumption by processing data only when needed. 3. Flexibility: Allows for dynamic control over data generation based on external triggers. *Benefits of Continuous Data Stream Mode with foreachBatch:* 1. Real-Time Processing: Facilitates immediate analysis and action on incoming data. 2. Handling High Volumes: Well-suited for scenarios with continuous, high-volume data streams. 3. Low-Latency Applications: Essential for applications requiring near real-time responses. *Potential Use Cases for my approach:* - On-Demand Data Generation: Generating data for simulations, reports, or visualizations based on user requests. - Triggered Analytics: Executing specific analytics tasks only when certain events occur, such as detecting anomalies or reaching thresholds say fraud detection. - Custom ETL Processes: Facilitating data extraction, transformation, and loading workflows based on external events or triggers Something to note on latency. Event-driven models like mine can potentially introduce slight latency compared to continuous processing, as data generation depends on API calls. So my approach is more event-triggered and responsive to external requests, while foreachBatch scenario is more continuous and real-time, processing and sending data as it becomes available. In summary, both approaches have their merits and are suited to different use cases depending on the nature of the data flow and processing requirements. Cheers Mich Talebzadeh, Dad | Technologist | Solutions Architect | Engineer London United Kingdom view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> https://en.everybodywiki.com/Mich_Talebzadeh *Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction. 
On Tue, 9 Jan 2024 at 19:11, ashok34...@yahoo.com wrote: > Hey Mich, > > Thanks for this introduction on your forthcoming proposal "Spark > Structured Streaming and Flask REST API for Real-Time Data Ingestion and > Analytics". I recently came across an article by Databricks with title > Scalable > Spark Structured Streaming for REST API Destinations > <https://www.databricks.com/blog/scalable-spark-structured-streaming-rest-api-destinations> > . Their use case is similar to your suggestion but what they are saying > is that they have incoming stream of data from sources like Kafka, AWS > Kinesis, or Azure Event Hub. In other words, a continuous flow of data > where messages are sent to a REST API as soon as they are available in the > streaming source. Their approach is practical but wanted to get your > thoughts on their article with a better understanding on your proposal and > differences. > > Thanks > > > On Tuesday, 9 January 2024 at 00:24:19 GMT, Mich Talebzadeh < > mich.talebza...@gmail.com> wrote: > > > Please also note that Flask, by default, is a single-threaded web > framework. While it is suitable for development and small-scale > applications, it may not handle concurrent requests efficiently in a > production environment. > In production, one can utilise Gunicorn
Re: Spark Structured Streaming and Flask REST API for Real-Time Data Ingestion and Analytics.
Please also note that Flask, by default, is a single-threaded web framework. While it is suitable for development and small-scale applications, it may not handle concurrent requests efficiently in a production environment. In production, one can utilise Gunicorn (Green Unicorn) which is a WSGI ( Web Server Gateway Interface) that is commonly used to serve Flask applications in production. It provides multiple worker processes, each capable of handling a single request at a time. This makes Gunicorn suitable for handling multiple simultaneous requests and improves the concurrency and performance of your Flask application. HTH Mich Talebzadeh, Dad | Technologist | Solutions Architect | Engineer London United Kingdom view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> https://en.everybodywiki.com/Mich_Talebzadeh *Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction. On Mon, 8 Jan 2024 at 19:30, Mich Talebzadeh wrote: > Thought it might be useful to share my idea with fellow forum members. During > the breaks, I worked on the *seamless integration of Spark Structured > Streaming with Flask REST API for real-time data ingestion and analytics*. > The use case revolves around a scenario where data is generated through > REST API requests in real time. The Flask REST AP > <https://en.wikipedia.org/wiki/Flask_(web_framework)>I efficiently > captures and processes this data, saving it to a Spark Structured Streaming > DataFrame. Subsequently, the processed data could be channelled into any > sink of your choice including Kafka pipeline, showing a robust end-to-end > solution for dynamic and responsive data streaming. I will delve into the > architecture, implementation, and benefits of this combination, enabling > one to build an agile and efficient real-time data application. I will put > the code in GitHub for everyone's benefit. Hopefully your comments will > help me to improve it. > > Cheers > > Mich Talebzadeh, > Dad | Technologist | Solutions Architect | Engineer > London > United Kingdom > > >view my Linkedin profile > <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> > > > https://en.everybodywiki.com/Mich_Talebzadeh > > > > *Disclaimer:* Use it at your own risk. Any and all responsibility for any > loss, damage or destruction of data or any other property which may arise > from relying on this email's technical content is explicitly disclaimed. > The author will in no case be liable for any monetary damages arising from > such loss, damage or destruction. > > >
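As a tiny illustration of this point, here is a minimal (hypothetical) Flask app and the way it might be served: the Flask development server handles requests one at a time, whereas Gunicorn can run several worker processes, e.g. gunicorn -w 4 -b 0.0.0.0:8000 app:app.

# app.py - minimal Flask app, used here only to illustrate the serving model
from flask import Flask, request, jsonify

app = Flask(__name__)

@app.route("/ingest", methods=["POST"])
def ingest():
    payload = request.get_json(force=True)
    # hand the payload off to the ingestion pipeline here (e.g. a Kafka producer)
    return jsonify({"received": True, "records": len(payload or [])})

if __name__ == "__main__":
    # development server only; in production run: gunicorn -w 4 -b 0.0.0.0:8000 app:app
    app.run(host="0.0.0.0", port=8000)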
Spark Structured Streaming and Flask REST API for Real-Time Data Ingestion and Analytics.
Thought it might be useful to share my idea with fellow forum members. During the breaks, I worked on the *seamless integration of Spark Structured Streaming with Flask REST API for real-time data ingestion and analytics*. The use case revolves around a scenario where data is generated through REST API requests in real time. The Flask REST API <https://en.wikipedia.org/wiki/Flask_(web_framework)> efficiently captures and processes this data, saving it to a Spark Structured Streaming DataFrame. Subsequently, the processed data could be channelled into any sink of your choice, including a Kafka pipeline, showing a robust end-to-end solution for dynamic and responsive data streaming. I will delve into the architecture, implementation, and benefits of this combination, enabling one to build an agile and efficient real-time data application. I will put the code in GitHub for everyone's benefit. Hopefully your comments will help me to improve it. Cheers Mich Talebzadeh, Dad | Technologist | Solutions Architect | Engineer London United Kingdom view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> https://en.everybodywiki.com/Mich_Talebzadeh *Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.
Re: Pyspark UDF as a data source for streaming
Hi, Have you come back with some ideas for implementing this? Specifically integrating Spark Structured Streaming with a REST API? FYI, I did some work on it as it can have potentially wider use cases, i.e. the seamless integration of Spark Structured Streaming with Flask REST API for real-time data ingestion and analytics. My use case revolves around a scenario where data is generated through REST API requests in real time with PySpark. The Flask REST API efficiently captures and processes this data, saving it to a sink of your choice like a data warehouse or Kafka. HTH Mich Talebzadeh, Dad | Technologist | Solutions Architect | Engineer London United Kingdom view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> https://en.everybodywiki.com/Mich_Talebzadeh *Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction. On Wed, 27 Dec 2023 at 12:16, Поротиков Станислав Вячеславович wrote: > Hello! > > Is it possible to write pyspark UDF, generated data to streaming dataframe? > > I want to get some data from REST API requests in real time and consider > to save this data to dataframe. > > And then put it to Kafka. > > I can't realise how to create streaming dataframe from generated data. > > > > I am new in spark streaming. > > Could you give me some hints? > > > > Best regards, > > Stanislav Porotikov > > >
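One hedged sketch of the pipeline Stanislav describes above: poll the REST API, build a small DataFrame from the response, and write it to Kafka with Spark's batch Kafka sink, letting a separate streaming job consume the topic afterwards. api_url, kafka_servers and kafka_topic are placeholders, and the API is assumed to return a JSON array of flat records:

import requests
from pyspark.sql.functions import to_json, struct

records = requests.get(api_url, timeout=30).json()
if records:
    api_df = spark.createDataFrame(records)
    (api_df.select(to_json(struct(*api_df.columns)).alias("value"))
           .write
           .format("kafka")
           .option("kafka.bootstrap.servers", kafka_servers)
           .option("topic", kafka_topic)
           .save())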
Re: [Structured Streaming] Keeping checkpointing cost under control
OK I assume that your main concern is checkpointing costs. - Caching: If your queries read the same data multiple times, caching the data might reduce the amount of data that needs to be checkpointed. - Optimize Checkpointing Frequency i.e - Consider Changelog Checkpointing with RocksDB. This can potentially reduce checkpoint size and duration by only storing state changes, rather than the entire state. - Adjust Trigger Interval (if possible): While not ideal for your near-real time requirement, even a slight increase in the trigger interval (e.g., to 7-8 seconds) can reduce checkpoint frequency and costs. - Minimize LIST Requests: - Enable S3 Optimized Committer: or as you stated consider DBFS You can also optimise RocksDB. Set your state backend to RocksDB, if not already. Here are what I use # Add RocksDB configurations here spark.conf.set("spark.sql.streaming.stateStore.providerClass", "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider") spark.conf.set("spark.sql.streaming.stateStore.rocksdb.changelog", "true") spark.conf.set("spark.sql.streaming.stateStore.rocksdb.writeBufferSizeMB", "64") # Example configuration spark.conf.set("spark.sql.streaming.stateStore.rocksdb.compaction.style", "level") spark.conf.set("spark.sql.streaming.stateStore.rocksdb.compaction.level.targetFileSizeBase", "67108864") These configurations provide a starting point for tuning RocksDB. Depending on your specific use case and requirements, of course, your mileage varies. HTH Mich Talebzadeh, Dad | Technologist | Solutions Architect | Engineer London United Kingdom view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> https://en.everybodywiki.com/Mich_Talebzadeh *Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction. On Sun, 7 Jan 2024 at 08:07, Andrzej Zera wrote: > Usually one or two topics per query. Each query has its own checkpoint > directory. Each topic has a few partitions. > > Performance-wise I don't experience any bottlenecks in terms of > checkpointing. It's all about the number of requests (including a high > number of LIST requests) and the associated cost. > > sob., 6 sty 2024 o 13:30 Mich Talebzadeh > napisał(a): > >> How many topics and checkpoint directories are you dealing with? >> >> Does each topic has its own checkpoint on S3? >> >> All these checkpoints are sequential writes so even SSD would not really >> help >> >> HTH >> >> Mich Talebzadeh, >> Dad | Technologist | Solutions Architect | Engineer >> London >> United Kingdom >> >> >>view my Linkedin profile >> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> >> >> >> https://en.everybodywiki.com/Mich_Talebzadeh >> >> >> >> *Disclaimer:* Use it at your own risk. Any and all responsibility for >> any loss, damage or destruction of data or any other property which may >> arise from relying on this email's technical content is explicitly >> disclaimed. The author will in no case be liable for any monetary damages >> arising from such loss, damage or destruction. >> >> >> >> >> On Sat, 6 Jan 2024 at 08:19, Andrzej Zera wrote: >> >>> Hey, >>> >>> I'm running a few Structured Streaming jobs (with Spark 3.5.0) that >>> require near-real time accuracy with trigger intervals in the level of 5-10 >>> seconds. 
I usually run 3-6 streaming queries as part of the job and each >>> query includes at least one stateful operation (and usually two or more). >>> My checkpoint location is S3 bucket and I use RocksDB as a state store. >>> Unfortunately, checkpointing costs are quite high. It's the main cost item >>> of the system and it's roughly 4-5 times the cost of compute. >>> >>> To save on compute costs, the following things are usually recommended: >>> >>>- increase trigger interval (as mentioned, I don't have much space >>>here) >>>- decrease the number of shuffle partitions (I have 2x the number of >>>workers) >>> >>> I'm looking for some other recommendations that I can use to save on >>> checkpointing costs. I saw that most requests are LIST requests. Can we cut >>> them down somehow? I'm using Databricks. If I replace S3 bucket with DBFS, >>> will it help in any way? >>> >>> Thank you! >>> Andrzej >>> >>>
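For reference, a minimal sketch of the two "usual" levers mentioned above; the values are purely illustrative:

# Fewer shuffle partitions means fewer state store instances and fewer checkpoint files.
# Note: this should be chosen before the first run against a given checkpoint location,
# because the state partitioning is fixed once the query has started checkpointing.
spark.conf.set("spark.sql.shuffle.partitions", "8")

query = (df.writeStream
         .outputMode("update")
         .trigger(processingTime="30 seconds")   # a longer trigger means fewer checkpoint writes
         .option("checkpointLocation", checkpoint_path)
         .format("console")
         .start())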
Re: [Structured Streaming] Keeping checkpointing cost under control
How many topics and checkpoint directories are you dealing with? Does each topic has its own checkpoint on S3? All these checkpoints are sequential writes so even SSD would not really help HTH Mich Talebzadeh, Dad | Technologist | Solutions Architect | Engineer London United Kingdom view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> https://en.everybodywiki.com/Mich_Talebzadeh *Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction. On Sat, 6 Jan 2024 at 08:19, Andrzej Zera wrote: > Hey, > > I'm running a few Structured Streaming jobs (with Spark 3.5.0) that > require near-real time accuracy with trigger intervals in the level of 5-10 > seconds. I usually run 3-6 streaming queries as part of the job and each > query includes at least one stateful operation (and usually two or more). > My checkpoint location is S3 bucket and I use RocksDB as a state store. > Unfortunately, checkpointing costs are quite high. It's the main cost item > of the system and it's roughly 4-5 times the cost of compute. > > To save on compute costs, the following things are usually recommended: > >- increase trigger interval (as mentioned, I don't have much space >here) >- decrease the number of shuffle partitions (I have 2x the number of >workers) > > I'm looking for some other recommendations that I can use to save on > checkpointing costs. I saw that most requests are LIST requests. Can we cut > them down somehow? I'm using Databricks. If I replace S3 bucket with DBFS, > will it help in any way? > > Thank you! > Andrzej > >
Re: Issue with Spark Session Initialization in Kubernetes Deployment
Hi, I personally do not use the Spark Operator. Anyhow, the Spark Operator automates the deployment and management of Spark applications within Kubernetes. However, it does not eliminate the need to configure Spark sessions for proper communication with the k8s cluster. So specifying the master URL is mandatory even when using the Spark Operator in Kubernetes deployments.

from pyspark.sql import SparkSession

# replace <k8s-apiserver-host>:<k8s-apiserver-port> with your API server address
spark = SparkSession.builder \
    .appName("AppConstants.APPLICATION_NAME") \
    .config("spark.master", "k8s://https://<k8s-apiserver-host>:<k8s-apiserver-port>") \
    .getOrCreate()

HTH Mich Talebzadeh, Dad | Technologist | Solutions Architect | Engineer London United Kingdom view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> https://en.everybodywiki.com/Mich_Talebzadeh *Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction. On Thu, 4 Jan 2024 at 21:29, Atul Patil wrote: > Hello Team, > > I am currently working on initializing a Spark session using Spark > Structure Streaming within a Kubernetes deployment managed by the Spark > Operator. During the initialization process, I encountered an error message > indicating the necessity to set a master URL: > > *"Caused by: org.apache.spark.SparkException: A master URL must be set in > your configuration."* > > Could you kindly confirm if specifying the master URL is mandatory in this > context of using Spark Structure Streaming programming with the Spark > Operator for Kubernetes deployments? > > Below is the code snippet I am using for Spark session initialization: > SparkSession sparkSession = SparkSession.builder() > .appName(AppConstants.APPLICATION_NAME) > .getOrCreate() > > Thank you! > > Regards, > Atul > > >
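Since the driver pod created by the Spark Operator runs inside the cluster, the API server address is also available from the standard in-cluster environment variables. A hedged sketch (if the master really does need to be set in code) that avoids hard-coding the address:

import os
from pyspark.sql import SparkSession

# KUBERNETES_SERVICE_HOST / KUBERNETES_SERVICE_PORT are injected into every pod by Kubernetes
k8s_host = os.environ.get("KUBERNETES_SERVICE_HOST", "kubernetes.default.svc")
k8s_port = os.environ.get("KUBERNETES_SERVICE_PORT", "443")

spark = (SparkSession.builder
         .appName("my-streaming-app")   # placeholder application name
         .config("spark.master", f"k8s://https://{k8s_host}:{k8s_port}")
         .getOrCreate())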
Re: Pyspark UDF as a data source for streaming
Hi, Do you have more info on this Jira besides the github link as I don't seem to find it! Thanks Mich Talebzadeh, Dad | Technologist | Solutions Architect | Engineer London United Kingdom view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> https://en.everybodywiki.com/Mich_Talebzadeh *Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction. On Thu, 28 Dec 2023 at 09:33, Hyukjin Kwon wrote: > Just fyi streaming python data source is in progress > https://github.com/apache/spark/pull/44416 we will likely release this in > spark 4.0 > > On Thu, Dec 28, 2023 at 4:53 PM Поротиков Станислав Вячеславович > wrote: > >> Yes, it's actual data. >> >> >> >> Best regards, >> >> Stanislav Porotikov >> >> >> >> *From:* Mich Talebzadeh >> *Sent:* Wednesday, December 27, 2023 9:43 PM >> *Cc:* user@spark.apache.org >> *Subject:* Re: Pyspark UDF as a data source for streaming >> >> >> >> Is this generated data actual data or you are testing the application? >> >> >> >> Sounds like a form of Lambda architecture here with some >> decision/processing not far from the attached diagram >> >> >> >> HTH >> >> >> Mich Talebzadeh, >> >> Dad | Technologist | Solutions Architect | Engineer >> >> London >> >> United Kingdom >> >> >> >> [image: Рисунок удален отправителем.] view my Linkedin profile >> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> >> >> >> >> https://en.everybodywiki.com/Mich_Talebzadeh >> >> >> >> *Disclaimer:* Use it at your own risk. Any and all responsibility for >> any loss, damage or destruction of data or any other property which may >> arise from relying on this email's technical content is explicitly >> disclaimed. The author will in no case be liable for any monetary damages >> arising from such loss, damage or destruction. >> >> >> >> >> >> >> >> On Wed, 27 Dec 2023 at 13:26, Поротиков Станислав Вячеславович < >> s.poroti...@skbkontur.ru> wrote: >> >> Actually it's json with specific structure from API server. >> >> But the task is to check constantly if new data appears on API server and >> load it to Kafka. >> >> Full pipeline can be presented like that: >> >> REST API -> Kafka -> some processing -> Kafka/Mongo -> … >> >> >> >> Best regards, >> >> Stanislav Porotikov >> >> >> >> *From:* Mich Talebzadeh >> *Sent:* Wednesday, December 27, 2023 6:17 PM >> *To:* Поротиков Станислав Вячеславович >> *Cc:* user@spark.apache.org >> *Subject:* Re: Pyspark UDF as a data source for streaming >> >> >> >> Ok so you want to generate some random data and load it into Kafka on a >> regular interval and the rest? >> >> >> >> HTH >> >> Mich Talebzadeh, >> >> Dad | Technologist | Solutions Architect | Engineer >> >> London >> >> United Kingdom >> >> >> >> [image: Рисунок удален отправителем.] view my Linkedin profile >> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> >> >> >> >> https://en.everybodywiki.com/Mich_Talebzadeh >> >> >> >> *Disclaimer:* Use it at your own risk. Any and all responsibility for >> any loss, damage or destruction of data or any other property which may >> arise from relying on this email's technical content is explicitly >> disclaimed. The author will in no case be liable for any monetary damages >> arising from such loss, damage or destruction. 
>> >> >> >> >> >> >> >> On Wed, 27 Dec 2023 at 12:16, Поротиков Станислав Вячеславович >> wrote: >> >> Hello! >> >> Is it possible to write pyspark UDF, generated data to streaming >> dataframe? >> >> I want to get some data from REST API requests in real time and consider >> to save this data to dataframe. >> >> And then put it to Kafka. >> >> I can't realise how to create streaming dataframe from generated data. >> >> >> >> I am new in spark streaming. >> >> Could you give me some hints? >> >> >> >> Best regards, >> >> Stanislav Porotikov >> >> >> >>
Re: Pyspark UDF as a data source for streaming
Hi Stanislav , On Pyspark DF can you the following df.printSchema() and send the output please HTH Mich Talebzadeh, Dad | Technologist | Solutions Architect | Engineer London United Kingdom view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> https://en.everybodywiki.com/Mich_Talebzadeh *Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction. On Thu, 28 Dec 2023 at 12:31, Поротиков Станислав Вячеславович < s.poroti...@skbkontur.ru> wrote: > Ok. Thank you very much! > > > > Best regards, > > Stanislav Porotikov > > > > *From:* Mich Talebzadeh > *Sent:* Thursday, December 28, 2023 5:14 PM > *To:* Hyukjin Kwon > *Cc:* Поротиков Станислав Вячеславович ; > user@spark.apache.org > *Subject:* Re: Pyspark UDF as a data source for streaming > > > > You can work around this issue by trying to write your DF to a flat file > and use Kafka to pick it up from the flat file and stream it in. > > > > Bear in mind that Kafa will require a unique identifier as K/V pair. Check > this link how to generate UUID for this purpose > > > https://stackoverflow.com/questions/49785108/spark-streaming-with-python-how-to-add-a-uuid-column > > > > HTH > > > Mich Talebzadeh, > > Dad | Technologist | Solutions Architect | Engineer > > London > > United Kingdom > > > > [image: Рисунок удален отправителем.] view my Linkedin profile > <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> > > > > https://en.everybodywiki.com/Mich_Talebzadeh > > > > *Disclaimer:* Use it at your own risk. Any and all responsibility for any > loss, damage or destruction of data or any other property which may arise > from relying on this email's technical content is explicitly disclaimed. > The author will in no case be liable for any monetary damages arising from > such loss, damage or destruction. > > > > > > > > On Thu, 28 Dec 2023 at 09:33, Hyukjin Kwon wrote: > > Just fyi streaming python data source is in progress > > https://github.com/apache/spark/pull/44416 we will likely release this in > spark 4.0 > > > > On Thu, Dec 28, 2023 at 4:53 PM Поротиков Станислав Вячеславович > wrote: > > Yes, it's actual data. > > > > Best regards, > > Stanislav Porotikov > > > > *From:* Mich Talebzadeh > *Sent:* Wednesday, December 27, 2023 9:43 PM > *Cc:* user@spark.apache.org > *Subject:* Re: Pyspark UDF as a data source for streaming > > > > Is this generated data actual data or you are testing the application? > > > > Sounds like a form of Lambda architecture here with some > decision/processing not far from the attached diagram > > > > HTH > > > Mich Talebzadeh, > > Dad | Technologist | Solutions Architect | Engineer > > London > > United Kingdom > > > > [image: Рисунок удален отправителем.] view my Linkedin profile > <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> > > > > https://en.everybodywiki.com/Mich_Talebzadeh > > > > *Disclaimer:* Use it at your own risk. Any and all responsibility for any > loss, damage or destruction of data or any other property which may arise > from relying on this email's technical content is explicitly disclaimed. > The author will in no case be liable for any monetary damages arising from > such loss, damage or destruction. 
> > > > > > > > On Wed, 27 Dec 2023 at 13:26, Поротиков Станислав Вячеславович < > s.poroti...@skbkontur.ru> wrote: > > Actually it's json with specific structure from API server. > > But the task is to check constantly if new data appears on API server and > load it to Kafka. > > Full pipeline can be presented like that: > > REST API -> Kafka -> some processing -> Kafka/Mongo -> … > > > > Best regards, > > Stanislav Porotikov > > > > *From:* Mich Talebzadeh > *Sent:* Wednesday, December 27, 2023 6:17 PM > *To:* Поротиков Станислав Вячеславович > *Cc:* user@spark.apache.org > *Subject:* Re: Pyspark UDF as a data source for streaming > > > > Ok so you want to generate some random data and load it into Kafka on a > regular interval and the rest? > > > > HTH > > Mich Talebzadeh, > > Dad | Technologist | Solutions Architect | Engineer > > London > > United Kingdom > > > > [image: Р
Re: Pyspark UDF as a data source for streaming
You can work around this issue by trying to write your DF to a flat file and use Kafka to pick it up from the flat file and stream it in. Bear in mind that Kafka will require a unique identifier as K/V pair. Check this link on how to generate UUID for this purpose https://stackoverflow.com/questions/49785108/spark-streaming-with-python-how-to-add-a-uuid-column HTH Mich Talebzadeh, Dad | Technologist | Solutions Architect | Engineer London United Kingdom view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> https://en.everybodywiki.com/Mich_Talebzadeh *Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction. On Thu, 28 Dec 2023 at 09:33, Hyukjin Kwon wrote: > Just fyi streaming python data source is in progress > https://github.com/apache/spark/pull/44416 we will likely release this in > spark 4.0 > > On Thu, Dec 28, 2023 at 4:53 PM Поротиков Станислав Вячеславович > wrote: > >> Yes, it's actual data. >> >> >> >> Best regards, >> >> Stanislav Porotikov >> >> >> >> *From:* Mich Talebzadeh >> *Sent:* Wednesday, December 27, 2023 9:43 PM >> *Cc:* user@spark.apache.org >> *Subject:* Re: Pyspark UDF as a data source for streaming >> >> >> >> Is this generated data actual data or you are testing the application? >> >> >> >> Sounds like a form of Lambda architecture here with some >> decision/processing not far from the attached diagram >> >> >> >> HTH >> >> >> Mich Talebzadeh, >> >> Dad | Technologist | Solutions Architect | Engineer >> >> London >> >> United Kingdom >> >> >> >> [image: Рисунок удален отправителем.] view my Linkedin profile >> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> >> >> >> >> https://en.everybodywiki.com/Mich_Talebzadeh >> >> >> >> *Disclaimer:* Use it at your own risk. Any and all responsibility for >> any loss, damage or destruction of data or any other property which may >> arise from relying on this email's technical content is explicitly >> disclaimed. The author will in no case be liable for any monetary damages >> arising from such loss, damage or destruction. >> >> >> >> >> >> >> >> On Wed, 27 Dec 2023 at 13:26, Поротиков Станислав Вячеславович < >> s.poroti...@skbkontur.ru> wrote: >> >> Actually it's json with specific structure from API server. >> >> But the task is to check constantly if new data appears on API server and >> load it to Kafka. >> >> Full pipeline can be presented like that: >> >> REST API -> Kafka -> some processing -> Kafka/Mongo -> … >> >> >> >> Best regards, >> >> Stanislav Porotikov >> >> >> >> *From:* Mich Talebzadeh >> *Sent:* Wednesday, December 27, 2023 6:17 PM >> *To:* Поротиков Станислав Вячеславович >> *Cc:* user@spark.apache.org >> *Subject:* Re: Pyspark UDF as a data source for streaming >> >> >> >> Ok so you want to generate some random data and load it into Kafka on a >> regular interval and the rest? >> >> >> >> HTH >> >> Mich Talebzadeh, >> >> Dad | Technologist | Solutions Architect | Engineer >> >> London >> >> United Kingdom >> >> >> >> [image: Рисунок удален отправителем.] view my Linkedin profile >> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> >> >> >> >> https://en.everybodywiki.com/Mich_Talebzadeh >> >> >> >> *Disclaimer:* Use it at your own risk. 
Any and all responsibility for >> any loss, damage or destruction of data or any other property which may >> arise from relying on this email's technical content is explicitly >> disclaimed. The author will in no case be liable for any monetary damages >> arising from such loss, damage or destruction. >> >> >> >> >> >> >> >> On Wed, 27 Dec 2023 at 12:16, Поротиков Станислав Вячеславович >> wrote: >> >> Hello! >> >> Is it possible to write pyspark UDF, generated data to streaming >> dataframe? >> >> I want to get some data from REST API requests in real time and consider >> to save this data to dataframe. >> >> And then put it to Kafka. >> >> I can't realise how to create streaming dataframe from generated data. >> >> >> >> I am new in spark streaming. >> >> Could you give me some hints? >> >> >> >> Best regards, >> >> Stanislav Porotikov >> >> >> >>
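A minimal PySpark sketch of the workaround described above (assumptions: the spark-sql-kafka package is on the classpath, and the broker address and topic name are placeholders); the uuid() SQL function supplies the unique string key Kafka needs for the K/V pair:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("rest_to_kafka_sketch").getOrCreate()

# pretend this batch of rows came back from the REST API
df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "payload"])

# Kafka expects a key/value pair; uuid() gives every row a unique string key
kafka_df = df.selectExpr("uuid() AS key", "to_json(struct(*)) AS value")

(kafka_df.write
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")   # placeholder broker
    .option("topic", "rest_api_topic")                      # placeholder topic
    .save())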
Re: Pyspark UDF as a data source for streaming
Ok so you want to generate some random data and load it into Kafka on a regular interval and the rest? HTH Mich Talebzadeh, Dad | Technologist | Solutions Architect | Engineer London United Kingdom view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> https://en.everybodywiki.com/Mich_Talebzadeh *Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction. On Wed, 27 Dec 2023 at 12:16, Поротиков Станислав Вячеславович wrote: > Hello! > > Is it possible to write pyspark UDF, generated data to streaming dataframe? > > I want to get some data from REST API requests in real time and consider > to save this data to dataframe. > > And then put it to Kafka. > > I can't realise how to create streaming dataframe from generated data. > > > > I am new in spark streaming. > > Could you give me some hints? > > > > Best regards, > > Stanislav Porotikov > > >
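Since the underlying question in this thread is how to turn periodically generated data into something streaming, one hedged sketch (a suggestion, not taken from the thread) is to use the rate source purely as a clock and perform the REST call inside foreachBatch; the URL, polling interval and the requests dependency are all assumptions:

from pyspark.sql import SparkSession
import requests  # assumed to be installed on the driver

spark = SparkSession.builder.appName("rest_poll_sketch").getOrCreate()

# the rate source acts only as a clock: it triggers one micro-batch per interval
ticks = spark.readStream.format("rate").option("rowsPerSecond", 1).load()

def poll_api(batch_df, batch_id):
    # runs on the driver once per micro-batch; the tick rows themselves are ignored
    # assumes the placeholder endpoint returns a JSON array of flat objects
    rows = requests.get("https://example.com/api/data", timeout=10).json()
    if rows:
        df = spark.createDataFrame(rows)
        df.show(truncate=False)  # in practice, plug in the uuid()/to_json Kafka write sketched further up the thread

(ticks.writeStream
    .foreachBatch(poll_api)
    .trigger(processingTime="30 seconds")
    .start()
    .awaitTermination())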
Re: Validate spark sql
Worth trying EXPLAIN <https://spark.apache.org/docs/latest/sql-ref-syntax-qry-explain.html>statement as suggested by @tianlangstudio HTH Mich Talebzadeh, Dad | Technologist | Solutions Architect | Engineer London United Kingdom view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> https://en.everybodywiki.com/Mich_Talebzadeh *Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction. On Mon, 25 Dec 2023 at 11:15, tianlangstudio wrote: > > What about EXPLAIN? > https://spark.apache.org/docs/3.5.0/sql-ref-syntax-qry-explain.html#content > > > ><https://www.upwork.com/fl/huanqingzhu> > <https://www.tianlang.tech/>Fusion Zhu <https://www.tianlang.tech/> > > ------ > 发件人:ram manickam > 发送时间:2023年12月25日(星期一) 12:58 > 收件人:Mich Talebzadeh > 抄 送:Nicholas Chammas; user< > user@spark.apache.org> > 主 题:Re: Validate spark sql > > Thanks Mich, Nicholas. I tried looking over the stack overflow post and > none of them > Seems to cover the syntax validation. Do you know if it's even possible to > do syntax validation in spark? > > Thanks > Ram > > On Sun, Dec 24, 2023 at 12:49 PM Mich Talebzadeh < > mich.talebza...@gmail.com> wrote: > Well not to put too finer point on it, in a public forum, one ought to > respect the importance of open communication. Everyone has the right to ask > questions, seek information, and engage in discussions without facing > unnecessary patronization. > > > > Mich Talebzadeh, > Dad | Technologist | Solutions Architect | Engineer > London > United Kingdom > > >view my Linkedin profile > <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> > > > https://en.everybodywiki.com/Mich_Talebzadeh > > > > Disclaimer: Use it at your own risk.Any and all responsibility for any > loss, damage or destruction of data or any other property which may arise > from relying on this email's technical content is explicitly disclaimed. > The author will in no case be liable for any monetary damages arising from > such loss, damage or destruction. > > > > > On Sun, 24 Dec 2023 at 18:27, Nicholas Chammas > wrote: > This is a user-list question, not a dev-list question. Moving this > conversation to the user list and BCC-ing the dev list. > > Also, this statement > > > We are not validating against table or column existence. > > is not correct. When you call spark.sql(…), Spark will lookup the table > references and fail with TABLE_OR_VIEW_NOT_FOUND if it cannot find them. > > Also, when you run DDL via spark.sql(…), Spark will actually run it. So > spark.sql(“drop table my_table”) will actually drop my_table. It’s not a > validation-only operation. > > This question of validating SQL is already discussed on Stack Overflow > <https://stackoverflow.com/q/46973729/877069>. You may find some useful > tips there. > > Nick > > > On Dec 24, 2023, at 4:52 AM, Mich Talebzadeh > wrote: > > > Yes, you can validate the syntax of your PySpark SQL queries without > connecting to an actual dataset or running the queries on a cluster. > PySpark provides a method for syntax validation without executing the > query. 
Something like below > (PySpark shell welcome banner, version 3.4.0) > Using Python version 3.9.16 (main, Apr 24 2023 10:36:11) > Spark context Web UI available at http://rhes75:4040 > Spark context available as 'sc' (master = local[*], app id = > local-1703410019374). > SparkSession available as 'spark'. > >>> from pyspark.sql import SparkSession > >>> spark = SparkSession.builder.appName("validate").getOrCreate() > 23/12/24 09:28:02 WARN SparkSession: Using an existing Spark session; only > runtime SQL configurations will take effect. > >>> sql = "SELECT * FROM WHERE = some value" > >>> try: > ... spark.sql(sql) > ... print("is working") > ... except Exception as e: > ... print(f"Syntax error: {e}") > ... > Syntax error: > [PARSE_SYNTAX_ERROR] Syntax error at or near '<'.(line 1, pos 14) > == SQL == > SELECT * FROM WHERE = some value > --^^^ > > Here we only check for syntax errors and not the actual existence of query > semantics. We are not
Re: Validate spark sql
Well, not to put too fine a point on it, in a public forum, one ought to respect the importance of open communication. Everyone has the right to ask questions, seek information, and engage in discussions without facing unnecessary patronization. Mich Talebzadeh, Dad | Technologist | Solutions Architect | Engineer London United Kingdom view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> https://en.everybodywiki.com/Mich_Talebzadeh *Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction. On Sun, 24 Dec 2023 at 18:27, Nicholas Chammas wrote: > This is a user-list question, not a dev-list question. Moving this > conversation to the user list and BCC-ing the dev list. > > Also, this statement > > > We are not validating against table or column existence. > > is not correct. When you call spark.sql(…), Spark will lookup the table > references and fail with TABLE_OR_VIEW_NOT_FOUND if it cannot find them. > > Also, when you run DDL via spark.sql(…), Spark will actually run it. So > spark.sql(“drop table my_table”) will actually drop my_table. It’s not a > validation-only operation. > > This question of validating SQL is already discussed on Stack Overflow > <https://stackoverflow.com/q/46973729/877069>. You may find some useful > tips there. > > Nick > > > On Dec 24, 2023, at 4:52 AM, Mich Talebzadeh > wrote: > > > Yes, you can validate the syntax of your PySpark SQL queries without > connecting to an actual dataset or running the queries on a cluster. > PySpark provides a method for syntax validation without executing the > query. Something like below > (PySpark shell welcome banner, version 3.4.0) > > Using Python version 3.9.16 (main, Apr 24 2023 10:36:11) > Spark context Web UI available at http://rhes75:4040 > Spark context available as 'sc' (master = local[*], app id = > local-1703410019374). > SparkSession available as 'spark'. > >>> from pyspark.sql import SparkSession > >>> spark = SparkSession.builder.appName("validate").getOrCreate() > 23/12/24 09:28:02 WARN SparkSession: Using an existing Spark session; only > runtime SQL configurations will take effect. > >>> sql = "SELECT * FROM WHERE = some value" > >>> try: > ... spark.sql(sql) > ... print("is working") > ... except Exception as e: > ... print(f"Syntax error: {e}") > ... > Syntax error: > [PARSE_SYNTAX_ERROR] Syntax error at or near '<'.(line 1, pos 14) > > == SQL == > SELECT * FROM WHERE = some value > --^^^ > > Here we only check for syntax errors and not the actual existence of query > semantics. We are not validating against table or column existence. > > This method is useful when you want to catch obvious syntax errors before > submitting your PySpark job to a cluster, especially when you don't have > access to the actual data. > > In summary > >- This method validates syntax but will not catch semantic errors >- If you need more comprehensive validation, consider using a testing >framework and a small dataset. >- For complex queries, using a linter or code analysis tool can help >identify potential issues. 
> > HTH > > > Mich Talebzadeh, > Dad | Technologist | Solutions Architect | Engineer > London > United Kingdom > >view my Linkedin profile > <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> > > > https://en.everybodywiki.com/Mich_Talebzadeh > > > *Disclaimer:* Use it at your own risk. Any and all responsibility for any > loss, damage or destruction of data or any other property which may arise > from relying on this email's technical content is explicitly disclaimed. > The author will in no case be liable for any monetary damages arising from > such loss, damage or destruction. > > > > > On Sun, 24 Dec 2023 at 07:57, ram manickam wrote: > >> Hello, >> Is there a way to validate pyspark sql to validate only syntax errors?. I >> cannot connect do actual data set to perform this validation. Any >> help would be appreciated. >> >> >> Thanks >> Ram >> > >
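A compact sketch of that check wrapped in a helper (catching ParseException rather than a bare Exception); as noted above it still resolves table names and eagerly runs DDL, so it is only safe for SELECT-style statements:

from pyspark.sql import SparkSession
from pyspark.sql.utils import ParseException

spark = SparkSession.builder.appName("validate").getOrCreate()

def has_valid_syntax(sql: str) -> bool:
    """Return True if the statement parses; this is a grammar check only."""
    try:
        spark.sql(sql)   # parsing (and analysis) happens here; beware that DDL would actually execute
        return True
    except ParseException as e:
        print(f"Syntax error: {e}")
        return False

print(has_valid_syntax("SELECT 1 AS c"))                      # True
print(has_valid_syntax("SELECT * FROM WHERE = some value"))   # False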
Re: Does Spark support role-based authentication and access to Amazon S3? (Kubernetes cluster deployment)
Apologies Koert! Mich Talebzadeh, Dad | Technologist | Solutions Architect | Engineer London United Kingdom view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> https://en.everybodywiki.com/Mich_Talebzadeh *Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction. On Fri, 15 Dec 2023 at 14:12, Mich Talebzadeh wrote: > Hi kurt, > > I read this document of yours. indeed interesting and pretty recent (9th > Dec). > > I am more focused on GCP and GKE > <https://cloud.google.com/kubernetes-engine?hl=en>. But obviously the > concepts are the same. One thing I noticed, there was a lack of mention > of Workload Identity federation or eqivalent, which is the recommended way > for workloads running on k8s to access Cloud services in a secure and > manageable way. Specifically I quote "Workload Identity > <https://cloud.google.com/kubernetes-engine/docs/how-to/workload-identity> > allows workloads in your GKE clusters to impersonate Identity and Access > Management (IAM) service accounts to access Google Cloud services." > > *Cheers* > > Mich Talebzadeh, > Dad | Technologist | Solutions Architect | Engineer > London > United Kingdom > > >view my Linkedin profile > <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> > > > https://en.everybodywiki.com/Mich_Talebzadeh > > > > *Disclaimer:* Use it at your own risk. Any and all responsibility for any > loss, damage or destruction of data or any other property which may arise > from relying on this email's technical content is explicitly disclaimed. > The author will in no case be liable for any monetary damages arising from > such loss, damage or destruction. > > > > > On Thu, 14 Dec 2023 at 07:58, Koert Kuipers wrote: > >> yes it does using IAM roles for service accounts. >> see: >> >> https://docs.aws.amazon.com/eks/latest/userguide/iam-roles-for-service-accounts.html >> >> i wrote a little bit about this also here: >> https://technotes.tresata.com/spark-on-k8s/ >> >> On Wed, Dec 13, 2023 at 7:52 AM Atul Patil wrote: >> >>> Hello Team, >>> >>> >>> >>> Does Spark support role-based authentication and access to Amazon S3 for >>> Kubernetes deployment? 
>>> >>> *Note: we have deployed our spark application in the Kubernetes cluster.* >>> >>> >>> >>> Below are the Hadoop-AWS dependencies we are using: >>> >>> >>>org.apache.hadoop >>>hadoop-aws >>>3.3.4 >>> >>> >>> >>> >>> We are using the following configuration when creating the spark >>> session, but it is not working:: >>> >>> sparkSession.sparkContext().hadoopConfiguration().set("fs.s3a.aws.credentials.provider", >>> "org.apache.hadoop.fs.s3a.auth.AssumedRoleCredentialProvider"); >>> >>> sparkSession.sparkContext().hadoopConfiguration().set("fs.s3a.assumed.role.arn", >>> System.getenv("AWS_ROLE_ARN")); >>> >>> sparkSession.sparkContext().hadoopConfiguration().set("fs.s3a.assumed.role.credentials.provider", >>> "com.amazonaws.auth.WebIdentityTokenCredentialsProvider"); >>> >>> sparkSession.sparkContext().hadoopConfiguration().set("fs.s3a.assumed.role.sts.endpoint", >>> "s3.eu-central-1.amazonaws.com"); >>> >>> sparkSession.sparkContext().hadoopConfiguration().set("fs.s3a.assumed.role.sts.endpoint.region", >>> Regions.EU_CENTRAL_1.getName()); >>> >>> sparkSession.sparkContext().hadoopConfiguration().set("fs.s3a.web.identity.token.file", >>> System.getenv("AWS_WEB_IDENTITY_TOKEN_FILE")); >>> >>> sparkSession.sparkContext().hadoopConfiguration().set("fs.s3a.assumed.role.session.duration", >>> "30m"); >>> >>> >>> >>> Thank you! >>> >>> >>> >>> Regards, >>> >>> Atul >>> >> >> CONFIDENTIALITY NOTICE: This electronic communication and any files >> transmitted with it are confidential, privileged and intended solely for >> the use of the individual or entity to whom they are addressed. If you are >> not the intended recipient, you are hereby notified that any disclosure, >> copying, distribution (electronic or otherwise) or forwarding of, or the >> taking of any action in reliance on the contents of this transmission is >> strictly prohibited. Please notify the sender immediately by e-mail if you >> have received this email by mistake and delete this email from your system. >> >> Is it necessary to print this email? If you care about the environment >> like we do, please refrain from printing emails. It helps to keep the >> environment forested and litter-free. > >
Re: Does Spark support role-based authentication and access to Amazon S3? (Kubernetes cluster deployment)
Hi Koert, I read this document of yours. Indeed interesting and pretty recent (9th Dec). I am more focused on GCP and GKE <https://cloud.google.com/kubernetes-engine?hl=en>. But obviously the concepts are the same. One thing I noticed, there was a lack of mention of Workload Identity federation or equivalent, which is the recommended way for workloads running on k8s to access Cloud services in a secure and manageable way. Specifically I quote "Workload Identity <https://cloud.google.com/kubernetes-engine/docs/how-to/workload-identity> allows workloads in your GKE clusters to impersonate Identity and Access Management (IAM) service accounts to access Google Cloud services." *Cheers* Mich Talebzadeh, Dad | Technologist | Solutions Architect | Engineer London United Kingdom view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> https://en.everybodywiki.com/Mich_Talebzadeh *Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction. On Thu, 14 Dec 2023 at 07:58, Koert Kuipers wrote: > yes it does using IAM roles for service accounts. > see: > > https://docs.aws.amazon.com/eks/latest/userguide/iam-roles-for-service-accounts.html > > i wrote a little bit about this also here: > https://technotes.tresata.com/spark-on-k8s/ > > On Wed, Dec 13, 2023 at 7:52 AM Atul Patil wrote: > >> Hello Team, >> >> >> >> Does Spark support role-based authentication and access to Amazon S3 for >> Kubernetes deployment? >> >> *Note: we have deployed our spark application in the Kubernetes cluster.* >> >> >> >> Below are the Hadoop-AWS dependencies we are using: >> >> >>org.apache.hadoop >>hadoop-aws >>3.3.4 >> >> >> >> >> We are using the following configuration when creating the spark session, >> but it is not working:: >> >> sparkSession.sparkContext().hadoopConfiguration().set("fs.s3a.aws.credentials.provider", >> "org.apache.hadoop.fs.s3a.auth.AssumedRoleCredentialProvider"); >> >> sparkSession.sparkContext().hadoopConfiguration().set("fs.s3a.assumed.role.arn", >> System.getenv("AWS_ROLE_ARN")); >> >> sparkSession.sparkContext().hadoopConfiguration().set("fs.s3a.assumed.role.credentials.provider", >> "com.amazonaws.auth.WebIdentityTokenCredentialsProvider"); >> >> sparkSession.sparkContext().hadoopConfiguration().set("fs.s3a.assumed.role.sts.endpoint", >> "s3.eu-central-1.amazonaws.com"); >> >> sparkSession.sparkContext().hadoopConfiguration().set("fs.s3a.assumed.role.sts.endpoint.region", >> Regions.EU_CENTRAL_1.getName()); >> >> sparkSession.sparkContext().hadoopConfiguration().set("fs.s3a.web.identity.token.file", >> System.getenv("AWS_WEB_IDENTITY_TOKEN_FILE")); >> >> sparkSession.sparkContext().hadoopConfiguration().set("fs.s3a.assumed.role.session.duration", >> "30m"); >> >> >> >> Thank you! >> >> >> >> Regards, >> >> Atul >> > > CONFIDENTIALITY NOTICE: This electronic communication and any files > transmitted with it are confidential, privileged and intended solely for > the use of the individual or entity to whom they are addressed. If you are > not the intended recipient, you are hereby notified that any disclosure, > copying, distribution (electronic or otherwise) or forwarding of, or the > taking of any action in reliance on the contents of this transmission is > strictly prohibited. 
Please notify the sender immediately by e-mail if you > have received this email by mistake and delete this email from your system. > > Is it necessary to print this email? If you care about the environment > like we do, please refrain from printing emails. It helps to keep the > environment forested and litter-free.
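For completeness, a hedged PySpark sketch of the web-identity route Koert describes (an untested assumption on my part, not a confirmed recipe: it relies on com.amazonaws.auth.WebIdentityTokenCredentialsProvider from the AWS SDK v1 shipped alongside hadoop-aws 3.3.4, and on the pod's service account injecting AWS_ROLE_ARN and AWS_WEB_IDENTITY_TOKEN_FILE; the bucket path is a placeholder):

from pyspark.sql import SparkSession

spark = (SparkSession.builder
    .appName("s3a_irsa_sketch")
    # let the AWS SDK pick up AWS_ROLE_ARN / AWS_WEB_IDENTITY_TOKEN_FILE from the pod environment
    .config("spark.hadoop.fs.s3a.aws.credentials.provider",
            "com.amazonaws.auth.WebIdentityTokenCredentialsProvider")
    .config("spark.hadoop.fs.s3a.endpoint.region", "eu-central-1")  # optional region hint
    .getOrCreate())

df = spark.read.parquet("s3a://some-bucket/some-prefix/")  # placeholder path
df.show(5)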
Re: [EXTERNAL] Re: Spark-submit without access to HDFS
Hi Eugene, With regard to your points What are the PYTHONPATH and SPARK_HOME env variables in your script? OK let us look at a typical of my Spark project structure - project_root |-- README.md |-- __init__.py |-- conf | |-- (configuration files for Spark) |-- deployment | |-- deployment.yaml |-- design | |-- (design-related files or documentation) |-- othermisc | |-- (other miscellaneous files) |-- sparkutils | |-- (utility modules or scripts specific to Spark) |-- src |-- (main source code for your Spark application) If you want Spark to recognize modules from the sparkutils directory or any other directories within your project, you can include those directories in the PYTHONPATH. For example, if you want to include the sparkutils directory: export PYTHONPATH=/path/to/project_root/sparkutils:$PYTHONPATH to recap, the ${PYTHONPATH} variable is primarily used to specify additional directories where Python should look for modules and packages. In the context of Spark, it is typically used to include directories containing custom Python code or modules that your Spark application depends on. With regard to The --conf "spark.yarn.appMasterEnv.SPARK_HOME=$SPARK_HOME" configuration option in Spark is used when submitting a Spark application to run on YARN - --conf: This is used to specify Spark configuration properties when submitting a Spark application. - spark.yarn.appMasterEnv.SPARK_HOME: This is a Spark configuration property that defines the value of the SPARK_HOME environment variable for the Spark application's Application Master (the process responsible for managing the execution of tasks on a YARN cluster). - $SPARK_HOME: This holds the path to the Spark installation directory. This configuration is setting the SPARK_HOME environment variable for the Spark Application Master when the application is running on YARN. This is important because the Spark Application Master needs to know the location of the Spark installation directory (SPARK_HOME) to configure and manage the Spark application's execution on the YARN cluster. HTH Mich Talebzadeh, Distinguished Technologist, Solutions Architect & Engineer London United Kingdom view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> https://en.everybodywiki.com/Mich_Talebzadeh *Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction. On Mon, 11 Dec 2023 at 01:43, Eugene Miretsky wrote: > Setting PYSPARK_ARCHIVES_PATH to hfds:// did the tricky. But don't > understand a few things > > 1) The default behaviour is if PYSPARK_ARCHIVES_PATH is empty, > pyspark.zip is uploaded from the local SPARK_HOME. If it is set to > "local://" the upload is skipped. I would expect the latter to be the > default. What's the use case for uploading the local pyspark.zip every > time? > 2) It seems like the localConfigs are meant to be copied every time (code) > what's the use case for that? Why not just use the cluster config? > > > > On Sun, Dec 10, 2023 at 1:15 PM Eugene Miretsky wrote: > >> Thanks Mich, >> >> Tried this and still getting >> INF Client: "Uploading resource >> file:/opt/spark/spark-3.5.0-bin-hadoop3/python/lib/pyspark.zip -> >> hdfs:/". It is also doing it for (py4j.-0.10.9.7-src.zip and >> __spark_conf__.zip). 
It is working now because I enabled direct >> access to HDFS to allow copying the files. But ideally I would like to not >> have to copy any files directly to HDFS. >> >> 1) We would expect pyspark as well as the relevant configs to already be >> available on the cluster - why are they being copied over? (we can always >> provide the extra libraries needed using py-files the way you did) >> 2) If we wanted users to be able to use custom pyspark, we would rather >> just copy the file HDFS/GCS in other ways, and let users reference it in >> their job >> 3) What are the PYTHONPATH and SPARK_HOME env variables in your script? >> Are they local paths, or paths on the spark cluster? >> >> On Fri, Nov 17, 2023 at 8:57 AM Mich Talebzadeh < >> mich.talebza...@gmail.com> wrote: >> >>> Hi, >>> >>> How are you submitting your spark job from your client? >>> >>> Your files can either be on HDFS or HCFS such as gs, s3 etc. >>> >>> With reference to --py-files hdfs://yarn-master-url hdfs://foo.py', I >>> assume you want your >>> >>> spark-submit
Re: [Streaming (DStream) ] : Does Spark Streaming supports pause/resume consumption of message from Kafka?
Ok, pause/continue throws up some challenges. The implication is to pause gracefully and resume the same. First have a look at this SPIP of mine [SPARK-42485] SPIP: Shutting down spark structured streaming when the streaming process completed current process - ASF JIRA (apache.org) <https://issues.apache.org/jira/browse/SPARK-42485> Then we can assume a graceful pause/restart. As a suggestion, to implement conditional pausing and resuming, you can introduce a flag or control signal within your DStream processing logic. When the condition for pausing is met, the stop() method is called to temporarily halt message processing. Conversely, when the condition for resuming is met, the start() method is invoked to re-enable message consumption. Let us have a go at it: is_paused = False def process_stream(message): global is_paused if not is_paused: # Perform processing logic here print(message) # Check for pausing condition if should_pause(message): is_paused = True stream.stop() # Check for resuming condition if should_resume() and is_paused: is_paused = False stream.start() stream = DStream(source) stream.foreach(process_stream) stream.start() HTH Mich Talebzadeh, Distinguished Technologist, Solutions Architect & Engineer London United Kingdom view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> https://en.everybodywiki.com/Mich_Talebzadeh *Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction. On Fri, 1 Dec 2023 at 12:56, Saurabh Agrawal (180813) wrote: > Hi Spark Team, > > I am using Spark 3.4.0 version in my application which is use to consume > messages from Kafka topics. > > I have below queries: > > 1. Does DStream support pause/resume streaming message consumption at > runtime on particular condition? If yes, please provide details. > > 2. I tried to revoke partition from consumer at runtime which cause error. > > > > *throw new IllegalStateException(s"Previously tracked partitions " +* > > *s"${revokedPartitions.mkString("[", ",", "]")} been revoked by > Kafka because of consumer " +* > > *s"rebalance. This is mostly due to another stream with same group > id joined, " +* > > *s"please check if there're different streaming application > misconfigure to use same " +* > > *s"group id. Fundamentally different stream should use different > group id")* > > > > > > 3. Does Spark support Blue/Green Deployment. I need to implement > Blue/Green Deployment scenario with Spark. Facing problem as need to deploy > both Blue and Green deployment with same consumer-group-id. As I read, > spark does not support 2 deployment with same consumer group-id, this > implementation is failing. Please guide how this can be implemented with > Spark. > > 4. Does Spark support Active-Active deployment. > > > > It will be great if you can reply on above queries please. 
> > > > -- > > > * Regards,* > > *Saurabh Agrawal* > > [image: Image] > > Software Development Specialist, IPaaS R > [image: A picture containing logoDescription automatically generated] > > > > > > *This email and the information contained herein is proprietary and > confidential and subject to the Amdocs Email Terms of Service, which you > may review at* *https://www.amdocs.com/about/email-terms-of-service* > <https://www.amdocs.com/about/email-terms-of-service> >
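A rough sketch of that gating idea against the actual DStream API (assumptions: the socket source and port are placeholders, and the paused flag is a plain driver-side variable that in practice you would drive from a file or an external store). Note this only skips per-batch processing; DStreams offer no true pause of the underlying Kafka consumption:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext(appName="pause_resume_sketch")
ssc = StreamingContext(sc, 10)   # 10-second batches

paused = {"value": False}        # flip this from your own control logic

lines = ssc.socketTextStream("localhost", 9999)   # placeholder source

def process(time, rdd):
    # called on the driver once per batch; skip the work while "paused"
    if paused["value"] or rdd.isEmpty():
        return
    for rec in rdd.take(10):
        print(time, rec)

lines.foreachRDD(process)
ssc.start()
ssc.awaitTermination()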
Re: Spark-submit without access to HDFS
Hi, How are you submitting your spark job from your client? Your files can either be on HDFS or HCFS such as gs, s3 etc. With reference to --py-files hdfs://yarn-master-url hdfs://foo.py', I assume you want your spark-submit --verbose \ --deploy-mode cluster \ --conf "spark.yarn.appMasterEnv.SPARK_HOME=$SPARK_HOME" \ --conf "spark.yarn.appMasterEnv.PYTHONPATH=${PYTHONPATH}" \ --conf "spark.executorEnv.PYTHONPATH=${PYTHONPATH}" \ --py-files $CODE_DIRECTORY_CLOUD/dataproc_on_gke.zip \ --conf "spark.driver.memory"=4G \ --conf "spark.executor.memory"=4G \ --conf "spark.num.executors"=4 \ --conf "spark.executor.cores"=2 \ $CODE_DIRECTORY_CLOUD/${APPLICATION} in my case I define $CODE_DIRECTORY_CLOUD as below on google cloud storage CODE_DIRECTORY="/home/hduser/dba/bin/python/" CODE_DIRECTORY_CLOUD="gs://${PROJECT}-spark-on-k8s/codes" cd $CODE_DIRECTORY [ -f ${source_code}.zip ] && rm -r -f ${source_code}.zip echo `date` ", ===> creating source zip directory from ${source_code}" # zip needs to be done at root directory of code zip -rq ${source_code}.zip ${source_code} gsutil cp ${source_code}.zip $CODE_DIRECTORY_CLOUD gsutil cp /home/hduser/dba/bin/python/${source_code}/src/${APPLICATION} $CODE_DIRECTORY_CLOUD So in summary I create a zip file of my project and copy it across to the cloud storage and then put the application (py file) there as well and use them in spark-submit. I trust this answers your question. HTH Mich Talebzadeh, Technologist, Solutions Architect & Engineer London United Kingdom view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> https://en.everybodywiki.com/Mich_Talebzadeh *Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction. On Wed, 15 Nov 2023 at 21:33, Eugene Miretsky wrote: > Hey All, > > We are running Pyspark spark-submit from a client outside the cluster. The > client has network connectivity only to the Yarn Master, not the HDFS > Datanodes. How can we submit the jobs? The idea would be to preload all the > dependencies (job code, libraries, etc) to HDFS, and just submit the job > from the client. > > We tried something like this > 'PYSPARK_ARCHIVES_PATH=hdfs://some-path/pyspark.zip spark-submit --master > yarn --deploy-mode cluster --py-files hdfs://yarn-master-url hdfs://foo.py' > > The error we are getting is > " > > org.apache.hadoop.net.ConnectTimeoutException: 6 millis timeout while > waiting for channel to be ready for connect. ch : > java.nio.channels.SocketChannel[connection-pending remote=/ > 10.117.110.19:9866] > > org.apache.hadoop.ipc.RemoteException(java.io.IOException): File > /user/users/.sparkStaging/application_1698216436656_0104/*spark_conf.zip* > could only be written to 0 of the 1 minReplication nodes. There are 2 > datanode(s) running and 2 node(s) are excluded in this operation. > " > > A few question > 1) What are the spark_conf.zip files. Is it the hive-site/yarn-site conf > files? Why would the client send them to the cluster? (the cluster already > has all that info - this would make sense in client mode, but not cluster > mode ) > 2) Is it possible to use spark-submit without HDFS access? > 3) How would we fix this? > > Cheers, > Eugene > > -- > > *Eugene Miretsky* > Managing Partner | Badal.io | Book a meeting /w me! 
> <http://calendly.com/eugene-badal> > mobile: 416-568-9245 > email: eug...@badal.io >
Re: Spark master shuts down when one of zookeeper dies
Hi, Spark standalone mode does not use or rely on ZooKeeper by default. The Spark master and workers communicate directly with each other without using ZooKeeper. However, it appears that in your case you are relying on ZooKeeper to provide high availability for your standalone cluster. By configuring Spark to use ZooKeeper for leader election, you can ensure that there is always a Spark master running, even if one of the ZooKeeper servers goes down. To use ZooKeeper for high availability in Spark standalone mode, you need to configure the following properties: spark.deploy.recoveryMode: Set to ZOOKEEPER to enable high availability spark.deploy.zookeeper.url: The ZooKeeper cluster URL Now the Spark master shuts down when a Zookeeper instance is down because it loses its leadership. Zookeeper uses a leader election algorithm to ensure that there is always a single leader in the cluster. When a Zookeeper instance goes down, the remaining Zookeeper instances will elect a new leader. The original master that was down never comes up because it has lost its state. The Spark master stores its state in Zookeeper. When the Zookeeper instance that the master was connected to goes down, the master loses its state. This means that the master cannot restart without losing data. To avoid this problem, you can run multiple Spark masters in high availability mode. This means that you will have at least two Spark masters running at all times. When a Zookeeper instance goes down, the remaining Spark masters will continue to run and serve applications. As stated, to run Spark masters in high availability mode, you will need to configure the spark.deploy.recoveryMode property to ZOOKEEPER. You will also need to configure the spark.deploy.zookeeper.url property to point to your Zookeeper cluster. HTH, Mich Talebzadeh, Distinguished Technologist, Solutions Architect & Engineer London United Kingdom view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> https://en.everybodywiki.com/Mich_Talebzadeh *Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction. On Mon, 6 Nov 2023 at 15:19, Kaustubh Ghode wrote: > I am using spark-3.4.1 I have a setup with three ZooKeeper servers, Spark > master shuts down when a Zookeeper instance is down a new master is elected > as leader and the cluster is up. But the original master that was down > never comes up. can you please help me with this issue? > > Stackoverflow link:- https://stackoverflow.com/questions/77431515 > > Thanks, > Kaustubh >
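For reference, a minimal sketch of that configuration (host names and the znode path are placeholders), typically placed in conf/spark-env.sh on every master node so the master daemons pick it up at start-up:

export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER \
  -Dspark.deploy.zookeeper.url=zk1:2181,zk2:2181,zk3:2181 \
  -Dspark.deploy.zookeeper.dir=/spark"

Applications then point spark.master at the full list, e.g. spark://master1:7077,master2:7077, so they can fail over to whichever master is currently the elected leader.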
Re: Parser error when running PySpark on Windows connecting to GCS
General The reason why os.path.join is appending double backslash on Windows is because that is how Windows paths are represented. However, GCS paths (a Hadoop Compatible File System, HCFS) use forward slashes like in Linux. This can cause problems if you are trying to use a Windows path in a Spark job, *because Spark assumes that all paths are Linux paths*. A way to avoid this problem is to use the os.path.normpath function to normalize the path before passing it to Spark. This will ensure that the path is in a format that is compatible with Spark. *In Python* import os # example path = "gs://etcbucket/data-file" normalized_path = os.path.normpath(path) # Pass the normalized path to Spark *In Scala* import java.io.File val path = "gs://etcbucket/data-file" val normalizedPath = new File(path).getCanonicalPath() // Pass the normalized path to Spark HTH Mich Talebzadeh, Distinguished Technologist, Solutions Architect & Engineer London United Kingdom view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> https://en.everybodywiki.com/Mich_Talebzadeh *Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction. On Sat, 4 Nov 2023 at 12:28, Richard Smith wrote: > Hi All, > > I've just encountered and worked around a problem that is pretty obscure > and unlikely to affect many people, but I thought I'd better report it > anyway > > All the data I'm using is inside Google Cloud Storage buckets (path starts > with gs://) and I'm running Spark 3.5.0 locally (for testing, real thing is > on serverless Dataproc) on a Windows 10 laptop. The job fails when reading > metadata via the machine learning scripts. > > The error is *org.apache.hadoop.shaded.com.google.rej2.PatternSyntaxException: > error parsing regexp: invalid escape sequence: '\m'* > > I tracked it down to *site-packages/pyspark/ml/util.py* line 578 > > metadataPath = os.path.join(path,"metadata") > > which seems innocuous but what's happening is because I'm on Windows, > os.path.join is appending double backslash, whilst the gcs path uses > forward slashes like Linux. > > I hacked the code to explicitly use forward slash if path contains gs: and > the job now runs successfully. > > Richard >
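On the joining side, one further hedged suggestion (my own, not from the thread): because gs:// locations are URIs rather than local file paths, posixpath.join keeps forward slashes on every platform and sidesteps the backslash issue described above:

import posixpath

path = "gs://etcbucket/data-file"
# posixpath.join always uses '/', even on Windows, so URI-style paths stay intact
metadata_path = posixpath.join(path, "metadata")
print(metadata_path)   # gs://etcbucket/data-file/metadata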
Re: Data analysis issues
Hi, Your mileage varies, so to speak. Whether or not the data you use to analyze in Spark through RStudio will be seen by Spark's back-end depends on how you deploy Spark and RStudio. If you are deploying Spark and RStudio on your own premises or in a private cloud environment, then the data you use will only be accessible to the roles that have access to your environment. However, if you are using a managed Spark service such as Google Dataproc or Amazon EMR etc, then the data you use may be accessible to Spark's back-end. This is because managed Spark services typically store your data on their own servers. Try using encryption combined with RBAC (who can access what) to protect your data privacy. Also beware of security risks associated with third-party libraries if you are deploying them. HTH Mich Talebzadeh, Distinguished Technologist, Solutions Architect & Engineer London United Kingdom view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> https://en.everybodywiki.com/Mich_Talebzadeh *Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction. On Thu, 2 Nov 2023 at 22:46, Jauru Lin wrote: > Hello all, > > I have a question about Apache Spark, > I would like to ask if I use Rstudio to connect to Spark to analyze data, > will the data I use be seen by Spark's back-end personnel? > > Hope someone can solve my problem. > Thanks! >