> On Nov. 13, 2014, 9:40 p.m., Chinmay Soman wrote: > > samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java, > > line 108 > > <https://reviews.apache.org/r/27649/diff/2/?file=762370#file762370line108> > > > > Do we need to handle cases where messageMap might be empty ? (Not sure > > if some callers might do that instead of sending null)
Done. > On Nov. 13, 2014, 9:40 p.m., Chinmay Soman wrote: > > samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java, > > line 161 > > <https://reviews.apache.org/r/27649/diff/2/?file=762370#file762370line161> > > > > Maybe return an unmodifiable map ? Good catch. Done. > On Nov. 13, 2014, 9:40 p.m., Chinmay Soman wrote: > > samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java, > > line 111 > > <https://reviews.apache.org/r/27649/diff/2/?file=762370#file762370line111> > > > > Maybe take key and value arguments for this constructor as well ? Done. > On Nov. 13, 2014, 9:40 p.m., Chinmay Soman wrote: > > samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java, > > line 255 > > <https://reviews.apache.org/r/27649/diff/2/?file=762370#file762370line255> > > > > Wait - shouldn't the type be something like "delete-config" ? > > > > Or am I missing something ? The type has to be the same type that the message wishes to delete. This is somewhat bleeding implementation details in to the API, but thinking of Kafka's log compaction, the keys must match exactly: k=>{version=1, key=job.name, type=set-config} .. v=> {..some stuff..} v=>{version=1, key=job.name, type=set-config} .. v=> null So, Delete's type must be the same as the thing it's deleting. Also, it occurs to me that we're not protecting against field ordering, which was discussed in the ticket with Martin. I'll have to update with Jackson annotations or something to fix this. > On Nov. 13, 2014, 9:40 p.m., Chinmay Soman wrote: > > samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java, > > line 60 > > <https://reviews.apache.org/r/27649/diff/2/?file=762371#file762371line60> > > > > Maybe use CoordinatorSystemAdmin just for consistency ? (more of a > > cosmetic comment - feel free to ignore) Going to skip for now. Reasoning is that I think we might want to use the FileReaderSystemAdmin for mocking at some point, and I don't want to restrict us from doing that (since it doesn't implement CoordinatorSystemAdmin right now). > On Nov. 13, 2014, 9:40 p.m., Chinmay Soman wrote: > > samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java, > > line 82 > > <https://reviews.apache.org/r/27649/diff/2/?file=762371#file762371line82> > > > > Null check for systemStreamMetadataMap ? Done. > On Nov. 13, 2014, 9:40 p.m., Chinmay Soman wrote: > > samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala, > > line 72 > > <https://reviews.apache.org/r/27649/diff/2/?file=762376#file762376line72> > > > > Add logging here to see the progress ? Done. > On Nov. 13, 2014, 9:40 p.m., Chinmay Soman wrote: > > samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala, line 87 > > <https://reviews.apache.org/r/27649/diff/2/?file=762378#file762378line87> > > > > I guess I'm still confused - what does 'type' in a Delete message mean > > ? > > > > From this line it looks like the type is "set-config". Can you please > > elaborate ? Done. Added java docs. > On Nov. 13, 2014, 9:40 p.m., Chinmay Soman wrote: > > samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala, > > line 46 > > <https://reviews.apache.org/r/27649/diff/2/?file=762389#file762389line46> > > > > Brief comment ? Done. > On Nov. 13, 2014, 9:40 p.m., Chinmay Soman wrote: > > samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala, line 68 > > <https://reviews.apache.org/r/27649/diff/2/?file=762398#file762398line68> > > > > Spelling mistake : coordinatorSystemConfig Doh, again! Fixed. - Chris ----------------------------------------------------------- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27649/#review61298 ----------------------------------------------------------- On Nov. 13, 2014, 5:34 p.m., Chris Riccomini wrote: > > ----------------------------------------------------------- > This is an automatically generated e-mail. To reply, visit: > https://reviews.apache.org/r/27649/ > ----------------------------------------------------------- > > (Updated Nov. 13, 2014, 5:34 p.m.) > > > Review request for samza. > > > Bugs: SAMZA-448 > https://issues.apache.org/jira/browse/SAMZA-448 > > > Repository: samza > > > Description > ------- > > remove SystemAdmin.createCoordinatorStream, and create a > CoordinatorStreamAdmin instead. > > > more feedback from yan > > > partial fix of yan's review comments > > > add logging info > > > make deletes actually work > > > add javadocs to mock coordinator stream classes. > > > delete old configs when job runner publishes new configs > > > add rewriting into job coordinator > > > make process job properly set only coordinator stream config > > > all tests pass > > > fix samza container performance test > > > explicitly flush all buffers when closing the kafka producer. fix stateful > task test. > > > fix kafka tests > > > all core tests work > > > fix test checkpoint tool by adding a mock coordinator consumer that dumps the > entire config > > > working on fixing checkpoint tool tests > > > fleshing out the coordinator stream message javadocs > > > remove duplicate code from coordinator system factory > > > add more javadocs. clean up todos inkafka system admin. > > > remove yarn.container.count from yarn config, but use it as a fallback to > job.container.count > > > add some docs and headers to the coodinator stream and system admin > > > refactoring to add coordinator stream system consumer > > > cleanup source in job runne > > > abstract coordinator system producer creation into a factory > > > add todos > > > config stream works > > > create coordinator stream in system admin > > > connecting job coordinator to job runner via coordinator stream > > > add util and logging methods > > > adding coordinator message and system producer wrapper > > > Diffs > ----- > > build.gradle 828bce9913db00161971607e4c9ac19c63cecb95 > samza-api/src/main/java/org/apache/samza/system/CoordinatorSystemAdmin.java > PRE-CREATION > samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java > 571c60631357ea9a0b4fa24e7253008619ef2f32 > > samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java > 38e313f3c39454110efd354e6ca025869fa930cd > > samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java > PRE-CREATION > > samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java > PRE-CREATION > > samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.java > PRE-CREATION > samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala > ddc30af7c52d8a4d5c5de02f6757c040b1f31c93 > samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala > 3b6685e00837a4aaf809813e62b7e52823bc07a9 > samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala > 1a2dd4413f56e53dbeeb47b5637d7b0c50522f02 > samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala > c14f2f623bb4bae911dd3085ce428175930e4545 > > samza-core/src/main/scala/org/apache/samza/coordinator/stream/CoordinatorStreamSystemFactory.scala > PRE-CREATION > samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala > 16345cd1c1354a0d25a0000d81a307dbe3abbe81 > > samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala > 6985af6e7cc0d408fa07fbac60141d1126323777 > samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala > 530255e5866bc49ec5ce1a0b7437470cd4e17010 > samza-core/src/main/scala/org/apache/samza/util/Util.scala > 1a67586eeec95dabfeb3b6881af9b3865c3029ca > > samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java > PRE-CREATION > > samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java > PRE-CREATION > samza-core/src/test/resources/test.properties > 9348c7de956ebf02f58a163dc6fb391a7e29ae64 > > samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala > af800dfeedbfea75abaac3f15fd53bc55b743daf > > samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala > 1eb0edab1bc792ccf8c503b03687284451ab0f34 > > samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala > 5ac33ea36da451250655d9dd373692b964322b41 > > samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala > 4ed5e881031e019d8df6de259cabb658820a3ba0 > > samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala > a0e1ccbfe9dc4fd26ca6b30fc2d1348fb7d007e4 > > samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala > 5ceb1093a66cb57e298d4b3ccdd24845dbb41b58 > samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java > fa1d51b290013a3913d64884dc43907a76670849 > > samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala > 118f5eee22016db3b802c32fb26c5d72fa61f1a7 > > samza-test/src/test/scala/org/apache/samza/test/performance/TestSamzaContainerPerformance.scala > d589d762a18f9425aa8d8dd589011a151bcb59a4 > samza-yarn/src/main/scala/org/apache/samza/config/YarnConfig.scala > 03395e2efa0fec723e354177d06bfacf7d2a9215 > samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala > 91aff3c5e0a2bcea45120d794371fca1c638ccfe > > samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala > 8ba435ef2ccf2af64d01eb4bc3b1c362fb03779d > > samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala > b0b6543856cb87888c5a719182ad9576b51bba1a > samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala > 24b11da06a69da734c85720ef39d65ee46d821d5 > > samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala > 765f72f4c10bd0f1d1adab28c8ec54d9cbea5fb4 > > samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala > 81dea9d6d1921462b200c62dbdf016c0eb2f01b2 > > samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala > cab5101c5c9e2a979bca545fa8046e93dcfe46e2 > > Diff: https://reviews.apache.org/r/27649/diff/ > > > Testing > ------- > > > Thanks, > > Chris Riccomini > >
