Try allocating some more resources to your application. You seem to be using 512 MB for your worker node (you can verify that from the master UI).
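Concretely, in Spark 0.8-era standalone apps this kind of tuning is done through JVM system properties set in the driver *before* the SparkContext is constructed, since the driver reads them once at startup. A minimal sketch (the values are illustrative and should be sized to what the worker machine actually has):

```scala
// Minimal sketch: Spark 0.8-style tuning via JVM system properties.
// These must run before `new SparkContext(...)`; values are illustrative.
object Tuning {
  def configure(): Unit = {
    System.setProperty("spark.executor.memory", "15g") // per-executor heap; size to the worker machine
    System.setProperty("spark.akka.frameSize", "2000") // max Akka message size (MB) for large task results
    System.setProperty("spark.akka.threads", "16")     // roughly the worker's core count
  }

  def main(args: Array[String]): Unit = {
    configure()
    // ... then create the context, e.g.:
    // val sc = new SparkContext("spark://localhost:7077", "Simple App", ...)
    println(System.getProperty("spark.executor.memory"))
  }
}
```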
Try putting the following settings into your code and see if it helps:

System.setProperty("spark.executor.memory", "15g") // Will allocate more memory
System.setProperty("spark.akka.frameSize", "2000")
System.setProperty("spark.akka.threads", "16")     // Dependent upon the number of cores on your worker machine

On Fri, Dec 6, 2013 at 1:06 AM, learner1014 all <learner1...@gmail.com> wrote:
> Hi,
>
> Trying to do a join operation on an RDD; my input is pipe-delimited data
> and there are 2 files.
> One file is 24 MB and the other file is 285 MB.
> Setup being used is the single-node (server) setup: SPARK_MEM set to 512m
>
> Master
> /pkg/java/jdk1.7.0_11/bin/java -cp
> :/spark-0.8.0-incubating-bin-cdh4/conf:/spark-0.8.0-incubating-bin-cdh4/assembly/target/scala-2.9.3/spark-assembly-0.8.0-incubating-hadoop1.2.1.jar
> -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps
> -Dspark.boundedMemoryCache.memoryFraction=0.4
> -Dspark.cache.class=spark.DiskSpillingCache -XX:+UseConcMarkSweepGC
> -Djava.library.path= -Xms512m -Xmx512m
> org.apache.spark.deploy.master.Master --ip localhost --port 7077
> --webui-port 8080
>
> Worker
> /pkg/java/jdk1.7.0_11/bin/java -cp
> :/spark-0.8.0-incubating-bin-cdh4/conf:/spark-0.8.0-incubating-bin-cdh4/assembly/target/scala-2.9.3/spark-assembly-0.8.0-incubating-hadoop1.2.1.jar
> -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps
> -Dspark.boundedMemoryCache.memoryFraction=0.4
> -Dspark.cache.class=spark.DiskSpillingCache -XX:+UseConcMarkSweepGC
> -Djava.library.path= -Xms512m -Xmx512m
> org.apache.spark.deploy.worker.Worker spark://localhost:7077
>
> App
> /pkg/java/jdk1.7.0_11/bin/java -cp
> :/spark-0.8.0-incubating-bin-cdh4/conf:/spark-0.8.0-incubating-bin-cdh4/assembly/target/scala-2.9.3/spark-assembly-0.8.0-incubating-hadoop1.2.1.jar:/spark-0.8.0-incubating-bin-cdh4/core/target/scala-2.9.3/test-classes:/spark-0.8.0-incubating-bin-cdh4/repl/target/scala-2.9.3/test-classes:/spark-0.8.0-incubating-bin-cdh4/mllib/target/scala-2.9.3/test-classes:/spark-0.8.0-incubating-bin-cdh4/bagel/target/scala-2.9.3/test-classes:/spark-0.8.0-incubating-bin-cdh4/streaming/target/scala-2.9.3/test-classes
> -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps
> -Dspark.boundedMemoryCache.memoryFraction=0.4
> -Dspark.cache.class=spark.DiskSpillingCache -XX:+UseConcMarkSweepGC
> -Xms512M -Xmx512M org.apache.spark.executor.StandaloneExecutorBackend
> akka://spark@localhost:33024/user/StandaloneScheduler 1 localhost 4
>
> Here is the code:
>
> import org.apache.spark.SparkContext
> import org.apache.spark.SparkContext._
> import org.apache.spark.storage.StorageLevel
>
> object SimpleApp {
>
>   def main(args: Array[String]) {
>
>     System.setProperty("spark.local.dir", "/spark-0.8.0-incubating-bin-cdh4/tmp")
>     System.setProperty("spark.serializer",
>       "org.apache.spark.serializer.KryoSerializer")
>     System.setProperty("spark.akka.timeout", "30") // in seconds
>
>     val dataFile2 = "/tmp_data/data1.txt"
>     val dataFile1 = "/tmp_data/data2.txt"
>     val sc = new SparkContext("spark://localhost:7077", "Simple App",
>       "/spark-0.8.0-incubating-bin-cdh4",
>       List("target/scala-2.9.3/simple-project_2.9.3-1.0.jar"))
>
>     val data10 = sc.textFile(dataFile1, 128)
>     val data11 = data10.map(x => x.split("|"))
>     val data12 = data11.map(x => (x(1).toInt -> x))
>
>     val data20 = sc.textFile(dataFile2, 128)
>     val data21 = data20.map(x => x.split("|"))
>     val data22 = data21.map(x => (x(1).toInt -> x))
>
>     val data3 = data12.join(data22, 128)
>     val data4 = data3.distinct(4)
>     val numAs = data10.count()
>     val numBs = data20.count()
>     val numCs = data3.count()
>     val numDs = data4.count()
>     println("Lines in 1: %s, Lines in 2: %s, Lines in 3: %s, Lines in 4: %s"
>       .format(numAs, numBs, numCs, numDs))
>     data4.foreach(println)
>   }
> }
>
> I see the following errors:
>
> 13/12/04 10:53:55 WARN storage.BlockManagerMaster: Error sending message
> to BlockManagerMaster in 1 attempts
> java.util.concurrent.TimeoutException: Futures timed out after [10000]
> milliseconds
>         at akka.dispatch.DefaultPromise.ready(Future.scala:870)
>         at akka.dispatch.DefaultPromise.result(Future.scala:874)
>         at akka.dispatch.Await$.result(Future.scala:74)
>
> and
>
> 13/12/04 10:53:55 ERROR executor.Executor: Exception in task ID 517
> java.lang.OutOfMemoryError: Java heap space
>         at com.esotericsoftware.kryo.io.Input.readString(Input.java:448)
>         at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$StringArraySerializer.read(DefaultArraySerializers.java:282)
>         at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$StringArraySerializer.read(DefaultArraySerializers.java:262)
>         at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>         at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:43)
>         at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:34)
>         at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>         at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:106)
>         at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:101)
>         at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>
> Lots of them, actually...
>
> To give some additional information, I just added single columns in both
> files and passed them through this program, and encountered the same issue:
> out of memory and other errors.
>
> What did work was removal of the following lines:
>
>     val data21 = data20.map(x => x.split("|"))
>     val data22 = data21.map(x => (x(1).toInt -> x))
>
> which were replaced by:
>
>     val data22 = data20.map(x => (x.toInt -> x))
>
> However, as soon as I add additional columns this is of course not going
> to work. So can someone explain this? Any suggestions are most welcome,
> and any help is appreciated.
> Thanks
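One aside on the pipe-delimited parsing in the quoted code, separate from the memory question: Scala's String.split takes a regular expression, and an unescaped "|" is the regex alternation operator, which matches the empty string at every position. So x.split("|") chops each line into single characters instead of fields, and x(1).toInt then reads the wrong token. Escaping the pipe gives the intended behavior; a minimal sketch with hypothetical sample data:

```scala
object PipeSplit {
  def main(args: Array[String]): Unit = {
    val line = "alice|42|eng" // illustrative pipe-delimited record

    // Unescaped "|" is regex alternation: it matches the empty string,
    // so the line is split into individual characters, not fields.
    val wrong = line.split("|")

    // Escaping the pipe splits on the literal delimiter.
    val right = line.split("\\|")

    println(wrong.length)          // far more tokens than fields
    println(right.mkString(","))   // the three intended fields
  }
}
```

The same escaping (`x.split("\\|")`) applies to the data11/data21 lines in the quoted program.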