[ 
https://issues.apache.org/jira/browse/SAMZA-342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14065180#comment-14065180
 ] 

Chris Riccomini commented on SAMZA-342:
---------------------------------------

I believe the problem is in the way SystemConsumers is currently implemented. 
It has a config:

{code}
  /**
   * How low totalUnprocessedMessages has to get before the consumers are
   * polled for more data. This is defined to be 10% of
   * maxMsgsPerStreamPartition. Since maxMsgsPerStreamPartition defaults to
   * 10000, the default refreshThreshold is 1000.
   */
  val refreshThreshold = maxMsgsPerStreamPartition * .1
{code}

This parameter defaults to 1000. What this means is that SystemConsumers will 
not poll for any new messages as long as more than 1000 unprocessed messages 
are buffered.

{code}
    if (envelopeFromChooser == null || buffer.totalUnprocessedMessages <= refreshThreshold) {
      refresh.maybeCall()
    }
{code}
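To make the effect concrete, here is a minimal simulation (plain Java, not Samza code) of the gate above: while a large batch backlog is buffered, the refresh branch never fires, so a realtime message that arrives during the drain is not even fetched until the backlog falls to the threshold.

{code}
// Hypothetical sketch: count how many buffered messages are processed
// before the gate `unprocessed <= refreshThreshold` allows the first poll.
public class PollStarvation {
    static int messagesBeforeFirstPoll(int backlog, double refreshThreshold) {
        int processed = 0;
        for (int unprocessed = backlog; unprocessed > 0; unprocessed--) {
            if (unprocessed <= refreshThreshold) {
                return processed; // refresh.maybeCall() finally fires here
            }
            processed++; // chooser keeps handing out buffered batch messages
        }
        return processed;
    }

    public static void main(String[] args) {
        // With the default threshold of 1000 and a 5000-message backlog,
        // 4000 batch messages are processed before the consumers are polled
        // again -- that whole stretch is added latency for realtime topics.
        System.out.println(messagesBeforeFirstPoll(5000, 1000));
    }
}
{code}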

Irritatingly, maxMsgsPerStreamPartition is not currently configurable:

{code}
    val consumerMultiplexer = new SystemConsumers(
      // TODO add config values for no new message timeout and max msgs per stream partition
      chooser = chooser,
      consumers = consumers,
      serdeManager = serdeManager,
      metrics = systemConsumersMetrics,
      dropDeserializationError = dropDeserializationError)
{code}

TJ, as a test, is it possible for you to try re-compiling with a hard-coded 
refresh threshold to validate this? If you set a very high value, you should 
see more frequent polling for the realtime topics. Note that this will 
decrease throughput slightly, since it effectively shrinks the number of 
messages you process in a single batch between polls.
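Roughly what that experiment should show, again as a hedged sketch rather than Samza's actual code: raising the threshold turns almost every loop iteration into a poll, which is exactly the throughput-versus-latency trade described above.

{code}
// Hypothetical comparison: total refresh polls while draining a backlog,
// under the gate `unprocessed <= refreshThreshold`.
public class ThresholdExperiment {
    static int pollsWhileDraining(int backlog, double refreshThreshold) {
        int polls = 0;
        for (int unprocessed = backlog; unprocessed > 0; unprocessed--) {
            if (unprocessed <= refreshThreshold) {
                polls++; // stands in for refresh.maybeCall()
            }
        }
        return polls;
    }

    public static void main(String[] args) {
        // Default threshold (1000): only the tail of the drain polls.
        System.out.println("default: " + pollsWhileDraining(5000, 1000));
        // Hard-coded very high threshold: every iteration polls, so the
        // realtime topics are checked constantly, at some batching cost.
        System.out.println("high:    " + pollsWhileDraining(5000, 10000));
    }
}
{code}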

I'm working through a ticket (SAMZA-245) to address this issue (along with 
other performance issues) right now. If you want to provide feedback on the 
redesign in that ticket, you're welcome to do so. One proposal I have is to 
define a maximum-wait config that caps how long we go between polls of a 
system. If you set this to something like 100ms, you'd be guaranteed at most 
100ms of added latency, versus what you're seeing now, which is much higher.
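The maximum-wait idea could look something like the following (names and shape are mine, not the SAMZA-245 design): poll when the buffer is low *or* when maxWaitMs has elapsed since the last poll, so backlog size can no longer defer a poll indefinitely.

{code}
// Hypothetical gate combining the existing threshold check with a time cap.
// Time is injected as a parameter so the logic is deterministic to test.
public class MaxWaitGate {
    private final long maxWaitMs;
    private final double refreshThreshold;
    private long lastPollMs;

    MaxWaitGate(long maxWaitMs, double refreshThreshold, long nowMs) {
        this.maxWaitMs = maxWaitMs;
        this.refreshThreshold = refreshThreshold;
        this.lastPollMs = nowMs;
    }

    boolean shouldPoll(int unprocessed, long nowMs) {
        boolean poll = unprocessed <= refreshThreshold
                || nowMs - lastPollMs >= maxWaitMs;
        if (poll) {
            lastPollMs = nowMs;
        }
        return poll;
    }

    public static void main(String[] args) {
        MaxWaitGate gate = new MaxWaitGate(100, 1000, 0);
        // Large backlog: the threshold clause never fires at t=50ms...
        System.out.println(gate.shouldPoll(5000, 50));
        // ...but at t=100ms we poll anyway, so realtime topics wait at most
        // maxWaitMs regardless of how deep the batch backlog is.
        System.out.println(gate.shouldPoll(5000, 100));
    }
}
{code}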

> Priority streams experience large latencies before being consumed by the 
> stream processor
> -----------------------------------------------------------------------------------------
>
>                 Key: SAMZA-342
>                 URL: https://issues.apache.org/jira/browse/SAMZA-342
>             Project: Samza
>          Issue Type: Bug
>          Components: kafka
>    Affects Versions: 0.7.0
>         Environment: ubuntu 13.10
>            Reporter: TJ Giuli
>
> I have a stream processor that takes inputs from multiple streams, some are 
> more batch, non-latency sensitive and others are real-time, infrequently have 
> traffic and should be low-latency.  The real-time stream helps me interpret 
> the batch stream, so I would ideally like any real-time stream envelopes 
> delivered within some maximum latency from the time the message enters into a 
> Kafka topic.  
> I have my stream processor configured to prioritize my real-time streams over 
> the batch streams, but I consistently find that the real-time stream is 
> delayed by traffic from the batch stream.  From tracing the Kafka consumer, 
> it looks like my stream processor periodically fetches from Kafka, finds that 
> the batch streams have a large chunk of messages waiting, doesn’t find 
> anything on the real-time topics, and processes away the batch messages for a 
> few minutes. During the batch processing, the Kafka consumer does not poll 
> the real-time streams, so if a message is sent to a real-time topic, the 
> message effectively doesn’t arrive until the next time the Kafka consumer 
> does another fetch.  When a real-time message is consumed by the Kafka 
> consumer, the TieredPriorityChooser correctly prioritizes traffic from the 
> real-time streams over the batch streams.



--
This message was sent by Atlassian JIRA
(v6.2#6252)
