[ https://issues.apache.org/jira/browse/FLINK-4844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15582796#comment-15582796 ]

ASF GitHub Bot commented on FLINK-4844:
---------------------------------------

Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2648#discussion_r83685681
  
    --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java ---
    @@ -199,30 +228,56 @@ public Environment getEnvironment() {
        }
     
        /**
    -    * Calls
    -    * {@link org.apache.flink.streaming.api.operators.StreamOperator#setup(StreamTask, StreamConfig, Output)} ()}
    +    * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#setup(StreamTask, StreamConfig, Output)} ()}
         */
        public void setup() throws Exception {
                operator.setup(mockTask, config, new MockOutput());
                setupCalled = true;
        }
     
        /**
    -    * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#open()}. This also
    -    * calls {@link org.apache.flink.streaming.api.operators.StreamOperator#setup(StreamTask, StreamConfig, Output)}
    +    * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#initializeState(OperatorStateHandles)}.
    +    * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#setup(StreamTask, StreamConfig, Output)}
         * if it was not called before.
         */
    -   public void open() throws Exception {
    +   public void initializeState(OperatorStateHandles operatorStateHandles) throws Exception {
                if (!setupCalled) {
                        setup();
                }
    +           operator.initializeState(operatorStateHandles);
    +           initializeCalled = true;
    +   }
    +
    +   /**
    +    * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#open()}.
    +    * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#initializeState(OperatorStateHandles)} if it
    +    * was not called before.
    +    */
    +   public void open() throws Exception {
    +           if (!initializeCalled) {
    +                   initializeState(null);
    +           }
                operator.open();
        }
     
        /**
         *
         */
    -   public StreamStateHandle snapshot(long checkpointId, long timestamp) throws Exception {
    +   public SnapshotInProgressSubtaskState snapshot(long checkpointId, long timestamp) throws Exception {
    --- End diff --
    
    I think we can keep the old method signature by doing something like this:
    ```
        /**
         * Calls {@link StreamOperator#snapshotState(long, long, CheckpointStreamFactory)} and, for operators
         * that implement {@link StreamCheckpointedOperator}, also their stream-based snapshotState method,
         * writing all results into a single checkpoint stream.
         */
        public final StreamStateHandle snapshot(long checkpointId, long timestamp) throws Exception {
            synchronized (checkpointLock) {
                // One output stream that carries the results of every snapshot method.
                CheckpointStreamFactory.CheckpointStateOutputStream outStream = stateBackend.createStreamFactory(
                        new JobID(),
                        "test_op").createCheckpointStateOutputStream(checkpointId, timestamp);

                // Legacy stream-based snapshot, only for operators that implement it.
                if (operator instanceof StreamCheckpointedOperator) {
                    ((StreamCheckpointedOperator) operator).snapshotState(
                            outStream,
                            checkpointId,
                            timestamp);
                }

                // Snapshot that yields an OperatorStateHandle through a RunnableFuture.
                RunnableFuture<OperatorStateHandle> snapshotRunnable = operator.snapshotState(
                        checkpointId,
                        timestamp,
                        stateBackend.createStreamFactory(new JobID(), "test_op"));

                // Marker byte: 1 = a serialized OperatorStateHandle follows, 0 = none.
                if (snapshotRunnable != null) {
                    outStream.write(1);
                    snapshotRunnable.run();
                    OperatorStateHandle operatorStateHandle = snapshotRunnable.get();

                    InstantiationUtil.serializeObject(outStream, operatorStateHandle);
                } else {
                    outStream.write(0);
                }

                snapshotToStream(checkpointId, timestamp, outStream);

                return outStream.closeAndGetHandle();
            }
        }
    ```
    This multiplexes the results of the different operator snapshot methods into the same stream. The restore method then just picks the items out of the stream in order and hands each one to the matching operator method.
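To make the multiplexing idea concrete, here is a minimal sketch in plain Java, under stated assumptions. It is not Flink code: the class `MultiplexedSnapshot`, its three string "sections", and the length prefix in front of the serialized handle are all hypothetical. Like the snippet above, it writes the stream-based state first, then a marker byte (1 if a handle follows, 0 otherwise), then the remaining harness state, and the restore side reads the sections back in exactly that order.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

public class MultiplexedSnapshot {

    /** Sections recovered from one multiplexed snapshot stream (hypothetical layout). */
    public static final class Restored {
        public final String legacyState;   // written by the stream-based snapshot method
        public final String managedHandle; // null when the marker byte was 0
        public final String harnessState;  // written last, similar to snapshotToStream(...)

        Restored(String legacyState, String managedHandle, String harnessState) {
            this.legacyState = legacyState;
            this.managedHandle = managedHandle;
            this.harnessState = harnessState;
        }
    }

    /** Writes all sections, in a fixed order, into a single byte stream. */
    public static byte[] snapshot(String legacyState, String managedHandleOrNull, String harnessState)
            throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(legacyState);
            if (managedHandleOrNull != null) {
                out.writeByte(1); // marker: a serialized handle follows
                byte[] serialized = javaSerialize(managedHandleOrNull);
                out.writeInt(serialized.length); // length prefix: reader consumes exactly these bytes
                out.write(serialized);
            } else {
                out.writeByte(0); // marker: no handle
            }
            out.writeUTF(harnessState);
        }
        return bytes.toByteArray();
    }

    /** Reads the sections back in the same order they were written. */
    public static Restored restore(byte[] snapshot) throws IOException, ClassNotFoundException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(snapshot))) {
            String legacy = in.readUTF();
            String handle = null;
            if (in.readByte() == 1) {
                byte[] serialized = new byte[in.readInt()];
                in.readFully(serialized);
                handle = (String) javaDeserialize(serialized);
            }
            String harness = in.readUTF();
            return new Restored(legacy, handle, harness);
        }
    }

    private static byte[] javaSerialize(Object o) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bytes)) {
            oos.writeObject(o);
        }
        return bytes.toByteArray();
    }

    private static Object javaDeserialize(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Restored withHandle = restore(snapshot("legacy", "handle-42", "harness"));
        System.out.println(withHandle.legacyState + " " + withHandle.managedHandle + " " + withHandle.harnessState);
        Restored withoutHandle = restore(snapshot("legacy", null, "harness"));
        System.out.println(withoutHandle.managedHandle);
    }
}
```

The length prefix is a choice made only for this sketch, so that deserializing the handle can never consume bytes that belong to the next section; the snippet in the review instead serializes the handle directly into the checkpoint stream with `InstantiationUtil.serializeObject`.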
    
    This would let all tests use the same method, and we could keep its name and signature unchanged even as the operator/snapshot interfaces evolve.
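The lazy lifecycle chaining introduced in the diff (`open()` calls `initializeState(null)` if it has not run yet, which in turn calls `setup()` if that has not run yet) can be sketched without any Flink dependencies. In this sketch the `Operator` interface and `RecordingOperator` are hypothetical stand-ins for `StreamOperator`, and a plain `Object` takes the place of `OperatorStateHandles`.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical minimal harness illustrating lazy lifecycle chaining (not Flink's class). */
public class LazyLifecycleHarness {

    /** Hypothetical operator with the three lifecycle steps from the diff. */
    public interface Operator {
        void setup();
        void initializeState(Object stateHandles); // null means "nothing to restore"
        void open();
    }

    private final Operator operator;
    private boolean setupCalled;
    private boolean initializeCalled;

    public LazyLifecycleHarness(Operator operator) {
        this.operator = operator;
    }

    public void setup() {
        operator.setup();
        setupCalled = true;
    }

    /** Runs setup() first if a test skipped it. */
    public void initializeState(Object stateHandles) {
        if (!setupCalled) {
            setup();
        }
        operator.initializeState(stateHandles);
        initializeCalled = true;
    }

    /** Runs initializeState(null) first if a test skipped it, i.e. starts without restored state. */
    public void open() {
        if (!initializeCalled) {
            initializeState(null);
        }
        operator.open();
    }

    /** Records the order in which lifecycle methods were invoked. */
    public static final class RecordingOperator implements Operator {
        public final List<String> calls = new ArrayList<>();

        public void setup() { calls.add("setup"); }

        public void initializeState(Object stateHandles) { calls.add("initializeState(" + stateHandles + ")"); }

        public void open() { calls.add("open"); }
    }

    public static void main(String[] args) {
        RecordingOperator op = new RecordingOperator();
        new LazyLifecycleHarness(op).open();
        System.out.println(op.calls); // [setup, initializeState(null), open]
    }
}
```

A test that only wants a fresh operator can call `open()` directly and get the full setup, initializeState(null), open sequence, while a restore test calls `initializeState(handles)` first so that `open()` does not initialize a second time.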


> Partitionable Raw Keyed/Operator State
> --------------------------------------
>
>                 Key: FLINK-4844
>                 URL: https://issues.apache.org/jira/browse/FLINK-4844
>             Project: Flink
>          Issue Type: New Feature
>            Reporter: Stefan Richter
>            Assignee: Stefan Richter
>
> Partitionable operator and keyed state are currently only available through 
> state backends. However, the serialization code for many operators is built 
> around reading/writing their state to a stream for checkpointing. We want to 
> provide partitionable state through streams as well, so that migrating 
> existing operators becomes easier.
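
One way to make raw keyed state partitionable through a stream is to write it as one section per key group and remember where each section starts, so that after rescaling a subtask can read back only the key groups assigned to it. The sketch below illustrates that idea in plain Java under simplifying assumptions: the class and method names are hypothetical, state is a `Map<String, Integer>`, and a key's group is `Math.floorMod(key.hashCode(), numKeyGroups)`, which is a simplification and not Flink's actual key-group assignment.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class KeyGroupPartitionedStream {

    /** Raw snapshot bytes plus the start offset of every key group's section. */
    public static final class Snapshot {
        public final byte[] bytes;
        public final int[] offsets; // offsets[g] = position where key group g's section starts

        Snapshot(byte[] bytes, int[] offsets) {
            this.bytes = bytes;
            this.offsets = offsets;
        }
    }

    /** Simplified key-group assignment (assumption; not Flink's hashing scheme). */
    static int keyGroupFor(String key, int numKeyGroups) {
        return Math.floorMod(key.hashCode(), numKeyGroups);
    }

    /** Writes one section per key group: an entry count followed by (key, value) pairs. */
    public static Snapshot snapshot(Map<String, Integer> state, int numKeyGroups) throws IOException {
        List<Map<String, Integer>> groups = new ArrayList<>();
        for (int g = 0; g < numKeyGroups; g++) {
            groups.add(new TreeMap<>());
        }
        for (Map.Entry<String, Integer> e : state.entrySet()) {
            groups.get(keyGroupFor(e.getKey(), numKeyGroups)).put(e.getKey(), e.getValue());
        }

        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        int[] offsets = new int[numKeyGroups];
        for (int g = 0; g < numKeyGroups; g++) {
            offsets[g] = out.size(); // number of bytes written so far = start of this section
            out.writeInt(groups.get(g).size());
            for (Map.Entry<String, Integer> e : groups.get(g).entrySet()) {
                out.writeUTF(e.getKey());
                out.writeInt(e.getValue());
            }
        }
        out.flush();
        return new Snapshot(bytes.toByteArray(), offsets);
    }

    /** Restores only key groups from..to (inclusive), jumping straight to each section. */
    public static Map<String, Integer> restore(Snapshot snapshot, int from, int to) throws IOException {
        Map<String, Integer> restored = new TreeMap<>();
        for (int g = from; g <= to; g++) {
            int start = snapshot.offsets[g];
            DataInputStream in = new DataInputStream(
                    new ByteArrayInputStream(snapshot.bytes, start, snapshot.bytes.length - start));
            int count = in.readInt();
            for (int i = 0; i < count; i++) {
                String key = in.readUTF();
                restored.put(key, in.readInt());
            }
        }
        return restored;
    }

    public static void main(String[] args) throws IOException {
        Map<String, Integer> state = new HashMap<>();
        state.put("a", 1);
        state.put("b", 2);
        state.put("c", 3);
        state.put("d", 4);
        Snapshot snap = snapshot(state, 4);
        // Two subtasks after rescaling: one owns key groups 0..1, the other 2..3.
        System.out.println(restore(snap, 0, 1)); // {a=1, d=4}
        System.out.println(restore(snap, 2, 3)); // {b=2, c=3}
    }
}
```

Because each section is self-describing (count, then entries) and its start offset is known, the same stream can be split among any number of subtasks without rewriting it.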



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
