Repository: flink Updated Branches: refs/heads/master b4e8350f5 -> 2bba2b3f0
[FLINK-1638] [streaming] Operator state checkpointing and injection prototype Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/452c39a9 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/452c39a9 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/452c39a9 Branch: refs/heads/master Commit: 452c39a965d93aab84d2fea84345badd2cc45975 Parents: a34869c Author: Paris Carbone <seniorcarb...@gmail.com> Authored: Wed Mar 4 18:30:09 2015 +0100 Committer: Stephan Ewen <se...@apache.org> Committed: Tue Mar 10 14:58:48 2015 +0100 ---------------------------------------------------------------------- .../deployment/TaskDeploymentDescriptor.java | 26 +++++ .../flink/runtime/executiongraph/Execution.java | 11 ++ .../runtime/executiongraph/ExecutionGraph.java | 10 ++ .../runtime/executiongraph/ExecutionVertex.java | 19 +++- .../api/reader/AbstractRecordReader.java | 5 +- .../jobgraph/tasks/OperatorStateCarrier.java | 27 +++++ .../flink/runtime/state/OperatorState.java | 103 ++++++++++++++++++ .../flink/runtime/state/StateCheckpoint.java | 80 ++++++++++++++ .../flink/runtime/jobmanager/JobManager.scala | 38 +++++-- .../runtime/jobmanager/StreamStateMonitor.scala | 78 +++++++++----- .../flink/runtime/taskmanager/TaskManager.scala | 15 ++- .../flink/streaming/api/StreamConfig.java | 2 +- .../apache/flink/streaming/api/StreamGraph.java | 2 +- .../datastream/SingleOutputStreamOperator.java | 2 +- .../api/streamvertex/StreamVertex.java | 53 ++++++++-- .../streamvertex/StreamingRuntimeContext.java | 2 +- .../apache/flink/streaming/state/MapState.java | 4 +- .../flink/streaming/state/OperatorState.java | 105 ------------------- .../streaming/state/PartitionableState.java | 2 + .../flink/streaming/state/SimpleState.java | 3 +- .../state/checkpoint/MapCheckpoint.java | 3 +- .../state/checkpoint/StateCheckpoint.java | 82 --------------- .../flink/streaming/state/MapStateTest.java | 2 +- 
.../streaming/state/OperatorStateTest.java | 3 +- .../streaming/examples/wordcount/WordCount.java | 5 + 25 files changed, 439 insertions(+), 243 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/452c39a9/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java index 20204d5..a431a76 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java @@ -23,6 +23,7 @@ import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.jobgraph.JobID; import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.state.OperatorState; import java.io.Serializable; import java.util.ArrayList; @@ -76,6 +77,8 @@ public final class TaskDeploymentDescriptor implements Serializable { /** The list of JAR files required to run this task. */ private final List<BlobKey> requiredJarFiles; + + private OperatorState operatorState; /** * Constructs a task deployment descriptor. 
@@ -119,6 +122,21 @@ public final class TaskDeploymentDescriptor implements Serializable { this.requiredJarFiles = new ArrayList<BlobKey>(); } + public TaskDeploymentDescriptor( + JobID jobID, JobVertexID vertexID, ExecutionAttemptID executionId, String taskName, + int indexInSubtaskGroup, int numberOfSubtasks, Configuration jobConfiguration, + Configuration taskConfiguration, String invokableClassName, + List<PartitionDeploymentDescriptor> producedPartitions, + List<PartitionConsumerDeploymentDescriptor> consumedPartitions, + List<BlobKey> requiredJarFiles, int targetSlotNumber, OperatorState operatorState) { + + this(jobID, vertexID, executionId, taskName, indexInSubtaskGroup, numberOfSubtasks, + jobConfiguration, taskConfiguration, invokableClassName, producedPartitions, + consumedPartitions, requiredJarFiles, targetSlotNumber); + + setOperatorState(operatorState); + } + /** * Returns the ID of the job the tasks belongs to. */ @@ -224,4 +242,12 @@ public final class TaskDeploymentDescriptor implements Serializable { taskName, indexInSubtaskGroup, numberOfSubtasks, invokableClassName, strProducedPartitions, strConsumedPartitions); } + + public void setOperatorState(OperatorState operatorState) { + this.operatorState = operatorState; + } + + public OperatorState getOperatorState() { + return operatorState; + } } http://git-wip-us.apache.org/repos/asf/flink/blob/452c39a9/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java index 57ed4c0..89f5183 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java @@ -46,6 +46,7 @@ import 
org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFutureAction; import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; import org.apache.flink.runtime.messages.TaskManagerMessages; import org.apache.flink.runtime.messages.TaskManagerMessages.TaskOperationResult; +import org.apache.flink.runtime.state.OperatorState; import org.apache.flink.util.ExceptionUtils; import org.slf4j.Logger; @@ -122,6 +123,8 @@ public class Execution implements Serializable { private volatile InstanceConnectionInfo assignedResourceLocation; // for the archived execution + private OperatorState operatorState; + // -------------------------------------------------------------------------------------------- public Execution(ExecutionVertex vertex, int attemptNumber, long startTimestamp, FiniteDuration timeout) { @@ -853,4 +856,12 @@ public class Execution implements Serializable { return String.format("Attempt #%d (%s) @ %s - [%s]", attemptNumber, vertex.getSimpleName(), (assignedResource == null ? 
"(unassigned)" : assignedResource.toString()), state); } + + public void setOperatorState(OperatorState operatorState) { + this.operatorState = operatorState; + } + + public OperatorState getOperatorState() { + return operatorState; + } } http://git-wip-us.apache.org/repos/asf/flink/blob/452c39a9/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index e6d9c85..bf34e33 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -33,11 +33,13 @@ import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.ScheduleMode; import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; import org.apache.flink.runtime.messages.ExecutionGraphMessages; +import org.apache.flink.runtime.state.OperatorState; import org.apache.flink.runtime.taskmanager.TaskExecutionState; import org.apache.flink.util.ExceptionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import scala.Tuple3; import scala.concurrent.duration.FiniteDuration; import java.io.Serializable; @@ -529,6 +531,14 @@ public class ExecutionGraph implements Serializable { return false; } } + + public void loadOperatorStates(Map<Tuple3<JobVertexID, Integer, Long> ,OperatorState<?>> states) + { + for(Map.Entry<Tuple3<JobVertexID, Integer, Long> ,OperatorState<?>> state : states.entrySet()) + { + tasks.get(state.getKey()._1()).getTaskVertices()[state.getKey()._2()].setOperatorState(state.getValue()); + } + } public void scheduleOrUpdateConsumers(ExecutionAttemptID executionId, int partitionIndex) { Execution execution = 
currentExecutions.get(executionId); http://git-wip-us.apache.org/repos/asf/flink/blob/452c39a9/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java index 0158fbf..b7f962a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java @@ -38,6 +38,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint; import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException; import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; +import org.apache.flink.runtime.state.OperatorState; import org.slf4j.Logger; import scala.concurrent.duration.FiniteDuration; @@ -89,6 +90,8 @@ public class ExecutionVertex implements Serializable { private volatile boolean scheduleLocalOnly; + private OperatorState operatorState; + // -------------------------------------------------------------------------------------------- public ExecutionVertex(ExecutionJobVertex jobVertex, int subTaskIndex, @@ -194,6 +197,14 @@ public class ExecutionVertex implements Serializable { public InstanceConnectionInfo getCurrentAssignedResourceLocation() { return currentExecution.getAssignedResourceLocation(); } + + public void setOperatorState(OperatorState operatorState) { + this.operatorState = operatorState; + } + + public OperatorState getOperatorState() { + return operatorState; + } public ExecutionGraph getExecutionGraph() { return this.jobVertex.getGraph(); @@ -379,6 +390,12 @@ public class ExecutionVertex implements Serializable { if (grp != null) { 
this.locationConstraint = grp.getLocationConstraint(subTaskIndex); } + + if(operatorState!=null) + { + execution.setOperatorState(operatorState); + } + } else { throw new IllegalStateException("Cannot reset a vertex that is in state " + state); @@ -506,7 +523,7 @@ public class ExecutionVertex implements Serializable { return new TaskDeploymentDescriptor(getJobId(), getJobvertexId(), executionId, getTaskName(), subTaskIndex, getTotalNumberOfParallelSubtasks(), getExecutionGraph().getJobConfiguration(), jobVertex.getJobVertex().getConfiguration(), jobVertex.getJobVertex().getInvokableClassName(), - producedPartitions, consumedPartitions, jarFiles, slot.getSlotNumber()); + producedPartitions, consumedPartitions, jarFiles, slot.getSlotNumber(), operatorState); } // -------------------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/452c39a9/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java index cc36438..920792c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java @@ -19,10 +19,7 @@ package org.apache.flink.runtime.io.network.api.reader; import java.io.IOException; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.Queue; -import java.util.Set; + import org.apache.flink.core.io.IOReadableWritable; import org.apache.flink.runtime.event.task.AbstractEvent; 
http://git-wip-us.apache.org/repos/asf/flink/blob/452c39a9/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/OperatorStateCarrier.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/OperatorStateCarrier.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/OperatorStateCarrier.java new file mode 100644 index 0000000..6ea4f27 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/OperatorStateCarrier.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobgraph.tasks; + +import org.apache.flink.runtime.state.OperatorState; + +public interface OperatorStateCarrier { + + public void injectState(OperatorState state); + +} http://git-wip-us.apache.org/repos/asf/flink/blob/452c39a9/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorState.java new file mode 100644 index 0000000..74ea1a7 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorState.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import java.io.Serializable; + +/** + * Abstract class for representing operator states in Flink programs. By + * implementing the methods declared in this abstraction the state of the + * operator can be checkpointed by the fault tolerance mechanism. + * + * @param <T> + * The type of the operator state. 
+ */ +public abstract class OperatorState<T> implements Serializable { + + private static final long serialVersionUID = 1L; + + public T state; + + /** + * Constructor used for initializing the state. In case of failure, the + * state will be reinitialized using this constructor, then + * {@link #restore(StateCheckpoint)} will be used to restore from the last + * available backup. + */ + public OperatorState() { + state = null; + } + + /** + * Initializes the state using the given state object. + * + * @param initialState + * The initial state object + */ + public OperatorState(T initialState) { + state = initialState; + } + + /** + * Returns the currently stored state object. + * + * @return The state. + */ + public T getState() { + return state; + } + + /** + * Sets the current state object. + * + * @param state + * The new state object. + * @return The operator state with the new state object set. + */ + public OperatorState<T> setState(T state) { + this.state = state; + return this; + } + + /** + * Creates a {@link StateCheckpoint} that will be used to backup the state + * for failure recovery. + * + * @return The {@link StateCheckpoint} created. + */ + public abstract StateCheckpoint<T> checkpoint(); + + /** + * Restores the state from the given {@link StateCheckpoint}. + * + * @param checkpoint + * The checkpoint to restore from + * @return The restored operator. 
+ */ + public abstract OperatorState<T> restore(StateCheckpoint<T> checkpoint); + + @Override + public String toString() { + return state.toString(); + } + + public boolean stateEquals(OperatorState<T> other) { + return state.equals(other.state); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/452c39a9/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateCheckpoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateCheckpoint.java new file mode 100644 index 0000000..4e4906e --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateCheckpoint.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import java.io.Serializable; + +/** + * Base class for creating checkpoints for {@link OperatorState}. These + * checkpoints will be used to back up states in stateful Flink operators and + * also to restore them in case of node failure. To allow incremental + * checkpoints, override the {@link #update(StateCheckpoint)} method. 
+ * + * @param <T> + * The type of the state. + */ +public class StateCheckpoint<T> implements Serializable { + + private static final long serialVersionUID = 1L; + + public T checkpointedState; + + /** + * Creates a state checkpoint from the given {@link OperatorState} + * + * @param operatorState + * The {@link OperatorState} to checkpoint. + */ + public StateCheckpoint(OperatorState<T> operatorState) { + this.checkpointedState = operatorState.getState(); + } + + public StateCheckpoint() { + this.checkpointedState = null; + } + + /** + * Returns the state object for the checkpoint. + * + * @return The checkpointed state object. + */ + public T getCheckpointedState() { + return checkpointedState; + } + + /** + * Updates this checkpoint from the next one. Override this method to allow + * incremental updates. + * + * @param nextCheckpoint + * The {@link StateCheckpoint} to update from. + */ + public StateCheckpoint<T> update(StateCheckpoint<T> nextCheckpoint) { + this.checkpointedState = nextCheckpoint.getCheckpointedState(); + return this; + } + + @Override + public String toString() { + return checkpointedState.toString(); + } + + public boolean stateEquals(StateCheckpoint<T> other) { + return checkpointedState.equals(other.checkpointedState); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/452c39a9/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 4f58ba7..97a6099 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -301,13 +301,19 @@ class JobManager(val configuration: Configuration, jobInfo.client ! 
Failure(exception) throw exception } + barrierMonitors.get(jobID) match { case Some(monitor) => - monitor ! PoisonPill - barrierMonitors.remove(jobID) + newJobStatus match{ + case JobStatus.FINISHED | JobStatus.CANCELED => + monitor ! PoisonPill + barrierMonitors.remove(jobID) + case JobStatus.FAILING => + monitor ! JobStateRequest + } case None => + removeJob(jobID) } - removeJob(jobID) } else { newJobStatus match { @@ -315,7 +321,10 @@ class JobManager(val configuration: Configuration, case Some((executionGraph, _)) => //FIXME this is just a fast n dirty check for determining streaming jobs if (executionGraph.getScheduleMode == ScheduleMode.ALL) { - barrierMonitors += jobID -> StreamStateMonitor.props(context, executionGraph) + barrierMonitors.get(jobID) match { + case None => + barrierMonitors += jobID -> StreamStateMonitor.props(context, executionGraph) + } } case None => log.error("Cannot create state monitor for job ID {}.", jobID) @@ -327,12 +336,27 @@ class JobManager(val configuration: Configuration, removeJob(jobID) } - case BarrierAck(jobID, jobVertex, instanceID, checkpoint) => - barrierMonitors.get(jobID) match { - case Some(monitor) => monitor ! BarrierAck(jobID, jobVertex, instanceID, checkpoint) + case msg: BarrierAck => + barrierMonitors.get(msg.jobID) match { + case Some(monitor) => monitor ! msg + case None => + } + case msg: StateBarrierAck => + barrierMonitors.get(msg.jobID) match { + case Some(monitor) => monitor ! 
msg case None => } + case msg: JobStateResponse => + //inject initial states and restart the job + currentJobs.get(msg.jobID) match { + case Some(jobExecution) => + import scala.collection.JavaConverters._ + jobExecution._1.loadOperatorStates(msg.opStates.asJava) + jobExecution._1.restart() + case None => + } + case ScheduleOrUpdateConsumers(jobId, executionId, partitionIndex) => currentJobs.get(jobId) match { case Some((executionGraph, _)) => http://git-wip-us.apache.org/repos/asf/flink/blob/452c39a9/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamStateMonitor.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamStateMonitor.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamStateMonitor.scala index a37ddb5..65840f9 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamStateMonitor.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamStateMonitor.scala @@ -20,65 +20,82 @@ package org.apache.flink.runtime.jobmanager import akka.actor._ import org.apache.flink.runtime.ActorLogMessages -import org.apache.flink.runtime.executiongraph.{ExecutionAttemptID, ExecutionGraph, ExecutionVertex} -import org.apache.flink.runtime.jobgraph.{JobID, JobVertexID} +import org.apache.flink.runtime.executiongraph.{ExecutionAttemptID,ExecutionGraph,ExecutionVertex} +import org.apache.flink.runtime.jobgraph.{JobID,JobVertexID} +import org.apache.flink.runtime.state.OperatorState -import scala.collection.JavaConversions.mapAsScalaMap +import java.lang.Long +import scala.collection.JavaConversions._ import scala.collection.immutable.TreeMap import scala.concurrent.ExecutionContext.Implicits.global -import scala.concurrent.duration.{FiniteDuration, _} +import scala.concurrent.duration.{FiniteDuration,_} object StreamStateMonitor { - def props(context: ActorContext, 
executionGraph: ExecutionGraph, + def props(context: ActorContext,executionGraph: ExecutionGraph, interval: FiniteDuration = 5 seconds): ActorRef = { val vertices: Iterable[ExecutionVertex] = getExecutionVertices(executionGraph) val monitor = context.system.actorOf(Props(new StreamStateMonitor(executionGraph, - vertices, vertices.map(x => ((x.getJobVertex.getJobVertexId, x.getParallelSubtaskIndex), List.empty[Long])).toMap, interval, 0L, -1L))) + vertices,vertices.map(x => ((x.getJobVertex.getJobVertexId,x.getParallelSubtaskIndex), + List.empty[Long])).toMap,Map(),interval,0L,-1L))) monitor ! InitBarrierScheduler monitor } private def getExecutionVertices(executionGraph: ExecutionGraph): Iterable[ExecutionVertex] = { - for ((_, execJobVertex) <- executionGraph.getAllVertices; - execVertex: ExecutionVertex <- execJobVertex.getTaskVertices) + for((_,execJobVertex) <- executionGraph.getAllVertices; + execVertex: ExecutionVertex <- execJobVertex.getTaskVertices) yield execVertex } } class StreamStateMonitor(val executionGraph: ExecutionGraph, - val vertices: Iterable[ExecutionVertex], var acks: Map[(JobVertexID, Int), List[Long]], - val interval: FiniteDuration, var curId: Long, var ackId: Long) + val vertices: Iterable[ExecutionVertex], + var acks: Map[(JobVertexID,Int),List[Long]], + var states: Map[(JobVertexID, Integer, Long), OperatorState[_]], + val interval: FiniteDuration,var curId: Long,var ackId: Long) extends Actor with ActorLogMessages with ActorLogging { override def receiveWithLogMessages: Receive = { + case InitBarrierScheduler => - context.system.scheduler.schedule(interval, interval, self, BarrierTimeout) - context.system.scheduler.schedule(2 * interval, 2 * interval, self, UpdateCurrentBarrier) + context.system.scheduler.schedule(interval,interval,self,BarrierTimeout) + context.system.scheduler.schedule(2 * interval,2 * interval,self,TriggerBarrierCompaction) log.debug("[FT-MONITOR] Started Stream State Monitor for job {}{}", - 
executionGraph.getJobID, executionGraph.getJobName) + executionGraph.getJobID,executionGraph.getJobName) + case BarrierTimeout => curId += 1 log.debug("[FT-MONITOR] Sending Barrier to vertices of Job " + executionGraph.getJobName) vertices.filter(v => v.getJobVertex.getJobVertex.isInputVertex).foreach(vertex => vertex.getCurrentAssignedResource.getInstance.getTaskManager - ! BarrierReq(vertex.getCurrentExecutionAttempt.getAttemptId, curId)) - case BarrierAck(_, jobVertexID, instanceID, checkpointID) => - acks.get(jobVertexID, instanceID) match { + ! BarrierReq(vertex.getCurrentExecutionAttempt.getAttemptId,curId)) + + case StateBarrierAck(jobID, jobVertexID, instanceID, checkpointID, opState) => + states += (jobVertexID, instanceID, checkpointID) -> opState + self ! BarrierAck(jobID, jobVertexID, instanceID, checkpointID) + + case BarrierAck(jobID, jobVertexID,instanceID,checkpointID) => + acks.get(jobVertexID,instanceID) match { case Some(acklist) => - acks += (jobVertexID, instanceID) -> (checkpointID :: acklist) + acks += (jobVertexID,instanceID) -> (checkpointID :: acklist) case None => } - log.info(acks.toString) - case UpdateCurrentBarrier => - val barrierCount = acks.values.foldLeft(TreeMap[Long, Int]().withDefaultValue(0))((dict, myList) - => myList.foldLeft(dict)((dict2, elem) => dict2.updated(elem, dict2(elem) + 1))) + log.debug(acks.toString) + + case TriggerBarrierCompaction => + val barrierCount = acks.values.foldLeft(TreeMap[Long,Int]().withDefaultValue(0))((dict,myList) + => myList.foldLeft(dict)((dict2,elem) => dict2.updated(elem,dict2(elem) + 1))) val keysToKeep = barrierCount.filter(_._2 == acks.size).keys - ackId = if (!keysToKeep.isEmpty) keysToKeep.max else ackId - acks.keys.foreach(x => acks = acks.updated(x, acks(x).filter(_ >= ackId))) + ackId = if(!keysToKeep.isEmpty) keysToKeep.max else ackId + acks.keys.foreach(x => acks = acks.updated(x,acks(x).filter(_ >= ackId))) + states = states.filterKeys(_._3 >= ackId) log.debug("[FT-MONITOR] Last 
global barrier is " + ackId) + + case JobStateRequest => + sender ! JobStateResponse(executionGraph.getJobID, ackId, states) } } @@ -86,11 +103,20 @@ case class BarrierTimeout() case class InitBarrierScheduler() -case class UpdateCurrentBarrier() +case class TriggerBarrierCompaction() + +case class JobStateRequest() + +case class JobStateResponse(jobID: JobID, barrierID: Long, opStates: Map[(JobVertexID, Integer, + Long), OperatorState[_]]) + +case class BarrierReq(attemptID: ExecutionAttemptID,checkpointID: Long) -case class BarrierReq(attemptID: ExecutionAttemptID, checkpointID: Long) +case class BarrierAck(jobID: JobID,jobVertexID: JobVertexID,instanceID: Int,checkpointID: Long) -case class BarrierAck(jobID: JobID, jobVertexID: JobVertexID, instanceID: Int, checkpointID: Long) +case class StateBarrierAck(jobID: JobID, jobVertexID: JobVertexID, instanceID: Integer, + checkpointID: Long, state: OperatorState[_]) + http://git-wip-us.apache.org/repos/asf/flink/blob/452c39a9/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index 4271141..497e784 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -43,7 +43,7 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync import org.apache.flink.runtime.io.network.NetworkEnvironment import org.apache.flink.runtime.io.network.netty.NettyConfig import org.apache.flink.runtime.jobgraph.IntermediateDataSetID -import org.apache.flink.runtime.jobgraph.tasks.BarrierTransceiver +import org.apache.flink.runtime.jobgraph.tasks.{OperatorStateCarrier,BarrierTransceiver} import 
org.apache.flink.runtime.jobmanager.{BarrierReq,JobManager} import org.apache.flink.runtime.memorymanager.DefaultMemoryManager import org.apache.flink.runtime.messages.JobManagerMessages.UpdateTaskExecutionState @@ -358,7 +358,9 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, if (i.getExecutionState == ExecutionState.RUNNING) { i.getEnvironment.getInvokable match { case barrierTransceiver: BarrierTransceiver => - barrierTransceiver.broadcastBarrier(checkpointID) + new Thread(new Runnable { + override def run(): Unit = barrierTransceiver.broadcastBarrier(checkpointID); + }).start() case _ => log.error("[FT-TaskManager] Received a barrier for the wrong vertex") } } @@ -415,6 +417,15 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, task = new Task(jobID, vertexID, taskIndex, numSubtasks, executionID, tdd.getTaskName, self) + //inject operator state + if(tdd.getOperatorState != null) + { + val vertex = task.getEnvironment.getInvokable match { + case opStateCarrier: OperatorStateCarrier => + opStateCarrier.injectState(tdd.getOperatorState) + } + } + runningTasks.put(executionID, task) match { case Some(_) => throw new RuntimeException( s"TaskManager contains already a task with executionID $executionID.") http://git-wip-us.apache.org/repos/asf/flink/blob/452c39a9/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java index d464ef1..7c90629 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java +++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java @@ -32,7 +32,7 @@ import org.apache.flink.streaming.api.invokable.StreamInvokable; import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer; import org.apache.flink.streaming.api.streamvertex.StreamVertexException; import org.apache.flink.streaming.partitioner.StreamPartitioner; -import org.apache.flink.streaming.state.OperatorState; +import org.apache.flink.runtime.state.OperatorState; import org.apache.flink.util.InstantiationUtil; public class StreamConfig implements Serializable { http://git-wip-us.apache.org/repos/asf/flink/blob/452c39a9/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java index f69605b..640416d 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java @@ -46,7 +46,7 @@ import org.apache.flink.streaming.api.streamvertex.StreamIterationHead; import org.apache.flink.streaming.api.streamvertex.StreamIterationTail; import org.apache.flink.streaming.api.streamvertex.StreamVertex; import org.apache.flink.streaming.partitioner.StreamPartitioner; -import org.apache.flink.streaming.state.OperatorState; +import org.apache.flink.runtime.state.OperatorState; import org.apache.sling.commons.json.JSONArray; import org.apache.sling.commons.json.JSONException; import org.apache.sling.commons.json.JSONObject; 
http://git-wip-us.apache.org/repos/asf/flink/blob/452c39a9/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java index cdf43ee..b0fc364 100755 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java @@ -26,7 +26,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.invokable.StreamInvokable; import org.apache.flink.streaming.api.invokable.StreamInvokable.ChainingStrategy; import org.apache.flink.streaming.api.streamvertex.StreamingRuntimeContext; -import org.apache.flink.streaming.state.OperatorState; +import org.apache.flink.runtime.state.OperatorState; /** * The SingleOutputStreamOperator represents a user defined transformation http://git-wip-us.apache.org/repos/asf/flink/blob/452c39a9/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java index e2cdc34..24a90d0 100644 --- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java @@ -25,15 +25,17 @@ import org.apache.flink.runtime.event.task.TaskEvent; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.jobgraph.tasks.BarrierTransceiver; +import org.apache.flink.runtime.jobgraph.tasks.OperatorStateCarrier; import org.apache.flink.runtime.jobmanager.BarrierAck; +import org.apache.flink.runtime.jobmanager.StateBarrierAck; import org.apache.flink.runtime.util.event.EventListener; +import org.apache.flink.runtime.state.OperatorState; import org.apache.flink.streaming.api.StreamConfig; import org.apache.flink.streaming.api.invokable.ChainableInvokable; import org.apache.flink.streaming.api.invokable.StreamInvokable; import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer; import org.apache.flink.streaming.io.CoReaderIterator; import org.apache.flink.streaming.io.IndexedReaderIterator; -import org.apache.flink.streaming.state.OperatorState; import org.apache.flink.util.Collector; import org.apache.flink.util.MutableObjectIterator; import org.apache.flink.util.StringUtils; @@ -43,7 +45,7 @@ import org.slf4j.LoggerFactory; import akka.actor.ActorRef; public class StreamVertex<IN, OUT> extends AbstractInvokable implements StreamTaskContext<OUT>, - BarrierTransceiver { + BarrierTransceiver, OperatorStateCarrier { private static final Logger LOG = LoggerFactory.getLogger(StreamVertex.class); @@ -90,9 +92,27 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable implements StreamTa this.context = createRuntimeContext(getEnvironment().getTaskName(), this.states); } + protected <T> void invokeUserFunction(StreamInvokable<?, T> userInvokable) throws Exception { + 
userInvokable.setRuntimeContext(context); + userInvokable.open(getTaskConfiguration()); + + for (ChainableInvokable<?, ?> invokable : outputHandler.chainedInvokables) { + invokable.setRuntimeContext(context); + invokable.open(getTaskConfiguration()); + } + + userInvokable.invoke(); + userInvokable.close(); + + for (ChainableInvokable<?, ?> invokable : outputHandler.chainedInvokables) { + invokable.close(); + } + + } + @Override public void broadcastBarrier(long id) { - // Only called at input vertices + //Only called at input vertices if (LOG.isDebugEnabled()) { LOG.debug("Received barrier from jobmanager: " + id); } @@ -101,9 +121,21 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable implements StreamTa @Override public void confirmBarrier(long barrierID) { - getEnvironment().getJobManager().tell( - new BarrierAck(getEnvironment().getJobID(), getEnvironment().getJobVertexId(), - context.getIndexOfThisSubtask(), barrierID), ActorRef.noSender()); + + if(states != null && states.containsKey("kafka")) + { + getEnvironment().getJobManager().tell( + new StateBarrierAck(getEnvironment().getJobID(), + getEnvironment().getJobVertexId(), context.getIndexOfThisSubtask(), + barrierID, states.get("kafka")), ActorRef.noSender()); + } + else + { + getEnvironment().getJobManager().tell( + new BarrierAck(getEnvironment().getJobID(), getEnvironment().getJobVertexId(), + context.getIndexOfThisSubtask(), barrierID), ActorRef.noSender()); + } + } public void setInputsOutputs() { @@ -240,7 +272,8 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable implements StreamTa private void actOnBarrier(long id) { try { outputHandler.broadcastBarrier(id); - System.out.println("Superstep " + id + " processed: " + StreamVertex.this); + //TODO checkpoint state here + confirmBarrier(id); if (LOG.isDebugEnabled()) { LOG.debug("Superstep " + id + " processed: " + StreamVertex.this); } @@ -256,6 +289,12 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable implements 
StreamTa return configuration.getOperatorName() + " (" + context.getIndexOfThisSubtask() + ")"; } + @Override + public void injectState(OperatorState state) { + states.put("kafka", state); + } + + private class SuperstepEventListener implements EventListener<TaskEvent> { @Override http://git-wip-us.apache.org/repos/asf/flink/blob/452c39a9/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java index 0daf3c2..a47100b 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java @@ -27,7 +27,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; import org.apache.flink.runtime.operators.util.TaskConfig; -import org.apache.flink.streaming.state.OperatorState; +import org.apache.flink.runtime.state.OperatorState; /** * Implementation of the {@link RuntimeContext}, created by runtime stream UDF http://git-wip-us.apache.org/repos/asf/flink/blob/452c39a9/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/MapState.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/MapState.java 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/MapState.java index 85aec52..1b861f5 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/MapState.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/MapState.java @@ -23,8 +23,10 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; +import org.apache.flink.runtime.state.OperatorState; import org.apache.flink.streaming.state.checkpoint.MapCheckpoint; -import org.apache.flink.streaming.state.checkpoint.StateCheckpoint; +import org.apache.flink.runtime.state.StateCheckpoint; + /** * A Map that can be used as a partitionable operator state, for both fault http://git-wip-us.apache.org/repos/asf/flink/blob/452c39a9/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/OperatorState.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/OperatorState.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/OperatorState.java deleted file mode 100644 index a0cedba..0000000 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/OperatorState.java +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.state; - -import java.io.Serializable; - -import org.apache.flink.streaming.state.checkpoint.StateCheckpoint; - -/** - * Abstract class for representing operator states in Flink programs. By - * implementing the methods declared in this abstraction the state of the - * operator can be checkpointed by the fault tolerance mechanism. - * - * @param <T> - * The type of the operator state. - */ -public abstract class OperatorState<T> implements Serializable { - - private static final long serialVersionUID = 1L; - - protected T state; - - /** - * Constructor used for initializing the state. In case of failure, the - * state will be reinitialized using this constructor, then - * {@link #restore(StateCheckpoint)} will be used to restore from the last - * available backup. - */ - public OperatorState() { - state = null; - } - - /** - * Initializes the state using the given state object. - * - * @param initialState - * The initial state object - */ - public OperatorState(T initialState) { - state = initialState; - } - - /** - * Returns the currently stored state object. - * - * @return The state. - */ - public T getState() { - return state; - } - - /** - * Sets the current state object. - * - * @param state - * The new state object. - * @return The operator state with the new state object set. - */ - public OperatorState<T> setState(T state) { - this.state = state; - return this; - } - - /** - * Creates a {@link StateCheckpoint} that will be used to backup the state - * for failure recovery. 
- * - * @return The {@link StateCheckpoint} created. - */ - public abstract StateCheckpoint<T> checkpoint(); - - /** - * Restores the state from the given {@link StateCheckpoint}. - * - * @param checkpoint - * The checkpoint to restore from - * @return The restored operator. - */ - public abstract OperatorState<T> restore(StateCheckpoint<T> checkpoint); - - @Override - public String toString() { - return state.toString(); - } - - public boolean stateEquals(OperatorState<T> other) { - return state.equals(other.state); - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/452c39a9/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/PartitionableState.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/PartitionableState.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/PartitionableState.java index c58c545..ddedcd9 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/PartitionableState.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/PartitionableState.java @@ -17,6 +17,8 @@ package org.apache.flink.streaming.state; +import org.apache.flink.runtime.state.OperatorState; + /** * Base class for representing operator states that can be repartitioned for * state state and load balancing. 
http://git-wip-us.apache.org/repos/asf/flink/blob/452c39a9/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/SimpleState.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/SimpleState.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/SimpleState.java index 7ae1f81..b76f5ac 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/SimpleState.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/SimpleState.java @@ -17,7 +17,8 @@ package org.apache.flink.streaming.state; -import org.apache.flink.streaming.state.checkpoint.StateCheckpoint; +import org.apache.flink.runtime.state.OperatorState; +import org.apache.flink.runtime.state.StateCheckpoint; /** * Basic {@link OperatorState} for storing and updating simple objects. 
By default the http://git-wip-us.apache.org/repos/asf/flink/blob/452c39a9/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/checkpoint/MapCheckpoint.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/checkpoint/MapCheckpoint.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/checkpoint/MapCheckpoint.java index 15d1fd5..ee27d4f 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/checkpoint/MapCheckpoint.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/checkpoint/MapCheckpoint.java @@ -21,8 +21,9 @@ import java.util.HashMap; import java.util.Map; import java.util.Set; +import org.apache.flink.runtime.state.OperatorState; +import org.apache.flink.runtime.state.StateCheckpoint; import org.apache.flink.streaming.state.MapState; -import org.apache.flink.streaming.state.OperatorState; public class MapCheckpoint<K, V> extends StateCheckpoint<Map<K, V>> { http://git-wip-us.apache.org/repos/asf/flink/blob/452c39a9/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/checkpoint/StateCheckpoint.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/checkpoint/StateCheckpoint.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/checkpoint/StateCheckpoint.java deleted file mode 100644 index 8b76245..0000000 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/checkpoint/StateCheckpoint.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Licensed to the Apache 
Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.state.checkpoint; - -import java.io.Serializable; - -import org.apache.flink.streaming.state.OperatorState; - -/** - * Base class for creating checkpoints for {@link OperatorState}. This - * checkpoints will be used to backup states in stateful Flink operators and - * also to restore them in case of node failure. To allow incremental - * checkpoints override the {@link #update(StateCheckpoint)} method. - * - * @param <T> - * The type of the state. - */ -public class StateCheckpoint<T> implements Serializable { - - private static final long serialVersionUID = 1L; - - T checkpointedState; - - /** - * Creates a state checkpoint from the given {@link OperatorState} - * - * @param operatorState - * The {@link OperatorState} to checkpoint. - */ - public StateCheckpoint(OperatorState<T> operatorState) { - this.checkpointedState = operatorState.getState(); - } - - public StateCheckpoint() { - this.checkpointedState = null; - } - - /** - * Returns the state object for the checkpoint. - * - * @return The checkpointed state object. - */ - public T getCheckpointedState() { - return checkpointedState; - } - - /** - * Updates the checkpoint from next one. 
Override this method to allow - * incremental updates. - * - * @param nextCheckpoint - * The {@link StateCheckpoint} will be used to update from. - */ - public StateCheckpoint<T> update(StateCheckpoint<T> nextCheckpoint) { - this.checkpointedState = nextCheckpoint.getCheckpointedState(); - return this; - } - - @Override - public String toString() { - return checkpointedState.toString(); - } - - public boolean stateEquals(StateCheckpoint<T> other) { - return checkpointedState.equals(other.checkpointedState); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/452c39a9/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/state/MapStateTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/state/MapStateTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/state/MapStateTest.java index 194403c..98bafe4 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/state/MapStateTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/state/MapStateTest.java @@ -25,7 +25,7 @@ import java.util.HashMap; import java.util.Map; import org.apache.flink.streaming.state.checkpoint.MapCheckpoint; -import org.apache.flink.streaming.state.checkpoint.StateCheckpoint; +import org.apache.flink.runtime.state.StateCheckpoint; import org.junit.Test; public class MapStateTest { http://git-wip-us.apache.org/repos/asf/flink/blob/452c39a9/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/state/OperatorStateTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/state/OperatorStateTest.java 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/state/OperatorStateTest.java index 6cb8f51..4e07a3f 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/state/OperatorStateTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/state/OperatorStateTest.java @@ -20,7 +20,8 @@ package org.apache.flink.streaming.state; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -import org.apache.flink.streaming.state.checkpoint.StateCheckpoint; +import org.apache.flink.runtime.state.OperatorState; +import org.apache.flink.runtime.state.StateCheckpoint; import org.junit.Test; public class OperatorStateTest { http://git-wip-us.apache.org/repos/asf/flink/blob/452c39a9/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java index c207d60..b7a1ba3 100644 --- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java +++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java @@ -101,6 +101,11 @@ public class WordCount { // emit the pairs for (String token : tokens) { + //FIXME to be removed. added this for test purposes + if("killme".equals(token)) + { + throw new Exception("byee"); + } if (token.length() > 0) { out.collect(new Tuple2<String, Integer>(token, 1)); }