> On Nov. 14, 2014, 5:32 p.m., Chris Riccomini wrote:
> > samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java, line 59
> > <https://reviews.apache.org/r/28016/diff/1/?file=762999#file762999line59>
> >
> > Should use "stream" instead of "topic" in this API. Samza doesn't use "topic".
> >
> > I think this method should be createChangelogStream(String streamName, int partitions). The reason for keeping it changelog-specific is that we might want custom configs for changelog streams (e.g. custom replication count, segment size in Kafka, etc.). If we don't make the API specific to the changelog, there's no way to have custom configs for changelogs (since we won't know whether the topic being created is a changelog or not).
> >
> > Also, the pattern I'm following for SAMZA-448, when adding create*Stream methods to the SystemAdmin, is to have an interface that extends SystemAdmin with just the create method in it (e.g. ChangelogSystemAdmin). Then only KafkaSystemAdmin would implement the new interface, whereas FileReaderSystemAdmin would not. This was based on Yan's feedback.
>
> Naveen Somasundaram wrote:
>     Makes sense. If we have more and more stream creations coming up, then we can maybe generalize it by having a custom configuration object for this API and refactor our code (with reasonable defaults). But as of now we don't have a need for that (we probably have 2-3 stream creation needs), so I am not going to try to code for the future :). I'll change it to a changelog-stream-specific function.
>
>     As for the latter, as we discussed, I am not going to make the change to have a separate interface. While I agree that every interface implementer will have to deal with it (even those not interested in it), the alternative, the solution proposed in SAMZA-448, is for the caller to deal with it (checking whether the instance is of the appropriate type before using it), which makes the interface slightly harder to use, IMO. So I am still inclined to stick with this implementation.
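To make the trade-off concrete, here is a minimal Java sketch of the two designs under discussion. All types are hypothetical and heavily simplified; they are not Samza's real SystemAdmin, KafkaSystemAdmin, or FileReaderSystemAdmin. Option A puts createChangelogStream(String, int) on every admin, and it assumes that a system without changelog support throws UnsupportedOperationException. Option B follows the SAMZA-448 pattern of a ChangelogSystemAdmin sub-interface, so a caller can check capability with instanceof before calling anything.

```java
import java.util.ArrayList;
import java.util.List;

// Entry point declared first so single-file source launchers pick it up.
class ChangelogAdminSketch {
    public static void main(String[] args) {
        KafkaLikeAdmin kafka = new KafkaLikeAdmin();
        // Option B: capability is visible from the type alone.
        System.out.println(ChangelogSupport.supportsChangelog(kafka));               // true
        System.out.println(ChangelogSupport.supportsChangelog(new FileLikeAdmin())); // false
        ChangelogSupport.createChangelog(kafka, "my-store-changelog", 8);
        System.out.println(kafka.created);                                           // [my-store-changelog:8]
        // Option A: the only way to find out is to make the call.
        System.out.println(ChangelogSupport.tryCreate(new FileAdminWithMethod(), "my-store-changelog", 8)); // false
    }
}

// ---- Option A (as in this patch): every admin carries the create method. ----
// Hypothetical stand-in for SystemAdmin; the real interface has more methods.
interface AdminWithMethod {
    void createChangelogStream(String streamName, int partitions);
}

// A system without changelog support must still implement the method, only to reject it.
class FileAdminWithMethod implements AdminWithMethod {
    @Override
    public void createChangelogStream(String streamName, int partitions) {
        throw new UnsupportedOperationException("changelog streams are not supported");
    }
}

// ---- Option B (SAMZA-448 pattern): a sub-interface holds the create method. ----
// Hypothetical base admin, left empty here for brevity.
interface BaseAdmin { }

interface ChangelogSystemAdmin extends BaseAdmin {
    void createChangelogStream(String streamName, int partitions);
}

// Hypothetical Kafka-like admin: records requested streams instead of creating topics.
class KafkaLikeAdmin implements ChangelogSystemAdmin {
    final List<String> created = new ArrayList<>();

    @Override
    public void createChangelogStream(String streamName, int partitions) {
        created.add(streamName + ":" + partitions);
    }
}

// Hypothetical file-reader-like admin: simply does not implement the sub-interface.
class FileLikeAdmin implements BaseAdmin { }

final class ChangelogSupport {
    private ChangelogSupport() { }

    // Option B: an early validation step can check support without calling the method.
    static boolean supportsChangelog(BaseAdmin admin) {
        return admin instanceof ChangelogSystemAdmin;
    }

    // Option B: the caller bears the type check before using the method.
    static void createChangelog(BaseAdmin admin, String streamName, int partitions) {
        if (!(admin instanceof ChangelogSystemAdmin)) {
            throw new IllegalArgumentException("system cannot host changelog stream " + streamName);
        }
        ((ChangelogSystemAdmin) admin).createChangelogStream(streamName, partitions);
    }

    // Option A: support is only discovered by invoking the method and catching the failure.
    static boolean tryCreate(AdminWithMethod admin, String streamName, int partitions) {
        try {
            admin.createChangelogStream(streamName, partitions);
            return true;
        } catch (UnsupportedOperationException e) {
            return false;
        }
    }
}
```

The cost each side raises shows up directly in the sketch: option A burdens every implementer (FileAdminWithMethod implements a method only to throw), while option B moves an instanceof check onto the caller.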
There's a useful distinction here: some systems can implement a changelog and some cannot. For example, Amazon Kinesis currently supports only time-based retention of messages, for 1 day, and has no log compaction, so it is not suitable for changelog streams. Now, what happens if you try to use a system that is not changelog-compatible for a changelog? With the current implementation, and assuming that systems which don't support changelogs throw an exception from createChangelogStream(), you'll get an exception at job startup. A sub-interface could allow slightly more graceful handling of this situation, since an early config-verification step could detect whether the system's admin implements the appropriate sub-interface without calling the method.

- Martin


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/28016/#review61445
-----------------------------------------------------------


On Nov. 20, 2014, 9:56 p.m., Naveen Somasundaram wrote:
>
> (Updated Nov. 20, 2014, 9:56 p.m.)
>
>
> Review request for samza.
>
>
> Repository: samza
>
>
> Description
> -----------
>
> I have added a new method to the system admin, as discussed in the JIRA. The task storage manager fetches all the necessary information and creates the changelog topic using the system admin.
>
> PENDING: I still have to update the Samza docs with the newly added configurations; I will update the RB with the doc changes.
>
>
> Diffs
> -----
>
>   .reviewboardrc 9339119e248e41f954d47e1d01a0f2e1130d349c
>   docs/learn/documentation/versioned/jobs/configuration-table.html 4266a137ae003e946e11c122d94061c31d643c77
>   samza-api/src/main/java/org/apache/samza/config/Config.java 2048e90e80e21086eb59be57f3bcd5ebf92b2811
>   samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java 571c60631357ea9a0b4fa24e7253008619ef2f32
>   samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java 38e313f3c39454110efd354e6ca025869fa930cd
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala d91d6d7940bd07a145dd3b782a9239f24bb5cf2e
>   samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala b8719c36c2b9346bcd3f291e23b33d2c00cebfa9
>   samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala 98e92bc12f3e2827cdec02f1ce94d7e2314e4b4e
>   samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala a79eccaa8fc18d197b77f9363f1814fefc4ac40d
>   samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala 9fc1f56d4404ec7722c0d34fde2804e981b41309
>   samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala 5ac33ea36da451250655d9dd373692b964322b41
>   samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala 4ed5e881031e019d8df6de259cabb658820a3ba0
>   samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala d660b91fb7a1029a47d5e083759b8971ad97e617
>   samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala 5ceb1093a66cb57e298d4b3ccdd24845dbb41b58
>   samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java fa1d51b290013a3913d64884dc43907a76670849
>   samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala 118f5eee22016db3b802c32fb26c5d72fa61f1a7
>
> Diff: https://reviews.apache.org/r/28016/diff/
>
>
> Testing
> -------
>
> Modified TestStatefulTask to disable auto-creation of topics, and the test seems to work.
>
>
> Thanks,
>
> Naveen Somasundaram
>
