Hey TJ,

I've commented on your ticket:

https://issues.apache.org/jira/browse/SAMZA-342

You'll also want to take a look at:

https://issues.apache.org/jira/browse/SAMZA-245

Cheers,
Chris

On 7/16/14 3:46 PM, "TJ Giuli" <[email protected]> wrote:

>Hi Chris, there may be some latency between the producer and Kafka, but
>it looks like there is still a significant delay between when Kafka logs
>the receipt of a new message committed to a real-time topic and when the
>Samza stream processor receives it (11 seconds in my latest test run).
>Thanks,
>—T
>
>On Jul 16, 2014, at 9:40 AM, Chris Riccomini
><[email protected]> wrote:
>
>> Hey TJ,
>>
>> Also, is it possible that your upstream Kafka producer (the one sending
>> the RT messages) is set to async? In that case, the messages will be
>> buffered for some period of time before they're sent to Kafka.
>>
>> Cheers,
>> Chris
>>
>> On 7/16/14 9:35 AM, "Chris Riccomini" <[email protected]> wrote:
>>
>>> Hey TJ,
>>>
>>> To debug this, we're going to have to get our hands dirty with some
>>> metrics. Sorry. :(
>>>
>>> The first step is to establish whether the BrokerProxy code in Samza is
>>> actively getting data for the realtime topic while you're processing
>>> batch messages. To do this, please turn on the JMX reporter for your job
>>> and look at this metric:
>>>
>>> "%s-%s-messages-read" format (tp.topic, tp.partition)
>>>
>>> Even if your job is in the middle of processing batch data, this metric
>>> should increase when messages are sent to the realtime topic. If it
>>> increases, then the message is received in realtime but is being
>>> buffered somewhere in the SamzaContainer before being handed off to your
>>> StreamTask. If the number does not increase, then either the BrokerProxy
>>> isn't fetching the realtime data, or the realtime data is being buffered
>>> somewhere upstream (on the producer, or in the broker).
>>>
>>> Once we bisect here, we can continue narrowing down where the problem is.
>>>
>>> Cheers,
>>> Chris
>>>
>>> On 7/15/14 12:50 PM, "TJ Giuli" <[email protected]> wrote:
>>>
>>>> Hey, Chris:
>>>> 1. Yes, batch size set to 1: systems.kafka.producer.batch.num.messages=1
>>>> 2. Building default chooser with: useBatching=false,
>>>> useBootstrapping=false, usePriority=true
>>>> 3. I just re-ran my test and the delay is about 16 seconds. When I’m
>>>> really pumping data through the batch Kafka topics, I’ve seen around 1
>>>> minute.
>>>>
>>>> Thanks!
>>>> —T
>>>>
>>>> On Jul 15, 2014, at 10:11 AM, Chris Riccomini
>>>> <[email protected]> wrote:
>>>>
>>>>> Hey TJ,
>>>>>
>>>>> This sounds like a bug.
>>>>>
>>>>> 1. Is the task.consumer.batch.size config set for your job?
>>>>> 2. What does this log line say: Building default chooser with:
>>>>> useBatching=%s, useBootstrapping=%s, usePriority=%s
>>>>> 3. How long is the delay that you're seeing?
>>>>>
>>>>> Cheers,
>>>>> Chris
>>>>>
>>>>> On 7/14/14 4:57 PM, "Yan Fang" <[email protected]> wrote:
>>>>>
>>>>>> It seems to me that the batch processing fetches many messages at
>>>>>> once and then takes too long to process them. My first thought is
>>>>>> that, since we have systems.system-name.samza.fetch.threshold
>>>>>> <http://samza.incubator.apache.org/learn/documentation/0.7.0/jobs/configuration-table.html>,
>>>>>> setting this number to a smaller value (the default is 50000) will
>>>>>> force the system to fetch messages more frequently.
>>>>>>
>>>>>> Any other ideas?
>>>>>>
>>>>>> Fang, Yan
>>>>>> [email protected]
>>>>>> +1 (206) 849-4108
>>>>>>
>>>>>>
>>>>>> On Mon, Jul 14, 2014 at 1:12 PM, TJ Giuli <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I have a stream processor that takes input from multiple streams:
>>>>>>> some are batch-oriented and not latency-sensitive, while others are
>>>>>>> real-time, carry traffic only infrequently, and should be low-latency.
>>>>>>> The real-time stream helps me interpret the batch stream, so I would
>>>>>>> ideally like every real-time envelope delivered within some maximum
>>>>>>> latency from the time the message enters a Kafka topic.
>>>>>>>
>>>>>>> I have my stream processor configured to prioritize my real-time
>>>>>>> streams over the batch streams, but I consistently find that the
>>>>>>> real-time stream is delayed by traffic from the batch stream. From
>>>>>>> tracing the Kafka consumer, it looks like my stream processor
>>>>>>> periodically fetches from Kafka, finds that the batch streams have a
>>>>>>> large chunk of messages waiting, doesn't find anything on the
>>>>>>> real-time topics, and then spends a few minutes processing the batch
>>>>>>> messages. During the batch processing, the Kafka consumer does not
>>>>>>> poll the real-time streams, so if a message is sent to a real-time
>>>>>>> topic, it effectively doesn't arrive until the Kafka consumer's next
>>>>>>> fetch. Once a real-time message is consumed by the Kafka consumer,
>>>>>>> the TieredPriorityChooser correctly prioritizes traffic from the
>>>>>>> real-time streams over the batch streams.
>>>>>>>
>>>>>>> Does anyone have recommendations on how to get infrequent but
>>>>>>> important messages delivered within some maximum time bound? Thanks!
>>>>>>> —T
>>>>>
>>>>
>>>
>>
>
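A toy model can make the fetch-then-drain behavior TJ describes concrete. This is a hypothetical sketch, not Samza's BrokerProxy or SystemConsumers code: it assumes a single consumer that fetches at most `maxPerFetch` batch messages per round, spends one tick processing each, and does not fetch again until the whole chunk is drained. The class `FetchDrainModel`, the method `realtimeDelay`, and its parameters are made up for illustration. Under these assumptions, a real-time message that lands in Kafka just after a large fetch waits for roughly one full chunk of processing, however well the chooser prioritizes it. More frequent fetches are what Yan's smaller fetch.threshold suggestion aims for; whether that setting actually caps the per-fetch chunk in Samza is not established here.

```java
// Hypothetical toy model of the fetch-then-drain behavior described in this
// thread. It is NOT Samza's implementation; all names are illustrative only.
class FetchDrainModel {

    /**
     * Ticks between a real-time message landing in Kafka and the task receiving it.
     *
     * @param backlog     batch messages waiting in Kafka at tick 0
     * @param maxPerFetch assumed cap on batch messages returned by one fetch
     * @param arrivalTick tick at which the real-time message is written to Kafka
     */
    static long realtimeDelay(long backlog, long maxPerFetch, long arrivalTick) {
        long now = 0;
        while (true) {
            // A fetch only sees the real-time message if it has already arrived.
            if (now >= arrivalTick) {
                // Assume a priority chooser hands it to the task immediately.
                return now - arrivalTick;
            }
            // Otherwise the fetch returns only batch messages.
            long chunk = Math.min(backlog, maxPerFetch);
            backlog -= chunk;
            // No fetch happens while the chunk is drained (one tick per message);
            // an empty fetch still costs one tick before the next poll.
            now += Math.max(chunk, 1);
        }
    }

    public static void main(String[] args) {
        // Real-time message arrives one tick after a large batch chunk is fetched.
        System.out.println(realtimeDelay(10_000, 10_000, 1)); // 9999
        // Same backlog, but each fetch is capped at 100 batch messages.
        System.out.println(realtimeDelay(10_000, 100, 1));    // 99
    }
}
```

In this model the delay is set by how much batch data is drained between fetches, not by the chooser, which is the case Chris's messages-read check is meant to separate from buffering inside the SamzaContainer.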
