Almog Gavra created KAFKA-18326:
-----------------------------------
Summary: Cached stores may return deleted values
Key: KAFKA-18326
URL: https://issues.apache.org/jira/browse/KAFKA-18326
Project: Kafka
Issue Type: Bug
Components: streams
Reporter: Almog Gavra
Attachments: range-scan-fix.patch
Reported in the community Slack by Stanislav Savulchik. I've attached a patch
with a fix, and am waiting to see whether the reporter wants to submit the PR,
since they were the first to identify it!
It affects basically every version of Kafka Streams out there...
-----
Hi everyone,
I’m investigating unexpected behavior of the KeyValueStore.prefixScan method:
it sometimes returns previously deleted keys if caching is enabled. Example
pseudocode:
{code:java}
val keyPrefixSerializer: Serializer[Int] = ??? // 4 bytes big endian
val store: KeyValueStore[(Int, String), String] = ???
// store contents
// (1, "A") -> "A"
// (1, "B") -> "B"
// using put instead of delete to avoid reading previous value
store.put((1, "B"), null)
// reading all key value pairs using key prefix
val result: List[KeyValue[(Int, String), String]] =
  store.prefixScan(1, keyPrefixSerializer).asScala.toList
// expected result
// (1, "A") -> "A"
// actual result
// (1, "A") -> "A"
// (1, "B") -> "B" (was previously deleted, but returned by the iterator){code}
I tried to write a unit test for MergedSortedCacheKeyValueBytesStoreIterator
(the iterator returned by KeyValueStore.prefixScan and by other methods such as
range and all) in order to reproduce the behavior. It showed that the iterator
returns more items than expected when I delete the larger key:
{code:java}
@Test
public void shouldSkipAllDeletedFromCache1() {
    final byte[][] bytes = {{0}, {1}};
    for (final byte[] aByte : bytes) {
        store.put(Bytes.wrap(aByte), aByte);
    }
    // simulate key deletion from store that is cached
    cache.put(namespace, Bytes.wrap(bytes[1]), new LRUCacheEntry(null));
    try (final MergedSortedCacheKeyValueBytesStoreIterator iterator = createIterator()) {
        assertArrayEquals(bytes[0], iterator.next().key.get());
        // fails with org.opentest4j.AssertionFailedError: expected: <false> but was: <true>
        assertFalse(iterator.hasNext());
    }
}{code}
But if I delete the smaller key, the test passes:
{code:java}
@Test
public void shouldSkipAllDeletedFromCache0() {
    final byte[][] bytes = {{0}, {1}};
    for (final byte[] aByte : bytes) {
        store.put(Bytes.wrap(aByte), aByte);
    }
    // simulate key deletion from store that is cached
    cache.put(namespace, Bytes.wrap(bytes[0]), new LRUCacheEntry(null));
    try (final MergedSortedCacheKeyValueBytesStoreIterator iterator = createIterator()) {
        assertArrayEquals(bytes[1], iterator.next().key.get());
        assertFalse(iterator.hasNext());
    }
}{code}
Could someone help me verify whether this is a bug, or am I missing something?
Thank you.
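For reference, here is a minimal sketch of the merge semantics that both tests expect: entries from the cache and the store are visited in ascending key order, the cache wins on equal keys, and a cache entry with a null value hides the store entry with the same key. This is only an illustration under stated assumptions, not the Kafka Streams implementation: plain TreeMaps with String keys stand in for the cache and the store, and the class and method names (MergedScanSketch, mergedScan) are hypothetical.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class MergedScanSketch {

    // Hypothetical helper: merges cache and store in ascending key order.
    // A null value in the cache is treated as a tombstone (deleted key).
    public static List<Map.Entry<String, String>> mergedScan(
            final TreeMap<String, String> cache,
            final TreeMap<String, String> store) {
        final List<Map.Entry<String, String>> result = new ArrayList<>();
        final Iterator<Map.Entry<String, String>> cacheIt = cache.entrySet().iterator();
        final Iterator<Map.Entry<String, String>> storeIt = store.entrySet().iterator();
        Map.Entry<String, String> c = cacheIt.hasNext() ? cacheIt.next() : null;
        Map.Entry<String, String> s = storeIt.hasNext() ? storeIt.next() : null;

        while (c != null || s != null) {
            // Treat an exhausted side as "larger" so the other side drains.
            final int cmp = c == null ? 1
                    : s == null ? -1
                    : c.getKey().compareTo(s.getKey());
            if (cmp > 0) {
                // Store key is smaller: emit it and keep the pending cache
                // entry (possibly a tombstone) for a later comparison.
                result.add(new SimpleImmutableEntry<>(s.getKey(), s.getValue()));
                s = storeIt.hasNext() ? storeIt.next() : null;
            } else {
                // Cache key is smaller or equal: the cache entry wins.
                if (c.getValue() != null) {
                    result.add(new SimpleImmutableEntry<>(c.getKey(), c.getValue()));
                }
                if (cmp == 0) {
                    // Same key in the store: skip it, the cache overrides it.
                    s = storeIt.hasNext() ? storeIt.next() : null;
                }
                c = cacheIt.hasNext() ? cacheIt.next() : null;
            }
        }
        return result;
    }

    public static void main(final String[] args) {
        final TreeMap<String, String> store = new TreeMap<>();
        store.put("0", "0");
        store.put("1", "1");
        final TreeMap<String, String> cache = new TreeMap<>();
        cache.put("1", null); // tombstone for the larger key
        System.out.println(mergedScan(cache, store)); // prints [0=0]
    }
}
```

The point of the sketch is that a tombstone is only consumed once its key has been compared against the store's current position, so deleting the larger key and deleting the smaller key give symmetric results.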