[ https://issues.apache.org/jira/browse/FLINK-4844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15582796#comment-15582796 ]
ASF GitHub Bot commented on FLINK-4844:
---------------------------------------

Github user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/2648#discussion_r83685681

--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java ---
```
@@ -199,30 +228,56 @@ public Environment getEnvironment() {
     }
 
     /**
-     * Calls
-     * {@link org.apache.flink.streaming.api.operators.StreamOperator#setup(StreamTask, StreamConfig, Output)} ()}
+     * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#setup(StreamTask, StreamConfig, Output)} ()}
      */
     public void setup() throws Exception {
         operator.setup(mockTask, config, new MockOutput());
         setupCalled = true;
     }
 
     /**
-     * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#open()}. This also
-     * calls {@link org.apache.flink.streaming.api.operators.StreamOperator#setup(StreamTask, StreamConfig, Output)}
+     * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#initializeState(OperatorStateHandles)}.
+     * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#setup(StreamTask, StreamConfig, Output)}
      * if it was not called before.
      */
-    public void open() throws Exception {
+    public void initializeState(OperatorStateHandles operatorStateHandles) throws Exception {
         if (!setupCalled) {
             setup();
         }
+        operator.initializeState(operatorStateHandles);
+        initializeCalled = true;
+    }
+
+    /**
+     * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#open()}.
+     * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#initializeState(OperatorStateHandles)} if it
+     * was not called before.
+     */
+    public void open() throws Exception {
+        if (!initializeCalled) {
+            initializeState(null);
+        }
         operator.open();
     }
 
     /**
      *
      */
-    public StreamStateHandle snapshot(long checkpointId, long timestamp) throws Exception {
+    public SnapshotInProgressSubtaskState snapshot(long checkpointId, long timestamp) throws Exception {
```
--- End diff --

I think we can keep the old method signature by doing something like this:

```
/**
 * Calls {@link StreamOperator#snapshotState(long, long, CheckpointStreamFactory)}.
 */
public final StreamStateHandle snapshot(long checkpointId, long timestamp) throws Exception {
    synchronized (checkpointLock) {
        CheckpointStreamFactory.CheckpointStateOutputStream outStream = stateBackend.createStreamFactory(
                new JobID(),
                "test_op").createCheckpointStateOutputStream(checkpointId, timestamp);

        // Operators implementing StreamCheckpointedOperator write their state directly into the stream.
        if (operator instanceof StreamCheckpointedOperator) {
            ((StreamCheckpointedOperator) operator).snapshotState(outStream, checkpointId, timestamp);
        }

        RunnableFuture<OperatorStateHandle> snapshotRunnable = operator.snapshotState(
                checkpointId,
                timestamp,
                stateBackend.createStreamFactory(new JobID(), "test_op"));

        // Marker byte: 1 = a serialized OperatorStateHandle follows, 0 = no handle.
        if (snapshotRunnable != null) {
            outStream.write(1);
            snapshotRunnable.run();
            OperatorStateHandle operatorStateHandle = snapshotRunnable.get();

            InstantiationUtil.serializeObject(outStream, operatorStateHandle);
        } else {
            outStream.write(0);
        }

        snapshotToStream(checkpointId, timestamp, outStream);

        return outStream.closeAndGetHandle();
    }
}
```

This multiplexes the results of the different operator snapshotting methods into the same stream. The restore method then picks the individual parts back out of the stream and hands each one to the matching operator method. This would let all tests use the same method, and we can keep the name/signature the same if we evolve the operator/snapshot interfaces.
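The multiplexing idea in the suggested `snapshot()` can be illustrated without any Flink classes. The following is a minimal, hypothetical sketch using only `java.io`: the names `MultiplexedSnapshotSketch`, `snapshot`, `restore` and `Restored` are made up for illustration, a byte array stands in for the state a `StreamCheckpointedOperator` writes, and any `Serializable` object stands in for the `OperatorStateHandle`. The writer emits the parts in a fixed order with a marker byte, and the reader consumes them in exactly the same order.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical illustration only; none of these names are Flink APIs.
// Stream layout: [int rawLength][raw bytes][marker byte][serialized handle, only if marker == 1]
final class MultiplexedSnapshotSketch {

    private MultiplexedSnapshotSketch() {}

    /** The parts recovered from one multiplexed snapshot stream. */
    static final class Restored {
        final byte[] rawState;
        final Serializable handle; // null when the marker byte was 0

        Restored(byte[] rawState, Serializable handle) {
            this.rawState = rawState;
            this.handle = handle;
        }
    }

    /** Writes the raw state and an optional handle into a single byte stream. */
    static byte[] snapshot(byte[] rawState, Serializable handle) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);

        // Length prefix tells the reader where the raw part ends.
        out.writeInt(rawState.length);
        out.write(rawState);

        // Marker byte: 1 = a serialized handle follows, 0 = nothing follows.
        if (handle != null) {
            out.writeByte(1);
            ObjectOutputStream objectOut = new ObjectOutputStream(out);
            objectOut.writeObject(handle);
            objectOut.flush();
        } else {
            out.writeByte(0);
        }
        out.flush();
        return bytes.toByteArray();
    }

    /** Reads the parts back in exactly the order snapshot() wrote them. */
    static Restored restore(byte[] snapshot) throws IOException, ClassNotFoundException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(snapshot));

        byte[] raw = new byte[in.readInt()];
        in.readFully(raw);

        Serializable handle = null;
        if (in.readByte() == 1) {
            ObjectInputStream objectIn = new ObjectInputStream(in);
            handle = (Serializable) objectIn.readObject();
        }
        return new Restored(raw, handle);
    }
}
```

Round-tripping `snapshot(new byte[] {7, 8, 9}, "handle-42")` through `restore(...)` gives back the same three bytes and the same string; with a `null` handle, only the 4-byte length prefix and a `0` marker byte are written. As in the suggested harness code, the marker byte is what tells the reader whether to deserialize a handle at all. The length prefix on the raw part is an addition in this sketch: a plain byte array, unlike an operator's own serialization format, carries no information about where it ends.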
> Partitionable Raw Keyed/Operator State
> --------------------------------------
>
>                 Key: FLINK-4844
>                 URL: https://issues.apache.org/jira/browse/FLINK-4844
>             Project: Flink
>          Issue Type: New Feature
>            Reporter: Stefan Richter
>            Assignee: Stefan Richter
>
> Partitionable operator and keyed state are currently only available through
> the state backends. However, the serialization code for many operators is built
> around reading/writing their state to a stream for checkpointing. We want to
> also provide partitionable state through streams, so that migrating existing
> operators becomes easier.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)