[jira] [Commented] (FLINK-4844) Partitionable Raw Keyed/Operator State
[ https://issues.apache.org/jira/browse/FLINK-4844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15592051#comment-15592051 ] ASF GitHub Bot commented on FLINK-4844: --- Github user StefanRRichter closed the pull request at: https://github.com/apache/flink/pull/2648 > Partitionable Raw Keyed/Operator State > -- > > Key: FLINK-4844 > URL: https://issues.apache.org/jira/browse/FLINK-4844 > Project: Flink > Issue Type: New Feature >Reporter: Stefan Richter >Assignee: Stefan Richter > > Partitionable operator and keyed state are currently only available by using > backends. However, the serialization code for many operators is built around > reading/writing their state to a stream for checkpointing. We want to provide > partitionable states also through streams, so that migrating existing > operators becomes easier. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4844) Partitionable Raw Keyed/Operator State
[ https://issues.apache.org/jira/browse/FLINK-4844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15585088#comment-15585088 ] ASF GitHub Bot commented on FLINK-4844: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/2648#discussion_r83822468 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java --- @@ -199,30 +228,56 @@ public Environment getEnvironment() { } /** -* Calls -* {@link org.apache.flink.streaming.api.operators.StreamOperator#setup(StreamTask, StreamConfig, Output)} ()} +* Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#setup(StreamTask, StreamConfig, Output)} ()} */ public void setup() throws Exception { operator.setup(mockTask, config, new MockOutput()); setupCalled = true; } /** -* Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#open()}. This also -* calls {@link org.apache.flink.streaming.api.operators.StreamOperator#setup(StreamTask, StreamConfig, Output)} +* Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#initializeState(OperatorStateHandles)}. +* Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#setup(StreamTask, StreamConfig, Output)} * if it was not called before. */ - public void open() throws Exception { + public void initializeState(OperatorStateHandles operatorStateHandles) throws Exception { if (!setupCalled) { setup(); } + operator.initializeState(operatorStateHandles); + initializeCalled = true; + } + + /** +* Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#open()}. +* Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#initializeState(OperatorStateHandles)} if it +* was not called before. 
+*/ + public void open() throws Exception { + if (!initializeCalled) { + initializeState(null); + } operator.open(); } /** * */ - public StreamStateHandle snapshot(long checkpointId, long timestamp) throws Exception { + public SnapshotInProgressSubtaskState snapshot(long checkpointId, long timestamp) throws Exception { --- End diff -- I think the idea of having a single method is nice, and if there is no special reason to keep the old signature, I suggest doing it the other way around. `OperatorSnapshotResult` is already a container for all operator states (except the legacy state, which will be removed in the near future). Using it removes the need for the multiplexing. However, since `OperatorSnapshotResult` does not contain the legacy state, for the time being we could return a Tuple2 of both, or a special container class that could also strip away the `RunnableFuture` part. What do you think?
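A minimal sketch of the "special container class" alternative mentioned in this reply, written in plain Java so it stands alone: it pairs a legacy state handle with the completed result of a snapshot future, running the future synchronously so callers never see the `RunnableFuture`. `SnapshotWithLegacyState` and `completeSnapshot` are hypothetical names, and the generic `L`/`R` parameters stand in for whatever handle and result types the harness would actually use.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RunnableFuture;

/**
 * Hypothetical container: pairs a legacy state handle with the completed
 * result of an asynchronous snapshot, so test code never handles the future.
 */
final class SnapshotWithLegacyState<L, R> {
    private final L legacyState;    // may be null if the operator has no legacy state
    private final R snapshotResult; // may be null if nothing was snapshotted

    SnapshotWithLegacyState(L legacyState, R snapshotResult) {
        this.legacyState = legacyState;
        this.snapshotResult = snapshotResult;
    }

    L getLegacyState() {
        return legacyState;
    }

    R getSnapshotResult() {
        return snapshotResult;
    }

    /**
     * Runs the (possibly null) snapshot future in the calling thread and wraps
     * its result together with the legacy handle, stripping away the future.
     */
    static <L, R> SnapshotWithLegacyState<L, R> completeSnapshot(
            L legacyState, RunnableFuture<R> snapshotFuture) {
        R result = null;
        if (snapshotFuture != null) {
            snapshotFuture.run(); // execute the snapshot synchronously
            try {
                result = snapshotFuture.get(); // already done, returns immediately
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while completing the snapshot", e);
            } catch (ExecutionException e) {
                throw new IllegalStateException("Snapshot failed", e.getCause());
            }
        }
        return new SnapshotWithLegacyState<>(legacyState, result);
    }
}
```

Compared with a bare Tuple2, a named container can document which field may be null and can hide the asynchronous part entirely from test code.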
[jira] [Commented] (FLINK-4844) Partitionable Raw Keyed/Operator State
[ https://issues.apache.org/jira/browse/FLINK-4844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15585058#comment-15585058 ] ASF GitHub Bot commented on FLINK-4844: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/2648#discussion_r83819873 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorStateHandles.java --- @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.tasks; + +import org.apache.flink.runtime.state.ChainedStateHandle; +import org.apache.flink.runtime.state.KeyGroupsStateHandle; +import org.apache.flink.runtime.state.OperatorStateHandle; +import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.runtime.state.TaskStateHandles; +import org.apache.flink.util.CollectionUtil; +import org.apache.flink.util.Preconditions; + +import java.util.Collection; +import java.util.List; + +/** + * This class holds all state handles for one operator. + */ +public class OperatorStateHandles { --- End diff -- Agreed about `@Internal`. 
For the name, however, I think it should somehow reflect how this is related to `TaskStateHandles`.
[jira] [Commented] (FLINK-4844) Partitionable Raw Keyed/Operator State
[ https://issues.apache.org/jira/browse/FLINK-4844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15584899#comment-15584899 ] ASF GitHub Bot commented on FLINK-4844: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/2648#discussion_r83807634 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateHandle.java --- @@ -38,8 +38,8 @@ private final StreamStateHandle delegateStateHandle; public OperatorStateHandle( - StreamStateHandle delegateStateHandle, - Map stateNameToPartitionOffsets) { + Map stateNameToPartitionOffsets, --- End diff -- Changing the order here had the purpose of having the same order in `OperatorStateHandle` as in `KeyGroupStateHandle`, which was exactly the other way around :)
[jira] [Commented] (FLINK-4844) Partitionable Raw Keyed/Operator State
[ https://issues.apache.org/jira/browse/FLINK-4844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15584893#comment-15584893 ] ASF GitHub Bot commented on FLINK-4844: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/2648#discussion_r83807199 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java --- @@ -804,199 +792,15 @@ public boolean restoreLatestCheckpointedState( LOG.info("Restoring from latest valid checkpoint: {}.", latest); - for (Map.Entry taskGroupStateEntry: latest.getTaskStates().entrySet()) { - TaskState taskState = taskGroupStateEntry.getValue(); - ExecutionJobVertex executionJobVertex = tasks.get(taskGroupStateEntry.getKey()); - - if (executionJobVertex != null) { - // check that the number of key groups have not changed - if (taskState.getMaxParallelism() != executionJobVertex.getMaxParallelism()) { - throw new IllegalStateException("The maximum parallelism (" + - taskState.getMaxParallelism() + ") with which the latest " + - "checkpoint of the execution job vertex " + executionJobVertex + - " has been taken and the current maximum parallelism (" + - executionJobVertex.getMaxParallelism() + ") changed. This " + - "is currently not supported."); - } - - - int oldParallelism = taskState.getParallelism(); - int newParallelism = executionJobVertex.getParallelism(); - boolean parallelismChanged = oldParallelism != newParallelism; - boolean hasNonPartitionedState = taskState.hasNonPartitionedState(); - - if (hasNonPartitionedState && parallelismChanged) { - throw new IllegalStateException("Cannot restore the latest checkpoint because " + - "the operator " + executionJobVertex.getJobVertexId() + " has non-partitioned " + - "state and its parallelism changed. 
The operator" + executionJobVertex.getJobVertexId() + - " has parallelism " + newParallelism + " whereas the corresponding" + - "state object has a parallelism of " + oldParallelism); - } - - List keyGroupPartitions = createKeyGroupPartitions( - executionJobVertex.getMaxParallelism(), - newParallelism); - - // operator chain index -> list of the stored partitionables states from all parallel instances - @SuppressWarnings("unchecked") - List[] chainParallelStates = - new List[taskState.getChainLength()]; - - for (int i = 0; i < oldParallelism; ++i) { - - ChainedStateHandle partitionableState = - taskState.getPartitionableState(i); - - if (partitionableState != null) { - for (int j = 0; j < partitionableState.getLength(); ++j) { - OperatorStateHandle opParalleState = partitionableState.get(j); - if (opParalleState != null) { - List opParallelStates = - chainParallelStates[j]; - if (opParallelStates == null) { - opPara
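The removed restore block first rejects a changed maximum parallelism, then rejects rescaling when non-partitioned state is present, and only then splits the key groups across the new subtasks via `createKeyGroupPartitions`. A self-contained sketch of those steps, assuming key groups are numbered `0` to `maxParallelism - 1` and are split into contiguous ranges whose sizes differ by at most one; `KeyGroupRestoreSketch`, `validate`, and `partition` are hypothetical, and the exact rounding used by Flink may differ.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of the restore-time checks and key-group splitting. */
final class KeyGroupRestoreSketch {

    /** Inclusive range [start, end] of key groups assigned to one subtask. */
    static final class Range {
        final int start;
        final int end;

        Range(int start, int end) {
            this.start = start;
            this.end = end;
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + "]";
        }
    }

    /** Mirrors the two consistency checks performed before redistributing state. */
    static void validate(int oldMaxParallelism, int newMaxParallelism,
                         int oldParallelism, int newParallelism,
                         boolean hasNonPartitionedState) {
        if (oldMaxParallelism != newMaxParallelism) {
            throw new IllegalStateException("The maximum parallelism changed from "
                    + oldMaxParallelism + " to " + newMaxParallelism + "; this is not supported.");
        }
        if (hasNonPartitionedState && oldParallelism != newParallelism) {
            throw new IllegalStateException("Non-partitioned state cannot be restored after the "
                    + "parallelism changed from " + oldParallelism + " to " + newParallelism + ".");
        }
    }

    /** Splits key groups 0..maxParallelism-1 into `parallelism` contiguous, near-equal ranges. */
    static List<Range> partition(int maxParallelism, int parallelism) {
        if (parallelism <= 0 || parallelism > maxParallelism) {
            throw new IllegalArgumentException("Need 0 < parallelism <= maxParallelism");
        }
        List<Range> ranges = new ArrayList<>(parallelism);
        for (int i = 0; i < parallelism; i++) {
            // Integer division spreads the remainder, so range sizes differ by at most one.
            int start = i * maxParallelism / parallelism;
            int end = (i + 1) * maxParallelism / parallelism - 1;
            ranges.add(new Range(start, end));
        }
        return ranges;
    }
}
```

For example, `partition(10, 3)` yields the ranges `[0, 2]`, `[3, 5]`, and `[6, 9]`.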
[jira] [Commented] (FLINK-4844) Partitionable Raw Keyed/Operator State
[ https://issues.apache.org/jira/browse/FLINK-4844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15582779#comment-15582779 ] ASF GitHub Bot commented on FLINK-4844: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2648#discussion_r83666257 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/state/OperatorStateStore.java --- @@ -18,14 +18,18 @@ package org.apache.flink.api.common.state; +import org.apache.flink.annotation.PublicEvolving; + import java.io.Serializable; import java.util.Set; /** - * Interface for a backend that manages operator state. + * This interface contains methods for registering operator state with a managed store. */ +@PublicEvolving public interface OperatorStateStore { + /** The default namespace for state in cases where no state name is provided */ String DEFAULT_OPERATOR_STATE_NAME = "_default_"; --- End diff -- This is an implementation detail that should not be exposed on this interface.
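One way to follow this suggestion, sketched under simplifying assumptions: the public interface exposes only the registration method, while the fallback name becomes a private constant of the implementation. `OperatorStateStoreSketch`, `DefaultOperatorStateStoreSketch`, and the `List<String>`-based `getListState` are hypothetical stand-ins, not the real Flink signatures.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical public-facing interface: no implementation constants leak through it. */
interface OperatorStateStoreSketch {
    /** Returns the (mutable) list state registered under the given name. */
    List<String> getListState(String name);
}

/** Hypothetical implementation that owns the fallback name as a private detail. */
final class DefaultOperatorStateStoreSketch implements OperatorStateStoreSketch {
    // Only this class knows which name is used when callers do not provide one.
    private static final String DEFAULT_OPERATOR_STATE_NAME = "_default_";

    private final Map<String, List<String>> registeredStates = new HashMap<>();

    @Override
    public List<String> getListState(String name) {
        String effectiveName = (name == null) ? DEFAULT_OPERATOR_STATE_NAME : name;
        return registeredStates.computeIfAbsent(effectiveName, n -> new ArrayList<>());
    }

    /** Convenience accessor for state registered without an explicit name. */
    List<String> getDefaultListState() {
        return getListState(null);
    }
}
```

Keeping the constant private means it can be renamed or removed later without touching a `@PublicEvolving` interface.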
[jira] [Commented] (FLINK-4844) Partitionable Raw Keyed/Operator State
[ https://issues.apache.org/jira/browse/FLINK-4844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15582782#comment-15582782 ] ASF GitHub Bot commented on FLINK-4844: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2648#discussion_r83673478 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SnapshotInProgressSubtaskState.java --- @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint; + +import org.apache.flink.runtime.state.KeyGroupsStateHandle; +import org.apache.flink.runtime.state.OperatorStateHandle; + +import java.util.concurrent.RunnableFuture; + +/** + * Encapsulates all runnable futures draw snapshots for a single subtask state of an in-flight checkpointing operation. + */ +public class SnapshotInProgressSubtaskState { --- End diff -- I think this could be changed to ``` /** * Result of {@link AbstractStreamOperator#snapshotState}. */ public class OperatorSnapshotResult { ... } ``` to make it clearer what it is supposed to be.
It should probably also live in the same module/package as `AbstractStreamOperator`, but the code layout of the state classes seems a bit messy, so I'm not sure that's possible.
[jira] [Commented] (FLINK-4844) Partitionable Raw Keyed/Operator State
[ https://issues.apache.org/jira/browse/FLINK-4844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15582774#comment-15582774 ] ASF GitHub Bot commented on FLINK-4844: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2648#discussion_r83664806 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java --- @@ -48,7 +43,7 @@ * {@link AbstractRichFunction#getRuntimeContext()}. */ @Public -public interface RuntimeContext { +public interface RuntimeContext extends KeyedStateStore { --- End diff -- I think it would be better not to have `RuntimeContext` be a `KeyedStateStore`. In the not-too-distant future, `RuntimeContext` will probably provide a `KeyedStateStore`, or at least use one internally to implement the state methods. Properly separating the two now seems prudent.
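The separation suggested here amounts to composition instead of inheritance: the context holds a keyed state store and forwards state access to it, so the store implementation can change without altering the context's public type. A minimal sketch, in which `KeyedStateStoreSketch`, `InMemoryKeyedStateStore`, and `RuntimeContextSketch` are hypothetical, heavily simplified stand-ins for the Flink interfaces.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical, simplified keyed state store: one value per (state name, current key). */
interface KeyedStateStoreSketch {
    Object getValueState(String stateName);
}

/** Hypothetical in-memory store used only to make the sketch runnable. */
final class InMemoryKeyedStateStore implements KeyedStateStoreSketch {
    private final Map<String, Object> values = new HashMap<>();
    private Object currentKey;

    void setCurrentKey(Object key) {
        this.currentKey = key;
    }

    void put(String stateName, Object value) {
        values.put(stateName + "@" + currentKey, value);
    }

    @Override
    public Object getValueState(String stateName) {
        return values.get(stateName + "@" + currentKey);
    }
}

/** The context *has* a store (composition) instead of *being* one (inheritance). */
final class RuntimeContextSketch {
    private final KeyedStateStoreSketch keyedStateStore;

    RuntimeContextSketch(KeyedStateStoreSketch keyedStateStore) {
        this.keyedStateStore = Objects.requireNonNull(keyedStateStore);
    }

    /** State access is forwarded; the store can be swapped without touching this API. */
    Object getValueState(String stateName) {
        return keyedStateStore.getValueState(stateName);
    }
}
```

Because `RuntimeContextSketch` does not implement the store interface, adding methods to the store later does not widen the context's public API.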
[jira] [Commented] (FLINK-4844) Partitionable Raw Keyed/Operator State
[ https://issues.apache.org/jira/browse/FLINK-4844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15582784#comment-15582784 ] ASF GitHub Bot commented on FLINK-4844: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2648#discussion_r83676027 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractPartitionedCheckpointOutputStream.java --- @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.io.OutputStream; + +/** + * Output stream that allows to write state into several partitions. + * @param type of the returned state handle. + */ +public abstract class AbstractPartitionedCheckpointOutputStream extends OutputStream { --- End diff -- I think the javadoc and class name don't accurately describe what this does (possibly due to some refactoring). Now it should probably be called something like `NonClosingCheckpointOutputStream`. 
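A sketch of what a class named `NonClosingCheckpointOutputStream` would be expected to do, assuming the goal is to let user code write to (and "close") a stream whose underlying checkpoint stream must stay open for the runtime: writes are forwarded, while `close()` only marks the wrapper as closed. `NonClosingOutputStreamSketch` is a hypothetical name built on `java.io` alone.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.util.Objects;

/** Hypothetical wrapper: forwards all writes to a delegate but never closes it. */
class NonClosingOutputStreamSketch extends OutputStream {
    private final OutputStream delegate;
    private boolean closed;

    NonClosingOutputStreamSketch(OutputStream delegate) {
        this.delegate = Objects.requireNonNull(delegate);
    }

    @Override
    public void write(int b) throws IOException {
        ensureOpen();
        delegate.write(b);
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        ensureOpen();
        delegate.write(b, off, len);
    }

    @Override
    public void flush() throws IOException {
        delegate.flush();
    }

    /** Marks only this wrapper as closed; the delegate stays open for the runtime. */
    @Override
    public void close() throws IOException {
        closed = true;
        delegate.flush();
    }

    private void ensureOpen() throws IOException {
        if (closed) {
            throw new IOException("Stream was already closed by the user");
        }
    }
}
```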
[jira] [Commented] (FLINK-4844) Partitionable Raw Keyed/Operator State
[ https://issues.apache.org/jira/browse/FLINK-4844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15582776#comment-15582776 ] ASF GitHub Bot commented on FLINK-4844: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2648#discussion_r83670574 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateHandle.java --- @@ -38,8 +38,8 @@ private final StreamStateHandle delegateStateHandle; public OperatorStateHandle( - StreamStateHandle delegateStateHandle, - Map stateNameToPartitionOffsets) { + Map stateNameToPartitionOffsets, --- End diff -- This is only changing ordering but it's triggering some one-line changes in other files that make it hard to keep track of what changes are really changes. Could you maybe revert that and change it in a follow-up?
[jira] [Commented] (FLINK-4844) Partitionable Raw Keyed/Operator State
[ https://issues.apache.org/jira/browse/FLINK-4844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15582792#comment-15582792 ] ASF GitHub Bot commented on FLINK-4844: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2648#discussion_r83680393 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java --- @@ -2132,24 +2132,36 @@ public void testRestoreLatestCheckpointFailureWhenParallelismChanges() throws Ex "non-partitioned state changed."); } + @Test --- End diff -- Very good additions! 👍 😺
[jira] [Commented] (FLINK-4844) Partitionable Raw Keyed/Operator State
[ https://issues.apache.org/jira/browse/FLINK-4844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15582777#comment-15582777 ] ASF GitHub Bot commented on FLINK-4844: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2648#discussion_r83673691 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java --- @@ -34,10 +37,14 @@ private static final long serialVersionUID = -2394696997971923995L; - private static final Logger LOG = LoggerFactory.getLogger(SubtaskState.class); - - /** The state of the parallel operator */ - private final ChainedStateHandle chainedStateHandle; + /** +* The state of the parallel operator +*/ + private final ChainedStateHandle nonPartitionableOperatorState; --- End diff -- I think these names don't match the names in the rest of the code base.
[jira] [Commented] (FLINK-4844) Partitionable Raw Keyed/Operator State
[ https://issues.apache.org/jira/browse/FLINK-4844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15582791#comment-15582791 ] ASF GitHub Bot commented on FLINK-4844: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2648#discussion_r83679164 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsList.java --- @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +public interface KeyGroupsList extends Iterable { --- End diff -- This one could benefit from some Javadocs.
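The kind of Javadoc this comment asks for, sketched on a stand-in interface together with a contiguous-range implementation. The methods `getNumberOfKeyGroups`, `getKeyGroupId`, and `contains` are assumptions about what such a list would offer, and `KeyGroupsListSketch`/`KeyGroupRangeSketch` are hypothetical names, not the class under review.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;

/**
 * Hypothetical: an ordered collection of the key-group ids assigned to one
 * operator instance. Iteration yields the ids in ascending order.
 */
interface KeyGroupsListSketch extends Iterable<Integer> {

    /** @return the number of key groups in this list */
    int getNumberOfKeyGroups();

    /**
     * @param idx position in the list, from 0 (inclusive) to getNumberOfKeyGroups() (exclusive)
     * @return the key-group id stored at that position
     */
    int getKeyGroupId(int idx);

    /** @return true if the given key-group id belongs to this list */
    boolean contains(int keyGroupId);
}

/** Hypothetical contiguous, inclusive range [start, end] of key-group ids. */
final class KeyGroupRangeSketch implements KeyGroupsListSketch {
    private final int start;
    private final int end;

    KeyGroupRangeSketch(int start, int end) {
        if (start < 0 || end < start) {
            throw new IllegalArgumentException("Invalid range [" + start + ", " + end + "]");
        }
        this.start = start;
        this.end = end;
    }

    @Override
    public int getNumberOfKeyGroups() {
        return end - start + 1;
    }

    @Override
    public int getKeyGroupId(int idx) {
        if (idx < 0 || idx >= getNumberOfKeyGroups()) {
            throw new IndexOutOfBoundsException("Index " + idx);
        }
        return start + idx;
    }

    @Override
    public boolean contains(int keyGroupId) {
        return keyGroupId >= start && keyGroupId <= end;
    }

    @Override
    public Iterator<Integer> iterator() {
        return new Iterator<Integer>() {
            private int idx = 0; // position of the next id to return

            @Override
            public boolean hasNext() {
                return idx < getNumberOfKeyGroups();
            }

            @Override
            public Integer next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                return start + idx++;
            }
        };
    }
}
```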
[jira] [Commented] (FLINK-4844) Partitionable Raw Keyed/Operator State
[ https://issues.apache.org/jira/browse/FLINK-4844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15582786#comment-15582786 ] ASF GitHub Bot commented on FLINK-4844: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2648#discussion_r83674157 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateHandles.java --- @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.runtime.checkpoint.SubtaskState; +import org.apache.flink.util.CollectionUtil; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +/** + * This class encapsulates all state handles for a task. + */ +public class TaskStateHandles implements Serializable { --- End diff -- Very good addition for simplifying the handling of all the state handles that are flying around. 
👍 😄
[jira] [Commented] (FLINK-4844) Partitionable Raw Keyed/Operator State
[ https://issues.apache.org/jira/browse/FLINK-4844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15582796#comment-15582796 ] ASF GitHub Bot commented on FLINK-4844: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2648#discussion_r83685681 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java --- @@ -199,30 +228,56 @@ public Environment getEnvironment() { } /** -* Calls -* {@link org.apache.flink.streaming.api.operators.StreamOperator#setup(StreamTask, StreamConfig, Output)} ()} +* Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#setup(StreamTask, StreamConfig, Output)} ()} */ public void setup() throws Exception { operator.setup(mockTask, config, new MockOutput()); setupCalled = true; } /** -* Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#open()}. This also -* calls {@link org.apache.flink.streaming.api.operators.StreamOperator#setup(StreamTask, StreamConfig, Output)} +* Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#initializeState(OperatorStateHandles)}. +* Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#setup(StreamTask, StreamConfig, Output)} * if it was not called before. */ - public void open() throws Exception { + public void initializeState(OperatorStateHandles operatorStateHandles) throws Exception { if (!setupCalled) { setup(); } + operator.initializeState(operatorStateHandles); + initializeCalled = true; + } + + /** +* Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#open()}. +* Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#initializeState(OperatorStateHandles)} if it +* was not called before. 
+*/ + public void open() throws Exception { + if (!initializeCalled) { + initializeState(null); + } operator.open(); } /** * */ - public StreamStateHandle snapshot(long checkpointId, long timestamp) throws Exception { + public SnapshotInProgressSubtaskState snapshot(long checkpointId, long timestamp) throws Exception { --- End diff -- I think we can keep the old method signature by doing something like this:

```
/**
 * Calls {@link StreamOperator#snapshotState(long, long, CheckpointStreamFactory)}.
 */
public final StreamStateHandle snapshot(long checkpointId, long timestamp) throws Exception {
    synchronized (checkpointLock) {
        CheckpointStreamFactory.CheckpointStateOutputStream outStream = stateBackend.createStreamFactory(
                new JobID(),
                "test_op").createCheckpointStateOutputStream(checkpointId, timestamp);

        if (operator instanceof StreamCheckpointedOperator) {
            ((StreamCheckpointedOperator) operator).snapshotState(
                    outStream,
                    checkpointId,
                    timestamp);
        }

        RunnableFuture<OperatorStateHandle> snapshotRunnable = operator.snapshotState(
                checkpointId,
                timestamp,
                stateBackend.createStreamFactory(new JobID(), "test_op"));

        if (snapshotRunnable != null) {
            outStream.write(1);
            snapshotRunnable.run();
            OperatorStateHandle operatorStateHandle = snapshotRunnable.get();

            InstantiationUtil.serializeObject(outStream, operatorStateHandle);
        } else {
            outStream.write(0);
        }

        snapshotToStream(checkpointId, timestamp, outStream);

        return outStream.closeAndGetHandle();
    }
}
```

This multiplexes the results from the different operator snapshotting methods into the same stream. The restore method just picks the correct items out of the stream and hands them to the corresponding operator methods. This would let all tests use the same method and we can keep the name/signature the same if we evolve t
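The multiplexing described in this suggestion boils down to framing: each optional section of the snapshot stream is preceded by a marker byte (`1` if the section follows, `0` if it is absent), and the restore side reads the markers in the same order to route each section. A self-contained sketch using only `java.io`, assuming byte arrays stand in for serialized state; `MultiplexedSnapshotSketch` is hypothetical, and the length prefix is an added assumption (the suggested code instead relies on the serialized objects marking their own boundaries).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical sketch of marker-byte multiplexing of optional snapshot sections. */
final class MultiplexedSnapshotSketch {

    /** Writes the legacy and managed sections in a fixed order, each behind a marker byte. */
    static byte[] write(byte[] legacyState, byte[] managedState) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            writeSection(out, legacyState);
            writeSection(out, managedState);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Reads the sections back in the same order; an absent section comes back as null. */
    static byte[][] read(byte[] snapshot) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(snapshot))) {
            byte[] legacy = readSection(in);
            byte[] managed = readSection(in);
            return new byte[][] {legacy, managed};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private static void writeSection(DataOutputStream out, byte[] section) throws IOException {
        if (section == null) {
            out.writeByte(0);             // marker: section absent
        } else {
            out.writeByte(1);             // marker: section present
            out.writeInt(section.length); // length prefix so the reader knows where it ends
            out.write(section);
        }
    }

    private static byte[] readSection(DataInputStream in) throws IOException {
        if (in.readByte() == 0) {
            return null;
        }
        byte[] section = new byte[in.readInt()];
        in.readFully(section);
        return section;
    }
}
```

The trade-off the follow-up reply raises still applies: this keeps one method signature for all tests, at the cost of a stream format that both sides must agree on.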
[jira] [Commented] (FLINK-4844) Partitionable Raw Keyed/Operator State
[ https://issues.apache.org/jira/browse/FLINK-4844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15582794#comment-15582794 ] ASF GitHub Bot commented on FLINK-4844: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2648#discussion_r83679673 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java --- @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.concurrent.RunnableFuture; + +public class StateSnapshotContextSynchronousImpl implements StateSnapshotContext { --- End diff -- Some Javadocs would probably be helpful. 😉
[jira] [Commented] (FLINK-4844) Partitionable Raw Keyed/Operator State
[ https://issues.apache.org/jira/browse/FLINK-4844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15582793#comment-15582793 ] ASF GitHub Bot commented on FLINK-4844: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2648#discussion_r83682543 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java --- @@ -238,11 +294,51 @@ public void dispose() throws Exception { } @Override - public RunnableFuture snapshotState( + public SnapshotInProgressSubtaskState snapshotState( --- End diff -- This should probably be `final`, similarly to how `initializeState(OperatorStateHandles)` is `final`.
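The reason for making the framework entry point `final` is the template-method pattern: the base class owns the sequence of steps and exposes only a separate, overridable hook. A minimal sketch under that assumption follows. `BaseOperator`, `SnapshotContext` and `CountingOperator` are hypothetical stand-ins, and the signatures are simplified compared with `AbstractStreamOperator`.

```java
import java.util.ArrayList;
import java.util.List;

public class FinalTemplateMethodSketch {

    /** Hypothetical context handed to the hook; carries checkpoint metadata only. */
    static final class SnapshotContext {
        final long checkpointId;

        SnapshotContext(long checkpointId) {
            this.checkpointId = checkpointId;
        }
    }

    /** Hypothetical operator base class. */
    abstract static class BaseOperator {
        final List<String> events = new ArrayList<>();

        // Framework entry point: final, so subclasses cannot skip the surrounding steps.
        public final void snapshotState(long checkpointId) throws Exception {
            events.add("framework-before");
            snapshotState(new SnapshotContext(checkpointId)); // dispatches to the subclass hook
            events.add("framework-after");
        }

        // Hook: the only extension point subclasses are meant to override.
        protected void snapshotState(SnapshotContext context) throws Exception {
        }
    }

    static final class CountingOperator extends BaseOperator {
        @Override
        protected void snapshotState(SnapshotContext context) {
            events.add("hook:" + context.checkpointId);
        }
    }

    public static void main(String[] args) throws Exception {
        CountingOperator op = new CountingOperator();
        op.snapshotState(42L);
        System.out.println(op.events); // [framework-before, hook:42, framework-after]
    }
}
```

A subclass that tried to override `snapshotState(long)` would fail to compile, so the surrounding steps always run.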
[jira] [Commented] (FLINK-4844) Partitionable Raw Keyed/Operator State
[ https://issues.apache.org/jira/browse/FLINK-4844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15582787#comment-15582787 ] ASF GitHub Bot commented on FLINK-4844: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2648#discussion_r83677603 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java --- @@ -132,7 +133,7 @@ public DefaultOperatorStateBackend(ClassLoader userClassLoader) { } /** -* @see SnapshotProvider +* @see Snapshotable --- End diff -- I think an empty Javadoc does simply prevent tools from displaying the Javadoc of the overridden method so it's probably best to remove those. There are also more instances of that in this file.
[jira] [Commented] (FLINK-4844) Partitionable Raw Keyed/Operator State
[ https://issues.apache.org/jira/browse/FLINK-4844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15582785#comment-15582785 ] ASF GitHub Bot commented on FLINK-4844: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2648#discussion_r83676729 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ClosableRegistry.java --- @@ -25,6 +25,13 @@ import java.util.HashSet; import java.util.Set; +/** + * This class allows to register instances of {@link Closeable}, which are all closed if this registry is closed. + * --- End diff -- The correct way of separating paragraphs in Javadoc is this:
```
Paragraph one.

<p>Paragraph two ...
```
I know it's not proper HTML nowadays but that's how it's supposed to be ... 😜
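Below is a sketch of a registry like the one documented in this diff, written with `<p>`-separated Javadoc paragraphs as the comment suggests. `CloseableRegistrySketch` is a hypothetical class, not Flink's `ClosableRegistry`. The behaviour of closing a resource that is registered after the registry itself was closed is an assumption made for illustration.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

/**
 * Hypothetical registry of {@link Closeable}s that are all closed when the registry is closed.
 *
 * <p>A resource registered after the registry was closed is closed immediately.
 *
 * <p>All methods synchronize on the registry, so registering and closing never interleave.
 */
public class CloseableRegistrySketch implements Closeable {

    private final Set<Closeable> registered = new HashSet<>();
    private boolean closed;

    /** Returns true if the resource was registered, false if it was closed right away instead. */
    public synchronized boolean register(Closeable resource) throws IOException {
        if (closed) {
            resource.close();
            return false;
        }
        registered.add(resource);
        return true;
    }

    public synchronized boolean unregister(Closeable resource) {
        return registered.remove(resource);
    }

    /** Closes every registered resource; the first failure is rethrown, later ones are suppressed. */
    @Override
    public synchronized void close() throws IOException {
        if (closed) {
            return;
        }
        closed = true;
        IOException first = null;
        for (Closeable resource : registered) {
            try {
                resource.close();
            } catch (IOException e) {
                if (first == null) {
                    first = e;
                } else {
                    first.addSuppressed(e);
                }
            }
        }
        registered.clear();
        if (first != null) {
            throw first;
        }
    }

    public static void main(String[] args) throws IOException {
        int[] closedCount = {0};
        CloseableRegistrySketch registry = new CloseableRegistrySketch();
        registry.register(() -> closedCount[0]++);
        registry.register(() -> closedCount[0]++);
        registry.close();
        boolean accepted = registry.register(() -> closedCount[0]++);
        System.out.println(closedCount[0] + " " + accepted); // 3 false
    }
}
```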
[jira] [Commented] (FLINK-4844) Partitionable Raw Keyed/Operator State
[ https://issues.apache.org/jira/browse/FLINK-4844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15582788#comment-15582788 ] ASF GitHub Bot commented on FLINK-4844: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2648#discussion_r83678862 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ManagedSnapshotContext.java --- @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.annotation.PublicEvolving; + +/** + * This interface provides a context in which operators that use managed state (i.e. state that is managed by state + * backends) can perform a snapshot. As snapshots of the backends themselves are taken by the system, this interface + * mainly provides meta information about the checkpoint. + */ +@PublicEvolving +public interface ManagedSnapshotContext { --- End diff -- Same comments as for `ManagedInitializationContext` hold here. 
[jira] [Commented] (FLINK-4844) Partitionable Raw Keyed/Operator State
[ https://issues.apache.org/jira/browse/FLINK-4844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15582789#comment-15582789 ] ASF GitHub Bot commented on FLINK-4844: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2648#discussion_r83678698 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ManagedInitializationContext.java --- @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.api.common.state.KeyedStateStore; +import org.apache.flink.api.common.state.OperatorStateStore; + +/** + * This interface provides a context in which operators can initialize by registering to managed state (i.e. state that + * is managed by state backends). + * + * + * Operator state is available to all operators, while keyed state is only available for operators after keyBy. + * + * + * For the purpose of initialization, the context signals if the state is empty (new operator) or was restored from + * a previous execution of this operator. 
+ * + */ +public interface ManagedInitializationContext { --- End diff -- I think this interface and its sub-interfaces/implementations should be in the same module as `AbstractStreamOperator` and somewhere in the api package space. Also, the naming could be changed to something like `StateInitializationContext` -> `FunctionInitializationContext` -> `OperatorInitializationContext`. Or something else reflecting their purpose, but `StateInitializationContext` should be at the bottom of the hierarchy.
[jira] [Commented] (FLINK-4844) Partitionable Raw Keyed/Operator State
[ https://issues.apache.org/jira/browse/FLINK-4844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15582795#comment-15582795 ] ASF GitHub Bot commented on FLINK-4844: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2648#discussion_r83683321 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorStateHandles.java --- @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.tasks; + +import org.apache.flink.runtime.state.ChainedStateHandle; +import org.apache.flink.runtime.state.KeyGroupsStateHandle; +import org.apache.flink.runtime.state.OperatorStateHandle; +import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.runtime.state.TaskStateHandles; +import org.apache.flink.util.CollectionUtil; +import org.apache.flink.util.Preconditions; + +import java.util.Collection; +import java.util.List; + +/** + * This class holds all state handles for one operator. + */ +public class OperatorStateHandles { --- End diff -- This should be `@Internal` or at least `@PublicEvolving`. 
Also, the name clashes a bit with `OperatorStateHandle` which does something quite different.
[jira] [Commented] (FLINK-4844) Partitionable Raw Keyed/Operator State
[ https://issues.apache.org/jira/browse/FLINK-4844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15582775#comment-15582775 ] ASF GitHub Bot commented on FLINK-4844: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2648#discussion_r83663696 --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java --- @@ -305,39 +306,42 @@ public void close() throws Exception { super.close(); } } - + // // Checkpoint and restore // - @Override - public void initializeState(OperatorStateStore stateStore) throws Exception { - this.stateStore = stateStore; + @Override + public void initializeState(FunctionInitializationContext context) throws Exception { - ListState offsets = - stateStore.getSerializableListState(OperatorStateStore.DEFAULT_OPERATOR_STATE_NAME); + OperatorStateStore stateStore = context.getManagedOperatorStateStore(); + offsetsStateForCheckpoint = stateStore.getSerializableListState(OperatorStateStore.DEFAULT_OPERATOR_STATE_NAME); - restoreToOffset = new HashMap<>(); + if (context.isRestored()) { + restoreToOffset = new HashMap<>(); + for (Serializable serializable : offsetsStateForCheckpoint.get()) { + @SuppressWarnings("unchecked") + Tuple2 kafkaOffset = (Tuple2) serializable; + restoreToOffset.put(kafkaOffset.f0, kafkaOffset.f1); + } - for (Serializable serializable : offsets.get()) { - @SuppressWarnings("unchecked") - Tuple2 kafkaOffset = (Tuple2) serializable; - restoreToOffset.put(kafkaOffset.f0, kafkaOffset.f1); + LOG.info("Setting restore state in the FlinkKafkaConsumer."); + if (LOG.isDebugEnabled()) { + LOG.debug("Using the following offsets: {}", restoreToOffset); + } + } else { + LOG.info("No restore state for FlinkKafkaConsumer."); } - - LOG.info("Setting restore state in the FlinkKafkaConsumer: {}", restoreToOffset); } @Override - public void prepareSnapshot(long 
checkpointId, long timestamp) throws Exception { + public void snapshotState(FunctionSnapshotContext context) throws Exception { if (!running) { LOG.debug("storeOperatorState() called on closed source"); } else { - ListState listState = - stateStore.getSerializableListState(OperatorStateStore.DEFAULT_OPERATOR_STATE_NAME); - listState.clear(); + offsetsStateForCheckpoint.clear(); final AbstractFetcher fetcher = this.kafkaFetcher; if (fetcher == null) { --- End diff -- This is a workaround for the fact that we initialise the fetcher in `run()` and not in `open()`. Might be worthwhile to change that in a follow-up, if at all possible.
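The new `initializeState`/`snapshotState` pair in this diff follows a simple pattern. On snapshot, the list state is cleared and refilled with one (partition, offset) entry per partition. On initialization, the partition-to-offset map is rebuilt only if the context reports a restore. The sketch below shows that pattern with plain JDK types. A `List<Serializable>` stands in for Flink's `ListState`, `AbstractMap.SimpleEntry` stands in for `Tuple2`, and partitions are identified by hypothetical string names. None of these substitutions is the connector's actual code.

```java
import java.io.Serializable;
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RestoreOffsetsSketch {

    /** Snapshot: clear the list state, then add one (partition, offset) entry per partition. */
    static void snapshotOffsets(List<Serializable> listState, Map<String, Long> currentOffsets) {
        listState.clear();
        for (Map.Entry<String, Long> e : currentOffsets.entrySet()) {
            listState.add(new SimpleEntry<String, Long>(e.getKey(), e.getValue()));
        }
    }

    /** Restore: returns null for a fresh start, otherwise rebuilds the partition-to-offset map. */
    static Map<String, Long> restoreOffsets(boolean isRestored, Iterable<Serializable> listState) {
        if (!isRestored) {
            return null; // corresponds to the "No restore state" branch
        }
        Map<String, Long> restoreToOffset = new HashMap<>();
        for (Serializable serializable : listState) {
            @SuppressWarnings("unchecked")
            Map.Entry<String, Long> entry = (Map.Entry<String, Long>) serializable;
            restoreToOffset.put(entry.getKey(), entry.getValue());
        }
        return restoreToOffset;
    }

    public static void main(String[] args) {
        Map<String, Long> current = new HashMap<>();
        current.put("topic-0", 42L);
        current.put("topic-1", 7L);

        List<Serializable> listState = new ArrayList<>();
        snapshotOffsets(listState, current);

        System.out.println(restoreOffsets(true, listState).equals(current)); // true
        System.out.println(restoreOffsets(false, listState)); // null
    }
}
```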
[jira] [Commented] (FLINK-4844) Partitionable Raw Keyed/Operator State
[ https://issues.apache.org/jira/browse/FLINK-4844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15582780#comment-15582780 ] ASF GitHub Bot commented on FLINK-4844: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2648#discussion_r83661983 --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java --- @@ -55,20 +56,20 @@ /** * Base class of all Flink Kafka Consumer data sources. * This implements the common behavior across all Kafka versions. - * + * --- End diff -- This file contains a lot of whitespace changes. It would be good to remove them before we merge this.
[jira] [Commented] (FLINK-4844) Partitionable Raw Keyed/Operator State
[ https://issues.apache.org/jira/browse/FLINK-4844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15582778#comment-15582778 ] ASF GitHub Bot commented on FLINK-4844: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2648#discussion_r83661373 --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java --- @@ -348,6 +345,11 @@ public void prepareSnapshot(long checkpointId, long timestamp) throws Exception } } + @Override --- End diff -- The methods don't need to be reordered here. Also, the state store is not used anywhere, as far as I can see.
[jira] [Commented] (FLINK-4844) Partitionable Raw Keyed/Operator State
[ https://issues.apache.org/jira/browse/FLINK-4844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15582781#comment-15582781 ] ASF GitHub Bot commented on FLINK-4844: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2648#discussion_r83669355 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java --- @@ -804,199 +792,15 @@ public boolean restoreLatestCheckpointedState( LOG.info("Restoring from latest valid checkpoint: {}.", latest); - for (Map.Entry taskGroupStateEntry: latest.getTaskStates().entrySet()) { - TaskState taskState = taskGroupStateEntry.getValue(); - ExecutionJobVertex executionJobVertex = tasks.get(taskGroupStateEntry.getKey()); - - if (executionJobVertex != null) { - // check that the number of key groups have not changed - if (taskState.getMaxParallelism() != executionJobVertex.getMaxParallelism()) { - throw new IllegalStateException("The maximum parallelism (" + - taskState.getMaxParallelism() + ") with which the latest " + - "checkpoint of the execution job vertex " + executionJobVertex + - " has been taken and the current maximum parallelism (" + - executionJobVertex.getMaxParallelism() + ") changed. This " + - "is currently not supported."); - } - - - int oldParallelism = taskState.getParallelism(); - int newParallelism = executionJobVertex.getParallelism(); - boolean parallelismChanged = oldParallelism != newParallelism; - boolean hasNonPartitionedState = taskState.hasNonPartitionedState(); - - if (hasNonPartitionedState && parallelismChanged) { - throw new IllegalStateException("Cannot restore the latest checkpoint because " + - "the operator " + executionJobVertex.getJobVertexId() + " has non-partitioned " + - "state and its parallelism changed. 
The operator" + executionJobVertex.getJobVertexId() + - " has parallelism " + newParallelism + " whereas the corresponding" + - "state object has a parallelism of " + oldParallelism); - } - - List keyGroupPartitions = createKeyGroupPartitions( - executionJobVertex.getMaxParallelism(), - newParallelism); - - // operator chain index -> list of the stored partitionables states from all parallel instances - @SuppressWarnings("unchecked") - List[] chainParallelStates = - new List[taskState.getChainLength()]; - - for (int i = 0; i < oldParallelism; ++i) { - - ChainedStateHandle partitionableState = - taskState.getPartitionableState(i); - - if (partitionableState != null) { - for (int j = 0; j < partitionableState.getLength(); ++j) { - OperatorStateHandle opParalleState = partitionableState.get(j); - if (opParalleState != null) { - List opParallelStates = - chainParallelStates[j]; - if (opParallelStates == null) { - opParallelSt
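The restore logic removed in this diff checks two preconditions before redistributing state. The maximum parallelism must be unchanged, and an operator with non-partitioned state must keep its parallelism. After that, key groups are split across the new subtasks. The body of `createKeyGroupPartitions` is not visible in this (truncated) diff, so the even, contiguous split below is an assumption. `RescaleCheckSketch` and its method signatures are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RescaleCheckSketch {

    /** Mirrors the two restore preconditions; throws if the checkpoint cannot be restored. */
    static void checkRestorable(int checkpointMaxParallelism, int currentMaxParallelism,
                                int oldParallelism, int newParallelism, boolean hasNonPartitionedState) {
        if (checkpointMaxParallelism != currentMaxParallelism) {
            throw new IllegalStateException("The maximum parallelism changed from "
                    + checkpointMaxParallelism + " to " + currentMaxParallelism + ".");
        }
        if (hasNonPartitionedState && oldParallelism != newParallelism) {
            throw new IllegalStateException("Non-partitioned state cannot be redistributed from parallelism "
                    + oldParallelism + " to " + newParallelism + ".");
        }
    }

    /** Splits key groups [0, maxParallelism) into contiguous inclusive ranges {start, end}, one per subtask. */
    static List<int[]> createKeyGroupPartitions(int maxParallelism, int parallelism) {
        List<int[]> ranges = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            int start = (i * maxParallelism + parallelism - 1) / parallelism; // ceil(i * max / p)
            int end = ((i + 1) * maxParallelism - 1) / parallelism;           // ceil((i + 1) * max / p) - 1
            ranges.add(new int[] {start, end});
        }
        return ranges;
    }

    public static void main(String[] args) {
        checkRestorable(10, 10, 2, 3, false); // passes: only partitioned state is rescaled

        // Prints: [0, 3] [4, 6] [7, 9]
        for (int[] range : createKeyGroupPartitions(10, 3)) {
            System.out.print(Arrays.toString(range) + " ");
        }
        System.out.println();

        try {
            checkRestorable(10, 10, 2, 3, true);
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Keeping the ranges contiguous means each new subtask reads one consecutive slice of key groups, and changing the parallelism only moves range boundaries.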
[jira] [Commented] (FLINK-4844) Partitionable Raw Keyed/Operator State
[ https://issues.apache.org/jira/browse/FLINK-4844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15582783#comment-15582783 ] ASF GitHub Bot commented on FLINK-4844: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2648#discussion_r83662569 --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java --- @@ -80,38 +81,38 @@ // private final List topics; - + /** The schema to convert between Kafka's byte messages, and Flink's objects */ protected final KeyedDeserializationSchema deserializer; /** The set of topic partitions that the source will read */ protected List subscribedPartitions; - + /** Optional timestamp extractor / watermark generator that will be run per Kafka partition, * to exploit per-partition timestamp characteristics. * The assigner is kept in serialized form, to deserialize it into multiple copies */ private SerializedValue> periodicWatermarkAssigner; - + /** Optional timestamp extractor / watermark generator that will be run per Kafka partition, -* to exploit per-partition timestamp characteristics. +* to exploit per-partition timestamp characteristics. * The assigner is kept in serialized form, to deserialize it into multiple copies */ private SerializedValue> punctuatedWatermarkAssigner; - private transient OperatorStateStore stateStore; + private transient ListState offsetsStateForCheckpoint; --- End diff -- This can have a more concrete type.
You changed `OperatorStateStore.getSerializableListState` to this: ``` ListState getSerializableListState(String stateName) throws Exception; ```
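The archive has stripped the generic parameters from the signature quoted above. The underlying idea is still clear. If the accessor declares its own type parameter, the caller chooses the element type, and a field such as `offsetsStateForCheckpoint` can hold a concretely typed list instead of a list of `Serializable`. Here is a minimal sketch of that idea. `TypedStateStoreSketch`, its `getSerializableListState` method and `PartitionOffset` are hypothetical, and a plain `List` stands in for Flink's `ListState`.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TypedStateStoreSketch {

    /** Hypothetical element type: one (partition, offset) pair. */
    static final class PartitionOffset implements Serializable {
        private static final long serialVersionUID = 1L;
        final String partition;
        final long offset;

        PartitionOffset(String partition, long offset) {
            this.partition = partition;
            this.offset = offset;
        }
    }

    private final Map<String, List<?>> states = new HashMap<>();

    /** The caller picks S, so the returned list (and any field holding it) keeps a concrete element type. */
    @SuppressWarnings("unchecked")
    public <S extends Serializable> List<S> getSerializableListState(String stateName) {
        List<?> state = states.get(stateName);
        if (state == null) {
            state = new ArrayList<S>();
            states.put(stateName, state);
        }
        return (List<S>) state;
    }

    public static void main(String[] args) {
        TypedStateStoreSketch store = new TypedStateStoreSketch();
        // No cast needed: S is inferred as PartitionOffset from the assignment target.
        List<PartitionOffset> offsets = store.getSerializableListState("offsets");
        offsets.add(new PartitionOffset("topic-0", 42L));

        List<PartitionOffset> sameState = store.getSerializableListState("offsets");
        System.out.println(sameState.get(0).partition + "=" + sameState.get(0).offset); // topic-0=42
    }
}
```

The unchecked cast moves into one place inside the store, so call sites such as the Kafka consumer's restore loop would no longer need their own `@SuppressWarnings("unchecked")` cast.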
[jira] [Commented] (FLINK-4844) Partitionable Raw Keyed/Operator State
[ https://issues.apache.org/jira/browse/FLINK-4844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15582790#comment-15582790 ] ASF GitHub Bot commented on FLINK-4844: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2648#discussion_r83680077 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/util/NonClosingStreamDecorator.java --- @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.util; + +import java.io.IOException; +import java.io.InputStream; + +public class NonClosingStreamDecorator extends InputStream { --- End diff -- It's quite clear what it does but Javadocs would still be nice.
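As the reviewer notes, the purpose of `NonClosingStreamDecorator` is easy to guess from its name. It forwards reads to a wrapped stream but does not close that stream, so code that closes whatever it is given does not break a shared stream the caller still needs. Below is a sketch of such a decorator, including the kind of Javadoc that was requested. `NonClosingInputStreamSketch` is a hypothetical name, and which methods the Flink class actually forwards is not shown in the diff.

```java
import java.io.ByteArrayInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;

/**
 * Wraps an input stream and forwards all reads to it, but ignores {@link #close()}.
 *
 * <p>Useful when a shared stream is handed to code that closes whatever it receives
 * (for example via try-with-resources) while the caller still needs the stream afterwards.
 */
public class NonClosingInputStreamSketch extends InputStream {

    private final InputStream delegate;

    public NonClosingInputStreamSketch(InputStream delegate) {
        this.delegate = delegate;
    }

    @Override
    public int read() throws IOException {
        return delegate.read();
    }

    @Override
    public int read(byte[] b, int off, int len) throws IOException {
        return delegate.read(b, off, len);
    }

    @Override
    public long skip(long n) throws IOException {
        return delegate.skip(n);
    }

    @Override
    public int available() throws IOException {
        return delegate.available();
    }

    /** Intentionally a no-op: whoever owns the delegate is responsible for closing it. */
    @Override
    public void close() {
    }

    public static void main(String[] args) throws IOException {
        boolean[] delegateClosed = {false};
        InputStream shared = new FilterInputStream(new ByteArrayInputStream(new byte[] {1, 2, 3})) {
            @Override
            public void close() throws IOException {
                delegateClosed[0] = true;
                super.close();
            }
        };

        try (InputStream view = new NonClosingInputStreamSketch(shared)) {
            System.out.println(view.read()); // 1
        }
        System.out.println(shared.read() + " " + delegateClosed[0]); // 2 false
    }
}
```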
[jira] [Commented] (FLINK-4844) Partitionable Raw Keyed/Operator State
[ https://issues.apache.org/jira/browse/FLINK-4844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15582082#comment-15582082 ] ASF GitHub Bot commented on FLINK-4844: --- Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/2648 Please review @aljoscha and whoever else wants to take a look.
[jira] [Commented] (FLINK-4844) Partitionable Raw Keyed/Operator State
[ https://issues.apache.org/jira/browse/FLINK-4844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15582079#comment-15582079 ] ASF GitHub Bot commented on FLINK-4844: --- GitHub user StefanRRichter opened a pull request: https://github.com/apache/flink/pull/2648 [FLINK-4844] Partitionable Raw Keyed/Operator State Partitionable operator and keyed state are currently only available by using backends. However, the serialization code for many operators is built around reading/writing their state to a stream for checkpointing. We want to provide partitionable states also through streams, so that migrating existing operators becomes easier. This PR includes the following main changes: # 1) `KeyedStateCheckpointOutputStream` and `OperatorStateCheckpointedOutputStream` These classes allow writing partitionable keyed (and operator) state to a stream for checkpointing. They enhance the basic stream interface with methods to signal the start of new partitions. # 2) Changes to `StreamTask` and `AbstractStreamOperator` The lifecycle of StreamTask is slightly modified for the initialization of operator states. In `AbstractStreamOperator`, two new hooks have been added that new operators can override: ``` /** * Stream operators with state, which want to participate in a snapshot need to override this hook method. * * @param context context that provides information and means required for taking a snapshot */ public void snapshotState(StateSnapshotContext context) throws Exception { } /** * Stream operators with state which can be restored need to override this hook method. * * @param context context that allows to register different states.
*/ public void initializeState(StateInitializationContext context) throws Exception { } ``` Access to snapshot/restore partitionable state is provided through the respective context. # 3) Exposing partitionable states to UDFs The interface `CheckpointedFunction` must be implemented by stateful UDFs: ``` /** * This method is called when a snapshot for a checkpoint is requested. This acts as a hook to the function to * ensure that all state is exposed by means previously offered through {@link FunctionInitializationContext} when * the Function was initialized, or offered now by {@link FunctionSnapshotContext} itself. * * @param context the context for drawing a snapshot of the operator * @throws Exception */ void snapshotState(FunctionSnapshotContext context) throws Exception; /** * This method is called when an operator is initialized, so that the function can set up its state through * the provided context. Initialization typically includes registering user states through the state stores * that the context offers. * * @param context the context for initializing the operator * @throws Exception */ void initializeState(FunctionInitializationContext context) throws Exception; ``` The contexts for initialization and snapshot provide the subset of the functionality of the internal contexts from `AbstractStreamOperator` that is safe to present to user code. # 4) This PR also introduces several classes that bundle state handles One example of this would be `TaskStateHandles`. The purpose of this is a) reducing the number of parameters passed through several methods and b) making adding/removing state handles simpler.
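Item 1 of the description says the new checkpoint output streams let the writer signal the start of each partition, so the data can be split again on restore. The sketch below shows one way this can work. The stream records the byte offset at which each partition begins, and a restoring subtask can jump straight to the partitions it owns. `PartitionedOutputStreamSketch` and `startNewPartition` are hypothetical names, and using integer partition ids mapped to in-memory offsets is an assumption rather than the layout used by `KeyedStateCheckpointOutputStream`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;

public class PartitionedOutputStreamSketch extends DataOutputStream {

    // partition id -> byte offset at which that partition's data starts
    private final Map<Integer, Integer> partitionOffsets = new LinkedHashMap<>();

    public PartitionedOutputStreamSketch() {
        super(new ByteArrayOutputStream());
    }

    /** Signals that all bytes written from now on belong to the given partition. */
    public void startNewPartition(int partitionId) {
        partitionOffsets.put(partitionId, size()); // size() = bytes written so far
    }

    public Map<Integer, Integer> getPartitionOffsets() {
        return partitionOffsets;
    }

    public byte[] toByteArray() {
        return ((ByteArrayOutputStream) out).toByteArray();
    }

    public static void main(String[] args) throws IOException {
        PartitionedOutputStreamSketch stream = new PartitionedOutputStreamSketch();
        stream.startNewPartition(0);
        stream.writeInt(10);
        stream.writeInt(11);
        stream.startNewPartition(1);
        stream.writeInt(20);
        System.out.println(stream.getPartitionOffsets()); // {0=0, 1=8}

        // A restoring subtask that owns only partition 1 seeks directly to its offset.
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(stream.toByteArray()));
        in.skipBytes(stream.getPartitionOffsets().get(1));
        System.out.println(in.readInt()); // 20
    }
}
```

Operators that already write their state to a stream only need to add the partition-start calls, which matches the migration goal stated at the top of the description.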
You can merge this pull request into a Git repository by running: $ git pull https://github.com/StefanRRichter/flink stream-keyed-state-checkpointing Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2648.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2648 commit 2f72bdfe9e39f76a766c5fad68500caa39e9a624 Author: Stefan Richter Date: 2016-10-04T08:59:38Z StreamCheckpointed operator state WIP commit f3a1ee0963ac8521cb9581dcb1ab6547c50c304f Author: Stefan Richter Date: 2016-10-14T10:15:07Z Added to One/TwoInputStreamOperatorTestHarness commit b3d7b683b6fd3e6a7fb2158e67fe94ac6e28e730 Author: Stefan Richter Date: 2016-10-17T09:38:50Z Small optimizations for GC friendliness. commit 614592d475486c1a2eff8e9fc24a423ff18a78bf Author: Stefan Richter Date: 2016-10-13T09:32:19Z Added AbstractUdfStreamOperatorLifecycleTest commit 886ca3e2d4362db2f5125b0c1ea1f2078042c456 Author: Stefan Richter Date: 2016-10-17T11:51:40Z Rename KeyedStateCheckpoin