> On May 20, 2014, 5:12 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemConsumer.scala,
> >  line 41
> > <https://reviews.apache.org/r/21581/diff/1/?file=583675#file583675line41>
> >
> >     Can we move these variables to the constructor as default parameters? 
> > We can then update the FileReaderSystemFactory to get values out of the 
> > config object, and pass them in here.

Changed the name to queueSize and moved it to the constructor.


> On May 20, 2014, 5:12 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemConsumer.scala,
> >  line 46
> > <https://reviews.apache.org/r/21581/diff/1/?file=583675#file583675line46>
> >
> >     Include unit of time in variable name and docs. Milliseconds, I'm 
> > assuming?

Changed the name to pollingSleepMs and moved it into the constructor.


> On May 20, 2014, 5:12 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemConsumer.scala,
> >  line 72
> > <https://reviews.apache.org/r/21581/diff/1/?file=583675#file583675line72>
> >
> >     Should use DaemonThreadFactory as second parameter here. This is a 
> > samza-core class that creates daemon threads in the executor, and names 
> > threads properly.

Yes, done. All threads now have the prefix "filereader".
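
For illustration only (this is not the samza-core class itself), a minimal 
sketch of what such a factory does. PrefixedDaemonThreadFactory is a 
hypothetical stand-in for DaemonThreadFactory, and the "<prefix>-<n>" naming 
scheme is an assumption:

```scala
import java.util.concurrent.ThreadFactory
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for samza-core's DaemonThreadFactory: every thread it
// creates is a daemon thread named "<prefix>-<n>".
class PrefixedDaemonThreadFactory(prefix: String) extends ThreadFactory {
  private val counter = new AtomicInteger(0)

  override def newThread(r: Runnable): Thread = {
    val t = new Thread(r, prefix + "-" + counter.getAndIncrement())
    t.setDaemon(true) // daemon threads do not keep the JVM alive on shutdown
    t
  }
}
```

A factory like this is passed as the last argument when the executor is 
created, e.g. Executors.newCachedThreadPool(new PrefixedDaemonThreadFactory("filereader")).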


> On May 20, 2014, 5:12 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemConsumer.scala,
> >  line 93
> > <https://reviews.apache.org/r/21581/diff/1/?file=583675#file583675line93>
> >
> >     maybe call this "shutdown", and use while(!shutdown) in the while loop?

Done, thanks.


> On May 20, 2014, 5:12 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemConsumer.scala,
> >  line 101
> > <https://reviews.apache.org/r/21581/diff/1/?file=583675#file583675line101>
> >
> >     Probably want to surround everything in the loop in a try/catch, so we 
> > can properly catch InterruptedException, and shut down the loop and 
> > close file handle (so we don't leak it).
> >     
> >     You can then set var file = null above, and just change the if to if 
> > file == null || file.length <= filePointer.

Added a try/catch, and the file is now closed when the thread is interrupted. 
However, I did not understand the point of using var file = null here, so I 
did not make that change. What is the reason for it?
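
Roughly, the loop now has the shape below. This is a simplified sketch, not 
the code in the patch: PollingFileReader and onLine are hypothetical names, 
and it uses RandomAccessFile.readLine, which also returns a trailing line 
that has no newline yet.

```scala
import java.io.RandomAccessFile

// Hypothetical reader task; class and parameter names are illustrative only.
class PollingFileReader(path: String, pollingSleepMs: Long, onLine: String => Unit)
    extends Runnable {
  @volatile var shutdown = false

  override def run(): Unit = {
    val file = new RandomAccessFile(path, "r")
    try {
      while (!shutdown && !Thread.currentThread().isInterrupted) {
        val line = file.readLine()
        if (line != null) onLine(line)
        else Thread.sleep(pollingSleepMs) // at end of file: wait for new data
      }
    } catch {
      case _: InterruptedException =>
        Thread.currentThread().interrupt() // restore the flag and leave the loop
    } finally {
      file.close() // always release the file handle, even on interrupt
    }
  }
}
```

Closing the handle in finally covers both the interrupt and the normal 
shutdown path, so the handle is not leaked in either case.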


> On May 20, 2014, 5:12 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemConsumer.scala,
> >  line 103
> > <https://reviews.apache.org/r/21581/diff/1/?file=583675#file583675line103>
> >
> >     Should close file before creating a new one so we don't leak file 
> > handles.

Yes, done. Thank you.


> On May 20, 2014, 5:12 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemConsumer.scala,
> >  line 111
> > <https://reviews.apache.org/r/21581/diff/1/?file=583675#file583675line111>
> >
> >     It looks like you're putting the current value of i into the offset 
> > field, which is the offset of the last byte read. The offset should 
> > represent the point in the file that we need to seek to begin reading this 
> > message (i.e. the offset of the first byte read for this message). I think 
> > we need a second variable that tracks the beginning offset for the current 
> > message (i + 1 when cha == '\n'). We should use this variable when creating 
> > the IncomingMessageEnvelope, and then reset it when we reset the line 
> > variable.
> >     
> >     This change also requires a re-implementation of getOffsetsAfter. We'll 
> > need to seek to the supplied offset, then read through to the next newline, 
> > then return the newline's offset + 1. In the case where there is no next 
> > newline, we'll have to throw an exception. This case should never happen in 
> > getOffsetsAfter's current usage, since we always call getOffsetsAfter on 
> > messages that we've fully consumed (i.e. there was a newline AFTER the 
> > message).

Corrected now.
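
To make the offset semantics described in the comment concrete, here is a 
minimal sketch. It is an illustration under assumptions, not the patch 
itself: LineOffsets, readWithOffsets and offsetAfter are hypothetical names, 
offsets are plain Longs for a single file, and the content is assumed to be 
ASCII with '\n' as the message delimiter.

```scala
import java.io.RandomAccessFile

object LineOffsets {
  // Returns each newline-terminated message paired with the offset of its
  // first byte. A trailing line without a newline is not emitted.
  def readWithOffsets(path: String): List[(Long, String)] = {
    val file = new RandomAccessFile(path, "r")
    try {
      val out = List.newBuilder[(Long, String)]
      val line = new StringBuilder
      var start = 0L // offset of the first byte of the current message
      var b = file.read()
      while (b != -1) {
        if (b == '\n') {
          out += ((start, line.toString))
          line.clear()
          start = file.getFilePointer // next message begins after the newline
        } else line.append(b.toChar)
        b = file.read()
      }
      out.result()
    } finally file.close()
  }

  // Seeks to `offset`, reads through the next newline, and returns the
  // newline's offset + 1. Throws if no newline follows.
  def offsetAfter(path: String, offset: Long): Long = {
    val file = new RandomAccessFile(path, "r")
    try {
      file.seek(offset)
      var b = file.read()
      while (b != -1 && b != '\n') b = file.read()
      if (b == -1) throw new IllegalStateException("no newline after offset " + offset)
      file.getFilePointer
    } finally file.close()
  }
}
```

With this scheme, the offset stored in an envelope is the position to seek 
to in order to re-read that message, and offsetAfter on a fully consumed 
message gives the start of the next one.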


> On May 20, 2014, 5:12 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemFactory.scala,
> >  line 32
> > <https://reviews.apache.org/r/21581/diff/1/?file=583676#file583676line32>
> >
> >     If you move the const variables in FileSystemReader (above) into the 
> > constructor, you can use the config object to get config variables from the 
> > config.
> >     
> >     For names, my vote would be for:
> >     
> >     systems.<system name>.queue.size
> >     systems.<system name>.polling.sleep.ms

The getConsumer method in FileReaderSystemFactory now reads the config 
variables.
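
As a rough sketch of the lookup, using the key names proposed in the comment 
above: a plain Map stands in for Samza's Config object here, and 
FileReaderSettings and its default values are hypothetical placeholders 
rather than what the patch uses.

```scala
// Hypothetical settings holder; a plain Map stands in for Samza's Config.
case class FileReaderSettings(queueSize: Int, pollingSleepMs: Int)

object FileReaderSettings {
  val DefaultQueueSize = 10000    // placeholder default, not taken from the patch
  val DefaultPollingSleepMs = 500 // placeholder default, not taken from the patch

  // Reads systems.<system name>.queue.size and
  // systems.<system name>.polling.sleep.ms, falling back to the defaults.
  def fromConfig(systemName: String, config: Map[String, String]): FileReaderSettings = {
    val prefix = "systems." + systemName + "."
    FileReaderSettings(
      config.get(prefix + "queue.size").map(_.toInt).getOrElse(DefaultQueueSize),
      config.get(prefix + "polling.sleep.ms").map(_.toInt).getOrElse(DefaultPollingSleepMs))
  }
}
```

The resulting values are then passed to the consumer's constructor, so the 
consumer itself does not need to know about the config object.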


- Yan


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/21581/#review43489
-----------------------------------------------------------


On May 22, 2014, 1:56 a.m., Yan Fang wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/21581/
> -----------------------------------------------------------
> 
> (Updated May 22, 2014, 1:56 a.m.)
> 
> 
> Review request for samza.
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> 1. moved from old request SAMZA-138
> 2. new patch based on Samza core repository
> 3. converted to Scala
> 4. added javadoc for explaining
> 5. FileReaderSystemAdmin implements SystemAdmin and provides metadata for a 
> file, including oldest offset, newest offset and upcoming offset.
> 6. FileReaderSystemConsumer implements BlockingEnvelopeMap. It creates one 
> thread for each file-read operation. After reading the existing messages, 
> the thread keeps checking for new messages appended to the file until the 
> job is stopped.
> 7. FileReaderSystemFactory implements SystemFactory.
> 8. related unit tests 
> 
> 
> Diffs
> -----
> 
>   samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala PRE-CREATION 
>   samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemConsumer.scala PRE-CREATION 
>   samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemFactory.scala PRE-CREATION 
>   samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemAdmin.scala PRE-CREATION 
>   samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemConsumer.scala PRE-CREATION 
>   samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemFactory.scala PRE-CREATION 
> 
> Diff: https://reviews.apache.org/r/21581/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Yan Fang
> 
>
