[FLINK-1638] [streaming] Add StateHandle and include javadoc

This closes #459


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f2b5c21d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f2b5c21d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f2b5c21d

Branch: refs/heads/master
Commit: f2b5c21da6a297f20ffad99e9f26ccb0a9491881
Parents: 490fa70
Author: Paris Carbone <seniorcarb...@gmail.com>
Authored: Mon Mar 9 14:58:30 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Mar 10 14:58:49 2015 +0100

----------------------------------------------------------------------
 .../deployment/TaskDeploymentDescriptor.java    | 13 ++--
 .../flink/runtime/executiongraph/Execution.java | 13 ++--
 .../runtime/executiongraph/ExecutionGraph.java  |  6 +-
 .../runtime/executiongraph/ExecutionVertex.java | 29 ++++----
 .../jobgraph/tasks/BarrierTransceiver.java      | 16 ++++-
 .../jobgraph/tasks/OperatorStateCarrier.java    | 15 ++--
 .../flink/runtime/state/LocalStateHandle.java   | 41 +++++++++++
 .../apache/flink/runtime/state/StateHandle.java | 40 +++++++++++
 .../StreamCheckpointCoordinator.scala           | 76 +++++++++++++-------
 .../flink/runtime/taskmanager/TaskManager.scala |  2 +-
 .../api/streamvertex/StreamVertex.java          |  9 ++-
 11 files changed, 191 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f2b5c21d/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
