On Aug. 19, 2014, 7:34 a.m., Chris Riccomini wrote:
> > It looks good to me, except for two unused imports and one typo. :)
> > 
> > Overall, two thoughts:
> > 1. "collector" and "producers" seem interchangeable in the patch. For 
> > example, in the TaskInstance.registerProducers method, we actually call 
> > "collector.register", and in TaskInstance.commit, "Flushing producers for 
> > taskName:" is what gets logged for collector.flush. So what is the real 
> > difference between "collectors" and "producers"?
> > 2. From my perspective, producerMultiplexer seems to be doing all the work 
> > (correct me if I've missed something). So why do we need 
> > TaskInstanceCollector here?

We need something like TaskInstanceCollector because SystemProducers segments 
everything by "source" (a task name). That way, if one TaskInstance (StreamTask) 
calls TaskCoordinator.commit, it commits only for *that* task, not for every 
task in the container. Before this change, the behavior was unintuitive: calling 
commit or flush would actually commit and flush all output for all tasks in the 
container. This led to a lot of latency, and forced StreamTasks to coordinate 
with each other to figure out when it was safe to commit.
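To make the per-source segmentation concrete, here is a minimal Scala sketch of a multiplexer that buffers outgoing messages by source and flushes one source at a time. `SourceBufferedProducers` and `Envelope` are hypothetical, simplified stand-ins written for illustration; they are not Samza's SystemProducers or OutgoingMessageEnvelope, and "delivery" here just means appending to an in-memory list.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-in for an outgoing message. Samza's real
// OutgoingMessageEnvelope carries more (target system, key, and so on).
final case class Envelope(stream: String, message: String)

// Hypothetical multiplexer that buffers outgoing messages per source (task name),
// so flushing one task's output never forces out another task's messages.
class SourceBufferedProducers {
  private val buffers = mutable.LinkedHashMap[String, mutable.ArrayBuffer[Envelope]]()

  // Messages that have been "written downstream", in flush order, tagged by source.
  val delivered = mutable.ArrayBuffer[(String, Envelope)]()

  // Each task instance registers its source once before sending.
  def register(source: String): Unit = {
    buffers.getOrElseUpdate(source, mutable.ArrayBuffer[Envelope]())
    ()
  }

  // Buffer the message under its source; unregistered sources are rejected.
  def send(source: String, envelope: Envelope): Unit = {
    val buffer = buffers.getOrElse(
      source, throw new IllegalStateException(s"Source $source was never registered"))
    buffer += envelope
  }

  // Flush only the given source; every other source keeps its buffered messages.
  def flush(source: String): Unit =
    buffers.get(source).foreach { buffer =>
      buffer.foreach(envelope => delivered += ((source, envelope)))
      buffer.clear()
    }

  // Number of messages still buffered for a source.
  def pending(source: String): Int = buffers.get(source).map(_.size).getOrElse(0)
}
```

In this sketch, a task's commit would call `flush` with its own task name only, so a second task's buffered output stays put and the two tasks never need to coordinate on when to commit.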

SystemProducers.send(source, envelope) takes an explicit source so that the 
producers can buffer outgoing messages grouped by source, and thus flush only 
the messages for a given source. This means we now have an API mismatch between 
MessageCollector.send(envelope), which leaves the source implicit (it is the 
StreamTask doing the sending), and SystemProducers.send(source, envelope), which 
requires the source explicitly. To bridge the two, you can either (1) wrap the 
SystemProducers.send call with the proper source bound on a per-TaskInstance 
basis, or (2) provide some callback mechanism, and use SystemProducers.register 
to "listen" to collectors that are registered by source.
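Approach (1) can be sketched roughly as follows. The traits are hypothetical, simplified stand-ins: `MessageCollector` mirrors only the source-less `send(envelope)` shape, and `SourceAwareProducers` mirrors the source-taking `register`/`send`/`flush` shape of SystemProducers as described in this thread. `TaskInstanceCollectorSketch` and `RecordingProducers` are illustrative names, not the classes in this patch.

```scala
import scala.collection.mutable

// Simplified stand-in for an outgoing message (assumption, not Samza's class).
final case class Envelope(stream: String, message: String)

// Mirrors the shape of MessageCollector.send(envelope): no source argument.
trait MessageCollector {
  def send(envelope: Envelope): Unit
}

// Mirrors the shape of SystemProducers, where every call names its source.
trait SourceAwareProducers {
  def register(source: String): Unit
  def send(source: String, envelope: Envelope): Unit
  def flush(source: String): Unit
}

// Approach (1): bind the source once per task instance, so a StreamTask keeps
// calling the source-less send while the producers still learn which task sent it.
class TaskInstanceCollectorSketch(source: String, producers: SourceAwareProducers)
    extends MessageCollector {
  def register(): Unit = producers.register(source)
  def send(envelope: Envelope): Unit = producers.send(source, envelope)
  def flush(): Unit = producers.flush(source)
}

// Records each call so we can see which source it carried (illustration only).
class RecordingProducers extends SourceAwareProducers {
  val calls = mutable.ArrayBuffer[String]()
  def register(source: String): Unit = calls += s"register($source)"
  def send(source: String, envelope: Envelope): Unit =
    calls += s"send($source, ${envelope.message})"
  def flush(source: String): Unit = calls += s"flush($source)"
}
```

Because each wrapper fixes its source at construction time, the StreamTask never needs to know its own task name to send, and flushing through the wrapper stays scoped to that one task.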

I opted for approach (1), since (2) would force us to register the 
SystemProducers prematurely in the SamzaContainer.main method 
(StorageEngineFactory.getStorageEngine also requires the same collector that the 
TaskInstance uses).

I wonder whether part of this is just naming. The TaskInstanceCollector is 
really more of a wrapper around SystemProducers. It just so happens that it also 
implements MessageCollector, and is used as one in the task.process and 
task.window calls.


- Chris


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24820/#review50959
-----------------------------------------------------------


On Aug. 19, 2014, 5:46 p.m., Chris Riccomini wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/24820/
> -----------------------------------------------------------
> 
> (Updated Aug. 19, 2014, 5:46 p.m.)
> 
> 
> Review request for samza.
> 
> 
> Bugs: SAMZA-384
>     https://issues.apache.org/jira/browse/SAMZA-384
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> fix unused imports and incorrect javadoc wording
> 
> 
> add javadocs. remove readable collector. make all tests pass.
> 
> 
> add task instance collector that sends immediately.
> 
> 
> Diffs
> -----
> 
>   samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala 3a264ad4bd9ea6ffa801c662dae09ebd7ff79d32
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala d574ac413c0ec81e12eb44b2d0cc0d9843aad434
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala 44d5dffb36edd03032bbbd8c13541f18192f2ba2
>   samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala 9484ddb50a97eef52ab6ff7c932fdda0ff1ecb0a
>   samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala aae3f8795ef9a12beaefa0917939107102e76b31
>   samza-core/src/main/scala/org/apache/samza/task/ReadableCollector.scala 444bf37db259d4fccc8ca2d479096c109911d46c
>   samza-core/src/main/scala/org/apache/samza/task/TaskInstanceCollector.scala PRE-CREATION
>   samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala 86b7f31d1a574f4ee66e8498ecb501a08b94dff5
>   samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala e3c7fe3e2d329b0767eb439144b1ba419848bb96
>   samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala 9d5ff1309cd7ae6d41b48870ef7445d3710a2196
>   samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala 22c8f0c2954d070aab9ce8d204aa8220d9c7bd74
>   samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducerMetrics.scala ad39157c6d052b2e14e51b2f8a61d740fc18a129
>   samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala 7d0b8db0c3bf1e70fd6af03abb594c7000e6666b
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala 851aae6b347b0be2cd2d891fc45030c3e47189d4
> 
> Diff: https://reviews.apache.org/r/24820/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Chris Riccomini
> 
>
