John Roesler created KAFKA-13714:
------------------------------------

             Summary: Flaky test IQv2StoreIntegrationTest
                 Key: KAFKA-13714
                 URL: https://issues.apache.org/jira/browse/KAFKA-13714
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 3.2.0
            Reporter: John Roesler


I have observed multiple consistency violations in the 
IQv2StoreIntegrationTest. Since this is the first release of IQv2, and it's 
apparently a major flaw in the feature, we should not release with this bug 
outstanding. Depending on the time-table, we may want to block the release or 
pull the feature until the next release.

 

The first observation I have is from 23 Feb 2022. So far all observations point 
to the range query in particular, and all observations have been for RocksDB 
stores, including RocksDBStore, TimestampedRocksDBStore, and the windowed store 
built on RocksDB segments.

For reference, range queries were implemented on 16 Feb 2022: 
[https://github.com/apache/kafka/commit/b38f6ba5cc989702180f5d5f8e55ba20444ea884]

The window-specific range query test has also failed once that I have seen. 
That feature was implemented on 2 Jan 2022: 
[https://github.com/apache/kafka/commit/b8f1cf14c396ab04b8968a8fa04d8cf67dd3254c]

 

Here are some stack traces I have seen:
{code:java}
verifyStore[cache=true, log=true, supplier=ROCKS_KV, kind=PAPI]

java.lang.AssertionError: 
Expected: is <[1, 2, 3]>
     but: was <[1, 2]>
        at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
        at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6)
        at 
org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQuery(IQv2StoreIntegrationTest.java:1125)
        at 
org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQueries(IQv2StoreIntegrationTest.java:803)
        at 
org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.verifyStore(IQv2StoreIntegrationTest.java:776)
 {code}
{code:java}
verifyStore[cache=true, log=true, supplier=TIME_ROCKS_KV, kind=PAPI]

java.lang.AssertionError: 
Expected: is <[1, 2, 3]>
     but: was <[1, 3]>
        at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
        at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6)
        at 
org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQuery(IQv2StoreIntegrationTest.java:1131)
        at 
org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQueries(IQv2StoreIntegrationTest.java:809)
        at 
org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.verifyStore(IQv2StoreIntegrationTest.java:778)
         {code}
{code:java}
verifyStore[cache=true, log=true, supplier=ROCKS_KV, kind=PAPI]

java.lang.AssertionError: 
Result:StateQueryResult{partitionResults={0=SucceededQueryResult{result=org.apache.kafka.streams.state.internals.MeteredKeyValueStore$MeteredKeyValueTimestampedIterator@35025a0a,
 executionInfo=[], position=Position{position={input-topic={0=1}}}}, 
1=SucceededQueryResult{result=org.apache.kafka.streams.state.internals.MeteredKeyValueStore$MeteredKeyValueTimestampedIterator@38732364,
 executionInfo=[], position=Position{position={input-topic={1=1}}}}}, 
globalResult=null}
Expected: is <[1, 2, 3]>
     but: was <[1, 2]>
        at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
        at 
org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQuery(IQv2StoreIntegrationTest.java:1129)
        at 
org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQueries(IQv2StoreIntegrationTest.java:807)
        at 
org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.verifyStore(IQv2StoreIntegrationTest.java:780)
 {code}
{code:java}
verifyStore[cache=true, log=false, supplier=ROCKS_WINDOW, kind=DSL] 

    java.lang.AssertionError: 
Result:StateQueryResult{partitionResults={0=SucceededQueryResult{result=org.apache.kafka.streams.state.internals.MeteredWindowedKeyValueIterator@2a32fb6,
 executionInfo=[], position=Position{position={input-topic={0=1}}}}, 
1=SucceededQueryResult{result=org.apache.kafka.streams.state.internals.MeteredWindowedKeyValueIterator@6107165,
 executionInfo=[], position=Position{position={input-topic={1=1}}}}}, 
globalResult=null}
    Expected: is <[0, 1, 2, 3]> 
         but: was <[0, 2, 3]>
        at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
        at 
org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleWindowRangeQuery(IQv2StoreIntegrationTest.java:1234)
        at 
org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleWindowRangeQueries(IQv2StoreIntegrationTest.java:880)
        at 
org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.verifyStore(IQv2StoreIntegrationTest.java:793)
 {code}
 

Some observations:
 * After I added the whole query result to the failure message, we can see that 
the results are always past the desired position, even though they don't 
include all the data that should have been present in that position.
 * All the observed failures have happened with caching=true, but that it 
probably a red herring, since range queries skip the cache (cf 
fe72187cb15bf7dcc16e8630ed379e979c101151)
 * For a while, I thought that it might be a thread visibility problem with the 
iterators, since the missing record was always at the end of the range for some 
partition, but the window range failure is missing record 1, which is at the 
beginning of the range in partition 1.

I have been able to reproduce the failure locally, but only occasionally. I 
made some hacks to narrow down the space of possibilities: 
[https://github.com/vvcephei/kafka/commit/2a0776e52e378f1c59e98f352e3fa4f79c55842d]

I didn't have success running that one test until failure in IDEA. It has never 
failed for me in IDEA, even after thousands of attempts. In my testing branch, 
I added a loop to repeat one test configuration a thousand times in Gradle, but 
it still didn't fail reliably.

I also added a test to specifically check that RocksDB is giving the desired 
serialization both in one thread and across threads, and that test passes for 
me. My next thought is to expand that Tmp test to do the same with the 
RocksDBIterator class, or maybe just with a standalone RocksDBStore to see if 
we can reproduce it.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to