Github user rxin commented on the issue:
https://github.com/apache/spark/pull/23207
```
    var writer: ShuffleWriter[Any, Any] = null
    try {
      val manager = SparkEnv.get.shuffleManager
      writer = manager.getWriter[Any, Any](
        dep.shuffleHandle, partitionId, context,
        context.taskMetrics().shuffleWriteMetrics)
      writer.write(
        rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
      writer.stop(success = true).get
    } catch {
      case e: Exception =>
        try {
          if (writer != null) {
            writer.stop(success = false)
          }
        } catch {
          case e: Exception =>
            log.debug("Could not stop writer", e)
        }
        throw e
    }
```
Can we put the above in a closure and pass it into the shuffle dependency? Then on the SQL side we would just supply the same closure, wired up with SQL's custom metrics.
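One possible shape for that refactoring, sketched below with hypothetical names (`ShuffleWriteProcessor`, `createMetricsReporter`, and the `write` signature are illustrative, not an existing API in this PR): the dependency carries an overridable write routine, and a SQL-specific subclass swaps in its own metrics reporter while the writer logic itself stays identical to the block quoted above.

```scala
// Sketch only: hypothetical hook object that a ShuffleDependency could carry.
// Everything except the quoted writer logic is an illustrative assumption.
class ShuffleWriteProcessor extends Serializable {

  // SQL (or any caller) overrides this to report to custom metrics.
  protected def createMetricsReporter(context: TaskContext): ShuffleWriteMetricsReporter =
    context.taskMetrics().shuffleWriteMetrics

  def write(
      rdd: RDD[_],
      dep: ShuffleDependency[_, _, _],
      partitionId: Int,
      context: TaskContext,
      partition: Partition): MapStatus = {
    var writer: ShuffleWriter[Any, Any] = null
    try {
      val manager = SparkEnv.get.shuffleManager
      writer = manager.getWriter[Any, Any](
        dep.shuffleHandle, partitionId, context, createMetricsReporter(context))
      writer.write(
        rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
      writer.stop(success = true).get
    } catch {
      case e: Exception =>
        try {
          if (writer != null) writer.stop(success = false)
        } catch {
          case e2: Exception => log.debug("Could not stop writer", e2)
        }
        throw e
    }
  }
}
```

ShuffleMapTask would then call `dep.shuffleWriteProcessor.write(...)` instead of inlining the try/catch, and SQL would construct its dependency with a subclass whose `createMetricsReporter` returns a reporter backed by SQLMetrics.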