Hi all,

It looks like the result of a task is serialized twice: once by the configured serializer (i.e. Java or Kryo, depending on configuration) and once again by the closure serializer (always Java). To link the actual code:
The first one: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/executor/Executor.scala#L213
The second one: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/executor/Executor.scala#L226

This serializes the "value" (the result of the task run) twice, which affects things like collect(), takeSample(), and toLocalIterator().

Would it make sense to simply serialize the DirectTaskResult once using the regular "serializer" (as opposed to the closure serializer)? Would that cause problems when the Accumulator values are not Kryo-serializable?

Alternatively, if we can assume that Accumulator values are small, we could closure-serialize those, put the serialized byte array in the DirectTaskResult together with the raw task result "value", and then serialize the DirectTaskResult.

What do people think?

Thanks,
Mingyu
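To make the cost concrete, here is a minimal Java sketch of the pattern in question. The class and field names are hypothetical stand-ins (not Spark's actual DirectTaskResult), and plain Java serialization stands in for both serializers; the point is just that the task's value is serialized once, and the wrapper holding those bytes is then serialized a second time before going over the wire.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class DoubleSerialization {
    // Hypothetical stand-in for DirectTaskResult: holds the already-serialized
    // task value plus a small accumulator update.
    static class TaskResultWrapper implements Serializable {
        final byte[] valueBytes;      // value after the first serialization pass
        final long accumulatorUpdate; // stand-in for Accumulator values

        TaskResultWrapper(byte[] valueBytes, long accumulatorUpdate) {
            this.valueBytes = valueBytes;
            this.accumulatorUpdate = accumulatorUpdate;
        }
    }

    // Serialize an object with plain Java serialization and return the bytes.
    static byte[] javaSerialize(Serializable obj) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(obj);
        }
        return bos.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        int[] taskValue = new int[10_000]; // the task's result

        // First pass: serialize the value itself (in Spark, by the configured
        // serializer, e.g. Kryo).
        byte[] valueBytes = javaSerialize(taskValue);

        // Second pass: the wrapper, including the value bytes inside it, is
        // serialized again (in Spark, by the closure serializer).
        byte[] wireBytes = javaSerialize(new TaskResultWrapper(valueBytes, 42L));

        System.out.println("value bytes: " + valueBytes.length);
        System.out.println("wire bytes:  " + wireBytes.length);
    }
}
```

The second pass has to walk and copy the full value payload again, so for large results (exactly the collect()/takeSample()/toLocalIterator() cases) the double pass is pure overhead.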