[
https://issues.apache.org/jira/browse/SAMZA-245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14071940#comment-14071940
]
Chris Riccomini commented on SAMZA-245:
---------------------------------------
bq. Also, the same concern as Martin's: will it be out-of-memory if processing
is slower than polling interval?
I don't think this should happen. We only add SystemStreamPartitions to the
emptySystemStreamPartitionsBySystem variable when the unprocessedMessages buffer
is empty for the SSP (i.e. the update method returns false for the SSP):
{code}
if (!update(systemStreamPartition)) {
  emptySystemStreamPartitionsBySystem.get(systemStreamPartition.getSystem).add(systemStreamPartition)
}
{code}
Then, we remove the SSP from emptySystemStreamPartitionsBySystem whenever we
get at least one message for it:
{code}
if (numEnvelopes > 0) {
  unprocessedMessages.put(systemStreamPartition, envelopes)
  // Update the chooser if it needs a message for this SSP.
  if (emptySystemStreamPartitionsBySystem.get(systemStreamPartition.getSystem).remove(systemStreamPartition)) {
    update(systemStreamPartition)
  }
}
{code}
In this way, we should only poll for SSPs that are totally empty. Thus the
theoretical max memory used is (max messages per SSP) * (number of SSPs) * 2.
The max messages per SSP is defined by the underlying consumer; in Kafka's
case, it's bounded by the fetch size. The number of SSPs is simply a function
of the input streams listed in task.inputs. The factor of 2 is required because
we have two buffers: BlockingEnvelopeMap's queue, and SystemConsumers'
unprocessedMessages buffer.
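To make the bound concrete, here is a rough sketch of the arithmetic. The
BufferBound object and its method name are hypothetical helpers for
illustration only; they are not part of Samza, and the bound counts messages
rather than bytes.

```scala
// Hypothetical helper, not part of Samza: upper bound on buffered messages.
object BufferBound {
  // Messages for an SSP can sit in two places: BlockingEnvelopeMap's queue
  // and SystemConsumers' unprocessedMessages buffer.
  val BufferCount: Long = 2L

  // maxMessagesPerSsp: largest batch the underlying consumer returns per SSP
  //                    (an assumed input; for Kafka it follows from the fetch size).
  // sspCount:          number of SystemStreamPartitions derived from task.inputs.
  def maxBufferedMessages(maxMessagesPerSsp: Long, sspCount: Long): Long =
    maxMessagesPerSsp * sspCount * BufferCount
}
```

With illustrative numbers, 1000 messages per SSP and 8 SSPs,
BufferBound.maxBufferedMessages(1000L, 8L) evaluates to 16000 messages.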
If a StreamTask is processing slower than the messages are coming in,
SystemConsumers should not poll for any new messages, since all input
SystemStreamPartitions already have messages in unprocessedMessages. If one or
more SystemStreamPartitions have empty buffers in unprocessedMessages, then
ONLY those SSPs should be refreshed when the pollIntervalMs expires.
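The behavior described above can be illustrated with a small toy model. This
is only a sketch under simplifying assumptions, not the SystemConsumers code
from the patch: ToyConsumers, poll, processOne and buffered are made-up names,
SSPs are plain strings, messages are Ints, and the chooser is left out.

```scala
import scala.collection.mutable

// Toy model, not Samza's classes: tracks which SSPs have empty buffers
// and only fetches for those.
class ToyConsumers(ssps: Seq[String], fetch: String => Seq[Int]) {
  val unprocessedMessages = mutable.Map[String, mutable.Queue[Int]]()
  val emptySsps = mutable.Set[String]()

  // Every SSP starts with an empty buffer, so every SSP starts as "empty".
  ssps.foreach { ssp =>
    unprocessedMessages(ssp) = mutable.Queue[Int]()
    emptySsps += ssp
  }

  // Called when the poll interval expires: fetch ONLY for empty SSPs.
  def poll(): Unit = {
    // Iterate over a snapshot because emptySsps is modified inside the loop.
    for (ssp <- emptySsps.toList) {
      val envelopes = fetch(ssp)
      if (envelopes.nonEmpty) {
        unprocessedMessages(ssp) ++= envelopes
        emptySsps -= ssp
      }
    }
  }

  // Process a single message; once the buffer drains, mark the SSP as empty
  // so that the next poll refreshes it.
  def processOne(ssp: String): Option[Int] = {
    val queue = unprocessedMessages(ssp)
    if (queue.isEmpty) None
    else {
      val message = queue.dequeue()
      if (queue.isEmpty) emptySsps += ssp
      Some(message)
    }
  }

  // Total number of messages currently buffered across all SSPs.
  def buffered: Int = unprocessedMessages.values.map(_.size).sum
}
```

In this model, calling poll() repeatedly without processing anything never
grows the buffers past one fetch per SSP, because an SSP only returns to
emptySsps after its buffer has been fully drained.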
> Improve SystemConsumers performance
> -----------------------------------
>
> Key: SAMZA-245
> URL: https://issues.apache.org/jira/browse/SAMZA-245
> Project: Samza
> Issue Type: Bug
> Components: container
> Affects Versions: 0.6.0
> Reporter: Chris Riccomini
> Assignee: Chris Riccomini
> Fix For: 0.8.0
>
> Attachments: SAMZA-245-1.patch, SAMZA-245-3.patch, SAMZA-245-4.patch,
> SAMZA-245-5.patch, SAMZA-245.0.patch,
> org.apache.samza.test.performance.TestSamzaContainerPerformance.SAMZA-245-3.html,
> org.apache.samza.test.performance.TestSamzaContainerPerformance.master.html
>
>
> As part of SAMZA-220, a more radical patch was proposed. This patch appears
> to improve SystemConsumers' performance pretty significantly, while also
> reducing its complexity. The decision was made to move this change into the
> 0.8.0 release, rather than the 0.7.0 release, since it's a fairly risky
> change.
> This ticket is to explore updating SystemConsumers to eliminate almost all
> loops in order to increase performance in the Samza container.