I am storing the output of mapPartitions in a ListBuffer and exposing its
iterator as the result. The output is a list of Long tuples (Tuple2[Long, Long]).
When I check the size of the object using Spark's SizeEstimator.estimate
method, it comes out to about 80 bytes per record/tuple (calculated as "size of
the ListBuffer object / # of records"). That seems too large for a Tuple2 of
longs (two 8-byte longs plus some object overhead). Any ideas why this is so,
and how I can reduce the memory taken up by the output? I am sure I am
missing something obvious.
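
For reference, this is roughly how I arrive at the per-record number. It is a
minimal standalone sketch (the one-million-record buffer is just an
illustration, not my actual data):

    import org.apache.spark.util.SizeEstimator
    import scala.collection.mutable.ListBuffer

    // Build a buffer of (Long, Long) tuples and estimate its size.
    val buf = ListBuffer[(Long, Long)]()
    for (i <- 0L until 1000000L) {
      buf += ((i, i * 2))
    }
    val total = SizeEstimator.estimate(buf)
    println("total: " + total + " bytes, per record: " + (total / buf.size))
    // For comparison, the estimated size of a single tuple on its own.
    println("single tuple: " + SizeEstimator.estimate((1L, 2L)) + " bytes")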

Also, these ListBuffer objects are getting too large for memory, leading to
memory and disk spills and poor performance. Any ideas on how I can simply
emit the output of mapPartitions without holding the whole output in an
in-memory object? Each input record to mapPartitions can generate 0 or more
output records, so I don't think I can use the "rdd.map" function, and I am
not sure it would help my cause even if I could.
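
For what it's worth, the kind of lazy rewrite I was considering looks roughly
like the sketch below. It assumes windowObject and keyDelimiter can be used
inside the closure and that windowObject.process returns a java.util.List (as
the get(i)/size() calls in my code suggest); the counters, debug logging and
error handling are left out for brevity:

    import scala.collection.JavaConverters._

    // Sketch: expand each input record into 0..n output tuples via flatMap
    // on the partition iterator, so no per-partition ListBuffer is built up.
    val outputRDD = sortedRDD.mapPartitionsWithIndex((partitionNo, p) => {
      p.flatMap { tpl =>
        val partitionKey = tpl._1.split(keyDelimiter)(0)
        val xs_string = tpl._2.toSeq.toArray.map {
          case None => ""
          case null => ""
          case x    => x.toString
        }
        val outputTuples = windowObject.process(partitionKey, xs_string)
        if (outputTuples == null) Iterator.empty
        else outputTuples.asScala.iterator
          .filter(_ != null)
          .map(r => (r.getProfileID1, r.getProfileID2))
      }
    }, false)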

Here is the code snippet.

var outputRDD = sortedRDD.mapPartitionsWithIndex((partitionNo, p) => {
  val outputList = ListBuffer[(Long, Long)]()
  var inputCnt: Long = 0
  var outputCnt: Long = 0

  while (p.hasNext) {
    inputCnt += 1
    val tpl = p.next()

    // First field of the composite key is the partition key
    var partitionKey = ""
    try {
      partitionKey = tpl._1.split(keyDelimiter)(0)
    } catch {
      case _: ArrayIndexOutOfBoundsException =>
        println("could not parse partition key from: " + tpl._1)
    }

    val value = tpl._2
    val xs: Array[Any] = value.toSeq.toArray

    // Convert every field to a String, mapping None/null to ""
    val xs_string: Array[String] = xs.map {
      case None => ""
      case null => ""
      case x    => x.toString
    }

    val outputTuples = windowObject.process(partitionKey, xs_string)

    if (outputTuples != null) {
      for (i <- 0 until outputTuples.size()) {
        val outputRecord = outputTuples.get(i)
        if (outputRecord != null) {
          outputList += ((outputRecord.getProfileID1, outputRecord.getProfileID2))
          outputCnt += 1
        }
      }
    }
  }

  if (debugFlag.equals("DEBUG")) {
    logger.info("partitionNo: " + partitionNo + ", input #: " + inputCnt +
      ", output #: " + outputCnt + ", outputList object size: " +
      SizeEstimator.estimate(outputList))
  }

  outputList.iterator
}, false)


