[FLINK-1638] [streaming] At-Least once monitoring semantics added and bug fixes

Fault Tolerance monitor suicide on ExecutionGraph terminal state

Forwarding messages to StreamCheckpointCoordinator


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/37390d63
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/37390d63
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/37390d63

Branch: refs/heads/master
Commit: 37390d6322ab984f17dd257f7a9939311c81edf5
Parents: ed5ba95
Author: Paris Carbone <seniorcarb...@gmail.com>
Authored: Fri Mar 6 15:10:12 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Mar 10 14:58:49 2015 +0100

----------------------------------------------------------------------
 .../deployment/TaskDeploymentDescriptor.java    |  15 ++-
 .../runtime/event/task/StreamingSuperstep.java  |  51 --------
 .../flink/runtime/executiongraph/Execution.java |  11 +-
 .../runtime/executiongraph/ExecutionGraph.java  |  45 ++++++-
 .../runtime/executiongraph/ExecutionVertex.java |   9 +-
 .../apache/flink/runtime/jobgraph/JobGraph.java |  33 +++++
 .../jobgraph/tasks/OperatorStateCarrier.java    |   4 +-
 .../flink/runtime/jobmanager/JobManager.scala   |  70 +++-------
 .../StreamCheckpointCoordinator.scala           | 129 +++++++++++++++++++
 .../runtime/jobmanager/StreamStateMonitor.scala | 122 ------------------
 .../flink/runtime/taskmanager/TaskManager.scala |  21 +--
 .../connectors/kafka/KafkaConsumerExample.java  |   5 -
 .../kafka/api/simple/KafkaConsumerIterator.java |   5 +-
 .../KafkaDeserializingConsumerIterator.java     |   1 +
 .../flink/streaming/api/StreamConfig.java       |  14 ++
 .../apache/flink/streaming/api/StreamGraph.java |  20 +++
 .../api/StreamingJobGraphGenerator.java         |   9 +-
 .../environment/StreamExecutionEnvironment.java |  13 ++
 .../source/SocketTextStreamFunction.java        |   1 -
 .../api/streamvertex/InputHandler.java          |   1 -
 .../api/streamvertex/OutputHandler.java         |   1 -
 .../api/streamvertex/StreamVertex.java          |  17 ++-
 .../streamvertex/StreamingRuntimeContext.java   |   9 +-
 .../api/streamvertex/StreamingSuperstep.java    |  52 ++++++++
 .../flink/streaming/io/BarrierBuffer.java       |  74 +++++++++--
 .../flink/streaming/io/CoRecordReader.java      |  11 +-
 .../io/StreamingAbstractRecordReader.java       |   4 +-
 .../flink/streaming/io/BarrierBufferTest.java   |   2 +-
 .../streaming/examples/wordcount/WordCount.java |   5 -
 29 files changed, 452 insertions(+), 302 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/37390d63/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
