Repository: flink
Updated Branches:
  refs/heads/master b4e8350f5 -> 2bba2b3f0


[FLINK-1638] [streaming] Operator state checkpointing and injection prototype


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/452c39a9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/452c39a9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/452c39a9

Branch: refs/heads/master
Commit: 452c39a965d93aab84d2fea84345badd2cc45975
Parents: a34869c
Author: Paris Carbone <seniorcarb...@gmail.com>
Authored: Wed Mar 4 18:30:09 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Mar 10 14:58:48 2015 +0100

----------------------------------------------------------------------
 .../deployment/TaskDeploymentDescriptor.java    |  26 +++++
 .../flink/runtime/executiongraph/Execution.java |  11 ++
 .../runtime/executiongraph/ExecutionGraph.java  |  10 ++
 .../runtime/executiongraph/ExecutionVertex.java |  19 +++-
 .../api/reader/AbstractRecordReader.java        |   5 +-
 .../jobgraph/tasks/OperatorStateCarrier.java    |  27 +++++
 .../flink/runtime/state/OperatorState.java      | 103 ++++++++++++++++++
 .../flink/runtime/state/StateCheckpoint.java    |  80 ++++++++++++++
 .../flink/runtime/jobmanager/JobManager.scala   |  38 +++++--
 .../runtime/jobmanager/StreamStateMonitor.scala |  78 +++++++++-----
 .../flink/runtime/taskmanager/TaskManager.scala |  15 ++-
 .../flink/streaming/api/StreamConfig.java       |   2 +-
 .../apache/flink/streaming/api/StreamGraph.java |   2 +-
 .../datastream/SingleOutputStreamOperator.java  |   2 +-
 .../api/streamvertex/StreamVertex.java          |  53 ++++++++--
 .../streamvertex/StreamingRuntimeContext.java   |   2 +-
 .../apache/flink/streaming/state/MapState.java  |   4 +-
 .../flink/streaming/state/OperatorState.java    | 105 -------------------
 .../streaming/state/PartitionableState.java     |   2 +
 .../flink/streaming/state/SimpleState.java      |   3 +-
 .../state/checkpoint/MapCheckpoint.java         |   3 +-
 .../state/checkpoint/StateCheckpoint.java       |  82 ---------------
 .../flink/streaming/state/MapStateTest.java     |   2 +-
 .../streaming/state/OperatorStateTest.java      |   3 +-
 .../streaming/examples/wordcount/WordCount.java |   5 +
 25 files changed, 439 insertions(+), 243 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/452c39a9/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
index 20204d5..a431a76 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
@@ -23,6 +23,7 @@ import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.jobgraph.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.state.OperatorState;
 
 import java.io.Serializable;
 import java.util.ArrayList;
@@ -76,6 +77,8 @@ public final class TaskDeploymentDescriptor implements 
Serializable {
 
        /** The list of JAR files required to run this task. */
        private final List<BlobKey> requiredJarFiles;
+       
+       private OperatorState operatorState;
 
        /**
         * Constructs a task deployment descriptor.
@@ -119,6 +122,21 @@ public final class TaskDeploymentDescriptor implements 
Serializable {
                this.requiredJarFiles = new ArrayList<BlobKey>();
        }
 
+       public TaskDeploymentDescriptor(
+                       JobID jobID, JobVertexID vertexID, ExecutionAttemptID 
executionId, String taskName,
+                       int indexInSubtaskGroup, int numberOfSubtasks, 
Configuration jobConfiguration,
+                       Configuration taskConfiguration, String 
invokableClassName,
+                       List<PartitionDeploymentDescriptor> producedPartitions,
+                       List<PartitionConsumerDeploymentDescriptor> 
consumedPartitions,
+                       List<BlobKey> requiredJarFiles, int targetSlotNumber, 
OperatorState operatorState) {
+
+               this(jobID, vertexID, executionId, taskName, 
indexInSubtaskGroup, numberOfSubtasks,
+                               jobConfiguration, taskConfiguration, 
invokableClassName, producedPartitions,
+                               consumedPartitions, requiredJarFiles, 
targetSlotNumber);
+               
+               setOperatorState(operatorState);
+       }
+
        /**
         * Returns the ID of the job the tasks belongs to.
         */
@@ -224,4 +242,12 @@ public final class TaskDeploymentDescriptor implements 
Serializable {
                                taskName, indexInSubtaskGroup, 
numberOfSubtasks, invokableClassName,
                                strProducedPartitions, strConsumedPartitions);
        }
