Maintaining order of pair rdd

2016-07-23 Thread janardhan shetty
I have a (key, value) pair RDD where the value is an array of Ints. I need to
maintain the order of the values in order to execute downstream
modifications. How do we maintain the order of the values?
Ex:
rdd = (id1, [5, 2, 3, 15],
       id2, [9, 4, 2, 5])

Follow-up question: how do we compare one element in the RDD with all
the other elements?
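
For reference, a small Scala sketch of the pattern being asked about (the keys, values, and
the scaling step are made up, and an existing SparkContext sc is assumed). Transformations
applied to the value itself, such as mapValues, keep the order of the Ints inside each array;
only the ordering of the keys across partitions is not guaranteed unless you sort. Comparing
one element with the others in the same array can be done via the array index; comparing one
record against all other records would need something like rdd.cartesian(rdd).

val rdd = sc.parallelize(Seq(
  ("id1", Array(5, 2, 3, 15)),
  ("id2", Array(9, 4, 2, 5))
))

// The array order is preserved through value-level transformations
val scaled = rdd.mapValues(values => values.map(_ * 2))

// Compare each element with every other element of the same value array
val pairwise = rdd.mapValues { values =>
  values.zipWithIndex.map { case (v, i) =>
    (v, values.patch(i, Nil, 1).map(other => v - other))
  }
}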


SaveToCassandra executed when I stop Spark

2016-07-23 Thread Fernando Avalos
Hi Spark guys,

I am getting the information from Streaming and I transform it like this:

questionStream.filter(_.getEntity.isEmpty).mapPartitions[Choice](questionHandler(_)).saveToCassandra("test","question")

I read the information from Streaming, do some filtering, and transform it
to a case class for saving. It works fine, but the information is not
reflected in Cassandra immediately; it only shows up when I stop my Spark
application. What is happening here? How can I get the information
reflected in Cassandra immediately?

Regards,

favalos.
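
For context, a rough, self-contained sketch of that kind of pipeline, assuming the DataStax
spark-cassandra-connector streaming API; Choice, the socket source, and the handler logic are
placeholders standing in for the code above, and sc is an existing SparkContext. saveToCassandra
on a DStream is registered as an output operation and runs once per batch interval after the
streaming context has been started.

import com.datastax.spark.connector.streaming._
import org.apache.spark.streaming.{Seconds, StreamingContext}

case class Choice(id: String, text: String)

val ssc = new StreamingContext(sc, Seconds(10))
val questionStream = ssc.socketTextStream("localhost", 9999)

questionStream
  .filter(_.nonEmpty)
  .map(line => Choice(line.takeWhile(_ != ','), line))   // stand-in for questionHandler
  .saveToCassandra("test", "question")                   // written once per micro-batch

ssc.start()             // nothing is written until the context is started
ssc.awaitTermination()  // keep the application alive so every batch gets flushed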


Re: How to get the number of partitions for a SparkDataFrame in Spark 2.0-preview?

2016-07-23 Thread Neil Chang
One example of using dapply is to apply linear regression on many small
partitions.
I think R can do that with parallelism too, but I heard dapply is faster.

On Friday, July 22, 2016, Pedro Rodriguez  wrote:

> I haven't used SparkR/R before, only Scala/Python APIs so I don't know for
> sure.
>
> I am guessing if things are in a DataFrame they were read either from some
> disk source (S3/HDFS/file/etc) or they were created from parallelize. If
> you are using the first, Spark will for the most part choose a reasonable
> number of partitions while for parallelize I think it depends on what your
> min parallelism is set to.
>
> In my brief googling it looks like dapply is an analogue of mapPartitions.
> Usually the reason to use this is that your map operation has some expensive
> initialization step. For example, if you need to open a connection to a
> database, it's better to reuse that connection for one partition's
> elements than to create it for each element.
>
> What are you trying to accomplish with dapply?
>
> On Fri, Jul 22, 2016 at 8:05 PM, Neil Chang wrote:
>
>> Thanks Pedro,
>>   so to use sparkR dapply on SparkDataFrame, don't we need partition the
>> DataFrame first? the example in doc doesn't seem to do this.
>> Without knowing how it partitioned, how can one write the function to
>> process each partition?
>>
>> Neil
>>
>> On Fri, Jul 22, 2016 at 5:56 PM, Pedro Rodriguez wrote:
>>
>>> This should work and I don't think triggers any actions:
>>>
>>> df.rdd.partitions.length
>>>
>>> On Fri, Jul 22, 2016 at 2:20 PM, Neil Chang wrote:
>>>
 Seems no function does this in Spark 2.0 preview?

>>>
>>>
>>>
>>> --
>>> Pedro Rodriguez
>>> PhD Student in Distributed Machine Learning | CU Boulder
>>> UC Berkeley AMPLab Alumni
>>>
>>> ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
>>> Github: github.com/EntilZha | LinkedIn:
>>> https://www.linkedin.com/in/pedrorodriguezscience
>>>
>>>
>>
>
>
> --
> Pedro Rodriguez
> PhD Student in Distributed Machine Learning | CU Boulder
> UC Berkeley AMPLab Alumni
>
> ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
> Github: github.com/EntilZha | LinkedIn:
> https://www.linkedin.com/in/pedrorodriguezscience
>
>
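
For anyone following along in Scala rather than SparkR, a hedged sketch of the equivalent
(Spark 2.0 API; spark is an existing SparkSession and the numbers are arbitrary). dapply
corresponds roughly to controlling the partitioning explicitly and then doing per-partition work:

val df = spark.range(0, 1000000).toDF("x")

// Number of partitions backing the DataFrame; this does not trigger a job
val numParts = df.rdd.getNumPartitions

// Repartition explicitly, then do the per-partition work (analogue of dapply/mapPartitions)
val doubled = df.repartition(64).rdd.mapPartitions { rows =>
  // any expensive one-time setup per partition would go here
  rows.map(r => r.getLong(0) * 2)
}

println(s"before: $numParts partitions, after: ${doubled.getNumPartitions}")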


Re: Size exceeds Integer.MAX_VALUE

2016-07-23 Thread Andrew Ehrlich
It may be this issue: https://issues.apache.org/jira/browse/SPARK-6235, which
limits the size of the blocks in the file being written to disk to 2 GB.

If so, the solution is to tune for smaller tasks. Try increasing the number of
partitions, using a more space-efficient data structure inside the RDD, or
increasing the amount of memory available to Spark and caching the data in
memory. Make sure you are using Kryo serialization.

Andrew

> On Jul 23, 2016, at 9:00 PM, Ascot Moss  wrote:
> 
> 
> Hi,
> 
> Please help!
> 
> My spark: 1.6.2
> Java: java8_u40
> 
> I am trying random forest training, I got " Size exceeds Integer.MAX_VALUE".
> 
> Any idea how to resolve it?
> 
> 
> (the log) 
> 16/07/24 07:59:49 ERROR Executor: Exception in task 0.0 in stage 7.0 (TID 25) 
>   
> java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE  
> at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:836) 
> at 
> org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:127)
> 
> at 
> org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:115)
> 
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1250)
> at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:129) 
> at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:136) 
> at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:503)   
>   
> at org.apache.spark.storage.BlockManager.getLocal(BlockManager.scala:420) 
>   
> at org.apache.spark.storage.BlockManager.get(BlockManager.scala:625)
> at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:154)
>   
> at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:268) 
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)   
>   
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)  
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) 
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) 
>   
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) 
>   
> at org.apache.spark.scheduler.Task.run(Task.scala:89)   
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   
> at java.lang.Thread.run(Thread.java:745)
> 16/07/24 07:59:49 WARN TaskSetManager: Lost task 0.0 in stage 7.0 (TID 25, 
> localhost): java.lang.IllegalArgumentException: Size exceeds 
> Integer.MAX_VALUE   
> at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:836) 
> at 
> org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:127)
> 
> at 
> org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:115)
> 
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1250)
> at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:129) 
> at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:136) 
> at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:503)   
>   
> at org.apache.spark.storage.BlockManager.getLocal(BlockManager.scala:420) 
>   
> at org.apache.spark.storage.BlockManager.get(BlockManager.scala:625)
> at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:154)
>   
> at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:268) 
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)   
>   
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)  
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) 
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) 
>   
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) 
>   
> at org.apache.spark.scheduler.Task.run(Task.scala:89)   
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   
> at java.lang.Thread.run(Thread.java:745) 
> 
> 
> Regards
> 
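
A hedged sketch of the mitigations suggested above, for a 1.6-era MLlib job (the input path,
partition count, and storage level are placeholders to adapt, not recommendations):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.storage.StorageLevel

// Kryo plus serialized caching keeps cached partitions compact
val conf = new SparkConf()
  .setAppName("rf-training")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val sc = new SparkContext(conf)

// More partitions means smaller blocks per task, keeping each block well under the 2 GB limit
val data = MLUtils.loadLibSVMFile(sc, "hdfs:///data/training.libsvm")
  .repartition(400)
  .persist(StorageLevel.MEMORY_AND_DISK_SER)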



Re: How to generate a sequential key in rdd across executors

2016-07-23 Thread Andrew Ehrlich
It’s hard to do in a distributed system. Maybe try generating a meaningful key 
using a timestamp + hashed unique key fields in the record? 

> On Jul 23, 2016, at 7:53 PM, yeshwanth kumar  wrote:
> 
> Hi,
> 
> i am doing bulk load to hbase using spark,
> in which i need to generate a sequential key for each record,
> the key should be sequential across all the executors.
> 
> i tried zipwith index, didn't worked because zipwith index gives index per 
> executor not across all executors.
> 
> looking for some suggestions.
> 
> 
> Thanks,
> -Yeshwanth
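
For reference, a small Scala sketch of the two built-in options (Record and the sample data
are made up). RDD.zipWithIndex first runs a job to count each partition, so the resulting
indices are consecutive across all partitions; zipWithUniqueId skips that extra job, but the
ids are only unique, not consecutive:

case class Record(payload: String)

val records = sc.parallelize(Seq(Record("a"), Record("b"), Record("c")), 3)

// Globally sequential 0, 1, 2, ... across all executors
val withSeqKey = records.zipWithIndex().map { case (rec, idx) => (idx, rec) }

// Unique but non-consecutive ids, with no extra counting job
val withUniqueKey = records.zipWithUniqueId().map { case (rec, id) => (id, rec) }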


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Size exceeds Integer.MAX_VALUE

2016-07-23 Thread Ascot Moss
Hi,

Please help!

My spark: 1.6.2
Java: java8_u40

I am running random forest training and I got "Size exceeds Integer.MAX_VALUE".

Any idea how to resolve it?


(the log)
16/07/24 07:59:49 ERROR Executor: Exception in task 0.0 in stage 7.0 (TID
25)
java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:836)
at
org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:127)

at
org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:115)

at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1250)
at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:129)
at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:136)
at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:503)

at org.apache.spark.storage.BlockManager.getLocal(BlockManager.scala:420)

at org.apache.spark.storage.BlockManager.get(BlockManager.scala:625)
at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:154)

at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)

at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)

at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)

at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)

at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)

at java.lang.Thread.run(Thread.java:745)
16/07/24 07:59:49 WARN TaskSetManager: Lost task 0.0 in stage 7.0 (TID 25,
localhost): java.lang.IllegalArgumentException: Size exceeds
Integer.MAX_VALUE
at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:836)
at
org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:127)

at
org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:115)

at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1250)
at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:129)
at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:136)
at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:503)

at org.apache.spark.storage.BlockManager.getLocal(BlockManager.scala:420)

at org.apache.spark.storage.BlockManager.get(BlockManager.scala:625)
at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:154)

at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)

at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)

at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)

at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)

at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)

at java.lang.Thread.run(Thread.java:745)


Regards


How to generate a sequential key in rdd across executors

2016-07-23 Thread yeshwanth kumar
Hi,

I am doing a bulk load to HBase using Spark,
in which I need to generate a sequential key for each record;
the key should be sequential across all the executors.

I tried zipWithIndex, but it didn't work because zipWithIndex gives an index per
executor, not across all executors.

Looking for some suggestions.


Thanks,
-Yeshwanth


Re: Spark ml.ALS question -- RegressionEvaluator .evaluate giving ~1.5 output for same train and predict data

2016-07-23 Thread VG
Any suggestions / ideas here ?



On Sun, Jul 24, 2016 at 12:19 AM, VG  wrote:

> Sean,
>
> I did this just to test the model. When I do a split of my data as
> training to 80% and test to be 20%
>
> I get a Root-mean-square error = NaN
>
> So I am wondering where I might be going wrong
>
> Regards,
> VG
>
> On Sun, Jul 24, 2016 at 12:12 AM, Sean Owen  wrote:
>
>> No, that's certainly not to be expected. ALS works by computing a much
>> lower-rank representation of the input. It would not reproduce the
>> input exactly, and you don't want it to -- this would be seriously
>> overfit. This is why in general you don't evaluate a model on the
>> training set.
>>
>> On Sat, Jul 23, 2016 at 7:37 PM, VG  wrote:
>> > I am trying to run ml.ALS to compute some recommendations.
>> >
>> > Just to test I am using the same dataset for training using ALSModel
>> and for
>> > predicting the results based on the model .
>> >
>> > When I evaluate the result using RegressionEvaluator I get a
>> > Root-mean-square error = 1.5544064263236066
>> >
>> > I think this should be 0. Any suggestions on what might be going wrong?
>> >
>> > Regards,
>> > Vipul
>>
>
>
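
For the NaN case, a hedged sketch (spark.ml, 2.0-era API; ratings and the column names are
placeholders). After a random split, some users or items may appear only in the test fold, and
ALS predicts NaN for them, which makes the RMSE NaN; dropping those rows before evaluation
gives a finite number:

import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.recommendation.ALS

val Array(training, test) = ratings.randomSplit(Array(0.8, 0.2), seed = 42L)

val als = new ALS()
  .setUserCol("userId").setItemCol("itemId").setRatingCol("rating")
  .setRank(10).setMaxIter(10).setRegParam(0.1)
val model = als.fit(training)

// Rows the model could not score (unseen user or item) come back with a NaN prediction
val predictions = model.transform(test).na.drop(Seq("prediction"))

val rmse = new RegressionEvaluator()
  .setMetricName("rmse")
  .setLabelCol("rating")
  .setPredictionCol("prediction")
  .evaluate(predictions)
println(s"Root-mean-square error = $rmse")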


Re: ERROR Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.

2016-07-23 Thread Ascot Moss
I tried to add -Xloggc:./jvm_gc.log:

--conf "spark.executor.extraJavaOptions=-XX:+UseG1GC -XX:+PrintGCDetails
-XX:+PrintGCTimeStamps -Xloggc:./jvm_gc.log -XX:+PrintGCDateStamps"

However, I could not find ./jvm_gc.log.

How do I resolve the OOM and the GC log issue?

Regards

On Sun, Jul 24, 2016 at 6:37 AM, Ascot Moss  wrote:

> My JDK is Java 1.8 u40
>
> On Sun, Jul 24, 2016 at 3:45 AM, Ted Yu  wrote:
>
>> Since you specified +PrintGCDetails, you should be able to get some more
>> detail from the GC log.
>>
>> Also, which JDK version are you using ?
>>
>> Please use Java 8 where G1GC is more reliable.
>>
>> On Sat, Jul 23, 2016 at 10:38 AM, Ascot Moss 
>> wrote:
>>
>>> Hi,
>>>
>>> I added the following parameter:
>>>
>>> --conf "spark.executor.extraJavaOptions=-XX:+UseG1GC
>>> -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=20 -XX:ConcGCThreads=5
>>> -XX:InitiatingHeapOccupancyPercent=70 -XX:+PrintGCDetails
>>> -XX:+PrintGCTimeStamps"
>>>
>>> Still got Java heap space error.
>>>
>>> Any idea to resolve?  (my spark is 1.6.1)
>>>
>>>
>>> 16/07/23 23:31:50 WARN TaskSetManager: Lost task 1.0 in stage 6.0 (TID
>>> 22, n1791): java.lang.OutOfMemoryError: Java heap space   at
>>> scala.reflect.ManifestFactory$$anon$12.newArray(Manifest.scala:138)
>>>
>>> at
>>> scala.reflect.ManifestFactory$$anon$12.newArray(Manifest.scala:136)
>>>
>>> at
>>> scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:248)
>>>
>>> at
>>> org.apache.spark.util.collection.CompactBuffer.toArray(CompactBuffer.scala:30)
>>>
>>> at
>>> org.apache.spark.mllib.tree.DecisionTree$.org$apache$spark$mllib$tree$DecisionTree$$findSplits$1(DecisionTree.scala:1009)
>>> at
>>> org.apache.spark.mllib.tree.DecisionTree$$anonfun$29.apply(DecisionTree.scala:1042)
>>>
>>> at
>>> org.apache.spark.mllib.tree.DecisionTree$$anonfun$29.apply(DecisionTree.scala:1042)
>>>
>>> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>>>
>>> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>
>>> at
>>> scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>>
>>> at
>>> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>>>
>>> at
>>> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>>>
>>> at
>>> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
>>>
>>> at 
>>> scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
>>>
>>> at scala.collection.AbstractIterator.to(Iterator.scala:1157)
>>>
>>> at
>>> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
>>>
>>> at
>>> scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
>>>
>>> at
>>> scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
>>>
>>> at
>>> scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
>>>
>>> at
>>> org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:927)
>>>
>>> at
>>> org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:927)
>>>
>>> at
>>> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
>>>
>>> at
>>> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
>>>
>>> at
>>> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>>>
>>> at org.apache.spark.scheduler.Task.run(Task.scala:89)
>>>
>>> at
>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>>>
>>> at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>
>>> at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>
>>> at java.lang.Thread.run(Thread.java:745)
>>>
>>> Regards
>>>
>>>
>>>
>>> On Sat, Jul 23, 2016 at 9:49 AM, Ascot Moss 
>>> wrote:
>>>
 Thanks. Trying with extra conf now.

 On Sat, Jul 23, 2016 at 6:59 AM, RK Aduri 
 wrote:

> I can see a large number of collections happening on the driver and
> eventually, the driver is running out of memory. (I am not sure whether you
> have persisted any rdd or data frame). Maybe you would want to avoid doing so
> many collections or persist unwanted data in memory.
>
> To begin with, you may want to re-run the job with this following
> config: --conf "spark.executor.extraJavaOptions=-XX:+UseG1GC
> -XX:+PrintGCDetails -XX:+PrintGCTimeStamps” —> and this will give you an
> idea of how you are getting OOM.
>
>
> On Jul 22, 2016, at 3:52 PM, Ascot Moss  wrote:
>
> Hi
>
> Please help!
>
>  When running random forest training phase in cluster mode, I got GC
> overhead 

Re: ERROR Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.

2016-07-23 Thread Ascot Moss
My JDK is Java 1.8 u40

On Sun, Jul 24, 2016 at 3:45 AM, Ted Yu  wrote:

> Since you specified +PrintGCDetails, you should be able to get some more
> detail from the GC log.
>
> Also, which JDK version are you using ?
>
> Please use Java 8 where G1GC is more reliable.
>
> On Sat, Jul 23, 2016 at 10:38 AM, Ascot Moss  wrote:
>
>> Hi,
>>
>> I added the following parameter:
>>
>> --conf "spark.executor.extraJavaOptions=-XX:+UseG1GC
>> -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=20 -XX:ConcGCThreads=5
>> -XX:InitiatingHeapOccupancyPercent=70 -XX:+PrintGCDetails
>> -XX:+PrintGCTimeStamps"
>>
>> Still got Java heap space error.
>>
>> Any idea to resolve?  (my spark is 1.6.1)
>>
>>
>> 16/07/23 23:31:50 WARN TaskSetManager: Lost task 1.0 in stage 6.0 (TID
>> 22, n1791): java.lang.OutOfMemoryError: Java heap space   at
>> scala.reflect.ManifestFactory$$anon$12.newArray(Manifest.scala:138)
>>
>> at
>> scala.reflect.ManifestFactory$$anon$12.newArray(Manifest.scala:136)
>>
>> at
>> scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:248)
>>
>> at
>> org.apache.spark.util.collection.CompactBuffer.toArray(CompactBuffer.scala:30)
>>
>> at
>> org.apache.spark.mllib.tree.DecisionTree$.org$apache$spark$mllib$tree$DecisionTree$$findSplits$1(DecisionTree.scala:1009)
>> at
>> org.apache.spark.mllib.tree.DecisionTree$$anonfun$29.apply(DecisionTree.scala:1042)
>>
>> at
>> org.apache.spark.mllib.tree.DecisionTree$$anonfun$29.apply(DecisionTree.scala:1042)
>>
>> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>>
>> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>
>> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>
>> at
>> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>>
>> at
>> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>>
>> at
>> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
>>
>> at 
>> scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
>>
>> at scala.collection.AbstractIterator.to(Iterator.scala:1157)
>>
>> at
>> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
>>
>> at
>> scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
>>
>> at
>> scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
>>
>> at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
>>
>> at
>> org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:927)
>>
>> at
>> org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:927)
>>
>> at
>> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
>>
>> at
>> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
>>
>> at
>> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>>
>> at org.apache.spark.scheduler.Task.run(Task.scala:89)
>>
>> at
>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>>
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>
>> at java.lang.Thread.run(Thread.java:745)
>>
>> Regards
>>
>>
>>
>> On Sat, Jul 23, 2016 at 9:49 AM, Ascot Moss  wrote:
>>
>>> Thanks. Trying with extra conf now.
>>>
>>> On Sat, Jul 23, 2016 at 6:59 AM, RK Aduri 
>>> wrote:
>>>
 I can see large number of collections happening on driver and
 eventually, driver is running out of memory. ( am not sure whether you have
 persisted any rdd or data frame). May be you would want to avoid doing so
 many collections or persist unwanted data in memory.

 To begin with, you may want to re-run the job with this following
 config: --conf "spark.executor.extraJavaOptions=-XX:+UseG1GC
 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps” —> and this will give you an
 idea of how you are getting OOM.


 On Jul 22, 2016, at 3:52 PM, Ascot Moss  wrote:

 Hi

 Please help!

  When running random forest training phase in cluster mode, I got GC
 overhead limit exceeded.

 I have used two parameters when submitting the job to cluster

 --driver-memory 64g \

 --executor-memory 8g \

 My Current settings:

 (spark-defaults.conf)

 spark.executor.memory   8g

 (spark-env.sh)

 export SPARK_WORKER_MEMORY=8g

 export HADOOP_HEAPSIZE=8000


 Any idea how to resolve it?

 Regards






 ###  (the erro log) ###

 16/07/23 

Re: ERROR Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.

2016-07-23 Thread Ted Yu
Since you specified +PrintGCDetails, you should be able to get some more
detail from the GC log.

Also, which JDK version are you using ?

Please use Java 8 where G1GC is more reliable.

On Sat, Jul 23, 2016 at 10:38 AM, Ascot Moss  wrote:

> Hi,
>
> I added the following parameter:
>
> --conf "spark.executor.extraJavaOptions=-XX:+UseG1GC
> -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=20 -XX:ConcGCThreads=5
> -XX:InitiatingHeapOccupancyPercent=70 -XX:+PrintGCDetails
> -XX:+PrintGCTimeStamps"
>
> Still got Java heap space error.
>
> Any idea to resolve?  (my spark is 1.6.1)
>
>
> 16/07/23 23:31:50 WARN TaskSetManager: Lost task 1.0 in stage 6.0 (TID 22,
> n1791): java.lang.OutOfMemoryError: Java heap space   at
> scala.reflect.ManifestFactory$$anon$12.newArray(Manifest.scala:138)
>
> at
> scala.reflect.ManifestFactory$$anon$12.newArray(Manifest.scala:136)
>
> at
> scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:248)
>
> at
> org.apache.spark.util.collection.CompactBuffer.toArray(CompactBuffer.scala:30)
>
> at
> org.apache.spark.mllib.tree.DecisionTree$.org$apache$spark$mllib$tree$DecisionTree$$findSplits$1(DecisionTree.scala:1009)
> at
> org.apache.spark.mllib.tree.DecisionTree$$anonfun$29.apply(DecisionTree.scala:1042)
>
> at
> org.apache.spark.mllib.tree.DecisionTree$$anonfun$29.apply(DecisionTree.scala:1042)
>
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>
> at
> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>
> at
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>
> at
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
>
> at 
> scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
>
> at scala.collection.AbstractIterator.to(Iterator.scala:1157)
>
> at
> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
>
> at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
>
> at
> scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
>
> at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
>
> at
> org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:927)
>
> at
> org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:927)
>
> at
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
>
> at
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
>
> at
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>
> at org.apache.spark.scheduler.Task.run(Task.scala:89)
>
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>
> at java.lang.Thread.run(Thread.java:745)
>
> Regards
>
>
>
> On Sat, Jul 23, 2016 at 9:49 AM, Ascot Moss  wrote:
>
>> Thanks. Trying with extra conf now.
>>
>> On Sat, Jul 23, 2016 at 6:59 AM, RK Aduri 
>> wrote:
>>
>>> I can see large number of collections happening on driver and
>>> eventually, driver is running out of memory. ( am not sure whether you have
>>> persisted any rdd or data frame). May be you would want to avoid doing so
>>> many collections or persist unwanted data in memory.
>>>
>>> To begin with, you may want to re-run the job with this following
>>> config: --conf "spark.executor.extraJavaOptions=-XX:+UseG1GC
>>> -XX:+PrintGCDetails -XX:+PrintGCTimeStamps” —> and this will give you an
>>> idea of how you are getting OOM.
>>>
>>>
>>> On Jul 22, 2016, at 3:52 PM, Ascot Moss  wrote:
>>>
>>> Hi
>>>
>>> Please help!
>>>
>>>  When running random forest training phase in cluster mode, I got GC
>>> overhead limit exceeded.
>>>
>>> I have used two parameters when submitting the job to cluster
>>>
>>> --driver-memory 64g \
>>>
>>> --executor-memory 8g \
>>>
>>> My Current settings:
>>>
>>> (spark-defaults.conf)
>>>
>>> spark.executor.memory   8g
>>>
>>> (spark-env.sh)
>>>
>>> export SPARK_WORKER_MEMORY=8g
>>>
>>> export HADOOP_HEAPSIZE=8000
>>>
>>>
>>> Any idea how to resolve it?
>>>
>>> Regards
>>>
>>>
>>>
>>>
>>>
>>>
>>> ###  (the erro log) ###
>>>
>>> 16/07/23 04:34:04 WARN TaskSetManager: Lost task 2.0 in stage 6.1 (TID
>>> 30, n1794): java.lang.OutOfMemoryError: GC overhead limit exceeded
>>>
>>> at
>>> scala.reflect.ManifestFactory$$anon$12.newArray(Manifest.scala:138)
>>>
>>> at
>>> 

Re: spark and plot data

2016-07-23 Thread Gourav Sengupta
And we are all smiling: https://github.com/bokeh/bokeh-scala


Something that helped me immensely, particularly the example.
https://github.com/bokeh/bokeh-scala/issues/24

Please note that I use Toree as the Jupyter kernel.

Regards,
Gourav Sengupta

On Sat, Jul 23, 2016 at 8:01 PM, Andrew Ehrlich  wrote:

> @Gourav, did you find any good inline plotting tools when using the Scala
> kernel? I found one based on highcharts but it was not frictionless the way
> matplotlib is.
>
> On Jul 23, 2016, at 2:26 AM, Gourav Sengupta 
> wrote:
>
> Hi Pedro,
>
> Toree is a Scala kernel for Jupyter, in case anyone needs a short intro. I
> use it regularly (when I am not using IntelliJ) and it's quite good.
>
> Regards,
> Gourav
>
> On Fri, Jul 22, 2016 at 11:15 PM, Pedro Rodriguez  > wrote:
>
>> As of the most recent 0.6.0 release it's partially alleviated, but still
>> not great (compared to something like Jupyter).
>>
>> They can be "downloaded", but that is only really meaningful for importing them
>> back into Zeppelin. It would be great if they could be exported as HTML or
>> PDF, but at present they can't be. I know they have some sort of git
>> support, but it was never clear to me how it was supposed to be used since
>> the docs are sparse on that. So far what works best for us is S3 storage,
>> but you don't get the benefit of Github using that (history + commits etc).
>>
>> There are a couple other notebooks floating around, Apache Toree seems
>> the most promising for portability since its based on jupyter
>> https://github.com/apache/incubator-toree
>>
>> On Fri, Jul 22, 2016 at 3:53 PM, Gourav Sengupta <
>> gourav.sengu...@gmail.com> wrote:
>>
>>> The biggest stumbling block to using Zeppelin has been that we cannot
>>> download the notebooks, cannot export them and certainly cannot sync them
>>> back to Github, without mind numbing and sometimes irritating hacks. Have
>>> those issues been resolved?
>>>
>>>
>>> Regards,
>>> Gourav
>>>
>>>
>>> On Fri, Jul 22, 2016 at 2:22 PM, Pedro Rodriguez <
>>> ski.rodrig...@gmail.com> wrote:
>>>
 Zeppelin works great. The other thing that we have done in notebooks
 (like Zeppelin or Databricks) which support multiple types of spark session
 is register Spark SQL temp tables in our scala code then escape hatch to
 python for plotting with seaborn/matplotlib when the built in plots are
 insufficient.

 —
 Pedro Rodriguez
 PhD Student in Large-Scale Machine Learning | CU Boulder
 Systems Oriented Data Scientist
 UC Berkeley AMPLab Alumni

 pedrorodriguez.io | 909-353-4423
 github.com/EntilZha | LinkedIn
 

 On July 22, 2016 at 3:04:48 AM, Marco Colombo (
 ing.marco.colo...@gmail.com) wrote:

 Take a look at zeppelin

 http://zeppelin.apache.org

 On Thursday, 21 July 2016, Andy Davidson wrote:

> Hi Pseudo
>
> Plotting, graphing, data visualization, report generation are common
> needs in scientific and enterprise computing.
>
> Can you tell me more about your use case? What is it about the current
> process / workflow do you think could be improved by pushing plotting (I
> assume you mean plotting and graphing) into spark.
>
>
> In my personal work all the graphing is done in the driver on summary
> stats calculated using spark. So for me using standard python libs has not
> been a problem.
>
> Andy
>
> From: pseudo oduesp 
> Date: Thursday, July 21, 2016 at 8:30 AM
> To: "user @spark" 
> Subject: spark and plot data
>
> Hi,
> I know Spark is an engine to compute large data sets, but for me I work
> with PySpark and it is a very wonderful machine.
>
> My question: we don't have tools for plotting data, so each time we have
> to switch and go back to Python to plot.
> But when you have a large result (scatter plot or ROC curve) you can't use
> collect to take the data.
>
> Does someone have a proposal for plotting?
>
> Thanks
>
>

 --
 Ing. Marco Colombo


>>>
>>
>>
>> --
>> Pedro Rodriguez
>> PhD Student in Distributed Machine Learning | CU Boulder
>> UC Berkeley AMPLab Alumni
>>
>> ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
>> Github: github.com/EntilZha | LinkedIn:
>> https://www.linkedin.com/in/pedrorodriguezscience
>>
>>
>
>
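
A small sketch of the "aggregate in Spark, plot on the driver" pattern mentioned above, in
Scala (the DataFrame df and the column names are made up; the actual plotting happens outside
Spark, e.g. in a notebook cell):

import org.apache.spark.sql.functions.{avg, col, count, lit}

val summary = df
  .groupBy(col("category"))
  .agg(count(lit(1)).as("n"), avg(col("value")).as("mean_value"))
  .orderBy(col("category"))
  .collect()   // small aggregated result, safe to bring to the driver and plot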


Re: spark and plot data

2016-07-23 Thread Andrew Ehrlich
@Gourav, did you find any good inline plotting tools when using the Scala 
kernel? I found one based on highcharts but it was not frictionless the way 
matplotlib is.

> On Jul 23, 2016, at 2:26 AM, Gourav Sengupta  
> wrote:
> 
> Hi Pedro,
> 
> Toree is Scala kernel for Jupyter in case anyone needs a short intro. I use 
> it regularly (when I am not using IntelliJ) and its quite good.
> 
> Regards,
> Gourav
> 
> On Fri, Jul 22, 2016 at 11:15 PM, Pedro Rodriguez  > wrote:
> As of the most recent 0.6.0 release its partially alleviated, but still not 
> great (compared to something like Jupyter).
> 
> They can be "downloaded" but its only really meaningful in importing it back 
> to Zeppelin. It would be great if they could be exported as HTML or PDF, but 
> at present they can't be. I know they have some sort of git support, but it 
> was never clear to me how it was suppose to be used since the docs are sparse 
> on that. So far what works best for us is S3 storage, but you don't get the 
> benefit of Github using that (history + commits etc).
> 
> There are a couple other notebooks floating around, Apache Toree seems the 
> most promising for portability since its based on jupyter 
> https://github.com/apache/incubator-toree 
> 
> 
> On Fri, Jul 22, 2016 at 3:53 PM, Gourav Sengupta  > wrote:
> The biggest stumbling block to using Zeppelin has been that we cannot 
> download the notebooks, cannot export them and certainly cannot sync them 
> back to Github, without mind numbing and sometimes irritating hacks. Have 
> those issues been resolved?
> 
> 
> Regards,
> Gourav  
> 
> 
> On Fri, Jul 22, 2016 at 2:22 PM, Pedro Rodriguez  > wrote:
> Zeppelin works great. The other thing that we have done in notebooks (like 
> Zeppelin or Databricks) which support multiple types of spark session is 
> register Spark SQL temp tables in our scala code then escape hatch to python 
> for plotting with seaborn/matplotlib when the built in plots are insufficient.
> 
> —
> Pedro Rodriguez
> PhD Student in Large-Scale Machine Learning | CU Boulder
> Systems Oriented Data Scientist
> UC Berkeley AMPLab Alumni
> 
> pedrorodriguez.io | 909-353-4423
> github.com/EntilZha | LinkedIn
> 
> On July 22, 2016 at 3:04:48 AM, Marco Colombo (ing.marco.colo...@gmail.com) wrote:
> 
>> Take a look at zeppelin
>> 
>> http://zeppelin.apache.org 
>> 
>> On Thursday, 21 July 2016, Andy Davidson wrote:
>> Hi Pseudo
>> 
>> Plotting, graphing, data visualization, report generation are common needs 
>> in scientific and enterprise computing.
>> 
>> Can you tell me more about your use case? What is it about the current 
>> process / workflow do you think could be improved by pushing plotting (I 
>> assume you mean plotting and graphing) into spark.
>> 
>> 
>> In my personal work all the graphing is done in the driver on summary stats 
>> calculated using spark. So for me using standard python libs has not been a 
>> problem.
>> 
>> Andy
>> 
>> From: pseudo oduesp >
>> Date: Thursday, July 21, 2016 at 8:30 AM
>> To: "user @spark" >
>> Subject: spark and plot data
>> 
>> Hi , 
>> i know spark  it s engine  to compute large data set but for me i work with 
>> pyspark and it s very wonderful machine 
>> 
>> my question  we  don't have tools for ploting data each time we have to 
>> switch and go back to python for using plot.
>> but when you have large result scatter plot or roc curve  you cant use 
>> collect to take data .
>> 
>> somone have propostion for plot .
>> 
>> thanks 
>> 
>> 
>> --
>> Ing. Marco Colombo
> 
> 
> 
> 
> -- 
> Pedro Rodriguez
> PhD Student in Distributed Machine Learning | CU Boulder
> UC Berkeley AMPLab Alumni
> 
> ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
> Github: github.com/EntilZha | LinkedIn:
> https://www.linkedin.com/in/pedrorodriguezscience
> 
> 



Re: Error in collecting RDD as a Map - IOException in collectAsMap

2016-07-23 Thread Andrew Ehrlich
+1 for the misleading error. Messages about failing to connect often mean that 
an executor has died. If so, dig into the executor logs and find out why the 
executor died (out of memory, perhaps). 

Andrew

> On Jul 23, 2016, at 11:39 AM, VG  wrote:
> 
> Hi Pedro,
> 
> Based on your suggestion, I deployed this on a aws node and it worked fine. 
> thanks for your advice. 
> 
> I am still trying to figure out the issues on the local environment
> Anyways thanks again
> 
> -VG
> 
> On Sat, Jul 23, 2016 at 9:26 PM, Pedro Rodriguez  > wrote:
> Have you changed spark-env.sh or spark-defaults.conf from the default? It 
> looks like spark is trying to address local workers based on a network 
> address (eg 192.168……) instead of on localhost (localhost, 127.0.0.1, 
> 0.0.0.0,…). Additionally, that network address doesn’t resolve correctly. You 
> might also check /etc/hosts to make sure that you don’t have anything weird 
> going on.
> 
> Last thing to try perhaps is that are you running Spark within a VM and/or 
> Docker? If networking isn’t setup correctly on those you may also run into 
> trouble.
> 
> What would be helpful is to know everything about your setup that might 
> affect networking.
> 
> —
> Pedro Rodriguez
> PhD Student in Large-Scale Machine Learning | CU Boulder
> Systems Oriented Data Scientist
> UC Berkeley AMPLab Alumni
> 
> pedrorodriguez.io  | 909-353-4423 
> 
> github.com/EntilZha  | LinkedIn 
> 
> On July 23, 2016 at 9:10:31 AM, VG (vlin...@gmail.com) wrote:
> 
>> Hi pedro,
>> 
>> Apologies for not adding this earlier. 
>> 
>> This is running on a local cluster set up as follows.
>> JavaSparkContext jsc = new JavaSparkContext("local[2]", "DR");
>> 
>> Any suggestions based on this ? 
>> 
>> The ports are not blocked by firewall. 
>> 
>> Regards,
>> 
>> 
>> 
>> On Sat, Jul 23, 2016 at 8:35 PM, Pedro Rodriguez > > wrote:
>> Make sure that you don’t have ports firewalled. You don’t really give much 
>> information to work from, but it looks like the master can’t access the 
>> worker nodes for some reason. If you give more information on the cluster, 
>> networking, etc, it would help.
>> 
>> For example, on AWS you can create a security group which allows all traffic 
>> to/from itself to itself. If you are using something like ufw on ubuntu then 
>> you probably need to know the ip addresses of the worker nodes beforehand.
>> 
>> —
>> Pedro Rodriguez
>> PhD Student in Large-Scale Machine Learning | CU Boulder
>> Systems Oriented Data Scientist
>> UC Berkeley AMPLab Alumni
>> 
>> pedrorodriguez.io  | 909-353-4423 
>> 
>> github.com/EntilZha  | LinkedIn 
>> 
>> On July 23, 2016 at 7:38:01 AM, VG (vlin...@gmail.com 
>> ) wrote:
>> 
>>> Please suggest if I am doing something wrong or an alternative way of doing 
>>> this. 
>>> 
>>> I have an RDD with two values as follows 
>>> JavaPairRDD rdd
>>> 
>>> When I execute rdd.collectAsMap()
>>> it always fails with IO exceptions.   
>>> 
>>> 
>>> 16/07/23 19:03:58 ERROR RetryingBlockFetcher: Exception while beginning 
>>> fetch of 1 outstanding blocks 
>>> java.io.IOException: Failed to connect to /192.168.1.3:58179 
>>> 
>>> at 
>>> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:228)
>>> at 
>>> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:179)
>>> at 
>>> org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:96)
>>> at 
>>> org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
>>> at 
>>> org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
>>> at 
>>> org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:105)
>>> at 
>>> org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:92)
>>> at 
>>> org.apache.spark.storage.BlockManager.getRemoteBytes(BlockManager.scala:546)
>>> at 
>>> org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:76)
>>> at 
>>> org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:57)
>>> at 
>>> org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:57)
>>> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1793)
>>> at 
>>> org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:56)
>>> at 

Re: How to give name to Spark jobs shown in Spark UI

2016-07-23 Thread Andrew Ehrlich
As far as I know, the best you can do is refer to the Actions by line number.

> On Jul 23, 2016, at 8:47 AM, unk1102  wrote:
> 
> Hi, I have multiple child Spark jobs running at a time. Is there any way to name
> these child Spark jobs so I can identify slow-running ones? E.g.
> xyz_saveAsTextFile(), abc_saveAsTextFile(), etc. Please guide. Thanks in
> advance.
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-give-name-to-Spark-jobs-shown-in-Spark-UI-tp27400.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark ml.ALS question -- RegressionEvaluator .evaluate giving ~1.5 output for same train and predict data

2016-07-23 Thread VG
Sean,

I did this just to test the model. When I split my data into 80% training
and 20% test,

I get a Root-mean-square error = NaN

So I am wondering where I might be going wrong

Regards,
VG

On Sun, Jul 24, 2016 at 12:12 AM, Sean Owen  wrote:

> No, that's certainly not to be expected. ALS works by computing a much
> lower-rank representation of the input. It would not reproduce the
> input exactly, and you don't want it to -- this would be seriously
> overfit. This is why in general you don't evaluate a model on the
> training set.
>
> On Sat, Jul 23, 2016 at 7:37 PM, VG  wrote:
> > I am trying to run ml.ALS to compute some recommendations.
> >
> > Just to test I am using the same dataset for training using ALSModel and
> for
> > predicting the results based on the model .
> >
> > When I evaluate the result using RegressionEvaluator I get a
> > Root-mean-square error = 1.5544064263236066
> >
> > I think this should be 0. Any suggestions on what might be going wrong?
> >
> > Regards,
> > Vipul
>


Re: Spark ml.ALS question -- RegressionEvaluator .evaluate giving ~1.5 output for same train and predict data

2016-07-23 Thread Sean Owen
No, that's certainly not to be expected. ALS works by computing a much
lower-rank representation of the input. It would not reproduce the
input exactly, and you don't want it to -- this would be seriously
overfit. This is why in general you don't evaluate a model on the
training set.

On Sat, Jul 23, 2016 at 7:37 PM, VG  wrote:
> I am trying to run ml.ALS to compute some recommendations.
>
> Just to test I am using the same dataset for training using ALSModel and for
> predicting the results based on the model .
>
> When I evaluate the result using RegressionEvaluator I get a
> Root-mean-square error = 1.5544064263236066
>
> I think this should be 0. Any suggestions on what might be going wrong?
>
> Regards,
> Vipul

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Error in collecting RDD as a Map - IOException in collectAsMap

2016-07-23 Thread VG
Hi Pedro,

Based on your suggestion, I deployed this on an AWS node and it worked fine.
Thanks for your advice.

I am still trying to figure out the issues on the local environment
Anyways thanks again

-VG

On Sat, Jul 23, 2016 at 9:26 PM, Pedro Rodriguez 
wrote:

> Have you changed spark-env.sh or spark-defaults.conf from the default? It
> looks like spark is trying to address local workers based on a network
> address (eg 192.168……) instead of on localhost (localhost, 127.0.0.1,
> 0.0.0.0,…). Additionally, that network address doesn’t resolve correctly.
> You might also check /etc/hosts to make sure that you don’t have anything
> weird going on.
>
> Last thing to try perhaps is that are you running Spark within a VM and/or
> Docker? If networking isn’t setup correctly on those you may also run into
> trouble.
>
> What would be helpful is to know everything about your setup that might
> affect networking.
>
> —
> Pedro Rodriguez
> PhD Student in Large-Scale Machine Learning | CU Boulder
> Systems Oriented Data Scientist
> UC Berkeley AMPLab Alumni
>
> pedrorodriguez.io | 909-353-4423
> github.com/EntilZha | LinkedIn
> 
>
> On July 23, 2016 at 9:10:31 AM, VG (vlin...@gmail.com) wrote:
>
> Hi pedro,
>
> Apologies for not adding this earlier.
>
> This is running on a local cluster set up as follows.
> JavaSparkContext jsc = new JavaSparkContext("local[2]", "DR");
>
> Any suggestions based on this ?
>
> The ports are not blocked by firewall.
>
> Regards,
>
>
>
> On Sat, Jul 23, 2016 at 8:35 PM, Pedro Rodriguez 
> wrote:
>
>> Make sure that you don’t have ports firewalled. You don’t really give
>> much information to work from, but it looks like the master can’t access
>> the worker nodes for some reason. If you give more information on the
>> cluster, networking, etc, it would help.
>>
>> For example, on AWS you can create a security group which allows all
>> traffic to/from itself to itself. If you are using something like ufw on
>> ubuntu then you probably need to know the ip addresses of the worker nodes
>> beforehand.
>>
>> —
>> Pedro Rodriguez
>> PhD Student in Large-Scale Machine Learning | CU Boulder
>> Systems Oriented Data Scientist
>> UC Berkeley AMPLab Alumni
>>
>> pedrorodriguez.io | 909-353-4423
>> github.com/EntilZha | LinkedIn
>> 
>>
>> On July 23, 2016 at 7:38:01 AM, VG (vlin...@gmail.com) wrote:
>>
>> Please suggest if I am doing something wrong or an alternative way of
>> doing this.
>>
>> I have an RDD with two values as follows
>> JavaPairRDD rdd
>>
>> When I execute rdd.collectAsMap()
>> it always fails with IO exceptions.
>>
>>
>> 16/07/23 19:03:58 ERROR RetryingBlockFetcher: Exception while beginning
>> fetch of 1 outstanding blocks
>> java.io.IOException: Failed to connect to /192.168.1.3:58179
>> at
>> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:228)
>> at
>> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:179)
>> at
>> org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:96)
>> at
>> org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
>> at
>> org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
>> at
>> org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:105)
>> at
>> org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:92)
>> at
>> org.apache.spark.storage.BlockManager.getRemoteBytes(BlockManager.scala:546)
>> at
>> org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:76)
>> at
>> org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:57)
>> at
>> org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:57)
>> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1793)
>> at
>> org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:56)
>> at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
>> at java.lang.Thread.run(Unknown Source)
>> Caused by: java.net.ConnectException: Connection timed out: no further
>> information: /192.168.1.3:58179
>> at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>> at sun.nio.ch.SocketChannelImpl.finishConnect(Unknown Source)
>> at
>> io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:224)
>> at
>> io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:289)
>> at
>> 

Spark ml.ALS question -- RegressionEvaluator .evaluate giving ~1.5 output for same train and predict data

2016-07-23 Thread VG
I am trying to run ml.ALS to compute some recommendations.

Just to test, I am using the same dataset for training the ALSModel and
for predicting the results based on the model.

When I evaluate the result using RegressionEvaluator I get a
Root-mean-square error = 1.5544064263236066

I think this should be 0. Any suggestions on what might be going wrong?

Regards,
Vipul


Re: ERROR Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.

2016-07-23 Thread Ascot Moss
Hi,

I added the following parameter:

--conf "spark.executor.extraJavaOptions=-XX:+UseG1GC
-XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=20 -XX:ConcGCThreads=5
-XX:InitiatingHeapOccupancyPercent=70 -XX:+PrintGCDetails
-XX:+PrintGCTimeStamps"

Still got Java heap space error.

Any idea to resolve?  (my spark is 1.6.1)


16/07/23 23:31:50 WARN TaskSetManager: Lost task 1.0 in stage 6.0 (TID 22,
n1791): java.lang.OutOfMemoryError: Java heap space   at
scala.reflect.ManifestFactory$$anon$12.newArray(Manifest.scala:138)

at
scala.reflect.ManifestFactory$$anon$12.newArray(Manifest.scala:136)

at
scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:248)

at
org.apache.spark.util.collection.CompactBuffer.toArray(CompactBuffer.scala:30)

at
org.apache.spark.mllib.tree.DecisionTree$.org$apache$spark$mllib$tree$DecisionTree$$findSplits$1(DecisionTree.scala:1009)
at
org.apache.spark.mllib.tree.DecisionTree$$anonfun$29.apply(DecisionTree.scala:1042)

at
org.apache.spark.mllib.tree.DecisionTree$$anonfun$29.apply(DecisionTree.scala:1042)

at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)

at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)

at
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)

at
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)

at
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)

at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)

at scala.collection.AbstractIterator.to(Iterator.scala:1157)

at
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)

at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)

at
scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)

at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)

at
org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:927)

at
org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:927)

at
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)

at
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)

at
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)

at org.apache.spark.scheduler.Task.run(Task.scala:89)

at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)

at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)

at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)

at java.lang.Thread.run(Thread.java:745)

Regards



On Sat, Jul 23, 2016 at 9:49 AM, Ascot Moss  wrote:

> Thanks. Trying with extra conf now.
>
> On Sat, Jul 23, 2016 at 6:59 AM, RK Aduri  wrote:
>
>> I can see a large number of collections happening on the driver and eventually,
>> the driver is running out of memory. (I am not sure whether you have persisted
>> any rdd or data frame). Maybe you would want to avoid doing so many
>> collections or persist unwanted data in memory.
>>
>> To begin with, you may want to re-run the job with this following config: 
>> --conf
>> "spark.executor.extraJavaOptions=-XX:+UseG1GC -XX:+PrintGCDetails
>> -XX:+PrintGCTimeStamps” —> and this will give you an idea of how you are
>> getting OOM.
>>
>>
>> On Jul 22, 2016, at 3:52 PM, Ascot Moss  wrote:
>>
>> Hi
>>
>> Please help!
>>
>>  When running random forest training phase in cluster mode, I got GC
>> overhead limit exceeded.
>>
>> I have used two parameters when submitting the job to cluster
>>
>> --driver-memory 64g \
>>
>> --executor-memory 8g \
>>
>> My Current settings:
>>
>> (spark-defaults.conf)
>>
>> spark.executor.memory   8g
>>
>> (spark-env.sh)
>>
>> export SPARK_WORKER_MEMORY=8g
>>
>> export HADOOP_HEAPSIZE=8000
>>
>>
>> Any idea how to resolve it?
>>
>> Regards
>>
>>
>>
>>
>>
>>
>> ###  (the erro log) ###
>>
>> 16/07/23 04:34:04 WARN TaskSetManager: Lost task 2.0 in stage 6.1 (TID
>> 30, n1794): java.lang.OutOfMemoryError: GC overhead limit exceeded
>>
>> at
>> scala.reflect.ManifestFactory$$anon$12.newArray(Manifest.scala:138)
>>
>> at
>> scala.reflect.ManifestFactory$$anon$12.newArray(Manifest.scala:136)
>>
>> at
>> org.apache.spark.util.collection.CompactBuffer.growToSize(CompactBuffer.scala:144)
>>
>> at
>> org.apache.spark.util.collection.CompactBuffer.$plus$plus$eq(CompactBuffer.scala:90)
>>
>> at
>> org.apache.spark.rdd.PairRDDFunctions$$anonfun$groupByKey$1$$anonfun$10.apply(PairRDDFunctions.scala:505)
>>
>> at
>> 

SaveToCassandra executed when I stop Spark

2016-07-23 Thread Fernando Avalos
> Hi Spark guys,
>
> I am getting the information from Streaming and I transform it like this:
>
> questionStream.filter(_.getEntity.isEmpty).mapPartitions[Choice](questionHandler(_)).saveToCassandra("test","question")
>
> I read the information from Streaming, do some filtering, and transform it
> to a case class for saving. It works fine, but the information is not
> reflected in Cassandra immediately; it only shows up when I stop my Spark
> application. What is happening here? How can I get the information
> reflected in Cassandra immediately?
>
> Regards,
>
> favalos.


Re: Choosing RDD/DataFrame/DataSet and Cluster Tuning

2016-07-23 Thread Pedro Rodriguez
Hi Jestin,

Spark is smart about how it does joins. In this case, if df2 is sufficiently 
small it will do a broadcast join. Basically, rather than shuffle df1/df2 for a 
join, it broadcasts df2 to all workers and joins locally. Looks like you may 
already have known that though based on using the 
spark.sql.autoBroadcastJoinThreshold.

It's hard to say why your job is slow without knowing more. For example, it
could be a CPU-intensive calculation, or maybe you have an imbalance over keys
which would cause stragglers. It's hard to know without seeing what some of the
metrics from the Spark UI look like.

1. If you aren’t tied down by legacy code, Spark 2.0 has a nicer Dataset API 
and more improvements so I don’t see why not. Spark 2.0 RC5 vote passed last 
night so the official release will probably go out early next week
2. RDDs will make it worse. The reduceByKey/groupByKey distinction is specific to RDDs; the
DataFrame API doesn't mirror it. You hear that advice because reduceByKey runs the reduce
locally at each node for each key and then reduces all those partial results to get the final
result, whereas groupByKey shuffles all keys across the network, which is wasteful if you are
just doing a reduce right afterwards. DataFrames have lots of optimizations as well.
3. Shouldn’t need to explicitly call broadcast
4. Driver memory is important if your job needs to collect results back to the driver
for some reason. One good example is in mllib/ml, where it's common to collect
parameters back to the driver to update a global model. For some algorithms
(like LDA), the model can be quite large, so it requires high driver memory.
5. Hard to know without more metrics from your job. That being said, your 
number of executor instances vs number of cores seems a bit high. I would try 5 
instances of 15 cores each or 10 of 7 cores each. You can also kick up the 
memory to use more of your cluster’s memory. Lastly, if you are running on EC2 
make sure to configure spark.local.dir to write to something that is not an EBS 
volume, preferably an attached SSD to something like an r3 machine.

—
Pedro Rodriguez
PhD Student in Large-Scale Machine Learning | CU Boulder
Systems Oriented Data Scientist
UC Berkeley AMPLab Alumni

pedrorodriguez.io | 909-353-4423
github.com/EntilZha | LinkedIn

On July 23, 2016 at 9:31:21 AM, Jestin Ma (jestinwith.a...@gmail.com) wrote:

Hello,
Right now I'm using DataFrames to perform a df1.groupBy(key).count() on one 
DataFrame and join with another, df2.

The first, df1, is very large (many gigabytes) compared to df2 (250 Mb).

Right now I'm running this on a cluster of 5 nodes, 16 cores each, 90 GB RAM 
each.
It is taking me about 1 hour and 40 minutes to perform the groupBy, count, and 
join, which seems very slow to me.

Currently I have set the following in my spark-defaults.conf:

spark.executor.instances                             24
spark.executor.memory                               10g
spark.executor.cores                                    3
spark.driver.memory                                     5g
spark.sql.autoBroadcastJoinThreshold        200Mb


I have a couple of questions regarding tuning for performance as a beginner.
1. Right now I'm running Spark 1.6.0. Would moving to Spark 2.0 DataSet (or even 
DataFrames) be better?
2. What if I used RDDs instead? I know that reduceByKey is better than groupByKey, 
and DataFrames don't have that method.
3. I think I can do a broadcast join and have set a threshold. Do I need to set it 
above my second DataFrame size? Do I need to explicitly call broadcast(df2)?
4. What's the point of driver memory?
5. Can anyone point out something wrong with my tuning numbers, or any additional 
parameters worth checking out?

Thank you a lot!
Sincerely,
Jestin

Re: Error in collecting RDD as a Map - IOException in collectAsMap

2016-07-23 Thread Marco Mistroni
Hi VG, I believe the error msg is misleading. I had a similar one with
PySpark yesterday after calling a count on a DataFrame, where the real
error was an incorrect user-defined function being applied.
Please send me some sample code with a trimmed-down version of the data and I
will see if I can reproduce it.
Kr

On 23 Jul 2016 4:57 pm, "Pedro Rodriguez"  wrote:

Have you changed spark-env.sh or spark-defaults.conf from the default? It
looks like spark is trying to address local workers based on a network
address (eg 192.168……) instead of on localhost (localhost, 127.0.0.1,
0.0.0.0,…). Additionally, that network address doesn’t resolve correctly.
You might also check /etc/hosts to make sure that you don’t have anything
weird going on.

Last thing to try perhaps is that are you running Spark within a VM and/or
Docker? If networking isn’t setup correctly on those you may also run into
trouble.

What would be helpful is to know everything about your setup that might
affect networking.

—
Pedro Rodriguez
PhD Student in Large-Scale Machine Learning | CU Boulder
Systems Oriented Data Scientist
UC Berkeley AMPLab Alumni

pedrorodriguez.io | 909-353-4423
github.com/EntilZha | LinkedIn


On July 23, 2016 at 9:10:31 AM, VG (vlin...@gmail.com) wrote:

Hi pedro,

Apologies for not adding this earlier.

This is running on a local cluster set up as follows.
JavaSparkContext jsc = new JavaSparkContext("local[2]", "DR");

Any suggestions based on this ?

The ports are not blocked by firewall.

Regards,



On Sat, Jul 23, 2016 at 8:35 PM, Pedro Rodriguez 
wrote:

> Make sure that you don’t have ports firewalled. You don’t really give much
> information to work from, but it looks like the master can’t access the
> worker nodes for some reason. If you give more information on the cluster,
> networking, etc, it would help.
>
> For example, on AWS you can create a security group which allows all
> traffic to/from itself to itself. If you are using something like ufw on
> ubuntu then you probably need to know the ip addresses of the worker nodes
> beforehand.
>
> —
> Pedro Rodriguez
> PhD Student in Large-Scale Machine Learning | CU Boulder
> Systems Oriented Data Scientist
> UC Berkeley AMPLab Alumni
>
> pedrorodriguez.io | 909-353-4423
> github.com/EntilZha | LinkedIn
> 
>
> On July 23, 2016 at 7:38:01 AM, VG (vlin...@gmail.com) wrote:
>
> Please suggest if I am doing something wrong or an alternative way of
> doing this.
>
> I have an RDD with two values as follows
> JavaPairRDD rdd
>
> When I execute   rdd.collectAsMap()
> it always fails with IO exceptions.
>
>
> 16/07/23 19:03:58 ERROR RetryingBlockFetcher: Exception while beginning
> fetch of 1 outstanding blocks
> java.io.IOException: Failed to connect to /192.168.1.3:58179
> at
> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:228)
> at
> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:179)
> at
> org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:96)
> at
> org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
> at
> org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
> at
> org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:105)
> at
> org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:92)
> at
> org.apache.spark.storage.BlockManager.getRemoteBytes(BlockManager.scala:546)
> at
> org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:76)
> at
> org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:57)
> at
> org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:57)
> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1793)
> at
> org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:56)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
> at java.lang.Thread.run(Unknown Source)
> Caused by: java.net.ConnectException: Connection timed out: no further
> information: /192.168.1.3:58179
> at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
> at sun.nio.ch.SocketChannelImpl.finishConnect(Unknown Source)
> at
> io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:224)
> at
> io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:289)
> at
> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
> at
> 

Re: Error in collecting RDD as a Map - IOException in collectAsMap

2016-07-23 Thread Pedro Rodriguez
Have you changed spark-env.sh or spark-defaults.conf from the default? It looks 
like spark is trying to address local workers based on a network address (eg 
192.168……) instead of on localhost (localhost, 127.0.0.1, 0.0.0.0,…). 
Additionally, that network address doesn’t resolve correctly. You might also 
check /etc/hosts to make sure that you don’t have anything weird going on.

One last thing to check: are you running Spark within a VM and/or 
Docker? If networking isn't set up correctly on those, you may also run into 
trouble.

What would be helpful is to know everything about your setup that might affect 
networking.
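
If you want to quickly rule out address resolution for a local[2] run, you can pin
Spark to the loopback interface, either by exporting SPARK_LOCAL_IP=127.0.0.1 in
conf/spark-env.sh or programmatically. A rough Scala sketch of the latter (your code
is Java, but the conf keys are the same; treat it as an illustration):

import org.apache.spark.{SparkConf, SparkContext}

// Bind the driver to loopback so task-result and block fetches don't go
// through a LAN address (192.168.x.x) that may not resolve back to this host.
val conf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("DR")
  .set("spark.driver.host", "127.0.0.1")
val sc = new SparkContext(conf)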

—
Pedro Rodriguez
PhD Student in Large-Scale Machine Learning | CU Boulder
Systems Oriented Data Scientist
UC Berkeley AMPLab Alumni

pedrorodriguez.io | 909-353-4423
github.com/EntilZha | LinkedIn

On July 23, 2016 at 9:10:31 AM, VG (vlin...@gmail.com) wrote:

Hi pedro,

Apologies for not adding this earlier. 

This is running on a local cluster set up as follows.
JavaSparkContext jsc = new JavaSparkContext("local[2]", "DR");

Any suggestions based on this ? 

The ports are not blocked by firewall. 

Regards,



On Sat, Jul 23, 2016 at 8:35 PM, Pedro Rodriguez  
wrote:
Make sure that you don’t have ports firewalled. You don’t really give much 
information to work from, but it looks like the master can’t access the worker 
nodes for some reason. If you give more information on the cluster, networking, 
etc, it would help.

For example, on AWS you can create a security group which allows all traffic 
to/from itself to itself. If you are using something like ufw on ubuntu then 
you probably need to know the ip addresses of the worker nodes beforehand.

—
Pedro Rodriguez
PhD Student in Large-Scale Machine Learning | CU Boulder
Systems Oriented Data Scientist
UC Berkeley AMPLab Alumni

pedrorodriguez.io | 909-353-4423
github.com/EntilZha | LinkedIn

On July 23, 2016 at 7:38:01 AM, VG (vlin...@gmail.com) wrote:

Please suggest if I am doing something wrong or an alternative way of doing 
this. 

I have an RDD with two values as follows 
JavaPairRDD rdd

When I execute   rdd.collectAsMap()
it always fails with IO exceptions.   


16/07/23 19:03:58 ERROR RetryingBlockFetcher: Exception while beginning fetch 
of 1 outstanding blocks 
java.io.IOException: Failed to connect to /192.168.1.3:58179
at 
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:228)
at 
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:179)
at 
org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:96)
at 
org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
at 
org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
at 
org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:105)
at 
org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:92)
at org.apache.spark.storage.BlockManager.getRemoteBytes(BlockManager.scala:546)
at 
org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:76)
at 
org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:57)
at 
org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:57)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1793)
at 
org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:56)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
Caused by: java.net.ConnectException: Connection timed out: no further 
information: /192.168.1.3:58179
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(Unknown Source)
at 
io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:224)
at 
io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:289)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
at 
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at 
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
... 1 more
16/07/23 19:03:58 INFO RetryingBlockFetcher: Retrying fetch (1/3) for 1 
outstanding blocks after 5000 ms






How to give name to Spark jobs shown in Spark UI

2016-07-23 Thread unk1102
Hi, I have multiple child Spark jobs running at a time. Is there any way to name
these child Spark jobs so I can identify the slow-running ones? For e.g.
xyz_saveAsTextFile(), abc_saveAsTextFile(), etc. Please guide. Thanks in
advance. 
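
One approach worth trying (a sketch against a toy RDD, not your actual jobs): tag
each action with SparkContext.setJobGroup / setJobDescription before triggering it;
the description should then show up next to the corresponding jobs in the UI's Jobs
tab.

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("named-jobs").setMaster("local[2]"))
val xyz = sc.parallelize(1 to 100)

// Jobs triggered while a group is set carry its description in the UI.
sc.setJobGroup("xyz", "xyz_saveAsTextFile")
xyz.saveAsTextFile("/tmp/xyz_out")

sc.setJobGroup("abc", "abc_saveAsTextFile")
xyz.map(_ * 2).saveAsTextFile("/tmp/abc_out")

sc.clearJobGroup()
sc.stop()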



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-give-name-to-Spark-jobs-shown-in-Spark-UI-tp27400.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Choosing RDD/DataFrame/DataSet and Cluster Tuning

2016-07-23 Thread Jestin Ma
Hello,
Right now I'm using DataFrames to perform a df1.groupBy(key).count() on one
DataFrame and join with another, df2.

The first, df1, is very large (many gigabytes) compared to df2 (250 Mb).

Right now I'm running this on a cluster of 5 nodes, 16 cores each, 90 GB
RAM each.
It is taking me about 1 hour and 40 minutes to perform the groupBy, count,
and join, which seems very slow to me.

Currently I have set the following in my *spark-defaults.conf*:

spark.executor.instances 24
spark.executor.memory   10g
spark.executor.cores3
spark.driver.memory 5g
spark.sql.autoBroadcastJoinThreshold200Mb


I have a couple of questions regarding tuning for performance as a beginner.

   1. Right now I'm running Spark 1.6.0. Would moving to Spark 2.0 DataSet
   (or even DataFrames) be better?
   2. What if I used RDDs instead? I know that reduceByKey is better than
   groupByKey, and DataFrames don't have that method.
   3. I think I can do a broadcast join and have set a threshold. Do I need
   to set it above my second DataFrame size? Do I need to explicitly call
   broadcast(df2)?
   4. What's the point of driver memory?
   5. Can anyone point out something wrong with my tuning numbers, or any
   additional parameters worth checking out?


Thank you a lot!
Sincerely,
Jestin


Re: Error in collecting RDD as a Map - IOException in collectAsMap

2016-07-23 Thread VG
Hi pedro,

Apologies for not adding this earlier.

This is running on a local cluster set up as follows.
JavaSparkContext jsc = new JavaSparkContext("local[2]", "DR");

Any suggestions based on this ?

The ports are not blocked by firewall.

Regards,



On Sat, Jul 23, 2016 at 8:35 PM, Pedro Rodriguez 
wrote:

> Make sure that you don’t have ports firewalled. You don’t really give much
> information to work from, but it looks like the master can’t access the
> worker nodes for some reason. If you give more information on the cluster,
> networking, etc, it would help.
>
> For example, on AWS you can create a security group which allows all
> traffic to/from itself to itself. If you are using something like ufw on
> ubuntu then you probably need to know the ip addresses of the worker nodes
> beforehand.
>
> —
> Pedro Rodriguez
> PhD Student in Large-Scale Machine Learning | CU Boulder
> Systems Oriented Data Scientist
> UC Berkeley AMPLab Alumni
>
> pedrorodriguez.io | 909-353-4423
> github.com/EntilZha | LinkedIn
> 
>
> On July 23, 2016 at 7:38:01 AM, VG (vlin...@gmail.com) wrote:
>
> Please suggest if I am doing something wrong or an alternative way of
> doing this.
>
> I have an RDD with two values as follows
> JavaPairRDD rdd
>
> When I execute   rdd.collectAsMap()
> it always fails with IO exceptions.
>
>
> 16/07/23 19:03:58 ERROR RetryingBlockFetcher: Exception while beginning
> fetch of 1 outstanding blocks
> java.io.IOException: Failed to connect to /192.168.1.3:58179
> at
> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:228)
> at
> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:179)
> at
> org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:96)
> at
> org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
> at
> org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
> at
> org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:105)
> at
> org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:92)
> at
> org.apache.spark.storage.BlockManager.getRemoteBytes(BlockManager.scala:546)
> at
> org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:76)
> at
> org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:57)
> at
> org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:57)
> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1793)
> at
> org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:56)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
> at java.lang.Thread.run(Unknown Source)
> Caused by: java.net.ConnectException: Connection timed out: no further
> information: /192.168.1.3:58179
> at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
> at sun.nio.ch.SocketChannelImpl.finishConnect(Unknown Source)
> at
> io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:224)
> at
> io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:289)
> at
> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
> at
> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
> at
> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
> at
> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
> ... 1 more
> 16/07/23 19:03:58 INFO RetryingBlockFetcher: Retrying fetch (1/3) for 1
> outstanding blocks after 5000 ms
>
>
>
>


Re: Error in collecting RDD as a Map - IOException in collectAsMap

2016-07-23 Thread Pedro Rodriguez
Make sure that you don’t have ports firewalled. You don’t really give much 
information to work from, but it looks like the master can’t access the worker 
nodes for some reason. If you give more information on the cluster, networking, 
etc, it would help.

For example, on AWS you can create a security group which allows all traffic 
to/from itself to itself. If you are using something like ufw on ubuntu then 
you probably need to know the ip addresses of the worker nodes beforehand.

—
Pedro Rodriguez
PhD Student in Large-Scale Machine Learning | CU Boulder
Systems Oriented Data Scientist
UC Berkeley AMPLab Alumni

pedrorodriguez.io | 909-353-4423
github.com/EntilZha | LinkedIn

On July 23, 2016 at 7:38:01 AM, VG (vlin...@gmail.com) wrote:

Please suggest if I am doing something wrong or an alternative way of doing 
this. 

I have an RDD with two values as follows 
JavaPairRDD rdd

When I execute   rdd.collectAsMap()
it always fails with IO exceptions.   


16/07/23 19:03:58 ERROR RetryingBlockFetcher: Exception while beginning fetch 
of 1 outstanding blocks 
java.io.IOException: Failed to connect to /192.168.1.3:58179
at 
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:228)
at 
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:179)
at 
org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:96)
at 
org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
at 
org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
at 
org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:105)
at 
org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:92)
at org.apache.spark.storage.BlockManager.getRemoteBytes(BlockManager.scala:546)
at 
org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:76)
at 
org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:57)
at 
org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:57)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1793)
at 
org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:56)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
Caused by: java.net.ConnectException: Connection timed out: no further 
information: /192.168.1.3:58179
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(Unknown Source)
at 
io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:224)
at 
io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:289)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
at 
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at 
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
... 1 more
16/07/23 19:03:58 INFO RetryingBlockFetcher: Retrying fetch (1/3) for 1 
outstanding blocks after 5000 ms





Re: Using flatMap on Dataframes with Spark 2.0

2016-07-23 Thread Sun Rui
You should use:
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}

val df = spark.read.parquet(fileName)

// Build the Row encoder from the DataFrame's own schema rather than using kryo.
implicit val encoder: ExpressionEncoder[Row] = RowEncoder(df.schema)

val df1 = df.flatMap { x => List(x) }
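
With that encoder in scope, df1.printSchema() should show the original
field1/field2 columns again, rather than the single binary value column that the
kryo encoder produces.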
> On Jul 23, 2016, at 22:01, Julien Nauroy  wrote:
> 
> Thanks for your quick reply.
> 
> I've tried with this encoder:
> implicit def RowEncoder: org.apache.spark.sql.Encoder[Row] = 
> org.apache.spark.sql.Encoders.kryo[Row]
> Using a suggestion from 
> http://stackoverflow.com/questions/36648128/how-to-store-custom-objects-in-a-dataset-in-spark-1-6
>  
> 
> 
> How did you setup your encoder?
> 
> 
> De: "Sun Rui" 
> À: "Julien Nauroy" 
> Cc: user@spark.apache.org
> Envoyé: Samedi 23 Juillet 2016 15:55:21
> Objet: Re: Using flatMap on Dataframes with Spark 2.0
> 
> I did a try. the schema after flatMap is the same, which is expected.
> 
> What’s your Row encoder?
> On Jul 23, 2016, at 20:36, Julien Nauroy  > wrote:
> 
> Hi,
> 
> I'm trying to call flatMap on a Dataframe with Spark 2.0 (rc5).
> The code is the following:
> var data = spark.read.parquet(fileName).flatMap(x => List(x))
> 
> Of course it's an overly simplified example, but the result is the same.
> The dataframe schema goes from this:
> root
> |-- field1: double (nullable = true)
> |-- field2: integer (nullable = true)
> (etc)
> 
> to this:
> root
> |-- value: binary (nullable = true)
> 
> Plus I have to provide an encoder for Row.
> I expect to get the same schema after calling flatMap.
> Any idea what I could be doing wrong?
> 
> 
> Best regards,
> Julien
> 
> 
> 
> 



Re: Using flatMap on Dataframes with Spark 2.0

2016-07-23 Thread Julien Nauroy
Thanks for your quick reply. 

I've tried with this encoder: 


implicit def RowEncoder: org.apache.spark.sql.Encoder[Row] = 
org.apache.spark.sql.Encoders.kryo[Row] 

Using a suggestion from 
http://stackoverflow.com/questions/36648128/how-to-store-custom-objects-in-a-dataset-in-spark-1-6
 




How did you setup your encoder? 


- Original Message -

From: "Sun Rui"  
To: "Julien Nauroy"  
Cc: user@spark.apache.org 
Sent: Saturday, July 23, 2016 15:55:21 
Subject: Re: Using flatMap on Dataframes with Spark 2.0 

I did a try. the schema after flatMap is the same, which is expected. 

What’s your Row encoder? 



On Jul 23, 2016, at 20:36, Julien Nauroy < julien.nau...@u-psud.fr > wrote: 

Hi, 

I'm trying to call flatMap on a Dataframe with Spark 2.0 (rc5). 
The code is the following: 
var data = spark.read.parquet(fileName).flatMap(x => List(x)) 

Of course it's an overly simplified example, but the result is the same. 
The dataframe schema goes from this: 
root 
|-- field1: double (nullable = true) 
|-- field2: integer (nullable = true) 
(etc) 

to this: 
root 
|-- value: binary (nullable = true) 

Plus I have to provide an encoder for Row. 
I expect to get the same schema after calling flatMap. 
Any idea what I could be doing wrong? 


Best regards, 
Julien 









Re: Using flatMap on Dataframes with Spark 2.0

2016-07-23 Thread Sun Rui
I gave it a try; the schema after flatMap is the same, which is expected.

What’s your Row encoder?
> On Jul 23, 2016, at 20:36, Julien Nauroy  wrote:
> 
> Hi,
> 
> I'm trying to call flatMap on a Dataframe with Spark 2.0 (rc5).
> The code is the following:
> var data = spark.read.parquet(fileName).flatMap(x => List(x))
> 
> Of course it's an overly simplified example, but the result is the same.
> The dataframe schema goes from this:
> root
> |-- field1: double (nullable = true)
> |-- field2: integer (nullable = true)
> (etc)
> 
> to this:
> root
> |-- value: binary (nullable = true)
> 
> Plus I have to provide an encoder for Row.
> I expect to get the same schema after calling flatMap.
> Any idea what I could be doing wrong?
> 
> 
> Best regards,
> Julien
> 
> 



Error in collecting RDD as a Map - IOException in collectAsMap

2016-07-23 Thread VG
Please suggest if I am doing something wrong or an alternative way of doing
this.

I have an RDD with two values as follows
JavaPairRDD rdd

When I execute   rdd.collectAsMap()
it always fails with IO exceptions.


16/07/23 19:03:58 ERROR RetryingBlockFetcher: Exception while beginning
fetch of 1 outstanding blocks
java.io.IOException: Failed to connect to /192.168.1.3:58179
at
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:228)
at
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:179)
at
org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:96)
at
org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
at
org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
at
org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:105)
at
org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:92)
at
org.apache.spark.storage.BlockManager.getRemoteBytes(BlockManager.scala:546)
at
org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:76)
at
org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:57)
at
org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:57)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1793)
at
org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:56)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
Caused by: java.net.ConnectException: Connection timed out: no further
information: /192.168.1.3:58179
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(Unknown Source)
at
io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:224)
at
io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:289)
at
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
at
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
... 1 more
16/07/23 19:03:58 INFO RetryingBlockFetcher: Retrying fetch (1/3) for 1
outstanding blocks after 5000 ms


spark context stop vs close

2016-07-23 Thread Mail.com
Hi All,

Where should we use SparkContext stop() vs close()? Should we stop the context 
first and then close it?

Are there general guidelines around this? When I stop and later try to close, I get 
an "RPC already closed" error.

Thanks,
Pradeep
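
For what it's worth, the usual pattern is to call stop() exactly once when the
application finishes; as far as I can tell, JavaSparkContext.close() simply delegates
to stop(), so calling both is likely why you see the "RPC already closed" error. A
minimal sketch (generic batch code, not your application):

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("shutdown-example").setMaster("local[2]"))
try {
  sc.parallelize(1 to 10).count()
} finally {
  sc.stop()   // stop once at the end; no separate close() needed afterwards
}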




-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Using flatMap on Dataframes with Spark 2.0

2016-07-23 Thread Julien Nauroy
Hi, 

I'm trying to call flatMap on a Dataframe with Spark 2.0 (rc5). 
The code is the following: 


var data = spark.read.parquet(fileName).flatMap(x => List(x)) 


Of course it's an overly simplified example, but the result is the same. 
The dataframe schema goes from this: 


root 
|-- field1: double (nullable = true) 
|-- field2: integer (nullable = true) 
(etc) 




to this: 

root 
|-- value: binary (nullable = true) 

Plus I have to provide an encoder for Row. 
I expect to get the same schema after calling flatMap. 
Any idea what I could be doing wrong? 


Best regards, 
Julien 




Re: Role-based S3 access outside of EMR

2016-07-23 Thread Steve Loughran


Amazon's own S3 client (EMRFS) has stronger consistency guarantees than the ASF S3 
clients; it uses DynamoDB to do this.

there is some work underway to do something similar atop S3a, S3guard, see 
https://issues.apache.org/jira/browse/HADOOP-13345  .

Regarding IAM support in Spark: the latest version of S3A, which will ship in 
Hadoop 2.8, adds IAM role support, temporary credentials, and direct environment 
variable pickup, plus the ability to add your own credential provider.

Regarding getting the relevant binaries into your app, you need a version of 
the hadoop-aws library consistent with the rest of hadoop, and the version of 
the amazon AWS SDKs that hadoop was built against. APIs in the SDK have changed 
and attempting to upgrade the amazon JAR will fail.

There's a PR attached to SPARK-7481 which does the bundling and adds a suite of 
tests...it's designed to work with Hadoop 2.7+ builds. if you are building 
Spark locally, please try it and provide feedback on the PR

Finally, don't try and use s3a on hadoop-2.6...that was really in preview 
state, and it let bugs surface which were fixed in 2.7.

-Steve

ps: More on S3a in Hadoop 2.8. Things will be way better! 
http://slideshare.net/HadoopSummit/hadoop-cloud-storage-object-store-integration-in-production


On 21 Jul 2016, at 17:23, Ewan Leith 
> wrote:

If you use S3A rather than S3N, it supports IAM roles.

I think you can make s3a be used for s3:// style URLs, so it's consistent with your 
EMR paths, by adding this to your Hadoop config, probably in core-site.xml:

fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem
fs.s3.impl=org.apache.hadoop.fs.s3a.S3AFileSystem
fs.AbstractFileSystem.s3.impl=org.apache.hadoop.fs.s3a.S3A
fs.AbstractFileSystem.s3a.impl=org.apache.hadoop.fs.s3a.S3A

And make sure the s3a jars are in your classpath
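
If you'd rather not edit core-site.xml, the same keys can be passed through Spark's
spark.hadoop.* passthrough; a rough sketch (the hadoop-aws version in the comment is
an assumption, match it to the Hadoop build your Spark was compiled against):

// submit with the matching connector on the classpath, e.g.
//   spark-submit --packages org.apache.hadoop:hadoop-aws:2.7.2 ...
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
  .set("spark.hadoop.fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")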

Thanks,
Ewan

From: Everett Anderson [mailto:ever...@nuna.com.INVALID]
Sent: 21 July 2016 17:01
To: Gourav Sengupta 
>
Cc: Teng Qiu >; Andy Davidson 
>; user 
>
Subject: Re: Role-based S3 access outside of EMR

Hey,

FWIW, we are using EMR, actually, in production.

The main case I have for wanting to access S3 with Spark outside of EMR is that 
during development, our developers tend to run EC2 sandbox instances that have 
all the rest of our code and access to some of the input data on S3. It'd be 
nice if S3 access "just worked" on these without storing the access keys in an 
exposed manner.

Teng -- when you say you use EMRFS, does that mean you copied AWS's EMRFS JAR 
from an EMR cluster and are using it outside? My impression is that AWS hasn't 
released the EMRFS implementation as part of the aws-java-sdk, so I'm wary of 
using it. Do you know if it's supported?


On Thu, Jul 21, 2016 at 2:32 AM, Gourav Sengupta 
> wrote:
Hi Teng,
This is complete news to me that people cannot use EMR in
production because it's not open sourced; I think that even Werner is not aware
of such a problem. Is EMRFS open sourced? I am curious to know what HA
stands for.
Regards,
Gourav

On Thu, Jul 21, 2016 at 8:37 AM, Teng Qiu 
> wrote:
there are several reasons that AWS users do not (or cannot) use EMR. One
point for us is a security compliance problem: EMR is not
open sourced, so we cannot use it in a production system. A second is that
EMR does not support HA yet.

but to the original question from @Everett :

-> Credentials and Hadoop Configuration

as you said, the best practice should be to "rely on machine roles"; they
are called IAM roles.

we are using the EMRFS impl for accessing S3; it supports IAM role-based
access control well. You can take a look here:
https://github.com/zalando/spark/tree/branch-1.6-zalando

or simply use our docker image (Dockerfile on github:
https://github.com/zalando/spark-appliance/tree/master/Dockerfile)

docker run -d --net=host \
   -e START_MASTER="true" \
   -e START_WORKER="true" \
   -e START_WEBAPP="true" \
   -e START_NOTEBOOK="true" \
   registry.opensource.zalan.do/bi/spark:1.6.2-6


-> SDK and File System Dependencies

as mentioned above, using EMRFS libs solved this problem:
http://docs.aws.amazon.com//ElasticMapReduce/latest/ReleaseGuide/emr-fs.html


2016-07-21 8:37 GMT+02:00 Gourav Sengupta 
>:
> But that would mean you would be accessing data over internet increasing
> data read latency, data transmission failures. Why are you not using EMR?
>
> Regards,
> Gourav
>
> On Thu, Jul 21, 2016 at 1:06 AM, Everett Anderson 
> 

Re: spark and plot data

2016-07-23 Thread andy petrella
Heya,

Might be worth checking the spark-notebook, I
guess; it offers custom and reactive dynamic charts (scatter, line, bar,
pie, graph, radar, parallel, pivot, …) for any kind of data, from an
intuitive and easy Scala API (with server-side, incl. Spark-based, sampling
if needed).

There are many charts available natively; you can check this repo
(especially the notebook named Why Spark Notebook), and if you're familiar with
docker, you can even simply do the following (and use Spark 2.0):

docker pull datafellas/scala-for-data-science:1.0-spark2
docker run --rm -it --net=host -m 8g \
  datafellas/scala-for-data-science:1.0-spark2 bash



For any question, you can poke the community live on our gitter

or from github  of course
HTH
andy

On Sat, Jul 23, 2016 at 11:26 AM Gourav Sengupta 
wrote:

Hi Pedro,
>
> Toree is Scala kernel for Jupyter in case anyone needs a short intro. I
> use it regularly (when I am not using IntelliJ) and its quite good.
>
> Regards,
> Gourav
>
> On Fri, Jul 22, 2016 at 11:15 PM, Pedro Rodriguez  > wrote:
>
>> As of the most recent 0.6.0 release its partially alleviated, but still
>> not great (compared to something like Jupyter).
>>
>> They can be "downloaded" but its only really meaningful in importing it
>> back to Zeppelin. It would be great if they could be exported as HTML or
>> PDF, but at present they can't be. I know they have some sort of git
>> support, but it was never clear to me how it was suppose to be used since
>> the docs are sparse on that. So far what works best for us is S3 storage,
>> but you don't get the benefit of Github using that (history + commits etc).
>>
>> There are a couple other notebooks floating around, Apache Toree seems
>> the most promising for portability since its based on jupyter
>> https://github.com/apache/incubator-toree
>>
>> On Fri, Jul 22, 2016 at 3:53 PM, Gourav Sengupta <
>> gourav.sengu...@gmail.com> wrote:
>>
>>> The biggest stumbling block to using Zeppelin has been that we cannot
>>> download the notebooks, cannot export them and certainly cannot sync them
>>> back to Github, without mind numbing and sometimes irritating hacks. Have
>>> those issues been resolved?
>>>
>>>
>>> Regards,
>>> Gourav
>>>
>>>
>>> On Fri, Jul 22, 2016 at 2:22 PM, Pedro Rodriguez <
>>> ski.rodrig...@gmail.com> wrote:
>>>
 Zeppelin works great. The other thing that we have done in notebooks
 (like Zeppelin or Databricks) which support multiple types of spark session
 is register Spark SQL temp tables in our scala code then escape hatch to
 python for plotting with seaborn/matplotlib when the built in plots are
 insufficient.

 —
 Pedro Rodriguez
 PhD Student in Large-Scale Machine Learning | CU Boulder
 Systems Oriented Data Scientist
 UC Berkeley AMPLab Alumni

 pedrorodriguez.io | 909-353-4423
 github.com/EntilZha | LinkedIn
 

 On July 22, 2016 at 3:04:48 AM, Marco Colombo (
 ing.marco.colo...@gmail.com) wrote:

 Take a look at zeppelin

 http://zeppelin.apache.org

 Il giovedì 21 luglio 2016, Andy Davidson 
 ha scritto:

> Hi Pseudo
>
> Plotting, graphing, data visualization, report generation are common
> needs in scientific and enterprise computing.
>
> Can you tell me more about your use case? What is it about the current
> process / workflow do you think could be improved by pushing plotting (I
> assume you mean plotting and graphing) into spark.
>
>
> In my personal work all the graphing is done in the driver on summary
> stats calculated using spark. So for me using standard python libs has not
> been a problem.
>
> Andy
>
> From: pseudo oduesp 
> Date: Thursday, July 21, 2016 at 8:30 AM
> To: "user @spark" 
> Subject: spark and plot data
>
> Hi ,
> i know spark  it s engine  to compute large data set but for me i work
> with pyspark and it s very wonderful machine
>
> my question  we  don't have tools for ploting data each time we have
> to switch and go back to python for using plot.
> but when you have large result scatter plot or roc curve  you cant use
> collect to take data .
>
> somone have propostion for plot .
>
> thanks
>
>

 --
 Ing. Marco Colombo


>>>
>>
>>
>> --
>> Pedro Rodriguez
>> PhD Student in Distributed Machine Learning | CU Boulder
>> UC Berkeley AMPLab Alumni
>>
>> 

Re: spark and plot data

2016-07-23 Thread Gourav Sengupta
Hi Pedro,

Toree is a Scala kernel for Jupyter, in case anyone needs a short intro. I use
it regularly (when I am not using IntelliJ) and it's quite good.

Regards,
Gourav

On Fri, Jul 22, 2016 at 11:15 PM, Pedro Rodriguez 
wrote:

> As of the most recent 0.6.0 release its partially alleviated, but still
> not great (compared to something like Jupyter).
>
> They can be "downloaded" but its only really meaningful in importing it
> back to Zeppelin. It would be great if they could be exported as HTML or
> PDF, but at present they can't be. I know they have some sort of git
> support, but it was never clear to me how it was suppose to be used since
> the docs are sparse on that. So far what works best for us is S3 storage,
> but you don't get the benefit of Github using that (history + commits etc).
>
> There are a couple other notebooks floating around, Apache Toree seems the
> most promising for portability since its based on jupyter
> https://github.com/apache/incubator-toree
>
> On Fri, Jul 22, 2016 at 3:53 PM, Gourav Sengupta <
> gourav.sengu...@gmail.com> wrote:
>
>> The biggest stumbling block to using Zeppelin has been that we cannot
>> download the notebooks, cannot export them and certainly cannot sync them
>> back to Github, without mind numbing and sometimes irritating hacks. Have
>> those issues been resolved?
>>
>>
>> Regards,
>> Gourav
>>
>>
>> On Fri, Jul 22, 2016 at 2:22 PM, Pedro Rodriguez > > wrote:
>>
>>> Zeppelin works great. The other thing that we have done in notebooks
>>> (like Zeppelin or Databricks) which support multiple types of spark session
>>> is register Spark SQL temp tables in our scala code then escape hatch to
>>> python for plotting with seaborn/matplotlib when the built in plots are
>>> insufficient.
>>>
>>> —
>>> Pedro Rodriguez
>>> PhD Student in Large-Scale Machine Learning | CU Boulder
>>> Systems Oriented Data Scientist
>>> UC Berkeley AMPLab Alumni
>>>
>>> pedrorodriguez.io | 909-353-4423
>>> github.com/EntilZha | LinkedIn
>>> 
>>>
>>> On July 22, 2016 at 3:04:48 AM, Marco Colombo (
>>> ing.marco.colo...@gmail.com) wrote:
>>>
>>> Take a look at zeppelin
>>>
>>> http://zeppelin.apache.org
>>>
>>> Il giovedì 21 luglio 2016, Andy Davidson 
>>> ha scritto:
>>>
 Hi Pseudo

 Plotting, graphing, data visualization, report generation are common
 needs in scientific and enterprise computing.

 Can you tell me more about your use case? What is it about the current
 process / workflow do you think could be improved by pushing plotting (I
 assume you mean plotting and graphing) into spark.


 In my personal work all the graphing is done in the driver on summary
 stats calculated using spark. So for me using standard python libs has not
 been a problem.

 Andy

 From: pseudo oduesp 
 Date: Thursday, July 21, 2016 at 8:30 AM
 To: "user @spark" 
 Subject: spark and plot data

 Hi ,
 i know spark  it s engine  to compute large data set but for me i work
 with pyspark and it s very wonderful machine

 my question  we  don't have tools for ploting data each time we have to
 switch and go back to python for using plot.
 but when you have large result scatter plot or roc curve  you cant use
 collect to take data .

 somone have propostion for plot .

 thanks


>>>
>>> --
>>> Ing. Marco Colombo
>>>
>>>
>>
>
>
> --
> Pedro Rodriguez
> PhD Student in Distributed Machine Learning | CU Boulder
> UC Berkeley AMPLab Alumni
>
> ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
> Github: github.com/EntilZha | LinkedIn:
> https://www.linkedin.com/in/pedrorodriguezscience
>
>


Re: spark and plot data

2016-07-23 Thread Gourav Sengupta
Hi Taotao,

that is the way it's usually used to visualize data from Spark. But I do see
that people transfer the data to a list to feed to matplotlib (as in the Spark
course currently running on edX).

Please try using Blaze and Bokeh and you will be in a new world altogether.


Regards,
Gourav

On Sat, Jul 23, 2016 at 2:47 AM, Taotao.Li  wrote:

> hi, pesudo,
>
>   I've posted a blog before spark-dataframe-introduction
>   , and
> for me, I use spark dataframe [ or RDD ] to do the logic calculation on all
> the datasets, and then transform the result into pandas dataframe, and make
> data visualization using pandas dataframe, sometimes you may need
> matplotlib or seaborn.
>
> --
> *___*
> Quant | Engineer | Boy
> *___*
> *blog*:http://litaotao.github.io
> 
> *github*: www.github.com/litaotao
>