-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/21014/#review42168
-----------------------------------------------------------

samza-api/src/main/java/org/apache/samza/task/TaskCoordinator.java
<https://reviews.apache.org/r/21014/#comment75914>

What do you think about supporting the same style of commit that shutdown has (i.e. commit all/commit partition)? Since you rolled in some of the SAMZA-23 work, it seems like we should discuss this part of the commit behavior. If we went this route, we might be able to generalize the ShutdownMethod enum a bit so that it works for both commit and shutdown. I'd have to see how it would look in practice, but commit has a very similar usage pattern: sometimes you want to commit all tasks in the container (e.g. before a long pause), but usually you just want to commit your own partition.


samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
<https://reviews.apache.org/r/21014/#comment75913>

I like the idea of moving this stuff out of TaskInstance, but I'd like to keep as much logic out of SamzaContainer as possible. In 0.6.0, we had a ton of logic in the TaskRunner (equivalent to SamzaContainer), and it turned into a total mess. Can we move all of the code you've added to SamzaContainer into a separate class (CoordinatorCoordinator? :P kidding...), and just have the container use that class in the appropriate spots? The new class could:

1. handle ReadableCoordinator creation before each process/window/commit call;
2. implement the window/commit logic;
3. contain checkCoordinator;
4. keep all state for windowing/committing (these variables).

The goal would be to remove all mentions of coordinator from SamzaContainer and put them all in this new class.


samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
<https://reviews.apache.org/r/21014/#comment75908>

Moving lastCommitMs out of TaskInstance is a very subtle change in behavior that's not what we want, I think. Before, lastCommitMs was per-TaskInstance, so calling coordinator.commit from a StreamTask would reset that task's lastCommitMs clock.
This is no longer the case: the clock is now reset only when lastCommitMs expires, even if a StreamTask calls coordinator.commit on every process call. If we want to keep the previous behavior, we still need per-TaskInstance commit clocks (even if we move them out of the TaskInstance class).

This problem doesn't exist for window(), since the only way to trigger a window is through the lastWindowMs timeout; there is no coordinator.window method.


samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
<https://reviews.apache.org/r/21014/#comment75897>

We can still keep this block around, since the task might not be windowable.


samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
<https://reviews.apache.org/r/21014/#comment75894>

We can remove this from TaskInstanceMetrics.


- Chris Riccomini


On May 2, 2014, 5:48 p.m., Martin Kleppmann wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/21014/
> -----------------------------------------------------------
>
> (Updated May 2, 2014, 5:48 p.m.)
>
>
> Review request for samza.
>
>
> Repository: samza
>
>
> Description
> -------
>
> SAMZA-253: Consensus shutdown API
>
>
> Diffs
> -----
>
>   samza-api/src/main/java/org/apache/samza/task/TaskCoordinator.java
>     192b226c9e6105cf666d484ac33bb0f854de7688
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
>     24364f4ad967eec9474225604b9cc4f830cc3b2e
>   samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
>     c4b135c0f46edaa7fc0e4c0bf909e1ffa9515242
>   samza-core/src/main/scala/org/apache/samza/task/ReadableCoordinator.scala
>     aaf631ec7acd710ab8a5b288f696233223569b60
>   samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
>     27b4ca5d7995536aa66f63b7329caddf41865bb4
>   samza-core/src/test/scala/org/apache/samza/task/TestReadableCoordinator.scala
>     c45ed9bb1b3916cd4c4043e36272b4e3508bfb87
>
> Diff: https://reviews.apache.org/r/21014/diff/
>
>
> Testing
> -------
>
>
> Thanks,
>
> Martin Kleppmann
>
>
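
The Java sketch below tries to make the commit-related review comments above more concrete. It combines three suggestions:

- one enum scopes a commit request either to the current task or to every task in the container, in the spirit of the "commit all/commit partition" idea;
- a separate helper class owns the commit bookkeeping, so SamzaContainer would only have to call into it;
- that helper keeps one commit clock per task, as the lastCommitMs comment asks.

It is illustrative only and rests on these assumptions:

- `RequestScope`, `CoordinatorState`, `requestCommit` and `shouldCommit` are hypothetical names. They are not taken from the Samza API or from this patch.
- Tasks are identified by plain strings.
- The caller passes the current time in milliseconds.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical scope enum (not Samza's API): one vocabulary that could serve
// both commit and shutdown requests, generalizing the ShutdownMethod idea.
enum RequestScope {
  CURRENT_TASK,
  ALL_TASKS_IN_CONTAINER
}

// Hypothetical helper that owns all commit bookkeeping so the container only
// calls into it. It keeps a separate commit clock for every task.
class CoordinatorState {
  private final long commitIntervalMs;
  // Per-task commit clocks: task name -> time of that task's last commit.
  private final Map<String, Long> lastCommitMs = new HashMap<>();
  // Tasks with a pending explicit commit request.
  private final Set<String> commitRequested = new HashSet<>();

  CoordinatorState(long commitIntervalMs, Iterable<String> taskNames, long nowMs) {
    this.commitIntervalMs = commitIntervalMs;
    for (String task : taskNames) {
      lastCommitMs.put(task, nowMs);
    }
  }

  // Records a commit request made by `task`. With container scope, every
  // registered task is marked; otherwise only the requesting task is.
  void requestCommit(String task, RequestScope scope) {
    if (scope == RequestScope.ALL_TASKS_IN_CONTAINER) {
      commitRequested.addAll(lastCommitMs.keySet());
    } else {
      commitRequested.add(task);
    }
  }

  // Returns true if `task` should commit now, i.e. it has a pending request or
  // its own interval has elapsed. A commit resets only this task's clock.
  // Assumes `task` was registered in the constructor.
  boolean shouldCommit(String task, long nowMs) {
    boolean due = commitRequested.remove(task)
        || nowMs - lastCommitMs.get(task) >= commitIntervalMs;
    if (due) {
      lastCommitMs.put(task, nowMs);
    }
    return due;
  }
}
```

With this design, an explicit commit from one task restarts only that task's interval, which is the pre-patch behavior the lastCommitMs comment wants to keep. A container-scoped request marks every task. Window bookkeeping is left out: the only window trigger is the lastWindowMs timeout, so the clock-reset problem does not arise there.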
