Repository: flink Updated Branches: refs/heads/master 5f08e5359 -> 14c1941d8
[FLINK-6007] Allow key removal from within the watermark callback. When deleting a key from the InternalWatermarkCallbackService, the deleted key is put into a separate set, and the actual deletion is deferred until the iteration over all keys has finished (it is performed right before the callbacks for the next watermark are invoked). To avoid checkpointing the deletion set, the pending deletions are also applied upon checkpointing. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/14c1941d Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/14c1941d Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/14c1941d Branch: refs/heads/master Commit: 14c1941d8eaa583eb8f7eeb5478e605850c0d355 Parents: 5f08e53 Author: kl0u <kklou...@gmail.com> Authored: Wed Mar 8 20:18:18 2017 +0100 Committer: kl0u <kklou...@gmail.com> Committed: Mon Mar 13 17:29:03 2017 +0100 ---------------------------------------------------------------------- .../api/operators/AbstractStreamOperator.java | 6 + .../operators/InternalTimeServiceManager.java | 5 + .../InternalWatermarkCallbackService.java | 112 +++++++++++++----- .../operators/AbstractStreamOperatorTest.java | 117 +++++++++++++++++++ .../util/AbstractStreamOperatorTestHarness.java | 9 ++ 5 files changed, 222 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/14c1941d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java index 6e6b147..ef23be9 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java +++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java @@ -957,4 +957,10 @@ public abstract class AbstractStreamOperator<OUT> return timeServiceManager == null ? 0 : timeServiceManager.numEventTimeTimers(); } + + @VisibleForTesting + public int numKeysForWatermarkCallback() { + return timeServiceManager == null ? 0 : + timeServiceManager.numKeysForWatermarkCallback(); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/14c1941d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java index 71ffbd2..0b60232 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java @@ -188,4 +188,9 @@ class InternalTimeServiceManager<K, N> { } return count; } + + @VisibleForTesting + public int numKeysForWatermarkCallback() { + return watermarkCallbackService.numKeysForWatermarkCallback(); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/14c1941d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalWatermarkCallbackService.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalWatermarkCallbackService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalWatermarkCallbackService.java index a4263e4..9a43853 100644 --- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalWatermarkCallbackService.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalWatermarkCallbackService.java @@ -19,6 +19,7 @@ package org.apache.flink.streaming.api.operators; import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; @@ -38,8 +39,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * The watermark callback service allows to register a {@link OnWatermarkCallback OnWatermarkCallback} * and multiple keys, for which the callback will be invoked every time a new {@link Watermark} is received * (after the registration of the key). - * <p> - * <b>NOTE: </b> This service is only available to <b>keyed</b> operators. + * + * <p><b>NOTE: </b> This service is only available to <b>keyed</b> operators. * * @param <K> The type of key returned by the {@code KeySelector}. */ @@ -58,7 +59,17 @@ public class InternalWatermarkCallbackService<K> { * An array of sets of keys keeping the registered keys split * by the key-group they belong to. Each key-group has one set. */ - private final Set<K>[] keysByKeygroup; + private final Set<K>[] registeredKeysByKeyGroup; + + /** + * An array of sets of keys keeping the keys "to delete" split + * by the key-group they belong to. Each key-group has one set. + * + * <p>This is used to avoid potential concurrent modification + * exception when deleting keys from inside the + * {@link #invokeOnWatermarkCallback(Watermark)}. + */ + private final Set<K>[] deletedKeysByKeyGroup; /** A serializer for the registered keys. 
*/ private TypeSerializer<K> keySerializer; @@ -84,7 +95,8 @@ public class InternalWatermarkCallbackService<K> { // the list of ids of the key-groups this task is responsible for int localKeyGroups = this.localKeyGroupRange.getNumberOfKeyGroups(); - this.keysByKeygroup = new Set[localKeyGroups]; + this.registeredKeysByKeyGroup = new Set[localKeyGroups]; + this.deletedKeysByKeyGroup = new Set[localKeyGroups]; } /** @@ -110,7 +122,7 @@ public class InternalWatermarkCallbackService<K> { * @param key The key to be registered. */ public boolean registerKeyForWatermarkCallback(K key) { - return getKeySetForKeyGroup(key).add(key); + return getRegisteredKeysForKeyGroup(key).add(key); } /** @@ -119,13 +131,7 @@ public class InternalWatermarkCallbackService<K> { * @param key The key to be unregistered. */ public boolean unregisterKeyFromWatermarkCallback(K key) { - Set<K> keys = getKeySetForKeyGroup(key); - boolean res = keys.remove(key); - - if (keys.isEmpty()) { - removeKeySetForKey(key); - } - return res; + return getDeletedKeysForKeyGroup(key).add(key); } /** @@ -134,8 +140,11 @@ public class InternalWatermarkCallbackService<K> { * @param watermark The watermark that triggered the invocation. */ public void invokeOnWatermarkCallback(Watermark watermark) throws IOException { + // clean up any keys registered for deletion before calling the callback + cleanupRegisteredKeys(); + if (callback != null) { - for (Set<K> keySet : keysByKeygroup) { + for (Set<K> keySet : registeredKeysByKeyGroup) { if (keySet != null) { for (K key : keySet) { keyContext.setCurrentKey(key); @@ -147,15 +156,38 @@ public class InternalWatermarkCallbackService<K> { } /** + * Does the actual deletion of any keys registered for deletion using the + * {@link #unregisterKeyFromWatermarkCallback(Object)}. 
+ */ + private void cleanupRegisteredKeys() { + for (int keyGroupIdx = 0; keyGroupIdx < registeredKeysByKeyGroup.length; keyGroupIdx++) { + + Set<K> deletedKeys = deletedKeysByKeyGroup[keyGroupIdx]; + if (deletedKeys != null) { + + Set<K> registeredKeys = registeredKeysByKeyGroup[keyGroupIdx]; + if (registeredKeys != null) { + + registeredKeys.removeAll(deletedKeys); + if (registeredKeys.isEmpty()) { + registeredKeysByKeyGroup[keyGroupIdx] = null; + } + } + deletedKeysByKeyGroup[keyGroupIdx] = null; + } + } + } + + /** * Retrieve the set of keys for the key-group this key belongs to. * * @param key the key whose key-group we are searching. * @return the set of registered keys for the key-group. */ - private Set<K> getKeySetForKeyGroup(K key) { + private Set<K> getRegisteredKeysForKeyGroup(K key) { checkArgument(localKeyGroupRange != null, "The operator has not been initialized."); int keyGroupIdx = KeyGroupRangeAssignment.assignToKeyGroup(key, totalKeyGroups); - return getKeySetForKeyGroup(keyGroupIdx); + return getRegisteredKeysForKeyGroup(keyGroupIdx); } /** @@ -164,27 +196,36 @@ public class InternalWatermarkCallbackService<K> { * @param keyGroupIdx the index of the key group we are interested in. * @return the set of keys for the key-group. 
*/ - private Set<K> getKeySetForKeyGroup(int keyGroupIdx) { + private Set<K> getRegisteredKeysForKeyGroup(int keyGroupIdx) { int localIdx = getIndexForKeyGroup(keyGroupIdx); - Set<K> keys = keysByKeygroup[localIdx]; + Set<K> keys = registeredKeysByKeyGroup[localIdx]; if (keys == null) { keys = new HashSet<>(); - keysByKeygroup[localIdx] = keys; + registeredKeysByKeyGroup[localIdx] = keys; } return keys; } - private void removeKeySetForKey(K key) { + private Set<K> getDeletedKeysForKeyGroup(K key) { checkArgument(localKeyGroupRange != null, "The operator has not been initialized."); int keyGroupIdx = KeyGroupRangeAssignment.assignToKeyGroup(key, totalKeyGroups); - int localKeyGroupIdx = getIndexForKeyGroup(keyGroupIdx); - keysByKeygroup[localKeyGroupIdx] = null; + return getDeletedKeysForKeyGroup(keyGroupIdx); + } + + private Set<K> getDeletedKeysForKeyGroup(int keyGroupIdx) { + int localIdx = getIndexForKeyGroup(keyGroupIdx); + Set<K> keys = deletedKeysByKeyGroup[localIdx]; + if (keys == null) { + keys = new HashSet<>(); + deletedKeysByKeyGroup[localIdx] = keys; + } + return keys; } /** * Computes the index of the requested key-group in the local datastructures. - * <li/> - * Currently we assume that each task is assigned a continuous range of key-groups, + * + * <p>Currently we assume that each task is assigned a continuous range of key-groups, * e.g. 1,2,3,4, and not 1,3,5. We leverage this to keep the different states * key-grouped in arrays instead of maps, where the offset for each key-group is * the key-group id (an int) minus the id of the first key-group in the local range. 
@@ -199,7 +240,11 @@ public class InternalWatermarkCallbackService<K> { ////////////////// Fault Tolerance Methods /////////////////// public void snapshotKeysForKeyGroup(DataOutputViewStreamWrapper stream, int keyGroupIdx) throws Exception { - Set<K> keySet = getKeySetForKeyGroup(keyGroupIdx); + + // we cleanup also here to avoid checkpointing the deletion set + cleanupRegisteredKeys(); + + Set<K> keySet = getRegisteredKeysForKeyGroup(keyGroupIdx); if (keySet != null) { stream.writeInt(keySet.size()); @@ -224,16 +269,29 @@ public class InternalWatermarkCallbackService<K> { TypeSerializer<K> tmpKeyDeserializer = InstantiationUtil.deserializeObject(stream, userCodeClassLoader); if (keySerializer != null && !keySerializer.equals(tmpKeyDeserializer)) { - throw new IllegalArgumentException("Tried to restore timers " + - "for the same service with different serializers."); + throw new IllegalArgumentException("Tried to restore keys " + + "for the watermark callback service with mismatching serializers."); } this.keySerializer = tmpKeyDeserializer; - Set<K> keys = getKeySetForKeyGroup(keyGroupIdx); + Set<K> keys = getRegisteredKeysForKeyGroup(keyGroupIdx); for (int i = 0; i < numKeys; i++) { keys.add(keySerializer.deserialize(stream)); } } } + + ////////////////// Testing Methods /////////////////// + + @VisibleForTesting + public int numKeysForWatermarkCallback() { + int count = 0; + for (Set<K> keyGroup: registeredKeysByKeyGroup) { + if (keyGroup != null) { + count += keyGroup.size(); + } + } + return count; + } } http://git-wip-us.apache.org/repos/asf/flink/blob/14c1941d/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java index 33def9e..eeee8dc 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java @@ -788,6 +788,78 @@ public class AbstractStreamOperatorTest { testHarness5.close(); } + @Test + public void testWatermarkCallbackServiceKeyDeletion() throws Exception { + final int MAX_PARALLELISM = 10; + + Tuple2<Integer, String> element1 = new Tuple2<>(7, "start"); + Tuple2<Integer, String> element2 = new Tuple2<>(45, "start"); + Tuple2<Integer, String> element3 = new Tuple2<>(90, "start"); + + TestOperatorWithDeletingKeyCallback op = new TestOperatorWithDeletingKeyCallback(45); + + KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, Integer> testHarness1 = + new KeyedOneInputStreamOperatorTestHarness<>( + op, + new TestKeySelector(), + BasicTypeInfo.INT_TYPE_INFO, + MAX_PARALLELISM, + 1, + 0); + testHarness1.open(); + + testHarness1.processElement(new StreamRecord<>(element1)); + testHarness1.processElement(new StreamRecord<>(element2)); + + testHarness1.processWatermark(10L); + + assertEquals(3L, testHarness1.getOutput().size()); + verifyElement(testHarness1.getOutput().poll(), 7); + verifyElement(testHarness1.getOutput().poll(), 45); + verifyWatermark(testHarness1.getOutput().poll(), 10); + + testHarness1.processElement(new StreamRecord<>(element3)); + testHarness1.processWatermark(20L); + + // because at the first watermark the operator removed key 45 + assertEquals(3L, testHarness1.getOutput().size()); + verifyElement(testHarness1.getOutput().poll(), 7); + verifyElement(testHarness1.getOutput().poll(), 90); + verifyWatermark(testHarness1.getOutput().poll(), 20); + + testHarness1.processWatermark(25L); + + verifyElement(testHarness1.getOutput().poll(), 7); + 
verifyElement(testHarness1.getOutput().poll(), 90); + verifyWatermark(testHarness1.getOutput().poll(), 25); + + // unregister key and then fail + op.unregisterKey(90); + + // take a snapshot with some elements in internal sorting queue + OperatorStateHandles snapshot = testHarness1.snapshot(0, 0); + testHarness1.close(); + + testHarness1 = new KeyedOneInputStreamOperatorTestHarness<>( + new TestOperatorWithDeletingKeyCallback(45), + new TestKeySelector(), + BasicTypeInfo.INT_TYPE_INFO, + MAX_PARALLELISM, + 1, + 0); + testHarness1.setup(); + testHarness1.initializeState(snapshot); + testHarness1.open(); + + testHarness1.processWatermark(30L); + + assertEquals(2L, testHarness1.getOutput().size()); + verifyElement(testHarness1.getOutput().poll(), 7); + verifyWatermark(testHarness1.getOutput().poll(), 30); + + testHarness1.close(); + } + private KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, Integer> getTestHarness( int maxParallelism, int noOfTasks, int taskIdx) throws Exception { @@ -868,6 +940,51 @@ public class AbstractStreamOperatorTest { } } + private static class TestOperatorWithDeletingKeyCallback + extends AbstractStreamOperator<Integer> + implements OneInputStreamOperator<Tuple2<Integer, String>, Integer> { + + private static final long serialVersionUID = 9215057823264582305L; + + private final int keyToDelete; + + public TestOperatorWithDeletingKeyCallback(int keyToDelete) { + this.keyToDelete = keyToDelete; + } + + @Override + public void open() throws Exception { + super.open(); + + InternalWatermarkCallbackService<Integer> callbackService = getInternalWatermarkCallbackService(); + + callbackService.setWatermarkCallback(new OnWatermarkCallback<Integer>() { + + @Override + public void onWatermark(Integer integer, Watermark watermark) throws IOException { + + // this is to simulate the case where we may have a concurrent modification + // exception as we iterate over the list of registered keys and we concurrently + // delete the 
key. + + if (integer.equals(keyToDelete)) { + getInternalWatermarkCallbackService().unregisterKeyFromWatermarkCallback(integer); + } + output.collect(new StreamRecord<>(integer)); + } + }, IntSerializer.INSTANCE); + } + + @Override + public void processElement(StreamRecord<Tuple2<Integer, String>> element) throws Exception { + getInternalWatermarkCallbackService().registerKeyForWatermarkCallback(element.getValue().f0); + } + + public void unregisterKey(int key) { + getInternalWatermarkCallbackService().unregisterKeyFromWatermarkCallback(key); + } + } + /** * Testing operator that can respond to commands by either setting/deleting state, emitting * state or setting timers. http://git-wip-us.apache.org/repos/asf/flink/blob/14c1941d/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java index c984eed..f0a4c42 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java @@ -597,6 +597,15 @@ public class AbstractStreamOperatorTestHarness<OUT> { } } + @VisibleForTesting + public int numKeysForWatermarkCallback() { + if (operator instanceof AbstractStreamOperator) { + return ((AbstractStreamOperator) operator).numKeysForWatermarkCallback(); + } else { + throw new UnsupportedOperationException(); + } + } + private class MockOutput implements Output<StreamRecord<OUT>> { private TypeSerializer<OUT> outputSerializer;