> On May 20, 2014, 5:12 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemConsumer.scala, line 41
> > <https://reviews.apache.org/r/21581/diff/1/?file=583675#file583675line41>
> >
> > Can we move these variables to the constructor as default parameters?
> > We can then update the FileReaderSystemFactory to get values out of the
> > config object, and pass them in here.

Changed the name to queueSize and moved it to the constructor.


> On May 20, 2014, 5:12 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemConsumer.scala, line 46
> > <https://reviews.apache.org/r/21581/diff/1/?file=583675#file583675line46>
> >
> > Include unit of time in variable name and docs. Milliseconds, I'm
> > assuming?

Changed the name to pollingSleepMs and moved it into the constructor.


> On May 20, 2014, 5:12 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemConsumer.scala, line 72
> > <https://reviews.apache.org/r/21581/diff/1/?file=583675#file583675line72>
> >
> > Should use DaemonThreadFactory as second parameter here. This is a
> > samza-core class that creates daemon threads in the executor, and names
> > threads properly.

Yes, done. All threads have the prefix "filereader".


> On May 20, 2014, 5:12 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemConsumer.scala, line 93
> > <https://reviews.apache.org/r/21581/diff/1/?file=583675#file583675line93>
> >
> > Maybe call this "shutdown", and use while(!shutdown) in the while loop?

Done, thanks.


> On May 20, 2014, 5:12 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemConsumer.scala, line 101
> > <https://reviews.apache.org/r/21581/diff/1/?file=583675#file583675line101>
> >
> > Probably want to surround everything in the loop in a try/catch, so we
> > can properly catch InterruptedException, and shut down the loop and
> > close the file handle (so we don't leak it).
> >
> > You can then set var file = null above, and just change the if to if
> > file == null || file.length <= filePointer.

Added a try/catch, and the file is now closed when the thread is interrupted. But I did not get the point of using var file = null here, so I did not set var file = null.
What is the reason for that?


> On May 20, 2014, 5:12 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemConsumer.scala, line 103
> > <https://reviews.apache.org/r/21581/diff/1/?file=583675#file583675line103>
> >
> > Should close file before creating a new one so we don't leak file
> > handles.

Yes, done. Thank you.


> On May 20, 2014, 5:12 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemConsumer.scala, line 111
> > <https://reviews.apache.org/r/21581/diff/1/?file=583675#file583675line111>
> >
> > It looks like you're putting the current value of i into the offset
> > field, which is the offset of the last byte read. The offset should
> > represent the point in the file that we need to seek to begin reading this
> > message (i.e. the offset of the first byte read for this message). I think
> > we need a second variable that tracks the beginning offset for the current
> > message (i + 1 when cha == '\n'). We should use this variable when creating
> > the IncomingMessageEnvelope, and then reset it when we reset the line
> > variable.
> >
> > This change also requires a re-implementation of getOffsetsAfter. We'll
> > need to seek to the supplied offset, then read through to the next newline,
> > then return the newline's offset + 1. In the case where there is no next
> > newline, we'll have to throw an exception. This case should never happen in
> > getOffsetsAfter's current usage, since we always call getOffsetsAfter on
> > messages that we've fully consumed (i.e. there was a newline AFTER the
> > message).

Corrected it now.
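The offset scheme described in the line 111 comment can be sketched as follows. This is a minimal illustration under stated assumptions, not the patch's code: `LineOffsets`, `readLines` and `offsetAfter` are hypothetical names, messages are assumed to be newline-delimited UTF-8 text, and the file is read one byte at a time from a `java.io.RandomAccessFile` for clarity rather than speed. Each message is paired with the offset of its first byte, and `offsetAfter` follows the proposed getOffsetsAfter behavior: seek to the supplied offset, scan to the next newline, return its offset + 1, and throw if no newline exists yet.

```scala
import java.io.{ByteArrayOutputStream, RandomAccessFile}
import java.nio.charset.StandardCharsets
import scala.collection.mutable.ArrayBuffer

// Hypothetical helper illustrating the review suggestion; not the patch's actual code.
object LineOffsets {

  /** Reads complete lines starting at `from`. Each line is paired with the
    * offset of its first byte, i.e. the position to seek to in order to
    * re-read that message. A trailing line with no '\n' yet is not emitted. */
  def readLines(file: RandomAccessFile, from: Long): Seq[(Long, String)] = {
    file.seek(from)
    val result = ArrayBuffer.empty[(Long, String)]
    val line = new ByteArrayOutputStream()
    var lineStart = from // offset of the first byte of the current message
    var pos = from       // offset of the next byte to be read
    var b = file.read()
    while (b != -1) {
      pos += 1
      if (b == '\n') {
        result += ((lineStart, new String(line.toByteArray, StandardCharsets.UTF_8)))
        line.reset()
        lineStart = pos // "i + 1": the byte right after the newline
      } else {
        line.write(b)
      }
      b = file.read()
    }
    result.toSeq
  }

  /** Seeks to `offset`, reads through to the next newline and returns the
    * newline's offset + 1. Throws if the message has no terminating newline. */
  def offsetAfter(file: RandomAccessFile, offset: Long): Long = {
    file.seek(offset)
    var pos = offset // offset of the byte currently held in `b`
    var b = file.read()
    while (b != -1 && b != '\n') {
      pos += 1
      b = file.read()
    }
    if (b == -1) throw new IllegalStateException(s"no newline after offset $offset")
    pos + 1
  }
}
```

Leaving an unterminated last line unread is a deliberate choice here: the writer may still be appending to it, and a polling loop can pick it up once its newline arrives, which also keeps offsetAfter's "no newline" case unreachable for fully consumed messages.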

> On May 20, 2014, 5:12 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemFactory.scala, line 32
> > <https://reviews.apache.org/r/21581/diff/1/?file=583676#file583676line32>
> >
> > If you move the const variables in FileReaderSystemConsumer (above) into the
> > constructor, you can use the config object to get config variables from the
> > config.
> >
> > For names, my vote would be for:
> >
> > systems.<system name>.queue.size
> > systems.<system name>.polling.sleep.ms

The getConsumer method in FileReaderSystemFactory now reads these config variables.


- Yan


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/21581/#review43489
-----------------------------------------------------------


On May 22, 2014, 1:56 a.m., Yan Fang wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/21581/
> -----------------------------------------------------------
> 
> (Updated May 22, 2014, 1:56 a.m.)
> 
> 
> Review request for samza.
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> 1. Moved from the old review request for SAMZA-138.
> 2. New patch based on the Samza core repository.
> 3. Converted to Scala.
> 4. Added Javadoc explaining the classes.
> 5. FileReaderSystemAdmin implements SystemAdmin and provides metadata for a file, including the oldest offset, newest offset and upcoming offset.
> 6. FileReaderConsumer extends BlockingEnvelopeMap. It creates one thread for each file-read operation. After reading the existing messages, the thread keeps checking for new messages appended to the file until the job is stopped.
> 7. FileReaderFactory implements SystemFactory.
> 8. Related unit tests.
> 
> 
> Diffs
> -----
> 
>   samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemConsumer.scala PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemFactory.scala PRE-CREATION
>   samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemAdmin.scala PRE-CREATION
>   samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemConsumer.scala PRE-CREATION
>   samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemFactory.scala PRE-CREATION
> 
> Diff: https://reviews.apache.org/r/21581/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Yan Fang
>
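The config names proposed in the FileReaderSystemFactory comment (systems.<system name>.queue.size and systems.<system name>.polling.sleep.ms) could be wired through constructor default parameters roughly like this. It is a sketch under stated assumptions: a plain Scala `Map[String, String]` stands in for Samza's config object, `FileReaderConsumerSketch`, `FileReaderFactorySketch` and `FileReaderDefaults` are hypothetical names, and the default values (10000 messages, 500 ms) are placeholders, not the patch's values.

```scala
// Hypothetical defaults; the real patch may use different values.
object FileReaderDefaults {
  val QueueSize: Int = 10000
  val PollingSleepMs: Long = 500L // milliseconds
}

// Hypothetical consumer shell: only the constructor shape is illustrated.
class FileReaderConsumerSketch(
    val systemName: String,
    val queueSize: Int = FileReaderDefaults.QueueSize,
    val pollingSleepMs: Long = FileReaderDefaults.PollingSleepMs)

// Hypothetical factory: reads the two proposed keys, falling back to the defaults.
object FileReaderFactorySketch {
  def getConsumer(systemName: String, config: Map[String, String]): FileReaderConsumerSketch = {
    val queueSize = config
      .get(s"systems.$systemName.queue.size")
      .map(_.toInt)
      .getOrElse(FileReaderDefaults.QueueSize)
    val pollingSleepMs = config
      .get(s"systems.$systemName.polling.sleep.ms")
      .map(_.toLong)
      .getOrElse(FileReaderDefaults.PollingSleepMs)
    new FileReaderConsumerSketch(systemName, queueSize, pollingSleepMs)
  }
}
```

Keeping the defaults in one place means a consumer constructed directly, as in a unit test, behaves the same as one built by the factory with no config overrides.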
