-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/19384/#review38411
-----------------------------------------------------------



samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
<https://reviews.apache.org/r/19384/#comment70559>

    This isn't required if we just take a StreamMetadataCache object (see 
comments below).



samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
<https://reviews.apache.org/r/19384/#comment70558>

    I think I prefer having a StreamMetadataCache object get passed in via the 
constructor, rather than using statics. It makes things more mockable. Can we 
make StreamMetadataCache work like a normal object, and just have 
SamzaContainer create one, and pass it into all the TaskInstances?
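
    Roughly what I have in mind, as a minimal sketch. MetadataCache, 
TaskInstanceSketch, and the String/Long signatures are made-up stand-ins, not 
the real Samza classes:

```scala
// Hypothetical stand-ins; not the real Samza classes or signatures.
trait MetadataCache {
  def newestOffset(stream: String): Option[Long]
}

// The cache is a plain constructor argument, so nothing is shared via statics.
class TaskInstanceSketch(val partition: Int, cache: MetadataCache) {
  def isCaughtUp(stream: String, currentOffset: Long): Boolean =
    cache.newestOffset(stream).exists(currentOffset >= _)
}

object InjectionExample {
  // In a test, a stub cache can be passed in; there is no global state to reset.
  val stubCache: MetadataCache = new MetadataCache {
    def newestOffset(stream: String): Option[Long] = Some(42L)
  }

  // The container would create one cache and hand it to every TaskInstance.
  val tasks: Seq[TaskInstanceSketch] =
    (0 until 3).map(p => new TaskInstanceSketch(p, stubCache))
}
```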



samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
<https://reviews.apache.org/r/19384/#comment70556>

    For process, window, and commit, can we just take a PartitionCoordinator 
here, and have SamzaContainer call TaskCoordinators.coordinatorForPartition to 
pass in the appropriate PartitionCoordinator?



samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
<https://reviews.apache.org/r/19384/#comment70566>

    If we take my suggestion above about converting from TaskCoordinators to 
PartitionCoordinator, we lose access to isShutdownAgreed. We could have 
SamzaContainer pass a shutdown: Boolean parameter instead.
    
    The goal I'm trying to achieve is to keep variables in TaskInstance at the 
per-TaskInstance level. In the past, introducing shared variables between 
TaskInstances (like the ones we have in the constructor) has led to code that 
is really hard to reason about. When we did the 0.7.0 refactor, the 
TaskInstance was set up to be much more careful about this.
    
    Also, switching isCommitRequested to being per-TaskInstance instead of 
global (in TaskCoordinators) is actually a good change I think. If a 
StreamTask.process method says coordinator.commit right now, it's not obvious 
that EVERY partition will commit. If the StreamTask.process method has 
coordinator.commit in it, and you have 10 TaskInstances in the container, and 
you process 10 messages, you commit 100 times, not 10 times (10 TaskInstance 
commits for each message). This has led to performance issues in the past for 
containers that call commit frequently.
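
    To spell out the arithmetic, here is a toy model of the two behaviours. 
CommitCount and its methods are hypothetical names for illustration only, and 
it assumes every processed message requests a commit:

```scala
// Hypothetical model of the commit fan-out; names are illustrative only.
object CommitCount {
  // Global flag: one coordinator.commit makes every TaskInstance commit.
  def globalCommits(taskInstances: Int, messages: Int): Int =
    messages * taskInstances

  // Per-TaskInstance flag: only the instance that asked for the commit commits.
  def perInstanceCommits(messages: Int): Int =
    messages
}
```

    With 10 TaskInstances and 10 messages, globalCommits gives 100 and 
perInstanceCommits gives 10.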



samza-core/src/main/scala/org/apache/samza/system/StreamMetadataCache.scala
<https://reviews.apache.org/r/19384/#comment70562>

    I think it might be better to make this just a normal class, and create an 
instance in SamzaContainer that gets passed everywhere. It will make everything 
more mockable, plus the TTL, clock, and SystemAdmins can be taken as parameters.
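
    Something along these lines, as a sketch only. MetadataCacheSketch is a 
made-up name, the metadata is modelled as a String, and the fetch function 
stands in for the SystemAdmins lookup:

```scala
// Hypothetical sketch; the real StreamMetadataCache may look quite different.
// Metadata is modelled as a String; fetch stands in for the SystemAdmins lookup.
class MetadataCacheSketch(
    fetch: String => String,
    ttlMs: Long,
    clock: () => Long = () => System.currentTimeMillis()) {

  private case class CacheEntry(value: String, fetchedAt: Long)
  private var entries = Map.empty[String, CacheEntry]

  def get(stream: String): String = synchronized {
    val now = clock()
    entries.get(stream) match {
      // Entry is still fresh: serve it from the cache.
      case Some(entry) if now - entry.fetchedAt < ttlMs => entry.value
      // Missing or expired: fetch again and remember when we did.
      case _ =>
        val value = fetch(stream)
        entries += stream -> CacheEntry(value, now)
        value
    }
  }
}
```

    Because the clock is a parameter, a test can advance time by hand instead 
of sleeping until the TTL expires.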



samza-core/src/main/scala/org/apache/samza/system/StreamMetadataCache.scala
<https://reviews.apache.org/r/19384/#comment70563>

    Do we need to lock here? I get needing to lock on write so we don't 
accidentally lose a CacheEntry, but on read, it seems like locking might not be 
required since we're using an immutable map.
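
    A sketch of what I mean, assuming the map reference is held in a @volatile 
var (I'm not claiming that is how the patch stores it). EntryStore is a 
hypothetical name:

```scala
// Hypothetical sketch of lock-free reads over an immutable map.
class EntryStore[K, V] {
  // @volatile makes the latest map reference visible to readers on other threads.
  @volatile private var entries = Map.empty[K, V]

  // Readers get a consistent immutable snapshot, so no lock is taken.
  def get(key: K): Option[V] = entries.get(key)

  // Writers lock so two concurrent read-modify-write updates cannot drop an entry.
  def put(key: K, value: V): Unit = synchronized {
    entries = entries + (key -> value)
  }
}
```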



samza-core/src/main/scala/org/apache/samza/task/TaskCoordinators.scala
<https://reviews.apache.org/r/19384/#comment70560>

    TaskInstanceCoordinator might be a better name than PartitionCoordinator. 
SAMZA-71 is going to break the tie between a single partition and a 
TaskInstance, so PartitionCoordinator will be a misnomer in the future.



samza-core/src/main/scala/org/apache/samza/util/Util.scala
<https://reviews.apache.org/r/19384/#comment70569>

    Can you do one dot per line here?
    
    metadata
      .getSystemStreamPartitionMetadata
      .keys
      .map



samza-core/src/main/scala/org/apache/samza/util/Util.scala
<https://reviews.apache.org/r/19384/#comment70570>

    I think you can just do new SystemStreamPartition(systemStream, _)
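
    For reference, the placeholder form is equivalent to the explicit lambda. 
The types below are simplified stand-ins that I made up for the sketch, not the 
real Samza classes:

```scala
// Hypothetical stand-ins for the Samza types, just to show the placeholder syntax.
case class SystemStream(system: String, stream: String)
case class Partition(id: Int)
class SystemStreamPartition(val systemStream: SystemStream, val partition: Partition)

object PlaceholderExample {
  val systemStream = SystemStream("kafka", "page-views")
  val partitions = List(Partition(0), Partition(1))

  // Explicit lambda.
  val explicit = partitions.map(p => new SystemStreamPartition(systemStream, p))

  // Same mapping, with the underscore standing in for the lambda argument.
  val short = partitions.map(new SystemStreamPartition(systemStream, _))
}
```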



samza-test/src/test/scala/org/apache/samza/test/performance/TestSamzaContainerPerformance.scala
<https://reviews.apache.org/r/19384/#comment70568>

    Do we need this change since we kept the shutdown method now?


- Chris Riccomini


On March 22, 2014, 8:18 p.m., Martin Kleppmann wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/19384/
> -----------------------------------------------------------
> 
> (Updated March 22, 2014, 8:18 p.m.)
> 
> 
> Review request for samza.
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> SAMZA-179 Add caching of system metadata lookups
> 
> 
> SAMZA-179 Rename ReadableCoordinator to TaskCoordinators; improve shutdown 
> method signature
> 
> 
> SAMZA-179 Allow a task to detect when it has caught up, and shut down 
> gracefully.
> 
> 
> Diffs
> -----
> 
>   build.gradle fc596267e38c53cfc44f1cf52f8d6acedc848da8
>   gradle/dependency-versions.gradle 612670dded2c2d290f1bc36ece1f3d53ffa4e971
>   samza-api/src/main/java/org/apache/samza/task/TaskContext.java 611507ed340d9a3fb7b8cd5e0e0ce37b5561da32
>   samza-api/src/main/java/org/apache/samza/task/TaskCoordinator.java 192b226c9e6105cf666d484ac33bb0f854de7688
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala c101b59f3e476dcc2e3b7870d53d0d36002f2434
>   samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala c4b135c0f46edaa7fc0e4c0bf909e1ffa9515242
>   samza-core/src/main/scala/org/apache/samza/system/StreamMetadataCache.scala PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/task/ReadableCoordinator.scala aaf631ec7acd710ab8a5b288f696233223569b60
>   samza-core/src/main/scala/org/apache/samza/task/TaskCoordinators.scala PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/util/Util.scala 5b429dfece38d877175fa0495db6450e01d82689
>   samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala a2d5820e9eeb7590d208cf7fb7025c589f451bca
>   samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala 27b4ca5d7995536aa66f63b7329caddf41865bb4
>   samza-core/src/test/scala/org/apache/samza/system/TestStreamMetadataCache.scala PRE-CREATION
>   samza-core/src/test/scala/org/apache/samza/task/TestReadableCoordinator.scala c45ed9bb1b3916cd4c4043e36272b4e3508bfb87
>   samza-core/src/test/scala/org/apache/samza/task/TestTaskCoordinators.scala PRE-CREATION
>   samza-test/src/test/scala/org/apache/samza/test/performance/TestSamzaContainerPerformance.scala 3dc263011b955cfccac83e71384b865f8fc2b722
> 
> Diff: https://reviews.apache.org/r/19384/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Martin Kleppmann
> 
>
