> On April 4, 2014, 8:30 p.m., Martin Kleppmann wrote:
> > samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala, 
> > line 97
> > <https://reviews.apache.org/r/20022/diff/2/?file=548469#file548469line97>
> >
> >     I would find the documentation more helpful if it didn't describe what 
> > the code does to this variable (I can find that out by reading the code 
> > itself), but rather *why* it exists and what problem it solves.

I tried to describe more of the "why" in the latest patch.


> On April 4, 2014, 8:30 p.m., Martin Kleppmann wrote:
> > samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala, 
> > line 166
> > <https://reviews.apache.org/r/20022/diff/2/?file=548469#file548469line166>
> >
> >     Not clear whether you mean that maxMsgsPerStreamPartition defaults to 
> > 1000, or refreshThreshold defaults to 1000.

Updated to be explicit.


> On April 4, 2014, 8:30 p.m., Martin Kleppmann wrote:
> > samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala, 
> > line 174
> > <https://reviews.apache.org/r/20022/diff/2/?file=548469#file548469line174>
> >
> >     I think this is a slight change of semantics. The previous computation 
> > returned the number of messages that had not yet been given to the chooser, 
> > whereas totalUnprocessedMessages is only decremented after the chooser has 
> > chosen the message. Don't think that's a problem, just wanted to point it 
> > out in case it wasn't deliberate.

Yeah, it was deliberate. I don't think it's a big deal, though.
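To make the semantic difference concrete, here is a minimal Scala sketch. The names (`UnprocessedCounter`, `fetched`, `offerToChooser`, `chosen`) are hypothetical and this is not the code in the patch. The old count corresponds to messages not yet handed to the chooser. The new running total only drops once the chooser has picked a message, so the two differ by the number of messages the chooser is currently holding.

```scala
import scala.collection.mutable

// Hypothetical sketch contrasting two ways of counting "unprocessed" messages.
// None of these names come from the Samza patch.
class UnprocessedCounter {
  // Messages fetched from consumers but not yet handed to the chooser.
  private val buffered = mutable.Queue[String]()
  // Messages currently sitting inside the chooser, waiting to be chosen.
  private var inChooser = 0
  // Running total: incremented on fetch, decremented only when the chooser picks a message.
  private var totalUnprocessed = 0

  def fetched(msg: String): Unit = {
    buffered.enqueue(msg)
    totalUnprocessed += 1
  }

  // Move one buffered message into the chooser (the total is unchanged).
  def offerToChooser(): Unit =
    if (buffered.nonEmpty) {
      buffered.dequeue()
      inChooser += 1
    }

  // The chooser picked a message; only now does the running total drop.
  def chosen(): Unit = {
    require(inChooser > 0, "chooser is empty")
    inChooser -= 1
    totalUnprocessed -= 1
  }

  // Previous semantics: messages not yet given to the chooser.
  def notYetGivenToChooser: Int = buffered.size

  // New semantics: also counts messages the chooser holds but has not yet chosen.
  def totalUnprocessedMessages: Int = totalUnprocessed
}
```

After fetching two messages and offering one to the chooser, the old-style count is 1 while the running total is still 2; the two only converge again once the chooser has emptied.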


> On April 4, 2014, 8:30 p.m., Martin Kleppmann wrote:
> > samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala, 
> > line 300
> > <https://reviews.apache.org/r/20022/diff/2/?file=548469#file548469line300>
> >
> >     I have difficulty convincing myself that this logic is correct -- is it 
> > always the case that the partition is neededByChooser in this case? It's 
> > probably correct, but it's very subtle logic that's easy to get wrong. 
> > Would it be possible to express this in a way that's easier to reason about?
> >     
> >     For example, mapping each systemStreamPartition to one of enum { 
> > IN_CHOOSER, NEEDED_BY_CHOOSER, SKIPPING_CHOOSER } would make clear that 
> > each partition is in exactly one of those three states at any one time.

I moved this logic into MessageChooserBuffer. Hopefully that makes it a little 
clearer.
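As a rough illustration of the three-state mapping suggested above, here is a minimal Scala sketch. It assumes partitions can be modelled as plain strings, and the names `ChooserState`, `PartitionStates`, `offered`, `chosen`, `foundEmpty`, and `refresh` are hypothetical; they are not the API of `MessageChooserBuffer` or `SystemConsumers`. Every partition sits in exactly one state, and each transition checks the state it is leaving, so an unexpected state fails loudly instead of silently.

```scala
// Hypothetical sketch: each partition is in exactly one chooser state.
// Names are illustrative only, not Samza's API.
sealed trait ChooserState
case object InChooser extends ChooserState       // a message from this partition is in the chooser
case object NeededByChooser extends ChooserState // the chooser needs a message from this partition
case object SkippingChooser extends ChooserState // partition looked empty; skip it until a refresh

// Partitions are plain strings here to keep the sketch self-contained.
class PartitionStates(partitions: Set[String]) {
  private var states: Map[String, ChooserState] =
    partitions.map(p => p -> (NeededByChooser: ChooserState)).toMap

  def stateOf(p: String): ChooserState = states(p)

  // A buffered message for p was handed to the chooser.
  def offered(p: String): Unit = transition(p, NeededByChooser, InChooser)

  // The chooser returned p's message, so it now needs another one from p.
  def chosen(p: String): Unit = transition(p, InChooser, NeededByChooser)

  // Polling found nothing for p, so stop waiting on it (the "black list" idea).
  def foundEmpty(p: String): Unit = transition(p, NeededByChooser, SkippingChooser)

  // A refresh makes every skipped partition eligible again.
  def refresh(): Unit =
    states = states.transform((_, s) => if (s == SkippingChooser) NeededByChooser else s)

  def neededByChooser: Set[String] = states.filter(_._2 == NeededByChooser).keySet

  // Reject any transition whose starting state is not the expected one.
  private def transition(p: String, from: ChooserState, to: ChooserState): Unit = {
    val current = states(p)
    require(current == from, s"$p: expected $from but was $current")
    states = states.updated(p, to)
  }
}
```

With this shape, a question like "is the partition always neededByChooser at this point?" becomes a `require` that fails fast in tests, rather than an invariant the reader has to verify by tracing the code.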


- Chris


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/20022/#review39588
-----------------------------------------------------------


On April 17, 2014, 5:30 p.m., Chris Riccomini wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/20022/
> -----------------------------------------------------------
> 
> (Updated April 17, 2014, 5:30 p.m.)
> 
> 
> Review request for samza.
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> move coordinator to buffer, and add javadocs
> 
> 
> move unprocessed messages into coordinator
> 
> 
> adding a message chooser coordinator to try and extract some of the confusing 
> logic from system consumers into a separate class
> 
> 
> don't hard code the refresh threshold
> 
> 
> bump default max msgs per stream partition up to 10k
> 
> 
> add an unprocessed message counter
> 
> 
> black list empty system stream partitions
> 
> 
> Diffs
> -----
> 
>   
> samza-core/src/main/scala/org/apache/samza/system/MessageChooserBuffer.scala 
> PRE-CREATION 
>   samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala 
> bbbacb59866c2374853052f7cc11826552f5fb01 
> 
> Diff: https://reviews.apache.org/r/20022/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Chris Riccomini
> 
>
