-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/21581/#review43489
-----------------------------------------------------------

samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala
<https://reviews.apache.org/r/21581/#comment77641>

    Make this a private def, since it is an implementation detail.


samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemConsumer.scala
<https://reviews.apache.org/r/21581/#comment77643>

    Can we move these variables to the constructor as default parameters? We can then update the FileReaderSystemFactory to get values out of the config object, and pass them in here.


samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemConsumer.scala
<https://reviews.apache.org/r/21581/#comment77644>

    Include the unit of time in the variable name and docs. Milliseconds, I'm assuming?


samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemConsumer.scala
<https://reviews.apache.org/r/21581/#comment77661>

    Should use DaemonThreadFactory as the second parameter here. This is a samza-core class that creates daemon threads in the executor, and names threads properly.


samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemConsumer.scala
<https://reviews.apache.org/r/21581/#comment77645>

    Maybe call this "shutdown", and use while (!shutdown) in the while loop?


samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemConsumer.scala
<https://reviews.apache.org/r/21581/#comment77648>

    Probably want to surround everything in the loop in a try/catch, so we can properly catch InterruptedException, shut down the loop, and close the file handle (so we don't leak it). You can then set var file = null above, and just change the if to: if (file == null || file.length <= filePointer).


samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemConsumer.scala
<https://reviews.apache.org/r/21581/#comment77647>

    Should close the file before creating a new one so we don't leak file handles.

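A rough sketch of the loop shape the comments above point toward, not the patch itself. `TailSketch`, `SketchDaemonThreadFactory`, `pollingSleepMs`, and `bytesRead` are hypothetical names, and the local thread factory only stands in for samza-core's DaemonThreadFactory so that the example compiles on its own. It assumes the polling interval is in milliseconds and that the reader reopens the file whenever it has caught up.

```scala
import java.io.RandomAccessFile
import java.util.concurrent.ThreadFactory

// Hypothetical stand-in for samza-core's DaemonThreadFactory (assumption:
// the real class also creates named daemon threads).
class SketchDaemonThreadFactory(name: String) extends ThreadFactory {
  def newThread(r: Runnable): Thread = {
    val t = new Thread(r, name)
    t.setDaemon(true)
    t
  }
}

// Hypothetical reader, not the class under review. The polling interval is a
// constructor parameter with a default, and its name carries the unit.
class TailSketch(path: String, pollingSleepMs: Long = 500) extends Runnable {
  @volatile var shutdown = false
  @volatile var bytesRead = 0L

  def run(): Unit = {
    var file: RandomAccessFile = null
    var filePointer = 0L
    try {
      while (!shutdown) {
        if (file == null || file.length() <= filePointer) {
          // Caught up (or first pass): close the old handle before opening
          // a new one, so file handles are not leaked.
          if (file != null) {
            file.close()
            Thread.sleep(pollingSleepMs)
          }
          file = new RandomAccessFile(path, "r")
          file.seek(filePointer)
        } else {
          file.read() // consume one byte; a real reader would build a line here
          filePointer = file.getFilePointer
          bytesRead = filePointer
        }
      }
    } catch {
      case _: InterruptedException => // interrupted while sleeping: stop the loop
    } finally {
      // Runs on normal shutdown, interruption, and I/O failure alike.
      if (file != null) file.close()
    }
  }
}
```

Such a reader could be submitted to an executor built with the daemon thread factory, for example `Executors.newSingleThreadExecutor(new SketchDaemonThreadFactory("file-reader"))`, and stopped by setting `shutdown = true`.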

samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemConsumer.scala
<https://reviews.apache.org/r/21581/#comment77666>

    It looks like you're putting the current value of i into the offset field, which is the offset of the last byte read. The offset should represent the point in the file that we need to seek to in order to begin reading this message (i.e. the offset of the first byte read for this message).

    I think we need a second variable that tracks the beginning offset for the current message (i + 1 when cha == '\n'). We should use this variable when creating the IncomingMessageEnvelope, and then reset it when we reset the line variable.

    This change also requires a re-implementation of getOffsetsAfter. We'll need to seek to the supplied offset, then read through to the next newline, then return the newline's offset + 1. In the case where there is no next newline, we'll have to throw an exception. This case should never happen in getOffsetsAfter's current usage, since we always call getOffsetsAfter on messages that we've fully consumed (i.e. there was a newline AFTER the message).


samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemFactory.scala
<https://reviews.apache.org/r/21581/#comment77649>

    If you move the const variables in FileReaderSystemConsumer (above) into the constructor, you can use the config object to get config variables from the config. For names, my vote would be for:

    systems.<system name>.queue.size
    systems.<system name>.polling.sleep.ms


samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemFactory.scala
<https://reviews.apache.org/r/21581/#comment77650>

    nit: suppose->supposed


- Chris Riccomini


On May 16, 2014, 9:15 p.m., Yan Fang wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail.
> To reply, visit:
> https://reviews.apache.org/r/21581/
> -----------------------------------------------------------
>
> (Updated May 16, 2014, 9:15 p.m.)
>
>
> Review request for samza.
>
>
> Repository: samza
>
>
> Description
> -------
>
> 1. moved from old request SAMZA-138
> 2. new patch based on Samza core repository
> 3. converted to Scala
> 4. added javadoc for explanation
> 5. FileReaderSystemAdmin implements SystemAdmin and provides metadata for a file, including oldest offset, newest offset and upcoming offset.
> 6. FileReaderConsumer implements BlockingEnvelopeMap. It creates one thread for each file-read operation. After reading existing messages, the thread keeps checking for new messages arriving in the file until the job is stopped.
> 7. FileReaderFactory implements SystemFactory.
> 8. related unit tests
>
>
> Diffs
> -----
>
>   samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemConsumer.scala PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemFactory.scala PRE-CREATION
>   samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemAdmin.scala PRE-CREATION
>   samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemConsumer.scala PRE-CREATION
>   samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemFactory.scala PRE-CREATION
>
> Diff: https://reviews.apache.org/r/21581/diff/
>
>
> Testing
> -------
>
>
> Thanks,
>
> Yan Fang
>
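
For reference, the offset bookkeeping described in the comment on the offset field (comment77666) could look roughly like the sketch below. It is an illustration only: `OffsetSketch`, `readMessages`, `offsetAfter`, and `lineStart` are hypothetical names, the methods take a plain file path instead of Samza's SystemStreamPartition and envelope types, and bytes are treated as single-byte characters.

```scala
import java.io.RandomAccessFile
import scala.collection.mutable.ListBuffer

object OffsetSketch {
  // Returns (startOffset, line) for every newline-terminated line, where
  // startOffset is the position to seek to in order to re-read that line.
  def readMessages(path: String): List[(Long, String)] = {
    val file = new RandomAccessFile(path, "r")
    try {
      val out = ListBuffer[(Long, String)]()
      val line = new StringBuilder
      var lineStart = 0L // offset of the first byte of the current message
      var i = 0L         // offset of the byte being read
      val len = file.length()
      while (i < len) {
        val cha = file.read().toChar
        if (cha == '\n') {
          out += ((lineStart, line.toString))
          line.clear()
          lineStart = i + 1 // the next message begins right after the newline
        } else {
          line.append(cha)
        }
        i += 1
      }
      out.toList
    } finally {
      file.close()
    }
  }

  // Seeks to `offset`, reads through the next newline, and returns the
  // newline's offset + 1. Throws if no newline follows the offset.
  def offsetAfter(path: String, offset: Long): Long = {
    val file = new RandomAccessFile(path, "r")
    try {
      file.seek(offset)
      var b = file.read()
      while (b != -1 && b != '\n'.toInt) b = file.read()
      if (b == -1) {
        throw new IllegalStateException(s"no newline after offset $offset in $path")
      }
      file.getFilePointer // one past the newline that was just read
    } finally {
      file.close()
    }
  }
}
```

With this layout, a trailing line that has no newline yet is never emitted as a message, which matches the expectation that `offsetAfter` is only called for fully consumed messages.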
