RE: Apache Spark Operator for Kubernetes?

2022-10-28 Thread Jim Halfpenny
Hi Clayton,
I’m not aware of an official Apache operator, but I can recommend taking a look
at the one we’ve created at Stackable.

https://github.com/stackabletech/spark-k8s-operator

It’s actively maintained and we’d be happy to receive feedback if you have 
feature requests.

Kind regards,
Jim


On 2022/10/14 15:28:55 Clayton Wohl wrote:
> My company has been exploring the Google Spark Operator for running Spark
> jobs on a Kubernetes cluster, but we've found lots of limitations and
> problems, and the product seems weakly supported.
> 
> Is there any official Apache option, or plans for such an option, to run
> Spark jobs on Kubernetes? Is there perhaps an official Apache Spark
> Operator in the works?
> 
> We currently run jobs on both Databricks and on Amazon EMR, but it would be
> nice to have a good option for running Spark directly on our Kubernetes
> clusters.
> 
> thanks :)
> 
-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Writing files to s3 with out temporary directory

2017-11-21 Thread Jim Carroll
I got it working. It's much faster.

If someone else wants to try it I:
1) Was already using the code from the Presto S3 Hadoop FileSystem
implementation modified to sever it from the rest of the Presto codebase.
2) I extended it and overrode the method "keyFromPath" so that anytime the
Path referred to a "_temporary" parquet file "part" it returned a "key" to
the final location of the file.
3) I registered the filesystem through sparkContext.hadoopConfiguration by
setting fs.s3.impl, fs.s3n.impl, and fs.s3a.impl.

I realize I'm risking file corruption but it's WAY faster than it was.
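
For anyone adapting this, a minimal Scala sketch of step 3, assuming the modified
FileSystem class is called com.example.RedirectingS3FileSystem (a placeholder, not
the actual class name):

    import org.apache.spark.{SparkConf, SparkContext}

    val sc = new SparkContext(
      new SparkConf().setAppName("direct-s3-writes").setMaster("local[*]"))
    val hadoopConf = sc.hadoopConfiguration
    // Point all three s3 URI schemes at the modified FileSystem so Spark and
    // the Parquet output path both resolve through it.
    hadoopConf.set("fs.s3.impl",  "com.example.RedirectingS3FileSystem")
    hadoopConf.set("fs.s3n.impl", "com.example.RedirectingS3FileSystem")
    hadoopConf.set("fs.s3a.impl", "com.example.RedirectingS3FileSystem")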




--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Writing files to s3 with out temporary directory

2017-11-21 Thread Jim Carroll
It's not actually that tough. We already use a custom Hadoop FileSystem for
S3 because when we started using Spark with S3 the native FileSystem was
very unreliable. Ours is based on the code from Presto (see
https://github.com/prestodb/presto/blob/master/presto-hive/src/main/java/com/facebook/presto/hive/s3/PrestoS3FileSystem.java
). 

I already have a version that introduces a hash to the filename for the file
that's actually written to S3, to see if it makes a difference per
https://docs.aws.amazon.com/AmazonS3/latest/dev/request-rate-perf-considerations.html#get-workload-considerations
. FWIW, it doesn't. I'm going to modify that experiment to override the key
name like before, except actually move the file, keep track of the state, and
override the rename method.

The problems with this approach are: 1) it's brittle because it depends on
the internal directory and file naming conventions in Hadoop and Parquet. 2)
It will assume (as seems to be currently the case) that the 'rename' call is
done for all files from the driver. But it should do until there's a better
solution in the Hadoop committer.
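
A rough Scala sketch of the keyFromPath idea, assuming the Presto-derived class has
been modified so keyFromPath is overridable; the assumptions about Hadoop/Parquet
temporary-directory layout are exactly the brittleness mentioned above:

    import org.apache.hadoop.fs.Path

    // Inside a subclass of the modified Presto-derived S3 FileSystem:
    override def keyFromPath(path: Path): String = {
      val key = super.keyFromPath(path)
      if (key.contains("/_temporary/")) {
        // Map ".../_temporary/<attempt dirs>/part-xxxx.parquet" straight to
        // ".../part-xxxx.parquet" so no copy/rename is needed afterwards.
        val fileName = key.substring(key.lastIndexOf('/') + 1)
        key.substring(0, key.indexOf("/_temporary/")) + "/" + fileName
      } else key
    }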



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Writing files to s3 with out temporary directory

2017-11-20 Thread Jim Carroll
Thanks. In the meantime I might just write a custom file system that maps
writes to parquet file parts to their final locations and then skips the
move. 



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Writing files to s3 with out temporary directory

2017-11-20 Thread Jim Carroll
I have this exact issue. I was going to intercept the call in the filesystem
if I had to (since we're using the S3 filesystem from Presto anyway) but if
there's simply a way to do this correctly I'd much prefer it. This basically
doubles the time to write parquet files to s3.



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: best spark spatial lib?

2017-10-10 Thread Jim Hughes

Hi all,

GeoMesa integrates with Spark SQL and allows for queries like:

select * from chicago where case_number = 1 and st_intersects(geom, 
st_makeBox2d(st_point(-77, 38), st_point(-76, 39)))


GeoMesa does this by calling package protected Spark methods to 
implement geospatial user defined types and functions.
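
For illustration, a minimal Scala sketch of running that query through Spark SQL,
assuming the GeoMesa Spark SQL integration has already registered its types and
functions and a "chicago" table is available (setup omitted):

    // Assumes a sqlContext with GeoMesa's geospatial UDTs/UDFs registered.
    val result = sqlContext.sql(
      """select * from chicago
        |where case_number = 1 and st_intersects(geom,
        |  st_makeBox2d(st_point(-77, 38), st_point(-76, 39)))""".stripMargin)
    result.show()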


Cheers,

Jim

On 10/10/2017 11:30 AM, Georg Heiler wrote:

What about something like GeoMesa?
Anastasios Zouzias <zouz...@gmail.com>
wrote on Tue, Oct 10, 2017 at 15:29:


Hi,

Which spatial operations do you require exactly? Also, I don't
follow what you mean by combining logical operators?

I have created a library that wraps Lucene's spatial functionality
here: https://github.com/zouzias/spark-lucenerdd/wiki/Spatial-search

You could give the library a try; it supports intersections /
within / etc. Ideally, I try to push all spatial Lucene features
into the library.

See also,
https://github.com/zouzias/spark-lucenerdd/wiki/Related-Work for
related libraries.

Best,
Anastasios


On Tue, Oct 10, 2017 at 11:21 AM, Imran Rajjad <raj...@gmail.com> wrote:

I need to have a location column inside my Dataframe so that I
can do spatial queries and geometry operations. Are there any
third-party packages that perform this kind of operation? I
have seen a few like GeoSpark and Magellan but they don't
support operations where spatial and logical operators can be
combined.

regards,
Imran

-- 
I.R





-- 
-- Anastasios Zouzias






Spark on mesos in docker not getting parameters

2016-08-09 Thread Jim Carroll
I'm running spark 2.0.0 on Mesos using spark.mesos.executor.docker.image to
point to a docker container that I built with the Spark installation.

Everything is working except the Spark client process that's started inside
the container doesn't get any of my parameters I set in the spark config in
the driver.

I set spark.executor.extraJavaOptions and spark.executor.extraClassPath in
the driver and they don't get passed all the way through. Here is a capture
of the chain of processes that are started on the mesos slave, in the docker
container:

root  1064  1051  0 12:46 ?00:00:00 docker -H
unix:///var/run/docker.sock run --cpu-shares 8192 --memory 4723834880 -e
SPARK_CLASSPATH=[path to my jar] -e SPARK_EXECUTOR_OPTS=
-Daws.accessKeyId=[myid] -Daws.secretKey=[mykey] -e SPARK_USER=root -e
SPARK_EXECUTOR_MEMORY=4096m -e MESOS_SANDBOX=/mnt/mesos/sandbox -e
MESOS_CONTAINER_NAME=mesos-90e2c720-1e45-4dbc-8271-f0c47a33032a-S0.772f8080-6278-4a35-9e57-0009787ac605
-v
/tmp/mesos/slaves/90e2c720-1e45-4dbc-8271-f0c47a33032a-S0/frameworks/f5794f8a-b56f-4958-b906-f05c426dcef0-0001/executors/0/runs/772f8080-6278-4a35-9e57-0009787ac605:/mnt/mesos/sandbox
--net host --entrypoint /bin/sh --name
mesos-90e2c720-1e45-4dbc-8271-f0c47a33032a-S0.772f8080-6278-4a35-9e57-0009787ac605
[my docker image] -c  "/opt/spark/./bin/spark-class"
org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url
spark://CoarseGrainedScheduler@192.168.10.145:46121 --executor-id 0
--hostname 192.168.10.145 --cores 8 --app-id
f5794f8a-b56f-4958-b906-f05c426dcef0-0001

root  1193  1175  0 12:46 ?00:00:00 /bin/sh -c 
"/opt/spark/./bin/spark-class"
org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url
spark://CoarseGrainedScheduler@192.168.10.145:46121 --executor-id 0
--hostname 192.168.10.145 --cores 8 --app-id
f5794f8a-b56f-4958-b906-f05c426dcef0-0001

root  1208  1193  0 12:46 ?00:00:00 bash
/opt/spark/./bin/spark-class
org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url
spark://CoarseGrainedScheduler@192.168.10.145:46121 --executor-id 0
--hostname 192.168.10.145 --cores 8 --app-id
f5794f8a-b56f-4958-b906-f05c426dcef0-0001

root  1213  1208  0 12:46 ?00:00:00 bash
/opt/spark/./bin/spark-class
org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url
spark://CoarseGrainedScheduler@192.168.10.145:46121 --executor-id 0
--hostname 192.168.10.145 --cores 8 --app-id
f5794f8a-b56f-4958-b906-f05c426dcef0-0001

root  1215  1213  0 12:46 ?00:00:00
/usr/lib/jvm/java-8-openjdk-amd64/bin/java -Xmx128m -cp /opt/spark/jars/*
org.apache.spark.launcher.Main
org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url
spark://CoarseGrainedScheduler@192.168.10.145:46121 --executor-id 0
--hostname 192.168.10.145 --cores 8 --app-id
f5794f8a-b56f-4958-b906-f05c426dcef0-0001

Notice that in the initial process started by Mesos, both the SPARK_CLASSPATH is
set to the value of spark.executor.extraClassPath and the -D options are set
as I set them in spark.executor.extraJavaOptions (in this case, to my AWS
creds) in the driver configuration.

However, they are missing in subsequent child processes and the final java
process started doesn't contain them either.

I "fixed" the classpath problem by putting my jar in /opt/spark/jars
(/opt/spark is the location I have spark installed in the docker container).
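
For reference, a hedged Scala sketch of the driver-side settings being described
(the image name, jar path, and credentials are placeholders, not the real values):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.mesos.executor.docker.image", "my-registry/spark:2.0.0")
      .set("spark.executor.extraClassPath", "/opt/myapp/my.jar")
      .set("spark.executor.extraJavaOptions",
        "-Daws.accessKeyId=MYID -Daws.secretKey=MYKEY")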

Can someone tell me what I'm missing?

Thanks
Jim




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-on-mesos-in-docker-not-getting-parameters-tp27500.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: How do I download 2.0? The main download page isn't showing it?

2016-07-27 Thread Jim O'Flaherty
Nevermind, it literally just appeared right after I posted this.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-do-I-download-2-0-The-main-download-page-isn-t-showing-it-tp27420p27421.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



How do I download 2.0? The main download page isn't showing it?

2016-07-27 Thread Jim O'Flaherty
How do I download 2.0? The main download page isn't showing it? And all the
other download links point to the same single download page.

This is the one I end up at:
http://spark.apache.org/downloads.html



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-do-I-download-2-0-The-main-download-page-isn-t-showing-it-tp27420.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Non-classification neural networks

2016-03-27 Thread Jim Carroll
Hello all,

We were using the old "Artificial Neural Network" :
https://github.com/apache/spark/pull/1290

This code appears to have been incorporated in 1.5.2 but it's only exposed
publicly via the MultilayerPerceptronClassifier. Is there a way to use the
old feedforward/backprop non-classification functionality? It appears to be
buried in private classes and it's not obvious to me if the
MultilayerPerceptronClassifier can be used without the classification. The
doc says "Number of outputs has to be equal to the total number of labels." 

What if the output is continuous and you want to simply do prediction?

Thanks
Jim




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Non-classification-neural-networks-tp26604.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark 1.5.2 memory error

2016-02-02 Thread Jim Green
Look at part#3 in below blog:
http://www.openkb.info/2015/06/resource-allocation-configurations-for.html

You may want to increase the executor memory, not just the
spark.yarn.executor.memoryOverhead.
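
As a hedged illustration (the numbers are placeholders, not tuned recommendations),
the two knobs together look like this in Scala:

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.executor.memory", "20g")                // raise the heap itself
      .set("spark.yarn.executor.memoryOverhead", "4096")  // off-heap overhead, in MB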

On Tue, Feb 2, 2016 at 2:14 PM, Stefan Panayotov  wrote:

> For the memoryOverhead I have the default of 10% of 16g, and Spark version
> is 1.5.2.
>
>
>
> Stefan Panayotov, PhD
> Sent from Outlook Mail for Windows 10 phone
>
>
>
>
> *From: *Ted Yu 
> *Sent: *Tuesday, February 2, 2016 4:52 PM
> *To: *Jakob Odersky 
> *Cc: *Stefan Panayotov ; user@spark.apache.org
> *Subject: *Re: Spark 1.5.2 memory error
>
>
>
> What value do you use for spark.yarn.executor.memoryOverhead ?
>
>
>
> Please see https://spark.apache.org/docs/latest/running-on-yarn.html for
> description of the parameter.
>
>
>
> Which Spark release are you using ?
>
>
>
> Cheers
>
>
>
> On Tue, Feb 2, 2016 at 1:38 PM, Jakob Odersky  wrote:
>
> Can you share some code that produces the error? It is probably not
> due to spark but rather the way data is handled in the user code.
> Does your code call any reduceByKey actions? These are often a source
> for OOM errors.
>
>
> On Tue, Feb 2, 2016 at 1:22 PM, Stefan Panayotov 
> wrote:
> > Hi Guys,
> >
> > I need help with Spark memory errors when executing ML pipelines.
> > The error that I see is:
> >
> >
> > 16/02/02 20:34:17 INFO Executor: Executor is trying to kill task 32.0 in
> > stage 32.0 (TID 3298)
> >
> >
> > 16/02/02 20:34:17 INFO Executor: Executor is trying to kill task 12.0 in
> > stage 32.0 (TID 3278)
> >
> >
> > 16/02/02 20:34:39 INFO MemoryStore: ensureFreeSpace(2004728720) called
> with
> > curMem=296303415, maxMem=8890959790
> >
> >
> > 16/02/02 20:34:39 INFO MemoryStore: Block taskresult_3298 stored as
> bytes in
> > memory (estimated size 1911.9 MB, free 6.1 GB)
> >
> >
> > 16/02/02 20:34:39 ERROR CoarseGrainedExecutorBackend: RECEIVED SIGNAL 15:
> > SIGTERM
> >
> >
> > 16/02/02 20:34:39 ERROR Executor: Exception in task 12.0 in stage 32.0
> (TID
> > 3278)
> >
> >
> > java.lang.OutOfMemoryError: Java heap space
> >
> >
> >at java.util.Arrays.copyOf(Arrays.java:2271)
> >
> >
> >at
> > java.io.ByteArrayOutputStream.toByteArray(ByteArrayOutputStream.java:191)
> >
> >
> >at
> >
> org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:86)
> >
> >
> >at
> > org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:256)
> >
> >
> >at
> >
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> >
> >
> >at
> >
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> >
> >
> >at java.lang.Thread.run(Thread.java:745)
> >
> >
> > 16/02/02 20:34:39 INFO DiskBlockManager: Shutdown hook called
> >
> >
> > 16/02/02 20:34:39 INFO Executor: Finished task 32.0 in stage 32.0 (TID
> > 3298). 2004728720 bytes result sent via BlockManager)
> >
> >
> > 16/02/02 20:34:39 ERROR SparkUncaughtExceptionHandler: Uncaught
> exception in
> > thread Thread[Executor task launch worker-8,5,main]
> >
> >
> > java.lang.OutOfMemoryError: Java heap space
> >
> >
> >at java.util.Arrays.copyOf(Arrays.java:2271)
> >
> >
> >at
> > java.io.ByteArrayOutputStream.toByteArray(ByteArrayOutputStream.java:191)
> >
> >
> >at
> >
> org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:86)
> >
> >
> >at
> > org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:256)
> >
> >
> >at
> >
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> >
> >
> >at
> >
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> >
> >
> >at java.lang.Thread.run(Thread.java:745)
> >
> >
> > 16/02/02 20:34:39 INFO ShutdownHookManager: Shutdown hook called
> >
> >
> > 16/02/02 20:34:39 INFO MetricsSystemImpl: Stopping azure-file-system
> metrics
> > system...
> >
> >
> > 16/02/02 20:34:39 INFO MetricsSinkAdapter: azurefs2 thread interrupted.
> >
> >
> > 16/02/02 20:34:39 INFO MetricsSystemImpl: azure-file-system metrics
> system
> > stopped.
> >
> >
> > 16/02/02 20:34:39 INFO MetricsSystemImpl: azure-file-system metrics
> system
> > shutdown complete.
> >
> >
> >
> >
> >
> > And …..
> >
> >
> >
> >
> >
> > 16/02/02 20:09:03 INFO impl.ContainerManagementProtocolProxy: Opening
> proxy
> > : 10.0.0.5:30050
> >
> >
> > 16/02/02 20:33:51 INFO yarn.YarnAllocator: Completed container
> > container_1454421662639_0011_01_05 (state: COMPLETE, exit status:
> -104)
> >
> >
> > 16/02/02 20:33:51 WARN yarn.YarnAllocator: Container killed by YARN for
> > exceeding memory limits. 16.8 GB of 16.5 GB physical memory used.
> Consider
> > boosting spark.yarn.executor.memoryOverhead.
> >
> >
> > 16/02/02 20:33:56 INFO yarn.YarnAllocator: Will request 1 executor
> > 

Re: spark.kryo.classesToRegister

2016-01-28 Thread Jim Lohse
You are only required to add classes to Kryo (compulsorily) if you use a
specific setting:


// require registration of all classes with Kryo
.set("spark.kryo.registrationRequired", "true")

Here's an example of my setup. I think this is the best approach because
it forces me to really think about what I am serializing:


// for the Kryo serializer it wants to register all classes that need to be serialized
Class[] kryoClassArray = new Class[]{DropResult.class,
    DropEvaluation.class, PrintHetSharing.class};

SparkConf sparkConf = new SparkConf()
    .setAppName("MyAppName")
    .setMaster("spark://ipaddress:7077")
    // now for the Kryo stuff
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    // require registration of all classes with Kryo
    .set("spark.kryo.registrationRequired", "true")
    // don't forget to register ALL classes or you will get an error
    .registerKryoClasses(kryoClassArray);
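
For comparison, a minimal Scala sketch of the same registration expressed through
spark.kryo.classesToRegister instead of registerKryoClasses (the com.example
package prefix is a placeholder):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryo.registrationRequired", "true")
      .set("spark.kryo.classesToRegister",
        "com.example.DropResult,com.example.DropEvaluation,com.example.PrintHetSharing")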





On 01/27/2016 12:58 PM, Shixiong(Ryan) Zhu wrote:
It depends. The default Kryo serializer cannot handle all cases. If 
you encounter any issue, you can follow the Kryo doc to set up custom 
serializer: 
https://github.com/EsotericSoftware/kryo/blob/master/README.md
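
A minimal Scala sketch of that custom-serializer route, using a hypothetical Point
type and wiring the registrator in through spark.kryo.registrator:

    import com.esotericsoftware.kryo.{Kryo, Serializer}
    import com.esotericsoftware.kryo.io.{Input, Output}
    import org.apache.spark.SparkConf
    import org.apache.spark.serializer.KryoRegistrator

    case class Point(x: Double, y: Double)

    // A hand-written Kryo serializer for Point.
    class PointSerializer extends Serializer[Point] {
      override def write(kryo: Kryo, output: Output, p: Point): Unit = {
        output.writeDouble(p.x); output.writeDouble(p.y)
      }
      override def read(kryo: Kryo, input: Input, cls: Class[Point]): Point =
        Point(input.readDouble(), input.readDouble())
    }

    class MyRegistrator extends KryoRegistrator {
      override def registerClasses(kryo: Kryo): Unit =
        kryo.register(classOf[Point], new PointSerializer)
    }

    val conf = new SparkConf()
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryo.registrator", "MyRegistrator")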
On Wed, Jan 27, 2016 at 3:13 AM, amit tewari > wrote:


This is what I have added in my code:

rdd.persist(StorageLevel.MEMORY_ONLY_SER())

conf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer");

Do I compulsorily need to do anything via
: spark.kryo.classesToRegister?

Or the above code sufficient to achieve performance gain using
Kryo serialization?

Thanks

Amit



Re: Fwd: how to submit multiple jar files when using spark-submit script in shell?

2016-01-12 Thread Jim Lohse
Thanks for your answer, you are correct, it's just a different approach 
than the one I am asking for :)


Building an uber- or assembly- jar goes against the idea of placing the 
jars on all workers. Uber-jars increase network traffic, using local:/ 
in the classpath reduces network traffic.


Eventually, depending on uber-jars can run into various problems.

Really the question is narrowly geared toward understanding what arguments
can set up the classpath using the --jars argument. Using an uber-jar is
a workaround, true, but with downsides.


Thanks!

On 01/12/2016 12:06 AM, UMESH CHAUDHARY wrote:



Could you build a fat jar by including all your dependencies along
with your application? See here and here.



Also:
"So this application-jar can point to a directory and will be
expanded? Or needs to be a path to a single specific jar?"

This will be a path to a single specific JAR.

On Tue, Jan 12, 2016 at 12:04 PM, jiml > wrote:


Question is: Looking for all the ways to specify a set of jars
using --jars
on spark-submit

I know this is old but I am about to submit a proposed docs change on
--jars, and I had an issue with --jars today

When this user submitted the following command line, is that a
proper way to
reference a jar?

hdfs://master:8000/srcdata/kmeans  (is that a directory? or a jar that
doesn't end with .jar? I have not gotten into the machine learning
libs yet
to recognize this)

I know the docs say, "Path to a bundled jar including your
application and
all dependencies. The URL must be globally visible inside of your
cluster,
for instance, an hdfs:// path or a file:// path that is present on all
nodes."

*So this application-jar can point to a directory and will be
expanded? Or
needs to be a path to a single specific jar?*

I ask because when I was testing --jars today, we had to
explicitly provide
a path to each jar:

/usr/local/spark/bin/spark-submit --class jpsgcs.thold.PipeLinkageData
--jars=local:/usr/local/spark/jars/groovy-all-2.3.3.jar,local:/usr/local/spark/jars/guava-14.0.1.jar,local:/usr/local/spark/jars/jopt-simple-4.6.jar,local:/usr/local/spark/jars/jpsgcs-core-1.0.8-2.jar,local:/usr/local/spark/jars/jpsgcs-pipe-1.0.6-7.jar
/usr/local/spark/jars/thold-0.0.1-1.jar

(The only way I figured out to use the commas was a StackOverflow
answer
that led me to look beyond the docs to the command line:
spark-submit --help
results in :

  --jars JARS                 Comma-separated list of local jars to include
                              on the driver and executor classpaths.


And it seems that we do not need to put the main jar in the --jars
argument,
I have not tested yet if other classes in the application-jar
(/usr/local/spark/jars/thold-0.0.1-1.jar) are shipped to workers,
or if I
need to put the application-jar in the --jars path to get classes
not named
after --class to be seen?

Thanks for any ideas




--
View this message in context:

http://apache-spark-user-list.1001560.n3.nabble.com/how-to-submit-multiple-jar-files-when-using-spark-submit-script-in-shell-tp16662p25942.html
Sent from the Apache Spark User List mailing list archive at
Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org

For additional commands, e-mail: user-h...@spark.apache.org








Re: [discuss] dropping Python 2.6 support

2016-01-05 Thread Jim Lohse
Hey Python 2.6 don't let the door hit you on the way out! haha Drop It 
No Problem


On 01/05/2016 12:17 AM, Reynold Xin wrote:
Does anybody here care about us dropping support for Python 2.6 in 
Spark 2.0?


Python 2.6 is ancient, and is pretty slow in many aspects (e.g. json 
parsing) when compared with Python 2.7. Some libraries that Spark 
depend on stopped supporting 2.6. We can still convince the library 
maintainers to support 2.6, but it will be extra work. I'm curious if 
anybody still uses Python 2.6 to run Spark.


Thanks.





-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: feedback on the use of Spark’s gateway hidden REST API (standalone cluster mode) for application submission

2016-01-02 Thread Jim Lohse
There is a lot of interesting info about this API here: 
https://issues.apache.org/jira/browse/SPARK-5388


I got that from a comment thread on the last link in your PR. Thanks for 
bringing this up! I knew you could check status via REST per 
http://spark.apache.org/docs/latest/monitoring.html#rest-api but the 
"REST" of the story was off my radar screen, pardon the pun :)


Jim

On 01/02/2016 09:14 AM, HILEM Youcef wrote:


Happy new year.

I solicit community feedback on the use of Spark’s gateway hidden REST 
API (standalone cluster mode) for application submission.


We already use status checking and cancellation in our ansible scripts.

I also opened a ticket to make this API public 
(https://issues.apache.org/jira/browse/SPARK-12528).


Thank you in advance.

Youcef.


Postscript from La Poste

This message is confidential. Subject to any agreement concluded in
writing between you and La Poste, its content in no way represents a
commitment on the part of La Poste. Any publication, use or
distribution, even partial, must be authorized in advance. If you are
not the intended recipient of this message, please notify the sender
immediately.






Re: Dynamic jar loading

2015-12-18 Thread Jim Lohse
I am going to say no, but have not actually tested this. Just going on 
this line in the docs:


http://spark.apache.org/docs/latest/configuration.html

spark.driver.extraClassPath    (default: none)    Extra classpath entries to
prepend to the classpath of the driver.
Note: In client mode, this config must not be set through the
SparkConf directly in your application, because the driver JVM has
already started at that point. Instead, please set this through the
--driver-class-path command line option or in your default properties
file.





On 12/17/2015 07:53 AM, amarouni wrote:

Hello guys,

Do you know if the method SparkContext.addJar("file:///...") can be used
on a running context (an already started spark-shell) ?
And if so, does it add the jar to the class-path of the Spark workers
(Yarn containers in case of yarn-client) ?

Thanks,

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org





Confirm this won't parallelize/partition?

2015-11-28 Thread Jim Lohse
Hi, I got a good answer on the main question elsewhere; would anyone
please confirm the first code is the right approach? For an MVCE I am
trying to adapt this example and it seems like I am having Java issues
with types:


(but this is basically the right approach?)

int count = spark.parallelize(makeRange(1, NUM_SAMPLES)).filter(new 
Function<Integer, Boolean>() {

  public Boolean call(Integer i) {
double x = Math.random();
double y = Math.random();
return x*x + y*y < 1;
  }
}).count();
System.out.println("Pi is roughly " + 4 * count / NUM_SAMPLES);

And this is definitely the wrong approach? Using the loop in the
function will all execute on one partition? Want to be sure I understood
the other answer correctly. Thanks!


JavaRDD<DropResult> nSizedRDD = spark.parallelize(pldListofOne)
    .flatMap(new FlatMapFunction<PipeLinkageData, DropResult>() {
      public Iterable<DropResult> call(PipeLinkageData pld) {
        List<DropResult> returnRDD = new ArrayList<DropResult>();
        // is Spark good at spreading a for loop like this?
        for (int i = 0; i < howMany; i++) { returnRDD.add(pld.doDrop()); }
        return returnRDD;
      }
    });




On 11/27/2015 4:18 PM, Jim wrote:

 Hello there,

(part of my problem is docs that say "undocumented" on parallelize 
<https://spark.apache.org/docs/1.5.0/api/java/org/apache/spark/SparkContext.html#parallelize%28scala.collection.Seq,%20int,%20scala.reflect.ClassTag%29> 
leave me reading books for examples that don't always pertain )


I am trying to create an RDD length N = 10^6 by executing N operations 
of a Java class we have, I can have that class implement Serializable 
or any Function if necessary. I don't have a fixed length dataset up 
front, I am trying to create one. Trying to figure out whether to 
create a dummy array of length N to parallelize, or pass it a function 
that runs N times.


Not sure which approach is valid/better, I see in Spark if I am 
starting out with a well defined data set like words in a doc, the 
length/count of those words is already defined and I just parallelize 
some map or filter to do some operation on that data.


In my case I think it's different, trying to parallelize the creation 
an RDD that will contain 10^6 elements... here's a lot more info if 
you want ...


DESCRIPTION:

In Java 8 using Spark 1.5.1, we have a Java method doDrop() that takes 
a PipeLinkageData and returns a DropResult.


I am thinking I could use map() or flatMap() to call a one to many 
function, I was trying to do something like this in another question 
that never quite worked 
<http://stackoverflow.com/questions/33882283/build-spark-javardd-list-from-dropresult-objects>:


JavaRDD<DropResult> simCountRDD = spark.parallelize(makeRange(1, getSimCount()))
    .map(new Function<Integer, DropResult>() {
      public DropResult call(Integer i) { return pld.doDrop(); }
    });


Thinking something like this is more the correct approach? And this 
has more context if desired:


// pld is of type PipeLinkageData, it's already initialized
// parallelize wants a collection passed into first param
List<PipeLinkageData> pldListofOne = new ArrayList<PipeLinkageData>();
// make an ArrayList of one
pldListofOne.add(pld);
int howMany = 100;
JavaRDD<DropResult> nSizedRDD = spark.parallelize(pldListofOne)
    .flatMap(new FlatMapFunction<PipeLinkageData, DropResult>() {
      public Iterable<DropResult> call(PipeLinkageData pld) {
        List<DropResult> returnRDD = new ArrayList<DropResult>();
        // is Spark good at spreading a for loop like this?
        for (int i = 0; i < howMany; i++) { returnRDD.add(pld.doDrop()); }
        return returnRDD;
      }
    });


One other concern: A JavaRDD is correct here? I can see needing to 
call FlatMapFunction but I don't need a FlatMappedRDD? And since I am 
never trying to flatten a group of arrays or lists to a single array 
or list, do I really ever need to flatten anything?










Give parallelize a dummy Arraylist length N to control RDD size?

2015-11-27 Thread Jim

 Hello there,

(part of my problem is docs that say "undocumented" on parallelize 
 
leave me reading books for examples that don't always pertain )


I am trying to create an RDD length N = 10^6 by executing N operations 
of a Java class we have, I can have that class implement Serializable or 
any Function if necessary. I don't have a fixed length dataset up front, 
I am trying to create one. Trying to figure out whether to create a 
dummy array of length N to parallelize, or pass it a function that runs 
N times.


Not sure which approach is valid/better, I see in Spark if I am starting 
out with a well defined data set like words in a doc, the length/count 
of those words is already defined and I just parallelize some map or 
filter to do some operation on that data.


In my case I think it's different, trying to parallelize the creation an 
RDD that will contain 10^6 elements... here's a lot more info if you 
want ...


DESCRIPTION:

In Java 8 using Spark 1.5.1, we have a Java method doDrop() that takes a 
PipeLinkageData and returns a DropResult.


I am thinking I could use map() or flatMap() to call a one to many 
function, I was trying to do something like this in another question 
that never quite worked 
:


JavaRDD<DropResult> simCountRDD = spark.parallelize(makeRange(1, getSimCount()))
    .map(new Function<Integer, DropResult>() {
      public DropResult call(Integer i) { return pld.doDrop(); }
    });


Thinking something like this is more the correct approach? And this has 
more context if desired:


// pld is of type PipeLinkageData, it's already initialized
// parallelize wants a collection passed into first param
List<PipeLinkageData> pldListofOne = new ArrayList<PipeLinkageData>();
// make an ArrayList of one
pldListofOne.add(pld);
int howMany = 100;
JavaRDD<DropResult> nSizedRDD = spark.parallelize(pldListofOne)
    .flatMap(new FlatMapFunction<PipeLinkageData, DropResult>() {
      public Iterable<DropResult> call(PipeLinkageData pld) {
        List<DropResult> returnRDD = new ArrayList<DropResult>();
        // is Spark good at spreading a for loop like this?
        for (int i = 0; i < howMany; i++) { returnRDD.add(pld.doDrop()); }
        return returnRDD;
      }
    });


One other concern: A JavaRDD is correct here? I can see needing to call 
FlatMapFunction but I don't need a FlatMappedRDD? And since I am never 
trying to flatten a group of arrays or lists to a single array or list, 
do I really ever need to flatten anything?








localhost webui port

2015-10-13 Thread Langston, Jim
Hi all,

Is there any way to change the default port 4040 for the localhost web UI?
Unfortunately, that port is blocked and I have no control over that. I have
not found any configuration parameter that would enable me to change it.

Thanks,

Jim


Array column stored as “.bag” in parquet file instead of “REPEATED INT64

2015-08-27 Thread Jim Green
Hi Team,

Say I have a test.json file: {"c1":[1,2,3]}
I can create a parquet file like:
var df = sqlContext.load("/tmp/test.json", "json")
var df_c = df.repartition(1)
df_c.select("*").save("/tmp/testjson_spark", "parquet")

The output parquet file’s schema is like:
c1:  OPTIONAL F:1
.bag:REPEATED F:1
..array: OPTIONAL INT64 R:1 D:3

Is there any way to avoid using “.bag” and instead create the
parquet file using column type “REPEATED INT64”?
The expected data type is:
c1:  REPEATED INT64 R:1 D:1

Thanks!
-- 
Thanks,
www.openkb.info
(Open KnowledgeBase for Hadoop/Database/OS/Network/Tool)


Re: Is SPARK-3322 fixed in latest version of Spark?

2015-08-05 Thread Jim Green
We tested Spark 1.2 and 1.3, and this issue is gone. I know that starting from
1.2, Spark uses netty instead of nio.
So you mean that bypasses this issue?

Another question is, why did this error message not show in Spark 0.9 or
older versions?
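
For reference, a hedged Scala sketch of the setting that selects between the two
transfer services in the 1.2-1.5 era (netty is already the default; it is shown
here only to make the switch explicit):

    import org.apache.spark.SparkConf

    // "nio" would flip back to the old ConnectionManager-based path;
    // leaving this at "netty" (the default) avoids those messages.
    val conf = new SparkConf().set("spark.shuffle.blockTransferService", "netty")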

On Tue, Aug 4, 2015 at 11:01 PM, Aaron Davidson ilike...@gmail.com wrote:

 ConnectionManager has been deprecated and is no longer used by default
 (NettyBlockTransferService is the replacement). Hopefully you would no
 longer see these messages unless you have explicitly flipped it back on.

 On Tue, Aug 4, 2015 at 6:14 PM, Jim Green openkbi...@gmail.com wrote:

 And also https://issues.apache.org/jira/browse/SPARK-3106
 This one is still open.

 On Tue, Aug 4, 2015 at 6:12 PM, Jim Green openkbi...@gmail.com wrote:

 *Symptom:*
 Even sample job fails:
 $ MASTER=spark://xxx:7077 run-example org.apache.spark.examples.SparkPi
 10
 Pi is roughly 3.140636
 ERROR ConnectionManager: Corresponding SendingConnection to
 ConnectionManagerId(xxx,) not found
 WARN ConnectionManager: All connections not cleaned up

 Found https://issues.apache.org/jira/browse/SPARK-3322
  But the code changes are not in newer versions of Spark; however, this
  jira is marked as fixed.
 Is this issue really fixed in latest version? If so, what is the related
 JIRA?

 --
 Thanks,
 www.openkb.info
 (Open KnowledgeBase for Hadoop/Database/OS/Network/Tool)




 --
 Thanks,
 www.openkb.info
 (Open KnowledgeBase for Hadoop/Database/OS/Network/Tool)





-- 
Thanks,
www.openkb.info
(Open KnowledgeBase for Hadoop/Database/OS/Network/Tool)


Re: Is SPARK-3322 fixed in latest version of Spark?

2015-08-04 Thread Jim Green
And also https://issues.apache.org/jira/browse/SPARK-3106
This one is still open.

On Tue, Aug 4, 2015 at 6:12 PM, Jim Green openkbi...@gmail.com wrote:

 *Symptom:*
 Even sample job fails:
 $ MASTER=spark://xxx:7077 run-example org.apache.spark.examples.SparkPi 10
 Pi is roughly 3.140636
 ERROR ConnectionManager: Corresponding SendingConnection to
 ConnectionManagerId(xxx,) not found
 WARN ConnectionManager: All connections not cleaned up

 Found https://issues.apache.org/jira/browse/SPARK-3322
 But the code changes are not in newer versions of Spark; however, this jira
 is marked as fixed.
 Is this issue really fixed in latest version? If so, what is the related
 JIRA?

 --
 Thanks,
 www.openkb.info
 (Open KnowledgeBase for Hadoop/Database/OS/Network/Tool)




-- 
Thanks,
www.openkb.info
(Open KnowledgeBase for Hadoop/Database/OS/Network/Tool)


Is SPARK-3322 fixed in latest version of Spark?

2015-08-04 Thread Jim Green
*Symptom:*
Even sample job fails:
$ MASTER=spark://xxx:7077 run-example org.apache.spark.examples.SparkPi 10
Pi is roughly 3.140636
ERROR ConnectionManager: Corresponding SendingConnection to
ConnectionManagerId(xxx,) not found
WARN ConnectionManager: All connections not cleaned up

Found https://issues.apache.org/jira/browse/SPARK-3322
But the code changes are not in newer versions of Spark; however, this jira
is marked as fixed.
Is this issue really fixed in latest version? If so, what is the related
JIRA?

-- 
Thanks,
www.openkb.info
(Open KnowledgeBase for Hadoop/Database/OS/Network/Tool)


Re: EOFException using KryoSerializer

2015-06-24 Thread Jim Carroll
I finally got back to this and I just wanted to let anyone that runs into
this know that the problem is a kryo version issue. Spark (at least 1.4.0)
depends on Kryo 2.21 while my client had 2.24.0 on the classpath. Changing
it to 2.21 fixed the problem.
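
A hedged sketch of pinning the client-side Kryo in an sbt build, assuming the
pre-3.0 coordinates Kryo 2.21 was published under (adjust for Maven or other
build tools):

    // build.sbt fragment: force the Kryo version Spark 1.4.0 expects.
    libraryDependencies += "com.esotericsoftware.kryo" % "kryo" % "2.21"
    // If another dependency drags in a newer Kryo, an explicit override or
    // exclusion may also be needed so 2.21 wins on the classpath.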



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/EOFException-using-KryoSerializer-tp22948p23479.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Resource allocation configurations for Spark on Yarn

2015-06-12 Thread Jim Green
Hi Team,

Sharing one article which summarize the Resource allocation configurations
for Spark on Yarn:
Resource allocation configurations for Spark on Yarn
http://www.openkb.info/2015/06/resource-allocation-configurations-for.html

-- 
Thanks,
www.openkb.info
(Open KnowledgeBase for Hadoop/Database/OS/Network/Tool)


EOFException using KryoSerializer

2015-05-19 Thread Jim Carroll
I'm seeing the following exception ONLY when I run on a Mesos cluster. If I
run the exact same code with master set to local[N] I have no problem:

 2015-05-19 16:45:43,484 [task-result-getter-0] WARN  TaskSetManager - Lost
task 0.0 in stage 0.0 (TID 0, 10.253.1.101): java.io.EOFException
at
org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:142)
at
org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:216)
at
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:177)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1153)
at
org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:164)
at
org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
at
org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
at
org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:87)
at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:58)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

KryoSerializer explicitly throws an EOFException. The comment says: 

// DeserializationStream uses the EOF exception to indicate stopping
condition.

Apparently this isn't what TorrentBroadcast expects.

Any suggestions? Thanks.

Jim





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/EOFException-using-KryoSerializer-tp22948.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: 1.3 Hadoop File System problem

2015-03-25 Thread Jim Carroll
Thanks Patrick and Michael for your responses.

For anyone else that runs across this problem prior to 1.3.1 being released,
I've been pointed to this Jira ticket that's scheduled for 1.3.1:

https://issues.apache.org/jira/browse/SPARK-6351

Thanks again.




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/1-3-Hadoop-File-System-problem-tp22207p5.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



1.3 Hadoop File System problem

2015-03-24 Thread Jim Carroll

I have code that works under 1.2.1 but when I upgraded to 1.3.0 it fails to
find the s3 hadoop file system.

I get "java.lang.IllegalArgumentException: Wrong FS: s3://[path to my
file], expected: file:///" when I try to save a parquet file. This worked in
1.2.1.

Has anyone else seen this?

I'm running spark using local[8] so it's all internal. These are actually
unit tests in our app that are failing now.

Thanks.
Jim




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/1-3-Hadoop-File-System-problem-tp22207.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Mailing list schizophrenia?

2015-03-20 Thread Jim Kleckner
I notice that some people send messages directly to user@spark.apache.org
and some via nabble, either using email or the web client.

There are two index sites, one directly at apache.org and one at nabble.
But messages sent directly to user@spark.apache.org only show up in the
apache list.  Further, it appears that you can subscribe either directly to
user@spark.apache.org, in which you see all emails, or via nabble and you
see a subset.

Is this correct and is it intentional?

Apache site:
  http://mail-archives.apache.org/mod_mbox/spark-user/201503.mbox/browser

Nabble site:
  http://apache-spark-user-list.1001560.n3.nabble.com/

An example of a message that only shows up in Apache:

http://mail-archives.apache.org/mod_mbox/spark-user/201503.mbox/%3CCAGK53LnsD59wwQrP3-9yHc38C4eevAfMbV2so%2B_wi8k0%2Btq5HQ%40mail.gmail.com%3E


This message was sent both to Nabble and user@spark.apache.org to see how
that behaves.

Jim


Re: Mailing list schizophrenia?

2015-03-20 Thread Jim Kleckner
Yes, it did get delivered to the apache list shown here:

http://mail-archives.apache.org/mod_mbox/spark-user/201503.mbox/%3CCAGK53LnsD59wwQrP3-9yHc38C4eevAfMbV2so%2B_wi8k0%2Btq5HQ%40mail.gmail.com%3E

But the web site for spark community directs people to nabble for viewing
messages and it doesn't show up there.

Community page: http://spark.apache.org/community.html

Link in that page to the archive:
http://apache-spark-user-list.1001560.n3.nabble.com/

The reliable archive:
http://mail-archives.apache.org/mod_mbox/spark-user/201503.mbox/browser



On Fri, Mar 20, 2015 at 12:34 PM, Ted Yu yuzhih...@gmail.com wrote:

 Jim:
 I can find the example message here:
 http://search-hadoop.com/m/JW1q5zP54J1

 On Fri, Mar 20, 2015 at 12:29 PM, Jim Kleckner j...@cloudphysics.com
 wrote:

 I notice that some people send messages directly to user@spark.apache.org
 and some via nabble, either using email or the web client.

 There are two index sites, one directly at apache.org and one at
 nabble.  But messages sent directly to user@spark.apache.org only show
 up in the apache list.  Further, it appears that you can subscribe either
 directly to user@spark.apache.org, in which you see all emails, or via
 nabble and you see a subset.

 Is this correct and is it intentional?

 Apache site:
   http://mail-archives.apache.org/mod_mbox/spark-user/201503.mbox/browser

 Nabble site:
   http://apache-spark-user-list.1001560.n3.nabble.com/

 An example of a message that only shows up in Apache:

 http://mail-archives.apache.org/mod_mbox/spark-user/201503.mbox/%3CCAGK53LnsD59wwQrP3-9yHc38C4eevAfMbV2so%2B_wi8k0%2Btq5HQ%40mail.gmail.com%3E


 This message was sent both to Nabble and user@spark.apache.org to see
 how that behaves.

 Jim





Reliable method/tips to solve dependency issues?

2015-03-19 Thread Jim Kleckner
Do people have a reliable/repeatable method for solving dependency issues
or tips?

The current world of spark-hadoop-hbase-parquet-... is very challenging
given the huge footprint of dependent packages and we may be pushing
against the limits of how many packages can be combined into one
environment...

The process of searching the web to pick at incompatibilities one at a time
is at best tedious and at worst non-converging.

It makes me wonder if there is (or ought to be) a page cataloging in one
place the conflicts that Spark users have hit and what was done to solve it.

Eugene Yokota wrote an interesting blog about current sbt dependency
management in sbt v 0.13.7 that includes nice improvements for working with
dependencies:
  https://typesafe.com/blog/improved-dependency-management-with-sbt-0137

After reading that, I refreshed on the sbt documentation and found "show
update". It gives very extensive information.

For reference, there was an extensive discussion thread about sbt and maven
last year that touches on a lot of topics:

http://mail-archives.apache.org/mod_mbox/incubator-spark-dev/201402.mbox/%3ccabpqxsukhd4qsf5dg9ruhn7wvonxfm+y5b1k5d8g7h6s9bh...@mail.gmail.com%3E


Re: Spark excludes fastutil dependencies we need

2015-02-27 Thread Jim Kleckner
Yes, I used both.

The discussion on this seems to be at github now:
  https://github.com/apache/spark/pull/4780

I am using more classes from a package from which spark uses HyperLogLog as
well.
So we are both including the jar file but Spark is excluding the dependent
package that is required.


On Thu, Feb 26, 2015 at 9:54 AM, Marcelo Vanzin van...@cloudera.com wrote:

 On Wed, Feb 25, 2015 at 8:42 PM, Jim Kleckner j...@cloudphysics.com
 wrote:
  So, should the userClassPathFirst flag work and there is a bug?

 Sorry for jumping in the middle of conversation (and probably missing
 some of it), but note that this option applies only to executors. If
 you're trying to use the class in your driver, there's a separate
 option for that.

 Also to note is that if you're adding a class that doesn't exist
 inside the Spark jars, which seems to be the case, this option should
 be irrelevant, since the class loaders should all end up finding the
 one copy of the class that you're adding with your app.

 --
 Marcelo





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Re-Spark-excludes-fastutil-dependencies-we-need-tp21849.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Fwd: Spark excludes fastutil dependencies we need

2015-02-25 Thread Jim Kleckner
Forwarding conversation below that didn't make it to the list.

-- Forwarded message --
From: Jim Kleckner j...@cloudphysics.com
Date: Wed, Feb 25, 2015 at 8:42 PM
Subject: Re: Spark excludes fastutil dependencies we need
To: Ted Yu yuzhih...@gmail.com
Cc: Sean Owen so...@cloudera.com, user user@spark.apache.org


Inline

On Wed, Feb 25, 2015 at 1:53 PM, Ted Yu yuzhih...@gmail.com wrote:

 Interesting. Looking at SparkConf.scala :

 val configs = Seq(
   DeprecatedConfig("spark.files.userClassPathFirst",
     "spark.executor.userClassPathFirst",
     "1.3"),
   DeprecatedConfig("spark.yarn.user.classpath.first", null, "1.3",
     "Use spark.{driver,executor}.userClassPathFirst instead."))

 It seems spark.files.userClassPathFirst and spark.yarn.user.classpath.first
 are deprecated.


Note that I did use the non-deprecated version, spark.executor.
userClassPathFirst=true.



 On Wed, Feb 25, 2015 at 12:39 AM, Sean Owen so...@cloudera.com wrote:

 No, we should not add fastutil back. It's up to the app to bring
 dependencies it needs, and that's how I understand this issue. The
 question is really, how to get the classloader visibility right. It
 depends on where you need these classes. Have you looked into
 spark.files.userClassPathFirst and spark.yarn.user.classpath.first ?



I noted that I tried this in my original email.

The issue appears related to the fact that parquet is also creating a shaded
jar and that one leaves out the Long2LongOpenHashMap class.

FYI, I have subsequently tried removing the exclusion from the spark build
and
that does cause the fastutil classes to be included and the example works...

So, should the userClassPathFirst flag work and there is a bug?

Or is it reasonable to put in a pull request for the elimination of the
exclusion?




 On Wed, Feb 25, 2015 at 5:34 AM, Ted Yu yuzhih...@gmail.com wrote:
  bq. depend on missing fastutil classes like Long2LongOpenHashMap
 
  Looks like Long2LongOpenHashMap should be added to the shaded jar.
 
  Cheers

 
  On Tue, Feb 24, 2015 at 7:36 PM, Jim Kleckner j...@cloudphysics.com
 wrote:
 
  Spark includes the clearspring analytics package but intentionally
  excludes
  the dependencies of the fastutil package (see below).
 
  Spark includes parquet-column which includes fastutil and relocates it
  under
  parquet/
  but creates a shaded jar file which is incomplete because it shades out
  some
  of
  the fastutil classes, notably Long2LongOpenHashMap, which is present in
  the
  fastutil jar file that parquet-column is referencing.
 
  We are using more of the clearspring classes (e.g. QDigest) and those
 do
  depend on
  missing fastutil classes like Long2LongOpenHashMap.
 
  Even though I add them to our assembly jar file, the class loader finds
  the
  spark assembly
  and we get runtime class loader errors when we try to use it.
 
  It is possible to put our jar file first, as described here:
https://issues.apache.org/jira/browse/SPARK-939
 
 
 http://spark.apache.org/docs/1.2.0/configuration.html#runtime-environment
 
  which I tried with args to spark-submit:
--conf spark.driver.userClassPathFirst=true  --conf
  spark.executor.userClassPathFirst=true
  but we still get the class not found error.
 
  We have tried copying the source code for clearspring into our package
 and
  renaming the
  package and that makes it appear to work...  Is this risky?  It
 certainly
  is
  ugly.
 
  Can anyone recommend a way to deal with this dependency hell?
 
 
  === The spark/pom.xml file contains the following lines:
 
  <dependency>
    <groupId>com.clearspring.analytics</groupId>
    <artifactId>stream</artifactId>
    <version>2.7.0</version>
    <exclusions>
      <exclusion>
        <groupId>it.unimi.dsi</groupId>
        <artifactId>fastutil</artifactId>
      </exclusion>
    </exclusions>
  </dependency>
 
  === The parquet-column/pom.xml file contains:
  <artifactId>maven-shade-plugin</artifactId>
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
      <configuration>
        <minimizeJar>true</minimizeJar>
        <artifactSet>
          <includes>
            <include>it.unimi.dsi:fastutil</include>
          </includes>
        </artifactSet>
        <relocations>
          <relocation>
            <pattern>it.unimi.dsi</pattern>
            <shadedPattern>parquet.it.unimi.dsi</shadedPattern>
          </relocation>
        </relocations>
      </configuration>
    </execution>
  </executions>
 
 
 
 
  --
  View this message in context:
 
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-excludes-fastutil-dependencies-we-need-tp21794.html
  Sent from the Apache Spark User List mailing list archive at
 Nabble.com

Re: Fwd: Spark excludes fastutil dependencies we need

2015-02-25 Thread Jim Kleckner
I created an issue and pull request.

Discussion can continue there:
https://issues.apache.org/jira/browse/SPARK-6029



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Fwd-Spark-excludes-fastutil-dependencies-we-need-tp21812p21814.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark excludes fastutil dependencies we need

2015-02-25 Thread Jim Kleckner
Inline

On Wed, Feb 25, 2015 at 1:53 PM, Ted Yu yuzhih...@gmail.com wrote:

 Interesting. Looking at SparkConf.scala :

 val configs = Seq(
   DeprecatedConfig("spark.files.userClassPathFirst",
     "spark.executor.userClassPathFirst",
     "1.3"),
   DeprecatedConfig("spark.yarn.user.classpath.first", null, "1.3",
     "Use spark.{driver,executor}.userClassPathFirst instead."))

 It seems spark.files.userClassPathFirst and spark.yarn.user.classpath.first
 are deprecated.


Note that I did use the non-deprecated version, spark.executor.
userClassPathFirst=true.



 On Wed, Feb 25, 2015 at 12:39 AM, Sean Owen so...@cloudera.com wrote:

 No, we should not add fastutil back. It's up to the app to bring
 dependencies it needs, and that's how I understand this issue. The
 question is really, how to get the classloader visibility right. It
 depends on where you need these classes. Have you looked into
 spark.files.userClassPathFirst and spark.yarn.user.classpath.first ?



I noted that I tried this in my original email.

The issue appears related to the fact that parquet is also creating a shaded
jar and that one leaves out the Long2LongOpenHashMap class.

FYI, I have subsequently tried removing the exclusion from the spark build
and
that does cause the fastutil classes to be included and the example works...

So, should the userClassPathFirst flag work and there is a bug?

Or is it reasonable to put in a pull request for the elimination of the
exclusion?




 On Wed, Feb 25, 2015 at 5:34 AM, Ted Yu yuzhih...@gmail.com wrote:
  bq. depend on missing fastutil classes like Long2LongOpenHashMap
 
  Looks like Long2LongOpenHashMap should be added to the shaded jar.
 
  Cheers

 
  On Tue, Feb 24, 2015 at 7:36 PM, Jim Kleckner j...@cloudphysics.com
 wrote:
 
  Spark includes the clearspring analytics package but intentionally
  excludes
  the dependencies of the fastutil package (see below).
 
  Spark includes parquet-column which includes fastutil and relocates it
  under
  parquet/
  but creates a shaded jar file which is incomplete because it shades out
  some
  of
  the fastutil classes, notably Long2LongOpenHashMap, which is present in
  the
  fastutil jar file that parquet-column is referencing.
 
  We are using more of the clearspring classes (e.g. QDigest) and those
 do
  depend on
  missing fastutil classes like Long2LongOpenHashMap.
 
  Even though I add them to our assembly jar file, the class loader finds
  the
  spark assembly
  and we get runtime class loader errors when we try to use it.
 
  It is possible to put our jar file first, as described here:
https://issues.apache.org/jira/browse/SPARK-939
 
 
 http://spark.apache.org/docs/1.2.0/configuration.html#runtime-environment
 
  which I tried with args to spark-submit:
--conf spark.driver.userClassPathFirst=true  --conf
  spark.executor.userClassPathFirst=true
  but we still get the class not found error.
 
  We have tried copying the source code for clearspring into our package
 and
  renaming the
  package and that makes it appear to work...  Is this risky?  It
 certainly
  is
  ugly.
 
  Can anyone recommend a way to deal with this dependency hell?
 
 
  === The spark/pom.xml file contains the following lines:
 
  <dependency>
    <groupId>com.clearspring.analytics</groupId>
    <artifactId>stream</artifactId>
    <version>2.7.0</version>
    <exclusions>
      <exclusion>
        <groupId>it.unimi.dsi</groupId>
        <artifactId>fastutil</artifactId>
      </exclusion>
    </exclusions>
  </dependency>
 
  === The parquet-column/pom.xml file contains:
  <artifactId>maven-shade-plugin</artifactId>
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
      <configuration>
        <minimizeJar>true</minimizeJar>
        <artifactSet>
          <includes>
            <include>it.unimi.dsi:fastutil</include>
          </includes>
        </artifactSet>
        <relocations>
          <relocation>
            <pattern>it.unimi.dsi</pattern>
            <shadedPattern>parquet.it.unimi.dsi</shadedPattern>
          </relocation>
        </relocations>
      </configuration>
    </execution>
  </executions>
 
 
 
 
  --
  View this message in context:
 
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-excludes-fastutil-dependencies-we-need-tp21794.html
  Sent from the Apache Spark User List mailing list archive at
 Nabble.com.
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 
 





Spark excludes fastutil dependencies we need

2015-02-24 Thread Jim Kleckner
Spark includes the clearspring analytics package but intentionally excludes
the dependencies of the fastutil package (see below).

Spark includes parquet-column which includes fastutil and relocates it under
parquet/
but creates a shaded jar file which is incomplete because it shades out some
of 
the fastutil classes, notably Long2LongOpenHashMap, which is present in the
fastutil jar file that parquet-column is referencing.

We are using more of the clearspring classes (e.g. QDigest) and those do
depend on
missing fastutil classes like Long2LongOpenHashMap.

Even though I add them to our assembly jar file, the class loader finds the
spark assembly
and we get runtime class loader errors when we try to use it.

It is possible to put our jar file first, as described here:
  https://issues.apache.org/jira/browse/SPARK-939
  http://spark.apache.org/docs/1.2.0/configuration.html#runtime-environment

which I tried with args to spark-submit:
  --conf spark.driver.userClassPathFirst=true  --conf
spark.executor.userClassPathFirst=true
but we still get the class not found error.

We have tried copying the source code for clearspring into our package and
renaming the
package and that makes it appear to work...  Is this risky?  It certainly is
ugly.

Can anyone recommend a way to deal with this dependency hell?


=== The spark/pom.xml file contains the following lines:

  <dependency>
    <groupId>com.clearspring.analytics</groupId>
    <artifactId>stream</artifactId>
    <version>2.7.0</version>
    <exclusions>
      <exclusion>
        <groupId>it.unimi.dsi</groupId>
        <artifactId>fastutil</artifactId>
      </exclusion>
    </exclusions>
  </dependency>

=== The parquet-column/pom.xml file contains:
<artifactId>maven-shade-plugin</artifactId>
<executions>
  <execution>
    <phase>package</phase>
    <goals>
      <goal>shade</goal>
    </goals>
    <configuration>
      <minimizeJar>true</minimizeJar>
      <artifactSet>
        <includes>
          <include>it.unimi.dsi:fastutil</include>
        </includes>
      </artifactSet>
      <relocations>
        <relocation>
          <pattern>it.unimi.dsi</pattern>
          <shadedPattern>parquet.it.unimi.dsi</shadedPattern>
        </relocation>
      </relocations>
    </configuration>
  </execution>
</executions>




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-excludes-fastutil-dependencies-we-need-tp21794.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spark impersonation

2015-02-02 Thread Jim Green
Hi Team,

Does Spark support impersonation?
For example, when Spark runs on YARN/Hive/HBase/etc., which user is used by
default?
The user who starts the Spark job?
Any suggestions related to impersonation?

-- 
Thanks,
www.openkb.info
(Open KnowledgeBase for Hadoop/Database/OS/Network/Tool)


Scala on Spark functions examples cheatsheet.

2015-02-02 Thread Jim Green
Hi Team,

I just spent some time these 2 weeks on Scala and tried all Scala on Spark
functions in the Spark Programming Guide
http://spark.apache.org/docs/1.2.0/programming-guide.html.
If you need example code for Scala on Spark functions, I created this cheat
sheet http://www.openkb.info/2015/01/scala-on-spark-cheatsheet.html with
examples.

Sharing.

-- 
Thanks,
www.openkb.info
(Open KnowledgeBase for Hadoop/Database/OS/Network/Tool)


Parquet divide by zero

2015-01-28 Thread Jim Carroll
Hello all,

I've been hitting a divide by zero error in Parquet through Spark, detailed
(and fixed) here: https://github.com/apache/incubator-parquet-mr/pull/102

Is anyone else hitting this error? I hit it frequently.

It looks like the Parquet team is preparing to release 1.6.0 and, since they
have been completely unresponsive, I'm assuming it's going to go out with this
bug (without the fix). Other than the fact that the divide by zero mistake
is obvious, perhaps the conditions under which it occurs are rare and I'm doing
something wrong.

Has anyone else hit this and if so, have they resolved it?

Thanks
Jim




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Parquet-divide-by-zero-tp21406.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Bulk loading into hbase using saveAsNewAPIHadoopFile

2015-01-27 Thread Jim Green
Hi Team,

I need some help on writing a scala to bulk load some data into hbase.
*Env:*
hbase 0.94
spark-1.0.2

I am trying below code to just bulk load some data into hbase table “t1”.

import org.apache.spark._
import org.apache.spark.rdd.NewHadoopRDD
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HColumnDescriptor
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.hbase.KeyValue
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat

val conf = HBaseConfiguration.create()
val tableName = "t1"
val table = new HTable(conf, tableName)

conf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
val job = Job.getInstance(conf)
job.setMapOutputKeyClass (classOf[ImmutableBytesWritable])
job.setMapOutputValueClass (classOf[KeyValue])
HFileOutputFormat.configureIncrementalLoad (job, table)

val num = sc.parallelize(1 to 10)
val rdd = num.map(x => {
val put: Put = new Put(Bytes.toBytes(x))
put.add("cf".getBytes(), "c1".getBytes(), ("value_xxx").getBytes())
(new ImmutableBytesWritable(Bytes.toBytes(x)), put)
})
rdd.saveAsNewAPIHadoopFile("/tmp/8", classOf[ImmutableBytesWritable],
classOf[Put], classOf[HFileOutputFormat], conf)


However I am always getting the error below:
java.lang.ClassCastException: org.apache.hadoop.hbase.client.Put cannot be
cast to org.apache.hadoop.hbase.KeyValue
at
org.apache.hadoop.hbase.mapreduce.HFileOutputFormat$1.write(HFileOutputFormat.java:161)
at
org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:718)
at
org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:699)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
at org.apache.spark.scheduler.Task.run(Task.scala:51)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

My questions are:
1. Do we have a sample code to do bulk load into hbase directly?
Can we use saveAsNewAPIHadoopFile?

2. Is there any other way to do this?
For example, firstly write a hfile on hdfs, and then use hbase command to
bulk load?
Any sample code using scala?

Thanks.




-- 
Thanks,
www.openkb.info
(Open KnowledgeBase for Hadoop/Database/OS/Network/Tool)


Re: Bulk loading into hbase using saveAsNewAPIHadoopFile

2015-01-27 Thread Jim Green
Thanks Ted. Could you give me a simple example to load one row data in
hbase? How should I generate the KeyValue?
I tried multiple times, and still can not figure it out.

On Tue, Jan 27, 2015 at 12:10 PM, Ted Yu yuzhih...@gmail.com wrote:

 Here is the method signature used by HFileOutputFormat :
   public void write(ImmutableBytesWritable row, KeyValue kv)

 Meaning, KeyValue is expected, not Put.

 On Tue, Jan 27, 2015 at 10:54 AM, Jim Green openkbi...@gmail.com wrote:

 Hi Team,

 I need some help on writing a scala to bulk load some data into hbase.
 *Env:*
 hbase 0.94
 spark-1.0.2

 I am trying below code to just bulk load some data into hbase table “t1”.

 import org.apache.spark._
 import org.apache.spark.rdd.NewHadoopRDD
 import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
 import org.apache.hadoop.hbase.client.HBaseAdmin
 import org.apache.hadoop.hbase.mapreduce.TableInputFormat
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HColumnDescriptor
 import org.apache.hadoop.hbase.util.Bytes
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.mapred.TableOutputFormat
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable
 import org.apache.hadoop.mapreduce.Job
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
 import org.apache.hadoop.hbase.KeyValue
 import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat

 val conf = HBaseConfiguration.create()
 val tableName = "t1"
 val table = new HTable(conf, tableName)

 conf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
 val job = Job.getInstance(conf)
 job.setMapOutputKeyClass (classOf[ImmutableBytesWritable])
 job.setMapOutputValueClass (classOf[KeyValue])
 HFileOutputFormat.configureIncrementalLoad (job, table)

 val num = sc.parallelize(1 to 10)
 val rdd = num.map(x => {
 val put: Put = new Put(Bytes.toBytes(x))
 put.add("cf".getBytes(), "c1".getBytes(), ("value_xxx").getBytes())
 (new ImmutableBytesWritable(Bytes.toBytes(x)), put)
 })
 rdd.saveAsNewAPIHadoopFile("/tmp/8", classOf[ImmutableBytesWritable],
 classOf[Put], classOf[HFileOutputFormat], conf)


 However I am allways getting below error:
 java.lang.ClassCastException: org.apache.hadoop.hbase.client.Put cannot
 be cast to org.apache.hadoop.hbase.KeyValue
 at
 org.apache.hadoop.hbase.mapreduce.HFileOutputFormat$1.write(HFileOutputFormat.java:161)
 at
 org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:718)
 at
 org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:699)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
 at org.apache.spark.scheduler.Task.run(Task.scala:51)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)

 My questions are:
 1. Do we have a sample code to do bulk load into hbase directly?
 Can we use saveAsNewAPIHadoopFile?

 2. Is there any other way to do this?
 For example, firstly write a hfile on hdfs, and then use hbase command to
 bulk load?
 Any sample code using scala?

 Thanks.




 --
 Thanks,
 www.openkb.info
 (Open KnowledgeBase for Hadoop/Database/OS/Network/Tool)





-- 
Thanks,
www.openkb.info
(Open KnowledgeBase for Hadoop/Database/OS/Network/Tool)


Re: Bulk loading into hbase using saveAsNewAPIHadoopFile

2015-01-27 Thread Jim Green
I used below code, and it still failed with the same error.
Anyone has experience on bulk loading using scala?
Thanks.

import org.apache.spark._
import org.apache.spark.rdd.NewHadoopRDD
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HColumnDescriptor
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.hbase.KeyValue
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat

val conf = HBaseConfiguration.create()
val tableName = "t1"
val table = new HTable(conf, tableName)

conf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
val job = Job.getInstance(conf)
job.setMapOutputKeyClass (classOf[ImmutableBytesWritable])
job.setMapOutputValueClass (classOf[KeyValue])
HFileOutputFormat.configureIncrementalLoad (job, table)

val num = sc.parallelize(1 to 10)
val rdd = num.map(x => {
val put: Put = new Put(Bytes.toBytes(x))
put.add("cf".getBytes(), "c1".getBytes(), ("value_xxx").getBytes())
(new ImmutableBytesWritable(Bytes.toBytes(x)), put)
})
rdd.saveAsNewAPIHadoopFile("/tmp/13", classOf[ImmutableBytesWritable],
classOf[KeyValue], classOf[HFileOutputFormat], conf)
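
Note that the RDD above still contains Put values even though the save call now
declares KeyValue. A sketch of emitting KeyValue directly, using the HBase
0.94-era KeyValue(row, family, qualifier, value) constructor and the same num,
conf and imports as above (untested here, and I haven't verified whether the
job's configuration rather than conf should be passed to the save call):

val kvRdd = num.map { x =>
  val rowKey = Bytes.toBytes(x)
  // build the cell as a KeyValue so it matches HFileOutputFormat's write(row, kv)
  val kv = new KeyValue(rowKey, "cf".getBytes(), "c1".getBytes(), "value_xxx".getBytes())
  (new ImmutableBytesWritable(rowKey), kv)
}
kvRdd.saveAsNewAPIHadoopFile("/tmp/13", classOf[ImmutableBytesWritable],
  classOf[KeyValue], classOf[HFileOutputFormat], conf)

Bulk load also expects keys in sorted order within each HFile, which the toy
1-to-10 range happens to satisfy.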



On Tue, Jan 27, 2015 at 12:17 PM, Jim Green openkbi...@gmail.com wrote:

 Thanks Ted. Could you give me a simple example to load one row data in
 hbase? How should I generate the KeyValue?
 I tried multiple times, and still can not figure it out.

 On Tue, Jan 27, 2015 at 12:10 PM, Ted Yu yuzhih...@gmail.com wrote:

 Here is the method signature used by HFileOutputFormat :
   public void write(ImmutableBytesWritable row, KeyValue kv)

 Meaning, KeyValue is expected, not Put.

 On Tue, Jan 27, 2015 at 10:54 AM, Jim Green openkbi...@gmail.com wrote:

 Hi Team,

 I need some help on writing a scala to bulk load some data into hbase.
 *Env:*
 hbase 0.94
 spark-1.0.2

 I am trying below code to just bulk load some data into hbase table “t1”.

 import org.apache.spark._
 import org.apache.spark.rdd.NewHadoopRDD
 import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
 import org.apache.hadoop.hbase.client.HBaseAdmin
 import org.apache.hadoop.hbase.mapreduce.TableInputFormat
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HColumnDescriptor
 import org.apache.hadoop.hbase.util.Bytes
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.mapred.TableOutputFormat
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable
 import org.apache.hadoop.mapreduce.Job
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
 import org.apache.hadoop.hbase.KeyValue
 import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat

 val conf = HBaseConfiguration.create()
 val tableName = "t1"
 val table = new HTable(conf, tableName)

 conf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
 val job = Job.getInstance(conf)
 job.setMapOutputKeyClass (classOf[ImmutableBytesWritable])
 job.setMapOutputValueClass (classOf[KeyValue])
 HFileOutputFormat.configureIncrementalLoad (job, table)

 val num = sc.parallelize(1 to 10)
 val rdd = num.map(x => {
 val put: Put = new Put(Bytes.toBytes(x))
 put.add("cf".getBytes(), "c1".getBytes(), ("value_xxx").getBytes())
 (new ImmutableBytesWritable(Bytes.toBytes(x)), put)
 })
 rdd.saveAsNewAPIHadoopFile("/tmp/8",
 classOf[ImmutableBytesWritable], classOf[Put], classOf[HFileOutputFormat],
 conf)


 However I am allways getting below error:
 java.lang.ClassCastException: org.apache.hadoop.hbase.client.Put cannot
 be cast to org.apache.hadoop.hbase.KeyValue
 at
 org.apache.hadoop.hbase.mapreduce.HFileOutputFormat$1.write(HFileOutputFormat.java:161)
 at
 org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:718)
 at
 org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:699)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
 at org.apache.spark.scheduler.Task.run(Task.scala:51)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)

 My questions are:
 1. Do we have a sample code to do bulk load into hbase directly?
 Can we use

Cluster Aware Custom RDD

2015-01-16 Thread Jim Carroll
Hello all,

I have a custom RDD for fast loading of data from a non-partitioned source.
The partitioning happens in the RDD implementation by pushing data from the
source into queues picked up by the current active partitions in worker
threads.

This works great on a multi-threaded single host (say with the manager set
to local[x] ) but I'd like to run it distributed. However, I need to know,
not only which slice my partition is, but also which host (by sequence)
it's on so I can divide up the source by worker (host) and then run the
multi-threaded. In other words, I need what effectively amounts to a 2-tier
slice identifier.

I know this is probably unorthodox, but is there some way to get this
information in the compute method or the deserialized Partition objects?
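
For what it's worth, one way to make the two-tier (host, slice) identifier
available in compute is to bake it into the Partition objects and use preferred
locations to ask the scheduler to run each partition on its assigned host. A
rough sketch with made-up class and field names (preferred locations are hints,
not guarantees):

import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

// Hypothetical partition carrying which host it is assigned to and which
// worker-local slice it represents.
case class HostAwarePartition(index: Int, hostIndex: Int, sliceOnHost: Int) extends Partition

class HostAwareRDD(sc: SparkContext, hosts: Seq[String], slicesPerHost: Int)
  extends RDD[String](sc, Nil) {

  override def getPartitions: Array[Partition] = {
    val parts = for {
      h <- hosts.indices
      s <- 0 until slicesPerHost
    } yield HostAwarePartition(h * slicesPerHost + s, h, s)
    parts.toArray[Partition]
  }

  // Hint to the scheduler to place each partition on its assigned host, so the
  // (host, slice) pair is known by construction inside compute.
  override def getPreferredLocations(split: Partition): Seq[String] =
    Seq(hosts(split.asInstanceOf[HostAwarePartition].hostIndex))

  override def compute(split: Partition, context: TaskContext): Iterator[String] = {
    val p = split.asInstanceOf[HostAwarePartition]
    // pull rows from the queue assigned to (p.hostIndex, p.sliceOnHost) here
    Iterator.empty
  }
}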



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Cluster-Aware-Custom-RDD-tp21196.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: No disk single pass RDD aggregation

2014-12-18 Thread Jim Carroll
Hi,

This was all my fault. It turned out I had a line of code buried in a
library that did a repartition. I used this library to wrap an RDD to
present it to legacy code as a different interface. That's what was causing
the data to spill to disk.

The really stupid thing is it took me the better part of a day to find and
several misguided emails to this list (including the one that started this
thread).

Sorry about that.

Jim




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/No-disk-single-pass-RDD-aggregation-tp20723p20763.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



No disk single pass RDD aggregation

2014-12-16 Thread Jim Carroll
Okay,

I have an rdd that I want to run an aggregate over but it insists on
spilling to disk even though I structured the processing to only require a
single pass.

In other words, I can do all of my processing one entry in the rdd at a time
without persisting anything.

I set rdd.persist(StorageLevel.NONE) and it had no effect. When I run
locally I get my /tmp directory filled with transient rdd data even though I
never need the data again after the row's been processed. Is there a way to
turn this off?

Thanks
Jim




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/No-disk-single-pass-RDD-aggregation-tp20723.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: No disk single pass RDD aggregation

2014-12-16 Thread Jim Carroll
In case a little more information is helpful:

the RDD is constructed using sc.textFile(fileUri) where the fileUri is to a
.gz file (that's too big to fit on my disk).

I do an rdd.persist(StorageLevel.NONE) and it seems to have no effect.

This rdd is what I'm calling aggregate on and I expect to only use it once.
Each row in the rdd never has to be revisited. The aggregate seqOp is
modifying a current state and returning it so there's no need to store the
results of the seqOp on a row-by-row basis, and given the fact that there's
one partition the comboOp doesn't even need to be called (since there would
be nothing to combine across partitions).

Thanks for any help.
Jim




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/No-disk-single-pass-RDD-aggregation-tp20723p20724.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: No disk single pass RDD aggregation

2014-12-16 Thread Jim Carroll
Nvm. I'm going to post another question since this has to do with the way
spark handles sc.textFile with a file://.gz



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/No-disk-single-pass-RDD-aggregation-tp20723p20725.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spark handling of a file://xxxx.gz Uri

2014-12-16 Thread Jim Carroll
Is there a way to get Spark to NOT repartition/shuffle/expand a
sc.textFile(fileUri) when the URI is a gzipped file?

Expanding a gzipped file should be thought of as a transformation and not
an action (if the analogy is apt). There is no need to fully create and
fill out an intermediate RDD with the expanded data when it can be done one
row at a time.




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-handling-of-a-file--gz-Uri-tp20726.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark handling of a file://xxxx.gz Uri

2014-12-16 Thread Jim


Hi Harry,

Thanks for your response.

I'm working in scala. When I do a count call it expands the RDD in the 
count (since it's an action). You can see the call stack that results in 
the failure of the job here:


 ERROR DiskBlockObjectWriter - Uncaught exception while reverting 
partial writes to file 
/tmp/spark-local-20141216170458-964a/1d/temp_shuffle_4f46af09-5521-4fc6-adb1-c72839520560

java.io.IOException: No space left on device
at java.io.FileOutputStream.writeBytes(Native Method)
at java.io.FileOutputStream.write(FileOutputStream.java:345)
at 
org.apache.spark.storage.DiskBlockObjectWriter$TimeTrackingOutputStream$$anonfun$write$3.apply$mcV$sp(BlockObjectWriter.scala:86)
at 
org.apache.spark.storage.DiskBlockObjectWriter.org$apache$spark$storage$DiskBlockObjectWriter$$callWithTiming(BlockObjectWriter.scala:221)
at 
org.apache.spark.storage.DiskBlockObjectWriter$TimeTrackingOutputStream.write(BlockObjectWriter.scala:86)
at 
java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)

at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
at 
org.xerial.snappy.SnappyOutputStream.flush(SnappyOutputStream.java:263)
at 
java.io.ObjectOutputStream$BlockDataOutputStream.flush(ObjectOutputStream.java:1822)

at java.io.ObjectOutputStream.flush(ObjectOutputStream.java:718)
at 
org.apache.spark.serializer.JavaSerializationStream.flush(JavaSerializer.scala:51)
at 
org.apache.spark.storage.DiskBlockObjectWriter.revertPartialWritesAndClose(BlockObjectWriter.scala:173)
at 
org.apache.spark.util.collection.ExternalSorter$$anonfun$stop$2.apply(ExternalSorter.scala:774)
at 
org.apache.spark.util.collection.ExternalSorter$$anonfun$stop$2.apply(ExternalSorter.scala:773)
at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)

at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at 
org.apache.spark.util.collection.ExternalSorter.stop(ExternalSorter.scala:773)
at 
org.apache.spark.shuffle.sort.SortShuffleWriter.stop(SortShuffleWriter.scala:93)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:74)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)

at org.apache.spark.scheduler.Task.run(Task.scala:56)
at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)

at java.lang.Thread.run(Thread.java:745)

Notice the task run (this is now doing a count) results in a Shuffle 
during which it writes the intermediate RDD to disk (and fails when the 
disk is full). This intermediate RDD/disk write is unnecessary.


I even implemented a Seq[String] in terms of streaming the file and 
called sc.parallelize(mySequence,1) and THIS results in a call to 
toArray on my sequence. Since this won't fit on disk it certainly won't 
fit in an array in memory.


Thanks for taking the time to respond.

Jim

On 12/16/2014 04:57 PM, Harry Brundage wrote:
Are you certain that's happening Jim? Why? What happens if you just do 
sc.textFile(fileUri).count() ? If I'm not mistaken the Hadoop 
InputFormat for gzip and the RDD wrapper around it already has the 
streaming behaviour you wish for. but I could be wrong. Also, are 
you in pyspark or scala Spark?


On Tue, Dec 16, 2014 at 1:22 PM, Jim Carroll jimfcarr...@gmail.com wrote:


Is there a way to get Spark to NOT reparition/shuffle/expand a
sc.textFile(fileUri) when the URI is a gzipped file?

Expanding a gzipped file should be thought of as a
transformation and not
an action (if the analogy is apt). There is no need to fully
create and
fill out an intermediate RDD with the expanded data when it can be
done one
row at a time.




--
View this message in context:

http://apache-spark-user-list.1001560.n3.nabble.com/Spark-handling-of-a-file--gz-Uri-tp20726.html
Sent from the Apache Spark User List mailing list archive at
Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org





How do I stop the automatic partitioning of my RDD?

2014-12-16 Thread Jim Carroll

I've been trying to figure out how to use Spark to do a simple aggregation
without reparitioning and essentially creating fully instantiated
intermediate RDDs, and it seems virtually impossible.

I've now gone as far as writing my own single-partition RDD that wraps an
Iterator[String] and calling aggregate() on it. Before any of my aggregation
code executes the entire Iterator is unwound and multiple partitions are
created to be given to my aggregation.

The Task execution call stack includes:
   ShuffleMap.runTask
   SortShuffleWriter.write
   ExternalSorter.insertAll
   ... which is iterating over my entire RDD, repartitioning it, and
collecting it into spill files.

How do I prevent this from happening? There's no need to do this.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-do-I-stop-the-automatic-partitioning-of-my-RDD-tp20732.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How do I stop the automatic partitioning of my RDD?

2014-12-16 Thread Jim Carroll
Wow. i just realized what was happening and it's all my fault. I have a
library method that I wrote that presents the RDD and I was actually
repartitioning it myself.

I feel pretty dumb. Sorry about that.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-do-I-stop-the-automatic-partitioning-of-my-RDD-tp20732p20735.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Standard SQL tool access to SchemaRDD

2014-12-02 Thread Jim Carroll
Hello all,

Is there a way to load an RDD in a small driver app and connect with a JDBC
client and issue SQL queries against it? It seems the thrift server only
works with pre-existing Hive tables.

Thanks
Jim
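
One possible route, sketched here under the assumption that your build includes
HiveThriftServer2.startWithContext (it appeared around the Spark 1.2 timeframe):
register the RDD as a temp table on a HiveContext and start the Thrift JDBC
server against that same context from inside the driver. Class, table and app
names below are made up.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2

object RddOverJdbc {
  case class Record(id: Int, name: String)

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("rdd-over-jdbc"))
    val hive = new HiveContext(sc)

    // Turn an ordinary RDD into a SchemaRDD and register it as a temp table.
    val data = sc.parallelize(Seq(Record(1, "a"), Record(2, "b")))
    hive.createSchemaRDD(data).registerTempTable("records")

    // Start the Thrift JDBC server in-process; temp tables on this context
    // become visible to JDBC clients such as beeline.
    HiveThriftServer2.startWithContext(hive)

    // Keep the driver alive so the JDBC endpoint stays up.
    Thread.sleep(Long.MaxValue)
  }
}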




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Standard-SQL-tool-access-to-SchemaRDD-tp20197.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Standard SQL tool access to SchemaRDD

2014-12-02 Thread Jim Carroll

Thanks! I'll give it a try.




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Standard-SQL-tool-access-to-SchemaRDD-tp20197p20202.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Set worker log configuration when running local[n]

2014-11-14 Thread Jim Carroll
How do I set the log level when running local[n]? It ignores the
log4j.properties file on my classpath.

I also tried to set the spark home dir on the SparkConf using setSparkHome
and made sure an appropriate log4j.properties file was in a conf
subdirectory and that didn't work either.

I'm running with the current master.

Thanks
Jim




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Set-worker-log-configuration-when-running-local-n-tp18948.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Set worker log configuration when running local[n]

2014-11-14 Thread Jim Carroll
Actually, it looks like it's Parquet logging that I don't have control over.

For some reason the parquet project decided to use java.util logging with
its own logging configuration.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Set-worker-log-configuration-when-running-local-n-tp18948p18951.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



How do I turn off Parquet logging in a worker?

2014-11-14 Thread Jim Carroll

I'm running a local spark master (local[n]).

I cannot seem to turn off the parquet logging. I tried:

1) Setting a log4j.properties on the classpath.
2) Setting a log4j.properties file in a spark install conf directory and
pointing to the install using setSparkHome
3) Editing the log4j-default.properties file in the spark-core jar that I'm
using
4) Changing the JAVA_HOME/jre/lib/logging.properties (since Parquet uses
java.util.logging)
5) adding the following code as the first lines in my main:

  java.util.logging.Logger.getLogger("parquet").addHandler(
  new java.util.logging.Handler() {
def close(): Unit = {}
def flush(): Unit = {}
def publish(x: java.util.logging.LogRecord): Unit = {}
  })

NOTHING seems to change the default log level console output in parquet when
it runs in a worker.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-do-I-turn-off-Parquet-logging-in-a-worker-tp18955.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How do I turn off Parquet logging in a worker?

2014-11-14 Thread Jim Carroll

This is a problem because of a bug in Spark in the current master (on top of
the fact that Parquet uses java.util.logging).

ParquetRelation.scala attempts to override the parquet logger but, at least
currently (and if your application simply reads a parquet file before it
does anything else with Parquet), the parquet.Log class hasn't been loaded
yet. Therefore the code in ParquetRelation.enableLogForwarding has no
effect. If you look at the code in parquet.Log there's a static initializer
that needs to run before enableLogForwarding is called, otherwise whatever
enableLogForwarding does gets undone by that static initializer.

Adding Class.forName("parquet.Log") as the first thing in my main fixed the
problem.

The fix would be to force the static initializer to get called in
parquet.Log as part of enableLogForwarding.

Anyone want a PR?
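
For anyone hitting this before a fix lands, the workaround described above
boils down to the following driver-side sketch (workers may need the same
treatment):

object MyApp {
  def main(args: Array[String]): Unit = {
    // Force parquet.Log's static initializer to run before Spark's
    // ParquetRelation.enableLogForwarding, so the forwarding it sets up
    // isn't undone afterwards. Must run before any Parquet I/O.
    Class.forName("parquet.Log")
    // ... build the SparkContext and run the job as usual ...
  }
}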



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-do-I-turn-off-Parquet-logging-in-a-worker-tp18955p18958.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Set worker log configuration when running local[n]

2014-11-14 Thread Jim Carroll
Just to be complete, this is a problem in Spark that I worked around and
detailed here:
http://apache-spark-user-list.1001560.n3.nabble.com/How-do-I-turn-off-Parquet-logging-in-a-worker-td18955.html



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Set-worker-log-configuration-when-running-local-n-tp18948p18959.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How do I turn off Parquet logging in a worker?

2014-11-14 Thread Jim Carroll

Jira: https://issues.apache.org/jira/browse/SPARK-4412
PR: https://github.com/apache/spark/pull/3271




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-do-I-turn-off-Parquet-logging-in-a-worker-tp18955p18977.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Wildly varying aggregate performance depending on code location

2014-11-12 Thread Jim Carroll
Hello all,

I have a really strange thing going on.

I have a test data set with 500K lines in a gzipped csv file.

I have an array of column processors, one for each column in the dataset.
A Processor tracks aggregate state and has a method process(v : String)

I'm calling:

  val processors: Array[Processors] = 

  sc.textFile(gzippedFileName).aggregate(processors,
{ (curState, row) =>
row.split(",", -1).zipWithIndex.foreach({
  v => curState(v._2).process(v._1)
})
curState
} )
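
As an aside, aggregate takes the zero value and the two functions in separate
argument lists; a complete sketch of the call looks like the following, where
merge is a hypothetical method that combines the state of two Processors and is
only needed once there is more than one partition:

// Sketch only: same seqOp as above, plus an explicit combOp.
val result = sc.textFile(gzippedFileName).aggregate(processors)(
  (curState, row) => {
    row.split(",", -1).zipWithIndex.foreach { case (value, i) => curState(i).process(value) }
    curState
  },
  (a, b) => a.zip(b).map { case (x, y) => x.merge(y) }  // merge is hypothetical
)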

If the class definition for the Processors is in the same file as the driver
it runs in ~23 seconds. If I move the classes to a separate file in the same
package without ANY OTHER CHANGES it goes to ~35 seconds.

This doesn't make any sense to me. I can't even understand how the compiled
class files could be any different in either case.

Does anyone have an explanation for why this might be?




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Wildly-varying-aggregate-performance-depending-on-code-location-tp18752.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Wildly varying aggregate performance depending on code location

2014-11-12 Thread Jim Carroll

Well it looks like this is a scala problem after all. I loaded the file
using pure scala and ran the exact same Processors without Spark and I got
20 seconds (with the code in the same file as the 'main') vs 30 seconds
(with the exact same code in a different file) on the 500K rows.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Wildly-varying-aggregate-performance-depending-on-code-location-tp18752p18772.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Ending a job early

2014-10-28 Thread Jim Carroll

We have some very large datasets where the calculations converge on a result.
Our current implementation allows us to track how quickly the calculations
are converging and end the processing early. This can significantly speed up
some of our processing.

Is there a way to do the same thing is spark?

A trivial example might be a column average on a dataset. As we're
'aggregating' rows into columnar averages I can track how fast these
averages are moving and decide to stop after a low percentage of the rows
has been processed, producing an estimate rather than an exact value.

Within a partition, or better yet, within a worker across 'reduce' steps, is
there a way to stop all of the aggregations and just continue on with
reduces of already processed data?

Thanks
Jim
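
There is no built-in early-termination hook for aggregations, but one way to
approximate it from the driver is to submit the work a few partitions at a time
and stop once the running estimate settles. A sketch for the column-average
example, assuming the Spark 1.x runJob signature that takes a partition list
and an allowLocal flag:

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

def approxMean(sc: SparkContext, data: RDD[Double], tolerance: Double): Double = {
  var sum = 0.0
  var count = 0L
  var last = Double.MaxValue
  var done = false
  val batches = data.partitions.indices.grouped(4)  // 4 partitions per round
  while (batches.hasNext && !done) {
    val parts = batches.next()
    // Run only the selected partitions and collect partial (sum, count) pairs.
    val partials = sc.runJob(data, (it: Iterator[Double]) => {
      var s = 0.0
      var c = 0L
      it.foreach { v => s += v; c += 1 }
      (s, c)
    }, parts, false)
    partials.foreach { case (s, c) => sum += s; count += c }
    val current = sum / count
    if (math.abs(current - last) < tolerance) done = true  // converged: stop submitting work
    last = current
  }
  last
}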




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Ending-a-job-early-tp17505.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Bug in JettyUtils?

2014-09-24 Thread Jim Donahue
I've been seeing a failure in JettyUtils.createStaticHandler when running Spark 
1.1 programs from Eclipse which I hadn't seen before.  The problem seems to be 
line 129

Option(Utils.getSparkClassLoader.getResource(resourceBase)) match { ...

getSparkClassLoader returns NULL, and it's all downhill after that. :)  
Following the code down a bit, it's trying to get the ClassLoader which loaded 
Spark.  However, stepping into the getClassLoader code, there's a comment that 
says "some implementations may use null to represent the bootstrap class 
loader. This method will return null in such implementations if this class was 
loaded by the bootstrap class loader."

Unfortunately, the code in JettyUtils isn't prepared to handle this case. :(



Jim Donahue



Re: Network requirements between Driver, Master, and Slave

2014-09-12 Thread Jim Carroll
Hi Akhil,

Thanks! I guess in short that means the master (or slaves?) connect back to
the driver. This seems like a really odd way to work given the driver needs
to already connect to the master on port 7077. I would have thought that if
the driver could initiate a connection to the master, that would be all
that's required.

Can you describe what it is about the architecture that requires the master
to connect back to the driver even when the driver initiates a connection to
the master? Just curious.

Thanks anyway.
Jim
 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Network-requirements-between-Driver-Master-and-Slave-tp13997p14086.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Network requirements between Driver, Master, and Slave

2014-09-11 Thread Jim Carroll
Hello all,

I'm trying to run a Driver on my local network with a deployment on EC2 and
it's not working. I was wondering if either the master or slave instances
(in standalone) connect back to the driver program.

I outlined the details of my observations in a previous post but here is
what I'm seeing:

I have v1.1.0 installed (the new tag) on ec2 using the spark-ec2 script.
I have the same version of the code built locally.
I edited the master security group to allow inbound access from anywhere to
7077 and 8080.
I see a connection take place.
I see the workers fail with a timeout when any job is run.
The master eventually removes the driver's job.

I supposed this makes sense if there's a requirement for either the worker
or the master to be on the same network as the driver. Is that the case?

Thanks
Jim
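
For reference, the executors (and the master's messages back to the application)
do open connections to the driver, so a driver outside EC2 has to be reachable
from the cluster; pinning the driver endpoints at least makes the firewall
rules predictable, though it won't help if the laptop sits behind NAT. Hostname
and port below are placeholders:

// Sketch: advertise a fixed, reachable host/port for the driver.
val conf = new org.apache.spark.SparkConf()
  .setMaster("spark://ec2-x-x-x-x.compute-1.amazonaws.com:7077")
  .setAppName("remote-driver-test")
  .set("spark.driver.host", "my-laptop.example.com")  // must resolve from the EC2 nodes
  .set("spark.driver.port", "51000")                  // allow inbound on this port
val sc = new org.apache.spark.SparkContext(conf)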




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Network-requirements-between-Driver-Master-and-Slave-tp13997.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Querying a parquet file in s3 with an ec2 install

2014-09-09 Thread Jim Carroll
My apologies to the list. I replied to Manu's question and it went directly
to him rather than the list.

In case anyone else has this issue here is my reply and Manu's reply to me.
This also answers Ian's question.

---

Hi Manu,

The dataset is 7.5 million rows and 500 columns. In parquet form it's about
1.1 Gig. It was created with Spark and copied up to s3. It has about 4600
parts (which I'd also like to gain some control over). I can try a smaller
dataset, however it works when I run it locally, even with the file out on
s3. It just takes a while.

I can try copying it to HDFS first but that wont help longer term.

Thanks
Jim

-
Manu's response:
-

I am pretty sure it is due to the number of parts you have.. I have a
parquet data set that is 250M rows  and 924 columns and it is ~2500 files... 

I recommend creating a tables in HIve with that data set and doing an insert
overwrite so you can get a data set with more manageable files..

Why I think it's the number of files is that I believe that all of those or a
large part of those files are read when you run sqlContext.parquetFile() and
the time it would take in s3 for that to happen is a lot so something
internally is timing out..

-Manu



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Querying-a-parquet-file-in-s3-with-an-ec2-install-tp13737p13790.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Querying a parquet file in s3 with an ec2 install

2014-09-09 Thread Jim Carroll

Why I think its the number of files is that I believe that a
 all of those or large part of those files are read when 
you run sqlContext.parquetFile() and the time it would 
take in s3 for that to happen is a lot so something 
internally is timing out.. 

I'll create the parquet files with Drill instead of Spark which will give me
(somewhat) better control over the slice sizes and see what happens.

That said, this behavior seems wrong to me. First, exiting due to inactivity
on a job seems like (perhaps?) the wrong fix to a former problem.  Second,
there IS activity if it's reading the slice headers but the job is exiting
anyway. So if this fixes the problem the measure of activity seems wrong.

Ian and Manu, thanks for your help. I'll post back and let you know if that
fixes it.

Jim




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Querying-a-parquet-file-in-s3-with-an-ec2-install-tp13737p13791.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Querying a parquet file in s3 with an ec2 install

2014-09-09 Thread Jim Carroll
Okay,

This seems to be either a code version issue or a communication issue. It
works if I execute the spark shell from the master node. It doesn't work if
I run it from my laptop and connect to the master node. 

I had opened the ports for the WebUI (8080) and the cluster manager (7077)
for the master node or it fails much sooner. Do I need to open up the ports
for the workers as well?

I used the spark-ec2 install script with --spark-version using both 1.0.2
and then again with the git hash tag that corresponds to 1.1.0rc4
(2f9b2bd7844ee8393dc9c319f4fefedf95f5e460). In both cases I rebuilt from
source using the same codebase on my machine and moved the entire project
into /root/spark (since to run the spark-shell it needs to match the same
path as the install on ec2). Could I have missed something here?

Thanks.
Jim




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Querying-a-parquet-file-in-s3-with-an-ec2-install-tp13737p13802.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Querying a parquet file in s3 with an ec2 install

2014-09-08 Thread Jim Carroll
)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

As far as the "Initial job has not accepted any resources" I'm running the
spark-shell command with:

SPARK_MEM=2g ./spark-shell --master
spark://ec2-x-x-x-x.compute-1.amazonaws.com:7077

According to the master web page each node has 6 Gig so I'm not sure why I'm
seeing that message either. If I run with less than 2g I get the following
in my spark-shell:

14/09/08 17:47:38 INFO Remoting: Remoting shut down
14/09/08 17:47:38 INFO RemoteActorRefProvider$RemotingTerminator: Remoting
shut down.
java.io.IOException: Error reading summaries
at
parquet.hadoop.ParquetFileReader.readAllFootersInParallelUsingSummaryFiles(ParquetFileReader.java:128)

Caused by: java.util.concurrent.ExecutionException:
java.lang.OutOfMemoryError: GC overhead limit exceeded
at java.util.concurrent.FutureTask.report(FutureTask.java:122)

I'm not sure if this exception is from the spark-shell jvm or transferred
over from the master or a worker through the master.

Any help would be greatly appreciated.

Thanks
Jim









--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Querying-a-parquet-file-in-s3-with-an-ec2-install-tp13737.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Using Spark to add data to an existing Parquet file without a schema

2014-09-04 Thread Jim Carroll
Hello all,

I've been trying to figure out how to add data to an existing Parquet file
without having a schema. Spark has allowed me to load JSON and save it as a
Parquet file but I was wondering if anyone knows how to ADD/INSERT more
data. 

I tried using sql insert and that doesn't work. All of the examples assume a
schema exists in the form of a serialization IDL and generated classes.

I looked into the code and considered direct use of InsertIntoParquetTable
or a copy of it but I was hoping someone already solved the problem.

Any guidance would be greatly appreciated.

Thanks
Jim
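
One thing worth trying, sketched here against the Spark 1.1-era SchemaRDD API
and not verified for this exact case: register the existing Parquet file as a
temp table and use the DSL insertInto, which is a different code path than a
SQL INSERT statement. The paths are placeholders and the new data is assumed to
share the existing schema; sc is the usual SparkContext.

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)

// Register the existing Parquet directory as a queryable temp table.
val existing = sqlContext.parquetFile("/data/existing.parquet")
existing.registerTempTable("existing")

// Load the new rows (here from JSON) and append them to the Parquet table.
val newRows = sqlContext.jsonFile("/data/new_rows.json")
newRows.insertInto("existing")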






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Using-Spark-to-add-data-to-an-existing-Parquet-file-without-a-schema-tp13450.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Using Spark to add data to an existing Parquet file without a schema

2014-09-04 Thread Jim Carroll
Okay,

Obviously I don't care about adding more files to the system so is there a
way to point to an existing parquet file (directory) and seed the individual
part-r-***.parquet (the value of partition + offset) while preventing 

I mean, I can hack it by copying files into the same parquet directory and
managing the file names externally but this seems like a work around. Is
that the way others are doing it?

Jim




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Using-Spark-to-add-data-to-an-existing-Parquet-file-without-a-schema-tp13450p13499.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Compiling SNAPSHOT

2014-08-14 Thread Jim Blomo
Hi, I'm having trouble compiling a snapshot, any advice would be
appreciated.  I get the error below when compiling either master or
branch-1.1.  The key error is, I believe, [ERROR] File name too long
but I don't understand what it is referring to.  Thanks!


./make-distribution.sh --tgz --skip-java-test -Phadoop-2.4  -Pyarn
-Dyarn.version=2.4.0

[ERROR]
 while compiling:
/home/jblomo/src/spark/core/src/test/scala/org/apache/spark/util/random/XORShiftRandomSuite.scala
during phase: jvm
 library version: version 2.10.4
compiler version: version 2.10.4
  reconstructed args: -classpath

Re: No space left on device

2014-08-09 Thread Jim Donahue
Root partitions on AWS instances tend to be small (for example, an m1.large 
instance has 2 420 GB drives, but only a 10 GB root partition).  Matei's 
probably right on about this - just need to be careful where things like the 
logs get stored.

From: Matei Zaharia matei.zaha...@gmail.com
Date: Saturday, August 9, 2014 at 1:48 PM
To: u...@spark.incubator.apache.org, 
kmatzen kmat...@gmail.com
Subject: Re: No space left on device

Your map-only job should not be shuffling, but if you want to see what's 
running, look at the web UI at http://driver:4040. In fact the job should not 
even write stuff to disk except inasmuch as the Hadoop S3 library might build 
up blocks locally before sending them on.

My guess is that it's not /mnt or /mnt2 that get filled, but the root volume, 
/, either with logs or with temp files created by the Hadoop S3 library. You 
can check this by running df while the job is executing. (Tools like Ganglia 
can probably also log this.) If it is the logs, you can symlink the spark/logs 
directory to someplace on /mnt instead. If it's /tmp, you can set 
java.io.tmpdir to another directory in Spark's JVM options.

Matei


On August 8, 2014 at 11:02:48 PM, kmatzen 
(kmat...@gmail.com) wrote:

I need some configuration / debugging recommendations to work around no
space left on device. I am completely new to Spark, but I have some
experience with Hadoop.

I have a task where I read images stored in sequence files from s3://,
process them with a map in scala, and write the result back to s3://. I
have about 15 r3.8xlarge instances allocated with the included ec2 script.
The input data is about 1.5 TB and I expect the output to be similarly
sized. 15 r3.8xlarge instances give me about 3 TB of RAM and 9 TB of
storage, so hopefully more than enough for this task.

What happens is that it takes about an hour to read in the input from S3.
Once that is complete, then it begins to process the images and several
succeed. However, quickly, the job fails with "no space left on device".
By time I can ssh into one of the machines that reported the error, temp
files have already been cleaned up. I don't see any more detailed messages
in the slave logs. I have not yet changed the logging configuration from
the default.

The S3 input and output are cached in /mnt/ephemeral-hdfs/s3 and
/mnt2/ephemeral-hdfs/s3 (I see mostly input files at the time of failure,
but maybe 1 output file per slave). Shuffle files are generated in
/mnt/spark/something and /mnt2/spark/something (they were cleaned up
once the job failed and I don't remember the directory that I saw while it
was still running). I checked the disk utilization for a few slaves while
running the pipeline and they were pretty far away from being full. But the
failure probably came from a slave that was overloaded from a shard
imbalance (but why would that happen on read - map - write?).

What other things might I need to configure to prevent this error? What
logging options do people recommend? Is there an easy way to diagnose spark
failures from the web interface like with Hadoop?

I need to do some more testing to make sure I'm not emitting a giant image
for a malformed input image, but I figured I'd post this question early in
case anyone had any recommendations.

BTW, why does a map-only job need to shuffle? I was expecting it to
pipeline the transfer in from S3 operation, the actual computation
operation, and the transfer back out to S3 operation rather than doing
everything serially with a giant disk footprint. Actually, I was thinking
it would fuse all three operations into a single stage. Is that not what
Spark does?





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/No-space-left-on-device-tp11829.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Save an RDD to a SQL Database

2014-08-07 Thread Jim Donahue
Depending on what you mean by save, you might be able to use the Twitter 
Storehaus package to do this.  There was a nice talk about this at a Spark 
meetup -- Stores, Monoids and Dependency Injection - Abstractions for Spark 
Streaming Jobs.  Video here: 
https://www.youtube.com/watch?v=C7gWtxelYNM&feature=youtu.be.


Jim Donahue
Adobe
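
The batch-insert approach Vida describes in the quoted thread below can be
sketched with foreachPartition and plain JDBC; the URL, table name, and column
layout here are made up for illustration:

import java.sql.DriverManager
import org.apache.spark.rdd.RDD

// One connection per partition, flushing batched inserts every 1000 rows.
def saveToDb(rdd: RDD[(Int, String)], jdbcUrl: String): Unit = {
  rdd.foreachPartition { rows =>
    val conn = DriverManager.getConnection(jdbcUrl)
    val stmt = conn.prepareStatement("INSERT INTO my_table (id, name) VALUES (?, ?)")
    try {
      var count = 0
      rows.foreach { case (id, name) =>
        stmt.setInt(1, id)
        stmt.setString(2, name)
        stmt.addBatch()
        count += 1
        if (count % 1000 == 0) stmt.executeBatch()  // flush a full batch
      }
      stmt.executeBatch()  // flush the remainder
    } finally {
      stmt.close()
      conn.close()
    }
  }
}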

-Original Message-
From: Ron Gonzalez [mailto:zlgonza...@yahoo.com.INVALID] 
Sent: Wednesday, August 06, 2014 7:18 AM
To: Vida Ha
Cc: u...@spark.incubator.apache.org
Subject: Re: Save an RDD to a SQL Database

Hi Vida,
  It's possible to save an RDD as a hadoop file using hadoop output formats. It 
might be worthwhile to investigate using DBOutputFormat and see if this will 
work for you.
  I haven't personally written to a db, but I'd imagine this would be one way 
to do it.

Thanks,
Ron

Sent from my iPhone

 On Aug 5, 2014, at 8:29 PM, Vida Ha vid...@gmail.com wrote:
 
 
 Hi,
 
 I would like to save an RDD to a SQL database.  It seems like this would be a 
 common enough use case.  Are there any built in libraries to do it?
 
 Otherwise, I'm just planning on mapping my RDD, and having that call a method 
 to write to the database.   Given that a lot of records are going to be 
 written, the code would need to be smart and do a batch insert after enough 
 records have collected.  Does that sound like a reasonable approach?
 
 
 -Vida
 

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Command exited with code 137

2014-06-13 Thread Jim Blomo
I've seen these caused by the OOM killer.  I recommend checking
/var/log/syslog to see if it was activated due to lack of system
memory.

On Thu, Jun 12, 2014 at 11:45 PM, libl 271592...@qq.com wrote:
 I use standalone mode submit task.But often,I got an error.The stacktrace as

 2014-06-12 11:37:36,578 [INFO] [org.apache.spark.Logging$class]
 [Method:logInfo] [Line:49] [Thread:spark-akka.actor.default-dispatcher-18]
  - Executor updated: app-20140612092238-0007/0 is now FAILED (Command exited
 with code 137)
 2014-06-12 11:37:36,670 [INFO] [org.apache.spark.Logging$class]
 [Method:logInfo] [Line:49] [Thread:spark-akka.actor.default-dispatcher-18]
  - Executor app-20140612092238-0007/0 removed: Command exited with code 137
 2014-06-12 11:37:36,673 [INFO] [org.apache.spark.Logging$cla0ss]
 [Method:logInfo] [Line:49] [Thread:spark-akka.actor.default-dispatcher-15]
  - Executor 0 disconnected, so removing it
 2014-06-12 11:37:36,682 [ERROR] [org.apache.spark.Logging$class]
 [Method:logError] [Line:65] [Thread:spark-akka.actor.default-dispatcher-15]
  - Lost executor 0 on tj-hadoop-1.certus.com: Unknown executor exit code
 (137) (died from signal 9?)


 spark config is
 spark_worker_timeout=300
 spark_akka_timeout=500
 spark_akka_frameSize=1000
 spark_akka_num_retries=30
 spark_akka_askTimeout=300



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Command-exited-with-code-137-tp7557.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: pySpark memory usage

2014-05-15 Thread Jim Blomo
Should add that I had to tweak the numbers a bit to keep above swap
threshold, but below the "Too many open files" error (`ulimit -n` is
32768).

On Wed, May 14, 2014 at 10:47 AM, Jim Blomo jim.bl...@gmail.com wrote:
 That worked amazingly well, thank you Matei!  Numbers that worked for
 me were 400 for the textFile()s, 1500 for the join()s.

 On Mon, May 12, 2014 at 7:58 PM, Matei Zaharia matei.zaha...@gmail.com 
 wrote:
 Hey Jim, unfortunately external spilling is not implemented in Python right 
 now. While it would be possible to update combineByKey to do smarter stuff 
 here, one simple workaround you can try is to launch more map tasks (or more 
 reduce tasks). To set the minimum number of map tasks, you can pass it as a 
 second argument to textFile and such (e.g. sc.textFile(“s3n://foo.txt”, 
 1000)).

 Matei

 On May 12, 2014, at 5:47 PM, Jim Blomo jim.bl...@gmail.com wrote:

 Thanks, Aaron, this looks like a good solution!  Will be trying it out 
 shortly.

 I noticed that the S3 exception seem to occur more frequently when the
 box is swapping.  Why is the box swapping?  combineByKey seems to make
 the assumption that it can fit an entire partition in memory when
 doing the combineLocally step.  I'm going to try to break this apart
 but will need some sort of heuristic options include looking at memory
 usage via the resource module and trying to keep below
 'spark.executor.memory', or using batchSize to limit the number of
 entries in the dictionary.  Let me know if you have any opinions.

 On Sun, May 4, 2014 at 8:02 PM, Aaron Davidson ilike...@gmail.com wrote:
 I'd just like to update this thread by pointing to the PR based on our
 initial design: https://github.com/apache/spark/pull/640

 This solution is a little more general and avoids catching IOException
 altogether. Long live exception propagation!


 On Mon, Apr 28, 2014 at 1:28 PM, Patrick Wendell pwend...@gmail.com 
 wrote:

 Hey Jim,

 This IOException thing is a general issue that we need to fix and your
 observation is spot-in. There is actually a JIRA for it here I created a 
 few
 days ago:
 https://issues.apache.org/jira/browse/SPARK-1579

 Aaron is assigned on that one but not actively working on it, so we'd
 welcome a PR from you on this if you are interested.

 The first thought we had was to set a volatile flag when the reader sees
 an exception (indicating there was a failure in the task) and avoid
 swallowing the IOException in the writer if this happens. But I think 
 there
 is a race here where the writer sees the error first before the reader 
 knows
 what is going on.

 Anyways maybe if you have a simpler solution you could sketch it out in
 the JIRA and we could talk over there. The current proposal in the JIRA is
 somewhat complicated...

 - Patrick






 On Mon, Apr 28, 2014 at 1:01 PM, Jim Blomo jim.bl...@gmail.com wrote:

 FYI, it looks like this "stdin writer to Python finished early" error was
 caused by a break in the connection to S3, from which the data was being
 pulled.  A recent commit to PythonRDD noted that the current exception
 catching can potentially mask an exception for the data source, and that 
 is
 indeed what I see happening.  The underlying libraries (jets3t and
 httpclient) do have retry capabilities, but I don't see a great way of
 setting them through Spark code.  Instead I added the patch below which
 kills the worker on the exception.  This allows me to completely load the
 data source after a few worker retries.

 Unfortunately, java.net.SocketException is the same error that is
 sometimes expected from the client when using methods like take().  One
 approach around this conflation is to create a new locally scoped 
 exception
 class, eg. WriterException, catch java.net.SocketException during output
 writing, then re-throw the new exception.  The worker thread could then
 distinguish between the reasons java.net.SocketException might be thrown.
 Perhaps there is a more elegant way to do this in Scala, though?

 Let me know if I should open a ticket or discuss this on the developers
 list instead.  Best,

 Jim

 diff --git
 a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
 b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
 index 0d71fdb..f31158c 100644
 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
 +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
 @@ -95,6 +95,12 @@ private[spark] class PythonRDD[T: ClassTag](
 readerException = e
 Try(worker.shutdownOutput()) // kill Python worker process

 +      case e: java.net.SocketException =>
 +        // This can happen if a connection to the datasource, eg S3, resets
 +        // or is otherwise broken
 +        readerException = e
 +        Try(worker.shutdownOutput()) // kill Python worker process
 +
        case e: IOException =>
          // This can happen for legitimate reasons if the Python code stops returning

Re: pySpark memory usage

2014-05-15 Thread Jim Blomo
That worked amazingly well, thank you Matei!  Numbers that worked for
me were 400 for the textFile()s, 1500 for the join()s.
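
(An illustrative PySpark sketch of where those two numbers plug in; the S3
paths below are made up, and `sc` is the usual pyspark shell context:)

left = sc.textFile("s3n://bucket/left.tsv", 400)     # ask for 400 input partitions
right = sc.textFile("s3n://bucket/right.tsv", 400)
pairs_l = left.map(lambda l: tuple(l.split("\t", 1)))
pairs_r = right.map(lambda l: tuple(l.split("\t", 1)))
# join() also accepts a partition count; more reduce tasks keep each task's
# in-memory hash map small enough to avoid pushing the box into swap.
joined = pairs_l.join(pairs_r, 1500)
print(joined.count())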

On Mon, May 12, 2014 at 7:58 PM, Matei Zaharia matei.zaha...@gmail.com wrote:
 Hey Jim, unfortunately external spilling is not implemented in Python right 
 now. While it would be possible to update combineByKey to do smarter stuff 
 here, one simple workaround you can try is to launch more map tasks (or more 
 reduce tasks). To set the minimum number of map tasks, you can pass it as a 
 second argument to textFile and such (e.g. sc.textFile(“s3n://foo.txt”, 
 1000)).

 Matei

 On May 12, 2014, at 5:47 PM, Jim Blomo jim.bl...@gmail.com wrote:

 Thanks, Aaron, this looks like a good solution!  Will be trying it out 
 shortly.

 I noticed that the S3 exception seems to occur more frequently when the
 box is swapping.  Why is the box swapping?  combineByKey seems to make
 the assumption that it can fit an entire partition in memory when
 doing the combineLocally step.  I'm going to try to break this apart
 but will need some sort of heuristic. Options include looking at memory
 usage via the resource module and trying to keep below
 'spark.executor.memory', or using batchSize to limit the number of
 entries in the dictionary.  Let me know if you have any opinions.

 On Sun, May 4, 2014 at 8:02 PM, Aaron Davidson ilike...@gmail.com wrote:
 I'd just like to update this thread by pointing to the PR based on our
 initial design: https://github.com/apache/spark/pull/640

 This solution is a little more general and avoids catching IOException
 altogether. Long live exception propagation!


 On Mon, Apr 28, 2014 at 1:28 PM, Patrick Wendell pwend...@gmail.com wrote:

 Hey Jim,

 This IOException thing is a general issue that we need to fix and your
 observation is spot-on. There is actually a JIRA for it here that I created a 
 few
 days ago:
 https://issues.apache.org/jira/browse/SPARK-1579

 Aaron is assigned on that one but not actively working on it, so we'd
 welcome a PR from you on this if you are interested.

 The first thought we had was to set a volatile flag when the reader sees
 an exception (indicating there was a failure in the task) and avoid
 swallowing the IOException in the writer if this happens. But I think there
 is a race here where the writer sees the error first before the reader 
 knows
 what is going on.

 Anyways maybe if you have a simpler solution you could sketch it out in
 the JIRA and we could talk over there. The current proposal in the JIRA is
 somewhat complicated...

 - Patrick






 On Mon, Apr 28, 2014 at 1:01 PM, Jim Blomo jim.bl...@gmail.com wrote:

 FYI, it looks like this "stdin writer to Python finished early" error was
 caused by a break in the connection to S3, from which the data was being
 pulled.  A recent commit to PythonRDD noted that the current exception
 catching can potentially mask an exception for the data source, and that 
 is
 indeed what I see happening.  The underlying libraries (jets3t and
 httpclient) do have retry capabilities, but I don't see a great way of
 setting them through Spark code.  Instead I added the patch below which
 kills the worker on the exception.  This allows me to completely load the
 data source after a few worker retries.

 Unfortunately, java.net.SocketException is the same error that is
 sometimes expected from the client when using methods like take().  One
 approach around this conflation is to create a new locally scoped 
 exception
 class, eg. WriterException, catch java.net.SocketException during output
 writing, then re-throw the new exception.  The worker thread could then
 distinguish between the reasons java.net.SocketException might be thrown.
 Perhaps there is a more elegant way to do this in Scala, though?

 Let me know if I should open a ticket or discuss this on the developers
 list instead.  Best,

 Jim

 diff --git
 a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
 b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
 index 0d71fdb..f31158c 100644
 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
 +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
 @@ -95,6 +95,12 @@ private[spark] class PythonRDD[T: ClassTag](
 readerException = e
 Try(worker.shutdownOutput()) // kill Python worker process

 +      case e: java.net.SocketException =>
 +        // This can happen if a connection to the datasource, eg S3, resets
 +        // or is otherwise broken
 +        readerException = e
 +        Try(worker.shutdownOutput()) // kill Python worker process
 +
        case e: IOException =>
          // This can happen for legitimate reasons if the Python code stops returning data
          // before we are done passing elements through, e.g., for take(). Just log a message to


 On Wed, Apr 9, 2014 at 7:04 PM, Jim Blomo jim.bl...@gmail.com wrote:

 This dataset is uncompressed text at ~54GB

Re: pySpark memory usage

2014-05-12 Thread Jim Blomo
Thanks, Aaron, this looks like a good solution!  Will be trying it out shortly.

I noticed that the S3 exception seems to occur more frequently when the
box is swapping.  Why is the box swapping?  combineByKey seems to make
the assumption that it can fit an entire partition in memory when
doing the combineLocally step.  I'm going to try to break this apart
but will need some sort of heuristic. Options include looking at memory
usage via the resource module and trying to keep below
'spark.executor.memory', or using batchSize to limit the number of
entries in the dictionary.  Let me know if you have any opinions.
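
(A rough sketch of the resource-module idea; the memory budget and the
spill_to_disk helper below are invented for illustration, and ru_maxrss is
reported in kilobytes on Linux but bytes on OS X:)

import resource

def resident_mb():
    # Peak resident set size of this worker process (Linux: kilobytes).
    return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 1024.0

MEMORY_BUDGET_MB = 512  # would be derived from spark.executor.memory

def combine_with_spill(records, merge, spill_to_disk):
    combined = {}
    for key, value in records:
        merge(combined, key, value)
        if resident_mb() > MEMORY_BUDGET_MB:
            spill_to_disk(combined)  # write the partial map out
            combined.clear()
    return combined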

On Sun, May 4, 2014 at 8:02 PM, Aaron Davidson ilike...@gmail.com wrote:
 I'd just like to update this thread by pointing to the PR based on our
 initial design: https://github.com/apache/spark/pull/640

 This solution is a little more general and avoids catching IOException
 altogether. Long live exception propagation!


 On Mon, Apr 28, 2014 at 1:28 PM, Patrick Wendell pwend...@gmail.com wrote:

 Hey Jim,

 This IOException thing is a general issue that we need to fix and your
 observation is spot-on. There is actually a JIRA for it here that I created a few
 days ago:
 https://issues.apache.org/jira/browse/SPARK-1579

 Aaron is assigned on that one but not actively working on it, so we'd
 welcome a PR from you on this if you are interested.

 The first thought we had was to set a volatile flag when the reader sees
 an exception (indicating there was a failure in the task) and avoid
 swallowing the IOException in the writer if this happens. But I think there
 is a race here where the writer sees the error first before the reader knows
 what is going on.

 Anyways maybe if you have a simpler solution you could sketch it out in
 the JIRA and we could talk over there. The current proposal in the JIRA is
 somewhat complicated...

 - Patrick






 On Mon, Apr 28, 2014 at 1:01 PM, Jim Blomo jim.bl...@gmail.com wrote:

 FYI, it looks like this "stdin writer to Python finished early" error was
 caused by a break in the connection to S3, from which the data was being
 pulled.  A recent commit to PythonRDD noted that the current exception
 catching can potentially mask an exception for the data source, and that is
 indeed what I see happening.  The underlying libraries (jets3t and
 httpclient) do have retry capabilities, but I don't see a great way of
 setting them through Spark code.  Instead I added the patch below which
 kills the worker on the exception.  This allows me to completely load the
 data source after a few worker retries.

 Unfortunately, java.net.SocketException is the same error that is
 sometimes expected from the client when using methods like take().  One
 approach around this conflation is to create a new locally scoped exception
 class, eg. WriterException, catch java.net.SocketException during output
 writing, then re-throw the new exception.  The worker thread could then
 distinguish between the reasons java.net.SocketException might be thrown.
 Perhaps there is a more elegant way to do this in Scala, though?

 Let me know if I should open a ticket or discuss this on the developers
 list instead.  Best,

 Jim

 diff --git
 a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
 b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
 index 0d71fdb..f31158c 100644
 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
 +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
 @@ -95,6 +95,12 @@ private[spark] class PythonRDD[T: ClassTag](
  readerException = e
  Try(worker.shutdownOutput()) // kill Python worker process

 +      case e: java.net.SocketException =>
 +        // This can happen if a connection to the datasource, eg S3, resets
 +        // or is otherwise broken
 +        readerException = e
 +        Try(worker.shutdownOutput()) // kill Python worker process
 +
        case e: IOException =>
          // This can happen for legitimate reasons if the Python code stops returning data
          // before we are done passing elements through, e.g., for take(). Just log a message to


 On Wed, Apr 9, 2014 at 7:04 PM, Jim Blomo jim.bl...@gmail.com wrote:

 This dataset is uncompressed text at ~54GB. stats() returns (count:
 56757667, mean: 1001.68740583, stdev: 601.775217822, max: 8965, min:
 343)

 On Wed, Apr 9, 2014 at 6:59 PM, Matei Zaharia matei.zaha...@gmail.com
 wrote:
  Okay, thanks. Do you have any info on how large your records and data
  file are? I'd like to reproduce and fix this.
 
  Matei
 
  On Apr 9, 2014, at 3:52 PM, Jim Blomo jim.bl...@gmail.com wrote:
 
  Hi Matei, thanks for working with me to find these issues.
 
  To summarize, the issues I've seen are:
  0.9.0:
  - https://issues.apache.org/jira/browse/SPARK-1323
 
  SNAPSHOT 2014-03-18:
  - When persist() used and batchSize=1, java.lang.OutOfMemoryError:
  Java heap space.  To me this indicates a memory

Continuously running non-streaming jobs

2014-04-17 Thread Jim Carroll
Is there a way to create continuously-running, or at least
continuously-loaded, jobs that can be 'invoked' rather than 'sent', to
avoid the job creation overhead of a couple seconds?

I read through the following:
http://apache-spark-user-list.1001560.n3.nabble.com/Job-initialization-performance-of-Spark-standalone-mode-vs-YARN-td2016.html

Thanks.
Jim




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Continuously-running-non-streaming-jobs-tp4391.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Continuously running non-streaming jobs

2014-04-17 Thread Jim Carroll
Daniel,

I'm new to Spark but I thought that thread hinted at the right answer. 

Thanks,
Jim




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Continuously-running-non-streaming-jobs-tp4391p4397.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Spark - ready for prime time?

2014-04-13 Thread Jim Blomo
On Thu, Apr 10, 2014 at 12:24 PM, Andrew Ash and...@andrewash.com wrote:
 The biggest issue I've come across is that the cluster is somewhat unstable
 when under memory pressure.  Meaning that if you attempt to persist an RDD
 that's too big for memory, even with MEMORY_AND_DISK, you'll often still get
 OOMs.  I had to carefully modify some of the space tuning parameters and GC
 settings to get some jobs to even finish.

Would you mind sharing some of these settings?  Even just a GitHub
gist would be helpful.  These are the main issues I've run into as
well, and memory pressure also seems to be correlated with akka
timeouts, possibly because of GC pauses.
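
(Not the settings Andrew used -- those weren't posted -- but a sketch of the
kind of knobs being discussed, using config names from the 0.9/1.0 era;
treat the values as placeholders:)

from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setAppName("memory-pressure-sketch")
        .set("spark.executor.memory", "10g")
        # shrink the block-manager cache so task working memory has more
        # headroom when persisting with MEMORY_AND_DISK
        .set("spark.storage.memoryFraction", "0.4"))
sc = SparkContext(conf=conf)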


Re: pySpark memory usage

2014-04-09 Thread Jim Blomo
Hi Matei, thanks for working with me to find these issues.

To summarize, the issues I've seen are:
0.9.0:
- https://issues.apache.org/jira/browse/SPARK-1323

SNAPSHOT 2014-03-18:
- When persist() used and batchSize=1, java.lang.OutOfMemoryError:
Java heap space.  To me this indicates a memory leak since Spark
should simply be counting records of size < 3MB
- Without persist(), "stdin writer to Python finished early" hangs the
application, unknown root cause

I've recently rebuilt another SNAPSHOT, git commit 16b8308 with
debugging turned on.  This gives me the stacktrace on the new stdin
problem:

14/04/09 22:22:45 DEBUG PythonRDD: stdin writer to Python finished early
java.net.SocketException: Connection reset
at java.net.SocketInputStream.read(SocketInputStream.java:196)
at java.net.SocketInputStream.read(SocketInputStream.java:122)
at sun.security.ssl.InputRecord.readFully(InputRecord.java:442)
at sun.security.ssl.InputRecord.readV3Record(InputRecord.java:554)
at sun.security.ssl.InputRecord.read(InputRecord.java:509)
at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:927)
at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:884)
at sun.security.ssl.AppInputStream.read(AppInputStream.java:102)
at java.io.BufferedInputStream.read1(BufferedInputStream.java:273)
at java.io.BufferedInputStream.read(BufferedInputStream.java:334)
at 
org.apache.commons.httpclient.WireLogInputStream.read(WireLogInputStream.java:69)
at 
org.apache.commons.httpclient.ContentLengthInputStream.read(ContentLengthInputStream.java:170)
at java.io.FilterInputStream.read(FilterInputStream.java:133)
at 
org.apache.commons.httpclient.AutoCloseInputStream.read(AutoCloseInputStream.java:108)
at 
org.jets3t.service.io.InterruptableInputStream.read(InterruptableInputStream.java:76)
at 
org.jets3t.service.impl.rest.httpclient.HttpMethodReleaseInputStream.read(HttpMethodReleaseInputStream.java:136)
at 
org.apache.hadoop.fs.s3native.NativeS3FileSystem$NativeS3FsInputStream.read(NativeS3FileSystem.java:98)
at java.io.BufferedInputStream.read1(BufferedInputStream.java:273)
at java.io.BufferedInputStream.read(BufferedInputStream.java:334)
at java.io.DataInputStream.read(DataInputStream.java:100)
at org.apache.hadoop.util.LineReader.readLine(LineReader.java:134)
at 
org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:133)
at 
org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:38)
at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:192)
at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:175)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
at 
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:27)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:350)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at 
org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:242)
at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)


On Thu, Apr 3, 2014 at 8:37 PM, Matei Zaharia matei.zaha...@gmail.com wrote:
 Cool, thanks for the update. Have you tried running a branch with this fix 
 (e.g. branch-0.9, or the 0.9.1 release candidate?) Also, what memory leak 
 issue are you referring to, is it separate from this? (Couldn't find it 
 earlier in the thread.)

 To turn on debug logging, copy conf/log4j.properties.template to 
 conf/log4j.properties and change the line "log4j.rootCategory=INFO, console" to 
 "log4j.rootCategory=DEBUG, console". Then make sure this file is present in 
 conf on all workers.

 BTW I've managed to run PySpark with this fix on some reasonably large S3 
 data (multiple GB) and it was fine. It might happen only if records are 
 large, or something like that. How much heap are you giving to your 
 executors, and does it show that much in the web UI?

 Matei

 On Mar 29, 2014, at 10:44 PM, Jim Blomo jim.bl...@gmail.com wrote:

 I think the problem I ran into in 0.9 is covered in
 https://issues.apache.org/jira/browse/SPARK-1323

 When I kill the python process, the stacktrace I get indicates that
 this happens at initialization.  It looks like the initial write to
 the Python process does not go through, and then the iterator hangs
 waiting for output.  I haven't had luck turning on debugging for the
 executor process.  Still trying to learn the log4j properties that
 need to be set.

 No luck yet on tracking down the memory leak.

 14/03/30 05:15:04 ERROR executor.Executor: Exception in task ID 11
 org.apache.spark.SparkException: Python worker exited

Re: pySpark memory usage

2014-04-09 Thread Jim Blomo
This dataset is uncompressed text at ~54GB. stats() returns (count:
56757667, mean: 1001.68740583, stdev: 601.775217822, max: 8965, min:
343)
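
(The exact expression isn't shown in the thread, but a line-length
computation of roughly this shape yields numbers like those; the path is a
placeholder:)

line_lengths = sc.textFile("s3n://bucket/sessions.json.gz").map(len)
print(line_lengths.stats())  # count, mean, stdev, max, min of line lengths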

On Wed, Apr 9, 2014 at 6:59 PM, Matei Zaharia matei.zaha...@gmail.com wrote:
 Okay, thanks. Do you have any info on how large your records and data file 
 are? I'd like to reproduce and fix this.

 Matei

 On Apr 9, 2014, at 3:52 PM, Jim Blomo jim.bl...@gmail.com wrote:

 Hi Matei, thanks for working with me to find these issues.

 To summarize, the issues I've seen are:
 0.9.0:
 - https://issues.apache.org/jira/browse/SPARK-1323

 SNAPSHOT 2014-03-18:
 - When persist() used and batchSize=1, java.lang.OutOfMemoryError:
 Java heap space.  To me this indicates a memory leak since Spark
 should simply be counting records of size < 3MB
 - Without persist(), "stdin writer to Python finished early" hangs the
 application, unknown root cause

 I've recently rebuilt another SNAPSHOT, git commit 16b8308 with
 debugging turned on.  This gives me the stacktrace on the new stdin
 problem:

 14/04/09 22:22:45 DEBUG PythonRDD: stdin writer to Python finished early
 java.net.SocketException: Connection reset
at java.net.SocketInputStream.read(SocketInputStream.java:196)
at java.net.SocketInputStream.read(SocketInputStream.java:122)
at sun.security.ssl.InputRecord.readFully(InputRecord.java:442)
at sun.security.ssl.InputRecord.readV3Record(InputRecord.java:554)
at sun.security.ssl.InputRecord.read(InputRecord.java:509)
at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:927)
at 
 sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:884)
at sun.security.ssl.AppInputStream.read(AppInputStream.java:102)
at java.io.BufferedInputStream.read1(BufferedInputStream.java:273)
at java.io.BufferedInputStream.read(BufferedInputStream.java:334)
at 
 org.apache.commons.httpclient.WireLogInputStream.read(WireLogInputStream.java:69)
at 
 org.apache.commons.httpclient.ContentLengthInputStream.read(ContentLengthInputStream.java:170)
at java.io.FilterInputStream.read(FilterInputStream.java:133)
at 
 org.apache.commons.httpclient.AutoCloseInputStream.read(AutoCloseInputStream.java:108)
at 
 org.jets3t.service.io.InterruptableInputStream.read(InterruptableInputStream.java:76)
at 
 org.jets3t.service.impl.rest.httpclient.HttpMethodReleaseInputStream.read(HttpMethodReleaseInputStream.java:136)
at 
 org.apache.hadoop.fs.s3native.NativeS3FileSystem$NativeS3FsInputStream.read(NativeS3FileSystem.java:98)
at java.io.BufferedInputStream.read1(BufferedInputStream.java:273)
at java.io.BufferedInputStream.read(BufferedInputStream.java:334)
at java.io.DataInputStream.read(DataInputStream.java:100)
at org.apache.hadoop.util.LineReader.readLine(LineReader.java:134)
at 
 org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:133)
at 
 org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:38)
at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:192)
at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:175)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
at 
 org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:27)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:350)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at 
 org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:242)
at 
 org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)


 On Thu, Apr 3, 2014 at 8:37 PM, Matei Zaharia matei.zaha...@gmail.com 
 wrote:
 Cool, thanks for the update. Have you tried running a branch with this fix 
 (e.g. branch-0.9, or the 0.9.1 release candidate?) Also, what memory leak 
 issue are you referring to, is it separate from this? (Couldn't find it 
 earlier in the thread.)

 To turn on debug logging, copy conf/log4j.properties.template to 
 conf/log4j.properties and change the line "log4j.rootCategory=INFO, console" 
 to "log4j.rootCategory=DEBUG, console". Then make sure this file is present 
 in conf on all workers.

 BTW I've managed to run PySpark with this fix on some reasonably large S3 
 data (multiple GB) and it was fine. It might happen only if records are 
 large, or something like that. How much heap are you giving to your 
 executors, and does it show that much in the web UI?

 Matei

 On Mar 29, 2014, at 10:44 PM, Jim Blomo jim.bl...@gmail.com wrote:

 I think the problem I ran into in 0.9 is covered in
 https://issues.apache.org/jira/browse/SPARK-1323

 When I kill the python process, the stacktrace I get indicates that
 this happens

Re: pySpark memory usage

2014-03-29 Thread Jim Blomo
I've only tried 0.9, in which I ran into the `stdin writer to Python
finished early` error so frequently that I wasn't able to load even a 1GB file.
Let me know if I can provide any other info!

On Thu, Mar 27, 2014 at 8:48 PM, Matei Zaharia matei.zaha...@gmail.com wrote:
 I see, did this also fail with previous versions of Spark (0.9 or 0.8)? We'll 
 try to look into these, seems like a serious error.

 Matei

 On Mar 27, 2014, at 7:27 PM, Jim Blomo jim.bl...@gmail.com wrote:

 Thanks, Matei.  I am running Spark 1.0.0-SNAPSHOT built for Hadoop
 1.0.4 from GitHub on 2014-03-18.

 I tried batchSizes of 512, 10, and 1 and each got me further but none
 have succeeded.

 I can get this to work -- with manual interventions -- if I omit
 `parsed.persist(StorageLevel.MEMORY_AND_DISK)` and set batchSize=1.  5
 of the 175 executors hung, and I had to kill the python process to get
 things going again.  The only indication of this in the logs was `INFO
 python.PythonRDD: stdin writer to Python finished early`.

 With batchSize=1 and persist, a new memory error came up in several
 tasks, before the app was failed:

 14/03/28 01:51:15 ERROR executor.Executor: Uncaught exception in
 thread Thread[stdin writer for python,5,main]
 java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOfRange(Arrays.java:2694)
at java.lang.String.<init>(String.java:203)
at java.nio.HeapCharBuffer.toString(HeapCharBuffer.java:561)
at java.nio.CharBuffer.toString(CharBuffer.java:1201)
at org.apache.hadoop.io.Text.decode(Text.java:350)
at org.apache.hadoop.io.Text.decode(Text.java:327)
at org.apache.hadoop.io.Text.toString(Text.java:254)
at 
 org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:349)
at 
 org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:349)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$12.next(Iterator.scala:357)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at 
 org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:242)
at 
 org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)

 There are other exceptions, but I think they all stem from the above,
 eg. org.apache.spark.SparkException: Error sending message to
 BlockManagerMaster

 Let me know if there are other settings I should try, or if I should
 try a newer snapshot.

 Thanks again!


 On Mon, Mar 24, 2014 at 9:35 AM, Matei Zaharia matei.zaha...@gmail.com 
 wrote:
 Hey Jim,

 In Spark 0.9 we added a batchSize parameter to PySpark that makes it 
 group multiple objects together before passing them between Java and 
 Python, but this may be too high by default. Try passing batchSize=10 to 
 your SparkContext constructor to lower it (the default is 1024). Or even 
 batchSize=1 to match earlier versions.
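
 (A one-line illustration of that constructor argument; the master URL and
 app name are placeholders:)

 from pyspark import SparkContext

 # Lower the Java<->Python serialization batch size when individual records
 # are large; batchSize=1 matches the pre-0.9 behaviour.
 sc = SparkContext("spark://master:7077", "big-records", batchSize=10)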

 Matei

 On Mar 21, 2014, at 6:18 PM, Jim Blomo jim.bl...@gmail.com wrote:

 Hi all, I'm wondering if there's any settings I can use to reduce the
 memory needed by the PythonRDD when computing simple stats.  I am
 getting OutOfMemoryError exceptions while calculating count() on big,
 but not absurd, records.  It seems like PythonRDD is trying to keep
 too many of these records in memory, when all that is needed is to
 stream through them and count.  Any tips for getting through this
 workload?


 Code:
 session = sc.textFile('s3://...json.gz') # ~54GB of compressed data

 # the biggest individual text line is ~3MB
 parsed = session.map(lambda l: l.split("\t", 1)).map(lambda (y,s):
 (loads(y), loads(s)))
 parsed.persist(StorageLevel.MEMORY_AND_DISK)

 parsed.count()
 # will never finish: executor.Executor: Uncaught exception will FAIL
 all executors

 Incidentally the whole app appears to be killed, but this error is not
 propagated to the shell.

 Cluster:
 15 m2.xlarges (17GB memory, 17GB swap, spark.executor.memory=10GB)

 Exception:
 java.lang.OutOfMemoryError: Java heap space
   at 
 org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:132)
   at 
 org.apache.spark.api.python.PythonRDD$$anon$1.next(PythonRDD.scala:120)
   at 
 org.apache.spark.api.python.PythonRDD$$anon$1.next(PythonRDD.scala:113)
   at scala.collection.Iterator$class.foreach(Iterator.scala:727)
   at 
 org.apache.spark.api.python.PythonRDD$$anon$1.foreach(PythonRDD.scala:113)
   at 
 scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
   at 
 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
   at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:94)
   at org.apache.spark.rdd.RDD.iterator(RDD.scala:220)
   at 
 org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)




Re: pySpark memory usage

2014-03-29 Thread Jim Blomo
I think the problem I ran into in 0.9 is covered in
https://issues.apache.org/jira/browse/SPARK-1323

When I kill the python process, the stacktrace I get indicates that
this happens at initialization.  It looks like the initial write to
the Python process does not go through, and then the iterator hangs
waiting for output.  I haven't had luck turning on debugging for the
executor process.  Still trying to learn the log4j properties that
need to be set.

No luck yet on tracking down the memory leak.

14/03/30 05:15:04 ERROR executor.Executor: Exception in task ID 11
org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
at 
org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:168)
at 
org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:174)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:113)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:231)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:222)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
at org.apache.spark.scheduler.Task.run(Task.scala:52)
at 
org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:212)
at 
org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:43)
at 
org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
at 
org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:42)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
   at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:724)


On Sat, Mar 29, 2014 at 3:17 PM, Jim Blomo jim.bl...@gmail.com wrote:
 I've only tried 0.9, in which I ran into the `stdin writer to Python
 finished early` error so frequently that I wasn't able to load even a 1GB file.
 Let me know if I can provide any other info!

 On Thu, Mar 27, 2014 at 8:48 PM, Matei Zaharia matei.zaha...@gmail.com 
 wrote:
 I see, did this also fail with previous versions of Spark (0.9 or 0.8)? 
 We'll try to look into these, seems like a serious error.

 Matei

 On Mar 27, 2014, at 7:27 PM, Jim Blomo jim.bl...@gmail.com wrote:

 Thanks, Matei.  I am running Spark 1.0.0-SNAPSHOT built for Hadoop
 1.0.4 from GitHub on 2014-03-18.

 I tried batchSizes of 512, 10, and 1 and each got me further but none
 have succeeded.

 I can get this to work -- with manual interventions -- if I omit
 `parsed.persist(StorageLevel.MEMORY_AND_DISK)` and set batchSize=1.  5
 of the 175 executors hung, and I had to kill the python process to get
 things going again.  The only indication of this in the logs was `INFO
 python.PythonRDD: stdin writer to Python finished early`.

 With batchSize=1 and persist, a new memory error came up in several
 tasks, before the app was failed:

 14/03/28 01:51:15 ERROR executor.Executor: Uncaught exception in
 thread Thread[stdin writer for python,5,main]
 java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOfRange(Arrays.java:2694)
at java.lang.String.<init>(String.java:203)
at java.nio.HeapCharBuffer.toString(HeapCharBuffer.java:561)
at java.nio.CharBuffer.toString(CharBuffer.java:1201)
at org.apache.hadoop.io.Text.decode(Text.java:350)
at org.apache.hadoop.io.Text.decode(Text.java:327)
at org.apache.hadoop.io.Text.toString(Text.java:254)
at 
 org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:349)
at 
 org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:349)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$12.next(Iterator.scala:357)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at 
 org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:242)
at 
 org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)

 There are other exceptions, but I think they all stem from the above,
 eg. org.apache.spark.SparkException: Error sending message to
 BlockManagerMaster

 Let me know if there are other settings I should try, or if I should
 try a newer snapshot.

 Thanks again!


 On Mon, Mar 24, 2014 at 9:35 AM, Matei Zaharia matei.zaha...@gmail.com 
 wrote:
 Hey Jim,

 In Spark 0.9 we added a batchSize parameter to PySpark that makes it 
 group multiple objects together before passing them between

pySpark memory usage

2014-03-21 Thread Jim Blomo
Hi all, I'm wondering if there's any settings I can use to reduce the
memory needed by the PythonRDD when computing simple stats.  I am
getting OutOfMemoryError exceptions while calculating count() on big,
but not absurd, records.  It seems like PythonRDD is trying to keep
too many of these records in memory, when all that is needed is to
stream through them and count.  Any tips for getting through this
workload?


Code:
session = sc.textFile('s3://...json.gz') # ~54GB of compressed data

# the biggest individual text line is ~3MB
parsed = session.map(lambda l: l.split("\t", 1)).map(lambda (y,s):
(loads(y), loads(s)))
parsed.persist(StorageLevel.MEMORY_AND_DISK)

parsed.count()
# will never finish: executor.Executor: Uncaught exception will FAIL
all executors

Incidentally the whole app appears to be killed, but this error is not
propagated to the shell.

Cluster:
15 m2.xlarges (17GB memory, 17GB swap, spark.executor.memory=10GB)

Exception:
java.lang.OutOfMemoryError: Java heap space
at 
org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:132)
at 
org.apache.spark.api.python.PythonRDD$$anon$1.next(PythonRDD.scala:120)
at 
org.apache.spark.api.python.PythonRDD$$anon$1.next(PythonRDD.scala:113)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at 
org.apache.spark.api.python.PythonRDD$$anon$1.foreach(PythonRDD.scala:113)
at 
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:94)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:220)
at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)