Re: Spark SQL. Memory consumption

2015-04-02 Thread Vladimir Rodionov
 Using large memory for executors (*--executor-memory 120g*).

Not really good advice.

On Thu, Apr 2, 2015 at 9:17 AM, Cheng, Hao hao.ch...@intel.com wrote:

  Spark SQL tries to load the entire partition's data and organize it as
 in-memory HashMaps; it does eat a lot of memory if there are few duplicated
 GROUP BY keys across a large number of records.



 Couple of things you can try, case by case (see the sketch below):

 ·Increasing the number of partitions (the record count in each
 partition will drop)

 ·Using large memory for executors (*--executor-memory 120g*).

 ·Reducing the Spark cores (to reduce the number of parallel running threads)
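
 For example, a minimal sketch of applying these three suggestions with the
 Spark 1.2-era API (the application name and values below are illustrative
 only, not recommendations from this thread):

 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.sql.SQLContext

 // Fewer cores per executor and more executor memory reduce the pressure from
 // many parallel in-memory aggregation buffers; values are only an example.
 val conf = new SparkConf()
   .setAppName("GroupByAggregation")          // hypothetical app name
   .set("spark.executor.memory", "20g")       // more memory per executor
   .set("spark.executor.cores", "2")          // fewer parallel threads per executor
 val sc = new SparkContext(conf)
 val sqlContext = new SQLContext(sc)

 // More shuffle partitions => fewer records aggregated per task.
 sqlContext.setConf("spark.sql.shuffle.partitions", "2048")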



 We are trying to improve that by using sort-merge aggregation, which
 should reduce memory utilization significantly, but that work is still
 ongoing.



 Cheng Hao



 *From:* Masf [mailto:masfwo...@gmail.com]
 *Sent:* Thursday, April 2, 2015 11:47 PM
 *To:* user@spark.apache.org
 *Subject:* Spark SQL. Memory consumption




  Hi.



 I'm using Spark SQL 1.2. I have this query:



 CREATE TABLE test_MA STORED AS PARQUET AS
 SELECT
     field1
     ,field2
     ,field3
     ,field4
     ,field5
     ,COUNT(1) AS field6
     ,MAX(field7)
     ,MIN(field8)
     ,SUM(field9 / 100)
     ,COUNT(field10)
     ,SUM(IF(field11  -500, 1, 0))
     ,MAX(field12)
     ,SUM(IF(field13 = 1, 1, 0))
     ,SUM(IF(field13 in (3,4,5,6,10,104,105,107), 1, 0))
     ,SUM(IF(field13 = 2012 , 1, 0))
     ,SUM(IF(field13 in (0,100,101,102,103,106), 1, 0))
 FROM table1 CL
 JOIN table2 netw
   ON CL.field15 = netw.id
 WHERE
     AND field3 IS NOT NULL
     AND field4 IS NOT NULL
     AND field5 IS NOT NULL
 GROUP BY field1,field2,field3,field4, netw.field5





 spark-submit --master spark://master:7077 *--driver-memory 20g
 --executor-memory 60g* --class GMain project_2.10-1.0.jar
 --driver-class-path '/opt/cloudera/parcels/CDH/lib/hive/lib/*'
 --driver-java-options
 '-Dspark.executor.extraClassPath=/opt/cloudera/parcels/CDH/lib/hive/lib/*'
 2> ./error





 The input data is 8 GB in Parquet format. It often crashes with *GC overhead*
 errors. I've set spark.shuffle.partitions to 1024, but my worker nodes (with
 128 GB RAM per node) still collapse.



 *Is this query too difficult for Spark SQL?*

 *Would it be better to do it with plain Spark RDDs (see the sketch below)?*

 *Am I doing something wrong?*
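
 Regarding the second question, a rough sketch (not a drop-in replacement; the
 row type, field positions, and parsing are hypothetical) of doing part of this
 grouped aggregation with plain RDDs via reduceByKey, which combines map-side
 and can spill to disk instead of building one large hash map per partition:

 import org.apache.spark.SparkContext._   // pair-RDD functions (Spark 1.2)
 import org.apache.spark.rdd.RDD

 // Hypothetical row type standing in for a few of the columns in the query.
 case class Row1(field1: String, field2: String, field7: Int, field9: Double)

 // rows: RDD[Row1] parsed from the Parquet input (parsing not shown here).
 def aggregateRows(rows: RDD[Row1]) = {
   rows
     .map(r => ((r.field1, r.field2), (1L, r.field7, r.field9 / 100)))
     .reduceByKey { case ((c1, m1, s1), (c2, m2, s2)) =>
       (c1 + c2, math.max(m1, m2), s1 + s2)   // COUNT, MAX(field7), SUM(field9/100)
     }
 }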





 Thanks

 --




 Regards.
 Miguel Ángel



Re: Running out of space (when there's no shortage)

2015-02-24 Thread Vladimir Rodionov
Usually this happens on Linux when an application deletes a file without
double-checking that there are no open file descriptors (a resource leak). In
that case, Linux keeps the allocated space and does not release it until the
application exits (crashes, in your case). You check the file system and
everything looks normal: you have enough space, yet you have no idea why the
application reports no space left on device.

Just a guess.
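
A small illustration of that failure mode (in Scala; path and size are made
up): disk space held by a deleted-but-still-open file is not released until
the descriptor is closed.

import java.io.{File, FileOutputStream}

val f = new File("/tmp/leak-demo.bin")           // hypothetical path
val out = new FileOutputStream(f)
out.write(new Array[Byte](100 * 1024 * 1024))    // write ~100 MB of zeros
f.delete()                                       // unlink the path
// At this point `df` still counts the 100 MB as used, and tools such as
// `lsof +L1` show the file as "(deleted)" with an open descriptor.
out.close()                                      // only now is the space freed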

-Vladimir Rodionov

On Tue, Feb 24, 2015 at 8:34 AM, Joe Wass jw...@crossref.org wrote:

 I'm running a cluster of 3 Amazon EC2 machines (small number because it's
 expensive when experiments keep crashing after a day!).

 Today's crash looks like this (stacktrace at end of message).
 org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output
 location for shuffle 0

 On my three nodes, I have plenty of space and inodes:

 A $ df -i
 Filesystem       Inodes  IUsed      IFree IUse% Mounted on
 /dev/xvda1       524288  97937     426351   19% /
 tmpfs           1909200      1    1909199    1% /dev/shm
 /dev/xvdb       2457600     54    2457546    1% /mnt
 /dev/xvdc       2457600     24    2457576    1% /mnt2
 /dev/xvds     831869296  23844  831845452    1% /vol0

 A $ df -h
 Filesystem      Size  Used Avail Use% Mounted on
 /dev/xvda1      7.9G  3.4G  4.5G  44% /
 tmpfs           7.3G     0  7.3G   0% /dev/shm
 /dev/xvdb        37G  1.2G   34G   4% /mnt
 /dev/xvdc        37G  177M   35G   1% /mnt2
 /dev/xvds      1000G  802G  199G  81% /vol0

 B $ df -i
 Filesystem       Inodes  IUsed      IFree IUse% Mounted on
 /dev/xvda1       524288  97947     426341   19% /
 tmpfs           1906639      1    1906638    1% /dev/shm
 /dev/xvdb       2457600     54    2457546    1% /mnt
 /dev/xvdc       2457600     24    2457576    1% /mnt2
 /dev/xvds     816200704  24223  816176481    1% /vol0

 B $ df -h
 Filesystem      Size  Used Avail Use% Mounted on
 /dev/xvda1      7.9G  3.6G  4.3G  46% /
 tmpfs           7.3G     0  7.3G   0% /dev/shm
 /dev/xvdb        37G  1.2G   34G   4% /mnt
 /dev/xvdc        37G  177M   35G   1% /mnt2
 /dev/xvds      1000G  805G  195G  81% /vol0

 C $ df -i
 Filesystem       Inodes  IUsed      IFree IUse% Mounted on
 /dev/xvda1       524288  97938     426350   19% /
 tmpfs           1906897      1    1906896    1% /dev/shm
 /dev/xvdb       2457600     54    2457546    1% /mnt
 /dev/xvdc       2457600     24    2457576    1% /mnt2
 /dev/xvds     755218352  24024  755194328    1% /vol0

 C $ df -h
 Filesystem      Size  Used Avail Use% Mounted on
 /dev/xvda1      7.9G  3.4G  4.5G  44% /
 tmpfs           7.3G     0  7.3G   0% /dev/shm
 /dev/xvdb        37G  1.2G   34G   4% /mnt
 /dev/xvdc        37G  177M   35G   1% /mnt2
 /dev/xvds      1000G  820G  181G  82% /vol0

 The devices may be ~80% full but that still leaves ~200G free on each. My
 spark-env.sh has

 export SPARK_LOCAL_DIRS=/vol0/spark

 I have manually verified that on each slave the only temporary files are
 stored on /vol0, all looking something like this


 /vol0/spark/spark-f05d407c/spark-fca3e573/spark-78c06215/spark-4f0c4236/20/rdd_8_884

 So it looks like all the files are being stored on the large drives
 (incidentally they're AWS EBS volumes, but that's the only way to get
 enough storage). My process crashed before with a slightly different
 exception under the same circumstances: kryo.KryoException:
 java.io.IOException: No space left on device

 These both happen after several hours and several GB of temporary files.

 Why does Spark think it's run out of space?

 TIA

 Joe

 Stack trace 1:

 org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output
 location for shuffle 0
 at
 org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:384)
 at
 org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:381)
 at
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at
 scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
 at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
 at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
 at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
 at
 org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:380)
 at
 org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:176)
 at
 org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.fetch(BlockStoreShuffleFetcher.scala:42)
 at
 org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:40

Re: GC Issues with randomSplit on large dataset

2014-10-30 Thread Vladimir Rodionov
GC overhead limit exceeded is usually a sign of either an inadequate heap size
(not the case here) or an application that produces garbage (temp objects)
faster than the garbage collector can collect it, so GC consumes most of the
CPU cycles. A 17 GB Java heap is quite large for many applications and is
above the safe and recommended limit (6-8 GB) for a Java server application.

From what I saw in the stack trace, I can conclude that some operations in the
RDD implementation are heap polluters. I am not an expert in Spark, but it
seems that Spark is not yet well optimized to work with reasonably large Java
heaps.

One of the options here is to try to reduce the JVM heap size and the data
size per JVM instance.
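
As a hedged sketch of that option - more executor JVMs, each with a smaller
heap, rather than a few very large ones (standard Spark/YARN settings; the
values are illustrative, not a recommendation for this particular job):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("smaller-heaps")               // hypothetical app name
  .set("spark.executor.memory", "6g")        // stay inside the 6-8 GB range above
  .set("spark.executor.instances", "8")      // spread the same data over more JVMs (YARN)
val sc = new SparkContext(conf)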

-Vladimir Rodionov



On Thu, Oct 30, 2014 at 5:22 AM, Ilya Ganelin ilgan...@gmail.com wrote:

 The split is something like 30 million records into 2 million partitions.
 The reason this becomes tractable is that after I perform the Cartesian on
 the split data and operate on it, I don't keep the full results - I actually
 only keep a tiny fraction of the generated dataset, making the overall
 dataset tractable (I neglected to mention this in the first email).

 The way the code is structured, I have forced linear execution until this
 point, so at the time the split executes it is the only thing happening. In
 terms of memory, I have assigned 23 GB of memory and 17 GB of heap.
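
 For context, a rough, self-contained sketch of the batching idea being
 discussed: a modest number of randomSplit batches rather than millions of
 tiny partitions, keeping only the small filtered slice of each batch's
 cartesian product. Assumes a SparkContext `sc` (e.g. from spark-shell); the
 data and the filter predicate are made up.

 val left  = sc.parallelize(1 to 1000000)
 val right = sc.parallelize(1 to 10000)

 val numBatches = 100                                   // modest, not millions
 val weights    = Array.fill(numBatches)(1.0 / numBatches)
 val batches    = left.randomSplit(weights, seed = 42L)

 val kept = batches.map { batch =>
   batch.cartesian(right)
        .filter { case (a, b) => (a + b) % 100000 == 0 }   // keep a tiny fraction
 }.reduce(_ union _)

 println(kept.count())
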
 On Oct 30, 2014 3:32 AM, Sean Owen so...@cloudera.com wrote:

 Can you be more specific about numbers?
 I am not sure that splitting helps much in the end, since it amounts to
 executing, a smaller number at a time, the same large number of tasks
 that the full cartesian join would generate.
 The full join is probably intractable no matter what in this case?
 The OOM is not necessarily directly related. It depends on where it
 happened, what else you are doing, how much memory you gave, etc.

 On Thu, Oct 30, 2014 at 3:29 AM, Ganelin, Ilya
 ilya.gane...@capitalone.com wrote:
  Hey all – not writing to necessarily get a fix but more to get an
  understanding of what’s going on internally here.
 
  I wish to take a cross-product of two very large RDDs (using cartesian),
  the product of which is well in excess of what can be stored on disk.
  Clearly that is intractable, thus my solution is to do things in batches -
  essentially I can take the cross product of a small piece of the first
  data set with the entirety of the other. To do this, I calculate how many
  items can fit into 1 GB of memory. Next, I use RDD.randomSplit() to
  partition the first data set. The issue is that I am trying to partition
  an RDD of several million items into several million partitions. This
  throws the following error:
 
  I would like to understand the internals of what's going on here so that I
  can adjust my approach accordingly. Thanks in advance.
 
 
  14/10/29 22:17:44 ERROR ActorSystemImpl: Uncaught fatal error from
 thread
  [sparkDriver-akka.actor.default-dispatcher-16] shutting down ActorSystem
  [sparkDriver]
  java.lang.OutOfMemoryError: GC overhead limit exceeded
  at com.google.protobuf_spark.ByteString.toByteArray(ByteString.java:213)
  at
 akka.remote.MessageSerializer$.deserialize(MessageSerializer.scala:24)
  at
 
 akka.remote.DefaultMessageDispatcher.payload$lzycompute$1(Endpoint.scala:55)
  at akka.remote.DefaultMessageDispatcher.payload$1(Endpoint.scala:55)
  at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:73)
  at
 
 akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:764)
  at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
  at akka.actor.ActorCell.invoke(ActorCell.scala:456)
  at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
  at akka.dispatch.Mailbox.run(Mailbox.scala:219)
  at
 
 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
  at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
  at
 
 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
  at
 scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
  at
 
 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
  Exception in thread main java.lang.OutOfMemoryError: GC overhead limit
  exceeded
  at java.util.Arrays.copyOfRange(Arrays.java:2694)
  at java.lang.String.init(String.java:203)
  at java.lang.String.substring(String.java:1913)
  at java.lang.String.subSequence(String.java:1946)
  at java.util.regex.Matcher.getSubSequence(Matcher.java:1245)
  at java.util.regex.Matcher.group(Matcher.java:490)
  at java.util.Formatter$FormatSpecifier.init(Formatter.java:2675)
  at java.util.Formatter.parse(Formatter.java:2528)
  at java.util.Formatter.format(Formatter.java:2469)
  at java.util.Formatter.format(Formatter.java:2423)
  at java.lang.String.format(String.java:2790)
  at
 scala.collection.immutable.StringLike$class.format(StringLike.scala:266)
  at scala.collection.immutable.StringOps.format

Re: Reading from HBase is too slow

2014-10-01 Thread Vladimir Rodionov
Using TableInputFormat is not the fastest way of reading data from HBase.
Do not expect hundreds of MB per second. You should probably take a look at
M/R over HBase snapshots:

https://issues.apache.org/jira/browse/HBASE-8369
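
A rough sketch (in Scala) of wiring the snapshot-based input format from
HBASE-8369 into Spark, which scans HFiles directly instead of going through
the RegionServers. The snapshot name, restore directory, and the assumption
of an existing SparkContext `sc` are all hypothetical, and the snapshot must
already exist (e.g. created in hbase shell with: snapshot 'my_table', 'my_snapshot').

import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Result, Scan}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableMapReduceUtil, TableSnapshotInputFormat}
import org.apache.hadoop.mapreduce.Job

val hconf = HBaseConfiguration.create()
// The scan definition is read from the job configuration.
hconf.set(TableInputFormat.SCAN, TableMapReduceUtil.convertScanToString(new Scan()))

val job = Job.getInstance(hconf)
// The restore dir must be on the HBase filesystem and writable by the job.
TableSnapshotInputFormat.setInput(job, "my_snapshot", new Path("/tmp/snapshot-restore"))

val rdd = sc.newAPIHadoopRDD(job.getConfiguration,
  classOf[TableSnapshotInputFormat],
  classOf[ImmutableBytesWritable],
  classOf[Result])
println(rdd.count())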

-Vladimir Rodionov

On Wed, Oct 1, 2014 at 8:17 AM, Tao Xiao xiaotao.cs@gmail.com wrote:

 I can submit a MapReduce job reading that table; although its processing
 rate is also a little slower than I expected, it is not as slow as Spark.





Re: Spark And Mapr

2014-10-01 Thread Vladimir Rodionov
There is documentation on MapR for this:

http://doc.mapr.com/display/MapR/Accessing+MapR-FS+in+Java+Applications
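
As a hedged example (not taken from that doc): on a MapR cluster the same
read typically goes through the maprfs:// scheme, or just a plain path, since
MapR-FS is usually the configured default filesystem. This assumes an
existing SparkContext `sc` and that the path below exists.

// Scala equivalent of the hdfs:// example in the quoted question below.
val test = sc.textFile("maprfs:///user/hdfs/test")
// or, relying on the cluster's default filesystem:
val test2 = sc.textFile("/user/hdfs/test")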

-Vladimir Rodionov

On Wed, Oct 1, 2014 at 3:00 PM, Addanki, Santosh Kumar 
santosh.kumar.adda...@sap.com wrote:

  Hi



 We were using Hortonworks 2.4.1 as our Hadoop distribution and have now
 switched to MapR.



 Previously, to read a text file we would use:



 test = sc.textFile("hdfs://10.48.101.111:8020/user/hdfs/test")





 What would be the equivalent of this for MapR?



 Best Regards

 Santosh



Re: Reading from HBase is too slow

2014-09-29 Thread Vladimir Rodionov
HBase TableInputFormat creates one input split per region. You cannot achieve
a high level of parallelism unless you have at least 5-10 regions per
RegionServer. What does that mean? You probably have too few regions. You can
verify that in the HBase Web UI.
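
For illustration, a short Scala sketch (table name is hypothetical; assumes a
SparkContext `sc`) showing that the scan produces one Spark partition per
region, plus one Spark-side workaround of repartitioning before the expensive
downstream work:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat

val hconf = HBaseConfiguration.create()
hconf.set(TableInputFormat.INPUT_TABLE, "my_table")      // hypothetical table

val scanned = sc.newAPIHadoopRDD(hconf, classOf[TableInputFormat],
  classOf[ImmutableBytesWritable], classOf[Result])

// One partition per region: if this prints a small number, the scan itself
// cannot run with more parallelism than that.
println(s"partitions (= regions) = ${scanned.partitions.length}")

// Map to something small and serializable before any shuffle, then repartition
// to spread downstream work (the initial scan is still one task per region).
val keyLengths = scanned.map { case (k, _) => k.get().length }
println(keyLengths.repartition(200).count())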

-Vladimir Rodionov

On Mon, Sep 29, 2014 at 7:21 PM, Tao Xiao xiaotao.cs@gmail.com wrote:

 I submitted a job in yarn-client mode which simply reads from an HBase table
 containing tens of millions of records and then does a *count* action. The
 job runs for a much longer time than I expected, so I wonder whether it is
 because the data to read is too much. Actually, there are 20 nodes in my
 Hadoop cluster, so the HBase table does not seem that big (tens of millions
 of records).

 I'm using CDH 5.0.0 (Spark 0.9 and HBase 0.96).

 BTW, when the job was running I could see logs on the console, and
 specifically I'd like to know what the following log lines mean:
 14/09/30 09:45:20 INFO scheduler.TaskSetManager: Starting task 0.0:20 as
 TID 20 on executor 2: b04.jsepc.com (PROCESS_LOCAL)
 14/09/30 09:45:20 INFO scheduler.TaskSetManager: Serialized task 0.0:20 as
 13454 bytes in 0 ms
 14/09/30 09:45:20 INFO scheduler.TaskSetManager: Finished TID 19 in 16426
 ms on b04.jsepc.com (progress: 18/86)
 14/09/30 09:45:20 INFO scheduler.DAGScheduler: Completed ResultTask(0, 19)


 Thanks



Spark caching questions

2014-09-09 Thread Vladimir Rodionov
Hi, users

1. What is the disk-based cache eviction policy? The same LRU?

2. What is the scope of a cached RDD? Does it survive the application? What
happens if I run the Java app again - will the RDD be recreated or read from
the cache?

If the answer is yes, then ...


3. Is there any way to invalidate a cached RDD automatically? Or individual RDD
partitions? Some API along the lines of RDD.isValid()?

4. HadoopRDD is InputFormat-based. Some partitions (splits) may become
invalid in the cache. Can we reload only those partitions into the cache?
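
For reference, a sketch of the cache-related calls these questions are about,
using the public RDD API (assumes an existing RDD `rdd`; the comments only
state what the calls do, not the behavior being asked about above):

import org.apache.spark.storage.StorageLevel

// rdd.cache() is shorthand for persist(StorageLevel.MEMORY_ONLY); other levels
// such as MEMORY_AND_DISK let partitions spill to local disk.
val cached = rdd.persist(StorageLevel.MEMORY_AND_DISK)

// Manual invalidation: drops the cached blocks; the RDD is recomputed from its
// lineage the next time it is used.
cached.unpersist()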

-Vladimir