Re: Writing files to s3 without temporary directory

2017-11-21 Thread Jim Carroll
I got it working. It's much faster.

If someone else wants to try it I:
1) Was already using the code from the Presto S3 Hadoop FileSystem
implementation modified to sever it from the rest of the Presto codebase.
2) I extended it and overrode the method "keyFromPath" so that anytime the
Path referred to a "_temporary" parquet "part" file it returned a "key"
pointing to the final location of the file.
3) I registered the filesystem through sparkContext.hadoopConfiguration by
setting fs.s3.impl, fs.s3n.impl, and fs.s3a.impl (a rough sketch follows
below).

I realize I'm risking file corruption but it's WAY faster than it was.
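
For anyone who wants to see the shape of it, here's a minimal sketch of steps
2 and 3. This is not the actual code: it assumes a Presto-derived base class
(called PrestoDerivedS3FileSystem here) whose keyFromPath is overridable, and
the staging-to-final key mapping is deliberately simplified.

  import org.apache.hadoop.fs.Path

  class DirectWriteS3FileSystem extends PrestoDerivedS3FileSystem {

    // Step 2 (sketch): send keys destined for Hadoop's "_temporary" staging area
    // straight to the final location of the part file.
    override protected def keyFromPath(path: Path): String = {
      val key = super.keyFromPath(path)
      if (key.contains("/_temporary/")) finalKeyFor(key) else key
    }

    // Simplified assumption: final key = output directory + part file name.
    private def finalKeyFor(stagingKey: String): String = {
      val outputDir = stagingKey.substring(0, stagingKey.indexOf("/_temporary/"))
      val fileName  = stagingKey.substring(stagingKey.lastIndexOf('/') + 1)
      s"$outputDir/$fileName"
    }
  }

  // Step 3: register it for the s3/s3n/s3a schemes.
  val hadoopConf = sc.hadoopConfiguration
  Seq("fs.s3.impl", "fs.s3n.impl", "fs.s3a.impl").foreach { scheme =>
    hadoopConf.set(scheme, classOf[DirectWriteS3FileSystem].getName)
  }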




--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Writing files to s3 without temporary directory

2017-11-21 Thread Jim Carroll
It's not actually that tough. We already use a custom Hadoop FileSystem for
S3 because when we started using Spark with S3 the native FileSystem was
very unreliable. Ours is based on the code from Presto (see
https://github.com/prestodb/presto/blob/master/presto-hive/src/main/java/com/facebook/presto/hive/s3/PrestoS3FileSystem.java
).

I already have a version that introduces a hash into the filename for the
file that's actually written to S3, to see if it makes a difference per
https://docs.aws.amazon.com/AmazonS3/latest/dev/request-rate-perf-considerations.html#get-workload-considerations
. FWIW, it doesn't. I'm going to modify that experiment to override the key
name like before, except actually move the file, keep track of the state, and
override the rename method (sketched below).

The problems with this approach are: 1) it's brittle because it depends on
the internal directory and file naming conventions in Hadoop and Parquet. 2)
It will assume (as seems to be currently the case) that the 'rename' call is
done for all files from the driver. But it should do until there's a better
solution in the Hadoop committer.
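
Roughly what I have in mind for the rename part, as a sketch (the base class
name is hypothetical, and a real version would need the state tracking
mentioned above):

  import org.apache.hadoop.fs.Path

  class DirectWriteS3FileSystem extends PrestoDerivedS3FileSystem {
    // If the part files were already written to their final keys, the commit-time
    // rename out of "_temporary" has nothing left to move; report success and do nothing.
    override def rename(src: Path, dst: Path): Boolean = {
      if (src.toString.contains("/_temporary/")) true
      else super.rename(src, dst)
    }
  }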



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Writing files to s3 without temporary directory

2017-11-20 Thread Jim Carroll
Thanks. In the meantime I might just write a custom file system that maps
writes to parquet file parts to their final locations and then skips the
move. 



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Writing files to s3 without temporary directory

2017-11-20 Thread Jim Carroll
I have this exact issue. I was going to intercept the call in the filesystem
if I had to (since we're using the S3 filesystem from Presto anyway) but if
there's simply a way to do this correctly I'd much prefer it. This basically
doubles the time to write parquet files to s3.



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Spark on mesos in docker not getting parameters

2016-08-09 Thread Jim Carroll
I'm running Spark 2.0.0 on Mesos using spark.mesos.executor.docker.image to
point to a Docker container that I built with the Spark installation.

Everything is working except that the Spark client process started inside
the container doesn't get any of the parameters I set in the Spark config in
the driver.

I set spark.executor.extraJavaOptions and spark.executor.extraClassPath in
the driver and they don't get passed all the way through. Here is a capture
of the chain of processes that are started on the mesos slave, in the docker
container:

root  1064  1051  0 12:46 ?00:00:00 docker -H
unix:///var/run/docker.sock run --cpu-shares 8192 --memory 4723834880 -e
SPARK_CLASSPATH=[path to my jar] -e SPARK_EXECUTOR_OPTS=
-Daws.accessKeyId=[myid] -Daws.secretKey=[mykey] -e SPARK_USER=root -e
SPARK_EXECUTOR_MEMORY=4096m -e MESOS_SANDBOX=/mnt/mesos/sandbox -e
MESOS_CONTAINER_NAME=mesos-90e2c720-1e45-4dbc-8271-f0c47a33032a-S0.772f8080-6278-4a35-9e57-0009787ac605
-v
/tmp/mesos/slaves/90e2c720-1e45-4dbc-8271-f0c47a33032a-S0/frameworks/f5794f8a-b56f-4958-b906-f05c426dcef0-0001/executors/0/runs/772f8080-6278-4a35-9e57-0009787ac605:/mnt/mesos/sandbox
--net host --entrypoint /bin/sh --name
mesos-90e2c720-1e45-4dbc-8271-f0c47a33032a-S0.772f8080-6278-4a35-9e57-0009787ac605
[my docker image] -c  "/opt/spark/./bin/spark-class"
org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url
spark://CoarseGrainedScheduler@192.168.10.145:46121 --executor-id 0
--hostname 192.168.10.145 --cores 8 --app-id
f5794f8a-b56f-4958-b906-f05c426dcef0-0001

root  1193  1175  0 12:46 ?00:00:00 /bin/sh -c 
"/opt/spark/./bin/spark-class"
org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url
spark://CoarseGrainedScheduler@192.168.10.145:46121 --executor-id 0
--hostname 192.168.10.145 --cores 8 --app-id
f5794f8a-b56f-4958-b906-f05c426dcef0-0001

root  1208  1193  0 12:46 ?00:00:00 bash
/opt/spark/./bin/spark-class
org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url
spark://CoarseGrainedScheduler@192.168.10.145:46121 --executor-id 0
--hostname 192.168.10.145 --cores 8 --app-id
f5794f8a-b56f-4958-b906-f05c426dcef0-0001

root  1213  1208  0 12:46 ?00:00:00 bash
/opt/spark/./bin/spark-class
org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url
spark://CoarseGrainedScheduler@192.168.10.145:46121 --executor-id 0
--hostname 192.168.10.145 --cores 8 --app-id
f5794f8a-b56f-4958-b906-f05c426dcef0-0001

root  1215  1213  0 12:46 ?00:00:00
/usr/lib/jvm/java-8-openjdk-amd64/bin/java -Xmx128m -cp /opt/spark/jars/*
org.apache.spark.launcher.Main
org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url
spark://CoarseGrainedScheduler@192.168.10.145:46121 --executor-id 0
--hostname 192.168.10.145 --cores 8 --app-id
f5794f8a-b56f-4958-b906-f05c426dcef0-0001

Notice that in the initial process started by Mesos, SPARK_CLASSPATH is
set to the value of spark.executor.extraClassPath and the -D options are set
as I set them in spark.executor.extraJavaOptions (in this case, to my AWS
creds) in the driver configuration.

However, they are missing in subsequent child processes and the final java
process started doesn't contain them either.

I "fixed" the classpath problem by putting my jar in /opt/spark/jars
(/opt/spark is the location I have spark installed in the docker container).
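
For reference, the driver-side configuration described above looks roughly
like this (a sketch; the master URL, image name, jar path, and credential
values are placeholders):

  import org.apache.spark.SparkConf

  val conf = new SparkConf()
    .setMaster("mesos://zk://zk-host:2181/mesos")                      // placeholder
    .set("spark.mesos.executor.docker.image", "my-registry/my-spark")  // placeholder
    .set("spark.executor.extraClassPath", "/path/to/my.jar")           // placeholder
    .set("spark.executor.extraJavaOptions",
         "-Daws.accessKeyId=[myid] -Daws.secretKey=[mykey]")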

Can someone tell me what I'm missing?

Thanks
Jim




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-on-mesos-in-docker-not-getting-parameters-tp27500.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Non-classification neural networks

2016-03-27 Thread Jim Carroll
Hello all,

We were using the old "Artificial Neural Network" :
https://github.com/apache/spark/pull/1290

This code appears to have been incorporated in 1.5.2 but it's only exposed
publicly via the MultilayerPerceptronClassifier. Is there a way to use the
old feedforward/backprop non-classification functionality? It appears to be
buried in private classes and it's not obvious to me if the
MultilayerPerceptronClassifier can be used without the classification. The
doc says "Number of outputs has to be equal to the total number of labels." 

What if the output is continuous and you want to simply do prediction?

Thanks
Jim




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Non-classification-neural-networks-tp26604.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: EOFException using KryoSerializer

2015-06-24 Thread Jim Carroll
I finally got back to this and I just wanted to let anyone who runs into
this know that the problem is a Kryo version issue. Spark (at least 1.4.0)
depends on Kryo 2.21 while my client had 2.24.0 on the classpath. Changing
it to 2.21 fixed the problem.
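
If the client build uses sbt, pinning it looks something like this (a sketch;
adjust for whatever build tool you're on):

  // build.sbt (sketch): force the Kryo version Spark 1.4.0 expects onto the client classpath.
  dependencyOverrides += "com.esotericsoftware.kryo" % "kryo" % "2.21"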



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/EOFException-using-KryoSerializer-tp22948p23479.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



EOFException using KryoSerializer

2015-05-19 Thread Jim Carroll
I'm seeing the following exception ONLY when I run on a Mesos cluster. If I
run the exact same code with master set to local[N] I have no problem:

 2015-05-19 16:45:43,484 [task-result-getter-0] WARN  TaskSetManager - Lost
task 0.0 in stage 0.0 (TID 0, 10.253.1.101): java.io.EOFException
at
org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:142)
at
org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:216)
at
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:177)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1153)
at
org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:164)
at
org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
at
org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
at
org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:87)
at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:58)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

KryoSerializer explicitly throws an EOFException. The comment says: 

// DeserializationStream uses the EOF exception to indicate stopping
condition.

Apparently this isn't what TorrentBroadcast expects.

Any suggestions? Thanks.

Jim





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/EOFException-using-KryoSerializer-tp22948.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: 1.3 Hadoop File System problem

2015-03-25 Thread Jim Carroll
Thanks Patrick and Michael for your responses.

For anyone else that runs across this problem prior to 1.3.1 being released,
I've been pointed to this Jira ticket that's scheduled for 1.3.1:

https://issues.apache.org/jira/browse/SPARK-6351

Thanks again.




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/1-3-Hadoop-File-System-problem-tp22207p5.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



1.3 Hadoop File System problem

2015-03-24 Thread Jim Carroll

I have code that works under 1.2.1 but when I upgraded to 1.3.0 it fails to
find the s3 hadoop file system.

I get a java.lang.IllegalArgumentException: "Wrong FS: s3://[path to my
file], expected: file:///" when I try to save a parquet file. This worked in
1.2.1.
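
The failing call is essentially just a parquet save to an s3 path, along the
lines of the following sketch (data stands in for whatever SchemaRDD/DataFrame
is being saved; the path is a placeholder):

  data.saveAsParquetFile("s3://my-bucket/path/to/output")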

Has anyone else seen this?

I'm running spark using local[8] so it's all internal. These are actually
unit tests in our app that are failing now.

Thanks.
Jim




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/1-3-Hadoop-File-System-problem-tp22207.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Parquet divide by zero

2015-01-28 Thread Jim Carroll
Hello all,

I've been hitting a divide by zero error in Parquet through Spark, detailed
(and fixed) here: https://github.com/apache/incubator-parquet-mr/pull/102

Is anyone else hitting this error? I hit it frequently.

It looks like the Parquet team is preparing to release 1.6.0 and, since they
have been completely unresponsive, I'm assuming it's going to ship with this
bug (without the fix). Other than the fact that the divide by zero mistake
is obvious, perhaps the conditions under which it occurs are rare and I'm
doing something wrong.

Has anyone else hit this and if so, have they resolved it?

Thanks
Jim




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Parquet-divide-by-zero-tp21406.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Cluster Aware Custom RDD

2015-01-16 Thread Jim Carroll
Hello all,

I have a custom RDD for fast loading of data from a non-partitioned source.
The partitioning happens in the RDD implementation by pushing data from the
source into queues picked up by the currently active partitions in worker
threads.

This works great on a multi-threaded single host (say with the master set
to local[x]) but I'd like to run it distributed. However, I need to know
not only which slice my partition is, but also which host (by sequence)
it's on, so I can divide up the source by worker (host) and then run
multi-threaded. In other words, I need what effectively amounts to a 2-tier
slice identifier.

I know this is probably unorthodox, but is there some way to get this
information in the compute method or the deserialized Partition objects?
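
To make the question concrete, here is a minimal sketch (hypothetical names,
not the real RDD) of the kind of information available inside compute today;
the question is how to turn this into a proper 2-tier identifier:

  import java.net.InetAddress
  import org.apache.spark.{Partition, SparkContext, SparkEnv, TaskContext}
  import org.apache.spark.rdd.RDD

  // Hypothetical partition carrying only its slice index.
  case class SlicePartition(index: Int) extends Partition

  class TwoTierRDD(sc: SparkContext, slices: Int) extends RDD[String](sc, Nil) {

    override protected def getPartitions: Array[Partition] =
      (0 until slices).map(i => SlicePartition(i): Partition).toArray

    override def compute(split: Partition, context: TaskContext): Iterator[String] = {
      val host       = InetAddress.getLocalHost.getHostName  // which worker host this slice runs on
      val executorId = SparkEnv.get.executorId                // which executor within the cluster
      // A real implementation would use (host, split.index) to pick the right queue from the
      // non-partitioned source; here it just emits the identifier it was able to discover.
      Iterator(s"host=$host executor=$executorId slice=${split.index}")
    }
  }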



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Cluster-Aware-Custom-RDD-tp21196.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: No disk single pass RDD aggregation

2014-12-18 Thread Jim Carroll
Hi,

This was all my fault. It turned out I had a line of code buried in a
library that did a repartition. I used this library to wrap an RDD to
present it to legacy code as a different interface. That's what was causing
the data to spill to disk.

The really stupid thing is that it took me the better part of a day, and
several misguided emails to this list (including the one that started this
thread), to find.

Sorry about that.

Jim




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/No-disk-single-pass-RDD-aggregation-tp20723p20763.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



No disk single pass RDD aggregation

2014-12-16 Thread Jim Carroll
Okay,

I have an RDD that I want to run an aggregate over, but it insists on
spilling to disk even though I structured the processing to only require a
single pass.

In other words, I can do all of my processing one entry in the RDD at a time
without persisting anything.

I set rdd.persist(StorageLevel.NONE) and it had no effect. When I run
locally I get my /tmp directory filled with transient RDD data even though I
never need the data again after the row's been processed. Is there a way to
turn this off?

Thanks
Jim




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/No-disk-single-pass-RDD-aggregation-tp20723.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: No disk single pass RDD aggregation

2014-12-16 Thread Jim Carroll
In case a little more information is helpful:

the RDD is constructed using sc.textFile(fileUri) where fileUri points to a
.gz file (that's too big to fit on my disk).

I do an rdd.persist(StorageLevel.NONE) and it seems to have no effect.

This RDD is what I'm calling aggregate on and I expect to only use it once.
Each row in the RDD never has to be revisited. The aggregate seqOp is
modifying a current state and returning it, so there's no need to store the
results of the seqOp on a row-by-row basis, and given the fact that there's
one partition the combOp doesn't even need to be called (since there would
be nothing to combine across partitions).
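
A stripped-down sketch of the structure being described (the State type and
the update logic are stand-ins for the real aggregation):

  // Stand-in running state; the real one holds the per-row aggregates.
  case class State(count: Long = 0L)

  val rdd = sc.textFile(fileUri)   // fileUri points at the large .gz file => a single partition

  val result = rdd.aggregate(State())(
    (state, row) => state.copy(count = state.count + 1),  // seqOp: advance the running state per row
    (a, b)       => State(a.count + b.count)              // combOp: only matters across partitions
  )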

Thanks for any help.
Jim




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/No-disk-single-pass-RDD-aggregation-tp20723p20724.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: No disk single pass RDD aggregation

2014-12-16 Thread Jim Carroll
Nvm. I'm going to post another question since this has to do with the way
Spark handles sc.textFile with a file://xxxx.gz URI.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/No-disk-single-pass-RDD-aggregation-tp20723p20725.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spark handling of a file://xxxx.gz Uri

2014-12-16 Thread Jim Carroll
Is there a way to get Spark to NOT repartition/shuffle/expand a
sc.textFile(fileUri) when the URI is a gzipped file?

Expanding a gzipped file should be thought of as a transformation and not
an action (if the analogy is apt). There is no need to fully create and
fill out an intermediate RDD with the expanded data when it can be done one
row at a time.
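
What I'd like is for something like the following to stay a single,
lazily-consumed partition (sketch; the path is a placeholder):

  val lines  = sc.textFile("file:///data/big.csv.gz")  // gzip is not splittable, so one partition
  val parsed = lines.map(_.split(",", -1))             // should stream one row at a time, with no
                                                       // fully materialized intermediate RDD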




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-handling-of-a-file--gz-Uri-tp20726.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



How do I stop the automatic partitioning of my RDD?

2014-12-16 Thread Jim Carroll

I've been trying to figure out how to use Spark to do a simple aggregation
without repartitioning and essentially creating fully instantiated
intermediate RDDs, and it seems virtually impossible.

I've now gone as far as writing my own single-partition RDD that wraps an
Iterator[String] and calling aggregate() on it. Before any of my aggregation
code executes, the entire Iterator is unwound and multiple partitions are
created to be given to my aggregation.
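
The single-partition wrapper is essentially this (a sketch with hypothetical
names; the real one pulls rows from our source):

  import org.apache.spark.{Partition, SparkContext, TaskContext}
  import org.apache.spark.rdd.RDD

  // One partition that simply hands its Iterator[String] to whoever computes it.
  class SinglePartitionRDD(sc: SparkContext, rows: () => Iterator[String])
    extends RDD[String](sc, Nil) {

    override protected def getPartitions: Array[Partition] =
      Array(new Partition { override def index: Int = 0 })

    override def compute(split: Partition, context: TaskContext): Iterator[String] = rows()
  }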

The Task execution call stack includes:
   ShuffleMapTask.runTask
   SortShuffleWriter.write
   ExternalSorter.insertAll
  ... which is iterating over my entire RDD, repartitioning it, and collecting
it into a SpillFile.

How do I prevent this from happening? There's no need to do this.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-do-I-stop-the-automatic-partitioning-of-my-RDD-tp20732.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How do I stop the automatic partitioning of my RDD?

2014-12-16 Thread Jim Carroll
Wow. I just realized what was happening and it's all my fault. I have a
library method that I wrote that presents the RDD, and I was actually
repartitioning it myself.

I feel pretty dumb. Sorry about that.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-do-I-stop-the-automatic-partitioning-of-my-RDD-tp20732p20735.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Standard SQL tool access to SchemaRDD

2014-12-02 Thread Jim Carroll
Hello all,

Is there a way to load an RDD in a small driver app and connect with a JDBC
client and issue SQL queries against it? It seems the thrift server only
works with pre-existing Hive tables.

Thanks
Jim




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Standard-SQL-tool-access-to-SchemaRDD-tp20197.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Standard SQL tool access to SchemaRDD

2014-12-02 Thread Jim Carroll

Thanks! I'll give it a try.




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Standard-SQL-tool-access-to-SchemaRDD-tp20197p20202.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Set worker log configuration when running local[n]

2014-11-14 Thread Jim Carroll
How do I set the log level when running local[n]? It ignores the
log4j.properties file on my classpath.

I also tried to set the Spark home dir on the SparkConf using setSparkHome
and made sure an appropriate log4j.properties file was in a conf
subdirectory; that didn't work either.

I'm running with the current master.

Thanks
Jim




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Set-worker-log-configuration-when-running-local-n-tp18948.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Set worker log configuration when running local[n]

2014-11-14 Thread Jim Carroll
Actually, it looks like it's Parquet logging that I don't have control over.

For some reason the Parquet project decided to use java.util.logging with
its own logging configuration.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Set-worker-log-configuration-when-running-local-n-tp18948p18951.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



How do I turn off Parquet logging in a worker?

2014-11-14 Thread Jim Carroll

I'm running a local spark master (local[n]).

I cannot seem to turn off the parquet logging. I tried:

1) Setting a log4j.properties on the classpath.
2) Setting a log4j.properties file in a spark install conf directory and
pointing to the install using setSparkHome
3) Editing the log4j-defaults.properties file in the spark-core jar that I'm
using
4) Changing the JAVA_HOME/jre/lib/logging.properties (since Parquet uses
java.util.logging)
5) adding the following code as the first lines in my main:

  java.util.logging.Logger.getLogger("parquet").addHandler(
    new java.util.logging.Handler() {
      def close(): Unit = {}
      def flush(): Unit = {}
      def publish(x: java.util.logging.LogRecord): Unit = {}
    })

NOTHING seems to change the default log-level console output from Parquet
when it runs in a worker.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-do-I-turn-off-Parquet-logging-in-a-worker-tp18955.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How do I turn off Parquet logging in a worker?

2014-11-14 Thread Jim Carroll

This is a problem caused (aside from the fact that Parquet uses
java.util.logging) by a bug in Spark in the current master.

ParquetRelation.scala attempts to override the parquet logger but, at least
currently (and if your application simply reads a parquet file before it
does anything else with Parquet), the parquet.Log class hasn't been loaded
yet. Therefore the code in ParquetRelation.enableLogForwarding has no
effect. If you look at the code in parquet.Log there's a static initializer
that needs to be called prior to enableLogForwarding, or whatever
enableLogForwarding does gets undone by this static initializer.

Adding Class.forName("parquet.Log") as the first thing in my main fixed the
problem.

The fix would be to force the static initializer to get called in
parquet.Log as part of enableLogForwarding.

Anyone want a PR?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-do-I-turn-off-Parquet-logging-in-a-worker-tp18955p18958.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Set worker log configuration when running local[n]

2014-11-14 Thread Jim Carroll
Just to be complete, this is a problem in Spark that I worked around and
detailed here:
http://apache-spark-user-list.1001560.n3.nabble.com/How-do-I-turn-off-Parquet-logging-in-a-worker-td18955.html



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Set-worker-log-configuration-when-running-local-n-tp18948p18959.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How do I turn off Parquet logging in a worker?

2014-11-14 Thread Jim Carroll

Jira: https://issues.apache.org/jira/browse/SPARK-4412
PR: https://github.com/apache/spark/pull/3271




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-do-I-turn-off-Parquet-logging-in-a-worker-tp18955p18977.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Wildly varying aggregate performance depending on code location

2014-11-12 Thread Jim Carroll
Hello all,

I have a really strange thing going on.

I have a test data set with 500K lines in a gzipped csv file.

I have an array of column processors, one for each column in the dataset.
A Processor tracks aggregate state and has a method process(v: String).

I'm calling:

  val processors: Array[Processors] = 

  sc.textFile(gzippedFileName).aggregate(processors)(
    { (curState, row) =>
      row.split(",", -1).zipWithIndex.foreach({
        v => curState(v._2).process(v._1)
      })
      curState
    },
    { (a, b) => a }  // combOp (not shown in the original); never invoked for a single gzipped partition
  )

If the class definition for the Processors is in the same file as the driver
it runs in ~23 seconds. If I move the classes to a separate file in the same
package without ANY OTHER CHANGES it goes to ~35 seconds.

This doesn't make any sense to me. I can't even understand how the compiled
class files could be any different in either case.

Does anyone have an explanation for why this might be?




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Wildly-varying-aggregate-performance-depending-on-code-location-tp18752.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Wildly varying aggregate performance depending on code location

2014-11-12 Thread Jim Carroll

Well it looks like this is a Scala problem after all. I loaded the file
using pure Scala and ran the exact same Processors without Spark and I got
20 seconds (with the code in the same file as the 'main') vs 30 seconds
(with the exact same code in a different file) on the 500K rows.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Wildly-varying-aggregate-performance-depending-on-code-location-tp18752p18772.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Ending a job early

2014-10-28 Thread Jim Carroll

We have some very large datasets where the calculations converge on a result.
Our current implementation allows us to track how quickly the calculations
are converging and end the processing early. This can significantly speed up
some of our processing.

Is there a way to do the same thing in Spark?

A trivial example might be a column average on a dataset. As we're
'aggregating' rows into columnar averages I can track how fast these
averages are moving and decide to stop after a low percentage of the rows
has been processed, producing an estimate rather than an exact value.
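
To make that concrete, here's a toy sketch (fileUri is a placeholder and the
convergence test is made up; note it only skips work inside the seqOp, the
rows are still iterated, so it doesn't actually end the job, which is really
what I'm asking for):

  case class Avg(sum: Double = 0.0, n: Long = 0L, converged: Boolean = false) {
    def value: Double = if (n == 0) 0.0 else sum / n
  }

  val estimate = sc.textFile(fileUri).aggregate(Avg())(
    (acc, row) => {
      if (acc.converged) acc                      // skip the work, but the row is still read
      else {
        val next  = Avg(acc.sum + row.toDouble, acc.n + 1)
        val moved = math.abs(next.value - acc.value)
        next.copy(converged = acc.n > 10000 && moved < 1e-6)
      }
    },
    (a, b) => Avg(a.sum + b.sum, a.n + b.n)
  )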

Within a partition, or better yet, within a worker across 'reduce' steps, is
there a way to stop all of the aggregations and just continue on with
reduces of already processed data?

Thanks
Jim




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Ending-a-job-early-tp17505.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Network requirements between Driver, Master, and Slave

2014-09-12 Thread Jim Carroll
Hi Akhil,

Thanks! I guess in short that means the master (or slaves?) connect back to
the driver. This seems like a really odd way to work given the driver needs
to already connect to the master on port 7077. I would have thought that if
the driver could initiate a connection to the master, that would be all
that's required.

Can you describe what it is about the architecture that requires the master
to connect back to the driver even when the driver initiates a connection to
the master? Just curious.

Thanks anyway.
Jim
 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Network-requirements-between-Driver-Master-and-Slave-tp13997p14086.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Network requirements between Driver, Master, and Slave

2014-09-11 Thread Jim Carroll
Hello all,

I'm trying to run a Driver on my local network with a deployment on EC2 and
it's not working. I was wondering if either the master or slave instances
(in standalone) connect back to the driver program.

I outlined the details of my observations in a previous post but here is
what I'm seeing:

I have v1.1.0 installed (the new tag) on ec2 using the spark-ec2 script.
I have the same version of the code built locally.
I edited the master security group to allow inbound access from anywhere to
7077 and 8080.
I see a connection take place.
I see the workers fail with a timeout when any job is run.
The master eventually removes the driver's job.

I suppose this makes sense if there's a requirement for either the worker
or the master to be on the same network as the driver. Is that the case?

Thanks
Jim




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Network-requirements-between-Driver-Master-and-Slave-tp13997.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Querying a parquet file in s3 with an ec2 install

2014-09-09 Thread Jim Carroll
My apologies to the list. I replied to Manu's question and it went directly
to him rather than the list.

In case anyone else has this issue here is my reply and Manu's reply to me.
This also answers Ian's question.

---

Hi Manu,

The dataset is 7.5 million rows and 500 columns. In parquet form it's about
1.1 Gig. It was created with Spark and copied up to s3. It has about 4600
parts (which I'd also like to gain some control over). I can try a smaller
dataset, however it works when I run it locally, even with the file out on
s3. It just takes a while.

I can try copying it to HDFS first but that won't help longer term.

Thanks
Jim

-
Manu's response:
-

I am pretty sure it is due to the number of parts you have. I have a
parquet data set that is 250M rows and 924 columns and it is ~2500 files...

I recommend creating a table in Hive with that data set and doing an insert
overwrite so you can get a data set with more manageable files.

Why I think it's the number of files: I believe all, or a large part, of
those files are read when you run sqlContext.parquetFile(), and the time it
would take in S3 for that to happen is long enough that something internally
is timing out.

-Manu



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Querying-a-parquet-file-in-s3-with-an-ec2-install-tp13737p13790.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Querying a parquet file in s3 with an ec2 install

2014-09-09 Thread Jim Carroll

Why I think it's the number of files: I believe all, or a large part, of
those files are read when you run sqlContext.parquetFile(), and the time it
would take in S3 for that to happen is long enough that something internally
is timing out.

I'll create the parquet files with Drill instead of Spark which will give me
(somewhat) better control over the slice sizes and see what happens.

That said, this behavior seems wrong to me. First, exiting due to inactivity
on a job seems like (perhaps?) the wrong fix to a former problem.  Second,
there IS activity if it's reading the slice headers but the job is exiting
anyway. So if this fixes the problem the measure of activity seems wrong.

Ian and Manu, thanks for your help. I'll post back and let you know if that
fixes it.

Jim




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Querying-a-parquet-file-in-s3-with-an-ec2-install-tp13737p13791.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Querying a parquet file in s3 with an ec2 install

2014-09-09 Thread Jim Carroll
Okay,

This seems to be either a code version issue or a communication issue. It
works if I execute the spark shell from the master node. It doesn't work if
I run it from my laptop and connect to the master node. 

I had opened the ports for the WebUI (8080) and the cluster manager (7077)
for the master node; otherwise it fails much sooner. Do I need to open up the
ports for the workers as well?

I used the spark-ec2 install script with --spark-version using both 1.0.2
and then again with the git hash tag that corresponds to 1.1.0rc4
(2f9b2bd7844ee8393dc9c319f4fefedf95f5e460). In both cases I rebuilt from
source using the same codebase on my machine and moved the entire project
into /root/spark (since to run the spark-shell it needs to match the same
path as the install on ec2). Could I have missed something here?

Thanks.
Jim




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Querying-a-parquet-file-in-s3-with-an-ec2-install-tp13737p13802.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Querying a parquet file in s3 with an ec2 install

2014-09-08 Thread Jim Carroll
Hello all,

I've been wrestling with this problem all day and any suggestions would be
greatly appreciated.

I'm trying to test reading a parquet file that's stored in s3 using a spark
cluster deployed on ec2. The following works in the spark shell when run
completely locally on my own machine (i.e. no --master option passed to the
spark-shell command):

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._
val p = parquetFile("s3n://[bucket]/path-to-parquet-dir/")
p.registerAsTable("s")
sql("select count(*) from s").collect

I have an ec2 deployment of spark (tried version 1.0.2 and 1.1.0-rc4) using
the standalone cluster manager and deployed with the spark-ec2 script. 

Running the same code in a spark shell connected to the cluster it basically
hangs on the select statement. The workers/slaves simply time out and
restart every 30 seconds when they hit what appears to be an activity
timeout, as if there's no activity from the spark-shell (based on what I see
in the stderr logs for the job, I assume this is expected behavior when
connected from a spark-shell that's sitting idle).

I see these messages about every 30 seconds:

14/09/08 17:43:08 WARN TaskSchedulerImpl: Initial job has not accepted any
resources; check your cluster UI to ensure that workers are registered and
have sufficient memory
14/09/08 17:43:09 INFO AppClient$ClientActor: Executor updated:
app-20140908213842-0002/7 is now EXITED (Command exited with code 1)
14/09/08 17:43:09 INFO SparkDeploySchedulerBackend: Executor
app-20140908213842-0002/7 removed: Command exited with code 1
14/09/08 17:43:09 INFO AppClient$ClientActor: Executor added:
app-20140908213842-0002/8 on
worker-20140908183422-ip-10-60-107-194.ec2.internal-53445
(ip-10-60-107-194.ec2.internal:53445) with 2 cores
14/09/08 17:43:09 INFO SparkDeploySchedulerBackend: Granted executor ID
app-20140908213842-0002/8 on hostPort ip-10-60-107-194.ec2.internal:53445
with 2 cores, 4.0 GB RAM
14/09/08 17:43:09 INFO AppClient$ClientActor: Executor updated:
app-20140908213842-0002/8 is now RUNNING

Eventually it fails with: 

14/09/08 17:44:16 INFO AppClient$ClientActor: Executor updated:
app-20140908213842-0002/9 is now EXITED (Command exited with code 1)
14/09/08 17:44:16 INFO SparkDeploySchedulerBackend: Executor
app-20140908213842-0002/9 removed: Command exited with code 1
14/09/08 17:44:16 ERROR SparkDeploySchedulerBackend: Application has been
killed. Reason: Master removed our application: FAILED
14/09/08 17:44:16 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks
have all completed, from pool 
14/09/08 17:44:16 INFO TaskSchedulerImpl: Cancelling stage 1
14/09/08 17:44:16 INFO DAGScheduler: Failed to run collect at
SparkPlan.scala:85
14/09/08 17:44:16 INFO SparkUI: Stopped Spark web UI at
http://192.168.10.198:4040
14/09/08 17:44:16 INFO DAGScheduler: Stopping DAGScheduler
14/09/08 17:44:16 INFO SparkDeploySchedulerBackend: Shutting down all
executors
14/09/08 17:44:16 INFO SparkDeploySchedulerBackend: Asking each executor to
shut down
14/09/08 17:44:16 INFO SparkDeploySchedulerBackend: Asking each executor to
shut down
org.apache.spark.SparkException: Job aborted due to stage failure: Master
removed our application: FAILED
at
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
at scala.Option.foreach(Option.scala:236)
at
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:688)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1391)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at

Using Spark to add data to an existing Parquet file without a schema

2014-09-04 Thread Jim Carroll
Hello all,

I've been trying to figure out how to add data to an existing Parquet file
without having a schema. Spark has allowed me to load JSON and save it as a
Parquet file but I was wondering if anyone knows how to ADD/INSERT more
data. 

I tried using sql insert and that doesn't work. All of the examples assume a
schema exists in the form of a serialization IDL and generated classes.

I looked into the code and considered direct use of InsertIntoParquetTable
or a copy of it but I was hoping someone already solved the problem.

Any guidance would be greatly appreciated.

Thanks
Jim






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Using-Spark-to-add-data-to-an-existing-Parquet-file-without-a-schema-tp13450.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Using Spark to add data to an existing Parquet file without a schema

2014-09-04 Thread Jim Carroll
Okay,

Obviously I don't care about adding more files to the system so is there a
way to point to an existing parquet file (directory) and seed the individual
part-r-***.parquet (the value of partition + offset) while preventing 

I mean, I can hack it by copying files into the same parquet directory and
managing the file names externally, but this seems like a workaround. Is
that the way others are doing it?

Jim




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Using-Spark-to-add-data-to-an-existing-Parquet-file-without-a-schema-tp13450p13499.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Continuously running non-streaming jobs

2014-04-17 Thread Jim Carroll
Is there a way to create continuously-running, or at least
continuously-loaded, jobs that can be 'invoked' rather than 'sent', to
avoid the job creation overhead of a couple of seconds?

I read through the following:
http://apache-spark-user-list.1001560.n3.nabble.com/Job-initialization-performance-of-Spark-standalone-mode-vs-YARN-td2016.html

Thanks.
Jim




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Continuously-running-non-streaming-jobs-tp4391.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Continuously running non-streaming jobs

2014-04-17 Thread Jim Carroll
Daniel,

I'm new to Spark but I thought that thread hinted at the right answer. 

Thanks,
Jim




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Continuously-running-non-streaming-jobs-tp4391p4397.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.