[GitHub] StefanRRichter commented on a change in pull request #7188: [FLINK-10473][State TTL] TTL state incremental cleanup for heap backend

2019-01-16 Thread GitBox
URL: https://github.com/apache/flink/pull/7188#discussion_r248240356
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java
 ##
 @@ -1032,75 +1040,157 @@ public int sizeOfNamespace(Object namespace) {
 
// StateEntryIterator  
-
 
+   @Override
+   public StateIteratorWithUpdate getStateEntryIteratorWithUpdate() {
+   return new StateEntryIteratorWithModifications();
+   }
+
/**
-* Iterator over the entries in a {@link CopyOnWriteStateTable}.
+* Iterator over state entries in a {@link CopyOnWriteStateTable}.
 */
-   class StateEntryIterator implements Iterator> {
-   private StateTableEntry[] activeTable;
-   private int nextTablePosition;
-   private StateTableEntry nextEntry;
-   private int expectedModCount;
+   class BaseStateEntryIterator implements Iterator> {
+   StateTableEntry[] activeTable;
+   int nextTablePosition;
+   StateTableEntry nextEntry;
+   StateTableEntry entryToReturn;
 
-   StateEntryIterator() {
+   BaseStateEntryIterator() {
this.activeTable = primaryTable;
this.nextTablePosition = 0;
-   this.expectedModCount = modCount;
this.nextEntry = getBootstrapEntry();
advanceIterator();
}
 
-   private StateTableEntry advanceIterator() {
-
-   StateTableEntry entryToReturn = nextEntry;
-   StateTableEntry next = entryToReturn.next;
-
-   // consider both sub-tables tables to cover the case of rehash
-   while (next == null) {
-
-   StateTableEntry[] tab = activeTable;
+   @Override
+   public boolean hasNext() {
+   return nextEntry != null;
+   }
 
-   while (nextTablePosition < tab.length) {
-   next = tab[nextTablePosition++];
+   @Override
+   public StateEntry next() {
+   if (!hasNext()) {
 
 Review comment:
   In the base class, this check looks like it can be dropped, so maybe it should just go into the overridden `next` method of the new iterator, before a `super.next()` call?
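A minimal sketch of that suggestion, using hypothetical stand-ins (`BaseIteratorSketch`, `CheckedIteratorSketch`) over a plain `int[]` rather than Flink's state table entries: the base `next()` carries no guard, and only the subclass checks `hasNext()` before delegating to `super.next()`.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;

// Hypothetical stand-in for BaseStateEntryIterator: next() does no bounds check.
class BaseIteratorSketch implements Iterator<Integer> {
    private final int[] data;
    int position;

    BaseIteratorSketch(int[] data) {
        this.data = data;
        this.position = 0;
    }

    @Override
    public boolean hasNext() {
        return position < data.length;
    }

    @Override
    public Integer next() {
        // No hasNext() guard here; subclasses decide whether they need one.
        return data[position++];
    }
}

// Hypothetical stand-in for the new iterator: guards first, then delegates.
class CheckedIteratorSketch extends BaseIteratorSketch {
    CheckedIteratorSketch(int[] data) {
        super(data);
    }

    @Override
    public Integer next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        return super.next();
    }
}
```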


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] StefanRRichter commented on a change in pull request #7188: [FLINK-10473][State TTL] TTL state incremental cleanup for heap backend

2019-01-16 Thread GitBox
URL: https://github.com/apache/flink/pull/7188#discussion_r248206342
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java
 ##
 @@ -1139,43 +1112,84 @@ private void amendIterationIfNextStale() {
}
return null;
}
+   }
+
+   /**
+* Iterator over state entries in a {@link CopyOnWriteStateTable} which does not tolerate concurrent modifications.
+*/
+   class StateEntryIteratorWithoutModifications extends BaseStateEntryIterator {
+   private final int expectedModCount;
+
+   StateEntryIteratorWithoutModifications() {
+   super();
+   this.expectedModCount = modCount;
+   }
+
+   @Override
+   public StateEntry next() {
+   assertConcurrentModificationsIfNotAllowed();
 
 Review comment:
   You can drop the `IfNotAllowed` part from the name now.


[GitHub] StefanRRichter commented on a change in pull request #7188: [FLINK-10473][State TTL] TTL state incremental cleanup for heap backend

2019-01-15 Thread GitBox
URL: https://github.com/apache/flink/pull/7188#discussion_r247852062
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlState.java
 ##
 @@ -82,5 +88,19 @@ public void setCurrentNamespace(N namespace) {
@Override
public void clear() {
original.clear();
+   accessCallback.run();
+   }
+
+   /**
+* Check if state has expired or not and update it if it has partially expired.
+*
+* @return either non expired (possibly updated) state or null if the state has expired.
+*/
+   @Nullable
+   public abstract TTLSV checkIfExpiredOrUpdate(@Nonnull TTLSV ttlValue);
 
 Review comment:
   This method name still sounds like it should return a boolean, as it did in the beginning, but now it does more, so I would consider adjusting the name as well.
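To illustrate why a boolean-sounding name no longer fits, here is a hypothetical sketch (not the PR's code) that models list state as timestamped values and uses an invented name, `getUnexpiredOrNull`, which says what comes back: the surviving elements, or `null` when everything has expired.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical element type: a value plus the timestamp of its last write.
final class TimestampedValueSketch {
    final String value;
    final long timestamp;

    TimestampedValueSketch(String value, long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
    }
}

final class ListTtlSketch {
    private final long ttlMillis;

    ListTtlSketch(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    /**
     * Returns the elements that have not expired at {@code now} (possibly fewer than
     * were passed in), or null if all of them have expired. The name describes the
     * returned value instead of asking a yes/no question.
     */
    List<TimestampedValueSketch> getUnexpiredOrNull(List<TimestampedValueSketch> elements, long now) {
        List<TimestampedValueSketch> unexpired = new ArrayList<>();
        for (TimestampedValueSketch element : elements) {
            if (now - element.timestamp < ttlMillis) {
                unexpired.add(element);
            }
        }
        return unexpired.isEmpty() ? null : unexpired;
    }
}
```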


[GitHub] StefanRRichter commented on a change in pull request #7188: [FLINK-10473][State TTL] TTL state incremental cleanup for heap backend

2019-01-15 Thread GitBox
URL: https://github.com/apache/flink/pull/7188#discussion_r247843856
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java
 ##
 @@ -1032,75 +1040,143 @@ public int sizeOfNamespace(Object namespace) {
 
// StateEntryIterator  
-
 
+   @Override
+   public StateIteratorWithUpdate getStateEntryIterator() {
+   return new StateEntryIterator(true);
+   }
+
/**
-* Iterator over the entries in a {@link CopyOnWriteStateTable}.
+* Iterator over state entries in a {@link CopyOnWriteStateTable}.
+*
+* This iterator can optionally tolerate concurrent modifications and
+* support removing of underlying next key state.
 */
-   class StateEntryIterator implements Iterator> {
+   class StateEntryIterator implements StateIteratorWithUpdate {
 
 Review comment:
   I would suggest not mixing two iterators into one implementation and deciding behaviour based on a boolean flag. Better to have a common superclass and replace the boolean flag with polymorphic methods. Then only the subclass "with update" needs to implement the full `StateIteratorWithUpdate`, whereas the other can still simply work as an `Iterator`.
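A sketch of the suggested structure, assuming a toy table (`TableSketch`, an `int[]` plus a `modCount`) in place of `CopyOnWriteStateTable`; all class names are hypothetical. Shared traversal lives in the abstract superclass, and the behaviour the boolean flag used to select becomes an overridden hook.

```java
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.NoSuchElementException;

// Hypothetical minimal table with a modification counter.
class TableSketch {
    final int[] slots;
    int modCount;

    TableSketch(int size) {
        this.slots = new int[size];
    }

    void put(int index, int value) {
        slots[index] = value;
        modCount++;
    }

    // Common superclass: shared traversal, no boolean flag.
    abstract class BaseIter implements Iterator<Integer> {
        int position;

        @Override
        public boolean hasNext() {
            return position < slots.length;
        }

        @Override
        public Integer next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            beforeNext();
            return slots[position++];
        }

        // Polymorphic hook that replaces a "tolerate modifications" flag.
        abstract void beforeNext();
    }

    // Fails fast if the table was modified after this iterator was created.
    class FailFastIter extends BaseIter {
        private final int expectedModCount = modCount;

        @Override
        void beforeNext() {
            if (modCount != expectedModCount) {
                throw new ConcurrentModificationException();
            }
        }
    }

    // Tolerates modifications; only this subclass would also implement an
    // "iterator with update" interface.
    class TolerantIter extends BaseIter {
        @Override
        void beforeNext() {
            // nothing to check
        }
    }
}
```

With this split, only the tolerant subclass would need the richer update interface, while the fail-fast one stays a plain `Iterator`.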


[GitHub] StefanRRichter commented on a change in pull request #7188: [FLINK-10473][State TTL] TTL state incremental cleanup for heap backend

2019-01-15 Thread GitBox
URL: https://github.com/apache/flink/pull/7188#discussion_r247837409
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java
 ##
 @@ -269,5 +300,37 @@ public StateTtlConfig build() {
public boolean inFullSnapshot() {
return strategies.containsKey(Strategies.FULL_STATE_SCAN_SNAPSHOT);
}
+
+   public IncrementalCleanupStrategy getIncrementalCleanupStrategy() {
+   return (IncrementalCleanupStrategy) strategies.get(Strategies.INCREMENTAL_CLEANUP);
+   }
+   }
+
+   /** Configuration of cleanup strategy while taking the full snapshot. */
+   public static class IncrementalCleanupStrategy implements CleanupStrategies.CleanupStrategy {
+   private static final long serialVersionUID = 3109278696501988780L;
+
+   /** Max number of pulled from queue keys for clean up upon state touch for any key. */
+   private final int cleanupSize;
+
+   /** Whether to run incremental cleanup per state access. */
+   private final boolean runCleanupForEveryRecord;
 
 Review comment:
   So should this maybe be called `runCleanupOnStateAccess`?


[GitHub] StefanRRichter commented on a change in pull request #7188: [FLINK-10473][State TTL] TTL state incremental cleanup for heap backend

2019-01-15 Thread GitBox
URL: https://github.com/apache/flink/pull/7188#discussion_r247837166
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java
 ##
 @@ -269,5 +300,37 @@ public StateTtlConfig build() {
public boolean inFullSnapshot() {
return strategies.containsKey(Strategies.FULL_STATE_SCAN_SNAPSHOT);
}
+
+   public IncrementalCleanupStrategy getIncrementalCleanupStrategy() {
+   return (IncrementalCleanupStrategy) strategies.get(Strategies.INCREMENTAL_CLEANUP);
+   }
+   }
+
+   /** Configuration of cleanup strategy while taking the full snapshot. */
+   public static class IncrementalCleanupStrategy implements CleanupStrategies.CleanupStrategy {
+   private static final long serialVersionUID = 3109278696501988780L;
+
+   /** Max number of pulled from queue keys for clean up upon state touch for any key. */
 
 Review comment:
   I think there is a grammar problem in the comment: maximum of what? So maybe "max. number of keys pulled from..."?


[GitHub] StefanRRichter commented on a change in pull request #7188: [FLINK-10473][State TTL] TTL state incremental cleanup for heap backend

2019-01-10 Thread GitBox
URL: https://github.com/apache/flink/pull/7188#discussion_r246361402
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java
 ##
 @@ -269,5 +300,34 @@ public StateTtlConfig build() {
public boolean inFullSnapshot() {
return strategies.containsKey(Strategies.FULL_STATE_SCAN_SNAPSHOT);
}
+
+   public IncrementalCleanupStrategy getIncrementalCleanupStrategy() {
+   return (IncrementalCleanupStrategy) strategies.get(Strategies.INCREMENTAL_CLEANUP);
+   }
+   }
+
+   /** Configuration of cleanup strategy while taking the full snapshot. */
+   public static class IncrementalCleanupStrategy implements CleanupStrategies.CleanupStrategy {
+   private static final long serialVersionUID = 3109278696501988780L;
+
+   private final int cleanupSize;
 
 Review comment:
   I would suggest adding a comment about the meaning of the fields here.
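A sketch of what documented fields might look like, assuming a simplified stand-in class (`IncrementalCleanupStrategySketch` is hypothetical). The field names follow the diff and the doc wording follows the PR's own comments; the non-negative check mirrors the `@Nonnegative` annotation used in `TtlIncrementalCleanup` but is an assumption here.

```java
import java.io.Serializable;

// Hypothetical, simplified stand-in for the strategy class under review.
final class IncrementalCleanupStrategySketch implements Serializable {
    private static final long serialVersionUID = 1L;

    /** Max. number of keys pulled from the queue for cleanup upon each state access. */
    private final int cleanupSize;

    /** Whether to run incremental cleanup per state access. */
    private final boolean runCleanupForEveryRecord;

    IncrementalCleanupStrategySketch(int cleanupSize, boolean runCleanupForEveryRecord) {
        if (cleanupSize < 0) {
            throw new IllegalArgumentException("cleanupSize must be non-negative, was " + cleanupSize);
        }
        this.cleanupSize = cleanupSize;
        this.runCleanupForEveryRecord = runCleanupForEveryRecord;
    }

    int getCleanupSize() {
        return cleanupSize;
    }

    boolean runCleanupForEveryRecord() {
        return runCleanupForEveryRecord;
    }
}
```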


[GitHub] StefanRRichter commented on a change in pull request #7188: [FLINK-10473][State TTL] TTL state incremental cleanup for heap backend

2019-01-10 Thread GitBox
URL: https://github.com/apache/flink/pull/7188#discussion_r246774009
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlIncrementalCleanup.java
 ##
 @@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+
+import java.util.ConcurrentModificationException;
+
+/**
+ * Incremental cleanup of state with TTL.
+ *
+ * @param  type of state key
+ * @param  type of state namespace
+ */
+class TtlIncrementalCleanup {
+   private static final Logger LOG = LoggerFactory.getLogger(TtlIncrementalCleanup.class);
+
+   @Nonnegative
+   private final int cleanupSize;
+
+   @Nonnull
+   private final KeyedStateBackend keyContext;
+
+   private AbstractTtlState ttlState;
+   private CloseableIterator> keyNamespaceIterator;
+   private boolean suppressCallback;
+
+   /**
+* TtlIncrementalCleanup constructor.
+*
+* @param cleanupSize max number of queued keys to incrementally cleanup upon state access
+* @param keyContext state backend to switch keys for cleanup
+*/
+   TtlIncrementalCleanup(@Nonnegative int cleanupSize, @Nonnull KeyedStateBackend keyContext) {
+   this.cleanupSize = cleanupSize;
+   this.keyContext = keyContext;
+   this.suppressCallback = false;
+   }
+
+   void stateAccessed() {
+   // key is changed during cleanup, but this should not be called recursively in this case
+   if (suppressCallback) {
+   return;
+   }
+   suppressCallback = true;
+   initIteratorIfNot();
+   K currentKey = keyContext.getCurrentKey();
+   N currentNamespace = ttlState.getCurrentNamespace();
+   try {
+   runCleanup();
+   } catch (ConcurrentModificationException e) {
 
 Review comment:
   Do we want to catch this exception? I think the iterator has to tolerate concurrent modifications, or else this will basically happen all the time, right?
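The JDK collections offer a self-contained analogy for this concern (they are not Flink's state table): a fail-fast `HashMap` iterator throws `ConcurrentModificationException`, on a best-effort basis per its Javadoc, once the map is structurally modified between two `next()` calls, which is exactly the interleaving of record processing and incremental cleanup. A weakly consistent `ConcurrentHashMap` iterator does not. `CmeDemo` is a hypothetical helper.

```java
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.Map;

final class CmeDemo {
    // Takes one step with an iterator over the map's keys, writes a new key (as
    // record processing would between two cleanup steps), then keeps iterating.
    // Returns true if the iterator reported a ConcurrentModificationException.
    static boolean throwsAfterInterleavedWrite(Map<Integer, String> map) {
        Iterator<Integer> it = map.keySet().iterator();
        it.next();
        map.put(1000, "written by record processing"); // structural modification
        try {
            while (it.hasNext()) {
                it.next();
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }
}
```

Catching the exception after the fact would therefore just abort most cleanup passes; an iterator designed to tolerate such writes avoids the problem at the source.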


[GitHub] StefanRRichter commented on a change in pull request #7188: [FLINK-10473][State TTL] TTL state incremental cleanup for heap backend

2019-01-10 Thread GitBox
URL: https://github.com/apache/flink/pull/7188#discussion_r246775496
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
 ##
 @@ -175,9 +173,41 @@ private IS createReducingState() throws Exception {
ttlInitAcc,
new TtlFoldFunction<>(foldingStateDescriptor.getFoldFunction(), ttlConfig, timeProvider, initAcc),
new TtlSerializer<>(stateDesc.getSerializer()));
-   return (IS) new TtlFoldingState<>(
-   originalStateFactory.createInternalState(namespaceSerializer, ttlDescriptor, getSnapshotTransformFactory()),
-   ttlConfig, timeProvider, stateDesc.getSerializer());
+   return (IS) new TtlFoldingState<>(createTtlStateContext(ttlDescriptor));
+   }
+
+   @SuppressWarnings("unchecked")
+   private  TTLCON createTtlStateContext(TTLDES ttlDescriptor) throws Exception {
 
 Review comment:
   Why are `TTLCON` and `TTLDES` generic types? It seems a bit odd.


[GitHub] StefanRRichter commented on a change in pull request #7188: [FLINK-10473][State TTL] TTL state incremental cleanup for heap backend

2019-01-10 Thread GitBox
URL: https://github.com/apache/flink/pull/7188#discussion_r246428260
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
 ##
 @@ -106,4 +106,23 @@
 
@Override
void dispose();
+
+   /** State backend will call {@link KeyChangeListener#keyChanged} when key context changes if supported. */
+   default void registerKeyChangeListener(KeyChangeListener listener) {
 
 Review comment:
   Again, I would suggest not using `default` methods.


[GitHub] StefanRRichter commented on a change in pull request #7188: [FLINK-10473][State TTL] TTL state incremental cleanup for heap backend

2019-01-10 Thread GitBox
URL: https://github.com/apache/flink/pull/7188#discussion_r246419067
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java
 ##
 @@ -1103,4 +,82 @@ public void remove() {
throw new UnsupportedOperationException("Read-only iterator");
}
}
+
+   @Override
+   public CloseableIterator> getNamespaceKeyIterator() {
+   return CloseableIterator.adapterForIterator(new NamespaceKeyIterator());
+   }
+
+   /**
+* Iterator over (namespace, key) pairs in a {@link CopyOnWriteStateTable}.
+*
+* This iterator is lazy and has relaxed consistency.
+* It outputs (namespace, key) pairs which existed at some point in time and there can be duplicates.
+* It tolerates concurrent modifications and
+* supports removing of underlying next key state if it currently exists otherwise nothing is done.
+*/
+   class NamespaceKeyIterator implements Iterator> {
 
 Review comment:
   I wonder if we could base this on the implementation of the existing entry iterator, maybe by extracting a common superclass. Is there anything fundamental speaking against that? It looks like they could share a lot of code if we move from `Tuple2` to `StateEntry`.
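A sketch of the common-superclass idea, assuming a simplified table with at most one entry per bucket (Flink's real table chains entries); `EntrySketch` and the iterator classes are hypothetical. The superclass owns the traversal, and each subclass only decides what to emit: the stored entry object itself, which needs no allocation, or a derived namespace/key view.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;

// Hypothetical stand-in for a state table entry.
final class EntrySketch {
    final String key;
    final String namespace;
    final Object state;

    EntrySketch(String key, String namespace, Object state) {
        this.key = key;
        this.namespace = namespace;
        this.state = state;
    }
}

// Common superclass: owns the traversal over buckets with empty (null) slots.
abstract class BucketIteratorSketch<T> implements Iterator<T> {
    private final EntrySketch[] buckets;
    private int position;

    BucketIteratorSketch(EntrySketch[] buckets) {
        this.buckets = buckets;
        skipEmptyBuckets();
    }

    private void skipEmptyBuckets() {
        while (position < buckets.length && buckets[position] == null) {
            position++;
        }
    }

    @Override
    public boolean hasNext() {
        return position < buckets.length;
    }

    @Override
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        EntrySketch entry = buckets[position++];
        skipEmptyBuckets();
        return project(entry);
    }

    // The only part that differs between the concrete iterators.
    abstract T project(EntrySketch entry);
}

// Emits the stored entry objects themselves: no per-element allocation.
final class EntryIteratorSketch extends BucketIteratorSketch<EntrySketch> {
    EntryIteratorSketch(EntrySketch[] buckets) {
        super(buckets);
    }

    @Override
    EntrySketch project(EntrySketch entry) {
        return entry;
    }
}

// Emits a derived "namespace/key" view, allocating a new object per element.
final class NamespaceKeyIteratorSketch extends BucketIteratorSketch<String> {
    NamespaceKeyIteratorSketch(EntrySketch[] buckets) {
        super(buckets);
    }

    @Override
    String project(EntrySketch entry) {
        return entry.namespace + "/" + entry.key;
    }
}
```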


[GitHub] StefanRRichter commented on a change in pull request #7188: [FLINK-10473][State TTL] TTL state incremental cleanup for heap backend

2019-01-10 Thread GitBox
URL: https://github.com/apache/flink/pull/7188#discussion_r246774607
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlIncrementalCleanup.java
 ##
 @@ -0,0 +1,136 @@
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+
+import java.util.ConcurrentModificationException;
+
+/**
+ * Incremental cleanup of state with TTL.
+ *
+ * @param  type of state key
+ * @param  type of state namespace
+ */
+class TtlIncrementalCleanup {
+   private static final Logger LOG = LoggerFactory.getLogger(TtlIncrementalCleanup.class);
+
+   @Nonnegative
+   private final int cleanupSize;
+
+   @Nonnull
+   private final KeyedStateBackend keyContext;
+
+   private AbstractTtlState ttlState;
+   private CloseableIterator> keyNamespaceIterator;
+   private boolean suppressCallback;
 
 Review comment:
   Please add some comments; I would suggest at least on `suppressCallback` and `cleanupSize`.
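A sketch of the documentation being asked for, on a hypothetical `ReentrantCleanupSketch` that models the guard: cleanup reads other keys' state, which counts as a state access and re-enters `stateAccessed()`. The snippet under review is cut off before the flag is reset, so the `try`/`finally` reset here is an assumption.

```java
// Hypothetical sketch of a reentrancy guard like suppressCallback; not the PR's code.
final class ReentrantCleanupSketch {
    /** Max. number of queued keys checked per state access. */
    private final int cleanupSize;

    /**
     * True while a cleanup pass runs. The pass itself switches keys and reads state,
     * which fires stateAccessed() again; this flag turns those nested calls into no-ops.
     */
    private boolean suppressCallback;

    private int cleanupPasses;
    private int keysChecked;

    ReentrantCleanupSketch(int cleanupSize) {
        this.cleanupSize = cleanupSize;
    }

    void stateAccessed() {
        if (suppressCallback) {
            return; // nested call caused by our own cleanup
        }
        suppressCallback = true;
        try {
            cleanupPasses++;
            for (int i = 0; i < cleanupSize; i++) {
                checkNextQueuedKey();
            }
        } finally {
            // Reset even if cleanup throws, so later accesses can clean up again.
            suppressCallback = false;
        }
    }

    // Stand-in for reading the next queued key's state, which counts as a state access.
    private void checkNextQueuedKey() {
        keysChecked++;
        stateAccessed();
    }

    int cleanupPasses() {
        return cleanupPasses;
    }

    int keysChecked() {
        return keysChecked;
    }
}
```

Without the flag, `checkNextQueuedKey()` would recurse until the stack overflows.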


[GitHub] StefanRRichter commented on a change in pull request #7188: [FLINK-10473][State TTL] TTL state incremental cleanup for heap backend

2019-01-10 Thread GitBox
URL: https://github.com/apache/flink/pull/7188#discussion_r246785880
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricMonitor.java
 ##
 @@ -125,7 +125,7 @@ public void close() {
private boolean closed;
 
private RocksDBNativeMetricView(
-   @Nonnull ColumnFamilyHandle handle,
+   ColumnFamilyHandle handle,
 
 Review comment:
   Same here.


[GitHub] StefanRRichter commented on a change in pull request #7188: [FLINK-10473][State TTL] TTL state incremental cleanup for heap backend

2019-01-10 Thread GitBox
URL: https://github.com/apache/flink/pull/7188#discussion_r246427849
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
 ##
 @@ -147,10 +153,25 @@ public void dispose() {
 */
@Override
public void setCurrentKey(K newKey) {
+   notifyKeyChanged(newKey);
this.currentKey = newKey;
this.currentKeyGroup = KeyGroupRangeAssignment.assignToKeyGroup(newKey, numberOfKeyGroups);
}
 
+   private void notifyKeyChanged(K newKey) {
 
 Review comment:
   Strictly speaking, this is more like `notifyKeySet()`, because the key might still be the same object after the call.
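A small sketch of the naming point, using a hypothetical `KeyContextSketch` rather than `AbstractKeyedStateBackend`: listeners are notified on every `setCurrentKey` call, including when the same key is set twice in a row.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, simplified key context; not Flink's AbstractKeyedStateBackend.
final class KeyContextSketch<K> {
    private final List<Consumer<K>> keySetListeners = new ArrayList<>();
    private K currentKey;

    void registerKeySetListener(Consumer<K> listener) {
        keySetListeners.add(listener);
    }

    void setCurrentKey(K newKey) {
        // Fires on every call, even when newKey equals the current key,
        // so "key set" describes the event more accurately than "key changed".
        notifyKeySet(newKey);
        this.currentKey = newKey;
    }

    private void notifyKeySet(K newKey) {
        for (Consumer<K> listener : keySetListeners) {
            listener.accept(newKey);
        }
    }

    K getCurrentKey() {
        return currentKey;
    }
}
```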


[GitHub] StefanRRichter commented on a change in pull request #7188: [FLINK-10473][State TTL] TTL state incremental cleanup for heap backend

2019-01-10 Thread GitBox
URL: https://github.com/apache/flink/pull/7188#discussion_r246379055
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
 ##
 @@ -166,6 +168,8 @@ public boolean isEmpty() {
 
public abstract Stream getKeys(N namespace);
 
+   public abstract CloseableIterator> getNamespaceKeyIterator();
 
 Review comment:
   I would suggest deciding on either `Stream` or `Iterator` here; the methods look very similar. They also have very different semantics: `getKeys` fails on concurrent modifications, whereas `getNamespaceKeyIterator` tolerates and embraces them.
   
   Furthermore, I would suggest using `StateEntry` over `Tuple2`, so that we can stream objects directly from the copy-on-write state table (the old state table implementation would still have to return new objects, but that seems ok).
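If the concern is having two near-identical accessors, note that either form can be adapted to the other with standard JDK calls (`StreamSupport.stream` over `Spliterators.spliteratorUnknownSize`, and `Stream.iterator()`), so settling on one representation costs little; the differing concurrency semantics still need to be documented separately. `IteratorStreamSketch` is a hypothetical helper.

```java
import java.util.Iterator;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

final class IteratorStreamSketch {
    // Wraps a lazy Iterator as a sequential Stream without materializing it.
    static <T> Stream<T> asStream(Iterator<T> iterator) {
        return StreamSupport.stream(
            Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED),
            false);
    }

    // The opposite direction is built in: BaseStream#iterator().
    static <T> Iterator<T> asIterator(Stream<T> stream) {
        return stream.iterator();
    }
}
```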


[GitHub] StefanRRichter commented on a change in pull request #7188: [FLINK-10473][State TTL] TTL state incremental cleanup for heap backend

2019-01-10 Thread GitBox
URL: https://github.com/apache/flink/pull/7188#discussion_r246422514
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalKvState.java
 ##
 @@ -105,4 +114,8 @@
final TypeSerializer safeKeySerializer,
final TypeSerializer safeNamespaceSerializer,
final TypeSerializer safeValueSerializer) throws Exception;
+
+   default CloseableIterator> getNamespaceKeyIterator() {
 
 Review comment:
   I would suggest removing the `default` and providing implementations in `AbstractHeapState` and `AbstractRocksDBState`; the second one could throw `UnsupportedOperationException` for now.
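A minimal sketch of the suggestion with hypothetical, simplified types (`KvStateSketch`, `HeapStateSketch`, `RocksDbStateSketch`) standing in for `InternalKvState`, `AbstractHeapState` and `AbstractRocksDBState`: without a `default` method each implementation has to decide explicitly, and the RocksDB-like one throws `UnsupportedOperationException`.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for InternalKvState: no default method,
// so every backend must provide an explicit implementation.
interface KvStateSketch<K> {
    Iterator<K> getKeyIterator();
}

// Heap-like backend: can enumerate its keys directly.
class HeapStateSketch<K> implements KvStateSketch<K> {
    private final List<K> keys = new ArrayList<>();

    void add(K key) {
        keys.add(key);
    }

    @Override
    public Iterator<K> getKeyIterator() {
        return keys.iterator();
    }
}

// RocksDB-like backend: explicitly unsupported for now,
// instead of silently inheriting a default.
class RocksDbStateSketch<K> implements KvStateSketch<K> {
    @Override
    public Iterator<K> getKeyIterator() {
        throw new UnsupportedOperationException("Key iteration is not supported yet");
    }
}
```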


[GitHub] StefanRRichter commented on a change in pull request #7188: [FLINK-10473][State TTL] TTL state incremental cleanup for heap backend

2019-01-10 Thread GitBox
URL: https://github.com/apache/flink/pull/7188#discussion_r246771871
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlState.java
 ##
 @@ -38,10 +37,12 @@
extends AbstractTtlDecorator
implements InternalKvState {
private final TypeSerializer valueSerializer;
+   final Runnable accessCallback;
 
 Review comment:
   I suggest adding a comment, because it is not obvious to a reader of this class what this is good for.
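A sketch of the kind of comment being asked for, on a hypothetical decorator (`CallbackStateSketch`). That the callback exists to trigger incremental cleanup is inferred from `clear()` calling `accessCallback.run()` and from `TtlIncrementalCleanup.stateAccessed()`; the diff does not state it.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical TTL-state decorator; names and behaviour are illustrative only.
final class CallbackStateSketch {
    private final Map<String, String> original = new HashMap<>();

    /**
     * Invoked after every access to this state (read, write or clear). Assumed here to be
     * the hook that triggers incremental cleanup of other, possibly expired, entries.
     */
    final Runnable accessCallback;

    CallbackStateSketch(Runnable accessCallback) {
        this.accessCallback = accessCallback;
    }

    String get(String key) {
        String value = original.get(key);
        accessCallback.run();
        return value;
    }

    void put(String key, String value) {
        original.put(key, value);
        accessCallback.run();
    }

    void clear(String key) {
        original.remove(key);
        accessCallback.run();
    }
}
```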


[GitHub] StefanRRichter commented on a change in pull request #7188: [FLINK-10473][State TTL] TTL state incremental cleanup for heap backend

2019-01-10 Thread GitBox
URL: https://github.com/apache/flink/pull/7188#discussion_r246378044
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlIncrementalCleanup.java
 ##
 @@ -0,0 +1,136 @@
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+
+import java.util.ConcurrentModificationException;
+
+/**
+ * Incremental cleanup of state with TTL.
+ *
+ * @param  type of state key
+ * @param  type of state namespace
+ */
+class TtlIncrementalCleanup {
+   private static final Logger LOG = LoggerFactory.getLogger(TtlIncrementalCleanup.class);
+
+   @Nonnegative
+   private final int cleanupSize;
+
+   @Nonnull
+   private final KeyedStateBackend keyContext;
+
+   private AbstractTtlState ttlState;
+   private CloseableIterator> keyNamespaceIterator;
 
 Review comment:
  It seems a bit strange that we need this iterator to enumerate and delete 
entries, and additionally the `AbstractTtlState` to check each entry for 
expiration. Besides that, this design would force us to go through a whole 
ser/de cycle if we also want to offer this cleanup strategy for RocksDB.
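One way to address this, sketched under stated assumptions: if each stored entry carries its own last-access timestamp, the iterator can decide expiration by itself, so no second lookup through a state object is needed (and a serialized backend would not pay an extra ser/de round trip per entry). The class below is hypothetical and dependency-free; it is not Flink's `TtlIncrementalCleanup`, and `cleanupSize` here simply bounds how many entries one step inspects.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical sketch of incremental TTL cleanup (not Flink's TtlIncrementalCleanup).
 * Each entry stores its last-access timestamp, so expiration is decided while
 * iterating, without a second lookup through a separate state object.
 */
class IncrementalTtlCleanup<K, V> {

    /** A user value together with the time it was last written. */
    static final class TtlValue<T> {
        final T value;
        final long lastAccessTimestamp;

        TtlValue(T value, long lastAccessTimestamp) {
            this.value = value;
            this.lastAccessTimestamp = lastAccessTimestamp;
        }
    }

    private final Map<K, TtlValue<V>> entries = new LinkedHashMap<>();
    private final long ttlMillis;
    private final int cleanupSize; // max entries inspected per cleanup step
    private Iterator<Map.Entry<K, TtlValue<V>>> cursor; // survives between steps

    IncrementalTtlCleanup(long ttlMillis, int cleanupSize) {
        this.ttlMillis = ttlMillis;
        this.cleanupSize = cleanupSize;
    }

    void put(K key, V value, long now) {
        entries.put(key, new TtlValue<>(value, now));
        // Inserting a new key invalidates a live (fail-fast) iterator, so this
        // simplified sketch just restarts the scan on the next step.
        cursor = null;
    }

    /** Inspects at most cleanupSize entries, removes expired ones, returns the number removed. */
    int cleanupStep(long now) {
        if (cursor == null || !cursor.hasNext()) {
            cursor = entries.entrySet().iterator(); // (re)start from the first entry
        }
        int removed = 0;
        for (int i = 0; i < cleanupSize && cursor.hasNext(); i++) {
            TtlValue<V> entry = cursor.next().getValue();
            if (entry.lastAccessTimestamp + ttlMillis <= now) {
                cursor.remove(); // removing via the iterator keeps it valid
                removed++;
            }
        }
        return removed;
    }

    int size() {
        return entries.size();
    }
}
```

Removing through `Iterator.remove()` keeps the live cursor valid between steps; adding a new key would not, which is why the sketch restarts the scan after `put`.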




[GitHub] StefanRRichter commented on a change in pull request #7188: [FLINK-10473][State TTL] TTL state incremental cleanup for heap backend

2019-01-10 Thread GitBox
StefanRRichter commented on a change in pull request #7188: [FLINK-10473][State 
TTL] TTL state incremental cleanup for heap backend
URL: https://github.com/apache/flink/pull/7188#discussion_r246785747
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricMonitor.java
 ##
 @@ -54,7 +54,7 @@
private RocksDB rocksDB;
 
RocksDBNativeMetricMonitor(
-   @Nonnull RocksDB rocksDB,
+   RocksDB rocksDB,
 
 Review comment:
  Why is this change required? It looks problematic to me.
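The comment does not say why the change is problematic. One plausible reading (an assumption) is that dropping `@Nonnull` weakens the documented contract of the constructor. If `null` must remain illegal, a fail-fast check keeps that contract explicit even without the annotation. `MetricMonitorSketch` is a hypothetical stand-in, not `RocksDBNativeMetricMonitor`.

```java
import java.util.Objects;

/** Hypothetical stand-in for a class wrapping a native handle (not RocksDBNativeMetricMonitor). */
class MetricMonitorSketch {
    private final Object nativeHandle;

    MetricMonitorSketch(Object nativeHandle) {
        // Fail fast at construction instead of hitting a NullPointerException
        // later, at the first metric lookup.
        this.nativeHandle = Objects.requireNonNull(nativeHandle, "nativeHandle must not be null");
    }

    Object handle() {
        return nativeHandle;
    }
}
```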




[GitHub] StefanRRichter commented on a change in pull request #7188: [FLINK-10473][State TTL] TTL state incremental cleanup for heap backend

2019-01-10 Thread GitBox
StefanRRichter commented on a change in pull request #7188: [FLINK-10473][State 
TTL] TTL state incremental cleanup for heap backend
URL: https://github.com/apache/flink/pull/7188#discussion_r246772202
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateContext.java
 ##
 @@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.state.StateTtlConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+class TtlStateContext {
+   /** Wrapped original state handler. */
+   final T original;
+   final StateTtlConfig config;
+   final TtlTimeProvider timeProvider;
+   final TypeSerializer valueSerializer;
+   final Runnable accessCallback;
 
 Review comment:
   Please add some comments on the fields, in particular this one.
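As an illustration of the kind of field comments requested, here is a dependency-free analogue. `TtlStateContextSketch` is hypothetical: the Flink types (`StateTtlConfig`, `TtlTimeProvider`, `TypeSerializer`) are replaced by simple stand-ins so that the sketch compiles on its own, the value serializer is omitted, and the described purpose of `accessCallback` is an assumption.

```java
import java.util.function.LongSupplier;

/**
 * Hypothetical, dependency-free analogue of TtlStateContext, used only to show
 * the kind of field comments requested. Flink types are replaced by stand-ins.
 */
final class TtlStateContextSketch<T> {
    /** Wrapped original state handler. */
    final T original;

    /** Time-to-live in milliseconds (stand-in for the settings in StateTtlConfig). */
    final long ttlMillis;

    /** Supplies the current time used to stamp and expire entries (stand-in for TtlTimeProvider). */
    final LongSupplier timeProvider;

    /**
     * Called whenever the state is accessed for read or write. Assumed purpose:
     * give an incremental cleanup strategy a hook to run a bounded step per access.
     */
    final Runnable accessCallback;

    TtlStateContextSketch(T original, long ttlMillis, LongSupplier timeProvider, Runnable accessCallback) {
        this.original = original;
        this.ttlMillis = ttlMillis;
        this.timeProvider = timeProvider;
        this.accessCallback = accessCallback;
    }

    /** True if a value last written at lastAccessTimestamp has expired by now. */
    boolean isExpired(long lastAccessTimestamp) {
        return lastAccessTimestamp + ttlMillis <= timeProvider.getAsLong();
    }
}
```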

