Repository: spark
Updated Branches:
  refs/heads/master 40c4cb2fe -> 242b4f02d


[SPARK-4969][STREAMING][PYTHON] Add binaryRecords to streaming

In Spark 1.2 we added a `binaryRecords` input method for loading flat binary 
data. This format is useful for numerical array data, e.g. in scientific 
computing applications. This PR adds support for the same format in Streaming 
applications, where it is similarly useful, especially for streaming time 
series or sensor data.
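
For illustration only (not part of this patch), a minimal Scala driver using the
new API might look like the sketch below; the application name, directory path,
record length, and batch interval are assumptions chosen for the example.

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object BinaryRecordsExample {
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setAppName("binary-records-example")
        val ssc = new StreamingContext(sparkConf, Seconds(2))

        // Watch a directory for new files of fixed-length binary records;
        // "/data/incoming" and the 8-byte record length are placeholders.
        val records = ssc.binaryRecordsStream("/data/incoming", 8)

        // Each element is an Array[Byte] of length 8; just count per batch here.
        records.count().print()

        ssc.start()
        ssc.awaitTermination()
      }
    }

The equivalent PySpark call is `ssc.binaryRecordsStream(directory, recordLength)`,
as added to context.py in the diff below.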

Summary of additions
- adding `binaryRecordsStream` to Spark Streaming
- exposing `binaryRecordsStream` in the new PySpark Streaming
- new unit tests in Scala and Python

This required adding an optional Hadoop configuration param to `fileStream` and 
`FileInputDStream`, but was otherwise straightforward.
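
As a rough sketch of the new `fileStream` overload (a hypothetical helper, not
part of the patch): it passes an explicit Hadoop `Configuration` alongside a
path filter and the `newFilesOnly` flag, shown here with `TextInputFormat` and a
placeholder directory.

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
    import org.apache.spark.streaming.StreamingContext
    import org.apache.spark.streaming.dstream.DStream

    // Hypothetical helper: tail only ".log" files, forwarding an explicit Hadoop
    // configuration through the new fileStream overload added by this PR.
    def logLines(ssc: StreamingContext, dir: String): DStream[String] = {
      val hadoopConf = new Configuration(ssc.sparkContext.hadoopConfiguration)
      hadoopConf.set("textinputformat.record.delimiter", "\n")

      ssc.fileStream[LongWritable, Text, TextInputFormat](
          dir,
          (path: Path) => path.getName.endsWith(".log"),
          newFilesOnly = true,
          conf = hadoopConf)
        .map { case (_, line) => line.toString }
    }

`binaryRecordsStream` is built on the same overload: it sets
`FixedLengthBinaryInputFormat.RECORD_LENGTH_PROPERTY` on the configuration and
maps each `BytesWritable` to its byte array, as shown in the diff below.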

tdas davies

Author: freeman <the.freeman....@gmail.com>

Closes #3803 from freeman-lab/streaming-binary-records and squashes the 
following commits:

b676534 [freeman] Clarify note
5ff1b75 [freeman] Add note to java streaming context
eba925c [freeman] Simplify notes
c4237b8 [freeman] Add experimental tag
30eba67 [freeman] Add filter and newFilesOnly alongside conf
c2cfa6d [freeman] Expose new version of fileStream with conf in java
34d20ef [freeman] Add experimental tag
14bca9a [freeman] Add experimental tag
b85bffc [freeman] Formatting
47560f4 [freeman] Space formatting
9a3715a [freeman] Refactor to reflect changes to FileInputSuite
7373f73 [freeman] Add note and defensive assertion for byte length
3ceb684 [freeman] Merge remote-tracking branch 'upstream/master' into streaming-binary-records
317b6d1 [freeman] Make test inline
fcb915c [freeman] Formatting
becb344 [freeman] Formatting
d3e75b2 [freeman] Add tests in python
a4324a3 [freeman] Line length
029d49c [freeman] Formatting
1c739aa [freeman] Simpler default arg handling
94d90d0 [freeman] Spelling
2843e9d [freeman] Add params to docstring
8b70fbc [freeman] Reorganization
28bff9b [freeman] Fix missing arg
9398bcb [freeman] Expose optional hadoop configuration
23dd69f [freeman] Tests for binaryRecordsStream
36cb0fd [freeman] Add binaryRecordsStream to scala
fe4e803 [freeman] Add binaryRecordStream to Java API
ecef0eb [freeman] Add binaryRecordsStream to python
8550c26 [freeman] Expose additional argument combination


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/242b4f02
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/242b4f02
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/242b4f02

Branch: refs/heads/master
Commit: 242b4f02df7f71ebcfa86a85c9ed39e40750a7fd
Parents: 40c4cb2
Author: freeman <the.freeman....@gmail.com>
Authored: Tue Feb 3 22:24:30 2015 -0800
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Tue Feb 3 22:24:30 2015 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   |  9 ++-
 python/pyspark/streaming/context.py             | 16 +++++-
 python/pyspark/streaming/tests.py               | 15 +++++
 .../spark/streaming/StreamingContext.scala      | 59 +++++++++++++++++++-
 .../api/java/JavaStreamingContext.scala         | 52 ++++++++++++++++-
 .../streaming/dstream/FileInputDStream.scala    | 17 +++++-
 .../spark/streaming/InputStreamsSuite.scala     | 51 +++++++++++++++++
 7 files changed, 212 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/242b4f02/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 16c6fdb..eecfdd4 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -657,6 +657,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    *
    * Load data from a flat binary file, assuming the length of each record is constant.
    *
+   * '''Note:''' We ensure that the byte array for each record in the resulting RDD
+   * has the provided record length.
+   *
    * @param path Directory to the input data files
    * @param recordLength The length at which to split the records
    * @return An RDD of data with values, represented as byte arrays
@@ -671,7 +674,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
       classOf[LongWritable],
       classOf[BytesWritable],
       conf=conf)
-    val data = br.map{ case (k, v) => v.getBytes}
+    val data = br.map { case (k, v) =>
+      val bytes = v.getBytes
+      assert(bytes.length == recordLength, "Byte array does not have correct length")
+      bytes
+    }
     data
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/242b4f02/python/pyspark/streaming/context.py
----------------------------------------------------------------------
diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py
index d48f359..18aaae9 100644
--- a/python/pyspark/streaming/context.py
+++ b/python/pyspark/streaming/context.py
@@ -21,7 +21,7 @@ from py4j.java_collections import ListConverter
 from py4j.java_gateway import java_import, JavaObject
 
 from pyspark import RDD, SparkConf
-from pyspark.serializers import UTF8Deserializer, CloudPickleSerializer
+from pyspark.serializers import NoOpSerializer, UTF8Deserializer, CloudPickleSerializer
 from pyspark.context import SparkContext
 from pyspark.storagelevel import StorageLevel
 from pyspark.streaming.dstream import DStream
@@ -251,6 +251,20 @@ class StreamingContext(object):
         """
         return DStream(self._jssc.textFileStream(directory), self, UTF8Deserializer())
 
+    def binaryRecordsStream(self, directory, recordLength):
+        """
+        Create an input stream that monitors a Hadoop-compatible file system
+        for new files and reads them as flat binary files with records of
+        fixed length. Files must be written to the monitored directory by "moving"
+        them from another location within the same file system.
+        File names starting with . are ignored.
+
+        @param directory:       Directory to load data from
+        @param recordLength:    Length of each record in bytes
+        """
+        return DStream(self._jssc.binaryRecordsStream(directory, recordLength), self,
+                       NoOpSerializer())
+
     def _check_serializers(self, rdds):
         # make sure they have same serializer
         if len(set(rdd._jrdd_deserializer for rdd in rdds)) > 1:

http://git-wip-us.apache.org/repos/asf/spark/blob/242b4f02/python/pyspark/streaming/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index a8d876d..608f8e2 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -21,6 +21,7 @@ import time
 import operator
 import unittest
 import tempfile
+import struct
 
 from pyspark.context import SparkConf, SparkContext, RDD
 from pyspark.streaming.context import StreamingContext
@@ -455,6 +456,20 @@ class StreamingContextTests(PySparkStreamingTestCase):
         self.wait_for(result, 2)
         self.assertEqual([range(10), range(10)], result)
 
+    def test_binary_records_stream(self):
+        d = tempfile.mkdtemp()
+        self.ssc = StreamingContext(self.sc, self.duration)
+        dstream = self.ssc.binaryRecordsStream(d, 10).map(
+            lambda v: struct.unpack("10b", str(v)))
+        result = self._collect(dstream, 2, block=False)
+        self.ssc.start()
+        for name in ('a', 'b'):
+            time.sleep(1)
+            with open(os.path.join(d, name), "wb") as f:
+                f.write(bytearray(range(10)))
+        self.wait_for(result, 2)
+        self.assertEqual([range(10), range(10)], map(lambda v: list(v[0]), result))
+
     def test_union(self):
         input = [range(i + 1) for i in range(3)]
         dstream = self.ssc.queueStream(input)

http://git-wip-us.apache.org/repos/asf/spark/blob/242b4f02/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 8ef0787..ddc435c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -27,10 +27,12 @@ import scala.reflect.ClassTag
 import akka.actor.{Props, SupervisorStrategy}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
-import org.apache.hadoop.io.{LongWritable, Text}
+import org.apache.hadoop.io.{BytesWritable, LongWritable, Text}
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
 import org.apache.spark._
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.input.FixedLengthBinaryInputFormat
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.dstream._
@@ -361,6 +363,30 @@ class StreamingContext private[streaming] (
 
   /**
    * Create a input stream that monitors a Hadoop-compatible filesystem
+   * for new files and reads them using the given key-value types and input format.
+   * Files must be written to the monitored directory by "moving" them from another
+   * location within the same file system. File names starting with . are ignored.
+   * @param directory HDFS directory to monitor for new file
+   * @param filter Function to filter paths to process
+   * @param newFilesOnly Should process only new files and ignore existing files in the directory
+   * @param conf Hadoop configuration
+   * @tparam K Key type for reading HDFS file
+   * @tparam V Value type for reading HDFS file
+   * @tparam F Input format for reading HDFS file
+   */
+  def fileStream[
+    K: ClassTag,
+    V: ClassTag,
+    F <: NewInputFormat[K, V]: ClassTag
+  ] (directory: String,
+     filter: Path => Boolean,
+     newFilesOnly: Boolean,
+     conf: Configuration): InputDStream[(K, V)] = {
+    new FileInputDStream[K, V, F](this, directory, filter, newFilesOnly, Option(conf))
+  }
+
+  /**
+   * Create a input stream that monitors a Hadoop-compatible filesystem
    * for new files and reads them as text files (using key as LongWritable, value
    * as Text and input format as TextInputFormat). Files must be written to the
    * monitored directory by "moving" them from another location within the same
@@ -372,6 +398,37 @@ class StreamingContext private[streaming] (
   }
 
   /**
+   * :: Experimental ::
+   *
+   * Create an input stream that monitors a Hadoop-compatible filesystem
+   * for new files and reads them as flat binary files, assuming a fixed length per record,
+   * generating one byte array per record. Files must be written to the monitored directory
+   * by "moving" them from another location within the same file system. File names
+   * starting with . are ignored.
+   *
+   * '''Note:''' We ensure that the byte array for each record in the
+   * resulting RDDs of the DStream has the provided record length.
+   *
+   * @param directory HDFS directory to monitor for new file
+   * @param recordLength length of each record in bytes
+   */
+  @Experimental
+  def binaryRecordsStream(
+      directory: String,
+      recordLength: Int): DStream[Array[Byte]] = {
+    val conf = sc_.hadoopConfiguration
+    conf.setInt(FixedLengthBinaryInputFormat.RECORD_LENGTH_PROPERTY, recordLength)
+    val br = fileStream[LongWritable, BytesWritable, FixedLengthBinaryInputFormat](
+      directory, FileInputDStream.defaultFilter : Path => Boolean, newFilesOnly=true, conf)
+    val data = br.map { case (k, v) =>
+      val bytes = v.getBytes
+      assert(bytes.length == recordLength, "Byte array does not have correct length")
+      bytes
+    }
+    data
+  }
+
+  /**
    * Create an input stream from a queue of RDDs. In each batch,
    * it will process either one or all of the RDDs returned by the queue.
    * @param queue      Queue of RDDs

http://git-wip-us.apache.org/repos/asf/spark/blob/242b4f02/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index 9a2254b..0f7ae7a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -29,6 +29,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
 
 import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.annotation.Experimental
 import org.apache.spark.api.java.{JavaPairRDD, JavaRDD, JavaSparkContext}
 import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2}
 import org.apache.spark.rdd.RDD
@@ -177,7 +178,7 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
 
   /**
    * Create an input stream from network source hostname:port. Data is received using
-   * a TCP socket and the receive bytes it interepreted as object using the given
+   * a TCP socket and the receive bytes it interpreted as object using the given
    * converter.
    * @param hostname      Hostname to connect to for receiving data
    * @param port          Port to connect to for receiving data
@@ -210,6 +211,24 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
   }
 
   /**
+   * :: Experimental ::
+   *
+   * Create an input stream that monitors a Hadoop-compatible filesystem
+   * for new files and reads them as flat binary files with fixed record lengths,
+   * yielding byte arrays
+   *
+   * '''Note:''' We ensure that the byte array for each record in the
+   * resulting RDDs of the DStream has the provided record length.
+   *
+   * @param directory HDFS directory to monitor for new files
+   * @param recordLength The length at which to split the records
+   */
+  @Experimental
+  def binaryRecordsStream(directory: String, recordLength: Int): JavaDStream[Array[Byte]] = {
+    ssc.binaryRecordsStream(directory, recordLength)
+  }
+
+  /**
    * Create an input stream from network source hostname:port, where data is received
    * as serialized blocks (serialized using the Spark's serializer) that can be directly
    * pushed into the block manager without deserializing them. This is the most efficient
@@ -299,6 +318,37 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
   }
 
   /**
+   * Create an input stream that monitors a Hadoop-compatible filesystem
+   * for new files and reads them using the given key-value types and input format.
+   * Files must be written to the monitored directory by "moving" them from another
+   * location within the same file system. File names starting with . are ignored.
+   * @param directory HDFS directory to monitor for new file
+   * @param kClass class of key for reading HDFS file
+   * @param vClass class of value for reading HDFS file
+   * @param fClass class of input format for reading HDFS file
+   * @param filter Function to filter paths to process
+   * @param newFilesOnly Should process only new files and ignore existing files in the directory
+   * @param conf Hadoop configuration
+   * @tparam K Key type for reading HDFS file
+   * @tparam V Value type for reading HDFS file
+   * @tparam F Input format for reading HDFS file
+   */
+  def fileStream[K, V, F <: NewInputFormat[K, V]](
+      directory: String,
+      kClass: Class[K],
+      vClass: Class[V],
+      fClass: Class[F],
+      filter: JFunction[Path, JBoolean],
+      newFilesOnly: Boolean,
+      conf: Configuration): JavaPairInputDStream[K, V] = {
+    implicit val cmk: ClassTag[K] = ClassTag(kClass)
+    implicit val cmv: ClassTag[V] = ClassTag(vClass)
+    implicit val cmf: ClassTag[F] = ClassTag(fClass)
+    def fn = (x: Path) => filter.call(x).booleanValue()
+    ssc.fileStream[K, V, F](directory, fn, newFilesOnly, conf)
+  }
+
+  /**
    * Create an input stream with any arbitrary user implemented actor receiver.
    * @param props Props object defining creation of the actor
    * @param name Name of the actor

http://git-wip-us.apache.org/repos/asf/spark/blob/242b4f02/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index e7c5639..6379b88 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -23,6 +23,7 @@ import java.util.concurrent.ConcurrentHashMap
 import scala.collection.mutable
 import scala.reflect.ClassTag
 
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
 
@@ -68,11 +69,13 @@ import org.apache.spark.util.{TimeStampedHashMap, Utils}
  *   processing semantics are undefined.
  */
 private[streaming]
-class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : ClassTag](
+class FileInputDStream[K, V, F <: NewInputFormat[K,V]](
     @transient ssc_ : StreamingContext,
     directory: String,
     filter: Path => Boolean = FileInputDStream.defaultFilter,
-    newFilesOnly: Boolean = true)
+    newFilesOnly: Boolean = true,
+    conf: Option[Configuration] = None)
+    (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F])
   extends InputDStream[(K, V)](ssc_) {
 
   // This is a def so that it works during checkpoint recovery:
@@ -237,7 +240,15 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
   /** Generate one RDD from an array of files */
   private def filesToRDD(files: Seq[String]): RDD[(K, V)] = {
     val fileRDDs = files.map(file =>{
-      val rdd = context.sparkContext.newAPIHadoopFile[K, V, F](file)
+      val rdd = conf match {
+        case Some(config) => context.sparkContext.newAPIHadoopFile(
+          file,
+          fm.runtimeClass.asInstanceOf[Class[F]],
+          km.runtimeClass.asInstanceOf[Class[K]],
+          vm.runtimeClass.asInstanceOf[Class[V]],
+          config)
+        case None => context.sparkContext.newAPIHadoopFile[K, V, F](file)
+      }
       if (rdd.partitions.size == 0) {
         logError("File " + file + " has no data in it. Spark Streaming can 
only ingest " +
           "files that have been \"moved\" to the directory assigned to the 
file stream. " +

http://git-wip-us.apache.org/repos/asf/spark/blob/242b4f02/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index bddf51e..01084a4 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -95,6 +95,57 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
     }
   }
 
+  test("binary records stream") {
+    val testDir: File = null
+    try {
+      val batchDuration = Seconds(2)
+      val testDir = Utils.createTempDir()
+      // Create a file that exists before the StreamingContext is created:
+      val existingFile = new File(testDir, "0")
+      Files.write("0\n", existingFile, Charset.forName("UTF-8"))
+      assert(existingFile.setLastModified(10000) && existingFile.lastModified === 10000)
+
+      // Set up the streaming context and input streams
+      withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
+        val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+        // This `setTime` call ensures that the clock is past the creation time of `existingFile`
+        clock.setTime(existingFile.lastModified + batchDuration.milliseconds)
+        val batchCounter = new BatchCounter(ssc)
+        val fileStream = ssc.binaryRecordsStream(testDir.toString, 1)
+        val outputBuffer = new ArrayBuffer[Seq[Array[Byte]]]
+          with SynchronizedBuffer[Seq[Array[Byte]]]
+        val outputStream = new TestOutputStream(fileStream, outputBuffer)
+        outputStream.register()
+        ssc.start()
+
+        // Advance the clock so that the files are created after StreamingContext starts, but
+        // not enough to trigger a batch
+        clock.addToTime(batchDuration.milliseconds / 2)
+
+        val input = Seq(1, 2, 3, 4, 5)
+        input.foreach { i =>
+          Thread.sleep(batchDuration.milliseconds)
+          val file = new File(testDir, i.toString)
+          Files.write(Array[Byte](i.toByte), file)
+          assert(file.setLastModified(clock.currentTime()))
+          assert(file.lastModified === clock.currentTime)
+          logInfo("Created file " + file)
+          // Advance the clock after creating the file to avoid a race when
+          // setting its modification time
+          clock.addToTime(batchDuration.milliseconds)
+          eventually(eventuallyTimeout) {
+            assert(batchCounter.getNumCompletedBatches === i)
+          }
+        }
+
+        val expectedOutput = input.map(i => i.toByte)
+        val obtainedOutput = outputBuffer.flatten.toList.map(i => i(0).toByte)
+        assert(obtainedOutput === expectedOutput)
+      }
+    } finally {
+      if (testDir != null) Utils.deleteRecursively(testDir)
+    }
+  }
 
   test("file input stream - newFilesOnly = true") {
     testFileStream(newFilesOnly = true)

