Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20490#discussion_r166463921
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala ---
    @@ -117,20 +118,43 @@ object DataWritingSparkTask extends Logging {
           writeTask: DataWriterFactory[InternalRow],
           context: TaskContext,
           iter: Iterator[InternalRow]): WriterCommitMessage = {
    -    val dataWriter = writeTask.createDataWriter(context.partitionId(), context.attemptNumber())
    +    val stageId = context.stageId()
    +    val partId = context.partitionId()
    +    val attemptId = context.attemptNumber()
    +    val dataWriter = writeTask.createDataWriter(partId, attemptId)
     
         // write the data and commit this writer.
         Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
           iter.foreach(dataWriter.write)
    -      logInfo(s"Writer for partition ${context.partitionId()} is 
committing.")
    -      val msg = dataWriter.commit()
    -      logInfo(s"Writer for partition ${context.partitionId()} committed.")
    +
    +      val msg = if (writeTask.useCommitCoordinator) {
    +        val coordinator = SparkEnv.get.outputCommitCoordinator
    --- End diff --
    
    > As we can see, the number of people who can correctly implement a committer is << than those who have shipped one
    
    Totally agree. Great quote.
    
    From Wenchen's comments, I think we're in agreement that the default should be to use the commit coordinator. We just need to figure out how to get it in and how to document what it guarantees.
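    
    For reference, here is a minimal sketch of how the coordinated-commit branch might continue past the point where the diff above is cut off. It assumes the OutputCommitCoordinator.canCommit(stage, partition, attemptNumber) signature and the CommitDeniedException used by Spark's existing RDD commit path; treat it as an illustration, not the final shape of the change:
    
        // Sketch only: ask the driver-side coordinator for permission before
        // committing, so at most one attempt per partition is allowed to commit.
        val commitAuthorized = coordinator.canCommit(stageId, partId, attemptId)
        if (commitAuthorized) {
          logInfo(s"Writer for stage $stageId, task $partId.$attemptId is authorized to commit.")
          dataWriter.commit()
        } else {
          val message = s"Stage $stageId, task $partId.$attemptId: driver denied commit"
          logInfo(message)
          // Throwing routes control into the failure callback, which can call
          // dataWriter.abort() to clean up the uncommitted output.
          throw new CommitDeniedException(message, stageId, partId, attemptId)
        }
    
    Gating dataWriter.commit() on canCommit is what would give the single-committer guarantee discussed above; a source that sets useCommitCoordinator to false would keep the old unconditional commit.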