index a431a76..b2573f7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.state.OperatorState;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
@@ -78,7 +79,7 @@ public final class TaskDeploymentDescriptor implements 
Serializable {
        /** The list of JAR files required to run this task. */
        private final List<BlobKey> requiredJarFiles;
        
-       private OperatorState operatorState;
+       private Map<String, OperatorState<?>> operatorStates;
 
        /**
         * Constructs a task deployment descriptor.
@@ -128,13 +129,13 @@ public final class TaskDeploymentDescriptor implements 
Serializable {
                        Configuration taskConfiguration, String 
invokableClassName,
                        List<PartitionDeploymentDescriptor> producedPartitions,
                        List<PartitionConsumerDeploymentDescriptor> 
consumedPartitions,
-                       List<BlobKey> requiredJarFiles, int targetSlotNumber, 
OperatorState operatorState) {
+                       List<BlobKey> requiredJarFiles, int targetSlotNumber, 
Map<String,OperatorState<?>> operatorStates) {
 
                this(jobID, vertexID, executionId, taskName, 
indexInSubtaskGroup, numberOfSubtasks,
                                jobConfiguration, taskConfiguration, 
invokableClassName, producedPartitions,
                                consumedPartitions, requiredJarFiles, 
targetSlotNumber);
                
-               setOperatorState(operatorState);
+               setOperatorStates(operatorStates);
        }
 
        /**
@@ -243,11 +244,11 @@ public final class TaskDeploymentDescriptor implements 
Serializable {
                                strProducedPartitions, strConsumedPartitions);
        }
 
-       public void setOperatorState(OperatorState operatorState) {
-               this.operatorState = operatorState;
+       public void setOperatorStates(Map<String,OperatorState<?>> 
operatorStates) {
+               this.operatorStates = operatorStates;
        }
 
-       public OperatorState getOperatorState() {
-               return operatorState;
+       public Map<String, OperatorState<?>> getOperatorStates() {
+               return operatorStates;
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/37390d63/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/StreamingSuperstep.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/StreamingSuperstep.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/StreamingSuperstep.java
deleted file mode 100644
index e35eb28..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/StreamingSuperstep.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.event.task;
-
-import java.io.IOException;
-
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-
-public class StreamingSuperstep extends TaskEvent {
-
-       protected long id;
-
-       public StreamingSuperstep() {
-
-       }
-
-       public StreamingSuperstep(long id) {
-               this.id = id;
-       }
-
-       @Override
-       public void write(DataOutputView out) throws IOException {
-               out.writeLong(id);
-       }
-
-       @Override
-       public void read(DataInputView in) throws IOException {
-               id = in.readLong();
-       }
-
-       public long getId() {
-               return id;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/37390d63/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 89f5183..93845c7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -56,6 +56,7 @@ import scala.concurrent.duration.FiniteDuration;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.TimeoutException;
@@ -123,7 +124,7 @@ public class Execution implements Serializable {
        
        private volatile InstanceConnectionInfo assignedResourceLocation; // 
for the archived execution
        
-       private OperatorState operatorState;
+       private Map<String,OperatorState<?>> operatorStates;
 
        // 
--------------------------------------------------------------------------------------------
        
@@ -857,11 +858,11 @@ public class Execution implements Serializable {
                                (assignedResource == null ? "(unassigned)" : 
assignedResource.toString()), state);
        }
 
-       public void setOperatorState(OperatorState operatorState) {
-               this.operatorState = operatorState;
+       public void setOperatorStates(Map<String,OperatorState<?>> 
operatorStates) {
+               this.operatorStates = operatorStates;
        }
 
-       public OperatorState getOperatorState() {
-               return operatorState;
+       public Map<String,OperatorState<?>> getOperatorStates() {
+               return operatorStates;
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/37390d63/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index bf34e33..0c6c3a7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.runtime.executiongraph;
 
+import akka.actor.ActorContext;
 import akka.actor.ActorRef;
-
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.akka.AkkaUtils;
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.jobgraph.JobID;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.ScheduleMode;
+import org.apache.flink.runtime.jobmanager.StreamCheckpointCoordinator;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.messages.ExecutionGraphMessages;
 import org.apache.flink.runtime.state.OperatorState;
@@ -38,8 +39,8 @@ import 
org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.util.ExceptionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import scala.Tuple3;
+import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.io.Serializable;
@@ -52,6 +53,7 @@ import java.util.NoSuchElementException;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
 import static akka.dispatch.Futures.future;
@@ -118,6 +120,14 @@ public class ExecutionGraph implements Serializable {
 
        private boolean allowQueuedScheduling = true;
 
+       private ActorContext parentContext;
+
+       private  ActorRef stateMonitorActor;
+       
+       private boolean monitoringEnabled;
+       
+       private long monitoringInterval = 10000;
+
        private ScheduleMode scheduleMode = ScheduleMode.FROM_SOURCES;
 
        
@@ -159,6 +169,18 @@ public class ExecutionGraph implements Serializable {
        }
 
        // 
--------------------------------------------------------------------------------------------
+       
+       public void setStateMonitorActor(ActorRef stateMonitorActor) {
+               this.stateMonitorActor = stateMonitorActor;
+       }
+
+       public ActorRef getStateMonitorActor() {
+               return stateMonitorActor;
+       }
+
+       public void setParentContext(ActorContext parentContext) {
+               this.parentContext = parentContext;
+       }
 
        public void setNumberOfRetriesLeft(int numberOfRetriesLeft) {
                if (numberOfRetriesLeft < -1) {
@@ -214,6 +236,14 @@ public class ExecutionGraph implements Serializable {
                }
        }
 
+       public void setMonitoringEnabled(boolean monitoringEnabled) {
+               this.monitoringEnabled = monitoringEnabled;
+       }
+
+       public void setMonitoringInterval(long  monitoringInterval) {
+               this.monitoringInterval = monitoringInterval;
+       }
+
        /**
         * Returns a list of BLOB keys referring to the JAR files required to 
run this job
         * @return list of BLOB keys referring to the JAR files required to run 
this job
@@ -361,12 +391,17 @@ public class ExecutionGraph implements Serializable {
                                        for (ExecutionJobVertex ejv : 
getVerticesTopologically()) {
                                                ejv.scheduleAll(scheduler, 
allowQueuedScheduling);
                                        }
-
                                        break;
 
                                case BACKTRACKING:
                                        throw new JobException("BACKTRACKING is 
currently not supported as schedule mode.");
                        }
+
+                       if(monitoringEnabled)
+                       {
+                               stateMonitorActor = 
StreamCheckpointCoordinator.spawn(parentContext, this,
+                                               
Duration.create(monitoringInterval, TimeUnit.MILLISECONDS));
+                       }
                }
                else {
                        throw new IllegalStateException("Job may only be 
scheduled from state " + JobStatus.CREATED);
@@ -532,9 +567,9 @@ public class ExecutionGraph implements Serializable {
                }
        }
        
-       public void loadOperatorStates(Map<Tuple3<JobVertexID, Integer, Long> 
,OperatorState<?>> states)
+       public synchronized void loadOperatorStates(Map<Tuple3<JobVertexID, 
Integer, Long> , Map<String,OperatorState<?>>> states)
        {
-               for(Map.Entry<Tuple3<JobVertexID, Integer, Long> 
,OperatorState<?>> state : states.entrySet())
+               for(Map.Entry<Tuple3<JobVertexID, Integer, Long> , 
Map<String,OperatorState<?>>> state : states.entrySet())
                {
                        
tasks.get(state.getKey()._1()).getTaskVertices()[state.getKey()._2()].setOperatorState(state.getValue());
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/37390d63/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index b7f962a..41d34d5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -48,6 +48,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CopyOnWriteArrayList;
 
 import static com.google.common.base.Preconditions.checkElementIndex;
@@ -90,7 +91,7 @@ public class ExecutionVertex implements Serializable {
        
        private volatile boolean scheduleLocalOnly;
        
-       private OperatorState operatorState;
+       private Map<String,OperatorState<?>> operatorState;
        
        // 
--------------------------------------------------------------------------------------------
 
@@ -198,11 +199,11 @@ public class ExecutionVertex implements Serializable {
                return currentExecution.getAssignedResourceLocation();
        }
 
-       public void setOperatorState(OperatorState operatorState) {
+       public void setOperatorState(Map<String,OperatorState<?>> 
operatorState) {
                this.operatorState = operatorState;
        }
 
-       public OperatorState getOperatorState() {
+       public Map<String,OperatorState<?>> getOperatorState() {
                return operatorState;
        }
        
@@ -393,7 +394,7 @@ public class ExecutionVertex implements Serializable {
                                
                                if(operatorState!=null)
                                {
-                                       
execution.setOperatorState(operatorState);
+                                       
execution.setOperatorStates(operatorState);
                                }
                                
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/37390d63/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
index 0cf2f5e..4b398e5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
@@ -75,6 +75,14 @@ public class JobGraph implements Serializable {
 
        private ScheduleMode scheduleMode = ScheduleMode.FROM_SOURCES;
        
+       public enum JobType {STREAMING, BATCH}
+       
+       private JobType jobType = JobType.BATCH;
+       
+       private boolean monitoringEnabled = false;
+       
+       private long monitorInterval = 10000;
+       
        // 
--------------------------------------------------------------------------------------------
        
        /**
@@ -253,6 +261,31 @@ public class JobGraph implements Serializable {
                return this.taskVertices.size();
        }
 
+
+       public void setJobType(JobType jobType) {
+               this.jobType = jobType;
+       }
+
+       public JobType getJobType() {
+               return jobType;
+       }
+
+       public void setMonitoringEnabled(boolean monitoringEnabled) {
+               this.monitoringEnabled = monitoringEnabled;
+       }
+
+       public boolean isMonitoringEnabled() {
+               return monitoringEnabled;
+       }
+
+       public void setMonitorInterval(long monitorInterval) {
+               this.monitorInterval = monitorInterval;
+       }
+
+       public long getMonitorInterval() {
+               return monitorInterval;
+       }
+
        /**
         * Searches for a vertex with a matching ID and returns it.
         * 

http://git-wip-us.apache.org/repos/asf/flink/blob/37390d63/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/OperatorStateCarrier.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/OperatorStateCarrier.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/OperatorStateCarrier.java
index 6ea4f27..e8b6d6b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/OperatorStateCarrier.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/OperatorStateCarrier.java
@@ -20,8 +20,10 @@ package org.apache.flink.runtime.jobgraph.tasks;
 
 import org.apache.flink.runtime.state.OperatorState;
 
+import java.util.Map;
+
 public interface OperatorStateCarrier {
        
-       public void injectState(OperatorState state);
+       public void injectStates(Map<String, OperatorState<?>> state);
        
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/37390d63/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 97a6099..c350680 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -104,7 +104,6 @@ class JobManager(val configuration: Configuration,
   /** List of current jobs running jobs */
   val currentJobs = scala.collection.mutable.HashMap[JobID, (ExecutionGraph, 
JobInfo)]()
   
-  val barrierMonitors = scala.collection.mutable.HashMap[JobID, ActorRef]()
 
   /**
    * Run when the job manager is started. Simply logs an informational message.
@@ -284,78 +283,43 @@ class JobManager(val configuration: Configuration,
           if(newJobStatus.isTerminalState) {
             jobInfo.end = timeStamp
 
-          // is the client waiting for the job result?
+            // is the client waiting for the job result?
             newJobStatus match {
               case JobStatus.FINISHED =>
                 val accumulatorResults = 
accumulatorManager.getJobAccumulatorResults(jobID)
-                jobInfo.client ! JobResultSuccess(jobID, jobInfo.duration, 
accumulatorResults)
+                jobInfo.client ! 
JobResultSuccess(jobID,jobInfo.duration,accumulatorResults)
               case JobStatus.CANCELED =>
                 jobInfo.client ! Failure(new JobCancellationException(jobID,
-                  "Job was cancelled.", error))
+                  "Job was cancelled.",error))
               case JobStatus.FAILED =>
                 jobInfo.client ! Failure(new JobExecutionException(jobID,
-                  "Job execution failed.", error))
+                  "Job execution failed.",error))
               case x =>
-                val exception = new JobExecutionException(jobID, s"$x is not a 
" +
-                  "terminal state.")
+                val exception = new JobExecutionException(jobID,s"$x is not a 
" +
+                        "terminal state.")
                 jobInfo.client ! Failure(exception)
                 throw exception
             }
+
+            removeJob(jobID)
             
-            barrierMonitors.get(jobID) match {
-                         case Some(monitor) =>
-                           newJobStatus match{
-                             case JobStatus.FINISHED | JobStatus.CANCELED =>
-                               monitor ! PoisonPill
-                               barrierMonitors.remove(jobID)
-                             case JobStatus.FAILING => 
-                               monitor ! JobStateRequest
-                           }
-                          case None =>
-                            removeJob(jobID)
-                        }
-          }
-          else {
-            newJobStatus match {
-              case JobStatus.RUNNING => currentJobs.get(jobID) match {
-              case Some((executionGraph, _)) => 
-              //FIXME this is just a fast n dirty check for determining 
streaming jobs 
-              if (executionGraph.getScheduleMode == ScheduleMode.ALL) {
-                barrierMonitors.get(jobID) match {
-                  case None => 
-                    barrierMonitors += jobID -> 
StreamStateMonitor.props(context, executionGraph)
-                }
-              }
-              case None =>
-                log.error("Cannot create state monitor for job ID {}.", jobID)
-                new IllegalStateException("Cannot find execution graph for job 
ID " + jobID)
-              }
-            }
           }
         case None =>
           removeJob(jobID)
-      }
+          }
 
     case msg: BarrierAck =>
-      barrierMonitors.get(msg.jobID) match {
-        case Some(monitor) => monitor ! msg
+      currentJobs.get(msg.jobID) match {
+        case Some(jobExecution) =>
+          jobExecution._1.getStateMonitorActor forward  msg
         case None =>
       }
     case msg: StateBarrierAck =>
-      barrierMonitors.get(msg.jobID) match {
-        case Some(monitor) => monitor ! msg
-        case None =>
-      }
-
-    case msg: JobStateResponse =>
-      //inject initial states and restart the job
       currentJobs.get(msg.jobID) match {
         case Some(jobExecution) =>
-          import scala.collection.JavaConverters._
-          jobExecution._1.loadOperatorStates(msg.opStates.asJava)
-          jobExecution._1.restart()
+          jobExecution._1.getStateMonitorActor forward  msg
         case None =>
-      } 
+      }
       
     case ScheduleOrUpdateConsumers(jobId, executionId, partitionIndex) =>
       currentJobs.get(jobId) match {
@@ -522,6 +486,9 @@ class JobManager(val configuration: Configuration,
         executionGraph.setDelayBeforeRetrying(delayBetweenRetries)
         executionGraph.setScheduleMode(jobGraph.getScheduleMode)
         
executionGraph.setQueuedSchedulingAllowed(jobGraph.getAllowQueuedScheduling)
+        
+        executionGraph.setMonitoringEnabled(jobGraph.isMonitoringEnabled)
+        executionGraph.setMonitoringInterval(jobGraph.getMonitorInterval)
 
         // initialize the vertices that have a master initialization hook
         // file output formats create directories here, input formats create 
splits
@@ -564,6 +531,9 @@ class JobManager(val configuration: Configuration,
           log.debug(s"Successfully created execution graph from job graph 
${jobId} (${jobName}).")
         }
 
+        // give an actorContext
+        executionGraph.setParentContext(context);
+        
         // get notified about job status changes
         executionGraph.registerJobStatusListener(self)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/37390d63/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamCheckpointCoordinator.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamCheckpointCoordinator.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamCheckpointCoordinator.scala
new file mode 100644
index 0000000..7ab6a6f
--- /dev/null
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamCheckpointCoordinator.scala
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager
+
+import java.lang.Long
+
+import akka.actor._
+import org.apache.flink.runtime.ActorLogMessages
+import org.apache.flink.runtime.execution.ExecutionState.RUNNING
+import org.apache.flink.runtime.executiongraph.{ExecutionAttemptID, 
ExecutionGraph, ExecutionVertex}
+import org.apache.flink.runtime.jobgraph.JobStatus._
+import org.apache.flink.runtime.jobgraph.{JobID, JobVertexID}
+import org.apache.flink.runtime.state.OperatorState
+
+import scala.collection.JavaConversions._
+import scala.collection.immutable.TreeMap
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.duration.{FiniteDuration, _}
+
+object StreamCheckpointCoordinator {
+
+  def spawn(context: ActorContext,executionGraph: ExecutionGraph,
+            interval: FiniteDuration = 5 seconds): ActorRef = {
+
+    val vertices: Iterable[ExecutionVertex] = 
getExecutionVertices(executionGraph)
+    val monitor = context.system.actorOf(Props(new 
StreamCheckpointCoordinator(executionGraph,
+      vertices,vertices.map(x => 
((x.getJobVertex.getJobVertexId,x.getParallelSubtaskIndex),
+              List.empty[Long])).toMap, Map() ,interval,0L,-1L)))
+    monitor ! InitBarrierScheduler
+    monitor
+  }
+
+  private def getExecutionVertices(executionGraph: ExecutionGraph): 
Iterable[ExecutionVertex] = {
+    for((_,execJobVertex) <- executionGraph.getAllVertices;
+        execVertex: ExecutionVertex <- execJobVertex.getTaskVertices)
+    yield execVertex
+  }
+}
+
+class StreamCheckpointCoordinator(val executionGraph: ExecutionGraph,
+                         val vertices: Iterable[ExecutionVertex],
+                         var acks: Map[(JobVertexID,Int),List[Long]],
+                         var states: Map[(JobVertexID, Integer, Long), 
+                                 java.util.Map[String,OperatorState[_]]],
+                         val interval: FiniteDuration,var curId: Long,var 
ackId: Long)
+        extends Actor with ActorLogMessages with ActorLogging {
+  
+  override def receiveWithLogMessages: Receive = {
+    
+    case InitBarrierScheduler =>
+      context.system.scheduler.schedule(interval,interval,self,BarrierTimeout)
+      context.system.scheduler.schedule(2 * interval,2 * 
interval,self,CompactAndUpdate)
+      log.debug("[FT-MONITOR] Started Stream State Monitor for job {}{}",
+        executionGraph.getJobID,executionGraph.getJobName)
+      
+    case BarrierTimeout =>
+      executionGraph.getState match {
+        case FAILED | CANCELED | FINISHED =>
+          log.debug("[FT-MONITOR] Stopping monitor for terminated job {}", 
executionGraph.getJobID)
+          self ! PoisonPill
+        case _ =>
+          curId += 1
+          log.debug("[FT-MONITOR] Sending Barrier to vertices of Job " + 
executionGraph.getJobName)
+          vertices.filter(v => v.getJobVertex.getJobVertex.isInputVertex &&
+                  v.getExecutionState == RUNNING).foreach(vertex
+          => vertex.getCurrentAssignedResource.getInstance.getTaskManager
+                    ! 
BarrierReq(vertex.getCurrentExecutionAttempt.getAttemptId,curId))
+      }
+      
+    case StateBarrierAck(jobID, jobVertexID, instanceID, checkpointID, 
opState) =>
+      states += (jobVertexID, instanceID, checkpointID) -> opState
+      self ! BarrierAck(jobID, jobVertexID, instanceID, checkpointID)
+      
+    case BarrierAck(jobID, jobVertexID,instanceID,checkpointID) =>
+          acks.get(jobVertexID,instanceID) match {
+            case Some(acklist) =>
+              acks += (jobVertexID,instanceID) -> (checkpointID :: acklist)
+            case None =>
+          }
+          log.debug(acks.toString)
+      
+      
+      
+    case CompactAndUpdate =>
+      val barrierCount = 
acks.values.foldLeft(TreeMap[Long,Int]().withDefaultValue(0))((dict,myList)
+      => myList.foldLeft(dict)((dict2,elem) => dict2.updated(elem,dict2(elem) 
+ 1)))
+      val keysToKeep = barrierCount.filter(_._2 == acks.size).keys
+      ackId = if(!keysToKeep.isEmpty) keysToKeep.max else ackId
+      acks.keys.foreach(x => acks = acks.updated(x,acks(x).filter(_ >= ackId)))
+      states = states.filterKeys(_._3 >= ackId)
+      log.debug("[FT-MONITOR] Last global barrier is " + ackId)
+      executionGraph.loadOperatorStates(states)
+      
+  }
+  
+}
+
+case class BarrierTimeout()
+
+case class InitBarrierScheduler()
+
+case class CompactAndUpdate()
+
+case class BarrierReq(attemptID: ExecutionAttemptID,checkpointID: Long)
+
+case class BarrierAck(jobID: JobID,jobVertexID: JobVertexID,instanceID: 
Int,checkpointID: Long)
+
+case class StateBarrierAck(jobID: JobID, jobVertexID: JobVertexID, instanceID: 
Integer,
+                           checkpointID: Long, states: 
java.util.Map[String,OperatorState[_]])
+       
+
+
+

http://git-wip-us.apache.org/repos/asf/flink/blob/37390d63/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamStateMonitor.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamStateMonitor.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamStateMonitor.scala
deleted file mode 100644
index 65840f9..0000000
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamStateMonitor.scala
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmanager
-
-import akka.actor._
-import org.apache.flink.runtime.ActorLogMessages
-import 
org.apache.flink.runtime.executiongraph.{ExecutionAttemptID,ExecutionGraph,ExecutionVertex}
-import org.apache.flink.runtime.jobgraph.{JobID,JobVertexID}
-import org.apache.flink.runtime.state.OperatorState
-
-import java.lang.Long
-import scala.collection.JavaConversions._
-import scala.collection.immutable.TreeMap
-import scala.concurrent.ExecutionContext.Implicits.global
-import scala.concurrent.duration.{FiniteDuration,_}
-
-
-object StreamStateMonitor {
-
-  def props(context: ActorContext,executionGraph: ExecutionGraph,
-            interval: FiniteDuration = 5 seconds): ActorRef = {
-
-    val vertices: Iterable[ExecutionVertex] = 
getExecutionVertices(executionGraph)
-    val monitor = context.system.actorOf(Props(new 
StreamStateMonitor(executionGraph,
-      vertices,vertices.map(x => 
((x.getJobVertex.getJobVertexId,x.getParallelSubtaskIndex),
-              List.empty[Long])).toMap,Map(),interval,0L,-1L)))
-    monitor ! InitBarrierScheduler
-    monitor
-  }
-
-  private def getExecutionVertices(executionGraph: ExecutionGraph): 
Iterable[ExecutionVertex] = {
-    for((_,execJobVertex) <- executionGraph.getAllVertices;
-        execVertex: ExecutionVertex <- execJobVertex.getTaskVertices)
-    yield execVertex
-  }
-}
-
-class StreamStateMonitor(val executionGraph: ExecutionGraph,
-                         val vertices: Iterable[ExecutionVertex],
-                         var acks: Map[(JobVertexID,Int),List[Long]],
-                         var states: Map[(JobVertexID, Integer, Long), 
OperatorState[_]],
-                         val interval: FiniteDuration,var curId: Long,var 
ackId: Long)
-        extends Actor with ActorLogMessages with ActorLogging {
-
-  override def receiveWithLogMessages: Receive = {
-    
-    case InitBarrierScheduler =>
-      context.system.scheduler.schedule(interval,interval,self,BarrierTimeout)
-      context.system.scheduler.schedule(2 * interval,2 * 
interval,self,TriggerBarrierCompaction)
-      log.debug("[FT-MONITOR] Started Stream State Monitor for job {}{}",
-        executionGraph.getJobID,executionGraph.getJobName)
-      
-    case BarrierTimeout =>
-      curId += 1
-      log.debug("[FT-MONITOR] Sending Barrier to vertices of Job " + 
executionGraph.getJobName)
-      vertices.filter(v => 
v.getJobVertex.getJobVertex.isInputVertex).foreach(vertex
-      => vertex.getCurrentAssignedResource.getInstance.getTaskManager
-                ! 
BarrierReq(vertex.getCurrentExecutionAttempt.getAttemptId,curId))
-      
-    case StateBarrierAck(jobID, jobVertexID, instanceID, checkpointID, 
opState) =>
-      states += (jobVertexID, instanceID, checkpointID) -> opState  
-      self ! BarrierAck(jobID, jobVertexID, instanceID, checkpointID)
-      
-    case BarrierAck(jobID, jobVertexID,instanceID,checkpointID) =>
-      acks.get(jobVertexID,instanceID) match {
-        case Some(acklist) =>
-          acks += (jobVertexID,instanceID) -> (checkpointID :: acklist)
-        case None =>
-      }
-      log.debug(acks.toString)
-      
-    case TriggerBarrierCompaction =>
-      val barrierCount = 
acks.values.foldLeft(TreeMap[Long,Int]().withDefaultValue(0))((dict,myList)
-      => myList.foldLeft(dict)((dict2,elem) => dict2.updated(elem,dict2(elem) 
+ 1)))
-      val keysToKeep = barrierCount.filter(_._2 == acks.size).keys
-      ackId = if(!keysToKeep.isEmpty) keysToKeep.max else ackId
-      acks.keys.foreach(x => acks = acks.updated(x,acks(x).filter(_ >= ackId)))
-      states = states.filterKeys(_._3 >= ackId)
-      log.debug("[FT-MONITOR] Last global barrier is " + ackId)
-
-    case JobStateRequest =>
-      sender ! JobStateResponse(executionGraph.getJobID, ackId, states)
-  }
-}
-
-case class BarrierTimeout()
-
-case class InitBarrierScheduler()
-
-case class TriggerBarrierCompaction()
-
-case class JobStateRequest()
-
-case class JobStateResponse(jobID: JobID, barrierID: Long, opStates: 
Map[(JobVertexID, Integer, 
-        Long), OperatorState[_]])
-
-case class BarrierReq(attemptID: ExecutionAttemptID,checkpointID: Long)
-
-case class BarrierAck(jobID: JobID,jobVertexID: JobVertexID,instanceID: 
Int,checkpointID: Long)
-
-case class StateBarrierAck(jobID: JobID, jobVertexID: JobVertexID, instanceID: 
Integer,
-                           checkpointID: Long, state: OperatorState[_])
-       
-
-
-

http://git-wip-us.apache.org/repos/asf/flink/blob/37390d63/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 497e784..3de917b 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -352,7 +352,8 @@ class TaskManager(val connectionInfo: 
InstanceConnectionInfo,
       }
 
     case BarrierReq(attemptID, checkpointID) =>
-      log.debug("[FT-TaskManager] Barrier request received for attempt {}", 
attemptID)
+      log.debug("[FT-TaskManager] Barrier {} request received for attempt {}", 
+          checkpointID, attemptID)
       runningTasks.get(attemptID) match {
         case Some(i) =>
           if (i.getExecutionState == ExecutionState.RUNNING) {
@@ -416,15 +417,6 @@ class TaskManager(val connectionInfo: 
InstanceConnectionInfo,
 
       task = new Task(jobID, vertexID, taskIndex, numSubtasks, executionID,
         tdd.getTaskName, self)
-
-      //inject operator state
-      if(tdd.getOperatorState != null)
-      {
-        val vertex = task.getEnvironment.getInvokable match {
-          case opStateCarrier: OperatorStateCarrier =>
-            opStateCarrier.injectState(tdd.getOperatorState)
-        }
-      }
       
       runningTasks.put(executionID, task) match {
         case Some(_) => throw new RuntimeException(
@@ -446,6 +438,15 @@ class TaskManager(val connectionInfo: 
InstanceConnectionInfo,
 
       task.setEnvironment(env)
 
+      //inject operator state
+      if(tdd.getOperatorStates != null)
+      {
+        val vertex = task.getEnvironment.getInvokable match {
+          case opStateCarrier: OperatorStateCarrier =>
+            opStateCarrier.injectStates(tdd.getOperatorStates)
+        }
+      }
+      
       // register the task with the network stack and profiles
       networkEnvironment match {
         case Some(ne) =>

http://git-wip-us.apache.org/repos/asf/flink/blob/37390d63/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java
 
b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java
index d9bb7d3..dd1221d 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java
@@ -17,14 +17,9 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.apache.flink.runtime.state.OperatorState;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.connectors.kafka.api.KafkaSource;
-<<<<<<< HEAD
-import 
org.apache.flink.streaming.connectors.kafka.api.simple.PersistentKafkaSource;
-=======
->>>>>>> a62796a... s
 import org.apache.flink.streaming.connectors.util.JavaDefaultStringSchema;
 
 public class KafkaConsumerExample {

http://git-wip-us.apache.org/repos/asf/flink/blob/37390d63/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaConsumerIterator.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaConsumerIterator.java
 
b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaConsumerIterator.java
index 92d351a..370b3f0 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaConsumerIterator.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaConsumerIterator.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.connectors.kafka.api.simple;
 
+import java.io.Serializable;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -36,13 +37,11 @@ import kafka.javaapi.TopicMetadata;
 import kafka.javaapi.TopicMetadataRequest;
 import kafka.javaapi.consumer.SimpleConsumer;
 import kafka.message.MessageAndOffset;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * Iterates the records received from a partition of a Kafka topic as byte 
arrays.
  */
-public class KafkaConsumerIterator {
+public class KafkaConsumerIterator implements Serializable {
 
        private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/37390d63/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaDeserializingConsumerIterator.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaDeserializingConsumerIterator.java
 
b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaDeserializingConsumerIterator.java
index 6ca4c81..f2af6ca 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaDeserializingConsumerIterator.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaDeserializingConsumerIterator.java
@@ -21,6 +21,7 @@ import 
org.apache.flink.streaming.connectors.util.DeserializationSchema;
 
 public class KafkaDeserializingConsumerIterator<IN> extends 
KafkaConsumerIterator {
 
+       private static final long serialVersionUID = 1L;
        private DeserializationSchema<IN> deserializationSchema;
 
        public KafkaDeserializingConsumerIterator(String host, int port, String 
topic, int partition, long waitOnEmptyFetch,

http://git-wip-us.apache.org/repos/asf/flink/blob/37390d63/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
index 7c90629..d813a30 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
@@ -67,6 +67,7 @@ public class StreamConfig implements Serializable {
        // DEFAULT VALUES
 
        private static final long DEFAULT_TIMEOUT = 100;
+       public static final String STATE_MONITORING = "STATE_MONITORING";
 
        // CONFIG METHODS
 
@@ -300,6 +301,18 @@ public class StreamConfig implements Serializable {
                config.setBytes(EDGES_IN_ORDER, 
SerializationUtils.serialize((Serializable) outEdgeList));
        }
 
+
+       public void setStateMonitoring(boolean stateMonitoring) {
+               
+               config.setBoolean(STATE_MONITORING, stateMonitoring);
+               
+       }
+       
+       public boolean getStateMonitoring()
+       {
+               return config.getBoolean(STATE_MONITORING, false);
+       }
+
        @SuppressWarnings("unchecked")
        public List<Tuple2<Integer, Integer>> getOutEdgesInOrder(ClassLoader 
cl) {
                try {
@@ -399,6 +412,7 @@ public class StreamConfig implements Serializable {
                        builder.append("\nInvokable: Missing");
                }
                builder.append("\nBuffer timeout: " + getBufferTimeout());
+               builder.append("\nState Monitoring: " + getStateMonitoring());
                if (isChainStart() && getChainedOutputs(cl).size() > 0) {
                        builder.append("\n\n\n---------------------\nChained 
task configs\n---------------------\n");
                        
builder.append(getTransitiveChainedTaskConfigs(cl)).toString();

http://git-wip-us.apache.org/repos/asf/flink/blob/37390d63/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
index 641708e..8334aa1 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
@@ -92,6 +92,10 @@ public class StreamGraph extends StreamingPlan {
        private Set<Integer> sources;
 
        private ExecutionConfig executionConfig;
+       
+       private boolean monitoringEnabled;
+       
+       private long monitoringInterval = 10000;
 
        public StreamGraph(ExecutionConfig executionConfig) {
 
@@ -606,6 +610,22 @@ public class StreamGraph extends StreamingPlan {
                return operatorNames.get(vertexID);
        }
 
+       public void setMonitoringEnabled(boolean monitoringEnabled) {
+               this.monitoringEnabled = monitoringEnabled;
+       }
+
+       public boolean isMonitoringEnabled() {
+               return monitoringEnabled;
+       }
+
+       public void setMonitoringInterval(long monitoringInterval) {
+               this.monitoringInterval = monitoringInterval;
+       }
+
+       public long getMonitoringInterval() {
+               return monitoringInterval;
+       }
+
        @Override
        public String getStreamingPlanAsJSON() {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/37390d63/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
index c9698e3..b50ac25 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
@@ -74,7 +74,13 @@ public class StreamingJobGraphGenerator {
 
                // Turn lazy scheduling off
                jobGraph.setScheduleMode(ScheduleMode.ALL);
-
+               jobGraph.setJobType(JobGraph.JobType.STREAMING);
+               
jobGraph.setMonitoringEnabled(streamGraph.isMonitoringEnabled());
+               
jobGraph.setMonitorInterval(streamGraph.getMonitoringInterval());
+               if(jobGraph.isMonitoringEnabled())
+               {
+                       jobGraph.setNumberOfExecutionRetries(Integer.MAX_VALUE);
+               }
                init();
 
                setChaining();
@@ -211,6 +217,7 @@ public class StreamingJobGraphGenerator {
                config.setNumberOfOutputs(nonChainableOutputs.size());
                config.setOutputs(nonChainableOutputs);
                config.setChainedOutputs(chainableOutputs);
+               config.setStateMonitoring(streamGraph.isMonitoringEnabled());
 
                Class<? extends AbstractInvokable> vertexClass = 
streamGraph.getJobVertexClass(vertexID);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/37390d63/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 835ce4e..9cc6131 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -151,6 +151,19 @@ public abstract class StreamExecutionEnvironment {
                this.bufferTimeout = timeoutMillis;
                return this;
        }
+       
+       public StreamExecutionEnvironment enableMonitoring(long interval)
+       {
+               streamGraph.setMonitoringEnabled(true);
+               streamGraph.setMonitoringInterval(interval);
+               return this;
+       }
+       
+       public StreamExecutionEnvironment enableMonitoring()
+       {
+               streamGraph.setMonitoringEnabled(true);
+               return this;
+       }
 
        /**
         * Sets the maximum time frequency (milliseconds) for the flushing of 
the

http://git-wip-us.apache.org/repos/asf/flink/blob/37390d63/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SocketTextStreamFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SocketTextStreamFunction.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SocketTextStreamFunction.java
index 67bc128..d6a5b2b 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SocketTextStreamFunction.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SocketTextStreamFunction.java
@@ -59,7 +59,6 @@ public class SocketTextStreamFunction extends 
RichSourceFunction<String> {
        public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                socket = new Socket();
-
                socket.connect(new InetSocketAddress(hostname, port), 
CONNECTION_TIMEOUT_TIME);
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/37390d63/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java
index a95965c..d766705 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java
@@ -18,7 +18,6 @@
 package org.apache.flink.streaming.api.streamvertex;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.event.task.StreamingSuperstep;
 import org.apache.flink.runtime.io.network.api.reader.MutableReader;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate;

http://git-wip-us.apache.org/repos/asf/flink/blob/37390d63/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
index 82f1329..fd375f6 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
@@ -25,7 +25,6 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.event.task.StreamingSuperstep;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.streaming.api.StreamConfig;

http://git-wip-us.apache.org/repos/asf/flink/blob/37390d63/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
index 24a90d0..5ff47d6 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
@@ -20,7 +20,6 @@ package org.apache.flink.streaming.api.streamvertex;
 import java.io.IOException;
 import java.util.Map;
 
-import org.apache.flink.runtime.event.task.StreamingSuperstep;
 import org.apache.flink.runtime.event.task.TaskEvent;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
@@ -65,6 +64,8 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable 
implements StreamTa
        protected ClassLoader userClassLoader;
 
        private EventListener<TaskEvent> superstepListener;
+       
+       private boolean onRecovery;
 
        public StreamVertex() {
                userInvokable = null;
@@ -88,7 +89,10 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable 
implements StreamTa
        protected void initialize() {
                this.userClassLoader = getUserCodeClassLoader();
                this.configuration = new StreamConfig(getTaskConfiguration());
-               this.states = configuration.getOperatorStates(userClassLoader);
+               if(!onRecovery)
+               {
+                       this.states = 
configuration.getOperatorStates(userClassLoader);
+               }
                this.context = 
createRuntimeContext(getEnvironment().getTaskName(), this.states);
        }
 
@@ -122,12 +126,12 @@ public class StreamVertex<IN, OUT> extends 
AbstractInvokable implements StreamTa
        @Override
        public void confirmBarrier(long barrierID) {
                
-               if(states != null && states.containsKey("kafka"))
+               if(configuration.getStateMonitoring() && states != null)
                {
                        getEnvironment().getJobManager().tell(
                                        new 
StateBarrierAck(getEnvironment().getJobID(), 
                                                        
getEnvironment().getJobVertexId(), context.getIndexOfThisSubtask(), 
-                                                       barrierID, 
states.get("kafka")), ActorRef.noSender());
+                                                       barrierID, states), 
ActorRef.noSender());
                }
                else
                {
@@ -290,8 +294,9 @@ public class StreamVertex<IN, OUT> extends 
AbstractInvokable implements StreamTa
        }
 
        @Override
-       public void injectState(OperatorState state) {
-               states.put("kafka", state);
+       public void injectStates(Map<String,OperatorState<?>> states) {
+               onRecovery = true;
+               this.states.putAll(states);
        }
        
 

http://git-wip-us.apache.org/repos/asf/flink/blob/37390d63/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java
index 60dfe7a..492d2a0 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java
@@ -71,7 +71,14 @@ public class StreamingRuntimeContext extends 
RuntimeUDFContext {
                if (state == null) {
                        throw new RuntimeException("Cannot register null 
state");
                } else {
-                       operatorStates.put(name, state);
+                       if(operatorStates.containsKey(name))
+                       {
+                               throw new RuntimeException("State is already 
registered");
+                       }
+                       else
+                       {
+                               operatorStates.put(name, state);
+                       }
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/37390d63/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingSuperstep.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingSuperstep.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingSuperstep.java
new file mode 100644
index 0000000..557c636
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingSuperstep.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.streamvertex;
+
+import java.io.IOException;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.event.task.TaskEvent;
+
+public class StreamingSuperstep extends TaskEvent {
+
+       protected long id;
+
+       public StreamingSuperstep() {
+
+       }
+
+       public StreamingSuperstep(long id) {
+               this.id = id;
+       }
+
+       @Override
+       public void write(DataOutputView out) throws IOException {
+               out.writeLong(id);
+       }
+
+       @Override
+       public void read(DataInputView in) throws IOException {
+               id = in.readLong();
+       }
+
+       public long getId() {
+               return id;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/37390d63/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BarrierBuffer.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BarrierBuffer.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BarrierBuffer.java
index 3ff718a..7dfccb0 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BarrierBuffer.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BarrierBuffer.java
@@ -23,10 +23,10 @@ import java.util.LinkedList;
 import java.util.Queue;
 import java.util.Set;
 
-import org.apache.flink.runtime.event.task.StreamingSuperstep;
 import org.apache.flink.runtime.io.network.api.reader.AbstractReader;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.streaming.api.streamvertex.StreamingSuperstep;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -53,6 +53,12 @@ public class BarrierBuffer {
                this.reader = reader;
        }
 
+       /**
+        * Starts the next superstep
+        * 
+        * @param superstep
+        *            The next superstep
+        */
        protected void startSuperstep(StreamingSuperstep superstep) {
                this.currentSuperstep = superstep;
                this.superstepStarted = true;
@@ -61,30 +67,53 @@ public class BarrierBuffer {
                }
        }
 
+       /**
+        * Buffers a bufferOrEvent received from a blocked channel
+        * 
+        * @param bufferOrEvent
+        *            bufferOrEvent to buffer
+        */
        protected void store(BufferOrEvent bufferOrEvent) {
                nonprocessed.add(bufferOrEvent);
        }
 
+       /**
+        * Gets the next non-blocked, non-processed BufferOrEvent. Returns null 
if
+        * not available.
+        */
        protected BufferOrEvent getNonProcessed() {
-               BufferOrEvent nextNonprocessed = null;
-               while (nextNonprocessed == null && !nonprocessed.isEmpty()) {
-                       nextNonprocessed = nonprocessed.poll();
+               BufferOrEvent nextNonprocessed;
+               while ((nextNonprocessed = nonprocessed.poll()) != null) {
                        if (isBlocked(nextNonprocessed.getChannelIndex())) {
                                blockedNonprocessed.add(nextNonprocessed);
-                               nextNonprocessed = null;
+                       } else {
+                               return nextNonprocessed;
                        }
                }
-               return nextNonprocessed;
+               return null;
        }
 
+       /**
+        * Checks whether a given channel index is blocked for this inputgate
+        * 
+        * @param channelIndex
+        *            The channel index to check
+        */
        protected boolean isBlocked(int channelIndex) {
                return blockedChannels.contains(channelIndex);
        }
 
+       /**
+        * Checks whether all channels are blocked meaning that barriers are
+        * received from all channels
+        */
        protected boolean isAllBlocked() {
                return blockedChannels.size() == totalNumberOfInputChannels;
        }
 
+       /**
+        * Returns the next non-blocked BufferOrEvent. This is a blocking 
operation.
+        */
        public BufferOrEvent getNextNonBlocked() throws IOException, 
InterruptedException {
                // If there are non-processed buffers from the previously 
blocked ones,
                // we get the next
@@ -99,7 +128,7 @@ public class BarrierBuffer {
                                bufferOrEvent = 
inputGate.getNextBufferOrEvent();
                                if (isBlocked(bufferOrEvent.getChannelIndex())) 
{
                                        // If channel blocked we just store it
-                                       store(bufferOrEvent);
+                                       blockedNonprocessed.add(bufferOrEvent);
                                } else {
                                        return bufferOrEvent;
                                }
@@ -107,6 +136,12 @@ public class BarrierBuffer {
                }
        }
 
+       /**
+        * Blocks the given channel index, from which a barrier has been 
received.
+        * 
+        * @param channelIndex
+        *            The channel index to block.
+        */
        protected void blockChannel(int channelIndex) {
                if (!blockedChannels.contains(channelIndex)) {
                        blockedChannels.add(channelIndex);
@@ -122,16 +157,27 @@ public class BarrierBuffer {
                }
        }
 
+       /**
+        * Releases the blocks on all channels.
+        */
        protected void releaseBlocks() {
-               nonprocessed.addAll(blockedNonprocessed);
+               if (!nonprocessed.isEmpty()) {
+                       // sanity check
+                       throw new RuntimeException("Error in barrier buffer 
logic");
+               }
+               nonprocessed = blockedNonprocessed;
+               blockedNonprocessed = new LinkedList<BufferOrEvent>();
                blockedChannels.clear();
-               blockedNonprocessed.clear();
                superstepStarted = false;
                if (LOG.isDebugEnabled()) {
                        LOG.debug("All barriers received, blocks released");
                }
        }
 
+       /**
+        * Method that is executed once the barrier has been received from all
+        * channels.
+        */
        protected void actOnAllBlocked() {
                if (LOG.isDebugEnabled()) {
                        LOG.debug("Publishing barrier to the vertex");
@@ -140,10 +186,12 @@ public class BarrierBuffer {
                releaseBlocks();
        }
 
-       public String toString() {
-               return blockedChannels.toString();
-       }
-
+       /**
+        * Processes a streaming superstep event
+        * 
+        * @param bufferOrEvent
+        *            The BufferOrEvent containing the event
+        */
        public void processSuperstep(BufferOrEvent bufferOrEvent) {
                StreamingSuperstep superstep = (StreamingSuperstep) 
bufferOrEvent.getEvent();
                if (!superstepStarted) {

http://git-wip-us.apache.org/repos/asf/flink/blob/37390d63/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java
index 6a1f624..6c91f4d 100755
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java
@@ -19,11 +19,9 @@ package org.apache.flink.streaming.io;
 
 import java.io.IOException;
 import java.util.LinkedList;
-import java.util.Queue;
 import java.util.concurrent.LinkedBlockingDeque;
 
 import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.event.task.StreamingSuperstep;
 import org.apache.flink.runtime.io.network.api.reader.AbstractReader;
 import org.apache.flink.runtime.io.network.api.reader.MutableRecordReader;
 import 
org.apache.flink.runtime.io.network.api.serialization.AdaptiveSpanningRecordDeserializer;
@@ -33,6 +31,7 @@ import 
org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate;
 import org.apache.flink.runtime.util.event.EventListener;
+import org.apache.flink.streaming.api.streamvertex.StreamingSuperstep;
 
 /**
  * A CoRecordReader wraps {@link MutableRecordReader}s of two different input
@@ -66,8 +65,6 @@ public class CoRecordReader<T1 extends IOReadableWritable, T2 
extends IOReadable
        private CoBarrierBuffer barrierBuffer1;
        private CoBarrierBuffer barrierBuffer2;
 
-       private Queue<Integer> unprocessedIndices = new LinkedList<Integer>();
-
        public CoRecordReader(InputGate inputgate1, InputGate inputgate2) {
                super(new UnionInputGate(inputgate1, inputgate2));
 
@@ -109,14 +106,14 @@ public class CoRecordReader<T1 extends 
IOReadableWritable, T2 extends IOReadable
        @SuppressWarnings("unchecked")
        protected int getNextRecord(T1 target1, T2 target2) throws IOException, 
InterruptedException {
 
-               requestPartitionsOnce();        
+               requestPartitionsOnce();
 
                while (true) {
                        if (currentReaderIndex == 0) {
                                if ((bufferReader1.isFinished() && 
bufferReader2.isFinished())) {
                                        return 0;
                                }
-                               
+
                                currentReaderIndex = 
getNextReaderIndexBlocking();
 
                        }
@@ -234,10 +231,8 @@ public class CoRecordReader<T1 extends IOReadableWritable, 
T2 extends IOReadable
        @Override
        public void onEvent(InputGate bufferReader) {
                if (bufferReader == bufferReader1) {
-                       System.out.println("Added 1");
                        availableRecordReaders.add(1);
                } else if (bufferReader == bufferReader2) {
-                       System.out.println("Added 2");
                        availableRecordReaders.add(2);
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/37390d63/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamingAbstractRecordReader.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamingAbstractRecordReader.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamingAbstractRecordReader.java
index 811c48a..d30c241 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamingAbstractRecordReader.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamingAbstractRecordReader.java
@@ -22,7 +22,6 @@ import java.io.IOException;
 
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.runtime.event.task.AbstractEvent;
-import org.apache.flink.runtime.event.task.StreamingSuperstep;
 import org.apache.flink.runtime.io.network.api.reader.AbstractReader;
 import org.apache.flink.runtime.io.network.api.reader.ReaderBase;
 import 
org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
@@ -31,6 +30,7 @@ import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import 
org.apache.flink.runtime.io.network.serialization.SpillingAdaptiveSpanningRecordDeserializer;
+import org.apache.flink.streaming.api.streamvertex.StreamingSuperstep;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,6 +46,7 @@ import org.slf4j.LoggerFactory;
 public abstract class StreamingAbstractRecordReader<T extends 
IOReadableWritable> extends AbstractReader implements
                ReaderBase {
 
+       @SuppressWarnings("unused")
        private static final Logger LOG = 
LoggerFactory.getLogger(StreamingAbstractRecordReader.class);
 
        private final RecordDeserializer<T>[] recordDeserializers;
@@ -56,6 +57,7 @@ public abstract class StreamingAbstractRecordReader<T extends 
IOReadableWritable
 
        private final BarrierBuffer barrierBuffer;
 
+       @SuppressWarnings("unchecked")
        protected StreamingAbstractRecordReader(InputGate inputGate) {
                super(inputGate);
                barrierBuffer = new BarrierBuffer(inputGate, this);

http://git-wip-us.apache.org/repos/asf/flink/blob/37390d63/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/BarrierBufferTest.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/BarrierBufferTest.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/BarrierBufferTest.java
index e7a03d9..203216b 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/BarrierBufferTest.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/BarrierBufferTest.java
@@ -25,7 +25,6 @@ import java.util.List;
 import java.util.Queue;
 
 import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.event.task.StreamingSuperstep;
 import org.apache.flink.runtime.event.task.TaskEvent;
 import org.apache.flink.runtime.io.network.api.reader.AbstractReader;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
@@ -33,6 +32,7 @@ import 
org.apache.flink.runtime.io.network.buffer.BufferRecycler;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.util.event.EventListener;
+import org.apache.flink.streaming.api.streamvertex.StreamingSuperstep;
 import org.junit.Test;
 
 public class BarrierBufferTest {

http://git-wip-us.apache.org/repos/asf/flink/blob/37390d63/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
 
b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
index b7a1ba3..c207d60 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
@@ -101,11 +101,6 @@ public class WordCount {
 
                        // emit the pairs
                        for (String token : tokens) {
-                               //FIXME to be removed. added this for test 
purposes 
-                               if("killme".equals(token))
-                               {
-                                       throw new Exception("byee");
-                               }
                                if (token.length() > 0) {
                                        out.collect(new Tuple2<String, 
Integer>(token, 1));
                                }

Reply via email to