Re: Validate spark sql

2023-12-26 Thread Gourav Sengupta
Dear friend,

Thanks a ton. I was looking for a SQL linter for a long time, and it looks like
https://sqlfluff.com/ is something that can be used :)

Thank you so much, and wish you all a wonderful new year.

Regards,
Gourav

On Tue, Dec 26, 2023 at 4:42 AM Bjørn Jørgensen 
wrote:

> You can try sqlfluff; it's a linter for SQL code,
> and it seems to have support for sparksql.
> 
>
>
> On Mon, Dec 25, 2023 at 17:13, ram manickam wrote:
>
>> Thanks Mich, Nicholas. I looked over the Stack Overflow posts, and none
>> of them seems to cover syntax validation. Do you know if it's even possible
>> to do syntax validation in Spark?
>>
>> Thanks
>> Ram
>>
>> On Sun, Dec 24, 2023 at 12:49 PM Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> Well, not to put too fine a point on it, in a public forum, one ought to
>>> respect the importance of open communication. Everyone has the right to ask
>>> questions, seek information, and engage in discussions without facing
>>> unnecessary patronization.
>>>
>>>
>>>
>>> Mich Talebzadeh,
>>> Dad | Technologist | Solutions Architect | Engineer
>>> London
>>> United Kingdom
>>>
>>>
>>>view my Linkedin profile
>>> 
>>>
>>>
>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>>
>>> On Sun, 24 Dec 2023 at 18:27, Nicholas Chammas <
>>> nicholas.cham...@gmail.com> wrote:
>>>
 This is a user-list question, not a dev-list question. Moving this
 conversation to the user list and BCC-ing the dev list.

 Also, this statement

 > We are not validating against table or column existence.

 is not correct. When you call spark.sql(…), Spark will look up the table
 references and fail with TABLE_OR_VIEW_NOT_FOUND if it cannot find them.

 Also, when you run DDL via spark.sql(…), Spark will actually run it. So
 spark.sql("drop table my_table") will actually drop my_table. It’s not a
 validation-only operation.

 This question of validating SQL is already discussed on Stack Overflow.
 You may find some useful tips there.

 Nick


 On Dec 24, 2023, at 4:52 AM, Mich Talebzadeh 
 wrote:


 Yes, you can validate the syntax of your PySpark SQL queries without
 connecting to an actual dataset or running the queries on a cluster.
 PySpark provides a method for syntax validation without executing the
 query. Something like below
       ____              __
      / __/__  ___ _____/ /__
     _\ \/ _ \/ _ `/ __/  '_/
    /__ / .__/\_,_/_/ /_/\_\   version 3.4.0
       /_/

 Using Python version 3.9.16 (main, Apr 24 2023 10:36:11)
 Spark context Web UI available at http://rhes75:4040
 Spark context available as 'sc' (master = local[*], app id =
 local-1703410019374).
 SparkSession available as 'spark'.
 >>> from pyspark.sql import SparkSession
 >>> spark = SparkSession.builder.appName("validate").getOrCreate()
 23/12/24 09:28:02 WARN SparkSession: Using an existing Spark session;
 only runtime SQL configurations will take effect.
 >>> sql = "SELECT * FROM  WHERE  = some value"
 >>> try:
 ...   spark.sql(sql)
 ...   print("is working")
 ... except Exception as e:
 ...   print(f"Syntax error: {e}")
 ...
 Syntax error:
 [PARSE_SYNTAX_ERROR] Syntax error at or near '<'.(line 1, pos 14)

 == SQL ==
 SELECT * FROM  WHERE  = some value
 --^^^

 Here we only check for syntax errors and not the actual existence of
 query semantics. We are not validating against table or column existence.

 This method is useful when you want to catch obvious syntax errors
 before submitting your PySpark job to a cluster, especially when you don't
 have access to the actual data.

 In summary

 - This method validates syntax but will not catch semantic errors.
 - If you need more comprehensive validation, consider using a
   testing framework and a small dataset.
 - For complex queries, using a linter or code analysis tool can
   help identify potential issues.

 HTH
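
 A note on the approach above: as Nick points out in his reply, spark.sql()
 also resolves table references and executes DDL, so it is not a parse-only
 check. Below is a minimal sketch of a parse-only alternative. It assumes a
 classic (non-Connect) SparkSession and reaches Spark's SQL parser through
 the private `_jsparkSession` handle, which is an internal API and may change
 between versions. The helper name `check_syntax` is made up for illustration.

```python
def check_syntax(spark, sql):
    """Return None if `sql` parses, otherwise the parser's error message.

    Assumption: `spark` is a classic SparkSession, so the private
    `_jsparkSession` handle to the JVM session is available.
    Only the parser runs here: no table lookup and no execution.
    """
    try:
        parser = spark._jsparkSession.sessionState().sqlParser()
        parser.parsePlan(sql)
        return None
    except Exception as exc:  # the JVM parse error surfaces as a Python exception
        return str(exc)
```

 Called as check_syntax(spark, "drop table my_table"), this should only parse
 the statement and leave the table alone, since nothing is analyzed or run.
 A linter such as sqlfluff is the other route mentioned in this thread.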


 Mich Talebzadeh,

Re: Contributing to Spark MLLib

2023-07-17 Thread Gourav Sengupta
Hi,

Holden Karau has some fantastic videos on her channel which will be quite
helpful.

Thanks
Gourav

On Sun, 16 Jul 2023, 19:15 Brian Huynh,  wrote:

> Good morning Dipayan,
>
> Happy to see another contributor!
>
> Please go through this document for contributors. Please note the
> MLlib-specific contribution guidelines section in particular.
>
> https://spark.apache.org/contributing.html
>
> Since you are looking for something to start with, take a look at this
> Jira query for starter issues.
>
>
> https://issues.apache.org/jira/browse/SPARK-38719?jql=project%20%3D%20SPARK%20AND%20labels%20%3D%20%22starter%22%20AND%20status%20%3D%20Open
>
> Cheers,
> Brian
>
> On Sun, Jul 16, 2023 at 8:49 AM Dipayan Dev 
> wrote:
>
>> Hi Spark Community,
>>
>> A very good morning to you.
>>
>> I have been using Spark for the last few years, and I am new to the community.
>>
>> I am very interested in becoming a contributor.
>>
>> I am looking to contribute to Spark MLlib. Can anyone please suggest how to
>> start contributing to a new MLlib feature? Are there any new features in
>> the pipeline, and what is the best way to explore them?
>> I look forward to a little guidance to get started.
>>
>>
>> Thanks
>> Dipayan
>> --
>>
>>
>>
>> With Best Regards,
>>
>> Dipayan Dev
>> Author of *Deep Learning with Hadoop*
>> M.Tech (AI), IISc, Bangalore
>>
>
>
> --
> From Brian H.
>


Re: [Spark Core] [Advanced] [How-to] How to map any external field to job ids spawned by Spark.

2022-12-28 Thread Gourav Sengupta
Hi Khalid,

Just out of curiosity, does the API help us in setting job IDs or just job
descriptions?

Regards,
Gourav Sengupta

On Wed, Dec 28, 2022 at 10:58 AM Khalid Mammadov 
wrote:

> There is a feature in SparkContext for setting local properties
> (setLocalProperty): you can set your request ID there and then, in a
> SparkListener instance, read that ID together with the job ID in the
> onJobStart event.
>
> Hope this helps.
>
> On Tue, 27 Dec 2022, 13:04 Dhruv Toshniwal,
>  wrote:
>
>> TL;DR -
>> how-to-map-external-request-ids-to-spark-job-ids-for-spark-instrumentation
>> <https://stackoverflow.com/questions/74794579/how-to-map-external-request-ids-to-spark-job-ids-for-spark-instrumentation>
>>
>> Hi team,
>>
>> We are the engineering team of Mindtickle Inc. and we have a use-case
>> where we want to store a map of request IDs (unique API call IDs) to Spark
>> Job Ids. Architecturally, we have created a system where our users use
>> various Analytics tools on the frontend which in turn run Spark Jobs
>> internally and then serve computed data back to them. We receive various
>> API calls from upstream and serve it via Apache Spark computing on the
>> backend.
>> However, as our customer base has grown, we have come to receive lots of
>> parallel requests. We have observed that Spark jobs take different amounts
>> of time for the same API requests from upstream. Therefore, for Spark
>> instrumentation purposes we wish to maintain a map from the request ID
>> generated at our end to the job IDs that Spark internally generates for
>> that request.
>> This will enable us to go back in time via the history server or custom
>> SparkListeners to debug and improve our system. Any leads in this direction
>> would be greatly appreciated. I would love to explain our use case in
>> greater detail if required.
>>
>> Thanks and Regards,
>> Dhruv Toshniwal
>> SDE-2
>> Mindtickle Inc.
>>
>
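
One way to sketch the request-ID-to-job-ID mapping discussed above, from
PySpark. Note that this uses job groups (setJobGroup plus the status tracker)
rather than the setLocalProperty/SparkListener route Khalid describes, because
registering a listener from Python needs extra plumbing. The helper name
`run_tagged` is hypothetical, and the status tracker only knows about recent
jobs, so the IDs should be read soon after the action finishes.

```python
def run_tagged(sc, request_id, action):
    """Run `action()` with its Spark jobs tagged by `request_id`.

    Returns (result, job_ids), where job_ids are the Spark job IDs the
    status tracker reports for this request's job group.
    """
    # The job group is a thread-local property; it is attached to every job
    # submitted from this thread until it is changed.
    sc.setJobGroup(request_id, f"request {request_id}")
    result = action()
    job_ids = list(sc.statusTracker().getJobIdsForGroup(request_id))
    return result, job_ids
```

A call might look like run_tagged(sc, "req-123", lambda: df.count()), where
"req-123" and df stand in for your own request ID and DataFrame. The same
group ID also shows up in the Spark UI, which helps when going back through
the history server.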


Re: Profiling data quality with Spark

2022-12-27 Thread Gourav Sengupta
Hi Sean,

the entire narrative of SPARK being a unified analytics tool falls flat, as
what should have been an engine on SPARK is now deliberately floated off as
a separate company called Ray, and all the unified narrative rings
hollow.

SPARK is nothing more than a SQL engine, as per SPARK's own conference, where
they said that more than 95% (in case I am not wrong) of users use the
SQL interface :)

I have seen engineers split hairs over simple operations that take
minutes in Snowflake or Redshift, just because of SPARK configurations,
shuffle, and other operations.

This matters in the industry today, sadly, because people are being laid
off from their jobs because a 1 dollar simple solution has to be run as
rocket science.

So my suggestion is to decouple your data quality solution from SPARK,
sooner or later everyone is going to see that saving jobs and saving money
makes sense :)

Regards,
Gourav Sengupta

On Wed, Dec 28, 2022 at 4:13 AM Sean Owen  wrote:

> I think this is kind of mixed up. Data warehouses are simple SQL
> creatures; Spark is (also) a distributed compute framework. Kind of like
> comparing maybe a web server to Java.
> Are you thinking of Spark SQL? Then, sure, you may well find it more
> complicated, but it's also just a data warehousey SQL surface.
>
> But none of that relates to the question of data quality tools. You could
> use GE with Redshift, or indeed with Spark - are you familiar with it? It's
> probably one of the most common tools people use with Spark for this in
> fact. It's just a Python lib at heart and you can apply it with Spark, but
> _not_ with a data warehouse, so I'm not sure what you're getting at.
>
> Deequ is also commonly seen. It's actually built on Spark, so again,
> confused about this "use Redshift or Snowflake not Spark".
>
> On Tue, Dec 27, 2022 at 9:55 PM Gourav Sengupta 
> wrote:
>
>> Hi,
>>
>> SPARK is just another querying engine with a lot of hype.
>>
>> I would highly suggest using Redshift (storage and compute decoupled
>> mode) or Snowflake without all this super complicated understanding of
>> containers/ disk-space, mind numbing variables, rocket science tuning, hair
>> splitting failure scenarios, etc. After that try to choose solutions like
>> Athena, or Trino/ Presto, and then come to SPARK.
>>
>> Try out solutions like  "great expectations" if you are looking for data
>> quality and not entirely sucked into the world of SPARK and want to keep
>> your options open.
>>
>> Don't get me wrong, SPARK used to be great in 2016-2017, but there are
>> superb alternatives now and the industry, in this recession, should focus
>> on getting more value for every single dollar they spend.
>>
>> Best of luck.
>>
>> Regards,
>> Gourav Sengupta
>>
>> On Tue, Dec 27, 2022 at 7:30 PM Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> Well, you need to qualify your statement on data quality. Are you
>>> talking about data lineage here?
>>>
>>> HTH
>>>
>>>
>>> On Tue, 27 Dec 2022 at 19:25, rajat kumar 
>>> wrote:
>>>
>>>> Hi Folks
>>>> Hoping you are doing well. I want to implement data quality checks to detect
>>>> issues in data in advance. I have heard about a few frameworks like GE/Deequ.
>>>> Can anyone please suggest which one is good and how I can get started with it?
>>>>
>>>> Regards
>>>> Rajat
>>>>
>>>
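
For readers new to the topic, here is a rough idea of the kind of check that
data quality tools such as the ones named in this thread automate. This is a
plain-Python sketch and is not the API of Great Expectations or Deequ; the
function name `profile_column` and the sample rows are invented for
illustration.

```python
def profile_column(rows, column):
    """Return completeness and uniqueness for `column` over a list of dicts."""
    values = [row.get(column) for row in rows]
    non_null = [v for v in values if v is not None]
    total = len(values)
    return {
        # Fraction of rows where the column is present and not None
        "completeness": len(non_null) / total if total else 1.0,
        # True when no non-null value appears more than once
        "is_unique": len(set(non_null)) == len(non_null),
    }

# Invented sample data: a duplicated id and a missing name
rows = [
    {"id": 1, "name": "ann"},
    {"id": 2, "name": None},
    {"id": 2, "name": "bob"},
]
report = {col: profile_column(rows, col) for col in ("id", "name")}
```

Here the report flags `id` as complete but not unique, and `name` as unique
but only two-thirds complete. A framework adds declarative rules, reporting,
and execution on Spark on top of checks like these.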


Re: Profiling data quality with Spark

2022-12-27 Thread Gourav Sengupta
Hi,

SPARK is just another querying engine with a lot of hype.

I would highly suggest using Redshift (storage and compute decoupled mode)
or Snowflake without all this super complicated understanding of
containers/ disk-space, mind numbing variables, rocket science tuning, hair
splitting failure scenarios, etc. After that try to choose solutions like
Athena, or Trino/ Presto, and then come to SPARK.

Try out solutions like  "great expectations" if you are looking for data
quality and not entirely sucked into the world of SPARK and want to keep
your options open.

Don't get me wrong, SPARK used to be great in 2016-2017, but there are
superb alternatives now and the industry, in this recession, should focus
on getting more value for every single dollar they spend.

Best of luck.

Regards,
Gourav Sengupta

On Tue, Dec 27, 2022 at 7:30 PM Mich Talebzadeh 
wrote:

> Well, you need to qualify your statement on data quality. Are you talking
> about data lineage here?
>
> HTH
>
>
> On Tue, 27 Dec 2022 at 19:25, rajat kumar 
> wrote:
>
>> Hi Folks
>> Hoping you are doing well. I want to implement data quality checks to detect
>> issues in data in advance. I have heard about a few frameworks like GE/Deequ.
>> Can anyone please suggest which one is good and how I can get started with it?
>>
>> Regards
>> Rajat
>>
>


Re: Help with Shuffle Read performance

2022-09-29 Thread Gourav Sengupta
Hi Leszek,

spot on. Therefore, EMR being created and dynamically scaled up and down
and being ephemeral proves that there is actually no advantage in using
containers for large jobs.

It is utterly pointless, and I have attended interviews and workshops where
no one has ever been able to prove its value in place of EMR.

That said, if you have a large system and want to use two different
containers in it for two different jobs with different binaries in them,
then containers are the way to go. They may also be useful for jobs that
require little memory and disk space and need low-latency responses, like
real-time processing - but in that case why use SPARK and not something like
Kafka SQL, etc.?

So this entire containers nonsense is utterly pointless, and useless in
every way other than for devops engineers making pointless arguments to create
and spend huge budgets, as very few organisations need low-latency,
variable-throughput, multiple-binary hosting systems.


Regards,
Gourav

On Fri, Sep 30, 2022 at 5:32 AM Leszek Reimus 
wrote:

> Hi Everyone,
>
> To add my 2 cents here:
>
> Advantage of containers, to me, is that it leaves the host system pristine
> and clean, allowing standardized devops deployment of hardware for any
> purpose. Way back before - when using bare metal / ansible, reusing hw
> always involved full reformat of base system. This alone is worth the ~1-2%
> performance tax cgroup containers have.
>
> Advantage of kubernetes is more on the deployment side of things. Unified
> deployment scripts that can be written by devs. Same deployment yaml (or
> helm chart) can be used on local Dev Env / QA / Integration Env and finally
> Prod (with some tweaks).
>
> Depending on the networking CNI, and storage backend - Kubernetes can have
> a very close to bare metal performance. In the end it is always a
> trade-off. You gain some, you pay with extra overhead.
>
> I'm running YARN on kubernetes and mostly run Spark on top of YARN (some
> legacy MapReduce jobs too, though). I find it much more manageable to
> allocate larger memory/cpu chunks to YARN pods and then run an
> auto-scaler to scale out YARN if needed than to manage individual
> memory/cpu requirements in a Spark on Kubernetes deployment.
>
> As far as I have tested, Spark on Kubernetes is immature where reliability is
> concerned (or maybe our homegrown k8s does not do fencing/STONITH well
> yet). When a node dies / goes down, I find executors not getting
> rescheduled to other nodes - the driver just gets stuck waiting for the
> executors to come back. This does not happen on YARN / Standalone deployment
> (even when run on the same k8s cluster).
>
> Sincerely,
>
> Leszek Reimus
>
>
>
>
> On Thu, Sep 29, 2022 at 7:06 PM Gourav Sengupta 
> wrote:
>
>> Hi,
>>
>> Don't containers ultimately run on systems, and isn't the only advantage of
>> containers that you get better utilisation of system resources by
>> micro-managing the jobs running in them? Some say that containers have their
>> own binaries, which isolates the environment, but that is a lie, because in a
>> Kubernetes environment that is running your SPARK jobs you will have the
>> same environment for all your kubes.
>>
>> And as you can see there are several other configurations, disk mounting,
>> security, etc issues to handle as an overhead as well.
>>
>> And the entire goal of all those added configurations is that someone in
>> your devops team feels using containers makes things more interesting
>> without any real added advantage to large volume jobs.
>>
>> But I may be wrong, and perhaps we need data, and not personal attacks
>> like the other person in the thread did.
>>
>> In case anyone does not know EMR does run on containers as well, and in
>> EMR running on EC2 nodes you can put all your binaries in containers and
>> use those for running your jobs.
>>
>> Regards,
>> Gourav Sengupta
>>
>> On Thu, Sep 29, 2022 at 7:46 PM Vladimir Prus 
>> wrote:
>>
>>> Igor,
>>>
>>> what exact instance types do you use? Unless you use local instance
>>> storage and have actually configured your Kubernetes and Spark to use
>>> instance storage, your 30x30 exchange can run into EBS IOPS limits. You can
>>> investigate that by going to an instance, then to volume, and see
>>> monitoring charts.
>>>
>>> Another thought is that you're essentially giving 4GB per core. That
>>> sounds pretty low, in my experience.
>>>
>>>
>>>
>>> On Thu, Sep 29, 2022 at 9:13 PM Igor Calabria 
>>> wrote:
>>>
>>>> Hi Everyone,
>>>>
>>>> I'm running spark 3

Re: Help with Shuffle Read performance

2022-09-29 Thread Gourav Sengupta
Hi,

Don't containers ultimately run on systems, and isn't the only advantage of
containers that you get better utilisation of system resources by
micro-managing the jobs running in them? Some say that containers have their
own binaries, which isolates the environment, but that is a lie, because in a
Kubernetes environment that is running your SPARK jobs you will have the
same environment for all your kubes.

And as you can see there are several other configurations, disk mounting,
security, etc issues to handle as an overhead as well.

And the entire goal of all those added configurations is that someone in
your devops team feels using containers makes things more interesting
without any real added advantage to large volume jobs.

But I may be wrong, and perhaps we need data, and not personal attacks like
the other person in the thread did.

In case anyone does not know EMR does run on containers as well, and in EMR
running on EC2 nodes you can put all your binaries in containers and use
those for running your jobs.

Regards,
Gourav Sengupta

On Thu, Sep 29, 2022 at 7:46 PM Vladimir Prus 
wrote:

> Igor,
>
> what exact instance types do you use? Unless you use local instance
> storage and have actually configured your Kubernetes and Spark to use
> instance storage, your 30x30 exchange can run into EBS IOPS limits. You can
> investigate that by going to an instance, then to volume, and see
> monitoring charts.
>
> Another thought is that you're essentially giving 4GB per core. That
> sounds pretty low, in my experience.
>
>
>
> On Thu, Sep 29, 2022 at 9:13 PM Igor Calabria 
> wrote:
>
>> Hi Everyone,
>>
>> I'm running spark 3.2 on kubernetes and have a job with a decently sized
>> shuffle of almost 4TB. The relevant cluster config is as follows:
>>
>> - 30 Executors. 16 physical cores, configured with 32 Cores for spark
>> - 128 GB RAM
>> -  shuffle.partitions is 18k which gives me tasks of around 150~180MB
>>
>> The job runs fine but I'm bothered by how underutilized the cluster gets
>> during the reduce phase. During the map(reading data from s3 and writing
>> the shuffle data) CPU usage, disk throughput and network usage is as
>> expected, but during the reduce phase it gets really low. It seems the main
>> bottleneck is reading shuffle data from other nodes, task statistics
>> reports values ranging from 25s to several minutes(the task sizes are
>> really close, they aren't skewed). I've tried increasing
>> "spark.reducer.maxSizeInFlight" and
>> "spark.shuffle.io.numConnectionsPerPeer" and it did improve performance by
>> a little, but not enough to saturate the cluster resources.
>>
>> Did I miss some more tuning parameters that could help?
>> One obvious thing would be to vertically increase the machines and use
>> less nodes to minimize traffic, but 30 nodes doesn't seem like much even
>> considering 30x30 connections.
>>
>> Thanks in advance!
>>
>>
>
> --
> Vladimir Prus
> http://vladimirprus.com
>
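
The numbers quoted in this thread can be cross-checked with a little
arithmetic. The sketch below uses only the figures Igor gives (30 executors,
32 configured cores and 128 GB each, 18k shuffle partitions, 150~180MB tasks)
and reproduces Vladimir's "4GB per core" remark. Reading "128 GB RAM" as
per-executor is an assumption, although it is the reading that matches
Vladimir's figure.

```python
executors = 30
cores_per_executor = 32          # configured Spark cores (16 physical)
ram_gb_per_executor = 128        # assumed to be per executor
shuffle_partitions = 18_000
task_size_mb = (150, 180)        # reported per-task range

slots = executors * cores_per_executor               # tasks that can run at once
waves = shuffle_partitions / slots                   # rounds of reduce tasks
ram_gb_per_core = ram_gb_per_executor / cores_per_executor
# Total shuffle volume implied by partitions x task size, in TB (1 TB = 1e6 MB)
shuffle_tb = tuple(shuffle_partitions * mb / 1e6 for mb in task_size_mb)

print(f"{slots} slots, {waves} waves, {ram_gb_per_core} GB per core")
print(f"implied shuffle size: {shuffle_tb[0]} to {shuffle_tb[1]} TB")
```

That gives 960 task slots, just under 19 waves of reduce tasks, and an implied
shuffle of roughly 2.7 to 3.2 TB, which is somewhat below the "almost 4TB"
quoted. The thread does not say how either number was measured.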


Re: Help with Shuffle Read performance

2022-09-29 Thread Gourav Sengupta
Hi,

why not use EMR or Dataproc? Kubernetes does not provide any benefit at
all for this scale of work. It is a classic case of over-engineering and
over-complication just for the heck of it.

Also, I think that if you are on AWS, Redshift Spectrum or Athena is a
better fit for 90% of use cases.

Regards,
Gourav

On Thu, Sep 29, 2022 at 7:13 PM Igor Calabria 
wrote:

> Hi Everyone,
>
> I'm running spark 3.2 on kubernetes and have a job with a decently sized
> shuffle of almost 4TB. The relevant cluster config is as follows:
>
> - 30 Executors. 16 physical cores, configured with 32 Cores for spark
> - 128 GB RAM
> -  shuffle.partitions is 18k which gives me tasks of around 150~180MB
>
> The job runs fine but I'm bothered by how underutilized the cluster gets
> during the reduce phase. During the map(reading data from s3 and writing
> the shuffle data) CPU usage, disk throughput and network usage is as
> expected, but during the reduce phase it gets really low. It seems the main
> bottleneck is reading shuffle data from other nodes, task statistics
> reports values ranging from 25s to several minutes(the task sizes are
> really close, they aren't skewed). I've tried increasing
> "spark.reducer.maxSizeInFlight" and
> "spark.shuffle.io.numConnectionsPerPeer" and it did improve performance by
> a little, but not enough to saturate the cluster resources.
>
> Did I miss some more tuning parameters that could help?
> One obvious thing would be to vertically increase the machines and use
> less nodes to minimize traffic, but 30 nodes doesn't seem like much even
> considering 30x30 connections.
>
> Thanks in advance!
>
>
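
For reference, the two settings Igor mentions are passed like any other Spark
configuration. The sketch below renders a settings dict as spark-submit
`--conf` arguments. The values 96m and 2 are hypothetical (the thread does not
say what was tried; the documented defaults are 48m and 1), and mapping
"shuffle.partitions" to spark.sql.shuffle.partitions is an assumption. The
helper name `to_submit_args` is made up.

```python
def to_submit_args(conf):
    """Render a config dict as spark-submit `--conf key=value` arguments."""
    args = []
    for key, value in sorted(conf.items()):
        args += ["--conf", f"{key}={value}"]
    return args

shuffle_conf = {
    "spark.reducer.maxSizeInFlight": "96m",        # hypothetical value
    "spark.shuffle.io.numConnectionsPerPeer": 2,   # hypothetical value
    "spark.sql.shuffle.partitions": 18000,         # assumed key for "shuffle.partitions"
}

print(" ".join(to_submit_args(shuffle_conf)))
```

Keeping the settings in one dict makes it easier to change a single value
between runs and compare shuffle read times in the Spark UI.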


Re: Spark SQL

2022-09-15 Thread Gourav Sengupta
Okay, so for the problem to the solution  that is powerful

On Thu, 15 Sept 2022, 14:48 Mayur Benodekar,  wrote:

> Hi Gourav,
>
> It’s the way the framework is
>
>
> Sent from my iPhone
>
> On Sep 15, 2022, at 02:02, Gourav Sengupta 
> wrote:
>
> 
> Hi,
>
> Why spark and why scala?
>
> Regards,
> Gourav
>
> On Wed, 7 Sept 2022, 21:42 Mayur Benodekar,  wrote:
>
>> I am new to both Scala and Spark.
>>
>> I have code in Scala which executes queries in a while loop, one after the
>> other.
>>
>> What we need to do is this: if a particular query takes more than a certain
>> time, for example 10 mins, we should be able to stop the execution of
>> that particular query and move on to the next one.
>>
>> for example
>>
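>> import java.util.concurrent.TimeUnit
>> import scala.concurrent.{Await, Future}
>> import scala.concurrent.ExecutionContext.Implicits.global
>> import scala.concurrent.duration.Duration
>> import scala.util.{Failure, Success}
>>
>> do {
>>   // Run the query asynchronously
>>   val f = Future(spark.sql("some query"))
>>
>>   f.onComplete {
>>     case Success(_) => println("Query ran in 10mins")
>>     case Failure(_) => println("query took more than 10mins")
>>   }
>>
>>   // Await.ready throws a TimeoutException if f is not done within 10 minutes
>>   val result = Await.ready(f, Duration(10, TimeUnit.MINUTES))
>> } while (some condition)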
>>
>> I understand that when we call spark.sql, control is passed to Spark,
>> and I need to kill/stop the query when the duration is over so that I can
>> get back the resources.
>>
>> I have tried multiple things but I am not sure how to solve this. Any
>> help would be welcome, as I am stuck with this.
>>
>> --
>> Cheers,
>> Mayur
>>
>


Re: Spark SQL

2022-09-15 Thread Gourav Sengupta
Hi,

Why spark and why scala?

Regards,
Gourav

On Wed, 7 Sept 2022, 21:42 Mayur Benodekar,  wrote:

> I am new to both Scala and Spark.
>
> I have code in Scala which executes queries in a while loop, one after the
> other.
>
> What we need to do is this: if a particular query takes more than a certain
> time, for example 10 mins, we should be able to stop the execution of
> that particular query and move on to the next one.
>
> for example
>
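> import java.util.concurrent.TimeUnit
> import scala.concurrent.{Await, Future}
> import scala.concurrent.ExecutionContext.Implicits.global
> import scala.concurrent.duration.Duration
> import scala.util.{Failure, Success}
>
> do {
>   // Run the query asynchronously
>   val f = Future(spark.sql("some query"))
>
>   f.onComplete {
>     case Success(_) => println("Query ran in 10mins")
>     case Failure(_) => println("query took more than 10mins")
>   }
>
>   // Await.ready throws a TimeoutException if f is not done within 10 minutes
>   val result = Await.ready(f, Duration(10, TimeUnit.MINUTES))
> } while (some condition)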
>
> I understand that when we call spark.sql, control is passed to Spark,
> and I need to kill/stop the query when the duration is over so that I can
> get back the resources.
>
> I have tried multiple things but I am not sure how to solve this. Any help
> would be welcome, as I am stuck with this.
>
> --
> Cheers,
> Mayur
>
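
The question above (stop a query after a time limit and reclaim resources)
went unanswered in this thread. One possible shape of a solution, sketched in
PySpark rather than Scala, is to tag the query's jobs with a job group and
cancel that group on timeout. setJobGroup and cancelJobGroup are SparkContext
methods; the helper name `run_with_timeout` and its parameters are made up,
and the sketch assumes the default pinned-thread mode so that the job group
set in the worker thread applies to the jobs it submits.

```python
import concurrent.futures as cf

def run_with_timeout(spark, query, timeout_s, group_id):
    """Run `query`, cancelling its Spark jobs if it exceeds `timeout_s` seconds.

    Returns the collected rows, or None if the query timed out.
    """
    sc = spark.sparkContext

    def work():
        # Job groups are per-thread, so tag the jobs from inside the worker.
        sc.setJobGroup(group_id, f"query: {query}", interruptOnCancel=True)
        # spark.sql() is lazy for SELECTs; collect() is the action that runs jobs.
        return spark.sql(query).collect()

    pool = cf.ThreadPoolExecutor(max_workers=1)
    future = pool.submit(work)
    try:
        return future.result(timeout=timeout_s)
    except cf.TimeoutError:
        # Ask Spark to stop the group's running jobs so executors are freed.
        sc.cancelJobGroup(group_id)
        return None
    finally:
        pool.shutdown(wait=False)
```

In a loop over queries, a None result would mean "skip and move on". The same
idea should carry over to Scala with Future plus sc.cancelJobGroup, since a
timeout in Await only stops the waiting, not the Spark job.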


Re: Pipelined execution in Spark (???)

2022-09-11 Thread Gourav Sengupta
Hi,

For some operations, such as repartitionByRange, it is indeed quite annoying
at times to wait for the maps to complete before the reduce starts.

@Sean Owen   do you have any comments?

Regards,
Gourav Sengupta

On Thu, Sep 8, 2022 at 12:10 AM Russell Jurney 
wrote:

> I could be wrong , but… just start it. If you have the capacity, it takes
> a lot of time on large datasets to reduce the entire dataset. If you have
> the resources, start combining and reducing on partial map results. As soon
> as you’ve got one record out of the map, it has a reduce key in the plan,
> so send it to that reducer. You can’t finish the reduce until you’re done
> with the map, but you can start it immediately. This depends on reducers
> being algebraic, of course, and learning to think in MapReduce isn’t even
> possible for a lot of people. Some people say it is impossible to do it
> well but I disagree :)
>
> On Wed, Sep 7, 2022 at 3:51 PM Sean Owen  wrote:
>
>> Wait, how do you start reduce tasks before maps are finished? is the idea
>> that some reduce tasks don't depend on all the maps, or at least you can
>> get started?
>> You can already execute unrelated DAGs in parallel of course.
>>
>> On Wed, Sep 7, 2022 at 5:49 PM Sungwoo Park  wrote:
>>
>>> You are right -- Spark can't do this with its current architecture. My
>>> question was: if there was a new implementation supporting pipelined
>>> execution, what kind of Spark jobs would benefit (a lot) from it?
>>>
>>> Thanks,
>>>
>>> --- Sungwoo
>>>
>>> On Thu, Sep 8, 2022 at 1:47 AM Russell Jurney 
>>> wrote:
>>>
>>>> I don't think Spark can do this with its current architecture. It has
>>>> to wait for the step to be done, speculative execution isn't possible.
>>>> Others probably know more about why that is.
>>>>
>>>> Thanks,
>>>> Russell Jurney @rjurney <http://twitter.com/rjurney>
>>>> russell.jur...@gmail.com LI <http://linkedin.com/in/russelljurney> FB
>>>> <http://facebook.com/jurney> datasyndrome.com
>>>>
>>>>
>>>> On Wed, Sep 7, 2022 at 7:42 AM Sungwoo Park  wrote:
>>>>
>>>>> Hello Spark users,
>>>>>
>>>>> I have a question on the architecture of Spark (which could lead to a
>>>>> research problem). In its current implementation, Spark finishes executing
>>>>> all the tasks in a stage before proceeding to child stages. For example,
>>>>> given a two-stage map-reduce DAG, Spark finishes executing all the map
>>>>> tasks before scheduling reduce tasks.
>>>>>
>>>>> We can think of another 'pipelined execution' strategy in which tasks
>>>>> in child stages can be scheduled and executed concurrently with tasks in
>>>>> parent stages. For example, for the two-stage map-reduce DAG, while map
>>>>> tasks are being executed, we could schedule and execute reduce tasks in
>>>>> advance if the cluster has enough resources. These reduce tasks can also
>>>>> pre-fetch the output of map tasks.
>>>>>
>>>>> Has anyone seen Spark jobs for which this 'pipelined execution'
>>>>> strategy would be desirable while the current implementation is not quite
>>>>> adequate? Since Spark tasks usually run for a short period of time, I guess
>>>>> the new strategy would not have a major performance improvement. However,
>>>>> there might be some category of Spark jobs for which this new strategy
>>>>> would be clearly a better choice.
>>>>>
>>>>> Thanks,
>>>>>
>>>>> --- Sungwoo
>>>>>
>>>>> --
>
> Thanks,
> Russell Jurney @rjurney <http://twitter.com/rjurney>
> russell.jur...@gmail.com LI <http://linkedin.com/in/russelljurney> FB
> <http://facebook.com/jurney> datasyndrome.com
>
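
Russell's point about starting the reduce on partial map output can be
illustrated outside Spark. The toy below folds each (key, value) pair into a
running total the moment the map side emits it, which only works because the
combine function is associative and commutative (the "algebraic" condition he
mentions). It is a plain-Python illustration, not a description of how Spark
schedules stages; the function names are invented.

```python
def map_phase(lines):
    """Yield (word, 1) pairs one at a time, as a map task would emit them."""
    for line in lines:
        for word in line.split():
            yield word, 1

def incremental_reduce(pairs, combine):
    """Fold each pair into its key's running total as soon as it arrives."""
    totals = {}
    for key, value in pairs:
        totals[key] = combine(totals[key], value) if key in totals else value
    return totals

lines = ["a b a", "b c", "a"]
# The generator is consumed lazily, so reducing starts before mapping ends.
streamed = incremental_reduce(map_phase(lines), lambda x, y: x + y)
print(streamed)
```

The final totals are only known once the last map record has arrived, which
matches the remark that you cannot finish the reduce early but you can start
it early.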


Re: Profiling PySpark Pandas UDF

2022-08-29 Thread Gourav Sengupta
Hi,

I do send back those metrics as columns in the pandas DataFrames in case
required, but the truth is that we ultimately need to be able to find out
the time for Java object conversion along with the UDF calls, the actual
Python memory, and other details, all of which we can do by tweaking the UDF.

But I am 100 percent sure that your work will be useful and required.

Regards,
Gourav
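
As an illustration of "tweaking the UDF" for the two lightweight numbers
Abdeali asks about further down (time spent inside the UDF and number of
calls), here is a minimal sketch of a wrapper that reports both. It only
assumes counter objects with an add(value) method, which Spark accumulators
created with sc.accumulator(...) provide. The name `instrumented` is made up,
and this has not been run against a cluster or against pandas_udf's type-hint
handling.

```python
import functools
import time

def instrumented(fn, calls_acc, seconds_acc):
    """Wrap `fn` so each call bumps a call counter and a time total.

    `calls_acc` and `seconds_acc` only need an `add(value)` method.
    """
    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        try:
            return fn(*args, **kwargs)
        finally:
            # Recorded even if fn raises, so failed calls are counted too.
            calls_acc.add(1)
            seconds_acc.add(time.perf_counter() - start)
    return wrapper
```

With a pandas UDF each call handles one batch, so the counter would report
batches, not rows. Accumulator updates made inside transformations can also
be applied more than once when tasks are retried, so the totals are best read
as approximate.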

On Mon, 29 Aug 2022, 10:36 Luca Canali,  wrote:

> Hi Abdeali,
>
>
>
> Thanks for the support. Indeed you can go ahead and test and review  my
> latest PR for SPARK-34265
>
> (Instrument Python UDF execution using SQL Metrics) if you want to:
> https://github.com/apache/spark/pull/33559
>
> Currently I reduced the scope of the instrumentation to just 3 simple
> metrics to implement: "data sent to Python workers",
>
> "data returned from Python workers", "number of output rows".
> In a previous attempt I had also instrumented the time for UDF execution,
> although there are some subtle points there,
>
> and I may need to go back to testing that at a later stage.
>
> It definitely would be good to know if people using PySpark and Python
> UDFs find this proposed improvement useful.
>
> I see the proposed additional instrumentation as complementary to the
> Python/Pandas UDF Profiler introduced in Spark 3.3.
>
>
>
> Best,
>
> Luca
>
>
>
> *From:* Abdeali Kothari 
> *Sent:* Friday, August 26, 2022 15:59
> *To:* Luca Canali 
> *Cc:* Russell Jurney ; Gourav Sengupta <
> gourav.sengu...@gmail.com>; Sean Owen ; Takuya UESHIN <
> ues...@happy-camper.st>; user ; Subash
> Prabanantham 
> *Subject:* Re: Profiling PySpark Pandas UDF
>
>
>
> Hi Luca, I see you pushed some code to the PR 3 hrs ago.
>
> That's awesome. If I can help out in any way - do let me know
>
> I think that's an amazing feature and would be great if it can get into
> spark
>
>
>
> On Fri, 26 Aug 2022, 12:41 Luca Canali,  wrote:
>
> @Abdeali as for “lightweight profiling”, there is some work in progress on
> instrumenting Python UDFs with Spark metrics, see
> https://issues.apache.org/jira/browse/SPARK-34265
>
> However it is a bit stuck at the moment, and needs to be revived I
> believe.
>
>
>
> Best,
>
> Luca
>
>
>
> *From:* Abdeali Kothari 
> *Sent:* Friday, August 26, 2022 06:36
> *To:* Subash Prabanantham 
> *Cc:* Russell Jurney ; Gourav Sengupta <
> gourav.sengu...@gmail.com>; Sean Owen ; Takuya UESHIN <
> ues...@happy-camper.st>; user 
> *Subject:* Re: Profiling PySpark Pandas UDF
>
>
>
> The python profiler is pretty cool !
>
> I'll try it out to see what could be taking time within the UDF with it.
>
>
>
> I'm wondering if there is also some lightweight profiling (which does not
> slow down my processing) for me to get:
>
>
>
>  - how much time the UDF took (like how much time was spent inside the UDF)
>
>  - how many times the UDF was called
>
>
>
> I can see the overall time a stage took in the Spark UI - would be cool if
> I could find the time a UDF takes too
>
>
>
> On Fri, 26 Aug 2022, 00:25 Subash Prabanantham, 
> wrote:
>
> Wow, lots of good suggestions. I didn’t know about the profiler either.
> Great suggestion @Takuya.
>
>
>
>
>
> Thanks,
>
> Subash
>
>
>
> On Thu, 25 Aug 2022 at 19:30, Russell Jurney 
> wrote:
>
> YOU know what you're talking about and aren't hacking a solution. You are
> my new friend :) Thank you, this is incredibly helpful!
>
>
>
>
> Thanks,
>
> Russell Jurney @rjurney <http://twitter.com/rjurney>
> russell.jur...@gmail.com LI <http://linkedin.com/in/russelljurney> FB
> <http://facebook.com/jurney> datasyndrome.com
>
>
>
>
>
> On Thu, Aug 25, 2022 at 10:52 AM Takuya UESHIN 
> wrote:
>
> Hi Subash,
>
> Have you tried the Python/Pandas UDF Profiler introduced in Spark 3.3?
> -
> https://spark.apache.org/docs/latest/api/python/development/debugging.html#python-pandas-udf
>
> Hope it can help you.
>
> Thanks.
>
>
>
> On Thu, Aug 25, 2022 at 10:18 AM Russell Jurney 
> wrote:
>
> Subash, I’m here to help :)
>
>
>
> I started a test script to demonstrate a solution last night but got a
> cold and haven’t finished it. Give me another day and I’ll get it to you.
> My suggestion is that you run PySpark locally in pytest with a fixture to
> generate and yield your SparkContext and SparkSession, and then write tests
> that load some test data, perform some count operation and checkpoint to
> ensure that data is loaded, start a timer, run your UDF on the DataFrame,
> checkpoint ag

Re: Profiling PySpark Pandas UDF

2022-08-25 Thread Gourav Sengupta
Hi,

Maybe I am jumping to conclusions and making stupid guesses, but have you
tried koalas now that it is natively integrated with pyspark?

Regards
Gourav

On Thu, 25 Aug 2022, 11:07 Subash Prabanantham, 
wrote:

> Hi All,
>
> I was wondering if we have any best practices on using pandas UDF ?
> Profiling UDF is not an easy task and our case requires some drilling down
> on the logic of the function.
>
>
> Our use case:
> We are using func(Dataframe) => Dataframe as interface to use Pandas UDF,
> while running locally only the function, it runs faster but when executed
> in Spark environment - the processing time is more than expected. We have
> one column where the value is large (BinaryType -> 600KB), wondering
> whether this could make the Arrow computation slower ?
>
> Is there any profiling or best way to debug the cost incurred using pandas
> UDF ?
>
>
> Thanks,
> Subash
>
>
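
For the lightweight measurement asked about in this thread (time spent inside the UDF and number of calls), a plain Python wrapper is one possible starting point. This is only a minimal sketch: `timed` and `transform` are hypothetical names, the wrapped function is a stand-in for a real pandas UDF body, and in Spark the counters would live separately in each Python worker process, so they would still have to be logged or returned to be collected.

```python
import time
from functools import wraps


def timed(func):
    """Wrap func so it records its call count and the total time spent in it."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        try:
            return func(*args, **kwargs)
        finally:
            # Updated even if func raises, so failed calls are counted too.
            wrapper.calls += 1
            wrapper.total_seconds += time.perf_counter() - start

    wrapper.calls = 0
    wrapper.total_seconds = 0.0
    return wrapper


@timed
def transform(values):
    # Hypothetical stand-in for the logic inside a pandas UDF.
    return [v * 2 for v in values]


for batch in ([1, 2, 3], [4, 5], [6]):
    transform(batch)

print(f"calls={transform.calls}, total={transform.total_seconds:.6f}s")
```

The overhead is two clock reads per call, which is why this is cheaper than a full profiler, but it only gives totals, not a breakdown of where the time goes.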


Re: Spark streaming

2022-08-20 Thread Gourav Sengupta
Hi,
spark is just an unnecessary overengineered overkill for that kind of a
job. I know they are trying to make SPARK a one stop solution for
everything but that is a marketing attempt to capture market share, rather
than the true blue engineering creativity that led to the creation of SPARK
- so please be aware.

Are you in AWS? If you are, please try DMS; that might be the best
solution, depending on what you are looking for of course.

If you are not in AWS, please let me know your environment, and I can help
you out.



Regards,
Gourav Sengupta

On Fri, Aug 19, 2022 at 1:13 PM sandra sukumaran <
sandrasukumara...@gmail.com> wrote:

> Dear Sir,
>
>
>
>  Is there any possible method to fetch MySQL database bin log, with
> the help of spark streaming.
> Kafka streaming is not applicable in this case.
>
>
>
> Thanks and regards
> Sandra
>


Re: Spark with GPU

2022-08-13 Thread Gourav Sengupta
One of the best things that could have happened to SPARK (now mostly an
overhyped ETL tool with small incremental optimisation changes and no
large scale innovation) is the release by NVIDIA for GPU processing. You
need some time to get your head around it, but it is supported quite easily
in AWS EMR with a few configuration changes. You can see massive gains,
given that AWS has different varieties of GPUs.

We can end up saving a lot of time and money running a few selected
processes on the GPU. There is also an easy fallback option on the CPU,
obviously.

If you are in AWS, try to use athena, or redshift, or snowflake, they get a
lot more done with less overheads and heart aches. I particularly like how
native integration between ML systems like sagemaker works via redshift
queries, and aurora postgres - that is true unified data analytics at work.


Regards,
Gourav Sengupta



On Sat, Aug 13, 2022 at 6:16 PM Alessandro Bellina 
wrote:

> This thread may be better suited as a discussion in our Spark plug-in’s
> repo:
> https://github.com/NVIDIA/spark-rapids/discussions.
>
> Just to answer the questions that were asked so far:
>
> I would recommend checking our documentation for what is supported as of
> our latest release (22.06):
> https://nvidia.github.io/spark-rapids/docs/supported_ops.html, as we have
> quite a bit of support for decimal and also nested types and keep adding
> coverage.
>
> For UDFs, if you are willing to rewrite it to use the RAPIDS cuDF API, we
> do have support and examples on how to do this, please check out this:
>
> https://nvidia.github.io/spark-rapids/docs/additional-functionality/rapids-udfs.html.
> Automatically translating UDFs to GPUs is not easy. We have a Scala UDF to
> catalyst transpiler that will be able to handle very simple UDFs where
> every operation has a corresponding catalyst expression, that may be worth
> checking out:
>
> https://nvidia.github.io/spark-rapids/docs/additional-functionality/udf-to-catalyst-expressions.html.
> This transpiler falls back if it can’t translate any part of the UDF.
>
> The plug-in will not fail in case where it can’t run part of a query on
> the GPU, it will fall back and run on the CPU for the parts of the query
> that are not supported. It will also output what it can’t optimize on the
> driver (on .explain), which should help narrow down an expression or exec
> that should be looked at further.
>
> There are other resources all linked from here:
> https://nvidia.github.io/spark-rapids/ (of interest may be the
> Qualification Tool, and our Getting Started guide for different cloud
> providers and distros).
>
> I’d say let’s continue this in the discussions or as issues in the
> spark-rapids repo if you have further questions or run into issues, as it’s
> not specific to Apache Spark.
>
> Thanks!
>
> Alessandro
>
> On Sat, Aug 13, 2022 at 10:53 AM Sean Owen  wrote:
>
>> This isn't a Spark question, but rather a question about whatever Spark
>> application you are talking about. RAPIDS?
>>
>> On Sat, Aug 13, 2022 at 10:35 AM rajat kumar 
>> wrote:
>>
>>> Thanks Sean.
>>>
>>> Also, I observed that lots of things are not supported in GPU by NVIDIA.
>>> E.g. nested types/decimal type/Udfs etc.
>>> So, will it use CPU automatically for running those tasks which require
>>> nested types or will it run on GPU and fail.
>>>
>>> Thanks
>>> Rajat
>>>
>>> On Sat, Aug 13, 2022, 18:54 Sean Owen  wrote:
>>>
>>>> Spark does not use GPUs itself, but tasks you run on Spark can.
>>>> The only 'support' there is is for requesting GPUs as resources for
>>>> tasks, so it's just a question of resource management. That's in OSS.
>>>>
>>>> On Sat, Aug 13, 2022 at 8:16 AM rajat kumar 
>>>> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> I have been hearing about GPU in spark3.
>>>>>
>>>>> For batch jobs , will it help to improve GPU performance. Also is GPU
>>>>> support available only on Databricks or on cloud based Spark clusters ?
>>>>>
>>>>> I am new , if anyone can share insight , it will help
>>>>>
>>>>> Thanks
>>>>> Rajat
>>>>>
>>>>


Re: log transfering into hadoop/spark

2022-08-02 Thread Gourav Sengupta
hi,
I do it with simple bash scripts to transfer to s3. Takes less than 1
minute to write it, and another 1 min to include it in bootstrap scripts.

Never saw the need for so much hype for such simple tasks.

Regards,
Gourav Sengupta

On Tue, Aug 2, 2022 at 2:16 PM ayan guha  wrote:

> ELK or Splunk agents typically.
>
> Or if you are in cloud then there are cloud native solutions which can
> forward logs to object store, which can then be read like hdfs.
>
> On Tue, 2 Aug 2022 at 4:43 pm, pengyh  wrote:
>
>> Since Flume is no longer being developed,
>> what's the current open-source tool to transfer webserver logs into
>> hdfs/spark?
>>
>> thank you.
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>> --
> Best Regards,
> Ayan Guha
>


Re: Use case idea

2022-08-01 Thread Gourav Sengupta
Hi,

my comments were about SQL. Most other technologies, like Snowflake and
Redshift, or KSQL writing directly to other sinks, do this quite easily
without massive engineering. In fact, Databricks is trying to play
catch-up in this market by coming out with GUI-based ETL tools :)

Perhaps you should try other systems in the market first; that will give an
unbiased view of Databricks and SPARK being just an over-glamourised tool. The
hope of extending SPARK with a separate easy-to-use query engine for deep
learning and other AI systems is gone now with Ray. The SPARK community now
largely just defends the lack of support and direction in this matter,
which is a joke.

Thanks and Regards,
Gourav Sengupta

On Mon, Aug 1, 2022 at 4:54 AM pengyh  wrote:

>
> I don't think so. We were using Spark integrated with Kafka for
> streaming computing and realtime reports. That just works.
>
>
> > SPARK is now just an overhyped and overcomplicated ETL tool, nothing
> > more, there is another distributed AI called as Ray, which should be the
> > next billion dollar company instead of just building those features in
> > SPARK natively using a different computation engine :)
>


Re: Use case idea

2022-07-31 Thread Gourav Sengupta
Hi,
SPARK is now just an overhyped and overcomplicated ETL tool, nothing more,
there is another distributed AI called as Ray, which should be the next
billion dollar company instead of just building those features in SPARK
natively using a different computation engine :)

So the only promise of SPARK growing into a truly unified platform is gone
now, who wants to build features into SPARK anymore when you can build
another billion dollar hype :)

Use solutions like snowflake, or BigQuery (may turn out to be expensive),
or EMR based hive/ trino, or Redshift, Redshift Spectrum, Athena, and so on.


Thanks and Regards,
Gourav Sengupta

On Mon, Aug 1, 2022 at 1:58 AM pengyh  wrote:

>
> I am afraid most of the SQL functions Spark has are also available in
> other BI tools.
>
> Spark is used for high performance computing, not for SQL function
> comparison.
>
> Thanks.
>
> > In other terms: what analytics functionality does Spark offer that no
> ERP has?
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: PySpark cores

2022-07-29 Thread Gourav Sengupta
Hi,

Agree with the above response, but in case you are using arrow and
transferring data from the JVM to python and back, then please try to check
how things are getting executed in python.

Please let me know what processing you are trying to do while using
arrow.

Regards,
Gourav Sengupta

On Fri, Jul 29, 2022 at 9:48 AM Jacob Lynn  wrote:

> I think you are looking for the spark.task.cpus configuration parameter.
>
> On Fri, 29 Jul 2022 at 07:41, Andrew Melo wrote:
>
>> Hello,
>>
>> Is there a way to tell Spark that PySpark (arrow) functions use
>> multiple cores? If we have an executor with 8 cores, we would like to
>> have a single PySpark function use all 8 cores instead of having 8
>> single core python functions run.
>>
>> Thanks!
>> Andrew
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
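
As a rough illustration of the spark.task.cpus suggestion in this thread: the number of tasks an executor runs at the same time is the executor's core count divided by the cores reserved per task. The sketch below is plain arithmetic, not Spark code; the function name is hypothetical, and `executor_cores` and `task_cpus` stand for the `spark.executor.cores` and `spark.task.cpus` settings.

```python
def concurrent_tasks_per_executor(executor_cores: int, task_cpus: int) -> int:
    """Number of tasks one executor can run at once for the given settings."""
    if executor_cores < 1 or task_cpus < 1:
        raise ValueError("both settings must be positive integers")
    return executor_cores // task_cpus


# Default: one core per task, so an 8-core executor runs 8 Python functions.
print(concurrent_tasks_per_executor(8, 1))
# Reserving all 8 cores per task leaves room for a single task at a time,
# which is free to use 8 threads internally.
print(concurrent_tasks_per_executor(8, 8))
```

Note that the setting only reserves scheduling slots; whether the Python function actually uses the reserved cores is up to the code inside it.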


Re: external table with parquet files: problem querying in sparksql since data is stored as integer while hive schema expects a timestamp

2022-07-24 Thread Gourav Sengupta
Hi,

please try to query the table directly by loading the hive metastore (we
can do that quite easily in AWS EMR, but we can do things quite easily with
everything in AWS), rather than querying the s3 location directly.


Regards,
Gourav

On Wed, Jul 20, 2022 at 9:51 PM Joris Billen 
wrote:

> Hi,
> below sounds like something that someone will have experienced...
> I have external tables of parquet files with a hive table defined on top
> of the data. I don't manage/know the details of how the data lands.
> For some tables no issues when querying through spark.
> But for others there is an issue: looks like the datatype for hive is
> timestamp, but the parquet file contains an integer number =microseconds:
> if I access the table in spark sql I get:
>
> *Unable to create Parquet converter for data type “timestamp” whose
> Parquet type is optional int64 airac_wef (TIMESTAMP(MICROS,false))*
>
> OR
>
> *Parquet column cannot be converted in file
> abfs://somelocation/table/partition=484/00_0. Column: [atimefield],
> Expected: timestamp, Found: INT64*
>
>
>
> Anyone encountered this? I tried several sorts of CAST but no success yet.
> I see similar problems on forums (like this:
> https://stackoverflow.com/questions/59096125/spark-2-4-parquet-column-cannot-be-converted-in-file-column-impressions-exp
> ) but no solution.
>
>
> Thanks for input!
>
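
To make the mismatch in the error messages above concrete: a Parquet INT64 column annotated as TIMESTAMP(MICROS) stores the number of microseconds since the Unix epoch. The following is a small sketch of that interpretation in plain Python, assuming UTC; it is not a fix for the Hive/Spark schema conflict itself, and the function name is made up for illustration.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def micros_to_utc(micros: int) -> datetime:
    """Interpret an integer as microseconds since the Unix epoch (UTC)."""
    # timedelta arithmetic avoids the rounding that float division can introduce.
    return EPOCH + timedelta(microseconds=micros)


print(micros_to_utc(1_500_000).isoformat())  # 1970-01-01T00:00:01.500000+00:00
```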


Re: reading each JSON file from dataframe...

2022-07-13 Thread Gourav Sengupta
Hi,

I think that this is a pure example of over engineering.

Ayan's advice is the best. Please use the Spark SQL function
input_file_name() to join the tables. People do not think in terms of RDDs
anymore unless absolutely required.

Also if you have different JSON schemas, just use the SPARK system to read
it as a string first or use 100% scanning of the files to have a full
schema.

Regards,
Gourav Sengupta
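
The "read it as a string first" idea for files with differing schemas can be sketched in plain Python: keep each document as raw text together with its path, and parse it defensively so that one malformed file does not fail the whole run. `parse_safely` is a hypothetical helper, and the paths in the example are made up.

```python
import json


def parse_safely(path, raw):
    """Return (path, parsed_document, error); exactly one of the last two is None."""
    try:
        return path, json.loads(raw), None
    except json.JSONDecodeError as exc:
        return path, None, str(exc)


# Documents with different shapes, plus one that is not valid JSON.
inputs = [
    ("gs://bucket1/a.json", '{"score": 1}'),
    ("gs://bucket1/b.json", '{"items": [1, 2], "ok": true}'),
    ("gs://bucket1/c.json", "{not json"),
]

for path, doc, error in (parse_safely(p, r) for p, r in inputs):
    print(path, "->", doc if error is None else f"failed: {error}")
```

Because the path travels with each result, the correlation between a JSON document and the other identifier fields of its row is preserved.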

On Wed, Jul 13, 2022 at 12:41 AM Muthu Jayakumar  wrote:

> Hello Ayan,
>
> Thank you for the suggestion. But, I would lose correlation of the JSON
> file with the other identifier fields. Also, if there are too many files,
> will it be an issue? Plus, I may not have the same schema across all the
> files.
>
> Hello Enrico,
>
> >how does RDD's mapPartitions make a difference regarding
> I guess, in the question above I do have to process row-wise and RDD may
> be more efficient?
>
> Thanks,
> Muthu
>
> On Tue, 12 Jul 2022 at 14:55, ayan guha  wrote:
>
>> Another option is:
>>
>> 1. collect the dataframe with file path
>> 2. create a list of paths
>> 3. create a new dataframe with spark.read.json and pass the list of path
>>
>> This will save you lots of headache
>>
>> Ayan
>>
>>
>> On Wed, Jul 13, 2022 at 7:35 AM Enrico Minack 
>> wrote:
>>
>>> Hi,
>>>
>>> how does RDD's mapPartitions make a difference regarding 1. and 2.
>>> compared to Dataset's mapPartitions / map function?
>>>
>>> Enrico
>>>
>>>
>>> On 12.07.22 at 22:13, Muthu Jayakumar wrote:
>>>
>>> Hello Enrico,
>>>
>>> Thanks for the reply. I found that I would have to use `mapPartitions`
>>> API of RDD to perform this safely as I have to
>>> 1. Read each file from GCS using HDFS FileSystem API.
>>> 2. Parse each JSON record in a safe manner.
>>>
>>> For (1) to work, I do have to broadcast HadoopConfiguration from
>>> sparkContext. I did try to use GCS Java API to read content, but ran into
>>> many JAR conflicts as the HDFS wrapper and the JAR library use different
>>> dependencies.
>>> Hope these findings help others as well.
>>>
>>> Thanks,
>>> Muthu
>>>
>>>
>>> On Mon, 11 Jul 2022 at 14:11, Enrico Minack 
>>> wrote:
>>>
>>>> All you need to do is implement a method readJson that reads a single
>>>> file given its path. Then, you map the values of column file_path to
>>>> the respective JSON content as a string. This can be done via a UDF or
>>>> simply Dataset.map:
>>>>
>>>> case class RowWithJsonUri(entity_id: String, file_path: String,
>>>> other_useful_id: String)
>>>> case class RowWithJsonContent(entity_id: String, json_content: String,
>>>> other_useful_id: String)
>>>>
>>>> val ds = Seq(
>>>>   RowWithJsonUri("id-01f7pqqbxddb3b1an6ntyqx6mg",
>>>> "gs://bucket1/path/to/id-01g4he5cb4xqn6s1999k6y1vbd/file_result.json",
>>>> "id-2-01g4he5cb4xqn6s1999k6y1vbd"),
>>>>   RowWithJsonUri("id-01f7pqgbwms4ajmdtdedtwa3mf",
>>>> "gs://bucket1/path/to/id-01g4he5cbh52che104rwy603sr/file_result.json",
>>>> "id-2-01g4he5cbh52che104rwy603sr"),
>>>>   RowWithJsonUri("id-01f7pqqbxejt3ef4ap9qcs78m5",
>>>> "gs://bucket1/path/to/id-01g4he5cbqmdv7dnx46sebs0gt/file_result.json",
>>>> "id-2-01g4he5cbqmdv7dnx46sebs0gt"),
>>>>   RowWithJsonUri("id-01f7pqqbynh895ptpjjfxvk6dc",
>>>> "gs://bucket1/path/to/id-01g4he5cbx1kwhgvdme1s560dw/file_result.json",
>>>> "id-2-01g4he5cbx1kwhgvdme1s560dw")
>>>> ).toDS()
>>>>
>>>> ds.show(false)
>>>>
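>>>> +-----------------------------+-------------------------------------------------------------------+-------------------------------+
>>>> |entity_id                    |file_path                                                          |other_useful_id                |
>>>> +-----------------------------+-------------------------------------------------------------------+-------------------------------+
>>>> |id-01f7pqqbxddb3b1an6ntyqx6mg|gs://bucket1/path/to/id-01g4he5cb4xqn6s1999k6y1vbd/file_result.json|id-2-01g4he5cb4xqn6s1999k6y1vbd|
>>>> |id-01f7pqgbwms4ajmdtdedtwa3mf|gs://bucket1/path/to/id-01g4he5cbh52che104rwy603sr/file_result.json|id-2-01g4he5cbh52che104rwy603sr|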
>>>> |id-

Re: about cpu cores

2022-07-11 Thread Gourav Sengupta
Hi,
please see Sean's answer and please read about parallelism in spark.

Regards,
Gourav Sengupta

On Mon, Jul 11, 2022 at 10:12 AM Tufan Rakshit  wrote:

> So on average, for every 4 cores you get back 3.6 cores in YARN, but you
> can use only 3.
> In Kubernetes you get back 3.6 and can also use 3.6.
>
> Best
> Tufan
>
> On Mon, 11 Jul 2022 at 11:02, Yong Walt  wrote:
>
>> We were using Yarn. thanks.
>>
>> On Sun, Jul 10, 2022 at 9:02 PM Tufan Rakshit  wrote:
>>
>>> It mainly depends on your cluster manager: YARN or Kubernetes?
>>> Best
>>> Tufan
>>>
>>> On Sun, 10 Jul 2022 at 14:38, Sean Owen  wrote:
>>>
>>>> Jobs consist of tasks, each of which consumes a core (can be set to >1
>>>> too, but that's a different story). If there are more tasks ready to
>>>> execute than available cores, some tasks simply wait.
>>>>
>>>> On Sun, Jul 10, 2022 at 3:31 AM Yong Walt  wrote:
>>>>
>>>>> given my spark cluster has 128 cores totally.
>>>>> If the jobs (each job was assigned only one core) I submitted to the
>>>>> cluster are over 128, what will happen?
>>>>>
>>>>> Thank you.
>>>>>
>>>>
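
Sean's point that surplus tasks simply wait can be put into numbers. The sketch below is back-of-the-envelope arithmetic, assuming every task needs one core and all tasks take about the same time; the function name is hypothetical.

```python
import math


def scheduling_waves(tasks: int, cores: int) -> int:
    """Rounds of execution needed when each task occupies one core."""
    if cores < 1:
        raise ValueError("cores must be positive")
    return math.ceil(tasks / cores)


for tasks in (100, 128, 129, 300):
    print(f"{tasks} tasks on 128 cores -> {scheduling_waves(tasks, 128)} wave(s)")
```

With 128 cores, up to 128 single-core tasks run at once; task 129 starts as soon as any core frees up rather than failing.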


Re: How is Spark a memory based solution if it writes data to disk before shuffles?

2022-07-05 Thread Gourav Sengupta
Hi,

SPARK is just one of the technologies out there now, there are several
other technologies far outperforming SPARK or at least as good as SPARK.



Regards,
Gourav

On Sat, Jul 2, 2022 at 7:42 PM Sid  wrote:

> So as per the discussion, shuffle stages output is also stored on disk and
> not in memory?
>
> On Sat, Jul 2, 2022 at 8:44 PM krexos  wrote:
>
>>
>> thanks a lot!
>>
>> --- Original Message ---
>> On Saturday, July 2nd, 2022 at 6:07 PM, Sean Owen 
>> wrote:
>>
>> I think that is more accurate yes. Though, shuffle files are local, not
>> on distributed storage too, which is an advantage. MR also had map only
>> transforms and chained mappers, but harder to use. Not impossible but you
>> could also say Spark just made it easier to do the more efficient thing.
>>
>> On Sat, Jul 2, 2022, 9:34 AM krexos 
>> wrote:
>>
>>>
>>> You said Spark performs IO only when reading data and writing final data
>>> to the disk. I thought by that you meant that it only reads the input files
>>> of the job and writes the output of the whole job to the disk, but in
>>> reality Spark does store intermediate results on disk, just in fewer places
>>> than MR
>>>
>>> --- Original Message ---
>>> On Saturday, July 2nd, 2022 at 5:27 PM, Sid 
>>> wrote:
>>>
>>> I have explained the same thing in a very layman's terms. Go through it
>>> once.
>>>
>>> On Sat, 2 Jul 2022, 19:45 krexos,  wrote:
>>>

 I think I understand where Spark saves IO.

 in MR we have map -> reduce -> map -> reduce -> map -> reduce ...

 which writes results to disk at the end of each such "arrow",

 on the other hand in spark we have

 map -> reduce + map -> reduce + map -> reduce ...

 which saves about 2 times the IO

 thanks everyone,
 krexos

 --- Original Message ---
 On Saturday, July 2nd, 2022 at 1:35 PM, krexos 
 wrote:

 Hello,

 One of the main "selling points" of Spark is that unlike Hadoop
 map-reduce that persists intermediate results of its computation to HDFS
 (disk), Spark keeps all its results in memory. I don't understand this as
 in reality when a Spark stage finishes it writes all of the data into
 shuffle files stored on the disk.
 How then is this an improvement on map-reduce?

 Image from https://youtu.be/7ooZ4S7Ay6Y


 thanks!



>>>
>>


Re: Need help with the configuration for AWS glue jobs

2022-06-23 Thread Gourav Sengupta
Please use EMR, Glue is not made for heavy processing jobs.

On Thu, Jun 23, 2022 at 6:36 AM Sid  wrote:

> Hi Team,
>
> Could anyone help me in the below problem:
>
>
> https://stackoverflow.com/questions/72724999/how-to-calculate-number-of-g-1-workers-in-aws-glue-for-processing-1tb-data
>
> Thanks,
> Sid
>


Re: input file size

2022-06-19 Thread Gourav Sengupta
Hi,

Just so that we understand the intention, why do you need to know the
file size? Are you not using a splittable file format?

If you use spark streaming to read the files, using just once, then you
will be able to get the metadata of the files I believe.



Regards,
Gourav Sengupta

On Sun, Jun 19, 2022 at 8:00 AM Enrico Minack 
wrote:

> Given you already know your input files (input_file_name), why not getting
> their size and summing this up?
>
> import java.io.File
> import java.net.URI
> import org.apache.spark.sql.functions.{input_file_name, sum}
>
> ds.select(input_file_name.as("filename"))
>   .distinct.as[String]
>   .map(filename => new File(new URI(filename).getPath).length)
>   .select(sum($"value"))
>   .show()
>
>
> Enrico
>
>
> On 19.06.22 at 03:16, Yong Walt wrote:
>
> import java.io.File
> val someFile = new File("somefile.txt")
> val fileSize = someFile.length
>
> This one?
>
>
> On Sun, Jun 19, 2022 at 4:33 AM mbreuer  wrote:
>
>> Hello Community,
>>
>> I am working on optimizations for file sizes and number of files. In the
>> data frame there is a function input_file_name which returns the file
>> name. I miss a counterpart to get the size of the file. Just the size,
>> like "ls -l" returns. Is there something like that?
>>
>> Kind regards,
>> Markus
>>
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>
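
The same "distinct file names, then sum the sizes" idea from this thread can be tried out in plain Python on local files. This is a sketch under the assumption that the input paths are local `file://` URIs on a POSIX system; object stores such as S3 or GCS need their own client to look up sizes. `total_input_size` is a hypothetical helper.

```python
import os
import tempfile
from pathlib import Path
from urllib.parse import unquote, urlparse


def total_input_size(file_uris):
    """Sum the on-disk size in bytes of the distinct local files behind the URIs."""
    paths = {unquote(urlparse(uri).path) for uri in file_uris}
    return sum(os.path.getsize(path) for path in paths)


with tempfile.TemporaryDirectory() as tmp:
    first = Path(tmp, "part-0.bin")
    second = Path(tmp, "part-1.bin")
    first.write_bytes(b"x" * 10)
    second.write_bytes(b"y" * 5)
    # The same file name appears once per row in practice, hence the duplicate.
    uris = [first.as_uri(), second.as_uri(), first.as_uri()]
    print(total_input_size(uris))  # 15
```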


Re: Redesign approach for hitting the APIs using PySpark

2022-06-13 Thread Gourav Sengupta
Hi,

>> spark.range(1).createOrReplaceTempView("test")
>> maximum_records_per_api_call = 40
>> batch_count = max(1, spark.sql("SELECT * FROM test").count() // maximum_records_per_api_call)
>> spark.sql(f"SELECT id, mod(monotonically_increasing_id(), {batch_count}) AS batch_id FROM test").repartitionByRange("batch_id").createOrReplaceTempView("test_batch")


the above code should then be runnable with a udf, as long as we are
able to control the parallelism with the help of the executor count and task
CPU configuration.

But once again, this is just an unnecessary overkill.


Regards,
Gourav Sengupta
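
The modulus idea above can be tried outside Spark first. The sketch below assigns each record id to a batch with the `%` operator; `assign_batches` is a hypothetical helper. It spreads records evenly only when the ids are roughly consecutive, which is an assumption that monotonically increasing ids in Spark do not guarantee.

```python
from collections import defaultdict


def assign_batches(record_ids, batch_count):
    """Group record ids into batch_count buckets keyed by id % batch_count."""
    if batch_count < 1:
        raise ValueError("batch_count must be positive")
    batches = defaultdict(list)
    for record_id in record_ids:
        batches[record_id % batch_count].append(record_id)
    return dict(batches)


print(assign_batches(range(10), 3))
# {0: [0, 3, 6, 9], 1: [1, 4, 7], 2: [2, 5, 8]}
```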

On Mon, Jun 13, 2022 at 10:41 AM Sid  wrote:

> Hi Gourav,
>
> Could you please provide me with some examples?
>
> On Mon, Jun 13, 2022 at 2:23 PM Gourav Sengupta 
> wrote:
>
>> Hi,
>>
>> try to use mod of a monotonically increasing field and then use
>> repartitionbyrange function, and see whether SPARK automatically serialises
>> it based on the number of executors that you put in the job.
>>
>> But once again, this is kind of an overkill, for fetching data from a
>> API, creating a simple python program works quite well.
>>
>>
>> Regards,
>> Gourav
>>
>> On Mon, Jun 13, 2022 at 9:28 AM Sid  wrote:
>>
>>> Hi Gourav,
>>>
>>> Do you have any examples or links, please? That would help me to
>>> understand.
>>>
>>> Thanks,
>>> Sid
>>>
>>> On Mon, Jun 13, 2022 at 1:42 PM Gourav Sengupta <
>>> gourav.sengu...@gmail.com> wrote:
>>>
>>>> Hi,
>>>> I think that serialising data using spark is an overkill, why not use
>>>> normal python.
>>>>
>>>> Also have you tried repartition by range, that way you can use modulus
>>>> operator to batch things up?
>>>>
>>>> Regards,
>>>> Gourav
>>>>
>>>>
>>>> On Mon, Jun 13, 2022 at 8:37 AM Sid  wrote:
>>>>
>>>>> Hi Team,
>>>>>
>>>>> I am trying to hit the POST APIs for the very first time using Pyspark.
>>>>>
>>>>> My end goal is to achieve is something like the below:
>>>>>
>>>>>
>>>>>1.  Generate the data
>>>>>2. Send the data in the batch of 4k records in one batch since the
>>>>>API can accept the 4k records at once.
>>>>>3. The record would be as the below:
>>>>>4.
>>>>>
>>>>>{
>>>>>"Token": "",
>>>>>"CustomerName": "",
>>>>>"Object": "",
>>>>>"Data": [{"A":"1"},{"A":"2"}]
>>>>>}
>>>>>
>>>>>5. Token will be generated first then it would be passed to the
>>>>>'Token' key in the data.
>>>>>
>>>>> For the above goal, I initially wrote something like the below which
>>>>> gives a heap error because the data frame is getting created on the driver
>>>>> side, and the size of the records is a minimum of 1M.
>>>>>df = modifiedData # Assume it to be query results stored as
>>>>> a DF
>>>>>
>>>>> df = df.withColumn("uniqueID", lit("1"))
>>>>>
>>>>> df = df.withColumn("row_num", row_number().over(
>>>>>
>>>>> Window.partitionBy(col("uniqueID")).orderBy(col("uniqueID"))
>>>>> ))
>>>>> tokenUrl = ""
>>>>> # tokenUrl = ""
>>>>> policyUrl = ""
>>>>> tokenBody = {"Username": "", "Password": "",
>>>>> "CustomerName": ""}
>>>>>
>>>>> def get_token(url, payload):
>>>>> try:
>>>>> print("Getting Token")
>>>>> response = requests.request("POST", url,
>>>>> data=payload)
>>>>> data = response.json()
>>>>> if data['ErrorDescription'] == 'Success':
>>>>> token = data['Token']
>>>>> 

Re: Redesign approach for hitting the APIs using PySpark

2022-06-13 Thread Gourav Sengupta
Hi,

try to use mod of a monotonically increasing field and then use
repartitionbyrange function, and see whether SPARK automatically serialises
it based on the number of executors that you put in the job.

But once again, this is kind of an overkill, for fetching data from a API,
creating a simple python program works quite well.


Regards,
Gourav

On Mon, Jun 13, 2022 at 9:28 AM Sid  wrote:

> Hi Gourav,
>
> Do you have any examples or links, please? That would help me to
> understand.
>
> Thanks,
> Sid
>
> On Mon, Jun 13, 2022 at 1:42 PM Gourav Sengupta 
> wrote:
>
>> Hi,
>> I think that serialising data using spark is an overkill, why not use
>> normal python.
>>
>> Also have you tried repartition by range, that way you can use modulus
>> operator to batch things up?
>>
>> Regards,
>> Gourav
>>
>>
>> On Mon, Jun 13, 2022 at 8:37 AM Sid  wrote:
>>
>>> Hi Team,
>>>
>>> I am trying to hit the POST APIs for the very first time using Pyspark.
>>>
>>> My end goal is to achieve is something like the below:
>>>
>>>
>>>1.  Generate the data
>>>2. Send the data in the batch of 4k records in one batch since the
>>>API can accept the 4k records at once.
>>>3. The record would be as the below:
>>>4.
>>>
>>>{
>>>"Token": "",
>>>"CustomerName": "",
>>>"Object": "",
>>>"Data": [{"A":"1"},{"A":"2"}]
>>>}
>>>
>>>5. Token will be generated first then it would be passed to the
>>>'Token' key in the data.
>>>
>>> For the above goal, I initially wrote something like the below which
>>> gives a heap error because the data frame is getting created on the driver
>>> side, and the size of the records is a minimum of 1M.
>>>df = modifiedData # Assume it to be query results stored as a
>>> DF
>>>
>>> df = df.withColumn("uniqueID", lit("1"))
>>>
>>> df = df.withColumn("row_num", row_number().over(
>>>
>>> Window.partitionBy(col("uniqueID")).orderBy(col("uniqueID"))
>>> ))
>>> tokenUrl = ""
>>> # tokenUrl = ""
>>> policyUrl = ""
>>> tokenBody = {"Username": "", "Password": "", "CustomerName":
>>> ""}
>>>
>>> def get_token(url, payload):
>>> try:
>>> print("Getting Token")
>>> response = requests.request("POST", url,
>>> data=payload)
>>> data = response.json()
>>> if data['ErrorDescription'] == 'Success':
>>> token = data['Token']
>>> print(":::Token Generated")
>>> else:
>>> print('TokenNotGeneratedFrom: ')
>>> # raise TokenNotGeneratedFrom(500, 'Token not
>>> Generated from ')
>>> return token
>>> except Exception as e:
>>> print('TokenNotGeneratedFrom: ' + str(e))
>>> # raise TokenNotGeneratedFrom(500, str(e))
>>>
>>> def call_to_cust_bulk_api(url, payload):
>>> print("Calling Bulk API")
>>> try:
>>> # TODO: write code...
>>> headers = {'content-type': 'application/json'}
>>> print(":::jsn load")
>>> # print(json.dumps(payload))
>>> # print(payload)
>>> response = requests.post(url,
>>> data=json.dumps(payload), headers=headers)
>>> # print(json.dumps(payload))
>>> data = response.json()
>>> return data
>>> except Exception as e:
>>> print('ExceptionInPushingDataTo: ' + str(e))
>>> # raise ExceptionInPushingDataTo(500, str(e))
>>>
>>> total_count = df.count()
>>> 

Re: Redesign approach for hitting the APIs using PySpark

2022-06-13 Thread Gourav Sengupta
Hi,
I think that serialising data using spark is an overkill, why not use
normal python.

Also have you tried repartition by range, that way you can use modulus
operator to batch things up?

Regards,
Gourav


On Mon, Jun 13, 2022 at 8:37 AM Sid  wrote:

> Hi Team,
>
> I am trying to hit the POST APIs for the very first time using Pyspark.
>
> My end goal is to achieve something like the below:
>
>
>    1. Generate the data
>    2. Send the data in batches of 4k records, since the API
>    can accept 4k records at once.
>    3. The record would be as below:
>    4.
>
>    {
>    "Token": "",
>    "CustomerName": "",
>    "Object": "",
>    "Data": [{"A":"1"},{"A":"2"}]
>    }
>
>    5. Token will be generated first then it would be passed to the
>    'Token' key in the data.
>
> For the above goal, I initially wrote something like the below which gives
> a heap error because the data frame is getting created on the driver side,
> and the size of the records is a minimum of 1M.
> df = modifiedData  # Assume it to be query results stored as a DF
>
> df = df.withColumn("uniqueID", lit("1"))
>
> df = df.withColumn("row_num", row_number().over(
>     Window.partitionBy(col("uniqueID")).orderBy(col("uniqueID"))
> ))
> tokenUrl = ""
> # tokenUrl = ""
> policyUrl = ""
> tokenBody = {"Username": "", "Password": "", "CustomerName": ""}
>
> def get_token(url, payload):
>     try:
>         print("Getting Token")
>         response = requests.request("POST", url, data=payload)
>         data = response.json()
>         if data['ErrorDescription'] == 'Success':
>             token = data['Token']
>             print(":::Token Generated")
>         else:
>             print('TokenNotGeneratedFrom: ')
>             # raise TokenNotGeneratedFrom(500, 'Token not Generated from ')
>         return token
>     except Exception as e:
>         print('TokenNotGeneratedFrom: ' + str(e))
>         # raise TokenNotGeneratedFrom(500, str(e))
>
> def call_to_cust_bulk_api(url, payload):
>     print("Calling Bulk API")
>     try:
>         # TODO: write code...
>         headers = {'content-type': 'application/json'}
>         print(":::jsn load")
>         # print(json.dumps(payload))
>         # print(payload)
>         response = requests.post(url, data=json.dumps(payload), headers=headers)
>         # print(json.dumps(payload))
>         data = response.json()
>         return data
>     except Exception as e:
>         print('ExceptionInPushingDataTo: ' + str(e))
>         # raise ExceptionInPushingDataTo(500, str(e))
>
> total_count = df.count()
> i = 1
> while i < total_count:
>     rangeNum = i + 3999
>     print("Range Num:::")
>     print(rangeNum)
>     df1 = df.filter((col("row_num") >= i) & (col("row_num") <= rangeNum))
>     df1.cache()
>     maxValue = df1.select(max(col("row_num")).alias("max_val")).toPandas()['max_val'].iloc[0]
>     finalDF = df1.drop("row_num", "edl_timestamp", "uniqueID")
>     print("finalDF count:::", finalDF.count())
>     token = get_token(tokenUrl, tokenBody)
>
>     result = json.loads((finalDF.toPandas().to_json(orient="records")))
>     # token = get_token(tokenUrl, tokenBody)
>     custRequestBody = {
>         "Token": token,
>         "CustomerName": "",
>         "Object": "",
>         "Data": result
>     }
>
>     # print("Customer Request Body::")
>     # print(json.dumps(custRequestBody))
>     response = call_to_cust_bulk_api(policyUrl, custRequestBody)
>     print(response)
>     finalDFStatus = finalDF.withColumn("edl_timestamp", to_timestamp(lit(F.TimeNow()))).withColumn(
>         "status_for_each_batch", lit(str(response)))
>
>     print("Max Value:::")
>     print(maxValue)
>     print("Next I:::")
>     i = rangeNum + 1
>     print(i)
>
> This is my very first approach to hitting the APIs with Spark. So, could
> anyone please help me to redesign the approach, or can share some links or
> references using which I can go to the depth of this and rectify myself.
> How can I scale this?
>
>
> Any help is much appreciated.
>
> TIA,
> Sid
>
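
For reference, the batch boundaries used by the loop in the quoted message can be computed separately from the Spark and HTTP calls. The sketch below is a minimal illustration in plain Python; `batch_ranges` is a hypothetical helper name, and the batch size of 4000 is taken from the `i + 3999` step in the quoted code. It loops while `start <= total_count`: with the quoted condition `i < total_count`, a final batch that holds a single row (for example when the count is 4001, or just 1) would never be sent.

```python
def batch_ranges(total_count, batch_size=4000):
    """Yield inclusive (start, end) row_num ranges covering 1..total_count."""
    start = 1
    while start <= total_count:
        # Clamp the last batch so it never runs past the final row.
        end = min(start + batch_size - 1, total_count)
        yield start, end
        start = end + 1


# Example: 4001 rows need two batches, the second holding only row 4001.
ranges = list(batch_ranges(4001))
```

Each `(start, end)` pair can then be used in the `row_num` filter in place of `i` and `rangeNum`.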


Re: Job migrated from EMR to Dataproc takes 20 hours instead of 90 minutes

2022-05-31 Thread Gourav Sengupta
Hi,
just to elaborate on what Ranadip has correctly pointed out here: a gzip file
can only be read by a single task, whereas a bzip2 file is splittable and can
be read by several tasks in parallel, so reading it is faster.

Try using bzip2 for Kafka Connect.
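
As a rough illustration of why a single large gzip file limits parallelism, the plain-Python sketch below shows that a gzip stream cannot be decoded starting from the middle, while several smaller gzip files (similar to the hourly partitioning mentioned in this thread) can each be decoded independently. The thread pool only stands in for parallel Spark tasks; the record contents, the chunk size and the helper name `can_decode_from_offset` are assumptions made for the example.

```python
import gzip
import zlib
from concurrent.futures import ThreadPoolExecutor


def can_decode_from_offset(blob, offset):
    """Return True if gzip decoding can start at `offset` inside `blob`."""
    try:
        gzip.decompress(blob[offset:])
        return True
    except (OSError, EOFError, zlib.error):
        return False


records = [f"event-{n}\n".encode() for n in range(10_000)]

# One big file: a reader has to start at byte 0 and read it all.
one_big_file = gzip.compress(b"".join(records))

# Several smaller files: each one can be handed to a different reader.
chunk = 2_500
small_files = [
    gzip.compress(b"".join(records[i:i + chunk]))
    for i in range(0, len(records), chunk)
]

with ThreadPoolExecutor(max_workers=4) as pool:
    parts = list(pool.map(gzip.decompress, small_files))
```

With a single gzip file the number of readers is capped at one; with four files, four readers can work at the same time.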

Regards,
Gourav Sengupta

On Mon, May 30, 2022 at 10:06 PM Ranadip Chatterjee 
wrote:

> Gzip files are not splittable. Hence using very large (i.e. non
> partitioned) gzip files leads to contention when reading the files, as readers
> cannot scale beyond the number of gzip files to read.
>
> Better to use a splittable compression format instead to allow frameworks
> to scale up. Or manually manage scaling by using partitions, as you are
> doing now.
>
> On Mon, 30 May 2022, 08:54 Ori Popowski,  wrote:
>
>> Thanks.
>>
>> Eventually the problem was solved. I am still not 100% sure what caused
>> it but when I said the input was identical I simplified a bit because it
>> was not (sorry for misleading, I thought this information would just be
>> noise). Explanation: the input to the EMR job was gzips created by Firehose
>> and partitioned hourly. The input to the Dataproc job is gzips created by
>> Kafka Connect and is not partitioned hourly. Otherwise the *content* itself
>> is identical.
>>
>> When we started partitioning the files hourly the problem went away
>> completely.
>>
>> I am still not sure what's going on exactly. If someone has some insight
>> it's welcome.
>>
>> Thanks!
>>
>> On Fri, May 27, 2022 at 9:45 PM Aniket Mokashi 
>> wrote:
>>
>>> +cloud-dataproc-discuss
>>>
>>> On Wed, May 25, 2022 at 12:33 AM Ranadip Chatterjee 
>>> wrote:
>>>
>>>> To me, it seems like the data being processed on the 2 systems is not
>>>> identical. Can't think of any other reason why the single task stage will
>>>> get a different number of input records in the 2 cases. 700gb of input to a
>>>> single task is not good, and seems to be the bottleneck.
>>>>
>>>> On Wed, 25 May 2022, 06:32 Ori Popowski,  wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Both jobs use spark.dynamicAllocation.enabled so there's no need to
>>>>> change the number of executors. There are 702 executors in the Dataproc
>>>>> cluster so this is not the problem.
>>>>> About number of partitions - this I didn't change and it's still 400.
>>>>> While writing this now, I am realising that I have more partitions than
>>>>> executors, but the same situation applies to EMR.
>>>>>
>>>>> I am observing 1 task in the final stage also on EMR. The difference
>>>>> is that on EMR that task receives 50K volume of data and on Dataproc it
>>>>> receives 700gb. I don't understand why it's happening. It can mean that
>>>>> the graph is different. But the job is exactly the same. Could it be
>>>>> because the minor version of Spark is different?
>>>>>
>>>>> On Wed, May 25, 2022 at 12:27 AM Ranadip Chatterjee <
>>>>> ranadi...@gmail.com> wrote:
>>>>>
>>>>>> Hi Ori,
>>>>>>
>>>>>> A single task for the final step can result from various scenarios
>>>>>> like an aggregate operation that results in only 1 value (e.g count) or a
>>>>>> key based aggregate with only 1 key for example. There could be other
>>>>>> scenarios as well. However, that would be the case in both EMR and
>>>>>> Dataproc if the same code is run on the same data in both cases.
>>>>>>
>>>>>> On a separate note, since you have now changed the size and number of
>>>>>> nodes, you may need to re-optimize the number and size of executors for
>>>>>> the job and perhaps the number of partitions as well to optimally use the
>>>>>> cluster resources.
>>>>>>
>>>>>> Regards,
>>>>>> Ranadip
>>>>>>
>>>>>> On Tue, 24 May 2022, 10:45 Ori Popowski,  wrote:
>>>>>>
>>>>>>> Hello
>>>>>>>
>>>>>>> I migrated a job from EMR with Spark 2.4.4 to Dataproc with Spark
>>>>>>> 2.4.8. I am creating a cluster with the exact same configuration, where
>>>>>>> the only difference is that the original cluster u

Re: Complexity with the data

2022-05-26 Thread Gourav Sengupta
Hi,
can you please give us a simple map of what the input is and what the
output should look like? From your description it is a bit difficult to
figure out exactly how you want the records parsed.


Regards,
Gourav Sengupta

On Wed, May 25, 2022 at 9:08 PM Sid  wrote:

> Hi Experts,
>
> I have below CSV data that is getting generated automatically. I can't
> change the data manually.
>
> The data looks like below:
>
> 2020-12-12,abc,2000,,INR,
> 2020-12-09,cde,3000,he is a manager,DOLLARS,nothing
> 2020-12-09,fgh,,software_developer,I only manage the development part.
>
> Since I don't have much experience with the other domains.
>
> It is handled by the other people.,INR
> 2020-12-12,abc,2000,,USD,
>
> The third record is a problem, since the user split the value across new
> lines while filling in the form. So, how do I handle this?
>
> There are 6 columns and 4 records in total. These are the sample records.
>
> Should I load it as an RDD and then eliminate the new lines, maybe using a
> regex? Or how should it be done? With ". /n"?
>
> Any suggestions?
>
> Thanks,
> Sid
>
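
One possible way to repair records like the third one in the quoted sample is to keep appending physical lines until a record has the expected number of fields. The plain-Python sketch below assumes 6 columns (as stated in the question), that the free-text values contain no commas of their own, and that blank lines can be dropped; `stitch_records` is a hypothetical helper name, not a Spark API. If the file used quoted fields, a CSV reader with multi-line support would be the more natural choice.

```python
EXPECTED_FIELDS = 6  # column count stated in the question


def stitch_records(lines, expected_fields=EXPECTED_FIELDS):
    """Merge physical lines until each record has `expected_fields` fields."""
    records = []
    buffer = ""
    for raw in lines:
        line = raw.strip()
        if not line:
            continue  # blank lines inside a broken value carry no data
        buffer = f"{buffer} {line}" if buffer else line
        # A complete record has exactly expected_fields - 1 separators.
        if buffer.count(",") >= expected_fields - 1:
            records.append(buffer.split(","))
            buffer = ""
    if buffer:
        raise ValueError(f"incomplete trailing record: {buffer!r}")
    return records


sample = """2020-12-12,abc,2000,,INR,
2020-12-09,cde,3000,he is a manager,DOLLARS,nothing
2020-12-09,fgh,,software_developer,I only manage the development part.

Since I don't have much experience with the other domains.

It is handled by the other people.,INR
2020-12-12,abc,2000,,USD,
""".splitlines()

rows = stitch_records(sample)
```

The repaired rows can then be written back out as a clean file, or turned into a DataFrame, before any further processing.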


Re: Problem with implementing the Datasource V2 API for Salesforce

2022-05-24 Thread Gourav Sengupta
Hi,

in the spirit of not fitting the solution to the problem, would it not be
better to first create a producer for your job and use a broker like Kafka
or Kinesis or Pulsar?


Regards,
Gourav Sengupta

On Sat, May 21, 2022 at 3:46 PM Rohit Pant  wrote:

> Hi all,
>
> I am trying to implement a custom Spark Datasource for Salesforce by
> implementing the Spark Datasource V2 interfaces. For querying Salesforce
> data parallelly I am planning to use the Salesforce Bulk V1 API
> https://developer.salesforce.com/docs/atlas.en-us.api_asynch.meta/api_asynch/api_asynch_introduction_bulk_api.htm
> .
>
> I am aware that there is an existing Salesforce Spark library by Springml,
> but it doesn't support the authentication types I need and isn't compatible
> with Spark 3.x.
>
> I am not using the Bulk V2 Salesforce API as it is serial in nature. You
> submit a query, Salesforce automatically creates the batches, you then need
> to poll for results and iterate over the batches using batch locators
> returned in the header.
>
> I am planning to make it work like this -
>
>
> 1. The user specifies the options numPartitions and dbtable. Using
>    this, internally I will fetch the record counts for that Salesforce object
>    and deduce the chunk size to be used for querying the data using PK
>    chunking.
>    https://developer.salesforce.com/docs/atlas.en-us.api_asynch.meta/api_asynch/asynch_api_code_curl_walkthrough_pk_chunking.htm
> 2. The Bulk API is asynchronous. You need to create a Job and then
>    create query batches within the job. If you specify that you want to use PK
>    Chunking along with the chunk size you want then Salesforce automatically
>    creates the batches for you. You then need to poll for and fetch the
>    results for each batch using a batch-specific URL.
> 3. I am planning to pass the batch ID to each executor using an
>    InputPartition object. Each executor will then poll for and fetch the
>    results.
>
> I am having trouble deciding how I would go about creating the Bulk job
> and submitting the batches on the driver node before dispatching the batch
> ids to the executors. I tried doing this in the implementation of the
> planInputPartitions method for the Batch interface, but it turns out that
> it is called 2-3 times for each action (show, collect etc.), thus creating
> unnecessary Bulk jobs.
>
> One potential solution that might work is maintaining a set of hashed user
> options in the static scope for the implementation of the Batch interface
> (using a companion object) and only creating the job if it doesn't exist in
> the set. However, I find this solution to be very clumsy. Also, what
> happens if a user submits multiple actions on a dataframe? I could maybe
> also have a TTL for the set entries, but you see how it gets complicated.
>
> Would really appreciate any pointers on the ideal way to achieve what I
> want.
>
> Regards,
>
> Rohit Pant
>
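
To make the "set of hashed user options with a TTL" idea from the quoted message concrete, here is a minimal plain-Python sketch. It is not Spark or Salesforce code: `BulkJobRegistry`, `get_or_create` and the `create_job` callback are hypothetical names, and in a real data source the same logic would live on the driver side (for example in a Scala companion object).

```python
import hashlib
import json
import threading
import time


class BulkJobRegistry:
    """Remembers which option sets already have a job, for a limited time."""

    def __init__(self, ttl_seconds=300.0, clock=time.monotonic):
        self._ttl = ttl_seconds
        self._clock = clock
        self._lock = threading.Lock()
        self._jobs = {}  # options hash -> (created_at, job_id)

    @staticmethod
    def _key(options):
        # Sort keys so that option order does not change the hash.
        canonical = json.dumps(options, sort_keys=True)
        return hashlib.sha256(canonical.encode("utf-8")).hexdigest()

    def get_or_create(self, options, create_job):
        """Return the cached job id, or call `create_job` once per TTL window."""
        key = self._key(options)
        with self._lock:
            now = self._clock()
            entry = self._jobs.get(key)
            if entry is not None and now - entry[0] < self._ttl:
                return entry[1]
            job_id = create_job(options)
            self._jobs[key] = (now, job_id)
            return job_id
```

Repeated planning calls with the same options inside the TTL window then reuse one job; whether a later action should reuse it or start a fresh job is exactly the trade-off the TTL has to encode.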


Re: Spark error with jupyter

2022-05-04 Thread Gourav Sengupta
Hi,
looks like spark listener is not working? Is your session still running?

Check the Spark UI to find out whether the session is still active.

Regards,
Gourav

On Tue, May 3, 2022 at 7:37 PM Bjørn Jørgensen 
wrote:

> I use jupyterlab and spark and I have not seen this before.
>
> Jupyter has a docker stack with pyspark; you
> can try it.
>
> tor. 21. apr. 2022 kl. 11:07 skrev Wassim Yaich :
>
>> Hi Folks,
>> I am working on spark in jupyter but I get a small error on each
>> run.
>> Does anyone have the same error or a solution? Please tell me.
>>
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>
>
> --
> Bjørn Jørgensen
> Vestre Aspehaug 4, 6010 Ålesund
> Norge
>
> +47 480 94 297
>


Re: structured streaming- checkpoint metadata growing indefinetely

2022-04-29 Thread Gourav Sengupta
Hi,

this may not solve the problem, but have you tried stopping the job
gracefully and then restarting it without much delay, pointing to a new
checkpoint location? The approach carries some uncertainty in
scenarios where the source system can lose data, or where we do not expect
duplicates to be committed, etc.

It will be good to know what kind of processing you are doing as well.


Regards,
Gourav

On Fri, Apr 29, 2022 at 8:11 AM Wojciech Indyk 
wrote:

> Update for the scenario of deleting compact files: it recovers from the
> recent (not compacted) checkpoint file, but when it comes to compaction of
> the checkpoint it fails because the most recent compaction file is missing.
> I use Spark 3.1.2
>
> --
> Kind regards/ Pozdrawiam,
> Wojciech Indyk
>
>
> pt., 29 kwi 2022 o 07:00 Wojciech Indyk 
> napisał(a):
>
>> Hello!
>> I use Spark Structured Streaming. I need to use S3 for storing checkpoint
>> metadata (I know, it's not optimal storage for checkpoint metadata).
>> The compaction interval is 10 (default) and I set
>> "spark.sql.streaming.minBatchesToRetain"=5. After the job had been running
>> for a few weeks, checkpointing time increased significantly (causing a
>> delay of a few minutes in processing). I looked at the checkpoint metadata
>> structure. There is one heavy path there: checkpoint/source/0. A single
>> .compact file weighs 25GB. I looked into its content and it contains all
>> entries since batch 0 (the current batch is around 25000). I tried a few
>> parameters to remove already processed data from the compact file, namely:
>> "spark.cleaner.referenceTracking.cleanCheckpoints"=true - does not work.
>> As far as I can see in the code, it relates to the previous version of
>> streaming, doesn't it?
>> "spark.sql.streaming.fileSource.log.deletion"=true and
>> "spark.sql.streaming.fileSink.log.deletion"=true - do not work either.
>> The compact file stores the full history even if all data were processed
>> (except for the most recent checkpoint), so I would expect most of the
>> entries to be deleted. Is there any parameter to remove entries from the
>> compact file, or to remove the compact file gracefully from time to time?
>> Now I am testing a scenario where I stop the job, delete most of the
>> checkpoint/source/0/* files, keeping just a few recent checkpoints (not
>> compacted), and rerun the job. The job recovers correctly from the recent
>> checkpoint. It looks like a possible workaround for my problem, but this
>> scenario with manual deletion of checkpoint files looks ugly, so I would
>> prefer something managed by Spark.
>>
>> --
>> Kind regards/ Pozdrawiam,
>> Wojciech Indyk
>>
>


Re: Dealing with large number of small files

2022-04-27 Thread Gourav Sengupta
Hi,
did that result in valid JSON in the output file?

Regards,
Gourav Sengupta

On Tue, Apr 26, 2022 at 8:18 PM Sid  wrote:

> I have .txt files with JSON inside it. It is generated by some API calls
> by the Client.
>
> On Wed, Apr 27, 2022 at 12:39 AM Bjørn Jørgensen 
> wrote:
>
>> What is that you have? Is it txt files or json files?
>> Or do you have txt files with JSON inside?
>>
>>
>>
>> tir. 26. apr. 2022 kl. 20:41 skrev Sid :
>>
>>> Thanks for your time, everyone :)
>>>
>>> Much appreciated.
>>>
>>> I solved it using the jq utility since I was dealing with JSON, with the
>>> script below:
>>>
>>> find . -name '*.txt' -exec cat '{}' + | jq -s '.' > output.txt
>>>
>>>
>>> Thanks,
>>>
>>> Sid
>>>
>>>
>>> On Tue, Apr 26, 2022 at 9:37 PM Bjørn Jørgensen <
>>> bjornjorgen...@gmail.com> wrote:
>>>
>>>> and the bash script seems to read txt files not json
>>>>
>>>> for f in Agent/*.txt; do cat ${f} >> merged.json;done;
>>>>
>>>>
>>>>
>>>> tir. 26. apr. 2022 kl. 18:03 skrev Gourav Sengupta <
>>>> gourav.sengu...@gmail.com>:
>>>>
>>>>> Hi,
>>>>>
>>>>> Which version of Spark are you using, and where is the data
>>>>> stored?
>>>>>
>>>>> I am not quite sure that just using a bash script will help, because I
>>>>> doubt that concatenating all the files into a single file creates valid
>>>>> JSON.
>>>>>
>>>>> Regards,
>>>>> Gourav
>>>>>
>>>>> On Tue, Apr 26, 2022 at 3:44 PM Sid  wrote:
>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> Can somebody help me with the below problem?
>>>>>>
>>>>>>
>>>>>> https://stackoverflow.com/questions/72015557/dealing-with-large-number-of-small-json-files-using-pyspark
>>>>>>
>>>>>>
>>>>>> Thanks,
>>>>>> Sid
>>>>>>
>>>>>
>>>>
>>>> --
>>>> Bjørn Jørgensen
>>>> Vestre Aspehaug 4, 6010 Ålesund
>>>> Norge
>>>>
>>>> +47 480 94 297
>>>>
>>>
>>
>> --
>> Bjørn Jørgensen
>> Vestre Aspehaug 4, 6010 Ålesund
>> Norge
>>
>> +47 480 94 297
>>
>


Re: Dealing with large number of small files

2022-04-26 Thread Gourav Sengupta
Hi,

Which version of Spark are you using, and where is the data stored?

I am not quite sure that just using a bash script will help, because I doubt
that concatenating all the files into a single file creates valid JSON.

Regards,
Gourav

On Tue, Apr 26, 2022 at 3:44 PM Sid  wrote:

> Hello,
>
> Can somebody help me with the below problem?
>
>
> https://stackoverflow.com/questions/72015557/dealing-with-large-number-of-small-json-files-using-pyspark
>
>
> Thanks,
> Sid
>


Re: Streaming write to orc problem

2022-04-23 Thread Gourav Sengupta
Hi,
can you please take a screenshot and show us the number of records that
the streaming programme is reading from the source? If I am not mistaken, it
should be able to write out records to the output location every 5 seconds,
as set by the trigger.

Also, it may help to check whether you have permissions to write to
the output location.


Thanks and Regards,
Gourav Sengupta

On Fri, Apr 22, 2022 at 3:57 PM hsy...@gmail.com  wrote:

> Hello all,
>
> I’m just trying to build a pipeline reading data from a streaming source
> and write to orc file. But I don’t see any file that is written to the
> file system nor any exceptions
>
> Here is an example
>
> val df = spark.readStream.format("...")
>   .option("Topic", "Some topic")
>   .load()
> val q = df.writeStream.format("orc")
>   .option("path", "gs://testdata/raw")
>   .option("checkpointLocation", "gs://testdata/raw_chk")
>   .trigger(Trigger.ProcessingTime(5, TimeUnit.SECONDS))
>   .start
> q.awaitTermination(120)
> q.stop()
>
>
> I couldn’t find any file until 1200 seconds are over.
> Does it mean all the data is cached in memory? If I keep the pipeline
> running, I see that no file is flushed to the file system.
>
> How do I control how often spark streaming write to disk?
>
> Thanks!
>


Re: Question about bucketing and custom partitioners

2022-04-11 Thread Gourav Sengupta
Hi,
have you checked the skew settings in Spark 3.2?

I am also not quite sure why you need a custom partitioner. While RDDs
remain a valid option, it is worth exploring the more recent ways of
framing solutions in Spark.

Regards,
Gourav Sengupta

On Mon, Apr 11, 2022 at 4:47 PM David Diebold 
wrote:

> Hello,
>
> I have a few questions related to bucketing and custom partitioning in
> dataframe api.
>
> I am considering bucketing to perform one-side free shuffle join in
> incremental jobs, but there is one thing that I'm not happy with.
> Data is likely to grow/skew over time. At some point, i would need to
> change amount of buckets which would provoke shuffle.
>
> Instead of this, I would like to use a custom partitioner, that would
> replace shuffle by narrow transformation.
> That is something that was feasible with RDD developer api. For example, I
> could use such partitioning scheme:
> partition_id = (nb_partitions-1) * ( hash(column) - Int.minValue) /
> (Int.maxValue - Int.minValue)
> When I multiply amount of partitions by 2 each new partition depends only
> on one partition from parent (=> narrow transformation)
>
> So, here are my questions :
>
> 1/ Is it possible to use custom partitioner when saving a dataframe with
> bucketing ?
> 2/ Still with the API dataframe, is it possible to apply custom
> partitioner to a dataframe ?
> Is it possible to repartition the dataframe with a narrow
> transformation like what could be done with RDD ?
> Is there some sort of dataframe developer API ? Do you have any
> pointers on this ?
>
> Thanks !
> David
>
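
The narrow-dependency property described in the quoted message can be checked with a few lines of plain Python. The sketch below is an assumption-laden variant of the quoted formula, not the author's exact scheme: it multiplies by `nb_partitions` over a half-open hash range instead of by `nb_partitions-1`, because with the `nb_partitions-1` factor the partition boundaries for 4 and 8 partitions (multiples of 1/3 and 1/7 of the range) do not line up. The names `partition_id` and `parent_partition` are hypothetical.

```python
INT_MIN = -2**31
INT_MAX = 2**31 - 1
HASH_SPACE = INT_MAX - INT_MIN + 1  # number of distinct 32-bit hash values


def partition_id(hash_value, nb_partitions):
    """Map a 32-bit hash to a partition by splitting the hash range evenly."""
    return (hash_value - INT_MIN) * nb_partitions // HASH_SPACE


def parent_partition(child_id):
    """Partition that `child_id` was carved out of before doubling."""
    return child_id // 2
```

With this layout, every partition obtained after doubling the count is contained in exactly one partition of the previous layout, which is the property a narrow transformation needs.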


Re: Spark 3.0.1 and spark 3.2 compatibility

2022-04-08 Thread Gourav Sengupta
Hi,
absolutely agree with Sean. Besides that, please also see the release notes
for the Spark versions; they mention any known compatibility issues.

Regards,
Gourav

On Thu, Apr 7, 2022 at 6:32 PM Sean Owen  wrote:

> (Don't cross post please)
> Generally you definitely want to compile and test vs what you're running
> on.
> There shouldn't be many binary or source incompatibilities -- these are
> avoided in a major release where possible. So it may need no code change.
> But I would certainly recompile just on principle!
>
> On Thu, Apr 7, 2022 at 12:28 PM Pralabh Kumar 
> wrote:
>
>> Hi spark community
>>
>> I have quick question .I am planning to migrate from spark 3.0.1 to spark
>> 3.2.
>>
>> Do I need to recompile my application with 3.2 dependencies or
>> application compiled with 3.0.1 will work fine on 3.2 ?
>>
>>
>> Regards
>> Pralabh kumar
>>
>>


Re: loop of spark jobs leads to increase in memory on worker nodes and eventually faillure

2022-04-06 Thread Gourav Sengupta
Hi,
super duper.

Please try to see if you can write out the data to S3, and then write a
load script to load that data from S3 to HBase.


Regards,
Gourav Sengupta


On Wed, Apr 6, 2022 at 4:39 PM Joris Billen 
wrote:

> HI,
> thanks for your reply.
>
>
> I believe I have found the issue: the job writes data to hbase which is on
> the same cluster.
> When I keep on processing data and writing with spark to hbase,
> eventually the garbage collection cannot keep up anymore for hbase, and
> the hbase memory consumption increases. As the cluster hosts both hbase
> and spark, this leads to an overall increase and at some point you hit the
> limit of the available memory on each worker.
> I don't think the spark memory is increasing over time.
>
>
>
> Here more details:
>
> **Spark: 2.4
> **operation: many spark sql statements followed by writing data to a nosql
> db from spark
> like this:
> df=read(fromhdfs)
> df2=spark.sql(using df 1)
> ..df10=spark.sql(using df9)
> spark.sql(CACHE TABLE df10)
> df11 =spark.sql(using df10)
> df11.write
> df12 =spark.sql(using df10)
> df12.write
> df13 =spark.sql(using df10)
> df13.write
> **caching: yes one df that I will use to eventually write 3 x to a db
> (those 3 are different)
> **Loops: since I need to process several years, and processing 1 day is
> already a complex process (40 minutes on 9 node cluster running quite a bit
> of executors). So in the end it will do all at one go and there is a limit
> of how much data I can process in one go with the available resources.
> Some people here pointed out they believe this looping should not be
> necessary. But what is the alternative?
> —> Maybe I can write to disk somewhere in the middle, and read again from
> there so that in the end not all must happen in one go in memory.
>
>
>
>
>
>
>
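
The "loop over days and submit the same job for each chunk" approach discussed in this thread can be sketched as follows. This is plain Python that only computes the date windows and builds an argument list; `day_windows`, `spark_submit_args` and the script name `process_days.py` are hypothetical, and the two positional dates are assumed to be arguments that the job script itself parses.

```python
from datetime import date, timedelta


def day_windows(start, end, days_per_run):
    """Yield inclusive (first_day, last_day) windows covering start..end."""
    current = start
    while current <= end:
        last = min(current + timedelta(days=days_per_run - 1), end)
        yield current, last
        current = last + timedelta(days=1)


def spark_submit_args(first_day, last_day, script="process_days.py"):
    """Build the argument list for one run (hypothetical script and args)."""
    return ["spark-submit", script, first_day.isoformat(), last_day.isoformat()]


windows = list(day_windows(date(2022, 1, 1), date(2022, 1, 5), 2))
commands = [spark_submit_args(first, last) for first, last in windows]
```

Each entry in `commands` could then be run one after another, for example with `subprocess.run`, so that every run starts with a fresh Spark application.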
> On 5 Apr 2022, at 14:58, Gourav Sengupta 
> wrote:
>
> Hi,
>
> can you please give details around:
> the Spark version, the operation that you are running, why it runs in loops,
> whether you are caching any data, and whether you are referencing variables
> in order to recreate them, as in the expression x = x + 1, where x is
> referenced to create x?
>
> Thanks and Regards,
> Gourav Sengupta
>
> On Mon, Apr 4, 2022 at 10:51 AM Joris Billen <
> joris.bil...@bigindustries.be> wrote:
>
>> Clear-probably not a good idea.
>>
>> But a previous comment said “you are doing everything in the end in one
>> go”.
>> So this made me wonder: in case your only action is a write in the end
>> after lots of complex transformations, then what is the alternative for
>> writing in the end which means doing everything all at once in the end? My
>> understanding is that if there is no need for an action earlier, you will
>> do all at the end, which means there is a limitation to how many days you
>> can process at once. And hence the solution is to loop over a couple days,
>> and submit always the same spark job just for other input.
>>
>>
>> Thanks!
>>
>> On 1 Apr 2022, at 15:26, Sean Owen  wrote:
>>
>> This feels like premature optimization, and not clear it's optimizing,
>> but maybe.
>> Caching things that are used once is worse than not caching. It looks
>> like a straight-line through to the write, so I doubt caching helps
>> anything here.
>>
>> On Fri, Apr 1, 2022 at 2:49 AM Joris Billen <
>> joris.bil...@bigindustries.be> wrote:
>>
>>> Hi,
>>> as said, thanks for the little discussion over mail.
>>> I understand that the action is triggered in the end at the write and
>>> then all of a sudden everything is executed at once. But I don't really need
>>> to trigger an action before. I am caching somewhere a df that will be
>>> reused several times (slightly updated pseudocode below).
>>>
>>> Question: is it then better practice to already trigger some actions on
>>> intermediate data frames (like df4 and df8), and cache them? So that these
>>> actions will not be that expensive yet, and the actions to write at the end
>>> will require fewer resources, which would allow processing more days in one
>>> go? Like what is added in red in the improvement section in the pseudocode
>>> below?
>>>
>>>
>>>
>>> *pseudocode:*
>>>
>>>
>>> *loop over all days:*
>>> *spark submit 1 day*
>>>
>>>
>>>
>>> with spark submit (overly simplified)=
>>>
>>>
>>> *  df=spark.read(hfs://somepath)*
>>> *  …*
>>> *   ##IMPROVEMENT START*

Re: loop of spark jobs leads to increase in memory on worker nodes and eventually faillure

2022-04-05 Thread Gourav Sengupta
Hi,

can you please give details around:
the Spark version, the operation that you are running, why it runs in loops,
whether you are caching any data, and whether you are referencing variables
in order to recreate them, as in the expression x = x + 1, where x is
referenced to create x?

Thanks and Regards,
Gourav Sengupta

On Mon, Apr 4, 2022 at 10:51 AM Joris Billen 
wrote:

> Clear-probably not a good idea.
>
> But a previous comment said “you are doing everything in the end in one
> go”.
> So this made me wonder: in case your only action is a write in the end
> after lots of complex transformations, then what is the alternative for
> writing in the end which means doing everything all at once in the end? My
> understanding is that if there is no need for an action earlier, you will
> do all at the end, which means there is a limitation to how many days you
> can process at once. And hence the solution is to loop over a couple days,
> and submit always the same spark job just for other input.
>
>
> Thanks!
>
> On 1 Apr 2022, at 15:26, Sean Owen  wrote:
>
> This feels like premature optimization, and not clear it's optimizing, but
> maybe.
> Caching things that are used once is worse than not caching. It looks like
> a straight-line through to the write, so I doubt caching helps anything
> here.
>
> On Fri, Apr 1, 2022 at 2:49 AM Joris Billen 
> wrote:
>
>> Hi,
>> as said, thanks for the little discussion over mail.
>> I understand that the action is triggered in the end at the write and
>> then all of a sudden everything is executed at once. But I don't really need
>> to trigger an action before. I am caching somewhere a df that will be
>> reused several times (slightly updated pseudocode below).
>>
>> Question: is it then better practice to already trigger some actions on
>> intermediate data frames (like df4 and df8), and cache them? So that these
>> actions will not be that expensive yet, and the actions to write at the end
>> will require fewer resources, which would allow processing more days in one
>> go? Like what is added in red in the improvement section in the pseudocode
>> below?
>>
>>
>>
>> *pseudocode:*
>>
>>
>> *loop over all days:*
>> *spark submit 1 day*
>>
>>
>>
>> with spark submit (overly simplified)=
>>
>>
>> *  df=spark.read(hfs://somepath)*
>> *  …*
>> *   ##IMPROVEMENT START*
>> *   df4=spark.sql(some stuff with df3)*
>> *   spark.sql(CACHE TABLE df4)*
>> *   …*
>> *   df8=spark.sql(some stuff with df7)*
>> *   spark.sql(CACHE TABLE df8)*
>> *  ##IMPROVEMENT END*
>> *   ...*
>> *   df12=df11.spark.sql(complex stufff)*
>> *  spark.sql(CACHE TABLE df10)*
>> *   ...*
>> *  df13=spark.sql( complex stuff with df12)*
>> *  df13.write *
>> *  df14=spark.sql( some other complex stuff with df12)*
>> *  df14.write *
>> *  df15=spark.sql( some completely other complex stuff with df12)*
>> *  df15.write *
>>
>>
>>
>>
>>
>>
>> THanks!
>>
>>
>>
>> On 31 Mar 2022, at 14:37, Sean Owen  wrote:
>>
>> If that is your loop unrolled, then you are not doing parts of work at a
>> time. That will execute all operations in one go when the write finally
>> happens. That's OK, but may be part of the problem. For example if you are
>> filtering for a subset, processing, and unioning, then that is just a
>> harder and slower way of applying the transformation to all data at once.
>>
>> On Thu, Mar 31, 2022 at 3:30 AM Joris Billen <
>> joris.bil...@bigindustries.be> wrote:
>>
>>> Thanks for reply :-)
>>>
>>> I am using pyspark. Basically my code (simplified) is:
>>>
>>> df=spark.read.csv(hdfs://somehdfslocation)
>>> df1=spark.sql (complex statement using df)
>>> ...
>>> dfx=spark.sql(complex statement using df x-1)
>>> ...
>>> dfx15.write()
>>>
>>>
>>> What exactly is meant by "closing resources"? Is it just unpersisting
>>> cached dataframes at the end and stopping the spark context explicitly:
>>> sc.stop()?
>>>
>>>
>>> For processing many years at once versus a chunk in a loop: I see that
>>> if I go up to a certain number of days, one iteration will start to have
>>> tasks that fail. So I only take a limited number of days, and do this
>>> process several times. Isn't this normal, as you are always somehow limited
>>> in terms of resources (I have 9 nodes with 32GB)? Or is it like this that
>>> in theory you cou

Re: [Spark SQL] Structured Streaming in pyhton can connect to cassandra ?

2022-03-25 Thread Gourav Sengupta
Hi,

completely agree with Alex. Also, if you are just writing to Cassandra, then
what is the purpose of writing to a Kafka broker?

People often feel that adding more components to their
architecture is great, but sadly it is not. Remove the Kafka broker in case
you are not broadcasting your messages to a wider set of solutions. Also,
Spark is overkill in the way you are using it.

There are fantastic solutions available in the market like Presto SQL,
BigQuery, Redshift, Athena, Snowflake, etc., and Spark is just one of the
tools, and often a difficult one to configure and run.

Regards,
Gourav Sengupta

On Fri, Mar 25, 2022 at 1:19 PM Alex Ott  wrote:

> You don't need to use foreachBatch to write to Cassandra. You just need to
> use Spark Cassandra Connector version 2.5.0 or higher - it supports native
> writing of stream data into Cassandra.
>
> Here is an announcement:
> https://www.datastax.com/blog/advanced-apache-cassandra-analytics-now-open-all
>
> guillaume farcy  at "Mon, 21 Mar 2022 16:33:51 +0100" wrote:
>  gf> Hello,
>
>  gf> I am a student and I am currently doing a big data project.
>  gf> Here is my code:
>  gf> https://gist.github.com/Balykoo/262d94a7073d5a7e16dfb0d0a576b9c3
>
>  gf> My project is to retrieve messages from a twitch chat and send them
> into kafka then spark
>  gf> reads the kafka topic to perform the processing in the provided gist.
>
>  gf> I will want to send these messages into cassandra.
>
>  gf> I tested a first solution on line 72 which works but when there are
> too many messages
>  gf> spark crashes. Probably due to the fact that my function connects to
> cassandra each time
>  gf> it is called.
>
>  gf> I tried the object approach to mutualize the connection object but
> without success:
>  gf> _pickle.PicklingError: Could not serialize object: TypeError: cannot
> pickle
>  gf> '_thread.RLock' object
>
>  gf> Can you please tell me how to do this?
>  gf> Or at least give me some advice?
>
>  gf> Sincerely,
>  gf> FARCY Guillaume.
>
>
>
>
>
>
> --
> With best wishes,Alex Ott
> http://alexott.net/
> Twitter: alexott_en (English), alexott (Russian)
>
>
>
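
The `cannot pickle '_thread.RLock' object` error quoted above typically appears when an object holding a live connection (and therefore a lock) is captured by a function that Spark has to serialize. One common way around it, sketched below in plain Python, is to ship only the connection settings and open the connection lazily on whichever process ends up using the object. `FakeSession` and `LazyWriter` are hypothetical stand-ins for illustration; they are not part of any Cassandra driver or of Spark.

```python
import threading


class FakeSession:
    """Stand-in for a driver session; like many real ones it holds a lock."""

    def __init__(self, host):
        self.host = host
        self._lock = threading.RLock()  # this is what makes it unpicklable

    def execute(self, statement):
        with self._lock:
            return f"{self.host}: {statement}"


class LazyWriter:
    """Carries only picklable settings; opens the session on first use."""

    def __init__(self, host):
        self.host = host
        self._session = None

    def __getstate__(self):
        # Drop the live session so only the settings are serialized.
        state = self.__dict__.copy()
        state["_session"] = None
        return state

    @property
    def session(self):
        if self._session is None:
            self._session = FakeSession(self.host)
        return self._session

    def write(self, row):
        return self.session.execute(f"INSERT {row}")
```

The same idea applies whether the writer is used from a `foreachBatch` function or per partition; as noted in the reply above, the Spark Cassandra Connector removes the need for hand-written connection handling altogether.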


Re: Continuous ML model training in stream mode

2022-03-17 Thread Gourav Sengupta
Dear friends,

a few years ago, I was in a London meetup seeing Sean (Owen) demonstrate
how we can try to predict the gender of individuals who are responding to
tweets after accepting privacy agreements, in case I am not wrong.

It was real time, it was spectacular, and it was the presentation that set
me into data science and its applications.

Thanks Sean! :)

Regards,
Gourav Sengupta




On Tue, Mar 15, 2022 at 9:39 PM Artemis User  wrote:

> Thanks Sean!  Well, it looks like we have to abandon our structured
> streaming model and use DStream for this, or do you see a possibility of
> using structured streaming with ml instead of mllib?
>
> On 3/15/22 4:51 PM, Sean Owen wrote:
>
> There is a streaming k-means example in Spark.
> https://spark.apache.org/docs/latest/mllib-clustering.html#streaming-k-means
>
> On Tue, Mar 15, 2022, 3:46 PM Artemis User  wrote:
>
>> Has anyone done any experiments of training an ML model using stream
>> data? especially for unsupervised models?   Any suggestions/references
>> are highly appreciated...
>>
>>
>>
>


Re: Question on List to DF

2022-03-16 Thread Gourav Sengupta
Hi Jayesh,

thanks found your email quite interesting :)


Regards,
Gourav

On Wed, Mar 16, 2022 at 8:02 AM Bitfox  wrote:

> Thank you. that makes sense.
>
> On Wed, Mar 16, 2022 at 2:03 PM Lalwani, Jayesh 
> wrote:
>
>> The toDF function in scala uses a bit of Scala magic that allows you to
>> add methods to existing classes. Here’s a link to explanation
>> https://www.oreilly.com/library/view/scala-cookbook/9781449340292/ch01s11.html
>>
>>
>>
>> In short, you can implement a class that wraps a List and add
>> methods to your list class, and you can implement an implicit converter
>> that converts from List to your class. When the Scala compiler sees that
>> you are calling a function on a List object that doesn’t exist in the List
>> class, it will look for implicit converters that convert the List object to
>> another object that has the function, and will automatically call it.
>>
>> So, if you have a class
>>
>> class MyList(list: List[String]) {
>>   def toDF(colName: String): DataFrame = {
>>     …..
>>   }
>> }
>>
>> and an implicit converter
>> implicit def convertListToMyList(list: List[String]): MyList = {
>>   ….
>> }
>>
>> when you do
>> List("apple","orange","cherry").toDF("fruit")
>>
>>
>>
>> Internally, Scala will generate the code as
>> convertListToMyList(List("apple","orange","cherry")).toDF("fruit")
>>
>>
>>
>>
>>
>> *From: *Bitfox 
>> *Date: *Wednesday, March 16, 2022 at 12:06 AM
>> *To: *"user @spark" 
>> *Subject: *[EXTERNAL] Question on List to DF
>>
>>
>>
>>
>> I am wondering why the list in scala spark can be converted into a
>> dataframe directly?
>>
>>
>>
>> scala> val df = List("apple","orange","cherry").toDF("fruit")
>>
>> *df*: *org.apache.spark.sql.DataFrame* = [fruit: string]
>>
>>
>>
>> scala> df.show
>> +------+
>> | fruit|
>> +------+
>> | apple|
>> |orange|
>> |cherry|
>> +------+
>>
>>
>>
>> I don't think pyspark can convert that as well.
>>
>>
>>
>> Thank you.
>>
>


Re: [EXTERNAL] Re: Need to make WHERE clause compulsory in Spark SQL

2022-03-07 Thread Gourav Sengupta
Hi,

are all users using the same cluster of data proc?

Regards,
Gourav

On Mon, Mar 7, 2022 at 9:28 AM Saurabh Gulati 
wrote:

> Thanks for the response, Gourav.
>
> Queries range from simple to large joins. We expose the data to our
> analytics users so that they can develop their models and they use superset
> as the SQL interface for testing.
>
> Hive-metastore will *not* do a full scan *if* we specify the partitioning
> column.
> But that's something users might/do forget, so we were thinking of
> enforcing a way to make sure people *do* specify partitioning column in
> their queries.
>
> The only way we see for now is to parse the query in Superset to check
> whether the partition column is being used. But we are not sure of a way
> that will work for all types of queries.
>
> For example, we can parse the SQL and check whether
> count(where) == count(partition_column), but this may not work for complex
> queries.
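
As a rough illustration of the query-parsing idea above, the following sketch checks whether the WHERE clause of a single SQL statement mentions a given partition column. It uses only Python's standard library and plain regular expressions, so it is deliberately naive: it does not understand subqueries, CTEs, joins, or comments, which is exactly the "may not work for complex queries" limitation mentioned in the thread. The function name `references_partition_column` and the table and column names in the usage example are invented for this sketch.

```python
import re


def references_partition_column(sql, partition_column):
    """Return True if the text after the first WHERE mentions the column.

    Naive by design: string literals are blanked out first so that a value
    such as 'dt' is not mistaken for the column, but nested queries and
    comments are not handled.
    """
    # Replace every single-quoted literal (with '' escapes) by an empty one.
    cleaned = re.sub(r"'(?:[^']|'')*'", "''", sql)

    # Take everything after the first WHERE keyword, if there is one.
    match = re.search(r"\bwhere\b(.*)", cleaned, flags=re.IGNORECASE | re.DOTALL)
    if match is None:
        return False

    # Look for the column as a whole word inside that remainder.
    pattern = r"\b" + re.escape(partition_column) + r"\b"
    return re.search(pattern, match.group(1), flags=re.IGNORECASE) is not None
```

A gatekeeper in front of the SQL interface could reject a query when, for example, `references_partition_column(query, "dt")` is false. A check that has to cover arbitrary queries would more likely need a real SQL parser or an inspection of the query plan rather than regular expressions.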
>
>
> Regards
> Saurabh
> --
> *From:* Gourav Sengupta 
> *Sent:* 05 March 2022 11:06
> *To:* Saurabh Gulati 
> *Cc:* Mich Talebzadeh ; Kidong Lee <
> mykid...@gmail.com>; user@spark.apache.org 
> *Subject:* Re: [EXTERNAL] Re: Need to make WHERE clause compulsory in
> Spark SQL
>
> Hi,
>
> I completely agree with Saurabh, the use of BQ with SPARK does not make
> sense at all, if you are trying to cut down your costs. I think that costs
> do matter to a few people at the end.
>
> Saurabh, is there any chance you can see what actual queries are hitting
> the thrift server? Using hive metastore is something that I have been doing
> in AWS EMR for the last 5 years and for sure it does not cause full table
> scan.
>
> Hi Sean,
> for some reason, I am not able to receive any emails from the spark user
> group. My account should be a very old one, is there any chance you can
> kindly have a look into it and kindly let me know if there is something
> blocking me? I will be sincerely obliged.
>
> Regards,
> Gourav Sengupta
>
>
> On Tue, Feb 22, 2022 at 3:58 PM Saurabh Gulati
>  wrote:
>
> Hey Mich,
> We use spark 3.2 now. We are using BQ but migrating away because:
>
>    - It's not reflective of our current lake structure with all
>    deltas/history tables/model outputs etc.
>    - It's pretty expensive to load everything in BQ, and essentially it
>    will be a copy of all data in GCS. External tables in BQ didn't work for
>    us. Currently we store only the latest snapshots in BQ. This breaks
>    idempotency of models which need to time travel and run in the past.
>    - We might move to a different cloud provider in future, so we want to
>    be cloud agnostic.
>
> So we need to have an execution engine which has the same overview of data
> as we have in gcs.
> We tried presto but performance was similar and presto didn't support auto
> scaling.
>
> TIA
> Saurabh
> --
> *From:* Mich Talebzadeh 
> *Sent:* 22 February 2022 16:49
> *To:* Kidong Lee ; Saurabh Gulati <
> saurabh.gul...@fedex.com>
> *Cc:* user@spark.apache.org 
> *Subject:* Re: [EXTERNAL] Re: Need to make WHERE clause compulsory in
> Spark SQL
>
> Ok interesting.
>
> I am surprised why you are not using BigQuery and using Hive. My
> assumption is that your Spark is version 3.1.1 with standard GKE on
> auto-scaler. What benefits are you getting from Using Hive here? As you
> have your hive tables on gs buckets, you can easily download your hive
> tables into BigQuery and run spark on BigQuery?
>
> HTH
>
> On Tue, 22 Feb 2022 at 15:34, Saurabh Gulati 
> wrote:
>
> Thanks Sean for your response.
>
> @Mich Talebzadeh  We run all workloads on GKE
> as docker containers. So to answer your questions, Hive is running in a
> container as K8S service and spark thrift-server in another container as a
> service and Superset in a third container.
>
> We use Spark on GKE setup to run thrift-server which spawns workers
> depending on the load. For buckets we use gcs.
>
>
> TIA
> Saurabh
> --
> *From:* Mich Talebzadeh 
> *Sent:* 22 February 2022 16:05
> *To:* Saurabh Gulati 
> *Cc:* user@spark.apache.org 
> *Subject:* [EXTERNAL] Re: Need to make WHERE clause compulsory in Spark
> SQL
>
> Is your hive on prem with external tables in cloud storage?
>
> Where is your spark running from and what cloud buckets are you using?
>
> HTH
>
> On Tue, 22 Feb 2022 at 12:36, Saurabh Gulati
>  wrote:
>
> Hello,
> We are trying to setup Spark as the execut

Re: {EXT} Re: Spark Parquet write OOM

2022-03-05 Thread Gourav Sengupta
Hi Anil,

superb, when I said increase the number of partitions, I was implying
shuffle partitions, because you are doing deduplication. By default
spark.sql.shuffle.partitions is 200, which can create issues in case your
data volume is large.

I always prefer SPARK SQL to SPARK dataframes. And the number of records
per file configuration is listed at the following link as
spark.sql.files.maxRecordsPerFile:
https://spark.apache.org/docs/latest/configuration.html#runtime-sql-configuration
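
To put rough numbers on the advice above, here is a small standard-library Python sketch that estimates how many shuffle partitions are needed to keep each task below a chosen number of rows, and formats the two settings as spark-submit arguments. The settings `spark.sql.shuffle.partitions` and `spark.sql.files.maxRecordsPerFile` are real Spark SQL options; the helper names, the row counts in the usage note, and the assumption that rows are spread evenly across partitions (no skew) are illustrative only.

```python
DEFAULT_SHUFFLE_PARTITIONS = 200  # Spark's default for spark.sql.shuffle.partitions


def shuffle_partitions_for(total_rows, max_rows_per_task):
    """Partitions needed so that no task handles more than max_rows_per_task.

    Assumes rows are distributed evenly; never goes below Spark's default.
    """
    needed = -(-total_rows // max_rows_per_task)  # integer ceiling division
    return max(needed, DEFAULT_SHUFFLE_PARTITIONS)


def spark_conf_args(total_rows, max_rows_per_task, max_records_per_file):
    """Build the matching --conf arguments for spark-submit."""
    partitions = shuffle_partitions_for(total_rows, max_rows_per_task)
    return [
        "--conf", f"spark.sql.shuffle.partitions={partitions}",
        "--conf", f"spark.sql.files.maxRecordsPerFile={max_records_per_file}",
    ]
```

For instance, `spark_conf_args(1_000_000_000, 1_000_000, 500_000)` asks for 1000 shuffle partitions and caps each output file at 500,000 records. Whether that is enough to avoid the OOM depends on row width and skew, which this sketch ignores.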



Regards,
Gourav Sengupta

On Sat, Mar 5, 2022 at 5:09 PM Anil Dasari  wrote:

> I am not sure how to set the records limit. Let me check. I couldn’t find
> parquet row group size configuration in spark.
>
> For now, I increased the number of shuffle partitions to reduce the
> number of records processed per task and avoid OOM.
>
>
>
> Regards,
>
> Anil
>
>
>
> *From: *Gourav Sengupta 
> *Date: *Saturday, March 5, 2022 at 1:59 AM
> *To: *Anil Dasari 
> *Cc: *Yang,Jie(INF) , user@spark.apache.org <
> user@spark.apache.org>
> *Subject: *Re: {EXT} Re: Spark Parquet write OOM
>
> Hi Anil,
>
>
>
> any chance you tried setting the limit on the number of records to be
> written out at a time?
>
>
>
> Regards,
>
> Gourav
>
>
>
> On Thu, Mar 3, 2022 at 3:12 PM Anil Dasari  wrote:
>
> Hi Gourav,
>
> Tried increasing shuffle partitions number and higher executor memory.
> Both didn’t work.
>
>
>
> Regards
>
>
>
> *From: *Gourav Sengupta 
> *Date: *Thursday, March 3, 2022 at 2:24 AM
> *To: *Anil Dasari 
> *Cc: *Yang,Jie(INF) , user@spark.apache.org <
> user@spark.apache.org>
> *Subject: *Re: {EXT} Re: Spark Parquet write OOM
>
> Hi,
>
>
>
> I do not think that you are doing anything particularly concerning
> here.
>
>
>
> There is a setting in SPARK which limits the number of records that we can
> write out at a time; you can try that. The other thing that you can try is
> to ensure that the number of partitions is higher (just like you suggested).
> Let me know how things are going on your end.
>
>
>
>
>
> Regards,
>
> Gourav Sengupta
>
>
>
>
>
> On Thu, Mar 3, 2022 at 8:37 AM Anil Dasari  wrote:
>
> Answers in the context. Thanks.
>
>
>
> *From: *Gourav Sengupta 
> *Date: *Thursday, March 3, 2022 at 12:13 AM
> *To: *Anil Dasari 
> *Cc: *Yang,Jie(INF) , user@spark.apache.org <
> user@spark.apache.org>
> *Subject: *Re: {EXT} Re: Spark Parquet write OOM
>
> Hi Anil,
>
>
>
> I was trying to work out things for a while yesterday, but may need your
> kind help.
>
>
>
> Can you please share the code for the following steps?
>
> ·  Create DF from hive (from step #c)
>
> [AD] sparkSession.table()
>
>
>
> ·  Deduplicate spark DF by primary key
>
> [AD] dataFrame.dropDuplicates()
>
>
>
> ·  Write DF to s3 in parquet format
>
> [AD] dataFrame.write.mode(saveMode).parquet(path)
>
>
>
> ·  Write metadata to s3
>
> [AD] metadata in json written to s3 using aws sdk
>
>
>
> Regards,
>
> Gourav Sengupta
>
>
>
> On Wed, Mar 2, 2022 at 11:25 PM Anil Dasari  wrote:
>
> 2nd attempt..
>
>
>
> Any suggestions to troubleshoot and fix the problem ? thanks in advance.
>
>
>
> Regards,
>
> Anil
>
>
>
> *From: *Anil Dasari 
> *Date: *Wednesday, March 2, 2022 at 7:00 AM
> *To: *Gourav Sengupta , Yang,Jie(INF) <
> yangji...@baidu.com>
> *Cc: *user@spark.apache.org 
> *Subject: *Re: {EXT} Re: Spark Parquet write OOM
>
> Hi Gourav and Yang
>
> Thanks for the response.
>
>
>
> Please find the answers below.
>
>
>
> 1. What is the version of SPARK you are using?
>
> [AD] : Spark 2.4.7 (EMR 5.33.1)
>
>
>
> 2. Are you doing a lot of in-memory transformations like adding columns,
> or running joins, or UDFs thus increasing the size of the data before
> writing out?
>
> [AD] No. Only one new column is added. Our flow is
>
>    a. Read avro data from kafka
>    b. Avro deserialization and add new column to RDD
>    c. Create spark dataframe (DF) against the latest schema (avro evolved
>    schema) and persist to hive (checkpointing)
>    d. Create DF from hive (from step #c)
>    e. Deduplicate spark DF by primary key
>    f. Write DF to s3 in parquet format
>    g. Write metadata to s3
>
>
>
> The failure is from spark batch job
>
>
>
> 3. Is your pipeline going to change or evolve soon, or the data volumes
> going to vary, or particularly increase, over time?
>
> [AD]: Data volume is fixed as it is a batch job.
>
>
>
> 4

Re: [EXTERNAL] Re: Need to make WHERE clause compulsory in Spark SQL

2022-03-05 Thread Gourav Sengupta
Hi,

I completely agree with Saurabh, the use of BQ with SPARK does not make
sense at all, if you are trying to cut down your costs. I think that costs
do matter to a few people at the end.

Saurabh, is there any chance you can see what actual queries are hitting
the thrift server? Using hive metastore is something that I have been doing
in AWS EMR for the last 5 years and for sure it does not cause full table
scan.

Hi Sean,
for some reason, I am not able to receive any emails from the spark user
group. My account should be a very old one, is there any chance you can
kindly have a look into it and kindly let me know if there is something
blocking me? I will be sincerely obliged.

Regards,
Gourav Sengupta


On Tue, Feb 22, 2022 at 3:58 PM Saurabh Gulati
 wrote:

> Hey Mich,
> We use spark 3.2 now. We are using BQ but migrating away because:
>
>    - It's not reflective of our current lake structure with all
>    deltas/history tables/model outputs etc.
>    - It's pretty expensive to load everything in BQ, and essentially it
>    will be a copy of all data in GCS. External tables in BQ didn't work for
>    us. Currently we store only the latest snapshots in BQ. This breaks
>    idempotency of models which need to time travel and run in the past.
>    - We might move to a different cloud provider in future, so we want to
>    be cloud agnostic.
>
> So we need to have an execution engine which has the same overview of data
> as we have in gcs.
> We tried presto but performance was similar and presto didn't support auto
> scaling.
>
> TIA
> Saurabh
> --
> *From:* Mich Talebzadeh 
> *Sent:* 22 February 2022 16:49
> *To:* Kidong Lee ; Saurabh Gulati <
> saurabh.gul...@fedex.com>
> *Cc:* user@spark.apache.org 
> *Subject:* Re: [EXTERNAL] Re: Need to make WHERE clause compulsory in
> Spark SQL
>
> Ok interesting.
>
> I am surprised why you are not using BigQuery and using Hive. My
> assumption is that your Spark is version 3.1.1 with standard GKE on
> auto-scaler. What benefits are you getting from Using Hive here? As you
> have your hive tables on gs buckets, you can easily download your hive
> tables into BigQuery and run spark on BigQuery?
>
> HTH
>
> On Tue, 22 Feb 2022 at 15:34, Saurabh Gulati 
> wrote:
>
> Thanks Sean for your response.
>
> @Mich Talebzadeh  We run all workloads on GKE
> as docker containers. So to answer your questions, Hive is running in a
> container as K8S service and spark thrift-server in another container as a
> service and Superset in a third container.
>
> We use Spark on GKE setup to run thrift-server which spawns workers
> depending on the load. For buckets we use gcs.
>
>
> TIA
> Saurabh
> --
> *From:* Mich Talebzadeh 
> *Sent:* 22 February 2022 16:05
> *To:* Saurabh Gulati 
> *Cc:* user@spark.apache.org 
> *Subject:* [EXTERNAL] Re: Need to make WHERE clause compulsory in Spark
> SQL
>
> Is your hive on prem with external tables in cloud storage?
>
> Where is your spark running from and what cloud buckets are you using?
>
> HTH
>
> On Tue, 22 Feb 2022 at 12:36, Saurabh Gulati
>  wrote:
>
> Hello,
> We are trying to setup Spark as the execution engine for exposing our data
> stored in lake. We have hive metastore running along with Spark thrift
> server and are using Superset as the UI.
>
> We save all tables as External tables in hive metastore with storage being
> on Cloud.
>
> We see that right now when users run a query in Superset SQL Lab it scans
> the whole table. What we want is to limit the data scan by setting
> something like hive.mapred.mode=strict in spark, so that the user gets an
> exception if they don't specify a partition column.
>
> We tried setting spark.hadoop.hive.mapred.mode=strict in
> spark-defaults.conf in thrift server but it still scans the whole table.
> Also tried setting hive.mapred.mode=strict in hive-defaults.conf for
> metastore container.
>
> We use Spark 3.2 with hive-metastore version 3.1.2
>
> Is there a way in spark settings to make it happen?
>
>
> TIA
> Saurabh
>

Re: {EXT} Re: Spark Parquet write OOM

2022-03-05 Thread Gourav Sengupta
Hi Anil,

any chance you tried setting the limit on the number of records to be
written out at a time?

Regards,
Gourav

On Thu, Mar 3, 2022 at 3:12 PM Anil Dasari  wrote:

> Hi Gourav,
>
> Tried increasing shuffle partitions number and higher executor memory.
> Both didn’t work.
>
>
>
> Regards
>
>
>
> *From: *Gourav Sengupta 
> *Date: *Thursday, March 3, 2022 at 2:24 AM
> *To: *Anil Dasari 
> *Cc: *Yang,Jie(INF) , user@spark.apache.org <
> user@spark.apache.org>
> *Subject: *Re: {EXT} Re: Spark Parquet write OOM
>
> Hi,
>
>
>
> I do not think that you are doing anything particularly concerning
> here.
>
>
>
> There is a setting in SPARK which limits the number of records that we can
> write out at a time; you can try that. The other thing that you can try is
> to ensure that the number of partitions is higher (just like you suggested).
> Let me know how things are going on your end.
>
>
>
>
>
> Regards,
>
> Gourav Sengupta
>
>
>
>
>
> On Thu, Mar 3, 2022 at 8:37 AM Anil Dasari  wrote:
>
> Answers in the context. Thanks.
>
>
>
> *From: *Gourav Sengupta 
> *Date: *Thursday, March 3, 2022 at 12:13 AM
> *To: *Anil Dasari 
> *Cc: *Yang,Jie(INF) , user@spark.apache.org <
> user@spark.apache.org>
> *Subject: *Re: {EXT} Re: Spark Parquet write OOM
>
> Hi Anil,
>
>
>
> I was trying to work out things for a while yesterday, but may need your
> kind help.
>
>
>
> Can you please share the code for the following steps?
>
> ·  Create DF from hive (from step #c)
>
> [AD] sparkSession.table()
>
>
>
> ·  Deduplicate spark DF by primary key
>
> [AD] dataFrame.dropDuplicates()
>
>
>
> ·  Write DF to s3 in parquet format
>
> [AD] dataFrame.write.mode(saveMode).parquet(path)
>
>
>
> ·  Write metadata to s3
>
> [AD] metadata in json written to s3 using aws sdk
>
>
>
> Regards,
>
> Gourav Sengupta
>
>
>
> On Wed, Mar 2, 2022 at 11:25 PM Anil Dasari  wrote:
>
> 2nd attempt..
>
>
>
> Any suggestions to troubleshoot and fix the problem ? thanks in advance.
>
>
>
> Regards,
>
> Anil
>
>
>
> *From: *Anil Dasari 
> *Date: *Wednesday, March 2, 2022 at 7:00 AM
> *To: *Gourav Sengupta , Yang,Jie(INF) <
> yangji...@baidu.com>
> *Cc: *user@spark.apache.org 
> *Subject: *Re: {EXT} Re: Spark Parquet write OOM
>
> Hi Gourav and Yang
>
> Thanks for the response.
>
>
>
> Please find the answers below.
>
>
>
> 1. What is the version of SPARK you are using?
>
> [AD] : Spark 2.4.7 (EMR 5.33.1)
>
>
>
> 2. Are you doing a lot of in-memory transformations like adding columns,
> or running joins, or UDFs thus increasing the size of the data before
> writing out?
>
> [AD] No. Only one new column is added. Our flow is
>
>    a. Read avro data from kafka
>    b. Avro deserialization and add new column to RDD
>    c. Create spark dataframe (DF) against the latest schema (avro evolved
>    schema) and persist to hive (checkpointing)
>    d. Create DF from hive (from step #c)
>    e. Deduplicate spark DF by primary key
>    f. Write DF to s3 in parquet format
>    g. Write metadata to s3
>
>
>
> The failure is from spark batch job
>
>
>
> 3. Is your pipeline going to change or evolve soon, or the data volumes
> going to vary, or particularly increase, over time?
>
> [AD]: Data volume is fixed as it is a batch job.
>
>
>
> 4. What is the memory that you are having in your executors, and drivers?
>
> [AD] We are running one core node and 50 task nodes, i.e. 51 nodes in
> total. Each node can create 2 executors (2 core cpu and 8 GB memory).
>
>
>
> 5. Can you show the list of transformations that you are running ?
>
> [AD] No explicit transformations other than the basic map transformations
> required to create a dataframe from the avro record RDD.
>
>
>
> Please let me know if you have any questions.
>
>
>
> Regards,
>
> Anil
>
>
>
> *From: *Gourav Sengupta 
> *Date: *Wednesday, March 2, 2022 at 1:07 AM
> *To: *Yang,Jie(INF) 
> *Cc: *Anil Dasari , user@spark.apache.org <
> user@spark.apache.org>
> *Subject: *{EXT} Re: Spark Parquet write OOM
>
> Hi Anil,
>
>
>
> before jumping to the quick symptomatic fix, can we try to understand the
> issues?
>
>
>
> 1. What is the version of SPARK you are using?
>
> 2. Are you doing a lot of in-memory transformations like adding columns,
> or running joins, or UDFs thus increasing the size of the data before
> writing out?
>
> 3. Is your pipeline going to change or evolve soon, or the da

Re: {EXT} Re: Spark Parquet write OOM

2022-03-03 Thread Gourav Sengupta
Hi,

I do not think that you are doing anything particularly concerning
here.

There is a setting in SPARK which limits the number of records that we can
write out at a time; you can try that. The other thing that you can try is
to ensure that the number of partitions is higher (just like you suggested).
Let me know how things are going on your end.


Regards,
Gourav Sengupta


On Thu, Mar 3, 2022 at 8:37 AM Anil Dasari  wrote:

> Answers in the context. Thanks.
>
>
>
> *From: *Gourav Sengupta 
> *Date: *Thursday, March 3, 2022 at 12:13 AM
> *To: *Anil Dasari 
> *Cc: *Yang,Jie(INF) , user@spark.apache.org <
> user@spark.apache.org>
> *Subject: *Re: {EXT} Re: Spark Parquet write OOM
>
> Hi Anil,
>
>
>
> I was trying to work out things for a while yesterday, but may need your
> kind help.
>
>
>
> Can you please share the code for the following steps?
>
> ·  Create DF from hive (from step #c)
>
> [AD] sparkSession.table()
>
>
>
> ·  Deduplicate spark DF by primary key
>
> [AD] dataFrame.dropDuplicates()
>
>
>
> ·  Write DF to s3 in parquet format
>
> [AD] dataFrame.write.mode(saveMode).parquet(path)
>
>
>
> ·  Write metadata to s3
>
> [AD] metadata in json written to s3 using aws sdk
>
>
>
> Regards,
>
> Gourav Sengupta
>
>
>
> On Wed, Mar 2, 2022 at 11:25 PM Anil Dasari  wrote:
>
> 2nd attempt..
>
>
>
> Any suggestions to troubleshoot and fix the problem ? thanks in advance.
>
>
>
> Regards,
>
> Anil
>
>
>
> *From: *Anil Dasari 
> *Date: *Wednesday, March 2, 2022 at 7:00 AM
> *To: *Gourav Sengupta , Yang,Jie(INF) <
> yangji...@baidu.com>
> *Cc: *user@spark.apache.org 
> *Subject: *Re: {EXT} Re: Spark Parquet write OOM
>
> Hi Gourav and Yang
>
> Thanks for the response.
>
>
>
> Please find the answers below.
>
>
>
> 1. What is the version of SPARK you are using?
>
> [AD] : Spark 2.4.7 (EMR 5.33.1)
>
>
>
> 2. Are you doing a lot of in-memory transformations like adding columns,
> or running joins, or UDFs thus increasing the size of the data before
> writing out?
>
> [AD] No. Only one new column is added. Our flow is
>
>    a. Read avro data from kafka
>    b. Avro deserialization and add new column to RDD
>    c. Create spark dataframe (DF) against the latest schema (avro evolved
>    schema) and persist to hive (checkpointing)
>    d. Create DF from hive (from step #c)
>    e. Deduplicate spark DF by primary key
>    f. Write DF to s3 in parquet format
>    g. Write metadata to s3
>
>
>
> The failure is from spark batch job
>
>
>
> 3. Is your pipeline going to change or evolve soon, or the data volumes
> going to vary, or particularly increase, over time?
>
> [AD]: Data volume is fixed as it is a batch job.
>
>
>
> 4. What is the memory that you are having in your executors, and drivers?
>
> [AD] We are running one core node and 50 task nodes, i.e. 51 nodes in
> total. Each node can create 2 executors (2 core cpu and 8 GB memory).
>
>
>
> 5. Can you show the list of transformations that you are running ?
>
> [AD] No explicit transformations other than the basic map transformations
> required to create a dataframe from the avro record RDD.
>
>
>
> Please let me know if you have any questions.
>
>
>
> Regards,
>
> Anil
>
>
>
> *From: *Gourav Sengupta 
> *Date: *Wednesday, March 2, 2022 at 1:07 AM
> *To: *Yang,Jie(INF) 
> *Cc: *Anil Dasari , user@spark.apache.org <
> user@spark.apache.org>
> *Subject: *{EXT} Re: Spark Parquet write OOM
>
> Hi Anil,
>
>
>
> before jumping to the quick symptomatic fix, can we try to understand the
> issues?
>
>
>
> 1. What is the version of SPARK you are using?
>
> 2. Are you doing a lot of in-memory transformations like adding columns,
> or running joins, or UDFs thus increasing the size of the data before
> writing out?
>
> 3. Is your pipeline going to change or evolve soon, or the data volumes
> going to vary, or particularly increase, over time?
>
> 4. What is the memory that you are having in your executors, and drivers?
>
> 5. Can you show the list of transformations that you are running ?
>
>
>
>
>
>
>
>
>
> Regards,
>
> Gourav Sengupta
>
>
>
>
>
> On Wed, Mar 2, 2022 at 3:18 AM Yang,Jie(INF)  wrote:
>
> This is a DirectByteBuffer OOM, so plan 2 may not work. We can increase
> the DirectByteBuffer capacity by configuring `-XX:MaxDirectMemorySize`,
> which is a Java option.
>
>
>
> However, we'd better check the length of memory to be allocated,  because
>  `-XX:MaxDirectMemorySi

Re: {EXT} Re: Spark Parquet write OOM

2022-03-03 Thread Gourav Sengupta
Hi Anil,

I was trying to work out things for a while yesterday, but may need your
kind help.

Can you please share the code for the following steps?
- Create DF from hive (from step #c)
- Deduplicate spark DF by primary key
- Write DF to s3 in parquet format
- Write metadata to s3

Regards,
Gourav Sengupta

On Wed, Mar 2, 2022 at 11:25 PM Anil Dasari  wrote:

> 2nd attempt..
>
>
>
> Any suggestions to troubleshoot and fix the problem ? thanks in advance.
>
>
>
> Regards,
>
> Anil
>
>
>
> *From: *Anil Dasari 
> *Date: *Wednesday, March 2, 2022 at 7:00 AM
> *To: *Gourav Sengupta , Yang,Jie(INF) <
> yangji...@baidu.com>
> *Cc: *user@spark.apache.org 
> *Subject: *Re: {EXT} Re: Spark Parquet write OOM
>
> Hi Gourav and Yang
>
> Thanks for the response.
>
>
>
> Please find the answers below.
>
>
>
> 1. What is the version of SPARK you are using?
>
> [AD] : Spark 2.4.7 (EMR 5.33.1)
>
>
>
> 2. Are you doing a lot of in-memory transformations like adding columns,
> or running joins, or UDFs thus increasing the size of the data before
> writing out?
>
> [AD] No. Only one new column is added. Our flow is
>
>    a. Read avro data from kafka
>    b. Avro deserialization and add new column to RDD
>    c. Create spark dataframe (DF) against the latest schema (avro evolved
>    schema) and persist to hive (checkpointing)
>    d. Create DF from hive (from step #c)
>    e. Deduplicate spark DF by primary key
>    f. Write DF to s3 in parquet format
>    g. Write metadata to s3
>
>
>
> The failure is from spark batch job
>
>
>
> 3. Is your pipeline going to change or evolve soon, or the data volumes
> going to vary, or particularly increase, over time?
>
> [AD]: Data volume is fixed as it is a batch job.
>
>
>
> 4. What is the memory that you are having in your executors, and drivers?
>
> [AD] We are running one core node and 50 task nodes, i.e. 51 nodes in
> total. Each node can create 2 executors (2 core cpu and 8 GB memory).
>
>
>
> 5. Can you show the list of transformations that you are running ?
>
> [AD] No explicit transformations other than the basic map transformations
> required to create a dataframe from the avro record RDD.
>
>
>
> Please let me know if you have any questions.
>
>
>
> Regards,
>
> Anil
>
>
>
> *From: *Gourav Sengupta 
> *Date: *Wednesday, March 2, 2022 at 1:07 AM
> *To: *Yang,Jie(INF) 
> *Cc: *Anil Dasari , user@spark.apache.org <
> user@spark.apache.org>
> *Subject: *{EXT} Re: Spark Parquet write OOM
>
> Hi Anil,
>
>
>
> before jumping to the quick symptomatic fix, can we try to understand the
> issues?
>
>
>
> 1. What is the version of SPARK you are using?
>
> 2. Are you doing a lot of in-memory transformations like adding columns,
> or running joins, or UDFs thus increasing the size of the data before
> writing out?
>
> 3. Is your pipeline going to change or evolve soon, or the data volumes
> going to vary, or particularly increase, over time?
>
> 4. What is the memory that you are having in your executors, and drivers?
>
> 5. Can you show the list of transformations that you are running ?
>
>
>
>
>
>
>
>
>
> Regards,
>
> Gourav Sengupta
>
>
>
>
>
> On Wed, Mar 2, 2022 at 3:18 AM Yang,Jie(INF)  wrote:
>
> This is a DirectByteBuffer OOM, so plan 2 may not work. We can increase
> the DirectByteBuffer capacity by configuring `-XX:MaxDirectMemorySize`,
> which is a Java option.
>
>
>
> However, we'd better check the length of the memory being allocated,
> because `-XX:MaxDirectMemorySize` and `-Xmx` should have the same capacity
> by default.
>
>
>
>
>
> *发件人**: *Anil Dasari 
> *日期**: *2022年3月2日 星期三 09:45
> *收件人**: *"user@spark.apache.org" 
> *主题**: *Spark Parquet write OOM
>
>
>
> Hello everyone,
>
>
>
> We are writing Spark Data frame to s3 in parquet and it is failing with
> below exception.
>
>
>
> I wanted to try the following to avoid OOM:
>
>
>
>    1. Increase the default SQL shuffle partitions to reduce the load on
>    parquet writer tasks, and
>    2. Increase user memory (reduce memory fraction) to have more memory
>    for other data structures, assuming the parquet writer uses user memory.
>
>
>
> I am not sure if these fix the OOM issue, so I wanted to reach out to the
> community for any suggestions. Please let me know.
>
>
>
> Exception:
>
>
>
> org.apache.spark.SparkException: Task failed while writing rows.
>
>  at
> org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql

Re: Spark Parquet write OOM

2022-03-02 Thread Gourav Sengupta
Hi Anil,

before jumping to the quick symptomatic fix, can we try to understand the
issues?

1. What is the version of SPARK you are using?
2. Are you doing a lot of in-memory transformations like adding columns, or
running joins, or UDFs thus increasing the size of the data before writing
out?
3. Is your pipeline going to change or evolve soon, or the data volumes
going to vary, or particularly increase, over time?
4. What is the memory that you are having in your executors, and drivers?
5. Can you show the list of transformations that you are running ?




Regards,
Gourav Sengupta


On Wed, Mar 2, 2022 at 3:18 AM Yang,Jie(INF)  wrote:

> This is a DirectByteBuffer OOM, so plan 2 may not work. We can increase
> the DirectByteBuffer capacity by configuring `-XX:MaxDirectMemorySize`,
> which is a Java option.
>
>
>
> However, we'd better check the length of the memory being allocated,
> because `-XX:MaxDirectMemorySize` and `-Xmx` should have the same capacity
> by default.
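
As a hedged illustration of how the Java option mentioned above could be passed to Spark executors, the sketch below builds spark-submit arguments using the `spark.executor.extraJavaOptions` setting. It also sets `spark.executor.memoryOverhead`, on the assumption that the job runs in a container-based cluster (such as the YARN setup on EMR described in this thread) where off-heap memory counts against the container limit. The helper name `direct_memory_conf` and the sizes in the usage note are made up for this sketch and are not recommendations.

```python
def direct_memory_conf(max_direct_memory, memory_overhead):
    """Build spark-submit --conf arguments that raise the direct-buffer limit.

    max_direct_memory: JVM size string for -XX:MaxDirectMemorySize, e.g. "2g"
    memory_overhead:   size string for spark.executor.memoryOverhead, e.g. "3g"
    """
    java_opts = f"-XX:MaxDirectMemorySize={max_direct_memory}"
    return [
        "--conf", f"spark.executor.extraJavaOptions={java_opts}",
        "--conf", f"spark.executor.memoryOverhead={memory_overhead}",
    ]
```

For example, `direct_memory_conf("2g", "3g")` could be appended to an existing spark-submit command. As noted in the message above, it is still worth checking how much memory the failing allocation actually requests before raising the limit.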
>
>
>
>
>
> *发件人**: *Anil Dasari 
> *日期**: *2022年3月2日 星期三 09:45
> *收件人**: *"user@spark.apache.org" 
> *主题**: *Spark Parquet write OOM
>
>
>
> Hello everyone,
>
>
>
> We are writing Spark Data frame to s3 in parquet and it is failing with
> below exception.
>
>
>
> I wanted to try the following to avoid OOM:
>
>
>
>    1. Increase the default SQL shuffle partitions to reduce the load on
>    parquet writer tasks, and
>    2. Increase user memory (reduce memory fraction) to have more memory
>    for other data structures, assuming the parquet writer uses user memory.
>
>
>
> I am not sure if these fix the OOM issue, so I wanted to reach out to the
> community for any suggestions. Please let me know.
>
>
>
> Exception:
>
>
>
> org.apache.spark.SparkException: Task failed while writing rows.
>
>  at
> org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:257)
>
>  at
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
>
>  at
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
>
>  at
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
>
>  at org.apache.spark.scheduler.Task.run(Task.scala:123)
>
>  at
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
>
>  at
> org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1405)
>
>  at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
>
>  at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>
>  at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>
>  at java.lang.Thread.run(Thread.java:750)
>
> Caused by: java.lang.OutOfMemoryError
>
>  at sun.misc.Unsafe.allocateMemory(Native Method)
>
>  at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:127)
>
>  at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
>
>  at
> org.apache.parquet.hadoop.codec.SnappyCompressor.setInput(SnappyCompressor.java:97)
>
>  at
> org.apache.parquet.hadoop.codec.NonBlockedCompressorStream.write(NonBlockedCompressorStream.java:48)
>
>  at
> org.apache.parquet.bytes.CapacityByteArrayOutputStream.writeToOutput(CapacityByteArrayOutputStream.java:227)
>
>  at
> org.apache.parquet.bytes.CapacityByteArrayOutputStream.writeTo(CapacityByteArrayOutputStream.java:247)
>
>  at
> org.apache.parquet.bytes.BytesInput$CapacityBAOSBytesInput.writeAllTo(BytesInput.java:405)
>
>  at
> org.apache.parquet.bytes.BytesInput$SequenceBytesIn.writeAllTo(BytesInput.java:296)
>
>  at
> org.apache.parquet.hadoop.CodecFactory$HeapBytesCompressor.compress(CodecFactory.java:164)
>
>  at
> org.apache.parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writePage(ColumnChunkPageWriteStore.java:95)
>
>  at
> org.apache.parquet.column.impl.ColumnWriterV1.writePage(ColumnWriterV1.java:147)
>
>  at
> org.apache.parquet.column.impl.ColumnWriterV1.flush(ColumnWriterV1.java:235)
>
>  at
> org.apache.parquet.column.impl.ColumnWriteStoreV1.flush(ColumnWriteStoreV1.java:122)
>
>  at
> org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:172)
>
>  at
> org.apache.parquet.hadoop.Inte

Re: can dataframe API deal with subquery

2022-03-01 Thread Gourav Sengupta
Hi,

why would you want to do that?

Regards,
Gourav

On Sat, Feb 26, 2022 at 8:00 AM  wrote:

> such as this table definition:
>
> > desc people;
> +-----------+------------+----------+
> | col_name  | data_type  | comment  |
> +-----------+------------+----------+
> | name      | string     |          |
> | born      | date       |          |
> | sex       | struct     |          |
> | contact   | map        |          |
> | jobs      | array      |          |
> +-----------+------------+----------+
>
> And this sql statement:
>
>   with t1 as (
>   select name,
>   case when sex.id=0 then "female" else "male" end as sex,
>   jobs[1] as lastjob
>   from people)
>   select * from t1 limit 10;
>
>
> how does dataframe run with this kind of subquery?
>
> Thank you.
>
>
>


Re: StructuredStreaming error - pyspark.sql.utils.StreamingQueryException: batch 44 doesn't exist

2022-02-28 Thread Gourav Sengupta
Hi Karan,

If you are running with at-least-once semantics, you can restart the failed
job with a new checkpoint location. You will end up with duplicates in your
target, but the job will run fine.
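
Before abandoning the old checkpoint, it may be worth looking at which batch
files it actually holds. The following is a minimal sketch, assuming the
checkpoint has been copied to a local path and follows the usual layout in
which offsets/ and commits/ hold one file per batch, named by its batch id.
batch_ids and missing_batches are hypothetical helpers, not part of Spark.

```python
from pathlib import Path


def batch_ids(checkpoint_dir, log_name):
    """Return the sorted batch ids found under <checkpoint_dir>/<log_name>.

    Only file names that are plain integers are counted, so hidden or
    temporary files (for example .crc files) are ignored.
    """
    log_dir = Path(checkpoint_dir) / log_name
    if not log_dir.is_dir():
        return []
    return sorted(int(p.name) for p in log_dir.iterdir() if p.name.isdigit())


def missing_batches(checkpoint_dir, log_name="offsets"):
    """List batch ids absent between the oldest and newest retained batch."""
    ids = batch_ids(checkpoint_dir, log_name)
    if not ids:
        return []
    present = set(ids)
    return [b for b in range(ids[0], ids[-1] + 1) if b not in present]
```

If the newest id under offsets/ were 45 and 44 showed up as missing, that
would be consistent with the "batch 44 doesn't exist" message, although this
is an inference from the stack trace and not something confirmed in the
thread.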

Since you are using stateful operations, if your keys are too large to manage
in state, try RocksDB. It was introduced by Tathagata Das a few years ago in
the Databricks version and has now been made available in the open source
version. It really works well.
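
For anyone wanting to try this, the RocksDB state store is selected through a
single configuration key. The sketch below assumes Spark 3.2 or later (the
thread reports 3.1.2 on Dataproc, so it would apply only after an upgrade);
as_submit_args is a hypothetical helper that merely formats the setting for
spark-submit.

```python
# Assumption: Spark 3.2 or later, where the RocksDB provider ships with Spark.
ROCKSDB_STATE_STORE_CONF = {
    "spark.sql.streaming.stateStore.providerClass":
        "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider",
}


def as_submit_args(conf):
    """Turn a conf mapping into spark-submit '--conf key=value' arguments."""
    args = []
    for key, value in sorted(conf.items()):
        args += ["--conf", f"{key}={value}"]
    return args
```

The same key and value can also be set on the session builder with
.config(key, value) before the streaming query is started.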

Let me know how things go, and what your final solution was.


Regards,
Gourav Sengupta

On Mon, Feb 28, 2022 at 6:02 AM karan alang  wrote:

> Hi Gourav,
>
> Pls see my responses below :
>
> Can you please let us know:
> 1. the SPARK version, and the kind of streaming query that you are
> running?
>
> KA : Apache Spark 3.1.2 - on Dataproc using Ubuntu 18.04 (the highest
> Spark version supported on dataproc is 3.1.2) ,
>
> 2. whether you are using at-least-once, at-most-once, or exactly-once semantics?
>
> KA : default value - at-least once delivery semantics
> (per my understanding, i don't believe delivery semantics is related to
> the issue, though)
>
> 3. any additional details that you can provide, regarding the storage
> duration in Kafka, etc?
>
> KA : storage duration - 1 day ..
> However, as I mentioned in the stackoverflow ticket, on readStream ->
> "failOnDataLoss" = "false", so the log retention should not cause this
> issue.
>
> 4. are you running stateful or stateless operations? If you are using
> stateful operations and SPARK 3.2 try to use RocksDB which is now natively
> integrated with SPARK :)
>
> KA : Stateful - since i'm using windowing+watermark in the aggregation
> queries.
>
> Also, thnx - will check the links you provided.
>
> regds,
> Karan Alang
>
> On Sat, Feb 26, 2022 at 3:31 AM Gourav Sengupta 
> wrote:
>
>> Hi,
>>
>> Can you please let us know:
>> 1. the SPARK version, and the kind of streaming query that you are
>> running?
>> 2. whether you are using at-least-once, at-most-once, or exactly-once
>> semantics?
>> 3. any additional details that you can provide, regarding the storage
>> duration in Kafka, etc?
>> 4. are you running stateful or stateless operations? If you are using
>> stateful operations and SPARK 3.2 try to use RocksDB which is now natively
>> integrated with SPARK :)
>>
>> Besides the mail sent by Mich, the following are useful:
>> 1.
>> https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#managing-streaming-queries
>> (see the stop operation, and awaitTermination... operation)
>> 2. Try to always ensure that you are doing exception handling based on
>> the option mentioned in the above link, long running streaming programmes
>> in distributed systems do have issues, and handling exceptions is important
>> 3. There is another thing which I do, and it is around reading the
>> streaming metrics and pushing them for logging, that helps me to know in
>> long running system whether there are any performance issues or not (
>> https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#reading-metrics-interactively
>> or
>> https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#reporting-metrics-programmatically-using-asynchronous-apis)
>> . The following is an interesting reading on the kind of metrics to look
>> out for and the way to interpret them (
>> https://docs.databricks.com/spark/latest/rdd-streaming/debugging-streaming-applications.html
>> )
>>
>>
>> Regards,
>> Gourav
>>
>>
>> On Sat, Feb 26, 2022 at 10:45 AM Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> Check the thread I forwarded on how to gracefully shutdown spark
>>> structured streaming
>>>
>>> HTH
>>>
>>> On Fri, 25 Feb 2022 at 22:31, karan alang  wrote:
>>>
>>>> Hello All,
>>>> I'm running a StructuredStreaming program on GCP Dataproc, which reads
>>>> data from Kafka, does some processing and puts processed data back into
>>>> Kafka. The program was running fine, when I killed it (to make minor
>>>> changes), and then re-started it.
>>>>
>>>> It is giving me the error -
>>>> pyspark.sql.utils.StreamingQueryException: batch 44 doesn't exist
>>>>
>>>> Here is the error:
>>>>
>>>> 22/02/25 22:14:08 ERROR 
>>>> org.apache.spark.sql.execution.streaming.MicroBatchExecution: Query [id = 
>>>> 0b73937f-1140-40fe-b201-cf4b7443b599, runId = 
>>>>

Re: StructuredStreaming error - pyspark.sql.utils.StreamingQueryException: batch 44 doesn't exist

2022-02-26 Thread Gourav Sengupta
Hi,

Maybe the purpose of the article is different, but instead of:
  sources (trail files) --> kafka --> flume --> write to cloud storage -->> SSS
a much simpler solution is:
  sources (trail files) --> write to cloud storage -->> SSS

I find it difficult to understand why the additional components and hops are
needed.

Regards,
Gourav

On Sat, Feb 26, 2022 at 5:12 PM Mich Talebzadeh 
wrote:

> Besides, is the structure of your checkpoint as in this article of mine?
>
> Processing Change Data Capture with Spark Structured Streaming
> 
>
> Section on "The concept of checkpointing and its value with trigger once"
>
>
> see also
> https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-from-failures-with-checkpointing
>
>
> You can try to clear the checkpoint directories content, run your spark
> job for a while and try to CTRL c etc to kill and check what are the
> entries under sources sub-directory
>
>
> HTH
>
>
>
> On Sat, 26 Feb 2022 at 10:44, Mich Talebzadeh 
> wrote:
>
>> Check the thread I forwarded on how to gracefully shutdown spark
>> structured streaming
>>
>> HTH
>>
>> On Fri, 25 Feb 2022 at 22:31, karan alang  wrote:
>>
>>> Hello All,
>>> I'm running a StructuredStreaming program on GCP Dataproc, which reads
>>> data from Kafka, does some processing and puts processed data back into
>>> Kafka. The program was running fine, when I killed it (to make minor
>>> changes), and then re-started it.
>>>
>>> It is giving me the error -
>>> pyspark.sql.utils.StreamingQueryException: batch 44 doesn't exist
>>>
>>> Here is the error:
>>>
>>> 22/02/25 22:14:08 ERROR 
>>> org.apache.spark.sql.execution.streaming.MicroBatchExecution: Query [id = 
>>> 0b73937f-1140-40fe-b201-cf4b7443b599, runId = 
>>> 43c9242d-993a-4b9a-be2b-04d8c9e05b06] terminated with error
>>> java.lang.IllegalStateException: batch 44 doesn't exist
>>> at 
>>> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$populateStartOffsets$1(MicroBatchExecution.scala:286)
>>> at scala.Option.getOrElse(Option.scala:189)
>>> at 
>>> org.apache.spark.sql.execution.streaming.MicroBatchExecution.populateStartOffsets(MicroBatchExecution.scala:286)
>>> at 
>>> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:197)
>>> at 
>>> scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>>> at 
>>> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:357)
>>> at 
>>> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:355)
>>> at 
>>> org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
>>> at 
>>> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:194)
>>> at 
>>> org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
>>> at 
>>> org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:188)
>>> at 
>>> org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:334)
>>> at 
>>> scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>>> at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
>>> at 
>>> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:317)
>>> at 
>>> org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:244)
>>> Traceback (most recent call last):
>>>   File 
>>> "/tmp/0149aedd804c42f288718e013fb16f9c/StructuredStreaming_GCP_Versa_Sase_gcloud.py",
>>>  line 609, in 
>>> query.awaitTermination()
>>>   File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/streaming.py", 
>>> line 101, in awaitTermination
>>>   File 
>>> "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 
>>> 1304, in __call__
>>>   File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 
>>> 117, in deco
>>> pyspark.sql.utils.StreamingQueryException: batch 44 doesn't exist
>>>
>>>
>>> Question - what is the 

Re: StructuredStreaming error - pyspark.sql.utils.StreamingQueryException: batch 44 doesn't exist

2022-02-26 Thread Gourav Sengupta
Hi,

Can you please let us know:
1. the SPARK version, and the kind of streaming query that you are running?
2. whether you are using at-least-once, at-most-once, or exactly-once semantics?
3. any additional details that you can provide, regarding the storage
duration in Kafka, etc?
4. are you running stateful or stateless operations? If you are using
stateful operations and SPARK 3.2, try RocksDB, which is now natively
integrated with SPARK :)

Besides the mail sent by Mich, the following are useful:
1.
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#managing-streaming-queries
(see the stop operation, and awaitTermination... operation)
2. Always make sure that you handle exceptions using the options described in
the link above. Long-running streaming programmes in distributed systems do
run into issues, so handling exceptions is important.
3. Another thing I do is read the streaming metrics and push them to the
logs, which helps me see whether a long-running system has any performance
issues (
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#reading-metrics-interactively
or
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#reporting-metrics-programmatically-using-asynchronous-apis
). The following is an interesting read on which metrics to look out for and
how to interpret them (
https://docs.databricks.com/spark/latest/rdd-streaming/debugging-streaming-applications.html
)
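
As an illustration of the metrics point, the sketch below picks a few fields
out of a progress record and logs them. It assumes the record is a plain dict
of the kind PySpark 3.x returns from query.lastProgress (or None before the
first batch completes); the function names and the "falling behind" rule are
my own choices, not something taken from the links above.

```python
import json
import logging

logger = logging.getLogger("streaming_metrics")


def summarize_progress(progress):
    """Pick a few fields out of a progress dict such as query.lastProgress."""
    if not progress:
        return None  # no batch has completed yet
    return {
        "batchId": progress.get("batchId"),
        "numInputRows": progress.get("numInputRows"),
        "inputRowsPerSecond": progress.get("inputRowsPerSecond"),
        "processedRowsPerSecond": progress.get("processedRowsPerSecond"),
        "triggerExecutionMs": progress.get("durationMs", {}).get("triggerExecution"),
    }


def falling_behind(summary):
    """True when rows arrive faster than they are processed in this batch."""
    rate_in = summary.get("inputRowsPerSecond")
    rate_out = summary.get("processedRowsPerSecond")
    if rate_in is None or rate_out is None:
        return False
    return rate_out < rate_in


def log_progress(progress):
    """Log the summary as JSON, at WARNING level if the query is falling behind."""
    summary = summarize_progress(progress)
    if summary is None:
        return None
    level = logging.WARNING if falling_behind(summary) else logging.INFO
    logger.log(level, json.dumps(summary))
    return summary
```

Calling log_progress(query.lastProgress) from a periodic loop on the driver
would be one way to use it.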


Regards,
Gourav


On Sat, Feb 26, 2022 at 10:45 AM Mich Talebzadeh 
wrote:

> Check the thread I forwarded on how to gracefully shutdown spark
> structured streaming
>
> HTH
>
> On Fri, 25 Feb 2022 at 22:31, karan alang  wrote:
>
>> Hello All,
>> I'm running a StructuredStreaming program on GCP Dataproc, which reads
>> data from Kafka, does some processing and puts processed data back into
>> Kafka. The program was running fine, when I killed it (to make minor
>> changes), and then re-started it.
>>
>> It is giving me the error - pyspark.sql.utils.StreamingQueryException:
>> batch 44 doesn't exist
>>
>> Here is the error:
>>
>> 22/02/25 22:14:08 ERROR 
>> org.apache.spark.sql.execution.streaming.MicroBatchExecution: Query [id = 
>> 0b73937f-1140-40fe-b201-cf4b7443b599, runId = 
>> 43c9242d-993a-4b9a-be2b-04d8c9e05b06] terminated with error
>> java.lang.IllegalStateException: batch 44 doesn't exist
>> at 
>> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$populateStartOffsets$1(MicroBatchExecution.scala:286)
>> at scala.Option.getOrElse(Option.scala:189)
>> at 
>> org.apache.spark.sql.execution.streaming.MicroBatchExecution.populateStartOffsets(MicroBatchExecution.scala:286)
>> at 
>> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:197)
>> at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>> at 
>> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:357)
>> at 
>> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:355)
>> at 
>> org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
>> at 
>> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:194)
>> at 
>> org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
>> at 
>> org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:188)
>> at 
>> org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:334)
>> at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>> at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
>> at 
>> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:317)
>> at 
>> org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:244)
>> Traceback (most recent call last):
>>   File 
>> "/tmp/0149aedd804c42f288718e013fb16f9c/StructuredStreaming_GCP_Versa_Sase_gcloud.py",
>>  line 609, in 
>> query.awaitTermination()
>>   File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/streaming.py", 
>> line 101, in awaitTermination
>>   File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", 
>> line 1304, in __call__
>>   File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 
>> 117, in deco
>> pyspark.sql.utils.StreamingQueryException: batch 44 doesn't exist
>>
>>
>> Question - what is the cause of this error and how to debug/fix ? Also, I
>> notice that the checkpoint location gets 

Re: How to gracefully shutdown Spark Structured Streaming

2022-02-26 Thread Gourav Sengupta
Dear Mich,

a super duper note of thanks, I had to spend around two weeks to figure
this out :)



Regards,
Gourav Sengupta

On Sat, Feb 26, 2022 at 10:43 AM Mich Talebzadeh 
wrote:

>
>
> On Mon, 26 Apr 2021 at 10:21, Mich Talebzadeh 
> wrote:
>
>>
>> Spark Structured Streaming AKA SSS is a very useful tool in dealing with
>> Event Driven Architecture. In an Event Driven Architecture, there is
>> generally a main loop that listens for events and then triggers a call-back
>> function when one of those events is detected. In a streaming application
>> the application waits to receive the source messages in a set interval or
>> whenever they happen and reacts accordingly.
>>
>> There are occasions that you may want to stop the Spark program
>> gracefully. Gracefully meaning that Spark application handles the last
>> streaming message completely and terminates the application. This is
>> different from invoking interrupt such as CTRL-C. Of course one can
>> terminate the process based on the following
>>
>>
>>    1. query.awaitTermination() # Waits for the termination of this query,
>>       with stop() or with error
>>    2. query.awaitTermination(timeoutMs) # Returns true if this query is
>>       terminated within the timeout in milliseconds.
>>
>> So the first one above waits until an interrupt signal is received. The
>> second one will count the timeout and will exit when timeout in
>> milliseconds is reached
>>
>> The issue is that one needs to predict how long the streaming job needs
>> to run. Clearly any interrupt at the terminal or OS level (kill process),
>> may end up the processing terminated without a proper completion of the
>> streaming process.
>>
>> I have devised a method that allows one to terminate the spark
>> application internally after processing the last received message. Within
>> say 2 seconds of the confirmation of shutdown, the process will invoke
>>
>> How to shutdown the topic doing work for the message being processed,
>> wait for it to complete and shutdown the streaming process for a given
>> topic.
>>
>>
>> I thought about this and looked at options. Using sensors to
>> implement this like airflow would be expensive as for example reading a
>> file from object storage or from an underlying database would have incurred
>> additional I/O overheads through continuous polling.
>>
>>
>> So the design had to be incorporated into the streaming process itself.
>> What I came up with was an addition of a control topic (I call it newtopic
>> below), which keeps running triggered every 2 seconds say and is in json
>> format with the following structure
>>
>>
>> root
>>  |-- newtopic_value: struct (nullable = true)
>>  |    |-- uuid: string (nullable = true)
>>  |    |-- timeissued: timestamp (nullable = true)
>>  |    |-- queue: string (nullable = true)
>>  |    |-- status: string (nullable = true)
>>
>> In the above, queue refers to the business topic and status is set to
>> 'true', meaning carry on processing the business stream. This control topic
>> stream can be restarted at any time, and status can be set to 'false' if we
>> want to stop the streaming queue for a given business topic.
>>
>> ac7d0b2e-dc71-4b3f-a17a-500cd9d38efe
>> {"uuid":"ac7d0b2e-dc71-4b3f-a17a-500cd9d38efe",
>> "timeissued":"2021-04-23T08:54:06", "queue":"md", "status":"true"}
>>
>> 64a8321c-1593-428b-ae65-89e45ddf0640
>> {"uuid":"64a8321c-1593-428b-ae65-89e45ddf0640",
>> "timeissued":"2021-04-23T09:49:37", "queue":"md", "status":"false"}
>>
>> So how can I stop the business queue when the current business topic
>> message has been processed? Let us say the source is sending data for a
>> business topic every 30 seconds. Our control topic sends a one liner as
>> above every 2 seconds.
>>
>> In your writestream add the following line to be able to identify topic
>> name
>>
>> trigger(processingTime='30 seconds'). \
>> queryName('md'). \
>>
>> Next the controlling topic (called newtopic)  has the following
>>
>> foreachBatch(sendToControl). \
>> trigger(processingTime='2 seconds'). \
>> queryName('newtopic'). \
>>
>> That method sendToControl does what is needed
>>
>> def sendToControl(dfnewtopic, batchId):
>> 
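
The message is cut off at this point in the archive, so the body of
sendToControl is not available. As a rough illustration of the decision it
would need to make, the sketch below parses control messages of the shape
quoted above and reports whether a given queue has been told to stop.
latest_status and should_stop are hypothetical names, and actually stopping
the matching query is left out.

```python
import json
from datetime import datetime


def latest_status(messages, queue):
    """Return the status of the most recently issued control message for queue.

    messages is an iterable of JSON strings with the fields uuid, timeissued,
    queue and status, as in the examples quoted above. Returns None when no
    message targets the queue.
    """
    latest = None
    for raw in messages:
        msg = json.loads(raw)
        if msg["queue"] != queue:
            continue
        issued = datetime.fromisoformat(msg["timeissued"])
        if latest is None or issued > latest[0]:
            latest = (issued, msg["status"])
    return None if latest is None else latest[1]


def should_stop(messages, queue):
    """True when the newest control message for queue carries status 'false'."""
    return latest_status(messages, queue) == "false"
```

In a foreachBatch handler, a True result for 'md' would presumably be followed
by finding the active query named 'md' and calling stop() on it once it is
idle; that part is an assumption about the design, not shown in the thread.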

Re: Non-Partition based Workload Distribution

2022-02-25 Thread Gourav Sengupta
Hi,

not quite sure here, but can you please share your code?

Regards,
Gourav Sengupta

On Thu, Feb 24, 2022 at 8:25 PM Artemis User  wrote:

> We got a Spark program that iterates through a while loop on the same
> input DataFrame and produces different results per iteration. I see
> through Spark UI that the workload is concentrated on a single core of
> the same worker.  Is there any way to distribute the workload to
> different cores/workers, e.g. per iteration, since the iterations do not
> depend on each other?
>
> Certainly this type of problem could be easily implemented using
> threads, e.g. spawn a child thread for each iteration, and wait at the
> end of the loop.  But threads apparently don't go beyond the worker
> boundary.  We also thought about using MapReduce, but it won't be
> straightforward since mapping only deals with rows, not at the dataframe
> level.  Any thoughts/suggestions are highly appreciated..
>
>
>
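
One option that may fit here: jobs submitted from separate threads on the
driver are scheduled independently, so each iteration's Spark action can be
launched from its own driver thread while the resulting tasks are still
distributed across executors. The following is a minimal sketch, assuming the
iterations really are independent; run_iteration is a hypothetical stand-in
for whatever one pass of the loop does with the shared DataFrame.

```python
from concurrent.futures import ThreadPoolExecutor


def run_all(run_iteration, params, max_workers=4):
    """Run run_iteration(p) for every p in params using driver-side threads.

    Results come back in the same order as params. An exception raised by an
    iteration is re-raised here when its result is collected.
    """
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(run_iteration, params))
```

In a Spark program, run_iteration would need to end in an action (a count or
a write, for instance) so that a job is actually submitted from that thread.
Whether this removes the single-core hot spot also depends on how the input
DataFrame is partitioned, which cannot be judged without the code.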


Re: Structured Streaming + UDF - logic based on checking if a column is present in the Dataframe

2022-02-25 Thread Gourav Sengupta
Hi,

can you please let us know the following:
1. the spark version
2. a few samples of input data
3. a few samples of the expected output


Regards,
Gourav Sengupta

On Wed, Feb 23, 2022 at 8:43 PM karan alang  wrote:

> Hello All,
>
> I'm using StructuredStreaming, and am trying to use UDF to parse each row.
> Here is the requirement:
>
>- we can get alerts of a particular KPI with type 'major' OR 'critical'
>- for a KPI, if we get alerts of type 'major' eg _major, and we have a
>critical alert as well _critical, we need to ignore the _major alert, and
>consider _critical alert only
>
> There are ~25 alerts which are stored in the array (AlarmKeys.alarm_all)
>
> UDF Code (draft):
>
> @udf(returnType=StringType())
> def convertStructToStr(APP_CAUSE, tenantName, window,,__major,__major, __critical, five__major, __critical):
>     res = "{window: "+ str(window) + "type: 10m, applianceName: "+ str(APP_CAUSE)+","
>     first = True
>     for curr_alarm in AlarmKeys.alarms_all:
>         alsplit = curr_alarm.split('__')
>         if len(alsplit) == 2:
>             # Only account for critical row if both major & critical are there
>             if alsplit[1] == 'major':
>                 critical_alarm = alsplit[0] + "__critical"
>                 if int(col(critical_alarm)) > 0:
>                     continue
>         if int(col(curr_alarm)) > 0:
>             if first:
>                 mystring = "{} {}({})".format(mystring, AlarmKeys.alarm_exp[curr_alarm], str(col(curr_alarm)))
>                 first = False
>             else:
>                 mystring = "{}, {}({})".format(mystring, AlarmKeys.alarm_exp[curr_alarm], str(col(curr_alarm)))
>     res+="insight: "+mystring +"}"
>
> # structured streaming using udf, this is printing data on console
> # eventually, i'll put data into Kafka instead
> df.select(convertStructToStr(*df.columns)) \
>     .write \
>     .format("console") \
>     .option("numRows",100)\
>     .option("checkpointLocation", "/Users/karanalang/PycharmProjects/Kafka/checkpoint") \
>     .option("outputMode", "complete")\
>     .save("output")
>
> Additional Details in stackoverflow :
>
> https://stackoverflow.com/questions/71243726/structured-streaming-udf-logic-based-on-checking-if-a-column-is-present-in-t
>
>
> Question is -
>
> Can this be done using UDF ? Since I'm passing column values to the UDF, I
> have no way to check if a particular KPI of type 'critical' is available in
> the dataframe ?
>
> Any suggestions on the best way to solve this problem ?
> tia!
>
>
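
Inside a Python UDF the arguments are plain values, so col(...) cannot be
used there to look a column up by name. One way around that, sketched here
under assumptions, is to hand the whole row to the function as a mapping and
do the major/critical check in ordinary Python. The alarm names and labels in
the usage note are invented for illustration, since the real AlarmKeys
contents are not shown in the thread.

```python
def summarize_alarms(row, alarm_keys, alarm_labels):
    """Build the insight text, skipping a __major alarm when its __critical twin fired.

    row          -- mapping of column name to count (missing columns count as 0)
    alarm_keys   -- alarm column names such as 'cpu__major', 'cpu__critical'
    alarm_labels -- mapping of alarm column name to a human-readable label
    """
    parts = []
    for key in alarm_keys:
        kpi, sep, severity = key.partition("__")
        if sep and severity == "major":
            # A critical alert for the same KPI takes precedence.
            if int(row.get(kpi + "__critical") or 0) > 0:
                continue
        count = int(row.get(key) or 0)
        if count > 0:
            parts.append("{}({})".format(alarm_labels[key], count))
    return ", ".join(parts)
```

For example, with hypothetical columns cpu__major=3 and cpu__critical=1, only
the critical entry is reported. In PySpark the function could be wrapped in a
UDF that receives struct(*df.columns) and calls asDict() on the resulting
Row; a column that is absent from the DataFrame then simply reads as 0.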


Re: [E] COMMERCIAL BULK: Re: TensorFlow on Spark

2022-02-24 Thread Gourav Sengupta
Dear Sean,

I do agree with you to a certain extent; that makes sense. Perhaps I am wrong
in asking for native integrations rather than depending on over-engineered
external solutions, which have their own performance issues and bottlenecks
in live production environments. But asking, and stating one's opinion,
should be fine, I think.

Just as we went for Koalas in spite of having Pandas UDFs, SPARK native
integrations with deep learning frameworks that are lightweight and easy to
use and extend perhaps make sense, in my view.

Regards,
Gourav Sengupta


On Thu, Feb 24, 2022 at 2:06 PM Sean Owen  wrote:

> On the contrary, distributed deep learning is not data parallel. It's
> dominated by the need to share parameters across workers.
> Gourav, I don't understand what you're looking for. Have you looked at
> Petastorm and Horovod? they _use Spark_, not another platform like Ray. Why
> recreate this which has worked for years? what would it matter if it were
> in the Spark project? I think you're on a limb there.
> One goal of Spark is very much not to build in everything that could exist
> as a library, and distributed deep learning remains an important but niche
> use case. Instead it provides the infra for these things, like barrier mode.
>
> On Thu, Feb 24, 2022 at 7:21 AM Bitfox  wrote:
>
>> I have been using tensorflow for a long time, it's not hard to implement
>> a distributed training job at all, either by model parallelization or data
>> parallelization. I don't think there is much need to develop spark to
>> support tensorflow jobs. Just my thoughts...
>>
>>
>> On Thu, Feb 24, 2022 at 4:36 PM Gourav Sengupta <
>> gourav.sengu...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I do not think that there is any reason for using over engineered
>>> platforms like Petastorm and Ray, except for certain use cases.
>>>
>>> What Ray is doing, except for certain use cases, could have been easily
>>> done by SPARK, I think, had the open source community got that steer. But
>>> maybe I am wrong and someone should be able to explain why the SPARK open
>>> source community cannot develop the capabilities which are so natural to
>>> almost all use cases of data processing in SPARK where the data gets
>>> consumed by deep learning frameworks and we are asked to use Ray or
>>> Petastorm?
>>>
>>> For those of us who are asking what does native integrations means
>>> please try to compare delta between release 2.x and 3.x and koalas before
>>> 3.2 and after 3.2.
>>>
>>> I am sure that the SPARK community can push for extending the dataframes
>>> from SPARK to deep learning and other frameworks by natively integrating
>>> them.
>>>
>>>
>>> Regards,
>>> Gourav Sengupta
>>>
>>>


Re: [E] COMMERCIAL BULK: Re: TensorFlow on Spark

2022-02-24 Thread Gourav Sengupta
Hi Bitfox,

yes, distributed training using PyTorch and TensorFlow is really superb, and
you are spot on. There is actually no need for solutions like Ray, Petastorm,
etc.

But if I want to pre-process data in SPARK and push the results to these deep
learning libraries, what do we do? Creating professional-quality data loaders
is a very big job, so these solutions try to occupy that space as an entry
point.


Regards,
Gourav Sengupta



On Thu, Feb 24, 2022 at 1:21 PM Bitfox  wrote:

> I have been using tensorflow for a long time, it's not hard to implement a
> distributed training job at all, either by model parallelization or data
> parallelization. I don't think there is much need to develop spark to
> support tensorflow jobs. Just my thoughts...
>
>
> On Thu, Feb 24, 2022 at 4:36 PM Gourav Sengupta 
> wrote:
>
>> Hi,
>>
>> I do not think that there is any reason for using over engineered
>> platforms like Petastorm and Ray, except for certain use cases.
>>
>> What Ray is doing, except for certain use cases, could have been easily
>> done by SPARK, I think, had the open source community got that steer. But
>> maybe I am wrong and someone should be able to explain why the SPARK open
>> source community cannot develop the capabilities which are so natural to
>> almost all use cases of data processing in SPARK where the data gets
>> consumed by deep learning frameworks and we are asked to use Ray or
>> Petastorm?
>>
>> For those of us who are asking what does native integrations means please
>> try to compare delta between release 2.x and 3.x and koalas before 3.2 and
>> after 3.2.
>>
>> I am sure that the SPARK community can push for extending the dataframes
>> from SPARK to deep learning and other frameworks by natively integrating
>> them.
>>
>>
>> Regards,
>> Gourav Sengupta
>>
>>
>> On Wed, Feb 23, 2022 at 4:42 PM Dennis Suhari 
>> wrote:
>>
>>> Currently we are trying AnalyticsZoo and Ray
>>>
>>>
>>> Sent from my iPhone
>>>
>>> On 23.02.2022 at 04:53, Bitfox wrote:
>>>
>>> 
>>> tensorflow itself can implement the distributed computing via a
>>> parameter server. Why did you want spark here?
>>>
>>> regards.
>>>
>>> On Wed, Feb 23, 2022 at 11:27 AM Vijayant Kumar
>>>  wrote:
>>>
>>>> Thanks Sean for your response. !!
>>>>
>>>>
>>>>
>>>> Want to add some more background here.
>>>>
>>>>
>>>>
>>>> I am using Spark3.0+ version with Tensorflow 2.0+.
>>>>
>>>> My use case is not for the image data but for the Time-series data
>>>> where I am using LSTM and transformers to forecast.
>>>>
>>>>
>>>>
>>>> I evaluated *SparkFlow* and *spark_tensorflow_distributor *libraries, and
>>>> there has been no major development recently on those libraries. I faced
>>>> the issue of version dependencies on those and had a hard time fixing the
>>>> library compatibilities. Hence a couple of below doubts:-
>>>>
>>>>
>>>>
>>>>- Does *Horovod* have any dependencies?
>>>>- Any other library which is suitable for my use case.?
>>>>- Any example code would really be of great help to understand.
>>>>
>>>>
>>>>
>>>> Thanks,
>>>>
>>>> Vijayant
>>>>
>>>>
>>>>
>>>> *From:* Sean Owen 
>>>> *Sent:* Wednesday, February 23, 2022 8:40 AM
>>>> *To:* Vijayant Kumar 
>>>> *Cc:* user @spark 
>>>> *Subject:* [E] COMMERCIAL BULK: Re: TensorFlow on Spark
>>>>
>>>>
>>>>
>>>> *Email is from a Free Mail Service (Gmail/Yahoo/Hotmail….) *: Beware
>>>> of Phishing Scams, Report questionable emails to s...@mavenir.com
>>>>
>>>> Sure, Horovod is commonly used on Spark for this:
>>>>
>>>> https://horovod.readthedocs.io/en/stable/spark_include.html
>>>>
>>>>
>>>>
>>>> On Tue, Feb 22, 2022 at 8:51 PM Vijayant Kumar <
>>>> vijayant.ku...@mavenir.com.invalid> wrote:
>>>>
>>>> Hi All,
>>>>
>>>>
>>>>
>>>> Anyone using Apache spark with TensorFlow for building models. My
>>>> requirement i

Re: [E] COMMERCIAL BULK: Re: TensorFlow on Spark

2022-02-24 Thread Gourav Sengupta
Hi,

I do not think that there is any reason for using over-engineered platforms
like Petastorm and Ray, except for certain use cases.

What Ray is doing, except for certain use cases, could easily have been done
by SPARK, I think, had the open source community got that steer. But maybe I
am wrong, and someone should be able to explain why the SPARK open source
community cannot develop capabilities that are so natural to almost all use
cases of data processing in SPARK where the data gets consumed by deep
learning frameworks, and why we are asked to use Ray or Petastorm instead?

For those of us who are asking what native integration means, please try to
compare delta between releases 2.x and 3.x, and koalas before and after 3.2.

I am sure that the SPARK community can push for extending the dataframes
from SPARK to deep learning and other frameworks by natively integrating
them.


Regards,
Gourav Sengupta


On Wed, Feb 23, 2022 at 4:42 PM Dennis Suhari 
wrote:

> Currently we are trying AnalyticsZoo and Ray
>
>
> Sent from my iPhone
>
> On 23.02.2022 at 04:53, Bitfox wrote:
>
> 
> tensorflow itself can implement the distributed computing via a
> parameter server. Why did you want spark here?
>
> regards.
>
> On Wed, Feb 23, 2022 at 11:27 AM Vijayant Kumar
>  wrote:
>
>> Thanks Sean for your response. !!
>>
>>
>>
>> Want to add some more background here.
>>
>>
>>
>> I am using Spark3.0+ version with Tensorflow 2.0+.
>>
>> My use case is not for the image data but for the Time-series data where
>> I am using LSTM and transformers to forecast.
>>
>>
>>
>> I evaluated the *SparkFlow* and *spark_tensorflow_distributor* libraries, and
>> there has been no major development recently on those libraries. I faced
>> the issue of version dependencies on those and had a hard time fixing the
>> library compatibilities. Hence a couple of questions below:
>>
>>
>>
>> - Does *Horovod* have any dependencies?
>> - Is there any other library which is suitable for my use case?
>> - Any example code would really be of great help to understand.
>>
>>
>>
>> Thanks,
>>
>> Vijayant
>>
>>
>>
>> *From:* Sean Owen 
>> *Sent:* Wednesday, February 23, 2022 8:40 AM
>> *To:* Vijayant Kumar 
>> *Cc:* user @spark 
>> *Subject:* [E] COMMERCIAL BULK: Re: TensorFlow on Spark
>>
>>
>>
>> Sure, Horovod is commonly used on Spark for this:
>>
>> https://horovod.readthedocs.io/en/stable/spark_include.html
>>
>>
>>
>> On Tue, Feb 22, 2022 at 8:51 PM Vijayant Kumar <
>> vijayant.ku...@mavenir.com.invalid> wrote:
>>
>> Hi All,
>>
>>
>>
>> Anyone using Apache spark with TensorFlow for building models. My
>> requirement is to use TensorFlow distributed model training across the
>> Spark executors.
>>
>> Please help me with some resources or some sample code.
>>
>>
>>
>> Thanks,
>>
>> Vijayant
>> --
>>
>> This e-mail message may contain confidential or proprietary information
>> of Mavenir Systems, Inc. or its affiliates and is intended solely for the
>> use of the intended recipient(s). If you are not the intended recipient of
>> this message, you are hereby notified that any review, use or distribution
>> of this information is absolutely prohibited and we request that you delete
>> all copies in your control and contact us by e-mailing to
>> secur...@mavenir.com. This message contains the views of its author and
>> may not necessarily reflect the views of Mavenir Systems, Inc. or its
>> affiliates, who employ systems to monitor email messages, but make no
>> representation that such messages are authorized, secure, uncompromised, or
>> free from computer viruses, malware, or other defects. Thank You
>>
>>
>
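
Since the thread asks for sample code: below is a minimal sketch of the Horovod-on-Spark route that Sean points to, shaped for the time-series/LSTM case described above. The helper names (make_windows, train, main), the window size, the layer sizes and num_proc=2 are all assumptions made for illustration, not something taken from the thread. It also assumes that horovod (built with Spark and TensorFlow support), tensorflow and pyspark are installed on the driver and the executors.

```python
def make_windows(series, lookback):
    """Turn a 1-D sequence into (window, next value) training pairs."""
    xs, ys = [], []
    for i in range(len(series) - lookback):
        xs.append(list(series[i:i + lookback]))
        ys.append(series[i + lookback])
    return xs, ys


def train(series, lookback=4, epochs=2):
    """Runs once per Horovod worker, inside a Spark task."""
    # Heavy imports stay inside the function so they are resolved on the
    # executors, not when this module is loaded.
    import numpy as np
    import tensorflow as tf
    import horovod.tensorflow.keras as hvd

    hvd.init()
    xs, ys = make_windows(series, lookback)
    x = np.asarray(xs, dtype="float32").reshape(-1, lookback, 1)
    y = np.asarray(ys, dtype="float32")
    # Assumption: each worker trains on its own slice of the windows.
    x, y = x[hvd.rank()::hvd.size()], y[hvd.rank()::hvd.size()]

    model = tf.keras.Sequential([
        tf.keras.Input(shape=(lookback, 1)),
        tf.keras.layers.LSTM(16),
        tf.keras.layers.Dense(1),
    ])
    # Wrapping the optimizer makes Horovod average gradients across workers.
    opt = hvd.DistributedOptimizer(tf.keras.optimizers.Adam(0.001 * hvd.size()))
    model.compile(optimizer=opt, loss="mse")
    model.fit(
        x, y, epochs=epochs,
        # Start every worker from the same initial weights.
        callbacks=[hvd.callbacks.BroadcastGlobalVariablesCallback(0)],
        verbose=1 if hvd.rank() == 0 else 0,
    )
    return float(model.evaluate(x, y, verbose=0))


def main():
    """Driver side: launch `train` on Spark executors via Horovod."""
    import horovod.spark
    from pyspark.sql import SparkSession

    SparkSession.builder.appName("hvd-sketch").getOrCreate()
    data = [float(i % 10) for i in range(200)]  # toy series, illustration only
    # One Spark task per process; returns one result per Horovod rank.
    return horovod.spark.run(train, args=(data,), num_proc=2)
```

Calling main() on the driver starts the job. The point Sean makes shows up in that last call: Spark sets up and manages the worker processes, and the training function itself is ordinary Keras code plus three Horovod calls.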


Re: Loading .xlsx and .xlx files using pyspark

2022-02-23 Thread Gourav Sengupta
Hi,

this looks like a very specific and exact problem in its scope.

Do you think that you can load the data into a pandas dataframe and load it
back into SPARK using a pandas UDF?

Koalas is now natively integrated with SPARK; try to see if you can use
those features.


Regards,
Gourav

On Wed, Feb 23, 2022 at 1:31 PM Sid  wrote:

> I have an excel file which unfortunately cannot be converted to CSV format
> and I am trying to load it using pyspark shell.
>
> I tried invoking the below pyspark session with the jars provided.
>
> pyspark --jars
> /home/siddhesh/Downloads/spark-excel_2.12-0.14.0.jar,/home/siddhesh/Downloads/xmlbeans-5.0.3.jar,/home/siddhesh/Downloads/commons-collections4-4.4.jar,/home/siddhesh/Downloads/poi-5.2.0.jar,/home/siddhesh/Downloads/poi-ooxml-5.2.0.jar,/home/siddhesh/Downloads/poi-ooxml-schemas-4.1.2.jar,/home/siddhesh/Downloads/slf4j-log4j12-1.7.28.jar,/home/siddhesh/Downloads/log4j-1.2-api-2.17.1.jar
>
> and below is the code to read the excel file:
>
> df = spark.read.format("excel") \
>     .option("dataAddress", "'Sheet1'!") \
>     .option("header", "true") \
>     .option("inferSchema", "true") \
>     .load("/home/.../Documents/test_excel.xlsx")
>
> It is giving me the below error message:
>
>  java.lang.NoClassDefFoundError: org/apache/logging/log4j/LogManager
>
> I tried several Jars for this error but no luck. Also, what would be the
> efficient way to load it?
>
> Thanks,
> Sid
>
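
The pandas route suggested above could look roughly like the sketch below. The helper names are made up for illustration, the path is whatever file you have, and it assumes that pandas plus openpyxl (for .xlsx) or xlrd (for .xls) are installed on the driver. Since no spark-excel jars are involved, the classpath that produced the NoClassDefFoundError does not come into play.

```python
import os


def excel_engine(path):
    """Pick the pandas reader engine from the file extension."""
    ext = os.path.splitext(path)[1].lower()
    engines = {".xlsx": "openpyxl", ".xls": "xlrd"}
    if ext not in engines:
        raise ValueError(f"not an Excel file: {path}")
    return engines[ext]


def load_excel(spark, path, sheet_name="Sheet1"):
    """Read one sheet on the driver with pandas, then hand it to Spark."""
    import pandas as pd  # imported lazily; only needed when actually loading

    pdf = pd.read_excel(path, sheet_name=sheet_name, engine=excel_engine(path))
    return spark.createDataFrame(pdf)


def load_excel_pandas_on_spark(path, sheet_name="Sheet1"):
    """Same idea with the pandas API on Spark (the former Koalas, Spark 3.2+)."""
    import pyspark.pandas as ps

    return ps.read_excel(path, sheet_name=sheet_name).to_spark()
```

In the pyspark shell this would be called as df = load_excel(spark, "<path to the .xlsx file>"). Note that load_excel reads the whole sheet into driver memory first, so it suits files of modest size.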


Re: [E] COMMERCIAL BULK: Re: TensorFlow on Spark

2022-02-23 Thread Gourav Sengupta
Hi,

I am sure those who have actually built a data processing pipeline whose
contents then have to be delivered to tensorflow or pytorch (not for a POC,
or for writing a blog to get clicks, or for resolving symptomatic bugs, but
for a real-life end-to-end application) will perhaps understand some of the
issues that arise because SPARK dataframes do not natively integrate with
tensorflow/pytorch.

But perhaps I am wrong.

My point in mentioning Ray is simple: if SPARK were able to natively scale
out and distribute data to tensorflow or pytorch, then there would be
competition between Ray and SPARK.

Regards,
Gourav Sengupta

On Wed, Feb 23, 2022 at 12:35 PM Sean Owen  wrote:

> Spark does do distributed ML, but not Tensorflow. Barrier execution mode
> is an element that things like Horovod use. Not sure what you are getting
> at?
> Ray is not Spark.
> As I say -- Horovod does this already. The upside over TF distributed is
> that Spark sets up and manages the daemon processes rather than doing it by
> hand.
>
>
> On Wed, Feb 23, 2022 at 2:43 AM Gourav Sengupta 
> wrote:
>
>> Hi,
>>
>> the SPARK community should have been able to build distributed ML
>> capabilities, and as far as I remember that was the idea initially behind
>> SPARK 3.x roadmap (barrier execution mode,
>> https://issues.apache.org/jira/browse/SPARK-24579).
>>
>> Ray, another Berkeley Labs output like SPARK, is trying to capture that
>> market space.
>>
>> I am not sure whether there is any steer by the SPARK community leaders
>> to seriously prioritise building those capabilities at all. But I am sure
>> if the brilliant and fantastic minds behind SPARK did actually want to
>> allow building those capabilities, they can easily do so, and achieve that
>> :)
>>
>> I would sincerely request the open source SPARK community to prioritise
>> building the SPARK capabilities to scale ML applications.
>>
>>
>>
>> Thanks and Regards,
>> Gourav Sengupta
>>
>> On Wed, Feb 23, 2022 at 3:53 AM Bitfox  wrote:
>>
>>> tensorflow itself can implement the distributed computing via a
>>> parameter server. Why did you want spark here?
>>>
>>> regards.
>>>
>>> On Wed, Feb 23, 2022 at 11:27 AM Vijayant Kumar
>>>  wrote:
>>>
>>>> Thanks Sean for your response. !!
>>>>
>>>>
>>>>
>>>> Want to add some more background here.
>>>>
>>>>
>>>>
>>>> I am using Spark3.0+ version with Tensorflow 2.0+.
>>>>
>>>> My use case is not for the image data but for the Time-series data
>>>> where I am using LSTM and transformers to forecast.
>>>>
>>>>
>>>>
>>>> I evaluated the *SparkFlow* and *spark_tensorflow_distributor* libraries, and
>>>> there has been no major development recently on those libraries. I faced
>>>> the issue of version dependencies on those and had a hard time fixing the
>>>> library compatibilities. Hence a couple of questions below:
>>>>
>>>>
>>>>
>>>> - Does *Horovod* have any dependencies?
>>>> - Is there any other library which is suitable for my use case?
>>>> - Any example code would really be of great help to understand.
>>>>
>>>>
>>>>
>>>> Thanks,
>>>>
>>>> Vijayant
>>>>
>>>>
>>>>
>>>> *From:* Sean Owen 
>>>> *Sent:* Wednesday, February 23, 2022 8:40 AM
>>>> *To:* Vijayant Kumar 
>>>> *Cc:* user @spark 
>>>> *Subject:* [E] COMMERCIAL BULK: Re: TensorFlow on Spark
>>>>
>>>>
>>>>
>>>> Sure, Horovod is commonly used on Spark for this:
>>>>
>>>> https://horovod.readthedocs.io/en/stable/spark_include.html
>>>>
>>>>
>>>>
>>>> On Tue, Feb 22, 2022 at 8:51 PM Vijayant Kumar <
>>>> vijayant.ku...@mavenir.com.invalid> wrote:
>>>>
>>>> Hi All,
>>>>
>>>>
>>>>
>>>> Anyone using Apache spark with TensorFlow for building models. My
>>>> requirement is to use TensorFlow distributed model training across the
>>>> Spark executors.
>>>>
>>>> Please help me with some resources or some sample code.
>>>>

Re: [E] COMMERCIAL BULK: Re: TensorFlow on Spark

2022-02-23 Thread Gourav Sengupta
Hi,

the SPARK community should have been able to build distributed ML
capabilities, and as far as I remember that was the idea initially behind
the SPARK 3.x roadmap (barrier execution mode,
https://issues.apache.org/jira/browse/SPARK-24579).

Ray, which like SPARK came out of the labs at UC Berkeley, is trying to
capture that market space.

I am not sure whether there is any steer from the SPARK community leaders to
seriously prioritise building those capabilities at all. But I am sure that if
the brilliant and fantastic minds behind SPARK did actually want to build
those capabilities, they could easily do so :)

I would sincerely request the open source SPARK community to prioritise
building the SPARK capabilities to scale ML applications.



Thanks and Regards,
Gourav Sengupta

On Wed, Feb 23, 2022 at 3:53 AM Bitfox  wrote:

> tensorflow itself can implement the distributed computing via a
> parameter server. Why did you want spark here?
>
> regards.
>
> On Wed, Feb 23, 2022 at 11:27 AM Vijayant Kumar
>  wrote:
>
>> Thanks Sean for your response. !!
>>
>>
>>
>> Want to add some more background here.
>>
>>
>>
>> I am using Spark3.0+ version with Tensorflow 2.0+.
>>
>> My use case is not for the image data but for the Time-series data where
>> I am using LSTM and transformers to forecast.
>>
>>
>>
>> I evaluated the *SparkFlow* and *spark_tensorflow_distributor* libraries, and
>> there has been no major development recently on those libraries. I faced
>> the issue of version dependencies on those and had a hard time fixing the
>> library compatibilities. Hence a couple of questions below:
>>
>>
>>
>> - Does *Horovod* have any dependencies?
>> - Is there any other library which is suitable for my use case?
>> - Any example code would really be of great help to understand.
>>
>>
>>
>> Thanks,
>>
>> Vijayant
>>
>>
>>
>> *From:* Sean Owen 
>> *Sent:* Wednesday, February 23, 2022 8:40 AM
>> *To:* Vijayant Kumar 
>> *Cc:* user @spark 
>> *Subject:* [E] COMMERCIAL BULK: Re: TensorFlow on Spark
>>
>>
>>
>> Sure, Horovod is commonly used on Spark for this:
>>
>> https://horovod.readthedocs.io/en/stable/spark_include.html
>>
>>
>>
>> On Tue, Feb 22, 2022 at 8:51 PM Vijayant Kumar <
>> vijayant.ku...@mavenir.com.invalid> wrote:
>>
>> Hi All,
>>
>>
>>
>> Anyone using Apache spark with TensorFlow for building models. My
>> requirement is to use TensorFlow distributed model training across the
>> Spark executors.
>>
>> Please help me with some resources or some sample code.
>>
>>
>>
>> Thanks,
>>
>> Vijayant
>> --
>>
>> This e-mail message may contain confidential or proprietary information
>> of Mavenir Systems, Inc. or its affiliates and is intended solely for the
>> use of the intended recipient(s). If you are not the intended recipient of
>> this message, you are hereby notified that any review, use or distribution
>> of this information is absolutely prohibited and we request that you delete
>> all copies in your control and contact us by e-mailing to
>> secur...@mavenir.com. This message contains the views of its author and
>> may not necessarily reflect the views of Mavenir Systems, Inc. or its
>> affiliates, who employ systems to monitor email messages, but make no
>> representation that such messages are authorized, secure, uncompromised, or
>> free from computer viruses, malware, or other defects. Thank You
>>
>>
>


Re: Spark Explain Plan and Joins

2022-02-21 Thread Gourav Sengupta
Hi,

I think that the best option is to use the SPARK UI. In SPARK 3.x the UI
and its additional settings are fantastic. Try to also look at the settings
for Adaptive Query Execution in SPARK; under certain conditions it really
works wonders.

For certain long queries, depending on how you finally trigger the action
that executes the query, whether you are using SPARK Dataframes or SPARK SQL,
the settings in SPARK (look at the settings for SPARK 3.x) and a few other
aspects, you will see that the plan is sometimes quite cryptic and difficult
to read.

Regards,
Gourav Sengupta
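
For reference, the Adaptive Query Execution settings mentioned above are ordinary SQL configs that can be set on a running session. A small sketch follows; the helper name enable_aqe is an assumption made for illustration, while the keys are the Spark 3.x configuration names.

```python
# Spark 3.x configuration keys for Adaptive Query Execution (AQE).
AQE_SETTINGS = {
    # Master switch for AQE.
    "spark.sql.adaptive.enabled": "true",
    # Merge small shuffle partitions after a stage has run.
    "spark.sql.adaptive.coalescePartitions.enabled": "true",
    # Split skewed partitions in sort-merge joins.
    "spark.sql.adaptive.skewJoin.enabled": "true",
}


def enable_aqe(spark, settings=AQE_SETTINGS):
    """Apply the settings to a running session and return what was set."""
    for key, value in settings.items():
        spark.conf.set(key, value)
    return {key: spark.conf.get(key) for key in settings}
```

In the pyspark shell, enable_aqe(spark) is enough. With AQE on, the physical plan is wrapped in an AdaptiveSparkPlan node and the final plan is only known after execution, which is one reason the SQL tab of the UI is the easier place to read it. From Spark 3.2 onwards AQE is enabled by default.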

On Sun, Feb 20, 2022 at 7:32 PM Sid Kal  wrote:

> Hi Gourav,
>
> Right now I am just trying to understand the query execution plan by
> executing a simple join example via Spark SQL. The overall goal is to
> understand these plans so that, going forward, if my query runs slowly due
> to data skew or some other issue, I should at least be able to understand
> what exactly is happening on the master and slave sides, as in MapReduce.
>
> On Sun, Feb 20, 2022 at 9:06 PM Gourav Sengupta 
> wrote:
>
>> Hi,
>>
>> what are you trying to achieve by this?
>>
>> If there is a performance deterioration, try to collect the query
>> execution run time statistics from SPARK SQL. They can be seen from the
>> SPARK SQL UI and available over API's in case I am not wrong.
>>
>> Please ensure that you are not trying to over automate things.
>>
>> Reading how to understand the plans may be good depending on what you are
>> trying to do.
>>
>>
>> Regards,
>> Gourav Sengupta
>>
>> On Sat, Feb 19, 2022 at 10:00 AM Sid Kal  wrote:
>>
>>> I wrote a query like below and I am trying to understand its query
>>> execution plan.
>>>
>>> >>> spark.sql("select a.CustomerID,a.CustomerName,b.state from df a join df1 b on a.CustomerID=b.CustomerID").explain(mode="extended")
>>> == Parsed Logical Plan ==
>>> 'Project ['a.CustomerID, 'a.CustomerName, 'b.state]
>>> +- 'Join Inner, ('a.CustomerID = 'b.CustomerID)
>>>    :- 'SubqueryAlias a
>>>    :  +- 'UnresolvedRelation [df], [], false
>>>    +- 'SubqueryAlias b
>>>       +- 'UnresolvedRelation [df1], [], false
>>>
>>> == Analyzed Logical Plan ==
>>> CustomerID: int, CustomerName: string, state: string
>>> Project [CustomerID#640, CustomerName#641, state#988]
>>> +- Join Inner, (CustomerID#640 = CustomerID#978)
>>>    :- SubqueryAlias a
>>>    :  +- SubqueryAlias df
>>>    :     +- Relation[CustomerID#640,CustomerName#641,Pincode#642,DOB#643,CompletedAddressWithPincode#644,ContactDetailsPhone1#645,ContactDetailsPhone2#646,ContactDetailsMobile#647,ContactDetailsEmail#648,City#649,State#650,ComplaintFlag#651,ProfileCompletenessPercentage#652,WhatsAppOptIn#653,HNINonHNI#654,S2SFlag#655] csv
>>>    +- SubqueryAlias b
>>>       +- SubqueryAlias df1
>>>          +- Relation[CustomerID#978,CustomerName#979,Pincode#980,DOB#981,CompletedAddressWithPincode#982,ContactDetailsPhone1#983,ContactDetailsPhone2#984,ContactDetailsMobile#985,ContactDetailsEmail#986,City#987,State#988,ComplaintFlag#989,ProfileCompletenessPercentage#990,WhatsAppOptIn#991,HNINonHNI#992,S2SFlag#993] csv
>>>
>>> == Optimized Logical Plan ==
>>> Project [CustomerID#640, CustomerName#641, state#988]
>>> +- Join Inner, (CustomerID#640 = CustomerID#978)
>>>    :- Project [CustomerID#640, CustomerName#641]
>>>    :  +- Filter isnotnull(CustomerID#640)
>>>    :     +- Relation[CustomerID#640,CustomerName#641,Pincode#642,DOB#643,CompletedAddressWithPincode#644,ContactDetailsPhone1#645,ContactDetailsPhone2#646,ContactDetailsMobile#647,ContactDetailsEmail#648,City#649,State#650,ComplaintFlag#651,ProfileCompletenessPercentage#652,WhatsAppOptIn#653,HNINonHNI#654,S2SFlag#655] csv
>>>    +- Project [CustomerID#978, State#988]
>>>       +- Filter isnotnull(CustomerID#978)
>>>          +- Relation[CustomerID#978,CustomerName#979,Pincode#980,DOB#981,CompletedAddressWithPincode#982,ContactDetailsPhone1#983,ContactDetailsPhone2#984,ContactDetailsMobile#985,ContactDetailsEmail#986,City#987,State#988,ComplaintFlag#989,ProfileCompletenessPercentage#990,WhatsAppOptIn#991,HNINonHNI#992,S2SFlag#993] csv
>>>
>>> == Physical Plan ==
>>> *(5) Project [CustomerID#640, CustomerName#641, state#988]
>>> +- *(5) SortMergeJoin [CustomerID#640], [CustomerID#978], Inner
>>>    :- *(2) Sort [CustomerID#640 ASC NULLS FIRST], false, 0
>>>    :  +- Exchange hashpartitioni

Re: Spark Explain Plan and Joins

2022-02-20 Thread Gourav Sengupta
Hi,

what are you trying to achieve by this?

If there is a performance deterioration, try to collect the query execution
run time statistics from SPARK SQL. They can be seen in the SPARK SQL UI and,
if I am not wrong, are also available over APIs.

Please ensure that you are not trying to over-automate things.

Reading how to understand the plans may be good depending on what you are
trying to do.


Regards,
Gourav Sengupta

On Sat, Feb 19, 2022 at 10:00 AM Sid Kal  wrote:

> I wrote a query like below and I am trying to understand its query
> execution plan.
>
> >>> spark.sql("select a.CustomerID,a.CustomerName,b.state from df a join df1 b on a.CustomerID=b.CustomerID").explain(mode="extended")
> == Parsed Logical Plan ==
> 'Project ['a.CustomerID, 'a.CustomerName, 'b.state]
> +- 'Join Inner, ('a.CustomerID = 'b.CustomerID)
>    :- 'SubqueryAlias a
>    :  +- 'UnresolvedRelation [df], [], false
>    +- 'SubqueryAlias b
>       +- 'UnresolvedRelation [df1], [], false
>
> == Analyzed Logical Plan ==
> CustomerID: int, CustomerName: string, state: string
> Project [CustomerID#640, CustomerName#641, state#988]
> +- Join Inner, (CustomerID#640 = CustomerID#978)
>    :- SubqueryAlias a
>    :  +- SubqueryAlias df
>    :     +- Relation[CustomerID#640,CustomerName#641,Pincode#642,DOB#643,CompletedAddressWithPincode#644,ContactDetailsPhone1#645,ContactDetailsPhone2#646,ContactDetailsMobile#647,ContactDetailsEmail#648,City#649,State#650,ComplaintFlag#651,ProfileCompletenessPercentage#652,WhatsAppOptIn#653,HNINonHNI#654,S2SFlag#655] csv
>    +- SubqueryAlias b
>       +- SubqueryAlias df1
>          +- Relation[CustomerID#978,CustomerName#979,Pincode#980,DOB#981,CompletedAddressWithPincode#982,ContactDetailsPhone1#983,ContactDetailsPhone2#984,ContactDetailsMobile#985,ContactDetailsEmail#986,City#987,State#988,ComplaintFlag#989,ProfileCompletenessPercentage#990,WhatsAppOptIn#991,HNINonHNI#992,S2SFlag#993] csv
>
> == Optimized Logical Plan ==
> Project [CustomerID#640, CustomerName#641, state#988]
> +- Join Inner, (CustomerID#640 = CustomerID#978)
>    :- Project [CustomerID#640, CustomerName#641]
>    :  +- Filter isnotnull(CustomerID#640)
>    :     +- Relation[CustomerID#640,CustomerName#641,Pincode#642,DOB#643,CompletedAddressWithPincode#644,ContactDetailsPhone1#645,ContactDetailsPhone2#646,ContactDetailsMobile#647,ContactDetailsEmail#648,City#649,State#650,ComplaintFlag#651,ProfileCompletenessPercentage#652,WhatsAppOptIn#653,HNINonHNI#654,S2SFlag#655] csv
>    +- Project [CustomerID#978, State#988]
>       +- Filter isnotnull(CustomerID#978)
>          +- Relation[CustomerID#978,CustomerName#979,Pincode#980,DOB#981,CompletedAddressWithPincode#982,ContactDetailsPhone1#983,ContactDetailsPhone2#984,ContactDetailsMobile#985,ContactDetailsEmail#986,City#987,State#988,ComplaintFlag#989,ProfileCompletenessPercentage#990,WhatsAppOptIn#991,HNINonHNI#992,S2SFlag#993] csv
>
> == Physical Plan ==
> *(5) Project [CustomerID#640, CustomerName#641, state#988]
> +- *(5) SortMergeJoin [CustomerID#640], [CustomerID#978], Inner
>    :- *(2) Sort [CustomerID#640 ASC NULLS FIRST], false, 0
>    :  +- Exchange hashpartitioning(CustomerID#640, 200), ENSURE_REQUIREMENTS, [id=#451]
>    :     +- *(1) Filter isnotnull(CustomerID#640)
>    :        +- FileScan csv [CustomerID#640,CustomerName#641] Batched: false, DataFilters: [isnotnull(CustomerID#640)], Format: CSV, Location: InMemoryFileIndex[file:/home/siddhesh/Documents/BAXA/csv_output/bulk_with_only_no], PartitionFilters: [], PushedFilters: [IsNotNull(CustomerID)], ReadSchema: struct
>    +- *(4) Sort [CustomerID#978 ASC NULLS FIRST], false, 0
>       +- Exchange hashpartitioning(CustomerID#978, 200), ENSURE_REQUIREMENTS, [id=#459]
>          +- *(3) Filter isnotnull(CustomerID#978)
>             +- FileScan csv [CustomerID#978,State#988] Batched: false, DataFilters: [isnotnull(CustomerID#978)], Format: CSV, Location: InMemoryFileIndex[file:/home/siddhesh/Documents/BAXA/csv_output/bulk_with_only_no], PartitionFilters: [], PushedFilters: [IsNotNull(CustomerID)], ReadSchema: struct
>
> I know some of the operators: Project is like the select clause, and Filter
> is whatever filters we use in the query. Where can I look for the cost
> optimization in this plan? Suppose that in future my query takes a longer
> time to execute; by looking at this plan, how can I figure out what
> exactly is happening and what needs to be modified in the query? Also, I
> can see from the plan that Spark uses sort-merge join by default, but when
> does it opt for Sort-Merge Join and when does it opt for Shuffle-Hash
> Join?
>
> Thanks,
> Sid
>
>
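
On the sort-merge versus shuffle-hash question, a short sketch that may help when reading plans like the one above. In Spark 3.x, when neither side is estimated to be below spark.sql.autoBroadcastJoinThreshold, sort-merge join is preferred by default (spark.sql.join.preferSortMergeJoin), and a shuffled hash join can be requested with the SHUFFLE_HASH join hint. The helper names below are assumptions made for illustration; the query and the view names df and df1 are the ones from the thread.

```python
import re

# Physical join operators as they appear in explain() output.
JOIN_OPERATORS = (
    "BroadcastHashJoin",
    "SortMergeJoin",
    "ShuffledHashJoin",
    "BroadcastNestedLoopJoin",
    "CartesianProduct",
)


def join_strategies(plan_text):
    """Return the join operators found in a physical plan, in order."""
    pattern = r"\b(" + "|".join(JOIN_OPERATORS) + r")\b"
    return re.findall(pattern, plan_text)


def count_shuffles(plan_text):
    """Each 'Exchange hashpartitioning' node is one shuffle of the data."""
    return len(re.findall(r"Exchange hashpartitioning", plan_text))


def explain_with_hint(spark, hint="SHUFFLE_HASH"):
    """Re-run the thread's query with a join hint and print the new plan."""
    query = (
        f"select /*+ {hint}(b) */ a.CustomerID, a.CustomerName, b.state "
        "from df a join df1 b on a.CustomerID = b.CustomerID"
    )
    spark.sql(query).explain(mode="formatted")
```

Applied to the physical plan above, join_strategies reports a single SortMergeJoin and count_shuffles reports two exchanges, one per side of the join. Those two shuffles and the sorts above them are usually where the time goes, so they are the first thing to look at for a slow join.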


Re: Cast int to string not possible?

2022-02-18 Thread Gourav Sengupta
Hi Rico,

using SQL saves a lot of time, effort, and budget over the long term. But I
guess that there are certain joys in solving self-induced complexities.

Thanks for sharing your findings.

Regards,
Gourav Sengupta

On Fri, Feb 18, 2022 at 7:26 AM Rico Bergmann  wrote:

> I found the reason why it did not work:
>
> When returning the Spark data type I was calling new StringType(). When
> changing it to DataTypes.StringType it worked.
>
> Greets,
> Rico.
>
> On 17.02.2022 at 14:13, Gourav Sengupta wrote:
>
> 
> Hi,
>
> can you please post a screen shot of the exact CAST statement that you are
> using? Did you use the SQL method mentioned by me earlier?
>
> Regards,
> Gourav Sengupta
>
> On Thu, Feb 17, 2022 at 12:17 PM Rico Bergmann 
> wrote:
>
>> hi!
>>
>> Casting another int column that is not a partition column fails with the
>> same error.
>>
>> The Schema before the cast (column names are anonymized):
>>
>> root
>>  |-- valueObject: struct (nullable = true)
>>  |    |-- value1: string (nullable = true)
>>  |    |-- value2: string (nullable = true)
>>  |    |-- value3: timestamp (nullable = true)
>>  |    |-- value4: string (nullable = true)
>>  |-- partitionColumn2: string (nullable = true)
>>  |-- partitionColumn3: timestamp (nullable = true)
>>  |-- partitionColumn1: integer (nullable = true)
>>
>>
>> I wanted to cast partitionColumn1 to String which gives me the described
>> error.
>>
>>
>> Best,
>>
>> Rico
>>
>>
>>
>> On 17.02.2022 at 09:56, ayan guha wrote:
>>
>> 
>> Can you try to cast any other Int field which is NOT a partition column?
>>
>> On Thu, 17 Feb 2022 at 7:34 pm, Gourav Sengupta <
>> gourav.sengu...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> This appears interesting, casting INT to STRING has never been an issue
>>> for me.
>>>
>>> Can you just help us with the output of : df.printSchema()  ?
>>>
>>> I prefer to use SQL, and the method I use for casting is:
>>> CAST(<<column name>> AS STRING) <<alias>>.
>>>
>>> Regards,
>>> Gourav
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Thu, Feb 17, 2022 at 6:02 AM Rico Bergmann 
>>> wrote:
>>>
>>>> Here is the code snippet:
>>>>
>>>> var df = session.read().parquet(basepath);
>>>> for (Column partition : partitionColumnsList) {
>>>>   df = df.withColumn(partition.getName(),
>>>>       df.col(partition.getName()).cast(partition.getType()));
>>>> }
>>>>
>>>> Column is a class containing Schema Information, like for example the
>>>> name of the column and the data type of the column.
>>>>
>>>> Best, Rico.
>>>>
>>>> > On 17.02.2022 at 03:17, Morven Huang wrote:
>>>> >
>>>> > Hi Rico, you have any code snippet? I have no problem casting int to
>>>> string.
>>>> >
>>>> >> On 17 Feb 2022, at 12:26 AM, Rico Bergmann wrote:
>>>> >>
>>>> >> Hi!
>>>> >>
>>>> >> I am reading a partitioned dataFrame into spark using automatic type
>>>> inference for the partition columns. For one partition column the data
>>>> contains an integer, therefore Spark uses IntegerType for this column. In
>>>> general this is supposed to be a StringType column. So I tried to cast this
>>>> column to StringType. But this fails with AnalysisException “cannot cast
>>>> int to string”.
>>>> >>
>>>> >> Is this a bug? Or is it really not allowed to cast an int to a
>>>> string?
>>>> >>
>>>> >> I’m using Spark 3.1.1
>>>> >>
>>>> >> Best regards
>>>> >>
>>>> >> Rico.
>>>> >>
>>>> >> -
>>>> >> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>> >>
>>>> >
>>>> --
>> Best Regards,
>> Ayan Guha
>>
>>
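
For comparison, here is the same cast in PySpark, once through the SQL CAST form mentioned above and once through the DataFrame API. This is only a sketch: the helper names are hypothetical, and the column name in the usage note is the anonymized one from the schema in this thread. In the Java API the fix Rico reports was to use DataTypes.StringType rather than new StringType(); in Python the target type can simply be named with the string "string".

```python
def cast_expr(column, sql_type="STRING"):
    """Build the SQL form: CAST(`col` AS STRING) AS `col`."""
    return f"CAST(`{column}` AS {sql_type}) AS `{column}`"


def cast_with_sql(df, column, sql_type="STRING"):
    """Cast one column via selectExpr and keep all the other columns."""
    exprs = [
        cast_expr(c, sql_type) if c == column else f"`{c}`"
        for c in df.columns
    ]
    return df.selectExpr(*exprs)


def cast_with_api(df, column, type_name="string"):
    """Cast one column via the DataFrame API."""
    return df.withColumn(column, df[column].cast(type_name))
```

Either cast_with_sql(df, "partitionColumn1") or cast_with_api(df, "partitionColumn1") should leave df.printSchema() showing the column as string.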


Re: StructuredStreaming - foreach/foreachBatch

2022-02-17 Thread Gourav Sengupta
Hi,

The following excellent documentation may help as well:
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach-and-foreachbatch

The book from Dr. Zaharia on SPARK does a fantastic job in explaining the
fundamental thinking behind these concepts.


Regards,
Gourav Sengupta



On Wed, Feb 9, 2022 at 8:51 PM karan alang  wrote:

> Thanks, Mich .. will check it out
>
> regds,
> Karan Alang
>
> On Tue, Feb 8, 2022 at 3:06 PM Mich Talebzadeh 
> wrote:
>
>> BTW you can check this Linkedin article of mine on Processing Change
>> Data Capture with Spark Structured Streaming
>> <https://www.linkedin.com/pulse/processing-change-data-capture-spark-structured-talebzadeh-ph-d-/?trackingId=pRPrKh1iQzq4EJ%2BuiAoWGQ%3D%3D>
>>
>>
>> It covers the concept of triggers including trigger(once = True) or
>> one-time batch in Spark Structured Streaming
>>
>>
>> HTH
>>
>>
>>view my Linkedin profile
>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Mon, 7 Feb 2022 at 23:06, karan alang  wrote:
>>
>>> Thanks, Mich .. that worked fine!
>>>
>>>
>>> On Mon, Feb 7, 2022 at 1:55 PM Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
>>>> read below
>>>>
>>>> """
>>>> "foreach" performs custom write logic on each row and "foreachBatch"
>>>> performs custom write logic on each micro-batch through the
>>>> SendToBigQuery function.
>>>> foreachBatch(SendToBigQuery) expects 2 parameters, first: micro-batch as
>>>> DataFrame or Dataset and second: unique id for each batch --> batchId
>>>> Using foreachBatch, we write each micro batch to storage defined in our
>>>> custom logic. In this case, we store the output of our streaming
>>>> application to Google BigQuery table.
>>>> Note that we are appending data and column "rowkey" is defined as UUID
>>>> so it can be used as the primary key
>>>> """
>>>> result = streamingDataFrame.select( \
>>>>              col("parsed_value.rowkey").alias("rowkey") \
>>>>            , col("parsed_value.ticker").alias("ticker") \
>>>>            , col("parsed_value.timeissued").alias("timeissued") \
>>>>            , col("parsed_value.price").alias("price")). \
>>>>          writeStream. \
>>>>          outputMode('append'). \
>>>>          option("truncate", "false"). \
>>>>          foreachBatch(SendToBigQuery). \
>>>>          trigger(processingTime='2 seconds'). \
>>>>          start()
>>>>
>>>> now you define your function SendToBigQuery()
>>>>
>>>> def SendToBigQuery(df, batchId):
>>>>     if(len(df.take(1))) > 0:
>>>>         df.printSchema()
>>>>         print(f"""batchId is {batchId}""")
>>>>         rows = df.count()
>>>>         print(f""" Total records processed in this run = {rows}""")
>>>>         ..
>>>>     else:
>>>>         print("DataFrame is empty")
>>>>
>>>> *HTH*
>>>>
>>>>
>>>> On Mon, 7 Feb 2022 at 21:06, karan alang  wrote:
>>>>
>>>>> Hello All,
>>>>>
>>>>> I'm using StructuredStreaming to read data from Kafka, and need to do
>>>>> transformation on each individual row.
>>>>>
>>>>> I'm trying to use 'foreach' (or foreachBatch), and running into issues.
>>>>> Basic question - how is the row passed to the function when foreach is
>>>>> used ?
>>>>>
>>>>> Also, when I use foreachBatch, seems the BatchId is available in the
>>>>> function called ? How do I access individual rows ?
>>>>>
>>>>> Details are in stackoverflow :
>>>>>
>>>>> https://stackoverflow.com/questions/71025312/structuredstreaming-foreach-foreachbatch-not-working
>>>>>
>>>>> What is the best approach for this use-case ?
>>>>>
>>>>> tia!
>>>>>
>>>>
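
To the original question of how rows reach the function: with foreach the function is called once per row and receives a Row; with foreachBatch it is called once per micro-batch and receives the batch DataFrame plus the batch id, and individual rows are then reached through ordinary DataFrame operations. A rough sketch, with hypothetical function names (transform, process_row, process_batch) and the price column borrowed from Mich's example:

```python
def transform(record):
    """Per-row logic on a plain dict; here it just rounds the price."""
    out = dict(record)
    out["price"] = round(float(out["price"]), 2)
    return out


def process_row(row):
    """For writeStream.foreach(process_row): called once per Row."""
    # Runs on the executors, so the output lands in the executor logs.
    print(transform(row.asDict()))


def process_batch(batch_df, batch_id):
    """For writeStream.foreachBatch(process_batch): called once per micro-batch."""
    # toLocalIterator() brings the batch's rows to the driver one partition
    # at a time; fine for a sketch, but prefer DataFrame operations at volume.
    results = [transform(row.asDict()) for row in batch_df.toLocalIterator()]
    print(f"batch {batch_id}: {len(results)} rows")
    return results
```

These would be wired up as streamingDataFrame.writeStream.foreach(process_row).start() or streamingDataFrame.writeStream.foreachBatch(process_batch).start(). If the per-row transformation can be written with column expressions, applying it to batch_df inside process_batch keeps the work on the executors instead of the driver.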


Re: Cast int to string not possible?

2022-02-17 Thread Gourav Sengupta
Hi,

can you please post a screen shot of the exact CAST statement that you are
using? Did you use the SQL method mentioned by me earlier?

Regards,
Gourav Sengupta

On Thu, Feb 17, 2022 at 12:17 PM Rico Bergmann  wrote:

> hi!
>
> Casting another int column that is not a partition column fails with the
> same error.
>
> The Schema before the cast (column names are anonymized):
>
> root
>  |-- valueObject: struct (nullable = true)
>  |    |-- value1: string (nullable = true)
>  |    |-- value2: string (nullable = true)
>  |    |-- value3: timestamp (nullable = true)
>  |    |-- value4: string (nullable = true)
>  |-- partitionColumn2: string (nullable = true)
>  |-- partitionColumn3: timestamp (nullable = true)
>  |-- partitionColumn1: integer (nullable = true)
>
>
> I wanted to cast partitionColumn1 to String which gives me the described
> error.
>
>
> Best,
>
> Rico
>
>
>
> On 17.02.2022 at 09:56, ayan guha wrote:
>
> 
> Can you try to cast any other Int field which is NOT a partition column?
>
> On Thu, 17 Feb 2022 at 7:34 pm, Gourav Sengupta 
> wrote:
>
>> Hi,
>>
>> This appears interesting, casting INT to STRING has never been an issue
>> for me.
>>
>> Can you just help us with the output of : df.printSchema()  ?
>>
>> I prefer to use SQL, and the method I use for casting is:
>> CAST(<<column name>> AS STRING) <<alias>>.
>>
>> Regards,
>> Gourav
>>
>>
>>
>>
>>
>>
>> On Thu, Feb 17, 2022 at 6:02 AM Rico Bergmann 
>> wrote:
>>
>>> Here is the code snippet:
>>>
>>> var df = session.read().parquet(basepath);
>>> for (Column partition : partitionColumnsList) {
>>>   df = df.withColumn(partition.getName(),
>>>       df.col(partition.getName()).cast(partition.getType()));
>>> }
>>>
>>> Column is a class containing Schema Information, like for example the
>>> name of the column and the data type of the column.
>>>
>>> Best, Rico.
>>>
>>> > On 17.02.2022 at 03:17, Morven Huang wrote:
>>> >
>>> > Hi Rico, do you have a code snippet? I have no problem casting int to
>>> > string.
>>> >
>>> >> On Feb 17, 2022, at 12:26 AM, Rico Bergmann wrote:
>>> >>
>>> >> Hi!
>>> >>
>>> >> I am reading a partitioned DataFrame into Spark using automatic type
>>> >> inference for the partition columns. For one partition column the data
>>> >> contains an integer, therefore Spark uses IntegerType for this column. In
>>> >> general this is supposed to be a StringType column. So I tried to cast this
>>> >> column to StringType. But this fails with AnalysisException “cannot cast
>>> >> int to string”.
>>> >>
>>> >> Is this a bug? Or is it really not allowed to cast an int to a string?
>>> >>
>>> >> I’m using Spark 3.1.1
>>> >>
>>> >> Best regards
>>> >>
>>> >> Rico.
>>> >>
>>> >> -
>>> >> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>> >>
>>> >
>>> >
>>> >
>>>
>>>
>>>
>>> --
> Best Regards,
> Ayan Guha
>
>


Re: Cast int to string not possible?

2022-02-17 Thread Gourav Sengupta
Hi,

This appears interesting, casting INT to STRING has never been an issue for
me.

Can you just help us with the output of : df.printSchema()  ?

I prefer to use SQL, and the method I use for casting is:
CAST(<<column name>> AS STRING) <<alias>>.

Regards,
Gourav






On Thu, Feb 17, 2022 at 6:02 AM Rico Bergmann  wrote:

> Here is the code snippet:
>
> var df = session.read().parquet(basepath);
> for(Column partition : partitionColumnsList){
>   df = df.withColumn(partition.getName(),
> df.col(partition.getName()).cast(partition.getType()));
> }
>
> Column is a class containing Schema Information, like for example the name
> of the column and the data type of the column.
>
> Best, Rico.
>
> > On 17.02.2022 at 03:17, Morven Huang wrote:
> >
> > Hi Rico, do you have a code snippet? I have no problem casting int to
> > string.
> >
> >> On Feb 17, 2022, at 12:26 AM, Rico Bergmann wrote:
> >>
> >> Hi!
> >>
> >> I am reading a partitioned DataFrame into Spark using automatic type
> >> inference for the partition columns. For one partition column the data
> >> contains an integer, therefore Spark uses IntegerType for this column. In
> >> general this is supposed to be a StringType column. So I tried to cast this
> >> column to StringType. But this fails with AnalysisException “cannot cast
> >> int to string”.
> >>
> >> Is this a bug? Or is it really not allowed to cast an int to a string?
> >>
> >> I’m using Spark 3.1.1
> >>
> >> Best regards
> >>
> >> Rico.
> >>
> >>
> >
> >
> >
>
>
>
>


Re: Implementing circuit breaker pattern in Spark

2022-02-16 Thread Gourav Sengupta
Hi,
From a technical perspective I think that we all agree, there are no
arguments.

From a design/architecture point of view, big data was supposed to solve
design challenges on volume, velocity, veracity, and variety, and companies
investing in data solutions usually build them to run economically, with
security, cost and other implications in mind, for at least 3 to 4 years.

There is an old saying: do not fit the solution to the problem. Maybe I do
not understand the problem, and am therefore saying all the wrong things :)


Regards,
Gourav Sengupta


On Wed, Feb 16, 2022 at 3:31 PM Sean Owen  wrote:

> There's nothing wrong with calling microservices this way. Something needs
> to call the service with all the data arriving, and Spark is fine for
> executing arbitrary logic including this kind of thing.
> Kafka does not change that?
>
> On Wed, Feb 16, 2022 at 9:24 AM Gourav Sengupta 
> wrote:
>
>> Hi,
>> once again, just trying to understand the problem first.
>>
>> Why are we using SPARK to place calls to micro services? There are
>> several reasons why this should never happen, including costs/ security/
>> scalability concerns, etc.
>>
>> Is there a way that you can create a producer and put the data into Kafka
>> first?
>>
>> Sorry, I am not suggesting any solutions, just trying to understand the
>> problem first.
>>
>>
>> Regards,
>> Gourav
>>
>>
>>
>> On Wed, Feb 16, 2022 at 2:36 PM S  wrote:
>>
>>> No I want the job to stop and end once it discovers on repeated retries
>>> that the microservice is not responding. But I think I got where you were
>>> going right after sending my previous mail. Basically repeatedly failing of
>>> your tasks on retries ultimately fails your job anyway. So thats an
>>> in-built circuit breaker. So what that essentially means is we should not
>>> be catching those HTTP 5XX exceptions (which we currently do) and let the
>>> tasks fail on their own only for spark to retry them for finite number of
>>> times and then subsequently fail and thereby break the circuit. Thanks.
>>>
>>> On Wed, Feb 16, 2022 at 7:59 PM Sean Owen  wrote:
>>>
>>>> You stop the Spark job by tasks failing repeatedly, that's already how
>>>> it works. You can't kill the driver from the executor other ways, but
>>>> should not need to. I'm not clear, you're saying you want to stop the job,
>>>> but also continue processing?
>>>>
>>>> On Wed, Feb 16, 2022 at 7:58 AM S  wrote:
>>>>
>>>>> Retries have been already implemented. The question is how to stop the
>>>>> spark job by having an executor JVM send a signal to the driver JVM. e.g.
>>>>> I have a microbatch of 30 messages; 10 in each of the 3 partitions. Let's
>>>>> say while a partition of 10 messages was being processed, first 3 went
>>>>> through but then the microservice went down. Now when the 4th message in
>>>>> the partition is sent to the microservice it keeps receiving 5XX on every
>>>>> retry e.g. 5 retries. What I now want is to have that task from that
>>>>> executor JVM send a signal to the driver JVM to terminate the spark job on
>>>>> the failure of the 5th retry. Currently, what we have in place is retrying
>>>>> it 5 times and then upon failure i.e. 5XX catch the exception and move the
>>>>> message to a DLQ thereby having the flatmap produce a *None* and proceed
>>>>> to the next message in the partition of that microbatch. This approach
>>>>> keeps the pipeline alive and keeps pushing messages to DLQ microbatch
>>>>> after microbatch until the microservice is back up.
>>>>>
>>>>>
>>>>> On Wed, Feb 16, 2022 at 6:50 PM Sean Owen  wrote:
>>>>>
>>>>>> You could use the same pattern in your flatMap function. If you want
>>>>>> Spark to keep retrying though, you don't need any special logic, that is
>>>>>> what it would do already. You could increase the number of task retries
>>>>>> though; see the spark.excludeOnFailure.task.* configurations.
>>>>>>
>>>>>> You can just implement the circuit breaker pattern directly too,
>>>>>> nothing special there, though I don't think that's what you want? you
>>>>>> actually want to retry the failed attempts, not just avoid calling the
>>>>>> micros

Re: Which manufacturers' GPUs support Spark?

2022-02-16 Thread Gourav Sengupta
Hi,

100% agree with Sean, the entire RAPIDS solution is built by wonderful
people from NVIDIA.

Just out of curiosity, if you are using AWS then EMR already supports
RAPIDS, please try to use that. AWS has cheaper GPU's which can be used for
testing solutions.

For certain operations on SPARK the GPU's work fantastically well.


Regards,
Gourav Sengupta

On Wed, Feb 16, 2022 at 1:09 PM Sean Owen  wrote:

> Spark itself does not use GPUs, and is agnostic to what GPUs exist on a
> cluster, scheduled by the resource manager, and used by an application.
> In practice, virtually all GPU-related use cases (for deep learning for
> example) use CUDA, and this is NVIDIA-specific. Certainly, RAPIDS is from
> NVIDIA.
>
> On Wed, Feb 16, 2022 at 7:03 AM 15927907...@163.com <15927907...@163.com>
> wrote:
>
>> Hello,
>> We have done some Spark GPU accelerated work using the spark-rapids
>> component(https://github.com/NVIDIA/spark-rapids). However, we found
>> that this component currently only supports Nvidia GPU, and on the official
>> Spark website, we did not see the manufacturer's description of the GPU
>> supported by spark(
>> https://spark.apache.org/docs/3.2.1/configuration.html#custom-resource-scheduling-and-configuration-overview).
>> So, Can Spark also support GPUs from other manufacturers? such as AMD.
>> Looking forward to your reply.
>>
>> --
>> 15927907...@163.com
>>
>


Re: Implementing circuit breaker pattern in Spark

2022-02-16 Thread Gourav Sengupta
Hi,
once again, just trying to understand the problem first.

Why are we using SPARK to place calls to micro services? There are several
reasons why this should never happen, including costs/ security/
scalability concerns, etc.

Is there a way that you can create a producer and put the data into Kafka
first?

Sorry, I am not suggesting any solutions, just trying to understand the
problem first.


Regards,
Gourav



On Wed, Feb 16, 2022 at 2:36 PM S  wrote:

> No I want the job to stop and end once it discovers on repeated retries
> that the microservice is not responding. But I think I got where you were
> going right after sending my previous mail. Basically repeatedly failing of
> your tasks on retries ultimately fails your job anyway. So thats an
> in-built circuit breaker. So what that essentially means is we should not
> be catching those HTTP 5XX exceptions (which we currently do) and let the
> tasks fail on their own only for spark to retry them for finite number of
> times and then subsequently fail and thereby break the circuit. Thanks.
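
The conclusion above (stop catching the 5XX errors and let the task fail) can be sketched in plain Python, without Spark. This is only an illustration under assumptions: `ServiceUnavailable`, `call_with_retries`, `transform_partition` and `make_flaky_service` are hypothetical names, not anything from the job discussed in this thread. The idea is that the function applied to each message re-raises after its own retries are exhausted, so the task fails; Spark then retries the task up to spark.task.maxFailures times (default 4) before failing the job.

```python
class ServiceUnavailable(Exception):
    """Hypothetical error standing in for an HTTP 5XX from the microservice."""


def call_with_retries(call, payload, max_attempts=5):
    """Call `call(payload)` up to `max_attempts` times.

    The last failure is re-raised instead of being swallowed, so the
    enclosing task fails rather than emitting None for the message.
    """
    last_error = None
    for _ in range(max_attempts):
        try:
            return call(payload)
        except ServiceUnavailable as err:
            last_error = err  # a real job would usually back off here
    raise ServiceUnavailable(f"gave up after {max_attempts} attempts") from last_error


def transform_partition(call, messages, max_attempts=5):
    """Generator in the shape a mapPartitions-style function would take."""
    for message in messages:
        yield call_with_retries(call, message, max_attempts)


def make_flaky_service(healthy_calls):
    """Build a fake service that answers `healthy_calls` times, then stays down."""
    state = {"calls": 0}

    def service(payload):
        state["calls"] += 1
        if state["calls"] > healthy_calls:
            raise ServiceUnavailable("HTTP 503")
        return payload.upper()

    return service
```

With a fake service that goes down after three calls, the first three messages are transformed and the fourth raises, which is the point at which a Spark task would fail instead of routing the message to the DLQ.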
>
> On Wed, Feb 16, 2022 at 7:59 PM Sean Owen  wrote:
>
>> You stop the Spark job by tasks failing repeatedly, that's already how
>> it works. You can't kill the driver from the executor other ways, but
>> should not need to. I'm not clear, you're saying you want to stop the job,
>> but also continue processing?
>>
>> On Wed, Feb 16, 2022 at 7:58 AM S  wrote:
>>
>>> Retries have been already implemented. The question is how to stop the
>>> spark job by having an executor JVM send a signal to the driver JVM. e.g. I
>>> have a microbatch of 30 messages; 10 in each of the 3 partitions. Let's say
>>> while a partition of 10 messages was being processed, first 3 went through
>>> but then the microservice went down. Now when the 4th message in the
>>> partition is sent to the microservice it keeps receiving 5XX on every retry
>>> e.g. 5 retries. What I now want is to have that task from that executor JVM
>>> send a signal to the driver JVM to terminate the spark job on the failure
>>> of the 5th retry. Currently, what we have in place is retrying it 5 times
>>> and then upon failure i.e. 5XX catch the exception and move the message to
>>> a DLQ thereby having the flatmap produce a *None* and proceed to the
>>> next message in the partition of that microbatch. This approach keeps the
>>> pipeline alive and keeps pushing messages to DLQ microbatch after
>>> microbatch until the microservice is back up.
>>>
>>>
>>> On Wed, Feb 16, 2022 at 6:50 PM Sean Owen  wrote:
>>>
>>>> You could use the same pattern in your flatMap function. If you want
>>>> Spark to keep retrying though, you don't need any special logic, that is
>>>> what it would do already. You could increase the number of task retries
>>>> though; see the spark.excludeOnFailure.task.* configurations.
>>>>
>>>> You can just implement the circuit breaker pattern directly too,
>>>> nothing special there, though I don't think that's what you want? you
>>>> actually want to retry the failed attempts, not just avoid calling the
>>>> microservice.
>>>>
>>>> On Wed, Feb 16, 2022 at 3:18 AM S  wrote:

>>>>> Hi,
>>>>>
>>>>> We have a spark job that calls a microservice in the lambda function
>>>>> of the flatmap transformation -> passes to this microservice, the inbound
>>>>> element in the lambda function and returns the transformed value or "None"
>>>>> from the microservice as an output of this flatMap transform. Of course
>>>>> the lambda also takes care of exceptions from the microservice etc. The
>>>>> question is: there are times when the microservice may be down and there
>>>>> is no point recording an exception and putting the message in the DLQ for
>>>>> every element in our streaming pipeline so long as the microservice stays
>>>>> down. Instead, what we want to be able to do is retry the microservice
>>>>> call for a given event a predefined number of times and, if it is found
>>>>> to be down, terminate the spark job so that the current microbatch is
>>>>> terminated, there is no next microbatch, and the rest of the messages
>>>>> remain in the source kafka topics, unpolled and therefore unprocessed,
>>>>> until the microservice is back up and the spark job is redeployed. In
>>>>> regular microservices, we can implement this using the Circuit breaker
>>>>> pattern. In Spark jobs however this would mean being able to somehow send
>>>>> a signal from an executor JVM to the driver JVM to terminate the Spark
>>>>> job. Is there a way to do that in Spark?
>>>>>
>>>>> P.S.:
>>>>> - Having the circuit breaker functionality helps narrow the
>>>>> purpose of the DLQ to data or schema issues only instead of infra/network
>>>>> related issues.
>>>>> - As far as the need for the Spark job to use microservices is
>>>>> concerned, think of it as a complex logic being maintained in a
>>>>> microservice that does not warrant duplication.
>>>>> -

Re: Deploying Spark on Google Kubernetes (GKE) autopilot, preliminary findings

2022-02-14 Thread Gourav Sengupta
Hi,

sorry in case it appeared otherwise, Mich's takes are super interesting.
It is just that applying solutions in commercial undertakings is quite
different from research/development scenarios.



Regards,
Gourav Sengupta





On Mon, Feb 14, 2022 at 5:02 PM ashok34...@yahoo.com.INVALID
 wrote:

> Thanks Mich. Very insightful.
>
>
> AK
> On Monday, 14 February 2022, 11:18:19 GMT, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>
> Good question. However, we ought to look at what options we have so to
> speak.
>
> Let us consider Spark on Dataproc, Spark on Kubernetes and Spark on
> Dataflow
>
>
> Spark on DataProc <https://cloud.google.com/dataproc> is proven and it is
> in use at many organizations, I have deployed it extensively. It is
> infrastructure as a service provided including Spark, Hadoop and other
> artefacts. You have to manage cluster creation, automate cluster creation
> and tear down, submitting jobs etc. However, it is another stack that needs
> to be managed. It now has autoscaling
> <https://cloud.google.com/dataproc/docs/concepts/configuring-clusters/autoscaling>
> (enables cluster worker VM autoscaling ) policy as well.
>
> Spark on GKE
> <https://cloud.google.com/kubernetes-engine/docs/concepts/autopilot-overview>
> is something newer. Worth adding that the Spark DEV team are working hard
> to improve the performance of Spark on Kubernetes, for example, through
> Support for Customized Kubernetes Scheduler
> <https://docs.google.com/document/d/1xgQGRpaHQX6-QH_J9YV2C2Dh6RpXefUpLM7KGkzL6Fg>.
> As I explained in the first thread, Spark on Kubernetes relies on
> containerisation. Containers make applications more portable. Moreover,
> they simplify the packaging of dependencies, especially with PySpark and
> enable repeatable and reliable build workflows which is cost effective.
> They also reduce the overall devops load and allow one to iterate on the
> code faster. From a purely cost perspective it would be cheaper with Docker
> *as you can share resources* with your other services. You can create Spark
> docker with different versions of Spark, Scala, Java, OS etc. That docker
> file is portable. Can be used on Prem, AWS, GCP etc in container registries
> and devops and data science people can share it as well. Built once used by
> many. Kubernetes with autopilot
> <https://cloud.google.com/kubernetes-engine/docs/concepts/autopilot-overview#:~:text=Autopilot%20is%20a%20new%20mode,and%20yield%20higher%20workload%20availability.>
> helps scale the nodes of the Kubernetes cluster depending on the load. *That
> is what I am currently looking into*.
>
> With regard to Dataflow <https://cloud.google.com/dataflow/docs>, which I
> believe is similar to AWS Glue
> <https://aws.amazon.com/glue/?whats-new-cards.sort-by=item.additionalFields.postDateTime=desc>,
> it is a managed service for executing data processing patterns. Patterns or
> pipelines are built with the Apache Beam SDK
> <https://beam.apache.org/documentation/runners/spark/>, which is an open
> source programming model that supports Java, Python and GO. It enables
> batch and streaming pipelines. You create your pipelines with an Apache
> Beam program and then run them on the Dataflow service. The Apache Spark
> Runner
> <https://beam.apache.org/documentation/runners/spark/#:~:text=The%20Apache%20Spark%20Runner%20can,Beam%20pipelines%20using%20Apache%20Spark.=The%20Spark%20Runner%20executes%20Beam,same%20security%20features%20Spark%20provides.>
> can be used to execute Beam pipelines using Spark. When you run a job on
> Dataflow, it spins up a cluster of virtual machines, distributes the tasks
> in the job to the VMs, and dynamically scales the cluster based on how the
> job is performing. As I understand both iterative processing and notebooks
> plus Machine learning with Spark ML are not currently supported by Dataflow
>
> So we have three choices here. If you are migrating from on-prem
> Hadoop/spark/YARN set-up, you may go for Dataproc which will provide the
> same look and feel. If you want to use microservices and containers in your
> event driven architecture, you can adopt docker images that run on
> Kubernetes clusters, including Multi-Cloud Kubernetes Cluster. Dataflow is
> probably best suited for green-field projects.  Less operational
> overhead, unified approach for batch and streaming pipelines.
>
> *So as ever your mileage varies*. If you want to migrate from your
> existing Hadoop/Spark cluster to GCP, or take advantage of your existing
> workforce, choose Dataproc or GKE. In many cases, a big consideration is
> that one already has a codebase written against a particular framework, and
> one just wants

Re: Deploying Spark on Google Kubernetes (GKE) autopilot, preliminary findings

2022-02-14 Thread Gourav Sengupta
Hi,

I would still not build any custom solution, and if in GCP use serverless
Dataproc. I think that it is always better to be hands on with AWS Glue
before commenting on it.

Regards,
Gourav Sengupta

On Mon, Feb 14, 2022 at 11:18 AM Mich Talebzadeh 
wrote:

> Good question. However, we ought to look at what options we have so to
> speak.
>
> Let us consider Spark on Dataproc, Spark on Kubernetes and Spark on
> Dataflow
>
>
> Spark on DataProc <https://cloud.google.com/dataproc> is proven and it is
> in use at many organizations, I have deployed it extensively. It is
> infrastructure as a service provided including Spark, Hadoop and other
> artefacts. You have to manage cluster creation, automate cluster creation
> and tear down, submitting jobs etc. However, it is another stack that needs
> to be managed. It now has autoscaling
> <https://cloud.google.com/dataproc/docs/concepts/configuring-clusters/autoscaling>
> (enables cluster worker VM autoscaling ) policy as well.
>
> Spark on GKE
> <https://cloud.google.com/kubernetes-engine/docs/concepts/autopilot-overview>
> is something newer. Worth adding that the Spark DEV team are working hard
> to improve the performance of Spark on Kubernetes, for example, through
> Support for Customized Kubernetes Scheduler
> <https://docs.google.com/document/d/1xgQGRpaHQX6-QH_J9YV2C2Dh6RpXefUpLM7KGkzL6Fg>.
> As I explained in the first thread, Spark on Kubernetes relies on
> containerisation. Containers make applications more portable. Moreover,
> they simplify the packaging of dependencies, especially with PySpark and
> enable repeatable and reliable build workflows which is cost effective.
> They also reduce the overall devops load and allow one to iterate on the
> code faster. From a purely cost perspective it would be cheaper with Docker
> *as you can share resources* with your other services. You can create Spark
> docker with different versions of Spark, Scala, Java, OS etc. That docker
> file is portable. Can be used on Prem, AWS, GCP etc in container registries
> and devops and data science people can share it as well. Built once used by
> many. Kubernetes with autopilot
> <https://cloud.google.com/kubernetes-engine/docs/concepts/autopilot-overview#:~:text=Autopilot%20is%20a%20new%20mode,and%20yield%20higher%20workload%20availability.>
> helps scale the nodes of the Kubernetes cluster depending on the load. *That
> is what I am currently looking into*.
>
> With regard to Dataflow <https://cloud.google.com/dataflow/docs>, which I
> believe is similar to AWS Glue
> <https://aws.amazon.com/glue/?whats-new-cards.sort-by=item.additionalFields.postDateTime=desc>,
> it is a managed service for executing data processing patterns. Patterns or
> pipelines are built with the Apache Beam SDK
> <https://beam.apache.org/documentation/runners/spark/>, which is an open
> source programming model that supports Java, Python and GO. It enables
> batch and streaming pipelines. You create your pipelines with an Apache
> Beam program and then run them on the Dataflow service. The Apache Spark
> Runner
> <https://beam.apache.org/documentation/runners/spark/#:~:text=The%20Apache%20Spark%20Runner%20can,Beam%20pipelines%20using%20Apache%20Spark.=The%20Spark%20Runner%20executes%20Beam,same%20security%20features%20Spark%20provides.>
> can be used to execute Beam pipelines using Spark. When you run a job on
> Dataflow, it spins up a cluster of virtual machines, distributes the tasks
> in the job to the VMs, and dynamically scales the cluster based on how the
> job is performing. As I understand both iterative processing and notebooks
> plus Machine learning with Spark ML are not currently supported by Dataflow
>
> So we have three choices here. If you are migrating from on-prem
> Hadoop/spark/YARN set-up, you may go for Dataproc which will provide the
> same look and feel. If you want to use microservices and containers in your
> event driven architecture, you can adopt docker images that run on
> Kubernetes clusters, including Multi-Cloud Kubernetes Cluster. Dataflow is
> probably best suited for green-field projects.  Less operational
> overhead, unified approach for batch and streaming pipelines.
>
> *So as ever your mileage varies*. If you want to migrate from your
> existing Hadoop/Spark cluster to GCP, or take advantage of your existing
> workforce, choose Dataproc or GKE. In many cases, a big consideration is
> that one already has a codebase written against a particular framework, and
> one just wants to deploy it on the GCP, so even if, say, the Beam
> programming model/Dataflow is superior to Hadoop, someone with a lot of
> Hadoop code might still choose Dataproc or GKE for the time being, rather
>

Re: Deploying Spark on Google Kubernetes (GKE) autopilot, preliminary findings

2022-02-13 Thread Gourav Sengupta
Hi,
maybe this is useful in case someone is testing SPARK in containers for
developing SPARK.

*From a production scale work point of view:*
But if I am in AWS, I will just use GLUE if I want to use containers for
SPARK, without massively increasing my costs for operations unnecessarily.

Also, in case I am not wrong, GCP already has SPARK running in serverless
mode. Personally I would never create the overhead of additional costs and
issues for my clients by deploying SPARK myself when those solutions are
already available from cloud vendors. In fact, that is one of the precise
reasons why people use cloud - to reduce operational costs.

Sorry, just trying to understand what the scope of this work is.


Regards,
Gourav Sengupta

On Fri, Feb 11, 2022 at 8:35 PM Mich Talebzadeh 
wrote:

> The equivalent of Google GKE autopilot
> <https://cloud.google.com/kubernetes-engine/docs/concepts/autopilot-overview>
> in AWS is AWS Fargate <https://aws.amazon.com/fargate/>
>
>
> I have not used AWS Fargate so I can only mention Google's GKE
> Autopilot.
>
>
> This is developed from the concept of containerization and microservices.
> In the standard mode of creating a GKE cluster users can customize their
> configurations based on the requirements, GKE manages the control plane and
> users manually provision and manage their node infrastructure. So you
> choose your hardware type and memory/CPU where your spark containers will
> be running and they will be shown as VM hosts in your account. In GKE
> Autopilot mode, GKE manages the nodes, pre-configures the cluster with
> add-ons for auto-scaling, auto-upgrades, maintenance, Day 2 operations and
> security hardening. So there is a lot there. You don't choose your nodes
> and their sizes. You are effectively paying for the pods you use.
>
>
> With spark-submit, you still need to specify the number of executors,
> driver and executor memory, plus cores for each driver and executor.
> The theory is that the k8s cluster will deploy suitable
> nodes and will create enough pods on those nodes. With the standard k8s
> cluster you choose your nodes and you ensure that one core on each node is
> reserved for the OS itself. Otherwise if you allocate all cores to spark
> with --conf spark.executor.cores, you will receive this error
>
>
> kubectl describe pods -n spark
>
> ...
>
> Events:
>
>   Type     Reason            Age                From               Message
>   ----     ------            ----               ----               -------
>   Warning  FailedScheduling  9s (x17 over 15m)  default-scheduler  0/3 nodes are available: 3 Insufficient cpu.
>
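
The sizing rule described above (leave one core per node free instead of handing every core to Spark) can be written down as a small helper. This is a sketch under assumptions: the function names are made up for illustration, and it assumes one executor per node, which the thread does not state. Note that on Kubernetes the CPU request of the executor pod can also be set separately through spark.kubernetes.executor.request.cores.

```python
def executor_cores_per_node(node_cores, reserved_cores=1):
    """Cores one executor may use on a node after reserving some for the OS."""
    usable = node_cores - reserved_cores
    if usable < 1:
        raise ValueError("node is too small to host an executor")
    return usable


def executor_conf(node_cores, nodes):
    """spark-submit flags for one executor per node (an assumption of this sketch)."""
    cores = executor_cores_per_node(node_cores)
    return [
        "--conf", f"spark.executor.instances={nodes}",
        "--conf", f"spark.executor.cores={cores}",
    ]
```

For three 4-core nodes this yields 3 executors with 3 cores each, which leaves the headroom whose absence leads to the "Insufficient cpu" scheduling failure.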
> So with the standard k8s you have a choice of selecting your core sizes.
> With autopilot this node selection is left to autopilot to deploy suitable
> nodes and this will be a trial and error at the start (to get the
> configuration right). You may be lucky if the history of executions is
> kept current and the same job can be repeated. However, in my experience,
> getting the driver pod into the "running state" is expensive timewise and
> without an executor in running state, there is no chance of the spark job
> doing anything.
>
>
> NAME                                         READY   STATUS    RESTARTS   AGE
> randomdatabigquery-cebab77eea6de971-exec-1   0/1     Pending   0          31s
> randomdatabigquery-cebab77eea6de971-exec-2   0/1     Pending   0          31s
> randomdatabigquery-cebab77eea6de971-exec-3   0/1     Pending   0          31s
> randomdatabigquery-cebab77eea6de971-exec-4   0/1     Pending   0          31s
> randomdatabigquery-cebab77eea6de971-exec-5   0/1     Pending   0          31s
> randomdatabigquery-cebab77eea6de971-exec-6   0/1     Pending   0          31s
> sparkbq-37405a7eea6b9468-driver              1/1     Running   0          3m4s
>
> NAME                                         READY   STATUS              RESTARTS   AGE
> randomdatabigquery-cebab77eea6de971-exec-6   0/1     ContainerCreating   0          112s
> sparkbq-37405a7eea6b9468-driver              1/1     Running             0          4m25s
>
> NAME                                         READY   STATUS    RESTARTS   AGE
> randomdatabigquery-cebab77eea6de971-exec-6   1/1     Running   0          114s
> sparkbq-37405a7eea6b9468-driver              1/1     Running   0          4m27s
>
> Basically I told Spark to have 6 executors but could only bring one
> executor into running state after the driver pod had been spinning for
> 4 minutes.
>
> 22/02/11 20:16:18 INFO SparkKubernetesClientFactory: Auto-configuring K8S
> client using current context from users K8S config file
>
> 22/02/11 20:16:19 INFO Utils: Using initial executors = 6, max of
> spark.dynamicA

Re: Unable to access Google buckets using spark-submit

2022-02-12 Thread Gourav Sengupta
Hi,

agree with Holden, have faced quite a few issues with FUSE.

Also trying to understand "spark-submit from local". Are you submitting
your SPARK jobs from a local laptop or in local mode from a GCP Dataproc
system?

If you are submitting the job from your local laptop, there will be
performance bottlenecks, I guess, based on the internet bandwidth and volume
of data.

Regards,
Gourav


On Sat, Feb 12, 2022 at 7:12 PM Holden Karau  wrote:

> You can also put the GS access jar with your Spark jars — that’s what the
> class not found exception is pointing you towards.
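
A sketch of what that suggestion amounts to on the command line, written as a small Python helper that assembles the spark-submit arguments. The jar path below is a placeholder and the helper name is made up; the `fs.gs.impl` value is the one quoted in the original question, and `fs.AbstractFileSystem.gs.impl` is the companion setting the GCS connector documents. Credentials are not covered here and would still need to be configured.

```python
def build_spark_submit(app, connector_jar):
    """Assemble a spark-submit argv that ships the GCS connector jar.

    `connector_jar` is a local path to the connector (placeholder in the
    example below); `app` is the application file to run.
    """
    return [
        "spark-submit",
        "--jars", connector_jar,
        "--conf",
        "spark.hadoop.fs.gs.impl="
        "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
        "--conf",
        "spark.hadoop.fs.AbstractFileSystem.gs.impl="
        "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS",
        app,
    ]


if __name__ == "__main__":
    # Placeholder paths, for illustration only.
    print(" ".join(build_spark_submit("job.py", "/path/to/gcs-connector.jar")))
```

With the jar on the classpath, the `gs://` scheme should resolve without mounting the bucket through FUSE.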
>
> On Fri, Feb 11, 2022 at 11:58 PM Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> BTW I also answered you on Stack Overflow:
>>
>>
>> https://stackoverflow.com/questions/71088934/unable-to-access-google-buckets-using-spark-submit
>>
>> HTH
>>
>>
>>view my Linkedin profile
>> 
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Sat, 12 Feb 2022 at 08:24, Mich Talebzadeh 
>> wrote:
>>
>>> You are trying to access a Google storage bucket gs:// from your local
>>> host.
>>>
>>> It does not see it because spark-submit assumes that it is a local file
>>> system on the host, which it is not.
>>>
>>> You need to mount gs:// bucket as a local file system.
>>>
>>> You can use the tool called gcsfuse
>>> https://cloud.google.com/storage/docs/gcs-fuse . Cloud Storage FUSE is
>>> an open source FUSE  adapter that allows
>>> you to mount Cloud Storage buckets as file systems on Linux or macOS
>>> systems. You can download gcsfuse from here
>>> 
>>>
>>>
>>> Pretty simple.
>>>
>>>
>>> It will be installed as /usr/bin/gcsfuse and you can mount it by
>>> creating a local mount point like /mnt/gs as root and giving permission to
>>> others to use it.
>>>
>>>
>>> As a normal user that needs to access gs:// bucket (not as root), use
>>> gcsfuse to mount it. For example I am mounting a gcs bucket called
>>> spark-jars-karan here
>>>
>>>
>>> Just use the bucket name itself
>>>
>>>
>>> gcsfuse spark-jars-karan /mnt/gs
>>>
>>>
>>> Then you can refer to it as /mnt/gs in spark-submit from on-premise host
>>>
>>> spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0 
>>> --jars /mnt/gs/spark-bigquery-with-dependencies_2.12-0.23.2.jar
>>>
>>> HTH
>>>
>>>
>>> On Sat, 12 Feb 2022 at 04:31, karan alang  wrote:
>>>
>>>> Hello All,
>>>>
>>>> I'm trying to access gcp buckets while running spark-submit from local,
>>>> and running into issues.
>>>>
>>>> I'm getting error :
>>>> ```
>>>>
>>>> 22/02/11 20:06:59 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
>>>> Exception in thread "main" org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "gs"
>>>>
>>>> ```
>>>> I tried adding the --conf
>>>> spark.hadoop.fs.gs.impl=com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem
>>>>
>>>> to the spark-submit command, but getting ClassNotFoundException
>>>>
>>>> Details are in stackoverflow :
>>>>
>>>> https://stackoverflow.com/questions/71088934/unable-to-access-google-buckets-using-spark-submit
>>>>
>>>> Any ideas on how to fix this ?
>>>> tia !
 --
> Twitter: https://twitter.com/holdenkarau
> Books (Learning Spark, High Performance Spark, etc.):
> https://amzn.to/2MaRAG9  
> YouTube Live Streams: https://www.youtube.com/user/holdenkarau
>


Re: Unable to force small partitions in streaming job without repartitioning

2022-02-12 Thread Gourav Sengupta
hi,

Did you try sorting the data while writing it out? All of this engineering
may not be required in that case.




Regards,
Gourav Sengupta

On Sat, Feb 12, 2022 at 8:42 PM Chris Coutinho 
wrote:

> Setting the option in the cluster configuration solved the issue, and now
> we're able to specify the row group size based on the block size as
> intended.
>
> Thanks!
>
> On Fri, Feb 11, 2022 at 6:59 PM Adam Binford  wrote:
>
>> Writing to Delta might not support the write.option method. We set
>> spark.hadoop.parquet.block.size in our spark config for writing to Delta.
>>
>> Adam
>>
>> On Fri, Feb 11, 2022, 10:15 AM Chris Coutinho 
>> wrote:
>>
>>> I tried re-writing the table with the updated block size but it doesn't
>>> appear to have an effect on the row group size.
>>>
>>> ```pyspark
>>> df = spark.read.format("delta").load("/path/to/source1")
>>>
>>> df.write \
>>> .format("delta") \
>>> .mode("overwrite") \
>>> .options(**{
>>>   "parquet.block.size": "1m",
>>> }) \
>>> .partitionBy("date") \
>>> .save("/path/to/source2")
>>> ```
>>>
>>> The files created by this job are about 20m in size. Using
>>> `parquet-tools` I can inspect a single file and see the following 12m file
>>> contains a single row group - not the expected 12 based on the block size:
>>>
>>> $ parquet-tools inspect /path/to/source2/date=.../part-.parquet
>>>  file meta data 
>>> created_by: parquet-mr version 1.10.1-databricks9 (build
>>> cf6c823f85c3b69d49e1573e48e236148c709e82)
>>> num_columns: 19
>>> num_rows: 369483
>>> num_row_groups: 1
>>> format_version: 1.0
>>> serialized_size: 6364
>>>
>>>  Columns 
>>> ...
>>>
>>> Chris
>>>
>>> On Fri, Feb 11, 2022 at 3:37 PM Sean Owen  wrote:
>>>
>>>> It should just be parquet.block.size indeed.
>>>> df.write.option("parquet.block.size", "16m").parquet(...)
>>>> This is an issue in how you write, not read, the parquet.
>>>>
>>>> On Fri, Feb 11, 2022 at 8:26 AM Chris Coutinho <
>>>> chrisbcouti...@gmail.com> wrote:
>>>>
>>>>> Hi Adam,
>>>>>
>>>>> Thanks for the explanation on the empty partitions.
>>>>>
>>>>> We have the freedom to adjust how the source table is written, so if
>>>>> there are any improvements we can implement on the source side we'd be
>>>>> happy to look into that.
>>>>>
>>>>> It's not yet clear to me how you can reduce the row group size of the
>>>>> parquet files. I see some mention of `parquet.block.size` online, as well
>>>>> as various map reduce settings regarding file splitting (SO:
>>>>> mapred-min-split-size-in-hdfs
>>>>> <https://stackoverflow.com/questions/19188315/behavior-of-the-parameter-mapred-min-split-size-in-hdfs>);
>>>>> however, I don't quite understand the link between the splitting settings,
>>>>> row group configuration, and resulting number of records when reading from
>>>>> a delta table.
>>>>>
>>>>> For more specifics: we're running Spark 3.1.2 using ADLS as cloud
>>>>> storage.
>>>>>
>>>>> Best,
>>>>> Chris
>>>>>
>>>>> On Fri, Feb 11, 2022 at 1:40 PM Adam Binford 
>>>>> wrote:
>>>>>
>>>>>> The smallest unit of work you can do on a parquet file (under the
>>>>>> delta hood) is based on the parquet row group size, which by default is
>>>>>> 128mb. If you specify maxPartitionBytes of 10mb, what that will basically
>>>>>> do is create a partition for each 10mb of a file, but whatever partition
>>>>>> covers the part of the file where the row group starts will consume the
>>>>>> entire row group. That's why you're seeing a lot of empty partitions and
>>>>>> a small number with the rest of the actual data.
>>>>>>
>>>>>> Can't think of any solution other than repartitioning (or rewriting
>>>>>> the input Delta table with a much smaller row group size which wouldn't be
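Adam's explanation quoted above can be illustrated with a small pure-Python model. This is only a sketch under a simplifying assumption taken from his description (each row group is read whole by the split that covers its start offset); it is not Spark's actual split planner, and the function name and sample sizes are made up for illustration.

```python
import math

MB = 1024 * 1024


def rows_per_split(file_size, row_groups, max_partition_bytes):
    """Return the number of rows each split ends up reading.

    row_groups is a list of (start_offset_in_bytes, num_rows) tuples.
    """
    n_splits = math.ceil(file_size / max_partition_bytes)
    rows = [0] * n_splits
    for start, num_rows in row_groups:
        # The split covering the start offset consumes the whole row group.
        rows[start // max_partition_bytes] += num_rows
    return rows


# A 128 MB file holding a single row group, read with 10 MB splits.
one_big_group = rows_per_split(128 * MB, [(0, 1_000_000)], 10 * MB)

# The same file rewritten as 128 row groups of roughly 1 MB each.
small_groups = [(i * MB, 7_812) for i in range(128)]
many_small_groups = rows_per_split(128 * MB, small_groups, 10 * MB)

for name, rows in [("one big group", one_big_group),
                   ("many small groups", many_small_groups)]:
    non_empty = sum(1 for r in rows if r)
    print(f"{name}: {non_empty} of {len(rows)} splits hold data")
```

In this model the single large row group leaves all but one split empty, while the file with small row groups spreads rows over every split, which is the effect described in the thread.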

Re: Unable to force small partitions in streaming job without repartitioning

2022-02-11 Thread Gourav Sengupta
Hi,

just trying to understand the problem before solving it.

1. you mentioned "The primary key of the static table is non-unique". This
appears to be a design flaw to me.

2. you once again mentioned "The Pandas UDF is then applied to the
resulting stream-static join and stored in a table. To avoid OOM errors on
the executors, we need to start with very small (~10MB) partitions to
account for the expansion. Currently this only seems possible by explicitly
repartitioning the data, incurring the perf cost associated with the
shuffle." Should the shuffle not be happening anyway, because you are
joining the records?

Another question:
I am not sure about your notion of "keys". When you are joining the tables,
are you using a single column or multiple columns? Are you expecting a
cartesian product to happen during the join, or will the number of records
explode to at most the number of duplicates in the static table?

Obviously I do not clearly understand the problem, therefore all the
suggestions can be wrong, but without over-engineering, have you simply
tried to store the data by sorting it on the PK (the one that is non-unique
and in the static table) while running VACUUM?

Of course the above solution assumes that the volume of data for a particular
key in the static table fits into an executor's memory along with the
subsequent operations.

Another thing that you might want to enable is Adaptive Query Execution;
you can check whether it is enabled properly by reading its settings.
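A minimal sketch of reading those settings from PySpark, assuming an already running SparkSession is passed in as `spark`. The helper names are made up for illustration; the three configuration keys are standard Spark 3.x AQE options.

```python
AQE_KEYS = [
    "spark.sql.adaptive.enabled",
    "spark.sql.adaptive.coalescePartitions.enabled",
    "spark.sql.adaptive.skewJoin.enabled",
]


def aqe_settings(spark):
    """Return the AQE-related settings of an existing SparkSession."""
    # The second argument is the fallback when a key has no value.
    return {key: spark.conf.get(key, "<not set>") for key in AQE_KEYS}


def enable_aqe(spark):
    """Switch AQE on for this session and return the resulting settings."""
    spark.conf.set("spark.sql.adaptive.enabled", "true")
    return aqe_settings(spark)
```

In a pyspark shell, `aqe_settings(spark)` prints what the session is actually using, which may differ from what was intended in the cluster configuration.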


Regards,
Gourav Sengupta

On Fri, Feb 11, 2022 at 6:00 PM Adam Binford  wrote:

> Writing to Delta might not support the write.option method. We set
> spark.hadoop.parquet.block.size in our spark config for writing to Delta.
>
> Adam
>
> On Fri, Feb 11, 2022, 10:15 AM Chris Coutinho 
> wrote:
>
>> I tried re-writing the table with the updated block size but it doesn't
>> appear to have an effect on the row group size.
>>
>> ```pyspark
>> df = spark.read.format("delta").load("/path/to/source1")
>>
>> df.write \
>> .format("delta") \
>> .mode("overwrite") \
>> .options(**{
>>   "parquet.block.size": "1m",
>> }) \
>> .partitionBy("date") \
>> .save("/path/to/source2")
>> ```
>>
>> The files created by this job are about 20m in size. Using
>> `parquet-tools` I can inspect a single file and see the following 12m file
>> contains a single row group - not the expected 12 based on the block size:
>>
>> $ parquet-tools inspect /path/to/source2/date=.../part-.parquet
>>  file meta data 
>> created_by: parquet-mr version 1.10.1-databricks9 (build
>> cf6c823f85c3b69d49e1573e48e236148c709e82)
>> num_columns: 19
>> num_rows: 369483
>> num_row_groups: 1
>> format_version: 1.0
>> serialized_size: 6364
>>
>>  Columns 
>> ...
>>
>> Chris
>>
>> On Fri, Feb 11, 2022 at 3:37 PM Sean Owen  wrote:
>>
>>> It should just be parquet.block.size indeed.
>>> df.write.option("parquet.block.size", "16m").parquet(...)
>>> This is an issue in how you write, not read, the parquet.
>>>
>>> On Fri, Feb 11, 2022 at 8:26 AM Chris Coutinho 
>>> wrote:
>>>
>>>> Hi Adam,
>>>>
>>>> Thanks for the explanation on the empty partitions.
>>>>
>>>> We have the freedom to adjust how the source table is written, so if
>>>> there are any improvements we can implement on the source side we'd be
>>>> happy to look into that.
>>>>
>>>> It's not yet clear to me how you can reduce the row group size of the
>>>> parquet files. I see some mention of `parquet.block.size` online, as well
>>>> as various map reduce settings regarding file splitting (SO:
>>>> mapred-min-split-size-in-hdfs
>>>> <https://stackoverflow.com/questions/19188315/behavior-of-the-parameter-mapred-min-split-size-in-hdfs>);
>>>> however, I don't quite understand the link between the splitting settings,
>>>> row group configuration, and resulting number of records when reading from
>>>> a delta table.
>>>>
>>>> For more specifics: we're running Spark 3.1.2 using ADLS as cloud
>>>> storage.
>>>>
>>>> Best,
>>>> Chris
>>>>
>>>> On Fri, Feb 11, 2022 at 1:40 PM Adam Binford  wrote:
>>>>
>>>>> The smallest unit of work you can do on a parquet file (under the
>>>>> delta hood) is based on the parquet row group size,
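Adam's suggestion quoted above (setting spark.hadoop.parquet.block.size in the Spark config rather than through write.option) might look roughly like the sketch below. The helper names and the app name are made up, and passing the size as a plain byte count is an assumption made here to stay on the safe side; the thread does not confirm which formats are accepted.

```python
def parquet_block_size_conf(row_group_mb):
    """Session-level setting for the Parquet row group ("block") size."""
    # Assumption: the value is given as a plain number of bytes.
    return {"spark.hadoop.parquet.block.size": str(row_group_mb * 1024 * 1024)}


def build_session(conf, app_name="rewrite-with-small-row-groups"):
    """Create (or reuse) a SparkSession with the given settings applied."""
    # Imported lazily so the helper above works without Spark installed.
    from pyspark.sql import SparkSession

    builder = SparkSession.builder.appName(app_name)
    for key, value in conf.items():
        builder = builder.config(key, value)
    return builder.getOrCreate()


conf = parquet_block_size_conf(1)
print(conf)
```

The same key/value pair can also be placed in the cluster configuration instead of being set in code.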

Re: Using Avro file format with SparkSQL

2022-02-11 Thread Gourav Sengupta
Hi Anna,

Avro libraries should be inbuilt in SPARK in case I am not wrong. Any
particular reason why you are using a deprecated or soon to be deprecated
version of SPARK?

SPARK 3.2.1 is fantastic.

Please do let us know about your set up if possible.


Regards,
Gourav Sengupta

On Thu, Feb 10, 2022 at 3:35 AM Karanika, Anna  wrote:

> Hello,
>
> I have been trying to use spark SQL’s operations that are related to the
> Avro file format,
> e.g., stored as, save, load, in a Java class but they keep failing with
> the following stack trace:
>
> Exception in thread "main" org.apache.spark.sql.AnalysisException:  Failed
> to find data source: avro. Avro is built-in but external data source module
> since Spark 2.4. Please deploy the application as per the deployment
> section of "Apache Avro Data Source Guide".
> at
> org.apache.spark.sql.errors.QueryCompilationErrors$.failedToFindAvroDataSourceError(QueryCompilationErrors.scala:1032)
> at
> org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:666)
> at
> org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:720)
> at
> org.apache.spark.sql.DataFrameWriter.lookupV2Provider(DataFrameWriter.scala:852)
> at
> org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:256)
> at
> org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
> at xsys.fileformats.SparkSQLvsAvro.main(SparkSQLvsAvro.java:57)
> at
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
> Method)
> at
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:64)
> at
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.base/java.lang.reflect.Method.invoke(Method.java:564)
> at
> org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
> at org.apache.spark.deploy.SparkSubmit.org
> $apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:955)
> at
> org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
> at
> org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
> at
> org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
> at
> org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1043)
> at
> org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1052)
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>
> For context, I am invoking spark-submit and adding arguments --packages
> org.apache.spark:spark-avro_2.12:3.2.0.
> Yet, Spark responds as if the dependency was not added.
> I am running spark-v3.2.0 (Scala 2.12).
>
> On the other hand, everything works great with spark-shell or spark-sql.
>
> I would appreciate any advice or feedback to get this running.
>
> Thank you,
> Anna
>
>
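The thread does not show the full spark-submit command, so the actual cause is not known. One thing worth ruling out is option ordering: spark-submit treats everything after the application jar as arguments to the application itself, so --packages has to appear before the jar. A small sketch that builds the command in that order follows; the helper name and the jar path are hypothetical, while the class name and package coordinate are taken from the message above.

```python
def spark_submit_argv(app_jar, main_class, packages, app_args=()):
    """Build a spark-submit command line with options before the jar."""
    argv = ["spark-submit", "--class", main_class]
    if packages:
        argv += ["--packages", ",".join(packages)]
    # Everything after the application jar is passed to the application.
    argv.append(app_jar)
    argv += list(app_args)
    return argv


argv = spark_submit_argv(
    app_jar="target/app.jar",  # hypothetical path
    main_class="xsys.fileformats.SparkSQLvsAvro",
    packages=["org.apache.spark:spark-avro_2.12:3.2.0"],
)
print(" ".join(argv))
```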


Re: data size exceeds the total ram

2022-02-11 Thread Gourav Sengupta
Hi,

I am in a meeting, but you can look out for a setting that tells spark how
many bytes to read from a file at one go.

I use SQL, which is far better, in case you are using dataframes.

As we still do not know which SPARK version you are using, there may be
issues around skew, and there are different ways to manage that
depending on the SPARK version.
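The setting referred to above is presumably spark.sql.files.maxPartitionBytes (an assumption; the message does not name it), which defaults to 128 MB. As a rough sketch, the number of input partitions for a splittable file follows from the data size rather than from the number of nodes. The estimate below deliberately ignores Spark's other inputs to split sizing (file open cost, default parallelism), and the function name is made up.

```python
import math


def estimated_input_partitions(total_bytes, max_partition_bytes=128 * 1024**2):
    """Very rough estimate: one partition per max_partition_bytes of input."""
    return math.ceil(total_bytes / max_partition_bytes)


one_tb = 1024**4
default_estimate = estimated_input_partitions(one_tb)
larger_splits = estimated_input_partitions(one_tb, 512 * 1024**2)

print(default_estimate, larger_splits)
```

For 1 TB of uncompressed CSV this gives thousands of read partitions, not three, so a three-node cluster works through them in waves.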



Thanks and Regards,
Gourav Sengupta

On Fri, Feb 11, 2022 at 11:09 AM frakass  wrote:

> Hello list
>
> I have imported the data into spark and I found there is disk IO in
> every node. The memory didn't overflow.
>
> But a query like this is quite slow:
>
>  >>> df.groupBy("rvid").agg({'rate':'avg','rvid':'count'}).show()
>
>
> May I ask:
> 1. since I have 3 nodes (also known as 3 executors?), are there 3
> partitions for each job?
> 2. can I expand the partitions by hand to increase the performance?
>
> Thanks
>
>
>
> On 2022/2/11 6:22, frakass wrote:
> >
> >
> > On 2022/2/11 6:16, Gourav Sengupta wrote:
> >> What is the source data (is it JSON, CSV, Parquet, etc)? Where are you
> >> reading it from (JDBC, file, etc)? What is the compression format (GZ,
> >> BZIP, etc)? What is the SPARK version that you are using?
> >
> > it's a well built csv file (not compressed) stored in HDFS.
> > spark 3.2.0
> >
> > Thanks.
> >
> > -
> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: data size exceeds the total ram

2022-02-11 Thread Gourav Sengupta
Hi,

just so that we understand the problem first?

What is the source data (is it JSON, CSV, Parquet, etc)? Where are you
reading it from (JDBC, file, etc)? What is the compression format (GZ,
BZIP, etc)? What is the SPARK version that you are using?


Thanks and Regards,
Gourav Sengupta

On Fri, Feb 11, 2022 at 9:39 AM Mich Talebzadeh 
wrote:

> Well one experiment is worth many times more than asking what-if scenario
> questions.
>
>
>1. Try running it first to see how spark handles it
>2. Go to spark GUI (on port 4040 by default) and look at the storage tab and see
>what it says
>3. Unless you explicitly persist the data, Spark will read the data
>using appropriate partitions given the memory size and cluster count. As
>long as there is sufficient disk space (not memory), Spark will handle
>files larger than the available memory. However, if you do persist,
>you may get an Out of Memory error
>
> HTH
>
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Fri, 11 Feb 2022 at 09:23, frakass  wrote:
>
>> Hello
>>
>> I have three nodes with total memory 128G x 3 = 384GB
>> But the input data is about 1TB.
>> How can spark handle this case?
>>
>> Thanks.
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>


Re: add an auto_increment column

2022-02-08 Thread Gourav Sengupta
Hi,

so do you want to rank apple and tomato both as 2? Not quite clear on the
use case here though.
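To make the question concrete, here is a hedged sketch. `add_top_column` shows one way to get a strict 1..N column in PySpark with row_number over a window; `tie_handling` is a pure-Python illustration of how row_number, rank and dense_rank differ when two fruits share an amount. Both function names are made up; the column names follow the example in the thread.

```python
def add_top_column(df):
    """PySpark: add a 1..N "top" column ordered by amount, largest first."""
    # Imported lazily so the pure-Python part below runs without Spark.
    from pyspark.sql import Window
    from pyspark.sql import functions as F

    # No partitionBy: Spark moves all rows to one partition to number them.
    by_amount = Window.orderBy(F.col("amount").desc())
    return df.withColumn("top", F.row_number().over(by_amount))


def tie_handling(amounts):
    """Show how the three ranking functions treat equal amounts."""
    ordered = sorted(amounts, reverse=True)
    distinct = sorted(set(ordered), reverse=True)
    return {
        "row_number": list(range(1, len(ordered) + 1)),
        "rank": [ordered.index(a) + 1 for a in ordered],
        "dense_rank": [distinct.index(a) + 1 for a in ordered],
    }


# Amounts from the thread: orange 2, apple 3, tomato 3, cherry 5.
ties = tie_handling([2, 3, 3, 5])
for name, values in ties.items():
    print(name, values)
```

With row_number the two fruits with amount 3 get different numbers in an arbitrary order; rank and dense_rank give them the same number, which is why the intended use of the column matters.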

Regards,
Gourav Sengupta

On Tue, Feb 8, 2022 at 7:10 AM  wrote:

>
> Hello Gourav
>
>
> As you see here orderBy has already given the solution for "equal
> amount":
>
> >>> df = sc.parallelize([("orange",2),("apple",3),("tomato",3),("cherry",5)]).toDF(['fruit','amount'])
>
> >>> df.select("*").orderBy("amount",ascending=False).show()
> +------+------+
> | fruit|amount|
> +------+------+
> |cherry|     5|
> | apple|     3|
> |tomato|     3|
> |orange|     2|
> +------+------+
>
>
> I want to add a column at the right whose name is "top" and whose value
> auto-increments from 1 to N.
>
> Thank you.
>
>
>
> On 08/02/2022 13:52, Gourav Sengupta wrote:
> > Hi,
> >
> > sorry once again, will try to understand the problem first :)
> >
> > As we can clearly see, the initial responses were incorrectly
> > guessing the solution to be the monotonically_increasing_id function.
> >
> > What if there are two fruits with equal amount? For any real life
> > application, can we understand what you are trying to achieve by the
> > rankings?
> >
> > Regards,
> > Gourav Sengupta
> >
> > On Tue, Feb 8, 2022 at 4:22 AM ayan guha  wrote:
> >
> >> For this req you can rank or dense rank.
> >>
> >> On Tue, 8 Feb 2022 at 1:12 pm,  wrote:
> >>
> >>> Hello,
> >>>
> >>> For this query:
> >>>
> >>>>>> df.select("*").orderBy("amount",ascending=False).show()
> >>> +------+------+
> >>> | fruit|amount|
> >>> +------+------+
> >>> |tomato|     9|
> >>> | apple|     6|
> >>> |cherry|     5|
> >>> |orange|     3|
> >>> +------+------+
> >>>
> >>> I want to add a column "top", in which the value is: 1,2,3...
> >>> meaning
> >>> top1, top2, top3...
> >>>
> >>> How can I do it?
> >>>
> >>> Thanks.
> >>>
> >>> On 07/02/2022 21:18, Gourav Sengupta wrote:
> >>>> Hi,
> >>>>
> >>>> can we understand the requirement first?
> >>>>
> >>>> What is it that you are trying to achieve by auto increment id? Do you
> >>>> just want different ID's for rows, or do you want to keep track of
> >>>> the record count of a table as well, or do you want to use them for
> >>>> surrogate keys?
> >>>>
> >>>> Are you going to insert records multiple times in a table, and
> >>>> still have different values?
> >>>>
> >>>> I think without knowing the requirements all the above responses, like
> >>>> everything else where solutions are reached before understanding the
> >>>> problem, have high chances of being wrong.
> >>>>
> >>>> Regards,
> >>>> Gourav Sengupta
> >>>>
> >>>> On Mon, Feb 7, 2022 at 2:21 AM Siva Samraj
> >>> 
> >>>> wrote:
> >>>>
> >>>>> Monotonically_increasing_id() will give the same functionality
> >>>>>
> >>>>> On Mon, 7 Feb, 2022, 6:57 am ,  wrote:
> >>>>>
> >>>>>> For a dataframe object, how to add a column which is auto_increment
> >>>>>> like mysql's behavior?
> >>>>>>
> >>>>>> Thank you.
> >>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>
> >>>
> >>>
> >>
> >> --
> >> Best Regards,
> >> Ayan Guha
>


Re: add an auto_increment column

2022-02-07 Thread Gourav Sengupta
Hi,

sorry once again, will try to understand the problem first :)

As we can clearly see, the initial responses were incorrectly guessing
the solution to be the monotonically_increasing_id function.
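For context on why that function does not yield 1, 2, 3, ..., N: the Spark documentation describes the generated ID as having the partition ID in the upper 31 bits and the record number within the partition in the lower 33 bits. The pure-Python sketch below mimics that layout; the function name and the two-partition example are made up for illustration.

```python
def monotonic_id(partition_id, row_in_partition):
    """Mimic the documented bit layout of monotonically_increasing_id()."""
    # Upper bits: partition ID. Lower 33 bits: row number in the partition.
    return (partition_id << 33) + row_in_partition


# Two partitions with two rows each.
ids = [monotonic_id(p, r) for p in range(2) for r in range(2)]
print(ids)  # [0, 1, 8589934592, 8589934593]
```

The IDs are unique and increasing, but they jump at every partition boundary, so they cannot serve as a "top 1, top 2, ..." column.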

What if there are two fruits with equal amount? For any real life
application, can we understand what you are trying to achieve by the rankings?



Regards,
Gourav Sengupta

On Tue, Feb 8, 2022 at 4:22 AM ayan guha  wrote:

> For this req you can rank or dense rank.
>
> On Tue, 8 Feb 2022 at 1:12 pm,  wrote:
>
>> Hello,
>>
>> For this query:
>>
>> >>> df.select("*").orderBy("amount",ascending=False).show()
>> +------+------+
>> | fruit|amount|
>> +------+------+
>> |tomato|     9|
>> | apple|     6|
>> |cherry|     5|
>> |orange|     3|
>> +------+------+
>>
>>
>> I want to add a column "top", in which the value is: 1,2,3... meaning
>> top1, top2, top3...
>>
>> How can I do it?
>>
>> Thanks.
>>
>>
>>
>>
>> On 07/02/2022 21:18, Gourav Sengupta wrote:
>> > Hi,
>> >
>> > can we understand the requirement first?
>> >
>> > What is it that you are trying to achieve by auto increment id? Do you
>> > just want different ID's for rows, or do you want to keep track of
>> > the record count of a table as well, or do you want to use them for
>> > surrogate keys?
>> >
>> > Are you going to insert records multiple times in a table, and
>> > still have different values?
>> >
>> > I think without knowing the requirements all the above responses, like
>> > everything else where solutions are reached before understanding the
>> > problem, have high chances of being wrong.
>> >
>> > Regards,
>> > Gourav Sengupta
>> >
>> > On Mon, Feb 7, 2022 at 2:21 AM Siva Samraj 
>> > wrote:
>> >
>> >> Monotonically_increasing_id() will give the same functionality
>> >>
>> >> On Mon, 7 Feb, 2022, 6:57 am ,  wrote:
>> >>
>> >>> For a dataframe object, how to add a column who is auto_increment
>> >>> like
>> >>> mysql's behavior?
>> >>>
>> >>> Thank you.
>> >>>
>> >>>
>> >>
>> > -
>> >>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>> --
> Best Regards,
> Ayan Guha
>


Re: add an auto_increment column

2022-02-07 Thread Gourav Sengupta
Hi,

can we understand the requirement first?

What is it that you are trying to achieve by auto increment id? Do you just
want different ID's for rows, or do you want to keep track of the record
count of a table as well, or do you want to use them for surrogate keys?
Are you going to insert records multiple times in a table, and still
have different values?

I think without knowing the requirements all the above responses, like
everything else where solutions are reached before understanding the
problem, have high chances of being wrong.


Regards,
Gourav Sengupta

On Mon, Feb 7, 2022 at 2:21 AM Siva Samraj  wrote:

> Monotonically_increasing_id() will give the same functionality
>
> On Mon, 7 Feb, 2022, 6:57 am ,  wrote:
>
>> For a dataframe object, how to add a column which is auto_increment like
>> mysql's behavior?
>>
>> Thank you.
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>


Re: A Persisted Spark DataFrame is computed twice

2022-02-01 Thread Gourav Sengupta
Hi,

Can you please try to use SPARK SQL, instead of dataframes and see the
difference?

You will get a lot of theoretical arguments, and that is fine, but they are
just largely and essentially theories.

Also try to apply the function to the result of the filters as a sub-query
by caching in the data of the filters first.



Regards,
Gourav Sengupta

On Mon, Jan 31, 2022 at 8:00 AM Benjamin Du  wrote:

> I don't think coalesce (by repartitioning I assume you mean coalesce)
> itself and deserialising takes that much time. To add a little bit more
> context, the computation of the DataFrame is CPU intensive instead of
> data/IO intensive. I purposely keep coalesce after df.count as I want
> to keep the large number of partitions (30k) when computing the DataFrame
> so that I can get a much higher parallelism. After the computation, I
> reduce the number of partitions (to avoid having too many small files on
> HDFS). It typically takes about 5 hours to compute the DataFrame (when 30k
> partitions is used) and write it to disk (without doing repartitioning or
> coalesce). If I manually write the computed DataFrame to disk, read it
> back, coalesce it and then write it back to disk, it also takes about 5
> hours. The code that I pasted in this thread takes forever to run as the
> DataFrame is obviously recomputed at df.coalesce and with a parallelism
> of 300 partitions, it is almost impossible to compute the DataFrame in a
> reasonable amount of time.
>
> I tried various ways but none of them worked except manually writing to
> disk, reading it back, repartitioning/coalescing it, and then writing it back to
> HDFS.
>
>1. checkpoint by itself computes the DataFrame twice. (This is a known
>existing bug of checkpoint).
>
> output_mod = f"{output}/job={mod}"
> spark.read.parquet("/input/hdfs/path") \
>     .filter(col("n0") == n0) \
>     .filter(col("n1") == n1) \
>     .filter(col("h1") == h1) \
>     .filter(col("j1").isin(j1)) \
>     .filter(col("j0") == j0) \
>     .filter(col("h0").isin(h0)) \
>     .filter(col("id0").bitwiseOR(col("id1")) % jobs == mod) \
>     .withColumn("test", test_score_r4(col("id0"), col("id1"))) \
>     .checkpoint() \
>     .coalesce(300) \
>     .write.mode("overwrite").parquet(output_mod)
>
>
>1. persist (to Disk) + count computes the DataFrame twice.
>
> output_mod = f"{output}/job={mod}"
> df = spark.read.parquet("/input/hdfs/path") \
>     .filter(col("n0") == n0) \
>     .filter(col("n1") == n1) \
>     .filter(col("h1") == h1) \
>     .filter(col("j1").isin(j1)) \
>     .filter(col("j0") == j0) \
>     .filter(col("h0").isin(h0)) \
>     .filter(col("id0").bitwiseOR(col("id1")) % jobs == mod) \
>     .withColumn("test", test_score_r4(col("id0"), col("id1"))) \
>     .persist(StorageLevel.DISK_ONLY)
> df.count()
> df.coalesce(300).write.mode("overwrite").parquet(output_mod)
>
>
>1. persist to memory + count computes the DataFrame twice
>
> output_mod = f"{output}/job={mod}"
> df = spark.read.parquet("/input/hdfs/path") \
>     .filter(col("n0") == n0) \
>     .filter(col("n1") == n1) \
>     .filter(col("h1") == h1) \
>     .filter(col("j1").isin(j1)) \
>     .filter(col("j0") == j0) \
>     .filter(col("h0").isin(h0)) \
>     .filter(col("id0").bitwiseOR(col("id1")) % jobs == mod) \
>     .withColumn("test", test_score_r4(col("id0"), col("id1"))) \
>     .persist(StorageLevel.MEMORY_ONLY)
> df.count()
> df.coalesce(300).write.mode("overwrite").parquet(output_mod)
>
>
>1. persist (to memory) + checkpoint + coalesce computes the DataFrame
>twice
>
> output_mod = f"{output}/job={mod}"
> df = spark.read.parquet("/input/hdfs/path") \
> .filter(col("n0") == n0) \
> .filter(col("n1") == n1) \
> .filter(col("h1") == h1) \
> .filter(col("j1").isin(j1)) \
> .filter(col("j0") == j0) \
> .filter(col("h0").isin(h0)) \
> .filter(col("id0").bitwiseOR(col("id1")) % jobs == mod) \
> .withColumn("test", test_score_r4(col("id0"), col("id1"))) \
>
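The one workaround Benjamin reports as working (write to disk, read back, coalesce, write again) could be wrapped up roughly as below. This is a sketch only: the function name, the staging path argument and the default of 300 files are illustrative, and `spark` and `df` are assumed to be an existing SparkSession and the expensive DataFrame.

```python
def write_via_staging(spark, df, staging_path, output_path, num_files=300):
    """Materialise df once, then compact the result into fewer files."""
    # Step 1: run the expensive computation with its full parallelism
    # and store the result in a temporary location.
    df.write.mode("overwrite").parquet(staging_path)

    # Step 2: read the stored result back; nothing upstream is recomputed
    # because the lineage now starts at the staged files.
    staged = spark.read.parquet(staging_path)

    # Step 3: reduce the number of output files and write the final copy.
    staged.coalesce(num_files).write.mode("overwrite").parquet(output_path)
```

The staging directory has to be cleaned up separately once the final write has succeeded.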

Re: [Spark UDF]: Where does UDF stores temporary Arrays/Sets

2022-01-30 Thread Gourav Sengupta
Hi,

Can you please try to see if you can increase the number of cores per task,
and therefore give each task more memory per executor?
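As a back-of-the-envelope sketch of that suggestion: the number of tasks running at once in an executor is its core count divided by spark.task.cpus, so raising spark.task.cpus leaves fewer tasks sharing the same heap. The figures below are only a heuristic (Spark does not enforce a fixed per-task quota), the function names are made up, and the 5-core executor is an assumed example; only the 15 GB figure comes from the thread.

```python
def concurrent_tasks(executor_cores, task_cpus=1):
    """How many tasks can run at the same time in one executor."""
    if task_cpus > executor_cores:
        raise ValueError("spark.task.cpus must not exceed the executor cores")
    return executor_cores // task_cpus


def heap_share_per_task_gb(executor_memory_gb, executor_cores, task_cpus=1):
    """Rough share of executor heap per concurrently running task."""
    return executor_memory_gb / concurrent_tasks(executor_cores, task_cpus)


# Assumed example: a 15 GB executor with 5 cores.
shared = heap_share_per_task_gb(15, 5, task_cpus=1)
alone = heap_share_per_task_gb(15, 5, task_cpus=5)
print(shared, alone)
```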

I do not understand what the XML is, what data is in it, and what problem
you are trying to solve by writing UDFs to parse XML. So
maybe we are not actually solving the problem and just addressing the issue.


Regards,
Gourav Sengupta

On Wed, Jan 26, 2022 at 4:07 PM Sean Owen  wrote:

> Really depends on what your UDF is doing. You could read 2GB of XML into
> much more than that as a DOM representation in memory.
> Remember 15GB of executor memory is shared across tasks.
> You need to get a handle on what memory your code is using to begin with
> to start to reason about whether that's enough, first.
>
> On Wed, Jan 26, 2022 at 10:03 AM Abhimanyu Kumar Singh <
> abhimanyu.kr.sing...@gmail.com> wrote:
>
>> Thanks for your quick response.
>>
>> For some reasons I can't use spark-xml (schema related issue).
>>
>> I've tried reducing number of tasks per executor by increasing the number
>> of executors, but it still throws same error.
>>
>> I can't understand why even 15gb of executor memory is not
>> sufficient to parse just a 2gb XML file.
>> How can I check the max amount of JVM memory utilised for each task?
>>
>> Do I need to tweak some other configurations for increasing JVM memory
>> rather than spark.executor.memory?
>>
>> On Wed, Jan 26, 2022, 9:23 PM Sean Owen  wrote:
>>
>>> Executor memory used shows data that is cached, not the VM usage. You're
>>> running out of memory somewhere, likely in your UDF, which probably parses
>>> massive XML docs as a DOM first or something. Use more memory, fewer tasks
>>> per executor, or consider using spark-xml if you are really just parsing
>>> pieces of it. It'll be more efficient.
>>>
>>> On Wed, Jan 26, 2022 at 9:47 AM Abhimanyu Kumar Singh <
>>> abhimanyu.kr.sing...@gmail.com> wrote:
>>>
>>>> I'm doing some complex operations inside spark UDF (parsing huge XML).
>>>>
>>>> Dataframe:
>>>> | value |
>>>> | Content of XML File 1 |
>>>> | Content of XML File 2 |
>>>> | Content of XML File N |
>>>>
>>>> val df = Dataframe.select(UDF_to_parse_xml(value))
>>>>
>>>> UDF looks something like:
>>>>
>>>> val XMLelements : Array[MyClass1] = getXMLelements(xmlContent)
>>>> val myResult: Array[MyClass2] = XMLelements.map(myfunction).distinct
>>>>
>>>> Parsing requires creation and de-duplication of arrays from the XML
>>>> containing
>>>> around 0.1 million elements (consisting of MyClass(Strings, Maps,
>>>> Integers,  )).
>>>>
>>>> In the Spark UI "executor memory used" is barely 60-70 MB. But still
>>>> Spark processing fails
>>>> with *ExecutorLostFailure *error for XMLs of size around 2GB.
>>>> When I increase the executor size (say 15GB to 25 GB) it works fine.
>>>> One partition can contain only
>>>> one XML file (with max size 2GB) and 1 task/executor runs in parallel.
>>>>
>>>> *My question is which memory is being used by UDF for storing arrays,
>>>> maps or sets while parsing?*
>>>> *And how can I configure it?*
>>>>
>>>> Should I increase *spark*.*memory*.*offHeap*.size,
>>>> spark.yarn.executor.memoryOverhead or spark.executor.memoryOverhead?
>>>>
>>>> Thanks a lot,
>>>> Abhimanyu
>>>>
>>>> PS: I know I shouldn't use UDF this way, but I don't have any other
>>>> alternative here.
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>


Re: A Persisted Spark DataFrame is computed twice

2022-01-30 Thread Gourav Sengupta
Hi,

without getting into suppositions, the best option is to look into the
SPARK UI SQL section.

It is the most wonderful tool to explain what is happening, and why. In
SPARK 3.x they have made the UI even better, with different levels of
granularity and detail.

On another note, you might want to read the difference between repartition
and coalesce before making any kind of assumptions.
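For reference, the difference can be put side by side. The Spark API documentation notes that a drastic coalesce can make the upstream computation run on fewer tasks than intended, and that repartition avoids this at the price of a shuffle. Whether that is what happens in Benjamin's job is not established in this thread. The helper names below are made up, and `df` is assumed to be an existing DataFrame.

```python
def write_with_coalesce(df, path, num_partitions=300):
    """Narrow transformation: no shuffle is added."""
    # Without a shuffle boundary, work feeding into this stage may also
    # run with only num_partitions tasks.
    df.coalesce(num_partitions).write.mode("overwrite").parquet(path)


def write_with_repartition(df, path, num_partitions=300):
    """Wide transformation: adds a shuffle step."""
    # The upstream partitions keep their own parallelism; the shuffle
    # then redistributes rows into num_partitions output partitions.
    df.repartition(num_partitions).write.mode("overwrite").parquet(path)
```

Comparing the two plans in the SQL tab of the Spark UI shows whether the extra Exchange step appears and how many tasks each stage uses.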


Regards,
Gourav Sengupta

On Sun, Jan 30, 2022 at 8:52 AM Sebastian Piu 
wrote:

> It's probably the repartitioning and deserialising the df that you are
> seeing take time. Try doing this
>
> 1. Add another count after your current one and compare times
> 2. Move coalesce before persist
>
>
>
> You should see
>
> On Sun, 30 Jan 2022, 08:37 Benjamin Du,  wrote:
>
>> I have some PySpark code like below. Basically, I persist a DataFrame
>> (which is time-consuming to compute) to disk, call the method
>> DataFrame.count to trigger the caching/persist immediately, and then I
>> coalesce the DataFrame to reduce the number of partitions (the original
>> DataFrame has 30,000 partitions) and output it to HDFS. Based on the
>> execution time of job stages and the execution plan, it seems to me that
>> the DataFrame is recomputed at df.coalesce(300). Does anyone know why
>> this happens?
>>
>> df = spark.read.parquet("/input/hdfs/path") \
>> .filter(...) \
>> .withColumn("new_col", my_pandas_udf("col0", "col1")) \
>> .persist(StorageLevel.DISK_ONLY)
>> df.count()
>> df.coalesce(300).write.mode("overwrite").parquet(output_mod)
>>
>>
>> BTW, it works well if I manually write the DataFrame to HDFS, read it
>> back, coalesce it and write it back to HDFS.
>> Originally posted at
>> https://stackoverflow.com/questions/70781494/a-persisted-spark-dataframe-is-computed-twice
>>
>> Best,
>>
>> 
>>
>> Ben Du
>>
>> Personal Blog <http://www.legendu.net/> | GitHub
>> <https://github.com/dclong/> | Bitbucket <https://bitbucket.org/dclong/>
>> | Docker Hub <https://hub.docker.com/r/dclong/>
>>
>


Re: How to delete the record

2022-01-30 Thread Gourav Sengupta
Hi,

I think it will be useful to understand the problem before solving the
problem.

Can I please ask what this table is? Is it a fact (event store) kind of a
table, or a dimension (master data) kind of table? And what are the
downstream consumptions of this table?

Besides that what is the unique identifier for a record in this table? For
example, some master data tables have unique identifiers as phone numbers,
which can get reallocated to other individuals over a period of time.

Is there any other information that you can provide on this
table, its contents, usage, etc?

There is a third option, which is akin to the second option that Mich was
mentioning, and that is basically a database transaction log, which gets
very large, very expensive to store and query over a period of time. Are
you creating a database transaction log?
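To illustrate what such a log implies for readers of the table, here is a pure-Python sketch that replays a change log carrying the op_type and op_time columns from Mich's message quoted below (1 insert, 2 update, 3 delete) and derives the current state. The function name and the sample records are made up for illustration.

```python
from datetime import datetime

INSERT, UPDATE, DELETE = 1, 2, 3


def current_state(change_log):
    """Replay the log in time order and keep the latest live row per id."""
    latest = {}
    for record in sorted(change_log, key=lambda r: r["op_time"]):
        latest[record["id"]] = record
    # Rows whose most recent operation is a delete are no longer live.
    return {rid: rec for rid, rec in latest.items() if rec["op_type"] != DELETE}


# Made-up sample: id 1 is inserted and later deleted, id 2 stays.
log = [
    {"id": 1, "op_type": INSERT, "op_time": datetime(2022, 1, 1)},
    {"id": 2, "op_type": INSERT, "op_time": datetime(2022, 1, 1)},
    {"id": 1, "op_type": DELETE, "op_time": datetime(2022, 1, 2)},
]
live = current_state(log)
print(sorted(live))
```

Every reader has to do this replay, and the log itself only grows, which is the storage and query cost mentioned above.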


Thanks and Regards,
Gourav Sengupta


On Thu, Jan 27, 2022 at 9:03 PM ayan guha  wrote:

> Btw, 2 options Mich explained are not mutually exclusive. Option 2 can
> and should be implemented over a delta lake table anyway. Especially if you
> need to do hard deletes eventually (eg for regulatory needs)
>
>
>
> On Fri, 28 Jan 2022 at 6:50 am, Sid Kal  wrote:
>
>> Thanks Mich and Sean for your time
>>
>> On Fri, 28 Jan 2022, 00:53 Mich Talebzadeh, 
>> wrote:
>>
>>> Yes I believe so.
>>>
>>> Check this article of mine dated early 2019 but will have some relevance
>>> to what I am implying.
>>>
>>>
>>> https://www.linkedin.com/pulse/real-time-data-streaming-big-typical-use-cases-talebzadeh-ph-d-/
>>>
>>> HTH
>>>
>>>
>>>view my Linkedin profile
>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>>
>>> On Thu, 27 Jan 2022 at 18:46, Sid Kal  wrote:
>>>
>>>> Okay sounds good.
>>>>
>>>> So,  below two options would help me to capture CDC changes:
>>>>
>>>> 1) Delta lake
>>>> 2) Maintaining snapshot of records with some indicators and timestamp.
>>>>
>>>> Correct me if I'm wrong
>>>>
>>>> Thanks,
>>>> Sid
>>>>
>>>> On Thu, 27 Jan 2022, 23:59 Mich Talebzadeh, 
>>>> wrote:
>>>>
>>>>> There are two ways of doing it.
>>>>>
>>>>>
>>>>>1. Through snapshot offered meaning an immutable snapshot of the
>>>>>state of the table at a given version. For example, the state
>>>>><https://books.japila.pl/delta-lake-internals/Snapshot/#state> of
>>>>>a Delta table
>>>>><https://books.japila.pl/delta-lake-internals/Snapshot/#deltaLog> at
>>>>>the version
>>>>><https://books.japila.pl/delta-lake-internals/Snapshot/#version>.
>>>>>2. Creating your own versioning. Taking your example you define
>>>>>the target storage *with two added columns, namely:* op_type INT
>>>>>(1-insert, 2-update, 3-delete) and op_time TIMESTAMP <ingestion_time>.
>>>>>Your example record will be
>>>>>
>>>>>
>>>>> id   op_type  op_time
>>>>>
>>>>> 11 
>>>>>
>>>>> 13 
>>>>>
>>>>>
>>>>>df = rdd.toDF(). \
>>>>>
>>>>> withColumnRenamed("_1", "ID"). \
>>>>>
>>>>> withColumnRenamed("_2", "CLUSTERED"). \
>>>>>
>>>>> withColumnRenamed("_3", "SCATTERED"). \
>>>>>
>>>>> withColumnRenamed("_4", "RANDOMISED"). \
>>>>>
>>>>> withColumnRenamed("_5", "RANDOM_STRING"). \
>>>>>
>>>>> withColumnRenamed("_6", "SMALL_VC"). \
>>>>>
>>>>> withColumnRenamed("_7", "PADDIN

Re: how can I remove the warning message

2022-01-30 Thread Gourav Sengupta
Hi,

I have often found the warnings in the logs extremely useful; they
are just logs, and they provide a lot of insight during upgrades, external
package loading, deprecation, debugging, etc.

Do you have any particular reason to disable the warnings in a submitted
job?

If I remember correctly, I used to disable warnings in spark-shell using
Logger.getLogger("akka").setLevel(Level.OFF). Other details are mentioned
here:
https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.SparkContext.setLogLevel.html
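
For PySpark, a minimal sketch of the setLogLevel call from the page above could look like this. The helper name `quieten` is made up for illustration; the only Spark call it relies on is SparkContext.setLogLevel. As far as I know this only controls Spark's own logger, so the "illegal reflective access" lines, which the JVM prints itself, may well remain.

```python
# Log levels accepted by SparkContext.setLogLevel, per the PySpark docs.
VALID_LEVELS = {"ALL", "DEBUG", "ERROR", "FATAL", "INFO", "OFF", "TRACE", "WARN"}


def quieten(spark_context, level="ERROR"):
    """Set the log level on an existing SparkContext and return the level used.

    `quieten` is a hypothetical helper name; the only Spark call it makes is
    SparkContext.setLogLevel(level).
    """
    level = level.upper()
    if level not in VALID_LEVELS:
        raise ValueError("unknown log level: {}".format(level))
    spark_context.setLogLevel(level)
    return level


# Typical use inside a job (assumes pyspark is installed):
#   from pyspark.sql import SparkSession
#   spark = SparkSession.builder.getOrCreate()
#   quieten(spark.sparkContext, "ERROR")
```

Validating the level before calling Spark is only a convenience, so that a typo fails with a clear message.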



Regards,
Gourav Sengupta

On Fri, Jan 28, 2022 at 11:14 AM  wrote:

> When I submitted the job from scala client, I got the warning messages:
>
> WARNING: An illegal reflective access operation has occurred
> WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform
> (file:/opt/spark/jars/spark-unsafe_2.12-3.2.0.jar) to constructor
> java.nio.DirectByteBuffer(long,int)
> WARNING: Please consider reporting this to the maintainers of
> org.apache.spark.unsafe.Platform
> WARNING: Use --illegal-access=warn to enable warnings of further illegal
> reflective access operations
> WARNING: All illegal access operations will be denied in a future
> release
>
> How can I just remove those messages?
>
> spark: 3.2.0
> scala: 2.13.7
>
> Thank you.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Kafka to spark streaming

2022-01-30 Thread Gourav Sengupta
Hi Amit,

before answering your question, I am just trying to understand it.

I am not exactly clear how the Akka application, Kafka and the Spark
Streaming application sit together, and what exactly you are trying to
achieve.

Can you please elaborate?

Regards,
Gourav


On Fri, Jan 28, 2022 at 10:14 PM Amit Sharma  wrote:

> Hello everyone, we have spark streaming application. We send request to
> stream through Akka actor using Kafka topic. We wait for response as it is
> real time. Just want a suggestion is there any better option like Livy
> where we can send and receive request to spark streaming.
>
>
> Thanks
> Amit
>


Re: Small optimization questions

2022-01-28 Thread Gourav Sengupta
Hi,

Have you looked at the function that gives the record number in Spark SQL,
or in the Spark functions?

Having a larger volume of data means that you may run into memory issues as
well. Therefore your scale-in and scale-out rules have to be responsive, as
mentioned by Sean, along with the number of cores per task, to take care of
memory.

We do not have any other data regarding your clusters or environments,
so it is difficult to imagine things and provide more information.
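
To put numbers on the points Sean makes further down this thread (2-3x as many tasks as cores, idle cores with too few partitions, and files written per partitionBy value), here is a rough plain-Python sketch of the arithmetic. The function names are made up for illustration, and the file count is an upper bound under the assumption that every in-memory partition holds rows for every partitionBy value.

```python
def suggested_partitions(total_cores, tasks_per_core=3):
    """Rule of thumb from this thread: 2-3x as many tasks as cores."""
    return total_cores * tasks_per_core


def idle_cores(total_cores, num_partitions):
    """Cores that cannot receive a task when partitions are fewer than cores."""
    return max(total_cores - num_partitions, 0)


def max_output_files(num_partitions, distinct_partition_values):
    """Upper bound on files written by repartition(n) followed by partitionBy(...).

    Each of the n in-memory partitions may hold rows for every partitionBy
    value, and each (partition, value) pair is written as its own file.
    """
    return num_partitions * distinct_partition_values


print(suggested_partitions(80))   # 240 tasks for 80 cores at 3x
print(idle_cores(80, 24))         # 56 cores idle when reading 24 partitions
print(max_output_files(80, 24))   # up to 1920 files for 24 hourly directories
```

The 80 cores and 24 hourly partitions are the figures quoted in the thread; the actual number of files depends on how the rows are distributed.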

Regards,
Gourav Sengupta

On Thu, Jan 27, 2022 at 12:58 PM Aki Riisiö  wrote:

> Ah, sorry for spamming, I found the answer from documentation. Thank you
> for the clarification!
>
> Best regards, Aki Riisiö
>
> On Thu, 27 Jan 2022 at 10:39, Aki Riisiö  wrote:
>
>> Hello.
>>
>> Thank you for the reply again. I just checked how many tasks are spawned
>> when we read the data from S3 and in the latest run, this was a little over
>> 23000. What determines the amount of tasks during the read? Is it directly
>> corresponding to the number of files to be read?
>>
>> Thank you.
>>
>> On Tue, 25 Jan 2022 at 17:35, Sean Owen  wrote:
>>
>>> Yes, you will end up with 80 partitions, and if you write the result,
>>> you end up with 80 files. If you don't have at least 80 partitions, there
>>> is no point in having 80 cores. You will probably see 56 are idle even under
>>> load.
>>> The partitionBy might end up causing the whole job to have more
>>> partitions anyway. I would settle this by actually watching how many tasks
>>> the streaming job spawns. Is it 1, 24, more?
>>>
>>> On Tue, Jan 25, 2022 at 7:57 AM Aki Riisiö  wrote:
>>>
>>>> Thank you for the reply.
>>>> The stream is partitioned by year/month/day/hour, and we read the data
>>>> once a day, so we are reading 24 partitions.
>>>>
>>>> " A crude rule of thumb is to have 2-3x as many tasks as cores" thank
>>>> you very much, I will set this as default. Will this however change, if we
>>>> also partition the data by year/month/day/hour? If I set:
>>>> df.repartition(80).write ... partitionBy("year", "month", "day",
>>>> "hour"), will this cause each hour to have 80 output files?
>>>>
>>>> The output data in a "normal" run is very small, so a big partition
>>>> size would result in a large number of too small files.
>>>> I am not sure how Glue autoscales itself, but I definitely need to look
>>>> that up a bit more.
>>>>
>>>> One of our jobs actually has a requirement to have only one
>>>> output-file, so is the only way to achieve that by repartition(1)? As I
>>>> understand it, this is a major issue in performance.
>>>>
>>>> Thank you!
>>>>
>>>>
>>>> On Tue, 25 Jan 2022 at 15:29, Sean Owen  wrote:
>>>>
>>>>> How many partitions does the stream have? With 80 cores, you need at
>>>>> least 80 tasks to even take advantage of them, so if it's less than 80, at
>>>>> least .repartition(80). A crude rule of thumb is to have 2-3x as many
>>>>> tasks as cores, to help even out differences in task size by more finely
>>>>> distributing the work. You might even go for more. I'd watch the task
>>>>> length, and as long as the tasks aren't completing in a few seconds or
>>>>> less, you probably don't have too many.
>>>>>
>>>>> This is also a good reason to use autoscaling, so that when not busy
>>>>> you can (for example) scale down to 1 executor, but under load, scale up
>>>>> to 10 or 20 machines if needed. That is also a good reason to repartition
>>>>> more, so that it's possible to take advantage of more parallelism when
>>>>> needed.
>>>>>
>>>>> On Tue, Jan 25, 2022 at 7:07 AM Aki Riisiö 
>>>>> wrote:
>>>>>
>>>>>> Hello.
>>>>>>
>>>>>> We have a very simple AWS Glue job running with Spark: get some
>>>>>> events from Kafka stream, do minor transformations, and write to S3.
>>>>>>
>>>>>> Recently, there was a change in Kafka topic which suddenly increased
>>>>>> our data size * 10 and at the same time we were testing with different
>>>>>> repartition values during df.repartition(n).write ...
>>>>>> At the time when Kafka 

Re: [Spark ML Pipeline]: Error Loading Pipeline Model with Custom Transformer

2022-01-12 Thread Gourav Sengupta
Hi,

Maybe I am short of time, but could you please add some inline comments to
your code to explain what you are trying to do?

Regards,
Gourav Sengupta



On Tue, Jan 11, 2022 at 5:29 PM Alana Young  wrote:

> I am experimenting with creating and persisting ML pipelines using custom
> transformers (I am using Spark 3.1.2). I was able to create a transformer
> class (for testing purposes, I modeled the code off the SQLTransformer
> class) and save the pipeline model. When I attempt to load the saved
> pipeline model, I am running into the following error:
>
> java.lang.NullPointerException at
> java.base/java.lang.reflect.Method.invoke(Method.java:559) at
> org.apache.spark.ml.util.DefaultParamsReader$.loadParamsInstanceReader(ReadWrite.scala:631)
> at
> org.apache.spark.ml.Pipeline$SharedReadWrite$.$anonfun$load$4(Pipeline.scala:276)
> at
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
> at
> scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
> at
> scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198) at
> scala.collection.TraversableLike.map(TraversableLike.scala:238) at
> scala.collection.TraversableLike.map$(TraversableLike.scala:231) at
> scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198) at
> org.apache.spark.ml.Pipeline$SharedReadWrite$.$anonfun$load$3(Pipeline.scala:274)
> at
> org.apache.spark.ml.util.Instrumentation$.$anonfun$instrumented$1(Instrumentation.scala:191)
> at scala.util.Try$.apply(Try.scala:213) at
> org.apache.spark.ml.util.Instrumentation$.instrumented(Instrumentation.scala:191)
> at org.apache.spark.ml.Pipeline$SharedReadWrite$.load(Pipeline.scala:268)
> at
> org.apache.spark.ml.PipelineModel$PipelineModelReader.$anonfun$load$7(Pipeline.scala:356)
> at org.apache.spark.ml.MLEvents.withLoadInstanceEvent(events.scala:160) at
> org.apache.spark.ml.MLEvents.withLoadInstanceEvent$(events.scala:155) at
> org.apache.spark.ml.util.Instrumentation.withLoadInstanceEvent(Instrumentation.scala:42)
> at
> org.apache.spark.ml.PipelineModel$PipelineModelReader.$anonfun$load$6(Pipeline.scala:355)
> at
> org.apache.spark.ml.util.Instrumentation$.$anonfun$instrumented$1(Instrumentation.scala:191)
> at scala.util.Try$.apply(Try.scala:213) at
> org.apache.spark.ml.util.Instrumentation$.instrumented(Instrumentation.scala:191)
> at
> org.apache.spark.ml.PipelineModel$PipelineModelReader.load(Pipeline.scala:355)
> at
> org.apache.spark.ml.PipelineModel$PipelineModelReader.load(Pipeline.scala:349)
> ... 38 elided
>
>
> Here is a gist
> <https://gist.github.com/ally1221/5acddd9650de3dc67f6399a4687893aa> containing
> the relevant code. Any feedback and advice would be appreciated. Thank
> you.
>


Re: pyspark loop optimization

2022-01-11 Thread Gourav Sengupta
Hi,

I am not sure what you are trying to achieve here. Are cume_dist and
percent_rank not different?

If I am following your question correctly, you are looking to filter
out NULLs before applying the function on the dataframe, and I think it
will be fine if you just create another dataframe first with the non-null
values and then apply the function to that dataframe.

It would help a lot if you could explain what you are trying to achieve
here. Applying loops on a dataframe, as you have done, is
not recommended at all. Please look at the explain plan of the dataframe
in each iteration to understand the effect of your loops on the
plan; that should give some details.
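
On the cume_dist versus percent_rank point, a small plain-Python sketch of the two standard SQL definitions may help show why the two snippets in the original question cannot give the same numbers. This is only an illustration of the formulas on a toy list, not Spark code; NULLs (None here) are dropped first, as discussed above.

```python
def cume_dist(values):
    """cume_dist per value: rows with value <= current, divided by row count.

    None (NULL) entries are dropped before anything is computed.
    """
    kept = [v for v in values if v is not None]
    n = len(kept)
    return {v: sum(1 for x in kept if x <= v) / n for v in kept}


def percent_rank(values):
    """percent_rank per value: (rank - 1) / (rows - 1), rank as in SQL RANK().

    rank - 1 equals the number of rows strictly smaller than the value.
    """
    kept = [v for v in values if v is not None]
    n = len(kept)
    if n <= 1:
        return {v: 0.0 for v in kept}
    return {v: sum(1 for x in kept if x < v) / (n - 1) for v in kept}


sample = [10, 20, 20, None, 40]
print(cume_dist(sample))     # {10: 0.25, 20: 0.75, 40: 1.0}
print(percent_rank(sample))  # 10 -> 0.0, 20 -> one third, 40 -> 1.0
```

So even with identical NULL handling, the smallest value gets 25 from cume_dist and 0 from percent_rank once multiplied by 100.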


Regards,
Gourav Sengupta

On Mon, Jan 10, 2022 at 10:49 PM Ramesh Natarajan 
wrote:

> I want to compute cume_dist on a bunch of columns in a spark dataframe,
> but want to remove NULL values before doing so.
>
> I have this loop in pyspark. While this works, I see the driver runs at
> 100% while the executors are idle for the most part. I am reading that
> running a loop is an anti-pattern and should be avoided. Any pointers on
> how to optimize this section of pyspark code?
>
> I am running this on  the AWS Glue 3.0 environment.
>
> for column_name, new_col in [
> ("event_duration", "percentile_rank_evt_duration"),
> ("event_duration_pred", "percentile_pred_evt_duration"),
> ("alarm_cnt", "percentile_rank_alarm_cnt"),
> ("alarm_cnt_pred", "percentile_pred_alarm_cnt"),
> ("event_duration_adj", "percentile_rank_evt_duration_adj"),
> ("event_duration_adj_pred", "percentile_pred_evt_duration_adj"),
> ("encounter_time", "percentile_rank_encounter_time"),
> ("encounter_time_pred", "percentile_pred_encounter_time"),
> ("encounter_time_adj", "percentile_rank_encounter_time_adj"),
> ("encounter_time_adj_pred", "percentile_pred_encounter_time_adj"),
> ]:
> win = (
> Window().partitionBy(["p_customer_name", "p_site_name",
> "year_month"])
>  .orderBy(col(column_name))
> )
> df1 = df.filter(F.col(column_name).isNull())
> df2 = df.filter(F.col(column_name).isNotNull()).withColumn(
> new_col, F.round(F.cume_dist().over(win) *
> lit(100)).cast("integer")
> )
> df = df2.unionByName(df1, allowMissingColumns=True)
>
> For some reason this code seems to work faster, but it doesn't remove
> NULLs prior to computing the cume_dist. Not sure if this is also a proper
> way to do this :(
>
> for column_name, new_col in [
> ("event_duration", "percentile_rank_evt_duration"),
> ("event_duration_pred", "percentile_pred_evt_duration"),
> ("alarm_cnt", "percentile_rank_alarm_cnt"),
> ("alarm_cnt_pred", "percentile_pred_alarm_cnt"),
> ("event_duration_adj", "percentile_rank_evt_duration_adj"),
> ("event_duration_adj_pred", "percentile_pred_evt_duration_adj"),
> ("encounter_time", "percentile_rank_encounter_time"),
> ("encounter_time_pred", "percentile_pred_encounter_time"),
> ("encounter_time_adj", "percentile_rank_encounter_time_adj"),
> ("encounter_time_adj_pred", "percentile_pred_encounter_time_adj"),
> ]:
> win = (
> Window().partitionBy(["p_customer_name", "p_site_name",
> "year_month"])
> .orderBy(col(column_name))
> )
> df = df.withColumn(
> new_col,
> F.when(F.col(column_name).isNull(), F.lit(None)).otherwise(
> F.round(F.percent_rank().over(win) *
> lit(100)).cast("integer")
> ),
> )
>
> Appreciate if anyone has any pointers on how to go about this..
>
> thanks
> Ramesh
>


Re: How to add a row number column with out reordering my data frame

2022-01-11 Thread Gourav Sengupta
Hi,
I do not think we need to do any of that. Please try repartitionByRange;
Spark 3 has adaptive query execution with configurations to handle skew as
well.
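
As a rough sketch of what I mean, the two settings below are the documented Spark 3 switches for adaptive query execution and its skew-join handling. The helper `with_settings` is a made-up name, and the partition count and column in the usage comment are placeholders, not recommendations.

```python
# Adaptive query execution settings documented for Spark 3.x.
AQE_SETTINGS = {
    "spark.sql.adaptive.enabled": "true",
    "spark.sql.adaptive.skewJoin.enabled": "true",
}


def with_settings(builder, settings):
    """Apply each key/value pair through builder.config(key, value).

    `with_settings` is a hypothetical helper; `builder` is expected to behave
    like SparkSession.builder, whose config() returns the builder itself.
    """
    for key, value in settings.items():
        builder = builder.config(key, value)
    return builder


# Typical use (assumes pyspark is installed; "Name" is the key column from
# this thread and 200 is an arbitrary partition count):
#   from pyspark.sql import SparkSession
#   spark = with_settings(SparkSession.builder, AQE_SETTINGS).getOrCreate()
#   df = df.repartitionByRange(200, "Name")
```

Whether this removes the OOM depends on the data; it is a starting point to try before splitting the tables by hand.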

Regards,
Gourav

On Tue, Jan 11, 2022 at 4:21 PM Andrew Davidson  wrote:

> HI Gourav
>
>
>
> When I join I get OOM. To address this my thought was to split my tables
> into small batches of rows. And then join the batch together then use
> union. My assumption is the union is a narrow transform and as such require
> fewer resources. Let say I have 5 data frames I want to join together and
> each has 300 rows
>
>
>
> I want to create 15 data frames.
>
>
>
> Set1 = {11, 12, 13, 14, 15}
>
>
>
> Set2 = {21, 22, 23, 24, 25}
>
>
>
> Set3 = {31, 32, 33, 34, 35}
>
>
>
> Then join each "batch":
>
> S1joinDF = 11.join(12).join(13).join(14).join(15)
>
>
>
> S2joinDF = 21.join(22).join(23).join(24).join(25)
>
>
>
> S3joinDF = 31.join(32).join(33).join(34).join(35)
>
>
>
> resultDF = S1joinDF.union( S2joinDF ) .union( S3joinDF )
>
>
>
> The way I originally wrote my code is as follows. Based on my unit test, it
> turns out I need to call orderBy on every iteration of the loop. I
> tried sorting outside of the while loop, but that did not resolve the problem.
> Given the size of my dataframes, that is going to crush performance. My unit
> test works. I never ran it on my real data set.
>
>
>
> # Create a copy of the original dataframe
> copyDF = df.orderBy("Name")
> # copyDF.show()
>
> i = 0
> while i < numberOfSplits:
>     self.logger.warn("i:{}".format(i))
>     # Get the top `numRows` number of rows
>     # note take() is an action
>     # limit() is a transformation
>     topDF = copyDF.limit( numRows )
>
>     # Truncate the `copy_df` to remove
>     # the contents fetched for `temp_df`
>     # original quant.sf files are sorted by name however
>     # we must use order by, else the row names between
>     # GTEx sample will not be the same
>     # we can not simply sort or orderBy once. we have to
>     # do this on every iteration
>     copyDF = copyDF.subtract(topDF).orderBy( "Name" )
>
>     retList[i] = topDF
>
>     # Increment the split number
>     i += 1
>
> if remainingRows > 0 :
>     self.logger.info("AEDWIP writing last i:{} len(retList):{}".format(i, len(retList)))
>     retList[i] = copyDF
>     #copyDF.show()
>     #retList[i].show()
>
>
>
>
>
> Okay, so that is the background. Rather than use orderBy, I thought that if I
> could add a row number I could easily split up my data frames. My code would
> look a lot like what I would write in pandas or R
>
>
>
> while i < numBatches:
>     start = i * numRows
>     end = start + numRows
>     print("\ni:{} start:{} end:{}".format(i, start, end))
>     df = trainDF.iloc[ start:end ]
>
>
>
> There does not seem to be an easy way to do this.
>
>
> https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.monotonically_increasing_id.html
>
> The generated ID is guaranteed to be monotonically increasing and unique,
> but not consecutive.
>
>
>
>
>
> Comments and suggestions appreciated
>
>
>
> Andy
>
>
>
>
>
> *From: *Gourav Sengupta 
> *Date: *Monday, January 10, 2022 at 11:03 AM
> *To: *Andrew Davidson 
> *Cc: *"user @spark" 
> *Subject: *Re: How to add a row number column with out reordering my data
> frame
>
>
>
> Hi,
>
>
>
> I am a bit confused here, it is not entirely clear to me why are you
> creating the row numbers, and how creating the row numbers helps you with
> the joins?
>
>
>
> Can you please explain with some sample data?
>
>
>
>
>
> Regards,
>
> Gourav
>
>
>
> On Fri, Jan 7, 2022 at 1:14 AM Andrew Davidson 
> wrote:
>
> Hi
>
>
>
> I am trying to work through a OOM error. I have 10411 files. I want to
> select a single column from each file and then join them into a single
> table.
>
>
>
> The files have a row unique id. However it is a very long string. The data
> file with just the name and column of interest is about 470 M. The column
> of interest alone is 21 m. it is a column over 5 million real numbers.
>

Re: How to add a row number column with out reordering my data frame

2022-01-10 Thread Gourav Sengupta
Hi,

I am a bit confused here, it is not entirely clear to me why are you
creating the row numbers, and how creating the row numbers helps you with
the joins?

Can you please explain with some sample data?


Regards,
Gourav

On Fri, Jan 7, 2022 at 1:14 AM Andrew Davidson 
wrote:

> Hi
>
>
>
> I am trying to work through a OOM error. I have 10411 files. I want to
> select a single column from each file and then join them into a single
> table.
>
>
>
> The files have a row unique id. However it is a very long string. The data
> file with just the name and column of interest is about 470 M. The column
> of interest alone is 21 m. it is a column over 5 million real numbers.
>
>
>
> So I thought I would save a lot of memory if I can join over row numbers.
>
>
>
> # create dummy variable to orderby
> https://www.py4u.net/discuss/1840945
>
> w = Window().orderBy(lit('A'))
>
> sampleDF = sampleDF.select( ["NumReads"] )\
>     .withColumnRenamed( "NumReads", sampleName )\
>     .withColumn( "tid", row_number().over(w) )
>
>
>
>
>
> This code seems pretty complicated to someone coming from pandas and R
> dataframes. My unit test works; however, it generates the following warning.
>
>
>
>
>
> WARN WindowExec: No Partition Defined for Window operation! Moving all
> data to a single partition, this can cause serious performance degradation.
>
>
>
>
>
> Is there a better way to create a row number without reordering my data?
> The order is important.
>
>
>
> Kind regards
>
>
>
> Andy
>


Re: hive table with large column data size

2022-01-10 Thread Gourav Sengupta
Hi,

As always, before answering the question, can I please ask what you are
trying to achieve by storing the data in a table? How are you planning to
query binary data?

In relational theory, a table is a relation (entity) and its fields are
the attributes. You might consider an image to be an attribute of a tuple
(or record) belonging to a particular relation, but there might be more
efficient methods of storing the binary data; it all depends on what you
are trying to do.

For the data types please look here:
https://spark.apache.org/docs/latest/sql-ref-datatypes.html. Parquet is
definitely a columnar format, and if I am not entirely wrong, Spark
supports columnar reading of it by default.


Regards,
Gourav Sengupta

On Sun, Jan 9, 2022 at 2:34 PM weoccc  wrote:

> Hi ,
>
> I want to store binary data (such as images) into hive table but the
> binary data column might be much larger than other columns per row.  I'm
> worried about the query performance. One way I can think of is to separate
> binary data storage from other columns by creating 2 hive tables and run 2
> separate spark query and join them later.
>
> Later, I found parquet has supported column split into different files as
> shown here:
> https://parquet.apache.org/documentation/latest/
>
> I'm wondering if spark sql already supports that ? If so, how to use ?
>
> Weide
>


Re: pyspark

2022-01-06 Thread Gourav Sengupta
Hi,

I am not sure that we need to use SQLContext and HiveContext at all
anymore.

Can you please check your JAVA_HOME and SPARK_HOME? I use the findspark
library to set up all the Spark-related environment variables for me, or use
conda to install pyspark from conda-forge.
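
A quick way to run that check from a notebook cell is sketched below. The helper name `missing_spark_env` is made up for illustration, and the commented lines assume pyspark is importable; with a pip or conda install of pyspark, SPARK_HOME may legitimately be unset.

```python
import os


def missing_spark_env(env=None, required=("JAVA_HOME", "SPARK_HOME")):
    """Return the names in `required` that are unset or empty in `env`."""
    env = os.environ if env is None else env
    return [name for name in required if not env.get(name)]


if __name__ == "__main__":
    missing = missing_spark_env()
    if missing:
        print("Not set:", ", ".join(missing))
    else:
        print("JAVA_HOME and SPARK_HOME are both set")

# Once the environment is in order, a session and its context can be created
# explicitly instead of relying on a pre-defined `sc`:
#   from pyspark.sql import SparkSession
#   spark = SparkSession.builder.appName("notebook").getOrCreate()
#   sc = spark.sparkContext
#   print(sc.master)
```

SparkSession covers what SQLContext and HiveContext were used for, which is why neither appears here.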


Regards,
Gourav Sengupta


On Wed, Jan 5, 2022 at 4:08 PM Mich Talebzadeh 
wrote:

> hm,
>
> If I understand correctly
>
> from pyspark.sql import SparkSession
> from pyspark import SparkContext
> from pyspark.sql import SQLContext, HiveContext
> import sys
>
> def spark_session(appName):
>   return SparkSession.builder \
> .appName(appName) \
> .enableHiveSupport() \
> .getOrCreate()
>
> def sparkcontext():
>   return SparkContext.getOrCreate()
>
> def hivecontext():
>   return HiveContext(sparkcontext())
>
>
> HTH
>
>
>
>
>
>
> On Wed, 5 Jan 2022 at 16:00, 流年以东” <2538974...@qq.com.invalid> wrote:
>
>>
>> When using pyspark, there is no Spark context when opening Jupyter:
>> entering sc.master shows that sc is not defined. We want to
>> initialize the Spark context with a script. This is the error.
>> We hope to receive your reply.
>> --
>> Sent from my iPhone
>>
>
>


Re: How to make batch filter

2022-01-02 Thread Gourav Sengupta
Hi Mich,

your notes are really great, it really brought back the old days again :)
thanks.

Just to note a few points that I found useful related to this question:
1. cores and threads - page 5
2. executor cores and number settings - page 6..


I think that the following example may be of use, note that I have one
driver and that has 8 cores as I am running PYSPARK 3.1.2 in local mode,
but this will give a way to find out a bit more possibly:


>>> from pyspark.sql.types import *
>>> # create the filter dataframe; there are easier ways to do the below
>>> spark.createDataFrame(
...     list(map(lambda filter: pyspark.sql.Row(filter), [0, 1, 2, 4, 7, 9])),
...     StructType([StructField("filter_value", IntegerType())]),
... ).createOrReplaceTempView("filters")
>>> # create the main table
>>> spark.range(100).createOrReplaceTempView("test_base")
>>> spark.sql("SELECT id, FLOOR(RAND() * 10) rand FROM test_base").createOrReplaceTempView("test")
>>> # see the partitions in the filters and the main table
>>> spark.sql("SELECT * FROM filters").rdd.getNumPartitions()
8
>>> spark.sql("SELECT * FROM test").rdd.getNumPartitions()
8
>>> # see the number of partitions in the filtered join output
>>> # (I am assuming implicit casting here)
>>> spark.sql("SELECT * FROM test WHERE rand IN (SELECT filter_value FROM filters)").rdd.getNumPartitions()
200
>>> spark.sql("SET spark.sql.shuffle.partitions=10")
DataFrame[key: string, value: string]
>>> spark.sql("SELECT * FROM test WHERE rand IN (SELECT filter_value FROM filters)").rdd.getNumPartitions()
10


Please do refer to the following page for adaptive query execution in Spark
3; it will be of massive help, particularly if you are handling skewed
joins: https://spark.apache.org/docs/latest/sql-performance-tuning.html


Thanks and Regards,
Gourav Sengupta

On Sun, Jan 2, 2022 at 11:24 AM Bitfox  wrote:

> Thanks Mich. That looks good.
>
> On Sun, Jan 2, 2022 at 7:10 PM Mich Talebzadeh 
> wrote:
>
>> LOL.
>>
>> You asking these questions takes me back to summer 2016 when I started
>> writing notes on spark. Obviously earlier versions but the notion of RDD,
>> Local, standalone, YARN etc. are still valid. Those days there were no k8s
>> and the public cloud was not widely adopted.  I browsed it and it was
>> refreshing for me. Anyway you may find some points addressing your
>> questions that you tend to ask.
>>
>> HTH
>>
>>
>>
>>
>>
>>
>>
>> On Sun, 2 Jan 2022 at 00:20, Bitfox  wrote:
>>
>>> One more question, for this big filter, given my server has 4 Cores,
>>> will spark (standalone mode) split the RDD to 4 partitions automatically?
>>>
>>> Thanks
>>>
>>> On Sun, Jan 2, 2022 at 6:30 AM Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
>>>> Create a list of values that you don't want and filter on those
>>>>
>>>> >>> DF = spark.range(10)
>>>> >>> DF
>>>> DataFrame[id: bigint]
>>>> >>>
>>>> >>> array = [1, 2, 3, 8]  # don't want these
>>>> >>> DF.filter(DF.id.isin(array) == False).show()
>>>> +---+
>>>> | id|
>>>> +---+
>>>> |  0|
>>>> |  4|
>>>> |  5|
>>>> |  6|
>>>> |  7|
>>>> |  9|
>>>> +---+
>>>>
>>>>  or use the ~ (NOT) operator:
>>>>
>>>> >>> DF.filter(~DF.id.isin(array)).show()
>>>> +---+
>>>> | id|
>>>> +---+
>>>> |  0|
>>>> |  4|
>>>> |  5|
>>>> |  6|
>>>> |  7|
>>>> |  9|
>>>> +---+
>>>>
>>>>
>>>> HTH
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Sat, 1 Jan 2022 at 20:59, Bitfox  wrote:
>>>>
>>>>> Using the dataframe API I need to implement a batch filter:
>>>>>
>>>>> DF.select(..).where((col(..) != 'a') & (col(..) != 'b') & …)
>>>>>
>>>>> There are a lot of keywords that should be filtered for the same column
>>>>> in the where statement.
>>>>>
>>>>> How can I make it smarter? UDF or others?
>>>>>
>>>>> Thanks & Happy new Year!
>>>>> Bitfox
>>>>>
>>>>


Re: Pyspark debugging best practices

2021-12-28 Thread Gourav Sengupta
Hi Andrew,

Any chance you might give Databricks a try in GCP?

The above transformations look complicated to me, why are you adding
dataframes to a list?


Regards,
Gourav Sengupta



On Sun, Dec 26, 2021 at 7:00 PM Andrew Davidson 
wrote:

> Hi
>
>
>
> I am having trouble debugging my driver. It runs correctly on smaller data
> sets but fails on large ones.  It is very hard to figure out what the bug
> is. I suspect it may have something to do with the way spark is installed and
> configured. I am using google cloud platform dataproc pyspark
>
>
>
> The log messages are not helpful. The error message will be something like
> "User application exited with status 1"
>
>
>
> And
>
>
>
> jsonPayload: {
>
> class: "server.TThreadPoolServer"
>
> filename: "hive-server2.log"
>
> message: "Error occurred during processing of message."
>
> thread: "HiveServer2-Handler-Pool: Thread-40"
>
> }
>
>
>
> I am able to access the spark history server however it does not capture
> anything if the driver crashes. I am unable to figure out how to access
> spark web UI.
>
>
>
> My driver program looks something like the pseudo code below. A long list
> of transforms with a single action (i.e. write) at the end. Adding log
> messages is not helpful because of lazy evaluation. I am tempted to add
> something like
>
>
>
> Logger.warn( "DEBUG df.count():{}".format( df.count() ) ) to try and inline
> some sort of diagnostic message.
>
>
>
> What do you think?
>
>
>
> Is there a better way to debug this?
>
>
>
> Kind regards
>
>
>
> Andy
>
>
>
> def run():
>     listOfDF = []
>     for filePath in listOfFiles:
>         df = spark.read.load( filePath, ...)
>         listOfDF.append(df)
>
>     list2OfDF = []
>     for df in listOfDF:
>         df2 = df.select(  )
>         list2OfDF.append( df2 )
>
>     # will setting list to None free cache?
>     # or just driver memory
>     listOfDF = None
>
>     df3 = list2OfDF[0]
>     for i in range( 1, len(list2OfDF) ):
>         df = list2OfDF[i]
>         df3 = df3.join(df ...)
>
>     # will setting the list to None free cache?
>     # or just driver memory
>     list2OfDF = None
>
>     # lots of narrow transformations on df3
>
>     return df3
>
> def main():
>     df = run()
>     df.write()
>
>
>
>
>
>
>


Re: Dataframe's storage size

2021-12-24 Thread Gourav Sengupta
Hi,

Even cached dataframes with exactly the same data can take different
amounts of memory, depending on a lot of conditions.

I generally try to understand the problem before jumping to
conclusions through assumptions, sadly a habit I cannot overcome.

Is there a way to understand what the person is trying to achieve here by
knowing the size of the dataframe?



Regards,
Gourav

On Fri, Dec 24, 2021 at 2:49 PM Sean Owen  wrote:

> I assume it means size in memory when cached, which does make sense.
> Fastest thing is to look at it in the UI Storage tab after it is cached.
>
> On Fri, Dec 24, 2021, 4:54 AM Gourav Sengupta 
> wrote:
>
>> Hi,
>>
>> This question, once again like the last one, does not make much sense at
>> all. Where are you trying to store the data frame, and how?
>>
>> Are you just trying to write a blog, as you were mentioning in an earlier
>> email, and trying to fill in some gaps? I think that the questions are
>> entirely wrong.
>>
>> Regards,
>> Gourav Sengupta
>>
>> On Fri, Dec 24, 2021 at 2:04 AM  wrote:
>>
>>> Hello
>>>
>>> Is it possible to know a dataframe's total storage size in bytes? such
>>> as:
>>>
>>> >>> df.size()
>>> Traceback (most recent call last):
>>>File "", line 1, in 
>>>File "/opt/spark/python/pyspark/sql/dataframe.py", line 1660, in
>>> __getattr__
>>>  "'%s' object has no attribute '%s'" % (self.__class__.__name__,
>>> name))
>>> AttributeError: 'DataFrame' object has no attribute 'size'
>>>
>>> Sure it won't work. but if there is such a method that would be great.
>>>
>>> Thanks.
>>>
>>>
>>>

