[ https://issues.apache.org/jira/browse/KAFKA-10148?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Bruno Cadonna updated KAFKA-10148:
----------------------------------
Description:
System test {{StreamsEosTest.test_failure_and_recovery}} for eos-beta exposes a bug that produces wrong results in the output topic. The cause seems to be an end offset that is too low during the restoration of a state store.

Example:

The system test computes a minimum aggregate over the records in an input topic and writes the results to an output topic. Among others, the input topic partition {{data-1}} contains the following records:
{code}
...
offset: 125 CreateTime: 1591690264681 keysize: 5 valuesize: 4 sequence: 125 headerKeys: [] key: 14920 payload: 9215
...
offset: 1611 CreateTime: 1591690297424 keysize: 5 valuesize: 4 sequence: 1611 headerKeys: [] key: 14920 payload: 1595
...
offset: 2104 CreateTime: 1591690308542 keysize: 5 valuesize: 4 sequence: 2104 headerKeys: [] key: 14920 payload: 9274
...
{code}
The output topic partition {{min-1}} contains:
{code}
...
offset: 125 CreateTime: 1591690264681 keysize: 5 valuesize: 4 sequence: 125 headerKeys: [] key: 14920 payload: 9215
...
offset: 1828 CreateTime: 1591690297424 keysize: 5 valuesize: 4 sequence: 1213 headerKeys: [] key: 14920 payload: 1595
...
offset: 2324 CreateTime: 1591690308542 keysize: 5 valuesize: 4 sequence: 10 headerKeys: [] key: 14920 payload: 9215
...
{code}
The last record is wrong: the minimum for key 14920 had already dropped to 1595, so 9215 must not be emitted again.

To test resilience to an unexpected failure of a Streams client, the system test aborts a Streams client, i.e., the client is closed in a dirty manner. This dirty close causes the Streams client to restore the local state store that maintains the minimum aggregate, replaying the changelog topic partition {{EosTest-KSTREAM-AGGREGATE-STATE-STORE-0000000003-changelog-1}} from the beginning. This partition contains:
{code}
...
offset: 125 CreateTime: 1591690264681 keysize: 5 valuesize: 4 sequence: 125 headerKeys: [] key: 14920 payload: 9215
...
offset: 1828 CreateTime: 1591690297424 keysize: 5 valuesize: 4 sequence: 1213 headerKeys: [] key: 14920 payload: 1595
...
offset: 2324 CreateTime: 1591690308542 keysize: 5 valuesize: 4 sequence: 10 headerKeys: [] key: 14920 payload: 9215
...
{code}
Here, too, the last record is wrong.

During the restoration, the Streams client uses its restore consumer to issue a list offsets request that fetches the end offset of the changelog topic partition. As the following log line shows, the response contains end offset 1518 for {{EosTest-KSTREAM-AGGREGATE-STATE-STORE-0000000003-changelog-1}}:
{code}
[2020-06-09 08:11:49,250] DEBUG [Consumer clientId=EosTest-12216046-2d5d-48cd-864c-21b0aa570fae-StreamThread-1-restore-consumer, groupId=null] Received LIST_OFFSETS response from node 2 for request with header RequestHeader(apiKey=LIST_OFFSETS, apiVersion=5, clientId=EosTest-12216046-2d5d-48cd-864c-21b0aa570fae-StreamThread-1-restore-consumer, correlationId=3): (type=ListOffsetResponse, throttleTimeMs=0, responseData={EosTest-KSTREAM-AGGREGATE-STATE-STORE-0000000003-changelog-4=PartitionData(errorCode: 0, timestamp: -1, offset: 1478, leaderEpoch: Optional[0]), EosTest-KSTREAM-AGGREGATE-STATE-STORE-0000000003-changelog-1=PartitionData(errorCode: 0, timestamp: -1, offset: 1518, leaderEpoch: Optional[0])}) (org.apache.kafka.clients.NetworkClient)
{code}
End offset 1518 lies before the following record in {{EosTest-KSTREAM-AGGREGATE-STATE-STORE-0000000003-changelog-1}}:
{code}
offset: 1828 CreateTime: 1591690297424 keysize: 5 valuesize: 4 sequence: 1213 headerKeys: [] key: 14920 payload: 1595
{code}
Hence, this record is not restored into the local state store. However, after the restoration, the input topic partition {{data-1}} is read starting at offset 2094. That means that the record
{code}
offset: 1611 CreateTime: 1591690297424 keysize: 5 valuesize: 4 sequence: 1611 headerKeys: [] key: 14920 payload: 1595
{code}
is not read there either, because its offset is lower than 2094.
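The sequence of events described in this issue can be reproduced in miniature. The following Python sketch is a simplified model, not Kafka Streams code. It assumes that each partition can be represented as a list of (offset, key, value) tuples for key 14920 only, that restoration replays changelog records only while their offset is below the end offset returned by the list offsets request, and that processing resumes at the committed input offset 2094. The functions `restore` and `process`, as well as the end offset 1829 used for the "correct" run, are hypothetical and chosen only for illustration.

```python
from typing import Dict, List, Tuple

Record = Tuple[int, int, int]  # (offset, key, value)

# Records for key 14920 taken from the issue; all other keys, transaction
# markers and intermediate records are deliberately left out of this model.
CHANGELOG: List[Record] = [(125, 14920, 9215), (1828, 14920, 1595)]
INPUT: List[Record] = [(125, 14920, 9215), (1611, 14920, 1595), (2104, 14920, 9274)]


def restore(changelog: List[Record], end_offset: int) -> Dict[int, int]:
    """Hypothetical restore: replay changelog records with offset < end_offset into an empty store."""
    store: Dict[int, int] = {}
    for offset, key, value in changelog:
        if offset >= end_offset:
            break  # restoration treats the partition as fully read at end_offset
        store[key] = value  # the changelog holds the latest aggregate per key
    return store


def process(store: Dict[int, int], records: List[Record], start_offset: int) -> List[Tuple[int, int]]:
    """Hypothetical processing: resume the min aggregation at start_offset, return emitted (key, min) pairs."""
    emitted: List[Tuple[int, int]] = []
    for offset, key, value in records:
        if offset < start_offset:
            continue  # already covered by the committed input offset
        new_min = min(store.get(key, value), value)
        store[key] = new_min
        emitted.append((key, new_min))
    return emitted


# End offset 1518 as reported in the LIST_OFFSETS response ...
buggy = process(restore(CHANGELOG, end_offset=1518), INPUT, start_offset=2094)
# ... versus a hypothetical end offset that lies past changelog offset 1828.
correct = process(restore(CHANGELOG, end_offset=1829), INPUT, start_offset=2094)
print(buggy)    # [(14920, 9215)]
print(correct)  # [(14920, 1595)]
```

In this model, the only difference between the two runs is the end offset used to stop restoration; the committed input offset is the same, so the record at input offset 1611 is skipped in both runs and only the restored state decides whether 9215 or 1595 is emitted.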
Instead, the record at offset 2104 with key 14920 and value 9274 is read. Since 9274 is not less than the restored minimum 9215, value 9215 is written to the output topic a second time.

I ran the system tests 10x with eos_alpha and 10x with eos_beta, and only eos_beta failed (a couple of times).

> Kafka Streams Restores too few Records with eos-beta Enabled
> -------------------------------------------------------------
>
>                 Key: KAFKA-10148
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10148
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Bruno Cadonna
>            Assignee: Bruno Cadonna
>            Priority: Blocker
>             Fix For: 2.6.0

--
This message was sent by Atlassian Jira
(v8.3.4#803005)