Hey TJ,

To debug this, we're going to have to get our hands dirty with some metrics. Sorry. :(
The first step is to establish whether the BrokerProxy code in Samza is actively fetching data for the realtime topic while you're processing batch messages. To do this, please turn on the JMX reporter for your job and look at this metric:

  "%s-%s-messages-read" format (tp.topic, tp.partition)

i.e. <topic>-<partition>-messages-read for the realtime topic's partitions. Even if your job is in the middle of processing batch data, this metric should increase when messages are sent to the realtime topic. If it increases, then the messages are being received in real time but buffered somewhere in the SamzaContainer before being handed off to your StreamTask. If the number does not increase, then the BrokerProxy either isn't fetching the realtime data, or the realtime data is being buffered somewhere upstream (on the producer, or in the broker). Once we bisect here, we can continue narrowing down where the problem is.

Cheers,
Chris

On 7/15/14 12:50 PM, "TJ Giuli" <[email protected]> wrote:

>Hey, Chris:
>
>1. Yes, batch size is set to 1: systems.kafka.producer.batch.num.messages=1
>2. Building default chooser with: useBatching=false, useBootstrapping=false, usePriority=true
>3. I just re-ran my test and the delay is about 16 seconds. When I'm really pumping data through the batch Kafka topics, I've seen around 1 minute.
>
>Thanks!
>—T
>
>On Jul 15, 2014, at 10:11 AM, Chris Riccomini <[email protected]> wrote:
>
>> Hey TJ,
>>
>> This sounds like a bug.
>>
>> 1. Is the task.consumer.batch.size config set for your job?
>> 2. What does this log line say: Building default chooser with: useBatching=%s, useBootstrapping=%s, usePriority=%s
>> 3. How long is the delay that you're seeing?
>>
>> Cheers,
>> Chris
>>
>> On 7/14/14 4:57 PM, "Yan Fang" <[email protected]> wrote:
>>
>>> It seems to me that the batch processing fetches many messages at one time and then takes too long to process them. My first thought is that, since we have systems.system-name.samza.fetch.threshold
>>> <http://samza.incubator.apache.org/learn/documentation/0.7.0/jobs/configuration-table.html>,
>>> setting this number to something smaller (the default is 50000) will force the system to fetch messages more frequently.
>>>
>>> Any other ideas?
>>>
>>> Fang, Yan
>>> [email protected]
>>> +1 (206) 849-4108
>>>
>>>
>>> On Mon, Jul 14, 2014 at 1:12 PM, TJ Giuli <[email protected]> wrote:
>>>
>>>> Hi,
>>>>
>>>> I have a stream processor that takes inputs from multiple streams: some are batch-like and not latency-sensitive, while others are real-time, carry infrequent traffic, and should be low-latency. The real-time stream helps me interpret the batch stream, so I would ideally like any real-time stream envelopes delivered within some maximum latency from the time the message enters a Kafka topic.
>>>>
>>>> I have my stream processor configured to prioritize my real-time streams over the batch streams, but I consistently find that the real-time stream is delayed by traffic from the batch stream. From tracing the Kafka consumer, it looks like my stream processor periodically fetches from Kafka, finds that the batch streams have a large chunk of messages waiting, doesn't find anything on the real-time topics, and processes away the batch messages for a few minutes. During the batch processing, the Kafka consumer does not poll the real-time streams, so if a message is sent to a real-time topic, it effectively doesn't arrive until the next time the Kafka consumer does another fetch. When a real-time message is consumed by the Kafka consumer, the TieredPriorityChooser correctly prioritizes traffic from the real-time streams over the batch streams.
>>>>
>>>> Does anyone have recommendations on how to get infrequent but important messages delivered within some maximum time bound? Thanks!
>>>> —T
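A minimal sketch of the configs discussed in the thread above, assuming Samza 0.7 property names; "kafka" is the system name taken from TJ's producer config, and "realtime-topic" is a placeholder stream name:

  # Turn on the JMX reporter so the per-partition consumer metrics
  # (e.g. <topic>-<partition>-messages-read) are visible to a JMX client.
  metrics.reporters=jmx
  metrics.reporter.jmx.class=org.apache.samza.metrics.reporter.JmxReporterFactory

  # Yan's suggestion: lower the fetch threshold (default 50000) so the
  # consumer fetches smaller chunks more frequently.
  systems.kafka.samza.fetch.threshold=10000

  # Priority config behind TJ's usePriority=true chooser; higher numbers
  # are preferred by the priority-based chooser.
  systems.kafka.streams.realtime-topic.samza.priority=2

With JMX enabled, pointing jconsole (or any other JMX client) at the container should show whether the realtime topic's messages-read counter advances during batch processing, which is the bisection point Chris describes.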
