[ https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16612045#comment-16612045 ]

ASF GitHub Bot commented on FLINK-10205:
----------------------------------------

isunjin closed pull request #6657: [FLINK-10205] [JobManager] Batch Job: InputSplit Fault tolerant for DataSource…
URL: https://github.com/apache/flink/pull/6657

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

diff --git a/docs/_includes/generated/metric_configuration.html b/docs/_includes/generated/metric_configuration.html
index 98054e94224..0c0b0dd2ffb 100644
--- a/docs/_includes/generated/metric_configuration.html
+++ b/docs/_includes/generated/metric_configuration.html
@@ -7,11 +7,21 @@
         </tr>
     </thead>
     <tbody>
+        <tr>
+            <td><h5>metrics.latency.granularity</h5></td>
+            <td style="word-wrap: break-word;">"operator"</td>
+            <td>Defines the granularity of latency metrics. Accepted values 
are:<ul><li>single - Track latency without differentiating between sources and 
subtasks.</li><li>operator - Track latency while differentiating between 
sources, but not subtasks.</li><li>subtask - Track latency while 
differentiating between sources and subtasks.</li></ul></td>
+        </tr>
         <tr>
             <td><h5>metrics.latency.history-size</h5></td>
             <td style="word-wrap: break-word;">128</td>
             <td>Defines the number of measured latencies to maintain at each 
operator.</td>
         </tr>
+        <tr>
+            <td><h5>metrics.latency.interval</h5></td>
+            <td style="word-wrap: break-word;">0</td>
+            <td>Defines the interval at which latency tracking marks are 
emitted from the sources. Disables latency tracking if set to 0 or a negative 
value. Enabling this feature can significantly impact the performance of the 
cluster.</td>
+        </tr>
         <tr>
             <td><h5>metrics.reporter.&lt;name&gt;.&lt;parameter&gt;</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
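
For illustration, the two options documented above can also be set programmatically on a Flink Configuration object (they map to the MetricOptions constants added further down in this diff). This is a minimal sketch; the class name and the 30-second interval are arbitrary example choices, not part of this change:

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.MetricOptions;

    public class LatencyMetricOptionsExample {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            // Emit latency marks every 30 seconds; 0 or a negative value keeps tracking disabled.
            conf.setLong(MetricOptions.LATENCY_INTERVAL, 30_000L);
            // Track latency per operator (the default); "single" and "subtask" are the other accepted values.
            conf.setString(MetricOptions.LATENCY_SOURCE_GRANULARITY, "operator");
            System.out.println(conf);
        }
    }

The same keys (metrics.latency.interval, metrics.latency.granularity) can equally be placed in flink-conf.yaml.
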
diff --git a/docs/dev/batch/index.md b/docs/dev/batch/index.md
index c624fce8954..d0043647227 100644
--- a/docs/dev/batch/index.md
+++ b/docs/dev/batch/index.md
@@ -592,7 +592,7 @@ val output: DataSet[(Int, String, Double)] = 
input.sum(0).min(2)
       </td>
     </tr>
 
-    </tr>
+    <tr>
       <td><strong>Join</strong></td>
       <td>
         Joins two data sets by creating all pairs of elements that are equal 
on their keys.
@@ -608,7 +608,7 @@ val result = input1.join(input2).where(0).equalTo(1)
         describe whether the join happens through partitioning or 
broadcasting, and whether it uses
         a sort-based or a hash-based algorithm. Please refer to the
         <a 
href="dataset_transformations.html#join-algorithm-hints">Transformations 
Guide</a> for
-        a list of possible hints and an example.</br>
+        a list of possible hints and an example.<br />
         If no hint is specified, the system will try to make an estimate of 
the input sizes and
         pick the best strategy according to those estimates.
 {% highlight scala %}
@@ -700,7 +700,6 @@ val result = in.partitionByRange(0).mapPartition { ... }
 {% endhighlight %}
       </td>
     </tr>
-    </tr>
     <tr>
       <td><strong>Custom Partitioning</strong></td>
       <td>
@@ -1615,7 +1614,7 @@ In object-reuse enabled mode, Flink's runtime minimizes 
the number of object ins
    <tr>
       <td><strong>Emitting Input Objects</strong></td>
       <td>
-        You <strong>must not</strong> emit input objects, except for input 
objects of MapFunction, FlatMapFunction, MapPartitionFunction, 
GroupReduceFunction, GroupCombineFunction, CoGroupFunction, and 
InputFormat.next(reuse).</td>
+        You <strong>must not</strong> emit input objects, except for input 
objects of MapFunction, FlatMapFunction, MapPartitionFunction, 
GroupReduceFunction, GroupCombineFunction, CoGroupFunction, and 
InputFormat.next(reuse).
       </td>
    </tr>
    <tr>
diff --git a/docs/monitoring/metrics.md b/docs/monitoring/metrics.md
index 7d88a36393c..85c60a67a22 100644
--- a/docs/monitoring/metrics.md
+++ b/docs/monitoring/metrics.md
@@ -1638,8 +1638,9 @@ logged by `SystemResourcesMetricsInitializer` during the 
startup.
 
 ## Latency tracking
 
-Flink allows to track the latency of records traveling through the system. To 
enable the latency tracking
-a `latencyTrackingInterval` (in milliseconds) has to be set to a positive 
value in the `ExecutionConfig`.
+Flink allows to track the latency of records traveling through the system. 
This feature is disabled by default.
+To enable the latency tracking you must set the `latencyTrackingInterval` to a 
positive number in either the
+[Flink configuration]({{ site.baseurl 
}}/ops/config.html#metrics-latency-interval) or `ExecutionConfig`.
 
 At the `latencyTrackingInterval`, the sources will periodically emit a special 
record, called a `LatencyMarker`.
 The marker contains a timestamp from the time when the record has been emitted 
at the sources.
@@ -1659,6 +1660,9 @@ latency issues caused by individual machines.
 Currently, Flink assumes that the clocks of all machines in the cluster are in 
sync. We recommend setting
 up an automated clock synchronisation service (like NTP) to avoid false 
latency results.
 
+<span class="label label-danger">Warning</span> Enabling latency metrics can 
significantly impact the performance
+of the cluster. It is highly recommended to only use them for debugging 
purposes.
+
 ## REST API integration
 
 Metrics can be queried through the [Monitoring REST API]({{ site.baseurl 
}}/monitoring/rest_api.html).
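
A minimal sketch of enabling latency tracking via the ExecutionConfig route described in the updated section above (the job name and the 30-second interval are only example values):

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class LatencyTrackingJob {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Sources emit a LatencyMarker every 30 seconds; <= 0 disables tracking (the new default).
            env.getConfig().setLatencyTrackingInterval(30_000L);
            env.fromElements(1, 2, 3).print();
            env.execute("latency-tracking-example");
        }
    }
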
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
index d563bcf108e..4f85e3cf8d5 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
@@ -1049,7 +1049,7 @@ private void 
handlePendingFilesForPreviousCheckpoints(Map<Long, List<String>> pe
        }
 
        /**
-        * Sets the prefix of part files.  The default is no suffix.
+        * Sets the suffix of part files.  The default is no suffix.
         */
        public BucketingSink<T> setPartSuffix(String partSuffix) {
                this.partSuffix = partSuffix;
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index 59fa803791a..6b7caaac6ec 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -22,6 +22,7 @@
 import org.apache.flink.annotation.Public;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.util.Preconditions;
 
@@ -131,7 +132,9 @@
        /**
         * Interval in milliseconds for sending latency tracking marks from the 
sources to the sinks.
         */
-       private long latencyTrackingInterval = 2000L;
+       private long latencyTrackingInterval = 
MetricOptions.LATENCY_INTERVAL.defaultValue();
+
+       private boolean isLatencyTrackingConfigured = false;
 
        /**
         * @deprecated Should no longer be used because it is subsumed by 
RestartStrategyConfiguration
@@ -234,8 +237,6 @@ public long getAutoWatermarkInterval()  {
         * Interval for sending latency tracking marks from the sources to the 
sinks.
         * Flink will send latency tracking marks from the sources at the 
specified interval.
         *
-        * Recommended value: 2000 (2 seconds).
-        *
         * Setting a tracking interval <= 0 disables the latency tracking.
         *
         * @param interval Interval in milliseconds.
@@ -243,6 +244,7 @@ public long getAutoWatermarkInterval()  {
        @PublicEvolving
        public ExecutionConfig setLatencyTrackingInterval(long interval) {
                this.latencyTrackingInterval = interval;
+               this.isLatencyTrackingConfigured = true;
                return this;
        }
 
@@ -256,12 +258,17 @@ public long getLatencyTrackingInterval() {
        }
 
        /**
-        * Returns if latency tracking is enabled
-        * @return True, if the tracking is enabled, false otherwise.
+        * @deprecated will be removed in a future version
         */
        @PublicEvolving
+       @Deprecated
        public boolean isLatencyTrackingEnabled() {
-               return latencyTrackingInterval > 0;
+               return isLatencyTrackingConfigured && latencyTrackingInterval > 
0;
+       }
+
+       @Internal
+       public boolean isLatencyTrackingConfigured() {
+               return isLatencyTrackingConfigured;
        }
 
        /**
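
The new isLatencyTrackingConfigured flag lets the runtime distinguish "user explicitly set an interval on the ExecutionConfig" from "fall back to the cluster-wide metrics.latency.interval". The helper below is a hedged sketch of that resolution, not the actual Flink code path:

    import org.apache.flink.api.common.ExecutionConfig;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.MetricOptions;

    public class LatencyIntervalResolution {
        // Prefer an interval set explicitly on the ExecutionConfig; otherwise use the
        // cluster configuration value of metrics.latency.interval (default 0 = disabled).
        static long effectiveLatencyInterval(ExecutionConfig executionConfig, Configuration clusterConfig) {
            return executionConfig.isLatencyTrackingConfigured()
                ? executionConfig.getLatencyTrackingInterval()
                : clusterConfig.getLong(MetricOptions.LATENCY_INTERVAL);
        }
    }
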
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java
index f9fd02423d8..67444a5397c 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java
@@ -19,8 +19,10 @@
 package org.apache.flink.configuration;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.description.Description;
 
 import static org.apache.flink.configuration.ConfigOptions.key;
+import static org.apache.flink.configuration.description.TextElement.text;
 
 /**
  * Configuration options for metrics and metric reporters.
@@ -104,6 +106,24 @@
                        
.defaultValue("<host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>")
                        .withDescription("Defines the scope format string that 
is applied to all metrics scoped to an operator.");
 
+       public static final ConfigOption<Long> LATENCY_INTERVAL =
+               key("metrics.latency.interval")
+                       .defaultValue(0L)
+                       .withDescription("Defines the interval at which latency 
tracking marks are emitted from the sources." +
+                               " Disables latency tracking if set to 0 or a 
negative value. Enabling this feature can significantly" +
+                               " impact the performance of the cluster.");
+
+       public static final ConfigOption<String> LATENCY_SOURCE_GRANULARITY =
+               key("metrics.latency.granularity")
+                       .defaultValue("operator")
+                       .withDescription(Description.builder()
+                               .text("Defines the granularity of latency 
metrics. Accepted values are:")
+                               .list(
+                                       text("single - Track latency without 
differentiating between sources and subtasks."),
+                                       text("operator - Track latency while 
differentiating between sources, but not subtasks."),
+                                       text("subtask - Track latency while 
differentiating between sources and subtasks."))
+                               .build());
+
        /** The number of measured latencies to maintain at each operator. */
        public static final ConfigOption<Integer> LATENCY_HISTORY_SIZE =
                key("metrics.latency.history-size")
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
index 37f6d024a07..195812d1d1c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
@@ -711,10 +711,10 @@ abstract class TableEnvironment(val config: TableConfig) {
       case insert: SqlInsert =>
         // validate the SQL query
         val query = insert.getSource
-        planner.validate(query)
+        val validatedQuery = planner.validate(query)
 
         // get query result as Table
-        val queryResult = new Table(this, 
LogicalRelNode(planner.rel(query).rel))
+        val queryResult = new Table(this, 
LogicalRelNode(planner.rel(validatedQuery).rel))
 
         // get name of sink table
         val targetTableName = 
insert.getTargetTable.asInstanceOf[SqlIdentifier].names.get(0)
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
index 1aadf3140da..fa6f020ea6d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
@@ -19,6 +19,7 @@ package org.apache.flink.table.runtime.functions
 
 import java.lang.{StringBuilder, Long => JLong}
 import java.math.{BigDecimal => JBigDecimal}
+import java.nio.charset.StandardCharsets
 
 import org.apache.commons.codec.binary.{Base64, Hex}
 import org.apache.commons.lang3.StringUtils
@@ -207,12 +208,14 @@ object ScalarFunctions {
   /**
     * Returns the base string decoded with base64.
     */
-  def fromBase64(str: String): String = new String(Base64.decodeBase64(str))
+  def fromBase64(str: String): String =
+    new String(Base64.decodeBase64(str), StandardCharsets.UTF_8)
 
   /**
     * Returns the base64-encoded result of the input string.
     */
-  def toBase64(base: String): String = 
Base64.encodeBase64String(base.getBytes())
+  def toBase64(base: String): String =
+    Base64.encodeBase64String(base.getBytes(StandardCharsets.UTF_8))
 
   /**
     * Returns the hex string of a long argument.
@@ -222,7 +225,8 @@ object ScalarFunctions {
   /**
     * Returns the hex string of a string argument.
     */
-  def hex(x: String): String = Hex.encodeHexString(x.getBytes).toUpperCase()
+  def hex(x: String): String =
+    Hex.encodeHexString(x.getBytes(StandardCharsets.UTF_8)).toUpperCase()
 
   /**
     * Returns an UUID string using Java utilities.
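
A standalone sketch (plain JDK, no Flink dependencies) of why the explicit StandardCharsets.UTF_8 matters in the functions above; the expected values match the new test cases added further down in this diff:

    import java.nio.charset.StandardCharsets;
    import java.util.Base64;

    public class Utf8EncodingExample {
        public static void main(String[] args) {
            // With the platform-default charset, getBytes() can differ between JVMs/locales.
            // With an explicit UTF-8 charset, "你好" always encodes to E4 BD A0 E5 A5 BD.
            byte[] utf8 = "你好".getBytes(StandardCharsets.UTF_8);

            // TO_BASE64('你好') -> "5L2g5aW9" (matches the new ScalarFunctionsTest case)
            System.out.println(Base64.getEncoder().encodeToString(utf8));

            // HEX('你好') -> "E4BDA0E5A5BD" (matches the new ScalarFunctionsTest case)
            StringBuilder hex = new StringBuilder();
            for (byte b : utf8) {
                hex.append(String.format("%02X", b & 0xFF));
            }
            System.out.println(hex);
        }
    }
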
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/SortValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/SortValidationTest.scala
index 083ed9468bf..6c477fd9ca9 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/SortValidationTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/SortValidationTest.scala
@@ -38,7 +38,6 @@ class SortValidationTest extends TableTestBase {
     streamUtil.verifySql(sqlQuery, "")
   }
 
-
   // test should fail because time is not the primary order field
   @Test(expected = classOf[TableException])
   def testSortProcessingTimeSecondaryField(): Unit = {
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
index 503825475e7..fbd9b028547 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
@@ -483,6 +483,13 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
       "f24.hex()",
       "HEX(f24)",
       "2A5F546869732069732061207465737420537472696E672E")
+
+    testAllApis(
+      "你好".hex(),
+      "'你好'.hex()",
+      "HEX('你好')",
+      "E4BDA0E5A5BD"
+    )
   }
 
   @Test
@@ -563,6 +570,13 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
       "f33.fromBase64()",
       "FROM_BASE64(f33)",
       "null")
+
+    testAllApis(
+      "5L2g5aW9".fromBase64(),
+      "'5L2g5aW9'.fromBase64()",
+      "FROM_BASE64('5L2g5aW9')",
+      "你好"
+    )
   }
 
   @Test
@@ -591,6 +605,13 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
       "f33.toBase64()",
       "TO_BASE64(f33)",
       "null")
+
+    testAllApis(
+      "你好".toBase64(),
+      "'你好'.toBase64()",
+      "TO_BASE64('你好')",
+      "5L2g5aW9"
+    )
   }
 
   @Test
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SortITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SortITCase.scala
index 19db2a031b4..e7b79a5a196 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SortITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SortITCase.scala
@@ -18,15 +18,17 @@
 
 package org.apache.flink.table.runtime.stream.sql
 
+import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.scala._
 import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.{TableEnvironment, Types}
 import org.apache.flink.table.api.scala._
 import 
org.apache.flink.table.runtime.utils.TimeTestUtil.EventTimeSourceFunction
 import 
org.apache.flink.table.runtime.stream.sql.SortITCase.StringRowSelectorSink
-import org.apache.flink.table.runtime.utils.{StreamITCase, 
StreamingWithStateTestBase}
+import org.apache.flink.table.runtime.utils.{StreamITCase, StreamTestData, 
StreamingWithStateTestBase}
+import org.apache.flink.table.utils.MemoryTableSourceSinkUtil
 import org.apache.flink.types.Row
 import org.junit.Assert._
 import org.junit._
@@ -105,6 +107,36 @@ class SortITCase extends StreamingWithStateTestBase {
       "20")
     assertEquals(expected, SortITCase.testResults)
   }
+
+  @Test
+  def testInsertIntoMemoryTableOrderBy(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    MemoryTableSourceSinkUtil.clear()
+
+    val t = StreamTestData.getSmall3TupleDataStream(env)
+      .assignAscendingTimestamps(x => x._2)
+      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
+    tEnv.registerTable("sourceTable", t)
+
+    val fieldNames = Array("d", "e", "f", "t")
+    val fieldTypes = Array(Types.INT, Types.LONG, Types.STRING, 
Types.SQL_TIMESTAMP)
+      .asInstanceOf[Array[TypeInformation[_]]]
+    val sink = new MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink
+    tEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink)
+
+    val sql = "INSERT INTO targetTable SELECT a, b, c, rowtime " +
+      "FROM sourceTable ORDER BY rowtime, a desc"
+    tEnv.sqlUpdate(sql)
+    env.execute()
+
+    val expected = List(
+      "1,1,Hi,1970-01-01 00:00:00.001",
+      "3,2,Hello world,1970-01-01 00:00:00.002",
+      "2,2,Hello,1970-01-01 00:00:00.002")
+    assertEquals(expected, MemoryTableSourceSinkUtil.tableDataStrings)
+  }
 }
 
 object SortITCase {
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MemoryTableSourceSinkUtil.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MemoryTableSourceSinkUtil.scala
index cb0ad436a18..1edd79fca56 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MemoryTableSourceSinkUtil.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MemoryTableSourceSinkUtil.scala
@@ -119,8 +119,10 @@ object MemoryTableSourceSinkUtil {
     }
 
     override def emitDataStream(dataStream: DataStream[Row]): Unit = {
+      val inputParallelism = dataStream.getParallelism
       dataStream
         .addSink(new MemoryAppendSink)
+        .setParallelism(inputParallelism)
         .name(TableConnectorUtil.generateRuntimeName(this.getClass, 
getFieldNames))
     }
   }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 801f35a41dc..ca206c15874 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -22,6 +22,7 @@
 import org.apache.flink.api.common.Archiveable;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
@@ -176,6 +177,8 @@
 
        private volatile IOMetrics ioMetrics;
 
+       private int currentSplitIndex = 0;
+
        // 
--------------------------------------------------------------------------------------------
 
        /**
@@ -307,6 +310,12 @@ boolean tryAssignResource(final LogicalSlot logicalSlot) {
                }
        }
 
+       public InputSplit getNextInputSplit() {
+               final LogicalSlot slot = this.getAssignedResource();
+               final String host = slot != null ? 
slot.getTaskManagerLocation().getHostname() : null;
+               return this.vertex.getNextInputSplit(this.currentSplitIndex++, 
host);
+       }
+
        @Override
        public TaskManagerLocation getAssignedResourceLocation() {
                // returns non-null only when a location is already assigned
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index e4228011830..4929d3931b4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -23,6 +23,7 @@
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.blob.PermanentBlobKey;
 import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
@@ -104,6 +105,9 @@
        /** The current or latest execution attempt of this vertex's task. */
        private volatile Execution currentExecution;    // this field must 
never be null
 
+       /** input split*/
+       private ArrayList<InputSplit> inputSplits;
+
        // 
--------------------------------------------------------------------------------------------
 
        /**
@@ -185,6 +189,7 @@ public ExecutionVertex(
                getExecutionGraph().registerExecution(currentExecution);
 
                this.timeout = timeout;
+               this.inputSplits = new ArrayList<>();
        }
 
 
@@ -249,6 +254,19 @@ public CoLocationConstraint getLocationConstraint() {
                return locationConstraint;
        }
 
+       public InputSplit getNextInputSplit(int index, String host) {
+               final int taskId = this.getParallelSubtaskIndex();
+               synchronized (this.inputSplits) {
+                       if (index < this.inputSplits.size()) {
+                               return this.inputSplits.get(index);
+                       } else {
+                               final InputSplit nextInputSplit = 
this.jobVertex.getSplitAssigner().getNextInputSplit(host, taskId);
+                               this.inputSplits.add(nextInputSplit);
+                               return nextInputSplit;
+                       }
+               }
+       }
+
        @Override
        public Execution getCurrentExecutionAttempt() {
                return currentExecution;
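
The essence of the fault-tolerance change above is that the ExecutionVertex now remembers every input split it has handed out, so a restarted execution attempt, which walks its split index from 0 again, receives exactly the same splits instead of new ones. A simplified, self-contained sketch of that idea (the names here are illustrative, not Flink's):

    import org.apache.flink.core.io.InputSplit;

    import java.util.ArrayList;
    import java.util.List;

    class InputSplitReplayCache {
        /** Provides fresh splits, e.g. backed by an InputSplitAssigner in the real code. */
        interface SplitProvider {
            InputSplit nextSplit();
        }

        private final List<InputSplit> assigned = new ArrayList<>();

        /** index is the per-attempt counter (currentSplitIndex in Execution). */
        synchronized InputSplit getNextInputSplit(int index, SplitProvider provider) {
            if (index < assigned.size()) {
                // A restarted attempt asks for an index that was already served: replay it.
                return assigned.get(index);
            }
            // First request for this index: pull a new split and remember it.
            InputSplit next = provider.nextSplit();
            assigned.add(next);
            return next;
        }
    }
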
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
index 0019acf6e86..041fea00fa9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
@@ -226,7 +226,6 @@ public void releaseJob(JobID jobId, ExecutionAttemptID 
executionId) {
                        Set<ExecutionAttemptID> jobRefCounter = 
jobRefHolders.get(jobId);
 
                        if (jobRefCounter == null || jobRefCounter.isEmpty()) {
-                               LOG.warn("improper use of releaseJob() without 
a matching number of createTmpFiles() calls for jobId " + jobId);
                                return;
                        }
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferConsumer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferConsumer.java
index 4bad92f06b0..f368ff0365e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferConsumer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferConsumer.java
@@ -90,11 +90,12 @@ public Buffer build() {
        }
 
        /**
-        * @return a retained copy of self with separate indexes - it allows 
two read from the same {@link MemorySegment}
-        * twice.
+        * Returns a retained copy with separate indexes. This allows to read 
from the same {@link MemorySegment} twice.
         *
         * <p>WARNING: newly returned {@link BufferConsumer} will have reader 
index copied from the original buffer. In
         * other words, data already consumed before copying will not be 
visible to the returned copies.
+        *
+        * @return a retained copy of self with separate indexes
         */
        public BufferConsumer copy() {
                return new BufferConsumer(buffer.retainBuffer(), 
writerPosition.positionMarker, currentReaderPosition);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPool.java
index a4928ed307e..2927fae1069 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPool.java
@@ -34,7 +34,7 @@
        /**
         * Destroys this buffer pool.
         *
-        * <p> If not all buffers are available, they are recycled lazily as 
soon as they are recycled.
+        * <p>If not all buffers are available, they are recycled lazily as 
soon as they are recycled.
         */
        void lazyDestroy();
 
@@ -50,7 +50,7 @@
        int getNumberOfRequiredMemorySegments();
 
        /**
-        * Returns the maximum number of memory segments this buffer pool 
should use
+        * Returns the maximum number of memory segments this buffer pool 
should use.
         *
         * @return maximum number of memory segments to use or <tt>-1</tt> if 
unlimited
         */
@@ -59,14 +59,14 @@
        /**
         * Returns the current size of this buffer pool.
         *
-        * <p> The size of the buffer pool can change dynamically at runtime.
+        * <p>The size of the buffer pool can change dynamically at runtime.
         */
        int getNumBuffers();
 
        /**
         * Sets the current size of this buffer pool.
         *
-        * <p> The size needs to be greater or equal to the guaranteed number 
of memory segments.
+        * <p>The size needs to be greater or equal to the guaranteed number of 
memory segments.
         */
        void setNumBuffers(int numBuffers) throws IOException;
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactory.java
index ffed43251d8..c90e3025237 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactory.java
@@ -29,7 +29,7 @@
         * Tries to create a buffer pool, which is guaranteed to provide at 
least the number of required
         * buffers.
         *
-        * <p> The buffer pool is of dynamic size with at least 
<tt>numRequiredBuffers</tt> buffers.
+        * <p>The buffer pool is of dynamic size with at least 
<tt>numRequiredBuffers</tt> buffers.
         *
         * @param numRequiredBuffers
         *              minimum number of network buffers in this pool
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPoolOwner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPoolOwner.java
index 66a69956008..acfe2405e20 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPoolOwner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPoolOwner.java
@@ -20,6 +20,9 @@
 
 import java.io.IOException;
 
+/**
+ * Interface for releasing memory buffers.
+ */
 public interface BufferPoolOwner {
 
        void releaseMemory(int numBuffersToRecycle) throws IOException;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferRecycler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferRecycler.java
index a6495d02c9a..66b1fa2dc58 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferRecycler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferRecycler.java
@@ -20,6 +20,9 @@
 
 import org.apache.flink.core.memory.MemorySegment;
 
+/**
+ * Interface for recycling {@link MemorySegment}s.
+ */
 public interface BufferRecycler {
 
        /**
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/FreeingBufferRecycler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/FreeingBufferRecycler.java
index fdce8837046..548c0cc7948 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/FreeingBufferRecycler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/FreeingBufferRecycler.java
@@ -24,11 +24,11 @@
  * A simple buffer recycler that frees the memory segments.
  */
 public class FreeingBufferRecycler implements BufferRecycler {
-       
+
        public static final BufferRecycler INSTANCE = new 
FreeingBufferRecycler();
-       
+
        // 
------------------------------------------------------------------------
-       
+
        // Not instantiable
        private FreeingBufferRecycler() {}
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
index cc793635047..c6f3e158519 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
@@ -80,7 +80,7 @@ public void flush() {
        @Override
        public void finish() throws IOException {
                
add(EventSerializer.toBufferConsumer(EndOfPartitionEvent.INSTANCE), true);
-               LOG.debug("Finished {}.", this);
+               LOG.debug("{}: Finished {}.", parent.getOwningTaskName(), this);
        }
 
        private boolean add(BufferConsumer bufferConsumer, boolean finish) {
@@ -132,7 +132,7 @@ public void release() {
                        isReleased = true;
                }
 
-               LOG.debug("Released {}.", this);
+               LOG.debug("{}: Released {}.", parent.getOwningTaskName(), this);
 
                if (view != null) {
                        view.releaseAllResources();
@@ -224,7 +224,8 @@ public PipelinedSubpartitionView 
createReadView(BufferAvailabilityListener avail
                                        "Subpartition %s of is being (or 
already has been) consumed, " +
                                        "but pipelined subpartitions can only 
be consumed once.", index, parent.getPartitionId());
 
-                       LOG.debug("Creating read view for subpartition {} of 
partition {}.", index, parent.getPartitionId());
+                       LOG.debug("{}: Creating read view for subpartition {} 
of partition {}.",
+                               parent.getOwningTaskName(), index, 
parent.getPartitionId());
 
                        readView = new PipelinedSubpartitionView(this, 
availabilityListener);
                        if (!buffers.isEmpty()) {
@@ -268,8 +269,8 @@ public String toString() {
                }
 
                return String.format(
-                       "PipelinedSubpartition [number of buffers: %d (%d 
bytes), number of buffers in backlog: %d, finished? %s, read view? %s]",
-                       numBuffers, numBytes, getBuffersInBacklog(), finished, 
hasReadView);
+                       "PipelinedSubpartition#%d [number of buffers: %d (%d 
bytes), number of buffers in backlog: %d, finished? %s, read view? %s]",
+                       index, numBuffers, numBytes, getBuffersInBacklog(), 
finished, hasReadView);
        }
 
        @Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
index fbbfa4b45bb..93e5ba15097 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
@@ -199,6 +199,10 @@ public JobID getJobId() {
                return jobId;
        }
 
+       public String getOwningTaskName() {
+               return owningTaskName;
+       }
+
        public ResultPartitionID getPartitionId() {
                return partitionId;
        }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
index 69b461b1a4d..9f696adc362 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
@@ -143,6 +143,7 @@ public synchronized void finish() throws IOException {
                if (spillWriter != null) {
                        spillWriter.close();
                }
+               LOG.debug("{}: Finished {}.", parent.getOwningTaskName(), this);
        }
 
        @Override
@@ -180,6 +181,8 @@ public synchronized void release() throws IOException {
                        isReleased = true;
                }
 
+               LOG.debug("{}: Released {}.", parent.getOwningTaskName(), this);
+
                if (view != null) {
                        view.releaseAllResources();
                }
@@ -236,8 +239,8 @@ public int releaseMemory() throws IOException {
                                long spilledBytes = 
spillFinishedBufferConsumers(isFinished);
                                int spilledBuffers = numberOfBuffers - 
buffers.size();
 
-                               LOG.debug("Spilling {} bytes ({} buffers} for 
sub partition {} of {}.",
-                                       spilledBytes, spilledBuffers, index, 
parent.getPartitionId());
+                               LOG.debug("{}: Spilling {} bytes ({} buffers} 
for sub partition {} of {}.",
+                                       parent.getOwningTaskName(), 
spilledBytes, spilledBuffers, index, parent.getPartitionId());
 
                                return spilledBuffers;
                        }
@@ -300,9 +303,9 @@ public int unsynchronizedGetNumberOfQueuedBuffers() {
 
        @Override
        public String toString() {
-               return String.format("SpillableSubpartition [%d number of 
buffers (%d bytes)," +
+               return String.format("SpillableSubpartition#%d [%d number of 
buffers (%d bytes)," +
                                "%d number of buffers in backlog, finished? %s, 
read view? %s, spilled? %s]",
-                       getTotalNumberOfBuffers(), getTotalNumberOfBytes(),
+                       index, getTotalNumberOfBuffers(), 
getTotalNumberOfBytes(),
                        getBuffersInBacklog(), isFinished, readView != null, 
spillWriter != null);
        }
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 01cb2b6b099..14f2c950e67 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -24,7 +24,6 @@
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.io.InputSplit;
-import org.apache.flink.core.io.InputSplitAssigner;
 import org.apache.flink.queryablestate.KvStateID;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.StoppingException;
@@ -95,7 +94,6 @@
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.FencedRpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.rpc.RpcTimeout;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.taskexecutor.AccumulatorReport;
@@ -569,16 +567,12 @@ public void start() {
                        return FutureUtils.completedExceptionally(new 
Exception("Cannot find execution vertex for vertex ID " + vertexID));
                }
 
-               final InputSplitAssigner splitAssigner = 
vertex.getSplitAssigner();
-               if (splitAssigner == null) {
+               if (vertex.getSplitAssigner() == null) {
                        log.error("No InputSplitAssigner for vertex ID {}.", 
vertexID);
                        return FutureUtils.completedExceptionally(new 
Exception("No InputSplitAssigner for vertex ID " + vertexID));
                }
 
-               final LogicalSlot slot = execution.getAssignedResource();
-               final int taskId = 
execution.getVertex().getParallelSubtaskIndex();
-               final String host = slot != null ? 
slot.getTaskManagerLocation().getHostname() : null;
-               final InputSplit nextInputSplit = 
splitAssigner.getNextInputSplit(host, taskId);
+               final InputSplit nextInputSplit = execution.getNextInputSplit();
 
                if (log.isDebugEnabled()) {
                        log.debug("Send next input split {}.", nextInputSplit);
@@ -909,7 +903,7 @@ public void heartbeatFromResourceManager(final ResourceID 
resourceID) {
        }
 
        @Override
-       public CompletableFuture<JobDetails> requestJobDetails(@RpcTimeout Time 
timeout) {
+       public CompletableFuture<JobDetails> requestJobDetails(Time timeout) {
                final ExecutionGraph currentExecutionGraph = executionGraph;
                return CompletableFuture.supplyAsync(() -> 
WebMonitorUtils.createDetailsForJob(currentExecutionGraph), 
scheduledExecutorService);
        }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
index 981222d17a6..bc073c192bd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
@@ -268,7 +268,7 @@ void heartbeatFromTaskManager(
        CompletableFuture<String> triggerSavepoint(
                @Nullable final String targetDirectory,
                final boolean cancelJob,
-               final Time timeout);
+               @RpcTimeout final Time timeout);
 
        /**
         * Requests the statistics on operator back pressure.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/OneTimeLeaderListenerFuture.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/OneTimeLeaderListenerFuture.java
index 77b9847d331..c328dd52ce3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/OneTimeLeaderListenerFuture.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/OneTimeLeaderListenerFuture.java
@@ -25,8 +25,8 @@
 import java.util.concurrent.CompletableFuture;
 
 /**
- * A leader listener that exposes a future for the first leader notification.  
- * 
+ * A leader listener that exposes a future for the first leader notification.
+ *
  * <p>The future can be obtained via the {@link #future()} method.
  */
 public class OneTimeLeaderListenerFuture implements LeaderRetrievalListener {
@@ -38,7 +38,7 @@ public OneTimeLeaderListenerFuture() {
        }
 
        /**
-        * Gets the future that is completed with the leader address and ID. 
+        * Gets the future that is completed with the leader address and ID.
         * @return The future.
         */
        public CompletableFuture<LeaderAddressAndId> future() {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/StandaloneMiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/StandaloneMiniCluster.java
index 808de222d0a..1ef1f3b5832 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/StandaloneMiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/StandaloneMiniCluster.java
@@ -43,8 +43,8 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import scala.Option;
 import scala.concurrent.Await;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
index 1c2d2a3ecaf..19de2102a35 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
@@ -100,9 +100,12 @@ public AbstractKeyedStateBackend(
                ExecutionConfig executionConfig,
                TtlTimeProvider ttlTimeProvider) {
 
+               Preconditions.checkArgument(numberOfKeyGroups >= 1, 
"NumberOfKeyGroups must be a positive number");
+               Preconditions.checkArgument(numberOfKeyGroups >= 
keyGroupRange.getNumberOfKeyGroups(), "The total number of key groups must be 
at least the number in the key group range assigned to this backend");
+
                this.kvStateRegistry = kvStateRegistry;
                this.keySerializer = Preconditions.checkNotNull(keySerializer);
-               this.numberOfKeyGroups = 
Preconditions.checkNotNull(numberOfKeyGroups);
+               this.numberOfKeyGroups = numberOfKeyGroups;
                this.userCodeClassLoader = 
Preconditions.checkNotNull(userCodeClassLoader);
                this.keyGroupRange = Preconditions.checkNotNull(keyGroupRange);
                this.cancelStreamRegistry = new CloseableRegistry();
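
The replaced checkNotNull on a primitive was effectively a no-op; the new guards fail fast on an invalid key-group setup. A minimal illustration of the two checks in isolation (the helper and its parameter names are only for this sketch):

    import org.apache.flink.util.Preconditions;

    public class KeyGroupPreconditionsExample {
        // Mirrors the two new argument checks: at least one key group overall, and at least
        // as many as the key-group range assigned to this backend.
        static void validate(int numberOfKeyGroups, int keyGroupsInAssignedRange) {
            Preconditions.checkArgument(numberOfKeyGroups >= 1,
                "NumberOfKeyGroups must be a positive number");
            Preconditions.checkArgument(numberOfKeyGroups >= keyGroupsInAssignedRange,
                "The total number of key groups must be at least the number in the key group range assigned to this backend");
        }
    }
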
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ConjunctFutureTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ConjunctFutureTest.java
index f92504ef849..9f33866bf10 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ConjunctFutureTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ConjunctFutureTest.java
@@ -193,7 +193,7 @@ public void testConjunctFutureFailureOnSuccessive() throws 
Exception {
        }
 
        /**
-        * Tests that the conjunct future returns upon completion the 
collection of all future values
+        * Tests that the conjunct future returns upon completion the 
collection of all future values.
         */
        @Test
        public void testConjunctFutureValue() throws ExecutionException, 
InterruptedException {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
index 1639c919569..c386952c056 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
@@ -535,7 +535,7 @@ public void testCompleteAllExceptional() throws Exception {
                        final FlinkException suppressedException;
 
                        if (actual.equals(testException1)) {
-                                suppressedException = testException2;
+                               suppressedException = testException2;
                        } else {
                                suppressedException = testException1;
                        }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolDestroyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolDestroyTest.java
index fcbf9d553f4..5e8e42eee77 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolDestroyTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolDestroyTest.java
@@ -26,6 +26,9 @@
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
+/**
+ * Tests for the destruction of a {@link LocalBufferPool}.
+ */
 public class LocalBufferPoolDestroyTest {
 
        /**
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
index 7a6fe6a3c65..537d167908f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
@@ -50,17 +50,20 @@
 import static org.mockito.Mockito.verify;
 import static org.powermock.api.mockito.PowerMockito.spy;
 
+/**
+ * Tests for the {@link LocalBufferPool}.
+ */
 public class LocalBufferPoolTest extends TestLogger {
 
-       private final static int numBuffers = 1024;
+       private static final int numBuffers = 1024;
 
-       private final static int memorySegmentSize = 128;
+       private static final int memorySegmentSize = 128;
 
        private NetworkBufferPool networkBufferPool;
 
        private BufferPool localBufferPool;
 
-       private final static ExecutorService executor = 
Executors.newCachedThreadPool();
+       private static final ExecutorService executor = 
Executors.newCachedThreadPool();
 
        @Before
        public void setupLocalBufferPool() {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/types/LargeObjectType.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/types/LargeObjectType.java
index 8efd2bb7021..dfea9376f02 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/types/LargeObjectType.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/types/LargeObjectType.java
@@ -16,20 +16,22 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.io.network.serialization.types;
 
-import java.io.IOException;
-import java.util.Random;
-
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.testutils.serialization.types.SerializationTestType;
 
+import java.io.IOException;
+import java.util.Random;
+
+/**
+ * A large {@link SerializationTestType}.
+ */
 public class LargeObjectType implements SerializationTestType {
 
        private static final int MIN_LEN = 12 * 1000 * 1000;
-       
+
        private static final int MAX_LEN = 40 * 1000 * 1000;
 
        private int len;
@@ -68,13 +70,13 @@ public void write(DataOutputView out) throws IOException {
        public void read(DataInputView in) throws IOException {
                final int len = in.readInt();
                this.len = len;
-               
+
                for (int i = 0; i < len / 8; i++) {
                        if (in.readLong() != i) {
                                throw new IOException("corrupt serialization");
                        }
                }
-               
+
                for (int i = 0; i < len % 8; i++) {
                        if (in.readByte() != i) {
                                throw new IOException("corrupt serialization");
@@ -91,7 +93,7 @@ public int hashCode() {
        public boolean equals(Object obj) {
                return (obj instanceof LargeObjectType) && ((LargeObjectType) 
obj).len == this.len;
        }
-       
+
        @Override
        public String toString() {
                return "Large Object " + len;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index 66ca769165a..adc401b96af 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -21,13 +21,17 @@
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.io.TextInputFormat;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.core.io.InputSplitAssigner;
 import org.apache.flink.core.io.InputSplitSource;
@@ -50,10 +54,12 @@
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.TaskInformation;
 import 
org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
@@ -64,9 +70,11 @@
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.InputFormatVertex;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import 
org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
@@ -76,6 +84,8 @@
 import org.apache.flink.runtime.jobmaster.slotpool.DefaultSlotPoolFactory;
 import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
 import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.operators.DataSourceTask;
+import org.apache.flink.runtime.operators.util.TaskConfig;
 import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
 import org.apache.flink.runtime.resourcemanager.SlotRequest;
@@ -92,6 +102,7 @@
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.TestLogger;
@@ -101,6 +112,7 @@
 import org.hamcrest.Matchers;
 import org.junit.After;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
@@ -108,6 +120,7 @@
 import org.junit.rules.TemporaryFolder;
 
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
 import java.io.File;
 import java.io.FileOutputStream;
@@ -122,24 +135,29 @@
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.BiConsumer;
 
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.nullValue;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 /**
  * Tests for {@link JobMaster}.
  */
 public class JobMasterTest extends TestLogger {
 
-       static final TestingInputSplit[] EMPTY_TESTING_INPUT_SPLITS = new 
TestingInputSplit[0];
+       private static final TestingInputSplit[] EMPTY_TESTING_INPUT_SPLITS = 
new TestingInputSplit[0];
 
        @ClassRule
        public static TemporaryFolder temporaryFolder = new TemporaryFolder();
@@ -418,7 +436,7 @@ public void testAutomaticRestartingWhenCheckpointing() 
throws Exception {
        }
 
        /**
-        * Tests that an existing checkpoint will have precedence over an 
savepoint
+        * Tests that an existing checkpoint will have precedence over an 
savepoint.
         */
        @Test
        public void testCheckpointPrecedesSavepointRecovery() throws Exception {
@@ -470,7 +488,7 @@ public void testCheckpointPrecedesSavepointRecovery() 
throws Exception {
 
        /**
         * Tests that the JobMaster retries the scheduling of a job
-        * in case of a missing slot offering from a registered TaskExecutor
+        * in case of a missing slot offering from a registered TaskExecutor.
         */
        @Test
        public void testSlotRequestTimeoutWhenNoSlotOffering() throws Exception 
{
@@ -698,6 +716,66 @@ public void 
testResourceManagerConnectionAfterRegainingLeadership() throws Excep
                }
        }
 
+       private JobGraph createDataSourceJobGraph() {
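+               // Build a producer/consumer graph whose producer is a DataSourceTask reading a TextInputFormat,
+               // so that the JobMaster has input splits to hand out.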
+               final TextInputFormat inputFormat = new TextInputFormat(new 
Path("."));
+               final InputFormatVertex producer = new 
InputFormatVertex("Producer");
+               new TaskConfig(producer.getConfiguration()).setStubWrapper(new 
UserCodeObjectWrapper<InputFormat<?, ?>>(inputFormat));
+               producer.setInvokableClass(DataSourceTask.class);
+
+               final JobVertex consumer = new JobVertex("Consumer");
+               consumer.setInvokableClass(NoOpInvokable.class);
+               consumer.connectNewDataSetAsInput(producer, 
DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);
+
+               final JobGraph jobGraph = new JobGraph(producer, consumer);
+               jobGraph.setAllowQueuedScheduling(true);
+
+               return jobGraph;
+       }
+
+       /**
+        * Tests that {@link JobMaster#requestNextInputSplit(JobVertexID, ExecutionAttemptID)}
+        * returns the same input split again when it is requested by a new execution attempt
+        * after a failover, and only afterwards hands out the next split.
+        */
+       @Test
+       public void testRequestNextInputSplitWithDataSourceFailover() throws 
Exception {
+
+               final JobGraph dataSourceJobGraph = createDataSourceJobGraph();
+               testJobMasterAPIWithMockExecution(dataSourceJobGraph, (tdd, jobMaster) -> {
+                       try {
+                               final JobMasterGateway gateway = 
jobMaster.getSelfGateway(JobMasterGateway.class);
+
+                               final TaskInformation taskInformation = 
tdd.getSerializedTaskInformation()
+                                       
.deserializeValue(getClass().getClassLoader());
+                               JobVertexID vertexID = 
taskInformation.getJobVertexId();
+
+                               // request the first input split for the running attempt
+                               SerializedInputSplit split1 = 
gateway.requestNextInputSplit(vertexID, tdd.getExecutionAttemptId()).get();
+
+                               // simulate a failover: finish the current attempt and reset the vertex for a new execution
+                               ExecutionGraph executionGraph = 
jobMaster.getExecutionGraph();
+                               Execution execution = 
executionGraph.getRegisteredExecutions().get(tdd.getExecutionAttemptId());
+                               ExecutionVertex executionVertex = 
execution.getVertex();
+
+                               long version = execution.getGlobalModVersion();
+                               gateway.updateTaskExecutionState(new 
TaskExecutionState(dataSourceJobGraph.getJobID(), tdd.getExecutionAttemptId(), 
ExecutionState.FINISHED)).get();
+                               Execution newExecution = 
executionVertex.resetForNewExecution(System.currentTimeMillis(), version);
+
+                               // request the split again from the new execution attempt
+                               SerializedInputSplit split2 = 
gateway.requestNextInputSplit(vertexID, newExecution.getAttemptId()).get();
+
+                               
Assert.assertArrayEquals(split1.getInputSplitData(), 
split2.getInputSplitData());
+
+                               // request one more split; it must differ from the replayed one
+                               SerializedInputSplit split3 = 
gateway.requestNextInputSplit(vertexID, newExecution.getAttemptId()).get();
+
+                               
Assert.assertNotEquals(split1.getInputSplitData().length, 
split3.getInputSplitData().length);
+                       } catch (Exception e) {
+                               Assert.fail(e.toString());
+                       }
+               });
+       }
+
        @Test
        public void testRequestNextInputSplit() throws Exception {
                final List<TestingInputSplit> expectedInputSplits = 
Arrays.asList(
@@ -844,16 +922,10 @@ public int hashCode() {
                }
        }
 
-       /**
-        * Tests the {@link 
JobMaster#requestPartitionState(IntermediateDataSetID, ResultPartitionID)}
-        * call for a finished result partition.
-        */
-       @Test
-       public void testRequestPartitionState() throws Exception {
-               final JobGraph producerConsumerJobGraph = 
producerConsumerJobGraph();
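+       /**
+        * Deploys the given {@link JobGraph} on a JobMaster with a mocked TaskExecutor,
+        * waits for the first task deployment and passes the resulting
+        * {@link TaskDeploymentDescriptor} together with the JobMaster to the given consumer.
+        */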
+       private void testJobMasterAPIWithMockExecution(JobGraph graph, BiConsumer<TaskDeploymentDescriptor, JobMaster> consumer) throws Exception {
                final JobMaster jobMaster = createJobMaster(
                        configuration,
-                       producerConsumerJobGraph,
+                       graph,
                        haServices,
                        new TestingJobManagerSharedServicesBuilder().build(),
                        heartbeatServices);
@@ -874,9 +946,9 @@ public void testRequestPartitionState() throws Exception {
                        final CompletableFuture<TaskDeploymentDescriptor> 
tddFuture = new CompletableFuture<>();
                        final TestingTaskExecutorGateway 
testingTaskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
                                
.setSubmitTaskConsumer((taskDeploymentDescriptor, jobMasterId) -> {
-                                         
tddFuture.complete(taskDeploymentDescriptor);
-                                         return 
CompletableFuture.completedFuture(Acknowledge.get());
-                                 })
+                                       
tddFuture.complete(taskDeploymentDescriptor);
+                                       return 
CompletableFuture.completedFuture(Acknowledge.get());
+                               })
                                .createTestingTaskExecutorGateway();
                        
rpcService.registerGateway(testingTaskExecutorGateway.getAddress(), 
testingTaskExecutorGateway);
 
@@ -898,20 +970,90 @@ public void testRequestPartitionState() throws Exception {
 
                        // obtain tdd for the result partition ids
                        final TaskDeploymentDescriptor tdd = tddFuture.get();
+                       consumer.accept(tdd, jobMaster);
+
+               } finally {
+                       RpcUtils.terminateRpcEndpoint(jobMaster, 
testingTimeout);
+               }
+       }
 
-                       assertThat(tdd.getProducedPartitions(), hasSize(1));
-                       final ResultPartitionDeploymentDescriptor partition = 
tdd.getProducedPartitions().iterator().next();
+       /**
+        * Tests the {@link 
JobMaster#requestPartitionState(IntermediateDataSetID, ResultPartitionID)}
+        * call for a finished result partition.
+        */
+       @Test
+       public void testRequestPartitionState() throws Exception {
+               final JobGraph producerConsumerJobGraph = producerConsumerJobGraph();
+               testJobMasterAPIWithMockExecution(producerConsumerJobGraph, (tdd, jobMaster) -> {
+                       try {
+                               final JobMasterGateway jobMasterGateway = 
jobMaster.getSelfGateway(JobMasterGateway.class);
+                               assertThat(tdd.getProducedPartitions(), 
hasSize(1));
+                               final ResultPartitionDeploymentDescriptor 
partition = tdd.getProducedPartitions().iterator().next();
 
-                       final ExecutionAttemptID executionAttemptId = 
tdd.getExecutionAttemptId();
-                       final ExecutionAttemptID copiedExecutionAttemptId = new 
ExecutionAttemptID(executionAttemptId.getLowerPart(), 
executionAttemptId.getUpperPart());
+                               final ExecutionAttemptID executionAttemptId = 
tdd.getExecutionAttemptId();
+                               final ExecutionAttemptID 
copiedExecutionAttemptId = new 
ExecutionAttemptID(executionAttemptId.getLowerPart(), 
executionAttemptId.getUpperPart());
 
-                       // finish the producer task
-                       jobMasterGateway.updateTaskExecutionState(new 
TaskExecutionState(producerConsumerJobGraph.getJobID(), executionAttemptId, 
ExecutionState.FINISHED)).get();
+                               // finish the producer task
+                               jobMasterGateway.updateTaskExecutionState(new 
TaskExecutionState(producerConsumerJobGraph.getJobID(), executionAttemptId, 
ExecutionState.FINISHED)).get();
 
-                       // request the state of the result partition of the 
producer
-                       final CompletableFuture<ExecutionState> 
partitionStateFuture = 
jobMasterGateway.requestPartitionState(partition.getResultId(), new 
ResultPartitionID(partition.getPartitionId(), copiedExecutionAttemptId));
+                               // request the state of the result partition of 
the producer
+                               final CompletableFuture<ExecutionState> 
partitionStateFuture = 
jobMasterGateway.requestPartitionState(partition.getResultId(), new 
ResultPartitionID(partition.getPartitionId(), copiedExecutionAttemptId));
+
+                               assertThat(partitionStateFuture.get(), 
equalTo(ExecutionState.FINISHED));
+                       } catch (Exception e) {
+                               Assert.fail(e.toString());
+                       }
+               });
+       }
+
+       /**
+        * Tests that the timeout in {@link 
JobMasterGateway#triggerSavepoint(String, boolean, Time)}
+        * is respected.
+        */
+       @Test
+       public void testTriggerSavepointTimeout() throws Exception {
+               final JobMaster jobMaster = new JobMaster(
+                       rpcService,
+                       JobMasterConfiguration.fromConfiguration(configuration),
+                       jmResourceId,
+                       jobGraph,
+                       haServices,
+                       DefaultSlotPoolFactory.fromConfiguration(configuration, 
rpcService),
+                       new TestingJobManagerSharedServicesBuilder().build(),
+                       heartbeatServices,
+                       blobServer,
+                       UnregisteredJobManagerJobMetricGroupFactory.INSTANCE,
+                       new NoOpOnCompletionActions(),
+                       testingFatalErrorHandler,
+                       JobMasterTest.class.getClassLoader()) {
+
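+                       // Return a future that never completes, so only the caller-side RPC timeout can terminate the call.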
+                       @Override
+                       public CompletableFuture<String> triggerSavepoint(
+                                       @Nullable final String targetDirectory,
+                                       final boolean cancelJob,
+                                       final Time timeout) {
+                               return new CompletableFuture<>();
+                       }
+               };
+
+               try {
+                       final CompletableFuture<Acknowledge> startFuture = 
jobMaster.start(jobMasterId, testingTimeout);
+                       startFuture.get(testingTimeout.toMilliseconds(), 
TimeUnit.MILLISECONDS);
+
+                       final JobMasterGateway jobMasterGateway = 
jobMaster.getSelfGateway(JobMasterGateway.class);
+                       final CompletableFuture<String> 
savepointFutureLowTimeout = jobMasterGateway.triggerSavepoint("/tmp", false, 
Time.milliseconds(1));
+                       final CompletableFuture<String> 
savepointFutureHighTimeout = jobMasterGateway.triggerSavepoint("/tmp", false, 
RpcUtils.INF_TIMEOUT);
+
+                       try {
+                               
savepointFutureLowTimeout.get(testingTimeout.getSize(), 
testingTimeout.getUnit());
+                               fail();
+                       } catch (final ExecutionException e) {
+                               final Throwable cause = 
ExceptionUtils.stripExecutionException(e);
+                               assertThat(cause, 
instanceOf(TimeoutException.class));
+                       }
 
-                       assertThat(partitionStateFuture.get(), 
equalTo(ExecutionState.FINISHED));
+                       assertThat(savepointFutureHighTimeout.isDone(), 
is(equalTo(false)));
                } finally {
                        RpcUtils.terminateRpcEndpoint(jobMaster, 
testingTimeout);
                }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index 68858bc0cea..0c9dd49c1d1 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -45,7 +45,6 @@
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.TaskStateManager;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
-import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
 import org.apache.flink.types.Record;
 import org.apache.flink.util.MutableObjectIterator;
 import org.apache.flink.util.Preconditions;
@@ -91,6 +90,8 @@
 
        private final JobVertexID jobVertexID;
 
+       private final TaskManagerRuntimeInfo taskManagerRuntimeInfo;
+
        private final BroadcastVariableManager bcVarManager = new 
BroadcastVariableManager();
 
        private final AccumulatorRegistry accumulatorRegistry;
@@ -127,7 +128,8 @@ protected MockEnvironment(
                int parallelism,
                int subtaskIndex,
                ClassLoader userCodeClassLoader,
-               TaskMetricGroup taskMetricGroup) {
+               TaskMetricGroup taskMetricGroup,
+               TaskManagerRuntimeInfo taskManagerRuntimeInfo) {
 
                this.jobID = jobID;
                this.jobVertexID = jobVertexID;
@@ -140,6 +142,7 @@ protected MockEnvironment(
 
                this.memManager = new MemoryManager(memorySize, 1);
                this.ioManager = new IOManagerAsync();
+               this.taskManagerRuntimeInfo = taskManagerRuntimeInfo;
 
                this.executionConfig = executionConfig;
                this.inputSplitProvider = inputSplitProvider;
@@ -212,7 +215,7 @@ public Configuration getJobConfiguration() {
 
        @Override
        public TaskManagerRuntimeInfo getTaskManagerInfo() {
-               return new TestingTaskManagerRuntimeInfo();
+               return this.taskManagerRuntimeInfo;
        }
 
        @Override
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironmentBuilder.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironmentBuilder.java
index dfcc5f312e0..34a6ec492dc 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironmentBuilder.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironmentBuilder.java
@@ -27,6 +27,8 @@
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.state.TaskStateManager;
 import org.apache.flink.runtime.state.TestTaskStateManager;
+import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
+import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
 
 public class MockEnvironmentBuilder {
        private String taskName = "mock-task";
@@ -43,6 +45,7 @@
        private JobID jobID = new JobID();
        private JobVertexID jobVertexID = new JobVertexID();
        private TaskMetricGroup taskMetricGroup = 
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();
+       private TaskManagerRuntimeInfo taskManagerRuntimeInfo = new 
TestingTaskManagerRuntimeInfo();
 
        public MockEnvironmentBuilder setTaskName(String taskName) {
                this.taskName = taskName;
@@ -79,6 +82,11 @@ public MockEnvironmentBuilder 
setExecutionConfig(ExecutionConfig executionConfig
                return this;
        }
 
+       public MockEnvironmentBuilder 
setTaskManagerRuntimeInfo(TaskManagerRuntimeInfo taskManagerRuntimeInfo){
+               this.taskManagerRuntimeInfo = taskManagerRuntimeInfo;
+               return this;
+       }
+
        public MockEnvironmentBuilder setMaxParallelism(int maxParallelism) {
                this.maxParallelism = maxParallelism;
                return this;
@@ -129,6 +137,7 @@ public MockEnvironment build() {
                        parallelism,
                        subtaskIndex,
                        userCodeClassLoader,
-                       taskMetricGroup);
+                       taskMetricGroup,
+                       taskManagerRuntimeInfo);
        }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 649c6d03a6a..2634268c947 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -117,6 +117,7 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Stream;
 
@@ -2916,6 +2917,52 @@ public void testMapState() throws Exception {
                backend.dispose();
        }
 
+       /**
+        * Verifies that the iterator of {@link MapState} supports arbitrary access patterns; see [FLINK-10267] for details.
+        */
+       @Test
+       public void testMapStateIteratorArbitraryAccess() throws Exception {
+               MapStateDescriptor<Integer, Long> kvId = new 
MapStateDescriptor<>("id", Integer.class, Long.class);
+
+               AbstractKeyedStateBackend<Integer> backend = 
createKeyedBackend(IntSerializer.INSTANCE);
+
+               try {
+                       MapState<Integer, Long> state = 
backend.getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, kvId);
+                       backend.setCurrentKey(1);
+                       int stateSize = 4096;
+                       for (int i = 0; i < stateSize; i++) {
+                               state.put(i, i * 2L);
+                       }
+                       Iterator<Map.Entry<Integer, Long>> iterator = 
state.iterator();
+                       int iteratorCount = 0;
+                       while (iterator.hasNext()) {
+                               Map.Entry<Integer, Long> entry = 
iterator.next();
+                               assertEquals(iteratorCount, (int) 
entry.getKey());
+                               switch (ThreadLocalRandom.current().nextInt(3)) {
+                                       case 0: // remove twice
+                                               iterator.remove();
+                                               try {
+                                                       iterator.remove();
+                                                       fail();
+                                               } catch (IllegalStateException 
e) {
+                                                       // ignore expected 
exception
+                                               }
+                                               break;
+                                       case 1: // hasNext -> remove
+                                               iterator.hasNext();
+                                               iterator.remove();
+                                               break;
+                                       case 2: // nothing to do
+                                               break;
+                               }
+                               iteratorCount++;
+                       }
+                       assertEquals(stateSize, iteratorCount);
+               } finally {
+                       backend.dispose();
+               }
+       }
+
        /**
         * Verify that {@link ValueStateDescriptor} allows {@code null} as 
default.
         */
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
index 5c9f7f9f30c..cb656b53b1b 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
@@ -498,6 +498,7 @@ public UV setValue(UV value) {
                 * have the same prefix, hence we can stop iterating once 
coming across an
                 * entry with a different prefix.
                 */
+               @Nonnull
                private final byte[] keyPrefixBytes;
 
                /**
@@ -508,6 +509,9 @@ public UV setValue(UV value) {
 
                /** A in-memory cache for the entries in the rocksdb. */
                private ArrayList<RocksDBMapEntry> cacheEntries = new 
ArrayList<>();
+
+               /** The entry pointing to the current position, i.e. the entry last returned by {@link #nextEntry()}. */
+               private RocksDBMapEntry currentEntry;
                private int cacheIndex = 0;
 
                private final TypeSerializer<UK> keySerializer;
@@ -537,12 +541,11 @@ public boolean hasNext() {
 
                @Override
                public void remove() {
-                       if (cacheIndex == 0 || cacheIndex > 
cacheEntries.size()) {
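+                       // remove() is only valid right after nextEntry() returned an entry that has not already been removed.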
+                       if (currentEntry == null || currentEntry.deleted) {
                                throw new IllegalStateException("The remove 
operation must be called after a valid next operation.");
                        }
 
-                       RocksDBMapEntry lastEntry = cacheEntries.get(cacheIndex 
- 1);
-                       lastEntry.remove();
+                       currentEntry.remove();
                }
 
                final RocksDBMapEntry nextEntry() {
@@ -556,10 +559,10 @@ final RocksDBMapEntry nextEntry() {
                                return null;
                        }
 
-                       RocksDBMapEntry entry = cacheEntries.get(cacheIndex);
+                       this.currentEntry = cacheEntries.get(cacheIndex);
                        cacheIndex++;
 
-                       return entry;
+                       return currentEntry;
                }
 
                private void loadCache() {
@@ -577,12 +580,11 @@ private void loadCache() {
                        try (RocksIteratorWrapper iterator = 
RocksDBKeyedStateBackend.getRocksIterator(db, columnFamily)) {
 
                                /*
-                                * The iteration starts from the prefix bytes 
at the first loading. The cache then is
-                                * reloaded when the next entry to return is 
the last one in the cache. At that time,
-                                * we will start the iterating from the last 
returned entry.
-                                */
-                               RocksDBMapEntry lastEntry = cacheEntries.size() 
== 0 ? null : cacheEntries.get(cacheEntries.size() - 1);
-                               byte[] startBytes = (lastEntry == null ? 
keyPrefixBytes : lastEntry.rawKeyBytes);
+                                * The iteration starts from the prefix bytes 
at the first loading. After #nextEntry() is called,
+                                * the currentEntry points to the last returned 
entry, and at that time, we will start
+                                * the iterating from currentEntry if reloading 
cache is needed.
+                                */
+                               byte[] startBytes = (currentEntry == null ? 
keyPrefixBytes : currentEntry.rawKeyBytes);
 
                                cacheEntries.clear();
                                cacheIndex = 0;
@@ -590,10 +592,10 @@ private void loadCache() {
                                iterator.seek(startBytes);
 
                                /*
-                                * If the last returned entry is not deleted, 
it will be the first entry in the
-                                * iterating. Skip it to avoid redundant access 
in such cases.
+                                * If the entry pointing to the current 
position is not removed, it will be the first entry in the
+                                * new iterating. Skip it to avoid redundant 
access in such cases.
                                 */
-                               if (lastEntry != null && !lastEntry.deleted) {
+                               if (currentEntry != null && 
!currentEntry.deleted) {
                                        iterator.next();
                                }
 
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResource.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResource.java
index 0407cc7e32a..a415a83b5d8 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResource.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResource.java
@@ -30,6 +30,8 @@
 import org.rocksdb.ReadOptions;
 import org.rocksdb.RocksDB;
 import org.rocksdb.WriteOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nonnull;
 
@@ -43,6 +45,8 @@
  */
 public class RocksDBResource extends ExternalResource {
 
+       private static final Logger LOG = 
LoggerFactory.getLogger(RocksDBResource.class);
+
        /** Factory for {@link DBOptions} and {@link ColumnFamilyOptions}. */
        private final OptionsFactory optionsFactory;
 
@@ -74,11 +78,25 @@ public RocksDBResource() {
                this(new OptionsFactory() {
                        @Override
                        public DBOptions createDBOptions(DBOptions 
currentOptions) {
+                               // Close the passed-in options before replacing them with a new instance.
+                               try {
+                                       currentOptions.close();
+                               } catch (Exception e) {
+                                       LOG.error("Failed to close the previous DBOptions instance.", e);
+                               }
+
                                return 
PredefinedOptions.FLASH_SSD_OPTIMIZED.createDBOptions();
                        }
 
                        @Override
                        public ColumnFamilyOptions 
createColumnOptions(ColumnFamilyOptions currentOptions) {
+                               // Close the passed-in options before replacing them with a new instance.
+                               try {
+                                       currentOptions.close();
+                               } catch (Exception e) {
+                                       LOG.error("Failed to close the previous ColumnFamilyOptions instance.", e);
+                               }
+
                                return 
PredefinedOptions.FLASH_SSD_OPTIMIZED.createColumnOptions();
                        }
                });
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
index 480f981bc75..9c36dab75fd 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
@@ -29,6 +29,7 @@
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.streaming.api.graph.StreamGraph;
 
 import org.slf4j.Logger;
@@ -201,6 +202,8 @@ protected JobExecutionResult executeRemotely(StreamGraph 
streamGraph, List<URL>
                configuration.setString(JobManagerOptions.ADDRESS, host);
                configuration.setInteger(JobManagerOptions.PORT, port);
 
+               configuration.setInteger(RestOptions.PORT, port);
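+               // This makes the REST-based ClusterClient connect to the supplied port as well
+               // (see RemoteStreamExecutionEnvironmentTest#testPortForwarding).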
+
                final ClusterClient<?> client;
                try {
                        if 
(CoreOptions.LEGACY_MODE.equals(configuration.getString(CoreOptions.MODE))) {
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
index 010628f8161..c0216e56e2e 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
@@ -48,7 +48,7 @@ protected StreamContextEnvironment(ContextEnvironment ctx) {
 
        @Override
        public JobExecutionResult execute(String jobName) throws Exception {
-               Preconditions.checkNotNull("Streaming Job name should not be 
null.");
+               Preconditions.checkNotNull(jobName, "Streaming Job name should 
not be null.");
 
                StreamGraph streamGraph = this.getStreamGraph();
                streamGraph.setJobName(jobName);
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ParallelSourceFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ParallelSourceFunction.java
index d3ad0f9220f..591ebccea9f 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ParallelSourceFunction.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ParallelSourceFunction.java
@@ -21,7 +21,7 @@
 
 /**
  * A stream data source that is executed in parallel. Upon execution, the 
runtime will
- * execute as many parallel instances of this function function as configured 
parallelism
+ * execute as many parallel instances of this function as configured 
parallelism
  * of the source.
  *
  * <p>This interface acts only as a marker to tell the system that this source 
may
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/RichParallelSourceFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/RichParallelSourceFunction.java
index 94b85b69aba..46c1443664d 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/RichParallelSourceFunction.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/RichParallelSourceFunction.java
@@ -22,7 +22,7 @@
 
 /**
  * Base class for implementing a parallel data source. Upon execution, the 
runtime will
- * execute as many parallel instances of this function function as configured 
parallelism
+ * execute as many parallel instances of this function as configured 
parallelism
  * of the source.
  *
  * <p>The data source has access to context information (such as the number of 
parallel
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index f52168bd9b9..f3c22080ab7 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -72,6 +72,7 @@
 
 import java.io.Closeable;
 import java.io.Serializable;
+import java.util.Locale;
 
 /**
  * Base class for all stream operators. Operators that contain a user function 
should extend the class
@@ -193,11 +194,33 @@ public void setup(StreamTask<?, ?> containingTask, 
StreamConfig config, Output<S
                                LOG.warn("{} has been set to a value equal or 
below 0: {}. Using default.", MetricOptions.LATENCY_HISTORY_SIZE, historySize);
                                historySize = 
MetricOptions.LATENCY_HISTORY_SIZE.defaultValue();
                        }
+
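+                       // Resolve the configured latency granularity, falling back to OPERATOR when the value is invalid.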
+                       final String configuredGranularity = 
taskManagerConfig.getString(MetricOptions.LATENCY_SOURCE_GRANULARITY);
+                       LatencyStats.Granularity granularity;
+                       try {
+                               granularity = 
LatencyStats.Granularity.valueOf(configuredGranularity.toUpperCase(Locale.ROOT));
+                       } catch (IllegalArgumentException iae) {
+                               granularity = LatencyStats.Granularity.OPERATOR;
+                               LOG.warn(
+                                       "Configured value {} for option {} is invalid. Defaulting to {}.",
+                                       configuredGranularity,
+                                       
MetricOptions.LATENCY_SOURCE_GRANULARITY.key(),
+                                       granularity);
+                       }
                        TaskManagerJobMetricGroup jobMetricGroup = 
this.metrics.parent().parent();
-                       this.latencyStats = new 
LatencyStats(jobMetricGroup.addGroup("latency"), historySize, 
container.getIndexInSubtaskGroup(), getOperatorID());
+                       this.latencyStats = new 
LatencyStats(jobMetricGroup.addGroup("latency"),
+                               historySize,
+                               container.getIndexInSubtaskGroup(),
+                               getOperatorID(),
+                               granularity);
                } catch (Exception e) {
                        LOG.warn("An error occurred while instantiating latency 
metrics.", e);
-                       this.latencyStats = new 
LatencyStats(UnregisteredMetricGroups.createUnregisteredTaskManagerJobMetricGroup().addGroup("latency"),
 1, 0, new OperatorID());
+                       this.latencyStats = new LatencyStats(
+                               
UnregisteredMetricGroups.createUnregisteredTaskManagerJobMetricGroup().addGroup("latency"),
+                               1,
+                               0,
+                               new OperatorID(),
+                               LatencyStats.Granularity.SINGLE);
                }
 
                this.runtimeContext = new StreamingRuntimeContext(this, 
environment, container.getAccumulatorMap());
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
index 5600d8f13cc..63dd3e4d427 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
@@ -18,6 +18,8 @@
 package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
@@ -62,12 +64,17 @@ public void run(final Object lockingObject,
 
                final TimeCharacteristic timeCharacteristic = 
getOperatorConfig().getTimeCharacteristic();
 
-               LatencyMarksEmitter latencyEmitter = null;
-               if (getExecutionConfig().isLatencyTrackingEnabled()) {
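+               // Use the interval from the ExecutionConfig if it was explicitly configured;
+               // otherwise fall back to MetricOptions.LATENCY_INTERVAL from the TaskManager configuration.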
+               final Configuration configuration = 
this.getContainingTask().getEnvironment().getTaskManagerInfo().getConfiguration();
+               final long latencyTrackingInterval = 
getExecutionConfig().isLatencyTrackingConfigured()
+                       ? getExecutionConfig().getLatencyTrackingInterval()
+                       : configuration.getLong(MetricOptions.LATENCY_INTERVAL);
+
+               LatencyMarksEmitter<OUT> latencyEmitter = null;
+               if (latencyTrackingInterval > 0) {
                        latencyEmitter = new LatencyMarksEmitter<>(
                                getProcessingTimeService(),
                                collector,
-                               
getExecutionConfig().getLatencyTrackingInterval(),
+                               latencyTrackingInterval,
                                this.getOperatorID(),
                                getRuntimeContext().getIndexOfThisSubtask());
                }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/LatencyStats.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/LatencyStats.java
index 4f3d33ec6f9..926753dc78f 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/LatencyStats.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/LatencyStats.java
@@ -34,23 +34,29 @@
        private final int historySize;
        private final int subtaskIndex;
        private final OperatorID operatorId;
+       private final Granularity granularity;
 
-       public LatencyStats(MetricGroup metricGroup, int historySize, int 
subtaskIndex, OperatorID operatorID) {
+       public LatencyStats(
+                       MetricGroup metricGroup,
+                       int historySize,
+                       int subtaskIndex,
+                       OperatorID operatorID,
+                       Granularity granularity) {
                this.metricGroup = metricGroup;
                this.historySize = historySize;
                this.subtaskIndex = subtaskIndex;
                this.operatorId = operatorID;
+               this.granularity = granularity;
        }
 
        public void reportLatency(LatencyMarker marker) {
-               String uniqueName =  "" + marker.getOperatorId() + 
marker.getSubtaskIndex() + operatorId + subtaskIndex;
+               final String uniqueName = 
granularity.createUniqueHistogramName(marker, operatorId, subtaskIndex);
+
                DescriptiveStatisticsHistogram latencyHistogram = 
this.latencyStats.get(uniqueName);
                if (latencyHistogram == null) {
                        latencyHistogram = new 
DescriptiveStatisticsHistogram(this.historySize);
                        this.latencyStats.put(uniqueName, latencyHistogram);
-                       this.metricGroup
-                               .addGroup("source_id", 
String.valueOf(marker.getOperatorId()))
-                               .addGroup("source_subtask_index", 
String.valueOf(marker.getSubtaskIndex()))
+                       granularity.createSourceMetricGroups(metricGroup, 
marker, operatorId, subtaskIndex)
                                .addGroup("operator_id", 
String.valueOf(operatorId))
                                .addGroup("operator_subtask_index", 
String.valueOf(subtaskIndex))
                                .histogram("latency", latencyHistogram);
@@ -59,4 +65,62 @@ public void reportLatency(LatencyMarker marker) {
                long now = System.currentTimeMillis();
                latencyHistogram.update(now - marker.getMarkedTime());
        }
+
+       /**
+        * Granularity for latency metrics.
+        */
+       public enum Granularity {
+               SINGLE {
+                       @Override
+                       String createUniqueHistogramName(LatencyMarker marker, 
OperatorID operatorId, int operatorSubtaskIndex) {
+                               return String.valueOf(operatorId) + 
operatorSubtaskIndex;
+                       }
+
+                       @Override
+                       MetricGroup createSourceMetricGroups(
+                                       MetricGroup base,
+                                       LatencyMarker marker,
+                                       OperatorID operatorId,
+                                       int operatorSubtaskIndex) {
+                               return base;
+                       }
+               },
+               OPERATOR {
+                       @Override
+                       String createUniqueHistogramName(LatencyMarker marker, 
OperatorID operatorId, int operatorSubtaskIndex) {
+                               return String.valueOf(marker.getOperatorId()) + 
operatorId + operatorSubtaskIndex;
+                       }
+
+                       @Override
+                       MetricGroup createSourceMetricGroups(
+                                       MetricGroup base,
+                                       LatencyMarker marker,
+                                       OperatorID operatorId,
+                                       int operatorSubtaskIndex) {
+                               return base
+                                       .addGroup("source_id", 
String.valueOf(marker.getOperatorId()));
+                       }
+               },
+               SUBTASK {
+                       @Override
+                       String createUniqueHistogramName(LatencyMarker marker, 
OperatorID operatorId, int operatorSubtaskIndex) {
+                               return String.valueOf(marker.getOperatorId()) + 
marker.getSubtaskIndex() + operatorId + operatorSubtaskIndex;
+                       }
+
+                       @Override
+                       MetricGroup createSourceMetricGroups(
+                                       MetricGroup base,
+                                       LatencyMarker marker,
+                                       OperatorID operatorId,
+                                       int operatorSubtaskIndex) {
+                               return base
+                                       .addGroup("source_id", 
String.valueOf(marker.getOperatorId()))
+                                       .addGroup("source_subtask_index", 
String.valueOf(marker.getSubtaskIndex()));
+                       }
+               };
+
+               abstract String createUniqueHistogramName(LatencyMarker marker, 
OperatorID operatorId, int operatorSubtaskIndex);
+
+               abstract MetricGroup createSourceMetricGroups(MetricGroup base, 
LatencyMarker marker, OperatorID operatorId, int operatorSubtaskIndex);
+       }
 }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/RemoteStreamExecutionEnvironmentTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/RemoteStreamExecutionEnvironmentTest.java
new file mode 100644
index 00000000000..60ee66f4246
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/RemoteStreamExecutionEnvironmentTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.environment;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.Iterator;
+
+/**
+ * Tests for the {@link RemoteStreamEnvironment}.
+ */
+public class RemoteStreamExecutionEnvironmentTest extends TestLogger {
+
+       private static MiniCluster flink;
+
+       @BeforeClass
+       public static void setUp() throws Exception {
+               final Configuration config = new Configuration();
+               config.setInteger(RestOptions.PORT, 0);
+
+               final MiniClusterConfiguration miniClusterConfiguration = new 
MiniClusterConfiguration.Builder()
+                       .setConfiguration(config)
+                       .setNumTaskManagers(1)
+                       .setNumSlotsPerTaskManager(1)
+                       .build();
+
+               flink = new MiniCluster(miniClusterConfiguration);
+
+               flink.start();
+       }
+
+       @AfterClass
+       public static void tearDown() throws Exception {
+               if (flink != null) {
+                       flink.close();
+               }
+       }
+
+       /**
+        * Verifies that the port passed to the RemoteStreamEnvironment is used 
for connecting to the cluster.
+        */
+       @Test
+       public void testPortForwarding() throws Exception {
+               final Configuration clientConfiguration = new Configuration();
+               clientConfiguration.setInteger(RestOptions.RETRY_MAX_ATTEMPTS, 
0);
+
+               final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createRemoteEnvironment(
+                       flink.getRestAddress().getHost(),
+                       flink.getRestAddress().getPort(),
+                       clientConfiguration);
+
+               final DataStream<Integer> resultStream = env.fromElements(1)
+                       .map(x -> x * 2);
+
+               final Iterator<Integer> result = 
DataStreamUtils.collect(resultStream);
+               Assert.assertTrue(result.hasNext());
+               Assert.assertEquals(2, result.next().intValue());
+               Assert.assertFalse(result.hasNext());
+       }
+}
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorLatencyMetricsTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorLatencyMetricsTest.java
new file mode 100644
index 00000000000..14d51474b57
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorLatencyMetricsTest.java
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.StreamSource;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
+import org.apache.flink.streaming.util.CollectorOutput;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the emission of latency markers by {@link StreamSource} operators.
+ */
+public class StreamSourceOperatorLatencyMetricsTest extends TestLogger {
+
+       private static final long maxProcessingTime = 100L;
+       private static final long latencyMarkInterval = 10L;
+
+       /**
+        * Verifies that by default no latency metrics are emitted.
+        */
+       @Test
+       public void testLatencyMarkEmissionDisabled() throws Exception {
+               testLatencyMarkEmission(0, (operator, timeProvider) -> {
+                       setupSourceOperator(operator, new ExecutionConfig(), 
MockEnvironment.builder().build(), timeProvider);
+               });
+       }
+
+       /**
+        * Verifies that latency metrics can be enabled via the {@link 
ExecutionConfig}.
+        */
+       @Test
+       public void testLatencyMarkEmissionEnabledViaExecutionConfig() throws 
Exception {
+               testLatencyMarkEmission((int) (maxProcessingTime / 
latencyMarkInterval) + 1, (operator, timeProvider) -> {
+                       ExecutionConfig executionConfig = new ExecutionConfig();
+                       
executionConfig.setLatencyTrackingInterval(latencyMarkInterval);
+
+                       setupSourceOperator(operator, executionConfig, 
MockEnvironment.builder().build(), timeProvider);
+               });
+       }
+
+       /**
+        * Verifies that latency metrics can be enabled via the configuration.
+        */
+       @Test
+       public void testLatencyMarkEmissionEnabledViaFlinkConfig() throws 
Exception {
+               testLatencyMarkEmission((int) (maxProcessingTime / 
latencyMarkInterval) + 1, (operator, timeProvider) -> {
+                       Configuration tmConfig = new Configuration();
+                       tmConfig.setLong(MetricOptions.LATENCY_INTERVAL, 
latencyMarkInterval);
+
+                       Environment env = MockEnvironment.builder()
+                               .setTaskManagerRuntimeInfo(new 
TestingTaskManagerRuntimeInfo(tmConfig))
+                               .build();
+
+                       setupSourceOperator(operator, new ExecutionConfig(), 
env, timeProvider);
+               });
+       }
+
+       /**
+        * Verifies that latency metrics can be enabled via the {@link 
ExecutionConfig} even if they are disabled via
+        * the configuration.
+        */
+       @Test
+       public void testLatencyMarkEmissionEnabledOverrideViaExecutionConfig() 
throws Exception {
+               testLatencyMarkEmission((int) (maxProcessingTime / 
latencyMarkInterval) + 1, (operator, timeProvider) -> {
+                       ExecutionConfig executionConfig = new ExecutionConfig();
+                       
executionConfig.setLatencyTrackingInterval(latencyMarkInterval);
+
+                       Configuration tmConfig = new Configuration();
+                       tmConfig.setLong(MetricOptions.LATENCY_INTERVAL, 0L);
+
+                       Environment env = MockEnvironment.builder()
+                               .setTaskManagerRuntimeInfo(new 
TestingTaskManagerRuntimeInfo(tmConfig))
+                               .build();
+
+                       setupSourceOperator(operator, executionConfig, env, 
timeProvider);
+               });
+       }
+
+       /**
+        * Verifies that latency metrics can be disabled via the {@link 
ExecutionConfig} even if they are enabled via
+        * the configuration.
+        */
+       @Test
+       public void testLatencyMarkEmissionDisabledOverrideViaExecutionConfig() 
throws Exception {
+               testLatencyMarkEmission(0, (operator, timeProvider) -> {
+                       Configuration tmConfig = new Configuration();
+                       tmConfig.setLong(MetricOptions.LATENCY_INTERVAL, 
latencyMarkInterval);
+
+                       Environment env = MockEnvironment.builder()
+                               .setTaskManagerRuntimeInfo(new 
TestingTaskManagerRuntimeInfo(tmConfig))
+                               .build();
+
+                       ExecutionConfig executionConfig = new ExecutionConfig();
+                       executionConfig.setLatencyTrackingInterval(0);
+
+                       setupSourceOperator(operator, executionConfig, env, 
timeProvider);
+               });
+       }
+
+       private interface OperatorSetupOperation {
+               void setupSourceOperator(
+                       StreamSource<Long, ?> operator,
+                       TestProcessingTimeService testProcessingTimeService
+               );
+       }
+
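+       /**
+        * Runs a {@link StreamSource} against a {@link TestProcessingTimeService} and asserts that
+        * exactly {@code numberLatencyMarkers} latency markers (plus the final watermark) are emitted.
+        */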
+       private void testLatencyMarkEmission(int numberLatencyMarkers, 
OperatorSetupOperation operatorSetup) throws Exception {
+               final List<StreamElement> output = new ArrayList<>();
+
+               final TestProcessingTimeService testProcessingTimeService = new 
TestProcessingTimeService();
+               testProcessingTimeService.setCurrentTime(0L);
+               final List<Long> processingTimes = Arrays.asList(1L, 10L, 11L, 
21L, maxProcessingTime);
+
+               // regular stream source operator
+               final StreamSource<Long, ProcessingTimeServiceSource> operator =
+                       new StreamSource<>(new 
ProcessingTimeServiceSource(testProcessingTimeService, processingTimes));
+
+               operatorSetup.setupSourceOperator(operator, 
testProcessingTimeService);
+
+               // run and wait to be stopped
+               operator.run(new Object(), mock(StreamStatusMaintainer.class), 
new CollectorOutput<Long>(output));
+
+               assertEquals(
+                       numberLatencyMarkers + 1, // + 1 is the final watermark 
element
+                       output.size());
+
+               long timestamp = 0L;
+
+               int i = 0;
+               // verify that it's only latency markers plus a final watermark
+               for (; i < numberLatencyMarkers; i++) {
+                       StreamElement se = output.get(i);
+                       Assert.assertTrue(se.isLatencyMarker());
+                       Assert.assertEquals(operator.getOperatorID(), 
se.asLatencyMarker().getOperatorId());
+                       Assert.assertEquals(0, 
se.asLatencyMarker().getSubtaskIndex());
+                       Assert.assertTrue(se.asLatencyMarker().getMarkedTime() 
== timestamp);
+
+                       timestamp += latencyMarkInterval;
+               }
+
+               Assert.assertTrue(output.get(i).isWatermark());
+       }
+
+       // 
------------------------------------------------------------------------
+
+       private static <T> void setupSourceOperator(
+                       StreamSource<T, ?> operator,
+                       ExecutionConfig executionConfig,
+                       Environment env,
+                       ProcessingTimeService timeProvider) {
+
+               StreamConfig cfg = new StreamConfig(new Configuration());
+               cfg.setStateBackend(new MemoryStateBackend());
+
+               cfg.setTimeCharacteristic(TimeCharacteristic.EventTime);
+               cfg.setOperatorID(new OperatorID());
+
+               StreamStatusMaintainer streamStatusMaintainer = 
mock(StreamStatusMaintainer.class);
+               
when(streamStatusMaintainer.getStreamStatus()).thenReturn(StreamStatus.ACTIVE);
+
+               StreamTask<?, ?> mockTask = mock(StreamTask.class);
+               when(mockTask.getName()).thenReturn("Mock Task");
+               when(mockTask.getCheckpointLock()).thenReturn(new Object());
+               when(mockTask.getConfiguration()).thenReturn(cfg);
+               when(mockTask.getEnvironment()).thenReturn(env);
+               when(mockTask.getExecutionConfig()).thenReturn(executionConfig);
+               
when(mockTask.getAccumulatorMap()).thenReturn(Collections.<String, 
Accumulator<?, ?>>emptyMap());
+               
when(mockTask.getStreamStatusMaintainer()).thenReturn(streamStatusMaintainer);
+
+               doAnswer(new Answer<ProcessingTimeService>() {
+                       @Override
+                       public ProcessingTimeService answer(InvocationOnMock 
invocation) throws Throwable {
+                               if (timeProvider == null) {
+                                       throw new RuntimeException("The time 
provider is null.");
+                               }
+                               return timeProvider;
+                       }
+               }).when(mockTask).getProcessingTimeService();
+
+               operator.setup(mockTask, cfg, (Output<StreamRecord<T>>) 
mock(Output.class));
+       }
+
+       // 
------------------------------------------------------------------------
+
+       private static final class ProcessingTimeServiceSource implements 
SourceFunction<Long> {
+
+               private final TestProcessingTimeService processingTimeService;
+               private final List<Long> processingTimes;
+
+               private boolean cancelled = false;
+
+               private ProcessingTimeServiceSource(TestProcessingTimeService 
processingTimeService, List<Long> processingTimes) {
+                       this.processingTimeService = processingTimeService;
+                       this.processingTimes = processingTimes;
+               }
+
+               @Override
+               public void run(SourceContext<Long> ctx) throws Exception {
+                       for (Long processingTime : processingTimes) {
+                               if (cancelled) {
+                                       break;
+                               }
+
+                               
processingTimeService.setCurrentTime(processingTime);
+                       }
+               }
+
+               @Override
+               public void cancel() {
+                       cancelled = true;
+               }
+       }
+}
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorWatermarksTest.java
similarity index 75%
rename from 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
rename to 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorWatermarksTest.java
index cf09a6ebdb8..4b5259e91c4 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorWatermarksTest.java
@@ -43,13 +43,11 @@
 import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
 import org.apache.flink.streaming.util.CollectorOutput;
 
-import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
@@ -63,7 +61,7 @@
  * Tests for {@link StreamSource} operators.
  */
 @SuppressWarnings("serial")
-public class StreamSourceOperatorTest {
+public class StreamSourceOperatorWatermarksTest {
 
        @Test
        public void testEmitMaxWatermarkForFiniteSource() throws Exception {
@@ -74,7 +72,7 @@ public void testEmitMaxWatermarkForFiniteSource() throws 
Exception {
 
                final List<StreamElement> output = new ArrayList<>();
 
-               setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 
0);
+               setupSourceOperator(operator, TimeCharacteristic.EventTime, 0);
                operator.run(new Object(), mock(StreamStatusMaintainer.class), 
new CollectorOutput<String>(output));
 
                assertEquals(1, output.size());
@@ -90,7 +88,7 @@ public void testNoMaxWatermarkOnImmediateCancel() throws 
Exception {
                final StreamSource<String, InfiniteSource<String>> operator =
                                new StreamSource<>(new 
InfiniteSource<String>());
 
-               setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 
0);
+               setupSourceOperator(operator, TimeCharacteristic.EventTime, 0);
                operator.cancel();
 
                // run and exit
@@ -109,7 +107,7 @@ public void testNoMaxWatermarkOnAsyncCancel() throws 
Exception {
                final StreamSource<String, InfiniteSource<String>> operator =
                                new StreamSource<>(new 
InfiniteSource<String>());
 
-               setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 
0);
+               setupSourceOperator(operator, TimeCharacteristic.EventTime, 0);
 
                // trigger an async cancel in a bit
                new Thread("canceler") {
@@ -141,7 +139,7 @@ public void testNoMaxWatermarkOnImmediateStop() throws 
Exception {
                final StoppableStreamSource<String, InfiniteSource<String>> 
operator =
                                new StoppableStreamSource<>(new 
InfiniteSource<String>());
 
-               setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 
0);
+               setupSourceOperator(operator, TimeCharacteristic.EventTime, 0);
                operator.stop();
 
                // run and stop
@@ -159,7 +157,7 @@ public void testNoMaxWatermarkOnAsyncStop() throws 
Exception {
                final StoppableStreamSource<String, InfiniteSource<String>> 
operator =
                                new StoppableStreamSource<>(new 
InfiniteSource<String>());
 
-               setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 
0);
+               setupSourceOperator(operator, TimeCharacteristic.EventTime, 0);
 
                // trigger an async cancel in a bit
                new Thread("canceler") {
@@ -178,53 +176,6 @@ public void run() {
                assertTrue(output.isEmpty());
        }
 
-       /**
-        * Test that latency marks are emitted.
-        */
-       @Test
-       public void testLatencyMarkEmission() throws Exception {
-               final List<StreamElement> output = new ArrayList<>();
-
-               final long maxProcessingTime = 100L;
-               final long latencyMarkInterval = 10L;
-
-               final TestProcessingTimeService testProcessingTimeService = new 
TestProcessingTimeService();
-               testProcessingTimeService.setCurrentTime(0L);
-               final List<Long> processingTimes = Arrays.asList(1L, 10L, 11L, 
21L, maxProcessingTime);
-
-               // regular stream source operator
-               final StreamSource<Long, ProcessingTimeServiceSource> operator =
-                               new StreamSource<>(new 
ProcessingTimeServiceSource(testProcessingTimeService, processingTimes));
-
-               // emit latency marks every 10 milliseconds.
-               setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 
latencyMarkInterval, testProcessingTimeService);
-
-               // run and wait to be stopped
-               operator.run(new Object(), mock(StreamStatusMaintainer.class), 
new CollectorOutput<Long>(output));
-
-               int numberLatencyMarkers = (int) (maxProcessingTime / 
latencyMarkInterval) + 1;
-
-               assertEquals(
-                       numberLatencyMarkers + 1, // + 1 is the final watermark 
element
-                       output.size());
-
-               long timestamp = 0L;
-
-               int i = 0;
-               // and that its only latency markers + a final watermark
-               for (; i < output.size() - 1; i++) {
-                       StreamElement se = output.get(i);
-                       Assert.assertTrue(se.isLatencyMarker());
-                       Assert.assertEquals(operator.getOperatorID(), 
se.asLatencyMarker().getOperatorId());
-                       Assert.assertEquals(0, 
se.asLatencyMarker().getSubtaskIndex());
-                       Assert.assertTrue(se.asLatencyMarker().getMarkedTime() 
== timestamp);
-
-                       timestamp += latencyMarkInterval;
-               }
-
-               Assert.assertTrue(output.get(i).isWatermark());
-       }
-
        @Test
        public void testAutomaticWatermarkContext() throws Exception {
 
@@ -236,7 +187,7 @@ public void testAutomaticWatermarkContext() throws 
Exception {
                TestProcessingTimeService processingTimeService = new 
TestProcessingTimeService();
                processingTimeService.setCurrentTime(0);
 
-               setupSourceOperator(operator, TimeCharacteristic.IngestionTime, 
watermarkInterval, 0, processingTimeService);
+               setupSourceOperator(operator, TimeCharacteristic.IngestionTime, 
watermarkInterval, processingTimeService);
 
                final List<StreamElement> output = new ArrayList<>();
 
@@ -271,21 +222,18 @@ public void testAutomaticWatermarkContext() throws 
Exception {
        @SuppressWarnings("unchecked")
        private static <T> void setupSourceOperator(StreamSource<T, ?> operator,
                        TimeCharacteristic timeChar,
-                       long watermarkInterval,
-                       long latencyMarkInterval) {
-               setupSourceOperator(operator, timeChar, watermarkInterval, 
latencyMarkInterval, new TestProcessingTimeService());
+                       long watermarkInterval) {
+               setupSourceOperator(operator, timeChar, watermarkInterval, new 
TestProcessingTimeService());
        }
 
        @SuppressWarnings("unchecked")
        private static <T> void setupSourceOperator(StreamSource<T, ?> operator,
                                                                                
                TimeCharacteristic timeChar,
                                                                                
                long watermarkInterval,
-                                                                               
                long latencyMarkInterval,
                                                                                
                final ProcessingTimeService timeProvider) {
 
                ExecutionConfig executionConfig = new ExecutionConfig();
                executionConfig.setAutoWatermarkInterval(watermarkInterval);
-               executionConfig.setLatencyTrackingInterval(latencyMarkInterval);
 
                StreamConfig cfg = new StreamConfig(new Configuration());
                cfg.setStateBackend(new MemoryStateBackend());
@@ -355,33 +303,4 @@ public void stop() {
                        running = false;
                }
        }
-
-       private static final class ProcessingTimeServiceSource implements 
SourceFunction<Long> {
-
-               private final TestProcessingTimeService processingTimeService;
-               private final List<Long> processingTimes;
-
-               private boolean cancelled = false;
-
-               private ProcessingTimeServiceSource(TestProcessingTimeService 
processingTimeService, List<Long> processingTimes) {
-                       this.processingTimeService = processingTimeService;
-                       this.processingTimes = processingTimes;
-               }
-
-               @Override
-               public void run(SourceContext<Long> ctx) throws Exception {
-                       for (Long processingTime : processingTimes) {
-                               if (cancelled) {
-                                       break;
-                               }
-
-                               
processingTimeService.setCurrentTime(processingTime);
-                       }
-               }
-
-               @Override
-               public void cancel() {
-                       cancelled = true;
-               }
-       }
 }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/LatencyStatsTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/LatencyStatsTest.java
new file mode 100644
index 00000000000..ef14dcbb96b
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/LatencyStatsTest.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.MetricOptions;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
+import org.apache.flink.runtime.metrics.groups.GenericMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.metrics.scope.ScopeFormats;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Consumer;
+
+/**
+ * Tests for the {@link LatencyStats}.
+ */
+public class LatencyStatsTest extends TestLogger {
+
+       private static final OperatorID OPERATOR_ID = new OperatorID();
+       private static final OperatorID SOURCE_ID_1 = new OperatorID();
+       private static final OperatorID SOURCE_ID_2 = new OperatorID();
+
+       private static final int OPERATOR_SUBTASK_INDEX = 64;
+
+       private static final String PARENT_GROUP_NAME = "parent";
+
+       @Test
+       public void testLatencyStatsSingle() {
+               testLatencyStats(LatencyStats.Granularity.SINGLE, registrations 
-> {
+                       Assert.assertEquals(1, registrations.size());
+
+                       {
+                               final Tuple2<String, Histogram> registration = 
registrations.get(0);
+                               assertName(registration.f0);
+                               Assert.assertEquals(5, 
registration.f1.getCount());
+                       }
+               });
+       }
+
+       @Test
+       public void testLatencyStatsOperator() {
+               testLatencyStats(LatencyStats.Granularity.OPERATOR, 
registrations -> {
+                       Assert.assertEquals(2, registrations.size());
+
+                       {
+                               final Tuple2<String, Histogram> registration = 
registrations.get(0);
+                               assertName(registration.f0, SOURCE_ID_1);
+                               Assert.assertEquals(3, 
registration.f1.getCount());
+                       }
+
+                       {
+                               final Tuple2<String, Histogram> registration = 
registrations.get(1);
+                               assertName(registration.f0, SOURCE_ID_2);
+                               Assert.assertEquals(2, 
registration.f1.getCount());
+                       }
+               });
+       }
+
+       @Test
+       public void testLatencyStatsSubtask() {
+               testLatencyStats(LatencyStats.Granularity.SUBTASK, 
registrations -> {
+                       Assert.assertEquals(4, registrations.size());
+
+                       {
+                               final Tuple2<String, Histogram> registration = 
registrations.get(0);
+                               assertName(registration.f0, SOURCE_ID_1, 0);
+                               Assert.assertEquals(2, 
registration.f1.getCount());
+                       }
+
+                       {
+                               final Tuple2<String, Histogram> registration = 
registrations.get(1);
+                               assertName(registration.f0, SOURCE_ID_1, 1);
+                               Assert.assertEquals(1, 
registration.f1.getCount());
+                       }
+
+                       {
+                               final Tuple2<String, Histogram> registration = 
registrations.get(2);
+                               assertName(registration.f0, SOURCE_ID_2, 2);
+                               Assert.assertEquals(1, 
registration.f1.getCount());
+                       }
+
+                       {
+                               final Tuple2<String, Histogram> registration = 
registrations.get(3);
+                               assertName(registration.f0, SOURCE_ID_2, 3);
+                               Assert.assertEquals(1, 
registration.f1.getCount());
+                       }
+               });
+       }
+
+       private static void testLatencyStats(
+               final LatencyStats.Granularity granularity,
+               final Consumer<List<Tuple2<String, Histogram>>> verifier) {
+
+               final AbstractMetricGroup<?> dummyGroup = 
UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup();
+               final TestMetricRegistry registry = new TestMetricRegistry();
+               final MetricGroup parentGroup = new 
GenericMetricGroup(registry, dummyGroup, PARENT_GROUP_NAME);
+
+               final LatencyStats latencyStats = new LatencyStats(
+                       parentGroup,
+                       MetricOptions.LATENCY_HISTORY_SIZE.defaultValue(),
+                       OPERATOR_SUBTASK_INDEX,
+                       OPERATOR_ID,
+                       granularity);
+
+               latencyStats.reportLatency(new LatencyMarker(0L, SOURCE_ID_1, 
0));
+               latencyStats.reportLatency(new LatencyMarker(0L, SOURCE_ID_1, 
0));
+               latencyStats.reportLatency(new LatencyMarker(0L, SOURCE_ID_1, 
1));
+               latencyStats.reportLatency(new LatencyMarker(0L, SOURCE_ID_2, 
2));
+               latencyStats.reportLatency(new LatencyMarker(0L, SOURCE_ID_2, 
3));
+
+               verifier.accept(registry.latencyHistograms);
+       }
+
+       /**
+        * Removes all parts from the metric identifier preceding the 
latency-related parts.
+        */
+       private static String sanitizeName(final String registrationName) {
+               return 
registrationName.substring(registrationName.lastIndexOf(PARENT_GROUP_NAME) + 
PARENT_GROUP_NAME.length() + 1);
+       }
+
+       private static void assertName(final String registrationName) {
+               final String sanitizedName = sanitizeName(registrationName);
+               Assert.assertEquals("operator_id." + OPERATOR_ID +
+                       ".operator_subtask_index." + OPERATOR_SUBTASK_INDEX +
+                       ".latency", sanitizedName);
+       }
+
+       private static void assertName(final String registrationName, final 
OperatorID sourceId) {
+               final String sanitizedName = sanitizeName(registrationName);
+               Assert.assertEquals("source_id." + sourceId +
+                       ".operator_id." + OPERATOR_ID +
+                       ".operator_subtask_index." + OPERATOR_SUBTASK_INDEX +
+                       ".latency", sanitizedName);
+       }
+
+       private static void assertName(final String registrationName, final 
OperatorID sourceId, final int sourceIndex) {
+               final String sanitizedName = sanitizeName(registrationName);
+               Assert.assertEquals("source_id." + sourceId +
+                       ".source_subtask_index." + sourceIndex +
+                       ".operator_id." + OPERATOR_ID +
+                       ".operator_subtask_index." + OPERATOR_SUBTASK_INDEX +
+                       ".latency", sanitizedName);
+       }
+
+       private static class TestMetricRegistry implements MetricRegistry {
+
+               private final List<Tuple2<String, Histogram>> latencyHistograms 
= new ArrayList<>(4);
+
+               @Override
+               public void register(Metric metric, String metricName, 
AbstractMetricGroup group) {
+                       if (metric instanceof Histogram) {
+                               
latencyHistograms.add(Tuple2.of(group.getMetricIdentifier(metricName), 
(Histogram) metric));
+                       }
+               }
+
+               @Override
+               public char getDelimiter() {
+                       return '.';
+               }
+
+               @Override
+               public char getDelimiter(int index) {
+                       return 0;
+               }
+
+               @Override
+               public int getNumberReporters() {
+                       return 0;
+               }
+
+               @Override
+               public void unregister(Metric metric, String metricName, 
AbstractMetricGroup group) {
+
+               }
+
+               @Override
+               public ScopeFormats getScopeFormats() {
+                       return null;
+               }
+
+               @Nullable
+               @Override
+               public String getMetricQueryServicePath() {
+                       return null;
+               }
+       }
+}
diff --git a/tools/maven/suppressions-core.xml 
b/tools/maven/suppressions-core.xml
index db5d16a3671..5c1a914f97c 100644
--- a/tools/maven/suppressions-core.xml
+++ b/tools/maven/suppressions-core.xml
@@ -23,10 +23,6 @@ under the License.
        "http://www.puppycrawl.com/dtds/suppressions_1_1.dtd";>
 
 <suppressions>
-       <suppress
-               files="(.*)api[/\\]java[/\\]functions[/\\](.*)"
-               
checks="NewlineAtEndOfFile|RegexpSingleline|TodoComment|RedundantImport|ImportOrder|RedundantModifier|JavadocMethod|JavadocParagraph|JavadocType|JavadocStyle|PackageName|TypeNameCheck|ConstantNameCheck|StaticVariableNameCheck|MemberNameCheck|MethodNameCheck|ParameterName|LocalFinalVariableName|LocalVariableName|LeftCurly|UpperEll|FallThrough|reliefPattern|SimplifyBooleanExpression|EmptyStatement|ModifierOrder|EmptyLineSeparator|WhitespaceAround|WhitespaceAfter|NoWhitespaceAfter|NoWhitespaceBefore|OperatorWrap|ParenPad"/>
-
        <suppress
                files="(.*)api[/\\]java[/\\]typeutils[/\\]runtime[/\\](.*)"
                
checks="NewlineAtEndOfFile|RegexpSingleline|TodoComment|RedundantImport|ImportOrder|RedundantModifier|JavadocMethod|JavadocParagraph|JavadocType|JavadocStyle|PackageName|TypeNameCheck|ConstantNameCheck|StaticVariableNameCheck|MemberNameCheck|MethodNameCheck|ParameterName|LocalFinalVariableName|LocalVariableName|LeftCurly|UpperEll|FallThrough|reliefPattern|SimplifyBooleanExpression|EmptyStatement|ModifierOrder|EmptyLineSeparator|WhitespaceAround|WhitespaceAfter|NoWhitespaceAfter|NoWhitespaceBefore|OperatorWrap|ParenPad"/>
@@ -76,7 +72,7 @@ under the License.
                checks="AvoidStarImport|ArrayTypeStyle|Regexp"/>
 
        <suppress
-               
files="(.*)api[/\\]common[/\\](distributions|restartstrategy)[/\\](.*)"
+               files="(.*)api[/\\]common[/\\]distributions[/\\](.*)"
                
checks="NewlineAtEndOfFile|RegexpSingleline|TodoComment|RedundantImport|ImportOrder|RedundantModifier|JavadocMethod|JavadocParagraph|JavadocType|JavadocStyle|PackageName|TypeNameCheck|ConstantNameCheck|StaticVariableNameCheck|MemberNameCheck|MethodNameCheck|ParameterName|LocalFinalVariableName|LocalVariableName|LeftCurly|UpperEll|FallThrough|reliefPattern|SimplifyBooleanExpression|EmptyStatement|ModifierOrder|EmptyLineSeparator|WhitespaceAround|WhitespaceAfter|NoWhitespaceAfter|NoWhitespaceBefore|OperatorWrap|ParenPad"/>
 
        <suppress
@@ -91,10 +87,6 @@ under the License.
                files="(.*)test[/\\](.*)core[/\\]io[/\\](.*)"
                checks="AvoidStarImport"/>
 
-       <suppress
-               files="(.*)migration[/\\](.*)"
-               
checks="NewlineAtEndOfFile|RegexpSingleline|TodoComment|RedundantImport|ImportOrder|RedundantModifier|JavadocMethod|JavadocParagraph|JavadocType|JavadocStyle|PackageName|TypeNameCheck|ConstantNameCheck|StaticVariableNameCheck|MemberNameCheck|MethodNameCheck|ParameterName|LocalFinalVariableName|LocalVariableName|LeftCurly|UpperEll|FallThrough|reliefPattern|SimplifyBooleanExpression|EmptyStatement|ModifierOrder|EmptyLineSeparator|WhitespaceAround|WhitespaceAfter|NoWhitespaceAfter|NoWhitespaceBefore|OperatorWrap|ParenPad"/>
-
        <suppress
                files="(.*)types[/\\](.*)"
                
checks="NewlineAtEndOfFile|RegexpSingleline|TodoComment|RedundantImport|ImportOrder|RedundantModifier|JavadocMethod|JavadocParagraph|JavadocType|JavadocStyle|PackageName|TypeNameCheck|ConstantNameCheck|StaticVariableNameCheck|MemberNameCheck|MethodNameCheck|ParameterName|LocalFinalVariableName|LocalVariableName|LeftCurly|UpperEll|FallThrough|reliefPattern|SimplifyBooleanExpression|EmptyStatement|ModifierOrder|EmptyLineSeparator|WhitespaceAround|WhitespaceAfter|NoWhitespaceAfter|NoWhitespaceBefore|OperatorWrap|ParenPad"/>
@@ -103,11 +95,6 @@ under the License.
                files="(.*)test[/\\](.*)types[/\\](.*)"
                checks="AvoidStarImport|NeedBraces"/>
 
-       <!--Only additional checks for test sources. Those checks were present 
in the "pre-strict" checkstyle but were not applied to test sources. We do not 
want to suppress them for sources directory-->
-       <suppress
-               files="(.*)test[/\\](.*)util[/\\](.*)"
-               checks="UnusedImports|AvoidStarImport"/>
-
        <suppress
                files="(.*)test[/\\](.*)testutils[/\\](.*)"
                
checks="NewlineAtEndOfFile|RegexpSingleline|TodoComment|RedundantImport|ImportOrder|RedundantModifier|JavadocMethod|JavadocParagraph|JavadocType|JavadocStyle|PackageName|TypeNameCheck|ConstantNameCheck|StaticVariableNameCheck|MemberNameCheck|MethodNameCheck|ParameterName|LocalFinalVariableName|LocalVariableName|LeftCurly|UpperEll|FallThrough|reliefPattern|SimplifyBooleanExpression|EmptyStatement|ModifierOrder|EmptyLineSeparator|WhitespaceAround|WhitespaceAfter|NoWhitespaceAfter|NoWhitespaceBefore|OperatorWrap|ParenPad"/>
diff --git a/tools/maven/suppressions-runtime.xml 
b/tools/maven/suppressions-runtime.xml
index 33a92e3075f..5efc9744037 100644
--- a/tools/maven/suppressions-runtime.xml
+++ b/tools/maven/suppressions-runtime.xml
@@ -44,13 +44,6 @@ under the License.
        <suppress
                files="(.*)test[/\\](.*)runtime[/\\]clusterframework[/\\](.*)"
                checks="AvoidStarImport"/>
-       <suppress
-               files="(.*)runtime[/\\]concurrent[/\\](.*)"
-               
checks="NewlineAtEndOfFile|RegexpSingleline|TodoComment|RedundantImport|ImportOrder|RedundantModifier|JavadocMethod|JavadocParagraph|JavadocType|JavadocStyle|PackageName|TypeNameCheck|ConstantNameCheck|StaticVariableNameCheck|MemberNameCheck|MethodNameCheck|ParameterName|LocalFinalVariableName|LocalVariableName|LeftCurly|UpperEll|FallThrough|reliefPattern|SimplifyBooleanExpression|EmptyStatement|ModifierOrder|EmptyLineSeparator|WhitespaceAround|WhitespaceAfter|NoWhitespaceAfter|NoWhitespaceBefore|OperatorWrap|ParenPad"/>
-       <!--Only additional checks for test sources. Those checks were present 
in the "pre-strict" checkstyle but were not applied to test sources. We do not 
want to suppress them for sources directory-->
-       <suppress
-               files="(.*)test[/\\](.*)runtime[/\\]concurrent[/\\](.*)"
-               checks="AvoidStarImport"/>
        <suppress
                files="(.*)runtime[/\\]execution[/\\](.*)"
                
checks="NewlineAtEndOfFile|RegexpSingleline|TodoComment|RedundantImport|ImportOrder|RedundantModifier|JavadocMethod|JavadocParagraph|JavadocType|JavadocStyle|PackageName|TypeNameCheck|ConstantNameCheck|StaticVariableNameCheck|MemberNameCheck|MethodNameCheck|ParameterName|LocalFinalVariableName|LocalVariableName|LeftCurly|UpperEll|FallThrough|reliefPattern|SimplifyBooleanExpression|EmptyStatement|ModifierOrder|EmptyLineSeparator|WhitespaceAround|WhitespaceAfter|NoWhitespaceAfter|NoWhitespaceBefore|OperatorWrap|ParenPad"/>
@@ -80,18 +73,18 @@ under the License.
                files="(.*)test[/\\](.*)runtime[/\\]instance[/\\](.*)"
                checks="AvoidStarImport"/>
        <suppress
-               files="(.*)runtime[/\\]io[/\\](async|disk)[/\\](.*)"
+               files="(.*)runtime[/\\]io[/\\]disk[/\\](.*)"
                
checks="NewlineAtEndOfFile|RegexpSingleline|TodoComment|RedundantImport|ImportOrder|RedundantModifier|JavadocMethod|JavadocParagraph|JavadocType|JavadocStyle|PackageName|TypeNameCheck|ConstantNameCheck|StaticVariableNameCheck|MemberNameCheck|MethodNameCheck|ParameterName|LocalFinalVariableName|LocalVariableName|LeftCurly|UpperEll|FallThrough|reliefPattern|SimplifyBooleanExpression|EmptyStatement|ModifierOrder|EmptyLineSeparator|WhitespaceAround|WhitespaceAfter|NoWhitespaceAfter|NoWhitespaceBefore|OperatorWrap|ParenPad"/>
        <!--Only additional checks for test sources. Those checks were present 
in the "pre-strict" checkstyle but were not applied to test sources. We do not 
want to suppress them for sources directory-->
        <suppress
-               
files="(.*)test[/\\](.*)runtime[/\\]io[/\\](async|disk)[/\\](.*)"
+               files="(.*)test[/\\](.*)runtime[/\\]io[/\\]disk[/\\](.*)"
                checks="AvoidStarImport|UnusedImports"/>
        <suppress
-               
files="(.*)runtime[/\\]io[/\\]network[/\\](buffer|netty|partition|serialization|util)[/\\](.*)"
+               
files="(.*)runtime[/\\]io[/\\]network[/\\](netty|partition|util)[/\\](.*)"
                
checks="NewlineAtEndOfFile|RegexpSingleline|TodoComment|RedundantImport|ImportOrder|RedundantModifier|JavadocMethod|JavadocParagraph|JavadocType|JavadocStyle|PackageName|TypeNameCheck|ConstantNameCheck|StaticVariableNameCheck|MemberNameCheck|MethodNameCheck|ParameterName|LocalFinalVariableName|LocalVariableName|LeftCurly|UpperEll|FallThrough|reliefPattern|SimplifyBooleanExpression|EmptyStatement|ModifierOrder|EmptyLineSeparator|WhitespaceAround|WhitespaceAfter|NoWhitespaceAfter|NoWhitespaceBefore|OperatorWrap|ParenPad"/>
        <!--Only additional checks for test sources. Those checks were present 
in the "pre-strict" checkstyle but were not applied to test sources. We do not 
want to suppress them for sources directory-->
        <suppress
-               
files="(.*)test[/\\](.*)runtime[/\\]io[/\\]network[/\\](buffer|netty|partition|serialization|util)[/\\](.*)"
+               
files="(.*)test[/\\](.*)runtime[/\\]io[/\\]network[/\\](netty|partition|util)[/\\](.*)"
                checks="AvoidStarImport|UnusedImports"/>
        <!--Test class copied from the netty project-->
        <suppress
@@ -132,13 +125,6 @@ under the License.
        <suppress
                files="(.*)test[/\\](.*)runtime[/\\]messages[/\\](.*)"
                checks="AvoidStarImport"/>
-       <suppress
-               files="(.*)runtime[/\\]minicluster[/\\](.*)"
-               
checks="NewlineAtEndOfFile|RegexpSingleline|TodoComment|RedundantImport|ImportOrder|RedundantModifier|JavadocMethod|JavadocParagraph|JavadocType|JavadocStyle|PackageName|TypeNameCheck|ConstantNameCheck|StaticVariableNameCheck|MemberNameCheck|MethodNameCheck|ParameterName|LocalFinalVariableName|LocalVariableName|LeftCurly|UpperEll|FallThrough|reliefPattern|SimplifyBooleanExpression|EmptyStatement|ModifierOrder|EmptyLineSeparator|WhitespaceAround|WhitespaceAfter|NoWhitespaceAfter|NoWhitespaceBefore|OperatorWrap|ParenPad"/>
-       <!--Only additional checks for test sources. Those checks were present 
in the "pre-strict" checkstyle but were not applied to test sources. We do not 
want to suppress them for sources directory-->
-       <suppress
-               files="(.*)test[/\\](.*)runtime[/\\]minicluster[/\\](.*)"
-               checks="AvoidStarImport"/>
        <suppress
                files="(.*)runtime[/\\]operators[/\\](.*)"
                
checks="NewlineAtEndOfFile|RegexpSingleline|TodoComment|RedundantImport|ImportOrder|RedundantModifier|JavadocMethod|JavadocParagraph|JavadocType|JavadocStyle|PackageName|TypeNameCheck|ConstantNameCheck|StaticVariableNameCheck|MemberNameCheck|MethodNameCheck|ParameterName|LocalFinalVariableName|LocalVariableName|LeftCurly|UpperEll|FallThrough|reliefPattern|SimplifyBooleanExpression|EmptyStatement|ModifierOrder|EmptyLineSeparator|WhitespaceAround|WhitespaceAfter|NoWhitespaceAfter|NoWhitespaceBefore|OperatorWrap|ParenPad"/>
@@ -184,10 +170,6 @@ under the License.
        <suppress
                files="(.*)runtime[/\\]testutils[/\\](.*)"
                
checks="NewlineAtEndOfFile|RegexpSingleline|TodoComment|RedundantImport|ImportOrder|RedundantModifier|JavadocMethod|JavadocParagraph|JavadocType|JavadocStyle|PackageName|TypeNameCheck|ConstantNameCheck|StaticVariableNameCheck|MemberNameCheck|MethodNameCheck|ParameterName|LocalFinalVariableName|LocalVariableName|LeftCurly|UpperEll|FallThrough|reliefPattern|SimplifyBooleanExpression|EmptyStatement|ModifierOrder|EmptyLineSeparator|WhitespaceAround|WhitespaceAfter|NoWhitespaceAfter|NoWhitespaceBefore|OperatorWrap|ParenPad"/>
-       <!--Only additional checks for test sources. Those checks were present 
in the "pre-strict" checkstyle but were not applied to test sources. We do not 
want to suppress them for sources directory-->
-       <suppress
-               files="(.*)test[/\\](.*)runtime[/\\]testutils[/\\](.*)"
-               checks="AvoidStarImport"/>
        <suppress
                files="(.*)runtime[/\\]util[/\\](.*)"
                
checks="NewlineAtEndOfFile|RegexpSingleline|TodoComment|RedundantImport|ImportOrder|RedundantModifier|JavadocMethod|JavadocParagraph|JavadocType|JavadocStyle|PackageName|TypeNameCheck|ConstantNameCheck|StaticVariableNameCheck|MemberNameCheck|MethodNameCheck|ParameterName|LocalFinalVariableName|LocalVariableName|LeftCurly|UpperEll|FallThrough|reliefPattern|SimplifyBooleanExpression|EmptyStatement|ModifierOrder|EmptyLineSeparator|WhitespaceAround|WhitespaceAfter|NoWhitespaceAfter|NoWhitespaceBefore|OperatorWrap|ParenPad"/>
@@ -198,10 +180,6 @@ under the License.
        <suppress
                files="(.*)runtime[/\\]zookeeper[/\\](.*)"
                
checks="NewlineAtEndOfFile|RegexpSingleline|TodoComment|RedundantImport|ImportOrder|RedundantModifier|JavadocMethod|JavadocParagraph|JavadocType|JavadocStyle|PackageName|TypeNameCheck|ConstantNameCheck|StaticVariableNameCheck|MemberNameCheck|MethodNameCheck|ParameterName|LocalFinalVariableName|LocalVariableName|LeftCurly|UpperEll|FallThrough|reliefPattern|SimplifyBooleanExpression|EmptyStatement|ModifierOrder|EmptyLineSeparator|WhitespaceAround|WhitespaceAfter|NoWhitespaceAfter|NoWhitespaceBefore|OperatorWrap|ParenPad"/>
-       <!--Only additional checks for test sources. Those checks were present 
in the "pre-strict" checkstyle but were not applied to test sources. We do not 
want to suppress them for sources directory-->
-       <suppress
-               files="(.*)test[/\\](.*)runtime[/\\]zookeeper[/\\](.*)"
-               checks="AvoidStarImport"/>
        <suppress
                files="(.*)StateBackendTestBase.java"
                checks="FileLength"/>


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Batch Job: InputSplit Fault tolerant for DataSourceTask
> -------------------------------------------------------
>
>                 Key: FLINK-10205
>                 URL: https://issues.apache.org/jira/browse/FLINK-10205
>             Project: Flink
>          Issue Type: Sub-task
>          Components: JobManager
>            Reporter: JIN SUN
>            Assignee: JIN SUN
>            Priority: Major
>              Labels: pull-request-available
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> Today a DataSourceTask pulls InputSplits from the JobManager to achieve better 
> performance. However, when a DataSourceTask fails and is rerun, it will not get 
> the same splits as its previous attempt; this can introduce inconsistent 
> results or even data corruption.
> Furthermore, if two executions run at the same time (in a batch scenario), 
> both executions should process the same splits.
> We need to fix this issue to make the inputs of a DataSourceTask 
> deterministic. The proposal is to save all splits in the ExecutionVertex and 
> have the DataSourceTask pull its splits from there.
>  
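
To make the proposal above concrete, the following is a minimal, hypothetical
sketch (it is not the code from this pull request) of how the splits handed out
for a vertex could be cached and replayed to a restarted DataSourceTask. The
class name ReplayableSplitAssignment and the method resetForNewAttempt() are
illustrative assumptions; only org.apache.flink.core.io.InputSplit and
InputSplitAssigner are existing Flink types, and the actual integration point
in the ExecutionVertex would differ in the real change.

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.flink.core.io.InputSplit;
    import org.apache.flink.core.io.InputSplitAssigner;

    /**
     * Hypothetical sketch only: remembers every split handed out for a vertex so
     * that a re-executed (or concurrently running) task sees the same sequence.
     */
    public class ReplayableSplitAssignment {

        private final InputSplitAssigner assigner;                         // underlying assigner
        private final List<InputSplit> assignedSplits = new ArrayList<>(); // splits handed out so far
        private int replayIndex;                                           // next cached split to replay

        public ReplayableSplitAssignment(InputSplitAssigner assigner) {
            this.assigner = assigner;
        }

        /** Returns the next split, replaying cached splits first; null when exhausted. */
        public synchronized InputSplit getNextInputSplit(String host, int taskId) {
            // First serve the splits that were already assigned to an earlier attempt.
            if (replayIndex < assignedSplits.size()) {
                return assignedSplits.get(replayIndex++);
            }
            // Otherwise pull a fresh split and remember it for future replays.
            InputSplit next = assigner.getNextInputSplit(host, taskId);
            if (next != null) {
                assignedSplits.add(next);
                replayIndex = assignedSplits.size();
            }
            return next;
        }

        /** Called when an attempt fails, so the next attempt replays from the beginning. */
        public synchronized void resetForNewAttempt() {
            replayIndex = 0;
        }
    }

Under such a scheme a failed attempt, or a second concurrent execution of the
same vertex, first re-reads the cached list and therefore processes exactly the
same splits as the first attempt, which is the determinism property the issue
asks for.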



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
