[ https://issues.apache.org/jira/browse/SPARK-2468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14209407#comment-14209407 ]

Aaron Davidson commented on SPARK-2468:
---------------------------------------

My test was significantly less strict in its memory requirements, which may 
account for the difference with respect to OOMs. I used two 28GB containers on 
different machines, with 24GB of that given to Spark's heap. Due to the 
networking of the containers, the maximum throughput was around 7Gb/s (combined 
across both directions), which I was able to saturate using Netty but could 
only achieve around 3.5Gb/s (combined) using Nio.
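
Concretely, the setup amounts to something like the following configuration (a 
sketch; spark.shuffle.blockTransferService is the Spark 1.2 property that 
selects between the two implementations):

{code}
// Sketch of the benchmark configuration (Spark 1.2 property names).
// 24GB of each 28GB container goes to the executor heap; flipping
// blockTransferService between "netty" and "nio" switches the shuffle path.
val conf = new org.apache.spark.SparkConf()
  .set("spark.executor.memory", "24g")
  .set("spark.shuffle.blockTransferService", "netty") // or "nio" for comparison
{code}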

My test was a sort of 50GB of generated data, shuffled between the two 
machines. I tested the sort as a whole, as well as a variant where I injected a 
deserializer that immediately EOFs (this still reads all the data but does no 
computation on the reducer side, maximizing network throughput).

Here is my full test, including the no-op deserializer:

{code}
import org.apache.spark.SparkConf
import org.apache.spark.serializer.{Serializer, SerializerInstance, SerializationStream, DeserializationStream}
import java.io._
import java.nio.ByteBuffer
import scala.reflect.ClassTag

class NoOpReadSerializer(conf: SparkConf) extends Serializer with Serializable {
  override def newInstance(): SerializerInstance = {
    new NoOpReadSerializerInstance()
  }
}

class NoOpReadSerializerInstance()
  extends SerializerInstance {

  override def serialize[T: ClassTag](t: T): ByteBuffer = {
    val bos = new ByteArrayOutputStream()
    val out = serializeStream(bos)
    out.writeObject(t)
    out.close()
    ByteBuffer.wrap(bos.toByteArray)
  }

  override def deserialize[T: ClassTag](bytes: ByteBuffer): T = {
    null.asInstanceOf[T]
  }

  override def deserialize[T: ClassTag](bytes: ByteBuffer, loader: ClassLoader): T = {
    null.asInstanceOf[T]
  }

  override def serializeStream(s: OutputStream): SerializationStream = {
    new NoOpSerializationStream(s, 100)
  }

  override def deserializeStream(s: InputStream): DeserializationStream = {
    new NoOpDeserializationStream(s, Thread.currentThread().getContextClassLoader)
  }

  def deserializeStream(s: InputStream, loader: ClassLoader): DeserializationStream = {
    new NoOpDeserializationStream(s, loader)
  }
}

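// Deserialization stream that EOFs immediately: shuffle blocks are still
// fetched in full over the network, but no records are deserialized on the
// reducer side.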
class NoOpDeserializationStream(in: InputStream, loader: ClassLoader)
  extends DeserializationStream {
  def readObject[T: ClassTag](): T = throw new EOFException()
  def close() { }
}

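// The write side is a real ObjectOutputStream; calling reset() every
// counterReset writes keeps its identity table from pinning every written
// object on the heap.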
class NoOpSerializationStream(out: OutputStream, counterReset: Int) extends SerializationStream {
  private val objOut = new ObjectOutputStream(out)
  private var counter = 0

  def writeObject[T: ClassTag](t: T): SerializationStream = {
    objOut.writeObject(t)
    counter += 1
    if (counterReset > 0 && counter >= counterReset) {
      objOut.reset()
      counter = 0
    }
    this
  }

  def flush() { objOut.flush() }
  def close() { objOut.close() }
}


// Test code below:
implicit val arrayOrdering = Ordering.by((_: Array[Byte]).toIterable)
def createSort() = sc.parallelize(0 until 5000000, 320).map { x: Int =>
  val rand = new scala.util.Random(System.nanoTime())
  val bytes = new Array[Byte](10000)
  rand.nextBytes(bytes)
  (bytes, 1)
}.sortByKey(true, 333)

val x = createSort()
x.count() // does shuffle + sorting on reduce side

val y = createSort().asInstanceOf[org.apache.spark.rdd.ShuffledRDD[_, _, _]].setSerializer(new NoOpReadSerializer(sc.getConf))
y.count() // does shuffle with no read-side computation (warning: causes FD leak in Spark!)
{code}

Note that if you run this with less memory, you may have to tune the number of 
partitions or the size of the data to avoid invoking the ExternalSorter 
(5,000,000 records of 10KB each is ~50GB, so each of the 333 reduce partitions 
sees roughly 150MB). I observed very little GC and no significant heap/process 
growth in memory after the first run.
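
For example, a scaled-down variant (hypothetical numbers, purely illustrative) 
that keeps each reduce partition small enough to avoid spilling:

{code}
// Same shape as createSort() above (reuses the implicit arrayOrdering), but
// ~5GB total instead of ~50GB: each of the 333 reduce partitions then holds
// roughly 15MB, comfortably in memory.
def createSmallSort() = sc.parallelize(0 until 500000, 320).map { x: Int =>
  val rand = new scala.util.Random(System.nanoTime())
  val bytes = new Array[Byte](10000)
  rand.nextBytes(bytes)
  (bytes, 1)
}.sortByKey(true, 333)
{code}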

I will try another test where the memory is more constrained to further 
investigate the OOM problem.

> Netty-based block server / client module
> ----------------------------------------
>
>                 Key: SPARK-2468
>                 URL: https://issues.apache.org/jira/browse/SPARK-2468
>             Project: Spark
>          Issue Type: Improvement
>          Components: Shuffle, Spark Core
>            Reporter: Reynold Xin
>            Assignee: Reynold Xin
>            Priority: Critical
>             Fix For: 1.2.0
>
>
> Right now shuffle send goes through the block manager. This is inefficient 
> because it requires loading a block from disk into a kernel buffer, then into 
> a user space buffer, and then back to a kernel send buffer before it reaches 
> the NIC. It makes multiple copies of the data and context switches between 
> kernel and user space. It also creates unnecessary buffers in the JVM that 
> increase GC pressure.
> Instead, we should use FileChannel.transferTo, which handles this in the 
> kernel space with zero-copy. See 
> http://www.ibm.com/developerworks/library/j-zerocopy/
> One potential solution is to use Netty.  Spark already has a Netty based 
> network module implemented (org.apache.spark.network.netty). However, it 
> lacks some functionality and is turned off by default. 
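
For context, a minimal sketch of the zero-copy path the description refers to 
(file name, host, and port are placeholders; FileChannel.transferTo maps to 
sendfile on Linux, so the bytes never enter a user-space buffer or the JVM 
heap):

{code}
import java.io.FileInputStream
import java.net.InetSocketAddress
import java.nio.channels.SocketChannel

// Placeholder block file and destination, for illustration only.
val fileChannel = new FileInputStream("/tmp/shuffle_0_0_0.data").getChannel
val socket = SocketChannel.open(new InetSocketAddress("reducer-host", 9999))

// transferTo may send fewer bytes than requested, so loop until done.
var pos = 0L
val size = fileChannel.size()
while (pos < size) {
  pos += fileChannel.transferTo(pos, size - pos, socket)
}

fileChannel.close()
socket.close()
{code}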


