Re: How to create an empty RDD with a given type?

2015-01-12 Thread Xuelin Cao
Got it, thanks!

On Tue, Jan 13, 2015 at 2:00 PM, Justin Yip  wrote:

> Xuelin,
>
> There is a function called emptyRDD under SparkContext which serves this
> purpose.
>
> Justin
>
> On Mon, Jan 12, 2015 at 9:50 PM, Xuelin Cao 
> wrote:
>
>>
>>
>> Hi,
>>
>> I'd like to create a transform function that converts an RDD[String] to an
>> RDD[Int].
>>
>> Occasionally, the input RDD could be an empty RDD. I just want to
>> directly create an empty RDD[Int] if the input RDD is empty. And, I don't
>> want to return None as the result.
>>
>> Is there an easy way to do that?
>>
>>
>>
>


Re: Is It Feasible for Spark 1.1 Broadcast to Fully Utilize the Ethernet Card Throughput?

2015-01-12 Thread lihu
What does your use case look like? Do you need to use broadcast heavily? If
not, it would be better to focus on other things.

At this time, there is no better method than TorrentBroadcast. Although the
data spreads one node at a time, once a node has received the data it can
immediately act as a data source for the other nodes.


Running Spark application from command line

2015-01-12 Thread Arun Lists
I have a Spark application that was assembled using sbt 0.13.7, Scala 2.11,
and Spark 1.2.0. In build.sbt, I use "provided" for the Spark dependencies.
I am running on Mac OS X Yosemite.

I can run the application fine within sbt.

I run into problems when I try to run it from the command line. Here is the
command I use:

ADD_JARS=analysis/target/scala-2.11/dtex-analysis_2.11-0.1.jar scala -cp
/Applications/spark-1.2.0-bin-hadoop2.4/lib/spark-assembly-1.2.0-hadoop2.4.0.jar:analysis/target/scala-2.11/dtex-analysis_2.11-0.1.jar
com.dtex.analysis.transform.GenUserSummaryView ...

I get the error messages below. Please advise what I can do to resolve this
issue. Thanks!

arun
15/01/12 22:47:18 WARN NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable

15/01/12 22:47:18 WARN BlockManager: Putting block broadcast_0 failed

java.lang.NoSuchMethodError:
scala.collection.immutable.$colon$colon.hd$1()Ljava/lang/Object;

at
org.apache.spark.util.collection.SizeTracker$class.takeSample(SizeTracker.scala:84)

at
org.apache.spark.util.collection.SizeTracker$class.resetSamples(SizeTracker.scala:61)

at
org.apache.spark.util.collection.SizeTrackingVector.resetSamples(SizeTrackingVector.scala:25)

at
org.apache.spark.util.collection.SizeTracker$class.$init$(SizeTracker.scala:51)

at
org.apache.spark.util.collection.SizeTrackingVector.(SizeTrackingVector.scala:25)

at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:236)

at org.apache.spark.storage.MemoryStore.putIterator(MemoryStore.scala:136)

at org.apache.spark.storage.MemoryStore.putIterator(MemoryStore.scala:114)

at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:787)

at org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:638)

at org.apache.spark.storage.BlockManager.putSingle(BlockManager.scala:992)

at
org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:98)

at
org.apache.spark.broadcast.TorrentBroadcast.(TorrentBroadcast.scala:84)

at
org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)

at
org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:29)

at
org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)

at org.apache.spark.SparkContext.broadcast(SparkContext.scala:945)

at org.apache.spark.SparkContext.hadoopFile(SparkContext.scala:695)

at org.apache.spark.SparkContext.textFile(SparkContext.scala:540)

at
com.dtex.analysis.transform.TransformUtils$anonfun$2.apply(TransformUtils.scala:97)

at
com.dtex.analysis.transform.TransformUtils$anonfun$2.apply(TransformUtils.scala:97)

at
scala.collection.TraversableLike$anonfun$map$1.apply(TraversableLike.scala:245)

at
scala.collection.TraversableLike$anonfun$map$1.apply(TraversableLike.scala:245)

at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)

at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)

at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)

at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)

at
com.dtex.analysis.transform.TransformUtils$.generateUserSummaryData(TransformUtils.scala:97)

at
com.dtex.analysis.transform.GenUserSummaryView$.main(GenUserSummaryView.scala:77)

at
com.dtex.analysis.transform.GenUserSummaryView.main(GenUserSummaryView.scala)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:483)

at
scala.reflect.internal.util.ScalaClassLoader$anonfun$run$1.apply(ScalaClassLoader.scala:70)

at
scala.reflect.internal.util.ScalaClassLoader$class.asContext(ScalaClassLoader.scala:31)

at
scala.reflect.internal.util.ScalaClassLoader$URLClassLoader.asContext(ScalaClassLoader.scala:101)

at
scala.reflect.internal.util.ScalaClassLoader$class.run(ScalaClassLoader.scala:70)

at
scala.reflect.internal.util.ScalaClassLoader$URLClassLoader.run(ScalaClassLoader.scala:101)

at scala.tools.nsc.CommonRunner$class.run(ObjectRunner.scala:22)

at scala.tools.nsc.ObjectRunner$.run(ObjectRunner.scala:39)

at scala.tools.nsc.CommonRunner$class.runAndCatch(ObjectRunner.scala:29)

at scala.tools.nsc.ObjectRunner$.runAndCatch(ObjectRunner.scala:39)

at scala.tools.nsc.MainGenericRunner.runTarget$1(MainGenericRunner.scala:65)

at scala.tools.nsc.MainGenericRunner.run$1(MainGenericRunner.scala:87)

at scala.tools.nsc.MainGenericRunner.process(MainGenericRunner.scala:98)

at scala.tools.nsc.MainGenericRunner$.main(MainGenericRunner.scala:103)

at scala.tools.nsc.MainGenericRunner.main(MainGenericRunner.scala)


Creating RDD from only few columns of a Parquet file

2015-01-12 Thread Ajay Srivastava
Hi,

I am trying to read a parquet file using:

val parquetFile = sqlContext.parquetFile("people.parquet")

There is no way to specify that I am interested in reading only some columns
from disk. For example, if the parquet file has 10 columns, I want to read
only 3 of them from disk.

We have done an experiment -
Table1 - Parquet file containing 10 columns
Table2 - Parquet file containing only the 3 columns which were used in the query

The time taken by the query on table1 and table2 shows a huge difference. The
query on table1 takes more than double the time taken on table2, which makes me
think that Spark is reading all the columns from disk in the case of table1
when it needs only 3 of them.

How should I make sure that it reads only 3 of the 10 columns from disk?
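For what it's worth, a hedged Scala sketch of issuing such a query through
Spark SQL (Spark 1.x style API; the column names and the existing SparkContext
named sc are assumptions of mine, not from this thread). Selecting only the
needed columns in SQL should let the Parquet scan prune the rest:

import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

// Placeholder column names; register the Parquet file as a table and select
// only the columns the query needs so the scan can skip the remaining ones.
def readThreeColumns(sc: SparkContext) = {
  val sqlContext = new SQLContext(sc)
  val parquetFile = sqlContext.parquetFile("people.parquet")
  parquetFile.registerTempTable("people")
  sqlContext.sql("SELECT col1, col2, col3 FROM people")
}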


Regards,
Ajay


Re: How to create an empty RDD with a given type?

2015-01-12 Thread Justin Yip
Xuelin,

There is a function called emptyRDD under SparkContext which serves this
purpose.
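For illustration, a minimal sketch (assuming an existing SparkContext named
sc; the count() check is just one way to test for emptiness and does trigger
a job):

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// Return an empty RDD[Int] directly when the input has no elements,
// otherwise apply the String -> Int conversion.
def toInts(sc: SparkContext, input: RDD[String]): RDD[Int] =
  if (input.count() == 0) sc.emptyRDD[Int]
  else input.map(_.toInt)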

Justin

On Mon, Jan 12, 2015 at 9:50 PM, Xuelin Cao  wrote:

>
>
> Hi,
>
> I'd like to create a transform function that converts an RDD[String] to an
> RDD[Int].
>
> Occasionally, the input RDD could be an empty RDD. I just want to
> directly create an empty RDD[Int] if the input RDD is empty. And, I don't
> want to return None as the result.
>
> Is there an easy way to do that?
>
>
>


How to create an empty RDD with a given type?

2015-01-12 Thread Xuelin Cao
Hi,

I'd like to create a transform function that converts an RDD[String] to an
RDD[Int].

Occasionally, the input RDD could be an empty RDD. I just want to
directly create an empty RDD[Int] if the input RDD is empty. And, I don't
want to return None as the result.

Is there an easy way to do that?


Re: quickly counting the number of rows in a partition?

2015-01-12 Thread Kevin Burton
Doesn’t that just read in all the values?  The count isn’t pre-computed?
It’s not the end of the world if it’s not, but it would be faster.

On Mon, Jan 12, 2015 at 8:09 PM, Ganelin, Ilya 
wrote:

>  Use the mapPartitions function. It returns an iterator to each
> partition. Then just get that length by converting to an array.
>
>
>
> Sent with Good (www.good.com)
>
>
>
> -Original Message-
> *From: *Kevin Burton [bur...@spinn3r.com]
> *Sent: *Monday, January 12, 2015 09:55 PM Eastern Standard Time
> *To: *user@spark.apache.org
> *Subject: *quickly counting the number of rows in a partition?
>
> Is there a way to compute the total number of records in each RDD
> partition?
>
> So say I had 4 partitions.. I’d want to have
>
> partition 0: 100 records
> partition 1: 104 records
> partition 2: 90 records
> partition 3: 140 records
>
> Kevin
>
> --
>
> Founder/CEO Spinn3r.com
> Location: *San Francisco, CA*
> blog: http://burtonator.wordpress.com
> … or check out my Google+ profile
> 
>  
>



-- 

Founder/CEO Spinn3r.com
Location: *San Francisco, CA*
blog: http://burtonator.wordpress.com
… or check out my Google+ profile




Re: Problem with building spark-1.2.0

2015-01-12 Thread Rapelly Kartheek
Yes, this proxy problem is resolved.


*how your build refers to https://github.com/ScrapCodes/sbt-pom-reader.git ...
I don't see this repo in the project code base.*

I manually downloaded the sbt-pom-reader directory and moved it into the
.sbt/0.13/staging/*/ directory. But I then face the following:

karthik@s4:~/spark-1.2.0$ SPARK_HADOOP_VERSION = 2.3.0 sbt/sbt assembly
Using /usr/lib/jvm/java-7-oracle as default JAVA_HOME.
Note, this will be overridden by -java-home if it is set.
[info] Loading project definition from
/home/karthik/spark-1.2.0/project/project
[info] Loading project definition from
/home/karthik/.sbt/0.13/staging/ad8e8574a5bcb2d22d23/sbt-pom-reader/project
[warn] Multiple resolvers having different access mechanism configured with
same name 'sbt-plugin-releases'. To avoid conflict, Remove duplicate
project resolvers (`resolvers`) or rename publishing resolver (`publishTo`).
[info] Updating
{file:/home/karthik/.sbt/0.13/staging/ad8e8574a5bcb2d22d23/sbt-pom-reader/project/}sbt-pom-reader-build...
[info] Resolving com.typesafe.sbt#sbt-ghpages;0.5.2 ...

Could you please tell me how do I build stand-alone spark-1.2.0 with sbt
correctly?

On Mon, Jan 12, 2015 at 4:21 PM, Sean Owen  wrote:

> The problem is there in the logs. When it went to clone some code,
> something went wrong with the proxy:
>
> Received HTTP code 407 from proxy after CONNECT
>
> Probably you have an HTTP proxy and you have not authenticated. It's
> specific to your environment.
>
> Although it's unrelated, I'm curious how your build refers to
> https://github.com/ScrapCodes/sbt-pom-reader.git  I don't see this
> repo in the project code base.
>
> On Mon, Jan 12, 2015 at 9:09 AM, Kartheek.R 
> wrote:
> > Hi,
> > This is what I am trying to do:
> >
> > karthik@s4:~/spark-1.2.0$ SPARK_HADOOP_VERSION=2.3.0 sbt/sbt clean
> > Using /usr/lib/jvm/java-7-oracle as default JAVA_HOME.
> > Note, this will be overridden by -java-home if it is set.
> > [info] Loading project definition from
> > /home/karthik/spark-1.2.0/project/project
> > Cloning into
> > '/home/karthik/.sbt/0.13/staging/ad8e8574a5bcb2d22d23/sbt-pom-reader'...
> > fatal: unable to access '
> https://github.com/ScrapCodes/sbt-pom-reader.git/':
> > Received HTTP code 407 from proxy after CONNECT
> > java.lang.RuntimeException: Nonzero exit code (128): git clone
> > https://github.com/ScrapCodes/sbt-pom-reader.git
>


Re: quickly counting the number of rows in a partition?

2015-01-12 Thread Sven Krasser
Yes, using mapPartitionsWithIndex, e.g. in PySpark:

>>> sc.parallelize(xrange(0,1000), 4).mapPartitionsWithIndex(lambda
idx,iter: ((idx, len(list(iter))),)).collect()
[(0, 250), (1, 250), (2, 250), (3, 250)]

(This is not the most efficient way to get the length of an iterator, but
you get the idea...)
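For comparison, a roughly equivalent sketch in Scala (assuming an existing
SparkContext named sc); as above, materializing the iterator just to measure
it is not the most efficient approach:

// Count the records in each partition, returning (partitionIndex, count) pairs.
val counts = sc.parallelize(0 until 1000, 4)
  .mapPartitionsWithIndex { (idx, iter) => Iterator((idx, iter.size)) }
  .collect()
// e.g. Array((0,250), (1,250), (2,250), (3,250))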

Best,
-Sven

On Mon, Jan 12, 2015 at 6:54 PM, Kevin Burton  wrote:

> Is there a way to compute the total number of records in each RDD
> partition?
>
> So say I had 4 partitions.. I’d want to have
>
> partition 0: 100 records
> partition 1: 104 records
> partition 2: 90 records
> partition 3: 140 records
>
> Kevin
>
> --
>
> Founder/CEO Spinn3r.com
> Location: *San Francisco, CA*
> blog: http://burtonator.wordpress.com
> … or check out my Google+ profile
> 
> 
>
>


-- 
http://sites.google.com/site/krasser/?utm_source=sig


quickly counting the number of rows in a partition?

2015-01-12 Thread Kevin Burton
Is there a way to compute the total number of records in each RDD partition?

So say I had 4 partitions.. I’d want to have

partition 0: 100 records
partition 1: 104 records
partition 2: 90 records
partition 3: 140 records

Kevin

-- 

Founder/CEO Spinn3r.com
Location: *San Francisco, CA*
blog: http://burtonator.wordpress.com
… or check out my Google+ profile




Re: Trouble with large Yarn job

2015-01-12 Thread Sven Krasser
Anders,

This could be related to this open ticket:
https://issues.apache.org/jira/browse/SPARK-5077. A call to coalesce() also
fixed that for us as a stopgap.
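For illustration only, the stopgap amounts to something along these lines
(the RDD type and the target partition count are placeholders; 6000 simply
mirrors the ~6,000 partitions mentioned in the message below):

import org.apache.spark.rdd.RDD

// Reduce the number of partitions before the failing stage so far fewer
// shuffle outputs have to be tracked. shuffle = false avoids an extra shuffle.
def stopgap[T](rdd: RDD[T]): RDD[T] =
  rdd.coalesce(6000, shuffle = false)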

Best,
-Sven


On Mon, Jan 12, 2015 at 10:18 AM, Anders Arpteg  wrote:

> Yes sure Sandy, I've checked the logs and it's not an OOM issue. I've
> actually been able to solve the problem finally, and it seems to be an
> issue with too many partitions. The repartitioning I tried initially did so
> after the union, and then it's too late. By repartitioning as early as
> possible, and significantly reducing number of partitions (going from
> 100,000+ to ~6,000 partitions), the job succeeds and no more "Error
> communicating with MapOutputTracker" issues. Seems like an issue with
> handling too many partitions and executors at the same time.
>
> It would be awesome to have an "auto-repartition" function that checks the
> sizes of existing partitions and compares them with the HDFS block size. If
> too small (or
> too large), it would repartition to partition sizes similar to the block
> size...
>
> Hope this helps others with similar issues.
>
> Best,
> Anders
>
> On Mon, Jan 12, 2015 at 6:32 AM, Sandy Ryza 
> wrote:
>
>> Hi Anders,
>>
>> Have you checked your NodeManager logs to make sure YARN isn't killing
>> executors for exceeding memory limits?
>>
>> -Sandy
>>
>> On Tue, Jan 6, 2015 at 8:20 AM, Anders Arpteg  wrote:
>>
>>> Hey,
>>>
>>> I have a job that keeps failing if too much data is processed, and I
>>> can't see how to get it working. I've tried repartitioning with more
>>> partitions and increasing the amount of memory for the executors (now about
>>> 12G and 400 executors). Here is a snippet of the first part of the code,
>>> which succeeds without any problems:
>>>
>>> val all_days = sc.union(
>>>   ds.dateInterval(startDate, date).map(date =>
>>>     sc.avroFile[LrDailyEndSong](daily_end_song_path + date)
>>>       .map(s => (
>>>         (s.getUsername, s.getTrackUri),
>>>         UserItemData(s.getUsername, s.getTrackUri,
>>>           build_vector1(date, s),
>>>           build_vector2(s))))))
>>>   .reduceByKey(sum_vectors)
>>>
>>> I want to process 30 days of data or more, but am only able to process
>>> about 10 days. If having more days of data (lower startDate in code
>>> above), the union above succeeds but the code below fails with "Error
>>> communicating with MapOutputTracker" (see http://pastebin.com/fGDCXPkL
>>> for more detailed error messages). Here is a snippet of the code that fails:
>>>
>>> val top_tracks = all_days.map(t => (t._1._2.toString, 1)).
>>> reduceByKey(_+_)
>>>   .filter(trackFilter)
>>>   .repartition(4)
>>>   .persist(StorageLevel.MEMORY_AND_DISK_SER)
>>>
>>> val observation_data = all_days
>>>   .mapPartitions(_.map(o => (o._1._2.toString, o._2)))
>>>   .join(top_tracks)
>>>
>>> The calculation of top_tracks works, but the last mapPartitions task
>>> fails with given error message if given more than 10 days of data. Also
>>> tried increasing the spark.akka.askTimeout setting, but it still fails
>>> even if 10-folding the timeout setting to 300 seconds. I'm using Spark 1.2
>>> and the kryo serialization.
>>>
>>> Realize that this is a rather long message, but I'm stuck and would
>>> appreciate any help or clues for resolving this issue. Seems to be a
>>> out-of-memory issue, but it does not seems to help to increase the number
>>> of partitions.
>>>
>>> Thanks,
>>> Anders
>>>
>>
>>
>


-- 
http://sites.google.com/site/krasser/?utm_source=sig


Re: OOM exception during row deserialization

2015-01-12 Thread Sven Krasser
Hey Pala,

I also find it very hard to get to the bottom of memory issues such as this
one based on what's in the logs (so if you come up with some findings, then
please share here). In the interim, here are a few things you can try:

   - Provision more memory per executor. While in theory (and depending on
   your storage level) data can be spilled to disk or recomputed from lineage
   if it doesn't fit into memory, I have experienced a lot of problems with
   failing jobs when underprovisioning memory.
   - Experiment with both the memory and shuffle fractions (see the config
   sketch after this list).
   - Repartition your data so that you get smaller tasks.
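As a rough illustration of the fraction settings in Spark 1.x (the values
below are arbitrary starting points, not recommendations):

import org.apache.spark.SparkConf

// Arbitrary example values; tune per workload. In Spark 1.x the defaults are
// roughly 0.6 for the storage fraction and 0.2 for the shuffle fraction.
val conf = new SparkConf()
  .set("spark.executor.memory", "8g")           // more memory per executor
  .set("spark.storage.memoryFraction", "0.5")   // heap fraction for cached blocks
  .set("spark.shuffle.memoryFraction", "0.3")   // heap fraction for shuffle aggregation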

As far as object size goes, since your issue occurs on deserialization, you
could compute the size on the map side and roll it up into a histogram.

Hope this helps!

-Sven



On Mon, Jan 12, 2015 at 2:48 PM, Pala M Muthaia  wrote:

> Does anybody have insight on this? Thanks.
>
> On Fri, Jan 9, 2015 at 6:30 PM, Pala M Muthaia <
> mchett...@rocketfuelinc.com> wrote:
>
>> Hi,
>>
>> I am using Spark 1.0.1. I am trying to debug a OOM exception i saw during
>> a join step.
>>
>> Basically, i have a RDD of rows, that i am joining with another RDD of
>> tuples.
>>
>> Some of the tasks succeed but a fair number failed with OOM exception
>> with stack below. The stack belongs to the 'reducer' that is reading
>> shuffle output from the 'mapper'.
>>
>> My question is what's the object being deserialized here - just a portion
>> of an RDD or the whole RDD partition assigned to current reducer? The rows
>> in the RDD could be large, but definitely not something that would run to
>> 100s of MBs in size, and thus run out of memory.
>>
>> Also, is there a way to determine size of the object being deserialized
>> that results in the error (either by looking at some staging hdfs dir or
>> logs)?
>>
>> java.lang.OutOfMemoryError (java.lang.OutOfMemoryError: GC overhead limit 
>> exceeded}
>> java.util.Arrays.copyOf(Arrays.java:2367)
>> java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:130)
>> java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:114)
>> java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:535)
>> java.lang.StringBuilder.append(StringBuilder.java:204)
>> java.io.ObjectInputStream$BlockDataInputStream.readUTFSpan(ObjectInputStream.java:3142)
>> java.io.ObjectInputStream$BlockDataInputStream.readUTFBody(ObjectInputStream.java:3050)
>> java.io.ObjectInputStream$BlockDataInputStream.readUTF(ObjectInputStream.java:2863)
>> java.io.ObjectInputStream.readString(ObjectInputStream.java:1636)
>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1339)
>> java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>> java.util.ArrayList.readObject(ArrayList.java:771)
>> sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> java.lang.reflect.Method.invoke(Method.java:606)
>> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1891)
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348)
>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1913)
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348)
>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1913)
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348)
>> java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>> org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
>> org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:125)
>> org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>> org.apache.spark.storage.BlockManager$LazyProxyIterator$1.hasNext(BlockManager.scala:1031)
>>
>>
>>
>> Thanks,
>> pala
>>
>
>


-- 
http://sites.google.com/site/krasser/?utm_source=sig


Re: Is there any Spark implementation for Item-based Collaborative Filtering?

2015-01-12 Thread Simon Chan
Also a ready-to-use server with Spark MLlib:
http://docs.prediction.io/recommendation/quickstart/

The source code is here:
https://github.com/PredictionIO/PredictionIO/tree/develop/templates/scala-parallel-recommendation


Simon

On Sun, Nov 30, 2014 at 12:17 PM, Pat Ferrel  wrote:

> Actually the spark-itemsimilarity job and related code in the Spark module
> of Mahout creates all-pairs similarity too. It’s designed to use with a
> search engine, which provides the query part of the recommender. Integrate
> the two and you have a near realtime scalable item-based/cooccurrence
> collaborative filtering type recommender.
>
>
> On Nov 30, 2014, at 12:09 PM, Sean Owen  wrote:
>
> There is an implementation of all-pairs similarity. Have a look at the
> DIMSUM implementation in RowMatrix. It is an element of what you would
> need for such a recommender, but not the whole thing.
>
> You can also do the model-building part of an ALS-based recommender
> with ALS in MLlib.
>
> So, no not directly, but there are related pieces.
>
> On Sun, Nov 30, 2014 at 5:36 PM, shahab  wrote:
> > Hi,
> >
> > I just wonder if there is any implementation for Item-based Collaborative
> > Filtering in Spark?
> >
> > best,
> > /Shahab
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Manually trigger RDD map function without action

2015-01-12 Thread Sven Krasser
Hey Kevin,

I assume you want to trigger the map() for a side effect (since you don't
care about the result). To Cody's point, you can use foreach() *instead* of
map(). So instead of e.g. x.map(a => foo(a)).foreach(a => a), you'd run
x.foreach(a => foo(a)).

Best,
-Sven

On Mon, Jan 12, 2015 at 5:13 PM, Kevin Jung  wrote:

> Cody said "If you don't care about the value that your map produced
> (because
> you're not already collecting or saving it), then is foreach more
> appropriate to what you're doing?" but I can not see it from this thread.
> Anyway, I performed small benchmark to test what function is the most
> efficient way. And a winner is foreach(a => a) according to everyone's
> expectations. Collect can cause OOM from driver and count is very slower
> than the others. Thanks all.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Manually-trigger-RDD-map-function-without-action-tp21094p21110.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 
http://sites.google.com/site/krasser/?utm_source=sig


Re: train many decision tress with a single spark job

2015-01-12 Thread Josh Buffum
You are right... my code example doesn't work :)

I actually do want a decision tree per user. So, for 1 million users, I
want 1 million trees. We're training against time series data, so there are
still quite a few data points per user. My previous message where I
mentioned RDDs with no length was, I think, a result of the way the random
partitioning worked (I was partitioning into N groups where N was the
number of users... total).

Given this, I'm thinking that MLlib is not designed for this particular
case? It appears optimized for training across large datasets. I was just
hoping to leverage it since creating my feature sets for the users was
already in Spark.


On Mon, Jan 12, 2015 at 5:05 PM, Sean Owen  wrote:

> A model partitioned by users?
>
> I mean that if you have a million users surely you don't mean to build a
> million models. There would be little data per user right? Sounds like you
> have 0 sometimes.
>
> You would typically be generalizing across users not examining them in
> isolation. Models are built on thousands or millions of data points.
>
> I assumed you were subsetting for cross validation in which case we are
> talking about making more like say 10 models. You usually take random
> subsets. But it might be as fine to subset as a function of a user ID if
> you like. Or maybe you do have some reason for segregating users and
> modeling them differently (e.g. different geographies or something).
>
> Your code doesn't work as is since you are using RDDs inside RDDs. But I
> am also not sure you should do what it looks like you are trying to do.
> On Jan 13, 2015 12:32 AM, "Josh Buffum"  wrote:
>
>> Sean,
>>
>> Thanks for the response. Is there some subtle difference between one
>> model partitioned by N users or N models per each 1 user? I think I'm
>> missing something with your question.
>>
>> Looping through the RDD filtering one user at a time would certainly give
>> me the response that I am hoping for (i.e a map of user => decisiontree),
>> however, that seems like it would yield poor performance? The userIDs are
>> not integers, so I either need to iterate through some in-memory array of
>> them (could be quite large) or have some distributed lookup table. Neither
>> seem great.
>>
>> I tried the random split thing. I wonder if I did something wrong there,
>> but some of the splits got RDDs with 0 tuples and some got RDDs with > 1
>> tuple. I guess that's to be expected with some random distribution?
>> However, that won't work for me since it breaks the "one tree per user"
>> thing. I guess I could randomly distribute user IDs and then do the "scan
>> everything and filter" step...
>>
>> How bad of an idea is it to do:
>>
>> data.groupByKey.map( kvp => {
>>   val (key, data) = kvp
>>   val tree = DecisionTree.train( sc.makeRDD(data), ... )
>>   (key, tree)
>> })
>>
>> Is there a way I could tell spark not to distribute the RDD created by
>> sc.makeRDD(data) but just to deal with it on whatever spark worker is
>> handling kvp? Does that question make sense?
>>
>> Thanks!
>>
>> Josh
>>
>> On Sun, Jan 11, 2015 at 4:12 AM, Sean Owen  wrote:
>>
>>> You just mean you want to divide the data set into N subsets, and do
>>> that dividing by user, not make one model per user right?
>>>
>>> I suppose you could filter the source RDD N times, and build a model
>>> for each resulting subset. This can be parallelized on the driver. For
>>> example let's say you divide into N subsets depending on the value of
>>> the user ID modulo N:
>>>
>>> val N = ...
>>> (0 until N).par.map(d => DecisionTree.train(data.filter(_.userID % N
>>> == d), ...))
>>>
>>> data should be cache()-ed here of course.
>>>
>>> However it may be faster and more principled to take random subsets
>>> directly:
>>>
>>> data.randomSplit(Array.fill(N)(1.0 / N)).par.map(subset =>
>>> DecisionTree.train(subset, ...))
>>>
>>> On Sun, Jan 11, 2015 at 1:53 AM, Josh Buffum  wrote:
>>> > I've got a data set of activity by user. For each user, I'd like to
>>> train a
>>> > decision tree model. I currently have the feature creation step
>>> implemented
>>> > in Spark and would naturally like to use mllib's decision tree model.
>>> > However, it looks like the decision tree model expects the whole RDD
>>> and
>>> > will train a single tree.
>>> >
>>> > Can I split the RDD by user (i.e. groupByKey) and then call the
>>> > DecisionTree.trainClassifer in a reduce() or aggregate function to
>>> create a
>>> > RDD[DecisionTreeModels]? Maybe train the model with an in-memory
>>> dataset
>>> > instead of an RDD? Call sc.parallelize on the Iterable values in a
>>> groupBy
>>> > to create a mini-RDD?
>>> >
>>> > Has anyone else tried something like this with success?
>>> >
>>> > Thanks!
>>>
>>
>>


Re: Broadcast joins on RDD

2015-01-12 Thread Reza Zadeh
First, you should collect().toMap() the small RDD, then you should use
broadcast followed by a map to do a map-side join (slide 10 has an example).
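A minimal Scala sketch of that pattern (the key/value types and names are
placeholders, and sc is an existing SparkContext; inner-join semantics):

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// Map-side (broadcast) join: collect the small RDD to the driver, broadcast
// it as a Map, and look keys up inside a flatMap over the large RDD. This
// avoids shuffling the large RDD.
def broadcastJoin(sc: SparkContext,
                  large: RDD[(Int, String)],
                  small: RDD[(Int, String)]): RDD[(Int, (String, String))] = {
  val smallMap = sc.broadcast(small.collect().toMap)
  large.flatMap { case (k, v) =>
    smallMap.value.get(k).map(sv => (k, (v, sv)))
  }
}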

Spark SQL also does it by default for tables that are smaller than the
spark.sql.autoBroadcastJoinThreshold setting (by default 10 KB, which is
really small, but you can bump this up with set
spark.sql.autoBroadcastJoinThreshold=100 for example).

On Mon, Jan 12, 2015 at 3:15 PM, Pala M Muthaia  wrote:

> Hi,
>
>
> How do i do broadcast/map join on RDDs? I have a large RDD that i want to
> inner join with a small RDD. Instead of having the large RDD repartitioned
> and shuffled for join, i would rather send a copy of a small RDD to each
> task, and then perform the join locally.
>
> How would i specify this in Spark code? I didn't find much documentation
> online. I attempted to create a broadcast variable out of the small RDD and
> then access that in the join operator:
>
> largeRdd.join(smallRddBroadCastVar.value)
>
> but that didn't work as expected ( I found that all rows with same key
> were on same task)
>
> I am using Spark version 1.0.1
>
>
> Thanks,
> pala
>
>
>


Re: Manually trigger RDD map function without action

2015-01-12 Thread Kevin Jung
Cody said "If you don't care about the value that your map produced (because
you're not already collecting or saving it), then is foreach more
appropriate to what you're doing?" but I can not see it from this thread.
Anyway, I performed small benchmark to test what function is the most
efficient way. And a winner is foreach(a => a) according to everyone's
expectations. Collect can cause OOM from driver and count is very slower
than the others. Thanks all.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Manually-trigger-RDD-map-function-without-action-tp21094p21110.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: train many decision tress with a single spark job

2015-01-12 Thread Sean Owen
A model partitioned by users?

I mean that if you have a million users surely you don't mean to build a
million models. There would be little data per user right? Sounds like you
have 0 sometimes.

You would typically be generalizing across users not examining them in
isolation. Models are built on thousands or millions of data points.

I assumed you were subsetting for cross validation in which case we are
talking about making more like say 10 models. You usually take random
subsets. But it might be as fine to subset as a function of a user ID if
you like. Or maybe you do have some reason for segregating users and
modeling them differently (e.g. different geographies or something).

Your code doesn't work as is since you are using RDDs inside RDDs. But I am
also not sure you should do what it looks like you are trying to do.
On Jan 13, 2015 12:32 AM, "Josh Buffum"  wrote:

> Sean,
>
> Thanks for the response. Is there some subtle difference between one model
> partitioned by N users or N models per each 1 user? I think I'm missing
> something with your question.
>
> Looping through the RDD filtering one user at a time would certainly give
> me the response that I am hoping for (i.e a map of user => decisiontree),
> however, that seems like it would yield poor performance? The userIDs are
> not integers, so I either need to iterate through some in-memory array of
> them (could be quite large) or have some distributed lookup table. Neither
> seem great.
>
> I tried the random split thing. I wonder if I did something wrong there,
> but some of the splits got RDDs with 0 tuples and some got RDDs with > 1
> tuple. I guess that's to be expected with some random distribution?
> However, that won't work for me since it breaks the "one tree per user"
> thing. I guess I could randomly distribute user IDs and then do the "scan
> everything and filter" step...
>
> How bad of an idea is it to do:
>
> data.groupByKey.map( kvp => {
>   val (key, data) = kvp
>   val tree = DecisionTree.train( sc.makeRDD(data), ... )
>   (key, tree)
> })
>
> Is there a way I could tell spark not to distribute the RDD created by
> sc.makeRDD(data) but just to deal with it on whatever spark worker is
> handling kvp? Does that question make sense?
>
> Thanks!
>
> Josh
>
> On Sun, Jan 11, 2015 at 4:12 AM, Sean Owen  wrote:
>
>> You just mean you want to divide the data set into N subsets, and do
>> that dividing by user, not make one model per user right?
>>
>> I suppose you could filter the source RDD N times, and build a model
>> for each resulting subset. This can be parallelized on the driver. For
>> example let's say you divide into N subsets depending on the value of
>> the user ID modulo N:
>>
>> val N = ...
>> (0 until N).par.map(d => DecisionTree.train(data.filter(_.userID % N
>> == d), ...))
>>
>> data should be cache()-ed here of course.
>>
>> However it may be faster and more principled to take random subsets
>> directly:
>>
>> data.randomSplit(Array.fill(N)(1.0 / N)).par.map(subset =>
>> DecisionTree.train(subset, ...))
>>
>> On Sun, Jan 11, 2015 at 1:53 AM, Josh Buffum  wrote:
>> > I've got a data set of activity by user. For each user, I'd like to
>> train a
>> > decision tree model. I currently have the feature creation step
>> implemented
>> > in Spark and would naturally like to use mllib's decision tree model.
>> > However, it looks like the decision tree model expects the whole RDD and
>> > will train a single tree.
>> >
>> > Can I split the RDD by user (i.e. groupByKey) and then call the
>> > DecisionTree.trainClassifer in a reduce() or aggregate function to
>> create a
>> > RDD[DecisionTreeModels]? Maybe train the model with an in-memory dataset
>> > instead of an RDD? Call sc.parallelize on the Iterable values in a
>> groupBy
>> > to create a mini-RDD?
>> >
>> > Has anyone else tried something like this with success?
>> >
>> > Thanks!
>>
>
>


Re: [mllib] GradientDescent requires huge memory for storing weight vector

2015-01-12 Thread Reza Zadeh
I guess you're not using too many features (e.g. < 10m), just that hashing
the index makes it look that way, is that correct?

If so, the simple dictionary that maps your feature index -> rank can be
broadcast and used everywhere, so you can pass mllib just the feature's
rank as its index.
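A hedged sketch of that idea (names are placeholders, and it assumes the
features are SparseVectors): collect the distinct hashed indices, assign each
one a compact rank, broadcast the mapping, and rewrite the vectors so that
numFeatures equals the number of distinct features rather than the hash range.

import org.apache.spark.SparkContext
import org.apache.spark.mllib.linalg.{SparseVector, Vectors}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD

// Remap sparse (hashed) feature indices to dense ranks 0..k-1 so the weight
// vector in GradientDescent only needs k entries.
def compactIndices(sc: SparkContext, data: RDD[LabeledPoint]): RDD[LabeledPoint] = {
  val distinctIdx = data.flatMap {
    case LabeledPoint(_, v: SparseVector) => v.indices
    case _ => Array.empty[Int]
  }.distinct().collect().sorted
  val rank = sc.broadcast(distinctIdx.zipWithIndex.toMap) // hashed index -> rank
  val k = distinctIdx.length
  data.map {
    case LabeledPoint(label, v: SparseVector) =>
      // indices stay sorted because the rank mapping is monotonic
      LabeledPoint(label, Vectors.sparse(k, v.indices.map(rank.value), v.values))
    case lp => lp
  }
}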

Reza

On Mon, Jan 12, 2015 at 4:26 PM, Tianshuo Deng 
wrote:

> Hi,
> Currently in GradientDescent.scala, weights is constructed as a dense
> vector:
>
> initialWeights = Vectors.dense(new Array[Double](numFeatures))
>
> And the numFeatures is determined in the loadLibSVMFile as the max index
> of features.
>
> But in the case of using a hash function to compute the feature index, this
> results in a huge dense vector being generated, taking up a lot of memory.
>
> Any suggestions?
>
>


Re: Shuffle Problems in 1.2.0

2015-01-12 Thread Sven Krasser
I've filed a ticket for this issue here:
https://issues.apache.org/jira/browse/SPARK-5209. (This reproduces the
problem on a smaller cluster size.)
-Sven

On Wed, Jan 7, 2015 at 11:13 AM, Sven Krasser  wrote:
> Could you try it on AWS using EMR? That'd give you an exact replica of the 
> environment that causes the error.
>
> Sent from my iPhone
>
>> On Jan 7, 2015, at 10:53 AM, Davies Liu  wrote:
>>
>> Hey Sven,
>>
>> I tried with all of your configurations, 2 node with 2 executors each,
>> but in standalone mode,
>> it worked fine.
>>
>> Could you try to narrow down the possible change of configurations?
>>
>> Davies
>>
>>> On Tue, Jan 6, 2015 at 8:03 PM, Sven Krasser  wrote:
>>> Hey Davies,
>>>
>>> Here are some more details on a configuration that causes this error for me.
>>> Launch an AWS Spark EMR cluster as follows:
>>>
>>> aws emr create-cluster --region us-west-1 --no-auto-terminate \
>>>
>>>   --ec2-attributes KeyName=your-key-here,SubnetId=your-subnet-here \
>>>
>>>   --bootstrap-actions
>>> Path=s3://support.elasticmapreduce/spark/install-spark,Args='["-g"]' \
>>>
>>>   --ami-version 3.3 --instance-groups
>>> InstanceGroupType=MASTER,InstanceCount=1,InstanceType=m3.xlarge \
>>>
>>>   InstanceGroupType=CORE,InstanceCount=10,InstanceType=r3.xlarge --name
>>> "Spark Issue Repro" \
>>>
>>>   --visible-to-all-users --applications Name=Ganglia
>>>
>>> This is a 10 node cluster (not sure if this makes a difference outside of
>>> HDFS block locality). Then use this Gist here as your spark-defaults file
>>> (it'll configure 2 executors per job as well):
>>> https://gist.github.com/skrasser/9b978d3d572735298d16
>>>
>>> With that, I am seeing this again:
>>>
>>> 2015-01-07 03:43:51,751 ERROR [Executor task launch worker-1]
>>> executor.Executor (Logging.scala:logError(96)) - Exception in task 13.0 in
>>> stage 0.0 (TID 27)
>>> org.apache.spark.SparkException: PairwiseRDD: unexpected value:
>>> List([B@4cfae71c)
>>>
>>> Thanks for the performance pointers -- the repro script is fairly unpolished
>>> (just enough to cause the aforementioned exception).
>>>
>>> Hope this sheds some light on the error. From what I can tell so far,
>>> something in the spark-defaults file triggers it (with other settings it
>>> completes just fine).
>>>
>>> Thanks for your help!
>>> -Sven
>>>
>>>
 On Tue, Jan 6, 2015 at 12:29 PM, Davies Liu  wrote:

 I still can not reproduce it with 2 nodes (4 CPUs).

 Your repro.py could be faster (10 min) than before (22 min):

 inpdata.map(lambda (pc, x): (x, pc=='p' and 2 or
 1)).reduceByKey(lambda x, y: x|y).filter(lambda (x, pc):
 pc==3).collect()

 (also, no cache needed anymore)

 Davies



> On Tue, Jan 6, 2015 at 9:02 AM, Sven Krasser  wrote:
> The issue has been sensitive to the number of executors and input data
> size.
> I'm using 2 executors with 4 cores each, 25GB of memory, 3800MB of
> memory
> overhead for YARN. This will fit onto Amazon r3 instance types.
> -Sven
>
> On Tue, Jan 6, 2015 at 12:46 AM, Davies Liu 
> wrote:
>>
>> I had ran your scripts in 5 nodes ( 2 CPUs, 8G mem) cluster, can not
>> reproduce your failure. Should I test it with big memory node?
>>
>>> On Mon, Jan 5, 2015 at 4:00 PM, Sven Krasser  wrote:
>>> Thanks for the input! I've managed to come up with a repro of the
>>> error
>>> with
>>> test data only (and without any of the custom code in the original
>>> script),
>>> please see here:
>>>
>>> https://gist.github.com/skrasser/4bd7b41550988c8f6071#file-gistfile1-md
>>>
>>> The Gist contains a data generator and the script reproducing the
>>> error
>>> (plus driver and executor logs). If I run using full cluster capacity
>>> (32
>>> executors with 28GB), there are no issues. If I run on only two, the
>>> error
>>> appears again and the job fails:
>>>
>>> org.apache.spark.SparkException: PairwiseRDD: unexpected value:
>>> List([B@294b55b7)
>>>
>>>
>>> Any thoughts or any obvious problems you can spot by any chance?
>>>
>>> Thank you!
>>> -Sven
>>>
>>> On Sun, Jan 4, 2015 at 1:11 PM, Josh Rosen 
>>> wrote:

 It doesn’t seem like there’s a whole lot of clues to go on here
 without
 seeing the job code.  The original "org.apache.spark.SparkException:
 PairwiseRDD: unexpected value: List([B@130dc7ad)” error suggests
 that
 maybe
 there’s an issue with PySpark’s serialization / tracking of types,
 but
 it’s
 hard to say from this error trace alone.

 On December 30, 2014 at 5:17:08 PM, Sven Krasser (kras...@gmail.com)
 wrote:

 Hey Josh,

 I am still trying to prune this to a minimal example, but it has
 been
 tricky since scale seems to be a factor. The job runs over ~72

Re: train many decision tress with a single spark job

2015-01-12 Thread Josh Buffum
Sean,

Thanks for the response. Is there some subtle difference between one model
partitioned by N users or N models per each 1 user? I think I'm missing
something with your question.

Looping through the RDD filtering one user at a time would certainly give
me the response that I am hoping for (i.e a map of user => decisiontree),
however, that seems like it would yield poor performance? The userIDs are
not integers, so I either need to iterate through some in-memory array of
them (could be quite large) or have some distributed lookup table. Neither
seem great.

I tried the random split thing. I wonder if I did something wrong there,
but some of the splits got RDDs with 0 tuples and some got RDDs with > 1
tuple. I guess that's to be expected with some random distribution?
However, that won't work for me since it breaks the "one tree per user"
thing. I guess I could randomly distribute user IDs and then do the "scan
everything and filter" step...

How bad of an idea is it to do:

data.groupByKey.map( kvp => {
  val (key, data) = kvp
  val tree = DecisionTree.train( sc.makeRDD(data), ... )
  (key, tree)
})

Is there a way I could tell spark not to distribute the RDD created by
sc.makeRDD(data) but just to deal with it on whatever spark worker is
handling kvp? Does that question make sense?

Thanks!

Josh

On Sun, Jan 11, 2015 at 4:12 AM, Sean Owen  wrote:

> You just mean you want to divide the data set into N subsets, and do
> that dividing by user, not make one model per user right?
>
> I suppose you could filter the source RDD N times, and build a model
> for each resulting subset. This can be parallelized on the driver. For
> example let's say you divide into N subsets depending on the value of
> the user ID modulo N:
>
> val N = ...
> (0 until N).par.map(d => DecisionTree.train(data.filter(_.userID % N
> == d), ...))
>
> data should be cache()-ed here of course.
>
> However it may be faster and more principled to take random subsets
> directly:
>
> data.randomSplit(Array.fill(N)(1.0 / N)).par.map(subset =>
> DecisionTree.train(subset, ...))
>
> On Sun, Jan 11, 2015 at 1:53 AM, Josh Buffum  wrote:
> > I've got a data set of activity by user. For each user, I'd like to
> train a
> > decision tree model. I currently have the feature creation step
> implemented
> > in Spark and would naturally like to use mllib's decision tree model.
> > However, it looks like the decision tree model expects the whole RDD and
> > will train a single tree.
> >
> > Can I split the RDD by user (i.e. groupByKey) and then call the
> > DecisionTree.trainClassifer in a reduce() or aggregate function to
> create a
> > RDD[DecisionTreeModels]? Maybe train the model with an in-memory dataset
> > instead of an RDD? Call sc.parallelize on the Iterable values in a
> groupBy
> > to create a mini-RDD?
> >
> > Has anyone else tried something like this with success?
> >
> > Thanks!
>


[mllib] GradientDescent requires huge memory for storing weight vector

2015-01-12 Thread Tianshuo Deng
Hi,
Currently in GradientDescent.scala, weights is constructed as a dense
vector:

initialWeights = Vectors.dense(new Array[Double](numFeatures))

And the numFeatures is determined in the loadLibSVMFile as the max index of
features.

But in the case of using a hash function to compute the feature index, this
results in a huge dense vector being generated, taking up a lot of memory.

Any suggestions?


Broadcast joins on RDD

2015-01-12 Thread Pala M Muthaia
Hi,


How do i do broadcast/map join on RDDs? I have a large RDD that i want to
inner join with a small RDD. Instead of having the large RDD repartitioned
and shuffled for join, i would rather send a copy of a small RDD to each
task, and then perform the join locally.

How would i specify this in Spark code? I didn't find much documentation
online. I attempted to create a broadcast variable out of the small RDD and
then access that in the join operator:

largeRdd.join(smallRddBroadCastVar.value)

but that didn't work as expected ( I found that all rows with same key were
on same task)

I am using Spark version 1.0.1


Thanks,
pala


Re: OOM exception during row deserialization

2015-01-12 Thread Pala M Muthaia
Does anybody have insight on this? Thanks.

On Fri, Jan 9, 2015 at 6:30 PM, Pala M Muthaia 
wrote:

> Hi,
>
> I am using Spark 1.0.1. I am trying to debug a OOM exception i saw during
> a join step.
>
> Basically, i have a RDD of rows, that i am joining with another RDD of
> tuples.
>
> Some of the tasks succeed but a fair number failed with OOM exception with
> stack below. The stack belongs to the 'reducer' that is reading shuffle
> output from the 'mapper'.
>
> My question is what's the object being deserialized here - just a portion
> of an RDD or the whole RDD partition assigned to current reducer? The rows
> in the RDD could be large, but definitely not something that would run to
> 100s of MBs in size, and thus run out of memory.
>
> Also, is there a way to determine size of the object being deserialized
> that results in the error (either by looking at some staging hdfs dir or
> logs)?
>
> java.lang.OutOfMemoryError (java.lang.OutOfMemoryError: GC overhead limit 
> exceeded}
> java.util.Arrays.copyOf(Arrays.java:2367)
> java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:130)
> java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:114)
> java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:535)
> java.lang.StringBuilder.append(StringBuilder.java:204)
> java.io.ObjectInputStream$BlockDataInputStream.readUTFSpan(ObjectInputStream.java:3142)
> java.io.ObjectInputStream$BlockDataInputStream.readUTFBody(ObjectInputStream.java:3050)
> java.io.ObjectInputStream$BlockDataInputStream.readUTF(ObjectInputStream.java:2863)
> java.io.ObjectInputStream.readString(ObjectInputStream.java:1636)
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1339)
> java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
> java.util.ArrayList.readObject(ArrayList.java:771)
> sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> java.lang.reflect.Method.invoke(Method.java:606)
> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1891)
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348)
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1913)
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348)
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1913)
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348)
> java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
> org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
> org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:125)
> org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
> org.apache.spark.storage.BlockManager$LazyProxyIterator$1.hasNext(BlockManager.scala:1031)
>
>
>
> Thanks,
> pala
>


Re: including the spark-mllib in build.sbt

2015-01-12 Thread Xiangrui Meng
I don't know the root cause. Could you try including only

libraryDependencies += "org.apache.spark" %% "spark-mllib" % "1.1.1"

It should be sufficient because mllib depends on core.

-Xiangrui

On Mon, Jan 12, 2015 at 2:27 PM, Jianguo Li  wrote:
> Hi,
>
> I am trying to build my own scala project using sbt. The project is
> dependent on both spark-core and spark-mllib. I included the following two
> dependencies in my build.sbt file
>
> libraryDependencies += "org.apache.spark" %% "spark-mllib" % "1.1.1"
> libraryDependencies += "org.apache.spark" %% "spark-core" % "1.1.1"
>
> However, when I run the "package" command in sbt, I got an error message
> indicating that "object mllib is not a member of package org.apache.spark".
>
> Did I do anything wrong?
>
> Thanks,
>
> Jianguo
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: calculating the mean of SparseVector RDD

2015-01-12 Thread Xiangrui Meng
No, colStats() computes all summary statistics in one pass and stores
the values. It is not lazy.
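For reference, a minimal Scala sketch of that call (assuming an RDD[Vector]
named rows is already available):

import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.stat.Statistics
import org.apache.spark.rdd.RDD

// colStats makes a single pass and materializes the summary eagerly;
// mean() then simply returns the precomputed per-column means.
def columnMeans(rows: RDD[Vector]): Vector =
  Statistics.colStats(rows).mean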

On Mon, Jan 12, 2015 at 4:42 AM, Rok Roskar  wrote:
> This was without using Kryo -- if I use Kryo, I get errors about buffer
> overflows (see above):
>
> com.esotericsoftware.kryo.KryoException: Buffer overflow. Available: 5,
> required: 8
>
> Just calling colStats doesn't actually compute those statistics, does it? It
> looks like the computation is only carried out once you call the .mean()
> method.
>
>
>
> On Sat, Jan 10, 2015 at 7:04 AM, Xiangrui Meng  wrote:
>>
>> colStats() computes the mean values along with several other summary
>> statistics, which makes it slower. How is the performance if you don't
>> use kryo? -Xiangrui
>>
>> On Fri, Jan 9, 2015 at 3:46 AM, Rok Roskar  wrote:
>> > thanks for the suggestion -- however, looks like this is even slower.
>> > With
>> > the small data set I'm using, my aggregate function takes ~ 9 seconds
>> > and
>> > the colStats.mean() takes ~ 1 minute. However, I can't get it to run
>> > with
>> > the Kryo serializer -- I get the error:
>> >
>> > com.esotericsoftware.kryo.KryoException: Buffer overflow. Available: 5,
>> > required: 8
>> >
>> > is there an easy/obvious fix?
>> >
>> >
>> > On Wed, Jan 7, 2015 at 7:30 PM, Xiangrui Meng  wrote:
>> >>
>> >> There is some serialization overhead. You can try
>> >>
>> >>
>> >> https://github.com/apache/spark/blob/master/python/pyspark/mllib/stat.py#L107
>> >> . -Xiangrui
>> >>
>> >> On Wed, Jan 7, 2015 at 9:42 AM, rok  wrote:
>> >> > I have an RDD of SparseVectors and I'd like to calculate the means
>> >> > returning
>> >> > a dense vector. I've tried doing this with the following (using
>> >> > pyspark,
>> >> > spark v1.2.0):
>> >> >
>> >> > def aggregate_partition_values(vec1, vec2) :
>> >> > vec1[vec2.indices] += vec2.values
>> >> > return vec1
>> >> >
>> >> > def aggregate_combined_vectors(vec1, vec2) :
>> >> > if all(vec1 == vec2) :
>> >> > # then the vector came from only one partition
>> >> > return vec1
>> >> > else:
>> >> > return vec1 + vec2
>> >> >
>> >> > means = vals.aggregate(np.zeros(vec_len), aggregate_partition_values,
>> >> > aggregate_combined_vectors)
>> >> > means = means / nvals
>> >> >
>> >> > This turns out to be really slow -- and doesn't seem to depend on how
>> >> > many
>> >> > vectors there are so there seems to be some overhead somewhere that
>> >> > I'm
>> >> > not
>> >> > understanding. Is there a better way of doing this?
>> >> >
>> >> >
>> >> >
>> >> > --
>> >> > View this message in context:
>> >> >
>> >> > http://apache-spark-user-list.1001560.n3.nabble.com/calculating-the-mean-of-SparseVector-RDD-tp21019.html
>> >> > Sent from the Apache Spark User List mailing list archive at
>> >> > Nabble.com.
>> >> >
>> >> > -
>> >> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> >> > For additional commands, e-mail: user-h...@spark.apache.org
>> >> >
>> >
>> >
>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



How Spark Calculate partition size automatically

2015-01-12 Thread rajnish
Hi,

When I am running a job that loads data from Cassandra, Spark creates almost
9 million partitions. How does Spark decide the partition count? I have read
in one of the presentations that it is good to have 1,000 to 10,000
partitions.

Regards
Raj



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-Spark-Calculate-partition-size-automatically-tp21109.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



including the spark-mllib in build.sbt

2015-01-12 Thread Jianguo Li
Hi,

I am trying to build my own Scala project using sbt. The project is
dependent on both spark-core and spark-mllib. I included the following two
dependencies in my build.sbt file

libraryDependencies += "org.apache.spark" %% "spark-mllib" % "1.1.1"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.1.1"

However, when I run the "package" command in sbt, I got an error message
indicating that "object mllib is not a member of package org.apache.spark".

Did I do anything wrong?

Thanks,

Jianguo


Re: How to recovery application running records when I restart Spark master?

2015-01-12 Thread Cody Koeninger
Sorry, slightly misunderstood the question.  I'm not sure if there's a way
to make the master UI read old log files after a restart, but the log files
themselves are human readable text.

If you just want the application duration, the start and stop are timestamped;
look for lines like these in EVENT_LOG_1:

{"Event":"SparkListenerApplicationStart","App
Name":"cassandra-example-broadcast-join","Timestamp":1415763986601,"User":"cody"}

...

{"Event":"SparkListenerApplicationEnd","Timestamp":1415763999790}



On Mon, Jan 12, 2015 at 3:56 PM, Chong Tang  wrote:

> Thank you, Cody! Actually, I have enabled this option, and I saved logs
> into Hadoop file system. The problem is, how can I get the duration of an
> application? The attached file is the log I copied from HDFS.
>
> On Mon, Jan 12, 2015 at 4:36 PM, Cody Koeninger 
> wrote:
>
>> http://spark.apache.org/docs/latest/monitoring.html
>>
>> http://spark.apache.org/docs/latest/configuration.html#spark-ui
>>
>> spark.eventLog.enabled
>>
>>
>>
>> On Mon, Jan 12, 2015 at 3:00 PM, ChongTang  wrote:
>>
>>> Is there any body can help me with this? Thank you very much!
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-recovery-application-running-records-when-I-restart-Spark-master-tp21088p21108.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>
>


Re: How does unmanaged memory work with the executor memory limits?

2015-01-12 Thread Marcelo Vanzin
Short answer: yes.

Take a look at: http://spark.apache.org/docs/latest/running-on-yarn.html

Look for "memoryOverhead".

On Mon, Jan 12, 2015 at 2:06 PM, Michael Albert
 wrote:
> Greetings!
>
> My executors apparently are being terminated because they are
> "running beyond physical memory limits" according to the
> "yarn-hadoop-nodemanager"
> logs on the worker nodes (/mnt/var/log/hadoop on AWS EMR).
> I'm setting the "driver-memory" to 8G.
> However, looking at "stdout" in userlogs, I can see GC going on, but the
> lines look
> like "6G -> 5G(7.2G), 0.45secs", so the GC seems to think that the process
> is using
> about 6G of space, not 8G of space.
> However, "ps aux" shows an RSS hovering just below 8G.
>
> The process does a "mapParitionsWithIndex", and the process uses compression
> which (I believe) calls into the native zlib library
> (the overall purpose is to convert each partition into a "matlab" file).
>
> Could it be that the Yarn container is counting both the memory used by the
> JVM proper and memory used by zlib, but that the GC only "sees" the
> "internal" memory.  So the GC keeps the memory usage "reasonable",
> e.g., 6G in an 8G container, but then zlib grabs some memory, and the
> YARN container then terminates the task?
>
> If so, is there anything I can do so that I tell YARN to watch for a larger
> memory limit than I tell the JVM to use for its memory?
>
> Thanks!
>
> Sincerely,
>  Mike
>
>



-- 
Marcelo

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



How does unmanaged memory work with the executor memory limits?

2015-01-12 Thread Michael Albert
Greetings!
My executors apparently are being terminated because they are "running beyond
physical memory limits" according to the "yarn-hadoop-nodemanager" logs on the
worker nodes (/mnt/var/log/hadoop on AWS EMR). I'm setting the "driver-memory"
to 8G. However, looking at "stdout" in userlogs, I can see GC going on, but the
lines look like "6G -> 5G(7.2G), 0.45secs", so the GC seems to think that the
process is using about 6G of space, not 8G of space. However, "ps aux" shows an
RSS hovering just below 8G.

The process does a "mapPartitionsWithIndex", and the process uses compression
which (I believe) calls into the native zlib library (the overall purpose is to
convert each partition into a "matlab" file).

Could it be that the Yarn container is counting both the memory used by the JVM
proper and the memory used by zlib, but that the GC only "sees" the "internal"
memory? So the GC keeps the memory usage "reasonable", e.g., 6G in an 8G
container, but then zlib grabs some memory, and the YARN container then
terminates the task?

If so, is there anything I can do so that I tell YARN to watch for a larger
memory limit than I tell the JVM to use for its memory?

Thanks!

Sincerely,
Mike
 

Re: Discrepancy in PCA values

2015-01-12 Thread Xiangrui Meng
Could you compare V directly and tell us more about the difference you
saw? The columns of V should be the same, subject to signs. For example,
the first column of V could be either [0.8, -0.6, 0.0] or [-0.8, 0.6,
0.0]. -Xiangrui
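
A small sketch of such a comparison, assuming the two results are available as
plain arrays of column values and using an arbitrary tolerance:

// True if the two component vectors are equal, or equal after flipping one sign.
def sameUpToSign(a: Array[Double], b: Array[Double], tol: Double = 1e-6): Boolean = {
  val same    = a.zip(b).forall { case (x, y) => math.abs(x - y) <= tol }
  val flipped = a.zip(b).forall { case (x, y) => math.abs(x + y) <= tol }
  same || flipped
}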

On Sat, Jan 10, 2015 at 8:08 PM, Upul Bandara  wrote:
> Hi Xiangrui,
>
> Thanks a lot for you answer.
> So I fixed my Julia code, also calculated PCA using R as well.
>
> R Code:
> -
> data <- read.csv('/home/upul/Desktop/iris.csv');
> X <- data[,1:4]
> pca <- prcomp(X, center = TRUE, scale=FALSE)
> transformed <- predict(pca, newdata = X)
>
> Julia Code (Fixed)
> --
> data = readcsv("/home/upul/temp/iris.csv");
> X = data[:,1:end-1];
> meanX = mean(X,1);
> m,n = size(X);
> X = X - repmat(meanX, m, 1);
> u,s,v = svd(X);
> transformed =  X*v;
>
> Now PCA calculated using Julia and R is identical, but still I can see a
> small
> difference between the PCA values given by Spark and the other two.
>
> Thanks,
> Upul
>
> On Sat, Jan 10, 2015 at 11:17 AM, Xiangrui Meng  wrote:
>>
>> You need to subtract mean values to obtain the covariance matrix
>> (http://en.wikipedia.org/wiki/Covariance_matrix).
>>
>> On Fri, Jan 9, 2015 at 6:41 PM, Upul Bandara 
>> wrote:
>> > Hi Xiangrui,
>> >
>> > Thanks for the reply.
>> >
>> > Julia code is also using the covariance matrix:
>> > (1/n)*X'*X ;
>> >
>> > Thanks,
>> > Upul
>> >
>> > On Fri, Jan 9, 2015 at 2:11 AM, Xiangrui Meng  wrote:
>> >>
>> >> The Julia code is computing the SVD of the Gram matrix. PCA should be
>> >> applied to the covariance matrix. -Xiangrui
>> >>
>> >> On Thu, Jan 8, 2015 at 8:27 AM, Upul Bandara 
>> >> wrote:
>> >> > Hi All,
>> >> >
>> >> > I tried to do PCA for the Iris dataset
>> >> > [https://archive.ics.uci.edu/ml/datasets/Iris] using MLLib
>> >> >
>> >> >
>> >> > [http://spark.apache.org/docs/1.1.1/mllib-dimensionality-reduction.html].
>> >> > Also, PCA  was calculated in Julia using following method:
>> >> >
>> >> > Sigma = (1/numRow(X))*X'*X ;
>> >> > [U, S, V] = svd(Sigma);
>> >> > Ureduced = U(:, 1:k);
>> >> > Z = X*Ureduced;
>> >> >
>> >> > However, I'm seeing a little difference between values given by MLLib
>> >> > and
>> >> > the method shown above .
>> >> >
>> >> > Does anyone have any idea about this difference?
>> >> >
>> >> > Additionally, I have attached two visualizations, related to two
>> >> > approaches.
>> >> >
>> >> > Thanks,
>> >> > Upul
>> >> >
>> >> >
>> >> >
>> >> > -
>> >> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> >> > For additional commands, e-mail: user-h...@spark.apache.org
>> >
>> >
>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to recovery application running records when I restart Spark master?

2015-01-12 Thread Chong Tang
Thank you, Cody! Actually, I have enabled this option, and I saved logs
into Hadoop file system. The problem is, how can I get the duration of an
application? The attached file is the log I copied from HDFS.

On Mon, Jan 12, 2015 at 4:36 PM, Cody Koeninger  wrote:

> http://spark.apache.org/docs/latest/monitoring.html
>
> http://spark.apache.org/docs/latest/configuration.html#spark-ui
>
> spark.eventLog.enabled
>
>
>
> On Mon, Jan 12, 2015 at 3:00 PM, ChongTang  wrote:
>
>> Is there any body can help me with this? Thank you very much!
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-recovery-application-running-records-when-I-restart-Spark-master-tp21088p21108.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


EVENT_LOG_1
Description: Binary data

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org


Re: Getting Output From a Cluster

2015-01-12 Thread Su She
Okay, thanks Akhil!

Suhas Shekar

University of California, Los Angeles
B.A. Economics, Specialization in Computing 2014

On Mon, Jan 12, 2015 at 1:24 PM, Akhil Das 
wrote:

> There is no direct way of doing that. If you need a Single file for every
> batch duration, then you can repartition the data to 1 before saving.
> Another way would be to use hadoop's copy merge command/api(available from
> 2.0 versions)
> On 13 Jan 2015 01:08, "Su She"  wrote:
>
>> Hello Everyone,
>>
>> Quick followup, is there any way I can append output to one file rather
>> then create a new directory/file every X milliseconds?
>>
>> Thanks!
>>
>> Suhas Shekar
>>
>> University of California, Los Angeles
>> B.A. Economics, Specialization in Computing 2014
>>
>> On Thu, Jan 8, 2015 at 11:41 PM, Su She  wrote:
>>
>>> 1) Thank you everyone for the help once again...the support here is
>>> really amazing and I hope to contribute soon!
>>>
>>> 2) The solution I actually ended up using was from this thread:
>>> http://mail-archives.apache.org/mod_mbox/spark-user/201310.mbox/%3ccafnzj5ejxdgqju7nbdqy6xureq3d1pcxr+i2s99g5brcj5e...@mail.gmail.com%3E
>>>
>>> in case the thread ever goes down, the soln provided by Matei:
>>>
>>>
>>> plans.saveAsHadoopFiles("hdfs://localhost:8020/user/hue/output/completed","csv",
>>> String.class, String.class, (Class) TextOutputFormat.class);
>>>
>>> I had browsed a lot of similar threads that did not have answers, but
>>> found this one from quite some time ago, so apologize for posting a
>>> question that had been answered before.
>>>
>>> 3) Akhil, I was specifying the format as "txt", but it was not
>>> compatible
>>>
>>> Thanks for the help!
>>>
>>>
>>> On Thu, Jan 8, 2015 at 11:23 PM, Akhil Das 
>>> wrote:
>>>
 saveAsHadoopFiles requires you to specify the output format which i
 believe you are not specifying anywhere and hence the program crashes.

 You could try something like this:

 Class<? extends OutputFormat<?, ?>> outputFormatClass = (Class<? extends OutputFormat<?, ?>>) (Class<?>) SequenceFileOutputFormat.class;

 yourStream.saveAsNewAPIHadoopFiles(hdfsUrl,
 "/output-location",Text.class, Text.class, outputFormatClass);



 Thanks
 Best Regards

 On Fri, Jan 9, 2015 at 10:22 AM, Su She  wrote:

> Yes, I am calling the saveAsHadoopFiles on the Dstream. However, when
> I call print on the Dstream it works? If I had to do foreachRDD to
> saveAsHadoopFile, then why is it working for print?
>
> Also, if I am doing foreachRDD, do I need connections, or can I simply
> put the saveAsHadoopFiles inside the foreachRDD function?
>
> Thanks Yana for the help! I will play around with foreachRDD and
> convey my results.
>
>
>
> On Thu, Jan 8, 2015 at 6:06 PM, Yana Kadiyska  > wrote:
>
>> are you calling the saveAsText files on the DStream --looks like it?
>> Look at the section called "Design Patterns for using foreachRDD" in the
>> link you sent -- you want to do  dstream.foreachRDD(rdd =>
>> rdd.saveAs)
>>
>> On Thu, Jan 8, 2015 at 5:20 PM, Su She  wrote:
>>
>>> Hello Everyone,
>>>
>>> Thanks in advance for the help!
>>>
>>> I successfully got my Kafka/Spark WordCount app to print locally.
>>> However, I want to run it on a cluster, which means that I will have to
>>> save it to HDFS if I want to be able to read the output.
>>>
>>> I am running Spark 1.1.0, which means according to this document:
>>> https://spark.apache.org/docs/1.1.0/streaming-programming-guide.html
>>>
>>> I should be able to use commands such as saveAsText/HadoopFiles.
>>>
>>> 1) When I try saveAsTextFiles it says:
>>> cannot find symbol
>>> [ERROR] symbol  : method
>>> saveAsTextFiles(java.lang.String,java.lang.String)
>>> [ERROR] location: class
>>> org.apache.spark.streaming.api.java.JavaPairDStream
>>>
>>> This makes some sense as saveAsTextFiles is not included here:
>>>
>>> http://people.apache.org/~tdas/spark-1.1.0-temp-docs/api/java/org/apache/spark/streaming/api/java/JavaPairDStream.html
>>>
>>> 2) When I try
>>> saveAsHadoopFiles("hdfs://ipus-west-1.compute.internal:8020/user/testwordcount",
>>> "txt") it builds, but when I try running it it throws this exception:
>>>
>>> Exception in thread "main" java.lang.RuntimeException:
>>> java.lang.RuntimeException: class scala.runtime.Nothing$ not
>>> org.apache.hadoop.mapred.OutputFormat
>>> at
>>> org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2079)
>>> at
>>> org.apache.hadoop.mapred.JobConf.getOutputFormat(JobConf.java:712)
>>> at
>>> org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1021)
>>> at
>>> org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:940)
>>> at
>>> org.apache.sp

Re: Spark Framework handling of Mesos master change

2015-01-12 Thread Tim Chen
Hi Ethan,

How are you specifying the master to spark?

Recovering from a master failover is already handled by the underlying
Mesos scheduler, but you have to use ZooKeeper instead of directly passing
in the master URIs.

Tim
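
A minimal sketch of what that looks like on the driver side, with placeholder
ZooKeeper hosts:

import org.apache.spark.SparkConf

// Let the Mesos scheduler discover the current master (and failovers) via ZooKeeper.
val conf = new SparkConf()
  .setAppName("long-running-streaming-job")
  .setMaster("mesos://zk://zk1:2181,zk2:2181,zk3:2181/mesos")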

On Mon, Jan 12, 2015 at 12:44 PM, Ethan Wolf 
wrote:

> We are running Spark and Spark Streaming on Mesos (with multiple masters
> for
> HA).
> At launch, our Spark jobs successfully look up the current Mesos master
> from
> zookeeper and spawn tasks.
>
> However, when the Mesos master changes while the spark job is executing,
> the
> spark driver seems to interact with the old Mesos master, and therefore
> fails to launch any new tasks.
> We are running long running Spark streaming jobs, so we have temporarily
> switched to coarse grained as a work around, but it prevents us from
> running
> in fine-grained mode, which we would prefer for some jobs.
>
> Looking at the code for MesosSchedulerBackend, it has an empty
> implementation of the reregistered (and disconnected) methods, which I
> believe would be called when the master changes:
>
> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala#L202
>
>
> http://mesos.apache.org/documentation/latest/app-framework-development-guide/
>
> Are there any plans to implement master reregistration in the Spark
> framework, or does anyone have any suggested workarounds for long running
> jobs to deal with the mesos master changing?  (Or is there something we are
> doing wrong?)
>
> Thanks
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Framework-handling-of-Mesos-master-change-tp21107.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: How to recovery application running records when I restart Spark master?

2015-01-12 Thread Cody Koeninger
http://spark.apache.org/docs/latest/monitoring.html

http://spark.apache.org/docs/latest/configuration.html#spark-ui

spark.eventLog.enabled



On Mon, Jan 12, 2015 at 3:00 PM, ChongTang  wrote:

> Is there any body can help me with this? Thank you very much!
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-recovery-application-running-records-when-I-restart-Spark-master-tp21088p21108.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Getting Output From a Cluster

2015-01-12 Thread Akhil Das
There is no direct way of doing that. If you need a single file for every
batch duration, then you can repartition the data to 1 before saving.
Another way would be to use Hadoop's copyMerge command/API (available from
2.0 versions)
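
A minimal Scala sketch of the repartition-to-1 approach (the thread's app is
Java, but the idea is the same); the stream variable and output path are
placeholders:

stream.foreachRDD { (rdd, time) =>
  rdd.coalesce(1)   // one partition => one part file per batch
     .saveAsTextFile(s"hdfs:///user/output/batch-${time.milliseconds}")
}
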
On 13 Jan 2015 01:08, "Su She"  wrote:

> Hello Everyone,
>
> Quick followup, is there any way I can append output to one file rather
> then create a new directory/file every X milliseconds?
>
> Thanks!
>
> Suhas Shekar
>
> University of California, Los Angeles
> B.A. Economics, Specialization in Computing 2014
>
> On Thu, Jan 8, 2015 at 11:41 PM, Su She  wrote:
>
>> 1) Thank you everyone for the help once again...the support here is
>> really amazing and I hope to contribute soon!
>>
>> 2) The solution I actually ended up using was from this thread:
>> http://mail-archives.apache.org/mod_mbox/spark-user/201310.mbox/%3ccafnzj5ejxdgqju7nbdqy6xureq3d1pcxr+i2s99g5brcj5e...@mail.gmail.com%3E
>>
>> in case the thread ever goes down, the soln provided by Matei:
>>
>> plans.saveAsHadoopFiles("hdfs://localhost:8020/user/hue/output/completed","csv",
>> String.class, String.class, (Class) TextOutputFormat.class);
>>
>> I had browsed a lot of similar threads that did not have answers, but
>> found this one from quite some time ago, so apologize for posting a
>> question that had been answered before.
>>
>> 3) Akhil, I was specifying the format as "txt", but it was not compatible
>>
>> Thanks for the help!
>>
>>
>> On Thu, Jan 8, 2015 at 11:23 PM, Akhil Das 
>> wrote:
>>
>>> saveAsHadoopFiles requires you to specify the output format which i
>>> believe you are not specifying anywhere and hence the program crashes.
>>>
>>> You could try something like this:
>>>
>>> Class<? extends OutputFormat<?, ?>> outputFormatClass = (Class<? extends OutputFormat<?, ?>>) (Class<?>) SequenceFileOutputFormat.class;
>>>
>>> yourStream.saveAsNewAPIHadoopFiles(hdfsUrl,
>>> "/output-location",Text.class, Text.class, outputFormatClass);
>>>
>>>
>>>
>>> Thanks
>>> Best Regards
>>>
>>> On Fri, Jan 9, 2015 at 10:22 AM, Su She  wrote:
>>>
 Yes, I am calling the saveAsHadoopFiles on the Dstream. However, when I
 call print on the Dstream it works? If I had to do foreachRDD to
 saveAsHadoopFile, then why is it working for print?

 Also, if I am doing foreachRDD, do I need connections, or can I simply
 put the saveAsHadoopFiles inside the foreachRDD function?

 Thanks Yana for the help! I will play around with foreachRDD and convey
 my results.



 On Thu, Jan 8, 2015 at 6:06 PM, Yana Kadiyska 
 wrote:

> are you calling the saveAsText files on the DStream --looks like it?
> Look at the section called "Design Patterns for using foreachRDD" in the
> link you sent -- you want to do  dstream.foreachRDD(rdd =>
> rdd.saveAs)
>
> On Thu, Jan 8, 2015 at 5:20 PM, Su She  wrote:
>
>> Hello Everyone,
>>
>> Thanks in advance for the help!
>>
>> I successfully got my Kafka/Spark WordCount app to print locally.
>> However, I want to run it on a cluster, which means that I will have to
>> save it to HDFS if I want to be able to read the output.
>>
>> I am running Spark 1.1.0, which means according to this document:
>> https://spark.apache.org/docs/1.1.0/streaming-programming-guide.html
>>
>> I should be able to use commands such as saveAsText/HadoopFiles.
>>
>> 1) When I try saveAsTextFiles it says:
>> cannot find symbol
>> [ERROR] symbol  : method
>> saveAsTextFiles(java.lang.String,java.lang.String)
>> [ERROR] location: class
>> org.apache.spark.streaming.api.java.JavaPairDStream
>>
>> This makes some sense as saveAsTextFiles is not included here:
>>
>> http://people.apache.org/~tdas/spark-1.1.0-temp-docs/api/java/org/apache/spark/streaming/api/java/JavaPairDStream.html
>>
>> 2) When I try
>> saveAsHadoopFiles("hdfs://ipus-west-1.compute.internal:8020/user/testwordcount",
>> "txt") it builds, but when I try running it it throws this exception:
>>
>> Exception in thread "main" java.lang.RuntimeException:
>> java.lang.RuntimeException: class scala.runtime.Nothing$ not
>> org.apache.hadoop.mapred.OutputFormat
>> at
>> org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2079)
>> at
>> org.apache.hadoop.mapred.JobConf.getOutputFormat(JobConf.java:712)
>> at
>> org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1021)
>> at
>> org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:940)
>> at
>> org.apache.spark.streaming.dstream.PairDStreamFunctions$$anonfun$8.apply(PairDStreamFunctions.scala:632)
>> at
>> org.apache.spark.streaming.dstream.PairDStreamFunctions$$anonfun$8.apply(PairDStreamFunctions.scala:630)
>> at
>> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply

How to recovery application running records when I restart Spark master?

2015-01-12 Thread Chong Tang
Hi all,

For various reasons, I restarted the Spark master node.

Before I restarted it, there were some application running records at the
bottom of the master web page. But they were gone after I restarted the master
node. The records include application name, running time, status, and so
on. I am sure you know what I am talking about.

My questions are: 1) How can I recover those records when I restart my Spark
master node? 2) If I cannot recover them, where should I go to look for
them?

What I actually care about is the running time of finished applications.

Thank you for your help, and I hope everybody is doing great!

Chong


Re: How to recovery application running records when I restart Spark master?

2015-01-12 Thread ChongTang
Is there anybody who can help me with this? Thank you very much!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-recovery-application-running-records-when-I-restart-Spark-master-tp21088p21108.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Web Service + Spark

2015-01-12 Thread Robert C Senkbeil
If you would like to work with an API, you can use the Spark Kernel found
here: https://github.com/ibm-et/spark-kernel

The kernel provides an API following the IPython message protocol as well
as a client library that can be used with Scala applications.

The kernel can also be plugged into the latest developmental version of
IPython 3.0 in case you want to do more visual exploration.

Signed,
Chip Senkbeil
IBM Emerging Technology Software Engineer



From:   Raghavendra Pandey 
To: Cui Lin , gtinside , Corey
Nolet 
Cc: "user@spark.apache.org" 
Date:   01/11/2015 02:06 AM
Subject:Re: Web Service + Spark



You can take a look at http://zeppelin.incubator.apache.org. It is a
notebook and graphic visual designer.



On Sun, Jan 11, 2015, 01:45 Cui Lin  wrote:
  Thanks, Gaurav and Corey,

  Probably I didn't make myself clear. I am looking for a Spark best practice
  similar to Shiny for R, where the analysis/visualization results can be easily
  published to a web server and shown in a web browser. Or is there any dashboard
  for Spark?

  Best regards,

  Cui Lin

  From: gtinside 
  Date: Friday, January 9, 2015 at 7:45 PM
  To: Corey Nolet 
  Cc: Cui Lin , "user@spark.apache.org" <
  user@spark.apache.org>
  Subject: Re: Web Service + Spark

  You can also look at Spark Job Server
  https://github.com/spark-jobserver/spark-jobserver

  - Gaurav

  On Jan 9, 2015, at 10:25 PM, Corey Nolet  wrote:

Cui Lin,

The solution largely depends on how you want your services deployed
(Java web container, Spray framework, etc...) and if you are using
a cluster manager like Yarn or Mesos vs. just firing up your own
executors and master.

I recently worked on an example for deploying Spark services inside
of Jetty using Yarn as the cluster manager. It forced me to learn
how Spark wires up the dependencies/classpaths. If it helps, the
example that resulted from my tinkering is located at [1].


[1] https://github.com/calrissian/spark-jetty-server

On Fri, Jan 9, 2015 at 9:33 PM, Cui Lin  wrote:
 Hello, All,

 What’s the best practice on deploying/publishing spark-based
 scientific applications into a web service? Similar to Shiny on R.
  Thanks!

 Best regards,

 Cui Lin


Spark Framework handling of Mesos master change

2015-01-12 Thread Ethan Wolf
We are running Spark and Spark Streaming on Mesos (with multiple masters for
HA).
At launch, our Spark jobs successfully look up the current Mesos master from
zookeeper and spawn tasks.

However, when the Mesos master changes while the spark job is executing, the
spark driver seems to interact with the old Mesos master, and therefore
fails to launch any new tasks.
We are running long-running Spark Streaming jobs, so we have temporarily
switched to coarse-grained mode as a workaround, but it prevents us from running
in fine-grained mode, which we would prefer for some jobs.

Looking at the code for MesosSchedulerBackend, it has an empty
implementation of the reregistered (and disconnected) methods, which I
believe would be called when the master changes:
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala#L202

http://mesos.apache.org/documentation/latest/app-framework-development-guide/

Are there any plans to implement master reregistration in the Spark
framework, or does anyone have any suggested workarounds for long running
jobs to deal with the mesos master changing?  (Or is there something we are
doing wrong?)

Thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Framework-handling-of-Mesos-master-change-tp21107.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: no snappyjava in java.library.path

2015-01-12 Thread David Rosenstrauch
I ran into this recently.  Turned out we had an old 
org-xerial-snappy.properties file in one of our conf directories that 
had the setting:


# Disables loading Snappy-Java native library bundled in the
# snappy-java-*.jar file forcing to load the Snappy-Java native
# library from the java.library.path.
#
org.xerial.snappy.disable.bundled.libs=true

When I switched that to false, it made the problem go away.

May or may not be your problem of course, but worth a look.

HTH,

DR

On 01/12/2015 03:28 PM, Dan Dong wrote:

Hi,
   My Spark job failed with "no snappyjava in java.library.path" as:
Caused by: java.lang.UnsatisfiedLinkError: no snappyjava in
java.library.path
 at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1857)
 at java.lang.Runtime.loadLibrary0(Runtime.java:870)
 at java.lang.System.loadLibrary(System.java:1119)
 at
org.xerial.snappy.SnappyNativeLoader.loadLibrary(SnappyNativeLoader.java:52)

I'm running spark-1.1.1 on hadoop2.4. I found that the file is there and I
have included it in the
CLASSPATH already.
../hadoop/share/hadoop/tools/lib/snappy-java-1.0.4.1.jar
../hadoop/share/hadoop/common/lib/snappy-java-1.0.4.1.jar
../hadoop/share/hadoop/httpfs/tomcat/webapps/webhdfs/WEB-INF/lib/snappy-java-1.0.4.1.jar

Did I miss anything or I should set it in other way?

Cheers,
Dan




-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: RowMatrix multiplication

2015-01-12 Thread Alex Minnaar
Good idea! Join each element of c with the corresponding row of A, multiply 
through, then reduce.  I'll give this a try.


Thanks,


Alex


From: Reza Zadeh 
Sent: Monday, January 12, 2015 3:05 PM
To: Alex Minnaar
Cc: u...@spark.incubator.apache.org
Subject: Re: RowMatrix multiplication

Yes you are correct, to do it with existing operations you would need a 
transpose on rowmatrix.

However, you can fairly easily perform the operation manually by doing a join 
(if the c vector is an RDD) or broadcasting c (if the c vector is small enough 
to fit in memory on a single machine).

On Mon, Jan 12, 2015 at 11:45 AM, Alex Minnaar 
mailto:aminn...@verticalscope.com>> wrote:

That's not quite what I'm looking for.  Let me provide an example.  I have a 
rowmatrix A that is nxm and I have two local matrices b and c.  b is mx1 and c 
is nx1.  In my spark job I wish to perform the following two computations


A*b


and


A^T*c


I don't think this is possible without being able to transpose a rowmatrix.  Am 
I correct?


Thanks,


Alex


From: Reza Zadeh mailto:r...@databricks.com>>
Sent: Monday, January 12, 2015 1:58 PM
To: Alex Minnaar
Cc: u...@spark.incubator.apache.org
Subject: Re: RowMatrix multiplication

As you mentioned, you can perform A * b, where A is a rowmatrix and b is a 
local matrix.

From your email, I figure you want to compute b * A^T. To do this, you can 
compute C = A b^T, whose result is the transpose of what you were looking for, 
i.e. C^T = b * A^T. To undo the transpose, you would have to transpose C manually 
yourself. Be careful though, because the result might not have each Row fit in 
memory on a single machine, which is what RowMatrix requires. This danger is 
why we didn't provide a transpose operation in RowMatrix natively.

To address this and more, there is an effort to provide more comprehensive 
linear algebra through block matrices, which will likely make it to 1.3:
https://issues.apache.org/jira/browse/SPARK-3434

Best,
Reza

On Mon, Jan 12, 2015 at 6:33 AM, Alex Minnaar 
mailto:aminn...@verticalscope.com>> wrote:

I have a rowMatrix on which I want to perform two multiplications.  The first 
is a right multiplication with a local matrix which is fine.  But after that I 
also wish to right multiply the transpose of my rowMatrix with a different 
local matrix.  I understand that there is no functionality to transpose a 
rowMatrix at this time but I was wondering if anyone could suggest any kind
of work-around for this.  I had thought that I might be able to initially 
create two rowMatrices - a normal version and a transposed version - and use 
either when appropriate.  Can anyone think of another alternative?


Thanks,


Alex




no snappyjava in java.library.path

2015-01-12 Thread Dan Dong
Hi,
  My Spark job failed with "no snappyjava in java.library.path" as:
Caused by: java.lang.UnsatisfiedLinkError: no snappyjava in
java.library.path
at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1857)
at java.lang.Runtime.loadLibrary0(Runtime.java:870)
at java.lang.System.loadLibrary(System.java:1119)
at
org.xerial.snappy.SnappyNativeLoader.loadLibrary(SnappyNativeLoader.java:52)

I'm running spark-1.1.1 on hadoop2.4. I found that the file is there and I
have included it in the
CLASSPATH already.
../hadoop/share/hadoop/tools/lib/snappy-java-1.0.4.1.jar
../hadoop/share/hadoop/common/lib/snappy-java-1.0.4.1.jar
../hadoop/share/hadoop/httpfs/tomcat/webapps/webhdfs/WEB-INF/lib/snappy-java-1.0.4.1.jar

Did I miss anything or I should set it in other way?

Cheers,
Dan


Re: status of spark analytics functions? over, rank, percentile, row_number, etc.

2015-01-12 Thread Kevin Burton
Great. I’d love to help out. Is there any documentation on what you’re
working on that I can take a look at?

My biggest issue is that I need some way to compute the position of an
entry when used by ORDER BY… which I can do with the RANK operator.

What I essentially need is:

select source, indegree, rank() over (order by indegree desc) from foo
order by indegree desc

This would give me the position of the record in the whole index and the
table sorted by indegree desc.

I was using RANK in pig but we’re ditching hadoop/pig in favor of spark.
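
A possible core-RDD workaround until window functions land, assuming
foo: RDD[(String, Long)] of (source, indegree) pairs; sorting globally and
zipping with the index gives each record its position in the ordering:

val ranked = foo
  .sortBy({ case (_, indegree) => indegree }, ascending = false)
  .zipWithIndex()                                  // global position in the sorted order
  .map { case ((source, indegree), idx) =>
    (source, indegree, idx + 1)                    // 1-based position (ROW_NUMBER-style; ties are not collapsed as RANK would)
  }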

I assume you’re implementing something similar to this:

https://issues.apache.org/jira/browse/SPARK-1442

https://cwiki.apache.org/confluence/display/Hive/LanguageManual+WindowingAndAnalytics

https://issues.apache.org/jira/browse/HIVE-4197

https://issues.apache.org/jira/browse/HIVE-896

On Sat, Jan 10, 2015 at 5:00 PM, Will Benton  wrote:

> Hi Kevin,
>
> I'm currently working on implementing windowing.  If you'd like to see
> something that's not covered by a JIRA, please file one!
>
>
> best,
> wb
>
> - Original Message -
> > From: "Kevin Burton" 
> > To: user@spark.apache.org
> > Sent: Saturday, January 10, 2015 12:12:38 PM
> > Subject: status of spark analytics functions? over, rank, percentile,
> row_number, etc.
> >
> > I’m curious what the status of implementing hive analytics functions in
> > spark.
> >
> >
> https://cwiki.apache.org/confluence/display/Hive/LanguageManual+WindowingAndAnalytics
> >
> > Many of these seem missing.  I’m assuming they’re not implemented yet?
> >
> > Is there an ETA on them?
> >
> > or am I the first to bring this up? :-P
> >
> > --
> >
> > Founder/CEO Spinn3r.com
> > Location: *San Francisco, CA*
> > blog: http://burtonator.wordpress.com
> > … or check out my Google+ profile
> > 
> > 
> >
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 

Founder/CEO Spinn3r.com
Location: *San Francisco, CA*
blog: http://burtonator.wordpress.com
… or check out my Google+ profile




ReliableKafkaReceiver stopped receiving data after WriteAheadLogBasedBlockHandler throws TimeoutException

2015-01-12 Thread Max Xu
Hi all,

I am running a Spark streaming application with ReliableKafkaReceiver (Spark 
1.2.0). Constantly I was getting the following exception:

15/01/12 19:07:06 ERROR receiver.BlockGenerator: Error in block pushing thread
java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at 
scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
at 
scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:107)
at 
org.apache.spark.streaming.receiver.WriteAheadLogBasedBlockHandler.storeBlock(ReceivedBlockHandler.scala:176)
at 
org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushAndReportBlock(ReceiverSupervisorImpl.scala:160)
at 
org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushArrayBuffer(ReceiverSupervisorImpl.scala:126)
at 
org.apache.spark.streaming.receiver.Receiver.store(Receiver.scala:124)
at 
org.apache.spark.streaming.kafka.ReliableKafkaReceiver.org$apache$spark$streaming$kafka$ReliableKafkaReceiver$$storeBlockAndCommitOffset(ReliableKafkaReceiver.scala:207)
at 
org.apache.spark.streaming.kafka.ReliableKafkaReceiver$GeneratedBlockHandler.onPushBlock(ReliableKafkaReceiver.scala:275)
at 
org.apache.spark.streaming.receiver.BlockGenerator.pushBlock(BlockGenerator.scala:181)
at 
org.apache.spark.streaming.receiver.BlockGenerator.org$apache$spark$streaming$receiver$BlockGenerator$$keepPushingBlocks(BlockGenerator.scala:154)
at 
org.apache.spark.streaming.receiver.BlockGenerator$$anon$1.run(BlockGenerator.scala:86)

After the exception, ReliableKafkaReceiver stayed in ACTIVE status but stopped 
receiving data from Kafka. The Kafka message handler thread is in BLOCKED state:

Thread 92: KafkaMessageHandler-0 (BLOCKED)
org.apache.spark.streaming.receiver.BlockGenerator.addDataWithCallback(BlockGenerator.scala:123)
org.apache.spark.streaming.kafka.ReliableKafkaReceiver.org$apache$spark$streaming$kafka$ReliableKafkaReceiver$$storeMessageAndMetadata(ReliableKafkaReceiver.scala:185)
org.apache.spark.streaming.kafka.ReliableKafkaReceiver$MessageHandler.run(ReliableKafkaReceiver.scala:247)
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
java.util.concurrent.FutureTask.run(FutureTask.java:262)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:745)

Sometimes when the exception was thrown, I also see warning messages like this:
15/01/12 01:08:07 WARN hdfs.DFSClient: Slow ReadProcessor read fields took 
30533ms (threshold=3ms); ack: seqno: 113 status: SUCCESS status: SUCCESS 
downstreamAckTimeNanos: 30524893062, targets: [172.20.xxx.xxx:50010, 
172.20.xxx.xxx:50010]
15/01/12 01:08:07 WARN hdfs.DFSClient: Slow waitForAckedSeqno took 30526ms 
(threshold=3ms)

In the past, I never had such a problem with KafkaReceiver. What causes this 
exception? How can I solve this problem?

Thanks in advance,
Max


Re: RowMatrix multiplication

2015-01-12 Thread Reza Zadeh
Yes you are correct, to do it with existing operations you would need a
transpose on rowmatrix.

However, you can fairly easily perform the operation manually by doing a
join (if the c vector is an RDD) or broadcasting c (if the c vector is
small enough to fit in memory on a single machine).
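
A minimal sketch of the broadcast variant for A^T * c, assuming the rows of the
n x m matrix A are keyed by their row index (e.g. via zipWithIndex) and c is a
local Array[Double] of length n that fits in memory:

import org.apache.spark.rdd.RDD

def transposeTimes(rows: RDD[(Long, Array[Double])], c: Array[Double]): Array[Double] = {
  val cBroadcast = rows.sparkContext.broadcast(c)
  rows
    .map { case (i, row) => row.map(_ * cBroadcast.value(i.toInt)) } // scale row i by c(i)
    .reduce((x, y) => x.zip(y).map { case (a, b) => a + b })         // sum of scaled rows = A^T c
}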

On Mon, Jan 12, 2015 at 11:45 AM, Alex Minnaar 
wrote:

>  That's not quite what I'm looking for.  Let me provide an example.  I
> have a rowmatrix A that is nxm and I have two local matrices b and c.  b is
> mx1 and c is nx1.  In my spark job I wish to perform the following two
> computations
>
>
>  A*b
>
>
>  and
>
>
>  A^T*c
>
>
>  I don't think this is possible without being able to transpose a
> rowmatrix.  Am I correct?
>
>
>  Thanks,
>
>
>  Alex
>  --
> *From:* Reza Zadeh 
> *Sent:* Monday, January 12, 2015 1:58 PM
> *To:* Alex Minnaar
> *Cc:* u...@spark.incubator.apache.org
> *Subject:* Re: RowMatrix multiplication
>
>  As you mentioned, you can perform A * b, where A is a rowmatrix and b is
> a local matrix.
>
>  From your email, I figure you want to compute b * A^T. To do this, you
> can compute C = A b^T, whose result is the transpose of what you were
> looking for, i.e. C^T = b * A^T. To undo the transpose, you would have to
> transpose C manually yourself. Be careful though, because the result might
> not have each Row fit in memory on a single machine, which is what
> RowMatrix requires. This danger is why we didn't provide a transpose
> operation in RowMatrix natively.
>
>  To address this and more, there is an effort to provide more
> comprehensive linear algebra through block matrices, which will likely make
> it to 1.3:
> https://issues.apache.org/jira/browse/SPARK-3434
>
>  Best,
> Reza
>
> On Mon, Jan 12, 2015 at 6:33 AM, Alex Minnaar 
> wrote:
>
>>  I have a rowMatrix on which I want to perform two multiplications.  The
>> first is a right multiplication with a local matrix which is fine.  But
>> after that I also wish to right multiply the transpose of my rowMatrix with
>> a different local matrix.  I understand that there is no functionality to
>> transpose a rowMatrix at this time but I was wondering if anyone could
>> suggest any kind of work-around for this.  I had thought that I might be
>> able to initially create two rowMatrices - a normal version and a
>> transposed version - and use either when appropriate.  Can anyone think of
>> another alternative?
>>
>>
>>  Thanks,
>>
>>
>>  Alex
>>
>
>


Re: RowMatrix multiplication

2015-01-12 Thread Alex Minnaar
That's not quite what I'm looking for.  Let me provide an example.  I have a 
rowmatrix A that is nxm and I have two local matrices b and c.  b is mx1 and c 
is nx1.  In my spark job I wish to perform the following two computations


A*b


and


A^T*c


I don't think this is possible without being able to transpose a rowmatrix.  Am 
I correct?


Thanks,


Alex


From: Reza Zadeh 
Sent: Monday, January 12, 2015 1:58 PM
To: Alex Minnaar
Cc: u...@spark.incubator.apache.org
Subject: Re: RowMatrix multiplication

As you mentioned, you can perform A * b, where A is a rowmatrix and b is a 
local matrix.

From your email, I figure you want to compute b * A^T. To do this, you can 
compute C = A b^T, whose result is the transpose of what you were looking for, 
i.e. C^T = b * A^T. To undo the transpose, you would have to transpose C manually 
yourself. Be careful though, because the result might not have each Row fit in 
memory on a single machine, which is what RowMatrix requires. This danger is 
why we didn't provide a transpose operation in RowMatrix natively.

To address this and more, there is an effort to provide more comprehensive 
linear algebra through block matrices, which will likely make it to 1.3:
https://issues.apache.org/jira/browse/SPARK-3434

Best,
Reza

On Mon, Jan 12, 2015 at 6:33 AM, Alex Minnaar 
mailto:aminn...@verticalscope.com>> wrote:

I have a rowMatrix on which I want to perform two multiplications.  The first 
is a right multiplication with a local matrix which is fine.  But after that I 
also wish to right multiply the transpose of my rowMatrix with a different 
local matrix.  I understand that there is no functionality to transpose a 
rowMatrix at this time but I was wondering if anyone could suggest any kind 
of work-around for this.  I had thought that I might be able to initially 
create two rowMatrices - a normal version and a transposed version - and use 
either when appropriate.  Can anyone think of another alternative?


Thanks,


Alex



Re: Getting Output From a Cluster

2015-01-12 Thread Su She
Hello Everyone,

Quick followup, is there any way I can append output to one file rather
then create a new directory/file every X milliseconds?

Thanks!

Suhas Shekar

University of California, Los Angeles
B.A. Economics, Specialization in Computing 2014

On Thu, Jan 8, 2015 at 11:41 PM, Su She  wrote:

> 1) Thank you everyone for the help once again...the support here is really
> amazing and I hope to contribute soon!
>
> 2) The solution I actually ended up using was from this thread:
> http://mail-archives.apache.org/mod_mbox/spark-user/201310.mbox/%3ccafnzj5ejxdgqju7nbdqy6xureq3d1pcxr+i2s99g5brcj5e...@mail.gmail.com%3E
>
> in case the thread ever goes down, the soln provided by Matei:
>
> plans.saveAsHadoopFiles("hdfs://localhost:8020/user/hue/output/completed","csv",
> String.class, String.class, (Class) TextOutputFormat.class);
>
> I had browsed a lot of similar threads that did not have answers, but
> found this one from quite some time ago, so apologize for posting a
> question that had been answered before.
>
> 3) Akhil, I was specifying the format as "txt", but it was not compatible
>
> Thanks for the help!
>
>
> On Thu, Jan 8, 2015 at 11:23 PM, Akhil Das 
> wrote:
>
>> saveAsHadoopFiles requires you to specify the output format which i
>> believe you are not specifying anywhere and hence the program crashes.
>>
>> You could try something like this:
>>
>> Class<? extends OutputFormat<?, ?>> outputFormatClass = (Class<? extends OutputFormat<?, ?>>) (Class<?>) SequenceFileOutputFormat.class;
>>
>> yourStream.saveAsNewAPIHadoopFiles(hdfsUrl,
>> "/output-location",Text.class, Text.class, outputFormatClass);
>>
>>
>>
>> Thanks
>> Best Regards
>>
>> On Fri, Jan 9, 2015 at 10:22 AM, Su She  wrote:
>>
>>> Yes, I am calling the saveAsHadoopFiles on the Dstream. However, when I
>>> call print on the Dstream it works? If I had to do foreachRDD to
>>> saveAsHadoopFile, then why is it working for print?
>>>
>>> Also, if I am doing foreachRDD, do I need connections, or can I simply
>>> put the saveAsHadoopFiles inside the foreachRDD function?
>>>
>>> Thanks Yana for the help! I will play around with foreachRDD and convey
>>> my results.
>>>
>>>
>>>
>>> On Thu, Jan 8, 2015 at 6:06 PM, Yana Kadiyska 
>>> wrote:
>>>
 are you calling the saveAsText files on the DStream --looks like it?
 Look at the section called "Design Patterns for using foreachRDD" in the
 link you sent -- you want to do  dstream.foreachRDD(rdd =>
 rdd.saveAs)

 On Thu, Jan 8, 2015 at 5:20 PM, Su She  wrote:

> Hello Everyone,
>
> Thanks in advance for the help!
>
> I successfully got my Kafka/Spark WordCount app to print locally.
> However, I want to run it on a cluster, which means that I will have to
> save it to HDFS if I want to be able to read the output.
>
> I am running Spark 1.1.0, which means according to this document:
> https://spark.apache.org/docs/1.1.0/streaming-programming-guide.html
>
> I should be able to use commands such as saveAsText/HadoopFiles.
>
> 1) When I try saveAsTextFiles it says:
> cannot find symbol
> [ERROR] symbol  : method
> saveAsTextFiles(java.lang.String,java.lang.String)
> [ERROR] location: class
> org.apache.spark.streaming.api.java.JavaPairDStream
>
> This makes some sense as saveAsTextFiles is not included here:
>
> http://people.apache.org/~tdas/spark-1.1.0-temp-docs/api/java/org/apache/spark/streaming/api/java/JavaPairDStream.html
>
> 2) When I try
> saveAsHadoopFiles("hdfs://ipus-west-1.compute.internal:8020/user/testwordcount",
> "txt") it builds, but when I try running it it throws this exception:
>
> Exception in thread "main" java.lang.RuntimeException:
> java.lang.RuntimeException: class scala.runtime.Nothing$ not
> org.apache.hadoop.mapred.OutputFormat
> at
> org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2079)
> at
> org.apache.hadoop.mapred.JobConf.getOutputFormat(JobConf.java:712)
> at
> org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1021)
> at
> org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:940)
> at
> org.apache.spark.streaming.dstream.PairDStreamFunctions$$anonfun$8.apply(PairDStreamFunctions.scala:632)
> at
> org.apache.spark.streaming.dstream.PairDStreamFunctions$$anonfun$8.apply(PairDStreamFunctions.scala:630)
> at
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:42)
> at
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
> at
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
> at scala.util.Try$.apply(Try.scala:161)
> at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
>  

Re: /tmp directory fills up

2015-01-12 Thread Marcelo Vanzin
Hi Alessandro,

You can look for a log line like this in your driver's output:
15/01/12 10:51:01 INFO storage.DiskBlockManager: Created local
directory at 
/data/yarn/nm/usercache/systest/appcache/application_1421081007635_0002/spark-local-20150112105101-4f3d

If you're deploying your application in cluster mode, the temp
directory will be under the Yarn-defined application dir. In client
mode, the driver will create some stuff under spark.local.dir, but the
driver itself generally doesn't create many temp files IIRC.


On Fri, Jan 9, 2015 at 11:32 PM, Alessandro Baretta
 wrote:
> Gents,
>
> I'm building spark using the current master branch and deploying in to
> Google Compute Engine on top of Hadoop 2.4/YARN via bdutil, Google's Hadoop
> cluster provisioning tool. bdutils configures Spark with
>
> spark.local.dir=/hadoop/spark/tmp,
>
> but this option is ignored in combination with YARN. Bdutils also configures
> YARN with:
>
>   <property>
>     <name>yarn.nodemanager.local-dirs</name>
>     <value>/mnt/pd1/hadoop/yarn/nm-local-dir</value>
>     <description>
>       Directories on the local machine in which to store application temp files.
>     </description>
>   </property>
>
> This is the right directory for spark to store temporary data in. Still,
> Spark is creating such directories as this:
>
> /tmp/spark-51388ee6-9de6-411d-b9b9-ab6f9502d01e
>
> and filling them up with gigabytes worth of output files, filling up the
> very small root filesystem.
>
> How can I diagnose why my Spark installation is not picking up the
> yarn.nodemanager.local-dirs from yarn?
>
> Alex



-- 
Marcelo

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: RowMatrix multiplication

2015-01-12 Thread Reza Zadeh
As you mentioned, you can perform A * b, where A is a rowmatrix and b is a
local matrix.
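
For reference, a sketch of that supported operation, assuming A: RowMatrix
(n x m) and bValues: Array[Double] of length m holding b as an m x 1 column:

import org.apache.spark.mllib.linalg.Matrices

val b  = Matrices.dense(bValues.length, 1, bValues) // local m x 1 matrix
val Ab = A.multiply(b)                              // distributed n x 1 RowMatrix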

From your email, I figure you want to compute b * A^T. To do this, you can
compute C = A b^T, whose result is the transpose of what you were looking
for, i.e. C^T = b * A^T. To undo the transpose, you would have to transpose C
manually yourself. Be careful though, because the result might not have
each Row fit in memory on a single machine, which is what RowMatrix
requires. This danger is why we didn't provide a transpose operation in
RowMatrix natively.

To address this and more, there is an effort to provide more comprehensive
linear algebra through block matrices, which will likely make it to 1.3:
https://issues.apache.org/jira/browse/SPARK-3434

Best,
Reza

On Mon, Jan 12, 2015 at 6:33 AM, Alex Minnaar 
wrote:

>  I have a rowMatrix on which I want to perform two multiplications.  The
> first is a right multiplication with a local matrix which is fine.  But
> after that I also wish to right multiply the transpose of my rowMatrix with
> a different local matrix.  I understand that there is no functionality to
> transpose a rowMatrix at this time but I was wondering if anyone could
> suggest any kind of work-around for this.  I had thought that I might be
> able to initially create two rowMatrices - a normal version and a
> transposed version - and use either when appropriate.  Can anyone think of
> another alternative?
>
>
>  Thanks,
>
>
>  Alex
>


Re: Pattern Matching / Equals on Case Classes in Spark Not Working

2015-01-12 Thread Matei Zaharia
Is this in the Spark shell? Case classes don't work correctly in the Spark 
shell unfortunately (though they do work in the Scala shell) because we change 
the way lines of code compile to allow shipping functions across the network. 
The best way to get case classes in there is to compile them into a JAR and 
then add that to your spark-shell's classpath with --jars.

Matei
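
A sketch of that setup: keep the case class in its own source file, package it
into a jar (file and jar names below are placeholders), and pass the jar to the
shell:

// Customer.scala, compiled into customer-model.jar (e.g. with sbt package),
// then loaded with: spark-shell --jars customer-model.jar
package example.model

case class Customer(id: String, age: Option[Int], entryDate: Option[java.util.Date])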

> On Jan 12, 2015, at 10:04 AM, Rosner, Frank (Allianz SE) 
>  wrote:
> 
> Dear Spark Users,
>  
> I googled the web for several hours now but I don't find a solution for my 
> problem. So maybe someone from this list can help.
>  
> I have an RDD of case classes, generated from CSV files with Spark. When I 
> used the distinct operator, there were still duplicates. So I investigated 
> and found out that the equals returns false although the two objects were 
> equal (so were their individual fields as well as toStrings).
>  
> After googling it I found that the case class equals might break in case the 
> two objects are created by different class loaders. So I implemented my own 
> equals method using mattern matching (code example below). It still didn't 
> work. Some debugging revealed that the problem lies in the pattern matching. 
> Depending on the objects I compare (and maybe the split / classloader they 
> are generated in?) the patternmatching works /doesn't:
>  
> case class Customer(id: String, age: Option[Int], entryDate: 
> Option[java.util.Date]) {
>   def equals(that: Any): Boolean = that match {
> case Customer(id, age, entryDate) => {
>   println("Pattern matching worked!")
>   this.id == id && this.age == age && this.entryDate == entryDate
> }
> case _ => false
>   }
> }
>  
> //val x: Array[Customer]
> // ... some spark code to filter original data and collect x
>  
> scala> x(0)
> Customer("a", Some(5), Some(Fri Sep 23 00:00:00 CEST 1994))
> scala> x(1)
> Customer("a", None, None)
> scala> x(2)
> Customer("a", None, None)
> scala> x(3)
> Customer("a", None, None)
>  
> scala> x(0) == x(0) // should be true and works
> Pattern matching works!
> res0: Boolean = true
> scala> x(0) == x(1) // should be false and works
> Pattern matching works!
> res1: Boolean = false
> scala> x(1) == x(2) // should be true, does not work
> res2: Boolean = false
> scala> x(2) == x(3) // should be true, does not work
> Pattern matching works!
> res3: Boolean = true
> scala> x(0) == x(3) // should be false, does not work
> res4: Boolean = false
>  
> Why is the pattern matching not working? It seems that there are two kinds of 
> Customers: 0,1 and 2,3 which don't match somehow. Is this related to some 
> classloaders? Is there a way around this other than using instanceof and 
> defining a custom equals operation for every case class I write?
>  
> Thanks for the help!
> Frank



Re: can I buffer flatMap input at each worker node?

2015-01-12 Thread Sven Krasser
Not sure I understand correctly, but it sounds like you're looking for
mapPartitions().
-Sven
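
A minimal sketch of that suggestion; input and processPartition stand in for the
asker's data and per-partition logic (assumed to return a collection of results):

val output = input.mapPartitions { iter =>
  val buffered = iter.toList           // the whole partition (assumes it fits in memory)
  processPartition(buffered).iterator  // emit zero or more records for this partition
}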

On Mon, Jan 12, 2015 at 10:17 AM, maherrt  wrote:

> Dear All
>
> what i want to do is :
> as the data is partitioned on many worker nodes I want to be able to
> process
> this partition of data as a whole on each partition and then produce my
> output using flatMap for example.
> so can I load all of the input records on one worker node and emit any
> output using map function?
>
> Thanks
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/can-I-buffer-flatMap-input-at-each-worker-node-tp21106.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 
http://sites.google.com/site/krasser/?utm_source=sig


Re: Trouble with large Yarn job

2015-01-12 Thread Anders Arpteg
Yes sure Sandy, I've checked the logs and it's not an OOM issue. I've
actually been able to solve the problem finally, and it seems to be an
issue with too many partitions. The repartitioning I tried initially happened
after the union, and by then it's too late. By repartitioning as early as
possible, and significantly reducing the number of partitions (going from
100,000+ to ~6,000 partitions), the job succeeds and there are no more "Error
communicating with MapOutputTracker" issues. It seems like an issue with
handling too many partitions and executors at the same time.

It would be awesome to have an "auto-repartition" function that checks the sizes
of the existing partitions and compares them with the HDFS block size. If they are
too small (or too large), it would repartition to partition sizes similar to the block
size...
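
A rough sketch of that idea; since Spark does not know the byte size of an
arbitrary RDD, the size estimate and the block size are supplied by the caller:

import org.apache.spark.rdd.RDD

def autoRepartition[T](rdd: RDD[T],
                       estimatedBytes: Long,
                       blockSizeBytes: Long = 128L * 1024 * 1024): RDD[T] = {
  val target = math.max(1, (estimatedBytes / blockSizeBytes).toInt)
  if (target < rdd.partitions.length) rdd.coalesce(target) // shrink without a full shuffle
  else rdd.repartition(target)                             // grow (requires a shuffle)
}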

Hope this helps others with similar issues.

Best,
Anders

On Mon, Jan 12, 2015 at 6:32 AM, Sandy Ryza  wrote:

> Hi Anders,
>
> Have you checked your NodeManager logs to make sure YARN isn't killing
> executors for exceeding memory limits?
>
> -Sandy
>
> On Tue, Jan 6, 2015 at 8:20 AM, Anders Arpteg  wrote:
>
>> Hey,
>>
>> I have a job that keeps failing if too much data is processed, and I
>> can't see how to get it working. I've tried repartitioning with more
>> partitions and increasing amount of memory for the executors (now about 12G
>> and 400 executors. Here is a snippets of the first part of the code, which
>> succeeds without any problems:
>>
>> val all_days = sc.union(
>>   ds.dateInterval(startDate, date).map(date =>
>> sc.avroFile[LrDailyEndSong](daily_end_song_path + date)
>>   .map(s => (
>> (s.getUsername, s.getTrackUri),
>> UserItemData(s.getUsername, s.getTrackUri,
>>   build_vector1(date, s),
>>   build_vector2(s
>>   )
>> )
>>   .reduceByKey(sum_vectors)
>>
>> I want to process 30 days of data or more, but am only able to process
>> about 10 days. If having more days of data (lower startDate in code
>> above), the union above succeeds but the code below fails with "Error
>> communicating with MapOutputTracker" (see http://pastebin.com/fGDCXPkL
>> for more detailed error messages). Here is a snippet of the code that fails:
>>
>> val top_tracks = all_days.map(t => (t._1._2.toString, 1)).reduceByKey
>> (_+_)
>>   .filter(trackFilter)
>>   .repartition(4)
>>   .persist(StorageLevel.MEMORY_AND_DISK_SER)
>>
>> val observation_data = all_days
>>   .mapPartitions(_.map(o => (o._1._2.toString, o._2)))
>>   .join(top_tracks)
>>
>> The calculation of top_tracks works, but the last mapPartitions task
>> fails with given error message if given more than 10 days of data. Also
>> tried increasing the spark.akka.askTimeout setting, but it still fails
>> even if 10-folding the timeout setting to 300 seconds. I'm using Spark 1.2
>> and the kryo serialization.
>>
>> Realize that this is a rather long message, but I'm stuck and would
>> appreciate any help or clues for resolving this issue. Seems to be a
>> out-of-memory issue, but it does not seems to help to increase the number
>> of partitions.
>>
>> Thanks,
>> Anders
>>
>
>


can I buffer flatMap input at each worker node?

2015-01-12 Thread maherrt
Dear All

What I want to do is: as the data is partitioned across many worker nodes, I want
to be able to process each partition of data as a whole and then produce my
output using flatMap, for example.
So can I load all of the input records on one worker node and emit any
output using a map function?

Thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/can-I-buffer-flatMap-input-at-each-worker-node-tp21106.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Pattern Matching / Equals on Case Classes in Spark Not Working

2015-01-12 Thread Rosner, Frank (Allianz SE)
Dear Spark Users,

I have googled the web for several hours now but I can't find a solution for my 
problem. So maybe someone from this list can help.

I have an RDD of case classes, generated from CSV files with Spark. When I used 
the distinct operator, there were still duplicates. So I investigated and found 
out that the equals returns false although the two objects were equal (so were 
their individual fields as well as toStrings).

After googling it I found that the case class equals might break in case the 
two objects are created by different class loaders. So I implemented my own 
equals method using pattern matching (code example below). It still didn't 
work. Some debugging revealed that the problem lies in the pattern matching. 
Depending on the objects I compare (and maybe the split / classloader they are 
generated in?) the pattern matching works / doesn't:

case class Customer(id: String, age: Option[Int], entryDate: 
Option[java.util.Date]) {
  override def equals(that: Any): Boolean = that match {
case Customer(id, age, entryDate) => {
  println("Pattern matching worked!")
  this.id == id && this.age == age && this.entryDate == entryDate
}
case _ => false
  }
}

//val x: Array[Customer]
// ... some spark code to filter original data and collect x

scala> x(0)
Customer("a", Some(5), Some(Fri Sep 23 00:00:00 CEST 1994))
scala> x(1)
Customer("a", None, None)
scala> x(2)
Customer("a", None, None)
scala> x(3)
Customer("a", None, None)

scala> x(0) == x(0) // should be true and works
Pattern matching works!
res0: Boolean = true
scala> x(0) == x(1) // should be false and works
Pattern matching works!
res1: Boolean = false
scala> x(1) == x(2) // should be true, does not work
res2: Boolean = false
scala> x(2) == x(3) // should be true, does not work
Pattern matching works!
res3: Boolean = true
scala> x(0) == x(3) // should be false, does not work
res4: Boolean = false

Why is the pattern matching not working? It seems that there are two kinds of 
Customers: 0,1 and 2,3 which don't match somehow. Is this related to some 
classloaders? Is there a way around this other than using instanceof and 
defining a custom equals operation for every case class I write?

Thanks for the help!
Frank
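
One possible workaround, sketched here purely as an assumption (it is not taken from
this thread): avoid matching on the case class constructor, whose pattern is tied to a
particular loaded Class, and compare the class name plus the product fields instead.

  // Sketch only: field-by-field comparison via productIterator, no constructor pattern,
  // so it does not require both instances to share the same Customer Class object.
  override def equals(that: Any): Boolean = that match {
    case p: Product if p.getClass.getName == this.getClass.getName =>
      this.productIterator.sameElements(p.productIterator)
    case _ => false
  }
  // The generated case-class hashCode is derived from the field values, so it remains
  // consistent with this definition of equality.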


Re: Issue writing to Cassandra from Spark

2015-01-12 Thread Ankur Srivastava
Hi Akhil,

Thank you for the pointers. Below is how we are saving data to Cassandra.

javaFunctions(rddToSave).writerBuilder(datapipelineKeyspace,
  datapipelineOutputTable, mapToRow(Sample.class))

The data we are saving at this stage is ~200 million rows.

How do we control application threads in spark so that it does not exceed "
rpc_max_threads"? We are running with default value of this property in
cassandra.yaml. I have already set these
two properties for Spark-Cassandra connector:

spark.cassandra.output.batch.size.rows=1
spark.cassandra.output.concurrent.writes=1

Thanks
- Ankur


On Sun, Jan 11, 2015 at 10:16 PM, Akhil Das 
wrote:

> I see, can you paste the piece of code? Its probably because you are
> exceeding the number of connection that are specified in the
> property rpc_max_threads. Make sure you close all the connections properly.
>
> Thanks
> Best Regards
>
> On Mon, Jan 12, 2015 at 7:45 AM, Ankur Srivastava <
> ankur.srivast...@gmail.com> wrote:
>
>> Hi Akhil, thank you for your response.
>>
>> Actually we are first reading from cassandra and then writing back after
>> doing some processing. All the reader stages succeed with no error and many
>> writer stages also succeed but many fail as well.
>>
>> Thanks
>> Ankur
>>
>> On Sat, Jan 10, 2015 at 10:15 PM, Akhil Das 
>> wrote:
>>
>>> Just make sure you are not connecting to the Old RPC Port (9160), new
>>> binary port is running on 9042.
>>>
>>> What is your rpc_address listed in cassandra.yaml? Also make sure you
>>> have start_native_transport: *true *in the yaml file.
>>>
>>> Thanks
>>> Best Regards
>>>
>>> On Sat, Jan 10, 2015 at 8:44 AM, Ankur Srivastava <
>>> ankur.srivast...@gmail.com> wrote:
>>>
 Hi,

 We are currently using spark to join data in Cassandra and then write
 the results back into Cassandra. While reads happen with out any error
 during the writes we see many exceptions like below. Our environment
 details are:

 - Spark v 1.1.0
 - spark-cassandra-connector-java_2.10 v 1.1.0

 We are using below settings for the writer

 spark.cassandra.output.batch.size.rows=1

 spark.cassandra.output.concurrent.writes=1

 com.datastax.driver.core.exceptions.NoHostAvailableException: All
 host(s) tried for query failed (tried: [] - use getErrors() for details)

 at
 com.datastax.driver.core.RequestHandler.sendRequest(RequestHandler.java:108)

 at
 com.datastax.driver.core.RequestHandler$1.run(RequestHandler.java:179)

 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)

 at java.lang.Thread.run(Thread.java:745)

 Thanks

 Ankur

>>>
>>>
>>
>


Re: How to use memcached with spark

2015-01-12 Thread Akhil Das
You can try this client https://github.com/jakedouglas/memcached.scala
wouldn't be that hard to integrate it with hadoopFile.

Thanks
Best Regards

On Mon, Jan 12, 2015 at 9:22 PM, octavian.ganea 
wrote:

> I am trying to use it, but without success. Any sample code that works with
> Spark would be highly appreciated. :)
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-use-memcached-with-spark-tp13409p21103.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: configuring spark.yarn.driver.memoryOverhead on Spark 1.2.0

2015-01-12 Thread David McWhorter
Hi Ganelin, sorry if it wasn't clear from my previous email, but that is 
how I am creating a spark context.  I just didn't write out the lines 
where I create the new SparkConf and SparkContext.  I am also upping the 
driver memory when running.


Thanks,
David

On 01/12/2015 11:12 AM, Ganelin, Ilya wrote:

There are two related options:

To solve your problem directly try:
val conf = new SparkConf().set("spark.yarn.driver.memoryOverhead", "1024")
val sc = new SparkContext(conf)

And the second, which increases the overall memory available on the driver, as
part of your spark-submit script add:

--driver-memory 2g
Hope this helps!


From: David McWhorter mailto:mcwhor...@ccri.com>>
Date: Monday, January 12, 2015 at 11:01 AM
To: "user@spark.apache.org " 
mailto:user@spark.apache.org>>

Subject: configuring spark.yarn.driver.memoryOverhead on Spark 1.2.0

Hi all,

I'm trying to figure out how to set this option: " 
spark.yarn.driver.memoryOverhead" on Spark 1.2.0.  I found this 
helpful overview 
http://apache-spark-user-list.1001560.n3.nabble.com/Stable-spark-streaming-app-td14105.html#a14476, 
which suggests to launch with --spark.yarn.driver.memoryOverhead 1024 
added to spark-submit.  However, when I do that I get this error:

Error: Unrecognized option '--spark.yarn.driver.memoryOverhead'.
Run with --help for usage help or --verbose for debug output
I have also tried calling 
sparkConf.set("spark.yarn.driver.memoryOverhead", "1024") on my spark 
configuration object but I still get "Will allocate AM container, with 
 MB memory including 384 MB overhead" when launching.  I'm running 
in yarn-cluster mode.


Any help or tips would be appreciated.

Thanks,
David
--

David McWhorter
Software Engineer
Commonwealth Computer Research, Inc.
1422 Sachem Place, Unit #1
Charlottesville, VA 22901
mcwhor...@ccri.com  | 434.299.0090x204







--

David McWhorter
Software Engineer
Commonwealth Computer Research, Inc.
1422 Sachem Place, Unit #1
Charlottesville, VA 22901
mcwhor...@ccri.com | 434.299.0090x204



Spark executors resources. Blocking?

2015-01-12 Thread Luis Guerra
Hello all,

I have a naive question regarding how spark uses the executors in a cluster
of machines. Imagine the scenario in which I do not know the input size of
my data in execution A, so I set Spark to use 20 (out of my 25 nodes, for
instance). At the same time, I also launch a second execution B, setting
Spark to use 10 nodes for this.

Assuming a huge input size for execution A, which implies an execution time
of 30 minutes for example (using all the resources), and a constant
execution time for B of 10 minutes, then both executions will last for 40
minutes (I assume that B cannot be launched until 10 resources are
completely available, when A finishes).

Now, assuming a very small input size for execution A running for 5 minutes
in only 2 of the 20 planned resources, I would like execution B to be
launched at that time, consuming both executions only 10 minutes (and 12
resources). However, as execution A has set Spark to use 20 resources,
execution B has to wait until A has finished, so the total execution time
lasts for 15 minutes.

Is this right? If so, how can I handle this kind of scenario? If I am
wrong, what would be the correct interpretation?

Thanks in advance,

Best


Re: Failed to save RDD as text file to local file system

2015-01-12 Thread Sean Owen
I think you're confusing HDFS paths and local paths. You are cd'ing to
a directory and seem to want to write output there, but your path has
no scheme and defaults to being an HDFS path. When you use "file:" you
seem to have a permission error (perhaps).
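
For example, a minimal sketch reusing the commands from the message below (the local
directory must be writable by the user running the shell):

  val r = sc.parallelize(Array("a", "b", "c"))
  // no scheme: the path is resolved against the default FS (HDFS here), hence the
  // AccessControlException on "/" further down
  // r.saveAsTextFile("/home/cloudera/tmp/out1")
  // explicit file: scheme: writes the out1 directory on the local file system instead
  r.saveAsTextFile("file:///home/cloudera/tmp/out1")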

On Mon, Jan 12, 2015 at 4:21 PM, NingjunWang
 wrote:
> Prannoy
>
>
>
> I tried this r.saveAsTextFile("home/cloudera/tmp/out1"); it returns without
> error. But where does it save to? The folder “/home/cloudera/tmp/out1” is
> not created.
>
>
>
> I also tried the following
>
> cd /home/cloudera/tmp/
>
> spark-shell
>
> scala> val r = sc.parallelize(Array("a", "b", "c"))
>
> scala> r.saveAsTextFile("out1")
>
>
>
> It does not return error. But still there is no “out1” folder created under
> /home/cloudera/tmp/
>
>
>
> I tried to give absolute path but then get an error
>
>
>
> scala> r.saveAsTextFile("/home/cloudera/tmp/out1")
>
> org.apache.hadoop.security.AccessControlException: Permission denied:
> user=cloudera, access=WRITE, inode="/":hdfs:supergroup:drwxr-xr-x
>
> at
> org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.checkFsPermission(DefaultAuthorizationProvider.java:257)
>
> at
> org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.check(DefaultAuthorizationProvider.java:238)
>
> at
> org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.check(DefaultAuthorizationProvider.java:216)
>
> at
> org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.checkPermission(DefaultAuthorizationProvider.java:145)
>
> at
> org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:138)
>
> at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6286)
>
> at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6268)
>
> at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkAncestorAccess(FSNamesystem.java:6220)
>
> at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirsInternal(FSNamesystem.java:4087)
>
> at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirsInt(FSNamesystem.java:4057)
>
> at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirs(FSNamesystem.java:4030)
>
> at
> org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.mkdirs(NameNodeRpcServer.java:787)
>
> at
> org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.mkdirs(AuthorizationProviderProxyClientProtocol.java:297)
>
> at
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.mkdirs(ClientNamenodeProtocolServerSideTranslatorPB.java:594)
>
> at
> org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
>
> at
> org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:587)
>
> at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1026)
>
> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2013)
>
> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2009)
>
> at java.security.AccessController.doPrivileged(Native Method)
>
> at javax.security.auth.Subject.doAs(Subject.java:415)
>
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1642)
>
> at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2007)
>
>
>
> Very frustrated. Please advise.
>
>
>
>
>
> Regards,
>
>
>
> Ningjun Wang
>
> Consulting Software Engineer
>
> LexisNexis
>
> 121 Chanlon Road
>
> New Providence, NJ 07974-1541
>
>
>
> From: Prannoy [via Apache Spark User List] [mailto:ml-node+[hidden email]]
> Sent: Monday, January 12, 2015 4:18 AM
>
>
> To: Wang, Ningjun (LNG-NPV)
> Subject: Re: Failed to save RDD as text file to local file system
>
>
>
> Have you tried simple giving the path where you want to save the file ?
>
>
>
> For instance in your case just do
>
>
>
> r.saveAsTextFile("home/cloudera/tmp/out1")
>
>
>
> Dont use file
>
>
>
> This will create a folder with name out1. saveAsTextFile always write by
> making a directory, it does not write data into a single file.
>
>
>
> Incase you need a single file you can use copyMerge API in FileUtils.
>
>
>
> FileUtil.copyMerge(fs, home/cloudera/tmp/out1, fs,home/cloudera/tmp/out2 ,
> true, conf,null);
>
> Now out2 will be a single file containing your data.
>
> fs is the configuration of you local file system.
>
> Thanks
>
>
>
>
>
> On Sat, Jan 10, 2015 at 1:36 AM, NingjunWang [via Apache Spark User List]
> <[hidden email]> wrote:
>
> No, do you have any idea?
>
>
>
> Regards,
>
>
>
> Ningjun Wang
>
> Consulting Software Engineer
>
> LexisNexis
>
> 121 Chanlon Road
>
> New Providence, NJ 07974-1541
>
>
>
> From: firemonk9 [via Apache Spark User List] [mailto:[hidden email][hidden
> em

Re: configuring spark.yarn.driver.memoryOverhead on Spark 1.2.0

2015-01-12 Thread Sean Owen
Isn't the syntax "--conf property=value"?

http://spark.apache.org/docs/latest/configuration.html

Yes, I think setting it after the driver is running is of course too late.
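
For example, something like the following (a sketch only: the class and jar names are
placeholders, and 1024/2g are just the values mentioned earlier in the thread):

  spark-submit --master yarn-cluster \
    --conf spark.yarn.driver.memoryOverhead=1024 \
    --driver-memory 2g \
    --class com.example.MyApp my-app.jar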

On Mon, Jan 12, 2015 at 4:01 PM, David McWhorter  wrote:
> Hi all,
>
> I'm trying to figure out how to set this option: "
> spark.yarn.driver.memoryOverhead" on Spark 1.2.0.  I found this helpful
> overview
> http://apache-spark-user-list.1001560.n3.nabble.com/Stable-spark-streaming-app-td14105.html#a14476,
> which suggests to launch with --spark.yarn.driver.memoryOverhead 1024 added
> to spark-submit.  However, when I do that I get this error:
> Error: Unrecognized option '--spark.yarn.driver.memoryOverhead'.
> Run with --help for usage help or --verbose for debug output
> I have also tried calling sparkConf.set("spark.yarn.driver.memoryOverhead",
> "1024") on my spark configuration object but I still get "Will allocate AM
> container, with  MB memory including 384 MB overhead" when launching.
> I'm running in yarn-cluster mode.
>
> Any help or tips would be appreciated.
>
> Thanks,
> David
>
> --
>
> David McWhorter
> Software Engineer
> Commonwealth Computer Research, Inc.
> 1422 Sachem Place, Unit #1
> Charlottesville, VA 22901
> mcwhor...@ccri.com | 434.299.0090x204

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Failed to save RDD as text file to local file system

2015-01-12 Thread NingjunWang
Prannoy

I tried this r.saveAsTextFile("home/cloudera/tmp/out1"); it returns without 
error. But where does it save to? The folder “/home/cloudera/tmp/out1” is not 
created.

I also tried the following
cd /home/cloudera/tmp/
spark-shell
scala> val r = sc.parallelize(Array("a", "b", "c"))
scala> r.saveAsTextFile("out1")

It does not return error. But still there is no “out1” folder created under 
/home/cloudera/tmp/

I tried to give absolute path but then get an error

scala> r.saveAsTextFile("/home/cloudera/tmp/out1")
org.apache.hadoop.security.AccessControlException: Permission denied: 
user=cloudera, access=WRITE, inode="/":hdfs:supergroup:drwxr-xr-x
at 
org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.checkFsPermission(DefaultAuthorizationProvider.java:257)
at 
org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.check(DefaultAuthorizationProvider.java:238)
at 
org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.check(DefaultAuthorizationProvider.java:216)
at 
org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.checkPermission(DefaultAuthorizationProvider.java:145)
at 
org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:138)
at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6286)
at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6268)
at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkAncestorAccess(FSNamesystem.java:6220)
at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirsInternal(FSNamesystem.java:4087)
at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirsInt(FSNamesystem.java:4057)
at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirs(FSNamesystem.java:4030)
at 
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.mkdirs(NameNodeRpcServer.java:787)
at 
org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.mkdirs(AuthorizationProviderProxyClientProtocol.java:297)
at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.mkdirs(ClientNamenodeProtocolServerSideTranslatorPB.java:594)
at 
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:587)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1026)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2013)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2009)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1642)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2007)

Very frustrated. Please advise.


Regards,

Ningjun Wang
Consulting Software Engineer
LexisNexis
121 Chanlon Road
New Providence, NJ 07974-1541

From: Prannoy [via Apache Spark User List] 
[mailto:ml-node+s1001560n21093...@n3.nabble.com]
Sent: Monday, January 12, 2015 4:18 AM
To: Wang, Ningjun (LNG-NPV)
Subject: Re: Failed to save RDD as text file to local file system

Have you tried simple giving the path where you want to save the file ?

For instance in your case just do

r.saveAsTextFile("home/cloudera/tmp/out1")

Dont use file

This will create a folder with name out1. saveAsTextFile always write by making 
a directory, it does not write data into a single file.

In case you need a single file you can use the copyMerge API in FileUtil.

FileUtil.copyMerge(fs, new Path("home/cloudera/tmp/out1"), fs, new Path("home/cloudera/tmp/out2"),
true, conf, null);
Now out2 will be a single file containing your data.
fs is the FileSystem object for your local file system, and conf its Configuration.
Thanks


On Sat, Jan 10, 2015 at 1:36 AM, NingjunWang [via Apache Spark User List] 
<[hidden email]> wrote:
No, do you have any idea?

Regards,

Ningjun Wang
Consulting Software Engineer
LexisNexis
121 Chanlon Road
New Providence, NJ 07974-1541

From: firemonk9 [via Apache Spark User List] [mailto:[hidden 
email][hidden 
email]]
Sent: Friday, January 09, 2015 2:56 PM
To: Wang, Ningjun (LNG-NPV)
Subject: Re: Failed to save RDD as text file to local file system

Have you found any resolution for this issue ?

If you reply to this email, your message will be added to the discussion below:
http://apache-spark-user-list.1001560.n3.nabble.com/Failed-to-save-RDD-as-text-file-to-local-file-system-tp21050p21067.html
To unsubscribe from Failed to save RDD as text file to local file system, click 
here.
NAML

Re: configuring spark.yarn.driver.memoryOverhead on Spark 1.2.0

2015-01-12 Thread Ganelin, Ilya
There are two related options:

To solve your problem directly try:

val conf = new SparkConf().set("spark.yarn.driver.memoryOverhead", "1024")
val sc = new SparkContext(conf)

And the second, which increases the overall memory available on the driver, as 
part of your spark-submit script add:

--driver-memory 2g


Hope this helps!


From: David McWhorter mailto:mcwhor...@ccri.com>>
Date: Monday, January 12, 2015 at 11:01 AM
To: "user@spark.apache.org" 
mailto:user@spark.apache.org>>
Subject: configuring spark.yarn.driver.memoryOverhead on Spark 1.2.0

Hi all,

I'm trying to figure out how to set this option: " 
spark.yarn.driver.memoryOverhead" on Spark 1.2.0.  I found this helpful 
overview 
http://apache-spark-user-list.1001560.n3.nabble.com/Stable-spark-streaming-app-td14105.html#a14476,
 which suggests to launch with --spark.yarn.driver.memoryOverhead 1024 added to 
spark-submit.  However, when I do that I get this error:
Error: Unrecognized option '--spark.yarn.driver.memoryOverhead'.
Run with --help for usage help or --verbose for debug output
I have also tried calling sparkConf.set("spark.yarn.driver.memoryOverhead", 
"1024") on my spark configuration object but I still get "Will allocate AM 
container, with  MB memory including 384 MB overhead" when launching.  I'm 
running in yarn-cluster mode.

Any help or tips would be appreciated.

Thanks,
David

--

David McWhorter
Software Engineer
Commonwealth Computer Research, Inc.
1422 Sachem Place, Unit #1
Charlottesville, VA 22901
mcwhor...@ccri.com | 434.299.0090x204



The information contained in this e-mail is confidential and/or proprietary to 
Capital One and/or its affiliates. The information transmitted herewith is 
intended only for use by the individual or entity to which it is addressed.  If 
the reader of this message is not the intended recipient, you are hereby 
notified that any review, retransmission, dissemination, distribution, copying 
or other use of, or taking of any action in reliance upon this information is 
strictly prohibited. If you have received this communication in error, please 
contact the sender and delete the material from your computer.


Re: Spark does not loop through a RDD.map

2015-01-12 Thread Cody Koeninger
At a quick glance, I think you're misunderstanding some basic features.

http://spark.apache.org/docs/latest/programming-guide.html#rdd-operations

Map is a transformation, it is lazy.  You're not calling any action on the
result of map.

Also, closing over a mutable variable (like idx or featArray here) won't
work; that closure is being run on executors, not the driver where your
main code is running.
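
A minimal sketch of an alternative, using the names from the code below and assuming
allFeaturesRDD is an RDD[String] and wordReviewSeqMap has already been collected to
the driver: build the feature array locally instead of mutating it inside an RDD.map
closure.

  // collect() is an action, so the words are materialized on the driver
  val allFeatures: Array[String] = allFeaturesRDD.collect()
  val featArray: Array[Double] = allFeatures.map { word =>
    val reviews = wordReviewSeqMap.getOrElse(word, Seq.empty[String])
    if (reviews.contains(reviewID)) 1.0 else 0.0
  }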

On Mon, Jan 12, 2015 at 9:49 AM, rkgurram  wrote:

> Hi,
>I am observing some weird behavior with spark, it might be my
> mis-interpretation of some fundamental concepts but I have look at it for 3
> days and have not been able to solve it.
>
> The source code is pretty long and complex so instead of posting it, I will
> try to articulate the problem.
> I am building a "Sentiment Analyser" using the Naive Bayes model in Spark.
>
> 1) I have taken text files in RAW format and created a RDD of
> words->Array(files the word is found in).
>
>  2) From this I have derived the "features" array for each file which is an
> Array[Double], a 0.0 if the file does not contain the word and 1.0 if the
> word is found in the file
>
> 3) I have then created an RDD[LabeledPoints]
>
> from this I have created the Naive Baiyes model using the following code
>
> val splits = uberLbRDD.randomSplit(Array(0.6, 0.4), seed = 11L)
> val training = splits(0)
>// training.persist(StorageLevel.MEMORY_AND_DISK_SER_2)
> val test = splits(1)
> Logger.info("Training count: " + training.count() + " Testing count:" +
> test.count())
> model = NaiveBayes.train(training, lambda = 1.0)
>
> val predictionAndLabel = test.map(p => (model.predict(p.features),
> p.label))
> val accuracy = 1.0 * predictionAndLabel.filter(x => x._1 ==
> x._2).count() / test.count()
> Logger.info("Fold:[" + fold + "] accuracy: [" + accuracy +"]")
>
> 4) The model seems to be fine and the accuracy is about 75% to 82%
> depending
> on which set of input fles I provide.
>
> 5) Now I am using this model to "predict()",  here I am creating the same
> feature array from the input text file and I have code as follows,
>/*
> * Print all the features (words) in the feature array
> */
>allFeaturesRDD.foreach((x) => print(x + ", "))
>
>  /*
>   * Build the feature array
>   */
>
> val features = buildFeatureArray(reviewID,wordSeqRdd) < Fails here,
> have show this code below
> logFeatureArray(features)
>
> val prediction = model.predict(Vectors.dense(features))
> Logger.info ("Prediction:" + prediction)
>
> ==
> reviewID > filename
> wordReviewSeqRDD -> RDD[(word, Array(filename)]
>
>   def buildFeatureArray(reviewID:String,
> wordReviewSeqRDD:RDD[(String,Seq[String])]):
> Array[Double] = {
>
> val numWords = allFeaturesRDD.count <--- number of all words in the
> feature
> val wordReviewSeqMap = wordReviewSeqRDD.collectAsMap()
>
> var featArray:Array[Double] = new Array(numWords.toInt) <--- create an
> empty features array
> var idx = 0
> if (trainingDone) Logger.info("Feature len:" + numWords)
>
> allFeaturesRDD.map{ *<-- This is where it is failing, *
>   case(eachword) => { *<-- for some reason the code does not enter here
> *
> val reviewList = wordReviewSeqMap.get(eachword).get
>
> if (trainingDone == true) {
>   println("1. eachword:" + eachword + "reviewList:" + reviewList)
>   println("2. reviewList.size:" + reviewList.length)
>   println("3. reviewList(0):" + reviewList(0))
>
> }
>
> featArray(idx) = if (reviewList.contains(reviewID)) 1.toDouble else
> 0.toDouble
> idx += 1
>   }
> }
> featArray
>   }
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-does-not-loop-through-a-RDD-map-tp21102.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


configuring spark.yarn.driver.memoryOverhead on Spark 1.2.0

2015-01-12 Thread David McWhorter

Hi all,

I'm trying to figure out how to set this option: " 
spark.yarn.driver.memoryOverhead" on Spark 1.2.0.  I found this helpful 
overview 
http://apache-spark-user-list.1001560.n3.nabble.com/Stable-spark-streaming-app-td14105.html#a14476, 
which suggests to launch with --spark.yarn.driver.memoryOverhead 1024 
added to spark-submit. However, when I do that I get this error:

Error: Unrecognized option '--spark.yarn.driver.memoryOverhead'.
Run with --help for usage help or --verbose for debug output
I have also tried calling 
sparkConf.set("spark.yarn.driver.memoryOverhead", "1024") on my spark 
configuration object but I still get "Will allocate AM container, with 
 MB memory including 384 MB overhead" when launching.  I'm running 
in yarn-cluster mode.


Any help or tips would be appreciated.

Thanks,
David

--

David McWhorter
Software Engineer
Commonwealth Computer Research, Inc.
1422 Sachem Place, Unit #1
Charlottesville, VA 22901
mcwhor...@ccri.com | 434.299.0090x204



Re: How to use memcached with spark

2015-01-12 Thread octavian.ganea
I am trying to use it, but without success. Any sample code that works with
Spark would be highly appreciated. :)



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-use-memcached-with-spark-tp13409p21103.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spark does not loop through a RDD.map

2015-01-12 Thread rkgurram
Hi,
   I am observing some weird behavior with spark, it might be my
mis-interpretation of some fundamental concepts, but I have looked at it for 3
days and have not been able to solve it.

The source code is pretty long and complex so instead of posting it, I will
try to articulate the problem.
I am building a "Sentiment Analyser" using the Naive Bayes model in Spark. 

1) I have taken text files in RAW format and created a RDD of
words->Array(files the word is found in). 

 2) From this I have derived the "features" array for each file which is an
Array[Double], a 0.0 if the file does not contain the word and 1.0 if the
word is found in the file

3) I have then created an RDD[LabeledPoints]

from this I have created the Naive Bayes model using the following code
   
val splits = uberLbRDD.randomSplit(Array(0.6, 0.4), seed = 11L)
val training = splits(0)
   // training.persist(StorageLevel.MEMORY_AND_DISK_SER_2)
val test = splits(1)
Logger.info("Training count: " + training.count() + " Testing count:" +
test.count())
model = NaiveBayes.train(training, lambda = 1.0)

val predictionAndLabel = test.map(p => (model.predict(p.features),
p.label))
val accuracy = 1.0 * predictionAndLabel.filter(x => x._1 ==
x._2).count() / test.count()
Logger.info("Fold:[" + fold + "] accuracy: [" + accuracy +"]")

4) The model seems to be fine and the accuracy is about 75% to 82% depending
on which set of input files I provide.

5) Now I am using this model to "predict()",  here I am creating the same
feature array from the input text file and I have code as follows,
   /*
* Print all the features (words) in the feature array
*/
   allFeaturesRDD.foreach((x) => print(x + ", "))

 /*
  * Build the feature array
  */

val features = buildFeatureArray(reviewID,wordSeqRdd) < Fails here,
I have shown this code below
logFeatureArray(features)

val prediction = model.predict(Vectors.dense(features))
Logger.info ("Prediction:" + prediction)

==
reviewID -> filename
wordReviewSeqRDD -> RDD[(word, Array(filename)]

  def buildFeatureArray(reviewID:String,
wordReviewSeqRDD:RDD[(String,Seq[String])]):
Array[Double] = {

val numWords = allFeaturesRDD.count <--- number of all words in the
feature
val wordReviewSeqMap = wordReviewSeqRDD.collectAsMap()

var featArray:Array[Double] = new Array(numWords.toInt) <--- create an
empty features array
var idx = 0
if (trainingDone) Logger.info("Feature len:" + numWords)

allFeaturesRDD.map{ *<-- This is where it is failing, *
  case(eachword) => { *<-- for some reason the code does not enter here
*
val reviewList = wordReviewSeqMap.get(eachword).get

if (trainingDone == true) {
  println("1. eachword:" + eachword + "reviewList:" + reviewList)
  println("2. reviewList.size:" + reviewList.length)
  println("3. reviewList(0):" + reviewList(0))

}

featArray(idx) = if (reviewList.contains(reviewID)) 1.toDouble else
0.toDouble
idx += 1
  }
}
featArray
  }



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-does-not-loop-through-a-RDD-map-tp21102.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RowMatrix multiplication

2015-01-12 Thread Alex Minnaar
I have a rowMatrix on which I want to perform two multiplications.  The first 
is a right multiplication with a local matrix which is fine.  But after that I 
also wish to right multiply the transpose of my rowMatrix with a different 
local matrix.  I understand that there is no functionality to transpose a 
rowMatrix at this time but I was wondering if anyone could suggest a any kind 
of work-around for this.  I had thought that I might be able to initially 
create two rowMatrices - a normal version and a transposed version - and use 
either when appropriate.  Can anyone think of another alternative?


Thanks,


Alex


Re: Manually trigger RDD map function without action

2015-01-12 Thread Cody Koeninger
If you don't care about the value that your map produced (because you're
not already collecting or saving it), then is foreach more appropriate to
what you're doing?
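
For example (a sketch; sideEffect stands in for whatever the map function does):

  // foreach is an action, so it runs the side effect without keeping a result
  rdd.foreach(x => sideEffect(x))
  // instead of the transformation-plus-dummy-action pattern from the question:
  // rdd.map(x => sideEffect(x)).count()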

On Mon, Jan 12, 2015 at 4:08 AM, kevinkim  wrote:

> Hi, answer from another Kevin.
>
> I think you may already know it,
> 'transformation' in spark
> (
> http://spark.apache.org/docs/latest/programming-guide.html#transformations
> )
> will be done in 'lazy' way, when you trigger 'actions'.
> (http://spark.apache.org/docs/latest/programming-guide.html#actions)
>
> So you can use
> 'collect' - if you need result from memory
> 'count' - if you need to count
> 'saveAs ...' - if you need to persist transformed RDD
>
> Regards,
> Kevin
>
>
> On Mon Jan 12 2015 at 6:48:54 PM Kevin Jung [via Apache Spark User List] 
> <[hidden
> email] > wrote:
>
>> Hi all
>> Is there efficient way to trigger RDD transformations? I'm now using
>> count action to achieve this.
>>
>> Best regards
>> Kevin
>>
>> --
>>  If you reply to this email, your message will be added to the
>> discussion below:
>>
>> http://apache-spark-user-list.1001560.n3.nabble.com/Manually-trigger-RDD-map-function-without-action-tp21094.html
>>  To unsubscribe from Apache Spark User List, click here.
>> NAML
>> 
>>
>
> --
> View this message in context: Re: Manually trigger RDD map function
> without action
> 
>
> Sent from the Apache Spark User List mailing list archive
>  at Nabble.com.
>


Re: creating a single kafka producer object for all partitions

2015-01-12 Thread Cody Koeninger
You should take a look at

https://issues.apache.org/jira/browse/SPARK-4122

which is implementing writing to kafka in a pretty similar way (make a new
producer inside foreachPartition)
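
Sketched below as an assumption (not code from that ticket): one common way to avoid
rebuilding the producer for every partition is to keep it in a lazily initialised
singleton, so each executor JVM creates it once and every partition handled by that
executor reuses it. The broker list and topic name here are placeholders.

  import java.util.Properties
  import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

  object ProducerHolder {                       // hypothetical helper; one per executor JVM
    lazy val producer: Producer[String, String] = {
      val props = new Properties()
      props.put("metadata.broker.list", "broker1:9092,broker2:9092")
      props.put("serializer.class", "kafka.serializer.StringEncoder")
      new Producer[String, String](new ProducerConfig(props))
    }
  }

  rdd.foreachPartition { itr =>
    itr.foreach { row =>
      ProducerHolder.producer.send(new KeyedMessage[String, String]("myTopic", row.toString))
    }
  }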

On Mon, Jan 12, 2015 at 5:24 AM, Sean Owen  wrote:

> Leader-not-found suggests a problem with zookeeper config. It depends
> a lot on the specifics of your error. But this is really a Kafka
> question, better for the Kafka list.
>
> On Mon, Jan 12, 2015 at 11:10 AM, Hafiz Mujadid
>  wrote:
> > Actually this code is producing error leader not found exception. I am
> > unable to find the reason
> >
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: how to select the first row in each group by group?

2015-01-12 Thread Silvio Fiorito
As you described, SparkSQL does not yet have windowing functions. However, 
because we can combine RDD functions with SparkSQL we can achieve the same 
thing.

What you essentially want to do is key by “imei” and use reduceByKey to select 
the record you want for each key.

val keyedTable = table.keyBy(_(0)) // assuming “imei” is in position 0

val reduced = keyedTable.reduceByKey((a,b) => {
  if (a.getInt(1) < b.getInt(1)) a else b
}).map(_._2)

// since reduced is now just an RDD, we need to reapply a schema to it,
// using the original schema we started from
val reducedTable = sqlCtx.applySchema(reduced, table.schema)



From: LinQili mailto:lin_q...@outlook.com>>
Date: Monday, January 12, 2015 at 4:23 AM
To: "u...@spark.incubator.apache.org" 
mailto:u...@spark.incubator.apache.org>>
Subject: how to select the first row in each group by group?

Hi all:
I am using spark sql to read and write hive tables. But There is a issue that 
how to select the first row in each group by group?
In hive, we could write hql like this:
SELECT imei
FROM (
SELECT imei,
row_number() over (PARTITION BY imei ORDER BY login_time ASC) AS 
row_num
FROM login_log_2015010914
) a
WHERE row_num = 1


In spark sql, how to write the sql equal to the hql?


Re: Removing JARs from spark-jobserver

2015-01-12 Thread Fernando O.
just an FYI: you can configure that using spark.jobserver.filedao.rootdir

On Mon, Jan 12, 2015 at 1:52 AM, abhishek  wrote:

> Nice!  Good to know
> On 11 Jan 2015 21:10, "Sasi [via Apache Spark User List]" <[hidden email]
> > wrote:
>
>> Thank you Abhishek. That works.
>>
>> --
>>  If you reply to this email, your message will be added to the
>> discussion below:
>>
>> http://apache-spark-user-list.1001560.n3.nabble.com/Removing-JARs-from-spark-jobserver-tp21081p21084.html
>>  To start a new topic under Apache Spark User List, email [hidden email]
>> 
>> To unsubscribe from Apache Spark User List, click here.
>> NAML
>> 
>>
>
> --
> View this message in context: Re: Removing JARs from spark-jobserver
> 
> Sent from the Apache Spark User List mailing list archive
>  at Nabble.com.
>


Re: Spark SQL & Parquet - data are reading very very slow

2015-01-12 Thread kaushal
Yes, I am also facing the same problem. Can anyone please help to get faster
execution?
thanks.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-Parquet-data-are-reading-very-very-slow-tp21061p21100.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: calculating the mean of SparseVector RDD

2015-01-12 Thread Rok Roskar
This was without using Kryo -- if I use kryo, I got errors about buffer
overflows (see above):

com.esotericsoftware.kryo.KryoException: Buffer overflow. Available: 5,
required: 8

Just calling colStats doesn't actually compute those statistics, does it?
It looks like the computation is only carried out once you call the .mean()
method.
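
If the Kryo route is still of interest, that buffer-overflow error usually means the
serializer buffer is too small. A sketch of enlarging it is below; the property name
for this Spark version (spark.kryoserializer.buffer.max.mb) is stated from memory and
should be checked against the configuration docs, and the 256 MB value is arbitrary.

  val conf = new SparkConf()
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .set("spark.kryoserializer.buffer.max.mb", "256")  // assumption: verify name/units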



On Sat, Jan 10, 2015 at 7:04 AM, Xiangrui Meng  wrote:

> colStats() computes the mean values along with several other summary
> statistics, which makes it slower. How is the performance if you don't
> use kryo? -Xiangrui
>
> On Fri, Jan 9, 2015 at 3:46 AM, Rok Roskar  wrote:
> > thanks for the suggestion -- however, looks like this is even slower.
> With
> > the small data set I'm using, my aggregate function takes ~ 9 seconds and
> > the colStats.mean() takes ~ 1 minute. However, I can't get it to run with
> > the Kyro serializer -- I get the error:
> >
> > com.esotericsoftware.kryo.KryoException: Buffer overflow. Available: 5,
> > required: 8
> >
> > is there an easy/obvious fix?
> >
> >
> > On Wed, Jan 7, 2015 at 7:30 PM, Xiangrui Meng  wrote:
> >>
> >> There is some serialization overhead. You can try
> >>
> >>
> https://github.com/apache/spark/blob/master/python/pyspark/mllib/stat.py#L107
> >> . -Xiangrui
> >>
> >> On Wed, Jan 7, 2015 at 9:42 AM, rok  wrote:
> >> > I have an RDD of SparseVectors and I'd like to calculate the means
> >> > returning
> >> > a dense vector. I've tried doing this with the following (using
> pyspark,
> >> > spark v1.2.0):
> >> >
> >> > def aggregate_partition_values(vec1, vec2) :
> >> > vec1[vec2.indices] += vec2.values
> >> > return vec1
> >> >
> >> > def aggregate_combined_vectors(vec1, vec2) :
> >> > if all(vec1 == vec2) :
> >> > # then the vector came from only one partition
> >> > return vec1
> >> > else:
> >> > return vec1 + vec2
> >> >
> >> > means = vals.aggregate(np.zeros(vec_len), aggregate_partition_values,
> >> > aggregate_combined_vectors)
> >> > means = means / nvals
> >> >
> >> > This turns out to be really slow -- and doesn't seem to depend on how
> >> > many
> >> > vectors there are so there seems to be some overhead somewhere that
> I'm
> >> > not
> >> > understanding. Is there a better way of doing this?
> >> >
> >> >
> >> >
> >> > --
> >> > View this message in context:
> >> >
> http://apache-spark-user-list.1001560.n3.nabble.com/calculating-the-mean-of-SparseVector-RDD-tp21019.html
> >> > Sent from the Apache Spark User List mailing list archive at
> Nabble.com.
> >> >
> >> > -
> >> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> >> > For additional commands, e-mail: user-h...@spark.apache.org
> >> >
> >
> >
>


Re: Play Scala Spark Exmaple

2015-01-12 Thread Eduardo Cusa
The EC2 version is 1.1.0 and this is my build.sbt:


libraryDependencies ++= Seq(
  jdbc,
  anorm,
  cache,
  "org.apache.spark"  %% "spark-core"  % "1.1.0",
  "com.typesafe.akka" %% "akka-actor"  % "2.2.3",
  "com.typesafe.akka" %% "akka-slf4j"  % "2.2.3",
  "org.apache.spark"  %% "spark-streaming-twitter" % "1.1.0",
  "org.apache.spark"  %% "spark-sql"   % "1.1.0",
  "org.apache.spark"  %% "spark-mllib" % "1.1.0"
  )



On Sun, Jan 11, 2015 at 3:01 AM, Akhil Das 
wrote:

> What is your spark version that is running on the EC2 cluster? From the build
> file 
> of your play application it seems that it uses Spark 1.0.1.
>
> Thanks
> Best Regards
>
> On Fri, Jan 9, 2015 at 7:17 PM, Eduardo Cusa <
> eduardo.c...@usmediaconsulting.com> wrote:
>
>> Hi guys, I running the following example :
>> https://github.com/knoldus/Play-Spark-Scala in the same machine as the
>> spark master, and the spark cluster was lauched with ec2 script.
>>
>>
>> I'm stuck with this errors, any idea how to fix it?
>>
>> Regards
>> Eduardo
>>
>>
>> call the play app prints the following exception :
>>
>>
>> [*error*] a.r.EndpointWriter - AssociationError 
>> [akka.tcp://sparkDriver@ip-10-28-236-122.ec2.internal:47481] <- 
>> [akka.tcp://driverPropsFetcher@ip-10-158-18-250.ec2.internal:52575]: Error 
>> [Shut down address: 
>> akka.tcp://driverPropsFetcher@ip-10-158-18-250.ec2.internal:52575] [
>> akka.remote.ShutDownAssociation: Shut down address: 
>> akka.tcp://driverPropsFetcher@ip-10-158-18-250.ec2.internal:52575
>> Caused by: akka.remote.transport.Transport$InvalidAssociationException: The 
>> remote system terminated the association because it is shutting down.
>>
>>
>>
>>
>> The master recive the spark application and generate the following stderr
>> log page:
>>
>>
>> 15/01/09 13:31:23 INFO Remoting: Remoting started; listening on addresses 
>> :[akka.tcp://sparkExecutor@ip-10-158-18-250.ec2.internal:37856]
>> 15/01/09 13:31:23 INFO Remoting: Remoting now listens on addresses: 
>> [akka.tcp://sparkExecutor@ip-10-158-18-250.ec2.internal:37856]
>> 15/01/09 13:31:23 INFO util.Utils: Successfully started service 
>> 'sparkExecutor' on port 37856.
>> 15/01/09 13:31:23 INFO util.AkkaUtils: Connecting to MapOutputTracker: 
>> akka.tcp://sparkDriver@ip-10-28-236-122.ec2.internal:47481/user/MapOutputTracker
>> 15/01/09 13:31:23 INFO util.AkkaUtils: Connecting to BlockManagerMaster: 
>> akka.tcp://sparkDriver@ip-10-28-236-122.ec2.internal:47481/user/BlockManagerMaster
>> 15/01/09 13:31:23 INFO storage.DiskBlockManager: Created local directory at 
>> /mnt/spark/spark-local-20150109133123-3805
>> 15/01/09 13:31:23 INFO storage.DiskBlockManager: Created local directory at 
>> /mnt2/spark/spark-local-20150109133123-b05e
>> 15/01/09 13:31:23 INFO util.Utils: Successfully started service 'Connection 
>> manager for block manager' on port 36936.
>> 15/01/09 13:31:23 INFO network.ConnectionManager: Bound socket to port 36936 
>> with id = ConnectionManagerId(ip-10-158-18-250.ec2.internal,36936)
>> 15/01/09 13:31:23 INFO storage.MemoryStore: MemoryStore started with 
>> capacity 265.4 MB
>> 15/01/09 13:31:23 INFO storage.BlockManagerMaster: Trying to register 
>> BlockManager
>> 15/01/09 13:31:23 INFO storage.BlockManagerMaster: Registered BlockManager
>> 15/01/09 13:31:23 INFO util.AkkaUtils: Connecting to HeartbeatReceiver: 
>> akka.tcp://sparkDriver@ip-10-28-236-122.ec2.internal:47481/user/HeartbeatReceiver
>> 15/01/09 13:31:54 ERROR executor.CoarseGrainedExecutorBackend: Driver 
>> Disassociated [akka.tcp://sparkExecutor@ip-10-158-18-250.ec2.internal:57671] 
>> -> [akka.tcp://sparkDriver@ip-10-28-236-122.ec2.internal:47481] 
>> disassociated! Shutting down.
>>
>>
>


Re: creating a single kafka producer object for all partitions

2015-01-12 Thread Sean Owen
Leader-not-found suggests a problem with zookeeper config. It depends
a lot on the specifics of your error. But this is really a Kafka
question, better for the Kafka list.

On Mon, Jan 12, 2015 at 11:10 AM, Hafiz Mujadid
 wrote:
> Actually this code is producing error leader not found exception. I am
> unable to find the reason
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Does DecisionTree model in MLlib deal with missing values?

2015-01-12 Thread Sean Owen
On Sun, Jan 11, 2015 at 9:46 PM, Christopher Thom
 wrote:
> Is there any plan to extend the data types that would be accepted by the Tree 
> models in Spark? e.g. Many models that we build contain a large number of 
> string-based categorical factors. Currently the only strategy is to map these 
> string values to integers, and store the mapping so the data can be remapped 
> when the model is scored. A viable solution, but cumbersome for models with 
> hundreds of these kinds of factors.

I think there is nothing on the roadmap, except that in the newer ML
API (the bits under spark.ml), there's fuller support for the idea of
a pipeline of transformations, of which performing this encoding could
be one step.
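
As a concrete illustration of that manual encoding step, a sketch only (it assumes an
RDD[String] of raw categorical values; none of these names come from MLlib):

  import org.apache.spark.rdd.RDD

  // build and keep the string-to-index mapping so scoring can re-apply it later
  val rawValues: RDD[String] = sc.parallelize(Seq("red", "green", "red"))
  val categoryIndex: Map[String, Int] = rawValues.distinct().collect().zipWithIndex.toMap
  val encoded: RDD[Double] = rawValues.map(v => categoryIndex(v).toDouble)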

Since it's directly relevant, I don't mind mentioning that we did
build this sort of logic on top of MLlib and PMML. There's nothing
hard about it, just a pile of translation and counting code, such as
in 
https://github.com/OryxProject/oryx/blob/master/oryx-app-common/src/main/java/com/cloudera/oryx/app/rdf/RDFPMMLUtils.java

So there are bits you can reuse out there especially if your goal is
to get to PMML, which will want to represent all the actual
categorical values in its DataDictionary and not encodings.


> Concerning missing data, I haven't been able to figure out how to use NULL 
> values in LabeledPoints, and I'm not sure whether DecisionTrees correctly 
> handle the case of missing data. The only thing I've been able to work out is 
> to use a placeholder value,

Yes, I don't think that's supported. In model training, you can simply
ignore data that can't reach the node because it lacks a feature
needed in a decision rule. This is OK as long as not that much data is
missing.

In scoring you can't not-answer. Again if you refer to PMML, you can
see some ideas about how to handle this:
http://www.dmg.org/v4-2-1/TreeModel.html#xsdType_MISSING-VALUE-STRATEGY

- Make no prediction
- Just copy the last prediction
- Use a model-supplied default for the node
- Use some confidence weighted combination of the answer you'd get by
following both paths

I have opted, in the past, for simply defaulting to the subtree with
more training examples. All of these strategies are approximations,
yes.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: creating a single kafka producer object for all partitions

2015-01-12 Thread Hafiz Mujadid
Actually this code is producing a 'leader not found' exception. I am
unable to find the reason.

On Mon, Jan 12, 2015 at 4:03 PM, kevinkim [via Apache Spark User List] <
ml-node+s1001560n21098...@n3.nabble.com> wrote:

> Well, you can use coalesce() to decrease number of partition to 1.
> (It will take time and quite not efficient, tough)
>
> Regards,
> Kevin.
>
> On Mon Jan 12 2015 at 7:57:39 PM Hafiz Mujadid [via Apache Spark User
> List] <[hidden email]
> > wrote:
>
>> Hi experts!
>>
>>
>> I have a schemaRDD of messages to be pushed in kafka. So I am using
>> following piece of code to do that
>>
>> rdd.foreachPartition(itr => {
>> val props = new Properties()
>> props.put("metadata.broker.list",
>> brokersList)
>> props.put("serializer.class",
>> "kafka.serializer.StringEncoder")
>> props.put("compression.codec",
>> codec.toString)
>> props.put("producer.type", "sync")
>> props.put("batch.num.messages",
>> BatchSize.toString)
>> props.put("message.send.max.retries",
>> maxRetries.toString)
>> props.put("request.required.acks", "-1")
>> producer = new Producer[String,
>> String](new ProducerConfig(props))
>> itr.foreach(row => {
>> val msg =
>> row.toString.drop(1).dropRight(1)
>> this.synchronized {
>> producer.send(new
>> KeyedMessage[String, String](Topic, msg))
>> }
>> })
>> producer.close
>> })
>>
>>
>>
>> the problem with this code is that it creates kafka producer separate for
>> each partition and I want a single producer for all partitions. Is there
>> any way to achieve this?
>>
>>
>> --
>>  If you reply to this email, your message will be added to the
>> discussion below:
>>
>> http://apache-spark-user-list.1001560.n3.nabble.com/creating-a-single-kafka-producer-object-for-all-partitions-tp21097.html
>>  To unsubscribe from Apache Spark User List, click here.
>> NAML
>> 
>>
>
>
> --
>  If you reply to this email, your message will be added to the discussion
> below:
>
> http://apache-spark-user-list.1001560.n3.nabble.com/creating-a-single-kafka-producer-object-for-all-partitions-tp21097p21098.html
>  To unsubscribe from creating a single kafka producer object for all
> partitions, click here
> 
> .
> NAML
> 
>



-- 
Regards: HAFIZ MUJADID




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/creating-a-single-kafka-producer-object-for-all-partitions-tp21097p21099.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: creating a single kafka producer object for all partitions

2015-01-12 Thread Sean Owen
I might be missing something, but why are multiple Producer objects a
problem? they're all sending to the same topic here.

On Mon, Jan 12, 2015 at 10:56 AM, Hafiz Mujadid
 wrote:
> Hi experts!
>
>
> I have a schemaRDD of messages to be pushed in kafka. So I am using
> following piece of code to do that
>
> rdd.foreachPartition(itr => {
> val props = new Properties()
> props.put("metadata.broker.list", brokersList)
> props.put("serializer.class", 
> "kafka.serializer.StringEncoder")
> props.put("compression.codec", codec.toString)
> props.put("producer.type", "sync")
> props.put("batch.num.messages", 
> BatchSize.toString)
> props.put("message.send.max.retries", 
> maxRetries.toString)
> props.put("request.required.acks", "-1")
> producer = new Producer[String, String](new 
> ProducerConfig(props))
> itr.foreach(row => {
> val msg = 
> row.toString.drop(1).dropRight(1)
> this.synchronized {
> producer.send(new 
> KeyedMessage[String, String](Topic, msg))
> }
> })
> producer.close
> })
>
>
>
> the problem with this code is that it creates kafka producer separate for
> each partition and I want a single producer for all partitions. Is there any
> way to achieve this?
>
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/creating-a-single-kafka-producer-object-for-all-partitions-tp21097.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: creating a single kafka producer object for all partitions

2015-01-12 Thread kevinkim
Well, you can use coalesce() to decrease the number of partitions to 1.
(It will take time and is not very efficient, though.)
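
A minimal sketch of that, reusing the rdd from the question (the body is reduced to a
placeholder for the existing producer code):

  // a single partition means foreachPartition runs once, so only one producer is built
  rdd.coalesce(1).foreachPartition { itr =>
    // the producer setup and the itr.foreach(...) send loop from the question go here
    itr.foreach(row => println(row))  // placeholder for producer.send(...)
  }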

Regards,
Kevin.

On Mon Jan 12 2015 at 7:57:39 PM Hafiz Mujadid [via Apache Spark User List]
 wrote:

> Hi experts!
>
>
> I have a schemaRDD of messages to be pushed in kafka. So I am using
> following piece of code to do that
>
> rdd.foreachPartition(itr => {
> val props = new Properties()
> props.put("metadata.broker.list",
> brokersList)
> props.put("serializer.class",
> "kafka.serializer.StringEncoder")
> props.put("compression.codec",
> codec.toString)
> props.put("producer.type", "sync")
> props.put("batch.num.messages",
> BatchSize.toString)
> props.put("message.send.max.retries",
> maxRetries.toString)
> props.put("request.required.acks", "-1")
> producer = new Producer[String,
> String](new ProducerConfig(props))
> itr.foreach(row => {
> val msg =
> row.toString.drop(1).dropRight(1)
> this.synchronized {
> producer.send(new
> KeyedMessage[String, String](Topic, msg))
> }
> })
> producer.close
> })
>
>
>
> the problem with this code is that it creates kafka producer separate for
> each partition and I want a single producer for all partitions. Is there
> any way to achieve this?
>
>
> --
>  If you reply to this email, your message will be added to the discussion
> below:
>
> http://apache-spark-user-list.1001560.n3.nabble.com/creating-a-single-kafka-producer-object-for-all-partitions-tp21097.html
>  To unsubscribe from Apache Spark User List, click here
> 
> .
> NAML
> 
>




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/creating-a-single-kafka-producer-object-for-all-partitions-tp21097p21098.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: Issue with Parquet on Spark 1.2 and Amazon EMR

2015-01-12 Thread Aniket Bhatnagar
Meanwhile, I have submitted a pull request (
https://github.com/awslabs/emr-bootstrap-actions/pull/37) that allows users
to place their jars ahead of all other jars in spark classpath. This should
serve as a temporary workaround for all class conflicts.

Thanks,
Aniket

On Mon Jan 05 2015 at 22:13:47 Kelly, Jonathan  wrote:

>   I've noticed the same thing recently and will contact the appropriate
> owner soon.  (I work for Amazon, so I'll go through internal channels and
> report back to this list.)
>
>  In the meantime, I've found that editing spark-env.sh and putting the
> Spark assembly first in the classpath fixes the issue.  I expect that the
> version of Parquet that's being included in the EMR libs just needs to be
> upgraded.
>
>
>  ~ Jonathan Kelly
>
>   From: Aniket Bhatnagar 
> Date: Sunday, January 4, 2015 at 10:51 PM
> To: Adam Gilmore , "user@spark.apache.org" <
> user@spark.apache.org>
> Subject: Re: Issue with Parquet on Spark 1.2 and Amazon EMR
>
>   Can you confirm your emr version? Could it be because of the classpath
> entries for emrfs? You might face issues with using S3 without them.
>
> Thanks,
> Aniket
>
> On Mon, Jan 5, 2015, 11:16 AM Adam Gilmore  wrote:
>
>>  Just an update on this - I found that the script by Amazon was the
>> culprit - not exactly sure why.  When I installed Spark manually onto the
>> EMR (and did the manual configuration of all the EMR stuff), it worked fine.
>>
>> On Mon, Dec 22, 2014 at 11:37 AM, Adam Gilmore 
>> wrote:
>>
>>>  Hi all,
>>>
>>>  I've just launched a new Amazon EMR cluster and used the script at:
>>>
>>>  s3://support.elasticmapreduce/spark/install-spark
>>>
>>>  to install Spark (this script was upgraded to support 1.2).
>>>
>>>  I know there are tools to launch a Spark cluster in EC2, but I want to
>>> use EMR.
>>>
>>>  Everything installs fine; however, when I go to read from a Parquet
>>> file, I end up with (the main exception):
>>>
>>>  Caused by: java.lang.NoSuchMethodError:
>>> parquet.hadoop.ParquetInputSplit.(Lorg/apache/hadoop/fs/Path;JJJ[Ljava/lang/String;[JLjava/lang/String;Ljava/util/Map;)V
>>> at
>>> parquet.hadoop.TaskSideMetadataSplitStrategy.generateTaskSideMDSplits(ParquetInputFormat.java:578)
>>> ... 55 more
>>>
>>>  It seems to me like a version mismatch somewhere.  Where is the
>>> parquet-hadoop jar coming from?  Is it built into a fat jar for Spark?
>>>
>>>  Any help would be appreciated.  Note that 1.1.1 worked fine with
>>> Parquet files.
>>>
>>
>>


creating a single kafka producer object for all partitions

2015-01-12 Thread Hafiz Mujadid
Hi experts!


I have a schemaRDD of messages to be pushed in kafka. So I am using
following piece of code to do that

rdd.foreachPartition(itr => {
val props = new Properties()
props.put("metadata.broker.list", brokersList)
props.put("serializer.class", 
"kafka.serializer.StringEncoder")
props.put("compression.codec", codec.toString)
props.put("producer.type", "sync")
props.put("batch.num.messages", 
BatchSize.toString)
props.put("message.send.max.retries", 
maxRetries.toString)
props.put("request.required.acks", "-1")
producer = new Producer[String, String](new 
ProducerConfig(props))
itr.foreach(row => {
val msg = 
row.toString.drop(1).dropRight(1)
this.synchronized {
producer.send(new 
KeyedMessage[String, String](Topic, msg))
}
})
producer.close
})



the problem with this code is that it creates kafka producer separate for
each partition and I want a single producer for all partitions. Is there any
way to achieve this?




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/creating-a-single-kafka-producer-object-for-all-partitions-tp21097.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Problem with building spark-1.2.0

2015-01-12 Thread Sean Owen
The problem is there in the logs. When it went to clone some code,
something went wrong with the proxy:

Received HTTP code 407 from proxy after CONNECT

Probably you have an HTTP proxy and you have not authenticated. It's
specific to your environment.

Although it's unrelated, I'm curious how your build refers to
https://github.com/ScrapCodes/sbt-pom-reader.git  I don't see this
repo in the project code base.

On Mon, Jan 12, 2015 at 9:09 AM, Kartheek.R  wrote:
> Hi,
> This is what I am trying to do:
>
> karthik@s4:~/spark-1.2.0$ SPARK_HADOOP_VERSION=2.3.0 sbt/sbt clean
> Using /usr/lib/jvm/java-7-oracle as default JAVA_HOME.
> Note, this will be overridden by -java-home if it is set.
> [info] Loading project definition from
> /home/karthik/spark-1.2.0/project/project
> Cloning into
> '/home/karthik/.sbt/0.13/staging/ad8e8574a5bcb2d22d23/sbt-pom-reader'...
> fatal: unable to access 'https://github.com/ScrapCodes/sbt-pom-reader.git/':
> Received HTTP code 407 from proxy after CONNECT
> java.lang.RuntimeException: Nonzero exit code (128): git clone
> https://github.com/ScrapCodes/sbt-pom-reader.git

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Failed to save RDD as text file to local file system

2015-01-12 Thread Sean Owen
Without a scheme, it will be interpreted relative to the default FS
configured in the Hadoop configuration, which is almost surely HDFS.

No, the stack trace does not imply it was writing to HDFS. It would
use the Hadoop FileSystem API in any event, but you can see that the
path was a file: URI.

The goal is to write a local file. I don't see that multiple files are
a problem either.

The original problem is simply that mkdirs failed and that's almost
surely a permission issue. I don't see that this has been addressed by
the OP.
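
For instance, a minimal sketch (the path is the one discussed in this thread, and the directory must be creatable by the user running the Spark processes on every node that executes a task):

// Explicit file: scheme writes to the local file system of each node,
// instead of the default FS (usually HDFS) from the Hadoop configuration.
r.saveAsTextFile("file:///home/cloudera/tmp/out1")

// If this still fails in mkdirs, check that the Spark user can create
// /home/cloudera/tmp/out1 on every node that runs a task.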

On Mon, Jan 12, 2015 at 9:18 AM, Prannoy  wrote:
> Have you tried simply giving the path where you want to save the file?
>
> For instance, in your case just do
>
> r.saveAsTextFile("home/cloudera/tmp/out1")
>
> Don't use the file: prefix.
>
> This will create a folder named out1. saveAsTextFile always writes by
> creating a directory; it does not write the data into a single file.
>
> In case you need a single file you can use the copyMerge API in FileUtil.
>
> FileUtil.copyMerge(fs, new Path("home/cloudera/tmp/out1"), fs,
> new Path("home/cloudera/tmp/out2"), true, conf, null);
>
> Now out2 will be a single file containing your data.
>
> fs is the FileSystem object for your local file system.
>
> Thanks
>
>
>
> On Sat, Jan 10, 2015 at 1:36 AM, NingjunWang [via Apache Spark User List]
> <[hidden email]> wrote:
>>
>> No, do you have any idea?
>>
>>
>>
>> Regards,
>>
>>
>>
>> Ningjun Wang
>>
>> Consulting Software Engineer
>>
>> LexisNexis
>>
>> 121 Chanlon Road
>>
>> New Providence, NJ 07974-1541
>>
>>
>>
>> From: firemonk9 [via Apache Spark User List] [mailto:[hidden email][hidden
>> email]]
>> Sent: Friday, January 09, 2015 2:56 PM
>> To: Wang, Ningjun (LNG-NPV)
>> Subject: Re: Failed to save RDD as text file to local file system
>>
>>
>>
>> Have you found any resolution for this issue ?
>>
>> 
>>
>> If you reply to this email, your message will be added to the discussion
>> below:
>>
>>
>> http://apache-spark-user-list.1001560.n3.nabble.com/Failed-to-save-RDD-as-text-file-to-local-file-system-tp21050p21067.html
>>
>> To unsubscribe from Failed to save RDD as text file to local file system,
>> click here.
>> NAML
>>
>>
>>
>> 
>> If you reply to this email, your message will be added to the discussion
>> below:
>>
>> http://apache-spark-user-list.1001560.n3.nabble.com/Failed-to-save-RDD-as-text-file-to-local-file-system-tp21050p21068.html
>> To start a new topic under Apache Spark User List, email [hidden email]
>> To unsubscribe from Apache Spark User List, click here.
>> NAML
>
>
>
> 
> View this message in context: Re: Failed to save RDD as text file to local
> file system
>
> Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Manually trigger RDD map function without action

2015-01-12 Thread kevinkim
Hi, answer from another Kevin.

As you may already know, transformations in Spark
(http://spark.apache.org/docs/latest/programming-guide.html#transformations)
are evaluated lazily and only run when you trigger an action
(http://spark.apache.org/docs/latest/programming-guide.html#actions).

So you can use:
'collect' - if you need the result in memory on the driver
'count' - if you just need the number of elements
'saveAsTextFile' (or another 'saveAs...' variant) - if you need to persist the transformed RDD
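
For example, a minimal sketch ('rdd' and the mapped function are placeholders, not from the original post):

// Transformation: nothing executes yet.
val mapped = rdd.map(x => x.toString)

mapped.count()                        // action: triggers the map, returns the element count
// mapped.collect()                   // action: triggers the map, brings results to the driver
// mapped.saveAsTextFile("/tmp/out")  // action: triggers the map, persists the output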

Regards,
Kevin


On Mon Jan 12 2015 at 6:48:54 PM Kevin Jung [via Apache Spark User List] <
ml-node+s1001560n21094...@n3.nabble.com> wrote:

> Hi all
> Is there efficient way to trigger RDD transformations? I'm now using count
> action to achieve this.
>
> Best regards
> Kevin
>
> --
>  If you reply to this email, your message will be added to the discussion
> below:
>
> http://apache-spark-user-list.1001560.n3.nabble.com/Manually-trigger-RDD-map-function-without-action-tp21094.html
>  To unsubscribe from Apache Spark User List, click here
> 
> .
> NAML
> 
>




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Manually-trigger-RDD-map-function-without-action-tp21094p21095.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Manually trigger RDD map function without action

2015-01-12 Thread Kevin Jung
Hi all
Is there an efficient way to trigger RDD transformations? I'm currently using
the count action to achieve this.

Best regards
Kevin



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Manually-trigger-RDD-map-function-without-action-tp21094.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



how to select the first row in each group by group?

2015-01-12 Thread LinQili
Hi all,
I am using Spark SQL to read and write Hive tables, but there is an issue: how do I select the first row in each group? In Hive, we could write HQL like this:

SELECT imei
FROM (SELECT imei,
             row_number() OVER (PARTITION BY imei ORDER BY login_time ASC) AS row_num
      FROM login_log_2015010914) a
WHERE row_num = 1

In Spark SQL, how do I write a query equivalent to this HQL?
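
Window functions such as row_number() were not yet supported by Spark SQL at the time of 1.2, so one workaround is to fall back to the RDD API and keep the earliest login per imei. A minimal sketch, assuming a HiveContext, string-typed columns, and a login_time format that sorts correctly as a string:

import org.apache.spark.SparkContext._   // pair RDD operations such as reduceByKey (Spark 1.2)
import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)
val rows = hiveContext.sql("SELECT imei, login_time FROM login_log_2015010914")

// For every imei keep the row with the smallest login_time,
// i.e. the equivalent of row_num = 1 ordered by login_time ASC.
val firstLoginPerImei = rows
  .map(r => (r.getString(0), r.getString(1)))
  .reduceByKey((a, b) => if (a <= b) a else b)

firstLoginPerImei.take(10).foreach(println)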
  

Re: Failed to save RDD as text file to local file system

2015-01-12 Thread Prannoy
Have you tried simply giving the path where you want to save the file?

For instance, in your case just do

*r.saveAsTextFile("home/cloudera/tmp/out1")*

Don't use the *file:* prefix.

This will create a folder named out1. saveAsTextFile always writes by
creating a directory; it does not write the data into a single file.

In case you need a single file you can use the copyMerge API in FileUtil.

*FileUtil.copyMerge(fs, new Path("home/cloudera/tmp/out1"), fs,
new Path("home/cloudera/tmp/out2"), true, conf, null);*

Now out2 will be a single file containing your data.

*fs* is the FileSystem object for your local file system.
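
Putting it together, a minimal sketch (the paths are the ones used above; the local FileSystem is obtained from a plain Hadoop Configuration):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}

val conf = new Configuration()
val fs = FileSystem.getLocal(conf)   // FileSystem object for the local file system

// saveAsTextFile produces a directory of part files...
r.saveAsTextFile("home/cloudera/tmp/out1")

// ...which copyMerge then concatenates into a single file.
FileUtil.copyMerge(
  fs, new Path("home/cloudera/tmp/out1"),
  fs, new Path("home/cloudera/tmp/out2"),
  true,   // delete the source directory after merging
  conf,
  null)   // no separator string appended between merged parts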

Thanks



On Sat, Jan 10, 2015 at 1:36 AM, NingjunWang [via Apache Spark User List] <
ml-node+s1001560n21068...@n3.nabble.com> wrote:

>  No, do you have any idea?
>
>
>
> Regards,
>
>
>
> *Ningjun Wang*
>
> Consulting Software Engineer
>
> LexisNexis
>
> 121 Chanlon Road
>
> New Providence, NJ 07974-1541
>
>
>
> *From:* firemonk9 [via Apache Spark User List] [mailto:ml-node+[hidden
> email] ]
> *Sent:* Friday, January 09, 2015 2:56 PM
> *To:* Wang, Ningjun (LNG-NPV)
> *Subject:* Re: Failed to save RDD as text file to local file system
>
>
>
> Have you found any resolution for this issue ?
>  --
>
> *If you reply to this email, your message will be added to the discussion
> below:*
>
>
> http://apache-spark-user-list.1001560.n3.nabble.com/Failed-to-save-RDD-as-text-file-to-local-file-system-tp21050p21067.html
>
> To unsubscribe from Failed to save RDD as text file to local file system, 
> click
> here.
> NAML
> 
>
>
> --
>  If you reply to this email, your message will be added to the discussion
> below:
>
> http://apache-spark-user-list.1001560.n3.nabble.com/Failed-to-save-RDD-as-text-file-to-local-file-system-tp21050p21068.html
>  To start a new topic under Apache Spark User List, email
> ml-node+s1001560n1...@n3.nabble.com
> To unsubscribe from Apache Spark User List, click here
> 
> .
> NAML
> 
>




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Failed-to-save-RDD-as-text-file-to-local-file-system-tp21050p21093.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: Problem with building spark-1.2.0

2015-01-12 Thread Kartheek.R
Hi,
This is what I am trying to do:

karthik@s4:~/spark-1.2.0$ SPARK_HADOOP_VERSION=2.3.0 sbt/sbt clean
Using /usr/lib/jvm/java-7-oracle as default JAVA_HOME.
Note, this will be overridden by -java-home if it is set.
[info] Loading project definition from
/home/karthik/spark-1.2.0/project/project
Cloning into
'/home/karthik/.sbt/0.13/staging/ad8e8574a5bcb2d22d23/sbt-pom-reader'...
fatal: unable to access 'https://github.com/ScrapCodes/sbt-pom-reader.git/':
Received HTTP code 407 from proxy after CONNECT
java.lang.RuntimeException: Nonzero exit code (128): git clone
https://github.com/ScrapCodes/sbt-pom-reader.git
/home/karthik/.sbt/0.13/staging/ad8e8574a5bcb2d22d23/sbt-pom-reader
at scala.sys.package$.error(package.scala:27)
at sbt.Resolvers$.run(Resolvers.scala:127)
at sbt.Resolvers$.run(Resolvers.scala:117)
at sbt.Resolvers$$anon$2.clone(Resolvers.scala:74)
at
sbt.Resolvers$DistributedVCS$$anonfun$toResolver$1$$anonfun$apply$11$$anonfun$apply$5.apply$mcV$sp(Resolvers.scala:99)
at sbt.Resolvers$.creates(Resolvers.scala:134)
at
sbt.Resolvers$DistributedVCS$$anonfun$toResolver$1$$anonfun$apply$11.apply(Resolvers.scala:98)
at
sbt.Resolvers$DistributedVCS$$anonfun$toResolver$1$$anonfun$apply$11.apply(Resolvers.scala:97)
at
sbt.BuildLoader$$anonfun$componentLoader$1$$anonfun$apply$3.apply(BuildLoader.scala:88)
at
sbt.BuildLoader$$anonfun$componentLoader$1$$anonfun$apply$3.apply(BuildLoader.scala:87)
at scala.Option.map(Option.scala:145)
at 
sbt.BuildLoader$$anonfun$componentLoader$1.apply(BuildLoader.scala:87)
at 
sbt.BuildLoader$$anonfun$componentLoader$1.apply(BuildLoader.scala:83)
at sbt.MultiHandler.apply(BuildLoader.scala:15)
at sbt.BuildLoader.apply(BuildLoader.scala:139)
at sbt.Load$.loadAll(Load.scala:334)
at sbt.Load$.loadURI(Load.scala:289)
at sbt.Load$.load(Load.scala:285)
at sbt.Load$.load(Load.scala:276)
at sbt.Load$.apply(Load.scala:130)
at sbt.Load$.buildPluginDefinition(Load.scala:819)
at sbt.Load$.buildPlugins(Load.scala:785)
at sbt.Load$.plugins(Load.scala:773)
at sbt.Load$.loadUnit(Load.scala:431)
at sbt.Load$$anonfun$18$$anonfun$apply$11.apply(Load.scala:281)
at sbt.Load$$anonfun$18$$anonfun$apply$11.apply(Load.scala:281)
at
sbt.BuildLoader$$anonfun$componentLoader$1$$anonfun$apply$4$$anonfun$apply$5$$anonfun$apply$6.apply(BuildLoader.scala:91)
at
sbt.BuildLoader$$anonfun$componentLoader$1$$anonfun$apply$4$$anonfun$apply$5$$anonfun$apply$6.apply(BuildLoader.scala:90)
at sbt.BuildLoader.apply(BuildLoader.scala:140)
at sbt.Load$.loadAll(Load.scala:334)
at sbt.Load$.loadURI(Load.scala:289)
at sbt.Load$.load(Load.scala:285)
at sbt.Load$.load(Load.scala:276)
at sbt.Load$.apply(Load.scala:130)
at sbt.Load$.defaultLoad(Load.scala:36)
at sbt.BuiltinCommands$.doLoadProject(Main.scala:481)
at sbt.BuiltinCommands$$anonfun$loadProjectImpl$2.apply(Main.scala:475)
at sbt.BuiltinCommands$$anonfun$loadProjectImpl$2.apply(Main.scala:475)
at
sbt.Command$$anonfun$applyEffect$1$$anonfun$apply$2.apply(Command.scala:58)
at
sbt.Command$$anonfun$applyEffect$1$$anonfun$apply$2.apply(Command.scala:58)
at
sbt.Command$$anonfun$applyEffect$2$$anonfun$apply$3.apply(Command.scala:60)
at
sbt.Command$$anonfun$applyEffect$2$$anonfun$apply$3.apply(Command.scala:60)
at sbt.Command$.process(Command.scala:92)
at sbt.MainLoop$$anonfun$1$$anonfun$apply$1.apply(MainLoop.scala:98)
at sbt.MainLoop$$anonfun$1$$anonfun$apply$1.apply(MainLoop.scala:98)
at sbt.State$$anon$1.process(State.scala:184)
at sbt.MainLoop$$anonfun$1.apply(MainLoop.scala:98)
at sbt.MainLoop$$anonfun$1.apply(MainLoop.scala:98)
at sbt.ErrorHandling$.wideConvert(ErrorHandling.scala:17)
at sbt.MainLoop$.next(MainLoop.scala:98)
at sbt.MainLoop$.run(MainLoop.scala:91)
at sbt.MainLoop$$anonfun$runWithNewLog$1.apply(MainLoop.scala:70)
at sbt.MainLoop$$anonfun$runWithNewLog$1.apply(MainLoop.scala:65)
at sbt.Using.apply(Using.scala:24)
at sbt.MainLoop$.runWithNewLog(MainLoop.scala:65)
at sbt.MainLoop$.runAndClearLast(MainLoop.scala:48)
at sbt.MainLoop$.runLoggedLoop(MainLoop.scala:32)
at sbt.MainLoop$.runLogged(MainLoop.scala:24)
at sbt.StandardMain$.runManaged(Main.scala:53)
at sbt.xMain.run(Main.scala:28)
at xsbt.boot.Launch$$anonfun$run$1.apply(Launch.scala:109)
at xsbt.boot.Launch$.withContextLoader(Launch.scala:128)
at xsbt.boot.Launch$.run(Launch.scala:109)
at xsbt.boot.Launch$$anonfun$apply$1.apply(Launch.scala:35)
at xsbt.boot.Launch$.launch(Launch.scala:117)
at xsbt.boot.Launch$.apply(Launch.scala:18)
at xsbt.boot.Boot$.runImpl(Boot.scal

Re: How to set UI port #?

2015-01-12 Thread Prannoy
Set the UI port using

spconf.set("spark.ui.port", "<port>")

where <port> is any free port and spconf is your SparkConf object.
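
A minimal sketch (4044 is just the port mentioned in the question, the app name is illustrative, and the spark.ui.enabled flag for skipping the UI altogether is worth verifying against your Spark version):

import org.apache.spark.{SparkConf, SparkContext}

val spconf = new SparkConf()
  .setAppName("hourly-batch")           // illustrative app name
  .set("spark.ui.port", "4044")         // first port the UI will try to bind
// .set("spark.ui.enabled", "false")    // alternatively, do not start the UI at all

val sc = new SparkContext(spconf)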

On Sun, Jan 11, 2015 at 2:08 PM, YaoPau [via Apache Spark User List] <
ml-node+s1001560n21083...@n3.nabble.com> wrote:

> I have multiple Spark Streaming jobs running all day, and so when I run my
> hourly batch job, I always get a "java.net.BindException: Address already
> in use" which starts at 4040 then goes to 4041, 4042, 4043 before settling
> at 4044.
>
> That slows down my hourly job, and time is critical.  Is there a way I can
> set it to 4044 by default, or prevent the UI from launching altogether?
>
> Jon
>
> --
>  If you reply to this email, your message will be added to the discussion
> below:
>
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-set-UI-port-tp21083.html
>  To start a new topic under Apache Spark User List, email
> ml-node+s1001560n1...@n3.nabble.com
> To unsubscribe from Apache Spark User List, click here
> 
> .
> NAML
> 
>




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-set-UI-port-tp21083p21090.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

GraphX vs GraphLab

2015-01-12 Thread Madabhattula Rajesh Kumar
Hi Team,

Has anyone done a comparison (pros and cons) study between GraphX and GraphLab?

Could you please point me to any links for such a comparison?

Regards,
Rajesh