[jira] [Commented] (SPARK-21140) Reduce collect high memory requirements

2017-06-19 Sean Owen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21140?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16054055#comment-16054055
 ] 

Sean Owen commented on SPARK-21140:
---

Yes, it's possible the executor makes a copy of some of the data during processing. 
Given the overhead of serializing data and merging intermediate buffers, that copy 
could be fairly large.
This isn't a very minimal example, and it doesn't establish that something actually 
runs out of memory.
There is also no proposal here about what could be done differently, and no leads 
about where most of the memory is being allocated: serialization?
I don't think this is actionable as is.
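
As a back-of-the-envelope sketch of why the footprint can be several times the raw 
data (the per-object overhead and buffer-growth figures below are assumptions about 
a typical 64-bit JVM and a growable serialization buffer, not measurements):

object CollectMemoryEstimate {
  def main(args: Array[String]): Unit = {
    val recordSize    = 256L
    val partitionSize = 512L * 1024 * 1024
    val numRecords    = partitionSize / recordSize             // ~2 million arrays

    // Assumption: ~16-byte object header plus a 4-byte length field, padded to
    // 8-byte alignment, i.e. roughly 24 bytes of JVM overhead per small Array[Byte].
    val perArrayOverhead = 24L
    val liveRecords = numRecords * (recordSize + perArrayOverhead)

    // collect() materializes the iterator into an array on the executor and then
    // serializes the result; the serialized copy is at least as large as the raw
    // data, and a growable output buffer can transiently hold close to twice that
    // while it resizes (assumption about the buffer's growth strategy).
    val serializedCopy   = partitionSize
    val transientBuffers = 2 * partitionSize

    println(f"live records      ~ ${liveRecords / 1e6}%.0f MB")
    println(f"serialized copy   ~ ${serializedCopy / 1e6}%.0f MB")
    println(f"transient buffers ~ up to ${transientBuffers / 1e6}%.0f MB")
  }
}

Even on these rough assumptions, the live records plus a single serialized copy 
already exceed 1 GB for 512 MB of payload.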

> Reduce collect high memory requirements
> --
>
> Key: SPARK-21140
> URL: https://issues.apache.org/jira/browse/SPARK-21140
> Project: Spark
>  Issue Type: Improvement
>  Components: Input/Output
>Affects Versions: 2.1.1
> Environment: Linux Debian 8 using hadoop 2.7.2.
>Reporter: michael procopio
>
> I wrote a very simple Scala application which used flatMap to create an RDD 
> containing a 512 MB partition of 256-byte arrays.  Experimentally, I 
> determined that spark.executor.memory had to be set to 3 GB in order to 
> collect the data.  This seems extremely high.






[jira] [Commented] (SPARK-21140) Reduce collect high memory requirements

2017-06-19 michael procopio (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21140?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16054043#comment-16054043
 ] 

michael procopio commented on SPARK-21140:
--

I disagree; executor memory does depend on the size of the partition being 
collected.  A 6-to-1 ratio of memory to data just to collect seems onerous to me.  
It seems like multiple copies must be created in the executor.

I found that collecting an RDD containing a single 512 MB partition took 3 GB of 
executor memory.
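
(As a rough sanity check, assuming Spark 2.1's default unified-memory settings of 
300 MB reserved and spark.memory.fraction = 0.6: a 3 GB heap leaves only about 
(3072 - 300) * 0.4 ≈ 1.1 GB outside the managed execution/storage pool for user 
data structures, and ~2 million 256-byte arrays cost close to 600 MB once per-object 
JVM overhead is included, so those records plus one serialized copy of the partition 
(~512 MB) already approach that figure.)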

Here's the code I was using:

package com.test

import org.apache.spark._
import org.apache.spark.SparkContext._

object SparkRdd2 {
  def main(args: Array[String]): Unit = {
    try {
      //
      // Process any arguments.
      //
      def parseOptions(map: Map[String, Any], listArgs: List[String]): Map[String, Any] = {
        listArgs match {
          case Nil =>
            map
          case "-master" :: value :: tail =>
            parseOptions(map + ("master" -> value), tail)
          case "-recordSize" :: value :: tail =>
            parseOptions(map + ("recordSize" -> value.toInt), tail)
          case "-partitionSize" :: value :: tail =>
            parseOptions(map + ("partitionSize" -> value.toLong), tail)
          case "-executorMemory" :: value :: tail =>
            parseOptions(map + ("executorMemory" -> value), tail)
          case option :: tail =>
            println("unknown option: " + option)
            sys.exit(1)
        }
      }
      val listArgs = args.toList
      val optionmap = parseOptions(Map[String, Any](), listArgs)
      val master = optionmap.getOrElse("master", "local").asInstanceOf[String]
      val recordSize = optionmap.getOrElse("recordSize", 128).asInstanceOf[Int]
      val partitionSize = optionmap.getOrElse("partitionSize", 1024L * 1024 * 1024).asInstanceOf[Long]
      val executorMemory = optionmap.getOrElse("executorMemory", "6g").asInstanceOf[String]
      println(f"Creating single partition of $partitionSize%d bytes with records of length $recordSize%d")
      println(f"Setting spark.executor.memory to $executorMemory")
      //
      // Create SparkConf.
      //
      val sparkConf = new SparkConf()
      sparkConf.setAppName("MyEnvVar").setMaster(master).setExecutorEnv("myenvvar", "good")
      sparkConf.set("spark.executor.cores", "1")
      sparkConf.set("spark.executor.instances", "1")
      sparkConf.set("spark.executor.memory", executorMemory)
      sparkConf.set("spark.eventLog.enabled", "true")
      sparkConf.set("spark.eventLog.dir", "hdfs://hadoop01glnxa64:54310/user/mprocopi/spark-events")
      sparkConf.set("spark.driver.maxResultSize", "0")
      /*
      sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      sparkConf.set("spark.kryoserializer.buffer.max", "768m")
      sparkConf.set("spark.kryoserializer.buffer", "64k")
      */
      //
      // Create SparkContext.
      //
      val sc = new SparkContext(sparkConf)
      //
      // Lazily emit recordSize-byte arrays until partitionSize bytes have been produced.
      //
      def createdSizedPartition(recordSize: Int, partitionSize: Long): Iterator[Array[Byte]] = {
        var sizeReturned: Long = 0
        new Iterator[Array[Byte]] {
          // (the hasNext/next bodies were garbled in the archived message and are
          // reconstructed from context)
          override def hasNext: Boolean = {
            sizeReturned < partitionSize
          }
          override def next(): Array[Byte] = {
            sizeReturned += recordSize
            new Array[Byte](recordSize)
          }
        }
      }
      // Build an RDD whose single partition totals partitionSize bytes, then collect
      // it back to the driver.  (The exact parallelize call was also lost in the
      // archive; a one-element (recordSize, partitionSize) seed is assumed here.)
      val sizedRdd = sc.parallelize(Seq((recordSize, partitionSize)), 1)
        .flatMap(rddInfo => createdSizedPartition(rddInfo._1, rddInfo._2))
      val results = sizedRdd.collect
      var countLines: Int = 0
      var countBytes: Long = 0
      var maxRecord: Int = 0
      for (line <- results) {
        countLines = countLines + 1
        countBytes = countBytes + line.length
        if (line.length > maxRecord) {
          maxRecord = line.length
        }
      }
      println(f"Collected $countLines%d lines")
      println(f"          $countBytes%d bytes")
      println(f"Max record $maxRecord%d bytes")
    } catch {
      case e: Exception =>
        println("Error in executing application: " + e.getMessage)
        throw e
    }
  }
}

After building, it can be invoked as:

spark-submit --class com.test.SparkRdd2 --driver-memory 10g \
  ./target/scala-2.11/envtest_2.11-0.0.1.jar -recordSize 256 -partitionSize 536870912

This allows you to vary the record size and the partition size; -master and 
-executorMemory can be supplied the same way.
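
For a more minimal reproduction that also gives a lead on where the memory goes, 
something along these lines should work (a sketch rather than the attached program; 
it assumes the same 256-byte/512 MB shape and simply logs executor heap usage from 
inside the task):

import org.apache.spark.{SparkConf, SparkContext}

object HeapProbe {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("HeapProbe"))
    val recordSize = 256
    val numRecords = (512L * 1024 * 1024 / recordSize).toInt   // ~2 million records, ~512 MB payload

    val rdd = sc.parallelize(Seq(0), 1).flatMap { _ =>
      val rt = Runtime.getRuntime
      Iterator.tabulate(numRecords) { i =>
        if (i % 500000 == 0) {
          // Appears in the executor's stderr log: used heap while the partition is produced.
          println(s"record $i: executor used heap ${(rt.totalMemory - rt.freeMemory) / (1024 * 1024)} MB")
        }
        new Array[Byte](recordSize)
      }
    }

    val results = rdd.collect()
    val rt = Runtime.getRuntime
    println(s"collected ${results.length} records; driver used heap ${(rt.totalMemory - rt.freeMemory) / (1024 * 1024)} MB")
    sc.stop()
  }
}

It can be submitted the same way as above; comparing the executor's logged heap 
usage before and during the collect would help substantiate (or rule out) the 
multiple-copies theory.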

> Reduce collect high memory requirements
> --
>
> Key: SPARK-21140
> URL: https://issues.apache.org/jira/browse/SPARK-21140
> Project: Spark
>  Issue Type: Improvement
>  Components: Input/Output
>Affects Versions: 2.1.1
> Environment: Linux Debian 8 using hadoop 2.7.2.
>Reporter: michael procopio
>
> I wrote a very simple Scala application which used flatMap to create an RDD 
> containing a 512 MB partition of 256-byte arrays.  Experimentally, I 
> determined that spark.executor.memory had to be set to 3 GB in order to 
> collect the data.  This seems extremely high.