[GitHub] StefanRRichter commented on a change in pull request #7188: [FLINK-10473][State TTL] TTL state incremental cleanup for heap backend
StefanRRichter commented on a change in pull request #7188: [FLINK-10473][State TTL] TTL state incremental cleanup for heap backend
URL: https://github.com/apache/flink/pull/7188#discussion_r248240356

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java ##
@@ -1032,75 +1040,157 @@ public int sizeOfNamespace(Object namespace) {
// StateEntryIterator -
+ @Override
+ public StateIteratorWithUpdate getStateEntryIteratorWithUpdate() {
+ return new StateEntryIteratorWithModifications();
+ }
+
/**
-* Iterator over the entries in a {@link CopyOnWriteStateTable}.
+* Iterator over state entries in a {@link CopyOnWriteStateTable}.
*/
- class StateEntryIterator implements Iterator> {
- private StateTableEntry[] activeTable;
- private int nextTablePosition;
- private StateTableEntry nextEntry;
- private int expectedModCount;
+ class BaseStateEntryIterator implements Iterator> {
+ StateTableEntry[] activeTable;
+ int nextTablePosition;
+ StateTableEntry nextEntry;
+ StateTableEntry entryToReturn;
- StateEntryIterator() {
+ BaseStateEntryIterator() {
this.activeTable = primaryTable;
this.nextTablePosition = 0;
- this.expectedModCount = modCount;
this.nextEntry = getBootstrapEntry();
advanceIterator();
}
- private StateTableEntry advanceIterator() {
-
- StateTableEntry entryToReturn = nextEntry;
- StateTableEntry next = entryToReturn.next;
-
- // consider both sub-tables tables to cover the case of rehash
- while (next == null) {
-
- StateTableEntry[] tab = activeTable;
+ @Override
+ public boolean hasNext() {
+ return nextEntry != null;
+ }
- while (nextTablePosition < tab.length) {
- next = tab[nextTablePosition++];
+ @Override
+ public StateEntry next() {
+ if (!hasNext()) {

Review comment:
In the base class, this check looks like it can be dropped, so maybe it should just go into the overridden `next()` method of the new iterator, before calling `super.next()`?

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
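To make the suggestion concrete, here is a minimal Java sketch under simplifying assumptions: an array of strings stands in for the state table, and `BaseArrayIterator` / `CheckingArrayIterator` are hypothetical names, not Flink classes. The base `next()` omits the `hasNext()` guard, and the concrete subclass performs it before delegating to `super.next()`.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;

/**
 * Hypothetical base iterator over a plain array. next() does not guard against
 * exhaustion; the class is abstract so only subclasses that add the guard are used.
 */
abstract class BaseArrayIterator implements Iterator<String> {
    private final String[] entries;
    private int position = 0;

    BaseArrayIterator(String[] entries) {
        this.entries = entries;
    }

    @Override
    public boolean hasNext() {
        return position < entries.length;
    }

    @Override
    public String next() {
        // No hasNext() check here; subclasses decide whether they need one.
        return entries[position++];
    }
}

/** Hypothetical subclass: performs the check, then delegates to super.next(). */
final class CheckingArrayIterator extends BaseArrayIterator {
    CheckingArrayIterator(String[] entries) {
        super(entries);
    }

    @Override
    public String next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        return super.next();
    }
}
```

Keeping the base class abstract means no caller can reach the unguarded `next()` directly, so the `Iterator` contract (throwing `NoSuchElementException` when exhausted) still holds for every concrete iterator.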
StefanRRichter commented on a change in pull request #7188: [FLINK-10473][State TTL] TTL state incremental cleanup for heap backend
URL: https://github.com/apache/flink/pull/7188#discussion_r248206342

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java ##
@@ -1139,43 +1112,84 @@ private void amendIterationIfNextStale() {
}
return null;
}
+ }
+
+ /**
+* Iterator over state entries in a {@link CopyOnWriteStateTable} which does not tolerate concurrent modifications.
+*/
+ class StateEntryIteratorWithoutModifications extends BaseStateEntryIterator {
+ private final int expectedModCount;
+
+ StateEntryIteratorWithoutModifications() {
+ super();
+ this.expectedModCount = modCount;
+ }
+
+ @Override
+ public StateEntry next() {
+ assertConcurrentModificationsIfNotAllowed();

Review comment:
You can drop the `IfNotAllowed` part from the name now.
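A minimal sketch of the fail-fast behaviour behind the renamed check, assuming a hypothetical list-backed `ModCountingTable` rather than `CopyOnWriteStateTable`: the iterator snapshots `modCount` at construction, and `assertConcurrentModifications()` (the name with `IfNotAllowed` dropped, as suggested) throws once the table has changed.

```java
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

/** Hypothetical table that counts structural modifications, similar in spirit to modCount. */
final class ModCountingTable {
    private final List<String> entries = new ArrayList<>();
    private int modCount = 0;

    void put(String entry) {
        entries.add(entry);
        modCount++;
    }

    Iterator<String> iteratorWithoutModifications() {
        return new FailFastIterator();
    }

    /** Iterator that does not tolerate concurrent modifications. */
    private final class FailFastIterator implements Iterator<String> {
        // Snapshot of modCount taken when the iterator is created.
        private final int expectedModCount = modCount;
        private int position = 0;

        @Override
        public boolean hasNext() {
            return position < entries.size();
        }

        @Override
        public String next() {
            assertConcurrentModifications();
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            return entries.get(position++);
        }

        /** Throws if the table was modified after this iterator was created. */
        private void assertConcurrentModifications() {
            if (modCount != expectedModCount) {
                throw new ConcurrentModificationException();
            }
        }
    }
}
```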
StefanRRichter commented on a change in pull request #7188: [FLINK-10473][State TTL] TTL state incremental cleanup for heap backend
URL: https://github.com/apache/flink/pull/7188#discussion_r247852062

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlState.java ##
@@ -82,5 +88,19 @@ public void setCurrentNamespace(N namespace) {
@Override
public void clear() {
original.clear();
+ accessCallback.run();
+ }
+
+ /**
+* Check if state has expired or not and update it if it has partially expired.
+*
+* @return either non expired (possibly updated) state or null if the state has expired.
+*/
+ @Nullable
+ public abstract TTLSV checkIfExpiredOrUpdate(@Nonnull TTLSV ttlValue);

Review comment:
This method name still sounds like it should return a boolean, as it did in the beginning, but now it does more, so I would consider adjusting the name as well.
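One way to make the nullable contract visible in the name, sketched for list-like state where partial expiration is possible. `getUnexpiredOrNull`, `TimestampedElement` and `TtlListCheck` are hypothetical names chosen for illustration, and the sketch treats an element as expired once `timestamp + ttl <= now`.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical list element carrying its own last-access timestamp. */
final class TimestampedElement {
    final String value;
    final long timestamp;

    TimestampedElement(String value, long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
    }
}

final class TtlListCheck {
    /**
     * Returns the unexpired elements (possibly fewer than before, i.e. "updated"),
     * or null if every element has expired.
     */
    static List<TimestampedElement> getUnexpiredOrNull(
            List<TimestampedElement> elements, long ttlMillis, long nowMillis) {
        List<TimestampedElement> unexpired = new ArrayList<>();
        for (TimestampedElement element : elements) {
            // Keep the element only while timestamp + ttl is still in the future.
            if (element.timestamp + ttlMillis > nowMillis) {
                unexpired.add(element);
            }
        }
        return unexpired.isEmpty() ? null : unexpired;
    }
}
```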
StefanRRichter commented on a change in pull request #7188: [FLINK-10473][State TTL] TTL state incremental cleanup for heap backend
URL: https://github.com/apache/flink/pull/7188#discussion_r247843856

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java ##
@@ -1032,75 +1040,143 @@ public int sizeOfNamespace(Object namespace) {
// StateEntryIterator -
+ @Override
+ public StateIteratorWithUpdate getStateEntryIterator() {
+ return new StateEntryIterator(true);
+ }
+
/**
-* Iterator over the entries in a {@link CopyOnWriteStateTable}.
+* Iterator over state entries in a {@link CopyOnWriteStateTable}.
+*
+* This iterator can optionally tolerate concurrent modifications and
+* support removing of underlying next key state.
*/
- class StateEntryIterator implements Iterator> {
+ class StateEntryIterator implements StateIteratorWithUpdate {

Review comment:
I would suggest not mixing two iterators into one implementation and deciding the behaviour based on a boolean flag. Better to have a common superclass and replace the boolean flag with polymorphic methods. Then only the subclass "with update" needs to implement the full `StateIteratorWithUpdate`, whereas the other can still simply work as an `Iterator`.
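A sketch of the suggested structure, with hypothetical names and a `List` standing in for the state table: shared traversal lives in an abstract superclass, the read-only variant is a plain `Iterator`, and only the updating variant implements the extended interface. `IteratorWithUpdate` is a stand-in here; the actual `StateIteratorWithUpdate` in the PR may declare different methods.

```java
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

/** Hypothetical extended iterator interface (stand-in for StateIteratorWithUpdate). */
interface IteratorWithUpdate<T> extends Iterator<T> {
    /** Replaces the element most recently returned by next(). */
    void update(T newValue);
}

/** Shared traversal logic; no boolean flag selects the behaviour. */
abstract class AbstractListIterator<T> {
    final List<T> values;
    int position = 0;

    AbstractListIterator(List<T> values) {
        this.values = values;
    }

    public boolean hasNext() {
        return position < values.size();
    }

    T advance() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        return values.get(position++);
    }
}

/** Read-only variant: just a plain Iterator. */
final class ReadOnlyListIterator<T> extends AbstractListIterator<T> implements Iterator<T> {
    ReadOnlyListIterator(List<T> values) {
        super(values);
    }

    @Override
    public T next() {
        return advance();
    }
}

/** Updating variant: the only one that implements the extended interface. */
final class UpdatingListIterator<T> extends AbstractListIterator<T> implements IteratorWithUpdate<T> {
    UpdatingListIterator(List<T> values) {
        super(values);
    }

    @Override
    public T next() {
        return advance();
    }

    @Override
    public void update(T newValue) {
        if (position == 0) {
            throw new IllegalStateException("next() has not been called yet");
        }
        values.set(position - 1, newValue);
    }
}
```

Because the read-only iterator does not implement `IteratorWithUpdate` at all, calling `update()` on it is a compile-time error rather than a runtime check on a flag.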
StefanRRichter commented on a change in pull request #7188: [FLINK-10473][State TTL] TTL state incremental cleanup for heap backend
URL: https://github.com/apache/flink/pull/7188#discussion_r247837409

## File path: flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java ##
@@ -269,5 +300,37 @@ public StateTtlConfig build() {
public boolean inFullSnapshot() {
return strategies.containsKey(Strategies.FULL_STATE_SCAN_SNAPSHOT);
}
+
+ public IncrementalCleanupStrategy getIncrementalCleanupStrategy() {
+ return (IncrementalCleanupStrategy) strategies.get(Strategies.INCREMENTAL_CLEANUP);
+ }
+ }
+
+ /** Configuration of cleanup strategy while taking the full snapshot. */
+ public static class IncrementalCleanupStrategy implements CleanupStrategies.CleanupStrategy {
+ private static final long serialVersionUID = 3109278696501988780L;
+
+ /** Max number of pulled from queue keys for clean up upon state touch for any key. */
+ private final int cleanupSize;
+
+ /** Whether to run incremental cleanup per state access. */
+ private final boolean runCleanupForEveryRecord;

Review comment:
So maybe this should be called `runCleanupOnStateAccess`?
StefanRRichter commented on a change in pull request #7188: [FLINK-10473][State TTL] TTL state incremental cleanup for heap backend
URL: https://github.com/apache/flink/pull/7188#discussion_r247837166

## File path: flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java ##
@@ -269,5 +300,37 @@ public StateTtlConfig build() {
public boolean inFullSnapshot() {
return strategies.containsKey(Strategies.FULL_STATE_SCAN_SNAPSHOT);
}
+
+ public IncrementalCleanupStrategy getIncrementalCleanupStrategy() {
+ return (IncrementalCleanupStrategy) strategies.get(Strategies.INCREMENTAL_CLEANUP);
+ }
+ }
+
+ /** Configuration of cleanup strategy while taking the full snapshot. */
+ public static class IncrementalCleanupStrategy implements CleanupStrategies.CleanupStrategy {
+ private static final long serialVersionUID = 3109278696501988780L;
+
+ /** Max number of pulled from queue keys for clean up upon state touch for any key. */

Review comment:
I think there is a grammar problem in the comment: maximum of what? So maybe: max. number of keys pulled from...
StefanRRichter commented on a change in pull request #7188: [FLINK-10473][State TTL] TTL state incremental cleanup for heap backend
URL: https://github.com/apache/flink/pull/7188#discussion_r246361402

## File path: flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java ##
@@ -269,5 +300,34 @@ public StateTtlConfig build() {
public boolean inFullSnapshot() {
return strategies.containsKey(Strategies.FULL_STATE_SCAN_SNAPSHOT);
}
+
+ public IncrementalCleanupStrategy getIncrementalCleanupStrategy() {
+ return (IncrementalCleanupStrategy) strategies.get(Strategies.INCREMENTAL_CLEANUP);
+ }
+ }
+
+ /** Configuration of cleanup strategy while taking the full snapshot. */
+ public static class IncrementalCleanupStrategy implements CleanupStrategies.CleanupStrategy {
+ private static final long serialVersionUID = 3109278696501988780L;
+
+ private final int cleanupSize;

Review comment:
I would suggest adding a comment about the meaning of the fields here.
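A sketch of how the fields could be documented, combining the field comment wording and the `runCleanupOnStateAccess` name proposed in this thread. `IncrementalCleanupSettings` is a hypothetical stand-in for the strategy class under review, and the non-negative check mirrors the `@Nonnegative` annotation this PR uses for `cleanupSize` in `TtlIncrementalCleanup`.

```java
import java.io.Serializable;

/** Hypothetical stand-in for the incremental cleanup strategy configuration. */
final class IncrementalCleanupSettings implements Serializable {
    private static final long serialVersionUID = 1L;

    /** Max. number of keys pulled from the queue and checked for expiration on each state access. */
    private final int cleanupSize;

    /** Whether to run the incremental cleanup on every state access. */
    private final boolean runCleanupOnStateAccess;

    IncrementalCleanupSettings(int cleanupSize, boolean runCleanupOnStateAccess) {
        if (cleanupSize < 0) {
            throw new IllegalArgumentException("cleanupSize must be non-negative: " + cleanupSize);
        }
        this.cleanupSize = cleanupSize;
        this.runCleanupOnStateAccess = runCleanupOnStateAccess;
    }

    int getCleanupSize() {
        return cleanupSize;
    }

    boolean runCleanupOnStateAccess() {
        return runCleanupOnStateAccess;
    }
}
```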
StefanRRichter commented on a change in pull request #7188: [FLINK-10473][State TTL] TTL state incremental cleanup for heap backend
URL: https://github.com/apache/flink/pull/7188#discussion_r246774009

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlIncrementalCleanup.java ##
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+
+import java.util.ConcurrentModificationException;
+
+/**
+ * Incremental cleanup of state with TTL.
+ *
+ * @param type of state key
+ * @param type of state namespace
+ */
+class TtlIncrementalCleanup {
+ private static final Logger LOG = LoggerFactory.getLogger(TtlIncrementalCleanup.class);
+
+ @Nonnegative
+ private final int cleanupSize;
+
+ @Nonnull
+ private final KeyedStateBackend keyContext;
+
+ private AbstractTtlState ttlState;
+ private CloseableIterator> keyNamespaceIterator;
+ private boolean suppressCallback;
+
+ /**
+* TtlIncrementalCleanup constructor.
+*
+* @param cleanupSize max number of queued keys to incrementally cleanup upon state access
+* @param keyContext state backend to switch keys for cleanup
+*/
+ TtlIncrementalCleanup(@Nonnegative int cleanupSize, @Nonnull KeyedStateBackend keyContext) {
+ this.cleanupSize = cleanupSize;
+ this.keyContext = keyContext;
+ this.suppressCallback = false;
+ }
+
+ void stateAccessed() {
+ // key is changed during cleanup, but this should not be called recursively in this case
+ if (suppressCallback) {
+ return;
+ }
+ suppressCallback = true;
+ initIteratorIfNot();
+ K currentKey = keyContext.getCurrentKey();
+ N currentNamespace = ttlState.getCurrentNamespace();
+ try {
+ runCleanup();
+ } catch (ConcurrentModificationException e) {

Review comment:
Do we want to catch this exception? I think the iterator has to tolerate concurrent modifications, or else this will basically happen all the time, right?
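To illustrate the point about tolerating concurrent modification, here is a minimal sketch in which each state access checks at most `cleanupSize` entries and removes the expired ones. It assumes a `ConcurrentHashMap` from key to last-access timestamp; that map's iterators are weakly consistent and never throw `ConcurrentModificationException`, so no catch block is needed. `IncrementalCleanupSketch` is hypothetical and, unlike the real `TtlIncrementalCleanup`, does not switch keys or namespaces.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Hypothetical incremental TTL cleanup: each state access checks at most
 * cleanupSize entries and removes the expired ones.
 */
final class IncrementalCleanupSketch {
    /** Max. number of entries checked per state access. */
    private final int cleanupSize;

    /** Time-to-live in milliseconds. */
    private final long ttlMillis;

    /** Key -> last access timestamp; its iterators are weakly consistent. */
    final Map<String, Long> state = new ConcurrentHashMap<>();

    /** Position of the cleanup between state accesses; null until first use. */
    private Iterator<Map.Entry<String, Long>> cursor;

    /** True while a cleanup step runs, so a nested call returns immediately. */
    private boolean suppressCallback = false;

    IncrementalCleanupSketch(int cleanupSize, long ttlMillis) {
        if (cleanupSize < 0) {
            throw new IllegalArgumentException("cleanupSize must be non-negative");
        }
        this.cleanupSize = cleanupSize;
        this.ttlMillis = ttlMillis;
    }

    /** Called on every state access. */
    void stateAccessed(long nowMillis) {
        if (suppressCallback) {
            return;
        }
        suppressCallback = true;
        try {
            runCleanup(nowMillis);
        } finally {
            suppressCallback = false;
        }
    }

    private void runCleanup(long nowMillis) {
        for (int i = 0; i < cleanupSize; i++) {
            if (cursor == null || !cursor.hasNext()) {
                // Start a new pass over all entries.
                cursor = state.entrySet().iterator();
                if (!cursor.hasNext()) {
                    return;
                }
            }
            Map.Entry<String, Long> entry = cursor.next();
            if (entry.getValue() + ttlMillis <= nowMillis) {
                cursor.remove();
            }
        }
    }
}
```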
StefanRRichter commented on a change in pull request #7188: [FLINK-10473][State TTL] TTL state incremental cleanup for heap backend
URL: https://github.com/apache/flink/pull/7188#discussion_r246775496

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java ##
@@ -175,9 +173,41 @@ private IS createReducingState() throws Exception {
ttlInitAcc,
new TtlFoldFunction<>(foldingStateDescriptor.getFoldFunction(), ttlConfig, timeProvider, initAcc),
new TtlSerializer<>(stateDesc.getSerializer()));
- return (IS) new TtlFoldingState<>(
- originalStateFactory.createInternalState(namespaceSerializer, ttlDescriptor, getSnapshotTransformFactory()),
- ttlConfig, timeProvider, stateDesc.getSerializer());
+ return (IS) new TtlFoldingState<>(createTtlStateContext(ttlDescriptor));
+ }
+
+ @SuppressWarnings("unchecked")
+ private TTLCON createTtlStateContext(TTLDES ttlDescriptor) throws Exception {

Review comment:
Why are `TTLCON` and `TTLDES` generic types? It seems a bit odd.
StefanRRichter commented on a change in pull request #7188: [FLINK-10473][State TTL] TTL state incremental cleanup for heap backend
URL: https://github.com/apache/flink/pull/7188#discussion_r246428260

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java ##
@@ -106,4 +106,23 @@
@Override
void dispose();
+
+ /** State backend will call {@link KeyChangeListener#keyChanged} when key context changes if supported. */
+ default void registerKeyChangeListener(KeyChangeListener listener) {

Review comment:
Again, I would suggest not using `default` methods.
StefanRRichter commented on a change in pull request #7188: [FLINK-10473][State TTL] TTL state incremental cleanup for heap backend
URL: https://github.com/apache/flink/pull/7188#discussion_r246419067

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java ##
@@ -1103,4 +,82 @@ public void remove() {
throw new UnsupportedOperationException("Read-only iterator");
}
}
+
+ @Override
+ public CloseableIterator> getNamespaceKeyIterator() {
+ return CloseableIterator.adapterForIterator(new NamespaceKeyIterator());
+ }
+
+ /**
+* Iterator over (namespace, key) pairs in a {@link CopyOnWriteStateTable}.
+*
+* This iterator is lazy and has relaxed consistency.
+* It outputs (namespace, key) pairs which existed at some point in time and there can be duplicates.
+* It tolerates concurrent modifications and
+* supports removing of underlying next key state if it currently exists otherwise nothing is done.
+*/
+ class NamespaceKeyIterator implements Iterator> {

Review comment:
I wonder if we could base this on the implementation of the existing entry iterator, maybe by extracting a common superclass. Is there anything fundamental speaking against that? It looks like they could share a lot of code if we move from `Tuple2` to `StateEntry`.
StefanRRichter commented on a change in pull request #7188: [FLINK-10473][State TTL] TTL state incremental cleanup for heap backend
URL: https://github.com/apache/flink/pull/7188#discussion_r246774607

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlIncrementalCleanup.java ##
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+
+import java.util.ConcurrentModificationException;
+
+/**
+ * Incremental cleanup of state with TTL.
+ *
+ * @param type of state key
+ * @param type of state namespace
+ */
+class TtlIncrementalCleanup {
+ private static final Logger LOG = LoggerFactory.getLogger(TtlIncrementalCleanup.class);
+
+ @Nonnegative
+ private final int cleanupSize;
+
+ @Nonnull
+ private final KeyedStateBackend keyContext;
+
+ private AbstractTtlState ttlState;
+ private CloseableIterator> keyNamespaceIterator;
+ private boolean suppressCallback;

Review comment:
Please add some comments; I would suggest at least on `suppressCallback` and `cleanupSize`.
StefanRRichter commented on a change in pull request #7188: [FLINK-10473][State TTL] TTL state incremental cleanup for heap backend
URL: https://github.com/apache/flink/pull/7188#discussion_r246785880

## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricMonitor.java ##
@@ -125,7 +125,7 @@ public void close() {
private boolean closed;
private RocksDBNativeMetricView(
- @Nonnull ColumnFamilyHandle handle,
+ ColumnFamilyHandle handle,

Review comment:
Same here.
StefanRRichter commented on a change in pull request #7188: [FLINK-10473][State TTL] TTL state incremental cleanup for heap backend
URL: https://github.com/apache/flink/pull/7188#discussion_r246427849

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java ##
@@ -147,10 +153,25 @@ public void dispose() {
*/
@Override
public void setCurrentKey(K newKey) {
+ notifyKeyChanged(newKey);
this.currentKey = newKey;
this.currentKeyGroup = KeyGroupRangeAssignment.assignToKeyGroup(newKey, numberOfKeyGroups);
}
+ private void notifyKeyChanged(K newKey) {

Review comment:
Strictly speaking, this is more like `notifyKeySet()`, because the key might still be the same object after the call.
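A minimal sketch of why "key set" describes the hook better, assuming hypothetical `KeySelectionListener` and `KeyContextSketch` types: listeners fire on every `setCurrentKey` call, including when the same key is set twice in a row, so nothing guarantees that the key actually changed.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical listener, named after what it observes: a key being set. */
interface KeySelectionListener<K> {
    void keySelected(K newKey);
}

/** Hypothetical minimal key context holding the current key. */
final class KeyContextSketch<K> {
    private final List<KeySelectionListener<K>> listeners = new ArrayList<>();
    private K currentKey;

    void registerKeySelectionListener(KeySelectionListener<K> listener) {
        listeners.add(listener);
    }

    void setCurrentKey(K newKey) {
        // Notified on every call, even if newKey equals the previous key.
        notifyKeySelected(newKey);
        this.currentKey = newKey;
    }

    private void notifyKeySelected(K newKey) {
        for (KeySelectionListener<K> listener : listeners) {
            listener.keySelected(newKey);
        }
    }

    K getCurrentKey() {
        return currentKey;
    }
}
```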
StefanRRichter commented on a change in pull request #7188: [FLINK-10473][State TTL] TTL state incremental cleanup for heap backend
URL: https://github.com/apache/flink/pull/7188#discussion_r246379055

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java ##
@@ -166,6 +168,8 @@ public boolean isEmpty() {
public abstract Stream getKeys(N namespace);
+ public abstract CloseableIterator> getNamespaceKeyIterator();

Review comment:
I would suggest deciding on either `Stream` or `Iterator` here; the methods look very similar. They also have very different semantics, though: `getKeys` fails on concurrent modifications, while `getNamespaceKeyIterator` tolerates and embraces them. Furthermore, I would suggest using `StateEntry` over `Tuple2` so that we can stream objects directly from the copy-on-write state table (the old state table implementation would still have to return new objects, but that seems OK).
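A sketch of the `StateEntry` idea with hypothetical types: if table entries implement a read-only entry view themselves, the iterator can hand out the stored objects directly instead of allocating a `Tuple2` per element. `StateEntryView`, `TableEntry` and `EntryTable` are illustrative names, not Flink's API.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Hypothetical read-only view of one state entry. */
interface StateEntryView<K, N, S> {
    K getKey();

    N getNamespace();

    S getState();
}

/** Hypothetical stored entry that is itself a view, so no wrapper is needed. */
final class TableEntry<K, N, S> implements StateEntryView<K, N, S> {
    private final K key;
    private final N namespace;
    private final S state;

    TableEntry(K key, N namespace, S state) {
        this.key = key;
        this.namespace = namespace;
        this.state = state;
    }

    @Override
    public K getKey() {
        return key;
    }

    @Override
    public N getNamespace() {
        return namespace;
    }

    @Override
    public S getState() {
        return state;
    }
}

/** Hypothetical table whose iterator returns the stored entries themselves. */
final class EntryTable<K, N, S> {
    private final List<TableEntry<K, N, S>> entries = new ArrayList<>();

    TableEntry<K, N, S> put(K key, N namespace, S state) {
        TableEntry<K, N, S> entry = new TableEntry<>(key, namespace, state);
        entries.add(entry);
        return entry;
    }

    Iterator<StateEntryView<K, N, S>> iterator() {
        final Iterator<TableEntry<K, N, S>> delegate = entries.iterator();
        return new Iterator<StateEntryView<K, N, S>>() {
            @Override
            public boolean hasNext() {
                return delegate.hasNext();
            }

            @Override
            public StateEntryView<K, N, S> next() {
                // The stored entry is returned as-is; no Tuple2-style copy is allocated.
                return delegate.next();
            }
        };
    }
}
```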
StefanRRichter commented on a change in pull request #7188: [FLINK-10473][State TTL] TTL state incremental cleanup for heap backend
URL: https://github.com/apache/flink/pull/7188#discussion_r246422514

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalKvState.java ##
@@ -105,4 +114,8 @@
final TypeSerializer safeKeySerializer,
final TypeSerializer safeNamespaceSerializer,
final TypeSerializer safeValueSerializer) throws Exception;
+
+ default CloseableIterator> getNamespaceKeyIterator() {

Review comment:
I would suggest removing the `default` and adding implementations in `AbstractHeapState` and `AbstractRocksDBState`; the second one could throw `UnsupportedOperationException` for now.
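A minimal sketch of the suggestion with hypothetical types: the interface method is abstract rather than `default`, so each implementation must decide explicitly, and the RocksDB-like one throws `UnsupportedOperationException` for now. The names `KvStateSketch`, `HeapStateSketch` and `RocksDbStateSketch` are illustrative only.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

/** Hypothetical cut-down state interface: the method is abstract, not default. */
interface KvStateSketch<K> {
    Iterator<K> getKeyIterator();
}

/** Heap-like implementation: can iterate its in-memory keys. */
final class HeapStateSketch<K> implements KvStateSketch<K> {
    private final List<K> keys = new ArrayList<>();

    void add(K key) {
        keys.add(key);
    }

    @Override
    public Iterator<K> getKeyIterator() {
        return Collections.unmodifiableList(keys).iterator();
    }
}

/** RocksDB-like implementation: explicitly unsupported for now. */
final class RocksDbStateSketch<K> implements KvStateSketch<K> {
    @Override
    public Iterator<K> getKeyIterator() {
        throw new UnsupportedOperationException("Key iteration is not supported by this backend yet");
    }
}
```

Compared to a `default` method, a new backend that forgets to implement `getKeyIterator()` fails to compile instead of silently inheriting a fallback.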
StefanRRichter commented on a change in pull request #7188: [FLINK-10473][State TTL] TTL state incremental cleanup for heap backend
URL: https://github.com/apache/flink/pull/7188#discussion_r246771871

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlState.java ##
@@ -38,10 +37,12 @@
extends AbstractTtlDecorator
implements InternalKvState {
private final TypeSerializer valueSerializer;
+ final Runnable accessCallback;

Review comment:
I suggest adding a comment, because it is not obvious to a reader of this class what this is good for.
StefanRRichter commented on a change in pull request #7188: [FLINK-10473][State TTL] TTL state incremental cleanup for heap backend
URL: https://github.com/apache/flink/pull/7188#discussion_r246378044

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlIncrementalCleanup.java ##
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+
+import java.util.ConcurrentModificationException;
+
+/**
+ * Incremental cleanup of state with TTL.
+ *
+ * @param type of state key
+ * @param type of state namespace
+ */
+class TtlIncrementalCleanup {
+ private static final Logger LOG = LoggerFactory.getLogger(TtlIncrementalCleanup.class);
+
+ @Nonnegative
+ private final int cleanupSize;
+
+ @Nonnull
+ private final KeyedStateBackend keyContext;
+
+ private AbstractTtlState ttlState;
+ private CloseableIterator> keyNamespaceIterator;

Review comment:
It seems a bit strange that we need this iterator to enumerate and delete entries, and additionally the `AbstractTtlState` to check each entry for expiration. It would also force us to go through a whole ser/de cycle if we want to offer this cleanup strategy for RocksDB as well.
StefanRRichter commented on a change in pull request #7188: [FLINK-10473][State TTL] TTL state incremental cleanup for heap backend
URL: https://github.com/apache/flink/pull/7188#discussion_r246785747

## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricMonitor.java ##
@@ -54,7 +54,7 @@
private RocksDB rocksDB;
RocksDBNativeMetricMonitor(
- @Nonnull RocksDB rocksDB,
+ RocksDB rocksDB,

Review comment:
Why is this change required? It looks problematic to me.
StefanRRichter commented on a change in pull request #7188: [FLINK-10473][State TTL] TTL state incremental cleanup for heap backend
URL: https://github.com/apache/flink/pull/7188#discussion_r246772202

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateContext.java ##
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.state.StateTtlConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+class TtlStateContext {
+ /** Wrapped original state handler. */
+ final T original;
+ final StateTtlConfig config;
+ final TtlTimeProvider timeProvider;
+ final TypeSerializer valueSerializer;
+ final Runnable accessCallback;

Review comment:
Please add some comments on the fields, in particular on this one.
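A sketch of the kind of field comments being asked for, with simplified, hypothetical types (`TtlContextSketch`, `TtlStateSketch`, a `Map` as the wrapped state, and a `long` TTL instead of `StateTtlConfig`). The comment on `accessCallback` reflects its use in this PR, where `AbstractTtlState.clear()` calls `accessCallback.run()`; that it is wired to `TtlIncrementalCleanup.stateAccessed()` is an assumption here.

```java
import java.util.Map;

/** Hypothetical, simplified context bundling what a TTL state wrapper needs. */
final class TtlContextSketch<T> {
    /** Wrapped original state that holds the values together with their timestamps. */
    final T original;

    /** Time-to-live in milliseconds (a simplification of the full TTL configuration). */
    final long ttlMillis;

    /**
     * Callback run after every access to the TTL state, e.g. to trigger one bounded
     * step of incremental cleanup; callers can pass a no-op if no such cleanup is configured.
     */
    final Runnable accessCallback;

    TtlContextSketch(T original, long ttlMillis, Runnable accessCallback) {
        this.original = original;
        this.ttlMillis = ttlMillis;
        this.accessCallback = accessCallback;
    }
}

/** Hypothetical TTL state wrapper that reports every access through the callback. */
final class TtlStateSketch {
    private final TtlContextSketch<Map<String, Long>> context;

    TtlStateSketch(TtlContextSketch<Map<String, Long>> context) {
        this.context = context;
    }

    void put(String key, long timestamp) {
        context.original.put(key, timestamp);
        context.accessCallback.run();
    }

    void clear() {
        context.original.clear();
        context.accessCallback.run();
    }
}
```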