Repository: spark
Updated Branches:
  refs/heads/master 859dff56e -> 7bdc92197


[SPARK-11449][CORE] PortableDataStream should be a factory

```PortableDataStream``` maintains internal state, which makes it tricky to
reuse a stream: one needs to call ```close``` on both the
```PortableDataStream``` and the ```InputStream``` it produces.

This PR removes all state from ```PortableDataStream``` and effectively turns
it into an ```InputStream```/```Array[Byte]``` factory. The user is now
responsible for managing (and closing) the ```InputStream``` it returns.
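
For illustration, a minimal sketch of the new caller-side contract (the
```SparkContext``` ```sc``` and the input path here are assumptions for the
example, not part of this patch):

```scala
// Minimal sketch, assuming an existing SparkContext `sc` and a
// placeholder input path; illustrative only, not taken from the patch.
val files = sc.binaryFiles("hdfs:///tmp/bin")  // RDD[(String, PortableDataStream)]

// open() now returns a fresh DataInputStream on every call;
// the caller owns the stream and must close it.
val headers = files.map { case (path, pds) =>
  val in = pds.open()
  try {
    (path, in.readInt())  // read whatever the format requires
  } finally {
    in.close()  // no longer handled by PortableDataStream.close()
  }
}

// toArray() manages its stream internally (open, read fully, close).
val wholeFiles = files.mapValues(_.toArray())
```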

cc srowen

Author: Herman van Hovell <hvanhov...@questtec.nl>

Closes #9417 from hvanhovell/SPARK-11449.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7bdc9219
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7bdc9219
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7bdc9219

Branch: refs/heads/master
Commit: 7bdc92197cce0edc0110dc9c2158e6e3f42c72ee
Parents: 859dff5
Author: Herman van Hovell <hvanhov...@questtec.nl>
Authored: Thu Nov 5 09:23:09 2015 +0000
Committer: Sean Owen <so...@cloudera.com>
Committed: Thu Nov 5 09:23:09 2015 +0000

----------------------------------------------------------------------
 .../apache/spark/input/PortableDataStream.scala | 45 +++++++-------------
 1 file changed, 16 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7bdc9219/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala b/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
index 33e4ee0..280e7a5 100644
--- a/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
+++ b/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
@@ -21,7 +21,7 @@ import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, Da
 
 import scala.collection.JavaConverters._
 
-import com.google.common.io.ByteStreams
+import com.google.common.io.{Closeables, ByteStreams}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce.{InputSplit, JobContext, RecordReader, TaskAttemptContext}
@@ -82,7 +82,6 @@ private[spark] abstract class StreamBasedRecordReader[T](
     if (!processed) {
       val fileIn = new PortableDataStream(split, context, index)
       value = parseStream(fileIn)
-      fileIn.close() // if it has not been open yet, close does nothing
       key = fileIn.getPath
       processed = true
       true
@@ -134,12 +133,6 @@ class PortableDataStream(
     index: Integer)
   extends Serializable {
 
-  // transient forces file to be reopened after being serialization
-  // it is also used for non-serializable classes
-
-  @transient private var fileIn: DataInputStream = null
-  @transient private var isOpen = false
-
   private val confBytes = {
     val baos = new ByteArrayOutputStream()
     SparkHadoopUtil.get.getConfigurationFromJobContext(context).
@@ -175,40 +168,34 @@ class PortableDataStream(
   }
 
   /**
-   * Create a new DataInputStream from the split and context
+   * Create a new DataInputStream from the split and context. The user of this method is responsible
+   * for closing the stream after usage.
    */
   def open(): DataInputStream = {
-    if (!isOpen) {
-      val pathp = split.getPath(index)
-      val fs = pathp.getFileSystem(conf)
-      fileIn = fs.open(pathp)
-      isOpen = true
-    }
-    fileIn
+    val pathp = split.getPath(index)
+    val fs = pathp.getFileSystem(conf)
+    fs.open(pathp)
   }
 
   /**
    * Read the file as a byte array
    */
   def toArray(): Array[Byte] = {
-    open()
-    val innerBuffer = ByteStreams.toByteArray(fileIn)
-    close()
-    innerBuffer
+    val stream = open()
+    try {
+      ByteStreams.toByteArray(stream)
+    } finally {
+      Closeables.close(stream, true)
+    }
   }
 
   /**
-   * Close the file (if it is currently open)
+   * Closing the PortableDataStream is not needed anymore. The user can either use the
+   * PortableDataStream to get a DataInputStream (which the user needs to close after usage),
+   * or a byte array.
    */
+  @deprecated("Closing the PortableDataStream is not needed anymore.", "1.6.0")
   def close(): Unit = {
-    if (isOpen) {
-      try {
-        fileIn.close()
-        isOpen = false
-      } catch {
-        case ioe: java.io.IOException => // do nothing
-      }
-    }
   }
 
   def getPath(): String = path

