[GitHub] spark pull request #13382: [SPARK-5581][Core] When writing sorted map output...

2016-07-24 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/13382


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13382: [SPARK-5581][Core] When writing sorted map output...

2016-07-18 Thread ericl
Github user ericl commented on a diff in the pull request:

https://github.com/apache/spark/pull/13382#discussion_r71266677
  
--- Diff: core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala ---
@@ -27,8 +27,8 @@ import org.apache.spark.util.Utils
 
 /**
  * A class for writing JVM objects directly to a file on disk. This class allows data to be appended
- * to an existing block and can guarantee atomicity in the case of faults as it allows the caller to
- * revert partial writes.
+ * to an existing block. Callers can write to the same file and commit these writes.
+ * In case of faults, callers should atomically revert the uncommitted partial writes.
--- End diff --

Perhaps elaborate a bit more, e.g. "For efficiency, this class retains the 
underlying file channel across multiple commits to a file. The channel is kept 
open until close() is called on DiskBlockObjectWriter."
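The commit-and-resume behavior this comment describes can be illustrated with a minimal standalone sketch. This is not the Spark code itself; the object name and string payloads are made up for the example. It shows one `FileOutputStream` whose channel stays open across a commit, with an uncommitted partial write reverted by truncating back to the committed offset and writing then resuming on the same channel:

```scala
import java.io.{File, FileOutputStream}
import java.nio.file.Files

// Hypothetical sketch of commit-and-resume over a single open channel.
object CommitResumeSketch {
  def main(args: Array[String]): Unit = {
    val file = File.createTempFile("commit-resume", ".bin")
    val fos = new FileOutputStream(file, true) // append mode, as in the writer
    val channel = fos.getChannel
    var committedPosition = file.length()      // offset after the last commit

    fos.write("kept".getBytes("UTF-8"))
    fos.flush()
    committedPosition = channel.position()     // "commit": remember the offset

    fos.write("aborted".getBytes("UTF-8"))     // an uncommitted partial write
    fos.flush()
    channel.truncate(committedPosition)        // revert back to the commit point
    channel.position(committedPosition)

    fos.write("!".getBytes("UTF-8"))           // resume on the same channel
    fos.close()
    println(new String(Files.readAllBytes(file.toPath), "UTF-8")) // prints "kept!"
    file.delete()
  }
}
```

The channel is never reopened between the commit, the revert, and the resumed write, which is the efficiency point the suggested doc comment is making.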





[GitHub] spark pull request #13382: [SPARK-5581][Core] When writing sorted map output...

2016-07-18 Thread ericl
Github user ericl commented on a diff in the pull request:

https://github.com/apache/spark/pull/13382#discussion_r71262947
  
--- Diff: core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala ---
@@ -46,102 +46,145 @@ private[spark] class DiskBlockObjectWriter(
   extends OutputStream
   with Logging {
 
+  /**
+   * Guards against close calls, e.g. from a wrapping stream.
+   * Call manualClose to close the stream that was extended by this trait.
+   */
+  private trait ManualCloseOutputStream extends OutputStream {
+    abstract override def close(): Unit = {
+      flush()
+    }
+
+    def manualClose(): Unit = {
+      super.close()
+    }
+  }
+
   /** The file channel, used for repositioning / truncating the file. */
   private var channel: FileChannel = null
+  private var mcs: ManualCloseOutputStream = null
   private var bs: OutputStream = null
   private var fos: FileOutputStream = null
   private var ts: TimeTrackingOutputStream = null
   private var objOut: SerializationStream = null
   private var initialized = false
+  private var streamOpen = false
   private var hasBeenClosed = false
-  private var commitAndCloseHasBeenCalled = false
 
   /**
    * Cursors used to represent positions in the file.
    *
-   * xxxxxxxx|--------|---       |
-   *         ^        ^          ^
-   *         |        |        finalPosition
-   *         |      reportedPosition
-   *       initialPosition
+   * xxxxxxxx|--------|
+   *         ^        ^
+   *         |  committedPosition
+   *   reportedPosition
    *
-   * initialPosition: Offset in the file where we start writing. Immutable.
    * reportedPosition: Position at the time of the last update to the write metrics.
-   * finalPosition: Offset where we stopped writing. Set on closeAndCommit() then never changed.
+   * committedPosition: Offset after last committed write.
    * -----: Current writes to the underlying file.
    * xxxxx: Existing contents of the file.
    */
-  private val initialPosition = file.length()
-  private var finalPosition: Long = -1
-  private var reportedPosition = initialPosition
+  private var committedPosition = file.length()
+  private var reportedPosition = committedPosition
 
   /**
    * Keep track of number of records written and also use this to periodically
   * output bytes written since the latter is expensive to do for each record.
   */
   private var numRecordsWritten = 0
 
+  private def initialize(): Unit = {
+    fos = new FileOutputStream(file, true)
+    channel = fos.getChannel()
+    ts = new TimeTrackingOutputStream(writeMetrics, fos)
+    class ManualCloseBufferedOutputStream
+      extends BufferedOutputStream(ts, bufferSize) with ManualCloseOutputStream
+    mcs = new ManualCloseBufferedOutputStream
+  }
+
   def open(): DiskBlockObjectWriter = {
     if (hasBeenClosed) {
       throw new IllegalStateException("Writer already closed. Cannot be reopened.")
     }
-    fos = new FileOutputStream(file, true)
-    ts = new TimeTrackingOutputStream(writeMetrics, fos)
-    channel = fos.getChannel()
-    bs = compressStream(new BufferedOutputStream(ts, bufferSize))
+    if (!initialized) {
+      initialize()
+      initialized = true
+    }
+    bs = compressStream(mcs)
     objOut = serializerInstance.serializeStream(bs)
-    initialized = true
+    streamOpen = true
     this
   }
 
-  override def close() {
+  /**
+   * Close and cleanup all resources.
+   * Should call after committing or reverting partial writes.
+   */
+  private def closeResources(): Unit = {
     if (initialized) {
-      Utils.tryWithSafeFinally {
-        if (syncWrites) {
-          // Force outstanding writes to disk and track how long it takes
-          objOut.flush()
-          val start = System.nanoTime()
-          fos.getFD.sync()
-          writeMetrics.incWriteTime(System.nanoTime() - start)
-        }
-      } {
-        objOut.close()
-      }
-
+      mcs.manualClose()
       channel = null
+      mcs = null
       bs = null
       fos = null
       ts = null
       objOut = null
       initialized = false
+      streamOpen = false
       hasBeenClosed = true
     }
   }
 
-  def isOpen: Boolean = objOut != null
+  /**
+   * Commits any remaining partial writes and closes resources.
+   */
+  override def close() {
+    if (initialized) {
+      Utils.tryWithSafeFinally {
+        commit()
+      } {
+        closeResources()
+      }
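The `ManualCloseOutputStream` trait in the diff above relies on Scala's stackable-trait pattern: an `abstract override def close()` intercepts `close()` calls coming from any wrapping stream and only flushes, so the underlying stream survives and writing can resume after a commit. Below is a minimal self-contained sketch of that pattern (not the Spark class itself; the demo object and payload strings are illustrative), mixed into a `ByteArrayOutputStream` so it runs without a file:

```scala
import java.io.{ByteArrayOutputStream, OutputStream}

// Stackable trait: close() is swallowed (flush only); manualClose() really closes.
trait ManualCloseOutputStream extends OutputStream {
  abstract override def close(): Unit = {
    flush() // intercept close from a wrapping stream; keep the stream open
  }

  def manualClose(): Unit = {
    super.close() // the real close, invoked only by the owner
  }
}

object ManualCloseDemo {
  def main(args: Array[String]): Unit = {
    val sink = new ByteArrayOutputStream with ManualCloseOutputStream
    sink.write("first ".getBytes("UTF-8"))
    sink.close()                            // intercepted: stream stays usable
    sink.write("second".getBytes("UTF-8"))  // resume writing after "close"
    sink.manualClose()                      // now actually close
    println(sink.toString("UTF-8"))         // prints "first second"
  }
}
```

In the PR, the wrapping compression and serialization streams can be closed on every commit while the buffered file stream underneath stays open, which is exactly what this trait makes possible.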

[GitHub] spark pull request #13382: [SPARK-5581][Core] When writing sorted map output...

2016-07-18 Thread ericl
Github user ericl commented on a diff in the pull request:

https://github.com/apache/spark/pull/13382#discussion_r71262912
  
--- Diff: core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala ---
@@ -46,102 +46,145 @@ private[spark] class DiskBlockObjectWriter(
   extends OutputStream
   with Logging {
 
+  /**
+   * Guards against close calls, e.g. from a wrapping stream.
+   * Call manualClose to close the stream that was extended by this trait.
--- End diff --

Could you also update the class-level comment to note the commit-and-resume 
behavior?





[GitHub] spark pull request #13382: [SPARK-5581][Core] When writing sorted map output...

2016-07-18 Thread ericl
Github user ericl commented on a diff in the pull request:

https://github.com/apache/spark/pull/13382#discussion_r71262847
  
--- Diff: core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala ---
@@ -46,102 +46,145 @@ private[spark] class DiskBlockObjectWriter(
   extends OutputStream
   with Logging {
 
+  /**
+   * Guards against close calls, e.g. from a wrapping stream.
+   * Call manualClose to close the stream that was extended by this trait.
--- End diff --

Also comment that this is needed to support resume writing after a commit().





[GitHub] spark pull request #13382: [SPARK-5581][Core] When writing sorted map output...

2016-07-18 Thread ericl
Github user ericl commented on a diff in the pull request:

https://github.com/apache/spark/pull/13382#discussion_r71262784
  
--- Diff: core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala ---
@@ -46,102 +46,145 @@ private[spark] class DiskBlockObjectWriter(
   extends OutputStream
   with Logging {
 
+  /**
+   * Guards against close calls, e.g. from a wrapping stream.
+   * Call manualClose to close the stream that was extended by this trait.
+   */
+  private trait ManualCloseOutputStream extends OutputStream {
+    abstract override def close(): Unit = {
+      flush()
+    }
+
+    def manualClose(): Unit = {
+      super.close()
+    }
+  }
+
   /** The file channel, used for repositioning / truncating the file. */
   private var channel: FileChannel = null
+  private var mcs: ManualCloseOutputStream = null
   private var bs: OutputStream = null
   private var fos: FileOutputStream = null
   private var ts: TimeTrackingOutputStream = null
   private var objOut: SerializationStream = null
   private var initialized = false
+  private var streamOpen = false
   private var hasBeenClosed = false
-  private var commitAndCloseHasBeenCalled = false
 
   /**
    * Cursors used to represent positions in the file.
    *
-   * xxxxxxxx|--------|---       |
-   *         ^        ^          ^
-   *         |        |        finalPosition
-   *         |      reportedPosition
-   *       initialPosition
+   * xxxxxxxx|--------|
--- End diff --

Could you update the diagram? I think this is misleading since 
reportedPosition will always be ahead of committedPosition except during some 
internal processing.

