[ https://issues.apache.org/jira/browse/FLINK-4844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15585088#comment-15585088 ]
ASF GitHub Bot commented on FLINK-4844: --------------------------------------- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/2648#discussion_r83822468 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java --- @@ -199,30 +228,56 @@ public Environment getEnvironment() { } /** - * Calls - * {@link org.apache.flink.streaming.api.operators.StreamOperator#setup(StreamTask, StreamConfig, Output)} ()} + * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#setup(StreamTask, StreamConfig, Output)} ()} */ public void setup() throws Exception { operator.setup(mockTask, config, new MockOutput()); setupCalled = true; } /** - * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#open()}. This also - * calls {@link org.apache.flink.streaming.api.operators.StreamOperator#setup(StreamTask, StreamConfig, Output)} + * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#initializeState(OperatorStateHandles)}. + * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#setup(StreamTask, StreamConfig, Output)} * if it was not called before. */ - public void open() throws Exception { + public void initializeState(OperatorStateHandles operatorStateHandles) throws Exception { if (!setupCalled) { setup(); } + operator.initializeState(operatorStateHandles); + initializeCalled = true; + } + + /** + * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#open()}. + * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#initializeState(OperatorStateHandles)} if it + * was not called before. + */ + public void open() throws Exception { + if (!initializeCalled) { + initializeState(null); + } operator.open(); } /** * */ - public StreamStateHandle snapshot(long checkpointId, long timestamp) throws Exception { + public SnapshotInProgressSubtaskState snapshot(long checkpointId, long timestamp) throws Exception { --- End diff -- I think the idea of having a single method is nice, and if there is no special reason why we should keep the old signature, I suggest to do it the other way around. `OperatorSnapshotResult`is already a container for all operator states (except the legacy state that will be removed in the near future). Using this removed the need for the multiplexing. However, `OperatorSnapshotResult` does not contain the legacy state anymore, so for the time being, we might return a Tuple2 of both, or some special container class which could also strip away the `RunnableFuture` part. What do you think? > Partitionable Raw Keyed/Operator State > -------------------------------------- > > Key: FLINK-4844 > URL: https://issues.apache.org/jira/browse/FLINK-4844 > Project: Flink > Issue Type: New Feature > Reporter: Stefan Richter > Assignee: Stefan Richter > > Partitionable operator and keyed state are currently only available by using > backends. However, the serialization code for many operators is build around > reading/writing their state to a stream for checkpointing. We want to provide > partitionable states also through streams, so that migrating existing > operators becomes more easy. -- This message was sent by Atlassian JIRA (v6.3.4#6332)