-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/21014/#review42168
-----------------------------------------------------------



samza-api/src/main/java/org/apache/samza/task/TaskCoordinator.java
<https://reviews.apache.org/r/21014/#comment75914>

    What do you think about supporting the same style of commit that shutdown 
has (i.e. commit all/commit partition)? Since you rolled in some of the 
SAMZA-23 work, it seems like we should discuss this part of the commit behavior.
    
    If we went this route, we might be able to generalize the ShutdownMethod 
enum a bit to work for both commit and shutdown.
    
    I'd have to see how it would look in practice, but commit has a very 
similar usage pattern: sometimes you want to commit all tasks in the container 
(e.g. before a long pause), but usually you just want to commit your partition.
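    
    A rough sketch of what a shared enum could look like, just to make the 
idea concrete. The names Scope, ScopedCoordinator and PendingRequests are made 
up for illustration; they are not in the patch or in the existing API.

```java
// Hypothetical sketch: Scope, ScopedCoordinator and PendingRequests are
// illustrative names, not part of the Samza API or of this patch.
enum Scope {
    CURRENT_PARTITION, // only the partition/task that made the request
    ALL_PARTITIONS     // every task in the container
}

// One enum serves both calls, instead of a shutdown-only ShutdownMethod.
interface ScopedCoordinator {
    void commit(Scope scope);
    void shutdown(Scope scope);
}

// Records the requests made during one process/window call so the
// container can inspect them afterwards. A null field means "not requested".
class PendingRequests implements ScopedCoordinator {
    private Scope commitScope;
    private Scope shutdownScope;

    @Override
    public void commit(Scope scope) {
        commitScope = scope;
    }

    @Override
    public void shutdown(Scope scope) {
        shutdownScope = scope;
    }

    Scope commitScope() {
        return commitScope;
    }

    Scope shutdownScope() {
        return shutdownScope;
    }
}
```

    With something like this, a task would call commit(Scope.CURRENT_PARTITION) 
in the common case and commit(Scope.ALL_PARTITIONS) before a long pause.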



samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
<https://reviews.apache.org/r/21014/#comment75913>

    I like the idea of moving this stuff out of TaskInstance, but I'd like to 
keep as much logic out of SamzaContainer as possible. In 0.6.0, we had a ton of 
logic in the TaskRunner (equivalent to SamzaContainer), and it turned into a 
total mess.
    
    Can we move all of the code you've added to SamzaContainer into a separate 
class (CoordinatorCoordinator? :P kidding...), and just have the container use 
that class in the appropriate spots? The new class could:
    
    1. handle ReadableCoordinator creation before each process/window/commit 
call.
    2. implement window/commit logic
    3. contain checkCoordinator
    4. keep all state for windowing/committing (these variables).
    
    The goal would be to remove all mentions of coordinator from the 
SamzaContainer, and put them all in this new class.
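    
    As a very loose illustration of the shape such a class might take (written 
in Java here for brevity; CallCoordinator and CoordinatorRunner are hypothetical 
names, and CallCoordinator is only a stand-in, not the real ReadableCoordinator):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for the per-call coordinator handed to a task.
class CallCoordinator {
    private boolean commitRequested;
    private boolean shutdownRequested;

    void commit() { commitRequested = true; }
    void shutdown() { shutdownRequested = true; }

    boolean isCommitRequested() { return commitRequested; }
    boolean isShutdownRequested() { return shutdownRequested; }
}

// Hypothetical helper that owns all coordinator handling, so the container
// itself never has to mention a coordinator.
class CoordinatorRunner {
    private final List<String> committedTasks = new ArrayList<>();
    private boolean shutdownRequested;

    // Creates a fresh coordinator, runs the task callback with it, then
    // acts on whatever the task requested.
    void run(String taskName, Consumer<CallCoordinator> taskCall) {
        CallCoordinator coordinator = new CallCoordinator();
        taskCall.accept(coordinator);
        checkCoordinator(taskName, coordinator);
    }

    // A real version would trigger the commit here; this sketch only
    // records which tasks asked for one.
    private void checkCoordinator(String taskName, CallCoordinator coordinator) {
        if (coordinator.isCommitRequested()) {
            committedTasks.add(taskName);
        }
        if (coordinator.isShutdownRequested()) {
            shutdownRequested = true;
        }
    }

    List<String> committedTasks() { return committedTasks; }
    boolean isShutdownRequested() { return shutdownRequested; }
}
```

    The container would then only call run(...) in its loop and poll 
isShutdownRequested(), with the windowing/committing state living next to 
checkCoordinator.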



samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
<https://reviews.apache.org/r/21014/#comment75908>

    Moving lastCommitMs out of TaskInstance is a very subtle change in behavior 
that's not what we want, I think.
    
    Before, lastCommitMs was per-TaskInstance. Calling coordinator.commit from a 
StreamTask would result in the lastCommitMs clock getting reset. This is no 
longer the case: the clock is now reset only when lastCommitMs expires, 
even if a StreamTask calls coordinator.commit on every process call.
    
    If we want to maintain the same behavior as before, we still need 
per-TaskInstance commit clocks (even if we take them out of the TaskInstance 
class).
    
    This problem doesn't exist with window() since there's no way to trigger a 
window except through lastWindowMs timeout. No coordinator.window method exists.
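    
    A minimal sketch of per-task commit clocks kept outside TaskInstance, 
assuming tasks are identified by a string name and the caller supplies the 
current time. PerTaskCommitClock and its methods are hypothetical names, not 
existing code.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: one commit clock per task, held outside TaskInstance.
class PerTaskCommitClock {
    private final long commitIntervalMs;
    private final Map<String, Long> lastCommitMs = new HashMap<>();

    PerTaskCommitClock(long commitIntervalMs) {
        this.commitIntervalMs = commitIntervalMs;
    }

    // Starts the clock for a task.
    void register(String taskName, long nowMs) {
        lastCommitMs.put(taskName, nowMs);
    }

    // Called after any commit for the task, whether it was triggered by the
    // timer or by an explicit coordinator.commit. Resets only that task's clock.
    void recordCommit(String taskName, long nowMs) {
        lastCommitMs.put(taskName, nowMs);
    }

    // True when the task's own interval has elapsed since its last commit.
    boolean isCommitDue(String taskName, long nowMs) {
        Long last = lastCommitMs.get(taskName);
        return last != null && nowMs - last >= commitIntervalMs;
    }
}
```

    An explicit commit from one task pushes back only that task's next timed 
commit; the other tasks still commit on their original schedule.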



samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
<https://reviews.apache.org/r/21014/#comment75897>

    Can still keep this block around since the task might not be windowable.



samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
<https://reviews.apache.org/r/21014/#comment75894>

    Can remove this from TaskInstanceMetrics.


- Chris Riccomini


On May 2, 2014, 5:48 p.m., Martin Kleppmann wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/21014/
> -----------------------------------------------------------
> 
> (Updated May 2, 2014, 5:48 p.m.)
> 
> 
> Review request for samza.
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> SAMZA-253: Consensus shutdown API
> 
> 
> Diffs
> -----
> 
>   samza-api/src/main/java/org/apache/samza/task/TaskCoordinator.java 
> 192b226c9e6105cf666d484ac33bb0f854de7688 
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
> 24364f4ad967eec9474225604b9cc4f830cc3b2e 
>   samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala 
> c4b135c0f46edaa7fc0e4c0bf909e1ffa9515242 
>   samza-core/src/main/scala/org/apache/samza/task/ReadableCoordinator.scala 
> aaf631ec7acd710ab8a5b288f696233223569b60 
>   samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala 
> 27b4ca5d7995536aa66f63b7329caddf41865bb4 
>   samza-core/src/test/scala/org/apache/samza/task/TestReadableCoordinator.scala 
> c45ed9bb1b3916cd4c4043e36272b4e3508bfb87 
> 
> Diff: https://reviews.apache.org/r/21014/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Martin Kleppmann
> 
>
