Hi,

I'm trying to do a join between two RDDs. The input is pipe-delimited data
split across two files: one is 24MB and the other is 285MB.
The setup is a single-node (single-server) standalone cluster, with SPARK_MEM
set to 512m.
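
For completeness, the usual way to set this in the standalone setup is a line
like the following in conf/spark-env.sh (sketched here, so treat the exact
contents as approximate):

      # conf/spark-env.sh
      export SPARK_MEM=512m   # matches the -Xms512m/-Xmx512m in the JVM command lines below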

Master
/pkg/java/jdk1.7.0_11/bin/java -cp
:/spark-0.8.0-incubating-bin-cdh4/conf:/spark-0.8.0-incubating-bin-cdh4/assembly/target/scala-2.9.3/spark-assembly-0.8.0-incubating-hadoop1.2.1.jar
-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps
-Dspark.boundedMemoryCache.memoryFraction=0.4
-Dspark.cache.class=spark.DiskSpillingCache -XX:+UseConcMarkSweepGC
-Djava.library.path= -Xms512m -Xmx512m
org.apache.spark.deploy.master.Master --ip localhost --port 7077
--webui-port 8080

Worker
/pkg/java/jdk1.7.0_11/bin/java -cp
:/spark-0.8.0-incubating-bin-cdh4/conf:/spark-0.8.0-incubating-bin-cdh4/assembly/target/scala-2.9.3/spark-assembly-0.8.0-incubating-hadoop1.2.1.jar
-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps
-Dspark.boundedMemoryCache.memoryFraction=0.4
-Dspark.cache.class=spark.DiskSpillingCache -XX:+UseConcMarkSweepGC
-Djava.library.path= -Xms512m -Xmx512m
org.apache.spark.deploy.worker.Worker spark://localhost:7077


App
/pkg/java/jdk1.7.0_11/bin/java -cp
:/spark-0.8.0-incubating-bin-cdh4/conf:/spark-0.8.0-incubating-bin-cdh4/assembly/target/scala-2.9.3/spark-assembly-0.8.0-incubating-hadoop1.2.1.jar:/spark-0.8.0-incubating-bin-cdh4/core/target/scala-2.9.3/test-classes:/spark-0.8.0-incubating-bin-cdh4/repl/target/scala-2.9.3/test-classes:/spark-0.8.0-incubating-bin-cdh4/mllib/target/scala-2.9.3/test-classes:/spark-0.8.0-incubating-bin-cdh4/bagel/target/scala-2.9.3/test-classes:/spark-0.8.0-incubating-bin-cdh4/streaming/target/scala-2.9.3/test-classes
-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps
-Dspark.boundedMemoryCache.memoryFraction=0.4
-Dspark.cache.class=spark.DiskSpillingCache -XX:+UseConcMarkSweepGC
-Xms512M -Xmx512M org.apache.spark.executor.StandaloneExecutorBackend
akka://spark@localhost:33024/user/StandaloneScheduler 1 localhost 4


Here is the code
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.storage.StorageLevel

object SimpleApp {

      def main (args: Array[String]) {


System.setProperty("spark.local.dir","/spark-0.8.0-incubating-bin-cdh4/tmp");
      System.setProperty("spark.serializer",
"org.apache.spark.serializer.KryoSerializer")
      System.setProperty("spark.akka.timeout", "30")  //in seconds

      val dataFile2 = "/tmp_data/data1.txt"
      val dataFile1 = "/tmp_data/data2.txt"
      val sc = new SparkContext("spark://localhost:7077", "Simple App",
"/spark-0.8.0-incubating-bin-cdh4",
      List("target/scala-2.9.3/simple-project_2.9.3-1.0.jar"))

      val data10 = sc.textFile(dataFile1, 128)
      val data11 = data10.map(x => x.split("|"))
      val data12 = data11.map( x  =>  (x(1).toInt -> x) )


      val data20 = sc.textFile(dataFile2, 128)
      val data21 = data20.map(x => x.split("|"))
      val data22 = data21.map(x => (x(1).toInt -> x))


      val data3 = data12.join(data22, 128)
      val data4 = data3.distinct(4)
      val numAs = data10.count()
      val numBs = data20.count()
      val numCs = data3.count()
      val numDs = data4.count()
      println("Lines in 1: %s, Lines in 2: %s Lines in 3: %s Lines in 4:
%s".format(numAs, numBs, numCs, numDs))
      data4.foreach(println)
      }
}
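
The application jar listed in the SparkContext call above is built with "sbt
package"; the build file is roughly the following (reproduced from memory, so
treat the exact contents as approximate):

      // simple.sbt
      name := "Simple Project"

      version := "1.0"

      scalaVersion := "2.9.3"

      libraryDependencies += "org.apache.spark" %% "spark-core" % "0.8.0-incubating"

      resolvers += "Akka Repository" at "http://repo.akka.io/releases/"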

I see the following errors
13/12/04 10:53:55 WARN storage.BlockManagerMaster: Error sending message to
BlockManagerMaster in 1 attempts
java.util.concurrent.TimeoutException: Futures timed out after [10000]
milliseconds
        at akka.dispatch.DefaultPromise.ready(Future.scala:870)
        at akka.dispatch.DefaultPromise.result(Future.scala:874)
        at akka.dispatch.Await$.result(Future.scala:74)

and
13/12/04 10:53:55 ERROR executor.Executor: Exception in task ID 517
java.lang.OutOfMemoryError: Java heap space
        at com.esotericsoftware.kryo.io.Input.readString(Input.java:448)
        at
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$StringArraySerializer.read(DefaultArraySerializers.java:282)
        at
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$StringArraySerializer.read(DefaultArraySerializers.java:262)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
        at
com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:43)
        at
com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:34)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
        at
org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:106)
        at
org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:101)
        at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)

Lots of them, actually...


To give some additional information: I cut both files down to a single
column and ran them through this program, and I hit the same issue,
out-of-memory and other errors.

What did work was removing the following lines:

      val data21 = data20.map(x => x.split("|"))
      val data22 = data21.map(x => (x(1).toInt -> x))

and replacing them with:
      val data22 = data20.map(x => (x.toInt -> x))

However, as soon as I add additional columns back, this of course won't
work. Can someone explain what is going on here? Any suggestions are most
welcome.
Thanks
