Re: About some Spark technical help

2021-12-24 Thread sam smith
Thanks for the feedback Andrew.

On Sat, Dec 25, 2021 at 03:17, Andrew Davidson  wrote:

> Hi Sam
>
> It is kind of hard to review straight code. Adding some sample data,
> a unit test, and expected results would be a good place to start, i.e.
> determine the fidelity of your implementation compared to the original.
>
> Also a verbal description of the algo would be helpful
>
> Happy Holidays
>
> Andy
>
> On Fri, Dec 24, 2021 at 3:17 AM sam smith 
> wrote:
>
>> Hi Gourav,
>>
>> Good question! That's the programming language I am most proficient at.
>> You are always welcome to suggest corrective remarks about my (Spark)
>> code.
>>
>> Kind regards.
>>
>> On Fri, Dec 24, 2021 at 11:58, Gourav Sengupta 
>> wrote:
>>
>>> Hi,
>>>
>>> out of sheer and utter curiosity, why JAVA?
>>>
>>> Regards,
>>> Gourav Sengupta
>>>
>>> On Thu, Dec 23, 2021 at 5:10 PM sam smith 
>>> wrote:
>>>
 Hi Andrew,

 Thanks, here's the GitHub repo with the code and the publication:
 https://github.com/SamSmithDevs10/paperReplicationForReview

 Kind regards

 On Thu, Dec 23, 2021 at 17:58, Andrew Davidson 
 wrote:

> Hi Sam
>
>
>
> Can you tell us more? What is the algorithm? Can you send us the URL of
> the publication?
>
>
>
> Kind regards
>
>
>
> Andy
>
>
>
> *From: *sam smith 
> *Date: *Wednesday, December 22, 2021 at 10:59 AM
> *To: *"user@spark.apache.org" 
> *Subject: *About some Spark technical help
>
>
>
> Hello guys,
>
>
>
> I am replicating a paper's algorithm in Spark / Java, and want to ask
> you guys for some assistance to validate / review about 150 lines of code.
> My GitHub repo contains both my Java class and the related paper.
>
>
>
> Any interested reviewers here?
>
>
>
>
>
> Thanks.
>



Re: measure running time

2021-12-24 Thread bitfox
Thanks a lot, Hollis. It was indeed due to the PyPI version. I have now
updated it.


$ pip3 -V
pip 9.0.1 from /usr/lib/python3/dist-packages (python 3.6)

$ pip3 install sparkmeasure
Collecting sparkmeasure
  Using cached 
https://files.pythonhosted.org/packages/9f/bf/c9810ff2d88513ffc185e65a3ab9df6121ad5b4c78aa8d134a06177f9021/sparkmeasure-0.14.0-py2.py3-none-any.whl

Installing collected packages: sparkmeasure
Successfully installed sparkmeasure-0.14.0

$ pyspark --packages ch.cern.sparkmeasure:spark-measure_2.12:0.17
Python 3.6.9 (default, Jan 26 2021, 15:33:00)
[GCC 8.4.0] on linux
...

from sparkmeasure import StageMetrics
stagemetrics = StageMetrics(spark)
stagemetrics.runandmeasure(locals(), 'spark.sql("select count(*) from range(1000) cross join range(1000) cross join range(100)").show()')

+---------+
| count(1)|
+---------+
|100000000|
+---------+
...
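For reference, runandmeasure() wraps an explicit begin/end pair; an equivalent, more verbose form (a sketch based on the sparkMeasure documentation, not taken from this thread, assuming the same pyspark session as above) is:

from sparkmeasure import StageMetrics

stagemetrics = StageMetrics(spark)
stagemetrics.begin()
spark.sql("select count(*) from range(1000) cross join range(1000) cross join range(100)").show()
stagemetrics.end()
stagemetrics.print_report()   # aggregated stage metrics, including elapsed time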


Hope it helps to others who have met the same issue.
Happy holidays. :0

Bitfox


On 2021-12-25 09:48, Hollis wrote:

 Replied mail 
From: Mich Talebzadeh
Date: 12/25/2021 00:25
To: Sean Owen
Cc: user, Luca Canali
Subject: Re: measure running time

Hi Sean,

I have already discussed an issue in my case with Spark 3.1.1 and
sparkmeasure  with the author Luca Canali on this matter. It has been
reproduced. I think we ought to wait for a patch.

HTH,

Mich

   view my Linkedin profile [1]

Disclaimer: Use it at your own risk. Any and all responsibility for
any loss, damage or destruction of data or any other property which
may arise from relying on this email's technical content is explicitly
disclaimed. The author will in no case be liable for any monetary
damages arising from such loss, damage or destruction.

On Fri, 24 Dec 2021 at 14:51, Sean Owen  wrote:


You probably did not install it on your cluster, nor included the
python package with your app

On Fri, Dec 24, 2021, 4:35 AM  wrote:


but I already installed it:

Requirement already satisfied: sparkmeasure in
/usr/local/lib/python2.7/dist-packages

so how? thank you.

On 2021-12-24 18:15, Hollis wrote:

Hi bitfox,

you need to pip install sparkmeasure first. Then you can launch it in pyspark.

from sparkmeasure import StageMetrics
stagemetrics = StageMetrics(spark)
stagemetrics.runandmeasure(locals(), 'spark.sql("select count(*) from range(1000) cross join range(1000) cross join range(100)").show()')

+---------+
| count(1)|
+---------+
|100000000|
+---------+

Regards,
Hollis

At 2021-12-24 09:18:19, bit...@bitfox.top wrote:

Hello list,

I run with Spark 3.2.0

After I started pyspark with:
$ pyspark --packages ch.cern.sparkmeasure:spark-measure_2.12:0.17

I can't load from the module sparkmeasure:

from sparkmeasure import StageMetrics
Traceback (most recent call last):
File "", line 1, in 
ModuleNotFoundError: No module named 'sparkmeasure'

Do you know why? @Luca thanks.

On 2021-12-24 04:20, bit...@bitfox.top wrote:

Thanks Gourav and Luca. I will try with the tools you provide in the
Github.

On 2021-12-23 23:40, Luca Canali wrote:

Hi,

I agree with Gourav that just measuring execution time is a simplistic
approach that may lead you to miss important details, in particular
when running distributed computations.

WebUI, REST API, and metrics instrumentation in Spark can be quite
useful for further drill down. See
https://spark.apache.org/docs/latest/monitoring.html

You can also have a look at this tool that takes care of automating
collecting and aggregating some executor task metrics:
https://github.com/LucaCanali/sparkMeasure

Best,

Luca

From: Gourav Sengupta 
Sent: Thursday, December 23, 2021 14:23
To: bit...@bitfox.top
Cc: user 
Subject: Re: measure running time

Hi,

I do not think that such time comparisons make any sense at all in
distributed computation. Just saying that an operation in RDD and
Dataframe can be compared based on their start and stop time may not
provide any valid information.

You will have to look into the details of timing and the steps. For
example, please look at the SPARK UI to see how timings are calculated
in distributed computing mode, there are several well written papers
on this.

Thanks and Regards,

Gourav Sengupta

On Thu, Dec 23, 2021 at 10:57 AM  wrote:

hello community,

In pyspark how can I measure the running time to the command?
I just want to compare the running time of the RDD API and dataframe
API, in my this blog:

https://bitfoxtop.wordpress.com/2021/12/23/count-email-addresses-using-sparks-rdd-and-dataframe/

I tried spark.time() it doesn't work.
Thank you.











-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org

Re: About some Spark technical help

2021-12-24 Thread Andrew Davidson
Hi Sam

It is kind of hard to review straight code. Adding some sample data, a
unit test, and expected results would be a good place to start, i.e.
determine the fidelity of your implementation compared to the original.

Also a verbal description of the algo would be helpful

Happy Holidays

Andy

On Fri, Dec 24, 2021 at 3:17 AM sam smith 
wrote:

> Hi Gourav,
>
> Good question! That's the programming language I am most proficient at.
> You are always welcome to suggest corrective remarks about my (Spark) code.
>
> Kind regards.
>
> On Fri, Dec 24, 2021 at 11:58, Gourav Sengupta 
> wrote:
>
>> Hi,
>>
>> out of sheer and utter curiosity, why JAVA?
>>
>> Regards,
>> Gourav Sengupta
>>
>> On Thu, Dec 23, 2021 at 5:10 PM sam smith 
>> wrote:
>>
>>> Hi Andrew,
>>>
>>> Thanks, here's the GitHub repo with the code and the publication:
>>> https://github.com/SamSmithDevs10/paperReplicationForReview
>>>
>>> Kind regards
>>>
>>> On Thu, Dec 23, 2021 at 17:58, Andrew Davidson 
>>> wrote:
>>>
 Hi Sam



 Can you tell us more? What is the algorithm? Can you send us the URL of
 the publication?



 Kind regards



 Andy



 *From: *sam smith 
 *Date: *Wednesday, December 22, 2021 at 10:59 AM
 *To: *"user@spark.apache.org" 
 *Subject: *About some Spark technical help



 Hello guys,



 I am replicating a paper's algorithm in Spark / Java, and want to ask
 you guys for some assistance to validate / review about 150 lines of code.
 My GitHub repo contains both my Java class and the related paper.



 Any interested reviewers here?





 Thanks.

>>>


Re: measure running time

2021-12-24 Thread Hollis
Hi,
I can run this on my PC.
I checked the email chain: bitfox installed sparkmeasure with Python 2, but he
launched pyspark with Python 3. I think that's the reason (see the quick check below).
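
A quick way to confirm that from inside the pyspark shell (a sketch; run it in the same session that fails to import):

import sys
print(sys.executable)   # the interpreter the PySpark driver is actually using

# install the package into exactly that interpreter from the OS shell, e.g.
#   /path/printed/above -m pip install sparkmeasure
import sparkmeasure     # should then import without ModuleNotFoundError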

Regards.
Hollis




 Replied mail 
From: Mich Talebzadeh
Date: 12/25/2021 00:25
To: Sean Owen
Cc: user, Luca Canali
Subject: Re: measure running time



Hi Sean,




I have already discussed an issue in my case with Spark 3.1.1 and sparkmeasure  
with the author Luca Canali on this matter. It has been reproduced. I think we 
ought to wait for a patch.




HTH,




Mich 







   view my Linkedin profile

 

Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction.

 





On Fri, 24 Dec 2021 at 14:51, Sean Owen  wrote:

You probably did not install it on your cluster, nor included the python 
package with your app 


On Fri, Dec 24, 2021, 4:35 AM  wrote:

but I already installed it:

Requirement already satisfied: sparkmeasure in
/usr/local/lib/python2.7/dist-packages

so how? thank you.

On 2021-12-24 18:15, Hollis wrote:
> Hi bitfox,
>
> you need to pip install sparkmeasure first. Then you can launch it in pyspark.
>
 from sparkmeasure import StageMetrics
 stagemetrics = StageMetrics(spark)
 stagemetrics.runandmeasure(locals(), 'spark.sql("select count(*)
> from range(1000) cross join range(1000) cross join
> range(100)").show()')
> +---------+
> | count(1)|
> +---------+
> |100000000|
> +---------+
>
> Regards,
> Hollis
>
> At 2021-12-24 09:18:19, bit...@bitfox.top wrote:
>> Hello list,
>>
>> I run with Spark 3.2.0
>>
>> After I started pyspark with:
>> $ pyspark --packages ch.cern.sparkmeasure:spark-measure_2.12:0.17
>>
>> I can't load from the module sparkmeasure:
>>
> from sparkmeasure import StageMetrics
>> Traceback (most recent call last):
>>   File "", line 1, in 
>> ModuleNotFoundError: No module named 'sparkmeasure'
>>
>> Do you know why? @Luca thanks.
>>
>>
>> On 2021-12-24 04:20, bit...@bitfox.top wrote:
>>> Thanks Gourav and Luca. I will try with the tools you provide in
> the
>>> Github.
>>>
>>> On 2021-12-23 23:40, Luca Canali wrote:
 Hi,

 I agree with Gourav that just measuring execution time is a
> simplistic
 approach that may lead you to miss important details, in
> particular
 when running distributed computations.

 WebUI, REST API, and metrics instrumentation in Spark can be quite
 useful for further drill down. See
 https://spark.apache.org/docs/latest/monitoring.html

 You can also have a look at this tool that takes care of
> automating
 collecting and aggregating some executor task metrics:
 https://github.com/LucaCanali/sparkMeasure

 Best,

 Luca

 From: Gourav Sengupta 
 Sent: Thursday, December 23, 2021 14:23
 To: bit...@bitfox.top
 Cc: user 
 Subject: Re: measure running time

 Hi,

 I do not think that such time comparisons make any sense at all in
 distributed computation. Just saying that an operation in RDD and
 Dataframe can be compared based on their start and stop time may
> not
 provide any valid information.

 You will have to look into the details of timing and the steps.
> For
 example, please look at the SPARK UI to see how timings are
> calculated
 in distributed computing mode, there are several well written
> papers
 on this.

 Thanks and Regards,

 Gourav Sengupta

 On Thu, Dec 23, 2021 at 10:57 AM  wrote:

> hello community,
>
> In pyspark how can I measure the running time to the command?
> I just want to compare the running time of the RDD API and
> dataframe
>
> API, in my this blog:
>

> https://bitfoxtop.wordpress.com/2021/12/23/count-email-addresses-using-sparks-rdd-and-dataframe/
>
> I tried spark.time() it doesn't work.
> Thank you.
>
>

> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>
>>>
> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: df.show() to text file

2021-12-24 Thread Sean Owen
You can redirect the stdout of your program, I guess, but show is for
display, not saving data. Use df.write methods for that (two options are sketched below).
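
For example, a minimal sketch of both options (the output paths are illustrative, and df is assumed to be the DataFrame from your pyspark session):

# 1) Save the data itself rather than the rendered table:
df.write.mode("overwrite").csv("/tmp/df_out", header=True)

# 2) If you really want the exact text that df.show() prints, capture stdout:
import contextlib
with open("/tmp/df_show.txt", "w") as f, contextlib.redirect_stdout(f):
    df.show()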

On Fri, Dec 24, 2021, 7:02 PM  wrote:

> Hello list,
>
> spark newbie here :0
> How can I write the df.show() result to a text file in the system?
> I run with pyspark, not the python client programming.
>
> Thanks.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: OOM Joining thousands of dataframes Was: AnalysisException: Trouble using select() to append multiple columns

2021-12-24 Thread Hollis
Hi,

It is the same thing whether you are using the SQL or the DataFrame API; both
are optimized by Catalyst and then pushed down to RDDs.
In this case there are many iterations (16,000), so you get a very big execution
plan when you join the dataframe again and again; I think that is the reason you
got the OOM and the analysis exception.
My suggestion is to checkpoint the dataframe after every ~200 joins, so you can
truncate the lineage and the optimizer only has to analyse about 200 dataframes
at a time. This will reduce the pressure on the Spark engine (a sketch of the
idea follows below).
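
A rough sketch of that checkpointing idea (dfList and the Name join key are borrowed from Andy's example later in this thread; the checkpoint directory and the interval of 200 are illustrative assumptions):

spark.sparkContext.setCheckpointDir("/tmp/spark-checkpoints")

retDF = dfList[0]
for i in range(1, len(dfList)):
    retDF = retDF.join(dfList[i], on=["Name"])
    if i % 200 == 0:
        # materialize the intermediate result and truncate the lineage,
        # so the query plan does not keep growing across 16,000 joins
        retDF = retDF.checkpoint()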




Hollis




 Replied mail 
From: Gourav Sengupta
Date: 12/25/2021 03:46
To: Sean Owen
Cc: Andrew Davidson, Nicholas Gustafson, User
Subject: Re: OOM Joining thousands of dataframes Was: AnalysisException: Trouble using select() to append multiple columns
Hi,


may be I am getting confused as always  :) , but the requirement looked pretty 
simple to me to be implemented in SQL, or it is just the euphoria of Christmas 
eve 


Anyways, in case the above can be implemented in SQL, then I can have a look at 
it.


Yes, indeed there are bespoke scenarios where dataframes may apply and RDD are 
used, but for UDF's I prefer SQL as well, but that may be a personal 
idiosyncrasy. The Oreilly book on data algorithms using SPARK, pyspark uses 
dataframes and RDD API's :)


Regards,
Gourav Sengupta


On Fri, Dec 24, 2021 at 6:11 PM Sean Owen  wrote:

This is simply not generally true, no, and not in this case. The programmatic 
and SQL APIs overlap a lot, and where they do, they're essentially aliases. Use 
whatever is more natural.
What I wouldn't recommend doing is emulating SQL-like behavior in custom code, 
UDFs, etc. The native operators will be faster.
Sometimes you have to go outside SQL where necessary, like in UDFs or complex 
aggregation logic. Then you can't use SQL.


On Fri, Dec 24, 2021 at 12:05 PM Gourav Sengupta  
wrote:

Hi,


yeah I think that in practice you will always find that dataframes can give 
issues regarding a lot of things, and then you can argue. In the SPARK 
conference, I think last year, it was shown that more than 92% or 95% use the 
SPARK SQL API, if I am not mistaken. 


I think that you can do the entire processing at one single go. 


Can you please write down the end to end SQL and share without the 16000 
iterations?




Regards,
Gourav Sengupta




On Fri, Dec 24, 2021 at 5:16 PM Andrew Davidson  wrote:


Hi Sean and Gourav

 

Thanks for the suggestions. I thought that both the sql and dataframe apis are 
wrappers around the same frame work? Ie. catalysts.  

 

I tend to mix and match my code. Sometimes I find it easier to write using sql 
some times dataframes. What is considered best practices?

 

Here is an example

 


df.show() to text file

2021-12-24 Thread bitfox

Hello list,

spark newbie here :0
How can I write the df.show() result to a text file in the system?
I run with pyspark, not the python client programming.

Thanks.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Dataframe's storage size

2021-12-24 Thread Gourav Sengupta
Hi,

even the cached data has different memory for the dataframes with exactly
the same data depending on a lot of conditions.

I generally tend to try to understand the problem before jumping into
conclusions through assumptions, sadly a habit I cannot overcome.

Is there a way to understand what is the person trying to achieve here by
knowing the size of dataframe?



Regards,
Gourav

On Fri, Dec 24, 2021 at 2:49 PM Sean Owen  wrote:

> I assume it means size in memory when cached, which does make sense.
> Fastest thing is to look at it in the UI Storage tab after it is cached.
>
> On Fri, Dec 24, 2021, 4:54 AM Gourav Sengupta 
> wrote:
>
>> Hi,
>>
>> This question, once again like the last one, does not make much sense at
>> all. Where are you trying to store the data frame, and how?
>>
>> Are you just trying to write a blog, as you were mentioning in an earlier
>> email, and trying to fill in some gaps? I think that the questions are
>> entirely wrong.
>>
>> Regards,
>> Gourav Sengupta
>>
>> On Fri, Dec 24, 2021 at 2:04 AM  wrote:
>>
>>> Hello
>>>
>>> Is it possible to know a dataframe's total storage size in bytes? such
>>> as:
>>>
>>> >>> df.size()
>>> Traceback (most recent call last):
>>>File "", line 1, in 
>>>File "/opt/spark/python/pyspark/sql/dataframe.py", line 1660, in
>>> __getattr__
>>>  "'%s' object has no attribute '%s'" % (self.__class__.__name__,
>>> name))
>>> AttributeError: 'DataFrame' object has no attribute 'size'
>>>
>>> Sure it won't work. but if there is such a method that would be great.
>>>
>>> Thanks.
>>>
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>
>>>


Re: OOM Joining thousands of dataframes Was: AnalysisException: Trouble using select() to append multiple columns

2021-12-24 Thread Gourav Sengupta
Hi,

Maybe I am getting confused as always :), but the requirement looked
pretty simple to me to be implemented in SQL, or it is just the euphoria of
Christmas Eve.

Anyways, in case the above can be implemented in SQL, then I can have a
look at it.

Yes, indeed there are bespoke scenarios where dataframes may apply and RDDs
are used, but for UDFs I prefer SQL as well, though that may be a
personal idiosyncrasy. The O'Reilly book on data algorithms using Spark and
PySpark uses the DataFrame and RDD APIs :)

Regards,
Gourav Sengupta

On Fri, Dec 24, 2021 at 6:11 PM Sean Owen  wrote:

> This is simply not generally true, no, and not in this case. The
> programmatic and SQL APIs overlap a lot, and where they do, they're
> essentially aliases. Use whatever is more natural.
> What I wouldn't recommend doing is emulating SQL-like behavior in custom
> code, UDFs, etc. The native operators will be faster.
> Sometimes you have to go outside SQL where necessary, like in UDFs or
> complex aggregation logic. Then you can't use SQL.
>
> On Fri, Dec 24, 2021 at 12:05 PM Gourav Sengupta <
> gourav.sengu...@gmail.com> wrote:
>
>> Hi,
>>
>> yeah I think that in practice you will always find that dataframes can
>> give issues regarding a lot of things, and then you can argue. In the SPARK
>> conference, I think last year, it was shown that more than 92% or 95% use
>> the SPARK SQL API, if I am not mistaken.
>>
>> I think that you can do the entire processing at one single go.
>>
>> Can you please write down the end to end SQL and share without the 16000
>> iterations?
>>
>>
>> Regards,
>> Gourav Sengupta
>>
>>
>> On Fri, Dec 24, 2021 at 5:16 PM Andrew Davidson 
>> wrote:
>>
>>> Hi Sean and Gourav
>>>
>>>
>>>
>>> Thanks for the suggestions. I thought that both the sql and dataframe
>>> apis are wrappers around the same frame work? Ie. catalysts.
>>>
>>>
>>>
>>> I tend to mix and match my code. Sometimes I find it easier to write
>>> using sql some times dataframes. What is considered best practices?
>>>
>>>
>>>
>>> Here is an example
>>>
>>>
>>>
>>>


Re: OOM Joining thousands of dataframes Was: AnalysisException: Trouble using select() to append multiple columns

2021-12-24 Thread Sean Owen
This is simply not generally true, no, and not in this case. The
programmatic and SQL APIs overlap a lot, and where they do, they're
essentially aliases. Use whatever is more natural.
What I wouldn't recommend doing is emulating SQL-like behavior in custom
code, UDFs, etc. The native operators will be faster.
Sometimes you have to go outside SQL where necessary, like in UDFs or
complex aggregation logic. Then you can't use SQL.
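
To make the native-operator point concrete, a small illustration (the DataFrame and column names are hypothetical, not from this thread):

from pyspark.sql import functions as F
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Native operator: runs inside the JVM and is optimized by Catalyst.
df_native = df.withColumn("name_upper", F.upper(F.col("Name")))

# Equivalent Python UDF: works, but ships every row out to a Python worker and back.
upper_udf = udf(lambda s: s.upper() if s is not None else None, StringType())
df_udf = df.withColumn("name_upper", upper_udf(F.col("Name")))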

On Fri, Dec 24, 2021 at 12:05 PM Gourav Sengupta 
wrote:

> Hi,
>
> yeah I think that in practice you will always find that dataframes can
> give issues regarding a lot of things, and then you can argue. In the SPARK
> conference, I think last year, it was shown that more than 92% or 95% use
> the SPARK SQL API, if I am not mistaken.
>
> I think that you can do the entire processing at one single go.
>
> Can you please write down the end to end SQL and share without the 16000
> iterations?
>
>
> Regards,
> Gourav Sengupta
>
>
> On Fri, Dec 24, 2021 at 5:16 PM Andrew Davidson  wrote:
>
>> Hi Sean and Gourav
>>
>>
>>
>> Thanks for the suggestions. I thought that both the sql and dataframe
>> apis are wrappers around the same frame work? Ie. catalysts.
>>
>>
>>
>> I tend to mix and match my code. Sometimes I find it easier to write
>> using sql some times dataframes. What is considered best practices?
>>
>>
>>
>> Here is an example
>>
>>
>>
>>


Re: OOM Joining thousands of dataframes Was: AnalysisException: Trouble using select() to append multiple columns

2021-12-24 Thread Gourav Sengupta
Hi,

yeah I think that in practice you will always find that dataframes can give
issues regarding a lot of things, and then you can argue. In the SPARK
conference, I think last year, it was shown that more than 92% or 95% use
the SPARK SQL API, if I am not mistaken.

I think that you can do the entire processing at one single go.

Can you please write down the end to end SQL and share without the 16000
iterations?


Regards,
Gourav Sengupta


On Fri, Dec 24, 2021 at 5:16 PM Andrew Davidson  wrote:

> Hi Sean and Gourav
>
>
>
> Thanks for the suggestions. I thought that both the sql and dataframe apis
> are wrappers around the same frame work? Ie. catalysts.
>
>
>
> I tend to mix and match my code. Sometimes I find it easier to write using
> sql some times dataframes. What is considered best practices?
>
>
>
> Here is an example
>
>
>
> Case 1
>
>for i in range( 1, len( self.sampleNamesList ) ): # iterate 16000
> times!
>
> sampleName = self.sampleNamesList[i]
>
> sampleDF= quantSparkDFList[i]
>
>sampleSDF.createOrReplaceTempView( "sample" )
>
>
>
> # the sample name must be quoted else column names with a '-'
>
> # like GTEX-1117F-0426-SM-5EGHI will generate an error
>
> # spark thinks the '-' is an expression. '_' is also
>
> # a special char for the sql like operator
>
> # https://stackoverflow.com/a/63899306/4586180
>
> sqlStmt = '\t\t\t\t\t\tselect rc.*, `{}` \n\
>
> from \n\
>
>rawCounts as rc, \n\
>
>sample  \n\
>
> where \n\
>
> rc.Name == sample.Name \n'.format(
> sampleName )
>
>
>
> rawCountsSDF = self.spark.sql( sqlStmt )
>
> rawCountsSDF.createOrReplaceTempView( "rawCounts"
>
>
>
> case 2
>
>for i in range( 1, len(dfList) ):
>
>  df2 = dfList[i]
>
>  retDF = retDF.join( df2.selectExpr("*"),
> on=["Name"] )
>
>
>
>
>
> I think my out of memory exception maybe because the query plan is huge. I
> have not figure out how to figure out if that is my bug or not. My untested
> work around is organize the data so that each massive join is run on 1/5 of
> the total data set, then union them all together. Each “part” will still
> need to iterate 16000 times
>
>
>
> In general I assume we want to avoid for loops. I assume Spark is unable
> to optimize them. It would be nice if spark provide some sort of join all
> function even if it used a for loop to hide this from me
>
>
>
> Happy holidays
>
>
>
> Andy
>
>
>
>
>
>
>
> *From: *Sean Owen 
> *Date: *Friday, December 24, 2021 at 8:30 AM
> *To: *Gourav Sengupta 
> *Cc: *Andrew Davidson , Nicholas Gustafson <
> njgustaf...@gmail.com>, User 
> *Subject: *Re: AnalysisException: Trouble using select() to append
> multiple columns
>
>
>
> (that's not the situation below we are commenting on)
>
> On Fri, Dec 24, 2021, 9:28 AM Gourav Sengupta 
> wrote:
>
> Hi,
>
>
>
> try to write several withColumns in a dataframe with functions and then
> see the UI for time differences. This should be done with large data sets
> of course, in order of a around 200GB +
>
>
>
> With scenarios involving nested queries and joins the time differences
> shown in the UI becomes a bit more visible.
>
>
>
> Regards,
>
> Gourav Sengupta
>
>
>
> On Fri, Dec 24, 2021 at 2:48 PM Sean Owen  wrote:
>
> Nah, it's going to translate to the same plan as the equivalent SQL.
>
>
>
> On Fri, Dec 24, 2021, 5:09 AM Gourav Sengupta 
> wrote:
>
> Hi,
>
>
>
> please note that using SQL is much more performant, and easier to manage
> these kind of issues. You might want to look at the SPARK UI to see the
> advantage of using SQL over dataframes API.
>
>
>
>
>
> Regards,
>
> Gourav Sengupta
>
>
>
> On Sat, Dec 18, 2021 at 5:40 PM Andrew Davidson 
> wrote:
>
> Thanks Nicholas
>
>
>
> Andy
>
>
>
> *From: *Nicholas Gustafson 
> *Date: *Friday, December 17, 2021 at 6:12 PM
> *To: *Andrew Davidson 
> *Cc: *"user@spark.apache.org" 
> *Subject: *Re: AnalysisException: Trouble using select() to append
> multiple columns
>
>
>
> Since df1 and df2 are different DataFrames, you will need to use a join.
> For example: df1.join(df2.selectExpr(“Name”, “NumReads as ctrl_2”),
> on=[“Name”])
>
>
>
> On Dec 17, 2021, at 16:25, Andrew Davidson 
> wrote:
>
> 
>
> Hi I am a newbie
>
>
>
> I have 16,000 data files, all files have the same number of rows and
> columns. The row ids are identical and are in the same order. I want to
> create a new data frame that contains the 3rd column from each data file
>
>
>
> I wrote a test program that uses a for loop and Join. It works with my
> small test set. I get an OOM when I try to run using the all the data
> files. I realize that join ( map reduce) is probably not a great solution
> for my problem
>
>
>
> Recently I found several a

OOM Joining thousands of dataframes Was: AnalysisException: Trouble using select() to append multiple columns

2021-12-24 Thread Andrew Davidson
Hi Sean and Gourav

Thanks for the suggestions. I thought that both the SQL and DataFrame APIs are
wrappers around the same framework, i.e. Catalyst?

I tend to mix and match my code. Sometimes I find it easier to write using SQL,
sometimes DataFrames. What is considered best practice?

Here is an example

Case 1
    for i in range( 1, len( self.sampleNamesList ) ): # iterate 16000 times!
        sampleName = self.sampleNamesList[i]
        sampleSDF = quantSparkDFList[i]
        sampleSDF.createOrReplaceTempView( "sample" )

        # the sample name must be quoted else column names with a '-'
        # like GTEX-1117F-0426-SM-5EGHI will generate an error
        # spark thinks the '-' is an expression. '_' is also
        # a special char for the sql like operator
        # https://stackoverflow.com/a/63899306/4586180
        sqlStmt = '\t\t\t\t\t\tselect rc.*, `{}` \n\
                       from \n\
                           rawCounts as rc, \n\
                           sample \n\
                       where \n\
                           rc.Name == sample.Name \n'.format( sampleName )

        rawCountsSDF = self.spark.sql( sqlStmt )
        rawCountsSDF.createOrReplaceTempView( "rawCounts" )

case 2
    for i in range( 1, len(dfList) ):
        df2 = dfList[i]
        retDF = retDF.join( df2.selectExpr("*"), on=["Name"] )


I think my out-of-memory exception may be because the query plan is huge. I have
not yet figured out how to confirm whether that is my bug or not. My untested
workaround is to organize the data so that each massive join is run on 1/5 of the
total data set, then union them all together. Each “part” will still need to
iterate 16000 times.

In general I assume we want to avoid for loops. I assume Spark is unable to
optimize them. It would be nice if Spark provided some sort of join-all function,
even if it used a for loop to hide this from me (see the sketch below).
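
A minimal sketch of such a helper (just functools.reduce over the dfList from the example above; it hides the loop but still builds one large plan, so on its own it does not fix the OOM):

from functools import reduce

joined = reduce(lambda left, right: left.join(right.selectExpr("*"), on=["Name"]),
                dfList)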

Happy holidays

Andy



From: Sean Owen 
Date: Friday, December 24, 2021 at 8:30 AM
To: Gourav Sengupta 
Cc: Andrew Davidson , Nicholas Gustafson 
, User 
Subject: Re: AnalysisException: Trouble using select() to append multiple 
columns

(that's not the situation below we are commenting on)
On Fri, Dec 24, 2021, 9:28 AM Gourav Sengupta  wrote:
Hi,

try to write several withColumns in a dataframe with functions and then see the 
UI for time differences. This should be done with large data sets of course, in 
order of a around 200GB +

With scenarios involving nested queries and joins the time differences shown in 
the UI becomes a bit more visible.

Regards,
Gourav Sengupta

On Fri, Dec 24, 2021 at 2:48 PM Sean Owen  wrote:
Nah, it's going to translate to the same plan as the equivalent SQL.

On Fri, Dec 24, 2021, 5:09 AM Gourav Sengupta  wrote:
Hi,

please note that using SQL is much more performant, and easier to manage these 
kind of issues. You might want to look at the SPARK UI to see the advantage of 
using SQL over dataframes API.


Regards,
Gourav Sengupta

On Sat, Dec 18, 2021 at 5:40 PM Andrew Davidson  
wrote:
Thanks Nicholas

Andy

From: Nicholas Gustafson 
Date: Friday, December 17, 2021 at 6:12 PM
To: Andrew Davidson 
Cc: "user@spark.apache.org" 
mailto:user@spark.apache.org>>
Subject: Re: AnalysisException: Trouble using select() to append multiple 
columns

Since df1 and df2 are different DataFrames, you will need to use a join. For 
example: df1.join(df2.selectExpr(“Name”, “NumReads as ctrl_2”), on=[“Name”])

On Dec 17, 2021, at 16:25, Andrew Davidson  wrote:

Hi I am a newbie

I have 16,000 data files, all files have the same number of rows and columns. 
The row ids are identical and are in the same order. I want to create a new 
data frame that contains the 3rd column from each data file

I wrote a test program that uses a for loop and join. It works with my small
test set. I get an OOM when I try to run using all the data files. I
realize that join (map reduce) is probably not a great solution for my problem.

Recently I found several articles that talk about the challenge of using
withColumn() and how to use select() to append columns:

https://mungingdata.com/pyspark/select-add-columns-withcolumn/
https://stackoverflow.com/questions/64627112/adding-multiple-columns-in-pyspark-dataframe-using-a-loop


I am using pyspark spark-3.1.2-bin-hadoop3.2

I wrote a little test program. I am able to append columns created using
pyspark.sql.functions.lit(). I am not able to append columns from other data
frames.


df1

DataFrame[Name: string, ctrl_1: double]

+---+--+

|   Name|ctrl_1|

+---+--+

| txId_1|   0.0|

| txId_2|  11.0|

| txId_3|  12.0|

| txId_4|  13.0|

| txId_5|  14.0|

| txId_6|  15.0|

| txId_7|  16.0|

| txId_8|  17.0

Re: AnalysisException: Trouble using select() to append multiple columns

2021-12-24 Thread Sean Owen
(that's not the situation below we are commenting on)

On Fri, Dec 24, 2021, 9:28 AM Gourav Sengupta 
wrote:

> Hi,
>
> try to write several withColumns in a dataframe with functions and then
> see the UI for time differences. This should be done with large data sets
> of course, in order of a around 200GB +
>
> With scenarios involving nested queries and joins the time differences
> shown in the UI becomes a bit more visible.
>
> Regards,
> Gourav Sengupta
>
> On Fri, Dec 24, 2021 at 2:48 PM Sean Owen  wrote:
>
>> Nah, it's going to translate to the same plan as the equivalent SQL.
>>
>> On Fri, Dec 24, 2021, 5:09 AM Gourav Sengupta 
>> wrote:
>>
>>> Hi,
>>>
>>> please note that using SQL is much more performant, and easier to manage
>>> these kind of issues. You might want to look at the SPARK UI to see the
>>> advantage of using SQL over dataframes API.
>>>
>>>
>>> Regards,
>>> Gourav Sengupta
>>>
>>> On Sat, Dec 18, 2021 at 5:40 PM Andrew Davidson
>>>  wrote:
>>>
 Thanks Nicholas



 Andy



 *From: *Nicholas Gustafson 
 *Date: *Friday, December 17, 2021 at 6:12 PM
 *To: *Andrew Davidson 
 *Cc: *"user@spark.apache.org" 
 *Subject: *Re: AnalysisException: Trouble using select() to append
 multiple columns



 Since df1 and df2 are different DataFrames, you will need to use a
 join. For example: df1.join(df2.selectExpr(“Name”, “NumReads as ctrl_2”),
 on=[“Name”])



 On Dec 17, 2021, at 16:25, Andrew Davidson 
 wrote:

 

 Hi I am a newbie



 I have 16,000 data files, all files have the same number of rows and
 columns. The row ids are identical and are in the same order. I want to
 create a new data frame that contains the 3rd column from each data
 file



 I wrote a test program that uses a for loop and Join. It works with my
 small test set. I get an OOM when I try to run using the all the data
 files. I realize that join ( map reduce) is probably not a great solution
 for my problem



 Recently I found several articles that take about the challenge with
 using withColumn() and talk about how to use select() to append columns



 https://mungingdata.com/pyspark/select-add-columns-withcolumn/


 https://stackoverflow.com/questions/64627112/adding-multiple-columns-in-pyspark-dataframe-using-a-loop



 I am using pyspark spark-3.1.2-bin-hadoop3.2



 I wrote a little test program. It am able to append columns created
 using pyspark.sql.function.lit(). I am not able to append columns from
 other data frames



 df1

 DataFrame[Name: string, ctrl_1: double]

 +---+--+

 |   Name|ctrl_1|

 +---+--+

 | txId_1|   0.0|

 | txId_2|  11.0|

 | txId_3|  12.0|

 | txId_4|  13.0|

 | txId_5|  14.0|

 | txId_6|  15.0|

 | txId_7|  16.0|

 | txId_8|  17.0|

 | txId_9|  18.0|

 |txId_10|  19.0|

 +---+--+



 # use select to append multiple literals

 allDF3 = df1.select( ["*", pyf.lit("abc").alias("x"),
 pyf.lit("mn0").alias("y")] )



 allDF3

 DataFrame[Name: string, ctrl_1: double, x: string, y: string]

 +---+--+---+---+

 |   Name|ctrl_1|  x|  y|

 +---+--+---+---+

 | txId_1|   0.0|abc|mn0|

 | txId_2|  11.0|abc|mn0|

 | txId_3|  12.0|abc|mn0|

 | txId_4|  13.0|abc|mn0|

 | txId_5|  14.0|abc|mn0|

 | txId_6|  15.0|abc|mn0|

 | txId_7|  16.0|abc|mn0|

 | txId_8|  17.0|abc|mn0|

 | txId_9|  18.0|abc|mn0|

 |txId_10|  19.0|abc|mn0|

 +---+--+---+---+



 df2

 DataFrame[Name: string, Length: int, EffectiveLength: double, TPM: double, 
 NumReads: double]

 +---+--+---+++

 |   Name|Length|EffectiveLength| TPM|NumReads|

 +---+--+---+++

 | txId_1|  1500| 1234.5|12.1| 0.1|

 | txId_2|  1510| 1244.5|13.1|11.1|

 | txId_3|  1520| 1254.5|14.1|12.1|

 | txId_4|  1530| 1264.5|15.1|13.1|

 | txId_5|  1540| 1274.5|16.1|14.1|

 | txId_6|  1550| 1284.5|17.1|15.1|

 | txId_7|  1560| 1294.5|18.1|16.1|

 | txId_8|  1570| 1304.5|19.1|17.1|

 | txId_9|  1580| 1314.5|20.1|18.1|

 |txId_10|  1590| 1324.5|21.1|19.1|

 +---+--+---+++



 s2Col = df2["NumReads"].alias( 'ctrl_2' )

 print("type(s2Col) = {}".format(type(s2Col)) )



 type(s2Col) =

Re: measure running time

2021-12-24 Thread Mich Talebzadeh
Hi Sean,


I have already discussed an issue in my case with Spark 3.1.1
and sparkmeasure  with the author Luca Canali on this matter. It has been
reproduced. I think we ought to wait for a patch.


HTH,


Mich



   view my Linkedin profile




*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Fri, 24 Dec 2021 at 14:51, Sean Owen  wrote:

> You probably did not install it on your cluster, nor included the python
> package with your app
>
> On Fri, Dec 24, 2021, 4:35 AM  wrote:
>
>> but I already installed it:
>>
>> Requirement already satisfied: sparkmeasure in
>> /usr/local/lib/python2.7/dist-packages
>>
>> so how? thank you.
>>
>> On 2021-12-24 18:15, Hollis wrote:
>> > Hi bitfox,
>> >
>> > you need pip install sparkmeasure firstly. then can lanch in pysaprk.
>> >
>>  from sparkmeasure import StageMetrics
>>  stagemetrics = StageMetrics(spark)
>>  stagemetrics.runandmeasure(locals(), 'spark.sql("select count(*)
>> > from range(1000) cross join range(1000) cross join
>> > range(100)").show()')
>> > +-+
>> >
>> > | count(1)|
>> > +-+
>> > |1|
>> > +-+
>> >
>> > Regards,
>> > Hollis
>> >
>> > At 2021-12-24 09:18:19, bit...@bitfox.top wrote:
>> >> Hello list,
>> >>
>> >> I run with Spark 3.2.0
>> >>
>> >> After I started pyspark with:
>> >> $ pyspark --packages ch.cern.sparkmeasure:spark-measure_2.12:0.17
>> >>
>> >> I can't load from the module sparkmeasure:
>> >>
>> > from sparkmeasure import StageMetrics
>> >> Traceback (most recent call last):
>> >>   File "", line 1, in 
>> >> ModuleNotFoundError: No module named 'sparkmeasure'
>> >>
>> >> Do you know why? @Luca thanks.
>> >>
>> >>
>> >> On 2021-12-24 04:20, bit...@bitfox.top wrote:
>> >>> Thanks Gourav and Luca. I will try with the tools you provide in
>> > the
>> >>> Github.
>> >>>
>> >>> On 2021-12-23 23:40, Luca Canali wrote:
>>  Hi,
>> 
>>  I agree with Gourav that just measuring execution time is a
>> > simplistic
>>  approach that may lead you to miss important details, in
>> > particular
>>  when running distributed computations.
>> 
>>  WebUI, REST API, and metrics instrumentation in Spark can be quite
>>  useful for further drill down. See
>>  https://spark.apache.org/docs/latest/monitoring.html
>> 
>>  You can also have a look at this tool that takes care of
>> > automating
>>  collecting and aggregating some executor task metrics:
>>  https://github.com/LucaCanali/sparkMeasure
>> 
>>  Best,
>> 
>>  Luca
>> 
>>  From: Gourav Sengupta 
>>  Sent: Thursday, December 23, 2021 14:23
>>  To: bit...@bitfox.top
>>  Cc: user 
>>  Subject: Re: measure running time
>> 
>>  Hi,
>> 
>>  I do not think that such time comparisons make any sense at all in
>>  distributed computation. Just saying that an operation in RDD and
>>  Dataframe can be compared based on their start and stop time may
>> > not
>>  provide any valid information.
>> 
>>  You will have to look into the details of timing and the steps.
>> > For
>>  example, please look at the SPARK UI to see how timings are
>> > calculated
>>  in distributed computing mode, there are several well written
>> > papers
>>  on this.
>> 
>>  Thanks and Regards,
>> 
>>  Gourav Sengupta
>> 
>>  On Thu, Dec 23, 2021 at 10:57 AM  wrote:
>> 
>> > hello community,
>> >
>> > In pyspark how can I measure the running time to the command?
>> > I just want to compare the running time of the RDD API and
>> > dataframe
>> >
>> > API, in my this blog:
>> >
>> 
>> >
>> https://bitfoxtop.wordpress.com/2021/12/23/count-email-addresses-using-sparks-rdd-and-dataframe/
>> >
>> > I tried spark.time() it doesn't work.
>> > Thank you.
>> >
>> >
>> 
>> > -
>> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>> >>>
>> >>>
>> > -
>> >>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>> >>
>> >> -
>> >> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>


Re: AnalysisException: Trouble using select() to append multiple columns

2021-12-24 Thread Gourav Sengupta
Hi,

try to write several withColumns in a dataframe with functions and then see
the UI for time differences. This should be done with large data sets of
course, in order of a around 200GB +

With scenarios involving nested queries and joins the time differences
shown in the UI becomes a bit more visible.

Regards,
Gourav Sengupta

On Fri, Dec 24, 2021 at 2:48 PM Sean Owen  wrote:

> Nah, it's going to translate to the same plan as the equivalent SQL.
>
> On Fri, Dec 24, 2021, 5:09 AM Gourav Sengupta 
> wrote:
>
>> Hi,
>>
>> please note that using SQL is much more performant, and easier to manage
>> these kind of issues. You might want to look at the SPARK UI to see the
>> advantage of using SQL over dataframes API.
>>
>>
>> Regards,
>> Gourav Sengupta
>>
>> On Sat, Dec 18, 2021 at 5:40 PM Andrew Davidson 
>> wrote:
>>
>>> Thanks Nicholas
>>>
>>>
>>>
>>> Andy
>>>
>>>
>>>
>>> *From: *Nicholas Gustafson 
>>> *Date: *Friday, December 17, 2021 at 6:12 PM
>>> *To: *Andrew Davidson 
>>> *Cc: *"user@spark.apache.org" 
>>> *Subject: *Re: AnalysisException: Trouble using select() to append
>>> multiple columns
>>>
>>>
>>>
>>> Since df1 and df2 are different DataFrames, you will need to use a join.
>>> For example: df1.join(df2.selectExpr(“Name”, “NumReads as ctrl_2”),
>>> on=[“Name”])
>>>
>>>
>>>
>>> On Dec 17, 2021, at 16:25, Andrew Davidson 
>>> wrote:
>>>
>>> 
>>>
>>> Hi I am a newbie
>>>
>>>
>>>
>>> I have 16,000 data files, all files have the same number of rows and
>>> columns. The row ids are identical and are in the same order. I want to
>>> create a new data frame that contains the 3rd column from each data file
>>>
>>>
>>>
>>> I wrote a test program that uses a for loop and Join. It works with my
>>> small test set. I get an OOM when I try to run using the all the data
>>> files. I realize that join ( map reduce) is probably not a great solution
>>> for my problem
>>>
>>>
>>>
>>> Recently I found several articles that take about the challenge with
>>> using withColumn() and talk about how to use select() to append columns
>>>
>>>
>>>
>>> https://mungingdata.com/pyspark/select-add-columns-withcolumn/
>>>
>>>
>>> https://stackoverflow.com/questions/64627112/adding-multiple-columns-in-pyspark-dataframe-using-a-loop
>>>
>>>
>>>
>>> I am using pyspark spark-3.1.2-bin-hadoop3.2
>>>
>>>
>>>
>>> I wrote a little test program. It am able to append columns created
>>> using pyspark.sql.function.lit(). I am not able to append columns from
>>> other data frames
>>>
>>>
>>>
>>> df1
>>>
>>> DataFrame[Name: string, ctrl_1: double]
>>>
>>> +---+--+
>>>
>>> |   Name|ctrl_1|
>>>
>>> +---+--+
>>>
>>> | txId_1|   0.0|
>>>
>>> | txId_2|  11.0|
>>>
>>> | txId_3|  12.0|
>>>
>>> | txId_4|  13.0|
>>>
>>> | txId_5|  14.0|
>>>
>>> | txId_6|  15.0|
>>>
>>> | txId_7|  16.0|
>>>
>>> | txId_8|  17.0|
>>>
>>> | txId_9|  18.0|
>>>
>>> |txId_10|  19.0|
>>>
>>> +---+--+
>>>
>>>
>>>
>>> # use select to append multiple literals
>>>
>>> allDF3 = df1.select( ["*", pyf.lit("abc").alias("x"),
>>> pyf.lit("mn0").alias("y")] )
>>>
>>>
>>>
>>> allDF3
>>>
>>> DataFrame[Name: string, ctrl_1: double, x: string, y: string]
>>>
>>> +---+--+---+---+
>>>
>>> |   Name|ctrl_1|  x|  y|
>>>
>>> +---+--+---+---+
>>>
>>> | txId_1|   0.0|abc|mn0|
>>>
>>> | txId_2|  11.0|abc|mn0|
>>>
>>> | txId_3|  12.0|abc|mn0|
>>>
>>> | txId_4|  13.0|abc|mn0|
>>>
>>> | txId_5|  14.0|abc|mn0|
>>>
>>> | txId_6|  15.0|abc|mn0|
>>>
>>> | txId_7|  16.0|abc|mn0|
>>>
>>> | txId_8|  17.0|abc|mn0|
>>>
>>> | txId_9|  18.0|abc|mn0|
>>>
>>> |txId_10|  19.0|abc|mn0|
>>>
>>> +---+--+---+---+
>>>
>>>
>>>
>>> df2
>>>
>>> DataFrame[Name: string, Length: int, EffectiveLength: double, TPM: double, 
>>> NumReads: double]
>>>
>>> +---+--+---+++
>>>
>>> |   Name|Length|EffectiveLength| TPM|NumReads|
>>>
>>> +---+--+---+++
>>>
>>> | txId_1|  1500| 1234.5|12.1| 0.1|
>>>
>>> | txId_2|  1510| 1244.5|13.1|11.1|
>>>
>>> | txId_3|  1520| 1254.5|14.1|12.1|
>>>
>>> | txId_4|  1530| 1264.5|15.1|13.1|
>>>
>>> | txId_5|  1540| 1274.5|16.1|14.1|
>>>
>>> | txId_6|  1550| 1284.5|17.1|15.1|
>>>
>>> | txId_7|  1560| 1294.5|18.1|16.1|
>>>
>>> | txId_8|  1570| 1304.5|19.1|17.1|
>>>
>>> | txId_9|  1580| 1314.5|20.1|18.1|
>>>
>>> |txId_10|  1590| 1324.5|21.1|19.1|
>>>
>>> +---+--+---+++
>>>
>>>
>>>
>>> s2Col = df2["NumReads"].alias( 'ctrl_2' )
>>>
>>> print("type(s2Col) = {}".format(type(s2Col)) )
>>>
>>>
>>>
>>> type(s2Col) = 
>>>
>>>
>>>
>>> allDF4 = df1.select( ["*", s2Col] )
>>>
>>> ~/extraCellularRNA/sparkBin/spark-3.1.2-bin-hadoop3.2/python/pyspark/sql/dataframe.py
>>>  in select(self, *cols)
>>>
>>> *   1667* [Row(name='Alice', age=12), Row(name='Bob', age=15)]
>>>
>>> *   1668* """
>>>
>>> -> 1669 jdf = self._jdf.select(self._jc

Re: measure running time

2021-12-24 Thread Sean Owen
You probably did not install it on your cluster, nor included the python
package with your app

On Fri, Dec 24, 2021, 4:35 AM  wrote:

> but I already installed it:
>
> Requirement already satisfied: sparkmeasure in
> /usr/local/lib/python2.7/dist-packages
>
> so how? thank you.
>
> On 2021-12-24 18:15, Hollis wrote:
> > Hi bitfox,
> >
> > you need pip install sparkmeasure firstly. then can lanch in pysaprk.
> >
>  from sparkmeasure import StageMetrics
>  stagemetrics = StageMetrics(spark)
>  stagemetrics.runandmeasure(locals(), 'spark.sql("select count(*)
> > from range(1000) cross join range(1000) cross join
> > range(100)").show()')
> > +-+
> >
> > | count(1)|
> > +-+
> > |1|
> > +-+
> >
> > Regards,
> > Hollis
> >
> > At 2021-12-24 09:18:19, bit...@bitfox.top wrote:
> >> Hello list,
> >>
> >> I run with Spark 3.2.0
> >>
> >> After I started pyspark with:
> >> $ pyspark --packages ch.cern.sparkmeasure:spark-measure_2.12:0.17
> >>
> >> I can't load from the module sparkmeasure:
> >>
> > from sparkmeasure import StageMetrics
> >> Traceback (most recent call last):
> >>   File "", line 1, in 
> >> ModuleNotFoundError: No module named 'sparkmeasure'
> >>
> >> Do you know why? @Luca thanks.
> >>
> >>
> >> On 2021-12-24 04:20, bit...@bitfox.top wrote:
> >>> Thanks Gourav and Luca. I will try with the tools you provide in
> > the
> >>> Github.
> >>>
> >>> On 2021-12-23 23:40, Luca Canali wrote:
>  Hi,
> 
>  I agree with Gourav that just measuring execution time is a
> > simplistic
>  approach that may lead you to miss important details, in
> > particular
>  when running distributed computations.
> 
>  WebUI, REST API, and metrics instrumentation in Spark can be quite
>  useful for further drill down. See
>  https://spark.apache.org/docs/latest/monitoring.html
> 
>  You can also have a look at this tool that takes care of
> > automating
>  collecting and aggregating some executor task metrics:
>  https://github.com/LucaCanali/sparkMeasure
> 
>  Best,
> 
>  Luca
> 
>  From: Gourav Sengupta 
>  Sent: Thursday, December 23, 2021 14:23
>  To: bit...@bitfox.top
>  Cc: user 
>  Subject: Re: measure running time
> 
>  Hi,
> 
>  I do not think that such time comparisons make any sense at all in
>  distributed computation. Just saying that an operation in RDD and
>  Dataframe can be compared based on their start and stop time may
> > not
>  provide any valid information.
> 
>  You will have to look into the details of timing and the steps.
> > For
>  example, please look at the SPARK UI to see how timings are
> > calculated
>  in distributed computing mode, there are several well written
> > papers
>  on this.
> 
>  Thanks and Regards,
> 
>  Gourav Sengupta
> 
>  On Thu, Dec 23, 2021 at 10:57 AM  wrote:
> 
> > hello community,
> >
> > In pyspark how can I measure the running time to the command?
> > I just want to compare the running time of the RDD API and
> > dataframe
> >
> > API, in my this blog:
> >
> 
> >
> https://bitfoxtop.wordpress.com/2021/12/23/count-email-addresses-using-sparks-rdd-and-dataframe/
> >
> > I tried spark.time() it doesn't work.
> > Thank you.
> >
> >
> 
> > -
> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >>>
> >>>
> > -
> >>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >>
> >> -
> >> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Dataframe's storage size

2021-12-24 Thread Sean Owen
I assume it means size in memory when cached, which does make sense.
Fastest thing is to look at it in the UI Storage tab after it is cached.
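
For completeness, a hedged sketch of reading a size figure programmatically from the pyspark shell (the last line goes through Spark's internal Java objects, so treat it as an unstable, undocumented approach that may change between versions):

df.cache()
df.count()   # force materialization; the cached size then appears in the UI Storage tab

# Catalyst's size estimate for the plan, in bytes (internal API, illustrative only):
size_bytes = df._jdf.queryExecution().optimizedPlan().stats().sizeInBytes()
print(size_bytes)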

On Fri, Dec 24, 2021, 4:54 AM Gourav Sengupta 
wrote:

> Hi,
>
> This question, once again like the last one, does not make much sense at
> all. Where are you trying to store the data frame, and how?
>
> Are you just trying to write a blog, as you were mentioning in an earlier
> email, and trying to fill in some gaps? I think that the questions are
> entirely wrong.
>
> Regards,
> Gourav Sengupta
>
> On Fri, Dec 24, 2021 at 2:04 AM  wrote:
>
>> Hello
>>
>> Is it possible to know a dataframe's total storage size in bytes? such
>> as:
>>
>> >>> df.size()
>> Traceback (most recent call last):
>>File "", line 1, in 
>>File "/opt/spark/python/pyspark/sql/dataframe.py", line 1660, in
>> __getattr__
>>  "'%s' object has no attribute '%s'" % (self.__class__.__name__,
>> name))
>> AttributeError: 'DataFrame' object has no attribute 'size'
>>
>> Sure it won't work. but if there is such a method that would be great.
>>
>> Thanks.
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>


Re: AnalysisException: Trouble using select() to append multiple columns

2021-12-24 Thread Sean Owen
Nah, it's going to translate to the same plan as the equivalent SQL.

On Fri, Dec 24, 2021, 5:09 AM Gourav Sengupta 
wrote:

> Hi,
>
> please note that using SQL is much more performant, and easier to manage
> these kind of issues. You might want to look at the SPARK UI to see the
> advantage of using SQL over dataframes API.
>
>
> Regards,
> Gourav Sengupta
>
> On Sat, Dec 18, 2021 at 5:40 PM Andrew Davidson 
> wrote:
>
>> Thanks Nicholas
>>
>>
>>
>> Andy
>>
>>
>>
>> *From: *Nicholas Gustafson 
>> *Date: *Friday, December 17, 2021 at 6:12 PM
>> *To: *Andrew Davidson 
>> *Cc: *"user@spark.apache.org" 
>> *Subject: *Re: AnalysisException: Trouble using select() to append
>> multiple columns
>>
>>
>>
>> Since df1 and df2 are different DataFrames, you will need to use a join.
>> For example: df1.join(df2.selectExpr(“Name”, “NumReads as ctrl_2”),
>> on=[“Name”])
>>
>>
>>
>> On Dec 17, 2021, at 16:25, Andrew Davidson 
>> wrote:
>>
>> 
>>
>> Hi I am a newbie
>>
>>
>>
>> I have 16,000 data files, all files have the same number of rows and
>> columns. The row ids are identical and are in the same order. I want to
>> create a new data frame that contains the 3rd column from each data file
>>
>>
>>
>> I wrote a test program that uses a for loop and Join. It works with my
>> small test set. I get an OOM when I try to run using the all the data
>> files. I realize that join ( map reduce) is probably not a great solution
>> for my problem
>>
>>
>>
>> Recently I found several articles that take about the challenge with
>> using withColumn() and talk about how to use select() to append columns
>>
>>
>>
>> https://mungingdata.com/pyspark/select-add-columns-withcolumn/
>>
>>
>> https://stackoverflow.com/questions/64627112/adding-multiple-columns-in-pyspark-dataframe-using-a-loop
>>
>>
>>
>> I am using pyspark spark-3.1.2-bin-hadoop3.2
>>
>>
>>
>> I wrote a little test program. It am able to append columns created using
>> pyspark.sql.function.lit(). I am not able to append columns from other data
>> frames
>>
>>
>>
>> df1
>>
>> DataFrame[Name: string, ctrl_1: double]
>>
>> +---+--+
>>
>> |   Name|ctrl_1|
>>
>> +---+--+
>>
>> | txId_1|   0.0|
>>
>> | txId_2|  11.0|
>>
>> | txId_3|  12.0|
>>
>> | txId_4|  13.0|
>>
>> | txId_5|  14.0|
>>
>> | txId_6|  15.0|
>>
>> | txId_7|  16.0|
>>
>> | txId_8|  17.0|
>>
>> | txId_9|  18.0|
>>
>> |txId_10|  19.0|
>>
>> +---+--+
>>
>>
>>
>> # use select to append multiple literals
>>
>> allDF3 = df1.select( ["*", pyf.lit("abc").alias("x"),
>> pyf.lit("mn0").alias("y")] )
>>
>>
>>
>> allDF3
>>
>> DataFrame[Name: string, ctrl_1: double, x: string, y: string]
>>
>> +---+--+---+---+
>>
>> |   Name|ctrl_1|  x|  y|
>>
>> +---+--+---+---+
>>
>> | txId_1|   0.0|abc|mn0|
>>
>> | txId_2|  11.0|abc|mn0|
>>
>> | txId_3|  12.0|abc|mn0|
>>
>> | txId_4|  13.0|abc|mn0|
>>
>> | txId_5|  14.0|abc|mn0|
>>
>> | txId_6|  15.0|abc|mn0|
>>
>> | txId_7|  16.0|abc|mn0|
>>
>> | txId_8|  17.0|abc|mn0|
>>
>> | txId_9|  18.0|abc|mn0|
>>
>> |txId_10|  19.0|abc|mn0|
>>
>> +---+--+---+---+
>>
>>
>>
>> df2
>>
>> DataFrame[Name: string, Length: int, EffectiveLength: double, TPM: double, 
>> NumReads: double]
>>
>> +---+--+---+++
>>
>> |   Name|Length|EffectiveLength| TPM|NumReads|
>>
>> +---+--+---+++
>>
>> | txId_1|  1500| 1234.5|12.1| 0.1|
>>
>> | txId_2|  1510| 1244.5|13.1|11.1|
>>
>> | txId_3|  1520| 1254.5|14.1|12.1|
>>
>> | txId_4|  1530| 1264.5|15.1|13.1|
>>
>> | txId_5|  1540| 1274.5|16.1|14.1|
>>
>> | txId_6|  1550| 1284.5|17.1|15.1|
>>
>> | txId_7|  1560| 1294.5|18.1|16.1|
>>
>> | txId_8|  1570| 1304.5|19.1|17.1|
>>
>> | txId_9|  1580| 1314.5|20.1|18.1|
>>
>> |txId_10|  1590| 1324.5|21.1|19.1|
>>
>> +---+--+---+++
>>
>>
>>
>> s2Col = df2["NumReads"].alias( 'ctrl_2' )
>>
>> print("type(s2Col) = {}".format(type(s2Col)) )
>>
>>
>>
>> type(s2Col) = 
>>
>>
>>
>> allDF4 = df1.select( ["*", s2Col] )
>>
>> ~/extraCellularRNA/sparkBin/spark-3.1.2-bin-hadoop3.2/python/pyspark/sql/dataframe.py
>>  in select(self, *cols)
>>
>> *   1667* [Row(name='Alice', age=12), Row(name='Bob', age=15)]
>>
>> *   1668* """
>>
>> -> 1669 jdf = self._jdf.select(self._jcols(*cols))
>>
>> *   1670* return DataFrame(jdf, self.sql_ctx)
>>
>> *   1671*
>>
>>
>>
>> ../../sparkBin/spark-3.1.2-bin-hadoop3.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py
>>  in __call__(self, *args)
>>
>> *   1303* answer = self.gateway_client.send_command(command)
>>
>> *   1304* return_value = get_return_value(
>>
>> -> 1305 answer, self.gateway_client, self.target_id, self.name)
>>
>> *   1306*
>>
>> *   1307* for temp_arg in temp_args:
>>
>>
>>
>> ~/extraCellularRNA/sparkBin/spark-3.1.2-bin-hadoop3.2/python/pyspark/sql/utils.py
>>  in

Re: Conda Python Env in K8S

2021-12-24 Thread Hyukjin Kwon
Can you share the logs, settings, environment, etc. and file a JIRA? There
are integration test cases for K8S support, and I myself also tested it
before.
It would be helpful if you try what I did at
https://databricks.com/blog/2020/12/22/how-to-manage-python-dependencies-in-pyspark.html
and see if it works.

On Mon, 6 Dec 2021 at 17:22, Bode, Meikel, NMA-CFD <
meikel.b...@bertelsmann.de> wrote:

> Hi Mich,
>
>
>
> Thanks for your response. Yes –py-files options works. I also tested it.
>
> The question is why the –archives option doesn’t?
>
>
>
> From Jira I can see that it should be available since 3.1.0:
>
>
>
> https://issues.apache.org/jira/browse/SPARK-33530
>
> https://issues.apache.org/jira/browse/SPARK-33615
>
>
>
> Best,
>
> Meikel
>
>
>
>
>
> *From:* Mich Talebzadeh 
> *Sent:* Samstag, 4. Dezember 2021 18:36
> *To:* Bode, Meikel, NMA-CFD 
> *Cc:* dev ; user@spark.apache.org
> *Subject:* Re: Conda Python Env in K8S
>
>
>
>
> Hi Meikel
>
>
>
> In the past I tried with
>
>
>
>--py-files
> hdfs://$HDFS_HOST:$HDFS_PORT/minikube/codes/DSBQ.zip \
>
>--archives
> hdfs://$HDFS_HOST:$HDFS_PORT/minikube/codes/pyspark_venv.zip#pyspark_venv \
>
>
>
> which is basically what you are doing. the first line --py-files works but
> the second one fails
>
>
>
> It tries to unpack them:
>
>
>
> Unpacking an archive hdfs://
> 50.140.197.220:9000/minikube/codes/pyspark_venv.zip#pyspark_venv
> 
>  from
> /tmp/spark-502a5b57-0fe6-45bd-867d-9738e678e9a3/pyspark_venv.zip to
> /opt/spark/work-dir/./pyspark_venv
>
>
>
> But it failed.
>
>
>
> This could be due to creating the virtual environment inside the docker in
> the work-dir, or sometimes when there is not enough available memory to
> gunzip and untar the file, especially if your executors are built on
> cluster nodes with less memory than the driver node.
>
>
>
> However, The most convenient way to add additional packages to the docker
> image is to add them directly to the docker image at time of creating the
> image. So external packages are bundled as a part of my docker image
> because it is fixed and if an application requires those set of
> dependencies every time, they are there. Also note that every time you put
> RUN statement it creates an intermediate container and hence it increases
> build time. So reduce it as follows
>
> RUN pip install pyyaml numpy cx_Oracle --no-cache-dir
>
> The --no-cache-dir option to pip is to prevent the downloaded binaries from
> being added to the image, reducing the image size. It is also advisable to
> install all packages in one line, since every RUN statement creates an
> intermediate container and hence increases the build time.
>
> Log in to the docker image and check for Python packages installed
>
> docker run -u 0 -it 
> spark/spark-py:3.1.1-scala_2.12-8-jre-slim-buster_java8PlusPackages bash
>
> root@5bc049af7278:/opt/spark/work-dir# pip list
>
> PackageVersion
>
> -- ---
>
> cx-Oracle  8.3.0
>
> numpy  1.21.4
>
> pip21.3.1
>
> PyYAML 6.0
>
> setuptools 59.4.0
>
> wheel  0.34.2
>
> HTH
>
>
>
>view my Linkedin profile
> 
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
>
>
>
> On Sat, 4 Dec 2021 at 07:52, Bode, Meikel, NMA-CFD <
> meikel.b...@bertelsmann.de> wrote:
>
> Hi Mich,
>
>
>
> Sure, that's possible. But distributing the complete env would be more
> practical.
>
> A workaround at the moment is that we build different environments, store
> them in a PV, mount it into the pods, and point the SparkApplication
> resource at the desired env.
>
>
>
> But actually these options exist and I want to understand what the issue
> is…
>
> Any hints on that?
>
>
>
> Best,
>
> Meikel
>
>
>
> *

Re: About some Spark technical help

2021-12-24 Thread sam smith
Hi Gourav,

Good question! That's the programming language I am most proficient in.
You are always welcome to suggest corrective remarks about my (Spark) code.

Kind regards.

Le ven. 24 déc. 2021 à 11:58, Gourav Sengupta  a
écrit :

> Hi,
>
> out of sheer and utter curiosity, why JAVA?
>
> Regards,
> Gourav Sengupta
>
> On Thu, Dec 23, 2021 at 5:10 PM sam smith 
> wrote:
>
>> Hi Andrew,
>>
>> Thanks, here's the Github repo to the code and the publication :
>> https://github.com/SamSmithDevs10/paperReplicationForReview
>>
>> Kind regards
>>
>> Le jeu. 23 déc. 2021 à 17:58, Andrew Davidson  a
>> écrit :
>>
>>> Hi Sam
>>>
>>>
>>>
>>> Can you tell us more? What is the algorithm? Can you send us the URL the
>>> publication
>>>
>>>
>>>
>>> Kind regards
>>>
>>>
>>>
>>> Andy
>>>
>>>
>>>
>>> *From: *sam smith 
>>> *Date: *Wednesday, December 22, 2021 at 10:59 AM
>>> *To: *"user@spark.apache.org" 
>>> *Subject: *About some Spark technical help
>>>
>>>
>>>
>>> Hello guys,
>>>
>>>
>>>
>>> I am replicating a paper's algorithm in Spark / Java, and want to ask
>>> you guys for some assistance to validate / review about 150 lines of code.
>>> My github repo contains both my java class and the related paper,
>>>
>>>
>>>
>>> Any interested reviewer here ?
>>>
>>>
>>>
>>>
>>>
>>> Thanks.
>>>
>>


Re: AnalysisException: Trouble using select() to append multiple columns

2021-12-24 Thread Gourav Sengupta
Hi,

Please note that using SQL is much more performant and makes these kinds of
issues easier to manage. You might want to look at the Spark UI to see the
advantage of using SQL over the DataFrame API.


Regards,
Gourav Sengupta
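
As a rough illustration of the SQL route (the view names here are made up, and
the column names come from the example data quoted further down):

    # Register the two frames as temp views and express the "append a column"
    # step as a SQL join on the shared Name key.
    df1.createOrReplaceTempView("ctrl1")
    df2.createOrReplaceTempView("quant")

    appended = spark.sql("""
        SELECT c.*, q.NumReads AS ctrl_2
        FROM ctrl1 c
        JOIN quant q ON c.Name = q.Name
    """)
    appended.show()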

On Sat, Dec 18, 2021 at 5:40 PM Andrew Davidson 
wrote:

> Thanks Nicholas
>
>
>
> Andy
>
>
>
> *From: *Nicholas Gustafson 
> *Date: *Friday, December 17, 2021 at 6:12 PM
> *To: *Andrew Davidson 
> *Cc: *"user@spark.apache.org" 
> *Subject: *Re: AnalysisException: Trouble using select() to append
> multiple columns
>
>
>
> Since df1 and df2 are different DataFrames, you will need to use a join.
> For example: df1.join(df2.selectExpr("Name", "NumReads as ctrl_2"),
> on=["Name"])
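
Spelled out a little more fully (df1, df2 and the column names are the ones
from the example data quoted below; this is just a sketch of the suggestion
above, not code from the thread):

    from pyspark.sql import functions as pyf  # same alias as in the example below

    # Keep only the join key and the renamed column from df2, then join on Name.
    ctrl_2 = df2.select("Name", pyf.col("NumReads").alias("ctrl_2"))
    allDF4 = df1.join(ctrl_2, on=["Name"], how="inner")
    allDF4.show()
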
>
>
>
> On Dec 17, 2021, at 16:25, Andrew Davidson 
> wrote:
>
> 
>
> Hi I am a newbie
>
>
>
> I have 16,000 data files, all files have the same number of rows and
> columns. The row ids are identical and are in the same order. I want to
> create a new data frame that contains the 3rd column from each data file
>
>
>
> I wrote a test program that uses a for loop and join. It works with my
> small test set. I get an OOM when I try to run using all the data
> files. I realize that join (map reduce) is probably not a great solution
> for my problem.
>
>
>
> Recently I found several articles that talk about the challenges with using
> withColumn() and explain how to use select() to append columns:
>
>
>
> https://mungingdata.com/pyspark/select-add-columns-withcolumn/
>
>
> https://stackoverflow.com/questions/64627112/adding-multiple-columns-in-pyspark-dataframe-using-a-loop
>
>
>
> I am using pyspark spark-3.1.2-bin-hadoop3.2
>
>
>
> I wrote a little test program. I am able to append columns created using
> pyspark.sql.functions.lit(). I am not able to append columns from other data
> frames.
>
>
>
> df1
>
> DataFrame[Name: string, ctrl_1: double]
>
> +---+--+
>
> |   Name|ctrl_1|
>
> +---+--+
>
> | txId_1|   0.0|
>
> | txId_2|  11.0|
>
> | txId_3|  12.0|
>
> | txId_4|  13.0|
>
> | txId_5|  14.0|
>
> | txId_6|  15.0|
>
> | txId_7|  16.0|
>
> | txId_8|  17.0|
>
> | txId_9|  18.0|
>
> |txId_10|  19.0|
>
> +---+--+
>
>
>
> # use select to append multiple literals
>
> allDF3 = df1.select( ["*", pyf.lit("abc").alias("x"),
> pyf.lit("mn0").alias("y")] )
>
>
>
> allDF3
>
> DataFrame[Name: string, ctrl_1: double, x: string, y: string]
>
> +---+--+---+---+
>
> |   Name|ctrl_1|  x|  y|
>
> +---+--+---+---+
>
> | txId_1|   0.0|abc|mn0|
>
> | txId_2|  11.0|abc|mn0|
>
> | txId_3|  12.0|abc|mn0|
>
> | txId_4|  13.0|abc|mn0|
>
> | txId_5|  14.0|abc|mn0|
>
> | txId_6|  15.0|abc|mn0|
>
> | txId_7|  16.0|abc|mn0|
>
> | txId_8|  17.0|abc|mn0|
>
> | txId_9|  18.0|abc|mn0|
>
> |txId_10|  19.0|abc|mn0|
>
> +---+--+---+---+
>
>
>
> df2
>
> DataFrame[Name: string, Length: int, EffectiveLength: double, TPM: double, 
> NumReads: double]
>
> +---+--+---+++
>
> |   Name|Length|EffectiveLength| TPM|NumReads|
>
> +---+--+---+++
>
> | txId_1|  1500| 1234.5|12.1| 0.1|
>
> | txId_2|  1510| 1244.5|13.1|11.1|
>
> | txId_3|  1520| 1254.5|14.1|12.1|
>
> | txId_4|  1530| 1264.5|15.1|13.1|
>
> | txId_5|  1540| 1274.5|16.1|14.1|
>
> | txId_6|  1550| 1284.5|17.1|15.1|
>
> | txId_7|  1560| 1294.5|18.1|16.1|
>
> | txId_8|  1570| 1304.5|19.1|17.1|
>
> | txId_9|  1580| 1314.5|20.1|18.1|
>
> |txId_10|  1590| 1324.5|21.1|19.1|
>
> +---+--+---+++
>
>
>
> s2Col = df2["NumReads"].alias( 'ctrl_2' )
>
> print("type(s2Col) = {}".format(type(s2Col)) )
>
>
>
> type(s2Col) = 
>
>
>
> allDF4 = df1.select( ["*", s2Col] )
>
> ~/extraCellularRNA/sparkBin/spark-3.1.2-bin-hadoop3.2/python/pyspark/sql/dataframe.py
>  in select(self, *cols)
>
> *   1667* [Row(name='Alice', age=12), Row(name='Bob', age=15)]
>
> *   1668* """
>
> -> 1669 jdf = self._jdf.select(self._jcols(*cols))
>
> *   1670* return DataFrame(jdf, self.sql_ctx)
>
> *   1671*
>
>
>
> ../../sparkBin/spark-3.1.2-bin-hadoop3.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py
>  in __call__(self, *args)
>
> *   1303* answer = self.gateway_client.send_command(command)
>
> *   1304* return_value = get_return_value(
>
> -> 1305 answer, self.gateway_client, self.target_id, self.name)
>
> *   1306*
>
> *   1307* for temp_arg in temp_args:
>
>
>
> ~/extraCellularRNA/sparkBin/spark-3.1.2-bin-hadoop3.2/python/pyspark/sql/utils.py
>  in deco(*a, **kw)
>
> *115* # Hide where the exception came from that shows a 
> non-Pythonic
>
> *116* # JVM exception message.
>
> --> 117 raise converted from None
>
> *118* else:
>
> *119* raise
>
>
>
> AnalysisException: Resolved attribute(s) NumReads#14 missing from 
> N

Re: Unable to use WriteStream to write to delta file.

2021-12-24 Thread Gourav Sengupta
Hi,

Also, please ensure that you have read all the required documentation to
understand whether you need to do any metadata migration or not.


Regards,
Gourav Sengupta

On Sun, Dec 19, 2021 at 11:55 AM Alex Ott  wrote:

> Make sure that you're using a compatible version of the Delta Lake library.
> For Spark 3.2 it's 1.1.0:
> https://github.com/delta-io/delta/releases/tag/v1.1.0
> For other Spark versions, check this table:
> https://docs.delta.io/latest/releases.html
>
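
For instance, a hedged sketch of pinning the matching release when building the
session (io.delta:delta-core_2.12:1.1.0 is the standard coordinate for the
Delta 1.1.0 / Spark 3.2 pairing mentioned above; adjust per the release table):

    from pyspark.sql import SparkSession

    spark = (SparkSession.builder
             .appName("delta-streaming")
             # Pull in the Delta release built against Spark 3.2.
             .config("spark.jars.packages", "io.delta:delta-core_2.12:1.1.0")
             .config("spark.sql.extensions",
                     "io.delta.sql.DeltaSparkSessionExtension")
             .config("spark.sql.catalog.spark_catalog",
                     "org.apache.spark.sql.delta.catalog.DeltaCatalog")
             .getOrCreate())

A NoSuchMethodError like the one below is typically exactly this kind of
binary mismatch between the Delta jar and the Spark version on the classpath.
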
> On Fri, Dec 17, 2021 at 2:36 PM Stelios Philippou 
> wrote:
>
>> Hi Abhinav,
>>
>> Using readStream or read will not matter here.
>>
>> The following error
>> java.lang.NoSuchMethodError:
>> org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter$.checkFieldNames(
>>
>> states that you are using a different version of Spark somewhere in your
>> project, or that you are using an older component.
>>
>> Please check your Spark binaries as well as your pom to make sure you are
>> indeed using the same versions.
>>
>> On Fri, 17 Dec 2021 at 15:11, Abhinav Gundapaneni
>>  wrote:
>>
>>> Hello Spark community,
>>>
>>>
>>>
>>> I'm using Apache Spark (version 3.2) to read a CSV file into a dataframe
>>> using readStream, process the dataframe, and write the dataframe to a Delta
>>> file using writeStream. I'm getting a failure during the writeStream
>>> process. I'm trying to run the script locally on my Windows 11 machine.
>>> Below is the stack trace of the error I’m facing. Please let me know if
>>> there’s anything that I’m missing.
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> java.lang.NoSuchMethodError:
>>> org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter$.checkFieldNames(Lscala/collection/Seq;)V
>>>
>>> at
>>> org.apache.spark.sql.delta.schema.SchemaUtils$.checkFieldNames(SchemaUtils.scala:958)
>>>
>>>
>>> at
>>> org.apache.spark.sql.delta.OptimisticTransactionImpl.verifyNewMetadata(OptimisticTransaction.scala:216)
>>>
>>> at
>>> org.apache.spark.sql.delta.OptimisticTransactionImpl.verifyNewMetadata$(OptimisticTransaction.scala:214)
>>>
>>> at
>>> org.apache.spark.sql.delta.OptimisticTransaction.verifyNewMetadata(OptimisticTransaction.scala:80)
>>>
>>> at
>>> org.apache.spark.sql.delta.OptimisticTransactionImpl.updateMetadata(OptimisticTransaction.scala:208)
>>>
>>> at
>>> org.apache.spark.sql.delta.OptimisticTransactionImpl.updateMetadata$(OptimisticTransaction.scala:195)
>>>
>>> at
>>> org.apache.spark.sql.delta.OptimisticTransaction.updateMetadata(OptimisticTransaction.scala:80)
>>>
>>> at
>>> org.apache.spark.sql.delta.schema.ImplicitMetadataOperation.updateMetadata(ImplicitMetadataOperation.scala:101)
>>>
>>> at
>>> org.apache.spark.sql.delta.schema.ImplicitMetadataOperation.updateMetadata$(ImplicitMetadataOperation.scala:62)
>>>
>>> at
>>> org.apache.spark.sql.delta.sources.DeltaSink.updateMetadata(DeltaSink.scala:37)
>>>
>>>
>>> at
>>> org.apache.spark.sql.delta.schema.ImplicitMetadataOperation.updateMetadata(ImplicitMetadataOperation.scala:59)
>>>
>>> at
>>> org.apache.spark.sql.delta.schema.ImplicitMetadataOperation.updateMetadata$(ImplicitMetadataOperation.scala:50)
>>>
>>> at
>>> org.apache.spark.sql.delta.sources.DeltaSink.updateMetadata(DeltaSink.scala:37)
>>>
>>>
>>> at
>>> org.apache.spark.sql.delta.sources.DeltaSink.$anonfun$addBatch$1(DeltaSink.scala:80)
>>>
>>>
>>> at
>>> org.apache.spark.sql.delta.sources.DeltaSink.$anonfun$addBatch$1$adapted(DeltaSink.scala:54)
>>>
>>> at
>>> org.apache.spark.sql.delta.DeltaLog.withNewTransaction(DeltaLog.scala:188)
>>>
>>> at
>>> org.apache.spark.sql.delta.sources.DeltaSink.addBatch(DeltaSink.scala:54)
>>>
>>> at
>>> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:600)
>>>
>>> at
>>> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
>>>
>>> at
>>> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
>>>
>>> at
>>> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
>>>
>>> at
>>> org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
>>>
>>> at
>>> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
>>>
>>> at
>>> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:598)
>>>
>>> at
>>> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
>>>
>>> at
>>> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
>>>
>>> at
>>> org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
>>>
>>> at
>>> org.apache.spark.sql.execution.

Re: About some Spark technical help

2021-12-24 Thread Gourav Sengupta
Hi,

out of sheer and utter curiosity, why JAVA?

Regards,
Gourav Sengupta

On Thu, Dec 23, 2021 at 5:10 PM sam smith 
wrote:

> Hi Andrew,
>
> Thanks, here's the Github repo to the code and the publication :
> https://github.com/SamSmithDevs10/paperReplicationForReview
>
> Kind regards
>
> Le jeu. 23 déc. 2021 à 17:58, Andrew Davidson  a
> écrit :
>
>> Hi Sam
>>
>>
>>
>> Can you tell us more? What is the algorithm? Can you send us the URL the
>> publication
>>
>>
>>
>> Kind regards
>>
>>
>>
>> Andy
>>
>>
>>
>> *From: *sam smith 
>> *Date: *Wednesday, December 22, 2021 at 10:59 AM
>> *To: *"user@spark.apache.org" 
>> *Subject: *About some Spark technical help
>>
>>
>>
>> Hello guys,
>>
>>
>>
>> I am replicating a paper's algorithm in Spark / Java, and want to ask you
>> guys for some assistance to validate / review about 150 lines of code. My
>> github repo contains both my java class and the related paper,
>>
>>
>>
>> Any interested reviewer here ?
>>
>>
>>
>>
>>
>> Thanks.
>>
>


Re: Dataframe's storage size

2021-12-24 Thread Gourav Sengupta
Hi,

This question, once again like the last one, does not make much sense at
all. Where are you trying to store the data frame, and how?

Are you just trying to write a blog, as you were mentioning in an earlier
email, and trying to fill in some gaps? I think that the questions are
entirely wrong.

Regards,
Gourav Sengupta
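
That said, if the goal is simply to get Spark's own size estimate for a
DataFrame (the question quoted below), two rough options, sketched here with a
placeholder df:

    # Print the optimized logical plan with statistics, including sizeInBytes
    # (an optimizer estimate, not an exact on-disk size).
    df.explain(mode="cost")

    # Or cache it, run an action, and read the in-memory size from the
    # Storage tab of the Spark UI.
    df.cache().count()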

On Fri, Dec 24, 2021 at 2:04 AM  wrote:

> Hello
>
> Is it possible to know a dataframe's total storage size in bytes? such
> as:
>
> >>> df.size()
> Traceback (most recent call last):
>File "", line 1, in 
>File "/opt/spark/python/pyspark/sql/dataframe.py", line 1660, in
> __getattr__
>  "'%s' object has no attribute '%s'" % (self.__class__.__name__,
> name))
> AttributeError: 'DataFrame' object has no attribute 'size'
>
> Sure, it won't work, but if there were such a method that would be great.
>
> Thanks.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: measure running time

2021-12-24 Thread Gourav Sengupta
Hi,

There are too many blogs out there with absolutely no value. Before writing
another blog doing run time comparisons between RDDs and DataFrames (which, as
stated earlier, does not make much sense), it may be useful to first
understand what you are trying to achieve by writing this blog.

Then perhaps based on that you may want to look at different options.


Regards,
Gourav Sengupta



On Fri, Dec 24, 2021 at 10:42 AM  wrote:

> As you see below:
>
> $ pip install sparkmeasure
> Collecting sparkmeasure
>Using cached
>
> https://files.pythonhosted.org/packages/9f/bf/c9810ff2d88513ffc185e65a3ab9df6121ad5b4c78aa8d134a06177f9021/sparkmeasure-0.14.0-py2.py3-none-any.whl
> Installing collected packages: sparkmeasure
> Successfully installed sparkmeasure-0.14.0
>
>
> $ pyspark --packages ch.cern.sparkmeasure:spark-measure_2.12:0.17
> Python 3.6.9 (default, Jan 26 2021, 15:33:00)
> [GCC 8.4.0] on linux
> Type "help", "copyright", "credits" or "license" for more information.
> ..
> >>> from sparkmeasure import StageMetrics
> Traceback (most recent call last):
>File "", line 1, in 
> ModuleNotFoundError: No module named 'sparkmeasure'
>
>
> That doesn't work still.
> I run spark 3.2.0 on an ubuntu system.
>
> Regards.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: measure running time

2021-12-24 Thread bitfox

As you see below:

$ pip install sparkmeasure
Collecting sparkmeasure
  Using cached 
https://files.pythonhosted.org/packages/9f/bf/c9810ff2d88513ffc185e65a3ab9df6121ad5b4c78aa8d134a06177f9021/sparkmeasure-0.14.0-py2.py3-none-any.whl

Installing collected packages: sparkmeasure
Successfully installed sparkmeasure-0.14.0


$ pyspark --packages ch.cern.sparkmeasure:spark-measure_2.12:0.17
Python 3.6.9 (default, Jan 26 2021, 15:33:00)
[GCC 8.4.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
..

from sparkmeasure import StageMetrics

Traceback (most recent call last):
  File "", line 1, in 
ModuleNotFoundError: No module named 'sparkmeasure'


That doesn't work still.
I run spark 3.2.0 on an ubuntu system.

Regards.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: measure running time

2021-12-24 Thread bitfox

but I already installed it:

Requirement already satisfied: sparkmeasure in 
/usr/local/lib/python2.7/dist-packages


so how? thank you.
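
One quick check for this kind of ModuleNotFoundError is whether pyspark is
running the same interpreter that pip installed into; the path above points at
python2.7 while the pyspark shell reports Python 3.6. A hypothetical
diagnostic, run inside the pyspark shell:

    import sys

    print(sys.executable)  # the interpreter pyspark is actually using
    print(sys.version)
    print([p for p in sys.path if "packages" in p])  # where modules are looked up

    # Compare with the location printed by "pip show sparkmeasure" outside the
    # shell; if they point at different Python installations, the import fails.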

On 2021-12-24 18:15, Hollis wrote:

Hi bitfox,

you need to pip install sparkmeasure first. Then you can use it in pyspark.


from sparkmeasure import StageMetrics
stagemetrics = StageMetrics(spark)
stagemetrics.runandmeasure(locals(), 'spark.sql("select count(*)

from range(1000) cross join range(1000) cross join
range(100)").show()')
+---------+
| count(1)|
+---------+
|100000000|
+---------+

Regards,
Hollis

At 2021-12-24 09:18:19, bit...@bitfox.top wrote:

Hello list,

I run with Spark 3.2.0

After I started pyspark with:
$ pyspark --packages ch.cern.sparkmeasure:spark-measure_2.12:0.17

I can't load from the module sparkmeasure:


from sparkmeasure import StageMetrics

Traceback (most recent call last):
  File "", line 1, in 
ModuleNotFoundError: No module named 'sparkmeasure'

Do you know why? @Luca thanks.


On 2021-12-24 04:20, bit...@bitfox.top wrote:

Thanks Gourav and Luca. I will try with the tools you provide in

the

Github.

On 2021-12-23 23:40, Luca Canali wrote:

Hi,

I agree with Gourav that just measuring execution time is a

simplistic

approach that may lead you to miss important details, in

particular

when running distributed computations.

WebUI, REST API, and metrics instrumentation in Spark can be quite
useful for further drill down. See
https://spark.apache.org/docs/latest/monitoring.html

You can also have a look at this tool that takes care of

automating

collecting and aggregating some executor task metrics:
https://github.com/LucaCanali/sparkMeasure

Best,

Luca

From: Gourav Sengupta 
Sent: Thursday, December 23, 2021 14:23
To: bit...@bitfox.top
Cc: user 
Subject: Re: measure running time

Hi,

I do not think that such time comparisons make any sense at all in
distributed computation. Just saying that an operation in RDD and
Dataframe can be compared based on their start and stop time may

not

provide any valid information.

You will have to look into the details of timing and the steps.

For

example, please look at the SPARK UI to see how timings are

calculated

in distributed computing mode, there are several well written

papers

on this.

Thanks and Regards,

Gourav Sengupta

On Thu, Dec 23, 2021 at 10:57 AM  wrote:


hello community,

In pyspark how can I measure the running time to the command?
I just want to compare the running time of the RDD API and

dataframe


API, in my this blog:




https://bitfoxtop.wordpress.com/2021/12/23/count-email-addresses-using-sparks-rdd-and-dataframe/


I tried spark.time() it doesn't work.
Thank you.





-

To unsubscribe e-mail: user-unsubscr...@spark.apache.org




-

To unsubscribe e-mail: user-unsubscr...@spark.apache.org


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re:Re: measure running time

2021-12-24 Thread Hollis
Hi bitfox,


you need to pip install sparkmeasure first. Then you can use it in pyspark.


>>> from sparkmeasure import StageMetrics
>>> stagemetrics = StageMetrics(spark)
>>> stagemetrics.runandmeasure(locals(), 'spark.sql("select count(*) from range(1000) cross join range(1000) cross join range(100)").show()')
+---------+
| count(1)|
+---------+
|100000000|
+---------+


Regards,
Hollis
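
On the original timing question further down in the quoted thread: spark.time()
is, as far as I know, a Scala-side helper that is not exposed in PySpark, which
is why the call fails. A minimal wall-clock sketch for the pyspark shell:

    import time

    start = time.perf_counter()
    spark.sql("select count(*) from range(1000) cross join range(1000)").show()
    print("elapsed: {:.3f} s".format(time.perf_counter() - start))

As noted earlier in the thread, this only measures end-to-end elapsed time on
the driver; sparkMeasure or the Spark UI is the way to get per-stage metrics.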






At 2021-12-24 09:18:19, bit...@bitfox.top wrote:
>Hello list,
>
>I run with Spark 3.2.0
>
>After I started pyspark with:
>$ pyspark --packages ch.cern.sparkmeasure:spark-measure_2.12:0.17
>
>I can't load from the module sparkmeasure:
>
 from sparkmeasure import StageMetrics
>Traceback (most recent call last):
>   File "", line 1, in 
>ModuleNotFoundError: No module named 'sparkmeasure'
>
>Do you know why? @Luca thanks.
>
>
>On 2021-12-24 04:20, bit...@bitfox.top wrote:
>> Thanks Gourav and Luca. I will try with the tools you provide in the 
>> Github.
>> 
>> On 2021-12-23 23:40, Luca Canali wrote:
>>> Hi,
>>> 
>>> I agree with Gourav that just measuring execution time is a simplistic
>>> approach that may lead you to miss important details, in particular
>>> when running distributed computations.
>>> 
>>> WebUI, REST API, and metrics instrumentation in Spark can be quite
>>> useful for further drill down. See
>>> https://spark.apache.org/docs/latest/monitoring.html
>>> 
>>> You can also have a look at this tool that takes care of automating
>>> collecting and aggregating some executor task metrics:
>>> https://github.com/LucaCanali/sparkMeasure
>>> 
>>> Best,
>>> 
>>> Luca
>>> 
>>> From: Gourav Sengupta 
>>> Sent: Thursday, December 23, 2021 14:23
>>> To: bit...@bitfox.top
>>> Cc: user 
>>> Subject: Re: measure running time
>>> 
>>> Hi,
>>> 
>>> I do not think that such time comparisons make any sense at all in
>>> distributed computation. Just saying that an operation in RDD and
>>> Dataframe can be compared based on their start and stop time may not
>>> provide any valid information.
>>> 
>>> You will have to look into the details of timing and the steps. For
>>> example, please look at the SPARK UI to see how timings are calculated
>>> in distributed computing mode, there are several well written papers
>>> on this.
>>> 
>>> Thanks and Regards,
>>> 
>>> Gourav Sengupta
>>> 
>>> On Thu, Dec 23, 2021 at 10:57 AM  wrote:
>>> 
 hello community,
 
 In pyspark how can I measure the running time to the command?
 I just want to compare the running time of the RDD API and dataframe
 
 API, in my this blog:
 
>>> https://bitfoxtop.wordpress.com/2021/12/23/count-email-addresses-using-sparks-rdd-and-dataframe/
 
 I tried spark.time() it doesn't work.
 Thank you.
 
 
>>> -
 To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>> 
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>-
>To unsubscribe e-mail: user-unsubscr...@spark.apache.org