Re: Choice of IDE for Spark

2021-10-02 Thread Christian Pfarr
We use Jupyter on Hadoop https://jupyterhub-on-hadoop.readthedocs.io/en/latest/ 
for developing Spark jobs directly inside the cluster they will run on.




With that you have direct access to YARN and HDFS (fully secured) without any 
migration steps.




You can control the size of your Jupyter YARN container and of course your 
Spark session.
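For illustration, a minimal Scala sketch of sizing such a session from a notebook
kernel against YARN; the resource values are placeholders, not recommendations:

  import org.apache.spark.sql.SparkSession

  // Build a session on YARN and size the driver and executors explicitly.
  val spark = SparkSession.builder()
    .appName("notebook-session")
    .master("yarn")
    .config("spark.executor.instances", "4")
    .config("spark.executor.memory", "4g")
    .config("spark.executor.cores", "2")
    .config("spark.driver.memory", "2g")
    .getOrCreate()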







Regards,




Christian

Original message 
On 2 Oct 2021, 01:21, Holden Karau wrote:

>
>
>
> Personally I like Jupyter notebooks for my interactive work and then once 
> I’ve done my exploration I switch back to emacs with either scala-metals or 
> Python mode.
>
>
>
>
> I think the main takeaway is: do what feels best for you, there is no one 
> true way to develop in Spark.
>
>
>
>
> On Fri, Oct 1, 2021 at 1:28 AM Mich Talebzadeh 
> <mich.talebza...@gmail.com> wrote:
>
>
> > Thanks guys for your comments.
> >
> >
> >
> >
> > I agree with you Florian that opening a terminal, say in VSC, allows you to 
> > run a shell script (an sh file) to submit your Spark code; however, this 
> > really only makes sense if your IDE is running on a Linux host submitting a 
> > job to a Kubernetes cluster or YARN cluster.
> >
> >
> >
> >
> > For Python, I will go with PyCharm, which is specific to the Python world. 
> > With Spark, I have used IntelliJ with the Spark plug-in on a Mac for development 
> > work. Then I created a JAR file, gzipped the whole project and scp'd it to an 
> > IBM sandbox, untarred it and ran it with a pre-prepared shell script with an 
> > environment plugin for dev, test, staging etc.
> >
> >
> >
> >
> > An IDE is also useful for looking at csv or tsv type files or converting json from 
> > one form to another. For json validation, especially if the file is too 
> > large, you may be restricted from loading the file into a web json validator 
> > because of the risk of proprietary data being exposed. There is a tool 
> > called jq (a lightweight and flexible command-line JSON processor) 
> > that comes in pretty handy to validate json. Download and install it on the OS and 
> > run it as
> >
> >
> >
> >
> > zcat .tgz | jq
> >
> >
> >
> >
> > That will validate the whole tarred and gzipped json file. Otherwise most 
> > of these IDE tools come with add-on plugins for various needs. My 
> > preference would be to use the best available IDE for the job. VSC I would 
> > consider as a general purpose tool. If all else fails, one can always use OS 
> > stuff like vi, vim, sed, awk etc.
> >
> >
> >
> >
> >
> >
> >
> > Cheers
> >
> >
> >
> >
> > [view my Linkedin profile]
> >
> > **Disclaimer:** Use it at your own risk. Any and all responsibility for any 
> > loss, damage or destruction of data or any other property which may arise 
> > from relying on this email's technical content is explicitly disclaimed. 
> > The author will in no case be liable for any monetary damages arising from 
> > such loss, damage or destruction.
> >
> >
> >
> >
> >
> >
> >
> > On Fri, 1 Oct 2021 at 06:55, Florian CASTELAIN 
> > <florian.castel...@redlab.io> wrote:
> >
> >
> > > Hello.
> > >
> > >
> > >
> > >
> > > Any "evolved" code editor allows you to create tasks (or builds, or 
> > > whatever they are called in the IDE you chose). If you do not find 
> > > anything that packages by default all you need, you could just create 
> > > your own tasks.
> > >
> > > *For yarn, one needs to open a terminal and submit from there.*
> > >
> > >
> > >
> > >
> > >
> > > You can create task(s) that launch your yarn commands.
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > > *With VSC, you get stuff for working with json files but I am not sure 
> > > about a plugin for Python*
> > >
> > >
> > >
> > >
> > >
> > > In your json task configuration, you can launch whatever you want: 
> > > python, shell. I bet you could launch your favorite video game (just make 
> > > a task called "let's have a break" )
> > >
> > >
> > >
> > >
> > > Just to say,

Re: Benchmarks on Spark running on Yarn versus Spark on K8s

2021-07-05 Thread Christian Pfarr



Does anyone know where the data for this benchmark was stored?







Spark on YARN gets its performance from data locality via co-location of the 
YARN NodeManager and HDFS DataNode, not from the job scheduler, right?







Regards,




z0ltrix


Original message 
On 5 July 2021, 21:27, Madaditya .Maddy wrote:

>
>
>
> I came across an article that benchmarked spark on k8s vs yarn by 
> Datamechanics.
>
>
>
>
> Link : 
> https://www.datamechanics.co/blog-post/apache-spark-performance-benchmarks-show-kubernetes-has-caught-up-with-yarn
>
>
>
>
> -Regards
>
> Aditya
>
>
>
>
> On Mon, Jul 5, 2021, 23:49 Mich Talebzadeh 
> <mich.talebza...@gmail.com> wrote:
>
>
> > Thanks Yuri. Those are very valid points.
> >
> >
> >
> >
> > Let me clarify my point. Let us assume that we will be using Yarn versus 
> > K8s doing the same job. Spark-submit will use Yarn in the first instance and 
> > will then switch to using k8s for the same task.
> >
> >
> >
> >
> > 1.  Have there been such benchmarks?
> > 2.  When should I choose PaaS versus k8s, for example for small to medium 
> > size jobs?
> > 3.  I can see the flexibility of running Spark on Dataproc, but then some may 
> > argue that k8s is the way forward.
> > 4.  Bear in mind that I am only considering Spark. For example, for Kafka 
> > and Zookeeper we opt for Docker containers as they do a single function.
> >
> >
> >
> >
> > Cheers,
> >
> >
> >
> >
> > Mich
> >
> >
> >
> >
> > [view my Linkedin profile]
> >
> > **Disclaimer:** Use it at your own risk. Any and all responsibility for any 
> > loss, damage or destruction of data or any other property which may arise 
> > from relying on this email's technical content is explicitly disclaimed. 
> > The author will in no case be liable for any monetary damages arising from 
> > such loss, damage or destruction.
> >
> >
> >
> >
> >
> >
> >
> > On Mon, 5 Jul 2021 at 19:06, Yuri Oleynikov (יורי אולייניקוב) 
> > <yur...@gmail.com> wrote:
> >
> >
> > > Not a big expert on Spark, but I don't really understand what you are 
> > > going to compare, and how. Reading and writing to and from HDFS? How is that 
> > > related to YARN and k8s? These are resource managers (YARN: Yet Another 
> > > Resource Negotiator): what and how much to allocate, and when (CPU, RAM).
> > >
> > > Local Disk spilling? Depends on disk throughput…
> > >
> > > So what are you going to measure?
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > > Best regards
> > >
> > >
> > >
> > >
> > > > On 5 Jul 2021, at 20:43, Mich Talebzadeh 
> > > > <mich.talebza...@gmail.com> wrote:
> > > >
> > > >
> > >
> > > > 
> > > >
> > > >
> > > >
> > > >
> > > > I was curious to know if there are benchmarks around on comparison 
> > > > between Spark on Yarn compared to Kubernetes.
> > > >
> > > >
> > > >
> > > >
> > > > This question arose because traditionally in Google Cloud we have been 
> > > > using Spark on Dataproc clusters. Dataproc provides Spark, 
> > > > Hadoop plus others (optional installs) for data and analytics processing. 
> > > > It is PaaS.
> > > >
> > > >
> > > >
> > > >
> > > > Now they have GKE clusters as well and have also introduced Apache Spark 
> > > > with Cloud Dataproc on Kubernetes, which allows one to submit Spark 
> > > > jobs to k8s using the Dataproc stub as a platform to submit the job, as 
> > > > below, from the cloud console or locally
> > > >
> > > >
> > > >
> > > >
> > > > gcloud dataproc jobs submit pyspark --cluster="dataproc-for-gke" 
> > > > gs://bucket/testme.py --region="europe-west2" --py-files 
> > > > gs://bucket/DSBQ.zip
> > > > Job [e5fc19b62cf744f0b13f3e6d9cc66c19] submitted.
> > > > Waiting for job output...
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > At the moment it is a struggle to see what merits using k8s instead of 
> > > > Dataproc, bar notebooks etc. Actually there is not much literature 
> > > > around on PySpark on k8s.
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > For me Spark on bare metal is the preferred option, as I cannot see how 
> > > > one can pigeonhole Spark into a container and make it performant, but I 
> > > > may be totally wrong.
> > > >
> > > >
> > > >
> > > >
> > > > Thanks
> > > >
> > > >
> > > >
> > > >
> > > > [view my Linkedin profile]
> > > >
> > > > **Disclaimer:** Use it at your own risk. Any and all responsibility for 
> > > > any loss, damage or destruction of data or any other property which may 
> > > > arise from relying on this email's technical content is explicitly 
> > > > disclaimed. The author will in no case be liable for any monetary 
> > > > damages arising from such loss, damage or 

unsubscribe

2020-01-17 Thread Christian Acuña



RE: [Spark SQL]: Slow insertInto overwrite if target table has many partitions

2019-04-26 Thread van den Heever, Christian CC
Hi,

How do I get the filename from textFileStream using streaming?

Thanks a mill
Standard Bank email disclaimer and confidentiality note
Please go to www.standardbank.co.za/site/homepage/emaildisclaimer.html to read 
our email disclaimer and confidentiality note. Kindly email 
disclai...@standardbank.co.za (no content or subject line necessary) if you 
cannot view that page and we will email our email disclaimer and 
confidentiality note to you.


RE: Does pyspark support python3.6?

2017-11-01 Thread van den Heever, Christian CC
Dear Spark users

I have been asked to provide a presentation / business case as to why to use 
Spark and Java as an ingestion tool for HDFS and Hive, and why to move away 
from an ETL tool.

Could you be so kind as to provide some pros and cons on this?

I have the following:

Pros:
In-house build – code can be changed on the fly to suit business needs.
Software is free.
Can run on all nodes out of the box.
Supports all Apache-based software.
Fast due to in-memory processing.
Spark UI can visualise execution.
Supports checkpointed data loads.
Supports a schema registry for custom schemas and inference.
Supports YARN execution.
MLlib can be used if needed.
Data lineage support due to Spark usage.

Cons
Skills needed to maintain and build.
In-memory capability can become a bottleneck if not managed.
No ETL GUI.

Maybe point me to an article if you have one.

Thanks a mill.
Christian

Standard Bank email disclaimer and confidentiality note
Please go to www.standardbank.co.za/site/homepage/emaildisclaimer.html to read 
our email disclaimer and confidentiality note. Kindly email 
disclai...@standardbank.co.za (no content or subject line necessary) if you 
cannot view that page and we will email our email disclaimer and 
confidentiality note to you.


RE: Is Spark suited for this use case?

2017-10-15 Thread van den Heever, Christian CC
Hi,

We basically have the same scenario but worldwide; as we have bigger datasets we 
use OGG --> local --> Sqoop into Hadoop.
By all means you can have Spark read the Oracle tables and then make some 
changes to the data where needed, which will not be done in the Sqoop query, 
e.g. fraud detection on transaction records.
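A minimal Scala sketch of that kind of Spark-side read and change, assuming the
Oracle JDBC driver is on the classpath; the connection details, table and filter
rule are placeholders:

  import org.apache.spark.sql.SparkSession

  val spark = SparkSession.builder().appName("oracle-ingest").getOrCreate()

  // Read the Oracle table over JDBC.
  val df = spark.read
    .format("jdbc")
    .option("url", "jdbc:oracle:thin:@//dbhost:1521/SERVICE")
    .option("dbtable", "SCHEMA.TRANSACTIONS")
    .option("user", "username")
    .option("password", "password")
    .load()

  // Example of a change a plain Sqoop query would not do (placeholder rule).
  val flagged = df.filter("AMOUNT > 10000")

  // Land the result in Hadoop.
  flagged.write.mode("overwrite").parquet("hdfs:///data/transactions_flagged")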

But sometimes the simplest way is the best. Unless you need a change or need 
more, I would advise not using another hop.
I would rather move away from files, as OGG can do files and direct table 
loading, then Sqoop for the rest.

Simpler is better.

Hope this helps.
C.

From: Saravanan Thirumalai [mailto:saravanan.thiruma...@gmail.com]
Sent: Monday, 16 October 2017 4:29 AM
To: user@spark.apache.org
Subject: Is Spark suited for this use case?

We are an Investment firm and have a MDM platform in oracle at a vendor 
location and use Oracle Golden Gate to replicat data to our data center for 
reporting needs.
Our data is not big data (total size 6 TB including 2 TB of archive data). 
Moreover our data doesn't get updated often, nightly once (around 50 MB) and 
some correction transactions during the day (<10 MB). We don't have external 
users and hence data doesn't grow real-time like e-commerce.

When we replicate data from source to target, we transfer data through files. 
So, if there are DML operations (corrections) during day time on a source 
table, the corresponding file would have probably 100 lines of table data that 
needs to be loaded into the target database. Due to low volume of data we 
designed this through Informatica and this works in less than 2-5 minutes. Can 
Spark be used in this case or would it be an overkill of technology use?



Standard Bank email disclaimer and confidentiality note
Please go to www.standardbank.co.za/site/homepage/emaildisclaimer.html to read 
our email disclaimer and confidentiality note. Kindly email 
disclai...@standardbank.co.za (no content or subject line necessary) if you 
cannot view that page and we will email our email disclaimer and 
confidentiality note to you.


[ANNOUNCE] Apache Bahir 2.1.0 Released

2017-02-22 Thread Christian Kadner
The Apache Bahir community is pleased to announce the release
of Apache Bahir 2.1.0 which provides the following extensions for
Apache Spark 2.1.0:

   - Akka Streaming
   - MQTT Streaming
   - MQTT Structured Streaming
   - Twitter Streaming
   - ZeroMQ Streaming

For more information about Apache Bahir and to download the
latest release go to:

http://bahir.apache.org

The Apache Bahir streaming connectors are also available at:

https://spark-packages.org/?q=bahir

---
Best regards,
Christian Kadner


[ANNOUNCE] Apache Bahir 2.0.2

2017-01-28 Thread Christian Kadner
The Apache Bahir PMC approved the release of Apache Bahir 2.0.2
which provides the following extensions for Apache Spark 2.0.2:

   - Akka Streaming
   - MQTT Streaming
   - MQTT Structured Streaming
   - Twitter Streaming
   - ZeroMQ Streaming

For more information about Apache Bahir and to download the
latest release go to:

http://bahir.apache.org

The Apache Bahir streaming connectors are also available at:

https://spark-packages.org/?q=bahir

---
Best regards,
Christian Kadner

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark SQL Nested Array of JSON with empty field

2016-06-03 Thread Christian Hellström
If that's your JSON file, then the first problem is that it's incorrectly
formatted.

Apart from that you can just read the JSON into a DataFrame with
sqlContext.read.json() and then select directly on the DataFrame without
having to register a temporary table: jsonDF.select("firstname",
"address.state", ...). Works for me (with a properly formatted JSON
document). To make sure that your JSON is read correctly, check
jsonDF.printSchema. If there is an entry with corrupt records (or similar),
you know there's a problem with the JSON structure.
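A minimal sketch of that approach in Scala (Spark 1.x API); the path is a
placeholder:

  // Read the (properly formatted) JSON into a DataFrame.
  val jsonDF = sqlContext.read.json("hdfs:///data/people.json")

  // A _corrupt_record column here signals malformed JSON.
  jsonDF.printSchema()

  // Select top-level and nested fields directly, no temp table needed;
  // records without a middlename simply come back as null.
  jsonDF.select("firstname", "middlename", "lastname",
                "address.state", "address.city").show()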

On 3 June 2016 at 21:31, Jerry Wong  wrote:

> Hi,
>
> I met a problem of empty field in the nested JSON file with Spark SQL. For
> instance,
> There are two lines of JSON file as follows,
>
> {
> "firstname": "Jack",
> "lastname": "Nelson",
> "address": {
> "state": "New York",
> "city": "New York"
> }
> }{
> "firstname": "Landy",
> "middlename": "Ken",
> "lastname": "Yong",
> "address": {
> "state": "California",
> "city": "Los Angles"
> }
> }
>
> I use Spark SQL to get the files like,
> val row = sqlContext.sql("SELECT firstname, middlename, lastname,
> address.state, address.city FROM jsontable")
> The compiler will tell me there is an error on line 1: no "middlename".
> How do I handle this case in Spark SQL?
>
> Many thanks in advance!
> Jerry
>
>
>


RE: Spark Streaming heap space out of memory

2016-05-30 Thread Dancuart, Christian
While it has heap space, batches run well below 15 seconds.

Once it starts to run out of space, processing time takes about 1.5 minutes. 
Scheduling delay is around 4 minutes and total delay around 5.5 minutes. I 
usually shut it down at that point.

The number of stages (and pending stages) does seem to be quite high and 
increases over time.

Job Id  Description                                   Submitted            Duration  Stages: Succeeded/Total  Tasks: Succeeded/Total
4584    foreachRDD at HDFSPersistence.java:52         2016/05/30 16:23:52  1.9 min   36/36 (4964 skipped)     285/285 (28026 skipped)
4586    transformToPair at SampleCalculator.java:88   2016/05/30 16:25:02  0.2 s     1/1                      4/4
4585    (Unknown Stage Name)                          2016/05/30 16:23:52  1.2 min   1/1                      1/1
4582    (Unknown Stage Name)                          2016/05/30 16:21:51  48 s      1/1 (4063 skipped)       12/12 (22716 skipped)
4583    (Unknown Stage Name)                          2016/05/30 16:21:51  48 s      1/1                      1/1
4580    (Unknown Stage Name)                          2016/05/30 16:16:38  4.0 min   36/36 (4879 skipped)     285/285 (27546 skipped)
4581    (Unknown Stage Name)                          2016/05/30 16:16:38  0.1 s     1/1                      4/4
4579    (Unknown Stage Name)                          2016/05/30 16:15:53  45 s      1/1                      1/1
4578    (Unknown Stage Name)                          2016/05/30 16:14:38  1.3 min   1/1 (3993 skipped)       12/12 (22326 skipped)
4577    (Unknown Stage Name)                          2016/05/30 16:14:37  0.8 s     1/1                      1/1

Is this what you mean by pending stages?

I have taken a few heap dumps but I’m not sure what I am looking at for the 
problematic classes.

From: Shahbaz [mailto:shahzadh...@gmail.com]
Sent: 2016, May, 30 3:25 PM
To: Dancuart, Christian
Cc: user
Subject: Re: Spark Streaming heap space out of memory

Hi Christian,


  *   What is the processing time of each of your batches, is it exceeding 15 
seconds?
  *   How many jobs are queued?
  *   Can you take a heap dump and see which objects are occupying the heap?

Regards,
Shahbaz


On Tue, May 31, 2016 at 12:21 AM, <christian.dancu...@rbc.com> wrote:
Hi All,

We have a spark streaming v1.4/java 8 application that slows down and
eventually runs out of heap space. The less driver memory, the faster it
happens.

Appended is our spark configuration and a snapshot of the heap taken
using jmap on the driver process. The RDDInfo, $colon$colon and [C objects
keep growing as we observe. We also tried to use G1GC, but it acts the same.

Our dependency graph contains multiple updateStateByKey() calls. For each,
we explicitly set the checkpoint interval to 240 seconds.
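For reference, a minimal Scala sketch of that pattern; the names, host and paths
are placeholders:

  import org.apache.spark.SparkConf
  import org.apache.spark.streaming.{Seconds, StreamingContext}

  val conf = new SparkConf().setAppName("stateful-stream")
  val ssc = new StreamingContext(conf, Seconds(15))        // 15 s batch interval
  ssc.checkpoint("hdfs:///checkpoints/stateful-stream")    // required for state

  val events = ssc.socketTextStream("host", 9999).map(w => (w, 1L))

  def updateCounts(newValues: Seq[Long], state: Option[Long]): Option[Long] =
    Some(newValues.sum + state.getOrElse(0L))

  val counts = events.updateStateByKey(updateCounts _)
  counts.checkpoint(Seconds(240))  // explicit checkpoint interval per stateful stream
  counts.print()

  ssc.start()
  ssc.awaitTermination()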

We have our batch interval set to 15 seconds; with no delays at the start of
the process.

Spark configuration (Spark Driver Memory: 6GB, Spark Executor Memory: 2GB):
spark.streaming.minRememberDuration=180s
spark.ui.showConsoleProgress=false
spark.streaming.receiver.writeAheadLog.enable=true
spark.streaming.unpersist=true
spark.streaming.stopGracefullyOnShutdown=true
spark.streaming.ui.retainedBatches=10
spark.ui.retainedJobs=10
spark.ui.retainedStages=10
spark.worker.ui.retainedExecutors=10
spark.worker.ui.retainedDrivers=10
spark.sql.ui.retainedExecutions=10
spark.serializer=org.apache.spark.serializer.KryoSerializer
spark.kryoserializer.buffer.max=128m

num #instances #bytes  class name
--
   1:   8828200  565004800  org.apache.spark.storage.RDDInfo
   2:  20794893  499077432  scala.collection.immutable.$colon$colon
   3:   9646097  459928736  [C
   4:   9644398  231465552  java.lang.String
   5:  12760625  20417  java.lang.Integer
   6: 21326  98632  [B
   7:556959   44661232  [Lscala.collection.mutable.HashEntry;
   8:   1179788   37753216
java.util.concurrent.ConcurrentHashMap$Node
   9:   1169264   37416448  java.util.Hashtable$Entry
  10:552707   30951592  org.apache.spark.scheduler.StageInfo
  11:367107   23084712  [Ljava.lang.Object;
  12:556948   22277920  scala.collection.mutable.HashMap
  13:  2787   22145568
[Ljava.util.concurrent.ConcurrentHashMap$Node;
  14:116997   12167688  org.apache.spark.executor.TaskMetrics
  15:3604258650200
java.util.concurrent.LinkedBlockingQueue$Node
  16:3604178650008
org.apache.spark.deploy.history.yarn.HandleSparkEvent
  17:  83328478088  [Ljava.util.Hashtable$Entry;
  18:3510618425464  scala.collection.mutable.ArrayBuffer
  19:1169638421336  org.apache.spark.scheduler.TaskInfo
  20:4461367138176  scala.Some
  21:2119685087232
io.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry
  22:1169634678520
org.apache.spark.scheduler.SparkListenerTaskEnd
  23:1076794307160
org.apache.spark.executor.ShuffleWriteMetrics
  24: 72

Re: problem about RDD map and then saveAsTextFile

2016-05-27 Thread Christian Hellström
Internally, saveAsTextFile uses saveAsHadoopFile:
https://github.com/apache/spark/blob/d5911d1173fe0872f21cae6c47abf8ff479345a4/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
.

The final bit in the method first creates the output path and then saves
the data set. However, if there is an issue with the saveAsHadoopDataset
call, the path still remains. Technically, we could add an
exception-handling section that removes the path in case of problems. I
think that would be a nice way of making sure that we don’t litter the FS
with empty files and directories in case of exceptions.

So, to your question: parameter to saveAsTextFile is a path (not a file)
and it has to be empty. Spark automatically names the files PART-N with N
the partition number. This follows immediately from the partitioning scheme
of the RDD itself.
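A minimal sketch of guarding the output path before saving, assuming `sc`,
`result` and `transform` from the question below; the path is a placeholder:

  import org.apache.hadoop.fs.{FileSystem, Path}

  val outputPath = new Path("hdfs:///user/me/result")
  val fs = FileSystem.get(sc.hadoopConfiguration)
  // saveAsTextFile refuses to write into an existing directory,
  // so remove any leftovers from a failed attempt first.
  if (fs.exists(outputPath)) fs.delete(outputPath, true)

  // Writes part-00000, part-00001, ... one file per partition.
  result.map(transform).saveAsTextFile(outputPath.toString)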

The real problem is with the calculation itself. You might
want to fix that first. Just post the relevant bits from the log.
Hi all:
 I’ve tried to execute something as below:

 result.map(transform).saveAsTextFile(hdfsAddress)

 Result is a RDD caluculated from mlilib algorithm.


I submit this to yarn, and after two attempts, the application failed.

But the exception in the log is very misleading. It said hdfsAddress already
exists.

Actually, the log of the first attempt showed that the exception came from the
calculation of result. Though the attempt failed, it created the file. And then
attempt 2 began with the exception ‘file already exists’.


 Why was the file created even though the RDD calculation had already failed?
That’s not so good, I think.


REST-API for Killing a Streaming Application

2016-03-24 Thread Christian Kurz

Hello Spark Streaming Gurus,

for better automation I need to manage my Spark Streaming Applications 
remotely. These applications read from Kafka and therefore have a 
receiver job and are started via spark-submit. For now I have only found 
a REST-API for killing Spark applications remotely, but that only works 
if the Spark application runs on a Standalone Spark Server.


Q1: Is it correct that the Spark WebUI does not provide any way of 
killing an application via a REST call? Should I file a JIRA for this?



My second question/ problem is around the fact that even for the 
Standalone Spark Server I am unable to get timely/complete shutdowns:


I see long delays until the spark-submit OS process terminates after the 
kill has been sent. This delay happens even though spark-submit 
immediately recognizes the kill on stdout:


org.apache.spark.SparkException: Job aborted due to stage failure: 
Master removed our application: KILLED


But then it hangs, and to me it looks like the Spark receiver is never shut 
down and may even be running for ever on the Spark Standalone server: 
the application is marked in status "KILLED", but the link "Application 
Detail UI" still works and shows that "Streaming job running receiver 0 
(Job 2)" is still running.


Note 1: when I kill the same spark-submit job using Ctrl-C, the 
application immediately stops as expected. Including the shutdown of 
Kafka Receiver. On the Spark Standalone server the application is then 
in status "FINISHED" and the link "Application Detail UI" takes me to 
"Application history not found (app-20160324022723-0015)".


Q2: Should the REST-API call have (immediately) shut down my 
spark-submit job? Should I file a JIRA for this problem?



Background: Even thought Ctrl-C on spark-submit seems to work fine, this 
is no options for my Spark automation. Besides general design concerns 
my monitoring JVM may have been restarted after spark-submit has been 
started and therefore I cannot rely on the monitoring application to 
have access to the spark-submit OS process.



Any thoughts are much appreciated,
Christian


Re: Spark Streaming - stream between 2 applications

2015-11-21 Thread Christian
Instead of sending the results of the one spark app directly to the other
one, you could write the results to a Kafka topic which is consumed by your
other spark application.
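A rough Scala sketch of that handoff, assuming a DStream[String] called `results`
in the producing app, a StreamingContext `ssc` in the consuming app, and a broker
and topic of your own (requires the kafka-clients and spark-streaming-kafka
dependencies):

  // Producing side: one Kafka producer per partition.
  import java.util.Properties
  import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

  results.foreachRDD { rdd =>
    rdd.foreachPartition { records =>
      val props = new Properties()
      props.put("bootstrap.servers", "broker1:9092")
      props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
      props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
      val producer = new KafkaProducer[String, String](props)
      records.foreach(r => producer.send(new ProducerRecord[String, String]("results-topic", r)))
      producer.close()
    }
  }

  // Consuming side (the other streaming application), Spark 1.x direct stream.
  import kafka.serializer.StringDecoder
  import org.apache.spark.streaming.kafka.KafkaUtils

  val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
    ssc, Map("metadata.broker.list" -> "broker1:9092"), Set("results-topic"))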

On Fri, Nov 20, 2015 at 12:07 PM Saiph Kappa <saiph.ka...@gmail.com> wrote:

> I think my problem persists whether I use Kafka or sockets. Or am I wrong?
> How would you use Kafka here?
>
> On Fri, Nov 20, 2015 at 7:12 PM, Christian <engr...@gmail.com> wrote:
>
>> Have you considered using Kafka?
>>
>> On Fri, Nov 20, 2015 at 6:48 AM Saiph Kappa <saiph.ka...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> I have a basic spark streaming application like this:
>>>
>>> «
>>> ...
>>>
>>> val ssc = new StreamingContext(sparkConf, Duration(batchMillis))
>>> val rawStreams = (1 to numStreams).map(_ =>
>>>   ssc.rawSocketStream[String](host, port, 
>>> StorageLevel.MEMORY_ONLY_SER)).toArray
>>> val union = ssc.union(rawStreams)
>>>
>>> union.flatMap(line => line.split(' ')).foreachRDD(rdd => {
>>>
>>>   // TODO
>>>
>>> })
>>> ...
>>> »
>>>
>>>
>>> My question is: what is the best and fastest way to send the resulting rdds
>>> as input to be consumed by another spark streaming application?
>>>
>>> I tried to add this code in place of the "TODO" comment:
>>>
>>> «
>>> val serverSocket = new ServerSocket(9998)
>>> while (true) {
>>>   val socket = serverSocket.accept()
>>>   @transient val out = new PrintWriter(socket.getOutputStream)
>>>   try {
>>> rdd.foreach(out.write)
>>>   } catch {
>>> case e: IOException =>
>>>   socket.close()
>>>   }
>>> }
>>> »
>>>
>>>
>>> I also tried to create a thread in the driver application code to launch the
>>> socket server and then share state (the PrintWriter object) between the 
>>> driver program and tasks.
>>> But got an exception saying that task is not serializable - PrintWriter is 
>>> not serializable
>>> (despite the @transient annotation). I know this is not a very elegant 
>>> solution, but what other
>>> directions should I explore?
>>>
>>> Thanks.
>>>
>>>
>


Spark EC2 script on Large clusters

2015-11-05 Thread Christian
For starters, thanks for the awesome product!

When creating ec2-clusters of 20-40 nodes, things work great. When we
create a cluster with the provided spark-ec2 script, it takes hours. When
creating a 200 node cluster, it takes 2 1/2 hours and for a 500 node
cluster it takes over 5 hours. One other problem we are having is that some
nodes don't come up when the other ones do, the process seems to just move
on, skipping the rsync and any installs on those ones.

My guess as to why it takes so long to set up a large cluster is because of
the use of rsync. What if instead of using rsync, you synched to s3 and
then did a pdsh to pull it down on all of the machines. This is a big deal
for us and if we can come up with a good plan, we might be able help out
with the required changes.

Are there any suggestions on how to deal with some of the nodes not being
ready when the process starts?

Thanks for your time,
Christian


Re: Spark EC2 script on Large clusters

2015-11-05 Thread Christian
Let me rephrase. Emr cost is about twice as much as the spot price, making
it almost 2/3 of the overall cost.
On Thu, Nov 5, 2015 at 11:50 AM Christian <engr...@gmail.com> wrote:

> Hi Johnathan,
>
> We are using EMR now and it's costing way too much. We do spot pricing and
> the emr addon cost is about 2/3 the price of the actual spot instance.
> On Thu, Nov 5, 2015 at 11:31 AM Jonathan Kelly <jonathaka...@gmail.com>
> wrote:
>
>> Christian,
>>
>> Is there anything preventing you from using EMR, which will manage your
>> cluster for you? Creating large clusters would take mins on EMR instead of
>> hours. Also, EMR supports growing your cluster easily and recently added
>> support for shrinking your cluster gracefully (even while jobs are running).
>>
>> ~ Jonathan
>>
>> On Thu, Nov 5, 2015 at 9:48 AM, Nicholas Chammas <
>> nicholas.cham...@gmail.com> wrote:
>>
>>> Yeah, as Shivaram mentioned, this issue is well-known. It's documented
>>> in SPARK-5189 <https://issues.apache.org/jira/browse/SPARK-5189> and a
>>> bunch of related issues. Unfortunately, it's hard to resolve this issue in
>>> spark-ec2 without rewriting large parts of the project. But if you take a
>>> crack at it and succeed I'm sure a lot of people will be happy.
>>>
>>> I've started a separate project <https://github.com/nchammas/flintrock> --
>>> which Shivaram also mentioned -- which aims to solve the problem of
>>> long launch times and other issues
>>> <https://github.com/nchammas/flintrock#motivation> with spark-ec2. It's
>>> still very young and lacks several critical features, but we are making
>>> steady progress.
>>>
>>> Nick
>>>
>>> On Thu, Nov 5, 2015 at 12:30 PM Shivaram Venkataraman <
>>> shiva...@eecs.berkeley.edu> wrote:
>>>
>>>> It is a known limitation that spark-ec2 is very slow for large
>>>> clusters and as you mention most of this is due to the use of rsync to
>>>> transfer things from the master to all the slaves.
>>>>
>>>> Nick cc'd has been working on an alternative approach at
>>>> https://github.com/nchammas/flintrock that is more scalable.
>>>>
>>>> Thanks
>>>> Shivaram
>>>>
>>>> On Thu, Nov 5, 2015 at 8:12 AM, Christian <engr...@gmail.com> wrote:
>>>> > For starters, thanks for the awesome product!
>>>> >
>>>> > When creating ec2-clusters of 20-40 nodes, things work great. When we
>>>> create
>>>> > a cluster with the provided spark-ec2 script, it takes hours. When
>>>> creating
>>>> > a 200 node cluster, it takes 2 1/2 hours and for a 500 node cluster
>>>> it takes
>>>> > over 5 hours. One other problem we are having is that some nodes
>>>> don't come
>>>> > up when the other ones do, the process seems to just move on,
>>>> skipping the
>>>> > rsync and any installs on those ones.
>>>> >
>>>> > My guess as to why it takes so long to set up a large cluster is
>>>> because of
>>>> > the use of rsync. What if instead of using rsync, you synched to s3
>>>> and then
>>>> > did a pdsh to pull it down on all of the machines. This is a big deal
>>>> for us
>>>> > and if we can come up with a good plan, we might be able help out
>>>> with the
>>>> > required changes.
>>>> >
>>>> > Are there any suggestions on how to deal with some of the nodes not
>>>> being
>>>> > ready when the process starts?
>>>> >
>>>> > Thanks for your time,
>>>> > Christian
>>>> >
>>>>
>>>
>>


Re: Spark EC2 script on Large clusters

2015-11-05 Thread Christian
Hi Johnathan,

We are using EMR now and it's costing way too much. We do spot pricing and
the emr addon cost is about 2/3 the price of the actual spot instance.
On Thu, Nov 5, 2015 at 11:31 AM Jonathan Kelly <jonathaka...@gmail.com>
wrote:

> Christian,
>
> Is there anything preventing you from using EMR, which will manage your
> cluster for you? Creating large clusters would take mins on EMR instead of
> hours. Also, EMR supports growing your cluster easily and recently added
> support for shrinking your cluster gracefully (even while jobs are running).
>
> ~ Jonathan
>
> On Thu, Nov 5, 2015 at 9:48 AM, Nicholas Chammas <
> nicholas.cham...@gmail.com> wrote:
>
>> Yeah, as Shivaram mentioned, this issue is well-known. It's documented in
>> SPARK-5189 <https://issues.apache.org/jira/browse/SPARK-5189> and a
>> bunch of related issues. Unfortunately, it's hard to resolve this issue in
>> spark-ec2 without rewriting large parts of the project. But if you take a
>> crack at it and succeed I'm sure a lot of people will be happy.
>>
>> I've started a separate project <https://github.com/nchammas/flintrock> --
>> which Shivaram also mentioned -- which aims to solve the problem of long
>> launch times and other issues
>> <https://github.com/nchammas/flintrock#motivation> with spark-ec2. It's
>> still very young and lacks several critical features, but we are making
>> steady progress.
>>
>> Nick
>>
>> On Thu, Nov 5, 2015 at 12:30 PM Shivaram Venkataraman <
>> shiva...@eecs.berkeley.edu> wrote:
>>
>>> It is a known limitation that spark-ec2 is very slow for large
>>> clusters and as you mention most of this is due to the use of rsync to
>>> transfer things from the master to all the slaves.
>>>
>>> Nick cc'd has been working on an alternative approach at
>>> https://github.com/nchammas/flintrock that is more scalable.
>>>
>>> Thanks
>>> Shivaram
>>>
>>> On Thu, Nov 5, 2015 at 8:12 AM, Christian <engr...@gmail.com> wrote:
>>> > For starters, thanks for the awesome product!
>>> >
>>> > When creating ec2-clusters of 20-40 nodes, things work great. When we
>>> create
>>> > a cluster with the provided spark-ec2 script, it takes hours. When
>>> creating
>>> > a 200 node cluster, it takes 2 1/2 hours and for a 500 node cluster it
>>> takes
>>> > over 5 hours. One other problem we are having is that some nodes don't
>>> come
>>> > up when the other ones do, the process seems to just move on, skipping
>>> the
>>> > rsync and any installs on those ones.
>>> >
>>> > My guess as to why it takes so long to set up a large cluster is
>>> because of
>>> > the use of rsync. What if instead of using rsync, you synched to s3
>>> and then
>>> > did a pdsh to pull it down on all of the machines. This is a big deal
>>> for us
>>> > and if we can come up with a good plan, we might be able help out with
>>> the
>>> > required changes.
>>> >
>>> > Are there any suggestions on how to deal with some of the nodes not
>>> being
>>> > ready when the process starts?
>>> >
>>> > Thanks for your time,
>>> > Christian
>>> >
>>>
>>
>


Re: Spark RDD cache persistence

2015-11-05 Thread Christian
I've never had this need and I've never done it. There are options that
allow this. For example, I know there are web apps out there that work like
the Spark REPL. One of these I think is called Zeppelin. I've never used
them, but I've seen them demoed. There is also Tachyon, which Spark
supports. Hopefully, that gives you a place to start.
On Thu, Nov 5, 2015 at 9:21 PM Deepak Sharma <deepakmc...@gmail.com> wrote:

> Thanks Christian.
> So is there any inbuilt mechanism in Spark, or API integration with other
> in-memory cache products such as Redis, to load the RDD into these systems upon
> program exit?
> What's the best approach to have a long-lived RDD cache?
> Thanks
>
>
> Deepak
> On 6 Nov 2015 8:34 am, "Christian" <engr...@gmail.com> wrote:
>
>> The cache gets cleared out when the job finishes. I am not aware of a way
>> to keep the cache around between jobs. You could save it as an object file
>> to disk and load it as an object file on your next job for speed.
>> On Thu, Nov 5, 2015 at 6:17 PM Deepak Sharma <deepakmc...@gmail.com>
>> wrote:
>>
>>> Hi All
>>> I am confused on RDD persistence in cache .
>>> If I cache RDD , is it going to stay there in memory even if my spark
>>> program completes execution , which created it.
>>> If not , how can I guarantee that RDD is persisted in cache even after
>>> the program finishes execution.
>>>
>>> Thanks
>>>
>>>
>>> Deepak
>>>
>>


Re: Spark RDD cache persistence

2015-11-05 Thread Christian
The cache gets cleared out when the job finishes. I am not aware of a way
to keep the cache around between jobs. You could save it as an object file
to disk and load it as an object file on your next job for speed.
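A minimal sketch of that object-file handoff, assuming an RDD called
`expensiveRDD` of (String, Int) pairs; the path is a placeholder:

  // First job: persist the computed RDD to HDFS as an object file.
  expensiveRDD.saveAsObjectFile("hdfs:///cache/expensive-rdd")

  // Next job: reload it and cache it in memory again.
  val reloaded = sc.objectFile[(String, Int)]("hdfs:///cache/expensive-rdd")
  reloaded.cache()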
On Thu, Nov 5, 2015 at 6:17 PM Deepak Sharma  wrote:

> Hi All
> I am confused on RDD persistence in cache .
> If I cache RDD , is it going to stay there in memory even if my spark
> program completes execution , which created it.
> If not , how can I guarantee that RDD is persisted in cache even after the
> program finishes execution.
>
> Thanks
>
>
> Deepak
>


streaming and piping to R, sending all data in window to pipe()

2015-07-17 Thread PAULI, KEVIN CHRISTIAN [AG-Contractor/1000]
Spark newbie here, using Spark 1.3.1.

I’m consuming a stream and trying to pipe the data from the entire window to R 
for analysis.  The R algorithm needs the entire dataset from the stream 
(everything in the window) in order to function properly; it can’t be broken up.

So I tried doing a coalesce(1) before calling pipe(), but it still seems to be 
breaking up the data and invoking R multiple times with small pieces of data.  Is 
there some other approach I should try?

Here’s a small snippet:

val inputs: DStream[String] = MQTTUtils.createStream(ssc, mqttBrokerUrl, 
inputsTopic, StorageLevel.MEMORY_AND_DISK_SER)
  .window(duration)
inputs.foreachRDD {
  windowRdd => {
    if (windowRdd.count() > 0) processWindow(windowRdd)
  }
}

...

  def processWindow(windowRdd: RDD[String]) = {
// call R script to process data
windowRdd.coalesce(1)
val outputsRdd: RDD[String] = 
windowRdd.pipe(SparkFiles.get(Paths.get(rScript).getFileName.toString))
outputsRdd.cache()

    if (outputsRdd.count() > 0) processOutputs(outputsRdd)
  }
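One detail worth noting about the snippet: coalesce returns a new RDD rather than
changing windowRdd in place, so the coalesced RDD has to be the one that is piped.
A sketch of that variant, reusing the names (and implied imports) from the snippet
above:

  def processWindow(windowRdd: RDD[String]) = {
    // All window data ends up in a single partition, so the R script
    // is invoked once with the whole dataset.
    val single = windowRdd.coalesce(1)
    val outputsRdd: RDD[String] =
      single.pipe(SparkFiles.get(Paths.get(rScript).getFileName.toString))
    outputsRdd.cache()

    if (outputsRdd.count() > 0) processOutputs(outputsRdd)
  }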

...

This e-mail message may contain privileged and/or confidential information, and 
is intended to be received only by persons entitled
to receive such information. If you have received this e-mail in error, please 
notify the sender immediately. Please delete it and
all attachments from any servers, hard drives or any other media. Other use of 
this e-mail by you is strictly prohibited.

All e-mails and attachments sent and received are subject to monitoring, 
reading and archival by Monsanto, including its
subsidiaries. The recipient of this e-mail is solely responsible for checking 
for the presence of Viruses or other Malware.
Monsanto, along with its subsidiaries, accepts no liability for any damage 
caused by any such code transmitted by or accompanying
this e-mail or any attachment.


The information contained in this email may be subject to the export control 
laws and regulations of the United States, potentially
including but not limited to the Export Administration Regulations (EAR) and 
sanctions regulations issued by the U.S. Department of
Treasury, Office of Foreign Asset Controls (OFAC).  As a recipient of this 
information you are obligated to comply with all
applicable U.S. export laws and regulations.


Re: Super slow caching in 1.3?

2015-04-27 Thread Christian Perez
Michael,

There is only one schema: both versions have 200 string columns in one file.

On Mon, Apr 20, 2015 at 9:08 AM, Evo Eftimov evo.efti...@isecc.com wrote:
 Now this is very important:



 “Normal RDDs” refers to “batch RDDs”. However, the default in-memory
 representation of RDDs which are part of a DStream is “serialized” rather than
 actual (hydrated) objects. The Spark documentation states that
 “serialization” is required for space and garbage collection efficiency (but
 creates higher CPU load) – which makes sense considering the large number of
 RDDs which get discarded in a streaming app



 So what does Databricks actually recommend as the object-oriented model for RDD
 elements used in Spark Streaming apps – flat or not – and can you provide a
 detailed description / spec of both



 From: Michael Armbrust [mailto:mich...@databricks.com]
 Sent: Thursday, April 16, 2015 7:23 PM
 To: Evo Eftimov
 Cc: Christian Perez; user


 Subject: Re: Super slow caching in 1.3?



 Here are the types that we specialize, other types will be much slower.
 This is only for Spark SQL, normal RDDs do not serialize data that is
 cached.  I'll also note that until yesterday we were missing FloatType

 https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala#L154



 Christian, can you provide the schema of the fast and slow datasets?



 On Thu, Apr 16, 2015 at 10:14 AM, Evo Eftimov evo.efti...@isecc.com wrote:

 Michael what exactly do you mean by flattened version/structure here e.g.:

 1. An Object with only primitive data types as attributes
 2. An Object with  no more than one level of other Objects as attributes
 3. An Array/List of primitive types
 4. An Array/List of Objects

 This question is in general about RDDs not necessarily RDDs in the context
 of SparkSQL

 When answering can you also score how bad the performance of each of the
 above options is


 -Original Message-
 From: Christian Perez [mailto:christ...@svds.com]
 Sent: Thursday, April 16, 2015 6:09 PM
 To: Michael Armbrust
 Cc: user
 Subject: Re: Super slow caching in 1.3?

 Hi Michael,

 Good question! We checked 1.2 and found that it is also slow caching the
 same flat parquet file. Caching other file formats of the same data were
 faster by up to a factor of ~2. Note that the parquet file was created in
 Impala but the other formats were written by Spark SQL.

 Cheers,

 Christian

 On Mon, Apr 6, 2015 at 6:17 PM, Michael Armbrust mich...@databricks.com
 wrote:
 Do you think you are seeing a regression from 1.2?  Also, are you
 caching nested data or flat rows?  The in-memory caching is not really
 designed for nested data and so performs pretty slowly here (it's just
 falling back to kryo and even then there are some locking issues).

 If so, would it be possible to try caching a flattened version?

 CACHE TABLE flattenedTable AS SELECT ... FROM parquetTable

 On Mon, Apr 6, 2015 at 5:00 PM, Christian Perez christ...@svds.com
 wrote:

 Hi all,

 Has anyone else noticed very slow time to cache a Parquet file? It
 takes 14 s per 235 MB (1 block) uncompressed node local Parquet file
 on M2 EC2 instances. Or are my expectations way off...

 Cheers,

 Christian

 --
 Christian Perez
 Silicon Valley Data Science
 Data Analyst
 christ...@svds.com
 @cp_phd

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For
 additional commands, e-mail: user-h...@spark.apache.org





 --
 Christian Perez
 Silicon Valley Data Science
 Data Analyst
 christ...@svds.com
 @cp_phd

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional
 commands, e-mail: user-h...@spark.apache.org





-- 
Christian Perez
Silicon Valley Data Science
Data Analyst
christ...@svds.com
@cp_phd

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Pyspark where do third parties libraries need to be installed under Yarn-client mode

2015-04-24 Thread Christian Perez
To run MLlib, you only need numpy on each node. For additional
dependencies, you can call the spark-submit with --py-files option and
add the .zip or .egg.

https://spark.apache.org/docs/latest/submitting-applications.html

Cheers,

Christian

On Fri, Apr 24, 2015 at 1:56 AM, Hoai-Thu Vuong thuv...@gmail.com wrote:
 I use sudo pip install ... for each machine in the cluster, and don't think
 about how to submit the library.

 On Fri, Apr 24, 2015 at 4:21 AM dusts66 dustin.davids...@gmail.com wrote:

 I am trying to figure out python library management.  So my question is:
 Where do third-party Python libraries (e.g. numpy, scipy, etc.) need to exist
 if I'm running a spark job via 'spark-submit' against my cluster in 'yarn
 client' mode.  Do the libraries need to only exist on the client (i.e. the
 server executing the driver code) or do the libraries need to exist on the
 datanode/worker nodes where the tasks are executed?  The documentation
 seems
 to indicate that under 'yarn client' the libraries are only need on the
 client machine not the entire cluster.  If the libraries are needed across
 all cluster machines, any suggestions on a deployment strategy or
 dependency
 management model that works well?

 Thanks



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Pyspark-where-do-third-parties-libraries-need-to-be-installed-under-Yarn-client-mode-tp22639.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





-- 
Christian Perez
Silicon Valley Data Science
Data Analyst
christ...@svds.com
@cp_phd

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: MLlib - Collaborative Filtering - trainImplicit task size

2015-04-23 Thread Christian S. Perone
All these warnings come from ALS iterations, from flatMap and also from
aggregate, for instance the origin of the state where the flatMap is
showing these warnings (w/ Spark 1.3.0, they are also shown in Spark 1.3.1):

org.apache.spark.rdd.RDD.flatMap(RDD.scala:296)
org.apache.spark.ml.recommendation.ALS$.org$apache$spark$ml$recommendation$ALS$$computeFactors(ALS.scala:1065)
org.apache.spark.ml.recommendation.ALS$$anonfun$train$3.apply(ALS.scala:530)
org.apache.spark.ml.recommendation.ALS$$anonfun$train$3.apply(ALS.scala:527)
scala.collection.immutable.Range.foreach(Range.scala:141)
org.apache.spark.ml.recommendation.ALS$.train(ALS.scala:527)
org.apache.spark.mllib.recommendation.ALS.run(ALS.scala:203)

And from the aggregate:

org.apache.spark.rdd.RDD.aggregate(RDD.scala:968)
org.apache.spark.ml.recommendation.ALS$.computeYtY(ALS.scala:1112)
org.apache.spark.ml.recommendation.ALS$.org$apache$spark$ml$recommendation$ALS$$computeFactors(ALS.scala:1064)
org.apache.spark.ml.recommendation.ALS$$anonfun$train$3.apply(ALS.scala:538)
org.apache.spark.ml.recommendation.ALS$$anonfun$train$3.apply(ALS.scala:527)
scala.collection.immutable.Range.foreach(Range.scala:141)
org.apache.spark.ml.recommendation.ALS$.train(ALS.scala:527)
org.apache.spark.mllib.recommendation.ALS.run(ALS.scala:203)



On Thu, Apr 23, 2015 at 2:49 AM, Xiangrui Meng men...@gmail.com wrote:

 This is the size of the serialized task closure. Is stage 246 part of
 ALS iterations, or something before or after it? -Xiangrui

 On Tue, Apr 21, 2015 at 10:36 AM, Christian S. Perone
 christian.per...@gmail.com wrote:
  Hi Sean, thanks for the answer. I tried to call repartition() on the
 input
  with many different sizes and it still continues to show that warning
  message.
 
  On Tue, Apr 21, 2015 at 7:05 AM, Sean Owen so...@cloudera.com wrote:
 
  I think maybe you need more partitions in your input, which might make
  for smaller tasks?
 
  On Tue, Apr 21, 2015 at 2:56 AM, Christian S. Perone
  christian.per...@gmail.com wrote:
   I keep seeing these warnings when using trainImplicit:
  
   WARN TaskSetManager: Stage 246 contains a task of very large size (208
   KB).
   The maximum recommended task size is 100 KB.
  
   And then the task size starts to increase. Is this a known issue ?
  
   Thanks !
  
   --
   Blog | Github | Twitter
   Forgive, O Lord, my little jokes on Thee, and I'll forgive Thy great
   big
   joke on me.
 
 
 
 
  --
  Blog | Github | Twitter
  Forgive, O Lord, my little jokes on Thee, and I'll forgive Thy great big
  joke on me.




-- 
Blog http://blog.christianperone.com | Github https://github.com/perone
| Twitter https://twitter.com/tarantulae
Forgive, O Lord, my little jokes on Thee, and I'll forgive Thy great big
joke on me.


MLlib - Collaborative Filtering - trainImplicit task size

2015-04-20 Thread Christian S. Perone
I keep seeing these warnings when using trainImplicit:

WARN TaskSetManager: Stage 246 contains a task of very large size (208 KB).
The maximum recommended task size is 100 KB.

And then the task size starts to increase. Is this a known issue ?

Thanks !

-- 
Blog http://blog.christianperone.com | Github https://github.com/perone
| Twitter https://twitter.com/tarantulae
Forgive, O Lord, my little jokes on Thee, and I'll forgive Thy great big
joke on me.


Re: MLlib -Collaborative Filtering

2015-04-19 Thread Christian S. Perone
The easiest way to do that is to use a similarity metric between the
different user factors.
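A minimal Scala sketch of that, assuming `model` is a trained
MatrixFactorizationModel from ALS and both user ids exist:

  // Cosine similarity between two latent-factor vectors.
  def cosine(a: Array[Double], b: Array[Double]): Double = {
    val dot   = a.zip(b).map { case (x, y) => x * y }.sum
    val normA = math.sqrt(a.map(x => x * x).sum)
    val normB = math.sqrt(b.map(x => x * x).sum)
    dot / (normA * normB)
  }

  val factors = model.userFeatures        // RDD[(Int, Array[Double])]
  val u1 = factors.lookup(1).head         // factors of user 1
  val u2 = factors.lookup(2).head         // factors of user 2
  val similarity = cosine(u1, u2)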

On Sat, Apr 18, 2015 at 7:49 AM, riginos samarasrigi...@gmail.com wrote:

 Is there any way that i can see the similarity table of 2 users in that
 algorithm? by that i mean the similarity between 2 users



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/MLlib-Collaborative-Filtering-tp22553.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




-- 
Blog http://blog.christianperone.com | Github https://github.com/perone
| Twitter https://twitter.com/tarantulae
Forgive, O Lord, my little jokes on Thee, and I'll forgive Thy great big
joke on me.


Re: Super slow caching in 1.3?

2015-04-16 Thread Christian Perez
Hi Michael,

Good question! We checked 1.2 and found that it is also slow caching
the same flat parquet file. Caching other file formats of the same
data were faster by up to a factor of ~2. Note that the parquet file
was created in Impala but the other formats were written by Spark SQL.

Cheers,

Christian

On Mon, Apr 6, 2015 at 6:17 PM, Michael Armbrust mich...@databricks.com wrote:
 Do you think you are seeing a regression from 1.2?  Also, are you caching
 nested data or flat rows?  The in-memory caching is not really designed for
 nested data and so performs pretty slowly here (it's just falling back to
 kryo and even then there are some locking issues).

 If so, would it be possible to try caching a flattened version?

 CACHE TABLE flattenedTable AS SELECT ... FROM parquetTable

 On Mon, Apr 6, 2015 at 5:00 PM, Christian Perez christ...@svds.com wrote:

 Hi all,

 Has anyone else noticed very slow time to cache a Parquet file? It
 takes 14 s per 235 MB (1 block) uncompressed node local Parquet file
 on M2 EC2 instances. Or are my expectations way off...

 Cheers,

 Christian

 --
 Christian Perez
 Silicon Valley Data Science
 Data Analyst
 christ...@svds.com
 @cp_phd

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





-- 
Christian Perez
Silicon Valley Data Science
Data Analyst
christ...@svds.com
@cp_phd

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Super slow caching in 1.3?

2015-04-06 Thread Christian Perez
Hi all,

Has anyone else noticed very slow time to cache a Parquet file? It
takes 14 s per 235 MB (1 block) uncompressed node local Parquet file
on M2 EC2 instances. Or are my expectations way off...

Cheers,

Christian

-- 
Christian Perez
Silicon Valley Data Science
Data Analyst
christ...@svds.com
@cp_phd

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: persist(MEMORY_ONLY) takes lot of time

2015-04-02 Thread Christian Perez
+1.

Caching is way too slow.

On Wed, Apr 1, 2015 at 12:33 PM, SamyaMaiti samya.maiti2...@gmail.com wrote:
 Hi Experts,

 I have a parquet dataset of 550 MB ( 9 Blocks) in HDFS. I want to run SQL
 queries repetitively.

 Few questions :

 1. When I do the below (persist to memory after reading from disk), it takes
 lot of time to persist to memory, any suggestions of how to tune this?

  val inputP  = sqlContext.parquetFile(some HDFS path)
  inputP.registerTempTable(sample_table)
  inputP.persist(MEMORY_ONLY)
  val result = sqlContext.sql(some sql query)
  result.count

 Note : Once the data is persisted to memory, it takes fraction of seconds to
 return query result from the second query onwards. So my concern is how to
 reduce the time when the data is first loaded to cache.


 2. I have observed that if I omit the below line,
  inputP.persist(MEMORY_ONLY)
   the first time Query execution is comparatively quick (say it take
 1min), as the load to Memory time is saved, but to my surprise the second
 time I run the same query it takes 30 sec as the inputP is not constructed
 from disk (checked from UI).

  So my question is, Does spark use some kind of internal caching for inputP
 in this scenario?

 Thanks in advance

 Regards,
 Sam



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/persist-MEMORY-ONLY-takes-lot-of-time-tp22343.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




-- 
Christian Perez
Silicon Valley Data Science
Data Analyst
christ...@svds.com
@cp_phd

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: input size too large | Performance issues with Spark

2015-04-02 Thread Christian Perez
To Akhil's point, see the "Tuning Data Structures" section of the tuning guide. Avoid standard Java collection HashMaps.

With fewer machines, try running 4 or 5 cores per executor and only
3-4 executors (1 per node):
http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/.
Ought to reduce shuffle performance hit (someone else confirm?)

#7 see default.shuffle.partitions (default: 200)
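A rough Scala sketch of those settings, as one might express them in the driver
(they can equally be passed to spark-submit); the exact numbers are placeholders
that depend on the nodes:

  import org.apache.spark.SparkConf

  val conf = new SparkConf()
    .set("spark.executor.instances", "4")   // roughly one executor per node
    .set("spark.executor.cores", "5")
    .set("spark.executor.memory", "20g")

  // Point 7: the post-shuffle partition count can also be set explicitly
  // on the wide operation itself, e.g. pairRdd.groupByKey(numPartitions = 40)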

On Sun, Mar 29, 2015 at 7:57 AM, Akhil Das ak...@sigmoidanalytics.com wrote:
 Go through this once, if you haven't read it already.
 https://spark.apache.org/docs/latest/tuning.html

 Thanks
 Best Regards

 On Sat, Mar 28, 2015 at 7:33 PM, nsareen nsar...@gmail.com wrote:

 Hi All,

 I'm facing performance issues with spark implementation, and was briefly
 investigating on WebUI logs, I noticed that my RDD size is 55GB, the
 Shuffle Write is 10 GB and the Input Size is 200GB. The application is a web
 application which does predictive analytics, so we keep most of our data
 in
 memory. This observation was only for 30mins usage of the application on a
 single user. We anticipate atleast 10-15 users of the application sending
 requests in parallel, which makes me a bit nervous.

 One constraint we have is that we do not have too many nodes in a cluster,
 we may end up with 3-4 machines at best, but they can be scaled up
 vertically each having 24 cores / 512 GB ram etc. which can allow us to
 make
 a virtual 10-15 node cluster.

 Even then the input size and shuffle write are too high for my liking. Any
 suggestions in this regard will be greatly appreciated as there aren't
 much
 resource on the net for handling performance issues such as these.

 Some pointers on my application's data structures and design:

 1) RDD is a JavaPairRDD with Key / Value as CustomPOJO containing 3-4
 Hashmaps and Value containing 1 Hashmap
 2) Data is loaded via JDBCRDD during application startup, which also tends
 to take a lot of time, since we massage the data once it is fetched from
 DB
 and then save it as JavaPairRDD.
 3) Most of the data is structured, but we are still using JavaPairRDD,
 have
 not explored the option of Spark SQL though.
 4) We have only one SparkContext which caters to all the requests coming
 into the application from various users.
 5) During a single user session user can send 3-4 parallel stages
 consisting
 of Map / Group By / Join / Reduce etc.
 6) We have to change the RDD structure using different types of group by
 operations since the user can do drill down drill up of the data (
 aggregation at a higher / lower level). This is where we make use of
 Groupby's but there is a cost associated with this.
 7) We have observed, that the initial RDD's we create have 40 odd
 partitions, but post some stage executions like groupby's the partitions
 increase to 200 or so, this was odd, and we havn't figured out why this
 happens.

 In summary we want to use Spark to provide us the capability to process our
 in-memory data structure very fast as well as scale to a larger volume
 when
 required in the future.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/input-size-too-large-Performance-issues-with-Spark-tp22270.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





-- 
Christian Perez
Silicon Valley Data Science
Data Analyst
christ...@svds.com
@cp_phd

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: saveAsTable broken in v1.3 DataFrames?

2015-03-20 Thread Christian Perez
Any other users interested in a feature
DataFrame.saveAsExternalTable() for making _useful_ external tables in
Hive, or am I the only one? Bueller? If I start a PR for this, will it
be taken seriously?

On Thu, Mar 19, 2015 at 9:34 AM, Christian Perez christ...@svds.com wrote:
 Hi Yin,

 Thanks for the clarification. My first reaction is that if this is the
 intended behavior, it is a wasted opportunity. Why create a managed
 table in Hive that cannot be read from inside Hive? I think I
 understand now that you are essentially piggybacking on Hive's
 metastore to persist table info between/across sessions, but I imagine
 others might expect more (as I have.)

 We find ourselves wanting to do work in Spark and persist the results
 where other users (e.g. analysts using Tableau connected to
 Hive/Impala) can explore it. I imagine this is very common. I can, of
 course, save it as parquet and create an external table in hive (which
 I will do now), but saveAsTable seems much less useful to me now.

 Any other opinions?

 Cheers,

 C

 On Thu, Mar 19, 2015 at 9:18 AM, Yin Huai yh...@databricks.com wrote:
 I meant table properties and serde properties are used to store metadata of
 a Spark SQL data source table. We do not set other fields like SerDe lib.
 For a user, the output of DESCRIBE EXTENDED/FORMATTED on a data source table
 should not show unrelated stuff like Serde lib and InputFormat. I have
 created https://issues.apache.org/jira/browse/SPARK-6413 to track the
 improvement on the output of DESCRIBE statement.

 On Thu, Mar 19, 2015 at 12:11 PM, Yin Huai yh...@databricks.com wrote:

 Hi Christian,

 Your table is stored correctly in Parquet format.

 For saveAsTable, the table created is not a Hive table, but a Spark SQL
 data source table
 (http://spark.apache.org/docs/1.3.0/sql-programming-guide.html#data-sources).
 We are only using Hive's metastore to store the metadata (to be specific,
 only table properties and serde properties). When you look at table
 property, there will be a field called spark.sql.sources.provider and the
 value will be org.apache.spark.sql.parquet.DefaultSource. You can also
 look at your files in the file system. They are stored by Parquet.

 Thanks,

 Yin

 On Thu, Mar 19, 2015 at 12:00 PM, Christian Perez christ...@svds.com
 wrote:

 Hi all,

 DataFrame.saveAsTable creates a managed table in Hive (v0.13 on
 CDH5.3.2) in both spark-shell and pyspark, but creates the *wrong*
 schema _and_ storage format in the Hive metastore, so that the table
 cannot be read from inside Hive. Spark itself can read the table, but
 Hive throws a Serialization error because it doesn't know it is
 Parquet.

 val df = sc.parallelize( Array((1,2), (3,4)) ).toDF("education", "income")
 df.saveAsTable("spark_test_foo")

 Expected:

 COLUMNS(
   education BIGINT,
   income BIGINT
 )

 SerDe Library:
 org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe
 InputFormat:
 org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat

 Actual:

 COLUMNS(
   col array<string> COMMENT 'from deserializer'
 )

 SerDe Library: org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe
 InputFormat: org.apache.hadoop.mapred.SequenceFileInputFormat

 ---

 Manually changing schema and storage restores access in Hive and
 doesn't affect Spark. Note also that Hive's table property
 spark.sql.sources.schema is correct. At first glance, it looks like
 the schema data is serialized when sent to Hive but not deserialized
 properly on receive.

 I'm tracing execution through source code... but before I get any
 deeper, can anyone reproduce this behavior?

 Cheers,

 Christian

 --
 Christian Perez
 Silicon Valley Data Science
 Data Analyst
 christ...@svds.com
 @cp_phd

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org






 --
 Christian Perez
 Silicon Valley Data Science
 Data Analyst
 christ...@svds.com
 @cp_phd



-- 
Christian Perez
Silicon Valley Data Science
Data Analyst
christ...@svds.com
@cp_phd

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



saveAsTable broken in v1.3 DataFrames?

2015-03-19 Thread Christian Perez
Hi all,

DataFrame.saveAsTable creates a managed table in Hive (v0.13 on
CDH5.3.2) in both spark-shell and pyspark, but creates the *wrong*
schema _and_ storage format in the Hive metastore, so that the table
cannot be read from inside Hive. Spark itself can read the table, but
Hive throws a Serialization error because it doesn't know it is
Parquet.

val df = sc.parallelize( Array((1,2), (3,4)) ).toDF("education", "income")
df.saveAsTable("spark_test_foo")

Expected:

COLUMNS(
  education BIGINT,
  income BIGINT
)

SerDe Library: org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe
InputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat

Actual:

COLUMNS(
  col array<string> COMMENT 'from deserializer'
)

SerDe Library: org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe
InputFormat: org.apache.hadoop.mapred.SequenceFileInputFormat

---

Manually changing schema and storage restores access in Hive and
doesn't affect Spark. Note also that Hive's table property
spark.sql.sources.schema is correct. At first glance, it looks like
the schema data is serialized when sent to Hive but not deserialized
properly on receive.

I'm tracing execution through source code... but before I get any
deeper, can anyone reproduce this behavior?

Cheers,

Christian

-- 
Christian Perez
Silicon Valley Data Science
Data Analyst
christ...@svds.com
@cp_phd

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: saveAsTable broken in v1.3 DataFrames?

2015-03-19 Thread Christian Perez
Hi Yin,

Thanks for the clarification. My first reaction is that if this is the
intended behavior, it is a wasted opportunity. Why create a managed
table in Hive that cannot be read from inside Hive? I think I
understand now that you are essentially piggybacking on Hive's
metastore to persist table info between/across sessions, but I imagine
others might expect more (as I have.)

We find ourselves wanting to do work in Spark and persist the results
where other users (e.g. analysts using Tableau connected to
Hive/Impala) can explore it. I imagine this is very common. I can, of
course, save it as parquet and create an external table in hive (which
I will do now), but saveAsTable seems much less useful to me now.
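
(For concreteness, a sketch of that workaround using the Spark 1.3-era Python
API; the HDFS path, table and column names are illustrative, and the DDL
assumes a Parquet-capable Hive such as 0.13.)

from pyspark import SparkContext
from pyspark.sql import HiveContext

sc = SparkContext(appName="external-table-sketch")
sqlContext = HiveContext(sc)

df = sqlContext.createDataFrame([(1, 2), (3, 4)], ["education", "income"])

# 1. Write plain Parquet files that any engine can read.
df.saveAsParquetFile("hdfs:///tmp/spark_test_foo")

# 2. Point an external Hive table at those files so Hive/Impala (and tools
#    such as Tableau on top of them) can query the data.
sqlContext.sql("""
    CREATE EXTERNAL TABLE IF NOT EXISTS spark_test_foo (
        education BIGINT,
        income BIGINT
    )
    STORED AS PARQUET
    LOCATION 'hdfs:///tmp/spark_test_foo'
""")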

Any other opinions?

Cheers,

C

On Thu, Mar 19, 2015 at 9:18 AM, Yin Huai yh...@databricks.com wrote:
 I meant table properties and serde properties are used to store metadata of
 a Spark SQL data source table. We do not set other fields like SerDe lib.
 For a user, the output of DESCRIBE EXTENDED/FORMATTED on a data source table
 should not show unrelated stuff like Serde lib and InputFormat. I have
 created https://issues.apache.org/jira/browse/SPARK-6413 to track the
 improvement on the output of DESCRIBE statement.

 On Thu, Mar 19, 2015 at 12:11 PM, Yin Huai yh...@databricks.com wrote:

 Hi Christian,

 Your table is stored correctly in Parquet format.

 For saveAsTable, the table created is not a Hive table, but a Spark SQL
 data source table
 (http://spark.apache.org/docs/1.3.0/sql-programming-guide.html#data-sources).
 We are only using Hive's metastore to store the metadata (to be specific,
 only table properties and serde properties). When you look at table
 property, there will be a field called spark.sql.sources.provider and the
 value will be org.apache.spark.sql.parquet.DefaultSource. You can also
 look at your files in the file system. They are stored by Parquet.

 Thanks,

 Yin

 On Thu, Mar 19, 2015 at 12:00 PM, Christian Perez christ...@svds.com
 wrote:

 Hi all,

 DataFrame.saveAsTable creates a managed table in Hive (v0.13 on
 CDH5.3.2) in both spark-shell and pyspark, but creates the *wrong*
 schema _and_ storage format in the Hive metastore, so that the table
 cannot be read from inside Hive. Spark itself can read the table, but
 Hive throws a Serialization error because it doesn't know it is
 Parquet.

 val df = sc.parallelize( Array((1,2), (3,4)) ).toDF("education", "income")
 df.saveAsTable("spark_test_foo")

 Expected:

 COLUMNS(
   education BIGINT,
   income BIGINT
 )

 SerDe Library:
 org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe
 InputFormat:
 org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat

 Actual:

 COLUMNS(
   col array<string> COMMENT 'from deserializer'
 )

 SerDe Library: org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe
 InputFormat: org.apache.hadoop.mapred.SequenceFileInputFormat

 ---

 Manually changing schema and storage restores access in Hive and
 doesn't affect Spark. Note also that Hive's table property
 spark.sql.sources.schema is correct. At first glance, it looks like
 the schema data is serialized when sent to Hive but not deserialized
 properly on receive.

 I'm tracing execution through source code... but before I get any
 deeper, can anyone reproduce this behavior?

 Cheers,

 Christian

 --
 Christian Perez
 Silicon Valley Data Science
 Data Analyst
 christ...@svds.com
 @cp_phd

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org






-- 
Christian Perez
Silicon Valley Data Science
Data Analyst
christ...@svds.com
@cp_phd

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



SV: Pyspark Hbase scan.

2015-03-13 Thread Castberg , René Christian
Sorry, forgot to attach the traceback.


Regards


Rene Castberg


From: Castberg, René Christian
Sent: 13 March 2015 07:13
To: user@spark.apache.org
Cc: gen tang
Subject: SV: Pyspark Hbase scan.


Hi,


I have now successfully managed to test this in a local spark session.

But I am having huge problems getting this to work with the Hortonworks
technical preview. I think that there is an incompatibility with the way YARN
has been compiled.


After changing the hbase version, and adding:

resolvers += "Hortonworks Releases" at "http://repo.hortonworks.com/content/repositories/releases/"


I get the attached traceback.


Any help in how to compile this jar such that it works would be greatly 
appreciated.


Regards


Rene Castberg



From: gen tang gen.tan...@gmail.com
Sent: 5 February 2015 11:38
To: Castberg, René Christian
Cc: user@spark.apache.org
Subject: Re: Pyspark Hbase scan.

Hi,

In fact, this pull request, https://github.com/apache/spark/pull/3920, is to do an HBase
scan. However, it is not merged yet.
You can also take a look at the example code at
http://spark-packages.org/package/20 which uses Scala and Python to read
data from HBase.
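
(A hedged sketch of what the Python route can look like, modelled on the
hbase_inputformat.py example shipped with Spark 1.x; the converter classes
live in the spark-examples jar, which has to be passed via --jars, and the
quorum, table name and row-key bounds below are illustrative.)

from pyspark import SparkContext

sc = SparkContext(appName="hbase-scan-sketch")

conf = {
    "hbase.zookeeper.quorum": "zk-host",
    "hbase.mapreduce.inputtable": "my_table",
    # TableInputFormat honours these bounds, turning the read into a range scan
    "hbase.mapreduce.scan.row.start": "row-000",
    "hbase.mapreduce.scan.row.stop": "row-999",
}

rdd = sc.newAPIHadoopRDD(
    "org.apache.hadoop.hbase.mapreduce.TableInputFormat",
    "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
    "org.apache.hadoop.hbase.client.Result",
    keyConverter="org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter",
    valueConverter="org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter",
    conf=conf)

print(rdd.take(5))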

Hope this can be helpful.

Cheers
Gen



On Thu, Feb 5, 2015 at 11:11 AM, Castberg, René Christian
rene.castb...@dnvgl.com wrote:
Hi,

I am trying to do an HBase scan and read it into a Spark RDD using pyspark. I
have successfully written data to HBase from pyspark, and have been able to read
a full table from HBase using the Python example code. Unfortunately I am unable
to find any example code for doing an HBase scan and reading it into a Spark RDD
from pyspark.

I have found a Scala example:
http://stackoverflow.com/questions/25189527/how-to-process-a-range-of-hbase-rows-using-spark

But I can't find anything on how to do this from Python. Can anybody shed some
light on how (and if) this can be done?

Regards

Rene Castberg


$ /hadoop-dist/spark-1.2.1-bin-hadoop2.4/bin/spark-submit --driver-class-path 
/usr/hdp/current/share/lzo/0.6.0/lib/hadoop-lzo-0.6.0.jar:/home/recast/spark_hbase/target/scala-2.10/spark_hbase-assembly-1.0.jar
  --jars 
/hadoop-dist/spark-1.2.1-bin-hadoop2.4/lib/spark-examples-1.2.1-hadoop2.4.0.jar 
--driver-library-path 
/usr/hdp/current/share/lzo/0.6.0/lib/native/Linux-amd64-64/ 
AIS_count_msb_hbase.py  
Spark assembly has been built with Hive, including Datanucleus jars on classpath
2.7.9 (default, Feb 25 2015, 14:55:10) 
[GCC 4.4.7 20120313 (Red Hat 4.4.7-11)]
/hadoop-dist/Python/lib/python2.7/site-packages/setuptools-12.3-py2.7.egg/pkg_resources/__init__.py:1224:
 UserWarning: /tmp/python-eggs is writable by group/others and vulnerable to 
attack when used with get_resource_filename. Consider a more secure location 
(set with .set_extraction_path or the PYTHON_EGG_CACHE environment variable).
Reading config file for : smalldata01.hdp
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in 
[jar:file:/home/recast/spark_hbase/target/scala-2.10/spark_hbase-assembly-1.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in 
[jar:file:/hadoop-dist/spark-1.2.1-bin-hadoop2.4/lib/spark-assembly-1.2.1-hadoop2.4.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
15/03/13 06:10:34 WARN NativeCodeLoader: Unable to load native-hadoop library 
for your platform... using builtin-java classes where applicable
15/03/13 06:10:34 WARN YarnClientSchedulerBackend: NOTE: SPARK_WORKER_INSTANCES 
is deprecated. Use

SV: Pyspark Hbase scan.

2015-03-13 Thread Castberg , René Christian
Hi,


I have now successfully managed to test this in a local spark session.

But I am having huge problems getting this to work with the Hortonworks
technical preview. I think that there is an incompatibility with the way YARN
has been compiled.


After changing the hbase version, and adding:

resolvers += "Hortonworks Releases" at "http://repo.hortonworks.com/content/repositories/releases/"


I get the attached traceback.


Any help in how to compile this jar such that it works would be greatly 
appreciated.


Regards


Rene Castberg



From: gen tang gen.tan...@gmail.com
Sent: 5 February 2015 11:38
To: Castberg, René Christian
Cc: user@spark.apache.org
Subject: Re: Pyspark Hbase scan.

Hi,

In fact, this pull request, https://github.com/apache/spark/pull/3920, is to do an HBase
scan. However, it is not merged yet.
You can also take a look at the example code at
http://spark-packages.org/package/20 which uses Scala and Python to read
data from HBase.

Hope this can be helpful.

Cheers
Gen



On Thu, Feb 5, 2015 at 11:11 AM, Castberg, René Christian
rene.castb...@dnvgl.com wrote:
Hi,

I am trying to do an HBase scan and read it into a Spark RDD using pyspark. I
have successfully written data to HBase from pyspark, and have been able to read
a full table from HBase using the Python example code. Unfortunately I am unable
to find any example code for doing an HBase scan and reading it into a Spark RDD
from pyspark.

I have found a Scala example:
http://stackoverflow.com/questions/25189527/how-to-process-a-range-of-hbase-rows-using-spark

But I can't find anything on how to do this from Python. Can anybody shed some
light on how (and if) this can be done?

Regards

Rene Castberg




New guide on how to write a Spark job in Clojure

2015-02-24 Thread Christian Betz
Hi all,

Maybe some of you are interested: I wrote a new guide on how to start using 
Spark from Clojure. The tutorial covers

  *   setting up a project,
  *   doing REPL- or test-driven development of Spark jobs,
  *   running Spark jobs locally.

Just read it on 
https://gorillalabs.github.io/sparkling/articles/tfidf_guide.html.

Comments (and Pull requests) are very welcome.

Sincerely,

Chris



Re: Datastore HDFS vs Cassandra

2015-02-11 Thread Christian Betz
Hi

Regarding the Cassandra data model, there's an excellent post on the eBay tech
blog:
http://www.ebaytechblog.com/2012/07/16/cassandra-data-modeling-best-practices-part-1/.
There's also a SlideShare deck for this somewhere.

Happy hacking

Chris

From: Franc Carter franc.car...@rozettatech.com
Date: Wednesday, 11 February 2015 10:03
To: Paolo Platter paolo.plat...@agilelab.it
Cc: Mike Trienis mike.trie...@orcsol.com, user@spark.apache.org
Subject: Re: Datastore HDFS vs Cassandra


One additional comment I would make is that you should be careful with updates
in Cassandra. It does support them, but large amounts of updates (i.e. changing
existing keys) tend to cause fragmentation. If you are (mostly) adding new
keys (e.g. new records in the time series), then Cassandra can be excellent.

cheers


On Wed, Feb 11, 2015 at 6:13 PM, Paolo Platter
paolo.plat...@agilelab.it wrote:
Hi Mike,

I developed a solution with Cassandra and Spark, using DSE.
The main difficulty is Cassandra: you need to understand its data model and its
query patterns very well.
Cassandra has better performance than HDFS, and it has DR and stronger
availability.
HDFS is a filesystem, Cassandra is a DBMS.
Cassandra supports full CRUD, without ACID.
HDFS is more flexible than Cassandra.

In my opinion, if you have a real time series, go with Cassandra, paying
attention to your reporting data access patterns.

Paolo

Sent from my Windows Phone

From: Mike Trienis mike.trie...@orcsol.com
Sent: 11/02/2015 05:59
To: user@spark.apache.org
Subject: Datastore HDFS vs Cassandra

Hi,

I am considering implementing Apache Spark on top of a Cassandra database after
listening to a related talk and reading through the slides from DataStax. It
seems to fit well with our time-series data and reporting requirements.

http://www.slideshare.net/patrickmcfadin/apache-cassandra-apache-spark-for-time-series-data

Does anyone have any experience using Apache Spark and Cassandra, including
limitations (and/or) technical difficulties? How does Cassandra compare with
HDFS, and what use cases would make HDFS more suitable?

Thanks, Mike.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Datastore-HDFS-vs-Cassandra-tp21590.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org




--

Franc Carter | Systems Architect | Rozetta Technology

franc.car...@rozettatech.com | www.rozettatechnology.com

Tel: +61 2 8355 2515

Level 4, 55 Harrington St, The Rocks NSW 2000

PO Box H58, Australia Square, Sydney NSW 1215

AUSTRALIA



Pyspark Hbase scan.

2015-02-05 Thread Castberg , René Christian
Hi,

I am trying to do an HBase scan and read it into a Spark RDD using pyspark. I
have successfully written data to HBase from pyspark, and have been able to read
a full table from HBase using the Python example code. Unfortunately I am unable
to find any example code for doing an HBase scan and reading it into a Spark RDD
from pyspark.

I have found a Scala example:
http://stackoverflow.com/questions/25189527/how-to-process-a-range-of-hbase-rows-using-spark

But I can't find anything on how to do this from Python. Can anybody shed some
light on how (and if) this can be done?

Regards

Rene Castberg




Re: Spark 1.1 / cdh4 stuck using old hadoop client?

2014-09-16 Thread Christian Chua
Is 1.0.8 working for you?

You indicated your last known good version is 1.0.0

Maybe we can track down where it broke. 



 On Sep 16, 2014, at 12:25 AM, Paul Wais pw...@yelp.com wrote:
 
 Thanks Christian!  I tried compiling from source but am still getting the 
 same hadoop client version error when reading from HDFS.  Will have to poke 
 deeper... perhaps I've got some classpath issues.  FWIW I compiled using:
 
 $ MAVEN_OPTS="-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m" mvn \
     -Phadoop-2.3 -Dhadoop.version=2.3.0 -DskipTests clean package
 
 and hadoop 2.3 / cdh5 from 
 http://archive.cloudera.com/cdh5/cdh/5/hadoop-2.3.0-cdh5.0.0.tar.gz
 
 
 
 
 
 On Mon, Sep 15, 2014 at 6:49 PM, Christian Chua cc8...@icloud.com wrote:
 Hi Paul.
 
 I would recommend building your own 1.1.0 distribution.
 
  ./make-distribution.sh --name hadoop-personal-build-2.4 --tgz -Pyarn 
 -Phadoop-2.4 -Dhadoop.version=2.4.0 -DskipTests
 
 
 
 I downloaded the Pre-built for Hadoop 2.4 binary, and it had this strange
 behavior where
 
  spark-submit --master yarn-cluster ...
 
 will work, but
 
  spark-submit --master yarn-client ...
 
 will fail.
 
 
 But on the personal build obtained from the command above, both will then 
 work.
 
 
 -Christian
 
 
 
 
 On Sep 15, 2014, at 6:28 PM, Paul Wais pw...@yelp.com wrote:
 
 Dear List,
 
 I'm having trouble getting Spark 1.1 to use the Hadoop 2 API for
 reading SequenceFiles.  In particular, I'm seeing:
 
 Exception in thread main org.apache.hadoop.ipc.RemoteException:
 Server IPC version 7 cannot communicate with client version 4
at org.apache.hadoop.ipc.Client.call(Client.java:1070)
at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:225)
at com.sun.proxy.$Proxy7.getProtocolVersion(Unknown Source)
...
 
 When invoking JavaSparkContext#newAPIHadoopFile().  (With args
 validSequenceFileURI, SequenceFileInputFormat.class, Text.class,
 BytesWritable.class, new Job().getConfiguration() -- Pretty close to
 the unit test here:
 https://github.com/apache/spark/blob/f0f1ba09b195f23f0c89af6fa040c9e01dfa8951/core/src/test/java/org/apache/spark/JavaAPISuite.java#L916
 )
 
 
 This error indicates to me that Spark is using an old hadoop client to
 do reads.  Oddly I'm able to do /writes/ ok, i.e. I'm able to write
 via JavaPairRdd#saveAsNewAPIHadoopFile() to my hdfs cluster.
 
 
 Do I need to explicitly build spark for modern hadoop??  I previously
 had an hdfs cluster running hadoop 2.3.0 and I was getting a similar
 error (server is using version 9, client is using version 4).
 
 
 I'm using Spark 1.1 cdh4 as well as hadoop cdh4 from the links posted
 on spark's site:
 * http://d3kbcqa49mib13.cloudfront.net/spark-1.1.0-bin-cdh4.tgz
 * http://d3kbcqa49mib13.cloudfront.net/hadoop-2.0.0-cdh4.2.0.tar.gz
 
 
 What distro of Hadoop is used at Databricks?  Are there distros of
 Spark 1.1 and hadoop that should work together out-of-the-box?
 (Previously I had Spark 1.0.0 and Hadoop 2.3 working fine..)
 
 Thanks for any help anybody can give me here!
 -Paul
 
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org
 


Re: Spark 1.1 / cdh4 stuck using old hadoop client?

2014-09-15 Thread Christian Chua
Hi Paul.

I would recommend building your own 1.1.0 distribution.

./make-distribution.sh --name hadoop-personal-build-2.4 --tgz -Pyarn 
-Phadoop-2.4 -Dhadoop.version=2.4.0 -DskipTests



I downloaded the Pre-built for Hadoop 2.4 binary, and it had this strange
behavior where

spark-submit --master yarn-cluster ...

will work, but

spark-submit --master yarn-client ...

will fail.


But on the personal build obtained from the command above, both will then work.


-Christian




On Sep 15, 2014, at 6:28 PM, Paul Wais pw...@yelp.com wrote:

 Dear List,
 
 I'm having trouble getting Spark 1.1 to use the Hadoop 2 API for
 reading SequenceFiles.  In particular, I'm seeing:
 
 Exception in thread main org.apache.hadoop.ipc.RemoteException:
 Server IPC version 7 cannot communicate with client version 4
at org.apache.hadoop.ipc.Client.call(Client.java:1070)
at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:225)
at com.sun.proxy.$Proxy7.getProtocolVersion(Unknown Source)
...
 
 When invoking JavaSparkContext#newAPIHadoopFile().  (With args
 validSequenceFileURI, SequenceFileInputFormat.class, Text.class,
 BytesWritable.class, new Job().getConfiguration() -- Pretty close to
 the unit test here:
 https://github.com/apache/spark/blob/f0f1ba09b195f23f0c89af6fa040c9e01dfa8951/core/src/test/java/org/apache/spark/JavaAPISuite.java#L916
 )
 
 
 This error indicates to me that Spark is using an old hadoop client to
 do reads.  Oddly I'm able to do /writes/ ok, i.e. I'm able to write
 via JavaPairRdd#saveAsNewAPIHadoopFile() to my hdfs cluster.
 
 
 Do I need to explicitly build spark for modern hadoop??  I previously
 had an hdfs cluster running hadoop 2.3.0 and I was getting a similar
 error (server is using version 9, client is using version 4).
 
 
 I'm using Spark 1.1 cdh4 as well as hadoop cdh4 from the links posted
 on spark's site:
 * http://d3kbcqa49mib13.cloudfront.net/spark-1.1.0-bin-cdh4.tgz
 * http://d3kbcqa49mib13.cloudfront.net/hadoop-2.0.0-cdh4.2.0.tar.gz
 
 
 What distro of Hadoop is used at Databricks?  Are there distros of
 Spark 1.1 and hadoop that should work together out-of-the-box?
 (Previously I had Spark 1.0.0 and Hadoop 2.3 working fine..)
 
 Thanks for any help anybody can give me here!
 -Paul
 
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org
 



K-NN by efficient sparse matrix product

2014-05-28 Thread Christian Jauvin
Hi,

I'm new to Spark and Hadoop, and I'd like to know if the following
problem is solvable in terms of Spark's primitives.

To compute the K-nearest neighbours of an N-dimensional dataset, I can
multiply my very large normalized sparse matrix by its transpose. As
this yields all pairwise distance values (N x N), I can then sort each
row and only keep the K highest elements for each, resulting in an N x
K dense matrix.

As this Quora answer suggests:

http://qr.ae/v03lY

rather than the row-wise dot product, which would be O(N^2), it's
better to compute the sum of the column outer products, which is O(N x
K^2).

However, given the number of non-zero elements in the resulting
matrix, it seems I could not afford to first perform the full
multiplication (N x N) and then prune it afterward (N x K).. So I need
a way to prune it on the fly.

The original algorithm I came up with is roughly this, for an input matrix M:

for each row i:
    outer_i = [0] * N
    for j in nonzero elements of row i:
        for k in nonzero elements of col j:
            outer_i[k] += M[i][j] * M[k][j]
    nearest_i = {sort outer_i and keep best K}

which can be parallelized in an embarrassing way, i.e. each compute
node can simply process a slice of the rows.

Would there be a way to do something similar (or related) with Spark?

Christian


Re: K-NN by efficient sparse matrix product

2014-05-28 Thread Christian Jauvin
Thank you for your answer. Would you have by any chance some example
code (even fragmentary) that I could study?

On 28 May 2014 14:04, Tom Vacek minnesota...@gmail.com wrote:
 Maybe I should add: if you can hold the entire matrix in memory, then this
 is embarrassingly parallel.  If not, then the complications arise.
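
 (A minimal PySpark sketch of that embarrassingly parallel case, assuming the
 normalized sparse matrix fits in memory on every worker; the matrix here is a
 random toy, and K, the partition count and the app name are illustrative.)

 import numpy as np
 from scipy.sparse import csr_matrix
 from pyspark import SparkContext

 sc = SparkContext(appName="knn-sketch")

 M = csr_matrix(np.random.rand(1000, 50))   # toy N x D sparse matrix
 K = 10
 M_bc = sc.broadcast(M)                     # read-only copy on every worker

 def top_k_for_rows(row_indices):
     M_local = M_bc.value
     for i in row_indices:
         # one row of M * M^T: similarities of row i against all N rows
         sims = M_local[i].dot(M_local.T).toarray().ravel()
         sims[i] = -np.inf                          # ignore the self-match
         yield i, np.argsort(-sims)[:K].tolist()    # K nearest row indices

 neighbours = (sc.parallelize(range(M.shape[0]), 8)  # each slice of rows is independent
                 .mapPartitions(top_k_for_rows))
 print(neighbours.take(3))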


 On Wed, May 28, 2014 at 1:00 PM, Tom Vacek minnesota...@gmail.com wrote:

 The problem with matrix multiplication is that the amount of data blows up
 between the mapper and the reducer, and the shuffle operation is very slow.
 I have not ever tried this, but the shuffle can be avoided by making use of
 the broadcast.  Say we have M = L*R.  We do a column decomposition on R, and
 we collect rows of L to the master and broadcast them (in manageably-sized
 blocks).  Each worker does a dot product and discards the row block when
 finished.  In theory, this has complexity max(nnz(L)*log p, nnz(L)*n/p).  I
 have to warn though: when I played with matrix multiplication, I was getting
 nowhere near serial performance.


 On Wed, May 28, 2014 at 11:00 AM, Christian Jauvin cjau...@gmail.com
 wrote:

 Hi,

 I'm new to Spark and Hadoop, and I'd like to know if the following
 problem is solvable in terms of Spark's primitives.

 To compute the K-nearest neighbours of an N-dimensional dataset, I can
 multiply my very large normalized sparse matrix by its transpose. As
 this yields all pairwise distance values (N x N), I can then sort each
 row and only keep the K highest elements for each, resulting in an N x
 K dense matrix.

 As this Quora answer suggests:

 http://qr.ae/v03lY

 rather than the row-wise dot product, which would be O(N^2), it's
 better to compute the sum of the column outer products, which is O(N x
 K^2).

 However, given the number of non-zero elements in the resulting
 matrix, it seems I could not afford to first perform the full
 multiplication (N x N) and then prune it afterward (N x K).. So I need
 a way to prune it on the fly.

 The original algorithm I came up with is roughly this, for an input
 matrix M:

 for each row i:
     outer_i = [0] * N
     for j in nonzero elements of row i:
         for k in nonzero elements of col j:
             outer_i[k] += M[i][j] * M[k][j]
     nearest_i = {sort outer_i and keep best K}

 which can be parallelized in an embarrassing way, i.e. each compute
 node can simply process a slice of the rows.

 Would there be a way to do something similar (or related) with Spark?

 Christian





Re: pyspark and Python virtual enviroments

2014-03-05 Thread Christian
Thanks Bryn.


On Wed, Mar 5, 2014 at 9:00 PM, Bryn Keller xol...@xoltar.org wrote:

 Hi Christian,

 The PYSPARK_PYTHON environment variable specifies the python executable to
 use for pyspark. You can put the path to a virtualenv's python executable
 and it will work fine. Remember you have to have the same installation at
 the same path on each of your cluster nodes for pyspark to work. If you're
 creating the spark context yourself in a python application, you can use
 os.environ['PYSPARK_PYTHON'] = sys.executable before creating your spark
 context.
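
 (A minimal sketch of both routes, assuming the same virtualenv exists at the
 same path on every node; the paths and app name are illustrative.)

 # Option 1: point pyspark at the virtualenv's interpreter from the shell:
 #   PYSPARK_PYTHON=/opt/venvs/myproject/bin/python bin/pyspark
 #
 # Option 2: set it from the Python driver before the SparkContext is created:
 import os, sys
 os.environ['PYSPARK_PYTHON'] = sys.executable   # or an explicit virtualenv path
 from pyspark import SparkContext

 sc = SparkContext(appName="venv-sketch")
 print(sc.parallelize(range(100)).map(lambda x: x * x).sum())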

 Hope that helps,
 Bryn


 On Wed, Mar 5, 2014 at 4:54 AM, Christian chri...@gmail.com wrote:

 Hello,

 I usually create different Python virtual environments for different
 projects to avoid version conflicts and to skip the requirement to be root to
 install libs.

 How can I tell pyspark to activate a virtual environment before
 executing the tasks?

 Further info on virtual envs:
 http://virtualenv.readthedocs.org/en/latest/virtualenv.html

 Thanks in advance,
 Christian