unsubscribe

2023-11-09 Thread Duflot Patrick
unsubscribe


Re: Spark join produce duplicate rows in resultset

2023-10-22 Thread Patrick Tucci
Hi Meena,

It's not impossible, but it's unlikely that there's a bug in Spark SQL
randomly duplicating rows. The most likely explanation is there are more
records in the item table that match your sys/custumer_id/scode criteria
than you expect.

In your original query, try changing select rev.* to select I.*. This will
show you the records from item that the join produces. If the first part of
the code only returns one record, I expect you will see 4 distinct records
returned here.

Thanks,

Patrick


On Sun, Oct 22, 2023 at 1:29 AM Meena Rajani  wrote:

> Hello all:
>
> I am using spark sql to join two tables. To my surprise I am
> getting redundant rows. What could be the cause.
>
>
> select rev.* from rev
> inner join customer c
> on rev.custumer_id =c.id
> inner join product p
> rev.sys = p.sys
> rev.prin = p.prin
> rev.scode= p.bcode
>
> left join item I
> on rev.sys = i.sys
> rev.custumer_id = I.custumer_id
> rev. scode = I.scode
>
> where rev.custumer_id = '123456789'
>
> The first part of the code brings one row
>
> select rev.* from rev
> inner join customer c
> on rev.custumer_id =c.id
> inner join product p
> rev.sys = p.sys
> rev.prin = p.prin
> rev.scode= p.bcode
>
>
> The  item has two rows which have common attributes  and the* final join
> should result in 2 rows. But I am seeing 4 rows instead.*
>
> left join item I
> on rev.sys = i.sys
> rev.custumer_id = I.custumer_id
> rev. scode = I.scode
>
>
>
> Regards,
> Meena
>
>
>


Re: Spark stand-alone mode

2023-09-19 Thread Patrick Tucci
Multiple applications can run at once, but you need to either configure
Spark or your applications to allow that. In stand-alone mode, each
application attempts to take all resources available by default. This
section of the documentation has more details:

https://spark.apache.org/docs/latest/spark-standalone.html#resource-scheduling

Explicitly setting the resources per application limits the resources to
the configured values for the lifetime of the application. You can use
dynamic allocation to allow Spark to scale the resources up and down per
application based on load, but the configuration is relatively more complex:

https://spark.apache.org/docs/latest/job-scheduling.html#dynamic-resource-allocation

On Mon, Sep 18, 2023 at 3:53 PM Ilango  wrote:

