[ https://issues.apache.org/jira/browse/KAFKA-18326?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17907138#comment-17907138 ]
Almog Gavra commented on KAFKA-18326:
-------------------------------------
Note that the existing tests did not catch this because they first inserted
non-tombstone entries into the cache. The following change causes the test to
fail.
{code:java}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueBytesStoreIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueBytesStoreIteratorTest.java
index a678908b04..70e9ca8786 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueBytesStoreIteratorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueBytesStoreIteratorTest.java
@@ -154,7 +154,6 @@ public class MergedSortedCacheKeyValueBytesStoreIteratorTest {
         for (final byte[] aByte : bytes) {
             final Bytes aBytes = Bytes.wrap(aByte);
             store.put(aBytes, aByte);
-            cache.put(namespace, aBytes, new LRUCacheEntry(aByte));
         }
         cache.put(namespace, Bytes.wrap(bytes[1]), new LRUCacheEntry(null));
         cache.put(namespace, Bytes.wrap(bytes[2]), new LRUCacheEntry(null));
{code}
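For reference, the result a scan over a cached store is expected to produce can be modelled in a few lines. This is only a simplified sketch using plain TreeMaps, not the Kafka Streams implementation: MergedScanSketch and liveKeys are hypothetical names, String keys stand in for Bytes, and a null cache value stands in for a tombstone (as LRUCacheEntry(null) does in the tests).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified model only; not Kafka's MergedSortedCacheKeyValueBytesStoreIterator.
class MergedScanSketch {

    // Returns the keys a scan should yield, in sorted order. A key that is
    // present in the cache with a null value is a tombstone and hides the
    // corresponding store entry.
    static List<String> liveKeys(final TreeMap<String, String> store,
                                 final TreeMap<String, String> cache) {
        final TreeMap<String, String> merged = new TreeMap<>(store);
        // Cache entries, including null tombstones, override store entries.
        merged.putAll(cache);
        final List<String> keys = new ArrayList<>();
        for (final Map.Entry<String, String> entry : merged.entrySet()) {
            if (entry.getValue() != null) {
                keys.add(entry.getKey());
            }
        }
        return keys;
    }

    public static void main(final String[] args) {
        final TreeMap<String, String> store = new TreeMap<>();
        store.put("0", "a");
        store.put("1", "b");

        // Tombstone for the larger key, with no prior non-tombstone cache entry.
        final TreeMap<String, String> cache = new TreeMap<>();
        cache.put("1", null);

        System.out.println(liveKeys(store, cache)); // prints [0]
    }
}
```

Under this model the order of the deleted key relative to the other keys makes no difference, which is the property the modified test checks against the real iterator.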
> Cached stores may return deleted values
> ---------------------------------------
>
> Key: KAFKA-18326
> URL: https://issues.apache.org/jira/browse/KAFKA-18326
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Reporter: Almog Gavra
> Priority: Critical
> Attachments: range-scan-fix.patch
>
>
> Reported in community Slack by Stanislav Savulchik. I've attached a patch
> fix, and am waiting for the reporter to submit a PR if they want to since
> they were the first to identify it!
> It affects basically every version of Kafka Streams out there...
> -----
> Hi everyone,
> I’m investigating unexpected behavior of the KeyValueStore.prefixScan
> method, which sometimes returns previously deleted keys if caching is enabled.
> Example pseudocode:
> {code:java}
> val keyPrefixSerializer: Serializer[Int] = ??? // 4 bytes big endian
> val store: KeyValueStore[(Int, String), String] = ???
> // store contents
> // (1, "A") -> "A"
> // (1, "B") -> "B"
> // using put instead of delete to avoid reading previous value
> store.put((1, "B"), null)
> // reading all key value pairs using key prefix
> val result: List[KeyValue[(Int, String), String]] =
>   store.prefixScan(1, keyPrefixSerializer).asScala.toList
> // expected result
> // (1, "A") -> "A"
> // actual result
> // (1, "A") -> "A"
> // (1, "B") -> "B" (was previously deleted, but returned by the iterator)
> {code}
> I wrote a unit test for MergedSortedCacheKeyValueBytesStoreIterator
> (returned by KeyValueStore.prefixScan and other methods like range, all) in
> order to reproduce the behavior. It shows that the iterator returns more
> items than expected if I delete the larger key:
> {code:java}
> @Test
> public void shouldSkipAllDeletedFromCache1() {
>     final byte[][] bytes = {{0}, {1}};
>     for (final byte[] aByte : bytes) {
>         store.put(Bytes.wrap(aByte), aByte);
>     }
>     // simulate key deletion from store that is cached
>     cache.put(namespace, Bytes.wrap(bytes[1]), new LRUCacheEntry(null));
>     try (final MergedSortedCacheKeyValueBytesStoreIterator iterator = createIterator()) {
>         assertArrayEquals(bytes[0], iterator.next().key.get());
>         // org.opentest4j.AssertionFailedError: expected: <false> but was: <true>
>         assertFalse(iterator.hasNext());
>     }
> }
> {code}
>
> But if I delete the smaller key, the test passes:
> {code:java}
> @Test
> public void shouldSkipAllDeletedFromCache0() {
>     final byte[][] bytes = {{0}, {1}};
>     for (final byte[] aByte : bytes) {
>         store.put(Bytes.wrap(aByte), aByte);
>     }
>     cache.put(namespace, Bytes.wrap(bytes[0]), new LRUCacheEntry(null));
>     try (final MergedSortedCacheKeyValueBytesStoreIterator iterator = createIterator()) {
>         assertArrayEquals(bytes[1], iterator.next().key.get());
>         assertFalse(iterator.hasNext());
>     }
> }
> {code}
> Could someone help me verify if it is a bug or am I missing something?
> Thank you.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)