Cheng Su created SPARK-33298:
--------------------------------

             Summary: FileCommitProtocol V2
                 Key: SPARK-33298
                 URL: https://issues.apache.org/jira/browse/SPARK-33298
             Project: Spark
          Issue Type: New Feature
          Components: Spark Core
    Affects Versions: 3.1.0
            Reporter: Cheng Su


This Jira proposes a new version of `FileCommitProtocol` 
([https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala]), 
e.g. `FileCommitProtocolV2`.

The motivation is that there are currently two requirements that need a change 
to the `FileCommitProtocol` API:

(1) Support writing Hive ORC/Parquet bucketed tables 
([https://github.com/apache/spark/pull/30003]): this needs a new parameter 
`prefix` in the methods `newTaskTempFile` and `newTaskTempFileAbsPath`, so that 
Spark can write Hive/Presto-compatible bucketed files.

(2) Fix commit collisions in dynamic partition overwrite mode 
([https://github.com/apache/spark/pull/29000]): this needs a new method 
`getStagingDir` so that the dynamic partition staging directory can be 
customized to avoid commit collisions.
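
To make the two requirements concrete, here is a minimal Scala sketch. It assumes the `part-<split>-<jobId><ext>` file naming and the `.spark-staging-<jobId>` directory used by the current Hadoop-based commit protocol. The `TempFileNaming` object, its method names, and the example prefix value are hypothetical and are not taken from either pull request.

```scala
// Hypothetical helper for illustration only; not part of Spark.
object TempFileNaming {
  // Requirement (1): an optional prefix placed in front of the usual
  // part-<split>-<jobId><ext> file name.
  def fileName(split: Int, jobId: String, prefix: String, ext: String): String =
    f"${prefix}part-$split%05d-$jobId$ext"

  // Requirement (2): a staging directory derived from the output path and
  // the job id. A custom protocol could return a different location here.
  def stagingDir(path: String, jobId: String): String =
    s"$path/.spark-staging-$jobId"
}
```

With an empty prefix the file name is unchanged, so existing writers would not be affected by the extra parameter.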

 

The reason to propose `FileCommitProtocolV2` instead of changing 
`FileCommitProtocol` directly is that the `FileCommitProtocol` API is 
effectively public: a customized commit protocol subclass can be plugged in at 
run time 
([https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala#L146]). 
Changing the API (e.g. adding a method or changing an existing method 
signature) would therefore break external subclasses of the commit protocol. 
According to [~cloud_fan], some external subclasses already exist to better 
support object stores.
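
The run-time plug-in mechanism can be sketched roughly as follows. This is a simplified, self-contained illustration of loading a protocol by class name through reflection; `CommitProtocol`, `ExternalCommitProtocol`, and `ProtocolLoader` are hypothetical stand-ins, and the real abstract class has more methods and a Hadoop `TaskAttemptContext` parameter.

```scala
// Stand-in for the existing abstract class, reduced to a single method.
abstract class CommitProtocol {
  def newTaskTempFile(dir: Option[String], ext: String): String
}

// Stand-in for a subclass shipped outside Spark, e.g. for an object store.
class ExternalCommitProtocol(jobId: String, path: String) extends CommitProtocol {
  override def newTaskTempFile(dir: Option[String], ext: String): String =
    (Seq(path) ++ dir.toSeq ++ Seq(s"part-$jobId$ext")).mkString("/")
}

object ProtocolLoader {
  // Instantiates the class named at run time through its
  // (String, String) constructor.
  def instantiate(className: String, jobId: String, path: String): CommitProtocol = {
    val ctor = Class.forName(className)
      .getDeclaredConstructor(classOf[String], classOf[String])
    ctor.newInstance(jobId, path).asInstanceOf[CommitProtocol]
  }
}
```

Because the loader only knows the subclass by name, a new abstract method or a changed signature in the base class would surface in such subclasses only when they are recompiled or loaded, which is the breakage this proposal tries to avoid.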

 

One proposal for `FileCommitProtocolV2` can be:
{code:java}
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.TaskAttemptContext

abstract class FileCommitProtocolV2 {
  // `options` replaces `ext`, so that more string-to-string parameters
  // (e.g. a file name prefix) can be passed without changing the signature
  def newTaskTempFile(
      taskContext: TaskAttemptContext,
      dir: Option[String],
      options: Map[String, String]): String

  // `options` replaces `ext` here as well
  def newTaskTempFileAbsPath(
      taskContext: TaskAttemptContext,
      absoluteDir: String,
      options: Map[String, String]): String

  // other new methods, e.g. getStagingDir
  def getStagingDir(path: String, jobId: String): Path

  // rest of FileCommitProtocol methods
  ...
}
{code}
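
As a rough illustration of how an implementation might consume `options`, the sketch below reads a prefix and an extension from the map. It is self-contained, so the Hadoop `TaskAttemptContext` parameter is left out; the class names and the option keys `"prefix"` and `"ext"` are assumptions made for this example and are not defined by the proposal.

```scala
// Stand-in for the proposed abstract class, reduced to one method and
// without the TaskAttemptContext parameter so that it runs on its own.
abstract class CommitProtocolV2Sketch {
  def newTaskTempFile(dir: Option[String], options: Map[String, String]): String
}

// Hypothetical implementation; the keys "prefix" and "ext" are assumed.
class SimpleCommitProtocolV2(jobId: String, stagingDir: String)
    extends CommitProtocolV2Sketch {
  override def newTaskTempFile(
      dir: Option[String],
      options: Map[String, String]): String = {
    // Missing keys fall back to an empty string.
    val prefix = options.getOrElse("prefix", "")
    val ext = options.getOrElse("ext", "")
    val name = s"${prefix}part-$jobId$ext"
    dir.map(d => s"$stagingDir/$d/$name").getOrElse(s"$stagingDir/$name")
  }
}
```

A caller could then pass `Map("prefix" -> "00001_0_", "ext" -> ".orc")`, and further keys could be introduced later without another signature change.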


