-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34974/#review88308
-----------------------------------------------------------


Mostly looks good. I have a few questions:
* Have you tried moving the message "filtering" logic to the container level
instead of the task level? I'm not sure which is simpler in terms of code
change. Since the container has access to all the task instances and the
systemAdmins, it seems convenient to keep the caughtUp map in
containerContext. I could be wrong :)
* I want to test the patch locally before giving it a Ship It. Looks awesome
for a first draft!
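To make the container-level idea concrete, here is a minimal sketch. It assumes the container holds every task's starting offsets and an offset comparison shaped like the new SystemAdmin.offsetComparator, returning null when two offsets cannot be compared. CaughtUpFilter, shouldProcess, and the "task|ssp" key are hypothetical names, and SSPs are plain strings to keep it self-contained. This is not the patch's code.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.BiFunction;

/**
 * Hypothetical container-level filter: tracks, per (task, SSP) pair, whether the
 * task has caught up to its starting offset. Names and types are illustrative only.
 */
final class CaughtUpFilter {
  // Starting offset per task name, then per SSP (SSPs are plain strings in this sketch).
  private final Map<String, Map<String, String>> startingOffsets;
  // Assumed to behave like SystemAdmin.offsetComparator: null means "not comparable".
  private final BiFunction<String, String, Integer> offsetComparator;
  // "task|ssp" keys of pairs that have already caught up.
  private final Set<String> caughtUp = new HashSet<>();

  CaughtUpFilter(Map<String, Map<String, String>> startingOffsets,
                 BiFunction<String, String, Integer> offsetComparator) {
    this.startingOffsets = startingOffsets;
    this.offsetComparator = offsetComparator;
  }

  /** Returns true if the task should process a message at this offset. */
  boolean shouldProcess(String taskName, String ssp, String offset) {
    String key = taskName + "|" + ssp;
    if (caughtUp.contains(key)) {
      return true; // once caught up, no further comparisons are needed
    }
    String start = startingOffsets.getOrDefault(taskName, Map.of()).get(ssp);
    Integer cmp = (start == null) ? null : offsetComparator.apply(offset, start);
    // A missing starting offset or an incomparable pair is treated as caught up.
    if (cmp == null || cmp >= 0) {
      caughtUp.add(key);
      return true;
    }
    return false; // this task already processed the message before the restart
  }

  public static void main(String[] args) {
    Map<String, Map<String, String>> starts = new HashMap<>();
    starts.put("Partition 0", Map.of("kafka.broadcast.0", "5"));
    starts.put("Partition 1", Map.of("kafka.broadcast.0", "2"));
    CaughtUpFilter filter = new CaughtUpFilter(starts,
        (a, b) -> Long.compare(Long.parseLong(a), Long.parseLong(b)));
    System.out.println(filter.shouldProcess("Partition 0", "kafka.broadcast.0", "3")); // false
    System.out.println(filter.shouldProcess("Partition 1", "kafka.broadcast.0", "3")); // true
  }
}
```

Once a (task, SSP) pair is marked caught up, the comparison is skipped, so after catch-up each message costs only a hash-set lookup.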


samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java (line 55)
<https://reviews.apache.org/r/34974/#comment140753>

    This still does not handle partition ranges. Please add the range
handling or correct the exception message.
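For the range case, one possible shape is sketched below. It assumes entries look like system.stream#N or system.stream#[M-N]; that syntax, GlobalInputParser, and partitions() are assumptions for illustration, not what TaskConfigJava currently accepts.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical parser for one "task.global.inputs" entry; the syntax is assumed. */
final class GlobalInputParser {
  // Matches "system.stream#3" or "system.stream#[0-2]".
  private static final Pattern ENTRY =
      Pattern.compile("^([^#]+)#(?:(\\d+)|\\[(\\d+)-(\\d+)\\])$");

  /** Expands one config entry into the list of partition ids it names. */
  static List<Integer> partitions(String entry) {
    Matcher m = ENTRY.matcher(entry.trim());
    if (!m.matches()) {
      throw new IllegalArgumentException("Invalid global input '" + entry
          + "': expected system.stream#N or system.stream#[M-N]");
    }
    List<Integer> result = new ArrayList<>();
    if (m.group(2) != null) {
      // Single partition, e.g. "kafka.foo#0".
      result.add(Integer.parseInt(m.group(2)));
      return result;
    }
    // Inclusive range, e.g. "kafka.foo#[1-3]".
    int lo = Integer.parseInt(m.group(3));
    int hi = Integer.parseInt(m.group(4));
    if (lo > hi) {
      throw new IllegalArgumentException("Invalid partition range in '" + entry
          + "': start " + lo + " is greater than end " + hi);
    }
    for (int p = lo; p <= hi; p++) {
      result.add(p);
    }
    return result;
  }
}
```

With this shape, each malformed entry gets an exception message that names the accepted forms, so the message stays accurate whichever form the user tried.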



samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartition.java (line 45)
<https://reviews.apache.org/r/34974/#comment140785>

    nit: spacing



samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala (line 405)
<https://reviews.apache.org/r/34974/#comment140947>

    The exception message is inaccurate. It can also happen when the taskName
is not in the startingOffsets map (although I am not sure whether that case
can actually occur).
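A sketch of how the lookup could report the two failure cases separately, assuming startingOffsets is keyed by task name and then by SSP (SSPs as strings here). StartingOffsetLookup and startingOffset() are hypothetical names.

```java
import java.util.Map;

/** Hypothetical lookup that distinguishes a missing task from a missing SSP. */
final class StartingOffsetLookup {
  static String startingOffset(Map<String, Map<String, String>> startingOffsets,
                               String taskName, String ssp) {
    Map<String, String> perTask = startingOffsets.get(taskName);
    if (perTask == null) {
      // Case the current message does not cover: the task itself is unknown.
      throw new IllegalStateException("No starting offsets found for task " + taskName);
    }
    String offset = perTask.get(ssp);
    if (offset == null) {
      // The task is known, but it has no starting offset for this SSP.
      throw new IllegalStateException(
          "Task " + taskName + " has no starting offset for " + ssp);
    }
    return offset;
  }
}
```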



samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala (line 131)
<https://reviews.apache.org/r/34974/#comment140958>

    instead of getOrElse(null), try .orNull



samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala (line 143)
<https://reviews.apache.org/r/34974/#comment140959>

    Should we have a different metric for the number of messages received by
process() than for the number of messages actually processed?
    We need to clarify the semantics of all our metrics, perhaps in a separate
RB.
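If we do split the metric, the distinction could be as simple as two counters. This is sketched with hypothetical names (TaskMessageCounters, onProcessCalled) and plain AtomicLongs rather than Samza's MetricsRegistry.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical counters separating "handed to process()" from "actually processed". */
final class TaskMessageCounters {
  final AtomicLong received = new AtomicLong();  // every envelope handed to process()
  final AtomicLong processed = new AtomicLong(); // envelopes the task actually handled

  void onProcessCalled(boolean caughtUp) {
    received.incrementAndGet();
    if (caughtUp) {
      processed.incrementAndGet();
    }
  }

  /** Envelopes dropped because the task had not yet caught up. */
  long skipped() {
    return received.get() - processed.get();
  }
}
```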



samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala (line 216)
<https://reviews.apache.org/r/34974/#comment140960>

    Looks good.
    nit: Can you rename the method to checkCaughtUp instead of checkCatchedUp?



samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala (line 397)
<https://reviews.apache.org/r/34974/#comment140967>

    Can you add some doc here saying that this comparator is used in the
context of broadcast streams (to detect the impedance mismatch between tasks
when consuming from a broadcast stream)?
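Something along these lines, for example. This is a standalone sketch, assuming the new method takes two offset strings and returns an Integer; treating a null offset as "not comparable" is my assumption, not necessarily what the patch does.

```java
/** Standalone sketch of a Kafka offset comparator carrying the requested doc. */
final class KafkaOffsetComparatorSketch {
  /**
   * Compares two Kafka offsets. This is used for broadcast streams: tasks in the
   * same container can be at different offsets of one broadcast SSP, and the
   * comparison detects that impedance mismatch between tasks.
   *
   * @return negative, zero, or positive as offset1 is before, equal to, or after
   *         offset2; null if either offset is null (assumed convention).
   */
  static Integer offsetComparator(String offset1, String offset2) {
    if (offset1 == null || offset2 == null) {
      return null;
    }
    // Kafka offsets are longs encoded as strings: compare numerically, not lexically,
    // otherwise "9" would sort after "10".
    return Long.compare(Long.parseLong(offset1), Long.parseLong(offset2));
  }
}
```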



samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala (line 109)
<https://reviews.apache.org/r/34974/#comment140972>

    We are registering with the offset in the method invocation in Line 105.
Why do we need to update the topicPartitionsAndOffsets map with the replaced
offset?

    I understand that tasks within the same container may be at different
offsets for broadcast stream SSPs. But it looks like consumer.register is
invoked in multiple places: TaskStorageManager and
CoordinatorStreamSystemConsumer. Will the change impact these other
components?
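If the intent is that a partition registered by several tasks should be fetched from the earliest of their offsets, the bookkeeping could look like this sketch. EarliestOffsetRegistry and the "topic-partition" string keys are hypothetical, and keeping the minimum is an assumption about the patch's intent.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical registry keeping the earliest registered offset per partition. */
final class EarliestOffsetRegistry {
  // Offset to start fetching from, per topic-partition (plain strings in this sketch).
  private final Map<String, Long> topicPartitionsAndOffsets = new HashMap<>();

  /** Records a registration; a later, larger offset never replaces an earlier one. */
  void register(String topicPartition, long offset) {
    topicPartitionsAndOffsets.merge(topicPartition, offset, Long::min);
  }

  /** Returns the offset to start from, or null if the partition was never registered. */
  Long startingOffset(String topicPartition) {
    return topicPartitionsAndOffsets.get(topicPartition);
  }
}
```

Because every caller goes through register, a min-merge like this would also apply to TaskStorageManager and CoordinatorStreamSystemConsumer registrations, which is the concern raised above.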



samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala (line 98)
<https://reviews.apache.org/r/34974/#comment140973>

    nit: typo 'resiter'


- Navina Ramesh


On June 16, 2015, 9:23 p.m., Yan Fang wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/34974/
> -----------------------------------------------------------
> 
> (Updated June 16, 2015, 9:23 p.m.)
> 
> 
> Review request for samza.
> 
> 
> Bugs: SAMZA-676
>     https://issues.apache.org/jira/browse/SAMZA-676
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> 1. added offsetComparator method in SystemAdmin Interface
> 
> 2. added "task.global.inputs" config
> 
> 3. rewrote Grouper classes in Java; they can now assign global streams
> during grouping
> 
> 4. used LinkedHashSet instead of HashSet in CoordinatorStreamSystemConsumer
> to preserve message order
> 
> 5. added taskNames to the offsets in OffsetManager
> 
> 6. allowed one SSP to be assigned to multiple taskInstances
> 
> 7. skipped already-processed messages in RunLoop
> 
> 8. unit tests for all changes
> 
> 
> Diffs
> -----
> 
>   checkstyle/import-control.xml 3374f0c
>   samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java 7a588eb
>   samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java 249b8ae
>   samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartition.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartitionFactory.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartitionFactory.java PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala 20e5d26
>   samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala e4b14f4
>   samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala c292ae4
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala cbacd18
>   samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala c5a5ea5
>   samza-core/src/main/scala/org/apache/samza/container/grouper/stream/GroupByPartition.scala 44e95fc
>   samza-core/src/main/scala/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.scala 3c0acad
>   samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala 097f410
>   samza-core/src/test/java/org/apache/samza/config/TestTaskConfigJava.java PRE-CREATION
>   samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupByPartition.java PRE-CREATION
>   samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.java PRE-CREATION
>   samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala 8d54c46
>   samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala 64a5844
>   samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala 9fb1aa9
>   samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala 7caad28
>   samza-core/src/test/scala/org/apache/samza/container/grouper/stream/GroupByTestBase.scala a14169b
>   samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupByPartition.scala 74daf72
>   samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.scala deb3895
>   samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala d9ae187
>   samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala 35086f5
>   samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala de00320
>   samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala 1629035
>   samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala 2a84328
>   samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java b063366
>   samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala 1e936b4
> 
> Diff: https://reviews.apache.org/r/34974/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Yan Fang
> 
>
