Re: Difference between Checkpointing and Persist

2019-04-19 Thread Gene Pang
Hi Subash,

I'm not sure how the checkpointing works, but with
StorageLevel.MEMORY_AND_DISK, Spark will store the RDD in on-heap memory,
and spill to disk if necessary. However, the data is only usable by that
Spark job. Saving the RDD will write the data out to an external storage
system, like HDFS or Alluxio.

There are some advantages of saving the RDD, mainly allowing different jobs
or even different frameworks to read that data. One possibility is to save
the RDD to Alluxio, which can store the data in-memory, improving the
throughput by avoiding the disk. Here is an article discussing different
ways to store RDDs.
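
To make the difference concrete, here is a minimal Scala sketch of the three
options (the HDFS/Alluxio hosts and paths are placeholders, not real
endpoints):

import org.apache.spark.storage.StorageLevel

val rdd = sc.textFile("hdfs://namenode:8020/input/huge-data")

// 1. Checkpoint: truncates the lineage and writes to the checkpoint directory.
sc.setCheckpointDir("hdfs://namenode:8020/checkpoints")
rdd.checkpoint()

// 2. Persist: keeps partitions in executor memory, spilling to local disk,
//    but the cached data is only visible to this application.
rdd.persist(StorageLevel.MEMORY_AND_DISK)

// 3. Save: writes to external storage, readable by other jobs or frameworks.
rdd.saveAsObjectFile("alluxio://alluxio-master:19998/intermediate/huge-data")
val reloaded = sc.objectFile[String]("alluxio://alluxio-master:19998/intermediate/huge-data")

Which one is faster depends on the job; persisting avoids the write to
external storage, while saving buys you reuse outside the job.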

Thanks,
Gene

On Thu, Apr 18, 2019 at 10:49 AM Subash Prabakar 
wrote:

> Hi All,
>
> I have a doubt about checkpointing and persist/saving.
>
> Say we have one RDD - containing huge data,
> 1. We checkpoint and perform join
> 2. We persist as StorageLevel.MEMORY_AND_DISK and perform join
> 3. We save that intermediate RDD and perform join (using same RDD - saving
> is to just persist intermediate result before joining)
>
>
> Which of the above is faster and what's the difference?
>
>
> Thanks,
> Subash
>


Re: Spark LOCAL mode and external jar (extraClassPath)

2018-04-14 Thread Gene Pang
Yes, I think that is the case. I haven't tried that before, but it should
work.

Thanks,
Gene

On Fri, Apr 13, 2018 at 11:32 AM, Jason Boorn <jbo...@gmail.com> wrote:

> Hi Gene -
>
> Are you saying that I just need to figure out how to get the Alluxio jar
> into the classpath of my parent application?  If it shows up in the
> classpath then Spark will automatically know that it needs to use it when
> communicating with Alluxio?
>
> Apologies for going back-and-forth on this - I feel like my particular use
> case is clouding what is already a tricky issue.
>
> On Apr 13, 2018, at 2:26 PM, Gene Pang <gene.p...@gmail.com> wrote:
>
> Hi Jason,
>
> Alluxio does work with Spark in master=local mode. This is because both
> spark-submit and spark-shell have command-line options to set the classpath
> for the JVM that is being started.
>
> If you are not using spark-submit or spark-shell, you will have to figure
> out how to configure that JVM instance with the proper properties.
>
> Thanks,
> Gene
>
> On Fri, Apr 13, 2018 at 10:47 AM, Jason Boorn <jbo...@gmail.com> wrote:
>
>> Ok thanks - I was basing my design on this:
>>
>> https://databricks.com/blog/2016/08/15/how-to-use-sparksession-in-apache-spark-2-0.html
>>
>> Wherein it says:
>> Once the SparkSession is instantiated, you can configure Spark’s runtime
>> config properties.
>> Apparently the suite of runtime configs you can change does not include
>> classpath.
>>
>> So the answer to my original question is basically this:
>>
>> When using local (pseudo-cluster) mode, there is no way to add external
>> jars to the spark instance.  This means that Alluxio will not work with
>> Spark when Spark is run in master=local mode.
>>
>> Thanks again - often getting a definitive “no” is almost as good as a
>> yes.  Almost ;)
>>
>> On Apr 13, 2018, at 1:21 PM, Marcelo Vanzin <van...@cloudera.com> wrote:
>>
>> There are two things you're doing wrong here:
>>
>> On Thu, Apr 12, 2018 at 6:32 PM, jb44 <jbo...@gmail.com> wrote:
>>
>> Then I can add the alluxio client library like so:
>> sparkSession.conf.set("spark.driver.extraClassPath",
>> ALLUXIO_SPARK_CLIENT)
>>
>>
>> First one, you can't modify JVM configuration after it has already
>> started. So this line does nothing since it can't re-launch your
>> application with a new JVM.
>>
>> sparkSession.conf.set("spark.executor.extraClassPath",
>> ALLUXIO_SPARK_CLIENT)
>>
>>
>> There is a lot of configuration that you cannot set after the
>> application has already started. For example, after the session is
>> created, most probably this option will be ignored, since executors
>> will already have started.
>>
>> I'm not so sure about what happens when you use dynamic allocation,
>> but these post-hoc config changes in general are not expected to take
>> effect.
>>
>> The documentation could be clearer about this (especially stuff that
>> only applies to spark-submit), but that's the gist of it.
>>
>>
>> --
>> Marcelo
>>
>>
>>
>
>


Re: Spark LOCAL mode and external jar (extraClassPath)

2018-04-13 Thread Gene Pang
Hi Jason,

Alluxio does work with Spark in master=local mode. This is because both
spark-submit and spark-shell have command-line options to set the classpath
for the JVM that is being started.

If you are not using spark-submit or spark-shell, you will have to figure
out how to configure that JVM instance with the proper properties.
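
For the spark-shell / spark-submit case that means something like
spark-shell --driver-class-path /path/to/alluxio-client.jar --jars /path/to/alluxio-client.jar
(the jar path is a placeholder); for an embedded application, the equivalent
is putting the same jar on the parent JVM's classpath when it is launched
(e.g. via java -cp). Once the client is on the classpath, the alluxio://
scheme should resolve from the code, for example:

// Assumes the Alluxio client jar was on the classpath when the JVM started
// (e.g. via the options above); the master address and path are placeholders.
val df = spark.read.parquet("alluxio://alluxio-master:19998/data/my_table")
df.count()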

Thanks,
Gene

On Fri, Apr 13, 2018 at 10:47 AM, Jason Boorn  wrote:

> Ok thanks - I was basing my design on this:
>
> https://databricks.com/blog/2016/08/15/how-to-use-sparksession-in-apache-spark-2-0.html
>
> Wherein it says:
> Once the SparkSession is instantiated, you can configure Spark’s runtime
> config properties.
> Apparently the suite of runtime configs you can change does not include
> classpath.
>
> So the answer to my original question is basically this:
>
> When using local (pseudo-cluster) mode, there is no way to add external
> jars to the spark instance.  This means that Alluxio will not work with
> Spark when Spark is run in master=local mode.
>
> Thanks again - often getting a definitive “no” is almost as good as a
> yes.  Almost ;)
>
> On Apr 13, 2018, at 1:21 PM, Marcelo Vanzin  wrote:
>
> There are two things you're doing wrong here:
>
> On Thu, Apr 12, 2018 at 6:32 PM, jb44  wrote:
>
> Then I can add the alluxio client library like so:
> sparkSession.conf.set("spark.driver.extraClassPath", ALLUXIO_SPARK_CLIENT)
>
>
> First one, you can't modify JVM configuration after it has already
> started. So this line does nothing since it can't re-launch your
> application with a new JVM.
>
> sparkSession.conf.set("spark.executor.extraClassPath",
> ALLUXIO_SPARK_CLIENT)
>
>
> There is a lot of configuration that you cannot set after the
> application has already started. For example, after the session is
> created, most probably this option will be ignored, since executors
> will already have started.
>
> I'm not so sure about what happens when you use dynamic allocation,
> but these post-hoc config changes in general are not expected to take
> effect.
>
> The documentation could be clearer about this (especially stuff that
> only applies to spark-submit), but that's the gist of it.
>
>
> --
> Marcelo
>
>
>


Re: share datasets across multiple spark-streaming applications for lookup

2017-10-31 Thread Gene Pang
Hi,

Alluxio enables sharing dataframes across different applications. This blog
post talks about dataframes and Alluxio, and this Spark Summit presentation
has additional information.
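
At a high level the pattern is just "write once, read from any context"; a
rough sketch (the Alluxio master address, paths, and the `incoming` dataframe
are placeholders):

// In the application that produces the shared dataset:
ds1.write.mode("overwrite").parquet("alluxio://alluxio-master:19998/shared/ds1")

// In any other application (separate SparkContext/SparkSession):
val shared = spark.read.parquet("alluxio://alluxio-master:19998/shared/ds1")
val enriched = incoming.join(shared, Seq("lookup_key"))  // `incoming` = whatever that app is processing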

Thanks,
Gene

On Tue, Oct 31, 2017 at 6:04 PM, Revin Chalil  wrote:

> Any info on the below will be really appreciated.
>
>
>
> I read about Alluxio and Ignite. Has anybody used any of them? Do they
> work well with multiple Apps doing lookups simultaneously? Are there better
> options? Thank you.
>
>
>
> *From: *roshan joe 
> *Date: *Monday, October 30, 2017 at 7:53 PM
> *To: *"user@spark.apache.org" 
> *Subject: *share datasets across multiple spark-streaming applications
> for lookup
>
>
>
> Hi,
>
>
>
> What is the recommended way to share datasets across multiple
> spark-streaming applications, so that the incoming data can be looked up
> against this shared dataset?
>
>
>
> The shared dataset is also incrementally refreshed and stored on S3. Below
> is the scenario.
>
>
>
> Streaming App-1 consumes data from Source-1 and writes to DS-1 in S3.
>
> Streaming App-2 consumes data from Source-2 and writes to DS-2 in S3.
>
>
>
>
> Streaming App-3 consumes data from Source-3, *needs to lookup against
> DS-1 and DS-2* and write to DS-3 in S3.
>
> Streaming App-4 consumes data from Source-4, *needs to lookup against
> DS-1 and DS-2 *and write to DS-3 in S3.
>
> Streaming App-n consumes data from Source-n, *needs to lookup against
> DS-1 and DS-2 *and write to DS-n in S3.
>
>
>
> So DS-1 and DS-2 ideally should be shared for lookup across multiple
> streaming apps. Any input is appreciated. Thank you!
>


Re: "Sharing" dataframes...

2017-06-21 Thread Gene Pang
Hi Jean,

As others have mentioned, you can use Alluxio with Spark dataframes to keep
the data in memory, and for other jobs to read them from memory again.

Hope this helps,
Gene

On Wed, Jun 21, 2017 at 8:08 AM, Jean Georges Perrin  wrote:

> I have looked at Livy in the (very recent past) past and it will not do
> the trick for me. It seems pretty greedy in terms of resources (or at least
> that was our experience). I will investigate how job-server could do the
> trick.
>
> (on a side note I tried to find a paper on memory lifecycle within Spark
> but was not very successful, maybe someone has a link to spare.)
>
> My need is to keep one/several dataframes in memory (well, within Spark)
> so it/they can be reused at a later time, without persisting it/them to
> disk (unless Spark wants to, of course).
>
>
>
> On Jun 21, 2017, at 10:47 AM, Michael Mior  wrote:
>
> This is a puzzling suggestion to me. It's unclear what features the OP
> needs, so it's really hard to say whether Livy or job-server aren't
> sufficient. It's true that neither are particularly mature, but they're
> much more mature than a homemade project which hasn't started yet.
>
> That said, I'm not very familiar with either project, so perhaps there are
> some big concerns I'm not aware of.
>
> --
> Michael Mior
> mm...@apache.org
>
> 2017-06-21 3:19 GMT-04:00 Rick Moritz :
>
>> Keeping it inside the same program/SparkContext is the most performant
>> solution, since you can avoid serialization and deserialization.
>> In-Memory-Persistance between jobs involves a memcopy, uses a lot of RAM
>> and invokes serialization and deserialization. Technologies that can help
>> you do that easily are Ignite (as mentioned) but also Alluxio, Cassandra
>> with in-memory tables and a memory-backed HDFS-directory (see tiered
>> storage).
>> Although livy and job-server provide the functionality of providing a
>> single SparkContext to multiple programs, I would recommend you build your
>> own framework for integrating different jobs, since many features you may
>> need aren't present yet, while others may cause issues due to lack of
>> maturity. Artificially splitting jobs is in general a bad idea, since it
>> breaks the DAG and thus prevents some potential push-down optimizations.
>>
>> On Tue, Jun 20, 2017 at 10:17 PM, Jean Georges Perrin 
>> wrote:
>>
>>> Thanks Vadim & Jörn... I will look into those.
>>>
>>> jg
>>>
>>> On Jun 20, 2017, at 2:12 PM, Vadim Semenov 
>>> wrote:
>>>
>>> You can launch one permanent spark context and then execute your jobs
>>> within the context. And since they'll be running in the same context, they
>>> can share data easily.
>>>
>>> These two projects provide the functionality that you need:
>>> https://github.com/spark-jobserver/spark-jobserver#persistent-context-mode---faster--required-for-related-jobs
>>> https://github.com/cloudera/livy#post-sessions
>>>
>>> On Tue, Jun 20, 2017 at 1:46 PM, Jean Georges Perrin 
>>> wrote:
>>>
 Hey,

 Here is my need: program A does something on a set of data and produces
 results, program B does that on another set, and finally, program C
 combines the data of A and B. Of course, the easy way is to dump all on
 disk after A and B are done, but I wanted to avoid this.

 I was thinking of creating a temp view, but I do not really like the
 temp aspect of it ;). Any idea (they are all worth sharing)

 jg



 -
 To unsubscribe e-mail: user-unsubscr...@spark.apache.org


>>>
>>>
>>
>
>


Re: An Architecture question on the use of virtualised clusters

2017-06-02 Thread Gene Pang
As Vincent mentioned earlier, I think Alluxio can work for this. You can mount
your (potentially remote) storage systems to Alluxio, and deploy Alluxio
co-located to the compute cluster. The computation framework will still
achieve data locality since Alluxio workers are co-located, even though the
existing storage systems may be remote. You can also use tiered storage to
deploy using only memory and/or other physical media.

Here are some blogs (Alluxio with Minio, Alluxio with HDFS, Alluxio with S3)
which use a similar architecture.

Hope that helps,
Gene

On Thu, Jun 1, 2017 at 1:45 AM, Mich Talebzadeh 
wrote:

> As a matter of interest what is the best way of creating virtualised
> clusters all pointing to the same physical data?
>
> thanks
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 1 June 2017 at 09:27, vincent gromakowski <
> vincent.gromakow...@gmail.com> wrote:
>
>> If mandatory, you can use a local cache like alluxio
>>
>> On 1 June 2017 at 10:23 AM, "Mich Talebzadeh" wrote:
>>
>>> Thanks Vincent. I assume by physical data locality you mean you are
>>> going through Isilon and HCFS and not through direct HDFS.
>>>
>>> Also I agree with you that shared network could be an issue as well.
>>> However, it allows you to reduce data redundancy (you do not need R3 in
>>> HDFS anymore) and also you can build virtual clusters on the same data. One
>>> cluster for read/writes and another for Reads? That is what has been
>>> suggested!
>>>
>>> regards
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn * 
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> *
>>>
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>> On 1 June 2017 at 08:55, vincent gromakowski <
>>> vincent.gromakow...@gmail.com> wrote:
>>>
 I don't recommend this kind of design because you lose physical data
 locality and you will be affected by "bad neighbors" that are also using
 the network storage... We have one similar design but restricted to small
 clusters (more for experiments than production)

 2017-06-01 9:47 GMT+02:00 Mich Talebzadeh :

> Thanks Jorn,
>
> This was a proposal made by someone as the firm is already using this
> tool on other SAN based storage and extend it to Big Data
>
> On paper it seems like a good idea; in practice it may be a Wandisco
> scenario again. Of course, as ever, one needs to ask EMC for reference calls
> and whether anyone is using this product in anger.
>
>
>
> At the end of the day it's not HDFS.  It is OneFS with a HCFS API.
>  However that may suit our needs.  But  would need to PoC it and test it
> thoroughly!
>
>
> Cheers
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for
> any loss, damage or destruction of data or any other property which may
> arise from relying on this email's technical content is explicitly
> disclaimed. The author will in no case be liable for any monetary damages
> arising from such loss, damage or destruction.
>
>
>
> On 1 June 2017 at 08:21, Jörn 

Re: Are tachyon and akka removed from 2.1.1 please

2017-05-22 Thread Gene Pang
Hi,

Tachyon has been renamed to Alluxio. Here is the documentation for running
Alluxio with Spark.

Hope this helps,
Gene

On Sun, May 21, 2017 at 6:15 PM, 萝卜丝炒饭 <1427357...@qq.com> wrote:

> HI all,
> Iread some paper about source code, the paper base on version 1.2.  they
> refer the tachyon and akka.  When i read the 2.1code. I can not find the
> code abiut akka and tachyon.
>
> Are tachyon and akka removed from 2.1.1  please
>


Re: Spark <--> S3 flakiness

2017-05-12 Thread Gene Pang
Hi,

Yes, you can use Alluxio with Spark to read/write to S3. Here is a blog post
on Spark + Alluxio + S3, and here is some documentation for configuring
Alluxio + S3 and for configuring Spark + Alluxio.
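
For reference, the usual pattern is to mount the S3 bucket as Alluxio's under
storage and then point Spark at alluxio:// paths instead of s3a:// paths. A
minimal, untested sketch (host, port, and paths are placeholders):

// Read and write through Alluxio rather than hitting S3 directly.
// Assumes the S3 bucket is already mounted as Alluxio's under storage and
// the Alluxio client jar is on the Spark classpath.
val df = spark.read.parquet("alluxio://alluxio-master:19998/ingest/day=2017-05-11")
df.write.mode("overwrite").parquet("alluxio://alluxio-master:19998/output/run1")

// To have writes persisted to S3 as well (not just Alluxio memory), the
// client write type can be set to CACHE_THROUGH, e.g. via
// alluxio.user.file.writetype.default in alluxio-site.properties
// (property name per the Alluxio 1.x docs -- verify for your version).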

You mentioned that it required a lot of effort to get working. May I ask
what you ran into, and how you got it to work?

Thanks,
Gene

On Thu, May 11, 2017 at 11:55 AM, Miguel Morales 
wrote:

> Might want to try to use gzip as opposed to parquet.  The only way i
> ever reliably got parquet to work on S3 is by using Alluxio as a
> buffer, but it's a decent amount of work.
>
> On Thu, May 11, 2017 at 11:50 AM, lucas.g...@gmail.com
>  wrote:
> > Also, and this is unrelated to the actual question... Why don't these
> > messages show up in the archive?
> >
> > http://apache-spark-user-list.1001560.n3.nabble.com/
> >
> > Ideally I'd want to post a link to our internal wiki for these questions,
> > but can't find them in the archive.
> >
> > On 11 May 2017 at 07:16, lucas.g...@gmail.com 
> wrote:
> >>
> >> Looks like this isn't viable in spark 2.0.0 (and greater I presume).
> I'm
> >> pretty sure I came across this blog and ignored it due to that.
> >>
> >> Any other thoughts?  The linked tickets in:
> >> https://issues.apache.org/jira/browse/SPARK-10063
> >> https://issues.apache.org/jira/browse/HADOOP-13786
> >> https://issues.apache.org/jira/browse/HADOOP-9565 look relevant too.
> >>
> >> On 10 May 2017 at 22:24, Miguel Morales 
> wrote:
> >>>
> >>> Try using the DirectParquetOutputCommiter:
> >>> http://dev.sortable.com/spark-directparquetoutputcommitter/
> >>>
> >>> On Wed, May 10, 2017 at 10:07 PM, lucas.g...@gmail.com
> >>>  wrote:
> >>> > Hi users, we have a bunch of pyspark jobs that are using S3 for
> loading
> >>> > /
> >>> > intermediate steps and final output of parquet files.
> >>> >
> >>> > We're running into the following issues on a semi regular basis:
> >>> > * These are intermittent errors, IE we have about 300 jobs that run
> >>> > nightly... And a fairly random but small-ish percentage of them fail
> >>> > with
> >>> > the following classes of errors.
> >>> >
> >>> > S3 write errors
> >>> >
> >>> >> "ERROR Utils: Aborting task
> >>> >> com.amazonaws.services.s3.model.AmazonS3Exception: Status Code:
> 404,
> >>> >> AWS
> >>> >> Service: Amazon S3, AWS Request ID: 2D3RP, AWS Error Code: null, AWS
> >>> >> Error
> >>> >> Message: Not Found, S3 Extended Request ID: BlaBlahEtc="
> >>> >
> >>> >
> >>> >>
> >>> >> "Py4JJavaError: An error occurred while calling o43.parquet.
> >>> >> : com.amazonaws.services.s3.model.MultiObjectDeleteException:
> Status
> >>> >> Code:
> >>> >> 0, AWS Service: null, AWS Request ID: null, AWS Error Code: null,
> AWS
> >>> >> Error
> >>> >> Message: One or more objects could not be deleted, S3 Extended
> Request
> >>> >> ID:
> >>> >> null"
> >>> >
> >>> >
> >>> >
> >>> > S3 Read Errors:
> >>> >
> >>> >> [Stage 1:=>
>  (27
> >>> >> + 4)
> >>> >> / 31]17/05/10 16:25:23 ERROR Executor: Exception in task 10.0 in
> stage
> >>> >> 1.0
> >>> >> (TID 11)
> >>> >> java.net.SocketException: Connection reset
> >>> >> at java.net.SocketInputStream.read(SocketInputStream.java:196)
> >>> >> at java.net.SocketInputStream.read(SocketInputStream.java:122)
> >>> >> at sun.security.ssl.InputRecord.readFully(InputRecord.java:442)
> >>> >> at sun.security.ssl.InputRecord.readV3Record(InputRecord.java:554)
> >>> >> at sun.security.ssl.InputRecord.read(InputRecord.java:509)
> >>> >> at sun.security.ssl.SSLSocketImpl.readRecord(
> SSLSocketImpl.java:927)
> >>> >> at
> >>> >> sun.security.ssl.SSLSocketImpl.readDataRecord(
> SSLSocketImpl.java:884)
> >>> >> at sun.security.ssl.AppInputStream.read(AppInputStream.java:102)
> >>> >> at
> >>> >>
> >>> >> org.apache.http.impl.io.AbstractSessionInputBuffer.read(
> AbstractSessionInputBuffer.java:198)
> >>> >> at
> >>> >>
> >>> >> org.apache.http.impl.io.ContentLengthInputStream.read(
> ContentLengthInputStream.java:178)
> >>> >> at
> >>> >>
> >>> >> org.apache.http.impl.io.ContentLengthInputStream.read(
> ContentLengthInputStream.java:200)
> >>> >> at
> >>> >>
> >>> >> org.apache.http.impl.io.ContentLengthInputStream.close(
> ContentLengthInputStream.java:103)
> >>> >> at
> >>> >>
> >>> >> org.apache.http.conn.BasicManagedEntity.streamClosed(
> BasicManagedEntity.java:168)
> >>> >> at
> >>> >>
> >>> >> org.apache.http.conn.EofSensorInputStream.checkClose(
> EofSensorInputStream.java:228)
> >>> >> at
> >>> >>
> >>> >> org.apache.http.conn.EofSensorInputStream.close(
> EofSensorInputStream.java:174)
> >>> >> at java.io.FilterInputStream.close(FilterInputStream.java:181)
> 

Re: [Spark Streaming] Dynamic Broadcast Variable Update

2017-05-04 Thread Gene Pang
As Tim pointed out, Alluxio (renamed from Tachyon) may be able to help you.
Here is some documentation on how to run Alluxio and Spark together, and here
is a blog post on a Spark streaming + Alluxio use case.
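
One hedged sketch of what that can look like without patching Spark: keep the
model as a file on Alluxio and re-read it at the start of each micro-batch
instead of re-broadcasting. Here `dstream`, `loadModel`, the model's `score`
method, and the path are all hypothetical placeholders:

val modelPath = "alluxio://alluxio-master:19998/models/latest"

val scored = dstream.transform { rdd =>
  // transform's closure runs on the driver once per micro-batch, so each
  // batch picks up whatever model file is currently at modelPath.
  val model = loadModel(rdd.sparkContext, modelPath)   // hypothetical helper
  val modelBc = rdd.sparkContext.broadcast(model)      // fresh broadcast per batch
  rdd.map(record => (record, modelBc.value.score(record)))
}

// In production you would also unpersist the previous batch's broadcast.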

Hope that helps,
Gene

On Tue, May 2, 2017 at 11:56 AM, Nipun Arora 
wrote:

> Hi All,
>
> To support our Spark Streaming based anomaly detection tool, we have made
> a patch in Spark 1.6.2 to dynamically update broadcast variables.
>
> I'll first explain our use-case, which I believe should be common to
> several people using Spark Streaming applications. Broadcast variables are
> often used to store values "machine learning models", which can then be
> used on streaming data to "test" and get the desired results (for our case
> anomalies). Unfortunately, in the current spark, broadcast variables are
> final and can only be initialized once before the initialization of the
> streaming context. Hence, if a new model is learned the streaming system
> cannot be updated without shutting down the application, broadcasting
> again, and restarting the application. Our goal was to re-broadcast
> variables without requiring a downtime of the streaming service.
>
> The key to this implementation is a live re-broadcastVariable() interface,
> which can be triggered in between micro-batch executions, without any
> re-boot required for the streaming application. At a high level the task is
> done by re-fetching broadcast variable information from the spark driver,
> and then re-distribute it to the workers. The micro-batch execution is
> blocked while the update is made, by taking a lock on the execution. We
> have already tested this in our prototype deployment of our anomaly
> detection service and can successfully re-broadcast the broadcast variables
> with no downtime.
>
> We would like to integrate these changes in spark, can anyone please let
> me know the process of submitting patches/ new features to spark. Also. I
> understand that the current version of Spark is 2.1. However, our changes
> have been done and tested on Spark 1.6.2, will this be a problem?
>
> Thanks
> Nipun
>


Re: Spark SQL - Global Temporary View is not behaving as expected

2017-04-24 Thread Gene Pang
As Vincent mentioned, Alluxio helps with sharing data across different
Spark contexts. This blog post about Spark dataframes and Alluxio discusses
that use case.
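
A rough sketch of that workaround for the two spark-shell instances described
below (the Alluxio address and path are placeholders): persist the dataframe
to Alluxio from one shell and re-register it in the other, since temp/global
views only live inside a single Spark context.

// Instance 1 of spark-shell
spark.sql("select 1 as col1")
  .write.mode("overwrite").parquet("alluxio://alluxio-master:19998/shared/gView1")

// Instance 2 of spark-shell (separate SparkContext)
spark.read.parquet("alluxio://alluxio-master:19998/shared/gView1")
  .createOrReplaceTempView("gView1")
spark.sql("select * from gView1").show()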

Thanks,
Gene

On Sat, Apr 22, 2017 at 2:14 AM, vincent gromakowski <
vincent.gromakow...@gmail.com> wrote:

> Look at alluxio for sharing across drivers or spark jobserver
>
> On 22 April 2017 at 10:24 AM, "Hemanth Gudela" wrote:
>
>> Thanks for your reply.
>>
>>
>>
>> Creating a table is an option, but such approach slows down reads &
>> writes for a real-time analytics streaming use case that I’m currently
>> working on.
>>
>> If at all global temporary view could have been accessible across
>> sessions/spark contexts, that would have simplified my usecase a lot.
>>
>>
>>
>> But yeah, thanks for explaining the behavior of global temporary view,
>> now it’s clear :)
>>
>>
>>
>> -Hemanth
>>
>>
>>
>> *From: *Felix Cheung 
>> *Date: *Saturday, 22 April 2017 at 11.05
>> *To: *Hemanth Gudela , "user@spark.apache.org"
>> 
>> *Subject: *Re: Spark SQL - Global Temporary View is not behaving as
>> expected
>>
>>
>>
>> Cross session is this context is multiple spark sessions from the same
>> spark context. Since you are running two shells, you are having different
>> spark context.
>>
>>
>>
>> Do you have to you a temp view? Could you create a table?
>>
>>
>>
>> _
>> From: Hemanth Gudela 
>> Sent: Saturday, April 22, 2017 12:57 AM
>> Subject: Spark SQL - Global Temporary View is not behaving as expected
>> To: 
>>
>>
>>
>> Hi,
>>
>>
>>
>> According to documentation, global temporary views are cross-session
>> accessible.
>>
>>
>>
>> But when I try to query a global temporary view from another spark shell
>> like this:
>>
>> *Instance 1 of spark-shell*
>>
>> --
>>
>> scala> spark.sql("select 1 as col1").createGlobalTempView("gView1")
>>
>>
>>
>> *Instance 2 of spark-shell *(while Instance 1 of spark-shell is still
>> alive)
>>
>> -
>>
>> scala> spark.sql("select * from global_temp.gView1").show()
>>
>> org.apache.spark.sql.AnalysisException: Table or view not found:
>> `global_temp`.`gView1`
>>
>> 'Project [*]
>>
>> +- 'UnresolvedRelation `global_temp`.`gView1`
>>
>>
>>
>> I am expecting that global temporary view created in shell 1 should be
>> accessible in shell 2, but it isn’t!
>>
>> Please correct me if I missing something here.
>>
>>
>>
>> Thanks (in advance),
>>
>> Hemanth
>>
>>
>>
>


Re: Spark structured streaming: Is it possible to periodically refresh static data frame?

2017-04-21 Thread Gene Pang
Hi Georg,

Yes, that should be possible with Alluxio. Tachyon was renamed to Alluxio.

This article on how Alluxio is used for a Spark streaming use case may be
helpful.
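
As a rough illustration of the "background refresh" idea Tathagata describes
further down in this thread (path, interval, and view name are placeholders,
and I have not tested this exact snippet):

import java.util.concurrent.{Executors, TimeUnit}

val staticPath = "alluxio://alluxio-master:19998/reference/table"  // placeholder

def refreshView(): Unit = {
  // Re-read the slowly changing data and re-register it under the same name.
  spark.read.parquet(staticPath).createOrReplaceTempView("refData")
}

refreshView()  // initial registration
val scheduler = Executors.newSingleThreadScheduledExecutor()
scheduler.scheduleAtFixedRate(new Runnable {
  override def run(): Unit = refreshView()
}, 10, 10, TimeUnit.MINUTES)

// Streaming queries can then join against spark.table("refData").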

Thanks,
Gene

On Fri, Apr 21, 2017 at 8:22 AM, Georg Heiler 
wrote:

> You could write your views to hive or maybe tachyon.
>
> Is the periodically updated data big?
>
> Hemanth Gudela  schrieb am Fr. 21. Apr. 2017
> um 16:55:
>
>> Being new to spark, I think I need your suggestion again.
>>
>>
>>
>> #2 you can always define a batch Dataframe and register it as view, and
>> then run a background thread that periodically creates a new Dataframe with
>> updated data and re-registers it as a view with the same name
>>
>>
>>
>> I seem to have misunderstood your statement and tried registering static
>> dataframe as a temp view (“myTempView”) using createOrReplaceView in one
>> spark session, and tried re-registering another refreshed dataframe as temp
>> view with same name (“myTempView”) in another session. However, with this
>> approach, I have failed to achieve what I’m aiming for, because views are
>> local to one spark session.
>>
>> From spark 2.1.0 onwards, Global view is a nice feature, but still would
>> not solve my problem, because global view cannot be updated.
>>
>>
>>
>> So after much thinking, I understood that you would have meant to use a
>> background running process in the same spark job that would periodically
>> create a new dataframe and re-register temp view with same name, within the
>> same spark session.
>>
>> Could you please give me some pointers to documentation on how to create
>> such asynchronous background process in spark streaming? Is Scala’s
>> “Futures” the way to achieve this?
>>
>>
>>
>> Thanks,
>>
>> Hemanth
>>
>>
>>
>>
>>
>> *From: *Tathagata Das 
>>
>>
>> *Date: *Friday, 21 April 2017 at 0.03
>> *To: *Hemanth Gudela 
>>
>> *Cc: *Georg Heiler , "user@spark.apache.org" <
>> user@spark.apache.org>
>>
>>
>> *Subject: *Re: Spark structured streaming: Is it possible to
>> periodically refresh static data frame?
>>
>>
>>
>> Here are couple of ideas.
>>
>> 1. You can set up a Structured Streaming query to update in-memory table.
>>
>> Look at the memory sink in the programming guide -
>> http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-sinks
>>
>> So you can query the latest table using a specified table name, and also
>> join that table with another stream. However, note that this in-memory
>> table is maintained in the driver, and so you have be careful about the
>> size of the table.
>>
>>
>>
>> 2. If you cannot define a streaming query in the slow moving due to
>> unavailability of connector for your streaming data source, then you can
>> always define a batch Dataframe and register it as view, and then run a
>> background then periodically creates a new Dataframe with updated data and
>> re-registers it as a view with the same name. Any streaming query that
>> joins a streaming dataframe with the view will automatically start using
>> the most updated data as soon as the view is updated.
>>
>>
>>
>> Hope this helps.
>>
>>
>>
>>
>>
>> On Thu, Apr 20, 2017 at 1:30 PM, Hemanth Gudela <
>> hemanth.gud...@qvantel.com> wrote:
>>
>> Thanks Georg for your reply.
>>
>> But I’m not sure if I fully understood your answer.
>>
>>
>>
>> If you meant to join two streams (one reading Kafka, and another reading
>> database table), then I think it’s not possible, because
>>
>> 1.   According to documentation, Structured streaming does not support
>> database as a streaming source
>>
>> 2.   Joining between two streams is not possible yet.
>>
>>
>>
>> Regards,
>>
>> Hemanth
>>
>>
>>
>> *From: *Georg Heiler 
>> *Date: *Thursday, 20 April 2017 at 23.11
>> *To: *Hemanth Gudela , "user@spark.apache.org"
>> 
>> *Subject: *Re: Spark structured streaming: Is it possible to
>> periodically refresh static data frame?
>>
>>
>>
>> What about treating the static data as a (slow) stream as well?
>>
>>
>>
>> Hemanth Gudela  schrieb am Do., 20. Apr.
>> 2017 um 22:09 Uhr:
>>
>> Hello,
>>
>>
>>
>> I am working on a use case where there is a need to join streaming data
>> frame with a static data frame.
>>
>> The streaming data frame continuously gets data from Kafka topics,
>> whereas static data frame fetches data from a database table.
>>
>>
>>
>> However, as the underlying database table is getting updated often, I
>> must somehow manage to refresh my static data frame periodically to get the
>> latest information from underlying database table.
>>
>>
>>
>> 

Re: Spark 2.x OFF_HEAP persistence

2017-01-09 Thread Gene Pang
Yes, as far as I can tell, your description is accurate.

Thanks,
Gene

On Wed, Jan 4, 2017 at 9:37 PM, Vin J <winjos...@gmail.com> wrote:

> Thanks for the reply Gene. Looks like this means, with Spark 2.x, one has
> to change from rdd.persist(StorageLevel.OFF_HEAP) to 
> rdd.saveAsTextFile(alluxioPath)
> / rdd.saveAsObjectFile (alluxioPath) for guarantees like persisted rdd
> surviving a Spark JVM crash etc,  as also the other benefits you mention.
>
> Vin.
>
> On Thu, Jan 5, 2017 at 2:50 AM, Gene Pang <gene.p...@gmail.com> wrote:
>
>> Hi Vin,
>>
>> From Spark 2.x, OFF_HEAP was changed to no longer directly interface with
>> an external block store. The previous tight dependency was restrictive and
>> reduced flexibility. It looks like the new version uses the executor's off
>> heap memory to allocate direct byte buffers, and does not interface with
>> any external system for the data storage. I am not aware of a way to
>> connect the new version of OFF_HEAP to Alluxio.
>>
>> You can experience similar benefits of the old OFF_HEAP <-> Tachyon mode
>> as well as additional benefits like unified namespace
>> <http://www.alluxio.org/docs/master/en/Unified-and-Transparent-Namespace.html>
>>  or
>> sharing in-memory data across applications, by using the Alluxio
>> filesystem API
>> <http://www.alluxio.org/docs/master/en/File-System-API.html>.
>>
>> I hope this helps!
>>
>> Thanks,
>> Gene
>>
>> On Wed, Jan 4, 2017 at 10:50 AM, Vin J <winjos...@gmail.com> wrote:
>>
>>> Until Spark 1.6 I see there were specific properties to configure such
>>> as the external block store master url (spark.externalBlockStore.url) etc
>>> to use OFF_HEAP storage level which made it clear that an external Tachyon
>>> type of block store as required/used for OFF_HEAP storage.
>>>
>>> Can someone clarify how this has been changed in Spark 2.x - because I
>>> do not see config settings anymore that point Spark to an external block
>>> store like Tachyon (now Alluxio) (or am i missing seeing it?)
>>>
>>> I understand there are ways to use Alluxio with Spark, but how about
>>> OFF_HEAP storage - can Spark 2.x OFF_HEAP rdd persistence still exploit
>>> alluxio/external block store? Any pointers to design decisions/Spark JIRAs
>>> related to this will also help.
>>>
>>> Thanks,
>>> Vin.
>>>
>>
>>
>


Re: Spark 2.x OFF_HEAP persistence

2017-01-04 Thread Gene Pang
Hi Vin,

From Spark 2.x, OFF_HEAP was changed to no longer directly interface with
an external block store. The previous tight dependency was restrictive and
reduced flexibility. It looks like the new version uses the executor's off
heap memory to allocate direct byte buffers, and does not interface with
any external system for the data storage. I am not aware of a way to
connect the new version of OFF_HEAP to Alluxio.

You can experience similar benefits of the old OFF_HEAP <-> Tachyon mode as
well as additional benefits like unified namespace or sharing in-memory data
across applications, by using the Alluxio filesystem API.
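
To make that concrete, a small untested sketch (`rdd` and the path are
placeholders):

import org.apache.spark.storage.StorageLevel

// Spark 2.x OFF_HEAP: executor-managed direct buffers, no external system.
// I believe this requires enabling executor off-heap memory, e.g.
//   --conf spark.memory.offHeap.enabled=true --conf spark.memory.offHeap.size=2g
rdd.persist(StorageLevel.OFF_HEAP)

// Alluxio route: the data lives outside the Spark JVMs and can be read by
// other applications via the same path.
rdd.saveAsObjectFile("alluxio://alluxio-master:19998/rdds/my-rdd")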

I hope this helps!

Thanks,
Gene

On Wed, Jan 4, 2017 at 10:50 AM, Vin J  wrote:

> Until Spark 1.6 I see there were specific properties to configure such as
> the external block store master url (spark.externalBlockStore.url) etc to
> use OFF_HEAP storage level which made it clear that an external Tachyon
> type of block store as required/used for OFF_HEAP storage.
>
> Can someone clarify how this has been changed in Spark 2.x - because I do
> not see config settings anymore that point Spark to an external block store
> like Tachyon (now Alluxio) (or am i missing seeing it?)
>
> I understand there are ways to use Alluxio with Spark, but how about
> OFF_HEAP storage - can Spark 2.x OFF_HEAP rdd persistence still exploit
> alluxio/external block store? Any pointers to design decisions/Spark JIRAs
> related to this will also help.
>
> Thanks,
> Vin.
>


Re: Sharing RDDS across applications and users

2016-10-27 Thread Gene Pang
Hi Mich,

Yes, Alluxio is commonly used to cache and share Spark RDDs and DataFrames
among different applications and contexts. The data typically stays in
memory, but with Alluxio's tiered storage, the "colder" data can be evicted
out to other medium, like SSDs and HDDs. Here is a blog post discussing
Spark RDDs and Alluxio:
https://www.alluxio.com/blog/effective-spark-rdds-with-alluxio

Alluxio also has the concept of an "under filesystem", which can help
you access your existing data across different storage systems. Here is
more information about the unified namespace abilities:
http://www.alluxio.org/docs/master/en/Unified-and-Transparent-Namespace.html

Hope that helps,
Gene

On Thu, Oct 27, 2016 at 3:39 AM, Mich Talebzadeh 
wrote:

> Thanks Chanh,
>
> Can it share RDDs.
>
> Personally I have not used either Alluxio or Ignite.
>
>
>1. Are there major differences between these two
>2. Have you tried Alluxio for sharing Spark RDDs and if so do you have
>any experience you can kindly share
>
> Regards
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 27 October 2016 at 11:29, Chanh Le  wrote:
>
>> Hi Mich,
>> Alluxio is the good option to go.
>>
>> Regards,
>> Chanh
>>
>> On Oct 27, 2016, at 5:28 PM, Mich Talebzadeh 
>> wrote:
>>
>>
>> There was a mention of using Zeppelin to share RDDs with many users. From
>> the notes on Zeppelin it appears that this is sharing UI and I am not sure
>> how easy it is going to be changing the result set with different users
>> modifying say sql queries.
>>
>> There is also the idea of caching RDDs with something like Apache Ignite.
>> Has anyone really tried this. Will that work with multiple applications?
>>
>> It looks feasible as RDDs are immutable and so are registered tempTables
>> etc.
>>
>> Thanks
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>>
>


Re: Is there such thing as cache fusion with the underlying tables/files on HDFS

2016-09-19 Thread Gene Pang
Hi Mich,

While Alluxio is not a database (it exposes a file system interface), you
can use Alluxio to keep certain data in memory. With Alluxio, you can
selectively pin data in memory
(http://www.alluxio.org/docs/master/en/Command-Line-Interface.html#pin).
There are also ways to control
how to read and write the data in Alluxio memory (
http://www.alluxio.org/docs/master/en/File-System-API.html). These options
and features can help you control how you access your data.

Thanks,
Gene

On Sat, Sep 17, 2016 at 9:53 AM, Mich Talebzadeh 
wrote:

> Hi,
>
> I am seeing similar issues when I was working on Oracle with Tableau as
> the dashboard.
>
> Currently I have a batch layer that gets streaming data from
>
> source -> Kafka -> Flume -> HDFS
>
> It is stored on HDFS as text files, and a cron process syncs a Hive table with
> the external table built on the directory. I tried both ORC and Parquet
> but I don't think the query itself is the issue.
>
> Meaning it does not matter how clever your execution engine is, the fact
> you still have to do  considerable amount of Physical IO (PIO) as opposed
> to Logical IO (LIO) to get the data to Zeppelin is on the critical path.
>
> One option is to limit the amount of data in Zeppelin to certain number of
> rows or something similar. However, you cannot tell a user he/she cannot
> see the full data.
>
> We resolved this with Oracle by using Oracle TimesTen
> IMDB
> to cache certain tables in memory and get them refreshed (depending on
> refresh frequency) from the underlying table in Oracle when data is
> updated). That is done through cache fusion.
>
> I was looking around and came across Alluxio.
> Ideally I like to utilise such concept like TimesTen. Can one distribute
> Hive table data (or any table data) across the nodes cached. In that case
> we will be doing Logical IO which is about 20 times or more lightweight
> compared to Physical IO.
>
> Anyway this is the concept.
>
> Thanks
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>


Re: Question About OFF_HEAP Caching

2016-07-18 Thread Gene Pang
Hi,

If you want to use Alluxio with Spark 2.x, it is recommended to write to
and read from Alluxio with files. You can save an RDD with saveAsObjectFile
with an Alluxio path (alluxio://host:port/path/to/file), and you can read
that file from any other Spark job. Here is additional information on how
to run Spark with Alluxio:
http://www.alluxio.org/docs/master/en/Running-Spark-on-Alluxio.html

Hope that helps,
Gene

On Mon, Jul 18, 2016 at 12:11 AM, condor join 
wrote:

> Hi All,
>
> I have some questions about OFF_HEAP Caching. In Spark 1.X when we use
> *rdd.persist(StorageLevel.OFF_HEAP)*,that means rdd caching in
> Tachyon(Alluxio). However,in Spark 2.X,we can directly use OFF_HEAP  For
> Caching
>
> (
> https://issues.apache.org/jira/browse/SPARK-13992?jql=project%20%3D%20SPARK%20AND%20text%20~%20%22off-heap%20caching%22).
> I am confuse about this and I have follow questions:
>
> 1.In Spark 2.X, how should we use Tachyon for caching?
>
> 2.Is there any reason that must change in this way(I mean use off_heap
> directly instead of using Tachyon)
>
> Thanks a lot!
>
>
>
>


Re: Why so many parquet file part when I store data in Alluxio or File?

2016-07-08 Thread Gene Pang
Hi Chanh,

You should be able to set the Alluxio block size with:

sc.hadoopConfiguration.set("alluxio.user.block.size.bytes.default", "256mb")

I think you have many parquet files because you have many Spark executors
writing out their partition of the files.
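
For what it's worth, a sketch of both knobs (the values are examples rather
than recommendations, and the port below is a placeholder): the Alluxio block
size controls how blocks are stored, while the number of part files is driven
by the number of write tasks, which coalesce/repartition can reduce.

import org.apache.spark.sql.SaveMode

sc.hadoopConfiguration.set("alluxio.user.block.size.bytes.default", "256mb")

df.coalesce(8)                          // fewer write tasks => fewer part files
  .write.mode(SaveMode.Append)
  .partitionBy("network_id", "time")
  .parquet("alluxio://master1:19998/FACT_ADMIN_HOURLY")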

Hope that helps,
Gene

On Sun, Jul 3, 2016 at 8:02 PM, Chanh Le  wrote:

> Hi Gene,
> Could you give some suggestions on that?
>
>
>
> On Jul 1, 2016, at 5:31 PM, Ted Yu  wrote:
>
> The comment from zhangxiongfei was from a year ago.
>
> Maybe something changed since them ?
>
> On Fri, Jul 1, 2016 at 12:07 AM, Chanh Le  wrote:
>
>> Hi Ted,
>> I set sc.hadoopConfiguration.setBoolean("fs.hdfs.impl.disable.cache",
>> true)
>>
>> sc.hadoopConfiguration.setLong("fs.local.block.size", 268435456)
>>
>> but It seems not working.
>>
>> 
>>
>>
>> On Jul 1, 2016, at 11:38 AM, Ted Yu  wrote:
>>
>> Looking under Alluxio source, it seems only "fs.hdfs.impl.disable.cache"
>> is in use.
>>
>> FYI
>>
>> On Thu, Jun 30, 2016 at 9:30 PM, Deepak Sharma 
>> wrote:
>>
>>> Ok.
>>> I came across this issue.
>>> Not sure if you already assessed this:
>>> https://issues.apache.org/jira/plugins/servlet/mobile#issue/SPARK-6921
>>>
>>> The workaround mentioned may work for you .
>>>
>>> Thanks
>>> Deepak
>>> On 1 Jul 2016 9:34 am, "Chanh Le"  wrote:
>>>
 Hi Deepak,
 Thanks for replying. The way to write into Alluxio is
 df.write.mode(SaveMode.Append).partitionBy("network_id", "time"
 ).parquet("alluxio://master1:1/FACT_ADMIN_HOURLY”)


 I partition by 2 columns and store. I just want when I write
 it automatic write a size properly for what I already set in Alluxio 512MB
 per block.


 On Jul 1, 2016, at 11:01 AM, Deepak Sharma 
 wrote:

 Before writing coalesing your rdd to 1 .
 It will create only 1 output file .
 Multiple part file happens as all your executors will be writing their
 partitions to separate part files.

 Thanks
 Deepak
 On 1 Jul 2016 8:01 am, "Chanh Le"  wrote:

 Hi everyone,
 I am using Alluxio for storage. But I am little bit confuse why I am do
 set block size of alluxio is 512MB and my file part only few KB and too
 many part.
 Is that normal? Because I want to read it fast? Is that many part
 effect the read operation?
 How to set the size of file part?

 Thanks.
 Chanh





 



>>
>>
>
>


Re: Best practice for handing tables between pipeline components

2016-06-27 Thread Gene Pang
Yes, Alluxio (http://www.alluxio.org/) can be used to store data in-memory
between stages in a pipeline.

Here is more information about running Spark with Alluxio:
http://www.alluxio.org/documentation/v1.1.0/en/Running-Spark-on-Alluxio.html

Hope that helps,
Gene

On Mon, Jun 27, 2016 at 10:38 AM, Sathish Kumaran Vairavelu <
vsathishkuma...@gmail.com> wrote:

> Alluxio off heap memory would help to share cached objects
>
> On Mon, Jun 27, 2016 at 11:14 AM Everett Anderson 
> wrote:
>
>> Hi,
>>
>> We have a pipeline of components strung together via Airflow running on
>> AWS. Some of them are implemented in Spark, but some aren't. Generally they
>> can all talk to a JDBC/ODBC end point or read/write files from S3.
>>
>> Ideally, we wouldn't suffer the I/O cost of writing all the data to HDFS
>> or S3 and reading it back in, again, in every component, if it could stay
>> cached in memory in a Spark cluster.
>>
>> Our current investigation seems to lead us towards exploring if the
>> following things are possible:
>>
>>- Using a Hive metastore with S3 as its backing data store to try to
>>keep a mapping from table name to files on S3 (not sure if one can cache a
>>Hive table in Spark across contexts, though)
>>- Using something like the spark-jobserver to keep a Spark SQLContext
>>open across Spark components so they could avoid file I/O for cached 
>> tables
>>
>> What's the best practice for handing tables between Spark programs? What
>> about between Spark and non-Spark programs?
>>
>> Thanks!
>>
>> - Everett
>>
>>


Re: Spark 2.0 Preview After caching query didn't work and can't kill job.

2016-06-21 Thread Gene Pang
Hi,

It looks like this is not related to Alluxio. Have you tried running the
same job with different storage?

Maybe you could increase the Spark JVM heap size to see if that helps your
issue?

Hope that helps,
Gene

On Wed, Jun 15, 2016 at 8:52 PM, Chanh Le <giaosu...@gmail.com> wrote:

> Hi everyone,
> I added more logs for my use case:
>
> When I cached all my data 500 mil records and count.
> I receive this.
> 16/06/16 10:09:25 ERROR TaskSetManager: Total size of serialized results
> of 27 tasks (1876.7 MB) is bigger than spark.driver.maxResultSize (1024.0
> MB)
> >>> that weird because I just count
> After increase maxResultSize to 10g
> I still waiting slow for result and error
> 16/06/16 10:09:25 INFO BlockManagerInfo: Removed taskresult_94 on
> slave1:27743 in memory (size: 69.5 MB, free: 6.2 GB)
> org.apache.spark.SparkException: Job aborted due to stage failure: Total
> size of serialized results of 15 tasks (1042.6 MB) is bigger than
> spark.driver.maxResultSize (1024.0 MB
> )
>   at org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1450)
>   at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1438)
>   at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1437)
>   at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>   at
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1437)
>   at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
>   at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
>   at scala.Option.foreach(Option.scala:257)
>   at
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
>   at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1659)
>   at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1618)
>   at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1607)
>   at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>   at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
>   at org.apache.spark.SparkContext.runJob(SparkContext.scala:1863)
>   at org.apache.spark.SparkContext.runJob(SparkContext.scala:1876)
>   at org.apache.spark.SparkContext.runJob(SparkContext.scala:1889)
>   at org.apache.spark.SparkContext.runJob(SparkContext.scala:1903)
>   at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:883)
>   at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>   at org.apache.spark.rdd.RDD.withScope(RDD.scala:357)
>   at org.apache.spark.rdd.RDD.collect(RDD.scala:882)
>   at
> org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:290)
>   at
> org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2122)
>   at
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
>   at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2436)
>   at org.apache.spark.sql.Dataset.org
> $apache$spark$sql$Dataset$$execute$1(Dataset.scala:2121)
>   at org.apache.spark.sql.Dataset.org
> $apache$spark$sql$Dataset$$collect(Dataset.scala:2128)
>   at
> org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2156)
>   at
> org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2155)
>   at org.apache.spark.sql.Dataset.withCallback(Dataset.scala:2449)
>   at org.apache.spark.sql.Dataset.count(Dataset.scala:2155)
>   ... 48 elided
>
> I lost all my executors.
>
>
>
> On Jun 15, 2016, at 8:44 PM, Chanh Le <giaosu...@gmail.com> wrote:
>
> Hi Gene,
> I am using Alluxio 1.1.0.
> Spark 2.0 Preview version.
> Load from alluxio then cached and query for 2nd time. Spark will stuck.
>
>
>
> On Jun 15, 2016, at 8:42 PM, Gene Pang <gene.p...@gmail.com> wrote:
>
> Hi,
>
> Which version of Alluxio are you using?
>
> Thanks,
> Gene
>
> On Tue, Jun 14, 2016 at 3:45 AM, Chanh Le <giaosu...@gmail.com> wrote:
>
>> I am testing Spark 2.0
>> I load data from alluxio and cached then I query but the first query is
>> ok because it kick off cache action. But after that I run the query again
>> and it’s stuck.
>> I ran in cluster 5 nodes in spark-shell.
>>
>> Did anyone has this issue?
>>
>>
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>
>
>


Re: Limit pyspark.daemon threads

2016-06-15 Thread Gene Pang
As Sven mentioned, you can use Alluxio to store RDDs in off-heap memory,
and you can then share that RDD across different jobs. If you would like to
run Spark on Alluxio, this documentation can help:
http://www.alluxio.org/documentation/master/en/Running-Spark-on-Alluxio.html

Thanks,
Gene

On Tue, Jun 14, 2016 at 12:44 AM, agateaaa  wrote:

> Hi,
>
> I am seeing this issue too with pyspark (Using Spark 1.6.1).  I have set
> spark.executor.cores to 1, but I see that whenever streaming batch starts
> processing data, see python -m pyspark.daemon processes increase gradually
> to about 5, (increasing CPU% on a box about 4-5 times, each pyspark.daemon
> takes up around 100 % CPU)
>
> After the processing is done 4 pyspark.daemon processes go away and we are
> left with one till the next batch run. Also sometimes the  CPU usage for
> executor process spikes to about 800% even though spark.executor.core is
> set to 1
>
> e.g. top output
> PID USER  PR   NI  VIRT  RES  SHR S   %CPU %MEMTIME+  COMMAND
> 19634 spark 20   0 8871420 1.790g  32056 S 814.1  2.9   0:39.33
> /usr/lib/j+ <--EXECUTOR
>
> 13897 spark 20   0   46576  17916   6720 S   100.0  0.0   0:00.17
> python -m + <--pyspark.daemon
> 13991 spark 20   0   46524  15572   4124 S   98.0  0.0   0:08.18
> python -m + <--pyspark.daemon
> 14488 spark 20   0   46524  15636   4188 S   98.0  0.0   0:07.25
> python -m + <--pyspark.daemon
> 14514 spark 20   0   46524  15636   4188 S   94.0  0.0   0:06.72
> python -m + <--pyspark.daemon
> 14526 spark 20   0   48200  17172   4092 S   0.0  0.0   0:00.38 python
> -m + <--pyspark.daemon
>
>
>
> Is there any way to control the number of pyspark.daemon processes that
> get spawned ?
>
> Thank you
> Agateaaa
>
> On Sun, Mar 27, 2016 at 1:08 AM, Sven Krasser  wrote:
>
>> Hey Ken,
>>
>> 1. You're correct, cached RDDs live on the JVM heap. (There's an off-heap
>> storage option using Alluxio, formerly Tachyon, with which I have no
>> experience however.)
>>
>> 2. The worker memory setting is not a hard maximum unfortunately. What
>> happens is that during aggregation the Python daemon will check its process
>> size. If the size is larger than this setting, it will start spilling to
>> disk. I've seen many occasions where my daemons grew larger. Also, you're
>> relying on Python's memory management to free up space again once objects
>> are evicted. In practice, leave this setting reasonably small but make sure
>> there's enough free memory on the machine so you don't run into OOM
>> conditions. If the lower memory setting causes strains for your users, make
>> sure they increase the parallelism of their jobs (smaller partitions
>> meaning less data is processed at a time).
>>
>> 3. I believe that is the behavior you can expect when setting
>> spark.executor.cores. I've not experimented much with it and haven't looked
>> at that part of the code, but what you describe also reflects my
>> understanding. Please share your findings here, I'm sure those will be very
>> helpful to others, too.
>>
>> One more suggestion for your users is to move to the Pyspark DataFrame
>> API. Much of the processing will then happen in the JVM, and you will bump
>> into fewer Python resource contention issues.
>>
>> Best,
>> -Sven
>>
>>
>> On Sat, Mar 26, 2016 at 1:38 PM, Carlile, Ken 
>> wrote:
>>
>>> This is extremely helpful!
>>>
>>> I’ll have to talk to my users about how the python memory limit should
>>> be adjusted and what their expectations are. I’m fairly certain we bumped
>>> it up in the dark past when jobs were failing because of insufficient
>>> memory for the python processes.
>>>
>>> So just to make sure I’m understanding correctly:
>>>
>>>
>>>- JVM memory (set by SPARK_EXECUTOR_MEMORY and/or
>>>SPARK_WORKER_MEMORY?) is where the RDDs are stored. Currently both of 
>>> those
>>>values are set to 90GB
>>>- spark.python.worker.memory controls how much RAM each python task
>>>can take maximum (roughly speaking. Currently set to 4GB
>>>- spark.task.cpus controls how many java worker threads will exist
>>>and thus indirectly how many pyspark daemon processes will exist
>>>
>>>
>>> I’m also looking into fixing my cron jobs so they don’t stack up by
>>> implementing flock in the jobs and changing how teardowns of the spark
>>> cluster work as far as failed workers.
>>>
>>> Thanks again,
>>> —Ken
>>>
>>> On Mar 26, 2016, at 4:08 PM, Sven Krasser  wrote:
>>>
>>> My understanding is that the spark.executor.cores setting controls the
>>> number of worker threads in the executor in the JVM. Each worker thread
>>> communicates then with a pyspark daemon process (these are not threads) to
>>> stream data into Python. There should be one daemon process per worker
>>> thread (but as I mentioned I sometimes see a low multiple).
>>>
>>> Your 4GB limit for Python is fairly high, that means even for 12 

Re: Spark 2.0 Preview After caching query didn't work and can't kill job.

2016-06-15 Thread Gene Pang
Hi,

Which version of Alluxio are you using?

Thanks,
Gene

On Tue, Jun 14, 2016 at 3:45 AM, Chanh Le  wrote:

> I am testing Spark 2.0
> I load data from alluxio and cached then I query but the first query is ok
> because it kick off cache action. But after that I run the query again and
> it’s stuck.
> I ran in cluster 5 nodes in spark-shell.
>
> Did anyone has this issue?
>
>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Silly Question on my part...

2016-05-17 Thread Gene Pang
Hi Michael,

Yes, you can use Alluxio to share Spark RDDs. Here is a blog post about
getting started with Spark and Alluxio (
http://www.alluxio.com/2016/04/getting-started-with-alluxio-and-spark/),
and some documentation (
http://alluxio.org/documentation/master/en/Running-Spark-on-Alluxio.html).

I hope that helps,
Gene

On Tue, May 17, 2016 at 8:36 AM, Michael Segel 
wrote:

> Thanks for the response.
>
> That’s what I thought, but I didn’t want to assume anything.
> (You know what happens when you ass u me … :-)
>
>
> Not sure about Tachyon though.  Its a thought, but I’m very conservative
> when it comes to design choices.
>
>
> On May 16, 2016, at 5:21 PM, John Trengrove 
> wrote:
>
> If you are wanting to share RDDs it might be a good idea to check out
> Tachyon / Alluxio.
>
> For the Thrift server, I believe the datasets are located in your Spark
> cluster as RDDs and you just communicate with it via the Thrift
> JDBC Distributed Query Engine connector.
>
> 2016-05-17 5:12 GMT+10:00 Michael Segel :
>
>> For one use case.. we were considering using the thrift server as a way
>> to allow multiple clients access shared RDDs.
>>
>> Within the Thrift Context, we create an RDD and expose it as a hive table.
>>
>> The question is: where does the RDD exist? On the Thrift service node
>> itself, or is that just a reference to an RDD that is contained within
>> contexts on the cluster?
>>
>>
>> Thx
>>
>> -Mike
>>
>>
>>
>>
>
>
>


Re: Spark partition size tuning

2016-01-26 Thread Gene Pang
Hi Jia,

If you want to change the Tachyon block size, you can set the
tachyon.user.block.size.bytes.default parameter (
http://tachyon-project.org/documentation/Configuration-Settings.html). You
can set it via extraJavaOptions per job, or by adding it to
tachyon-site.properties.
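
For example, something along these lines should work from Spark (an untested
sketch; the "128MB" value format, and whether the property is also needed on
the driver side, may depend on your Tachyon version):

import org.apache.spark.{SparkConf, SparkContext}

// Pass the Tachyon client property to the executors via extraJavaOptions.
val conf = new SparkConf()
  .setAppName("tachyon-block-size-example")
  .set("spark.executor.extraJavaOptions",
       "-Dtachyon.user.block.size.bytes.default=128MB")
val sc = new SparkContext(conf)

Alternatively, put tachyon.user.block.size.bytes.default=128MB into
tachyon-site.properties on the client classpath so every job picks it up.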

I hope that helps,
Gene

On Mon, Jan 25, 2016 at 8:13 PM, Jia Zou  wrote:

> Dear all,
>
> First, an update: the local file system data partition size can be
> tuned by:
> sc.hadoopConfiguration().setLong("fs.local.block.size", blocksize)
>
> However, I also need to tune the Spark data partition size for input data
> that is stored in Tachyon (default is 512MB), and the above method does not
> work for Tachyon data.
>
> Do you have any suggestions? Thanks very much!
>
> Best Regards,
> Jia
>
>
> -- Forwarded message --
> From: Jia Zou 
> Date: Thu, Jan 21, 2016 at 10:05 PM
> Subject: Spark partition size tuning
> To: "user @spark" 
>
>
> Dear all!
>
> When using Spark to read from the local file system, the default partition
> size is 32MB. How can I increase the partition size to 128MB to reduce the
> number of tasks?
>
> Thank you very much!
>
> Best Regards,
> Jia
>
>


Re: How to query data in tachyon with spark-sql

2016-01-24 Thread Gene Pang
Hi,

You should be able to point Hive to Tachyon instead of HDFS, and that
should allow Hive to access data in Tachyon. If Spark SQL was pointing to
an HDFS file, you could instead point it to a Tachyon file, and that should
work too.
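
As a rough sketch of the Spark SQL side (untested; the host, port, path, and
table name are placeholders, and sc is the spark-shell SparkContext), you can
read a Tachyon path exactly as you would an HDFS path and then query it:

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)
// Read Parquet data directly from Tachyon via the Hadoop-compatible API.
val df = sqlContext.read.parquet("tachyon://tachyon-master:19998/warehouse/events")
df.registerTempTable("events")
sqlContext.sql("SELECT COUNT(*) FROM events").show()

For Hive-managed tables, the equivalent is an external table whose LOCATION
points at a tachyon:// URI instead of an hdfs:// one.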

Hope that helps,
Gene

On Wed, Jan 20, 2016 at 2:06 AM, Sea <261810...@qq.com> wrote:

> Hi all,
> I want to mount some Hive tables in Tachyon, but I don't know how to
> query data in Tachyon with spark-sql. Does anyone know how?
>


Re: Reuse Executor JVM across different JobContext

2016-01-19 Thread Gene Pang
Yes, you can share RDDs with Tachyon, while keeping the data in memory.
Spark jobs can write to a Tachyon path (tachyon://host:port/path/) and
other jobs can read from the same path.
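
A minimal sketch of what that looks like (untested; host, port, and path are
placeholders), with each job running in its own SparkContext:

// Job 1: write the RDD to a Tachyon path (sc is that job's SparkContext).
val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3)))
pairs.saveAsObjectFile("tachyon://tachyon-master:19998/shared/pairs")

// Job 2, possibly a different application: read the same path back.
val restored = sc.objectFile[(String, Int)]("tachyon://tachyon-master:19998/shared/pairs")
restored.count()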

Here is a presentation that includes that use case:
http://www.slideshare.net/TachyonNexus/tachyon-presentation-at-ampcamp-6-november-2015

Thanks,
Gene

On Sun, Jan 17, 2016 at 1:56 PM, Mark Hamstra 
wrote:

> Yes, that is one of the basic reasons to use a
> jobserver/shared-SparkContext.  Otherwise, in order to share the data in an
> RDD you have to use an external storage system, such as a distributed
> filesystem or Tachyon.
>
> On Sun, Jan 17, 2016 at 1:52 PM, Jia  wrote:
>
>> Thanks, Mark. Then, I guess JobServer can fundamentally solve my problem,
>> so that jobs can be submitted at different time and still share RDDs.
>>
>> Best Regards,
>> Jia
>>
>>
>> On Jan 17, 2016, at 3:44 PM, Mark Hamstra 
>> wrote:
>>
>> There is a 1-to-1 relationship between Spark Applications and
>> SparkContexts -- fundamentally, a Spark Application is a program that
>> creates and uses a SparkContext, and that SparkContext is destroyed when
>> the Application ends.  A jobserver generically, and the Spark JobServer
>> specifically, is an Application that keeps a SparkContext open for a long
>> time and allows many Jobs to be submitted and run using that shared
>> SparkContext.
>>
>> More than one Application/SparkContext unavoidably implies more than one
>> JVM process per Worker -- Applications/SparkContexts cannot share JVM
>> processes.
>>
>> On Sun, Jan 17, 2016 at 1:15 PM, Jia  wrote:
>>
>>> Hi, Mark, sorry for the confusion.
>>>
>>> Let me clarify: when an application is submitted, the master will tell
>>> each Spark worker to spawn an executor JVM process. All the task sets of
>>> the application will be executed by that executor. After the application
>>> runs to completion, the executor process is killed.
>>> But I hope that all submitted applications can run in the same executor.
>>> Can JobServer do that? If so, it’s really good news!
>>>
>>> Best Regards,
>>> Jia
>>>
>>> On Jan 17, 2016, at 3:09 PM, Mark Hamstra 
>>> wrote:
>>>
>>> You've still got me confused.  The SparkContext exists at the Driver,
>>> not on an Executor.
>>>
>>> Many Jobs can be run by a SparkContext -- it is a common pattern to use
>>> something like the Spark Jobserver where all Jobs are run through a shared
>>> SparkContext.
>>>
>>> On Sun, Jan 17, 2016 at 12:57 PM, Jia Zou 
>>> wrote:
>>>
 Hi, Mark, sorry, I mean SparkContext.
 I mean I want to change Spark so that all submitted jobs (SparkContexts)
 run in one executor JVM.

 Best Regards,
 Jia

 On Sun, Jan 17, 2016 at 2:21 PM, Mark Hamstra 
 wrote:

> -dev
>
> What do you mean by JobContext?  That is a Hadoop mapreduce concept,
> not Spark.
>
> On Sun, Jan 17, 2016 at 7:29 AM, Jia Zou 
> wrote:
>
>> Dear all,
>>
>> Is there a way to reuse executor JVM across different JobContexts?
>> Thanks.
>>
>> Best Regards,
>> Jia
>>
>
>

>>>
>>>
>>
>>
>


Re: Best practices for sharing/maintaining large resource files for Spark jobs

2016-01-14 Thread Gene Pang
ctive, it's not yet clear to me what the best
> option(s) would be. Any thoughts / recommendations would be appreciated.
>
>
>
>
>
> On Tue, Jan 12, 2016 at 3:04 PM, Dmitry Goldenberg <
> dgoldenberg...@gmail.com> wrote:
>
>> Thanks, Gene.
>>
>> Does Spark use Tachyon under the covers anyway for implementing its
>> "cluster memory" support?
>>
>> It seems that the practice I hear the most about is the idea of loading
>> resources as RDDs and then doing joins against them to achieve the lookup
>> effect.
>>
>> The other approach would be to load the resources into broadcast
>> variables but I've heard concerns about memory.  Could we run out of memory
>> if we load too much into broadcast vars?  Is there any memory_to_disk/spill
>> to disk capability for broadcast variables in Spark?
>>
>>
>> On Tue, Jan 12, 2016 at 11:19 AM, Gene Pang <gene.p...@gmail.com> wrote:
>>
>>> Hi Dmitry,
>>>
>>> Yes, Tachyon can help with your use case. You can read and write to
>>> Tachyon via the filesystem api (
>>> http://tachyon-project.org/documentation/File-System-API.html). There
>>> is a native Java API as well as a Hadoop-compatible API. Spark is also able
>>> to interact with Tachyon via the Hadoop-compatible API, so Spark jobs can
>>> read input files from Tachyon and write output files to Tachyon.
>>>
>>> I hope that helps,
>>> Gene
>>>
>>> On Tue, Jan 12, 2016 at 4:26 AM, Dmitry Goldenberg <
>>> dgoldenberg...@gmail.com> wrote:
>>>
>>>> I'd guess that if the resources are broadcast Spark would put them into
>>>> Tachyon...
>>>>
>>>> On Jan 12, 2016, at 7:04 AM, Dmitry Goldenberg <
>>>> dgoldenberg...@gmail.com> wrote:
>>>>
>>>> Would it make sense to load them into Tachyon and read and broadcast
>>>> them from there since Tachyon is already a part of the Spark stack?
>>>>
>>>> If so I wonder if I could do that Tachyon read/write via a Spark API?
>>>>
>>>>
>>>> On Jan 12, 2016, at 2:21 AM, Sabarish Sasidharan <
>>>> sabarish.sasidha...@manthan.com> wrote:
>>>>
>>>> One option could be to store them as blobs in a cache like Redis and
>>>> then read + broadcast them from the driver. Or you could store them in HDFS
>>>> and read + broadcast from the driver.
>>>>
>>>> Regards
>>>> Sab
>>>>
>>>> On Tue, Jan 12, 2016 at 1:44 AM, Dmitry Goldenberg <
>>>> dgoldenberg...@gmail.com> wrote:
>>>>
>>>>> We have a bunch of Spark jobs deployed and a few large resource files
>>>>> such as e.g. a dictionary for lookups or a statistical model.
>>>>>
>>>>> Right now, these are deployed as part of the Spark jobs which will
>>>>> eventually make the mongo-jars too bloated for deployments.
>>>>>
>>>>> What are some of the best practices to consider for maintaining and
>>>>> sharing large resource files like these?
>>>>>
>>>>> Thanks.
>>>>>
>>>>
>>>>
>>>>
>>>> --
>>>>
>>>> Architect - Big Data
>>>> Ph: +91 99805 99458
>>>>
>>>> Manthan Systems | *Company of the year - Analytics (2014 Frost and
>>>> Sullivan India ICT)*
>>>> +++
>>>>
>>>>
>>>
>>
>


Re: Best practices for sharing/maintaining large resource files for Spark jobs

2016-01-12 Thread Gene Pang
Hi Dmitry,

Yes, Tachyon can help with your use case. You can read and write to Tachyon
via the filesystem api (
http://tachyon-project.org/documentation/File-System-API.html). There is a
native Java API as well as a Hadoop-compatible API. Spark is also able to
interact with Tachyon via the Hadoop-compatible API, so Spark jobs can read
input files from Tachyon and write output files to Tachyon.
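
As a concrete sketch of the lookup use case (untested; the paths are
placeholders, and sc is the job's SparkContext), you can read the resource
file from Tachyon through the Hadoop-compatible API, collect it on the driver,
and broadcast it to the executors:

// Load the dictionary from Tachyon and broadcast it for lookups.
val dict = sc.textFile("tachyon://tachyon-master:19998/resources/dictionary.txt")
  .map(_.trim)
  .collect()
  .toSet
val dictBroadcast = sc.broadcast(dict)

// Use the broadcast dictionary wherever the lookup is needed.
val input = sc.textFile("tachyon://tachyon-master:19998/data/words.txt")
val matches = input.filter(word => dictBroadcast.value.contains(word))
matches.saveAsTextFile("tachyon://tachyon-master:19998/out/matches")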

I hope that helps,
Gene

On Tue, Jan 12, 2016 at 4:26 AM, Dmitry Goldenberg  wrote:

> I'd guess that if the resources are broadcast Spark would put them into
> Tachyon...
>
> On Jan 12, 2016, at 7:04 AM, Dmitry Goldenberg 
> wrote:
>
> Would it make sense to load them into Tachyon and read and broadcast them
> from there since Tachyon is already a part of the Spark stack?
>
> If so I wonder if I could do that Tachyon read/write via a Spark API?
>
>
> On Jan 12, 2016, at 2:21 AM, Sabarish Sasidharan <
> sabarish.sasidha...@manthan.com> wrote:
>
> One option could be to store them as blobs in a cache like Redis and then
> read + broadcast them from the driver. Or you could store them in HDFS and
> read + broadcast from the driver.
>
> Regards
> Sab
>
> On Tue, Jan 12, 2016 at 1:44 AM, Dmitry Goldenberg <
> dgoldenberg...@gmail.com> wrote:
>
>> We have a bunch of Spark jobs deployed and a few large resource files
>> such as e.g. a dictionary for lookups or a statistical model.
>>
>> Right now, these are deployed as part of the Spark jobs which will
>> eventually make the mongo-jars too bloated for deployments.
>>
>> What are some of the best practices to consider for maintaining and
>> sharing large resource files like these?
>>
>> Thanks.
>>
>
>
>
> --
>
> Architect - Big Data
> Ph: +91 99805 99458
>
> Manthan Systems | *Company of the year - Analytics (2014 Frost and
> Sullivan India ICT)*
> +++
>
>


Re: org.apache.spark.storage.BlockNotFoundException in Spark1.5.2+Tachyon0.7.1

2016-01-09 Thread Gene Pang
Yes, the tiered storage feature in Tachyon can address this issue. Here is
a link to more information:
http://tachyon-project.org/documentation/Tiered-Storage-on-Tachyon.html
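
As a rough sketch of what that configuration looks like (the exact property
names changed across Tachyon releases, so treat these as an assumption and
check them against the docs linked above for your version), tiered storage is
set up on each worker, e.g. in tachyon-site.properties:

# Two tiers: memory on top, disk below it (paths and quotas are examples only).
tachyon.worker.tieredstore.level.max=2
tachyon.worker.tieredstore.level0.alias=MEM
tachyon.worker.tieredstore.level0.dirs.path=/mnt/ramdisk
tachyon.worker.tieredstore.level0.dirs.quota=16GB
tachyon.worker.tieredstore.level1.alias=HDD
tachyon.worker.tieredstore.level1.dirs.path=/mnt/disk1,/mnt/disk2
tachyon.worker.tieredstore.level1.dirs.quota=100GB,100GB

The idea is that blocks evicted from memory can move down to the disk tier
instead of being dropped, which is what leads to the BlockNotFoundException
once the working set exceeds memory.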

Thanks,
Gene

On Wed, Jan 6, 2016 at 8:44 PM, Ted Yu  wrote:

> Have you seen this thread ?
>
> http://search-hadoop.com/m/q3RTtAiQta22XrCI
>
> On Wed, Jan 6, 2016 at 8:41 PM, Jia Zou  wrote:
>
>> Dear all,
>>
>> I am using Spark1.5.2 and Tachyon0.7.1 to run KMeans with
>> inputRDD.persist(StorageLevel.OFF_HEAP()).
>>
>> I've set up tiered storage for Tachyon. It works fine when the working set
>> is smaller than available memory. However, when the working set exceeds
>> available memory, I keep getting errors like the ones below:
>>
>> 16/01/07 04:18:53 INFO scheduler.TaskSetManager: Lost task 197.1 in stage
>> 0.0 (TID 206) on executor 10.149.11.81: java.lang.RuntimeException
>> (org.apache.spark.storage.BlockNotFoundException: Block rdd_1_197 not found
>>
>> 16/01/07 04:18:53 INFO scheduler.TaskSetManager: Lost task 191.1 in stage
>> 0.0 (TID 207) on executor 10.149.11.81: java.lang.RuntimeException
>> (org.apache.spark.storage.BlockNotFoundException: Block rdd_1_191 not found
>>
>> 16/01/07 04:18:53 INFO scheduler.TaskSetManager: Lost task 197.2 in stage
>> 0.0 (TID 208) on executor 10.149.11.81: java.lang.RuntimeException
>> (org.apache.spark.storage.BlockNotFoundException: Block rdd_1_197 not found
>>
>> 16/01/07 04:18:53 INFO scheduler.TaskSetManager: Lost task 191.2 in stage
>> 0.0 (TID 209) on executor 10.149.11.81: java.lang.RuntimeException
>> (org.apache.spark.storage.BlockNotFoundException: Block rdd_1_191 not found
>>
>> 16/01/07 04:18:53 INFO scheduler.TaskSetManager: Lost task 197.3 in stage
>> 0.0 (TID 210) on executor 10.149.11.81: java.lang.RuntimeException
>> (org.apache.spark.storage.BlockNotFoundException: Block rdd_1_197 not found
>>
>>
>> Can any one give me some suggestions? Thanks a lot!
>>
>>
>> Best Regards,
>> Jia
>>
>
>