[FLINK-1638] [streaming] Add StateHandle and include javadoc This closes #459
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f2b5c21d Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f2b5c21d Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f2b5c21d Branch: refs/heads/master Commit: f2b5c21da6a297f20ffad99e9f26ccb0a9491881 Parents: 490fa70 Author: Paris Carbone <seniorcarb...@gmail.com> Authored: Mon Mar 9 14:58:30 2015 +0100 Committer: Stephan Ewen <se...@apache.org> Committed: Tue Mar 10 14:58:49 2015 +0100 ---------------------------------------------------------------------- .../deployment/TaskDeploymentDescriptor.java | 13 ++-- .../flink/runtime/executiongraph/Execution.java | 13 ++-- .../runtime/executiongraph/ExecutionGraph.java | 6 +- .../runtime/executiongraph/ExecutionVertex.java | 29 ++++---- .../jobgraph/tasks/BarrierTransceiver.java | 16 ++++- .../jobgraph/tasks/OperatorStateCarrier.java | 15 ++-- .../flink/runtime/state/LocalStateHandle.java | 41 +++++++++++ .../apache/flink/runtime/state/StateHandle.java | 40 +++++++++++ .../StreamCheckpointCoordinator.scala | 76 +++++++++++++------- .../flink/runtime/taskmanager/TaskManager.scala | 2 +- .../api/streamvertex/StreamVertex.java | 9 ++- 11 files changed, 191 insertions(+), 69 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/f2b5c21d/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java index b2573f7..6993248 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java @@ -23,12 +23,11 @@ import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.jobgraph.JobID; import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.runtime.state.OperatorState; +import org.apache.flink.runtime.state.StateHandle; import java.io.Serializable; import java.util.ArrayList; import java.util.List; -import java.util.Map; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; @@ -79,7 +78,7 @@ public final class TaskDeploymentDescriptor implements Serializable { /** The list of JAR files required to run this task. */ private final List<BlobKey> requiredJarFiles; - private Map<String, OperatorState<?>> operatorStates; + private StateHandle operatorStates; /** * Constructs a task deployment descriptor. 
@@ -129,13 +128,13 @@ public final class TaskDeploymentDescriptor implements Serializable { Configuration taskConfiguration, String invokableClassName, List<PartitionDeploymentDescriptor> producedPartitions, List<PartitionConsumerDeploymentDescriptor> consumedPartitions, - List<BlobKey> requiredJarFiles, int targetSlotNumber, Map<String,OperatorState<?>> operatorStates) { + List<BlobKey> requiredJarFiles, int targetSlotNumber, StateHandle operatorStates) { this(jobID, vertexID, executionId, taskName, indexInSubtaskGroup, numberOfSubtasks, jobConfiguration, taskConfiguration, invokableClassName, producedPartitions, consumedPartitions, requiredJarFiles, targetSlotNumber); - setOperatorStates(operatorStates); + setOperatorState(operatorStates); } /** @@ -244,11 +243,11 @@ public final class TaskDeploymentDescriptor implements Serializable { strProducedPartitions, strConsumedPartitions); } - public void setOperatorStates(Map<String,OperatorState<?>> operatorStates) { + public void setOperatorState(StateHandle operatorStates) { this.operatorStates = operatorStates; } - public Map<String, OperatorState<?>> getOperatorStates() { + public StateHandle getOperatorStates() { return operatorStates; } } http://git-wip-us.apache.org/repos/asf/flink/blob/f2b5c21d/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java index 93845c7..cf24b20 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java @@ -46,7 +46,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFutureAction; import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; import 
org.apache.flink.runtime.messages.TaskManagerMessages; import org.apache.flink.runtime.messages.TaskManagerMessages.TaskOperationResult; -import org.apache.flink.runtime.state.OperatorState; +import org.apache.flink.runtime.state.StateHandle; import org.apache.flink.util.ExceptionUtils; import org.slf4j.Logger; @@ -56,7 +56,6 @@ import scala.concurrent.duration.FiniteDuration; import java.io.Serializable; import java.util.ArrayList; import java.util.List; -import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeoutException; @@ -124,7 +123,7 @@ public class Execution implements Serializable { private volatile InstanceConnectionInfo assignedResourceLocation; // for the archived execution - private Map<String,OperatorState<?>> operatorStates; + private StateHandle operatorState; // -------------------------------------------------------------------------------------------- @@ -858,11 +857,11 @@ public class Execution implements Serializable { (assignedResource == null ? 
"(unassigned)" : assignedResource.toString()), state); } - public void setOperatorStates(Map<String,OperatorState<?>> operatorStates) { - this.operatorStates = operatorStates; + public void setOperatorState(StateHandle operatorStates) { + this.operatorState = operatorStates; } - public Map<String,OperatorState<?>> getOperatorStates() { - return operatorStates; + public StateHandle getOperatorState() { + return operatorState; } } http://git-wip-us.apache.org/repos/asf/flink/blob/f2b5c21d/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index 0c6c3a7..c319a5c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -34,7 +34,7 @@ import org.apache.flink.runtime.jobgraph.ScheduleMode; import org.apache.flink.runtime.jobmanager.StreamCheckpointCoordinator; import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; import org.apache.flink.runtime.messages.ExecutionGraphMessages; -import org.apache.flink.runtime.state.OperatorState; +import org.apache.flink.runtime.state.StateHandle; import org.apache.flink.runtime.taskmanager.TaskExecutionState; import org.apache.flink.util.ExceptionUtils; import org.slf4j.Logger; @@ -567,9 +567,9 @@ public class ExecutionGraph implements Serializable { } } - public synchronized void loadOperatorStates(Map<Tuple3<JobVertexID, Integer, Long> , Map<String,OperatorState<?>>> states) + public synchronized void loadOperatorStates(Map<Tuple3<JobVertexID, Integer, Long> , StateHandle> states) { - for(Map.Entry<Tuple3<JobVertexID, Integer, Long> , Map<String,OperatorState<?>>> state : states.entrySet()) + 
for(Map.Entry<Tuple3<JobVertexID, Integer, Long> , StateHandle> state : states.entrySet()) { tasks.get(state.getKey()._1()).getTaskVertices()[state.getKey()._2()].setOperatorState(state.getValue()); } http://git-wip-us.apache.org/repos/asf/flink/blob/f2b5c21d/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java index 41d34d5..24bcf21 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java @@ -18,17 +18,17 @@ package org.apache.flink.runtime.executiongraph; -import org.apache.flink.runtime.deployment.PartialPartitionInfo; -import org.apache.flink.runtime.instance.InstanceConnectionInfo; -import org.apache.flink.runtime.instance.SimpleSlot; import org.apache.flink.runtime.JobException; import org.apache.flink.runtime.blob.BlobKey; +import org.apache.flink.runtime.deployment.PartialPartitionInfo; import org.apache.flink.runtime.deployment.PartitionConsumerDeploymentDescriptor; import org.apache.flink.runtime.deployment.PartitionDeploymentDescriptor; import org.apache.flink.runtime.deployment.PartitionInfo; import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.instance.Instance; +import org.apache.flink.runtime.instance.InstanceConnectionInfo; +import org.apache.flink.runtime.instance.SimpleSlot; import org.apache.flink.runtime.jobgraph.DistributionPattern; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.JobEdge; @@ -38,9 +38,8 @@ import 
org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint; import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException; import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; -import org.apache.flink.runtime.state.OperatorState; +import org.apache.flink.runtime.state.StateHandle; import org.slf4j.Logger; - import scala.concurrent.duration.FiniteDuration; import java.io.Serializable; @@ -48,13 +47,9 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; import java.util.List; -import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; import static com.google.common.base.Preconditions.checkElementIndex; -import static org.apache.flink.runtime.execution.ExecutionState.CANCELED; -import static org.apache.flink.runtime.execution.ExecutionState.FAILED; -import static org.apache.flink.runtime.execution.ExecutionState.FINISHED; /** * The ExecutionVertex is a parallel subtask of the execution. 
It may be executed once, or several times, each of @@ -91,7 +86,7 @@ public class ExecutionVertex implements Serializable { private volatile boolean scheduleLocalOnly; - private Map<String,OperatorState<?>> operatorState; + private StateHandle operatorState; // -------------------------------------------------------------------------------------------- @@ -199,11 +194,11 @@ public class ExecutionVertex implements Serializable { return currentExecution.getAssignedResourceLocation(); } - public void setOperatorState(Map<String,OperatorState<?>> operatorState) { + public void setOperatorState(StateHandle operatorState) { this.operatorState = operatorState; } - public Map<String,OperatorState<?>> getOperatorState() { + public StateHandle getOperatorState() { return operatorState; } @@ -382,7 +377,8 @@ public class ExecutionVertex implements Serializable { Execution execution = currentExecution; ExecutionState state = execution.getState(); - if (state == FINISHED || state == CANCELED || state == FAILED) { + if (state == ExecutionState.FINISHED || state == ExecutionState.CANCELED + || state == ExecutionState.FAILED) { priorExecutions.add(execution); currentExecution = new Execution(this, execution.getAttemptNumber()+1, System.currentTimeMillis(), timeout); @@ -394,7 +390,7 @@ public class ExecutionVertex implements Serializable { if(operatorState!=null) { - execution.setOperatorStates(operatorState); + execution.setOperatorState(operatorState); } } @@ -440,8 +436,9 @@ public class ExecutionVertex implements Serializable { ExecutionState state = execution.getState(); // sanity check - if (!(state == FINISHED || state == CANCELED || state == FAILED)) { - throw new IllegalStateException("Cannot archive ExecutionVertex that is not in a finished state."); + if (!(state == ExecutionState.FINISHED || state == ExecutionState.CANCELED || state == ExecutionState.FAILED)) { + throw new IllegalStateException( + "Cannot archive ExecutionVertex that is not in a finished state."); } // 
prepare the current execution for archiving http://git-wip-us.apache.org/repos/asf/flink/blob/f2b5c21d/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/BarrierTransceiver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/BarrierTransceiver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/BarrierTransceiver.java index c56da62..0a8642e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/BarrierTransceiver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/BarrierTransceiver.java @@ -18,10 +18,24 @@ package org.apache.flink.runtime.jobgraph.tasks; +/** + * A BarrierTransceiver describes an operator's barrier checkpointing behavior used for + * fault tolerance. In the most common case, {@link #broadcastBarrier(long)} is expected to be called + * periodically upon receiving a checkpoint barrier. Furthermore, a {@link #confirmBarrier(long)} method should + * be implemented and used for acknowledging a specific checkpoint. + */ public interface BarrierTransceiver { + /** + * A callback for notifying an operator of a new checkpoint barrier.
+ * @param barrierID the ID of the new checkpoint barrier + */ public void broadcastBarrier(long barrierID); - + + /** + * A callback for confirming that a barrier checkpoint is complete. + * @param barrierID the ID of the barrier whose checkpoint is complete + */ public void confirmBarrier(long barrierID); } http://git-wip-us.apache.org/repos/asf/flink/blob/f2b5c21d/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/OperatorStateCarrier.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/OperatorStateCarrier.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/OperatorStateCarrier.java index e8b6d6b..670dc3f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/OperatorStateCarrier.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/OperatorStateCarrier.java @@ -18,12 +18,15 @@ package org.apache.flink.runtime.jobgraph.tasks; -import org.apache.flink.runtime.state.OperatorState; - -import java.util.Map; +import org.apache.flink.runtime.state.StateHandle; +/** + * This is an interface meant to be implemented by any invokable that has to support state recovery. + * It is mainly used by the TaskManager to identify operators that support state recovery in order + * to inject their initial state upon creation.
+ */ public interface OperatorStateCarrier { - - public void injectStates(Map<String, OperatorState<?>> state); - + + public void injectState(StateHandle stateHandle); + } http://git-wip-us.apache.org/repos/asf/flink/blob/f2b5c21d/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java new file mode 100644 index 0000000..ac40bf8 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + + +import java.util.Map; + +/** + * A StateHandle that includes a copy of the state itself. This state handle is recommended for + * cases where the operator state is lightweight enough to be sent over the network.
+ * + */ +public class LocalStateHandle implements StateHandle{ + + private final Map<String, OperatorState<?>> state; + + public LocalStateHandle(Map<String,OperatorState<?>> state) { + this.state = state; + } + + @Override + public Map<String,OperatorState<?>> getState() { + return state; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/f2b5c21d/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandle.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandle.java new file mode 100644 index 0000000..ddc8038 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandle.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + + +import java.io.Serializable; +import java.util.Map; + +/** + * StateHandle is a general handle interface meant to abstract operator state fetching. 
+ * A StateHandle implementation can, for example, include the state itself in cases where the state + * is lightweight, or fetch it lazily from some external storage when the state is too large. + * + */ +public interface StateHandle extends Serializable{ + + /** + * getState should retrieve and return the state managed by this handle. + * + * @return the map of operator states managed by this handle + */ + public Map<String,OperatorState<?>> getState(); + +} http://git-wip-us.apache.org/repos/asf/flink/blob/f2b5c21d/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamCheckpointCoordinator.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamCheckpointCoordinator.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamCheckpointCoordinator.scala index 7ab6a6f..fee69b5 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamCheckpointCoordinator.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamCheckpointCoordinator.scala @@ -26,38 +26,47 @@ import org.apache.flink.runtime.execution.ExecutionState.RUNNING import org.apache.flink.runtime.executiongraph.{ExecutionAttemptID, ExecutionGraph, ExecutionVertex} import org.apache.flink.runtime.jobgraph.JobStatus._ import org.apache.flink.runtime.jobgraph.{JobID, JobVertexID} -import org.apache.flink.runtime.state.OperatorState +import org.apache.flink.runtime.state.StateHandle import scala.collection.JavaConversions._ import scala.collection.immutable.TreeMap import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.duration.{FiniteDuration, _} -object StreamCheckpointCoordinator { - - def spawn(context: ActorContext,executionGraph: ExecutionGraph, - interval: FiniteDuration = 5 seconds): ActorRef = { - - val vertices: Iterable[ExecutionVertex] = getExecutionVertices(executionGraph) - val monitor = context.system.actorOf(Props(new
StreamCheckpointCoordinator(executionGraph, - vertices,vertices.map(x => ((x.getJobVertex.getJobVertexId,x.getParallelSubtaskIndex), - List.empty[Long])).toMap, Map() ,interval,0L,-1L))) - monitor ! InitBarrierScheduler - monitor - } - - private def getExecutionVertices(executionGraph: ExecutionGraph): Iterable[ExecutionVertex] = { - for((_,execJobVertex) <- executionGraph.getAllVertices; - execVertex: ExecutionVertex <- execJobVertex.getTaskVertices) - yield execVertex - } -} +/** + * The StreamCheckpointCoordinator is responsible for operator state management and checkpoint + * coordination in streaming jobs. It periodically sends checkpoint barriers to the sources of a + * running job and constantly collects acknowledgements from operators while the barriers are being + * disseminated throughout the execution graph. At regular intervals it finds the last globally + * acknowledged checkpoint barrier to be used for a consistent recovery and loads all associated + * state handles into the respective execution vertices. + * + * The following messages describe this actor's expected behavior: + * + * - [[InitBarrierScheduler]] initiates the actor and schedules the periodic [[BarrierTimeout]] + * and [[CompactAndUpdate]] messages that are used for maintaining the state checkpointing logic. + * + * - [[BarrierTimeout]] is periodically triggered after initiation in order to start a new + * checkpoint barrier. That is when the barriers are disseminated to the source vertices. + * + * - [[BarrierAck]] is sent by each operator upon the completion of a state checkpoint. All + * such acknowledgements are collected and inspected upon [[CompactAndUpdate]] handling in + * order to find the last consistent checkpoint. + * + * - [[StateBarrierAck]] is an acknowledgement like a [[BarrierAck]] that + * additionally carries a handle to the operator state.
+ * + * - [[CompactAndUpdate]] marks the last globally consistent checkpoint barrier to be used for + * recovery purposes and removes all older states and acknowledgements up to that barrier. + * Furthermore, it updates the current ExecutionGraph with the current operator state handles + * + */ class StreamCheckpointCoordinator(val executionGraph: ExecutionGraph, val vertices: Iterable[ExecutionVertex], var acks: Map[(JobVertexID,Int),List[Long]], var states: Map[(JobVertexID, Integer, Long), - java.util.Map[String,OperatorState[_]]], + StateHandle], val interval: FiniteDuration,var curId: Long,var ackId: Long) extends Actor with ActorLogMessages with ActorLogging { @@ -95,8 +104,6 @@ class StreamCheckpointCoordinator(val executionGraph: ExecutionGraph, } log.debug(acks.toString) - - case CompactAndUpdate => val barrierCount = acks.values.foldLeft(TreeMap[Long,Int]().withDefaultValue(0))((dict,myList) => myList.foldLeft(dict)((dict2,elem) => dict2.updated(elem,dict2(elem) + 1))) @@ -108,7 +115,26 @@ class StreamCheckpointCoordinator(val executionGraph: ExecutionGraph, executionGraph.loadOperatorStates(states) } - +} + +object StreamCheckpointCoordinator { + + def spawn(context: ActorContext,executionGraph: ExecutionGraph, + interval: FiniteDuration = 5 seconds): ActorRef = { + + val vertices: Iterable[ExecutionVertex] = getExecutionVertices(executionGraph) + val monitor = context.system.actorOf(Props(new StreamCheckpointCoordinator(executionGraph, + vertices,vertices.map(x => ((x.getJobVertex.getJobVertexId,x.getParallelSubtaskIndex), + List.empty[Long])).toMap, Map() ,interval,0L,-1L))) + monitor ! 
InitBarrierScheduler + monitor + } + + private def getExecutionVertices(executionGraph: ExecutionGraph): Iterable[ExecutionVertex] = { + for((_,execJobVertex) <- executionGraph.getAllVertices; + execVertex: ExecutionVertex <- execJobVertex.getTaskVertices) + yield execVertex + } } case class BarrierTimeout() @@ -122,7 +148,7 @@ case class BarrierReq(attemptID: ExecutionAttemptID,checkpointID: Long) case class BarrierAck(jobID: JobID,jobVertexID: JobVertexID,instanceID: Int,checkpointID: Long) case class StateBarrierAck(jobID: JobID, jobVertexID: JobVertexID, instanceID: Integer, - checkpointID: Long, states: java.util.Map[String,OperatorState[_]]) + checkpointID: Long, states: StateHandle) http://git-wip-us.apache.org/repos/asf/flink/blob/f2b5c21d/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index 3de917b..53c45ce 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -443,7 +443,7 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, { val vertex = task.getEnvironment.getInvokable match { case opStateCarrier: OperatorStateCarrier => - opStateCarrier.injectStates(tdd.getOperatorStates) + opStateCarrier.injectState(tdd.getOperatorStates) } } http://git-wip-us.apache.org/repos/asf/flink/blob/f2b5c21d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java index eb0d6ed..3548712 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java @@ -28,6 +28,8 @@ import org.apache.flink.runtime.jobgraph.tasks.BarrierTransceiver; import org.apache.flink.runtime.jobgraph.tasks.OperatorStateCarrier; import org.apache.flink.runtime.jobmanager.BarrierAck; import org.apache.flink.runtime.jobmanager.StateBarrierAck; +import org.apache.flink.runtime.state.LocalStateHandle; +import org.apache.flink.runtime.state.StateHandle; import org.apache.flink.runtime.util.event.EventListener; import org.apache.flink.runtime.state.OperatorState; import org.apache.flink.streaming.api.StreamConfig; @@ -112,7 +114,8 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable implements StreamTa if (configuration.getStateMonitoring() && !states.isEmpty()) { getEnvironment().getJobManager().tell( new StateBarrierAck(getEnvironment().getJobID(), getEnvironment() - .getJobVertexId(), context.getIndexOfThisSubtask(), barrierID, states), + .getJobVertexId(), context.getIndexOfThisSubtask(), barrierID, + new LocalStateHandle(states)), ActorRef.noSender()); } else { getEnvironment().getJobManager().tell( @@ -284,8 +287,8 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable implements StreamTa * Re-injects the user states into the map */ @Override - public void injectStates(Map<String, OperatorState<?>> states) { - this.states.putAll(states); + public void injectState(StateHandle stateHandle) { + this.states.putAll(stateHandle.getState()); } private class SuperstepEventListener implements EventListener<TaskEvent> {