[https://issues.apache.org/jira/browse/FLINK-4960?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15633189#comment-15633189]
ASF GitHub Bot commented on FLINK-4960:
---------------------------------------
Github user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/2747#discussion_r86372831
--- Diff:
flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
---
@@ -309,6 +310,85 @@ public void initializeState(OperatorStateHandles operatorStateHandles) throws Ex
initializeCalled = true;
}
+ /**
+ * Takes the different {@link OperatorStateHandles} created by calling {@link #snapshot(long, long)}
+ * on different instances of {@link AbstractStreamOperatorTestHarness} (each one representing one subtask)
+ * and repacks them into a single {@link OperatorStateHandles} so that the parallelism of the test
+ * can change arbitrarily (i.e. be able to scale both up and down).
+ * <p/>
+ * After repacking the partial states, use {@link #initializeState(OperatorStateHandles)} to initialize
+ * a new instance with the resulting state. Bear in mind that for parallelism greater than one, you
+ * have to use the constructor {@link #AbstractStreamOperatorTestHarness(StreamOperator, int, int, int)}.
+ *
+ * <p/>
+ * <b>NOTE: </b> each of the {@code handles} in the argument list is assumed to be from a single task of a single
+ * operator (i.e. chain length of one).
+ *
+ * <p/>
+ * For an example of how to use it, have a look at
+ * {@link AbstractStreamOperatorTest#testStateAndTimerStateShufflingScalingDown()}.
+ *
+ * @param handles the different states to be merged.
+ * @return the resulting state, or {@code null} if no partial states are specified.
+ */
+ public static OperatorStateHandles repackageState(OperatorStateHandles... handles) throws Exception {
+
+ if (handles.length < 1) {
+ return null;
+ } else if (handles.length == 1) {
+ return handles[0];
+ }
+
+ List<OperatorStateHandle> mergedManagedOperatorState = new ArrayList<>(handles.length);
+ List<OperatorStateHandle> mergedRawOperatorState = new ArrayList<>(handles.length);
+
+ List<KeyGroupsStateHandle> mergedManagedKeyedState = new ArrayList<>(handles.length);
+ List<KeyGroupsStateHandle> mergedRawKeyedState = new ArrayList<>(handles.length);
+
+ for (OperatorStateHandles handle: handles) {
+
+ // each of the collections is expected to have
+ // one member, as they run with parallelism of 1
+
+ Collection<OperatorStateHandle> managedOperatorState = handle.getManagedOperatorState();
+ Collection<OperatorStateHandle> rawOperatorState = handle.getRawOperatorState();
+ Collection<KeyGroupsStateHandle> managedKeyedState = handle.getManagedKeyedState();
+ Collection<KeyGroupsStateHandle> rawKeyedState = handle.getRawKeyedState();
+
+
+ if ((managedOperatorState != null && managedOperatorState.size() > 1) ||
--- End diff --
Is this restriction necessary? I think it would also work if there are
several entries in the list and we just add them all together.
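Here is a minimal sketch of the merge the comment suggests. It assumes each partial snapshot is just four collections of opaque handles. `SnapshotParts` and `Repackager` are hypothetical stand-ins with handles modelled as strings. They are not Flink's `OperatorStateHandles` or the harness code. The sketch does not reject collections with more than one entry. It concatenates every entry and treats a `null` collection as empty.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

/** Hypothetical stand-in for a snapshot's four state collections (not the Flink class). */
final class SnapshotParts {
    final Collection<String> managedOperatorState;
    final Collection<String> rawOperatorState;
    final Collection<String> managedKeyedState;
    final Collection<String> rawKeyedState;

    SnapshotParts(Collection<String> managedOp, Collection<String> rawOp,
                  Collection<String> managedKeyed, Collection<String> rawKeyed) {
        this.managedOperatorState = managedOp;
        this.rawOperatorState = rawOp;
        this.managedKeyedState = managedKeyed;
        this.rawKeyedState = rawKeyed;
    }
}

/** Hypothetical merge helper illustrating "just add them all together". */
final class Repackager {
    private Repackager() {}

    // Appends every element of src to dst; a null collection contributes nothing.
    private static void addAllIfPresent(List<String> dst, Collection<String> src) {
        if (src != null) {
            dst.addAll(src);
        }
    }

    /** Concatenates all partial states, allowing any number of entries per collection. */
    static SnapshotParts repackage(SnapshotParts... parts) {
        if (parts.length == 0) {
            return null;          // nothing to merge
        } else if (parts.length == 1) {
            return parts[0];      // a single snapshot is returned unchanged
        }
        List<String> managedOp = new ArrayList<>();
        List<String> rawOp = new ArrayList<>();
        List<String> managedKeyed = new ArrayList<>();
        List<String> rawKeyed = new ArrayList<>();
        for (SnapshotParts p : parts) {
            // No size check: multi-entry collections are merged as-is, in argument order.
            addAllIfPresent(managedOp, p.managedOperatorState);
            addAllIfPresent(rawOp, p.rawOperatorState);
            addAllIfPresent(managedKeyed, p.managedKeyedState);
            addAllIfPresent(rawKeyed, p.rawKeyedState);
        }
        return new SnapshotParts(managedOp, rawOp, managedKeyed, rawKeyed);
    }
}
```

Suppose one snapshot's managed operator state holds two handles and another's holds one. Merging them yields a list of all three handles, in argument order. The comment asks whether Flink's restore path accepts such a merged list unchanged, and that remains an open question. Treat the sketch as an illustration only.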
> Allow the AbstractStreamOperatorTestHarness to test scaling down
> ----------------------------------------------------------------
>
> Key: FLINK-4960
> URL: https://issues.apache.org/jira/browse/FLINK-4960
> Project: Flink
> Issue Type: New Feature
> Components: Tests
> Reporter: Kostas Kloudas
> Assignee: Kostas Kloudas
> Fix For: 1.2.0
>
>
> Currently the AbstractStreamOperatorTestHarness allows for testing an
> operator when scaling up, through snapshot and restore. This is not enough
> as many interesting corner cases arise when scaling down or during
> arbitrary combinations of scaling up and down.
> This issue aims to add this functionality, so that an operator can snapshot
> its state, restore with a different parallelism, and later scale down or
> further up.
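The following sketch shows why partial snapshots from several subtasks may need to be combined before restoring at a new parallelism. It models how keyed state organised into key groups could be split across subtasks. It assumes `maxParallelism` key groups divided into contiguous, near-equal ranges, one per subtask. `KeyGroupRanges.rangeFor` is a hypothetical helper written for this example and is not a Flink API. Flink's own keyed-state assignment follows a similar contiguous-range scheme.

```java
/** Hypothetical helper for illustration; not part of Flink. */
final class KeyGroupRanges {
    private KeyGroupRanges() {}

    /**
     * Returns the inclusive key-group range {start, end} owned by {@code subtask}
     * when {@code maxParallelism} key groups are split into {@code parallelism}
     * contiguous, near-equal ranges. Assumes 0 <= subtask < parallelism <= maxParallelism.
     */
    static int[] rangeFor(int subtask, int parallelism, int maxParallelism) {
        // start = ceil(subtask * maxParallelism / parallelism)
        int start = (subtask * maxParallelism + parallelism - 1) / parallelism;
        // end = floor(((subtask + 1) * maxParallelism - 1) / parallelism)
        int end = ((subtask + 1) * maxParallelism - 1) / parallelism;
        return new int[] {start, end};
    }
}
```

With `maxParallelism = 10`, three subtasks own key groups [0, 3], [4, 6] and [7, 9]. After scaling down to two subtasks, the ranges become [0, 4] and [5, 9]. The new subtask 0 therefore needs key groups from both old subtask 0 and old subtask 1. This is presumably why the harness repacks all partial snapshots into one set of handles before initializing the new instances.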
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)