Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20490#discussion_r166360278
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala
 ---
    @@ -117,20 +118,43 @@ object DataWritingSparkTask extends Logging {
           writeTask: DataWriterFactory[InternalRow],
           context: TaskContext,
           iter: Iterator[InternalRow]): WriterCommitMessage = {
    -    val dataWriter = writeTask.createDataWriter(context.partitionId(), 
context.attemptNumber())
    +    val stageId = context.stageId()
    +    val partId = context.partitionId()
    +    val attemptId = context.attemptNumber()
    +    val dataWriter = writeTask.createDataWriter(partId, attemptId)
     
         // write the data and commit this writer.
         Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
           iter.foreach(dataWriter.write)
    -      logInfo(s"Writer for partition ${context.partitionId()} is 
committing.")
    -      val msg = dataWriter.commit()
    -      logInfo(s"Writer for partition ${context.partitionId()} committed.")
    +
    +      val msg = if (writeTask.useCommitCoordinator) {
    +        val coordinator = SparkEnv.get.outputCommitCoordinator
    --- End diff --
    
    The API is flexible. The problem is that it defaults to no coordination, 
which cause correctness bugs.
    
    The safe option is to coordinate commits by default. If an implementation 
doesn't change the default, then it at least won't duplicate task outputs in 
job commit. Worst case is that it takes a little longer for committers that 
don't need coordination. On the other hand, not making this the default will 
cause some writers to work most of the time, but duplicate data in some cases.
    
    What do you think is the down side to adding this?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to