-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34974/#review88308
-----------------------------------------------------------

Mostly looks good. Have some questions:
* Have you tried moving the message "filtering" logic to the container level instead of the task level? Not sure which is simpler in terms of code change. Since the container has access to all the task instances and the systemAdmins, it seems convenient to have the caughtUp map within containerContext. I could be wrong :)
* I want to test the patch locally before confirming a ship it.

Looks awesome for a first draft!


samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java (line 55)
<https://reviews.apache.org/r/34974/#comment140753>

    This still does not handle the case of a partition range. Please add the range handling or correct the exception message.


samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartition.java (line 45)
<https://reviews.apache.org/r/34974/#comment140785>

    nit: spacing


samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala (line 405)
<https://reviews.apache.org/r/34974/#comment140947>

    The exception message is inaccurate. It can also happen when the taskName is not in the startingOffsets map (although I am not sure if such a case will happen).


samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala (line 131)
<https://reviews.apache.org/r/34974/#comment140958>

    Instead of getOrElse(null), try .orNull


samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala (line 143)
<https://reviews.apache.org/r/34974/#comment140959>

    Should we have a different metric for the number of messages received by process() than for the number of messages actually processed? We need to clarify the semantics of all our metrics, perhaps in a separate RB.


samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala (line 216)
<https://reviews.apache.org/r/34974/#comment140960>

    Looks good. nit: Can you rename the method to checkCaughtUp instead of checkCatchedUp?


samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala (line 397)
<https://reviews.apache.org/r/34974/#comment140967>

    Can you add some doc here saying this comparator is used in the context of broadcast streams (to detect impedance mismatch between tasks when consuming from a broadcast stream)?


samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala (line 109)
<https://reviews.apache.org/r/34974/#comment140972>

    We are registering with the offset in the method invocation in Line 105. Why do we need to update the topicPartitionsAndOffsets map with the replaced offset? I understand that all tasks within the same container may be at different offsets for broadcast stream SSPs. But it looks like consumer.register is being invoked in multiple places: TaskStorageManager and CoordinatorStreamSystemConsumer. Will the change impact these other components?


samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala (line 98)
<https://reviews.apache.org/r/34974/#comment140973>

    nit: typo 'resiter'


- Navina Ramesh


On June 16, 2015, 9:23 p.m., Yan Fang wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/34974/
> -----------------------------------------------------------
>
> (Updated June 16, 2015, 9:23 p.m.)
>
>
> Review request for samza.
>
>
> Bugs: SAMZA-676
>     https://issues.apache.org/jira/browse/SAMZA-676
>
>
> Repository: samza
>
>
> Description
> -------
>
> 1. added offsetComparator method in SystemAdmin Interface
>
> 2. added "task.global.inputs" config
>
> 3. rewrote Grouper classes using Java; allows to assign global streams during grouping
>
> 4. used LinkedHashSet instead of HashSet in CoordinatorStreamSystemConsumer to preserve messages order
>
> 5. added taskNames to the offsets in OffsetManager
>
> 6. allowed to assign one SSP to multiple taskInstances
>
> 7. skipped already-processed messages in RunLoop
>
> 8. unit tests for all changes
>
>
> Diffs
> -----
>
>   checkstyle/import-control.xml 3374f0c
>   samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java 7a588eb
>   samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java 249b8ae
>   samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartition.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartitionFactory.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartitionFactory.java PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala 20e5d26
>   samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala e4b14f4
>   samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala c292ae4
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala cbacd18
>   samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala c5a5ea5
>   samza-core/src/main/scala/org/apache/samza/container/grouper/stream/GroupByPartition.scala 44e95fc
>   samza-core/src/main/scala/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.scala 3c0acad
>   samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala 097f410
>   samza-core/src/test/java/org/apache/samza/config/TestTaskConfigJava.java PRE-CREATION
>   samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupByPartition.java PRE-CREATION
>   samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.java PRE-CREATION
>   samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala 8d54c46
>   samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala 64a5844
>   samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala 9fb1aa9
>   samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala 7caad28
>   samza-core/src/test/scala/org/apache/samza/container/grouper/stream/GroupByTestBase.scala a14169b
>   samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupByPartition.scala 74daf72
>   samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.scala deb3895
>   samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala d9ae187
>   samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala 35086f5
>   samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala de00320
>   samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala 1629035
>   samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala 2a84328
>   samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java b063366
>   samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala 1e936b4
>
> Diff: https://reviews.apache.org/r/34974/diff/
>
>
> Testing
> -------
>
>
> Thanks,
>
> Yan Fang
>
>