are functions deserialized once per task?
Greetings!

Is it true that functions, such as those passed to RDD.map(), are deserialized once per task? This seems to be the case looking at Executor.scala, but I don't really understand the code. I'm hoping the answer is yes, because that makes it easier to write code without worrying about thread safety.

For example, suppose I have something like this:

    class FooToBarTransformer {
      def transform(foo: Foo): Bar = ...
    }

Now I want to do something like this:

    val rddFoo: RDD[Foo] = ...
    val transformer = new FooToBarTransformer
    val rddBar = rddFoo.map(foo => transformer.transform(foo))

If the "transformer" object is deserialized once per task, then I do not need to worry whether "transform()" is thread safe. If, for example, the implementation tried to "optimize" matters by caching the deserialization, so that one object was shared by all threads in a single JVM, then presumably one would need to worry about the thread safety of transform().

Is my understanding correct? Is this likely to continue to be true in future releases? Answers of "yes" would be much appreciated :-).

Thanks!
-Mike
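One way to avoid depending on how closures are deserialized might be to construct the transformer inside the function itself, for example the function passed to RDD.mapPartitions, so that every invocation works on its own instance. The sketch below models this with plain iterators standing in for partitions; Foo, Bar, the body of transform and the object name PerPartitionTransformer are hypothetical stand-ins, not code from the question.

```scala
// Hypothetical stand-ins for the Foo and Bar types in the question.
final case class Foo(value: Int)
final case class Bar(text: String)

// Deliberately NOT thread safe: it reuses a mutable buffer between calls.
class FooToBarTransformer {
  private val buffer = new StringBuilder
  def transform(foo: Foo): Bar = {
    buffer.clear()
    buffer.append("bar-").append(foo.value)
    Bar(buffer.toString)
  }
}

object PerPartitionTransformer {
  // Shaped like a function one could pass to RDD.mapPartitions: the
  // transformer is created inside the function, so each call owns its instance.
  def transformPartition(foos: Iterator[Foo]): Iterator[Bar] = {
    val transformer = new FooToBarTransformer
    foos.map(transformer.transform)
  }

  def main(args: Array[String]): Unit = {
    // Two plain iterators stand in for two partitions of an RDD[Foo].
    val partitions = Seq(Iterator(Foo(1), Foo(2)), Iterator(Foo(3)))
    val bars = partitions.flatMap(p => transformPartition(p).toList)
    println(bars)
  }
}
```

With this shape the thread safety of transform() no longer depends on whether the closure is deserialized once per task, at the cost of constructing one transformer per partition.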
Re: How to avoid executor time out on yarn spark while dealing with large shuffle skewed data?
This is something of a wild guess, but I find that when executors start disappearing for no obvious reason, it is usually because the YARN node managers have decided that the containers are using too much memory and have terminated the executors. Unfortunately, to see evidence of this, one needs to carefully review the YARN node-manager logs on the workers -- it doesn't seem to show up in the UI.

What I generally do is some combination of:
1) increasing executor memory (often also decreasing the number of executors)
2) decreasing the number of cores per executor
3) increasing the executor memory overhead.

Good luck.
-Mike

From: Sandy Ryza sandy.r...@cloudera.com
To: Umesh Kacha umesh.ka...@gmail.com
Cc: user@spark.apache.org user@spark.apache.org
Sent: Thursday, August 20, 2015 5:21 PM
Subject: Re: How to avoid executor time out on yarn spark while dealing with large shuffle skewed data?

GC wouldn't necessarily result in errors - it could just be slowing down your job and causing the executor JVMs to stall. If you click on a stage in the UI, you should end up on a page with all the metrics concerning the tasks that ran in that stage. GC Time is one of these task metrics.

-Sandy

On Thu, Aug 20, 2015 at 8:54 AM, Umesh Kacha umesh.ka...@gmail.com wrote:

Hi, where do I see GC time in the UI? I have set spark.yarn.executor.memoryOverhead to 3500, which seems to be good enough, I believe. So you mean only GC could be the reason behind the timeout? I checked the YARN logs and did not see any GC error there. Please guide. Thanks much.

On Thu, Aug 20, 2015 at 8:14 PM, Sandy Ryza sandy.r...@cloudera.com wrote:

Moving this back onto user@

Regarding GC, can you look in the web UI and see whether the GC time metric dominates the amount of time spent on each task (or at least the tasks that aren't completing)?

Also, have you tried bumping your spark.yarn.executor.memoryOverhead? YARN may be killing your executors for using too much off-heap space. You can see whether this is happening by looking in the Spark AM or YARN NodeManager logs.

-Sandy

On Thu, Aug 20, 2015 at 7:39 AM, Umesh Kacha umesh.ka...@gmail.com wrote:

Hi, thanks much for the response. Yes, I tried the default setting of 0.2 too; it was also going into timeout. If it is spending time in GC, then why is it not throwing a GC error? I don't see any such error. The YARN logs are not helpful at all. What is Tungsten, and how do I use it? Spark is doing great, I believe: my job runs successfully and 60% of the tasks complete; only after the first executor gets lost do things start going wrong.

On Aug 20, 2015 7:59 PM, Sandy Ryza sandy.r...@cloudera.com wrote:

What sounds most likely is that you're hitting heavy garbage collection. Did you hit issues when the shuffle memory fraction was at its default of 0.2? A potential danger with setting the shuffle storage to 0.7 is that it allows shuffle objects to get into the GC old generation, which triggers more stop-the-world garbage collections.

Have you tried enabling Tungsten / unsafe?

Unfortunately, Spark is still not that great at dealing with heavily-skewed shuffle data, because its reduce-side aggregation still operates on Java objects instead of binary data.

-Sandy

On Thu, Aug 20, 2015 at 7:21 AM, Umesh Kacha umesh.ka...@gmail.com wrote:

Hi Sandy, thanks much for the response. I am using Spark 1.4.1 and I have set spark.shuffle.storage as 0.7, as my Spark job involves 4 groupby queries executed using hiveContext.sql. My data set is skewed, so there will be more shuffling, I believe. I don't know what's wrong: the Spark job runs fine for almost an hour, and when the shuffle read / shuffle write column in the UI starts to show more than 10 GB, an executor gets lost because of a timeout, and slowly other executors start getting lost. Please guide.

On Aug 20, 2015 7:38 PM, Sandy Ryza sandy.r...@cloudera.com wrote:

What version of Spark are you using? Have you set any shuffle configs?

On Wed, Aug 19, 2015 at 11:46 AM, unk1102 umesh.ka...@gmail.com wrote:

I have one Spark job which seems to run fine, but after one hour or so executors start getting lost because of a timeout, with something like the following error:

    cluster.yarnScheduler : Removing an executor 14 65 timeout exceeds 60 seconds

Because of the above error, a couple of chained errors start to appear, like FetchFailedException, Rpc client disassociated, Connection reset by peer, IOException, etc.

Please see the following UI page. I have noticed that when shuffle read/write starts to increase beyond 10 GB, executors start getting lost because of the timeout. How do I clear this stacked memory of 10 GB in the shuffle read/write section? I don't cache anything, so why is Spark not clearing that memory? Please guide.

IMG_20150819_231418358.jpg
http://apache-spark-user-list.1001560.n3.nabble.com/file/n24345/IMG_20150819_231418358.jpg
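To make the memory-overhead advice in this thread a little more concrete, here is a rough sketch of the arithmetic behind a YARN container request. It assumes, and this is an assumption to check against the documentation for your version, that the default overhead in Spark 1.4 is the larger of 384 MB and 10% of the executor heap. The object name, the function names and the 20 GB heap are hypothetical; only the 3500 MB figure comes from the thread.

```scala
object YarnContainerSizing {
  // ASSUMED defaults (verify for your Spark version): overhead is the larger
  // of 384 MB and 10% of the executor heap when no explicit value is set.
  val MinOverheadMb = 384
  val OverheadFraction = 0.10

  def defaultOverheadMb(executorMemoryMb: Int): Int =
    math.max(MinOverheadMb, (executorMemoryMb * OverheadFraction).toInt)

  // Memory requested from YARN per executor container: heap plus overhead.
  // YARN may round this up further to its own allocation increment.
  def containerRequestMb(executorMemoryMb: Int, overheadMb: Option[Int] = None): Int =
    executorMemoryMb + overheadMb.getOrElse(defaultOverheadMb(executorMemoryMb))

  def main(args: Array[String]): Unit = {
    val heapMb = 20 * 1024 // hypothetical 20 GB executor heap
    println(containerRequestMb(heapMb))             // with the assumed default overhead
    println(containerRequestMb(heapMb, Some(3500))) // with the 3500 MB overhead mentioned above
  }
}
```

If the process uses more than the requested container size (heap plus off-heap usage), the node manager may kill it, which is why raising the overhead or lowering the cores per executor can help.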
Re: Wired Problem: Task not serializable[Spark Streaming]
Note that in Scala, return is a non-local return:
https://tpolecat.github.io/2014/05/09/return.html

So that return is *NOT* returning from the anonymous function, but attempting to return from the enclosing method, i.e., main, which is running on the driver, not on the workers. So on the workers, there is nowhere to which the return can jump. Hence it is not serializable.

Good luck.
-Mike

From: bit1...@163.com bit1...@163.com
To: user user@spark.apache.org
Sent: Monday, June 8, 2015 10:01 PM
Subject: Re: Wired Problem: Task not serializable[Spark Streaming]

Could someone help explain what happens that leads to the Task not serializable issue? Thanks.

bit1...@163.com

From: bit1129@163.com
Date: 2015-06-08 19:08
To: user
Subject: Wired Problem: Task not serializable[Spark Streaming]

Hi,
With the following simple code, I got an exception that complains "Task not serializable". The root cause is that I use return in the map foreach loop. Why does return in the map foreach loop cause the Task not serializable problem? Can someone please explain this to me?

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming._
    import scala.collection.mutable

    object NetCatStreamingWordCount3 {
      def main(args: Array[String]) {
        val conf = new SparkConf().setAppName("NetCatWordCount")
        conf.setMaster("local[3]")
        val ssc = new StreamingContext(conf, Seconds(5))
        val lines = ssc.socketTextStream("localhost", ...) // port number missing in the archived message
        lines.foreachRDD(rdd => {
          rdd.foreachPartition(partitionIterable => {
            val map = mutable.Map[String, String]()
            while (partitionIterable.hasNext) {
              val v = partitionIterable.next()
              map += v -> v
            }
            map.foreach(entry => {
              if (entry._1.equals("abc")) {
                return; // This is the root cause of the Task not serializable error.
              }
            })
          })
        })
        ssc.start()
        ssc.awaitTermination()
      }
    }

bit1...@163.com
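To illustrate the non-local return outside of Spark, here is a small sketch in plain Scala (written against Scala 2 semantics, which is an assumption; newer compilers may warn about this use of return). The object and method names are hypothetical. The first method uses return inside a lambda, which exits the enclosing method; the second expresses the same early exit with exists, so the lambda contains no return at all and is the shape that could safely be shipped to executors.

```scala
object NonLocalReturnDemo {
  // `return` inside the lambda exits containsWithReturn itself, not just the
  // lambda: the compiler implements this by throwing a control exception
  // that the enclosing method catches.
  def containsWithReturn(entries: Map[String, String], key: String): Boolean = {
    entries.foreach { entry =>
      if (entry._1 == key) return true
    }
    false
  }

  // Same result with no `return`: `exists` stops at the first match.
  def containsWithoutReturn(entries: Map[String, String], key: String): Boolean =
    entries.exists { case (k, _) => k == key }

  def main(args: Array[String]): Unit = {
    val entries = Map("abc" -> "abc", "def" -> "def")
    println(containsWithReturn(entries, "abc"))
    println(containsWithoutReturn(entries, "abc"))
  }
}
```

In the streaming job above, the same idea would mean replacing the return inside map.foreach with a construct such as exists, find, or a plain if guard around the work to be done.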
Re: variant record by case classes in shell fails?
My apologies for following my own post, but a friend just pointed out that if I use kryo with reference counting AND copy-and-paste, this runs. However, if I try to load the file, this fails as described below. I thought load was supposed to be equivalent?

Thanks!
-Mike

From: Michael Albert m_albert...@yahoo.com.INVALID
To: User user@spark.apache.org
Sent: Friday, April 3, 2015 2:45 PM
Subject: variant record by case classes in shell fails?

Greetings!

For me, the code below fails from the shell. However, I can do essentially the same from compiled code, exporting the jar.

If I use default serialization or kryo with reference tracking, the error message tells me it can't find the constructor for A. If I use kryo with reference tracking, I get a stack overflow.

I'm using Spark 1.2.1 on AWS EMR (hadoop 2.4). I've also tried putting this code inside an object.

Is this just me? Am I overlooking something obvious?

Thanks!
-Mike

    :paste
    sealed class AorB
    case class A(i: Int) extends AorB
    case class B(i: Int, j: Int) extends AorB
    sc.parallelize(0.until(1)).map{ _ =>
      val x = A(1)
      x
    }.collect()
Re: How to check that a dataset is sorted after it has been written out?
Thanks for the information! (to all who responded)

The code below *seems* to work. Any hidden gotchas that anyone sees?
And still, in terasort, how did they check that the data was actually sorted? :-)

-Mike

    class MyInputFormat[T] extends parquet.hadoop.ParquetInputFormat[T] {
      override def getSplits(jobContext: org.apache.hadoop.mapreduce.JobContext)
          : java.util.List[org.apache.hadoop.mapreduce.InputSplit] = {
        val splits = super.getSplits(jobContext)
        import scala.collection.JavaConversions._
        // Order the splits by file name, then by offset within the file.
        splits.sortBy { split =>
          split match {
            case fileSplit: org.apache.hadoop.mapreduce.lib.input.FileSplit =>
              (fileSplit.getPath.getName, fileSplit.getStart)
            case _ => ("", -1L)
          }
        }
      }
    }

From: Sean Owen so...@cloudera.com
To: Michael Albert m_albert...@yahoo.com
Cc: User user@spark.apache.org
Sent: Monday, March 23, 2015 7:31 AM
Subject: Re: How to check that a dataset is sorted after it has been written out?

Data is not (necessarily) sorted when read from disk, no. A file might have many blocks even, and while a block yields a partition in general, the order in which those partitions appear in the RDD is not defined. This is why you'd sort if you need the data sorted.

I think you could conceivably make some custom RDD or InputFormat that reads blocks in a well-defined order and, assuming the data is sorted in some knowable way on disk, then must have them sorted. I think that's even been brought up.

Deciding whether the data is sorted is quite different. You'd have to decide what ordering you expect (is part 0 before part 1? should it be sorted in a part file?) and then just verify that externally.

On Fri, Mar 20, 2015 at 10:41 PM, Michael Albert m_albert...@yahoo.com.invalid wrote:

Greetings!

I sorted a dataset in Spark and then wrote it out in avro/parquet.

Then I wanted to check that it was sorted.

It looks like each partition has been sorted, but when reading in, the first partition (i.e., as seen in the partition index of mapPartitionsWithIndex) is not the same as implied by the names of the parquet files (even when the number of partitions is the same in the rdd which was read as on disk).

If I take() a few hundred values, they are sorted, but they are *not* the same as if I explicitly open part-r-0.parquet and take values from that.

It seems that when opening the rdd, the partitions of the rdd are not in the same order as implied by the data on disk (i.e., part-r-0.parquet, part-r-1.parquet, etc).

So, how might one read the data so that one maintains the sort order?

And while on the subject, after the terasort, how did they check that the data was actually sorted correctly? (or did they :-) ? ).

Is there any way to read the data back in so as to preserve the sort, or do I need to zipWithIndex before writing it out, and write the index at that time? (I haven't tried the latter yet).

Thanks!
-Mike
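As a sketch of what "verify that externally" could look like, the following plain-Scala example summarizes each partition (first element, last element, whether it is internally sorted) and then checks that consecutive partitions do not overlap. Sequences stand in for RDD partitions here; in Spark the per-partition summary could presumably be produced with mapPartitionsWithIndex and collected to the driver. The names SortCheck, PartitionSummary, summarize and globallySorted are hypothetical.

```scala
object SortCheck {
  // Summary of one non-empty partition.
  final case class PartitionSummary[T](first: T, last: T, sorted: Boolean)

  // Single pass over a partition; returns None for an empty partition.
  def summarize[T](values: Iterator[T])(implicit ord: Ordering[T]): Option[PartitionSummary[T]] =
    if (!values.hasNext) None
    else {
      val first = values.next()
      var last = first
      var sorted = true
      while (values.hasNext) {
        val v = values.next()
        if (ord.gt(last, v)) sorted = false
        last = v
      }
      Some(PartitionSummary(first, last, sorted))
    }

  // Globally sorted if every partition is sorted and each partition's last
  // element is not greater than the next non-empty partition's first element.
  def globallySorted[T](partitions: Seq[Seq[T]])(implicit ord: Ordering[T]): Boolean = {
    val summaries = partitions.flatMap(p => summarize(p.iterator))
    summaries.forall(_.sorted) &&
      summaries.zip(summaries.drop(1)).forall { case (a, b) => ord.lteq(a.last, b.first) }
  }

  def main(args: Array[String]): Unit = {
    println(globallySorted(Seq(Seq(1, 2), Seq(3, 4)))) // partitions in order
    println(globallySorted(Seq(Seq(3, 4), Seq(1, 2)))) // each sorted, but partitions out of order
  }
}
```

The second call models the symptom described in this thread: every partition is sorted on its own, yet the partitions arrive in a different order than the file names imply.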
How to check that a dataset is sorted after it has been written out?
Greetings!

I sorted a dataset in Spark and then wrote it out in avro/parquet.

Then I wanted to check that it was sorted.

It looks like each partition has been sorted, but when reading in, the first partition (i.e., as seen in the partition index of mapPartitionsWithIndex) is not the same as implied by the names of the parquet files (even when the number of partitions is the same in the rdd which was read as on disk).

If I take() a few hundred values, they are sorted, but they are *not* the same as if I explicitly open part-r-0.parquet and take values from that.

It seems that when opening the rdd, the partitions of the rdd are not in the same order as implied by the data on disk (i.e., part-r-0.parquet, part-r-1.parquet, etc).

So, how might one read the data so that one maintains the sort order?

And while on the subject, after the terasort, how did they check that the data was actually sorted correctly? (or did they :-) ? ).

Is there any way to read the data back in so as to preserve the sort, or do I need to zipWithIndex before writing it out, and write the index at that time? (I haven't tried the latter yet).

Thanks!
-Mike
Re: How to debug a Hung task
For what it's worth, I was seeing mysterious hangs, but they went away when upgrading from Spark 1.2 to 1.2.1. I don't know if this is your problem. Also, I'm using AWS EMR images, which were also upgraded.

Anyway, that's my experience.

-Mike

From: Manas Kar manasdebashis...@gmail.com
To: user@spark.apache.org user@spark.apache.org
Sent: Friday, February 27, 2015 3:50 PM
Subject: How to debug a Hung task

Hi,
I have a Spark application that hangs on just one task (the remaining 200-300 tasks complete in reasonable time). I can see in the thread dump which function gets stuck; however, I don't have a clue as to what value is causing that behaviour. Also, logging the inputs before the function is executed does not help, as the actual message gets buried in the logs.

How does one go about debugging such a case? Also, is there a way I can wrap my function inside some sort of timer-based environment, so that if it took too long I would throw a stack trace of some sort?

Thanks
Manas
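Regarding the question about wrapping a function in some sort of timer, here is one possible sketch using scala.concurrent: the wrapped function runs in a Future, and if it does not finish within the limit, an exception that names the offending input is thrown, so the value shows up in the error rather than being buried in the logs. The names TimedCall, withTimeout and slowOnSeven are hypothetical, and note the caveat in the comment: the slow computation is not cancelled, it merely stops being waited for.

```scala
import scala.concurrent.{Await, Future, TimeoutException}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object TimedCall {
  // Wraps f so that a call exceeding `limit` fails with an exception naming
  // the input. Caveat: the slow computation keeps running on its background
  // thread; this only stops the caller from waiting on it.
  def withTimeout[A, B](limit: FiniteDuration)(f: A => B): A => B = { (input: A) =>
    val pending = Future(f(input))
    try Await.result(pending, limit)
    catch {
      case _: TimeoutException =>
        throw new RuntimeException(s"call exceeded $limit for input: $input")
    }
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical function that stalls on one particular value.
    val slowOnSeven: Int => Int = n => { if (n == 7) Thread.sleep(2000); n * 2 }
    val guarded = withTimeout(200.millis)(slowOnSeven)
    println(guarded(3))
    try guarded(7)
    catch { case e: RuntimeException => println(e.getMessage) }
  }
}
```

Inside a Spark job the guarded function could be used in place of the original one in map, so that the failing task reports which record it was stuck on.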
Spark stalls or hangs: is this a clue? remote fetches seem to never return?
Greetings!

Again, thanks to all who have given suggestions. I am still trying to diagnose a problem where I have processes that run for one or several hours but intermittently stall or hang. By "stall" I mean that there is no CPU usage on the workers or the driver, nor network activity, nor do I see disk activity. It just hangs.

Using the Application Master to find which workers still had active tasks, I then went to that machine and looked in the user logs. One of the user logs' stderr files ends with "Started 50 remote fetches". Should there be a message saying that the fetch was completed? Any suggestions as to how I might diagnose why the fetch was not completed?

Thanks!
-Mike

Here is the last part of the log:

15/02/06 01:33:46 INFO storage.MemoryStore: ensureFreeSpace(5368) called with curMem=875861, maxMem=2315649024
15/02/06 01:33:46 INFO storage.MemoryStore: Block broadcast_10 stored as values in memory (estimated size 5.2 KB, free 2.2 GB)
15/02/06 01:33:46 INFO spark.MapOutputTrackerWorker: Don't have map outputs for shuffle 5, fetching them
15/02/06 01:33:46 INFO spark.MapOutputTrackerWorker: Doing the fetch; tracker actor = Actor[akka.tcp://sparkDriver@ip-10-171-0-208.ec2.internal:44124/user/MapOutputTracker#-878402310]
15/02/06 01:33:46 INFO spark.MapOutputTrackerWorker: Don't have map outputs for shuffle 5, fetching them
15/02/06 01:33:46 INFO spark.MapOutputTrackerWorker: Got the output locations
15/02/06 01:33:46 INFO storage.ShuffleBlockFetcherIterator: Getting 300 non-empty blocks out of 300 blocks
15/02/06 01:33:46 INFO storage.ShuffleBlockFetcherIterator: Getting 300 non-empty blocks out of 300 blocks
15/02/06 01:33:46 INFO storage.ShuffleBlockFetcherIterator: Started 50 remote fetches in 47 ms
15/02/06 01:33:46 INFO storage.ShuffleBlockFetcherIterator: Started 50 remote fetches in 48 ms

It's been like that for half an hour.

Thanks!
-Mike
Re: Spark stalls or hangs: is this a clue? remote fetches seem to never return?
My apologies for following up my own post, but I thought this might be of interest.

I terminated the java process corresponding to the executor which had opened the stderr file mentioned below (kill pid). Then my Spark job completed without error (it was actually almost finished).

Now I am completely confused :-).

Thanks!
-Mike

From: Michael Albert m_albert...@yahoo.com.INVALID
To: user@spark.apache.org user@spark.apache.org
Sent: Thursday, February 5, 2015 9:04 PM
Subject: Spark stalls or hangs: is this a clue? remote fetches seem to never return?

Greetings!

Again, thanks to all who have given suggestions. I am still trying to diagnose a problem where I have processes that run for one or several hours but intermittently stall or hang. By "stall" I mean that there is no CPU usage on the workers or the driver, nor network activity, nor do I see disk activity. It just hangs.

Using the Application Master to find which workers still had active tasks, I then went to that machine and looked in the user logs. One of the user logs' stderr files ends with "Started 50 remote fetches". Should there be a message saying that the fetch was completed? Any suggestions as to how I might diagnose why the fetch was not completed?

Thanks!
-Mike

Here is the last part of the log:

15/02/06 01:33:46 INFO storage.MemoryStore: ensureFreeSpace(5368) called with curMem=875861, maxMem=2315649024
15/02/06 01:33:46 INFO storage.MemoryStore: Block broadcast_10 stored as values in memory (estimated size 5.2 KB, free 2.2 GB)
15/02/06 01:33:46 INFO spark.MapOutputTrackerWorker: Don't have map outputs for shuffle 5, fetching them
15/02/06 01:33:46 INFO spark.MapOutputTrackerWorker: Doing the fetch; tracker actor = Actor[akka.tcp://sparkDriver@ip-10-171-0-208.ec2.internal:44124/user/MapOutputTracker#-878402310]
15/02/06 01:33:46 INFO spark.MapOutputTrackerWorker: Don't have map outputs for shuffle 5, fetching them
15/02/06 01:33:46 INFO spark.MapOutputTrackerWorker: Got the output locations
15/02/06 01:33:46 INFO storage.ShuffleBlockFetcherIterator: Getting 300 non-empty blocks out of 300 blocks
15/02/06 01:33:46 INFO storage.ShuffleBlockFetcherIterator: Getting 300 non-empty blocks out of 300 blocks
15/02/06 01:33:46 INFO storage.ShuffleBlockFetcherIterator: Started 50 remote fetches in 47 ms
15/02/06 01:33:46 INFO storage.ShuffleBlockFetcherIterator: Started 50 remote fetches in 48 ms

It's been like that for half an hour.

Thanks!
-Mike
Re: Spark Job running on localhost on yarn cluster
1) Parameters like --num-executors should come before the jar. That is, you want something like

    $SPARK_HOME --num-executors 3 --driver-memory 6g --executor-memory 7g \
      --master yarn-cluster --class EDDApp target/scala-2.10/eddjar \
      outputPath

That is, *your* parameters come after the jar; Spark's parameters come *before* the jar. That's how Spark knows which are which (at least that is my understanding).

2) Double check that in your code, when you create the SparkContext or the configuration object, you don't set "local" there. (I don't recall the exact order of priority if the parameters disagree with the code.)

Good luck!
-Mike

From: kundan kumar iitr.kun...@gmail.com
To: spark users user@spark.apache.org
Sent: Wednesday, February 4, 2015 7:41 AM
Subject: Spark Job running on localhost on yarn cluster

Hi,

I am trying to execute my code on a yarn cluster.

The command which I am using is

    $SPARK_HOME/bin/spark-submit --class EDDApp target/scala-2.10/edd-application_2.10-1.0.jar --master yarn-cluster --num-executors 3 --driver-memory 6g --executor-memory 7g outpuPath

But I can see that this program is running only on the localhost.

It's able to read the file from hdfs.

I have tried this in standalone mode and it works fine.

Please suggest where it is going wrong.

Regards,
Kundan
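To show why the ordering matters, here is a toy model in Scala of the rule described in point 1: leading "--flag value" pairs are treated as launcher options, the first other token is the application jar, and everything after it is handed to the application untouched. This is a deliberately simplified sketch and NOT spark-submit's real parser; the names SubmitArgs, Parsed and parse are hypothetical, and it assumes every option takes exactly one value.

```scala
import scala.annotation.tailrec

object SubmitArgs {
  final case class Parsed(options: Map[String, String],
                          resource: Option[String],
                          appArgs: List[String])

  // Simplified model: options are only recognised BEFORE the application jar.
  def parse(tokens: List[String]): Parsed = {
    @tailrec
    def loop(rest: List[String], opts: Map[String, String]): Parsed = rest match {
      case flag :: value :: tail if flag.startsWith("--") =>
        loop(tail, opts + (flag -> value))
      case resource :: tail =>
        Parsed(opts, Some(resource), tail) // everything after the jar goes to the app
      case Nil =>
        Parsed(opts, None, Nil)
    }
    loop(tokens, Map.empty)
  }

  def main(args: Array[String]): Unit = {
    // Ordering used in the question: options after the jar.
    println(parse(List("--class", "EDDApp", "app.jar", "--master", "yarn-cluster", "outputPath")))
    // Ordering suggested in the reply: options before the jar.
    println(parse(List("--master", "yarn-cluster", "--class", "EDDApp", "app.jar", "outputPath")))
  }
}
```

In the first call "--master yarn-cluster" ends up among the application's own arguments, which would be consistent with the job falling back to whatever master is configured elsewhere.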
Re: advice on diagnosing Spark stall for 1.5hr out of 3.5hr job?
=OFF_SWITCH
2015-02-04 18:18:28,636 INFO org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt (ResourceManager Event Processor): Application application_1422834185427_0088 reserved container container_1422834185427_0088_01_21 on node host: ip-10-171-0-129.ec2.internal:9103 #containers=2 available=5632 used=17408, currently has 6 at priority 1; currentReservation 52224
2015-02-04 18:18:28,636 INFO org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue (ResourceManager Event Processor): Reserved container application=application_1422834185427_0088 resource=memory:8704, vCores:1 queue=default: capacity=1.0, absoluteCapacity=1.0, usedResources=memory:226304, vCores:26usedCapacity=0.982, absoluteUsedCapacity=0.982, numApps=1, numContainers=26 usedCapacity=0.982 absoluteUsedCapacity=0.982 used=memory:226304, vCores:26 cluster=memory:230400, vCores:160
2015-02-04 18:18:28,636 INFO org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler (ResourceManager Event Processor): Skipping scheduling since node ip-10-171-0-129.ec2.internal:9103 is reserved by application appattempt_1422834185427_0088_01
2015-02-04 18:18:28,645 INFO org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue (ResourceManager Event Processor): default usedResources: memory:226304, vCores:26 clusterResources: memory:230400, vCores:160 currentCapacity 0.982 required memory:8704, vCores:1 potentialNewCapacity: 1.02 ( max-capacity: 1.0)
2015-02-04 18:18:28,646 INFO org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler (ResourceManager Event Processor): Trying to fulfill reservation for application application_1422834185427_0088 on node: ip-10-171-0-122.ec2.internal:9103
2015-02-04 18:18:28,646 INFO org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue (ResourceManager Event Processor): assignContainers: node=ip-10-171-0-122.ec2.internal application=88 priority=1 request={Priority: 1, Capability: memory:8704, vCores:1, # Containers: 17, Labels: , Location: *, Relax Locality: true} type=OFF_SWITCH
2015-02-04 18:18:28,646 INFO org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt (ResourceManager Event Processor): Application application_1422834185427_0088 reserved container container_1422834185427_0088_01_23 on node host: ip-10-171-0-122.ec2.internal:9103 #containers=2 available=5632 used=17408, currently has 6 at priority 1; currentReservation 52224
2015-02-04 18:18:28,646 INFO org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue (ResourceManager Event Processor): Reserved container application=application_1422834185427_0088 resource=memory:8704, vCores:1 queue=default: capacity=1.0, absoluteCapacity=1.0, usedResources=memory:226304, vCores:26usedCapacity=0.982, absoluteUsedCapacity=0.982, numApps=1, numContainers=26 usedCapacity=0.982 absoluteUsedCapacity=0.982 used=memory:226304, vCores:26 cluster=memory:230400, vCores:160
2015-02-04 18:18:28,646 INFO org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler (ResourceManager Event Processor): Skipping scheduling since node ip-10-171-0-122.ec2.internal:9103 is reserved by application appattempt_1422834185427_0088_01

From: Sandy Ryza sandy.r...@cloudera.com
To: Imran Rashid iras...@cloudera.com
Cc: Michael Albert m_albert...@yahoo.com; user@spark.apache.org user@spark.apache.org
Sent: Wednesday, February 4, 2015 12:54 PM
Subject: Re: advice on diagnosing Spark stall for 1.5hr out of 3.5hr job?

Also, do you see any lines in the YARN NodeManager logs where it says that it's killing a container?

-Sandy

On Wed, Feb 4, 2015 at 8:56 AM, Imran Rashid iras...@cloudera.com wrote:

Hi Michael,

judging from the logs, it seems that those tasks are just working a really long time. If you have long running tasks, then you wouldn't expect the driver to output anything while those tasks are working.

What is unusual is that there is no activity during all that time the tasks are executing. Are you sure you are looking at the activity of the executors (the nodes that are actually running the tasks), and not the activity of the driver node (the node where your main program lives, but that doesn't do any of the distributed computation)? It would be perfectly normal for the driver node to be idle while all the executors were busy with long running tasks.

I would look at:
(a) the cpu usage etc. of the executor nodes during those long running tasks
(b) the thread dumps of the executors during those long running tasks (available via the UI under the Executors tab, or just log into the boxes and run jstack). Ideally this will point out a hotspot in your code that is making these tasks take so long. (Or perhaps it'll point out what is going on in spark internals that is so slow)
(c
Re: 2GB limit for partitions?
Thank you! This is very helpful.

-Mike

From: Aaron Davidson ilike...@gmail.com
To: Imran Rashid iras...@cloudera.com
Cc: Michael Albert m_albert...@yahoo.com; Sean Owen so...@cloudera.com; user@spark.apache.org user@spark.apache.org
Sent: Tuesday, February 3, 2015 6:13 PM
Subject: Re: 2GB limit for partitions?

To be clear, there is no distinction between partitions and blocks for RDD caching (each RDD partition corresponds to 1 cache block). The distinction is important for shuffling, where by definition N partitions are shuffled into M partitions, creating N*M intermediate blocks. Each of these blocks must also be smaller than 2GB, but due to their number, this is an atypical scenario.

If you do

    sc.parallelize(1 to 1e6.toInt, 1).map{i => new Array[Byte](5e3.toInt)}.repartition(1000).count()

you should not see this error, as the 5GB initial partition was split into 1000 partitions of 5MB each, during a shuffle.

On the other hand,

    sc.parallelize(1 to 1e6.toInt, 1).map{i => new Array[Byte](5e3.toInt)}.repartition(1).count()

may have the same error as Imran showed for caching, and for the same reason.

On Tue, Feb 3, 2015 at 3:00 PM, Imran Rashid iras...@cloudera.com wrote:

Michael,

you are right, there is definitely some limit at 2GB. Here is a trivial example to demonstrate it:

    import org.apache.spark.storage.StorageLevel
    val d = sc.parallelize(1 to 1e6.toInt, 1).map{i => new Array[Byte](5e3.toInt)}.persist(StorageLevel.DISK_ONLY)
    d.count()

It gives the same error you are observing. I was under the same impression as Sean about the limits only being on blocks, not partitions -- but clearly that isn't the case here.

I don't know the whole story yet, but I just wanted to at least let you know you aren't crazy :)
At the very least this suggests that you might need to make smaller partitions for now.

Imran

On Tue, Feb 3, 2015 at 4:58 AM, Michael Albert m_albert...@yahoo.com.invalid wrote:

Greetings!

Thanks for the response.

Below is an example of the exception I saw. I'd rather not post code at the moment, so I realize it is completely unreasonable to ask for a diagnosis. However, I will say that adding a partitionBy() was the last change before this error was created.

Thanks for your time and any thoughts you might have.

Sincerely,
Mike

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 5.0 failed 4 times, most recent failure: Lost task 4.3 in stage 5.0 (TID 6012, ip-10-171-0-31.ec2.internal): java.lang.RuntimeException: java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
    at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828)
    at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:123)
    at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132)
    at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:517)
    at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:307)
    at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
    at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
    at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)

From: Sean Owen so...@cloudera.com
To: Michael Albert m_albert...@yahoo.com
Cc: user@spark.apache.org user@spark.apache.org
Sent: Monday, February 2, 2015 10:13 PM
Subject: Re: 2GB limit for partitions?

The limit is on blocks, not partitions. Partitions have many blocks.

It sounds like you are creating very large values in memory, but I'm not sure given your description. You will run into problems if a single object is more than 2GB, of course. More of the stack trace might show what is mapping that much memory.

If you simply want data into 1000 files it's a lot simpler. Just repartition into 1000 partitions and save the data. If you need more control over what goes into which partition, use a Partitioner, yes.

On Mon, Feb 2, 2015 at 8:40 PM, Michael Albert m_albert...@yahoo.com.invalid wrote:

Greetings!

SPARK-1476 says that there is a 2G limit for blocks. Is this the same as a 2G limit for partitions (or approximately so?)?

What I had been attempting to do is the following.
1) Start with a moderately large data set (currently about 100GB, but growing).
2) Create about 1,000 files (yes, files) each representing a subset of the data.

The current attempt
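The sizes in the examples in this thread can be checked with a little arithmetic: one million arrays of 5,000 bytes are about 5 GB, which exceeds Integer.MAX_VALUE bytes when held in a single partition but not when spread over 1,000 partitions. The sketch below does just that calculation; it ignores per-object and serialization overhead, and the names BlockSizeCheck, bytesPerPartition and fitsInBlock are hypothetical.

```scala
object BlockSizeCheck {
  // Largest size addressable with an Int, the limit named in the stack trace.
  val MaxBlockBytes: Long = Int.MaxValue.toLong

  // Rough payload per partition, assuming records are spread evenly and
  // ignoring object headers and serialization overhead.
  def bytesPerPartition(records: Long, bytesPerRecord: Long, partitions: Int): Long =
    records * bytesPerRecord / partitions

  def fitsInBlock(records: Long, bytesPerRecord: Long, partitions: Int): Boolean =
    bytesPerPartition(records, bytesPerRecord, partitions) <= MaxBlockBytes

  def main(args: Array[String]): Unit = {
    val records = 1000000L
    val bytesPerRecord = 5000L
    println(fitsInBlock(records, bytesPerRecord, 1))    // one partition of roughly 5 GB
    println(fitsInBlock(records, bytesPerRecord, 1000)) // 1000 partitions of roughly 5 MB
  }
}
```

With skewed keys the "spread evenly" assumption does not hold, so in practice the largest partition is what needs to stay under the limit.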
Re: 2GB limit for partitions?
Greetings! Thanks for the response. Below is an example of the exception I saw.I'd rather not post code at the moment, so I realize it is completely unreasonable to ask for a diagnosis.However, I will say that adding a partitionBy() was the last change before this error was created. Thanks for your time and any thoughts you might have. Sincerely, Mike Exception in thread main org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 5.0 failed 4 times, most recent failure: Lost task 4.3 in stage 5.0 (TID 6012, ip-10-171-0-31.ec2.internal): java.lang.RuntimeException: java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828) at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:123) at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132) at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:517) at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:307) at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57) at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108) at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57) From: Sean Owen so...@cloudera.com To: Michael Albert m_albert...@yahoo.com Cc: user@spark.apache.org user@spark.apache.org Sent: Monday, February 2, 2015 10:13 PM Subject: Re: 2GB limit for partitions? 
The limit is on blocks, not partitions. Partitions have many blocks. It sounds like you are creating very large values in memory, but I'm not sure given your description. You will run into problems if a single object is more than 2GB, of course. More of the stack trace might show what is mapping that much memory.

If you simply want data into 1000 files it's a lot simpler. Just repartition into 1000 partitions and save the data. If you need more control over what goes into which partition, use a Partitioner, yes.
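The Partitioner suggestion can be pictured as a function from the file key computed in the map step to a partition index, with each resulting block needing to stay at or below Integer.MAX_VALUE bytes (the ceiling named in the exception). The sketch below is plain Scala with no Spark dependency; `FilePartitioning`, `partitionFor`, `fitsInOneBlock`, and `averagePartitionBytes` are hypothetical names chosen for illustration, and the assumption is that a custom `org.apache.spark.Partitioner` would return the same index from its `getPartition` method.

```scala
object FilePartitioning {
  // Largest size a single block can have here: Integer.MAX_VALUE bytes (just under 2 GiB).
  val MaxBlockBytes: Long = Int.MaxValue.toLong

  // Map an integer file key to a partition index in [0, numFiles),
  // including for negative keys (hypothetical helper).
  def partitionFor(fileKey: Int, numFiles: Int): Int = {
    require(numFiles > 0, "numFiles must be positive")
    val m = fileKey % numFiles
    if (m < 0) m + numFiles else m
  }

  // True when a block of this size stays within the ceiling.
  def fitsInOneBlock(sizeBytes: Long): Boolean = sizeBytes <= MaxBlockBytes

  // Average bytes per partition if the data were spread perfectly evenly.
  def averagePartitionBytes(totalBytes: Long, numPartitions: Int): Long = {
    require(numPartitions > 0, "numPartitions must be positive")
    totalBytes / numPartitions
  }
}
```

For roughly 100GB spread evenly over 1,000 partitions the average is far below the ceiling, so hitting it would suggest heavy skew toward a few keys or a single very large value rather than the partition count itself.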
advice on diagnosing Spark stall for 1.5hr out of 3.5hr job?
Greetings!

First, my sincere thanks to all who have given me advice. Following previous discussion, I've rearranged my code to try to keep the partitions to more manageable sizes.

At the moment, the input set I'm trying to work with is about 90GB (avro parquet format). When I run on a reasonable chunk of the data (say half) things work reasonably. On the full data, the spark process stalls. That is, for about 1.5 hours out of a 3.5 hour run, I see no activity: no CPU usage, no error message, no network activity. It just seems to sit there.

Any advice on how to diagnose this? I don't get any error messages. The spark UI says that it is running a stage, but it makes no discernible progress. Ganglia shows no CPU usage or network activity. When I shell into the worker nodes there are no filled disks or other obvious problems. How can I discern what Spark is waiting for?

The only weird thing seen, other than the stall, is that the yarn logs on the workers have lines with messages like this:

2015-02-03 22:59:58,890 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl (Container Monitor): Memory usage of ProcessTree 13158 for container-id container_1422834185427_0083_01_21: 7.1 GB of 8.5 GB physical memory used; 7.6 GB of 42.5 GB virtual memory used

It's rather strange that it mentions 42.5 GB of virtual memory. The machines are EMR machines with 32 GB of physical memory and, as far as I can determine, no swap space.

The messages bracketing the stall are shown below. Any advice is welcome.

Thanks!

Sincerely,
Mike Albert

Before the stall:

15/02/03 21:45:28 INFO cluster.YarnClientClusterScheduler: Removed TaskSet 5.0, whose tasks have all completed, from pool
15/02/03 21:45:28 INFO scheduler.DAGScheduler: Stage 5 (mapPartitionsWithIndex at Transposer.scala:147) finished in 4880.317 s
15/02/03 21:45:28 INFO scheduler.DAGScheduler: looking for newly runnable stages
15/02/03 21:45:28 INFO scheduler.DAGScheduler: running: Set(Stage 3)
15/02/03 21:45:28 INFO scheduler.DAGScheduler: waiting: Set(Stage 6, Stage 7, Stage 8)
15/02/03 21:45:28 INFO scheduler.DAGScheduler: failed: Set()
15/02/03 21:45:28 INFO scheduler.DAGScheduler: Missing parents for Stage 6: List(Stage 3)
15/02/03 21:45:28 INFO scheduler.DAGScheduler: Missing parents for Stage 7: List(Stage 6)
15/02/03 21:45:28 INFO scheduler.DAGScheduler: Missing parents for Stage 8: List(Stage 7)

At this point, I see no activity for 1.5 hours except for this (XXX for I.P. address):

15/02/03 22:13:24 INFO util.AkkaUtils: Connecting to ExecutorActor: akka.tcp://sparkExecutor@ip-XXX.ec2.internal:36301/user/ExecutorActor

Then finally it started again:

15/02/03 23:31:34 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 3.0 (TID 7301) in 7208259 ms on ip-10-171-0-124.ec2.internal (3/4)
15/02/03 23:31:34 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 3.0 (TID 7300) in 7208503 ms on ip-10-171-0-128.ec2.internal (4/4)
15/02/03 23:31:34 INFO scheduler.DAGScheduler: Stage 3 (mapPartitions at Transposer.scala:211) finished in 7209.534 s
2GB limit for partitions?
Greetings!

SPARK-1476 says that there is a 2G limit for blocks. Is this the same as a 2G limit for partitions (or approximately so)?

What I had been attempting to do is the following.
1) Start with a moderately large data set (currently about 100GB, but growing).
2) Create about 1,000 files (yes, files) each representing a subset of the data.

The current attempt I am working on is something like this.
1) Do a map whose output key indicates which of the 1,000 files it will go into and whose value is what I will want to stick into the file.
2) Partition the data and use the body of mapPartition to open a file and save the data.

My apologies, this is actually embedded in a bigger mess, so I won't post it.

However, I get errors telling me that there is an IllegalArgumentException: Size exceeds Integer.MAX_VALUE, with sun.nio.ch.FileChannelImpl.map at the top of the stack. This leads me to think that I have hit the limit on partition and/or block size.

Perhaps this is not a good way to do it? I suppose I could run 1,000 passes over the data, each time collecting the output for one of my 1,000 final files, but that seems likely to be painfully slow to run.

Am I missing something? Admittedly, this is an odd use case.

Thanks!

Sincerely,
Mike Albert
How does unmanaged memory work with the executor memory limits?
Greetings!

My executors apparently are being terminated because they are running beyond physical memory limits, according to the yarn-hadoop-nodemanager logs on the worker nodes (/mnt/var/log/hadoop on AWS EMR). I'm setting the driver-memory to 8G. However, looking at stdout in userlogs, I can see GC going on, but the lines look like 6G->5G(7.2G), 0.45 secs, so the GC seems to think that the process is using about 6G of space, not 8G of space. However, ps aux shows an RSS hovering just below 8G.

The process does a mapPartitionsWithIndex, and the process uses compression which (I believe) calls into the native zlib library (the overall purpose is to convert each partition into a matlab file).

Could it be that the YARN container is counting both the memory used by the JVM proper and the memory used by zlib, but that the GC only sees the internal memory? So the GC keeps the memory usage reasonable, e.g., 6G in an 8G container, but then zlib grabs some memory, and the YARN container then terminates the task?

If so, is there anything I can do so that I tell YARN to watch for a larger memory limit than I tell the JVM to use for its memory?

Thanks!

Sincerely,
Mike
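The budget being asked about can be written down as simple arithmetic: on YARN, Spark 1.x requests a container sized as the executor heap plus an off-heap allowance (the spark.yarn.executor.memoryOverhead setting, in MB, mentioned elsewhere in this archive), and native allocations such as zlib buffers count against the container but not against the heap the GC reports. A minimal sketch in plain Scala, where `ContainerBudget` and its methods are hypothetical names and the numbers are purely illustrative:

```scala
object ContainerBudget {
  // Container size requested from YARN: JVM heap plus the off-heap allowance, both in MB.
  def containerMb(heapMb: Int, overheadMb: Int): Int = heapMb + overheadMb

  // True when the observed resident set size (RSS) still fits inside the container.
  def fits(rssMb: Int, heapMb: Int, overheadMb: Int): Boolean =
    rssMb <= containerMb(heapMb, overheadMb)

  def main(args: Array[String]): Unit = {
    // Illustrative values only: a 6 GB heap with a 2 GB allowance for native memory.
    val heapMb = 6144
    val overheadMb = 2048
    println("container MB = " + containerMb(heapMb, overheadMb))
    println("RSS of 7900 MB fits: " + fits(7900, heapMb, overheadMb))
  }
}
```

With Spark 1.x on YARN this would correspond to something like `--executor-memory 6g --conf spark.yarn.executor.memoryOverhead=2048`; the right values depend on how much native memory the job actually uses.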
Re: a vague question, but perhaps it might ring a bell
Greetings!

Thank you very much for taking the time to respond.

My apologies, but at the moment I don't have an example that I feel comfortable posting. Frankly, I've been struggling with variants of this for the last two weeks and probably won't be able to work on this particular issue for a few days.

However, I am intrigued by your comment. You mention "when I close the fs object inside map/mapPartition etc." Where else can one close the object? If I don't close it, the output file is generally truncated.

Again, the code seems to work for a few hundred files, then I get these weird errors. Is this something subtle related to the shipping of the closure that I'm not aware of?

Can you give a general idea of how you handled this? Is it necessary to create a custom OutputFormat class? I was looking at the OutputFormat code and it looks like it also creates an fs object and starts writing, but perhaps there is some subtle difference in the context?

Thank you.

Sincerely,
Mike

From: Akhil Das ak...@sigmoidanalytics.com
To: Michael Albert m_albert...@yahoo.com
Cc: user@spark.apache.org
Sent: Monday, January 5, 2015 1:21 AM
Subject: Re: a vague question, but perhaps it might ring a bell

What are you trying to do? Can you paste the whole code? I used to see this sort of Exception when I closed the fs object inside map/mapPartition etc.

Thanks
Best Regards
Reading one partition at a time
Greetings!

I would like to know if the code below will read one-partition-at-a-time, and whether I am reinventing the wheel.

If I may explain, upstream code has managed (I hope) to save an RDD such that each partition file (e.g., part-r-0, part-r-1) contains exactly the data subset which I would like to repackage in a file of a non-hadoop format. So what I want to do is something like mapPartitionsWithIndex on this data (which is stored in sequence files, SNAPPY compressed). However, if I simply open the data set with sequenceFile(), the data is re-partitioned and I lose the partitioning I want.

My intention is that in the closure passed to mapPartitionsWithIndex, I'll open an HDFS file and write the data from the partition in my desired format, one file for each input partition.

The code below seems to work, I think. Have I missed something bad?

Thanks!
-Mike

    class NonSplittingSequenceFileInputFormat[K, V]
        // extends org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat[K, V] // XXX
        extends org.apache.hadoop.mapred.SequenceFileInputFormat[K, V] {
      // Returning false asks Hadoop to hand each file to a single split.
      override def isSplitable(
          // context: org.apache.hadoop.mapreduce.JobContext,
          // path: org.apache.hadoop.fs.Path) = false
          fs: org.apache.hadoop.fs.FileSystem,
          filename: org.apache.hadoop.fs.Path) = false
    }

    sc.hadoopFile(outPathPhase1,
      classOf[NonSplittingSequenceFileInputFormat[K, V]],
      classOf[K],
      classOf[V],
      1)
a vague question, but perhaps it might ring a bell
Greetings!

So, I think I have data saved so that each partition (part-r-0, etc.) is exactly what I want to translate into an output file of a format not related to hadoop.

I believe I've figured out how to tell Spark to read the data set without re-partitioning (in another post I mentioned this -- I have a non-splittable InputFormat).

I do something like

    mapPartitionsWithIndex { (partId, iter) =>
      val conf = new Configuration()
      val fs = FileSystem.get(conf)
      val strm = fs.create(new Path(...))
      try {
        // write data to stream
      } finally {
        strm.close()
      }
    }

This runs for a few hundred input files (so each executor sees 10's of files), and it chugs along nicely, then suddenly everything shuts down. I can restart (telling it to skip the partIds which it has already completed), and it chugs along again for a while (going past the previous stopping point) and again dies.

I am at a loss. This works for the first 10's of files (so it runs for about 1hr) then quits, and I see no useful error information (no Exceptions except the stuff below). I'm not shutting it down.

Any idea what I might check? I've bumped up the memory multiple times (16G currently) and fiddled with increasing other parameters.

Thanks.

Exception in thread "main" org.apache.spark.SparkException: Job cancelled because SparkContext was shut down
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:694)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:693)
    at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
    at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:693)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.postStop(DAGScheduler.scala:1399)
    at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:201)
    at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:163)
    at akka.actor.ActorCell.terminate(ActorCell.scala:338)
    at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:431)
    at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447)
    at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
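The "close the stream in a finally block" step can be factored into a small helper so that only the stream a task opened is closed. This matters because Hadoop's FileSystem.get normally returns a cached instance shared across the JVM, so closing the fs object itself may affect other tasks, whereas closing the stream is what flushes the output. The sketch below is plain Scala using an in-memory java.io stream as a stand-in for the HDFS output stream; `StreamClosing` and `withStream` are hypothetical names.

```scala
import java.io.{ByteArrayOutputStream, OutputStream}

object StreamClosing {
  // Run `body` with the stream, then close the stream even if `body` throws.
  def withStream[S <: OutputStream, A](stream: S)(body: S => A): A =
    try body(stream) finally stream.close()

  def main(args: Array[String]): Unit = {
    // The in-memory stream stands in for the stream returned by fs.create(...).
    val bytes = withStream(new ByteArrayOutputStream()) { out =>
      out.write("partition data".getBytes("UTF-8"))
      out.toByteArray
    }
    println("wrote " + bytes.length + " bytes")
  }
}
```

Inside a per-partition closure the same helper would wrap the stream created for that partition, leaving the shared filesystem handle open.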
Re: unable to do group by with 1st column
Greetings!

Thanks for the comment. I have tried several variants of this, as indicated.

The code works on small sets, but fails on larger sets. However, I don't get memory errors. I see java.nio.channels.CancelledKeyException and things about lost tasks and then things like "Resubmitting stage 1", and off it goes.

I've already upped the memory (I think the last experiment had --executor-memory 6G and --driver-memory 6G). I'm experimenting with recoding this with map-reduce and so far seem to be having more success (with HADOOP_OPTS=-Xmx6g -Xmx5g).

Again, each grouping should have no more than 6E7 values, and the data is (DataKey(Int,Int), Option[Float]), so that shouldn't need 5g?

Anyway, thanks for the info.

Best wishes,
Mike

From: Sean Owen so...@cloudera.com
To: Michael Albert m_albert...@yahoo.com
Cc: user@spark.apache.org
Sent: Friday, December 26, 2014 3:23 PM
Subject: Re: unable to do group by with 1st column

Here is a sketch of what you need to do off the top of my head and based on a guess of what your RDD is like:

    val in: RDD[(K, Seq[(C, V)])] = ...
    in.flatMap { case (key, colVals) =>
      colVals.map { case (col, value) => (col, (key, value)) }
    }.groupByKey

So the problem with both input and output here is that all values for each key exist in memory at once. When transposed, each element contains 50M key-value pairs. You probably should try to do what you're trying to do a slightly different way.

Depends on what you mean by resubmitting, but I imagine you need a cache() on an RDD you are reusing.
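The flatMap-then-group shape of the transpose can be tried on ordinary Scala collections before running it on a cluster. The sketch below is plain Scala with no Spark dependency; `TransposeSketch` and `transpose` are hypothetical names, and a local Seq stands in for the RDD, so it shows only the data movement and says nothing about the memory behavior discussed in the thread.

```scala
object TransposeSketch {
  // Turn rows of (key, [(col, value), ...]) into columns of (col, [(key, value), ...]).
  def transpose[K, C, V](rows: Seq[(K, Seq[(C, V)])]): Map[C, Seq[(K, V)]] =
    rows
      // Re-key every cell by its column, keeping the row key next to the value.
      .flatMap { case (key, colVals) =>
        colVals.map { case (col, value) => (col, (key, value)) }
      }
      // Collect all cells that share a column.
      .groupBy(_._1)
      .map { case (col, cells) => col -> cells.map(_._2) }

  def main(args: Array[String]): Unit = {
    val rows = Seq(
      "key1" -> Seq("col1" -> 11, "col2" -> 12),
      "key2" -> Seq("col1" -> 21, "col2" -> 22))
    transpose(rows).foreach(println)
  }
}
```

As in the thread, every column ends up holding one entry per key, which is why the grouped form grows so large when the number of keys reaches the tens of millions.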
Re: unable to do group by with 1st column
Greetings!

I'm trying to do something similar, and having a very bad time of it.

What I start with is

    key1: (col1: val-1-1, col2: val-1-2, col3: val-1-3, col4: val-1-4, ...)
    key2: (col1: val-2-1, col2: val-2-2, col3: val-2-3, col4: val-2-4, ...)

What I want (what I have been asked to produce :-)) is:

    col1: (key1: val-1-1, key2: val-2-1, key3: val-3-1, ...)
    col2: (key1: val-1-2, key2: val-2-2, key3: val-3-2, ...)

So basically the transpose. The input is actually avro/parquet with each key in one record. In the output, the final step is to convert each column into a matlab file. Please don't ask me whether this is a good idea.

I can get this to work for smallish data sets (e.g., a few hundred keys and a few hundred columns). However, if I crank up the number of keys to about 5e7, then this fails, even if I turn the number of columns that are actually used down to 10.

The system seems to spend lots of time resubmitting parts of the first phase, in which the data is read from the original records and shuffled, and never quite finishes.

I can't post the code, but I can give folks an idea of what I've tried.

Try #1: Mapper emits data as (DataKey(col-as-int, key-as-int), value-as-Option[Any]), then create a ShuffledRDD using the col-as-int for partitioning and then setKeyOrdering on the key-as-int. This is then fed to mapPartitionsWithIndex.

Try #2: Emit (col-as-int, (key-as-int, value)) and groupBy, and have a final map() on each col.

Try #3: Emit (col-as-int, Collection[(key-as-int, value)]), then have a reduceByKey which takes the union of the collection (union for set, ++ for list), then have a final map() which attempts the final conversion.

No matter what I do, it works for small numbers of keys (hundreds), but when I crank it up, it seems to sit there resubmitting the shuffle phase.

Happy holidays, all!
-Mike

From: Amit Behera amit.bd...@gmail.com
To: u...@spark.incubator.apache.org
Sent: Thursday, December 25, 2014 3:22 PM
Subject: unable to do group by with 1st column

Hi Users,

I am reading a csv file and my data format is like:

    key1,value1
    key1,value2
    key1,value1
    key1,value3
    key2,value1
    key2,value5
    key2,value5
    key2,value4
    key1,value4
    key1,value4
    key3,value1
    key3,value1
    key3,value2

required output:

    key1:[value1,value2,value1,value3,value4,value4]
    key2:[value1,value5,value5,value4]
    key3:[value1,value1,value2]

How can I do it? Please help me to do.

Thanks
Amit
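The grouping asked for in the original question can be sketched on a local collection: split each line at the first comma, then collect the values per key. This is plain Scala with no Spark dependency; `GroupByFirstColumn` and `groupLines` are hypothetical names. On an RDD the analogous step would be mapping each line to a (key, value) pair and calling groupByKey(), though the ordering of values within a group is not something this local sketch can promise for a distributed run.

```scala
object GroupByFirstColumn {
  // Group "key,value" lines by key, keeping values in the order they appear.
  def groupLines(lines: Seq[String]): Map[String, Seq[String]] =
    lines
      .map(_.split(",", 2))
      // Lines without a comma do not match and are skipped.
      .collect { case Array(key, value) => (key.trim, value.trim) }
      .groupBy(_._1)
      .map { case (key, pairs) => key -> pairs.map(_._2) }

  def main(args: Array[String]): Unit = {
    val lines = Seq(
      "key1,value1", "key1,value2", "key2,value1", "key1,value3", "key2,value5")
    groupLines(lines).foreach { case (key, values) =>
      println(key + ":" + values.mkString("[", ",", "]"))
    }
  }
}
```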
Re: avro + parquet + vectorstring + NullPointerException while reading
Thanks for the advice!

What seems to work is that I define the array type as:

    "type": { "type": "array", "items": "string", "java-class": "java.util.ArrayList" }

It seems to be creating an avro.Generic.List, which spark doesn't know how to serialize, instead of a guava.util.List, which spark likes.

Hive at 0.13.1 still can't read it though...

Thanks!
-Mike

From: Michael Armbrust mich...@databricks.com
To: Michael Albert m_albert...@yahoo.com
Cc: user@spark.apache.org
Sent: Tuesday, November 4, 2014 2:37 PM
Subject: Re: avro + parquet + vectorstring + NullPointerException while reading

You might consider using the native parquet support built into Spark SQL instead of using the raw library: http://spark.apache.org/docs/latest/sql-programming-guide.html#parquet-files
avro + parquet + vectorstring + NullPointerException while reading
Greetings!

I'm trying to use avro and parquet with the following schema:

    {
      "name": "TestStruct",
      "namespace": "bughunt",
      "type": "record",
      "fields": [
        { "name": "string_array", "type": { "type": "array", "items": "string" } }
      ]
    }

The writing process seems to be OK, but when I try to read it with Spark, I get:

com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
Serialization trace:
string_array (bughunt.TestStruct)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)

When I try to read it with Hive, I get this:

Failed with exception java.io.IOException:org.apache.hadoop.hive.ql.metadata.HiveException: java.lang.ClassCastException: org.apache.hadoop.io.BytesWritable cannot be cast to org.apache.hadoop.io.ArrayWritable

Which would lead me to suspect that this might be related to this one: https://github.com/Parquet/parquet-mr/issues/281 , but that one seems to be Hive specific, and I am not seeing Spark read the data it claims to have written itself.

I'm running on an Amazon EMR cluster using the version 2.4.0 hadoop code and spark 1.1.0. Has anyone else observed this sort of behavior?

For completeness, here is the code that writes the data:

    package bughunt

    import org.apache.hadoop.mapreduce.Job

    import org.apache.spark.SparkConf
    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._

    import parquet.avro.AvroWriteSupport
    import parquet.avro.AvroParquetOutputFormat
    import parquet.hadoop.ParquetOutputFormat

    import java.util.ArrayList

    object GenData {
      val outputPath = "/user/x/testdata"
      val words = List(
        List("apple", "banana", "cherry"),
        List("car", "boat", "plane"),
        List("lion", "tiger", "bear"),
        List("north", "south", "east", "west"),
        List("up", "down", "left", "right"),
        List("red", "green", "blue"))

      def main(args: Array[String]) {
        val conf = new SparkConf(true)
          .setAppName("IngestLoanApplicattion")
          //.set("spark.kryo.registrator",
          //     classOf[CommonRegistrator].getName)
          .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
          .set("spark.kryoserializer.buffer.mb", 4.toString)
          .set("spark.kryo.referenceTracking", "false")
        val sc = new SparkContext(conf)

        val rdd = sc.parallelize(words)

        val job = new Job(sc.hadoopConfiguration)
        ParquetOutputFormat.setWriteSupportClass(job, classOf[AvroWriteSupport])
        AvroParquetOutputFormat.setSchema(job, TestStruct.SCHEMA$)

        rdd.map(p => {
            val xs = new java.util.ArrayList[String]
            for (z <- p) { xs.add(z) }
            val bldr = TestStruct.newBuilder()
            bldr.setStringArray(xs)
            (null, bldr.build())
          })
          .saveAsNewAPIHadoopFile(outputPath,
            classOf[Void],
            classOf[TestStruct],
            classOf[ParquetOutputFormat[TestStruct]],
            job.getConfiguration)
      }
    }

To read the data, I use this sort of code from the spark-shell:

    :paste

    import bughunt.TestStruct

    import org.apache.hadoop.mapreduce.Job
    import org.apache.spark.SparkContext

    import parquet.hadoop.ParquetInputFormat
    import parquet.avro.AvroReadSupport

    def openRddSpecific(sc: SparkContext) = {
      val job = new Job(sc.hadoopConfiguration)

      ParquetInputFormat.setReadSupportClass(job,
        classOf[AvroReadSupport[TestStruct]])

      sc.newAPIHadoopFile("/user/malbert/testdata",
        classOf[ParquetInputFormat[TestStruct]],
        classOf[Void],
        classOf[TestStruct],
        job.getConfiguration)
    }

I start the Spark shell as follows:

    spark-shell \
      --jars ../my-jar-containing-the-class-definitions.jar \
      --conf mapreduce.user.classpath.first=true \
      --conf spark.kryo.referenceTracking=false \
      --conf spark.kryoserializer.buffer.mb=4 \
      --conf spark.serializer=org.apache.spark.serializer.KryoSerializer

I'm stumped. I can read and write records and maps, but arrays/vectors elude me. Am I missing something obvious?

Thanks!

Sincerely,
Mike Albert
BUG: when running as extends App, closures don't capture variables
Greetings!

This might be a documentation issue as opposed to a coding issue, in that perhaps the correct answer is "don't do that", but as this is not obvious, I am writing.

The following code produces output most would not expect:

    package misc

    import org.apache.spark.SparkConf
    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._

    object DemoBug extends App {
      val conf = new SparkConf()
      val sc = new SparkContext(conf)

      val rdd = sc.parallelize(List("A", "B", "C", "D"))
      val str1 = "A"

      val rslt1 = rdd.filter(x => { x != "A" }).count
      val rslt2 = rdd.filter(x => { str1 != null && x != "A" }).count

      println("DemoBug: rslt1 = " + rslt1 + " rslt2 = " + rslt2)
    }

This produces the output:

    DemoBug: rslt1 = 3 rslt2 = 0

Compiled with sbt:

    libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.1.0"

Run on an EC2 EMR instance with a recent image (hadoop 2.4.0, spark 1.1.0).

If instead there is a proper main(), it works as expected.

Thank you.

Sincerely,
Mike
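The shape that reportedly works, with the values defined inside a method rather than as fields of an object extending App, can be sketched as follows. A commonly cited explanation is that in Scala 2 the body of an App object is deferred (via DelayedInit) until main() runs, so a field such as str1 may still be null where the closure executes on an executor, while a local val is copied into the closure when it is created. The sketch uses a local List in place of the RDD so that it runs without a cluster, which is an assumption made for illustration; `DemoFixed` and `counts` are hypothetical names.

```scala
object DemoFixed {
  // Count with and without a captured variable; the two results should agree.
  def counts(items: Seq[String]): (Int, Int) = {
    val str1 = "A" // a local val, captured by the closure below
    val rslt1 = items.count(x => x != "A")
    val rslt2 = items.count(x => str1 != null && x != "A")
    (rslt1, rslt2)
  }

  def main(args: Array[String]): Unit = {
    val (rslt1, rslt2) = counts(List("A", "B", "C", "D"))
    println("DemoFixed: rslt1 = " + rslt1 + " rslt2 = " + rslt2)
  }
}
```

In the Spark version the same structure applies: create the SparkContext and the vals inside main(), so that nothing the closure needs lives in a lazily initialized object field.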