Repository: flink
Updated Branches:
  refs/heads/master 5f08e5359 -> 14c1941d8


[FLINK-6007] Allow key removal from within the watermark callback.

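When a key is unregistered from the InternalWatermarkCallbackService,
it is no longer removed immediately. Instead, the key is put into a
separate "to delete" set, and the actual removal is deferred until
the iteration over all registered keys has finished (it is applied at
the start of the next watermark callback invocation). This avoids a
concurrent modification exception when a key is removed from within
the callback. To avoid checkpointing the deletion set, the pending
deletions are also applied when a snapshot is taken.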


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/14c1941d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/14c1941d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/14c1941d

Branch: refs/heads/master
Commit: 14c1941d8eaa583eb8f7eeb5478e605850c0d355
Parents: 5f08e53
Author: kl0u <kklou...@gmail.com>
Authored: Wed Mar 8 20:18:18 2017 +0100
Committer: kl0u <kklou...@gmail.com>
Committed: Mon Mar 13 17:29:03 2017 +0100

----------------------------------------------------------------------
 .../api/operators/AbstractStreamOperator.java   |   6 +
 .../operators/InternalTimeServiceManager.java   |   5 +
 .../InternalWatermarkCallbackService.java       | 112 +++++++++++++-----
 .../operators/AbstractStreamOperatorTest.java   | 117 +++++++++++++++++++
 .../util/AbstractStreamOperatorTestHarness.java |   9 ++
 5 files changed, 222 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/14c1941d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 6e6b147..ef23be9 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -957,4 +957,10 @@ public abstract class AbstractStreamOperator<OUT>
                return timeServiceManager == null ? 0 :
                        timeServiceManager.numEventTimeTimers();
        }
+
+       @VisibleForTesting
+       public int numKeysForWatermarkCallback() {
+               return timeServiceManager == null ? 0 :
+                       timeServiceManager.numKeysForWatermarkCallback();
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/14c1941d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java
index 71ffbd2..0b60232 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java
@@ -188,4 +188,9 @@ class InternalTimeServiceManager<K, N> {
                }
                return count;
        }
+
+       @VisibleForTesting
+       public int numKeysForWatermarkCallback() {
+               return watermarkCallbackService.numKeysForWatermarkCallback();
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/14c1941d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalWatermarkCallbackService.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalWatermarkCallbackService.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalWatermarkCallbackService.java
index a4263e4..9a43853 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalWatermarkCallbackService.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalWatermarkCallbackService.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
@@ -38,8 +39,8 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  * The watermark callback service allows to register a {@link 
OnWatermarkCallback OnWatermarkCallback}
  * and multiple keys, for which the callback will be invoked every time a new 
{@link Watermark} is received
  * (after the registration of the key).
- * <p>
- * <b>NOTE: </b> This service is only available to <b>keyed</b> operators.
+ *
+ * <p><b>NOTE: </b> This service is only available to <b>keyed</b> operators.
  *
  *  @param <K> The type of key returned by the {@code KeySelector}.
  */
@@ -58,7 +59,17 @@ public class InternalWatermarkCallbackService<K> {
         * An array of sets of keys keeping the registered keys split
         * by the key-group they belong to. Each key-group has one set.
         */
-       private final Set<K>[] keysByKeygroup;
+       private final Set<K>[] registeredKeysByKeyGroup;
+
+       /**
+        * An array of sets of keys keeping the keys "to delete" split
+        * by the key-group they belong to. Each key-group has one set.
+        *
+        * <p>This is used to avoid potential concurrent modification
+        * exception when deleting keys from inside the
+        * {@link #invokeOnWatermarkCallback(Watermark)}.
+        */
+       private final Set<K>[] deletedKeysByKeyGroup;
 
        /** A serializer for the registered keys. */
        private TypeSerializer<K> keySerializer;
@@ -84,7 +95,8 @@ public class InternalWatermarkCallbackService<K> {
 
                // the list of ids of the key-groups this task is responsible 
for
                int localKeyGroups = 
this.localKeyGroupRange.getNumberOfKeyGroups();
-               this.keysByKeygroup = new Set[localKeyGroups];
+               this.registeredKeysByKeyGroup = new Set[localKeyGroups];
+               this.deletedKeysByKeyGroup = new Set[localKeyGroups];
        }
 
        /**
@@ -110,7 +122,7 @@ public class InternalWatermarkCallbackService<K> {
         * @param key The key to be registered.
         */
        public boolean registerKeyForWatermarkCallback(K key) {
-               return getKeySetForKeyGroup(key).add(key);
+               return getRegisteredKeysForKeyGroup(key).add(key);
        }
 
        /**
@@ -119,13 +131,7 @@ public class InternalWatermarkCallbackService<K> {
         * @param key The key to be unregistered.
         */
        public boolean unregisterKeyFromWatermarkCallback(K key) {
-               Set<K> keys = getKeySetForKeyGroup(key);
-               boolean res = keys.remove(key);
-
-               if (keys.isEmpty()) {
-                       removeKeySetForKey(key);
-               }
-               return res;
+               return getDeletedKeysForKeyGroup(key).add(key);
        }
 
        /**
@@ -134,8 +140,11 @@ public class InternalWatermarkCallbackService<K> {
         * @param watermark The watermark that triggered the invocation.
         */
        public void invokeOnWatermarkCallback(Watermark watermark) throws 
IOException {
+               // clean up any keys registered for deletion before calling the 
callback
+               cleanupRegisteredKeys();
+
                if (callback != null) {
-                       for (Set<K> keySet : keysByKeygroup) {
+                       for (Set<K> keySet : registeredKeysByKeyGroup) {
                                if (keySet != null) {
                                        for (K key : keySet) {
                                                keyContext.setCurrentKey(key);
@@ -147,15 +156,38 @@ public class InternalWatermarkCallbackService<K> {
        }
 
        /**
+        * Does the actual deletion of any keys registered for deletion using 
the
+        * {@link #unregisterKeyFromWatermarkCallback(Object)}.
+        */
+       private void cleanupRegisteredKeys() {
+               for (int keyGroupIdx = 0; keyGroupIdx < 
registeredKeysByKeyGroup.length; keyGroupIdx++) {
+
+                       Set<K> deletedKeys = deletedKeysByKeyGroup[keyGroupIdx];
+                       if (deletedKeys != null) {
+
+                               Set<K> registeredKeys = 
registeredKeysByKeyGroup[keyGroupIdx];
+                               if (registeredKeys != null) {
+
+                                       registeredKeys.removeAll(deletedKeys);
+                                       if (registeredKeys.isEmpty()) {
+                                               
registeredKeysByKeyGroup[keyGroupIdx] = null;
+                                       }
+                               }
+                               deletedKeysByKeyGroup[keyGroupIdx] = null;
+                       }
+               }
+       }
+
+       /**
         * Retrieve the set of keys for the key-group this key belongs to.
         *
         * @param key the key whose key-group we are searching.
         * @return the set of registered keys for the key-group.
         */
-       private Set<K> getKeySetForKeyGroup(K key) {
+       private Set<K> getRegisteredKeysForKeyGroup(K key) {
                checkArgument(localKeyGroupRange != null, "The operator has not 
been initialized.");
                int keyGroupIdx = KeyGroupRangeAssignment.assignToKeyGroup(key, 
totalKeyGroups);
-               return getKeySetForKeyGroup(keyGroupIdx);
+               return getRegisteredKeysForKeyGroup(keyGroupIdx);
        }
 
        /**
@@ -164,27 +196,36 @@ public class InternalWatermarkCallbackService<K> {
         * @param keyGroupIdx the index of the key group we are interested in.
         * @return the set of keys for the key-group.
         */
-       private Set<K> getKeySetForKeyGroup(int keyGroupIdx) {
+       private Set<K> getRegisteredKeysForKeyGroup(int keyGroupIdx) {
                int localIdx = getIndexForKeyGroup(keyGroupIdx);
-               Set<K> keys = keysByKeygroup[localIdx];
+               Set<K> keys = registeredKeysByKeyGroup[localIdx];
                if (keys == null) {
                        keys = new HashSet<>();
-                       keysByKeygroup[localIdx] = keys;
+                       registeredKeysByKeyGroup[localIdx] = keys;
                }
                return keys;
        }
 
-       private void removeKeySetForKey(K key) {
+       private Set<K> getDeletedKeysForKeyGroup(K key) {
                checkArgument(localKeyGroupRange != null, "The operator has not 
been initialized.");
                int keyGroupIdx = KeyGroupRangeAssignment.assignToKeyGroup(key, 
totalKeyGroups);
-               int localKeyGroupIdx = getIndexForKeyGroup(keyGroupIdx);
-               keysByKeygroup[localKeyGroupIdx] = null;
+               return getDeletedKeysForKeyGroup(keyGroupIdx);
+       }
+
+       private Set<K> getDeletedKeysForKeyGroup(int keyGroupIdx) {
+               int localIdx = getIndexForKeyGroup(keyGroupIdx);
+               Set<K> keys = deletedKeysByKeyGroup[localIdx];
+               if (keys == null) {
+                       keys = new HashSet<>();
+                       deletedKeysByKeyGroup[localIdx] = keys;
+               }
+               return keys;
        }
 
        /**
         * Computes the index of the requested key-group in the local 
datastructures.
-        * <li/>
-        * Currently we assume that each task is assigned a continuous range of 
key-groups,
+        *
+        * <p>Currently we assume that each task is assigned a continuous range 
of key-groups,
         * e.g. 1,2,3,4, and not 1,3,5. We leverage this to keep the different 
states
         * key-grouped in arrays instead of maps, where the offset for each 
key-group is
         * the key-group id (an int) minus the id of the first key-group in the 
local range.
@@ -199,7 +240,11 @@ public class InternalWatermarkCallbackService<K> {
        //////////////////                              Fault Tolerance Methods 
                        ///////////////////
 
        public void snapshotKeysForKeyGroup(DataOutputViewStreamWrapper stream, 
int keyGroupIdx) throws Exception {
-               Set<K> keySet = getKeySetForKeyGroup(keyGroupIdx);
+
+               // we cleanup also here to avoid checkpointing the deletion set
+               cleanupRegisteredKeys();
+
+               Set<K> keySet = getRegisteredKeysForKeyGroup(keyGroupIdx);
                if (keySet != null) {
                        stream.writeInt(keySet.size());
 
@@ -224,16 +269,29 @@ public class InternalWatermarkCallbackService<K> {
                        TypeSerializer<K> tmpKeyDeserializer = 
InstantiationUtil.deserializeObject(stream, userCodeClassLoader);
 
                        if (keySerializer != null && 
!keySerializer.equals(tmpKeyDeserializer)) {
-                               throw new IllegalArgumentException("Tried to 
restore timers " +
-                                       "for the same service with different 
serializers.");
+                               throw new IllegalArgumentException("Tried to 
restore keys " +
+                                       "for the watermark callback service 
with mismatching serializers.");
                        }
 
                        this.keySerializer = tmpKeyDeserializer;
 
-                       Set<K> keys = getKeySetForKeyGroup(keyGroupIdx);
+                       Set<K> keys = getRegisteredKeysForKeyGroup(keyGroupIdx);
                        for (int i = 0; i < numKeys; i++) {
                                keys.add(keySerializer.deserialize(stream));
                        }
                }
        }
+
+       //////////////////                              Testing Methods         
                ///////////////////
+
+       @VisibleForTesting
+       public int numKeysForWatermarkCallback() {
+               int count = 0;
+               for (Set<K> keyGroup: registeredKeysByKeyGroup) {
+                       if (keyGroup != null) {
+                               count += keyGroup.size();
+                       }
+               }
+               return count;
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/14c1941d/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
index 33def9e..eeee8dc 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
@@ -788,6 +788,78 @@ public class AbstractStreamOperatorTest {
                testHarness5.close();
        }
 
+       @Test
+       public void testWatermarkCallbackServiceKeyDeletion() throws Exception {
+               final int MAX_PARALLELISM = 10;
+
+               Tuple2<Integer, String> element1 = new Tuple2<>(7, "start");
+               Tuple2<Integer, String> element2 = new Tuple2<>(45, "start");
+               Tuple2<Integer, String> element3 = new Tuple2<>(90, "start");
+
+               TestOperatorWithDeletingKeyCallback op = new 
TestOperatorWithDeletingKeyCallback(45);
+
+               KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, 
String>, Integer> testHarness1 =
+                       new KeyedOneInputStreamOperatorTestHarness<>(
+                               op,
+                               new TestKeySelector(),
+                               BasicTypeInfo.INT_TYPE_INFO,
+                               MAX_PARALLELISM,
+                               1,
+                               0);
+               testHarness1.open();
+
+               testHarness1.processElement(new StreamRecord<>(element1));
+               testHarness1.processElement(new StreamRecord<>(element2));
+
+               testHarness1.processWatermark(10L);
+
+               assertEquals(3L, testHarness1.getOutput().size());
+               verifyElement(testHarness1.getOutput().poll(), 7);
+               verifyElement(testHarness1.getOutput().poll(), 45);
+               verifyWatermark(testHarness1.getOutput().poll(), 10);
+
+               testHarness1.processElement(new StreamRecord<>(element3));
+               testHarness1.processWatermark(20L);
+
+               // because at the first watermark the operator removed key 45
+               assertEquals(3L, testHarness1.getOutput().size());
+               verifyElement(testHarness1.getOutput().poll(), 7);
+               verifyElement(testHarness1.getOutput().poll(), 90);
+               verifyWatermark(testHarness1.getOutput().poll(), 20);
+
+               testHarness1.processWatermark(25L);
+
+               verifyElement(testHarness1.getOutput().poll(), 7);
+               verifyElement(testHarness1.getOutput().poll(), 90);
+               verifyWatermark(testHarness1.getOutput().poll(), 25);
+
+               // unregister key and then fail
+               op.unregisterKey(90);
+
+               // take a snapshot with some elements in internal sorting queue
+               OperatorStateHandles snapshot = testHarness1.snapshot(0, 0);
+               testHarness1.close();
+
+               testHarness1 = new KeyedOneInputStreamOperatorTestHarness<>(
+                               new TestOperatorWithDeletingKeyCallback(45),
+                               new TestKeySelector(),
+                               BasicTypeInfo.INT_TYPE_INFO,
+                               MAX_PARALLELISM,
+                               1,
+                               0);
+               testHarness1.setup();
+               testHarness1.initializeState(snapshot);
+               testHarness1.open();
+
+               testHarness1.processWatermark(30L);
+
+               assertEquals(2L, testHarness1.getOutput().size());
+               verifyElement(testHarness1.getOutput().poll(), 7);
+               verifyWatermark(testHarness1.getOutput().poll(), 30);
+
+               testHarness1.close();
+       }
+
        private KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, 
String>, Integer> getTestHarness(
                        int maxParallelism, int noOfTasks, int taskIdx) throws 
Exception {
 
@@ -868,6 +940,51 @@ public class AbstractStreamOperatorTest {
                }
        }
 
+       private static class TestOperatorWithDeletingKeyCallback
+                       extends AbstractStreamOperator<Integer>
+                       implements OneInputStreamOperator<Tuple2<Integer, 
String>, Integer> {
+
+               private static final long serialVersionUID = 
9215057823264582305L;
+
+               private final int keyToDelete;
+
+               public TestOperatorWithDeletingKeyCallback(int keyToDelete) {
+                       this.keyToDelete = keyToDelete;
+               }
+
+               @Override
+               public void open() throws Exception {
+                       super.open();
+
+                       InternalWatermarkCallbackService<Integer> 
callbackService = getInternalWatermarkCallbackService();
+
+                       callbackService.setWatermarkCallback(new 
OnWatermarkCallback<Integer>() {
+
+                               @Override
+                               public void onWatermark(Integer integer, 
Watermark watermark) throws IOException {
+
+                                       // this is to simulate the case where 
we may have a concurrent modification
+                                       // exception as we iterate over the 
list of registered keys and we concurrently
+                                       // delete the key.
+
+                                       if (integer.equals(keyToDelete)) {
+                                               
getInternalWatermarkCallbackService().unregisterKeyFromWatermarkCallback(integer);
+                                       }
+                                       output.collect(new 
StreamRecord<>(integer));
+                               }
+                       }, IntSerializer.INSTANCE);
+               }
+
+               @Override
+               public void processElement(StreamRecord<Tuple2<Integer, 
String>> element) throws Exception {
+                       
getInternalWatermarkCallbackService().registerKeyForWatermarkCallback(element.getValue().f0);
+               }
+
+               public void unregisterKey(int key) {
+                       
getInternalWatermarkCallbackService().unregisterKeyFromWatermarkCallback(key);
+               }
+       }
+
        /**
         * Testing operator that can respond to commands by either 
setting/deleting state, emitting
         * state or setting timers.

http://git-wip-us.apache.org/repos/asf/flink/blob/14c1941d/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index c984eed..f0a4c42 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -597,6 +597,15 @@ public class AbstractStreamOperatorTestHarness<OUT> {
                }
        }
 
+       @VisibleForTesting
+       public int numKeysForWatermarkCallback() {
+               if (operator instanceof AbstractStreamOperator) {
+                       return ((AbstractStreamOperator) 
operator).numKeysForWatermarkCallback();
+               } else {
+                       throw new UnsupportedOperationException();
+               }
+       }
+
        private class MockOutput implements Output<StreamRecord<OUT>> {
 
                private TypeSerializer<OUT> outputSerializer;

Reply via email to