[
https://issues.apache.org/jira/browse/SPARK-21140?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16054043#comment-16054043
]
michael procopio commented on SPARK-21140:
--
I disagree: executor memory does depend on the size of the partition being
collected. A 6-to-1 ratio of memory to collected data seems onerous to me. It
seems like multiple copies must be created in the executor.
I found that collecting an RDD containing a single 512 MB partition took
3 GB.
Here's the code I was using:
package com.test
import org.apache.spark._
import org.apache.spark.SparkContext._
/**
 * Standalone Spark driver used to measure how much memory is needed to
 * `collect` a single large partition (context: SPARK-21140).
 *
 * It parses a few command-line flags and configures a SparkContext with one
 * executor instance and one executor core. It then builds an RDD of
 * `Array[Byte]` records (per its log message, a single partition of
 * `partitionSize` made of records of length `recordSize`), collects the RDD
 * to the driver, and prints the record count, total byte count and longest
 * record.
 *
 * NOTE(review): this copy was recovered from an email/JIRA notification and
 * is damaged. Part of the iterator body was lost (see the note inside
 * `createdSizedPartition`), and one string literal was line-wrapped. As shown
 * it does not compile.
 */
object SparkRdd2 {
/**
 * Entry point.
 *
 * Recognized flags (each must be followed by a value):
 *   -master <url>           Spark master URL (default "local")
 *   -recordSize <int>       length of each generated record (default 128)
 *   -partitionSize <long>   target size of the generated partition,
 *                           presumably in bytes (default 1024*1024*1024)
 *   -executorMemory <size>  value for spark.executor.memory (default "6g")
 *
 * Any `Exception` thrown in the body is printed and then rethrown.
 */
def main(args: Array[String]) {
try
{
//
// Process any arguments.
//
// parseOptions walks `listArgs` recursively, two tokens at a time, and
// accumulates flag -> value pairs into `map` (-recordSize is parsed with
// toInt, -partitionSize with toLong; the others are kept as Strings).
// Any unrecognized token prints a message and exits the JVM with status 1.
// That includes a known flag given as the last argument with no value:
// it fails the `flag :: value :: tail` pattern and falls into the
// `option :: tail` case.
// NOTE(review): the message is built as "unknown option"+option, so there
// is no space before the offending token. Non-numeric values for
// -recordSize/-partitionSize throw NumberFormatException, which the outer
// catch prints and rethrows.
def parseOptions( map: Map[String,Any], listArgs: List[String]):
Map[String,Any] = {
listArgs match {
case Nil =>
map
case "-master" :: value :: tail =>
parseOptions( map+("master"-> value),tail)
case "-recordSize" :: value :: tail =>
parseOptions( map+("recordSize"-> value.toInt),tail)
case "-partitionSize" :: value :: tail =>
parseOptions( map+("partitionSize"-> value.toLong),tail)
case "-executorMemory" :: value :: tail =>
parseOptions( map+("executorMemory"-> value),tail)
case option :: tail =>
println("unknown option"+option)
sys.exit(1)
}
}
val listArgs = args.toList
val optionmap = parseOptions( Map[String,Any](),listArgs)
// Read each option back out of the untyped map, falling back to a default.
val master = optionmap.getOrElse("master","local").asInstanceOf[String]
val recordSize =
optionmap.getOrElse("recordSize",128).asInstanceOf[Int]
// NOTE(review): the default 1024*1024*1024 is an Int literal. When
// -partitionSize is not supplied, getOrElse yields a boxed Integer, and
// `.asInstanceOf[Long]` on it throws ClassCastException. The default
// should be a Long literal (e.g. 1024L*1024*1024).
val partitionSize =
optionmap.getOrElse("partitionSize",1024*1024*1024).asInstanceOf[Long]
val executorMemory =
optionmap.getOrElse("executorMemory","6g").asInstanceOf[String]
// NOTE(review): the f-interpolated literal below is split across two lines,
// most likely by email wrapping. A single-quoted Scala string literal cannot
// contain a newline, so this does not compile as shown; rejoin it onto one
// line.
println(f"Creating single partition of $partitionSize%d with records
of length $recordSize%d")
println(f"Setting spark.executor.memory to $executorMemory")
//
// Create SparkConf.
//
// One executor instance with one core, using the requested executor memory.
val sparkConf = new SparkConf()
sparkConf.setAppName("MyEnvVar").setMaster(master).setExecutorEnv("myenvvar","good")
sparkConf.set("spark.executor.cores","1")
sparkConf.set("spark.executor.instances","1")
sparkConf.set("spark.executor.memory",executorMemory)
// NOTE(review): the event-log directory is a hard-coded, site-specific HDFS
// path. Runs elsewhere will need to change or remove it.
sparkConf.set("spark.eventLog.enabled","true")
sparkConf.set("spark.eventLog.dir","hdfs://hadoop01glnxa64:54310/user/mprocopi/spark-events");
// "0" lifts the driver's limit on total collected result size (per Spark's
// configuration docs), so the large collect below is not rejected.
sparkConf.set("spark.driver.maxResultSize","0")
// Alternative Kryo serializer settings, currently disabled.
/*
sparkConf.set("spark.serializer",
"org.apache.spark.serializer.KryoSerializer")
sparkConf.set("spark.kryoserializer.buffer.max","768m")
sparkConf.set("spark.kryoserializer.buffer","64k")
*/
//
// Create SparkContext
//
// NOTE(review): no sc.stop() appears anywhere in the visible code.
val sc = new SparkContext(sparkConf)
//
// Generator for one partition's worth of records.
//
// Returns an Iterator[Array[Byte]]. `sizeReturned` counts how much has been
// produced so far; presumably hasNext compares it against partitionSize,
// and next emits recordSize-length arrays. TODO confirm against the
// original source.
def createdSizedPartition( recordSize:Int ,partitionSize:Long):
Iterator[Array[Byte]] = {
var sizeReturned:Long = 0
new Iterator[Array[Byte]] {
override def hasNext(): Boolean = {
// NOTE(review): the next line is corrupted. The text between
// `sizeReturned` and `createdSizedPartition(` appears to have been lost,
// likely because a mail filter stripped everything inside `<...>`. The
// missing span presumably held the rest of hasNext (a
// `sizeReturned < partitionSize` style test), the iterator's next(), the
// closing braces, and the definitions of `rddInfo` (apparently a
// (recordSize, partitionSize) pair) and `sizedRdd` (the RDD collected
// below). TODO recover these from the original JIRA comment.
(sizeReturned createdSizedPartition(
rddInfo._1, rddInfo._2))
// Bring every record of the RDD back to the driver as one array. This is
// the operation whose memory use is being measured.
val results = sizedRdd.collect
// Tally the record count, total bytes and longest record length.
var countLines: Int = 0
var countBytes: Long = 0
var maxRecord: Int = 0
for (line <- results) {
countLines = countLines+1
countBytes = countBytes+line.length
if (line.length> maxRecord)
{
maxRecord = line.length
}
}
println(f"Collected $countLines%d lines")
println(f" $countBytes%d bytes")
println(f"Max record $maxRecord%d bytes")
} catch {
// Report the failure, then rethrow so the job still fails.
// NOTE(review): two arguments to println are auto-tupled, so this prints
// "(Error in executing application: ,<message>)". String concatenation
// or interpolation was probably intended.
case e: Exception =>
println("Error in executing application: ", e.getMessage)
throw e
}
}
}
After building, it can be invoked as:
spark-submit --class com.test.SparkRdd2 --driver-memory 10g ./target/scala-2.11/envtest_2.11-0.0.1.jar -recordSize 256 -partitionSize 536870912
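The command-line options allow you to vary the master (-master), record size (-recordSize), partition size (-partitionSize), and executor memory (-executorMemory).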
> Reduce collect high memory requirements
> --
>
> Key: SPARK-21140
> URL: https://issues.apache.org/jira/browse/SPARK-21140
> Project: Spark
> Issue Type: Improvement
> Components: Input/Output
>Affects Versions: 2.1.1
> Environment: Linux Debian 8 using hadoop 2.7.2.
>Reporter: michael procopio
>
> I wrote a very simple Scala application which used flatMap to create an RDD
> containing a 512 mb partition of