----------------------------------------------------------- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27649/#review61298 -----------------------------------------------------------
Ship it! Looks good overall. Added minor comments in the review samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java <https://reviews.apache.org/r/27649/#comment102848> Do we need to handle cases where messageMap might be empty ? (Not sure if some callers might do that instead of sending null) samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java <https://reviews.apache.org/r/27649/#comment102849> Maybe take key and value arguments for this constructor as well ? samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java <https://reviews.apache.org/r/27649/#comment102850> Maybe return an unmodifiable map ? samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java <https://reviews.apache.org/r/27649/#comment102857> Wait - shouldn't the type be something like "delete-config" ? Or am I missing something ? samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java <https://reviews.apache.org/r/27649/#comment102861> Maybe use CoordinatorSystemAdmin just for consistency ? (more of a cosmetic comment - feel free to ignore) samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java <https://reviews.apache.org/r/27649/#comment102858> Null check for systemStreamMetadataMap ? samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala <https://reviews.apache.org/r/27649/#comment102872> Add logging here to see the progress ? samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala <https://reviews.apache.org/r/27649/#comment102873> I guess I'm still confused - what does 'type' in a Delete message mean ? From this line it looks like the type is "set-config". Can you please elaborate ? samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala <https://reviews.apache.org/r/27649/#comment102874> Brief comment ? samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala <https://reviews.apache.org/r/27649/#comment102875> Spelling mistake : coordinatorSystemConfig - Chinmay Soman On Nov. 13, 2014, 5:34 p.m., Chris Riccomini wrote: > > ----------------------------------------------------------- > This is an automatically generated e-mail. To reply, visit: > https://reviews.apache.org/r/27649/ > ----------------------------------------------------------- > > (Updated Nov. 13, 2014, 5:34 p.m.) > > > Review request for samza. > > > Bugs: SAMZA-448 > https://issues.apache.org/jira/browse/SAMZA-448 > > > Repository: samza > > > Description > ------- > > remove SystemAdmin.createCoordinatorStream, and create a > CoordinatorStreamAdmin instead. > > > more feedback from yan > > > partial fix of yan's review comments > > > add logging info > > > make deletes actually work > > > add javadocs to mock coordinator stream classes. > > > delete old configs when job runner publishes new configs > > > add rewriting into job coordinator > > > make process job properly set only coordinator stream config > > > all tests pass > > > fix samza container performance test > > > explicitly flush all buffers when closing the kafka producer. fix stateful > task test. > > > fix kafka tests > > > all core tests work > > > fix test checkpoint tool by adding a mock coordinator consumer that dumps the > entire config > > > working on fixing checkpoint tool tests > > > fleshing out the coordinator stream message javadocs > > > remove duplicate code from coordinator system factory > > > add more javadocs. clean up todos inkafka system admin. > > > remove yarn.container.count from yarn config, but use it as a fallback to > job.container.count > > > add some docs and headers to the coodinator stream and system admin > > > refactoring to add coordinator stream system consumer > > > cleanup source in job runne > > > abstract coordinator system producer creation into a factory > > > add todos > > > config stream works > > > create coordinator stream in system admin > > > connecting job coordinator to job runner via coordinator stream > > > add util and logging methods > > > adding coordinator message and system producer wrapper > > > Diffs > ----- > > build.gradle 828bce9913db00161971607e4c9ac19c63cecb95 > samza-api/src/main/java/org/apache/samza/system/CoordinatorSystemAdmin.java > PRE-CREATION > samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java > 571c60631357ea9a0b4fa24e7253008619ef2f32 > > samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java > 38e313f3c39454110efd354e6ca025869fa930cd > > samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java > PRE-CREATION > > samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java > PRE-CREATION > > samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.java > PRE-CREATION > samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala > ddc30af7c52d8a4d5c5de02f6757c040b1f31c93 > samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala > 3b6685e00837a4aaf809813e62b7e52823bc07a9 > samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala > 1a2dd4413f56e53dbeeb47b5637d7b0c50522f02 > samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala > c14f2f623bb4bae911dd3085ce428175930e4545 > > samza-core/src/main/scala/org/apache/samza/coordinator/stream/CoordinatorStreamSystemFactory.scala > PRE-CREATION > samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala > 16345cd1c1354a0d25a0000d81a307dbe3abbe81 > > samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala > 6985af6e7cc0d408fa07fbac60141d1126323777 > samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala > 530255e5866bc49ec5ce1a0b7437470cd4e17010 > samza-core/src/main/scala/org/apache/samza/util/Util.scala > 1a67586eeec95dabfeb3b6881af9b3865c3029ca > > samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java > PRE-CREATION > > samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java > PRE-CREATION > samza-core/src/test/resources/test.properties > 9348c7de956ebf02f58a163dc6fb391a7e29ae64 > > samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala > af800dfeedbfea75abaac3f15fd53bc55b743daf > > samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala > 1eb0edab1bc792ccf8c503b03687284451ab0f34 > > samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala > 5ac33ea36da451250655d9dd373692b964322b41 > > samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala > 4ed5e881031e019d8df6de259cabb658820a3ba0 > > samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala > a0e1ccbfe9dc4fd26ca6b30fc2d1348fb7d007e4 > > samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala > 5ceb1093a66cb57e298d4b3ccdd24845dbb41b58 > samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java > fa1d51b290013a3913d64884dc43907a76670849 > > samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala > 118f5eee22016db3b802c32fb26c5d72fa61f1a7 > > samza-test/src/test/scala/org/apache/samza/test/performance/TestSamzaContainerPerformance.scala > d589d762a18f9425aa8d8dd589011a151bcb59a4 > samza-yarn/src/main/scala/org/apache/samza/config/YarnConfig.scala > 03395e2efa0fec723e354177d06bfacf7d2a9215 > samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala > 91aff3c5e0a2bcea45120d794371fca1c638ccfe > > samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala > 8ba435ef2ccf2af64d01eb4bc3b1c362fb03779d > > samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala > b0b6543856cb87888c5a719182ad9576b51bba1a > samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala > 24b11da06a69da734c85720ef39d65ee46d821d5 > > samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala > 765f72f4c10bd0f1d1adab28c8ec54d9cbea5fb4 > > samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala > 81dea9d6d1921462b200c62dbdf016c0eb2f01b2 > > samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala > cab5101c5c9e2a979bca545fa8046e93dcfe46e2 > > Diff: https://reviews.apache.org/r/27649/diff/ > > > Testing > ------- > > > Thanks, > > Chris Riccomini > >
