Re: Spark SQL. Memory consumption
Using large memory for executors (*--executor-memory 120g*)? Not really good advice.

On Thu, Apr 2, 2015 at 9:17 AM, Cheng, Hao hao.ch...@intel.com wrote:

Spark SQL tries to load the entire partition's data and organize it as in-memory HashMaps, so it does eat a lot of memory when there are not many duplicated GROUP BY keys across a large number of records. A couple of things you can try, case by case:

· Increase the number of partitions (the record count in each partition will drop)
· Use more memory for the executors (*--executor-memory 120g*)
· Reduce the number of Spark cores (to reduce the number of parallel running threads)

We are trying to improve this with sort-merge aggregation, which should reduce memory utilization significantly, but that work is still ongoing.

Cheng Hao

*From:* Masf [mailto:masfwo...@gmail.com]
*Sent:* Thursday, April 2, 2015 11:47 PM
*To:* user@spark.apache.org
*Subject:* Spark SQL. Memory consumption

Hi.

I'm using Spark SQL 1.2. I have this query:

CREATE TABLE test_MA STORED AS PARQUET AS
SELECT field1
      ,field2
      ,field3
      ,field4
      ,field5
      ,COUNT(1) AS field6
      ,MAX(field7)
      ,MIN(field8)
      ,SUM(field9 / 100)
      ,COUNT(field10)
      ,SUM(IF(field11 -500, 1, 0))
      ,MAX(field12)
      ,SUM(IF(field13 = 1, 1, 0))
      ,SUM(IF(field13 IN (3,4,5,6,10,104,105,107), 1, 0))
      ,SUM(IF(field13 = 2012, 1, 0))
      ,SUM(IF(field13 IN (0,100,101,102,103,106), 1, 0))
FROM table1 CL
JOIN table2 netw ON CL.field15 = netw.id
WHERE field3 IS NOT NULL
  AND field4 IS NOT NULL
  AND field5 IS NOT NULL
GROUP BY field1, field2, field3, field4, netw.field5

I submit it with:

spark-submit --master spark://master:7077 *--driver-memory 20g --executor-memory 60g* --class GMain project_2.10-1.0.jar --driver-class-path '/opt/cloudera/parcels/CDH/lib/hive/lib/*' --driver-java-options '-Dspark.executor.extraClassPath=/opt/cloudera/parcels/CDH/lib/hive/lib/*' 2 ./error

The input data is 8 GB in Parquet format. It often crashes with *GC overhead* errors. I've set spark.shuffle.partitions to 1024, but my worker nodes (with 128 GB RAM per node) still collapse.

*Is this query too difficult for Spark SQL?*
*Would it be better to do it in Spark?*
*Am I doing something wrong?*

Thanks

--
Regards.
Miguel Ángel
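Following up on the first suggestion above: for Spark SQL aggregations, the partition count that matters is spark.sql.shuffle.partitions (default 200 in Spark 1.2); the spark.shuffle.partitions key set in the original mail is not a recognized Spark setting. A minimal sketch of raising it from the driver, assuming a HiveContext (since the query uses CREATE TABLE ... STORED AS PARQUET); the app name and the value 2048 are illustrative:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.hive.HiveContext

    // Sketch: raise the number of post-shuffle partitions for SQL
    // aggregations so each partition's in-memory hash map stays smaller.
    val sc = new SparkContext(new SparkConf().setAppName("GMain"))
    val sqlContext = new HiveContext(sc)
    sqlContext.setConf("spark.sql.shuffle.partitions", "2048")

    // Placeholder for the CTAS query quoted above.
    sqlContext.sql("CREATE TABLE test_MA STORED AS PARQUET AS SELECT ...")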
Re: Running out of space (when there's no shortage)
Usually this happens on Linux when an application deletes a file without double-checking that there are no open FDs on it (a resource leak). In that case, Linux keeps the space allocated and does not release it until the application exits (crashes, in your case). You check the file system and everything looks normal: you have enough space, and you have no idea why the application reports no space left on device. Just a guess.

-Vladimir Rodionov

On Tue, Feb 24, 2015 at 8:34 AM, Joe Wass jw...@crossref.org wrote:

I'm running a cluster of 3 Amazon EC2 machines (a small number because it's expensive when experiments keep crashing after a day!). Today's crash looks like this (stack trace at end of message):

org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0

On my three nodes, I have plenty of space and inodes:

A $ df -i
Filesystem   Inodes     IUsed  IFree      IUse%  Mounted on
/dev/xvda1   524288     97937  426351     19%    /
tmpfs        1909200    1      1909199    1%     /dev/shm
/dev/xvdb    2457600    54     2457546    1%     /mnt
/dev/xvdc    2457600    24     2457576    1%     /mnt2
/dev/xvds    831869296  23844  831845452  1%     /vol0

A $ df -h
Filesystem   Size   Used  Avail  Use%  Mounted on
/dev/xvda1   7.9G   3.4G  4.5G   44%   /
tmpfs        7.3G   0     7.3G   0%    /dev/shm
/dev/xvdb    37G    1.2G  34G    4%    /mnt
/dev/xvdc    37G    177M  35G    1%    /mnt2
/dev/xvds    1000G  802G  199G   81%   /vol0

B $ df -i
Filesystem   Inodes     IUsed  IFree      IUse%  Mounted on
/dev/xvda1   524288     97947  426341     19%    /
tmpfs        1906639    1      1906638    1%     /dev/shm
/dev/xvdb    2457600    54     2457546    1%     /mnt
/dev/xvdc    2457600    24     2457576    1%     /mnt2
/dev/xvds    816200704  24223  816176481  1%     /vol0

B $ df -h
Filesystem   Size   Used  Avail  Use%  Mounted on
/dev/xvda1   7.9G   3.6G  4.3G   46%   /
tmpfs        7.3G   0     7.3G   0%    /dev/shm
/dev/xvdb    37G    1.2G  34G    4%    /mnt
/dev/xvdc    37G    177M  35G    1%    /mnt2
/dev/xvds    1000G  805G  195G   81%   /vol0

C $ df -i
Filesystem   Inodes     IUsed  IFree      IUse%  Mounted on
/dev/xvda1   524288     97938  426350     19%    /
tmpfs        1906897    1      1906896    1%     /dev/shm
/dev/xvdb    2457600    54     2457546    1%     /mnt
/dev/xvdc    2457600    24     2457576    1%     /mnt2
/dev/xvds    755218352  24024  755194328  1%     /vol0

C $ df -h
Filesystem   Size   Used  Avail  Use%  Mounted on
/dev/xvda1   7.9G   3.4G  4.5G   44%   /
tmpfs        7.3G   0     7.3G   0%    /dev/shm
/dev/xvdb    37G    1.2G  34G    4%    /mnt
/dev/xvdc    37G    177M  35G    1%    /mnt2
/dev/xvds    1000G  820G  181G   82%   /vol0

The devices may be ~80% full, but that still leaves ~200G free on each. My spark-env.sh has

export SPARK_LOCAL_DIRS=/vol0/spark

I have manually verified that on each slave the only temporary files are stored on /vol0, all looking something like this:

/vol0/spark/spark-f05d407c/spark-fca3e573/spark-78c06215/spark-4f0c4236/20/rdd_8_884

So it looks like all the files are being stored on the large drives (incidentally, they're AWS EBS volumes, but that's the only way to get enough storage). My process crashed before with a slightly different exception under the same circumstances:

kryo.KryoException: java.io.IOException: No space left on device

These both happen after several hours and several GB of temporary files. Why does Spark think it's run out of space?
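One way to test this guess on a worker node: files that have been unlinked but are still held open by a process do not show up in df or ls, yet lsof can list them. A sketch, assuming lsof is installed; the executor PID is a placeholder:

# Open files whose on-disk link count is zero (deleted but still held open):
$ lsof +L1
# Or inspect a specific executor JVM; deleted files are marked "(deleted)":
$ lsof -p <executor-pid> | grep deleted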
TIA

Joe

Stack trace 1:

org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0
    at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:384)
    at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:381)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
    at org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:380)
    at org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:176)
    at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.fetch(BlockStoreShuffleFetcher.scala:42)
    at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:40
Re: GC Issues with randomSplit on large dataset
"GC overhead limit exceeded" is usually a sign of either an inadequate heap size (not the case here) or an application that produces garbage (temporary objects) faster than the garbage collector can collect it, so GC consumes most of the CPU cycles. 17 GB of Java heap is quite large for many applications and is above the safe and recommended limit (6-8 GB) for a Java server application. From what I saw in the stack trace, I can conclude that some operations in the RDD implementation are heap polluters. I am not an expert in Spark, but it seems Spark is not yet well optimized to work with reasonably large Java heaps. One option here is to reduce the JVM heap size and reduce the data size per JVM instance.

-Vladimir Rodionov

On Thu, Oct 30, 2014 at 5:22 AM, Ilya Ganelin ilgan...@gmail.com wrote:

The split is something like 30 million into 2 million partitions. The reason it becomes tractable is that after I perform the Cartesian on the split data and operate on it, I don't keep the full results - I actually keep only a tiny fraction of the generated dataset - making the overall dataset tractable (I neglected to mention this in the first email). The way the code is structured, I have forced linear execution up to this point, so at the time the split executes it is the only thing happening. In terms of memory, I have assigned 23 GB of memory and 17 GB of heap.

On Oct 30, 2014 3:32 AM, Sean Owen so...@cloudera.com wrote:

Can you be more specific about the numbers? I am not sure that splitting helps so much in the end, in that it has the same effect as executing, a smaller number at a time, the large number of tasks that the full cartesian join would generate. The full join is probably intractable no matter what in this case? The OOM is not necessarily directly related. It depends on where it happened, what else you are doing, how much memory you gave, etc.

On Thu, Oct 30, 2014 at 3:29 AM, Ganelin, Ilya ilya.gane...@capitalone.com wrote:

Hey all - not writing to necessarily get a fix, but more to get an understanding of what's going on internally here. I wish to take a cross-product of two very large RDDs (using cartesian), the product of which is well in excess of what can be stored on disk. Clearly that is intractable, so my solution is to do things in batches - essentially I can take the cross product of a small piece of the first data set with the entirety of the other. To do this, I calculate how many items can fit into 1 GB of memory. Next, I use RDD.randomSplit() to partition the first data set. The issue is that I am trying to partition an RDD of several million items into several million partitions. This throws the following error. I would like to understand the internals of what's going on here so that I can adjust my approach accordingly. Thanks in advance.
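For concreteness, a sketch of the batching scheme described above, with illustrative RDDs, a hypothetical keep() predicate, and a placeholder output path: split the first RDD into a modest number of weighted chunks with randomSplit (hundreds, not millions), take the cartesian product of each chunk with the second RDD, and filter immediately so only the small surviving fraction is ever retained:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.rdd.RDD

    val sc = new SparkContext(new SparkConf().setAppName("BatchedCartesian"))
    val left: RDD[Long]  = sc.parallelize(0L until 1000000L, 200)
    val right: RDD[Long] = sc.parallelize(0L until 10000L, 20)

    // Hypothetical predicate: keeps only a tiny fraction of all pairs.
    def keep(a: Long, b: Long): Boolean = (a + b) % 100000 == 0

    // A few hundred equal-weight splits, not millions: each is one batch.
    val numBatches = 200
    val weights = Array.fill(numBatches)(1.0 / numBatches)
    val batches = left.randomSplit(weights, seed = 42L)

    // Process batches sequentially so only one cross-product is in flight.
    batches.zipWithIndex.foreach { case (batch, i) =>
      batch.cartesian(right)                       // small piece x entire other RDD
           .filter { case (a, b) => keep(a, b) }   // discard most pairs right away
           .saveAsTextFile(s"/tmp/cartesian_out/batch_$i")
    }

Splitting into hundreds of batches rather than millions of partitions avoids the per-partition bookkeeping that appears to be overwhelming the driver in the stack trace below.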
14/10/29 22:17:44 ERROR ActorSystemImpl: Uncaught fatal error from thread [sparkDriver-akka.actor.default-dispatcher-16] shutting down ActorSystem [sparkDriver]
java.lang.OutOfMemoryError: GC overhead limit exceeded
    at com.google.protobuf_spark.ByteString.toByteArray(ByteString.java:213)
    at akka.remote.MessageSerializer$.deserialize(MessageSerializer.scala:24)
    at akka.remote.DefaultMessageDispatcher.payload$lzycompute$1(Endpoint.scala:55)
    at akka.remote.DefaultMessageDispatcher.payload$1(Endpoint.scala:55)
    at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:73)
    at akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:764)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Exception in thread "main" java.lang.OutOfMemoryError: GC overhead limit exceeded
    at java.util.Arrays.copyOfRange(Arrays.java:2694)
    at java.lang.String.<init>(String.java:203)
    at java.lang.String.substring(String.java:1913)
    at java.lang.String.subSequence(String.java:1946)
    at java.util.regex.Matcher.getSubSequence(Matcher.java:1245)
    at java.util.regex.Matcher.group(Matcher.java:490)
    at java.util.Formatter$FormatSpecifier.<init>(Formatter.java:2675)
    at java.util.Formatter.parse(Formatter.java:2528)
    at java.util.Formatter.format(Formatter.java:2469)
    at java.util.Formatter.format(Formatter.java:2423)
    at java.lang.String.format(String.java:2790)
    at scala.collection.immutable.StringLike$class.format(StringLike.scala:266)
    at scala.collection.immutable.StringOps.format
Re: Reading from HBase is too slow
Using TableInputFormat is not the fastest way of reading data from HBase. Do not expect hundreds of MB per second. You should probably take a look at MapReduce over HBase snapshots: https://issues.apache.org/jira/browse/HBASE-8369

-Vladimir Rodionov

On Wed, Oct 1, 2014 at 8:17 AM, Tao Xiao xiaotao.cs@gmail.com wrote:

I can submit a MapReduce job reading that table. Its processing rate is also a little slower than I expected, but not as slow as Spark.
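For reference, a sketch of what snapshot-based reading could look like from Spark, assuming HBase 0.98+ (where HBASE-8369's TableSnapshotInputFormat is available), an already-created snapshot, and placeholder snapshot name and restore directory:

    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.client.Result
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat
    import org.apache.hadoop.mapreduce.Job
    import org.apache.spark.{SparkConf, SparkContext}

    // Sketch: read an HBase snapshot's files directly from HDFS,
    // bypassing the region servers entirely.
    val sc = new SparkContext(new SparkConf().setAppName("SnapshotRead"))
    val conf = HBaseConfiguration.create()
    val job = Job.getInstance(conf)
    TableSnapshotInputFormat.setInput(job, "mytable_snapshot",
      new Path("/tmp/snapshot_restore")) // placeholder names

    val rdd = sc.newAPIHadoopRDD(job.getConfiguration,
      classOf[TableSnapshotInputFormat],
      classOf[ImmutableBytesWritable],
      classOf[Result])

    println(rdd.count())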
Re: Spark And Mapr
There is a doc on MapR: http://doc.mapr.com/display/MapR/Accessing+MapR-FS+in+Java+Applications

-Vladimir Rodionov

On Wed, Oct 1, 2014 at 3:00 PM, Addanki, Santosh Kumar santosh.kumar.adda...@sap.com wrote:

Hi

We were using Horton 2.4.1 as our Hadoop distribution and have now switched to MapR. Previously, to read a text file, we would use:

test = sc.textFile("hdfs://10.48.101.111:8020/user/hdfs/test")

What would be the equivalent for MapR?

Best Regards
Santosh
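In the same style as the snippet above: MapR-FS is exposed through the maprfs:// scheme and is typically configured as the cluster's default file system, so either of the following should work (assuming the same path exists on the MapR cluster):

    test = sc.textFile("maprfs:///user/hdfs/test")
    # or, relying on the configured default file system:
    test = sc.textFile("/user/hdfs/test")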
Re: Reading from HBase is too slow
HBase TableInputFormat creates one input split per region. You cannot achieve a high level of parallelism unless you have at least 5-10 regions per RegionServer. What does this mean? You probably have too few regions. You can verify that in the HBase Web UI.

-Vladimir Rodionov

On Mon, Sep 29, 2014 at 7:21 PM, Tao Xiao xiaotao.cs@gmail.com wrote:

I submitted a job in Yarn-Client mode, which simply reads from an HBase table containing tens of millions of records and then does a *count* action. The job runs for a much longer time than I expected, so I wonder whether that is because the data to read was too much. Actually, there are 20 nodes in my Hadoop cluster, so the HBase table seems not so big (tens of millions of records).

I'm using CDH 5.0.0 (Spark 0.9 and HBase 0.96). BTW, when the job was running I could see logs on the console, and specifically I'd like to know what the following log means:

14/09/30 09:45:20 INFO scheduler.TaskSetManager: Starting task 0.0:20 as TID 20 on executor 2: b04.jsepc.com (PROCESS_LOCAL)
14/09/30 09:45:20 INFO scheduler.TaskSetManager: Serialized task 0.0:20 as 13454 bytes in 0 ms
14/09/30 09:45:20 INFO scheduler.TaskSetManager: Finished TID 19 in 16426 ms on b04.jsepc.com (progress: 18/86)
14/09/30 09:45:20 INFO scheduler.DAGScheduler: Completed ResultTask(0, 19)

Thanks
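On the quoted log: "progress: 18/86" means 18 of 86 tasks in the stage have finished, and since TableInputFormat yields one task per region, the table appears to have 86 regions (about 4 per RegionServer across 20 nodes, consistent with the "too few regions" point). The partition count can also be checked from the Spark side; a sketch with a placeholder table name:

    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.client.Result
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.mapreduce.TableInputFormat
    import org.apache.spark.{SparkConf, SparkContext}

    // Sketch: the partition count of an HBase-backed RDD equals the
    // table's region count, so a low number here explains poor parallelism.
    val sc = new SparkContext(new SparkConf().setAppName("HBaseCount"))
    val conf = HBaseConfiguration.create()
    conf.set(TableInputFormat.INPUT_TABLE, "mytable") // placeholder table name

    val rdd = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
      classOf[ImmutableBytesWritable], classOf[Result])

    println(s"partitions (= regions): ${rdd.partitions.length}")
    println(s"rows: ${rdd.count()}")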
Spark caching questions
Hi, users

1. Disk-based cache eviction policy: is it the same LRU?
2. What is the scope of a cached RDD? Does it survive the application? What happens if I run the Java app again next time: will the RDD be recreated, or read from the cache? If the answer is YES, then ...
3. Is there any way to invalidate a cached RDD automatically? Or RDD partitions? Some API like RDD.isValid()?
4. HadoopRDD is InputFormat-based. Some partitions (splits) may become invalid in the cache. Can we reload only those partitions into the cache?

-Vladimir
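For context, a minimal sketch of the caching API these questions concern; the input path is illustrative. Two points are certain: a cached RDD is scoped to its SparkContext, so it does not survive the application, and the only explicit invalidation call is unpersist(), which drops the whole RDD (there is no public per-partition reload API):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.storage.StorageLevel

    val sc = new SparkContext(new SparkConf().setAppName("CacheDemo"))
    val rdd = sc.textFile("/tmp/input").map(_.length)

    // Disk-backed caching; the data lives only as long as this SparkContext.
    rdd.persist(StorageLevel.MEMORY_AND_DISK)
    rdd.count() // materializes the cache

    // Explicit, whole-RDD invalidation.
    rdd.unpersist()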