Hadoop 2.3 Centralized Cache vs RDD

2014-05-16 Thread William Kang
Hi,
Any comments or thoughts on the implications of the centralized cache
feature newly released in Hadoop 2.3? How does it differ from RDDs?

Many thanks.


Cao


Re: maven for building scala simple program

2014-05-16 Thread Laeeq Ahmed
Hi Ryan,

It worked like a charm. Much appreciated.

Laeeq.


On Wednesday, May 7, 2014 1:30 AM, Ryan Compton  wrote:
 
I've been using this (you'll need maven 3).

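<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
                             http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.mycompany.app</groupId>
    <artifactId>my-app</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>my-app</name>
    <url>http://maven.apache.org</url>

    <properties>
        <maven.compiler.source>1.6</maven.compiler.source>
        <maven.compiler.target>1.6</maven.compiler.target>
        <encoding>UTF-8</encoding>
        <scala.version>2.10.4</scala.version>
    </properties>

    <build>
        <pluginManagement>
            <plugins>
                <plugin>
                    <groupId>net.alchim31.maven</groupId>
                    <artifactId>scala-maven-plugin</artifactId>
                    <version>3.1.5</version>
                </plugin>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>2.0.2</version>
                </plugin>
            </plugins>
        </pluginManagement>

        <plugins>

            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>scala-test-compile</id>
                        <phase>process-test-resources</phase>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.4</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

        </plugins>
    </build>

    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
    </dependencies>
</project>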





On Tue, May 6, 2014 at 4:10 PM, Laeeq Ahmed  wrote:
> Hi all,
>
> If anyone is using maven for building scala classes with all dependencies
> for spark, please provide a sample pom.xml here. I am having trouble using
> maven for a simple scala job, though it was working properly for java. I have
> added the scala maven plugin but am still getting some issues.
>
> Laeeq

Benchmarking Spark with YCSB

2014-05-16 Thread bhusted
Can anyone comment on what it would take to run Spark with YCSB and HBase for
benchmarking?  Has this been done before or been reviewed?  If not, our
company is looking to make an investment to perform the development
necessary but we are looking for any technical insight on what it would take
to make this happen.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Benchmarking-Spark-with-YCSB-tp5813.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Spark GCE Script

2014-05-16 Thread Akhil Das
Hi

I have sent a pull request: https://github.com/apache/spark/pull/681
You can verify it and add it :)


Thanks
Best Regards


On Thu, May 8, 2014 at 2:58 AM, Aureliano Buendia wrote:

> Please send a pull request, this should be maintained by the community,
> just in case you do not feel like continuing to maintain it.
>
> Also, nice to see that the gce version is shorter than the aws version.
>
>
> On Tue, May 6, 2014 at 10:11 AM, Akhil Das wrote:
>
>> Hi Matei,
>>
>> Will clean up the code a little bit and send the pull request :)
>>
>> Thanks
>> Best Regards
>>
>>
>> On Tue, May 6, 2014 at 1:00 AM, François Le lay  wrote:
>>
>>> Has anyone considered using jclouds tooling to support multiple cloud
>>> providers? Maybe using Pallet?
>>>
>>> François
>>>
>>> On May 5, 2014, at 3:22 PM, Nicholas Chammas 
>>> wrote:
>>>
>>> I second this motion. :)
>>>
>>> A unified "cloud deployment" tool would be absolutely great.
>>>
>>>
>>> On Mon, May 5, 2014 at 1:34 PM, Matei Zaharia 
>>> wrote:
>>>
 Very cool! Have you thought about sending this as a pull request? We’d
 be happy to maintain it inside Spark, though it might be interesting to
 find a single Python package that can manage clusters across both EC2 and
 GCE.

 Matei

 On May 5, 2014, at 7:18 AM, Akhil Das 
 wrote:

 Hi Sparkers,

 We have created a quick spark_gce script which can launch a spark
 cluster in the Google Cloud. I'm sharing it because it might be helpful for
 someone using the Google Cloud for deployment rather than AWS.

 Here's the link to the script

 https://github.com/sigmoidanalytics/spark_gce

 Feel free to use it and suggest any feedback around it.

 In short here's what it does:

 Just like the spark_ec2 script, this one also reads certain
 command-line arguments (see the GitHub page for more details), such as
 the cluster name. It then starts the machines in the Google Cloud, sets
 up the network, adds a 500GB empty disk to all machines, generates the
 SSH keys on the master and transfers them to all slaves, installs Java,
 and downloads and configures Spark/Shark/Hadoop. It also starts the
 Shark server automatically. Currently the version is 0.9.1 but
 I'm happy to add/support more versions if anyone is interested.


 Cheers.


 Thanks
 Best Regards



>>>
>>
>


cant get tests to pass anymore on master master

2014-05-16 Thread Koert Kuipers
i used to be able to get all tests to pass.

with java 6 and sbt i get PermGen errors (no matter how high i make the
PermGen). so i have given up on that.

with java 7 i see 1 error in a bagel test and a few in streaming tests. any
ideas? see the error in BagelSuite below.

[info] - large number of iterations *** FAILED *** (10 seconds, 105
milliseconds)
[info]   The code passed to failAfter did not complete within 10 seconds.
(BagelSuite.scala:85)
[info]   org.scalatest.exceptions.TestFailedDueToTimeoutException:
[info]   at
org.scalatest.concurrent.Timeouts$$anonfun$failAfter$1.apply(Timeouts.scala:250)
[info]   at
org.scalatest.concurrent.Timeouts$$anonfun$failAfter$1.apply(Timeouts.scala:250)
[info]   at
org.scalatest.concurrent.Timeouts$class.timeoutAfter(Timeouts.scala:282)
[info]   at
org.scalatest.concurrent.Timeouts$class.failAfter(Timeouts.scala:246)
[info]   at org.apache.spark.bagel.BagelSuite.failAfter(BagelSuite.scala:32)
[info]   at
org.apache.spark.bagel.BagelSuite$$anonfun$3.apply$mcV$sp(BagelSuite.scala:85)
[info]   at
org.apache.spark.bagel.BagelSuite$$anonfun$3.apply(BagelSuite.scala:85)
[info]   at
org.apache.spark.bagel.BagelSuite$$anonfun$3.apply(BagelSuite.scala:85)
[info]   at org.scalatest.FunSuite$$anon$1.apply(FunSuite.scala:1265)
[info]   at org.scalatest.Suite$class.withFixture(Suite.scala:1974)
[info]   at
org.apache.spark.bagel.BagelSuite.withFixture(BagelSuite.scala:32)


Re: Spark workers keep getting disconnected(Keep dying) from the cluster.

2014-05-16 Thread akeed
Got the same experience over here. 0.9.1 (not from github, from official
download page), running hadoop 2.2.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-workers-keep-getting-disconnected-Keep-dying-from-the-cluster-tp5740p5747.html


Re: Error starting EC2 cluster

2014-05-16 Thread Aliaksei Litouka
Well... the reason was an out-of-date version of Python (2.6.6) on the
machine where I ran the script. If anyone else experiences this issue -
just update your Python.
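
For context: subprocess.check_output was added in Python 2.7, which is why the call fails on 2.6.6. If upgrading is not an option, a fallback along the following lines might work. This is only a sketch; check_output_compat is a hypothetical helper name and is not part of spark_ec2.py.

```python
import subprocess


def check_output_compat(cmd):
    """Return the stdout of cmd, raising CalledProcessError on a non-zero exit.

    Hypothetical helper: uses subprocess.check_output when it exists
    (Python 2.7 and later) and falls back to Popen otherwise.
    """
    if hasattr(subprocess, "check_output"):
        return subprocess.check_output(cmd)
    # Fallback path for Python 2.6, which lacks subprocess.check_output.
    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE)
    out, _ = proc.communicate()
    if proc.returncode != 0:
        raise subprocess.CalledProcessError(proc.returncode, cmd)
    return out
```

A call such as check_output_compat(['tar', 'c', '.ssh']) would then behave the same way on both interpreter versions.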


On Sun, May 4, 2014 at 7:51 PM, Aliaksei Litouka  wrote:

> I am using Spark 0.9.1. When I'm trying to start a EC2 cluster with the
> spark-ec2 script, an error occurs and the following message is issued:
> AttributeError: 'module' object has no attribute 'check_output'. By this
> time, EC2 instances are up and running but Spark doesn't seem to be
> installed on them. Any ideas how to fix it?
>
> $ ./spark-ec2 -k my_key -i /home/alitouka/my_key.pem -s 1
> --region=us-east-1 --instance-type=m3.medium launch test_cluster
> Setting up security groups...
> Searching for existing cluster test_cluster...
> Don't recognize m3.medium, assuming type is pvm
> Spark AMI: ami-5bb18832
> Launching instances...
> Launched 1 slaves in us-east-1c, regid = r-
> Launched master in us-east-1c, regid = r-
> Waiting for instances to start up...
> Waiting 120 more seconds...
> Generating cluster's SSH key on master...
> ssh: connect to host ec2-XX-XXX-XXX-XX.compute-1.amazonaws.com port 22:
> Connection refused
> Error executing remote command, retrying after 30 seconds: Command
> '['ssh', '-o', 'StrictHostKeyChecking=no', '-i',
> '/home/alitouka/my_key.pem', '-t', '-t',
> u'r...@ec2-xx-xxx-xxx-xx.compute-1.amazonaws.com', "\n  [ -f
> ~/.ssh/id_rsa ] ||\n(ssh-keygen -q -t rsa -N '' -f ~/.ssh/id_rsa
> &&\n cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys)\n"]'
> returned non-zero exit status 255
> ssh: connect to host ec2-XX-XXX-XXX-XX.compute-1.amazonaws.com port 22:
> Connection refused
> Error executing remote command, retrying after 30 seconds: Command
> '['ssh', '-o', 'StrictHostKeyChecking=no', '-i',
> '/home/alitouka/my_key.pem', '-t', '-t',
> u'r...@ec2-xx-xxx-xxx-xx.compute-1.amazonaws.com', "\n  [ -f
> ~/.ssh/id_rsa ] ||\n(ssh-keygen -q -t rsa -N '' -f ~/.ssh/id_rsa
> &&\n cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys)\n"]'
> returned non-zero exit status 255
> ssh: connect to host ec2-XX-XXX-XXX-XX.compute-1.amazonaws.com port 22:
> Connection refused
> Error executing remote command, retrying after 30 seconds: Command
> '['ssh', '-o', 'StrictHostKeyChecking=no', '-i',
> '/home/alitouka/my_key.pem', '-t', '-t',
> u'r...@ec2-xx-xxx-xxx-xx.compute-1.amazonaws.com', "\n  [ -f
> ~/.ssh/id_rsa ] ||\n(ssh-keygen -q -t rsa -N '' -f ~/.ssh/id_rsa
> &&\n cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys)\n"]'
> returned non-zero exit status 255
> Warning: Permanently added 
> 'ec2-XX-XXX-XXX-XX.compute-1.amazonaws.com,54.227.205.82'
> (RSA) to the list of known hosts.
> Connection to ec2-XX-XXX-XXX-XX.compute-1.amazonaws.com closed.
> Traceback (most recent call last):
>   File "./spark_ec2.py", line 806, in 
> main()
>   File "./spark_ec2.py", line 799, in main
> real_main()
>   File "./spark_ec2.py", line 684, in real_main
> setup_cluster(conn, master_nodes, slave_nodes, opts, True)
>   File "./spark_ec2.py", line 419, in setup_cluster
> dot_ssh_tar = ssh_read(master, opts, ['tar', 'c', '.ssh'])
>   File "./spark_ec2.py", line 624, in ssh_read
> return subprocess.check_output(
> AttributeError: 'module' object has no attribute 'check_output'
>


Re: is Mesos falling out of favor?

2014-05-16 Thread Gerard Maas
By looking at your config, I think there's something wrong with your setup.
One of the key elements of Mesos is that you are abstracted from where the
execution of your task takes place. The SPARK_EXECUTOR_URI tells Mesos
where to find the 'framework' (in Mesos jargon) required to execute a job.
 (Actually, it tells the spark driver  to tell mesos where to download the
framework)
Your config looks like you are running some mix of Spark Cluster with
Mesos.

This is an example of a Spark job to run on Mesos:

Driver:

ADD_JARS=/.../job-jar-with-dependencies.jar SPARK_LOCAL_IP= java -cp
/.../spark-assembly.jar:/.../job-jar-with-dependencies.jar
-Dconfig.file=job-config.conf com.example.jobs.SparkJob

Config: job-config.conf contains this info on Mesos (note that the Mesos URI is
constructed from this config):
# 
# Mesos configuration
# 
mesos {
zookeeper = {zookeeper.ip}
executorUri  =
"hdfs://"${hdfs.nameNode.host}":"${hdfs.nameNode.port}"/spark/spark-0.9.0.1-bin.tar.gz"
master   {
host = {mesos-ip}
port = 5050
}
}

Probably this can still be improved as it's the result of some
trial-error-repeat, but it's working for us.

-greetz, Gerard



On Wed, May 7, 2014 at 7:43 PM, deric  wrote:

> I'm running 1.0.0 branch, finally I've managed to make it work. I'm using a
> Debian package which is distributed on all slave nodes. So, I've removed
> `SPARK_EXECUTOR_URI` and it works,  spark-env.sh looks like this:
>
> export MESOS_NATIVE_LIBRARY="/usr/local/lib/libmesos.so"
> export SCALA_HOME="/usr"
> export SCALA_LIBRARY_PATH="/usr/share/java"
> export MASTER="mesos://zk://192.168.1.1:2181/mesos"
> export SPARK_HOME="/usr/share/spark"
> export SPARK_LOCAL_IP="192.168.1.2"
> export SPARK_PRINT_LAUNCH_COMMAND="1"
> export CLASSPATH=$CLASSPATH:$SPARK_HOME/lib/
>
> scripts for Debian package are here (I'll try to add some documentation):
> https://github.com/deric/spark-deb-packaging
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/is-Mesos-falling-out-of-favor-tp5444p5484.html
>


unsubscribe

2014-05-16 Thread eric perler

  

Re: Packaging a spark job using maven

2014-05-16 Thread Eugen Cepoi
Laurent, the problem is that the reference.conf that is embedded in the akka
jars is being overridden by some other conf. This happens when multiple
files have the same name.
I am using Spark with maven. In order to build the fat jar I use the shade
plugin and it works pretty well. The trick here is to use an
AppendingTransformer that will merge all the reference.conf files into a
single one.
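
To confirm that several jars really ship a file with the same name, a small check like the one below may help before changing the build. It is only a sketch: jars_containing is a hypothetical helper, and the jar paths passed to it are whatever ends up on your classpath.

```python
import zipfile


def jars_containing(jar_paths, resource="reference.conf"):
    """Return the jars (in input order) that ship a top-level entry named resource."""
    hits = []
    for path in jar_paths:
        # A jar is a zip archive, so the standard zipfile module can list it.
        with zipfile.ZipFile(path) as jar:
            if resource in jar.namelist():
                hits.append(path)
    return hits
```

If more than one jar is returned, a fat jar built without a merging step keeps only one of those files, which matches the symptom described above.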

Try something like that:


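<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <version>2.1</version>
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
      <configuration>
        <!-- two options were set to false here; their element names were lost in the archive -->
        <artifactSet>
          <includes>
            <include>my.package.etc:*</include>
          </includes>
        </artifactSet>
        <filters>
          <filter>
            <artifact>*:*</artifact>
            <excludes>
              <exclude>META-INF/*.SF</exclude>
              <exclude>META-INF/*.DSA</exclude>
              <exclude>META-INF/*.RSA</exclude>
            </excludes>
          </filter>
        </filters>
        <transformers>
          <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
            <resource>reference.conf</resource>
          </transformer>
        </transformers>
      </configuration>
    </execution>
  </executions>
</plugin>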

2014-05-14 15:37 GMT+02:00 Laurent T :

> Hi,
>
> Thanks François but this didn't change much. I'm not even sure what this
> reference.conf is. It isn't mentioned in any of spark documentation. Should
> i have one in my resources ?
>
> Thanks
> Laurent
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Packaging-a-spark-job-using-maven-tp5615p5707.html
>


Spark workers keep getting disconnected(Keep dying) from the cluster.

2014-05-16 Thread Ravi Hemnani
Hey,

I am facing a weird issue. 

My spark workers keep dying every now and then and in the master logs i keep
on seeing following messages,

 14/05/14 10:09:24 WARN Master: Removing worker-20140514080546-x.x.x.x-50737
because we got no heartbeat in 60 seconds
14/05/14 14:18:41 WARN Master: Removing worker-20140514123848-x.x.x.x-50901
because we got no heartbeat in 60 seconds

In my cluster, I have one master node and four worker nodes. 

On the cluster i am trying to run shark and related queries. 

I tried setting the property, spark.worker.timeout=300 on all workers and
master but still it shows, 60 seconds timeout. 


After that, i keep seeing the following messages as well,

14/05/14 16:59:52 INFO Master: Removing app app-20140514164003-0009

On the worker nodes, in the work folder, i cant seem to find any suspicious
messages. 

Any help as to what is causing all this. 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-workers-keep-getting-disconnected-Keep-dying-from-the-cluster-tp5740.html


Re: Spark LIBLINEAR

2014-05-16 Thread DB Tsai
Hi Deb,

My co-worker fixed an OWL-QN bug in breeze, and it's important to have this
fix in order to converge to the correct result.

https://github.com/scalanlp/breeze/pull/247

You may want to use the snapshot of breeze to have this fix in.


Sincerely,

DB Tsai
---
My Blog: https://www.dbtsai.com
LinkedIn: https://www.linkedin.com/in/dbtsai


On Wed, May 14, 2014 at 7:32 AM, Debasish Das wrote:

> Hi Professor Lin,
>
> On our internal datasets, I am getting accuracy on par with glmnet-R for
> sparse feature selection from liblinear. The default mllib based gradient
> descent was way off. I did not tune the learning rate but I ran with varying
> lambda. The feature selection was weak.
>
> I used liblinear code. Next I will explore the distributed liblinear.
>
> Adding the code on github will definitely help for collaboration.
>
> I am experimenting to see whether a bfgs / owlqn based sparse logistic in
> spark mllib gives us accuracy on par with liblinear.
>
> If liblinear solver outperforms them (either accuracy/performance) we have
> to bring tron to mllib and let other algorithms benefit from it as well.
>
> We are using Bfgs and Owlqn solvers from breeze opt.
>
> Thanks.
> Deb
>  On May 12, 2014 9:07 PM, "DB Tsai"  wrote:
>
>> It seems that the code isn't managed in github. Can be downloaded from
>> http://www.csie.ntu.edu.tw/~cjlin/libsvmtools/distributed-liblinear/spark/spark-liblinear-1.94.zip
>>
>> It will be easier to track the changes in github.
>>
>>
>>
>> Sincerely,
>>
>> DB Tsai
>> ---
>> My Blog: https://www.dbtsai.com
>> LinkedIn: https://www.linkedin.com/in/dbtsai
>>
>>
>> On Mon, May 12, 2014 at 7:53 AM, Xiangrui Meng  wrote:
>>
>>> Hi Chieh-Yen,
>>>
>>> Great to see the Spark implementation of LIBLINEAR! We will definitely
>>> consider adding a wrapper in MLlib to support it. Is the source code
>>> on github?
>>>
>>> Deb, Spark LIBLINEAR uses BSD license, which is compatible with Apache.
>>>
>>> Best,
>>> Xiangrui
>>>
>>> On Sun, May 11, 2014 at 10:29 AM, Debasish Das 
>>> wrote:
>>> > Hello Prof. Lin,
>>> >
>>> > Awesome news ! I am curious if you have any benchmarks comparing C++
>>> MPI
>>> > with Scala Spark liblinear implementations...
>>> >
>>> > Is Spark Liblinear apache licensed or there are any specific
>>> restrictions on
>>> > using it ?
>>> >
>>> > Except using native blas libraries (which each user has to manage by
>>> pulling
>>> > in their best proprietary BLAS package), all Spark code is Apache
>>> licensed.
>>> >
>>> > Thanks.
>>> > Deb
>>> >
>>> >
>>> > On Sun, May 11, 2014 at 3:01 AM, DB Tsai  wrote:
>>> >>
>>> >> Dear Prof. Lin,
>>> >>
>>> >> Interesting! We had an implementation of L-BFGS in Spark and already
>>> >> merged in the upstream now.
>>> >>
>>> >> We read your paper comparing TRON and OWL-QN for logistic regression
>>> with
>>> >> L1 (http://www.csie.ntu.edu.tw/~cjlin/papers/l1.pdf), but it seems
>>> that it's
>>> >> not in the distributed setup.
>>> >>
>>> >> Will be very interesting to know the L2 logistic regression benchmark
>>> >> result in Spark with your TRON optimizer and the L-BFGS optimizer
>>> against
>>> >> different datasets (sparse, dense, and wide, etc).
>>> >>
>>> >> I'll try your TRON out soon.
>>> >>
>>> >>
>>> >> Sincerely,
>>> >>
>>> >> DB Tsai
>>> >> ---
>>> >> My Blog: https://www.dbtsai.com
>>> >> LinkedIn: https://www.linkedin.com/in/dbtsai
>>> >>
>>> >>
>>> >> On Sun, May 11, 2014 at 1:49 AM, Chieh-Yen >> >
>>> >> wrote:
>>> >>>
>>> >>> Dear all,
>>> >>>
>>> >>> Recently we released a distributed extension of LIBLINEAR at
>>> >>>
>>> >>> http://www.csie.ntu.edu.tw/~cjlin/libsvmtools/distributed-liblinear/
>>> >>>
>>> >>> Currently, TRON for logistic regression and L2-loss SVM is supported.
>>> >>> We provided both MPI and Spark implementations.
>>> >>> This is very preliminary so your comments are very welcome.
>>> >>>
>>> >>> Thanks,
>>> >>> Chieh-Yen
>>> >>
>>> >>
>>> >
>>>
>>
>>


Re: Stable Hadoop version supported ?

2014-05-16 Thread Sandy Ryza
Hi Soumya,

If you want to stick with CDH, CDH5.0 is the latest stable release.  If you
want to use an Apache release, I'd go with Hadoop 2.3.

-Sandy


On Wed, May 14, 2014 at 12:17 PM, Soumya Simanta
wrote:

> Currently I've HDFS with version hadoop0.20.2-cdh3u6 on Spark 0.9.1. I
> want to upgrade to Spark 1.0.0 soon and would also like to upgrade my HDFS
> version as well.
>
> What's the recommended version of HDFS to use with Spark 1.0.0? I don't
> know much about YARN but I would just like to use the Spark standalone
> cluster mode.
>
> Thanks
> -Soumya
>
>


Re: Local Dev Env with Mesos + Spark Streaming on Docker: Can't submit jobs.

2014-05-16 Thread joe schmoo
I
On May 5, 2014 1:12 PM, "Gerard Maas"  wrote:
>
> Hi all,
>
> I'm currently working on creating a set of docker images to facilitate
local development with Spark/streaming on Mesos (+zk, hdfs, kafka)
>
> After solving the initial hurdles to get things working together in
docker containers, now everything seems to start-up correctly and the mesos
UI shows slaves as they are started.
>
> I'm trying to submit a job from IntelliJ and the jobs submissions seem to
get lost in Mesos translation. The logs are not helping me to figure out
what's wrong, so I'm posting them here in the hope that they can ring a
bell and somebody could provide me a hint on what's wrong/missing with my
setup.
>
>
>  DRIVER (IntelliJ running a Job.scala main) 
> 14/05/05 21:52:31 INFO MetadataCleaner: Ran metadata cleaner for
SHUFFLE_BLOCK_MANAGER
> 14/05/05 21:52:31 INFO BlockManager: Dropping broadcast blocks older than
1399319251962
> 14/05/05 21:52:31 INFO BlockManager: Dropping non broadcast blocks older
than 1399319251962
> 14/05/05 21:52:31 INFO MetadataCleaner: Ran metadata cleaner for
BROADCAST_VARS
> 14/05/05 21:52:31 INFO MetadataCleaner: Ran metadata cleaner for
BLOCK_MANAGER
> 14/05/05 21:52:32 INFO MetadataCleaner: Ran metadata cleaner for
HTTP_BROADCAST
> 14/05/05 21:52:32 INFO MetadataCleaner: Ran metadata cleaner for
MAP_OUTPUT_TRACKER
> 14/05/05 21:52:32 INFO MetadataCleaner: Ran metadata cleaner for
SPARK_CONTEXT
>
>
>  MESOS MASTER 
> I0505 19:52:39.718080   388 master.cpp:690] Registering framework
201405051517-67113388-5050-383-6995 at scheduler(1)@127.0.1.1:58115
> I0505 19:52:39.718261   388 master.cpp:493] Framework
201405051517-67113388-5050-383-6995 disconnected
> I0505 19:52:39.718277   389 hierarchical_allocator_process.hpp:332] Added
framework 201405051517-67113388-5050-383-6995
> I0505 19:52:39.718312   388 master.cpp:520] Giving framework
201405051517-67113388-5050-383-6995 0ns to failover
> I0505 19:52:39.718431   389 hierarchical_allocator_process.hpp:408]
Deactivated framework 201405051517-67113388-5050-383-6995
> W0505 19:52:39.718459   388 master.cpp:1388] Master returning resources
offered to framework 201405051517-67113388-5050-383-6995 because the
framework has terminated or is inactive
> I0505 19:52:39.718567   388 master.cpp:1376] Framework failover timeout,
removing framework 201405051517-67113388-5050-383-6995
>
>
>
>  MESOS SLAVE 
> I0505 19:49:27.662019    20 slave.cpp:1191] Asked to shut down framework
201405051517-67113388-5050-383-6803 by master@172.17.0.4:5050
> W0505 19:49:27.662072    20 slave.cpp:1206] Cannot shut down unknown
framework 201405051517-67113388-5050-383-6803
> I0505 19:49:28.662153    18 slave.cpp:1191] Asked to shut down framework
201405051517-67113388-5050-383-6804 by master@172.17.0.4:5050
> W0505 19:49:28.662212    18 slave.cpp:1206] Cannot shut down unknown
framework 201405051517-67113388-5050-383-6804
> I0505 19:49:29.662199    13 slave.cpp:1191] Asked to shut down framework
201405051517-67113388-5050-383-6805 by master@172.17.0.4:5050
> W0505 19:49:29.662256    13 slave.cpp:1206] Cannot shut down unknown
framework 201405051517-67113388-5050-383-6805
> I0505 19:49:30.662443    16 slave.cpp:1191] Asked to shut down framework
201405051517-67113388-5050-383-6806 by master@172.17.0.4:5050
> W0505 19:49:30.662489    16 slave.cpp:1206] Cannot shut down unknown
framework 201405051517-67113388-5050-383-6806
>
>
> Thanks in advance,
>
> Gerard.


spark 0.9.1 textFile hdfs unknown host exception

2014-05-16 Thread Eugen Cepoi
Hi,

I have some strange behaviour when using textFile to read some data from
HDFS in spark 0.9.1.
I get UnknownHost exceptions,  where hadoop client tries to resolve the
dfs.nameservices and fails.

So far:
 - this has been tested inside the shell
 - the exact same code works with spark-0.8.1
 - the shell is launched with HADOOP_CONF_DIR pointing to our HA conf
 - if some other rdd is first created from HDFS successfully, then this
also works (might be related to the way the default hadoop
configuration is being shared?)
 - if using the new MR API it works
   sc.newAPIHadoopFile(path, classOf[TextInputFormat],
classOf[LongWritable], classOf[Text],
sc.hadoopConfiguration).map(_._2.toString)

Hadoop distribution: 2.0.0-cdh4.1.2
Spark 0.9.1 - packaged with correct version of hadoop

Eugen


Efficient implementation of getting top 10 hashtags in last 5 mins window

2014-05-16 Thread nilmish
I wanted to know how can we efficiently get top 10 hashtags in last 5 mins
window. Currently I am using reduceByKeyAndWindow over 5 mins window and
then sorting to get top 10 hashtags. But it is taking a lot of time. How can
we do it efficiently ?
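
One common way to avoid the full sort is to select only the top k entries with a bounded heap, which costs roughly O(n log k) instead of O(n log n). The sketch below shows the idea in plain Python; window_counts is made-up data standing in for the per-window output of reduceByKeyAndWindow, and top_k is a hypothetical helper name.

```python
import heapq
from collections import Counter


def top_k(counts, k=10):
    """Return the k (hashtag, count) pairs with the largest counts, highest first."""
    return heapq.nlargest(k, counts.items(), key=lambda kv: kv[1])


# Made-up counts for one 5-minute window (illustration only).
window_counts = Counter(["#spark", "#hadoop", "#spark", "#mesos", "#spark", "#hadoop"])
print(top_k(window_counts, k=2))  # [('#spark', 3), ('#hadoop', 2)]
```

On the Spark side, the RDD method top(num) does a similar partial selection, so calling it on each windowed RDD is probably worth trying before sorting everything.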



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Efficient-implementation-of-getting-top-10-hashtags-in-last-5-mins-window-tp5741.html


Re: same log4j slf4j error in spark 9.1

2014-05-16 Thread Tathagata Das
Spark 0.9.1 does not depend on log4j-over-slf4j (see the SBT build file for
0.9.1). Are you sure that no other dependency in your project is bringing it
onto the classpath? Alternatively, if you don't want slf4j-log4j12 from Spark,
you can safely exclude it in your dependencies.

TD


On Thu, May 8, 2014 at 12:56 PM, Adrian Mocanu wrote:

>  I recall someone from the Spark team (TD?) saying that Spark 9.1 will
> change the logger and the circular loop error between slf4j and log4j
> wouldn’t show up.
>
>
>
> Yet on Spark 9.1 I still get
>
> SLF4J: Detected both log4j-over-slf4j.jar AND slf4j-log4j12.jar on the
> class path, preempting StackOverflowError.
>
> SLF4J: See also http://www.slf4j.org/codes.html#log4jDelegationLoop for
> more details.
>
>
>
> Any solutions?
>
>
>
> -Adrian
>
>
>


Re: Spark LIBLINEAR

2014-05-16 Thread Tom Vacek
I've done some comparisons with my own implementation of TRON on Spark.
From a distributed computing perspective, it does 2x more local work per
iteration than LBFGS, so the parallel isoefficiency is improved slightly.
I think the truncated Newton solver holds some potential because there
has been some recent work on preconditioners:
http://dx.doi.org/10.1016/j.amc.2014.03.006




Re: Using String Dataset for Logistic Regression

2014-05-16 Thread DB Tsai
You could also use dummy coding to convert a categorical feature into
numeric features.

http://en.wikipedia.org/wiki/Categorical_variable#Dummy_coding
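
As a rough illustration of dummy coding for the day-of-week case discussed in this thread, here is a minimal sketch. It assumes seven categories with the last one (Sun) as the all-zeros baseline; dummy_code and DAYS are names made up for this example.

```python
def dummy_code(value, categories):
    """Encode value as len(categories) - 1 binary features.

    The last category acts as the baseline and maps to all zeros.
    """
    if value not in categories:
        raise ValueError("unknown category: %r" % (value,))
    return [1.0 if value == c else 0.0 for c in categories[:-1]]


DAYS = ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"]
print(dummy_code("Wed", DAYS))  # [0.0, 0.0, 1.0, 0.0, 0.0, 0.0]
print(dummy_code("Sun", DAYS))  # [0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
```

The six resulting values can be appended to the other numeric features of each example before training.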

Sincerely,

DB Tsai
---
My Blog: https://www.dbtsai.com
LinkedIn: https://www.linkedin.com/in/dbtsai


On Wed, May 14, 2014 at 10:37 PM, Xiangrui Meng  wrote:
> It depends on how you want to use the string features. For the day of
> the week, you can replace it with 6 binary features indicating
> Mon/Tue/Wed/Th/Fri/Sat. -Xiangrui
>
> On Fri, May 9, 2014 at 5:31 AM, praveshjain1991
>  wrote:
>> I have been trying to use LR in Spark's Java API. I used the dataset given
>> along with Spark for the training and testing purposes.
>>
>> Now i want to use it on another dataset that contains string values along
>> with numbers. Is there any way to do this?
>>
>> I am attaching the Dataset that i want to use.
>>
>> Thanks and Regards, Test.data
>> 
>>
>>
>>
>> --
>> View this message in context: 
>> http://apache-spark-user-list.1001560.n3.nabble.com/Using-String-Dataset-for-Logistic-Regression-tp5523.html


Re: spark on yarn-standalone, throws StackOverflowError and fails somtimes and succeed for the rest

2014-05-16 Thread Xiangrui Meng
Could you try `println(result.toDebugString)` right after `val
result = ...` and attach the result? -Xiangrui

On Fri, May 9, 2014 at 8:20 AM, phoenix bai  wrote:
> after a couple of tests, I find that, if I use:
>
> val result = model.predict(prdctpairs)
> result.map(x =>
> x.user+","+x.product+","+x.rating).saveAsTextFile(output)
>
> it always fails with above error and the exception seems iterative.
>
> but if I do:
>
> val result = model.predict(prdctpairs)
> result.cache()
> result.map(x =>
> x.user+","+x.product+","+x.rating).saveAsTextFile(output)
>
> it succeeds.
>
> could anyone help explain why the cache() is necessary?
>
> thanks
>
>
>
> On Fri, May 9, 2014 at 6:45 PM, phoenix bai  wrote:
>>
>> Hi all,
>>
>> My spark code is running on yarn-standalone.
>>
>> the last three lines of the code as below,
>>
>> val result = model.predict(prdctpairs)
>> result.map(x =>
>> x.user+","+x.product+","+x.rating).saveAsTextFile(output)
>> sc.stop()
>>
>> the same code sometimes runs successfully and gives the right result,
>> while from time to time it throws a StackOverflowError and fails.
>>
>> and I don't have a clue how I should debug it.
>>
>> below is the error, (the start and end portion to be exact):
>>
>>
>> 14-05-09 17:55:51 INFO [spark-akka.actor.default-dispatcher-17]
>> MapOutputTrackerMasterActor: Asked to send map output locations for shuffle
>> 44 to sp...@rxx43.mc10.site.net:43885
>> 14-05-09 17:55:51 INFO [spark-akka.actor.default-dispatcher-17]
>> MapOutputTrackerMaster: Size of output statuses for shuffle 44 is 148 bytes
>> 14-05-09 17:55:51 INFO [spark-akka.actor.default-dispatcher-35]
>> MapOutputTrackerMasterActor: Asked to send map output locations for shuffle
>> 45 to sp...@rxx43.mc10.site.net:43885
>> 14-05-09 17:55:51 INFO [spark-akka.actor.default-dispatcher-35]
>> MapOutputTrackerMaster: Size of output statuses for shuffle 45 is 453 bytes
>> 14-05-09 17:55:51 INFO [spark-akka.actor.default-dispatcher-20]
>> MapOutputTrackerMasterActor: Asked to send map output locations for shuffle
>> 44 to sp...@rxx43.mc10.site.net:56767
>> 14-05-09 17:55:51 INFO [spark-akka.actor.default-dispatcher-29]
>> MapOutputTrackerMasterActor: Asked to send map output locations for shuffle
>> 45 to sp...@rxx43.mc10.site.net:56767
>> 14-05-09 17:55:51 INFO [spark-akka.actor.default-dispatcher-29]
>> MapOutputTrackerMasterActor: Asked to send map output locations for shuffle
>> 44 to sp...@rxx43.mc10.site.net:49879
>> 14-05-09 17:55:51 INFO [spark-akka.actor.default-dispatcher-29]
>> MapOutputTrackerMasterActor: Asked to send map output locations for shuffle
>> 45 to sp...@rxx43.mc10.site.net:49879
>> 14-05-09 17:55:51 INFO [spark-akka.actor.default-dispatcher-17]
>> TaskSetManager: Starting task 946.0:17 as TID 146 on executor 6:
>> rx15.mc10.site.net (PROCESS_LOCAL)
>> 14-05-09 17:55:51 INFO [spark-akka.actor.default-dispatcher-17]
>> TaskSetManager: Serialized task 946.0:17 as 6414 bytes in 0 ms
>> 14-05-09 17:55:51 WARN [Result resolver thread-0] TaskSetManager: Lost TID
>> 133 (task 946.0:4)
>> 14-05-09 17:55:51 WARN [Result resolver thread-0] TaskSetManager: Loss was
>> due to java.lang.StackOverflowError
>> java.lang.StackOverflowError
>> at java.lang.ClassLoader.defineClass1(Native Method)
>> at java.lang.ClassLoader.defineClassCond(ClassLoader.java:631)
>> at java.lang.ClassLoader.defineClass(ClassLoader.java:615)
>> at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:141)
>> at java.net.URLClassLoader.defineClass(URLClassLoader.java:283)
>> at java.net.URLClassLoader.access$000(URLClassLoader.java:58)
>> at java.net.URLClassLoader$1.run(URLClassLoader.java:197)
>> at java.security.AccessController.doPrivileged(Native Method)
>> at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:306)
>> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:247)
>> at java.lang.ClassLoader.defineClass1(Native Method)
>> at java.lang.ClassLoader.defineClassCond(ClassLoader.java:631)
>> at java.lang.ClassLoader.defineClass(ClassLoader.java:615)
>>
>> 
>>
>> at sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
>> at java.lang.reflect.Method.invoke(Method.java:597)
>> at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:969)
>> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1848)
>> at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1752)
>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328)
>> at
>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1946)
>> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1870)
>> at
>> java.io.ObjectInputStream.re

count()-ing gz files gives java.io.IOException: incorrect header check

2014-05-16 Thread Nick Chammas
I’m trying to do a simple count() on a large number of GZipped files in S3.
My job is failing with the following message:

14/05/15 19:12:37 WARN scheduler.TaskSetManager: Loss was due to
java.io.IOException
java.io.IOException: incorrect header check
at 
org.apache.hadoop.io.compress.zlib.ZlibDecompressor.inflateBytesDirect(Native
Method)
at 
org.apache.hadoop.io.compress.zlib.ZlibDecompressor.decompress(ZlibDecompressor.java:221)
at 
org.apache.hadoop.io.compress.DecompressorStream.decompress(DecompressorStream.java:82)
at 
org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:76)
at java.io.InputStream.read(InputStream.java:101)



I traced this down to
HADOOP-5281,
but I’m not sure if 1) it’s the same issue, or 2) how to go about resolving
it.

I gather I need to update some Hadoop jar? Any tips on where to look/what
to do?

I’m running Spark on an EC2 cluster created by spark-ec2 with no special
options used.

Nick




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/count-ing-gz-files-gives-java-io-IOException-incorrect-header-check-tp5768.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: Equivalent of collect() on DStream

2014-05-16 Thread Sean Owen
Are you not just looking for the foreachRDD() method on DStream?
http://spark.apache.org/docs/0.9.1/streaming-programming-guide.html#output-operations

It gives you an RDD that you can do what you want with, including collect() it.

On Thu, May 15, 2014 at 5:33 AM, Stephen Boesch  wrote:
> Looking further it appears the functionality I am seeking is in the
> following private[spark] class ForEachDStream
>
> (version 0.8.1 , yes we are presently using an older release..)
>
> private[streaming]
> class ForEachDStream[T: ClassManifest] (
> parent: DStream[T],
> foreachFunc: (RDD[T], Time) => Unit
>   ) extends DStream[Unit](parent.ssc) {
>
> I would like to have access to this structure - particularly the ability to
> define an "foreachFunc" that gets applied to each RDD within the DStream.
> Is there a means to do so?
>
>
>
> 2014-05-14 21:25 GMT-07:00 Stephen Boesch :
>>
>>
>> Given that collect() does not exist on DStream apparently my mental model
>> of Streaming RDD (DStream) needs correction/refinement.  So what is the
>> means to convert DStream data into a JVM in-memory representation.  All of
>> the methods on DStream i.e. filter, map, transform, reduce, etc generate
>> other DStream's, and not an in memory data structure.
>>
>>
>>
>


Re: Stable Hadoop version supported ?

2014-05-16 Thread Sean Owen
Although you need to compile it differently for different versions of
HDFS / Hadoop, as far as I know Spark continues to work with Hadoop
1.x (and probably older 0.20.x as a result -- your experience is an
existence proof.) And it works with the newest Hadoop 2.4.x, again
with the appropriate build settings.

I think the default answer is to upgrade all the way to the newest
Hadoop / HDFS unless you have a reason you can't.

On Wed, May 14, 2014 at 8:17 PM, Soumya Simanta
 wrote:
> Currently I've HDFS with version hadoop0.20.2-cdh3u6 on Spark 0.9.1. I want
> to upgrade to Spark 1.0.0 soon and would also like to upgrade my HDFS
> version as well.
>
> What's the recommended version of HDFS to use with Spark 1.0.0? I don't know
> much about YARN but I would just like to use the Spark standalone cluster
> mode.
>
> Thanks
> -Soumya
>


Calling external classes added by sc.addJar needs to be through reflection

2014-05-16 Thread DB Tsai
Finally found a way out of the ClassLoader maze! It took me some time to
understand how it works; I think it is worth documenting in a separate
thread.

We're trying to add an external utility.jar, which contains CSVRecordParser,
and we added the jar to the executors through the sc.addJar API.

If the instance of CSVRecordParser is created without reflection, it
raises a *ClassNotFoundException*.

data.mapPartitions(lines => {
  // Direct instantiation: the class is resolved without reflection
  val csvParser = new CSVRecordParser(delimiter.charAt(0))
  lines.foreach(line => {
    val lineElems = csvParser.parseLine(line)
  })
  ...
  ...
})


If the instance of CSVRecordParser is created through reflection, it works.

data.mapPartitions(lines => {
  // Look the class up through the task thread's context class loader
  val loader = Thread.currentThread.getContextClassLoader
  val CSVRecordParser =
    loader.loadClass("com.alpine.hadoop.ext.CSVRecordParser")

  // Construct the parser reflectively (constructor takes a primitive char)
  val csvParser = CSVRecordParser.getConstructor(Character.TYPE)
    .newInstance(delimiter.charAt(0).asInstanceOf[Character])

  val parseLine = CSVRecordParser
    .getDeclaredMethod("parseLine", classOf[String])

  lines.foreach(line => {
    val lineElems = parseLine.invoke(csvParser,
      line).asInstanceOf[Array[String]]
  })
  ...
  ...
})


This is identical to this question,
http://stackoverflow.com/questions/7452411/thread-currentthread-setcontextclassloader-without-using-reflection
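
For anyone who wants to try the reflection pattern without Spark or the
external jar, here is a minimal sketch that does the same three steps (load
by name through the context class loader, construct, invoke) against a JDK
class. `java.lang.StringBuilder` is only a stand-in for CSVRecordParser, and
the object name `ReflectiveLoad` is made up for the example.

```scala
object ReflectiveLoad {
  // Loads a class by name through the thread's context class loader,
  // builds an instance reflectively and invokes one of its methods.
  def reverseViaReflection(text: String): String = {
    val loader = Thread.currentThread.getContextClassLoader
    val clazz = loader.loadClass("java.lang.StringBuilder")

    // The cast to AnyRef keeps Method.invoke happy with the existential type.
    val instance = clazz.getConstructor(classOf[String])
      .newInstance(text).asInstanceOf[AnyRef]

    val reverse = clazz.getMethod("reverse")
    reverse.invoke(instance).toString
  }
}
```

The point is only that nothing in the calling code refers to the class at
compile time, so resolution happens through whichever loader the thread
carries when the task runs.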

It's not intuitive for users to load external classes through reflection,
but the couple of available alternatives are very tricky: 1) messing around
with the system class loader by calling its addURL method through
reflection, or 2) forking another JVM to add the jars to the classpath
before the bootstrap loader.

Any thought on fixing it properly?

@Xiangrui,
netlib-java jniloader is loaded from netlib-java through reflection, so
this problem will not be seen.

Sincerely,

DB Tsai
---
My Blog: https://www.dbtsai.com
LinkedIn: https://www.linkedin.com/in/dbtsai


Re: Schema view of HadoopRDD

2014-05-16 Thread Flavio Pompermaier
Is there any Spark plugin/add-on that facilitates querying JSON
content?

Best,
Flavio

On Thu, May 15, 2014 at 6:53 PM, Michael Armbrust wrote:

> Here is a link with more info:
> http://people.apache.org/~pwendell/catalyst-docs/sql-programming-guide.html
>
>
> On Wed, May 7, 2014 at 10:09 PM, Debasish Das wrote:
>
>> Hi,
>>
>> For each line that we read as textLine from HDFS, we have a schema..if
>> there is an API that takes the schema as List[Symbol] and maps each token
>> to the Symbol it will be helpful...
>>
>> Does RDDs provide a schema view of the dataset on HDFS ?
>>
>> Thanks.
>> Deb
>>
>


Re: Equivalent of collect() on DStream

2014-05-16 Thread Tathagata Das
Doesn't DStream.foreach() suffice?

anyDStream.foreach { rdd =>
   // do something with rdd
}



On Wed, May 14, 2014 at 9:33 PM, Stephen Boesch  wrote:

> Looking further it appears the functionality I am seeking is in the
> following *private[spark]* class ForEachDStream
>
> (version 0.8.1 , yes we are presently using an older release..)
>
> private[streaming]
> class ForEachDStream[T: ClassManifest] (
> parent: DStream[T],
> *foreachFunc: (RDD[T], Time) => Unit*
>   ) extends DStream[Unit](parent.ssc) {
>
> I would like to have access to this structure - particularly the ability
> to define an "foreachFunc" that gets applied to each RDD within the
> DStream.  Is there a means to do so?
>
>
>
> 2014-05-14 21:25 GMT-07:00 Stephen Boesch :
>
>>
>> Given that collect() does not exist on DStream apparently my mental model
>> of Streaming RDD (DStream) needs correction/refinement.  So what is the
>> means to convert DStream data into a JVM in-memory representation.  All of
>> the methods on DStream i.e. filter, map, transform, reduce, etc generate
>> other DStream's, and not an in memory data structure.
>>
>>
>>
>>
>


unsubscribe

2014-05-16 Thread eric perler
unsubscribe   

Re: How to run the SVM and LogisticRegression

2014-05-16 Thread Xiangrui Meng
If you check out the master branch, there are some examples that can
be used as templates under

examples/src/main/scala/org/apache/spark/examples/mllib

Best,
Xiangrui

On Wed, May 14, 2014 at 1:36 PM, yxzhao  wrote:
>
> Hello,
> I found the classification algorithms SVM and LogisticRegression implemented
> in the following directory. How do I run them? What should the command line
> be? Thanks.
> spark-0.9.0-incubating/mllib/src/main/scala/org/apache/spark/mllib/classification
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-run-the-SVM-and-LogisticRegression-tp5720.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Turn BLAS on MacOSX

2014-05-16 Thread Xiangrui Meng
Are you running Spark or just Breeze? First try breeze-natives locally
with the reference blas library and see whether it works or not. Also,
do not enable multi-threading when you compile OpenBLAS
(USE_THREADS=0). -Xiangrui

On Tue, May 13, 2014 at 2:17 AM, wxhsdp  wrote:
> Hi, Xiangrui
>
>   i compile openblas on ec2 m1.large, when breeze calls the native lib,
> error occurs:
>
> INFO: successfully loaded
> /mnt2/wxhsdp/libopenblas/lib/libopenblas_nehalemp-r0.2.9.rc2.so
> [error] (run-main-0) java.lang.UnsatisfiedLinkError:
> com.github.fommil.netlib.NativeSystemBLAS.dgemm_offsets(Ljava/lang/String;Ljava/lang/String;IIID[DII[DIID[DII)V
> java.lang.UnsatisfiedLinkError:
> com.github.fommil.netlib.NativeSystemBLAS.dgemm_offsets(Ljava/lang/String;Ljava/lang/String;IIID[DII[DIID[DII)V
> at com.github.fommil.netlib.NativeSystemBLAS.dgemm_offsets(Native 
> Method)
> at
> com.github.fommil.netlib.NativeSystemBLAS.dgemm(NativeSystemBLAS.java:100)
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Re-Turn-BLAS-on-MacOSX-tpp5648.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.


Standalone client failing with docker deployed cluster

2014-05-16 Thread Bharath Ravi Kumar
Hi,

I'm running the spark server with a single worker on a laptop using the
docker images. The spark shell examples run fine with this setup. However,
when a standalone java client tries to run wordcount on a local file (1 MB
in size), the execution fails with the following error on the stdout of the
worker:

14/05/15 10:31:21 INFO Slf4jLogger: Slf4jLogger started
14/05/15 10:31:21 INFO Remoting: Starting remoting
14/05/15 10:31:22 INFO Remoting: Remoting started; listening on addresses
:[akka.tcp://sparkExecutor@worker1:55924]
14/05/15 10:31:22 INFO Remoting: Remoting now listens on addresses:
[akka.tcp://sparkExecutor@worker1:55924]
14/05/15 10:31:22 INFO CoarseGrainedExecutorBackend: Connecting to driver:
akka.tcp://spark@R9FX97h.local:56720/user/CoarseGrainedScheduler
14/05/15 10:31:22 INFO WorkerWatcher: Connecting to worker
akka.tcp://sparkWorker@worker1:50040/user/Worker
14/05/15 10:31:22 WARN Remoting: Tried to associate with unreachable remote
address [akka.tcp://spark@R9FX97h.local:56720]. Address is now gated for
6 ms, all messages to this address will be delivered to dead letters.
14/05/15 10:31:22 ERROR CoarseGrainedExecutorBackend: Driver Disassociated
[akka.tcp://sparkExecutor@worker1:55924] ->
[akka.tcp://spark@R9FX97h.local:56720]
disassociated! Shutting down.

I noticed the following messages on the worker console when I attached
through docker:

14/05/15 11:24:33 INFO Worker: Asked to launch executor
app-20140515112408-0005/7 for billingLogProcessor
14/05/15 11:24:33 ERROR EndpointWriter: AssociationError
[akka.tcp://sparkWorker@worker1:50040] ->
[akka.tcp://sparkExecutor@worker1:42437]:
Error [Association failed with [akka.tcp://sparkExecutor@worker1:42437]] [
akka.remote.EndpointAssociationException: Association failed with
[akka.tcp://sparkExecutor@worker1:42437]
Caused by:
akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2:
Connection refused: worker1/172.17.0.4:42437
]
14/05/15 11:24:33 ERROR EndpointWriter: AssociationError
[akka.tcp://sparkWorker@worker1:50040] ->
[akka.tcp://sparkExecutor@worker1:42437]:
Error [Association failed with [akka.tcp://sparkExecutor@worker1:42437]] [
akka.remote.EndpointAssociationException: Association failed with
[akka.tcp://sparkExecutor@worker1:42437]
Caused by:
akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2:
Connection refused: worker1/172.17.0.4:42437
]
14/05/15 11:24:33 INFO ExecutorRunner: Launch command:
"/usr/lib/jvm/java-7-openjdk-amd64/bin/java" "-cp"
":/opt/spark-0.9.0/conf:/opt/spark-0.9.0/assembly/target/scala-2.10/spark-assembly_2.10-0.9.0-incubating-hadoop1.0.4.jar"
"-Xms512M" "-Xmx512M"
"org.apache.spark.executor.CoarseGrainedExecutorBackend"
"akka.tcp://spark@R9FX97h.local:46986/user/CoarseGrainedScheduler" "7"
"worker1" "1" "akka.tcp://sparkWorker@worker1:50040/user/Worker"
"app-20140515112408-0005"
14/05/15 11:24:35 INFO Worker: Executor app-20140515112408-0005/7 finished
with state FAILED message Command exited with code 1 exitStatus 1
14/05/15 11:24:35 INFO LocalActorRef: Message
[akka.remote.transport.ActorTransportAdapter$DisassociateUnderlying] from
Actor[akka://sparkWorker/deadLetters] to
Actor[akka://sparkWorker/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FsparkWorker%40172.17.0.4%3A33648-135#310170905]
was not delivered. [34] dead letters encountered. This logging can be
turned off or adjusted with configuration settings 'akka.log-dead-letters'
and 'akka.log-dead-letters-during-shutdown'.
14/05/15 11:24:35 ERROR EndpointWriter: AssociationError
[akka.tcp://sparkWorker@worker1:50040] ->
[akka.tcp://sparkExecutor@worker1:56594]:
Error [Association failed with [akka.tcp://sparkExecutor@worker1:56594]] [
akka.remote.EndpointAssociationException: Association failed with
[akka.tcp://sparkExecutor@worker1:56594]
Caused by:
akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2:
Connection refused: worker1/172.17.0.4:56594
]
14/05/15 11:24:35 ERROR EndpointWriter: AssociationError
[akka.tcp://sparkWorker@worker1:50040] ->
[akka.tcp://sparkExecutor@worker1:56594]:
Error [Association failed with [akka.tcp://sparkExecutor@worker1:56594]] [
akka.remote.EndpointAssociationException: Association failed with
[akka.tcp://sparkExecutor@worker1:56594]
Caused by:
akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2:
Connection refused: worker1/172.17.0.4:56594
]

The significant code snippets from the standalone java client are as
follows:

JavaSparkContext ctx = new JavaSparkContext(masterAddr, "log_processor",
sparkHome, jarFileLoc);
JavaRDD rawLog = ctx.textFile("/tmp/some.log");
List> topRecords =
rawLog.map(fieldSplitter).map(fieldExtractor).top(5, tupleComparator);


However, running the sample code provided on github (amplab docker page)
over the spark shell went through fine with the following stdout message:

14/05/15 10:39:41 INFO Slf4jLogger: Slf4jLogger started
14/05/15 10:39:42 INFO Remoting: Starting remoting
14/05/15 1

unsubscribe

2014-05-16 Thread eric perler
unsubscribe
  

advice on maintaining a production spark cluster?

2014-05-16 Thread Josh Marcus
Hey folks,

I'm wondering what strategies other folks are using for maintaining and
monitoring the stability of stand-alone spark clusters.

Our master very regularly loses workers, and they (as expected) never
rejoin the cluster.  This is the same behavior I've seen
using akka cluster (if that's what spark is using in stand-alone mode) --
are there configuration options we could be setting
to make the cluster more robust?

We have a custom script which monitors the number of workers (through the
web interface) and restarts the cluster when
necessary, as well as resolving other issues we face (like spark shells
left open permanently claiming resources), and it
works, but it's nowhere close to a great solution.

What are other folks doing?  Is this something that other folks observe as
well?  I suspect that the loss of workers is tied to
jobs that run out of memory on the client side or our use of very large
broadcast variables, but I don't have an isolated test case.
I'm open to general answers here: for example, perhaps we should simply be
using mesos or yarn instead of stand-alone mode.

--j


Workers unable to find class, even when in the SparkConf JAR list

2014-05-16 Thread Robert James
I'm using spark-ec2 to run some Spark code.  When I set master to
"local", then it runs fine.  However, when I set master to $MASTER,
the workers immediately fail, with java.lang.NoClassDefFoundError for
the classes.

I've used sbt-assembly to make a jar with the classes, confirmed using
jar tvf that the classes are there, and set SparkConf to distribute
the classes.  The Spark Web UI indeed shows the assembly jar to be
added to the classpath:
http://172.x.x.x:47441/jars/myjar-assembly-1.0.jar

It seems that, despite the fact that myjar-assembly contains the
class, and is being added to the cluster, it's not reaching the
workers.  How do I fix this? (Do I need to manually copy the jar file?
If so, to which dir? I thought that the point of the SparkConf add
jars was to do this automatically)


Re: Express VMs - good idea?

2014-05-16 Thread Stephen Boesch
Hi Marco,
  Hive itself is not working in the CDH5.0 VM (due to FNFE's on the third
party jars).  While you did not mention using Shark, you may want to keep
that in mind. I will try out spark-only commands later today and report what
I find.


2014-05-14 5:00 GMT-07:00 Marco Shaw :

> Hi,
>
> I've wanted to play with Spark.  I wanted to fast track things and just
> use one of the vendor's "express VMs".  I've tried Cloudera CDH 5.0 and
> Hortonworks HDP 2.1.
>
> I've not written down all of my issues, but for certain, when I try to run
> spark-shell it doesn't work.  Cloudera seems to crash, and both complain
> when I try to use "SparkContext" in a simple Scala command.
>
> So, just a basic question on whether anyone has had success getting these
> express VMs to work properly with Spark *out of the box* (HDP does require
> you to install Spark manually).
>
> I know Cloudera recommends 8GB of RAM, but I've been running it with 4GB.
>
> Could it be that 4GB is just not enough, and causing issues or have others
> had success using these Hadoop 2.x pre-built VMs with Spark 0.9.x?
>
> Marco
>


Re: Job failed: java.io.NotSerializableException: org.apache.spark.SparkContext

2014-05-16 Thread Nathan Kronenfeld
Serializing the main object isn't going to help here - it's SparkContext
it's complaining about.

The problem is that, according to the code you sent, computeDwt has a
signature of:
class DWTsample ... {
def computeDwt (sc: SparkContext, data: ArrayBuffer[(Int, Double)]):
List[Double]
}

Do you need the SparkContext within that function?  That function is
executing out on your workers; they shouldn't be trying to send work
directly to other workers anyway, or using RDDs or other spark contexts,
they should just be working with the data.  If you can eliminate the
SparkContext parameter there, you should be fine.

Also, I don't know how expensive DWTsample is to produce, or if you need a
separate instance for each record; if you need one for each record, as is
indicated by the code you sent, it doesn't actually have to be serializable
- you're creating it out on the worker nodes, not sending it to them from
the client node.
If you don't need a unique instance per record, then you can either rely on
its being serializable to create just one and use it for every record,
or, if you would prefer it not to be serializable, you can create one per
partition and use that one on each record in the partition:
kk = series.mapPartitions(iter => {
  val sampler = new DWTsample()
  iter.map(i => (i._1, sampler.computeDwt(i._2)))
})

(assuming you eliminated the sc parameter, of course)
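
If it helps to see the one-instance-per-partition idea without a cluster,
here is a small plain-Scala sketch in which each inner iterator stands in
for one partition. `Sampler` and its `created` counter are hypothetical
stand-ins for DWTsample, used only to show how many instances get built.

```scala
object PerPartitionInstance {
  // Hypothetical stand-in for an expensive helper such as DWTsample.
  class Sampler {
    Sampler.created += 1
    def compute(xs: Seq[Double]): Double = xs.sum
  }
  object Sampler { var created = 0 }

  // Each inner iterator plays the role of one RDD partition.
  def process(partitions: Seq[Iterator[(Int, Seq[Double])]]): Seq[(Int, Double)] =
    partitions.flatMap { iter =>
      val sampler = new Sampler // built once per partition, not per record
      iter.map { case (key, values) => (key, sampler.compute(values)) }
    }
}
```

With two partitions holding three records in total, only two Sampler
instances are created, which is the saving mapPartitions gives you on the
workers.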


Hope this helps!


On Mon, May 12, 2014 at 2:27 AM, yh18190  wrote:
>
>> Hi,
>>
>> I am facing above exception when I am trying to apply a method(ComputeDwt)
>> on RDD[(Int,ArrayBuffer[(Int,Double)])] input.
>> I am even using extends Serialization option to serialize objects in
>> spark.Here is the code snippet.
>>
>> Could anyone suggest me what could be the problem and what should be done
>> to
>> overcome this issue.???
>>
>> input:series:RDD[(Int,ArrayBuffer[(Int,Double)])]
>> DWTsample extends Serialization is a class having computeDwt function.
>> sc: sparkContext
>>
>>  val  kk:RDD[(Int,List[Double])]=series.map(t=>(t._1,new
>> DWTsample().computeDwt(sc,t._2)))
>>
>> Error:
>> org.apache.spark.SparkException: Job failed:
>> java.io.NotSerializableException: org.apache.spark.SparkContext
>> org.apache.spark.SparkException: Job failed:
>> java.io.NotSerializableException: org.apache.spark.SparkContext
>> at
>>
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760)
>> at
>>
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758)
>> at
>>
>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
>> at
>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>> at
>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:758)
>> at
>> org.apache.spark.scheduler.DAGScheduler.org
>> $apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:556)
>> at
>> org.apache.spark.scheduler.DAGScheduler.org
>> $apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:503)
>> at
>>
>> org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:361)
>> at
>> org.apache.spark.scheduler.DAGScheduler.org
>> $apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
>> at
>>
>> org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Job-failed-java-io-NotSerializableException-org-apache-spark-SparkContext-tp5585.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>
>
>
> --
> Software Engineer
> Analytics Engineering Team@ Box
> Mountain View, CA
>



-- 
Nathan Kronenfeld
Senior Visualization Developer
Oculus Info Inc
2 Berkeley Street, Suite 600,
Toronto, Ontario M5A 4J5
Phone:  +1-416-203-3003 x 238
Email:  nkronenf...@oculusinfo.com


JavaNetworkWordCount

2014-05-16 Thread Eduardo Costa Alfaia
Hi Guys,

TD has given me this piece of code: "sparkContext.makeRDD(1 to 100,
100).collect()". I am using the Java version of NetworkWordCount. How could
I use this piece of code in Java?

Thanks
-- 
Informativa sulla Privacy: http://www.unibs.it/node/8155


Re: Efficient implementation of getting top 10 hashtags in last 5 mins window

2014-05-16 Thread bgawalt
Hi nilmish,

One option for you is to consider moving to a different algorithm. The
SpaceSaver/StreamSummary method will get you approximate results in exchange
for smaller data structure size. It has an implementation in Twitter's
Algebird library, if you're using Scala:

https://github.com/twitter/algebird/blob/develop/algebird-core/src/main/scala/com/twitter/algebird/SpaceSaver.scala

and has a more general write up here:

http://boundary.com/blog/2013/05/14/approximate-heavy-hitters-the-spacesaving-algorithm/

I believe it will let you avoid an expensive sort of all the hundreds of
thousands of hashtags you can see in a day.
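
To give a feel for the idea, here is a simplified plain-Scala sketch of the
Space-Saving update rule. It is not Algebird's API; the object name, the
method names and the `capacity` parameter are assumptions made for the
example, and the linear scan for the minimum counter is kept only for
brevity.

```scala
object SpaceSavingSketch {
  // Keeps at most `capacity` counters. When a new item arrives and the
  // table is full, the smallest counter is evicted and the new item
  // inherits its count plus one (so counts are upper bounds).
  def summarize(stream: Iterator[String], capacity: Int): Map[String, Long] = {
    require(capacity > 0, "capacity must be positive")
    val counts = scala.collection.mutable.Map.empty[String, Long]
    stream.foreach { item =>
      if (counts.contains(item)) counts(item) += 1
      else if (counts.size < capacity) counts(item) = 1L
      else {
        val (minItem, minCount) = counts.minBy(_._2)
        counts.remove(minItem)
        counts(item) = minCount + 1
      }
    }
    counts.toMap
  }

  // Approximate top-n items, highest estimated count first.
  def top(stream: Iterator[String], capacity: Int, n: Int): Seq[(String, Long)] =
    summarize(stream, capacity).toSeq
      .sortBy { case (tag, count) => (-count, tag) }
      .take(n)
}
```

Memory stays bounded by `capacity` no matter how many distinct hashtags show
up, at the price of overestimating the counts of rarely seen ones.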

Best,
--Brian



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Efficient-implementation-of-getting-top-10-hashtags-in-last-5-mins-window-tp5741p5845.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: filling missing values in a sequence

2014-05-16 Thread bgawalt
Hello Mohit,

I don't think there's a direct way of bleeding elements across partitions.
But you could write it yourself relatively succinctly:

A) Sort the RDD
B) Look at the sorted RDD's partitions with the .mapPartitionsWithIndex( )
method. Map each partition to its partition ID, and its maximum element.
Collect the (partID, maxElements) in the driver.
C) Broadcast the collection of (partID, part's max element) tuples
D) Look again at the sorted RDD's partitions with mapPartitionsWithIndex( ).
For each partition K:
D1) Find the immediately-preceding partition K -1 , and its associated
maximum value. Use that to decide how many values are missing between the
last element of part K-1 and the first element of part K.
D2) Step through part K's elements and find the rest of the missing elements
in that part

This approach sidesteps worries you might have over the hack of using
.filter to remove the first element (how do you want to handle ties, for
instance?), as well as the possible fragility of zipping.
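
Steps B through D can be sketched in plain Scala, with each inner Vector
standing in for one partition of the already-sorted RDD. This assumes
distinct integer elements and skips empty partitions; the object and method
names are made up for the example, and in Spark the per-partition work would
go inside mapPartitionsWithIndex with the maxima shipped as a broadcast.

```scala
object FillMissing {
  // Each inner Vector stands in for one partition of a sorted RDD[Int].
  def fill(parts: Vector[Vector[Int]]): Vector[Vector[Int]] = {
    // Steps B/C: partition index -> max element (empty partitions skipped).
    val maxes: Map[Int, Int] =
      parts.zipWithIndex.collect { case (p, i) if p.nonEmpty => i -> p.last }.toMap

    // Step D: each partition starts right after the nearest preceding
    // non-empty partition's max and runs up to its own max.
    parts.zipWithIndex.map { case (p, k) =>
      if (p.isEmpty) p
      else {
        val prevMax = (k - 1 to 0 by -1).collectFirst {
          case i if maxes.contains(i) => maxes(i)
        }
        val start = prevMax.map(_ + 1).getOrElse(p.head)
        (start to p.last).toVector
      }
    }
  }
}
```

For example, partitions (1, 3), (6, 7), (), (10) come back as (1, 2, 3),
(4, 5, 6, 7), (), (8, 9, 10).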

--Brian



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/filling-missing-values-in-a-sequence-tp5708p5846.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Spark with Drill

2014-05-16 Thread N . Venkata Naga Ravi
Hi,

I am trying to understand Drill, which looks like one of the interesting
upcoming tools.
Can somebody clarify where Drill is going to be positioned in the Hadoop
ecosystem compared with Spark and Shark?
Is it going to be used as an alternative to Spark/Shark or Storm, or can
Drill integrate with them in this layer of the stack?
I also see that MapR (a major contributor to Drill) is going to package
Spark, according to their recent announcement.


Thanks,
Ravi  

How to pass config variables to workers

2014-05-16 Thread srobertjames
What is a good way to pass config variables to workers?

I've tried setting them in environment variables via spark-env.sh, but, as
far as I can tell, the environment variables set there don't appear in
workers' environments.  If I want to be able to configure all workers,
what's a good way to do it?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-pass-config-variables-to-workers-tp5780.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: JavaNetworkWordCount

2014-05-16 Thread Mayur Rustagi
It would look ugly.. as explicit datatypes need to be mentioned..
you are better off using parallelize instead.

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi 



On Fri, May 16, 2014 at 6:11 PM, Eduardo Costa Alfaia <
e.costaalf...@unibs.it> wrote:

> Hi Guys,
>
> TD has given me this piece of code: "sparkContext.makeRDD(1 to 100,
> 100).collect()". I am using the Java version of NetworkWordCount. How could
> I use this piece of code in Java?
>
> Thanks
>
> Informativa sulla Privacy: http://www.unibs.it/node/8155


Re: cant get tests to pass anymore on master master

2014-05-16 Thread Koert Kuipers
i tried on a few different machines, including a server, all same ubuntu
and same java, and got same errors. i also tried modifying the timeouts in
the unit tests and it did not help.

ok i will try blowing away local maven repo and do clean.


On Thu, May 15, 2014 at 12:49 PM, Sean Owen  wrote:

> Since the error concerns a timeout -- is the machine slowish?
>
> What about blowing away everything in your local maven repo, do a
> clean, etc. to rule out environment issues?
>
> I'm on OS X here FWIW.
>
> On Thu, May 15, 2014 at 5:24 PM, Koert Kuipers  wrote:
> > yeah sure. it is ubuntu 12.04 with jdk1.7.0_40
> > what else is relevant that i can provide?
> >
> >
> > On Thu, May 15, 2014 at 12:17 PM, Sean Owen  wrote:
> >>
> >> FWIW I see no failures. Maybe you can say more about your environment,
> >> etc.
> >>
> >> On Wed, May 7, 2014 at 10:01 PM, Koert Kuipers 
> wrote:
> >> > i used to be able to get all tests to pass.
> >> >
> >> > with java 6 and sbt i get PermGen errors (no matter how high i make the
> >> > PermGen). so i have given up on that.
> >> >
> >> > with java 7 i see 1 error in a bagel test and a few in streaming tests.
> >> > any ideas? see the error in BagelSuite below.
> >> >
> >> > [info] - large number of iterations *** FAILED *** (10 seconds, 105 milliseconds)
> >> > [info]   The code passed to failAfter did not complete within 10 seconds. (BagelSuite.scala:85)
> >> > [info]   org.scalatest.exceptions.TestFailedDueToTimeoutException:
> >> > [info]   at org.scalatest.concurrent.Timeouts$$anonfun$failAfter$1.apply(Timeouts.scala:250)
> >> > [info]   at org.scalatest.concurrent.Timeouts$$anonfun$failAfter$1.apply(Timeouts.scala:250)
> >> > [info]   at org.scalatest.concurrent.Timeouts$class.timeoutAfter(Timeouts.scala:282)
> >> > [info]   at org.scalatest.concurrent.Timeouts$class.failAfter(Timeouts.scala:246)
> >> > [info]   at org.apache.spark.bagel.BagelSuite.failAfter(BagelSuite.scala:32)
> >> > [info]   at org.apache.spark.bagel.BagelSuite$$anonfun$3.apply$mcV$sp(BagelSuite.scala:85)
> >> > [info]   at org.apache.spark.bagel.BagelSuite$$anonfun$3.apply(BagelSuite.scala:85)
> >> > [info]   at org.apache.spark.bagel.BagelSuite$$anonfun$3.apply(BagelSuite.scala:85)
> >> > [info]   at org.scalatest.FunSuite$$anon$1.apply(FunSuite.scala:1265)
> >> > [info]   at org.scalatest.Suite$class.withFixture(Suite.scala:1974)
> >> > [info]   at org.apache.spark.bagel.BagelSuite.withFixture(BagelSuite.scala:32)
> >> >
> >
> >
>


Re: Understanding epsilon in KMeans

2014-05-16 Thread Sean Owen
It is running k-means many times, independently, from different random
starting points in order to pick the best clustering. Convergence ends
one run, not all of them.

Yes, epsilon should be the same as "convergence threshold" elsewhere.

You can set epsilon if you instantiate KMeans directly. Maybe it would
be nice to overload train() to be able to set that too, but I imagine
the point of the static convenience methods is to encapsulate the most
usual subsets of parameters.
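
To make "convergence ends one run" concrete, here is a minimal plain-Java sketch of a single k-means run on 1-D points. It is not MLlib code: the class name KMeansEpsilonSketch, the method runOnce, and the exact convergence test (stop once no center moved more than epsilon) are assumptions modeled on the documentation text quoted below. Each call is one run; several runs from different starting centers would each stop on their own.

```java
public class KMeansEpsilonSketch {
    /**
     * Performs one k-means run on 1-D points, updating centers in place.
     * Returns the number of iterations actually executed: the run stops early
     * once no center moved more than epsilon, otherwise at maxIterations.
     */
    static int runOnce(double[] points, double[] centers, int maxIterations, double epsilon) {
        int iterations = 0;
        boolean converged = false;
        while (iterations < maxIterations && !converged) {
            double[] sum = new double[centers.length];
            int[] count = new int[centers.length];
            // Assign every point to its nearest center.
            for (double p : points) {
                int best = 0;
                for (int c = 1; c < centers.length; c++) {
                    if (Math.abs(p - centers[c]) < Math.abs(p - centers[best])) {
                        best = c;
                    }
                }
                sum[best] += p;
                count[best]++;
            }
            // Move each center to the mean of its points and check how far it moved.
            converged = true;
            for (int c = 0; c < centers.length; c++) {
                if (count[c] == 0) {
                    continue; // an empty cluster keeps its old center
                }
                double updated = sum[c] / count[c];
                if (Math.abs(updated - centers[c]) > epsilon) {
                    converged = false;
                }
                centers[c] = updated;
            }
            iterations++;
        }
        return iterations;
    }

    public static void main(String[] args) {
        double[] points = {1, 2, 3, 10, 11, 12};
        // Two independent runs from different (hypothetical) starting centers.
        System.out.println(runOnce(points, new double[] {0, 5}, 10, 0.1));
        System.out.println(runOnce(points, new double[] {1, 12}, 10, 0.1));
    }
}
```

With maxIterations=10 and epsilon=0.1, both runs on this toy data stop well before the tenth iteration, and the second run stopping does not affect the first.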

On Wed, May 14, 2014 at 1:50 PM, Stuti Awasthi  wrote:
> Hi All,
>
>
>
> I wanted to understand the functionality of epsilon in KMeans in Spark
> MLlib.
>
>
>
> As per the documentation:
>
> "distance threshold within which we've consider centers to have converged. If
> all centers move less than this Euclidean distance, we stop iterating one
> run."
>
> I had assumed that if the centers move less than the epsilon value, then
> clustering stops, but then what is meant by "we stop iterating one run"?
>
> Suppose I have given maxIterations=10 and epsilon=0.1, and assume that after
> only 2 iterations the epsilon condition is met, i.e. the centers are now
> moving less than 0.1.
>
> What happens now? Are all 10 iterations completed, or does the clustering
> stop?
>
>
>
> My 2nd query: in Mahout there is a configuration param, "Convergence
> Threshold (cd)", which states: "in an iteration, the centroids don't move
> more than this distance, no further iterations are done and clustering
> stops."
>
> So are epsilon and cd similar?
>
> 3rd query:
>
> How do I pass epsilon as a configurable param? KMeans.train() does not provide
> a way, but in the code I can see a "setEpsilon" method. So if I want to pass
> epsilon=0.1, how may I do that?
>
>
>
> Pardon my ignorance
>
>
>
> Thanks
>
> Stuti Awasthi
>
>
>
>
>
>
>


Re: Reading from .bz2 files with Spark

2014-05-16 Thread Xiangrui Meng
Hi Andrew,

Could you try varying the minPartitions parameter? For example:

val r = sc.textFile("/user/aa/myfile.bz2", 4).count
val r = sc.textFile("/user/aa/myfile.bz2", 8).count

Best,
Xiangrui

On Tue, May 13, 2014 at 9:08 AM, Xiangrui Meng  wrote:
> Which hadoop version did you use? I'm not sure whether Hadoop v2 fixes
> the problem you described, but it does contain several fixes to bzip2
> format. -Xiangrui
>
> On Wed, May 7, 2014 at 9:19 PM, Andrew Ash  wrote:
>> Hi all,
>>
>> Is anyone reading and writing to .bz2 files stored in HDFS from Spark with
>> success?
>>
>>
>> I'm finding the following results on a recent commit (756c96 from 24hr ago)
>> and CDH 4.4.0:
>>
>> Works: val r = sc.textFile("/user/aa/myfile.bz2").count
>> Doesn't work: val r = sc.textFile("/user/aa/myfile.bz2").map((s:String) =>
>> s+"| " ).count
>>
>> Specifically, I'm getting an exception coming out of the bzip2 libraries
>> (see below stacktraces), which is unusual because I'm able to read from that
>> file without an issue using the same libraries via Pig.  It was originally
>> created from Pig as well.
>>
>> Digging a little deeper I found this line in the .bz2 decompressor's javadoc
>> for CBZip2InputStream:
>>
>> "Instances of this class are not threadsafe."
>>
>>
>> My current working theory is that Spark has a much higher level of
>> parallelism than Pig/Hadoop does and thus I get these wild IndexOutOfBounds
>> exceptions much more frequently (as in can't finish a run over a little 2M
>> row file) vs hardly at all in other libraries.
>>
>> The only other reference I could find to the issue was in presto-users, but
>> the recommendation to leave .bz2 for .lzo doesn't help if I actually do want
>> the higher compression levels of .bz2.
>>
>>
>> Would love to hear if I have some kind of configuration issue or if there's
>> a bug in .bz2 that's fixed in later versions of CDH, or generally any other
>> thoughts on the issue.
>>
>>
>> Thanks!
>> Andrew
>>
>>
>>
>> Below are examples of some exceptions I'm getting:
>>
>> 14/05/07 15:09:49 WARN scheduler.TaskSetManager: Loss was due to java.lang.ArrayIndexOutOfBoundsException
>> java.lang.ArrayIndexOutOfBoundsException: 65535
>> at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.hbCreateDecodeTables(CBZip2InputStream.java:663)
>> at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.createHuffmanDecodingTables(CBZip2InputStream.java:790)
>> at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.recvDecodingTables(CBZip2InputStream.java:762)
>> at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:798)
>> at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:502)
>> at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333)
>> at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:397)
>> at org.apache.hadoop.io.compress.BZip2Codec$BZip2CompressionInputStream.read(BZip2Codec.java:426)
>> at java.io.InputStream.read(InputStream.java:101)
>> at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209)
>> at org.apache.hadoop.util.LineReader.readLine(LineReader.java:173)
>> at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:203)
>> at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:43)
>>
>> java.lang.ArrayIndexOutOfBoundsException: 90
>> at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:900)
>> at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:502)
>> at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333)
>> at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:397)
>> at org.apache.hadoop.io.compress.BZip2Codec$BZip2CompressionInputStream.read(BZip2Codec.java:426)
>> at java.io.InputStream.read(InputStream.java:101)
>> at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209)
>> at org.apache.hadoop.util.LineReader.readLine(LineReader.java:173)
>> at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:203)
>> at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:43)
>> at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:198)
>> at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:181)
>> at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>> at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:35)
>> at scala.collection.Iterator$$anon$11.hasN

Re: Express VMs - good idea?

2014-05-16 Thread Sean Owen
Hey Marco, I tried the CDH5 VM today and it works fine -- but note
that you need to start the Spark service after the VM boots. Just go
to CM and choose Start from the dropdown next to Spark. spark-shell
works fine then.

On Wed, May 14, 2014 at 1:00 PM, Marco Shaw  wrote:
> Hi,
>
> I've wanted to play with Spark.  I wanted to fast track things and just use
> one of the vendor's "express VMs".  I've tried Cloudera CDH 5.0 and
> Hortonworks HDP 2.1.
>
> I've not written down all of my issues, but for certain, when I try to run
> spark-shell it doesn't work.  Cloudera seems to crash, and both complain
> when I try to use "SparkContext" in a simple Scala command.
>
> So, just a basic question on whether anyone has had success getting these
> express VMs to work properly with Spark *out of the box* (HDP does required
> you install Spark manually).
>
> I know Cloudera recommends 8GB of RAM, but I've been running it with 4GB.
>
> Could it be that 4GB is just not enough, and causing issues or have others
> had success using these Hadoop 2.x pre-built VMs with Spark 0.9.x?
>
> Marco


Error while launching ec2 spark cluster with HVM (r3.large)

2014-05-16 Thread Usman Ghani
Hi All,

I am trying to use amazon memory optimized (R3) instances in the Oregon
region. I am getting 'Connection refused' during the SSH setup phase. I
tried using both root and ec2-user as user ids.


ssh: connect to host ec2-54-185-57-74.us-west-2.compute.amazonaws.com port
22: Connection refused
Error executing remote command, retrying after 30 seconds: Command '['ssh',
'-o', 'StrictHostKeyChecking=no', '-i',
'/Users/usman/.ssh/ughani_imp_us_west_2.pem', '-t', '-t',
u'r...@ec2-54-185-57-74.us-west-2.compute.amazonaws.com', "\n  [ -f
~/.ssh/id_rsa ] ||\n(ssh-keygen -q -t rsa -N '' -f ~/.ssh/id_rsa
&&\n cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys)\n"]'
returned non-zero exit status 255
ssh: connect to host ec2-54-185-57-74.us-west-2.compute.amazonaws.com port
22: Connection refused


Generating cluster's SSH key on master...
ssh: connect to host ec2-54-185-58-135.us-west-2.compute.amazonaws.com port
22: Connection refused
Error executing remote command, retrying after 30 seconds: Command '['ssh',
'-o', 'StrictHostKeyChecking=no', '-i',
'/Users/usman/.ssh/ughani_imp_us_west_2.pem', '-t', '-t',
u'ec2-u...@ec2-54-185-58-135.us-west-2.compute.amazonaws.com', "\n  [
-f ~/.ssh/id_rsa ] ||\n(ssh-keygen -q -t rsa -N '' -f ~/.ssh/id_rsa
&&\n cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys)\n"]'
returned non-zero exit status 255
ssh: connect to host ec2-54-185-58-135.us-west-2.compute.amazonaws.com port
22: Connection refused
Error executing remote command, retrying after 30 seconds: Command '['ssh',
'-o', 'StrictHostKeyChecking=no', '-i',
'/Users/usman/.ssh/ughani_imp_us_west_2.pem', '-t', '-t',
u'ec2-u...@ec2-54-185-58-135.us-west-2.compute.amazonaws.com', "\n  [
-f ~/.ssh/id_rsa ] ||\n(ssh-keygen -q -t rsa -N '' -f ~/.ssh/id_rsa
&&\n cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys)\n"]'
returned non-zero exit status 255
ssh: connect to host ec2-54-185-58-135.us-west-2.compute.amazonaws.com port
22: Connection refused
Error executing remote command, retrying after 30 seconds: Command '['ssh',
'-o', 'StrictHostKeyChecking=no', '-i',
'/Users/usman/.ssh/ughani_imp_us_west_2.pem', '-t', '-t',
u'ec2-u...@ec2-54-185-58-135.us-west-2.compute.amazonaws.com', "\n  [
-f ~/.ssh/id_rsa ] ||\n(ssh-keygen -q -t rsa -N '' -f ~/.ssh/id_rsa
&&\n cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys)\n"]'
returned non-zero exit status 255
Warning: Permanently added '
ec2-54-185-58-135.us-west-2.compute.amazonaws.com,54.185.58.135' (RSA) to
the list of known hosts.
Connection to ec2-54-185-58-135.us-west-2.compute.amazonaws.com closed.
Transferring cluster's SSH key to slaves...
ec2-54-185-57-126.us-west-2.compute.amazonaws.com
ssh: connect to host ec2-54-185-57-126.us-west-2.compute.amazonaws.com port
22: Connection refused
Error 255 while executing remote command, retrying after 30 seconds
ssh: connect to host ec2-54-185-57-126.us-west-2.compute.amazonaws.com port
22: Connection refused
Error 255 while executing remote command, retrying after 30 seconds
ssh: connect to host ec2-54-185-57-126.us-west-2.compute.amazonaws.com port
22: Connection refused
Error 255 while executing remote command, retrying after 30 seconds
Warning: Permanently added '
ec2-54-185-57-126.us-west-2.compute.amazonaws.com,54.185.57.126' (RSA) to
the list of known hosts.
ec2-54-188-56-91.us-west-2.compute.amazonaws.com
Warning: Permanently added
'ec2-54-188-56-91.us-west-2.compute.amazonaws.com,54.188.56.91'
(RSA) to the list of known hosts.
ec2-54-188-61-59.us-west-2.compute.amazonaws.com
Warning: Permanently added
'ec2-54-188-61-59.us-west-2.compute.amazonaws.com,54.188.61.59'
(RSA) to the list of known hosts.
ec2-54-188-21-245.us-west-2.compute.amazonaws.com
Warning: Permanently added '
ec2-54-188-21-245.us-west-2.compute.amazonaws.com,54.188.21.245' (RSA) to
the list of known hosts.
Cloning into 'spark-ec2'...
remote: Counting objects: 1407, done.
remote: Compressing objects: 100% (673/673), done.
remote: Total 1407 (delta 457), reused 1398 (delta 454)
Receiving objects: 100% (1407/1407), 219.00 KiB | 340 KiB/s, done.
Resolving deltas: 100% (457/457), done.
Connection to ec2-54-185-58-135.us-west-2.compute.amazonaws.com closed.
Deploying files to master...
WARNING: Don't know number of disks on instance type r3.large; assuming 1
building file list ... done
root/spark-ec2/
rsync: recv_generator: mkdir "/root/spark-ec2" failed: Permission denied
(13)
*** Skipping any contents from this failed directory ***

sent 101 bytes  received 26 bytes  254.00 bytes/sec
total size is 1617  speedup is 12.73
rsync error: some files could not be transferred (code 23) at
/SourceCache/rsync/rsync-42/rsync/main.c(992) [sender=2.6.9]
Traceback (most recent call last):
  File "./spark_ec2.py", line 822, in <module>
    main()
  File "./spark_ec2.py", line 814, in main
    real_main()
  File "./spark_ec2.py", line 699, in real_main
    setup_cluster(conn, master_nodes, slave_nodes, opts, True

Re: java serialization errors with spark.files.userClassPathFirst=true

2014-05-16 Thread Koert Kuipers
after removing all class parameters of class Path from my code, i tried
again. different but related error when i set
spark.files.userClassPathFirst=true

now i don't even use FileInputFormat directly. HadoopRDD does...

14/05/16 12:17:17 ERROR Executor: Exception in task ID 45
java.lang.NoClassDefFoundError: org/apache/hadoop/mapred/FileInputFormat
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:792)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:449)
at java.net.URLClassLoader.access$100(URLClassLoader.java:71)
at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at org.apache.spark.executor.ChildExecutorURLClassLoader$userClassLoader$.findClass(ExecutorURLClassLoader.scala:42)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at org.apache.spark.executor.ChildExecutorURLClassLoader.findClass(ExecutorURLClassLoader.scala:51)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:270)
at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:57)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1610)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1515)
at java.io.ObjectInputStream.readClass(ObjectInputStream.java:1481)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1331)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1913)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1913)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1891)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348)



On Thu, May 15, 2014 at 3:03 PM, Koert Kuipers  wrote:

> when i set spark.files.userClassPathFirst=true, i get java serialization
> errors in my tasks, see below. when i set userClassPathFirst back to its
> default of false, the serialization errors are gone. my spark.serializer is
> KryoSerializer.
>
> the class org.apache.hadoop.fs.Path is in the spark assembly jar, but not
> in my task jars (the ones i added to the SparkConf). so looks like the
> ClosureSerializer is having trouble with this class once the
> ChildExecutorURLClassLoader is used? thats me just guessing.
>
> Exception in thread "main" org.apache.spark.SparkException: Job aborted
> due to stage failure: Task 1.0:5 failed 4 times, most recent failure:
> Exception failure in TID 31 on host node05.tresata.com:
> java.lang.NoClassDefFoundError: org/apache/hadoop/fs/Path
> java.lang.Class.getDeclaredConstructors0(Native Method)
> java.lang.Class.privateGetDeclaredConstructors(Class.java:2398)
> java.lang.Class.getDeclaredConstructors(Class.java:1838)
>
> java.io.ObjectStreamClass.computeDefaultSUID(ObjectStreamClass.java:1697)
> java.io.ObjectStreamClass.access$100(ObjectStreamClass.java:50)
> java.io.ObjectStreamClass$1.run(ObjectStreamClass.java:203)
> java.security.AccessController.doPrivileged(Native Method)
>
> java.io.ObjectStreamClass.getSerialVersionUID(ObjectStreamClass.java:200)
> java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:556)
>
> java.io.ObjectInputStream.readNonProxyDesc(

Re: java serialization errors with spark.files.userClassPathFirst=true

2014-05-16 Thread Koert Kuipers
ok i put lots of logging statements in the ChildExecutorURLClassLoader.
this is what i see:

* the urls for userClassLoader are correct and include only my one jar.

* for one class that only exists in my jar i see it gets loaded correctly
using userClassLoader

* for a class that exists in both my jar and spark kernel it tries to use
userClassLoader and ends up with a NoClassDefFoundError. the class is
org.apache.avro.mapred.AvroInputFormat and the NoClassDefFoundError is for
org.apache.hadoop.mapred.FileInputFormat (which the parentClassLoader is
responsible for since it is not in my jar). i currently catch this
NoClassDefFoundError and call parentClassLoader.loadClass but thats clearly
not a solution since it loads the wrong version.



On Fri, May 16, 2014 at 2:25 PM, Koert Kuipers  wrote:

> well, i modified ChildExecutorURLClassLoader to also delegate to
> parentClassloader if NoClassDefFoundError is thrown... now i get yet
> another error. i am clearly missing something with these classloaders. such
> nasty stuff... giving up for now. just going to have to not use
> spark.files.userClassPathFirst=true for now, until i have more time to look
> at this.
>
> 14/05/16 13:58:59 ERROR Executor: Exception in task ID 3
> java.lang.ClassCastException: cannot assign instance of scala.None$ to field org.apache.spark.rdd.RDD.checkpointData of type scala.Option in instance of MyRDD
> at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2083)
> at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1261)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1995)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1913)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1913)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
> at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1891)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1913)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
> at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:60)

spark-submit / S3

2014-05-16 Thread Nick Pentreath
Hi

I see from the docs for 1.0.0 that the new "spark-submit" mechanism seems
to support specifying the jar with hdfs:// or http://

Does this support S3? It doesn't seem to: I tried it on EC2 and it did not
work:

./bin/spark-submit --master local[2] --class myclass s3n://bucket/myapp.jar



Counting things only once

2014-05-16 Thread Daniel Siegmann
I want to use accumulators to keep counts of things like invalid lines
found and such, for reporting purposes. Similar to Hadoop counters. This
may seem simple, but my case is a bit more complicated. The code which is
creating an RDD from a transform is separated from the code which performs
the operation on that RDD - or operations (I can't make any assumption as
to how many operations will be done on this RDD). There are two issues: (1)
I want to retrieve the accumulator value only after it has been computed,
and (2) I don't want to count the same thing twice if the RDD is recomputed.

Here's a simple example, converting strings to integers. Any records which
can't be parsed as an integer are dropped, but I want to count how many
times that happens:

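import org.apache.spark.rdd.RDD
// SparkContext._ supplies the implicit AccumulatorParam[Int] outside spark-shell
import org.apache.spark.SparkContext._

// assumes a SparkContext `sc` is in scope
def numbers(input: RDD[String]): RDD[Int] = {
  val invalidRecords = sc.accumulator(0)
  input.flatMap { record =>
    try {
      Seq(record.toInt)
    } catch {
      // drop the unparseable record and count it
      case _: NumberFormatException =>
        invalidRecords += 1
        Seq.empty[Int]
    }
  }
}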

I need some way to know when the result RDD has been computed so I can get
the accumulator value and reset it. Or perhaps it would be better to say I
need a way to ensure the accumulator value is computed exactly once for a
given RDD. Anyone know a way to do this? Or anything I might look into? Or
is this something that just isn't supported in Spark?

-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
E: daniel.siegm...@velos.io W: www.velos.io



Re: Hadoop 2.3 Centralized Cache vs RDD

2014-05-16 Thread Bertrand Dechoux
http://hadoop.apache.org/docs/r2.3.0/hadoop-project-dist/hadoop-hdfs/CentralizedCacheManagement.html

We do not currently cache blocks which are under construction, corrupt, or
otherwise incomplete.

Have you tried with a file with more than 1 block?

And dfs.namenode.path.based.cache.refresh.interval.ms might be too large?

You might want to ask a broader mailing list. This is not related to Spark.

Bertrand


On Fri, May 16, 2014 at 2:56 AM, hequn cheng  wrote:

> I tried centralized cache step by step following the official Apache Hadoop
> website, but it seems centralized cache doesn't work.
> See:
> http://stackoverflow.com/questions/22293358/centralized-cache-failed-in-hadoop-2-3
> Has anyone succeeded?
>
>
> 2014-05-15 5:30 GMT+08:00 William Kang :
>
>> Hi,
>> Any comments or thoughts on the implications of the newly released
>> feature from Hadoop 2.3 on the centralized cache? How different it is from
>> RDD?
>>
>> Many thanks.
>>
>>
>> Cao
>>
>
>


Re: java serialization errors with spark.files.userClassPathFirst=true

2014-05-16 Thread Koert Kuipers
ok i think the issue is visibility: a classloader can see all classes
loaded by its parent classloader. but userClassLoader does not have a
parent classloader, so it's not able to "see" any classes that parentLoader
is responsible for. in my case userClassLoader is trying to get
AvroInputFormat which probably somewhere statically references
FileInputFormat, which is invisible to userClassLoader.
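
The visibility rule can be reproduced with the JDK alone. The sketch below is not Spark's ChildExecutorURLClassLoader; the class LoaderVisibilitySketch and its helpers are made up for illustration. It builds one URLClassLoader with a null parent (it sees only its own URLs plus the JDK's bootstrap classes) and one that delegates to the application loader, then asks each whether it can resolve a class that lives on the application classpath.

```java
import java.net.URL;
import java.net.URLClassLoader;

public class LoaderVisibilitySketch {
    /** Returns true if the loader can resolve the named class, false otherwise. */
    static boolean canSee(ClassLoader loader, String className) {
        try {
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    /** No application parent: only its own (here empty) URLs and bootstrap classes are visible. */
    static ClassLoader isolatedLoader() {
        return new URLClassLoader(new URL[0], null);
    }

    /** Delegates to the loader that defined this class, so application classes are visible. */
    static ClassLoader childLoader() {
        return new URLClassLoader(new URL[0], LoaderVisibilitySketch.class.getClassLoader());
    }

    public static void main(String[] args) {
        String appClass = LoaderVisibilitySketch.class.getName();
        System.out.println(canSee(isolatedLoader(), "java.lang.String"));
        System.out.println(canSee(isolatedLoader(), appClass));
        System.out.println(canSee(childLoader(), appClass));
    }
}
```

In the situation described above, the failure surfaces as a NoClassDefFoundError rather than a ClassNotFoundException, presumably because the missing class is needed while another class is being defined; the sketch only shows the simpler lookup case.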



Re: accessing partition i+1 from mapper of partition i

2014-05-16 Thread Brian Gawalt
I don't think there's a direct way of bleeding elements across partitions.
But you could write it yourself relatively succinctly:

A) Sort the RDD.
B) Look at the sorted RDD's partitions with the .mapPartitionsWithIndex()
method. Map each partition to its partition ID and its maximum element.
Collect the (partID, maxElement) pairs in the driver.
C) Broadcast the collection of (partID, maxElement) tuples.
D) Look again at the sorted RDD's partitions with mapPartitionsWithIndex().
For each partition *K*:
D1) Find the immediately preceding partition *K-1* and its associated
maximum value. Use that to decide how many values are missing between the
last element of part *K-1* and the first element of part *K*.
D2) Step through part *K*'s elements and find the rest of the missing
elements in that part.

This approach sidesteps worries you might have over the hack of using
.filter to remove the first element, as well as the zipping.
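
Steps A-D can be sketched in plain Scala without a cluster. This is only a
minimal sketch under stated assumptions: each inner Seq stands in for one
partition of an already sorted RDD of distinct Ints, and the names FillGaps,
partitionMaxima, fillPartition and fillAll are hypothetical. In a real job
the two passes would presumably run inside mapPartitionsWithIndex, with the
maxima map shipped as a broadcast variable.

```scala
object FillGaps {
  // Steps B and C: the maximum of every non-empty partition, keyed by
  // partition index. On a cluster this small map would be collected in the
  // driver and then broadcast.
  def partitionMaxima(partitions: Seq[Seq[Int]]): Map[Int, Int] =
    partitions.zipWithIndex.collect {
      case (part, idx) if part.nonEmpty => idx -> part.max
    }.toMap

  // Step D: fill one sorted partition, using only its own elements and the
  // maxima of the other partitions.
  def fillPartition(index: Int, part: Seq[Int], maxima: Map[Int, Int]): Seq[Int] =
    if (part.isEmpty) Seq.empty[Int]
    else {
      // D1: maximum of the nearest preceding non-empty partition, if any.
      val previousMax = (index - 1 to 0 by -1).collectFirst {
        case i if maxima.contains(i) => maxima(i)
      }
      // D2: emit every value from just after that maximum up to this
      // partition's last element; the first partition starts at its own head.
      val start = previousMax.map(_ + 1).getOrElse(part.head)
      start to part.last
    }

  // Stand-in for running step D over all partitions.
  def fillAll(partitions: Seq[Seq[Int]]): Seq[Int] = {
    val maxima = partitionMaxima(partitions)
    partitions.zipWithIndex.flatMap { case (part, idx) =>
      fillPartition(idx, part, maxima)
    }
  }
}
```

Each partition needs only the maxima map, never another partition's
elements, which is why nothing has to be read across partition boundaries.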

--Brian



On Tue, May 13, 2014 at 9:31 PM, Mohit Jaggi  wrote:

> Hi,
> I am trying to find a way to fill in missing values in an RDD. The RDD is
> a sorted sequence.
> For example, (1, 2, 3, 5, 8, 11, ...)
> I need to fill in the missing numbers and get (1,2,3,4,5,6,7,8,9,10,11)
>
> One way to do this is to "slide and zip"
> rdd1 =  sc.parallelize(List(1, 2, 3, 5, 8, 11, ...))
> x = rdd1.first
> rdd2 = rdd1 filter (_ != x)
> rdd3 = rdd2 zip rdd1
> rdd4 = rdd3 flatMap { case (x, y) => generate missing elements between x and y }
>
> Another method which I think is more efficient is to use mapPartitions()
> on rdd1 to be able to iterate on elements of rdd1 in each partition.
> However, that leaves the boundaries of the partitions to be "unfilled". *Is
> there a way within the function passed to mapPartitions, to read the first
> element in the next partition?*
>
> The latter approach also appears to work for a general "sliding window"
> calculation on the RDD. The former technique requires a lot of "sliding and
> zipping" and I believe it is not efficient. If only I could read the next
> partition...I have tried passing a pointer to rdd1 to the function passed
> to mapPartitions but the rdd1 pointer turns out to be NULL, I guess because
> Spark cannot deal with a mapper calling another mapper (since it happens on
> a worker not the driver)
>
> Mohit.
>
>


Problem when sorting big file

2014-05-16 Thread Gustavo Enrique Salazar Torres
Hi there:

I have this dataset (about 12G) which I need to sort by key.
I used the sortByKey method but when I try to save the file to disk (HDFS
in this case) it seems that some tasks run out of time because they have
too much data to save and it can't fit in memory.
I say this because before the TimeOut exception at the worker there is an
OOM exception from a specific task.
My question is: is this a common problem in Spark? Has anyone been through
this issue?
The cause of the problem seems to be an unbalanced distribution of data
between tasks.

I will appreciate any help.

Thanks
Gustavo


Re: Understanding epsilon in KMeans

2014-05-16 Thread Xiangrui Meng
In Spark's KMeans, if no cluster center moves more than epsilon in
Euclidean distance from previous iteration, the algorithm finishes. No
further iterations are performed. For Mahout, you need to check the
documentation or the code to see what epsilon means there. -Xiangrui
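
The stopping rule described above can be sketched in plain Scala. This is a
minimal illustration, not MLlib's implementation: the names EpsilonStop,
converged and run are hypothetical, and the update function is a stand-in
for one k-means step that recomputes the centers.

```scala
object EpsilonStop {
  type Point = Array[Double]

  // Euclidean distance between two points of equal dimension.
  def distance(a: Point, b: Point): Double =
    math.sqrt(a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum)

  // True when no center moved more than epsilon in the last iteration.
  def converged(old: Seq[Point], updated: Seq[Point], epsilon: Double): Boolean =
    old.zip(updated).forall { case (o, u) => distance(o, u) <= epsilon }

  // Applies `update` until the centers have converged or maxIterations is
  // reached, and returns the final centers with the number of iterations run.
  def run(initial: Seq[Point], maxIterations: Int, epsilon: Double)(
      update: Seq[Point] => Seq[Point]): (Seq[Point], Int) = {
    var centers = initial
    var iteration = 0
    var done = false
    while (iteration < maxIterations && !done) {
      val next = update(centers)
      done = converged(centers, next, epsilon)
      centers = next
      iteration += 1
    }
    (centers, iteration)
  }
}
```

With maxIterations = 10 and epsilon = 0.1, a run whose centers stop moving
by more than 0.1 ends at that iteration instead of using all ten.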

On Wed, May 14, 2014 at 8:35 PM, Stuti Awasthi  wrote:
> Hi All,
>
>
>
> Any ideas on this ??
>
>
>
> Thanks
>
> Stuti Awasthi
>
>
>
> From: Stuti Awasthi
> Sent: Wednesday, May 14, 2014 6:20 PM
> To: user@spark.apache.org
> Subject: Understanding epsilon in KMeans
>
>
>
> Hi All,
>
>
>
> I wanted to understand the functionality of epsilon in KMeans in Spark
> MLlib.
>
>
>
> As per documentation :
>
> distance threshold within which we consider centers to have converged. If
> all centers move less than this Euclidean distance, we stop iterating one
> run.
>
>
>
> Now I have assumed that if centers are moving less than epsilon value then
> Clustering Stops but then what does it mean by “we stop iterating one run”..
>
> Now suppose I have given maxIterations=10 and epsilon = 0.1, and assume that
> after only 2 iterations the epsilon condition is met, i.e. the centers are
> now moving less than 0.1.
>
>
>
> Now what happens ?? The whole 10 iterations are completed OR the Clustering
> stops ??
>
>
>
> My 2nd query is in Mahout, there is a configuration param : “Convergence
> Threshold (cd)”   which states : “in an iteration, the centroids don’t move
> more than this distance, no further iterations are done and clustering
> stops.”
>
>
>
> So is epsilon and cd similar ??
>
>
>
> 3rd query :
>
> How to pass epsilon as a configurable param. KMeans.train() does not provide
> the way but in code I can see “setEpsilon” as method. SO if I want to pass
> the parameter as epsilon=0.1 , how may I do that..
>
>
>
> Pardon my ignorance
>
>
>
> Thanks
>
> Stuti Awasthi
>
>
>
>
>
>
>
> ::DISCLAIMER::
> 
>
> The contents of this e-mail and any attachment(s) are confidential and
> intended for the named recipient(s) only.
> E-mail transmission is not guaranteed to be secure or error-free as
> information could be intercepted, corrupted,
> lost, destroyed, arrive late or incomplete, or may contain viruses in
> transmission. The e mail and its contents
> (with or without referred errors) shall therefore not attach any liability
> on the originator or HCL or its affiliates.
> Views or opinions, if any, presented in this email are solely those of the
> author and may not necessarily reflect the
> views or opinions of HCL or its affiliates. Any form of reproduction,
> dissemination, copying, disclosure, modification,
> distribution and / or publication of this message without the prior written
> consent of authorized representative of
> HCL is strictly prohibited. If you have received this email in error please
> delete it and notify the sender immediately.
> Before opening any email and/or attachments, please check them for viruses
> and other defects.
>
> 


Re: How to read a multipart s3 file?

2014-05-16 Thread Nicholas Chammas
On Wed, May 7, 2014 at 4:44 PM, Aaron Davidson  wrote:

> Spark can only run as many tasks as there are partitions, so if you don't
> have enough partitions, your cluster will be underutilized.

 This is a very important point.

kamatsuoka, how many partitions does your RDD have when you try to save it?
You can check this with myrdd._jrdd.splits().size() in PySpark. If it’s
less than the number of cores in your cluster, try repartition()-ing the
RDD as Aaron suggested.

Nick


Variables outside of mapPartitions scope

2014-05-16 Thread pedro
I am working on some code which uses mapPartitions. It's working great, except
when I attempt to use a variable within the function passed to mapPartitions
which references something outside of the scope (for example, a variable
declared immediately before the mapPartitions call). When this happens, I
get a task not serializable error. I wanted to reference a variable which
had been broadcast and was ready to use within that closure.

Seeing that, I attempted another solution: storing the broadcast variable
within an object (a singleton). It serialized fine, but when I
ran it on a cluster, any reference to it got a null pointer exception. My
presumption is that the workers were not getting their objects updated for
some reason, despite setting it as a broadcast variable. My guess is that
the workers get the serialized function, but Spark doesn't know to serialize
the object, including the things it references. Thus the copied reference
becomes invalid.

What would be a good way to solve my problem? Is there a way to reference a
broadcast variable by name rather than through a variable?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Variables-outside-of-mapPartitions-scope-tp5517.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


RE: same log4j slf4j error in spark 9.1

2014-05-16 Thread Adrian Mocanu
Hi guys,
This has been solved. These emails are from last week when the mailing list 
didn’t work.

From: Tathagata Das [mailto:tathagata.das1...@gmail.com]
Sent: May-15-14 4:50 PM
To: user@spark.apache.org
Cc: u...@spark.incubator.apache.org
Subject: Re: same log4j slf4j error in spark 9.1

Spark 0.9.1 does not depend on log4j-over-slf4j (here is the SBT file for 
0.9.1). Are you sure that no other dependency in your project is bringing 
that dependency onto the classpath? Alternatively, if you don't want 
slf4j-log4j12 from Spark, you can safely exclude it in the dependencies.

TD

On Thu, May 8, 2014 at 12:56 PM, Adrian Mocanu <amoc...@verticalscope.com> wrote:
I recall someone from the Spark team (TD?) saying that Spark 0.9.1 will change 
the logger and the circular loop error between slf4j and log4j wouldn’t show up.

Yet on Spark 0.9.1 I still get
SLF4J: Detected both log4j-over-slf4j.jar AND slf4j-log4j12.jar on the class 
path, preempting StackOverflowError.
SLF4J: See also http://www.slf4j.org/codes.html#log4jDelegationLoop for more 
details.

Any solutions?

-Adrian




Doubts regarding Shark

2014-05-16 Thread vinay Bajaj
Hello

I have a few questions regarding Shark.

1) I have a table of 60 GB and a total memory of 50 GB, but when I try
to cache the table it gets cached successfully. How does Shark cache the
table when there is not enough memory to hold it? And how do the cache
eviction policies (FIFO and LRU) work while caching the table? While
creating tables I am using the cache type property MEMORY (storage level:
memory and disk).

2) Sometimes while running queries I get a Java OutOfMemory exception even
though all tables are cached successfully. Can you tell me the cases, or
give an example, in which that error can occur?

Regards
Vinay Bajaj


Re: KryoSerializer Exception

2014-05-16 Thread Andrea Esposito
UP, doesn't anyone know something about it? ^^


2014-05-06 12:05 GMT+02:00 Andrea Esposito :

> Hi there,
>
> sorry if i'm posting a lot lately.
>
> i'm trying to add the KryoSerializer but i receive this exception:
> 2014-05-06 11:45:23 WARN TaskSetManager:62 - Loss was due to
> java.io.EOFException
> java.io.EOFException
> at
> org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:105)
> at org.apache.spark.broadcast.HttpBroadcast$.read(HttpBroadcast.scala:165)
> at
> org.apache.spark.broadcast.HttpBroadcast.readObject(HttpBroadcast.scala:56)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
>
> I set the serializer as:
> System.setProperty("spark.serializer",
> "org.apache.spark.serializer.KryoSerializer")
> System.setProperty("spark.kryo.registrator", "test.TestKryoRegistrator")
>
> With or without registering my custom registrator, it throws the exception.
>
> Seems something related to broadcast.. but isn't Kryo already ok out of
> the box just setting it as default serializer?
>


Re: is Mesos falling out of favor?

2014-05-16 Thread Gerard Maas
Regarding docs, Andrew Ash recently made a great effort to refresh the
Spark on Mesos documentation.
https://github.com/apache/spark/pull/756

It will become part of 1.0

-kr, Gerard.





On Fri, May 9, 2014 at 3:46 PM, Tim St Clair  wrote:

>
>
>
>
> - Original Message -
> > From: "deric" 
> > To: u...@spark.incubator.apache.org
> > Sent: Tuesday, May 6, 2014 11:42:58 AM
> > Subject: Re: is Mesos falling out of favor?
>
> Nope.
>
> >
> > I guess it's due to missing documentation and quite complicated setup.
> > Continuous integration would be nice!
>
> The setup is pretty simple, but stack integration tests are certainly
> missing, and Spark POM's have been out of date for some time.  There are
> JIRAs in both projects to clean up integration and update.
>
> >
> > Btw. is it possible to use spark as a shared library and not to fetch
> > spark tarball for each task?
>
> It's really easy to edit Spark's Mesos scheduler and executor to do what
> you want, e.g. run local binaries, etc.
>
> >
> > Do you point SPARK_EXECUTOR_URI to HDFS url?
>
> Typically yes, but again it's pretty easy to edit the scheduler and
> executor to do what you want.
>
> >
> >
> >
> > --
> > View this message in context:
> >
> http://apache-spark-user-list.1001560.n3.nabble.com/is-Mesos-falling-out-of-favor-tp5444p5448.html
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >
>
> --
> Cheers,
> Tim
> Freedom, Features, Friends, First -> Fedora
> https://fedoraproject.org/wiki/SIGs/bigdata
>


Re: Distribute jar dependencies via sc.AddJar(fileName)

2014-05-16 Thread DB Tsai
Hi guys,

I think it may be a bug in Spark. I wrote some code to demonstrate the bug.

Example 1) This is how Spark adds jars. Basically, add jars to
customURLClassLoader.

https://github.com/dbtsai/classloader-experiement/blob/master/calling/src/main/java/Calling1.java

It doesn't work for two reasons. a) We don't pass the
customURLClassLoader to the task, so it's only available in
Executor.scala.  b) Even if we do so, we need to get the class by
loader.loadClass("Class Name").newInstance(), and get the Method by
getDeclaredMethod to run it.


Example 2) It works by getting the class using loadClass API, and then
get and run the Method by getDeclaredMethod. Since we don't know which
classes users will use, it's not a solution.

https://github.com/dbtsai/classloader-experiement/blob/master/calling/src/main/java/Calling2.java


Example 3) Add jars to systemClassLoader and have them accessible in
JVM. Users can use the classes directly.

https://github.com/dbtsai/classloader-experiement/blob/master/calling/src/main/java/Calling3.java

I'm now porting example 3) to Spark, and will let you know if it works.
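
The reflective path from examples 1) and 2) can be sketched in Scala as
below. This is a minimal sketch and not the code from the linked
repository: ReflectiveCall and call are hypothetical names, and
getDeclaredConstructor().newInstance() is used here in place of the
Class.newInstance() call mentioned above.

```scala
object ReflectiveCall {
  // Loads `className` through the given loader, creates an instance with the
  // no-argument constructor, and invokes the no-argument method `methodName`.
  def call(loader: ClassLoader, className: String, methodName: String): AnyRef = {
    val cls = loader.loadClass(className)
    val instance = cls.getDeclaredConstructor().newInstance().asInstanceOf[AnyRef]
    val method = cls.getDeclaredMethod(methodName)
    method.invoke(instance)
  }
}
```

Code that names a class directly never goes through this path; it is
resolved by the loader that defined the calling class, so a class visible
only through a separate URLClassLoader is reachable by reflection alone.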

Thanks.

Sincerely,

DB Tsai
---
My Blog: https://www.dbtsai.com
LinkedIn: https://www.linkedin.com/in/dbtsai


On Thu, May 15, 2014 at 12:03 PM, DB Tsai  wrote:
> Hi Xiangrui,
>
> We're still using Spark 0.9 branch, and our job is submitted by
>
> ./bin/spark-class org.apache.spark.deploy.yarn.Client \
>   --jar  \
>   --class  \
>   --args  \
>   --num-workers  \
>   --master-class 
>   --master-memory  \
>   --worker-memory  \
>   --addJars 
>
>
> Based on my understanding of the code in yarn-standalone mode, the jar
> distributing from local machine to application master is through distributed
> cache (using hadoop yarn-client api). From application master to executors,
> it's through http server. I may be wrong, but if you look at the code in
> SparkContext addJar method, you can see the jar is added to http server in
> yarn-standalone mode.
>
>     if (SparkHadoopUtil.get.isYarnMode() && master == "yarn-standalone") {
>       // In order for this to work in yarn standalone mode the user must specify the
>       // --addjars option to the client to upload the file into the distributed cache
>       // of the AM to make it show up in the current working directory.
>       val fileName = new Path(uri.getPath).getName()
>       try {
>         env.httpFileServer.addJar(new File(fileName))
>       } catch {
>
> Those jars will be fetched in Executor from http server and added to
> classloader of "Executor" class, see
>
>   private def updateDependencies(newFiles: HashMap[String, Long], newJars: HashMap[String, Long]) {
>     synchronized {
>       // Fetch missing dependencies
>       for ((name, timestamp) <- newFiles if currentFiles.getOrElse(name, -1L) < timestamp) {
>         logInfo("Fetching " + name + " with timestamp " + timestamp)
>         Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf)
>         currentFiles(name) = timestamp
>       }
>       for ((name, timestamp) <- newJars if currentJars.getOrElse(name, -1L) < timestamp) {
>         logInfo("Fetching " + name + " with timestamp " + timestamp)
>         Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf)
>         currentJars(name) = timestamp
>         // Add it to our class loader
>         val localName = name.split("/").last
>         val url = new File(SparkFiles.getRootDirectory, localName).toURI.toURL
>
>         if (!urlClassLoader.getURLs.contains(url)) {
>           urlClassLoader.addURL(url)
>         }
>       }
>
>
> The problem seems to be that jars are added to the classloader of "Executor"
> classes, and they are not accessible in Task.scala.
>
> I verified this by trying to load our custom classes in Executor.scala, and
> it works. But if I try to load those classes in Task.scala, I get a
> ClassNotFoundException.
>
> Thanks.
>
>
>
>
>
> Sincerely,
>
> DB Tsai
> ---
> My Blog: https://www.dbtsai.com
> LinkedIn: https://www.linkedin.com/in/dbtsai
>
>
> On Wed, May 14, 2014 at 6:04 PM, Xiangrui Meng  wrote:
>>
>> In SparkContext#addJar, for yarn-standalone mode, the workers should
>> get the jars from local distributed cache instead of fetching them
>> from the http server. Could you send the command you used to submit
>> the job? -Xiangrui
>>
>> On Wed, May 14, 2014 at 1:26 AM, DB Tsai  wrote:
>> > Hi Xiangrui,
>> >
>> > I actually used `yarn-standalone`, sorry for misleading. I did debugging
>> > in
>> > the last couple days, and everything up to updateDependency in
>> > executor.scala works. I also checked the file size and md5sum in the
>> > executors, and they are the same as the one in driver. Gonna do more
>> > testing
>> > tomorrow.
>> >
>> > Thanks.
>> >
>> >
>> > Sincerely,
>> >
>> > DB Tsai
>> > 

writing my own RDD

2014-05-16 Thread Koert Kuipers
in writing my own RDD i ran into a few issues with respect to stuff being
private in spark.

in compute i would like to return an iterator that respects task killing
(as HadoopRDD does), but the mechanics for that are inside the private
InterruptibleIterator. also the exception i am supposed to throw
(TaskKilledException) is private to spark.
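
One possible workaround is to wrap the iterator yourself. The sketch below
is an assumption-laden illustration, not Spark's InterruptibleIterator: the
class name KillAwareIterator and the isKilled callback are hypothetical,
and a plain InterruptedException stands in for the private
TaskKilledException.

```scala
// Wraps an iterator and refuses to continue once `isKilled` reports true.
class KillAwareIterator[T](underlying: Iterator[T], isKilled: () => Boolean)
    extends Iterator[T] {

  // The kill flag is checked before each element is handed out.
  def hasNext: Boolean = {
    if (isKilled()) throw new InterruptedException("task was killed")
    underlying.hasNext
  }

  def next(): T = underlying.next()
}
```

How isKilled gets wired to the running task is left open here, since that
depends on what the public API of the Spark version in use exposes.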


Re: How to run the SVM and LogisticRegression

2014-05-16 Thread Debasish Das
There are examples to run them in BinaryClassification.scala in
org.apache.spark.examples...



On Wed, May 14, 2014 at 1:36 PM, yxzhao  wrote:

>
> Hello,
> I found the classification algorithms SVM and LogisticRegression implemented
> in the following directory. How do I run them? What should the command line
> be? Thanks.
>
> spark-0.9.0-incubating/mllib/src/main/scala/org/apache/spark/mllib/classification
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-run-the-SVM-and-LogisticRegression-tp5720.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Re: Counting things only once

2014-05-16 Thread Mark Hamstra
Better, the current location:
https://issues.apache.org/jira/browse/SPARK-732


On Fri, May 16, 2014 at 1:47 PM, Mark Hamstra wrote:

> https://spark-project.atlassian.net/browse/SPARK-732
>
>
> On Fri, May 16, 2014 at 9:05 AM, Daniel Siegmann wrote:
>
>> I want to use accumulators to keep counts of things like invalid lines
>> found and such, for reporting purposes. Similar to Hadoop counters. This
>> may seem simple, but my case is a bit more complicated. The code which is
>> creating an RDD from a transform is separated from the code which performs
>> the operation on that RDD - or operations (I can't make any assumption as
>> to how many operations will be done on this RDD). There are two issues: (1)
>> I want to retrieve the accumulator value only after it has been computed,
>> and (2) I don't want to count the same thing twice if the RDD is recomputed.
>>
>> Here's a simple example, converting strings to integers. Any records
>> which can't be parsed as an integer are dropped, but I want to count how
>> many times that happens:
>>
>> def numbers(input: RDD[String]): RDD[Int] = {
>>   val invalidRecords = sc.accumulator(0)
>>   input.flatMap { record =>
>>     try {
>>       Seq(record.toInt)
>>     } catch {
>>       case _: NumberFormatException => invalidRecords += 1; Seq()
>>     }
>>   }
>> }
>>
>> I need some way to know when the result RDD has been computed so I can
>> get the accumulator value and reset it. Or perhaps it would be better to
>> say I need a way to ensure the accumulator value is computed exactly once
>> for a given RDD. Anyone know a way to do this? Or anything I might look
>> into? Or is this something that just isn't supported in Spark?
>>
>> --
>> Daniel Siegmann, Software Developer
>> Velos
>> Accelerating Machine Learning
>>
>> 440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
>> E: daniel.siegm...@velos.io W: www.velos.io
>>  
>>
>
>


Passing runtime config to workers?

2014-05-16 Thread Robert James
What is a good way to pass config variables to workers?

I've tried setting them in environment variables via spark-env.sh, but, as
far as I can tell, the environment variables set there don't appear in
workers' environments.  If I want to be able to configure all workers,
what's a good way to do it?  For example, I want to tell all workers:
USE_ALGO_A or USE_ALGO_B - but I don't want to recompile.


Re: Distribute jar dependencies via sc.AddJar(fileName)

2014-05-16 Thread Robert James
I've experienced the same bug, which I had to work around manually.  I
posted the details here:
http://stackoverflow.com/questions/23687081/spark-workers-unable-to-find-jar-on-ec2-cluster

On 5/15/14, DB Tsai  wrote:
> Hi guys,
>
> I think it may be a bug in Spark. I wrote some code to demonstrate the bug.
>
> Example 1) This is how Spark adds jars. Basically, add jars to
> customURLClassLoader.
>
> https://github.com/dbtsai/classloader-experiement/blob/master/calling/src/main/java/Calling1.java
>
> It doesn't work for two reasons. a) We don't pass the
> customURLClassLoader to the task, so it's only available in
> Executor.scala.  b) Even if we do so, we need to get the class by
> loader.loadClass("Class Name").newInstance(), and get the Method by
> getDeclaredMethod to run it.
>
>
> Example 2) It works by getting the class using loadClass API, and then
> get and run the Method by getDeclaredMethod. Since we don't know which
> classes users will use, it's not a solution.
>
> https://github.com/dbtsai/classloader-experiement/blob/master/calling/src/main/java/Calling2.java
>
>
> Example 3) Add jars to systemClassLoader and have them accessible in
> JVM. Users can use the classes directly.
>
> https://github.com/dbtsai/classloader-experiement/blob/master/calling/src/main/java/Calling3.java
>
> I'm now porting example 3) to Spark, and will let you know if it works.
>
> Thanks.
>
> Sincerely,
>
> DB Tsai
> ---
> My Blog: https://www.dbtsai.com
> LinkedIn: https://www.linkedin.com/in/dbtsai
>
>
> On Thu, May 15, 2014 at 12:03 PM, DB Tsai  wrote:
>> Hi Xiangrui,
>>
>> We're still using Spark 0.9 branch, and our job is submitted by
>>
>> ./bin/spark-class org.apache.spark.deploy.yarn.Client \
>>   --jar  \
>>   --class  \
>>   --args  \
>>   --num-workers  \
>>   --master-class 
>>   --master-memory  \
>>   --worker-memory  \
>>   --addJars 
>>
>>
>> Based on my understanding of the code in yarn-standalone mode, the jar
>> distributing from local machine to application master is through
>> distributed
>> cache (using hadoop yarn-client api). From application master to
>> executors,
>> it's through http server. I maybe wrong, but if you look at the code in
>> SparkContext addJar method, you can see the jar is added to http server
>> in
>> yarn-standalone mode.
>>
>> if (SparkHadoopUtil.get.isYarnMode() && master ==
>> "yarn-standalone") {
>>   // In order for this to work in yarn standalone mode the
>> user
>> must specify the
>>   // --addjars option to the client to upload the file into
>> the
>> distributed cache
>>   // of the AM to make it show up in the current working
>> directory.
>>   val fileName = new Path(uri.getPath).getName()
>>   try {
>> env.httpFileServer.addJar(new File(fileName))
>>   } catch {
>>
>> Those jars will be fetched in Executor from http server and added to
>> classloader of "Executor" class, see
>>
>>   private def updateDependencies(newFiles: HashMap[String, Long],
>> newJars:
>> HashMap[String, Long]) {
>> synchronized {
>>   // Fetch missing dependencies
>>   for ((name, timestamp) <- newFiles if currentFiles.getOrElse(name,
>> -1L) < timestamp) {
>> logInfo("Fetching " + name + " with timestamp " + timestamp)
>> Utils.fetchFile(name, new File(SparkFiles.getRootDirectory),
>> conf)
>> currentFiles(name) = timestamp
>>   }
>>   for ((name, timestamp) <- newJars if currentJars.getOrElse(name,
>> -1L)
>> < timestamp) {
>> logInfo("Fetching " + name + " with timestamp " + timestamp)
>> Utils.fetchFile(name, new File(SparkFiles.getRootDirectory),
>> conf)
>> currentJars(name) = timestamp
>> // Add it to our class loader
>> val localName = name.split("/").last
>> val url = new File(SparkFiles.getRootDirectory,
>> localName).toURI.toURL
>>
>> if (!urlClassLoader.getURLs.contains(url)) {
>>   urlClassLoader.addURL(url)
>> }
>>   }
>>
>>
>> The problem seems to be that jars are added to the classloader of
>> "Executor"
>> classes, and they are not accessible in Task.scala.
>>
>> I verified this by trying to load our custom classes in Executor.scala,
>> and
>> it works. But if I tried to load those classes in Task.scala, I'll get
>> classNotFound exception.
>>
>> Thanks.
>>
>>
>>
>>
>>
>> Sincerely,
>>
>> DB Tsai
>> ---
>> My Blog: https://www.dbtsai.com
>> LinkedIn: https://www.linkedin.com/in/dbtsai
>>
>>
>> On Wed, May 14, 2014 at 6:04 PM, Xiangrui Meng  wrote:
>>>
>>> In SparkContext#addJar, for yarn-standalone mode, the workers should
>>> get the jars from local distributed cache instead of fetching them
>>> from the http server. Could you send the command you used to submit
>>> the job? -Xiangrui
>>>
>>> On Wed, May 14, 2014 at 1:26 AM, DB Tsai  wrote:
>>> > Hi Xiangrui,
>

Re: Counting things only once

2014-05-16 Thread Mark Hamstra
https://spark-project.atlassian.net/browse/SPARK-732


On Fri, May 16, 2014 at 9:05 AM, Daniel Siegmann
wrote:

> I want to use accumulators to keep counts of things like invalid lines
> found and such, for reporting purposes. Similar to Hadoop counters. This
> may seem simple, but my case is a bit more complicated. The code which is
> creating an RDD from a transform is separated from the code which performs
> the operation on that RDD - or operations (I can't make any assumption as
> to how many operations will be done on this RDD). There are two issues: (1)
> I want to retrieve the accumulator value only after it has been computed,
> and (2) I don't want to count the same thing twice if the RDD is recomputed.
>
> Here's a simple example, converting strings to integers. Any records which
> can't be parsed as an integer are dropped, but I want to count how many
> times that happens:
>
> def numbers(input: RDD[String]): RDD[Int] = {
>   val invalidRecords = sc.accumulator(0)
>   input.flatMap { record =>
>     try {
>       Seq(record.toInt)
>     } catch {
>       case _: NumberFormatException => invalidRecords += 1; Seq()
>     }
>   }
> }
>
> I need some way to know when the result RDD has been computed so I can get
> the accumulator value and reset it. Or perhaps it would be better to say I
> need a way to ensure the accumulator value is computed exactly once for a
> given RDD. Anyone know a way to do this? Or anything I might look into? Or
> is this something that just isn't supported in Spark?
>
> --
> Daniel Siegmann, Software Developer
> Velos
> Accelerating Machine Learning
>
> 440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
> E: daniel.siegm...@velos.io W: www.velos.io
>  
>


What is the difference between a Spark Worker and a Spark Slave?

2014-05-16 Thread Robert James
What is the difference between a Spark Worker and a Spark Slave?


Nested method in a class: Task not serializable?

2014-05-16 Thread Pierre B
Hi!

I understand the usual "Task not serializable" issue that arises when
accessing a field or a method that is out of scope of a closure.

To fix it, I usually define a local copy of these fields/methods, which
avoids the need to serialize the whole class:

class MyClass(val myField: Any) {
  def run() = {
    val f = sc.textFile("hdfs://xxx.xxx.xxx.xxx/file.csv")

    val myField = this.myField
    println(f.map( _ + myField ).count)
  }
}

===

Now, if I define a nested function in the run method, it cannot be
serialized:
class MyClass() {
  def run() = {
    val f = sc.textFile("hdfs://xxx.xxx.xxx.xxx/file.csv")

    def mapFn(line: String) = line.split(";")

    println(f.map( mapFn( _ ) ).count)
  }
}

I don't understand since I thought "mapFn" would be in scope...
Even stranger, if I define mapFn to be a val instead of a def, then it
works:

class MyClass() {
  def run() = {
    val f = sc.textFile("hdfs://xxx.xxx.xxx.xxx/file.csv")

    val mapFn = (line: String) => line.split(";")

    println(f.map( mapFn( _ ) ).count)
  }
}

Is this related to the way Scala represents nested functions?

What's the recommended way to deal with this issue ?
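
A likely explanation, offered as an assumption rather than a confirmed
diagnosis, is that the compiler turns the nested def into a method of the
enclosing class, so a closure calling it has to carry `this` along, while a
function val is a standalone object. The plain-Scala sketch below shows the
same effect with an ordinary instance method. The names ClosureCheck,
LineFns and Job are hypothetical, and Job is deliberately not Serializable.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object ClosureCheck {
  // Returns true if Java serialization accepts the object.
  def isSerializable(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }
}

// A function kept in a standalone object does not reference any instance.
object LineFns {
  val splitFields: String => Array[String] = line => line.split(";")
}

// Stand-in for MyClass above.
class Job {
  def splitFields(line: String): Array[String] = line.split(";")

  // Calls an instance method, so the closure holds a reference to `this`.
  def closureOverThis: String => Array[String] = line => splitFields(line)

  // Refers only to the standalone object, so nothing from Job is captured.
  def closureOverObject: String => Array[String] = LineFns.splitFields
}
```

Keeping helper functions in a standalone object, or defining them as
function vals as in the last example above, are two ways to keep the
enclosing instance out of the closure.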

Thanks for your help,

Pierre



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Nested-method-in-a-class-Task-not-serializable-tp5869.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Understanding epsilon in KMeans

2014-05-16 Thread Long Pham
Stuti,

I'm answering your questions in order:

1. From MLlib
https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala#L159
you can see that clustering stops when we have reached *maxIterations* or
there are no more *activeRuns*.

KMeans is executed *runs* times in parallel, and the best clustering found
over all *runs* is returned. For each run, the algorithm will stop if:
- the number of iterations reaches *maxIterations*, or
- every cluster center moved less than *epsilon* in the last iteration.

2. I can't find the Mahout source code that refers to the "Convergence
Threshold", but I suspect the threshold and MLlib's *epsilon* are the same
concept. There is no concept of parallel runs in Mahout.

Ref: https://mahout.apache.org/users/clustering/k-means-clustering.html

3. To set MLlib's KMeans to have an *epsilon* of 0.1 and then train the model,
you can do the following:

new KMeans().setK(k).setMaxIterations(maxIterations).setRuns(runs)
  .setInitializationMode(initializationMode)
  .setEpsilon(0.1).run(data)

Enjoy,

Long Pham
Software Engineer at Adatao, Inc.
longp...@adatao.com
On May 15, 2014 7:29 PM, "Stuti Awasthi"  wrote:

>  Hi All,
>
>
>
> Any ideas on this ??
>
>
>
> Thanks
>
> Stuti Awasthi
>
>
>
> *From:* Stuti Awasthi
> *Sent:* Wednesday, May 14, 2014 6:20 PM
> *To:* user@spark.apache.org
> *Subject:* Understanding epsilon in KMeans
>
>
>
> Hi All,
>
>
>
> I wanted to understand the functionality of epsilon in KMeans in Spark
> MLlib.
>
>
>
> As per documentation :
>
> distance threshold within which we consider centers to have
> converged. If all centers move less than this *Euclidean* distance, we
> stop iterating one run.
>
>
>
> Now I have assumed that if centers are moving less than epsilon value then
> Clustering Stops but then what does it mean by “we stop iterating one run”..
>
>
> Now suppose I have given maxIterations=10 and epsilon = 0.1, and assume
> that after only 2 iterations the epsilon condition is met, i.e. the
> centers are now moving less than 0.1.
>
>
>
> Now what happens ?? The whole 10 iterations are completed OR the
> Clustering stops ??
>
>
>
> My 2nd query is in Mahout, there is a configuration param : “Convergence
> Threshold (cd)”   which states : “in an iteration, the centroids don’t move
> more than this distance, no further iterations are done and clustering
> stops.”
>
>
>
> So is epsilon and cd similar ??
>
>
>
> 3rd query :
>
> How do I pass epsilon as a configurable param? KMeans.train() does not
> provide a way, but in the code I can see “setEpsilon” as a method. So if I
> want to pass the parameter as epsilon=0.1, how may I do that?
>
>
>
> Pardon my ignorance
>
>
>
> Thanks
>
> Stuti Awasthi
>


Re: cant get tests to pass anymore on master master

2014-05-16 Thread Koert Kuipers
yeah sure. it is ubuntu 12.04 with jdk1.7.0_40
what else is relevant that i can provide?


On Thu, May 15, 2014 at 12:17 PM, Sean Owen  wrote:

> FWIW I see no failures. Maybe you can say more about your environment, etc.
>
> On Wed, May 7, 2014 at 10:01 PM, Koert Kuipers  wrote:
> > i used to be able to get all tests to pass.
> >
> > with java 6 and sbt i get PermGen errors (no matter how high i make the
> > PermGen). so i have given up on that.
> >
> > with java 7 i see 1 error in a bagel test and a few in streaming tests.
> any
> > ideas? see the error in BagelSuite below.
> >
> > [info] - large number of iterations *** FAILED *** (10 seconds, 105
> > milliseconds)
> > [info]   The code passed to failAfter did not complete within 10 seconds.
> > (BagelSuite.scala:85)
> > [info]   org.scalatest.exceptions.TestFailedDueToTimeoutException:
> > [info]   at
> >
> org.scalatest.concurrent.Timeouts$$anonfun$failAfter$1.apply(Timeouts.scala:250)
> > [info]   at
> >
> org.scalatest.concurrent.Timeouts$$anonfun$failAfter$1.apply(Timeouts.scala:250)
> > [info]   at
> > org.scalatest.concurrent.Timeouts$class.timeoutAfter(Timeouts.scala:282)
> > [info]   at
> > org.scalatest.concurrent.Timeouts$class.failAfter(Timeouts.scala:246)
> > [info]   at
> org.apache.spark.bagel.BagelSuite.failAfter(BagelSuite.scala:32)
> > [info]   at
> >
> org.apache.spark.bagel.BagelSuite$$anonfun$3.apply$mcV$sp(BagelSuite.scala:85)
> > [info]   at
> > org.apache.spark.bagel.BagelSuite$$anonfun$3.apply(BagelSuite.scala:85)
> > [info]   at
> > org.apache.spark.bagel.BagelSuite$$anonfun$3.apply(BagelSuite.scala:85)
> > [info]   at org.scalatest.FunSuite$$anon$1.apply(FunSuite.scala:1265)
> > [info]   at org.scalatest.Suite$class.withFixture(Suite.scala:1974)
> > [info]   at
> > org.apache.spark.bagel.BagelSuite.withFixture(BagelSuite.scala:32)
> >
>


java serialization errors with spark.files.userClassPathFirst=true

2014-05-16 Thread Koert Kuipers
when i set spark.files.userClassPathFirst=true, i get java serialization
errors in my tasks, see below. when i set userClassPathFirst back to its
default of false, the serialization errors are gone. my spark.serializer is
KryoSerializer.

the class org.apache.hadoop.fs.Path is in the spark assembly jar, but not
in my task jars (the ones i added to the SparkConf). so looks like the
ClosureSerializer is having trouble with this class once the
ChildExecutorURLClassLoader is used? thats me just guessing.

Exception in thread "main" org.apache.spark.SparkException: Job aborted due
to stage failure: Task 1.0:5 failed 4 times, most recent failure: Exception
failure in TID 31 on host node05.tresata.com:
java.lang.NoClassDefFoundError: org/apache/hadoop/fs/Path
java.lang.Class.getDeclaredConstructors0(Native Method)
java.lang.Class.privateGetDeclaredConstructors(Class.java:2398)
java.lang.Class.getDeclaredConstructors(Class.java:1838)

java.io.ObjectStreamClass.computeDefaultSUID(ObjectStreamClass.java:1697)
java.io.ObjectStreamClass.access$100(ObjectStreamClass.java:50)
java.io.ObjectStreamClass$1.run(ObjectStreamClass.java:203)
java.security.AccessController.doPrivileged(Native Method)

java.io.ObjectStreamClass.getSerialVersionUID(ObjectStreamClass.java:200)
java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:556)

java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1580)
java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1493)

java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1729)
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1326)

java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1950)

java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1874)

java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1756)
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1326)
java.io.ObjectInputStream.readObject(ObjectInputStream.java:348)
scala.collection.immutable.$colon$colon.readObject(List.scala:362)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)

sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
java.lang.reflect.Method.invoke(Method.java:597)

java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:969)

java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1852)

java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1756)
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1326)

java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1950)

java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1874)

java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1756)
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1326)
java.io.ObjectInputStream.readObject(ObjectInputStream.java:348)

org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:60)

org.apache.spark.scheduler.ShuffleMapTask$.deserializeInfo(ShuffleMapTask.scala:66)

org.apache.spark.scheduler.ShuffleMapTask.readExternal(ShuffleMapTask.scala:139)

java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1795)

java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1754)
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1326)
java.io.ObjectInputStream.readObject(ObjectInputStream.java:348)

org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:60)

org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:82)

org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:190)

java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)

java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
java.lang.Thread.run(Thread.java:662)


unsubscribe

2014-05-16 Thread eric perler

  

Re: Hadoop 2.3 Centralized Cache vs RDD

2014-05-16 Thread hequn cheng
I tried the centralized cache step by step, following the official Apache
Hadoop website, but it seems the centralized cache doesn't work.
See:
http://stackoverflow.com/questions/22293358/centralized-cache-failed-in-hadoop-2-3
Has anyone succeeded?


2014-05-15 5:30 GMT+08:00 William Kang :

> Hi,
> Any comments or thoughts on the implications of the newly released feature
> from Hadoop 2.3 on the centralized cache? How different it is from RDD?
>
> Many thanks.
>
>
> Cao
>


Re: java serialization errors with spark.files.userClassPathFirst=true

2014-05-16 Thread Koert Kuipers
well, i modified ChildExecutorURLClassLoader to also delegate to
parentClassloader if NoClassDefFoundError is thrown... now i get yet
another error. i am clearly missing something with these classloaders. such
nasty stuff... giving up for now. just going to have to not use
spark.files.userClassPathFirst=true for now, until i have more time to look
at this.
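
For readers following along, the delegation order under discussion can be
sketched in plain Scala. This is a hypothetical child-first loader, not
Spark's ChildExecutorURLClassLoader; the names ChildFirstLoader, userJars and
fallback are illustrative only. It may also hint at why errors like the one
below appear: the JVM treats a class loaded by two different loaders as two
distinct types, so mixing loaders for classes such as scala.Option can end in
a ClassCastException.

```scala
import java.net.{URL, URLClassLoader}

// Hypothetical child-first loader: user jars are consulted first, and the
// fallback loader is used only when the class (or one of its dependencies)
// cannot be found there.
class ChildFirstLoader(userJars: Array[URL], fallback: ClassLoader)
    extends ClassLoader(null: ClassLoader) { // null parent: only JDK bootstrap classes are inherited

  // Sees nothing but the user jars and the JDK bootstrap classes.
  private val userLoader = new URLClassLoader(userJars, null)

  override def findClass(name: String): Class[_] =
    try userLoader.loadClass(name)
    catch {
      case _: ClassNotFoundException | _: NoClassDefFoundError =>
        fallback.loadClass(name)
    }
}
```

In this sketch a class that exists in both places is served from the user
jars, while everything else comes from the fallback loader.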

14/05/16 13:58:59 ERROR Executor: Exception in task ID 3
java.lang.ClassCastException: cannot assign instance of scala.None$ to
field org.apache.spark.rdd.RDD.checkpointData of type scala.Option in
instance of MyRDD
at
java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2083)
at
java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1261)
at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1995)
at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1913)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348)
at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1913)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at
scala.collection.immutable.$colon$colon.readObject(List.scala:362)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1891)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348)
at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1913)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:60)



On Fri, May 16, 2014 at 1:46 PM, Koert Kuipers  wrote:

> after removing all class parameters of class Path from my code, i tried
> again. different but related error when i set
> spark.files.userClassPathFirst=true
>
> now i dont even use FileInputFormat directly. HadoopRDD does...
>
> 14/05/16 12:17:17 ERROR Executor: Exception in task ID 45
> java.lang.NoClassDefFoundError: org/apache/hadoop/mapred/FileInputFormat
> at java.lang.ClassLoader.defineClass1(Native Method)
> at java.lang.ClassLoader.defineClass(ClassLoader.java:792)
> at
> java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
> at java.net.URLClassLoader.defineClass(URLClassLoader.java:449)
> at java.net.URLClassLoader.access$100(URLClassLoader.java:71)
> at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
> at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
> at java.security.AccessController.doPrivileged(Native Method)
> at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
> at
> org.apache.spark.executor.ChildExecutorURLClassLoader$userClassLoader$.findClass(ExecutorURLClassLoader.scala:42)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> at
> org.apache.spark.executor.ChildExecutorURLClassLoader.findClass(ExecutorURLClassLoader.scala:51)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:270)
> at
> org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:57)
> at
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1610)
> at
> java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1515)
> at java.io.ObjectInputStream.readClass(ObjectInputStream.java:1481)
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1331)
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInp

Re: different in spark on yarn mode and standalone mode

2014-05-16 Thread Vipul Pandey
And I thought I sent it to the right list! Here you go again - Question below : 

On May 14, 2014, at 3:06 PM, Vipul Pandey  wrote:

> So here's a followup question : What's the preferred mode? 
> We have a new cluster coming up with petabytes of data and we intend to take 
> Spark to production. We are trying to figure out what mode would be safe and 
> stable for production like environment. 
> pros and cons? anyone? 
> 
> Any reasons why one would chose Standalone over YARN?
> 
> Thanks,
> Vipul





> 
> On May 4, 2014, at 5:56 PM, Liu, Raymond  wrote:
> 
>> In the core, they are not quite different
>> In standalone mode, you have spark master and spark worker who allocate 
>> driver and executors for your spark app.
>> While in Yarn mode, Yarn resource manager and node manager do this work.
>> When the driver and executors have been launched, the rest part of resource 
>> scheduling go through the same process, say between driver and executor 
>> through akka actor.
>> 
>> Best Regards,
>> Raymond Liu
>> 
>> 
>> -Original Message-
>> From: Sophia [mailto:sln-1...@163.com] 
>> 
>> Hey you guys,
>> What is the different in spark on yarn mode and standalone mode about 
>> resource schedule?
>> Wish you happy everyday.
>> 
>> 
>> 
>> --
>> View this message in context: 
>> http://apache-spark-user-list.1001560.n3.nabble.com/different-in-spark-on-yarn-mode-and-standalone-mode-tp5300.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 



Re: Real world

2014-05-16 Thread Bertrand Dechoux
http://spark-summit.org ?

Bertrand


On Thu, May 8, 2014 at 2:05 AM, Ian Ferreira wrote:

> Folks,
>
> I keep getting questioned on real world experience of Spark as in mission
> critical production deployments. Does anyone have some war stories to share
> or know of resources to review?
>
> Cheers
> - Ian
>


Re: Distribute jar dependencies via sc.AddJar(fileName)

2014-05-16 Thread DB Tsai
The jars are actually there (and in classpath), but you need to load
through reflection. I've another thread giving the workaround.
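
As a rough illustration of what "load through reflection" means here, the
following is a minimal sketch in plain Scala. ReflectiveCall and invokeNoArg
are hypothetical names, and java.util.ArrayList merely stands in for a class
that would come from an added jar.

```scala
object ReflectiveCall {
  /** Instantiates `className` through `loader` and invokes a no-argument method on it. */
  def invokeNoArg(loader: ClassLoader, className: String, methodName: String): AnyRef = {
    // Look the class up by name in the given loader instead of referring to it statically.
    val clazz = loader.loadClass(className)
    val instance = clazz.getDeclaredConstructor().newInstance().asInstanceOf[AnyRef]
    clazz.getDeclaredMethod(methodName).invoke(instance)
  }

  def main(args: Array[String]): Unit = {
    // Stand-in for a loader that knows about the added jars.
    val loader = ClassLoader.getSystemClassLoader
    println(invokeNoArg(loader, "java.util.ArrayList", "size"))
  }
}
```

The drawback is the one noted in the quoted message: user code has to know
the class and method names as strings, so this is a workaround rather than a
general fix.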


Sincerely,

DB Tsai
---
My Blog: https://www.dbtsai.com
LinkedIn: https://www.linkedin.com/in/dbtsai


On Fri, May 16, 2014 at 1:37 PM, Robert James wrote:

> I've experienced the same bug, which I had to workaround manually.  I
> posted the details here:
>
> http://stackoverflow.com/questions/23687081/spark-workers-unable-to-find-jar-on-ec2-cluster
>
> On 5/15/14, DB Tsai  wrote:
> > Hi guys,
> >
> > I think it may be a bug in Spark. I wrote some code to demonstrate the
> > bug.
> >
> > Example 1) This is how Spark adds jars. Basically, add jars to
> > customURLClassLoader.
> >
> >
> https://github.com/dbtsai/classloader-experiement/blob/master/calling/src/main/java/Calling1.java
> >
> > It doesn't work for two reasons. a) We don't pass the
> > customURLClassLoader to the task, so it's only available in
> > Executor.scala. b) Even if we do so, we need to get the class by
> > loader.loadClass("Class Name").newInstance(), and get the Method by
> > getDeclaredMethod to run it.
> >
> >
> > Example 2) It works by getting the class using loadClass API, and then
> > get and run the Method by getDeclaredMethod. Since we don't know which
> > classes users will use, it's not a solution.
> >
> >
> https://github.com/dbtsai/classloader-experiement/blob/master/calling/src/main/java/Calling2.java
> >
> >
> > Example 3) Add jars to systemClassLoader and have them accessible in
> > JVM. Users can use the classes directly.
> >
> >
> https://github.com/dbtsai/classloader-experiement/blob/master/calling/src/main/java/Calling3.java
> >
> > I'm now porting example 3) to Spark, and will let you know if it works.
> >
> > Thanks.
> >
> > Sincerely,
> >
> > DB Tsai
> > ---
> > My Blog: https://www.dbtsai.com
> > LinkedIn: https://www.linkedin.com/in/dbtsai
> >
> >
> > On Thu, May 15, 2014 at 12:03 PM, DB Tsai  wrote:
> >> Hi Xiangrui,
> >>
> >> We're still using Spark 0.9 branch, and our job is submitted by
> >>
> >> ./bin/spark-class org.apache.spark.deploy.yarn.Client \
> >>   --jar  \
> >>   --class  \
> >>   --args  \
> >>   --num-workers  \
> >>   --master-class 
> >>   --master-memory  \
> >>   --worker-memory  \
> >>   --addJars 
> >>
> >>
> >> Based on my understanding of the code in yarn-standalone mode, the jar
> >> distributing from local machine to application master is through
> >> distributed
> >> cache (using hadoop yarn-client api). From application master to
> >> executors,
> >> it's through http server. I maybe wrong, but if you look at the code in
> >> SparkContext addJar method, you can see the jar is added to http server
> >> in
> >> yarn-standalone mode.
> >>
> >>     if (SparkHadoopUtil.get.isYarnMode() && master == "yarn-standalone") {
> >>       // In order for this to work in yarn standalone mode the user must specify the
> >>       // --addjars option to the client to upload the file into the distributed cache
> >>       // of the AM to make it show up in the current working directory.
> >>       val fileName = new Path(uri.getPath).getName()
> >>       try {
> >>         env.httpFileServer.addJar(new File(fileName))
> >>       } catch {
> >>
> >> Those jars will be fetched in Executor from http server and added to
> >> classloader of "Executor" class, see
> >>
> >>   private def updateDependencies(newFiles: HashMap[String, Long], newJars: HashMap[String, Long]) {
> >>     synchronized {
> >>       // Fetch missing dependencies
> >>       for ((name, timestamp) <- newFiles if currentFiles.getOrElse(name, -1L) < timestamp) {
> >>         logInfo("Fetching " + name + " with timestamp " + timestamp)
> >>         Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf)
> >>         currentFiles(name) = timestamp
> >>       }
> >>       for ((name, timestamp) <- newJars if currentJars.getOrElse(name, -1L) < timestamp) {
> >>         logInfo("Fetching " + name + " with timestamp " + timestamp)
> >>         Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf)
> >>         currentJars(name) = timestamp
> >>         // Add it to our class loader
> >>         val localName = name.split("/").last
> >>         val url = new File(SparkFiles.getRootDirectory, localName).toURI.toURL
> >>
> >>         if (!urlClassLoader.getURLs.contains(url)) {
> >>           urlClassLoader.addURL(url)
> >>         }
> >>       }
> >>
> >>
> >> The problem seems to be that jars are added to the classloader of
> >> "Executor"
> >> classes, and they are not accessible in Task.scala.
> >>
> >> I verified this by trying to load our custom classes in Executor.scala,
> >> and
> >> it works. But if I tried to load those classes in Task.scala, I'll get
> >> cl

Re: Spark unit testing best practices

2014-05-16 Thread Andras Nemeth
Thanks for the answers!

On a concrete example, here is what I did to test my (wrong :) ) hypothesis
before writing my email:
class SomethingNotSerializable {
  def process(a: Int): Int = 2 * a
}
object NonSerializableClosure extends App {
  val sc = new spark.SparkContext(
    "local",
    "SerTest",
    "/home/xandrew/spark-0.9.0-incubating",
    Seq("target/scala-2.10/sparktests_2.10-0.1-SNAPSHOT.jar"))
  val sns = new SomethingNotSerializable
  println(sc.parallelize(Seq(1, 2, 3))
    .map(sns.process(_))
    .reduce(_ + _))
}

This program prints 12 correctly. If I change "local" to point to my spark
master the code fails on the worker with a NullPointerException in the line
".map(sns.process(_))".

But I have to say that my original assumption that this is a serialization
issue was wrong, as adding extends Serializable to my class does _not_
solve the problem in non-local mode. This seems to be something more
convoluted, the sns reference in my closure is probably not stored by
value, instead I guess it's a by name reference to
"NonSerializableClosure.sns". I'm a bit surprised why this results in a
NullPointerException instead of some error when trying to run the
constructor of this object on the worker. Maybe something to do with the
magic of App.
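
One possible explanation, offered as an assumption rather than a diagnosis:
in Scala 2, App is built on DelayedInit, so the body of the object, including
the initializer of sns, runs only when main is invoked. A worker JVM that
merely loads NonSerializableClosure never calls main, which would leave sns
null there. A minimal sketch of an alternative layout follows; Doubler and
ClosureWithLocalVal are hypothetical names, and plain Scala collections stand
in for the RDD so the snippet runs without a cluster.

```scala
// Serializable so that a real Spark closure could ship the captured instance.
class Doubler extends Serializable {
  def process(a: Int): Int = 2 * a
}

object ClosureWithLocalVal {
  def compute(): Int = {
    // A local val is captured by the closure as a value instead of being
    // looked up through a field of the enclosing object.
    val doubler = new Doubler
    Seq(1, 2, 3).map(doubler.process(_)).reduce(_ + _)
  }

  // Explicit main instead of `extends App`, so nothing depends on delayed initialization.
  def main(args: Array[String]): Unit = println(compute())
}
```

Whether this layout removes the NullPointerException on a real cluster has
not been verified here.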

Anyways, while this is indeed an example of an error that doesn't manifest
in local mode, I guess it turns out to be convoluted enough that we won't
worry about it for now, use local in tests, and I'll ask again if we see
some actual prod vs unittest problems.
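
Independently of the master URL, a unit test can also check Java
serializability directly. A minimal sketch, assuming the default Java
serializer is what matters; SerDeCheck is a hypothetical helper, not a Spark
API, and Kryo may behave differently.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, NotSerializableException, ObjectInputStream, ObjectOutputStream}

object SerDeCheck {
  /** Serializes `value` with Java serialization and reads it back. */
  def roundTrip[T <: AnyRef](value: T): T = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(value) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  /** True if `value` (and everything it references) survives Java serialization. */
  def isJavaSerializable(value: AnyRef): Boolean =
    try { roundTrip(value); true }
    catch { case _: NotSerializableException => false }
}
```

A test could then assert SerDeCheck.isJavaSerializable(x) for the objects a
closure is expected to capture.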


On using local-cluster, this does sound like exactly what I had in mind.
But it doesn't seem to work for application developers. It seems to assume
you are running within a spark build (it fails while looking for the file
bin/compute-classpath.sh). So maybe that's a reason it's not documented...

Cheers,
Andras





On Wed, May 14, 2014 at 7:58 PM, Mark Hamstra wrote:

> Local mode does serDe, so it should expose serialization problems.
>
>
> On Wed, May 14, 2014 at 10:53 AM, Philip Ogren wrote:
>
>> Have you actually found this to be true?  I have found Spark local mode
>> to be quite good about blowing up if there is something non-serializable
>> and so my unit tests have been great for detecting this.  I have never seen
>> something that worked in local mode that didn't work on the cluster because
>> of different serialization requirements between the two.  Perhaps it is
>> different when using Kryo
>>
>>
>>
>> On 05/14/2014 04:34 AM, Andras Nemeth wrote:
>>
>>> E.g. if I accidentally use a closure which has something
>>> non-serializable in it, then my test will happily succeed in local mode but
>>> go down in flames on a real cluster.
>>>
>>
>>
>


What does Spark cache() actually do?

2014-05-16 Thread PengWeiPRC
Hi there,

I was wondering if someone could explain to me how the cache() function works
in Spark in these cases:

(1) If I have a huge file, say 1TB, which cannot be entirely stored in
memory, what will happen if I try to create an RDD of this huge file and
"cache" it?

(2) If it works in Spark, it can definitely store only part of the data. Which
part of the data will be stored in memory? In particular, does new data evict
old data from memory, the way a conventional cache works?

(3) What would happen if I try to load one RDD and cache it, and then another
and cache it too, and so on and so forth? Will the new RDDs evict the old RDDs
cached in memory?

Thanks very much.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/What-does-Spark-cache-actually-do-tp5778.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


RE: slf4j and log4j loop

2014-05-16 Thread Adrian Mocanu
Please ignore. This was sent last week not sure why it arrived so late.

-Original Message-
From: amoc [mailto:amoc...@verticalscope.com] 
Sent: May-09-14 10:13 AM
To: u...@spark.incubator.apache.org
Subject: Re: slf4j and log4j loop

Hi Patrick/Sean,
Sorry to resurrect this thread, but after upgrading to Spark 0.9.1 I still get
this error at runtime. I'm trying to run some tests here.
Has this actually been integrated into Spark 0.9.1?

Thanks again
-A



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/slf4j-and-log4j-loop-tp2699p5524.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: different in spark on yarn mode and standalone mode

2014-05-16 Thread Vipul Pandey
Thanks for responding, Sandy. 

YARN for sure is a more mature way of working on shared resources. I was not 
sure about how stable Spark on YARN is and if anyone is using it in production. 
I have been using Standalone mode in our dev cluster but multi-tenancy and 
resource allocation wise it's difficult to call it production ready yet. (I'm 
not sure if 1.0 has significant changes or not as I haven't kept up lately)

What I get from your response below is that for a production-like environment
YARN will be a better choice since, in our case, we don't care too much about
saving a few seconds in startup time. Stability will definitely be a concern,
but I'm assuming that Spark on YARN is not terrible either and will mature
over time, in which case we don't have to compromise on other
important factors (like resource sharing and prioritization).

btw, can I see information on what RDDs are cached and their size etc. on YARN? 
like I see in the standalone mode UI?


~Vipul

On May 15, 2014, at 5:24 PM, Sandy Ryza  wrote:

> Hi Vipul,
> 
> Some advantages of using YARN:
> * YARN allows you to dynamically share and centrally configure the same pool 
> of cluster resources between all frameworks that run on YARN.  You can throw 
> your entire cluster at a MapReduce job, then use some of it on an Impala 
> query and the rest on Spark application, without any changes in configuration.
> * You can take advantage of all the features of YARN schedulers for 
> categorizing, isolating, and prioritizing workloads.
> * YARN provides CPU-isolation between processes with CGroups. Spark 
> standalone mode requires each application to run an executor on every node in 
> the cluster - with YARN, you choose the number of executors to use.
> * YARN is the only cluster manager for Spark that supports security and 
> Kerberized clusters.
> 
> Some advantages of using standalone:
> * It has been around for longer, so it is likely a little more stable.
> * Many report faster startup times for apps.
> 
> -Sandy
> 
> 
> On Wed, May 14, 2014 at 3:06 PM, Vipul Pandey  wrote:
> So here's a followup question : What's the preferred mode?
> We have a new cluster coming up with petabytes of data and we intend to take 
> Spark to production. We are trying to figure out what mode would be safe and 
> stable for production like environment.
> pros and cons? anyone?
> 
> Any reasons why one would chose Standalone over YARN?
> 
> Thanks,
> Vipul
> 
> On May 4, 2014, at 5:56 PM, Liu, Raymond  wrote:
> 
> > In the core, they are not quite different
> > In standalone mode, you have spark master and spark worker who allocate 
> > driver and executors for your spark app.
> > While in Yarn mode, Yarn resource manager and node manager do this work.
> > When the driver and executors have been launched, the rest part of resource 
> > scheduling go through the same process, say between driver and executor 
> > through akka actor.
> >
> > Best Regards,
> > Raymond Liu
> >
> >
> > -Original Message-
> > From: Sophia [mailto:sln-1...@163.com]
> >
> > Hey you guys,
> > What is the different in spark on yarn mode and standalone mode about 
> > resource schedule?
> > Wish you happy everyday.
> >
> >
> >
> > --
> > View this message in context: 
> > http://apache-spark-user-list.1001560.n3.nabble.com/different-in-spark-on-yarn-mode-and-standalone-mode-tp5300.html
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> 



Re: Schema view of HadoopRDD

2014-05-16 Thread Mayur Rustagi
I guess what you are trying to do is get a columnar projection on your
data; Spark SQL may be a good option for you (especially if your data is
sparse & good for columnar projection).
If you are looking to work with simple key-value pairs then you are better off
using the HBase input reader in hadoopIO & getting a pairRDD.
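
For the simplest reading of the question (attach field names to the tokens of
each text line), a minimal sketch in plain Scala could look like the
following. SchemaView and parse are hypothetical names, not a Spark or
Spark SQL API, and the comma delimiter is an assumption.

```scala
object SchemaView {
  /** Pairs each delimited token of `line` with the field name at the same position. */
  def parse(schema: List[Symbol], line: String, delimiter: String = ","): Map[Symbol, String] = {
    // Quote the delimiter so it is matched literally; -1 keeps trailing empty fields.
    val tokens = line.split(java.util.regex.Pattern.quote(delimiter), -1).toList
    require(tokens.length == schema.length,
      s"expected ${schema.length} fields but found ${tokens.length}")
    schema.zip(tokens).toMap
  }
}
```

On an RDD of lines this would be applied per record, e.g.
lines.map(line => SchemaView.parse(schema, line)), giving a map keyed by
Symbol for every row.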

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi 



On Thu, May 8, 2014 at 10:51 AM, Debasish Das wrote:

> Hi,
>
> For each line that we read as textLine from HDFS, we have a schema..if
> there is an API that takes the schema as List[Symbol] and maps each token
> to the Symbol it will be helpful...
>
> One solution is to keep data on hdfs as avro/protobuf serialized objects
> but not sure if that works on HBase input...we are testing HDFS right now
> but finally we will read from a persistent store like hbase...so basically
> the immutableBytes need to be converted to a schema view as well incase we
> don't want to write the whole row as a protobuf...
>
> Does RDDs provide a schema view of the dataset on HDFS / HBase ?
>
> Thanks.
> Deb
>
>


Re: Schema view of HadoopRDD

2014-05-16 Thread Michael Armbrust
Here is a link with more info:
http://people.apache.org/~pwendell/catalyst-docs/sql-programming-guide.html


On Wed, May 7, 2014 at 10:09 PM, Debasish Das wrote:

> Hi,
>
> For each line that we read as textLine from HDFS, we have a schema..if
> there is an API that takes the schema as List[Symbol] and maps each token
> to the Symbol it will be helpful...
>
> Does RDDs provide a schema view of the dataset on HDFS ?
>
> Thanks.
> Deb
>


Re: filling missing values in a sequence

2014-05-16 Thread Sean Owen
Not sure if this is feasible, but this literally does what I think you
are describing:

sc.parallelize(rdd1.first to rdd1.last)
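
The same idea on plain Scala collections, next to the "slide and zip" variant
from the question, as a minimal sketch. FillGaps and its methods are
hypothetical names. Note that, as far as I know, RDD offers first but no
last, so on a real RDD the upper bound would have to come from something like
rdd1.reduce(_ max _).

```scala
object FillGaps {
  /** Every integer from the first to the last element of a sorted sequence. */
  def byRange(sorted: Seq[Int]): Seq[Int] =
    if (sorted.isEmpty) Seq.empty else sorted.head to sorted.last

  /** "Slide and zip": expand each adjacent pair (a, b) into a until b, then append the last element. */
  def bySliding(sorted: Seq[Int]): Seq[Int] =
    if (sorted.isEmpty) Seq.empty
    else sorted.zip(sorted.tail).flatMap { case (a, b) => a until b } :+ sorted.last
}
```

Both variants turn (1, 2, 3, 5, 8, 11) into the integers 1 through 11; the
range version only needs the two end points, while the sliding version needs
every adjacent pair.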

On Tue, May 13, 2014 at 4:56 PM, Mohit Jaggi  wrote:
> Hi,
> I am trying to find a way to fill in missing values in an RDD. The RDD is a
> sorted sequence.
> For example, (1, 2, 3, 5, 8, 11, ...)
> I need to fill in the missing numbers and get (1,2,3,4,5,6,7,8,9,10,11)
>
> One way to do this is to "slide and zip"
> rdd1 =  sc.parallelize(List(1, 2, 3, 5, 8, 11, ...))
> x = rdd1.first
> rdd2 = rdd1 filter (_ != x)
> rdd3 = rdd2 zip rdd1
> rdd4 = rdd3 flatmap { (x, y) => generate missing elements between x and y }
>
> Another method which I think is more efficient is to use mapPartitions() on
> rdd1 to be able to iterate on elements of rdd1 in each partition. However,
> that leaves the boundaries of the partitions to be "unfilled". Is there a
> way within the function passed to mapPartitions, to read the first element
> in the next partition?
>
> The latter approach also appears to work for a general "sliding window"
> calculation on the RDD. The former technique requires a lot of "sliding and
> zipping" and I believe it is not efficient. If only I could read the next
> partition...I have tried passing a pointer to rdd1 to the function passed to
> mapPartitions but the rdd1 pointer turns out to be NULL, I guess because
> Spark cannot deal with a mapper calling another mapper (since it happens on
> a worker not the driver)
>
> Mohit.


Advanced log processing

2014-05-16 Thread Laurent T
Hi,

I have some complex behavior i'd like to be advised on as i'm really new to
Spark.

I'm reading some log files that contain various events. There are two types
of events: parents and children. A child event can only have one parent and
a parent can have multiple children.

Currently i'm mapping my lines to get a Tuple2(parentID, Tuple2(Parent,
List<Child>)) and then reducing by key to combine all children into one list
and associate them with their parent.
.reduceByKey(new Function2<Tuple2<Parent, List<Child>>, Tuple2<Parent, List<Child>>, Tuple2<Parent, List<Child>>>(){...}).

It works fine on static data. But in production, i will have to process only
part of the log files, for instance, everyday at midnight i'll process the
last day of logs.

So i'm facing the problem that a Parent may arrive one day and its children on
the next day. Right after reducing, i have Tuples with no parent and i'd
like, only for those, to go check the previous log files to find the parent
in an efficient way.

My first idea would be to branch the data using a filter and its opposite. I'll
then read previous files one by one until i've found all parents or i've
reached a predefined limit. I would finally merge everything back to
finalize my job.
The problem is, i'm not even sure how i can do that. The filter part should
be easy, but how am i going to scan files one by one using Spark?

I hope someone can guide me through this.
FYI, there will be gigs of data to process.

Thanks
Laurent
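
The "filter, look back, merge" idea above can be sketched in plain Scala, using in-memory Maps to stand in for keyed RDDs. All of the types and names here (`Parent`, `Child`, `Family`, `OrphanLookup`) are hypothetical, since the real classes are not shown in the message; this is only one possible shape for the logic, not a tested Spark job.

```scala
import scala.annotation.tailrec

// Hypothetical stand-ins for the event classes mentioned in the message.
case class Parent(id: String)
case class Child(parentId: String, payload: String)
case class Family(parent: Option[Parent], children: List[Child])

object OrphanLookup {
  // Combine two partial records for the same parent ID
  // (the kind of function a reduce-by-key step would apply).
  def merge(a: Family, b: Family): Family =
    Family(a.parent.orElse(b.parent), a.children ++ b.children)

  // Walk older days (newest first) and stop as soon as no orphans remain
  // or the day limit is exhausted.
  @tailrec
  def resolve(current: Map[String, Family],
              olderDays: List[Map[String, Parent]],
              daysLeft: Int): Map[String, Family] = {
    val orphans = current.filter { case (_, f) => f.parent.isEmpty }
    olderDays match {
      case day :: rest if orphans.nonEmpty && daysLeft > 0 =>
        // Only the orphaned entries are looked up in the older day.
        val fixed = orphans.map { case (id, f) => id -> f.copy(parent = day.get(id)) }
        resolve(current ++ fixed, rest, daysLeft - 1)
      case _ => current
    }
  }
}

object OrphanLookupDemo extends App {
  val today = Map(
    "p1" -> Family(Some(Parent("p1")), List(Child("p1", "a"))),
    "p2" -> OrphanLookup.merge(
      Family(None, List(Child("p2", "b"))),
      Family(None, List(Child("p2", "c")))))
  val yesterday = Map("p2" -> Parent("p2"))

  val resolved = OrphanLookup.resolve(today, List(yesterday), 7)
  println(resolved("p2").parent)  // prints Some(Parent(p2))
}
```

With RDDs, the orphan split would correspond to two filter calls, each older day's lookup to a join against that day's parents keyed by ID, and the final merge to a union. Because each older day is only loaded when orphans remain, the driver loop decides how many files are actually read.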



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Advanced-log-processing-tp5743.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: is Mesos falling out of favor?

2014-05-16 Thread Christopher Nguyen
Paco, that's a great video reference, thanks.

To be fair to our friends at Yahoo, who have done a tremendous amount to
help advance the cause of the BDAS stack, it's not FUD coming from them,
certainly not in any organized or intentional manner.

In vacuo we prefer Mesos ourselves, but also can't ignore the fact that in
the larger market, many enterprise technology stack decisions are made
based on their existing vendor support relationships.

And in view of Mesos, super happy to see Mesosphere growing!

Sent while mobile. Pls excuse typos etc.

That's FUD. Tracking the Mesos and Spark use cases, there are very large
production deployments of these together. Some are rather private but
others are being surfaced. IMHO, one of the most amazing case studies is
from Christina Delimitrou http://youtu.be/YpmElyi94AA

For a tutorial, use the following but upgrade it to latest production for
Spark. There was a related O'Reilly webcast and Strata tutorial as well:
http://mesosphere.io/learn/run-spark-on-mesos/

FWIW, I teach "Intro to Spark" with sections on CM4, YARN, Mesos, etc.
Based on lots of student experiences, Mesos is clearly the shortest path to
deploying a Spark cluster if you want to leverage the robustness,
multi-tenancy for mixed workloads, less ops overhead, etc., that show up
repeatedly in the use case analyses.

My opinion only and not that of any of my clients: "Don't believe the FUD
from YHOO unless you really want to be stuck in 2009."


On Wed, May 7, 2014 at 8:30 AM, deric  wrote:

> I'm also using SPARK_EXECUTOR_URI right now, though I would prefer
> distributing Spark as a binary package.
>
> For running examples with `./bin/run-example ...` it works fine, however
> tasks from spark-shell are getting lost.
>
> Error: Could not find or load main class
> org.apache.spark.executor.MesosExecutorBackend
>
> which looks more like a problem with sbin/spark-executor and missing paths to
> the jar. Has anyone encountered this error before?
>
> I guess Yahoo invested quite a lot of effort into YARN and Spark
> integration (and with Mahout migrating to Spark, there's much more interest
> in Hadoop and Spark integration). If there were some "Mesos company"
> working on Spark-Mesos integration, it could be at least on the same
> level.
>
> I don't see any other reason why YARN would be better than Mesos.
> Personally I like the latter; however, I haven't checked YARN for a while,
> so maybe they've made significant progress. I think Mesos is more universal
> and flexible than YARN.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/is-Mesos-falling-out-of-favor-tp5444p5481.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Re: Using String Dataset for Logistic Regression

2014-05-16 Thread Brian Gawalt
Pravesh,

Correct, the logistic regression engine is set up to perform classification
tasks: it takes feature vectors (arrays of real-valued numbers) that are each
given a class label, and learns a linear combination of those features
that divides the classes. As the above commenters have mentioned, there are
lots of different ways to turn string data into feature vectors.

For instance, if you're classifying documents between, say, spam or valid
email, you may want to start with a bag-of-words model
(http://en.wikipedia.org/wiki/Bag-of-words_model ) or the rescaled variant
TF-IDF ( http://en.wikipedia.org/wiki/Tf%E2%80%93idf ). You'd turn a single
document into a single, high-dimensional, sparse vector whose element j
encodes the number of appearances of term j. Maybe you want to try the
experiment by featurizing on bigrams, trigrams, etc...

Or maybe you're just trying to tell "english language tweets" from "non-english
language tweets", in which case the bag of words might be overkill: you
could instead try featurizing on just the counts of each pair of consecutive
characters. E.g., the first element counts "aa" appearances, the second
"ab", and so on through "zy" and "zz". Those will be smaller feature vectors,
capturing less information, but it's probably sufficient for the simpler
task, and you'll be able to fit the model with less data than trying to fit
a whole-word-based model.
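
As a rough illustration of the character-pair idea, here is a minimal plain-Scala sketch. It assumes only the 26 lowercase ASCII letters matter and silently skips any pair containing another character; `BigramFeatures` and `featurize` are made-up names, not part of MLlib.

```scala
object BigramFeatures {
  // One slot per ordered pair of letters: 26 * 26 = 676 dimensions.
  val Dim: Int = 26 * 26

  private def isLetter(c: Char): Boolean = c >= 'a' && c <= 'z'

  // Position of a pair in the vector: "aa" -> 0, "ab" -> 1, ..., "zz" -> 675.
  private def index(a: Char, b: Char): Int = (a - 'a') * 26 + (b - 'a')

  // Count every pair of consecutive letters in the text.
  def featurize(text: String): Array[Double] = {
    val counts = new Array[Double](Dim)
    val lower = text.toLowerCase
    for (i <- 0 until lower.length - 1) {
      val a = lower(i)
      val b = lower(i + 1)
      // Pairs touching a space, digit, or non-ASCII character are ignored.
      if (isLetter(a) && isLetter(b)) counts(index(a, b)) += 1.0
    }
    counts
  }
}

object BigramFeaturesDemo extends App {
  val v = BigramFeatures.featurize("abba")
  // "abba" contains the pairs "ab", "bb" and "ba".
  val nonZero = v.zipWithIndex.collect { case (c, i) if c > 0 => i }
  println(nonZero.mkString(","))  // prints 1,26,27
}
```

Each resulting array could then be paired with a 0/1 label and handed to the logistic regression trainer; how the features are wrapped (a raw array or a vector type) depends on the Spark version in use.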

Different applications are going to need more or less context from your
strings -- whole words? n-grams? just characters? treat them as ENUMs as in
the days of week example? -- so it might not make sense for Spark to come
with "a direct way" to turn a string attribute into a vector for use in
logistic regression. You'll have to settle on the featurization approach
that's right for your domain before you try training the logistic regression
classifier on your labelled feature vectors. 

Best,
-Brian





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Using-String-Dataset-for-Logistic-Regression-tp5523p5882.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Reading from .bz2 files with Spark

2014-05-16 Thread Andre Bois-Crettez

We never saw your exception when reading bzip2 files with Spark.

But when we mistakenly compiled Spark against an older version of Hadoop (which
was the default in Spark), we ended up with sequential reading of the bzip2 file,
not taking advantage of block splits to work in parallel.
Once we compiled Spark with SPARK_HADOOP_VERSION=2.2.0, files were read
in parallel, as expected with a recent Hadoop.

http://spark.apache.org/docs/0.9.1/#a-note-about-hadoop-versions

Make sure Spark is compiled against Hadoop v2.

André

On 2014-05-13 18:08, Xiangrui Meng wrote:

Which hadoop version did you use? I'm not sure whether Hadoop v2 fixes
the problem you described, but it does contain several fixes to bzip2
format. -Xiangrui

On Wed, May 7, 2014 at 9:19 PM, Andrew Ash  wrote:

Hi all,

Is anyone reading and writing to .bz2 files stored in HDFS from Spark with
success?


I'm finding the following results on a recent commit (756c96 from 24hr ago)
and CDH 4.4.0:

Works: val r = sc.textFile("/user/aa/myfile.bz2").count
Doesn't work: val r = sc.textFile("/user/aa/myfile.bz2").map((s:String) =>
s+"| " ).count

Specifically, I'm getting an exception coming out of the bzip2 libraries
(see below stacktraces), which is unusual because I'm able to read from that
file without an issue using the same libraries via Pig.  It was originally
created from Pig as well.

Digging a little deeper I found this line in the .bz2 decompressor's javadoc
for CBZip2InputStream:

"Instances of this class are not threadsafe." [source]


My current working theory is that Spark has a much higher level of
parallelism than Pig/Hadoop does and thus I get these wild IndexOutOfBounds
exceptions much more frequently (as in can't finish a run over a little 2M
row file) vs hardly at all in other libraries.

The only other reference I could find to the issue was in presto-users, but
the recommendation to leave .bz2 for .lzo doesn't help if I actually do want
the higher compression levels of .bz2.


Would love to hear if I have some kind of configuration issue or if there's
a bug in .bz2 that's fixed in later versions of CDH, or generally any other
thoughts on the issue.


Thanks!
Andrew



Below are examples of some exceptions I'm getting:

14/05/07 15:09:49 WARN scheduler.TaskSetManager: Loss was due to
java.lang.ArrayIndexOutOfBoundsException
java.lang.ArrayIndexOutOfBoundsException: 65535
 at
org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.hbCreateDecodeTables(CBZip2InputStream.java:663)
 at
org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.createHuffmanDecodingTables(CBZip2InputStream.java:790)
 at
org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.recvDecodingTables(CBZip2InputStream.java:762)
 at
org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:798)
 at
org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:502)
 at
org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333)
 at
org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:397)
 at
org.apache.hadoop.io.compress.BZip2Codec$BZip2CompressionInputStream.read(BZip2Codec.java:426)
 at java.io.InputStream.read(InputStream.java:101)
 at
org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209)
 at org.apache.hadoop.util.LineReader.readLine(LineReader.java:173)
 at
org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:203)
 at
org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:43)




java.lang.ArrayIndexOutOfBoundsException: 90
 at
org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:900)
 at
org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:502)
 at
org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333)
 at
org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:397)
 at
org.apache.hadoop.io.compress.BZip2Codec$BZip2CompressionInputStream.read(BZip2Codec.java:426)
 at java.io.InputStream.read(InputStream.java:101)
 at
org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209)
 at org.apache.hadoop.util.LineReader.readLine(LineReader.java:173)
 at
org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:203)
 at
org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:43)
 at
org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:198)
 at
org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:181)
 at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
 at
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:35)
 at scala.collec

Re: SparkContext startup time out

2014-05-16 Thread Sophia
How did you deal with this problem? I have run into it these days. God bless
me.

Best regards,



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SparkContext-startup-time-out-tp1753p5738.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: problem with hdfs access in spark job

2014-05-16 Thread Marcelo Vanzin
Hi Marcin,

On Wed, May 14, 2014 at 7:22 AM, Marcin Cylke
 wrote:
> - This looks like some problems with HA - but I've checked namenodes during 
> the job was running, and there
> was no switch between master and slave namenode.
>
> 14/05/14 15:25:44 ERROR security.UserGroupInformation: 
> PriviledgedActionException as:hc_client_reco_dev (auth:SIMPLE) 
> cause:org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException):
>  Operation category READ is not supported in state standby
> 14/05/14 15:25:44 WARN ipc.Client: Exception encountered while connecting to 
> the server : 
> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException):
>  Operation category READ is not supported in state standby
> 14/05/14 15:25:44 ERROR security.UserGroupInformation: 
> PriviledgedActionException as:hc_client_reco_dev (auth:SIMPLE) 
> cause:org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException):
>  Operation category READ is not supported in state standby

These are actually not worrisome; that's just the HDFS client doing
its own thing to support HA. It probably picked the "wrong" NN to try
first, and got the "NN in standby" exception, which it logs. Then it
tries the other NN and things just work as expected. Business as
usual.

Not sure about the other exceptions you mention. I've seen the second
one before, but it didn't seem to affect my jobs - maybe some race
during cleanup.

-- 
Marcelo


Re: Spark unit testing best practices

2014-05-16 Thread Nan Zhu
+1, at least with current code  

just watch the log printed by DAGScheduler…  

--  
Nan Zhu


On Wednesday, May 14, 2014 at 1:58 PM, Mark Hamstra wrote:

> serDe  



Re: Is there any problem on the spark mailing list?

2014-05-16 Thread darkjh
Same thing here. There must be a problem ...
I also tried sending to user-subscr...@spark.apache.org and user-unsubscribe, but
got no response.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Is-there-any-problem-on-the-spark-mailing-list-tp5509p5520.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Reading from .bz2 files with Spark

2014-05-16 Thread Andrew Ash
Hi Xiangrui,

// FYI I'm getting your emails late due to the Apache mailing list outage

I'm using CDH4.4.0, which I think uses the MapReduce v2 API.  The .jars are
named like this: hadoop-hdfs-2.0.0-cdh4.4.0.jar

I'm also glad you were able to reproduce!  Please paste a link to the
Hadoop bug you file so I can follow along.

Thanks!
Andrew


On Tue, May 13, 2014 at 9:08 AM, Xiangrui Meng  wrote:

> Which hadoop version did you use? I'm not sure whether Hadoop v2 fixes
> the problem you described, but it does contain several fixes to bzip2
> format. -Xiangrui

Re: different in spark on yarn mode and standalone mode

2014-05-16 Thread Sandy Ryza
Hi Vipul,

Some advantages of using YARN:
* YARN allows you to dynamically share and centrally configure the same
pool of cluster resources between all frameworks that run on YARN.  You can
throw your entire cluster at a MapReduce job, then use some of it on an
Impala query and the rest on a Spark application, without any changes in
configuration.
* You can take advantage of all the features of YARN schedulers for
categorizing, isolating, and prioritizing workloads.
* YARN provides CPU-isolation between processes with CGroups. Spark
standalone mode requires each application to run an executor on every node
in the cluster - with YARN, you choose the number of executors to use.
* YARN is the only cluster manager for Spark that supports security and
Kerberized clusters.

Some advantages of using standalone:
* It has been around for longer, so it is likely a little more stable.
* Many report faster startup times for apps.

-Sandy


On Wed, May 14, 2014 at 3:06 PM, Vipul Pandey  wrote:

> So here's a followup question : What's the preferred mode?
> We have a new cluster coming up with petabytes of data and we intend to
> take Spark to production. We are trying to figure out what mode would be
> safe and stable for a production-like environment.
> Pros and cons? Anyone?
>
> Any reasons why one would choose Standalone over YARN?
>
> Thanks,
> Vipul
>
> On May 4, 2014, at 5:56 PM, Liu, Raymond  wrote:
>
> > At the core, they are not very different.
> > In standalone mode, you have the Spark master and Spark workers, which allocate
> the driver and executors for your Spark app.
> > In YARN mode, the YARN resource manager and node managers do this work.
> > Once the driver and executors have been launched, the rest of the
> resource scheduling goes through the same process, i.e. between the driver and
> executors through Akka actors.
> >
> > Best Regards,
> > Raymond Liu
> >
> >
> > -Original Message-
> > From: Sophia [mailto:sln-1...@163.com]
> >
> > Hey you guys,
> > What is the difference between Spark on YARN mode and standalone mode regarding
> resource scheduling?
> > Wish you happy everyday.
> >
> >
> >
> > --
> > View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/different-in-spark-on-yarn-mode-and-standalone-mode-tp5300.html
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>


Re: Reading from .bz2 files with Spark

2014-05-16 Thread Xiangrui Meng
Hi Andrew,

I verified that this is due to thread safety. I changed
SPARK_WORKER_CORES to 1 in spark-env.sh, so there is only 1 thread per
worker. Then I can load the file without any problem with different
values of minPartitions. I will submit a JIRA to both Spark and
Hadoop.

Best,
Xiangrui

On Thu, May 15, 2014 at 3:48 PM, Xiangrui Meng  wrote:
> Hi Andrew,
>
> Could you try varying the minPartitions parameter? For example:
>
> val r = sc.textFile("/user/aa/myfile.bz2", 4).count
> val r = sc.textFile("/user/aa/myfile.bz2", 8).count
>
> Best,
> Xiangrui
>
> On Tue, May 13, 2014 at 9:08 AM, Xiangrui Meng  wrote:
>> Which hadoop version did you use? I'm not sure whether Hadoop v2 fixes
>> the problem you described, but it does contain several fixes to bzip2
>> format. -Xiangrui

Proper way to create standalone app with custom Spark version

2014-05-16 Thread Andrei
(Sorry if you have already seen this message - it seems like there were
some issues delivering messages to the list yesterday)

We can create a standalone Spark application by simply adding
"spark-core_2.x" to build.sbt/pom.xml and connecting it to the Spark master.

We can also build a custom version of Spark (e.g. compiled against Hadoop
2.x) from source and deploy it to a cluster manually.

But what is the proper way to use a _custom version_ of Spark in a _standalone
application_?


I'm currently trying to deploy a custom version to a local Maven repository and
add it to an SBT project. Another option is to add Spark as a local jar to every
project. But both of these ways look overcomplicated and generally wrong.

So what is the intended way to do it?

Thanks,
Andrei


Re: Turn BLAS on MacOSX

2014-05-16 Thread neville.lyh
I've had similar problems before and the following sbt option fixed it.

sbt -J"-Dcom.github.fommil.netlib.BLAS=com.github.fommil.netlib.NativeRefBLAS" run

Also you might need blas from homebrew.


On Thu, May 15, 2014 at 10:50 AM, Debasish Das [via Apache Spark User List]
 wrote:

> Hi,
>
> How do I load native BLAS libraries on Mac ?
>
> I am getting the following errors while running LR and SVM with SGD:
>
> 14/05/07 10:48:13 WARN BLAS: Failed to load implementation from:
> com.github.fommil.netlib.NativeSystemBLAS
>
> 14/05/07 10:48:13 WARN BLAS: Failed to load implementation from:
> com.github.fommil.netlib.NativeRefBLAS
>
> centos it was fine...but on mac I am getting these warnings..
>
> Also when it fails to run native blas does it use java code for BLAS
> operations ?
>
> May be after addition of breeze, we should add these details on a page as
> well so that users are aware of it before they report any performance
> results..
>
> Thanks.
>
> Deb




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Turn-BLAS-on-MacOSX-tp5755p5767.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: Distribute jar dependencies via sc.AddJar(fileName)

2014-05-16 Thread DB Tsai
Hi Xiangrui,

We're still using Spark 0.9 branch, and our job is submitted by

./bin/spark-class org.apache.spark.deploy.yarn.Client \
  --jar  \
  --class  \
  --args  \
  --num-workers  \
  --master-class 
  --master-memory  \
  --worker-memory  \
  --addJars 


Based on my understanding of the code in yarn-standalone mode, the jar is
distributed from the local machine to the application master through the
distributed cache (using the Hadoop yarn-client API). From the application
master to the executors, it goes through the HTTP server. I may be wrong, but
if you look at the code in the SparkContext addJar method, you can see the jar
is added to the HTTP server in yarn-standalone mode.

    if (SparkHadoopUtil.get.isYarnMode() && master == "yarn-standalone") {
      // In order for this to work in yarn standalone mode the user must specify the
      // --addjars option to the client to upload the file into the distributed cache
      // of the AM to make it show up in the current working directory.
      val fileName = new Path(uri.getPath).getName()
      try {
        env.httpFileServer.addJar(new File(fileName))
      } catch {

Those jars will be fetched in Executor from http server and added to
classloader of "Executor" class, see

  private def updateDependencies(newFiles: HashMap[String, Long], newJars: HashMap[String, Long]) {
    synchronized {
      // Fetch missing dependencies
      for ((name, timestamp) <- newFiles if currentFiles.getOrElse(name, -1L) < timestamp) {
        logInfo("Fetching " + name + " with timestamp " + timestamp)
        Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf)
        currentFiles(name) = timestamp
      }
      for ((name, timestamp) <- newJars if currentJars.getOrElse(name, -1L) < timestamp) {
        logInfo("Fetching " + name + " with timestamp " + timestamp)
        Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf)
        currentJars(name) = timestamp
        // Add it to our class loader
        val localName = name.split("/").last
        val url = new File(SparkFiles.getRootDirectory, localName).toURI.toURL

        if (!urlClassLoader.getURLs.contains(url)) {
          urlClassLoader.addURL(url)
        }
      }


The problem seems to be that the jars are added to the classloader of the
"Executor" class, and they are not accessible in Task.scala.

I verified this by trying to load our custom classes in Executor.scala, and
it works. But if I try to load those classes in Task.scala, I get a
ClassNotFoundException.
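
A small diagnostic along these lines can show which classloader is able to see a given class. This is only a sketch using standard JDK calls; `LoaderCheck` and `visibleFrom` are illustrative names, `com.example.DoesNotExist` is a deliberately missing placeholder class, and nothing here is taken from the Spark code base.

```scala
import java.net.{URL, URLClassLoader}

object LoaderCheck {
  // True if `loader` (or one of its parents) can resolve `className`.
  // The class is looked up without being initialized.
  def visibleFrom(loader: ClassLoader, className: String): Boolean =
    try {
      Class.forName(className, false, loader)
      true
    } catch {
      case _: ClassNotFoundException => false
    }
}

object LoaderCheckDemo extends App {
  // Loader that defined this class, and the current thread's context loader.
  val defining = getClass.getClassLoader
  val context = Thread.currentThread.getContextClassLoader

  // A child loader with no extra URLs still sees everything its parent sees,
  // but the parent never sees what is added only to the child.
  val child = new URLClassLoader(Array.empty[URL], defining)

  println(LoaderCheck.visibleFrom(child, "java.lang.String"))            // prints true
  println(LoaderCheck.visibleFrom(context, "com.example.DoesNotExist"))  // prints false
}
```

Calling something like visibleFrom with the custom class name from both places, once with the loader that received the jar URLs and once with the thread's context classloader, would show whether the two code paths really resolve classes through different loaders.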

Thanks.





Sincerely,

DB Tsai
---
My Blog: https://www.dbtsai.com
LinkedIn: https://www.linkedin.com/in/dbtsai


On Wed, May 14, 2014 at 6:04 PM, Xiangrui Meng  wrote:

> In SparkContext#addJar, for yarn-standalone mode, the workers should
> get the jars from local distributed cache instead of fetching them
> from the http server. Could you send the command you used to submit
> the job? -Xiangrui
>
> On Wed, May 14, 2014 at 1:26 AM, DB Tsai  wrote:
> > Hi Xiangrui,
> >
> > I actually used `yarn-standalone`, sorry for misleading. I did debugging
> in
> > the last couple days, and everything up to updateDependency in
> > executor.scala works. I also checked the file size and md5sum in the
> > executors, and they are the same as the one in driver. Gonna do more
> testing
> > tomorrow.
> >
> > Thanks.
> >
> >
> > Sincerely,
> >
> > DB Tsai
> > ---
> > My Blog: https://www.dbtsai.com
> > LinkedIn: https://www.linkedin.com/in/dbtsai
> >
> >
> > On Tue, May 13, 2014 at 11:41 PM, Xiangrui Meng 
> wrote:
> >>
> >> I don't know whether this would fix the problem. In v0.9, you need
> >> `yarn-standalone` instead of `yarn-cluster`.
> >>
> >> See
> >>
> https://github.com/apache/spark/commit/328c73d037c17440c2a91a6c88b4258fbefa0c08
> >>
> >> On Tue, May 13, 2014 at 11:36 PM, Xiangrui Meng 
> wrote:
> >> > Does v0.9 support yarn-cluster mode? I checked SparkContext.scala in
> >> > v0.9.1 and didn't see special handling of `yarn-cluster`. -Xiangrui
> >> >
> >> > On Mon, May 12, 2014 at 11:14 AM, DB Tsai 
> wrote:
> >> >> We're deploying Spark in yarn-cluster mode (Spark 0.9), and we add
> jar
> >> >> dependencies in command line with "--addJars" option. However, those
> >> >> external jars are only available in the driver (application running
> in
> >> >> hadoop), and not available in the executors (workers).
> >> >>
> >> >> After doing some research, we realize that we've to push those jars
> to
> >> >> executors in driver via sc.AddJar(fileName). Although in the driver's
> >> >> log
> >> >> (see the following), the jar is successfully added in the http server
> >> >> in the
> >> >> driver, and I confirm that it's downloadable from any machine in the
> >> >> network, I still get `java.lang.NoClassDefFoundError` in the
> executors.
> >> >>
> >> >> 14/05/09 14:51:41 INFO spark.SparkContext: Adde
