GitHub user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20490#discussion_r166398432

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala ---
    @@ -117,20 +118,43 @@ object DataWritingSparkTask extends Logging {
           writeTask: DataWriterFactory[InternalRow],
           context: TaskContext,
           iter: Iterator[InternalRow]): WriterCommitMessage = {
    -    val dataWriter = writeTask.createDataWriter(context.partitionId(), context.attemptNumber())
    +    val stageId = context.stageId()
    +    val partId = context.partitionId()
    +    val attemptId = context.attemptNumber()
    +    val dataWriter = writeTask.createDataWriter(partId, attemptId)
         // write the data and commit this writer.
         Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
           iter.foreach(dataWriter.write)
    -      logInfo(s"Writer for partition ${context.partitionId()} is committing.")
    -      val msg = dataWriter.commit()
    -      logInfo(s"Writer for partition ${context.partitionId()} committed.")
    +
    +      val msg = if (writeTask.useCommitCoordinator) {
    +        val coordinator = SparkEnv.get.outputCommitCoordinator
    --- End diff --

    Let me know if you want me to change this PR. I'd like to see this go into 2.3.0 if there's still time.

    Just because it is documented doesn't mean it isn't a choice that severely limits the utility of DataSourceV2. I'd rather not support workarounds for the life of 2.3.0.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org