> On April 4, 2014, 8:30 p.m., Martin Kleppmann wrote:
> > samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala, line 168
> > <https://reviews.apache.org/r/20022/diff/2/?file=548469#file548469line168>
> >
> >     At first I didn't understand how this was different from
> >     fetchThresholdPct (had to read the code to understand that
> >     fetchThresholdPct was per topic-partition and refreshThreshold was for
> >     total unprocessed messages). I wonder if this could be made clearer
> >     somehow, or even folded into a single parameter.

I think we might be able to completely eliminate fetchThresholdPct and depletedQueueSizeThreshold.

The refresh.maybeCall method is only triggered if buffer.totalUnprocessedMessages <= refreshThreshold. The default refreshThreshold is 1000, which means that a refresh is triggered as soon as there are <= 1000 total unprocessed messages. The default fetchThresholdPct is 0, which means depletedQueueSizeThreshold is set to 10000, which means that fetches for any given SSP are ONLY triggered when the SSP's queue is completely empty.

If we eliminate the fetch threshold and depleted queue size, and just do fetches when the queue size for an SSP is < refreshThreshold, I think we should be just as well off. In this scenario, a refresh is triggered when the total unprocessed messages is <= 1000 (by default). In turn, each individual SSP is added to the fetch request when its queue size is < 1000. Thus, in the worst-case scenario of one SSP in the container, that SSP will still be fetched every time maybeCall is invoked, and maybeCall will only be invoked when the queue is depleted to <= 10% of capacity.


- Chris


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/20022/#review39588
-----------------------------------------------------------


On April 17, 2014, 5:30 p.m., Chris Riccomini wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/20022/
> -----------------------------------------------------------
>
> (Updated April 17, 2014, 5:30 p.m.)
>
>
> Review request for samza.
>
>
> Repository: samza
>
>
> Description
> -------
>
> move coordinator to buffer, and add javadocs
>
>
> move unprocessed messages into coordinator
>
>
> adding a message chooser coordinator to try and extract some of the confusing
> logic from system consumers into a separate class
>
>
> don't hard code the refresh threshold
>
>
> bump default max msgs per stream partition up to 10k
>
>
> add an unprocessed message counter
>
>
> black list empty system stream partitions
>
>
> Diffs
> -----
>
>   samza-core/src/main/scala/org/apache/samza/system/MessageChooserBuffer.scala PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala bbbacb59866c2374853052f7cc11826552f5fb01
>
> Diff: https://reviews.apache.org/r/20022/diff/
>
>
> Testing
> -------
>
>
> Thanks,
>
> Chris Riccomini
>
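
For illustration only, here is a rough sketch of the single-threshold behaviour proposed in the reply at the top of this message. It is not the code in the diff: RefreshSketch, sspsToFetch, and the use of plain strings for SSPs are hypothetical names chosen for the example, and only refreshThreshold and the two comparisons (<= for the total, < per SSP) come from the discussion.

```scala
// Hypothetical sketch, not the actual SystemConsumers/MessageChooserBuffer code.
object RefreshSketch {
  // queueSizes: unprocessed message count per SSP (SSPs shown as strings here).
  // Returns the SSPs that would be added to the next fetch request, or an
  // empty set if no refresh would be triggered at all.
  def sspsToFetch(queueSizes: Map[String, Int], refreshThreshold: Int): Set[String] = {
    val totalUnprocessedMessages = queueSizes.values.sum
    if (totalUnprocessedMessages <= refreshThreshold) {
      // Refresh fires: include every SSP whose own queue is below the threshold.
      queueSizes.collect { case (ssp, size) if size < refreshThreshold => ssp }.toSet
    } else {
      // Enough work is already buffered, so no SSP is fetched.
      Set.empty[String]
    }
  }
}
```

With a threshold of 1000, a container holding queues of 300 and 0 messages would fetch for both SSPs, while one holding 900 and 600 would not refresh at all. Because the total can only be <= the threshold when each queue is too, nearly every SSP is fetched whenever a refresh fires, which is why a separate per-SSP fetch threshold looks redundant.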
