[ https://issues.apache.org/jira/browse/KAFKA-10148?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Bruno Cadonna updated KAFKA-10148:
----------------------------------
    Description: 
System test {{StreamsEosTest.test_failure_and_recovery}} for eos-beta exposes a 
bug that produces wrong results in the output topic. The cause seems to be an 
end offset that is too low during the restoration of a state store.

Example:

The system test computes a minimum aggregate over records in an input topic and 
writes the results to an output topic. The input topic partition {{data-1}} 
contains the following records among others:

{code}
...
offset: 125 CreateTime: 1591690264681 keysize: 5 valuesize: 4 sequence: 125 headerKeys: [] key: 14920 payload: 9215
...
offset: 1611 CreateTime: 1591690297424 keysize: 5 valuesize: 4 sequence: 1611 headerKeys: [] key: 14920 payload: 1595
...
offset: 2104 CreateTime: 1591690308542 keysize: 5 valuesize: 4 sequence: 2104 headerKeys: [] key: 14920 payload: 9274
...
{code}

The output topic partition {{min-1}} contains:

{code}
...
offset: 125 CreateTime: 1591690264681 keysize: 5 valuesize: 4 sequence: 125 headerKeys: [] key: 14920 payload: 9215
...
offset: 1828 CreateTime: 1591690297424 keysize: 5 valuesize: 4 sequence: 1213 headerKeys: [] key: 14920 payload: 1595
...
offset: 2324 CreateTime: 1591690308542 keysize: 5 valuesize: 4 sequence: 10 headerKeys: [] key: 14920 payload: 9215
...
{code} 

The last record is obviously wrong: since 1595 is less than 9215, the minimum 
for key 14920 should still be 1595.
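
The expected output can be reproduced with a minimal sketch, assuming a plain per-key minimum that emits the current minimum after every input record. The class {{MinAggregateSketch}} and its methods are hypothetical and only model the aggregation; they are not the system test's code. Fed with the three input records above and an empty store, it emits 9215, 1595, 1595, so the third output record should carry 1595.

{code:java}
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the min aggregate in the system test (not its actual code).
class MinAggregateSketch {

    // One record of input partition data-1.
    record InputRecord(long offset, String key, int payload) {}

    // Applies each input record to the store and collects the value emitted to the output topic.
    static List<Integer> aggregate(Map<String, Integer> store, List<InputRecord> input) {
        List<Integer> emitted = new ArrayList<>();
        for (InputRecord r : input) {
            // merge() keeps the smaller of the stored minimum and the new payload
            int min = store.merge(r.key(), r.payload(), Math::min);
            emitted.add(min);
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<InputRecord> input = List.of(
                new InputRecord(125, "14920", 9215),
                new InputRecord(1611, "14920", 1595),
                new InputRecord(2104, "14920", 9274));
        System.out.println(aggregate(new HashMap<>(), input)); // [9215, 1595, 1595]
    }
}
{code}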

To test the resilience to an unexpected failure of a Streams client, the system 
test aborts a Streams client, i.e., the client is closed in a dirty manner. 
This dirty close causes the Streams client to restore its local state store, 
which maintains the minimum aggregate, from the beginning of the changelog topic 
partition {{EosTest-KSTREAM-AGGREGATE-STATE-STORE-0000000003-changelog-1}}. 
This partition contains:

{code}
...
offset: 125 CreateTime: 1591690264681 keysize: 5 valuesize: 4 sequence: 125 headerKeys: [] key: 14920 payload: 9215
...
offset: 1828 CreateTime: 1591690297424 keysize: 5 valuesize: 4 sequence: 1213 headerKeys: [] key: 14920 payload: 1595
...
offset: 2324 CreateTime: 1591690308542 keysize: 5 valuesize: 4 sequence: 10 headerKeys: [] key: 14920 payload: 9215
...
{code}

Here, too, the last record is wrong.

During the restoration, the Streams client uses its Kafka consumer to issue a 
list offsets request to get the end offset of the changelog topic partition. 
The response to the list offsets request contains the end offset 1518 for 
{{EosTest-KSTREAM-AGGREGATE-STATE-STORE-0000000003-changelog-1}}, as can be seen 
here:

{code}
[2020-06-09 08:11:49,250] DEBUG [Consumer 
clientId=EosTest-12216046-2d5d-48cd-864c-21b0aa570fae-StreamThread-1-restore-consumer,
 groupId=null] Received LIST_OFFSETS response from node 2 for request with 
header RequestHeader(apiKey=LIST_OFFSETS, apiVersion=5, 
clientId=EosTest-12216046-2d5d-48cd-864c-21b0aa570fae-StreamThread-1-restore-consumer,
 correlationId=3): (type=ListOffsetResponse, throttleTimeMs=0, 
responseData={EosTest-KSTREAM-AGGREGATE-STATE-STORE-0000000003-changelog-4=PartitionData(errorCode:
 0, timestamp: -1, offset: 1478, leaderEpoch: Optional[0]), 
EosTest-KSTREAM-AGGREGATE-STATE-STORE-0000000003-changelog-1=PartitionData(errorCode:
 0, timestamp: -1, offset: 1518, leaderEpoch: Optional[0])}) 
(org.apache.kafka.clients.NetworkClient)
{code}
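
For context, here is a simplified model of how the end offset is resolved. A consumer configured with {{isolation.level=read_committed}} receives the last stable offset (LSO) as the end offset, while a {{read_uncommitted}} consumer receives the high watermark. The restore consumer fetches with READ_COMMITTED, as the fetch log further down shows. {{EndOffsetSketch}} is hypothetical and is neither broker nor client code. The values 2313 and 1518 are the high watermark and the LSO reported for {{changelog-1}}.

{code:java}
// Hypothetical, simplified model of end-offset resolution (not Kafka broker or client code).
class EndOffsetSketch {

    enum IsolationLevel { READ_UNCOMMITTED, READ_COMMITTED }

    // A read_committed consumer may only see offsets below the last stable offset;
    // a read_uncommitted consumer sees everything below the high watermark.
    static long endOffset(long highWatermark, long lastStableOffset, IsolationLevel level) {
        return level == IsolationLevel.READ_COMMITTED ? lastStableOffset : highWatermark;
    }

    public static void main(String[] args) {
        long hw = 2313, lso = 1518; // values reported for changelog-1
        System.out.println(endOffset(hw, lso, IsolationLevel.READ_COMMITTED));   // 1518
        System.out.println(endOffset(hw, lso, IsolationLevel.READ_UNCOMMITTED)); // 2313
    }
}
{code}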

Offset 1518 is before the record

{code}
offset: 1828 CreateTime: 1591690297424 keysize: 5 valuesize: 4 sequence: 1213 headerKeys: [] key: 14920 payload: 1595
{code}

Consequently, this record is not restored into the local state store. However, 
after the restoration, the input topic partition {{data-1}} is read starting at 
offset 2094. That means that the record

{code}
offset: 1611 CreateTime: 1591690297424 keysize: 5 valuesize: 4 sequence: 1611 headerKeys: [] key: 14920 payload: 1595
{code} 

is not read there either, because it has a lower offset. Instead, the following 
record with key 14920 and value 9274 is read. Since 9274 is not less than 9215, 
value 9215 is written to the output topic a second time.
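
The sequence above can be replayed with a small simulation. It assumes that restoration replays every changelog record with an offset below the end offset, with the last value per key winning, and that processing then resumes with the input record at offset 2104. {{RestoreGapSketch}} is hypothetical. It only models the changelog records for key 14920 that existed at restoration time and is not Kafka Streams code. With end offset 1518, the record at offset 1828 is skipped and 9215 is emitted again. For comparison, an end offset of 2313 (the high watermark) would restore 1595 and emit 1595.

{code:java}
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical replay of the restore-then-process scenario (not Kafka Streams code).
class RestoreGapSketch {

    record Rec(long offset, String key, int value) {}

    // Changelog records of key 14920 that existed at restoration time (high watermark 2313).
    static final List<Rec> CHANGELOG = List.of(
            new Rec(125, "14920", 9215),
            new Rec(1828, "14920", 1595));

    // First input record of key 14920 read after restoration (input resumes at offset 2094).
    static final List<Rec> INPUT_AFTER_RESTORE = List.of(new Rec(2104, "14920", 9274));

    // Rebuilds the store from changelog records with offset < endOffset; last value per key wins.
    static Map<String, Integer> restore(List<Rec> changelog, long endOffset) {
        Map<String, Integer> store = new HashMap<>();
        for (Rec r : changelog) {
            if (r.offset() < endOffset) {
                store.put(r.key(), r.value());
            }
        }
        return store;
    }

    // Applies the min aggregate to the input records and returns the last emitted value.
    static int processAndEmit(Map<String, Integer> store, List<Rec> input) {
        int emitted = -1;
        for (Rec r : input) {
            emitted = store.merge(r.key(), r.value(), Math::min);
        }
        return emitted;
    }

    public static void main(String[] args) {
        // End offset 1518 (the LSO) misses offset 1828, so 9215 is emitted again.
        System.out.println(processAndEmit(restore(CHANGELOG, 1518), INPUT_AFTER_RESTORE)); // 9215
        // An end offset of 2313 (the high watermark) would include 1595.
        System.out.println(processAndEmit(restore(CHANGELOG, 2313), INPUT_AFTER_RESTORE)); // 1595
    }
}
{code}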

In the response to the first fetch request to restore the local state store, 
one can see that the high watermark and the last stable offset diverge:

{code}
[2020-06-09 08:11:49,338] DEBUG [Consumer 
clientId=EosTest-12216046-2d5d-48cd-864c-21b0aa570fae-StreamThread-1-restore-consumer,
 groupId=null] Fetch READ_COMMITTED at offset 0 for partition 
EosTest-KSTREAM-AGGREGATE-STATE-STORE-0000000003-changelog-1 returned fetch 
data (error=NONE, highWaterMark=2313, lastStableOffset = 1518, logStartOffset = 
0, preferredReadReplica = absent, abortedTransactions = [(producerId=1, 
firstOffset=708)], recordsSizeInBytes=45372) 
(org.apache.kafka.clients.consumer.internals.Fetcher)
{code}  

However, to ensure that everything in the changelog topic partition is written 
to the local state store during restoration, the high watermark and the last 
stable offset should not diverge.
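
A hypothetical diagnostic along these lines could flag the gap. It reports whether the last stable offset lags behind the high watermark and which changelog offsets fall into {{[lastStableOffset, highWatermark)}}, i.e., which offsets would not be restored if restoration stops at the LSO. {{LsoGapSketch}} is an illustration only, not part of Kafka Streams. It only uses the offsets of key 14920 that existed at restoration time.

{code:java}
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical diagnostic (not part of Kafka Streams): detects a gap between the
// last stable offset and the high watermark of a changelog partition.
class LsoGapSketch {

    static boolean diverges(long highWatermark, long lastStableOffset) {
        return lastStableOffset < highWatermark;
    }

    // Offsets that a restoration stopping at the LSO would not replay.
    static List<Long> skippedOffsets(List<Long> changelogOffsets, long highWatermark, long lastStableOffset) {
        return changelogOffsets.stream()
                .filter(o -> o >= lastStableOffset && o < highWatermark)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        long hw = 2313, lso = 1518; // from the fetch response for changelog-1
        List<Long> offsets = List.of(125L, 1828L);
        System.out.println(diverges(hw, lso));                // true
        System.out.println(skippedOffsets(offsets, hw, lso)); // [1828]
    }
}
{code}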

> Kafka Streams Restores too few Records with eos-beta Enabled 
> -------------------------------------------------------------
>
>                 Key: KAFKA-10148
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10148
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Bruno Cadonna
>            Assignee: Bruno Cadonna
>            Priority: Blocker
>             Fix For: 2.6.0
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
