On Aug. 19, 2014, 7:34 a.m., Chris Riccomini wrote:
> > It looks good to me, except for two unused imports and one typo. :)
> >
> > Overall, two thoughts:
> > 1. "collector" and "producers" seem interchangeable in the patch. For
> > example, in the TaskInstance.registerProducers method, we actually call
> > "collector.register"; in TaskInstance.commit, "Flushing producers for
> > taskName:" is used as the log message for collector.flush. So what is the
> > real difference between "collectors" and "producers"?
> > 2. From my perspective, producerMultiplexer seems to be doing all the work
> > (correct me if I missed something). So why do we use TaskInstanceCollector
> > here?

The reason that we need something like TaskInstanceCollector is that SystemProducers segments everything according to a "source" (a task name). This is so that if one TaskInstance (StreamTask) calls TaskCoordinator.commit, it commits only for *that* task, not for all tasks in the container. Before this change, the behavior was not intuitive: calling commit or flush would actually commit and flush all output for all tasks in the container. This led to a lot of latency, and to having to coordinate between StreamTasks to figure out when it was safe to commit.

We have SystemProducers.send(source, envelope) for this reason. This way, the producers can buffer outgoing messages grouped by source, and thus flush only the messages for a given source.

This means that we now have an API mismatch between MessageCollector.send(envelope), which has the source implicit (based on the StreamTask doing the sending), and SystemProducers.send(source, envelope), which has the source explicitly defined. To get around this, you can either (1) wrap the SystemProducers.send call with a proper source defined on a per-TaskInstance basis, or (2) provide some callback mechanism, and use SystemProducers.register to "listen" to collectors that are registered by source. I opted for approach (1), since (2) would force us to register the SystemProducers prematurely in the SamzaContainer.main method (since StorageEngineFactory.getStorageEngine also requires the same collector as used for the TaskInstance).

I'm wondering if part of this is just naming? The TaskInstanceCollector is really more of a wrapper around the SystemProducers. It just so happens that it also implements MessageCollector, and is used that way in the task.process and task.window calls.


- Chris


-----------------------------------------------------------
This is an automatically generated e-mail.
To reply, visit: https://reviews.apache.org/r/24820/#review50959
-----------------------------------------------------------


On Aug. 19, 2014, 5:46 p.m., Chris Riccomini wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/24820/
> -----------------------------------------------------------
>
> (Updated Aug. 19, 2014, 5:46 p.m.)
>
>
> Review request for samza.
>
>
> Bugs: SAMZA-384
>     https://issues.apache.org/jira/browse/SAMZA-384
>
>
> Repository: samza
>
>
> Description
> -------
>
> fix useless imports and wrong javadoc words
>
>
> add javadocs. remove readable container. make all tests pass.
>
>
> add task instance collector that sends immediately.
>
>
> Diffs
> -----
>
>   samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala 3a264ad4bd9ea6ffa801c662dae09ebd7ff79d32
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala d574ac413c0ec81e12eb44b2d0cc0d9843aad434
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala 44d5dffb36edd03032bbbd8c13541f18192f2ba2
>   samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala 9484ddb50a97eef52ab6ff7c932fdda0ff1ecb0a
>   samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala aae3f8795ef9a12beaefa0917939107102e76b31
>   samza-core/src/main/scala/org/apache/samza/task/ReadableCollector.scala 444bf37db259d4fccc8ca2d479096c109911d46c
>   samza-core/src/main/scala/org/apache/samza/task/TaskInstanceCollector.scala PRE-CREATION
>   samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala 86b7f31d1a574f4ee66e8498ecb501a08b94dff5
>   samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala e3c7fe3e2d329b0767eb439144b1ba419848bb96
>   samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala 9d5ff1309cd7ae6d41b48870ef7445d3710a2196
>   samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala 22c8f0c2954d070aab9ce8d204aa8220d9c7bd74
>   samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducerMetrics.scala ad39157c6d052b2e14e51b2f8a61d740fc18a129
>   samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala 7d0b8db0c3bf1e70fd6af03abb594c7000e6666b
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala 851aae6b347b0be2cd2d891fc45030c3e47189d4
>
> Diff: https://reviews.apache.org/r/24820/diff/
>
>
> Testing
> -------
>
>
> Thanks,
>
> Chris Riccomini
>
>
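
For readers following the thread, approach (1) from the reply (wrapping the per-source send behind a collector whose source is implicit) can be sketched roughly as below. This is only an illustration under stated assumptions: CollectorSketch, Collector, BufferingProducers and PerTaskCollector are hypothetical stand-ins written for this sketch, not the real Samza MessageCollector, SystemProducers or TaskInstanceCollector classes, and envelopes are reduced to plain strings.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-ins; none of these are the real Samza classes.
final class CollectorSketch {

    // Stand-in for a collector API where the source is implicit.
    interface Collector {
        void send(String envelope);
    }

    // Stand-in for a producer API that buffers and flushes per source.
    static final class BufferingProducers {
        private final Map<String, List<String>> pending = new HashMap<>();
        private final List<String> delivered = new ArrayList<>();

        void register(String source) {
            pending.putIfAbsent(source, new ArrayList<>());
        }

        void send(String source, String envelope) {
            List<String> buffer = pending.get(source);
            if (buffer == null) {
                throw new IllegalStateException("Unregistered source: " + source);
            }
            buffer.add(envelope);
        }

        // Flushes only the messages buffered for the given source.
        void flush(String source) {
            List<String> buffer = pending.get(source);
            if (buffer != null) {
                delivered.addAll(buffer);
                buffer.clear();
            }
        }

        List<String> delivered() {
            return delivered;
        }

        int pendingCount(String source) {
            List<String> buffer = pending.get(source);
            return buffer == null ? 0 : buffer.size();
        }
    }

    // The wrapper: binds one source (task name) to the shared producers,
    // so callers can use the implicit-source send(envelope) form.
    static final class PerTaskCollector implements Collector {
        private final String source;
        private final BufferingProducers producers;

        PerTaskCollector(String source, BufferingProducers producers) {
            this.source = source;
            this.producers = producers;
        }

        void register() {
            producers.register(source);
        }

        @Override
        public void send(String envelope) {
            producers.send(source, envelope);
        }

        void flush() {
            producers.flush(source);
        }
    }

    public static void main(String[] args) {
        BufferingProducers producers = new BufferingProducers();
        PerTaskCollector taskA = new PerTaskCollector("task-a", producers);
        PerTaskCollector taskB = new PerTaskCollector("task-b", producers);
        taskA.register();
        taskB.register();

        taskA.send("a1");
        taskB.send("b1");
        taskA.send("a2");

        // Only task-a's output is flushed; task-b's message stays buffered.
        taskA.flush();
        System.out.println(producers.delivered());
        System.out.println(producers.pendingCount("task-b"));
    }
}
```

In this sketch, flushing through one task's collector leaves the other task's buffered output untouched, which is the per-task commit behavior the reply describes. Whether the wrapper should be named a "collector" or a "producer" is exactly the naming question raised above.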
