Re: On adding applyInArrow to groupBy and cogroup

2023-11-03 Thread Abdeali Kothari
Seeing more support for Arrow-based functions would be great. It gives more
control to application developers, and Pandas just becomes one of the
available options.

On Fri, 3 Nov 2023, 21:23 Luca Canali,  wrote:

> Hi Enrico,
>
>
>
> +1 on supporting Arrow on par with Pandas. Besides the frameworks and
> libraries that you mentioned, I would add Awkward Array, a library used in High
> Energy Physics
>
> (for those interested, more details on how we tested Awkward Array with
> Spark, from back when mapInArrow was introduced, can be found at
> https://github.com/LucaCanali/Miscellaneous/blob/master/Spark_Notes/Spark_MapInArrow.md
> )
>
>
>
> Cheers,
>
> Luca
>
>
>
> *From:* Enrico Minack 
> *Sent:* Thursday, October 26, 2023 15:33
> *To:* dev 
> *Subject:* On adding applyInArrow to groupBy and cogroup
>
>
>
> Hi devs,
>
> PySpark allows transforming a DataFrame via both the Pandas *and* the Arrow API:
>
> df.mapInArrow(map_arrow, schema="...")
> df.mapInPandas(map_pandas, schema="...")
>
> For df.groupBy(...) and df.groupBy(...).cogroup(...), there is *only* a
> Pandas interface, no Arrow interface:
>
> df.groupBy("id").applyInPandas(apply_pandas, schema="...")
>
> Providing a pure Arrow interface allows user code to use *any*
> Arrow-based data framework, not only Pandas, e.g. Polars. Adding Arrow
> interfaces reduces the need to add more framework-specific support.
>
> We need your thoughts on whether PySpark should support Arrow on a par
> with Pandas, or not: https://github.com/apache/spark/pull/38624
>
> Cheers,
> Enrico
>
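
For illustration, a minimal sketch of how the proposed grouped Arrow API could sit
next to the existing Pandas one. The `applyInArrow` name follows the PR linked
above, but its exact signature is an assumption here; Polars is just one example
of an Arrow-based library, and the small DataFrame is a placeholder.

```
from pyspark.sql import SparkSession
import pyarrow as pa

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, 10.0), (1, 30.0), (2, 20.0)], ["id", "col1"])

# Existing Pandas-based grouped apply (available today).
def normalize_pandas(pdf):
    pdf["col1"] = pdf["col1"] - pdf["col1"].mean()
    return pdf

df.groupBy("id").applyInPandas(normalize_pandas, schema="id long, col1 double")

# Hypothetical Arrow-based equivalent, as proposed in the PR above: the function
# receives and returns a pyarrow.Table, so any Arrow-based library (e.g. Polars)
# can operate on the group without a Pandas round trip.
def normalize_arrow(table: pa.Table) -> pa.Table:
    import polars as pl                   # assumption: Polars is installed
    pl_df = pl.from_arrow(table)          # cheap Arrow -> Polars conversion
    pl_df = pl_df.with_columns(pl.col("col1") - pl.col("col1").mean())
    return pl_df.to_arrow()               # back to Arrow for Spark

# df.groupBy("id").applyInArrow(normalize_arrow, schema="id long, col1 double")
```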


Re: Making spark plan UI interactive

2023-09-06 Thread Abdeali Kothari
I feel this pain frequently.
Something more interactive would be great.

On Wed, 6 Sep 2023 at 4:34 PM, Santosh Pingale
 wrote:

> Hey community
>
> Spark UI with the plan visualisation is an excellent resource for finding
> out crucial information about how your application is doing and what parts
> of the execution can still be optimized to fulfill time/resource
> constraints.
>
> The graph in its current form is sufficient for simpler applications. It
> starts to show limitations, however, when we have applications with complex
> logic and a resultant humongous plan. The limitations come from trying to
> navigate the complex graph, further complicated by what can fit in your
> screen/viewport.
>
> To make it more user friendly, we could try to make it interactive, allowing
> users to decide what area of the plan they want to focus on. There are a
> couple of inspirations/examples for this.
> 1. https://tensorboard.dev/experiment/rldGbR8rRHeCEbkK61SWTQ/#graphs
> 2. https://cs.brown.edu/people/jcmace/d3/graph.html?id=small.json
>
> If we could achieve this, it would be an even more powerful resource
> for Spark users.
>
> Curious what you think
> Santosh
>


Re: [DISCUSS] SPIP: Python Data Source API

2023-06-19 Thread Abdeali Kothari
I would definitely use it - if it's available :)

On Mon, 19 Jun 2023, 21:56 Jacek Laskowski,  wrote:

> Hi Allison and devs,
>
> Although I was against this idea at first sight (probably because I'm a
> Scala dev), I think it could work as long as there are people who'd be
> interested in such an API. Were there any? I'm just curious. I've seen no
> emails requesting it.
>
> I also doubt that Python devs would like to work on new data sources but
> support their wishes wholeheartedly :)
>
> Pozdrawiam,
> Jacek Laskowski
> 
> "The Internals Of" Online Books 
> Follow me on https://twitter.com/jaceklaskowski
>
> 
>
>
> On Fri, Jun 16, 2023 at 6:14 AM Allison Wang
>  wrote:
>
>> Hi everyone,
>>
>> I would like to start a discussion on “Python Data Source API”.
>>
>> This proposal aims to introduce a simple API in Python for Data Sources.
>> The idea is to enable Python developers to create data sources without
>> having to learn Scala or deal with the complexities of the current data
>> source APIs. The goal is to make a Python-based API that is simple and easy
>> to use, thus making Spark more accessible to the wider Python developer
>> community. This proposed approach is based on the recently introduced
>> Python user-defined table functions with extensions to support data sources.
>>
>> *SPIP Doc*:
>> https://docs.google.com/document/d/1oYrCKEKHzznljYfJO4kx5K_Npcgt1Slyfph3NEk7JRU/edit?usp=sharing
>>
>> *SPIP JIRA*: https://issues.apache.org/jira/browse/SPARK-44076
>>
>> Looking forward to your feedback.
>>
>> Thanks,
>> Allison
>>
>
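
For readers skimming the archive, a purely hypothetical, self-contained sketch of
the kind of interface the SPIP is aiming at; the class and method names below are
illustrative assumptions only, not the actual proposal, which is defined in the
SPIP doc linked above.

```
import tempfile
from typing import Any, Dict, Iterator

# Hypothetical shape of a Python-level data source (names are assumptions,
# NOT the SPIP API): a reader declares a schema and yields plain Python rows,
# and the engine would handle the Spark side.
class SimpleCsvLikeSource:
    def __init__(self, options: Dict[str, str]):
        self.path = options["path"]

    def schema(self) -> str:
        # DDL-style schema string, as used elsewhere in PySpark.
        return "name string, age int"

    def read(self) -> Iterator[Dict[str, Any]]:
        with open(self.path) as f:
            for line in f:
                name, age = line.strip().split(",")
                yield {"name": name, "age": int(age)}

# Tiny standalone demo of the reader logic itself (no Spark involved).
with tempfile.NamedTemporaryFile("w", suffix=".csv", delete=False) as f:
    f.write("alice,30\nbob,25\n")
source = SimpleCsvLikeSource({"path": f.name})
print(source.schema(), list(source.read()))
```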


Re: Spark 3.2 - ReusedExchange not present in join execution plan

2022-01-06 Thread Abdeali Kothari
Thanks a lot for the reply Albert.

On looking at it and reading about it further - I do see that
"AdaptiveSparkPlan isFinalPlan=false" is mentioned.

Could you point me to how I can see the final plan? I couldn't find that
in any of the resources I was referring to.
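
For readers of the archive, a minimal sketch of one way to see the final plan under
AQE in Spark 3.2 (adaptive execution is enabled by default there), assuming the
`df4` from the reproduction quoted below: run an action first, then call explain()
again on the same DataFrame, which should then print `AdaptiveSparkPlan
isFinalPlan=true` with the re-optimized plan. The SQL tab of the Spark UI shows the
same final plan.

```
df4.explain()    # before any action: AdaptiveSparkPlan isFinalPlan=false
df4.collect()    # run an action so AQE can finalize the plan
df4.explain()    # after the action: AdaptiveSparkPlan isFinalPlan=true (final plan)
```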

On Fri, 7 Jan 2022, 07:25 Albert,  wrote:

> I happen to encounter something similar.
>
> It's probably because you are just `explain`-ing it. When you actually `run`
> it, you will get the final spark plan, in which case the exchange will be
> reused.
> Right, this is different compared with 3.1, probably because of the upgraded
> AQE.
>
> not sure whether this is expected though.
>
> On Thu, Jan 6, 2022 at 12:11 AM Abdeali Kothari 
> wrote:
>
>> Just thought I'd do a quick bump and add the dev mailing list - in case
>> there is some insight there
>> Feels like this should be categorized as a bug for spark 3.2.0
>>
>> On Wed, Dec 29, 2021 at 5:25 PM Abdeali Kothari 
>> wrote:
>>
>>> Hi,
>>> I am using pyspark for some projects. And one of the things we are doing
>>> is trying to find the tables/columns being used by Spark using the
>>> execution plan.
>>>
>>> When we upgraded to Spark 3.2, the Spark plan seems to be different from
>>> previous versions - mainly when we are doing joins.
>>> Below is a reproducible example (you could run the same in versions 2.3
>>> to 3.1 to see the difference)
>>>
>>> My original data frames have the columns: id#0 and id#4
>>> But after doing the joins we are seeing new columns id#34 and id#19
>>> which are not created from the original dataframes I was working with.
>>> In previous versions of spark, this used to use a ReusedExchange step
>>> (shown below)
>>>
>>> I was trying to understand if this is expected in spark 3.2 where the
>>> execution plan seems to be creating a new data source which does not
>>> originate from df1 and df2 which I provided.
>>> NOTE: The same happens even if I read from parquet files
>>>
>>> In spark 3.2:
>>> In [1]: import pyspark
>>>...: spark = pyspark.sql.SparkSession.builder.getOrCreate()
>>>
>>> In [2]: df1 = spark.createDataFrame([[1, 10], [2, 20]], ['id', 'col1'])
>>>...: df2 = spark.createDataFrame([[1, 11], [2, 22], [2, 222]], ['id',
>>> 'col2'])
>>>...: df1.explain()
>>>...: df2.explain()
>>> == Physical Plan ==
>>> *(1) Scan ExistingRDD[id#0L,col1#1L]
>>>
>>> == Physical Plan ==
>>> *(1) Scan ExistingRDD[id#4L,col2#5L]
>>>
>>> In [3]: df3 = df1.join(df2, df1['id'] == df2['id']).drop(df2['id'])
>>>...: df4 = df2.join(df3, df1['id'] == df2['id'])
>>>...: df4.explain()
>>> == Physical Plan ==
>>> AdaptiveSparkPlan isFinalPlan=false
>>> +- SortMergeJoin [id#4L], [id#0L], Inner
>>>:- Sort [id#4L ASC NULLS FIRST], false, 0
>>>:  +- Exchange hashpartitioning(id#4L, 200), ENSURE_REQUIREMENTS,
>>> [id=#53]
>>>: +- Filter isnotnull(id#4L)
>>>:+- Scan ExistingRDD[id#4L,col2#5L]
>>>+- Project [id#0L, col1#1L, col2#20L]
>>>   +- SortMergeJoin [id#0L], [id#19L], Inner
>>>  :- Sort [id#0L ASC NULLS FIRST], false, 0
>>>  :  +- Exchange hashpartitioning(id#0L, 200),
>>> ENSURE_REQUIREMENTS, [id=#45]
>>>  : +- Filter isnotnull(id#0L)
>>>  :+- Scan ExistingRDD[id#0L,col1#1L]
>>>
>>>
>>>
>>>  +- Sort [id#19L ASC NULLS FIRST], false, 0
>>>     +- Exchange hashpartitioning(id#19L, 200), ENSURE_REQUIREMENTS, [id=#46]
>>>        +- Filter isnotnull(id#19L)
>>>           +- Scan ExistingRDD[id#19L,col2#20L]
>>>
>>> In [4]: df1.createOrReplaceTempView('df1')
>>>...: df2.createOrReplaceTempView('df2')
>>>...: df3 = spark.sql("""
>>>...: SELECT df1.id, df1.col1, df2.col2
>>>...: FROM df1 JOIN df2 ON df1.id = df2.id
>>>...: """)
>>>...: df3.createOrReplaceTempView('df3')
>>>...: df4 = spark.sql("""
>>>...: SELECT df2.*, df3.*
>>>...: FROM df2 JOIN df3 ON df2.id = df3.id
>>>...: """)
>>>...: df4.explain()
>>> == Physical Plan ==
>>> AdaptiveSparkPlan isFinalPlan=false
>>> +- SortMergeJoin [id#4L], [id#0L], Inner
>>>:- Sort [id#4L ASC NULLS FIRST], false, 0
>>>:

Re: Spark 3.2 - ReusedExchange not present in join execution plan

2022-01-05 Thread Abdeali Kothari
Just thought I'd do a quick bump and add the dev mailing list - in case
there is some insight there
Feels like this should be categorized as a bug for spark 3.2.0

On Wed, Dec 29, 2021 at 5:25 PM Abdeali Kothari 
wrote:

> Hi,
> I am using pyspark for some projects. And one of the things we are doing
> is trying to find the tables/columns being used by Spark using the
> execution plan.
>
> When we upgraded to Spark 3.2, the Spark plan seems to be different from
> previous versions - mainly when we are doing joins.
> Below is a reproducible example (you could run the same in versions 2.3 to
> 3.1 to see the difference)
>
> My original data frames have the columns: id#0 and id#4
> But after doing the joins we are seeing new columns id#34 and id#19 which
> are not created from the original dataframes I was working with.
> In previous versions of spark, this used to use a ReusedExchange step
> (shown below)
>
> I was trying to understand if this is expected in spark 3.2 where the
> execution plan seems to be creating a new data source which does not
> originate from df1 and df2 which I provided.
> NOTE: The same happens even if I read from parquet files
>
> In spark 3.2:
> In [1]: import pyspark
>...: spark = pyspark.sql.SparkSession.builder.getOrCreate()
>
> In [2]: df1 = spark.createDataFrame([[1, 10], [2, 20]], ['id', 'col1'])
>...: df2 = spark.createDataFrame([[1, 11], [2, 22], [2, 222]], ['id',
> 'col2'])
>...: df1.explain()
>...: df2.explain()
> == Physical Plan ==
> *(1) Scan ExistingRDD[id#0L,col1#1L]
>
> == Physical Plan ==
> *(1) Scan ExistingRDD[id#4L,col2#5L]
>
> In [3]: df3 = df1.join(df2, df1['id'] == df2['id']).drop(df2['id'])
>...: df4 = df2.join(df3, df1['id'] == df2['id'])
>...: df4.explain()
> == Physical Plan ==
> AdaptiveSparkPlan isFinalPlan=false
> +- SortMergeJoin [id#4L], [id#0L], Inner
>:- Sort [id#4L ASC NULLS FIRST], false, 0
>:  +- Exchange hashpartitioning(id#4L, 200), ENSURE_REQUIREMENTS,
> [id=#53]
>: +- Filter isnotnull(id#4L)
>:+- Scan ExistingRDD[id#4L,col2#5L]
>+- Project [id#0L, col1#1L, col2#20L]
>   +- SortMergeJoin [id#0L], [id#19L], Inner
>  :- Sort [id#0L ASC NULLS FIRST], false, 0
>  :  +- Exchange hashpartitioning(id#0L, 200), ENSURE_REQUIREMENTS,
> [id=#45]
>  : +- Filter isnotnull(id#0L)
>  :+- Scan ExistingRDD[id#0L,col1#1L]
>
>
>
>  +- Sort [id#19L ASC NULLS FIRST], false, 0
>     +- Exchange hashpartitioning(id#19L, 200), ENSURE_REQUIREMENTS, [id=#46]
>        +- Filter isnotnull(id#19L)
>           +- Scan ExistingRDD[id#19L,col2#20L]
>
> In [4]: df1.createOrReplaceTempView('df1')
>...: df2.createOrReplaceTempView('df2')
>...: df3 = spark.sql("""
>...: SELECT df1.id, df1.col1, df2.col2
>...: FROM df1 JOIN df2 ON df1.id = df2.id
>...: """)
>...: df3.createOrReplaceTempView('df3')
>...: df4 = spark.sql("""
>...: SELECT df2.*, df3.*
>...: FROM df2 JOIN df3 ON df2.id = df3.id
>...: """)
>...: df4.explain()
> == Physical Plan ==
> AdaptiveSparkPlan isFinalPlan=false
> +- SortMergeJoin [id#4L], [id#0L], Inner
>:- Sort [id#4L ASC NULLS FIRST], false, 0
>:  +- Exchange hashpartitioning(id#4L, 200), ENSURE_REQUIREMENTS,
> [id=#110]
>: +- Filter isnotnull(id#4L)
>:+- Scan ExistingRDD[id#4L,col2#5L]
>+- Project [id#0L, col1#1L, col2#35L]
>   +- SortMergeJoin [id#0L], [id#34L], Inner
>  :- Sort [id#0L ASC NULLS FIRST], false, 0
>  :  +- Exchange hashpartitioning(id#0L, 200), ENSURE_REQUIREMENTS,
> [id=#102]
>  : +- Filter isnotnull(id#0L)
>  :+- Scan ExistingRDD[id#0L,col1#1L]
>
>
>
>  +- Sort [id#34L ASC NULLS FIRST], false, 0
>     +- Exchange hashpartitioning(id#34L, 200), ENSURE_REQUIREMENTS, [id=#103]
>        +- Filter isnotnull(id#34L)
>           +- Scan ExistingRDD[id#34L,col2#35L]
>
>
> Doing this in spark 3.1.1 - the plan is:
>
> *(8) SortMergeJoin [id#4L], [id#0L], Inner
> :- *(2) Sort [id#4L ASC NULLS FIRST], false, 0
> :  +- Exchange hashpartitioning(id#4L, 200), ENSURE_REQUIREMENTS, [id=#56]
> : +- *(1) Filter isnotnull(id#4L)
> :+- *(1) Scan ExistingRDD[id#4L,col2#5L]
> +- *(7) Project [id#0L, col1#1L, col2#20L]
>+- *(7) SortMergeJoin [id#0L], [id#19L], Inner
>   :- *(4) Sort [id#0L ASC NULLS FIRST], false, 0
>   :  +- Exchange hashpartitioning(id#0L, 200), ENSURE_REQUIREMENTS,
> [id=#62]
>   : +- *(3) Filter isnotnull(id#0L)
>   :+- *(3) Scan ExistingRDD[id#0L,col1#1L]
>
>   +- *(6) Sort [id#19L ASC NULLS FIRST], false, 0
>      +- ReusedExchange [id#19L, col2#20L], Exchange hashpartitioning(id#4L, 200), ENSURE_REQUIREMENTS, [id=#56]
>
>


Re: script running in jupyter 6-7x faster than spark submit

2019-09-11 Thread Abdeali Kothari
In a bash terminal, can you do:
*export PYSPARK_DRIVER_PYTHON=/path/to/venv/bin/python*
and then:
run the *spark-shell* script ?

This should mimic the behaviour of jupyter in spark-shell and should be
fast (1-2mins similar to jupyter notebook)
This would confirm the guess that the python2.7 venv has some magic ^_^



On Wed, Sep 11, 2019 at 10:32 PM Dhrubajyoti Hati 
wrote:

> Also, the performance remains identical when running the same script from
> the jupyter terminal instead of the normal terminal. In the script the spark
> context is created by
>
> spark = SparkSession \
> .builder \
> ..
> ..
> getOrCreate() command
>
>
> On Wed, Sep 11, 2019 at 10:28 PM Dhrubajyoti Hati 
> wrote:
>
>> If you say that libraries are not transferred by default, and in my case I
>> haven't used any --py-files, then just because the driver python is
>> different I am facing a 6x speed difference? I am using client mode to
>> submit the program, but the UDFs and all are executed in the executors, so
>> why is the difference so large?
>>
>> I tried the prints
>> For jupyter one the driver prints
>> ../../jupyter-folder/venv
>>
>> and executors print /usr
>>
>> For spark-submit both of them print /usr
>>
>> The cluster was created a few years back and is used organisation wide. So how
>> python 2.6.6 was installed, I honestly do not know.  I copied the whole
>> jupyter setup from the org git repo as it was shared, so I do not know how the venv
>> was created or even how the python for the venv was created.
>>
>> The os is CentOS release 6.9 (Final)
>>
>>
>>
>>
>>
>> Regards, Dhrubajyoti Hati. Mob No: 9886428028/9652029028
>>
>>
>> On Wed, Sep 11, 2019 at 8:22 PM Abdeali Kothari 
>> wrote:
>>
>>> The driver python may not always be the same as the executor python.
>>> You can set these using PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON
>>>
>>> The dependent libraries are not transferred by spark in any way unless
>>> you do a --py-files or .addPyFile()
>>>
>>> Could you try this:
>>> *import sys; print(sys.prefix)*
>>>
>>> on the driver, and also run this inside a UDF with:
>>>
>>> *def dummy(a):*
>>> *import sys; raise AssertionError(sys.prefix)*
>>>
>>> and get the traceback exception on the driver ?
>>> This would be the best way to get the exact sys.prefix (python path) for
>>> both the executors and driver.
>>>
>>> Also, could you elaborate on what environment is this ?
>>> Linux? - CentOS/Ubuntu/etc. ?
>>> How was the py 2.6.6 installed ?
>>> How was the py 2.7.5 venv created, and how was the base py 2.7.5
>>> installed?
>>>
>>> Also, how are you creating the Spark Session in jupyter ?
>>>
>>>
>>> On Wed, Sep 11, 2019 at 7:33 PM Dhrubajyoti Hati 
>>> wrote:
>>>
>>>> But would that be the case for multiple tasks running on the same worker,
>>>> when both the tasks are running in client mode? Whatever is true should be true
>>>> for both or for neither. As mentioned earlier, all the confs are the same. I
>>>> have checked and compared each conf.
>>>>
>>>> As Abdeali mentioned, it must be because of the way libraries are set up in both
>>>> the environments. Also, I verified by running the same script in the jupyter
>>>> environment and was able to get the same result using the normal script
>>>> which I was running with spark-submit.
>>>>
>>>> Currently I am searching for the ways the python packages are
>>>> transferred from the driver to the spark cluster in client mode. Any info on that
>>>> topic would be helpful.
>>>>
>>>> Thanks!
>>>>
>>>>
>>>>
>>>> On Wed, 11 Sep, 2019, 7:06 PM Patrick McCarthy, <
>>>> pmccar...@dstillery.com> wrote:
>>>>
>>>>> Are you running in cluster mode? A large virtualenv zip for the driver
>>>>> sent into the cluster on a slow pipe could account for much of that eight
>>>>> minutes.
>>>>>
>>>>> On Wed, Sep 11, 2019 at 3:17 AM Dhrubajyoti Hati <
>>>>> dhruba.w...@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I just ran the same script in a shell in jupyter notebook and find
>>>>>> the performance to be similar. So I can confirm this is happening because
>>>>>> the jupyter notebook python (and the libraries it uses) is different from
>>>>>> the spark-submit python.

Re: script running in jupyter 6-7x faster than spark submit

2019-09-11 Thread Abdeali Kothari
The driver python may not always be the same as the executor python.
You can set these using PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON

The dependent libraries are not transferred by spark in any way unless you
do a --py-files or .addPyFile()

Could you try this:
*import sys; print(sys.prefix)*

on the driver, and also run this inside a UDF with:

*def dummy(a):*
*import sys; raise AssertionError(sys.prefix)*

and get the traceback exception on the driver ?
This would be the best way to get the exact sys.prefix (python path) for
both the executors and driver.

Also, could you elaborate on what environment this is?
Linux? - CentOS/Ubuntu/etc.?
How was the py 2.6.6 installed?
How was the py 2.7.5 venv created, and how was the base py 2.7.5 installed?

Also, how are you creating the Spark Session in jupyter?
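
A self-contained sketch of the two checks suggested above, assuming only that a
SparkSession can be created; the one-column test DataFrame and the column name
`a` are placeholders.

```
import sys

from pyspark.sql import SparkSession, functions as F, types as T

spark = SparkSession.builder.getOrCreate()

# Driver-side python environment.
print("driver sys.prefix:", sys.prefix)

# Executor-side python environment: raise it from inside a UDF so the path
# comes back to the driver embedded in the error message.
def dummy(a):
    import sys
    raise AssertionError("executor sys.prefix: " + sys.prefix)

df = spark.createDataFrame([(1,)], ["a"])
try:
    df.withColumn("b", F.udf(dummy, T.StringType())("a")).collect()
except Exception as e:
    # The AssertionError text (with the executor's sys.prefix) is part of the
    # traceback that surfaces here on the driver.
    print(e)
```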


On Wed, Sep 11, 2019 at 7:33 PM Dhrubajyoti Hati 
wrote:

> But would that be the case for multiple tasks running on the same worker,
> when both the tasks are running in client mode? Whatever is true should be true for
> both or for neither. As mentioned earlier, all the confs are the same. I have
> checked and compared each conf.
>
> As Abdeali mentioned, it must be because of the way libraries are set up in both the
> environments. Also, I verified by running the same script in the jupyter
> environment and was able to get the same result using the normal script
> which I was running with spark-submit.
>
> Currently I am searching for the ways the python packages are transferred
> from the driver to the spark cluster in client mode. Any info on that topic would
> be helpful.
>
> Thanks!
>
>
>
> On Wed, 11 Sep, 2019, 7:06 PM Patrick McCarthy, 
> wrote:
>
>> Are you running in cluster mode? A large virtualenv zip for the driver
>> sent into the cluster on a slow pipe could account for much of that eight
>> minutes.
>>
>> On Wed, Sep 11, 2019 at 3:17 AM Dhrubajyoti Hati 
>> wrote:
>>
>>> Hi,
>>>
>>> I just ran the same script in a shell in jupyter notebook and found the
>>> performance to be similar. So I can confirm this is happening because the
>>> jupyter notebook python (and the libraries it uses) is different from the
>>> spark-submit python.
>>>
>>> But now I have a follow-up question. Are the dependent libraries in a
>>> python script also transferred to the worker machines when executing a
>>> python script in spark? Because even though the driver python versions are
>>> different, the worker machines will use their own python environment to
>>> run the code. If anyone can explain this part, it would be helpful.
>>>
>>>
>>>
>>>
>>> Regards, Dhrubajyoti Hati. Mob No: 9886428028/9652029028
>>>
>>>
>>> On Wed, Sep 11, 2019 at 9:45 AM Dhrubajyoti Hati 
>>> wrote:
>>>
>>>> Just checked from where the script is submitted, i.e. wrt the driver: the
>>>> python envs are different. The Jupyter one is running within a virtual
>>>> environment which is Python 2.7.5, and the spark-submit one uses 2.6.6. But
>>>> the executors have the same python version, right? I tried doing a
>>>> spark-submit from the jupyter shell; it fails because python 2.7 is not
>>>> there and hence throws an error.
>>>>
>>>> Here is the udf which might take time:
>>>>
>>>> import base64
>>>> import zlib
>>>>
>>>> def decompress(data):
>>>>     bytecode = base64.b64decode(data)
>>>>     d = zlib.decompressobj(32 + zlib.MAX_WBITS)
>>>>     decompressed_data = d.decompress(bytecode)
>>>>     return decompressed_data.decode('utf-8')
>>>>
>>>>
>>>> Could this be because of the mismatch between the two python environments on the
>>>> driver side? But the processing happens on the executor side?
>>>>
>>>>
>>>>
>>>>
>>>> Regards, Dhrub
>>>>
>>>> On Wed, Sep 11, 2019 at 8:59 AM Abdeali Kothari <
>>>> abdealikoth...@gmail.com> wrote:
>>>>
>>>>> Maybe you can try running it in a python shell or
>>>>> jupyter-console/ipython instead of a spark-submit and check how much time
>>>>> it takes too.
>>>>>
>>>>> Compare the env variables to check that no additional env
>>>>> configuration is present in either environment.
>>>>>
>>>>> Also is the python environment for both the exact same? I ask because
>>>>> it looks like you're using a UDF, and if the Jupyter python has (let's say)
>>>>> numpy compiled with blas, it would be faster than a numpy without it.

Re: script running in jupyter 6-7x faster than spark submit

2019-09-10 Thread Abdeali Kothari
Maybe you can try running it in a python shell or jupyter-console/ipython
instead of a spark-submit and check how much time it takes too.

Compare the env variables to check that no additional env configuration is
present in either environment.

Also is the python environment for both the exact same? I ask because it
looks like you're using a UDF and if the Jupyter python has (let's say)
numpy compiled with blas it would be faster than a numpy without it. Etc.
I.E. Some library you use may be using pure python and another may be using
a faster C extension...

What python libraries are you using in the UDFs? It you don't use UDFs at
all and use some very simple pure spark functions does the time difference
still exist?

Also, are you using dynamic allocation or some similar spark config which
could vary performance between runs because the same resources were not
utilized on Jupyter / spark-submit?


On Wed, Sep 11, 2019, 08:43 Stephen Boesch  wrote:

> Sounds like you have done your homework to properly compare. I'm
> guessing the answer to the following is yes, but in any case: are they
> both running against the same spark cluster with the same configuration
> parameters, especially executor memory and number of workers?
>
> On Tue, Sep 10, 2019 at 8:05 PM Dhrubajyoti Hati <
> dhruba.w...@gmail.com> wrote:
>
>> No, I checked for that, hence I wrote "brand new" jupyter notebook. Also,
>> the times taken by the two are 30 mins and ~3 hrs, as I am reading 500 gigs of
>> compressed base64 encoded text data from a hive table and decompressing and
>> decoding it in one of the udfs. Also, the time compared is from the Spark UI, not
>> how long the job actually takes after submission. It's just the running time
>> I am comparing/mentioning.
>>
>> As mentioned earlier, all the spark conf params match in the two scripts,
>> and that's why I am puzzled about what is going on.
>>
>> On Wed, 11 Sep, 2019, 12:44 AM Patrick McCarthy, 
>> wrote:
>>
>>> It's not obvious from what you pasted, but perhaps the jupyter notebook
>>> already is connected to a running spark context, while spark-submit needs
>>> to get a new spot in the (YARN?) queue.
>>>
>>> I would check the cluster job IDs for both to ensure you're getting new
>>> cluster tasks for each.
>>>
>>> On Tue, Sep 10, 2019 at 2:33 PM Dhrubajyoti Hati 
>>> wrote:
>>>
 Hi,

 I am facing a weird behaviour while running a python script. Here is
 what the code looks like mostly:

 def fn1(ip):
     # some code...
     ...

 def fn2(row):
     # ...
     # some operations
     ...
     return row1


 udf_fn1 = udf(fn1)
 cdf = spark.read.table("")  # hive table is of size > 500 Gigs with ~4500 partitions
 ddf = cdf.withColumn("coly", udf_fn1(cdf.colz)) \
     .drop("colz") \
     .withColumnRenamed("colz", "coly")

 edf = ddf \
     .filter(ddf.colp == 'some_value') \
     .rdd.map(lambda row: fn2(row)) \
     .toDF()

 print edf.count()  # simple way for the performance test on both platforms

 Now when I run the same code in a brand new jupyter notebook it runs 6x
 faster than when I run this python script using spark-submit. The
 configurations are printed and compared from both the platforms and they
 are exactly the same. I even tried to run this script in a single cell of a jupyter
 notebook and still got the same performance. I need to understand if I am
 missing something in the spark-submit which is causing the issue.  I tried
 to minimise the script to reproduce the same error without much code.

 Both are run in client mode on a yarn based spark cluster. The machines
 from which both are executed are also the same, and from the same user.

 What I found is that the median quantile value for the one run with
 jupyter was 1.3 mins and for the one run with spark-submit was ~8.5 mins. I am not
 able to figure out why this is happening.

 Has anyone faced this kind of issue before, or does anyone know how to resolve it?

 *Regards,*
 *Dhrub*

>>>
>>>
>>> --
>>>
>>>
>>> *Patrick McCarthy  *
>>>
>>> Senior Data Scientist, Machine Learning Engineering
>>>
>>> Dstillery
>>>
>>> 470 Park Ave South, 17th Floor, NYC 10016
>>>
>>


Re: Resolving all JIRAs affecting EOL releases

2019-05-15 Thread Abdeali Kothari
I was thinking that getting an estimate of the number of issues
that would be closed if this is done would help.

Open issues: 3882 (project = SPARK AND status in (Open, "In Progress",
Reopened))
Open + Does not affect 3.0+ = 2795
Open + Does not affect 2.4+ = 2373
Open + Does not affect 2.3+ = 1765
Open + Does not affect 2.2+ = 1322
Open + Does not affect 2.1+ = 967
Open + Does not affect 2.0+ = 651

Open + Does not affect 2.0+ + Priority in (Urgent, Blocker, Critical, High)
[JQL1] = 838
Open + Does not affect 2.0+ + Priority in (Urgent, Blocker, Critical, High,
Major) = 206
Open + Does not affect 2.2+ + Priority not in (Urgent, Blocker, Critical,
High) [JQL2] = 1303
Open + Does not affect 2.2+ + Priority not in (Urgent, Blocker, Critical,
High, Major) = 397
Open + Does not affect 2.3+ + Priority not in (Urgent, Blocker, Critical,
High) = 1743
Open + Does not affect 2.3+ + Priority not in (Urgent, Blocker, Critical,
High, Major) = 550

Resolving ALL seems a bit overkill to me.
My current opinion is:
 - Resolving "Open + Does not affect 2.0+" is something that should be
done, as I doubt anyone would be looking at the 1.x versions anymore (651
tasks)
 - Resolving "Open + Does not affect 2.3+ + Priority not in (Urgent,
Blocker, Critical, High, Major)" may be a good idea (an additional ~1k
tasks)
The issues with priority Urgent/Blocker/Critical should be triaged - as they
may contain something important.


[JQL1]:
project = SPARK
 AND status in (Open, "In Progress", Reopened)
 AND NOT (affectedVersion in versionMatch("^[2-3].*"))
 AND priority NOT IN (Urgent, Blocker, Critical, High)

[JQL2]:
project = SPARK
 AND status in (Open, "In Progress", Reopened)
 AND NOT (affectedVersion in versionMatch("^3.*") OR affectedVersion in
versionMatch("^2.4.*") OR affectedVersion in versionMatch("^2.3.*") OR
affectedVersion in versionMatch("^2.2.*"))
 AND priority NOT IN (Urgent, Blocker, Critical, High)


On Wed, May 15, 2019, 14:55 Hyukjin Kwon  wrote:

> Hi all,
>
> I would like to propose to resolve all JIRAs that affect EOL releases -
> 2.2 and below - or that have no affected version
> specified. I was rather against this approach and considered it a last
> resort roughly 3 years ago
> when we discussed it. Now I think we should go ahead with this. See below.
>
> I have been taking care of this for a long time, almost every day for those 3
> years. The number of JIRAs
> keeps increasing and it never goes down. Now the number is going over
> 2500 JIRAs.
> Did you guys know? In JIRA, we can only go through page by page up to 1000
> items. So, currently we even
> have difficulties going through every JIRA. We have to manually filter
> out and check each one.
> The number is growing beyond a manageable size.
>
> I am not suggesting this without actually trying anything. This is what we
> have tried, within my visibility:
>
>   1. Roughly 3 years ago, Sean tried to gather committers and even
> non-committers to sort
> out this number. At that time, we were only able to keep this number
> as is. After we lost this momentum,
> it kept increasing again.
>   2. I scanned _all_ the previous JIRAs more than two
> times and resolved them, roughly
> once a year. The rest of them are mostly obsolete but don't have enough
> information to investigate further.
>   3. I strictly stick to "Contributing to JIRA Maintenance"
> https://spark.apache.org/contributing.html and
> resolve JIRAs.
>   4. Promoting other people to comment on JIRA or actively resolve them.
>
> One of the facts I realised is that the increasing number of committers doesn't
> really help this much (although
> it might be helpful if somebody active in JIRA becomes a committer.)
>
> One of the important things I should note is that it's now pretty
> difficult to reproduce and test the
> issues found in EOL releases. We have to git clone, checkout, build and
> test, and then see if that issue
> still exists upstream, and fix it. This is non-trivial overhead.
>
> Therefore, I would like to propose resolving _all_ the JIRAs that target
> EOL releases - 2.2 and below.
> Please let me know if anyone has some concerns or objections.
>
> Thanks.
>


Re: PySpark syntax vs Pandas syntax

2019-03-26 Thread Abdeali Kothari
Nice, will test it out +1

On Tue, Mar 26, 2019, 22:38 Reynold Xin  wrote:

> We just made the repo public: https://github.com/databricks/spark-pandas
>
>
> On Tue, Mar 26, 2019 at 1:20 AM, Timothee Hunter  wrote:
>
>> To add more details to what Reynold mentioned: as you said, there are
>> going to be some slight differences between Pandas and Spark in
>> any case, simply because Spark needs to know the return types of the
>> functions. In your case, you would need to slightly refactor your apply
>> method to the following (in python 3) to add type hints:
>>
>> ```
>> def f(x) -> float: return x * 3.0
>> df['col3'] = df['col1'].apply(f)
>> ```
>>
>> This has the benefit of keeping your code fully compliant with both
>> pandas and pyspark. We will share more information in the future.
>>
>> Tim
>>
>> On Tue, Mar 26, 2019 at 8:08 AM Hyukjin Kwon  wrote:
>>
>> BTW, I am working on the documentation related with this subject at
>> https://issues.apache.org/jira/browse/SPARK-26022 to describe the
>> difference
>>
>> On Tue, Mar 26, 2019 at 3:34 PM, Reynold Xin wrote:
>>
>> We have some early stuff there but not quite ready to talk about it in
>> public yet (I hope soon though). Will shoot you a separate email on it.
>>
>> On Mon, Mar 25, 2019 at 11:32 PM Abdeali Kothari <
>> abdealikoth...@gmail.com> wrote:
>>
>> Thanks for the reply Reynold - Has this shim project started ?
>> I'd love to contribute to it - as it looks like I have started making a
>> bunch of helper functions to do something similar for my current task and
>> would prefer not doing it in isolation.
>> Was considering making a git repo and pushing stuff there just today
>> morning. But if there's already folks working on it - I'd prefer
>> collaborating.
>>
>> Note - I'm not recommending we make the logical plan mutable (as I am
>> scared of that too!). I think there are other ways of handling that - but
>> we can go into details later.
>>
>> On Tue, Mar 26, 2019 at 11:58 AM Reynold Xin  wrote:
>>
>> We have been thinking about some of these issues. Some of them are harder
>> to do, e.g. Spark DataFrames are fundamentally immutable, and making the
>> logical plan mutable is a significant deviation from the current paradigm
>> that might confuse the hell out of some users. We are considering building
>> a shim layer as a separate project on top of Spark (so we can make rapid
>> releases based on feedback) just to test this out and see how well it could
>> work in practice.
>>
>> On Mon, Mar 25, 2019 at 11:04 PM Abdeali Kothari <
>> abdealikoth...@gmail.com> wrote:
>>
>> Hi,
>> I was doing some spark to pandas (and vice versa) conversion because some
>> of the pandas code we have doesn't work on huge data. And some spark code
>> works very slowly on small data.
>>
>> It was nice to see that pyspark had some similar syntax for the common
>> pandas operations that the python community is used to.
>>
>> GroupBy aggs: df.groupby(['col2']).agg({'col2': 'count'}).show()
>> Column selects: df[['col1', 'col2']]
>> Row Filters: df[df['col1'] < 3.0]
>>
>> I was wondering about a bunch of other functions in pandas which seemed
>> common. And thought there must've been a discussion about it in the
>> community - hence started this thread.
>>
>> I was wondering whether there has been discussion on adding the following
>> functions:
>>
>> *Column setters*:
>> In Pandas:
>> df['col3'] = df['col1'] * 3.0
>> While I do the following in PySpark:
>> df = df.withColumn('col3', df['col1'] * 3.0)
>>
>> *Column apply()*:
>> In Pandas:
>> df['col3'] = df['col1'].apply(lambda x: x * 3.0)
>> While I do the following in PySpark:
>> df = df.withColumn('col3', F.udf(lambda x: x * 3.0, 'float')(df['col1']))
>>
>> I understand that this one cannot be as simple as in pandas due to the
>> output-type that's needed here. But could be done like:
>> df['col3'] = df['col1'].apply((lambda x: x * 3.0), 'float')
>>
>> Multi column in pandas is:
>> df['col3'] = df[['col1', 'col2']].apply(lambda x: x.col1 * 3.0)
>> Maybe this can be done in pyspark as or if we can send a pyspark.sql.Row
>> directly it would be similar (?):
>> df['col3'] = df[['col1', 'col2']].apply((lambda col1, col2: col1 * 3.0),
>> 'float')
>>
>> *Rename*:
>> In Pandas:
>> df.rename(columns={...})
>> While I do the following in PySpark:
>> df.toDF(*[{'col2': 'col3'}.get(i, i) for i in df.columns])
>>
>> *To Dictionary*:
>> In Pandas:
>> df.to_dict(orient='list')
>> While I do the following in PySpark:
>> {f.name: [row[i] for row in df.collect()] for i, f in
>> enumerate(df.schema.fields)}
>>
>> I thought I'd start the discussion with these and come back to some of
>> the others I see that could be helpful.
>>
>> *Note*: (with the column functions in mind) I understand the concept of
>> the DataFrame cannot be modified. And I am not suggesting we change that
>> nor any underlying principle. Just trying to add syntactic sugar here.
>>
>>
>


Re: PySpark syntax vs Pandas syntax

2019-03-26 Thread Abdeali Kothari
Thanks for the reply Reynold - Has this shim project started ?
I'd love to contribute to it - as it looks like I have started making a
bunch of helper functions to do something similar for my current task and
would prefer not doing it in isolation.
Was considering making a git repo and pushing stuff there just today
morning. But if there's already folks working on it - I'd prefer
collaborating.

Note - I'm not recommending we make the logical plan mutable (as I am
scared of that too!). I think there are other ways of handling that - but
we can go into details later.

On Tue, Mar 26, 2019 at 11:58 AM Reynold Xin  wrote:

> We have been thinking about some of these issues. Some of them are harder
> to do, e.g. Spark DataFrames are fundamentally immutable, and making the
> logical plan mutable is a significant deviation from the current paradigm
> that might confuse the hell out of some users. We are considering building
> a shim layer as a separate project on top of Spark (so we can make rapid
> releases based on feedback) just to test this out and see how well it could
> work in practice.
>
> On Mon, Mar 25, 2019 at 11:04 PM Abdeali Kothari 
> wrote:
>
>> Hi,
>> I was doing some spark to pandas (and vice versa) conversion because some
>> of the pandas code we have doesn't work on huge data. And some spark code
>> works very slowly on small data.
>>
>> It was nice to see that pyspark had some similar syntax for the common
>> pandas operations that the python community is used to.
>>
>> GroupBy aggs: df.groupby(['col2']).agg({'col2': 'count'}).show()
>> Column selects: df[['col1', 'col2']]
>> Row Filters: df[df['col1'] < 3.0]
>>
>> I was wondering about a bunch of other functions in pandas which seemed
>> common. And thought there must've been a discussion about it in the
>> community - hence started this thread.
>>
>> I was wondering whether there has been discussion on adding the following
>> functions:
>>
>> *Column setters*:
>> In Pandas:
>> df['col3'] = df['col1'] * 3.0
>> While I do the following in PySpark:
>> df = df.withColumn('col3', df['col1'] * 3.0)
>>
>> *Column apply()*:
>> In Pandas:
>> df['col3'] = df['col1'].apply(lambda x: x * 3.0)
>> While I do the following in PySpark:
>> df = df.withColumn('col3', F.udf(lambda x: x * 3.0, 'float')(df['col1']))
>>
>> I understand that this one cannot be as simple as in pandas due to the
>> output-type that's needed here. But could be done like:
>> df['col3'] = df['col1'].apply((lambda x: x * 3.0), 'float')
>>
>> Multi column in pandas is:
>> df['col3'] = df[['col1', 'col2']].apply(lambda x: x.col1 * 3.0)
>> Maybe this can be done in pyspark as or if we can send a pyspark.sql.Row
>> directly it would be similar (?):
>> df['col3'] = df[['col1', 'col2']].apply((lambda col1, col2: col1 * 3.0),
>> 'float')
>>
>> *Rename*:
>> In Pandas:
>> df.rename(columns={...})
>> While I do the following in PySpark:
>> df.toDF(*[{'col2': 'col3'}.get(i, i) for i in df.columns])
>>
>> *To Dictionary*:
>> In Pandas:
>> df.to_dict(orient='list')
>> While I do the following in PySpark:
>> {f.name: [row[i] for row in df.collect()] for i, f in
>> enumerate(df.schema.fields)}
>>
>> I thought I'd start the discussion with these and come back to some of
>> the others I see that could be helpful.
>>
>> *Note*: (with the column functions in mind) I understand the concept of
>> the DataFrame cannot be modified. And I am not suggesting we change that
>> nor any underlying principle. Just trying to add syntactic sugar here.
>>
>>


PySpark syntax vs Pandas syntax

2019-03-26 Thread Abdeali Kothari
Hi,
I was doing some spark to pandas (and vice versa) conversion because some
of the pandas code we have doesn't work on huge data. And some spark code
works very slowly on small data.

It was nice to see that pyspark had some similar syntax for the common
pandas operations that the python community is used to.

GroupBy aggs: df.groupby(['col2']).agg({'col2': 'count'}).show()
Column selects: df[['col1', 'col2']]
Row Filters: df[df['col1'] < 3.0]

I was wondering about a bunch of other functions in pandas which seemed
common. And I thought there must've been a discussion about it in the
community - hence I started this thread.

I was wondering whether there has been discussion on adding the following
functions:

*Column setters*:
In Pandas:
df['col3'] = df['col1'] * 3.0
While I do the following in PySpark:
df = df.withColumn('col3', df['col1'] * 3.0)

*Column apply()*:
In Pandas:
df['col3'] = df['col1'].apply(lambda x: x * 3.0)
While I do the following in PySpark:
df = df.withColumn('col3', F.udf(lambda x: x * 3.0, 'float')(df['col1']))

I understand that this one cannot be as simple as in pandas due to the
output-type that's needed here. But it could be done like:
df['col3'] = df['col1'].apply((lambda x: x * 3.0), 'float')

Multi column in pandas is:
df['col3'] = df[['col1', 'col2']].apply(lambda x: x.col1 * 3.0)
Maybe this can be done in pyspark the same way, or if we can send a pyspark.sql.Row
directly it would be similar (?):
df['col3'] = df[['col1', 'col2']].apply((lambda col1, col2: col1 * 3.0),
'float')

*Rename*:
In Pandas:
df.rename(columns={...})
While I do the following in PySpark:
df.toDF(*[{'col2': 'col3'}.get(i, i) for i in df.columns])

*To Dictionary*:
In Pandas:
df.to_dict(orient='list')
While I do the following in PySpark:
{f.name: [row[i] for row in df.collect()] for i, f in
enumerate(df.schema.fields)}

I thought I'd start the discussion with these and come back to some of the
others I see that could be helpful.

*Note*: (with the column functions in mind) I understand that, by design, the
DataFrame cannot be modified. And I am not suggesting we change that nor
any underlying principle. I am just trying to add syntactic sugar here.
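
As a small illustration of the kind of syntactic sugar / shim layer discussed
earlier in this thread, a minimal sketch of a wrapper that emulates the
pandas-style column setter on top of an immutable Spark DataFrame; the
`MutableFrame` name and behaviour are assumptions for illustration, not the
databricks/spark-pandas API.

```
from pyspark.sql import SparkSession

class MutableFrame:
    """Thin wrapper that forwards to a Spark DataFrame but allows
    pandas-style item assignment by rebinding the wrapped DataFrame."""

    def __init__(self, df):
        self._df = df

    def __getitem__(self, key):
        # df['a'] -> Column, df[['a', 'b']] -> select, df[bool_col] -> filter
        return self._df[key]

    def __setitem__(self, name, col):
        # Emulate df['c'] = ... without mutating the underlying (immutable) plan.
        self._df = self._df.withColumn(name, col)

    def __getattr__(self, name):
        # Everything else (show, groupBy, columns, ...) falls through.
        return getattr(self._df, name)

spark = SparkSession.builder.getOrCreate()
df = MutableFrame(spark.createDataFrame([(1, 10.0), (2, 20.0)], ["id", "col1"]))
df["col3"] = df["col1"] * 3.0   # pandas-style column setter
df.show()
```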


Accumulator issues in PySpark

2018-09-25 Thread Abdeali Kothari
I was trying to check out accumulators and see if I could use them for
anything.
I made a demo program and could not figure out how to add them up.

I found that I need to do a shuffle between all my python UDFs that I am
running for the accumulators to be run. Basically, if I do 5 withColumn()
with Python UDFs, I find the accumulator's value gets added only for the
last UDF I run before my action.

Here is a snippet to reproduce with Spark 2.3.2:

import pyspark
from pyspark.sql import SparkSession, Row
from pyspark.sql import functions as F
from pyspark.sql import types as T

from pyspark import AccumulatorParam

spark = SparkSession.builder.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
test_accum = spark.sparkContext.accumulator(0.0)

SHUFFLE = False

def main(data):
    print(">>> Check0", test_accum.value)

    def test(x):
        global test_accum
        test_accum += 1.0
        return x

    print(">>> Check1", test_accum.value)

    def test2(x):
        global test_accum
        test_accum += 100.0
        return x

    print(">>> Check2", test_accum.value)
    func_udf = F.udf(test, T.DoubleType())
    print(">>> Check3", test_accum.value)
    func_udf2 = F.udf(test2, T.DoubleType())
    print(">>> Check4", test_accum.value)

    data = data.withColumn("out1", func_udf(data["a"]))
    if SHUFFLE:
        data = data.repartition(2)
    print(">>> Check5", test_accum.value)
    data = data.withColumn("out2", func_udf2(data["b"]))
    if SHUFFLE:
        data = data.repartition(2)
    print(">>> Check6", test_accum.value)

    data.show()  # ACTION
    print(">>> Check7", test_accum.value)
    return data


df = spark.createDataFrame(
    [[1.0, 2.0]],
    schema=T.StructType([T.StructField(field_name, T.DoubleType(), True)
                         for field_name in ["a", "b"]]))

df2 = main(df)



 Output 1 - with SHUFFLE=False
...
# >>> Check7 100.0


 Output 2 - with SHUFFLE=True
...
# >>> Check7 101.0

Basically it looks like:
 - The accumulator works only for the last UDF before a shuffle-like operation
Not sure if this is a bug or expected behaviour.

Overall goal:
I'm trying to capture error messages from UDFs if they error out. The plan
is to try/except them, catch any error, save it to an accumulator, and continue
execution with `return None`.
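
A minimal sketch of that overall goal, with the per-action caveat above in mind:
a list-valued accumulator built from a custom AccumulatorParam collects error
strings from a try/except inside the UDF, which returns None on failure. The
column names and the division example are placeholders.

```
from pyspark import AccumulatorParam
from pyspark.sql import SparkSession, functions as F, types as T

class ListParam(AccumulatorParam):
    def zero(self, value):
        return []
    def addInPlace(self, v1, v2):
        v1.extend(v2)
        return v1

spark = SparkSession.builder.getOrCreate()
errors = spark.sparkContext.accumulator([], ListParam())

def safe_inverse(x):
    try:
        return 1.0 / x
    except Exception as e:
        errors.add(["%r -> %s" % (x, e)])  # record the failure and keep going
        return None

df = spark.createDataFrame([(1.0,), (0.0,), (4.0,)], ["a"])
out = df.withColumn("inv", F.udf(safe_inverse, T.DoubleType())("a"))
out.show()           # the action that actually runs the UDF
print(errors.value)  # e.g. ["0.0 -> float division by zero"]
```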


Accessing the SQL parser

2018-01-11 Thread Abdeali Kothari
I was writing some code to try to automatically find the list of tables and databases
being used in a SparkSQL query. Mainly I was looking to auto-check the
permissions and owners of all the tables a query will be trying to access.

I was wondering whether PySpark has some method for me to directly use the
AST that SparkSQL uses?

Or is there some documentation on how I can generate and understand the AST
in Spark?

Regards,
AbdealiJK
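
There is no public PySpark API for this, but one workaround seen in the wild is to
reach the JVM-side SQL parser through the private `_jsparkSession` handle. A
heavily hedged sketch follows: these are internal, unsupported interfaces, so
treat every call as an assumption to verify against the Spark version in use; the
query string is just an example.

```
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
query = "SELECT t1.a, t2.b FROM db1.table1 t1 JOIN db2.table2 t2 ON t1.id = t2.id"

# WARNING: _jsparkSession, sessionState() and sqlParser() are internal APIs and
# may change between releases (assumption: they behave this way on your version).
parsed = spark._jsparkSession.sessionState().sqlParser().parsePlan(query)

# The unresolved logical plan; UnresolvedRelation nodes in this tree carry the
# database/table identifiers referenced by the query.
print(parsed.treeString())
```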