+
+       public void setOperatorState(OperatorState operatorState) {
+               this.operatorState = operatorState;
+       }
+
+       public OperatorState getOperatorState() {
+               return operatorState;
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/452c39a9/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 57ed4c0..89f5183 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -46,6 +46,7 @@ import 
org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFutureAction;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.messages.TaskManagerMessages;
 import 
org.apache.flink.runtime.messages.TaskManagerMessages.TaskOperationResult;
+import org.apache.flink.runtime.state.OperatorState;
 import org.apache.flink.util.ExceptionUtils;
 import org.slf4j.Logger;
 
@@ -122,6 +123,8 @@ public class Execution implements Serializable {
        
        private volatile InstanceConnectionInfo assignedResourceLocation; // 
for the archived execution
        
+       private OperatorState operatorState;
+
        // 
--------------------------------------------------------------------------------------------
        
        public Execution(ExecutionVertex vertex, int attemptNumber, long 
startTimestamp, FiniteDuration timeout) {
@@ -853,4 +856,12 @@ public class Execution implements Serializable {
                return String.format("Attempt #%d (%s) @ %s - [%s]", 
attemptNumber, vertex.getSimpleName(),
                                (assignedResource == null ? "(unassigned)" : 
assignedResource.toString()), state);
        }
+
+       public void setOperatorState(OperatorState operatorState) {
+               this.operatorState = operatorState;
+       }
+
+       public OperatorState getOperatorState() {
+               return operatorState;
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/452c39a9/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index e6d9c85..bf34e33 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -33,11 +33,13 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.ScheduleMode;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.messages.ExecutionGraphMessages;
+import org.apache.flink.runtime.state.OperatorState;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.util.ExceptionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import scala.Tuple3;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.io.Serializable;
@@ -529,6 +531,14 @@ public class ExecutionGraph implements Serializable {
                        return false;
                }
        }
+       
+       public void loadOperatorStates(Map<Tuple3<JobVertexID, Integer, Long> 
,OperatorState<?>> states)
+       {
+               for(Map.Entry<Tuple3<JobVertexID, Integer, Long> 
,OperatorState<?>> state : states.entrySet())
+               {
+                       
tasks.get(state.getKey()._1()).getTaskVertices()[state.getKey()._2()].setOperatorState(state.getValue());
+               }
+       }
 
        public void scheduleOrUpdateConsumers(ExecutionAttemptID executionId, 
int partitionIndex) {
                Execution execution = currentExecutions.get(executionId);

http://git-wip-us.apache.org/repos/asf/flink/blob/452c39a9/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 0158fbf..b7f962a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -38,6 +38,7 @@ import 
org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
+import org.apache.flink.runtime.state.OperatorState;
 import org.slf4j.Logger;
 
 import scala.concurrent.duration.FiniteDuration;
@@ -89,6 +90,8 @@ public class ExecutionVertex implements Serializable {
        
        private volatile boolean scheduleLocalOnly;
        
+       private OperatorState operatorState;
+       
        // 
--------------------------------------------------------------------------------------------
 
        public ExecutionVertex(ExecutionJobVertex jobVertex, int subTaskIndex,
@@ -194,6 +197,14 @@ public class ExecutionVertex implements Serializable {
        public InstanceConnectionInfo getCurrentAssignedResourceLocation() {
                return currentExecution.getAssignedResourceLocation();
        }
+
+       public void setOperatorState(OperatorState operatorState) {
+               this.operatorState = operatorState;
+       }
+
+       public OperatorState getOperatorState() {
+               return operatorState;
+       }
        
        public ExecutionGraph getExecutionGraph() {
                return this.jobVertex.getGraph();
@@ -379,6 +390,12 @@ public class ExecutionVertex implements Serializable {
                                if (grp != null) {
                                        this.locationConstraint = 
grp.getLocationConstraint(subTaskIndex);
                                }
+                               
+                               if(operatorState!=null)
+                               {
+                                       
execution.setOperatorState(operatorState);
+                               }
+                               
                        }
                        else {
                                throw new IllegalStateException("Cannot reset a 
vertex that is in state " + state);
@@ -506,7 +523,7 @@ public class ExecutionVertex implements Serializable {
                return new TaskDeploymentDescriptor(getJobId(), 
getJobvertexId(), executionId, getTaskName(),
                                subTaskIndex, 
getTotalNumberOfParallelSubtasks(), getExecutionGraph().getJobConfiguration(),
                                jobVertex.getJobVertex().getConfiguration(), 
jobVertex.getJobVertex().getInvokableClassName(),
-                               producedPartitions, consumedPartitions, 
jarFiles, slot.getSlotNumber());
+                               producedPartitions, consumedPartitions, 
jarFiles, slot.getSlotNumber(), operatorState);
        }
 
        // 
--------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/452c39a9/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
index cc36438..920792c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
@@ -19,10 +19,7 @@
 package org.apache.flink.runtime.io.network.api.reader;
 
 import java.io.IOException;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.Queue;
-import java.util.Set;
+
 
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.runtime.event.task.AbstractEvent;

http://git-wip-us.apache.org/repos/asf/flink/blob/452c39a9/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/OperatorStateCarrier.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/OperatorStateCarrier.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/OperatorStateCarrier.java
new file mode 100644
index 0000000..6ea4f27
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/OperatorStateCarrier.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobgraph.tasks;
+
+import org.apache.flink.runtime.state.OperatorState;
+
+public interface OperatorStateCarrier {
+       
+       public void injectState(OperatorState state);
+       
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/452c39a9/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorState.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorState.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorState.java
new file mode 100644
index 0000000..74ea1a7
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorState.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import java.io.Serializable;
+
+/**
+ * Abstract class for representing operator states in Flink programs. By
+ * implementing the methods declared in this abstraction the state of the
+ * operator can be checkpointed by the fault tolerance mechanism.
+ *
+ * @param <T>
+ *            The type of the operator state.
+ */
+public abstract class OperatorState<T> implements Serializable {
+
+       private static final long serialVersionUID = 1L;
+
+       public T state;
+
+       /**
+        * Constructor used for initializing the state. In case of failure, the
+        * state will be reinitialized using this constructor, then
+        * {@link #restore(StateCheckpoint)} will be used to restore from the 
last
+        * available backup.
+        */
+       public OperatorState() {
+               state = null;
+       }
+
+       /**
+        * Initializes the state using the given state object.
+        * 
+        * @param initialState
+        *            The initial state object
+        */
+       public OperatorState(T initialState) {
+               state = initialState;
+       }
+
+       /**
+        * Returns the currently stored state object.
+        * 
+        * @return The state.
+        */
+       public T getState() {
+               return state;
+       }
+
+       /**
+        * Sets the current state object.
+        * 
+        * @param state
+        *            The new state object.
+        * @return The operator state with the new state object set.
+        */
+       public OperatorState<T> setState(T state) {
+               this.state = state;
+               return this;
+       }
+
+       /**
+        * Creates a {@link StateCheckpoint} that will be used to back up the 
state
+        * for failure recovery.
+        * 
+        * @return The {@link StateCheckpoint} created.
+        */
+       public abstract StateCheckpoint<T> checkpoint();
+
+       /**
+        * Restores the state from the given {@link StateCheckpoint}.
+        * 
+        * @param checkpoint
+        *            The checkpoint to restore from
+        * @return The restored operator state.
+        */
+       public abstract OperatorState<T> restore(StateCheckpoint<T> checkpoint);
+
+       @Override
+       public String toString() {
+               return state.toString();
+       }
+
+       public boolean stateEquals(OperatorState<T> other) {
+               return state.equals(other.state);
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/452c39a9/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateCheckpoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateCheckpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateCheckpoint.java
new file mode 100644
index 0000000..4e4906e
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateCheckpoint.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import java.io.Serializable;
+
+/**
+ * Base class for creating checkpoints for {@link OperatorState}. These
+ * checkpoints are used to back up states in stateful Flink operators and
+ * also to restore them in case of node failure. To allow incremental
+ * checkpoints, override the {@link #update(StateCheckpoint)} method.
+ * 
+ * @param <T>
+ *            The type of the state.
+ */
+public class StateCheckpoint<T> implements Serializable {
+
+       private static final long serialVersionUID = 1L;
+
+       public T checkpointedState;
+
+       /**
+        * Creates a state checkpoint from the given {@link OperatorState}
+        * 
+        * @param operatorState
+        *            The {@link OperatorState} to checkpoint.
+        */
+       public StateCheckpoint(OperatorState<T> operatorState) {
+               this.checkpointedState = operatorState.getState();
+       }
+
+       public StateCheckpoint() {
+               this.checkpointedState = null;
+       }
+
+       /**
+        * Returns the state object for the checkpoint.
+        * 
+        * @return The checkpointed state object.
+        */
+       public T getCheckpointedState() {
+               return checkpointedState;
+       }
+
+       /**
+        * Updates this checkpoint from the next one. Override this method to allow
+        * incremental updates.
+        * 
+        * @param nextCheckpoint
+        *            The {@link StateCheckpoint} to update from.
+        */
+       public StateCheckpoint<T> update(StateCheckpoint<T> nextCheckpoint) {
+               this.checkpointedState = nextCheckpoint.getCheckpointedState();
+               return this;
+       }
+
+       @Override
+       public String toString() {
+               return checkpointedState.toString();
+       }
+
+       public boolean stateEquals(StateCheckpoint<T> other) {
+               return checkpointedState.equals(other.checkpointedState);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/452c39a9/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 4f58ba7..97a6099 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -301,13 +301,19 @@ class JobManager(val configuration: Configuration,
                 jobInfo.client ! Failure(exception)
                 throw exception
             }
+            
             barrierMonitors.get(jobID) match {
                          case Some(monitor) =>
-                              monitor ! PoisonPill
-                              barrierMonitors.remove(jobID)
+                           newJobStatus match{
+                             case JobStatus.FINISHED | JobStatus.CANCELED =>
+                               monitor ! PoisonPill
+                               barrierMonitors.remove(jobID)
+                             case JobStatus.FAILING => 
+                               monitor ! JobStateRequest
+                           }
                           case None =>
+                            removeJob(jobID)
                         }
-            removeJob(jobID)
           }
           else {
             newJobStatus match {
@@ -315,7 +321,10 @@ class JobManager(val configuration: Configuration,
               case Some((executionGraph, _)) => 
               //FIXME this is just a fast n dirty check for determining 
streaming jobs 
               if (executionGraph.getScheduleMode == ScheduleMode.ALL) {
-                barrierMonitors += jobID -> StreamStateMonitor.props(context, 
executionGraph)
+                barrierMonitors.get(jobID) match {
+                  case None => 
+                    barrierMonitors += jobID -> 
StreamStateMonitor.props(context, executionGraph)
+                }
               }
               case None =>
                 log.error("Cannot create state monitor for job ID {}.", jobID)
@@ -327,12 +336,27 @@ class JobManager(val configuration: Configuration,
           removeJob(jobID)
       }
 
-    case BarrierAck(jobID, jobVertex, instanceID, checkpoint) =>
-      barrierMonitors.get(jobID) match {
-        case Some(monitor) => monitor ! BarrierAck(jobID, jobVertex, 
instanceID, checkpoint)
+    case msg: BarrierAck =>
+      barrierMonitors.get(msg.jobID) match {
+        case Some(monitor) => monitor ! msg
+        case None =>
+      }
+    case msg: StateBarrierAck =>
+      barrierMonitors.get(msg.jobID) match {
+        case Some(monitor) => monitor ! msg
         case None =>
       }
 
+    case msg: JobStateResponse =>
+      //inject initial states and restart the job
+      currentJobs.get(msg.jobID) match {
+        case Some(jobExecution) =>
+          import scala.collection.JavaConverters._
+          jobExecution._1.loadOperatorStates(msg.opStates.asJava)
+          jobExecution._1.restart()
+        case None =>
+      } 
+      
     case ScheduleOrUpdateConsumers(jobId, executionId, partitionIndex) =>
       currentJobs.get(jobId) match {
         case Some((executionGraph, _)) =>

http://git-wip-us.apache.org/repos/asf/flink/blob/452c39a9/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamStateMonitor.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamStateMonitor.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamStateMonitor.scala
index a37ddb5..65840f9 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamStateMonitor.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamStateMonitor.scala
@@ -20,65 +20,82 @@ package org.apache.flink.runtime.jobmanager
 
 import akka.actor._
 import org.apache.flink.runtime.ActorLogMessages
-import org.apache.flink.runtime.executiongraph.{ExecutionAttemptID, 
ExecutionGraph, ExecutionVertex}
-import org.apache.flink.runtime.jobgraph.{JobID, JobVertexID}
+import 
org.apache.flink.runtime.executiongraph.{ExecutionAttemptID,ExecutionGraph,ExecutionVertex}
+import org.apache.flink.runtime.jobgraph.{JobID,JobVertexID}
+import org.apache.flink.runtime.state.OperatorState
 
-import scala.collection.JavaConversions.mapAsScalaMap
+import java.lang.Long
+import scala.collection.JavaConversions._
 import scala.collection.immutable.TreeMap
 import scala.concurrent.ExecutionContext.Implicits.global
-import scala.concurrent.duration.{FiniteDuration, _}
+import scala.concurrent.duration.{FiniteDuration,_}
 
 
 object StreamStateMonitor {
 
-  def props(context: ActorContext, executionGraph: ExecutionGraph,
+  def props(context: ActorContext,executionGraph: ExecutionGraph,
             interval: FiniteDuration = 5 seconds): ActorRef = {
 
     val vertices: Iterable[ExecutionVertex] = 
getExecutionVertices(executionGraph)
     val monitor = context.system.actorOf(Props(new 
StreamStateMonitor(executionGraph,
-      vertices, vertices.map(x => ((x.getJobVertex.getJobVertexId, 
x.getParallelSubtaskIndex), List.empty[Long])).toMap, interval, 0L, -1L)))
+      vertices,vertices.map(x => 
((x.getJobVertex.getJobVertexId,x.getParallelSubtaskIndex),
+              List.empty[Long])).toMap,Map(),interval,0L,-1L)))
     monitor ! InitBarrierScheduler
     monitor
   }
 
   private def getExecutionVertices(executionGraph: ExecutionGraph): 
Iterable[ExecutionVertex] = {
-    for ((_, execJobVertex) <- executionGraph.getAllVertices;
-         execVertex: ExecutionVertex <- execJobVertex.getTaskVertices)
+    for((_,execJobVertex) <- executionGraph.getAllVertices;
+        execVertex: ExecutionVertex <- execJobVertex.getTaskVertices)
     yield execVertex
   }
 }
 
 class StreamStateMonitor(val executionGraph: ExecutionGraph,
-                         val vertices: Iterable[ExecutionVertex], var acks: 
Map[(JobVertexID, Int), List[Long]],
-                         val interval: FiniteDuration, var curId: Long, var 
ackId: Long)
+                         val vertices: Iterable[ExecutionVertex],
+                         var acks: Map[(JobVertexID,Int),List[Long]],
+                         var states: Map[(JobVertexID, Integer, Long), 
OperatorState[_]],
+                         val interval: FiniteDuration,var curId: Long,var 
ackId: Long)
         extends Actor with ActorLogMessages with ActorLogging {
 
   override def receiveWithLogMessages: Receive = {
+    
     case InitBarrierScheduler =>
-      context.system.scheduler.schedule(interval, interval, self, 
BarrierTimeout)
-      context.system.scheduler.schedule(2 * interval, 2 * interval, self, 
UpdateCurrentBarrier)
+      context.system.scheduler.schedule(interval,interval,self,BarrierTimeout)
+      context.system.scheduler.schedule(2 * interval,2 * 
interval,self,TriggerBarrierCompaction)
       log.debug("[FT-MONITOR] Started Stream State Monitor for job {}{}",
-        executionGraph.getJobID, executionGraph.getJobName)
+        executionGraph.getJobID,executionGraph.getJobName)
+      
     case BarrierTimeout =>
       curId += 1
       log.debug("[FT-MONITOR] Sending Barrier to vertices of Job " + 
executionGraph.getJobName)
       vertices.filter(v => 
v.getJobVertex.getJobVertex.isInputVertex).foreach(vertex
       => vertex.getCurrentAssignedResource.getInstance.getTaskManager
-                ! BarrierReq(vertex.getCurrentExecutionAttempt.getAttemptId, 
curId))
-    case BarrierAck(_, jobVertexID, instanceID, checkpointID) =>
-      acks.get(jobVertexID, instanceID) match {
+                ! 
BarrierReq(vertex.getCurrentExecutionAttempt.getAttemptId,curId))
+      
+    case StateBarrierAck(jobID, jobVertexID, instanceID, checkpointID, 
opState) =>
+      states += (jobVertexID, instanceID, checkpointID) -> opState  
+      self ! BarrierAck(jobID, jobVertexID, instanceID, checkpointID)
+      
+    case BarrierAck(jobID, jobVertexID,instanceID,checkpointID) =>
+      acks.get(jobVertexID,instanceID) match {
         case Some(acklist) =>
-          acks += (jobVertexID, instanceID) -> (checkpointID :: acklist)
+          acks += (jobVertexID,instanceID) -> (checkpointID :: acklist)
         case None =>
       }
-      log.info(acks.toString)
-    case UpdateCurrentBarrier =>
-      val barrierCount = acks.values.foldLeft(TreeMap[Long, 
Int]().withDefaultValue(0))((dict, myList)
-      => myList.foldLeft(dict)((dict2, elem) => dict2.updated(elem, 
dict2(elem) + 1)))
+      log.debug(acks.toString)
+      
+    case TriggerBarrierCompaction =>
+      val barrierCount = 
acks.values.foldLeft(TreeMap[Long,Int]().withDefaultValue(0))((dict,myList)
+      => myList.foldLeft(dict)((dict2,elem) => dict2.updated(elem,dict2(elem) 
+ 1)))
       val keysToKeep = barrierCount.filter(_._2 == acks.size).keys
-      ackId = if (!keysToKeep.isEmpty) keysToKeep.max else ackId
-      acks.keys.foreach(x => acks = acks.updated(x, acks(x).filter(_ >= 
ackId)))
+      ackId = if(!keysToKeep.isEmpty) keysToKeep.max else ackId
+      acks.keys.foreach(x => acks = acks.updated(x,acks(x).filter(_ >= ackId)))
+      states = states.filterKeys(_._3 >= ackId)
       log.debug("[FT-MONITOR] Last global barrier is " + ackId)
+
+    case JobStateRequest =>
+      sender ! JobStateResponse(executionGraph.getJobID, ackId, states)
   }
 }
 
@@ -86,11 +103,20 @@ case class BarrierTimeout()
 
 case class InitBarrierScheduler()
 
-case class UpdateCurrentBarrier()
+case class TriggerBarrierCompaction()
+
+case class JobStateRequest()
+
+case class JobStateResponse(jobID: JobID, barrierID: Long, opStates: 
Map[(JobVertexID, Integer, 
+        Long), OperatorState[_]])
+
+case class BarrierReq(attemptID: ExecutionAttemptID,checkpointID: Long)
 
-case class BarrierReq(attemptID: ExecutionAttemptID, checkpointID: Long)
+case class BarrierAck(jobID: JobID,jobVertexID: JobVertexID,instanceID: 
Int,checkpointID: Long)
 
-case class BarrierAck(jobID: JobID, jobVertexID: JobVertexID, instanceID: Int, 
checkpointID: Long)
+case class StateBarrierAck(jobID: JobID, jobVertexID: JobVertexID, instanceID: 
Integer,
+                           checkpointID: Long, state: OperatorState[_])
+       
 
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/452c39a9/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 4271141..497e784 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -43,7 +43,7 @@ import 
org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync
 import org.apache.flink.runtime.io.network.NetworkEnvironment
 import org.apache.flink.runtime.io.network.netty.NettyConfig
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID
-import org.apache.flink.runtime.jobgraph.tasks.BarrierTransceiver
+import 
org.apache.flink.runtime.jobgraph.tasks.{OperatorStateCarrier,BarrierTransceiver}
 import org.apache.flink.runtime.jobmanager.{BarrierReq,JobManager}
 import org.apache.flink.runtime.memorymanager.DefaultMemoryManager
 import 
org.apache.flink.runtime.messages.JobManagerMessages.UpdateTaskExecutionState
@@ -358,7 +358,9 @@ class TaskManager(val connectionInfo: 
InstanceConnectionInfo,
           if (i.getExecutionState == ExecutionState.RUNNING) {
             i.getEnvironment.getInvokable match {
               case barrierTransceiver: BarrierTransceiver =>
-                barrierTransceiver.broadcastBarrier(checkpointID)
+                new Thread(new Runnable {
+                  override def run(): Unit =  
barrierTransceiver.broadcastBarrier(checkpointID);
+                }).start()
               case _ => log.error("[FT-TaskManager] Received a barrier for the 
wrong vertex")
             }
           }
@@ -415,6 +417,15 @@ class TaskManager(val connectionInfo: 
InstanceConnectionInfo,
       task = new Task(jobID, vertexID, taskIndex, numSubtasks, executionID,
         tdd.getTaskName, self)
 
+      //inject operator state
+      if(tdd.getOperatorState != null)
+      {
+        val vertex = task.getEnvironment.getInvokable match {
+          case opStateCarrier: OperatorStateCarrier =>
+            opStateCarrier.injectState(tdd.getOperatorState)
+        }
+      }
+      
       runningTasks.put(executionID, task) match {
         case Some(_) => throw new RuntimeException(
           s"TaskManager contains already a task with executionID 
$executionID.")

http://git-wip-us.apache.org/repos/asf/flink/blob/452c39a9/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
index d464ef1..7c90629 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
@@ -32,7 +32,7 @@ import 
org.apache.flink.streaming.api.invokable.StreamInvokable;
 import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
 import org.apache.flink.streaming.api.streamvertex.StreamVertexException;
 import org.apache.flink.streaming.partitioner.StreamPartitioner;
-import org.apache.flink.streaming.state.OperatorState;
+import org.apache.flink.runtime.state.OperatorState;
 import org.apache.flink.util.InstantiationUtil;
 
 public class StreamConfig implements Serializable {

http://git-wip-us.apache.org/repos/asf/flink/blob/452c39a9/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
index f69605b..640416d 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
@@ -46,7 +46,7 @@ import 
org.apache.flink.streaming.api.streamvertex.StreamIterationHead;
 import org.apache.flink.streaming.api.streamvertex.StreamIterationTail;
 import org.apache.flink.streaming.api.streamvertex.StreamVertex;
 import org.apache.flink.streaming.partitioner.StreamPartitioner;
-import org.apache.flink.streaming.state.OperatorState;
+import org.apache.flink.runtime.state.OperatorState;
 import org.apache.sling.commons.json.JSONArray;
 import org.apache.sling.commons.json.JSONException;
 import org.apache.sling.commons.json.JSONObject;

http://git-wip-us.apache.org/repos/asf/flink/blob/452c39a9/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
index cdf43ee..b0fc364 100755
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
@@ -26,7 +26,7 @@ import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.invokable.StreamInvokable;
 import 
org.apache.flink.streaming.api.invokable.StreamInvokable.ChainingStrategy;
 import org.apache.flink.streaming.api.streamvertex.StreamingRuntimeContext;
-import org.apache.flink.streaming.state.OperatorState;
+import org.apache.flink.runtime.state.OperatorState;
 
 /**
  * The SingleOutputStreamOperator represents a user defined transformation

http://git-wip-us.apache.org/repos/asf/flink/blob/452c39a9/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
index e2cdc34..24a90d0 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
@@ -25,15 +25,17 @@ import org.apache.flink.runtime.event.task.TaskEvent;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.BarrierTransceiver;
+import org.apache.flink.runtime.jobgraph.tasks.OperatorStateCarrier;
 import org.apache.flink.runtime.jobmanager.BarrierAck;
+import org.apache.flink.runtime.jobmanager.StateBarrierAck;
 import org.apache.flink.runtime.util.event.EventListener;
+import org.apache.flink.runtime.state.OperatorState;
 import org.apache.flink.streaming.api.StreamConfig;
 import org.apache.flink.streaming.api.invokable.ChainableInvokable;
 import org.apache.flink.streaming.api.invokable.StreamInvokable;
 import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
 import org.apache.flink.streaming.io.CoReaderIterator;
 import org.apache.flink.streaming.io.IndexedReaderIterator;
-import org.apache.flink.streaming.state.OperatorState;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
 import org.apache.flink.util.StringUtils;
@@ -43,7 +45,7 @@ import org.slf4j.LoggerFactory;
 import akka.actor.ActorRef;
 
 public class StreamVertex<IN, OUT> extends AbstractInvokable implements 
StreamTaskContext<OUT>,
-               BarrierTransceiver {
+               BarrierTransceiver, OperatorStateCarrier {
 
        private static final Logger LOG = 
LoggerFactory.getLogger(StreamVertex.class);
 
@@ -90,9 +92,27 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable 
implements StreamTa
                this.context = 
createRuntimeContext(getEnvironment().getTaskName(), this.states);
        }
 
+       protected <T> void invokeUserFunction(StreamInvokable<?, T> 
userInvokable) throws Exception {
+               userInvokable.setRuntimeContext(context);
+               userInvokable.open(getTaskConfiguration());
+
+               for (ChainableInvokable<?, ?> invokable : 
outputHandler.chainedInvokables) {
+                       invokable.setRuntimeContext(context);
+                       invokable.open(getTaskConfiguration());
+               }
+
+               userInvokable.invoke();
+               userInvokable.close();
+
+               for (ChainableInvokable<?, ?> invokable : 
outputHandler.chainedInvokables) {
+                       invokable.close();
+               }
+
+       }
+
        @Override
        public void broadcastBarrier(long id) {
-               // Only called at input vertices
+               //Only called at input vertices
                if (LOG.isDebugEnabled()) {
                        LOG.debug("Received barrier from jobmanager: " + id);
                }
@@ -101,9 +121,21 @@ public class StreamVertex<IN, OUT> extends 
AbstractInvokable implements StreamTa
 
        @Override
        public void confirmBarrier(long barrierID) {
-               getEnvironment().getJobManager().tell(
-                               new BarrierAck(getEnvironment().getJobID(), 
getEnvironment().getJobVertexId(),
-                                               
context.getIndexOfThisSubtask(), barrierID), ActorRef.noSender());
+               
+               if(states != null && states.containsKey("kafka"))
+               {
+                       getEnvironment().getJobManager().tell(
+                                       new 
StateBarrierAck(getEnvironment().getJobID(), 
+                                                       
getEnvironment().getJobVertexId(), context.getIndexOfThisSubtask(), 
+                                                       barrierID, 
states.get("kafka")), ActorRef.noSender());
+               }
+               else
+               {
+                       getEnvironment().getJobManager().tell(
+                                       new 
BarrierAck(getEnvironment().getJobID(), getEnvironment().getJobVertexId(),
+                                                       
context.getIndexOfThisSubtask(), barrierID), ActorRef.noSender());      
+               }
+               
        }
 
        public void setInputsOutputs() {
@@ -240,7 +272,8 @@ public class StreamVertex<IN, OUT> extends 
AbstractInvokable implements StreamTa
        private void actOnBarrier(long id) {
                try {
                        outputHandler.broadcastBarrier(id);
-                       System.out.println("Superstep " + id + " processed: " + 
StreamVertex.this);
+                       //TODO checkpoint state here
+                       confirmBarrier(id);
                        if (LOG.isDebugEnabled()) {
                                LOG.debug("Superstep " + id + " processed: " + 
StreamVertex.this);
                        }
@@ -256,6 +289,12 @@ public class StreamVertex<IN, OUT> extends 
AbstractInvokable implements StreamTa
                return configuration.getOperatorName() + " (" + 
context.getIndexOfThisSubtask() + ")";
        }
 
+       @Override
+       public void injectState(OperatorState state) {
+               states.put("kafka", state);
+       }
+       
+
        private class SuperstepEventListener implements 
EventListener<TaskEvent> {
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/452c39a9/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java
index 0daf3c2..a47100b 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java
@@ -27,7 +27,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.operators.util.TaskConfig;
-import org.apache.flink.streaming.state.OperatorState;
+import org.apache.flink.runtime.state.OperatorState;
 
 /**
  * Implementation of the {@link RuntimeContext}, created by runtime stream UDF

http://git-wip-us.apache.org/repos/asf/flink/blob/452c39a9/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/MapState.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/MapState.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/MapState.java
index 85aec52..1b861f5 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/MapState.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/MapState.java
@@ -23,8 +23,10 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.flink.runtime.state.OperatorState;
 import org.apache.flink.streaming.state.checkpoint.MapCheckpoint;
-import org.apache.flink.streaming.state.checkpoint.StateCheckpoint;
+import org.apache.flink.runtime.state.StateCheckpoint;
+
 
 /**
  * A Map that can be used as a partitionable operator state, for both fault

http://git-wip-us.apache.org/repos/asf/flink/blob/452c39a9/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/OperatorState.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/OperatorState.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/OperatorState.java
deleted file mode 100644
index a0cedba..0000000
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/OperatorState.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.state;
-
-import java.io.Serializable;
-
-import org.apache.flink.streaming.state.checkpoint.StateCheckpoint;
-
-/**
- * Abstract class for representing operator states in Flink programs. By
- * implementing the methods declared in this abstraction the state of the
- * operator can be checkpointed by the fault tolerance mechanism.
- *
- * @param <T>
- *            The type of the operator state.
- */
-public abstract class OperatorState<T> implements Serializable {
-
-       private static final long serialVersionUID = 1L;
-
-       protected T state;
-
-       /**
-        * Constructor used for initializing the state. In case of failure, the
-        * state will be reinitialized using this constructor, then
-        * {@link #restore(StateCheckpoint)} will be used to restore from the last
-        * available backup.
-        */
-       public OperatorState() {
-               state = null;
-       }
-
-       /**
-        * Initializes the state using the given state object.
-        * 
-        * @param initialState
-        *            The initial state object
-        */
-       public OperatorState(T initialState) {
-               state = initialState;
-       }
-
-       /**
-        * Returns the currently stored state object.
-        * 
-        * @return The state.
-        */
-       public T getState() {
-               return state;
-       }
-
-       /**
-        * Sets the current state object.
-        * 
-        * @param state
-        *            The new state object.
-        * @return The operator state with the new state object set.
-        */
-       public OperatorState<T> setState(T state) {
-               this.state = state;
-               return this;
-       }
-
-       /**
-        * Creates a {@link StateCheckpoint} that will be used to backup the state
-        * for failure recovery.
-        * 
-        * @return The {@link StateCheckpoint} created.
-        */
-       public abstract StateCheckpoint<T> checkpoint();
-
-       /**
-        * Restores the state from the given {@link StateCheckpoint}.
-        * 
-        * @param checkpoint
-        *            The checkpoint to restore from
-        * @return The restored operator.
-        */
-       public abstract OperatorState<T> restore(StateCheckpoint<T> checkpoint);
-
-       @Override
-       public String toString() {
-               return state.toString();
-       }
-
-       public boolean stateEquals(OperatorState<T> other) {
-               return state.equals(other.state);
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/452c39a9/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/PartitionableState.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/PartitionableState.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/PartitionableState.java
index c58c545..ddedcd9 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/PartitionableState.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/PartitionableState.java
@@ -17,6 +17,8 @@
 
 package org.apache.flink.streaming.state;
 
+import org.apache.flink.runtime.state.OperatorState;
+
 /**
  * Base class for representing operator states that can be repartitioned for
  * state state and load balancing.

http://git-wip-us.apache.org/repos/asf/flink/blob/452c39a9/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/SimpleState.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/SimpleState.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/SimpleState.java
index 7ae1f81..b76f5ac 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/SimpleState.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/SimpleState.java
@@ -17,7 +17,8 @@
 
 package org.apache.flink.streaming.state;
 
-import org.apache.flink.streaming.state.checkpoint.StateCheckpoint;
+import org.apache.flink.runtime.state.OperatorState;
+import org.apache.flink.runtime.state.StateCheckpoint;
 
 /**
 * Basic {@link OperatorState} for storing and updating simple objects. By default the

http://git-wip-us.apache.org/repos/asf/flink/blob/452c39a9/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/checkpoint/MapCheckpoint.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/checkpoint/MapCheckpoint.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/checkpoint/MapCheckpoint.java
index 15d1fd5..ee27d4f 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/checkpoint/MapCheckpoint.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/checkpoint/MapCheckpoint.java
@@ -21,8 +21,9 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.flink.runtime.state.OperatorState;
+import org.apache.flink.runtime.state.StateCheckpoint;
 import org.apache.flink.streaming.state.MapState;
-import org.apache.flink.streaming.state.OperatorState;
 
 public class MapCheckpoint<K, V> extends StateCheckpoint<Map<K, V>> {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/452c39a9/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/checkpoint/StateCheckpoint.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/checkpoint/StateCheckpoint.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/checkpoint/StateCheckpoint.java
deleted file mode 100644
index 8b76245..0000000
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/checkpoint/StateCheckpoint.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.state.checkpoint;
-
-import java.io.Serializable;
-
-import org.apache.flink.streaming.state.OperatorState;
-
-/**
- * Base class for creating checkpoints for {@link OperatorState}. This
- * checkpoints will be used to backup states in stateful Flink operators and
- * also to restore them in case of node failure. To allow incremental
- * checkpoints override the {@link #update(StateCheckpoint)} method.
- * 
- * @param <T>
- *            The type of the state.
- */
-public class StateCheckpoint<T> implements Serializable {
-
-       private static final long serialVersionUID = 1L;
-
-       T checkpointedState;
-
-       /**
-        * Creates a state checkpoint from the given {@link OperatorState}
-        * 
-        * @param operatorState
-        *            The {@link OperatorState} to checkpoint.
-        */
-       public StateCheckpoint(OperatorState<T> operatorState) {
-               this.checkpointedState = operatorState.getState();
-       }
-
-       public StateCheckpoint() {
-               this.checkpointedState = null;
-       }
-
-       /**
-        * Returns the state object for the checkpoint.
-        * 
-        * @return The checkpointed state object.
-        */
-       public T getCheckpointedState() {
-               return checkpointedState;
-       }
-
-       /**
-        * Updates the checkpoint from next one. Override this method to allow
-        * incremental updates.
-        * 
-        * @param nextCheckpoint
-        *            The {@link StateCheckpoint} will be used to update from.
-        */
-       public StateCheckpoint<T> update(StateCheckpoint<T> nextCheckpoint) {
-               this.checkpointedState = nextCheckpoint.getCheckpointedState();
-               return this;
-       }
-
-       @Override
-       public String toString() {
-               return checkpointedState.toString();
-       }
-
-       public boolean stateEquals(StateCheckpoint<T> other) {
-               return checkpointedState.equals(other.checkpointedState);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/452c39a9/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/state/MapStateTest.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/state/MapStateTest.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/state/MapStateTest.java
index 194403c..98bafe4 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/state/MapStateTest.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/state/MapStateTest.java
@@ -25,7 +25,7 @@ import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.flink.streaming.state.checkpoint.MapCheckpoint;
-import org.apache.flink.streaming.state.checkpoint.StateCheckpoint;
+import org.apache.flink.runtime.state.StateCheckpoint;
 import org.junit.Test;
 
 public class MapStateTest {

http://git-wip-us.apache.org/repos/asf/flink/blob/452c39a9/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/state/OperatorStateTest.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/state/OperatorStateTest.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/state/OperatorStateTest.java
index 6cb8f51..4e07a3f 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/state/OperatorStateTest.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/state/OperatorStateTest.java
@@ -20,7 +20,8 @@ package org.apache.flink.streaming.state;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
-import org.apache.flink.streaming.state.checkpoint.StateCheckpoint;
+import org.apache.flink.runtime.state.OperatorState;
+import org.apache.flink.runtime.state.StateCheckpoint;
 import org.junit.Test;
 
 public class OperatorStateTest {

http://git-wip-us.apache.org/repos/asf/flink/blob/452c39a9/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
 
b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
index c207d60..b7a1ba3 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
@@ -101,6 +101,11 @@ public class WordCount {
 
                        // emit the pairs
                        for (String token : tokens) {
+                               //FIXME to be removed. added this for test purposes
+                               if("killme".equals(token))
+                               {
+                                       throw new Exception("byee");
+                               }
                                if (token.length() > 0) {
                                        out.collect(new Tuple2<String, 
Integer>(token, 1));
                                }

Reply via email to