There is a difference between .delete() and it.remove(). .delete() can only be called in a Streams operator that is responsible for maintaining the state. This is of course required to give the developer writing the operator full control over the store.
However, it.remove() is called from *outside* the Streams part of your app. Thus, if a second developer queries a store, she should not be able to "mess" with the store -- she does not own the store.

Does this make sense?

-Matthias

On 3/22/17 3:27 PM, Tom Dearman wrote:
> Hi,
>
> What I was trying to accomplish was the normal usage of the iterator
> interface to enable safe remove while iterating over a collection. I
> have used iterator.remove since kafka streams was released, so this
> has been the real functionality since release and in the absence of
> documentation to say otherwise feels like a bug has been introduced
> now. If KeyValueStore#delete doesn't mess up the internal state
> during the single threaded access to the store I'm not sure why
> iterator.remove would.
>
> Having said that, I will save the keys for removal during iteration
> and delete after.
>
> Thanks for your help.
>
> Tom
>
> On 22 March 2017 at 19:34, Michael Noll <mich...@confluent.io> wrote:
>> To add to what Matthias said, in case the following isn't clear:
>>
>> - You should not (and, in 0.10.2, cannot any longer) call the iterator's
>> remove() method, i.e. `KeyValueIterator#remove()` when iterating through a
>> `KeyValueStore`. Perhaps this is something we should add to the
>> `KeyValueIterator` javadocs.
>>
>> - You can of course call the store's delete() method:
>> `KeyValueStore#delete(K key)`.
>>
>> Just mentioning this because, when reading the thread quickly, I missed the
>> "iterator" part and thought removal/deletion on the store wasn't working.
>> ;-)
>>
>> Best,
>> Michael
>>
>> On Wed, Mar 22, 2017 at 8:18 PM, Matthias J. Sax <matth...@confluent.io>
>> wrote:
>>
>>> Hi,
>>>
>>> remove() should not be supported -- thus, it's actually a bug in 0.10.1
>>> that got fixed in 0.10.2.
>>>
>>> Stores should only be altered by Streams and iterators over the stores
>>> should be read-only -- otherwise, you might mess up Streams internal state.
>>>
>>> I would highly recommend reconsidering the call to it.remove() in your
>>> application. Not sure what you are trying to accomplish, but you should
>>> do it differently.
>>>
>>> -Matthias
>>>
>>> On 3/22/17 8:00 AM, Tom Dearman wrote:
>>>> Hi, hope someone on kafka-streams team can help. Our application uses
>>>>
>>>> KeyValueIterator it = KeyValueStore.all();
>>>>
>>>> …..
>>>> it.remove()
>>>>
>>>> This used to work but is now broken, causes our punctuate to fail and
>>>> StreamThread to die. The cause seems to be that there were changes in
>>>> 0.10.2.0 to InMemoryKeyValueStoreSupplier:
>>>>
>>>> public synchronized KeyValueIterator<K, V> all() {
>>>>     final TreeMap<K, V> copy = new TreeMap<>(this.map);
>>>>     return new MemoryStoreIterator<>(copy.entrySet().iterator());
>>>> }
>>>>
>>>> @Override
>>>> public synchronized KeyValueIterator<K, V> all() {
>>>>     final TreeMap<K, V> copy = new TreeMap<>(this.map);
>>>>     return new DelegatingPeekingKeyValueIterator<>(name, new MemoryStoreIterator<>(copy.entrySet().iterator()));
>>>> }
>>>>
>>>> But the DelegatingPeekingKeyValueIterator has:
>>>>
>>>> @Override
>>>> public void remove() {
>>>>     throw new UnsupportedOperationException("remove not supported");
>>>> }
>>>>
>>>> whereas the old direct call on MemoryStoreIterator allowed remove. For
>>>> some reason there is no call to underlying.remove() in the
>>>> DelegatingPeekingKeyValueIterator.
>>>>
>>>> We don’t want to downgrade to 0.10.1.1 as there was a useful bug fix and
>>>> removing dependency on zookeeper.
>>>>
>>>> Thanks,
>>>> Tom
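
For anyone landing on this thread later: the workaround Tom describes above (save the keys during iteration, then delete them through the store) might look roughly like the sketch below. To keep it runnable without Kafka on the classpath, `SketchStore` is a hypothetical stand-in, not the Kafka Streams API; it only mirrors the two calls discussed in the thread, `all()` and `delete(key)`, and its iterator rejects `remove()` the way the 0.10.2 code quoted above does. `deleteMatching` is likewise an illustrative helper name. In a real processor you would presumably use `KeyValueStore#all()` and `KeyValueStore#delete(K key)` instead, and close the `KeyValueIterator` when done.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Predicate;

// Hypothetical stand-in for a state store; NOT the Kafka Streams API.
class SketchStore<K, V> {
    private final TreeMap<K, V> map = new TreeMap<>();

    void put(K key, V value) { map.put(key, value); }

    V get(K key) { return map.get(key); }

    // Removes the entry and returns the previous value (null if absent).
    V delete(K key) { return map.remove(key); }

    int size() { return map.size(); }

    // Iterates over a snapshot copy of the store; remove() is rejected.
    Iterator<Map.Entry<K, V>> all() {
        final Iterator<Map.Entry<K, V>> snapshot =
                new TreeMap<K, V>(map).entrySet().iterator();
        return new Iterator<Map.Entry<K, V>>() {
            @Override public boolean hasNext() { return snapshot.hasNext(); }
            @Override public Map.Entry<K, V> next() { return snapshot.next(); }
            @Override public void remove() {
                throw new UnsupportedOperationException("remove not supported");
            }
        };
    }
}

class DeleteAfterIteration {
    // Pass 1 collects the matching keys; pass 2 deletes them via the store.
    static <K, V> int deleteMatching(SketchStore<K, V> store,
                                     Predicate<Map.Entry<K, V>> shouldDelete) {
        List<K> keysToDelete = new ArrayList<>();
        Iterator<Map.Entry<K, V>> it = store.all();
        while (it.hasNext()) {
            Map.Entry<K, V> entry = it.next();
            if (shouldDelete.test(entry)) {
                keysToDelete.add(entry.getKey());
            }
        }
        for (K key : keysToDelete) {
            store.delete(key);
        }
        return keysToDelete.size();
    }

    public static void main(String[] args) {
        SketchStore<String, Integer> store = new SketchStore<>();
        store.put("a", 1);
        store.put("b", 20);
        store.put("c", 3);
        int removed = deleteMatching(store, e -> e.getValue() > 10);
        // prints "1 removed, 2 left"
        System.out.println(removed + " removed, " + store.size() + " left");
    }
}
```

Because the iterator never mutates anything, it makes no difference whether it walks the live store or a copy; every change goes through `delete()`, which is the call the thread says the owning operator is allowed to make.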