Github user ssimeonov commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16204#discussion_r91437338
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala ---
    @@ -225,32 +228,50 @@ object FileFormatWriter extends Logging {
           taskAttemptContext: TaskAttemptContext,
           committer: FileCommitProtocol) extends ExecuteWriteTask {
     
    -    private[this] var outputWriter: OutputWriter = {
    +    private[this] var currentWriter: OutputWriter = _
    --- End diff --
    
    Looking through the code, three things stand out:
    
    1. `SingleDirectoryWriteTask` and `DynamicPartitionWriteTask` duplicate the
    logic for managing and cleaning up the current writer.
    
    2. Within each write task, `releaseResources()` and `newOutputWriter()`
    duplicate the code that releases resources.
    
    3. Write task state management is leaky because `releaseResources()` is
    called explicitly by `executeTask()`. Also, `releaseResources()` is called
    twice when there are no exceptions but only once when `execute()` throws,
    which is a bit confusing.
    
    What about asking the base trait to do a bit more work and present a
    stronger contract to its users, e.g.:
    
    ```scala
      private trait ExecuteWriteTask {
    
        protected[this] var currentWriter: OutputWriter = null
    
        def execute(iterator: Iterator[InternalRow]): Set[String] = {
          try {
            executeImp(iterator)
          } finally {
            releaseResources()
          }
        }
    
        /**
         * Writes data out to files, and then returns the list of partition
         * strings written out.
         * The list of partitions is sent back to the driver and used to update
         * the catalog.
         */
        protected def executeImp(iterator: Iterator[InternalRow]): Set[String]
    
        protected def resetCurrentWriter(): Unit = {
          if (currentWriter != null) {
            currentWriter.close()
            currentWriter = null
          }
        }
    
        protected def releaseResources(): Unit = {
          resetCurrentWriter()
        }
      }
    ```
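    
    To make the contract concrete, here is a minimal, self-contained sketch of
    how a dynamic-partition task might sit on top of such a trait. It does not
    use Spark's types: `StubWriter`, `WriteTaskSketch` and
    `PartitionedTaskSketch` are hypothetical names, rows are simplified to
    `(partition, value)` pairs, and the `"boom"` value exists only to simulate
    a failed write.
    
    ```scala
    import scala.collection.mutable.ArrayBuffer
    
    // Hypothetical stand-in for OutputWriter; records lifecycle events in a log.
    final class StubWriter(val path: String, log: ArrayBuffer[String]) {
      def write(row: String): Unit = log += s"write $path $row"
      def close(): Unit = log += s"close $path"
    }
    
    // Same shape as the trait proposed above, with rows simplified to pairs.
    trait WriteTaskSketch {
      protected var currentWriter: StubWriter = null
    
      // The only public entry point: cleanup always runs, exactly once per call.
      final def execute(rows: Iterator[(String, String)]): Set[String] =
        try executeImp(rows) finally releaseResources()
    
      protected def executeImp(rows: Iterator[(String, String)]): Set[String]
    
      protected def resetCurrentWriter(): Unit = {
        if (currentWriter != null) {
          currentWriter.close()
          currentWriter = null
        }
      }
    
      protected def releaseResources(): Unit = resetCurrentWriter()
    }
    
    // Hypothetical task that opens a new writer whenever the partition key changes.
    final class PartitionedTaskSketch(log: ArrayBuffer[String]) extends WriteTaskSketch {
      protected def executeImp(rows: Iterator[(String, String)]): Set[String] = {
        var seen = Set.empty[String]
        var currentKey: String = null
        for ((key, value) <- rows) {
          if (key != currentKey) {
            resetCurrentWriter() // closes the previous writer, if any
            currentWriter = new StubWriter(key, log)
            currentKey = key
            seen += key
          }
          if (value == "boom") throw new IllegalStateException("simulated write failure")
          currentWriter.write(value)
        }
        seen
      }
    }
    ```
    
    With this shape the subclass never calls `releaseResources()` itself, and
    the caller only sees `execute()`, so each writer is closed once whether or
    not the task throws.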

