[ https://issues.apache.org/jira/browse/KAFKA-8339?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
tdp resolved KAFKA-8339.
------------------------
    Resolution: Not A Problem

Marking as resolved. This is working as expected from the Streams side; the bug is in user-specific aggregation logic.

> At-least-once delivery guarantee seemingly not met due to async commit / produce failure race condition
> -------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-8339
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8339
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.0.1
>            Reporter: tdp
>            Priority: Major
>
> We have now hit, several times, a race condition in which the StreamThread
> commits its offsets for a task before the task has fully processed the
> record through the topology.
>
> Consider part of a topology that looks like this:
>
> TOPIC T1 -> KSTREAM-SOURCE-NODE1 -> KSTREAM-TRANSFORMVALUES-NODE1 ->
> KSTREAM-FILTER-NODE1 -> KSTREAM-MAPVALUES-NODE1 -> KSTREAM-SINK-NODE1 ->
> TOPIC T2
>
> Records are written to topic T1. KSTREAM-SOURCE-NODE1 consumes these
> records from topic T1. KSTREAM-TRANSFORMVALUES-NODE1 aggregates them using
> a local state store: it returns null if not all of the necessary records
> from topic T1 have been consumed yet, or an object representing the
> aggregation once they all have. KSTREAM-FILTER-NODE1 then filters out
> anything that is null, so only an aggregation of records is passed to
> KSTREAM-MAPVALUES-NODE1, which maps it into another object type.
> KSTREAM-SINK-NODE1 then attempts to produce this other object to topic T2.
>
> The race condition occurs when the stream thread commits its offsets for
> topic T1 after it has consumed some or all of the records needed for an
> aggregation, but before it receives the failure response from the async
> produce kicked off by KSTREAM-SINK-NODE1.
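For reference, the ordering that at-least-once processing relies on can be modelled in a few lines: every send triggered by earlier input must be acknowledged before the input offset is committed, and a failed send must abort the commit. This is a toy sketch in plain Java, not Kafka Streams code; `CommitOrdering`, `recordProcessed` and `send` are hypothetical names, and a `CompletableFuture` stands in for the producer's async acknowledgement. Our understanding is that Kafka Streams follows the same rule by flushing its producer before committing offsets, which fits the resolution above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical toy model of flush-before-commit; not Kafka Streams internals.
public class CommitOrdering {
    private final List<CompletableFuture<Void>> pendingSends = new ArrayList<>();
    private long nextOffsetToRead = 0; // position after the last processed T1 record
    long committedOffset = 0;          // offset a replacement thread would resume from

    // Called once a T1 record has passed through the topology (even if filtered out).
    void recordProcessed(long offset) {
        nextOffsetToRead = offset + 1;
    }

    // Simulates the sink's async produce; the returned future plays the role of the ack.
    CompletableFuture<Void> send() {
        CompletableFuture<Void> ack = new CompletableFuture<>();
        pendingSends.add(ack);
        return ack;
    }

    // Waits for all outstanding sends, then commits; a failed send aborts the commit.
    void commit() {
        try {
            CompletableFuture.allOf(pendingSends.toArray(new CompletableFuture<?>[0])).join();
        } catch (CompletionException e) {
            // Keep the old committed offset so the input is re-read after a restart.
            throw new IllegalStateException("produce failed, offsets not committed", e);
        } finally {
            pendingSends.clear();
        }
        committedOffset = nextOffsetToRead; // Kafka convention: commit the next offset to read
    }

    public static void main(String[] args) {
        CommitOrdering task = new CommitOrdering();
        task.recordProcessed(0);    // partial record: filtered out, nothing sent
        task.recordProcessed(1);    // completes an aggregation...
        task.send().complete(null); // ...and its produce is acknowledged
        task.commit();
        System.out.println(task.committedOffset); // 2

        task.recordProcessed(2);
        task.send().completeExceptionally(new RuntimeException("all retries failed"));
        try {
            task.commit();
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
        System.out.println(task.committedOffset); // still 2
    }
}
```

Running `main` prints `2`, the failure message, and `2` again: the failed send leaves the committed position unchanged, so a replacement thread would re-read offset 2 instead of skipping past it.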
>
> We are running with a LogAndFailExceptionHandler, so the next time the
> stream thread tries to commit, the commit fails and the thread shuts itself
> down. The stream task is then reassigned to another stream thread, which
> reads the offsets previously committed by the original thread. As a result,
> the new thread's KSTREAM-SOURCE-NODE1 is never able to consume the messages
> required for the aggregation, and KSTREAM-SINK-NODE1 never produces the
> required records to topic T2. This is why the at-least-once delivery
> guarantee seems not to be met: KSTREAM-SINK-NODE1 never successfully
> processed the records, yet the stream application continued past them.
>
> Note: we are running with StreamsConfig.RETRIES_CONFIG set to 10. When all
> retries fail, this makes the issue more likely, because it widens the window
> in which the async offset commit can occur before the produce request is
> marked as failed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
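The aggregation chain described in the report (KSTREAM-TRANSFORMVALUES-NODE1 returning null until all required T1 records have arrived, then the null filter, the mapValues step and the sink) can be sketched as an in-memory pipeline. This is a hypothetical illustration in plain Java rather than the reporter's code or the Kafka Streams DSL: `AggregationPipeline` and `REQUIRED_PARTS` are assumptions, a `HashMap` stands in for the local state store, and a `List` stands in for topic T2.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of T1 -> transformValues -> filter -> mapValues -> T2.
public class AggregationPipeline {
    // Assumption: number of T1 records needed before an aggregation is complete.
    static final int REQUIRED_PARTS = 3;

    // Stand-in for the local state store used by KSTREAM-TRANSFORMVALUES-NODE1.
    private final Map<String, List<String>> store = new HashMap<>();
    // Stand-in for topic T2.
    final List<String> sinkTopic = new ArrayList<>();

    // transformValues: returns null until all parts for the key have arrived.
    List<String> transformValues(String key, String value) {
        List<String> parts = store.computeIfAbsent(key, k -> new ArrayList<>());
        parts.add(value);
        if (parts.size() < REQUIRED_PARTS) {
            return null;
        }
        store.remove(key); // aggregation complete: drop the partial state
        return parts;
    }

    // mapValues: converts the aggregation into another type (here, a String).
    String mapValues(List<String> aggregate) {
        return String.join("+", aggregate);
    }

    // Runs one T1 record through the chain.
    void process(String key, String value) {
        List<String> aggregate = transformValues(key, value);
        if (aggregate == null) {
            return; // KSTREAM-FILTER-NODE1 drops null results
        }
        sinkTopic.add(key + "=" + mapValues(aggregate)); // KSTREAM-SINK-NODE1
    }

    public static void main(String[] args) {
        AggregationPipeline p = new AggregationPipeline();
        p.process("order-1", "a");
        p.process("order-1", "b");
        p.process("order-2", "x");
        p.process("order-1", "c");
        System.out.println(p.sinkTopic); // [order-1=a+b+c]
    }
}
```

Running `main` prints `[order-1=a+b+c]`: the three `order-1` records produce a single output, while the lone `order-2` record is still held in the store waiting for its remaining parts.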