Re: Spark heap issues

2013-12-06 Thread learner1014 all
Still see a lot of the following errors:
java.lang.OutOfMemoryError: Java heap space
13/12/05 16:04:13 INFO executor.StandaloneExecutorBackend: Got assigned
task 553
13/12/05 16:04:13 INFO executor.Executor: Running task ID 553

The issue seems to be that the process hangs because it is stuck in
back-to-back full GC cycles:
1536.617: [Full GC 1536.617: [CMS: 707839K->707839K(707840K), 5.0507000
secs] 1014527K->1014527K(1014528K), [CMS Perm : 31955K->31955K(53572K)],
5.0507940 secs] [Times: user=4.94 sys=0.00, real=5.05 secs]
1541.669: [Full GC 1541.669: [CMS: 707840K->707839K(707840K), 4.5483600
secs] 1014527K->1014527K(1014528K), [CMS Perm : 31955K->31955K(53572K)],
4.5484390 secs] [Times: user=4.47 sys=0.00, real=4.55 secs]
1546.218: [Full GC 1546.218: [CMS: 707839K->707839K(707840K), 4.5937460
secs] 1014527K->1014527K(1014528K), [CMS Perm : 31955K->31955K(53572K)],
4.5938460 secs] [Times: user=4.59 sys=0.00, real=4.60 secs]
1550.812: [Full GC 1550.812: [CMS: 707839K->707839K(707840K), 5.3572370
secs] 1014527K->1014527K(1014528K), [CMS Perm : 31955K->31955K(53572K)],
5.3573840 secs] [Times: user=5.26 sys=0.01, real=5.35 secs]
1556.171: [Full GC 1556.171: [CMS: 707840K->694574K(707840K), 4.1462520
secs] 1014528K->860511K(1014528K), [CMS Perm : 31955K->31955K(53572K)],
4.1463350 secs] [Times: user=4.13 sys=0.00, real=4.15 secs]
1560.329: [GC [1 CMS-initial-mark: 694574K(707840K)] 874378K(1014528K),
0.4269160 secs] [Times: user=0.41 sys=0.00, real=0.43 secs]
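The numbers above tell the story: each Full GC line reports the old generation as usedBefore->usedAfter(capacity), and the used figure stays pinned at the capacity even after multi-second collections. A quick check of the first line (editorial sketch; the values are copied from the log above, `GcOccupancy` is not part of the thread):

```scala
// Old-gen figures from the first Full GC line: 707839K->707839K(707840K).
// When occupancy after collection stays near 100% and the pauses run back to
// back, the JVM spends nearly all wall-clock time in GC, so tasks appear hung.
object GcOccupancy {
  def main(args: Array[String]): Unit = {
    val (usedAfterKb, capacityKb) = (707839, 707840)
    val occupancy = usedAfterKb.toDouble / capacityKb
    println(f"old gen occupancy after Full GC: ${occupancy * 100}%.3f%%")
  }
}
```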


I tried the following parameters and they do not seem to help:
  System.setProperty("spark.serializer",
"org.apache.spark.serializer.KryoSerializer")
  System.setProperty("spark.akka.timeout", "30")  // in seconds

  System.setProperty("spark.executor.memory", "15g")
  System.setProperty("spark.akka.frameSize", "2000")  // in MB
  System.setProperty("spark.akka.threads", "8")
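An editorial note, not from the thread: one likely reason these settings have no effect is that the worker JVM shown further down is launched with -Xms512m -Xmx512m, so a request for a 15g executor cannot be honored by a 512 MB heap. A hypothetical sanity check (`parseMb` is not a Spark API, just an illustration):

```scala
// Hypothetical helper: normalize memory strings like "512m"/"15g" to MB so a
// requested executor size can be compared against the worker JVM's heap.
object MemCheck {
  def parseMb(s: String): Int = {
    val v = s.trim.toLowerCase
    if (v.endsWith("g")) v.dropRight(1).toInt * 1024
    else if (v.endsWith("m")) v.dropRight(1).toInt
    else v.toInt // assume the value is already in MB
  }

  def main(args: Array[String]): Unit = {
    val requested  = parseMb("15g")  // spark.executor.memory
    val workerHeap = parseMb("512m") // -Xmx of the worker JVM
    println(s"requested=${requested}MB worker=${workerHeap}MB " +
      s"fits=${requested <= workerHeap}")
  }
}
```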

Thanks


On Thu, Dec 5, 2013 at 11:31 PM, purav aggarwal
puravaggarwal...@gmail.com wrote:

 Try allocating more resources to your application.
 You seem to be using 512 MB for your worker node (you can verify that from
 the master UI).

 Try putting the following settings into your code and see if it helps:

 System.setProperty("spark.executor.memory", "15g")   // Will allocate more
 memory
 System.setProperty("spark.akka.frameSize", "2000")
 System.setProperty("spark.akka.threads", "16")   // Dependent upon the
 number of cores on your worker machine



Re: Spark heap issues

2013-12-05 Thread purav aggarwal
Try allocating more resources to your application.
You seem to be using 512 MB for your worker node (you can verify that from
the master UI).

Try putting the following settings into your code and see if it helps:

System.setProperty("spark.executor.memory", "15g")   // Will allocate more
memory
System.setProperty("spark.akka.frameSize", "2000")
System.setProperty("spark.akka.threads", "16")   // Dependent upon the
number of cores on your worker machine
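One ordering pitfall worth checking alongside these calls (an editorial sketch of the Spark 0.8-era property-based configuration): System.setProperty only takes effect if it runs before the SparkContext is constructed; values set afterwards are ignored for that application. The SparkContext line is commented out so the sketch compiles without a Spark dependency:

```scala
object ConfigBeforeContext {
  def main(args: Array[String]): Unit = {
    // 1. Set configuration first...
    System.setProperty("spark.executor.memory", "15g")
    System.setProperty("spark.akka.frameSize", "2000") // in MB
    // 2. ...then construct the context, so the properties are picked up:
    // val sc = new SparkContext("spark://localhost:7077", "Simple App")
  }
}
```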


On Fri, Dec 6, 2013 at 1:06 AM, learner1014 all learner1...@gmail.com wrote:

 Hi,

 Trying to do a join operation on an RDD; the input is pipe-delimited data
 in 2 files.
 One file is 24MB and the other is 285MB.
 The setup is single-node (one server), with SPARK_MEM set to 512m.

 Master
 /pkg/java/jdk1.7.0_11/bin/java -cp
 :/spark-0.8.0-incubating-bin-cdh4/conf:/spark-0.8.0-incubating-bin-cdh4/assembly/target/scala-2.9.3/spark-assembly-0.8.0-incubating-hadoop1.2.1.jar
 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps
 -Dspark.boundedMemoryCache.memoryFraction=0.4
 -Dspark.cache.class=spark.DiskSpillingCache -XX:+UseConcMarkSweepGC
 -Djava.library.path= -Xms512m -Xmx512m
 org.apache.spark.deploy.master.Master --ip localhost --port 7077
 --webui-port 8080

 Worker
 /pkg/java/jdk1.7.0_11/bin/java -cp
 :/spark-0.8.0-incubating-bin-cdh4/conf:/spark-0.8.0-incubating-bin-cdh4/assembly/target/scala-2.9.3/spark-assembly-0.8.0-incubating-hadoop1.2.1.jar
 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps
 -Dspark.boundedMemoryCache.memoryFraction=0.4
 -Dspark.cache.class=spark.DiskSpillingCache -XX:+UseConcMarkSweepGC
 -Djava.library.path= -Xms512m -Xmx512m
 org.apache.spark.deploy.worker.Worker spark://localhost:7077


 App
 /pkg/java/jdk1.7.0_11/bin/java -cp
 :/spark-0.8.0-incubating-bin-cdh4/conf:/spark-0.8.0-incubating-bin-cdh4/assembly/target/scala-2.9.3/spark-assembly-0.8.0-incubating-hadoop1.2.1.jar:/spark-0.8.0-incubating-bin-cdh4/core/target/scala-2.9.3/test-classes:/spark-0.8.0-incubating-bin-cdh4/repl/target/scala-2.9.3/test-classes:/spark-0.8.0-incubating-bin-cdh4/mllib/target/scala-2.9.3/test-classes:/spark-0.8.0-incubating-bin-cdh4/bagel/target/scala-2.9.3/test-classes:/spark-0.8.0-incubating-bin-cdh4/streaming/target/scala-2.9.3/test-classes
 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps
 -Dspark.boundedMemoryCache.memoryFraction=0.4
 -Dspark.cache.class=spark.DiskSpillingCache -XX:+UseConcMarkSweepGC
 -Xms512M -Xmx512M org.apache.spark.executor.StandaloneExecutorBackend
 akka://spark@localhost:33024/user/StandaloneScheduler 1 localhost 4


 Here is the code
 import org.apache.spark.SparkContext
 import org.apache.spark.SparkContext._
 import org.apache.spark.storage.StorageLevel

 object SimpleApp {

   def main (args: Array[String]) {


 System.setProperty("spark.local.dir", "/spark-0.8.0-incubating-bin-cdh4/tmp");
   System.setProperty("spark.serializer",
 "org.apache.spark.serializer.KryoSerializer")
   System.setProperty("spark.akka.timeout", "30")  // in seconds

   val dataFile2 = "/tmp_data/data1.txt"
   val dataFile1 = "/tmp_data/data2.txt"
   val sc = new SparkContext("spark://localhost:7077", "Simple App",
 "/spark-0.8.0-incubating-bin-cdh4",
   List("target/scala-2.9.3/simple-project_2.9.3-1.0.jar"))

   val data10 = sc.textFile(dataFile1, 128)
   val data11 = data10.map(x => x.split("|"))
   val data12 = data11.map(x => (x(1).toInt -> x))


   val data20 = sc.textFile(dataFile2, 128)
   val data21 = data20.map(x => x.split("|"))
   val data22 = data21.map(x => (x(1).toInt -> x))


   val data3 = data12.join(data22, 128)
   val data4 = data3.distinct(4)
   val numAs = data10.count()
   val numBs = data20.count()
   val numCs = data3.count()
   val numDs = data4.count()
   println("Lines in 1: %s, Lines in 2: %s Lines in 3: %s Lines in 4: %s"
     .format(numAs, numBs, numCs, numDs))
   data4.foreach(println)
   }
 }
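One detail in the code above worth flagging (editorial note): String.split takes a regular expression, and "|" is the regex alternation operator, so x.split("|") does not split on the pipe character; a literal pipe must be escaped. A minimal check:

```scala
object SplitPipe {
  def main(args: Array[String]): Unit = {
    val row = "alice|7|nyc"
    // Escaped: "\\|" matches a literal '|' and yields the three fields.
    val fields = row.split("\\|")
    println(fields.mkString(","))
  }
}
```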

 I see the following errors
 13/12/04 10:53:55 WARN storage.BlockManagerMaster: Error sending message
 to BlockManagerMaster in 1 attempts
 java.util.concurrent.TimeoutException: Futures timed out after [1]
 milliseconds
 at akka.dispatch.DefaultPromise.ready(Future.scala:870)
 at akka.dispatch.DefaultPromise.result(Future.scala:874)
 at akka.dispatch.Await$.result(Future.scala:74)

 and
 13/12/04 10:53:55 ERROR executor.Executor: Exception in task ID 517
 java.lang.OutOfMemoryError: Java heap space
 at com.esotericsoftware.kryo.io.Input.readString(Input.java:448)
 at
 com.esotericsoftware.kryo.serializers.DefaultArraySerializers$StringArraySerializer.read(DefaultArraySerializers.java:282)
 at
 com.esotericsoftware.kryo.serializers.DefaultArraySerializers$StringArraySerializer.read(DefaultArraySerializers.java:262)
 at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
 at
 com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:43)
 at