-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/23588/#review48438
-----------------------------------------------------------


General approach looks good to me; various small issues below.

An overall question: with the max fetch sizes removed, is there a risk of the 
container potentially running out of memory if the consumer fetches messages 
faster than the StreamTasks can process them?


samza-api/src/main/java/org/apache/samza/system/SystemConsumer.java
<https://reviews.apache.org/r/23588/#comment85034>

    I don't quite understand the use of Queue in the return type. What happens 
if you call a blocking operation on one of those queues? I thought all the 
blocking was supposed to be done in this poll() method, but the queues introduce 
another place where blocking could happen.
    
    If the intention is that you only ever call the non-blocking methods of the 
queue, why not just use List?
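
    A minimal sketch of the List-based alternative, assuming the consumer 
buffers envelopes in per-partition BlockingQueues. The class name, the String 
keys and the String payloads are hypothetical stand-ins, not taken from the 
patch:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;

class DrainToListSketch {
    // Hypothetical helper: copies whatever is currently buffered into plain
    // Lists, so the caller never holds a reference to a blocking structure.
    static Map<String, List<String>> drainAll(Map<String, BlockingQueue<String>> buffers) {
        Map<String, List<String>> result = new HashMap<>();
        for (Map.Entry<String, BlockingQueue<String>> entry : buffers.entrySet()) {
            List<String> drained = new ArrayList<>();
            // drainTo() removes all available elements and never waits.
            entry.getValue().drainTo(drained);
            if (!drained.isEmpty()) {
                result.put(entry.getKey(), drained);
            }
        }
        return result;
    }
}
```

    With this shape, any waiting would have to happen before drainAll() is 
called, i.e. inside poll() itself.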



samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java
<https://reviews.apache.org/r/23588/#comment85033>

    Prefer outgoingQueue.add(envelope) over offer(), as you're not checking the 
return value? (Not really a problem as you're not using a bounded-capacity 
queue, just seems like good hygiene)
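
    To illustrate the difference with a deliberately bounded queue (the 
capacity of 1 and the class name are assumptions chosen for the example; the 
queue in the patch is unbounded, so neither call would fail there):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class OfferVersusAddSketch {
    // offer() on a full queue returns false; if the return value is ignored,
    // the element is dropped silently.
    static boolean offerToFullQueue() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        queue.add("first");
        return queue.offer("second");
    }

    // add() on a full queue throws IllegalStateException, so the failure
    // cannot go unnoticed.
    static boolean addToFullQueueThrows() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        queue.add("first");
        try {
            queue.add("second");
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }
}
```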



samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
<https://reviews.apache.org/r/23588/#comment85035>

    Could you please also add this to the configuration table in the docs? 
Could you explain what the trade-off is here? (e.g. a shorter value reduces 
processing latency but potentially wastes CPU cycles/network I/O)
    
    I find this statement confusing: "SamzaContainer will always refresh for 
more messages immediately if there are no outstanding incoming messages to 
process". So when *does* this interval apply?



samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
<https://reviews.apache.org/r/23588/#comment85036>

    Should this use SystemConsumers.DEFAULT_POLL_INTERVAL_MS?



samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
<https://reviews.apache.org/r/23588/#comment85037>

    I think this is correct, but it confused me at first. I wondered: "why 
would there be any unprocessed messages when the consumer is started?"
    
    Perhaps call it unprocessedMessagesBySSP?



samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
<https://reviews.apache.org/r/23588/#comment85040>

    Looks like DoublingBackOff is not used anywhere else and could be removed. 
(Also remove from the imports in this file.)



samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
<https://reviews.apache.org/r/23588/#comment85049>

    Please don't catch Throwable. SAMZA-178
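
    The general concern is that catching Throwable also swallows Error 
subclasses such as OutOfMemoryError. A minimal sketch of the narrower 
alternative (the class and method names are hypothetical, not from the patch):

```java
class CatchExceptionSketch {
    // Returns false if the task failed with an Exception; any Error
    // (e.g. OutOfMemoryError) is not caught and propagates to the caller.
    static boolean runGuarded(Runnable task) {
        try {
            task.run();
            return true;
        } catch (Exception e) {
            return false;
        }
    }
}
```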



samza-core/src/main/scala/org/apache/samza/system/SystemConsumersMetrics.scala
<https://reviews.apache.org/r/23588/#comment85041>

    I think this should remain a gauge, not become a counter. I would normally 
think of a counter (in metrics terms) as monotonically increasing, which is 
not the case here.
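
    A rough sketch of the distinction I have in mind. Both classes below are 
hypothetical illustrations, not the Samza metrics API:

```java
import java.util.concurrent.atomic.AtomicLong;

class GaugeVersusCounterSketch {
    // A counter in the usual metrics sense: it only ever goes up.
    static final class MonotonicCounter {
        private final AtomicLong count = new AtomicLong();

        long inc() {
            return count.incrementAndGet();
        }

        long get() {
            return count.get();
        }
    }

    // A gauge: a point-in-time reading that can rise or fall.
    static final class IntGauge {
        private volatile int value;

        void set(int newValue) {
            value = newValue;
        }

        int get() {
            return value;
        }
    }
}
```

    A value that can go down as well as up fits the second shape.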


- Martin Kleppmann


On July 18, 2014, 9:38 p.m., Chris Riccomini wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/23588/
> -----------------------------------------------------------
> 
> (Updated July 18, 2014, 9:38 p.m.)
> 
> 
> Review request for samza.
> 
> 
> Bugs: SAMZA-245
>     https://issues.apache.org/jira/browse/SAMZA-245
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> test one more piece of logic in poll
> 
> 
> set defaults in system consumers as variables. add a poll interval test.
> 
> 
> improve basic system consumers test
> 
> 
> add a test for refreshing in system consumers
> 
> 
> refactoring to make more scala-ish
> 
> 
> save a few operations by only getting a consumer in poll when we need one
> 
> 
> update should crank through bad deserialization messages rather than giving 
> up after one failure
> 
> 
> switch refreshThreshold to pollIntervalMs
> 
> 
> refresh consumers when system consumer is started
> 
> 
> add a refresh threshold config
> 
> 
> cleaning up metrics in system consumers
> 
> 
> add docs
> 
> 
> all tests pass. misc test fixes to make new api work. slight refactoring in 
> SystemConsumers.
> 
> 
> clean up erroneous import
> 
> 
> initial rebase without fixing file system consumer.
> 
> 
> Diffs
> -----
> 
>   samza-api/src/main/java/org/apache/samza/system/SystemConsumer.java 591f8fbfe5784766078789b00d5e7d777f051e2b
>   samza-api/src/main/java/org/apache/samza/system/SystemStreamPartitionIterator.java 9acfb108906e24cc3b9d589bce252c764e3ae2ce
>   samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java 9503739fc323f07aa156b4b18061b8a7ba838dd3
>   samza-api/src/test/java/org/apache/samza/system/TestSystemStreamPartitionIterator.java 3ecabab9230f64dab203e3e19659828cbaefeb0e
>   samza-api/src/test/java/org/apache/samza/util/TestBlockingEnvelopeMap.java cb4d14854671ea602785581f7ed243dffde4ec26
>   samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala 8b881f2533975eea960e7a96b76763ef33af2a78
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b303615ca0978a92febe227fcf88b5acecce4223
>   samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala 9eb70f298cbaef02e558c7a9dac0d0c5a914bd3f
>   samza-core/src/main/scala/org/apache/samza/system/SystemConsumersMetrics.scala b065ae6c9f1295d9036d0285e3fa8c5dd1042ec9
>   samza-core/src/main/scala/org/apache/samza/system/chooser/BufferingMessageChooser.scala c7ef6ef051dd694d7f5f80e780b02ac4152ac81b
>   samza-core/src/main/scala/org/apache/samza/system/chooser/RoundRobinChooser.scala 537412114105ee9124a32e7bda4ef95139612ec8
>   samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala 97e65eb2c4269f32decd364af07de2ddce784f97
>   samza-core/src/test/scala/org/apache/samza/system/chooser/TestBufferingMessageChooser.scala c96c53bcf5dc89147166319a29930b6961e2dd41
>   samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemConsumer.scala b2e04a74d690847e4494752b8d14deaaba5348b8
>   samza-test/src/main/scala/org/apache/samza/test/performance/TestPerformanceTask.scala 23d122e54fbefc6edfdfc1c0e3e6fa18f7141fcb
>   samza-test/src/test/scala/org/apache/samza/test/performance/TestSamzaContainerPerformance.scala 4016768d9dd0098d74e35b88bcf95d238edec427
> 
> Diff: https://reviews.apache.org/r/23588/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Chris Riccomini
> 
>
