GitHub user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23207#discussion_r239677653
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala ---
    @@ -333,8 +343,19 @@ object ShuffleExchangeExec {
           new ShuffleDependency[Int, InternalRow, InternalRow](
             rddWithPartitionIds,
             new PartitionIdPassthrough(part.numPartitions),
    -        serializer)
    +        serializer,
    +        shuffleWriterProcessor = createShuffleWriteProcessor(writeMetrics))
     
         dependency
       }
    +
    +  /**
    +   * Create a customized [[ShuffleWriteProcessor]] for SQL which wrap the default metrics reporter
    +   * with [[SQLShuffleWriteMetricsReporter]] as new reporter for [[ShuffleWriteProcessor]].
    +   */
    +  def createShuffleWriteProcessor(metrics: Map[String, SQLMetric]): ShuffleWriteProcessor = {
    +    (reporter: ShuffleWriteMetricsReporter) => {
    --- End diff --
    
    Does this work with Scala 2.11? Maybe we don't need to be that fancy and can just write:
    ```
    new ShuffleWriteProcessor {
      xxx
    }
    ```
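
    For context: Scala 2.12 converts a function literal to a
    single-abstract-method type by default, while Scala 2.11 does so only
    under the -Xexperimental flag, so the lambda form may not compile there.
    A minimal sketch of the explicit anonymous-class form follows. The types
    MetricsReporter, WriteProcessor and CountingReporter are hypothetical
    stand-ins, not Spark's actual ShuffleWriteProcessor API.
    ```scala
    // Hypothetical stand-ins for illustration; not Spark's real definitions.
    trait MetricsReporter {
      def incRecordsWritten(n: Long): Unit
    }

    trait WriteProcessor {
      // Single abstract method: wraps the default reporter in a custom one.
      def createReporter(default: MetricsReporter): MetricsReporter
    }

    // Keeps its own count and forwards every update to the wrapped reporter.
    final class CountingReporter(underlying: MetricsReporter) extends MetricsReporter {
      var recordsWritten: Long = 0L
      override def incRecordsWritten(n: Long): Unit = {
        recordsWritten += n
        underlying.incRecordsWritten(n)
      }
    }

    object WriteProcessorSketch {
      // Explicit anonymous class instead of a lambda, so no SAM conversion
      // is needed and the code compiles on both Scala 2.11 and 2.12.
      def createProcessor(): WriteProcessor = new WriteProcessor {
        override def createReporter(default: MetricsReporter): MetricsReporter =
          new CountingReporter(default)
      }
    }
    ```
    The anonymous class is a few lines longer than the lambda, but it does
    not depend on compiler version or flags.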