index b2573f7..6993248 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
@@ -23,12 +23,11 @@ import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.jobgraph.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.state.OperatorState;
+import org.apache.flink.runtime.state.StateHandle;
 
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
@@ -79,7 +78,7 @@ public final class TaskDeploymentDescriptor implements 
Serializable {
        /** The list of JAR files required to run this task. */
        private final List<BlobKey> requiredJarFiles;
        
-       private Map<String, OperatorState<?>> operatorStates;
+       private StateHandle operatorStates;
 
        /**
         * Constructs a task deployment descriptor.
@@ -129,13 +128,13 @@ public final class TaskDeploymentDescriptor implements 
Serializable {
                        Configuration taskConfiguration, String 
invokableClassName,
                        List<PartitionDeploymentDescriptor> producedPartitions,
                        List<PartitionConsumerDeploymentDescriptor> 
consumedPartitions,
-                       List<BlobKey> requiredJarFiles, int targetSlotNumber, 
Map<String,OperatorState<?>> operatorStates) {
+                       List<BlobKey> requiredJarFiles, int targetSlotNumber, 
StateHandle operatorStates) {
 
                this(jobID, vertexID, executionId, taskName, 
indexInSubtaskGroup, numberOfSubtasks,
                                jobConfiguration, taskConfiguration, 
invokableClassName, producedPartitions,
                                consumedPartitions, requiredJarFiles, 
targetSlotNumber);
                
-               setOperatorStates(operatorStates);
+               setOperatorState(operatorStates);
        }
 
        /**
@@ -244,11 +243,11 @@ public final class TaskDeploymentDescriptor implements 
Serializable {
                                strProducedPartitions, strConsumedPartitions);
        }
 
-       public void setOperatorStates(Map<String,OperatorState<?>> 
operatorStates) {
+       public void setOperatorState(StateHandle operatorStates) {
                this.operatorStates = operatorStates;
        }
 
-       public Map<String, OperatorState<?>> getOperatorStates() {
+       public StateHandle getOperatorStates() {
                return operatorStates;
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f2b5c21d/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 93845c7..cf24b20 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -46,7 +46,7 @@ import 
org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFutureAction;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.messages.TaskManagerMessages;
 import 
org.apache.flink.runtime.messages.TaskManagerMessages.TaskOperationResult;
-import org.apache.flink.runtime.state.OperatorState;
+import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.util.ExceptionUtils;
 import org.slf4j.Logger;
 
@@ -56,7 +56,6 @@ import scala.concurrent.duration.FiniteDuration;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.TimeoutException;
@@ -124,7 +123,7 @@ public class Execution implements Serializable {
        
        private volatile InstanceConnectionInfo assignedResourceLocation; // 
for the archived execution
        
-       private Map<String,OperatorState<?>> operatorStates;
+       private StateHandle operatorState;
 
        // 
--------------------------------------------------------------------------------------------
        
@@ -858,11 +857,11 @@ public class Execution implements Serializable {
                                (assignedResource == null ? "(unassigned)" : 
assignedResource.toString()), state);
        }
 
-       public void setOperatorStates(Map<String,OperatorState<?>> 
operatorStates) {
-               this.operatorStates = operatorStates;
+       public void setOperatorState(StateHandle operatorStates) {
+               this.operatorState = operatorStates;
        }
 
-       public Map<String,OperatorState<?>> getOperatorStates() {
-               return operatorStates;
+       public StateHandle getOperatorState() {
+               return operatorState;
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f2b5c21d/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 0c6c3a7..c319a5c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -34,7 +34,7 @@ import org.apache.flink.runtime.jobgraph.ScheduleMode;
 import org.apache.flink.runtime.jobmanager.StreamCheckpointCoordinator;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.messages.ExecutionGraphMessages;
-import org.apache.flink.runtime.state.OperatorState;
+import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.util.ExceptionUtils;
 import org.slf4j.Logger;
@@ -567,9 +567,9 @@ public class ExecutionGraph implements Serializable {
                }
        }
        
-       public synchronized void loadOperatorStates(Map<Tuple3<JobVertexID, 
Integer, Long> , Map<String,OperatorState<?>>> states)
+       public synchronized void loadOperatorStates(Map<Tuple3<JobVertexID, 
Integer, Long> , StateHandle> states)
        {
-               for(Map.Entry<Tuple3<JobVertexID, Integer, Long> , 
Map<String,OperatorState<?>>> state : states.entrySet())
+               for(Map.Entry<Tuple3<JobVertexID, Integer, Long> , StateHandle> 
state : states.entrySet())
                {
                        
tasks.get(state.getKey()._1()).getTaskVertices()[state.getKey()._2()].setOperatorState(state.getValue());
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/f2b5c21d/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 41d34d5..24bcf21 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -18,17 +18,17 @@
 
 package org.apache.flink.runtime.executiongraph;
 
-import org.apache.flink.runtime.deployment.PartialPartitionInfo;
-import org.apache.flink.runtime.instance.InstanceConnectionInfo;
-import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.deployment.PartialPartitionInfo;
 import 
org.apache.flink.runtime.deployment.PartitionConsumerDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.PartitionDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.PartitionInfo;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobEdge;
@@ -38,9 +38,8 @@ import 
org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
-import org.apache.flink.runtime.state.OperatorState;
+import org.apache.flink.runtime.state.StateHandle;
 import org.slf4j.Logger;
-
 import scala.concurrent.duration.FiniteDuration;
 
 import java.io.Serializable;
@@ -48,13 +47,9 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.CopyOnWriteArrayList;
 
 import static com.google.common.base.Preconditions.checkElementIndex;
-import static org.apache.flink.runtime.execution.ExecutionState.CANCELED;
-import static org.apache.flink.runtime.execution.ExecutionState.FAILED;
-import static org.apache.flink.runtime.execution.ExecutionState.FINISHED;
 
 /**
  * The ExecutionVertex is a parallel subtask of the execution. It may be 
executed once, or several times, each of
@@ -91,7 +86,7 @@ public class ExecutionVertex implements Serializable {
        
        private volatile boolean scheduleLocalOnly;
        
-       private Map<String,OperatorState<?>> operatorState;
+       private StateHandle operatorState;
        
        // 
--------------------------------------------------------------------------------------------
 
@@ -199,11 +194,11 @@ public class ExecutionVertex implements Serializable {
                return currentExecution.getAssignedResourceLocation();
        }
 
-       public void setOperatorState(Map<String,OperatorState<?>> 
operatorState) {
+       public void setOperatorState(StateHandle operatorState) {
                this.operatorState = operatorState;
        }
 
-       public Map<String,OperatorState<?>> getOperatorState() {
+       public StateHandle getOperatorState() {
                return operatorState;
        }
        
@@ -382,7 +377,8 @@ public class ExecutionVertex implements Serializable {
                        Execution execution = currentExecution;
                        ExecutionState state = execution.getState();
 
-                       if (state == FINISHED || state == CANCELED || state == 
FAILED) {
+                       if (state == ExecutionState.FINISHED || state == 
ExecutionState.CANCELED
+                                       || state == ExecutionState.FAILED) {
                                priorExecutions.add(execution);
                                currentExecution = new Execution(this, 
execution.getAttemptNumber()+1,
                                                System.currentTimeMillis(), 
timeout);
@@ -394,7 +390,7 @@ public class ExecutionVertex implements Serializable {
                                
                                if(operatorState!=null)
                                {
-                                       
execution.setOperatorStates(operatorState);
+                                       
execution.setOperatorState(operatorState);
                                }
                                
                        }
@@ -440,8 +436,9 @@ public class ExecutionVertex implements Serializable {
                ExecutionState state = execution.getState();
 
                // sanity check
-               if (!(state == FINISHED || state == CANCELED || state == 
FAILED)) {
-                       throw new IllegalStateException("Cannot archive 
ExecutionVertex that is not in a finished state.");
+               if (!(state == ExecutionState.FINISHED || state == 
ExecutionState.CANCELED || state == ExecutionState.FAILED)) {
+                       throw new IllegalStateException(
+                                       "Cannot archive ExecutionVertex that is 
not in a finished state.");
                }
                
                // prepare the current execution for archiving

http://git-wip-us.apache.org/repos/asf/flink/blob/f2b5c21d/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/BarrierTransceiver.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/BarrierTransceiver.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/BarrierTransceiver.java
index c56da62..0a8642e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/BarrierTransceiver.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/BarrierTransceiver.java
@@ -18,10 +18,24 @@
 package org.apache.flink.runtime.jobgraph.tasks;
 
 
+/**
+ * A BarrierTransceiver describes an operator's barrier checkpointing behavior used for
+ * fault tolerance. In the most common case, {@link #broadcastBarrier(long)} is expected to be
+ * called periodically upon receiving a checkpoint barrier. Furthermore,
+ * {@link #confirmBarrier(long)} should be implemented and used for acknowledging a specific
+ * checkpoint.
+ */
 public interface BarrierTransceiver {
 
+       /**
+        * A callback for notifying an operator of a new checkpoint barrier.
+        *
+        * @param barrierID the ID of the new checkpoint barrier
+        */
        public void broadcastBarrier(long barrierID);
-       
+
+       /**
+        * A callback for confirming that a barrier checkpoint is complete.
+        *
+        * @param barrierID the ID of the barrier whose checkpoint is complete
+        */
        public void confirmBarrier(long barrierID);
        
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f2b5c21d/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/OperatorStateCarrier.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/OperatorStateCarrier.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/OperatorStateCarrier.java
index e8b6d6b..670dc3f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/OperatorStateCarrier.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/OperatorStateCarrier.java
@@ -18,12 +18,15 @@
 
 package org.apache.flink.runtime.jobgraph.tasks;
 
-import org.apache.flink.runtime.state.OperatorState;
-
-import java.util.Map;
+import org.apache.flink.runtime.state.StateHandle;
 
+/**
+ * This is an interface meant to be implemented by any invokable that has to 
support state recovery.
+ * It is mainly used by the TaskManager to identify operators that support 
state recovery in order 
+ * to inject their initial state upon creation.
+ */
 public interface OperatorStateCarrier {
-       
-       public void injectStates(Map<String, OperatorState<?>> state);
-       
+
+       public void injectState(StateHandle stateHandle);
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f2b5c21d/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java
new file mode 100644
index 0000000..ac40bf8
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+
+import java.util.Map;
+
+/**
+ * A StateHandle that carries the state itself. This state handle is recommended for
+ * cases where the operator state is lightweight enough to be sent over the network.
+ * 
+ */
+public class LocalStateHandle implements StateHandle{
+       
+       private final Map<String, OperatorState<?>>  state;
+
+       public LocalStateHandle(Map<String,OperatorState<?>> state) {
+               this.state = state;
+       }
+
+       @Override
+       public Map<String,OperatorState<?>> getState() {
+               return state;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f2b5c21d/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandle.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandle.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandle.java
new file mode 100644
index 0000000..ddc8038
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandle.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * StateHandle is a general handle interface meant to abstract operator state fetching.
+ * A StateHandle implementation can, for example, include the state itself in cases where the
+ * state is lightweight, or fetch it lazily from some external storage when the state is too large.
+ * 
+ */
+public interface StateHandle extends Serializable{
+
+       /**
+        * Retrieves and returns the state managed by this handle.
+        *
+        * @return a map of operator states, keyed by String identifier
+        */
+       public Map<String,OperatorState<?>> getState();
+       
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f2b5c21d/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamCheckpointCoordinator.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamCheckpointCoordinator.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamCheckpointCoordinator.scala
index 7ab6a6f..fee69b5 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamCheckpointCoordinator.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamCheckpointCoordinator.scala
@@ -26,38 +26,47 @@ import 
org.apache.flink.runtime.execution.ExecutionState.RUNNING
 import org.apache.flink.runtime.executiongraph.{ExecutionAttemptID, 
ExecutionGraph, ExecutionVertex}
 import org.apache.flink.runtime.jobgraph.JobStatus._
 import org.apache.flink.runtime.jobgraph.{JobID, JobVertexID}
-import org.apache.flink.runtime.state.OperatorState
+import org.apache.flink.runtime.state.StateHandle
 
 import scala.collection.JavaConversions._
 import scala.collection.immutable.TreeMap
 import scala.concurrent.ExecutionContext.Implicits.global
 import scala.concurrent.duration.{FiniteDuration, _}
 
-object StreamCheckpointCoordinator {
-
-  def spawn(context: ActorContext,executionGraph: ExecutionGraph,
-            interval: FiniteDuration = 5 seconds): ActorRef = {
-
-    val vertices: Iterable[ExecutionVertex] = 
getExecutionVertices(executionGraph)
-    val monitor = context.system.actorOf(Props(new 
StreamCheckpointCoordinator(executionGraph,
-      vertices,vertices.map(x => 
((x.getJobVertex.getJobVertexId,x.getParallelSubtaskIndex),
-              List.empty[Long])).toMap, Map() ,interval,0L,-1L)))
-    monitor ! InitBarrierScheduler
-    monitor
-  }
-
-  private def getExecutionVertices(executionGraph: ExecutionGraph): 
Iterable[ExecutionVertex] = {
-    for((_,execJobVertex) <- executionGraph.getAllVertices;
-        execVertex: ExecutionVertex <- execJobVertex.getTaskVertices)
-    yield execVertex
-  }
-}
+/**
+ * The StreamCheckpointCoordinator is responsible for operator state 
management and checkpoint
+ * coordination in streaming jobs. It periodically sends checkpoint barriers 
to the sources of a
+ * running job and constantly collects acknowledgements from operators while 
the barriers are being 
+ * disseminated throughout the execution graph. At regular intervals it finds the last globally
+ * acknowledged checkpoint barrier to be used for a consistent recovery and loads all associated
+ * state handles into the respective execution vertices.
+ * 
+ * The following messages describe this actor's expected behavior: 
+ *
+ *  - [[InitBarrierScheduler]] initiates the actor and schedules the periodic 
[[BarrierTimeout]] 
+ *  and [[CompactAndUpdate]] messages that are used for maintaining the state 
checkpointing logic. 
+ *
+ *  - [[BarrierTimeout]] is triggered periodically after initiation in order to start a new
+ *  checkpoint barrier, i.e. to disseminate new barriers to the source vertices.
+ *
+ *  - [[BarrierAck]] is sent by each operator upon the completion of a state checkpoint. All
+ *  such acknowledgements are collected and inspected when handling [[CompactAndUpdate]] in
+ *  order to find the last consistent checkpoint.
+ *  
+ *  - [[StateBarrierAck]] is an acknowledgement like [[BarrierAck]] that additionally carries
+ *  an operator state handle.
+ *
+ *  - [[CompactAndUpdate]] marks the last globally consistent checkpoint barrier to be used for
+ *  recovery purposes and removes all older states and acknowledgements up to that barrier.
+ *  Furthermore, it updates the current ExecutionGraph with the current operator state handles.
+ */
 
 class StreamCheckpointCoordinator(val executionGraph: ExecutionGraph,
                          val vertices: Iterable[ExecutionVertex],
                          var acks: Map[(JobVertexID,Int),List[Long]],
                          var states: Map[(JobVertexID, Integer, Long), 
-                                 java.util.Map[String,OperatorState[_]]],
+                                 StateHandle],
                          val interval: FiniteDuration,var curId: Long,var 
ackId: Long)
         extends Actor with ActorLogMessages with ActorLogging {
   
@@ -95,8 +104,6 @@ class StreamCheckpointCoordinator(val executionGraph: 
ExecutionGraph,
           }
           log.debug(acks.toString)
       
-      
-      
     case CompactAndUpdate =>
       val barrierCount = 
acks.values.foldLeft(TreeMap[Long,Int]().withDefaultValue(0))((dict,myList)
       => myList.foldLeft(dict)((dict2,elem) => dict2.updated(elem,dict2(elem) 
+ 1)))
@@ -108,7 +115,26 @@ class StreamCheckpointCoordinator(val executionGraph: 
ExecutionGraph,
       executionGraph.loadOperatorStates(states)
       
   }
-  
+}
+
+object StreamCheckpointCoordinator {
+
+  def spawn(context: ActorContext,executionGraph: ExecutionGraph,
+            interval: FiniteDuration = 5 seconds): ActorRef = {
+
+    val vertices: Iterable[ExecutionVertex] = 
getExecutionVertices(executionGraph)
+    val monitor = context.system.actorOf(Props(new 
StreamCheckpointCoordinator(executionGraph,
+      vertices,vertices.map(x => 
((x.getJobVertex.getJobVertexId,x.getParallelSubtaskIndex),
+              List.empty[Long])).toMap, Map() ,interval,0L,-1L)))
+    monitor ! InitBarrierScheduler
+    monitor
+  }
+
+  private def getExecutionVertices(executionGraph: ExecutionGraph): 
Iterable[ExecutionVertex] = {
+    for((_,execJobVertex) <- executionGraph.getAllVertices;
+        execVertex: ExecutionVertex <- execJobVertex.getTaskVertices)
+    yield execVertex
+  }
 }
 
 case class BarrierTimeout()
@@ -122,7 +148,7 @@ case class BarrierReq(attemptID: 
ExecutionAttemptID,checkpointID: Long)
 case class BarrierAck(jobID: JobID,jobVertexID: JobVertexID,instanceID: 
Int,checkpointID: Long)
 
 case class StateBarrierAck(jobID: JobID, jobVertexID: JobVertexID, instanceID: 
Integer,
-                           checkpointID: Long, states: 
java.util.Map[String,OperatorState[_]])
+                           checkpointID: Long, states: StateHandle)
        
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f2b5c21d/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 3de917b..53c45ce 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -443,7 +443,7 @@ class TaskManager(val connectionInfo: 
InstanceConnectionInfo,
       {
         val vertex = task.getEnvironment.getInvokable match {
           case opStateCarrier: OperatorStateCarrier =>
-            opStateCarrier.injectStates(tdd.getOperatorStates)
+            opStateCarrier.injectState(tdd.getOperatorStates)
         }
       }
       

http://git-wip-us.apache.org/repos/asf/flink/blob/f2b5c21d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
index eb0d6ed..3548712 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
@@ -28,6 +28,8 @@ import 
org.apache.flink.runtime.jobgraph.tasks.BarrierTransceiver;
 import org.apache.flink.runtime.jobgraph.tasks.OperatorStateCarrier;
 import org.apache.flink.runtime.jobmanager.BarrierAck;
 import org.apache.flink.runtime.jobmanager.StateBarrierAck;
+import org.apache.flink.runtime.state.LocalStateHandle;
+import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.runtime.util.event.EventListener;
 import org.apache.flink.runtime.state.OperatorState;
 import org.apache.flink.streaming.api.StreamConfig;
@@ -112,7 +114,8 @@ public class StreamVertex<IN, OUT> extends 
AbstractInvokable implements StreamTa
                if (configuration.getStateMonitoring() && !states.isEmpty()) {
                        getEnvironment().getJobManager().tell(
                                        new 
StateBarrierAck(getEnvironment().getJobID(), getEnvironment()
-                                                       .getJobVertexId(), 
context.getIndexOfThisSubtask(), barrierID, states),
+                                                       .getJobVertexId(), 
context.getIndexOfThisSubtask(), barrierID, 
+                                                       new 
LocalStateHandle(states)),
                                        ActorRef.noSender());
                } else {
                        getEnvironment().getJobManager().tell(
@@ -284,8 +287,8 @@ public class StreamVertex<IN, OUT> extends 
AbstractInvokable implements StreamTa
         * Re-injects the user states into the map
         */
        @Override
-       public void injectStates(Map<String, OperatorState<?>> states) {
-               this.states.putAll(states);
+       public void injectState(StateHandle stateHandle) {
+               this.states.putAll(stateHandle.getState());
        }
 
        private class SuperstepEventListener implements 
EventListener<TaskEvent> {

Reply via email to