On Aug. 19, 2014, 7:34 a.m., Chris Riccomini wrote:
> > It looks good to me, except two not-used imports and one typo. :)
> >
> > Overall, two thoughts:
> > 1. "collector" and "producers" seem convertible in the patch. Such as in
> > TaskInstance.registerProducers method, we actually do "collector.register";
> > In TaskInstance.commit, use "Flushing producers for taskName:" as the log
> > for collector.flush. So what is the real difference between "collectors"
> > and "producers"?
> > 2. From my perspective, producerMultiplexer seems to be doing all the jobs
> > (correct me if I miss some points). So why do we use TaskInstanceCollector
> > here?
>
> Chris Riccomini wrote:
> The reason that we need something like TaskInstanceCollector is because
> SystemProducers segments everything according to a "source" (a task name).
> This is so that if one TaskInstance (StreamTask) calls
> TaskCoordinator.commit, it only commits for *that* task, not all tasks in the
> container. Before we had this change, it was not intuitive, since calling
> things like commit/flush would actually commit and flush all output for all
> tasks in the container. This led to a lot of latency, and having to
> coordinate between StreamTasks to figure out when it was safe to commit.
>
> We have SystemProducers.send(source, envelope) for this reason. This way,
> the producers can buffer and flush outgoing messages grouped by source, and
> thus flush only messages for a given source. This means that we now have an
> API mismatch between MessageCollector.send(envelope), which has the source
> implicit (based on the StreamTask doing the sending), and the
> SystemProducers.send(source, envelope), which has the source explicitly
> defined. To get around this, you can either (1) wrap the SystemProducers.send
> call with a proper source defined on a per-TaskInstance basis, or (2) provide
> some callback mechanism, and use SystemProducers.register to "listen" to
> collectors that are registered by source.
>
> I opted for approach (1), since (2) would cause us to have to register
> the SystemProducers prematurely in the SamzaContainer.main method (since
> StorageEngineFactory.getStorageEngine also requires the same collector as
> used for the TaskInstance).
>
> I'm wondering if part of this is just naming? The TaskInstanceCollector
> is really more of a wrapper around the SystemProducers. It just so happens
> that it also implements the MessageCollector, and is used in that way in the
> task.process, and task.window calls?
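
For readers following the thread, approach (1) can be sketched roughly as below. This is only an illustration under stated assumptions, not the code in the patch: PerSourceProducers, Collector, and TaskCollector are hypothetical stand-ins for SystemProducers, MessageCollector, and TaskInstanceCollector, and envelopes are plain strings rather than real Samza envelope objects. The sketch shows the wrapper fixing the source once per task, so that send(envelope) can be forwarded as send(source, envelope) and a flush touches only that task's buffered output.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class TaskCollectorSketch {

    /** Hypothetical stand-in for SystemProducers: buffers messages grouped by source. */
    static class PerSourceProducers {
        private final Map<String, List<String>> buffers = new HashMap<>();
        private final List<String> flushed = new ArrayList<>();

        void register(String source) {
            buffers.putIfAbsent(source, new ArrayList<>());
        }

        void send(String source, String envelope) {
            buffers.computeIfAbsent(source, s -> new ArrayList<>()).add(envelope);
        }

        /** Flushes only the messages buffered for the given source. */
        void flush(String source) {
            List<String> pending = buffers.get(source);
            if (pending != null) {
                flushed.addAll(pending);
                pending.clear();
            }
        }

        List<String> flushed() {
            return flushed;
        }

        int pending(String source) {
            List<String> pending = buffers.get(source);
            return pending == null ? 0 : pending.size();
        }
    }

    /** Hypothetical stand-in for MessageCollector: the source is implicit. */
    interface Collector {
        void send(String envelope);
    }

    /** Hypothetical wrapper in the spirit of TaskInstanceCollector. */
    static class TaskCollector implements Collector {
        private final PerSourceProducers producers;
        private final String source;

        TaskCollector(PerSourceProducers producers, String source) {
            this.producers = producers;
            this.source = source;
        }

        void register() {
            producers.register(source);
        }

        @Override
        public void send(String envelope) {
            // Supply the source that the task-facing API leaves implicit.
            producers.send(source, envelope);
        }

        void flush() {
            producers.flush(source);
        }
    }

    public static void main(String[] args) {
        PerSourceProducers producers = new PerSourceProducers();
        TaskCollector taskA = new TaskCollector(producers, "task-a");
        TaskCollector taskB = new TaskCollector(producers, "task-b");
        taskA.register();
        taskB.register();

        taskA.send("a1");
        taskB.send("b1");
        taskA.send("a2");

        // Committing task-a flushes only task-a's output; task-b stays buffered.
        taskA.flush();
        System.out.println("flushed: " + producers.flushed());
        System.out.println("task-b still pending: " + producers.pending("task-b"));
    }
}
```

The point of the wrapper in this sketch is that task code only ever sees the one-argument send, while the shared producers object keeps per-source buffers and never has to guess which task a message came from.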

Thanks for the explanation. Using TaskInstanceCollector as a wrapper for
SystemProducers to overcome the API mismatch makes sense to me. (Yes, the
name was a little confusing. I was thinking the "collector" should have more
things to do than just a wrapper.) Since this is not end-user-facing stuff,
I am not picky about that as long as new developers can understand it.

Besides this, I am OK with the patch. Seems Chinmay opened an issue.

- Yan


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24820/#review50959
-----------------------------------------------------------


On Aug. 19, 2014, 5:46 p.m., Chris Riccomini wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/24820/
> -----------------------------------------------------------
>
> (Updated Aug. 19, 2014, 5:46 p.m.)
>
>
> Review request for samza.
>
>
> Bugs: SAMZA-384
>     https://issues.apache.org/jira/browse/SAMZA-384
>
>
> Repository: samza
>
>
> Description
> -------
>
> fix useless imports and wrong javadoc words
>
>
> add javadocs. remove readable container. make all tests pass.
>
>
> add task instance collector that sends immediately.
>
>
> Diffs
> -----
>
>   samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala 3a264ad4bd9ea6ffa801c662dae09ebd7ff79d32
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala d574ac413c0ec81e12eb44b2d0cc0d9843aad434
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala 44d5dffb36edd03032bbbd8c13541f18192f2ba2
>   samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala 9484ddb50a97eef52ab6ff7c932fdda0ff1ecb0a
>   samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala aae3f8795ef9a12beaefa0917939107102e76b31
>   samza-core/src/main/scala/org/apache/samza/task/ReadableCollector.scala 444bf37db259d4fccc8ca2d479096c109911d46c
>   samza-core/src/main/scala/org/apache/samza/task/TaskInstanceCollector.scala PRE-CREATION
>   samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala 86b7f31d1a574f4ee66e8498ecb501a08b94dff5
>   samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala e3c7fe3e2d329b0767eb439144b1ba419848bb96
>   samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala 9d5ff1309cd7ae6d41b48870ef7445d3710a2196
>   samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala 22c8f0c2954d070aab9ce8d204aa8220d9c7bd74
>   samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducerMetrics.scala ad39157c6d052b2e14e51b2f8a61d740fc18a129
>   samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala 7d0b8db0c3bf1e70fd6af03abb594c7000e6666b
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala 851aae6b347b0be2cd2d891fc45030c3e47189d4
>
> Diff: https://reviews.apache.org/r/24820/diff/
>
>
> Testing
> -------
>
>
> Thanks,
>
> Chris Riccomini
>
>
