[ https://issues.apache.org/jira/browse/SAMZA-342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14069378#comment-14069378 ]

TJ Giuli commented on SAMZA-342:
--------------------------------

Hi Chris, OK, I've run several experiments and unfortunately don't have much 
of a resolution.  Your explanation makes a lot of sense, but when I recompiled 
and ran with refreshThreshold set to 20000000 (much larger than the volume of 
messages I'm seeing on my bulk topics), I saw little difference from when I 
set refreshThreshold to 1.

I'm including the Kafka and Samza logs below.  I think they show when Kafka 
records a message as appended to the realtime topic and when the message is 
finally observed by my stream processor.  I ran the experiment 3 times each 
for refreshThreshold = 1 and refreshThreshold = 20000000.

{noformat}
SystemConsumer refreshThreshold=1:

[2014-07-21 14:23:21,797] TRACE Appended message set to log REALTIME-0 with 
first offset: 28, next offset: 30, and messages: 
ByteBufferMessageSet(MessageAndOffset(Message(magic = 0, attributes = 0, crc = 
136883696, key = null, payload = java.nio.HeapByteBuffer[pos=0 lim=3336 
cap=3336]),28), MessageAndOffset(Message(magic = 0, attributes = 0, crc = 
2978032049, key = null, payload = java.nio.HeapByteBuffer[pos=0 lim=1647 
cap=1647]),29)) (kafka.log.Log)

2014-07-21 14:23:36 TaskInstance [TRACE] Processing incoming message envelope 
for partition: Partition [partition=0], SystemStreamPartition 
[partition=Partition [partition=0], system=kafka, stream=REALTIME]


**************

[2014-07-21 14:26:53,210] TRACE Appended message set to log REALTIME-0 with 
first offset: 30, next offset: 32, and messages: 
ByteBufferMessageSet(MessageAndOffset(Message(magic = 0, attributes = 0, crc = 
4093869568, key = null, payload = java.nio.HeapByteBuffer[pos=0 lim=3555 
cap=3555]),30), MessageAndOffset(Message(magic = 0, attributes = 0, crc = 
400086399, key = null, payload = java.nio.HeapByteBuffer[pos=0 lim=1647 
cap=1647]),31)) (kafka.log.Log)

2014-07-21 14:27:07 TaskInstance [TRACE] Processing incoming message envelope 
for partition: Partition [partition=0], SystemStreamPartition 
[partition=Partition [partition=0], system=kafka, stream=REALTIME]

***************

[2014-07-21 14:29:38,453] TRACE Appended message set to log REALTIME-0 with 
first offset: 32, next offset: 34, and messages: 
ByteBufferMessageSet(MessageAndOffset(Message(magic = 0, attributes = 0, crc = 
2001288351, key = null, payload = java.nio.HeapByteBuffer[pos=0 lim=3774 
cap=3774]),32), MessageAndOffset(Message(magic = 0, attributes = 0, crc = 
492894339, key = null, payload = java.nio.HeapByteBuffer[pos=0 lim=1647 
cap=1647]),33)) (kafka.log.Log)

2014-07-21 14:29:53 TaskInstance [TRACE] Processing incoming message envelope 
for partition: Partition [partition=0], SystemStreamPartition 
[partition=Partition [partition=0], system=kafka, stream=REALTIME]

SystemConsumer: refreshThreshold = 20000000


[2014-07-21 14:34:14,468] TRACE Appended message set to log REALTIME-0 with 
first offset: 34, next offset: 36, and messages: 
ByteBufferMessageSet(MessageAndOffset(Message(magic = 0, attributes = 0, crc = 
2045673790, key = null, payload = java.nio.HeapByteBuffer[pos=0 lim=3993 
cap=3993]),34), MessageAndOffset(Message(magic = 0, attributes = 0, crc = 
3101088091, key = null, payload = java.nio.HeapByteBuffer[pos=0 lim=1647 
cap=1647]),35)) (kafka.log.Log)

2014-07-21 14:34:29 TaskInstance [TRACE] Processing incoming message envelope 
for partition: Partition [partition=0], SystemStreamPartition 
[partition=Partition [partition=0], system=kafka, stream=REALTIME]

**********************

[2014-07-21 14:39:27,711] TRACE Appended message set to log REALTIME-0 with 
first offset: 36, next offset: 38, and messages: 
ByteBufferMessageSet(MessageAndOffset(Message(magic = 0, attributes = 0, crc = 
1761075894, key = null, payload = java.nio.HeapByteBuffer[pos=0 lim=4212 
cap=4212]),36), MessageAndOffset(Message(magic = 0, attributes = 0, crc = 
3315050524, key = null, payload = java.nio.HeapByteBuffer[pos=0 lim=1647 
cap=1647]),37)) (kafka.log.Log)

2014-07-21 14:39:42 TaskInstance [TRACE] Processing incoming message envelope 
for partition: Partition [partition=0], SystemStreamPartition 
[partition=Partition [partition=0], system=kafka, stream=REALTIME]

****************************

[2014-07-21 14:42:40,181] TRACE Appended message set to log REALTIME-0 with 
first offset: 38, next offset: 40, and messages: 
ByteBufferMessageSet(MessageAndOffset(Message(magic = 0, attributes = 0, crc = 
2267447584, key = null, payload = java.nio.HeapByteBuffer[pos=0 lim=4431 
cap=4431]),38), MessageAndOffset(Message(magic = 0, attributes = 0, crc = 
3484175016, key = null, payload = java.nio.HeapByteBuffer[pos=0 lim=1647 
cap=1647]),39)) (kafka.log.Log)

2014-07-21 14:42:51 TaskInstance [TRACE] Processing incoming message envelope 
for partition: Partition [partition=0], SystemStreamPartition 
[partition=Partition [partition=0], system=kafka, stream=REALTIME]
{noformat}

Strange!
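To put numbers on the delays, here is a small illustrative Python sketch (not part of Samza) that parses the Kafka append time and the Samza TaskInstance processing time for each message pair in the logs above and subtracts them. The timestamps are copied verbatim from the logs; the Samza log only has whole-second resolution, so each delay is accurate to about one second.

```python
from datetime import datetime

# (Kafka append time, Samza processing time) pairs copied from the logs above.
RUNS = {
    "refreshThreshold=1": [
        ("2014-07-21 14:23:21,797", "2014-07-21 14:23:36"),
        ("2014-07-21 14:26:53,210", "2014-07-21 14:27:07"),
        ("2014-07-21 14:29:38,453", "2014-07-21 14:29:53"),
    ],
    "refreshThreshold=20000000": [
        ("2014-07-21 14:34:14,468", "2014-07-21 14:34:29"),
        ("2014-07-21 14:39:27,711", "2014-07-21 14:39:42"),
        ("2014-07-21 14:42:40,181", "2014-07-21 14:42:51"),
    ],
}

KAFKA_FMT = "%Y-%m-%d %H:%M:%S,%f"  # kafka.log.Log logs milliseconds
SAMZA_FMT = "%Y-%m-%d %H:%M:%S"     # TaskInstance logs whole seconds


def delays(pairs):
    """Return the append-to-process delay of each pair, in seconds."""
    result = []
    for appended, processed in pairs:
        t_append = datetime.strptime(appended, KAFKA_FMT)
        t_process = datetime.strptime(processed, SAMZA_FMT)
        result.append(round((t_process - t_append).total_seconds(), 3))
    return result


for label, pairs in RUNS.items():
    print(label, delays(pairs))
```

Every refreshThreshold=1 message waits about 14 seconds, and the refreshThreshold=20000000 messages wait between roughly 10.8 and 14.5 seconds, so the two settings are indeed hard to tell apart.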

> Priority streams experience large latencies before being consumed by the 
> stream processor
> -----------------------------------------------------------------------------------------
>
>                 Key: SAMZA-342
>                 URL: https://issues.apache.org/jira/browse/SAMZA-342
>             Project: Samza
>          Issue Type: Bug
>          Components: kafka
>    Affects Versions: 0.7.0
>         Environment: ubuntu 13.10
>            Reporter: TJ Giuli
>
> I have a stream processor that takes inputs from multiple streams: some are 
> batch-like and not latency-sensitive, while others are real-time, carry 
> traffic only infrequently, and should be low-latency.  The real-time stream 
> helps me interpret the batch stream, so ideally I would like every real-time 
> envelope delivered within some maximum latency of the time the message enters 
> a Kafka topic.  
> I have my stream processor configured to prioritize my real-time streams over 
> the batch streams, but I consistently find that the real-time stream is 
> delayed by traffic from the batch stream.  From tracing the Kafka consumer, 
> it looks like my stream processor periodically fetches from Kafka, finds that 
> the batch streams have a large chunk of messages waiting, doesn't find 
> anything on the real-time topics, and spends the next few minutes processing 
> the batch messages.  During the batch processing, the Kafka consumer does not 
> poll the real-time streams, so a message sent to a real-time topic effectively 
> doesn't arrive until the Kafka consumer's next fetch.  Once a real-time 
> message has been consumed by the Kafka consumer, the TieredPriorityChooser 
> correctly prioritizes it over traffic from the batch streams.
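The mechanism described in the issue can be illustrated with a deliberately simplified toy model. This is a hypothetical Python sketch, not Samza's consumer or chooser code: the function name `simulate`, the one-envelope-per-tick processing, and the rule "fetch only when the buffer is empty" are all assumptions made for illustration. The point it shows is that a priority chooser can only pick among envelopes that have already been fetched.

```python
from collections import deque


def simulate(batch_backlog: int, realtime_arrival: int) -> int:
    """Toy model of the fetch/choose loop (hypothetical, NOT Samza's code).

    - One envelope is processed per tick.
    - The consumer fetches from the "broker" only when its buffer is empty.
    - The chooser always prefers a buffered realtime envelope, but it can
      only choose among envelopes that have already been fetched.

    Returns how many ticks the realtime message waits between arriving at
    the broker and being processed.
    """
    if realtime_arrival < 0:
        raise ValueError("realtime_arrival must be >= 0")
    broker = {"realtime": deque(), "batch": deque(range(batch_backlog))}
    buffered = {"realtime": deque(), "batch": deque()}
    tick = 0
    while True:
        if tick == realtime_arrival:
            broker["realtime"].append("rt-message")
        # Poll the broker only once everything buffered has been processed.
        if not buffered["realtime"] and not buffered["batch"]:
            for topic, pending in broker.items():
                buffered[topic].extend(pending)
                pending.clear()
        # Priority choice among buffered envelopes only.
        if buffered["realtime"]:
            buffered["realtime"].popleft()
            return tick - realtime_arrival
        if buffered["batch"]:
            buffered["batch"].popleft()
        tick += 1


print(simulate(100, 1))  # realtime message arrives just after a big batch fetch
```

In this model, `simulate(100, 1)` returns 99: the realtime message arrives one tick after a 100-message batch fetch and waits until the whole backlog drains, even though the chooser always prefers realtime envelopes. Raising the realtime priority changes nothing here, because the chooser never sees the envelope until the next fetch.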



--
This message was sent by Atlassian JIRA
(v6.2#6252)
