This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 7ea3a33 [SPARK-34355][CORE][SQL][FOLLOWUP] Log commit time in all File Writer 7ea3a33 is described below commit 7ea3a336b99915f09174a4c3e47fa17f30b88890 Author: Angerszhuuuu <angers....@gmail.com> AuthorDate: Tue Feb 9 16:05:39 2021 +0900 [SPARK-34355][CORE][SQL][FOLLOWUP] Log commit time in all File Writer ### What changes were proposed in this pull request? While working on https://issues.apache.org/jira/browse/SPARK-34399 on top of https://github.com/apache/spark/pull/31471, we found that FileBatchWrite also uses `FileFormatWriter.processStats()`. We need to log the commit duration in the other writers too. In this PR: 1. Extract a commit job method in SparkHadoopWriter 2. Address the other commit writers ### Why are the changes needed? ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? No Closes #31520 from AngersZhuuuu/SPARK-34355-followup. Authored-by: Angerszhuuuu <angers....@gmail.com> Signed-off-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensou...@gmail.com> --- .../main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala | 5 +++-- .../apache/spark/sql/execution/datasources/v2/FileBatchWrite.scala | 6 ++++-- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala index 37b4708..4eeec63 100644 --- a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala +++ b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala @@ -96,8 +96,9 @@ object SparkHadoopWriter extends Logging { iterator = iter) }) - committer.commitJob(jobContext, ret) - logInfo(s"Job ${jobContext.getJobID} committed.") + logInfo(s"Start to commit write Job ${jobContext.getJobID}.") + val (_, duration) = Utils.timeTakenMs { committer.commitJob(jobContext, ret) } + logInfo(s"Write Job ${jobContext.getJobID} committed. Elapsed time: $duration ms.") } catch { case cause: Throwable => logError(s"Aborting job ${jobContext.getJobID}.", cause) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileBatchWrite.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileBatchWrite.scala index 266c834..7227e48 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileBatchWrite.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileBatchWrite.scala @@ -23,6 +23,7 @@ import org.apache.spark.internal.io.FileCommitProtocol import org.apache.spark.sql.connector.write.{BatchWrite, DataWriterFactory, PhysicalWriteInfo, WriterCommitMessage} import org.apache.spark.sql.execution.datasources.{WriteJobDescription, WriteTaskResult} import org.apache.spark.sql.execution.datasources.FileFormatWriter.processStats +import org.apache.spark.util.Utils class FileBatchWrite( job: Job, @@ -31,8 +32,9 @@ class FileBatchWrite( extends BatchWrite with Logging { override def commit(messages: Array[WriterCommitMessage]): Unit = { val results = messages.map(_.asInstanceOf[WriteTaskResult]) - committer.commitJob(job, results.map(_.commitMsg)) - logInfo(s"Write Job ${description.uuid} committed.") + logInfo(s"Start to commit write Job ${description.uuid}.") + val (_, duration) = Utils.timeTakenMs { committer.commitJob(job, results.map(_.commitMsg)) } + logInfo(s"Write Job ${description.uuid} committed. Elapsed time: $duration ms.") processStats(description.statsTrackers, results.map(_.summary.stats)) logInfo(s"Finished processing stats for write job ${description.uuid}.") --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org