>
> Thanks all for your suggestions. Noted with thanks.
> Just wanted share few more details about the environment
> 1. We use NFS for data storage and data is in parquet format
> 2. All HPC nodes are connected and already work as a cluster for Studio
> workbench. I can setup password less SSH if it not exist already.
> 3. We will stick with NFS for now and stand alone then may be will explore
> HDFS and YARN.
>
> Can you please confirm whether multiple users can run spark jobs at the
> same time?
> If so I will start working on it and let you know how it goes
>
> Mich, the link to Hadoop is not working. Can you please check and let me
> know the correct link. Would like to explore Hadoop option as well.
>
>
>
> Thanks,
> Elango
>
> On Sat, Sep 16, 2023, 4:20 AM Bjørn Jørgensen 
> wrote:
>
>> you need to setup ssh without password, use key instead.  How to connect
>> without password using SSH (passwordless)
>> 
>>
>> fre. 15. sep. 2023 kl. 20:55 skrev Mich Talebzadeh <
>> mich.talebza...@gmail.com>:
>>
>>> Hi,
>>>
>>> Can these 4 nodes talk to each other through ssh as trusted hosts (on
>>> top of the network that Sean already mentioned)? Otherwise you need to set
>>> it up. You can install a LAN if you have another free port at the back of
>>> your HPC nodes. They should
>>>
>>> You ought to try to set up a Hadoop cluster pretty easily. Check this
>>> old article of mine for Hadoop set-up.
>>>
>>>
>>> https://www.linkedin.com/pulse/diy-festive-season-how-install-configure-big-data-so-mich/?trackingId=z7n5tx7tQOGK9tcG9VClkw%3D%3D
>>>
>>> Hadoop will provide you with a common storage layer (HDFS) that these
>>> nodes will be able to share and talk. Yarn is your best bet as the resource
>>> manager with reasonably powerful hosts you have. However, for now the Stand
>>> Alone mode will do. Make sure that the Metastore you choose, (by default it
>>> will use Hive Metastore called Derby :( ) is something respetable like
>>> Postgres DB that can handle multiple concurrent spark jobs
>>>
>>> HTH
>>>
>>>
>>> Mich Talebzadeh,
>>> Distinguished Technologist, Solutions Architect & Engineer
>>> London
>>> United Kingdom
>>>
>>>
>>>view my Linkedin profile
>>> 
>>>
>>>
>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>>
>>> On Fri, 15 Sept 2023 at 07:04, Ilango  wrote:
>>>

 Hi all,

 We have 4 HPC nodes and installed spark individually in all nodes.

 Spark is used as local mode(each driver/executor will have 8 cores and
 65 GB) in Sparklyr/pyspark using Rstudio/Posit workbench. Slurm is used as
 scheduler.

 As this is local mode, we are facing performance issue(as only one
 executor) when it comes dealing with large datasets.

 Can I convert this 4 nodes into spark standalone cluster. We dont have
 hadoop so yarn mode is out of scope.

 Shall I follow the official documentation for setting up standalone
 cluster. Will it work? Do I need to aware anything else?
 Can you please share your thoughts?

 Thanks,
 Elango

>>>
>>
>> --
>> Bjørn Jørgensen
>> Vestre Aspehaug 4, 6010 Ålesund
>> Norge
>>
>> +47 480 94 297
>>
>


Re: Spark stand-alone mode

2023-09-15 Thread Patrick Tucci
I use Spark in standalone mode. It works well, and the instructions on the
site are accurate for the most part. The only thing that didn't work for me
was the start_all.sh script. Instead, I use a simple script that starts the
master node, then uses SSH to connect to the worker machines and start the
worker nodes.

All the nodes will need access to the same data, so you will need some sort
of shared file system. You could use an NFS share mounted to the same point
on each machine, S3, or HDFS.

Standalone also acquires all resources when an application is submitted, so
by default only one application may be run at a time. You can limit the
resources allocated to each application to allow multiple concurrent
applications, or you could configure dynamic allocation to scale the
resources up and down per application as needed.

On Fri, Sep 15, 2023 at 5:56 AM Ilango  wrote:

>
> Hi all,
>
> We have 4 HPC nodes and installed spark individually in all nodes.
>
> Spark is used as local mode(each driver/executor will have 8 cores and 65
> GB) in Sparklyr/pyspark using Rstudio/Posit workbench. Slurm is used as
> scheduler.
>
> As this is local mode, we are facing performance issue(as only one
> executor) when it comes dealing with large datasets.
>
> Can I convert this 4 nodes into spark standalone cluster. We dont have
> hadoop so yarn mode is out of scope.
>
> Shall I follow the official documentation for setting up standalone
> cluster. Will it work? Do I need to aware anything else?
> Can you please share your thoughts?
>
> Thanks,
> Elango
>


Re: Spark-SQL - Query Hanging, How To Troubleshoot

2023-08-17 Thread Patrick Tucci
No, the driver memory was not set explicitly. So it was likely the default
value, which appears to be 1GB.

On Thu, Aug 17, 2023, 16:49 Mich Talebzadeh 
wrote:

> One question, what was the driver memory before setting it to 4G? Did you
> have it set at all before?
>
> HTH
>
> Mich Talebzadeh,
> Solutions Architect/Engineering Lead
> London
> United Kingdom
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Thu, 17 Aug 2023 at 21:01, Patrick Tucci 
> wrote:
>
>> Hi Mich,
>>
>> Here are my config values from spark-defaults.conf:
>>
>> spark.eventLog.enabled true
>> spark.eventLog.dir hdfs://10.0.50.1:8020/spark-logs
>> spark.history.provider org.apache.spark.deploy.history.FsHistoryProvider
>> spark.history.fs.logDirectory hdfs://10.0.50.1:8020/spark-logs
>> spark.history.fs.update.interval 10s
>> spark.history.ui.port 18080
>> spark.sql.warehouse.dir hdfs://10.0.50.1:8020/user/spark/warehouse
>> spark.executor.cores 4
>> spark.executor.memory 16000M
>> spark.sql.legacy.createHiveTableByDefault false
>> spark.driver.host 10.0.50.1
>> spark.scheduler.mode FAIR
>> spark.driver.memory 4g #added 2023-08-17
>>
>> The only application that runs on the cluster is the Spark Thrift server,
>> which I launch like so:
>>
>> ~/spark/sbin/start-thriftserver.sh --master spark://10.0.50.1:7077
>>
>> The cluster runs in standalone mode and does not use Yarn for resource
>> management. As a result, the Spark Thrift server acquires all available
>> cluster resources when it starts. This is okay; as of right now, I am the
>> only user of the cluster. If I add more users, they will also be SQL users,
>> submitting queries through the Thrift server.
>>
>> Let me know if you have any other questions or thoughts.
>>
>> Thanks,
>>
>> Patrick
>>
>> On Thu, Aug 17, 2023 at 3:09 PM Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> Hello Paatrick,
>>>
>>> As a matter of interest what parameters and their respective values do
>>> you use in spark-submit. I assume it is running in YARN mode.
>>>
>>> HTH
>>>
>>> Mich Talebzadeh,
>>> Solutions Architect/Engineering Lead
>>> London
>>> United Kingdom
>>>
>>>
>>>view my Linkedin profile
>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>
>>>
>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>>
>>> On Thu, 17 Aug 2023 at 19:36, Patrick Tucci 
>>> wrote:
>>>
>>>> Hi Mich,
>>>>
>>>> Yes, that's the sequence of events. I think the big breakthrough is
>>>> that (for now at least) Spark is throwing errors instead of the queries
>>>> hanging. Which is a big step forward. I can at least troubleshoot issues if
>>>> I know what they are.
>>>>
>>>> When I reflect on the issues I faced and the solutions, my issue may
>>>> have been driver memory all along. I just couldn't determine that was the
>>>> issue because I never saw any errors. In one case, converting a LEFT JOIN
>>>> to an inner JOIN caused the query to run. In another case, replacing a text
>>>> field with an int ID and JOINing on the ID column worked. Per your advice,
>>>> changing file formats from ORC to Parquet solved one issue. These
>>>> interventions could have changed the way Spark needed to broadcast data to
>>>> execute the query, thereby reducing demand on the memory-constrained 
>>>> driver.
>>>>
>>>> Fingers crossed this is the solution. I will rep

Re: Spark-SQL - Query Hanging, How To Troubleshoot

2023-08-17 Thread Patrick Tucci
Hi Mich,

Here are my config values from spark-defaults.conf:

spark.eventLog.enabled true
spark.eventLog.dir hdfs://10.0.50.1:8020/spark-logs
spark.history.provider org.apache.spark.deploy.history.FsHistoryProvider
spark.history.fs.logDirectory hdfs://10.0.50.1:8020/spark-logs
spark.history.fs.update.interval 10s
spark.history.ui.port 18080
spark.sql.warehouse.dir hdfs://10.0.50.1:8020/user/spark/warehouse
spark.executor.cores 4
spark.executor.memory 16000M
spark.sql.legacy.createHiveTableByDefault false
spark.driver.host 10.0.50.1
spark.scheduler.mode FAIR
spark.driver.memory 4g #added 2023-08-17

The only application that runs on the cluster is the Spark Thrift server,
which I launch like so:

~/spark/sbin/start-thriftserver.sh --master spark://10.0.50.1:7077

The cluster runs in standalone mode and does not use Yarn for resource
management. As a result, the Spark Thrift server acquires all available
cluster resources when it starts. This is okay; as of right now, I am the
only user of the cluster. If I add more users, they will also be SQL users,
submitting queries through the Thrift server.

Let me know if you have any other questions or thoughts.

Thanks,

Patrick

On Thu, Aug 17, 2023 at 3:09 PM Mich Talebzadeh 
wrote:

> Hello Paatrick,
>
> As a matter of interest what parameters and their respective values do
> you use in spark-submit. I assume it is running in YARN mode.
>
> HTH
>
> Mich Talebzadeh,
> Solutions Architect/Engineering Lead
> London
> United Kingdom
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Thu, 17 Aug 2023 at 19:36, Patrick Tucci 
> wrote:
>
>> Hi Mich,
>>
>> Yes, that's the sequence of events. I think the big breakthrough is that
>> (for now at least) Spark is throwing errors instead of the queries hanging.
>> Which is a big step forward. I can at least troubleshoot issues if I know
>> what they are.
>>
>> When I reflect on the issues I faced and the solutions, my issue may have
>> been driver memory all along. I just couldn't determine that was the issue
>> because I never saw any errors. In one case, converting a LEFT JOIN to an
>> inner JOIN caused the query to run. In another case, replacing a text field
>> with an int ID and JOINing on the ID column worked. Per your advice,
>> changing file formats from ORC to Parquet solved one issue. These
>> interventions could have changed the way Spark needed to broadcast data to
>> execute the query, thereby reducing demand on the memory-constrained driver.
>>
>> Fingers crossed this is the solution. I will reply to this thread if the
>> issue comes up again (hopefully it doesn't!).
>>
>> Thanks again,
>>
>> Patrick
>>
>> On Thu, Aug 17, 2023 at 1:54 PM Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> Hi Patrik,
>>>
>>> glad that you have managed to sort this problem out. Hopefully it will
>>> go away for good.
>>>
>>> Still we are in the dark about how this problem is going away and coming
>>> back :( As I recall the chronology of events were as follows:
>>>
>>>
>>>1. The Issue with hanging Spark job reported
>>>2. concurrency on Hive metastore (single threaded Derby DB) was
>>>identified as a possible cause
>>>3. You changed the underlying Hive table formats from ORC to Parquet
>>>and somehow it worked
>>>4. The issue was reported again
>>>5. You upgraded the spark version from 3.4.0 to 3.4.1 (as a possible
>>>underlying issue) and encountered driver memory limitation.
>>>6. you allocated more memory to the driver and it is running ok for
>>>now,
>>>7. It appears that you are doing some join between a large dataset
>>>and a smaller dataset. Spark decides to do broadcast join by taking the
>>>smaller dataset, fit it into the driver memory and broadcasting it to all
>>>executors.  That is where you had this issue with the memory limit on the
>>>driver. In the absence of Broadcast join, spark needs to perform a 
>>> shuffle
>>>which is an expensive process.
>>>   1. you can increase t

Re: Spark-SQL - Query Hanging, How To Troubleshoot

2023-08-17 Thread Patrick Tucci
Hi Mich,

Yes, that's the sequence of events. I think the big breakthrough is that
(for now at least) Spark is throwing errors instead of the queries hanging.
Which is a big step forward. I can at least troubleshoot issues if I know
what they are.

When I reflect on the issues I faced and the solutions, my issue may have
been driver memory all along. I just couldn't determine that was the issue
because I never saw any errors. In one case, converting a LEFT JOIN to an
inner JOIN caused the query to run. In another case, replacing a text field
with an int ID and JOINing on the ID column worked. Per your advice,
changing file formats from ORC to Parquet solved one issue. These
interventions could have changed the way Spark needed to broadcast data to
execute the query, thereby reducing demand on the memory-constrained driver.

Fingers crossed this is the solution. I will reply to this thread if the
issue comes up again (hopefully it doesn't!).

Thanks again,

Patrick

On Thu, Aug 17, 2023 at 1:54 PM Mich Talebzadeh 
wrote:

> Hi Patrik,
>
> glad that you have managed to sort this problem out. Hopefully it will go
> away for good.
>
> Still we are in the dark about how this problem is going away and coming
> back :( As I recall the chronology of events were as follows:
>
>
>1. The Issue with hanging Spark job reported
>2. concurrency on Hive metastore (single threaded Derby DB) was
>identified as a possible cause
>3. You changed the underlying Hive table formats from ORC to Parquet
>and somehow it worked
>4. The issue was reported again
>5. You upgraded the spark version from 3.4.0 to 3.4.1 (as a possible
>underlying issue) and encountered driver memory limitation.
>6. you allocated more memory to the driver and it is running ok for
>now,
>7. It appears that you are doing some join between a large dataset and
>a smaller dataset. Spark decides to do broadcast join by taking the smaller
>dataset, fit it into the driver memory and broadcasting it to all
>executors.  That is where you had this issue with the memory limit on the
>driver. In the absence of Broadcast join, spark needs to perform a shuffle
>which is an expensive process.
>   1. you can increase the broadcast join memory setting the conf.
>   parameter "spark.sql.autoBroadcastJoinThreshold" in bytes (check the 
> manual)
>   2. You can also disable the broadcast join by setting
>   "spark.sql.autoBroadcastJoinThreshold", -1 to see what is happening.
>
>
> So you still need to find a resolution to this issue. Maybe 3.4.1 has
> managed to fix some underlying issues.
>
> HTH
>
> Mich Talebzadeh,
> Solutions Architect/Engineering Lead
> London
> United Kingdom
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Thu, 17 Aug 2023 at 17:17, Patrick Tucci 
> wrote:
>
>> Hi Everyone,
>>
>> I just wanted to follow up on this issue. This issue has continued since
>> our last correspondence. Today I had a query hang and couldn't resolve the
>> issue. I decided to upgrade my Spark install from 3.4.0 to 3.4.1. After
>> doing so, instead of the query hanging, I got an error message that the
>> driver didn't have enough memory to broadcast objects. After increasing the
>> driver memory, the query runs without issue.
>>
>> I hope this can be helpful to someone else in the future. Thanks again
>> for the support,
>>
>> Patrick
>>
>> On Sun, Aug 13, 2023 at 7:52 AM Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> OK I use Hive 3.1.1
>>>
>>> My suggestion is to put your hive issues to u...@hive.apache.org and
>>> for JAVA version compatibility
>>>
>>> They will give you better info.
>>>
>>> HTH
>>>
>>> Mich Talebzadeh,
>>> Solutions Architect/Engineering Lead
>>> London
>>> United Kingdom
>>>
>>>
>>>view my Linkedin profile
>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>
>>>
>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>
>>>
>>>
>>> *Disclaimer:* Use it at your 

Re: Spark-SQL - Query Hanging, How To Troubleshoot

2023-08-17 Thread Patrick Tucci
Hi Everyone,

I just wanted to follow up on this issue. This issue has continued since
our last correspondence. Today I had a query hang and couldn't resolve the
issue. I decided to upgrade my Spark install from 3.4.0 to 3.4.1. After
doing so, instead of the query hanging, I got an error message that the
driver didn't have enough memory to broadcast objects. After increasing the
driver memory, the query runs without issue.

I hope this can be helpful to someone else in the future. Thanks again for
the support,

Patrick

On Sun, Aug 13, 2023 at 7:52 AM Mich Talebzadeh 
wrote:

> OK I use Hive 3.1.1
>
> My suggestion is to put your hive issues to u...@hive.apache.org and for
> JAVA version compatibility
>
> They will give you better info.
>
> HTH
>
> Mich Talebzadeh,
> Solutions Architect/Engineering Lead
> London
> United Kingdom
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Sun, 13 Aug 2023 at 11:48, Patrick Tucci 
> wrote:
>
>> I attempted to install Hive yesterday. The experience was similar to
>> other attempts at installing Hive: it took a few hours and at the end of
>> the process, I didn't have a working setup. The latest stable release would
>> not run. I never discovered the cause, but similar StackOverflow questions
>> suggest it might be a Java incompatibility issue. Since I didn't want to
>> downgrade or install an additional Java version, I attempted to use the
>> latest alpha as well. This appears to have worked, although I couldn't
>> figure out how to get it to use the metastore_db from Spark.
>>
>> After turning my attention back to Spark, I determined the issue. After
>> much troubleshooting, I discovered that if I performed a COUNT(*) using
>> the same JOINs, the problem query worked. I removed all the columns from
>> the SELECT statement and added them one by one until I found the culprit.
>> It's a text field on one of the tables. When the query SELECTs this column,
>> or attempts to filter on it, the query hangs and never completes. If I
>> remove all explicit references to this column, the query works fine. Since
>> I need this column in the results, I went back to the ETL and extracted the
>> values to a dimension table. I replaced the text column in the source table
>> with an integer ID column and the query worked without issue.
>>
>> On the topic of Hive, does anyone have any detailed resources for how to
>> set up Hive from scratch? Aside from the official site, since those
>> instructions didn't work for me. I'm starting to feel uneasy about building
>> my process around Spark. There really shouldn't be any instances where I
>> ask Spark to run legal ANSI SQL code and it just does nothing. In the past
>> 4 days I've run into 2 of these instances, and the solution was more voodoo
>> and magic than examining errors/logs and fixing code. I feel that I should
>> have a contingency plan in place for when I run into an issue with Spark
>> that can't be resolved.
>>
>> Thanks everyone.
>>
>>
>> On Sat, Aug 12, 2023 at 2:18 PM Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> OK you would not have known unless you went through the process so to
>>> speak.
>>>
>>> Let us do something revolutionary here 
>>>
>>> Install hive and its metastore. You already have hadoop anyway
>>>
>>> https://cwiki.apache.org/confluence/display/hive/adminmanual+installation
>>>
>>> hive metastore
>>>
>>>
>>> https://data-flair.training/blogs/apache-hive-metastore/#:~:text=What%20is%20Hive%20Metastore%3F,by%20using%20metastore%20service%20API
>>> .
>>>
>>> choose one of these
>>>
>>> derby  hive  mssql  mysql  oracle  postgres
>>>
>>> Mine is an oracle. postgres is good as well.
>>>
>>> HTH
>>>
>>> Mich Talebzadeh,
>>> Solutions Architect/Engineering Lead
>>> London
>>> United Kingdom
>>>
>>>
>>>view my Linkedin profile
>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>
>>>
>>>  https://

Re: Spark-SQL - Query Hanging, How To Troubleshoot

2023-08-13 Thread Patrick Tucci
I attempted to install Hive yesterday. The experience was similar to other
attempts at installing Hive: it took a few hours and at the end of the
process, I didn't have a working setup. The latest stable release would not
run. I never discovered the cause, but similar StackOverflow questions
suggest it might be a Java incompatibility issue. Since I didn't want to
downgrade or install an additional Java version, I attempted to use the
latest alpha as well. This appears to have worked, although I couldn't
figure out how to get it to use the metastore_db from Spark.

After turning my attention back to Spark, I determined the issue. After
much troubleshooting, I discovered that if I performed a COUNT(*) using the
same JOINs, the problem query worked. I removed all the columns from the
SELECT statement and added them one by one until I found the culprit. It's
a text field on one of the tables. When the query SELECTs this column, or
attempts to filter on it, the query hangs and never completes. If I remove
all explicit references to this column, the query works fine. Since I need
this column in the results, I went back to the ETL and extracted the values
to a dimension table. I replaced the text column in the source table with
an integer ID column and the query worked without issue.

On the topic of Hive, does anyone have any detailed resources for how to
set up Hive from scratch? Aside from the official site, since those
instructions didn't work for me. I'm starting to feel uneasy about building
my process around Spark. There really shouldn't be any instances where I
ask Spark to run legal ANSI SQL code and it just does nothing. In the past
4 days I've run into 2 of these instances, and the solution was more voodoo
and magic than examining errors/logs and fixing code. I feel that I should
have a contingency plan in place for when I run into an issue with Spark
that can't be resolved.

Thanks everyone.


On Sat, Aug 12, 2023 at 2:18 PM Mich Talebzadeh 
wrote:

> OK you would not have known unless you went through the process so to
> speak.
>
> Let us do something revolutionary here 
>
> Install hive and its metastore. You already have hadoop anyway
>
> https://cwiki.apache.org/confluence/display/hive/adminmanual+installation
>
> hive metastore
>
>
> https://data-flair.training/blogs/apache-hive-metastore/#:~:text=What%20is%20Hive%20Metastore%3F,by%20using%20metastore%20service%20API
> .
>
> choose one of these
>
> derby  hive  mssql  mysql  oracle  postgres
>
> Mine is an oracle. postgres is good as well.
>
> HTH
>
> Mich Talebzadeh,
> Solutions Architect/Engineering Lead
> London
> United Kingdom
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Sat, 12 Aug 2023 at 18:31, Patrick Tucci 
> wrote:
>
>> Yes, on premise.
>>
>> Unfortunately after installing Delta Lake and re-writing all tables as
>> Delta tables, the issue persists.
>>
>> On Sat, Aug 12, 2023 at 11:34 AM Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> ok sure.
>>>
>>> Is this Delta Lake going to be on-premise?
>>>
>>> Mich Talebzadeh,
>>> Solutions Architect/Engineering Lead
>>> London
>>> United Kingdom
>>>
>>>
>>>view my Linkedin profile
>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>
>>>
>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>>
>>> On Sat, 12 Aug 2023 at 12:03, Patrick Tucci 
>>> wrote:
>>>
>>>> Hi Mich,
>>>>
>>>> Thanks for the feedback. My original intention after reading your
>>>> response was to stick to Hive for managing tables. Unfortunately, I'm
>>>> running into another case of SQL scripts hanging. Since all tables are
>>>

Re: Spark-SQL - Query Hanging, How To Troubleshoot

2023-08-12 Thread Patrick Tucci
Yes, on premise.

Unfortunately after installing Delta Lake and re-writing all tables as
Delta tables, the issue persists.

On Sat, Aug 12, 2023 at 11:34 AM Mich Talebzadeh 
wrote:

> ok sure.
>
> Is this Delta Lake going to be on-premise?
>
> Mich Talebzadeh,
> Solutions Architect/Engineering Lead
> London
> United Kingdom
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Sat, 12 Aug 2023 at 12:03, Patrick Tucci 
> wrote:
>
>> Hi Mich,
>>
>> Thanks for the feedback. My original intention after reading your
>> response was to stick to Hive for managing tables. Unfortunately, I'm
>> running into another case of SQL scripts hanging. Since all tables are
>> already Parquet, I'm out of troubleshooting options. I'm going to migrate
>> to Delta Lake and see if that solves the issue.
>>
>> Thanks again for your feedback.
>>
>> Patrick
>>
>> On Fri, Aug 11, 2023 at 10:09 AM Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> Hi Patrick,
>>>
>>> There is not anything wrong with Hive On-premise it is the best data
>>> warehouse there is
>>>
>>> Hive handles both ORC and Parquet formal well. They are both columnar
>>> implementations of relational model. What you are seeing is the Spark API
>>> to Hive which prefers Parquet. I found out a few years ago.
>>>
>>> From your point of view I suggest you stick to parquet format with Hive
>>> specific to Spark. As far as I know you don't have a fully independent Hive
>>> DB as yet.
>>>
>>> Anyway stick to Hive for now as you never know what issues you may be
>>> facing using moving to Delta Lake.
>>>
>>> You can also use compression
>>>
>>> STORED AS PARQUET
>>> TBLPROPERTIES ("parquet.compression"="SNAPPY")
>>>
>>> ALSO
>>>
>>> ANALYZE TABLE  COMPUTE STATISTICS FOR COLUMNS
>>>
>>> HTH
>>>
>>> Mich Talebzadeh,
>>> Solutions Architect/Engineering Lead
>>> London
>>> United Kingdom
>>>
>>>
>>>view my Linkedin profile
>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>
>>>
>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>>
>>> On Fri, 11 Aug 2023 at 11:26, Patrick Tucci 
>>> wrote:
>>>
>>>> Thanks for the reply Stephen and Mich.
>>>>
>>>> Stephen, you're right, it feels like Spark is waiting for something,
>>>> but I'm not sure what. I'm the only user on the cluster and there are
>>>> plenty of resources (+60 cores, +250GB RAM). I even tried restarting
>>>> Hadoop, Spark and the host servers to make sure nothing was lingering in
>>>> the background.
>>>>
>>>> Mich, thank you so much, your suggestion worked. Storing the tables as
>>>> Parquet solves the issue.
>>>>
>>>> Interestingly, I found that only the MemberEnrollment table needs to be
>>>> Parquet. The ID field in MemberEnrollment is an int calculated during load
>>>> by a ROW_NUMBER() function. Further testing found that if I hard code a 0
>>>> as MemberEnrollment.ID instead of using the ROW_NUMBER() function, the
>>>> query works without issue even if both tables are ORC.
>>>>
>>>> Should I infer from this issue that the Hive components prefer Parquet
>>>> over ORC? Furthermore, should I consider using a different table storage
>>>> framework, like Delta Lake, instead of the Hive components? Given this
>>>> issue and other issues I've ha

Re: Spark-SQL - Query Hanging, How To Troubleshoot

2023-08-12 Thread Patrick Tucci
Hi Mich,

Thanks for the feedback. My original intention after reading your response
was to stick to Hive for managing tables. Unfortunately, I'm running into
another case of SQL scripts hanging. Since all tables are already Parquet,
I'm out of troubleshooting options. I'm going to migrate to Delta Lake and
see if that solves the issue.

Thanks again for your feedback.

Patrick

On Fri, Aug 11, 2023 at 10:09 AM Mich Talebzadeh 
wrote:

> Hi Patrick,
>
> There is not anything wrong with Hive On-premise it is the best data
> warehouse there is
>
> Hive handles both ORC and Parquet formal well. They are both columnar
> implementations of relational model. What you are seeing is the Spark API
> to Hive which prefers Parquet. I found out a few years ago.
>
> From your point of view I suggest you stick to parquet format with Hive
> specific to Spark. As far as I know you don't have a fully independent Hive
> DB as yet.
>
> Anyway stick to Hive for now as you never know what issues you may be
> facing using moving to Delta Lake.
>
> You can also use compression
>
> STORED AS PARQUET
> TBLPROPERTIES ("parquet.compression"="SNAPPY")
>
> ALSO
>
> ANALYZE TABLE  COMPUTE STATISTICS FOR COLUMNS
>
> HTH
>
> Mich Talebzadeh,
> Solutions Architect/Engineering Lead
> London
> United Kingdom
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Fri, 11 Aug 2023 at 11:26, Patrick Tucci 
> wrote:
>
>> Thanks for the reply Stephen and Mich.
>>
>> Stephen, you're right, it feels like Spark is waiting for something, but
>> I'm not sure what. I'm the only user on the cluster and there are plenty of
>> resources (+60 cores, +250GB RAM). I even tried restarting Hadoop, Spark
>> and the host servers to make sure nothing was lingering in the background.
>>
>> Mich, thank you so much, your suggestion worked. Storing the tables as
>> Parquet solves the issue.
>>
>> Interestingly, I found that only the MemberEnrollment table needs to be
>> Parquet. The ID field in MemberEnrollment is an int calculated during load
>> by a ROW_NUMBER() function. Further testing found that if I hard code a 0
>> as MemberEnrollment.ID instead of using the ROW_NUMBER() function, the
>> query works without issue even if both tables are ORC.
>>
>> Should I infer from this issue that the Hive components prefer Parquet
>> over ORC? Furthermore, should I consider using a different table storage
>> framework, like Delta Lake, instead of the Hive components? Given this
>> issue and other issues I've had with Hive, I'm starting to think a
>> different solution might be more robust and stable. The main condition is
>> that my application operates solely through Thrift server, so I need to be
>> able to connect to Spark through Thrift server and have it write tables
>> using Delta Lake instead of Hive. From this StackOverflow question, it
>> looks like this is possible:
>> https://stackoverflow.com/questions/69862388/how-to-run-spark-sql-thrift-server-in-local-mode-and-connect-to-delta-using-jdbc
>>
>> Thanks again to everyone who replied for their help.
>>
>> Patrick
>>
>>
>> On Fri, Aug 11, 2023 at 2:14 AM Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> Steve may have a valid point. You raised an issue with concurrent writes
>>> before, if I recall correctly. Since this limitation may be due to Hive
>>> metastore. By default Spark uses Apache Derby for its database
>>> persistence. *However it is limited to only one Spark session at any
>>> time for the purposes of metadata storage.*  That may be the cause here
>>> as well. Does this happen if the underlying tables are created as PARQUET
>>> as opposed to ORC?
>>>
>>> HTH
>>>
>>> Mich Talebzadeh,
>>> Solutions Architect/Engineering Lead
>>> London
>>> United Kingdom
>>>
>>>
>>>view my Linkedin profile
>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>
>>>
>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>
>>>
>>>
>>

Re: Spark-SQL - Query Hanging, How To Troubleshoot

2023-08-11 Thread Patrick Tucci
Thanks for the reply Stephen and Mich.

Stephen, you're right, it feels like Spark is waiting for something, but
I'm not sure what. I'm the only user on the cluster and there are plenty of
resources (+60 cores, +250GB RAM). I even tried restarting Hadoop, Spark
and the host servers to make sure nothing was lingering in the background.

Mich, thank you so much, your suggestion worked. Storing the tables as
Parquet solves the issue.

Interestingly, I found that only the MemberEnrollment table needs to be
Parquet. The ID field in MemberEnrollment is an int calculated during load
by a ROW_NUMBER() function. Further testing found that if I hard code a 0
as MemberEnrollment.ID instead of using the ROW_NUMBER() function, the
query works without issue even if both tables are ORC.

Should I infer from this issue that the Hive components prefer Parquet over
ORC? Furthermore, should I consider using a different table storage
framework, like Delta Lake, instead of the Hive components? Given this
issue and other issues I've had with Hive, I'm starting to think a
different solution might be more robust and stable. The main condition is
that my application operates solely through Thrift server, so I need to be
able to connect to Spark through Thrift server and have it write tables
using Delta Lake instead of Hive. From this StackOverflow question, it
looks like this is possible:
https://stackoverflow.com/questions/69862388/how-to-run-spark-sql-thrift-server-in-local-mode-and-connect-to-delta-using-jdbc

Thanks again to everyone who replied for their help.

Patrick


On Fri, Aug 11, 2023 at 2:14 AM Mich Talebzadeh 
wrote:

> Steve may have a valid point. You raised an issue with concurrent writes
> before, if I recall correctly. Since this limitation may be due to Hive
> metastore. By default Spark uses Apache Derby for its database
> persistence. *However it is limited to only one Spark session at any time
> for the purposes of metadata storage.*  That may be the cause here as
> well. Does this happen if the underlying tables are created as PARQUET as
> opposed to ORC?
>
> HTH
>
> Mich Talebzadeh,
> Solutions Architect/Engineering Lead
> London
> United Kingdom
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Fri, 11 Aug 2023 at 01:33, Stephen Coy 
> wrote:
>
>> Hi Patrick,
>>
>> When this has happened to me in the past (admittedly via spark-submit) it
>> has been because another job was still running and had already claimed some
>> of the resources (cores and memory).
>>
>> I think this can also happen if your configuration tries to claim
>> resources that will never be available.
>>
>> Cheers,
>>
>> SteveC
>>
>>
>> On 11 Aug 2023, at 3:36 am, Patrick Tucci 
>> wrote:
>>
>> Hello,
>>
>> I'm attempting to run a query on Spark 3.4.0 through the Spark
>> ThriftServer. The cluster has 64 cores, 250GB RAM, and operates in
>> standalone mode using HDFS for storage.
>>
>> The query is as follows:
>>
>> SELECT ME.*, MB.BenefitID
>> FROM MemberEnrollment ME
>> JOIN MemberBenefits MB
>> ON ME.ID <http://me.id/> = MB.EnrollmentID
>> WHERE MB.BenefitID = 5
>> LIMIT 10
>>
>> The tables are defined as follows:
>>
>> -- Contains about 3M rows
>> CREATE TABLE MemberEnrollment
>> (
>> ID INT
>> , MemberID VARCHAR(50)
>> , StartDate DATE
>> , EndDate DATE
>> -- Other columns, but these are the most important
>> ) STORED AS ORC;
>>
>> -- Contains about 25m rows
>> CREATE TABLE MemberBenefits
>> (
>> EnrollmentID INT
>> , BenefitID INT
>> ) STORED AS ORC;
>>
>> When I execute the query, it runs a single broadcast exchange stage,
>> which completes after a few seconds. Then everything just hangs. The
>> JDBC/ODBC tab in the UI shows the query state as COMPILED, but no stages or
>> tasks are executing or pending:
>>
>> 
>>
>> I've let the query run for as long as 30 minutes with no additional
>> stages, progress, or errors. I'm not sure where to start troubleshooting.
>>
>> Thanks for your help,
>>
>> Patrick
>>
>>
>> This email contains confiden

Re: Spark-SQL - Query Hanging, How To Troubleshoot

2023-08-10 Thread Patrick Tucci
Hi Mich,

I don't believe Hive is installed. I set up this cluster from scratch. I
installed Hadoop and Spark by downloading them from their project websites.
If Hive isn't bundled with Hadoop or Spark, I don't believe I have it. I'm
running the Thrift server distributed with Spark, like so:

~/spark/sbin/start-thriftserver.sh --master spark://10.0.50.1:7077

I can look into installing Hive, but it might take some time. I tried to
set up Hive when I first started evaluating distributed data processing
solutions, but I encountered many issues. Spark was much simpler, which was
part of the reason why I chose it.

Thanks again for the reply, I truly appreciate your help.

Patrick

On Thu, Aug 10, 2023 at 3:43 PM Mich Talebzadeh 
wrote:

> sorry host is 10.0.50.1
>
> Mich Talebzadeh,
> Solutions Architect/Engineering Lead
> London
> United Kingdom
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Thu, 10 Aug 2023 at 20:41, Mich Talebzadeh 
> wrote:
>
>> Hi Patrick
>>
>> That beeline on port 1 is a hive thrift server running on your hive
>> on host 10.0.50.1:1.
>>
>> if you can access that host, you should be able to log into hive by
>> typing hive. The os user is hadoop in your case and sounds like there is no
>> password!
>>
>> Once inside that host, hive logs are kept in your case
>> /tmp/hadoop/hive.log or go to /tmp and do
>>
>> /tmp> find ./ -name hive.log. It should be under /tmp/hive.log
>>
>> Try running the sql inside hive and see what it says
>>
>> HTH
>>
>> Mich Talebzadeh,
>> Solutions Architect/Engineering Lead
>> London
>> United Kingdom
>>
>>
>>view my Linkedin profile
>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Thu, 10 Aug 2023 at 20:02, Patrick Tucci 
>> wrote:
>>
>>> Hi Mich,
>>>
>>> Thanks for the reply. Unfortunately I don't have Hive set up on my
>>> cluster. I can explore this if there are no other ways to troubleshoot.
>>>
>>> I'm using beeline to run commands against the Thrift server. Here's the
>>> command I use:
>>>
>>> ~/spark/bin/beeline -u jdbc:hive2://10.0.50.1:1 -n hadoop -f
>>> command.sql
>>>
>>> Thanks again for your help.
>>>
>>> Patrick
>>>
>>>
>>> On Thu, Aug 10, 2023 at 2:24 PM Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
>>>> Can you run this sql query through hive itself?
>>>>
>>>> Are you using this command or similar for your thrift server?
>>>>
>>>> beeline -u jdbc:hive2:///1/default
>>>> org.apache.hive.jdbc.HiveDriver -n hadoop -p xxx
>>>>
>>>> HTH
>>>>
>>>> Mich Talebzadeh,
>>>> Solutions Architect/Engineering Lead
>>>> London
>>>> United Kingdom
>>>>
>>>>
>>>>view my Linkedin profile
>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>
>>>>
>>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>>
>>>>
>>>>
>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>>> any loss, damage or destruction of data or any other property which may
>>>> arise from relying on this email's technical content is explicitly
>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>> arising from such loss, damage or destruction.
>>>>
>>>>
>>>>
>>>>
>>>> On Thu, 1

Re: Spark-SQL - Query Hanging, How To Troubleshoot

2023-08-10 Thread Patrick Tucci
Hi Mich,

Thanks for the reply. Unfortunately I don't have Hive set up on my cluster.
I can explore this if there are no other ways to troubleshoot.

I'm using beeline to run commands against the Thrift server. Here's the
command I use:

~/spark/bin/beeline -u jdbc:hive2://10.0.50.1:1 -n hadoop -f command.sql

Thanks again for your help.

Patrick


On Thu, Aug 10, 2023 at 2:24 PM Mich Talebzadeh 
wrote:

> Can you run this sql query through hive itself?
>
> Are you using this command or similar for your thrift server?
>
> beeline -u jdbc:hive2:///1/default
> org.apache.hive.jdbc.HiveDriver -n hadoop -p xxx
>
> HTH
>
> Mich Talebzadeh,
> Solutions Architect/Engineering Lead
> London
> United Kingdom
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Thu, 10 Aug 2023 at 18:39, Patrick Tucci 
> wrote:
>
>> Hello,
>>
>> I'm attempting to run a query on Spark 3.4.0 through the Spark
>> ThriftServer. The cluster has 64 cores, 250GB RAM, and operates in
>> standalone mode using HDFS for storage.
>>
>> The query is as follows:
>>
>> SELECT ME.*, MB.BenefitID
>> FROM MemberEnrollment ME
>> JOIN MemberBenefits MB
>> ON ME.ID = MB.EnrollmentID
>> WHERE MB.BenefitID = 5
>> LIMIT 10
>>
>> The tables are defined as follows:
>>
>> -- Contains about 3M rows
>> CREATE TABLE MemberEnrollment
>> (
>> ID INT
>> , MemberID VARCHAR(50)
>> , StartDate DATE
>> , EndDate DATE
>> -- Other columns, but these are the most important
>> ) STORED AS ORC;
>>
>> -- Contains about 25m rows
>> CREATE TABLE MemberBenefits
>> (
>> EnrollmentID INT
>> , BenefitID INT
>> ) STORED AS ORC;
>>
>> When I execute the query, it runs a single broadcast exchange stage,
>> which completes after a few seconds. Then everything just hangs. The
>> JDBC/ODBC tab in the UI shows the query state as COMPILED, but no stages or
>> tasks are executing or pending:
>>
>> [image: image.png]
>>
>> I've let the query run for as long as 30 minutes with no additional
>> stages, progress, or errors. I'm not sure where to start troubleshooting.
>>
>> Thanks for your help,
>>
>> Patrick
>>
>


Spark-SQL - Query Hanging, How To Troubleshoot

2023-08-10 Thread Patrick Tucci
Hello,

I'm attempting to run a query on Spark 3.4.0 through the Spark
ThriftServer. The cluster has 64 cores, 250GB RAM, and operates in
standalone mode using HDFS for storage.

The query is as follows:

SELECT ME.*, MB.BenefitID
FROM MemberEnrollment ME
JOIN MemberBenefits MB
ON ME.ID = MB.EnrollmentID
WHERE MB.BenefitID = 5
LIMIT 10

The tables are defined as follows:

-- Contains about 3M rows
CREATE TABLE MemberEnrollment
(
ID INT
, MemberID VARCHAR(50)
, StartDate DATE
, EndDate DATE
-- Other columns, but these are the most important
) STORED AS ORC;

-- Contains about 25m rows
CREATE TABLE MemberBenefits
(
EnrollmentID INT
, BenefitID INT
) STORED AS ORC;

When I execute the query, it runs a single broadcast exchange stage, which
completes after a few seconds. Then everything just hangs. The JDBC/ODBC
tab in the UI shows the query state as COMPILED, but no stages or tasks are
executing or pending:

[image: image.png]

I've let the query run for as long as 30 minutes with no additional stages,
progress, or errors. I'm not sure where to start troubleshooting.

Thanks for your help,

Patrick


Re: Spark-SQL - Concurrent Inserts Into Same Table Throws Exception

2023-07-30 Thread Patrick Tucci
Hi Mich and Pol,

Thanks for the feedback. The database layer is Hadoop 3.3.5. The cluster
restarted so I lost the stack trace in the application UI. In the snippets
I saved, it looks like the exception being thrown was from Hive. Given the
feedback you've provided, I suspect the issue is with how the Hive
components are handling concurrent writes.

While using a different format would likely help with this issue, I think I
have found an easier solution for now. Currently I have many individual
scripts that perform logic and insert the results separately. Instead of
each script performing an insert, each script can instead create a view.
After the views are created, one single script can perform one single
INSERT, combining the views with UNION ALL statements.

-- Old logic --
-- Script 1
INSERT INTO EventClaims
/*Long, complicated query 1*/

-- Script N
INSERT INTO EventClaims
/*Long, complicated query N*/

-- New logic --
-- Script 1
CREATE VIEW Q1 AS
/*Long, complicated query 1*/

-- Script N
CREATE VIEW QN AS
/*Long, complicated query N*/

-- Final script --
INSERT INTO EventClaims
SELECT * FROM Q1 UNION ALL
SELECT * FROM QN

The old approach had almost two dozen stages with relatively fewer tasks.
The new approach requires only 3 stages. With fewer stages and more tasks,
cluster utilization is much higher.

Thanks again for your feedback. I suspect better concurrent writes will be
valuable for my project in the future, so this is good information to have
ready.

Thanks,

Patrick

On Sun, Jul 30, 2023 at 5:30 AM Pol Santamaria  wrote:

> Hi Patrick,
>
> You can have multiple writers simultaneously writing to the same table in
> HDFS by utilizing an open table format with concurrency control. Several
> formats, such as Apache Hudi, Apache Iceberg, Delta Lake, and Qbeast
> Format, offer this capability. All of them provide advanced features that
> will work better in different use cases according to the writing pattern,
> type of queries, data characteristics, etc.
>
> *Pol Santamaria*
>
>
> On Sat, Jul 29, 2023 at 4:28 PM Mich Talebzadeh 
> wrote:
>
>> It is not Spark SQL that throws the error. It is the underlying Database
>> or layer that throws the error.
>>
>> Spark acts as an ETL tool.  What is the underlying DB  where the table
>> resides? Is concurrency supported. Please send the error to this list
>>
>> HTH
>>
>> Mich Talebzadeh,
>> Solutions Architect/Engineering Lead
>> Palantir Technologies Limited
>> London
>> United Kingdom
>>
>>
>>view my Linkedin profile
>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Sat, 29 Jul 2023 at 12:02, Patrick Tucci 
>> wrote:
>>
>>> Hello,
>>>
>>> I'm building an application on Spark SQL. The cluster is set up in
>>> standalone mode with HDFS as storage. The only Spark application running is
>>> the Spark Thrift Server using FAIR scheduling mode. Queries are submitted
>>> to Thrift Server using beeline.
>>>
>>> I have multiple queries that insert rows into the same table
>>> (EventClaims). These queries work fine when run sequentially, however, some
>>> individual queries don't fully utilize the resources available on the
>>> cluster. I would like to run all of these queries concurrently to more
>>> fully utilize available resources. When I attempt to do this, tasks
>>> eventually begin to fail. The stack trace is pretty long, but here's what
>>> looks like the most relevant parts:
>>>
>>>
>>> org.apache.spark.sql.errors.QueryExecutionErrors$.taskFailedWhileWritingRowsError(QueryExecutionErrors.scala:788)
>>>
>>> org.apache.hive.service.cli.HiveSQLException: Error running query:
>>> org.apache.spark.SparkException: Job aborted due to stage failure: Task 28
>>> in stage 128.0 failed 4 times, most recent failure: Lost task 28.3 in stage
>>> 128.0 (TID 6578) (10.0.50.2 executor 0): org.apache.spark.SparkException:
>>> [TASK_WRITE_FAILED] Task failed while writing rows to hdfs://
>>> 10.0.50.1:8020/user/spark/warehouse/eventclaims.
>>>
>>> Is it possible to have multiple concurrent writers to the same table
>>> with Spark SQL? Is there any way to make this work?
>>>
>>> Thanks for the help.
>>>
>>> Patrick
>>>
>>


Spark-SQL - Concurrent Inserts Into Same Table Throws Exception

2023-07-29 Thread Patrick Tucci
Hello,

I'm building an application on Spark SQL. The cluster is set up in
standalone mode with HDFS as storage. The only Spark application running is
the Spark Thrift Server using FAIR scheduling mode. Queries are submitted
to Thrift Server using beeline.

I have multiple queries that insert rows into the same table (EventClaims).
These queries work fine when run sequentially, however, some individual
queries don't fully utilize the resources available on the cluster. I would
like to run all of these queries concurrently to more fully utilize
available resources. When I attempt to do this, tasks eventually begin to
fail. The stack trace is pretty long, but here's what looks like the most
relevant parts:

org.apache.spark.sql.errors.QueryExecutionErrors$.taskFailedWhileWritingRowsError(QueryExecutionErrors.scala:788)

org.apache.hive.service.cli.HiveSQLException: Error running query:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 28
in stage 128.0 failed 4 times, most recent failure: Lost task 28.3 in stage
128.0 (TID 6578) (10.0.50.2 executor 0): org.apache.spark.SparkException:
[TASK_WRITE_FAILED] Task failed while writing rows to hdfs://
10.0.50.1:8020/user/spark/warehouse/eventclaims.

Is it possible to have multiple concurrent writers to the same table with
Spark SQL? Is there any way to make this work?

Thanks for the help.

Patrick


Re: Spark-Sql - Slow Performance With CTAS and Large Gzipped File

2023-06-26 Thread Patrick Tucci
Hi Mich,

Thanks for the reply. I started running ANALYZE TABLE on the external
table, but the progress was very slow. The stage had only read about 275MB
in 10 minutes. That equates to about 5.5 hours just to analyze the table.

This might just be the reality of trying to process a 240m record file with
80+ columns, unless there's an obvious issue with my setup that someone
sees. The solution is likely going to involve increasing parallelization.

To that end, I extracted and re-zipped this file in bzip. Since bzip is
splittable and gzip is not, Spark can process the bzip file in parallel.
The same CTAS query only took about 45 minutes. This is still a bit slower
than I had hoped, but the import from bzip fully utilized all available
cores. So we can give the cluster more resources if we need the process to
go faster.

Patrick

On Mon, Jun 26, 2023 at 12:52 PM Mich Talebzadeh 
wrote:

> OK for now have you analyzed statistics in Hive external table
>
> spark-sql (default)> ANALYZE TABLE test.stg_t2 COMPUTE STATISTICS FOR ALL
> COLUMNS;
> spark-sql (default)> DESC EXTENDED test.stg_t2;
>
> Hive external tables have little optimization
>
> HTH
>
>
>
> Mich Talebzadeh,
> Solutions Architect/Engineering Lead
> Palantir Technologies Limited
> London
> United Kingdom
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Mon, 26 Jun 2023 at 16:33, Patrick Tucci 
> wrote:
>
>> Hello,
>>
>> I'm using Spark 3.4.0 in standalone mode with Hadoop 3.3.5. The master
>> node has 2 cores and 8GB of RAM. There is a single worker node with 8 cores
>> and 64GB of RAM.
>>
>> I'm trying to process a large pipe delimited file that has been
>> compressed with gzip (9.2GB zipped, ~58GB unzipped, ~241m records, ~85
>> columns). I uploaded the gzipped file to HDFS and created an external table
>> using the attached script. I tried two simpler queries on the same table,
>> and they finished in ~5 and ~10 minutes respectively:
>>
>> SELECT COUNT(*) FROM ClaimsImport;
>> SELECT COUNT(*) FROM ClaimsImport WHERE ClaimLineID = 1;
>>
>> However, when I tried to create a table stored as ORC using this table as
>> the input, the query ran for almost 4 hours:
>>
>> CREATE TABLE Claims STORED AS ORC
>> AS
>> SELECT *
>> FROM ClaimsImport
>> --Exclude the header record
>> WHERE ClaimID <> 'ClaimID';
>>
>> [image: image.png]
>>
>> Why is there such a speed disparity between these different operations? I
>> understand that this job cannot be parallelized because the file is
>> compressed with gzip. I also understand that creating an ORC table from the
>> input will take more time than a simple COUNT(*). But it doesn't feel like
>> the CREATE TABLE operation should take more than 24x longer than a simple
>> SELECT COUNT(*) statement.
>>
>> Thanks for any help. Please let me know if I can provide any additional
>> information.
>>
>> Patrick
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Spark-Sql - Slow Performance With CTAS and Large Gzipped File

2023-06-26 Thread Patrick Tucci
Hello,

I'm using Spark 3.4.0 in standalone mode with Hadoop 3.3.5. The master node
has 2 cores and 8GB of RAM. There is a single worker node with 8 cores and
64GB of RAM.

I'm trying to process a large pipe delimited file that has been compressed
with gzip (9.2GB zipped, ~58GB unzipped, ~241m records, ~85 columns). I
uploaded the gzipped file to HDFS and created an external table using the
attached script. I tried two simpler queries on the same table, and they
finished in ~5 and ~10 minutes respectively:

SELECT COUNT(*) FROM ClaimsImport;
SELECT COUNT(*) FROM ClaimsImport WHERE ClaimLineID = 1;

However, when I tried to create a table stored as ORC using this table as
the input, the query ran for almost 4 hours:

CREATE TABLE Claims STORED AS ORC
AS
SELECT *
FROM ClaimsImport
--Exclude the header record
WHERE ClaimID <> 'ClaimID';

[image: image.png]

Why is there such a speed disparity between these different operations? I
understand that this job cannot be parallelized because the file is
compressed with gzip. I also understand that creating an ORC table from the
input will take more time than a simple COUNT(*). But it doesn't feel like
the CREATE TABLE operation should take more than 24x longer than a simple
SELECT COUNT(*) statement.

Thanks for any help. Please let me know if I can provide any additional
information.

Patrick


Create Table.sql
Description: Binary data

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org

Re: [PySpark] Getting the best row from each group

2022-12-19 Thread Patrick Tucci
Window functions don't work like traditional GROUP BYs. They allow you to
partition data and pull any relevant column, whether it's used in the
partition or not.

I'm not sure what the syntax is for PySpark, but the standard SQL would be
something like this:

WITH InputData AS
(
  SELECT 'USA' Country, 'New York' City, 900 Population
  UNION
  SELECT 'USA' Country, 'Miami', 620 Population
  UNION
  SELECT 'Ukraine' Country, 'Kyiv', 300 Population
  UNION
  SELECT 'Ukraine' Country, 'Kharkiv', 140 Population
)

 SELECT *, ROW_NUMBER() OVER(PARTITION BY Country ORDER BY Population DESC)
PopulationRank
 FROM InputData;

Results would be something like this:

CountryCity   Population PopulationRank
UkraineKyiv   3001
UkraineKharkiv1402
USANew York   9001
USAMiami  6202

Which you could further filter in another CTE or subquery where
PopulationRank = 1.

As I mentioned, I'm not sure how this translates into PySpark, but that's
the general concept in SQL.

On Mon, Dec 19, 2022 at 12:01 PM Oliver Ruebenacker <
oliv...@broadinstitute.org> wrote:

> If we only wanted to know the biggest population, max function would
> suffice. The problem is I also want the name of the city with the biggest
> population.
>
> On Mon, Dec 19, 2022 at 11:58 AM Sean Owen  wrote:
>
>> As Mich says, isn't this just max by population partitioned by country in
>> a window function?
>>
>> On Mon, Dec 19, 2022, 9:45 AM Oliver Ruebenacker <
>> oliv...@broadinstitute.org> wrote:
>>
>>>
>>>  Hello,
>>>
>>>   Thank you for the response!
>>>
>>>   I can think of two ways to get the largest city by country, but both
>>> seem to be inefficient:
>>>
>>>   (1) I could group by country, sort each group by population, add the
>>> row number within each group, and then retain only cities with a row number
>>> equal to 1. But it seems wasteful to sort everything when I only want the
>>> largest of each country
>>>
>>>   (2) I could group by country, get the maximum city population for each
>>> country, join that with the original data frame, and then retain only
>>> cities with population equal to the maximum population in the country. But
>>> that seems also expensive because I need to join.
>>>
>>>   Am I missing something?
>>>
>>>   Thanks!
>>>
>>>  Best, Oliver
>>>
>>> On Mon, Dec 19, 2022 at 10:59 AM Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
 In spark you can use windowing function
 s to
 achieve this

 HTH


view my Linkedin profile
 


  https://en.everybodywiki.com/Mich_Talebzadeh



 *Disclaimer:* Use it at your own risk. Any and all responsibility for
 any loss, damage or destruction of data or any other property which may
 arise from relying on this email's technical content is explicitly
 disclaimed. The author will in no case be liable for any monetary damages
 arising from such loss, damage or destruction.




 On Mon, 19 Dec 2022 at 15:28, Oliver Ruebenacker <
 oliv...@broadinstitute.org> wrote:

>
>  Hello,
>
>   How can I retain from each group only the row for which one value is
> the maximum of the group? For example, imagine a DataFrame containing all
> major cities in the world, with three columns: (1) City name (2) Country
> (3) population. How would I get a DataFrame that only contains the largest
> city in each country? Thanks!
>
>  Best, Oliver
>
> --
> Oliver Ruebenacker, Ph.D. (he)
> Senior Software Engineer, Knowledge Portal Network ,
> Flannick Lab , Broad Institute
> 
>

>>>
>>> --
>>> Oliver Ruebenacker, Ph.D. (he)
>>> Senior Software Engineer, Knowledge Portal Network , 
>>> Flannick
>>> Lab , Broad Institute
>>> 
>>>
>>
>
> --
> Oliver Ruebenacker, Ph.D. (he)
> Senior Software Engineer, Knowledge Portal Network , 
> Flannick
> Lab , Broad Institute
> 
>


RE: Re: [Spark Sql] Global Setting for Case-Insensitive String Compare

2022-11-22 Thread Patrick Tucci

Thanks. How would I go about formally submitting a feature request for this?

On 2022/11/21 23:47:16 Andrew Melo wrote:
> I think this is the right place, just a hard question :) As far as I
> know, there's no "case insensitive flag", so YMMV
>
> On Mon, Nov 21, 2022 at 5:40 PM Patrick Tucci  wrote:
> >
> > Is this the wrong list for this type of question?
> >
> > On 2022/11/12 16:34:48 Patrick Tucci wrote:
> > > Hello,
> > >
> > > Is there a way to set string comparisons to be case-insensitive
> > globally? I
> > > understand LOWER() can be used, but my codebase contains 27k 
lines of SQL
> > > and many string comparisons. I would need to apply LOWER() to 
each string
> > > literal in the code base. I'd also need to change all the 
ETL/import code

> > > to apply LOWER() to each string value on import.
> > >
> > > Current behavior:
> > >
> > > SELECT 'ABC' = 'abc';
> > > false
> > > Time taken: 5.466 seconds, Fetched 1 row(s)
> > >
> > > SELECT 'ABC' IN ('AbC', 'abc');
> > > false
> > > Time taken: 5.498 seconds, Fetched 1 row(s)
> > >
> > > SELECT 'ABC' like 'Ab%'
> > > false
> > > Time taken: 5.439 seconds, Fetched 1 row(s)
> > >
> > > Desired behavior would be true for all of the above with the proposed
> > > case-insensitive flag set.
> > >
> > > Thanks,
> > >
> > > Patrick
> > >
> >
> > -
> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



RE: [Spark Sql] Global Setting for Case-Insensitive String Compare

2022-11-21 Thread Patrick Tucci

Is this the wrong list for this type of question?

On 2022/11/12 16:34:48 Patrick Tucci wrote:
> Hello,
>
> Is there a way to set string comparisons to be case-insensitive 
globally? I

> understand LOWER() can be used, but my codebase contains 27k lines of SQL
> and many string comparisons. I would need to apply LOWER() to each string
> literal in the code base. I'd also need to change all the ETL/import code
> to apply LOWER() to each string value on import.
>
> Current behavior:
>
> SELECT 'ABC' = 'abc';
> false
> Time taken: 5.466 seconds, Fetched 1 row(s)
>
> SELECT 'ABC' IN ('AbC', 'abc');
> false
> Time taken: 5.498 seconds, Fetched 1 row(s)
>
> SELECT 'ABC' like 'Ab%'
> false
> Time taken: 5.439 seconds, Fetched 1 row(s)
>
> Desired behavior would be true for all of the above with the proposed
> case-insensitive flag set.
>
> Thanks,
>
> Patrick
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



[Spark Sql] Global Setting for Case-Insensitive String Compare

2022-11-12 Thread Patrick Tucci
Hello,

Is there a way to set string comparisons to be case-insensitive globally? I
understand LOWER() can be used, but my codebase contains 27k lines of SQL
and many string comparisons. I would need to apply LOWER() to each string
literal in the code base. I'd also need to change all the ETL/import code
to apply LOWER() to each string value on import.

Current behavior:

SELECT 'ABC' = 'abc';
false
Time taken: 5.466 seconds, Fetched 1 row(s)

SELECT 'ABC' IN ('AbC', 'abc');
false
Time taken: 5.498 seconds, Fetched 1 row(s)

SELECT 'ABC' like 'Ab%'
false
Time taken: 5.439 seconds, Fetched 1 row(s)

Desired behavior would be true for all of the above with the proposed
case-insensitive flag set.

Thanks,

Patrick


Profiling options for PandasUDF (2.4.7 on yarn)

2021-05-28 Thread Patrick McCarthy
I'm trying to do a very large aggregation of sparse matrices in which my
source data looks like

root
 |-- device_id: string (nullable = true)
 |-- row_id: array (nullable = true)
 ||-- element: integer (containsNull = true)
 |-- column_id: array (nullable = true)
 ||-- element: integer (containsNull = true)



I assume each row to reflect a sparse matrix where each combination of
(row_id, column_id) has value of 1. I have a PandasUDF which performs a
GROUPED_MAP that transforms every row into a scipy.sparse.csr_matrix and,
within the group, sums the matrices before returning columns of (count,
row_id, column_id).

It works at small scale but gets unstable as I scale up. Is there a way to
profile this function in a spark session or am I limited to profiling on
pandas data frames without spark?

-- 


*Patrick McCarthy  *

Senior Data Scientist, Machine Learning Engineering

Dstillery

470 Park Ave South, 17th Floor, NYC 10016


Re: Issue while installing dependencies Python Spark

2020-12-18 Thread Patrick McCarthy
At the risk of repeating myself, this is what I was hoping to avoid when I
suggested deploying a full, zipped, conda venv.

What is your motivation for running an install process on the nodes and
risking the process failing, instead of pushing a validated environment
artifact and not having that risk? In either case you move about the same
number of bytes around.

On Fri, Dec 18, 2020 at 3:04 PM Sachit Murarka 
wrote:

> Hi Patrick/Users,
>
> I am exploring wheel file form packages for this , as this seems simple:-
>
>
> https://bytes.grubhub.com/managing-dependencies-and-artifacts-in-pyspark-7641aa89ddb7
>
> However, I am facing another issue:- I am using pandas , which needs
> numpy. Numpy is giving error!
>
>
> ImportError: Unable to import required dependencies:
> numpy:
>
> IMPORTANT: PLEASE READ THIS FOR ADVICE ON HOW TO SOLVE THIS ISSUE!
>
> Importing the numpy C-extensions failed. This error can happen for
> many reasons, often due to issues with your setup or how NumPy was
> installed.
>
> We have compiled some common reasons and troubleshooting tips at:
>
> https://numpy.org/devdocs/user/troubleshooting-importerror.html
>
> Please note and check the following:
>
>   * The Python version is: Python3.7 from "/usr/bin/python3"
>   * The NumPy version is: "1.19.4"
>
> and make sure that they are the versions you expect.
> Please carefully study the documentation linked above for further help.
>
> Original error was: No module named 'numpy.core._multiarray_umath'
>
>
>
> Kind Regards,
> Sachit Murarka
>
>
> On Thu, Dec 17, 2020 at 9:24 PM Patrick McCarthy 
> wrote:
>
>> I'm not very familiar with the environments on cloud clusters, but in
>> general I'd be reluctant to lean on setuptools or other python install
>> mechanisms. In the worst case, you might encounter /usr/bin/pip not having
>> permissions to install new packages, or even if you do a package might
>> require something you can't change like a libc dependency.
>>
>> Perhaps you can install the .whl and its dependencies to the virtualenv
>> on a local machine, and then *after* the install process, package that
>> venv?
>>
>> If possible, I like conda for this approach over a vanilla venv because
>> it will contain all the non-python dependencies (like libc) if they're
>> needed.
>>
>>
>> Another thing - I think there are several ways to do this, but I've had
>> the most success including the .zip containing my environment in
>> `spark.yarn.dist.archives` and then using a relative path:
>>
>> os.environ['PYSPARK_PYTHON'] = './py37minimal_env/py37minimal/bin/python'
>>
>> dist_archives =
>> 'hdfs:///user/pmccarthy/conda/py37minimal.zip#py37minimal_env'
>>
>> SparkSession.builder.
>> ...
>>  .config('spark.yarn.dist.archives', dist_archives)
>>
>>
>> On Thu, Dec 17, 2020 at 10:32 AM Sachit Murarka 
>> wrote:
>>
>>> Hi Users
>>>
>>> I have a wheel file , while creating it I have mentioned dependencies in
>>> setup.py file.
>>> Now I have 2 virtual envs, 1 was already there . another one I created
>>> just now.
>>>
>>> I have switched to new virtual env, I want spark to download the
>>> dependencies while doing spark-submit using wheel.
>>>
>>> Could you please help me on this?
>>>
>>> It is not downloading dependencies , instead it is pointing to older
>>> version of  virtual env and proceeding with the execution of spark job.
>>>
>>> Please note I have tried setting the env variables also.
>>> Also I have tried following options as well in spark submit
>>>
>>> --conf spark.pyspark.virtualenv.enabled=true  --conf
>>> spark.pyspark.virtualenv.type=native --conf
>>> spark.pyspark.virtualenv.requirements=requirements.txt  --conf
>>> spark.pyspark.python= /path/to/venv/bin/python3 --conf
>>> spark.pyspark.driver.python=/path/to/venv/bin/python3
>>>
>>> This did not help too..
>>>
>>> Kind Regards,
>>> Sachit Murarka
>>>
>>
>>
>> --
>>
>>
>> *Patrick McCarthy  *
>>
>> Senior Data Scientist, Machine Learning Engineering
>>
>> Dstillery
>>
>> 470 Park Ave South, 17th Floor, NYC 10016
>>
>

-- 


*Patrick McCarthy  *

Senior Data Scientist, Machine Learning Engineering

Dstillery

470 Park Ave South, 17th Floor, NYC 10016


Re: Getting error message

2020-12-17 Thread Patrick McCarthy
Possibly. In that case maybe you should step back from spark and see if
there are OS-level tools to understand what's going on, like looking for
evidence of the OOM killer -
https://docs.memset.com/other/linux-s-oom-process-killer

On Thu, Dec 17, 2020 at 1:45 PM Vikas Garg  wrote:

> I am running code in a local machine that is single node machine.
>
> Getting into logs,  it looked like the host is killed.  This is happening
> very frequently an I am unable to find the reason of this.
>
> Could low memory be the reason?
>
> On Fri, 18 Dec 2020, 00:11 Patrick McCarthy, 
> wrote:
>
>> 'Job aborted due to stage failure: Task 1 in stage 39.0 failed 1 times'
>>
>> You may want to change the number of failures to a higher number like 4.
>> A single failure on a task should be able to be tolerated, especially if
>> you're on a shared cluster where resources can be preempted.
>>
>>  It seems that a node dies or goes off the network, so perhaps you can
>> also debug the logs on the failing node to see why it disappears and
>> prevent the failures in the first place.
>>
>> On Thu, Dec 17, 2020 at 1:27 PM Vikas Garg  wrote:
>>
>>> Mydomain is named by me while pasting the logs
>>>
>>> Also,  there are multiple class files in my project, if I run any 1 or 2
>>> at a time,  then they run fine,  sometimes they too give this error. But
>>> running all the classes at the same time always give this error.
>>>
>>> Once this error come, I can't run any program and on restarting the
>>> system, program starts running fine.
>>> This error goes away on
>>>
>>> On Thu, 17 Dec 2020, 23:50 Patrick McCarthy, 
>>> wrote:
>>>
>>>> my-domain.com/192.168.166.8:63534 probably isn't a valid address on
>>>> your network, is it?
>>>>
>>>> On Thu, Dec 17, 2020 at 3:03 AM Vikas Garg  wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Since last few days, I am getting error message while running my
>>>>> project. I have searched Google for the solution but didn't got any help.
>>>>>
>>>>> Can someone help me to figure out how I could mitigate this issue?
>>>>>
>>>>>
>>>>> 20/12/17 13:26:57 ERROR RetryingBlockFetcher: Exception while
>>>>> beginning fetch of 1 outstanding blocks
>>>>>
>>>>> *java.io.IOException*: Failed to connect to
>>>>> my-domain.com/192.168.166.8:63534
>>>>>
>>>>> at
>>>>> org.apache.spark.network.client.TransportClientFactory.createClient(
>>>>> *TransportClientFactory.java:253*)
>>>>>
>>>>> at
>>>>> org.apache.spark.network.client.TransportClientFactory.createClient(
>>>>> *TransportClientFactory.java:195*)
>>>>>
>>>>> at
>>>>> org.apache.spark.network.netty.NettyBlockTransferService$$anon$2.createAndStart(
>>>>> *NettyBlockTransferService.scala:122*)
>>>>>
>>>>> at
>>>>> org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(
>>>>> *RetryingBlockFetcher.java:141*)
>>>>>
>>>>> at org.apache.spark.network.shuffle.RetryingBlockFetcher.start(
>>>>> *RetryingBlockFetcher.java:121*)
>>>>>
>>>>> at
>>>>> org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(
>>>>> *NettyBlockTransferService.scala:143*)
>>>>>
>>>>> at
>>>>> org.apache.spark.network.BlockTransferService.fetchBlockSync(
>>>>> *BlockTransferService.scala:103*)
>>>>>
>>>>> at
>>>>> org.apache.spark.storage.BlockManager.fetchRemoteManagedBuffer(
>>>>> *BlockManager.scala:1010*)
>>>>>
>>>>> at
>>>>> org.apache.spark.storage.BlockManager.$anonfun$getRemoteBlock$8(
>>>>> *BlockManager.scala:954*)
>>>>>
>>>>> at scala.Option.orElse(*Option.scala:447*)
>>>>>
>>>>> at org.apache.spark.storage.BlockManager.getRemoteBlock(
>>>>> *BlockManager.scala:954*)
>>>>>
>>>>> at org.apache.spark.storage.BlockManager.getRemoteBytes(
>>>>> *BlockManager.scala:1092*)
>>>>>
>>>>> at
>>>>&

Re: Getting error message

2020-12-17 Thread Patrick McCarthy
'Job aborted due to stage failure: Task 1 in stage 39.0 failed 1 times'

You may want to change the number of failures to a higher number like 4. A
single failure on a task should be able to be tolerated, especially if
you're on a shared cluster where resources can be preempted.

 It seems that a node dies or goes off the network, so perhaps you can also
debug the logs on the failing node to see why it disappears and prevent the
failures in the first place.

On Thu, Dec 17, 2020 at 1:27 PM Vikas Garg  wrote:

> Mydomain is named by me while pasting the logs
>
> Also,  there are multiple class files in my project, if I run any 1 or 2
> at a time,  then they run fine,  sometimes they too give this error. But
> running all the classes at the same time always give this error.
>
> Once this error come, I can't run any program and on restarting the
> system, program starts running fine.
> This error goes away on
>
> On Thu, 17 Dec 2020, 23:50 Patrick McCarthy, 
> wrote:
>
>> my-domain.com/192.168.166.8:63534 probably isn't a valid address on your
>> network, is it?
>>
>> On Thu, Dec 17, 2020 at 3:03 AM Vikas Garg  wrote:
>>
>>> Hi,
>>>
>>> Since last few days, I am getting error message while running my
>>> project. I have searched Google for the solution but didn't got any help.
>>>
>>> Can someone help me to figure out how I could mitigate this issue?
>>>
>>>
>>> 20/12/17 13:26:57 ERROR RetryingBlockFetcher: Exception while beginning
>>> fetch of 1 outstanding blocks
>>>
>>> *java.io.IOException*: Failed to connect to
>>> my-domain.com/192.168.166.8:63534
>>>
>>> at
>>> org.apache.spark.network.client.TransportClientFactory.createClient(
>>> *TransportClientFactory.java:253*)
>>>
>>> at
>>> org.apache.spark.network.client.TransportClientFactory.createClient(
>>> *TransportClientFactory.java:195*)
>>>
>>> at
>>> org.apache.spark.network.netty.NettyBlockTransferService$$anon$2.createAndStart(
>>> *NettyBlockTransferService.scala:122*)
>>>
>>> at
>>> org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(
>>> *RetryingBlockFetcher.java:141*)
>>>
>>> at org.apache.spark.network.shuffle.RetryingBlockFetcher.start(
>>> *RetryingBlockFetcher.java:121*)
>>>
>>> at
>>> org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(
>>> *NettyBlockTransferService.scala:143*)
>>>
>>> at org.apache.spark.network.BlockTransferService.fetchBlockSync(
>>> *BlockTransferService.scala:103*)
>>>
>>> at
>>> org.apache.spark.storage.BlockManager.fetchRemoteManagedBuffer(
>>> *BlockManager.scala:1010*)
>>>
>>> at
>>> org.apache.spark.storage.BlockManager.$anonfun$getRemoteBlock$8(
>>> *BlockManager.scala:954*)
>>>
>>> at scala.Option.orElse(*Option.scala:447*)
>>>
>>> at org.apache.spark.storage.BlockManager.getRemoteBlock(
>>> *BlockManager.scala:954*)
>>>
>>> at org.apache.spark.storage.BlockManager.getRemoteBytes(
>>> *BlockManager.scala:1092*)
>>>
>>> at
>>> org.apache.spark.scheduler.TaskResultGetter$$anon$3.$anonfun$run$1(
>>> *TaskResultGetter.scala:88*)
>>>
>>> at
>>> scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>>>
>>> at org.apache.spark.util.Utils$.logUncaughtExceptions(
>>> *Utils.scala:1932*)
>>>
>>> at org.apache.spark.scheduler.TaskResultGetter$$anon$3.run(
>>> *TaskResultGetter.scala:63*)
>>>
>>> at java.util.concurrent.ThreadPoolExecutor.runWorker(
>>> *ThreadPoolExecutor.java:1149*)
>>>
>>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(
>>> *ThreadPoolExecutor.java:624*)
>>>
>>> at java.lang.Thread.run(*Thread.java:748*)
>>>
>>> Caused by: *io.netty.channel.AbstractChannel$AnnotatedSocketException*:
>>> Permission denied: no further information:
>>> my-domain.com/192.168.166.8:63534
>>>
>>> Caused by: *java.net.SocketException*: Permission denied: no further
>>> information
>>>
>>> at sun.nio.ch.SocketChannelImpl.checkConnect(*Native Method*)
>>>
>>> at sun.nio.ch.SocketChannelImpl.finishConnect(
>>> *SocketChannelImpl.java:715*)
>>

Re: Issue while installing dependencies Python Spark

2020-12-17 Thread Patrick McCarthy
I'm not very familiar with the environments on cloud clusters, but in
general I'd be reluctant to lean on setuptools or other python install
mechanisms. In the worst case, you might encounter /usr/bin/pip not having
permissions to install new packages, or even if you do a package might
require something you can't change like a libc dependency.

Perhaps you can install the .whl and its dependencies to the virtualenv on
a local machine, and then *after* the install process, package that venv?

If possible, I like conda for this approach over a vanilla venv because it
will contain all the non-python dependencies (like libc) if they're needed.


Another thing - I think there are several ways to do this, but I've had the
most success including the .zip containing my environment in
`spark.yarn.dist.archives` and then using a relative path:

os.environ['PYSPARK_PYTHON'] = './py37minimal_env/py37minimal/bin/python'

dist_archives =
'hdfs:///user/pmccarthy/conda/py37minimal.zip#py37minimal_env'

SparkSession.builder.
...
 .config('spark.yarn.dist.archives', dist_archives)


On Thu, Dec 17, 2020 at 10:32 AM Sachit Murarka 
wrote:

> Hi Users
>
> I have a wheel file , while creating it I have mentioned dependencies in
> setup.py file.
> Now I have 2 virtual envs, 1 was already there . another one I created
> just now.
>
> I have switched to new virtual env, I want spark to download the
> dependencies while doing spark-submit using wheel.
>
> Could you please help me on this?
>
> It is not downloading dependencies , instead it is pointing to older
> version of  virtual env and proceeding with the execution of spark job.
>
> Please note I have tried setting the env variables also.
> Also I have tried following options as well in spark submit
>
> --conf spark.pyspark.virtualenv.enabled=true  --conf
> spark.pyspark.virtualenv.type=native --conf
> spark.pyspark.virtualenv.requirements=requirements.txt  --conf
> spark.pyspark.python= /path/to/venv/bin/python3 --conf
> spark.pyspark.driver.python=/path/to/venv/bin/python3
>
> This did not help too..
>
> Kind Regards,
> Sachit Murarka
>


-- 


*Patrick McCarthy  *

Senior Data Scientist, Machine Learning Engineering

Dstillery

470 Park Ave South, 17th Floor, NYC 10016


Re: [Spark Core] Vectorizing very high-dimensional data sourced in long format

2020-10-30 Thread Patrick McCarthy
That's a very large vector. Is it sparse? Perhaps you'd have better luck
performing an aggregate instead of a pivot, and assembling the vector using
a UDF.

On Thu, Oct 29, 2020 at 10:19 PM Daniel Chalef
 wrote:

> Hello,
>
> I have a very large long-format dataframe (several billion rows) that I'd
> like to pivot and vectorize (using the VectorAssembler), with the aim to
> reduce dimensionality using something akin to TF-IDF. Once pivoted, the
> dataframe will have ~130 million columns.
>
> The source, long-format schema looks as follows:
>
> root
>  |-- entity_id: long (nullable = false)
>  |-- attribute_id: long (nullable = false)
>  |-- event_count: integer (nullable = true)
>
> Pivoting as per the following fails, exhausting executor and driver
> memory. I am unsure whether increasing memory limits would be successful
> here as my sense is that pivoting and then using a VectorAssembler isn't
> the right approach to solving this problem.
>
> wide_frame = (
> long_frame.groupBy("entity_id")
> .pivot("attribute_id")
> .agg(F.first("event_count"))
> )
>
> Are there other Spark patterns that I should attempt in order to achieve
> my end goal of a vector of attributes for every entity?
>
> Thanks, Daniel
>


-- 


*Patrick McCarthy  *

Senior Data Scientist, Machine Learning Engineering

Dstillery

470 Park Ave South, 17th Floor, NYC 10016


Re: Hive using Spark engine vs native spark with hive integration.

2020-10-07 Thread Patrick McCarthy
I think a lot will depend on what the scripts do. I've seen some legacy
hive scripts which were written in an awkward way (e.g. lots of subqueries,
nested explodes) because pre-spark it was the only way to express certain
logic. For fairly straightforward operations I expect Catalyst would reduce
both code to similar plans.

On Tue, Oct 6, 2020 at 12:07 PM Manu Jacob 
wrote:

> Hi All,
>
>
>
> Not sure if I need to ask this question on spark community or hive
> community.
>
>
>
> We have a set of hive scripts that runs on EMR (Tez engine). We would like
> to experiment by moving some of it onto Spark. We are planning to
> experiment with two options.
>
>
>
>1. Use the current code based on HQL, with engine set as spark.
>2. Write pure spark code in scala/python using SparkQL and hive
>integration.
>
>
>
> The first approach helps us to transition to Spark quickly but not sure if
> this is the best approach in terms of performance.  Could not find any
> reasonable comparisons of this two approaches.  It looks like writing pure
> Spark code, gives us more control to add logic and also control some of the
> performance features, for example things like caching/evicting etc.
>
>
>
>
>
> Any advice on this is much appreciated.
>
>
>
>
>
> Thanks,
>
> -Manu
>
>
>


-- 


*Patrick McCarthy  *

Senior Data Scientist, Machine Learning Engineering

Dstillery

470 Park Ave South, 17th Floor, NYC 10016


Re: regexp_extract regex for extracting the columns from string

2020-08-10 Thread Patrick McCarthy
Can you simply do a string split on space, and then another on '='?

On Sun, Aug 9, 2020 at 12:00 PM anbutech  wrote:

> Hi All,
>
> I have a following info.in the data column.
>
> <1000> date=2020-08-01 time=20:50:04 name=processing id=123 session=new
> packt=20 orgin=null address=null dest=fgjglgl
>
> here I want to create a separate column for the above key value pairs after
> the integer <1000> separated by spaces.
> Is there any way to achieved it using regexp_extract inbuilt functions.i
> don't want to do it using udf function.
> apart from udf,is there any way to achieved it.
>
>
> Thanks
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -----
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>

-- 


*Patrick McCarthy  *

Senior Data Scientist, Machine Learning Engineering

Dstillery

470 Park Ave South, 17th Floor, NYC 10016


Re: [Spark SQL]: Can't write DataFrame after using explode function on multiple columns.

2020-08-03 Thread Patrick McCarthy
If you use pandas_udfs in 2.4 they should be quite performant (or at least
won't suffer serialization overhead), might be worth looking into.

I didn't run your code but one consideration is that the while loop might
be making the DAG a lot bigger than it has to be. You might see if defining
those columns with list comprehensions forming a single select() statement
makes for a smaller DAG.

On Mon, Aug 3, 2020 at 10:06 AM Henrique Oliveira  wrote:

> Hi Patrick, thank you for your quick response.
> That's exactly what I think. Actually, the result of this processing is an
> intermediate table that is going to be used for other views generation.
> Another approach I'm trying now, is to move the "explosion" step for this
> "view generation" step, this way I don't need to explode every column but
> just those used for the final client.
>
> ps.I was avoiding UDFs for now because I'm still on Spark 2.4 and the
> python udfs I tried had very bad performance, but I will give it a try in
> this case. It can't be worse.
> Thanks again!
>
> Em seg., 3 de ago. de 2020 às 10:53, Patrick McCarthy <
> pmccar...@dstillery.com> escreveu:
>
>> This seems like a very expensive operation. Why do you want to write out
>> all the exploded values? If you just want all combinations of values, could
>> you instead do it at read-time with a UDF or something?
>>
>> On Sat, Aug 1, 2020 at 8:34 PM hesouol  wrote:
>>
>>> I forgot to add an information. By "can't write" I mean it keeps
>>> processing
>>> and nothing happens. The job runs for hours even with a very small file
>>> and
>>> I have to force the stoppage.
>>>
>>>
>>>
>>> --
>>> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>>>
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>
>>>
>>
>> --
>>
>>
>> *Patrick McCarthy  *
>>
>> Senior Data Scientist, Machine Learning Engineering
>>
>> Dstillery
>>
>> 470 Park Ave South, 17th Floor, NYC 10016
>>
>

-- 


*Patrick McCarthy  *

Senior Data Scientist, Machine Learning Engineering

Dstillery

470 Park Ave South, 17th Floor, NYC 10016


Re: [Spark SQL]: Can't write DataFrame after using explode function on multiple columns.

2020-08-03 Thread Patrick McCarthy
This seems like a very expensive operation. Why do you want to write out
all the exploded values? If you just want all combinations of values, could
you instead do it at read-time with a UDF or something?

On Sat, Aug 1, 2020 at 8:34 PM hesouol  wrote:

> I forgot to add an information. By "can't write" I mean it keeps processing
> and nothing happens. The job runs for hours even with a very small file and
> I have to force the stoppage.
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>

-- 


*Patrick McCarthy  *

Senior Data Scientist, Machine Learning Engineering

Dstillery

470 Park Ave South, 17th Floor, NYC 10016


Re: Issue in parallelization of CNN model using spark

2020-07-14 Thread Patrick McCarthy
Please don't advocate for piracy, this book is not freely available.

I own it and it's wonderful, Mr. Géron deserves to benefit from it.

On Mon, Jul 13, 2020 at 9:59 PM Anwar AliKhan 
wrote:

>  link to a free book  which may be useful.
>
> Hands-On Machine Learning with Scikit-Learn, Keras, and Tensorflow
> Concepts, Tools, and Techniques to Build Intelligent Systems by Aurélien
> Géron
>
> https://bit.ly/2zxueGt
>
>
>
>
>
>  13 Jul 2020, 15:18 Sean Owen,  wrote:
>
>> There is a multilayer perceptron implementation in Spark ML, but
>> that's not what you're looking for.
>> To parallelize model training developed using standard libraries like
>> Keras, use Horovod from Uber.
>> https://horovod.readthedocs.io/en/stable/spark_include.html
>>
>> On Mon, Jul 13, 2020 at 6:59 AM Mukhtaj Khan  wrote:
>> >
>> > Dear Spark User
>> >
>> > I am trying to parallelize the CNN (convolutional neural network) model
>> using spark. I have developed the model using python and Keras library. The
>> model works fine on a single machine but when we try on multiple machines,
>> the execution time remains the same as sequential.
>> > Could you please tell me that there is any built-in library for CNN to
>> parallelize in spark framework. Moreover, MLLIB does not have any support
>> for CNN.
>> > Best regards
>> > Mukhtaj
>> >
>> >
>> >
>> >
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>

-- 


*Patrick McCarthy  *

Senior Data Scientist, Machine Learning Engineering

Dstillery

470 Park Ave South, 17th Floor, NYC 10016


Building Spark 3.0.0 for Hive 1.2

2020-07-10 Thread Patrick McCarthy
I'm trying to build Spark 3.0.0 for my Yarn cluster, with Hadoop 2.7.3 and
Hive 1.2.1. I downloaded the source and created a runnable dist with

./dev/make-distribution.sh --name custom-spark --pip --r --tgz -Psparkr
-Phive-1.2 -Phadoop-2.7 -Pyarn

We're running Spark 2.4.0 in production so I copied the hive-site.xml,
spark-env.sh and spark-defaults.conf from there.

When I try to create a SparkSession in a normal Python REPL, I get the
following uninformative error. How can I debug this? I can run the
spark-shell and get to a scala prompt with Hive access seemingly without
error.

Python 3.6.3 (default, Apr 10 2018, 16:07:04)[GCC 4.8.3 20140911 (Red
Hat 4.8.3-9)] on linuxType "help", "copyright", "credits" or "license"
for more information.>>> import os>>> import sys>>>
os.environ['SPARK_HOME'] = '/home/pmccarthy/custom-spark-3'>>>
sys.path.insert(0,os.path.join(os.environ['SPARK_HOME'],'python','lib','py4j-src.zip'))>>>
sys.path.append(os.path.join(os.environ['SPARK_HOME'],'python'))>>>
import pyspark>>> from pyspark.sql import SparkSession>>> spark =
(SparkSession.builder.enableHiveSupport().config('spark.master','local').getOrCreate())

Traceback (most recent call last):
  File "", line 1, in 
  File "/home/pmccarthy/custom-spark-3/python/pyspark/sql/session.py",
line 191, in getOrCreate
session._jsparkSession.sessionState().conf().setConfString(key, value)
  File 
"/home/pmccarthy/custom-spark-3/python/lib/py4j-src.zip/py4j/java_gateway.py",
line 1305, in __call__
  File "/home/pmccarthy/custom-spark-3/python/pyspark/sql/utils.py",
line 137, in deco
raise_from(converted)
  File "", line 3, in raise_from
pyspark.sql.utils.IllegalArgumentException: 


-- 


*Patrick McCarthy  *

Senior Data Scientist, Machine Learning Engineering

Dstillery

470 Park Ave South, 17th Floor, NYC 10016


Re: Reading TB of JSON file

2020-06-18 Thread Patrick McCarthy
Assuming that the file can be easily split, I would divide it into a number
of pieces and move those pieces to HDFS before using spark at all, using
`hdfs dfs` or similar. At that point you can use your executors to perform
the reading instead of the driver.

On Thu, Jun 18, 2020 at 9:12 AM Chetan Khatri 
wrote:

> Hi Spark Users,
>
> I have a 50GB of JSON file, I would like to read and persist at HDFS so it
> can be taken into next transformation. I am trying to read as
> spark.read.json(path) but this is giving Out of memory error on driver.
> Obviously, I can't afford having 50 GB on driver memory. In general, what
> is the best practice to read large JSON file like 50 GB?
>
> Thanks
>


-- 


*Patrick McCarthy  *

Senior Data Scientist, Machine Learning Engineering

Dstillery

470 Park Ave South, 17th Floor, NYC 10016


Re: Add python library

2020-06-08 Thread Patrick McCarthy
I've found Anaconda encapsulates modules and dependencies and such nicely,
and you can deploy all the needed .so files and such by deploying a whole
conda environment.

I've used this method with success:
https://community.cloudera.com/t5/Community-Articles/Running-PySpark-with-Conda-Env/ta-p/247551

On Sat, Jun 6, 2020 at 4:16 PM Anwar AliKhan 
wrote:

>  " > Have you looked into this article?
> https://medium.com/@SSKahani/pyspark-applications-dependencies-99415e0df987
>  "
>
> This is weird !
> I was hanging out here https://machinelearningmastery.com/start-here/.
> When I came across this post.
>
> The weird part is I was just wondering  how I can take one of the
> projects(Open AI GYM taxi-vt2 in Python), a project I want to develop
> further.
>
> I want to run on Spark using Spark's parallelism features and GPU
> capabilities,  when I am using bigger datasets . While installing the
> workers (slaves)  doing the sliced dataset computations on the new 8GB RAM
> Raspberry Pi (Linux).
>
> Are any other documents on official website which shows how to do that,
> or any other location  , preferably showing full self contained examples?
>
>
>
> On Fri, 5 Jun 2020, 09:02 Dark Crusader, 
> wrote:
>
>> Hi Stone,
>>
>>
>> I haven't tried it with .so files however I did use the approach he
>> recommends to install my other dependencies.
>> I Hope it helps.
>>
>> On Fri, Jun 5, 2020 at 1:12 PM Stone Zhong  wrote:
>>
>>> Hi,
>>>
>>> So my pyspark app depends on some python libraries, it is not a problem,
>>> I pack all the dependencies into a file libs.zip, and then call
>>> *sc.addPyFile("libs.zip")* and it works pretty well for a while.
>>>
>>> Then I encountered a problem, if any of my library has any binary file
>>> dependency (like .so files), this approach does not work. Mainly because
>>> when you set PYTHONPATH to a zip file, python does not look up needed
>>> binary library (e.g. a .so file) inside the zip file, this is a python
>>> *limitation*. So I got a workaround:
>>>
>>> 1) Do not call sc.addPyFile, instead extract the libs.zip into current
>>> directory
>>> 2) When my python code starts, manually call *sys.path.insert(0,
>>> f"{os.getcwd()}/libs")* to set PYTHONPATH
>>>
>>> This workaround works well for me. Then I got another problem: what if
>>> my code in executor need python library that has binary code? Below is am
>>> example:
>>>
>>> def do_something(p):
>>> ...
>>>
>>> rdd = sc.parallelize([
>>> {"x": 1, "y": 2},
>>> {"x": 2, "y": 3},
>>> {"x": 3, "y": 4},
>>> ])
>>> a = rdd.map(do_something)
>>>
>>> What if the function "do_something" need a python library that has
>>> binary code? My current solution is, extract libs.zip into a NFS share (or
>>> a SMB share) and manually do *sys.path.insert(0,
>>> f"share_mount_dir/libs") *in my "do_something" function, but adding
>>> such code in each function looks ugly, is there any better/elegant solution?
>>>
>>> Thanks,
>>> Stone
>>>
>>>

-- 


*Patrick McCarthy  *

Senior Data Scientist, Machine Learning Engineering

Dstillery

470 Park Ave South, 17th Floor, NYC 10016


Re: Using existing distribution for join when subset of keys

2020-05-31 Thread Patrick Woody
Hey Terry,

Thanks for the response! I'm not sure that it ends up working though - the
bucketing still seems to require the exchange before the join. Both tables
below are saved bucketed by "x":
*(5) Project [x#29, y#30, z#31, z#37]
+- *(5) SortMergeJoin [x#29, y#30], [x#35, y#36], Inner
   :- *(2) Sort [x#29 ASC NULLS FIRST, y#30 ASC NULLS FIRST], false, 0
*   :  +- Exchange hashpartitioning(x#29, y#30, 200)*
   : +- *(1) Project [x#29, y#30, z#31]
   :+- *(1) Filter (isnotnull(x#29) && isnotnull(y#30))
   :   +- *(1) FileScan parquet default.ax[x#29,y#30,z#31] Batched:
true, Format: Parquet, Location:
InMemoryFileIndex[file:/home/pwoody/tm/spark-2.4.5-bin-hadoop2.7/spark-warehouse/ax],
PartitionFilters: [], PushedFilters: [IsNotNull(x), IsNotNull(y)],
ReadSchema: struct, SelectedBucketsCount: 200 out of 200
   +- *(4) Sort [x#35 ASC NULLS FIRST, y#36 ASC NULLS FIRST], false, 0
*  +- Exchange hashpartitioning(x#35, y#36, 200)*
 +- *(3) Project [x#35, y#36, z#37]
+- *(3) Filter (isnotnull(x#35) && isnotnull(y#36))
   +- *(3) FileScan parquet default.bx[x#35,y#36,z#37] Batched:
true, Format: Parquet, Location:
InMemoryFileIndex[file:/home/pwoody/tm/spark-2.4.5-bin-hadoop2.7/spark-warehouse/bx],
PartitionFilters: [], PushedFilters: [IsNotNull(x), IsNotNull(y)],
ReadSchema: struct, SelectedBucketsCount: 200 out of 200

Best,
Pat



On Sun, May 31, 2020 at 3:15 PM Terry Kim  wrote:

> You can use bucketBy to avoid shuffling in your scenario. This test suite
> has some examples:
> https://github.com/apache/spark/blob/45cf5e99503b00a6bd83ea94d6d92761db1a00ab/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala#L343
>
> Thanks,
> Terry
>
> On Sun, May 31, 2020 at 7:43 AM Patrick Woody 
> wrote:
>
>> Hey all,
>>
>> I have one large table, A, and two medium sized tables, B & C, that I'm
>> trying to complete a join on efficiently. The result is multiplicative on A
>> join B, so I'd like to avoid shuffling that result. For this example, let's
>> just assume each table has three columns, x, y, z. The below is all being
>> tested on Spark 2.4.5 locally.
>>
>> I'd like to perform the following join:
>> A.join(B, Seq("x", "y")).join(C, Seq("x", "z"))
>> This outputs the following physical plan:
>> == Physical Plan ==
>> *(6) Project [x#32, z#34, y#33, z#74, y#53]
>> +- *(6) SortMergeJoin [x#32, z#34], [x#52, z#54], Inner
>>:- *(4) Sort [x#32 ASC NULLS FIRST, z#34 ASC NULLS FIRST], false, 0
>>:  +- Exchange hashpartitioning(x#32, z#34, 200)
>>: +- *(3) Project [x#32, y#33, z#34, z#74]
>>:+- *(3) SortMergeJoin [x#32, y#33], [x#72, y#73], Inner
>>:   :- *(1) Sort [x#32 ASC NULLS FIRST, y#33 ASC NULLS FIRST],
>> false, 0
>>:   :  +- Exchange hashpartitioning(x#32, y#33, 200)
>>:   : +- LocalTableScan [x#32, y#33, z#34]
>>:   +- *(2) Sort [x#72 ASC NULLS FIRST, y#73 ASC NULLS FIRST],
>> false, 0
>>:  +- Exchange hashpartitioning(x#72, y#73, 200)
>>: +- LocalTableScan [x#72, y#73, z#74]
>>+- *(5) Sort [x#52 ASC NULLS FIRST, z#54 ASC NULLS FIRST], false, 0
>>   +- Exchange hashpartitioning(x#52, z#54, 200)
>>  +- LocalTableScan [x#52, y#53, z#54]
>>
>>
>> I may be misremembering, but in the past I thought you had the ability to
>> pre-partition each table by "x" and it would satisfy the requirements of
>> the join since it is already clustered by the key on both sides using the
>> same hash function (this assumes numPartitions lines up obviously). However
>> it seems like it will insert another exchange:
>>
>> A.repartition($"x").join(B.repartition($"x"), Seq("x",
>> "y")).join(C.repartition($"x"), Seq("x", "z"))
>> *(6) Project [x#32, z#34, y#33, z#74, y#53]
>> +- *(6) SortMergeJoin [x#32, z#34], [x#52, z#54], Inner
>>:- *(4) Sort [x#32 ASC NULLS FIRST, z#34 ASC NULLS FIRST], false, 0
>>:  +- Exchange hashpartitioning(x#32, z#34, 200)
>>: +- *(3) Project [x#32, y#33, z#34, z#74]
>>:+- *(3) SortMergeJoin [x#32, y#33], [x#72, y#73], Inner
>>:   :- *(1) Sort [x#32 ASC NULLS FIRST, y#33 ASC NULLS FIRST],
>> false, 0
>>:   :  +- Exchange hashpartitioning(x#32, y#33, 200)
>>:   : +- Exchange hashpartitioning(x#32, 200)
>>:   :+- LocalTableScan [x#32, y#33, z#34]
>>:   +- *(2) Sort [x#72 ASC NULLS FIRST, y#73 ASC NULLS FIRST],
>> false, 0
&g

Using existing distribution for join when subset of keys

2020-05-31 Thread Patrick Woody
Hey all,

I have one large table, A, and two medium sized tables, B & C, that I'm
trying to complete a join on efficiently. The result is multiplicative on A
join B, so I'd like to avoid shuffling that result. For this example, let's
just assume each table has three columns, x, y, z. The below is all being
tested on Spark 2.4.5 locally.

I'd like to perform the following join:
A.join(B, Seq("x", "y")).join(C, Seq("x", "z"))
This outputs the following physical plan:
== Physical Plan ==
*(6) Project [x#32, z#34, y#33, z#74, y#53]
+- *(6) SortMergeJoin [x#32, z#34], [x#52, z#54], Inner
   :- *(4) Sort [x#32 ASC NULLS FIRST, z#34 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(x#32, z#34, 200)
   : +- *(3) Project [x#32, y#33, z#34, z#74]
   :+- *(3) SortMergeJoin [x#32, y#33], [x#72, y#73], Inner
   :   :- *(1) Sort [x#32 ASC NULLS FIRST, y#33 ASC NULLS FIRST],
false, 0
   :   :  +- Exchange hashpartitioning(x#32, y#33, 200)
   :   : +- LocalTableScan [x#32, y#33, z#34]
   :   +- *(2) Sort [x#72 ASC NULLS FIRST, y#73 ASC NULLS FIRST],
false, 0
   :  +- Exchange hashpartitioning(x#72, y#73, 200)
   : +- LocalTableScan [x#72, y#73, z#74]
   +- *(5) Sort [x#52 ASC NULLS FIRST, z#54 ASC NULLS FIRST], false, 0
  +- Exchange hashpartitioning(x#52, z#54, 200)
 +- LocalTableScan [x#52, y#53, z#54]


I may be misremembering, but in the past I thought you had the ability to
pre-partition each table by "x" and it would satisfy the requirements of
the join since it is already clustered by the key on both sides using the
same hash function (this assumes numPartitions lines up obviously). However
it seems like it will insert another exchange:

A.repartition($"x").join(B.repartition($"x"), Seq("x",
"y")).join(C.repartition($"x"), Seq("x", "z"))
*(6) Project [x#32, z#34, y#33, z#74, y#53]
+- *(6) SortMergeJoin [x#32, z#34], [x#52, z#54], Inner
   :- *(4) Sort [x#32 ASC NULLS FIRST, z#34 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(x#32, z#34, 200)
   : +- *(3) Project [x#32, y#33, z#34, z#74]
   :+- *(3) SortMergeJoin [x#32, y#33], [x#72, y#73], Inner
   :   :- *(1) Sort [x#32 ASC NULLS FIRST, y#33 ASC NULLS FIRST],
false, 0
   :   :  +- Exchange hashpartitioning(x#32, y#33, 200)
   :   : +- Exchange hashpartitioning(x#32, 200)
   :   :+- LocalTableScan [x#32, y#33, z#34]
   :   +- *(2) Sort [x#72 ASC NULLS FIRST, y#73 ASC NULLS FIRST],
false, 0
   :  +- Exchange hashpartitioning(x#72, y#73, 200)
   : +- Exchange hashpartitioning(x#72, 200)
   :+- LocalTableScan [x#72, y#73, z#74]
   +- *(5) Sort [x#52 ASC NULLS FIRST, z#54 ASC NULLS FIRST], false, 0
  +- Exchange hashpartitioning(x#52, z#54, 200)
 +- ReusedExchange [x#52, y#53, z#54], Exchange
hashpartitioning(x#32, 200).

Note, that using this "strategy" with groupBy("x", "y") works fine though I
assume that is because it doesn't have to consider the other side of the
join.

Did this used to work or am I simply confusing it with groupBy? Either way
- any thoughts on how I can avoid shuffling the bulk of the join result?

Thanks,
Pat


[Structured Streaming] Connecting to Kafka via a Custom Consumer / Producer

2020-04-22 Thread Patrick McGloin
Hi,

The large international bank I work for has a custom Kafka implementation.
The client libraries that are used to connect to Kafka have extra security
steps.  They implement the Kafka Consumer and Producer interfaces in this
client library so once we use it to connect to Kafka, we can treat our
connections as the standard Kafka interfaces in our code.

We can't use the out-of-the-box Kafka connecter from Structured Streaming
as only a KafkaConsumer can be used.

Would it be possible / advisable / a good idea to change this to use the
Consumer interface and allow users to specify a callback somehow to create
their own connection to Kafka?

So the signature of this private method in InternalKafkaConsumer would
change to use the Consumer interface (as would the rest of the code base)
and somehow users are given the option to create their own Consumer if they
wanted.  The same would apply for Producers.

/** Create a KafkaConsumer to fetch records for `topicPartition` */
private def createConsumer(): KafkaConsumer[Array[Byte], Array[Byte]] = {
  ...
  val c = new KafkaConsumer[Array[Byte], Array[Byte]](kafkaParamsWithSecurity)
  *...*
}

At the moment we are left with two options, copy the Spark code base and
swap in our custom Consumer for the KafkaConsumer used in that function
(and a few other changes).  This leaves us with a codebase to maintain that
will be out of sync over time.  Or we can build and maintain our own custom
connecter.

Bet regards,
Patrick


Re: Save Spark dataframe as dynamic partitioned table in Hive

2020-04-16 Thread Patrick McCarthy
|
> |123456789   |
>
> +-+-+-+++-+-++
>
> org.apache.spark.sql.catalyst.parser.ParseException:
> missing STRING at ','(line 2, pos 85)
>
> == SQL ==
>
>   INSERT INTO TABLE michtest.BroadcastStaging PARTITION (broadcastId =
> broadcastValue, brand = dummy)
>
> -^^^
>   SELECT
>   ocis_party_id AS partyId
> , target_mobile_no AS phoneNumber
>   FROM tmp
>
> It fails passing partition values
>
>
> Thanks,
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>


-- 


*Patrick McCarthy  *

Senior Data Scientist, Machine Learning Engineering

Dstillery

470 Park Ave South, 17th Floor, NYC 10016


Design pattern to invert a large map

2020-03-31 Thread Patrick McCarthy
I'm not a software engineer by training and I hope that there's an existing
best practice for the problem I'm trying to solve. I'm using Spark 2.4.5,
Hadoop 2.7, Hive 1.2.

I have a large table (terabytes) from an external source (which is beyond
my control) where the data is stored in a key-value format with an array of
values:

| id | val
+ - +---
| k1 | 
| k2 | 
| k3 | 

I want to invert the map so that I have a collection of keys for each value
(let's assume I don't care about uniqueness):

| id | val
+ - + --
| v1 | 
| v2 | 
| v3 | 
| v5 | 

It seems like a lot of shuffle is required somehow, but I'm not sure what
the best approach is. I've written solutions using DataFrame (with
explode(), groupBy() and collect_set()) and with RDD but it's always very
expensive.

Is there a best practice technique for this kind of operation? My leading
thought so far is to restage the data in a partitioned, bucketed flat table
as an intermediary step but that too is costly in terms of disk space and
transform time.

Thanks,
Patrick


Spark Mllib logistic regression setWeightCol illegal argument exception

2020-01-09 Thread Patrick
Hi Spark Users,

I am trying to solve a class imbalance problem, I figured out, spark
supports setting weight in its API but I get IIlegal Argument exception
weight column do not exist, but it do exists in the dataset. Any
recommedation to go about this problem ? I am using Pipeline API with
Logistic regression model and TestTrainSplit.

LogisticRegression l;
l.setWeightCol()


Caused by: java.lang.IllegalArgumentException: Field "weight" does not
exist.

at
org.apache.spark.sql.types.StructType$$anonfun$apply$1.apply(StructType.scala:267)

at
org.apache.spark.sql.types.StructType$$anonfun$apply$1.apply(StructType.scala:267)

at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)

at scala.collection.AbstractMap.getOrElse(Map.scala:59)

at org.apache.spark.sql.types.StructType.apply(StructType.scala:266)

at
org.apache.spark.ml.util.SchemaUtils$.checkNumericType(SchemaUtils.scala:71)

at
org.apache.spark.ml.PredictorParams$class.validateAndTransformSchema(Predictor.scala:58)

at org.apache.spark.ml.classification.Classifier.org
$apache$spark$ml$classification$ClassifierParams$$super$validateAndTransformSchema(Classifier.scala:58)

at
org.apache.spark.ml.classification.ClassifierParams$class.validateAndTransformSchema(Classifier.scala:42)

at org.apache.spark.ml.classification.ProbabilisticClassifier.org
$apache$spark$ml$classification$ProbabilisticClassifierParams$$super$validateAndTransformSchema(ProbabilisticClassifier.scala:53)

at
org.apache.spark.ml.classification.ProbabilisticClassifierParams$class.validateAndTransformSchema(ProbabilisticClassifier.scala:37)

at org.apache.spark.ml.classification.LogisticRegression.org
$apache$spark$ml$classification$LogisticRegressionParams$$super$validateAndTransformSchema(LogisticRegression.scala:278)

at
org.apache.spark.ml.classification.LogisticRegressionParams$class.validateAndTransformSchema(LogisticRegression.scala:265)

at
org.apache.spark.ml.classification.LogisticRegression.validateAndTransformSchema(LogisticRegression.scala:278)

at org.apache.spark.ml.Predictor.transformSchema(Predictor.scala:144)

at
org.apache.spark.ml.Pipeline$$anonfun$transformSchema$4.apply(Pipeline.scala:184)

at
org.apache.spark.ml.Pipeline$$anonfun$transformSchema$4.apply(Pipeline.scala:184)

at
scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:57)

at
scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:66)

at scala.collection.mutable.ArrayOps$ofRef.foldLeft(ArrayOps.scala:186)

at org.apache.spark.ml.Pipeline.transformSchema(Pipeline.scala:184)

at
org.apache.spark.ml.tuning.ValidatorParams$class.transformSchemaImpl(ValidatorParams.scala:77)

at
org.apache.spark.ml.tuning.TrainValidationSplit.transformSchemaImpl(TrainValidationSplit.scala:67)

at
org.apache.spark.ml.tuning.TrainValidationSplit.transformSchema(TrainValidationSplit.scala:180)

at org.apache.spark.ml.PipelineStage.transformSchema(Pipeline.scala:74)

at
org.apache.spark.ml.tuning.TrainValidationSplit.fit(TrainValidationSplit.scala:121)


Thanks in advance,


Re: Using Percentile in Spark SQL

2019-11-11 Thread Patrick McCarthy
Depending on your tolerance for error you could also use
percentile_approx().

On Mon, Nov 11, 2019 at 10:14 AM Jerry Vinokurov 
wrote:

> Do you mean that you are trying to compute the percent rank of some data?
> You can use the SparkSQL percent_rank function for that, but I don't think
> that's going to give you any improvement over calling the percentRank
> function on the data frame. Are you currently using a user-defined function
> for this task? Because I bet that's what's slowing you down.
>
> On Mon, Nov 11, 2019 at 9:46 AM Tzahi File  wrote:
>
>> Hi,
>>
>> Currently, I'm using hive huge cluster(m5.24xl * 40 workers) to run a
>> percentile function. I'm trying to improve this job by moving it to run
>> with spark SQL.
>>
>> Any suggestions on how to use a percentile function in Spark?
>>
>>
>> Thanks,
>> --
>> Tzahi File
>> Data Engineer
>> [image: ironSource] <http://www.ironsrc.com/>
>>
>> email tzahi.f...@ironsrc.com
>> mobile +972-546864835
>> fax +972-77-5448273
>> ironSource HQ - 121 Derech Menachem Begin st. Tel Aviv
>> ironsrc.com <http://www.ironsrc.com/>
>> [image: linkedin] <https://www.linkedin.com/company/ironsource>[image:
>> twitter] <https://twitter.com/ironsource>[image: facebook]
>> <https://www.facebook.com/ironSource>[image: googleplus]
>> <https://plus.google.com/+ironsrc>
>> This email (including any attachments) is for the sole use of the
>> intended recipient and may contain confidential information which may be
>> protected by legal privilege. If you are not the intended recipient, or the
>> employee or agent responsible for delivering it to the intended recipient,
>> you are hereby notified that any use, dissemination, distribution or
>> copying of this communication and/or its content is strictly prohibited. If
>> you are not the intended recipient, please immediately notify us by reply
>> email or by telephone, delete this email and destroy any copies. Thank you.
>>
>
>
> --
> http://www.google.com/profiles/grapesmoker
>


-- 


*Patrick McCarthy  *

Senior Data Scientist, Machine Learning Engineering

Dstillery

470 Park Ave South, 17th Floor, NYC 10016


Best practices for data like file storage

2019-11-01 Thread Patrick McCarthy
Hi List,

I'm looking for resources to learn about how to store data on disk for
later access.

For a while my team has been using Spark on top of our existing hdfs/Hive
cluster without much agency as far as what format is used to store the
data. I'd like to learn more about how to re-stage my data to speed up my
own analyses, and to start building expertise to define new data stores.

One example of a problem I'm facing is data which is written to Hive using
a customized protobuf serde. The data contains many very complex types
(arrays of structs of arrays of... ) and I often need very few elements of
any particular record, yet the format requires Spark to deserialize the
entire object.

The sorts of information I'm looking for:

   - Do's and Dont's of laying out a parquet schema
   - Measuring / debugging read speed
   - How to bucket, index, etc.

Thanks!


Re: [Spark SQL]: Does Union operation followed by drop duplicate follows "keep first"

2019-09-13 Thread Patrick McCarthy
If you only care that you're deduping on one of the fields you could add an
index and count like so:

df3 = df1.withColumn('idx',lit(1))
.union(df2.withColumn('idx',lit(2))

remove_df = df3
.groupBy('id')
.agg(collect_set('idx').alias('set_size')
.filter(size(col('set_size') > 1))
.select('id', lit(2).alias('idx'))

# the duplicated ids in the above are now coded for df2, so only those will
be dropped

df3.join(remove_df, on=['id','idx'], how='leftanti')

On Fri, Sep 13, 2019 at 11:44 AM Abhinesh Hada 
wrote:

> Hi,
>
> I am trying to take union of 2 dataframes and then drop duplicate based on
> the value of a specific column. But, I want to make sure that while
> dropping duplicates, the rows from first data frame are kept.
>
> Example:
> df1 = df1.union(df2).dropDuplicates(['id'])
>
>
>

-- 


*Patrick McCarthy  *

Senior Data Scientist, Machine Learning Engineering

Dstillery

470 Park Ave South, 17th Floor, NYC 10016


Re: script running in jupyter 6-7x faster than spark submit

2019-09-11 Thread Patrick McCarthy
Are you running in cluster mode? A large virtualenv zip for the driver sent
into the cluster on a slow pipe could account for much of that eight
minutes.

On Wed, Sep 11, 2019 at 3:17 AM Dhrubajyoti Hati 
wrote:

> Hi,
>
> I just ran the same script in a shell in jupyter notebook and find the
> performance to be similar. So I can confirm this is because the libraries
> used jupyter notebook python is different than the spark-submit python this
> is happening.
>
> But now I have a following question. Are the dependent libraries in a
> python script also transferred to the worker machines when executing a
> python script in spark. Because though the driver python versions are
> different, the workers machines will use their same python environment to
> run the code. If anyone can explain this part, it would be helpful.
>
>
>
>
> *Regards,Dhrubajyoti Hati.Mob No: 9886428028/9652029028*
>
>
> On Wed, Sep 11, 2019 at 9:45 AM Dhrubajyoti Hati 
> wrote:
>
>> Just checked from where the script is submitted i.e. wrt Driver, the
>> python env are different. Jupyter one is running within a the virtual
>> environment which is Python 2.7.5 and the spark-submit one uses 2.6.6. But
>> the executors have the same python version right? I tried doing a
>> spark-submit from jupyter shell, it fails to find python 2.7  which is not
>> there hence throws error.
>>
>> Here is the udf which might take time:
>>
>> import base64
>> import zlib
>>
>> def decompress(data):
>>
>> bytecode = base64.b64decode(data)
>> d = zlib.decompressobj(32 + zlib.MAX_WBITS)
>> decompressed_data = d.decompress(bytecode )
>> return(decompressed_data.decode('utf-8'))
>>
>>
>> Could this because of the two python environment mismatch from Driver side? 
>> But the processing
>>
>> happens in the executor side?
>>
>>
>>
>>
>> *Regards,Dhrub*
>>
>> On Wed, Sep 11, 2019 at 8:59 AM Abdeali Kothari 
>> wrote:
>>
>>> Maybe you can try running it in a python shell or
>>> jupyter-console/ipython instead of a spark-submit and check how much time
>>> it takes too.
>>>
>>> Compare the env variables to check that no additional env configuration
>>> is present in either environment.
>>>
>>> Also is the python environment for both the exact same? I ask because it
>>> looks like you're using a UDF and if the Jupyter python has (let's say)
>>> numpy compiled with blas it would be faster than a numpy without it. Etc.
>>> I.E. Some library you use may be using pure python and another may be using
>>> a faster C extension...
>>>
>>> What python libraries are you using in the UDFs? It you don't use UDFs
>>> at all and use some very simple pure spark functions does the time
>>> difference still exist?
>>>
>>> Also are you using dynamic allocation or some similar spark config which
>>> could vary performance between runs because the same resources we're not
>>> utilized on Jupyter / spark-submit?
>>>
>>>
>>> On Wed, Sep 11, 2019, 08:43 Stephen Boesch  wrote:
>>>
>>>> Sounds like you have done your homework to properly compare .   I'm
>>>> guessing the answer to the following is yes .. but in any case:  are they
>>>> both running against the same spark cluster with the same configuration
>>>> parameters especially executor memory and number of workers?
>>>>
>>>> Am Di., 10. Sept. 2019 um 20:05 Uhr schrieb Dhrubajyoti Hati <
>>>> dhruba.w...@gmail.com>:
>>>>
>>>>> No, i checked for that, hence written "brand new" jupyter notebook.
>>>>> Also the time taken by both are 30 mins and ~3hrs as i am reading a 500
>>>>> gigs compressed base64 encoded text data from a hive table and
>>>>> decompressing and decoding in one of the udfs. Also the time compared is
>>>>> from Spark UI not  how long the job actually takes after submission. Its
>>>>> just the running time i am comparing/mentioning.
>>>>>
>>>>> As mentioned earlier, all the spark conf params even match in two
>>>>> scripts and that's why i am puzzled what going on.
>>>>>
>>>>> On Wed, 11 Sep, 2019, 12:44 AM Patrick McCarthy, <
>>>>> pmccar...@dstillery.com> wrote:
>>>>>
>>>>>> It's not obvious from what you pasted, but perhaps the juypter
>>>>>> notebook already is connected to a running spark

Re: script running in jupyter 6-7x faster than spark submit

2019-09-10 Thread Patrick McCarthy
It's not obvious from what you pasted, but perhaps the juypter notebook
already is connected to a running spark context, while spark-submit needs
to get a new spot in the (YARN?) queue.

I would check the cluster job IDs for both to ensure you're getting new
cluster tasks for each.

On Tue, Sep 10, 2019 at 2:33 PM Dhrubajyoti Hati 
wrote:

> Hi,
>
> I am facing a weird behaviour while running a python script. Here is what
> the code looks like mostly:
>
> def fn1(ip):
>some code...
> ...
>
> def fn2(row):
> ...
> some operations
> ...
> return row1
>
>
> udf_fn1 = udf(fn1)
> cdf = spark.read.table("") //hive table is of size > 500 Gigs with
> ~4500 partitions
> ddf = cdf.withColumn("coly", udf_fn1(cdf.colz)) \
> .drop("colz") \
> .withColumnRenamed("colz", "coly")
>
> edf = ddf \
> .filter(ddf.colp == 'some_value') \
> .rdd.map(lambda row: fn2(row)) \
> .toDF()
>
> print edf.count() // simple way for the performance test in both platforms
>
> Now when I run the same code in a brand new jupyter notebook it runs 6x
> faster than when I run this python script using spark-submit. The
> configurations are printed and  compared from both the platforms and they
> are exact same. I even tried to run this script in a single cell of jupyter
> notebook and still have the same performance. I need to understand if I am
> missing something in the spark-submit which is causing the issue.  I tried
> to minimise the script to reproduce the same error without much code.
>
> Both are run in client mode on a yarn based spark cluster. The machines
> from which both are executed are also the same and from same user.
>
> What i found is the  the quantile values for median for one ran with
> jupyter was 1.3 mins and one ran with spark-submit was ~8.5 mins.  I am not
> able to figure out why this is happening.
>
> Any one faced this kind of issue before or know how to resolve this?
>
> *Regards,*
> *Dhrub*
>


-- 


*Patrick McCarthy  *

Senior Data Scientist, Machine Learning Engineering

Dstillery

470 Park Ave South, 17th Floor, NYC 10016


Re: Hive external table not working in sparkSQL when subdirectories are present

2019-08-07 Thread Patrick McCarthy
Do the permissions on the hive table files on HDFS correspond with what the
spark user is able to read? This might arise from spark being run as
different users.

On Wed, Aug 7, 2019 at 3:15 PM Rishikesh Gawade 
wrote:

> Hi,
> I did not explicitly create a Hive Context. I have been using the
> spark.sqlContext that gets created upon launching the spark-shell.
> Isn't this sqlContext same as the hiveContext?
> Thanks,
> Rishikesh
>
> On Wed, Aug 7, 2019 at 12:43 PM Jörn Franke  wrote:
>
>> Do you use the HiveContext in Spark? Do you configure the same options
>> there? Can you share some code?
>>
>> Am 07.08.2019 um 08:50 schrieb Rishikesh Gawade > >:
>>
>> Hi.
>> I am using Spark 2.3.2 and Hive 3.1.0.
>> Even if i use parquet files the result would be same, because after all
>> sparkSQL isn't able to descend into the subdirectories over which the table
>> is created. Could there be any other way?
>> Thanks,
>> Rishikesh
>>
>> On Tue, Aug 6, 2019, 1:03 PM Mich Talebzadeh 
>> wrote:
>>
>>> which versions of Spark and Hive are you using.
>>>
>>> what will happen if you use parquet tables instead?
>>>
>>> HTH
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn * 
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>>
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>>
>>> On Tue, 6 Aug 2019 at 07:58, Rishikesh Gawade 
>>> wrote:
>>>
>>>> Hi.
>>>> I have built a Hive external table on top of a directory 'A' which has
>>>> data stored in ORC format. This directory has several subdirectories inside
>>>> it, each of which contains the actual ORC files.
>>>> These subdirectories are actually created by spark jobs which ingest
>>>> data from other sources and write it into this directory.
>>>> I tried creating a table and setting the table properties of the same
>>>> as *hive.mapred.supports.subdirectories=TRUE* and
>>>> *mapred.input.dir.recursive**=TRUE*.
>>>> As a result of this, when i fire the simplest query of *select
>>>> count(*) from ExtTable* via the Hive CLI, it successfully gives me the
>>>> expected count of records in the table.
>>>> However, when i fire the same query via sparkSQL, i get count = 0.
>>>>
>>>> I think the sparkSQL isn't able to descend into the subdirectories for
>>>> getting the data while hive is able to do so.
>>>> Are there any configurations needed to be set on the spark side so that
>>>> this works as it does via hive cli?
>>>> I am using Spark on YARN.
>>>>
>>>> Thanks,
>>>> Rishikesh
>>>>
>>>> Tags: subdirectories, subdirectory, recursive, recursion, hive external
>>>> table, orc, sparksql, yarn
>>>>
>>>

-- 


*Patrick McCarthy  *

Senior Data Scientist, Machine Learning Engineering

Dstillery

470 Park Ave South, 17th Floor, NYC 10016


Re: Spark Image resizing

2019-07-31 Thread Patrick McCarthy
It won't be very efficient but you could write a python UDF using
PythonMagick - https://wiki.python.org/moin/ImageMagick

If you have PyArrow > 0.10 then you might be able to get a boost by saving
images in a column as BinaryType and writing a PandasUDF.

On Wed, Jul 31, 2019 at 6:22 AM Nick Dawes  wrote:

> Any other way of resizing the image before creating the DataFrame in
> Spark? I know opencv does it. But I don't have opencv on my cluster. I have
> Anaconda python packages installed on my cluster.
>
> Any ideas will be appreciated.  Thank you!
>
> On Tue, Jul 30, 2019, 4:17 PM Nick Dawes  wrote:
>
>> Hi
>>
>> I'm new to spark image data source.
>>
>> After creating a dataframe using Spark's image data source, I would like
>> to resize the images in PySpark.
>>
>> df = spark.read.format("image").load(imageDir)
>>
>> Can you please help me with this?
>>
>> Nick
>>
>

-- 


*Patrick McCarthy  *

Senior Data Scientist, Machine Learning Engineering

Dstillery

470 Park Ave South, 17th Floor, NYC 10016


Re: [Beginner] Run compute on large matrices and return the result in seconds?

2019-07-17 Thread Patrick McCarthy
; On Thu, Jul 11, 2019 at 9:24 AM Gautham Acharya <
> gauth...@alleninstitute.org> wrote:
>
> Ping? I would really appreciate advice on this! Thank you!
>
>
>
> *From:* Gautham Acharya
> *Sent:* Tuesday, July 9, 2019 4:22 PM
> *To:* user@spark.apache.org
> *Subject:* [Beginner] Run compute on large matrices and return the result
> in seconds?
>
>
>
> This is my first email to this mailing list, so I apologize if I made any
> errors.
>
>
>
> My team's going to be building an application and I'm investigating some
> options for distributed compute systems. We want to be performing computes
> on large matrices.
>
>
>
> The requirements are as follows:
>
>
>
> 1. The matrices can be expected to be up to 50,000 columns x 3
> million rows. The values are all integers (except for the row/column
> headers).
>
> 2. The application needs to select a specific row, and calculate the
> correlation coefficient (
> https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.corr.html
> <https://nam05.safelinks.protection.outlook.com/?url=https%3A%2F%2Fpandas.pydata.org%2Fpandas-docs%2Fstable%2Freference%2Fapi%2Fpandas.DataFrame.corr.html=02%7C01%7C%7C7d44353d2dd5420bc35108d70abff11d%7C32669cd6737f4b398bddd6951120d3fc%7C0%7C1%7C636989691818868018=e5blX8ItE1JDJRx9D3FnmsXp4TnOKvyH6fA6%2Fw2QTbI%3D=0>
>  )
> against every other row. This means up to 3 million different calculations.
>
> 3. A sorted list of the correlation coefficients and their
> corresponding row keys need to be returned in under 5 seconds.
>
> 4. Users will eventually request random row/column subsets to run
> calculations on, so precomputing our coefficients is not an option. This
> needs to be done on request.
>
>
>
> I've been looking at many compute solutions, but I'd consider Spark first
> due to the widespread use and community. I currently have my data loaded
> into Apache Hbase for a different scenario (random access of rows/columns).
> I’ve naively tired loading a dataframe from the CSV using a Spark instance
> hosted on AWS EMR, but getting the results for even a single correlation
> takes over 20 seconds.
>
>
>
> Thank you!
>
>
>
>
>
> --gautham
>
>
>
>

-- 


*Patrick McCarthy  *

Senior Data Scientist, Machine Learning Engineering

Dstillery

470 Park Ave South, 17th Floor, NYC 10016


Re: spark python script importError problem

2019-07-16 Thread Patrick McCarthy
Your module 'feature' isn't available to the yarn workers, so you'll need
to either install it on them if you have access, or else upload to the
workers at runtime using --py-files or similar.

On Tue, Jul 16, 2019 at 7:16 AM zenglong chen 
wrote:

> Hi,all,
>   When i run a run a python script on spark submit,it done well in
> local[*] mode,but not in standalone mode or yarn mode.The error like below:
>
> Caused by: org.apache.spark.api.python.PythonException: Traceback (most
> recent call last):
>   File "/usr/local/lib/python2.7/dist-packages/pyspark/worker.py", line
> 364, in main
> func, profiler, deserializer, serializer = read_command(pickleSer,
> infile)
>   File "/usr/local/lib/python2.7/dist-packages/pyspark/worker.py", line
> 69, in read_command
> command = serializer._read_with_length(file)
>   File "/usr/local/lib/python2.7/dist-packages/pyspark/serializers.py",
> line 172, in _read_with_length
> return self.loads(obj)
>   File "/usr/local/lib/python2.7/dist-packages/pyspark/serializers.py",
> line 583, in loads
> return pickle.loads(obj)
> ImportError: No module named feature.user.user_feature
>
> The script also run well in "sbin/start-master.sh sbin/start-slave.sh",but
> it has the same importError problem in "sbin/start-master.sh
> sbin/start-slaves.sh".The conf/slaves contents is 'localhost'.
>
> What should i do to solve this import problem?Thanks!!!
>


-- 


*Patrick McCarthy  *

Senior Data Scientist, Machine Learning Engineering

Dstillery

470 Park Ave South, 17th Floor, NYC 10016


Re: Anaconda installation with Pyspark/Pyarrow (2.3.0+) on cloudera managed server

2019-05-06 Thread Patrick McCarthy
Thanks Gourav.

Incidentally, since the regular UDF is row-wise, we could optimize that a
bit by taking the convert() closure and simply making that the UDF.

Since there's that MGRS object that we have to create too, we could
probably optimize it further by applying the UDF via rdd.mapPartitions,
which would allow the UDF to instantiate objects once per-partition instead
of per-row and then iterate element-wise through the rows of the partition.

All that said, having done the above on prior projects I find the pandas
abstractions to be very elegant and friendly to the end-user so I haven't
looked back :)

(The common memory model via Arrow is a nice boost too!)

On Mon, May 6, 2019 at 11:13 AM Gourav Sengupta 
wrote:

> The proof is in the pudding
>
> :)
>
>
>
> On Mon, May 6, 2019 at 2:46 PM Gourav Sengupta 
> wrote:
>
>> Hi Patrick,
>>
>> super duper, thanks a ton for sharing the code. Can you please confirm
>> that this runs faster than the regular UDF's?
>>
>> Interestingly I am also running same transformations using another geo
>> spatial library in Python, where I am passing two fields and getting back
>> an array.
>>
>>
>> Regards,
>> Gourav Sengupta
>>
>> On Mon, May 6, 2019 at 2:00 PM Patrick McCarthy 
>> wrote:
>>
>>> Human time is considerably more expensive than computer time, so in that
>>> regard, yes :)
>>>
>>> This took me one minute to write and ran fast enough for my needs. If
>>> you're willing to provide a comparable scala implementation I'd be happy to
>>> compare them.
>>>
>>> @F.pandas_udf(T.StringType(), F.PandasUDFType.SCALAR)
>>>
>>> def generate_mgrs_series(lat_lon_str, level):
>>>
>>> import mgrs
>>>
>>> m = mgrs.MGRS()
>>>
>>> precision_level = 0
>>>
>>> levelval = level[0]
>>>
>>> if levelval == 1000:
>>>
>>>precision_level = 2
>>>
>>> if levelval == 100:
>>>
>>>precision_level = 3
>>>
>>> def convert(ll_str):
>>>
>>>   lat, lon = ll_str.split('_')
>>>
>>>   return m.toMGRS(lat, lon,
>>>
>>>   MGRSPrecision = precision_level)
>>>
>>> return lat_lon_str.apply(lambda x: convert(x))
>>>
>>> On Mon, May 6, 2019 at 8:23 AM Gourav Sengupta <
>>> gourav.sengu...@gmail.com> wrote:
>>>
>>>> And you found the PANDAS UDF more performant ? Can you share your code
>>>> and prove it?
>>>>
>>>> On Sun, May 5, 2019 at 9:24 PM Patrick McCarthy <
>>>> pmccar...@dstillery.com> wrote:
>>>>
>>>>> I disagree that it's hype. Perhaps not 1:1 with pure scala
>>>>> performance-wise, but for python-based data scientists or others with a 
>>>>> lot
>>>>> of python expertise it allows one to do things that would otherwise be
>>>>> infeasible at scale.
>>>>>
>>>>> For instance, I recently had to convert latitude / longitude pairs to
>>>>> MGRS strings (
>>>>> https://en.wikipedia.org/wiki/Military_Grid_Reference_System).
>>>>> Writing a pandas UDF (and putting the mgrs python package into a conda
>>>>> environment) was _significantly_ easier than any alternative I found.
>>>>>
>>>>> @Rishi - depending on your network is constructed, some lag could come
>>>>> from just uploading the conda environment. If you load it from hdfs with
>>>>> --archives does it improve?
>>>>>
>>>>> On Sun, May 5, 2019 at 2:15 PM Gourav Sengupta <
>>>>> gourav.sengu...@gmail.com> wrote:
>>>>>
>>>>>> hi,
>>>>>>
>>>>>> Pandas UDF is a bit of hype. One of their blogs shows the used case
>>>>>> of adding 1 to a field using Pandas UDF which is pretty much pointless. 
>>>>>> So
>>>>>> you go beyond the blog and realise that your actual used case is more 
>>>>>> than
>>>>>> adding one :) and the reality hits you
>>>>>>
>>>>>> Pandas UDF in certain scenarios is actually slow, try using apply for
>>>>>> a custom or pandas function. In fact in certain scenarios I have found
>>>>>> general UDF's work much faster and use much less memory. Therefore test 
>>>>>> out
>>>>>> y

Re: Anaconda installation with Pyspark/Pyarrow (2.3.0+) on cloudera managed server

2019-05-06 Thread Patrick McCarthy
Human time is considerably more expensive than computer time, so in that
regard, yes :)

This took me one minute to write and ran fast enough for my needs. If
you're willing to provide a comparable scala implementation I'd be happy to
compare them.

@F.pandas_udf(T.StringType(), F.PandasUDFType.SCALAR)

def generate_mgrs_series(lat_lon_str, level):

import mgrs

m = mgrs.MGRS()

precision_level = 0

levelval = level[0]

if levelval == 1000:

   precision_level = 2

if levelval == 100:

   precision_level = 3

def convert(ll_str):

  lat, lon = ll_str.split('_')

  return m.toMGRS(lat, lon,

  MGRSPrecision = precision_level)

return lat_lon_str.apply(lambda x: convert(x))

On Mon, May 6, 2019 at 8:23 AM Gourav Sengupta 
wrote:

> And you found the PANDAS UDF more performant ? Can you share your code and
> prove it?
>
> On Sun, May 5, 2019 at 9:24 PM Patrick McCarthy 
> wrote:
>
>> I disagree that it's hype. Perhaps not 1:1 with pure scala
>> performance-wise, but for python-based data scientists or others with a lot
>> of python expertise it allows one to do things that would otherwise be
>> infeasible at scale.
>>
>> For instance, I recently had to convert latitude / longitude pairs to
>> MGRS strings (
>> https://en.wikipedia.org/wiki/Military_Grid_Reference_System). Writing a
>> pandas UDF (and putting the mgrs python package into a conda environment)
>> was _significantly_ easier than any alternative I found.
>>
>> @Rishi - depending on your network is constructed, some lag could come
>> from just uploading the conda environment. If you load it from hdfs with
>> --archives does it improve?
>>
>> On Sun, May 5, 2019 at 2:15 PM Gourav Sengupta 
>> wrote:
>>
>>> hi,
>>>
>>> Pandas UDF is a bit of hype. One of their blogs shows the used case of
>>> adding 1 to a field using Pandas UDF which is pretty much pointless. So you
>>> go beyond the blog and realise that your actual used case is more than
>>> adding one :) and the reality hits you
>>>
>>> Pandas UDF in certain scenarios is actually slow, try using apply for a
>>> custom or pandas function. In fact in certain scenarios I have found
>>> general UDF's work much faster and use much less memory. Therefore test out
>>> your used case (with at least 30 million records) before trying to use the
>>> Pandas UDF option.
>>>
>>> And when you start using GroupMap then you realise after reading
>>> https://spark.apache.org/docs/latest/sql-pyspark-pandas-with-arrow.html#pandas-udfs-aka-vectorized-udfs
>>> that "Oh!! now I can run into random OOM errors and the maxrecords options
>>> does not help at all"
>>>
>>> Excerpt from the above link:
>>> Note that all data for a group will be loaded into memory before the
>>> function is applied. This can lead to out of memory exceptions, especially
>>> if the group sizes are skewed. The configuration for maxRecordsPerBatch
>>> <https://spark.apache.org/docs/latest/sql-pyspark-pandas-with-arrow.html#setting-arrow-batch-size>
>>>  is
>>> not applied on groups and it is up to the user to ensure that the grouped
>>> data will fit into the available memory.
>>>
>>> Let me know about your used case in case possible
>>>
>>>
>>> Regards,
>>> Gourav
>>>
>>> On Sun, May 5, 2019 at 3:59 AM Rishi Shah 
>>> wrote:
>>>
>>>> Thanks Patrick! I tried to package it according to this instructions,
>>>> it got distributed on the cluster however the same spark program that takes
>>>> 5 mins without pandas UDF has started to take 25mins...
>>>>
>>>> Have you experienced anything like this? Also is Pyarrow 0.12 supported
>>>> with Spark 2.3 (according to documentation, it should be fine)?
>>>>
>>>> On Tue, Apr 30, 2019 at 9:35 AM Patrick McCarthy <
>>>> pmccar...@dstillery.com> wrote:
>>>>
>>>>> Hi Rishi,
>>>>>
>>>>> I've had success using the approach outlined here:
>>>>> https://community.hortonworks.com/articles/58418/running-pyspark-with-conda-env.html
>>>>>
>>>>> Does this work for you?
>>>>>
>>>>> On Tue, Apr 30, 2019 at 12:32 AM Rishi Shah 
>>>>> wrote:
>>>>>
>>>>>> modified the subject & would like to clarify that I am looking to
>>>>>> create an anaconda parcel with pyarrow and other libraries, so t

Re: Anaconda installation with Pyspark/Pyarrow (2.3.0+) on cloudera managed server

2019-05-05 Thread Patrick McCarthy
I disagree that it's hype. Perhaps not 1:1 with pure scala
performance-wise, but for python-based data scientists or others with a lot
of python expertise it allows one to do things that would otherwise be
infeasible at scale.

For instance, I recently had to convert latitude / longitude pairs to MGRS
strings (https://en.wikipedia.org/wiki/Military_Grid_Reference_System).
Writing a pandas UDF (and putting the mgrs python package into a conda
environment) was _significantly_ easier than any alternative I found.

@Rishi - depending on your network is constructed, some lag could come from
just uploading the conda environment. If you load it from hdfs with
--archives does it improve?

On Sun, May 5, 2019 at 2:15 PM Gourav Sengupta 
wrote:

> hi,
>
> Pandas UDF is a bit of hype. One of their blogs shows the used case of
> adding 1 to a field using Pandas UDF which is pretty much pointless. So you
> go beyond the blog and realise that your actual used case is more than
> adding one :) and the reality hits you
>
> Pandas UDF in certain scenarios is actually slow, try using apply for a
> custom or pandas function. In fact in certain scenarios I have found
> general UDF's work much faster and use much less memory. Therefore test out
> your used case (with at least 30 million records) before trying to use the
> Pandas UDF option.
>
> And when you start using GroupMap then you realise after reading
> https://spark.apache.org/docs/latest/sql-pyspark-pandas-with-arrow.html#pandas-udfs-aka-vectorized-udfs
> that "Oh!! now I can run into random OOM errors and the maxrecords options
> does not help at all"
>
> Excerpt from the above link:
> Note that all data for a group will be loaded into memory before the
> function is applied. This can lead to out of memory exceptions, especially
> if the group sizes are skewed. The configuration for maxRecordsPerBatch
> <https://spark.apache.org/docs/latest/sql-pyspark-pandas-with-arrow.html#setting-arrow-batch-size>
>  is
> not applied on groups and it is up to the user to ensure that the grouped
> data will fit into the available memory.
>
> Let me know about your used case in case possible
>
>
> Regards,
> Gourav
>
> On Sun, May 5, 2019 at 3:59 AM Rishi Shah 
> wrote:
>
>> Thanks Patrick! I tried to package it according to this instructions, it
>> got distributed on the cluster however the same spark program that takes 5
>> mins without pandas UDF has started to take 25mins...
>>
>> Have you experienced anything like this? Also is Pyarrow 0.12 supported
>> with Spark 2.3 (according to documentation, it should be fine)?
>>
>> On Tue, Apr 30, 2019 at 9:35 AM Patrick McCarthy 
>> wrote:
>>
>>> Hi Rishi,
>>>
>>> I've had success using the approach outlined here:
>>> https://community.hortonworks.com/articles/58418/running-pyspark-with-conda-env.html
>>>
>>> Does this work for you?
>>>
>>> On Tue, Apr 30, 2019 at 12:32 AM Rishi Shah 
>>> wrote:
>>>
>>>> modified the subject & would like to clarify that I am looking to
>>>> create an anaconda parcel with pyarrow and other libraries, so that I can
>>>> distribute it on the cloudera cluster..
>>>>
>>>> On Tue, Apr 30, 2019 at 12:21 AM Rishi Shah 
>>>> wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>> I have been trying to figure out a way to build anaconda parcel with
>>>>> pyarrow included for my cloudera managed server for distribution but this
>>>>> doesn't seem to work right. Could someone please help?
>>>>>
>>>>> I have tried to install anaconda on one of the management nodes on
>>>>> cloudera cluster... tarred the directory, but this directory doesn't
>>>>> include all the packages to form a proper parcel for distribution.
>>>>>
>>>>> Any help is much appreciated!
>>>>>
>>>>> --
>>>>> Regards,
>>>>>
>>>>> Rishi Shah
>>>>>
>>>>
>>>>
>>>> --
>>>> Regards,
>>>>
>>>> Rishi Shah
>>>>
>>>
>>>
>>> --
>>>
>>>
>>> *Patrick McCarthy  *
>>>
>>> Senior Data Scientist, Machine Learning Engineering
>>>
>>> Dstillery
>>>
>>> 470 Park Ave South, 17th Floor, NYC 10016
>>>
>>
>>
>> --
>> Regards,
>>
>> Rishi Shah
>>
>

-- 


*Patrick McCarthy  *

Senior Data Scientist, Machine Learning Engineering

Dstillery

470 Park Ave South, 17th Floor, NYC 10016


Re: Anaconda installation with Pyspark/Pyarrow (2.3.0+) on cloudera managed server

2019-04-30 Thread Patrick McCarthy
Hi Rishi,

I've had success using the approach outlined here:
https://community.hortonworks.com/articles/58418/running-pyspark-with-conda-env.html

Does this work for you?

On Tue, Apr 30, 2019 at 12:32 AM Rishi Shah 
wrote:

> modified the subject & would like to clarify that I am looking to create
> an anaconda parcel with pyarrow and other libraries, so that I can
> distribute it on the cloudera cluster..
>
> On Tue, Apr 30, 2019 at 12:21 AM Rishi Shah 
> wrote:
>
>> Hi All,
>>
>> I have been trying to figure out a way to build anaconda parcel with
>> pyarrow included for my cloudera managed server for distribution but this
>> doesn't seem to work right. Could someone please help?
>>
>> I have tried to install anaconda on one of the management nodes on
>> cloudera cluster... tarred the directory, but this directory doesn't
>> include all the packages to form a proper parcel for distribution.
>>
>> Any help is much appreciated!
>>
>> --
>> Regards,
>>
>> Rishi Shah
>>
>
>
> --
> Regards,
>
> Rishi Shah
>


-- 


*Patrick McCarthy  *

Senior Data Scientist, Machine Learning Engineering

Dstillery

470 Park Ave South, 17th Floor, NYC 10016


Re: Spark ML with null labels

2019-01-10 Thread Patrick McCarthy
I actually tried that first. I moved away from it because the algorithm
needs to evaluate all records for all models, for instance, a model trained
on (2,4) needs to be evaluated on a record whose true label is 8. I found
that if I apply the filter in the label-creation transformer, then a record
whose label is not 2 or 4 will not be scored. I'd be curious to discover if
there's a way to make that approach work, however.

On Thu, Jan 10, 2019 at 12:20 PM Xiangrui Meng  wrote:

> In your custom transformer that produces labels, can you filter null
> labels? A transformer doesn't always need to do 1:1 mapping.
>
> On Thu, Jan 10, 2019, 7:53 AM Patrick McCarthy
> 
>> I'm trying to implement an algorithm on the MNIST digits that runs like
>> so:
>>
>>
>>- for every pair of digits (0,1), (0,2), (0,3)... assign a 0/1 label
>>to the digits and build a LogisticRegression Classifier -- 45 in total
>>- Fit every classifier on the test set separately
>>- Aggregate the results per record of the test set and compute a
>>prediction from the 45 predictions
>>
>> I tried implementing this with a Pipeline, composed of
>>
>>- stringIndexer
>>- a custom transformer which accepts a lower-digit and upper-digit
>>argument, producing the 0/1 label
>>- a custom transformer to assemble the indexed strings to VectorUDT
>>- LogisticRegression
>>
>> fed by a list of paramMaps. It failed because the fit() method of
>> logistic couldn't handle cases of null labels, i.e. a case where my 0/1
>> transformer found neither the lower nor the upper digit label. I fixed this
>> by extending the LogisticRegression class and overriding the fit() method
>> to include a filter for labels in (0,1) -- I didn't want to alter the
>> transform method.
>>
>> Now, I'd like to tune these models using CrossValidator with an estimator
>> of pipeline but when I run either fitMultiple on my paramMap or I loop over
>> the paramMaps, I get arcane Scala errors.
>>
>>
>> Is there a better way to build this procedure? Thanks!
>>
>


Spark ML with null labels

2019-01-10 Thread Patrick McCarthy
I'm trying to implement an algorithm on the MNIST digits that runs like so:


   - for every pair of digits (0,1), (0,2), (0,3)... assign a 0/1 label to
   the digits and build a LogisticRegression Classifier -- 45 in total
   - Fit every classifier on the test set separately
   - Aggregate the results per record of the test set and compute a
   prediction from the 45 predictions

I tried implementing this with a Pipeline, composed of

   - stringIndexer
   - a custom transformer which accepts a lower-digit and upper-digit
   argument, producing the 0/1 label
   - a custom transformer to assemble the indexed strings to VectorUDT
   - LogisticRegression

fed by a list of paramMaps. It failed because the fit() method of logistic
couldn't handle cases of null labels, i.e. a case where my 0/1 transformer
found neither the lower nor the upper digit label. I fixed this by
extending the LogisticRegression class and overriding the fit() method to
include a filter for labels in (0,1) -- I didn't want to alter the
transform method.

Now, I'd like to tune these models using CrossValidator with an estimator
of pipeline but when I run either fitMultiple on my paramMap or I loop over
the paramMaps, I get arcane Scala errors.


Is there a better way to build this procedure? Thanks!


Re: Need help with SparkSQL Query

2018-12-17 Thread Patrick McCarthy
Untested, but something like the below should work:

from pyspark.sql import functions as F
from pyspark.sql import window as W

(record
.withColumn('ts_rank',
F.dense_rank().over(W.Window.orderBy('timestamp').partitionBy("id"))
.filter(F.col('ts_rank')==1)
.drop('ts_rank')
)


On Mon, Dec 17, 2018 at 4:04 PM Nikhil Goyal  wrote:

> Hi guys,
>
> I have a dataframe of type Record (id: Long, timestamp: Long, isValid:
> Boolean,  other metrics)
>
> Schema looks like this:
> root
>  |-- id: long (nullable = true)
>  |-- timestamp: long (nullable = true)
>  |-- isValid: boolean (nullable = true)
> .
>
> I need to find the earliest valid record per id. In RDD world I can do
> groupBy 'id' and find the earliest one but I am not sure how I can do it in
> SQL. Since I am doing this in PySpark I cannot really use DataSet API for
> this.
>
> One thing I can do is groupBy 'id', find the earliest timestamp available
> and then join with the original dataframe to get the right record (all the
> metrics).
>
> Or I can create a single column with all the records and then implement a
> UDAF in scala and use it in pyspark.
>
> Both solutions don't seem to be straight forward. Is there a simpler
> solution to this?
>
> Thanks
> Nikhil
>


Re: Questions on Python support with Spark

2018-11-12 Thread Patrick McCarthy
I've never tried to run a stand-alone cluster alongside hadoop, but why not
run Spark as a yarn application? That way it can absolutely (in fact
preferably) use the distributed file system.

On Fri, Nov 9, 2018 at 5:04 PM, Arijit Tarafdar  wrote:

> Hello All,
>
>
>
> We have a requirement to run PySpark in standalone cluster mode and also
> reference python libraries (egg/wheel) which are not local but placed in a
> distributed storage like HDFS. From the code it looks like none of cases
> are supported.
>
>
>
> Questions are:
>
>
>
>1. Why is PySpark supported only in standalone client mode?
>2. Why –py-files only support local files and not files stored in
>remote stores?
>
>
>
> We will like to update the Spark code to support these scenarios but just
> want to be aware of any technical difficulties that the community has faced
> while trying to support those.
>
>
>
> Thanks, Arijit
>


Re: [Spark UI] Spark 2.3.1 UI no longer respects spark.ui.retainedJobs

2018-10-25 Thread Patrick Brown
Done:

https://issues.apache.org/jira/browse/SPARK-25837

On Thu, Oct 25, 2018 at 10:21 AM Marcelo Vanzin  wrote:

> Ah that makes more sense. Could you file a bug with that information
> so we don't lose track of this?
>
> Thanks
> On Wed, Oct 24, 2018 at 6:13 PM Patrick Brown
>  wrote:
> >
> > On my production application I am running ~200 jobs at once, but
> continue to submit jobs in this manner for sometimes ~1 hour.
> >
> > The reproduction code above generally only has 4 ish jobs running at
> once, and as you can see runs through 50k jobs in this manner.
> >
> > I guess I should clarify my above statement, the issue seems to appear
> when running multiple jobs at once as well as in sequence for a while and
> may as well have something to do with high master CPU usage (thus the
> collect in the code). My rough guess would be whatever is managing clearing
> out completed jobs gets overwhelmed (my master was a 4 core machine while
> running this, and htop reported almost full CPU usage across all 4 cores).
> >
> > The attached screenshot shows the state of the webui after running the
> repro code, you can see the ui is displaying some 43k completed jobs (takes
> a long time to load) after a few minutes of inactivity this will clear out,
> however as my production application continues to submit jobs every once in
> a while, the issue persists.
> >
> > On Wed, Oct 24, 2018 at 5:05 PM Marcelo Vanzin 
> wrote:
> >>
> >> When you say many jobs at once, what ballpark are you talking about?
> >>
> >> The code in 2.3+ does try to keep data about all running jobs and
> >> stages regardless of the limit. If you're running into issues because
> >> of that we may have to look again at whether that's the right thing to
> >> do.
> >> On Tue, Oct 23, 2018 at 10:02 AM Patrick Brown
> >>  wrote:
> >> >
> >> > I believe I may be able to reproduce this now, it seems like it may
> be something to do with many jobs at once:
> >> >
> >> > Spark 2.3.1
> >> >
> >> > > spark-shell --conf spark.ui.retainedJobs=1
> >> >
> >> > scala> import scala.concurrent._
> >> > scala> import scala.concurrent.ExecutionContext.Implicits.global
> >> > scala> for (i <- 0 until 5) { Future { println(sc.parallelize(0
> until i).collect.length) } }
> >> >
> >> > On Mon, Oct 22, 2018 at 11:25 AM Marcelo Vanzin 
> wrote:
> >> >>
> >> >> Just tried on 2.3.2 and worked fine for me. UI had a single job and a
> >> >> single stage (+ the tasks related to that single stage), same thing
> in
> >> >> memory (checked with jvisualvm).
> >> >>
> >> >> On Sat, Oct 20, 2018 at 6:45 PM Marcelo Vanzin 
> wrote:
> >> >> >
> >> >> > On Tue, Oct 16, 2018 at 9:34 AM Patrick Brown
> >> >> >  wrote:
> >> >> > > I recently upgraded to spark 2.3.1 I have had these same
> settings in my spark submit script, which worked on 2.0.2, and according to
> the documentation appear to not have changed:
> >> >> > >
> >> >> > > spark.ui.retainedTasks=1
> >> >> > > spark.ui.retainedStages=1
> >> >> > > spark.ui.retainedJobs=1
> >> >> >
> >> >> > I tried that locally on the current master and it seems to be
> working.
> >> >> > I don't have 2.3 easily in front of me right now, but will take a
> look
> >> >> > Monday.
> >> >> >
> >> >> > --
> >> >> > Marcelo
> >> >>
> >> >>
> >> >>
> >> >> --
> >> >> Marcelo
> >>
> >>
> >>
> >> --
> >> Marcelo
>
>
>
> --
> Marcelo
>


Re: [Spark UI] Spark 2.3.1 UI no longer respects spark.ui.retainedJobs

2018-10-23 Thread Patrick Brown
I believe I may be able to reproduce this now, it seems like it may be
something to do with many jobs at once:

Spark 2.3.1

> spark-shell --conf spark.ui.retainedJobs=1

scala> import scala.concurrent._
scala> import scala.concurrent.ExecutionContext.Implicits.global
scala> for (i <- 0 until 5) { Future { println(sc.parallelize(0 until
i).collect.length) } }

On Mon, Oct 22, 2018 at 11:25 AM Marcelo Vanzin  wrote:

> Just tried on 2.3.2 and worked fine for me. UI had a single job and a
> single stage (+ the tasks related to that single stage), same thing in
> memory (checked with jvisualvm).
>
> On Sat, Oct 20, 2018 at 6:45 PM Marcelo Vanzin 
> wrote:
> >
> > On Tue, Oct 16, 2018 at 9:34 AM Patrick Brown
> >  wrote:
> > > I recently upgraded to spark 2.3.1 I have had these same settings in
> my spark submit script, which worked on 2.0.2, and according to the
> documentation appear to not have changed:
> > >
> > > spark.ui.retainedTasks=1
> > > spark.ui.retainedStages=1
> > > spark.ui.retainedJobs=1
> >
> > I tried that locally on the current master and it seems to be working.
> > I don't have 2.3 easily in front of me right now, but will take a look
> > Monday.
> >
> > --
> > Marcelo
>
>
>
> --
> Marcelo
>


[Spark UI] Spark 2.3.1 UI no longer respects spark.ui.retainedJobs

2018-10-16 Thread Patrick Brown
I recently upgraded to spark 2.3.1 I have had these same settings in my
spark submit script, which worked on 2.0.2, and according to the
documentation appear to not have changed:

spark.ui.retainedTasks=1
spark.ui.retainedStages=1
spark.ui.retainedJobs=1

However in 2.3.1 the UI doesn't seem to respect this, it still retains a
huge number of jobs:

[image: Screen Shot 2018-10-16 at 10.31.50 AM.png]


Is this a known issue? Any ideas?


Re: Spark Structured Streaming resource contention / memory issue

2018-10-15 Thread Patrick McGloin
Hi Jungtaek,

Thanks, we thought that might be the issue but haven't tested yet as
building against an unreleased version of Spark is tough for us, due to
network restrictions. We will try though. I will report back if we find
anything.

Best regards,
Patrick

On Fri, Oct 12, 2018, 2:57 PM Jungtaek Lim  wrote:

> Hi Patrick,
>
> Looks like you might be struggling with state memory, which multiple
> issues are going to be resolved in Spark 2.4.
>
> 1. SPARK-24441 [1]: Expose total estimated size of states in
> HDFSBackedStateStoreProvider
> 2. SPARK-24637 [2]: Add metrics regarding state and watermark to
> dropwizard metrics
> 3. SPARK-24717 [3]: Split out min retain version of state for memory in
> HDFSBackedStateStoreProvider
>
> There're other patches relevant to state store as well, but above issues
> are applied to map/flatmapGroupsWithState.
>
> Since Spark community is in progress on releasing Spark 2.4.0, could you
> try experimenting Spark 2.4.0 RC if you really don't mind? You could try
> out applying individual patches and see whether it helps.
>
> Thanks,
> Jungtaek Lim (HeartSaVioR)
>
> 1. https://issues.apache.org/jira/browse/SPARK-24441
> 2. https://issues.apache.org/jira/browse/SPARK-24637
> 3. https://issues.apache.org/jira/browse/SPARK-24717
>
>
> 2018년 10월 12일 (금) 오후 9:31, Patrick McGloin 님이
> 작성:
>
>> Hi allI sent this earlier but the screenshots were not attached.
>> Hopefully this time it is correct.
>>
>> We have a Spark Structured streaming stream which is using
>> mapGroupWithState. After some time of processing in a stable manner
>> suddenly each mini batch starts taking 40 seconds. Suspiciously it looks
>> like exactly 40 seconds each time. Before this the batches were taking less
>> than a second.
>>
>>
>> Looking at the details for a particular task most partitions are
>> processed really quickly but a few take exactly 40 seconds:
>>
>>
>>
>>
>> The GC was looking ok as the data was being processed quickly but
>> suddenly the full GCs etc stop (at the same time as the 40 second issue):
>>
>>
>>
>> I have taken a thread dump from one of the executors as this issue is
>> happening but I cannot see any resource they are blocked on:
>>
>>
>>
>>
>> Are we hitting a GC problem and why is it manifesting in this way? Is
>> there another resource that is blocking and what is it?
>>
>>
>> Thanks,
>> Patrick
>>
>>
>>
>> This message has been sent by ABN AMRO Bank N.V., which has its seat at 
>> Gustav
>> Mahlerlaan 10 (1082 PP) Amsterdam, the Netherlands
>> <https://maps.google.com/?q=Gustav+Mahlerlaan+10+(1082+PP)+Amsterdam,+the+Netherlands=gmail=g>,
>> and is registered in the Commercial Register of Amsterdam under number
>> 34334259.
>>
>


Spark Structured Streaming resource contention / memory issue

2018-10-12 Thread Patrick McGloin
Hi all,

We have a Spark Structured streaming stream which is using
mapGroupWithState. After some time of processing in a stable manner
suddenly each mini batch starts taking 40 seconds. Suspiciously it looks
like exactly 40 seconds each time. Before this the batches were taking less
than a second.


Looking at the details for a particular task most partitions are processed
really quickly but a few take exactly 40 seconds:




The GC was looking ok as the data was being processed quickly but suddenly
the full GCs etc stop (at the same time as the 40 second issue):



I have taken a thread dump from one of the executors as this issue is
happening but I cannot see any resource they are blocked on:




Are we hitting a GC problem and why is it manifesting in this way? Is there
another resource that is blocking and what is it?


Thanks,
Patrick


Re: Python Dependencies Issue on EMR

2018-09-14 Thread Patrick McCarthy
You didn't say how you're zipping the dependencies, but I'm guessing you
either include .egg files or zipped up a virtualenv. In either case, the
extra C stuff that scipy and pandas rely upon doesn't get included.

An approach like this solved the last problem I had that seemed like this -
https://community.hortonworks.com/articles/58418/running-pyspark-with-conda-env.html

On Thu, Sep 13, 2018 at 10:08 PM, Jonas Shomorony 
wrote:

> Hey everyone,
>
>
> I am currently trying to run a Python Spark job (using YARN client mode)
> that uses multiple libraries, on a Spark cluster on Amazon EMR. To do that,
> I create a dependencies.zip file that contains all of the
> dependencies/libraries (installed through pip) for the job to run
> successfully, such as pandas, scipy, tqdm, psycopg2, etc. The
> dependencies.zip file is contained within an outside directory (let’s call
> it “project”) that contains all the code to run my Spark job. I then zip up
> everything within project (including dependencies.zip) into project.zip.
> Then, I call spark-submit on the master node in my EMR cluster as follows:
>
>
> `spark-submit --packages … --py-files project.zip --jars ...
> run_command.py`
>
>
> Within “run_command.py” I add dependencies.zip as follows:
>
> `self.spark.sparkContext.addPyFile("dependencies.zip”)`
>
>
> The run_command.py then uses other files within project.zip to complete
> the spark job, and within those files, I import various libraries (found in
> dependencies.zip).
>
>
> I am running into a strange issue where all of the libraries are imported
> correctly (with no problems) with the exception of scipy and pandas.
>
>
> For scipy I get the following error:
>
>
> `File "/mnt/tmp/pip-install-79wp6w/scipy/scipy/__init__.py", line 119, in
> 
>
>   File "/mnt/tmp/pip-install-79wp6w/scipy/scipy/_lib/_ccallback.py", line
> 1, in 
>
> ImportError: cannot import name _ccallback_c`
>
>
> And for pandas I get this error message:
>
>
> `File "/mnt/tmp/pip-install-79wp6w/pandas/pandas/__init__.py", line 35,
> in 
>
> ImportError: C extension: No module named tslib not built. If you want to
> import pandas from the source directory, you may need to run 'python
> setup.py build_ext --inplace --force' to build the C extensions first.`
>
>
> When I comment out the imports for these two libraries (and their use from
> within the code) everything works fine.
>
>
> Surprisingly, when I run the application locally (on master node) without
> passing in dependencies.zip, it picks and resolves the libraries from
> site-packages correctly and successfully runs to completion.
> dependencies.zip is created by zipping the contents of site-packages.
>
>
> Does anyone have any ideas as to what may be happening here? I would
> really appreciate it.
>
>
> pip version: 18.0
>
> spark version: 2.3.1
>
> python version: 2.7
>
>
> Thank you,
>
>
> Jonas
>
>


Re: How to make pyspark use custom python?

2018-09-06 Thread Patrick McCarthy
It looks like for whatever reason your cluster isn't using the python you
distributed, or said distribution doesn't contain what you think.

I've used the following with success to deploy a conda environment to my
cluster at runtime:
https://henning.kropponline.de/2016/09/24/running-pyspark-with-conda-env/

On Thu, Sep 6, 2018 at 2:58 AM, Hyukjin Kwon  wrote:

> Are you doubly sure if it is an issue in Spark? I used custom python
> several times with setting it in PYSPARK_PYTHON before and it was no
> problem.
>
> 2018년 9월 6일 (목) 오후 2:21, mithril 님이 작성:
>
>> For better looking , please see
>> https://stackoverflow.com/questions/52178406/howto-make-
>> pyspark-use-custom-python
>> > pyspark-use-custom-python>
>>
>> --
>>
>>
>> I am using zeppelin connect remote spark cluster.
>>
>> remote spark is using system python 2.7 .
>>
>> I want to switch to miniconda3, install a lib pyarrow.
>> What I do is :
>>
>> 1. Download miniconda3, install some libs, scp miniconda3 folder to spark
>> master and slaves.
>> 2. adding `PYSPARK_PYTHON="/usr/local/miniconda3/bin/python"` to
>> `spark-env.sh` in spark master and slaves.
>> 3. restart spark and zeppelin
>> 4. Running code
>>
>> %spark.pyspark
>>
>> import pandas as pd
>> from pyspark.sql.functions import pandas_udf,PandasUDFType
>>
>>
>> @pandas_udf(df.schema, PandasUDFType.GROUPED_MAP)
>> def process_order_items(pdf):
>>
>> pdf.loc[:, 'total_price'] = pdf['price'] * pdf['count']
>>
>> d = {'has_discount':'count',
>> 'clearance':'count',
>> 'count': ['count', 'sum'],
>> 'price_guide':'max',
>> 'total_price': 'sum'
>>
>> }
>>
>> pdf1 = pdf.groupby('day').agg(d)
>> pdf1.columns = pdf1.columns.map('_'.join)
>> d1 = {'has_discount_count':'discount_order_count',
>> 'clearance_count':'clearance_order_count',
>> 'count_count':'order_count',
>> 'count_sum':'sale_count',
>> 'price_guide_max':'price_guide',
>> 'total_price_sum': 'total_price'
>> }
>>
>> pdf2 = pdf1.rename(columns=d1)
>>
>> pdf2.loc[:, 'discount_sale_count'] =
>> pdf.loc[pdf.has_discount>0,
>> 'count'].resample(freq).sum()
>> pdf2.loc[:, 'clearance_sale_count'] = pdf.loc[pdf.clearance>0,
>> 'count'].resample(freq).sum()
>> pdf2.loc[:, 'price'] = pdf2.total_price / pdf2.sale_count
>>
>> pdf2 = pdf2.drop(pdf2[pdf2.order_count == 0].index)
>>
>> return pdf2
>>
>>
>> results = df.groupby("store_id",
>> "product_id").apply(process_order_items)
>>
>> results.select(['store_id', 'price']).show(5)
>>
>>
>> Got error :
>>
>> Py4JJavaError: An error occurred while calling o172.showString.
>> : org.apache.spark.SparkException: Job aborted due to stage failure:
>> Task 0 in stage 6.0 failed 4 times, most recent failure: Lost task 0.3 in
>> stage 6.0 (TID 143, 10.104.33.18, executor 2):
>> org.apache.spark.api.python.PythonException: Traceback (most recent call
>> last):
>>   File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py",
>> line
>> 230, in main
>> process()
>>   File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py",
>> line
>> 225, in process
>> serializer.dump_stream(func(split_index, iterator), outfile)
>>   File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py",
>> line
>> 150, in 
>> func = lambda _, it: map(mapper, it)
>>   File "/usr/local/spark/python/lib/pyspark.zip/pyspark/
>> serializers.py",
>> line 276, in load_stream
>> import pyarrow as pa
>> ImportError: No module named pyarrow
>>
>>
>> `10.104.33.18` is spark master,  so I think the `PYSPARK_PYTHON` is not
>> set
>> correctly .
>>
>> `pyspark`
>>
>> I login to master and slaves, run `pyspark interpreter` in each, and found
>> `import pyarrow` do not throw exception .
>>
>>
>> PS: `pyarrow` also installed in the machine which running zeppelin.
>>
>> --
>>
>> More info:
>>
>>
>> 1. spark cluster is installed in A, B, C , zeppelin is installed in D.
>> 2. `PYSPARK_PYTHON` is set in `spark-env.sh` in each A, B, C
>> 3. `import pyarrow` is fine with `/usr/local/spark/bin/pyspark` in A, B
>> ,C /
>> 4. `import pyarrow` is fine on A, B ,C custom python(miniconda3)
>> 5. `import pyarrow` is fine on D's default python(miniconda3, path is
>> different with A, B ,C , but it is doesn't matter)
>>
>>
>>
>> So I completely coundn't understand why it doesn't work.
>>
>>
>>
>>
>>
>>
>>
>> --
>> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>


Re: Pitfalls of partitioning by host?

2018-08-28 Thread Patrick McCarthy
I'm not 100% sure, but a naive repartition() seems to cause a shuffle. If
this is actually happening, it's just wasteful overhead. The ambition is to
say "divide the data into partitions, but make sure you don't move it in
doing so".



On Tue, Aug 28, 2018 at 2:06 PM, Patrick McCarthy 
wrote:

> I'm not 100% sure, but a naive repartition() seems to cause a shuffle. If
> this is actually happening, it's just wasteful overhead.
>
> On Tue, Aug 28, 2018 at 1:03 PM, Sonal Goyal 
> wrote:
>
>> Hi Patrick,
>>
>> Sorry is there something here that helps you beyond repartition(number of
>> partitons) or calling your udf on foreachPartition? If your data is on
>> disk, Spark is already partitioning it for you by rows. How is adding the
>> host info helping?
>>
>> Thanks,
>> Sonal
>> Nube Technologies <http://www.nubetech.co>
>>
>> <http://in.linkedin.com/in/sonalgoyal>
>>
>>
>>
>> On Tue, Aug 28, 2018 at 8:29 PM, Patrick McCarthy <
>> pmccar...@dstillery.com.invalid> wrote:
>>
>>> Mostly I'm guessing that it adds efficiency to a job where partitioning
>>> is required but shuffling is not.
>>>
>>> For example, if I want to apply a UDF to 1tb of records on disk, I might
>>> need to repartition(5) to get the task size down to an acceptable size
>>> for my cluster. If I don't care that it's totally balanced, then I'd hope
>>> that I could save a lot of overhead with
>>>
>>> foo = df.withColumn('randkey',F.floor(1000*F.rand())).repartition(5000,
>>> 'randkey','host').apply(udf)
>>>
>>>
>>> On Mon, Aug 27, 2018 at 10:11 PM, Michael Artz 
>>> wrote:
>>>
>>>> Well if we think of shuffling as a necessity to perform an operation,
>>>> then the problem would be that you are adding a ln aggregation stage to a
>>>> job that is going to get shuffled anyway.  Like if you need to join two
>>>> datasets, then Spark will still shuffle the data, whether they are grouped
>>>> by hostname prior to that or not.  My question is, is there anything else
>>>> that you would expect to gain, except for enforcing maybe a dataset that is
>>>> already bucketed? Like you could enforce that data is where it is supposed
>>>> to be, but what else would you avoid?
>>>>
>>>> Sent from my iPhone
>>>>
>>>> > On Aug 27, 2018, at 12:22 PM, Patrick McCarthy <
>>>> pmccar...@dstillery.com.INVALID> wrote:
>>>> >
>>>> > When debugging some behavior on my YARN cluster I wrote the following
>>>> PySpark UDF to figure out what host was operating on what row of data:
>>>> >
>>>> > @F.udf(T.StringType())
>>>> > def add_hostname(x):
>>>> >
>>>> > import socket
>>>> >
>>>> > return str(socket.gethostname())
>>>> >
>>>> > It occurred to me that I could use this to enforce node-locality for
>>>> other operations:
>>>> >
>>>> > df.withColumn('host', add_hostname(x)).groupBy('host').apply(...)
>>>> >
>>>> > When working on a big job without obvious partition keys, this seems
>>>> like a very straightforward way to avoid a shuffle, but it seems too easy.
>>>> >
>>>> > What problems would I introduce by trying to partition on hostname
>>>> like this?
>>>>
>>>> -
>>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>>
>>>>
>>>
>>
>


Re: Pitfalls of partitioning by host?

2018-08-28 Thread Patrick McCarthy
I'm not 100% sure, but a naive repartition() seems to cause a shuffle. If
this is actually happening, it's just wasteful overhead.

On Tue, Aug 28, 2018 at 1:03 PM, Sonal Goyal  wrote:

> Hi Patrick,
>
> Sorry is there something here that helps you beyond repartition(number of
> partitons) or calling your udf on foreachPartition? If your data is on
> disk, Spark is already partitioning it for you by rows. How is adding the
> host info helping?
>
> Thanks,
> Sonal
> Nube Technologies <http://www.nubetech.co>
>
> <http://in.linkedin.com/in/sonalgoyal>
>
>
>
> On Tue, Aug 28, 2018 at 8:29 PM, Patrick McCarthy <
> pmccar...@dstillery.com.invalid> wrote:
>
>> Mostly I'm guessing that it adds efficiency to a job where partitioning
>> is required but shuffling is not.
>>
>> For example, if I want to apply a UDF to 1tb of records on disk, I might
>> need to repartition(5) to get the task size down to an acceptable size
>> for my cluster. If I don't care that it's totally balanced, then I'd hope
>> that I could save a lot of overhead with
>>
>> foo = df.withColumn('randkey',F.floor(1000*F.rand())).repartition(5000,
>> 'randkey','host').apply(udf)
>>
>>
>> On Mon, Aug 27, 2018 at 10:11 PM, Michael Artz 
>> wrote:
>>
>>> Well if we think of shuffling as a necessity to perform an operation,
>>> then the problem would be that you are adding a ln aggregation stage to a
>>> job that is going to get shuffled anyway.  Like if you need to join two
>>> datasets, then Spark will still shuffle the data, whether they are grouped
>>> by hostname prior to that or not.  My question is, is there anything else
>>> that you would expect to gain, except for enforcing maybe a dataset that is
>>> already bucketed? Like you could enforce that data is where it is supposed
>>> to be, but what else would you avoid?
>>>
>>> Sent from my iPhone
>>>
>>> > On Aug 27, 2018, at 12:22 PM, Patrick McCarthy <
>>> pmccar...@dstillery.com.INVALID> wrote:
>>> >
>>> > When debugging some behavior on my YARN cluster I wrote the following
>>> PySpark UDF to figure out what host was operating on what row of data:
>>> >
>>> > @F.udf(T.StringType())
>>> > def add_hostname(x):
>>> >
>>> > import socket
>>> >
>>> > return str(socket.gethostname())
>>> >
>>> > It occurred to me that I could use this to enforce node-locality for
>>> other operations:
>>> >
>>> > df.withColumn('host', add_hostname(x)).groupBy('host').apply(...)
>>> >
>>> > When working on a big job without obvious partition keys, this seems
>>> like a very straightforward way to avoid a shuffle, but it seems too easy.
>>> >
>>> > What problems would I introduce by trying to partition on hostname
>>> like this?
>>>
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>
>>>
>>
>


Re: Pitfalls of partitioning by host?

2018-08-28 Thread Patrick McCarthy
Mostly I'm guessing that it adds efficiency to a job where partitioning is
required but shuffling is not.

For example, if I want to apply a UDF to 1tb of records on disk, I might
need to repartition(5) to get the task size down to an acceptable size
for my cluster. If I don't care that it's totally balanced, then I'd hope
that I could save a lot of overhead with

foo = df.withColumn('randkey',F.floor(1000*F.rand())).repartition(5000,
'randkey','host').apply(udf)


On Mon, Aug 27, 2018 at 10:11 PM, Michael Artz 
wrote:

> Well if we think of shuffling as a necessity to perform an operation, then
> the problem would be that you are adding a ln aggregation stage to a job
> that is going to get shuffled anyway.  Like if you need to join two
> datasets, then Spark will still shuffle the data, whether they are grouped
> by hostname prior to that or not.  My question is, is there anything else
> that you would expect to gain, except for enforcing maybe a dataset that is
> already bucketed? Like you could enforce that data is where it is supposed
> to be, but what else would you avoid?
>
> Sent from my iPhone
>
> > On Aug 27, 2018, at 12:22 PM, Patrick McCarthy 
> > 
> wrote:
> >
> > When debugging some behavior on my YARN cluster I wrote the following
> PySpark UDF to figure out what host was operating on what row of data:
> >
> > @F.udf(T.StringType())
> > def add_hostname(x):
> >
> > import socket
> >
> > return str(socket.gethostname())
> >
> > It occurred to me that I could use this to enforce node-locality for
> other operations:
> >
> > df.withColumn('host', add_hostname(x)).groupBy('host').apply(...)
> >
> > When working on a big job without obvious partition keys, this seems
> like a very straightforward way to avoid a shuffle, but it seems too easy.
> >
> > What problems would I introduce by trying to partition on hostname like
> this?
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Pitfalls of partitioning by host?

2018-08-28 Thread Patrick McCarthy
Mostly I'm guessing that it adds efficiency to a job where partitioning is
required but shuffling is not.

For example, if I want to apply a UDF to 1tb of records on disk, I might
need to repartition(5) to get the task size down to an acceptable size
for my cluster. If I don't care that it's totally balanced, then I'd hope
that I could save a lot of overhead with

foo = df.withColumn('randkey',F.floor(1000*F.rand())).repartition(5000,
'randkey','host').apply(udf)


On Mon, Aug 27, 2018 at 10:11 PM, Michael Artz 
wrote:

> Well if we think of shuffling as a necessity to perform an operation, then
> the problem would be that you are adding a ln aggregation stage to a job
> that is going to get shuffled anyway.  Like if you need to join two
> datasets, then Spark will still shuffle the data, whether they are grouped
> by hostname prior to that or not.  My question is, is there anything else
> that you would expect to gain, except for enforcing maybe a dataset that is
> already bucketed? Like you could enforce that data is where it is supposed
> to be, but what else would you avoid?
>
> Sent from my iPhone
>
> > On Aug 27, 2018, at 12:22 PM, Patrick McCarthy 
> > 
> wrote:
> >
> > When debugging some behavior on my YARN cluster I wrote the following
> PySpark UDF to figure out what host was operating on what row of data:
> >
> > @F.udf(T.StringType())
> > def add_hostname(x):
> >
> > import socket
> >
> > return str(socket.gethostname())
> >
> > It occurred to me that I could use this to enforce node-locality for
> other operations:
> >
> > df.withColumn('host', add_hostname(x)).groupBy('host').apply(...)
> >
> > When working on a big job without obvious partition keys, this seems
> like a very straightforward way to avoid a shuffle, but it seems too easy.
> >
> > What problems would I introduce by trying to partition on hostname like
> this?
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Pitfalls of partitioning by host?

2018-08-28 Thread Patrick McCarthy
Mostly I'm guessing that it adds efficiency to a job where partitioning is
required but shuffling is not.

For example, if I want to apply a UDF to 1tb of records on disk, I might
need to repartition(5) to get the task size down to an acceptable size
for my cluster. If I don't care that it's totally balanced, then I'd hope
that I could save a lot of overhead with

foo = df.withColumn('randkey',F.floor(1000*F.rand())).repartition(5000,
'randkey','host').apply(udf)


On Mon, Aug 27, 2018 at 10:11 PM, Michael Artz 
wrote:

> Well if we think of shuffling as a necessity to perform an operation, then
> the problem would be that you are adding a ln aggregation stage to a job
> that is going to get shuffled anyway.  Like if you need to join two
> datasets, then Spark will still shuffle the data, whether they are grouped
> by hostname prior to that or not.  My question is, is there anything else
> that you would expect to gain, except for enforcing maybe a dataset that is
> already bucketed? Like you could enforce that data is where it is supposed
> to be, but what else would you avoid?
>
> Sent from my iPhone
>
> > On Aug 27, 2018, at 12:22 PM, Patrick McCarthy 
> > 
> wrote:
> >
> > When debugging some behavior on my YARN cluster I wrote the following
> PySpark UDF to figure out what host was operating on what row of data:
> >
> > @F.udf(T.StringType())
> > def add_hostname(x):
> >
> > import socket
> >
> > return str(socket.gethostname())
> >
> > It occurred to me that I could use this to enforce node-locality for
> other operations:
> >
> > df.withColumn('host', add_hostname(x)).groupBy('host').apply(...)
> >
> > When working on a big job without obvious partition keys, this seems
> like a very straightforward way to avoid a shuffle, but it seems too easy.
> >
> > What problems would I introduce by trying to partition on hostname like
> this?
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Pitfalls of partitioning by host?

2018-08-28 Thread Patrick McCarthy
Mostly I'm guessing that it adds efficiency to a job where partitioning is
required but shuffling is not.

For example, if I want to apply a UDF to 1tb of records on disk, I might
need to repartition(5) to get the task size down to an acceptable size
for my cluster. If I don't care that it's totally balanced, then I'd hope
that I could save a lot of overhead with

foo = df.withColumn('randkey',F.floor(1000*F.rand())).repartition(5000,
'randkey','host').apply(udf)


On Mon, Aug 27, 2018 at 10:11 PM, Michael Artz 
wrote:

> Well if we think of shuffling as a necessity to perform an operation, then
> the problem would be that you are adding a ln aggregation stage to a job
> that is going to get shuffled anyway.  Like if you need to join two
> datasets, then Spark will still shuffle the data, whether they are grouped
> by hostname prior to that or not.  My question is, is there anything else
> that you would expect to gain, except for enforcing maybe a dataset that is
> already bucketed? Like you could enforce that data is where it is supposed
> to be, but what else would you avoid?
>
> Sent from my iPhone
>
> > On Aug 27, 2018, at 12:22 PM, Patrick McCarthy 
> > 
> wrote:
> >
> > When debugging some behavior on my YARN cluster I wrote the following
> PySpark UDF to figure out what host was operating on what row of data:
> >
> > @F.udf(T.StringType())
> > def add_hostname(x):
> >
> > import socket
> >
> > return str(socket.gethostname())
> >
> > It occurred to me that I could use this to enforce node-locality for
> other operations:
> >
> > df.withColumn('host', add_hostname(x)).groupBy('host').apply(...)
> >
> > When working on a big job without obvious partition keys, this seems
> like a very straightforward way to avoid a shuffle, but it seems too easy.
> >
> > What problems would I introduce by trying to partition on hostname like
> this?
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Pitfalls of partitioning by host?

2018-08-28 Thread Patrick McCarthy
Mostly I'm guessing that it adds efficiency to a job where partitioning is
required but shuffling is not.

For example, if I want to apply a UDF to 1tb of records on disk, I might
need to repartition(5) to get the task size down to an acceptable size
for my cluster. If I don't care that it's totally balanced, then I'd hope
that I could save a lot of overhead with

foo = df.withColumn('randkey',F.floor(1000*F.rand())).repartition(5000,
'randkey','host').apply(udf)


On Tue, Aug 28, 2018 at 10:28 AM, Patrick McCarthy 
wrote:

> Mostly I'm guessing that it adds efficiency to a job where partitioning is
> required but shuffling is not.
>
> For example, if I want to apply a UDF to 1tb of records on disk, I might
> need to repartition(5) to get the task size down to an acceptable size
> for my cluster. If I don't care that it's totally balanced, then I'd hope
> that I could save a lot of overhead with
>
> foo = df.withColumn('randkey',F.floor(1000*F.rand())).repartition(5000,
> 'randkey','host').apply(udf)
>
> On Mon, Aug 27, 2018 at 10:11 PM, Michael Artz 
> wrote:
>
>> Well if we think of shuffling as a necessity to perform an operation,
>> then the problem would be that you are adding a ln aggregation stage to a
>> job that is going to get shuffled anyway.  Like if you need to join two
>> datasets, then Spark will still shuffle the data, whether they are grouped
>> by hostname prior to that or not.  My question is, is there anything else
>> that you would expect to gain, except for enforcing maybe a dataset that is
>> already bucketed? Like you could enforce that data is where it is supposed
>> to be, but what else would you avoid?
>>
>> Sent from my iPhone
>>
>> > On Aug 27, 2018, at 12:22 PM, Patrick McCarthy > .INVALID> wrote:
>> >
>> > When debugging some behavior on my YARN cluster I wrote the following
>> PySpark UDF to figure out what host was operating on what row of data:
>> >
>> > @F.udf(T.StringType())
>> > def add_hostname(x):
>> >
>> > import socket
>> >
>> > return str(socket.gethostname())
>> >
>> > It occurred to me that I could use this to enforce node-locality for
>> other operations:
>> >
>> > df.withColumn('host', add_hostname(x)).groupBy('host').apply(...)
>> >
>> > When working on a big job without obvious partition keys, this seems
>> like a very straightforward way to avoid a shuffle, but it seems too easy.
>> >
>> > What problems would I introduce by trying to partition on hostname like
>> this?
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>


Re: Pitfalls of partitioning by host?

2018-08-28 Thread Patrick McCarthy
Mostly I'm guessing that it adds efficiency to a job where partitioning is
required but shuffling is not.

For example, if I want to apply a UDF to 1tb of records on disk, I might
need to repartition(5) to get the task size down to an acceptable size
for my cluster. If I don't care that it's totally balanced, then I'd hope
that I could save a lot of overhead with

foo = df.withColumn('randkey',F.floor(1000*F.rand())).repartition(5000,
'randkey','host').apply(udf)

On Mon, Aug 27, 2018 at 10:11 PM, Michael Artz 
wrote:

> Well if we think of shuffling as a necessity to perform an operation, then
> the problem would be that you are adding a ln aggregation stage to a job
> that is going to get shuffled anyway.  Like if you need to join two
> datasets, then Spark will still shuffle the data, whether they are grouped
> by hostname prior to that or not.  My question is, is there anything else
> that you would expect to gain, except for enforcing maybe a dataset that is
> already bucketed? Like you could enforce that data is where it is supposed
> to be, but what else would you avoid?
>
> Sent from my iPhone
>
> > On Aug 27, 2018, at 12:22 PM, Patrick McCarthy 
> > 
> wrote:
> >
> > When debugging some behavior on my YARN cluster I wrote the following
> PySpark UDF to figure out what host was operating on what row of data:
> >
> > @F.udf(T.StringType())
> > def add_hostname(x):
> >
> > import socket
> >
> > return str(socket.gethostname())
> >
> > It occurred to me that I could use this to enforce node-locality for
> other operations:
> >
> > df.withColumn('host', add_hostname(x)).groupBy('host').apply(...)
> >
> > When working on a big job without obvious partition keys, this seems
> like a very straightforward way to avoid a shuffle, but it seems too easy.
> >
> > What problems would I introduce by trying to partition on hostname like
> this?
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Pitfalls of partitioning by host?

2018-08-27 Thread Patrick McCarthy
When debugging some behavior on my YARN cluster I wrote the following
PySpark UDF to figure out what host was operating on what row of data:

@F.udf(T.StringType())
def add_hostname(x):

import socket

return str(socket.gethostname())

It occurred to me that I could use this to enforce node-locality for other
operations:

df.withColumn('host', add_hostname(x)).groupBy('host').apply(...)

When working on a big job without obvious partition keys, this seems like a
very straightforward way to avoid a shuffle, but it seems too easy.

What problems would I introduce by trying to partition on hostname like
this?


Re: How to merge multiple rows

2018-08-22 Thread Patrick McCarthy
You didn't specify which API, but in pyspark you could do

import pyspark.sql.functions as F

df.groupBy('ID').agg(F.sort_array(F.collect_set('DETAILS')).alias('DETAILS')).show()

+---++
| ID| DETAILS|
+---++
|  1|[A1, A2, A3]|
|  3|[B2]|
|  2|[B1]|
+---++


If you want to sort by PART I think you'll need a UDF.

On Wed, Aug 22, 2018 at 4:12 PM, Jean Georges Perrin  wrote:

> How do you do it now?
>
> You could use a withColumn(“newDetails”,  details_2...>)
>
> jg
>
>
> > On Aug 22, 2018, at 16:04, msbreuer  wrote:
> >
> > A dataframe with following contents is given:
> >
> > ID PART DETAILS
> > 11 A1
> > 12 A2
> > 13 A3
> > 21 B1
> > 31 C1
> >
> > Target format should be as following:
> >
> > ID DETAILS
> > 1 A1+A2+A3
> > 2 B1
> > 3 C1
> >
> > Note, the order of A1-3 is important.
> >
> > Currently I am using this alternative:
> >
> > ID DETAIL_1 DETAIL_2 DETAIL_3
> > 1 A1   A2   A3
> > 2 B1
> > 3 C1
> >
> > What would be the best method to do such transformation an a large
> dataset?
> >
> >
> >
> >
> > -
> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Two different Hive instances running

2018-08-17 Thread Patrick Alwell
You probably need to take a look at your hive-site.xml and see what the 
location is for the Hive Metastore. As for beeline, you can explicitly use an 
instance of Hive server by passing in the JDBC url to the hiveServer when you 
launch the client; e.g. beeline –u “jdbc://example.com:5432”

Try taking a look at this 
https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-hive-metastore.html

There should be conf settings you can update to make sure you are using the 
same metastore as the instance of HiveServer.

Hive Wiki is a great resource as well ☺

From: Fabio Wada 
Date: Friday, August 17, 2018 at 11:22 AM
To: "user@spark.apache.org" 
Subject: Two different Hive instances running

Hi,

I am executing a insert into Hive table using SparkSession in Java. When I 
execute select via beeline, I don't see these inserted data. And when I insert 
data using beeline I don't see via my program using SparkSession.

It's looks like there are different Hive instances running.

How can I point to same Hive instance? Using SparkSession and beeline.

Thanks
[mage removed by sender.]ᐧ


Re: How to Create one DB connection per executor and close it after the job is done?

2018-07-30 Thread Patrick McGloin
You could use an object in Scala, of which only one instance will be
created on each JVM / Executor. E.g.

object MyDatabseSingleton {
var dbConn = ???
}

On Sat, 28 Jul 2018, 08:34 kant kodali,  wrote:

> Hi All,
>
> I understand creating a connection forEachPartition but I am wondering can
> I create one DB connection per executor and close it after the job is done?
> any sample code would help. you can imagine I am running a simple batch
> processing application.
>
> Thanks!
>


Re: Arrow type issue with Pandas UDF

2018-07-24 Thread Patrick McCarthy
Thanks Byran. I think it was ultimately groupings that were too large -
after setting spark.sql.shuffle.partitions to a much higher number I was
able to get the UDF to execute.

On Fri, Jul 20, 2018 at 12:45 AM, Bryan Cutler  wrote:

> Hi Patrick,
>
> It looks like it's failing in Scala before it even gets to Python to
> execute your udf, which is why it doesn't seem to matter what's in your
> udf. Since you are doing a grouped map udf maybe your group sizes are too
> big or skewed? Could you try to reduce the size of your groups by adding
> more keys or sampling a fraction of the data? If the problem persists could
> you make a jira? At the very least a better exception would be nice.
>
> Bryan
>
> On Thu, Jul 19, 2018, 7:07 AM Patrick McCarthy 
> 
> wrote:
>
>> PySpark 2.3.1 on YARN, Python 3.6, PyArrow 0.8.
>>
>> I'm trying to run a pandas UDF, but I seem to get nonsensical exceptions
>> in the last stage of the job regardless of my output type.
>>
>>
>> The problem I'm trying to solve:
>> I have a column of scalar values, and each value on the same row has a
>> sorted vector. I'm trying to replace each scalar value with its closest
>> index from its vector. I'm applying the grouping arbitrarily and performing
>> a python operation row-wise because even when the same vector appears on
>> many rows it's not clear how I would get the lookup to scale.
>>
>> My input data, the product of a join of hive tables, has the following
>> schema:
>>
>> root
>>  |-- scalar_value: float (nullable = true)
>>  |-- quantilelist: array (nullable = true)
>>  ||-- element: double (containsNull = true)
>>
>>
>> My UDF is at bottom. I'm using a GROUPED_MAP UDF because I want to
>> perform an operation on two columns, and because I want to take advantage
>> of Arrow to avoid serialization.
>>
>> The schema my UDF returns is this:
>>
>> pos_schema = T.StructType([
>> T.StructField('feature_value',T.FloatType(),True),
>> T.StructField('error',T.StringType())
>> ])
>>
>> ...however when I try to apply my UDF, either with saveAsTable or show(),
>> I get the following exception:
>>
>> org.apache.arrow.vector.util.OversizedAllocationException: Unable to
>> expand the buffer
>> at org.apache.arrow.vector.BaseFixedWidthVector.
>> reallocBufferHelper(BaseFixedWidthVector.java:447)
>> at org.apache.arrow.vector.BaseFixedWidthVector.reAlloc(
>> BaseFixedWidthVector.java:426)
>> at org.apache.arrow.vector.BaseFixedWidthVector.handleSafe(
>> BaseFixedWidthVector.java:838)
>> at org.apache.arrow.vector.Float8Vector.setSafe(
>> Float8Vector.java:221)
>> at org.apache.spark.sql.execution.arrow.DoubleWriter.
>> setValue(ArrowWriter.scala:223)
>> at org.apache.spark.sql.execution.arrow.ArrowFieldWriter.write(
>> ArrowWriter.scala:122)
>> at org.apache.spark.sql.execution.arrow.ArrayWriter.
>> setValue(ArrowWriter.scala:308)
>> at org.apache.spark.sql.execution.arrow.ArrowFieldWriter.write(
>> ArrowWriter.scala:122)
>> at org.apache.spark.sql.execution.arrow.ArrowWriter.
>> write(ArrowWriter.scala:87)
>> at org.apache.spark.sql.execution.python.
>> ArrowPythonRunner$$anon$2$$anonfun$writeIteratorToStream$1.apply$mcV$sp(
>> ArrowPythonRunner.scala:84)
>> at org.apache.spark.sql.execution.python.
>> ArrowPythonRunner$$anon$2$$anonfun$writeIteratorToStream$
>> 1.apply(ArrowPythonRunner.scala:75)
>> at org.apache.spark.sql.execution.python.
>> ArrowPythonRunner$$anon$2$$anonfun$writeIteratorToStream$
>> 1.apply(ArrowPythonRunner.scala:75)
>> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.
>> scala:1380)
>> at org.apache.spark.sql.execution.python.
>> ArrowPythonRunner$$anon$2.writeIteratorToStream(
>> ArrowPythonRunner.scala:95)
>> at org.apache.spark.api.python.BasePythonRunner$WriterThread$
>> $anonfun$run$1.apply(PythonRunner.scala:215)
>> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.
>> scala:1991)
>> at org.apache.spark.api.python.BasePythonRunner$WriterThread.
>> run(PythonRunner.scala:170)
>>
>> I assumed it was the result of some bad typing on my part, until I did a
>> test with a degenerate UDF that only returns a column of 1:
>>
>> @F.pandas_udf(T.StructType([T.StructField('feature_value',T.IntegerType(),True)]),
>>
>> F.PandasUDFType.GROUPED_MAP)
>>
>> def groupedPercentileInt(df):
>>
>>

Arrow type issue with Pandas UDF

2018-07-19 Thread Patrick McCarthy
PySpark 2.3.1 on YARN, Python 3.6, PyArrow 0.8.

I'm trying to run a pandas UDF, but I seem to get nonsensical exceptions in
the last stage of the job regardless of my output type.


The problem I'm trying to solve:
I have a column of scalar values, and each value on the same row has a
sorted vector. I'm trying to replace each scalar value with its closest
index from its vector. I'm applying the grouping arbitrarily and performing
a python operation row-wise because even when the same vector appears on
many rows it's not clear how I would get the lookup to scale.

My input data, the product of a join of hive tables, has the following
schema:

root
 |-- scalar_value: float (nullable = true)
 |-- quantilelist: array (nullable = true)
 ||-- element: double (containsNull = true)


My UDF is at bottom. I'm using a GROUPED_MAP UDF because I want to perform
an operation on two columns, and because I want to take advantage of Arrow
to avoid serialization.

The schema my UDF returns is this:

pos_schema = T.StructType([
T.StructField('feature_value',T.FloatType(),True),
T.StructField('error',T.StringType())
])

...however when I try to apply my UDF, either with saveAsTable or show(), I
get the following exception:

org.apache.arrow.vector.util.OversizedAllocationException: Unable to expand
the buffer
at
org.apache.arrow.vector.BaseFixedWidthVector.reallocBufferHelper(BaseFixedWidthVector.java:447)
at
org.apache.arrow.vector.BaseFixedWidthVector.reAlloc(BaseFixedWidthVector.java:426)
at
org.apache.arrow.vector.BaseFixedWidthVector.handleSafe(BaseFixedWidthVector.java:838)
at
org.apache.arrow.vector.Float8Vector.setSafe(Float8Vector.java:221)
at
org.apache.spark.sql.execution.arrow.DoubleWriter.setValue(ArrowWriter.scala:223)
at
org.apache.spark.sql.execution.arrow.ArrowFieldWriter.write(ArrowWriter.scala:122)
at
org.apache.spark.sql.execution.arrow.ArrayWriter.setValue(ArrowWriter.scala:308)
at
org.apache.spark.sql.execution.arrow.ArrowFieldWriter.write(ArrowWriter.scala:122)
at
org.apache.spark.sql.execution.arrow.ArrowWriter.write(ArrowWriter.scala:87)
at
org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2$$anonfun$writeIteratorToStream$1.apply$mcV$sp(ArrowPythonRunner.scala:84)
at
org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2$$anonfun$writeIteratorToStream$1.apply(ArrowPythonRunner.scala:75)
at
org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2$$anonfun$writeIteratorToStream$1.apply(ArrowPythonRunner.scala:75)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1380)
at
org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2.writeIteratorToStream(ArrowPythonRunner.scala:95)
at
org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:215)
at
org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1991)
at
org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:170)

I assumed it was the result of some bad typing on my part, until I did a
test with a degenerate UDF that only returns a column of 1:

@F.pandas_udf(T.StructType([T.StructField('feature_value',T.IntegerType(),True)]),

F.PandasUDFType.GROUPED_MAP)

def groupedPercentileInt(df):

return
pd.DataFrame({'feature_value':[int(1)]*df.shape[0]}).reset_index(drop=True)


This clearly only has one return value of type int, yet I get the same
exception:

org.apache.arrow.vector.util.OversizedAllocationException: Unable to expand
the buffer
at
org.apache.arrow.vector.BaseFixedWidthVector.reallocBufferHelper(BaseFixedWidthVector.java:447)
at
org.apache.arrow.vector.BaseFixedWidthVector.reAlloc(BaseFixedWidthVector.java:426)
at
org.apache.arrow.vector.BaseFixedWidthVector.handleSafe(BaseFixedWidthVector.java:838)
at
org.apache.arrow.vector.Float8Vector.setSafe(Float8Vector.java:221)
at
org.apache.spark.sql.execution.arrow.DoubleWriter.setValue(ArrowWriter.scala:223)
at
org.apache.spark.sql.execution.arrow.ArrowFieldWriter.write(ArrowWriter.scala:122)
at
org.apache.spark.sql.execution.arrow.ArrayWriter.setValue(ArrowWriter.scala:308)
at
org.apache.spark.sql.execution.arrow.ArrowFieldWriter.write(ArrowWriter.scala:122)
at
org.apache.spark.sql.execution.arrow.ArrowWriter.write(ArrowWriter.scala:87)
at
org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2$$anonfun$writeIteratorToStream$1.apply$mcV$sp(ArrowPythonRunner.scala:84)
at
org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2$$anonfun$writeIteratorToStream$1.apply(ArrowPythonRunner.scala:75)
at
org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2$$anonfun$writeIteratorToStream$1.apply(ArrowPythonRunner.scala:75)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1380)
at

Spark accessing fakes3

2018-07-11 Thread Patrick Roemer
Hi,

does anybody if (and how) it's possible to get a (dev-local) Spark
installation to talk to fakes3 for s3[n|a]:// URLs?

I have managed to connect to AWS S3 from my local installation by adding
hadoop-aws and aws-java-sdk to jars, using s3:// URLs as arguments for
SparkContext#textFile(), but I'm at loss how to get it to work with a
local fakes3.

The only reference I've found so far is this issue, where somebody seems
to have gotten close, but unfortunately he's forgotten about the details:

https://github.com/jubos/fake-s3/issues/108

Thanks and best regards,
Patrick

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: DataTypes of an ArrayType

2018-07-11 Thread Patrick McCarthy
Arrays need to be a single type, I think you're looking for a Struct
column. See:
https://medium.com/@mrpowers/adding-structtype-columns-to-spark-dataframes-b44125409803

On Wed, Jul 11, 2018 at 6:37 AM, dimitris plakas 
wrote:

> Hello everyone,
>
> I am new to Pyspark and i would like to ask if there is any way to have a
> Dataframe column which is ArrayType and have a different DataType for each
> elemnt of the ArrayType. For example
> to have something like :
>
> StructType([StructField("Column_Name", ArrayType(ArrayType(FloatType(),
> FloatType(), DecimalType(), False),False), False)]).
>
> I want to have an ArrayType column with 2 elements as FloatType and 1
> element as DecimalType
>
> Thank you in advance
>


Re: Building SparkML vectors from long data

2018-07-03 Thread Patrick McCarthy
I'm still validating my results, but my solution for the moment looks like
the below. I'm presently dealing with one-hot encoded values, so all the
numbers in my array are 1:

def udfMaker(feature_len):

return F.udf(lambda x: SparseVector(feature_len, sorted(x),
[1.0]*len(x)), VectorUDT())

indexer =
StringIndexer(inputCol='contentStrings',outputCol='indexedContent).fit(source_df)

makeVec = udfMaker(len(indexer.labels))

indexed_data = indexer.transform(source_df)

sparse_content = (indexed_data.groupBy('ID').
.agg(F.collect_set('indexedContent').alias('contentIdx'))
.withColumn('content', makeVec(F.col('contentIdx')))
.drop('contentIdx')
)

On Tue, Jun 12, 2018 at 3:59 PM, Nathan Kronenfeld <
nkronenfeld@uncharted.software> wrote:

> I don't know if this is the best way or not, but:
>
> val indexer = new StringIndexer().setInputCol("vr").setOutputCol("vrIdx")
> val indexModel = indexer.fit(data)
> val indexedData = indexModel.transform(data)
> val variables = indexModel.labels.length
>
> val toSeq = udf((a: Double, b: Double) => Seq(a, b))
> val toVector = udf((seq: Seq[Seq[Double]]) => {
>   new SparseVector(variables, seq.map(_(0).toInt).toArray, 
> seq.map(_(1)).toArray)
> })
> val result = indexedData
>   .withColumn("val", toSeq(col("vrIdx"), col("value")))
>   .groupBy("ID")
>   .agg(collect_set(col("val")).name("collected_val"))
>   .withColumn("collected_val", 
> toVector(col("collected_val")).as[Row](Encoders.javaSerialization(classOf[Row])))
>
>
> at least works.  The indices still aren't in order in the vector - I don't
> know if this matters much, but if it does, it's easy enough to sort them in
> toVector (and to remove duplicates)
>
>
> On Tue, Jun 12, 2018 at 2:24 PM, Patrick McCarthy  > wrote:
>
>> I work with a lot of data in a long format, cases in which an ID column
>> is repeated, followed by a variable and a value column like so:
>>
>> +---+-+---+
>> |ID | var | value |
>> +---+-+---+
>> | A | v1  | 1.0   |
>> | A | v2  | 2.0   |
>> | B | v1  | 1.5   |
>> | B | v3  | -1.0  |
>> +---+-+---+
>>
>> It seems to me that Spark doesn't provide any clear native way to
>> transform data of this format into a Vector() or VectorUDT() type suitable
>> for machine learning algorithms.
>>
>> The best solution I've found so far (which isn't very good) is to group
>> by ID, perform a collect_list, and then use a UDF to translate the
>> resulting array into a vector datatype.
>>
>> I can get kind of close like so:
>>
>> indexer = MF.StringIndexer(inputCol = 'var', outputCol = 'varIdx')
>>
>> (indexed_df
>> .withColumn('val',F.concat(F.col('varIdx').astype(T.IntegerType()).astype(T.StringType()),
>> F.lit(':'),F.col('value')))
>> .groupBy('ID')
>> .agg(F.collect_set('val'))
>> )
>>
>> But the resultant 'val' vector is out of index order, and still would
>> need to be parsed.
>>
>> What's the current preferred way to solve a problem like this?
>>
>
>


Re: How to handle java.sql.Date inside Maps with to_json / from_json

2018-06-28 Thread Patrick McGloin
Hi all,

I tested this with a Date outside a map and it works fine so I think the
issue is simply for Dates inside Maps. I will create a Jira for this unless
there are objections.

Best regards,
Patrick

On Thu, 28 Jun 2018, 11:53 Patrick McGloin, 
wrote:

> Consider the following test, which will fail on the final show:
>
> * case class *UnitTestCaseClassWithDateInsideMap(map: Map[Date, Int])
>
> test(*"Test a Date as key in a Map"*) {
>  *val *map =  *UnitTestCaseClassWithDateInsideMap*(*Map*(Date.*valueOf*(
> *"2018-06-28"*) -> 1))
>  *val *options = *Map*(*"timestampFormat" *-> *"/MM/dd HH:mm:ss.SSS"*,
> *"dateFormat" *-> *"/MM/dd"*)
>  *val *schema = Encoders.*product*
> [UnitTestCaseClassWithDateInsideMap].schema
>
>  *val *mapDF = *Seq*(map).toDF()
>  *val *jsonDF = mapDF.select(*to_json*(*struct*(mapDF.columns.head,
> mapDF.columns.tail:_*), options))
>  jsonDF.show()
>
>  *val *jsonString = jsonDF.map(_.getString(0)).collect().head
>
>  *val *stringDF = *Seq*(jsonString).toDF(*"json"*)
>  *val *parsedDF = stringDF.select(*from_json*(*$"json"*, schema, options))
>  parsedDF.show()
> }
>
>
> The result of the line "jsonDF.show()" is as follows:
>
> +---+
> |structstojson(named_struct(NamePlaceholder(), map))|
> +---+
> |{"map":{"17710":1}}|
> +---+
>
> As can be seen the date is not formatted correctly.  The error with
>  "parsedDF.show()" is:
>
> java.lang.ClassCastException: org.apache.spark.unsafe.types.UTF8String
> cannot be cast to java.lang.Integer
>
> I have tried adding the options to to_json / from_json but it hasn't
> helped.  Am I using the wrong options?
>
> Is there another way to do this?
>
> Best regards,
> Patrick
> This message has been sent by ABN AMRO Bank N.V., which has its seat at Gustav
> Mahlerlaan 10 (1082 PP) Amsterdam, the Netherlands
> <https://maps.google.com/?q=Gustav+Mahlerlaan+10+(1082+PP)+Amsterdam,+the+Netherlands=gmail=g>,
> and is registered in the Commercial Register of Amsterdam under number
> 34334259.
>


How to handle java.sql.Date inside Maps with to_json / from_json

2018-06-28 Thread Patrick McGloin
Consider the following test, which will fail on the final show:

* case class *UnitTestCaseClassWithDateInsideMap(map: Map[Date, Int])

test(*"Test a Date as key in a Map"*) {
 *val *map =  *UnitTestCaseClassWithDateInsideMap*(*Map*(Date.*valueOf*(
*"2018-06-28"*) -> 1))
 *val *options = *Map*(*"timestampFormat" *-> *"/MM/dd
HH:mm:ss.SSS"*, *"dateFormat"
*-> *"/MM/dd"*)
 *val *schema = Encoders.*product*
[UnitTestCaseClassWithDateInsideMap].schema

 *val *mapDF = *Seq*(map).toDF()
 *val *jsonDF = mapDF.select(*to_json*(*struct*(mapDF.columns.head,
mapDF.columns.tail:_*), options))
 jsonDF.show()

 *val *jsonString = jsonDF.map(_.getString(0)).collect().head

 *val *stringDF = *Seq*(jsonString).toDF(*"json"*)
 *val *parsedDF = stringDF.select(*from_json*(*$"json"*, schema, options))
 parsedDF.show()
}


The result of the line "jsonDF.show()" is as follows:

+---+
|structstojson(named_struct(NamePlaceholder(), map))|
+---+
|{"map":{"17710":1}}|
+---+

As can be seen the date is not formatted correctly.  The error with
 "parsedDF.show()" is:

java.lang.ClassCastException: org.apache.spark.unsafe.types.UTF8String
cannot be cast to java.lang.Integer

I have tried adding the options to to_json / from_json but it hasn't
helped.  Am I using the wrong options?

Is there another way to do this?

Best regards,
Patrick
This message has been sent by ABN AMRO Bank N.V., which has its seat at Gustav
Mahlerlaan 10 (1082 PP) Amsterdam, the Netherlands
<https://maps.google.com/?q=Gustav+Mahlerlaan+10+(1082+PP)+Amsterdam,+the+Netherlands=gmail=g>,
and is registered in the Commercial Register of Amsterdam under number
34334259.


Building SparkML vectors from long data

2018-06-12 Thread Patrick McCarthy
I work with a lot of data in a long format, cases in which an ID column is
repeated, followed by a variable and a value column like so:

+---+-+---+
|ID | var | value |
+---+-+---+
| A | v1  | 1.0   |
| A | v2  | 2.0   |
| B | v1  | 1.5   |
| B | v3  | -1.0  |
+---+-+---+

It seems to me that Spark doesn't provide any clear native way to transform
data of this format into a Vector() or VectorUDT() type suitable for
machine learning algorithms.

The best solution I've found so far (which isn't very good) is to group by
ID, perform a collect_list, and then use a UDF to translate the resulting
array into a vector datatype.

I can get kind of close like so:

indexer = MF.StringIndexer(inputCol = 'var', outputCol = 'varIdx')

(indexed_df
.withColumn('val',F.concat(F.col('varIdx').astype(T.IntegerType()).astype(T.StringType()),
F.lit(':'),F.col('value')))
.groupBy('ID')
.agg(F.collect_set('val'))
)

But the resultant 'val' vector is out of index order, and still would need
to be parsed.

What's the current preferred way to solve a problem like this?


Poor performance reading Hive table made of sequence files

2018-05-01 Thread Patrick McCarthy
I recently ran a query with the following form:

select a.*, b.*
from some_small_table a
inner join
(
  select things from someother table
  lateral view explode(s) ss as sss
  where a_key is in (x,y,z)
) b
on a.key = b.key
where someothercriterion

On hive, this query took about five minutes. In Spark, using either the
same syntax in a spark.sql call or using the dataframe API, it appeared as
if it was going to take on the order of 10 hours. I didn't let it finish.

The data underlying the hive table are sequence files, ~30mb each, ~1000 to
a partition, and my query ran over only five partitions. A single partition
is about 25gb.

How can Spark perform so badly? Do I need to handle sequence files in a
special way?


Re: [Beginner] How to save Kafka Dstream data to parquet ?

2018-02-28 Thread Patrick Alwell
I don’t think sql context is “deprecated” in this sense. It’s still accessible 
by earlier versions of Spark.

But yes, at first glance it looks like you are correct. I don’t see a 
recordWriter method for parquet outside of the SQL package.
https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.streaming.DataStreamWriter

Here is an example that uses Sql context.  I believe the SQL context  is 
necessary for strongly typed, self describing, binary, columnar formatted files 
like Parquet.
https://community.hortonworks.com/articles/72941/writing-parquet-on-hdfs-using-spark-streaming.html

Otherwise you’ll probably be looking at a customWriter.
https://parquet.apache.org/documentation/latest/

AFAIK,

If you were to implement a custom writer, you still wouldn’t escape the parquet 
formatting paradigm the DF API solves. Spark needs a way to map data types for 
Parquet conversion.

Hope this helps,

-Pat


On 2/28/18, 11:09 AM, "karthikus"  wrote:

Hi all,

I have a Kafka stream data and I need to save the data in parquet format
without using Structured Streaming (due to the lack of Kafka Message header
support). 

val kafkaStream =
  KafkaUtils.createDirectStream(
streamingContext,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](
  topics,
  kafkaParams
)
  )
// process the messages
val messages = kafkaStream.map(record => (record.key, record.value))
val lines = messages.map(_._2)

Now, how do I save it as parquet ? All the examples that I have come across
uses SQLContext which is deprecated. ! Any help appreciated ! 



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org




-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark EMR executor-core vs Vcores

2018-02-26 Thread Patrick Alwell
+1

AFAIK,

vCores are not the same as Cores in AWS. 
https://samrueby.com/2015/01/12/what-are-amazon-aws-vcpus/

I’ve always understood it as cores = num concurrent threads

These posts might help you with your research and why exceeding 5 cores per 
executor doesn’t make sense.

https://stackoverflow.com/questions/24622108/apache-spark-the-number-of-cores-vs-the-number-of-executors
http://site.clairvoyantsoft.com/understanding-resource-allocation-configurations-spark-application/

AWS/ EMR was always a challenge for me. Never understood why it didn’t seem to 
be using all my resources; as you noted.

I would see this as –num-executors = 15 –executor-cores= 5 –executor-memory = 
10gb and then test my application from there.

I only got better performance out of a different class of nodes, e.g. R-series 
instance types. Costs more than the M class; but wound up using less of them 
and my jobs ran faster. I was in the 10+TB jobs territory with TPC data.  ☺ The 
links I provided have a few use cases and trials.

Hope that helps,

-Pat


From: Selvam Raman 
Date: Monday, February 26, 2018 at 1:52 PM
To: Vadim Semenov 
Cc: user 
Subject: Re: Spark EMR executor-core vs Vcores

Thanks. That’s make sense.

I want to know one more think , available vcore per machine is 16 but threads 
per node 8. Am I missing to relate here.

What I m thinking now is number of vote = number of threads.



On Mon, 26 Feb 2018 at 18:45, Vadim Semenov 
> wrote:
All used cores aren't getting reported correctly in EMR, and YARN itself has no 
control over it, so whatever you put in `spark.executor.cores` will be used,
but in the ResourceManager you will only see 1 vcore used per nodemanager.

On Mon, Feb 26, 2018 at 5:20 AM, Selvam Raman 
> wrote:
Hi,

spark version - 2.0.0
spark distribution - EMR 5.0.0

Spark Cluster - one master, 5 slaves
Master node - m3.xlarge - 8 vCore, 15 GiB memory, 80 SSD GB storage
Slave node - m3.2xlarge - 16 vCore, 30 GiB memory, 160 SSD GB storage



Cluster Metrics
Apps Submitted

Apps Pending

Apps Running

Apps Completed

Containers Running

Memory Used

Memory Total

Memory Reserved

VCores Used

VCores Total

VCores Reserved

Active Nodes

Decommissioning Nodes

Decommissioned Nodes

Lost Nodes

Unhealthy Nodes

Rebooted Nodes

16

0

1

15

5

88.88 GB

90.50 GB

22 GB

5

79

1

5

0

0

5

0

0


I have submitted job with below configuration
--num-executors 5 --executor-cores 10 --executor-memory 20g







spark.task.cpus - be default 1


My understanding is there will be 5 executore each can run 10 task at a time 
and task can share total memory of 20g. Here, i could see only 5 vcores used 
which means 1 executor instance use 20g+10%overhead ram(22gb), 10 core(number 
of threads), 1 Vcore(cpu).

please correct me if my understand is wrong.


how can i utilize number of vcore in EMR effectively. Will Vcore boost 
performance?



--
Selvam Raman
"லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"

--
Selvam Raman
"லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"


Out of memory Error when using Collection Accumulator Spark 2.2

2018-02-26 Thread Patrick
Hi,

We were getting OOM error when we are accumulating the results of each
worker. We were trying to avoid collecting data to driver node instead used
accumulator as per below code snippet,

Is there any spark config to set the accumulator settings Or am i doing the
wrong way to collect the huge data set?

  CollectionAccumulator accumulate;
  Dataset bin;

bin.foreach((ForeachFunction) row -> {
  accumulate.add(row.get(0) + "|" + row.get(1) + "|" + row.get(2));
});

accumulate.value().forEach(element -> {
  String[] arr = element.split("\\|");
  String count = arr[2];
  double percentage =
  (total == 0.0) ? 0.0 : (Double.valueOf(count) / total);
  PayloadBin payload = new PayloadBin(arr[0],
  arr[1], 0, Long.valueOf(count), percentage, sortBy, sortOrder);
  binArray.add(payload);

});


18/02/21 17:35:23 INFO storage.BlockManagerInfo: Added taskresult_5050 in
memory on rhlhddfrd225.fairisaac.com:41640 (size: 3.7 MB, free: 8.3 GB)

18/02/21 17:35:24 INFO storage.BlockManagerInfo: Removed taskresult_5034 on
rhlhddfrd218.fairisaac.com:46584 in memory (size: 3.7 MB, free: 8.4 GB)

18/02/21 17:35:25 INFO scheduler.TaskSetManager: Finished task 59.0 in
stage 20.0 (TID 5034) in 9908 ms on rhlhddfrd218.fairisaac.com (executor
92) (14/200)

Exception in thread "dag-scheduler-event-loop" java.lang.OutOfMemoryError:
Java heap space

at java.util.Arrays.copyOf(Arrays.java:3181)

at java.util.ArrayList.toArray(ArrayList.java:376)

at
java.util.Collections$SynchronizedCollection.toArray(Collections.java:2024)

at java.util.ArrayList.(ArrayList.java:177)

at
org.apache.spark.util.CollectionAccumulator.value(AccumulatorV2.scala:470)


Reservoir sampling in parallel

2018-02-23 Thread Patrick McCarthy
I have a large dataset composed of scores for several thousand segments,
and the timestamps at which time those scores occurred. I'd like to apply
some techniques like reservoir sampling[1], where for every segment I
process records in order of their timestamps, generate a sample, and then
at intervals compute the quantiles in the sample. Ideally I'd like to write
a pyspark udf to do the sampling/quantizing procedure.

It seems like something I should be doing via rdd.map, but it's not really
clear how I can enforce a function to process records in order within a
partition. Any pointers?

Thanks,
Patrick

[1] https://en.wikipedia.org/wiki/Reservoir_sampling


Re: Spark Dataframe and HIVE

2018-02-09 Thread Patrick Alwell
Might sound silly, but are you using a Hive context?
What errors do the Hive query results return?

spark = SparkSession.builder.enableHiveSupport().getOrCreate()

The second part of your questions, you are creating a temp table and then 
subsequently creating another table from that temp view. Doesn’t seem like you 
are reading the table from the spark or hive warehouse.

This works fine for me; albeit I was using spark thrift to communicate with my 
directory of choice.

from pyspark import SparkContext
from pyspark.sql import SparkSession, Row, types
from pyspark.sql.types import *
from pyspark.sql import functions as f
from decimal import *
from datetime import datetime

# instantiate our sparkSession and context
spark = SparkSession.builder.enableHiveSupport().getOrCreate()
sc = spark.sparkContext

# Generating customer orc table files
# load raw data as an RDD
customer_data = sc.textFile("/data/tpch/customer.tbl")
# map the data into an RDD split with pipe delimitations
customer_split = customer_data.map(lambda l: l.split("|"))
# map the split data with a row method; this is where we specificy column names 
and types
# default type is string- UTF8
# there are issues with converting string to date and these issues have been 
addressed
# in those tables with dates: See notes below
customer_row = customer_split.map( lambda r: Row(
custkey=long(r[0]),
name=r[1],
address=r[2],
nationkey=long(r[3]),
phone=r[4],
acctbal=Decimal(r[5]),
mktsegment=r[6],
comment=r[7]
))

# we can have Spark infer the schema, or apply a strict schema and identify 
whether or not we want null values
# in this case we don't want null values for keys; and we want explicit data 
types to support the TPCH tables/ data model
customer_schema = types.StructType([
   types.StructField('custkey',types.LongType(),False)
   ,types.StructField('name',types.StringType())
   ,types.StructField('address',types.StringType())
   ,types.StructField('nationkey',types.LongType(),False)
   ,types.StructField('phone',types.StringType())
   ,types.StructField('acctbal',types.DecimalType())
   ,types.StructField('mktsegment',types.StringType())
   ,types.StructField('comment',types.StringType())])

# we create a dataframe object by referencing our sparkSession class and the 
createDataFrame method
# this method takes two arguments by default (row, schema)
customer_df = spark.createDataFrame(customer_row,customer_schema)

# we can now write a file of type orc by referencing our dataframe object we 
created
customer_df.write.orc("/data/tpch/customer.orc")

# read that same file we created but create a seperate dataframe object
customer_df_orc = spark.read.orc("/data/tpch/customer.orc")

# reference the newly created dataframe object and create a tempView for QA 
purposes
customer_df_orc.createOrReplaceTempView("customer")

# reference the sparkSession class and SQL method in order to issue SQL 
statements to the materialized view
spark.sql("SELECT * FROM customer LIMIT 10").show()

From: "☼ R Nair (रविशंकर नायर)" 
Date: Friday, February 9, 2018 at 7:03 AM
To: "user @spark/'user @spark'/spark users/user@spark" 
Subject: Re: Spark Dataframe and HIVE

An update: (Sorry I missed)

When I do

passion_df.createOrReplaceTempView("sampleview")

spark.sql("create table sample table as select * from sample view")

Now, I can see table and can query as well.

So why this do work from Spark and other method discussed below is not?

Thanks



On Fri, Feb 9, 2018 at 9:49 AM, ☼ R Nair (रविशंकर नायर) 
> wrote:
All,

It has been three days continuously I am on this issue. Not getting any clue.

Environment: Spark 2.2.x, all configurations are correct. hive-site.xml is in 
spark's conf.

1) Step 1: I created a data frame DF1 reading a csv file.

2) Did  manipulations on DF1. Resulting frame is passion_df.

3) passion_df.write.format("orc").saveAsTable("sampledb.passion")

4) The metastore shows the hive table., when I do "show tables" in HIVE, I can 
see table name

5) I can't select in HIVE, though I can select from SPARK as spark.sql("select 
* from sampledb.passion")

Whats going on here? Please help. Why I am not seeing data from HIVE prompt?
The "describe formatted " command on the table in HIVE shows he data is is in 
default warehouse location ( /user/hive/warehouse) since I set it.

I am not getting any definite answer anywhere. Many suggestions and answers 
given in Stackoverflow et al.Nothing really works.

So asking experts here for some light on this, thanks

Best,
Ravion





--
[mage removed by sender.]


Re: Type Casting Error in Spark Data Frame

2018-01-29 Thread Patrick McCarthy
You can't select from an array like that, try instead using 'lateral view
explode' in the query for that element, or before the sql stage
(py)spark.sql.functions.explode.

On Mon, Jan 29, 2018 at 4:26 PM, Arnav kumar  wrote:

> Hello Experts,
>
> I would need your advice in resolving the below issue when I am trying to
> retrieving the data from a dataframe.
>
> Can you please let me know where I am going wrong.
>
> code :
>
>
> // create the dataframe by parsing the json
> // Message Helper describes the JSON Struct
> //data out is the json string received from Streaming Engine.
>
> val dataDF = sparkSession.createDataFrame(dataOut,
> MessageHelper.sqlMapping)
> dataDF.printSchema()
> /* -- out put of dataDF.printSchema
>
> root
>  |-- messageID: string (nullable = true)
>  |-- messageType: string (nullable = true)
>  |-- meta: array (nullable = true)
>  ||-- element: struct (containsNull = true)
>  |||-- messageParsedTimestamp: string (nullable = true)
>  |||-- ipaddress: string (nullable = true)
>  |-- messageData: array (nullable = true)
>  ||-- element: struct (containsNull = true)
>  |||-- packetID: string (nullable = true)
>  |||-- messageID: string (nullable = true)
>  |||-- unixTime: string (nullable = true)
>
>
>
> */
>
>
> dataDF.createOrReplaceTempView("message")
> val routeEventDF=sparkSession.sql("select messageId 
> ,messageData.unixTime,messageData.packetID,
> messageData.messageID from message")
> routeEventDF.show
>
>
> Error  on routeEventDF.show
> Caused by: java.lang.RuntimeException: 
> org.apache.spark.sql.catalyst.expressions.GenericRow
> is not a valid external type for schema of array messageParsedTimestamp:string,ipaddress:string,port:string,
> message:string
> at org.apache.spark.sql.catalyst.expressions.GeneratedClass$
> SpecificUnsafeProjection.evalIfFalseExpr14$(Unknown Source)
>
>
> Appreciate your help
>
> Best Regards
> Arnav Kumar.
>
>
>


Re: I can't save DataFrame from running Spark locally

2018-01-23 Thread Patrick Alwell
Spark cannot read locally from S3 without an S3a protocol; you’ll more than 
likely need a local copy of the data or you’ll need to utilize the proper jars 
to enable S3 communication from the edge to the datacenter.

https://stackoverflow.com/questions/30385981/how-to-access-s3a-files-from-apache-spark

Here are the jars: 
https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-aws

Looks like you already have them, in which case you’ll have to make small 
configuration changes, e.g. s3 --> s3a

Keep in mind: The Amazon JARs have proven very brittle: the version of the 
Amazon libraries must match the versions against which the Hadoop binaries were 
built.

https://hortonworks.github.io/hdp-aws/s3-s3aclient/index.html#using-the-s3a-filesystem-client




From: Toy 
Date: Tuesday, January 23, 2018 at 11:33 AM
To: "user@spark.apache.org" 
Subject: I can't save DataFrame from running Spark locally

Hi,

First of all, my Spark application runs fine in AWS EMR. However, I'm trying to 
run it locally to debug some issue. My application is just to parse log files 
and convert to DataFrame then convert to ORC and save to S3. However, when I 
run locally I get this error

java.io.IOException: /orc/dt=2018-01-23 doesn't exist
at 
org.apache.hadoop.fs.s3.Jets3tFileSystemStore.get(Jets3tFileSystemStore.java:170)
at 
org.apache.hadoop.fs.s3.Jets3tFileSystemStore.retrieveINode(Jets3tFileSystemStore.java:221)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
at com.sun.proxy.$Proxy22.retrieveINode(Unknown Source)
at org.apache.hadoop.fs.s3.S3FileSystem.getFileStatus(S3FileSystem.java:340)
at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1426)
at 
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:77)
at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135)
at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:113)
at 
org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:87)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:87)
at 
org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:492)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:215)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:198)
at Vivace$$anonfun$processStream$1.apply(vivace.scala:193)
at Vivace$$anonfun$processStream$1.apply(vivace.scala:170)

Here's what I have in sbt

scalaVersion := "2.11.8"

val sparkVersion = "2.1.0"
val hadoopVersion = "2.7.3"
val awsVersion = "1.11.155"

lazy val sparkAndDependencies = Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "org.apache.spark" %% "spark-sql" % sparkVersion,
  "org.apache.spark" %% "spark-hive" % sparkVersion,
  "org.apache.spark" %% "spark-streaming" % sparkVersion,

  "org.apache.hadoop" % "hadoop-aws" % hadoopVersion,
  "org.apache.hadoop" % "hadoop-common" % hadoopVersion
)

And this is where the code failed

val sparrowWriter = 
sparrowCastedDf.write.mode("append").format("orc").option("compression", "zlib")
sparrowWriter.save(sparrowOutputPath)

sparrowOutputPath is something like s3://bucket/folder and it exists I checked 
it with aws command line

I put a breakpoint there and the full path looks like this 
s3://bucket/orc/dt=2018-01-23 which exists.

I have also set up the credentials like this

sc.hadoopConfiguration.set("fs.s3.awsAccessKeyId", "key")
sc.hadoopConfiguration.set("fs.s3.awsSecretAccessKey", "secret")

My confusion is this code runs fine in the cluster but I get this error running 
locally.




Re: Spark vs Snowflake

2018-01-22 Thread Patrick McCarthy
Last I heard of them a year or two ago, they basically repackage AWS
services behind their own API/service layer for convenience. There's
probably a value-add if you're not familiar with optimizing AWS, but if you
already have that expertise I don't expect they would add much extra
performance if any.

On Mon, Jan 22, 2018 at 4:51 PM, Mich Talebzadeh 
wrote:

> Hi,
>
> Has anyone had experience of using Snowflake
>  which touts itself as data warehouse
> built for the cloud? In reviews
> one
> recommendation states
>
> "DEFINITELY AN ALTERNATIVE TO AMAZON RED SHIFT AND SPARK"
>
> Now I am not sure about inner workings of this product but I will be
> surprised if the product is actually faster than using Spark on HDFS.
>
> There is very little literature on this product except that it shouts "me
> too" among Amazon Redshift and Google BigQuery.
>
> anyone has used this product?
>
> thanks
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>


Re: StreamingLogisticRegressionWithSGD : Multiclass Classification : Options

2018-01-19 Thread Patrick McCarthy
Rather than use a fancy purpose-built class, I was thinking that you could
rather generate a series of label vectors, vector A is 1 when class a is
positive and 0 when any other is, vector B is 1 when class b is positive
and 0 when any other is, etc.

I don't know anything about streaming in particular so I don't know if this
introduces any lag or concurrency problems, but you could perform the
logistic regression on each of these label vectors independently using the
classifier algorithm of your choice and then, concatenating the predictions
into a dataframe, take a rowmax to do your multiclass evaluation.

On Fri, Jan 19, 2018 at 11:29 AM, Sundeep Kumar Mehta <sunnyjai...@gmail.com
> wrote:

> Thanks a lot Patrick, I do see a class OneVsRest classifier which only
> takes classifier instance of ml package and not mlib package, do you see
> any alternative for using OneVsRest with StreamingLogisticRegressionWithSGD
> ?
>
> Regards
> Sundeep
>
> On Thu, Jan 18, 2018 at 8:18 PM, Patrick McCarthy <pmccar...@dstillery.com
> > wrote:
>
>> As a hack, you could perform a number of 1 vs. all classifiers and then
>> post-hoc select among the highest prediction probability to assign class.
>>
>> On Thu, Jan 18, 2018 at 12:17 AM, Sundeep Kumar Mehta <
>> sunnyjai...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I was looking for Logistic Regression with Multi Class classifier on
>>> Streaming data do we have any alternative options or library/github prj.
>>>
>>> As StreamingLogisticRegressionWithSGD only supports binary
>>> classification
>>>
>>> Regards
>>> Sundeep
>>>
>>
>>
>


Re: StreamingLogisticRegressionWithSGD : Multiclass Classification : Options

2018-01-18 Thread Patrick McCarthy
As a hack, you could perform a number of 1 vs. all classifiers and then
post-hoc select among the highest prediction probability to assign class.

On Thu, Jan 18, 2018 at 12:17 AM, Sundeep Kumar Mehta  wrote:

> Hi,
>
> I was looking for Logistic Regression with Multi Class classifier on
> Streaming data do we have any alternative options or library/github prj.
>
> As StreamingLogisticRegressionWithSGD only supports binary classification
>
> Regards
> Sundeep
>


Re: Spark on EMR suddenly stalling

2017-12-28 Thread Patrick Alwell
Joren,

Anytime there is a shuffle in the network, Spark moves to a new stage. It seems 
like you are having issues either pre or post shuffle. Have you looked at a 
resource management tool like ganglia to determine if this is a memory or 
thread related issue? The spark UI?

You are using groupByKey() have you thought of an alternative like 
aggregateByKey() or combineByKey() to reduce shuffling?
https://umbertogriffo.gitbooks.io/apache-spark-best-practices-and-tuning/content/avoid_groupbykey_when_performing_an_associative_re/avoid-groupbykey-when-performing-a-group-of-multiple-items-by-key.html

Dynamic allocation is great; but sometimes I’ve found explicitly setting the 
num executors, cores per executor, and memory per executor to be a better 
alternative.

Take a look at the yarn logs as well for the particular executor in question. 
Executors can have multiple tasks; and will often fail if they have more tasks 
than available threads.

As for partitioning the data; you could also look into your level of 
parallelism which is correlated to the splitablity (blocks) of data. This will 
be based on your largest RDD.
https://spark.apache.org/docs/latest/tuning.html#level-of-parallelism

Spark is like C/C++ you need to manage the memory buffer or the compiler will 
through you out  ;)
https://spark.apache.org/docs/latest/hardware-provisioning.html

Hang in there, this is the more complicated stage of placing a spark 
application into production. The Yarn logs should point you in the right 
direction.

It’s tough to debug over email, so hopefully this information is helpful.

-Pat


On 12/28/17, 9:57 AM, "Jeroen Miller"  wrote:

On 28 Dec 2017, at 17:41, Richard Qiao  wrote:
> Are you able to specify which path of data filled up?

I can narrow it down to a bunch of files but it's not so straightforward.

> Any logs not rolled over?

I have to manually terminate the cluster but there is nothing more in the 
driver's log when I check it from the AWS console when the cluster is still 
running. 

JM


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org




-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark based Data Warehouse

2017-11-12 Thread Patrick Alwell
Alcon,

You can most certainly do this. I’ve done benchmarking with Spark SQL and the 
TPCDS queries using S3 as the filesystem.

Zeppelin and Livy server work well for the dash boarding and concurrent query 
issues:  https://hortonworks.com/blog/livy-a-rest-interface-for-apache-spark/

Livy Server will allow you to create multiple spark contexts via REST: 
https://livy.incubator.apache.org/

If you are looking for broad SQL functionality I’d recommend instantiating a 
Hive context. And Spark is able to spill to disk --> 
https://spark.apache.org/faq.html

There are multiple companies running spark within their data warehouse 
solutions: 
https://ibmdatawarehousing.wordpress.com/2016/10/12/steinbach_dashdb_local_spark/

Edmunds used Spark to allow business analysts to point Spark to files in S3 and 
infer schema: https://www.youtube.com/watch?v=gsR1ljgZLq0

Recommend running some benchmarks and testing query scenarios for your end 
users; but it sounds like you’ll be using it for exploratory analysis. Spark is 
great for this ☺

-Pat


From: Vadim Semenov 
Date: Sunday, November 12, 2017 at 1:06 PM
To: Gourav Sengupta 
Cc: Phillip Henry , ashish rawat 
, Jörn Franke , Deepak Sharma 
, spark users 
Subject: Re: Spark based Data Warehouse

It's actually quite simple to answer

> 1. Is Spark SQL and UDF, able to handle all the workloads?
Yes

> 2. What user interface did you provide for data scientist, data engineers and 
> analysts
Home-grown platform, EMR, Zeppelin

> What are the challenges in running concurrent queries, by many users, over 
> Spark SQL? Considering Spark still does not provide spill to disk, in many 
> scenarios, are there frequent query failures when executing concurrent queries
You can run separate Spark Contexts, so jobs will be isolated

> Are there any open source implementations, which provide something similar?
Yes, many.


On Sun, Nov 12, 2017 at 1:47 PM, Gourav Sengupta 
> wrote:
Dear Ashish,
what you are asking for involves at least a few weeks of dedicated 
understanding of your used case and then it takes at least 3 to 4 months to 
even propose a solution. You can even build a fantastic data warehouse just 
using C++. The matter depends on lots of conditions. I just think that your 
approach and question needs a lot of modification.

Regards,
Gourav

On Sun, Nov 12, 2017 at 6:19 PM, Phillip Henry 
> wrote:
Hi, Ashish.
You are correct in saying that not *all* functionality of Spark is 
spill-to-disk but I am not sure how this pertains to a "concurrent user 
scenario". Each executor will run in its own JVM and is therefore isolated from 
others. That is, if the JVM of one user dies, this should not effect another 
user who is running their own jobs in their own JVMs. The amount of resources 
used by a user can be controlled by the resource manager.
AFAIK, you configure something like YARN to limit the number of cores and the 
amount of memory in the cluster a certain user or group is allowed to use for 
their job. This is obviously quite a coarse-grained approach as (to my 
knowledge) IO is not throttled. I believe people generally use something like 
Apache Ambari to keep an eye on network and disk usage to mitigate problems in 
a shared cluster.

If the user has badly designed their query, it may very well fail with OOMEs 
but this can happen irrespective of whether one user or many is using the 
cluster at a given moment in time.

Does this help?
Regards,
Phillip

On Sun, Nov 12, 2017 at 5:50 PM, ashish rawat 
> wrote:
Thanks Jorn and Phillip. My question was specifically to anyone who have tried 
creating a system using spark SQL, as Data Warehouse. I was trying to check, if 
someone has tried it and they can help with the kind of workloads which worked 
and the ones, which have problems.

Regarding spill to disk, I might be wrong but not all functionality of spark is 
spill to disk. So it still doesn't provide DB like reliability in execution. In 
case of DBs, queries get slow but they don't fail or go out of memory, 
specifically in concurrent user scenarios.

Regards,
Ashish

On Nov 12, 2017 3:02 PM, "Phillip Henry" 
> wrote:
Agree with Jorn. The answer is: it depends.

In the past, I've worked with data scientists who are happy to use the Spark 
CLI. Again, the answer is "it depends" (in this case, on the skills of your 
customers).
Regarding sharing resources, different teams were limited to their own queue so 
they could not hog all the resources. However, people within a team had to do 
some horse trading if they had a particularly intensive job to run. I did feel 
that this was an 

  1   2   3   4   >