[ 
https://issues.apache.org/jira/browse/FLINK-4844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15585088#comment-15585088
 ] 

ASF GitHub Bot commented on FLINK-4844:
---------------------------------------

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2648#discussion_r83822468
  
    --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java ---
    @@ -199,30 +228,56 @@ public Environment getEnvironment() {
        }
     
        /**
    -    * Calls
    -    * {@link org.apache.flink.streaming.api.operators.StreamOperator#setup(StreamTask, StreamConfig, Output)} ()}
    +    * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#setup(StreamTask, StreamConfig, Output)} ()}
         */
        public void setup() throws Exception {
                operator.setup(mockTask, config, new MockOutput());
                setupCalled = true;
        }
     
        /**
    -    * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#open()}. This also
    -    * calls {@link org.apache.flink.streaming.api.operators.StreamOperator#setup(StreamTask, StreamConfig, Output)}
    +    * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#initializeState(OperatorStateHandles)}.
    +    * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#setup(StreamTask, StreamConfig, Output)}
         * if it was not called before.
         */
    -   public void open() throws Exception {
    +   public void initializeState(OperatorStateHandles operatorStateHandles) throws Exception {
                if (!setupCalled) {
                        setup();
                }
    +           operator.initializeState(operatorStateHandles);
    +           initializeCalled = true;
    +   }
    +
    +   /**
    +    * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#open()}.
    +    * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#initializeState(OperatorStateHandles)} if it
    +    * was not called before.
    +    */
    +   public void open() throws Exception {
    +           if (!initializeCalled) {
    +                   initializeState(null);
    +           }
                operator.open();
        }
     
        /**
         *
         */
    -   public StreamStateHandle snapshot(long checkpointId, long timestamp) throws Exception {
    +   public SnapshotInProgressSubtaskState snapshot(long checkpointId, long timestamp) throws Exception {
    --- End diff --
    
    I think the idea of having a single method is nice, and if there is no 
special reason to keep the old signature, I suggest doing it the other way 
around. `OperatorSnapshotResult` is already a container for all operator 
states (except the legacy state, which will be removed in the near future). 
Using it removes the need for the multiplexing.
    
    However, `OperatorSnapshotResult` does not contain the legacy state 
anymore, so for the time being, we might return a Tuple2 of both, or some 
special container class which could also strip away the `RunnableFuture` part.
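    
    A minimal sketch of what such a special container class could look like, 
assuming the harness resolves the pending future itself so that tests never 
see the `RunnableFuture`. The class name `HarnessSnapshot`, the method 
`resolve`, and the generic type parameters are hypothetical placeholders and 
not part of Flink; in the harness they would stand in for the legacy state 
handle and the operator state types.

```java
import java.util.concurrent.RunnableFuture;

/**
 * Hypothetical container (not a Flink class): pairs the legacy state with the
 * already-resolved operator state, so callers never handle a RunnableFuture.
 */
final class HarnessSnapshot<L, S> {
    private final L legacyState;   // stand-in for the legacy state handle
    private final S operatorState; // stand-in for the resolved operator state

    private HarnessSnapshot(L legacyState, S operatorState) {
        this.legacyState = legacyState;
        this.operatorState = operatorState;
    }

    /**
     * Runs the pending snapshot future (if there is one) and stores only its
     * result, which strips away the RunnableFuture part for the caller.
     */
    static <L, S> HarnessSnapshot<L, S> resolve(L legacyState, RunnableFuture<S> pending)
            throws Exception {
        S resolved = null;
        if (pending != null) {
            pending.run();            // no-op if the future has already completed
            resolved = pending.get(); // rethrows a snapshot failure as ExecutionException
        }
        return new HarnessSnapshot<>(legacyState, resolved);
    }

    L getLegacyState() {
        return legacyState;
    }

    S getOperatorState() {
        return operatorState;
    }
}
```

    With something like this, `snapshot(...)` in the harness could return one 
object that covers both kinds of state, and the legacy field could be dropped 
later without changing the method signature again.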
    
    What do you think?


> Partitionable Raw Keyed/Operator State
> --------------------------------------
>
>                 Key: FLINK-4844
>                 URL: https://issues.apache.org/jira/browse/FLINK-4844
>             Project: Flink
>          Issue Type: New Feature
>            Reporter: Stefan Richter
>            Assignee: Stefan Richter
>
> Partitionable operator and keyed state are currently only available by using 
> backends. However, the serialization code for many operators is built around 
> reading/writing their state to a stream for checkpointing. We want to provide 
> partitionable states also through streams, so that migrating existing 
> operators becomes easier.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
