Re: Is SPARK-3322 fixed in latest version of Spark?
ConnectionManager has been deprecated and is no longer used by default (NettyBlockTransferService is the replacement). Hopefully you will no longer see these messages unless you have explicitly flipped it back on. On Tue, Aug 4, 2015 at 6:14 PM, Jim Green openkbi...@gmail.com wrote: And also https://issues.apache.org/jira/browse/SPARK-3106 This one is still open. On Tue, Aug 4, 2015 at 6:12 PM, Jim Green openkbi...@gmail.com wrote: *Symptom:* Even the sample job fails: $ MASTER=spark://xxx:7077 run-example org.apache.spark.examples.SparkPi 10 Pi is roughly 3.140636 ERROR ConnectionManager: Corresponding SendingConnection to ConnectionManagerId(xxx,) not found WARN ConnectionManager: All connections not cleaned up Found https://issues.apache.org/jira/browse/SPARK-3322 But the code changes are not in newer versions of Spark, even though this JIRA is marked as fixed. Is this issue really fixed in the latest version? If so, what is the related JIRA? -- Thanks, www.openkb.info (Open KnowledgeBase for Hadoop/Database/OS/Network/Tool) -- Thanks, www.openkb.info (Open KnowledgeBase for Hadoop/Database/OS/Network/Tool)
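For reference, the setting the reply refers to is the real spark.shuffle.blockTransferService key; "nio" re-enables the deprecated ConnectionManager path, while "netty" is the default in Spark 1.2+. A minimal sketch:

val conf = new org.apache.spark.SparkConf()
conf.set("spark.shuffle.blockTransferService", "netty") // "nio" would flip back to ConnectionManager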
Re: S3 vs HDFS
Note that if you use multi-part upload, each part becomes 1 block, which allows for multiple concurrent readers. One would typically use fixed block sizes that align with Spark's default HDFS block size (64 MB, I think) to ensure the reads are aligned. On Sat, Jul 11, 2015 at 11:14 AM, Steve Loughran ste...@hortonworks.com wrote: seek() is very, very expensive on S3, even short forward seeks. If your code does a lot of it, that will kill performance. (Forward seeks are better in s3a, which with Hadoop 2.7 is now something safe to use, and in the s3 client that Amazon includes in EMR), but it's still sluggish. The other killers are -anything involving renaming files or directories -copy operations -listing lots of files. Finally, S3 is HDD-backed, and 1 file == 1 block. In HDFS you can have 3 processes reading different replicas of the same block of a file —giving 3x the bandwidth; disk bandwidth from an S3 object will be shared by all readers. The more readers, the worse the performance. On 9 Jul 2015, at 14:31, Daniel Darabos daniel.dara...@lynxanalytics.com wrote: I recommend testing it for yourself. Even if you have no application, you can just run the spark-ec2 script, log in, run spark-shell and try reading files from an S3 bucket and from hdfs://<master IP>:9000/. (This is the ephemeral HDFS cluster, which uses SSD.) I just tested our application this way yesterday and found the SSD-based HDFS to outperform S3 by a factor of 2. I don't know the cause. It may be locality like Akhil suggests, or SSD vs HDD (assuming S3 is HDD-backed). Or the HDFS client library and protocol are just better than the S3 versions (which are HTTP-based and use some 6-year-old libraries). On Thu, Jul 9, 2015 at 9:54 AM, Sujee Maniyam su...@sujee.net wrote: Latency is much bigger for S3 (if that matters). And with HDFS you'd get data locality that will boost your app performance. I did some light experimenting on this; see my presentation here for some benchmark numbers, etc.: http://www.slideshare.net/sujee/hadoop-to-sparkv2 from slide #34. cheers Sujee Maniyam (http://sujee.net | http://www.linkedin.com/in/sujeemaniyam ) teaching Spark http://elephantscale.com/training/spark-for-developers/?utm_source=mailinglist&utm_medium=email&utm_campaign=signature On Wed, Jul 8, 2015 at 11:35 PM, Brandon White bwwintheho...@gmail.com wrote: Are there any significant performance differences between reading text files from S3 and HDFS?
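Daniel's suggestion is easy to try from spark-shell; a minimal sketch (the bucket name and master IP are placeholders, and S3 credentials are assumed to be configured):

val fromS3 = sc.textFile("s3n://my-bucket/some-input/*")
val fromHdfs = sc.textFile("hdfs://<master IP>:9000/some-input/*")
fromS3.count()   // time this...
fromHdfs.count() // ...against this to compare raw scan throughput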
Re: All masters are unresponsive issue
Are you seeing this after the app has already been running for some time, or just at the beginning? Generally, registration should only occur once initially, and a timeout would be due to the master not being accessible. Try telnetting to the master IP/port from the machine on which the driver will run.
Re: s3 bucket access/read file
I think Hadoop 2.6 failed to abort streams that weren't fully read, which we observed as a huge performance hit. We had to backport the 2.7 improvements before being able to use it.
RE: ReduceByKey with a byte array as the key
Be careful shoving arbitrary binary data into a string; invalid UTF-8 sequences can cause significant computational overhead in my experience. On Jun 11, 2015 10:09 AM, Mark Tse mark@d2l.com wrote: Makes sense – I suspect what you suggested should work. However, I think the overhead between this and using `String` would be similar enough to warrant just using `String`. Mark *From:* Sonal Goyal [mailto:sonalgoy...@gmail.com] *Sent:* June-11-15 12:58 PM *To:* Mark Tse *Cc:* user@spark.apache.org *Subject:* Re: ReduceByKey with a byte array as the key I think if you wrap the byte[] into an object and implement equals and hashCode methods, you may be able to do this. There will be the overhead of an extra object, but conceptually it should work unless I am missing something. Best Regards, Sonal Founder, Nube Technologies http://www.nubetech.co Check out Reifier at Spark Summit 2015 https://spark-summit.org/2015/events/real-time-fuzzy-matching-with-spark-and-elastic-search/ On Thu, Jun 11, 2015 at 9:27 PM, Mark Tse mark@d2l.com wrote: I would like to work with RDD pairs of Tuple2<byte[], obj>, but byte[]s with the same contents are considered different keys because they are compared by reference. I didn't see any way to pass in a custom comparator. I could convert the byte[] into a String with an explicit charset, but I'm wondering if there's a more efficient way. Also posted on SO: http://stackoverflow.com/q/30785615/2687324 Thanks, Mark
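For reference, a minimal sketch of the wrapper Sonal describes (the class name is illustrative, not from the thread; java.util.Arrays supplies content-based equality and hashing for byte arrays):

import java.util.Arrays

class BytesKey(val bytes: Array[Byte]) extends Serializable {
  // compare by array contents, not by reference
  override def equals(other: Any): Boolean = other match {
    case that: BytesKey => Arrays.equals(this.bytes, that.bytes)
    case _ => false
  }
  override def hashCode: Int = Arrays.hashCode(bytes)
}

Usage would then be along the lines of pairs.map { case (k, v) => (new BytesKey(k), v) }.reduceByKey(...).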
Re: RDD resiliency -- does it keep state?
Note that speculation is off by default to avoid these kinds of unexpected issues. On Sat, Mar 28, 2015 at 6:21 AM, Steve Loughran ste...@hortonworks.com wrote: It's worth adding that there's no guarantee that re-evaluated work would be on the same host as before, and in the case of node failure, it is guaranteed to be elsewhere. This means things that depend on host-local information are going to generate different numbers even if there are no other side effects. Random number generation for seeding RDD.sample() would be a case in point here. There's also the fact that if you enable speculative execution, then operations may be repeated —even in the absence of any failure. If you are doing side-effect work, or don't have an output committer whose actions are guaranteed to be atomic, then you want to turn that option off. On 27 Mar 2015, at 19:46, Patrick Wendell pwend...@gmail.com wrote: If you invoke this, you will get at-least-once semantics on failure. For instance, if a machine dies in the middle of executing the foreach for a single partition, that will be re-executed on another machine. It could even fully complete on one machine, but the machine dies immediately before reporting the result back to the driver. This means you need to make sure the side-effects are idempotent, or use some transactional locking. Spark's own output operations, such as saving to Hadoop, use such mechanisms. For instance, in the case of Hadoop it uses the OutputCommitter classes. - Patrick On Fri, Mar 27, 2015 at 12:36 PM, Michal Klos michal.klo...@gmail.com wrote: Hi Spark group, We haven't been able to find clear descriptions of how Spark handles the resiliency of RDDs in relation to executing actions with side-effects. If you do an `rdd.foreach(someSideEffect)`, then you are doing a side-effect for each element in the RDD. If a partition goes down, the resiliency rebuilds the data, but does it keep track of how far it got in the partition's set of data, or will it start from the beginning again? So will it do at-least-once execution of foreach closures or at-most-once? thanks, Michal
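For side-effecting jobs, the relevant knob is the real spark.speculation setting; a minimal sketch of pinning it off explicitly (it is already the default):

val conf = new org.apache.spark.SparkConf()
conf.set("spark.speculation", "false") // keep off when foreach side effects are not idempotent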
Re: Spark will process _temporary folder on S3 is very slow and always cause failure
Actually, this is the more relevant JIRA (which is resolved): https://issues.apache.org/jira/browse/SPARK-3595 SPARK-6352 is about saveAsParquetFile, which is not in use here. Here is a DirectOutputCommitter implementation: https://gist.github.com/aarondav/c513916e72101bbe14ec and it can be configured in Spark with: sparkConf.set("spark.hadoop.mapred.output.committer.class", classOf[DirectOutputCommitter].getName) On Tue, Mar 17, 2015 at 8:05 AM, Imran Rashid iras...@cloudera.com wrote: I'm not super familiar w/ S3, but I think the issue is that you want to use a different output committer with object stores, which don't have a simple move operation. There have been a few other threads on S3 output committers. I think the most relevant for you is most probably this open JIRA: https://issues.apache.org/jira/browse/SPARK-6352 On Fri, Mar 13, 2015 at 5:51 PM, Shuai Zheng szheng.c...@gmail.com wrote: Hi All, I tried to run a sort on an r3.2xlarge instance on AWS. I just ran it as a single-node cluster for a test. The data I sort is around 4GB and sits on S3; output will also go to S3. I just connected spark-shell to the local cluster and ran the code in the script (because I just want a benchmark now). My job is as simple as: val parquetFile = sqlContext.parquetFile("s3n://...", "s3n://...", "s3n://...", "s3n://...", "s3n://...", "s3n://...", "s3n://...") parquetFile.registerTempTable("Test") val sortedResult = sqlContext.sql("SELECT * FROM Test order by time").map { row => row.mkString("\t") } sortedResult.saveAsTextFile("s3n://myplace") The job takes around 6 mins to finish the sort when I am monitoring the process. After that I noticed the process stopped at: 15/03/13 22:38:27 INFO DAGScheduler: Job 2 finished: saveAsTextFile at <console>:31, took 581.304992 s At that point, Spark has actually just written all the data to the _temporary folder first; after all sub-tasks finish, it tries to move all the ready results from the _temporary folder to the final location. This process might be quick locally (because it is just a cut/paste), but it looks very slow on my S3: it takes a few seconds to move one file (usually there will be 200 partitions). And then it raises exceptions after it has moved maybe 40-50 files. org.apache.http.NoHttpResponseException: The target server failed to respond at org.apache.http.impl.conn.DefaultResponseParser.parseHead(DefaultResponseParser.java:101) at org.apache.http.impl.io.AbstractMessageParser.parse(AbstractMessageParser.java:252) at org.apache.http.impl.AbstractHttpClientConnection.receiveResponseHeader(AbstractHttpClientConnection.java:281) at org.apache.http.impl.conn.DefaultClientConnection.receiveResponseHeader(DefaultClientConnection.java:247) at org.apache.http.impl.conn.AbstractClientConnAdapter.receiveResponseHeader(AbstractClientConnAdapter.java:219) I tried several times, but never got the full job to finish. I am not sure what is wrong here; I am doing something very basic, and I can see the job has finished with all results on S3 under the _temporary folder, but then it raises the exception and fails. Is there any special setting I should use when dealing with S3? I never saw MapReduce have a similar issue, so it should not be an S3 problem. Regards, Shuai
Re: Which OutputCommitter to use for S3?
Yes, unfortunately that direct dependency makes this injection much more difficult for saveAsParquetFile. On Thu, Mar 5, 2015 at 12:28 AM, Pei-Lun Lee pl...@appier.com wrote: Thanks for the DirectOutputCommitter example. However I found it only works for saveAsHadoopFile. What about saveAsParquetFile? It looks like SparkSQL is using ParquetOutputCommitter, which is a subclass of FileOutputCommitter. On Fri, Feb 27, 2015 at 1:52 AM, Thomas Demoor thomas.dem...@amplidata.com wrote: FYI. We're currently addressing this at the Hadoop level in https://issues.apache.org/jira/browse/HADOOP-9565 Thomas Demoor On Mon, Feb 23, 2015 at 10:16 PM, Darin McBeath ddmcbe...@yahoo.com.invalid wrote: Just to close the loop in case anyone runs into the same problem I had. By setting --hadoop-major-version=2 when using the ec2 scripts, everything worked fine. Darin. - Original Message - From: Darin McBeath ddmcbe...@yahoo.com.INVALID To: Mingyu Kim m...@palantir.com; Aaron Davidson ilike...@gmail.com Cc: user@spark.apache.org user@spark.apache.org Sent: Monday, February 23, 2015 3:16 PM Subject: Re: Which OutputCommitter to use for S3? Thanks. I think my problem might actually be the other way around. I'm compiling with Hadoop 2, but when I start up Spark using the ec2 scripts, I don't specify a --hadoop-major-version, and the default is 1. I'm guessing that if I make that a 2 it might work correctly. I'll try it and post a response. - Original Message - From: Mingyu Kim m...@palantir.com To: Darin McBeath ddmcbe...@yahoo.com; Aaron Davidson ilike...@gmail.com Cc: user@spark.apache.org user@spark.apache.org Sent: Monday, February 23, 2015 3:06 PM Subject: Re: Which OutputCommitter to use for S3? Cool, we will start from there. Thanks Aaron and Josh! Darin, it's likely because the DirectOutputCommitter is compiled with Hadoop 1 classes and you're running it with Hadoop 2. org.apache.hadoop.mapred.JobContext used to be a class in Hadoop 1, and it became an interface in Hadoop 2. Mingyu On 2/23/15, 11:52 AM, Darin McBeath ddmcbe...@yahoo.com.INVALID wrote: Aaron. Thanks for the class. Since I'm currently writing Java-based Spark applications, I tried converting your class to Java (it seemed pretty straightforward). I set up the use of the class as follows: SparkConf conf = new SparkConf() .set("spark.hadoop.mapred.output.committer.class", "com.elsevier.common.DirectOutputCommitter"); And I then try to save a file to S3 (which I believe should use the old Hadoop APIs). JavaPairRDD<Text, Text> newBaselineRDDWritable = reducedhsfPairRDD.mapToPair(new ConvertToWritableTypes()); newBaselineRDDWritable.saveAsHadoopFile(baselineOutputBucketFile, Text.class, Text.class, SequenceFileOutputFormat.class, org.apache.hadoop.io.compress.GzipCodec.class); But, I get the following error message. Exception in thread "main" java.lang.IncompatibleClassChangeError: Found class org.apache.hadoop.mapred.JobContext, but interface was expected at com.elsevier.common.DirectOutputCommitter.commitJob(DirectOutputCommitter.java:68) at org.apache.spark.SparkHadoopWriter.commitJob(SparkHadoopWriter.scala:127) at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1075) at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:940) at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:902) at org.apache.spark.api.java.JavaPairRDD.saveAsHadoopFile(JavaPairRDD.scala:771) at com.elsevier.spark.SparkSyncDedup.main(SparkSyncDedup.java:156) In my class, JobContext is an interface of type org.apache.hadoop.mapred.JobContext. Is there something obvious that I might be doing wrong (or messed up in the translation from Scala to Java) or something I should look into? I'm using Spark 1.2 with Hadoop 2.4. Thanks. Darin. From: Aaron Davidson ilike...@gmail.com To: Andrew Ash and...@andrewash.com Cc: Josh Rosen rosenvi...@gmail.com; Mingyu Kim m...@palantir.com; user@spark.apache.org user@spark.apache.org; Aaron Davidson aa...@databricks.com Sent: Saturday, February 21, 2015 7:01 PM Subject: Re: Which OutputCommitter to use for S3? Here is the class: https://gist.github.com/aarondav/c513916e72101bbe14ec You can use it by setting mapred.output.committer.class in the Hadoop configuration (or spark.hadoop.mapred.output.committer.class in the Spark configuration). Note that this only works for the old Hadoop APIs.
Re: Having lots of FetchFailedException in join
However, Executors were dying when using Netty as well, so it is possible that the OOM was occurring then too. It is also possible that only one of your Executors OOMs (due to a particularly large task) and the others display various exceptions while trying to fetch the shuffle blocks from the failed executor. I cannot explain the local FileNotFoundExceptions occurring on machines that were not throwing fatal errors, though -- typically I have only seen that happen when a fatal error (e.g., OOM) was thrown on an Executor, causing it to begin the termination process, which involves deleting its own shuffle files. It may then throw the FNF if other Executors request those files before it has completed its shutdown (and will throw a ConnectionFailed once it's completed terminating). On Thu, Mar 5, 2015 at 12:19 AM, Shao, Saisai saisai.s...@intel.com wrote: I've no idea why Netty didn't hit the OOM issue; one possibility is that Netty uses direct memory to save each block, whereas NIO uses on-heap memory, so Netty occupies less on-heap memory than NIO. *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com] *Sent:* Thursday, March 5, 2015 4:14 PM *To:* Shao, Saisai *Cc:* Cheng, Hao; user *Subject:* Re: Having lots of FetchFailedException in join Thanks. I was about to submit a ticket for this :) Also there's a ticket for sort-merge based groupByKey: https://issues.apache.org/jira/browse/SPARK-3461 BTW, any idea why running with Netty didn't output OOM error messages? It's very confusing in troubleshooting. Jianshi On Thu, Mar 5, 2015 at 4:01 PM, Shao, Saisai saisai.s...@intel.com wrote: I think there are a lot of JIRAs trying to solve this problem ( https://issues.apache.org/jira/browse/SPARK-5763). Basically sort-merge join is a good choice. Thanks Jerry *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com] *Sent:* Thursday, March 5, 2015 3:55 PM *To:* Shao, Saisai *Cc:* Cheng, Hao; user *Subject:* Re: Having lots of FetchFailedException in join There's some skew: 64 6164 0 SUCCESS PROCESS_LOCAL 200 / 2015/03/04 23:45:47 1.1 min 6 s 198.6 MB 21.1 GB 240.8 MB 59 6159 0 SUCCESS PROCESS_LOCAL 30 / 2015/03/04 23:45:47 44 s 5 s 200.7 MB 4.8 GB 154.0 MB But I expect this kind of skewness to be quite common. Jianshi On Thu, Mar 5, 2015 at 3:48 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: I see. I'm using core's join. The data might have some skewness (checking). I understand shuffle can spill data to disk, but when consuming it, say in cogroup or groupByKey, it still needs to read the whole group's elements, right? I guess the OOM happened there when reading very large groups. Jianshi On Thu, Mar 5, 2015 at 3:38 PM, Shao, Saisai saisai.s...@intel.com wrote: I think what you could do is to monitor through the web UI to see if there's any skew or other symptoms in shuffle write and read. For GC you could use the below configuration as you mentioned. From the Spark core side, all the shuffle-related operations can spill the data into disk, and there is no need to read the whole partition into memory. But if you use SparkSQL, it depends on how SparkSQL uses these operators. CC @hao if he has some thoughts on it. Thanks Jerry *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com] *Sent:* Thursday, March 5, 2015 3:28 PM *To:* Shao, Saisai *Cc:* user *Subject:* Re: Having lots of FetchFailedException in join Hi Saisai, What are your suggested settings for monitoring shuffle? I've enabled -XX:+PrintGCDetails -XX:+PrintGCTimeStamps for GC logging.
I found SPARK-3461 (Support external groupByKey using repartitionAndSortWithinPartitions), which wants to make groupByKey use external storage. It's still in open status. Does that mean groupByKey/cogroup/join (implemented as cogroup + flatMap) will still read each group as a whole when consuming it? How can I deal with the key skewness in joins? Is there a skew-join implementation? Jianshi On Thu, Mar 5, 2015 at 2:44 PM, Shao, Saisai saisai.s...@intel.com wrote: Hi Jianshi, From my understanding, it may not be a problem of NIO or Netty; looking at your stack trace, the OOM occurred in EAOM (ExternalAppendOnlyMap). Theoretically EAOM can spill the data into disk if memory is not enough, but there might be some issues when the join key is skewed or the number of keys is small, so you will meet OOM. Maybe you could monitor each stage or task's shuffle and GC status, as well as system status, to identify the problem. Thanks Jerry *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com] *Sent:* Thursday, March 5, 2015 2:32 PM *To:* Aaron Davidson *Cc:* user *Subject:* Re: Having lots of FetchFailedException in join One really interesting thing is that when I'm using the netty-based spark.shuffle.blockTransferService, there are no OOM error messages (java.lang.OutOfMemoryError: Java heap space
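There was no built-in skew join in Spark core at the time. A common manual workaround (not from this thread) is key salting: split the hot keys across partitions by appending a random salt on one side and replicating the other side across all salts. A minimal sketch with a hypothetical salt factor:

import scala.util.Random

val SALT = 16
// left/right are assumed RDD[(K, V)] pairs about to be joined
val saltedLeft = left.map { case (k, v) => ((k, Random.nextInt(SALT)), v) }
val replicatedRight = right.flatMap { case (k, v) => (0 until SALT).map(s => ((k, s), v)) }
val joined = saltedLeft.join(replicatedRight).map { case ((k, _), vs) => (k, vs) }

The trade-off is that the right side is duplicated SALT times, so this only pays off when one side is badly skewed and the other is small enough to replicate.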
Re: Having lots of FetchFailedException in join
"Failed to connect" implies that the executor at that host died; please check its logs as well. On Tue, Mar 3, 2015 at 11:03 AM, Jianshi Huang jianshi.hu...@gmail.com wrote: Sorry that I forgot the subject. And in the driver, I got many FetchFailedExceptions. The error messages are 15/03/03 10:34:32 WARN TaskSetManager: Lost task 31.0 in stage 2.2 (TID 7943, ): FetchFailed(BlockManagerId(86, , 43070), shuffleId=0, mapId=24, reduceId=1220, message= org.apache.spark.shuffle.FetchFailedException: Failed to connect to /:43070 at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67) at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83) at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83) Jianshi On Wed, Mar 4, 2015 at 2:55 AM, Jianshi Huang jianshi.hu...@gmail.com wrote: Hi, I got this error message: 15/03/03 10:22:41 ERROR OneForOneBlockFetcher: Failed while starting block fetches java.lang.RuntimeException: java.io.FileNotFoundException: /hadoop01/scratch/local/usercache/jianshuang/appcache/application_1421268539738_202330/spark-local-20150303100549-fc3b/02/shuffle_0_1458_0.index (No such file or directory) at java.io.FileInputStream.open(Native Method) at java.io.FileInputStream.<init>(FileInputStream.java:146) at org.apache.spark.shuffle.IndexShuffleBlockManager.getBlockData(IndexShuffleBlockManager.scala:109) at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:305) at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57) at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108) at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57) And then for the same index file and executor, I got the following errors multiple times 15/03/03 10:22:41 ERROR ShuffleBlockFetcherIterator: Failed to get block(s) from host-:39534 java.lang.RuntimeException: java.io.FileNotFoundException: /hadoop01/scratch/local/usercache/jianshuang/appcache/application_1421268539738_202330/spark-local-20150303100549-fc3b/02/shuffle_0_1458_0.index (No such file or directory) 15/03/03 10:22:41 ERROR RetryingBlockFetcher: Failed to fetch block shuffle_0_13_1228, and will not retry (0 retries) java.lang.RuntimeException: java.io.FileNotFoundException: /hadoop01/scratch/local/usercache/jianshuang/appcache/application_1421268539738_202330/spark-local-20150303100549-fc3b/02/shuffle_0_1458_0.index (No such file or directory) ... Caused by: java.net.ConnectException: Connection refused: host- What's the problem? BTW, I'm using a Spark 1.2.1-SNAPSHOT I built around Dec. 20. Are there any bug fixes related to shuffle block fetching or index files after that?
Thanks, -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Re: Having lots of FetchFailedException in join
:319) at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787) at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130) at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511) at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468) at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354) at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116) at java.lang.Thread.run(Thread.java:724) Jianshi On Wed, Mar 4, 2015 at 3:25 AM, Aaron Davidson ilike...@gmail.com wrote: "Failed to connect" implies that the executor at that host died; please check its logs as well. On Tue, Mar 3, 2015 at 11:03 AM, Jianshi Huang jianshi.hu...@gmail.com wrote: Sorry that I forgot the subject. And in the driver, I got many FetchFailedExceptions. The error messages are 15/03/03 10:34:32 WARN TaskSetManager: Lost task 31.0 in stage 2.2 (TID 7943, ): FetchFailed(BlockManagerId(86, , 43070), shuffleId=0, mapId=24, reduceId=1220, message= org.apache.spark.shuffle.FetchFailedException: Failed to connect to /:43070 at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67) at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83) at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83) Jianshi On Wed, Mar 4, 2015 at 2:55 AM, Jianshi Huang jianshi.hu...@gmail.com wrote: Hi, I got this error message: 15/03/03 10:22:41 ERROR OneForOneBlockFetcher: Failed while starting block fetches java.lang.RuntimeException: java.io.FileNotFoundException: /hadoop01/scratch/local/usercache/jianshuang/appcache/application_1421268539738_202330/spark-local-20150303100549-fc3b/02/shuffle_0_1458_0.index (No such file or directory) at java.io.FileInputStream.open(Native Method) at java.io.FileInputStream.<init>(FileInputStream.java:146) at org.apache.spark.shuffle.IndexShuffleBlockManager.getBlockData(IndexShuffleBlockManager.scala:109) at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:305) at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57) at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at
scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108) at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57) And then for the same index file and executor, I got the following errors multiple times 15/03/03 10:22:41 ERROR ShuffleBlockFetcherIterator: Failed to get block(s) from host-:39534 java.lang.RuntimeException: java.io.FileNotFoundException: /hadoop01/scratch/local/usercache/jianshuang/appcache/application_1421268539738_202330/spark-local-20150303100549-fc3b/02/shuffle_0_1458_0.index (No such file or directory) 15/03/03 10:22:41 ERROR RetryingBlockFetcher: Failed to fetch block shuffle_0_13_1228, and will not retry (0 retries) java.lang.RuntimeException: java.io.FileNotFoundException: /hadoop01/scratch/local/usercache/jianshuang/appcache/application_1421268539738_202330/spark-local-20150303100549-fc3b/02/shuffle_0_1458_0.index (No such file or directory) ... Caused by: java.net.ConnectException: Connection refused: host- What's the problem? BTW, I'm using Spark 1.2.1-SNAPSHOT I built
Re: Problem getting program to run on 15TB input
All stated symptoms are consistent with GC pressure (other nodes time out trying to connect because of a long stop-the-world), quite possibly due to groupByKey. groupByKey is a very expensive operation as it may bring all the data for a particular key into memory (in particular, it cannot spill values for a single key, so if you have a single very skewed key you can get behavior like this). On Sat, Feb 28, 2015 at 11:33 AM, Paweł Szulc paul.sz...@gmail.com wrote: But groupByKey will repartition according to the number of keys, as I understand how it works. How do you know that you haven't reached the groupByKey phase? Are you using a profiler or do you base that assumption only on logs? On Sat, 28 Feb 2015, 8:12 PM, Arun Luthra arun.lut...@gmail.com wrote: A correction to my first post: There is also a repartition right before groupByKey to help avoid the too-many-open-files error: rdd2.union(rdd1).map(...).filter(...).repartition(15000).groupByKey().map(...).flatMap(...).saveAsTextFile() On Sat, Feb 28, 2015 at 11:10 AM, Arun Luthra arun.lut...@gmail.com wrote: The job fails before getting to groupByKey. I see a lot of timeout errors in the yarn logs, like: 15/02/28 12:47:16 WARN util.AkkaUtils: Error sending message in 1 attempts akka.pattern.AskTimeoutException: Timed out and 15/02/28 12:47:49 WARN util.AkkaUtils: Error sending message in 2 attempts java.util.concurrent.TimeoutException: Futures timed out after [30 seconds] and some of these are followed by: 15/02/28 12:48:02 ERROR executor.CoarseGrainedExecutorBackend: Driver Disassociated [akka.tcp://sparkExecutor@...] - [akka.tcp://sparkDriver@...] disassociated! Shutting down. 15/02/28 12:48:02 ERROR executor.Executor: Exception in task 421027.0 in stage 1.0 (TID 336601) java.io.FileNotFoundException: /hadoop/yarn/local//spark-local-20150228123450-3a71/36/shuffle_0_421027_0 (No such file or directory) On Sat, Feb 28, 2015 at 9:33 AM, Paweł Szulc paul.sz...@gmail.com wrote: I would first check whether there is any possibility that after doing groupByKey one of the groups does not fit in one of the executors' memory. To back up my theory, instead of doing groupByKey + map try reduceByKey + mapValues (see the sketch at the end of this thread). Let me know if that helped. Pawel Szulc http://rabbitonweb.com On Sat, 28 Feb 2015, 6:22 PM, Arun Luthra arun.lut...@gmail.com wrote: So, actually I am removing the persist for now, because there is significant filtering that happens after calling textFile()... but I will keep that option in mind. I just tried a few different combinations of number of executors, executor memory, and more importantly, number of tasks... *all three times it failed when approximately 75.1% of the tasks were completed (no matter how many tasks resulted from repartitioning the data in textfile(..., N))*. Surely this is a strong clue to something? On Fri, Feb 27, 2015 at 1:07 PM, Burak Yavuz brk...@gmail.com wrote: Hi, Not sure if it can help, but `StorageLevel.MEMORY_AND_DISK_SER` generates many small objects that lead to very long GC time, causing the "executor lost", "heartbeat not received", and "GC overhead limit exceeded" messages. Could you try using `StorageLevel.MEMORY_AND_DISK` instead? You can also try `OFF_HEAP` (and use Tachyon).
Burak On Fri, Feb 27, 2015 at 11:39 AM, Arun Luthra arun.lut...@gmail.com wrote: My program in pseudocode looks like this: val conf = new SparkConf().setAppName("Test") .set("spark.storage.memoryFraction", "0.2") // default 0.6 .set("spark.shuffle.memoryFraction", "0.12") // default 0.2 .set("spark.shuffle.manager", "SORT") // preferred setting for optimized joins .set("spark.shuffle.consolidateFiles", "true") // helpful for too many files open .set("spark.mesos.coarse", "true") // helpful for MapOutputTracker errors? .set("spark.akka.frameSize", "500") // helpful when using consolidateFiles=true .set("spark.akka.askTimeout", "30") .set("spark.shuffle.compress", "false") // http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html .set("spark.file.transferTo", "false") // http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html .set("spark.core.connection.ack.wait.timeout", "600") // http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html .set("spark.speculation", "true") .set("spark.worker.timeout", "600") // http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html .set("spark.akka.timeout", "300") // http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html .set("spark.storage.blockManagerSlaveTimeoutMs", "12") .set("spark.driver.maxResultSize", "2048") // in response to error: Total size of serialized results of 39901 tasks (1024.0 MB) is bigger than spark.driver.maxResultSize (1024.0 MB) .set("spark.serializer",
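Paweł's reduceByKey + mapValues rewrite, as a minimal sketch (not from the thread; a hypothetical per-key mean over an RDD[(K, Double)], which aggregates incrementally instead of materializing whole groups the way groupByKey does):

val meanByKey = rdd
  .mapValues(v => (v, 1L))                                    // pair each value with a count
  .reduceByKey { case ((s1, c1), (s2, c2)) => (s1 + s2, c1 + c2) } // combine partial sums map-side
  .mapValues { case (sum, count) => sum / count }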
Re: Worker and Nodes
Note that the parallelism (i.e., number of partitions) is just an upper bound on how much of the work can be done in parallel. If you have 200 partitions, then you can divide the work among between 1 and 200 cores and all resources will remain utilized. If you have more than 200 cores, though, then some will not be used, so you would want to increase parallelism further. (There are other rules of thumb -- for instance, it's generally good to have at least 2x more partitions than cores for straggler mitigation, but these are essentially just optimizations.) Further note that when you increase the number of Executors for the same set of resources (i.e., starting 10 Executors on a single machine instead of 1), you make Spark's job harder. Spark has to communicate in an all-to-all manner across Executors for shuffle operations, and it uses TCP sockets to do so whether or not the Executors happen to be on the same machine. So increasing Executors without increasing physical resources means Spark has to do more communication to do the same work. We expect that increasing the number of Executors by a factor of 10, given an increase in the number of physical resources by the same factor, would also improve performance by 10x. This is not always the case for the precise reason above (increased communication overhead), but typically we can get close. The actual observed improvement is very algorithm-dependent, though; for instance, some ML algorithms become hard to scale out past a certain point because the increase in communication overhead outweighs the increase in parallelism. On Sat, Feb 21, 2015 at 8:19 AM, Deep Pradhan pradhandeep1...@gmail.com wrote: So, if I keep the number of instances constant and increase the degree of parallelism in steps, can I expect the performance to increase? Thank You On Sat, Feb 21, 2015 at 9:07 PM, Deep Pradhan pradhandeep1...@gmail.com wrote: So, with the increase in the number of worker instances, if I also increase the degree of parallelism, will it make any difference? I can use this model the other way round too, right? I can always predict the deterioration in performance of an app as the number of worker instances increases, right? Thank You On Sat, Feb 21, 2015 at 8:52 PM, Deep Pradhan pradhandeep1...@gmail.com wrote: Yes, I have decreased the executor memory. But, if I have to do this, then I have to tweak around with the code corresponding to each configuration, right? On Sat, Feb 21, 2015 at 8:47 PM, Sean Owen so...@cloudera.com wrote: "Workers" has a specific meaning in Spark. You are running many on one machine? That's possible but not usual. Each worker's executors have access to a fraction of your machine's resources then. If you're not increasing parallelism, maybe you're not actually using additional workers, so you are using less resource for your problem. Or because the resulting executors are smaller, maybe you're hitting GC thrashing in these executors with smaller heaps. Or if you're not actually configuring the executors to use less memory, maybe you're over-committing your RAM and swapping? Bottom line, you wouldn't use multiple workers on one small standalone node. This isn't a good way to estimate performance on a distributed cluster either. On Sat, Feb 21, 2015 at 3:11 PM, Deep Pradhan pradhandeep1...@gmail.com wrote: No, I just have a single-node standalone cluster. I am not tweaking around with the code to increase parallelism.
I am just running the SparkKMeans example that ships with Spark 1.0.0. I just wanted to know if this behavior is natural. And if so, what causes this? Thank you On Sat, Feb 21, 2015 at 8:32 PM, Sean Owen so...@cloudera.com wrote: What's your storage like? Are you adding worker machines that are remote from where the data lives? I wonder if it just means you are spending more and more time sending the data over the network as you try to ship more of it to more remote workers. To answer your question, no, in general more workers means more parallelism and therefore faster execution. But that depends on a lot of things. For example, if your process isn't parallelized to use all available execution slots, adding more slots doesn't do anything. On Sat, Feb 21, 2015 at 2:51 PM, Deep Pradhan pradhandeep1...@gmail.com wrote: Yes, I am talking about a standalone single-node cluster. No, I am not increasing parallelism. I just wanted to know if it is natural. Does message passing across the workers account for what's happening? I am running SparkKMeans, just to validate one prediction model. I am using several data sets. I have a standalone mode. I am varying the workers from 1 to 16. On Sat, Feb 21, 2015 at 8:14 PM, Sean Owen so...@cloudera.com wrote: I can imagine a few reasons. Adding workers might cause fewer tasks to execute locally (?) So you may be executing more remotely. Are you
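The parallelism Aaron describes is controlled by the real spark.default.parallelism setting (or the explicit numPartitions argument that most shuffle operations accept); a minimal sketch:

val conf = new org.apache.spark.SparkConf()
conf.set("spark.default.parallelism", "400") // e.g. ~2x total cores, per the rule of thumb above
// or per-operation: rdd.reduceByKey(_ + _, 400)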
Re: Which OutputCommitter to use for S3?
Here is the class: https://gist.github.com/aarondav/c513916e72101bbe14ec You can use it by setting mapred.output.committer.class in the Hadoop configuration (or spark.hadoop.mapred.output.committer.class in the Spark configuration). Note that this only works for the old Hadoop APIs; I believe the new Hadoop APIs strongly tie the committer to the output format (so FileOutputFormat always uses FileOutputCommitter), which makes this fix more difficult to apply. On Sat, Feb 21, 2015 at 12:12 PM, Andrew Ash and...@andrewash.com wrote: Josh, is that class something you guys would consider open sourcing, or would you rather the community step up and create an OutputCommitter implementation optimized for S3? On Fri, Feb 20, 2015 at 4:02 PM, Josh Rosen rosenvi...@gmail.com wrote: We (Databricks) use our own DirectOutputCommitter implementation, which is a couple tens of lines of Scala code. The class would almost entirely be a no-op except we took some care to properly handle the _SUCCESS file. On Fri, Feb 20, 2015 at 3:52 PM, Mingyu Kim m...@palantir.com wrote: I didn't get any response. It'd be really appreciated if anyone using a special OutputCommitter for S3 can comment on this! Thanks, Mingyu From: Mingyu Kim m...@palantir.com Date: Monday, February 16, 2015 at 1:15 AM To: user@spark.apache.org user@spark.apache.org Subject: Which OutputCommitter to use for S3? Hi all, The default OutputCommitter used by RDD, which is FileOutputCommitter, seems to require moving files at the commit step, which is not a constant operation in S3, as discussed in http://mail-archives.apache.org/mod_mbox/spark-user/201410.mbox/%3c543e33fa.2000...@entropy.be%3E. People seem to develop their own NullOutputCommitter implementation or use DirectFileOutputCommitter (as mentioned in SPARK-3595, https://issues.apache.org/jira/browse/SPARK-3595), but I wanted to check if there is a de facto standard, publicly available OutputCommitter to use for S3 in conjunction with Spark. Thanks, Mingyu
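For reference, a minimal sketch in the spirit of the gist above (untested; the real class also takes care of the _SUCCESS marker in commitJob). Tasks write straight to the final output location and commit becomes a no-op, so no S3 rename/copy ever happens:

import org.apache.hadoop.mapred.{JobContext, OutputCommitter, TaskAttemptContext}

class DirectOutputCommitter extends OutputCommitter {
  // No temporary directories are created, so setup is a no-op.
  override def setupJob(jobContext: JobContext): Unit = {}
  override def setupTask(taskContext: TaskAttemptContext): Unit = {}
  // Tasks wrote directly to the final location; nothing to commit or abort.
  override def needsTaskCommit(taskContext: TaskAttemptContext): Boolean = false
  override def commitTask(taskContext: TaskAttemptContext): Unit = {}
  override def abortTask(taskContext: TaskAttemptContext): Unit = {}
}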
Re: RangePartitioner in Spark 1.2.1
RangePartitioner does not actually provide a guarantee that all partitions will be equal-sized (that is hard), and instead uses sampling to approximate equal buckets. Thus, it is possible that a bucket is left empty. If you want the specified behavior, you should define your own partitioner. It would look something like this (untested): class AlphabetPartitioner extends Partitioner { def numPartitions = 26 def getPartition(key: Any): Int = key match { case s: String => s(0).toUpper - 'A' } override def equals(other: Any): Boolean = other.isInstanceOf[AlphabetPartitioner] override def hashCode: Int = 0 } On Tue, Feb 17, 2015 at 7:05 PM, java8964 java8...@hotmail.com wrote: Hi, Sparkers: I just happened to search in google for something related to the RangePartitioner of spark, and found an old thread in this email list as here: http://apache-spark-user-list.1001560.n3.nabble.com/RDD-and-Partition-td991.html I followed the code example mentioned in that email thread as following: scala> import org.apache.spark.RangePartitioner import org.apache.spark.RangePartitioner scala> val rdd = sc.parallelize(List("apple", "Ball", "cat", "dog", "Elephant", "fox", "gas", "horse", "index", "jet", "kitsch", "long", "moon", "Neptune", "ooze", "Pen", "quiet", "rose", "sun", "talk", "umbrella", "voice", "Walrus", "xeon", "Yam", "zebra")) rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:13 scala> rdd.keyBy(s => s(0).toUpper) res0: org.apache.spark.rdd.RDD[(Char, String)] = MappedRDD[1] at keyBy at <console>:16 scala> res0.partitionBy(new RangePartitioner[Char, String](26, res0)).values res1: org.apache.spark.rdd.RDD[String] = MappedRDD[5] at values at <console>:18 scala> res1.mapPartitionsWithIndex((idx, itr) => itr.map(s => (idx, s))).collect.foreach(println) The above example is clear for me to understand the meaning of the RangePartitioner, but to my surprise, I got the following result: *(0,apple)* *(0,Ball)* (1,cat) (2,dog) (3,Elephant) (4,fox) (5,gas) (6,horse) (7,index) (8,jet) (9,kitsch) (10,long) (11,moon) (12,Neptune) (13,ooze) (14,Pen) (15,quiet) (16,rose) (17,sun) (18,talk) (19,umbrella) (20,voice) (21,Walrus) (22,xeon) (23,Yam) (24,zebra) instead of a perfect range index from 0 to 25 as in the old email thread. Why is that? Is this a bug, or some new feature I don't understand? BTW, the above environment I tested is Spark 1.2.1 with the Hadoop 2.4 binary release. Thanks Yong
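Note that the sketch keys on the full String (it inspects s(0) itself), so usage with the thread's data would look like (untested):

val parts = rdd.keyBy(s => s).partitionBy(new AlphabetPartitioner).values
parts.mapPartitionsWithIndex((idx, itr) => itr.map(s => (idx, s))).collect.foreach(println)

This should print a stable index 0-25 per starting letter, since the mapping no longer depends on sampling.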
Re: Shuffle write increases in spark 1.2
I think Xuefeng Wu's suggestion is likely correct. This difference is more likely explained by the compression library changing versions than by sort vs hash shuffle (which should not affect output size significantly). Others have reported that switching to lz4 fixed their issue. We should document this if this is the case. I wonder if we're asking Snappy to be super-low-overhead and as a result the new version does a better job of it (less overhead, less compression). On Sat, Feb 14, 2015 at 9:32 AM, Peng Cheng pc...@uow.edu.au wrote: I double-checked the 1.2 feature list and found out that the new sort-based shuffle manager has nothing to do with HashPartitioner. Sorry for the misinformation. On the other hand, this may explain the increase in shuffle spill as a side effect of the new shuffle manager; let me revert spark.shuffle.manager to hash and see if it makes things better (or worse, as the benchmark in https://issues.apache.org/jira/browse/SPARK-3280 indicates). -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Shuffle-write-increases-in-spark-1-2-tp20894p21657.html
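The switch others reported is the real spark.io.compression.codec setting; e.g.:

val conf = new org.apache.spark.SparkConf()
conf.set("spark.io.compression.codec", "lz4") // or "lzf", the pre-1.1 default; "snappy" is the 1.1+ default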
Re: Shuffle read/write issue in spark 1.2
Did the problem go away when you switched to lz4? There was a change of the default compression codec from 1.0 to 1.1, where we went from LZF to Snappy. I don't think there was any such change from 1.1 to 1.2, though. On Fri, Feb 6, 2015 at 12:17 AM, Praveen Garg praveen.g...@guavus.com wrote: We tried changing the compression codec from snappy to lz4. It did improve the performance but we are still wondering why the default options didn't work as claimed. From: Raghavendra Pandey raghavendra.pan...@gmail.com Date: Friday, 6 February 2015 1:23 pm To: Praveen Garg praveen.g...@guavus.com Cc: user@spark.apache.org user@spark.apache.org Subject: Re: Shuffle read/write issue in spark 1.2 I observed the same issue. On Fri, Feb 6, 2015 at 12:19 AM, Praveen Garg praveen.g...@guavus.com wrote: Hi, While moving from Spark 1.1 to Spark 1.2, we are facing an issue where shuffle read/write has increased significantly. We also tried running the job by rolling back to the Spark 1.1 configuration, setting spark.shuffle.manager to hash and spark.shuffle.blockTransferService to nio. It did improve the performance a bit, but it was still much worse than Spark 1.1. The scenario seems similar to a bug raised some time back: https://issues.apache.org/jira/browse/SPARK-5081. Has anyone come across any similar issue? Please tell us if any configuration change can help. Regards, Praveen
Re: ephemeral-hdfs vs persistent-hdfs - performance
The latter would be faster. With S3, you want to maximize the number of concurrent readers until you hit your network throughput limits. On Wed, Feb 4, 2015 at 6:20 AM, Peter Rudenko petro.rude...@gmail.com wrote: Hi, if I have a 10GB file on S3 and set 10 partitions, would it download the whole file on the master first and broadcast it, or would each worker just read its range from the file? Thanks, Peter On 2015-02-03 23:30, Sven Krasser wrote: Hey Joe, With the ephemeral HDFS, you get the instance store of your worker nodes. For m3.xlarge that will be two 40 GB SSDs local to each instance, which are very fast. For the persistent HDFS, you get whatever EBS volumes the launch script configured. EBS volumes are always network drives, so the usual limitations apply. To optimize throughput, you can use EBS volumes with provisioned IOPS and you can use EBS-optimized instances. I don't have hard numbers at hand, but I'd expect this to be noticeably slower than using local SSDs. As far as only using S3 goes, it depends on your use case (i.e. what you plan on doing with the data while it is there). If you store it there in between running different applications, you can likely work around consistency issues. Also, if you use Amazon's EMRFS to access data in S3, you can use their new consistency feature ( https://aws.amazon.com/blogs/aws/emr-consistent-file-system/). Hope this helps! -Sven On Tue, Feb 3, 2015 at 9:32 AM, Joe Wass jw...@crossref.org wrote: The data is coming from S3 in the first place, and the results will be uploaded back there. But even in the same availability zone, fetching 170 GB (that's gzipped) is slow. From what I understand of the pipelines, multiple transforms on the same RDD might involve re-reading the input, which very quickly adds up in comparison to having the data locally. Unless I persisted the data (which I am in fact doing), but that would involve storing approximately the same amount of data in HDFS, which wouldn't fit. Also, I understood that S3 was unsuitable for practical use? See "Why you cannot use S3 as a replacement for HDFS" [0]. I'd love to be proved wrong, though; that would make things a lot easier. [0] http://wiki.apache.org/hadoop/AmazonS3 On 3 February 2015 at 16:45, David Rosenstrauch dar...@darose.net wrote: You could also just push the data to Amazon S3, which would un-link the size of the cluster needed to process the data from the size of the data. DR On 02/03/2015 11:43 AM, Joe Wass wrote: I want to process about 800 GB of data on an Amazon EC2 cluster. So, I need to store the input in HDFS somehow. I currently have a cluster of 5 x m3.xlarge, each of which has 80GB disk. Each HDFS node reports 73 GB, and the total capacity is ~370 GB. If I want to process 800 GB of data (assuming I can't split the jobs up), I'm guessing I need to get persistent-hdfs involved. 1 - Does persistent-hdfs have noticeably different performance than ephemeral-hdfs? 2 - If so, is there a recommended configuration (like storing input and output on persistent, but persisted RDDs on ephemeral?) This seems like a common use-case, so sorry if this has already been covered. Joe -- http://sites.google.com/site/krasser/?utm_source=sig
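On Peter's question: for a splittable format, each task reads only its own byte range of the object directly from S3 (nothing is funneled through the master); a gzipped file, by contrast, cannot be split and would be read by a single task. A minimal sketch with a hypothetical bucket:

val rdd = sc.textFile("s3n://my-bucket/10gb-plain-text.txt", 10)
// 10 tasks, each reading its own ~1GB range from S3 in parallel on the executors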
Re: 2GB limit for partitions?
To be clear, there is no distinction between partitions and blocks for RDD caching (each RDD partition corresponds to 1 cache block). The distinction is important for shuffling, where by definition N partitions are shuffled into M partitions, creating N*M intermediate blocks. Each of these blocks must also be smaller than 2GB, but due to their number, this is an atypical scenario. If you do sc.parallelize(1 to 1e6.toInt, 1).map{i => new Array[Byte](5e3.toInt)}.repartition(1000).count() you should not see this error, as the 5GB initial partition was split into 1000 partitions of 5MB each, during a shuffle. On the other hand, sc.parallelize(1 to 1e6.toInt, 1).map{i => new Array[Byte](5e3.toInt)}.repartition(1).count() may have the same error as Imran showed for caching, and for the same reason. On Tue, Feb 3, 2015 at 3:00 PM, Imran Rashid iras...@cloudera.com wrote: Michael, you are right, there is definitely some limit at 2GB. Here is a trivial example to demonstrate it: import org.apache.spark.storage.StorageLevel val d = sc.parallelize(1 to 1e6.toInt, 1).map{i => new Array[Byte](5e3.toInt)}.persist(StorageLevel.DISK_ONLY) d.count() It gives the same error you are observing. I was under the same impression as Sean about the limits only being on blocks, not partitions -- but clearly that isn't the case here. I don't know the whole story yet, but I just wanted to at least let you know you aren't crazy :) At the very least this suggests that you might need to make smaller partitions for now. Imran On Tue, Feb 3, 2015 at 4:58 AM, Michael Albert m_albert...@yahoo.com.invalid wrote: Greetings! Thanks for the response. Below is an example of the exception I saw. I'd rather not post code at the moment, so I realize it is completely unreasonable to ask for a diagnosis. However, I will say that adding a partitionBy() was the last change before this error appeared. Thanks for your time and any thoughts you might have.
Sincerely, Mike Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 5.0 failed 4 times, most recent failure: Lost task 4.3 in stage 5.0 (TID 6012, ip-10-171-0-31.ec2.internal): java.lang.RuntimeException: java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828) at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:123) at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132) at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:517) at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:307) at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57) at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108) at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57) -- *From:* Sean Owen so...@cloudera.com *To:* Michael Albert m_albert...@yahoo.com *Cc:* user@spark.apache.org user@spark.apache.org *Sent:* Monday, February 2, 2015 10:13 PM *Subject:* Re: 2GB limit for partitions? The limit is on blocks, not partitions. Partitions have many blocks. It sounds like you are creating very large values in memory, but I'm not sure given your description. You will run into problems if a single object is more than 2GB, of course. More of the stack trace might show what is mapping that much memory. If you simply want the data in 1000 files, it's a lot simpler. Just repartition into 1000 partitions and save the data. If you need more control over what goes into which partition, use a Partitioner, yes. On Mon, Feb 2, 2015 at 8:40 PM, Michael Albert m_albert...@yahoo.com.invalid wrote: Greetings! SPARK-1476 says that there is a 2G limit for blocks. Is this the same as a 2G limit for partitions (or approximately so)? What I had been attempting to do is the following. 1) Start with a moderately large data set (currently about 100GB, but growing). 2) Create about 1,000 files (yes, files), each representing a subset of the data. The current attempt I am working on is something like this. 1) Do a map whose output key indicates which of the 1,000 files it will go into and whose value is what I will want to stick into the file. 2) Partition the data and use
Re: Duplicate key when sorting BytesWritable with Kryo?
Ah, this is in particular an issue due to sort-based shuffle (it was not the case for hash-based shuffle, which would immediately serialize each record rather than holding many in memory at once). The documentation should be updated. On Fri, Jan 30, 2015 at 11:27 AM, Sandy Ryza sandy.r...@cloudera.com wrote: Hi Andrew, Here's a note from the doc for sequenceFile: * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each * record, directly caching the returned RDD will create many references to the same object. * If you plan to directly cache Hadoop writable objects, you should first copy them using * a `map` function. This should probably say directly caching *or directly shuffling*. To sort directly from a sequence file, the records need to be cloned first. -Sandy On Fri, Jan 30, 2015 at 11:20 AM, andrew.rowson andrew.row...@thomsonreuters.com wrote: I've found a strange issue when trying to sort a lot of data in HDFS using spark 1.2.0 (CDH5.3.0). My data is in sequencefiles and the key is a class that derives from BytesWritable (the value is also a BytesWritable). I'm using a custom KryoSerializer to serialize the underlying byte array (basically write the length and the byte array). My spark job looks like this: spark.sequenceFile(inputPath, classOf[CustomKey], classOf[BytesWritable]).sortByKey().map(t => t._1).saveAsTextFile(outputPath) CustomKey extends BytesWritable, adds a toString method and some other helper methods that extract and convert parts of the underlying byte[]. This should simply output a series of textfiles which contain the sorted list of keys. The problem is that under certain circumstances I get many duplicate keys. The number of records output is correct, but it appears that large chunks of the output are simply copies of the last record in that chunk. E.g. instead of [1,2,3,4,5,6,7,8,9] I'll see [9,9,9,9,9,9,9,9,9]. This appears to happen only above certain input data volumes, and it appears to happen when shuffle spills. For a job where shuffle spill for memory and disk = 0B, the data is correct. If there is any spill, I see the duplicate behaviour. Oddly, the shuffle write is much smaller when there's a spill. E.g. the non-spill job has 18.8 GB of input and 14.9GB of shuffle write, whereas the spill job has 24.2 GB of input, and only 4.9GB of shuffle write. I'm guessing some sort of compression is happening on duplicate identical values? Oddly, I can fix this issue if I adjust my scala code to insert a map step before the call to sortByKey(): .map(t => (new CustomKey(t._1), t._2)) This constructor is just: public CustomKey(CustomKey left) { this.set(left); } Why does this work? I've no idea. The spark job is running in yarn-client mode with all the default configuration values set. Using the external shuffle service and disabling spill compression makes no difference. Is this a bug? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Duplicate-key-when-sorting-BytesWritable-with-Kryo-tp21447.html
Re: performance of saveAsTextFile moving files from _temporary
Upon completion of the 2 hour part of the run, the files did not exist in the output directory? One thing that is done serially is deleting any remaining files from _temporary, so perhaps there was a lot of data remaining in _temporary but the committed data had already been moved. I am, unfortunately, not aware of other issues that would cause this to be so slow. On Tue, Jan 27, 2015 at 6:54 PM, Josh Walton j...@openbookben.com wrote: I'm not sure how to confirm how the moving is happening, however, one of the jobs I was talking about, with 9k files of 4 MB each, just completed. The Spark UI showed the job being complete after ~2 hours. The last four hours of the job were spent just moving the files from _temporary to their final destination. The tasks for the write were definitely shown as complete, and no logging is happening on the master or workers. The last line of my java code logs, but the job sits there as the moving of files happens. On Tue, Jan 27, 2015 at 7:24 PM, Aaron Davidson ilike...@gmail.com wrote: This renaming from _temporary to the final location is actually done by executors, in parallel, for saveAsTextFile. It should be performed by each task individually before it returns. I have seen an issue similar to what you mention dealing with Hive code which did the renaming serially on the driver, which is very slow for S3 (and possibly Google Storage as well), as it actually copies the data rather than doing a metadata-only operation during rename. However, this should not be an issue in this case. Could you confirm how the moving is happening -- i.e., on the executors or the driver? On Tue, Jan 27, 2015 at 4:31 PM, jwalton j...@openbookben.com wrote: We are running Spark in Google Compute Engine using their One-Click Deploy. By doing so, we get their Google Cloud Storage connector for Hadoop for free, meaning we can specify gs:// paths for input and output. We have jobs that take a couple of hours and end up with ~9k partitions, which means 9k output files. After the job is complete it then moves the output files from our $output_path/_temporary to $output_path. That process can take longer than the job itself depending on the circumstances. The job I mentioned previously outputs ~4 MB files, and so far has copied 1/3 of the files in 1.5 hours from _temporary to the final destination. Is there a solution to this besides reducing the number of partitions? Anyone else run into similar issues elsewhere? I don't remember this being an issue with Map Reduce jobs and Hadoop, however, I probably wasn't tracking the transfer of the output files like I am with Spark. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/performance-of-saveAsTextFile-moving-files-from-temporary-tp21397.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Lost task - connection closed
It looks like something weird is going on with your object serialization, perhaps a funny form of self-reference which is not detected by ObjectOutputStream's typical loop avoidance. That, or you have some data structure like a linked list with a parent pointer and you have many thousands of elements. Assuming the stack trace is coming from an executor, it is probably a problem with the objects you're sending back as results, so I would carefully examine these and maybe try serializing some using ObjectOutputStream manually. If your program looks like foo.map { row => doComplexOperation(row) }.take(10) you can also try changing it to foo.map { row => doComplexOperation(row); 1 }.take(10) to avoid serializing the result of that complex operation, which should help narrow down where exactly the problematic objects are coming from. On Mon, Jan 26, 2015 at 8:31 AM, octavian.ganea octavian.ga...@inf.ethz.ch wrote: Here is the first error I get at the executors: 15/01/26 17:27:04 ERROR ExecutorUncaughtExceptionHandler: Uncaught exception in thread Thread[handle-message-executor-16,5,main] java.lang.StackOverflowError at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876) at java.io.ObjectOutputStream$BlockDataOutputStream.write(ObjectOutputStream.java:1840) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1533) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) If you have any pointers for me on how to debug this, that would be very useful. I tried running with both spark 1.2.0 and 1.1.1, getting the same error. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Lost-task-connection-closed-tp21361p21371.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
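One way to act on the suggestion above and test serialization outside of Spark, to reproduce the overflow in isolation (suspectResult stands for one of the objects your closure returns; the snippet is purely illustrative):

    import java.io.{ByteArrayOutputStream, ObjectOutputStream}

    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    out.writeObject(suspectResult)  // a deeply nested or self-referencing object overflows here too
    out.close()
    println("serialized size: " + buffer.size + " bytes")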
Re: Lost task - connection closed
Please take a look at the executor logs (on both sides of the IOException) to see if there are other exceptions (e.g., OOM) which precede this one. Generally, the connections should not fail spontaneously. On Sun, Jan 25, 2015 at 10:35 PM, octavian.ganea octavian.ga...@inf.ethz.ch wrote: Hi, I am running a program that executes map-reduce jobs in a loop. The first time the loop runs, everything is ok. After that, it starts giving the following error, first it gives it for one task, then for more tasks and eventually the entire program fails: 15/01/26 01:41:25 WARN TaskSetManager: Lost task 10.0 in stage 15.0 (TID 1063, hostnameXX): java.io.IOException: Connection from hostnameXX/172.31.109.50:50808 closed at org.apache.spark.network.client.TransportResponseHandler.channelUnregistered(TransportResponseHandler.java:98) at org.apache.spark.network.server.TransportChannelHandler.channelUnregistered(TransportChannelHandler.java:81) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:183) at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:169) at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:183) at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:169) at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:183) at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:169) at io.netty.channel.DefaultChannelPipeline.fireChannelUnregistered(DefaultChannelPipeline.java:738) at io.netty.channel.AbstractChannel$AbstractUnsafe$6.run(AbstractChannel.java:606) at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:380) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357) at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116) at java.lang.Thread.run(Thread.java:745) Can someone help me with debugging this ? Thank you! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Lost-task-connection-closed-tp21361.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark 1.2 – How to change Default (Random) port ….
This was a regression caused by the Netty Block Transfer Service. The fix for this just barely missed the 1.2 release, and you can see the associated JIRA here: https://issues.apache.org/jira/browse/SPARK-4837 Current master has the fix, and the Spark 1.2.1 release will have it included. If you don't want to rebuild from master or wait, then you can turn it off by setting spark.shuffle.blockTransferService to nio. On Sun, Jan 25, 2015 at 6:28 PM, Shailesh Birari sbirar...@gmail.com wrote: Can anyone please let me know? I don't want to open all ports on the network, so I am interested in the property by which I can configure this new port. Shailesh -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-2-How-to-change-Default-Random-port-tp21306p21360.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
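For example, the fallback can be set when building the context (or as the equivalent line in spark-defaults.conf; the app name is illustrative):

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setAppName("MyApp")
      .set("spark.shuffle.blockTransferService", "nio")  // revert to the pre-Netty transfer service
    val sc = new SparkContext(conf)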
Re: Spark 1.2 - com/google/common/base/Preconditions java.lang.NoClassDefFoundErro
Spark's network-common package depends on guava as a provided dependency in order to avoid conflicting with other libraries (e.g., Hadoop) that depend on specific versions. com/google/common/base/Preconditions has been present in Guava since version 2, so this is likely a "dependency not found" rather than a "wrong version of dependency" issue. To resolve this, please depend on some version of Guava (14.0.1 is guaranteed to work, as should any other version from the past few years). On Tue, Jan 20, 2015 at 6:16 PM, Shailesh Birari sbirar...@gmail.com wrote: Hi Frank, It's a normal Eclipse project where I added Scala and Spark libraries as user libraries. Though I am not attaching any hadoop libraries, in my application code I have the following line: System.setProperty("hadoop.home.dir", "C:\\SB\\HadoopWin") This Hadoop home dir contains winutils.exe only. I don't think that it's an issue. Please suggest. Thanks, Shailesh On Wed, Jan 21, 2015 at 2:19 PM, Frank Austin Nothaft fnoth...@berkeley.edu wrote: Shailesh, To add, are you packaging Hadoop in your app? Hadoop will pull in Guava. Not sure if you are using Maven (or what) to build, but if you can pull up your build's dependency tree, you will likely find com.google.guava being brought in by one of your dependencies. Regards, Frank Austin Nothaft fnoth...@berkeley.edu fnoth...@eecs.berkeley.edu 202-340-0466 On Jan 20, 2015, at 5:13 PM, Shailesh Birari sbirar...@gmail.com wrote: Hello, I double checked the libraries. I am linking only with Spark 1.2. Along with Spark 1.2 jars I have Scala 2.10 jars and JRE 7 jars linked and nothing else. Thanks, Shailesh On Wed, Jan 21, 2015 at 12:58 PM, Sean Owen so...@cloudera.com wrote: Guava is shaded in Spark 1.2+. It looks like you are mixing versions of Spark then, with some that still refer to unshaded Guava. Make sure you are not packaging Spark with your app and that you don't have other versions lying around. On Tue, Jan 20, 2015 at 11:55 PM, Shailesh Birari sbirar...@gmail.com wrote: Hello, I recently upgraded my setup from Spark 1.1 to Spark 1.2. My existing applications are working fine on the ubuntu cluster. But, when I try to execute a Spark MLlib application from Eclipse (Windows node) it gives a java.lang.NoClassDefFoundError: com/google/common/base/Preconditions exception. Note, 1. With Spark 1.1 this was working fine. 2. The Spark 1.2 jar files are linked in the Eclipse project. 3. Checked the jar -tf output and found the above com.google.common.base is not present. 
-Exception log: Exception in thread "main" java.lang.NoClassDefFoundError: com/google/common/base/Preconditions at org.apache.spark.network.client.TransportClientFactory.<init>(TransportClientFactory.java:94) at org.apache.spark.network.TransportContext.createClientFactory(TransportContext.java:77) at org.apache.spark.network.netty.NettyBlockTransferService.init(NettyBlockTransferService.scala:62) at org.apache.spark.storage.BlockManager.initialize(BlockManager.scala:194) at org.apache.spark.SparkContext.<init>(SparkContext.scala:340) at org.apache.spark.examples.mllib.TallSkinnySVD$.main(TallSkinnySVD.scala:74) at org.apache.spark.examples.mllib.TallSkinnySVD.main(TallSkinnySVD.scala) Caused by: java.lang.ClassNotFoundException: com.google.common.base.Preconditions at java.net.URLClassLoader$1.run(Unknown Source) at java.net.URLClassLoader$1.run(Unknown Source) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(Unknown Source) at java.lang.ClassLoader.loadClass(Unknown Source) at sun.misc.Launcher$AppClassLoader.loadClass(Unknown Source) at java.lang.ClassLoader.loadClass(Unknown Source) ... 7 more - jar -tf output: consb2@CONSB2A /cygdrive/c/SB/spark-1.2.0-bin-hadoop2.4/spark-1.2.0-bin-hadoop2.4/lib $ jar -tf spark-assembly-1.2.0-hadoop2.4.0.jar | grep Preconditions org/spark-project/guava/common/base/Preconditions.class org/spark-project/guava/common/math/MathPreconditions.class com/clearspring/analytics/util/Preconditions.class parquet/Preconditions.class com/google/inject/internal/util/$Preconditions.class --- Please help me in resolving this. Thanks, Shailesh -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-2-com-google-common-base-Preconditions-java-lang-NoClassDefFoundErro-tp21271.html Sent from the Apache
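If the project is built with sbt, for instance, the missing dependency can be declared directly as below (the version follows the note above; the Maven equivalent is a compile-scoped com.google.guava:guava dependency):

    libraryDependencies += "com.google.guava" % "guava" % "14.0.1"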
Re: Serializability: for vs. while loops
Scala for-loops are implemented as closures using anonymous inner classes which are instantiated once and invoked many times. This means, though, that the code inside the loop is actually sitting inside a class, which confuses Spark's Closure Cleaner, whose job is to remove unused references from closures to make otherwise-unserializable objects serializable. My understanding is, in particular, that the closure cleaner will null out unused fields in the closure, but cannot go past the first level of depth (i.e., it will not follow field references and null out *their* unused, and possibly unserializable, references), because this could end up mutating state outside of the closure itself. Thus, the extra level of depth of the closure that was introduced by the anonymous class (where presumably the outer this pointer is considered used by the closure cleaner) is sufficient to make it unserializable. While loops, on the other hand, involve none of this trickery, and everyone is happy. On Wed, Jan 14, 2015 at 11:37 PM, Tobias Pfeiffer t...@preferred.jp wrote: Hi, sorry, I don't like questions about serializability myself, but still... Can anyone give me a hint why for (i <- 0 to (maxId - 1)) { ... } throws a NotSerializableException in the loop body while var i = 0 while (i < maxId) { // same code as in the for loop i += 1 } works fine? I guess there is something fundamentally different in the way Scala realizes for loops? Thanks Tobias
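The mechanics are easier to see from the desugaring: a Scala for loop is compiled into a foreach call taking an anonymous function, roughly as below (a sketch, not the exact compiler output; body stands for the loop body):

    // for (i <- 0 to (maxId - 1)) { body }   becomes, approximately:
    (0 to (maxId - 1)).foreach { i =>
      body  // now nested one level deeper, inside an anonymous class that
            // can drag a reference to the enclosing (unserializable) scope
    }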
Re: use netty shuffle for network cause high gc time
What version are you running? I think spark.shuffle.use.netty was a valid option only in Spark 1.1, where the Netty stuff was strictly experimental. Spark 1.2 contains an officially supported and much more thoroughly tested version under the property spark.shuffle.blockTransferService, which is set to netty by default. On Tue, Jan 13, 2015 at 9:26 PM, lihu lihu...@gmail.com wrote: Hi, I just tested the groupByKey method on 100GB of data; the cluster is 20 machines, each with 125GB RAM. At first I set conf.set("spark.shuffle.use.netty", "false") and ran the experiment, and then I set conf.set("spark.shuffle.use.netty", "true") again to re-run the experiment, but in the latter case the GC time is much higher. I thought the latter one should be better, but it is not. So when should we use netty for network shuffle fetching?
Re: FileNotFoundException in appcache shuffle files
As Jerry said, this is not related to shuffle file consolidation. The unique thing about this problem is that it's failing to find a file while trying to _write_ to it, in append mode. The simplest explanation for this would be that the file is deleted in between some check for existence and opening the file for append. The deletion of such files as a race condition with writing them (on the map side) would be most easily explained by a JVM shutdown event, for instance caused by a fatal error such as OutOfMemoryError. So, as Ilya said, please look for another exception possibly preceding this one. On Sat, Jan 10, 2015 at 12:16 PM, lucio raimondo luxmea...@hotmail.com wrote: Hey, I am having a similar issue, did you manage to find a solution yet? Please check my post below for reference: http://apache-spark-user-list.1001560.n3.nabble.com/IOError-Errno-2-No-such-file-or-directory-tmp-spark-9e23f17e-2e23-4c26-9621-3cb4d8b832da-tmp3i3xno-td21076.html Thank you, Lucio -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/FileNotFoundException-in-appcache-shuffle-files-tp17605p21077.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: ERROR ConnectionManager: Corresponding SendingConnection to ConnectionManagerId
Do note that this problem may be fixed in Spark 1.2, as we changed the default transfer service to use a Netty-based one rather than the ConnectionManager. On Thu, Jan 8, 2015 at 7:05 AM, Spidy yoni...@gmail.com wrote: Hi, Can you please explain which settings did you changed? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/ERROR-ConnectionManager-Corresponding-SendingConnection-to-ConnectionManagerId-tp17050p21035.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: RDDs being cleaned too fast
The ContextCleaner uncaches RDDs that have gone out of scope on the driver. So it's possible that the given RDD is no longer reachable in your program's control flow, or else it'd be a bug in the ContextCleaner. On Wed, Dec 10, 2014 at 5:34 PM, ankits ankitso...@gmail.com wrote: I'm using spark 1.1.0 and am seeing persisted RDDs being cleaned up too fast. How can i inspect the size of RDD in memory and get more information about why it was cleaned up. There should be more than enough memory available on the cluster to store them, and by default, the spark.cleaner.ttl is infinite, so I want more information about why this is happening and how to prevent it. Spark just logs this when removing RDDs: [2014-12-11 01:19:34,006] INFO spark.storage.BlockManager [] [] - Removing RDD 33 [2014-12-11 01:19:34,010] INFO pache.spark.ContextCleaner [] [akka://JobServer/user/context-supervisor/job-context1] - Cleaned RDD 33 [2014-12-11 01:19:34,012] INFO spark.storage.BlockManager [] [] - Removing RDD 33 [2014-12-11 01:19:34,016] INFO pache.spark.ContextCleaner [] [akka://JobServer/user/context-supervisor/job-context1] - Cleaned RDD 33 -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/RDDs-being-cleaned-too-fast-tp20613.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
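A sketch of the distinction: the cleaner only removes cached data whose RDD object has become unreachable on the driver, so holding a reference is enough to keep it (paths and names are illustrative):

    // Reachable for the life of the application: the ContextCleaner leaves it alone.
    val cached = sc.textFile("hdfs:///data").persist()

    def reportCount(): Long = {
      val temp = sc.parallelize(1 to 1000).persist()
      val n = temp.count()
      n
      // `temp` becomes unreachable after this method returns; once it is
      // garbage-collected on the driver, its cached blocks may be removed.
    }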
Re: Running two different Spark jobs vs multi-threading RDDs
You can actually submit multiple jobs to a single SparkContext in different threads. In the case you mentioned with 2 stages having a common parent, both will wait for the parent stage to complete and then the two will execute in parallel, sharing the cluster resources. Solutions that submit multiple applications are also reasonable, but then you have to manage the job dependencies yourself. On Sat, Dec 6, 2014 at 8:41 AM, Corey Nolet cjno...@gmail.com wrote: Reading the documentation a little more closely, I'm using the wrong terminology. I'm using stages to refer to what spark is calling a job. I guess application (more than one spark context) is what I'm asking about On Dec 5, 2014 5:19 PM, Corey Nolet cjno...@gmail.com wrote: I've read in the documentation that RDDs can be run concurrently when submitted in separate threads. I'm curious how the scheduler would handle propagating these down to the tasks. I have 3 RDDs: - one RDD which loads some initial data, transforms it and caches it - two RDDs which use the cached RDD to provide reports I'm trying to figure out how the resources will be scheduled to perform these stages if I were to concurrently run the two RDDs that depend on the first RDD. Would the two RDDs run sequentially? Will they both run @ the same time and be smart about how they are caching? Would this be a time when I'd want to use Tachyon instead and run this as 2 separate physical jobs: one to place the shared data in the RAMDISK and one to run the two dependent RDDs concurrently? Or would it even be best in that case to run 3 completely separate jobs? We're planning on using YARN so there's 2 levels of scheduling going on. We're trying to figure out the best way to utilize the resources so that we are fully saturating the system and making sure there's constantly work being done rather than anything spinning gears waiting on upstream processing to occur (in mapreduce, we'd just submit a ton of jobs and have them wait in line).
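A sketch of the threaded pattern (the two report jobs are submitted from separate threads of one SparkContext; the shared parent is computed once and cached; the filters are illustrative):

    import scala.concurrent.{Await, Future}
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.duration.Duration

    val parent = sc.textFile("hdfs:///input").cache()  // shared parent RDD

    // Each Future body runs on its own thread and submits an independent job.
    val reportA = Future { parent.filter(_.contains("A")).count() }
    val reportB = Future { parent.filter(_.contains("B")).count() }

    val a = Await.result(reportA, Duration.Inf)
    val b = Await.result(reportB, Duration.Inf)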
Re: Announcing Spark 1.1.1!
Because this was a maintenance release, we should not have introduced any binary backwards or forwards incompatibilities. Therefore, applications that were written and compiled against 1.1.0 should still work against a 1.1.1 cluster, and vice versa. On Wed, Dec 3, 2014 at 1:30 PM, Andrew Or and...@databricks.com wrote: By the Spark server do you mean the standalone Master? It is best if they are upgraded together because there have been changes to the Master in 1.1.1. Although it might just work, it's highly recommended to restart your cluster manager too. 2014-12-03 13:19 GMT-08:00 Romi Kuntsman r...@totango.com: About version compatibility and upgrade path - can the Java application dependencies and the Spark server be upgraded separately (i.e. will the 1.1.0 library work with a 1.1.1 server, and vice versa), or do they need to be upgraded together? Thanks! *Romi Kuntsman*, *Big Data Engineer* http://www.totango.com On Tue, Dec 2, 2014 at 11:36 PM, Andrew Or and...@databricks.com wrote: I am happy to announce the availability of Spark 1.1.1! This is a maintenance release with many bug fixes, most of which are concentrated in the core. This list includes various fixes to sort-based shuffle, memory leaks, and spilling issues. Contributions to this release came from 55 developers. Visit the release notes [1] to read about the new features, or download [2] the release today. [1] http://spark.apache.org/releases/spark-release-1-1-1.html [2] http://spark.apache.org/downloads.html Please e-mail me directly for any typos in the release notes or name listing. Thanks to everyone who contributed, and congratulations! -Andrew
Re: S3NativeFileSystem inefficient implementation when calling sc.textFile
Note that it does not appear that s3a solves the original problems in this thread, which are on the Spark side or due to the fact that metadata listing in S3 is slow simply due to going over the network. On Sun, Nov 30, 2014 at 10:07 AM, David Blewett da...@dawninglight.net wrote: You might be interested in the new s3a filesystem in Hadoop 2.6.0 [1]. 1. https://issues.apache.org/jira/plugins/servlet/mobile#issue/HADOOP-10400 On Nov 26, 2014 12:24 PM, Aaron Davidson ilike...@gmail.com wrote: Spark has a known problem where it will do a pass of metadata on a large number of small files serially, in order to find the partition information prior to starting the job. This will probably not be repaired by switching the FS impl. However, you can change the FS being used like so (prior to the first usage): sc.hadoopConfiguration.set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem") On Wed, Nov 26, 2014 at 1:47 AM, Tomer Benyamini tomer@gmail.com wrote: Thanks Lalit; setting the access + secret keys in the configuration works even when calling sc.textFile. Is there a way to select which hadoop s3 native filesystem implementation would be used at runtime using the hadoop configuration? Thanks, Tomer On Wed, Nov 26, 2014 at 11:08 AM, lalit1303 la...@sigmoidanalytics.com wrote: you can try creating a hadoop Configuration and set the s3 configuration, i.e. access keys etc. Now, for reading files from s3 use newAPIHadoopFile and pass the config object here along with the key and value classes. - Lalit Yadav la...@sigmoidanalytics.com -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/S3NativeFileSystem-inefficient-implementation-when-calling-sc-textFile-tp19841p19845.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
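Putting the pieces from this thread together, as a sketch: credentials plus an explicit filesystem implementation, set on the Hadoop configuration before the first read (the key values and bucket name are placeholders):

    sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "YOUR_ACCESS_KEY")
    sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "YOUR_SECRET_KEY")
    sc.hadoopConfiguration.set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
    val data = sc.textFile("s3n://my-bucket/path/")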
Re: Bug in Accumulators...
As Mohit said, making Main extend Serializable should fix this example. In general, it's not a bad idea to mark the fields you don't want to serialize (e.g., sc and conf in this case) as @transient as well, though this is not the issue in this case. Note that this problem would not have arisen in your very specific example if you used a while loop instead of a for-each loop, but that's really more of a happy coincidence than something you should rely on, as nested lambdas are virtually unavoidable in Scala. On Sat, Nov 22, 2014 at 5:16 PM, Mohit Jaggi mohitja...@gmail.com wrote: perhaps the closure ends up including the main object which is not defined as serializable... try making it a case object or object main extends Serializable. On Sat, Nov 22, 2014 at 4:16 PM, lordjoe lordjoe2...@gmail.com wrote: I posted several examples in java at http://lordjoesoftware.blogspot.com/ Generally code like this works and I show how to accumulate more complex values. // Make two accumulators using Statistics final Accumulator<Integer> totalLetters = ctx.accumulator(0, "ttl"); JavaRDD<String> lines = ... JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() { @Override public Iterable<String> call(final String s) throws Exception { // Handle accumulator here totalLetters.add(s.length()); // count all letters ... } }); ... Long numberCalls = totalLetters.value(); I believe the mistake is to pass the accumulator to the function rather than letting the function find the accumulator - I do this in this case by using a final local variable -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Bug-in-Accumulators-tp17263p19579.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
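The same local-variable idiom in Scala, as a sketch (the lambda captures only the accumulator, not the enclosing object, so nothing unserializable rides along; the path and names are illustrative):

    val totalLetters = sc.accumulator(0L, "ttl")  // a local val, not a field of an outer class
    sc.textFile("hdfs:///lines").foreach(s => totalLetters += s.length.toLong)
    println("letters: " + totalLetters.value)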
Re: Given multiple .filter()'s, is there a way to set the order?
In the situation you show, Spark will pipeline each filter together, and will apply each filter one at a time to each row, effectively constructing an && statement. You would only see a performance difference if the filter code itself is somewhat expensive; in that case you would want to execute it only on a smaller set of rows. Otherwise, the runtime difference between a == b && b == c && c == d is minimal when compared to a == b & b == c & c == d, the latter being sort of the worst-case scenario as it would always run all filters (though as I said, Spark acts like the former). Spark does not reorder the filters automatically. It uses the explicit ordering you provide. On Fri, Nov 14, 2014 at 10:20 AM, YaoPau jonrgr...@gmail.com wrote: I have an RDD x of millions of STRINGs, each of which I want to pass through a set of filters. My filtering code looks like this: x.filter(filter#1, which will filter out 40% of data). filter(filter#2, which will filter out 20% of data). filter(filter#3, which will filter out 2% of data). filter(filter#4, which will filter out 1% of data) There is no ordering requirement (filter #2 does not depend on filter #1, etc), but the filters are drastically different in the % of rows they should eliminate. What I'd like is an ordering similar to a || statement, where if it fails on filter#1 the row automatically gets filtered out before the other three filters run. But when I play around with the ordering of the filters, the runtime doesn't seem to change. Is Spark somehow intelligently guessing how effective each filter will be and ordering it correctly regardless of how I order them? If not, is there a way I can set the filter order? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Given-multiple-filter-s-is-there-a-way-to-set-the-order-tp18957.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
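If the predicates themselves are expensive, the ordering can be made explicit by collapsing them into one short-circuiting filter, with the cheapest or most selective predicate first (filter1 through filter4 stand for the predicates in the post; a sketch):

    val result = x.filter(row =>
      filter1(row) &&  // drops ~40% of rows; later predicates never run for them
      filter2(row) &&  // ~20%
      filter3(row) &&  // ~2%
      filter4(row))    // ~1%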
Re: data locality, task distribution
You mentioned that the 3.1 min run was the one that did the actual caching, so did that run before any data was cached, or after? I would recommend checking the Storage tab of the UI, and clicking on the RDD, to see both how full the executors' storage memory is (which may be significantly less than the instance's memory). When a task completes over data that should be cached, it will try to cache it, so it's pretty weird that you're seeing 100% cache with memory to spare. It's possible that some partitions are significantly larger than others, which may cause us to not attempt to cache it (defined by spark.storage.unrollFraction). You can also try increasing the spark.locality.wait flag to ensure that Spark will wait longer for tasks to complete before running them non-locally. One possible situation is that a node hits GC for a few seconds, a task is scheduled non-locally, and then attempting to read from the other executors' cache is much more expensive than computing the data initially. Increasing the locality wait beyond the GC time would avoid this. On Thu, Nov 13, 2014 at 9:08 AM, Nathan Kronenfeld nkronenf...@oculusinfo.com wrote: I am seeing skewed execution times. As far as I can tell, they are attributable to differences in data locality - tasks with locality PROCESS_LOCAL run fast, NODE_LOCAL, slower, and ANY, slowest. This seems entirely as it should be - the question is, why the different locality levels? I am seeing skewed caching, as I mentioned before - in the case I isolated, with 4 nodes, they were distributed at about 42%, 31%, 20%, and 6%. However, the total amount was significantly less than the memory of any single node, so I don't think they could have overpopulated their cache. I am occasionally seeing task failures, but they re-execute themselves, and work fine the next time. Yet I'm still seeing incomplete caching (from 65% cached up to 100%, depending on the run). I shouldn't have much variance in task time - this is simply a foreach over the data, adding to an accumulator, and the data is completely randomly distributed, so should be pretty even overall. I am seeing GC regressions occasionally - they slow a request from about 2 seconds to about 5 seconds. The 8 minute slowdown seems to be solely attributable to the data locality issue, as far as I can tell. There was some further confusion though in the times I mentioned - the list I gave (3.1 min, 2 seconds, ... 8 min) were not different runs with different cache %s, they were iterations within a single run with 100% caching. -Nathan On Thu, Nov 13, 2014 at 1:45 AM, Aaron Davidson ilike...@gmail.com wrote: Spark's scheduling is pretty simple: it will allocate tasks to open cores on executors, preferring ones where the data is local. It even performs delay scheduling, which means waiting a bit to see if an executor where the data resides locally becomes available. Are your tasks seeing very skewed execution times? If some tasks are taking a very long time and using all the resources on a node, perhaps the other nodes are quickly finishing many tasks, and actually overpopulating their caches. If a particular machine were not overpopulating its cache, and there are no failures, then you should see 100% cached after the first run. It's also strange that running totally uncached takes 3.1 minutes, but running 80-90% cached may take 8 minutes. Does your workload produce nondeterministic variance in task times? Was it a single straggler, or many tasks, that kept the job from finishing? 
It's not too uncommon to see occasional performance regressions while caching due to GC, though 2 seconds to 8 minutes is a bit extreme. On Wed, Nov 12, 2014 at 9:01 PM, Nathan Kronenfeld nkronenf...@oculusinfo.com wrote: Sorry, I think I was not clear in what I meant. I didn't mean it went down within a run, with the same instance. I meant I'd run the whole app, and one time, it would cache 100%, and the next run, it might cache only 83%. Within a run, it doesn't change. On Wed, Nov 12, 2014 at 11:31 PM, Aaron Davidson ilike...@gmail.com wrote: The fact that the caching percentage went down is highly suspicious. It should generally not decrease unless other cached data took its place, or unless executors were dying. Do you know if either of these were the case? On Tue, Nov 11, 2014 at 8:58 AM, Nathan Kronenfeld nkronenf...@oculusinfo.com wrote: Can anyone point me to a good primer on how spark decides where to send what task, how it distributes them, and how it determines data locality? I'm trying a pretty simple task - it's doing a foreach over cached data, accumulating some (relatively complex) values. So I see several inconsistencies I don't understand: (1) If I run it a couple times, as separate applications (i.e., reloading, recaching, etc), I will get different %'s cached each time. I've got about 5x as much memory
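The locality-wait suggestion, as a concrete setting (the value is in milliseconds in this Spark version; 10 seconds is an illustrative choice meant to outlast typical GC pauses, against the 3000 ms default):

    import org.apache.spark.SparkConf

    // Wait up to 10s for a free slot on a data-local executor before
    // falling back to a non-local one.
    val conf = new SparkConf().set("spark.locality.wait", "10000")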
Re: data locality, task distribution
Spark's scheduling is pretty simple: it will allocate tasks to open cores on executors, preferring ones where the data is local. It even performs delay scheduling, which means waiting a bit to see if an executor where the data resides locally becomes available. Are your tasks seeing very skewed execution times? If some tasks are taking a very long time and using all the resources on a node, perhaps the other nodes are quickly finishing many tasks, and actually overpopulating their caches. If a particular machine were not overpopulating its cache, and there are no failures, then you should see 100% cached after the first run. It's also strange that running totally uncached takes 3.1 minutes, but running 80-90% cached may take 8 minutes. Does your workload produce nondeterministic variance in task times? Was it a single straggler, or many tasks, that kept the job from finishing? It's not too uncommon to see occasional performance regressions while caching due to GC, though 2 seconds to 8 minutes is a bit extreme. On Wed, Nov 12, 2014 at 9:01 PM, Nathan Kronenfeld nkronenf...@oculusinfo.com wrote: Sorry, I think I was not clear in what I meant. I didn't mean it went down within a run, with the same instance. I meant I'd run the whole app, and one time, it would cache 100%, and the next run, it might cache only 83%. Within a run, it doesn't change. On Wed, Nov 12, 2014 at 11:31 PM, Aaron Davidson ilike...@gmail.com wrote: The fact that the caching percentage went down is highly suspicious. It should generally not decrease unless other cached data took its place, or unless executors were dying. Do you know if either of these were the case? On Tue, Nov 11, 2014 at 8:58 AM, Nathan Kronenfeld nkronenf...@oculusinfo.com wrote: Can anyone point me to a good primer on how spark decides where to send what task, how it distributes them, and how it determines data locality? I'm trying a pretty simple task - it's doing a foreach over cached data, accumulating some (relatively complex) values. So I see several inconsistencies I don't understand: (1) If I run it a couple times, as separate applications (i.e., reloading, recaching, etc), I will get different %'s cached each time. I've got about 5x as much memory as I need overall, so it isn't running out. But one time, 100% of the data will be cached; the next, 83%, the next, 92%, etc. (2) Also, the data is very unevenly distributed. I've got 400 partitions, and 4 workers (with, I believe, 3x replication), and on my last run, my distribution was 165/139/25/71. Is there any way to get spark to distribute the tasks more evenly? (3) If I run the problem several times in the same execution (to take advantage of caching etc.), I get very inconsistent results. My latest try, I get: - 1st run: 3.1 min - 2nd run: 2 seconds - 3rd run: 8 minutes - 4th run: 2 seconds - 5th run: 2 seconds - 6th run: 6.9 minutes - 7th run: 2 seconds - 8th run: 2 seconds - 9th run: 3.9 minutes - 10th run: 8 seconds I understand the difference for the first run; it was caching that time. Later times, when it manages to work in 2 seconds, it's because all the tasks were PROCESS_LOCAL; when it takes longer, the last 10-20% of the tasks end up with locality level ANY. Why would that change when running the exact same task twice in a row on cached data? Any help or pointers that I could get would be much appreciated. 
Thanks, -Nathan -- Nathan Kronenfeld Senior Visualization Developer Oculus Info Inc 2 Berkeley Street, Suite 600, Toronto, Ontario M5A 4J5 Phone: +1-416-203-3003 x 238 Email: nkronenf...@oculusinfo.com -- Nathan Kronenfeld Senior Visualization Developer Oculus Info Inc 2 Berkeley Street, Suite 600, Toronto, Ontario M5A 4J5 Phone: +1-416-203-3003 x 238 Email: nkronenf...@oculusinfo.com
Re: Bug in Accumulators...
This may be due in part to Scala allocating an anonymous inner class in order to execute the for loop. I would expect that if you change it to a while loop like var i = 0 while (i < 10) { sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x) i += 1 } then the problem may go away. I am not super familiar with the closure cleaner, but I believe that we cannot prune beyond 1 layer of references, so the extra class of nesting may be screwing something up. If this is the case, then I would also expect replacing the accumulator with any other reference to the enclosing scope (such as a broadcast variable) would have the same result. On Fri, Nov 7, 2014 at 12:03 AM, Shixiong Zhu zsxw...@gmail.com wrote: Could you provide all pieces of codes which can reproduce the bug? Here is my test code: import org.apache.spark._ import org.apache.spark.SparkContext._ object SimpleApp { def main(args: Array[String]) { val conf = new SparkConf().setAppName("SimpleApp") val sc = new SparkContext(conf) val accum = sc.accumulator(0) for (i <- 1 to 10) { sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x) } sc.stop() } } It works fine both in client and cluster. Since this is a serialization bug, the outer class does matter. Could you provide it? Is there a SparkContext field in the outer class? Best Regards, Shixiong Zhu 2014-10-28 0:28 GMT+08:00 octavian.ganea octavian.ga...@inf.ethz.ch: I am also using spark 1.1.0 and I ran it on a cluster of nodes (it works if I run it in local mode! ) If I put the accumulator inside the for loop, everything will work fine. I guess the bug is that an accumulator can be applied to JUST one RDD. Still another undocumented 'feature' of Spark that no one from the people who maintain Spark is willing to solve or at least to tell us about ... -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Bug-in-Accumulators-tp17263p17372.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark speed performance
coalesce() is a streaming operation if used without the second parameter; it does not put all the data in RAM. If used with the second parameter (shuffle = true), then it performs a shuffle, but still does not put all the data in RAM. On Sat, Nov 1, 2014 at 12:09 PM, jan.zi...@centrum.cz wrote: Now I am getting to problems using: distData = sc.textFile(sys.argv[2]).coalesce(10) The problem is that it seems that Spark is trying to put all the data into RAM first and then perform coalesce. Do you know if there is something that would do coalesce on the fly, with, for example, a fixed size of the partition? Do you think that something like this is possible? Unfortunately I am not able to find anything like this in the Spark documentation. Thank you in advance for any advices or suggestions. Best regards, Jan __ Thank you very much, lots of very small json files was exactly the speed performance problem; using coalesce makes my Spark program run on a single node only twice slower (even with starting Spark) than the single node Python program, which is acceptable. Jan __ Because of the overhead between the JVM and Python, a single task will be slower than your local Python scripts, but it's very easy to scale to many CPUs. Even with one CPU, it's not common that PySpark would be 100 times slower. You have many small files, each file will be processed by a task, which will have about 100ms overhead (scheduled and executed), but the small file can be processed in your single thread Python script in less than 1ms. You could pack your json files into larger ones, or you could try to merge the small tasks into larger ones by coalesce(N), such as: distData = sc.textFile(sys.argv[2]).coalesce(10) # which will have 10 partitions (tasks) Davies On Sat, Oct 18, 2014 at 12:07 PM, jan.zi...@centrum.cz wrote: Hi, I have a program that I have for single computer (in Python) execution and also implemented the same for Spark. This program basically only reads .json from which it takes one field and saves it back. Using Spark my program runs approximately 100 times slower on 1 master and 1 slave. So I would like to ask where the problem possibly might be? My Spark program looks like: sc = SparkContext(appName="Json data preprocessor") distData = sc.textFile(sys.argv[2]) json_extractor = JsonExtractor(sys.argv[1]) cleanedData = distData.flatMap(json_extractor.extract_json) cleanedData.saveAsTextFile(sys.argv[3]) JsonExtractor only selects the data from the field that is given by sys.argv[1]. My data is basically many small json files, with one json per line. I have tried both reading and writing the data from/to Amazon S3 and the local disk on all the machines. I would like to ask if there is something that I am missing or if Spark is supposed to be so slow in comparison with the local non-parallelized single node program. Thank you in advance for any suggestions or hints. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
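The two modes of coalesce side by side, in Scala (the PySpark calls have the same shape; the path and partition counts are illustrative):

    val data = sc.textFile("hdfs:///many-small-files/*")

    // shuffle = false (default): streams input partitions into 10 output
    // partitions without materializing everything in RAM and without a shuffle.
    val narrow = data.coalesce(10)

    // shuffle = true: redistributes all records evenly, at the cost of a full shuffle.
    val balanced = data.coalesce(10, shuffle = true)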
Re: Shuffle issues in the current master
You may be running into this issue: https://issues.apache.org/jira/browse/SPARK-4019 You could check by having 2000 or fewer reduce partitions. On Wed, Oct 22, 2014 at 1:48 PM, DB Tsai dbt...@dbtsai.com wrote: PS, sorry for spamming the mailing list. Based on my knowledge, both spark.shuffle.spill.compress and spark.shuffle.compress default to true, so in theory, we should not run into this issue if we don't change any setting. Is there any other bug we run into? Thanks. Sincerely, DB Tsai --- My Blog: https://www.dbtsai.com LinkedIn: https://www.linkedin.com/in/dbtsai On Wed, Oct 22, 2014 at 1:37 PM, DB Tsai dbt...@dbtsai.com wrote: Or can it be solved by setting both of the following settings to true for now? spark.shuffle.spill.compress true spark.shuffle.compress true Sincerely, DB Tsai --- My Blog: https://www.dbtsai.com LinkedIn: https://www.linkedin.com/in/dbtsai On Wed, Oct 22, 2014 at 1:34 PM, DB Tsai dbt...@dbtsai.com wrote: It seems that this issue should be addressed by https://github.com/apache/spark/pull/2890 ? Am I right? Sincerely, DB Tsai --- My Blog: https://www.dbtsai.com LinkedIn: https://www.linkedin.com/in/dbtsai On Wed, Oct 22, 2014 at 11:54 AM, DB Tsai dbt...@dbtsai.com wrote: Hi all, With SPARK-3948, the exception in Snappy PARSING_ERROR is gone, but I've another exception now. I've no clue about what's going on; has anyone run into a similar issue? Thanks. This is the configuration I use. spark.rdd.compress true spark.shuffle.consolidateFiles true spark.shuffle.manager SORT spark.akka.frameSize 128 spark.akka.timeout 600 spark.core.connection.ack.wait.timeout 600 spark.core.connection.auth.wait.timeout 300 java.io.ObjectInputStream$PeekInputStream.readFully(ObjectInputStream.java:2325) java.io.ObjectInputStream$BlockDataInputStream.readShort(ObjectInputStream.java:2794) java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:801) java.io.ObjectInputStream.<init>(ObjectInputStream.java:299) org.apache.spark.serializer.JavaDeserializationStream$$anon$1.<init>(JavaSerializer.scala:57) org.apache.spark.serializer.JavaDeserializationStream.<init>(JavaSerializer.scala:57) org.apache.spark.serializer.JavaSerializerInstance.deserializeStream(JavaSerializer.scala:95) org.apache.spark.storage.BlockManager.getLocalShuffleFromDisk(BlockManager.scala:351) org.apache.spark.storage.ShuffleBlockFetcherIterator$$anonfun$fetchLocalBlocks$1$$anonfun$apply$4.apply(ShuffleBlockFetcherIterator.scala:196) org.apache.spark.storage.ShuffleBlockFetcherIterator$$anonfun$fetchLocalBlocks$1$$anonfun$apply$4.apply(ShuffleBlockFetcherIterator.scala:196) org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:243) org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:52) scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30) org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:89) org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:44) org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) org.apache.spark.rdd.RDD.iterator(RDD.scala:229) org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) org.apache.spark.rdd.RDD.iterator(RDD.scala:229) 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) org.apache.spark.rdd.RDD.iterator(RDD.scala:229) org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) org.apache.spark.scheduler.Task.run(Task.scala:56) org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:181) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) java.lang.Thread.run(Thread.java:744) Sincerely, DB Tsai --- My Blog: https://www.dbtsai.com LinkedIn: https://www.linkedin.com/in/dbtsai - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail:
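To test the SPARK-4019 theory, the reduce-side partition count can be pinned at or below the 2000 threshold explicitly (a sketch; pairs and the reduce function are illustrative, and any shuffle operator accepts the same argument):

    // 2000 or fewer reduce partitions avoids the code path implicated in SPARK-4019.
    val counts = pairs.reduceByKey(_ + _, 2000)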
Re: Getting spark to use more than 4 cores on Amazon EC2
Another wild guess: if your data is stored in S3, you might be running into an issue where the default jets3t properties limit the number of parallel S3 connections to 4. Consider increasing the max-thread-counts from here: http://www.jets3t.org/toolkit/configuration.html. On Tue, Oct 21, 2014 at 10:39 AM, Andy Davidson a...@santacruzintegration.com wrote: On a related note, how are you submitting your job? I have a simple streaming proof of concept and noticed that everything runs on my master. I wonder if I do not have enough load for spark to push tasks to the slaves. Thanks Andy From: Daniel Mahler dmah...@gmail.com Date: Monday, October 20, 2014 at 5:22 PM To: Nicholas Chammas nicholas.cham...@gmail.com Cc: user user@spark.apache.org Subject: Re: Getting spark to use more than 4 cores on Amazon EC2 I am using globs though raw = sc.textFile("/path/to/dir/*/*") and I have tons of files so 1 file per partition should not be a problem. On Mon, Oct 20, 2014 at 7:14 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: The biggest danger with gzipped files is this: raw = sc.textFile("/path/to/file.gz", 8) raw.getNumPartitions() # returns 1 You think you’re telling Spark to parallelize the reads on the input, but Spark cannot parallelize reads against gzipped files. So 1 gzipped file gets assigned to 1 partition. It might be a nice user hint if Spark warned when parallelism is disabled by the input format. Nick On Mon, Oct 20, 2014 at 6:53 PM, Daniel Mahler dmah...@gmail.com wrote: Hi Nicholas, Gzipping is an impressive guess! Yes, they are. My data sets are too large to make repartitioning viable, but I could try it on a subset. I generally have many more partitions than cores. This was happening before I started setting those configs. thanks Daniel On Mon, Oct 20, 2014 at 5:37 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: Are you dealing with gzipped files by any chance? Does explicitly repartitioning your RDD to match the number of cores in your cluster help at all? How about if you don't specify the configs you listed and just go with defaults all around? On Mon, Oct 20, 2014 at 5:22 PM, Daniel Mahler dmah...@gmail.com wrote: I launch the cluster using vanilla spark-ec2 scripts. I just specify the number of slaves and instance type On Mon, Oct 20, 2014 at 4:07 PM, Daniel Mahler dmah...@gmail.com wrote: I usually run interactively from the spark-shell. My data definitely has more than enough partitions to keep all the workers busy. When I first launch the cluster I first do: cat <<EOF >> ~/spark/conf/spark-defaults.conf spark.serializer org.apache.spark.serializer.KryoSerializer spark.rdd.compress true spark.shuffle.consolidateFiles true spark.akka.frameSize 20 EOF copy-dir /root/spark/conf spark/sbin/stop-all.sh sleep 5 spark/sbin/start-all.sh before starting the spark-shell or running any jobs. On Mon, Oct 20, 2014 at 2:57 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: Perhaps your RDD is not partitioned enough to utilize all the cores in your system. Could you post a simple code snippet and explain what kind of parallelism you are seeing for it? And can you report on how many partitions your RDDs have? On Mon, Oct 20, 2014 at 3:53 PM, Daniel Mahler dmah...@gmail.com wrote: I am launching EC2 clusters using the spark-ec2 scripts. My understanding is that this configures spark to use the available resources. I can see that spark will use the available memory on larger instance types. 
However I have never seen spark running at more than 400% (using 100% on 4 cores) on machines with many more cores. Am I misunderstanding the docs? Is it just that high end ec2 instances get I/O starved when running spark? It would be strange if that consistently produced a 400% hard limit though. thanks Daniel
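For concreteness, here is what the JetS3t suggestion above might look like. This is a sketch only: the exact property names and defaults depend on your JetS3t version, so verify them against the configuration page linked above. The file goes on the classpath, e.g. in Spark's conf directory.

# jets3t.properties (sketch; verify property names for your JetS3t version)
# Raise the cap on simultaneous S3 connections so more than 4 reads can proceed in parallel
s3service.max-thread-count=20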
Re: input split size
The minPartitions argument of textFile/hadoopFile cannot decrease the number of splits past the physical number of blocks/files. So if you have 3 HDFS blocks, asking for 2 minPartitions will still give you 3 partitions (hence the "min"). It can, however, convert a file with fewer HDFS blocks into more (so you could ask for and get 4 partitions), assuming the blocks are splittable. HDFS blocks are usually splittable, but if the file is compressed with something like gzip, they would not be. If you wish to combine the splits of a larger file, you can use RDD#coalesce. With shuffle=false, this will simply concatenate partitions, but it does not provide any ordering guarantees (it uses an algorithm which attempts to coalesce co-located partitions, to maintain locality information). coalesce() with shuffle=true causes all of the elements to be shuffled around randomly into new partitions, which is an expensive operation but guarantees uniformity of data distribution. On Sat, Oct 18, 2014 at 10:47 AM, Mayur Rustagi mayur.rust...@gmail.com wrote: Does it retain the order if it's pulling from the HDFS blocks? Meaning, if file1 = a, b, c partitions in order, and I convert to a 2-partition read, will it map to ab, c or a, bc, or could it also be a, cb? Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Sat, Oct 18, 2014 at 9:09 AM, Ilya Ganelin ilgan...@gmail.com wrote: Also - if you're doing a text file read you can pass the number of resulting partitions as the second argument. On Oct 17, 2014 9:05 PM, Larry Liu larryli...@gmail.com wrote: Thanks, Andrew. What about reading out of local? On Fri, Oct 17, 2014 at 5:38 PM, Andrew Ash and...@andrewash.com wrote: When reading out of HDFS it's the HDFS block size. On Fri, Oct 17, 2014 at 5:27 PM, Larry Liu larryli...@gmail.com wrote: What is the default input split size? How to change it?
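A short Scala illustration of the points above (the file path and partition counts are arbitrary examples):

// minPartitions can raise, but never lower, the split count
val rdd = sc.textFile("hdfs:///data/file.txt", 4)
println(rdd.partitions.size) // at least the number of HDFS blocks

// Merge down to 2 partitions by concatenating co-located partitions;
// no shuffle, but no ordering guarantee either
val merged = rdd.coalesce(2, shuffle = false)

// Shuffle all elements randomly into 2 partitions; expensive, but uniform
val shuffled = rdd.coalesce(2, shuffle = true)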
Re: How to change the values in Array of Bytes
More of a Scala question than a Spark one, but apply here can be written with just parentheses, like this:

val array = Array.fill[Byte](10)(0)
if (array(index) == 0) { array(index) = 1 }

The second instance of array(index) = 1 is actually not calling apply, but update. It's a Scala-ism that's usually invisible. The above is equivalent to:

if (array.apply(index) == 0) { array.update(index, 1) }

On Sat, Sep 6, 2014 at 2:09 AM, Deep Pradhan pradhandeep1...@gmail.com wrote: Hi, I have an array of bytes and I have filled the array with 0 in all the positions. var Array = Array.fill[Byte](10)(0) Now, if certain conditions are satisfied, I want to change some elements of the array to 1 instead of 0. If I run if (Array.apply(index)==0) Array.apply(index) = 1 it returns an error. But if I assign Array.apply(index) to a variable and do the same thing then it works. I do not want to assign this to variables because if I do, I would be creating a lot of variables. Can anyone tell me a method to do this? Thank You
Re: error: type mismatch while Union
Are you doing this from the spark-shell? You're probably running into https://issues.apache.org/jira/browse/SPARK-1199 which should be fixed in 1.1. On Sat, Sep 6, 2014 at 3:03 AM, Dhimant dhimant84.jays...@gmail.com wrote: I am using Spark version 1.0.2 -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/error-type-mismatch-while-Union-tp13547p13618.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: question on replicate() in blockManager.scala
Looks like that's BlockManagerWorker.syncPutBlock(), which is in an if check, perhaps obscuring its existence. On Fri, Sep 5, 2014 at 2:19 AM, rapelly kartheek kartheek.m...@gmail.com wrote: Hi,

var cachedPeers: Seq[BlockManagerId] = null
private def replicate(blockId: String, data: ByteBuffer, level: StorageLevel) {
  val tLevel = StorageLevel(level.useDisk, level.useMemory, level.deserialized, 1)
  if (cachedPeers == null) {
    cachedPeers = master.getPeers(blockManagerId, level.replication - 1)
  }
  for (peer: BlockManagerId <- cachedPeers) {
    val start = System.nanoTime
    data.rewind()
    logDebug("Try to replicate BlockId " + blockId + " once; The size of the data is " +
      data.limit() + " Bytes. To node: " + peer)
    if (!BlockManagerWorker.syncPutBlock(PutBlock(blockId, data, tLevel),
        new ConnectionManagerId(peer.host, peer.port))) {
      logError("Failed to call syncPutBlock to " + peer)
    }
    logDebug("Replicated BlockId " + blockId + " once used " +
      (System.nanoTime - start) / 1e6 + " s; The size of the data is " + data.limit() + " bytes.")
  }
}

I get the flow of this code. But I don't find any method being called for actually writing the data into the set of peers chosen for replication. Where exactly is the replication happening? Thank you!! -Karthik
Re: Getting the type of an RDD in spark AND pyspark
Pretty easy to do in Scala: rdd.elementClassTag.runtimeClass You can access this method from Python as well by using the internal _jrdd. It would look something like this (warning, I have not tested it): rdd._jrdd.classTag().runtimeClass() (The method name is classTag for JavaRDDLike, and elementClassTag for Scala's RDD.) On Thu, Sep 4, 2014 at 1:32 PM, esamanas evan.sama...@gmail.com wrote: Hi, I'm new to spark and scala, so apologies if this is obvious. Every RDD appears to be typed, which I can see by seeing the output in the spark-shell when I execute 'take':

scala> val t = sc.parallelize(Array(1,2,3))
t: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at <console>:12
scala> t.take(3)
res4: Array[Int] = Array(1, 2, 3)
scala> val u = sc.parallelize(Array(1,Array(2,2,2,2,2),3))
u: org.apache.spark.rdd.RDD[Any] = ParallelCollectionRDD[3] at parallelize at <console>:12
scala> u.take(3)
res5: Array[Any] = Array(1, Array(2, 2, 2, 2, 2), 3)

The Array type stays the same even if only one type is returned:

scala> u.take(1)
res6: Array[Any] = Array(1)

Is there some way to just get the name of the type of the entire RDD from some function call? Also, I would really like this same functionality in pyspark, so I'm wondering if that exists on that side, since clearly the underlying RDD is typed (I'd be fine with either the Scala or Python type name). Thank you, Evan -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Getting-the-type-of-an-RDD-in-spark-AND-pyspark-tp13498.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Stage failure in BlockManager due to FileNotFoundException on long-running streaming job
This is likely due to a bug in shuffle file consolidation (which you have enabled) which was hopefully fixed in 1.1 with this patch: https://github.com/apache/spark/commit/78f2af582286b81e6dc9fa9d455ed2b369d933bd Until 1.0.3 or 1.1 are released, the simplest solution is to disable spark.shuffle.consolidateFiles. (You could also try backporting the fixes yourself if you really need consolidated files.) On Wed, Aug 20, 2014 at 9:28 AM, Silvio Fiorito silvio.fior...@granturing.com wrote: This is a long running Spark Streaming job running in YARN, Spark v1.0.2 on CDH5. The jobs will run for about 34-37 hours then die due to this FileNotFoundException. There’s very little CPU or RAM usage, I’m running 2 x cores, 2 x executors, 4g memory, YARN cluster mode. Here’s the stack trace that I pulled from the History server: Job aborted due to stage failure: Task 34331.0:1 failed 4 times, most recent failure: Exception failure in TID 902905 on host host05: java.io.FileNotFoundException: /data02/yarn/nm/usercache/sfiorito/appcache/application_1402494159106_0524/spark-local-20140818181035-079a/29/merged_shuffle_9809_1_0 (No such file or directory) java.io.RandomAccessFile.open(Native Method) java.io.RandomAccessFile.init(RandomAccessFile.java:241) org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:98) org.apache.spark.storage.DiskStore.getValues(DiskStore.scala:124) org.apache.spark.storage.BlockManager.getLocalFromDisk(BlockManager.scala:332) org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator$$anonfun$getLocalBlocks$1.apply(BlockFetcherIterator.scala:204) org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator$$anonfun$getLocalBlocks$1.apply(BlockFetcherIterator.scala:203) scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator.getLocalBlocks(BlockFetcherIterator.scala:203) org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator.initialize(BlockFetcherIterator.scala:234) org.apache.spark.storage.BlockManager.getMultiple(BlockManager.scala:537) org.apache.spark.BlockStoreShuffleFetcher.fetch(BlockStoreShuffleFetcher.scala:76) org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:133) org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:123) scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:123) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) org.apache.spark.rdd.RDD.iterator(RDD.scala:229) org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) org.apache.spark.rdd.RDD.iterator(RDD.scala:229) org.apache.spark.rdd.FlatMappedValuesRDD.compute(FlatMappedValuesRDD.scala:31) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) org.apache.spark.rdd.RDD.iterator(RDD.scala:229) org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) org.apache.spark.rdd.RDD.iterator(RDD.scala:229) org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) 
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) org.apache.spark.rdd.RDD.iterator(RDD.scala:229) org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) org.apache.spark.rdd.RDD.iterator(RDD.scala:229) org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) org.apache.spark.rdd.RDD.iterator(RDD.scala:229) org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111) org.apache.spark.scheduler.Task.run(Task.scala:51) org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) java.lang.Thread.run(Thread.java:744) Driver stacktrace:
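For reference, the workaround suggested above is a single line in spark-defaults.conf:

# Disable shuffle file consolidation until running a release with the fix
spark.shuffle.consolidateFiles false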
Re: s3:// sequence file startup time
The driver must initially compute the partitions and their preferred locations for each part of the file, which results in a serial getFileBlockLocations() on each part. However, I would expect this to take several seconds, not minutes, to perform on 1000 parts. Is your driver inside or outside of AWS? There is an order of magnitude difference in the latency of S3 requests if you're running outside of AWS. We have also experienced an excessive slowdown in the metadata lookups using Hadoop 2 versus Hadoop 1, likely due to the differing jets3t library versions. If you're using Hadoop 2, you might try downgrading to Hadoop 1.2.1 and seeing if the startup time decreases. On Sat, Aug 16, 2014 at 6:46 PM, kmatzen kmat...@gmail.com wrote: I have some RDD's stored as s3://-backed sequence files sharded into 1000 parts. The startup time is pretty long (~10's of minutes). It's communicating with S3, but I don't know what it's doing. Is it just fetching the metadata from S3 for each part? Is there a way to pipeline this with the computation? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/s3-sequence-file-startup-time-tp12242.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Spark: why need a masterLock when sending heartbeat to master
Yes, good point, I believe the masterLock is now unnecessary altogether. The reason for its initial existence was that changeMaster() originally could be called out-of-band of the actor, and so we needed to make sure the master reference did not change out from under us. Now it appears that all master mutation and accesses are within the actor (we only call tryRegisterAllMasters() from a different thread, which does not use the master field). If you like, feel free to submit a PR to remove the masterLock. On Sun, Aug 17, 2014 at 8:58 AM, Victor Sheng victorsheng...@gmail.com wrote: I don't understand why the worker needs a master lock when sending a heartbeat. Is it caused by master HA? Who can explain this in detail? Thanks~ Please refer: http://stackoverflow.com/questions/25173219/why-does-the-spark-worker-actor-use-a-masterlock

case SendHeartbeat =>
  masterLock.synchronized {
    if (connected) { master ! Heartbeat(workerId) }
  }

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-why-need-a-masterLock-when-sending-heartbeat-to-master-tp12256.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Iterator over RDD in PySpark
rdd.toLocalIterator will do almost what you want, but requires that each individual partition fits in memory (rather than each individual line). Hopefully that's sufficient, though. On Fri, Aug 1, 2014 at 1:38 AM, Andrei faithlessfri...@gmail.com wrote: Is there a way to get iterator from RDD? Something like rdd.collect(), but returning lazy sequence and not single array. Context: I need to GZip processed data to upload it to Amazon S3. Since archive should be a single file, I want to iterate over RDD, writing each line to a local .gz file. File is small enough to fit local disk, but still large enough not to fit into memory.
Re: Iterator over RDD in PySpark
Ah, that's unfortunate, that definitely should be added. Using a pyspark-internal method, you could try something like

javaIterator = rdd._jrdd.toLocalIterator()
it = rdd._collect_iterator_through_file(javaIterator)

On Fri, Aug 1, 2014 at 3:04 PM, Andrei faithlessfri...@gmail.com wrote: Thanks, Aaron, it should be fine with partitions (I can repartition it anyway, right?). But rdd.toLocalIterator is a purely Java/Scala method. Is there a Python interface to it? I can get a Java iterator through rdd._jrdd, but it isn't converted to a Python iterator automatically. E.g.: rdd = sc.parallelize([1, 2, 3, 4, 5]) it = rdd._jrdd.toLocalIterator() next(it) 14/08/02 01:02:32 INFO SparkContext: Starting job: apply at Iterator.scala:371 ... 14/08/02 01:02:32 INFO SparkContext: Job finished: apply at Iterator.scala:371, took 0.02064317 s bytearray(b'\x80\x02K\x01.') I understand that the returned byte array somehow corresponds to actual data, but how can I get it? On Fri, Aug 1, 2014 at 8:49 PM, Aaron Davidson ilike...@gmail.com wrote: rdd.toLocalIterator will do almost what you want, but requires that each individual partition fits in memory (rather than each individual line). Hopefully that's sufficient, though. On Fri, Aug 1, 2014 at 1:38 AM, Andrei faithlessfri...@gmail.com wrote: Is there a way to get iterator from RDD? Something like rdd.collect(), but returning lazy sequence and not single array. Context: I need to GZip processed data to upload it to Amazon S3. Since archive should be a single file, I want to iterate over RDD, writing each line to a local .gz file. File is small enough to fit local disk, but still large enough not to fit into memory.
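On the Scala side, the same streaming-to-gzip pattern is direct. A minimal sketch, assuming an RDD[String] named rdd and an example output path; as noted above, each partition must fit in driver memory:

import java.io.FileOutputStream
import java.util.zip.GZIPOutputStream

val out = new GZIPOutputStream(new FileOutputStream("/tmp/output.gz"))
rdd.toLocalIterator.foreach { line =>
  out.write((line + "\n").getBytes("UTF-8")) // pulls one partition at a time to the driver
}
out.close()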
Re: Spilling in-memory... messages in log even with MEMORY_ONLY
I see. There should not be a significant algorithmic difference between those two cases, as far as I can think, but there is a good bit of local-mode-only logic in Spark. One typical problem we see on large-heap, many-core JVMs, though, is much more time spent in garbage collection. I'm not sure how oprofile gathers its statistics, but it's possible the stop-the-world pauses just appear as pausing inside regular methods. You could see if this is happening by adding -XX:+PrintGCDetails to spark.executor.extraJavaOptions (in spark-defaults.conf) and --driver-java-options (as a command-line argument), and then examining the stdout logs. On Sun, Jul 27, 2014 at 10:29 AM, lokesh.gidra lokesh.gi...@gmail.com wrote: I am comparing the total time spent in finishing the job. And what I am comparing, to be precise, is on a 48-core machine. I am comparing the performance of local[48] vs. standalone mode with 8 nodes of 6 cores each (totalling 48 cores) on localhost. In this comparison, the standalone mode outperforms local[48] substantially. When I did some troubleshooting using oprofile, I found that local[48] was spending much more time in writeObject0 as compared to standalone mode. I am running the PageRank example provided in the package. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spilling-in-memory-messages-in-log-even-with-MEMORY-ONLY-tp10723p10743.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
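A sketch of the two places to put the flags mentioned above (the exact GC-logging flags are examples; any standard HotSpot options work):

# spark-defaults.conf: GC details for executors
spark.executor.extraJavaOptions -XX:+PrintGCDetails -XX:+PrintGCTimeStamps

# command line: GC details for the driver / local-mode JVM
spark-shell --driver-java-options "-XX:+PrintGCDetails -XX:+PrintGCTimeStamps"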
Re: Configuring Spark Memory
Whoops, I was mistaken in my original post last year. By default, there is one executor per node per Spark Context, as you said. spark.executor.memory is the amount of memory that the application requests for each of its executors. SPARK_WORKER_MEMORY is the amount of memory a Spark Worker is willing to allocate in executors. So if you were to set SPARK_WORKER_MEMORY to 8g everywhere on your cluster, and spark.executor.memory to 4g, you would be able to run 2 simultaneous Spark Contexts that each get 4g per node. Similarly, if spark.executor.memory were 8g, you could only run 1 Spark Context at a time on the cluster, but it would get all the cluster's memory. On Thu, Jul 24, 2014 at 7:25 AM, Martin Goodson mar...@skimlinks.com wrote: Thank you Nishkam, I have read your code. So, for the sake of my understanding, it seems that for each spark context there is one executor per node? Can anyone confirm this? -- Martin Goodson | VP Data Science (0)20 3397 1240 On Thu, Jul 24, 2014 at 6:12 AM, Nishkam Ravi nr...@cloudera.com wrote: See if this helps: https://github.com/nishkamravi2/SparkAutoConfig/ It's a very simple tool for auto-configuring default parameters in Spark. Takes as input high-level parameters (like number of nodes, cores per node, memory per node, etc) and spits out default configuration, user advice and command line. Compile (javac SparkConfigure.java) and run (java SparkConfigure). Also cc'ing dev in case others are interested in helping evolve this over time (by refining the heuristics and adding more parameters). On Wed, Jul 23, 2014 at 8:31 AM, Martin Goodson mar...@skimlinks.com wrote: Thanks Andrew, So if there is only one SparkContext there is only one executor per machine? This seems to contradict Aaron's message from the link above: If each machine has 16 GB of RAM and 4 cores, for example, you might set spark.executor.memory between 2 and 3 GB, totaling 8-12 GB used by Spark.) Am I reading this incorrectly? Anyway our configuration is 21 machines (one master and 20 slaves) each with 60Gb. We would like to use 4 cores per machine. This is pyspark so we want to leave say 16Gb on each machine for python processes. Thanks again for the advice! -- Martin Goodson | VP Data Science (0)20 3397 1240 On Wed, Jul 23, 2014 at 4:19 PM, Andrew Ash and...@andrewash.com wrote: Hi Martin, In standalone mode, each SparkContext you initialize gets its own set of executors across the cluster. So for example if you have two shells open, they'll each get two JVMs on each worker machine in the cluster. As far as the other docs, you can configure the total number of cores requested for the SparkContext, the amount of memory for the executor JVM on each machine, the amount of memory for the Master/Worker daemons (little needed since work is done in executors), and several other settings. Which of those are you interested in? What spec hardware do you have and how do you want to configure it? Andrew On Wed, Jul 23, 2014 at 6:10 AM, Martin Goodson mar...@skimlinks.com wrote: We are having difficulties configuring Spark, partly because we still don't understand some key concepts. For instance, how many executors are there per machine in standalone mode?
This is after having closely read the documentation several times: http://spark.apache.org/docs/latest/configuration.html http://spark.apache.org/docs/latest/spark-standalone.html http://spark.apache.org/docs/latest/tuning.html http://spark.apache.org/docs/latest/cluster-overview.html The cluster overview has some information here about executors but is ambiguous about whether there are single executors or multiple executors on each machine. This message from Aaron Davidson implies that the executor memory should be set to total available memory on the machine divided by the number of cores: http://mail-archives.apache.org/mod_mbox/spark-user/201312.mbox/%3CCANGvG8o5K1SxgnFMT_9DK=vj_plbve6zh_dn5sjwpznpbcp...@mail.gmail.com%3E But other messages imply that the executor memory should be set to the *total* available memory of each machine. We would very much appreciate some clarity on this and the myriad of other memory settings available (daemon memory, worker memory etc). Perhaps a worked example could be added to the docs? I would be happy to provide some text as soon as someone can enlighten me on the technicalities! Thank you -- Martin Goodson | VP Data Science (0)20 3397 1240
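To make the answer above concrete, a minimal sketch using the same 8g/4g example values:

# spark-env.sh on each worker: total memory the Worker may hand out to executors
export SPARK_WORKER_MEMORY=8g

# spark-defaults.conf, per application: memory requested for each executor
spark.executor.memory 4g

With these values, two applications at 4g each can run concurrently per node; setting spark.executor.memory to 8g would let one application claim the whole worker.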
Re: What if there are large, read-only variables shared by all map functions?
In particular, take a look at the TorrentBroadcast, which should be much more efficient than HttpBroadcast (which was the default in 1.0) for large files. If you find that TorrentBroadcast doesn't work for you, then another way to solve this problem is to place the data on all nodes' local disks, and amortize the cost of the data loading by using RDD#mapPartitions instead of #map, which allows you to do the loading once for a large set of elements. You could refine this model further by keeping some sort of (perhaps static) state on your Executors, like object LookupTable { def getOrLoadTable(): LookupTable } and then using this method in your mapPartitions method. This would ensure the table is only loaded once on each Executor, and could also be used to ensure the data remains between jobs. You should be careful, though, when using so much memory outside of Spark's knowledge -- you may need to tune the Spark memory options if you run into OutOfMemoryErrors. On Wed, Jul 23, 2014 at 8:39 PM, Mayur Rustagi mayur.rust...@gmail.com wrote: Have a look at broadcast variables. On Tuesday, July 22, 2014, Parthus peng.wei@gmail.com wrote: Hi there, I was wondering if anybody could help me find an efficient way to make a MapReduce program like this: 1) For each map function, it need access some huge files, which is around 6GB 2) These files are READ-ONLY. Actually they are like some huge look-up table, which will not change during 2~3 years. I tried two ways to make the program work, but neither of them is efficient: 1) The first approach I tried is to let each map function load those files independently, like this: map (...) { load(files); DoMapTask(...)} 2) The second approach I tried is to load the files before RDD.map(...) and broadcast the files. However, because the files are too large, the broadcasting overhead is 30min ~ 1 hour. Could anybody help me find an efficient way to solve it? Thanks very much. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/What-if-there-are-large-read-only-variables-shared-by-all-map-functions-tp10435.html Sent from the Apache Spark User List mailing list archive at Nabble.com. -- Sent from Gmail Mobile
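A minimal sketch of the executor-local singleton described above. Everything here is illustrative: the table type, the tab-separated file format, and the local path are assumptions, and rdd is taken to be an RDD[String] of lookup keys.

object LookupTable {
  @volatile private var table: Map[String, Double] = null

  // Loads at most once per executor JVM; later calls return the cached table.
  def getOrLoadTable(path: String): Map[String, Double] = {
    if (table == null) synchronized {
      if (table == null) table = loadTable(path)
    }
    table
  }

  private def loadTable(path: String): Map[String, Double] =
    scala.io.Source.fromFile(path).getLines()
      .map { line => val Array(k, v) = line.split("\t"); (k, v.toDouble) }
      .toMap
}

rdd.mapPartitions { iter =>
  // One lookup of the (already loaded) table per partition, not per element
  val table = LookupTable.getOrLoadTable("/local/disk/table.tsv")
  iter.map(key => table.getOrElse(key, 0.0))
}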
Re: Spark 1.0.1 SQL on 160 G parquet file (snappy compressed, made by cloudera impala), 23 core and 60G mem / node, yarn-client mode, always failed
What's the exception you're seeing? Is it an OOM? On Mon, Jul 21, 2014 at 11:20 AM, chutium teng@gmail.com wrote: Hi, unfortunately it is not so straightforward. xxx_parquet.db is a folder of a managed database created by hive/impala, so every sub-element in it is a table in hive/impala; they are folders in HDFS, each table has a different schema, and in its folder there are one or more parquet files. That means xx001_suffix and xx002_suffix are folders, and there are some parquet files like xx001_suffix/parquet_file1_with_schema1 xx002_suffix/parquet_file1_with_schema2 xx002_suffix/parquet_file2_with_schema2 it seems only union can do this job~ Nonetheless, thank you very much, maybe the only reason is that Spark is eating up too much memory... -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-0-1-SQL-on-160-G-parquet-file-snappy-compressed-made-by-cloudera-impala-23-core-and-60G-mem-d-tp10254p10335.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: which kind of BlockId should I use?
Hm, this is not a public API, but you should theoretically be able to use TestBlockId if you like. Internally, we just use the BlockId's natural hashing and equality to do lookups and puts, so it should work fine. However, since it is in no way public API, it may change even in maintenance releases. On Sun, Jul 20, 2014 at 10:25 PM, william k...@qq.com wrote: When spark was 0.7.3, I used SparkEnv.get.blockManager.getLocal("model") and SparkEnv.get.blockManager.put("model", buf, StorageLevel.MEMORY_ONLY, false) to cache the model object. When porting to Spark 1.0.1, I found that the SparkEnv.get.blockManager.getLocal and SparkEnv.get.blockManager.put APIs changed to use BlockId instead of String. And BlockId has 5 concrete classes, which one should I use? None of them have a string input parameter. Thx
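A sketch of the TestBlockId route against the 1.0.x internals. Since this is non-public API, the method signatures below (putSingle, getLocal) may differ between releases; model stands in for whatever object is being cached:

import org.apache.spark.SparkEnv
import org.apache.spark.storage.{StorageLevel, TestBlockId}

val blockId = TestBlockId("model")
// Cache the object in this executor's block manager without informing the master
SparkEnv.get.blockManager.putSingle(blockId, model, StorageLevel.MEMORY_ONLY, tellMaster = false)
// Read it back locally; returns an Option
val cached = SparkEnv.get.blockManager.getLocal(blockId)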
Re: Large Task Size?
Ah, I didn't realize this was non-MLLib code. Do you mean to be sending stochasticLossHistory in the closure as well? On Sun, Jul 13, 2014 at 1:05 AM, Kyle Ellrott kellr...@soe.ucsc.edu wrote: It uses the standard SquaredL2Updater, and I also tried to broadcast it as well. The input is an RDD created by taking the union of several inputs that have all been run against MLUtils.kFold to produce even more RDDs. If I run with 10 different inputs, each with 10 kFolds. I'm pretty certain that all of the input RDDs have clean closures. But I'm curious, is there a high overhead for running union? Could that create larger task sizes? Kyle On Sat, Jul 12, 2014 at 7:50 PM, Aaron Davidson ilike...@gmail.com wrote: I also did a quick glance through the code and couldn't find anything worrying that should be included in the task closures. The only possibly unsanitary part is the Updater you pass in -- what is your Updater and is it possible it's dragging in a significant amount of extra state? On Sat, Jul 12, 2014 at 7:27 PM, Kyle Ellrott kellr...@soe.ucsc.edu wrote: I'm working on a patch to MLLib that allows for multiplexing several different model optimizations using the same RDD ( SPARK-2372: https://issues.apache.org/jira/browse/SPARK-2372 ) In testing larger datasets, I've started to see some memory errors ( java.lang.OutOfMemoryError and exceeds max allowed: spark.akka.frameSize errors ). My main clue is that Spark will start logging warnings on smaller systems like: 14/07/12 19:14:46 WARN scheduler.TaskSetManager: Stage 2862 contains a task of very large size (10119 KB). The maximum recommended task size is 100 KB. Looking up stage '2862' in this case leads to a 'sample at GroupedGradientDescent.scala:156' call. That code can be seen at https://github.com/kellrott/spark/blob/mllib-grouped/mllib/src/main/scala/org/apache/spark/mllib/grouped/GroupedGradientDescent.scala#L156 I've looked over the code, I'm broadcasting the larger variables, and between the sampler and the combineByKey, I wouldn't think there's much data being moved over the network, much less a 10MB chunk. Any ideas of what this might be a symptom of? Kyle
Re: Confused by groupByKey() and the default partitioner
Ah -- I should have been more clear, list concatenation isn't going to be any faster. In many cases I've seen people use groupByKey() when they are really trying to do some sort of aggregation, and thus constructing this concatenated list is more expensive than they need. On Sun, Jul 13, 2014 at 9:13 AM, Guanhua Yan gh...@lanl.gov wrote: Thanks, Aaron. I replaced groupByKey with reduceByKey along with some list concatenation operations, and found that the performance became even worse. So groupByKey is not that bad in my code. Best regards, - Guanhua From: Aaron Davidson ilike...@gmail.com Reply-To: user@spark.apache.org Date: Sat, 12 Jul 2014 16:32:22 -0700 To: user@spark.apache.org Subject: Re: Confused by groupByKey() and the default partitioner Yes, groupByKey() does partition by the hash of the key unless you specify a custom Partitioner. (1) If you were to use groupByKey() when the data was already partitioned correctly, the data would indeed not be shuffled. Here is the associated code, you'll see that it simply checks that the Partitioner the groupBy() is looking for is equal to the Partitioner of the pre-existing RDD: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L89 By the way, I should warn you that groupByKey() is not a recommended operation if you can avoid it, as it has non-obvious performance issues when running with serious data. On Sat, Jul 12, 2014 at 12:20 PM, Guanhua Yan gh...@lanl.gov wrote: Hi: I have trouble understanding the default partitioner (hash) in Spark. Suppose that an RDD with two partitions is created as follows: x = sc.parallelize([("a", 1), ("b", 4), ("a", 10), ("c", 7)], 2) Does spark partition x based on the hash of the key (e.g., "a", "b", "c") by default? (1) Assuming this is correct, if I further use the groupByKey primitive, x.groupByKey(), all the records sharing the same key should be located in the same partition. Then it's not necessary to shuffle the data records around, as all the grouping operations can be done locally. (2) If it's not true, how could I specify a partitioner simply based on the hashing of the key (in Python)? Thank you, - Guanhua
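The aggregation point above in two lines of Scala:

val pairs = sc.parallelize(Seq(("a", 1), ("b", 4), ("a", 10)))

// groupByKey materializes every value for a key before you aggregate
val sums1 = pairs.groupByKey().mapValues(_.sum)

// reduceByKey combines map-side first, shuffling only partial sums
val sums2 = pairs.reduceByKey(_ + _)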
Re: KMeans for large training data
The "netlib.BLAS: Failed to load implementation" warning only means that the BLAS implementation may be slower than using a native one. The reason why it only shows up at the end is that the library is only used for the finalization step of the KMeans algorithm, so your job should've been wrapping up at this point. I am not familiar with the algorithm beyond that, so I'm not sure if for some reason we're trying to collect too much data back to the driver here. SPARK_DRIVER_MEMORY can increase the driver memory, by the way (or the --driver-memory option when using spark-submit). On Sat, Jul 12, 2014 at 2:38 AM, durin m...@simon-schaefer.net wrote: Your latest response doesn't show up here yet, I only got the mail. I'll still answer here in the hope that it appears later: Which memory setting do you mean? I can go up with spark.executor.memory a bit, it's currently set to 12G. But that's already way more than the whole SchemaRDD of Vectors that I currently use for training, which shouldn't be more than a few hundred M. I suppose you rather mean something comparable to SHARK_MASTER_MEM in Shark. I can't find the equivalent for Spark in the documentation, though. And if it helps, I can summarize the whole code that I currently use. It's nothing really fancy at the moment, I'm just trying to classify Strings that each contain a few words (each word is handled as an atomic item). -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/KMeans-for-large-training-data-tp9407p9509.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
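The two ways to raise driver memory mentioned above (the 8g value and the class/jar names are examples only):

# via the environment
export SPARK_DRIVER_MEMORY=8g

# or via spark-submit
spark-submit --driver-memory 8g --class com.example.KMeansJob app.jar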
Re: Convert from RDD[Object] to RDD[Array[Object]]
If you don't really care about the batchedDegree, but rather just want to do operations over some set of elements rather than one at a time, then just use mapPartitions(). Otherwise, if you really do want certain sized batches and you are able to relax the constraints slightly, you can construct these batches within each partition. For instance:

val batchedRDD = rdd.mapPartitions { iter: Iterator[Int] =>
  new Iterator[Array[Int]] {
    def hasNext: Boolean = iter.hasNext
    def next(): Array[Int] = iter.take(batchedDegree).toArray
  }
}

This function is efficient in that it does not load the entire partition into memory, just enough to construct each batch. However, there will be one smaller batch at the end of each partition (rather than just one over the entire dataset). On Sat, Jul 12, 2014 at 6:03 PM, Parthus peng.wei@gmail.com wrote: Hi there, I have a bunch of data in an RDD, which I processed one by one previously. For example, there was an RDD denoted by data: RDD[Object] and then I processed it using data.map(...). However, I got a new requirement to process the data in a batched way. It means that I need to convert the RDD from RDD[Object] to RDD[Array[Object]] and then process it, which is to fill out this function: def convert2array(inputs: RDD[Object], batchedDegree: Int): RDD[Array[Object]] = {...}. I hope that after the conversion, each element of the new RDD is an array of the previous RDD's elements. The parameter batchedDegree specifies how many elements are batched together. For example, if I have 210 elements in the previous RDD, the result of the conversion function should be an RDD with 3 elements. Each element is an array; the first two arrays contain elements 1~100 and 101~200. The third contains elements 201~210. I was wondering if anybody could help me complete this Scala function in an efficient way. Thanks a lot. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Convert-from-RDD-Object-to-RDD-Array-Object-tp9530.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
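For comparison, a simpler sketch built on the standard library's Iterator#grouped, which produces the same fixed-size batches with the same one-short-batch-per-partition caveat (shown for Int to keep it self-contained):

import org.apache.spark.rdd.RDD

def convert2array(inputs: RDD[Int], batchedDegree: Int): RDD[Array[Int]] =
  inputs.mapPartitions(_.grouped(batchedDegree).map(_.toArray))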
Re: pyspark sc.parallelize running OOM with smallish data
I think this is probably dying on the driver itself, as you are likely materializing the whole dataset inside your python driver. How large is spark_data_array compared to your driver memory? On Fri, Jul 11, 2014 at 7:30 PM, Mohit Jaggi mohitja...@gmail.com wrote: I put the same dataset into scala (using spark-shell) and it acts weird. I cannot do a count on it, the executors seem to hang. The WebUI shows 0/96 in the status bar, shows details about the worker nodes but there is no progress. sc.parallelize does finish (takes too long for the data size) in scala. On Fri, Jul 11, 2014 at 2:00 PM, Mohit Jaggi mohitja...@gmail.com wrote: spark_data_array here has about 35k rows with 4k columns. I have 4 nodes in the cluster and gave 48g to executors. also tried kryo serialization. Traceback (most recent call last): File "/mohit/./m.py", line 58, in <module> spark_data = sc.parallelize(spark_data_array) File "/mohit/spark/python/pyspark/context.py", line 265, in parallelize jrdd = readRDDFromFile(self._jsc, tempFile.name, numSlices) File "/mohit/spark/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py", line 537, in __call__ File "/mohit/spark/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py", line 300, in get_return_value py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.readRDDFromFile. : java.lang.OutOfMemoryError: Java heap space at org.apache.spark.api.python.PythonRDD$.readRDDFromFile(PythonRDD.scala:279) at org.apache.spark.api.python.PythonRDD.readRDDFromFile(PythonRDD.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379) at py4j.Gateway.invoke(Gateway.java:259) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:207) at java.lang.Thread.run(Thread.java:745)
Re: Putting block rdd failed when running example svm on large data
Also check the web ui for that. Each iteration will have one or more stages associated with it in the driver web ui. On Sat, Jul 12, 2014 at 6:47 PM, crater cq...@ucmerced.edu wrote: Hi Xiangrui, Thanks for the information. Also, it is possible to figure out the execution time per iteration for SVM? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Putting-block-rdd-failed-when-running-example-svm-on-large-data-tp9515p9535.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: java.lang.OutOfMemoryError (java.lang.OutOfMemoryError: GC overhead limit exceeded)
There is a difference between actual GC overhead, which can be reduced by reusing objects, and this error, which actually means you ran out of memory. This error can probably be relieved by increasing your executor heap size, unless your data is corrupt and it is allocating huge arrays, or you are otherwise keeping too much memory around. For your other question, you can reuse objects similar to MapReduce (HadoopRDD does this by actually using Hadoop's Writables, for instance), but the general Spark APIs don't support this because mutable objects are not friendly to caching or serializing. On Tue, Jul 8, 2014 at 9:27 AM, Konstantin Kudryavtsev kudryavtsev.konstan...@gmail.com wrote: Hi all, I ran into the following exception during the map step: java.lang.OutOfMemoryError (java.lang.OutOfMemoryError: GC overhead limit exceeded) java.lang.reflect.Array.newInstance(Array.java:70) com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:325) com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293) com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:699) com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:611) com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221) com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648) com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605) com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221) com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648) com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605) com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221) com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648) com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605) com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221) com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:699) com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:611) com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221) com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729) com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:43) com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:34) com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729) org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:115) org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:125) org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71) scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30) org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$4.apply(CoGroupedRDD.scala:155) org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$4.apply(CoGroupedRDD.scala:154) scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) I'm using Spark 1.0. In map I create a new object each time; as I understand, I can't reuse objects as in MapReduce development?
I was wondering if you could point me to how to avoid the GC overhead... thank you in advance. Thank you, Konstantin Kudryavtsev
Re: java.lang.OutOfMemoryError (java.lang.OutOfMemoryError: GC overhead limit exceeded)
This seems almost equivalent to a heap size error -- since GCs are stop-the-world events, the fact that we were unable to release more than 2% of the heap suggests that almost all the memory is currently in use (i.e., live). Decreasing the number of cores is another solution which decreases memory pressure, because each core requires its own set of buffers (for instance, each kryo serializer has a certain buffer allocated to it), and has its own working set of data (some subset of a partition). Thus, decreasing the number of used cores decreases memory contention. On Tue, Jul 8, 2014 at 10:44 AM, Jerry Lam chiling...@gmail.com wrote: Hi Konstantin, I just ran into the same problem. I mitigated the issue by reducing the number of cores when I executed the job, which otherwise wouldn't be able to finish. Unlike what many people believe, it might not mean that you are running out of memory. A better answer can be found here: http://stackoverflow.com/questions/4371505/gc-overhead-limit-exceeded and copied here as a reference: Excessive GC Time and OutOfMemoryError The concurrent collector will throw an OutOfMemoryError if too much time is being spent in garbage collection: if more than 98% of the total time is spent in garbage collection and less than 2% of the heap is recovered, an OutOfMemoryError will be thrown. This feature is designed to prevent applications from running for an extended period of time while making little or no progress because the heap is too small. If necessary, this feature can be disabled by adding the option -XX:-UseGCOverheadLimit to the command line. The policy is the same as that in the parallel collector, except that time spent performing concurrent collections is not counted toward the 98% time limit. In other words, only collections performed while the application is stopped count toward excessive GC time. Such collections are typically due to a concurrent mode failure or an explicit collection request (e.g., a call to System.gc()). It could be that there are many tasks running in the same node and they all compete for running GCs which slow things down and trigger the error you saw. By reducing the number of cores, there are more cpu resources available to a task so the GC could finish before the error gets thrown. HTH, Jerry On Tue, Jul 8, 2014 at 1:35 PM, Aaron Davidson ilike...@gmail.com wrote: There is a difference between actual GC overhead, which can be reduced by reusing objects, and this error, which actually means you ran out of memory. This error can probably be relieved by increasing your executor heap size, unless your data is corrupt and it is allocating huge arrays, or you are otherwise keeping too much memory around. For your other question, you can reuse objects similar to MapReduce (HadoopRDD does this by actually using Hadoop's Writables, for instance), but the general Spark APIs don't support this because mutable objects are not friendly to caching or serializing.
On Tue, Jul 8, 2014 at 9:27 AM, Konstantin Kudryavtsev kudryavtsev.konstan...@gmail.com wrote: Hi all, I faced with the next exception during map step: java.lang.OutOfMemoryError (java.lang.OutOfMemoryError: GC overhead limit exceeded) java.lang.reflect.Array.newInstance(Array.java:70) com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:325) com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293) com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:699) com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:611) com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221) com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648) com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605) com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221) com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648) com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605) com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221) com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648) com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605) com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221) com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:699) com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:611) com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221) com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729) com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:43) com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:34
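The two mitigations discussed in this thread, as spark-defaults.conf entries (the core cap of 8 is an arbitrary example; use with care, since disabling the check treats the symptom rather than the cause):

# Disable the GC-overhead-limit check
spark.executor.extraJavaOptions -XX:-UseGCOverheadLimit

# Or cap the total cores the application uses, reducing per-task memory contention
spark.cores.max 8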
Re: CoarseGrainedExecutorBackend: Driver Disassociated
Hmm, looks like the Executor is trying to connect to the driver on localhost, from this line: 14/07/08 11:07:13 INFO CoarseGrainedExecutorBackend: Connecting to driver: akka.tcp://spark@localhost:39701/user/CoarseGrainedScheduler What is your setup? Standalone mode with 4 separate machines? Are you configuring the driver public dns name somewhere? On Tue, Jul 8, 2014 at 11:52 AM, Sameer Tilak ssti...@live.com wrote: Dear All, When I look inside the following directory on my worker node: $SPARK_HOME/work/app-20140708110707-0001/3 I see the following error message: log4j:WARN No appenders could be found for logger (org.apache.hadoop.conf.Configuration). log4j:WARN Please initialize the log4j system properly. log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info. 14/07/08 11:07:11 INFO SparkHadoopUtil: Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 14/07/08 11:07:11 INFO SecurityManager: Changing view acls to: p529444 14/07/08 11:07:11 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(p529444) 14/07/08 11:07:12 INFO Slf4jLogger: Slf4jLogger started 14/07/08 11:07:12 INFO Remoting: Starting remoting 14/07/08 11:07:13 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkexecu...@pzxnvm2022.dcld.pldc.kp.org:34679] 14/07/08 11:07:13 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkexecu...@pzxnvm2022.x.y.name.org:34679] 14/07/08 11:07:13 INFO CoarseGrainedExecutorBackend: Connecting to driver: akka.tcp://spark@localhost:39701/user/CoarseGrainedScheduler 14/07/08 11:07:13 INFO WorkerWatcher: Connecting to worker akka.tcp:// sparkwor...@pzxnvm2022.x.y.name.org:37054/user/Worker 14/07/08 11:07:13 ERROR CoarseGrainedExecutorBackend: Driver Disassociated [akka.tcp://sparkexecu...@pzxnvm2022.dcld.pldc.kp.org:34679] - [akka I am not sure what the problem is but it is preventing me from getting the 4-node test cluster up and running.
Re: Comparative study
Not sure exactly what is happening but perhaps there are ways to restructure your program for it to work better. Spark is definitely able to handle much, much larger workloads. +1 @Reynold Spark can handle big big data. There are known issues with informing the user about what went wrong and how to fix it that we're actively working on, but the first impulse when a job fails should be "what did I do wrong" rather than "Spark can't handle this workload". Messaging is a huge part in making this clear -- getting things like a job hanging or an out of memory error can be very difficult to debug, and improving this is one of our highest priorities. On Tue, Jul 8, 2014 at 12:47 PM, Reynold Xin r...@databricks.com wrote: Not sure exactly what is happening but perhaps there are ways to restructure your program for it to work better. Spark is definitely able to handle much, much larger workloads. I've personally run a workload that shuffled 300 TB of data. I've also run something that shuffled 5 TB/node and stuffed my disks so full that the file system was close to breaking. We can definitely do a better job in Spark of outputting more meaningful diagnostics and being more robust with partitions of data that don't fit in memory, though. A lot of the work in the next few releases will be on that. On Tue, Jul 8, 2014 at 10:04 AM, Surendranauth Hiraman suren.hira...@velos.io wrote: I'll respond for Dan. Our test dataset was a total of 10 GB of input data (the full production dataset for this particular dataflow would be roughly 60 GB). I'm not sure what the size of the final output data was, but I think it was on the order of 20 GB for the given 10 GB of input data. Also, I can say that when we were experimenting with persist(DISK_ONLY), the size of all RDDs on disk was around 200 GB, which gives a sense of overall transient memory usage with no persistence. In terms of our test cluster, we had 15 nodes. Each node had 24 cores and 2 workers. Each executor got 14 GB of memory. -Suren On Tue, Jul 8, 2014 at 12:06 PM, Kevin Markey kevin.mar...@oracle.com wrote: When you say large data sets, how large? Thanks On 07/07/2014 01:39 PM, Daniel Siegmann wrote: From a development perspective, I vastly prefer Spark to MapReduce. The MapReduce API is very constrained; Spark's API feels much more natural to me. Testing and local development is also very easy - creating a local Spark context is trivial and it reads local files. For your unit tests you can just have them create a local context and execute your flow with some test data. Even better, you can do ad-hoc work in the Spark shell and if you want that in your production code it will look exactly the same. Unfortunately, the picture isn't so rosy when it gets to production. In my experience, Spark simply doesn't scale to the volumes that MapReduce will handle. Not with a Standalone cluster anyway - maybe Mesos or YARN would be better, but I haven't had the opportunity to try them. I find jobs tend to just hang forever for no apparent reason on large data sets (but smaller than what I push through MapReduce). I am hopeful the situation will improve - Spark is developing quickly - but if you have large amounts of data you should proceed with caution. Keep in mind there are some frameworks for Hadoop which can hide the ugly MapReduce with something very similar in form to Spark's API; e.g. Apache Crunch. So you might consider those as well. (Note: the above is with Spark 1.0.0.)
On Mon, Jul 7, 2014 at 11:07 AM, santosh.viswanat...@accenture.com wrote: Hello Experts, I am doing some comparative study on the below: Spark vs Impala Spark vs MapReduce. Is it worth migrating from the existing MR implementation to Spark? Please share your thoughts and expertise. Thanks, Santosh -- Daniel Siegmann, Software Developer Velos Accelerating Machine Learning 440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001 E: daniel.siegm...@velos.io W: www.velos.io -- SUREN HIRAMAN, VP TECHNOLOGY Velos Accelerating Machine Learning 440 NINTH AVENUE, 11TH FLOOR NEW YORK, NY 10001 O: (917) 525-2466 ext. 105 F: 646.349.4063 E: suren.hira...@velos.io W: www.velos.io
Re: CoarseGrainedExecutorBackend: Driver Disassociated
You actually should avoid setting SPARK_PUBLIC_DNS unless necessary, I thought you might have preemptively done so. I think the issue is actually related to your network configuration, as Spark probably failed to find your driver's IP address. Do you see a warning on the driver that looks something like "Your hostname, localhost resolves to a loopback address, but we couldn't find any external IP address"? Either way, let's try to set SPARK_LOCAL_IP (see http://spark.apache.org/docs/latest/configuration.html) inside ~/spark/conf/spark-env.sh on your driver machine to an IP address that's reachable by your executors. Something like export SPARK_LOCAL_IP=194.168.1.105 You can make sure it was set correctly by running sc.getConf.get("spark.driver.host"), which should return the driver hostname, and NOT localhost. (Note that it's also possible that your /etc/hosts file contains a mapping from the driver's IP address to localhost, which it should not.) On Tue, Jul 8, 2014 at 2:23 PM, Sameer Tilak ssti...@live.com wrote: Hi Aaron, Would really appreciate your help if you can point me to the documentation. Is this something that I need to do with /etc/hosts on each of the worker machines? Or do I set SPARK_PUBLIC_DNS (if yes, what is the format?) or something else? I have the following set up: master node: pzxnvm2018.x.y.org worker nodes: pzxnvm2022.x.y.org pzxnvm2023.x.y.org pzxnvm2024.x.y.org From: ilike...@gmail.com Date: Tue, 8 Jul 2014 11:59:54 -0700 Subject: Re: CoarseGrainedExecutorBackend: Driver Disassociated To: user@spark.apache.org Hmm, looks like the Executor is trying to connect to the driver on localhost, from this line: 14/07/08 11:07:13 INFO CoarseGrainedExecutorBackend: Connecting to driver: akka.tcp://spark@localhost:39701/user/CoarseGrainedScheduler What is your setup? Standalone mode with 4 separate machines? Are you configuring the driver public DNS name somewhere? On Tue, Jul 8, 2014 at 11:52 AM, Sameer Tilak ssti...@live.com wrote: Dear All, When I look inside the following directory on my worker node: $SPARK_HOME/work/app-20140708110707-0001/3 I see the following error message: log4j:WARN No appenders could be found for logger (org.apache.hadoop.conf.Configuration). log4j:WARN Please initialize the log4j system properly. log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
14/07/08 11:07:11 INFO SparkHadoopUtil: Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 14/07/08 11:07:11 INFO SecurityManager: Changing view acls to: p529444 14/07/08 11:07:11 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(p529444) 14/07/08 11:07:12 INFO Slf4jLogger: Slf4jLogger started 14/07/08 11:07:12 INFO Remoting: Starting remoting 14/07/08 11:07:13 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkexecu...@pzxnvm2022.dcld.pldc.kp.org:34679] 14/07/08 11:07:13 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkexecu...@pzxnvm2022.x.y.name.org:34679] 14/07/08 11:07:13 INFO CoarseGrainedExecutorBackend: Connecting to driver: akka.tcp://spark@localhost:39701/user/CoarseGrainedScheduler 14/07/08 11:07:13 INFO WorkerWatcher: Connecting to worker akka.tcp://sparkwor...@pzxnvm2022.x.y.name.org:37054/user/Worker 14/07/08 11:07:13 ERROR CoarseGrainedExecutorBackend: Driver Disassociated [akka.tcp://sparkexecu...@pzxnvm2022.dcld.pldc.kp.org:34679] - [akka I am not sure what the problem is, but it is preventing me from getting the 4-node test cluster up and running.
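A quick way to check what the executors are being told to connect to, from spark-shell on the driver (this is just the verification step described above):

// spark.driver.host is the address executors dial back to; it must be
// reachable from the workers and must not resolve to a loopback address.
println(sc.getConf.get("spark.driver.host"))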
Re: CoarseGrainedExecutorBackend: Driver Disassociated
By the way, you can run the sc.getConf.get("spark.driver.host") thing inside spark-shell, whether or not the Executors actually start up successfully.
Re: Serialization of objects
If you want to stick with Java serialization and need to serialize a non-Serializable object, your best choices are probably to either subclass it with a Serializable one or wrap it in a class of your own which implements its own writeObject/readObject methods (see here: http://stackoverflow.com/questions/6163872/how-to-serialize-a-non-serializable-in-java ) Otherwise you can use Kryo to register custom serializers for other people's objects. On Mon, Jun 30, 2014 at 1:52 PM, Sameer Tilak ssti...@live.com wrote: Hi everyone, I was able to solve this issue. For now I changed the library code and added the following to the class com.wcohen.ss.BasicStringWrapper: public class BasicStringWrapper implements Serializable However, I am still curious to know how to get around the issue when you don't have access to the code and you are using a 3rd party jar. -- From: ssti...@live.com To: u...@spark.incubator.apache.org Subject: Serialization of objects Date: Thu, 26 Jun 2014 09:30:31 -0700 Hi everyone, Aaron, thanks for your help so far. I am trying to serialize objects that I instantiate from a 3rd party library, namely instances of com.wcohen.ss.Jaccard and com.wcohen.ss.BasicStringWrapper. However, I am having problems with serialization. I am (at least trying to be) using Kryo for serialization, but I am still facing the serialization issue. I get org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: com.wcohen.ss.BasicStringWrapper Any help with this will be great. Scala code: package approxstrmatch import com.wcohen.ss.BasicStringWrapper; import com.wcohen.ss.Jaccard; import java.util.Iterator; import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark.SparkConf import org.apache.spark.rdd; import org.apache.spark.rdd.RDD; import com.esotericsoftware.kryo.Kryo import org.apache.spark.serializer.KryoRegistrator class MyRegistrator extends KryoRegistrator { override def registerClasses(kryo: Kryo) { kryo.register(classOf[approxstrmatch.JaccardScore]) kryo.register(classOf[com.wcohen.ss.BasicStringWrapper]) kryo.register(classOf[com.wcohen.ss.Jaccard]) } } class JaccardScore { val mjc = new Jaccard() with Serializable val conf = new SparkConf().setMaster("spark://pzxnvm2018:7077").setAppName("ApproxStrMatch") conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") conf.set("spark.kryo.registrator", "approxstrmatch.MyRegistrator") val sc = new SparkContext(conf) def calculateScoreSecond (sourcerdd: RDD[String], destrdd: RDD[String]) { val jc_ = this.mjc var i: Int = 0 for (sentence <- sourcerdd.toLocalIterator) { val str1 = new BasicStringWrapper(sentence) var scorevector = destrdd.map(x => jc_.score(str1, new BasicStringWrapper(x))) val fileName = new String("/apps/software/scala-approsstrmatch-sentence" + i) scorevector.saveAsTextFile(fileName) i += 1 } } Here is the script: val distFile = sc.textFile("hdfs://serverip:54310/data/dummy/sample.txt"); val srcFile = sc.textFile("hdfs://serverip:54310/data/dummy/test.txt"); val score = new approxstrmatch.JaccardScore() score.calculateScoreSecond(srcFile, distFile) O/P: 14/06/25 12:32:05 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[3] at textFile at console:12), which has no missing parents 14/06/25 12:32:05 INFO DAGScheduler: Submitting 1 missing tasks from Stage 0 (MappedRDD[3] at textFile at console:12) 14/06/25 12:32:05 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks 14/06/25 12:32:05 INFO TaskSetManager: Starting task 0.0:0 as TID 0
on executor localhost: localhost (PROCESS_LOCAL) 14/06/25 12:32:05 INFO TaskSetManager: Serialized task 0.0:0 as 1879 bytes in 4 ms 14/06/25 12:32:05 INFO Executor: Running task ID 0 14/06/25 12:32:05 INFO Executor: Fetching http://serverip:47417/jars/approxstrmatch.jar with timestamp 1403724701564 14/06/25 12:32:05 INFO Utils: Fetching http://serverip:47417/jars/approxstrmatch.jar to /tmp/fetchFileTemp8194323811657370518.tmp 14/06/25 12:32:05 INFO Executor: Adding file:/tmp/spark-397828b5-3e0e-4bb4-b56b-58895eb4d6df/approxstrmatch.jar to class loader 14/06/25 12:32:05 INFO Executor: Fetching http://serverip:47417/jars/secondstring-20140618.jar with timestamp 1403724701562 14/06/25 12:32:05 INFO Utils: Fetching http://serverip:47417/jars/secondstring-20140618.jar to /tmp/fetchFileTemp8711755318201511766.tmp 14/06/25 12:32:06 INFO Executor: Adding file:/tmp/spark-397828b5-3e0e-4bb4-b56b-58895eb4d6df/secondstring-20140618.jar to class loader 14/06/25 12:32:06 INFO BlockManager: Found block broadcast_1 locally 14/06/25 12:32:06 INFO HadoopRDD: Input split: hdfs://serverip:54310/data/dummy/test.txt:0+140 14/06/25 12:32:06 INFO Executor: Serialized size of result for 0 is 717
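A minimal sketch of the writeObject/readObject wrapper approach described at the top of this thread, with a hypothetical non-serializable third-party class standing in for the real one:

import java.io.{ObjectInputStream, ObjectOutputStream}

class ThirdPartyThing(val config: String) // stand-in: does NOT implement Serializable

class SerializableThing(@transient var thing: ThirdPartyThing) extends Serializable {
  private val config: String = thing.config // the small state needed to rebuild it
  private def writeObject(out: ObjectOutputStream): Unit = {
    out.defaultWriteObject() // writes only `config`; `thing` is @transient
  }
  private def readObject(in: ObjectInputStream): Unit = {
    in.defaultReadObject()
    thing = new ThirdPartyThing(config) // rebuild the wrapped object on the receiving side
  }
}

Only the cheap state crosses the wire; the expensive or unserializable object is reconstructed after deserialization.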
Re: Failed to launch Worker
Where are you running the spark-class version? Hopefully also on the workers. If you're trying to centrally start/stop all workers, you can add a slaves file to the Spark conf/ directory which is just a list of your hosts, one per line. Then you can just use ./sbin/start-slaves.sh to start the worker on all of your machines. Note that this is already set up correctly if you're using the spark-ec2 scripts. On Tue, Jul 1, 2014 at 5:53 AM, MEETHU MATHEW meethu2...@yahoo.co.in wrote: Yes. Thanks Regards, Meethu M On Tuesday, 1 July 2014 6:14 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Is this command working? java -cp ::/usr/local/spark-1.0.0/conf:/usr/local/spark-1.0.0/assembly/target/scala-2.10/spark-assembly-1.0.0-hadoop1.2.1.jar -XX:MaxPermSize=128m -Dspark.akka.logLifecycleEvents=true -Xms512m -Xmx512m org.apache.spark.deploy.worker.Worker spark://x.x.x.174:7077 Thanks Best Regards On Tue, Jul 1, 2014 at 6:08 PM, MEETHU MATHEW meethu2...@yahoo.co.in wrote: Hi, I am using Spark Standalone mode with one master and 2 slaves. I am not able to start the workers and connect them to the master using ./bin/spark-class org.apache.spark.deploy.worker.Worker spark://x.x.x.174:7077 The log says Exception in thread main org.jboss.netty.channel.ChannelException: Failed to bind to: master/x.x.x.174:0 at org.jboss.netty.bootstrap.ServerBootstrap.bind(ServerBootstrap.java:272) ... Caused by: java.net.BindException: Cannot assign requested address When I try to start the worker from the slaves using the following java command, it's running without any exception java -cp ::/usr/local/spark-1.0.0/conf:/usr/local/spark-1.0.0/assembly/target/scala-2.10/spark-assembly-1.0.0-hadoop1.2.1.jar -XX:MaxPermSize=128m -Dspark.akka.logLifecycleEvents=true -Xms512m -Xmx512m org.apache.spark.deploy.worker.Worker spark://:master:7077 Thanks Regards, Meethu M
Re: org.jboss.netty.channel.ChannelException: Failed to bind to: master/1xx.xx..xx:0
In your spark-env.sh, do you happen to set SPARK_PUBLIC_DNS or something of that kind? This error suggests the worker is trying to bind a server on the master's IP, which clearly doesn't make sense. On Mon, Jun 30, 2014 at 11:59 PM, MEETHU MATHEW meethu2...@yahoo.co.in wrote: Hi, I did netstat -na | grep 192.168.125.174 and it's showing 192.168.125.174:7077 LISTEN (after starting master). I tried to execute the following script from the slaves manually but it ends up with the same exception and log. This script is internally executing the java command. /usr/local/spark-1.0.0/sbin/start-slave.sh 1 spark://192.168.125.174:7077 In this case netstat is not showing any connection established to master:7077. When we manually execute the java command, the connection is getting established to master. Thanks Regards, Meethu M On Monday, 30 June 2014 6:38 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Are you sure you have this IP 192.168.125.174 bound for that machine? (netstat -na | grep 192.168.125.174) Thanks Best Regards On Mon, Jun 30, 2014 at 5:34 PM, MEETHU MATHEW meethu2...@yahoo.co.in wrote: Hi all, I reinstalled Spark, rebooted the system, but still I am not able to start the workers. It's throwing the following exception: Exception in thread main org.jboss.netty.channel.ChannelException: Failed to bind to: master/192.168.125.174:0 I suspect the problem is with 192.168.125.174:0. Even though the command contains master:7077, why is it showing 0 in the log? java -cp ::/usr/local/spark-1.0.0/conf:/usr/local/spark-1.0.0/assembly/target/scala-2.10/spark-assembly-1.0.0-hadoop1.2.1.jar -XX:MaxPermSize=128m -Dspark.akka.logLifecycleEvents=true -Xms512m -Xmx512m org.apache.spark.deploy.worker.Worker spark://master:7077 Can somebody tell me a solution? Thanks Regards, Meethu M On Friday, 27 June 2014 4:28 PM, MEETHU MATHEW meethu2...@yahoo.co.in wrote: Hi, ya I tried setting another PORT also, but the same problem. master is set in /etc/hosts Thanks Regards, Meethu M On Friday, 27 June 2014 3:23 PM, Akhil Das ak...@sigmoidanalytics.com wrote: that's strange, did you try setting the master port to something else (use SPARK_MASTER_PORT)? Also you said you are able to start it from the java command line java -cp ::/usr/local/spark-1.0.0/conf:/usr/local/spark-1.0.0/assembly/target/scala-2.10/spark-assembly-1.0.0-hadoop1.2.1.jar -XX:MaxPermSize=128m -Dspark.akka.logLifecycleEvents=true -Xms512m -Xmx512m org.apache.spark.deploy.worker.Worker spark://:*master*:7077 What is the master IP specified here? Is it like you have an entry for *master* in /etc/hosts? Thanks Best Regards On Fri, Jun 27, 2014 at 3:09 PM, MEETHU MATHEW meethu2...@yahoo.co.in wrote: Hi Akhil, I am running it in a LAN itself. The IP of the master is given correctly. Thanks Regards, Meethu M On Friday, 27 June 2014 2:51 PM, Akhil Das ak...@sigmoidanalytics.com wrote: why is it binding to port 0?
192.168.125.174:0 :/ Check the IP address of that master machine (ifconfig); it looks like the IP address has been changed (hoping you are running these machines on a LAN). Thanks Best Regards On Fri, Jun 27, 2014 at 12:00 PM, MEETHU MATHEW meethu2...@yahoo.co.in wrote: Hi all, My Spark (Standalone mode) was running fine till yesterday. But now I am getting the following exception when I am running start-slaves.sh or start-all.sh: slave3: failed to launch org.apache.spark.deploy.worker.Worker: slave3: at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918) slave3: at java.lang.Thread.run(Thread.java:662) The log files have the following lines: 14/06/27 11:06:30 INFO SecurityManager: Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 14/06/27 11:06:30 INFO SecurityManager: Changing view acls to: hduser 14/06/27 11:06:30 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(hduser) 14/06/27 11:06:30 INFO Slf4jLogger: Slf4jLogger started 14/06/27 11:06:30 INFO Remoting: Starting remoting Exception in thread main org.jboss.netty.channel.ChannelException: Failed to bind to: master/192.168.125.174:0 at org.jboss.netty.bootstrap.ServerBootstrap.bind(ServerBootstrap.java:272) ... Caused by: java.net.BindException: Cannot assign requested address ... I saw the same error reported before and have tried the following solutions: set the variable SPARK_LOCAL_IP, changed the SPARK_MASTER_PORT to a different number. But nothing is working. When I try to start the worker from the respective machines using the following java command, it's running without any exception java -cp ::/usr/local/spark-1.0.0/conf:/usr/local/spark-1.0.0/assembly/target/scala-2.10/spark-assembly-1.0.0-hadoop1.2.1.jar -XX:MaxPermSize=128m -Dspark.akka.logLifecycleEvents=true -Xms512m
Re: Interconnect benchmarking
A simple throughput test is also repartition()ing a large RDD. This also stresses the disks, though, so you might try to mount your Spark temporary directory as a ramfs. On Fri, Jun 27, 2014 at 5:57 PM, danilopds danilob...@gmail.com wrote: Hi, According to the research paper below by Matei Zaharia, Spark's creator, http://people.csail.mit.edu/matei/papers/2013/sosp_spark_streaming.pdf he says on page 10 that: Grep is network-bound due to the cost to replicate the input data to multiple nodes. So, I guess it can be a good initial recommendation. But I would like to know other workloads too. Best Regards.
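A minimal sketch of such a repartition() throughput test, runnable in spark-shell (record count, payload size, and partition counts are arbitrary):

// Build roughly 1 GB of dummy records, then force a full shuffle over the network.
val rdd = sc.parallelize(1 to (1 << 20), 256).map(i => (i, new Array[Byte](1024)))
val t0 = System.nanoTime()
rdd.repartition(256).count() // repartition() moves every record between nodes
println("shuffle took " + (System.nanoTime() - t0) / 1e9 + " s")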
Re: Improving Spark multithreaded performance?
I don't have specific solutions for you, but the general things to try are: - Decrease task size by broadcasting any non-trivial objects (see the sketch after this message). - Increase the duration of tasks by making them less fine-grained. How many tasks are you sending? I've seen in the past something like 25 seconds for ~10k total medium-sized tasks. On Thu, Jun 26, 2014 at 12:06 PM, Kyle Ellrott kellr...@soe.ucsc.edu wrote: I'm working to set up a calculation that involves calling mllib's SVMWithSGD.train several thousand times on different permutations of the data. I'm trying to run the separate jobs using a threadpool to dispatch the different requests to a spark context connected to a Mesos cluster, using coarse-grained scheduling and a max of 2000 cores, on Spark 1.0. Total utilization of the system is terrible. Most of the 'aggregate at GradientDescent.scala:178' stages (where mllib spends most of its time) take about 3 seconds, but have ~25 seconds of scheduler delay time. What kind of things can I do to improve this? Kyle
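A minimal sketch of the broadcast suggestion above, runnable in spark-shell (sizes are arbitrary; bigModel stands in for whatever non-trivial object your tasks capture):

val bigModel: Array[Double] = Array.fill(1 << 20)(0.5) // stand-in for a large shared object
val bcModel = sc.broadcast(bigModel) // shipped to each executor once, not per task
val data = sc.parallelize(1 to 10000)
// Tasks read the executor-local copy via bcModel.value instead of dragging
// bigModel into every serialized task closure.
val total = data.map(i => bcModel.value(i % bcModel.value.length) * i).reduce(_ + _)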
Re: jsonFile function in SQLContext does not work
Is it possible you have blank lines in your input? Not that this should be an error condition, but it may be what's causing it. On Wed, Jun 25, 2014 at 11:57 AM, durin m...@simon-schaefer.net wrote: Hi Zongheng Yang, thanks for your response. Reading your answer, I did some more tests and realized that analyzing very small parts of the dataset (which is ~130GB in ~4.3M lines) works fine. The error occurs when I analyze larger parts. Using 5% of the whole data, the error is the same as posted before for certain TIDs. However, when using 5% I still get the structure determined so far as a result. The Spark WebUI shows the following: Job aborted due to stage failure: Task 6.0:11 failed 4 times, most recent failure: Exception failure in TID 108 on host foo.bar.com: com.fasterxml.jackson.databind.JsonMappingException: No content to map due to end-of-input at [Source: java.io.StringReader@3697781f; line: 1, column: 1] com.fasterxml.jackson.databind.JsonMappingException.from(JsonMappingException.java:164) com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3029) com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:2971) com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2091) org.apache.spark.sql.json.JsonRDD$$anonfun$parseJson$1$$anonfun$apply$5.apply(JsonRDD.scala:261) org.apache.spark.sql.json.JsonRDD$$anonfun$parseJson$1$$anonfun$apply$5.apply(JsonRDD.scala:261) scala.collection.Iterator$$anon$11.next(Iterator.scala:328) scala.collection.Iterator$$anon$11.next(Iterator.scala:328) scala.collection.Iterator$$anon$11.next(Iterator.scala:328) scala.collection.Iterator$$anon$11.next(Iterator.scala:328) scala.collection.Iterator$class.foreach(Iterator.scala:727) scala.collection.AbstractIterator.foreach(Iterator.scala:1157) scala.collection.TraversableOnce$class.reduceLeft(TraversableOnce.scala:172) scala.collection.AbstractIterator.reduceLeft(Iterator.scala:1157) org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:823) org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:821) org.apache.spark.SparkContext$$anonfun$24.apply(SparkContext.scala:1132) org.apache.spark.SparkContext$$anonfun$24.apply(SparkContext.scala:1132) org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:112) org.apache.spark.scheduler.Task.run(Task.scala:51) org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187) java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) java.lang.Thread.run(Thread.java:662) Driver stacktrace: Is the only possible reason that some of these 4.3 million JSON objects are not valid JSON, or could there be another explanation? And if it is the reason, is there some way to tell the function to just skip faulty lines? Thanks, Durin
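A sketch of the skip-blank-lines idea, assuming a build where SQLContext has the jsonRDD method taking an RDD[String] (as in the thread above; the path is made up):

import org.apache.spark.sql.SQLContext
val sqlContext = new SQLContext(sc)
val raw = sc.textFile("hdfs://serverip:54310/data/input.json") // hypothetical path
val nonBlank = raw.filter(_.trim.nonEmpty) // drop empty lines, which the Jackson parser rejects
val schemaRDD = sqlContext.jsonRDD(nonBlank)
schemaRDD.printSchema()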
Re: Changing log level of spark
If you're using the spark-ec2 scripts, you may have to change /root/ephemeral-hdfs/conf/log4j.properties or something like that, as that is added to the classpath before Spark's own conf. On Wed, Jun 25, 2014 at 6:10 PM, Tobias Pfeiffer t...@preferred.jp wrote: I have a log4j.xml in src/main/resources with <?xml version="1.0" encoding="UTF-8" ?> <!DOCTYPE log4j:configuration SYSTEM "log4j.dtd"> <log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/"> [...] <root> <priority value="warn" /> <appender-ref ref="Console" /> </root> </log4j:configuration> and that is included in the jar I package with `sbt assembly`. That works fine for me, at least on the driver. Tobias On Wed, Jun 25, 2014 at 2:25 PM, Philip Limbeck philiplimb...@gmail.com wrote: Hi! According to https://spark.apache.org/docs/0.9.0/configuration.html#configuring-logging, changing the log level is just a matter of creating a log4j.properties (which is in the classpath of Spark) and changing the log level there for the root logger. I did these steps on every node in the cluster (master and worker nodes). However, after restart there is still no debug output as desired, but only the default info log level.
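If classpath ordering is the culprit, a programmatic fallback is to set levels from the driver using the log4j 1.x API (a sketch; note it only affects the JVM it runs in, so executors still need their own configuration):

import org.apache.log4j.{Level, Logger}
// Quiet everything down (or use Level.DEBUG to get more output).
Logger.getRootLogger.setLevel(Level.WARN)
// Target only Spark's own loggers:
Logger.getLogger("org.apache.spark").setLevel(Level.DEBUG)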
Re: Shark vs Impala
Note that regarding a long load time, data format means a whole lot in terms of query performance. If you load all your data into compressed, columnar Parquet files on local hardware, Spark SQL would also perform far, far better than it would reading from gzipped S3 files. You must also be careful about your queries; certain queries can be answered much more efficiently due to specific optimizations implemented in the query engine. For instance, Parquet keeps statistics, so you could theoretically do a count(*) over petabytes of data in less than a second, blowing away any competition that resorts to actually reading data. On Sun, Jun 22, 2014 at 6:24 PM, Matei Zaharia matei.zaha...@gmail.com wrote: In this benchmark, the problem wasn’t that Shark could not run without enough memory; Shark spills some of the data to disk and can run just fine. The issue was that the in-memory form of the RDDs was larger than the cluster’s memory, although the raw Parquet / ORC files did fit in memory, so Cloudera did not want to run an “RDD” number where some of the RDD is not in memory. But the wording “could not complete” is confusing — the queries complete just fine. We do plan to update the AMPLab benchmark with Spark SQL as well, and expand it to include more of TPC-DS. Matei On Jun 22, 2014, at 9:53 AM, Debasish Das debasish.da...@gmail.com wrote: 600s for Spark vs 5s for Redshift... The numbers look much different from the AMPLab benchmark... https://amplab.cs.berkeley.edu/benchmark/ Is it like SSDs or something that's helping Redshift, or is the whole data in memory when you run the query? Could you publish the query? Also, after spark-sql are we planning to add spark-sql runtimes in the AMPLab benchmark as well? On Sun, Jun 22, 2014 at 9:13 AM, Toby Douglass t...@avocet.io wrote: I've just benchmarked Spark and Impala. Same data (in S3), same query, same cluster. Impala has a long load time, since it cannot load directly from S3. I have to create a Hive table on S3, then insert from that to an Impala table. This takes a long time; Spark took about 600s for the query, Impala 250s, but Impala required 6k seconds to load data from S3. If you're going to go the long-initial-load-then-quick-queries route, go for Redshift. On equivalent hardware, that took about 4k seconds to load, but then queries are like 5s each.
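To make the data-format point concrete, here is a rough sketch of converting raw text to Parquet with the Spark SQL API of that era (the Person case class and all paths are made up for illustration):

import org.apache.spark.sql.SQLContext
val sqlContext = new SQLContext(sc)
import sqlContext.createSchemaRDD // implicitly converts an RDD of case classes to a SchemaRDD
case class Person(name: String, age: Int)
val people = sc.textFile("s3n://bucket/people.csv").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt))
people.saveAsParquetFile("hdfs:///warehouse/people.parquet") // columnar, compressed, with statistics
val parquetPeople = sqlContext.parquetFile("hdfs:///warehouse/people.parquet")
parquetPeople.registerAsTable("people") // 1.0-era name; later releases call this registerTempTable
sqlContext.sql("SELECT COUNT(*) FROM people").collect()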
Re: DAGScheduler: Failed to run foreach
Please note that this: for (sentence <- sourcerdd) { ... } is actually Scala syntactic sugar which is converted into sourcerdd.foreach { sentence => ... } What this means is that this will actually run on the cluster, which is probably not what you want if you're trying to print them. Try this instead: for (sentence <- sourcerdd.toLocalIterator) { ... } (By the way, the reason this was throwing a NotSerializableException was because you were trying to pass printScoreCanndedString as part of the job's closure. In Java, class methods have an implicit reference to this, so it tried to serialize the class CalculateScore, which is presumably not marked as Serializable.) On Mon, Jun 23, 2014 at 5:45 PM, Sameer Tilak ssti...@live.com wrote: The subject should be: org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: and not DAGScheduler: Failed to run foreach. If I call printScoreCanndedString with a hard-coded string and an identical 2nd parameter, it works fine. However for my application that is not sufficient. -- From: ssti...@live.com To: u...@spark.incubator.apache.org Subject: DAGScheduler: Failed to run foreach Date: Mon, 23 Jun 2014 17:05:03 -0700 Hi All, I am using Spark for text analysis. I have a source file that has a few thousand sentences and a dataset of tens of millions of statements. I want to compare each statement from the source file with each statement from the dataset and generate a score. I am having the following problem. I would really appreciate help. Here is what I do within spark-shell: // Source file with a few thousand sentences val srcFile = sc.textFile("hdfs://serverip/data/dummy/src.txt"); // Dataset with tens of millions of statements. val destFile = sc.textFile("hdfs://serverip/data/dummy/sample.txt"); // Initialize the score variable. val score = new mypackage.Score() // Generate score. score.calculateScore(srcFile, destFile); Here is my snippet from my Scala class (Score.scala): def calculateScore (sourcerdd: RDD[String], destrdd: RDD[String]) { for (sentence <- sourcerdd) { println("Source String is: " + sentence + " Data Type is: " + sentence.getClass.getSimpleName) printScoreCanndedString(sentence, destrdd); } } def printScoreCanndedString(sourceStr: String, rdd: RDD[String]): RDD[Double] = { // Do the analysis here. } The print statement displays the data correctly along with the data type as String, as expected. However, I get the following error message when it tries to execute the printScoreCanndedString method. Any help with this will be great. 14/06/23 16:45:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform...
using builtin-java classes where applicable 14/06/23 16:45:04 WARN LoadSnappy: Snappy native library not loaded 14/06/23 16:45:04 INFO FileInputFormat: Total input paths to process : 1 14/06/23 16:45:04 INFO SparkContext: Starting job: foreach at calculateScore.scala:51 14/06/23 16:45:04 INFO DAGScheduler: Got job 0 (foreach at CalculateScore.scala:51) with 2 output partitions (allowLocal=false) 14/06/23 16:45:04 INFO DAGScheduler: Final stage: Stage 0(foreach at CalculateScore.scala:51) 14/06/23 16:45:04 INFO DAGScheduler: Parents of final stage: List() 14/06/23 16:45:04 INFO DAGScheduler: Missing parents: List() 14/06/23 16:45:04 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[1] at textFile at console:12), which has no missing parents 14/06/23 16:45:04 INFO DAGScheduler: Failed to run foreach at CalculateScore.scala:51 org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: approxstrmatch. CalculateScore at org.apache.spark.scheduler.DAGScheduler.org $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015) at org.apache.spark.scheduler.DAGScheduler.org $apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:770) at org.apache.spark.scheduler.DAGScheduler.org $apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:713) at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:697) at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1176) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at
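A compact sketch of the distinction being made here, runnable in spark-shell:

val rdd = sc.parallelize(1 to 100)
// Desugars to rdd.foreach(...): the body runs on the executors, so output
// goes to executor stdout and the closure must be serializable.
for (x <- rdd) { println(x) }
// Runs on the driver: partitions are shipped back one at a time.
for (x <- rdd.toLocalIterator) { println(x) }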
Re: spark with docker: errors with akka, NAT?
I remember having to do a similar thing in the spark docker scripts for testing purposes. Were you able to modify the /etc/hosts directly? I remember issues with that, as docker apparently mounts it as part of its read-only filesystem. On Tue, Jun 17, 2014 at 4:36 PM, Mohit Jaggi mohitja...@gmail.com wrote: It was a DNS issue. Akka apparently uses the hostname of the endpoints and hence they need to be resolvable. In my case the hostname of the docker container was a randomly generated string and was not resolvable. I added a workaround (an entry in the /etc/hosts file of the spark master) for now. If anyone can point to a more elegant solution, that would be awesome! On Tue, Jun 17, 2014 at 3:48 PM, Mohit Jaggi mohitja...@gmail.com wrote: I am using cutting edge code from git but doing my own sbt assembly. On Mon, Jun 16, 2014 at 10:28 PM, Andre Schumacher schum...@icsi.berkeley.edu wrote: Hi, are you using the amplab/spark-1.0.0 images from the global registry? Andre On 06/17/2014 01:36 AM, Mohit Jaggi wrote: Hi Folks, I am having trouble getting the spark driver running in docker. If I run a pyspark example on my mac it works, but the same example on a docker image (via boot2docker) fails with the following logs. I am pointing the spark driver (which is running the example) to a spark cluster (the driver is not part of the cluster). I guess this has something to do with docker's networking stack (it may be getting NAT'd), but I am not sure why (if at all) the spark-worker or spark-master is trying to create a new TCP connection to the driver, instead of responding on the connection initiated by the driver. I would appreciate any help in figuring this out. Thanks, Mohit. Logs: Spark Executor Command: java -cp ::/home/ayasdi/spark/conf:/home//spark/assembly/target/scala-2.10/spark-assembly-1.0.0-hadoop1.0.4.jar -Xms2g -Xmx2g -Xms512M -Xmx512M org.apache.spark.executor.CoarseGrainedExecutorBackend akka.tcp://spark@fc31887475e3:43921/user/CoarseGrainedScheduler 1 cobalt 24 akka.tcp://sparkWorker@:33952/user/Worker app-20140616152201-0021 log4j:WARN No appenders could be found for logger (org.apache.hadoop.conf.Configuration). log4j:WARN Please initialize the log4j system properly. log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info. 14/06/16 15:22:05 INFO SparkHadoopUtil: Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 14/06/16 15:22:05 INFO SecurityManager: Changing view acls to: ayasdi,root 14/06/16 15:22:05 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(xxx, xxx) 14/06/16 15:22:05 INFO Slf4jLogger: Slf4jLogger started 14/06/16 15:22:05 INFO Remoting: Starting remoting 14/06/16 15:22:06 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkExecutor@:33536] 14/06/16 15:22:06 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkExecutor@:33536] 14/06/16 15:22:06 INFO CoarseGrainedExecutorBackend: Connecting to driver: akka.tcp://spark@fc31887475e3:43921/user/CoarseGrainedScheduler 14/06/16 15:22:06 INFO WorkerWatcher: Connecting to worker akka.tcp://sparkWorker@:33952/user/Worker 14/06/16 15:22:06 WARN Remoting: Tried to associate with unreachable remote address [akka.tcp://spark@fc31887475e3:43921]. Address is now gated for 6 ms, all messages to this address will be delivered to dead letters. 14/06/16 15:22:06 ERROR CoarseGrainedExecutorBackend: Driver Disassociated [akka.tcp://sparkExecutor@:33536] - [akka.tcp://spark@fc31887475e3:43921] disassociated!
Shutting down.
Re: Executors not utilized properly.
repartition() is actually just an alias for coalesce(), but with the shuffle flag set to true. This shuffle is probably what you're seeing as taking longer, but it is required when you go from a smaller number of partitions to a larger one. When actually decreasing the number of partitions, coalesce(shuffle = false) will be fully pipelined, but is limited in how it can redistribute data, as it can only combine whole partitions into larger partitions. For example, if you have an RDD with 101 partitions and you do rdd.coalesce(100, shuffle = false), then the resultant RDD will have 99 of the original partitions, and 1 partition will just be 2 original partitions combined. This can lead to increased data skew, but requires no effort to create. On the other hand, if you do rdd.coalesce(100, shuffle = true), then all of the data will actually be reshuffled into 100 new evenly-sized partitions, eliminating any data skew at the cost of actually moving all data around. On Tue, Jun 17, 2014 at 4:52 PM, abhiguruvayya sharath.abhis...@gmail.com wrote: I found the main reason to be that I was using coalesce instead of repartition. coalesce was shrinking the partitioning, so the number of tasks was very low to be executed by all of the executors. Can you help me in understanding when to use coalesce and when to use repartition? In my application coalesce runs faster than repartition, which is unusual.
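A small illustration of the two behaviors (partition counts are arbitrary):

val rdd = sc.parallelize(1 to 1000, 101)
// No shuffle: whole partitions are merged; cheap, but can introduce skew.
val merged = rdd.coalesce(100)
// Full shuffle: evenly redistributes data at the cost of moving it all.
val rebalanced = rdd.coalesce(100, shuffle = true) // equivalent to rdd.repartition(100)
println(merged.partitions.length + " " + rebalanced.partitions.length) // 100 100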
Re: spark with docker: errors with akka, NAT?
Yup, alright, same solution then :) On Tue, Jun 17, 2014 at 7:39 PM, Mohit Jaggi mohitja...@gmail.com wrote: I used --privileged to start the container and then unmounted /etc/hosts. Then I created a new /etc/hosts file.
Re: long GC pause during file.cache()
Note also that the JVM does not cope well with very large heaps, due to this exact issue. There are two commonly used workarounds: 1) Spawn multiple (smaller) executors on the same machine. This can be done by creating multiple Workers (via SPARK_WORKER_INSTANCES in standalone mode [1]). 2) Use Tachyon for off-heap caching of RDDs, allowing Spark executors to be smaller and avoid GC pauses. [1] See the standalone documentation here: http://spark.apache.org/docs/latest/spark-standalone.html#cluster-launch-scripts On Sun, Jun 15, 2014 at 3:50 PM, Nan Zhu zhunanmcg...@gmail.com wrote: Yes, I think in the spark-env.sh.template it is listed in the comments (didn't check...) Best, -- Nan Zhu On Sunday, June 15, 2014 at 5:21 PM, Surendranauth Hiraman wrote: Is SPARK_DAEMON_JAVA_OPTS valid in 1.0.0? On Sun, Jun 15, 2014 at 4:59 PM, Nan Zhu zhunanmcg...@gmail.com wrote: SPARK_JAVA_OPTS is deprecated in 1.0, though it works fine if you don't mind the WARNING in the logs; you can set spark.executor.extraJavaOptions in your SparkConf obj. Best, -- Nan Zhu On Sunday, June 15, 2014 at 12:13 PM, Hao Wang wrote: Hi, Wei You may try to set JVM opts in spark-env.sh as follows to prevent or mitigate GC pause: export SPARK_JAVA_OPTS="-XX:-UseGCOverheadLimit -XX:+UseConcMarkSweepGC -Xmx2g -XX:MaxPermSize=256m" There are more options you could add, please just Google :) Regards, Wang Hao(王灏) CloudTeam | School of Software Engineering Shanghai Jiao Tong University Address: 800 Dongchuan Road, Minhang District, Shanghai, 200240 Email: wh.s...@gmail.com On Sun, Jun 15, 2014 at 10:24 AM, Wei Tan w...@us.ibm.com wrote: Hi, I have a single node (192G RAM) stand-alone spark, with memory configuration like this in spark-env.sh: SPARK_WORKER_MEMORY=180g SPARK_MEM=180g In spark-shell I have a program like this: val file = sc.textFile("/localpath") // file size is 40G file.cache() val output = file.map(line => extract something from line) output.saveAsTextFile("...") When I run this program again and again, or keep trying file.unpersist() --> file.cache() --> output.saveAsTextFile(), the run time varies a lot, from 1 min to 3 min to 50+ min. Whenever the run time is more than 1 min, from the stage monitoring GUI I observe big GC pauses (some can be 10+ min). Of course when the run time is normal, say ~1 min, no significant GC is observed. The behavior seems somewhat random. Is there any JVM tuning I should do to prevent this long GC pause from happening? I used java-1.6.0-openjdk.x86_64, and my spark-shell process is something like this: root 10994 1.7 0.6 196378000 1361496 pts/51 Sl+ 22:06 0:12 /usr/lib/jvm/java-1.6.0-openjdk.x86_64/bin/java -cp ::/home/wtan/scala/spark-1.0.0-bin-hadoop1/conf:/home/wtan/scala/spark-1.0.0-bin-hadoop1/lib/spark-assembly-1.0.0-hadoop1.0.4.jar:/home/wtan/scala/spark-1.0.0-bin-hadoop1/lib/datanucleus-core-3.2.2.jar:/home/wtan/scala/spark-1.0.0-bin-hadoop1/lib/datanucleus-rdbms-3.2.1.jar:/home/wtan/scala/spark-1.0.0-bin-hadoop1/lib/datanucleus-api-jdo-3.2.1.jar -XX:MaxPermSize=128m -Djava.library.path= -Xms180g -Xmx180g org.apache.spark.deploy.SparkSubmit spark-shell --class org.apache.spark.repl.Main Best regards, Wei - Wei Tan, PhD Research Staff Member IBM T. J. Watson Research Center http://researcher.ibm.com/person/us-wtan -- SUREN HIRAMAN, VP TECHNOLOGY Velos Accelerating Machine Learning 440 NINTH AVENUE, 11TH FLOOR NEW YORK, NY 10001 O: (917) 525-2466 ext. 105 F: 646.349.4063 E: suren.hira...@velos.io W: www.velos.io
Re: How to specify executor memory in EC2 ?
The scripts for Spark 1.0 actually specify this property in /root/spark/conf/spark-defaults.conf. I didn't know that this would override the --executor-memory flag, though; that's pretty odd. On Thu, Jun 12, 2014 at 6:02 PM, Aliaksei Litouka aliaksei.lito...@gmail.com wrote: Yes, I am launching a cluster with the spark_ec2 script. I checked /root/spark/conf/spark-env.sh on the master node and on slaves and it looks like this: #!/usr/bin/env bash export SPARK_LOCAL_DIRS=/mnt/spark # Standalone cluster options export SPARK_MASTER_OPTS= export SPARK_WORKER_INSTANCES=1 export SPARK_WORKER_CORES=1 export HADOOP_HOME=/root/ephemeral-hdfs export SPARK_MASTER_IP=ec2-54-89-95-238.compute-1.amazonaws.com export MASTER=`cat /root/spark-ec2/cluster-url` export SPARK_SUBMIT_LIBRARY_PATH=$SPARK_SUBMIT_LIBRARY_PATH:/root/ephemeral-hdfs/lib/native/ export SPARK_SUBMIT_CLASSPATH=$SPARK_CLASSPATH:$SPARK_SUBMIT_CLASSPATH:/root/ephemeral-hdfs/conf # Bind Spark's web UIs to this machine's public EC2 hostname: export SPARK_PUBLIC_DNS=`wget -q -O - http://169.254.169.254/latest/meta-data/public-hostname` # Set a high ulimit for large shuffles ulimit -n 100 None of these variables seem to be related to memory size. Let me know if I am missing something. On Thu, Jun 12, 2014 at 7:17 PM, Matei Zaharia matei.zaha...@gmail.com wrote: Are you launching this using our EC2 scripts? Or have you set up a cluster by hand? Matei On Jun 12, 2014, at 2:32 PM, Aliaksei Litouka aliaksei.lito...@gmail.com wrote: spark-env.sh doesn't seem to contain any settings related to memory size :( I will continue searching for a solution and will post it if I find it :) Thank you, anyway On Wed, Jun 11, 2014 at 12:19 AM, Matei Zaharia matei.zaha...@gmail.com wrote: It might be that conf/spark-env.sh on EC2 is configured to set it to 512, and is overriding the application's settings. Take a look in there and delete that line if possible. Matei On Jun 10, 2014, at 2:38 PM, Aliaksei Litouka aliaksei.lito...@gmail.com wrote: I am testing my application in an EC2 cluster of m3.medium machines. By default, only 512 MB of memory on each machine is used. I want to increase this amount and I'm trying to do it by passing the --executor-memory 2G option to the spark-submit script, but it doesn't seem to work - each machine uses only 512 MB instead of 2 gigabytes. What am I doing wrong? How do I increase the amount of memory?
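One way to make the setting explicit, and to check which value actually won, is to set it in the application itself (a sketch; the 2g value is illustrative):

import org.apache.spark.{SparkConf, SparkContext}
val conf = new SparkConf()
  .setAppName("MemoryCheck")
  .set("spark.executor.memory", "2g") // same property the --executor-memory flag sets
val sc = new SparkContext(conf)
println(sc.getConf.get("spark.executor.memory")) // verify what took effect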
Re: How can I make Spark 1.0 saveAsTextFile to overwrite existing file
It is not a very good idea to save the results in the exact same place as the data. Any failures during the job could lead to corrupted data, because recomputing the lost partitions would involve reading the original (now-nonexistent) data. As such, the only safe way to do this would be to do as you said, and only delete the input data once the entire output has been successfully created. On Sun, Jun 8, 2014 at 10:32 PM, innowireless TaeYun Kim taeyun@innowireless.co.kr wrote: Without (C), what is the best practice to implement the following scenario? 1. rdd = sc.textFile(FileA) 2. rdd = rdd.map(...) // actually modifying the rdd 3. rdd.saveAsTextFile(FileA) Since the rdd transformation is 'lazy', rdd will not materialize until saveAsTextFile(), so FileA must still exist, but it must be deleted before saveAsTextFile(). What I can think of is: 3. rdd.saveAsTextFile(TempFile) 4. delete FileA 5. rename TempFile to FileA This is not very convenient... Thanks. -Original Message- From: Patrick Wendell [mailto:pwend...@gmail.com] Sent: Tuesday, June 03, 2014 11:40 AM To: user@spark.apache.org Subject: Re: How can I make Spark 1.0 saveAsTextFile to overwrite existing file (A) Semantics in Spark 0.9 and earlier: Spark will ignore Hadoop's output format check and overwrite files in the destination directory. But it won't clobber the directory entirely. I.e. if the directory already had part1, part2, part3, part4 and you write a new job outputting only two files (part1, part2) then it would leave the other two files intact, confusingly. (B) Semantics in Spark 1.0 and later: Runs the Hadoop OutputFormat check, which means the directory must not already exist or an exception is thrown. (C) Semantics proposed by Nicholas Chammas in this thread (AFAIK): Spark will delete/clobber an existing destination directory if it exists, then fully over-write it with new data. I'm fine to add a flag that allows (A) for backwards-compatibility reasons, but my point was I'd prefer not to have (C) even though I see some cases where it would be useful. - Patrick On Mon, Jun 2, 2014 at 4:25 PM, Sean Owen so...@cloudera.com wrote: Is there a third way? Unless I miss something. Hadoop's OutputFormat wants the target dir to not exist no matter what, so it's just a question of whether Spark deletes it for you or errors. On Tue, Jun 3, 2014 at 12:22 AM, Patrick Wendell pwend...@gmail.com wrote: We can just add back a flag to make it backwards compatible - it was just missed during the original PR. Adding a *third* set of clobber semantics, I'm slightly -1 on that for the following reasons: 1. It's scary to have Spark recursively deleting user files, could easily lead to users deleting data by mistake if they don't understand the exact semantics. 2. It would introduce a third set of semantics here for saveAsXX... 3. It's trivial for users to implement this with two lines of code (if output dir exists, delete it) before calling saveAsHadoopFile. - Patrick
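A sketch of that write-to-temp-then-swap pattern using the Hadoop FileSystem API (paths and the transform are made up; the swap happens only after the new output fully exists):

import org.apache.hadoop.fs.{FileSystem, Path}
val fs = FileSystem.get(sc.hadoopConfiguration)
val input = new Path("/data/FileA")
val tmp = new Path("/data/FileA_tmp")
val rdd = sc.textFile(input.toString).map(_.toUpperCase) // stand-in transformation
rdd.saveAsTextFile(tmp.toString) // fully materialize before touching the input
if (fs.exists(new Path(tmp, "_SUCCESS"))) { // the job completed successfully
  fs.delete(input, true) // recursive delete of the old directory
  fs.rename(tmp, input)
}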
Re: How to enable fault-tolerance?
Looks like your problem is local mode: https://github.com/apache/spark/blob/640f9a0efefd42cff86aecd4878a3a57f5ae85fa/core/src/main/scala/org/apache/spark/SparkContext.scala#L1430 For some reason, someone decided not to do retries when running in local mode. Not exactly sure why; feel free to submit a JIRA on this. On Mon, Jun 9, 2014 at 8:59 AM, Peng Cheng pc...@uow.edu.au wrote: I speculate that Spark will only retry on exceptions that are registered with TaskSetScheduler, so a definitely-will-fail task will fail quickly without taking more resources. However, I haven't found any documentation or web page on it.
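For what it's worth, local mode can be asked to retry by using the local[N,F] master form, where F is the number of task failures tolerated (a sketch, assuming a build that parses this form; plain local[N] behaves as if F were 1, i.e. no retries):

import org.apache.spark.{SparkConf, SparkContext}
// 4 worker threads; each task may fail up to 4 times before the job aborts.
val sc = new SparkContext(new SparkConf().setMaster("local[4,4]").setAppName("local-retries"))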
Re: How can I make Spark 1.0 saveAsTextFile to overwrite existing file
+1 please re-add this feature On Mon, Jun 2, 2014 at 12:44 PM, Patrick Wendell pwend...@gmail.com wrote: Thanks for pointing that out. I've assigned you to SPARK-1677 (I think I accidentally assigned myself way back when I created it). This should be an easy fix. On Mon, Jun 2, 2014 at 12:19 PM, Nan Zhu zhunanmcg...@gmail.com wrote: Hi, Patrick, I think https://issues.apache.org/jira/browse/SPARK-1677 is talking about the same thing? How about assigning it to me? I think I missed the configuration part in my previous commit, though I declared that in the PR description. Best, -- Nan Zhu On Monday, June 2, 2014 at 3:03 PM, Patrick Wendell wrote: Hey There, The issue was that the old behavior could cause users to silently overwrite data, which is pretty bad, so to be conservative we decided to enforce the same checks that Hadoop does. This was documented by this JIRA: https://issues.apache.org/jira/browse/SPARK-1100 https://github.com/apache/spark/commit/3a8b698e961ac05d9d53e2bbf0c2844fcb1010d1 However, it would be very easy to add an option that allows preserving the old behavior. Is anyone here interested in contributing that? I created a JIRA for it: https://issues.apache.org/jira/browse/SPARK-1993 - Patrick On Mon, Jun 2, 2014 at 9:22 AM, Pierre Borckmans pierre.borckm...@realimpactanalytics.com wrote: Indeed, the behavior has changed for good or for bad. I mean, I agree with the danger you mention, but I'm not sure it's happening like that. Isn't there a mechanism for overwrite in Hadoop that automatically removes part files, then writes a _temporary folder and then only the part files along with the _success folder? In any case this change of behavior should be documented IMO. Cheers Pierre Message sent from a mobile device - excuse typos and abbreviations On 2 June 2014 at 17:42, Nicholas Chammas nicholas.cham...@gmail.com wrote: What I've found using saveAsTextFile() against S3 (prior to Spark 1.0.0) is that files get overwritten automatically. There is one danger to this though. If I save to a directory that already has 20 part- files, but this time around I'm only saving 15 part- files, then there will be 5 leftover part- files from the previous set mixed in with the 15 newer files. This is potentially dangerous. I haven't checked to see if this behavior has changed in 1.0.0. Are you saying it has, Pierre? On Mon, Jun 2, 2014 at 9:41 AM, Pierre B pierre.borckm...@realimpactanalytics.com wrote: Hi Michaël, Thanks for this. We could indeed do that. But I guess the question is more about the change of behaviour from 0.9.1 to 1.0.0. We never had to care about that in previous versions. Does that mean we have to manually remove existing files or is there a way to automatically overwrite when using saveAsTextFile?
Re: hadoopRDD stalls reading entire directory
You may have to do sudo jps, because it should definitely list your processes. What does hivecluster2:8080 look like? My guess is it says there are 2 applications registered, and one has taken all the executors. There must be two applications running, as those are the only things that keep open those 4040/4041 ports. On Mon, Jun 2, 2014 at 11:32 AM, Russell Jurney russell.jur...@gmail.com wrote: If it matters, I have servers running at http://hivecluster2:4040/stages/ and http://hivecluster2:4041/stages/ When I run rdd.first, I see an item at http://hivecluster2:4041/stages/ but no tasks are running. Stage ID 1, first at console:46, Tasks: Succeeded/Total 0/16. On Mon, Jun 2, 2014 at 10:09 AM, Russell Jurney russell.jur...@gmail.com wrote: Looks like just worker and master processes are running: [hivedata@hivecluster2 ~]$ jps 10425 Jps [hivedata@hivecluster2 ~]$ ps aux|grep spark hivedata 10424 0.0 0.0 103248 820 pts/3S+ 10:05 0:00 grep spark root 10918 0.5 1.4 4752880 230512 ? Sl May27 41:43 java -cp :/opt/cloudera/parcels/SPARK-0.9.0-1.cdh4.6.0.p0.98/lib/spark/conf:/opt/cloudera/parcels/SPARK-0.9.0-1.cdh4.6.0.p0.98/lib/spark/core/lib/*:/opt/cloudera/parcels/SPARK-0.9.0-1.cdh4.6.0.p0.98/lib/spark/repl/lib/*:/opt/cloudera/parcels/SPARK-0.9.0-1.cdh4.6.0.p0.98/lib/spark/examples/lib/*:/opt/cloudera/parcels/SPARK-0.9.0-1.cdh4.6.0.p0.98/lib/spark/bagel/lib/*:/opt/cloudera/parcels/SPARK-0.9.0-1.cdh4.6.0.p0.98/lib/spark/mllib/lib/*:/opt/cloudera/parcels/SPARK-0.9.0-1.cdh4.6.0.p0.98/lib/spark/streaming/lib/*:/opt/cloudera/parcels/SPARK-0.9.0-1.cdh4.6.0.p0.98/lib/spark/lib/*:/etc/hadoop/conf:/opt/cloudera/parcels/CDH/lib/hadoop/*:/opt/cloudera/parcels/CDH/lib/hadoop/../hadoop-hdfs/*:/opt/cloudera/parcels/CDH/lib/hadoop/../hadoop-yarn/*:/opt/cloudera/parcels/CDH/lib/hadoop/../hadoop-mapreduce/*:/opt/cloudera/parcels/SPARK-0.9.0-1.cdh4.6.0.p0.98/lib/spark/lib/scala-library.jar:/opt/cloudera/parcels/SPARK-0.9.0-1.cdh4.6.0.p0.98/lib/spark/lib/scala-compiler.jar:/opt/cloudera/parcels/SPARK-0.9.0-1.cdh4.6.0.p0.98/lib/spark/lib/jline.jar -Dspark.akka.logLifecycleEvents=true -Djava.library.path=/opt/cloudera/parcels/SPARK-0.9.0-1.cdh4.6.0.p0.98/lib/spark/lib:/opt/cloudera/parcels/CDH/lib/hadoop/lib/native -Xms512m -Xmx512m org.apache.spark.deploy.master.Master --ip hivecluster2 --port 7077 --webui-port 18080 root 12715 0.0 0.0 148028 656 ?SMay27 0:00 sudo /opt/cloudera/parcels/SPARK/lib/spark/bin/spark-class org.apache.spark.deploy.worker.Worker spark://hivecluster2:7077 root 12716 0.3 1.1 4155884 191340 ? 
Sl May27 30:21 java -cp :/opt/cloudera/parcels/SPARK/lib/spark/conf:/opt/cloudera/parcels/SPARK/lib/spark/core/lib/*:/opt/cloudera/parcels/SPARK/lib/spark/repl/lib/*:/opt/cloudera/parcels/SPARK/lib/spark/examples/lib/*:/opt/cloudera/parcels/SPARK/lib/spark/bagel/lib/*:/opt/cloudera/parcels/SPARK/lib/spark/mllib/lib/*:/opt/cloudera/parcels/SPARK/lib/spark/streaming/lib/*:/opt/cloudera/parcels/SPARK/lib/spark/lib/*:/etc/hadoop/conf:/opt/cloudera/parcels/CDH/lib/hadoop/*:/opt/cloudera/parcels/CDH/lib/hadoop/../hadoop-hdfs/*:/opt/cloudera/parcels/CDH/lib/hadoop/../hadoop-yarn/*:/opt/cloudera/parcels/CDH/lib/hadoop/../hadoop-mapreduce/*:/opt/cloudera/parcels/SPARK/lib/spark/lib/scala-library.jar:/opt/cloudera/parcels/SPARK/lib/spark/lib/scala-compiler.jar:/opt/cloudera/parcels/SPARK/lib/spark/lib/jline.jar -Dspark.akka.logLifecycleEvents=true -Djava.library.path=/opt/cloudera/parcels/SPARK/lib/spark/lib:/opt/cloudera/parcels/CDH/lib/hadoop/lib/native -Xms512m -Xmx512m org.apache.spark.deploy.worker.Worker spark://hivecluster2:7077 On Sun, Jun 1, 2014 at 7:41 PM, Aaron Davidson ilike...@gmail.com wrote: Sounds like you have two shells running, and the first one is taking all your resources. Do a jps and kill the other guy, then try again. By the way, you can look at http://localhost:8080 (replace localhost with the server your Spark Master is running on) to see what applications are currently started, and what resource allocations they have. On Sun, Jun 1, 2014 at 6:47 PM, Russell Jurney russell.jur...@gmail.com wrote: Thanks again. Run results here: https://gist.github.com/rjurney/dc0efae486ba7d55b7d5 This time I get a port already in use exception on 4040, but it isn't fatal. Then when I run rdd.first, I get this over and over: 14/06/01 18:35:40 WARN scheduler.TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory On Sun, Jun 1, 2014 at 3:09 PM, Aaron Davidson ilike...@gmail.com wrote: You can avoid that by using the constructor that takes a SparkConf, a la val conf = new SparkConf() conf.setJars(avro.jar, ...) val sc = new SparkContext(conf) On Sun, Jun 1, 2014 at 2:32 PM, Russell Jurney russell.jur
Re: How can I make Spark 1.0 saveAsTextFile to overwrite existing file
Yes. On Mon, Jun 2, 2014 at 1:23 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: So in summary: - As of Spark 1.0.0, saveAsTextFile() will no longer clobber by default. - There is an open JIRA issue to add an option to allow clobbering. - Even when clobbering, part- files may be left over from previous saves, which is dangerous. Is this correct? On Mon, Jun 2, 2014 at 4:17 PM, Aaron Davidson ilike...@gmail.com wrote: +1 please re-add this feature On Mon, Jun 2, 2014 at 12:44 PM, Patrick Wendell pwend...@gmail.com wrote: Thanks for pointing that out. I've assigned you to SPARK-1677 (I think I accidentally assigned myself way back when I created it). This should be an easy fix. On Mon, Jun 2, 2014 at 12:19 PM, Nan Zhu zhunanmcg...@gmail.com wrote: Hi, Patrick, I think https://issues.apache.org/jira/browse/SPARK-1677 is talking about the same thing? How about assigning it to me? I think I missed the configuration part in my previous commit, though I declared it in the PR description. Best, -- Nan Zhu On Monday, June 2, 2014 at 3:03 PM, Patrick Wendell wrote: Hey there, The issue was that the old behavior could cause users to silently overwrite data, which is pretty bad, so to be conservative we decided to enforce the same checks that Hadoop does. This was documented by this JIRA: https://issues.apache.org/jira/browse/SPARK-1100 https://github.com/apache/spark/commit/3a8b698e961ac05d9d53e2bbf0c2844fcb1010d1 However, it would be very easy to add an option that allows preserving the old behavior. Is anyone here interested in contributing that? I created a JIRA for it: https://issues.apache.org/jira/browse/SPARK-1993 - Patrick On Mon, Jun 2, 2014 at 9:22 AM, Pierre Borckmans pierre.borckm...@realimpactanalytics.com wrote: Indeed, the behavior has changed, for better or for worse. I mean, I agree with the danger you mention, but I'm not sure it's happening like that. Isn't there an overwrite mechanism in Hadoop that removes the old part files, writes to a _temporary folder, and only then moves the part files into place along with the _SUCCESS marker? In any case, this change of behavior should be documented, IMO. Cheers Pierre Message sent from a mobile device - excuse typos and abbreviations On 2 June 2014 at 17:42, Nicholas Chammas nicholas.cham...@gmail.com wrote: What I've found using saveAsTextFile() against S3 (prior to Spark 1.0.0) is that files get overwritten automatically. There is one danger to this, though. If I save to a directory that already has 20 part- files, but this time around I'm only saving 15 part- files, then there will be 5 leftover part- files from the previous set mixed in with the 15 newer files. This is potentially dangerous. I haven't checked to see if this behavior has changed in 1.0.0. Are you saying it has, Pierre? On Mon, Jun 2, 2014 at 9:41 AM, Pierre B pierre.borckm...@realimpactanalytics.com wrote: Hi Michaël, Thanks for this. We could indeed do that. But I guess the question is more about the change of behaviour from 0.9.1 to 1.0.0. We never had to care about that in previous versions. Does that mean we have to manually remove existing files, or is there a way to automatically overwrite when using saveAsTextFile? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-can-I-make-Spark-1-0-saveAsTextFile-to-overwrite-existing-file-tp6696p6700.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
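Until the option tracked in SPARK-1993 lands, a common workaround is to delete the output directory yourself before saving. A minimal sketch using the Hadoop FileSystem client (rdd and the path are illustrative; later releases also added a spark.hadoop.validateOutputSpecs setting to relax the check):

    import org.apache.hadoop.fs.{FileSystem, Path}

    val outputPath = new Path("hdfs://namenode/out/results") // illustrative
    val fs = FileSystem.get(outputPath.toUri, sc.hadoopConfiguration)
    if (fs.exists(outputPath)) {
      // recursive delete also clears stale part- files from earlier saves,
      // avoiding the mixed-output danger described above
      fs.delete(outputPath, true)
    }
    rdd.saveAsTextFile(outputPath.toString)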
Re: Using Spark on Data size larger than Memory size
There is no fundamental issue if you're running on data that is larger than cluster memory size. Many operations can stream data through, and thus memory usage is independent of input data size. Certain operations require an entire *partition* (not dataset) to fit in memory, but there are not many instances of this left (sorting comes to mind, and this is being worked on). In general, one problem with Spark today is that you *can* OOM under certain configurations, and it's possible you'll need to change from the default configuration if you're doing very memory-intensive jobs. However, there are very few cases where Spark would simply fail as a matter of course -- for instance, you can always increase the number of partitions to decrease the size of any given one, or repartition data to eliminate skew. Regarding impact on performance, as Mayur said, there may absolutely be an impact depending on your jobs. If you're doing a join on a very large amount of data with few partitions, then we'll have to spill to disk. If you can't cache your working set of data in memory, you will also see a performance degradation. Spark enables the use of memory to make things fast, but if you just don't have enough memory, it won't be terribly fast. On Sat, May 31, 2014 at 12:14 AM, Mayur Rustagi mayur.rust...@gmail.com wrote: Clearly there will be an impact on performance, but frankly it depends on what you are trying to achieve with the dataset. Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Sat, May 31, 2014 at 11:45 AM, Vibhor Banga vibhorba...@gmail.com wrote: Some inputs will be really helpful. Thanks, -Vibhor On Fri, May 30, 2014 at 7:51 PM, Vibhor Banga vibhorba...@gmail.com wrote: Hi all, I am planning to use Spark with HBase, where I generate an RDD by reading data from an HBase table. I want to know: in the case when the size of the HBase table grows larger than the size of RAM available in the cluster, will the application fail, or will there be an impact on performance? Any thoughts in this direction will be helpful and are welcome. Thanks, -Vibhor -- Vibhor Banga Software Development Engineer Flipkart Internet Pvt. Ltd., Bangalore
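To make the partitioning advice concrete, a rough sketch (dataset path and partition count are illustrative) of trading per-partition size for partition count, and of allowing spill instead of relying on pure in-memory caching:

    import org.apache.spark.storage.StorageLevel

    val records = sc.textFile("hdfs://namenode/data/big-table") // illustrative

    // more, smaller partitions => less memory needed by any single task
    val repartitioned = records.repartition(512)

    // MEMORY_AND_DISK keeps what fits in RAM and spills the remainder to
    // local disk; MEMORY_ONLY (the default for cache()) instead drops
    // partitions that don't fit and recomputes them on demand
    repartitioned.persist(StorageLevel.MEMORY_AND_DISK)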
Re: Akka disassociation on Java SE Embedded
Thanks for the update! I've also run into the block manager timeout issue; it might be a good idea to increase the default significantly (it would probably time out immediately if the TCP connection itself dropped anyway). On Sun, Jun 1, 2014 at 9:48 AM, Chanwit Kaewkasi chan...@gmail.com wrote: Hi all, This is what I found: 1. Like Aaron suggested, an executor will be killed silently when the OS's memory is running out. I've seen this enough times to conclude it's real. Adding swap and increasing the JVM heap solved the problem, but you will then encounter OS paging and full GCs. 2. OS paging and full GCs are not likely to affect my benchmark much while processing data from HDFS. But Akka processes were randomly killed during the network-related stages (for example, sorting). I've found that an Akka process cannot fetch results fast enough. Increasing the block manager timeout helped a lot. I've doubled the value many times, as the network of our ARM cluster is quite slow. 3. We'd like to collect the times spent in all stages of our benchmark, so we always re-run when some tasks fail. Failure happened a lot, but that's understandable, as Spark is designed on top of Akka's let-it-crash philosophy. To make the benchmark run cleanly (without a task failure), I called .cache() before calling the transformation of the next stage, and it helped a lot. Combining the above and other tuning, we can now boost the performance of our ARM cluster to 2.8 times faster than our first report. Best regards, -chanwit -- Chanwit Kaewkasi linkedin.com/in/chanwit On Wed, May 28, 2014 at 1:13 AM, Chanwit Kaewkasi chan...@gmail.com wrote: Maybe that explains mine too. Thank you very much, Aaron !! Best regards, -chanwit -- Chanwit Kaewkasi linkedin.com/in/chanwit On Wed, May 28, 2014 at 12:47 AM, Aaron Davidson ilike...@gmail.com wrote: Spark should effectively turn Akka's failure detector off, because we historically had problems with GCs and other issues causing disassociations. The only thing that should cause these messages nowadays is if the TCP connection (which Akka sustains between Actor Systems on different machines) actually drops. TCP connections are pretty resilient, so one common cause of this is actual Executor failure -- recently, I have experienced a similar-sounding problem due to my machine's OOM killer terminating my Executors, such that they didn't produce any error output. On Thu, May 22, 2014 at 9:19 AM, Chanwit Kaewkasi chan...@gmail.com wrote: Hi all, On an ARM cluster, I have been testing a wordcount program with JRE 7 and everything is OK. But when changing to the embedded version of Java SE (Oracle's eJRE), the same program cannot complete all computing stages. It fails with many Akka disassociations. - I've been trying to increase Akka's timeout but am still stuck, and I am not sure of the right way to do so. (I suspect that GC pausing the world is causing this.) - Another question: how can I properly turn on Akka's logging to see the root cause of this disassociation problem? (In case my guess about GC is wrong.) Best regards, -chanwit -- Chanwit Kaewkasi linkedin.com/in/chanwit
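For anyone reproducing Chanwit's tuning, a sketch of the two knobs discussed; the timeout property name below is the 0.9/1.0-era one and may differ in later releases, and all values and paths are illustrative:

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      // raise the block manager timeout so slow fetches on a slow network
      // are not mistaken for executor death
      .set("spark.storage.blockManagerSlaveTimeoutMs", "120000")
    val sc = new SparkContext(conf)

    val stage1 = sc.textFile("hdfs://namenode/data/words").map(_.split(" "))
    stage1.cache() // cache() is lazy...
    stage1.count() // ...so run an action to materialize it before the
                   // shuffle-heavy next stage; retries then hit cached blocks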
Re: hadoopRDD stalls reading entire directory
Gotcha. The easiest way to get your dependencies to your Executors would probably be to construct your SparkContext with all necessary jars passed in (as the jars parameter), or inside a SparkConf with setJars(). Avro is a necessary jar, but it's possible your application also needs to distribute other ones to the cluster. An easy way to make sure all your dependencies get shipped to the cluster is to create an assembly jar of your application, and then you just need to tell Spark about that jar, which includes all your application's transitive dependencies. Maven and sbt both have pretty straightforward ways of producing assembly jars. On Sat, May 31, 2014 at 11:23 PM, Russell Jurney russell.jur...@gmail.com wrote: Thanks for the fast reply. I am running CDH 4.4 with the Cloudera Parcel of Spark 0.9.0, in standalone mode. On Saturday, May 31, 2014, Aaron Davidson ilike...@gmail.com wrote: First issue was because your cluster was configured incorrectly. You could probably read 1 file because that was done on the driver node, but when it tried to run a job on the cluster, it failed. Second issue, it seems that the jar containing avro is not getting propagated to the Executors. What version of Spark are you running on? What deployment mode (YARN, standalone, Mesos)? On Sat, May 31, 2014 at 9:37 PM, Russell Jurney russell.jur...@gmail.com wrote: Now I get this: scala rdd.first 14/05/31 21:36:28 INFO spark.SparkContext: Starting job: first at console:41 14/05/31 21:36:28 INFO scheduler.DAGScheduler: Got job 4 (first at console:41) with 1 output partitions (allowLocal=true) 14/05/31 21:36:28 INFO scheduler.DAGScheduler: Final stage: Stage 4 (first at console:41) 14/05/31 21:36:28 INFO scheduler.DAGScheduler: Parents of final stage: List() 14/05/31 21:36:28 INFO scheduler.DAGScheduler: Missing parents: List() 14/05/31 21:36:28 INFO scheduler.DAGScheduler: Computing the requested partition locally 14/05/31 21:36:28 INFO rdd.HadoopRDD: Input split: hdfs://hivecluster2/securityx/web_proxy_mef/2014/05/29/22/part-m-0.avro:0+3864 14/05/31 21:36:28 INFO spark.SparkContext: Job finished: first at console:41, took 0.037371256 s 14/05/31 21:36:28 INFO spark.SparkContext: Starting job: first at console:41 14/05/31 21:36:28 INFO scheduler.DAGScheduler: Got job 5 (first at console:41) with 16 output partitions (allowLocal=true) 14/05/31 21:36:28 INFO scheduler.DAGScheduler: Final stage: Stage 5 (first at console:41) 14/05/31 21:36:28 INFO scheduler.DAGScheduler: Parents of final stage: List() 14/05/31 21:36:28 INFO scheduler.DAGScheduler: Missing parents: List() 14/05/31 21:36:28 INFO scheduler.DAGScheduler: Submitting Stage 5 (HadoopRDD[0] at hadoopRDD at console:37), which has no missing parents 14/05/31 21:36:28 INFO scheduler.DAGScheduler: Submitting 16 missing tasks from Stage 5 (HadoopRDD[0] at hadoopRDD at console:37) 14/05/31 21:36:28 INFO scheduler.TaskSchedulerImpl: Adding task set 5.0 with 16 tasks 14/05/31 21:36:28 INFO scheduler.TaskSetManager: Starting task 5.0:0 as TID 92 on executor 2: hivecluster3 (NODE_LOCAL) 14/05/31 21:36:28 INFO scheduler.TaskSetManager: Serialized task 5.0:0 as 1294 bytes in 1 ms 14/05/31 21:36:28 INFO scheduler.TaskSetManager: Starting task 5.0:3 as TID 93 on executor 1: hivecluster5.labs.lan (NODE_LOCAL) 14/05/31 21:36:28 INFO scheduler.TaskSetManager: Serialized task 5.0:3 as 1294 bytes in 0 ms 14/05/31 21:36:28 INFO scheduler.TaskSetManager: Starting task 5.0:1 as TID 94 on executor 4: hivecluster4 (NODE_LOCAL) 14/05/31 21:36:28 INFO scheduler.TaskSetManager: 
Serialized task 5.0:1 as 1294 bytes in 1 ms 14/05/31 21:36:28 INFO scheduler.TaskSetManager: Starting task 5.0:2 as TID 95 on executor 0: hivecluster6.labs.lan (NODE_LOCAL) 14/05/31 21:36:28 INFO scheduler.TaskSetManager: Serialized task 5.0:2 as 1294 bytes in 0 ms 14/05/31 21:36:28 INFO scheduler.TaskSetManager: Starting task 5.0:4 as TID 96 on executor 3: hivecluster1.labs.lan (NODE_LOCAL) 14/05/31 21:36:28 INFO scheduler.TaskSetManager: Serialized task 5.0:4 as 1294 bytes in 0 ms 14/05/31 21:36:28 INFO scheduler.TaskSetManager: Starting task 5.0:6 as TID 97 on executor 2: hivecluster3 (NODE_LOCAL) 14/05/31 21:36:28 INFO scheduler.TaskSetManager: Serialized task 5.0:6 as 1294 bytes in 0 ms 14/05/31 21:36:28 INFO scheduler.TaskSetManager: Starting task 5.0:5 as TID 98 on executor 1: hivecluster5.labs.lan (NODE_LOCAL) 14/05/31 21:36:28 INFO scheduler.TaskSetManager: Serialized task 5.0:5 as 1294 bytes in 0 ms 14/05/31 21:36:28 INFO scheduler.TaskSetManager: Starting task 5.0:8 as TID 99 on executor 4: hivecluster4 (NODE_LOCAL) 14/05/31 21:36:28 INFO scheduler.TaskSetManager: Serialized task 5.0:8 as 1294 bytes in 0 ms 14/05/31 21:36:28 INFO scheduler.TaskSetManager: Starting task 5.0:7 as TID 100 on executor 0: hivecluster6.labs.lan (NODE_LOCAL) 14/05/31 21:36:28 INFO scheduler.TaskSetManager
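As a rough illustration of the assembly-jar route Aaron mentions, an sbt sketch; the plugin coordinates and all versions are assumptions to adapt, and the plugin's standard settings from its README still need to be wired in. Running sbt assembly then yields one fat jar to hand to Spark:

    // project/plugins.sbt (assumed plugin version)
    addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")

    // build.sbt
    name := "my-spark-app" // illustrative

    libraryDependencies ++= Seq(
      // "provided": the cluster already has Spark; keep it out of the jar
      "org.apache.spark" %% "spark-core" % "0.9.1" % "provided",
      "org.apache.avro"  %  "avro"       % "1.7.6" // the jar that was missing
    )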
Re: hadoopRDD stalls reading entire directory
You can avoid that by using the constructor that takes a SparkConf, a la val conf = new SparkConf() conf.setJars(avro.jar, ...) val sc = new SparkContext(conf) On Sun, Jun 1, 2014 at 2:32 PM, Russell Jurney russell.jur...@gmail.com wrote: Followup question: the docs to make a new SparkContext require that I know where $SPARK_HOME is. However, I have no idea. Any idea where that might be? On Sun, Jun 1, 2014 at 10:28 AM, Aaron Davidson ilike...@gmail.com wrote: Gotcha. The easiest way to get your dependencies to your Executors would probably be to construct your SparkContext with all necessary jars passed in (as the jars parameter), or inside a SparkConf with setJars(). Avro is a necessary jar, but it's possible your application also needs to distribute other ones to the cluster. An easy way to make sure all your dependencies get shipped to the cluster is to create an assembly jar of your application, and then you just need to tell Spark about that jar, which includes all your application's transitive dependencies. Maven and sbt both have pretty straightforward ways of producing assembly jars. On Sat, May 31, 2014 at 11:23 PM, Russell Jurney russell.jur...@gmail.com wrote: Thanks for the fast reply. I am running CDH 4.4 with the Cloudera Parcel of Spark 0.9.0, in standalone mode. On Saturday, May 31, 2014, Aaron Davidson ilike...@gmail.com wrote: First issue was because your cluster was configured incorrectly. You could probably read 1 file because that was done on the driver node, but when it tried to run a job on the cluster, it failed. Second issue, it seems that the jar containing avro is not getting propagated to the Executors. What version of Spark are you running on? What deployment mode (YARN, standalone, Mesos)? On Sat, May 31, 2014 at 9:37 PM, Russell Jurney russell.jur...@gmail.com wrote: Now I get this: scala rdd.first 14/05/31 21:36:28 INFO spark.SparkContext: Starting job: first at console:41 14/05/31 21:36:28 INFO scheduler.DAGScheduler: Got job 4 (first at console:41) with 1 output partitions (allowLocal=true) 14/05/31 21:36:28 INFO scheduler.DAGScheduler: Final stage: Stage 4 (first at console:41) 14/05/31 21:36:28 INFO scheduler.DAGScheduler: Parents of final stage: List() 14/05/31 21:36:28 INFO scheduler.DAGScheduler: Missing parents: List() 14/05/31 21:36:28 INFO scheduler.DAGScheduler: Computing the requested partition locally 14/05/31 21:36:28 INFO rdd.HadoopRDD: Input split: hdfs://hivecluster2/securityx/web_proxy_mef/2014/05/29/22/part-m-0.avro:0+3864 14/05/31 21:36:28 INFO spark.SparkContext: Job finished: first at console:41, took 0.037371256 s 14/05/31 21:36:28 INFO spark.SparkContext: Starting job: first at console:41 14/05/31 21:36:28 INFO scheduler.DAGScheduler: Got job 5 (first at console:41) with 16 output partitions (allowLocal=true) 14/05/31 21:36:28 INFO scheduler.DAGScheduler: Final stage: Stage 5 (first at console:41) 14/05/31 21:36:28 INFO scheduler.DAGScheduler: Parents of final stage: List() 14/05/31 21:36:28 INFO scheduler.DAGScheduler: Missing parents: List() 14/05/31 21:36:28 INFO scheduler.DAGScheduler: Submitting Stage 5 (HadoopRDD[0] at hadoopRDD at console:37), which has no missing parents 14/05/31 21:36:28 INFO scheduler.DAGScheduler: Submitting 16 missing tasks from Stage 5 (HadoopRDD[0] at hadoopRDD at console:37) 14/05/31 21:36:28 INFO scheduler.TaskSchedulerImpl: Adding task set 5.0 with 16 tasks 14/05/31 21:36:28 INFO scheduler.TaskSetManager: Starting task 5.0:0 as TID 92 on executor 2: hivecluster3 (NODE_LOCAL) 14/05/31 21:36:28 INFO 
scheduler.TaskSetManager: Serialized task 5.0:0 as 1294 bytes in 1 ms 14/05/31 21:36:28 INFO scheduler.TaskSetManager: Starting task 5.0:3 as TID 93 on executor 1: hivecluster5.labs.lan (NODE_LOCAL) 14/05/31 21:36:28 INFO scheduler.TaskSetManager: Serialized task 5.0:3 as 1294 bytes in 0 ms 14/05/31 21:36:28 INFO scheduler.TaskSetManager: Starting task 5.0:1 as TID 94 on executor 4: hivecluster4 (NODE_LOCAL) 14/05/31 21:36:28 INFO scheduler.TaskSetManager: Serialized task 5.0:1 as 1294 bytes in 1 ms 14/05/31 21:36:28 INFO scheduler.TaskSetManager: Starting task 5.0:2 as TID 95 on executor 0: hivecluster6.labs.lan (NODE_LOCAL) 14/05/31 21:36:28 INFO scheduler.TaskSetManager: Serialized task 5.0:2 as 1294 bytes in 0 ms 14/05/31 21:36:28 INFO scheduler.TaskSetManager: Starting task 5.0:4 as TID 96 on executor 3: hivecluster1.labs.lan (NODE_LOCAL) 14/05/31 21:36:28 INFO scheduler.TaskSetManager: Serialized task 5.0:4 as 1294 bytes in 0 ms 14/05/31 21:36:28 INFO scheduler.TaskSetManager: Starting task 5.0:6 as TID 97 on executor 2: hivecluster3 (NODE_LOCAL) 14/05/31 21:36:28 INFO scheduler.TaskSetManager: Serialized task 5.0:6 as 1294 bytes in 0 ms 14/05/31 21:36:28 INFO scheduler.TaskSetManager: Starting task 5.0:5 as TID 98 on executor 1: hivecluster5.labs.lan (NODE_LOCAL) 14/05/31 21
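Written out with the actual signatures (setJars takes a Seq[String]; the master URL, app name, and paths are illustrative), the snippet quoted above becomes:

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setMaster("spark://hivecluster2:7077")
      .setAppName("avro-reader") // illustrative
      // answers the $SPARK_HOME question: it can be set programmatically
      .setSparkHome("/opt/cloudera/parcels/SPARK/lib/spark")
      .setJars(Seq("/path/to/avro.jar", "/path/to/my-app-assembly.jar"))
    val sc = new SparkContext(conf)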
Re: hadoopRDD stalls reading entire directory
Sounds like you have two shells running, and the first one is taking all your resources. Do a jps and kill the other guy, then try again. By the way, you can look at http://localhost:8080 (replace localhost with the server your Spark Master is running on) to see what applications are currently started, and what resource allocations they have. On Sun, Jun 1, 2014 at 6:47 PM, Russell Jurney russell.jur...@gmail.com wrote: Thanks again. Run results here: https://gist.github.com/rjurney/dc0efae486ba7d55b7d5 This time I get a port already in use exception on 4040, but it isn't fatal. Then when I run rdd.first, I get this over and over: 14/06/01 18:35:40 WARN scheduler.TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory On Sun, Jun 1, 2014 at 3:09 PM, Aaron Davidson ilike...@gmail.com wrote: You can avoid that by using the constructor that takes a SparkConf, a la val conf = new SparkConf() conf.setJars(avro.jar, ...) val sc = new SparkContext(conf) On Sun, Jun 1, 2014 at 2:32 PM, Russell Jurney russell.jur...@gmail.com wrote: Followup question: the docs to make a new SparkContext require that I know where $SPARK_HOME is. However, I have no idea. Any idea where that might be? On Sun, Jun 1, 2014 at 10:28 AM, Aaron Davidson ilike...@gmail.com wrote: Gotcha. The easiest way to get your dependencies to your Executors would probably be to construct your SparkContext with all necessary jars passed in (as the jars parameter), or inside a SparkConf with setJars(). Avro is a necessary jar, but it's possible your application also needs to distribute other ones to the cluster. An easy way to make sure all your dependencies get shipped to the cluster is to create an assembly jar of your application, and then you just need to tell Spark about that jar, which includes all your application's transitive dependencies. Maven and sbt both have pretty straightforward ways of producing assembly jars. On Sat, May 31, 2014 at 11:23 PM, Russell Jurney russell.jur...@gmail.com wrote: Thanks for the fast reply. I am running CDH 4.4 with the Cloudera Parcel of Spark 0.9.0, in standalone mode. On Saturday, May 31, 2014, Aaron Davidson ilike...@gmail.com wrote: First issue was because your cluster was configured incorrectly. You could probably read 1 file because that was done on the driver node, but when it tried to run a job on the cluster, it failed. Second issue, it seems that the jar containing avro is not getting propagated to the Executors. What version of Spark are you running on? What deployment mode (YARN, standalone, Mesos)?
On Sat, May 31, 2014 at 9:37 PM, Russell Jurney russell.jur...@gmail.com wrote: Now I get this: scala rdd.first 14/05/31 21:36:28 INFO spark.SparkContext: Starting job: first at console:41 14/05/31 21:36:28 INFO scheduler.DAGScheduler: Got job 4 (first at console:41) with 1 output partitions (allowLocal=true) 14/05/31 21:36:28 INFO scheduler.DAGScheduler: Final stage: Stage 4 (first at console:41) 14/05/31 21:36:28 INFO scheduler.DAGScheduler: Parents of final stage: List() 14/05/31 21:36:28 INFO scheduler.DAGScheduler: Missing parents: List() 14/05/31 21:36:28 INFO scheduler.DAGScheduler: Computing the requested partition locally 14/05/31 21:36:28 INFO rdd.HadoopRDD: Input split: hdfs://hivecluster2/securityx/web_proxy_mef/2014/05/29/22/part-m-0.avro:0+3864 14/05/31 21:36:28 INFO spark.SparkContext: Job finished: first at console:41, took 0.037371256 s 14/05/31 21:36:28 INFO spark.SparkContext: Starting job: first at console:41 14/05/31 21:36:28 INFO scheduler.DAGScheduler: Got job 5 (first at console:41) with 16 output partitions (allowLocal=true) 14/05/31 21:36:28 INFO scheduler.DAGScheduler: Final stage: Stage 5 (first at console:41) 14/05/31 21:36:28 INFO scheduler.DAGScheduler: Parents of final stage: List() 14/05/31 21:36:28 INFO scheduler.DAGScheduler: Missing parents: List() 14/05/31 21:36:28 INFO scheduler.DAGScheduler: Submitting Stage 5 (HadoopRDD[0] at hadoopRDD at console:37), which has no missing parents 14/05/31 21:36:28 INFO scheduler.DAGScheduler: Submitting 16 missing tasks from Stage 5 (HadoopRDD[0] at hadoopRDD at console:37) 14/05/31 21:36:28 INFO scheduler.TaskSchedulerImpl: Adding task set 5.0 with 16 tasks 14/05/31 21:36:28 INFO scheduler.TaskSetManager: Starting task 5.0:0 as TID 92 on executor 2: hivecluster3 (NODE_LOCAL) 14/05/31 21:36:28 INFO scheduler.TaskSetManager: Serialized task 5.0:0 as 1294 bytes in 1 ms 14/05/31 21:36:28 INFO scheduler.TaskSetManager: Starting task 5.0:3 as TID 93 on executor 1: hivecluster5.labs.lan (NODE_LOCAL) 14/05/31 21:36:28 INFO scheduler.TaskSetManager: Serialized task 5.0:3 as 1294 bytes in 0 ms 14/05/31 21:36:28 INFO scheduler.TaskSetManager: Starting task 5.0:1 as TID 94 on executor 4: hivecluster4
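If two shells genuinely need to share one standalone cluster, capping each application's core usage stops the first from starving the second. A sketch (spark.cores.max is a standalone-mode setting; the value is illustrative):

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setMaster("spark://hivecluster2:7077")
      .setAppName("second-shell") // illustrative
      .set("spark.cores.max", "8") // leave cores free for other applications
    val sc = new SparkContext(conf)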
Re: Can anyone help me set memory for standalone cluster?
In addition to setting the Standalone memory, you'll also need to tell your SparkContext to claim the extra resources. Set spark.executor.memory to 1600m as well. This should be a system property set in SPARK_JAVA_OPTS in conf/spark-env.sh (in 0.9.1, which you appear to be using) -- e.g., export SPARK_JAVA_OPTS=-Dspark.executor.memory=1600m On Sun, Jun 1, 2014 at 7:36 PM, Yunmeng Ban banyunm...@gmail.com wrote: Hi, I'm running the JavaKafkaWordCount example in a standalone cluster. I want to set 1600MB of memory for each slave node. I wrote in spark/conf/spark-env.sh: SPARK_WORKER_MEMORY=1600m But the logs on the slave nodes look like this: Spark Executor Command: /usr/java/latest/bin/java -cp :/~path/spark/conf:/~path/spark/assembly/target/scala-2.10/spark-assembly_2.10-0.9.1-hadoop2.2.0.jar -Xms512M -Xmx512M org.apache.spark.executor.CoarseGrainedExecutorBackend The memory seems to be the default value, not 1600M. I don't know how to make SPARK_WORKER_MEMORY work. Can anyone help me? Many thanks in advance. Yunmeng
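The same executor setting can also be supplied programmatically through SparkConf (available since 0.9); a sketch, with the master URL illustrative:

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setMaster("spark://master:7077") // illustrative
      .set("spark.executor.memory", "1600m") // per-executor heap; must fit
                                             // within SPARK_WORKER_MEMORY
    val sc = new SparkContext(conf)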
Re: Spark 1.0.0 - Java 8
Also, the Spark examples can run out of the box on a single machine, as well as on a cluster. See the Master URLs heading here: http://spark.apache.org/docs/latest/submitting-applications.html#master-urls On Fri, May 30, 2014 at 9:24 AM, Surendranauth Hiraman suren.hira...@velos.io wrote: With respect to virtual hosts, my team uses Vagrant/VirtualBox. We have 3 CentOS VMs with 4 GB RAM each - 2 worker nodes and a master node. Everything works fine, though if you are using MapR, you have to make sure they are all on the same subnet. -Suren On Fri, May 30, 2014 at 12:20 PM, Upender Nimbekar upent...@gmail.com wrote: Great news! I've been awaiting this release to start doing some coding with Spark using Java 8. Can I run the Spark 1.0 examples on a virtual host with 16 GB RAM and a fairly decent amount of hard disk? Or do I really need to use a cluster of machines? Second, are there any good examples of using MLlib on Spark? Please point me in the right direction. Thanks Upender On Fri, May 30, 2014 at 6:12 AM, Patrick Wendell pwend...@gmail.com wrote: I'm thrilled to announce the availability of Spark 1.0.0! Spark 1.0.0 is a milestone release as the first in the 1.0 line of releases, providing API stability for Spark's core interfaces. Spark 1.0.0 is Spark's largest release ever, with contributions from 117 developers. I'd like to thank everyone involved in this release - it was truly a community effort with fixes, features, and optimizations contributed from dozens of organizations. This release expands Spark's standard libraries, introducing a new SQL package (SparkSQL) which lets users integrate SQL queries into existing Spark workflows. MLlib, Spark's machine learning library, is expanded with sparse vector support and several new algorithms. The GraphX and Streaming libraries also introduce new features and optimizations. Spark's core engine adds support for secured YARN clusters, a unified tool for submitting Spark applications, and several performance and stability improvements. Finally, Spark adds support for Java 8 lambda syntax and improves coverage of the Java and Python APIs. Those features only scratch the surface - check out the release notes here: http://spark.apache.org/releases/spark-release-1-0-0.html Note that since release artifacts were posted recently, certain mirrors may not have working downloads for a few hours. - Patrick -- SUREN HIRAMAN, VP TECHNOLOGY Velos Accelerating Machine Learning 440 NINTH AVENUE, 11TH FLOOR NEW YORK, NY 10001 O: (917) 525-2466 ext. 105 F: 646.349.4063 E: suren.hira...@velos.io W: www.velos.io
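To illustrate the master-URL point: the same program runs on one machine or on a cluster depending only on the master setting. A SparkPi-style sketch (all names illustrative):

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setAppName("pi-estimate")
      .setMaster("local[4]") // or "spark://host:7077" for a standalone cluster
    val sc = new SparkContext(conf)

    val n = 100000
    val inside = sc.parallelize(1 to n).filter { _ =>
      val x = math.random
      val y = math.random
      x * x + y * y < 1 // dart landed inside the quarter-circle
    }.count()
    println(s"Pi is roughly ${4.0 * inside / n}")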
Re: access hdfs file name in map()
Currently there is not a way to do this using textFile(). However, you could pretty straightforwardly define your own subclass of HadoopRDD [1] in order to get access to this information (likely using mapPartitionsWithIndex to look up the InputSplit for a particular partition). Note that sc.textFile() is just a convenience function to construct a new HadoopRDD [2]. [1] HadoopRDD: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala#L93 [2] sc.textFile(): https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkContext.scala#L456 On Thu, May 29, 2014 at 7:49 PM, Xu (Simon) Chen xche...@gmail.com wrote: Hello, A quick question about using Spark to parse text-format CSV files stored on HDFS. I have something very simple: sc.textFile("hdfs://test/path/*").map(line => line.split(",")).map(p => ("XXX", p(0), p(2))) Here, I want to replace "XXX" with a string, which is the current CSV filename for the line. This is needed since some information may be encoded in the file name, like the date. In Hive, I am able to define an external table and use INPUT__FILE__NAME as a column in queries. I wonder if Spark has something similar. Thanks! -Simon
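A sketch of the HadoopRDD route described above. It uses mapPartitionsWithInputSplit, which later Spark releases expose directly on HadoopRDD; on the 2014-era versions in this thread you would subclass HadoopRDD instead, as suggested. The path is the one from the question:

    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapred.{FileSplit, TextInputFormat}
    import org.apache.spark.rdd.HadoopRDD

    // textFile() is a thin wrapper over this HadoopRDD construction
    val hadoopRdd = sc
      .hadoopFile[LongWritable, Text, TextInputFormat]("hdfs://test/path/*")
      .asInstanceOf[HadoopRDD[LongWritable, Text]]

    val withFileNames = hadoopRdd.mapPartitionsWithInputSplit { (split, iter) =>
      // each partition corresponds to one InputSplit; for text input it is
      // a FileSplit, which knows which file it was read from
      val fileName = split.asInstanceOf[FileSplit].getPath.toString
      iter.map { case (_, line) => (fileName, line.toString.split(",")) }
    }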