[ https://issues.apache.org/jira/browse/KAFKA-8339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16836065#comment-16836065 ]

tdp commented on KAFKA-8339:
----------------------------

Hi Matthias,

Thanks for the quick reply! After digging into this a bit more, I think the 
issue is in our aggregation logic. We aggregate records and put the 
sub-aggregation result into the local state store. When the aggregation is 
"full" and a non-null value is returned by KSTREAM-TRANSFORMVALUES-NODE1, the 
transform node logic also deletes the completed sub-aggregation from the local 
state store. That is the flaw in our logic: after the failure, only the last 
record from topic T1 is re-consumed by the new stream thread, and that thread 
cannot find the sub-aggregation in the local state store because it was 
already deleted. I had incorrectly assumed that the earlier records would be 
re-consumed as well. The logs below explain in more detail.
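
To make the flaw concrete, here is a minimal sketch of the kind of transformer 
logic I'm describing (the store name, types, expected count, and key 
derivation are placeholders, not our actual code):
{code:java}
import java.util.ArrayList;

import org.apache.kafka.streams.kstream.ValueTransformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

// Illustrative only: store name, types, expected count, and key derivation
// below are placeholders, not our actual code.
public class SubAggregatingTransformer implements ValueTransformer<String, ArrayList<String>> {

    private static final int EXPECTED_RECORDS = 3;

    private KeyValueStore<String, ArrayList<String>> store;

    @Override
    @SuppressWarnings("unchecked")
    public void init(final ProcessorContext context) {
        // The store is created and connected to this transformer when building the topology.
        store = (KeyValueStore<String, ArrayList<String>>) context.getStateStore("sub-agg-store");
    }

    @Override
    public ArrayList<String> transform(final String value) {
        final String aggKey = value; // placeholder: derive the aggregation key (request ID) from the record
        ArrayList<String> partial = store.get(aggKey);
        if (partial == null) {
            partial = new ArrayList<>();
        }
        partial.add(value);

        if (partial.size() == EXPECTED_RECORDS) {
            // The flaw: once the completed sub-aggregation is deleted, a record
            // that is re-consumed after a failed commit can never rebuild it.
            store.delete(aggKey);
            return partial;
        }

        store.put(aggKey, partial);
        return null; // null results are dropped by KSTREAM-FILTER-NODE1
    }

    @Override
    public void close() {
    }
}
{code}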

I also should have specified that we are using a customized exception handler 
that looks like this:
{code:java}
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;
import org.apache.kafka.streams.processor.ProcessorContext;

public class CustomLogAndFailExceptionHandler
        implements ProductionExceptionHandler, DeserializationExceptionHandler {

    @Override
    public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
                                                     final Exception exception) {
        // ... log the failed production ...
        return ProductionExceptionHandlerResponse.FAIL;
    }

    @Override
    public DeserializationHandlerResponse handle(final ProcessorContext context,
                                                 final ConsumerRecord<byte[], byte[]> record,
                                                 final Exception exception) {
        // ... log the failed deserialization ...
        return DeserializationHandlerResponse.FAIL;
    }

    // Required: both handler interfaces extend Configurable.
    @Override
    public void configure(final Map<String, ?> configs) {
        // ...
    }
}
{code}
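For reference, a handler like this takes effect through the standard Streams 
configs (a minimal sketch; props is assumed to be the properties object we 
pass to KafkaStreams):
{code:java}
final Properties props = new Properties();
// ... other Streams configs ...
props.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
        CustomLogAndFailExceptionHandler.class);
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
        CustomLogAndFailExceptionHandler.class);
{code}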
For clarity, here are some selected logs (with additional custom debug/info 
logging added):

Record 1 (of 3) is aggregated and its sub-aggregation written to the local 
state store; the null transform result is filtered out by KSTREAM-FILTER-NODE1:
{noformat}
07 May 2019 14:03:10,951 [INFO] (StreamThread-18) 
org.apache.kafka.streams.processor.internals.StreamThread: stream-thread 
[StreamThread-18] Polled record for topic TOPIC-T1 and partition 10 with key 
!0d81fc45-f485-4676-901f-6c1ced7042b0 and offset 5 for request ID 
554a65bc-e0bd-486d-abfa-ed3a3ac75af1.
07 May 2019 14:03:10,951 [INFO] (StreamThread-18) 
org.apache.kafka.streams.processor.internals.StreamTask: task [1_10] Adding 
record to task 1_10 for topic TOPIC-T1 and partition 10 with key 
!0d81fc45-f485-4676-901f-6c1ced7042b0 and offset 5 for request ID 
554a65bc-e0bd-486d-abfa-ed3a3ac75af1.
07 May 2019 14:03:10,951 [INFO] (StreamThread-18) 
org.apache.kafka.streams.processor.internals.SourceNode: KSTREAM-SOURCE-NODE1 
consuming key !0d81fc45-f485-4676-901f-6c1ced7042b0 for request ID 
554a65bc-e0bd-486d-abfa-ed3a3ac75af1.
07 May 2019 14:03:10,953 [INFO] (StreamThread-18) 
org.apache.kafka.streams.processor.internals.ProcessorNode: 
KSTREAM-TRANSFORMVALUES-NODE1 processing key 
!0d81fc45-f485-4676-901f-6c1ced7042b0 for request ID 
554a65bc-e0bd-486d-abfa-ed3a3ac75af1.
07 May 2019 14:03:10,954 [INFO] 554a65bc-e0bd-486d-abfa-ed3a3ac75af1 10 
(StreamThread-15) Aggregator: Processed '1' of '3' messages for 
'c72d609c-2d8d-420f-99d0-b11593e32c981466642757'.
07 May 2019 14:03:10,955 [INFO] (StreamThread-18) 
org.apache.kafka.streams.processor.internals.ProcessorNode: 
KSTREAM-FILTER-NODE1 processing key !0d81fc45-f485-4676-901f-6c1ced7042b0 for 
request ID 554a65bc-e0bd-486d-abfa-ed3a3ac75af1.
{noformat}
Record 2 (of 3) is aggregated and its sub-aggregation written to the local 
state store; the null transform result is filtered out by KSTREAM-FILTER-NODE1:
{noformat}
07 May 2019 14:03:10,969 [INFO] (StreamThread-18) 
org.apache.kafka.streams.processor.internals.StreamThread: stream-thread 
[StreamThread-18] Polled record for topic TOPIC-T1 and partition 10 with key 
!b5bc5c31-b676-483f-a0d3-4eeab7b0431c and offset 6 for request ID 
554a65bc-e0bd-486d-abfa-ed3a3ac75af1.
07 May 2019 14:03:10,969 [INFO] (StreamThread-18) 
org.apache.kafka.streams.processor.internals.StreamTask: task [1_10] Adding 
record to task 1_10 for topic TOPIC-T1 and partition 10 with key 
!b5bc5c31-b676-483f-a0d3-4eeab7b0431c and offset 6 for request ID 
554a65bc-e0bd-486d-abfa-ed3a3ac75af1.
07 May 2019 14:03:10,971 [INFO] (StreamThread-18) 
org.apache.kafka.streams.processor.internals.SourceNode: KSTREAM-SOURCE-NODE1 
consuming key !b5bc5c31-b676-483f-a0d3-4eeab7b0431c for request ID 
554a65bc-e0bd-486d-abfa-ed3a3ac75af1.
07 May 2019 14:03:10,974 [INFO] (StreamThread-18) 
org.apache.kafka.streams.processor.internals.ProcessorNode: 
KSTREAM-TRANSFORMVALUES-NODE1 processing key 
!b5bc5c31-b676-483f-a0d3-4eeab7b0431c for request ID 
554a65bc-e0bd-486d-abfa-ed3a3ac75af1.
07 May 2019 14:03:10,978 [INFO] 554a65bc-e0bd-486d-abfa-ed3a3ac75af1 10 
(StreamThread-15) Aggregator: Processed '2' of '3' messages for 
'c72d609c-2d8d-420f-99d0-b11593e32c981466642757'.
07 May 2019 14:03:10,981 [INFO] (StreamThread-18) 
org.apache.kafka.streams.processor.internals.ProcessorNode: 
KSTREAM-FILTER-NODE1 processing key !b5bc5c31-b676-483f-a0d3-4eeab7b0431c for 
request ID 554a65bc-e0bd-486d-abfa-ed3a3ac75af1.
{noformat}
Record 3 (of 3) is aggregated, the completed sub-aggregation is deleted from 
the local state store, and the non-null result is not filtered out by 
KSTREAM-FILTER-NODE1 since the aggregation is complete:
{noformat}
07 May 2019 14:03:10,996 [INFO] (StreamThread-18) 
org.apache.kafka.streams.processor.internals.StreamThread: stream-thread 
[StreamThread-18] Polled record for topic TOPIC-T1 and partition 10 with key 
!afcb3cb6-edf2-4f44-8f0a-2a8a7a408252 and offset 7 for request ID 
554a65bc-e0bd-486d-abfa-ed3a3ac75af1.
07 May 2019 14:03:10,996 [INFO] (StreamThread-18) 
org.apache.kafka.streams.processor.internals.StreamTask: task [1_10] Adding 
record to task 1_10 for topic TOPIC-T1 and partition 10 with key 
!afcb3cb6-edf2-4f44-8f0a-2a8a7a408252 and offset 7 for request ID 
554a65bc-e0bd-486d-abfa-ed3a3ac75af1.
07 May 2019 14:03:10,997 [INFO] (StreamThread-18) 
org.apache.kafka.streams.processor.internals.SourceNode: KSTREAM-SOURCE-NODE1 
consuming key !afcb3cb6-edf2-4f44-8f0a-2a8a7a408252 for request ID 
554a65bc-e0bd-486d-abfa-ed3a3ac75af1.
07 May 2019 14:03:10,999 [INFO] (StreamThread-18) 
org.apache.kafka.streams.processor.internals.ProcessorNode: 
KSTREAM-TRANSFORMVALUES-NODE1 processing key 
!afcb3cb6-edf2-4f44-8f0a-2a8a7a408252 for request ID 
554a65bc-e0bd-486d-abfa-ed3a3ac75af1.
07 May 2019 14:03:11,001 [INFO] 554a65bc-e0bd-486d-abfa-ed3a3ac75af1 10 
(StreamThread-15) Aggregator: Processed '3' of '3' messages for 
'c72d609c-2d8d-420f-99d0-b11593e32c981466642757'.
07 May 2019 14:03:11,004 [INFO] (StreamThread-18) 
org.apache.kafka.streams.processor.internals.ProcessorNode: 
KSTREAM-FILTER-NODE1 processing key !afcb3cb6-edf2-4f44-8f0a-2a8a7a408252 for 
request ID 554a65bc-e0bd-486d-abfa-ed3a3ac75af1.
{noformat}
The record is written to topic T2:
{noformat}
07 May 2019 14:03:11,005 [INFO] (StreamThread-18) 
org.apache.kafka.streams.processor.internals.SinkNode: KSTREAM-SINK-NODE1 
writing key #data#foobar# to topic TOPIC-T2 for request ID 
554a65bc-e0bd-486d-abfa-ed3a3ac75af1.
{noformat}
StreamThread-18 commits offsets for task 1_10 before the production above fails:
{noformat}
07 May 2019 14:03:12,177 [DEBUG] (StreamThread-18) 
org.apache.kafka.streams.processor.internals.StreamTask: task [1_10] Committing
07 May 2019 14:03:12,177 [DEBUG] (StreamThread-18) 
org.apache.kafka.streams.processor.internals.ProcessorStateManager: task [1_10] 
Flushing all stores registered in the state manager
07 May 2019 14:03:12,181 [DEBUG] (StreamThread-18) 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl: task [1_10] 
Flushing producer
{noformat}
Production fails:
{noformat}
07 May 2019 14:03:22,184 [ERROR] (kafka-producer-network-thread | 
StreamThread-18-producer) CustomLogAndFailExceptionHandler: [FATAL] Production 
of message failed. Record:Record(topic=TOPIC-T2, partition=null, 
headers=[RecordHeader(key=RequestId, 
value=554a65bc-e0bd-486d-abfa-ed3a3ac75af1)], key=#data#foobar#, value={...}, 
timestamp=1557237790079) org.apache.kafka.common.errors.NetworkException: The 
server disconnected before a response was received.
07 May 2019 14:03:22,185 [ERROR] (kafka-producer-network-thread | 
StreamThread-18-producer) 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl: task [1_10] 
Error sending record (key #data#foobar# value java.nio.HeapByteBuffer[pos=0 
lim=170 cap=170] timestamp 1557237790079) to topic TOPIC-T2 due to 
org.apache.kafka.common.errors.NetworkException: The server disconnected before 
a response was received.; No more records will be sent and no more offsets will 
be recorded for this task.
07 May 2019 14:03:23,189 [ERROR] (StreamThread-18) 
org.apache.kafka.streams.processor.internals.AssignedStreamsTasks: 
stream-thread [StreamThread-18] Failed to commit stream task 1_10 due to the 
following error:org.apache.kafka.streams.errors.StreamsException: task [1_10] 
Abort sending since an error caught with a previous record (key #data#foobar# 
value java.nio.HeapByteBuffer[pos=0 lim=170 cap=170] timestamp 1557237790079) 
to topic TOPIC-T2 due to org.apache.kafka.common.errors.NetworkException: The 
server disconnected before a response was received.
| You can increase producer parameter `retries` and `retry.backoff.ms` to avoid 
this error.
| at 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:133)
| at 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.access$500(RecordCollectorImpl.java:50)
| at 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:192)
| at 
org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1235)
| at 
org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:204)
| at 
org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:187)
| at 
org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:635)
| at 
org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:604)
| at 
org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:561)
| at 
org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:485)
| at 
org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:74)
| at 
org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:700)
| at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
| at 
org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:532)
| at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:524)
| at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
| at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
| at java.lang.Thread.run(Thread.java:748)
| Caused by: org.apache.kafka.common.errors.NetworkException: The server 
disconnected before a response was received.
{noformat}
StreamThread-18 shuts itself down due to the CustomLogAndFailExceptionHandler 
returning FAIL:
{noformat}
07 May 2019 14:03:23,237 [INFO] (StreamThread-18) 
org.apache.kafka.streams.processor.internals.StreamThread: stream-thread 
[StreamThread-18] Shutting down
07 May 2019 14:03:23,237 [DEBUG] (StreamThread-18) 
org.apache.kafka.streams.processor.internals.TaskManager: stream-thread 
[StreamThread-18] Shutting down all active tasks [1_10, 2_10], standby tasks 
[], suspended tasks [], and suspended standby tasks []
07 May 2019 14:03:23,302 [INFO] (StreamThread-18) 
org.apache.kafka.streams.processor.internals.StreamThread: stream-thread 
[StreamThread-18] Shutdown complete
{noformat}
StreamThread-15 picks up task 1_10:
{noformat}
07 May 2019 14:03:39,916 [DEBUG] (StreamThread-15) 
org.apache.kafka.streams.processor.internals.TaskManager: stream-thread 
[StreamThread-15] Adding assigned tasks as active: {1_10=[TOPIC-T1-10]}
07 May 2019 14:03:40,302 [DEBUG] (StreamThread-15) 
org.apache.kafka.streams.processor.internals.AssignedStreamsTasks: 
stream-thread [StreamThread-15] transitioning stream task 1_10 to running
{noformat}
StreamThread-15 starts consuming at offset 7 rather than at offset 5, where I 
had _incorrectly_ assumed it would resume:
{noformat}
07 May 2019 14:03:40,407 [INFO] (StreamThread-15) 
org.apache.kafka.streams.processor.internals.StreamThread: stream-thread 
[StreamThread-15] Polled record for topic TOPIC-T1 and partition 10 with key 
!afcb3cb6-edf2-4f44-8f0a-2a8a7a408252 and offset 7 for request ID 
554a65bc-e0bd-486d-abfa-ed3a3ac75af1.
07 May 2019 14:03:40,407 [INFO] (StreamThread-15) 
org.apache.kafka.streams.processor.internals.StreamTask: task [1_10] Adding 
record to task 1_10 for topic TOPIC-T1 and partition 10 with key 
!afcb3cb6-edf2-4f44-8f0a-2a8a7a408252 and offset 7 for request ID 
554a65bc-e0bd-486d-abfa-ed3a3ac75af1.
07 May 2019 14:03:40,407 [INFO] (StreamThread-15) 
org.apache.kafka.streams.processor.internals.SourceNode: KSTREAM-SOURCE-NODE1 
consuming key !afcb3cb6-edf2-4f44-8f0a-2a8a7a408252 for request ID 
554a65bc-e0bd-486d-abfa-ed3a3ac75af1.
07 May 2019 14:03:40,407 [INFO] (StreamThread-15) 
org.apache.kafka.streams.processor.internals.ProcessorNode: 
KSTREAM-TRANSFORMVALUES-NODE1 processing key 
!afcb3cb6-edf2-4f44-8f0a-2a8a7a408252 for request ID 
554a65bc-e0bd-486d-abfa-ed3a3ac75af1.
{noformat}
Since our logic wipes the sub-aggregation from the local state store once all 
expected records have been processed, the aggregation can never complete when 
processing restarts from the last record of the aggregation (offset 7):
{noformat}
07 May 2019 14:03:40,408 [INFO] 554a65bc-e0bd-486d-abfa-ed3a3ac75af1 10 
(StreamThread-15) Aggregator: Processed '1' of '3' messages for 
'c72d609c-2d8d-420f-99d0-b11593e32c981466642757'.
07 May 2019 14:03:40,737 [INFO] (StreamThread-15) 
org.apache.kafka.streams.processor.internals.ProcessorNode: 
KSTREAM-FILTER-NODE1 processing key !afcb3cb6-edf2-4f44-8f0a-2a8a7a408252 for 
request ID 554a65bc-e0bd-486d-abfa-ed3a3ac75af1.
{noformat}
It seems that session windows may be more appropriate than the custom 
aggregation we've built on top of the local state store.
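
If we switch, something like the following sketch (against the 2.0 API; the 
re-keying, gap, serdes, and string-concatenation aggregate are all 
placeholders) could replace the transformer and the manual state-store 
bookkeeping:
{code:java}
import java.util.concurrent.TimeUnit;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.SessionWindows;

public class SessionWindowSketch {
    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<String, String> records =
                builder.stream("TOPIC-T1", Consumed.with(Serdes.String(), Serdes.String()));

        records
                // placeholder: re-key each record by its request ID
                .groupBy((key, value) -> value, Serialized.with(Serdes.String(), Serdes.String()))
                // close a session after 30s of inactivity per key (placeholder gap)
                .windowedBy(SessionWindows.with(TimeUnit.SECONDS.toMillis(30)))
                .aggregate(
                        () -> "",                           // initializer
                        (key, value, agg) -> agg + value,   // add a record to the aggregate
                        (key, agg1, agg2) -> agg1 + agg2,   // merge adjacent sessions
                        Materialized.with(Serdes.String(), Serdes.String()))
                .toStream()
                .map((windowedKey, agg) -> KeyValue.pair(windowedKey.key(), agg))
                .to("TOPIC-T2", Produced.with(Serdes.String(), Serdes.String()));
    }
}
{code}
Sessions emit an updated aggregate on each record, so deciding when an 
aggregation is "complete" would still need handling downstream; but nothing is 
deleted until the session retention expires, so re-consumed records can always 
rebuild the aggregate.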

> At-least-once delivery guarantee seemingly not met due to async commit / 
> produce failure race condition
> -------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-8339
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8339
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.0.1
>            Reporter: tdp
>            Priority: Major
>
> We have hit a race condition several times now between the StreamThread 
> committing its offsets for a task before the task has fully processed the 
> record through the topology.
>  
> Consider part of a topology that looks like this:
>  
> TOPIC T1 -> KSTREAM-SOURCE-NODE1 -> KSTREAM-TRANSFORMVALUES-NODE1 -> 
> KSTREAM-FILTER-NODE1 -> KSTREAM-MAPVALUES-NODE1 -> KSTREAM-SINK-NODE1 -> 
> TOPIC T2
>  
> Records are committed to topic T1. KSTREAM-SOURCE-NODE1 consumes these 
> records from topic T1. KSTREAM-TRANSFORMVALUES-NODE1 aggregates these records 
> using a local state store. KSTREAM-TRANSFORMVALUES-NODE1 returns null if not 
> all necessary records from topic T1 have been consumed yet or an object 
> representing an aggregation of records if all necessary records from topic T1 
> have been consumed. KSTREAM-FILTER-NODE1 then filters out anything that is 
> null. Only an aggregation of records is passed to the KSTREAM-MAPVALUES-NODE1 
> node. KSTREAM-MAPVALUES-NODE1 then maps the aggregation of records into 
> another object type. KSTREAM-SINK-NODE1 then attempts to produce this other 
> object to topic T2.
>  
> The race condition occurs when the stream thread commits its offsets for 
> topic T1 after it consumes some or all of the necessary records from topic T1 
> for an aggregation but before it gets the failure response back from the 
> async produce kicked off by KSTREAM-SINK-NODE1.
>  
> We are running with a LogAndFailExceptionHandler, so when the stream thread 
> tries to commit the next time it fails and the stream thread shuts itself 
> down. The stream task is then reassigned to another stream thread, which 
> reads the offsets previously committed by the original stream thread. That 
> means the new stream thread's KSTREAM-SOURCE-NODE1 will never be able to 
> consume the messages required for the aggregation and the KSTREAM-SINK-NODE1 
> will never end up producing the required records to topic T2. This is why it 
> seems the at-least-once delivery guarantee is not met - KSTREAM-SINK-NODE1 
> never successfully processed records and the stream application continued on 
> past it.
> Note: we are running with StreamsConfig.RETRIES_CONFIG set to 10, which 
> increases the likelihood of hitting this issue when all retries fail, since 
> it widens the window during which the async offset commit can occur before 
> the produce request is marked as failed.



