[ https://issues.apache.org/jira/browse/KAFKA-10148?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Bruno Cadonna updated KAFKA-10148:
----------------------------------
    Description: 
System test {{StreamsEosTest.test_failure_and_recovery}} for eos-beta exposes a 
bug that produces incorrect records in the output topic. The cause seems to be 
an end offset that is too low during the restoration of a state store.

Example:

The system test computes a minimum aggregate over records in an input topic and 
writes the results to an output topic. The input topic partition {{data-1}} 
contains the following records among others:

{code}
...
offset: 125 CreateTime: 1591690264681 keysize: 5 valuesize: 4 sequence: 125 
headerKeys: [] key: 14920 payload: 9215
...
offset: 1611 CreateTime: 1591690297424 keysize: 5 valuesize: 4 sequence: 1611 
headerKeys: [] key: 14920 payload: 1595
...
offset: 2104 CreateTime: 1591690308542 keysize: 5 valuesize: 4 sequence: 2104 
headerKeys: [] key: 14920 payload: 9274
...
{code}

The output topic partition {{min-1}} contains:

{code}
...
offset: 125 CreateTime: 1591690264681 keysize: 5 valuesize: 4 sequence: 125 
headerKeys: [] key: 14920 payload: 9215
...
offset: 1828 CreateTime: 1591690297424 keysize: 5 valuesize: 4 sequence: 1213 
headerKeys: [] key: 14920 payload: 1595
...
offset: 2324 CreateTime: 1591690308542 keysize: 5 valuesize: 4 sequence: 10 
headerKeys: [] key: 14920 payload: 9215
...
{code} 

The last record is obviously wrong: the minimum for key 14920 was already 1595, 
and 1595 is less than 9215.
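To make the expected output concrete, the following sketch models a per-key 
minimum over the three input records for key 14920 listed above. It is a 
simplified model, not the test's or Kafka Streams' code: it assumes that every 
input record produces exactly one output update (no caching or suppression), and 
the name {{running_min}} is hypothetical.

{code:python}
# Hypothetical model of the test's minimum aggregation (not Kafka Streams code).
# Input records for key 14920 in data-1 as (offset, key, value), from the dump above.
INPUT = [(125, 14920, 9215), (1611, 14920, 1595), (2104, 14920, 9274)]


def running_min(records):
    """Return the (key, minimum) update emitted after each record, in offset order.

    Assumes one output update per input record (no caching or suppression).
    """
    state = {}
    updates = []
    for _offset, key, value in records:
        current = state.get(key)
        state[key] = value if current is None else min(current, value)
        updates.append((key, state[key]))
    return updates


print(running_min(INPUT))  # [(14920, 9215), (14920, 1595), (14920, 1595)]
{code}

Under that assumption, the third update for key 14920 should carry 1595, not 9215.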

To test the resilience to an unexpected failure of a Streams client, the system 
test aborts a Streams client, i.e., the client is closed in a dirty manner. 
This dirty close causes the Streams client to restore its local state store 
that maintains the minimum aggregate from the beginning of the changelog topic 
partition {{EosTest-KSTREAM-AGGREGATE-STATE-STORE-0000000003-changelog-1}}. 
The partition {{EosTest-KSTREAM-AGGREGATE-STATE-STORE-0000000003-changelog-1}} 
contains:

{code}
...
offset: 125 CreateTime: 1591690264681 keysize: 5 valuesize: 4 sequence: 125 
headerKeys: [] key: 14920 payload: 9215
...
offset: 1828 CreateTime: 1591690297424 keysize: 5 valuesize: 4 sequence: 1213 
headerKeys: [] key: 14920 payload: 1595
...
offset: 2324 CreateTime: 1591690308542 keysize: 5 valuesize: 4 sequence: 10 
headerKeys: [] key: 14920 payload: 9215
...
{code}

Here, too, the last record is wrong.

During the restoration, the Streams client uses its restore consumer to issue a 
list offsets request to get the end offset of the changelog topic partition. 
The response to the list offsets request contains end offset 1518 for 
{{EosTest-KSTREAM-AGGREGATE-STATE-STORE-0000000003-changelog-1}}, as the 
following log entry shows:

{code}
[2020-06-09 08:11:49,250] DEBUG [Consumer 
clientId=EosTest-12216046-2d5d-48cd-864c-21b0aa570fae-StreamThread-1-restore-consumer,
 groupId=null] Received LIST_OFFSETS response from node 2 for request with 
header RequestHeader(apiKey=LIST_OFFSETS, apiVersion=5, 
clientId=EosTest-12216046-2d5d-48cd-864c-21b0aa570fae-StreamThread-1-restore-consumer,
 correlationId=3): (type=ListOffsetResponse, throttleTimeMs=0, 
responseData={EosTest-KSTREAM-AGGREGATE-STATE-STORE-0000000003-changelog-4=PartitionData(errorCode:
 0, timestamp: -1, offset: 1478, leaderEpoch: Optional[0]), 
EosTest-KSTREAM-AGGREGATE-STATE-STORE-0000000003-changelog-1=PartitionData(errorCode:
 0, timestamp: -1, offset: 1518, leaderEpoch: Optional[0])}) 
(org.apache.kafka.clients.NetworkClient)
{code}
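The restoration step can be modeled as follows. This is a simplified sketch, not 
the Streams restoration code: it assumes that restoration replays changelog 
records whose offsets are strictly below the returned end offset (the end offset 
being the offset of the next record, so it is exclusive), and that each changelog 
record holds the latest aggregate for its key. The changelog record at offset 2324 
is left out because it was written only after the restoration, as a consequence 
of the wrong state. The names {{restore}} and {{CHANGELOG}} are hypothetical.

{code:python}
# Hypothetical model of state-store restoration (not the Streams restoration code).
# Changelog records for key 14920 in changelog-1 as (offset, key, value), from the dump above.
# The record at offset 2324 is omitted: it was written only after the restoration.
CHANGELOG = [(125, 14920, 9215), (1828, 14920, 1595)]


def restore(changelog, end_offset):
    """Replay changelog records with offset < end_offset into a fresh store.

    Assumes the end offset is exclusive (the offset of the next record) and that
    later changelog records for a key overwrite earlier ones.
    """
    store = {}
    for offset, key, value in changelog:
        if offset >= end_offset:
            break  # records are in offset order, so no later record qualifies
        store[key] = value
    return store


print(restore(CHANGELOG, 1518))  # {14920: 9215}, the 1595 update at offset 1828 is lost
print(restore(CHANGELOG, 1829))  # {14920: 1595}, any end offset above 1828 restores it
{code}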

Offset 1518 lies before the following record in 
{{EosTest-KSTREAM-AGGREGATE-STATE-STORE-0000000003-changelog-1}}:

{code}
offset: 1828 CreateTime: 1591690297424 keysize: 5 valuesize: 4 sequence: 1213 
headerKeys: [] key: 14920 payload: 1595
{code}

Hence, this record is not restored into the local state store. However, after 
the restoration, the input topic partition {{data-1}} is read starting from 
offset 2094. That means that the record

{code}
offset: 1611 CreateTime: 1591690297424 keysize: 5 valuesize: 4 sequence: 1611 
headerKeys: [] key: 14920 payload: 1595
{code} 

is not read from the input topic either, because its offset is lower than 2094. 
Instead, the next record with key 14920 and value 9274 (offset 2104) is read, but 
since 9274 is not less than 9215, the value 9215 is written a second time to the 
output topic.
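The effect on the output can be reproduced with a similar sketch that resumes 
processing of {{data-1}} at offset 2094 on top of the restored store. The function 
{{resume}} and its inputs are hypothetical and only model the aggregation logic 
described above; they are not Kafka Streams APIs. The first call uses the store as 
it was actually restored (9215); the second uses the state that a restoration 
including changelog offset 1828 would have produced (1595).

{code:python}
# Hypothetical model of processing resumed after restoration (not Kafka Streams code).
# Input records for key 14920 in data-1 as (offset, key, value), from the dump above.
INPUT = [(125, 14920, 9215), (1611, 14920, 1595), (2104, 14920, 9274)]


def resume(restored_store, records, start_offset):
    """Process records with offset >= start_offset on top of a restored store.

    Returns the (key, minimum) update emitted for each processed record.
    """
    store = dict(restored_store)  # copy so the caller's store is not mutated
    updates = []
    for offset, key, value in records:
        if offset < start_offset:
            continue  # covered by the committed input offset, so not read again
        current = store.get(key)
        store[key] = value if current is None else min(current, value)
        updates.append((key, store[key]))
    return updates


print(resume({14920: 9215}, INPUT, 2094))  # [(14920, 9215)], the wrong update at min-1 offset 2324
print(resume({14920: 1595}, INPUT, 2094))  # [(14920, 1595)], the expected update
{code}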

I ran the system test 10 times with eos_alpha and 10 times with eos_beta; only 
eos_beta failed, and it did so a couple of times.


> Kafka Streams Restores too few Records with eos-beta Enabled 
> -------------------------------------------------------------
>
>                 Key: KAFKA-10148
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10148
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Bruno Cadonna
>            Assignee: Bruno Cadonna
>            Priority: Blocker
>             Fix For: 2.6.0



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
