Re: Contribution to Spark MLLib

2014-06-17 Thread Jayati
Hello Xiangrui,

Thanks for sharing the roadmap. It really helped.

Regards,
Jayati





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Contribution-to-Spark-MLLib-tp7716p7826.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Wildcard support in input path

2014-06-17 Thread Jianshi Huang
Hi all,

Thanks for the reply. I'm using parquetFile as input; is that a problem? In
hadoop fs -ls, the path
(hdfs://domain/user/jianshuang/data/parquet/table/month=2014*)
will list all the files.

I'll test it again.

Jianshi


On Wed, Jun 18, 2014 at 2:23 PM, Jianshi Huang 
wrote:

> Hi Andrew,
>
> Strangely in my spark (1.0.0 compiled against hadoop 2.4.0) log, it says
> file not found. I'll try again.
>
> Jianshi
>
>
> On Wed, Jun 18, 2014 at 12:36 PM, Andrew Ash  wrote:
>
>> In Spark you can use the normal globs supported by Hadoop's FileSystem,
>> which are documented here:
>> http://hadoop.apache.org/docs/r2.3.0/api/org/apache/hadoop/fs/FileSystem.html#globStatus(org.apache.hadoop.fs.Path)
>>
>>
>> On Wed, Jun 18, 2014 at 12:09 AM, MEETHU MATHEW 
>> wrote:
>>
>>> Hi Jianshi,
>>>
>>> I have used wild card characters (*) in my program and it worked..
>>> My code was like this
>>> b = sc.textFile("hdfs:///path to file/data_file_2013SEP01*")
>>>
>>> Thanks & Regards,
>>> Meethu M
>>>
>>>
>>>   On Wednesday, 18 June 2014 9:29 AM, Jianshi Huang <
>>> jianshi.hu...@gmail.com> wrote:
>>>
>>>
>>>  It would be convenient if Spark's textFile, parquetFile, etc. can
>>> support path with wildcard, such as:
>>>
>>>   hdfs://domain/user/jianshuang/data/parquet/table/month=2014*
>>>
>>>  Or is there already a way to do it now?
>>>
>>> Jianshi
>>>
>>> --
>>> Jianshi Huang
>>>
>>> LinkedIn: jianshi
>>> Twitter: @jshuang
>>> Github & Blog: http://huangjs.github.com/
>>>
>>>
>>>
>>
>
>
> --
> Jianshi Huang
>
> LinkedIn: jianshi
> Twitter: @jshuang
> Github & Blog: http://huangjs.github.com/
>



-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/


Re: Wildcard support in input path

2014-06-17 Thread Jianshi Huang
Hi Andrew,

Strangely, in my Spark (1.0.0, compiled against Hadoop 2.4.0) log, it says
file not found. I'll try again.

Jianshi


On Wed, Jun 18, 2014 at 12:36 PM, Andrew Ash  wrote:

> In Spark you can use the normal globs supported by Hadoop's FileSystem,
> which are documented here:
> http://hadoop.apache.org/docs/r2.3.0/api/org/apache/hadoop/fs/FileSystem.html#globStatus(org.apache.hadoop.fs.Path)
>
>
> On Wed, Jun 18, 2014 at 12:09 AM, MEETHU MATHEW 
> wrote:
>
>> Hi Jianshi,
>>
>> I have used wild card characters (*) in my program and it worked..
>> My code was like this
>> b = sc.textFile("hdfs:///path to file/data_file_2013SEP01*")
>>
>> Thanks & Regards,
>> Meethu M
>>
>>
>>   On Wednesday, 18 June 2014 9:29 AM, Jianshi Huang <
>> jianshi.hu...@gmail.com> wrote:
>>
>>
>>  It would be convenient if Spark's textFile, parquetFile, etc. can
>> support path with wildcard, such as:
>>
>>   hdfs://domain/user/jianshuang/data/parquet/table/month=2014*
>>
>>  Or is there already a way to do it now?
>>
>> Jianshi
>>
>> --
>> Jianshi Huang
>>
>> LinkedIn: jianshi
>> Twitter: @jshuang
>> Github & Blog: http://huangjs.github.com/
>>
>>
>>
>


-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/


Re: Memory footprint of Calliope: Spark -> Cassandra writes

2014-06-17 Thread tj opensource
Gerard,

We haven't done a test on Calliope vs a driver.

The thing is, Calliope builds on C* Thrift (and the latest build on the DS
driver), and the performance in terms of simple writes will be similar to
any existing driver. But that is not the use case for Calliope.

It is built to be used from Spark and to harness Spark's distributed
nature. With a regular driver you would have to take care of multithreading,
splitting the data, etc., while with Spark and Calliope this comes for free.

Regards,
Rohit



On Tue, Jun 17, 2014 at 9:24 PM, Gerard Maas  wrote:

> Hi Rohit,
>
> Thanks a lot for looking at this.  The intention of calculating the data
> upfront is to benchmark only the storage time in records/sec, eliminating
> the generation factor (which will be different in the real scenario,
> reading from HDFS).
> I used a profiler today and indeed it's not the storage part, but the
> generation that's bloating the memory.  Objects in memory take surprisingly
> more space than one would expect based on the data they hold. In my case it
> was 2.1x the size of the original data.
>
> Now that we are talking about this, do you have some figures on how
> Calliope compares, performance-wise, to a classic Cassandra driver
> (DataStax / Astyanax)? That would be awesome.
>
> Thanks again!
>
> -kr, Gerard.
>
>
>
>
>
> On Tue, Jun 17, 2014 at 4:27 PM, tj opensource 
> wrote:
>
>> Dear Gerard,
>>
>> I just tried the code you posted in the gist (
>> https://gist.github.com/maasg/68de6016bffe5e71b78c) and it does give an
>> OOM. It is because the data is being generated locally and then
>> parallelized:
>>
>>
>> --
>>
>> val entries = for (i <- 1 to total) yield {
>>   Array(s"devy$i", "aggr", "1000", "sum", (i to i+10).mkString(","))
>> }
>>
>> val rdd = sc.parallelize(entries, 8)
>>
>> --
>>
>> This will generate all the data on the local system and then try to
>> partition it.
>>
>> Instead, we should parallelize the keys (i <- 1 to total) and generate
>> data in the map tasks. This is *closer* to what you will get if you
>> distribute out a file on a DFS like HDFS/SnackFS.
>>
>> I have made the change in the script here (
>> https://gist.github.com/milliondreams/aac52e08953949057e7d)
>>
>>
>> --
>>
>> val rdd = sc.parallelize(1 to total, 8).map(i =>
>>   Array(s"devy$i", "aggr", "1000", "sum", (i to i+10).mkString(",")))
>>
>> --
>>
>> I was able to insert 50M records using just over 350M RAM. Attaching the
>> log and screenshot.
>>
>> Let me know if you still face this issue... we can do a screen share and
>> resolve the issue there.
>>
>> And thanks for using Calliope. I hope it serves your needs.
>>
>> Cheers,
>> Rohit
>>
>>
>> On Mon, Jun 16, 2014 at 9:57 PM, Gerard Maas 
>> wrote:
>>
>>> Hi,
>>>
>>> I've been doing some testing with Calliope as a way to do batch load
>>> from Spark into Cassandra.
>>> My initial results are promising on the performance area, but worrisome
>>> on the memory footprint side.
>>>
>>> I'm generating N records of about 50 bytes each and using the UPDATE
>>> mutator to insert them into C*. I get an OOM if my memory is below 1GB
>>> per million records, or about 50MB of raw data (without counting any
>>> RDD/structural overhead).  (See code [1])
>>>
>>> (So, to avoid confusion: I need 4GB of RAM to save 4M 50-byte records
>>> to Cassandra.) That's an order of magnitude more than the raw data.
>>>
>>> I understood that Calliope builds on top of the Hadoop support of
>>> Cassandra, which builds on top of SSTables and sstableloader.
>>>
>>> I would like to know the memory usage factor of Calliope and what
>>> parameters I could use to control/tune it.
>>>
>>> Any experience/advice on that?
>>>
>>>  -kr, Gerard.
>>>
>>> [1] https://gist.github.com/maasg/68de6016bffe5e71b78c
>>>
>>
>>
>


Re: question about setting SPARK_CLASSPATH IN spark_env.sh

2014-06-17 Thread santhoma
Thanks. I hope this problem will go away once I upgrade to Spark 1.0, where
we can set the cluster-wide classpath using the spark-submit command.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/question-about-setting-SPARK-CLASSPATH-IN-spark-env-sh-tp7809p7822.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Enormous EC2 price jump makes "r3.large" patch more important

2014-06-17 Thread Jeremy Lee
Ah, right. So only the launch script has changed. Everything else is still
essentially binary compatible?

Well, that makes it too easy! Thanks!


On Wed, Jun 18, 2014 at 2:35 PM, Patrick Wendell  wrote:

> Actually you'll just want to clone the 1.0 branch then use the
> spark-ec2 script in there to launch your cluster. The --spark-git-repo
> flag is if you want to launch with a different version of Spark on the
> cluster. In your case you just need a different version of the launch
> script itself, which will be present in the 1.0 branch of Spark.
>
> - Patrick
>
> On Tue, Jun 17, 2014 at 9:29 PM, Jeremy Lee
>  wrote:
> > I am about to spin up some new clusters, so I may give that a go... any
> > special instructions for making them work? I assume I use the "
> > --spark-git-repo=" option on the spark-ec2 command. Is it as easy as
> > concatenating your string as the value?
> >
> > On cluster management GUIs... I've been looking around at Ambari,
> Datastax,
> > Cloudera, OpsCenter etc. Not totally convinced by any of them yet. Anyone
> > using a good one I should know about? I'm really beginning to lean in the
> > direction of Cassandra as the distributed data store...
> >
> >
> > On Wed, Jun 18, 2014 at 1:46 PM, Patrick Wendell 
> wrote:
> >>
> >> By the way, in case it's not clear, I mean our maintenance branches:
> >>
> >> https://github.com/apache/spark/tree/branch-1.0
> >>
> >> On Tue, Jun 17, 2014 at 8:35 PM, Patrick Wendell 
> >> wrote:
> >> > Hey Jeremy,
> >> >
> >> > This is patched in the 1.0 and 0.9 branches of Spark. We're likely to
> >> > make a 1.0.1 release soon (this patch being one of the main reasons),
> >> > but if you are itching for this sooner, you can just checkout the head
> >> > of branch-1.0 and you will be able to use r3.XXX instances.
> >> >
> >> > - Patrick
> >> >
> >> > On Tue, Jun 17, 2014 at 4:17 PM, Jeremy Lee
> >> >  wrote:
> >> >> Some people (me included) might have wondered why all our m1.large
> spot
> >> >> instances (in us-west-1) shut down a few hours ago...
> >> >>
> >> >> Simple reason: The EC2 spot price for Spark's default "m1.large"
> >> >> instances
> >> >> just jumped from 0.016 per hour, to about 0.750. Yes, Fifty times.
> >> >> Probably
> >> >> something to do with world cup.
> >> >>
> >> >> So far this is just us-west-1, but prices have a tendency to equalize
> >> >> across
> >> >> centers as the days pass. Time to make backups and plans.
> >> >>
> >> >> "m3" spot prices are still down at $0.02 (and being new, will be
> >> >> bypassed by
> >> >> older systems), so it would be REAAALLYY nice if there had been some
> >> >> progress on that issue. Let me know if I can help with testing and
> >> >> whatnot.
> >> >>
> >> >>
> >> >> --
> >> >> Jeremy Lee  BCompSci(Hons)
> >> >>   The Unorthodox Engineers
> >
> >
> >
> >
> > --
> > Jeremy Lee  BCompSci(Hons)
> >   The Unorthodox Engineers
>



-- 
Jeremy Lee  BCompSci(Hons)
  The Unorthodox Engineers


Re: question about setting SPARK_CLASSPATH IN spark_env.sh

2014-06-17 Thread Krishna Sankar
Santhosh,
   All the nodes should have access to the jar file and so the classpath
should be on all the nodes. Usually it is better to rsync the conf
directory to all nodes rather than editing them separately.
Cheers
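
For Spark 0.9 there is also a programmatic alternative worth noting, sketched
here with a hypothetical jar path: SparkConf.setJars makes the driver ship the
listed jars to the executors at job submission, instead of relying on
SPARK_CLASSPATH being set identically on every node.

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical path; these jars are served to executors by the driver.
val conf = new SparkConf()
  .setAppName("classpath-example")
  .setJars(Seq("/opt/libs/my-local-lib.jar"))
val sc = new SparkContext(conf)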



On Tue, Jun 17, 2014 at 9:26 PM, santhoma  wrote:

> Hi,
>
> This is about spark 0.9.
> I have a 3 node spark cluster. I want to add a locally available jarfile
> (present on all nodes) to the SPARK_CLASPATH variable in
> /etc/spark/conf/spark-env.sh  so that all nodes can access it.
>
> Question is,
> should I edit 'spark-env.sh' on all nodes to add the jar  ?
> Or, is it enough to add it only in the master node from where I am
> submitting jobs?
>
> thanks
> Santhosh
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/question-about-setting-SPARK-CLASSPATH-IN-spark-env-sh-tp7809.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


SparkR Installation

2014-06-17 Thread Stuti Awasthi
Hi All,

I wanted to try SparkR. Do we need R preinstalled on all the nodes of the
cluster before installing the SparkR package? Please guide me on how to
proceed with this. As of now, I work with R only on a single node.
Please suggest.

Thanks
Stuti Awasthi






Re: Issue while trying to aggregate with a sliding window

2014-06-17 Thread Hatch M
Thanks! Will try to get the fix and retest.


On Tue, Jun 17, 2014 at 5:30 PM, onpoq l  wrote:

> There is a bug:
>
> https://github.com/apache/spark/pull/961#issuecomment-45125185
>
>
> On Tue, Jun 17, 2014 at 8:19 PM, Hatch M  wrote:
> > Trying to aggregate over a sliding window, playing with the slide
> > duration. Varying the slide interval, I can see the aggregation work,
> > but it mostly fails with the error below. The stream has records coming
> > in at 100 ms.
> >
> > JavaPairDStream aggregatedDStream =
> > pairsDStream.reduceByKeyAndWindow(aggFunc, new Duration(6), new
> > Duration(60));
> >
> > 14/06/18 00:14:46 INFO dstream.ShuffledDStream: Time 1403050486900 ms is
> > invalid as zeroTime is 1403050485800 ms and slideDuration is 6 ms and
> > difference is 1100 ms
> > 14/06/18 00:14:46 ERROR actor.OneForOneStrategy: key not found:
> > 1403050486900 ms
> > java.util.NoSuchElementException: key not found: 1403050486900 ms
> > at scala.collection.MapLike$class.default(MapLike.scala:228)
> >
> > Any hints on whats going on here?
> > Thanks!
> > Hatch
> >
>
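
For reference, reduceByKeyAndWindow expects the window and slide durations to
be multiples of the batch interval. Below is a minimal Scala sketch of a valid
combination; the socket source, port, and durations are assumptions, not the
poster's code.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Milliseconds, Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._

val conf = new SparkConf().setAppName("window-sketch").setMaster("local[2]")
val ssc = new StreamingContext(conf, Milliseconds(100))   // 100 ms batches
val words = ssc.socketTextStream("localhost", 9999)       // hypothetical source
val counts = words.map(w => (w, 1L))
  // Window (60 s) and slide (6 s) are exact multiples of the 100 ms batch.
  .reduceByKeyAndWindow(_ + _, Seconds(60), Seconds(6))
counts.print()
ssc.start()
ssc.awaitTermination()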


Re: Java IO Stream Corrupted - Invalid Type AC?

2014-06-17 Thread Patrick Wendell
Out of curiosity - are you guys using speculation, shuffle
consolidation, or any other non-default option? If so that would help
narrow down what's causing this corruption.

On Tue, Jun 17, 2014 at 10:40 AM, Surendranauth Hiraman
 wrote:
> Matt/Ryan,
>
> Did you make any headway on this? My team is running into this also.
> Doesn't happen on smaller datasets. Our input set is about 10 GB but we
> generate 100s of GBs in the flow itself.
>
> -Suren
>
>
>
>
> On Fri, Jun 6, 2014 at 5:19 PM, Ryan Compton  wrote:
>
>> Just ran into this today myself. I'm on branch-1.0 using a CDH3
>> cluster (no modifications to Spark or its dependencies). The error
>> appeared trying to run GraphX's .connectedComponents() on a ~200GB
>> edge list (GraphX worked beautifully on smaller data).
>>
>> Here's the stacktrace (it's quite similar to yours
>> https://imgur.com/7iBA4nJ ).
>>
>> 14/06/05 20:02:28 ERROR scheduler.TaskSetManager: Task 5.599:39 failed
>> 4 times; aborting job
>> 14/06/05 20:02:28 INFO scheduler.DAGScheduler: Failed to run reduce at
>> VertexRDD.scala:100
>> Exception in thread "main" org.apache.spark.SparkException: Job
>> aborted due to stage failure: Task 5.599:39 failed 4 times, most
>> recent failure: Exception failure in TID 29735 on host node18:
>> java.io.StreamCorruptedException: invalid type code: AC
>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1355)
>> java.io.ObjectInputStream.readObject(ObjectInputStream.java:350)
>>
>> org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
>>
>> org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:125)
>> org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>> scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>
>> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
>>
>> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>> scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>> scala.collection.Iterator$class.foreach(Iterator.scala:727)
>> scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>
>> org.apache.spark.graphx.impl.VertexPartitionBaseOps.innerJoinKeepLeft(VertexPartitionBaseOps.scala:192)
>>
>> org.apache.spark.graphx.impl.EdgePartition.updateVertices(EdgePartition.scala:78)
>>
>> org.apache.spark.graphx.impl.ReplicatedVertexView$$anonfun$2$$anonfun$apply$1.apply(ReplicatedVertexView.scala:75)
>>
>> org.apache.spark.graphx.impl.ReplicatedVertexView$$anonfun$2$$anonfun$apply$1.apply(ReplicatedVertexView.scala:73)
>> scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>> scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>> scala.collection.Iterator$class.foreach(Iterator.scala:727)
>> scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158)
>>
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
>> org.apache.spark.scheduler.Task.run(Task.scala:51)
>>
>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
>>
>> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>>
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
>> java.lang.Thread.run(Thread.java:662)
>> Driver stacktrace:
>> at org.apache.spark.scheduler.DAGScheduler.org
>> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
>> at
>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>> at
>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
>> at scala.Option.foreach(Option.scala:236)
>> at
>> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633)
>> at
>> org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207)
>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>> at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>> at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>> at
>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> at
>> scala.concurren

Re: Un-serializable 3rd-party classes (Spark, Java)

2014-06-17 Thread Matei Zaharia
There are a few options:

- Kryo might be able to serialize these objects out of the box, depending 
on what’s inside them. Try turning it on as described at 
http://spark.apache.org/docs/latest/tuning.html.

- If that doesn’t work, you can create your own “wrapper” objects that 
implement Serializable, or even a subclass of FlexCompRowMatrix. No need to 
change the original library.

- If the library has its own serialization functions, you could also use those 
inside a wrapper object. Take a look at 
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SerializableWritable.scala
 for an example where we make Hadoop’s Writables serializable.

Matei
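
A rough sketch of the wrapper idea follows, serializing the matrix as its
dimensions plus its non-zero entries. The numRows()/numColumns() accessors and
MatrixEntry iteration are assumptions about the MTJ API; verify them against
matrix-toolkit-java before using this.

import java.io.{ObjectInputStream, ObjectOutputStream}
import no.uib.cipr.matrix.sparse.FlexCompRowMatrix

class SerializableFlexMatrix(@transient var matrix: FlexCompRowMatrix)
    extends Serializable {

  // Write dimensions followed by the non-zero (row, column, value) triples.
  private def writeObject(out: ObjectOutputStream): Unit = {
    out.writeInt(matrix.numRows())
    out.writeInt(matrix.numColumns())
    val entries = scala.collection.mutable.ArrayBuffer[(Int, Int, Double)]()
    val it = matrix.iterator()
    while (it.hasNext) {
      val e = it.next()
      entries += ((e.row(), e.column(), e.get()))
    }
    out.writeObject(entries)
  }

  // Rebuild an equivalent matrix from the serialized triples.
  private def readObject(in: ObjectInputStream): Unit = {
    val rows = in.readInt()
    val cols = in.readInt()
    matrix = new FlexCompRowMatrix(rows, cols)
    in.readObject()
      .asInstanceOf[scala.collection.mutable.ArrayBuffer[(Int, Int, Double)]]
      .foreach { case (r, c, v) => matrix.set(r, c, v) }
  }
}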

On Jun 17, 2014, at 10:11 PM, Daedalus  wrote:

> I'm trying to use matrix-toolkit-java for an application of mine,
> particularly the FlexCompRowMatrix class (used to store sparse
> matrices).
> 
> I have a class Dataframe, which contains an int array, two double values,
> and one FlexCompRowMatrix.
> 
> When I try and run a simple Spark .foreach() on a JavaRDD created using a
> list of the above mentioned Dataframes, I get the following errors:
> 
> Exception in thread "main" org.apache.spark.SparkException: Job aborted due
> to s
> tage failure:* Task not serializable: java.io.NotSerializableException:
> no.uib.ci
> pr.matrix.sparse.FlexCompRowMatrix*
>at
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DA
> GScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
> 
> The FlexCompRowMatrix doesn't seem to implement Serializable. This class
> suits my purpose very well, and I would prefer not to switch over from it.
> 
> Other than writing code to make the class serializable, and then recompiling
> the matrix-toolkit-java source, what options do I have?
> 
> Is there any workaround for this issue?
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Un-serializable-3rd-party-classes-Spark-Java-tp7815.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.



Un-serializable 3rd-party classes (Spark, Java)

2014-06-17 Thread Daedalus
I'm trying to use matrix-toolkit-java for an application of mine,
particularly the FlexCompRowMatrix class (used to store sparse
matrices).

I have a class Dataframe, which contains an int array, two double values,
and one FlexCompRowMatrix.

When I try and run a simple Spark .foreach() on a JavaRDD created using a
list of the above mentioned Dataframes, I get the following errors:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due
to s
tage failure:* Task not serializable: java.io.NotSerializableException:
no.uib.ci
pr.matrix.sparse.FlexCompRowMatrix*
at
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DA
GScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)

The FlexCompRowMatrix doesn't seem to implement Serializable. This class
suits my purpose very well, and I would prefer not to switch over from it.

Other than writing code to make the class serializable, and then recompiling
the matrix-toolkit-java source, what options do I have?

Is there any workaround for this issue?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Un-serializable-3rd-party-classes-Spark-Java-tp7815.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Spark SQL: No function to evaluate expression

2014-06-17 Thread Michael Armbrust
Yeah, sorry that error message is not very intuitive.  There is already a
JIRA open to make it better: SPARK-2059


Also, a bug has been fixed in master regarding attributes that contain "_".
 So if you are running 1.0 you might try upgrading.
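
For anyone who wants to reproduce this, here is a minimal self-contained
sketch against the Spark 1.0 API; the sample rows are invented, and
registerAsTable is the 1.0 name for registering a table from a case-class RDD.

import org.apache.spark.sql.SQLContext

case class History(o_custkey: Int, c_address: String)

val sqlContext = new SQLContext(sc)
import sqlContext._   // brings in createSchemaRDD and sql()

// Invented sample rows, just enough to exercise the self-join.
sc.parallelize(Seq(History(1, "addr-a"), History(1, "addr-b"), History(2, "addr-c")))
  .registerAsTable("history")

sql("""SELECT * FROM history a JOIN history b ON a.o_custkey = b.o_custkey
       WHERE a.c_address <> b.c_address""").collect().foreach(println)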


On Wed, Jun 18, 2014 at 4:05 AM, Tobias Pfeiffer  wrote:

> The error message *means* that there is no column called c_address.
> However, maybe it's a bug with Spark SQL not understanding the
> a.c_address syntax. Can you double-check the column name is correct?
>
> Thanks
> Tobias
>
> On Wed, Jun 18, 2014 at 5:02 AM, Zuhair Khayyat
>  wrote:
> > Dear all,
> >
> > I am trying to run the following query on Spark SQL using some custom
> TPC-H
> > tables with standalone Spark cluster configuration:
> >
> > SELECT * FROM history a JOIN history b ON a.o_custkey = b.o_custkey WHERE
> > a.c_address <> b.c_address;
> >
> > Unfortunately I get the following error during execution:
> >
> > java.lang.reflect.InvocationTargetException
> >
> > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >
> > at
> >
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> >
> > at
> >
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >
> > at java.lang.reflect.Method.invoke(Method.java:606)
> >
> > at
> >
> org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:40)
> >
> > at
> > org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
> >
> > Caused by: org.apache.spark.SparkException: Job aborted due to stage
> > failure: Task 0.0:2 failed 4 times, most recent failure: Exception
> failure
> > in TID 12 on host kw2260.kaust.edu.sa:
> > org.apache.spark.sql.catalyst.errors.package$TreeNodeException: No
> function
> > to evaluate expression. type: UnresolvedAttribute, tree: 'a.c_address
> >
> >
> >
> org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute.eval(unresolved.scala:59)
> >
> >
> >
> org.apache.spark.sql.catalyst.expressions.Equals.eval(predicates.scala:147)
> >
> >
> > org.apache.spark.sql.catalyst.expressions.Not.eval(predicates.scala:74)
> >
> >
> > org.apache.spark.sql.catalyst.expressions.And.eval(predicates.scala:100)
> >
> >
> > Is this a bug or am I doing something wrong?
> >
> >
> > Regards,
> >
> > Zuhair Khayyat
>


Re: Wildcard support in input path

2014-06-17 Thread Andrew Ash
In Spark you can use the normal globs supported by Hadoop's FileSystem,
which are documented here:
http://hadoop.apache.org/docs/r2.3.0/api/org/apache/hadoop/fs/FileSystem.html#globStatus(org.apache.hadoop.fs.Path)
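
As a minimal sketch of that glob syntax in practice (the paths below are
hypothetical): sc.textFile hands the path string straight to Hadoop's
FileSystem, so the patterns from the globStatus docs, such as *, ?, [a-b] and
{x,y}, all apply.

// Hypothetical paths; the glob is expanded by Hadoop, not by Spark itself.
val oneDay  = sc.textFile("hdfs:///data/logs/data_file_2013SEP01*")
val oneWeek = sc.textFile("hdfs:///data/logs/data_file_2013SEP0[1-7]*")
oneDay.count()  // the listing happens lazily; a non-matching glob surfaces here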


On Wed, Jun 18, 2014 at 12:09 AM, MEETHU MATHEW 
wrote:

> Hi Jianshi,
>
> I have used wild card characters (*) in my program and it worked..
> My code was like this
> b = sc.textFile("hdfs:///path to file/data_file_2013SEP01*")
>
> Thanks & Regards,
> Meethu M
>
>
>   On Wednesday, 18 June 2014 9:29 AM, Jianshi Huang <
> jianshi.hu...@gmail.com> wrote:
>
>
>  It would be convenient if Spark's textFile, parquetFile, etc. can
> support path with wildcard, such as:
>
>   hdfs://domain/user/jianshuang/data/parquet/table/month=2014*
>
>  Or is there already a way to do it now?
>
> Jianshi
>
> --
> Jianshi Huang
>
> LinkedIn: jianshi
> Twitter: @jshuang
> Github & Blog: http://huangjs.github.com/
>
>
>


Re: Wildcard support in input path

2014-06-17 Thread Patrick Wendell
These paths get passed directly to the Hadoop FileSystem API, and I
think they support globbing out of the box. So AFAIK it should just
work.

On Tue, Jun 17, 2014 at 9:09 PM, MEETHU MATHEW  wrote:
> Hi Jianshi,
>
> I have used wild card characters (*) in my program and it worked..
> My code was like this
> b = sc.textFile("hdfs:///path to file/data_file_2013SEP01*")
>
> Thanks & Regards,
> Meethu M
>
>
> On Wednesday, 18 June 2014 9:29 AM, Jianshi Huang 
> wrote:
>
>
> It would be convenient if Spark's textFile, parquetFile, etc. can support
> path with wildcard, such as:
>
>   hdfs://domain/user/jianshuang/data/parquet/table/month=2014*
>
> Or is there already a way to do it now?
>
> Jianshi
>
> --
> Jianshi Huang
>
> LinkedIn: jianshi
> Twitter: @jshuang
> Github & Blog: http://huangjs.github.com/
>
>


Re: Enormous EC2 price jump makes "r3.large" patch more important

2014-06-17 Thread Patrick Wendell
Actually you'll just want to clone the 1.0 branch then use the
spark-ec2 script in there to launch your cluster. The --spark-git-repo
flag is if you want to launch with a different version of Spark on the
cluster. In your case you just need a different version of the launch
script itself, which will be present in the 1.0 branch of Spark.

- Patrick

On Tue, Jun 17, 2014 at 9:29 PM, Jeremy Lee
 wrote:
> I am about to spin up some new clusters, so I may give that a go... any
> special instructions for making them work? I assume I use the "
> --spark-git-repo=" option on the spark-ec2 command. Is it as easy as
> concatenating your string as the value?
>
> On cluster management GUIs... I've been looking around at Ambari, Datastax,
> Cloudera, OpsCenter etc. Not totally convinced by any of them yet. Anyone
> using a good one I should know about? I'm really beginning to lean in the
> direction of Cassandra as the distributed data store...
>
>
> On Wed, Jun 18, 2014 at 1:46 PM, Patrick Wendell  wrote:
>>
>> By the way, in case it's not clear, I mean our maintenance branches:
>>
>> https://github.com/apache/spark/tree/branch-1.0
>>
>> On Tue, Jun 17, 2014 at 8:35 PM, Patrick Wendell 
>> wrote:
>> > Hey Jeremy,
>> >
>> > This is patched in the 1.0 and 0.9 branches of Spark. We're likely to
>> > make a 1.0.1 release soon (this patch being one of the main reasons),
>> > but if you are itching for this sooner, you can just checkout the head
>> > of branch-1.0 and you will be able to use r3.XXX instances.
>> >
>> > - Patrick
>> >
>> > On Tue, Jun 17, 2014 at 4:17 PM, Jeremy Lee
>> >  wrote:
>> >> Some people (me included) might have wondered why all our m1.large spot
>> >> instances (in us-west-1) shut down a few hours ago...
>> >>
>> >> Simple reason: The EC2 spot price for Spark's default "m1.large"
>> >> instances
>> >> just jumped from 0.016 per hour, to about 0.750. Yes, Fifty times.
>> >> Probably
>> >> something to do with world cup.
>> >>
>> >> So far this is just us-west-1, but prices have a tendency to equalize
>> >> across
>> >> centers as the days pass. Time to make backups and plans.
>> >>
>> >> "m3" spot prices are still down at $0.02 (and being new, will be
>> >> bypassed by
>> >> older systems), so it would be REAAALLYY nice if there had been some
>> >> progress on that issue. Let me know if I can help with testing and
>> >> whatnot.
>> >>
>> >>
>> >> --
>> >> Jeremy Lee  BCompSci(Hons)
>> >>   The Unorthodox Engineers
>
>
>
>
> --
> Jeremy Lee  BCompSci(Hons)
>   The Unorthodox Engineers


Re: Enormous EC2 price jump makes "r3.large" patch more important

2014-06-17 Thread Jeremy Lee
I am about to spin up some new clusters, so I may give that a go... any
special instructions for making them work? I assume I use the
" --spark-git-repo=" option on the spark-ec2 command. Is it as easy as
concatenating your string as the value?

On cluster management GUIs... I've been looking around at Ambari,
Datastax, Cloudera, OpsCenter etc. Not totally convinced by any of them
yet. Anyone using a good one I should know about? I'm really beginning to
lean in the direction of Cassandra as the distributed data store...


On Wed, Jun 18, 2014 at 1:46 PM, Patrick Wendell  wrote:

> By the way, in case it's not clear, I mean our maintenance branches:
>
> https://github.com/apache/spark/tree/branch-1.0
>
> On Tue, Jun 17, 2014 at 8:35 PM, Patrick Wendell 
> wrote:
> > Hey Jeremy,
> >
> > This is patched in the 1.0 and 0.9 branches of Spark. We're likely to
> > make a 1.0.1 release soon (this patch being one of the main reasons),
> > but if you are itching for this sooner, you can just checkout the head
> > of branch-1.0 and you will be able to use r3.XXX instances.
> >
> > - Patrick
> >
> > On Tue, Jun 17, 2014 at 4:17 PM, Jeremy Lee
> >  wrote:
> >> Some people (me included) might have wondered why all our m1.large spot
> >> instances (in us-west-1) shut down a few hours ago...
> >>
> >> Simple reason: The EC2 spot price for Spark's default "m1.large"
> instances
> >> just jumped from 0.016 per hour, to about 0.750. Yes, Fifty times.
> Probably
> >> something to do with world cup.
> >>
> >> So far this is just us-west-1, but prices have a tendency to equalize
> across
> >> centers as the days pass. Time to make backups and plans.
> >>
> >> "m3" spot prices are still down at $0.02 (and being new, will be
> bypassed by
> >> older systems), so it would be REAAALLYY nice if there had been some
> >> progress on that issue. Let me know if I can help with testing and
> whatnot.
> >>
> >>
> >> --
> >> Jeremy Lee  BCompSci(Hons)
> >>   The Unorthodox Engineers
>



-- 
Jeremy Lee  BCompSci(Hons)
  The Unorthodox Engineers


question about setting SPARK_CLASSPATH IN spark_env.sh

2014-06-17 Thread santhoma
Hi, 

This is about spark 0.9. 
I have a 3 node spark cluster. I want to add a locally available jarfile
(present on all nodes) to the SPARK_CLASPATH variable in
/etc/spark/conf/spark-env.sh  so that all nodes can access it.

Question is,
should I edit 'spark-env.sh' on all nodes to add the jar  ?
Or, is it enough to add it only in the master node from where I am
submitting jobs?

thanks
Santhosh



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/question-about-setting-SPARK-CLASSPATH-IN-spark-env-sh-tp7809.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Wildcard support in input path

2014-06-17 Thread MEETHU MATHEW
Hi Jianshi,

I have used wildcard characters (*) in my program and it worked.
My code was like this:
b = sc.textFile("hdfs:///path to file/data_file_2013SEP01*")

 
Thanks & Regards, 
Meethu M


On Wednesday, 18 June 2014 9:29 AM, Jianshi Huang  
wrote:
 


It would be convenient if Spark's textFile, parquetFile, etc. can support path 
with wildcard, such as:

  hdfs://domain/user/jianshuang/data/parquet/table/month=2014*

Or is there already a way to do it now?

Jianshi

-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/

Wildcard support in input path

2014-06-17 Thread Jianshi Huang
It would be convenient if Spark's textFile, parquetFile, etc. could support
paths with wildcards, such as:

  hdfs://domain/user/jianshuang/data/parquet/table/month=2014*

Or is there already a way to do it now?

Jianshi

-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/


Re: Enormous EC2 price jump makes "r3.large" patch more important

2014-06-17 Thread Patrick Wendell
By the way, in case it's not clear, I mean our maintenance branches:

https://github.com/apache/spark/tree/branch-1.0

On Tue, Jun 17, 2014 at 8:35 PM, Patrick Wendell  wrote:
> Hey Jeremy,
>
> This is patched in the 1.0 and 0.9 branches of Spark. We're likely to
> make a 1.0.1 release soon (this patch being one of the main reasons),
> but if you are itching for this sooner, you can just checkout the head
> of branch-1.0 and you will be able to use r3.XXX instances.
>
> - Patrick
>
> On Tue, Jun 17, 2014 at 4:17 PM, Jeremy Lee
>  wrote:
>> Some people (me included) might have wondered why all our m1.large spot
>> instances (in us-west-1) shut down a few hours ago...
>>
>> Simple reason: The EC2 spot price for Spark's default "m1.large" instances
>> just jumped from 0.016 per hour, to about 0.750. Yes, Fifty times. Probably
>> something to do with world cup.
>>
>> So far this is just us-west-1, but prices have a tendency to equalize across
>> centers as the days pass. Time to make backups and plans.
>>
>> "m3" spot prices are still down at $0.02 (and being new, will be bypassed by
>> older systems), so it would be REAAALLYY nice if there had been some
>> progress on that issue. Let me know if I can help with testing and whatnot.
>>
>>
>> --
>> Jeremy Lee  BCompSci(Hons)
>>   The Unorthodox Engineers


rdd.cache() is not faster?

2014-06-17 Thread Wei Tan
Hi, I have a 40G file which is a concatenation of multiple documents. I 
want to extract two features (titles and tables) from each doc, so the 
program is like this:

-
val file = sc.textFile("/path/to/40G/file")
//file.cache()   // to enable or disable cache

val titles = file.map(line =>
  (doc_key, getTitle())   // reduce 1; here I use text utility functions written in Java
).reduceByKey(_ + _, 1)

val tables = file.flatMap(line => {
  for (table <- all_tables)
    yield (doc_key, getTableTitle())   // reduce 2; here I use text utility functions written in Java
}).reduceByKey(_ + _, 1)

titles.saveAsTextFile("titles.out")   // save_1, will trigger reduce_1
tables.saveAsTextFile("tables.out")   // save_2, will trigger reduce_2
-

I expect that with file.cache(), the later reduce_2 should be faster 
since it will read from cached data. However, results repeatedly show 
that reduce_2 takes 3 min with cache and 1.4 min without cache. Why does 
reading from the cache not help in this case?

The stage GUI shows that, with cache, reduce_2 always has a wave of "outlier 
tasks", where the median latency is 2 s but the max is 1.7 min:

Metric                     Min     25th percentile  Median  75th percentile  Max
Result serialization time  0 ms    0 ms             0 ms    0 ms             1 ms
Duration                   0.6 s   2 s              2 s     2 s              1.7 min

But these tasks do not have a long GC pause (26 ms as shown):

173  1210  SUCCESS  PROCESS_LOCAL  localhost  2014/06/17 17:49:43  1.7 min  26 ms  9.4 KB



BTW: it is a single machine with 32 cores, 192 GB RAM, SSD, with these 
lines in spark-env.sh

SPARK_WORKER_MEMORY=180g
SPARK_MEM=180g
SPARK_JAVA_OPTS="-XX:+UseG1GC -XX:MaxGCPauseMillis=500 
-XX:MaxPermSize=256m"


Thanks,

Wei

-
Wei Tan, PhD
Research Staff Member
IBM T. J. Watson Research Center
http://researcher.ibm.com/person/us-wtan
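
One way to separate cache-population cost from cache-read cost, as a small
diagnostic sketch assuming the same input path (cache() is shorthand for
persist(MEMORY_ONLY)):

import org.apache.spark.storage.StorageLevel

val file = sc.textFile("/path/to/40G/file").persist(StorageLevel.MEMORY_ONLY)
// The first action populates the cache; check the storage tab for evictions
// before timing reduce_1 and reduce_2 against the warmed cache.
file.count()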

Re: Enormous EC2 price jump makes "r3.large" patch more important

2014-06-17 Thread Patrick Wendell
Hey Jeremy,

This is patched in the 1.0 and 0.9 branches of Spark. We're likely to
make a 1.0.1 release soon (this patch being one of the main reasons),
but if you are itching for this sooner, you can just checkout the head
of branch-1.0 and you will be able to use r3.XXX instances.

- Patrick

On Tue, Jun 17, 2014 at 4:17 PM, Jeremy Lee
 wrote:
> Some people (me included) might have wondered why all our m1.large spot
> instances (in us-west-1) shut down a few hours ago...
>
> Simple reason: The EC2 spot price for Spark's default "m1.large" instances
> just jumped from 0.016 per hour, to about 0.750. Yes, Fifty times. Probably
> something to do with world cup.
>
> So far this is just us-west-1, but prices have a tendency to equalize across
> centers as the days pass. Time to make backups and plans.
>
> "m3" spot prices are still down at $0.02 (and being new, will be bypassed by
> older systems), so it would be REAAALLYY nice if there had been some
> progress on that issue. Let me know if I can help with testing and whatnot.
>
>
> --
> Jeremy Lee  BCompSci(Hons)
>   The Unorthodox Engineers


Re: spark with docker: errors with akka, NAT?

2014-06-17 Thread Aaron Davidson
Yup, alright, same solution then :)


On Tue, Jun 17, 2014 at 7:39 PM, Mohit Jaggi  wrote:

> I used --privileged to start the container and then unmounted /etc/hosts.
> Then I created a new /etc/hosts file
>
>
> On Tue, Jun 17, 2014 at 4:58 PM, Aaron Davidson 
> wrote:
>
>> I remember having to do a similar thing in the spark docker scripts for
>> testing purposes. Were you able to modify the /etc/hosts directly? I
>> remember issues with that as docker apparently mounts it as part of its
>> read-only filesystem.
>>
>>
>> On Tue, Jun 17, 2014 at 4:36 PM, Mohit Jaggi 
>> wrote:
>>
>>> It was a DNS issue. AKKA apparently uses the hostname of the endpoints
>>> and hence they need to be resolvable. In my case the hostname of the docker
>>> container was a randomly generated string and was not resolvable. I added a
>>> workaround (entry in etc/hosts file of spark master) for now. If anyone can
>>> point to a more elegant solution, that would be awesome!
>>>
>>>
>>> On Tue, Jun 17, 2014 at 3:48 PM, Mohit Jaggi 
>>> wrote:
>>>
 I am using cutting edge code from git but doing my own sbt assembly.


 On Mon, Jun 16, 2014 at 10:28 PM, Andre Schumacher <
 schum...@icsi.berkeley.edu> wrote:

>
> Hi,
>
> are you using the amplab/spark-1.0.0 images from the global registry?
>
> Andre
>
> On 06/17/2014 01:36 AM, Mohit Jaggi wrote:
> > Hi Folks,
> >
> > I am having trouble getting spark driver running in docker. If I run
> a
> > pyspark example on my mac it works but the same example on a docker
> image
> > (Via boot2docker) fails with following logs. I am pointing the spark
> driver
> > (which is running the example) to a spark cluster (driver is not
> part of
> > the cluster). I guess this has something to do with docker's
> networking
> > stack (it may be getting NAT'd) but I am not sure why (if at all) the
> > spark-worker or spark-master is trying to create a new TCP
> connection to
> > the driver, instead of responding on the connection initiated by the
> driver.
> >
> > I would appreciate any help in figuring this out.
> >
> > Thanks,
> >
> > Mohit.
> >
> > logs
> >
> > Spark Executor Command: "java" "-cp"
> >
> "::/home/ayasdi/spark/conf:/home//spark/assembly/target/scala-2.10/spark-assembly-1.0.0-hadoop1.0.4.jar"
> > "-Xms2g" "-Xmx2g" "-Xms512M" "-Xmx512M"
> > "org.apache.spark.executor.CoarseGrainedExecutorBackend"
> > "akka.tcp://spark@fc31887475e3:43921/user/CoarseGrainedScheduler"
> "1"
> > "cobalt" "24" "akka.tcp://sparkWorker@:33952/user/Worker"
> > "app-20140616152201-0021"
> >
> > 
> >
> >
> > log4j:WARN No appenders could be found for logger
> > (org.apache.hadoop.conf.Configuration).
> >
> > log4j:WARN Please initialize the log4j system properly.
> >
> > log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig
> for
> > more info.
> >
> > 14/06/16 15:22:05 INFO SparkHadoopUtil: Using Spark's default log4j
> > profile: org/apache/spark/log4j-defaults.properties
> >
> > 14/06/16 15:22:05 INFO SecurityManager: Changing view acls to:
> ayasdi,root
> >
> > 14/06/16 15:22:05 INFO SecurityManager: SecurityManager:
> authentication
> > disabled; ui acls disabled; users with view permissions: Set(xxx,
> xxx)
> >
> > 14/06/16 15:22:05 INFO Slf4jLogger: Slf4jLogger started
> >
> > 14/06/16 15:22:05 INFO Remoting: Starting remoting
> >
> > 14/06/16 15:22:06 INFO Remoting: Remoting started; listening on
> addresses
> > :[akka.tcp://sparkExecutor@:33536]
> >
> > 14/06/16 15:22:06 INFO Remoting: Remoting now listens on addresses:
> > [akka.tcp://sparkExecutor@:33536]
> >
> > 14/06/16 15:22:06 INFO CoarseGrainedExecutorBackend: Connecting to
> driver:
> > akka.tcp://spark@fc31887475e3:43921/user/CoarseGrainedScheduler
> >
> > 14/06/16 15:22:06 INFO WorkerWatcher: Connecting to worker
> > akka.tcp://sparkWorker@:33952/user/Worker
> >
> > 14/06/16 15:22:06 WARN Remoting: Tried to associate with unreachable
> remote
> > address [akka.tcp://spark@fc31887475e3:43921]. Address is now gated
> for
> > 6 ms, all messages to this address will be delivered to dead
> letters.
> >
> > 14/06/16 15:22:06 ERROR CoarseGrainedExecutorBackend: Driver
> Disassociated
> > [akka.tcp://sparkExecutor@:33536] ->
> [akka.tcp://spark@fc31887475e3:43921]
> > disassociated! Shutting down.
> >
>
>

>>>
>>
>


Re: news20-binary classification with LogisticRegressionWithSGD

2014-06-17 Thread Makoto Yui

Hi Xiangrui,

(2014/06/18 8:49), Xiangrui Meng wrote:

Makoto, dense vectors are used in aggregation. If you have 32
partitions, each one sends a dense vector of size 1,354,731 to the
master, and then the driver needs 300M+. That may be the problem.


It seems that this could cause problems for a convex optimization over 
large training data; a merging tree, like allreduce, would help to 
reduce memory requirements (though time for aggregation might increase).



Which deploy mode are you using, standalone or local?


Standalone.

Setting --driver-memory 8G did not solve the aggregation problem.
Aggregation never finishes.

`ps aux | grep spark` on master is as follows:

myui  7049 79.3  1.1 8768868 592348 pts/2  Sl+  11:10   0:14 
/usr/java/jdk1.7/bin/java -cp 
::/opt/spark-1.0.0/conf:/opt/spark-1.0.0/assembly/target/scala-2.10/spark-assembly-1.0.0-hadoop0.20.2-cdh3u6.jar:/usr/lib/hadoop-0.20/conf 
-XX:MaxPermSize=128m -verbose:gc -XX:+PrintGCDetails 
-XX:+PrintGCTimeStamps -Djava.library.path= -Xms2g -Xmx2g 
org.apache.spark.deploy.SparkSubmit spark-shell --driver-memory 8G 
--class org.apache.spark.repl.Main


myui  5694  2.5  0.5 6868296 292572 pts/2  Sl   10:59   0:17 
/usr/java/jdk1.7/bin/java -cp 
::/opt/spark-1.0.0/conf:/opt/spark-1.0.0/assembly/target/scala-2.10/spark-assembly-1.0.0-hadoop0.20.2-cdh3u6.jar:/usr/lib/hadoop-0.20/conf 
-XX:MaxPermSize=128m -Dspark.akka.logLifecycleEvents=true -Xms512m 
-Xmx512m org.apache.spark.deploy.master.Master --ip 10.0.0.1 --port 7077 
--webui-port 8081



Exporting SPARK_DAEMON_MEMORY=4g in spark-env.sh did not take effect for 
the evaluation.


`ps aux | grep spark`
/usr/java/jdk1.7/bin/java -cp 
::/opt/spark-1.0.0/conf:/opt/spark-1.0.0/assembly/target/scala-2.10/spark-assembly-1.0.0-hadoop0.20.2-cdh3u6.jar:/usr/lib/hadoop-0.20/conf 
-XX:MaxPermSize=128m -Dspark.akka.logLifecycleEvents=true -Xms4g -Xmx4g 
org.apache.spark.deploy.master.Master --ip 10.0.0.1 --port 7077 
--webui-port 8081

...


Thanks,
Makoto


Re: spark with docker: errors with akka, NAT?

2014-06-17 Thread Mohit Jaggi
I used --privileged to start the container and then unmounted /etc/hosts.
Then I created a new /etc/hosts file


On Tue, Jun 17, 2014 at 4:58 PM, Aaron Davidson  wrote:

> I remember having to do a similar thing in the spark docker scripts for
> testing purposes. Were you able to modify the /etc/hosts directly? I
> remember issues with that as docker apparently mounts it as part of its
> read-only filesystem.
>
>
> On Tue, Jun 17, 2014 at 4:36 PM, Mohit Jaggi  wrote:
>
>> It was a DNS issue. AKKA apparently uses the hostname of the endpoints
>> and hence they need to be resolvable. In my case the hostname of the docker
>> container was a randomly generated string and was not resolvable. I added a
>> workaround (entry in etc/hosts file of spark master) for now. If anyone can
>> point to a more elegant solution, that would be awesome!
>>
>>
>> On Tue, Jun 17, 2014 at 3:48 PM, Mohit Jaggi 
>> wrote:
>>
>>> I am using cutting edge code from git but doing my own sbt assembly.
>>>
>>>
>>> On Mon, Jun 16, 2014 at 10:28 PM, Andre Schumacher <
>>> schum...@icsi.berkeley.edu> wrote:
>>>

 Hi,

 are you using the amplab/spark-1.0.0 images from the global registry?

 Andre

 On 06/17/2014 01:36 AM, Mohit Jaggi wrote:
 > Hi Folks,
 >
 > I am having trouble getting spark driver running in docker. If I run a
 > pyspark example on my mac it works but the same example on a docker
 image
 > (Via boot2docker) fails with following logs. I am pointing the spark
 driver
 > (which is running the example) to a spark cluster (driver is not part
 of
 > the cluster). I guess this has something to do with docker's
 networking
 > stack (it may be getting NAT'd) but I am not sure why (if at all) the
 > spark-worker or spark-master is trying to create a new TCP connection
 to
 > the driver, instead of responding on the connection initiated by the
 driver.
 >
 > I would appreciate any help in figuring this out.
 >
 > Thanks,
 >
 > Mohit.
 >
 > logs
 >
 > Spark Executor Command: "java" "-cp"
 >
 "::/home/ayasdi/spark/conf:/home//spark/assembly/target/scala-2.10/spark-assembly-1.0.0-hadoop1.0.4.jar"
 > "-Xms2g" "-Xmx2g" "-Xms512M" "-Xmx512M"
 > "org.apache.spark.executor.CoarseGrainedExecutorBackend"
 > "akka.tcp://spark@fc31887475e3:43921/user/CoarseGrainedScheduler" "1"
 > "cobalt" "24" "akka.tcp://sparkWorker@:33952/user/Worker"
 > "app-20140616152201-0021"
 >
 > 
 >
 >
 > log4j:WARN No appenders could be found for logger
 > (org.apache.hadoop.conf.Configuration).
 >
 > log4j:WARN Please initialize the log4j system properly.
 >
 > log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig
 for
 > more info.
 >
 > 14/06/16 15:22:05 INFO SparkHadoopUtil: Using Spark's default log4j
 > profile: org/apache/spark/log4j-defaults.properties
 >
 > 14/06/16 15:22:05 INFO SecurityManager: Changing view acls to:
 ayasdi,root
 >
 > 14/06/16 15:22:05 INFO SecurityManager: SecurityManager:
 authentication
 > disabled; ui acls disabled; users with view permissions: Set(xxx, xxx)
 >
 > 14/06/16 15:22:05 INFO Slf4jLogger: Slf4jLogger started
 >
 > 14/06/16 15:22:05 INFO Remoting: Starting remoting
 >
 > 14/06/16 15:22:06 INFO Remoting: Remoting started; listening on
 addresses
 > :[akka.tcp://sparkExecutor@:33536]
 >
 > 14/06/16 15:22:06 INFO Remoting: Remoting now listens on addresses:
 > [akka.tcp://sparkExecutor@:33536]
 >
 > 14/06/16 15:22:06 INFO CoarseGrainedExecutorBackend: Connecting to
 driver:
 > akka.tcp://spark@fc31887475e3:43921/user/CoarseGrainedScheduler
 >
 > 14/06/16 15:22:06 INFO WorkerWatcher: Connecting to worker
 > akka.tcp://sparkWorker@:33952/user/Worker
 >
 > 14/06/16 15:22:06 WARN Remoting: Tried to associate with unreachable
 remote
 > address [akka.tcp://spark@fc31887475e3:43921]. Address is now gated
 for
 > 6 ms, all messages to this address will be delivered to dead
 letters.
 >
 > 14/06/16 15:22:06 ERROR CoarseGrainedExecutorBackend: Driver
 Disassociated
 > [akka.tcp://sparkExecutor@:33536] ->
 [akka.tcp://spark@fc31887475e3:43921]
 > disassociated! Shutting down.
 >


>>>
>>
>


Re: Execution stalls in LogisticRegressionWithSGD

2014-06-17 Thread Bharath Ravi Kumar
A couple more points:
1) The inexplicable stalling of execution with large feature sets appears
similar to that reported with the news-20 dataset:
http://mail-archives.apache.org/mod_mbox/spark-user/201406.mbox/%3c53a03542.1010...@gmail.com%3E

2) The NPE when trying to call mapToPair to convert an RDD into a
JavaPairRDD is unrelated to mllib.

Thanks,
Bharath
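
For concreteness, here is a minimal sketch of the sparse encoding described
below; the index arithmetic is my assumption, since the original encoding
code isn't shown.

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

// nX and nY are the number of distinct values of the x and y categories.
// Each record sets exactly three features to 1.0: xk, yk, and (xk, yk),
// inside a vector of total length nX + nY + nX*nY (~614,000 below).
def encode(xIdx: Int, yIdx: Int, label: Double, nX: Int, nY: Int): LabeledPoint = {
  val size = nX + nY + nX * nY
  val indices = Array(
    xIdx,                        // x block
    nX + yIdx,                   // y block
    nX + nY + xIdx * nY + yIdx)  // (x, y) cross block
  LabeledPoint(label, Vectors.sparse(size, indices, Array(1.0, 1.0, 1.0)))
}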



On Wed, Jun 18, 2014 at 7:14 AM, Bharath Ravi Kumar 
wrote:

> Hi  Xiangrui ,
>
> I'm using 1.0.0.
>
> Thanks,
> Bharath
> On 18-Jun-2014 1:43 am, "Xiangrui Meng"  wrote:
>
>> Hi Bharath,
>>
>> Thanks for posting the details! Which Spark version are you using?
>>
>> Best,
>> Xiangrui
>>
>> On Tue, Jun 17, 2014 at 6:48 AM, Bharath Ravi Kumar 
>> wrote:
>> > Hi,
>> >
>> > (Apologies for the long mail, but it's necessary to provide sufficient
>> > details considering the number of issues faced.)
>> >
>> > I'm running into issues testing LogisticRegressionWithSGD on a two-node
>> > cluster (each node with 24 cores and 16G available to slaves out of 24G
>> > on the system). Here's a description of the application:
>> >
>> > The model is being trained based on categorical features x, y, and
>> (x,y).
>> > The categorical features are mapped to binary features by converting
>> each
>> > distinct value in the category enum into a binary feature by itself (i.e
>> > presence of that value in a record implies corresponding feature = 1,
>> else
>> > feature = 0. So, there'd be as many distinct features as enum values) .
>> The
>> > training vector is laid out as
>> > [x1,x2...xn,y1,y2yn,(x1,y1),(x2,y2)...(xn,yn)]. Each record in the
>> > training data has only one combination (Xk,Yk) and a label appearing in
>> the
>> > record. Thus, the corresponding labeledpoint sparse vector would only
>> have 3
>> > values Xk, Yk, (Xk,Yk) set for a record. The total length of the vector
>> > (though parse) would be nearly 614000.  The number of records is about
>> 1.33
>> > million. The records have been coalesced into 20 partitions across two
>> > nodes. The input data has not been cached.
>> > (NOTE: I do realize the records & features may seem large for a two node
>> > setup, but given the memory & cpu, and the fact that I'm willing to
>> give up
>> > some turnaround time, I don't see why tasks should inexplicably fail)
>> >
>> > Additional parameters include:
>> >
>> > spark.executor.memory = 14G
>> > spark.default.parallelism = 1
>> > spark.cores.max=20
>> > spark.storage.memoryFraction=0.8 //No cache space required
>> > (Trying to set spark.akka.frameSize to a larger number, say, 20 didn't
>> help
>> > either)
>> >
>> > The model training was initialized as : new LogisticRegressionWithSGD(1,
>> > maxIterations, 0.0, 0.05)
>> >
>> > However, after 4 iterations of gradient descent, the entire execution
>> > appeared to stall inexplicably. The corresponding executor details and
>> > details of the stalled stage (number 14) are as follows:
>> >
>> > Metric                            Min    25th   Median  75th   Max
>> > Result serialization time         12 ms  13 ms  14 ms   16 ms  18 ms
>> > Duration                          4 s    4 s    5 s     5 s    5 s
>> > Time spent fetching task results  0 ms   0 ms   0 ms    0 ms   0 ms
>> > Scheduler delay                   6 s    6 s    6 s     6 s    12 s
>> >
>> >
>> > Stage Id
>> > 14 aggregate at GradientDescent.scala:178
>> >
>> > Task IndexTask IDStatusLocality Level Executor
>> > Launch TimeDurationGC Result Ser TimeErrors
>> >
>> > Time
>> >
>> > 0 600 RUNNING PROCESS_LOCAL serious.dataone.foo.bar.com
>> > 2014/06/17 10:32:27 1.1 h
>> > 1 601 RUNNING PROCESS_LOCAL casual.dataone.foo.bar.com
>> > 2014/06/17 10:32:27 1.1 h
>> > 2 602 RUNNING PROCESS_LOCAL serious.dataone.foo.bar.com
>> > 2014/06/17 10:32:27 1.1 h
>> > 3 603 RUNNING PROCESS_LOCAL casual.dataone.foo.bar.com
>> > 2014/06/17 10:32:27 1.1 h
>> > 4 604 RUNNING PROCESS_LOCAL serious.dataone.foo.bar.com
>> > 2014/06/17 10:32:27 1.1 h
>> > 5 605 SUCCESS PROCESS_LOCAL casual.dataone.foo.bar.com
>> > 2014/06/17 10:32:27 4 s 2 s 12 ms
>> > 6 606 SUCCESS PROCESS_LOCAL serious.dataone.foo.bar.com
>> > 2014/06/17 10:32:27 4 s 1 s 14 ms
>> > 7 607 SUCCESS PROCESS_LOCAL casual.dataone.foo.bar.com
>> > 2014/06/17 10:32:27 4 s 2 s 12 ms
>> > 8 608 SUCCESS PROCESS_LOCAL serious.dataone.foo.bar.com
>> > 2014/06/17 10:32:27 5 s 1 s 15 ms
>> > 9 609 SUCCESS PROCESS_LOCAL casual.dataone.foo.bar.com
>> > 2014/06/17 10:32:27 5 s 1 s 14 ms
>> > 10 610 SUCCESS PROCESS_LOCAL
>> serious.dataone.foo.bar.com
>> > 2014/06/17 10:32:27 5 s 1 s 15 ms
>> > 11 611 SUCCESS PROCESS_LOCAL casual.dataone.foo.bar.com
>> > 2014/06/17 10:32:27 4 

Re: Yarn-client mode and standalone-client mode hang during job start

2014-06-17 Thread Jianshi Huang
Hi Andrew,

I submitted that from within the cluster. It looks like standalone-cluster
mode didn't put the jars onto its HTTP server, and passed the file:/... URL
to the driver node. That's why the driver node couldn't find the jars.

However, even after I copied my files to all slaves it still didn't work;
see my second email.

I have no idea why yarn-client didn't work. I suspect the following code is
problematic; is that possible?

I have multiple files that need the SparkContext, so I put it in an object
(instead of the main function), and SContext is imported in multiple places.
object SContext {

  var conf = new
SparkConf().setAppName(Conf.getString("spark.conf.app_name")).setMaster(Conf.getString("spark.conf.master"))
  var sc = new SparkContext(conf)

}

spark.conf.master is "yarn-cluster" in my application.conf, but I think
spark-submit will override the master mode, right?



Jianshi
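
One way to rule out the eager initialization (a sketch of an assumption, not
a confirmed fix): make the context construction lazy and drop setMaster, so
nothing runs at class-load time and spark-submit supplies the master. Conf
here is the config wrapper from the snippet above.

import org.apache.spark.{SparkConf, SparkContext}

object SContext {
  // No setMaster: the --master passed to spark-submit takes effect instead.
  lazy val conf = new SparkConf().setAppName(Conf.getString("spark.conf.app_name"))
  // Built on first use rather than at import time.
  lazy val sc = new SparkContext(conf)
}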




On Wed, Jun 18, 2014 at 12:37 AM, Andrew Or  wrote:

> Standalone-client mode is not officially supported at the moment. For
> standalone-cluster and yarn-client modes, however, they should work.
>
> For both modes, are you running spark-submit from within the cluster, or
> outside of it? If the latter, could you try running it from within the
> cluster and see if it works? (Does your rtgraph.jar exist on the machine
> from which you run spark-submit?)
>
>
> 2014-06-17 2:41 GMT-07:00 Jianshi Huang :
>
> Hi,
>>
>> I'm stuck using either yarn-client or standalone-client mode. Either
>> will hang when I submit jobs; the last messages it printed were:
>>
>> ...
>> 14/06/17 02:37:17 INFO spark.SparkContext: Added JAR
>> file:/x/home/jianshuang/tmp/lib/commons-vfs2.jar at
>> http://10.196.195.25:56377/jars/commons-vfs2.jar with timestamp
>> 1402997837065
>> 14/06/17 02:37:17 INFO spark.SparkContext: Added JAR
>> file:/x/home/jianshuang/tmp/rtgraph.jar at
>> http://10.196.195.25:56377/jars/rtgraph.jar with timestamp 1402997837065
>> 14/06/17 02:37:17 INFO cluster.YarnClusterScheduler: Created
>> YarnClusterScheduler
>> 14/06/17 02:37:17 INFO yarn.ApplicationMaster$$anon$1: Adding shutdown
>> hook for context org.apache.spark.SparkContext@6655cf60
>>
>> I can use yarn-cluster to run my app but it's not very convenient to
>> monitor the progress.
>>
>> Standalone-cluster mode doesn't work, it reports file not found error:
>>
>> Driver successfully submitted as driver-20140617023956-0003
>> ... waiting before polling master for driver state
>> ... polling master for driver state
>> State of driver-20140617023956-0003 is ERROR
>> Exception from cluster was: java.io.FileNotFoundException: File
>> file:/x/home/jianshuang/tmp/rtgraph.jar does not exist
>>
>>
>> I'm using Spark 1.0.0 and my submit command looks like this:
>>
>>   ~/spark/spark-1.0.0-hadoop2.4.0/bin/spark-submit --name 'rtgraph'
>> --class com.paypal.rtgraph.demo.MapReduceWriter --master spark://
>> lvshdc5en0015.lvs.paypal.com:7077 --jars `find lib -type f | tr '\n'
>> ','` --executor-memory 20G --total-executor-cores 96 --deploy-mode cluster
>> rtgraph.jar
>>
>> List of jars I put in --jars option are:
>>
>> accumulo-core.jar
>> accumulo-fate.jar
>> accumulo-minicluster.jar
>> accumulo-trace.jar
>> accumulo-tracer.jar
>> chill_2.10-0.3.6.jar
>> commons-math.jar
>> commons-vfs2.jar
>> config-1.2.1.jar
>> gson.jar
>> guava.jar
>> joda-convert-1.2.jar
>> joda-time-2.3.jar
>> kryo-2.21.jar
>> libthrift.jar
>> quasiquotes_2.10-2.0.0-M8.jar
>> scala-async_2.10-0.9.1.jar
>> scala-library-2.10.4.jar
>> scala-reflect-2.10.4.jar
>>
>>
>> Anyone has hint what went wrong? Really confused.
>>
>>
>> Cheers,
>> --
>> Jianshi Huang
>>
>> LinkedIn: jianshi
>> Twitter: @jshuang
>> Github & Blog: http://huangjs.github.com/
>>
>
>


-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/


Re: NullPointerExceptions when using val or broadcast on a standalone cluster.

2014-06-17 Thread bdamos
Hi, I think this is a bug in Spark, because changing my program to use
a main method instead of the App trait fixes the problem.
I've filed this as SPARK-2175, apologies if this turns out to be a
duplicate.

https://issues.apache.org/jira/browse/SPARK-2175
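
For anyone landing here, the fix described above amounts to the following shape (a
minimal sketch; Scala's App trait initializes its body via delayedInit, which seems
to interact badly with Spark's closure serialization):

    import org.apache.spark.{SparkConf, SparkContext}

    // Instead of: object MyJob extends App { val factor = 2; ... }
    object MyJob {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("my-job"))
        val factor = 2                      // a plain local val, fully initialized
        val doubled = sc.parallelize(1 to 10).map(_ * factor).collect()
        println(doubled.mkString(","))
        sc.stop()
      }
    }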

Regards,
Brandon.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/NullPointerExceptions-when-using-val-or-broadcast-on-a-standalone-cluster-tp7524p7797.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Spark Streaming Example with CDH5

2014-06-17 Thread manas Kar
Hi Spark Gurus,
 I am trying to compile a Spark Streaming example with CDH5 and am having
problems compiling it.
Has anyone created an example Spark Streaming app using CDH5 (preferably Spark
0.9.1) who would be kind enough to share the build.sbt(.scala) file, or
point to their example on GitHub? I know there is a streaming example here,
but I am looking for something that runs with CDH5.


My build.scala file looks like the following.

object Dependency {
  // Versions
  object V {
    val Akka     = "2.3.0"
    val scala    = "2.10.4"
    val cloudera = "0.9.0-cdh5.0.0"
  }

  val sparkCore      = "org.apache.spark" %% "spark-core"      % V.cloudera
  val sparkStreaming = "org.apache.spark" %% "spark-streaming" % V.cloudera

  resolvers ++= Seq(
    "cloudera repo" at "https://repository.cloudera.com/artifactory/cloudera-repos/",
    "hadoop repo"   at "https://repository.cloudera.com/content/repositories/releases/")

I have also attached the complete build.scala file for the sake of completeness.
sbt dist gives the following error:
 object SecurityManager is not a member of package org.apache.spark
[error] import org.apache.spark.{SparkConf, SecurityManager}


build.scala (attached)


Appreciate the great work the spark community is doing. It is by far the
best thing I have worked on.

..Manas



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Example-with-CDH5-tp7796.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Spark SQL: No function to evaluate expression

2014-06-17 Thread Tobias Pfeiffer
The error message *means* that there is no column called c_address.
However, maybe it's a bug with Spark SQL not understanding the
a.c_address syntax. Can you double-check the column name is correct?
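
One quick way to double-check the column names Spark SQL actually sees (a sketch;
it assumes the history table is registered through a HiveContext, and that your
build has SchemaRDD.printSchema, otherwise hql("DESCRIBE history") serves the same
purpose):

    import org.apache.spark.sql.hive.HiveContext

    val hiveContext = new HiveContext(sc)
    // prints the attribute names/types the analyzer resolves against,
    // so a misspelled c_address would show up immediately
    hiveContext.hql("SELECT * FROM history LIMIT 1").printSchema()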

Thanks
Tobias

On Wed, Jun 18, 2014 at 5:02 AM, Zuhair Khayyat
 wrote:
> Dear all,
>
> I am trying to run the following query on Spark SQL using some custom TPC-H
> tables with standalone Spark cluster configuration:
>
> SELECT * FROM history a JOIN history b ON a.o_custkey = b.o_custkey WHERE
> a.c_address <> b.c_address;
>
> Unfortunately I get the following error during execution:
>
> java.lang.reflect.InvocationTargetException
>
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>
> at java.lang.reflect.Method.invoke(Method.java:606)
>
> at
> org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:40)
>
> at
> org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
>
> Caused by: org.apache.spark.SparkException: Job aborted due to stage
> failure: Task 0.0:2 failed 4 times, most recent failure: Exception failure
> in TID 12 on host kw2260.kaust.edu.sa:
> org.apache.spark.sql.catalyst.errors.package$TreeNodeException: No function
> to evaluate expression. type: UnresolvedAttribute, tree: 'a.c_address
>
>
> org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute.eval(unresolved.scala:59)
>
>
> org.apache.spark.sql.catalyst.expressions.Equals.eval(predicates.scala:147)
>
>
> org.apache.spark.sql.catalyst.expressions.Not.eval(predicates.scala:74)
>
>
> org.apache.spark.sql.catalyst.expressions.And.eval(predicates.scala:100)
>
>
> Is this a bug or am I doing something wrong?
>
>
> Regards,
>
> Zuhair Khayyat


Re: Execution stalls in LogisticRegressionWithSGD

2014-06-17 Thread Bharath Ravi Kumar
Hi Xiangrui,

I'm using 1.0.0.

Thanks,
Bharath
On 18-Jun-2014 1:43 am, "Xiangrui Meng"  wrote:

> Hi Bharath,
>
> Thanks for posting the details! Which Spark version are you using?
>
> Best,
> Xiangrui
>
> On Tue, Jun 17, 2014 at 6:48 AM, Bharath Ravi Kumar 
> wrote:
> > Hi,
> >
> > (Apologies for the long mail, but it's necessary to provide sufficient
> > details considering the number of issues faced.)
> >
> > I'm running into issues testing LogisticRegressionWithSGD on a two node
> cluster
> > (each node with 24 cores and 16G available to slaves out of 24G on the
> > system). Here's a description of the application:
> >
> > The model is being trained based on categorical features x, y, and (x,y).
> > The categorical features are mapped to binary features by converting each
> > distinct value in the category enum into a binary feature by itself (i.e.
> > presence of that value in a record implies corresponding feature = 1, else
> > feature = 0. So, there'd be as many distinct features as enum values). The
> > training vector is laid out as
> > [x1,x2...xn,y1,y2...yn,(x1,y1),(x2,y2)...(xn,yn)]. Each record in the
> > training data has only one combination (Xk,Yk) and a label appearing in the
> > record. Thus, the corresponding labeledpoint sparse vector would only have 3
> > values Xk, Yk, (Xk,Yk) set for a record. The total length of the vector
> > (though sparse) would be nearly 614000. The number of records is about 1.33
> > million. The records have been coalesced into 20 partitions across two
> > nodes. The input data has not been cached.
> > (NOTE: I do realize the records & features may seem large for a two node
> > setup, but given the memory & cpu, and the fact that I'm willing to give up
> > some turnaround time, I don't see why tasks should inexplicably fail)
> >
> > Additional parameters include:
> >
> > spark.executor.memory = 14G
> > spark.default.parallelism = 1
> > spark.cores.max=20
> > spark.storage.memoryFraction=0.8 //No cache space required
> > (Trying to set spark.akka.frameSize to a larger number, say, 20 didn't help
> > either)
> >
> > The model training was initialized as : new LogisticRegressionWithSGD(1,
> > maxIterations, 0.0, 0.05)
> >
> > However, after 4 iterations of gradient descent, the entire execution
> > appeared to stall inexplicably. The corresponding executor details and
> > details of the stalled stage (number 14) are as follows:
> >
> > Metric                            Min    25th   Median  75th   Max
> > Result serialization time         12 ms  13 ms  14 ms   16 ms  18 ms
> > Duration                          4 s    4 s    5 s     5 s    5 s
> > Time spent fetching task results  0 ms   0 ms   0 ms    0 ms   0 ms
> > Scheduler delay                   6 s    6 s    6 s     6 s    12 s
> >
> >
> > Stage Id
> > 14 aggregate at GradientDescent.scala:178
> >
> > Task Index  Task ID  Status  Locality Level  Executor
> > Launch Time  Duration  GC Time  Result Ser Time  Errors
> >
> > 0 600 RUNNING PROCESS_LOCAL serious.dataone.foo.bar.com
> > 2014/06/17 10:32:27 1.1 h
> > 1 601 RUNNING PROCESS_LOCAL casual.dataone.foo.bar.com
> > 2014/06/17 10:32:27 1.1 h
> > 2 602 RUNNING PROCESS_LOCAL serious.dataone.foo.bar.com
> > 2014/06/17 10:32:27 1.1 h
> > 3 603 RUNNING PROCESS_LOCAL casual.dataone.foo.bar.com
> > 2014/06/17 10:32:27 1.1 h
> > 4 604 RUNNING PROCESS_LOCAL serious.dataone.foo.bar.com
> > 2014/06/17 10:32:27 1.1 h
> > 5 605 SUCCESS PROCESS_LOCAL casual.dataone.foo.bar.com
> > 2014/06/17 10:32:27 4 s 2 s 12 ms
> > 6 606 SUCCESS PROCESS_LOCAL serious.dataone.foo.bar.com
> > 2014/06/17 10:32:27 4 s 1 s 14 ms
> > 7 607 SUCCESS PROCESS_LOCAL casual.dataone.foo.bar.com
> > 2014/06/17 10:32:27 4 s 2 s 12 ms
> > 8 608 SUCCESS PROCESS_LOCAL serious.dataone.foo.bar.com
> > 2014/06/17 10:32:27 5 s 1 s 15 ms
> > 9 609 SUCCESS PROCESS_LOCAL casual.dataone.foo.bar.com
> > 2014/06/17 10:32:27 5 s 1 s 14 ms
> > 10 610 SUCCESS PROCESS_LOCAL serious.dataone.foo.bar.com
> > 2014/06/17 10:32:27 5 s 1 s 15 ms
> > 11 611 SUCCESS PROCESS_LOCAL casual.dataone.foo.bar.com
> > 2014/06/17 10:32:27 4 s 1 s 13 ms
> > 12 612 SUCCESS PROCESS_LOCAL serious.dataone.foo.bar.com
> > 2014/06/17 10:32:27 5 s 1 s 18 ms
> > 13 613 SUCCESS PROCESS_LOCAL casual.dataone.foo.bar.com
> > 2014/06/17 10:32:27 5 s 1 s 13 ms
> > 14 614 SUCCESS PROCESS_LOCAL serious.dataone.foo.bar.com
> > 2014/06/17 10:32:27 4 s 1 s 14 ms
> > 15 615 SUCCESS PROCESS_LOCAL casual.dataone.foo.bar.com
> > 2014/06/17 10:32:27 4 s 1 s 12 ms
> > 16 616 SUCCESS

Re: Executors not utilized properly.

2014-06-17 Thread abhiguruvayya
Perfect!! That makes so much sense to me now. Thanks a ton



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Executors-not-utilized-properly-tp7744p7793.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Issue while trying to aggregate with a sliding window

2014-06-17 Thread onpoq l
There is a bug:

https://github.com/apache/spark/pull/961#issuecomment-45125185
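
Separate from that bug, note that Spark Streaming requires both the window duration
and the slide duration to be multiples of the batch interval, which appears to be
what the "Time ... is invalid" check is enforcing. A consistent Scala setup would
look like this sketch (the durations are illustrative; pairsDStream and aggFunc are
from the original message):

    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val ssc = new StreamingContext(conf, Seconds(1))   // batch interval: 1 s
    // 60 s window sliding every 6 s -- both exact multiples of the batch interval
    val aggregated = pairsDStream.reduceByKeyAndWindow(aggFunc, Seconds(60), Seconds(6))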


On Tue, Jun 17, 2014 at 8:19 PM, Hatch M  wrote:
> Trying to aggregate over a sliding window. Playing around with the slide
> interval, I can see the aggregation sometimes works but mostly fails with the
> error below. The stream has records coming in at 100ms.
>
> JavaPairDStream aggregatedDStream =
> pairsDStream.reduceByKeyAndWindow(aggFunc, new Duration(6), new
> Duration(60));
>
> 14/06/18 00:14:46 INFO dstream.ShuffledDStream: Time 1403050486900 ms is
> invalid as zeroTime is 1403050485800 ms and slideDuration is 6 ms and
> difference is 1100 ms
> 14/06/18 00:14:46 ERROR actor.OneForOneStrategy: key not found:
> 1403050486900 ms
> java.util.NoSuchElementException: key not found: 1403050486900 ms
> at scala.collection.MapLike$class.default(MapLike.scala:228)
>
> Any hints on whats going on here?
> Thanks!
> Hatch
>


Issue while trying to aggregate with a sliding window

2014-06-17 Thread Hatch M
Trying to aggregate over a sliding window. Playing around with the slide
interval, I can see the aggregation sometimes works but mostly fails with the
error below. The stream has records coming in at 100ms.

JavaPairDStream aggregatedDStream =
pairsDStream.reduceByKeyAndWindow(aggFunc, new Duration(6), new
Duration(60));

14/06/18 00:14:46 INFO dstream.ShuffledDStream: Time 1403050486900 ms is
invalid as zeroTime is 1403050485800 ms and slideDuration is 6 ms and
difference is 1100 ms
14/06/18 00:14:46 ERROR actor.OneForOneStrategy: key not found:
1403050486900 ms
java.util.NoSuchElementException: key not found: 1403050486900 ms
at scala.collection.MapLike$class.default(MapLike.scala:228)

Any hints on whats going on here?
Thanks!
Hatch


Re: Executors not utilized properly.

2014-06-17 Thread Aaron Davidson
repartition() is actually just an alias of coalesce(), but with the shuffle
flag set to true. This shuffle is probably what you're seeing as taking
longer, but it is required when you go from a smaller number of partitions to
a larger one.

When actually decreasing the number of partitions, coalesce(shuffle =
false) will be fully pipelined, but is limited in how it can redistribute
data, as it can only combine whole partitions into larger partitions. For
example, if you have an rdd with 101 partitions, and you do
rdd.coalesce(100, shuffle = false), then the resultant rdd will have 99 of
the original partitions, and 1 partition will just be 2 original partitions
combined. This can lead to increased data skew, but requires no effort to
create.

On the other hand, if you do rdd.coalesce(100, shuffle = true), then all of
the data will actually be reshuffled into 100 new evenly-sized partitions,
eliminating any data skew at the cost of actually moving all data around.
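
A small sketch of the two behaviors described above:

    val rdd = sc.parallelize(1 to 1000, 101)

    // shuffle = false (the default): whole partitions are merged, so one of the
    // 100 output partitions is twice the size of the others -- cheap, but can skew
    val merged = rdd.coalesce(100)

    // shuffle = true: every record is redistributed into 100 evenly sized
    // partitions -- no skew, but all data moves; repartition(100) is equivalent
    val rebalanced = rdd.coalesce(100, shuffle = true)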


On Tue, Jun 17, 2014 at 4:52 PM, abhiguruvayya 
wrote:

> I found the main reason to be that I was using coalesce instead of
> repartition. coalesce was shrinking the partitioning, so there were too few
> tasks to be executed by all of the executors. Can you help me in
> understanding when to use coalesce and when to use repartition? In my
> application coalesce is processed faster than repartition, which is
> unusual.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Executors-not-utilized-properly-tp7744p7787.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Re: news20-binary classification with LogisticRegressionWithSGD

2014-06-17 Thread Xiangrui Meng
Makoto, please use --driver-memory 8G when you launch spark-shell. -Xiangrui

On Tue, Jun 17, 2014 at 4:49 PM, Xiangrui Meng  wrote:
> DB, Yes, reduce and aggregate are linear.
>
> Makoto, dense vectors are used in aggregation. If you have 32
> partitions and each one sends a dense vector of size 1,354,731 to the
> master, then the driver needs 300M+. That may be the problem. Which
> deploy mode are you using, standalone or local?
>
> Debasish, there is an old PR for butterfly allreduce. However, it
> doesn't seem to be the right way to go for Spark. I just sent out the
> PR: https://github.com/apache/spark/pull/1110 . This is a WIP and it
> needs more testing before we are confident to merge it. It would be
> great if you can help test it.
>
> Best,
> Xiangrui
>
> On Tue, Jun 17, 2014 at 2:33 PM, Debasish Das  
> wrote:
>> Xiangrui,
>>
>> Could you point to the JIRA related to tree aggregate ? ...sounds like the
>> allreduce idea...
>>
>> I would definitely like to try it on our dataset...
>>
>> Makoto,
>>
>> I did run a pretty big sparse dataset (20M rows, 3M sparse features) and I got
>> 100 iterations of SGD running in 200 seconds...10 executors each with 16 GB
>> memory...
>>
>> Although the best result on the same dataset came out of liblinear and
>> BFGS-L1 out of the box...so I did not tune the SGD further on learning rate and
>> other heuristics...it was around 5% off...
>>
>> Thanks.
>> Deb
>>
>>
>>
>> On Tue, Jun 17, 2014 at 2:09 PM, DB Tsai  wrote:
>>>
>>> Hi Xiangrui,
>>>
>>> Does it mean that mapPartition and then reduce shares the same
>>> behavior as aggregate operation which is O(n)?
>>>
>>> Sincerely,
>>>
>>> DB Tsai
>>> ---
>>> My Blog: https://www.dbtsai.com
>>> LinkedIn: https://www.linkedin.com/in/dbtsai
>>>
>>>
>>> On Tue, Jun 17, 2014 at 2:00 PM, Xiangrui Meng  wrote:
>>> > Hi DB,
>>> >
>>> > treeReduce (treeAggregate) is a feature I'm testing now. It is a
>>> > compromise between current reduce and butterfly allReduce. The former
>>> > runs in linear time on the number of partitions, the latter introduces
>>> > too many dependencies. treeAggregate with depth = 2 should run in
>>> > O(sqrt(n)) time, where n is the number of partitions. It would be
>>> > great if someone can help test its scalability.
>>> >
>>> > Best,
>>> > Xiangrui
>>> >
>>> > On Tue, Jun 17, 2014 at 1:32 PM, Makoto Yui  wrote:
>>> >> Hi Xiangrui,
>>> >>
>>> >>
>>> >> (2014/06/18 4:58), Xiangrui Meng wrote:
>>> >>>
>>> >>> How many partitions did you set? If there are too many partitions,
>>> >>> please do a coalesce before calling ML algorithms.
>>> >>
>>> >>
>>> >> The training data "news20.random.1000" is small and thus only 2
>>> >> partitions
>> >> are used by default.
>>> >>
>>> >> val training = MLUtils.loadLibSVMFile(sc,
>>> >> "hdfs://host:8020/dataset/news20-binary/news20.random.1000",
>>> >> multiclass=false).
>>> >>
>>> >> We also tried 32 partitions as follows but the aggregate never
>>> >> finishes.
>>> >>
>>> >> val training = MLUtils.loadLibSVMFile(sc,
>>> >> "hdfs://host:8020/dataset/news20-binary/news20.random.1000",
>>> >> multiclass=false, numFeatures = 1354731 , minPartitions = 32)
>>> >>
>>> >>
>>> >>> Btw, could you try the tree branch in my repo?
>>> >>> https://github.com/mengxr/spark/tree/tree
>>> >>>
>>> >>> I used tree aggregate in this branch. It should help with the
>>> >>> scalability.
>>> >>
>>> >>
>>> >> Is treeAggregate itself available on Spark 1.0?
>>> >>
>>> >> I wonder.. Could I test your modification just by running the following
>>> >> code
>>> >> on REPL?
>>> >>
>>> >> ---
>>> >> val (gradientSum, lossSum) = data.sample(false, miniBatchFraction, 42 +
>>> >> i)
>>> >> .treeAggregate((BDV.zeros[Double](weights.size), 0.0))(
>>> >>   seqOp = (c, v) => (c, v) match { case ((grad, loss), (label,
>>> >> features)) =>
>>> >> val l = gradient.compute(features, label, weights,
>>> >> Vectors.fromBreeze(grad))
>>> >> (grad, loss + l)
>>> >>   },
>>> >>   combOp = (c1, c2) => (c1, c2) match { case ((grad1, loss1),
>>> >> (grad2, loss2)) =>
>>> >> (grad1 += grad2, loss1 + loss2)
>>> >>   }, 2)
>>> >> -
>>> >>
>> >> Rebuilding Spark is quite a lot of work just to run an evaluation.
>>> >>
>>> >> Thanks,
>>> >> Makoto
>>
>>


Re: spark with docker: errors with akka, NAT?

2014-06-17 Thread Aaron Davidson
I remember having to do a similar thing in the spark docker scripts for
testing purposes. Were you able to modify the /etc/hosts directly? I
remember issues with that as docker apparently mounts it as part of its
read-only filesystem.


On Tue, Jun 17, 2014 at 4:36 PM, Mohit Jaggi  wrote:

> It was a DNS issue. Akka apparently uses the hostnames of the endpoints, and
> hence they need to be resolvable. In my case the hostname of the docker
> container was a randomly generated string and was not resolvable. I added a
> workaround (an entry in the /etc/hosts file of the Spark master) for now. If
> anyone can point to a more elegant solution, that would be awesome!
>
>
> On Tue, Jun 17, 2014 at 3:48 PM, Mohit Jaggi  wrote:
>
>> I am using cutting edge code from git but doing my own sbt assembly.
>>
>>
>> On Mon, Jun 16, 2014 at 10:28 PM, Andre Schumacher <
>> schum...@icsi.berkeley.edu> wrote:
>>
>>>
>>> Hi,
>>>
>>> are you using the amplab/spark-1.0.0 images from the global registry?
>>>
>>> Andre
>>>
>>> On 06/17/2014 01:36 AM, Mohit Jaggi wrote:
>>> > Hi Folks,
>>> >
>>> > I am having trouble getting spark driver running in docker. If I run a
>>> > pyspark example on my mac it works but the same example on a docker
>>> image
>>> > (Via boot2docker) fails with following logs. I am pointing the spark
>>> driver
>>> > (which is running the example) to a spark cluster (driver is not part
>>> of
>>> > the cluster). I guess this has something to do with docker's networking
>>> > stack (it may be getting NAT'd) but I am not sure why (if at all) the
>>> > spark-worker or spark-master is trying to create a new TCP connection
>>> to
>>> > the driver, instead of responding on the connection initiated by the
>>> driver.
>>> >
>>> > I would appreciate any help in figuring this out.
>>> >
>>> > Thanks,
>>> >
>>> > Mohit.
>>> >
>>> > logs
>>> >
>>> > Spark Executor Command: "java" "-cp"
>>> >
>>> "::/home/ayasdi/spark/conf:/home//spark/assembly/target/scala-2.10/spark-assembly-1.0.0-hadoop1.0.4.jar"
>>> > "-Xms2g" "-Xmx2g" "-Xms512M" "-Xmx512M"
>>> > "org.apache.spark.executor.CoarseGrainedExecutorBackend"
>>> > "akka.tcp://spark@fc31887475e3:43921/user/CoarseGrainedScheduler" "1"
>>> > "cobalt" "24" "akka.tcp://sparkWorker@:33952/user/Worker"
>>> > "app-20140616152201-0021"
>>> >
>>> > 
>>> >
>>> >
>>> > log4j:WARN No appenders could be found for logger
>>> > (org.apache.hadoop.conf.Configuration).
>>> >
>>> > log4j:WARN Please initialize the log4j system properly.
>>> >
>>> > log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig
>>> for
>>> > more info.
>>> >
>>> > 14/06/16 15:22:05 INFO SparkHadoopUtil: Using Spark's default log4j
>>> > profile: org/apache/spark/log4j-defaults.properties
>>> >
>>> > 14/06/16 15:22:05 INFO SecurityManager: Changing view acls to:
>>> ayasdi,root
>>> >
>>> > 14/06/16 15:22:05 INFO SecurityManager: SecurityManager: authentication
>>> > disabled; ui acls disabled; users with view permissions: Set(xxx, xxx)
>>> >
>>> > 14/06/16 15:22:05 INFO Slf4jLogger: Slf4jLogger started
>>> >
>>> > 14/06/16 15:22:05 INFO Remoting: Starting remoting
>>> >
>>> > 14/06/16 15:22:06 INFO Remoting: Remoting started; listening on
>>> addresses
>>> > :[akka.tcp://sparkExecutor@:33536]
>>> >
>>> > 14/06/16 15:22:06 INFO Remoting: Remoting now listens on addresses:
>>> > [akka.tcp://sparkExecutor@:33536]
>>> >
>>> > 14/06/16 15:22:06 INFO CoarseGrainedExecutorBackend: Connecting to
>>> driver:
>>> > akka.tcp://spark@fc31887475e3:43921/user/CoarseGrainedScheduler
>>> >
>>> > 14/06/16 15:22:06 INFO WorkerWatcher: Connecting to worker
>>> > akka.tcp://sparkWorker@:33952/user/Worker
>>> >
>>> > 14/06/16 15:22:06 WARN Remoting: Tried to associate with unreachable
>>> remote
>>> > address [akka.tcp://spark@fc31887475e3:43921]. Address is now gated
>>> for
>>> > 6 ms, all messages to this address will be delivered to dead
>>> letters.
>>> >
>>> > 14/06/16 15:22:06 ERROR CoarseGrainedExecutorBackend: Driver
>>> Disassociated
>>> > [akka.tcp://sparkExecutor@:33536] ->
>>> [akka.tcp://spark@fc31887475e3:43921]
>>> > disassociated! Shutting down.
>>> >
>>>
>>>
>>
>


Re: Executors not utilized properly.

2014-06-17 Thread abhiguruvayya
I found the main reason to be that I was using coalesce instead of
repartition. coalesce was shrinking the partitioning, so there were too few
tasks to be executed by all of the executors. Can you help me in
understanding when to use coalesce and when to use repartition? In my
application coalesce is processed faster than repartition, which is
unusual.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Executors-not-utilized-properly-tp7744p7787.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: news20-binary classification with LogisticRegressionWithSGD

2014-06-17 Thread Xiangrui Meng
DB, Yes, reduce and aggregate are linear.

Makoto, dense vectors are used in aggregation. If you have 32
partitions and each one sends a dense vector of size 1,354,731 to the
master, then the driver needs 300M+. That may be the problem. Which
deploy mode are you using, standalone or local?

Debasish, there is an old PR for butterfly allreduce. However, it
doesn't seem to be the right way to go for Spark. I just sent out the
PR: https://github.com/apache/spark/pull/1110 . This is a WIP and it
needs more testing before we are confident to merge it. It would be
great if you can help test it.

Best,
Xiangrui

On Tue, Jun 17, 2014 at 2:33 PM, Debasish Das  wrote:
> Xiangrui,
>
> Could you point to the JIRA related to tree aggregate ? ...sounds like the
> allreduce idea...
>
> I would definitely like to try it on our dataset...
>
> Makoto,
>
> I did run a pretty big sparse dataset (20M rows, 3M sparse features) and I got
> 100 iterations of SGD running in 200 seconds...10 executors each with 16 GB
> memory...
>
> Although the best result on the same dataset came out of liblinear and
> BFGS-L1 out of the box...so I did not tune the SGD further on learning rate and
> other heuristics...it was around 5% off...
>
> Thanks.
> Deb
>
>
>
> On Tue, Jun 17, 2014 at 2:09 PM, DB Tsai  wrote:
>>
>> Hi Xiangrui,
>>
>> Does it mean that mapPartition and then reduce shares the same
>> behavior as aggregate operation which is O(n)?
>>
>> Sincerely,
>>
>> DB Tsai
>> ---
>> My Blog: https://www.dbtsai.com
>> LinkedIn: https://www.linkedin.com/in/dbtsai
>>
>>
>> On Tue, Jun 17, 2014 at 2:00 PM, Xiangrui Meng  wrote:
>> > Hi DB,
>> >
>> > treeReduce (treeAggregate) is a feature I'm testing now. It is a
>> > compromise between current reduce and butterfly allReduce. The former
>> > runs in linear time on the number of partitions, the latter introduces
>> > too many dependencies. treeAggregate with depth = 2 should run in
>> > O(sqrt(n)) time, where n is the number of partitions. It would be
>> > great if someone can help test its scalability.
>> >
>> > Best,
>> > Xiangrui
>> >
>> > On Tue, Jun 17, 2014 at 1:32 PM, Makoto Yui  wrote:
>> >> Hi Xiangrui,
>> >>
>> >>
>> >> (2014/06/18 4:58), Xiangrui Meng wrote:
>> >>>
>> >>> How many partitions did you set? If there are too many partitions,
>> >>> please do a coalesce before calling ML algorithms.
>> >>
>> >>
>> >> The training data "news20.random.1000" is small and thus only 2
>> >> partitions
>> >> are used by default.
>> >>
>> >> val training = MLUtils.loadLibSVMFile(sc,
>> >> "hdfs://host:8020/dataset/news20-binary/news20.random.1000",
>> >> multiclass=false).
>> >>
>> >> We also tried 32 partitions as follows but the aggregate never
>> >> finishes.
>> >>
>> >> val training = MLUtils.loadLibSVMFile(sc,
>> >> "hdfs://host:8020/dataset/news20-binary/news20.random.1000",
>> >> multiclass=false, numFeatures = 1354731 , minPartitions = 32)
>> >>
>> >>
>> >>> Btw, could you try the tree branch in my repo?
>> >>> https://github.com/mengxr/spark/tree/tree
>> >>>
>> >>> I used tree aggregate in this branch. It should help with the
>> >>> scalability.
>> >>
>> >>
>> >> Is treeAggregate itself available on Spark 1.0?
>> >>
>> >> I wonder.. Could I test your modification just by running the following
>> >> code
>> >> on REPL?
>> >>
>> >> ---
>> >> val (gradientSum, lossSum) = data.sample(false, miniBatchFraction, 42 +
>> >> i)
>> >> .treeAggregate((BDV.zeros[Double](weights.size), 0.0))(
>> >>   seqOp = (c, v) => (c, v) match { case ((grad, loss), (label,
>> >> features)) =>
>> >> val l = gradient.compute(features, label, weights,
>> >> Vectors.fromBreeze(grad))
>> >> (grad, loss + l)
>> >>   },
>> >>   combOp = (c1, c2) => (c1, c2) match { case ((grad1, loss1),
>> >> (grad2, loss2)) =>
>> >> (grad1 += grad2, loss1 + loss2)
>> >>   }, 2)
>> >> -
>> >>
>> >> Rebuilding Spark is quite a lot of work just to run an evaluation.
>> >>
>> >> Thanks,
>> >> Makoto
>
>


Re: Executors not utilized properly.

2014-06-17 Thread abhiguruvayya
My use case was to read 3000 files from 3000 different HDFS directories, so I
was reading each file, creating an RDD, and adding it to an array of JavaRDDs,
then doing a union(rdd...). Because of this my program was very slow (5 minutes).
After I replaced this logic with textFile(path1,path2,path3) it works
super fast (56 sec). So union() was the overhead.
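
For reference, the single-call version can be built up as one comma-separated path
string (a sketch; the directory layout is hypothetical):

    // sc.textFile accepts a comma-separated list of paths, so thousands of
    // directories can be read in one call instead of union()-ing 3000 RDDs
    val paths = (1 to 3000).map(i => s"hdfs:///data/dir$i").mkString(",")
    val all = sc.textFile(paths)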



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Executors-not-utilized-properly-tp7744p7785.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: spark with docker: errors with akka, NAT?

2014-06-17 Thread Mohit Jaggi
It was a DNS issue. Akka apparently uses the hostnames of the endpoints, and
hence they need to be resolvable. In my case the hostname of the docker
container was a randomly generated string and was not resolvable. I added a
workaround (an entry in the /etc/hosts file of the Spark master) for now. If
anyone can point to a more elegant solution, that would be awesome!
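
A possible alternative (untested, so treat the flag and name as assumptions): launch
the container with a hostname the cluster can already resolve, e.g.

    docker run -h driver.mydomain.com ...   # hypothetical resolvable hostname

so that the Akka endpoint name is resolvable without editing /etc/hosts afterwards.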


On Tue, Jun 17, 2014 at 3:48 PM, Mohit Jaggi  wrote:

> I am using cutting edge code from git but doing my own sbt assembly.
>
>
> On Mon, Jun 16, 2014 at 10:28 PM, Andre Schumacher <
> schum...@icsi.berkeley.edu> wrote:
>
>>
>> Hi,
>>
>> are you using the amplab/spark-1.0.0 images from the global registry?
>>
>> Andre
>>
>> On 06/17/2014 01:36 AM, Mohit Jaggi wrote:
>> > Hi Folks,
>> >
>> > I am having trouble getting spark driver running in docker. If I run a
>> > pyspark example on my mac it works but the same example on a docker
>> image
>> > (Via boot2docker) fails with following logs. I am pointing the spark
>> driver
>> > (which is running the example) to a spark cluster (driver is not part of
>> > the cluster). I guess this has something to do with docker's networking
>> > stack (it may be getting NAT'd) but I am not sure why (if at all) the
>> > spark-worker or spark-master is trying to create a new TCP connection to
>> > the driver, instead of responding on the connection initiated by the
>> driver.
>> >
>> > I would appreciate any help in figuring this out.
>> >
>> > Thanks,
>> >
>> > Mohit.
>> >
>> > logs
>> >
>> > Spark Executor Command: "java" "-cp"
>> >
>> "::/home/ayasdi/spark/conf:/home//spark/assembly/target/scala-2.10/spark-assembly-1.0.0-hadoop1.0.4.jar"
>> > "-Xms2g" "-Xmx2g" "-Xms512M" "-Xmx512M"
>> > "org.apache.spark.executor.CoarseGrainedExecutorBackend"
>> > "akka.tcp://spark@fc31887475e3:43921/user/CoarseGrainedScheduler" "1"
>> > "cobalt" "24" "akka.tcp://sparkWorker@:33952/user/Worker"
>> > "app-20140616152201-0021"
>> >
>> > 
>> >
>> >
>> > log4j:WARN No appenders could be found for logger
>> > (org.apache.hadoop.conf.Configuration).
>> >
>> > log4j:WARN Please initialize the log4j system properly.
>> >
>> > log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig
>> for
>> > more info.
>> >
>> > 14/06/16 15:22:05 INFO SparkHadoopUtil: Using Spark's default log4j
>> > profile: org/apache/spark/log4j-defaults.properties
>> >
>> > 14/06/16 15:22:05 INFO SecurityManager: Changing view acls to:
>> ayasdi,root
>> >
>> > 14/06/16 15:22:05 INFO SecurityManager: SecurityManager: authentication
>> > disabled; ui acls disabled; users with view permissions: Set(xxx, xxx)
>> >
>> > 14/06/16 15:22:05 INFO Slf4jLogger: Slf4jLogger started
>> >
>> > 14/06/16 15:22:05 INFO Remoting: Starting remoting
>> >
>> > 14/06/16 15:22:06 INFO Remoting: Remoting started; listening on
>> addresses
>> > :[akka.tcp://sparkExecutor@:33536]
>> >
>> > 14/06/16 15:22:06 INFO Remoting: Remoting now listens on addresses:
>> > [akka.tcp://sparkExecutor@:33536]
>> >
>> > 14/06/16 15:22:06 INFO CoarseGrainedExecutorBackend: Connecting to
>> driver:
>> > akka.tcp://spark@fc31887475e3:43921/user/CoarseGrainedScheduler
>> >
>> > 14/06/16 15:22:06 INFO WorkerWatcher: Connecting to worker
>> > akka.tcp://sparkWorker@:33952/user/Worker
>> >
>> > 14/06/16 15:22:06 WARN Remoting: Tried to associate with unreachable
>> remote
>> > address [akka.tcp://spark@fc31887475e3:43921]. Address is now gated for
>> > 6 ms, all messages to this address will be delivered to dead
>> letters.
>> >
>> > 14/06/16 15:22:06 ERROR CoarseGrainedExecutorBackend: Driver
>> Disassociated
>> > [akka.tcp://sparkExecutor@:33536] -> [akka.tcp://spark@fc31887475e3
>> :43921]
>> > disassociated! Shutting down.
>> >
>>
>>
>


Re: Spark streaming RDDs to Parquet records

2014-06-17 Thread Michael Armbrust
If you convert the data to a SchemaRDD you can save it as Parquet:
http://spark.apache.org/docs/latest/sql-programming-guide.html#using-parquet
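
Roughly, applied to the pseudo code from the original message, that could look like
the sketch below (the Event case class and decode function are hypothetical
stand-ins for the Kafka payload):

    import org.apache.spark.rdd.RDD

    case class Event(id: String, value: Long)           // hypothetical record schema

    ds.foreachRDD { (rdd: RDD[Array[Byte]], time) =>
      import sqlContext.createSchemaRDD                 // implicit RDD[Product] -> SchemaRDD (Spark 1.0)
      val events = rdd.map(bytes => decode(bytes))      // decode: Array[Byte] => Event, hypothetical
      // one Parquet directory per batch, since saveAsParquetFile will not overwrite
      events.saveAsParquetFile(s"hdfs:///events/batch-${time.milliseconds}")
    }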


On Tue, Jun 17, 2014 at 11:47 PM, Padmanabhan, Mahesh (contractor) <
mahesh.padmanab...@twc-contractor.com> wrote:

>  Thanks Krishna. Seems like you have to use Avro and then convert that to
> Parquet. I was hoping to directly convert RDDs to Parquet files. I’ll look
> into this some more.
>
>  Thanks,
> Mahesh
>
>   From: Krishna Sankar 
> Reply-To: "user@spark.apache.org" 
> Date: Tuesday, June 17, 2014 at 2:41 PM
> To: "user@spark.apache.org" 
> Subject: Re: Spark streaming RDDs to Parquet records
>
>  Mahesh,
>
>- One direction could be: create a parquet schema, convert & save the
>records to hdfs.
>- This might help
>
> https://github.com/massie/spark-parquet-example/blob/master/src/main/scala/com/zenfractal/SparkParquetExample.scala
>
>  Cheers
> 
>
>
> On Tue, Jun 17, 2014 at 12:52 PM, maheshtwc <
> mahesh.padmanab...@twc-contractor.com> wrote:
>
>> Hello,
>>
>> Is there an easy way to convert RDDs within a DStream into Parquet
>> records?
>> Here is some incomplete pseudo code:
>>
>> // Create streaming context
>> val ssc = new StreamingContext(...)
>>
>> // Obtain a DStream of events
>> val ds = KafkaUtils.createStream(...)
>>
>> // Get Spark context to get to the SQL context
>> val sc = ds.context.sparkContext
>>
>> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
>>
>> // For each RDD
>> ds.foreachRDD((rdd: RDD[Array[Byte]]) => {
>>
>> // What do I do next?
>> })
>>
>> Thanks,
>> Mahesh
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-RDDs-to-Parquet-records-tp7762.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>
>
>


Enormous EC2 price jump makes "r3.large" patch more important

2014-06-17 Thread Jeremy Lee
Some people (me included) might have wondered why all our m1.large spot
instances (in us-west-1) shut down a few hours ago...

Simple reason: the EC2 spot price for Spark's default "m1.large" instances
just jumped from $0.016 per hour to about $0.750. Yes, fifty times. Probably
something to do with the World Cup.

So far this is just us-west-1, but prices have a tendency to equalize
across centers as the days pass. Time to make backups and plans.

"m3" spot prices are still down at $0.02 (and being new, will be bypassed
by older systems), so it would be REAAALLYY nice if there had been some
progress on that issue. Let me know if I can help with testing and whatnot.


-- 
Jeremy Lee  BCompSci(Hons)
  The Unorthodox Engineers


Re: spark with docker: errors with akka, NAT?

2014-06-17 Thread Mohit Jaggi
I am using cutting edge code from git but doing my own sbt assembly.


On Mon, Jun 16, 2014 at 10:28 PM, Andre Schumacher <
schum...@icsi.berkeley.edu> wrote:

>
> Hi,
>
> are you using the amplab/spark-1.0.0 images from the global registry?
>
> Andre
>
> On 06/17/2014 01:36 AM, Mohit Jaggi wrote:
> > Hi Folks,
> >
> > I am having trouble getting spark driver running in docker. If I run a
> > pyspark example on my mac it works but the same example on a docker image
> > (Via boot2docker) fails with following logs. I am pointing the spark
> driver
> > (which is running the example) to a spark cluster (driver is not part of
> > the cluster). I guess this has something to do with docker's networking
> > stack (it may be getting NAT'd) but I am not sure why (if at all) the
> > spark-worker or spark-master is trying to create a new TCP connection to
> > the driver, instead of responding on the connection initiated by the
> driver.
> >
> > I would appreciate any help in figuring this out.
> >
> > Thanks,
> >
> > Mohit.
> >
> > logs
> >
> > Spark Executor Command: "java" "-cp"
> >
> "::/home/ayasdi/spark/conf:/home//spark/assembly/target/scala-2.10/spark-assembly-1.0.0-hadoop1.0.4.jar"
> > "-Xms2g" "-Xmx2g" "-Xms512M" "-Xmx512M"
> > "org.apache.spark.executor.CoarseGrainedExecutorBackend"
> > "akka.tcp://spark@fc31887475e3:43921/user/CoarseGrainedScheduler" "1"
> > "cobalt" "24" "akka.tcp://sparkWorker@:33952/user/Worker"
> > "app-20140616152201-0021"
> >
> > 
> >
> >
> > log4j:WARN No appenders could be found for logger
> > (org.apache.hadoop.conf.Configuration).
> >
> > log4j:WARN Please initialize the log4j system properly.
> >
> > log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for
> > more info.
> >
> > 14/06/16 15:22:05 INFO SparkHadoopUtil: Using Spark's default log4j
> > profile: org/apache/spark/log4j-defaults.properties
> >
> > 14/06/16 15:22:05 INFO SecurityManager: Changing view acls to:
> ayasdi,root
> >
> > 14/06/16 15:22:05 INFO SecurityManager: SecurityManager: authentication
> > disabled; ui acls disabled; users with view permissions: Set(xxx, xxx)
> >
> > 14/06/16 15:22:05 INFO Slf4jLogger: Slf4jLogger started
> >
> > 14/06/16 15:22:05 INFO Remoting: Starting remoting
> >
> > 14/06/16 15:22:06 INFO Remoting: Remoting started; listening on addresses
> > :[akka.tcp://sparkExecutor@:33536]
> >
> > 14/06/16 15:22:06 INFO Remoting: Remoting now listens on addresses:
> > [akka.tcp://sparkExecutor@:33536]
> >
> > 14/06/16 15:22:06 INFO CoarseGrainedExecutorBackend: Connecting to
> driver:
> > akka.tcp://spark@fc31887475e3:43921/user/CoarseGrainedScheduler
> >
> > 14/06/16 15:22:06 INFO WorkerWatcher: Connecting to worker
> > akka.tcp://sparkWorker@:33952/user/Worker
> >
> > 14/06/16 15:22:06 WARN Remoting: Tried to associate with unreachable
> remote
> > address [akka.tcp://spark@fc31887475e3:43921]. Address is now gated for
> > 6 ms, all messages to this address will be delivered to dead letters.
> >
> > 14/06/16 15:22:06 ERROR CoarseGrainedExecutorBackend: Driver
> Disassociated
> > [akka.tcp://sparkExecutor@:33536] -> [akka.tcp://spark@fc31887475e3
> :43921]
> > disassociated! Shutting down.
> >
>
>


Why MLLib classes are so badly organized?

2014-06-17 Thread frol
Can anybody explain WHY:

1) LabeledPoint is in regression/LabeledPoint.scala? This causes
classification modules to import from regression modules.

2) Vector and SparseVector are in linalg? OK. GeneralizedLinearModel is in
regression/GeneralizedLinearAlgorithm.scala? Really? 

3) LinearModel is in regression.py (Python MLLib module), but also imported
from classification.py?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Why-MLLib-classes-are-so-badly-organized-tp7780.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Best practices for removing lineage of a RDD or Graph object?

2014-06-17 Thread dash
If an RDD object has non-empty .dependencies, does that mean it has
lineage? How could I remove it?

I'm doing iterative computing, and each iteration depends on the result
computed in the previous iteration. After several iterations, it throws a
StackOverflowError.

At first I tried to use cache. I read the code in Pregel.scala, which is
part of GraphX; they use a count action to materialize the object after
cache. But I attached a debugger, and it seems that approach does not empty
.dependencies, and it also does not work in my code.

An alternative approach is using checkpoint. I tried checkpointing the
vertices and edges of my Graph object and then materializing them by counting
vertices and edges. Then I used .isCheckpointed to check whether it was
correctly checkpointed, but it always returns false.
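
For what it's worth, a commonly suggested pattern for iterative jobs is sketched
below (the checkpoint directory, step function, and interval are illustrative
assumptions). The action after checkpoint() is what actually writes the checkpoint;
lineage is truncated only once that has happened:

    sc.setCheckpointDir("hdfs:///tmp/checkpoints")   // illustrative path

    var current = initialRdd                         // initialRdd: your starting RDD
    for (i <- 1 to numIterations) {
      current = step(current).cache()                // step: one iteration's transformation
      if (i % 10 == 0) {                             // truncate lineage every 10 iterations
        current.checkpoint()                         // mark for checkpointing
        current.count()                              // force computation; checkpoint runs here
      }
    }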



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Best-practices-for-removing-lineage-of-a-RDD-or-Graph-object-tp7779.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Spark sql unable to connect to db2 hive metastore

2014-06-17 Thread Jenny Zhao
Finally got it to work: I mimicked how Spark adds the datanucleus jars in
compute-classpath.sh and added the db2jcc*.jar files to the classpath. It
works now.

Thanks!
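
For readers hitting the same thing, one alternative worth trying (untested here;
assumes Spark 1.0's launch options) is to put the metastore JDBC drivers directly
on the driver's classpath rather than in --jars:

    bin/spark-shell --driver-class-path /path/to/db2jcc-10.5.jar:/path/to/db2jcc_license_cisuz-10.5.jar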


On Tue, Jun 17, 2014 at 10:50 AM, Jenny Zhao  wrote:

> Thanks Michael!
>
> As I run it using spark-shell, I added both jars through the
> bin/spark-shell --jars option. I noticed that if I don't pass these jars, it
> complains it couldn't find the driver; if I pass them through the --jars
> option, it complains there is no suitable driver.
>
> Regards.
>
>
> On Tue, Jun 17, 2014 at 2:43 AM, Michael Armbrust 
> wrote:
>
>> First a clarification:  Spark SQL does not talk to HiveServer2, as that
>> JDBC interface is for retrieving results from queries that are executed
>> using Hive.  Instead Spark SQL will execute queries itself by directly
>> accessing your data using Spark.
>>
>> Spark SQL's Hive module can use JDBC to connect to an external metastore,
>> in your case DB2. This is only used to retrieve the metadata (i.e., column
>> names and types, HDFS locations for data)
>>
>> Looking at your exception I still see "java.sql.SQLException: No
>> suitable driver", so my guess would be that the DB2 JDBC drivers are not
>> being correctly included.  How are you trying to add them to the classpath?
>>
>> Michael
>>
>>
>> On Tue, Jun 17, 2014 at 1:29 AM, Jenny Zhao 
>> wrote:
>>
>>>
>>> Hi,
>>>
>>> my hive configuration uses db2 as its metastore database. I have built
>>> spark with the extra step sbt/sbt assembly/assembly to include the
>>> dependency jars, and copied HIVE_HOME/conf/hive-site.xml under spark/conf.
>>> When I ran:
>>>
>>> hql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
>>>
>>> I got the following exception (a portion of the stack trace is pasted
>>> here). Looking at the stack made me wonder whether Spark supports a remote
>>> metastore configuration; it seems spark doesn't talk to hiveserver2
>>> directly? The driver jars db2jcc-10.5.jar and db2jcc_license_cisuz-10.5.jar
>>> are both included in the classpath; otherwise, it complains it couldn't
>>> find the driver.
>>>
>>> Appreciate any help to resolve it.
>>>
>>> Thanks!
>>>
>>> Caused by: java.sql.SQLException: Unable to open a test connection to
>>> the given database. JDBC url = jdbc:db2://localhost:50001/BIDB, username =
>>> catalog. Terminating connection pool. Original Exception: --
>>> java.sql.SQLException: No suitable driver
>>> at java.sql.DriverManager.getConnection(DriverManager.java:422)
>>> at java.sql.DriverManager.getConnection(DriverManager.java:374)
>>> at
>>> com.jolbox.bonecp.BoneCP.obtainRawInternalConnection(BoneCP.java:254)
>>> at com.jolbox.bonecp.BoneCP.(BoneCP.java:305)
>>> at
>>> com.jolbox.bonecp.BoneCPDataSource.maybeInit(BoneCPDataSource.java:150)
>>> at
>>> com.jolbox.bonecp.BoneCPDataSource.getConnection(BoneCPDataSource.java:112)
>>> at
>>> org.datanucleus.store.rdbms.ConnectionFactoryImpl$ManagedConnectionImpl.getConnection(ConnectionFactoryImpl.java:479)
>>> at
>>> org.datanucleus.store.rdbms.RDBMSStoreManager.(RDBMSStoreManager.java:304)
>>> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
>>> Method)
>>> at
>>> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:56)
>>> at
>>> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:39)
>>> at
>>> java.lang.reflect.Constructor.newInstance(Constructor.java:527)
>>> at
>>> org.datanucleus.plugin.NonManagedPluginRegistry.createExecutableExtension(NonManagedPluginRegistry.java:631)
>>> at
>>> org.datanucleus.plugin.PluginManager.createExecutableExtension(PluginManager.java:301)
>>> at
>>> org.datanucleus.NucleusContext.createStoreManagerForProperties(NucleusContext.java:1069)
>>> at
>>> org.datanucleus.NucleusContext.initialise(NucleusContext.java:359)
>>> at
>>> org.datanucleus.api.jdo.JDOPersistenceManagerFactory.freezeConfiguration(JDOPersistenceManagerFactory.java:768)
>>> at
>>> org.datanucleus.api.jdo.JDOPersistenceManagerFactory.createPersistenceManagerFactory(JDOPersistenceManagerFactory.java:326)
>>> at
>>> org.datanucleus.api.jdo.JDOPersistenceManagerFactory.getPersistenceManagerFactory(JDOPersistenceManagerFactory.java:195)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> at
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:60)
>>> at
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:37)
>>> at java.lang.reflect.Method.invoke(Method.java:611)
>>> at javax.jdo.JDOHelper$16.run(JDOHelper.java:1965)
>>> at
>>> java.security.AccessController.doPrivileged(AccessController.java:277)
>>> at javax.jdo.JDOHelper.invoke(JDOHelper.java:1960)
>>> at
>>> javax.jdo.JDOHelper.invokeGetPersistenceManagerFactoryO

Re: Spark streaming RDDs to Parquet records

2014-06-17 Thread contractor
Thanks Krishna. Seems like you have to use Avro and then convert that to 
Parquet. I was hoping to directly convert RDDs to Parquet files. I’ll look into 
this some more.

Thanks,
Mahesh

From: Krishna Sankar <ksanka...@gmail.com>
Reply-To: "user@spark.apache.org" <user@spark.apache.org>
Date: Tuesday, June 17, 2014 at 2:41 PM
To: "user@spark.apache.org" <user@spark.apache.org>
Subject: Re: Spark streaming RDDs to Parquet records

Mahesh,

 *   One direction could be: create a parquet schema, convert & save the 
records to hdfs.
 *   This might help 
https://github.com/massie/spark-parquet-example/blob/master/src/main/scala/com/zenfractal/SparkParquetExample.scala

Cheers



On Tue, Jun 17, 2014 at 12:52 PM, maheshtwc
<mahesh.padmanab...@twc-contractor.com> wrote:
Hello,

Is there an easy way to convert RDDs within a DStream into Parquet records?
Here is some incomplete pseudo code:

// Create streaming context
val ssc = new StreamingContext(...)

// Obtain a DStream of events
val ds = KafkaUtils.createStream(...)

// Get Spark context to get to the SQL context
val sc = ds.context.sparkContext

val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// For each RDD
ds.foreachRDD((rdd: RDD[Array[Byte]]) => {

// What do I do next?
})

Thanks,
Mahesh



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-RDDs-to-Parquet-records-tp7762.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.





Re: news20-binary classification with LogisticRegressionWithSGD

2014-06-17 Thread Debasish Das
Xiangrui,

Could you point to the JIRA related to tree aggregate ? ...sounds like the
allreduce idea...

I would definitely like to try it on our dataset...

Makoto,

I did run a pretty big sparse dataset (20M rows, 3M sparse features) and I
got 100 iterations of SGD running in 200 seconds...10 executors each with
16 GB memory...

Although the best result on the same dataset came out of liblinear and
BFGS-L1 out of the box...so I did not tune the SGD further on learning rate and
other heuristics...it was around 5% off...

Thanks.
Deb



On Tue, Jun 17, 2014 at 2:09 PM, DB Tsai  wrote:

> Hi Xiangrui,
>
> Does it mean that mapPartition and then reduce shares the same
> behavior as aggregate operation which is O(n)?
>
> Sincerely,
>
> DB Tsai
> ---
> My Blog: https://www.dbtsai.com
> LinkedIn: https://www.linkedin.com/in/dbtsai
>
>
> On Tue, Jun 17, 2014 at 2:00 PM, Xiangrui Meng  wrote:
> > Hi DB,
> >
> > treeReduce (treeAggregate) is a feature I'm testing now. It is a
> > compromise between current reduce and butterfly allReduce. The former
> > runs in linear time on the number of partitions, the latter introduces
> > too many dependencies. treeAggregate with depth = 2 should run in
> > O(sqrt(n)) time, where n is the number of partitions. It would be
> > great if someone can help test its scalability.
> >
> > Best,
> > Xiangrui
> >
> > On Tue, Jun 17, 2014 at 1:32 PM, Makoto Yui  wrote:
> >> Hi Xiangrui,
> >>
> >>
> >> (2014/06/18 4:58), Xiangrui Meng wrote:
> >>>
> >>> How many partitions did you set? If there are too many partitions,
> >>> please do a coalesce before calling ML algorithms.
> >>
> >>
> >> The training data "news20.random.1000" is small and thus only 2
> partitions
> >> are used by default.
> >>
> >> val training = MLUtils.loadLibSVMFile(sc,
> >> "hdfs://host:8020/dataset/news20-binary/news20.random.1000",
> >> multiclass=false).
> >>
> >> We also tried 32 partitions as follows but the aggregate never finishes.
> >>
> >> val training = MLUtils.loadLibSVMFile(sc,
> >> "hdfs://host:8020/dataset/news20-binary/news20.random.1000",
> >> multiclass=false, numFeatures = 1354731 , minPartitions = 32)
> >>
> >>
> >>> Btw, could you try the tree branch in my repo?
> >>> https://github.com/mengxr/spark/tree/tree
> >>>
> >>> I used tree aggregate in this branch. It should help with the
> scalability.
> >>
> >>
> >> Is treeAggregate itself available on Spark 1.0?
> >>
> >> I wonder.. Could I test your modification just by running the following
> code
> >> on REPL?
> >>
> >> ---
> >> val (gradientSum, lossSum) = data.sample(false, miniBatchFraction, 42 +
> i)
> >> .treeAggregate((BDV.zeros[Double](weights.size), 0.0))(
> >>   seqOp = (c, v) => (c, v) match { case ((grad, loss), (label,
> >> features)) =>
> >> val l = gradient.compute(features, label, weights,
> >> Vectors.fromBreeze(grad))
> >> (grad, loss + l)
> >>   },
> >>   combOp = (c1, c2) => (c1, c2) match { case ((grad1, loss1),
> >> (grad2, loss2)) =>
> >> (grad1 += grad2, loss1 + loss2)
> >>   }, 2)
> >> -
> >>
> >> Rebuilding Spark is quite a lot of work just to run an evaluation.
> >>
> >> Thanks,
> >> Makoto
>


Re: news20-binary classification with LogisticRegressionWithSGD

2014-06-17 Thread Makoto Yui

Hi Xiangrui,

(2014/06/18 6:03), Xiangrui Meng wrote:

Are you using Spark 1.0 or 0.9? Could you go to the executor tab of
the web UI and check the driver's memory?


I am using Spark 1.0.

588.8 MB is allocated for RDDs.
I am setting SPARK_DRIVER_MEMORY=2g in conf/spark-env.sh.

The value allocated for RDDs in the web UI was not changed by
doing the following:

$ SPARK_DRIVER_MEMORY=6g bin/spark-shell

I set "-verbose:gc" but full GC (or continuous GCs) does not happen 
during the aggregate at the driver.


Thanks,
Makoto


Re: news20-binary classification with LogisticRegressionWithSGD

2014-06-17 Thread DB Tsai
Hi Xiangrui,

Does it mean that mapPartition and then reduce shares the same
behavior as aggregate operation which is O(n)?

Sincerely,

DB Tsai
---
My Blog: https://www.dbtsai.com
LinkedIn: https://www.linkedin.com/in/dbtsai


On Tue, Jun 17, 2014 at 2:00 PM, Xiangrui Meng  wrote:
> Hi DB,
>
> treeReduce (treeAggregate) is a feature I'm testing now. It is a
> compromise between current reduce and butterfly allReduce. The former
> runs in linear time on the number of partitions, the latter introduces
> too many dependencies. treeAggregate with depth = 2 should run in
> O(sqrt(n)) time, where n is the number of partitions. It would be
> great if someone can help test its scalability.
>
> Best,
> Xiangrui
>
> On Tue, Jun 17, 2014 at 1:32 PM, Makoto Yui  wrote:
>> Hi Xiangrui,
>>
>>
>> (2014/06/18 4:58), Xiangrui Meng wrote:
>>>
>>> How many partitions did you set? If there are too many partitions,
>>> please do a coalesce before calling ML algorithms.
>>
>>
>> The training data "news20.random.1000" is small and thus only 2 partitions
>> are used by default.
>>
>> val training = MLUtils.loadLibSVMFile(sc,
>> "hdfs://host:8020/dataset/news20-binary/news20.random.1000",
>> multiclass=false).
>>
>> We also tried 32 partitions as follows but the aggregate never finishes.
>>
>> val training = MLUtils.loadLibSVMFile(sc,
>> "hdfs://host:8020/dataset/news20-binary/news20.random.1000",
>> multiclass=false, numFeatures = 1354731 , minPartitions = 32)
>>
>>
>>> Btw, could you try the tree branch in my repo?
>>> https://github.com/mengxr/spark/tree/tree
>>>
>>> I used tree aggregate in this branch. It should help with the scalability.
>>
>>
>> Is treeAggregate itself available on Spark 1.0?
>>
>> I wonder.. Could I test your modification just by running the following code
>> on REPL?
>>
>> ---
>> val (gradientSum, lossSum) = data.sample(false, miniBatchFraction, 42 + i)
>> .treeAggregate((BDV.zeros[Double](weights.size), 0.0))(
>>   seqOp = (c, v) => (c, v) match { case ((grad, loss), (label,
>> features)) =>
>> val l = gradient.compute(features, label, weights,
>> Vectors.fromBreeze(grad))
>> (grad, loss + l)
>>   },
>>   combOp = (c1, c2) => (c1, c2) match { case ((grad1, loss1),
>> (grad2, loss2)) =>
>> (grad1 += grad2, loss1 + loss2)
>>   }, 2)
>> -
>>
>> Rebuilding Spark is quite a lot of work just to run an evaluation.
>>
>> Thanks,
>> Makoto


Re: news20-binary classification with LogisticRegressionWithSGD

2014-06-17 Thread Xiangrui Meng
Hi Makoto,

Are you using Spark 1.0 or 0.9? Could you go to the executor tab of
the web UI and check the driver's memory?

treeAggregate is not part of 1.0.

Best,
Xiangrui

On Tue, Jun 17, 2014 at 2:00 PM, Xiangrui Meng  wrote:
> Hi DB,
>
> treeReduce (treeAggregate) is a feature I'm testing now. It is a
> compromise between current reduce and butterfly allReduce. The former
> runs in linear time on the number of partitions, the latter introduces
> too many dependencies. treeAggregate with depth = 2 should run in
> O(sqrt(n)) time, where n is the number of partitions. It would be
> great if someone can help test its scalability.
>
> Best,
> Xiangrui
>
> On Tue, Jun 17, 2014 at 1:32 PM, Makoto Yui  wrote:
>> Hi Xiangrui,
>>
>>
>> (2014/06/18 4:58), Xiangrui Meng wrote:
>>>
>>> How many partitions did you set? If there are too many partitions,
>>> please do a coalesce before calling ML algorithms.
>>
>>
>> The training data "news20.random.1000" is small and thus only 2 partitions
>> are used by default.
>>
>> val training = MLUtils.loadLibSVMFile(sc,
>> "hdfs://host:8020/dataset/news20-binary/news20.random.1000",
>> multiclass=false).
>>
>> We also tried 32 partitions as follows but the aggregate never finishes.
>>
>> val training = MLUtils.loadLibSVMFile(sc,
>> "hdfs://host:8020/dataset/news20-binary/news20.random.1000",
>> multiclass=false, numFeatures = 1354731 , minPartitions = 32)
>>
>>
>>> Btw, could you try the tree branch in my repo?
>>> https://github.com/mengxr/spark/tree/tree
>>>
>>> I used tree aggregate in this branch. It should help with the scalability.
>>
>>
>> Is treeAggregate itself available on Spark 1.0?
>>
>> I wonder.. Could I test your modification just by running the following code
>> on REPL?
>>
>> ---
>> val (gradientSum, lossSum) = data.sample(false, miniBatchFraction, 42 + i)
>> .treeAggregate((BDV.zeros[Double](weights.size), 0.0))(
>>   seqOp = (c, v) => (c, v) match { case ((grad, loss), (label,
>> features)) =>
>> val l = gradient.compute(features, label, weights,
>> Vectors.fromBreeze(grad))
>> (grad, loss + l)
>>   },
>>   combOp = (c1, c2) => (c1, c2) match { case ((grad1, loss1),
>> (grad2, loss2)) =>
>> (grad1 += grad2, loss1 + loss2)
>>   }, 2)
>> -
>>
>> Rebuilding Spark is quite a lot of work just to run an evaluation.
>>
>> Thanks,
>> Makoto


Unit test failure: Address already in use

2014-06-17 Thread SK
Hi,

I have 3 unit tests (independent of each other) in the /src/test/scala
folder. When I run each of them individually using sbt "test-only ",
all 3 pass. But when I run them all using "sbt test", they
fail with the warning below. I am wondering if the binding exception prevents
the job from running, thereby causing the failures. If so, what can I do
to address this binding exception? I am running these tests locally on a
standalone machine (i.e. SparkContext("local", "test")).


14/06/17 13:42:48 WARN component.AbstractLifeCycle: FAILED
org.eclipse.jetty.server.Server@3487b78d: java.net.BindException: Address
already in use
java.net.BindException: Address already in use
at sun.nio.ch.Net.bind0(Native Method)
at sun.nio.ch.Net.bind(Net.java:174)
at
sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:139)
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:77)
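
A plausible cause (an assumption based on the symptom): sbt runs test suites in
parallel by default, so several SparkContexts try to bind the web UI port at the
same time. Serializing the suites avoids that:

    // build.sbt: run test suites one at a time so only one SparkContext
    // (and its embedded Jetty server) is alive per JVM at any moment
    parallelExecution in Test := false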


thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Unit-test-failure-Address-already-in-use-tp7771.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: news20-binary classification with LogisticRegressionWithSGD

2014-06-17 Thread Xiangrui Meng
Hi DB,

treeReduce (treeAggregate) is a feature I'm testing now. It is a
compromise between current reduce and butterfly allReduce. The former
runs in linear time on the number of partitions, the latter introduces
too many dependencies. treeAggregate with depth = 2 should run in
O(sqrt(n)) time, where n is the number of partitions. It would be
great if someone can help test its scalability.

Best,
Xiangrui

On Tue, Jun 17, 2014 at 1:32 PM, Makoto Yui  wrote:
> Hi Xiangrui,
>
>
> (2014/06/18 4:58), Xiangrui Meng wrote:
>>
>> How many partitions did you set? If there are too many partitions,
>> please do a coalesce before calling ML algorithms.
>
>
> The training data "news20.random.1000" is small and thus only 2 partitions
> are used by default.
>
> val training = MLUtils.loadLibSVMFile(sc,
> "hdfs://host:8020/dataset/news20-binary/news20.random.1000",
> multiclass=false).
>
> We also tried 32 partitions as follows but the aggregate never finishes.
>
> val training = MLUtils.loadLibSVMFile(sc,
> "hdfs://host:8020/dataset/news20-binary/news20.random.1000",
> multiclass=false, numFeatures = 1354731 , minPartitions = 32)
>
>
>> Btw, could you try the tree branch in my repo?
>> https://github.com/mengxr/spark/tree/tree
>>
>> I used tree aggregate in this branch. It should help with the scalability.
>
>
> Is treeAggregate itself available in Spark 1.0?
>
> I wonder... could I test your modification just by running the following code
> in the REPL?
>
> ---
> val (gradientSum, lossSum) = data.sample(false, miniBatchFraction, 42 + i)
> .treeAggregate((BDV.zeros[Double](weights.size), 0.0))(
>   seqOp = (c, v) => (c, v) match { case ((grad, loss), (label,
> features)) =>
> val l = gradient.compute(features, label, weights,
> Vectors.fromBreeze(grad))
> (grad, loss + l)
>   },
>   combOp = (c1, c2) => (c1, c2) match { case ((grad1, loss1),
> (grad2, loss2)) =>
> (grad1 += grad2, loss1 + loss2)
>   }, 2)
> -
>
> Rebuilding Spark is quite a lot of work just to run an evaluation.
>
> Thanks,
> Makoto
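
For intuition, the shape of the tree aggregation can be sketched on stock
Spark 1.0 without the tree branch. The following is an approximation of the
idea, not Xiangrui's implementation: per-partition partials are merged in
intermediate coalesce rounds, so the driver folds only a few results instead
of one per partition (the zero value must be an identity, as with aggregate).

import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD

def treeAggregateSketch[T, U: ClassTag](rdd: RDD[T], zero: U)(
    seqOp: (U, T) => U,
    combOp: (U, U) => U,
    depth: Int = 2): U = {
  require(depth >= 1, "depth must be at least 1")
  // One partial aggregate per partition.
  var partials: RDD[U] =
    rdd.mapPartitions(it => Iterator(it.foldLeft(zero)(seqOp)))
  var parts = partials.partitions.length
  // Shrink the partition count by roughly the depth-th root each round,
  // combining partials as we go.
  val scale = math.max(math.ceil(math.pow(parts, 1.0 / depth)).toInt, 2)
  while (parts > scale) {
    parts = math.max(parts / scale, 1)
    partials = partials
      .coalesce(parts)
      .mapPartitions(it => Iterator(it.foldLeft(zero)(combOp)))
  }
  partials.collect().foldLeft(zero)(combOp)
}

// Example: sum 1..1000 spread over 32 partitions with one intermediate round.
// val sum = treeAggregateSketch(sc.parallelize(1 to 1000, 32), 0)(_ + _, _ + _)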


Re: Executors not utilized properly.

2014-06-17 Thread Jey Kottalam
Hi Abhishek,

> Where MapReduce takes 2 minutes, Spark takes 5 minutes to complete the job.

Interesting. Could you tell us more about your program? A "code skeleton"
would certainly be helpful.

Thanks!

-Jey


On Tue, Jun 17, 2014 at 3:21 PM, abhiguruvayya 
wrote:

> I did try creating more partitions by overriding the default number of
> partitions determined by HDFS splits. The problem is, in this case the
> program runs forever. I have the same set of inputs for MapReduce and
> Spark. Where MapReduce takes 2 minutes, Spark takes 5 minutes to complete
> the job. I thought my Spark program was running slower than MapReduce
> because not all of the executors were being utilized properly. I can
> provide you my code skeleton for your reference. Please help me with this.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Executors-not-utilized-properly-tp7744p7759.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Re: Spark streaming RDDs to Parquet records

2014-06-17 Thread Krishna Sankar
Mahesh,

   - One direction could be: create a Parquet schema, then convert and save
     the records to HDFS.
   - This might help:
   
https://github.com/massie/spark-parquet-example/blob/master/src/main/scala/com/zenfractal/SparkParquetExample.scala

Cheers



On Tue, Jun 17, 2014 at 12:52 PM, maheshtwc <
mahesh.padmanab...@twc-contractor.com> wrote:

> Hello,
>
> Is there an easy way to convert RDDs within a DStream into Parquet records?
> Here is some incomplete pseudo code:
>
> // Create streaming context
> val ssc = new StreamingContext(...)
>
> // Obtain a DStream of events
> val ds = KafkaUtils.createStream(...)
>
> // Get Spark context to get to the SQL context
> val sc = ds.context.sparkContext
>
> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
>
> // For each RDD
> ds.foreachRDD((rdd: RDD[Array[Byte]]) => {
>
> // What do I do next?
> })
>
> Thanks,
> Mahesh
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-RDDs-to-Parquet-records-tp7762.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
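
Putting the two replies together, here is a minimal sketch of the missing
foreachRDD body using Spark 1.0's SchemaRDD support. The Event case class and
parseEvent are hypothetical stand-ins for whatever the Kafka payload actually
deserializes to:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.dstream.DStream

case class Event(id: Long, payload: String)

// Hypothetical decoder; replace with the real payload deserialization.
def parseEvent(bytes: Array[Byte]): Event =
  Event(0L, new String(bytes, "UTF-8"))

def saveAsParquet(ds: DStream[Array[Byte]], outputDir: String): Unit = {
  // Reuse one SQLContext, as in the pseudo code above.
  val sqlContext = new SQLContext(ds.context.sparkContext)
  import sqlContext.createSchemaRDD // implicit: RDD[Event] -> SchemaRDD
  ds.foreachRDD { (rdd: RDD[Array[Byte]]) =>
    val events = rdd.map(parseEvent)
    // Parquet output is immutable, so each batch gets its own directory.
    events.saveAsParquetFile(outputDir + "/batch-" + System.currentTimeMillis)
  }
}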


Re: news20-binary classification with LogisticRegressionWithSGD

2014-06-17 Thread Makoto Yui

Hi Xiangrui,

(2014/06/18 4:58), Xiangrui Meng wrote:

How many partitions did you set? If there are too many partitions,
please do a coalesce before calling ML algorithms.


The training data "news20.random.1000" is small and thus only 2 
partitions are used by default.


val training = MLUtils.loadLibSVMFile(sc, 
"hdfs://host:8020/dataset/news20-binary/news20.random.1000", 
multiclass=false).


We also tried 32 partitions as follows but the aggregate never finishes.

val training = MLUtils.loadLibSVMFile(sc, 
"hdfs://host:8020/dataset/news20-binary/news20.random.1000", 
multiclass=false, numFeatures = 1354731 , minPartitions = 32)



Btw, could you try the tree branch in my repo?
https://github.com/mengxr/spark/tree/tree

I used tree aggregate in this branch. It should help with the scalability.


Is treeAggregate itself available in Spark 1.0?

I wonder... could I test your modification just by running the following
code in the REPL?


---
val (gradientSum, lossSum) = data.sample(false, miniBatchFraction, 42 + i)
  .treeAggregate((BDV.zeros[Double](weights.size), 0.0))(
    seqOp = (c, v) => (c, v) match { case ((grad, loss), (label, features)) =>
      val l = gradient.compute(features, label, weights, Vectors.fromBreeze(grad))
      (grad, loss + l)
    },
    combOp = (c1, c2) => (c1, c2) match { case ((grad1, loss1), (grad2, loss2)) =>
      (grad1 += grad2, loss1 + loss2)
    }, 2)
-

Rebuilding Spark is quite a lot of work just to run an evaluation.

Thanks,
Makoto


Re: news20-binary classification with LogisticRegressionWithSGD

2014-06-17 Thread DB Tsai
Hi Xiangrui,

What's the difference between treeAggregate and aggregate? Why does
treeAggregate scale better? If we just use mapPartitions, will it
be as fast as treeAggregate?

Thanks.

Sincerely,

DB Tsai
---
My Blog: https://www.dbtsai.com
LinkedIn: https://www.linkedin.com/in/dbtsai


On Tue, Jun 17, 2014 at 12:58 PM, Xiangrui Meng  wrote:
> Hi Makoto,
>
> How many partitions did you set? If there are too many partitions,
> please do a coalesce before calling ML algorithms.
>
> Btw, could you try the tree branch in my repo?
> https://github.com/mengxr/spark/tree/tree
>
> I used tree aggregate in this branch. It should help with the scalability.
>
> Best,
> Xiangrui
>
> On Tue, Jun 17, 2014 at 12:22 PM, Makoto Yui  wrote:
>> Here is follow-up to the previous evaluation.
>>
>> "aggregate at GradientDescent.scala:178" never finishes at
>> https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala#L178
>>
>> We confirmed, by -verbose:gc, that GC is not happening during the aggregate
>> and the cumulative CPU time for the task is increasing little by little.
>>
>> LBFGS also does not work for large # of features (news20.random.1000)
>> though it works fine for small # of features (news20.binary.1000).
>>
>> "aggregate at LBFGS.scala:201" also never finishes at
>> https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala#L201
>>
>> ---
>> [Evaluated code for LBFGS]
>>
>> import org.apache.spark.SparkContext
>> import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
>> import org.apache.spark.mllib.linalg.Vectors
>> import org.apache.spark.mllib.util.MLUtils
>> import org.apache.spark.mllib.classification.LogisticRegressionModel
>> import org.apache.spark.mllib.optimization._
>>
>> val data = MLUtils.loadLibSVMFile(sc,
>> "hdfs://dm01:8020/dataset/news20-binary/news20.random.1000",
>> multiclass=false)
>> val numFeatures = data.take(1)(0).features.size
>>
>> val training = data.map(x => (x.label, 
>> MLUtils.appendBias(x.features))).cache()
>>
>> // Run training algorithm to build the model
>> val numCorrections = 10
>> val convergenceTol = 1e-4
>> val maxNumIterations = 20
>> val regParam = 0.1
>> val initialWeightsWithIntercept = Vectors.dense(new
>> Array[Double](numFeatures + 1))
>>
>> val (weightsWithIntercept, loss) = LBFGS.runLBFGS(
>>   training,
>>   new LogisticGradient(),
>>   new SquaredL2Updater(),
>>   numCorrections,
>>   convergenceTol,
>>   maxNumIterations,
>>   regParam,
>>   initialWeightsWithIntercept)
>> ---
>>
>>
>> Thanks,
>> Makoto
>>
>> 2014-06-17 21:32 GMT+09:00 Makoto Yui :
>>> Hello,
>>>
>>> I have been evaluating LogisticRegressionWithSGD of Spark 1.0 MLlib on
>>> Hadoop 0.20.2-cdh3u6 but it does not work for a sparse dataset though
>>> the number of training examples used in the evaluation is just 1,000.
>>>
>>> It works fine for the dataset *news20.binary.1000* that has 178,560
>>> features. However, it does not work for *news20.random.1000*, where the
>>> number of features is large (1,354,731), even though we used a sparse
>>> vector through MLUtils.loadLibSVMFile().
>>>
>>> The execution does not seem to be progressing, while no error is reported
>>> in the spark-shell or in the stdout/stderr of the executors.
>>>
>>> We used 32 executors, each allocating 7GB (2GB of which is for RDD
>>> storage) of working memory.
>>>
>>> Any suggestions? Your help is really appreciated.
>>>
>>> ==
>>> Executed code
>>> ==
>>> import org.apache.spark.mllib.util.MLUtils
>>> import org.apache.spark.mllib.classification.LogisticRegressionWithSGD
>>>
>>> //val training = MLUtils.loadLibSVMFile(sc,
>>> "hdfs://host:8020/dataset/news20-binary/news20.binary.1000",
>>> multiclass=false)
>>> val training = MLUtils.loadLibSVMFile(sc,
>>> "hdfs://host:8020/dataset/news20-binary/news20.random.1000",
>>> multiclass=false)
>>>
>>> val numFeatures = training .take(1)(0).features.size
>>> //numFeatures: Int = 178560 for news20.binary.1000
>>> //numFeatures: Int = 1354731 for news20.random.1000
>>> val model = LogisticRegressionWithSGD.train(training, numIterations=1)
>>>
>>> ==
>>> The dataset used in the evaluation
>>> ==
>>>
>>> http://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/binary.html#news20.binary
>>>
>>> $ head -1000 news20.binary | sed 's/+1/1/g' | sed 's/-1/0/g' >
>>> news20.binary.1000
>>> $ sort -R news20.binary > news20.random
>>> $ head -1000 news20.random | sed 's/+1/1/g' | sed 's/-1/0/g' >
>>> news20.random.1000
>>>
>>> You can find the dataset in
>>> https://dl.dropboxusercontent.com/u/13123103/news20.random.1000
>>> https://dl.dropboxusercontent.com/u/13123103/news20.binary.1000
>>>
>>>
>>> Thanks,
>>> Makoto
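
On the mapPartitions part of DB's question: written by hand, it has the same
communication pattern as aggregate, since every partition's partial result
still travels to the driver for the final merge; treeAggregate's gain comes
from the intermediate merge stages. A toy sketch (assuming the spark-shell's
sc):

val data = sc.parallelize((1 to 1000).map(_.toDouble), 32)

// aggregate: one partial per partition, all merged at the driver.
val sum1 = data.aggregate(0.0)(_ + _, _ + _)

// mapPartitions + reduce: the same thing spelled out -- each partition
// produces one partial, and reduce still merges all 32 at the driver,
// so it is not expected to beat aggregate.
val sum2 = data.mapPartitions(it => Iterator(it.sum)).reduce(_ + _)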


Re: Execution stalls in LogisticRegressionWithSGD

2014-06-17 Thread Xiangrui Meng
Hi Bharath,

Thanks for posting the details! Which Spark version are you using?

Best,
Xiangrui

On Tue, Jun 17, 2014 at 6:48 AM, Bharath Ravi Kumar  wrote:
> Hi,
>
> (Apologies for the long mail, but it's necessary to provide sufficient
> details considering the number of issues faced.)
>
> I'm running into issues testing LogisticRegressionWithSGD on a two node cluster
> (each node with 24 cores and 16G available to slaves out of 24G on the
> system). Here's a description of the application:
>
> The model is being trained based on categorical features x, y, and (x,y).
> The categorical features are mapped to binary features by converting each
> distinct value in the category enum into a binary feature by itself (i.e.
> presence of that value in a record implies the corresponding feature = 1, else
> feature = 0; so there'd be as many distinct features as enum values). The
> training vector is laid out as
> [x1,x2...xn,y1,y2...yn,(x1,y1),(x2,y2)...(xn,yn)]. Each record in the
> training data has only one combination (Xk,Yk) and a label appearing in the
> record. Thus, the corresponding labeledpoint sparse vector would only have 3
> values Xk, Yk, (Xk,Yk) set for a record. The total length of the vector
> (though sparse) would be nearly 614000. The number of records is about 1.33
> million. The records have been coalesced into 20 partitions across two
> nodes. The input data has not been cached.
> (NOTE: I do realize the records & features may seem large for a two node
> setup, but given the memory & cpu, and the fact that I'm willing to give up
> some turnaround time, I don't see why tasks should inexplicably fail)
>
> Additional parameters include:
>
> spark.executor.memory = 14G
> spark.default.parallelism = 1
> spark.cores.max=20
> spark.storage.memoryFraction=0.8 //No cache space required
> (Trying to set spark.akka.frameSize to a larger number, say, 20 didn't help
> either)
>
> The model training was initialized as : new LogisticRegressionWithSGD(1,
> maxIterations, 0.0, 0.05)
>
> However, after 4 iterations of gradient descent, the entire execution
> appeared to stall inexplicably. The corresponding executor details and
> details of the stalled stage (number 14) are as follows:
>
> Metric                            Min    25th   Median  75th   Max
> Result serialization time         12 ms  13 ms  14 ms   16 ms  18 ms
> Duration                          4 s    4 s    5 s     5 s    5 s
> Time spent fetching task results  0 ms   0 ms   0 ms    0 ms   0 ms
> Scheduler delay                   6 s    6 s    6 s     6 s    12 s
>
>
> Stage Id
> 14 aggregate at GradientDescent.scala:178
>
> Task Index  Task ID  Status  Locality Level  Executor
> Launch Time  Duration  GC Time  Result Ser Time  Errors
>
> 0 600 RUNNING PROCESS_LOCAL serious.dataone.foo.bar.com
> 2014/06/17 10:32:27 1.1 h
> 1 601 RUNNING PROCESS_LOCAL casual.dataone.foo.bar.com
> 2014/06/17 10:32:27 1.1 h
> 2 602 RUNNING PROCESS_LOCAL serious.dataone.foo.bar.com
> 2014/06/17 10:32:27 1.1 h
> 3 603 RUNNING PROCESS_LOCAL casual.dataone.foo.bar.com
> 2014/06/17 10:32:27 1.1 h
> 4 604 RUNNING PROCESS_LOCAL serious.dataone.foo.bar.com
> 2014/06/17 10:32:27 1.1 h
> 5 605 SUCCESS PROCESS_LOCAL casual.dataone.foo.bar.com
> 2014/06/17 10:32:27 4 s 2 s 12 ms
> 6 606 SUCCESS PROCESS_LOCAL serious.dataone.foo.bar.com
> 2014/06/17 10:32:27 4 s 1 s 14 ms
> 7 607 SUCCESS PROCESS_LOCAL casual.dataone.foo.bar.com
> 2014/06/17 10:32:27 4 s 2 s 12 ms
> 8 608 SUCCESS PROCESS_LOCAL serious.dataone.foo.bar.com
> 2014/06/17 10:32:27 5 s 1 s 15 ms
> 9 609 SUCCESS PROCESS_LOCAL casual.dataone.foo.bar.com
> 2014/06/17 10:32:27 5 s 1 s 14 ms
> 10 610 SUCCESS PROCESS_LOCAL serious.dataone.foo.bar.com
> 2014/06/17 10:32:27 5 s 1 s 15 ms
> 11 611 SUCCESS PROCESS_LOCAL casual.dataone.foo.bar.com
> 2014/06/17 10:32:27 4 s 1 s 13 ms
> 12 612 SUCCESS PROCESS_LOCAL serious.dataone.foo.bar.com
> 2014/06/17 10:32:27 5 s 1 s 18 ms
> 13 613 SUCCESS PROCESS_LOCAL casual.dataone.foo.bar.com
> 2014/06/17 10:32:27 5 s 1 s 13 ms
> 14 614 SUCCESS PROCESS_LOCAL serious.dataone.foo.bar.com
> 2014/06/17 10:32:27 4 s 1 s 14 ms
> 15 615 SUCCESS PROCESS_LOCAL casual.dataone.foo.bar.com
> 2014/06/17 10:32:27 4 s 1 s 12 ms
> 16 616 SUCCESS PROCESS_LOCAL serious.dataone.foo.bar.com
> 2014/06/17 10:32:27 5 s 1 s 15 ms
> 17 617 SUCCESS PROCESS_LOCAL casual.dataone.foo.bar.com
> 2014/06/17 10:32:27 5 s 1 s 18 ms
> 18 618 SUCCESS PROCESS_LOCAL serious.dataone.foo.bar.com
> 2014/06/17 10:32
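
For concreteness, the feature layout Bharath describes maps naturally onto
MLlib's sparse vectors, with exactly three non-zero entries per record. A
minimal sketch (the index arithmetic is illustrative and assumes the x and y
values are already mapped to dense integer ids):

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

// numX distinct x values, numY distinct y values, plus numX*numY cross features.
def encode(label: Double, xIdx: Int, yIdx: Int, numX: Int, numY: Int): LabeledPoint = {
  val crossIdx = xIdx * numY + yIdx            // position inside the (x,y) block
  val indices  = Array(xIdx,                   // x block: [0, numX)
                       numX + yIdx,            // y block: [numX, numX+numY)
                       numX + numY + crossIdx) // cross block
  val size     = numX + numY + numX * numY
  LabeledPoint(label, Vectors.sparse(size, indices, Array(1.0, 1.0, 1.0)))
}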

Re: Contribution to Spark MLLib

2014-06-17 Thread Xiangrui Meng
Hi Jayati,

Thanks for asking! MLlib algorithms are all implemented in Scala. It
makes us easier to maintain if we have the implementations in one
place. For the roadmap, please visit
http://www.slideshare.net/xrmeng/m-llib-hadoopsummit to see features
planned for v1.1. Before contributing new algorithms, it would be
great if you can start with working on an existing JIRA.

Best,
Xiangrui

On Tue, Jun 17, 2014 at 12:22 AM, Jayati  wrote:
> Hello,
>
> I wish to contribute some algorithms to Spark MLlib, but at the same
> time I want to make sure that I don't attempt something redundant.
>
> Would it be okay to let me know which algorithms aren't on your roadmap
> for the near future?
>
> Also, can I use Java to write machine learning algorithms for Spark MLlib
> instead of Scala?
>
> Regards,
> Jayati
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Contribution-to-Spark-MLLib-tp7716.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.


Spark SQL: No function to evaluate expression

2014-06-17 Thread Zuhair Khayyat
Dear all,

I am trying to run the following query on Spark SQL using some custom TPC-H
tables with a standalone Spark cluster configuration:

SELECT * FROM history a JOIN history b ON a.o_custkey = b.o_custkey WHERE
a.c_address <> b.c_address;

Unfortunately I get the following error during execution:

java.lang.reflect.InvocationTargetException

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)

at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:606)

at
org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:40)

at
org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)

Caused by: org.apache.spark.SparkException: Job aborted due to stage
failure: Task 0.0:2 failed 4 times, most recent failure: Exception failure
in TID 12 on host kw2260.kaust.edu.sa:
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: No function
to evaluate expression. type: UnresolvedAttribute, tree: 'a.c_address


org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute.eval(unresolved.scala:59)


org.apache.spark.sql.catalyst.expressions.Equals.eval(predicates.scala:147)


org.apache.spark.sql.catalyst.expressions.Not.eval(predicates.scala:74)


  org.apache.spark.sql.catalyst.expressions.And.eval(predicates.scala:100)


Is this a bug or am I doing something wrong?


Regards,

Zuhair Khayyat


Re: news20-binary classification with LogisticRegressionWithSGD

2014-06-17 Thread Xiangrui Meng
Hi Makoto,

How many partitions did you set? If there are too many partitions,
please do a coalesce before calling ML algorithms.

Btw, could you try the tree branch in my repo?
https://github.com/mengxr/spark/tree/tree

I used tree aggregate in this branch. It should help with the scalability.

Best,
Xiangrui

On Tue, Jun 17, 2014 at 12:22 PM, Makoto Yui  wrote:
> Here is follow-up to the previous evaluation.
>
> "aggregate at GradientDescent.scala:178" never finishes at
> https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala#L178
>
> We confirmed, by -verbose:gc, that GC is not happening during the aggregate
> and the cumulative CPU time for the task is increasing little by little.
>
> LBFGS also does not work for large # of features (news20.random.1000)
> though it works fine for small # of features (news20.binary.1000).
>
> "aggregate at LBFGS.scala:201" also never finishes at
> https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala#L201
>
> ---
> [Evaluated code for LBFGS]
>
> import org.apache.spark.SparkContext
> import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
> import org.apache.spark.mllib.linalg.Vectors
> import org.apache.spark.mllib.util.MLUtils
> import org.apache.spark.mllib.classification.LogisticRegressionModel
> import org.apache.spark.mllib.optimization._
>
> val data = MLUtils.loadLibSVMFile(sc,
> "hdfs://dm01:8020/dataset/news20-binary/news20.random.1000",
> multiclass=false)
> val numFeatures = data.take(1)(0).features.size
>
> val training = data.map(x => (x.label, 
> MLUtils.appendBias(x.features))).cache()
>
> // Run training algorithm to build the model
> val numCorrections = 10
> val convergenceTol = 1e-4
> val maxNumIterations = 20
> val regParam = 0.1
> val initialWeightsWithIntercept = Vectors.dense(new
> Array[Double](numFeatures + 1))
>
> val (weightsWithIntercept, loss) = LBFGS.runLBFGS(
>   training,
>   new LogisticGradient(),
>   new SquaredL2Updater(),
>   numCorrections,
>   convergenceTol,
>   maxNumIterations,
>   regParam,
>   initialWeightsWithIntercept)
> ---
>
>
> Thanks,
> Makoto
>
> 2014-06-17 21:32 GMT+09:00 Makoto Yui :
>> Hello,
>>
>> I have been evaluating LogisticRegressionWithSGD of Spark 1.0 MLlib on
>> Hadoop 0.20.2-cdh3u6 but it does not work for a sparse dataset though
>> the number of training examples used in the evaluation is just 1,000.
>>
>> It works fine for the dataset *news20.binary.1000* that has 178,560
>> features. However, it does not work for *news20.random.1000*, where the
>> number of features is large (1,354,731), even though we used a sparse
>> vector through MLUtils.loadLibSVMFile().
>>
>> The execution does not seem to be progressing, while no error is reported
>> in the spark-shell or in the stdout/stderr of the executors.
>>
>> We used 32 executors, each allocating 7GB (2GB of which is for RDD
>> storage) of working memory.
>>
>> Any suggestions? Your help is really appreciated.
>>
>> ==
>> Executed code
>> ==
>> import org.apache.spark.mllib.util.MLUtils
>> import org.apache.spark.mllib.classification.LogisticRegressionWithSGD
>>
>> //val training = MLUtils.loadLibSVMFile(sc,
>> "hdfs://host:8020/dataset/news20-binary/news20.binary.1000",
>> multiclass=false)
>> val training = MLUtils.loadLibSVMFile(sc,
>> "hdfs://host:8020/dataset/news20-binary/news20.random.1000",
>> multiclass=false)
>>
>> val numFeatures = training .take(1)(0).features.size
>> //numFeatures: Int = 178560 for news20.binary.1000
>> //numFeatures: Int = 1354731 for news20.random.1000
>> val model = LogisticRegressionWithSGD.train(training, numIterations=1)
>>
>> ==
>> The dataset used in the evaluation
>> ==
>>
>> http://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/binary.html#news20.binary
>>
>> $ head -1000 news20.binary | sed 's/+1/1/g' | sed 's/-1/0/g' >
>> news20.binary.1000
>> $ sort -R news20.binary > news20.random
>> $ head -1000 news20.random | sed 's/+1/1/g' | sed 's/-1/0/g' >
>> news20.random.1000
>>
>> You can find the dataset in
>> https://dl.dropboxusercontent.com/u/13123103/news20.random.1000
>> https://dl.dropboxusercontent.com/u/13123103/news20.binary.1000
>>
>>
>> Thanks,
>> Makoto


Spark streaming RDDs to Parquet records

2014-06-17 Thread maheshtwc
Hello,

Is there an easy way to convert RDDs within a DStream into Parquet records?
Here is some incomplete pseudo code:

// Create streaming context
val ssc = new StreamingContext(...)

// Obtain a DStream of events
val ds = KafkaUtils.createStream(...)

// Get Spark context to get to the SQL context
val sc = ds.context.sparkContext

val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// For each RDD
ds.foreachRDD((rdd: RDD[Array[Byte]]) => {

// What do I do next?
})

Thanks,
Mahesh



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-RDDs-to-Parquet-records-tp7762.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: news20-binary classification with LogisticRegressionWithSGD

2014-06-17 Thread Makoto Yui
Here is follow-up to the previous evaluation.

"aggregate at GradientDescent.scala:178" never finishes at
https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala#L178

We confirmed, by -verbose:gc, that GC is not happening during the aggregate
and the cumulative CPU time for the task is increasing little by little.

LBFGS also does not work for large # of features (news20.random.1000)
though it works fine for small # of features (news20.binary.1000).

"aggregate at LBFGS.scala:201" also never finishes at
https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala#L201

---
[Evaluated code for LBFGS]

import org.apache.spark.SparkContext
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.mllib.classification.LogisticRegressionModel
import org.apache.spark.mllib.optimization._

val data = MLUtils.loadLibSVMFile(sc,
"hdfs://dm01:8020/dataset/news20-binary/news20.random.1000",
multiclass=false)
val numFeatures = data.take(1)(0).features.size

val training = data.map(x => (x.label, MLUtils.appendBias(x.features))).cache()

// Run training algorithm to build the model
val numCorrections = 10
val convergenceTol = 1e-4
val maxNumIterations = 20
val regParam = 0.1
val initialWeightsWithIntercept = Vectors.dense(new
Array[Double](numFeatures + 1))

val (weightsWithIntercept, loss) = LBFGS.runLBFGS(
  training,
  new LogisticGradient(),
  new SquaredL2Updater(),
  numCorrections,
  convergenceTol,
  maxNumIterations,
  regParam,
  initialWeightsWithIntercept)
---


Thanks,
Makoto

2014-06-17 21:32 GMT+09:00 Makoto Yui :
> Hello,
>
> I have been evaluating LogisticRegressionWithSGD of Spark 1.0 MLlib on
> Hadoop 0.20.2-cdh3u6 but it does not work for a sparse dataset though
> the number of training examples used in the evaluation is just 1,000.
>
> It works fine for the dataset *news20.binary.1000* that has 178,560
> features. However, it does not work for *news20.random.1000*, where the
> number of features is large (1,354,731), even though we used a sparse
> vector through MLUtils.loadLibSVMFile().
>
> The execution does not seem to be progressing, while no error is reported
> in the spark-shell or in the stdout/stderr of the executors.
>
> We used 32 executors, each allocating 7GB (2GB of which is for RDD
> storage) of working memory.
>
> Any suggestions? Your help is really appreciated.
>
> ==
> Executed code
> ==
> import org.apache.spark.mllib.util.MLUtils
> import org.apache.spark.mllib.classification.LogisticRegressionWithSGD
>
> //val training = MLUtils.loadLibSVMFile(sc,
> "hdfs://host:8020/dataset/news20-binary/news20.binary.1000",
> multiclass=false)
> val training = MLUtils.loadLibSVMFile(sc,
> "hdfs://host:8020/dataset/news20-binary/news20.random.1000",
> multiclass=false)
>
> val numFeatures = training .take(1)(0).features.size
> //numFeatures: Int = 178560 for news20.binary.1000
> //numFeatures: Int = 1354731 for news20.random.1000
> val model = LogisticRegressionWithSGD.train(training, numIterations=1)
>
> ==
> The dataset used in the evaluation
> ==
>
> http://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/binary.html#news20.binary
>
> $ head -1000 news20.binary | sed 's/+1/1/g' | sed 's/-1/0/g' >
> news20.binary.1000
> $ sort -R news20.binary > news20.random
> $ head -1000 news20.random | sed 's/+1/1/g' | sed 's/-1/0/g' >
> news20.random.1000
>
> You can find the dataset in
> https://dl.dropboxusercontent.com/u/13123103/news20.random.1000
> https://dl.dropboxusercontent.com/u/13123103/news20.binary.1000
>
>
> Thanks,
> Makoto


Re: Executors not utilized properly.

2014-06-17 Thread abhiguruvayya
I did try creating more partitions by overriding the default number of
partitions determined by HDFS splits. The problem is, in this case the
program runs forever. I have the same set of inputs for MapReduce and
Spark. Where MapReduce takes 2 minutes, Spark takes 5 minutes to complete
the job. I thought my Spark program was running slower than MapReduce
because not all of the executors were being utilized properly. I can
provide you my code skeleton for your reference. Please help me with this.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Executors-not-utilized-properly-tp7744p7759.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Problems running Spark job on mesos in fine-grained mode

2014-06-17 Thread Sébastien Rainville
Hi,

I'm having trouble running Spark on Mesos in fine-grained mode. I'm running
Spark 1.0.0 and Mesos 0.18.0. The tasks are failing randomly, which most of
the time, but not always, causes the job to fail. The same code runs
fine in coarse-grained mode. I see the following exceptions in the logs of
the Spark driver:

W0617 10:57:36.774382  8735 sched.cpp:901] Attempting to launch task 21
with an unknown offer 20140416-011500-1369465866-5050-26096-52332715
W0617 10:57:36.774433  8735 sched.cpp:901] Attempting to launch task 22
with an unknown offer 20140416-011500-1369465866-5050-26096-52332715
14/06/17 10:57:36 INFO TaskSetManager: Re-queueing tasks for
201311011608-1369465866-5050-9189-46 from TaskSet 0.0
14/06/17 10:57:36 WARN TaskSetManager: Lost TID 22 (task 0.0:2)
14/06/17 10:57:36 WARN TaskSetManager: Lost TID 19 (task 0.0:0)
14/06/17 10:57:36 WARN TaskSetManager: Lost TID 21 (task 0.0:1)
14/06/17 10:57:36 INFO DAGScheduler: Executor lost:
201311011608-1369465866-5050-9189-46 (epoch 0)
14/06/17 10:57:36 INFO BlockManagerMasterActor: Trying to remove executor
201311011608-1369465866-5050-9189-46 from BlockManagerMaster.
14/06/17 10:57:36 INFO BlockManagerMaster: Removed
201311011608-1369465866-5050-9189-46 successfully in removeExecutor
14/06/17 10:57:36 DEBUG MapOutputTrackerMaster: Increasing epoch to 1
14/06/17 10:57:36 INFO DAGScheduler: Host added was in lost list earlier:
ca1-dcc1-0065.lab.mtl

I don't see any exceptions in the spark executor logs. The only error
message I found in mesos itself is warnings in the mesos master:

W0617 10:57:36.816748 26100 master.cpp:1615] Failed to validate task 21 :
Task 21 attempted to use cpus(*):1 combined with already used cpus(*):1;
mem(*):2048 is greater than offered mem(*):3216; disk(*):98304;
ports(*):[11900-11919, 11921-11995, 11997-11999]; cpus(*):1
W0617 10:57:36.819807 26100 master.cpp:1615] Failed to validate task 22 :
Task 22 attempted to use cpus(*):1 combined with already used cpus(*):1;
mem(*):2048 is greater than offered mem(*):3216; disk(*):98304;
ports(*):[11900-11919, 11921-11995, 11997-11999]; cpus(*):1
W0617 10:57:36.932287 26102 master.cpp:1615] Failed to validate task 28 :
Task 28 attempted to use cpus(*):1 combined with already used cpus(*):1;
mem(*):2048 is greater than offered cpus(*):1; mem(*):3216; disk(*):98304;
ports(*):[11900-11960, 11962-11978, 11980-11999]
W0617 11:05:52.783133 26098 master.cpp:2106] Ignoring unknown exited
executor 201311011608-1369465866-5050-9189-46 on slave
201311011608-1369465866-5050-9189-46 (ca1-dcc1-0065.lab.mtl)
W0617 11:05:52.787739 26103 master.cpp:2106] Ignoring unknown exited
executor 201311011608-1369465866-5050-9189-34 on slave
201311011608-1369465866-5050-9189-34 (ca1-dcc1-0053.lab.mtl)
W0617 11:05:52.790292 26102 master.cpp:2106] Ignoring unknown exited
executor 201311011608-1369465866-5050-9189-59 on slave
201311011608-1369465866-5050-9189-59 (ca1-dcc1-0079.lab.mtl)
W0617 11:05:52.800649 26099 master.cpp:2106] Ignoring unknown exited
executor 201311011608-1369465866-5050-9189-18 on slave
201311011608-1369465866-5050-9189-18 (ca1-dcc1-0027.lab.mtl)
... (more of those "Ignoring unknown exited executor")


I analyzed the difference between the execution of the same job in
coarse-grained mode and fine-grained mode, and I noticed that in
fine-grained mode the tasks get executed on executors different from the
ones reported in Spark, as if Spark and Mesos get out of sync as to which
executor is responsible for which task. See the following:


Coarse-grained mode:

Spark                                  Mesos
Task Index  Task ID  Executor  Status   Task ID (UI)  Task Name  Task ID (logs)  Executor  State
0           0        66        SUCCESS  4             "Task 4"   0               66        RUNNING
1           1        59        SUCCESS  0             "Task 0"   1               59        RUNNING
2           2        54        SUCCESS  10            "Task 10"  2               54        RUNNING
3           3        128       SUCCESS  6             "Task 6"   3               128       RUNNING
...

Fine-grained mode:

Spark                                  Mesos
Task Index  Task ID  Executor  Status   Task ID (UI)  Task Name     Task ID (logs)  Executor  State
0           23       108       SUCCESS  23            "task 0.0:0"  23              27        FINISHED
0           19       65        FAILED   19            "task 0.0:0"  19              86        FINISHED
1           21       65        FAILED   (Mesos executor was never created)
1           24       92        SUCCESS  24            "task 0.0:1"  24              129       FINISHED
2           22       65        FAILED   (Mesos executor was never created)
2           25       100       SUCCESS  25            "task 0.0:2"  25              84        FINISHED
3           26       80        SUCCESS  26            "task 0.0:3"  26              124       FINISHED
4           27       65        FAILED   27            "task 0.0:4"  27              108       FINISHED
4           29       92        SUCCESS  29            "task 0.0:4"  29              65        FINISHED
5           28       65        FAILED   (Mesos executor was never created)
5           30       77        SUCCESS  30            "task 0.0:5"  30              62        FINISHED
6           0        53        SUCCESS  0             "task 0.0:6"  0               41        FINISHED
7           1        77        SUCCESS  1             "task 0.0:7"  1               114       FINISHED
...


Is it normal for the executor reported in Spark and Mesos to be different
when running in fine-grained mode?

Please note that in this particular example the job actually succeeded, but
most of the time it's failing after 4 failed attempts of a given task. This
job never fails in coarse-grained mode. Every job is working in
coarse-grained mode and failing the same way in fine-grained mode.

Does anybody have an idea what the problem could be?

Thanks,

- Sebastien


Re: join operation is taking too much time

2014-06-17 Thread Daniel Darabos
I've been wondering about this. Is there a difference in performance
between these two?

val rdd1 = sc.textFile(files.mkString(","))
val rdd2 = sc.union(files.map(sc.textFile(_)))

I don't know about your use-case, Meethu, but it may be worth trying to see
if reading all the files into one RDD (like rdd1) would perform better in
the join. (If this is possible in your situation.)



On Tue, Jun 17, 2014 at 6:45 PM, Andrew Or  wrote:

> How long does it get stuck for? This is a common sign of the OS thrashing
> due to out-of-memory exceptions. If you keep it running longer, does it
> throw an error?
>
> Depending on how large your other RDD is (and your join operation), memory
> pressure may or may not be the problem at all. It could be that spilling
> your shuffles
> to disk is slowing you down (but probably shouldn't hang your
> application). For the 5 RDDs case, what happens if you set
> spark.shuffle.spill to false?
>
>
> 2014-06-17 5:59 GMT-07:00 MEETHU MATHEW :
>
>
>>  Hi all,
>>
>> I want to do a recursive leftOuterJoin between an RDD (created from a
>> file) with 9 million rows (the file is 100MB) and 30 other RDDs (created
>> from 30 different files, one per iteration of a loop) varying from 1 to 6
>> million rows.
>> When I run it with 5 RDDs, it runs successfully in 5 minutes. But when
>> I increase it to 10 or 30 RDDs, it gradually slows down and finally
>> gets stuck without showing any warning or error.
>>
>> I am running in standalone mode with 2 workers of 4GB each and a total
>> of 16 cores.
>>
>> Are any of you facing similar problems with JOIN, or is it a problem with
>> my configuration?
>>
>> Thanks & Regards,
>> Meethu M
>>
>
>
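
For reference, the spill experiment Andrew suggests above is a one-line
configuration change. A sketch, assuming Spark 1.0 property names; keeping
shuffle data in memory can itself trigger out-of-memory errors, so treat this
as a diagnostic rather than a fix:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("local[4]")               // illustrative master URL
  .setAppName("join-spill-test")
  .set("spark.shuffle.spill", "false") // keep shuffle data in memory
val sc = new SparkContext(conf)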


Re: Executors not utilized properly.

2014-06-17 Thread Sean Owen
It sounds like your job has 9 tasks and all are executing in parallel. This
is as good as it gets, right? Are you asking how to break the work into more
tasks, like 120 to match your 10*12 cores? Make your RDD have more
partitions. For example, the textFile method can override the default number
of partitions determined by HDFS splits.
On Jun 17, 2014 5:37 PM, "abhiguruvayya"  wrote:

> I am creating around 10 executors with 12 cores and 7g memory, but when I
> launch a task not all executors are being used. For example, if my job has 9
> tasks, only 3 executors are being used with 3 tasks each, and I believe this
> is making my app slower than a map reduce program for the same use case. Can
> anyone throw some light on executor configuration, if any? How can I use all
> the executors? I am running Spark on YARN and Hadoop 2.4.0.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Executors-not-utilized-properly-tp7744.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
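
Concretely, the two knobs Sean mentions look like this (a sketch; the path and
partition counts are illustrative):

// Ask for at least 120 partitions when reading, instead of one per HDFS split.
val rdd = sc.textFile("hdfs:///data/input", 120)

// Or re-shape an existing RDD. repartition shuffles the data; coalesce can
// only shrink the partition count without a shuffle.
val wider = rdd.repartition(120)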


Re: spark streaming, kafka, SPARK_CLASSPATH

2014-06-17 Thread Gino Bustelo
Luis' experience validates what I'm seeing. You still have to set the
properties in the SparkConf for the context to work. For example, the master
URL and jars are specified again in the app.

Gino B.

> On Jun 17, 2014, at 12:05 PM, Luis Ángel Vicente Sánchez 
>  wrote:
> 
> I have been able to submit a job successfully but I had to configure my
> Spark job this way:
> 
>   val sparkConf: SparkConf =
> new SparkConf()
>   .setAppName("TwitterPopularTags")
>   .setMaster("spark://int-spark-master:7077")
>   .setSparkHome("/opt/spark")
>   .setJars(Seq("/tmp/spark-test-0.1-SNAPSHOT.jar"))
> 
> Now I'm getting this error on my worker:
> 
> 14/06/17 17:03:40 WARN TaskSchedulerImpl: Initial job has not accepted any 
> resources; check your cluster UI to ensure that workers are registered and 
> have sufficient memory
> 
> 
> 
> 2014-06-17 15:38 GMT+01:00 Luis Ángel Vicente Sánchez 
> :
>> After playing a bit, I have been able to create a fatjar this way:
>> 
>> lazy val rootDependencies = Seq(
>>   "org.apache.spark" %% "spark-core"      % "1.0.0" % "provided",
>>   "org.apache.spark" %% "spark-streaming" % "1.0.0" % "provided",
>>   "org.apache.spark" %% "spark-streaming-twitter" % "1.0.0"
>>     exclude("org.apache.spark", "spark-core_2.10")
>>     exclude("org.apache.spark", "spark-streaming_2.10")
>> )
>> 
>> Excluding those transitive dependencies, we can create a fatjar ~400Kb 
>> instead of 40Mb.
>> 
>> My problem is not running the streaming job locally but submitting it
>> to a standalone cluster using spark-submit; every time I run the following
>> command, my workers die:
>> 
>> ~/development/tools/spark/1.0.0/bin/spark-submit \
>> --class "org.apache.spark.examples.streaming.TwitterPopularTags" \
>> --master "spark://int-spark-master:7077" \
>> --deploy-mode "cluster" \
>> file:///tmp/spark-test-0.1-SNAPSHOT.jar
>> 
>> I have copied my fatjar to my master /tmp folder.
>> 
>> 
>> 2014-06-17 10:30 GMT+01:00 Michael Cutler :
>> 
>>> Admittedly getting Spark Streaming / Kafka working for the first time can 
>>> be a bit tricky with the web of dependencies that get pulled in.  I've 
>>> taken the KafkaWorkCount example from the Spark project and set up a simple 
>>> standalone SBT project that shows you how to get it working and using 
>>> spark-submit.
>>> 
>>> https://github.com/cotdp/spark-example-kafka
>>> 
>>> The key trick is in the use of sbt-assembly instead of relying on any of 
>>> the "add jars" functionality.  You mark "spark-core" and "spark-streaming" 
>>> as provided, because they are part of the core spark-assembly already 
>>> running your cluster.  However "spark-streaming-kafka" is not, so you need 
>>> to package it in your 'fat JAR' while excluding all the mess that causes 
>>> the build to break.
>>> 
>>> build.sbt:
>>> 
>>> import AssemblyKeys._
>>>
>>> assemblySettings
>>>
>>> name := "spark-example-kafka"
>>>
>>> version := "1.0"
>>>
>>> scalaVersion := "2.10.4"
>>>
>>> jarName in assembly := "spark-example-kafka_2.10-1.0.jar"
>>>
>>> assemblyOption in assembly ~= { _.copy(includeScala = false) }
>>>
>>> libraryDependencies ++= Seq(
>>>   "org.apache.spark" %% "spark-core" % "1.0.0" % "provided",
>>>   "org.apache.spark" %% "spark-streaming" % "1.0.0" % "provided",
>>>   ("org.apache.spark" %% "spark-streaming-kafka" % "1.0.0").
>>>     exclude("commons-beanutils", "commons-beanutils").
>>>     exclude("commons-collections", "commons-collections").
>>>     exclude("com.esotericsoftware.minlog", "minlog")
>>> )
>>>
>>> mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) =>
>>>   {
>>>     case x if x.startsWith("META-INF/ECLIPSEF.RSA") => MergeStrategy.last
>>>     case x if x.startsWith("META-INF/mailcap") => MergeStrategy.last
>>>     case x if x.startsWith("plugin.properties") => MergeStrategy.last
>>>     case x => old(x)
>>>   }
>>> }
>>> 
>>> 
>>> You can see the "exclude()" has to go around the spark-streaming-kafka 
>>> dependency, and I've used a MergeStrategy to solve the "deduplicate: 
>>> different file contents found in the following" errors.
>>> 
>>> Build the JAR with sbt assembly and use the scripts in bin/ to run the 
>>> examples.
>>> 
>>> I'm using this same approach to run my Spark Streaming jobs with 
>>> spark-submit and have them managed using Mesos/Marathon to handle failures 
>>> and restarts with long running processes.
>>> 
>>> Good luck!
>>> 
>>> MC
>>> 
>>> 
>>> 
>>> 
>>> 
>>> Michael Cutler
>>> Founder, CTO
>>> 
>>> 
>>> Mobile: +44 789 990 7847
>>> Email:   mich...@tumra.com
>>> Web: tumra.com
>>> Visit us at our offices in Chiswick Park
>>> Registered in England & Wales, 07916412. VAT No. 130595328
>>> 
>>> 
>>> This email and any files transm

Re: What is the best way to handle transformations or actions that takes forever?

2014-06-17 Thread Daniel Darabos
I think you need to implement a timeout in your code. As far as I know,
Spark will not interrupt the execution of your code as long as the driver
is connected. Might be an idea though.


On Tue, Jun 17, 2014 at 7:54 PM, Peng Cheng  wrote:

> I've tried enabling speculative execution, and this seems to have partially
> solved the problem. However, I'm not sure it can handle large-scale
> situations, as it only starts when 75% of the job is finished.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/What-is-the-best-way-to-handle-transformations-or-actions-that-takes-forever-tp7664p7752.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
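
One way to realize the timeout Daniel suggests is to wrap the slow call in a
Future inside the transformation. A minimal sketch; fetchSlow is a
hypothetical stand-in, and note that a timed-out call keeps running on its
thread, so this bounds latency, not work:

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import java.util.concurrent.TimeoutException

// Hypothetical slow operation performed inside a transformation.
def fetchSlow(url: String): String = { Thread.sleep(60000); url }

def fetchWithTimeout(url: String, limit: FiniteDuration): Option[String] =
  try Some(Await.result(Future(fetchSlow(url)), limit))
  catch { case _: TimeoutException => None }

// Usage inside an RDD transformation:
// urls.map(u => fetchWithTimeout(u, 30.seconds))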


Re: Executors not utilized properly.

2014-06-17 Thread abhiguruvayya
Can someone help me with this? Any help is appreciated.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Executors-not-utilized-properly-tp7744p7753.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: What is the best way to handle transformations or actions that takes forever?

2014-06-17 Thread Peng Cheng
I've tried enabling speculative execution, and this seems to have partially
solved the problem. However, I'm not sure it can handle large-scale
situations, as it only starts when 75% of the job is finished.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/What-is-the-best-way-to-handle-transformations-or-actions-that-takes-forever-tp7664p7752.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
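
The 75% threshold Peng mentions is just the default of
spark.speculation.quantile and can be lowered. A sketch of the relevant knobs
(Spark 1.0 property names; the values shown are illustrative):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.speculation", "true")
  .set("spark.speculation.quantile", "0.50")  // start once 50% of tasks finish
  .set("spark.speculation.multiplier", "1.5") // relaunch tasks 1.5x slower than the median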


Re: Spark sql unable to connect to db2 hive metastore

2014-06-17 Thread Jenny Zhao
Thanks Michael!

As I run it using spark-shell, I added both jars through the bin/spark-shell
--jars option. I noticed that if I don't pass these jars, it complains it
can't find the driver; if I pass them through --jars, it complains there is
no suitable driver.

Regards.


On Tue, Jun 17, 2014 at 2:43 AM, Michael Armbrust 
wrote:

> First a clarification:  Spark SQL does not talk to HiveServer2, as that
> JDBC interface is for retrieving results from queries that are executed
> using Hive.  Instead Spark SQL will execute queries itself by directly
> accessing your data using Spark.
>
> Spark SQL's Hive module can use JDBC to connect to an external metastore,
> in your case DB2. This is only used to retrieve the metadata (i.e., column
> names and types, HDFS locations for data)
>
> Looking at your exception I still see "java.sql.SQLException: No suitable
> driver", so my guess would be that the DB2 JDBC drivers are not being
> correctly included.  How are you trying to add them to the classpath?
>
> Michael
>
>
> On Tue, Jun 17, 2014 at 1:29 AM, Jenny Zhao 
> wrote:
>
>>
>> Hi,
>>
>> My Hive configuration uses DB2 as its metastore database. I have built
>> Spark with the extra step sbt/sbt assembly/assembly to include the
>> dependency jars, and copied HIVE_HOME/conf/hive-site.xml under spark/conf.
>> When I ran:
>>
>> hql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
>>
>> I got the following exception (a portion of the stack trace is pasted here).
>> Looking at the stack, this made me wonder whether Spark supports a remote
>> metastore configuration; it seems Spark doesn't talk to HiveServer2 directly?
>> The driver jars db2jcc-10.5.jar and db2jcc_license_cisuz-10.5.jar are both
>> included in the classpath; otherwise, it complains it couldn't find the
>> driver.
>>
>> Appreciate any help to resolve it.
>>
>> Thanks!
>>
>> Caused by: java.sql.SQLException: Unable to open a test connection to the
>> given database. JDBC url = jdbc:db2://localhost:50001/BIDB, username =
>> catalog. Terminating connection pool. Original Exception: --
>> java.sql.SQLException: No suitable driver
>> at java.sql.DriverManager.getConnection(DriverManager.java:422)
>> at java.sql.DriverManager.getConnection(DriverManager.java:374)
>> at
>> com.jolbox.bonecp.BoneCP.obtainRawInternalConnection(BoneCP.java:254)
>> at com.jolbox.bonecp.BoneCP.<init>(BoneCP.java:305)
>> at
>> com.jolbox.bonecp.BoneCPDataSource.maybeInit(BoneCPDataSource.java:150)
>> at
>> com.jolbox.bonecp.BoneCPDataSource.getConnection(BoneCPDataSource.java:112)
>> at
>> org.datanucleus.store.rdbms.ConnectionFactoryImpl$ManagedConnectionImpl.getConnection(ConnectionFactoryImpl.java:479)
>> at
>> org.datanucleus.store.rdbms.RDBMSStoreManager.<init>(RDBMSStoreManager.java:304)
>> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
>> Method)
>> at
>> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:56)
>> at
>> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:39)
>> at java.lang.reflect.Constructor.newInstance(Constructor.java:527)
>> at
>> org.datanucleus.plugin.NonManagedPluginRegistry.createExecutableExtension(NonManagedPluginRegistry.java:631)
>> at
>> org.datanucleus.plugin.PluginManager.createExecutableExtension(PluginManager.java:301)
>> at
>> org.datanucleus.NucleusContext.createStoreManagerForProperties(NucleusContext.java:1069)
>> at
>> org.datanucleus.NucleusContext.initialise(NucleusContext.java:359)
>> at
>> org.datanucleus.api.jdo.JDOPersistenceManagerFactory.freezeConfiguration(JDOPersistenceManagerFactory.java:768)
>> at
>> org.datanucleus.api.jdo.JDOPersistenceManagerFactory.createPersistenceManagerFactory(JDOPersistenceManagerFactory.java:326)
>> at
>> org.datanucleus.api.jdo.JDOPersistenceManagerFactory.getPersistenceManagerFactory(JDOPersistenceManagerFactory.java:195)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:60)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:37)
>> at java.lang.reflect.Method.invoke(Method.java:611)
>> at javax.jdo.JDOHelper$16.run(JDOHelper.java:1965)
>> at
>> java.security.AccessController.doPrivileged(AccessController.java:277)
>> at javax.jdo.JDOHelper.invoke(JDOHelper.java:1960)
>> at
>> javax.jdo.JDOHelper.invokeGetPersistenceManagerFactoryOnImplementation(JDOHelper.java:1166)
>> at
>> javax.jdo.JDOHelper.getPersistenceManagerFactory(JDOHelper.java:808)
>> at
>> javax.jdo.JDOHelper.getPersistenceManagerFactory(JDOHelper.java:701)
>> at
>> org.apache.hadoop.hive.metastore.ObjectStore.getPMF(ObjectStore.java:275)
>> at
>> org.apache.ha

Re: Java IO Stream Corrupted - Invalid Type AC?

2014-06-17 Thread Surendranauth Hiraman
Matt/Ryan,

Did you make any headway on this? My team is running into this also.
Doesn't happen on smaller datasets. Our input set is about 10 GB but we
generate 100s of GBs in the flow itself.

-Suren




On Fri, Jun 6, 2014 at 5:19 PM, Ryan Compton  wrote:

> Just ran into this today myself. I'm on branch-1.0 using a CDH3
> cluster (no modifications to Spark or its dependencies). The error
> appeared trying to run GraphX's .connectedComponents() on a ~200GB
> edge list (GraphX worked beautifully on smaller data).
>
> Here's the stacktrace (it's quite similar to yours
> https://imgur.com/7iBA4nJ ).
>
> 14/06/05 20:02:28 ERROR scheduler.TaskSetManager: Task 5.599:39 failed
> 4 times; aborting job
> 14/06/05 20:02:28 INFO scheduler.DAGScheduler: Failed to run reduce at
> VertexRDD.scala:100
> Exception in thread "main" org.apache.spark.SparkException: Job
> aborted due to stage failure: Task 5.599:39 failed 4 times, most
> recent failure: Exception failure in TID 29735 on host node18:
> java.io.StreamCorruptedException: invalid type code: AC
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1355)
> java.io.ObjectInputStream.readObject(ObjectInputStream.java:350)
>
> org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
>
> org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:125)
> org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
> scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>
> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
>
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
> scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
> scala.collection.Iterator$class.foreach(Iterator.scala:727)
> scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>
> org.apache.spark.graphx.impl.VertexPartitionBaseOps.innerJoinKeepLeft(VertexPartitionBaseOps.scala:192)
>
> org.apache.spark.graphx.impl.EdgePartition.updateVertices(EdgePartition.scala:78)
>
> org.apache.spark.graphx.impl.ReplicatedVertexView$$anonfun$2$$anonfun$apply$1.apply(ReplicatedVertexView.scala:75)
>
> org.apache.spark.graphx.impl.ReplicatedVertexView$$anonfun$2$$anonfun$apply$1.apply(ReplicatedVertexView.scala:73)
> scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
> scala.collection.Iterator$class.foreach(Iterator.scala:727)
> scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158)
>
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
> org.apache.spark.scheduler.Task.run(Task.scala:51)
>
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
>
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
> java.lang.Thread.run(Thread.java:662)
> Driver stacktrace:
> at org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
> at scala.Option.foreach(Option.scala:236)
> at
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
> at akka.actor.ActorCell.invoke(ActorCell.scala:456)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
> at akka.dispatch.Mailbox.run(Mailbox.scala:219)
> at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> 14/06/05 20:02:28 INFO scheduler.TaskSchedulerImpl: Cancelling stage 5
>
> On Wed, Jun 4, 2014 at 7:50 AM, 

Re: spark streaming, kafka, SPARK_CLASSPATH

2014-06-17 Thread Luis Ángel Vicente Sánchez
I have been able to submit a job successfully but I had to configure my Spark
job this way:

  val sparkConf: SparkConf =
new SparkConf()
  .setAppName("TwitterPopularTags")
  .setMaster("spark://int-spark-master:7077")
  .setSparkHome("/opt/spark")
  .setJars(Seq("/tmp/spark-test-0.1-SNAPSHOT.jar"))

Now I'm getting this error on my worker:

14/06/17 17:03:40 WARN TaskSchedulerImpl: Initial job has not accepted any
resources; check your cluster UI to ensure that workers are registered and
have sufficient memory



2014-06-17 15:38 GMT+01:00 Luis Ángel Vicente Sánchez <
langel.gro...@gmail.com>:

> After playing a bit, I have been able to create a fatjar this way:
>
> lazy val rootDependencies = Seq(
>   "org.apache.spark" %% "spark-core"      % "1.0.0" % "provided",
>   "org.apache.spark" %% "spark-streaming" % "1.0.0" % "provided",
>   "org.apache.spark" %% "spark-streaming-twitter" % "1.0.0"
>     exclude("org.apache.spark", "spark-core_2.10")
>     exclude("org.apache.spark", "spark-streaming_2.10")
> )
>
> Excluding those transitive dependencies, we can create a fatjar ~400Kb
> instead of 40Mb.
>
> My problem is not running the streaming job locally but submitting it
> to a standalone cluster using spark-submit; every time I run the following
> command, my workers die:
>
> ~/development/tools/spark/1.0.0/bin/spark-submit \
> --class "org.apache.spark.examples.streaming.TwitterPopularTags" \
> --master "spark://int-spark-master:7077" \
> --deploy-mode "cluster" \
> file:///tmp/spark-test-0.1-SNAPSHOT.jar
>
> I have copied my fatjar to my master /tmp folder.
>
>
> 2014-06-17 10:30 GMT+01:00 Michael Cutler :
>
> Admittedly getting Spark Streaming / Kafka working for the first time can
>> be a bit tricky with the web of dependencies that get pulled in.  I've
>> taken the KafkaWorkCount example from the Spark project and set up a simple
>> standalone SBT project that shows you how to get it working and using
>> spark-submit.
>>
>> https://github.com/cotdp/spark-example-kafka
>>
>> The key trick is in the use of sbt-assembly instead of relying on any of
>> the "add jars" functionality.  You mark "spark-core" and "spark-streaming"
>> as provided, because they are part of the core spark-assembly already
>> running your cluster.  However "spark-streaming-kafka" is not, so you need
>> to package it in your 'fat JAR' while excluding all the mess that causes
>> the build to break.
>>
>> build.sbt:
>>
>> import AssemblyKeys._
>>
>> assemblySettings
>>
>> name := "spark-example-kafka"
>>
>> version := "1.0"
>>
>> scalaVersion := "2.10.4"
>>
>>
>> jarName in assembly := "spark-example-kafka_2.10-1.0.jar"
>>
>>
>> assemblyOption in assembly ~= { _.copy(includeScala = false) }
>>
>>
>> libraryDependencies ++= Seq(
>>   "org.apache.spark" %% "spark-core" % "1.0.0" % "provided",
>>   "org.apache.spark" %% "spark-streaming" % "1.0.0" % "provided",
>>   ("org.apache.spark" %% "spark-streaming-kafka" % "1.0.0").
>>     exclude("commons-beanutils", "commons-beanutils").
>>     exclude("commons-collections", "commons-collections").
>>     exclude("com.esotericsoftware.minlog", "minlog")
>> )
>>
>> mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) =>
>>   {
>>     case x if x.startsWith("META-INF/ECLIPSEF.RSA") => MergeStrategy.last
>>     case x if x.startsWith("META-INF/mailcap") => MergeStrategy.last
>>     case x if x.startsWith("plugin.properties") => MergeStrategy.last
>>     case x => old(x)
>>   }
>> }
>>
>>
>> You can see the "exclude()" has to go around the spark-streaming-kafka
>> dependency, and I've used a MergeStrategy to solve the "deduplicate:
>> different file contents found in the following" errors.
>>
>> Build the JAR with sbt assembly and use the scripts in bin/ to run the
>> examples.
>>
>> I'm using this same approach to run my Spark Streaming jobs with
>> spark-submit and have them managed using Mesos/Marathon to handle
>> failures and restarts with long-running processes.
>>
>> Good luck!
>>
>> MC
>>
>>
>>
>>
>>
>> Michael Cutler
>> Founder, CTO
>>
>> Mobile: +44 789 990 7847
>> Email: mich...@tumra.com
>> Web: tumra.com
>> Visit us at our offices in Chiswick Park
>> Registered in England & Wales, 07916412. VAT No. 130595328
>>
>>
>> This email and any files transmitted with it are confidential and may
>> also be privileged. It is intended only for the person to whom it is
>> addressed. If you have received this email in error, please inform the
>> sender immediately. If you are not the intended recipient you must not
>> use, disclose, copy, print, distribute or rely on this email.
>>
>>
>> On 17 June 2014 02:51, Gino Bustelo  wrote:
>>
>>> +1 for this issue. Documenta

Re: Worker dies while submitting a job

2014-06-17 Thread Luis Ángel Vicente Sánchez
I have been able to submit a job successfully but I had to configure my Spark
job this way:

  val sparkConf: SparkConf =
new SparkConf()
  .setAppName("TwitterPopularTags")
  .setMaster("spark://int-spark-master:7077")
  .setSparkHome("/opt/spark")
  .setJars(Seq("/tmp/spark-test-0.1-SNAPSHOT.jar"))

Now I'm getting this error on my worker:

14/06/17 17:03:40 WARN TaskSchedulerImpl: Initial job has not accepted any
resources; check your cluster UI to ensure that workers are registered and
have sufficient memory




2014-06-17 17:36 GMT+01:00 Luis Ángel Vicente Sánchez <
langel.gro...@gmail.com>:

> Ok... I was checking the wrong version of that file yesterday. My worker
> is sending a DriverStateChanged(_, DriverState.FAILED, _) but there is no
> case branch for that state and the worker is crashing. I still don't know
> why I'm getting a FAILED state but I'm sure that should kill the actor due
> to a scala.MatchError.
>
> Usually in Scala it is a best practice to use a sealed trait and case
> classes/objects in a match statement instead of an enumeration (the
> compiler will complain about missing cases); I think that code should be
> refactored to catch this kind of error at compile time.
>
> Now I need to find out why that state-changed message is sent... I will
> continue updating this thread until I find the problem :D
>
>
> 2014-06-16 18:25 GMT+01:00 Luis Ángel Vicente Sánchez <
> langel.gro...@gmail.com>:
>
> I'm playing with a modified version of the TwitterPopularTags example and
>> when I tried to submit the job to my cluster, workers keep dying with this
>> message:
>>
>> 14/06/16 17:11:16 INFO DriverRunner: Launch Command: "java" "-cp"
>> "/opt/spark-1.0.0-bin-hadoop1/work/driver-20140616171115-0014/spark-test-0.1-SNAPSHOT.jar:::/opt/spark-1.0.0-bin-hadoop1/conf:/opt/spark-1.0.0-bin-hadoop1/lib/spark-assembly-1.0.0-hadoop1.0.4.jar"
>> "-XX:MaxPermSize=128m" "-Xms512M" "-Xmx512M"
>> "org.apache.spark.deploy.worker.DriverWrapper"
>> "akka.tcp://sparkWorker@int-spark-worker:51676/user/Worker"
>> "org.apache.spark.examples.streaming.TwitterPopularTags"
>> 14/06/16 17:11:17 ERROR OneForOneStrategy: FAILED (of class
>> scala.Enumeration$Val)
>> scala.MatchError: FAILED (of class scala.Enumeration$Val)
>> at
>> org.apache.spark.deploy.worker.Worker$$anonfun$receive$1.applyOrElse(Worker.scala:317)
>>  at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>> at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>>  at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>> at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>>  at
>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>  at
>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> at
>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>  at
>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> 14/06/16 17:11:17 INFO Worker: Starting Spark worker
>> int-spark-app-ie005d6a3.mclabs.io:51676 with 2 cores, 6.5 GB RAM
>> 14/06/16 17:11:17 INFO Worker: Spark home: /opt/spark-1.0.0-bin-hadoop1
>> 14/06/16 17:11:17 INFO WorkerWebUI: Started WorkerWebUI at
>> http://int-spark-app-ie005d6a3.mclabs.io:8081
>> 14/06/16 17:11:17 INFO Worker: Connecting to master
>> spark://int-spark-app-ie005d6a3.mclabs.io:7077...
>> 14/06/16 17:11:17 ERROR Worker: Worker registration failed: Attempted to
>> re-register worker at same address: akka.tcp://
>> sparkwor...@int-spark-app-ie005d6a3.mclabs.io:51676
>>
>> This happens when the worker receive a DriverStateChanged(driverId,
>> state, exception) message.
>>
>> To deploy the job I copied the jar file to the temporary folder of master
>> node and execute the following command:
>>
>> ./spark-submit \
>> --class org.apache.spark.examples.streaming.TwitterPopularTags \
>> --master spark://int-spark-master:7077 \
>> --deploy-mode cluster \
>> file:///tmp/spark-test-0.1-SNAPSHOT.jar
>>
>> I don't really know what the problem could be as there is a 'case _' that
>> should avoid that problem :S
>>
>
>


Re: join operation is taking too much time

2014-06-17 Thread Andrew Or
How long does it get stuck for? This is a common sign of the OS thrashing
due to memory pressure. If you keep it running longer, does it
throw an error?

Depending on how large your other RDD is (and your join operation), memory
pressure may or may not be the problem at all. It could be that spilling
your shuffles
to disk is slowing you down (but probably shouldn't hang your application).
For the 5 RDDs case, what happens if you set spark.shuffle.spill to false?
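
(For reference, a minimal sketch of flipping that flag in a Spark 1.0
application; the app name is illustrative:)

import org.apache.spark.{SparkConf, SparkContext}

// Keep shuffle data in memory instead of spilling to disk -- this trades
// disk I/O for heap pressure, so only worth trying with memory headroom.
val conf = new SparkConf()
  .setAppName("join-benchmark")            // illustrative name
  .set("spark.shuffle.spill", "false")
val sc = new SparkContext(conf)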


2014-06-17 5:59 GMT-07:00 MEETHU MATHEW :

>
>  Hi all,
>
> I want to do a recursive leftOuterJoin between an RDD (created from a
> file) with 9 million rows (the file is 100MB) and 30 other RDDs (created
> from 30 different files, one per loop iteration) varying from 1 to 6
> million rows.
> When I run it for 5 RDDs, it runs successfully in 5 minutes. But when I
> increase it to 10 or 30 RDDs, it gradually slows down and finally gets
> stuck without showing any warning or error.
>
> I am running in standalone mode with 2 workers of 4GB each and a total of
> 16 cores .
>
> Any of you facing similar problems with JOIN  or is it a problem with my
> configuration.
>
> Thanks & Regards,
> Meethu M
>


Re: Yarn-client mode and standalone-client mode hang during job start

2014-06-17 Thread Andrew Or
Standalone-client mode is not officially supported at the moment. For
standalone-cluster and yarn-client modes, however, they should work.

For both modes, are you running spark-submit from within the cluster, or
outside of it? If the latter, could you try running it from within the
cluster and see if it works? (Does your rtgraph.jar exist on the machine
from which you run spark-submit?)


2014-06-17 2:41 GMT-07:00 Jianshi Huang :

> Hi,
>
> I'm stuck using either yarn-client or standalone-client mode. Either will
> get stuck when I submit jobs; the last messages it printed were:
>
> ...
> 14/06/17 02:37:17 INFO spark.SparkContext: Added JAR
> file:/x/home/jianshuang/tmp/lib/commons-vfs2.jar at
> http://10.196.195.25:56377/jars/commons-vfs2.jar with timestamp
> 1402997837065
> 14/06/17 02:37:17 INFO spark.SparkContext: Added JAR
> file:/x/home/jianshuang/tmp/rtgraph.jar at
> http://10.196.195.25:56377/jars/rtgraph.jar with timestamp 1402997837065
> 14/06/17 02:37:17 INFO cluster.YarnClusterScheduler: Created
> YarnClusterScheduler
> 14/06/17 02:37:17 INFO yarn.ApplicationMaster$$anon$1: Adding shutdown
> hook for context org.apache.spark.SparkContext@6655cf60
>
> I can use yarn-cluster to run my app but it's not very convenient to
> monitor the progress.
>
Standalone-cluster mode doesn't work; it reports a file-not-found error:
>
> Driver successfully submitted as driver-20140617023956-0003
> ... waiting before polling master for driver state
> ... polling master for driver state
> State of driver-20140617023956-0003 is ERROR
> Exception from cluster was: java.io.FileNotFoundException: File
> file:/x/home/jianshuang/tmp/rtgraph.jar does not exist
>
>
> I'm using Spark 1.0.0 and my submit command looks like this:
>
>   ~/spark/spark-1.0.0-hadoop2.4.0/bin/spark-submit --name 'rtgraph'
> --class com.paypal.rtgraph.demo.MapReduceWriter --master spark://
> lvshdc5en0015.lvs.paypal.com:7077 --jars `find lib -type f | tr '\n' ','`
> --executor-memory 20G --total-executor-cores 96 --deploy-mode cluster
> rtgraph.jar
>
> List of jars I put in --jars option are:
>
> accumulo-core.jar
> accumulo-fate.jar
> accumulo-minicluster.jar
> accumulo-trace.jar
> accumulo-tracer.jar
> chill_2.10-0.3.6.jar
> commons-math.jar
> commons-vfs2.jar
> config-1.2.1.jar
> gson.jar
> guava.jar
> joda-convert-1.2.jar
> joda-time-2.3.jar
> kryo-2.21.jar
> libthrift.jar
> quasiquotes_2.10-2.0.0-M8.jar
> scala-async_2.10-0.9.1.jar
> scala-library-2.10.4.jar
> scala-reflect-2.10.4.jar
>
>
> Anyone has hint what went wrong? Really confused.
>
>
> Cheers,
> --
> Jianshi Huang
>
> LinkedIn: jianshi
> Twitter: @jshuang
> Github & Blog: http://huangjs.github.com/
>


Re: Worker dies while submitting a job

2014-06-17 Thread Luis Ángel Vicente Sánchez
Ok... I was checking the wrong version of that file yesterday. My worker is
sending a DriverStateChanged(_, DriverState.FAILED, _) but there is no case
branch for that state and the worker is crashing. I still don't know why
I'm getting a FAILED state but I'm sure that should kill the actor due to a
scala.MatchError.

Usually in Scala it is a best practice to use a sealed trait and case
classes/objects in a match statement instead of an enumeration (the
compiler will complain about missing cases); I think that code should be
refactored to catch this kind of error at compile time.
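
For illustration, a minimal sketch of that pattern (the types here are
hypothetical, not Spark's actual worker classes):

sealed trait DriverState
case object Running  extends DriverState
case object Finished extends DriverState
case object Failed   extends DriverState

def describe(state: DriverState): String = state match {
  case Running  => "driver is running"
  case Finished => "driver finished"
  case Failed   => "driver failed"  // remove this case and scalac warns:
                                    // "match may not be exhaustive"
}

With an Enumeration value, the same omission only surfaces at runtime as a
scala.MatchError, exactly like the one above.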

Now I need to find out why that state-changed message is sent... I will
continue updating this thread until I find the problem :D


2014-06-16 18:25 GMT+01:00 Luis Ángel Vicente Sánchez <
langel.gro...@gmail.com>:

> I'm playing with a modified version of the TwitterPopularTags example and
> when I tried to submit the job to my cluster, workers keep dying with this
> message:
>
> 14/06/16 17:11:16 INFO DriverRunner: Launch Command: "java" "-cp"
> "/opt/spark-1.0.0-bin-hadoop1/work/driver-20140616171115-0014/spark-test-0.1-SNAPSHOT.jar:::/opt/spark-1.0.0-bin-hadoop1/conf:/opt/spark-1.0.0-bin-hadoop1/lib/spark-assembly-1.0.0-hadoop1.0.4.jar"
> "-XX:MaxPermSize=128m" "-Xms512M" "-Xmx512M"
> "org.apache.spark.deploy.worker.DriverWrapper"
> "akka.tcp://sparkWorker@int-spark-worker:51676/user/Worker"
> "org.apache.spark.examples.streaming.TwitterPopularTags"
> 14/06/16 17:11:17 ERROR OneForOneStrategy: FAILED (of class
> scala.Enumeration$Val)
> scala.MatchError: FAILED (of class scala.Enumeration$Val)
> at
> org.apache.spark.deploy.worker.Worker$$anonfun$receive$1.applyOrElse(Worker.scala:317)
>  at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
> at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>  at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
> at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>  at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>  at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>  at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> 14/06/16 17:11:17 INFO Worker: Starting Spark worker
> int-spark-app-ie005d6a3.mclabs.io:51676 with 2 cores, 6.5 GB RAM
> 14/06/16 17:11:17 INFO Worker: Spark home: /opt/spark-1.0.0-bin-hadoop1
> 14/06/16 17:11:17 INFO WorkerWebUI: Started WorkerWebUI at
> http://int-spark-app-ie005d6a3.mclabs.io:8081
> 14/06/16 17:11:17 INFO Worker: Connecting to master
> spark://int-spark-app-ie005d6a3.mclabs.io:7077...
> 14/06/16 17:11:17 ERROR Worker: Worker registration failed: Attempted to
> re-register worker at same address: akka.tcp://
> sparkwor...@int-spark-app-ie005d6a3.mclabs.io:51676
>
> This happens when the worker receive a DriverStateChanged(driverId, state,
> exception) message.
>
> To deploy the job I copied the jar file to the temporary folder of master
> node and execute the following command:
>
> ./spark-submit \
> --class org.apache.spark.examples.streaming.TwitterPopularTags \
> --master spark://int-spark-master:7077 \
> --deploy-mode cluster \
> file:///tmp/spark-test-0.1-SNAPSHOT.jar
>
> I don't really know what the problem could be as there is a 'case _' that
> should avoid that problem :S
>


Executors not utilized properly.

2014-06-17 Thread abhiguruvayya
I am creating around 10 executors with 12 cores and 7g memory each, but when
I launch a task not all executors are being used. For example, if my job has
9 tasks, only 3 executors are being used with 3 tasks each, and I believe this
is making my app slower than a MapReduce program for the same use case. Can
anyone shed some light on executor configuration? How can I use all the
executors? I am running Spark on YARN with Hadoop 2.4.0.
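
A hedged sketch of the relevant knobs for Spark 1.0 on YARN (the class, jar
name and values below are made up). Note that with only 9 tasks at most 9
cores can ever be busy, so giving the input more partitions is usually a
bigger lever than the executor settings:

# Illustrative invocation only (class and jar names are hypothetical);
# --num-executors and --executor-cores are YARN-specific flags.
spark-submit \
  --master yarn-cluster \
  --num-executors 10 \
  --executor-cores 12 \
  --executor-memory 7g \
  --class com.example.MyJob \
  myjob.jar

Increasing the RDD's partition count (e.g. rdd.repartition(120)) gives the
scheduler enough tasks to keep every core busy.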



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Executors-not-utilized-properly-tp7744.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Memory footprint of Calliope: Spark -> Cassandra writes

2014-06-17 Thread Andrew Ash
Gerard,

Strings in particular are very inefficient because they're stored in a
two-byte format by the JVM.  If you use the Kryo serializer and use
StorageLevel.MEMORY_ONLY_SER then Kryo stores Strings in UTF8, which for
ASCII-like strings will take half the space.
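
A minimal sketch of that combination (app name and data are illustrative;
assumes Spark 1.0):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

val conf = new SparkConf()
  .setAppName("kryo-ser-demo")   // illustrative
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val sc = new SparkContext(conf)

// MEMORY_ONLY_SER keeps partitions as serialized bytes, so the configured
// serializer (Kryo here) determines the in-memory footprint.
val records = sc.parallelize(1 to 1000000).map(i => s"record-$i")
records.persist(StorageLevel.MEMORY_ONLY_SER)
records.count()   // materialize so the blocks show up in the storage UI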

Andrew


On Tue, Jun 17, 2014 at 8:54 AM, Gerard Maas  wrote:

> Hi Rohit,
>
> Thanks a lot for looking at this.  The intention of calculating the data
> upfront is to benchmark only the time it takes to store, in records/sec,
> eliminating the generation factor (which will be different in the
> real scenario, reading from HDFS).
> I used a profiler today and indeed it's not the storage part, but the
> generation that's bloating the memory.  Objects in memory take surprisingly
> more space than one would expect based on the data they hold. In my case it
> was 2.1x the size of the original data.
>
> Now that we are talking about this, do you have some figures on how
> Calliope compares, performance-wise, to a classic Cassandra driver
> (DataStax / Astyanax)? That would be awesome.
>
> Thanks again!
>
> -kr, Gerard.
>
>
>
>
>
> On Tue, Jun 17, 2014 at 4:27 PM, tj opensource 
> wrote:
>
>> Dear Gerard,
>>
>> I just tried the code you posted in the gist (
>> https://gist.github.com/maasg/68de6016bffe5e71b78c) and it does give an
>> OOM. It is caused by the data being generated locally and then parallelized:
>>
>> --
>>
>> val entries = for (i <- 1 to total) yield {
>>   Array(s"devy$i", "aggr", "1000", "sum", (i to i+10).mkString(","))
>> }
>>
>> val rdd = sc.parallelize(entries,8)
>>
>> --
>>
>> This will generate all the data on the local system and then try to
>> partition it.
>>
>> Instead, we should parallelize the keys (i <- 1 to total) and generate
>> data in the map tasks. This is *closer* to what you will get if you
>> distribute out a file on a DFS like HDFS/SnackFS.
>>
>> I have made the change in the script here (
>> https://gist.github.com/milliondreams/aac52e08953949057e7d)
>>
>> --
>>
>> val rdd = sc.parallelize(1 to total, 8).map(i => Array(s"devy$i",
>> "aggr", "1000", "sum", (i to i+10).mkString(",")))
>>
>> --
>>
>> I was able to insert 50M records using just over 350M RAM. Attaching the
>> log and screenshot.
>>
>> Let me know if you still face this issue... we can do a screen share and
>> resolve the issue there.
>>
>> And thanks for using Calliope. I hope it serves your needs.
>>
>> Cheers,
>> Rohit
>>
>>
>> On Mon, Jun 16, 2014 at 9:57 PM, Gerard Maas 
>> wrote:
>>
>>> Hi,
>>>
>>> I've been doing some testing with Calliope as a way to do batch load
>>> from Spark into Cassandra.
>>> My initial results are promising on the performance area, but worrisome
>>> on the memory footprint side.
>>>
>>> I'm generating N records of about 50 bytes each and using the UPDATE
>>> mutator to insert them into C*.   I get OOM if my memory is below 1GB per
>>> million of records, or about 50Mb of raw data (without counting any
>>> RDD/structural overhead).  (See code [1])
>>>
>>> (so, to avoid confusions: e.g.: I need 4GB RAM to save  4M of 50Byte
>>> records to Cassandra)  That's an order of magnitude more than the RAW data.
>>>
>>> I understood that Calliope builds on top of the Hadoop support of
>>> Cassandra, which builds on top of SSTables and sstableloader.
>>>
>>> I would like to know what's the memory usage factor of Calliope and what
>>> parameters could I use to control/tune that.
>>>
>>> Any experience/advice on that?
>>>
>>>  -kr, Gerard.
>>>
>>> [1] https://gist.github.com/maasg/68de6016bffe5e71b78c
>>>
>>
>>
>


Re: Memory footprint of Calliope: Spark -> Cassandra writes

2014-06-17 Thread Gerard Maas
Hi Rohit,

Thanks a lot for looking at this.  The intention of calculating the data
upfront is to benchmark only the time it takes to store, in records/sec,
eliminating the generation factor (which will be different in the
real scenario, reading from HDFS).
I used a profiler today and indeed it's not the storage part, but the
generation that's bloating the memory.  Objects in memory take surprisingly
more space than one would expect based on the data they hold. In my case it
was 2.1x the size of the original data.

Now that we are talking about this, do you have some figures on how
Calliope compares, performance-wise, to a classic Cassandra driver
(DataStax / Astyanax)? That would be awesome.

Thanks again!

-kr, Gerard.





On Tue, Jun 17, 2014 at 4:27 PM, tj opensource 
wrote:

> Dear Gerard,
>
> I just tried the code you posted in the gist (
> https://gist.github.com/maasg/68de6016bffe5e71b78c) and it does give an
> OOM. It is caused by the data being generated locally and then parallelized:
>
> --
>
> val entries = for (i <- 1 to total) yield {
>   Array(s"devy$i", "aggr", "1000", "sum", (i to i+10).mkString(","))
> }
>
> val rdd = sc.parallelize(entries,8)
>
> --
>
> This will generate all the data on the local system and then try to
> partition it.
>
> Instead, we should parallelize the keys (i <- 1 to total) and generate
> data in the map tasks. This is *closer* to what you will get if you
> distribute out a file on a DFS like HDFS/SnackFS.
>
> I have made the change in the script here (
> https://gist.github.com/milliondreams/aac52e08953949057e7d)
>
> --
>
> val rdd = sc.parallelize(1 to total, 8).map(i => Array(s"devy$i", "aggr",
> "1000", "sum", (i to i+10).mkString(",")))
>
> --
>
> I was able to insert 50M records using just over 350M RAM. Attaching the
> log and screenshot.
>
> Let me know if you still face this issue... we can do a screen share and
> resolve the issue there.
>
> And thanks for using Calliope. I hope it serves your needs.
>
> Cheers,
> Rohit
>
>
> On Mon, Jun 16, 2014 at 9:57 PM, Gerard Maas 
> wrote:
>
>> Hi,
>>
>> I've been doing some testing with Calliope as a way to do batch load from
>> Spark into Cassandra.
>> My initial results are promising on the performance area, but worrisome
>> on the memory footprint side.
>>
>> I'm generating N records of about 50 bytes each and using the UPDATE
>> mutator to insert them into C*.   I get OOM if my memory is below 1GB per
>> million of records, or about 50Mb of raw data (without counting any
>> RDD/structural overhead).  (See code [1])
>>
>> (so, to avoid confusions: e.g.: I need 4GB RAM to save  4M of 50Byte
>> records to Cassandra)  That's an order of magnitude more than the RAW data.
>>
>> I understood that Calliope builds on top of the Hadoop support of
>> Cassandra, which builds on top of SSTables and sstableloader.
>>
>> I would like to know what's the memory usage factor of Calliope and what
>> parameters could I use to control/tune that.
>>
>> Any experience/advice on that?
>>
>>  -kr, Gerard.
>>
>> [1] https://gist.github.com/maasg/68de6016bffe5e71b78c
>>
>
>


Re: spark streaming questions

2014-06-17 Thread Anwar Rizal
On Tue, Jun 17, 2014 at 5:39 PM, Chen Song  wrote:

> Hey
>
> I am new to spark streaming and apologize if these questions have been
> asked.
>
> * In StreamingContext, reduceByKey() seems to only work on the RDDs of the
> current batch interval, not including RDDs of previous batches. Is my
> understanding correct?
>

It's correct.


>
> * If the above statement is correct, what functions to use if one wants to
> do processing on the continuous stream batches of data? I see 2 functions,
> reduceByKeyAndWindow and updateStateByKey which serve this purpose.
>

I presume that you need to keep state that goes beyond one batch, i.e.,
across multiple batches. In this case, yes, updateStateByKey is the one you
will use. Basically, updateStateByKey wraps the state into an RDD.
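
As a minimal sketch (assuming an existing DStream[(String, Int)] named
pairs, and that ssc.checkpoint(...) has been set, which updateStateByKey
requires; Spark 1.0 also needs the StreamingContext._ implicits in scope):

import org.apache.spark.streaming.StreamingContext._  // pair-DStream implicits

// Running count per key across all batches seen so far.
val updateFunc = (newValues: Seq[Int], runningCount: Option[Int]) =>
  Some(runningCount.getOrElse(0) + newValues.sum)

val runningCounts = pairs.updateStateByKey[Int](updateFunc)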




>
> My use case is an aggregation and doesn't fit a windowing scenario.
>
> * As for updateStateByKey, I have a few questions.
> ** Over time, will Spark stage the original data somewhere to replay in
> case of failures? Say the Spark job runs for weeks; I am wondering how
> that sustains?
> ** Say my reduce key space is partitioned by some date field and I would
> like to stop processing old dates after a period of time (this is not a
> simple windowing scenario, as the date the data belongs to is not the same
> as when the data arrives). How can I handle this to tell Spark to discard
> data for old dates?
>

You will need to call checkpoint (see
http://spark.apache.org/docs/latest/streaming-programming-guide.html#rdd-checkpointing),
which persists the RDD metadata that would otherwise keep consuming memory
(and blow the execution stack). You can set the checkpointing interval that
suits your need.

Now, if you want to also reset your state after some time, there is no
immediate way I can think of, but you can do it through updateStateByKey,
maybe by book-keeping the timestamp.
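
A hedged sketch of that book-keeping idea (the types, field names and
cutoff are all illustrative): returning None from the update function
removes the key from the state.

case class DateAgg(sum: Long, lastSeenMs: Long)

val maxAgeMs = 24L * 60 * 60 * 1000   // e.g. expire keys idle for a day

val updateWithExpiry = (values: Seq[Long], state: Option[DateAgg]) => {
  val now  = System.currentTimeMillis
  val prev = state.getOrElse(DateAgg(0L, now))
  val next = if (values.nonEmpty) DateAgg(prev.sum + values.sum, now) else prev
  if (now - next.lastSeenMs > maxAgeMs) None   // drop the stale key
  else Some(next)
}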



>
> Thank you,
>
> Best
> Chen
>
>
>


Re: Spark 1.0.0 java.lang.outOfMemoryError: Java Heap Space

2014-06-17 Thread Sguj
Am I trying to reduce it to the minimum number of partitions, or increase the
number of partitions with that change?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-0-0-java-lang-outOfMemoryError-Java-Heap-Space-tp7735p7739.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


spark streaming questions

2014-06-17 Thread Chen Song
Hey

I am new to spark streaming and apologize if these questions have been
asked.

* In StreamingContext, reduceByKey() seems to only work on the RDDs of the
current batch interval, not including RDDs of previous batches. Is my
understanding correct?

* If the above statement is correct, what functions to use if one wants to
do processing on the continuous stream batches of data? I see 2 functions,
reduceByKeyAndWindow and updateStateByKey which serve this purpose.

My use case is an aggregation and doesn't fit a windowing scenario.

* As for updateStateByKey, I have a few questions.
** Over time, will Spark stage the original data somewhere to replay in case
of failures? Say the Spark job runs for weeks; I am wondering how that
sustains?
** Say my reduce key space is partitioned by some date field and I would
like to stop processing old dates after a period of time (this is not a
simple windowing scenario, as the date the data belongs to is not the same
as when the data arrives). How can I handle this to tell Spark to discard
data for old dates?

Thank you,

Best
Chen


Re: wholeTextFiles not working with HDFS

2014-06-17 Thread Sguj
I can write one if you'll point me to where I need to write it.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/wholeTextFiles-not-working-with-HDFS-tp7490p7737.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Spark 1.0.0 java.lang.outOfMemoryError: Java Heap Space

2014-06-17 Thread abhiguruvayya
Try repartitioning the RDD using coalesce(int partitions) before performing
any transforms.
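
For illustration (assuming an existing RDD named rdd; the partition counts
are made up):

val fewer = rdd.coalesce(16)      // shrink the partition count without a full shuffle
val more  = rdd.repartition(64)   // grow it instead; shorthand for coalesce(64, shuffle = true)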



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-0-0-java-lang-outOfMemoryError-Java-Heap-Space-tp7735p7736.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Spark 1.0.0 java.lang.outOfMemoryError: Java Heap Space

2014-06-17 Thread Sguj
I've been trying to figure out how to increase the heap space for my Spark
environment in 1.0.0, and everything I've found tells me either to export
something in Java opts, which is deprecated in 1.0.0, or to increase
spark.executor.memory, which is at 6G. I'm only trying to process about
400-500 MB of text, but I get this error whenever I try to collect:

java.lang.OutOfMemoryError: Java heap space 
at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:39) 
at java.nio.ByteBuffer.allocate(ByteBuffer.java:312) 
at org.apache.spark.storage.BlockMessage.set(BlockMessage.scala:94) 
at
org.apache.spark.storage.BlockMessage$.fromByteBuffer(BlockMessage.scala:176) 
at
org.apache.spark.storage.BlockMessageArray.set(BlockMessageArray.scala:63) 
at
org.apache.spark.storage.BlockMessageArray$.fromBufferMessage(BlockMessageArray.scala:109)
 
at
org.apache.spark.storage.BlockManagerWorker$.syncGetBlock(BlockManagerWorker.scala:128)
 
at
org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:489)
 
at
org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:487)
 
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
at
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
at
org.apache.spark.storage.BlockManager.doGetRemote(BlockManager.scala:487) 
at
org.apache.spark.storage.BlockManager.getRemoteBytes(BlockManager.scala:481) 
at
org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:53)
 
at
org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:47)
 
at
org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:47)
 
at
org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1160) 
at
org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:46)
 
at
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
 
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918) 
at java.lang.Thread.run(Thread.java:695) 

Any idea how to fix heap space errors in 1.0.0?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-0-0-java-lang-outOfMemoryError-Java-Heap-Space-tp7735.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Spark 1.0.0 java.lang.outOfMemoryError: Java Heap Space

2014-06-17 Thread Sguj
I've been trying to figure out how to increase the heap space for my Spark
environment in 1.0.0, and everything I've found tells me either to export
something in Java opts, which is deprecated in 1.0.0, or to increase
spark.executor.memory, which is at 6G. I'm only trying to process about
400-500 MB of text, but I get this error whenever I try to collect:

14/06/17 11:00:21 INFO MapOutputTrackerMasterActor: Asked to send map output
locations for shuffle 0 to sp...@salinger.ornl.gov:50251
14/06/17 11:00:21 INFO MapOutputTrackerMaster: Size of output statuses for
shuffle 0 is 165 bytes
14/06/17 11:00:35 INFO BlockManagerInfo: Added taskresult_14 in memory on
salinger.ornl.gov:50253 (size: 123.7 MB, free: 465.1 MB)
14/06/17 11:00:35 INFO BlockManagerInfo: Added taskresult_13 in memory on
salinger.ornl.gov:50253 (size: 127.7 MB, free: 337.4 MB)
14/06/17 11:00:36 ERROR Utils: Uncaught exception in thread Result resolver
thread-2
java.lang.OutOfMemoryError: Java heap space
at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:39)
at java.nio.ByteBuffer.allocate(ByteBuffer.java:312)
at org.apache.spark.storage.BlockMessage.set(BlockMessage.scala:94)
at
org.apache.spark.storage.BlockMessage$.fromByteBuffer(BlockMessage.scala:176)
at
org.apache.spark.storage.BlockMessageArray.set(BlockMessageArray.scala:63)
at
org.apache.spark.storage.BlockMessageArray$.fromBufferMessage(BlockMessageArray.scala:109)
at
org.apache.spark.storage.BlockManagerWorker$.syncGetBlock(BlockManagerWorker.scala:128)
at
org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:489)
at
org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:487)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
org.apache.spark.storage.BlockManager.doGetRemote(BlockManager.scala:487)
at
org.apache.spark.storage.BlockManager.getRemoteBytes(BlockManager.scala:481)
at
org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:53)
at
org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:47)
at
org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:47)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1160)
at
org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:46)
at
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
at java.lang.Thread.run(Thread.java:695)

Any idea how to fix heap space errors in 1.0.0?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-0-0-java-lang-outOfMemoryError-Java-Heap-Space-tp7733.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: spark with docker: errors with akka, NAT?

2014-06-17 Thread Jacob Eisinger

Long story [1] short, akka opens up dynamic, random ports for each job [2].
So, simple NAT fails. You might try some trickery with a DNS server and
docker's --net=host.


[1]
http://apache-spark-user-list.1001560.n3.nabble.com/Comprehensive-Port-Configuration-reference-tt5384.html#none
[2]
http://spark.apache.org/docs/latest/spark-standalone.html#configuring-ports-for-network-security
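
For example, something along these lines (the image name and paths are
hypothetical) sidesteps the NAT layer by putting the container on the
host's network stack:

# Host networking: akka's randomly chosen ports are reachable as-is.
docker run --net=host -it my-spark-image \
  /opt/spark/bin/spark-shell --master spark://spark-master:7077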

Jacob D. Eisinger
IBM Emerging Technologies
jeis...@us.ibm.com - (512) 286-6075



From:   Mohit Jaggi 
To: user@spark.apache.org
Date:   06/16/2014 05:36 PM
Subject:spark with docker: errors with akka, NAT?



Hi Folks,

I am having trouble getting the Spark driver running in Docker. If I run a
pyspark example on my Mac it works, but the same example on a Docker image
(via boot2docker) fails with the following logs. I am pointing the Spark
driver (which is running the example) to a Spark cluster (the driver is not
part of the cluster). I guess this has something to do with Docker's
networking stack (it may be getting NAT'd), but I am not sure why (if at
all) the spark-worker or spark-master is trying to create a new TCP
connection to the driver, instead of responding on the connection initiated
by the driver.

I would appreciate any help in figuring this out.

Thanks,

Mohit.

logs

Spark Executor Command: "java" "-cp"
"::/home/ayasdi/spark/conf:/home//spark/assembly/target/scala-2.10/spark-assembly-1.0.0-hadoop1.0.4.jar"
"-Xms2g" "-Xmx2g" "-Xms512M" "-Xmx512M"
"org.apache.spark.executor.CoarseGrainedExecutorBackend"
"akka.tcp://spark@fc31887475e3:43921/user/CoarseGrainedScheduler" "1"
"cobalt" "24" "akka.tcp://sparkWorker@:33952/user/Worker"
"app-20140616152201-0021"

log4j:WARN No appenders could be found for logger
(org.apache.hadoop.conf.Configuration).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for
more info.
14/06/16 15:22:05 INFO SparkHadoopUtil: Using Spark's default log4j
profile: org/apache/spark/log4j-defaults.properties
14/06/16 15:22:05 INFO SecurityManager: Changing view acls to: ayasdi,root
14/06/16 15:22:05 INFO SecurityManager: SecurityManager: authentication
disabled; ui acls disabled; users with view permissions: Set(xxx, xxx)
14/06/16 15:22:05 INFO Slf4jLogger: Slf4jLogger started
14/06/16 15:22:05 INFO Remoting: Starting remoting
14/06/16 15:22:06 INFO Remoting: Remoting started; listening on
addresses :[akka.tcp://sparkExecutor@:33536]
14/06/16 15:22:06 INFO Remoting: Remoting now listens on addresses:
[akka.tcp://sparkExecutor@:33536]
14/06/16 15:22:06 INFO CoarseGrainedExecutorBackend: Connecting to driver:
akka.tcp://spark@fc31887475e3:43921/user/CoarseGrainedScheduler
14/06/16 15:22:06 INFO WorkerWatcher: Connecting to worker
akka.tcp://sparkWorker@:33952/user/Worker
14/06/16 15:22:06 WARN Remoting: Tried to associate with unreachable remote
address [akka.tcp://spark@fc31887475e3:43921]. Address is now gated for
6 ms, all messages to this address will be delivered to dead letters.
14/06/16 15:22:06 ERROR CoarseGrainedExecutorBackend: Driver Disassociated
[akka.tcp://sparkExecutor@:33536] ->
[akka.tcp://spark@fc31887475e3:43921] disassociated! Shutting down.


Re: spark streaming, kafka, SPARK_CLASSPATH

2014-06-17 Thread Luis Ángel Vicente Sánchez
After playing a bit, I have been able to create a fatjar this way:

lazy val rootDependencies = Seq(
  "org.apache.spark" %% "spark-core"  % "1.0.0" % "provided",
  "org.apache.spark" %% "spark-streaming" % "1.0.0" % "provided",
  "org.apache.spark" %% "spark-streaming-twitter" % "1.0.0"
exclude("org.apache.spark", "spark-core_2.10") exclude("org.apache.spark",
"spark-streaming_2.10")
)

Excluding those transitive dependencies, we can create a fatjar ~400Kb
instead of 40Mb.
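
(For completeness, sbt-assembly itself also has to be registered in
project/plugins.sbt; the version below is indicative of what was current at
the time:)

// project/plugins.sbt
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")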

My problem is not running the streaming job locally but submitting it to a
standalone cluster using spark-submit; every time I ran the following
command, my workers died:

~/development/tools/spark/1.0.0/bin/spark-submit \
--class "org.apache.spark.examples.streaming.TwitterPopularTags" \
--master "spark://int-spark-master:7077" \
--deploy-mode "cluster" \
file:///tmp/spark-test-0.1-SNAPSHOT.jar

I have copied my fatjar to my master /tmp folder.


2014-06-17 10:30 GMT+01:00 Michael Cutler :

> Admittedly getting Spark Streaming / Kafka working for the first time can
> be a bit tricky with the web of dependencies that get pulled in.  I've
> taken the KafkaWordCount example from the Spark project and set up a simple
> standalone SBT project that shows you how to get it working and using
> spark-submit.
>
> https://github.com/cotdp/spark-example-kafka
>
> The key trick is in the use of sbt-assembly instead of relying on any of
> the "add jars" functionality.  You mark "spark-core" and "spark-streaming"
> as provided, because they are part of the core spark-assembly already
> running your cluster.  However "spark-streaming-kafka" is not, so you need
> to package it in your 'fat JAR' while excluding all the mess that causes
> the build to break.
>
> build.sbt:
>
> import AssemblyKeys._
>
> assemblySettings
>
> name := "spark-example-kafka"
>
> version := "1.0"
>
> scalaVersion := "2.10.4"
>
> jarName in assembly := "spark-example-kafka_2.10-1.0.jar"
>
> assemblyOption in assembly ~= { _.copy(includeScala = false) }
>
> libraryDependencies ++= Seq(
>   "org.apache.spark" %% "spark-core" % "1.0.0" % "provided",
>   "org.apache.spark" %% "spark-streaming" % "1.0.0" % "provided",
>   ("org.apache.spark" %% "spark-streaming-kafka" % "1.0.0").
> exclude("commons-beanutils", "commons-beanutils").
> exclude("commons-collections", "commons-collections").
> exclude("com.esotericsoftware.minlog", "minlog")
> )
>
> mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) =>
>   {
> case x if x.startsWith("META-INF/ECLIPSEF.RSA") => MergeStrategy.last
> case x if x.startsWith("META-INF/mailcap") => MergeStrategy.last
> case x if x.startsWith("plugin.properties") => MergeStrategy.last
> case x => old(x)
>   }
> }
>
>
> You can see the "exclude()" has to go around the spark-streaming-kafka 
> dependency,
> and I've used a MergeStrategy to solve the "deduplicate: different file
> contents found in the following" errors.
>
> Build the JAR with sbt assembly and use the scripts in bin/ to run the
> examples.
>
> I'm using this same approach to run my Spark Streaming jobs with
> spark-submit and have them managed using Mesos/Marathon
>  to handle failures and restarts with long running
> processes.
>
> Good luck!
>
> MC
>
>
>
>
>
> Michael Cutler
> Founder, CTO
> Mobile: +44 789 990 7847 | Email: mich...@tumra.com | Web: tumra.com
> Visit us at our offices in Chiswick Park
> Registered in England & Wales, 07916412. VAT No. 130595328
>
>
>
>
> On 17 June 2014 02:51, Gino Bustelo  wrote:
>
>> +1 for this issue. Documentation for spark-submit is misleading. Among
>> many issues, the jar support is bad. HTTP urls do not work. This is because
>> Spark is using Hadoop's FileSystem class. You have to specify the jars
>> twice to get things to work: once for the DriverWrapper to load your
>> classes and a second time in the Context to distribute to workers.
>>
>> I would like to see some contributor response to this issue.
>>
>> Gino B.
>>
>> On Jun 16, 2014, at 1:49 PM, Luis Ángel Vicente Sánchez <
>> langel.gro...@gmail.com> wrote:
>>
>> Did you manage to make it work? I'm facing similar problems and this is a
>> serious blocker issue. spark-submit seems kind of broken to me if you
>> can't use it for spark-streaming.
>>
>> Regards,
>>
>> Luis
>>
>>
>> 2014-06-11 1:48 GMT+01:00 lannyripple :
>>
>>> I am using Spark 1.0.0 c

Execution stalls in LogisticRegressionWithSGD

2014-06-17 Thread Bharath Ravi Kumar
Hi,

(Apologies for the long mail, but it's necessary to provide sufficient
details considering the number of issues faced.)

I'm running into issues testing LogisticRegressionWithSGD on a two node
cluster (each node with 24 cores and 16G available to slaves out of 24G on
the system). Here's a description of the application:

The model is being trained based on categorical features x, y, and (x,y).
The categorical features are mapped to binary features by converting each
distinct value in the category enum into a binary feature by itself (i.e
presence of that value in a record implies corresponding feature = 1, else
feature = 0. So, there'd be as many distinct features as enum values) . The
training vector is laid out as
[x1,x2...xn,y1,y2...yn,(x1,y1),(x2,y2)...(xn,yn)]. Each record in the
training data has only one combination (Xk,Yk) and a label appearing in the
record. Thus, the corresponding LabeledPoint sparse vector would only have
3 values Xk, Yk, (Xk,Yk) set for a record. The total length of the vector
(though sparse) would be nearly 614000.  The number of records is about 1.33
million. The records have been coalesced into 20 partitions across two
nodes. The input data has not been cached.
(NOTE: I do realize the records & features may seem large for a two node
setup, but given the memory & cpu, and the fact that I'm willing to give up
some turnaround time, I don't see why tasks should inexplicably fail)
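
For reference, a hedged sketch of the layout described above (the enum
sizes and index arithmetic are illustrative, not the actual code):

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

val nX = 700   // assumed cardinality of x
val nY = 800   // assumed cardinality of y

def encode(label: Double, x: Int, y: Int): LabeledPoint = {
  val xyIdx = nX + nY + (x * nY + y)   // offset into the (x,y) cross-feature block
  // indices passed to Vectors.sparse must be strictly increasing
  LabeledPoint(label, Vectors.sparse(nX + nY + nX * nY,
    Array(x, nX + y, xyIdx), Array(1.0, 1.0, 1.0)))
}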

Additional parameters include:

spark.executor.memory = 14G
spark.default.parallelism = 1
spark.cores.max=20
spark.storage.memoryFraction=0.8 //No cache space required
(Trying to set spark.akka.frameSize to a larger number, say, 20 didn't help
either)

The model training was initialized as : new LogisticRegressionWithSGD(1,
maxIterations, 0.0, 0.05)

However, after 4 iterations of gradient descent, the entire execution
appeared to stall inexplicably. The corresponding executor details and
details of the stalled stage (number 14) are as follows:

Metric                            Min      25th     Median   75th     Max
Result serialization time         12 ms    13 ms    14 ms    16 ms    18 ms
Duration                          4 s      4 s      5 s      5 s      5 s
Time spent fetching task results  0 ms     0 ms     0 ms     0 ms     0 ms
Scheduler delay                   6 s      6 s      6 s      6 s      12 s


Stage Id: 14 (aggregate at GradientDescent.scala:178)

Task Index  Task ID  Status  Locality Level  Executor  Launch Time  Duration  GC Time  Result Ser Time  Errors

0 600 RUNNING PROCESS_LOCAL serious.dataone.foo.bar.com
2014/06/17 10:32:27 1.1 h
1 601 RUNNING PROCESS_LOCAL casual.dataone.foo.bar.com
2014/06/17 10:32:27 1.1 h
2 602 RUNNING PROCESS_LOCAL serious.dataone.foo.bar.com
2014/06/17 10:32:27 1.1 h
3 603 RUNNING PROCESS_LOCAL casual.dataone.foo.bar.com
2014/06/17 10:32:27 1.1 h
4 604 RUNNING PROCESS_LOCAL serious.dataone.foo.bar.com
2014/06/17 10:32:27 1.1 h
5 605 SUCCESS PROCESS_LOCAL casual.dataone.foo.bar.com
2014/06/17 10:32:27 4 s 2 s 12 ms
6 606 SUCCESS PROCESS_LOCAL serious.dataone.foo.bar.com
2014/06/17 10:32:27 4 s 1 s 14 ms
7 607 SUCCESS PROCESS_LOCAL casual.dataone.foo.bar.com
2014/06/17 10:32:27 4 s 2 s 12 ms
8 608 SUCCESS PROCESS_LOCAL serious.dataone.foo.bar.com
2014/06/17 10:32:27 5 s 1 s 15 ms
9 609 SUCCESS PROCESS_LOCAL casual.dataone.foo.bar.com
2014/06/17 10:32:27 5 s 1 s 14 ms
10 610 SUCCESS PROCESS_LOCAL serious.dataone.foo.bar.com
2014/06/17 10:32:27 5 s 1 s 15 ms
11 611 SUCCESS PROCESS_LOCAL casual.dataone.foo.bar.com
2014/06/17 10:32:27 4 s 1 s 13 ms
12 612 SUCCESS PROCESS_LOCAL serious.dataone.foo.bar.com
2014/06/17 10:32:27 5 s 1 s 18 ms
13 613 SUCCESS PROCESS_LOCAL casual.dataone.foo.bar.com
2014/06/17 10:32:27 5 s 1 s 13 ms
14 614 SUCCESS PROCESS_LOCAL serious.dataone.foo.bar.com
2014/06/17 10:32:27 4 s 1 s 14 ms
15 615 SUCCESS PROCESS_LOCAL casual.dataone.foo.bar.com
2014/06/17 10:32:27 4 s 1 s 12 ms
16 616 SUCCESS PROCESS_LOCAL serious.dataone.foo.bar.com
2014/06/17 10:32:27 5 s 1 s 15 ms
17 617 SUCCESS PROCESS_LOCAL casual.dataone.foo.bar.com
2014/06/17 10:32:27 5 s 1 s 18 ms
18 618 SUCCESS PROCESS_LOCAL serious.dataone.foo.bar.com
2014/06/17 10:32:27 5 s 1 s 16 ms
19 619 SUCCESS PROCESS_LOCAL casual.dataone.foo.bar.com
2014/06/17 10:32:27 4 s 1 s 18 ms

Executor stats:

RDD Blocks  Memory Used  Disk Used  Active Tasks  Failed Tasks  Complete Tasks  Total Tasks  Task Time

Re: wholeTextFiles not working with HDFS

2014-06-17 Thread Xusen Yin
Hi Sguj and littlebird,

I'll try to fix it tomorrow evening or the day after, because I am now busy
preparing a talk (and slides) for tomorrow. Sorry for the inconvenience.
Would you mind filing an issue on Spark JIRA?


2014-06-17 20:55 GMT+08:00 Sguj :

> I didn't fix the issue so much as work around it. I was running my cluster
> locally, so using HDFS was just a preference. The code worked with the
> local
> file system, so that's what I'm using until I can get some help.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/wholeTextFiles-not-working-with-HDFS-tp7490p7726.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>



-- 
Best Regards
---
Xusen Yin(尹绪森)
Intel Labs China
Homepage: http://yinxusen.github.io/


join operation is taking too much time

2014-06-17 Thread MEETHU MATHEW


 Hi all,

I want to do a recursive leftOuterJoin between an RDD (created from a file)
with 9 million rows (the file is 100MB) and 30 other RDDs (created from
30 different files, one per loop iteration) varying from 1 to 6 million rows.
When I run it for 5 RDDs, it runs successfully in 5 minutes. But when I
increase it to 10 or 30 RDDs, it gradually slows down and finally gets
stuck without showing any warning or error.

I am running in standalone mode with 2 workers of 4GB each and a total of 16 
cores .

Are any of you facing similar problems with join, or is it a problem with
my configuration?

Thanks & Regards, 
Meethu M

Re: wholeTextFiles not working with HDFS

2014-06-17 Thread Sguj
I didn't fix the issue so much as work around it. I was running my cluster
locally, so using HDFS was just a preference. The code worked with the local
file system, so that's what I'm using until I can get some help.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/wholeTextFiles-not-working-with-HDFS-tp7490p7726.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


news20-binary classification with LogisticRegressionWithSGD

2014-06-17 Thread Makoto Yui
Hello,

I have been evaluating LogisticRegressionWithSGD of Spark 1.0 MLlib on
Hadoop 0.20.2-cdh3u6 but it does not work for a sparse dataset though
the number of training examples used in the evaluation is just 1,000.

It works fine for the dataset *news20.binary.1000* that has 178,560
features. However, it does not work for *news20.random.1000* where # of
features is large  (1,354,731 features) though we used a sparse vector
through MLUtils.loadLibSVMFile().

The execution seems to make no progress, while no error is reported in the
spark-shell or in the stdout/stderr of the executors.

We used 32 executors, each allocating 7GB (2GB of which is for RDDs) of
working memory.

Any suggestions? Your help is really appreciated.

==
Executed code
==
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.mllib.classification.LogisticRegressionWithSGD

//val training = MLUtils.loadLibSVMFile(sc,
"hdfs://host:8020/dataset/news20-binary/news20.binary.1000",
multiclass=false)
val training = MLUtils.loadLibSVMFile(sc,
"hdfs://host:8020/dataset/news20-binary/news20.random.1000",
multiclass=false)

val numFeatures = training .take(1)(0).features.size
//numFeatures: Int = 178560 for news20.binary.1000
//numFeatures: Int = 1354731 for news20.random.1000
val model = LogisticRegressionWithSGD.train(training, numIterations=1)

==
The dataset used in the evaluation
==

http://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/binary.html#news20.binary

$ head -1000 news20.binary | sed 's/+1/1/g' | sed 's/-1/0/g' >
news20.binary.1000
$ sort -R news20.binary > news20.random
$ head -1000 news20.random | sed 's/+1/1/g' | sed 's/-1/0/g' >
news20.random.1000

You can find the dataset in
https://dl.dropboxusercontent.com/u/13123103/news20.random.1000
https://dl.dropboxusercontent.com/u/13123103/news20.binary.1000


Thanks,
Makoto


Re: Yarn-client mode and standalone-client mode hang during job start

2014-06-17 Thread Jianshi Huang
For standalone-cluster mode, there's a scala.MatchError.

Also it looks like the --jars configurations are not passed to the
driver/worker node? (Also, copying from file:/ doesn't seem correct;
shouldn't it copy from http:// ?)

...
14/06/17 04:15:30 INFO Worker: Asked to launch driver
driver-20140617041530-
14/06/17 04:15:30 WARN NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable
14/06/17 04:15:30 INFO DriverRunner: Copying user jar
file:/x/home/jianshuang/tmp/rtgraph.jar to
/x/home/jianshuang/spark/spark-1.0.0-hadoop2.4.0/work/driver-20140617041530-/rtgraph.jar
Spark assembly has been built with Hive, including Datanucleus jars on
classpath
14/06/17 04:15:30 INFO DriverRunner: Launch Command:
"/usr/java/jdk1.7.0_40/bin/java" "-cp"
"/x/home/jianshuang/spark/spark-1.0.0-hadoop2.4.0/work/driver-20140617041530-/rtgraph.jar:::/x/home/jianshuang/spark/spark-1.0.0-hadoop2.4.0/conf:/x/home/jianshuang/spark/spark-1.0.0-hadoop2.4.0/lib/spark-assembly-1.0.0-hadoop2.4.0.jar:/x/home/jianshuang/spark/spark-1.0.0-hadoop2.4.0/lib/datanucleus-api-jdo-3.2.1.jar:/x/home/jianshuang/spark/spark-1.0.0-hadoop2.4.0/lib/datanucleus-core-3.2.2.jar:/x/home/jianshuang/spark/spark-1.0.0-hadoop2.4.0/lib/datanucleus-rdbms-3.2.1.jar:/etc/hadoop/conf:/usr/lib/hadoop-yarn/conf"
"-XX:MaxPermSize=128m" "-Xms512M" "-Xmx512M"
"org.apache.spark.deploy.worker.DriverWrapper" "akka.tcp://
sparkwor...@lvshdc5dn0321.lvs.paypal.com:41987/user/Worker"
"com.paypal.rtgraph.demo.MapReduceWriter"
14/06/17 04:15:32 ERROR OneForOneStrategy: FAILED (of class
scala.Enumeration$Val)
scala.MatchError: FAILED (of class scala.Enumeration$Val)
at
org.apache.spark.deploy.worker.Worker$$anonfun$receive$1.applyOrElse(Worker.scala:317)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
14/06/17 04:15:32 INFO Worker: Starting Spark worker
lvshdc5dn0321.lvs.paypal.com:41987 with 32 cores, 125.0 GB RAM
14/06/17 04:15:32 INFO Worker: Spark home:
/x/home/jianshuang/spark/spark-1.0.0-hadoop2.4.0
14/06/17 04:15:32 INFO WorkerWebUI: Started WorkerWebUI at
http://lvshdc5dn0321.lvs.paypal.com:8081
14/06/17 04:15:32 INFO Worker: Connecting to master
spark://lvshdc5en0015.lvs.paypal.com:7077...
14/06/17 04:15:32 ERROR Worker: Worker registration failed: Attempted to
re-register worker at same address: akka.tcp://
sparkwor...@lvshdc5dn0321.lvs.paypal.com:41987


Is that a bug?

Jianshi




On Tue, Jun 17, 2014 at 5:41 PM, Jianshi Huang 
wrote:

> Hi,
>
> I'm stuck using either yarn-client or standalone-client mode. Either will
> get stuck when I submit jobs; the last messages it printed were:
>
> ...
> 14/06/17 02:37:17 INFO spark.SparkContext: Added JAR
> file:/x/home/jianshuang/tmp/lib/commons-vfs2.jar at
> http://10.196.195.25:56377/jars/commons-vfs2.jar with timestamp
> 1402997837065
> 14/06/17 02:37:17 INFO spark.SparkContext: Added JAR
> file:/x/home/jianshuang/tmp/rtgraph.jar at
> http://10.196.195.25:56377/jars/rtgraph.jar with timestamp 1402997837065
> 14/06/17 02:37:17 INFO cluster.YarnClusterScheduler: Created
> YarnClusterScheduler
> 14/06/17 02:37:17 INFO yarn.ApplicationMaster$$anon$1: Adding shutdown
> hook for context org.apache.spark.SparkContext@6655cf60
>
> I can use yarn-cluster to run my app but it's not very convenient to
> monitor the progress.
>
> Standalone-cluster mode doesn't work; it reports a file-not-found error:
>
> Driver successfully submitted as driver-20140617023956-0003
> ... waiting before polling master for driver state
> ... polling master for driver state
> State of driver-20140617023956-0003 is ERROR
> Exception from cluster was: java.io.FileNotFoundException: File
> file:/x/home/jianshuang/tmp/rtgraph.jar does not exist
>
>
> I'm using Spark 1.0.0 and my submit command looks like this:
>
>   ~/spark/spark-1.0.0-hadoop2.4.0/bin/spark-submit --name 'rtgraph'
> --class com.paypal.rtgraph.demo.MapReduceWriter --master spark://
> lvshdc5en0015.lvs.paypal.com:7077 --jars `find lib -type f | tr '\n' ','`
> --executor-memory 20G --total-executor-cores 96 --deploy-mode cluster
> rtgraph.jar
>
> List of jars I put in --jars option are:
>
> accumulo-core.jar
> accumulo-fate.jar
> accumulo-minicluster.jar
> accumulo-trace.jar
> accumulo-tracer.jar
> chill_2.10-0.3.6.jar
> commons-math.jar
> commons-vfs2.jar
> config-1.2.1.jar

Re: Spark sql unable to connect to db2 hive metastore

2014-06-17 Thread Michael Armbrust
First a clarification:  Spark SQL does not talk to HiveServer2, as that
JDBC interface is for retrieving results from queries that are executed
using Hive.  Instead Spark SQL will execute queries itself by directly
accessing your data using Spark.

Spark SQL's Hive module can use JDBC to connect to an external metastore,
in your case DB2. This is only used to retrieve the metadata (i.e., column
names and types, HDFS locations for data)

Looking at your exception I still see "java.sql.SQLException: No suitable
driver", so my guess would be that the DB2 JDBC drivers are not being
correctly included.  How are you trying to add them to the classpath?

Michael
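
(For what it's worth, one way to do that with Spark 1.0 -- the paths below
are illustrative -- is via --driver-class-path, since the metastore JDBC
connection is made from the driver:)

bin/spark-shell --driver-class-path \
  /path/to/db2jcc-10.5.jar:/path/to/db2jcc_license_cisuz-10.5.jar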


On Tue, Jun 17, 2014 at 1:29 AM, Jenny Zhao  wrote:

>
> Hi,
>
> my hive configuration uses DB2 as its metastore database. I have built
> Spark with the extra step sbt/sbt assembly/assembly to include the
> dependency jars, and copied HIVE_HOME/conf/hive-site.xml under spark/conf.
> When I ran:
>
> hql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
>
> I got the following exception (a portion of the stack trace is pasted
> here). Looking at the stack, this made me wonder whether Spark supports a
> remote metastore configuration; it seems Spark doesn't talk to hiveserver2
> directly? The driver jars db2jcc-10.5.jar and db2jcc_license_cisuz-10.5.jar
> are both included in the classpath; otherwise it complains that it couldn't
> find the driver.
>
> Appreciate any help to resolve it.
>
> Thanks!
>
> Caused by: java.sql.SQLException: Unable to open a test connection to the
> given database. JDBC url = jdbc:db2://localhost:50001/BIDB, username =
> catalog. Terminating connection pool. Original Exception: --
> java.sql.SQLException: No suitable driver
> at java.sql.DriverManager.getConnection(DriverManager.java:422)
> at java.sql.DriverManager.getConnection(DriverManager.java:374)
> at
> com.jolbox.bonecp.BoneCP.obtainRawInternalConnection(BoneCP.java:254)
> at com.jolbox.bonecp.BoneCP.<init>(BoneCP.java:305)
> at
> com.jolbox.bonecp.BoneCPDataSource.maybeInit(BoneCPDataSource.java:150)
> at
> com.jolbox.bonecp.BoneCPDataSource.getConnection(BoneCPDataSource.java:112)
> at
> org.datanucleus.store.rdbms.ConnectionFactoryImpl$ManagedConnectionImpl.getConnection(ConnectionFactoryImpl.java:479)
> at
> org.datanucleus.store.rdbms.RDBMSStoreManager.<init>(RDBMSStoreManager.java:304)
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
> Method)
> at
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:56)
> at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:39)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:527)
> at
> org.datanucleus.plugin.NonManagedPluginRegistry.createExecutableExtension(NonManagedPluginRegistry.java:631)
> at
> org.datanucleus.plugin.PluginManager.createExecutableExtension(PluginManager.java:301)
> at
> org.datanucleus.NucleusContext.createStoreManagerForProperties(NucleusContext.java:1069)
> at
> org.datanucleus.NucleusContext.initialise(NucleusContext.java:359)
> at
> org.datanucleus.api.jdo.JDOPersistenceManagerFactory.freezeConfiguration(JDOPersistenceManagerFactory.java:768)
> at
> org.datanucleus.api.jdo.JDOPersistenceManagerFactory.createPersistenceManagerFactory(JDOPersistenceManagerFactory.java:326)
> at
> org.datanucleus.api.jdo.JDOPersistenceManagerFactory.getPersistenceManagerFactory(JDOPersistenceManagerFactory.java:195)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:60)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:37)
> at java.lang.reflect.Method.invoke(Method.java:611)
> at javax.jdo.JDOHelper$16.run(JDOHelper.java:1965)
> at
> java.security.AccessController.doPrivileged(AccessController.java:277)
> at javax.jdo.JDOHelper.invoke(JDOHelper.java:1960)
> at
> javax.jdo.JDOHelper.invokeGetPersistenceManagerFactoryOnImplementation(JDOHelper.java:1166)
> at
> javax.jdo.JDOHelper.getPersistenceManagerFactory(JDOHelper.java:808)
> at
> javax.jdo.JDOHelper.getPersistenceManagerFactory(JDOHelper.java:701)
> at
> org.apache.hadoop.hive.metastore.ObjectStore.getPMF(ObjectStore.java:275)
> at
> org.apache.hadoop.hive.metastore.ObjectStore.getPersistenceManager(ObjectStore.java:304)
> at
> org.apache.hadoop.hive.metastore.ObjectStore.initialize(ObjectStore.java:234)
> at
> org.apache.hadoop.hive.metastore.ObjectStore.setConf(ObjectStore.java:209)
> at
> org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:73)
> at
> org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:133)
> at
> 

Yarn-client mode and standalone-client mode hang during job start

2014-06-17 Thread Jianshi Huang
Hi,

I'm stuck using either yarn-client or standalone-client mode. Either will
get stuck when I submit jobs; the last messages it printed were:

...
14/06/17 02:37:17 INFO spark.SparkContext: Added JAR
file:/x/home/jianshuang/tmp/lib/commons-vfs2.jar at
http://10.196.195.25:56377/jars/commons-vfs2.jar with timestamp
1402997837065
14/06/17 02:37:17 INFO spark.SparkContext: Added JAR
file:/x/home/jianshuang/tmp/rtgraph.jar at
http://10.196.195.25:56377/jars/rtgraph.jar with timestamp 1402997837065
14/06/17 02:37:17 INFO cluster.YarnClusterScheduler: Created
YarnClusterScheduler
14/06/17 02:37:17 INFO yarn.ApplicationMaster$$anon$1: Adding shutdown hook
for context org.apache.spark.SparkContext@6655cf60

I can use yarn-cluster to run my app but it's not very convenient to
monitor the progress.

Standalone-cluster mode doesn't work; it reports a file-not-found error:

Driver successfully submitted as driver-20140617023956-0003
... waiting before polling master for driver state
... polling master for driver state
State of driver-20140617023956-0003 is ERROR
Exception from cluster was: java.io.FileNotFoundException: File
file:/x/home/jianshuang/tmp/rtgraph.jar does not exist


I'm using Spark 1.0.0 and my submit command looks like this:

  ~/spark/spark-1.0.0-hadoop2.4.0/bin/spark-submit --name 'rtgraph' --class
com.paypal.rtgraph.demo.MapReduceWriter --master spark://
lvshdc5en0015.lvs.paypal.com:7077 --jars `find lib -type f | tr '\n' ','`
--executor-memory 20G --total-executor-cores 96 --deploy-mode cluster
rtgraph.jar

List of jars I put in --jars option are:

accumulo-core.jar
accumulo-fate.jar
accumulo-minicluster.jar
accumulo-trace.jar
accumulo-tracer.jar
chill_2.10-0.3.6.jar
commons-math.jar
commons-vfs2.jar
config-1.2.1.jar
gson.jar
guava.jar
joda-convert-1.2.jar
joda-time-2.3.jar
kryo-2.21.jar
libthrift.jar
quasiquotes_2.10-2.0.0-M8.jar
scala-async_2.10-0.9.1.jar
scala-library-2.10.4.jar
scala-reflect-2.10.4.jar


Anyone has hint what went wrong? Really confused.


Cheers,
-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/

