GitHub user ericl commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13382#discussion_r71262947
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala ---
    @@ -46,102 +46,145 @@ private[spark] class DiskBlockObjectWriter(
       extends OutputStream
       with Logging {
     
    +  /**
    +   * Guards against close calls, e.g. from a wrapping stream.
    +   * Call manualClose to close the stream that was extended by this trait.
    +   */
    +  private trait ManualCloseOutputStream extends OutputStream {
    +    abstract override def close(): Unit = {
    +      flush()
    +    }
    +
    +    def manualClose(): Unit = {
    +      super.close()
    +    }
    +  }
    +
       /** The file channel, used for repositioning / truncating the file. */
       private var channel: FileChannel = null
    +  private var mcs: ManualCloseOutputStream = null
       private var bs: OutputStream = null
       private var fos: FileOutputStream = null
       private var ts: TimeTrackingOutputStream = null
       private var objOut: SerializationStream = null
       private var initialized = false
    +  private var streamOpen = false
       private var hasBeenClosed = false
    -  private var commitAndCloseHasBeenCalled = false
     
       /**
        * Cursors used to represent positions in the file.
        *
    -   * xxxxxxxx|--------|---       |
    -   *         ^        ^          ^
    -   *         |        |        finalPosition
    -   *         |      reportedPosition
    -   *       initialPosition
    +   * xxxxxxxx|--------|---|
    +   *           ^          ^
    +   *           |        committedPosition
    +   *         reportedPosition
        *
    -   * initialPosition: Offset in the file where we start writing. Immutable.
       * reportedPosition: Position at the time of the last update to the write metrics.
    -   * finalPosition: Offset where we stopped writing. Set on closeAndCommit() then never changed.
    +   * committedPosition: Offset after last committed write.
        * -----: Current writes to the underlying file.
        * xxxxx: Existing contents of the file.
        */
    -  private val initialPosition = file.length()
    -  private var finalPosition: Long = -1
    -  private var reportedPosition = initialPosition
    +  private var committedPosition = file.length()
    +  private var reportedPosition = committedPosition
     
       /**
       * Keep track of number of records written and also use this to periodically
       * output bytes written since the latter is expensive to do for each record.
        */
       private var numRecordsWritten = 0
     
    +  private def initialize(): Unit = {
    +    fos = new FileOutputStream(file, true)
    +    channel = fos.getChannel()
    +    ts = new TimeTrackingOutputStream(writeMetrics, fos)
    +    class ManualCloseBufferedOutputStream
    +      extends BufferedOutputStream(ts, bufferSize) with ManualCloseOutputStream
    +    mcs = new ManualCloseBufferedOutputStream
    +  }
    +
       def open(): DiskBlockObjectWriter = {
         if (hasBeenClosed) {
          throw new IllegalStateException("Writer already closed. Cannot be reopened.")
         }
    -    fos = new FileOutputStream(file, true)
    -    ts = new TimeTrackingOutputStream(writeMetrics, fos)
    -    channel = fos.getChannel()
    -    bs = compressStream(new BufferedOutputStream(ts, bufferSize))
    +    if (!initialized) {
    +      initialize()
    +      initialized = true
    +    }
    +    bs = compressStream(mcs)
         objOut = serializerInstance.serializeStream(bs)
    -    initialized = true
    +    streamOpen = true
         this
       }
     
    -  override def close() {
    +  /**
    +   * Close and cleanup all resources.
    +   * Should be called after committing or reverting partial writes.
    +   */
    +  private def closeResources(): Unit = {
         if (initialized) {
    -      Utils.tryWithSafeFinally {
    -        if (syncWrites) {
    -          // Force outstanding writes to disk and track how long it takes
    -          objOut.flush()
    -          val start = System.nanoTime()
    -          fos.getFD.sync()
    -          writeMetrics.incWriteTime(System.nanoTime() - start)
    -        }
    -      } {
    -        objOut.close()
    -      }
    -
    +      mcs.manualClose()
           channel = null
    +      mcs = null
           bs = null
           fos = null
           ts = null
           objOut = null
           initialized = false
    +      streamOpen = false
           hasBeenClosed = true
         }
       }
     
    -  def isOpen: Boolean = objOut != null
    +  /**
    +   * Commits any remaining partial writes and closes resources.
    +   */
    +  override def close() {
    +    if (initialized) {
    +      Utils.tryWithSafeFinally {
    +        commit()
    +      } {
    +        closeResources()
    +      }
    +    }
    +  }
     
       /**
        * Flush the partial writes and commit them as a single atomic block.
    +   * A commit may write additional bytes to frame the atomic block.
    +   *
    +   * @return file segment with previous offset and length committed on this call.
        */
    -  def commitAndClose(): Unit = {
    -    if (initialized) {
    +  def commit(): FileSegment = {
    --- End diff --
    
    nit: commitAndGet()?
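    
    For context on the pattern this diff introduces: below is a minimal,
    self-contained sketch (not the PR's code) of the stackable-trait trick
    behind ManualCloseOutputStream. The abstract override def close()
    intercepts close() calls arriving from wrapping streams, while
    manualClose() still reaches the real close() of whatever concrete
    stream the trait is mixed into. The demo names (ManualCloseDemo, sink)
    are illustrative only.
    
        import java.io.{BufferedOutputStream, ByteArrayOutputStream, OutputStream}
    
        // Swallows close() from wrapping streams; only manualClose() really closes.
        trait ManualCloseOutputStream extends OutputStream {
          abstract override def close(): Unit = flush()
          def manualClose(): Unit = super.close()
        }
    
        object ManualCloseDemo extends App {
          val sink = new ByteArrayOutputStream()
          // Mix the trait into a concrete stream, as the diff does with
          // ManualCloseBufferedOutputStream.
          val mcs = new BufferedOutputStream(sink) with ManualCloseOutputStream
    
          mcs.write("first".getBytes)
          mcs.close()                    // intercepted: flushes, stream stays open
          mcs.write(" second".getBytes)  // still writable after close()
          mcs.manualClose()              // actually closes the underlying stream
    
          println(sink)                  // prints: first second
        }
    
    This is why open() can hand mcs to compressStream and let the wrapping
    serialization stream's close() terminate its block without losing the
    underlying file handle; closeResources() then calls mcs.manualClose()
    once the writer is truly finished.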

