are functions deserialized once per task?

2015-10-02 Thread Michael Albert
Greetings!
Is it true that functions, such as those passed to RDD.map(), are deserialized
once per task? This seems to be the case looking at Executor.scala, but I don't
really understand the code.
I'm hoping the answer is yes, because that makes it easier to write code without
worrying about thread safety. For example, suppose I have something like this:

    class FooToBarTransformer {
        def transform(foo: Foo): Bar = ...
    }

Now I want to do something like this:

    val rddFoo: RDD[Foo] = ...
    val transformer = new FooToBarTransformer
    val rddBar = rddFoo.map(foo => transformer.transform(foo))

If the "transformer" object is deserialized once per task, then I do not need
to worry whether "transform()" is thread safe. If, for example, the
implementation tried to "optimize" matters by caching the deserialization, so that
one object was shared by all threads in a single JVM, then presumably one would
need to worry about the thread safety of transform().
Is my understanding correct? Is this likely to continue to be true in future
releases? Answers of "yes" would be much appreciated :-).
Thanks! -Mike
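
[Editor's sketch, not from the original message] One way to sidestep the question entirely is to construct the transformer inside mapPartitions, so each task builds its own instance regardless of how closures are deserialized or cached. Foo, Bar and FooToBarTransformer below are stand-ins for the hypothetical types above.

import org.apache.spark.rdd.RDD

// Sketch only; Foo, Bar and FooToBarTransformer stand in for the types above.
case class Foo(i: Int)
case class Bar(s: String)
class FooToBarTransformer { def transform(foo: Foo): Bar = Bar(foo.i.toString) }

// Build the transformer once per partition (i.e. per task), so thread safety
// does not depend on how, or how often, the closure is deserialized.
def fooToBar(rddFoo: RDD[Foo]): RDD[Bar] =
  rddFoo.mapPartitions { iter =>
    val transformer = new FooToBarTransformer
    iter.map(foo => transformer.transform(foo))
  }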



Re: How to avoid executor time out on yarn spark while dealing with large shuffle skewed data?

2015-08-21 Thread Michael Albert
This is something of a wild guess, but I find that when executors start
disappearing for no obvious reason, this is usually because the yarn
node-managers have decided that the containers are using too much memory and
then terminate the executors.
Unfortunately, to see evidence of this, one needs to carefully review the yarn
node-manager logs on the workers -- it doesn't seem to show up in the UI.
What I generally do is some combination of (a sketch of the corresponding
settings follows below):
  1) increasing executor memory (often also decreasing the number of executors)
  2) decreasing the number of cores per executor
  3) increasing the executor memory overhead.
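
[Editor's sketch, not from the original message] The three knobs above expressed as Spark configuration; the values are placeholders, not recommendations.

// Hedged sketch of the settings described above (placeholder values).
val conf = new org.apache.spark.SparkConf()
  .set("spark.executor.memory", "8g")                  // 1) more memory per executor
  .set("spark.executor.instances", "10")               //    (often fewer executors)
  .set("spark.executor.cores", "2")                    // 2) fewer cores per executor
  .set("spark.yarn.executor.memoryOverhead", "2048")   // 3) more off-heap headroom, in MB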
Good luck.
-Mike
  From: Sandy Ryza sandy.r...@cloudera.com
 To: Umesh Kacha umesh.ka...@gmail.com 
Cc: user@spark.apache.org user@spark.apache.org 
 Sent: Thursday, August 20, 2015 5:21 PM
 Subject: Re: How to avoid executor time out on yarn spark while dealing with 
large shuffle skewed data?
   
GC wouldn't necessarily result in errors - it could just be slowing down your 
job and causing the executor JVMs to stall.  If you click on a stage in the UI, 
you should end up on a page with all the metrics concerning the tasks that ran 
in that stage.  GC Time is one of these task metrics.
-Sandy


On Thu, Aug 20, 2015 at 8:54 AM, Umesh Kacha umesh.ka...@gmail.com wrote:

Hi, where do I see GC time in the UI? I have set spark.yarn.executor.memoryOverhead
to 3500, which seems to be good enough, I believe. So you mean only GC could be
the reason behind the timeout? I checked the YARN logs and I did not see any GC error
there. Please guide. Thanks much.
On Thu, Aug 20, 2015 at 8:14 PM, Sandy Ryza sandy.r...@cloudera.com wrote:

Moving this back onto user@
Regarding GC, can you look in the web UI and see whether the GC time metric 
dominates the amount of time spent on each task (or at least the tasks that 
aren't completing)?
Also, have you tried bumping your spark.yarn.executor.memoryOverhead?  YARN may 
be killing your executors for using too much off-heap space.  You can see 
whether this is happening by looking in the Spark AM or YARN NodeManager logs.
-Sandy
On Thu, Aug 20, 2015 at 7:39 AM, Umesh Kacha umesh.ka...@gmail.com wrote:

Hi, thanks much for the response. Yes, I tried the default setting of 0.2 too; it was
also going into timeout. If it is spending time in GC, then why is it not
throwing a GC error? I don't see any such error, and the Yarn logs are not helpful at all.
What is Tungsten and how do I use it? I believe Spark is doing great: my job runs
successfully and 60% of the tasks complete; only after the first executor gets lost do
things start going wrong. On Aug 20, 2015 7:59 PM, Sandy Ryza sandy.r...@cloudera.com
wrote:

What sounds most likely is that you're hitting heavy garbage collection.  Did 
you hit issues when the shuffle memory fraction was at its default of 0.2?  A 
potential danger with setting the shuffle storage to 0.7 is that it allows 
shuffle objects to get into the GC old generation, which triggers more 
stop-the-world garbage collections.
Have you tried enabling Tungsten / unsafe?
Unfortunately, Spark is still not that great at dealing with heavily-skewed 
shuffle data, because its reduce-side aggregation still operates on Java 
objects instead of binary data.
-Sandy
On Thu, Aug 20, 2015 at 7:21 AM, Umesh Kacha umesh.ka...@gmail.com wrote:

Hi Sandy, thanks much for the response. I am using Spark 1.4.1 and I have set
spark.shuffle.storage to 0.7, as my spark job involves 4 group-by queries
executed using hiveContext.sql. My data set is skewed, so there will be more shuffling,
I believe. I don't know what's wrong: the spark job runs fine for almost an hour, and
when the shuffle read / shuffle write column in the UI starts to show more than 10 GB,
an executor starts getting lost because of the timeout, and slowly other executors
start getting lost. Please guide. On Aug 20, 2015 7:38 PM, Sandy Ryza
sandy.r...@cloudera.com wrote:

What version of Spark are you using?  Have you set any shuffle configs?
On Wed, Aug 19, 2015 at 11:46 AM, unk1102 umesh.ka...@gmail.com wrote:

I have one Spark job which seems to run fine, but after one hour or so
executors start getting lost because of a timeout, with something like the following
error:

cluster.yarnScheduler : Removing an executor 14 65 timeout exceeds
60 seconds

Because of the above error, a couple of chained errors start to come, like
FetchFailedException, Rpc client disassociated, Connection reset by peer,
IOException, etc.

Please see the following UI page. I have noticed that when shuffle read/write
starts to increase to more than 10 GB, executors start getting lost because of
the timeout. How do I clear this stacked memory of 10 GB in the shuffle read/write
section? I don't cache anything, so why is Spark not clearing that memory? Please
guide.

IMG_20150819_231418358.jpg
http://apache-spark-user-list.1001560.n3.nabble.com/file/n24345/IMG_20150819_231418358.jpg





Re: Wired Problem: Task not serializable[Spark Streaming]

2015-06-08 Thread Michael Albert
Note that in scala, "return" is a non-local return:
https://tpolecat.github.io/2014/05/09/return.html
So that "return" is *NOT* returning from the anonymous function, but attempting to
return from the enclosing method, i.e., main, which is running on the driver, not on
the workers. So on the workers, there is nowhere to which the return can
jump. Hence it is not serializable.
Good luck. -Mike
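
[Editor's sketch, not from the thread] A minimal rewrite of the partition body from the quoted code below with the non-local return replaced by a local check, so nothing tries to jump back into main().

import scala.collection.mutable

// Sketch: replace "foreach + return" with a local decision inside the partition.
def processPartition(partitionIterable: Iterator[String]): Unit = {
  val map = mutable.Map[String, String]()
  partitionIterable.foreach(v => map += v -> v)
  if (map.keys.exists(_ == "abc")) {
    // handle the special "abc" case here, locally, instead of `return`
  } else {
    map.foreach { case (k, _) => /* normal handling of each entry */ }
  }
}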

  From: bit1...@163.com bit1...@163.com
 To: user user@spark.apache.org 
 Sent: Monday, June 8, 2015 10:01 PM
 Subject: Re: Wired Problem: Task not serializable[Spark Streaming]
   
Could someone help explain what happens that leads to the "Task not serializable"
issue? Thanks.
bit1...@163.com
From: bit1129@163.com
Date: 2015-06-08 19:08
To: user
Subject: Wired Problem: Task not serializable[Spark Streaming]
Hi, With the following simple code, I got an exception that complains "Task not
serializable". The root cause is that I use "return" in the map foreach loop.

Why does "return" in the map foreach loop cause the "Task not serializable" problem?
Can someone please explain this to me?


import org.apache.spark.SparkConf
import org.apache.spark.streaming._

import scala.collection.mutable

object NetCatStreamingWordCount3 {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("NetCatWordCount")
    conf.setMaster("local[3]")
    val ssc = new StreamingContext(conf, Seconds(5))
    val lines = ssc.socketTextStream("localhost", )
    lines.foreachRDD(rdd => {
      rdd.foreachPartition(partitionIterable => {
        val map = mutable.Map[String, String]()
        while (partitionIterable.hasNext) {
          val v = partitionIterable.next()
          map += v -> v
        }

        map.foreach(entry => {
          if (entry._1.equals("abc")) {
            return  // This is the root cause of the "Task not serializable" error.
          }
        })

      })
    })
    ssc.start()
    ssc.awaitTermination()
  }
}
bit1...@163.com


  

Re: variant record by case classes in shell fails?

2015-04-03 Thread Michael Albert
My apologies for following my own post, but a friend just pointed out that if I
use kryo with reference counting AND copy-and-paste, this runs.
However, if I try to :load the file, this fails as described below.
I thought :load was supposed to be equivalent?
Thanks! -Mike

  From: Michael Albert m_albert...@yahoo.com.INVALID
 To: User user@spark.apache.org 
 Sent: Friday, April 3, 2015 2:45 PM
 Subject: variant record by case classes in shell fails?
   
Greetings!
For me, the code below fails from the shell. However, I can do essentially the
same from compiled code, exporting the jar.
If I use default serialization or kryo with reference tracking, the error
message tells me it can't find the constructor for A. If I use kryo with
reference tracking, I get a stack overflow.
I'm using Spark 1.2.1 on AWS EMR (hadoop 2.4).
I've also tried putting this code inside an object.
Is this just me?  Am I overlooking something obvious?
Thanks!
-Mike
:paste
sealed class AorB
case class A(i: Int) extends AorB
case class B(i: Int, j: Int) extends AorB

sc.parallelize(0.until(1)).map{ _ =>
    val x = A(1)
    x
}.collect()



  

Re: How to check that a dataset is sorted after it has been written out?

2015-03-23 Thread Michael Albert
Thanks for the information! (to all who responded)
The code below *seems* to work. Any hidden gotchas that anyone sees?
And still, in terasort, how did they check that the data was actually sorted?
:-)
-Mike
class MyInputFormat[T]
    extends parquet.hadoop.ParquetInputFormat[T]
{
    override def getSplits(jobContext: org.apache.hadoop.mapreduce.JobContext)
        : java.util.List[org.apache.hadoop.mapreduce.InputSplit] =
    {
        val splits = super.getSplits(jobContext)
        import scala.collection.JavaConversions._
        splits.sortBy{ split => split match {
            case fileSplit: org.apache.hadoop.mapreduce.lib.input.FileSplit
                => (fileSplit.getPath.getName, fileSplit.getStart)
            case _ => ("", -1L) } }
    }
}

  From: Sean Owen so...@cloudera.com
 To: Michael Albert m_albert...@yahoo.com 
Cc: User user@spark.apache.org 
 Sent: Monday, March 23, 2015 7:31 AM
 Subject: Re: How to check that a dataset is sorted after it has been written 
out?
   
Data is not (necessarily) sorted when read from disk, no. A file might
have many blocks even, and while a block yields a partition in
general, the order in which those partitions appear in the RDD is not
defined. This is why you'd sort if you need the data sorted.

I think you could conceivably make some custom RDD or InputFormat that
reads blocks in a well-defined order and, assuming the data is sorted
in some knowable way on disk, then must have them sorted. I think
that's even been brought up.

Deciding whether the data is sorted is quite different. You'd have to
decide what ordering you expect (is part 0 before part 1? should it be
sorted in a part file?) and then just verify that externally.
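
[Editor's sketch, not part of Sean's reply] One hedged version of the external check he describes, assuming the keys can be read back as an RDD[Long] in the order you expect; the names are illustrative.

import org.apache.spark.rdd.RDD

// Sketch: check that each partition is internally sorted and that partition
// boundaries are consistent, by comparing each partition's (min, max) pair.
def checkSorted(keys: RDD[Long]): Boolean = {
  val perPartition = keys.mapPartitionsWithIndex { (idx, iter) =>
    var sorted = true
    var first: Option[Long] = None
    var prev: Option[Long] = None
    iter.foreach { k =>
      if (prev.exists(_ > k)) sorted = false
      if (first.isEmpty) first = Some(k)
      prev = Some(k)
    }
    Iterator((idx, sorted, first, prev))   // (partition, sortedWithin, min, max)
  }.collect().sortBy(_._1)

  val withinOk = perPartition.forall(_._2)
  val acrossOk = perPartition.sliding(2).forall {
    case Array((_, _, _, Some(prevMax)), (_, _, Some(nextMin), _)) => prevMax <= nextMin
    case _ => true   // empty partitions or a single partition
  }
  withinOk && acrossOk
}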



On Fri, Mar 20, 2015 at 10:41 PM, Michael Albert
m_albert...@yahoo.com.invalid wrote:
 Greetings!

 I sorted a dataset in Spark and then wrote it out in avro/parquet.

 Then I wanted to check that it was sorted.

 It looks like each partition has been sorted, but when reading in, the first
 partition (i.e., as
 seen in the partition index of mapPartitionsWithIndex) is not the same  as
 implied by
 the names of the parquet files (even when the number of partitions is the
 same in the
 rdd which was read as on disk).

 If I take() a few hundred values, they are sorted, but they are *not* the
 same as if I
 explicitly open part-r-0.parquet and take values from that.

 It seems that when opening the rdd, the partitions of the rdd are not in
 the same
 order as implied by the data on disk (i.e., part-r-0.parquet,
 part-r-1.parquet, etc).

 So, how might one read the data so that one maintains the sort order?

 And while on the subject, after the terasort, how did they check that the
 data was actually sorted correctly? (or did they :-) ? ).

 Is there any way to read the data back in so as to preserve the sort, or do
 I need to
 zipWithIndex before writing it out, and write the index at that time? (I
 haven't tried the
 latter yet).

 Thanks!
 -Mike





  

How to check that a dataset is sorted after it has been written out? [repost]

2015-03-22 Thread Michael Albert
Greetings!
[My apologies for this repost; I'm not certain that the first message made it to the
list.]
I sorted a dataset in Spark and then wrote it out in avro/parquet.
Then I wanted to check that it was sorted.
It looks like each partition has been sorted, but when reading in, the first
partition (i.e., as seen in the partition index of mapPartitionsWithIndex) is
not the same as implied by the names of the parquet files (even when the
number of partitions is the same in the rdd which was read as on disk).
If I take() a few hundred values, they are sorted, but they are *not* the
same as if I explicitly open part-r-0.parquet and take values from that.
It seems that when opening the rdd, the partitions of the rdd are not in the
same order as implied by the data on disk (i.e., part-r-0.parquet,
part-r-1.parquet, etc).
So, how might one read the data so that one maintains the sort order?
And while on the subject, after the terasort, how did they check that the 
data was actually sorted correctly? (or did they :-) ? ).
Is there any way to read the data back in so as to preserve the sort, or do I 
need to zipWithIndex before writing it out, and write the index at that time? 
(I haven't tried the latter yet).
Thanks! -Mike



How to check that a dataset is sorted after it has been written out?

2015-03-20 Thread Michael Albert
Greetings!
I sorted a dataset in Spark and then wrote it out in avro/parquet.
Then I wanted to check that it was sorted.
It looks like each partition has been sorted, but when reading in, the first
partition (i.e., as seen in the partition index of mapPartitionsWithIndex) is
not the same as implied by the names of the parquet files (even when the
number of partitions is the same in the rdd which was read as on disk).
If I take() a few hundred values, they are sorted, but they are *not* the
same as if I explicitly open part-r-0.parquet and take values from that.
It seems that when opening the rdd, the partitions of the rdd are not in the
same order as implied by the data on disk (i.e., part-r-0.parquet,
part-r-1.parquet, etc).
So, how might one read the data so that one maintains the sort order?
And while on the subject, after the terasort, how did they check that the 
data was actually sorted correctly? (or did they :-) ? ).
Is there any way to read the data back in so as to preserve the sort, or do I 
need to zipWithIndex before writing it out, and write the index at that time? 
(I haven't tried the latter yet).
Thanks! -Mike


Re: How to debug a Hung task

2015-02-28 Thread Michael Albert
For what it's worth, I was seeing mysterious hangs, but they went away when
upgrading from Spark 1.2 to 1.2.1. I don't know if this is your problem. Also, I'm
using AWS EMR images, which were also upgraded.
Anyway, that's my experience.
-Mike

  From: Manas Kar manasdebashis...@gmail.com
 To: user@spark.apache.org user@spark.apache.org 
 Sent: Friday, February 27, 2015 3:50 PM
 Subject: How to debug a Hung task
   
Hi,
  I have a spark application that hangs on doing just one task (the rest of the 200-300
tasks get completed in reasonable time). I can see in the thread dump which
function gets stuck, however I don't have a clue as to what value is causing
that behaviour. Also, logging the inputs before the function is executed does
not help, as the actual message gets buried in the logs.
How does one go about debugging such a case? Also, is there a way I can wrap my
function inside some sort of timer-based environment, so that if it took too long I
could throw a stack trace or some such?
Thanks
Manas
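
[Editor's sketch, not from the thread] A minimal version of the timer wrapper Manas asks about, applied inside the task around the suspect function; the 60-second limit and slowFunction are illustrative.

import java.util.concurrent.{Callable, Executors, TimeUnit, TimeoutException}

// Sketch: run `body` with a time limit; if it exceeds the limit, throw so the
// offending input shows up as a task failure instead of a silent hang.
def withTimeout[T](seconds: Long)(body: => T): T = {
  val pool = Executors.newSingleThreadExecutor()
  try {
    pool.submit(new Callable[T] { def call(): T = body }).get(seconds, TimeUnit.SECONDS)
  } catch {
    case e: TimeoutException =>
      throw new RuntimeException(s"function exceeded $seconds seconds", e)
  } finally {
    pool.shutdownNow()
  }
}

// Usage inside a transformation (slowFunction is a placeholder):
// rdd.map(x => withTimeout(60) { slowFunction(x) })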

  

Spark stalls or hangs: is this a clue? remote fetches seem to never return?

2015-02-05 Thread Michael Albert
Greetings!
Again, thanks to all who have given suggestions. I am still trying to diagnose a
problem where I have processes that run for one or several hours but
intermittently stall or hang. By "stall" I mean that there is no CPU usage on
the workers or the driver, nor network activity, nor do I see disk activity. It
just hangs.
Using the Application Master to find which workers still had active tasks, I
then went to that machine and looked in the user logs. In one of the user log's
stderr files, it ends with "Started 50 remote fetches". Should there be a
message saying that the fetch was completed? Any suggestions as to how I might
diagnose why the fetch was not completed?
Thanks! -Mike
Here is the last part of the log:

15/02/06 01:33:46 INFO storage.MemoryStore: ensureFreeSpace(5368) called with curMem=875861, maxMem=2315649024
15/02/06 01:33:46 INFO storage.MemoryStore: Block broadcast_10 stored as values in memory (estimated size 5.2 KB, free 2.2 GB)
15/02/06 01:33:46 INFO spark.MapOutputTrackerWorker: Don't have map outputs for shuffle 5, fetching them
15/02/06 01:33:46 INFO spark.MapOutputTrackerWorker: Doing the fetch; tracker actor = Actor[akka.tcp://sparkDriver@ip-10-171-0-208.ec2.internal:44124/user/MapOutputTracker#-878402310]
15/02/06 01:33:46 INFO spark.MapOutputTrackerWorker: Don't have map outputs for shuffle 5, fetching them
15/02/06 01:33:46 INFO spark.MapOutputTrackerWorker: Got the output locations
15/02/06 01:33:46 INFO storage.ShuffleBlockFetcherIterator: Getting 300 non-empty blocks out of 300 blocks
15/02/06 01:33:46 INFO storage.ShuffleBlockFetcherIterator: Getting 300 non-empty blocks out of 300 blocks
15/02/06 01:33:46 INFO storage.ShuffleBlockFetcherIterator: Started 50 remote fetches in 47 ms
15/02/06 01:33:46 INFO storage.ShuffleBlockFetcherIterator: Started 50 remote fetches in 48 ms

It's been like that for half an hour.
Thanks! -Mike



Re: Spark stalls or hangs: is this a clue? remote fetches seem to never return?

2015-02-05 Thread Michael Albert
My apologies for following up my own post, but I thought this might be of
interest.
I terminated the java process corresponding to the executor which had opened the
stderr file mentioned below (kill pid). Then my spark job completed without
error (it was actually almost finished).
Now I am completely confused :-).
Thanks! -Mike

  From: Michael Albert m_albert...@yahoo.com.INVALID
 To: user@spark.apache.org user@spark.apache.org 
 Sent: Thursday, February 5, 2015 9:04 PM
 Subject: Spark stalls or hangs: is this a clue? remote fetches seem to never 
return?
   
Greetings!
Again, thanks to all who have given suggestions. I am still trying to diagnose a
problem where I have processes that run for one or several hours but
intermittently stall or hang. By "stall" I mean that there is no CPU usage on
the workers or the driver, nor network activity, nor do I see disk activity. It
just hangs.
Using the Application Master to find which workers still had active tasks, I
then went to that machine and looked in the user logs. In one of the user log's
stderr files, it ends with "Started 50 remote fetches". Should there be a
message saying that the fetch was completed? Any suggestions as to how I might
diagnose why the fetch was not completed?
Thanks! -Mike
Here is the last part of the log:

15/02/06 01:33:46 INFO storage.MemoryStore: ensureFreeSpace(5368) called with curMem=875861, maxMem=2315649024
15/02/06 01:33:46 INFO storage.MemoryStore: Block broadcast_10 stored as values in memory (estimated size 5.2 KB, free 2.2 GB)
15/02/06 01:33:46 INFO spark.MapOutputTrackerWorker: Don't have map outputs for shuffle 5, fetching them
15/02/06 01:33:46 INFO spark.MapOutputTrackerWorker: Doing the fetch; tracker actor = Actor[akka.tcp://sparkDriver@ip-10-171-0-208.ec2.internal:44124/user/MapOutputTracker#-878402310]
15/02/06 01:33:46 INFO spark.MapOutputTrackerWorker: Don't have map outputs for shuffle 5, fetching them
15/02/06 01:33:46 INFO spark.MapOutputTrackerWorker: Got the output locations
15/02/06 01:33:46 INFO storage.ShuffleBlockFetcherIterator: Getting 300 non-empty blocks out of 300 blocks
15/02/06 01:33:46 INFO storage.ShuffleBlockFetcherIterator: Getting 300 non-empty blocks out of 300 blocks
15/02/06 01:33:46 INFO storage.ShuffleBlockFetcherIterator: Started 50 remote fetches in 47 ms
15/02/06 01:33:46 INFO storage.ShuffleBlockFetcherIterator: Started 50 remote fetches in 48 ms

It's been like that for half an hour.
Thanks! -Mike



  

Re: Spark Job running on localhost on yarn cluster

2015-02-04 Thread Michael Albert
1) Parameters like --num-executors should come before the jar.  That is, you
want something like:

    $SPARK_HOME/bin/spark-submit --num-executors 3 --driver-memory 6g \
        --executor-memory 7g --master yarn-cluster --class EDDApp \
        target/scala-2.10/eddjar \
        outputPath

That is, *your* parameters come after the jar; spark's parameters come *before*
the jar. That's how spark knows which are which (at least that is my
understanding).
2) Double check that in your code, when you create the SparkContext or the
configuration object, you don't set "local" there. (I don't recall the exact
order of priority if the parameters disagree with the code.)
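
[Editor's sketch, not the poster's actual code] To illustrate point 2: a driver that leaves the master to spark-submit; EDDApp is the class name from the commands in this thread.

import org.apache.spark.{SparkConf, SparkContext}

// Leave the master unset in code so the value passed to spark-submit via
// --master (e.g. yarn-cluster) takes effect, instead of a hard-coded
// setMaster("local[*]") silently forcing local execution.
object EDDApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("EDDApp")   // no setMaster here
    val sc = new SparkContext(conf)
    // ... job logic ...
    sc.stop()
  }
}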
Good luck!
-Mike

  From: kundan kumar iitr.kun...@gmail.com
 To: spark users user@spark.apache.org 
 Sent: Wednesday, February 4, 2015 7:41 AM
 Subject: Spark Job running on localhost on yarn cluster
   
Hi, 
I am trying to execute my code on a yarn cluster
The command which I am using is 
$SPARK_HOME/bin/spark-submit --class EDDApp
target/scala-2.10/edd-application_2.10-1.0.jar --master yarn-cluster
--num-executors 3 --driver-memory 6g --executor-memory 7g outputPath

But, I can see that this program is running only on the localhost.
It is able to read the file from hdfs.
I have tried this in standalone mode and it works fine.
Please suggest where it is going wrong.

Regards,Kundan

  

Re: advice on diagnosing Spark stall for 1.5hr out of 3.5hr job?

2015-02-04 Thread Michael Albert
=OFF_SWITCH
2015-02-04 18:18:28,636 INFO org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt (ResourceManager Event Processor): Application application_1422834185427_0088 reserved container container_1422834185427_0088_01_21 on node host: ip-10-171-0-129.ec2.internal:9103 #containers=2 available=5632 used=17408, currently has 6 at priority 1; currentReservation 52224
2015-02-04 18:18:28,636 INFO org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue (ResourceManager Event Processor): Reserved container  application=application_1422834185427_0088 resource=memory:8704, vCores:1 queue=default: capacity=1.0, absoluteCapacity=1.0, usedResources=memory:226304, vCores:26 usedCapacity=0.982, absoluteUsedCapacity=0.982, numApps=1, numContainers=26 usedCapacity=0.982 absoluteUsedCapacity=0.982 used=memory:226304, vCores:26 cluster=memory:230400, vCores:160
2015-02-04 18:18:28,636 INFO org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler (ResourceManager Event Processor): Skipping scheduling since node ip-10-171-0-129.ec2.internal:9103 is reserved by application appattempt_1422834185427_0088_01
2015-02-04 18:18:28,645 INFO org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue (ResourceManager Event Processor): default usedResources: memory:226304, vCores:26 clusterResources: memory:230400, vCores:160 currentCapacity 0.982 required memory:8704, vCores:1 potentialNewCapacity: 1.02 ( max-capacity: 1.0)
2015-02-04 18:18:28,646 INFO org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler (ResourceManager Event Processor): Trying to fulfill reservation for application application_1422834185427_0088 on node: ip-10-171-0-122.ec2.internal:9103
2015-02-04 18:18:28,646 INFO org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue (ResourceManager Event Processor): assignContainers: node=ip-10-171-0-122.ec2.internal application=88 priority=1 request={Priority: 1, Capability: memory:8704, vCores:1, # Containers: 17, Labels: , Location: *, Relax Locality: true} type=OFF_SWITCH
2015-02-04 18:18:28,646 INFO org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt (ResourceManager Event Processor): Application application_1422834185427_0088 reserved container container_1422834185427_0088_01_23 on node host: ip-10-171-0-122.ec2.internal:9103 #containers=2 available=5632 used=17408, currently has 6 at priority 1; currentReservation 52224
2015-02-04 18:18:28,646 INFO org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue (ResourceManager Event Processor): Reserved container  application=application_1422834185427_0088 resource=memory:8704, vCores:1 queue=default: capacity=1.0, absoluteCapacity=1.0, usedResources=memory:226304, vCores:26 usedCapacity=0.982, absoluteUsedCapacity=0.982, numApps=1, numContainers=26 usedCapacity=0.982 absoluteUsedCapacity=0.982 used=memory:226304, vCores:26 cluster=memory:230400, vCores:160
2015-02-04 18:18:28,646 INFO org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler (ResourceManager Event Processor): Skipping scheduling since node ip-10-171-0-122.ec2.internal:9103 is reserved by application appattempt_1422834185427_0088_01

  From: Sandy Ryza sandy.r...@cloudera.com
 To: Imran Rashid iras...@cloudera.com 
Cc: Michael Albert m_albert...@yahoo.com; user@spark.apache.org 
user@spark.apache.org 
 Sent: Wednesday, February 4, 2015 12:54 PM
 Subject: Re: advice on diagnosing Spark stall for 1.5hr out of 3.5hr job?
   
Also, do you see any lines in the YARN NodeManager logs where it says that it's 
killing a container?
-Sandy 


On Wed, Feb 4, 2015 at 8:56 AM, Imran Rashid iras...@cloudera.com wrote:

Hi Michael,
judging from the logs, it seems that those tasks are just working a really long 
time.  If you have long running tasks, then you wouldn't expect the driver to 
output anything while those tasks are working.
What is unusual is that there is no activity during all that time the tasks are 
executing.  Are you sure you are looking at the activity of the executors (the 
nodes that are actually running the tasks), and not the activity of the driver 
node (the node where your main program lives, but that doesn't do any of the 
distributed computation)?  It would be perfectly normal for the driver node to 
be idle while all the executors were busy with long running tasks.
I would look at:
(a) the cpu usage etc. of the executor nodes during those long running tasks
(b) the thread dumps of the executors during those long running tasks (available via
the UI under the "Executors" tab, or just log into the boxes and run jstack).  Ideally
this will point out a hotspot in your code that is making these tasks take so long.
(Or perhaps it'll point out what is going on in spark internals that is so slow)
(c

Re: 2GB limit for partitions?

2015-02-03 Thread Michael Albert
Thank you!
This is very helpful.
-Mike

  From: Aaron Davidson ilike...@gmail.com
 To: Imran Rashid iras...@cloudera.com 
Cc: Michael Albert m_albert...@yahoo.com; Sean Owen so...@cloudera.com; 
user@spark.apache.org user@spark.apache.org 
 Sent: Tuesday, February 3, 2015 6:13 PM
 Subject: Re: 2GB limit for partitions?
   
To be clear, there is no distinction between partitions and blocks for RDD 
caching (each RDD partition corresponds to 1 cache block). The distinction is 
important for shuffling, where by definition N partitions are shuffled into M 
partitions, creating N*M intermediate blocks. Each of these blocks must also be 
smaller than 2GB, but due to their number, this is an atypical scenario.
If you do
sc.parallelize(1 to 1e6.toInt, 1).map{ i => new Array[Byte](5e3.toInt) }.repartition(1000).count()
you should not see this error, as the 5GB initial partition was split into 1000 
partitions of 5MB each, during a shuffle.
On the other hand,
sc.parallelize(1 to 1e6.toInt, 1).map{ i => new Array[Byte](5e3.toInt) }.repartition(1).count()

may have the same error as Imran showed for caching, and for the same reason.


On Tue, Feb 3, 2015 at 3:00 PM, Imran Rashid iras...@cloudera.com wrote:

Michael,
you are right, there is definitely some limit at 2GB.  Here is a trivial 
example to demonstrate it:
import org.apache.spark.storage.StorageLevel
val d = sc.parallelize(1 to 1e6.toInt, 1).map{ i => new Array[Byte](5e3.toInt) }.persist(StorageLevel.DISK_ONLY)
d.count()
It gives the same error you are observing.  I was under the same impression as 
Sean about the limits only being on blocks, not partitions -- but clearly that 
isn't the case here.
I don't know the whole story yet, but I just wanted to at least let you know
you aren't crazy :).  At the very least this suggests that you might need to make
smaller partitions for now.
Imran

On Tue, Feb 3, 2015 at 4:58 AM, Michael Albert m_albert...@yahoo.com.invalid 
wrote:

Greetings!
Thanks for the response.
Below is an example of the exception I saw. I'd rather not post code at the
moment, so I realize it is completely unreasonable to ask for a
diagnosis. However, I will say that adding a partitionBy() was the last change
before this error was created.

Thanks for your time and any thoughts you might have.
Sincerely, Mike


Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 5.0 failed 4 times, most recent failure: Lost task 4.3 in stage 5.0 (TID 6012, ip-10-171-0-31.ec2.internal): java.lang.RuntimeException: java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
    at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828)
    at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:123)
    at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132)
    at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:517)
    at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:307)
    at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
    at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
    at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)

  From: Sean Owen so...@cloudera.com
 To: Michael Albert m_albert...@yahoo.com 
Cc: user@spark.apache.org user@spark.apache.org 
 Sent: Monday, February 2, 2015 10:13 PM
 Subject: Re: 2GB limit for partitions?
   
The limit is on blocks, not partitions. Partitions have many blocks.

It sounds like you are creating very large values in memory, but I'm
not sure given your description. You will run into problems if a
single object is more than 2GB, of course. More of the stack trace
might show what is mapping that much memory.

If you simply want data into 1000 files it's a lot simpler. Just
repartition into 1000 partitions and save the data. If you need more
control over what goes into which partition, use a Partitioner, yes.
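
[Editor's sketch, not part of the original exchange] A minimal version of the "use a Partitioner" route, assuming a pair RDD whose Int key already says which of the ~1,000 output files a record belongs to; all names and types here are illustrative.

import org.apache.spark.Partitioner
import org.apache.spark.SparkContext._   // pair-RDD operations on older Spark versions
import org.apache.spark.rdd.RDD

// Illustrative partitioner: route each record to one of numFiles partitions by its file id.
class FileIdPartitioner(numFiles: Int) extends Partitioner {
  override def numPartitions: Int = numFiles
  override def getPartition(key: Any): Int = key match {
    case fileId: Int => ((fileId % numFiles) + numFiles) % numFiles
    case _           => 0
  }
}

// data: RDD[(Int, String)] where the Int key is the target file id (hypothetical).
def writeGrouped(data: RDD[(Int, String)], numFiles: Int, outPath: String): Unit = {
  data.partitionBy(new FileIdPartitioner(numFiles))   // one partition per output file
      .values
      .saveAsTextFile(outPath)                        // each part-NNNNN holds one group
}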



On Mon, Feb 2, 2015 at 8:40 PM, Michael Albert
m_albert...@yahoo.com.invalid wrote:
 Greetings!

 SPARK-1476 says that there is a 2G limit for blocks.
 Is this the same as a 2G limit for partitions (or approximately so?)?


 What I had been attempting to do is the following.
 1) Start with a moderately large data set (currently about 100GB, but
 growing).
 2) Create about 1,000 files (yes, files) each representing a subset of the
 data.

 The current attempt

Re: 2GB limit for partitions?

2015-02-03 Thread Michael Albert
Greetings!
Thanks for the response.
Below is an example of the exception I saw. I'd rather not post code at the
moment, so I realize it is completely unreasonable to ask for a
diagnosis. However, I will say that adding a partitionBy() was the last change
before this error was created.

Thanks for your time and any thoughts you might have.
Sincerely, Mike


Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 5.0 failed 4 times, most recent failure: Lost task 4.3 in stage 5.0 (TID 6012, ip-10-171-0-31.ec2.internal): java.lang.RuntimeException: java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
    at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828)
    at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:123)
    at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132)
    at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:517)
    at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:307)
    at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
    at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
    at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)

  From: Sean Owen so...@cloudera.com
 To: Michael Albert m_albert...@yahoo.com 
Cc: user@spark.apache.org user@spark.apache.org 
 Sent: Monday, February 2, 2015 10:13 PM
 Subject: Re: 2GB limit for partitions?
   
The limit is on blocks, not partitions. Partitions have many blocks.

It sounds like you are creating very large values in memory, but I'm
not sure given your description. You will run into problems if a
single object is more than 2GB, of course. More of the stack trace
might show what is mapping that much memory.

If you simply want data into 1000 files it's a lot simpler. Just
repartition into 1000 partitions and save the data. If you need more
control over what goes into which partition, use a Partitioner, yes.



On Mon, Feb 2, 2015 at 8:40 PM, Michael Albert
m_albert...@yahoo.com.invalid wrote:
 Greetings!

 SPARK-1476 says that there is a 2G limit for blocks.
 Is this the same as a 2G limit for partitions (or approximately so?)?


 What I had been attempting to do is the following.
 1) Start with a moderately large data set (currently about 100GB, but
 growing).
 2) Create about 1,000 files (yes, files) each representing a subset of the
 data.

 The current attempt I am working on is something like this.
 1) Do a map whose output key indicates which of the 1,000 files it will go
 into and whose value is what I will want to stick into the file.
 2) Partition the data and use the body of mapPartition to open a file and
 save the data.

 My apologies, this is actually embedded in a bigger mess, so I won't post
 it.

 However, I get errors telling me that there is an "IllegalArgumentException:
 Size exceeds Integer.MAX_VALUE", with sun.nio.ch.FileChannelImpl.map at the
 top of the stack.  This leads me to think that I have hit the limit on
 partition and/or block size.

 Perhaps this is not a good way to do it?

 I suppose I could run 1,000 passes over the data, each time collecting the
 output for one of my 1,000 final files, but that seems likely to be
 painfully slow to run.

 Am I missing something?

 Admittedly, this is an odd use case

 Thanks!

 Sincerely,
  Mike Albert




  

advice on diagnosing Spark stall for 1.5hr out of 3.5hr job?

2015-02-03 Thread Michael Albert
Greetings!
First, my sincere thanks to all who have given me advice. Following previous
discussion, I've rearranged my code to try to keep the partitions to more
manageable sizes. Thanks to all who commented.
At the moment, the input set I'm trying to work with is about 90GB (avro 
parquet format).
When I run on a reasonable chunk of the data (say half), things work reasonably.
On the full data, the spark process stalls. That is, for about 1.5 hours out of
a 3.5 hour run, I see no activity. No cpu usage, no error message, no network
activity. It just seems to sit there. The messages bracketing the stall are
shown below.
Any advice on how to diagnose this? I don't get any error messages.  The spark
UI says that it is running a stage, but it makes no discernible
progress. Ganglia shows no CPU usage or network activity. When I shell into the
worker nodes there are no filled disks or other obvious problems.
The only weird thing seen, other than the stall, is that the yarn logs on the
workers have lines with messages like this:

2015-02-03 22:59:58,890 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl (Container Monitor): Memory usage of ProcessTree 13158 for container-id container_1422834185427_0083_01_21: 7.1 GB of 8.5 GB physical memory used; 7.6 GB of 42.5 GB virtual memory used
It's rather strange that it mentions 42.5 GB of virtual memory.  The machines 
are EMR machines with 32 GB of physical memory and, as far as I can determine, 
no swap space.
The messages bracketing the stall are shown below.

Any advice is welcome.
Thanks!
Sincerely, Mike Albert
Before the stall:

15/02/03 21:45:28 INFO cluster.YarnClientClusterScheduler: Removed TaskSet 5.0, whose tasks have all completed, from pool
15/02/03 21:45:28 INFO scheduler.DAGScheduler: Stage 5 (mapPartitionsWithIndex at Transposer.scala:147) finished in 4880.317 s
15/02/03 21:45:28 INFO scheduler.DAGScheduler: looking for newly runnable stages
15/02/03 21:45:28 INFO scheduler.DAGScheduler: running: Set(Stage 3)
15/02/03 21:45:28 INFO scheduler.DAGScheduler: waiting: Set(Stage 6, Stage 7, Stage 8)
15/02/03 21:45:28 INFO scheduler.DAGScheduler: failed: Set()
15/02/03 21:45:28 INFO scheduler.DAGScheduler: Missing parents for Stage 6: List(Stage 3)
15/02/03 21:45:28 INFO scheduler.DAGScheduler: Missing parents for Stage 7: List(Stage 6)
15/02/03 21:45:28 INFO scheduler.DAGScheduler: Missing parents for Stage 8: List(Stage 7)

At this point, I see no activity for 1.5 hours except for this (XXX for I.P. address):

15/02/03 22:13:24 INFO util.AkkaUtils: Connecting to ExecutorActor: akka.tcp://sparkExecutor@ip-XXX.ec2.internal:36301/user/ExecutorActor

Then finally it started again:

15/02/03 23:31:34 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 3.0 (TID 7301) in 7208259 ms on ip-10-171-0-124.ec2.internal (3/4)
15/02/03 23:31:34 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 3.0 (TID 7300) in 7208503 ms on ip-10-171-0-128.ec2.internal (4/4)
15/02/03 23:31:34 INFO scheduler.DAGScheduler: Stage 3 (mapPartitions at Transposer.scala:211) finished in 7209.534 s




2GB limit for partitions?

2015-02-02 Thread Michael Albert
Greetings!
SPARK-1476 says that there is a 2G limit for blocks. Is this the same as a 2G
limit for partitions (or approximately so)?

What I had been attempting to do is the following:
1) Start with a moderately large data set (currently about 100GB, but growing).
2) Create about 1,000 files (yes, files) each representing a subset of the data.

The current attempt I am working on is something like this:
1) Do a map whose output key indicates which of the 1,000 files it will go into and
whose value is what I will want to stick into the file.
2) Partition the data and use the body of mapPartition to open a file and save the
data.

My apologies, this is actually embedded in a bigger mess, so I won't post it.
However, I get errors telling me that there is an "IllegalArgumentException:
Size exceeds Integer.MAX_VALUE", with sun.nio.ch.FileChannelImpl.map at the top
of the stack.  This leads me to think that I have hit the limit on partition
and/or block size.
Perhaps this is not a good way to do it?
I suppose I could run 1,000 passes over the data, each time collecting the 
output for one of my 1,000 final files, but that seems likely to be painfully 
slow to run.
Am I missing something?
Admittedly, this is an odd use case
Thanks!
Sincerely, Mike Albert

How does unmanaged memory work with the executor memory limits?

2015-01-12 Thread Michael Albert
Greetings!
My executors apparently are being terminated because they are "running beyond
physical memory limits", according to the yarn-hadoop-nodemanager logs on the
worker nodes (/mnt/var/log/hadoop on AWS EMR).  I'm setting the driver-memory
to 8G. However, looking at stdout in userlogs, I can see GC going on, but the
lines look like "6G -> 5G(7.2G), 0.45secs", so the GC seems to think that the
process is using about 6G of space, not 8G of space.  However, ps aux shows an
RSS hovering just below 8G.
The process does a mapPartitionsWithIndex, and the process uses compression,
which (I believe) calls into the native zlib library (the overall
purpose is to convert each partition into a matlab file).
Could it be that the Yarn container is counting both the memory used by the JVM
proper and the memory used by zlib, but that the GC only sees the internal
memory?  So the GC keeps the memory usage reasonable, e.g., 6G in an 8G
container, but then zlib grabs some memory, and the YARN container then
terminates the task?
If so, is there anything I can do to tell YARN to watch for a larger
memory limit than I tell the JVM to use for its memory?
Thanks!
Sincerely, Mike
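
[Editor's sketch, not from the original message] On YARN the container size is roughly the executor heap plus spark.yarn.executor.memoryOverhead (in MB), which is intended to cover exactly this kind of off-heap usage (native zlib, NIO buffers, etc.). A hedged example with placeholder values; the same settings can also be passed to spark-submit via --conf.

// Give YARN more headroom than the JVM heap alone (values are illustrative).
val conf = new org.apache.spark.SparkConf()
  .set("spark.executor.memory", "8g")                  // heap the JVM is told to use
  .set("spark.yarn.executor.memoryOverhead", "2048")   // extra MB YARN adds to the container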
 

Re: a vague question, but perhaps it might ring a bell

2015-01-05 Thread Michael Albert
Greeting!
Thank you very much for taking the time to respond.
My apologies, but at the moment I don't have an example that I feel comfortable
posting.  Frankly, I've been struggling with variants of this for the last two
weeks and probably won't be able to work on this particular issue for a few
days.
However, I am intrigued by your comment.  You mention "when I close the fs
object inside map/mapPartition etc."  Where else can one close the object?  If I
don't close it, the output file is generally truncated.
Again, the code seems to work for a few hundred files, then I get these weird
errors. Is this something subtle related to the shipping of the closure that I'm
not aware of?
Can you give a general idea of how you handled this? Is it necessary to create a
custom OutputFormat class? I was looking at the OutputFormat code and it looks
like it also creates an fs object and starts writing, but perhaps there is
some subtle difference in the context?
Thank you.
Sincerely, Mike
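
[Editor's sketch, not from the thread] A minimal version of the pattern under discussion that closes only the output stream. FileSystem.get() normally returns a JVM-wide cached instance, so calling fs.close() inside one task can affect other tasks running in the same executor; the output-path naming below is illustrative.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.rdd.RDD

// Write each partition to its own HDFS file, closing only the stream.
def writePartitions(rdd: RDD[String], outDir: String): Unit = {
  rdd.mapPartitionsWithIndex { (partId, iter) =>
    val conf = new Configuration()
    val fs = FileSystem.get(conf)                       // shared, cached instance
    val strm = fs.create(new Path(s"$outDir/part-$partId"))
    try {
      iter.foreach(line => strm.write((line + "\n").getBytes("UTF-8")))
    } finally {
      strm.close()   // close the stream, but do not close the shared fs
    }
    Iterator(partId) // return something small so the job can be triggered
  }.count()
}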

  From: Akhil Das ak...@sigmoidanalytics.com
 To: Michael Albert m_albert...@yahoo.com 
Cc: user@spark.apache.org user@spark.apache.org 
 Sent: Monday, January 5, 2015 1:21 AM
 Subject: Re: a vague question, but perhaps it might ring a bell
   
What are you trying to do? Can you paste the whole code? I used to see this
sort of Exception when I close the fs object inside map/mapPartition etc.
Thanks
Best Regards


On Mon, Jan 5, 2015 at 6:43 AM, Michael Albert m_albert...@yahoo.com.invalid 
wrote:

Greetings!
So, I think I have data saved so that each partition (part-r-0, etc.) is
exactly what I want to translate into an output file of a format not related to
hadoop.
I believe I've figured out how to tell Spark to read the data set without
re-partitioning (in another post I mentioned this -- I have a non-splitable
InputFormat).
I do something like:

    mapPartitionsWithIndex( (partId, iter) => {
        conf = new Configuration()
        fs = FileSystem.get(conf)
        strm = fs.create(new Path(...))
        //  write data to stream
        strm.close() // in finally block
    })
This runs for a few hundred input files (so each executor sees 10's of
files), and it chugs along nicely, then suddenly everything shuts down. I can
restart (telling it to skip the partIds which it has already completed), and it
chugs along again for a while (going past the previous stopping point) and
again dies.
I am at a loss.  This works for the first 10's of files (so it runs for about
1hr) then quits, and I see no useful error information (no Exceptions except the
stuff below). I'm not shutting it down.
Any idea what I might check? I've bumped up the memory multiple times (16G
currently) and fiddled with increasing other parameters.
Thanks.

Exception in thread "main" org.apache.spark.SparkException: Job cancelled because SparkContext was shut down
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:694)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:693)
    at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
    at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:693)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.postStop(DAGScheduler.scala:1399)
    at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:201)
    at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:163)
    at akka.actor.ActorCell.terminate(ActorCell.scala:338)
    at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:431)
    at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447)
    at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
           



  

Reading one partition at a time

2015-01-04 Thread Michael Albert
Greetings!

I would like to know if the code below will read one-partition-at-a-time,
and whether I am reinventing the wheel.

If I may explain, upstream code has managed (I hope) to save an RDD such 
that each partition file (e.g, part-r-0, part-r-1) contains exactly the 
data subset which I would like to 
repackage in a file of a non-hadoop format.  So what I want to do is 
something like mapPartitionsWithIndex on this data (which is stored in 
sequence files, SNAPPY compressed).  However, if I simply open the data
set with sequenceFile(), the data is re-partitioned and I lose the
partitioning
I want. My intention is that in the closure passed to mapPartitionsWithIndex,

I'll open an HDFS file and write the data from the partition in my desired 
format, 
one file for each input partition.
The code below seems to work, I think.  Have I missed something bad?
Thanks!

-Mike

class NonSplittingSequenceFileInputFormat[K,V]
    //extends org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat[K,V] // XXX
    extends org.apache.hadoop.mapred.SequenceFileInputFormat[K,V]
{
    override def isSplitable(
        //context: org.apache.hadoop.mapreduce.JobContext,
        //path: org.apache.hadoop.fs.Path) = false
        fs: org.apache.hadoop.fs.FileSystem,
        filename: org.apache.hadoop.fs.Path) = false
}


sc.hadoopFile(outPathPhase1,
    classOf[NonSplittingSequenceFileInputFormat[K, V]],
    classOf[K],
    classOf[V],
    1)
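
[Editor's sketch, not from the original message] A schematic check that each input file really became exactly one partition, keeping the same outPathPhase1 placeholder and assuming Text keys and values; with isSplitable == false, the number of partitions should equal the number of part-r-* files.

import org.apache.hadoop.io.Text

val rdd = sc.hadoopFile(outPathPhase1,
    classOf[NonSplittingSequenceFileInputFormat[Text, Text]],
    classOf[Text],
    classOf[Text],
    1)
// Count records per partition index to confirm the one-file-per-partition mapping.
rdd.mapPartitionsWithIndex { (partId, iter) =>
    Iterator((partId, iter.size))
}.collect()
 .foreach { case (partId, n) => println(s"partition $partId: $n records") }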




a vague question, but perhaps it might ring a bell

2015-01-04 Thread Michael Albert
Greetings!
So, I think I have data saved so that each partition (part-r-0, etc.) is
exactly what I want to translate into an output file of a format not related to
hadoop.
I believe I've figured out how to tell Spark to read the data set without
re-partitioning (in another post I mentioned this -- I have a non-splitable
InputFormat).
I do something like:

    mapPartitionsWithIndex( (partId, iter) => {
        conf = new Configuration()
        fs = FileSystem.get(conf)
        strm = fs.create(new Path(...))
        //  write data to stream
        strm.close() // in finally block
    })
This runs for a few hundred input files (so each executor sees 10's of
files), and it chugs along nicely, then suddenly everything shuts down. I can
restart (telling it to skip the partIds which it has already completed), and it
chugs along again for a while (going past the previous stopping point) and
again dies.
I am at a loss.  This works for the first 10's of files (so it runs for about
1hr) then quits, and I see no useful error information (no Exceptions except the
stuff below). I'm not shutting it down.
Any idea what I might check? I've bumped up the memory multiple times (16G
currently) and fiddled with increasing other parameters.
Thanks.

Exception in thread "main" org.apache.spark.SparkException: Job cancelled because SparkContext was shut down
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:694)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:693)
    at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
    at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:693)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.postStop(DAGScheduler.scala:1399)
    at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:201)
    at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:163)
    at akka.actor.ActorCell.terminate(ActorCell.scala:338)
    at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:431)
    at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447)
    at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
           

Re: unable to do group by with 1st column

2014-12-28 Thread Michael Albert
Greetings!
Thanks for the comment.
I have tried several variants of this, as indicated.
The code works on small sets, but fails on larger sets. However, I don't get
memory errors. I see java.nio.channels.CancelledKeyException and things about
lost tasks, and then things like "Resubmitting stage 1", and off it goes.
I've already upped the memory (I think the last experiment had
--executor-memory 6G and --driver-memory 6G).
I'm experimenting with recoding this with map-reduce and so far seem to be
having more success (with HADOOP_OPTS=-Xmx6g -Xmx5g).
Again, each grouping should have no more than 6E7 values, and the data is
(DataKey(Int,Int), Option[Float]), so that shouldn't need 5g?
Anyway, thanks for the info.
Best wishes,
Mike


  From: Sean Owen so...@cloudera.com
 To: Michael Albert m_albert...@yahoo.com 
Cc: user@spark.apache.org 
 Sent: Friday, December 26, 2014 3:23 PM
 Subject: Re: unable to do group by with 1st column
   
Here is a sketch of what you need to do off the top of my head and based on a
guess of what your RDD is like:

val in: RDD[(K, Seq[(C, V)])] = ...
in.flatMap { case (key, colVals) =>
  colVals.map { case (col, value) =>
    (col, (key, value))
  }
}.groupByKey

So the problem with both input and output here is that all values
for each key exist in memory at once. When transposed, each element contains
50M key-value pairs. You probably should try to do what you're trying to do a
slightly different way. Depends on what you mean by resubmitting, but I imagine
you need a cache() on an RDD you are reusing.

On Dec 26, 2014 4:18 PM, Michael Albert m_albert...@yahoo.com.invalid wrote:

Greetings!
I'm trying to do something similar, and having a very bad time of it.
What I start with is
key1: (col1: val-1-1, col2: val-1-2, col3: val-1-3, col4: val-1-4, ...)
key2: (col1: val-2-1, col2: val-2-2, col3: val-2-3, col4: val-2-4, ...)
What I want (what I have been asked to produce :-)) is:
col1: (key1: val-1-1, key2: val-2-1, key3: val-3-1, ...)
col2: (key1: val-1-2, key2: val-2-2, key3: val-3-2, ...)
So basically the transpose.  The input is actually avro/parquet with each key
in one record. In the output, the final step is to convert each column into a
matlab file. Please don't ask me whether this is a good idea.
I can get this to work for smallish data sets (e.g., a few hundred keys and a
few hundred columns). However, if I crank up the number of keys to about 5e7,
then this fails, even if I turn the number of columns that are actually used
down to 10.
The system seems to spend lots of time resubmitting parts of the first phase in 
which the data is read from the original records and shuffled and never quite 
finishes.
I can't post the code, but I can give folks an idea of what I've tried.
Try #1: Mapper emits data as (DataKey(col-as-int, key-as-int),
value-as-Option[Any]), then create a ShuffledRDD using the col-as-int for
partitioning and then setKeyOrdering on the key-as-int.  This is then fed to
mapPartitionsWithIndex.
Try #2: Emit (col-as-int, (key-as-int, value)) and groupBy, and have a final
map() on each col.
Try #3: Emit (col-as-int, Collection[(key-as-int, value)]), then have a
reduceByKey which takes the union of the collections (union for set, ++ for
list), then have a final map() which attempts the final conversion.
No matter what I do, it works for small numbers of keys (hundreds), but
when I crank it up, it seems to sit there resubmitting the shuffle phase.
Happy holidays, all! -Mike

  From: Amit Behera amit.bd...@gmail.com
 To: u...@spark.incubator.apache.org 
 Sent: Thursday, December 25, 2014 3:22 PM
 Subject: unable to do group by with 1st column
   
Hi Users,
I am reading a csv file and my data format is like:
key1,value1
key1,value2
key1,value1
key1,value3
key2,value1
key2,value5
key2,value5
key2,value4
key1,value4
key1,value4
key3,value1
key3,value1
key3,value2

required output:
key1:[value1,value2,value1,value3,value4,value4]
key2:[value1,value5,value5,value4]
key3:[value1,value1,value2]

How can I do it? Please help me to do it.
Thanks
Amit
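
[Editor's sketch, not from the thread] A minimal way to produce that output with groupByKey; the file path is illustrative.

import org.apache.spark.SparkContext._   // pair-RDD operations on older Spark versions

// Read "key,value" lines and collect all values per key.
val grouped = sc.textFile("data.csv")
  .map { line =>
    val Array(k, v) = line.split(",", 2)
    (k, v)
  }
  .groupByKey()                          // key1 -> [value1, value2, ...]

grouped.collect().foreach { case (k, vs) => println(s"$k:[${vs.mkString(",")}]") }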


   


  

Re: unable to do group by with 1st column

2014-12-26 Thread Michael Albert
Greetings!
I'm trying to do something similar, and having a very bad time of it.
What I start with is
key1: (col1: val-1-1, col2: val-1-2, col3: val-1-3, col4: val-1-4, ...)
key2: (col1: val-2-1, col2: val-2-2, col3: val-2-3, col4: val-2-4, ...)
What I want (what I have been asked to produce :-)) is:
col1: (key1: val-1-1, key2: val-2-1, key3: val-3-1, ...)
col2: (key1: val-1-2, key2: val-2-2, key3: val-3-2, ...)
So basically the transpose.  The input is actually avro/parquet with each key
in one record. In the output, the final step is to convert each column into a
matlab file. Please don't ask me whether this is a good idea.
I can get this to work for smallish data sets (e.g., a few hundred keys and a
few hundred columns). However, if I crank up the number of keys to about 5e7,
then this fails, even if I turn the number of columns that are actually used
down to 10.
The system seems to spend lots of time resubmitting parts of the first phase in 
which the data is read from the original records and shuffled and never quite 
finishes.
I can't post the code, but I can give folks an idea of what I've tried.
Try #1: Mapper emits data as (DataKey(col-as-int, key-as-int),
value-as-Option[Any]), then create a ShuffledRDD using the col-as-int for
partitioning and then setKeyOrdering on the key-as-int.  This is then fed to
mapPartitionsWithIndex.
Try #2: Emit (col-as-int, (key-as-int, value)) and groupBy, and have a final
map() on each col.
Try #3: Emit (col-as-int, Collection[(key-as-int, value)]), then have a
reduceByKey which takes the union of the collections (union for set, ++ for
list), then have a final map() which attempts the final conversion.
No matter what I do, it works for small numbers of keys (hundreds), but
when I crank it up, it seems to sit there resubmitting the shuffle phase.
Happy holidays, all! -Mike

  From: Amit Behera amit.bd...@gmail.com
 To: u...@spark.incubator.apache.org 
 Sent: Thursday, December 25, 2014 3:22 PM
 Subject: unable to do group by with 1st column
   
Hi Users,
I am reading a csv file and my data format is like:
key1,value1
key1,value2
key1,value1
key1,value3
key2,value1
key2,value5
key2,value5
key2,value4
key1,value4
key1,value4
key3,value1
key3,value1
key3,value2

required output:
key1:[value1,value2,value1,value3,value4,value4]
key2:[value1,value5,value5,value4]
key3:[value1,value1,value2]

How can I do it? Please help me do this.
Thanks,
Amit
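For the question quoted above, groupByKey gives the required output directly. A minimal sketch, assuming a hypothetical input path and one key,value pair per line:
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object GroupByFirstColumn {
    def main(args: Array[String]) {
        val sc = new SparkContext(new SparkConf().setAppName("GroupByFirstColumn"))

        // Hypothetical input path; each line is expected to be "key,value".
        val pairs = sc.textFile("/user/x/input.csv")
            .map(_.split(","))
            .filter(_.length >= 2)
            .map(a => (a(0), a(1)))

        // groupByKey gathers all values for each key, keeping duplicates.
        pairs.groupByKey().collect().foreach { case (k, vs) =>
            println(k + ":[" + vs.mkString(",") + "]")
        }

        sc.stop()
    }
}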


  

Re: avro + parquet + vector&lt;string&gt; + NullPointerException while reading

2014-11-06 Thread Michael Albert
Thanks for the advice!
What seems to work for me is to define the array type as:
"type": { "type": "array", "items": "string", "java-class": "java.util.ArrayList" }
It seems to be creating an avro.Generic.List, which Spark doesn't know how to 
serialize, instead of a guava.util.List, which Spark likes.
Hive at 0.13.1 still can't read it, though...
Thanks!
-Mike
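Spelled out against the schema from the original message below, the tweak looks roughly like this; the surrounding Scala object and the Schema.Parser call are illustrative only, the "java-class" attribute is the actual change:
import org.apache.avro.Schema

object ArraySchemaWorkaround {
    // Same record as in the original report, with the java-class hint added
    // so the array is materialized as a java.util.ArrayList.
    val schemaJson = """{
    "name": "TestStruct",
    "namespace": "bughunt",
    "type": "record",
    "fields": [
        {
            "name": "string_array",
            "type": { "type": "array", "items": "string",
                      "java-class": "java.util.ArrayList" }
        }
    ]
}"""

    // Schema.Parser is part of the standard Avro Java library.
    val schema: Schema = new Schema.Parser().parse(schemaJson)
}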

  From: Michael Armbrust mich...@databricks.com
 To: Michael Albert m_albert...@yahoo.com 
Cc: user@spark.apache.org user@spark.apache.org 
 Sent: Tuesday, November 4, 2014 2:37 PM
 Subject: Re: avro + parquet + vector&lt;string&gt; + NullPointerException while 
reading
   
You might consider using the native parquet support built into Spark SQL 
instead of using the raw library: 
http://spark.apache.org/docs/latest/sql-programming-guide.html#parquet-files
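A minimal sketch of that suggestion against Spark 1.1, with a hypothetical path standing in for the real data location:
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

object NativeParquetRead {
    def main(args: Array[String]) {
        val sc = new SparkContext(new SparkConf().setAppName("NativeParquetRead"))
        val sqlContext = new SQLContext(sc)

        // parquetFile reads the Parquet data directly, without the
        // parquet-avro read/write support classes.
        val rows = sqlContext.parquetFile("/user/x/testdata")  // hypothetical path
        rows.printSchema()
        println(rows.count())

        sc.stop()
    }
}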



On Mon, Nov 3, 2014 at 7:33 PM, Michael Albert m_albert...@yahoo.com.invalid 
wrote:

Greetings!
I'm trying to use avro and parquet with the following schema:
{
    "name": "TestStruct",
    "namespace": "bughunt",
    "type": "record",
    "fields": [
        {
            "name": "string_array",
            "type": { "type": "array", "items": "string" }
        }
    ]
}
The writing process seems to be OK, but when I try to read it with Spark, I get:
com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
Serialization trace:
string_array (bughunt.TestStruct)
 at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626)
 at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
 at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
When I try to read it with Hive, I get this:
Failed with exception java.io.IOException:org.apache.hadoop.hive.ql.metadata.HiveException: java.lang.ClassCastException: org.apache.hadoop.io.BytesWritable cannot be cast to org.apache.hadoop.io.ArrayWritable
Which would lead me to suspect that this might be related to this one: https://github.com/Parquet/parquet-mr/issues/281 , but that one seems to be Hive specific, and I am not seeing Spark read the data it claims to have written itself.
I'm running on an Amazon EMR cluster using the version 2.4.0 hadoop code and spark 1.1.0. Has anyone else observed this sort of behavior?
For completeness, here is the code that writes the data:
package bughunt

import org.apache.hadoop.mapreduce.Job

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

import parquet.avro.AvroWriteSupport
import parquet.avro.AvroParquetOutputFormat
import parquet.hadoop.ParquetOutputFormat

import java.util.ArrayList

object GenData {
    val outputPath = "/user/x/testdata"
    val words = List(
                    List("apple", "banana", "cherry"),
                    List("car", "boat", "plane"),
                    List("lion", "tiger", "bear"),
                    List("north", "south", "east", "west"),
                    List("up", "down", "left", "right"),
                    List("red", "green", "blue"))

    def main(args: Array[String]) {
        val conf = new SparkConf(true)
                    .setAppName("IngestLoanApplicattion")
                    //.set("spark.kryo.registrator",
                    //            classOf[CommonRegistrator].getName)
                    .set("spark.serializer",
                            "org.apache.spark.serializer.KryoSerializer")
                    .set("spark.kryoserializer.buffer.mb", 4.toString)
                    .set("spark.kryo.referenceTracking", "false")

        val sc = new SparkContext(conf)

        val rdd = sc.parallelize(words)

        val job = new Job(sc.hadoopConfiguration)

        ParquetOutputFormat.setWriteSupportClass(job, classOf[AvroWriteSupport])
        AvroParquetOutputFormat.setSchema(job,
                    TestStruct.SCHEMA$)

        rdd.map(p => {
                    val xs = new java.util.ArrayList[String]
                    for (z <- p) { xs.add(z) }
                    val bldr = TestStruct.newBuilder()
                    bldr.setStringArray(xs)
                    (null, bldr.build()) })
           .saveAsNewAPIHadoopFile(outputPath,
                classOf[Void],
                classOf[TestStruct],
                classOf[ParquetOutputFormat[TestStruct]],
                job.getConfiguration)
    }
}
To read the data, I use this sort of code from the spark-shell:
:paste
import bughunt.TestStruct

import org.apache.hadoop.mapreduce.Job
import org.apache.spark.SparkContext

import parquet.hadoop.ParquetInputFormat
import parquet.avro.AvroReadSupport

def openRddSpecific(sc: SparkContext) = {
    val job = new Job(sc.hadoopConfiguration)

    ParquetInputFormat.setReadSupportClass(job,
            classOf[AvroReadSupport[TestStruct]])

    sc.newAPIHadoopFile("/user/malbert/testdata",
            classOf[ParquetInputFormat[TestStruct]],
            classOf[Void],
            classOf[TestStruct],
            job.getConfiguration)
}
I start the Spark shell as follows:
spark-shell \
    --jars ../my-jar-containing-the-class

avro + parquet + vector&lt;string&gt; + NullPointerException while reading

2014-11-03 Thread Michael Albert

Greetings!




I'm trying to use avro and parquet with the following schema:

{
    "name": "TestStruct",
    "namespace": "bughunt",
    "type": "record",
    "fields": [
        {
            "name": "string_array",
            "type": { "type": "array", "items": "string" }
        }
    ]
}
The writing process seems to be OK, but when I try to read it with Spark, I get:
com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
Serialization trace:
string_array (bughunt.TestStruct)
 at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626)
 at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
 at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
When I try to read it with Hive, I get this:
Failed with exception 
java.io.IOException:org.apache.hadoop.hive.ql.metadata.HiveException: 
java.lang.ClassCastException: org.apache.hadoop.io.BytesWritable cannot be cast 
to org.apache.hadoop.io.ArrayWritable
Which would lead me to suspect that this might be related to this one: 
https://github.com/Parquet/parquet-mr/issues/281 , but that one seems to be 
Hive specific, and I am not seeing Spark read the data it claims to have 
written itself.
I'm running on an Amazon EMR cluster using the version 2.4.0 hadoop code and 
spark 1.1.0. Has anyone else observed this sort of behavior?
For completeness, here is the code that writes the data:
package bughunt

import org.apache.hadoop.mapreduce.Job

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

import parquet.avro.AvroWriteSupport
import parquet.avro.AvroParquetOutputFormat
import parquet.hadoop.ParquetOutputFormat

import java.util.ArrayList

object GenData {
    val outputPath = "/user/x/testdata"
    val words = List(
                    List("apple", "banana", "cherry"),
                    List("car", "boat", "plane"),
                    List("lion", "tiger", "bear"),
                    List("north", "south", "east", "west"),
                    List("up", "down", "left", "right"),
                    List("red", "green", "blue"))

    def main(args: Array[String]) {
        val conf = new SparkConf(true)
                    .setAppName("IngestLoanApplicattion")
                    //.set("spark.kryo.registrator",
                    //            classOf[CommonRegistrator].getName)
                    .set("spark.serializer",
                            "org.apache.spark.serializer.KryoSerializer")
                    .set("spark.kryoserializer.buffer.mb", 4.toString)
                    .set("spark.kryo.referenceTracking", "false")

        val sc = new SparkContext(conf)

        val rdd = sc.parallelize(words)

        val job = new Job(sc.hadoopConfiguration)

        ParquetOutputFormat.setWriteSupportClass(job, classOf[AvroWriteSupport])
        AvroParquetOutputFormat.setSchema(job,
                    TestStruct.SCHEMA$)

        rdd.map(p => {
                    val xs = new java.util.ArrayList[String]
                    for (z <- p) { xs.add(z) }
                    val bldr = TestStruct.newBuilder()
                    bldr.setStringArray(xs)
                    (null, bldr.build()) })
           .saveAsNewAPIHadoopFile(outputPath,
                classOf[Void],
                classOf[TestStruct],
                classOf[ParquetOutputFormat[TestStruct]],
                job.getConfiguration)
    }
}

To read the data, I use this sort of code from the spark-shell:
:paste




import bughunt.TestStruct

import org.apache.hadoop.mapreduce.Job
import org.apache.spark.SparkContext

import parquet.hadoop.ParquetInputFormat
import parquet.avro.AvroReadSupport

def openRddSpecific(sc: SparkContext) = {
    val job = new Job(sc.hadoopConfiguration)

    ParquetInputFormat.setReadSupportClass(job,
            classOf[AvroReadSupport[TestStruct]])

    sc.newAPIHadoopFile("/user/malbert/testdata",
            classOf[ParquetInputFormat[TestStruct]],
            classOf[Void],
            classOf[TestStruct],
            job.getConfiguration)
}
I start the Spark shell as follows:
spark-shell \
    --jars ../my-jar-containing-the-class-definitions.jar \
    --conf mapreduce.user.classpath.first=true \
    --conf spark.kryo.referenceTracking=false \
    --conf spark.kryoserializer.buffer.mb=4 \
    --conf spark.serializer=org.apache.spark.serializer.KryoSerializer

I'm stumped.  I can read and write records and maps, but arrays/vectors elude 
me. Am I missing something obvious?
Thanks!
Sincerely, Mike Albert

BUG: when running as extends App, closures don't capture variables

2014-10-29 Thread Michael Albert
Greetings!
This might be a documentation issue as opposed to a coding issue, in that 
perhaps the correct answer is "don't do that", but as this is not obvious, I am 
writing.
The following code produces output most would not expect:
package misc

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object DemoBug extends App {
    val conf = new SparkConf()
    val sc = new SparkContext(conf)

    val rdd = sc.parallelize(List("A", "B", "C", "D"))
    val str1 = "A"

    val rslt1 = rdd.filter(x => { x != "A" }).count
    val rslt2 = rdd.filter(x => { str1 != null && x != "A" }).count

    println("DemoBug: rslt1 = " + rslt1 + " rslt2 = " + rslt2)
}
This produces the output:
DemoBug: rslt1 = 3 rslt2 = 0
Compiled with sbt:
libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.1.0"
Run on an EC2 EMR instance with a recent image (hadoop 2.4.0, spark 1.1.0).
If instead there is a proper main(), it works as expected.
Thank you.
Sincerely, Mike
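For illustration, a sketch of the proper-main() variant described above; the object name is mine, and the expectation that both counts come out as 3 follows from the description rather than from running this exact code:
package misc

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object DemoFixed {
    def main(args: Array[String]) {
        val conf = new SparkConf()
        val sc = new SparkContext(conf)

        // str1 is now a local of main(), so the closure captures its value
        // instead of a field of a delayed-initialization App singleton.
        val rdd = sc.parallelize(List("A", "B", "C", "D"))
        val str1 = "A"

        val rslt1 = rdd.filter(x => { x != "A" }).count
        val rslt2 = rdd.filter(x => { str1 != null && x != "A" }).count

        println("DemoFixed: rslt1 = " + rslt1 + " rslt2 = " + rslt2)
    }
}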