Hello,
I have a Quarkus application using **org.apache.kafka:kafka-streams:3.1.0** and
found that
* when creating a global store using a compacted topic as input,
* entries that have been deleted at some point
* are then no longer returned when iterating over the store with
  **store.all()** (as expected),
* but after the pod restarts, its Kafka Streams state directory is deleted and
  the store is restored from the topic using
  **org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl#restoreState**,
* those formerly deleted records are once again returned by that store when
  using **store.all()** (not expected),
* however, **store.get("foo")** still returns null for them (as expected).
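To reproduce the first steps, a record can be written and then deleted with a tombstone (a record with a null value for the same key). This is a minimal sketch, assuming a broker at the hypothetical address `localhost:9092` and the topic name used in the store registration further down; the class and method names are made up for illustration.

```java
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

/** Hypothetical helper: writes a record and then deletes it again with a tombstone. */
public final class ReproduceDeletedEntries {

    // Assumed values; adjust to the real cluster and topic.
    static final String BOOTSTRAP = "localhost:9092";
    static final String TOPIC = "foo.bar.globaltopic";

    private ReproduceDeletedEntries() {
    }

    /** A put for the key, followed by a tombstone (null value) for the same key. */
    static List<ProducerRecord<String, String>> putThenDelete(final String key, final String value) {
        return List.of(
            new ProducerRecord<String, String>(TOPIC, key, value),
            new ProducerRecord<String, String>(TOPIC, key, null));
    }

    public static void main(final String[] args) throws Exception {
        final Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (final ProducerRecord<String, String> record : putThenDelete("foo", "bar")) {
                // block on each send so the tombstone is written after the put
                producer.send(record).get();
            }
        }
    }
}
```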
This is somewhat similar to https://issues.apache.org/jira/browse/KAFKA-7663,
in that I would like to be able to modify this restore behaviour.
However, it is also different: as far as I can tell it is not documented
anywhere, and it is unintuitive (to me), because it changes how the application
behaves after a restart even though the Kafka cluster itself has not changed.
So I think it is more of a bug than missing documentation.
Some more information: the topic is configured like this
```properties
cleanup.policy: compact
compression.type: producer
delete.retention.ms: 86400000
max.compaction.lag.ms: 9223372036854775807
min.compaction.lag.ms: 0
retention.bytes: -1
retention.ms: 86400000
```
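For a reproduction, a topic with the same settings can be created programmatically. This is a minimal sketch using the Kafka `Admin` client, assuming a broker at the hypothetical address `localhost:9092`; the single partition and replication factor of 1 are illustrative choices, not taken from the setup above.

```java
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

/** Hypothetical helper: creates the compacted input topic with the listed settings. */
public final class CreateGlobalTopic {

    private CreateGlobalTopic() {
    }

    /** The topic settings listed above, as topic-level configs. */
    static Map<String, String> topicConfig() {
        return Map.of(
            "cleanup.policy", "compact",
            "compression.type", "producer",
            "delete.retention.ms", "86400000",
            "max.compaction.lag.ms", String.valueOf(Long.MAX_VALUE),
            "min.compaction.lag.ms", "0",
            "retention.bytes", "-1",
            "retention.ms", "86400000");
    }

    public static void main(final String[] args) throws Exception {
        final Properties props = new Properties();
        // assumed broker address
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // 1 partition, replication factor 1: illustrative values only
            final NewTopic topic = new NewTopic("foo.bar.globaltopic", 1, (short) 1)
                .configs(topicConfig());
            // wait until the broker has created the topic
            admin.createTopics(List.of(topic)).all().get();
        }
    }
}
```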
I am adding the global store like so
```java
streamsBuilder.addGlobalStore(
    Stores.timestampedKeyValueStoreBuilder(
        Stores.persistentTimestampedKeyValueStore("foobar"),
        Serdes.String(),
        Serdes.String()),
    "foo.bar.globaltopic",
    Consumed.with(Serdes.String(), Serdes.String()),
    () -> new FooBarUpdateHandler(timeService)
);
```
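The restart with an empty state directory can also be reproduced without a pod: `KafkaStreams#cleanUp()` deletes the application's local state directory and must be called before `start()`, so the global store is rebuilt from the topic. This is a sketch under assumptions: the application id, bootstrap address, class name and 60-second wait are hypothetical, and it only prints, for every key returned by `all()`, what `get()` returns for the same key.

```java
import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;

/** Hypothetical harness: wipe local state, restore the global store, compare all() with get(). */
public final class RestoreFromScratch {

    private RestoreFromScratch() {
    }

    static Properties streamsProps(final String stateDir) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "foobar-restore-check"); // assumed id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");    // assumed broker
        props.put(StreamsConfig.STATE_DIR_CONFIG, stateDir);
        return props;
    }

    static void startFreshAndCompare(final Topology topology, final Properties props)
            throws InterruptedException {
        final KafkaStreams streams = new KafkaStreams(topology, props);
        // deletes the local state of this application id, like a restart with an empty volume
        streams.cleanUp();
        streams.start();
        try {
            final long deadline = System.currentTimeMillis() + 60_000;
            while (streams.state() != KafkaStreams.State.RUNNING) {
                if (System.currentTimeMillis() > deadline) {
                    throw new IllegalStateException("not RUNNING: " + streams.state());
                }
                Thread.sleep(100);
            }
            final ReadOnlyKeyValueStore<String, ValueAndTimestamp<String>> store = streams.store(
                StoreQueryParameters.fromNameAndType(
                    "foobar", QueryableStoreTypes.<String, String>timestampedKeyValueStore()));
            try (KeyValueIterator<String, ValueAndTimestamp<String>> all = store.all()) {
                while (all.hasNext()) {
                    final KeyValue<String, ValueAndTimestamp<String>> entry = all.next();
                    // a key listed here whose get() is null shows the reported inconsistency
                    System.out.printf("key=%s all()=%s get()=%s%n",
                        entry.key, entry.value, store.get(entry.key));
                }
            }
        } finally {
            streams.close();
        }
    }
}
```

Usage, with the builder from above: `RestoreFromScratch.startFreshAndCompare(streamsBuilder.build(), RestoreFromScratch.streamsProps("/tmp/foobar-state"));`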
And here is the definition of 'FooBarUpdateHandler':
```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Internal class handling partFamily updates.
 */
public class FooBarUpdateHandler implements Processor<String, String, Void, Void> {

    private static final Logger logger = LoggerFactory.getLogger(FooBarUpdateHandler.class);

    private TimestampedKeyValueStore<String, String> store;

    @Override
    public void init(final ProcessorContext<Void, Void> context) {
        store = context.getStateStore("foobar");
    }

    @Override
    public void process(final Record<String, String> record) {
        // handle tombstones (null values) and explicit "deleteme" markers from the input topic
        if (record.value() == null || record.value().equals("deleteme")) {
            store.delete(record.key());
        } else {
            store.put(
                record.key(),
                ValueAndTimestamp.make(
                    record.value(),
                    Instant.now().toEpochMilli()
                )
            );
        }

        // this is not relevant to the store logic;
        // it's only to show the issue when restarting and restoring the store
        final List<String> existingKeys = new ArrayList<>();
        try (final KeyValueIterator<String, ValueAndTimestamp<String>> all = store.all()) {
            all.forEachRemaining(r -> existingKeys.add(r.key));
        }
        logger.info("Got {} records in the store, with keys {}",
            existingKeys.size(), String.join(",", existingKeys));
    }
}
```
My workaround is to add this to the 'init' method of 'FooBarUpdateHandler',
right after the store has been looked up:
```java
// remove entries that all() still returns but get() reports as deleted
try (final KeyValueIterator<String, ValueAndTimestamp<String>> all = store.all()) {
    if (all == null) {
        return;
    }
    logger.info("Removing already deleted records from rocksdb representing the global store {}",
        store.name());
    all.forEachRemaining(r -> {
        if (r != null && r.key != null && store.get(r.key) == null) {
            store.delete(r.key);
        }
    });
}
```
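The same workaround can also be kept out of `init` as a small reusable helper. This sketch (the class name `StaleEntryPurger` is hypothetical) first collects the affected keys and only deletes them after the iterator has been closed; that is a conservative choice so the store is not modified while an iterator is open, not a documented requirement. It works around the symptom and does not change the restore behaviour itself.

```java
import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;

/** Hypothetical helper: removes keys that all() returns but get() reports as absent. */
public final class StaleEntryPurger {

    private StaleEntryPurger() {
    }

    /** Deletes the stale keys and returns them, e.g. for logging. */
    public static <K, V> List<K> purge(final TimestampedKeyValueStore<K, V> store) {
        final List<K> stale = new ArrayList<>();
        // first pass: read only, collecting keys whose point lookup returns null
        try (KeyValueIterator<K, ValueAndTimestamp<V>> all = store.all()) {
            while (all.hasNext()) {
                final K key = all.next().key;
                if (key != null && store.get(key) == null) {
                    stale.add(key);
                }
            }
        }
        // second pass: delete only after the iterator has been closed
        for (final K key : stale) {
            store.delete(key);
        }
        return stale;
    }
}
```

In `init`, this would be called as `StaleEntryPurger.purge(store);` after `store = context.getStateStore("foobar");`.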
Now it is again consistent across restarts.
Kind Regards,
Patrick
Patrick D’Addona
Senior Lead IT Architect