Hey TJ,

Also, is it possible that your upstream Kafka producer (the one sending
the RT messages) is set to async? In such a case, the messages will be
buffered for some period of time before they're sent to Kafka.
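As a rough illustration of why async mode hurts an infrequent topic, here is a simplified, hypothetical model of an async producer's client-side buffer. It assumes a batch is sent once it holds `batch_num_messages` messages, or once `queue_buffering_max_ms` has passed since its first message was queued. That loosely mirrors the Kafka 0.8 async producer's batch.num.messages and queue.buffering.max.ms settings, but the real send thread times out relative to its last send, so treat the numbers as approximate.

```python
from typing import List


def buffered_delays(arrivals_ms: List[int], batch_num_messages: int,
                    queue_buffering_max_ms: int) -> List[int]:
    """Return, for each message in arrival order, how long it waits in the
    producer's buffer before its batch is sent (simplified model only)."""
    delays: List[int] = []
    batch: List[int] = []  # arrival times of messages in the open batch
    for t in sorted(arrivals_ms):
        # If the open batch's deadline passed before this arrival, it was sent then.
        if batch and t - batch[0] >= queue_buffering_max_ms:
            sent_at = batch[0] + queue_buffering_max_ms
            delays.extend(sent_at - a for a in batch)
            batch = []
        batch.append(t)
        # A full batch is sent immediately.
        if len(batch) >= batch_num_messages:
            delays.extend(t - a for a in batch)
            batch = []
    # Whatever is left goes out when its deadline expires.
    if batch:
        sent_at = batch[0] + queue_buffering_max_ms
        delays.extend(sent_at - a for a in batch)
    return delays


if __name__ == "__main__":
    # One lone realtime message, using what I believe are the 0.8 defaults
    # (batch.num.messages=200, queue.buffering.max.ms=5000).
    print(buffered_delays([0], 200, 5000))  # [5000]
```

In this model a lone message always waits the full linger time, while a batch size of 1 (or a sync producer) sends it immediately.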

Cheers,
Chris

On 7/16/14 9:35 AM, "Chris Riccomini" wrote:

>Hey TJ,
>
>To debug this, we're going to have to get our hands dirty with some
>metrics. Sorry. :(
>
>The first step is to establish whether the BrokerProxy code in Samza is
>actively getting data for the realtime topic when you're processing batch
>messages. To do this, please turn on the JMX reporter for your job and
>look at this metric:
>
>  "%s-%s-messages-read" format (tp.topic, tp.partition)
>
>That is, one counter per topic-partition, named <topic>-<partition>-messages-read.
>
>Even if your job is in the middle of processing batch data, this metric
>should increase when messages are sent to the realtime topic. If it
>increases, then the message is being received in realtime but is buffered
>somewhere in the SamzaContainer before being handed off to your
>StreamTask. If the number does not increase, then either the BrokerProxy
>isn't fetching the realtime data, or the realtime data is being buffered
>somewhere upstream (on the producer or in the broker).
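A small sketch of this bisection step, assuming you have already read the messages-read counters (for example from JMX) into plain dictionaries, once before and once after sending a realtime test message while batch data is being processed. The helper names, the topic name "realtime", and the snapshot format are hypothetical; only the metric-name pattern comes from the format string quoted above, and the exact JMX name may carry extra qualifiers.

```python
from typing import Dict, Iterable

IN_CONTAINER = "fetched in realtime, buffered in the SamzaContainer before the StreamTask"
NOT_FETCHED = "not fetched: BrokerProxy isn't fetching, or data is buffered upstream"


def messages_read_metric(topic: str, partition: int) -> str:
    # Same pattern as the Scala format string "%s-%s-messages-read".
    return "%s-%s-messages-read" % (topic, partition)


def locate_delay(before: Dict[str, int], after: Dict[str, int],
                 topic: str, partitions: Iterable[int]) -> str:
    """Decide which side of the BrokerProxy the delay is on.

    `before` and `after` map metric names to counter values, sampled around
    sending one message to `topic` while batch data is being processed.
    """
    names = [messages_read_metric(topic, p) for p in partitions]
    if any(after.get(n, 0) > before.get(n, 0) for n in names):
        return IN_CONTAINER
    return NOT_FETCHED


if __name__ == "__main__":
    before = {"realtime-0-messages-read": 10}
    after = {"realtime-0-messages-read": 11}
    print(locate_delay(before, after, "realtime", [0]))
```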
>
>Once we bisect here, we can continue narrowing down where the problem is.
>
>Cheers,
>Chris
>
>On 7/15/14 12:50 PM, "TJ Giuli" wrote:
>
>>Hey, Chris:
>>1. Yes, batch size set to 1: systems.kafka.producer.batch.num.messages=1
>>2. Building default chooser with: useBatching=false,
>>useBootstrapping=false, usePriority=true
>>3. I just re-ran my test and the delay is about 16 seconds. When I’m
>>really pumping data through the batch Kafka topics, I’ve seen delays of
>>around 1 minute.
>>
>>Thanks!
>>—T
>>
>>On Jul 15, 2014, at 10:11 AM, Chris Riccomini wrote:
>>
>>> Hey TJ,
>>> 
>>> This sounds like a bug.
>>> 
>>> 1. Is the task.consumer.batch.size config set for your job?
>>> 2. What does this log line say: Building default chooser with:
>>> useBatching=%s, useBootstrapping=%s, usePriority=%s
>>> 3. How long is the delay that you're seeing?
>>> 
>>> Cheers,
>>> Chris
>>> 
>>> On 7/14/14 4:57 PM, "Yan Fang" wrote:
>>> 
>>>> It seems to me that the batch processing fetches many messages at one
>>>> time and then takes too long to process them. My first thought is that,
>>>> since we have the systems.system-name.samza.fetch.threshold config
>>>> (<http://samza.incubator.apache.org/learn/documentation/0.7.0/jobs/configuration-table.html>),
>>>> setting it to a smaller value (the default is 50000) will force the
>>>> system to fetch messages more frequently.
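A quick sketch of the threshold arithmetic, assuming (per my reading of the KafkaSystemConsumer documentation of this era) that the fetch threshold is a container-wide count of buffered messages divided evenly among the registered partitions, and that a partition is fetched again once its buffer drops below its share. The function names and the 50-partition figure are hypothetical.

```python
def per_partition_threshold(fetch_threshold: int, num_partitions: int) -> int:
    """Assumed: the container-wide threshold is split evenly across partitions."""
    if num_partitions <= 0:
        raise ValueError("num_partitions must be positive")
    return fetch_threshold // num_partitions


def should_fetch(buffered_messages: int, threshold: int) -> bool:
    """Assumed: a partition is fetched again once its buffer drops below its share."""
    return buffered_messages < threshold


if __name__ == "__main__":
    # Default threshold of 50000 spread over a hypothetical 50 partitions.
    share = per_partition_threshold(50000, 50)
    print(share)                     # 1000
    print(should_fetch(999, share))  # True
```

Under the same assumption, lowering the threshold to 5000 would give each of those 50 partitions a share of 100 buffered messages.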
>>>> 
>>>> Any other ideas?
>>>> 
>>>> Fang, Yan
>>>> +1 (206) 849-4108
>>>> 
>>>> 
>>>> On Mon, Jul 14, 2014 at 1:12 PM, TJ Giuli wrote:
>>>> 
>>>>> Hi,
>>>>> 
>>>>> I have a stream processor that takes inputs from multiple streams. Some
>>>>> are batch-oriented and not latency-sensitive; others are real-time,
>>>>> have infrequent traffic, and should be low-latency. The real-time
>>>>> stream helps me interpret the batch stream, so ideally I would like
>>>>> every real-time stream envelope delivered within some maximum latency
>>>>> from the time the message enters a Kafka topic.
>>>>> 
>>>>> I have my stream processor configured to prioritize my real-time streams
>>>>> over the batch streams, but I consistently find that the real-time
>>>>> stream is delayed by traffic from the batch stream. From tracing the
>>>>> Kafka consumer, it looks like my stream processor periodically fetches
>>>>> from Kafka, finds that the batch streams have a large chunk of messages
>>>>> waiting, doesn't find anything on the real-time topics, and processes
>>>>> away the batch messages for a few minutes. During the batch processing,
>>>>> the Kafka consumer does not poll the real-time streams, so if a message
>>>>> is sent to a real-time topic, it effectively doesn't arrive until the
>>>>> Kafka consumer's next fetch. When a real-time message is consumed by
>>>>> the Kafka consumer, the TieredPriorityChooser correctly prioritizes
>>>>> traffic from the real-time streams over the batch streams.
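The behavior described here can be seen in a toy tiered priority chooser: it can only prefer the real-time stream among messages that have already been fetched into the container. This is a hypothetical simplification, not Samza's TieredPriorityChooser; the stream names are made up, and ties within a tier are broken by stream name here, whereas Samza's real chooser handles ties differently.

```python
from collections import deque
from typing import Deque, Dict, Optional, Tuple


def choose(buffers: Dict[str, Deque[str]],
           priorities: Dict[str, int]) -> Optional[Tuple[str, str]]:
    """Pop the next message from the highest-priority stream with buffered data.

    Streams missing from `priorities` default to 0. Returns None when
    nothing is buffered.
    """
    ready = [stream for stream, queue in buffers.items() if queue]
    if not ready:
        return None
    # Higher priority wins; the stream name is only a deterministic tie-breaker.
    best = min(ready, key=lambda s: (-priorities.get(s, 0), s))
    return best, buffers[best].popleft()


if __name__ == "__main__":
    buffers = {"batch": deque(["b1", "b2"]), "realtime": deque()}
    priorities = {"realtime": 1, "batch": 0}
    print(choose(buffers, priorities))  # ('batch', 'b1'): nothing real-time fetched yet
    buffers["realtime"].append("r1")    # becomes visible only after the next fetch
    print(choose(buffers, priorities))  # ('realtime', 'r1')
```

However the priorities are set, the chooser never sees a real-time message until the consumer has fetched it, which is why the delay tracks the fetch cycle.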
>>>>> 
>>>>> Does anyone have recommendations on how to get infrequent but important
>>>>> messages delivered within some maximum time bound? Thanks!
>>>>> —T
>>> 
>>
>
