[ https://issues.apache.org/jira/browse/KAFKA-18326?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17909090#comment-17909090 ]
Guozhang Wang commented on KAFKA-18326:
---------------------------------------
I've just run the modified unit test myself and confirmed the bug. Thanks for
reporting! Will take a look at the PR soon.
> Cached stores may return deleted values
> ---------------------------------------
>
> Key: KAFKA-18326
> URL: https://issues.apache.org/jira/browse/KAFKA-18326
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Reporter: Almog Gavra
> Assignee: Almog Gavra
> Priority: Critical
> Attachments: range-scan-fix.patch
>
>
> Reported in community Slack by Stanislav Savulchik. I've attached a patch
> with a fix and am waiting for the reporter to submit a PR, if they want to,
> since they were the first to identify the issue!
> It affects essentially every version of Kafka Streams.
> -----
> Hi everyone,
> I’m investigating unexpected behavior in the KeyValueStore.prefixScan
> method: it sometimes returns previously deleted keys when caching is enabled.
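> Example (Scala; the ??? placeholders stand for the real serializer and store):
> {code:scala}
> import org.apache.kafka.common.serialization.Serializer
> import org.apache.kafka.streams.KeyValue
> import org.apache.kafka.streams.state.KeyValueStore
> import scala.jdk.CollectionConverters._
> import scala.util.Using
>
> val keyPrefixSerializer: Serializer[Int] = ??? // 4 bytes, big endian
> val store: KeyValueStore[(Int, String), String] = ??? // caching enabled
> // store contents:
> // (1, "A") -> "A"
> // (1, "B") -> "B"
> // using put(key, null) instead of delete(key) to avoid reading the previous value
> store.put((1, "B"), null)
> // read all key-value pairs with key prefix 1, closing the iterator afterwards
> val result: List[KeyValue[(Int, String), String]] =
>   Using.resource(store.prefixScan(1, keyPrefixSerializer))(_.asScala.toList)
> // expected result:
> // (1, "A") -> "A"
> // actual result:
> // (1, "A") -> "A"
> // (1, "B") -> "B" (previously deleted, but still returned by the iterator)
> {code}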
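For reference, the expected result of the example above can be modeled as a sorted store with a cache layered on top, where a null cache value is a tombstone that hides the store entry. This is a hypothetical, simplified sketch, not Kafka code: the class CachedViewModel, its methods, and the string keys such as "1:A" (standing in for the (Int, String) composite key) are illustrative assumptions. It computes the merged view naively rather than with a streaming iterator, which makes it a convenient oracle for what prefixScan should return.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model, not Kafka code: a sorted store with a write-back cache on top.
class CachedViewModel {
    final TreeMap<String, String> store = new TreeMap<>();
    // A null value in the cache is a tombstone: the key counts as deleted.
    final TreeMap<String, String> cache = new TreeMap<>();

    // Like KeyValueStore.put, a null value is treated as a delete.
    void put(String key, String value) {
        cache.put(key, value);
    }

    // Reference semantics: overlay the cache on the store, drop tombstoned keys,
    // then keep only the keys that start with the prefix, in sorted order.
    List<String> prefixScanKeys(String prefix) {
        TreeMap<String, String> merged = new TreeMap<>(store);
        for (Map.Entry<String, String> e : cache.entrySet()) {
            if (e.getValue() == null) {
                merged.remove(e.getKey());
            } else {
                merged.put(e.getKey(), e.getValue());
            }
        }
        List<String> keys = new ArrayList<>();
        for (String key : merged.tailMap(prefix, true).keySet()) {
            if (!key.startsWith(prefix)) {
                break; // keys sharing a prefix are contiguous in sorted order
            }
            keys.add(key);
        }
        return keys;
    }

    public static void main(String[] args) {
        CachedViewModel m = new CachedViewModel();
        m.store.put("1:A", "A");
        m.store.put("1:B", "B");
        m.put("1:B", null); // delete (1, "B") through the cache
        System.out.println(m.prefixScanKeys("1:")); // prints [1:A]
    }
}
```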
> I tried to write a unit test for
> MergedSortedCacheKeyValueBytesStoreIterator (the iterator returned by
> KeyValueStore.prefixScan and by other methods such as range and all) to
> reproduce the behavior. It shows that the iterator returns more items than
> expected when I delete the larger key:
> {code:java}
> // store, cache, namespace and createIterator() come from the test class fixture
> @Test
> public void shouldSkipAllDeletedFromCache1() {
>     final byte[][] bytes = {{0}, {1}};
>     for (final byte[] aByte : bytes) {
>         store.put(Bytes.wrap(aByte), aByte);
>     }
>     // simulate deletion of a store key that is cached (the larger key, {1})
>     cache.put(namespace, Bytes.wrap(bytes[1]), new LRUCacheEntry(null));
>     try (final MergedSortedCacheKeyValueBytesStoreIterator iterator = createIterator()) {
>         assertArrayEquals(bytes[0], iterator.next().key.get());
>         // fails: org.opentest4j.AssertionFailedError: expected: <false> but was: <true>
>         assertFalse(iterator.hasNext());
>     }
> }
> {code}
>
> If I delete the smaller key instead, the test passes:
> {code:java}
> @Test
> public void shouldSkipAllDeletedFromCache0() {
>     final byte[][] bytes = {{0}, {1}};
>     for (final byte[] aByte : bytes) {
>         store.put(Bytes.wrap(aByte), aByte);
>     }
>     // simulate deletion of a store key that is cached (the smaller key, {0})
>     cache.put(namespace, Bytes.wrap(bytes[0]), new LRUCacheEntry(null));
>     try (final MergedSortedCacheKeyValueBytesStoreIterator iterator = createIterator()) {
>         assertArrayEquals(bytes[1], iterator.next().key.get());
>         assertFalse(iterator.hasNext()); // passes
>     }
> }
> {code}
> Could someone help me verify whether this is a bug, or am I missing something?
> Thank you.
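The asymmetry between the two tests can be reproduced without Kafka using a simplified model of a merged cache/store iterator. This is a hypothetical sketch: MergedIteratorModel and scan are illustrative names, it covers forward iteration only, and the `fixed` branch is one possible correction, not necessarily what the attached range-scan-fix.patch does. The buggy branch assumes the tombstone-skipping step in hasNext() consumes a cache tombstone whenever the store's next key differs from it. When the store's next key is smaller (deleting the larger key {1}), the tombstone is discarded before the store reaches key {1}, so that key later leaks through. Deleting the smaller key {0} lines the tombstone up with the head of the store, so both are skipped together.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical, simplified model of a merged cache/store iterator (forward only).
// A null value in the cache is a tombstone for that key.
class MergedIteratorModel {

    static List<Integer> scan(NavigableMap<Integer, String> cache,
                              NavigableMap<Integer, String> store,
                              boolean fixed) {
        List<Map.Entry<Integer, String>> c = new ArrayList<>(cache.entrySet());
        List<Map.Entry<Integer, String>> s = new ArrayList<>(store.entrySet());
        int ci = 0;
        int si = 0;
        List<Integer> out = new ArrayList<>();
        while (true) {
            // hasNext(): skip tombstones at the head of the cache
            while (ci < c.size() && c.get(ci).getValue() == null) {
                if (si < s.size()) {
                    int cmp = Integer.compare(c.get(ci).getKey(), s.get(si).getKey());
                    if (fixed && cmp > 0) {
                        break; // emit the smaller store key before consuming the tombstone
                    }
                    if (cmp == 0) {
                        si++; // the tombstone hides the store entry with the same key
                    }
                }
                ci++; // consume the tombstone (in the buggy variant, even when cmp > 0)
            }
            if (ci >= c.size() && si >= s.size()) {
                return out;
            }
            // next(): emit the smaller key; on equal keys the cache entry wins
            if (si >= s.size()) {
                out.add(c.get(ci++).getKey());
            } else if (ci >= c.size()) {
                out.add(s.get(si++).getKey());
            } else {
                int cmp = Integer.compare(c.get(ci).getKey(), s.get(si).getKey());
                if (cmp < 0) {
                    out.add(c.get(ci++).getKey());
                } else if (cmp > 0) {
                    out.add(s.get(si++).getKey());
                } else {
                    out.add(c.get(ci++).getKey());
                    si++;
                }
            }
        }
    }

    public static void main(String[] args) {
        TreeMap<Integer, String> store = new TreeMap<>(Map.of(0, "0", 1, "1"));
        TreeMap<Integer, String> cache = new TreeMap<>();
        cache.put(1, null); // delete the larger key, as in shouldSkipAllDeletedFromCache1
        System.out.println(scan(cache, store, false)); // prints [0, 1]
        System.out.println(scan(cache, store, true));  // prints [0]
    }
}
```

Breaking out of the skip loop lets next() return the smaller store key first; the pending tombstone is then re-examined on the following hasNext() call, when it is compared against the equal store key and both are skipped.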
--
This message was sent by Atlassian Jira
(v8.20.10#820010)