Re: Running time bottleneck on a few workers

2014-08-12 Thread Akhil Das
This happens when you are playing around with sortByKey, mapPartitions,
groupBy, reduceByKey and similar operations.
One thing you can try is providing the number of partitions (possibly
2x the number of CPUs) while doing these operations.
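
For example (a minimal sketch, assuming `pairs` is an RDD[(String, Int)] and the
cluster has 8 cores in total, so roughly 16 partitions):

import org.apache.spark.SparkContext._   // pair/ordered RDD functions in Spark 1.x

val numPartitions = 16  // ~2x the total number of CPU cores (assumption)
val counts  = pairs.reduceByKey(_ + _, numPartitions)
val grouped = pairs.groupByKey(numPartitions)
val sorted  = counts.sortByKey(ascending = true, numPartitions = numPartitions)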

Thanks
Best Regards


On Wed, Aug 13, 2014 at 7:54 AM, Bin  wrote:

> Hi All,
>
> I met a problem that for each stage, most workers finished fast (around
> 1 min), but a few workers spent about 7 min to finish, which significantly
> slows down the process.
>
> As shown below, the running time is very unevenly distributed over
> workers.
>
> I wonder whether this is normal? Is it related to the partition strategy?
> For now, I used the default partition strategy.
>
> Looking for advice!
>
> Thanks very much!
>
> Best,
> Bin
>
>
>
>


Re: training recsys model

2014-08-12 Thread Xiangrui Meng
You can define an evaluation metric first and then use a grid search
to find the best set of training parameters. Ampcamp has a tutorial
showing how to do this for ALS:
http://ampcamp.berkeley.edu/big-data-mini-course/movie-recommendation-with-mllib.html
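
Concretely, the sweep can be as simple as the sketch below (illustrative only,
not code from the tutorial; `training` and `validation` are assumed to be
pre-split RDD[Rating]s, RMSE is the metric, and the grid values are arbitrary):

import org.apache.spark.SparkContext._
import org.apache.spark.mllib.recommendation.{ALS, MatrixFactorizationModel, Rating}
import org.apache.spark.rdd.RDD

// Root-mean-square error of the model's predictions on `data`.
def computeRmse(model: MatrixFactorizationModel, data: RDD[Rating]): Double = {
  val predictions = model.predict(data.map(r => (r.user, r.product)))
    .map(p => ((p.user, p.product), p.rating))
  val ratesAndPreds = data.map(r => ((r.user, r.product), r.rating)).join(predictions)
  math.sqrt(ratesAndPreds.values.map { case (actual, predicted) =>
    (actual - predicted) * (actual - predicted)
  }.mean())
}

// Illustrative grid over rank, lambda, and number of iterations.
val grid = for {
  rank   <- Seq(8, 12)
  lambda <- Seq(0.01, 0.1, 1.0)
  iters  <- Seq(10, 20)
} yield {
  val model = ALS.train(training, rank, iters, lambda)
  ((rank, lambda, iters), computeRmse(model, validation))
}

val ((bestRank, bestLambda, bestIters), bestRmse) = grid.minBy(_._2)
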
-Xiangrui

On Tue, Aug 12, 2014 at 8:01 PM, Hoai-Thu Vuong  wrote:
> In MLlib, I found the method to train a matrix factorization model to predict
> the taste of users. In this function, there are some parameters such as
> lambda and rank; I cannot find the best values to set these parameters to, or
> how to optimize them. Could you please give me some recommendations?
>
> --
> Thu.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark SQL JDBC

2014-08-12 Thread Cheng Lian
Oh, thanks for reporting this. This should be a bug: since SPARK_HIVE was
deprecated, we shouldn't rely on it any more.


On Wed, Aug 13, 2014 at 1:23 PM, ZHENG, Xu-dong  wrote:

> Just found that this is because the lines below in make-distribution.sh don't work:
>
> if [ "$SPARK_HIVE" == "true" ]; then
>   cp "$FWDIR"/lib_managed/jars/datanucleus*.jar "$DISTDIR/lib/"
> fi
>
> There is no definition of $SPARK_HIVE in make-distribution.sh. I should
> set it explicitly.
>
>
>
> On Wed, Aug 13, 2014 at 1:10 PM, ZHENG, Xu-dong  wrote:
>
>> Hi Cheng,
>>
>> I also met some issues when I tried to start the ThriftServer based on a build
>> from the master branch (I could successfully run it from the branch-1.0-jdbc
>> branch). Below is my build command:
>>
>> ./make-distribution.sh --skip-java-test -Phadoop-2.4 -Phive -Pyarn
>> -Dyarn.version=2.4.0 -Dhadoop.version=2.4.0 -Phive-thriftserver
>>
>> And below are the printed errors:
>>
>> ERROR CompositeService: Error starting services HiveServer2
>> org.apache.hive.service.ServiceException: Unable to connect to MetaStore!
>> at
>> org.apache.hive.service.cli.CLIService.start(CLIService.java:85)
>> at
>> org.apache.hive.service.CompositeService.start(CompositeService.java:70)
>> at
>> org.apache.hive.service.server.HiveServer2.start(HiveServer2.java:73)
>> at
>> org.apache.spark.sql.hive.thriftserver.HiveThriftServer2$.main(HiveThriftServer2.scala:71)
>> at
>> org.apache.spark.sql.hive.thriftserver.HiveThriftServer2.main(HiveThriftServer2.scala)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:606)
>> at
>> org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:314)
>>  at
>> org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:73)
>> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>> Caused by: javax.jdo.JDOFatalUserException: Class
>> org.datanucleus.api.jdo.JDOPersistenceManagerFactory was not found.
>> NestedThrowables:
>> java.lang.ClassNotFoundException:
>> org.datanucleus.api.jdo.JDOPersistenceManagerFactory
>> at
>> javax.jdo.JDOHelper.invokeGetPersistenceManagerFactoryOnImplementation(JDOHelper.java:1175)
>> at
>> javax.jdo.JDOHelper.getPersistenceManagerFactory(JDOHelper.java:808)
>> at
>> javax.jdo.JDOHelper.getPersistenceManagerFactory(JDOHelper.java:701)
>> at
>> org.apache.hadoop.hive.metastore.ObjectStore.getPMF(ObjectStore.java:275)
>> at
>> org.apache.hadoop.hive.metastore.ObjectStore.getPersistenceManager(ObjectStore.java:304)
>> at
>> org.apache.hadoop.hive.metastore.ObjectStore.initialize(ObjectStore.java:234)
>> at
>> org.apache.hadoop.hive.metastore.ObjectStore.setConf(ObjectStore.java:209)
>> at
>> org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:73)
>> at
>> org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:133)
>> at
>> org.apache.hadoop.hive.metastore.RetryingRawStore.(RetryingRawStore.java:64)
>> at
>> org.apache.hadoop.hive.metastore.RetryingRawStore.getProxy(RetryingRawStore.java:73)
>> at
>> org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.newRawStore(HiveMetaStore.java:415)
>> at
>> org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.getMS(HiveMetaStore.java:402)
>> at
>> org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.createDefaultDB(HiveMetaStore.java:441)
>> at
>> org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.init(HiveMetaStore.java:326)
>> at
>> org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.(HiveMetaStore.java:286)
>> at
>> org.apache.hadoop.hive.metastore.RetryingHMSHandler.(RetryingHMSHandler.java:54)
>> at
>> org.apache.hadoop.hive.metastore.RetryingHMSHandler.getProxy(RetryingHMSHandler.java:59)
>> at
>> org.apache.hadoop.hive.metastore.HiveMetaStore.newHMSHandler(HiveMetaStore.java:4060)
>> at
>> org.apache.hadoop.hive.metastore.HiveMetaStoreClient.(HiveMetaStoreClient.java:121)
>> at
>> org.apache.hadoop.hive.metastore.HiveMetaStoreClient.(HiveMetaStoreClient.java:104)
>> at
>> org.apache.hive.service.cli.CLIService.start(CLIService.java:82)
>> ... 11 more
>> Caused by: java.lang.ClassNotFoundException:
>> org.datanucleus.api.jdo.JDOPersistenceManagerFactory
>> at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>> at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>> at java.security.AccessController.doPrivileged(Native Method)
>> at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>>   

Re: Spark SQL JDBC

2014-08-12 Thread ZHENG, Xu-dong
Just found that this is because the lines below in make-distribution.sh don't work:

if [ "$SPARK_HIVE" == "true" ]; then
  cp "$FWDIR"/lib_managed/jars/datanucleus*.jar "$DISTDIR/lib/"
fi

There is no definition of $SPARK_HIVE in make-distribution.sh. I should set
it explicitly.



On Wed, Aug 13, 2014 at 1:10 PM, ZHENG, Xu-dong  wrote:

> Hi Cheng,
>
> I also met some issues when I tried to start the ThriftServer based on a build
> from the master branch (I could successfully run it from the branch-1.0-jdbc
> branch). Below is my build command:
>
> ./make-distribution.sh --skip-java-test -Phadoop-2.4 -Phive -Pyarn
> -Dyarn.version=2.4.0 -Dhadoop.version=2.4.0 -Phive-thriftserver
>
> And below are the printed errors:
>
> ERROR CompositeService: Error starting services HiveServer2
> org.apache.hive.service.ServiceException: Unable to connect to MetaStore!
> at org.apache.hive.service.cli.CLIService.start(CLIService.java:85)
> at
> org.apache.hive.service.CompositeService.start(CompositeService.java:70)
> at
> org.apache.hive.service.server.HiveServer2.start(HiveServer2.java:73)
> at
> org.apache.spark.sql.hive.thriftserver.HiveThriftServer2$.main(HiveThriftServer2.scala:71)
> at
> org.apache.spark.sql.hive.thriftserver.HiveThriftServer2.main(HiveThriftServer2.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at
> org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:314)
>  at
> org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:73)
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> Caused by: javax.jdo.JDOFatalUserException: Class
> org.datanucleus.api.jdo.JDOPersistenceManagerFactory was not found.
> NestedThrowables:
> java.lang.ClassNotFoundException:
> org.datanucleus.api.jdo.JDOPersistenceManagerFactory
> at
> javax.jdo.JDOHelper.invokeGetPersistenceManagerFactoryOnImplementation(JDOHelper.java:1175)
> at
> javax.jdo.JDOHelper.getPersistenceManagerFactory(JDOHelper.java:808)
> at
> javax.jdo.JDOHelper.getPersistenceManagerFactory(JDOHelper.java:701)
> at
> org.apache.hadoop.hive.metastore.ObjectStore.getPMF(ObjectStore.java:275)
> at
> org.apache.hadoop.hive.metastore.ObjectStore.getPersistenceManager(ObjectStore.java:304)
> at
> org.apache.hadoop.hive.metastore.ObjectStore.initialize(ObjectStore.java:234)
> at
> org.apache.hadoop.hive.metastore.ObjectStore.setConf(ObjectStore.java:209)
> at
> org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:73)
> at
> org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:133)
> at
> org.apache.hadoop.hive.metastore.RetryingRawStore.(RetryingRawStore.java:64)
> at
> org.apache.hadoop.hive.metastore.RetryingRawStore.getProxy(RetryingRawStore.java:73)
> at
> org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.newRawStore(HiveMetaStore.java:415)
> at
> org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.getMS(HiveMetaStore.java:402)
> at
> org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.createDefaultDB(HiveMetaStore.java:441)
> at
> org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.init(HiveMetaStore.java:326)
> at
> org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.(HiveMetaStore.java:286)
> at
> org.apache.hadoop.hive.metastore.RetryingHMSHandler.(RetryingHMSHandler.java:54)
> at
> org.apache.hadoop.hive.metastore.RetryingHMSHandler.getProxy(RetryingHMSHandler.java:59)
> at
> org.apache.hadoop.hive.metastore.HiveMetaStore.newHMSHandler(HiveMetaStore.java:4060)
> at
> org.apache.hadoop.hive.metastore.HiveMetaStoreClient.(HiveMetaStoreClient.java:121)
> at
> org.apache.hadoop.hive.metastore.HiveMetaStoreClient.(HiveMetaStoreClient.java:104)
> at org.apache.hive.service.cli.CLIService.start(CLIService.java:82)
> ... 11 more
> Caused by: java.lang.ClassNotFoundException:
> org.datanucleus.api.jdo.JDOPersistenceManagerFactory
> at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
> at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
> at java.security.AccessController.doPrivileged(Native Method)
> at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:270)
> at javax.jdo.JDOHelper$18.run(JDOHelper.java:2018)
> at javax.jdo.JDOHelper$18.run(JDOHelper.java:2016)
>

Re: Spark SQL JDBC

2014-08-12 Thread ZHENG, Xu-dong
Hi Cheng,

I also met some issues when I tried to start the ThriftServer based on a build from
the master branch (I could successfully run it from the branch-1.0-jdbc
branch). Below is my build command:

./make-distribution.sh --skip-java-test -Phadoop-2.4 -Phive -Pyarn
-Dyarn.version=2.4.0 -Dhadoop.version=2.4.0 -Phive-thriftserver

And below are the printed errors:

ERROR CompositeService: Error starting services HiveServer2
org.apache.hive.service.ServiceException: Unable to connect to MetaStore!
at org.apache.hive.service.cli.CLIService.start(CLIService.java:85)
at
org.apache.hive.service.CompositeService.start(CompositeService.java:70)
at
org.apache.hive.service.server.HiveServer2.start(HiveServer2.java:73)
at
org.apache.spark.sql.hive.thriftserver.HiveThriftServer2$.main(HiveThriftServer2.scala:71)
at
org.apache.spark.sql.hive.thriftserver.HiveThriftServer2.main(HiveThriftServer2.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:314)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:73)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: javax.jdo.JDOFatalUserException: Class
org.datanucleus.api.jdo.JDOPersistenceManagerFactory was not found.
NestedThrowables:
java.lang.ClassNotFoundException:
org.datanucleus.api.jdo.JDOPersistenceManagerFactory
at
javax.jdo.JDOHelper.invokeGetPersistenceManagerFactoryOnImplementation(JDOHelper.java:1175)
at
javax.jdo.JDOHelper.getPersistenceManagerFactory(JDOHelper.java:808)
at
javax.jdo.JDOHelper.getPersistenceManagerFactory(JDOHelper.java:701)
at
org.apache.hadoop.hive.metastore.ObjectStore.getPMF(ObjectStore.java:275)
at
org.apache.hadoop.hive.metastore.ObjectStore.getPersistenceManager(ObjectStore.java:304)
at
org.apache.hadoop.hive.metastore.ObjectStore.initialize(ObjectStore.java:234)
at
org.apache.hadoop.hive.metastore.ObjectStore.setConf(ObjectStore.java:209)
at
org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:73)
at
org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:133)
at
org.apache.hadoop.hive.metastore.RetryingRawStore.(RetryingRawStore.java:64)
at
org.apache.hadoop.hive.metastore.RetryingRawStore.getProxy(RetryingRawStore.java:73)
at
org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.newRawStore(HiveMetaStore.java:415)
at
org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.getMS(HiveMetaStore.java:402)
at
org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.createDefaultDB(HiveMetaStore.java:441)
at
org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.init(HiveMetaStore.java:326)
at
org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.(HiveMetaStore.java:286)
at
org.apache.hadoop.hive.metastore.RetryingHMSHandler.(RetryingHMSHandler.java:54)
at
org.apache.hadoop.hive.metastore.RetryingHMSHandler.getProxy(RetryingHMSHandler.java:59)
at
org.apache.hadoop.hive.metastore.HiveMetaStore.newHMSHandler(HiveMetaStore.java:4060)
at
org.apache.hadoop.hive.metastore.HiveMetaStoreClient.(HiveMetaStoreClient.java:121)
at
org.apache.hadoop.hive.metastore.HiveMetaStoreClient.(HiveMetaStoreClient.java:104)
at org.apache.hive.service.cli.CLIService.start(CLIService.java:82)
... 11 more
Caused by: java.lang.ClassNotFoundException:
org.datanucleus.api.jdo.JDOPersistenceManagerFactory
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:270)
at javax.jdo.JDOHelper$18.run(JDOHelper.java:2018)
at javax.jdo.JDOHelper$18.run(JDOHelper.java:2016)
at java.security.AccessController.doPrivileged(Native Method)
at javax.jdo.JDOHelper.forName(JDOHelper.java:2015)
at
javax.jdo.JDOHelper.invokeGetPersistenceManagerFactoryOnImplementation(JDOHelper.java:1162)
... 32 more
14/08/13 13:08:48 INFO AbstractService: Service:OperationManager is stopped.
14/08/13 13:08:48 INFO AbstractService: Service:SessionManager is stopped.
14/08/13 13:08:48 INFO AbstractService: Service:CLIService is stopped.
14/08/13 13:08:48 ERROR HiveThriftServer2: Erro

Re: anaconda and spark integration

2014-08-12 Thread Oleg Ruchovets
Hi Nick,
  Thank you for the link. Do I understand correctly that the AMI is for the
cloud only?
Currently we are NOT on the cloud. Is there an option to use Anaconda with
Spark outside of the Amazon cloud?

Thanks


On Wed, Aug 13, 2014 at 12:38 AM, Nick Pentreath 
wrote:

> You may want to take a look at this thread:
> http://apache-spark-user-list.1001560.n3.nabble.com/Anaconda-Spark-AMI-td8749.html
>
> there is a Spark/Anaconda AMI that you can spin up with anaconda
> preinstalled.
>
>
>
> On Tue, Aug 12, 2014 at 5:41 PM, Oleg Ruchovets 
> wrote:
>
>> Hello.
>> Is there an integration of Spark (PySpark) with Anaconda?
>> I googled a lot and didn't find relevant information.
>> Could you please point me to a tutorial or simple example?
>>
>> Thanks in advance
>> Oleg.
>>
>>
>


Re: how to access workers from spark context

2014-08-12 Thread S. Zhou
Actually, if you search the Spark mail archives you will find many similar
topics. For now, I just want to manage it myself.


On Tuesday, August 12, 2014 8:46 PM, Stanley Shi  wrote:
 


This seems a bug, right? It's not the user's responsibility to manage the 
workers.



On Wed, Aug 13, 2014 at 11:28 AM, S. Zhou  wrote:

Sometimes workers are dead but spark context does not know it and still send 
jobs.
>
>
>
>On Tuesday, August 12, 2014 7:14 PM, Stanley Shi  wrote:
> 
>
>
>Why do you need to detect the worker status in the application? you 
>application generally don't need to know where it is executed.
>
>
>
>On Wed, Aug 13, 2014 at 7:39 AM, S. Zhou  wrote:
>
>I tried to access worker info from spark context but it seems spark context 
>does no expose such API. The reason of doing that is: it seems spark context 
>itself does not have logic to detect if its workers are in dead status. So I 
>like to add such logic by myself. 
>>
>>
>>BTW, it seems spark web UI has some logic of detecting dead workers. But all 
>>relevant classes are declared as private for the spark package scope. 
>>
>>
>>Please let me know how to solve this issue (or if there is an alternative way 
>>to achieve the same purpose)
>>
>>
>>Thanks
>>
>>
>
>
>
>-- 
>
>Regards,
>Stanley Shi,
>
>
>


-- 

Regards,
Stanley Shi,

Re: how to access workers from spark context

2014-08-12 Thread Stanley Shi
This seems like a bug, right? It's not the user's responsibility to manage the
workers.


On Wed, Aug 13, 2014 at 11:28 AM, S. Zhou  wrote:

> Sometimes workers are dead but spark context does not know it and still
> send jobs.
>
>
>   On Tuesday, August 12, 2014 7:14 PM, Stanley Shi 
> wrote:
>
>
> Why do you need to detect the worker status in the application? you
> application generally don't need to know where it is executed.
>
>
> On Wed, Aug 13, 2014 at 7:39 AM, S. Zhou  wrote:
>
> I tried to access worker info from spark context but it seems spark
> context does no expose such API. The reason of doing that is: it seems
> spark context itself does not have logic to detect if its workers are in
> dead status. So I like to add such logic by myself.
>
> BTW, it seems spark web UI has some logic of detecting dead workers. But
> all relevant classes are declared as private for the spark package scope.
>
> Please let me know how to solve this issue (or if there is an alternative
> way to achieve the same purpose)
>
> Thanks
>
>
>
>
> --
> Regards,
> *Stanley Shi,*
>
>
>
>


-- 
Regards,
*Stanley Shi,*


Re: how to access workers from spark context

2014-08-12 Thread S. Zhou
Sometimes workers are dead but the Spark context does not know it and still sends
jobs.


On Tuesday, August 12, 2014 7:14 PM, Stanley Shi  wrote:
 


Why do you need to detect the worker status in the application? you application 
generally don't need to know where it is executed.



On Wed, Aug 13, 2014 at 7:39 AM, S. Zhou  wrote:

I tried to access worker info from spark context but it seems spark context 
does no expose such API. The reason of doing that is: it seems spark context 
itself does not have logic to detect if its workers are in dead status. So I 
like to add such logic by myself. 
>
>
>BTW, it seems spark web UI has some logic of detecting dead workers. But all 
>relevant classes are declared as private for the spark package scope. 
>
>
>Please let me know how to solve this issue (or if there is an alternative way 
>to achieve the same purpose)
>
>
>Thanks
>
>


-- 

Regards,
Stanley Shi,

OutOfMemoryError when Spark Streaming receives Flume events

2014-08-12 Thread jason chen
I checked the javacore file, and it contains:

Dump Event "systhrow" (0004) Detail "java/lang/OutOfMemoryError" "Java
heap space" received

After checking the failing thread, I found it occurs in the
SparkFlumeEvent.readExternal() method:
 71 for (i <- 0 until numHeaders) {
 72   val keyLength = in.readInt()
 73   val keyBuff = new Array[Byte](keyLength)

In line 73, it tries to construct an array of that length, which may be very
large. Are there any limitations on the size of what Flume sends, or some way
to adjust the heap size? Thanks
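
One knob that may help with the "Java heap space" part is giving each executor
more heap before the StreamingContext is created. This is only a sketch with
illustrative values, not a guaranteed fix: if a corrupt or very large keyLength
arrives, no heap will be big enough.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("FlumeEventCount")          // name is illustrative
  .set("spark.executor.memory", "4g")     // more heap per executor; value is illustrative
val ssc = new StreamingContext(conf, Seconds(10))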


training recsys model

2014-08-12 Thread Hoai-Thu Vuong
In MLlib, I found the method to train a matrix factorization model to predict
the taste of users. In this function, there are some parameters such as
lambda and rank; I cannot find the best values to set these parameters to, or
how to optimize them. Could you please give me some recommendations?

-- 
Thu.


Re: Mllib : Save SVM model to disk

2014-08-12 Thread Hoai-Thu Vuong
you should try watching this video
https://www.youtube.com/watch?v=sPhyePwo7FA, for more details, please
search in the archives, I've got a same kind of question and other guys
helped me to solve the problem.

On Tue, Aug 12, 2014 at 12:36 PM, XiaoQinyu 
wrote:

> Have you solved this problem??
>
> And could you share how to save model to hdfs and reload it?
>
> Thanks
>
> XiaoQinyu
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Mllib-Save-SVM-model-to-disk-tp74p11954.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 
Thu.


Running time bottleneck on a few workers

2014-08-12 Thread Bin
Hi All,


I met a problem that for each stage, most workers finished fast (around 1 min),
but a few workers spent about 7 min to finish, which significantly slows down the
process.


As shown below, the running time is very unevenly distributed over the workers.


I wonder whether this is normal. Is it related to the partitioning strategy? For
now, I used the default partitioning strategy.


Looking for advice!


Thanks very much!


Best,
Bin



Re: Spark Streaming example on your mesos cluster

2014-08-12 Thread Tobias Pfeiffer
Hi,

On Wed, Aug 13, 2014 at 4:24 AM, Zia Syed  wrote:
>
> I dont particularly see any errors on my logs, either on console, or on
> slaves. I see slave downloads the  spark-1.0.2-bin-hadoop1.tgz file and
> unpacks them as well. Mesos Master shows quiet alot of Tasks created and
> Finished.  I dont see any output on my console of the Word Counts, like
> in get in the Spark version.
>
> Any suggestions/ideas how i can make it work?
>

You have to check the logs on the Mesos slaves in
/tmp/mesos/slaves/***/frameworks/ -- I guess that you are missing the jar
that your application is packaged in.

Tobias


Fwd: Task closures and synchronization

2014-08-12 Thread Tobias Pfeiffer
Uh, for some reason I don't seem to automatically reply to the list any
more.
Here is my message to Tom again.

-- Forwarded message --

Tom,

On Wed, Aug 13, 2014 at 5:35 AM, Tom Vacek  wrote:

> This is a back-to-basics question.  How do we know when Spark will clone
> an object and distribute it with task closures versus synchronize access to
> it.
>
> For example, the old rookie mistake of random number generation:
>
> import scala.util.Random
> val randRDD = sc.parallelize(0 until 1000).map(ii => Random.nextGaussian)
>
> One can check to see that each partition contains a different set of
> random numbers, so the RNG obviously was not cloned, but access was
> synchronized.
>

In this case, Random is a singleton object; Random.nextGaussian is like a
static method of a Java class. The access is not synchronized (unless I
misunderstand "synchronized"), but each Spark worker will use a JVM-local
instance of the Random object. You don't actually close over the Random
object in this case. In fact, this is one way to have node-local state
(e.g., for DB connection pooling).
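
A minimal sketch of that pattern (the object name and the JDBC URL are made up
for illustration, not a real pooling setup, and `rdd` stands for whatever RDD
you are writing out):

import java.sql.{Connection, DriverManager}

object ConnectionHolder {
  // Initialized lazily, once per executor JVM, the first time a task touches it.
  lazy val conn: Connection = DriverManager.getConnection("jdbc:h2:mem:example")
}

rdd.foreachPartition { records =>
  val conn = ConnectionHolder.conn   // reuses the JVM-local instance on each worker
  records.foreach { r =>
    // ... use conn to write r ...
  }
}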


> However:
>
> val myMap = collection.mutable.Map.empty[Int,Int]
> sc.parallelize(0 until 100).mapPartitions(it => {it.foreach(ii => myMap(ii) = 
> ii); Array(myMap).iterator}).collect
>
>
> This shows that each partition got a copy of the empty map and filled it
> in with its portion of the rdd.
>

In this case, myMap is an instance of the Map class, so it will be
serialized and shipped around. In fact, if you did `val Random = new
scala.util.Random()` in your code above, then this object would also be
serialized and treated just as myMap. (NB. No, it is not. Spark hangs for
me when I do this and doesn't return anything...)

Tobias


Re: spark.files.userClassPathFirst=true Not Working Correctly

2014-08-12 Thread Marcelo Vanzin
Hi, sorry for the delay. Would you have yarn available to test? Given
the discussion in SPARK-2878, this might be a different incarnation of
the same underlying issue.

The option in Yarn is spark.yarn.user.classpath.first

On Mon, Aug 11, 2014 at 1:33 PM, DNoteboom  wrote:
> I'm currently running on my local machine on standalone. The error shows up
> in my code when I am closing resources using the
> TaskContext.addOnCompleteCallBack. However, the cause of this error is
> because of a faulty classLoader which must occur in the Executor in the
> function createClassLoader.
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/spark-files-userClassPathFirst-true-Not-Working-Correctly-tp11917p11921.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>



-- 
Marcelo

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark sql failed in yarn-cluster mode when connecting to non-default hive database

2014-08-12 Thread Jenny Zhao
Hi Yin,

hive-site.xml was copied to spark/conf and is the same as the one under
$HIVE_HOME/conf.

Through the Hive CLI, I don't see any problem, but for Spark in yarn-cluster
mode I am not able to switch to a database other than the default one; in
yarn-client mode it works fine.

Thanks!

Jenny


On Tue, Aug 12, 2014 at 12:53 PM, Yin Huai  wrote:

> Hi Jenny,
>
> Have you copied hive-site.xml to spark/conf directory? If not, can you
> put it in conf/ and try again?
>
> Thanks,
>
> Yin
>
>
> On Mon, Aug 11, 2014 at 8:57 PM, Jenny Zhao 
> wrote:
>
>>
>> Thanks Yin!
>>
>> here is my hive-site.xml,  which I copied from $HIVE_HOME/conf, didn't
>> experience problem connecting to the metastore through hive. which uses DB2
>> as metastore database.
>>
>> 
>> 
>> 
>> 
>>  
>>   hive.hwi.listen.port
>>   
>>  
>>  
>>   hive.querylog.location
>>   /var/ibm/biginsights/hive/query/${user.name}
>>  
>>  
>>   hive.metastore.warehouse.dir
>>   /biginsights/hive/warehouse
>>  
>>  
>>   hive.hwi.war.file
>>   lib/hive-hwi-0.12.0.war
>>  
>>  
>>   hive.metastore.metrics.enabled
>>   true
>>  
>>  
>>   javax.jdo.option.ConnectionURL
>>   jdbc:db2://hdtest022.svl.ibm.com:50001/BIDB
>>  
>>  
>>   javax.jdo.option.ConnectionDriverName
>>   com.ibm.db2.jcc.DB2Driver
>>  
>>  
>>   hive.stats.autogather
>>   false
>>  
>>  
>>   javax.jdo.mapping.Schema
>>   HIVE
>>  
>>  
>>   javax.jdo.option.ConnectionUserName
>>   catalog
>>  
>>  
>>   javax.jdo.option.ConnectionPassword
>>   V2pJNWMxbFlVbWhaZHowOQ==
>>  
>>  
>>   hive.metastore.password.encrypt
>>   true
>>  
>>  
>>   org.jpox.autoCreateSchema
>>   true
>>  
>>  
>>   hive.server2.thrift.min.worker.threads
>>   5
>>  
>>  
>>   hive.server2.thrift.max.worker.threads
>>   100
>>  
>>  
>>   hive.server2.thrift.port
>>   1
>>  
>>  
>>   hive.server2.thrift.bind.host
>>   hdtest022.svl.ibm.com
>>  
>>  
>>   hive.server2.authentication
>>   CUSTOM
>>  
>>  
>>   hive.server2.custom.authentication.class
>>
>> org.apache.hive.service.auth.WebConsoleAuthenticationProviderImpl
>>  
>>  
>>   hive.server2.enable.impersonation
>>   true
>>  
>>  
>>   hive.security.webconsole.url
>>   http://hdtest022.svl.ibm.com:8080
>>  
>>  
>>   hive.security.authorization.enabled
>>   true
>>  
>>  
>>   hive.security.authorization.createtable.owner.grants
>>   ALL
>>  
>> 
>>
>>
>>
>> On Mon, Aug 11, 2014 at 4:29 PM, Yin Huai  wrote:
>>
>>> Hi Jenny,
>>>
>>> How's your metastore configured for both Hive and Spark SQL? Which
>>> metastore mode are you using (based on
>>> https://cwiki.apache.org/confluence/display/Hive/AdminManual+MetastoreAdmin
>>> )?
>>>
>>> Thanks,
>>>
>>> Yin
>>>
>>>
>>> On Mon, Aug 11, 2014 at 6:15 PM, Jenny Zhao 
>>> wrote:
>>>


 you can reproduce this issue with the following steps (assuming you
 have Yarn cluster + Hive 12):

 1) using hive shell, create a database, e.g: create database ttt

 2) write a simple spark sql program

 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.sql._
 import org.apache.spark.sql.hive.HiveContext

 object HiveSpark {
   case class Record(key: Int, value: String)

   def main(args: Array[String]) {
 val sparkConf = new SparkConf().setAppName("HiveSpark")
 val sc = new SparkContext(sparkConf)

 // A hive context creates an instance of the Hive Metastore in
 process,
 val hiveContext = new HiveContext(sc)
 import hiveContext._

 hql("use ttt")
 hql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
 hql("LOAD DATA INPATH '/user/biadmin/kv1.txt' INTO TABLE src")

 // Queries are expressed in HiveQL
 println("Result of 'SELECT *': ")
 hql("SELECT * FROM src").collect.foreach(println)
 sc.stop()
   }
 }
 3) run it in yarn-cluster mode.


 On Mon, Aug 11, 2014 at 9:44 AM, Cheng Lian 
 wrote:

> Since you were using hql(...), it’s probably not related to JDBC
> driver. But I failed to reproduce this issue locally with a single node
> pseudo distributed YARN cluster. Would you mind to elaborate more about
> steps to reproduce this bug? Thanks
> ​
>
>
> On Sun, Aug 10, 2014 at 9:36 PM, Cheng Lian 
> wrote:
>
>> Hi Jenny, does this issue only happen when running Spark SQL with
>> YARN in your environment?
>>
>>
>> On Sat, Aug 9, 2014 at 3:56 AM, Jenny Zhao 
>> wrote:
>>
>>>
>>> Hi,
>>>
>>> I am able to run my hql query on yarn cluster mode when connecting
>>> to the default hive metastore defined in hive-site.xml.
>>>
>>> however, if I want to switch to a different database, like:
>>>
>>>   hql("use other-database")
>>>
>>>
>>> it only works in yarn client mode, but failed on yarn-cluster mode
>>> with the following stack:
>>>
>>> 14/08/08 12:09:11 INFO HiveMetaStore

Re: how to split RDD by key and save to different path

2014-08-12 Thread 诺铁
Understood, thank you.
Small files are a problem; I am considering processing the data before putting it into
HDFS.


On Tue, Aug 12, 2014 at 9:37 PM, Fengyun RAO  wrote:

> 1. Be careful: HDFS is better for large files, not bunches of small files.
>
> 2. If that's really what you want, roll your own, for example:
>
> import java.io.{BufferedWriter, OutputStreamWriter}
> import scala.collection.mutable
> import org.apache.hadoop.conf.Configuration
> import org.apache.hadoop.fs.{FileSystem, Path}
> import org.apache.spark.Partitioner
> import org.apache.spark.SparkContext._
>
> def writeLines(iterator: Iterator[(String, String)]): Unit = {
>   val writers = new mutable.HashMap[String, BufferedWriter] // (key, writer) map
>   try {
>     while (iterator.hasNext) {
>       val item = iterator.next()
>       val key = item._1
>       val line = item._2
>       val writer = writers.getOrElseUpdate(key, {
>         val path = args(1) + key // args(1): output directory prefix
>         val outputStream = FileSystem.get(new Configuration()).create(new Path(path))
>         new BufferedWriter(new OutputStreamWriter(outputStream))
>       })
>       writer.write(line)
>       writer.newLine()
>     }
>   } finally {
>     writers.values.foreach(_.close())
>   }
> }
>
> val inputData = sc.textFile(args(0))
> val keyValue = inputData.map(line => (extractKey(line), line)) // extractKey: your own function deriving the key (e.g. the date) from a line
> val partitions = keyValue.partitionBy(new MyPartitioner(10))
> partitions.foreachPartition(writeLines)
>
>
> class MyPartitioner(partitions: Int) extends Partitioner {
>   override def numPartitions: Int = partitions
>
>   override def getPartition(key: Any): Int = {
>     // keep lines with the same key in the same partition
>     (key.toString.hashCode & Integer.MAX_VALUE) % numPartitions
>   }
> }
>
>
>
> 2014-08-12 21:34 GMT+08:00 Fengyun RAO :
>
> 1. be careful, HDFS are better for large files, not bunches of small files.
>>
>> 2. if that's really what you want, roll it your own.
>>
>> def writeAvro(iterator: Iterator[(String, String)]) = {
>>   val writers = new mutalbe.HashMap[String, BufferedWriter] // (key, writer) 
>> map
>>   try {
>>   while (iterator.hasNext) {
>> val item = iterator.next()
>> val key = item._1
>> val line = item._2
>> val writer = writers.get(key) match {
>>   case Some(writer) => writer
>>   case None =>
>> val path = arg(1) + key
>> val outputStream = FileSystem.get(new Configuration()).create(new 
>> Path(path))
>> writer = new BufferedWriter(outputStream)
>> }
>> writer.writeLine(line)
>> } finally {
>> writers.values.foreach(._close())
>> }
>> }
>>
>> val inputData = sc.textFile()
>> val keyValue = inputData.map(line => (key, line))
>> val partitions = keValue.partitionBy(new MyPartition(10))
>> partitions.foreachPartition(writeLines)
>>
>>
>> class MyPartitioner(partitions: Int) extends Partitioner {
>> override def numPartitions: Int = partitions
>>
>> override def getPartition(key: Any): Int = {
>> (key.toString.hashCode & Integer.MAX_VALUE) % numPartitions // make 
>> sure lines with the same key in the same partition
>>
>> }
>> }
>>
>>
>>
>> 2014-08-11 20:42 GMT+08:00 诺铁 :
>>
>>> hi,
>>>
>>> I have googled and found similar questions without a good answer:
>>> http://stackoverflow.com/questions/24520225/writing-to-hadoop-distributed-file-system-multiple-times-with-spark
>>>
>>> In short, I would like to separate the raw data and divide it by some key, for
>>> example the create date, and put it in a directory named by that date, so that I can
>>> easily access a portion of the data later.
>>>
>>> For now I have to extract all keys and then filter by key and save to
>>> file repeatedly. Is there any good way to do this? Or maybe I shouldn't do
>>> such a thing?
>>>
>>
>>
>


how to access workers from spark context

2014-08-12 Thread S. Zhou
I tried to access worker info from the Spark context, but it seems the Spark context
does not expose such an API. The reason for doing that is: it seems the Spark context
itself does not have logic to detect whether its workers are dead, so I would
like to add such logic myself.

BTW, it seems the Spark web UI has some logic for detecting dead workers, but all
relevant classes are declared as private to the spark package scope.

Please let me know how to solve this issue (or if there is an alternative way
to achieve the same purpose).

Thanks


Task closures and synchronization

2014-08-12 Thread Tom Vacek
This is a back-to-basics question. How do we know when Spark will clone an
object and distribute it with task closures versus synchronize access to it?

For example, the old rookie mistake of random number generation:

import scala.util.Random
val randRDD = sc.parallelize(0 until 1000).map(ii => Random.nextGaussian)

One can check to see that each partition contains a different set of random
numbers, so the RNG obviously was not cloned, but access was synchronized.
However:

val myMap = collection.mutable.Map.empty[Int,Int]
sc.parallelize(0 until 100).mapPartitions(it => {it.foreach(ii =>
myMap(ii) = ii); Array(myMap).iterator}).collect


This shows that each partition got a copy of the empty map and filled it in
with its portion of the rdd.


Re: DistCP - Spark-based

2014-08-12 Thread Matei Zaharia
Good question; I don't know of one but I believe people at Cloudera had some 
thoughts of porting Sqoop to Spark in the future, and maybe they'd consider 
DistCP as part of this effort. I agree it's missing right now.

Matei

On August 12, 2014 at 11:04:28 AM, Gary Malouf (malouf.g...@gmail.com) wrote:

We are probably still the minority, but our analytics platform based on Spark + 
HDFS does not have map/reduce installed.  I'm wondering if there is a distcp 
equivalent that leverages Spark to do the work.

Our team is trying to find the best way to do cross-datacenter replication of 
our HDFS data to minimize the impact of outages/dc failure.  
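
For what it is worth, the crudest possible Spark-driven copy could look roughly
like the sketch below. This is purely illustrative: `sourcePaths` and `destDir`
are assumed inputs, and it has none of DistCp's checksum verification,
permission preservation, throttling, or retry handling.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileUtil, Path}

val sourcePaths: Seq[String] = Seq("hdfs://dc1-nn/data/part-00000")  // illustrative
val destDir = "hdfs://dc2-nn/backup/data"                            // illustrative

sc.parallelize(sourcePaths, numSlices = 100).foreachPartition { paths =>
  val conf = new Configuration()
  paths.foreach { p =>
    val src = new Path(p)
    val dst = new Path(destDir, src.getName)
    FileUtil.copy(src.getFileSystem(conf), src,
                  dst.getFileSystem(conf), dst,
                  false /* deleteSource */, conf)
  }
}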

Re: Spark sql failed in yarn-cluster mode when connecting to non-default hive database

2014-08-12 Thread Yin Huai
Hi Jenny,

Have you copied hive-site.xml to spark/conf directory? If not, can you put
it in conf/ and try again?

Thanks,

Yin


On Mon, Aug 11, 2014 at 8:57 PM, Jenny Zhao  wrote:

>
> Thanks Yin!
>
> here is my hive-site.xml,  which I copied from $HIVE_HOME/conf, didn't
> experience problem connecting to the metastore through hive. which uses DB2
> as metastore database.
>
> 
> 
> 
> 
>  
>   hive.hwi.listen.port
>   
>  
>  
>   hive.querylog.location
>   /var/ibm/biginsights/hive/query/${user.name}
>  
>  
>   hive.metastore.warehouse.dir
>   /biginsights/hive/warehouse
>  
>  
>   hive.hwi.war.file
>   lib/hive-hwi-0.12.0.war
>  
>  
>   hive.metastore.metrics.enabled
>   true
>  
>  
>   javax.jdo.option.ConnectionURL
>   jdbc:db2://hdtest022.svl.ibm.com:50001/BIDB
>  
>  
>   javax.jdo.option.ConnectionDriverName
>   com.ibm.db2.jcc.DB2Driver
>  
>  
>   hive.stats.autogather
>   false
>  
>  
>   javax.jdo.mapping.Schema
>   HIVE
>  
>  
>   javax.jdo.option.ConnectionUserName
>   catalog
>  
>  
>   javax.jdo.option.ConnectionPassword
>   V2pJNWMxbFlVbWhaZHowOQ==
>  
>  
>   hive.metastore.password.encrypt
>   true
>  
>  
>   org.jpox.autoCreateSchema
>   true
>  
>  
>   hive.server2.thrift.min.worker.threads
>   5
>  
>  
>   hive.server2.thrift.max.worker.threads
>   100
>  
>  
>   hive.server2.thrift.port
>   1
>  
>  
>   hive.server2.thrift.bind.host
>   hdtest022.svl.ibm.com
>  
>  
>   hive.server2.authentication
>   CUSTOM
>  
>  
>   hive.server2.custom.authentication.class
>
> org.apache.hive.service.auth.WebConsoleAuthenticationProviderImpl
>  
>  
>   hive.server2.enable.impersonation
>   true
>  
>  
>   hive.security.webconsole.url
>   http://hdtest022.svl.ibm.com:8080
>  
>  
>   hive.security.authorization.enabled
>   true
>  
>  
>   hive.security.authorization.createtable.owner.grants
>   ALL
>  
> 
>
>
>
> On Mon, Aug 11, 2014 at 4:29 PM, Yin Huai  wrote:
>
>> Hi Jenny,
>>
>> How's your metastore configured for both Hive and Spark SQL? Which
>> metastore mode are you using (based on
>> https://cwiki.apache.org/confluence/display/Hive/AdminManual+MetastoreAdmin
>> )?
>>
>> Thanks,
>>
>> Yin
>>
>>
>> On Mon, Aug 11, 2014 at 6:15 PM, Jenny Zhao 
>> wrote:
>>
>>>
>>>
>>> you can reproduce this issue with the following steps (assuming you have
>>> Yarn cluster + Hive 12):
>>>
>>> 1) using hive shell, create a database, e.g: create database ttt
>>>
>>> 2) write a simple spark sql program
>>>
>>> import org.apache.spark.{SparkConf, SparkContext}
>>> import org.apache.spark.sql._
>>> import org.apache.spark.sql.hive.HiveContext
>>>
>>> object HiveSpark {
>>>   case class Record(key: Int, value: String)
>>>
>>>   def main(args: Array[String]) {
>>> val sparkConf = new SparkConf().setAppName("HiveSpark")
>>> val sc = new SparkContext(sparkConf)
>>>
>>> // A hive context creates an instance of the Hive Metastore in
>>> process,
>>> val hiveContext = new HiveContext(sc)
>>> import hiveContext._
>>>
>>> hql("use ttt")
>>> hql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
>>> hql("LOAD DATA INPATH '/user/biadmin/kv1.txt' INTO TABLE src")
>>>
>>> // Queries are expressed in HiveQL
>>> println("Result of 'SELECT *': ")
>>> hql("SELECT * FROM src").collect.foreach(println)
>>> sc.stop()
>>>   }
>>> }
>>> 3) run it in yarn-cluster mode.
>>>
>>>
>>> On Mon, Aug 11, 2014 at 9:44 AM, Cheng Lian 
>>> wrote:
>>>
 Since you were using hql(...), it’s probably not related to JDBC
 driver. But I failed to reproduce this issue locally with a single node
 pseudo distributed YARN cluster. Would you mind to elaborate more about
 steps to reproduce this bug? Thanks
 ​


 On Sun, Aug 10, 2014 at 9:36 PM, Cheng Lian 
 wrote:

> Hi Jenny, does this issue only happen when running Spark SQL with YARN
> in your environment?
>
>
> On Sat, Aug 9, 2014 at 3:56 AM, Jenny Zhao 
> wrote:
>
>>
>> Hi,
>>
>> I am able to run my hql query on yarn cluster mode when connecting to
>> the default hive metastore defined in hive-site.xml.
>>
>> however, if I want to switch to a different database, like:
>>
>>   hql("use other-database")
>>
>>
>> it only works in yarn client mode, but failed on yarn-cluster mode
>> with the following stack:
>>
>> 14/08/08 12:09:11 INFO HiveMetaStore: 0: get_database: tt
>> 14/08/08 12:09:11 INFO audit: ugi=biadminip=unknown-ip-addr  
>> cmd=get_database: tt
>> 14/08/08 12:09:11 ERROR RetryingHMSHandler: 
>> NoSuchObjectException(message:There is no database named tt)
>>  at 
>> org.apache.hadoop.hive.metastore.ObjectStore.getMDatabase(ObjectStore.java:431)
>>  at 
>> org.apache.hadoop.hive.metastore.ObjectStore.getDatabase(ObjectStore.java:441)
>>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>  at 
>> sun.reflect.NativeMethod

Spark Streaming example on your mesos cluster

2014-08-12 Thread Zia Syed
Hi,

I'm trying to run the streaming.NetworkWordCount example on my Mesos cluster
(0.19.1). I am able to run the SparkPi example on the Mesos cluster and can run
the streaming example in local[n] mode, but no luck so far with the Spark
Streaming examples running on the Mesos cluster.

I don't see any particular errors in my logs, either on the console or on the
slaves. I see the slave downloads the spark-1.0.2-bin-hadoop1.tgz file and
unpacks it as well. The Mesos master shows quite a lot of tasks created and
finished. I don't see any output of the word counts on my console, like I
get in the Spark version.

Any suggestions/ideas how i can make it work?

Thanks,
Zia


Re: Spark Hbase job taking long time

2014-08-12 Thread Amit Singh Hora
Hi,
Today I created a table with 3 regions and 2 job trackers, but the Spark job is
still taking a lot of time.
I also noticed one thing: the memory of the client was increasing linearly. Is
it that the Spark job first brings the complete data into memory?
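
As a side note, Ted's scan-caching suggestion quoted below amounts to one extra
line on the configuration passed to newAPIHadoopRDD (the value 100 is just the
example from his mail, not a tuned number), e.g.:

// Fetch rows from the region server in batches of 100 per RPC instead of one at a time.
conf.set("hbase.mapreduce.scan.cachedrows", "100")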


On Thu, Aug 7, 2014 at 7:31 PM, Ted Yu [via Apache Spark User List] <
ml-node+s1001560n11651...@n3.nabble.com> wrote:

> Forgot to include user@
>
> Another email from Amit indicated that there is 1 region in his table.
> This wouldn't give you the benefit TableInputFormat is expected to deliver.
>
> Please split your table into multiple regions.
>
> See http://hbase.apache.org/book.html#d3593e6847 and related links.
>
> Cheers
>
>
> On Wed, Aug 6, 2014 at 6:41 AM, Ted Yu <[hidden email]
> > wrote:
>
>> Can you try specifying some value (100, e.g.) for
>> "hbase.mapreduce.scan.cachedrows" in your conf ?
>>
>> bq.  table contains 10lakh rows
>>
>> How many rows are there in the table ?
>>
>> nit: Example uses classOf[TableInputFormat] instead of
>> TableInputFormat.class.
>>
>> Cheers
>>
>>
>> On Wed, Aug 6, 2014 at 5:54 AM, Amit Singh Hora <[hidden email]
>> > wrote:
>>
>>> Hi All,
>>>
>>> I am trying to run a SQL query on HBase using spark job ,till now i am
>>> able
>>> to get the desierd results but as the data set size increases Spark job
>>> is
>>> taking a long time
>>> I believe i am doing something wrong,as after going through documentation
>>> and videos discussing on  spark performance  it should not take more then
>>> couple of seconds.
>>>
>>> PFB code snippet
>>> The HBase table contains 10 lakh (1 million) rows
>>>
>>> JavaPairRDD<ImmutableBytesWritable, Result> pairRdd = ctx
>>> .newAPIHadoopRDD(conf, TableInputFormat.class,
>>> ImmutableBytesWritable.class,
>>> org.apache.hadoop.hbase.client.Result.class).cache();
>>>
>>> JavaRDD<Person> people = pairRdd
>>> .map(new Function<Tuple2<ImmutableBytesWritable, Result>, Person>() {
>>> public Person call(Tuple2<ImmutableBytesWritable, Result> v1)
>>> throws Exception {
>>> System.out.println("comming");
>>> Person person = new Person();
>>> String key = Bytes.toString(v1._2.getRow());
>>> key = key.substring(0, key.lastIndexOf("_"));
>>> person.setCalling(Long.parseLong(key));
>>> person.setCalled(Bytes.toLong(v1._2.getValue(
>>> Bytes.toBytes("si"), Bytes.toBytes("called"))));
>>> person.setTime(Bytes.toLong(v1._2.getValue(
>>> Bytes.toBytes("si"), Bytes.toBytes("at"))));
>>> return person;
>>> }
>>> });
>>> JavaSchemaRDD schemaPeople = sqlCtx.applySchema(people, Person.class);
>>> schemaPeople.registerAsTable("people");
>>>
>>> // SQL can be run over RDDs that have been registered as tables.
>>> JavaSchemaRDD teenagers = sqlCtx
>>> .sql("SELECT count(*) from people group by calling");
>>> teenagers.printSchema();
>>>
>>>
>>> I am running spark using start-all.sh script with 2 workers
>>>
>>> Any pointers will be of a great help
>>> Regards,
>>>
>>>
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Hbase-job-taking-long-time-tp11541.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: [hidden email]
>>> 
>>> For additional commands, e-mail: [hidden email]
>>> 
>>>
>>>
>>
>
>
> --
>  If you reply to this email, your message will be added to the discussion
> below:
>
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Hbase-job-taking-long-time-tp11541p11651.html




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.n

Jobs get stuck at reduceByKey stage with spark 1.0.1

2014-08-12 Thread Shivani Rao
Hello spark aficionados,

We upgraded from spark 1.0.0 to 1.0.1 when the new release came out and
started noticing some weird errors. Even a simple operation like
"reduceByKey" or "count" on an RDD gets stuck in "cluster mode". This issue
does not occur with spark 1.0.0 (in cluster or local mode)  or spark 1.0.2
(in cluster or local mode) or with spark 1.0.1 (in local mode).  I looked
at the spark release notes and it did not seem like the "gigantic task
size" issue is a problem because we have tons of resources on this cluster.

Has anyone else encountered this issue before?

Thanks in advance for your help,
Shivani


-- 
Software Engineer
Analytics Engineering Team@ Box
Mountain View, CA


Re: Reference External Variables in Map Function (Inner class)

2014-08-12 Thread Sunny Khatri
That should work. Gonna give it a try. Thanks !


On Tue, Aug 12, 2014 at 11:01 AM, Marcelo Vanzin 
wrote:

> You could create a copy of the variable inside your "Parse" class;
> that way it would be serialized with the instance you create when
> calling map() below.
>
> On Tue, Aug 12, 2014 at 10:56 AM, Sunny Khatri 
> wrote:
> > Are there any other workarounds that could be used to pass in the values
> > from someVariable to the transformation function ?
> >
> >
> > On Tue, Aug 12, 2014 at 10:48 AM, Sean Owen  wrote:
> >>
> >> I don't think static members are going to be serialized in the
> >> closure? the instance of Parse will be looking at its local
> >> SampleOuterClass, which is maybe not initialized on the remote JVM.
> >>
> >> On Tue, Aug 12, 2014 at 6:02 PM, Sunny Khatri 
> >> wrote:
> >> > I have a class defining an inner static class (map function). The
> inner
> >> > class tries to refer the variable instantiated in the outside class,
> >> > which
> >> > results in a NullPointerException. Sample Code as follows:
> >> >
> >> > class SampleOuterClass {
> >> >
> >> >  private static ArrayList someVariable;
> >> >
> >> >  SampleOuterClass() {
> >> >// initialize someVariable
> >> >  }
> >> >
> >> > public static class Parse implements Function<...> {
> >> >   public TypeReturn call (...) {
> >> >   // Try using someVariable: Raises
> NullPointerException
> >> >   }
> >> > }
> >> >
> >> > public void run() {
> >> > RDD<> rdd = data.map(new Parse()).rdd()
> >> > }
> >> > }
> >> >
> >> > Am I missing something with how Closures work with Spark or something
> >> > else
> >> > is wrong ?
> >> >
> >> > Thanks
> >> > Sunny
> >> >
> >> >
> >
> >
>
>
>
> --
> Marcelo
>


DistCP - Spark-based

2014-08-12 Thread Gary Malouf
We are probably still the minority, but our analytics platform based on
Spark + HDFS does not have map/reduce installed.  I'm wondering if there is
a distcp equivalent that leverages Spark to do the work.

Our team is trying to find the best way to do cross-datacenter replication
of our HDFS data to minimize the impact of outages/dc failure.


Re: Reference External Variables in Map Function (Inner class)

2014-08-12 Thread Marcelo Vanzin
You could create a copy of the variable inside your "Parse" class;
that way it would be serialized with the instance you create when
calling map() below.

On Tue, Aug 12, 2014 at 10:56 AM, Sunny Khatri  wrote:
> Are there any other workarounds that could be used to pass in the values
> from someVariable to the transformation function ?
>
>
> On Tue, Aug 12, 2014 at 10:48 AM, Sean Owen  wrote:
>>
>> I don't think static members are going to be serialized in the
>> closure? the instance of Parse will be looking at its local
>> SampleOuterClass, which is maybe not initialized on the remote JVM.
>>
>> On Tue, Aug 12, 2014 at 6:02 PM, Sunny Khatri 
>> wrote:
>> > I have a class defining an inner static class (map function). The inner
>> > class tries to refer the variable instantiated in the outside class,
>> > which
>> > results in a NullPointerException. Sample Code as follows:
>> >
>> > class SampleOuterClass {
>> >
>> >  private static ArrayList someVariable;
>> >
>> >  SampleOuterClass() {
>> >// initialize someVariable
>> >  }
>> >
>> > public static class Parse implements Function<...> {
>> >   public TypeReturn call (...) {
>> >   // Try using someVariable: Raises NullPointerException
>> >   }
>> > }
>> >
>> > public void run() {
>> > RDD<> rdd = data.map(new Parse()).rdd()
>> > }
>> > }
>> >
>> > Am I missing something with how Closures work with Spark or something
>> > else
>> > is wrong ?
>> >
>> > Thanks
>> > Sunny
>> >
>> >
>
>



-- 
Marcelo

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Reference External Variables in Map Function (Inner class)

2014-08-12 Thread Sunny Khatri
Are there any other workarounds that could be used to pass in the values
from *someVariable *to the transformation function ?


On Tue, Aug 12, 2014 at 10:48 AM, Sean Owen  wrote:

> I don't think static members are going to be serialized in the
> closure? the instance of Parse will be looking at its local
> SampleOuterClass, which is maybe not initialized on the remote JVM.
>
> On Tue, Aug 12, 2014 at 6:02 PM, Sunny Khatri 
> wrote:
> > I have a class defining an inner static class (map function). The inner
> > class tries to refer the variable instantiated in the outside class,
> which
> > results in a NullPointerException. Sample Code as follows:
> >
> > class SampleOuterClass {
> >
> >  private static ArrayList someVariable;
> >
> >  SampleOuterClass() {
> >// initialize someVariable
> >  }
> >
> > public static class Parse implements Function<...> {
> >   public TypeReturn call (...) {
> >   // Try using someVariable: Raises NullPointerException
> >   }
> > }
> >
> > public void run() {
> > RDD<> rdd = data.map(new Parse()).rdd()
> > }
> > }
> >
> > Am I missing something with how Closures work with Spark or something
> else
> > is wrong ?
> >
> > Thanks
> > Sunny
> >
> >
>


Re: Reference External Variables in Map Function (Inner class)

2014-08-12 Thread Sean Owen
I don't think static members are going to be serialized in the
closure? the instance of Parse will be looking at its local
SampleOuterClass, which is maybe not initialized on the remote JVM.

On Tue, Aug 12, 2014 at 6:02 PM, Sunny Khatri  wrote:
> I have a class defining an inner static class (map function). The inner
> class tries to refer the variable instantiated in the outside class, which
> results in a NullPointerException. Sample Code as follows:
>
> class SampleOuterClass {
>
>  private static ArrayList someVariable;
>
>  SampleOuterClass() {
>// initialize someVariable
>  }
>
> public static class Parse implements Function<...> {
>   public TypeReturn call (...) {
>   // Try using someVariable: Raises NullPointerException
>   }
> }
>
> public void run() {
> RDD<> rdd = data.map(new Parse()).rdd()
> }
> }
>
> Am I missing something with how Closures work with Spark or something else
> is wrong ?
>
> Thanks
> Sunny
>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Reference External Variables in Map Function (Inner class)

2014-08-12 Thread Sunny Khatri
I have a class defining an inner static class (map function). The inner
class tries to refer the variable instantiated in the outside class, which
results in a NullPointerException. Sample Code as follows:

class SampleOuterClass {

 private static ArrayList someVariable;

 SampleOuterClass() {
   // initialize someVariable
 }

public static class Parse implements Function<...> {
  public TypeReturn call (...) {
  // Try using someVariable: *Raises NullPointerException*
  }
}

public void run() {
RDD<> rdd = data.map(new Parse()).rdd()
}
}

Am I missing something with how Closures work with Spark or something else
is wrong ?

Thanks
Sunny


Re: Spark SQL JDBC

2014-08-12 Thread Michael Armbrust
Hive pulls in a ton of dependencies that we were afraid would break
existing spark applications.  For this reason all hive submodules are
optional.


On Tue, Aug 12, 2014 at 7:43 AM, John Omernik  wrote:

> Yin helped me with that, and I appreciate the on-list followup.  A few
> questions: Why is this the case?  I guess, does building it with the
> thriftserver add much more time/size to the final build? It seems that
> unless it is documented well, people will miss that and this situation will
> occur, so why would we not just build the thrift server in? (I am not a
> programming expert, and not trying to judge the decision to have it in a
> separate profile; I would just like to understand why it's done that way.)
>
>
>
>
> On Mon, Aug 11, 2014 at 11:47 AM, Cheng Lian 
> wrote:
>
>> Hi John, the JDBC Thrift server resides in its own build profile and need
>> to be enabled explicitly by ./sbt/sbt -Phive-thriftserver assembly.
>> ​
>>
>>
>> On Tue, Aug 5, 2014 at 4:54 AM, John Omernik  wrote:
>>
>>> I am using spark-1.1.0-SNAPSHOT right now and trying to get familiar
>>> with the JDBC thrift server.  I have everything compiled correctly, I can
>>> access data in spark-shell on yarn from my hive installation. Cached
>>> tables, etc all work.
>>>
>>> When I execute ./sbin/start-thriftserver.sh
>>>
>>> I get the error below. Shouldn't it just ready my spark-env? I guess I
>>> am lost on how to make this work.
>>>
>>> Thanks1
>>>
>>> $ ./start-thriftserver.sh
>>>
>>>
>>> Spark assembly has been built with Hive, including Datanucleus jars on
>>> classpath
>>>
>>> Exception in thread "main" java.lang.ClassNotFoundException:
>>> org.apache.spark.sql.hive.thriftserver.HiveThriftServer2
>>>
>>> at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>>>
>>> at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>>>
>>> at java.security.AccessController.doPrivileged(Native Method)
>>>
>>> at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>>>
>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>>>
>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>>>
>>> at java.lang.Class.forName0(Native Method)
>>>
>>> at java.lang.Class.forName(Class.java:270)
>>>
>>> at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:311)
>>>
>>> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:73)
>>>
>>> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>>>
>>
>>
>


Re: Is there any way to control the parallelism in LogisticRegression

2014-08-12 Thread Xiangrui Meng
Sorry, I missed #2. My suggestion is the same as #2. You need to set a
bigger numPartitions to avoid hitting the integer bound or the 2 GB limitation,
at the cost of increased shuffle size per iteration. If you use a
CombineInputFormat and then cache, it will try to give you roughly the
same size per partition. There will be some remote fetches from HDFS,
but it is still cheaper than calling RDD.repartition().

For coalesce without shuffle, I don't know how to set the right number
of partitions either ...
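
In code, the CombineInputFormat route could look roughly like this. It is only
a sketch with several assumptions: Hadoop 2.x's CombineTextInputFormat, the
mapreduce.input.fileinputformat.split.maxsize key to cap combined split sizes,
and a parseLabeledPoint parser of your own.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat

val hadoopConf = new Configuration(sc.hadoopConfiguration)
// Cap each combined split so cached partitions stay well under 2 GB (value illustrative).
hadoopConf.set("mapreduce.input.fileinputformat.split.maxsize", (1L << 30).toString)

val lines = sc.newAPIHadoopFile(
  "hdfs:///path/to/training/data",        // illustrative path
  classOf[CombineTextInputFormat],
  classOf[LongWritable],
  classOf[Text],
  hadoopConf
).map(_._2.toString)

val points = lines.map(parseLabeledPoint).cache()   // parseLabeledPoint: your own parser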

-Xiangrui

On Tue, Aug 12, 2014 at 6:16 AM, ZHENG, Xu-dong  wrote:
> Hi Xiangrui,
>
> Thanks for your reply!
>
> Yes, our data is very sparse, but RDD.repartition invokes
> RDD.coalesce(numPartitions, shuffle = true) internally, so I think it has
> the same effect as #2, right?
>
> For CombineInputFormat, although I haven't tried it, it sounds like it
> will combine multiple partitions into one large partition if I cache it, so
> the same issues as #1?
>
> For coalesce, could you share some best practices on how to set the right number
> of partitions to avoid locality problems?
>
> Thanks!
>
>
>
> On Tue, Aug 12, 2014 at 3:51 PM, Xiangrui Meng  wrote:
>>
>> Assuming that your data is very sparse, I would recommend
>> RDD.repartition. But if it is not the case and you don't want to
>> shuffle the data, you can try a CombineInputFormat and then parse the
>> lines into labeled points. Coalesce may cause locality problems if you
>> didn't use the right number of partitions. -Xiangrui
>>
>> On Mon, Aug 11, 2014 at 10:39 PM, ZHENG, Xu-dong 
>> wrote:
>> > I think this has the same effect and issue with #1, right?
>> >
>> >
>> > On Tue, Aug 12, 2014 at 1:08 PM, Jiusheng Chen 
>> > wrote:
>> >>
>> >> How about increasing the HDFS file extent size? Like, the current value is 128M,
>> >> we
>> >> make it 512M or bigger.
>> >>
>> >>
>> >> On Tue, Aug 12, 2014 at 11:46 AM, ZHENG, Xu-dong 
>> >> wrote:
>> >>>
>> >>> Hi all,
>> >>>
>> >>> We are trying to use Spark MLlib to train super large data (100M
>> >>> features
>> >>> and 5B rows). The input data in HDFS has ~26K partitions. By default,
>> >>> MLlib
>> >>> will create a task for every partition at each iteration. But because
>> >>> our
>> >>> dimensions are also very high, such large number of tasks will
>> >>> increase
>> >>> large network overhead to transfer the weight vector. So we want to
>> >>> reduce
>> >>> the number of tasks, we tried below ways:
>> >>>
>> >>> 1. Coalesce partitions without shuffling, then cache.
>> >>>
>> >>> data.coalesce(numPartitions).cache()
>> >>>
>> >>> This works fine for relatively small data, but when the data grows
>> >>> and numPartitions is fixed, the size of one partition will be large. This
>> >>> introduces two issues: the first is, the larger partition will need a larger
>> >>> object and more memory at runtime, and triggers GC more frequently; the
>> >>> second is, we hit the 'size exceeds Integer.MAX_VALUE' error, which
>> >>> seems to be caused by the size of one partition being larger than 2G
>> >>> (https://issues.apache.org/jira/browse/SPARK-1391).
>> >>>
>> >>> 2. Coalesce partitions with shuffling, then cache.
>> >>>
>> >>> data.coalesce(numPartitions, true).cache()
>> >>>
>> >>> It could mitigate the second issue in #1 to some degree, but the first issue
>> >>> is still there, and it also will introduce a large amount of shuffling.
>> >>>
>> >>> 3. Cache data first, and coalesce partitions.
>> >>>
>> >>> data.cache().coalesce(numPartitions)
>> >>>
>> >>> In this way, the number of cached partitions is not changed, but each
>> >>> task reads the data from multiple partitions. However, I find the task will
>> >>> lose locality this way. I see a lot of 'ANY' tasks, which means that tasks
>> >>> read data from other nodes and become slower than those that read data from
>> >>> local memory.
>> >>>
>> >>> I think the best way should be like #3, but leveraging locality as much as
>> >>> possible. Is there any way to do that? Any suggestions?
>> >>>
>> >>> Thanks!
>> >>>
>> >>> --
>> >>> ZHENG, Xu-dong
>> >>>
>> >>
>> >
>> >
>> >
>> > --
>> > 郑旭东
>> > ZHENG, Xu-dong
>> >
>
>
>
>
> --
> 郑旭东
> ZHENG, Xu-dong
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Benchmark on physical Spark cluster

2014-08-12 Thread Mozumder, Monir
An on-list follow up: http://prof.ict.ac.cn/BigDataBench/#Benchmarks looks 
promising as it has spark as one of the platforms used.

Bests,
-Monir


From: Mozumder, Monir
Sent: Monday, August 11, 2014 7:18 PM
To: user@spark.apache.org
Subject: Benchmark on physical Spark cluster

I am trying to get some workloads or benchmarks for running on a physical spark 
cluster and find relative speedups on different physical clusters.

The instructions at 
https://databricks.com/blog/2014/02/12/big-data-benchmark.html uses Amazon EC2. 
I was wondering if anyone has other benchmarks for Spark on physical clusters. 
Hoping to get a CloudSuite-like suite for Spark.

Bests,
-Monir


Re: anaconda and spark integration

2014-08-12 Thread Oleg Ruchovets
Hello.
Is there an integration of Spark (PySpark) with Anaconda?
I googled a lot and didn't find relevant information.
Could you please point me to a tutorial or a simple example?

Thanks in advance
Oleg.


Re: saveAsTextFiles file not found exception

2014-08-12 Thread Chen Song
Thanks for putting this together, Andrew.


On Tue, Aug 12, 2014 at 2:11 AM, Andrew Ash  wrote:

> Hi Chen,
>
> Please see the bug I filed at
> https://issues.apache.org/jira/browse/SPARK-2984 with the
> FileNotFoundException on _temporary directory issue.
>
> Andrew
>
>
> On Mon, Aug 11, 2014 at 10:50 PM, Andrew Ash  wrote:
>
>> Not sure which stalled HDFS client issue you're referring to, but there
>> was one fixed in Spark 1.0.2 that could help you out --
>> https://github.com/apache/spark/pull/1409.  I've still seen one related
>> to Configuration objects not being thread-safe, though, so you'd still need to
>> keep speculation on to fix that (SPARK-2546)
>>
>> As it stands now, I can:
>>
>> A) have speculation off, in which case I get random hangs for a variety
>> of reasons (your HDFS stall, my Configuration safety issue)
>>
>> or
>>
>> B) have speculation on, in which case I get random failures related to
>> LeaseExpiredExceptions and .../_temporary/... file doesn't exist exceptions.
>>
>>
>> Kind of a catch-22 -- there's no reliable way to run large jobs on Spark
>> right now!
>>
>> I'm going to file a bug for the _temporary and LeaseExpiredExceptions as
>> I think these are widespread enough that we need a place to track a
>> resolution.
>>
>>
>> On Mon, Aug 11, 2014 at 9:08 AM, Chen Song 
>> wrote:
>>
>>> Andrew that is a good finding.
>>>
>>> Yes, I have speculative execution turned on, because I saw tasks stalled
>>> on HDFS client.
>>>
>>> If I turned off speculative execution, is there a way to circumvent the
>>> hanging task issue?
>>>
>>>
>>>
>>> On Mon, Aug 11, 2014 at 11:13 AM, Andrew Ash 
>>> wrote:
>>>
 I've also been seeing similar stacktraces on Spark core (not streaming)
 and have a theory it's related to spark.speculation being turned on.  Do
 you have that enabled by chance?


 On Mon, Aug 11, 2014 at 8:10 AM, Chen Song 
 wrote:

> Bill
>
> Did you get this resolved somehow? Anyone has any insight into this
> problem?
>
> Chen
>
>
> On Mon, Aug 11, 2014 at 10:30 AM, Chen Song 
> wrote:
>
>> The exception was thrown out in application master(spark streaming
>> driver) and the job shut down after this exception.
>>
>>
>> On Mon, Aug 11, 2014 at 10:29 AM, Chen Song 
>> wrote:
>>
>>> I got the same exception after the streaming job ran for a while.
>>> The ERROR message was complaining about a temp file not being found in
>>> the output folder.
>>>
>>> 14/08/11 08:05:08 ERROR JobScheduler: Error running job streaming
>>> job 140774430 ms.0
>>> java.io.FileNotFoundException: File
>>> hdfs://hadoopc/user/csong/output/human_bot/-140774430.out/_temporary/0/task_201408110805__m_07
>>> does not exist.
>>> at
>>> org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:654)
>>> at
>>> org.apache.hadoop.hdfs.DistributedFileSystem.access$600(DistributedFileSystem.java:102)
>>> at
>>> org.apache.hadoop.hdfs.DistributedFileSystem$14.doCall(DistributedFileSystem.java:712)
>>> at
>>> org.apache.hadoop.hdfs.DistributedFileSystem$14.doCall(DistributedFileSystem.java:708)
>>> at
>>> org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
>>> at
>>> org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:708)
>>> at
>>> org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:360)
>>> at
>>> org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:310)
>>> at
>>> org.apache.hadoop.mapred.FileOutputCommitter.commitJob(FileOutputCommitter.java:136)
>>> at
>>> org.apache.spark.SparkHadoopWriter.commitJob(SparkHadoopWriter.scala:126)
>>> at
>>> org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:841)
>>> at
>>> org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:724)
>>> at
>>> org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:643)
>>> at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1068)
>>> at
>>> org.apache.spark.streaming.dstream.DStream$$anonfun$8.apply(DStream.scala:773)
>>> at
>>> org.apache.spark.streaming.dstream.DStream$$anonfun$8.apply(DStream.scala:771)
>>> at
>>> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:41)
>>> at
>>> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
>>> at
>>> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
>>> at scala.util.T

Re: Spark SQL JDBC

2014-08-12 Thread John Omernik
Yin helped me with that, and I appreciate the onlist followup.  A few
questions: Why is this the case?  I guess, does building it with
thriftserver add much more time/size to the final build? It seems that
unless documented well, people will miss that and this situation would
occur, why would we not just build the thrift server in? (I am not a
programming expert, and not trying to judge the decision to have it in a
separate profile, I would just like to understand why it's done that way)




On Mon, Aug 11, 2014 at 11:47 AM, Cheng Lian  wrote:

> Hi John, the JDBC Thrift server resides in its own build profile and needs
> to be enabled explicitly by ./sbt/sbt -Phive-thriftserver assembly.
> ​
>
>
> On Tue, Aug 5, 2014 at 4:54 AM, John Omernik  wrote:
>
>> I am using spark-1.1.0-SNAPSHOT right now and trying to get familiar with
>> the JDBC thrift server.  I have everything compiled correctly, I can access
>> data in spark-shell on yarn from my hive installation. Cached tables, etc
>> all work.
>>
>> When I execute ./sbin/start-thriftserver.sh
>>
>> I get the error below. Shouldn't it just read my spark-env? I guess I am
>> lost on how to make this work.
>>
>> Thanks!
>>
>> $ ./start-thriftserver.sh
>>
>>
>> Spark assembly has been built with Hive, including Datanucleus jars on
>> classpath
>>
>> Exception in thread "main" java.lang.ClassNotFoundException:
>> org.apache.spark.sql.hive.thriftserver.HiveThriftServer2
>>
>> at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>>
>> at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>>
>> at java.security.AccessController.doPrivileged(Native Method)
>>
>> at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>>
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>>
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>>
>> at java.lang.Class.forName0(Native Method)
>>
>> at java.lang.Class.forName(Class.java:270)
>>
>> at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:311)
>>
>> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:73)
>>
>> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>>
>
>


Fwd: how to split RDD by key and save to different path

2014-08-12 Thread Fengyun RAO
1. Be careful: HDFS is better for large files, not bunches of small files.

2. If that's really what you want, roll your own.

import java.io.{BufferedWriter, OutputStreamWriter}
import scala.collection.mutable
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.Partitioner

def writeLines(iterator: Iterator[(String, String)]) = {
  val writers = new mutable.HashMap[String, BufferedWriter] // (key, writer) map
  try {
    while (iterator.hasNext) {
      val (key, line) = iterator.next()
      val writer = writers.getOrElseUpdate(key, {
        val path = args(1) + key // args(1): output base directory, e.g. ending with "/"
        val outputStream = FileSystem.get(new Configuration()).create(new Path(path))
        new BufferedWriter(new OutputStreamWriter(outputStream))
      })
      writer.write(line)
      writer.newLine()
    }
  } finally {
    writers.values.foreach(_.close())
  }
}

val inputData = sc.textFile(args(0)) // args(0): input path
val keyValue = inputData.map(line => (extractKey(line), line)) // extractKey: your own key-derivation function
val partitions = keyValue.partitionBy(new MyPartitioner(10))
partitions.foreachPartition(writeLines)


class MyPartitioner(partitions: Int) extends Partitioner {
  override def numPartitions: Int = partitions

  override def getPartition(key: Any): Int = {
    // make sure lines with the same key end up in the same partition
    (key.toString.hashCode & Integer.MAX_VALUE) % numPartitions
  }
}
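
As an alternative sketch (a different, commonly used approach, not part of the
code above): Hadoop's MultipleTextOutputFormat can route each record to a file
named after its key, so all per-key files are written in a single
saveAsHadoopFile pass. It reuses keyValue and args(1) from the snippet above.

import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat
import org.apache.spark.HashPartitioner
import org.apache.spark.SparkContext._

// Route each (key, line) record to a part file under a directory named after the key.
class KeyBasedOutput extends MultipleTextOutputFormat[Any, Any] {
  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =
    key.asInstanceOf[String] + "/" + name
  // Drop the key from the file contents so each line is written as-is.
  override def generateActualKey(key: Any, value: Any): Any =
    NullWritable.get()
}

keyValue
  .partitionBy(new HashPartitioner(10))
  .saveAsHadoopFile(args(1), classOf[String], classOf[String], classOf[KeyBasedOutput])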



2014-08-12 21:34 GMT+08:00 Fengyun RAO :

> 1. be careful, HDFS are better for large files, not bunches of small files.
>
> 2. if that's really what you want, roll it your own.
>
> def writeAvro(iterator: Iterator[(String, String)]) = {
>   val writers = new mutalbe.HashMap[String, BufferedWriter] // (key, writer) 
> map
>   try {
>   while (iterator.hasNext) {
> val item = iterator.next()
> val key = item._1
> val line = item._2
> val writer = writers.get(key) match {
>   case Some(writer) => writer
>   case None =>
> val path = arg(1) + key
> val outputStream = FileSystem.get(new Configuration()).create(new 
> Path(path))
> writer = new BufferedWriter(outputStream)
> }
> writer.writeLine(line)
> } finally {
> writers.values.foreach(._close())
> }
> }
>
> val inputData = sc.textFile()
> val keyValue = inputData.map(line => (key, line))
> val partitions = keValue.partitionBy(new MyPartition(10))
> partitions.foreachPartition(writeLines)
>
>
> class MyPartitioner(partitions: Int) extends Partitioner {
> override def numPartitions: Int = partitions
>
> override def getPartition(key: Any): Int = {
> (key.toString.hashCode & Integer.MAX_VALUE) % numPartitions // make 
> sure lines with the same key in the same partition
>
> }
> }
>
>
>
> 2014-08-11 20:42 GMT+08:00 诺铁 :
>
> hi,
>>
>> I have googled and find similar question without good answer,
>> http://stackoverflow.com/questions/24520225/writing-to-hadoop-distributed-file-system-multiple-times-with-spark
>>
>> in short, I would like to separate the raw data and divide it by some key, for
>> example, create date, and put it in a directory named by date, so that I can
>> easily access a portion of the data later.
>>
>>  For now I have to extract all keys and then filter by key and save to
>> file repeatedly. Are there any good ways to do this?  Or maybe I shouldn't do
>> such a thing?
>>
>
>


Re: Is there any way to control the parallelism in LogisticRegression

2014-08-12 Thread ZHENG, Xu-dong
Hi Xiangrui,

Thanks for your reply!

Yes, our data is very sparse, but RDD.repartition invokes
RDD.coalesce(numPartitions, shuffle = true) internally, so I think it has
the same effect as #2, right?
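
For reference, that equivalence spelled out as code (1000 is only a placeholder):

// In the RDD API, repartition(n) is shorthand for a shuffling coalesce,
// so these two lines produce the same kind of job:
val a = data.repartition(1000)
val b = data.coalesce(1000, shuffle = true)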

For CombineInputFormat, although I haven't tried it, it sounds like it
will combine multiple partitions into one large partition if I cache it, so
the same issues as #1?

For coalesce, could you share some best practices on how to set the right
number of partitions to avoid locality problems?

Thanks!



On Tue, Aug 12, 2014 at 3:51 PM, Xiangrui Meng  wrote:

> Assuming that your data is very sparse, I would recommend
> RDD.repartition. But if it is not the case and you don't want to
> shuffle the data, you can try a CombineInputFormat and then parse the
> lines into labeled points. Coalesce may cause locality problems if you
> didn't use the right number of partitions. -Xiangrui
>
> On Mon, Aug 11, 2014 at 10:39 PM, ZHENG, Xu-dong 
> wrote:
> > I think this has the same effect and issue with #1, right?
> >
> >
> > On Tue, Aug 12, 2014 at 1:08 PM, Jiusheng Chen 
> > wrote:
> >>
> >> How about increasing the HDFS file extent size? Like, the current value is 128M, we
> >> make it 512M or bigger.
> >>
> >>
> >> On Tue, Aug 12, 2014 at 11:46 AM, ZHENG, Xu-dong 
> >> wrote:
> >>>
> >>> Hi all,
> >>>
> >>> We are trying to use Spark MLlib to train super large data (100M
> features
> >>> and 5B rows). The input data in HDFS has ~26K partitions. By default,
> MLlib
> >>> will create a task for every partition at each iteration. But because
> our
> >>> dimensions are also very high, such large number of tasks will increase
> >>> large network overhead to transfer the weight vector. So we want to
> reduce
> >>> the number of tasks, we tried below ways:
> >>>
> >>> 1. Coalesce partitions without shuffling, then cache.
> >>>
> >>> data.coalesce(numPartitions).cache()
> >>>
> >>> This works fine for relatively small data, but when the data grows and
> >>> numPartitions is fixed, the size of one partition will be large. This
> >>> introduces two issues: the first is, the larger partition will need a larger
> >>> object and more memory at runtime, and triggers GC more frequently; the
> >>> second is, we hit the 'size exceeds Integer.MAX_VALUE' error, which
> >>> seems to be caused by the size of one partition being larger than 2G
> >>> (https://issues.apache.org/jira/browse/SPARK-1391).
> >>>
> >>> 2. Coalesce partitions with shuffling, then cache.
> >>>
> >>> data.coalesce(numPartitions, true).cache()
> >>>
> >>> It could mitigate the second issue in #1 to some degree, but the first issue
> >>> is still there, and it also will introduce a large amount of shuffling.
> >>>
> >>> 3. Cache data first, and coalesce partitions.
> >>>
> >>> data.cache().coalesce(numPartitions)
> >>>
> >>> In this way, the number of cached partitions is not changed, but each task
> >>> reads the data from multiple partitions. However, I find the task will lose
> >>> locality this way. I see a lot of 'ANY' tasks, which means that tasks
> >>> read data from other nodes and become slower than those that read data from
> >>> local memory.
> >>>
> >>> I think the best way should be like #3, but leveraging locality as much as
> >>> possible. Is there any way to do that? Any suggestions?
> >>>
> >>> Thanks!
> >>>
> >>> --
> >>> ZHENG, Xu-dong
> >>>
> >>
> >
> >
> >
> > --
> > 郑旭东
> > ZHENG, Xu-dong
> >
>



-- 
郑旭东
ZHENG, Xu-dong


LDA in MLBase

2014-08-12 Thread Aslan Bekirov
Hi All,

I have a question regarding LDA topic modeling. I need to do topic modeling
on ad data.

Does MLBase support LDA topic modeling?

Or any stable, tested LDA implementation on Spark?

BR,
Aslan


Re: java.lang.StackOverflowError when calling count()

2014-08-12 Thread randylu
hi, TD. Thanks very much! I got it.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-StackOverflowError-when-calling-count-tp5649p11980.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: KMeans - java.lang.IllegalArgumentException: requirement failed

2014-08-12 Thread Ge, Yao (Y.)
I figured it out. My indices parameters for the sparse vector were messed up. It
was a good lesson for me:
When using Vectors.sparse(int size, int[] indices, double[] values) to
generate a vector, size is the size of the whole vector, not just the number of
elements with a value. The indices array needs to be in ascending order.
In many cases, it is probably easier to use the other two forms of Vectors.sparse
if the indices and value positions are not naturally sorted.
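
A small illustration of that point (a sketch with made-up sizes and values):

import org.apache.spark.mllib.linalg.Vectors

// size is the length of the full vector (10 here), not the number of non-zeros,
// and in the (indices, values) form the indices must already be in ascending order.
val v1 = Vectors.sparse(10, Array(1, 4, 7), Array(2.0, 3.0, 5.0))

// The (index, value) pair form accepts unordered entries and sorts them for you.
val v2 = Vectors.sparse(10, Seq((7, 5.0), (1, 2.0), (4, 3.0)))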

-Yao


From: Ge, Yao (Y.)
Sent: Monday, August 11, 2014 11:44 PM
To: 'u...@spark.incubator.apache.org'
Subject: KMeans - java.lang.IllegalArgumentException: requirement failed

I am trying to train a KMeans model with sparse vector with Spark 1.0.1.
When I run the training I got the following exception:
java.lang.IllegalArgumentException: requirement failed
at scala.Predef$.require(Predef.scala:221)
at 
org.apache.spark.mllib.util.MLUtils$.fastSquaredDistance(MLUtils.scala:271)
at 
org.apache.spark.mllib.clustering.KMeans$.fastSquaredDistance(KMeans.scala:398)
at 
org.apache.spark.mllib.clustering.KMeans$$anonfun$findClosest$1.apply(KMeans.scala:372)
at 
org.apache.spark.mllib.clustering.KMeans$$anonfun$findClosest$1.apply(KMeans.scala:366)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at 
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at 
org.apache.spark.mllib.clustering.KMeans$.findClosest(KMeans.scala:366)
at 
org.apache.spark.mllib.clustering.KMeans$.pointCost(KMeans.scala:389)
at 
org.apache.spark.mllib.clustering.KMeans$$anonfun$17$$anonfun$apply$7.apply(KMeans.scala:269)
at 
org.apache.spark.mllib.clustering.KMeans$$anonfun$17$$anonfun$apply$7.apply(KMeans.scala:268)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.Range.foreach(Range.scala:141)
at 
scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at 
scala.collection.AbstractTraversable.map(Traversable.scala:105)
at 
org.apache.spark.mllib.clustering.KMeans$$anonfun$17.apply(KMeans.scala:268)
at 
org.apache.spark.mllib.clustering.KMeans$$anonfun$17.apply(KMeans.scala:267)

What does this mean? How do I troubleshoot this problem?
Thanks.

-Yao


Re: share/reuse off-heap persisted (tachyon) RDD in SparkContext or saveAsParquetFile on tachyon in SQLContext

2014-08-12 Thread chutium
More interesting: if spark-shell is started on the master node (test01),

then

parquetFile.saveAsParquetFile("tachyon://test01.zala:19998/parquet_tablex")

14/08/12 11:42:06 INFO : initialize(tachyon://...
...
...
14/08/12 11:42:06 INFO : File does not exist:
tachyon://test01.zala:19998/parquet_tablex/_metadata
14/08/12 11:42:06 INFO : getWorkingDirectory: /
14/08/12 11:42:06 INFO :
create(tachyon://test01.zala:19998/parquet_tablex/_metadata, rw-r--r--,
true, 65536, 1, 33554432, null)
14/08/12 11:42:06 WARN : tachyon.home is not set. Using
/mnt/tachyon_default_home as the default value.
14/08/12 11:42:06 INFO : Trying to get local worker host : test01.zala
14/08/12 11:42:06 ERROR : No local worker on test01.zala
NoWorkerException(message:No local worker on test01.zala)
at
tachyon.thrift.MasterService$user_getWorker_result$user_getWorker_resultStandardScheme.read(MasterService.java:25675)
at
tachyon.thrift.MasterService$user_getWorker_result$user_getWorker_resultStandardScheme.read(MasterService.java:25652)
at
tachyon.thrift.MasterService$user_getWorker_result.read(MasterService.java:25591)
at
tachyon.org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:78)
at
tachyon.thrift.MasterService$Client.recv_user_getWorker(MasterService.java:832)
at
tachyon.thrift.MasterService$Client.user_getWorker(MasterService.java:818)
at tachyon.master.MasterClient.user_getWorker(MasterClient.java:648)
at tachyon.worker.WorkerClient.connect(WorkerClient.java:199)
at tachyon.worker.WorkerClient.mustConnect(WorkerClient.java:360)
at
tachyon.worker.WorkerClient.getUserUfsTempFolder(WorkerClient.java:298)
at
tachyon.client.TachyonFS.createAndGetUserUfsTempFolder(TachyonFS.java:270)
at tachyon.client.FileOutStream.(FileOutStream.java:72)
at tachyon.client.TachyonFile.getOutStream(TachyonFile.java:207)
at tachyon.hadoop.AbstractTFS.create(AbstractTFS.java:102)
at tachyon.hadoop.TFS.create(TFS.java:24)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:906)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:887)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:784)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:773)
at
parquet.hadoop.ParquetFileWriter.writeMetadataFile(ParquetFileWriter.java:344)
at
org.apache.spark.sql.parquet.ParquetTypesConverter$.writeMetaData(ParquetTypes.scala:345)
at
org.apache.spark.sql.parquet.ParquetRelation$.createEmpty(ParquetRelation.scala:142)
at
org.apache.spark.sql.parquet.ParquetRelation$.create(ParquetRelation.scala:120)
at
org.apache.spark.sql.execution.SparkStrategies$ParquetOperations$.apply(SparkStrategies.scala:197)
at
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
at
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at
org.apache.spark.sql.catalyst.planning.QueryPlanner.apply(QueryPlanner.scala:59)
at
org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:399)
at
org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:397)
at
org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:403)
at
org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:403)
at
org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:406)
at
org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:406)
at
org.apache.spark.sql.SchemaRDDLike$class.saveAsParquetFile(SchemaRDDLike.scala:77)
at
org.apache.spark.sql.SchemaRDD.saveAsParquetFile(SchemaRDD.scala:103)
at $line12.$read$$iwC$$iwC$$iwC$$iwC.(:17)
at $line12.$read$$iwC$$iwC$$iwC.(:22)
at $line12.$read$$iwC$$iwC.(:24)
at $line12.$read$$iwC.(:26)
at $line12.$read.(:28)
at $line12.$read$.(:32)
at $line12.$read$.()
at $line12.$eval$.(:7)
at $line12.$eval$.()
at $line12.$eval.$print()
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:601)
at
org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:789)
at
org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1062)
at
org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:615)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:646)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.sca

Re: share/reuse off-heap persisted (tachyon) RDD in SparkContext or saveAsParquetFile on tachyon in SQLContext

2014-08-12 Thread chutium
spark.speculation was not set; is there any speculative execution on the Tachyon side?

tachyon-env.sh only changed following

export TACHYON_MASTER_ADDRESS=test01.zala
#export TACHYON_UNDERFS_ADDRESS=$TACHYON_HOME/underfs
export TACHYON_UNDERFS_ADDRESS=hdfs://test01.zala:8020
export TACHYON_WORKER_MEMORY_SIZE=16GB

test01.zala is master node for HDFS, tachyon, Spark, etc.
worker nodes are
test02.zala
test03.zala
test04.zala

spark-shell run on test02

after parquetFile.saveAsParquetFile("tachyon://test01.zala:19998/parquet_1")

I got a FailedToCheckpointException with "Failed to rename".

But with tfs lsr, there are some temporary files and a metadata file:

0.00 B08-11-2014 16:19:28:054 /parquet_1
881.00 B  08-11-2014 16:19:28:054  In Memory  /parquet_1/_metadata
0.00 B08-11-2014 16:19:28:314 /parquet_1/_temporary
0.00 B08-11-2014 16:19:28:314 /parquet_1/_temporary/0
0.00 B08-11-2014 16:19:28:931
/parquet_1/_temporary/0/_temporary
0.00 B08-11-2014 16:19:28:931
/parquet_1/_temporary/0/_temporary/attempt_201408111619_0017_r_00_33
0.00 B08-11-2014 16:19:28:931  In Memory 
/parquet_1/_temporary/0/_temporary/attempt_201408111619_0017_r_00_33/part-r-1.parquet
0.00 B08-11-2014 16:19:28:940
/parquet_1/_temporary/0/_temporary/attempt_201408111619_0017_r_01_35
0.00 B08-11-2014 16:19:28:940  In Memory 
/parquet_1/_temporary/0/_temporary/attempt_201408111619_0017_r_01_35/part-r-2.parquet
0.00 B08-11-2014 16:19:28:962
/parquet_1/_temporary/0/_temporary/attempt_201408111619_0017_r_03_36
0.00 B08-11-2014 16:19:28:962  In Memory 
/parquet_1/_temporary/0/_temporary/attempt_201408111619_0017_r_03_36/part-r-4.parquet
0.00 B08-11-2014 16:19:28:971
/parquet_1/_temporary/0/_temporary/attempt_201408111619_0017_r_02_34
0.00 B08-11-2014 16:19:28:971  In Memory 
/parquet_1/_temporary/0/_temporary/attempt_201408111619_0017_r_02_34/part-r-3.parquet
0.00 B08-11-2014 16:20:06:349
/parquet_1/_temporary/0/_temporary/attempt_201408111619_0017_r_01_37
0.00 B08-11-2014 16:20:06:349  In Memory 
/parquet_1/_temporary/0/_temporary/attempt_201408111619_0017_r_01_37/part-r-2.parquet
0.00 B08-11-2014 16:20:09:519
/parquet_1/_temporary/0/_temporary/attempt_201408111619_0017_r_03_38
0.00 B08-11-2014 16:20:09:519  In Memory 
/parquet_1/_temporary/0/_temporary/attempt_201408111619_0017_r_03_38/part-r-4.parquet
0.00 B08-11-2014 16:20:18:777
/parquet_1/_temporary/0/_temporary/attempt_201408111619_0017_r_02_39
0.00 B08-11-2014 16:20:18:777  In Memory 
/parquet_1/_temporary/0/_temporary/attempt_201408111619_0017_r_02_39/part-r-3.parquet
0.00 B08-11-2014 16:20:28:315
/parquet_1/_temporary/0/_temporary/attempt_201408111619_0017_r_00_40
0.00 B08-11-2014 16:20:28:315  In Memory 
/parquet_1/_temporary/0/_temporary/attempt_201408111619_0017_r_00_40/part-r-1.parquet
0.00 B08-11-2014 16:20:38:382
/parquet_1/_temporary/0/_temporary/attempt_201408111619_0017_r_01_41
0.00 B08-11-2014 16:20:38:382  In Memory 
/parquet_1/_temporary/0/_temporary/attempt_201408111619_0017_r_01_41/part-r-2.parquet
0.00 B08-11-2014 16:20:40:681
/parquet_1/_temporary/0/_temporary/attempt_201408111619_0017_r_03_42
0.00 B08-11-2014 16:20:40:681  In Memory 
/parquet_1/_temporary/0/_temporary/attempt_201408111619_0017_r_03_42/part-r-4.parquet
0.00 B08-11-2014 16:20:50:376
/parquet_1/_temporary/0/_temporary/attempt_201408111619_0017_r_02_43
0.00 B08-11-2014 16:20:50:376  In Memory 
/parquet_1/_temporary/0/_temporary/attempt_201408111619_0017_r_02_43/part-r-3.parquet
0.00 B08-11-2014 16:21:00:932
/parquet_1/_temporary/0/_temporary/attempt_201408111619_0017_r_00_44
0.00 B08-11-2014 16:21:00:932  In Memory 
/parquet_1/_temporary/0/_temporary/attempt_201408111619_0017_r_00_44/part-r-1.parquet
0.00 B08-11-2014 16:21:10:355
/parquet_1/_temporary/0/_temporary/attempt_201408111619_0017_r_01_45
0.00 B08-11-2014 16:21:10:355  In Memory 
/parquet_1/_temporary/0/_temporary/attempt_201408111619_0017_r_01_45/part-r-2.parquet
0.00 B08-11-2014 16:21:11:468
/parquet_1/_temporary/0/_temporary/attempt_201408111619_0017_r_03_46
0.00 B08-11-2014 16:21:11:468  In Memory 
/parquet_1/_temporary/0/_temporary/attempt_201408111619_0017_r_03_46/part-r-4.parquet
0.00 B08-11-2014 16:21:21:681
/parquet_1/_temporary/0/_temporary/attempt_201408111619_0017_r_02_47
0.00 B08-11-2014 16:21:21:681  In Memory 
/parquet_1/_temporary/0/_temporary/attempt_201408111619_0017_r_02_47/part-r-3.parquet
0.00 B08-11-2014 16:21:32:583

Re: Transform RDD[List]

2014-08-12 Thread Sean Owen
Sure, just add ".toList.sorted" in there. Putting it together in one big
expression:

val rdd = sc.parallelize(List(List(1,2,3,4,5),List(6,7,8,9,10)))
val result = 
rdd.flatMap(_.zipWithIndex).groupBy(_._2).values.map(_.map(_._1).toList.sorted)

List(2, 7)
List(1, 6)
List(4, 9)
List(3, 8)
List(5, 10)

On Tue, Aug 12, 2014 at 8:58 AM, Kevin Jung  wrote:
> Thanks for your answer.
> Yes, I want to transpose data.
> At this point, I have one more question.
> I tested it with
> RDD1
> List(1, 2, 3, 4, 5)
> List(6, 7, 8, 9, 10)
> List(11, 12, 13, 14, 15)
> List(16, 17, 18, 19, 20)
>
> And the result is...
> ArrayBuffer(11, 1, 16, 6)
> ArrayBuffer(2, 12, 7, 17)
> ArrayBuffer(3, 13, 18, 8)
> ArrayBuffer(9, 19, 4, 14)
> ArrayBuffer(15, 20, 10, 5)
>
> It collects well but the order is shuffled.
> Can I maintain the order?

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: [spark-streaming] kafka source and flow control

2014-08-12 Thread Gwenhael Pasquiers
I was hoping I could make the system behave like a blocking queue: if the
output is too slow, the buffers (storage space for RDDs) fill up, then block
instead of dropping existing RDDs, until the input itself blocks (slows down
its consumption).

On a side note I was wondering: is there the same issue with file (HDFS) inputs?
How can I be sure the input won’t “overflow” the processing chain?


From: Tobias Pfeiffer [mailto:t...@preferred.jp]
Sent: mardi 12 août 2014 02:58
To: Gwenhael Pasquiers
Cc: u...@spark.incubator.apache.org
Subject: Re: [spark-streaming] kafka source and flow control

Hi,

On Mon, Aug 11, 2014 at 9:41 PM, Gwenhael Pasquiers 
mailto:gwenhael.pasqui...@ericsson.com>> wrote:
We intend to apply other operations on the data later in the same spark 
context, but our first step is to archive it.

Our goal is somth like this
Step 1 : consume kafka
Step 2 : archive to hdfs AND send to step 3
Step 3 : transform data
Step 4 : save transformed data to HDFS as input for M/R

I see. Well I think Spark Streaming may be well suited for that purpose.

To us it looks like a great flaw if, in streaming mode, spark-streaming cannot 
slow down it’s consumption depending on the available resources.

On Mon, Aug 11, 2014 at 10:10 PM, Gwenhael Pasquiers 
mailto:gwenhael.pasqui...@ericsson.com>> wrote:
I think the kind of self-regulating system you describe would be too difficult 
to implement and probably unreliable (even more with the fact that we have 
multiple slaves).

Isn't "slow down its consumption depending on the available resources" a
"self-regulating system"? I don't see how you can adapt to available resources
without measuring your execution time and then changing how much you consume. Did
you have any particular form of adaptation in mind?

Tobias


Re: Transform RDD[List]

2014-08-12 Thread Kevin Jung
Thanks for your answer. 
Yes, I want to transpose data.
At this point, I have one more question.
I tested it with
RDD1
List(1, 2, 3, 4, 5)
List(6, 7, 8, 9, 10)
List(11, 12, 13, 14, 15)
List(16, 17, 18, 19, 20)

And the result is... 
ArrayBuffer(11, 1, 16, 6)
ArrayBuffer(2, 12, 7, 17)
ArrayBuffer(3, 13, 18, 8)
ArrayBuffer(9, 19, 4, 14)
ArrayBuffer(15, 20, 10, 5)

It collects well but the order is shuffled.
Can I maintain the order?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Transform-RDD-List-tp11948p11974.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Using very large files for KMeans training -- cluster centers size?

2014-08-12 Thread Xiangrui Meng
What did you set for driver memory? The default value is 256m or 512m,
which is too small. Try to set "--driver-memory 10g" with spark-submit
or spark-shell and see whether it works or not. -Xiangrui

On Mon, Aug 11, 2014 at 6:26 PM, durin  wrote:
> I'm trying to apply KMeans training to some text data, which consists of
> lines that each contain something between 3 and 20 words. For that purpose,
> all unique words are saved in a dictionary. This dictionary can become very
> large as no hashing etc. is done, but it should spill to disk in case it
> doesn't fit into memory anymore:
> var dict = scala.collection.mutable.Map[String,Int]()
> dict.persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_SER)
>
> With the help of this dictionary, I build sparse feature vectors for each
> line which are then saved in an RDD that is used as input for KMeans.train.
>
> Spark is running in standalone mode, in this case with 5 worker nodes.
> It appears that anything up to the actual training completes successfully
> with 126G of training data (logs below).
>
> The training data is provided in the form of a cached, broadcast variable to all
> worker nodes:
>
> var vectors2 =
> vectors.repartition(1000).persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_SER)
> var broadcastVector = sc.broadcast(vectors2)
> println("-Start model training-");
> var model = KMeans.train(broadcastVector.value, 20, 10)
>
> The first error I get is a null pointer exception, but there is still work
> done after that. I think the real reason this terminates is
> java.lang.OutOfMemoryError: Java heap space.
>
> Is it possible that this happens because the cluster centers in the model
> are represented in dense instead of sparse form, thereby getting large with
> a large vector size? If yes, how can I make sure it doesn't crash because of
> that? It should spill to disk if necessary.
> My goal would be to have the input size only limited by disk space. Sure it
> would get very slow if it spills to disk all the time, but it shouldn't
> terminate.
>
>
>
> Here's the console output from the model.train part:
>
> -Start model training-
> 14/08/11 17:05:17 INFO spark.SparkContext: Starting job: takeSample at
> KMeans.scala:263
> 14/08/11 17:05:17 INFO scheduler.DAGScheduler: Registering RDD 64
> (repartition at :48)
> 14/08/11 17:05:17 INFO scheduler.DAGScheduler: Got job 6 (takeSample at
> KMeans.scala:263) with 1000 output partitions (allowLocal=false)
> 14/08/11 17:05:17 INFO scheduler.DAGScheduler: Final stage: Stage
> 8(takeSample at KMeans.scala:263)
> 14/08/11 17:05:17 INFO scheduler.DAGScheduler: Parents of final stage:
> List(Stage 9)
> 14/08/11 17:05:17 INFO scheduler.DAGScheduler: Missing parents: List(Stage
> 9)
> 14/08/11 17:05:17 INFO scheduler.DAGScheduler: Submitting Stage 9
> (MapPartitionsRDD[64] at repartition at :48), which has no missing
> parents
> 4116.323: [GC (Allocation Failure) [PSYoungGen: 1867168K->240876K(2461696K)]
> 4385155K->3164592K(9452544K), 1.4455064 secs] [Times: user=11.33 sys=0.03,
> real=1.44 secs]
> 4174.512: [GC (Allocation Failure) [PSYoungGen: 1679497K->763168K(2338816K)]
> 4603212K->3691609K(9329664K), 0.8050508 secs] [Times: user=6.04 sys=0.01,
> real=0.80 secs]
> 4188.250: [GC (Allocation Failure) [PSYoungGen: 2071822K->986136K(2383360K)]
> 5000263K->4487601K(9374208K), 1.6795174 secs] [Times: user=13.23 sys=0.01,
> real=1.68 secs]
> 14/08/11 17:06:57 INFO scheduler.DAGScheduler: Submitting 1 missing tasks
> from Stage 9 (MapPartitionsRDD[64] at repartition at :48)
> 14/08/11 17:06:57 INFO scheduler.TaskSchedulerImpl: Adding task set 9.0 with
> 1 tasks
> 4190.947: [GC (Allocation Failure) [PSYoungGen: 2336718K->918720K(2276864K)]
> 5838183K->5406145K(9267712K), 1.5793066 secs] [Times: user=12.40 sys=0.02,
> real=1.58 secs]
> 14/08/11 17:07:00 WARN scheduler.TaskSetManager: Stage 9 contains a task of
> very large size (272484 KB). The maximum recommended task size is 100 KB.
> 14/08/11 17:07:00 INFO scheduler.TaskSetManager: Starting task 0.0 in stage
> 9.0 (TID 3053, idp11.foo.bar, PROCESS_LOCAL, 279023993 bytes)
> 4193.607: [GC (Allocation Failure) [PSYoungGen: 2070046K->599908K(2330112K)]
> 6557472K->5393557K(9320960K), 0.3267949 secs] [Times: user=2.53 sys=0.01,
> real=0.33 secs]
> 4194.645: [GC (Allocation Failure) [PSYoungGen: 1516770K->589655K(2330112K)]
> 6310419K->5383352K(9320960K), 0.2566507 secs] [Times: user=1.96 sys=0.00,
> real=0.26 secs]
> 4195.815: [GC (Allocation Failure) [PSYoungGen: 1730909K->275312K(2330112K)]
> 6524606K->5342865K(9320960K), 0.2053884 secs] [Times: user=1.57 sys=0.00,
> real=0.21 secs]
> 14/08/11 17:08:56 INFO storage.BlockManagerInfo: Added broadcast_5_piece0 in
> memory on idp11.foo.bar:46418 (size: 136.0 B, free: 10.4 GB)
> 14/08/11 17:08:56 INFO spark.MapOutputTrackerMasterActor: Asked to send map
> output locations for shuffle 1 to sp...@idp11.foo.bar:57072
> 14/08/11 17

Re: How to save mllib model to hdfs and reload it

2014-08-12 Thread Xiangrui Meng
For linear models, the constructors are now public. You can save the
weights to HDFS, then load the weights back and use the constructor to
create the model. -Xiangrui
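
A minimal sketch of that approach for a LogisticRegressionModel (the path is a
placeholder, model is an already trained model, and it assumes a Spark version
where the constructor is public, as noted above):

import org.apache.spark.mllib.classification.LogisticRegressionModel
import org.apache.spark.mllib.linalg.Vector

val modelPath = "hdfs://namenode:8020/user/me/lr-model"   // placeholder path

// Save: a linear model is just (weights, intercept), so persist that pair.
sc.parallelize(Seq((model.weights, model.intercept)), 1).saveAsObjectFile(modelPath)

// Load: read the pair back and rebuild the model with the public constructor.
val (weights, intercept) = sc.objectFile[(Vector, Double)](modelPath).first()
val restored = new LogisticRegressionModel(weights, intercept)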

On Mon, Aug 11, 2014 at 10:27 PM, XiaoQinyu  wrote:
> hello:
>
> I want to know: if I use historical data to train a model and I want to use
> this model in another app, how should I do it?
>
> Should I save this model to disk, and load it from disk when I use it?
> But I don't know how to save the MLlib model and reload it.
>
> I will be very pleased if anyone can give some tips.
>
> Thanks
>
> XiaoQinyu
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-save-mllib-model-to-hdfs-and-reload-it-tp11953.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Is there any way to control the parallelism in LogisticRegression

2014-08-12 Thread Xiangrui Meng
Assuming that your data is very sparse, I would recommend
RDD.repartition. But if it is not the case and you don't want to
shuffle the data, you can try a CombineInputFormat and then parse the
lines into labeled points. Coalesce may cause locality problems if you
didn't use the right number of partitions. -Xiangrui

On Mon, Aug 11, 2014 at 10:39 PM, ZHENG, Xu-dong  wrote:
> I think this has the same effect and issue with #1, right?
>
>
> On Tue, Aug 12, 2014 at 1:08 PM, Jiusheng Chen 
> wrote:
>>
>> How about increasing the HDFS file extent size? Like, the current value is 128M, we
>> make it 512M or bigger.
>>
>>
>> On Tue, Aug 12, 2014 at 11:46 AM, ZHENG, Xu-dong 
>> wrote:
>>>
>>> Hi all,
>>>
>>> We are trying to use Spark MLlib to train super large data (100M features
>>> and 5B rows). The input data in HDFS has ~26K partitions. By default, MLlib
>>> will create a task for every partition at each iteration. But because our
>>> dimensions are also very high, such large number of tasks will increase
>>> large network overhead to transfer the weight vector. So we want to reduce
>>> the number of tasks, we tried below ways:
>>>
>>> 1. Coalesce partitions without shuffling, then cache.
>>>
>>> data.coalesce(numPartitions).cache()
>>>
>>> This works fine for relatively small data, but when the data grows and
>>> numPartitions is fixed, the size of one partition will be large. This
>>> introduces two issues: the first is, the larger partition will need a larger
>>> object and more memory at runtime, and triggers GC more frequently; the
>>> second is, we hit the 'size exceeds Integer.MAX_VALUE' error, which
>>> seems to be caused by the size of one partition being larger than 2G
>>> (https://issues.apache.org/jira/browse/SPARK-1391).
>>>
>>> 2. Coalesce partitions with shuffling, then cache.
>>>
>>> data.coalesce(numPartitions, true).cache()
>>>
>>> It could mitigate the second issue in #1 to some degree, but the first issue
>>> is still there, and it also will introduce a large amount of shuffling.
>>>
>>> 3. Cache data first, and coalesce partitions.
>>>
>>> data.cache().coalesce(numPartitions)
>>>
>>> In this way, the number of cached partitions is not changed, but each task
>>> reads the data from multiple partitions. However, I find the task will lose
>>> locality this way. I see a lot of 'ANY' tasks, which means that tasks
>>> read data from other nodes and become slower than those that read data from
>>> local memory.
>>>
>>> I think the best way should be like #3, but leveraging locality as much as
>>> possible. Is there any way to do that? Any suggestions?
>>>
>>> Thanks!
>>>
>>> --
>>> ZHENG, Xu-dong
>>>
>>
>
>
>
> --
> 郑旭东
> ZHENG, Xu-dong
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: java.lang.StackOverflowError when calling count()

2014-08-12 Thread Tathagata Das
The long lineage causes a long/deep Java object tree (a DAG of RDD objects),
which needs to be serialized as part of task creation. When
serializing, the whole object DAG needs to be traversed, leading to the
stack overflow error.
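
A minimal sketch of the usual remedy, periodic checkpointing, which truncates
the lineage so the serialized object graph stays shallow; the directory,
interval, update function and iteration count below are placeholders:

sc.setCheckpointDir("hdfs://namenode:8020/user/me/checkpoints")  // placeholder path

var rdd = initialRDD                     // some RDD that is rebuilt every iteration
(1 to numIterations).foreach { i =>
  rdd = rdd.map(update).cache()          // update stands in for the real per-iteration transformation
  if (i % 10 == 0) {                     // every 10 iterations, cut the lineage
    rdd.checkpoint()
    rdd.count()                          // checkpointing is lazy; an action forces it
  }
}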

TD


On Mon, Aug 11, 2014 at 7:14 PM, randylu  wrote:

> hi, TD. I also fell into the trap of long lineage, and your suggestions do
> work well. But I don't understand why the long lineage can cause a stack overflow,
> and where it takes effect?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-StackOverflowError-when-calling-count-tp5649p11941.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Killing spark app problem

2014-08-12 Thread Grzegorz Białek
Hi,

When I run a Spark application on my local machine using spark-submit:
$SPARK_HOME/bin/spark-submit --driver-memory 1g  
When I want to interrupt the computation with Ctrl-C, it interrupts the current stage, but
later it waits and exits after around 5 minutes, and sometimes doesn't exit at all.
The only way I was able to kill it was "kill -9 ", but after this
my system doesn't boot correctly after a reboot.

Is there a better way to properly kill Spark application?

Thanks,
Grzegorz


Re: Transform RDD[List]

2014-08-12 Thread Sean Owen
-incubator, +user

So, are you trying to "transpose" your data?

val rdd = sc.parallelize(List(List(1,2,3,4,5),List(6,7,8,9,10))).repartition(2)

First you could pair each value with its position in its list:

val withIndex = rdd.flatMap(_.zipWithIndex)

then group by that position, and discard the position:

withIndex.groupBy(_._2).values.map(_.map(_._1))

Printing the RDD gives what you want:

List(5, 10)
List(1, 6)
List(3, 8)
List(2, 7)
List(4, 9)

On Tue, Aug 12, 2014 at 5:42 AM, Kevin Jung  wrote:
> Hi
> It may be a simple question, but I cannot figure out the most efficient way.
> There is a RDD containing list.
>
> RDD
> (
>  List(1,2,3,4,5)
>  List(6,7,8,9,10)
> )
>
> I want to transform this to
>
> RDD
> (
> List(1,6)
> List(2,7)
> List(3,8)
> List(4,9)
> List(5,10)
> )
>
> And I want to achieve this without using the collect method, because a real-world
> RDD can have a lot of elements and it may cause out-of-memory errors.
> Any ideas will be welcome.
>
> Best regards
> Kevin
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Transform-RDD-List-tp11948.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Parallelizing a task makes it freeze

2014-08-12 Thread sparkuser2345
Actually the program hangs just by calling dataAllRDD.count(). I suspect
creating the RDD is not successful when its elements are too big. When nY =
3000, dataAllRDD.count() works (each element of dataAll = 3000*400*64 bits =
9.6 MB), but when nY = 4000, it hangs (4000*400*64 bits = 12.8 MB). 

What are the limiting factors to the size of the elements of an RDD? 


sparkuser2345 wrote
> I have an array 'dataAll' of key-value pairs where each value is an array
> of arrays. I would like to parallelize a task over the elements of
> 'dataAll' to the workers. In the dummy example below, the number of
> elements in 'dataAll' is 3 but in real application it would be tens to
> hundreds. 
> 
> Without parallelizing dataAll, 'result' is calculated in less than a
> second: 
> 
> import org.jblas.DoubleMatrix  
> 
> val nY = 5000
> val nX = 400
> 
> val dataAll = Array((1, Array.fill(nY)(Array.fill(nX)(1.0))),
> (2, Array.fill(nY)(Array.fill(nX)(1.0))),
> (3, Array.fill(nY)(Array.fill(nX)(1.0
> 
> val w1 = DoubleMatrix.ones(400)
> 
> // This finishes in less than a second: 
> val result = dataAll.map { dat =>
>   val c   = dat._1
>   val dataArr = dat._2
>   // Map over the Arrays within dataArr: 
>   val test = dataArr.map { arr =>
> val test2 = new DoubleMatrix(arr.length, 1, arr:_*)
> val out = test2.dot(w1)
> out
>   }
>   (c, test)
> }
> 
> However, when I parallelize dataAll, the same task freezes: 
> 
> val dataAllRDD = sc.parallelize(dataAll, 3)
> 
> // This doesn't finish in several minutes: 
> val result = dataAllRDD.map { dat =>
>   val c   = dat._1
>   val dataArr = dat._2
>   // Map over the Arrays within dataArr: 
>   val test = dataArr.map { arr =>
> val test2 = new DoubleMatrix(arr.length, 1, arr:_*)
> val out = test2.dot(w1)
> out
>   }
>   (c, test)
> }.collect
> 
> After sending the above task, nothing is written to the worker logs (as
> viewed through the web UI), but the following output is printed in the
> Spark shell where I'm running the task: 
> 
> 14/08/11 18:17:31 INFO SparkContext: Starting job: collect at 
> 
> :33
> 14/08/11 18:17:31 INFO DAGScheduler: Got job 0 (collect at 
> 
> :33) with 3 output partitions (allowLocal=false)
> 14/08/11 18:17:31 INFO DAGScheduler: Final stage: Stage 0 (collect at 
> 
> :33)
> 14/08/11 18:17:31 INFO DAGScheduler: Parents of final stage: List()
> 14/08/11 18:17:31 INFO DAGScheduler: Missing parents: List()
> 14/08/11 18:17:31 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[1] at
> map at 
> 
> :23), which has no missing parents
> 14/08/11 18:17:32 INFO DAGScheduler: Submitting 3 missing tasks from Stage
> 0 (MappedRDD[1] at map at 
> 
> :23)
> 14/08/11 18:17:32 INFO TaskSchedulerImpl: Adding task set 0.0 with 3 tasks
> 14/08/11 18:17:32 INFO TaskSetManager: Starting task 0.0:0 as TID 0 on
> executor 2: 
> 
>  (PROCESS_LOCAL)
> 14/08/11 18:17:32 INFO TaskSetManager: Serialized task 0.0:0 as 16154060
> bytes in 69 ms
> 14/08/11 18:17:32 INFO TaskSetManager: Starting task 0.0:1 as TID 1 on
> executor 1: 
> 
>  (PROCESS_LOCAL)
> 14/08/11 18:17:32 INFO TaskSetManager: Serialized task 0.0:1 as 16154060
> bytes in 81 ms
> 14/08/11 18:17:32 INFO TaskSetManager: Starting task 0.0:2 as TID 2 on
> executor 0: 
> 
>  (PROCESS_LOCAL)
> 14/08/11 18:17:32 INFO TaskSetManager: Serialized task 0.0:2 as 16154060
> bytes in 66 ms
> 
> 
> dataAllRDD.map does work with smaller array though (e.g. nY = 100;
> finishes in less than a second). 
> 
> Why is dataAllRDD.map so much slower than dataAll.map, or even not
> executing at all? 
> 
> The Spark version I'm using is 0.9.0.





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Parallelizing-a-task-makes-it-freeze-tp11900p11967.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark: Could not load native gpl library

2014-08-12 Thread Andrew Ash
Hi Jikai,

The reason I ask is because your stacktrace has this section in it:

com.hadoop.compression.lzo.GPLNativeCodeLoader.(
GPLNativeCodeLoader.java:32)
at com.hadoop.compression.lzo.LzoCodec.(LzoCodec.java:71)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:247)
at
org.apache.hadoop.conf.Configuration.getClassByNameOrNull(
Configuration.java:1659)
at
org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:1624)
at
org.apache.hadoop.io.compress.CompressionCodecFactory.getCodecClasses(
CompressionCodecFactory.java:128)
at
org.apache.hadoop.io.compress.CompressionCodecFactory.
(CompressionCodecFactory.java:175)

Maybe you have the Lzo codec defined in your
core-site.xml:io.compression.codecs
setting?  In the short run you could disable it.

In the long run, I wonder if this is an issue with YARN not propagating the
setting through to the executors.  Have you tried in other cluster
deployment modes?




On Fri, Aug 8, 2014 at 7:38 AM, Jikai Lei  wrote:

> Thanks Andrew.  Actually my job did not use any data in .lzo format. Here
> is
> the program itself:
>
> import org.apache.spark._
> import org.apache.spark.mllib.util.MLUtils
> import org.apache.spark.mllib.classification.LogisticRegressionWithSGD
>
> object Test {
>   def main(args: Array[String]) {
>     val sparkConf = new SparkConf().setAppName("SparkMLTest")
>     val sc = new SparkContext(sparkConf)
>     val training = MLUtils.loadLibSVMFile(sc,
>       "hdfs://url:8020/user/jilei/sparktesttraining_libsvmfmt_10k.txt")
>     val model = LogisticRegressionWithSGD.train(training, numIterations = 20)
>   }
> }
>
> I copied this from a GitHub gist and wanted to give it a try. The file is a
> libsvm-format file in HDFS (I replaced the actual HDFS URL here in
> the code).
>
> And in the spark-env.sh file, I set the env variables:
> export SPARK_LIBRARY_PATH=/apache/hadoop/lib/native/
> export
>
> SPARK_CLASSPATH=/apache/hadoop/share/hadoop/common/hadoop-common-2.2.0.2.0.6.0-61.jar:/apache/hadoop/lib/hadoop-lzo-0.6.0.jar
>
> Here is the content of the /apache/hadoop/lib/native/ folder:
> ls /apache/hadoop/lib/native/
> libgplcompression.a   libgplcompression.solibgplcompression.so.0.0.0
> libhadooppipes.a  libhadoop.so.1.0.0  libhdfs.a   libhdfs.so.0.0.0
> libsnappy.so.1
> libgplcompression.la  libgplcompression.so.0  libhadoop.a
> libhadoop.so  libhadooputils.alibhdfs.so  libsnappy.so
> libsnappy.so.1.1.4
>
>
>
>
> Andrew Ash wrote
> > Hi Jikai,
> >
> > It looks like you're trying to run a Spark job on data that's stored in
> > HDFS in .lzo format.  Spark can handle this (I do it all the time), but
> > you
> > need to configure your Spark installation to know about the .lzo format.
> >
> > There are two parts to the hadoop lzo library -- the first is the jar
> > (hadoop-lzo.jar) and the second is the native library
> > (libgplcompression.{a,so,la} and liblzo2.{a,so,la}).  You need the jar on
> > the classpath across your cluster, but also the native libraries exposed
> > as
> > well.
> >
> > In Spark 1.0.1 I modify entries in spark-env.sh: set SPARK_LIBRARY_PATH
> to
> > include the path to the native library directory
> > (e.g. /path/to/hadoop/lib/native/Linux-amd64-64) and SPARK_CLASSPATH to
> > include the hadoop-lzo jar.
> >
> > Hope that helps,
> > Andrew
> >
> >
> > On Thu, Aug 7, 2014 at 7:19 PM, Xiangrui Meng <
>
> > mengxr@
>
> > > wrote:
> >
> >> Is the GPL library only available on the driver node? If that is the
> >> case, you need to add them to `--jars` option of spark-submit.
> >> -Xiangrui
> >>
> >> On Thu, Aug 7, 2014 at 6:59 PM, Jikai Lei <
>
> > hangelwen@
>
> > > wrote:
> >> > I had the following error when trying to run a very simple spark job
> >> (which
> >> > uses logistic regression with SGD in mllib):
> >> >
> >> > ERROR GPLNativeCodeLoader: Could not load native gpl library
> >> > java.lang.UnsatisfiedLinkError: no gplcompression in java.library.path
> >> > at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1738)
> >> > at java.lang.Runtime.loadLibrary0(Runtime.java:823)
> >> > at java.lang.System.loadLibrary(System.java:1028)
> >> > at
> >> >
> >> com.hadoop.compression.lzo.GPLNativeCodeLoader.
> > 
> > (GPLNativeCodeLoader.java:32)
> >> > at com.hadoop.compression.lzo.LzoCodec.
> > 
> > (LzoCodec.java:71)
> >> > at java.lang.Class.forName0(Native Method)
> >> > at java.lang.Class.forName(Class.java:247)
> >> > at
> >> >
> >>
> org.apache.hadoop.conf.Configuration.getClassByNameOrNull(Configuration.java:1659)
> >> > at
> >> >
> >>
> org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:1624)
> >> > at
> >> >
> >>
> org.apache.hadoop.io.compress.CompressionCodecFactory.getCodecClasses(CompressionCodecFactory.java:128)
> >> > at
> >> >
> >> org.apache.hadoop.io.compress.CompressionCodecFactory.
> > 
> > (CompressionCodecFactory.java:175)
> >> > at
> >> >
> >>
> org.apache.hado