But would it be the case for multiple tasks running on the same worker and
also both the tasks are running in client mode, so the one true is true for
both or for neither. As mentioned earlier all the confs are same. I have
checked and compared each conf.

As Abdeali mentioned it must be because the  way libraries are in both the
environments. Also i verified by running the same script for jupyter
environment and was able to get the same result using the normal script
which i was running with spark-submit.

Currently i am searching for the ways the python packages are transferred
from driver to spark cluster in client mode. Any info on that topic would
be helpful.

Thanks!



On Wed, 11 Sep, 2019, 7:06 PM Patrick McCarthy, <pmccar...@dstillery.com>
wrote:

> Are you running in cluster mode? A large virtualenv zip for the driver
> sent into the cluster on a slow pipe could account for much of that eight
> minutes.
>
> On Wed, Sep 11, 2019 at 3:17 AM Dhrubajyoti Hati <dhruba.w...@gmail.com>
> wrote:
>
>> Hi,
>>
>> I just ran the same script in a shell in jupyter notebook and find the
>> performance to be similar. So I can confirm this is because the libraries
>> used jupyter notebook python is different than the spark-submit python this
>> is happening.
>>
>> But now I have a following question. Are the dependent libraries in a
>> python script also transferred to the worker machines when executing a
>> python script in spark. Because though the driver python versions are
>> different, the workers machines will use their same python environment to
>> run the code. If anyone can explain this part, it would be helpful.
>>
>>
>>
>>
>> *Regards,Dhrubajyoti Hati.Mob No: 9886428028/9652029028*
>>
>>
>> On Wed, Sep 11, 2019 at 9:45 AM Dhrubajyoti Hati <dhruba.w...@gmail.com>
>> wrote:
>>
>>> Just checked from where the script is submitted i.e. wrt Driver, the
>>> python env are different. Jupyter one is running within a the virtual
>>> environment which is Python 2.7.5 and the spark-submit one uses 2.6.6. But
>>> the executors have the same python version right? I tried doing a
>>> spark-submit from jupyter shell, it fails to find python 2.7  which is not
>>> there hence throws error.
>>>
>>> Here is the udf which might take time:
>>>
>>> import base64
>>> import zlib
>>>
>>> def decompress(data):
>>>
>>>     bytecode = base64.b64decode(data)
>>>     d = zlib.decompressobj(32 + zlib.MAX_WBITS)
>>>     decompressed_data = d.decompress(bytecode )
>>>     return(decompressed_data.decode('utf-8'))
>>>
>>>
>>> Could this because of the two python environment mismatch from Driver side? 
>>> But the processing
>>>
>>> happens in the executor side?
>>>
>>>
>>>
>>>
>>> *Regards,Dhrub*
>>>
>>> On Wed, Sep 11, 2019 at 8:59 AM Abdeali Kothari <
>>> abdealikoth...@gmail.com> wrote:
>>>
>>>> Maybe you can try running it in a python shell or
>>>> jupyter-console/ipython instead of a spark-submit and check how much time
>>>> it takes too.
>>>>
>>>> Compare the env variables to check that no additional env configuration
>>>> is present in either environment.
>>>>
>>>> Also is the python environment for both the exact same? I ask because
>>>> it looks like you're using a UDF and if the Jupyter python has (let's say)
>>>> numpy compiled with blas it would be faster than a numpy without it. Etc.
>>>> I.E. Some library you use may be using pure python and another may be using
>>>> a faster C extension...
>>>>
>>>> What python libraries are you using in the UDFs? It you don't use UDFs
>>>> at all and use some very simple pure spark functions does the time
>>>> difference still exist?
>>>>
>>>> Also are you using dynamic allocation or some similar spark config
>>>> which could vary performance between runs because the same resources we're
>>>> not utilized on Jupyter / spark-submit?
>>>>
>>>>
>>>> On Wed, Sep 11, 2019, 08:43 Stephen Boesch <java...@gmail.com> wrote:
>>>>
>>>>> Sounds like you have done your homework to properly compare .   I'm
>>>>> guessing the answer to the following is yes .. but in any case:  are they
>>>>> both running against the same spark cluster with the same configuration
>>>>> parameters especially executor memory and number of workers?
>>>>>
>>>>> Am Di., 10. Sept. 2019 um 20:05 Uhr schrieb Dhrubajyoti Hati <
>>>>> dhruba.w...@gmail.com>:
>>>>>
>>>>>> No, i checked for that, hence written "brand new" jupyter notebook.
>>>>>> Also the time taken by both are 30 mins and ~3hrs as i am reading a 500
>>>>>> gigs compressed base64 encoded text data from a hive table and
>>>>>> decompressing and decoding in one of the udfs. Also the time compared is
>>>>>> from Spark UI not  how long the job actually takes after submission. Its
>>>>>> just the running time i am comparing/mentioning.
>>>>>>
>>>>>> As mentioned earlier, all the spark conf params even match in two
>>>>>> scripts and that's why i am puzzled what going on.
>>>>>>
>>>>>> On Wed, 11 Sep, 2019, 12:44 AM Patrick McCarthy, <
>>>>>> pmccar...@dstillery.com> wrote:
>>>>>>
>>>>>>> It's not obvious from what you pasted, but perhaps the juypter
>>>>>>> notebook already is connected to a running spark context, while
>>>>>>> spark-submit needs to get a new spot in the (YARN?) queue.
>>>>>>>
>>>>>>> I would check the cluster job IDs for both to ensure you're getting
>>>>>>> new cluster tasks for each.
>>>>>>>
>>>>>>> On Tue, Sep 10, 2019 at 2:33 PM Dhrubajyoti Hati <
>>>>>>> dhruba.w...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> I am facing a weird behaviour while running a python script. Here
>>>>>>>> is what the code looks like mostly:
>>>>>>>>
>>>>>>>> def fn1(ip):
>>>>>>>>    some code...
>>>>>>>>     ...
>>>>>>>>
>>>>>>>> def fn2(row):
>>>>>>>>     ...
>>>>>>>>     some operations
>>>>>>>>     ...
>>>>>>>>     return row1
>>>>>>>>
>>>>>>>>
>>>>>>>> udf_fn1 = udf(fn1)
>>>>>>>> cdf = spark.read.table("xxxx") //hive table is of size > 500 Gigs
>>>>>>>> with ~4500 partitions
>>>>>>>> ddf = cdf.withColumn("coly", udf_fn1(cdf.colz)) \
>>>>>>>>     .drop("colz") \
>>>>>>>>     .withColumnRenamed("colz", "coly")
>>>>>>>>
>>>>>>>> edf = ddf \
>>>>>>>>     .filter(ddf.colp == 'some_value') \
>>>>>>>>     .rdd.map(lambda row: fn2(row)) \
>>>>>>>>     .toDF()
>>>>>>>>
>>>>>>>> print edf.count() // simple way for the performance test in both
>>>>>>>> platforms
>>>>>>>>
>>>>>>>> Now when I run the same code in a brand new jupyter notebook it
>>>>>>>> runs 6x faster than when I run this python script using spark-submit. 
>>>>>>>> The
>>>>>>>> configurations are printed and  compared from both the platforms and 
>>>>>>>> they
>>>>>>>> are exact same. I even tried to run this script in a single cell of 
>>>>>>>> jupyter
>>>>>>>> notebook and still have the same performance. I need to understand if 
>>>>>>>> I am
>>>>>>>> missing something in the spark-submit which is causing the issue.  I 
>>>>>>>> tried
>>>>>>>> to minimise the script to reproduce the same error without much code.
>>>>>>>>
>>>>>>>> Both are run in client mode on a yarn based spark cluster. The
>>>>>>>> machines from which both are executed are also the same and from same 
>>>>>>>> user.
>>>>>>>>
>>>>>>>> What i found is the  the quantile values for median for one ran
>>>>>>>> with jupyter was 1.3 mins and one ran with spark-submit was ~8.5 mins. 
>>>>>>>>  I
>>>>>>>> am not able to figure out why this is happening.
>>>>>>>>
>>>>>>>> Any one faced this kind of issue before or know how to resolve this?
>>>>>>>>
>>>>>>>> *Regards,*
>>>>>>>> *Dhrub*
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>>
>>>>>>>
>>>>>>> *Patrick McCarthy  *
>>>>>>>
>>>>>>> Senior Data Scientist, Machine Learning Engineering
>>>>>>>
>>>>>>> Dstillery
>>>>>>>
>>>>>>> 470 Park Ave South, 17th Floor, NYC 10016
>>>>>>>
>>>>>>
>
> --
>
>
> *Patrick McCarthy  *
>
> Senior Data Scientist, Machine Learning Engineering
>
> Dstillery
>
> 470 Park Ave South, 17th Floor, NYC 10016
>

Reply via email to