> On April 4, 2014, 8:30 p.m., Martin Kleppmann wrote:
> > samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala, 
> > line 168
> > <https://reviews.apache.org/r/20022/diff/2/?file=548469#file548469line168>
> >
> >     At first I didn't understand how this was different from 
> > fetchThresholdPct (had to read the code to understand that 
> > fetchThresholdPct was per topic-partition and refreshThreshold was for 
> > total unprocessed messages). I wonder if this could be made clearer 
> > somehow, or even folded into a single parameter.

I think we might be able to completely eliminate fetchThresholdPct and 
depletedQueueSizeThreshold.

The refresh.maybeCall method is only triggered if 
buffer.totalUnprocessedMessages <= refreshThreshold. The default 
refreshThreshold is 1000, which means that a refresh is triggered as soon as 
there are <= 1000 total unprocessed messages. The default fetchThresholdPct is 
0, which means depletedQueueSizeThreshold is set to 10000, which in turn means 
that a fetch for any given SSP is ONLY triggered when that SSP's queue is 
completely empty.
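
As a rough illustration of how the two settings interact today, here is a minimal sketch. It is not the actual SystemConsumers code: the object name, the method names, and the formula used to derive depletedQueueSizeThreshold are assumptions chosen to reproduce the numbers above (the 10000 capacity comes from the "bump default max msgs per stream partition up to 10k" change).

```scala
// Hypothetical model of the current two-parameter behaviour. The names mirror
// the discussion, but the formulas are assumptions, not the real implementation.
object CurrentFetchSketch {
  val maxMsgsPerStreamPartition = 10000 // default per the review description
  val refreshThreshold = 1000           // default quoted above
  val fetchThresholdPct = 0.0           // default quoted above

  // Assumed derivation: a pct of 0 yields a threshold equal to full capacity.
  val depletedQueueSizeThreshold: Int =
    (maxMsgsPerStreamPartition * (1 - fetchThresholdPct)).toInt

  // A refresh is attempted only when the total backlog is small enough.
  def shouldRefresh(totalUnprocessedMessages: Int): Boolean =
    totalUnprocessedMessages <= refreshThreshold

  // An SSP is fetched only when its free space reaches the depleted threshold,
  // which with the defaults means its queue is completely empty.
  def shouldFetch(queueSize: Int): Boolean =
    maxMsgsPerStreamPartition - queueSize >= depletedQueueSizeThreshold
}
```

With these defaults, shouldFetch(0) holds but shouldFetch(1) does not, which is the "only when completely empty" behaviour described above.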

If we eliminate the fetch threshold and depleted queue size, and just do 
fetches when the queue size for an SSP is < refreshThreshold, I think we should 
be just as well off. In this scenario, a refresh is triggered when the total 
unprocessed messages are <= 1000 (by default). In turn, each individual SSP is 
added to the fetch request when its queue size is < 1000. Thus, in the 
worst-case scenario of one SSP in the container, that SSP will still be fetched 
every time maybeCall is invoked, and maybeCall will only be invoked when the 
queue is depleted to <= 10% of capacity.
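
A minimal sketch of the single-threshold idea, under stated assumptions: the object and method names are hypothetical, SSPs are represented as plain strings rather than SystemStreamPartition objects, and the per-SSP queue sizes are passed in as a map purely for illustration.

```scala
// Hypothetical sketch of the proposal: one threshold drives both the refresh
// decision and the per-SSP fetch decision. Not the actual Samza code.
object ProposedFetchSketch {
  val refreshThreshold = 1000 // default quoted above

  // Same refresh condition as before: total backlog at or below the threshold.
  def shouldRefresh(totalUnprocessedMessages: Int): Boolean =
    totalUnprocessedMessages <= refreshThreshold

  // Every SSP whose own queue is below the threshold joins the fetch request.
  def sspsToFetch(queueSizes: Map[String, Int]): Set[String] =
    queueSizes.collect { case (ssp, size) if size < refreshThreshold => ssp }.toSet
}
```

For example, with a single SSP holding 500 messages, shouldRefresh(500) is true and that SSP is included in the fetch, so it no longer has to drain to zero first.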


- Chris


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/20022/#review39588
-----------------------------------------------------------


On April 17, 2014, 5:30 p.m., Chris Riccomini wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/20022/
> -----------------------------------------------------------
> 
> (Updated April 17, 2014, 5:30 p.m.)
> 
> 
> Review request for samza.
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> move coordinator to buffer, and add javadocs
> 
> 
> move unprocessed messages into coordinator
> 
> 
> adding a message chooser coordinator to try and extract some of the confusing 
> logic from system consumers into a separate class
> 
> 
> don't hard code the refresh threshold
> 
> 
> bump default max msgs per stream partition up to 10k
> 
> 
> add an unprocessed message counter
> 
> 
> black list empty system stream partitions
> 
> 
> Diffs
> -----
> 
>   samza-core/src/main/scala/org/apache/samza/system/MessageChooserBuffer.scala PRE-CREATION 
>   samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala bbbacb59866c2374853052f7cc11826552f5fb01 
> 
> Diff: https://reviews.apache.org/r/20022/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Chris Riccomini
> 
>
