Accelerating Spark SQL / Dataframe using GPUs & Alluxio

2021-04-23 Thread Bin Fan
Hi Spark users,

We have been working on GPU acceleration for Apache Spark SQL / Dataframe
using the RAPIDS Accelerator for Apache Spark
<https://www.nvidia.com/en-us/deep-learning-ai/solutions/data-science/apache-spark-3/>
and the open source project Alluxio <https://github.com/Alluxio/alluxio>,
without any code changes.
Our preliminary results suggest a 2x improvement in performance and a 70%
improvement in ROI compared to a CPU-based cluster.

Feel free to read the developer blog <https://bit.ly/2QkXjxo> for more
details of the benchmark. If you are interested in discussing it further with
the authors, join our free online meetup
<https://go.alluxio.io/community-alluxio-day-2021> next Tuesday morning
(April 27), Pacific time.

Best,

- Bin Fan


Evaluating Apache Spark with Data Orchestration using TPC-DS

2021-04-08 Thread Bin Fan
Dear Spark Users,

I am sharing a whitepaper, “Evaluating Apache Spark and Alluxio for Data
Analytics <https://bit.ly/2Pg2jms>”,
which walks through how to benchmark Spark on Alluxio and how Alluxio can
accelerate TPC-DS benchmark results. Hope this helps. If you have any
questions, feel free to reach out to me.

Best regards

- Bin Fan


Bursting Your On-Premises Data Lake Analytics and AI Workloads on AWS

2021-02-18 Thread Bin Fan
Hi everyone!

I am sharing this article about running Spark / Presto workloads on AWS:
Bursting On-Premises Data Lake Analytics and AI Workloads on AWS
<https://bit.ly/3qA1Tom>, published on the AWS blog. Hope you enjoy it. Feel
free to discuss it with me here <https://alluxio.io/slack>.

- Bin Fan
Powered by Alluxio <https://www.alluxio.io/powered-by-alluxio/> | Alluxio
Slack Channel <https://alluxio.io/slack> | Data Orchestration Summit 2020
<https://www.alluxio.io/data-orchestration-summit-2020/>


Spark in hybrid cloud in AWS & GCP

2020-12-07 Thread Bin Fan
Dear Spark users,

Are you interested in running Spark in a hybrid cloud? Check out talks from
AWS & GCP at the virtual Data Orchestration Summit
<https://www.alluxio.io/data-orchestration-summit-2020/> on Dec. 8-9, 2020;
registration is free <https://www.alluxio.io/data-orchestration-summit-2020/>.

The summit's speaker lineup spans creators and committers of Alluxio,
Spark, Presto, TensorFlow, and Kubernetes, as well as data engineers and
software engineers building cloud-native data and AI platforms at Amazon,
Alibaba, Comcast, Facebook, Google, ING Bank, Microsoft, Tencent, and more!


- Bin Fan


Building High-performance Lake for Spark using OSS, Hudi, Alluxio

2020-11-23 Thread Bin Fan
 Hi Spark Users,

Check out this blog on building a high-performance data lake using Apache
Hudi, Spark, and Alluxio at T3Go <https://bit.ly/373RYPi>.

Cheers

- Bin Fan


Re: Spark dataframe hdfs vs s3

2020-05-29 Thread Bin Fan
Have you tried deploying Alluxio as a caching layer on top of S3? It gives
Spark an HDFS-like interface, as described in this article:
https://www.alluxio.io/blog/accelerate-spark-and-hive-jobs-on-aws-s3-by-10x-with-alluxio-tiered-storage/
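
As a rough sketch of what that setup can look like (the host name, mount
point, and bucket are placeholders; the exact URI scheme and mount options
depend on your Alluxio version, and the Alluxio client jar needs to be on the
Spark classpath):

# On the Alluxio cluster: mount an S3 path into the Alluxio namespace
# (S3 credentials must already be configured for Alluxio).
./bin/alluxio fs mount /s3 s3://my-bucket/warehouse/

# In Spark (PySpark here): read through Alluxio instead of directly from S3,
# so repeated reads are served from Alluxio's cache.
df = spark.read.parquet("alluxio://alluxio-master:19998/s3/events")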


On Wed, May 27, 2020 at 6:52 PM Dark Crusader 
wrote:

> Hi Randy,
>
> Yes, I'm using parquet on both S3 and hdfs.
>
> On Thu, 28 May, 2020, 2:38 am randy clinton, 
> wrote:
>
>> Is the file Parquet on S3 or is it some other file format?
>>
>> In general I would assume that HDFS reads/writes are more performant for
>> Spark jobs.
>>
>> For instance, consider how well partitioned your HDFS file is vs the S3
>> file.
>>
>> On Wed, May 27, 2020 at 1:51 PM Dark Crusader <
>> relinquisheddra...@gmail.com> wrote:
>>
>>> Hi Jörn,
>>>
>>> Thanks for the reply. I will try to create an easier example to reproduce
>>> the issue.
>>>
>>> I will also try your suggestion to look into the UI. Can you guide me on
>>> what I should be looking for?
>>>
>>> I was already using the s3a protocol to compare the times.
>>>
>>> My hunch is that multiple reads from S3 are required because of improper
>>> caching of intermediate data, and maybe HDFS is doing a better job at this.
>>> Does this make sense?
>>>
>>> I would also like to add that we built an extra layer on top of S3, which
>>> might be contributing to the even slower times.
>>>
>>> Thanks for your help.
>>>
>>> On Wed, 27 May, 2020, 11:03 pm Jörn Franke, 
>>> wrote:
>>>
 Have you looked in the Spark UI to see why this is the case?
 Reading from S3 can take more time - it also depends on which S3 URL scheme
 you are using: s3a vs s3n vs s3.

 It could help to persist intermediate results in memory or on HDFS after
 some of the computation. You can also initially load from S3, store the
 data on HDFS, and work from there.

 HDFS offers data locality for the tasks, i.e. the tasks start on the
 nodes where the data is. Depending on which S3 "protocol" you are using,
 you may also pay an additional performance penalty.

 Try s3a as the protocol (replace all s3n with s3a).

 You can also use the s3 URL, but this requires a special bucket
 configuration (a dedicated empty bucket), and it lacks some interoperability
 with other AWS services.

 Nevertheless, it could also be something else in the code. Can you
 post an example reproducing the issue?

 > On 27.05.2020 at 18:18, Dark Crusader <
 relinquisheddra...@gmail.com> wrote:
 >
 > 
 > Hi all,
 >
 > I am reading data from HDFS in the form of parquet files (around 3
 GB) and running an algorithm from the Spark ML library.
 >
 > If I create the same Spark dataframe by reading data from S3, the
 same algorithm takes considerably more time.
 >
 > I don't understand why this is happening. Is this a chance occurrence,
 or are the Spark dataframes created differently?
 >
 > I don't understand how the data store would affect the algorithm
 performance.
 >
 > Any help would be appreciated. Thanks a lot.

>>>
>>
>> --
>> I appreciate your time,
>>
>> ~Randy
>>
>


Re: What is directory "/path/_spark_metadata" for?

2019-11-11 Thread Bin Fan
Hey Mark,

I believe this is the name of the subdirectory that Spark's streaming file
sink uses to store metadata about which files are valid; see the comment in
the code:
https://github.com/apache/spark/blob/v2.3.0/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala#L33
Do you see the exception as a warning or as an error in the Alluxio master
log? It would be helpful to post the stack trace if it is available.
My hypothesis is that Spark in your case was checking for (or trying to
create) such a directory.

-Bin

On Wed, Aug 28, 2019 at 1:59 AM Mark Zhao  wrote:

> Hey,
>
>  When running Spark on Alluxio-1.8.2, I encounter the following exception:
> “alluxio.exception.FileDoesNotExistException: Path
> “/test-data/_spark_metadata” does not exist” in the Alluxio master.log. What
> exactly is the directory "_spark_metadata" used for? And how can I fix this
> problem?
>
> Thanks.
>
> Mark
>


Re: Low cache hit ratio when running Spark on Alluxio

2019-09-19 Thread Bin Fan
Depending on the Alluxio version you are running (e.g., for 2.0), the
metrics for local short-circuit reads are not turned on by default.
So I would suggest first turning on collection of the local short-circuit
read metrics by setting
alluxio.user.metrics.collection.enabled=true
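
For example, a minimal sketch of passing that client property through the
Spark JVM options (the same mechanism described in the Alluxio Spark docs):

$ spark-submit \
    --conf 'spark.driver.extraJavaOptions=-Dalluxio.user.metrics.collection.enabled=true' \
    --conf 'spark.executor.extraJavaOptions=-Dalluxio.user.metrics.collection.enabled=true' \
    ...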

Regarding the more general question of achieving high data locality when
running Spark on Alluxio, can you read this article
https://www.alluxio.io/blog/top-10-tips-for-making-the-spark-alluxio-stack-blazing-fast/
and follow the suggestions there? E.g., locality can be tricky to get right
when running Spark on YARN.

If you need more detailed instructions, feel free to join the Alluxio
community Slack channel https://slackin.alluxio.io <https://www.alluxio.io/slack>

- Bin Fan
alluxio.io <http://bit.ly/2JctWrJ> | powered by <http://bit.ly/2JdD0N2> | Data
Orchestration Summit 2019
<https://www.alluxio.io/data-orchestration-summit-2019/>

On Wed, Aug 28, 2019 at 1:49 AM Jerry Yan  wrote:

> Hi,
>
> We are running Spark jobs on an Alluxio cluster which is serving 13
> gigabytes of data, with 99% of the data in memory. I was hoping to speed
> up the Spark jobs by reading the in-memory data in Alluxio, but found that
> the Alluxio local hit rate is only 1.68%, while the Alluxio remote hit rate
> is 98.32%. By monitoring the network I/O across all worker nodes with the
> "dstat" command, I found that only two nodes had about 1 GB of receive or
> send traffic in the whole process, and that the 1 GB is sent or received
> during the Spark shuffle stage. Are there any metrics I could check or
> configuration I could tune?
>
>
> Best,
>
> Jerry
>


Re: Can I set the Alluxio WriteType in Spark applications?

2019-09-19 Thread Bin Fan
Hi Mark,

You can follow the instructions here:
https://docs.alluxio.io/os/user/stable/en/compute/Spark.html#customize-alluxio-user-properties-for-individual-spark-jobs

Something like this:

$ spark-submit \
    --conf 'spark.driver.extraJavaOptions=-Dalluxio.user.file.writetype.default=CACHE_THROUGH' \
    --conf 'spark.executor.extraJavaOptions=-Dalluxio.user.file.writetype.default=CACHE_THROUGH' \
    ...
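
If you would rather not repeat these flags on every submission, the same JVM
options can also go into Spark's conf/spark-defaults.conf (a sketch;
CACHE_THROUGH is just the example value from above):

spark.driver.extraJavaOptions   -Dalluxio.user.file.writetype.default=CACHE_THROUGH
spark.executor.extraJavaOptions -Dalluxio.user.file.writetype.default=CACHE_THROUGH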


Hope it helps

- Bin

On Tue, Sep 17, 2019 at 7:53 AM Mark Zhao  wrote:

> Hi,
>
> If Spark applications write data into Alluxio, can the WriteType be configured?
>
> Thanks,
> Mark
>
>


Re: How to fix ClosedChannelException

2019-05-16 Thread Bin Fan
Hi

This *java.nio.channels.ClosedChannelException* is often caused by a
connection timeout between your Spark executors and Alluxio workers.
One simple and quick fix is to increase the value of
alluxio.user.network.netty.timeout in your Spark jobs.

Check out the Alluxio documentation on how to run Spark with customized
Alluxio properties.
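
For example, a minimal sketch (the 10-minute value is only an illustration;
pick something appropriate for your network):

$ spark-submit \
    --conf 'spark.driver.extraJavaOptions=-Dalluxio.user.network.netty.timeout=10min' \
    --conf 'spark.executor.extraJavaOptions=-Dalluxio.user.network.netty.timeout=10min' \
    ...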

- Bin


On Thu, May 9, 2019 at 4:39 AM u9g  wrote:

> Hey,
>
> When I run Spark on Alluxio, I encounter the following error. How can I
> fix this? Thanks
>
> Lost task 63.0 in stage 0.0 (TID 63, 172.28.172.165, executor 7):
> java.io.IOException: java.util.concurrent.ExecutionException:
> java.nio.channels.ClosedChannelException
>
> Best,
> Andy Li
>
>
>
>


Re: How to configure alluxio cluster with spark in yarn

2019-05-16 Thread Bin Fan
Hi Andy,

Assuming you are running Spark with YARN, I would recommend deploying
Alluxio in the same YARN cluster if you are looking for the best performance.
Alluxio can also be deployed separately as a standalone service, but in that
case you may need to transfer data from the Alluxio cluster to your
Spark/YARN cluster.

Here is the documentation about deploying Alluxio with YARN.

- Bin

On Thu, May 9, 2019 at 4:19 AM u9g  wrote:

> Hey,
>
> I want to speed up the Spark tasks running in the YARN cluster through
> Alluxio. Is Alluxio recommended to run in the same YARN cluster, in YARN
> mode? Should I deploy Alluxio independently on the nodes of the YARN
> cluster? Or deploy a separate cluster?
> Best,
> Andy Li
>
>
>
>


Re: cache table vs. parquet table performance

2019-04-17 Thread Bin Fan
Hi Tomas,

One option is to cache your table as Parquet files into Alluxio (which can
serve as an in-memory distributed caching layer for Spark in your case).

The code in Spark will look like this:

> df.write.parquet("alluxio://master:19998/data.parquet")
> df = sqlContext.read.parquet("alluxio://master:19998/data.parquet")

(See more details in the documentation:
http://www.alluxio.org/docs/1.8/en/compute/Spark.html )

This would require running Alluxio as a separate service (ideally colocated
with the Spark servers), of course, but it also enables data sharing across
Spark jobs.
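
Since you are querying through the spark-thrift server, the same idea can
also be expressed in SQL: a sketch that reuses the table and predicate from
your example with a hypothetical alluxio:// path (Alluxio client jar on the
Spark classpath assumed):

CREATE TABLE event_jan_01_alluxio
USING parquet
LOCATION 'alluxio://master:19998/tmp/events/jan/02'
AS SELECT * FROM events WHERE day_registered = 20190102;

Subsequent queries against event_jan_01_alluxio then read parquet served from
Alluxio's storage instead of re-reading the original source.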

- Bin




On Tue, Jan 15, 2019 at 10:29 AM Tomas Bartalos 
wrote:

> Hello,
>
> I'm using the spark-thrift server and I'm searching for the best performing
> solution to query a hot set of data. I'm processing records with a nested
> structure, containing subtypes and arrays. One record takes up several KB.
>
> I tried to make some improvement with cache table:
>
> cache table event_jan_01 as select * from events where day_registered =
> 20190102;
>
>
> If I understood correctly, the data should be stored in *in-memory
> columnar* format with storage level MEMORY_AND_DISK. So data which
> doesn't fit in memory will be spilled to disk (I assume also in columnar
> format?).
> I cached 1 day of data (1 M records), and according to the Spark UI storage
> tab none of the data was cached in memory and everything was spilled to
> disk. The size of the data was *5.7 GB.*
> Typical queries took ~20 sec.
>
> Then I tried to store the data to parquet format:
>
> CREATE TABLE event_jan_01_par USING parquet location "/tmp/events/jan/02"
> as
>
> select * from event_jan_01;
>
>
> The whole parquet took up only *178MB.*
> And typical queries took 5-10 sec.
>
> Is it possible to tune Spark to spill the cached data in parquet format?
> Why was the whole cached table spilled to disk with nothing staying in
> memory?
>
> Spark version: 2.4.0
>
> Best regards,
> Tomas
>
>


Re: How shall I configure the Spark executor memory size and the Alluxio worker memory size on a machine?

2019-04-04 Thread Bin Fan
Oops, sorry for the confusion. I meant "20% of the size of your input data
set" allocated to Alluxio as the memory resource, as the starting point.
After that, you can check the cache hit ratio for the Alluxio space based
on the metrics collected in the Alluxio web UI
<http://www.alluxio.org/docs/1.8/en/basic/Web-Interface.html#master-metrics>.
If you see a low hit ratio, increase the Alluxio storage size, and vice versa.

Hope this helps,

- Bin

On Thu, Apr 4, 2019 at 9:29 PM Bin Fan  wrote:

> Hi Andy,
>
> It really depends on your workloads. I would suggest allocating 20% of
> the size of your input data set as the starting point and seeing how it
> works.
>
> Also, depending on your data source serving as the under store of Alluxio,
> if it is remote (e.g., cloud storage like S3 or GCS),
> you can perhaps use Alluxio to manage local disk or SSD storage resources
> rather than memory resources.
> In this case, the "local Alluxio storage" is still much faster compared to
> reading from remote storage.
> Check out the documentation on tiered storage configuration here:
> http://www.alluxio.org/docs/1.8/en/advanced/Alluxio-Storage-Management.html#configuring-alluxio-storage
>
> - Bin
>
> On Thu, Mar 21, 2019 at 8:26 AM u9g  wrote:
>
>> Hey,
>>
>> We have a cluster of 10 nodes, each of which has 128 GB of memory. We are
>> about to run Spark and Alluxio on the cluster. We wonder how we should
>> allocate the memory between the Spark executor and the Alluxio worker on a
>> machine? Are there any recommendations? Thanks!
>>
>> Best,
>> Andy Li
>>
>>
>>
>>
>


Re: How shall I configure the Spark executor memory size and the Alluxio worker memory size on a machine?

2019-04-04 Thread Bin Fan
Hi Andy,

It really depends on your workloads. I would suggest allocating 20% of the
size of your input data set as the starting point and seeing how it works.

Also, depending on your data source serving as the under store of Alluxio,
if it is remote (e.g., cloud storage like S3 or GCS),
you can perhaps use Alluxio to manage local disk or SSD storage resources
rather than memory resources.
In this case, the "local Alluxio storage" is still much faster compared to
reading from remote storage.
Check out the documentation on tiered storage configuration here:
http://www.alluxio.org/docs/1.8/en/advanced/Alluxio-Storage-Management.html#configuring-alluxio-storage
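
As a concrete illustration, a minimal alluxio-site.properties sketch for a
two-tier (memory + SSD) setup, with placeholder paths and sizes (property
names as in the Alluxio 1.8 docs linked above; double-check them against
your version):

alluxio.worker.memory.size=16GB
alluxio.worker.tieredstore.levels=2
alluxio.worker.tieredstore.level0.alias=MEM
alluxio.worker.tieredstore.level0.dirs.path=/mnt/ramdisk
alluxio.worker.tieredstore.level0.dirs.quota=16GB
alluxio.worker.tieredstore.level1.alias=SSD
alluxio.worker.tieredstore.level1.dirs.path=/mnt/ssd
alluxio.worker.tieredstore.level1.dirs.quota=500GB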

- Bin

On Thu, Mar 21, 2019 at 8:26 AM u9g  wrote:

> Hey,
>
> We have a cluster of 10 nodes, each of which has 128 GB of memory. We are
> about to run Spark and Alluxio on the cluster. We wonder how we should
> allocate the memory between the Spark executor and the Alluxio worker on a
> machine? Are there any recommendations? Thanks!
>
> Best,
> Andy Li
>
>
>
>


Re: Questions about caching

2018-12-24 Thread Bin Fan
Hi Andrew,

Since you mentioned the alternative solution with Alluxio, here is a more
comprehensive tutorial on caching Spark dataframes in Alluxio:
https://www.alluxio.com/blog/effective-spark-dataframes-with-alluxio

Namely, caching your dataframe is simply a matter of running
df.write.parquet(alluxioFilePath)
so that your dataframe is stored in Alluxio as parquet files, which you can
then share with more users.
One advantage of Alluxio here is that you can manually free the cached data
from the memory tier, or set a TTL on the cached data, if you'd like more
control over the data.
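
Since your analysis is in PySpark, a minimal sketch of the pattern (the path
and master address are placeholders, and the Alluxio client jar must be on
the Spark classpath):

# Write the dataframe to Alluxio as parquet; the files live in Alluxio's
# storage (memory tier by default) and can be shared across sessions/users.
df.write.parquet("alluxio://alluxio-master:19998/shared/candidates.parquet")

# Any other Spark job can read the same data back without recomputing it.
shared_df = spark.read.parquet("alluxio://alluxio-master:19998/shared/candidates.parquet")

# To drop it from the Alluxio memory tier later (run via the Alluxio CLI):
#   ./bin/alluxio fs free /shared/candidates.parquet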


- Bin

On Tue, Dec 11, 2018 at 9:13 AM Andrew Melo  wrote:

> Greetings, Spark Aficionados-
>
> I'm working on a project to (ab-)use PySpark to do particle physics
> analysis, which involves iterating with a lot of transformations (to
> apply weights and select candidate events) and reductions (to produce
> histograms of relevant physics objects). We have a basic version
> working, but I'm looking to exploit some of Spark's caching behavior
> to speed up the interactive computation portion of the analysis,
> probably by writing a thin convenience wrapper. I have a couple of
> questions I've been unable to find definitive answers to, which would
> help me design this wrapper in an efficient way:
>
> 1) When cache()-ing a dataframe where only a subset of the columns are
> used, is the entire dataframe placed into the cache, or only the used
> columns? E.g., does "df2" end up caching only "a", or all three
> columns?
>
> df1 = sc.read.load('test.parquet') # Has columns a, b, c
> df2 = df1.cache()
> df2.select('a').collect()
>
> 2) Are caches reference-based, or is there some sort of de-duplication
> based on the logical/physical plans. So, for instance, does spark take
> advantage of the fact that these two dataframes should have the same
> content:
>
> df1 = sc.read.load('test.parquet').cache()
> df2 = sc.read.load('test.parquet').cache()
>
> ...or are df1 and df2 totally independent WRT caching behavior?
>
> 2a) If the cache is reference-based, is it sufficient to hold a
> weakref to the python object to keep the cache in-scope?
>
> 3) Finally, the spark.externalBlockStore.blockManager is intriguing in
> our environment where we have multiple users concurrently analyzing
> mostly the same input datasets. We have enough RAM in our clusters to
> cache a high percentage of the very common datasets, but only if users
> could somehow share their caches (which, conveniently, are the larger
> datasets). We also have very large edge SSD cache servers, used to cache
> trans-oceanic I/O, that we could throw at this as well.
>
> It looks, however, like that API was removed in 2.0.0 and there wasn't
> a replacement. There are products like Alluxio, but they aren't
> transparent, requiring the user to manually cache their dataframes by
> doing save/loads to external files using "alluxio://" URIs. Is there
> no way around this behavior now?
>
> Sorry for the long email, and thanks!
> Andrew
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: off heap to alluxio/tachyon in Spark 2

2016-09-19 Thread Bin Fan
Hi,

If you are looking for how to run Spark on Alluxio (formerly Tachyon),
here is the documentation from the Alluxio doc site:
http://www.alluxio.org/docs/master/en/Running-Spark-on-Alluxio.html
It still works for Spark 2.x.

The Alluxio team has also published articles on when and why running Spark
(2.x) with Alluxio may benefit performance:
http://www.alluxio.com/2016/08/effective-spark-rdds-with-alluxio/
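
At a high level, the Spark 2.x pattern is to exchange data through an
alluxio:// path instead of relying on an OFF_HEAP storage level; a minimal
sketch (host/port and path are placeholders):

# Instead of rdd.persist(StorageLevel.OFF_HEAP) as with Tachyon in Spark 1.x,
# save the data to Alluxio and read it back where needed.
rdd.saveAsTextFile("alluxio://alluxio-master:19998/tmp/intermediate")
rdd2 = sc.textFile("alluxio://alluxio-master:19998/tmp/intermediate")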

- Bin


On Mon, Sep 19, 2016 at 7:56 AM, aka.fe2s  wrote:

> Hi folks,
>
> What has happened with Tachyon / Alluxio in Spark 2? The docs no longer
> mention it.
>
> --
> Oleksiy Dyagilev
>


Re: Question About OFF_HEAP Caching

2016-07-18 Thread Bin Fan
Here is one blog illustrating how to use Spark on Alluxio for this purpose.
Hope it will help:

http://www.alluxio.com/2016/04/getting-started-with-alluxio-and-spark/

On Mon, Jul 18, 2016 at 6:36 AM, Gene Pang  wrote:

> Hi,
>
> If you want to use Alluxio with Spark 2.x, it is recommended to write to
> and read from Alluxio with files. You can save an RDD with saveAsObjectFile
> with an Alluxio path (alluxio://host:port/path/to/file), and you can read
> that file from any other Spark job. Here is additional information on how
> to run Spark with Alluxio:
> http://www.alluxio.org/docs/master/en/Running-Spark-on-Alluxio.html
>
> Hope that helps,
> Gene
>
> On Mon, Jul 18, 2016 at 12:11 AM, condor join 
> wrote:
>
>> Hi All,
>>
>> I have some questions about OFF_HEAP caching. In Spark 1.x, when we use
>> *rdd.persist(StorageLevel.OFF_HEAP)*, that means the RDD is cached in
>> Tachyon (Alluxio). However, in Spark 2.x, we can directly use OFF_HEAP for
>> caching
>> (
>> https://issues.apache.org/jira/browse/SPARK-13992?jql=project%20%3D%20SPARK%20AND%20text%20~%20%22off-heap%20caching%22).
>> I am confused about this and have the following questions:
>>
>> 1. In Spark 2.x, how should we use Tachyon for caching?
>>
>> 2. Is there any reason it must change in this way (I mean using OFF_HEAP
>> directly instead of using Tachyon)?
>>
>> Thanks a lot!
>>
>>
>>
>>
>


Re: Possible to broadcast a function?

2016-06-29 Thread Bin Fan
Following this suggestion, Aaron, you may want to take a look at Alluxio as
an off-heap, in-memory data store serving as the input/output for Spark
jobs, if that works for you.

See this introduction on how to run Spark with Alluxio as the data input /
output:

http://www.alluxio.org/documentation/en/Running-Spark-on-Alluxio.html
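
For your use case, a rough sketch of what that could look like (paths and
host are placeholders): load the ~250 GiB reference data into Alluxio once,
then have every job read the shared copy instead of rebuilding a static
structure inside each executor's JVM:

# One-time load of the external reference data into Alluxio.
ref = spark.read.parquet("s3a://my-bucket/reference-data/")
ref.write.parquet("alluxio://alluxio-master:19998/reference-data/")

# Later jobs (or other users) read the shared, Alluxio-cached copy on demand
# instead of each executor re-fetching and materializing it on its heap.
ref = spark.read.parquet("alluxio://alluxio-master:19998/reference-data/")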

- Bin

On Wed, Jun 29, 2016 at 8:40 AM, Sonal Goyal  wrote:

> Have you looked at Alluxio? (formerly Tachyon)
>
> Best Regards,
> Sonal
> Founder, Nube Technologies 
> Reifier at Strata Hadoop World
> 
> Reifier at Spark Summit 2015
> 
>
> 
>
>
>
> On Wed, Jun 29, 2016 at 7:30 PM, Aaron Perrin 
> wrote:
>
>> The user guide describes a broadcast as a way to move a large dataset to
>> each node:
>>
>> "Broadcast variables allow the programmer to keep a read-only variable
>> cached on each machine rather than shipping a copy of it with tasks. They
>> can be used, for example, to give every node a copy of a large input
>> dataset in an efficient manner."
>>
>> And the broadcast example shows it being used with a variable.
>>
>> But, is it somehow possible to instead broadcast a function that can be
>> executed once, per node?
>>
>> My use case is the following:
>>
>> I have a large data structure that I currently create on each executor.
>> The way that I create it is a hack.  That is, when the RDD function is
>> executed on the executor, I block, load a bunch of data (~250 GiB) from an
>> external data source, create the data structure as a static object in the
>> JVM, and then resume execution.  This works, but it ends up costing me a
>> lot of extra memory (i.e. a few TiB when I have a lot of executors).
>>
>> What I'd like to do is use the broadcast mechanism to load the data
>> structure once, per node.  But, I can't serialize the data structure from
>> the driver.
>>
>> Any ideas?
>>
>> Thanks!
>>
>> Aaron
>>
>>
>