[ https://issues.apache.org/jira/browse/SPARK-21140?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16054043#comment-16054043 ]
michael procopio commented on SPARK-21140:
------------------------------------------

I disagree: executor memory does depend on the size of the partition being collected. A 6-to-1 memory-to-data ratio just to collect the data seems onerous to me. It seems that multiple copies of the data must be created in the executor. I found that collecting an RDD containing a single 512 MB partition took 3 GB of executor memory.

Here's the code I was using:

package com.test

import org.apache.spark._
import org.apache.spark.SparkContext._

object SparkRdd2 {

  def main(args: Array[String]): Unit = {
    try {
      //
      // Process any arguments.
      //
      def parseOptions(map: Map[String, Any], listArgs: List[String]): Map[String, Any] = {
        listArgs match {
          case Nil => map
          case "-master" :: value :: tail =>
            parseOptions(map + ("master" -> value), tail)
          case "-recordSize" :: value :: tail =>
            parseOptions(map + ("recordSize" -> value.toInt), tail)
          case "-partitionSize" :: value :: tail =>
            parseOptions(map + ("partitionSize" -> value.toLong), tail)
          case "-executorMemory" :: value :: tail =>
            parseOptions(map + ("executorMemory" -> value), tail)
          case option :: _ =>
            println("unknown option " + option)
            sys.exit(1)
        }
      }

      val listArgs = args.toList
      val optionmap = parseOptions(Map[String, Any](), listArgs)
      val master = optionmap.getOrElse("master", "local").asInstanceOf[String]
      val recordSize = optionmap.getOrElse("recordSize", 128).asInstanceOf[Int]
      // The default must be a Long: asInstanceOf[Long] on a boxed Int
      // throws a ClassCastException at runtime.
      val partitionSize = optionmap.getOrElse("partitionSize", 1024L * 1024 * 1024).asInstanceOf[Long]
      val executorMemory = optionmap.getOrElse("executorMemory", "6g").asInstanceOf[String]

      println(f"Creating single partition of $partitionSize%d with records of length $recordSize%d")
      println(f"Setting spark.executor.memory to $executorMemory")

      //
      // Create SparkConf.
      //
      val sparkConf = new SparkConf()
      sparkConf.setAppName("MyEnvVar").setMaster(master).setExecutorEnv("myenvvar", "good")
      sparkConf.set("spark.executor.cores", "1")
      sparkConf.set("spark.executor.instances", "1")
      sparkConf.set("spark.executor.memory", executorMemory)
      sparkConf.set("spark.eventLog.enabled", "true")
      sparkConf.set("spark.eventLog.dir", "hdfs://hadoop01glnxa64:54310/user/mprocopi/spark-events")
      sparkConf.set("spark.driver.maxResultSize", "0")
      /*
      sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      sparkConf.set("spark.kryoserializer.buffer.max", "768m")
      sparkConf.set("spark.kryoserializer.buffer", "64k")
      */

      //
      // Create SparkContext.
      //
      val sc = new SparkContext(sparkConf)

      //
      // Lazily generate records until the requested partition size is reached.
      //
      def createdSizedPartition(recordSize: Int, partitionSize: Long): Iterator[Array[Byte]] = {
        var sizeReturned: Long = 0
        new Iterator[Array[Byte]] {
          override def hasNext: Boolean = {
            sizeReturned < partitionSize
          }
          override def next(): Array[Byte] = {
            val record = Array.fill(recordSize)(0.toByte)
            sizeReturned = sizeReturned + recordSize
            record
          }
        }
      }

      //
      // Off we go.
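      //
      // A rough accounting (my assumption, not verified against Spark
      // internals) of where the memory goes during collect, with
      // recordSize = 256 and partitionSize = 536870912:
      //   - 536870912 / 256 = 2097152 records; each small Array[Byte]
      //     also carries roughly 16 bytes of JVM object header, so the
      //     materialized partition is already ~545 MB of live objects;
      //   - collect materializes the partition as an array, then
      //     serializes the whole task result into an in-memory buffer:
      //     a second ~512 MB copy, and a growing buffer can transiently
      //     hold about twice that while it doubles;
      //   - a large task result is additionally staged in the block
      //     manager before the driver fetches it.
      // Stacking those copies plausibly approaches the ~3 GB observed.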
      //
      val startRdd = sc.parallelize(Array((recordSize, partitionSize)))
      val sizedRdd = startRdd.flatMap(rddInfo => createdSizedPartition(rddInfo._1, rddInfo._2))
      val results = sizedRdd.collect

      var countLines: Int = 0
      var countBytes: Long = 0
      var maxRecord: Int = 0
      for (line <- results) {
        countLines = countLines + 1
        countBytes = countBytes + line.length
        if (line.length > maxRecord) {
          maxRecord = line.length
        }
      }
      println(f"Collected $countLines%d lines")
      println(f"          $countBytes%d bytes")
      println(f"Max record $maxRecord%d bytes")
    } catch {
      case e: Exception =>
        // println with two arguments would auto-tuple; concatenate instead.
        println("Error in executing application: " + e.getMessage)
        throw e
    }
  }
}

After building, it can be invoked as:

spark-submit --class com.test.SparkRdd2 --driver-memory 10g ./target/scala-2.11/envtest_2.11-0.0.1.jar -recordSize 256 -partitionSize 536870912

This allows you to vary the record size and the partition size.

> Reduce collect high memory requirements
> ---------------------------------------
>
>                 Key: SPARK-21140
>                 URL: https://issues.apache.org/jira/browse/SPARK-21140
>             Project: Spark
>          Issue Type: Improvement
>          Components: Input/Output
>    Affects Versions: 2.1.1
>         Environment: Linux Debian 8 using hadoop 2.7.2.
>            Reporter: michael procopio
>
> I wrote a very simple Scala application which used flatMap to create an RDD
> containing a 512 MB partition of 256-byte arrays. Experimentally, I
> determined that spark.executor.memory had to be set to 3 GB in order to
> collect the data. This seems extremely high.
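If the executor's peak really does track partition size, one untested workaround sketch (it reuses only the sizedRdd from the listing above) is to split the single large partition before collecting, replacing the `val results = sizedRdd.collect` line:

      // Each task now materializes and serializes ~1/32 of the data,
      // so the executor's transient copies shrink proportionally.
      // The repartition itself adds a shuffle.
      val results = sizedRdd.repartition(32).collect

This reduces the per-task peak on the executor but not the total work, and it does not address the driver, which still has to hold all of the collected data at once.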