Repository: incubator-samza
Updated Branches:
  refs/heads/master 494feb0a5 -> 95cee714e


SAMZA-138; add a file system consumer


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/95cee714
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/95cee714
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/95cee714

Branch: refs/heads/master
Commit: 95cee714ef1d017f948233396ca663064df28ab4
Parents: 494feb0
Author: Yan Fang <[email protected]>
Authored: Fri May 23 11:18:04 2014 -0700
Committer: Chris Riccomini <[email protected]>
Committed: Fri May 23 11:18:04 2014 -0700

----------------------------------------------------------------------
 .../filereader/FileReaderSystemAdmin.scala      | 133 +++++++++++++++
 .../filereader/FileReaderSystemConsumer.scala   | 171 +++++++++++++++++++
 .../filereader/FileReaderSystemFactory.scala    |  53 ++++++
 .../filereader/TestFileReaderSystemAdmin.scala  | 101 +++++++++++
 .../TestFileReaderSystemConsumer.scala          | 127 ++++++++++++++
 .../TestFileReaderSystemFactory.scala           |  42 +++++
 6 files changed, 627 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/95cee714/samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala
 
b/samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala
new file mode 100644
index 0000000..9c99a59
--- /dev/null
+++ 
b/samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.filereader
+
+import org.apache.samza.system.SystemAdmin
+import org.apache.samza.system.SystemStreamMetadata
+import org.apache.samza.system.SystemStreamPartition
+import 
org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
+import scala.collection.JavaConversions._
+import java.io.RandomAccessFile
+import scala.util.control.Breaks
+import org.apache.samza.Partition
+import grizzled.slf4j.Logging
+import org.apache.samza.SamzaException
+
+class FileReaderSystemAdmin extends SystemAdmin with Logging {
+  /**
+   * Given a list of streams, get their metadata. Each stream name is used as the path of
+   * the file to read, and each file is treated as one partition (partition 0).
+   *
+   * If the file is empty, null is used for the oldest and newest offset and "0" for the
+   * upcoming offset. Otherwise the oldest offset is "0" and the newest and upcoming offsets
+   * are found by reading the file.
+   *
+   * The file is closed even when reading it fails, so no file handle is leaked.
+   *
+   * @param streams names of the streams (file paths) whose metadata is wanted
+   * @return a map whose key is the stream name and whose value is the metadata
+   * @see getNewestOffsetAndUpcomingOffset(RandomAccessFile)
+   */
+  def getSystemStreamMetadata(streams: java.util.Set[String]) = {
+    val allMetadata = streams.map(stream => {
+      val file = new RandomAccessFile(stream, "r")
+      // try/finally: the file must be released even if scanning it throws
+      val systemStreamPartitionMetadata = try {
+        file.length match {
+          case 0 => new SystemStreamPartitionMetadata(null, null, "0")
+          case _ => {
+            val (newestOffset, upcomingOffset) = getNewestOffsetAndUpcomingOffset(file)
+            new SystemStreamPartitionMetadata("0", newestOffset, upcomingOffset)
+          }
+        }
+      } finally {
+        file.close
+      }
+      val streamPartitionMetadata = Map(new Partition(0) -> systemStreamPartitionMetadata)
+      val systemStreamMetadata = new SystemStreamMetadata(stream, streamPartitionMetadata)
+      (stream, systemStreamMetadata)
+    }).toMap
+
+    info("Got metadata: %s" format allMetadata)
+
+    allMetadata
+  }
+
+  /**
+   * For every supplied offset, find the offset at which the following line starts.
+   * The supplied offset is expected to be the beginning of a line; the method scans forward
+   * from it to the first \n and returns the position of that \n + 1.
+   *
+   * If no \n is found from the supplied offset to the end of the file, a SamzaException is
+   * thrown, because this method is only supposed to be called for fully consumed messages.
+   *
+   * Each file is opened only for the scan and is closed afterwards, also on failure.
+   *
+   * @param offsets map from SystemStreamPartition (whose stream name is the file path) to
+   *                the offset of an already consumed line
+   * @return a java map from each SystemStreamPartition to the offset of its next line
+   */
+  def getOffsetsAfter(offsets: java.util.Map[SystemStreamPartition, String]) = {
+    val offsetAfter = offsets.map {
+      case (systemStreamPartition, offset) => {
+        val file = new RandomAccessFile(systemStreamPartition.getStream, "r")
+        // the original never closed this file; release it whether or not the scan succeeds
+        val newOffset = try {
+          findNextEnter(file, offset.toLong, 1) match {
+            case Some(x) => x + 1
+            case None => throw new SamzaException("the line beginning with " + offset + " in " + systemStreamPartition.getStream + " has not been completed!")
+          }
+        } finally {
+          file.close
+        }
+        (systemStreamPartition, newOffset.toString)
+      }
+    }
+    mapAsJavaMap(offsetAfter)
+  }
+
+  /**
+   * Compute the newest offset and the upcoming offset of a file. The upcoming offset is
+   * the position of the last \n + 1; the newest offset is the position of the
+   * second-to-last \n + 1. Whenever the required \n does not exist, 0 is used instead.
+   *
+   * The file is read backwards from its end, on the assumption that in most cases the
+   * second-to-last \n is closer to the end of the file than to its beginning.
+   *
+   * @param file the file to inspect
+   * @return the pair (newest offset, upcoming offset), both as strings
+   */
+  private def getNewestOffsetAndUpcomingOffset(file: RandomAccessFile): (String, String) = {
+    // position right after the last \n, or 0 when the file contains no \n
+    val upcomingOffset = findNextEnter(file, file.length - 1, -1).map(_ + 1).getOrElse(0)
+    val newestOffset =
+      if (upcomingOffset == 0) {
+        // without an upcoming offset there cannot be a newest offset either
+        0
+      } else {
+        // upcomingOffset - 2 is the byte just before the last \n; search backwards from there
+        findNextEnter(file, upcomingOffset - 2, -1).map(_ + 1).getOrElse(0)
+      }
+    (newestOffset.toString, upcomingOffset.toString)
+  }
+
+  /**
+   * Look for the nearest \n in the file, beginning at (and including) the starting
+   * position and moving by step bytes at a time: a step of +1 searches towards the end of
+   * the file, a step of -1 searches towards its beginning. The search stops as soon as the
+   * position leaves the range [0, file.length).
+   *
+   * NOTE(review): the position is narrowed with toInt, so a \n located beyond
+   * Int.MaxValue bytes would not be reported correctly.
+   *
+   * @param file the file to search
+   * @param startingPosition the first position to examine
+   * @param step the amount added to the position after every examined byte
+   * @return Some(position of the \n) if one is found, otherwise None
+   */
+  private def findNextEnter(file: RandomAccessFile, startingPosition: Long, step: Int): Option[Int] = {
+    var enterPosition: Option[Int] = None
+    var position = startingPosition
+    // keep going until a \n has been found or the position runs off either end of the file
+    while (enterPosition.isEmpty && position < file.length && position > -1) {
+      file.seek(position)
+      if (file.read.toChar == '\n') {
+        enterPosition = Some(position.toInt)
+      }
+      position += step
+    }
+    enterPosition
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/95cee714/samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemConsumer.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemConsumer.scala
 
b/samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemConsumer.scala
new file mode 100644
index 0000000..c0e1bb6
--- /dev/null
+++ 
b/samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemConsumer.scala
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.filereader
+
+import org.apache.samza.system.SystemConsumer
+import org.apache.samza.util.BlockingEnvelopeMap
+import org.apache.samza.metrics.MetricsRegistry
+import org.apache.samza.system.SystemStreamPartition
+import scala.collection.mutable.Map
+import java.io.RandomAccessFile
+import org.apache.samza.system.IncomingMessageEnvelope
+import java.util.concurrent.LinkedBlockingQueue
+import org.apache.samza.Partition
+import collection.JavaConversions._
+import scala.collection.mutable.HashMap
+import java.util.concurrent.Executors
+import java.util.concurrent.ExecutorService
+import org.apache.samza.util.DaemonThreadFactory
+import org.apache.samza.SamzaException
+import grizzled.slf4j.Logging
+
+object FileReaderSystemConsumer {
+  /**
+   * Prefix for the file reader system thread names. It is handed to the
+   * DaemonThreadFactory that creates the threads of the consumer's reader pool.
+   */
+  val FILE_READER_SYSTEM_THREAD_PREFIX = "filereader-"
+}
+
+class FileReaderSystemConsumer(
+  systemName: String,
+  metricsRegistry: MetricsRegistry,
+
+  /**
+   * Threshold used to determine when there are too many 
IncomingMessageEnvelopes to be put onto
+   * the BlockingQueue.
+   */
+  queueSize: Int = 10000,
+
+  /**
+   * the sleep interval of checking the file length. Unit of the time is 
milliseconds.
+   */
+  pollingSleepMs: Int = 500) extends BlockingEnvelopeMap with Logging {
+
+  /**
+   * A map from every registered SystemStreamPartition to the starting offset it was
+   * registered with. It is filled by register and read by start, which launches one
+   * file reading task per entry.
+   */
+  var systemStreamPartitionAndStartingOffset = Map[SystemStreamPartition, 
String]()
+
+  /**
+   * A thread pool for the threads reading files. It is null until start creates it;
+   * its size is derived from the number of registered files. stop shuts it down.
+   */
+  var pool: ExecutorService = null
+
+  /**
+   * Register the systemStreamPartition with the parent class and remember the
+   * SystemStreamPartition together with its starting offset in the
+   * systemStreamPartitionAndStartingOffset map.
+   */
+  override def register(systemStreamPartition: SystemStreamPartition, startingOffset: String) {
+    super.register(systemStreamPartition, startingOffset)
+    systemStreamPartitionAndStartingOffset(systemStreamPartition) = startingOffset
+  }
+
+  /**
+   * Create the thread pool and start one file reading task for each registered
+   * SystemStreamPartition, beginning at the offset it was registered with.
+   *
+   * The pool gets one thread per registered SystemStreamPartition. When nothing has been
+   * registered the pool is still created (with a single, unused thread slot), because
+   * Executors.newFixedThreadPool throws an IllegalArgumentException for a size of 0.
+   */
+  override def start {
+    val poolSize = math.max(1, systemStreamPartitionAndStartingOffset.size)
+    pool = Executors.newFixedThreadPool(poolSize, new DaemonThreadFactory(FileReaderSystemConsumer.FILE_READER_SYSTEM_THREAD_PREFIX))
+    // foreach instead of map: the tasks are submitted purely for their side effect
+    systemStreamPartitionAndStartingOffset.foreach { case (ssp, offset) => pool.execute(readInputFiles(ssp, offset)) }
+  }
+
+  /**
+   * Stop all the running threads.
+   *
+   * The file reading tasks loop until they are interrupted (nothing in this file sets
+   * their shutdown flag), so the pool is stopped with shutdownNow, which interrupts
+   * the worker threads; a plain shutdown would only refuse new tasks and leave the readers
+   * running. Calling stop before start is a no-op.
+   */
+  override def stop {
+    if (pool != null) {
+      pool.shutdownNow()
+    }
+  }
+
+  /**
+   * The method returns a runnable object, which keeps reading a file from the starting
+   * offset. It puts every line (ended with \n) together with its offset (the position of
+   * the beginning of the line) into the BlockingQueue; the \n itself is not part of the
+   * message. A line that is not ended with \n is regarded as uncompleted: its characters
+   * are kept and the line is only put into the queue once its \n has been read.
+   *
+   * The thread keeps comparing the file length with the file pointer to read the
+   * latest/updated file content. Whenever the file has been read up to the length seen at
+   * the beginning of a pass, setIsAtHead() is called to specify that the
+   * SystemStreamPartition has "caught up". If there is nothing new to read, the thread
+   * sleeps for <code>pollingSleepMs</code> milliseconds and then reopens the file.
+   *
+   * The loop ends when the shutdown flag is set or when the thread is interrupted; the
+   * file is closed in both cases.
+   *
+   * @param ssp the SystemStreamPartition whose stream name is the path of the file
+   * @param startingOffset the byte position in the file at which reading begins
+   * @return a runnable that performs the reading
+   */
+  private def readInputFiles(ssp: SystemStreamPartition, startingOffset: String) = {
+    new Runnable {
+      @volatile var shutdown = false // tag to indicate the thread should stop running
+
+      def run() {
+        val path = ssp.getStream
+        var file: RandomAccessFile = null
+        var filePointer = startingOffset.toLong
+        // characters of the line currently being formed; a builder avoids re-copying the
+        // whole line for every appended character
+        val line = new StringBuilder
+        var offset = filePointer // record the beginning offset of a line
+        try {
+          file = new RandomAccessFile(path, "r")
+          while (!shutdown) {
+            // look at the length once per pass, so that the decision to read, the loop
+            // bound and the new file pointer all refer to the same value
+            val length = file.length
+            if (length <= filePointer) {
+              Thread.sleep(pollingSleepMs)
+              file.close
+              file = new RandomAccessFile(path, "r")
+            } else {
+              file.seek(filePointer)
+              var i = filePointer
+              while (i < length) {
+                val cha = file.read.toChar
+                if (cha == '\n') {
+                  // put into the queue. offset is the beginning of this line
+                  put(ssp, new IncomingMessageEnvelope(ssp, offset.toString, null, line.toString))
+                  offset = i + 1 // the beginning of the next line
+                  line.clear()
+                } else {
+                  line.append(cha)
+                }
+                i += 1
+              }
+              // continue exactly after the last byte that was read; using the current file
+              // length here would skip bytes appended while this pass was running
+              filePointer = i
+              setIsAtHead(ssp, true)
+            }
+          }
+        } catch {
+          case ie: InterruptedException => {
+            // Swallow this exception since we don't need to clutter the logs
+            // with interrupt exceptions when shutting down.
+            info("Received an interrupt while reading file. Shutting down.")
+          }
+        } finally {
+          if (file != null) {
+            file.close
+          }
+        }
+      }
+
+      // stop the thread gracefully by changing the variable's value
+      def stop() {
+        shutdown = true
+      }
+    }
+  }
+
+  /**
+   * Constructs a new bounded BlockingQueue of IncomingMessageEnvelopes. The bound is
+   * the <code>queueSize</code> constructor parameter of this consumer.
+   *
+   * @return A LinkedBlockingQueue holding at most <code>queueSize</code>
+   *         IncomingMessageEnvelopes.
+   */
+  override def newBlockingQueue = new LinkedBlockingQueue[IncomingMessageEnvelope](queueSize)
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/95cee714/samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemFactory.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemFactory.scala
 
b/samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemFactory.scala
new file mode 100644
index 0000000..9f2bb17
--- /dev/null
+++ 
b/samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemFactory.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.filereader
+
+import org.apache.samza.system.SystemFactory
+import org.apache.samza.config.Config
+import org.apache.samza.metrics.MetricsRegistry
+import org.apache.samza.SamzaException
+
+class FileReaderSystemFactory extends SystemFactory {
+  /**
+   * Create the FileReaderSystemConsumer for a system. The queue size is read from
+   * "systems.<systemName>.queue.size" (default 10000) and the polling sleep time from
+   * "systems.<systemName>.polling.sleep.ms" (default 500); the defaults apply when the
+   * config does not contain the keys.
+   */
+  def getConsumer(systemName: String, config: Config, registry: MetricsRegistry) = {
+    val configPrefix = "systems.%s." format systemName
+    new FileReaderSystemConsumer(
+      systemName,
+      registry,
+      config.getInt(configPrefix + "queue.size", 10000),
+      config.getInt(configPrefix + "polling.sleep.ms", 500))
+  }
+
+  /**
+   * This system is not designed for writing to files, so there is no producer.
+   * Asking for one always throws a SamzaException.
+   */
+  def getProducer(systemName: String, config: Config, registry: MetricsRegistry) =
+    throw new SamzaException("not supposed to write to files")
+
+  /**
+   * Create a new FileReaderSystemAdmin. The arguments are not used.
+   */
+  def getAdmin(systemName: String, config: Config) = new FileReaderSystemAdmin
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/95cee714/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemAdmin.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemAdmin.scala
 
b/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemAdmin.scala
new file mode 100644
index 0000000..fb26bfc
--- /dev/null
+++ 
b/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemAdmin.scala
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.filereader
+
+import org.junit.Assert._
+import scala.collection.JavaConversions._
+import java.io.PrintWriter
+import java.io.File
+import org.scalatest.junit.AssertionsForJUnit
+import org.junit.Test
+import org.junit.Before
+import org.junit.After
+import java.io.RandomAccessFile
+import org.apache.samza.system.SystemStreamPartition
+import org.apache.samza.Partition
+import 
org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
+import scala.collection.mutable.HashMap
+import org.apache.samza.SamzaException
+
+class TestFileReaderSystemAdmin extends AssertionsForJUnit {
+
+  val files = List("empty.txt", "noEnter.txt", "oneEnter.txt", "twoEnter.txt", "moreEnter.txt")
+
+  /**
+   * Create the fixture files before every test. The file name tells how many \n
+   * characters the content holds.
+   */
+  @Before
+  def createFiles {
+    for (file <- files) {
+      val writer = new PrintWriter(new File(file))
+      file match {
+        case "empty.txt" => // stays empty
+        case "noEnter.txt" => writer.write("first line")
+        case "oneEnter.txt" => writer.write("first line \nsecond line")
+        case "twoEnter.txt" => writer.write("first line \nsecond line \nother lines")
+        case "moreEnter.txt" => writer.write("first line \nsecond line \nthird line \nother lines \n")
+      }
+      writer.close
+    }
+  }
+
+  /**
+   * Remove the fixture files after every test.
+   */
+  @After
+  def deleteFiles {
+    for (file <- files) {
+      new File(file).delete
+    }
+  }
+
+  /**
+   * getOffsetsAfter must return, for a line's starting offset, the starting offset of
+   * the next line.
+   */
+  @Test
+  def testGetOffsetsAfter {
+    val fileReaderSystemAdmin = new FileReaderSystemAdmin
+    // one SystemStreamPartition (partition 0) per fixture file, in the order of `files`
+    val ssps = files.map(file => new SystemStreamPartition("file-reader", file, new Partition(0)))
+    val oneEnter = ssps(2)
+    val twoEnter = ssps(3)
+    val moreEnter = ssps(4)
+
+    val offsets: java.util.Map[SystemStreamPartition, String] =
+      HashMap(oneEnter -> "0", twoEnter -> "12", moreEnter -> "25")
+    val afterOffsets = fileReaderSystemAdmin.getOffsetsAfter(offsets)
+    assertEquals("12", afterOffsets.get(oneEnter))
+    assertEquals("25", afterOffsets.get(twoEnter))
+    assertEquals("37", afterOffsets.get(moreEnter))
+  }
+
+  /**
+   * getSystemStreamMetadata must report the expected oldest, newest and upcoming
+   * offsets for every fixture file.
+   */
+  @Test
+  def testGetSystemStreamMetadata {
+    val fileReaderSystemAdmin = new FileReaderSystemAdmin
+    val allMetadata = fileReaderSystemAdmin.getSystemStreamMetadata(setAsJavaSet(files.toSet))
+    // expected partition metadata, keyed by the name of the fixture file
+    val expected = Map(
+      "empty.txt" -> new SystemStreamPartitionMetadata(null, null, "0"),
+      "noEnter.txt" -> new SystemStreamPartitionMetadata("0", "0", "0"),
+      "oneEnter.txt" -> new SystemStreamPartitionMetadata("0", "0", "12"),
+      "twoEnter.txt" -> new SystemStreamPartitionMetadata("0", "12", "25"),
+      "moreEnter.txt" -> new SystemStreamPartitionMetadata("0", "37", "50"))
+
+    allMetadata.foreach {
+      case (stream, metadata) =>
+        val result = metadata.getSystemStreamPartitionMetadata().get(new Partition(0))
+        assertEquals(expected(stream), result)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/95cee714/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemConsumer.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemConsumer.scala
 
b/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemConsumer.scala
new file mode 100644
index 0000000..b2e04a7
--- /dev/null
+++ 
b/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemConsumer.scala
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.filereader
+
+import org.junit.Test
+import org.junit.Assert._
+import org.apache.samza.system.SystemStreamPartition
+import org.junit.AfterClass
+import java.io.PrintWriter
+import java.io.File
+import org.apache.samza.Partition
+import scala.collection.JavaConversions._
+import scala.collection.mutable.HashMap
+import org.junit.BeforeClass
+import java.io.FileWriter
+
+object TestFileReaderSystemConsumer {
+  val consumer = new FileReaderSystemConsumer("file-reader", null)
+  val files = List("empty.txt", "noEnter.txt", "oneEnter.txt", "twoEnter.txt", "moreEnter.txt")
+  // one SystemStreamPartition (partition 0) per fixture file, in the order of `files`
+  val List(ssp1, ssp2, ssp3, ssp4, ssp5) =
+    files.map(file => new SystemStreamPartition("file-reader", file, new Partition(0)))
+
+  /**
+   * Create the fixture files once for the whole test class. The file name tells how
+   * many \n characters the content holds.
+   */
+  @BeforeClass
+  def beforeCreateFiles {
+    for (file <- files) {
+      val writer = new PrintWriter(new File(file))
+      file match {
+        case "empty.txt" => // stays empty
+        case "noEnter.txt" => writer.write("first line")
+        case "oneEnter.txt" => writer.write("first line \nsecond line")
+        case "twoEnter.txt" => writer.write("first line \nsecond line \nother lines")
+        case "moreEnter.txt" => writer.write("first line \nsecond line \nthird line \nother lines \n")
+      }
+      writer.close
+    }
+  }
+
+  /**
+   * Remove the fixture files after all tests have run.
+   */
+  @AfterClass
+  def afterDeleteFiles {
+    for (file <- files) {
+      new File(file).delete
+    }
+  }
+
+  /**
+   * Append one complete line to moreEnter.txt.
+   */
+  def appendFile {
+    val fileWriter = new FileWriter("moreEnter.txt", true)
+    fileWriter.write("This is a new line\n")
+    fileWriter.close
+  }
+}
+
+class TestFileReaderSystemConsumer {
+  import TestFileReaderSystemConsumer._
+
+  /**
+   * Registers the five fixture files, checks that the starting offsets were recorded,
+   * starts the consumer and verifies the messages and offsets delivered for each file,
+   * including a line appended to moreEnter.txt while the consumer is running.
+   */
+  @Test
+  def testRegisterAndPutCorrectMessagesOffsetsToBlockingQueue {
+    val startingOffsets = List(ssp1 -> "0", ssp2 -> "0", ssp3 -> "0", ssp4 -> "12", ssp5 -> "25")
+    startingOffsets.foreach { case (ssp, offset) => consumer.register(ssp, offset) }
+
+    // test register correctly
+    startingOffsets.foreach {
+      case (ssp, offset) =>
+        assertEquals(offset, consumer.systemStreamPartitionAndStartingOffset.getOrElse(ssp, null))
+    }
+
+    consumer.start
+    Thread.sleep(500)
+
+    val number: Integer = 1000
+    // poll a single partition for at most `number` envelopes with a timeout of 1000
+    def pollOnly(ssp: SystemStreamPartition) = {
+      val request: java.util.Map[SystemStreamPartition, Integer] = HashMap(ssp -> number)
+      consumer.poll(request, 1000)
+    }
+
+    val ssp1Result = pollOnly(ssp1)
+    val ssp2Result = pollOnly(ssp2)
+    val ssp3Result = pollOnly(ssp3)
+    val ssp4Result = pollOnly(ssp4)
+
+    // neither the empty file nor the file without \n holds a complete line
+    assertEquals(0, ssp1Result.size)
+    assertEquals(0, ssp2Result.size)
+
+    assertEquals(1, ssp3Result.size)
+    assertEquals("first line ", ssp3Result(0).getMessage)
+    assertEquals("0", ssp3Result(0).getOffset)
+
+    assertEquals(1, ssp4Result.size)
+    assertEquals("second line ", ssp4Result(0).getMessage)
+    assertEquals("12", ssp4Result(0).getOffset)
+
+    appendFile
+    Thread.sleep(1000)
+
+    // ssp5 should read the new lines
+    val ssp5Result = pollOnly(ssp5)
+    assertEquals(3, ssp5Result.size)
+    assertEquals("This is a new line", ssp5Result(2).getMessage)
+    assertEquals("50", ssp5Result(2).getOffset)
+    assertEquals("other lines ", ssp5Result(1).getMessage)
+    assertEquals("37", ssp5Result(1).getOffset)
+
+    consumer.stop
+  }
+}
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/95cee714/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemFactory.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemFactory.scala
 
b/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemFactory.scala
new file mode 100644
index 0000000..330df78
--- /dev/null
+++ 
b/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemFactory.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.filereader
+
+import org.junit.Assert._
+import scala.collection.JavaConversions._
+import org.scalatest.junit.AssertionsForJUnit
+import org.junit.Test
+import org.apache.samza.SamzaException
+
+class TestFileReaderSystemFactory extends AssertionsForJUnit {
+
+  /**
+   * The file reader system cannot write to files, so asking the factory for a
+   * producer must fail with a SamzaException.
+   */
+  @Test
+  def testGetProducerThrowCorrectException {
+    val fileReaderSystemFactory = new FileReaderSystemFactory
+    // intercept comes from ScalaTest's Assertions (mixed in through AssertionsForJUnit). It
+    // fails the test unless the block throws a SamzaException, which replaces the
+    // hand-written flag and the catch-all over Throwable.
+    intercept[SamzaException] {
+      fileReaderSystemFactory.getProducer("", null, null)
+    }
+  }
+}

Reply via email to