Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20490#discussion_r166398432
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala ---
    @@ -117,20 +118,43 @@ object DataWritingSparkTask extends Logging {
           writeTask: DataWriterFactory[InternalRow],
           context: TaskContext,
           iter: Iterator[InternalRow]): WriterCommitMessage = {
    -    val dataWriter = writeTask.createDataWriter(context.partitionId(), context.attemptNumber())
    +    val stageId = context.stageId()
    +    val partId = context.partitionId()
    +    val attemptId = context.attemptNumber()
    +    val dataWriter = writeTask.createDataWriter(partId, attemptId)
     
         // write the data and commit this writer.
         Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
           iter.foreach(dataWriter.write)
    -      logInfo(s"Writer for partition ${context.partitionId()} is committing.")
    -      val msg = dataWriter.commit()
    -      logInfo(s"Writer for partition ${context.partitionId()} committed.")
    +
    +      val msg = if (writeTask.useCommitCoordinator) {
    +        val coordinator = SparkEnv.get.outputCommitCoordinator
    --- End diff --
    
    Let me know if you want me to change this PR.
    
    I'd like to see this go into 2.3.0 if there's still time. Just because it
    is documented doesn't mean it isn't a choice that severely limits the
    utility of DataSourceV2. I'd rather not support work-arounds for the life
    of 2.3.0.


