Github user dafrista commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13382#discussion_r65287494
  
    --- Diff: 
core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala ---
    @@ -46,66 +46,82 @@ private[spark] class DiskBlockObjectWriter(
       extends OutputStream
       with Logging {
     
    +  /**
    +   * Guards against close calls, e.g. from a wrapping stream.
    +   * Call manualClose to close the stream that was extended by this trait.
    +   */
    +  private trait ManualCloseOutputStream extends OutputStream {
    +    abstract override def close(): Unit = {
    +      flush()
    +    }
    +
    +    def manualClose(): Unit = {
    +      super.close()
    +    }
    +  }
    +
       /** The file channel, used for repositioning / truncating the file. */
       private var channel: FileChannel = null
    +  private var mcs: ManualCloseOutputStream = null
       private var bs: OutputStream = null
       private var fos: FileOutputStream = null
       private var ts: TimeTrackingOutputStream = null
       private var objOut: SerializationStream = null
       private var initialized = false
    +  private var streamOpen = false
       private var hasBeenClosed = false
    -  private var commitAndCloseHasBeenCalled = false
     
       /**
        * Cursors used to represent positions in the file.
        *
    -   * xxxxxxxx|--------|---       |
    -   *         ^        ^          ^
    -   *         |        |        finalPosition
    -   *         |      reportedPosition
    -   *       initialPosition
    +   * xxxxxxxx|--------|---|
    +   *           ^          ^
    +   *           |        committedPosition
    +   *         reportedPosition
        *
    -   * initialPosition: Offset in the file where we start writing. Immutable.
        * reportedPosition: Position at the time of the last update to the 
write metrics.
    -   * finalPosition: Offset where we stopped writing. Set on 
closeAndCommit() then never changed.
    +   * committedPosition: Offset after last committed write.
        * -----: Current writes to the underlying file.
        * xxxxx: Existing contents of the file.
        */
    -  private val initialPosition = file.length()
    -  private var finalPosition: Long = -1
    -  private var reportedPosition = initialPosition
    +  private var committedPosition = file.length()
    +  private var reportedPosition = committedPosition
     
       /**
        * Keep track of number of records written and also use this to 
periodically
        * output bytes written since the latter is expensive to do for each 
record.
        */
       private var numRecordsWritten = 0
     
    +  private def initialize(): Unit = {
    +    fos = new FileOutputStream(file, true)
    +    channel = fos.getChannel()
    +    ts = new TimeTrackingOutputStream(writeMetrics, fos)
    +    class ManualCloseBufferedOutputStream
    +      extends BufferedOutputStream(ts, bufferSize) with 
ManualCloseOutputStream
    +    mcs = new ManualCloseBufferedOutputStream
    +  }
    +
       def open(): DiskBlockObjectWriter = {
         if (hasBeenClosed) {
           throw new IllegalStateException("Writer already closed. Cannot be 
reopened.")
         }
    -    fos = new FileOutputStream(file, true)
    -    ts = new TimeTrackingOutputStream(writeMetrics, fos)
    -    channel = fos.getChannel()
    -    bs = compressStream(new BufferedOutputStream(ts, bufferSize))
    +    if (!initialized) {
    +      initialize()
    +      initialized = true
    +    }
    +    bs = compressStream(mcs)
         objOut = serializerInstance.serializeStream(bs)
    -    initialized = true
    +    streamOpen = true
         this
       }
     
       override def close() {
         if (initialized) {
           Utils.tryWithSafeFinally {
    -        if (syncWrites) {
    -          // Force outstanding writes to disk and track how long it takes
    -          objOut.flush()
    -          val start = System.nanoTime()
    -          fos.getFD.sync()
    -          writeMetrics.incWriteTime(System.nanoTime() - start)
    -        }
    +        commit()
    --- End diff --
    
    This is interesting. Making `close()` a no-op breaks my understanding of 
`OutputStream`, where I expect (a) any buffered data to be written out and (b) 
cleaning up resources. But I don't see anywhere that definitely prescribes this 
either.
    
    Apart from that, right now having `revertPartialWritesAndClose()` call 
`close()` may be causing more confusion -- on this code path, the `commit()` 
called on `close()` ends up having no effect because `streamOpen` is set to 
false, but it's not so clear. How about introducing a private 
`closeResources()` that cleans up only the resources, and have `close()` call 
`commit()`, then `closeResources()`; and have `revertPartialWritesAndClose()` 
call `closeResources()` instead of `close()`?
    
    If the previous paragraph doesn't make much sense, I can push a commit with 
this change.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to