Github user ericl commented on a diff in the pull request:

    https://github.com/apache/spark/pull/15696#discussion_r85859487
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCommitProtocol.scala ---
    @@ -0,0 +1,223 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.datasources
    +
    +import java.util.{Date, UUID}
    +
    +import org.apache.hadoop.fs.Path
    +import org.apache.hadoop.mapreduce._
    +import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
    +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
    +
    +import org.apache.spark.SparkHadoopWriter
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.mapred.SparkHadoopMapRedUtil
    +import org.apache.spark.sql.internal.SQLConf
    +
    +
    +object FileCommitProtocol {
    +  class TaskCommitMessage(obj: Any) extends Serializable
    +
    +  object EmptyTaskCommitMessage extends TaskCommitMessage(Unit)
    +}
    +
    +
    +/**
    + * An interface to define how a Spark job commits its outputs. Implementations must be serializable.
    + *
    + * The proper call sequence is:
    + *
    + * 1. Driver calls setupJob.
    + * 2. As part of each task's execution, executor calls setupTask and then commitTask
    + *    (or abortTask if task failed).
    + * 3. When all necessary tasks completed successfully, the driver calls commitJob. If the job
    + *    failed to execute (e.g. too many failed tasks), the job should call abortJob.
    + */
    +abstract class FileCommitProtocol {
    +  import FileCommitProtocol._
    +
    +  /**
    +   * Sets up a job. Must be called on the driver before any other methods can be invoked.
    +   */
    +  def setupJob(jobContext: JobContext): Unit
    +
    +  /**
    +   * Commits a job after the writes succeed. Must be called on the driver.
    +   */
    +  def commitJob(jobContext: JobContext, taskCommits: Seq[TaskCommitMessage]): Unit
    +
    +  /**
    +   * Aborts a job after the writes fail. Must be called on the driver.
    +   *
    +   * Calling this function is a best-effort attempt, because it is possible that the driver
    +   * crashes (or is killed) before it can call abort.
    +   */
    +  def abortJob(jobContext: JobContext): Unit
    +
    +  /**
    +   * Sets up a task within a job.
    +   * Must be called before any other task related methods can be invoked.
    +   */
    +  def setupTask(taskContext: TaskAttemptContext): Unit
    +
    +  /**
    +   * Notifies the commit protocol to add a new file, and gets back the full path that should be
    +   * used. Must be called on the executors when running tasks.
    +   *
    +   * A full file path consists of the following parts:
    +   *  1. the base path
    +   *  2. some sub-directory within the base path, used to specify partitioning
    +   *  3. file prefix, usually some unique job id with the task id
    +   *  4. bucket id
    +   *  5. source specific file extension, e.g. ".snappy.parquet"
    +   *
    +   * The "dir" parameter specifies 2, and "ext" parameter specifies both 4 and 5, and the rest
    +   * are left to the commit protocol implementation to decide.
    +   */
    +  def addTaskTempFile(taskContext: TaskAttemptContext, dir: Option[String], ext: String): String
    +
    +  /**
    +   * Commits a task after the writes succeed. Must be called on the executors when running tasks.
    +   */
    +  def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage
    +
    +  /**
    +   * Aborts a task after the writes have failed. Must be called on the executors when running tasks.
    +   */
    +  def abortTask(taskContext: TaskAttemptContext): Unit
    --- End diff --
    
    Is this also best-effort?
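
    To make the question concrete, here is a minimal sketch of how a caller
    might drive the task-side sequence from the scaladoc (setupTask, then
    commitTask, or abortTask on failure). Everything in it is hypothetical and
    not taken from the PR: TaskCommitter is a simplified stand-in for the
    task-side methods, it uses a plain String task id instead of
    TaskAttemptContext, and RecordingCommitter and TaskRunner exist only for
    illustration.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

// Hypothetical, simplified stand-in for the task-side methods of the protocol.
// A String task id replaces Hadoop's TaskAttemptContext to keep this dependency-free.
trait TaskCommitter {
  def setupTask(taskId: String): Unit
  def commitTask(taskId: String): String
  def abortTask(taskId: String): Unit
}

// Illustration-only implementation that records each call in order.
class RecordingCommitter extends TaskCommitter {
  val calls: ArrayBuffer[String] = ArrayBuffer.empty[String]

  def setupTask(taskId: String): Unit = { calls += s"setupTask($taskId)"; () }

  def commitTask(taskId: String): String = {
    calls += s"commitTask($taskId)"
    s"committed-$taskId"
  }

  def abortTask(taskId: String): Unit = { calls += s"abortTask($taskId)"; () }
}

object TaskRunner {
  // Calls setupTask, runs the write, then commitTask. If the write or the
  // commit throws a non-fatal exception, abortTask is attempted and the
  // original exception is rethrown.
  def runTask(committer: TaskCommitter, taskId: String)(write: => Unit): String = {
    committer.setupTask(taskId)
    try {
      write
      committer.commitTask(taskId)
    } catch {
      case NonFatal(e) =>
        // This handler only runs if the process is still alive. If the
        // executor is killed mid-write, abortTask is never reached.
        try committer.abortTask(taskId)
        catch { case NonFatal(t) => e.addSuppressed(t) }
        throw e
    }
  }
}
```

    With a caller shaped like this, abortTask runs only when the failure
    surfaces as an exception in a live process, so it looks best-effort in the
    same sense as abortJob. If that is the intent, it may be worth saying so in
    the scaladoc.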

