Hi, hope someone on kafka-streams team can help. Our application uses
KeyValueIterator it = KeyValueStore.all();
…..
it.remove()
This used to work but is now broken, causes our punctuate to fail and
StreamThread to die. The cause seems to be that there were changes in 0.10.2.0
to InMemoryKeyValueStoreSupplier:
public synchronized KeyValueIterator<K, V> all() {
final TreeMap<K, V> copy = new TreeMap<>(this.map);
return new MemoryStoreIterator<>(copy.entrySet().iterator());
}
@Override
public synchronized KeyValueIterator<K, V> all() {
final TreeMap<K, V> copy = new TreeMap<>(this.map);
return new DelegatingPeekingKeyValueIterator<>(name, new
MemoryStoreIterator<>(copy.entrySet().iterator()));
}
But the DelegatingPeekingKeyValueIterator has:
@Override
public void remove() {
throw new UnsupportedOperationException("remove not supported");
}
whereas the old direct call on MemoryStoreIterator allowed remove. For some
reason there is no call to underlying.remove() in the
DelegatingPeekingKeyValueIterator.
We don’t want to downgrade to 0.10.1.1 as there was a useful bug fix and
removing dependancy on zookeeper.
Thanks,
Tom