[FLINK-1638] [streaming] At-least-once monitoring semantics added and bug fixes
Fault Tolerance monitor suicide on ExecutionGraph terminal state Forwarding messages to StreamCheckpointCoordinator Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/37390d63 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/37390d63 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/37390d63 Branch: refs/heads/master Commit: 37390d6322ab984f17dd257f7a9939311c81edf5 Parents: ed5ba95 Author: Paris Carbone <seniorcarb...@gmail.com> Authored: Fri Mar 6 15:10:12 2015 +0100 Committer: Stephan Ewen <se...@apache.org> Committed: Tue Mar 10 14:58:49 2015 +0100 ---------------------------------------------------------------------- .../deployment/TaskDeploymentDescriptor.java | 15 ++- .../runtime/event/task/StreamingSuperstep.java | 51 -------- .../flink/runtime/executiongraph/Execution.java | 11 +- .../runtime/executiongraph/ExecutionGraph.java | 45 ++++++- .../runtime/executiongraph/ExecutionVertex.java | 9 +- .../apache/flink/runtime/jobgraph/JobGraph.java | 33 +++++ .../jobgraph/tasks/OperatorStateCarrier.java | 4 +- .../flink/runtime/jobmanager/JobManager.scala | 70 +++------- .../StreamCheckpointCoordinator.scala | 129 +++++++++++++++++++ .../runtime/jobmanager/StreamStateMonitor.scala | 122 ------------------ .../flink/runtime/taskmanager/TaskManager.scala | 21 +-- .../connectors/kafka/KafkaConsumerExample.java | 5 - .../kafka/api/simple/KafkaConsumerIterator.java | 5 +- .../KafkaDeserializingConsumerIterator.java | 1 + .../flink/streaming/api/StreamConfig.java | 14 ++ .../apache/flink/streaming/api/StreamGraph.java | 20 +++ .../api/StreamingJobGraphGenerator.java | 9 +- .../environment/StreamExecutionEnvironment.java | 13 ++ .../source/SocketTextStreamFunction.java | 1 - .../api/streamvertex/InputHandler.java | 1 - .../api/streamvertex/OutputHandler.java | 1 - .../api/streamvertex/StreamVertex.java | 17 ++- .../streamvertex/StreamingRuntimeContext.java | 9 +- 
.../api/streamvertex/StreamingSuperstep.java | 52 ++++++++ .../flink/streaming/io/BarrierBuffer.java | 74 +++++++++-- .../flink/streaming/io/CoRecordReader.java | 11 +- .../io/StreamingAbstractRecordReader.java | 4 +- .../flink/streaming/io/BarrierBufferTest.java | 2 +- .../streaming/examples/wordcount/WordCount.java | 5 - 29 files changed, 452 insertions(+), 302 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/37390d63/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java index a431a76..b2573f7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java @@ -28,6 +28,7 @@ import org.apache.flink.runtime.state.OperatorState; import java.io.Serializable; import java.util.ArrayList; import java.util.List; +import java.util.Map; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; @@ -78,7 +79,7 @@ public final class TaskDeploymentDescriptor implements Serializable { /** The list of JAR files required to run this task. */ private final List<BlobKey> requiredJarFiles; - private OperatorState operatorState; + private Map<String, OperatorState<?>> operatorStates; /** * Constructs a task deployment descriptor. 
@@ -128,13 +129,13 @@ public final class TaskDeploymentDescriptor implements Serializable { Configuration taskConfiguration, String invokableClassName, List<PartitionDeploymentDescriptor> producedPartitions, List<PartitionConsumerDeploymentDescriptor> consumedPartitions, - List<BlobKey> requiredJarFiles, int targetSlotNumber, OperatorState operatorState) { + List<BlobKey> requiredJarFiles, int targetSlotNumber, Map<String,OperatorState<?>> operatorStates) { this(jobID, vertexID, executionId, taskName, indexInSubtaskGroup, numberOfSubtasks, jobConfiguration, taskConfiguration, invokableClassName, producedPartitions, consumedPartitions, requiredJarFiles, targetSlotNumber); - setOperatorState(operatorState); + setOperatorStates(operatorStates); } /** @@ -243,11 +244,11 @@ public final class TaskDeploymentDescriptor implements Serializable { strProducedPartitions, strConsumedPartitions); } - public void setOperatorState(OperatorState operatorState) { - this.operatorState = operatorState; + public void setOperatorStates(Map<String,OperatorState<?>> operatorStates) { + this.operatorStates = operatorStates; } - public OperatorState getOperatorState() { - return operatorState; + public Map<String, OperatorState<?>> getOperatorStates() { + return operatorStates; } } http://git-wip-us.apache.org/repos/asf/flink/blob/37390d63/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/StreamingSuperstep.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/StreamingSuperstep.java b/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/StreamingSuperstep.java deleted file mode 100644 index e35eb28..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/StreamingSuperstep.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.event.task; - -import java.io.IOException; - -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataOutputView; - -public class StreamingSuperstep extends TaskEvent { - - protected long id; - - public StreamingSuperstep() { - - } - - public StreamingSuperstep(long id) { - this.id = id; - } - - @Override - public void write(DataOutputView out) throws IOException { - out.writeLong(id); - } - - @Override - public void read(DataInputView in) throws IOException { - id = in.readLong(); - } - - public long getId() { - return id; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/37390d63/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java index 89f5183..93845c7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java @@ -56,6 +56,7 @@ import scala.concurrent.duration.FiniteDuration; import 
java.io.Serializable; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeoutException; @@ -123,7 +124,7 @@ public class Execution implements Serializable { private volatile InstanceConnectionInfo assignedResourceLocation; // for the archived execution - private OperatorState operatorState; + private Map<String,OperatorState<?>> operatorStates; // -------------------------------------------------------------------------------------------- @@ -857,11 +858,11 @@ public class Execution implements Serializable { (assignedResource == null ? "(unassigned)" : assignedResource.toString()), state); } - public void setOperatorState(OperatorState operatorState) { - this.operatorState = operatorState; + public void setOperatorStates(Map<String,OperatorState<?>> operatorStates) { + this.operatorStates = operatorStates; } - public OperatorState getOperatorState() { - return operatorState; + public Map<String,OperatorState<?>> getOperatorStates() { + return operatorStates; } } http://git-wip-us.apache.org/repos/asf/flink/blob/37390d63/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index bf34e33..0c6c3a7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -18,8 +18,8 @@ package org.apache.flink.runtime.executiongraph; +import akka.actor.ActorContext; import akka.actor.ActorRef; - import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.JobException; import 
org.apache.flink.runtime.akka.AkkaUtils; @@ -31,6 +31,7 @@ import org.apache.flink.runtime.jobgraph.JobID; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.ScheduleMode; +import org.apache.flink.runtime.jobmanager.StreamCheckpointCoordinator; import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; import org.apache.flink.runtime.messages.ExecutionGraphMessages; import org.apache.flink.runtime.state.OperatorState; @@ -38,8 +39,8 @@ import org.apache.flink.runtime.taskmanager.TaskExecutionState; import org.apache.flink.util.ExceptionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import scala.Tuple3; +import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; import java.io.Serializable; @@ -52,6 +53,7 @@ import java.util.NoSuchElementException; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import static akka.dispatch.Futures.future; @@ -118,6 +120,14 @@ public class ExecutionGraph implements Serializable { private boolean allowQueuedScheduling = true; + private ActorContext parentContext; + + private ActorRef stateMonitorActor; + + private boolean monitoringEnabled; + + private long monitoringInterval = 10000; + private ScheduleMode scheduleMode = ScheduleMode.FROM_SOURCES; @@ -159,6 +169,18 @@ public class ExecutionGraph implements Serializable { } // -------------------------------------------------------------------------------------------- + + public void setStateMonitorActor(ActorRef stateMonitorActor) { + this.stateMonitorActor = stateMonitorActor; + } + + public ActorRef getStateMonitorActor() { + return stateMonitorActor; + } + + public void setParentContext(ActorContext parentContext) { + this.parentContext = 
parentContext; + } public void setNumberOfRetriesLeft(int numberOfRetriesLeft) { if (numberOfRetriesLeft < -1) { @@ -214,6 +236,14 @@ public class ExecutionGraph implements Serializable { } } + public void setMonitoringEnabled(boolean monitoringEnabled) { + this.monitoringEnabled = monitoringEnabled; + } + + public void setMonitoringInterval(long monitoringInterval) { + this.monitoringInterval = monitoringInterval; + } + /** * Returns a list of BLOB keys referring to the JAR files required to run this job * @return list of BLOB keys referring to the JAR files required to run this job @@ -361,12 +391,17 @@ public class ExecutionGraph implements Serializable { for (ExecutionJobVertex ejv : getVerticesTopologically()) { ejv.scheduleAll(scheduler, allowQueuedScheduling); } - break; case BACKTRACKING: throw new JobException("BACKTRACKING is currently not supported as schedule mode."); } + + if(monitoringEnabled) + { + stateMonitorActor = StreamCheckpointCoordinator.spawn(parentContext, this, + Duration.create(monitoringInterval, TimeUnit.MILLISECONDS)); + } } else { throw new IllegalStateException("Job may only be scheduled from state " + JobStatus.CREATED); @@ -532,9 +567,9 @@ public class ExecutionGraph implements Serializable { } } - public void loadOperatorStates(Map<Tuple3<JobVertexID, Integer, Long> ,OperatorState<?>> states) + public synchronized void loadOperatorStates(Map<Tuple3<JobVertexID, Integer, Long> , Map<String,OperatorState<?>>> states) { - for(Map.Entry<Tuple3<JobVertexID, Integer, Long> ,OperatorState<?>> state : states.entrySet()) + for(Map.Entry<Tuple3<JobVertexID, Integer, Long> , Map<String,OperatorState<?>>> state : states.entrySet()) { tasks.get(state.getKey()._1()).getTaskVertices()[state.getKey()._2()].setOperatorState(state.getValue()); } http://git-wip-us.apache.org/repos/asf/flink/blob/37390d63/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java 
---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java index b7f962a..41d34d5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java @@ -48,6 +48,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; import static com.google.common.base.Preconditions.checkElementIndex; @@ -90,7 +91,7 @@ public class ExecutionVertex implements Serializable { private volatile boolean scheduleLocalOnly; - private OperatorState operatorState; + private Map<String,OperatorState<?>> operatorState; // -------------------------------------------------------------------------------------------- @@ -198,11 +199,11 @@ public class ExecutionVertex implements Serializable { return currentExecution.getAssignedResourceLocation(); } - public void setOperatorState(OperatorState operatorState) { + public void setOperatorState(Map<String,OperatorState<?>> operatorState) { this.operatorState = operatorState; } - public OperatorState getOperatorState() { + public Map<String,OperatorState<?>> getOperatorState() { return operatorState; } @@ -393,7 +394,7 @@ public class ExecutionVertex implements Serializable { if(operatorState!=null) { - execution.setOperatorState(operatorState); + execution.setOperatorStates(operatorState); } } http://git-wip-us.apache.org/repos/asf/flink/blob/37390d63/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java index 0cf2f5e..4b398e5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java @@ -75,6 +75,14 @@ public class JobGraph implements Serializable { private ScheduleMode scheduleMode = ScheduleMode.FROM_SOURCES; + public enum JobType {STREAMING, BATCH} + + private JobType jobType = JobType.BATCH; + + private boolean monitoringEnabled = false; + + private long monitorInterval = 10000; + // -------------------------------------------------------------------------------------------- /** @@ -253,6 +261,31 @@ public class JobGraph implements Serializable { return this.taskVertices.size(); } + + public void setJobType(JobType jobType) { + this.jobType = jobType; + } + + public JobType getJobType() { + return jobType; + } + + public void setMonitoringEnabled(boolean monitoringEnabled) { + this.monitoringEnabled = monitoringEnabled; + } + + public boolean isMonitoringEnabled() { + return monitoringEnabled; + } + + public void setMonitorInterval(long monitorInterval) { + this.monitorInterval = monitorInterval; + } + + public long getMonitorInterval() { + return monitorInterval; + } + /** * Searches for a vertex with a matching ID and returns it. 
* http://git-wip-us.apache.org/repos/asf/flink/blob/37390d63/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/OperatorStateCarrier.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/OperatorStateCarrier.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/OperatorStateCarrier.java index 6ea4f27..e8b6d6b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/OperatorStateCarrier.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/OperatorStateCarrier.java @@ -20,8 +20,10 @@ package org.apache.flink.runtime.jobgraph.tasks; import org.apache.flink.runtime.state.OperatorState; +import java.util.Map; + public interface OperatorStateCarrier { - public void injectState(OperatorState state); + public void injectStates(Map<String, OperatorState<?>> state); } http://git-wip-us.apache.org/repos/asf/flink/blob/37390d63/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 97a6099..c350680 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -104,7 +104,6 @@ class JobManager(val configuration: Configuration, /** List of current jobs running jobs */ val currentJobs = scala.collection.mutable.HashMap[JobID, (ExecutionGraph, JobInfo)]() - val barrierMonitors = scala.collection.mutable.HashMap[JobID, ActorRef]() /** * Run when the job manager is started. Simply logs an informational message. 
@@ -284,78 +283,43 @@ class JobManager(val configuration: Configuration, if(newJobStatus.isTerminalState) { jobInfo.end = timeStamp - // is the client waiting for the job result? + // is the client waiting for the job result? newJobStatus match { case JobStatus.FINISHED => val accumulatorResults = accumulatorManager.getJobAccumulatorResults(jobID) - jobInfo.client ! JobResultSuccess(jobID, jobInfo.duration, accumulatorResults) + jobInfo.client ! JobResultSuccess(jobID,jobInfo.duration,accumulatorResults) case JobStatus.CANCELED => jobInfo.client ! Failure(new JobCancellationException(jobID, - "Job was cancelled.", error)) + "Job was cancelled.",error)) case JobStatus.FAILED => jobInfo.client ! Failure(new JobExecutionException(jobID, - "Job execution failed.", error)) + "Job execution failed.",error)) case x => - val exception = new JobExecutionException(jobID, s"$x is not a " + - "terminal state.") + val exception = new JobExecutionException(jobID,s"$x is not a " + + "terminal state.") jobInfo.client ! Failure(exception) throw exception } + + removeJob(jobID) - barrierMonitors.get(jobID) match { - case Some(monitor) => - newJobStatus match{ - case JobStatus.FINISHED | JobStatus.CANCELED => - monitor ! PoisonPill - barrierMonitors.remove(jobID) - case JobStatus.FAILING => - monitor ! 
JobStateRequest - } - case None => - removeJob(jobID) - } - } - else { - newJobStatus match { - case JobStatus.RUNNING => currentJobs.get(jobID) match { - case Some((executionGraph, _)) => - //FIXME this is just a fast n dirty check for determining streaming jobs - if (executionGraph.getScheduleMode == ScheduleMode.ALL) { - barrierMonitors.get(jobID) match { - case None => - barrierMonitors += jobID -> StreamStateMonitor.props(context, executionGraph) - } - } - case None => - log.error("Cannot create state monitor for job ID {}.", jobID) - new IllegalStateException("Cannot find execution graph for job ID " + jobID) - } - } } case None => removeJob(jobID) - } + } case msg: BarrierAck => - barrierMonitors.get(msg.jobID) match { - case Some(monitor) => monitor ! msg + currentJobs.get(msg.jobID) match { + case Some(jobExecution) => + jobExecution._1.getStateMonitorActor forward msg case None => } case msg: StateBarrierAck => - barrierMonitors.get(msg.jobID) match { - case Some(monitor) => monitor ! 
msg - case None => - } - - case msg: JobStateResponse => - //inject initial states and restart the job currentJobs.get(msg.jobID) match { case Some(jobExecution) => - import scala.collection.JavaConverters._ - jobExecution._1.loadOperatorStates(msg.opStates.asJava) - jobExecution._1.restart() + jobExecution._1.getStateMonitorActor forward msg case None => - } + } case ScheduleOrUpdateConsumers(jobId, executionId, partitionIndex) => currentJobs.get(jobId) match { @@ -522,6 +486,9 @@ class JobManager(val configuration: Configuration, executionGraph.setDelayBeforeRetrying(delayBetweenRetries) executionGraph.setScheduleMode(jobGraph.getScheduleMode) executionGraph.setQueuedSchedulingAllowed(jobGraph.getAllowQueuedScheduling) + + executionGraph.setMonitoringEnabled(jobGraph.isMonitoringEnabled) + executionGraph.setMonitoringInterval(jobGraph.getMonitorInterval) // initialize the vertices that have a master initialization hook // file output formats create directories here, input formats create splits @@ -564,6 +531,9 @@ class JobManager(val configuration: Configuration, log.debug(s"Successfully created execution graph from job graph ${jobId} (${jobName}).") } + // give an actorContext + executionGraph.setParentContext(context); + // get notified about job status changes executionGraph.registerJobStatusListener(self) http://git-wip-us.apache.org/repos/asf/flink/blob/37390d63/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamCheckpointCoordinator.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamCheckpointCoordinator.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamCheckpointCoordinator.scala new file mode 100644 index 0000000..7ab6a6f --- /dev/null +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamCheckpointCoordinator.scala @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmanager + +import java.lang.Long + +import akka.actor._ +import org.apache.flink.runtime.ActorLogMessages +import org.apache.flink.runtime.execution.ExecutionState.RUNNING +import org.apache.flink.runtime.executiongraph.{ExecutionAttemptID, ExecutionGraph, ExecutionVertex} +import org.apache.flink.runtime.jobgraph.JobStatus._ +import org.apache.flink.runtime.jobgraph.{JobID, JobVertexID} +import org.apache.flink.runtime.state.OperatorState + +import scala.collection.JavaConversions._ +import scala.collection.immutable.TreeMap +import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.duration.{FiniteDuration, _} + +object StreamCheckpointCoordinator { + + def spawn(context: ActorContext,executionGraph: ExecutionGraph, + interval: FiniteDuration = 5 seconds): ActorRef = { + + val vertices: Iterable[ExecutionVertex] = getExecutionVertices(executionGraph) + val monitor = context.system.actorOf(Props(new StreamCheckpointCoordinator(executionGraph, + vertices,vertices.map(x => ((x.getJobVertex.getJobVertexId,x.getParallelSubtaskIndex), + List.empty[Long])).toMap, Map() ,interval,0L,-1L))) + monitor ! 
InitBarrierScheduler + monitor + } + + private def getExecutionVertices(executionGraph: ExecutionGraph): Iterable[ExecutionVertex] = { + for((_,execJobVertex) <- executionGraph.getAllVertices; + execVertex: ExecutionVertex <- execJobVertex.getTaskVertices) + yield execVertex + } +} + +class StreamCheckpointCoordinator(val executionGraph: ExecutionGraph, + val vertices: Iterable[ExecutionVertex], + var acks: Map[(JobVertexID,Int),List[Long]], + var states: Map[(JobVertexID, Integer, Long), + java.util.Map[String,OperatorState[_]]], + val interval: FiniteDuration,var curId: Long,var ackId: Long) + extends Actor with ActorLogMessages with ActorLogging { + + override def receiveWithLogMessages: Receive = { + + case InitBarrierScheduler => + context.system.scheduler.schedule(interval,interval,self,BarrierTimeout) + context.system.scheduler.schedule(2 * interval,2 * interval,self,CompactAndUpdate) + log.debug("[FT-MONITOR] Started Stream State Monitor for job {}{}", + executionGraph.getJobID,executionGraph.getJobName) + + case BarrierTimeout => + executionGraph.getState match { + case FAILED | CANCELED | FINISHED => + log.debug("[FT-MONITOR] Stopping monitor for terminated job {}", executionGraph.getJobID) + self ! PoisonPill + case _ => + curId += 1 + log.debug("[FT-MONITOR] Sending Barrier to vertices of Job " + executionGraph.getJobName) + vertices.filter(v => v.getJobVertex.getJobVertex.isInputVertex && + v.getExecutionState == RUNNING).foreach(vertex + => vertex.getCurrentAssignedResource.getInstance.getTaskManager + ! BarrierReq(vertex.getCurrentExecutionAttempt.getAttemptId,curId)) + } + + case StateBarrierAck(jobID, jobVertexID, instanceID, checkpointID, opState) => + states += (jobVertexID, instanceID, checkpointID) -> opState + self ! 
BarrierAck(jobID, jobVertexID, instanceID, checkpointID) + + case BarrierAck(jobID, jobVertexID,instanceID,checkpointID) => + acks.get(jobVertexID,instanceID) match { + case Some(acklist) => + acks += (jobVertexID,instanceID) -> (checkpointID :: acklist) + case None => + } + log.debug(acks.toString) + + + + case CompactAndUpdate => + val barrierCount = acks.values.foldLeft(TreeMap[Long,Int]().withDefaultValue(0))((dict,myList) + => myList.foldLeft(dict)((dict2,elem) => dict2.updated(elem,dict2(elem) + 1))) + val keysToKeep = barrierCount.filter(_._2 == acks.size).keys + ackId = if(!keysToKeep.isEmpty) keysToKeep.max else ackId + acks.keys.foreach(x => acks = acks.updated(x,acks(x).filter(_ >= ackId))) + states = states.filterKeys(_._3 >= ackId) + log.debug("[FT-MONITOR] Last global barrier is " + ackId) + executionGraph.loadOperatorStates(states) + + } + +} + +case class BarrierTimeout() + +case class InitBarrierScheduler() + +case class CompactAndUpdate() + +case class BarrierReq(attemptID: ExecutionAttemptID,checkpointID: Long) + +case class BarrierAck(jobID: JobID,jobVertexID: JobVertexID,instanceID: Int,checkpointID: Long) + +case class StateBarrierAck(jobID: JobID, jobVertexID: JobVertexID, instanceID: Integer, + checkpointID: Long, states: java.util.Map[String,OperatorState[_]]) + + + + http://git-wip-us.apache.org/repos/asf/flink/blob/37390d63/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamStateMonitor.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamStateMonitor.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamStateMonitor.scala deleted file mode 100644 index 65840f9..0000000 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamStateMonitor.scala +++ /dev/null @@ -1,122 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor 
license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.jobmanager - -import akka.actor._ -import org.apache.flink.runtime.ActorLogMessages -import org.apache.flink.runtime.executiongraph.{ExecutionAttemptID,ExecutionGraph,ExecutionVertex} -import org.apache.flink.runtime.jobgraph.{JobID,JobVertexID} -import org.apache.flink.runtime.state.OperatorState - -import java.lang.Long -import scala.collection.JavaConversions._ -import scala.collection.immutable.TreeMap -import scala.concurrent.ExecutionContext.Implicits.global -import scala.concurrent.duration.{FiniteDuration,_} - - -object StreamStateMonitor { - - def props(context: ActorContext,executionGraph: ExecutionGraph, - interval: FiniteDuration = 5 seconds): ActorRef = { - - val vertices: Iterable[ExecutionVertex] = getExecutionVertices(executionGraph) - val monitor = context.system.actorOf(Props(new StreamStateMonitor(executionGraph, - vertices,vertices.map(x => ((x.getJobVertex.getJobVertexId,x.getParallelSubtaskIndex), - List.empty[Long])).toMap,Map(),interval,0L,-1L))) - monitor ! 
InitBarrierScheduler - monitor - } - - private def getExecutionVertices(executionGraph: ExecutionGraph): Iterable[ExecutionVertex] = { - for((_,execJobVertex) <- executionGraph.getAllVertices; - execVertex: ExecutionVertex <- execJobVertex.getTaskVertices) - yield execVertex - } -} - -class StreamStateMonitor(val executionGraph: ExecutionGraph, - val vertices: Iterable[ExecutionVertex], - var acks: Map[(JobVertexID,Int),List[Long]], - var states: Map[(JobVertexID, Integer, Long), OperatorState[_]], - val interval: FiniteDuration,var curId: Long,var ackId: Long) - extends Actor with ActorLogMessages with ActorLogging { - - override def receiveWithLogMessages: Receive = { - - case InitBarrierScheduler => - context.system.scheduler.schedule(interval,interval,self,BarrierTimeout) - context.system.scheduler.schedule(2 * interval,2 * interval,self,TriggerBarrierCompaction) - log.debug("[FT-MONITOR] Started Stream State Monitor for job {}{}", - executionGraph.getJobID,executionGraph.getJobName) - - case BarrierTimeout => - curId += 1 - log.debug("[FT-MONITOR] Sending Barrier to vertices of Job " + executionGraph.getJobName) - vertices.filter(v => v.getJobVertex.getJobVertex.isInputVertex).foreach(vertex - => vertex.getCurrentAssignedResource.getInstance.getTaskManager - ! BarrierReq(vertex.getCurrentExecutionAttempt.getAttemptId,curId)) - - case StateBarrierAck(jobID, jobVertexID, instanceID, checkpointID, opState) => - states += (jobVertexID, instanceID, checkpointID) -> opState - self ! 
BarrierAck(jobID, jobVertexID, instanceID, checkpointID) - - case BarrierAck(jobID, jobVertexID,instanceID,checkpointID) => - acks.get(jobVertexID,instanceID) match { - case Some(acklist) => - acks += (jobVertexID,instanceID) -> (checkpointID :: acklist) - case None => - } - log.debug(acks.toString) - - case TriggerBarrierCompaction => - val barrierCount = acks.values.foldLeft(TreeMap[Long,Int]().withDefaultValue(0))((dict,myList) - => myList.foldLeft(dict)((dict2,elem) => dict2.updated(elem,dict2(elem) + 1))) - val keysToKeep = barrierCount.filter(_._2 == acks.size).keys - ackId = if(!keysToKeep.isEmpty) keysToKeep.max else ackId - acks.keys.foreach(x => acks = acks.updated(x,acks(x).filter(_ >= ackId))) - states = states.filterKeys(_._3 >= ackId) - log.debug("[FT-MONITOR] Last global barrier is " + ackId) - - case JobStateRequest => - sender ! JobStateResponse(executionGraph.getJobID, ackId, states) - } -} - -case class BarrierTimeout() - -case class InitBarrierScheduler() - -case class TriggerBarrierCompaction() - -case class JobStateRequest() - -case class JobStateResponse(jobID: JobID, barrierID: Long, opStates: Map[(JobVertexID, Integer, - Long), OperatorState[_]]) - -case class BarrierReq(attemptID: ExecutionAttemptID,checkpointID: Long) - -case class BarrierAck(jobID: JobID,jobVertexID: JobVertexID,instanceID: Int,checkpointID: Long) - -case class StateBarrierAck(jobID: JobID, jobVertexID: JobVertexID, instanceID: Integer, - checkpointID: Long, state: OperatorState[_]) - - - - http://git-wip-us.apache.org/repos/asf/flink/blob/37390d63/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index 497e784..3de917b 100644 --- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -352,7 +352,8 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, } case BarrierReq(attemptID, checkpointID) => - log.debug("[FT-TaskManager] Barrier request received for attempt {}", attemptID) + log.debug("[FT-TaskManager] Barrier {} request received for attempt {}", + checkpointID, attemptID) runningTasks.get(attemptID) match { case Some(i) => if (i.getExecutionState == ExecutionState.RUNNING) { @@ -416,15 +417,6 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, task = new Task(jobID, vertexID, taskIndex, numSubtasks, executionID, tdd.getTaskName, self) - - //inject operator state - if(tdd.getOperatorState != null) - { - val vertex = task.getEnvironment.getInvokable match { - case opStateCarrier: OperatorStateCarrier => - opStateCarrier.injectState(tdd.getOperatorState) - } - } runningTasks.put(executionID, task) match { case Some(_) => throw new RuntimeException( @@ -446,6 +438,15 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, task.setEnvironment(env) + //inject operator state + if(tdd.getOperatorStates != null) + { + val vertex = task.getEnvironment.getInvokable match { + case opStateCarrier: OperatorStateCarrier => + opStateCarrier.injectStates(tdd.getOperatorStates) + } + } + // register the task with the network stack and profiles networkEnvironment match { case Some(ne) => http://git-wip-us.apache.org/repos/asf/flink/blob/37390d63/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java 
b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java index d9bb7d3..dd1221d 100644 --- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java +++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java @@ -17,14 +17,9 @@ package org.apache.flink.streaming.connectors.kafka; -import org.apache.flink.runtime.state.OperatorState; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.api.KafkaSource; -<<<<<<< HEAD -import org.apache.flink.streaming.connectors.kafka.api.simple.PersistentKafkaSource; -======= ->>>>>>> a62796a... s import org.apache.flink.streaming.connectors.util.JavaDefaultStringSchema; public class KafkaConsumerExample { http://git-wip-us.apache.org/repos/asf/flink/blob/37390d63/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaConsumerIterator.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaConsumerIterator.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaConsumerIterator.java index 92d351a..370b3f0 100644 --- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaConsumerIterator.java +++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaConsumerIterator.java @@ 
-17,6 +17,7 @@ package org.apache.flink.streaming.connectors.kafka.api.simple; +import java.io.Serializable; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collections; @@ -36,13 +37,11 @@ import kafka.javaapi.TopicMetadata; import kafka.javaapi.TopicMetadataRequest; import kafka.javaapi.consumer.SimpleConsumer; import kafka.message.MessageAndOffset; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Iterates the records received from a partition of a Kafka topic as byte arrays. */ -public class KafkaConsumerIterator { +public class KafkaConsumerIterator implements Serializable { private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/flink/blob/37390d63/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaDeserializingConsumerIterator.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaDeserializingConsumerIterator.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaDeserializingConsumerIterator.java index 6ca4c81..f2af6ca 100644 --- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaDeserializingConsumerIterator.java +++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaDeserializingConsumerIterator.java @@ -21,6 +21,7 @@ import org.apache.flink.streaming.connectors.util.DeserializationSchema; public class KafkaDeserializingConsumerIterator<IN> extends KafkaConsumerIterator { + private static final long serialVersionUID = 1L; private DeserializationSchema<IN> deserializationSchema; public 
KafkaDeserializingConsumerIterator(String host, int port, String topic, int partition, long waitOnEmptyFetch, http://git-wip-us.apache.org/repos/asf/flink/blob/37390d63/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java index 7c90629..d813a30 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java @@ -67,6 +67,7 @@ public class StreamConfig implements Serializable { // DEFAULT VALUES private static final long DEFAULT_TIMEOUT = 100; + public static final String STATE_MONITORING = "STATE_MONITORING"; // CONFIG METHODS @@ -300,6 +301,18 @@ public class StreamConfig implements Serializable { config.setBytes(EDGES_IN_ORDER, SerializationUtils.serialize((Serializable) outEdgeList)); } + + public void setStateMonitoring(boolean stateMonitoring) { + + config.setBoolean(STATE_MONITORING, stateMonitoring); + + } + + public boolean getStateMonitoring() + { + return config.getBoolean(STATE_MONITORING, false); + } + @SuppressWarnings("unchecked") public List<Tuple2<Integer, Integer>> getOutEdgesInOrder(ClassLoader cl) { try { @@ -399,6 +412,7 @@ public class StreamConfig implements Serializable { builder.append("\nInvokable: Missing"); } builder.append("\nBuffer timeout: " + getBufferTimeout()); + builder.append("\nState Monitoring: " + getStateMonitoring()); if (isChainStart() && getChainedOutputs(cl).size() > 0) { builder.append("\n\n\n---------------------\nChained task configs\n---------------------\n"); 
builder.append(getTransitiveChainedTaskConfigs(cl)).toString(); http://git-wip-us.apache.org/repos/asf/flink/blob/37390d63/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java index 641708e..8334aa1 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java @@ -92,6 +92,10 @@ public class StreamGraph extends StreamingPlan { private Set<Integer> sources; private ExecutionConfig executionConfig; + + private boolean monitoringEnabled; + + private long monitoringInterval = 10000; public StreamGraph(ExecutionConfig executionConfig) { @@ -606,6 +610,22 @@ public class StreamGraph extends StreamingPlan { return operatorNames.get(vertexID); } + public void setMonitoringEnabled(boolean monitoringEnabled) { + this.monitoringEnabled = monitoringEnabled; + } + + public boolean isMonitoringEnabled() { + return monitoringEnabled; + } + + public void setMonitoringInterval(long monitoringInterval) { + this.monitoringInterval = monitoringInterval; + } + + public long getMonitoringInterval() { + return monitoringInterval; + } + @Override public String getStreamingPlanAsJSON() { http://git-wip-us.apache.org/repos/asf/flink/blob/37390d63/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java ---------------------------------------------------------------------- diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java index c9698e3..b50ac25 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java @@ -74,7 +74,13 @@ public class StreamingJobGraphGenerator { // Turn lazy scheduling off jobGraph.setScheduleMode(ScheduleMode.ALL); - + jobGraph.setJobType(JobGraph.JobType.STREAMING); + jobGraph.setMonitoringEnabled(streamGraph.isMonitoringEnabled()); + jobGraph.setMonitorInterval(streamGraph.getMonitoringInterval()); + if(jobGraph.isMonitoringEnabled()) + { + jobGraph.setNumberOfExecutionRetries(Integer.MAX_VALUE); + } init(); setChaining(); @@ -211,6 +217,7 @@ public class StreamingJobGraphGenerator { config.setNumberOfOutputs(nonChainableOutputs.size()); config.setOutputs(nonChainableOutputs); config.setChainedOutputs(chainableOutputs); + config.setStateMonitoring(streamGraph.isMonitoringEnabled()); Class<? 
extends AbstractInvokable> vertexClass = streamGraph.getJobVertexClass(vertexID); http://git-wip-us.apache.org/repos/asf/flink/blob/37390d63/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java index 835ce4e..9cc6131 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java @@ -151,6 +151,19 @@ public abstract class StreamExecutionEnvironment { this.bufferTimeout = timeoutMillis; return this; } + + public StreamExecutionEnvironment enableMonitoring(long interval) + { + streamGraph.setMonitoringEnabled(true); + streamGraph.setMonitoringInterval(interval); + return this; + } + + public StreamExecutionEnvironment enableMonitoring() + { + streamGraph.setMonitoringEnabled(true); + return this; + } /** * Sets the maximum time frequency (milliseconds) for the flushing of the http://git-wip-us.apache.org/repos/asf/flink/blob/37390d63/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SocketTextStreamFunction.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SocketTextStreamFunction.java 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SocketTextStreamFunction.java index 67bc128..d6a5b2b 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SocketTextStreamFunction.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SocketTextStreamFunction.java @@ -59,7 +59,6 @@ public class SocketTextStreamFunction extends RichSourceFunction<String> { public void open(Configuration parameters) throws Exception { super.open(parameters); socket = new Socket(); - socket.connect(new InetSocketAddress(hostname, port), CONNECTION_TIMEOUT_TIME); } http://git-wip-us.apache.org/repos/asf/flink/blob/37390d63/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java index a95965c..d766705 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java @@ -18,7 +18,6 @@ package org.apache.flink.streaming.api.streamvertex; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.runtime.event.task.StreamingSuperstep; import org.apache.flink.runtime.io.network.api.reader.MutableReader; import org.apache.flink.runtime.io.network.partition.consumer.InputGate; import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate; 
http://git-wip-us.apache.org/repos/asf/flink/blob/37390d63/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java index 82f1329..fd375f6 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java @@ -25,7 +25,6 @@ import java.util.List; import java.util.Map; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.runtime.event.task.StreamingSuperstep; import org.apache.flink.runtime.io.network.api.writer.RecordWriter; import org.apache.flink.runtime.plugable.SerializationDelegate; import org.apache.flink.streaming.api.StreamConfig; http://git-wip-us.apache.org/repos/asf/flink/blob/37390d63/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java index 24a90d0..5ff47d6 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java +++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java @@ -20,7 +20,6 @@ package org.apache.flink.streaming.api.streamvertex; import java.io.IOException; import java.util.Map; -import org.apache.flink.runtime.event.task.StreamingSuperstep; import org.apache.flink.runtime.event.task.TaskEvent; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; @@ -65,6 +64,8 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable implements StreamTa protected ClassLoader userClassLoader; private EventListener<TaskEvent> superstepListener; + + private boolean onRecovery; public StreamVertex() { userInvokable = null; @@ -88,7 +89,10 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable implements StreamTa protected void initialize() { this.userClassLoader = getUserCodeClassLoader(); this.configuration = new StreamConfig(getTaskConfiguration()); - this.states = configuration.getOperatorStates(userClassLoader); + if(!onRecovery) + { + this.states = configuration.getOperatorStates(userClassLoader); + } this.context = createRuntimeContext(getEnvironment().getTaskName(), this.states); } @@ -122,12 +126,12 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable implements StreamTa @Override public void confirmBarrier(long barrierID) { - if(states != null && states.containsKey("kafka")) + if(configuration.getStateMonitoring() && states != null) { getEnvironment().getJobManager().tell( new StateBarrierAck(getEnvironment().getJobID(), getEnvironment().getJobVertexId(), context.getIndexOfThisSubtask(), - barrierID, states.get("kafka")), ActorRef.noSender()); + barrierID, states), ActorRef.noSender()); } else { @@ -290,8 +294,9 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable implements StreamTa } @Override - public void injectState(OperatorState state) { - states.put("kafka", state); + public void 
injectStates(Map<String,OperatorState<?>> states) { + onRecovery = true; + this.states.putAll(states); } http://git-wip-us.apache.org/repos/asf/flink/blob/37390d63/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java index 60dfe7a..492d2a0 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java @@ -71,7 +71,14 @@ public class StreamingRuntimeContext extends RuntimeUDFContext { if (state == null) { throw new RuntimeException("Cannot register null state"); } else { - operatorStates.put(name, state); + if(operatorStates.containsKey(name)) + { + throw new RuntimeException("State is already registered"); + } + else + { + operatorStates.put(name, state); + } } } http://git-wip-us.apache.org/repos/asf/flink/blob/37390d63/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingSuperstep.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingSuperstep.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingSuperstep.java new file mode 100644 index 0000000..557c636 --- /dev/null +++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingSuperstep.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.streamvertex; + +import java.io.IOException; + +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.runtime.event.task.TaskEvent; + +public class StreamingSuperstep extends TaskEvent { + + protected long id; + + public StreamingSuperstep() { + + } + + public StreamingSuperstep(long id) { + this.id = id; + } + + @Override + public void write(DataOutputView out) throws IOException { + out.writeLong(id); + } + + @Override + public void read(DataInputView in) throws IOException { + id = in.readLong(); + } + + public long getId() { + return id; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/37390d63/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BarrierBuffer.java ---------------------------------------------------------------------- diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BarrierBuffer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BarrierBuffer.java index 3ff718a..7dfccb0 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BarrierBuffer.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BarrierBuffer.java @@ -23,10 +23,10 @@ import java.util.LinkedList; import java.util.Queue; import java.util.Set; -import org.apache.flink.runtime.event.task.StreamingSuperstep; import org.apache.flink.runtime.io.network.api.reader.AbstractReader; import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; import org.apache.flink.runtime.io.network.partition.consumer.InputGate; +import org.apache.flink.streaming.api.streamvertex.StreamingSuperstep; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,6 +53,12 @@ public class BarrierBuffer { this.reader = reader; } + /** + * Starts the next superstep + * + * @param superstep + * The next superstep + */ protected void startSuperstep(StreamingSuperstep superstep) { this.currentSuperstep = superstep; this.superstepStarted = true; @@ -61,30 +67,53 @@ public class BarrierBuffer { } } + /** + * Buffers a bufferOrEvent received from a blocked channel + * + * @param bufferOrEvent + * bufferOrEvent to buffer + */ protected void store(BufferOrEvent bufferOrEvent) { nonprocessed.add(bufferOrEvent); } + /** + * Get then next non-blocked non-processed BufferOrEvent. Returns null if + * not available. 
+ */ protected BufferOrEvent getNonProcessed() { - BufferOrEvent nextNonprocessed = null; - while (nextNonprocessed == null && !nonprocessed.isEmpty()) { - nextNonprocessed = nonprocessed.poll(); + BufferOrEvent nextNonprocessed; + while ((nextNonprocessed = nonprocessed.poll()) != null) { if (isBlocked(nextNonprocessed.getChannelIndex())) { blockedNonprocessed.add(nextNonprocessed); - nextNonprocessed = null; + } else { + return nextNonprocessed; } } - return nextNonprocessed; + return null; } + /** + * Checks whether a given channel index is blocked for this inputgate + * + * @param channelIndex + * The channel index to check + */ protected boolean isBlocked(int channelIndex) { return blockedChannels.contains(channelIndex); } + /** + * Checks whether all channels are blocked meaning that barriers are + * received from all channels + */ protected boolean isAllBlocked() { return blockedChannels.size() == totalNumberOfInputChannels; } + /** + * Returns the next non-blocked BufferOrEvent. This is a blocking operator. + */ public BufferOrEvent getNextNonBlocked() throws IOException, InterruptedException { // If there are non-processed buffers from the previously blocked ones, // we get the next @@ -99,7 +128,7 @@ public class BarrierBuffer { bufferOrEvent = inputGate.getNextBufferOrEvent(); if (isBlocked(bufferOrEvent.getChannelIndex())) { // If channel blocked we just store it - store(bufferOrEvent); + blockedNonprocessed.add(bufferOrEvent); } else { return bufferOrEvent; } @@ -107,6 +136,12 @@ public class BarrierBuffer { } } + /** + * Blocks the given channel index, from which a barrier has been received. + * + * @param channelIndex + * The channel index to block. + */ protected void blockChannel(int channelIndex) { if (!blockedChannels.contains(channelIndex)) { blockedChannels.add(channelIndex); @@ -122,16 +157,27 @@ public class BarrierBuffer { } } + /** + * Releases the blocks on all channels. 
+ */ protected void releaseBlocks() { - nonprocessed.addAll(blockedNonprocessed); + if (!nonprocessed.isEmpty()) { + // sanity check + throw new RuntimeException("Error in barrier buffer logic"); + } + nonprocessed = blockedNonprocessed; + blockedNonprocessed = new LinkedList<BufferOrEvent>(); blockedChannels.clear(); - blockedNonprocessed.clear(); superstepStarted = false; if (LOG.isDebugEnabled()) { LOG.debug("All barriers received, blocks released"); } } + /** + * Method that is executed once the barrier has been received from all + * channels. + */ protected void actOnAllBlocked() { if (LOG.isDebugEnabled()) { LOG.debug("Publishing barrier to the vertex"); @@ -140,10 +186,12 @@ public class BarrierBuffer { releaseBlocks(); } - public String toString() { - return blockedChannels.toString(); - } - + /** + * Processes a streaming superstep event + * + * @param bufferOrEvent + * The BufferOrEvent containing the event + */ public void processSuperstep(BufferOrEvent bufferOrEvent) { StreamingSuperstep superstep = (StreamingSuperstep) bufferOrEvent.getEvent(); if (!superstepStarted) { http://git-wip-us.apache.org/repos/asf/flink/blob/37390d63/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java index 6a1f624..6c91f4d 100755 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java @@ -19,11 +19,9 @@ package org.apache.flink.streaming.io; import java.io.IOException; import java.util.LinkedList; -import 
java.util.Queue; import java.util.concurrent.LinkedBlockingDeque; import org.apache.flink.core.io.IOReadableWritable; -import org.apache.flink.runtime.event.task.StreamingSuperstep; import org.apache.flink.runtime.io.network.api.reader.AbstractReader; import org.apache.flink.runtime.io.network.api.reader.MutableRecordReader; import org.apache.flink.runtime.io.network.api.serialization.AdaptiveSpanningRecordDeserializer; @@ -33,6 +31,7 @@ import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; import org.apache.flink.runtime.io.network.partition.consumer.InputGate; import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate; import org.apache.flink.runtime.util.event.EventListener; +import org.apache.flink.streaming.api.streamvertex.StreamingSuperstep; /** * A CoRecordReader wraps {@link MutableRecordReader}s of two different input @@ -66,8 +65,6 @@ public class CoRecordReader<T1 extends IOReadableWritable, T2 extends IOReadable private CoBarrierBuffer barrierBuffer1; private CoBarrierBuffer barrierBuffer2; - private Queue<Integer> unprocessedIndices = new LinkedList<Integer>(); - public CoRecordReader(InputGate inputgate1, InputGate inputgate2) { super(new UnionInputGate(inputgate1, inputgate2)); @@ -109,14 +106,14 @@ public class CoRecordReader<T1 extends IOReadableWritable, T2 extends IOReadable @SuppressWarnings("unchecked") protected int getNextRecord(T1 target1, T2 target2) throws IOException, InterruptedException { - requestPartitionsOnce(); + requestPartitionsOnce(); while (true) { if (currentReaderIndex == 0) { if ((bufferReader1.isFinished() && bufferReader2.isFinished())) { return 0; } - + currentReaderIndex = getNextReaderIndexBlocking(); } @@ -234,10 +231,8 @@ public class CoRecordReader<T1 extends IOReadableWritable, T2 extends IOReadable @Override public void onEvent(InputGate bufferReader) { if (bufferReader == bufferReader1) { - System.out.println("Added 1"); availableRecordReaders.add(1); } else if 
(bufferReader == bufferReader2) { - System.out.println("Added 2"); availableRecordReaders.add(2); } } http://git-wip-us.apache.org/repos/asf/flink/blob/37390d63/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamingAbstractRecordReader.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamingAbstractRecordReader.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamingAbstractRecordReader.java index 811c48a..d30c241 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamingAbstractRecordReader.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamingAbstractRecordReader.java @@ -22,7 +22,6 @@ import java.io.IOException; import org.apache.flink.core.io.IOReadableWritable; import org.apache.flink.runtime.event.task.AbstractEvent; -import org.apache.flink.runtime.event.task.StreamingSuperstep; import org.apache.flink.runtime.io.network.api.reader.AbstractReader; import org.apache.flink.runtime.io.network.api.reader.ReaderBase; import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer; @@ -31,6 +30,7 @@ import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; import org.apache.flink.runtime.io.network.partition.consumer.InputGate; import org.apache.flink.runtime.io.network.serialization.SpillingAdaptiveSpanningRecordDeserializer; +import org.apache.flink.streaming.api.streamvertex.StreamingSuperstep; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,6 +46,7 @@ import org.slf4j.LoggerFactory; public abstract class StreamingAbstractRecordReader<T extends IOReadableWritable> extends AbstractReader implements 
ReaderBase { + @SuppressWarnings("unused") private static final Logger LOG = LoggerFactory.getLogger(StreamingAbstractRecordReader.class); private final RecordDeserializer<T>[] recordDeserializers; @@ -56,6 +57,7 @@ public abstract class StreamingAbstractRecordReader<T extends IOReadableWritable private final BarrierBuffer barrierBuffer; + @SuppressWarnings("unchecked") protected StreamingAbstractRecordReader(InputGate inputGate) { super(inputGate); barrierBuffer = new BarrierBuffer(inputGate, this); http://git-wip-us.apache.org/repos/asf/flink/blob/37390d63/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/BarrierBufferTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/BarrierBufferTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/BarrierBufferTest.java index e7a03d9..203216b 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/BarrierBufferTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/BarrierBufferTest.java @@ -25,7 +25,6 @@ import java.util.List; import java.util.Queue; import org.apache.flink.core.memory.MemorySegment; -import org.apache.flink.runtime.event.task.StreamingSuperstep; import org.apache.flink.runtime.event.task.TaskEvent; import org.apache.flink.runtime.io.network.api.reader.AbstractReader; import org.apache.flink.runtime.io.network.buffer.Buffer; @@ -33,6 +32,7 @@ import org.apache.flink.runtime.io.network.buffer.BufferRecycler; import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; import org.apache.flink.runtime.io.network.partition.consumer.InputGate; import org.apache.flink.runtime.util.event.EventListener; +import 
org.apache.flink.streaming.api.streamvertex.StreamingSuperstep; import org.junit.Test; public class BarrierBufferTest { http://git-wip-us.apache.org/repos/asf/flink/blob/37390d63/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java index b7a1ba3..c207d60 100644 --- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java +++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java @@ -101,11 +101,6 @@ public class WordCount { // emit the pairs for (String token : tokens) { - //FIXME to be removed. added this for test purposes - if("killme".equals(token)) - { - throw new Exception("byee"); - } if (token.length() > 0) { out.collect(new Tuple2<String, Integer>(token, 1)); }