[flink] Git Push Summary

2018-02-28 Thread tzulitai
Repository: flink
Updated Tags:  refs/tags/release-1.4.2-rc2 [created] 0f4978eb6


[5/5] flink git commit: [FLINK-7805][flip6] Recover YARN containers after AM restart.

2018-02-28 Thread trohrmann
[FLINK-7805][flip6] Recover YARN containers after AM restart.

Recover previously running containers after a restart of the ApplicationMaster.
This is a port of a feature that was already implemented prior to FLIP-6.
Extract RegisterApplicationMasterResponseReflector class into separate file.

This closes #5597.
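The reason the patch resolves the method reflectively: getContainersFromPreviousAttempts() only exists on the RegisterApplicationMasterResponse of Hadoop 2.2 and later, while Flink still has to compile and run against older YARN versions. A standalone sketch of that resolve-or-degrade pattern (hypothetical helper for illustration, not the actual patch code):

```java
import java.lang.reflect.Method;
import java.util.Collections;
import java.util.List;

/** Sketch: call an optional API if it exists, otherwise degrade gracefully. */
public class OptionalApiSketch {

    @SuppressWarnings("unchecked")
    static List<Object> containersFromPreviousAttempts(Object registerResponse) {
        try {
            // The method only exists on Hadoop >= 2.2, so it is resolved
            // reflectively; older versions take the fallback path below.
            Method m = registerResponse.getClass()
                    .getMethod("getContainersFromPreviousAttempts");
            m.setAccessible(true);
            return (List<Object>) m.invoke(registerResponse);
        } catch (ReflectiveOperationException e) {
            // Older YARN: no previously allocated containers can be recovered.
            return Collections.emptyList();
        }
    }
}
```

On a restarted ApplicationMaster, the recovered list would then be handed back to the resource manager bookkeeping instead of requesting fresh containers.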


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e92eb391
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e92eb391
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e92eb391

Branch: refs/heads/release-1.5
Commit: e92eb391cada27b49d32e7b11453fb5f06e19880
Parents: 94bbd56
Author: gyao 
Authored: Wed Feb 28 13:20:23 2018 +0100
Committer: Till Rohrmann 
Committed: Wed Feb 28 18:43:39 2018 +0100

--
 ...isterApplicationMasterResponseReflector.java | 102 
 .../flink/yarn/YarnFlinkResourceManager.java|  52 -
 .../apache/flink/yarn/YarnResourceManager.java  |  17 ++-
 ...rApplicationMasterResponseReflectorTest.java | 117 +++
 4 files changed, 235 insertions(+), 53 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/e92eb391/flink-yarn/src/main/java/org/apache/flink/yarn/RegisterApplicationMasterResponseReflector.java
--
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/RegisterApplicationMasterResponseReflector.java b/flink-yarn/src/main/java/org/apache/flink/yarn/RegisterApplicationMasterResponseReflector.java
new file mode 100644
index 000..13b5745
--- /dev/null
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/RegisterApplicationMasterResponseReflector.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.annotation.VisibleForTesting;
+
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.slf4j.Logger;
+
+import java.lang.reflect.Method;
+import java.util.Collections;
+import java.util.List;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Looks up the method {@link RegisterApplicationMasterResponse#getContainersFromPreviousAttempts()}
+ * once and saves the method. This saves computation time on subsequent calls.
+ */
+class RegisterApplicationMasterResponseReflector {
+
+   private final Logger logger;
+
+	/**
+	 * Reflected method {@link RegisterApplicationMasterResponse#getContainersFromPreviousAttempts()}.
+	 */
+   private Method method;
+
+   RegisterApplicationMasterResponseReflector(final Logger log) {
+   this(log, RegisterApplicationMasterResponse.class);
+   }
+
+   @VisibleForTesting
+	RegisterApplicationMasterResponseReflector(final Logger log, final Class<?> clazz) {
+   this.logger = requireNonNull(log);
+   requireNonNull(clazz);
+
+   try {
+			method = clazz.getMethod("getContainersFromPreviousAttempts");
+   } catch (NoSuchMethodException e) {
+   // that happens in earlier Hadoop versions (pre 2.2)
+			logger.info("Cannot reconnect to previously allocated containers. " +
+				"This YARN version does not support 'getContainersFromPreviousAttempts()'");
+   }
+   }
+
+   /**
+	 * Checks if a YARN application still has registered containers. If the application master
+	 * registered at the ResourceManager for the first time, this list will be empty. If the
+	 * application master registered a repeated time (after a failure and recovery), this list
+	 * will contain the containers that were previously allocated.
+	 *
+	 * @param response The response object from the registration at the ResourceManager.
+	 * @return A list with containers from previous application attempt.
+	 */
+	List<Container> getContainersFromPreviousAttempts(final RegisterApplicationMasterResponse response) {

[2/5] flink git commit: [hotfix][Javadoc] Fix typo in YarnTestBase: teh -> the

2018-02-28 Thread trohrmann
[hotfix][Javadoc] Fix typo in YarnTestBase: teh -> the


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/adb37502
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/adb37502
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/adb37502

Branch: refs/heads/release-1.5
Commit: adb3750226971f7c67a0d3069103b56e4fee1c27
Parents: b724792
Author: gyao 
Authored: Wed Feb 28 13:07:04 2018 +0100
Committer: Till Rohrmann 
Committed: Wed Feb 28 18:43:38 2018 +0100

--
 .../src/test/java/org/apache/flink/yarn/YarnTestBase.java  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/adb37502/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
--
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
index c863e14..7bca321 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
@@ -102,7 +102,7 @@ public abstract class YarnTestBase extends TestLogger {
 		"Started SelectChannelConnector@0.0.0.0:8081" // Jetty should start on a random port in YARN mode.
 	};
 
-	/** These strings are white-listed, overriding teh prohibited strings. */
+	/** These strings are white-listed, overriding the prohibited strings. */
protected static final String[] WHITELISTED_STRINGS = {
"akka.remote.RemoteTransportExceptionNoStackTrace",
// workaround for annoying InterruptedException logging:



[4/5] flink git commit: [hotfix][tests] Fix wrong assertEquals in YARNSessionCapacitySchedulerITCase

2018-02-28 Thread trohrmann
[hotfix][tests] Fix wrong assertEquals in YARNSessionCapacitySchedulerITCase

The test swapped the actual and expected arguments.
Remove the Throwable catch in the test; propagate all exceptions instead.
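JUnit's Assert.assertEquals(expected, actual) takes the expected value first; swapping the arguments does not change whether the assertion passes, but it makes the failure message lie about which value was expected. A self-contained illustration using a simplified stand-in for the JUnit method (hypothetical, JUnit itself not required):

```java
import java.util.Collections;
import java.util.Set;

public class ArgumentOrderExample {

    // Simplified stand-in for JUnit's Assert.assertEquals(expected, actual):
    // the first argument is reported as "expected", so swapping the arguments
    // yields a misleading report even though equality itself is symmetric.
    static void assertEquals(Object expected, Object actual) {
        if (!expected.equals(actual)) {
            throw new AssertionError("expected:<" + expected + "> but was:<" + actual + ">");
        }
    }

    public static void main(String[] args) {
        Set<String> applicationTags = Collections.singleton("test-tag");
        assertEquals(Collections.singleton("test-tag"), applicationTags); // passes

        String message = "";
        try {
            assertEquals("other-tag", "test-tag"); // deliberately failing
        } catch (AssertionError e) {
            message = e.getMessage();
        }
        // The report names "other-tag" as the expected value; with swapped
        // arguments it would have named the actual value instead.
        System.out.println(message);
    }
}
```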


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/94bbd564
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/94bbd564
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/94bbd564

Branch: refs/heads/release-1.5
Commit: 94bbd564ce5214b3366cc6d299fcb99ae62a2bd8
Parents: adb3750
Author: gyao 
Authored: Wed Feb 28 13:08:25 2018 +0100
Committer: Till Rohrmann 
Committed: Wed Feb 28 18:43:38 2018 +0100

--
 .../flink/yarn/YARNSessionCapacitySchedulerITCase.java   | 11 ---
 1 file changed, 4 insertions(+), 7 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/94bbd564/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
--
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
index 3a674ad..d00a9c4 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
@@ -416,7 +416,7 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
 	 * Test a fire-and-forget job submission to a YARN cluster.
 	 */
 	@Test(timeout = 6)
-	public void testDetachedPerJobYarnCluster() throws IOException {
+	public void testDetachedPerJobYarnCluster() throws Exception {
 		LOG.info("Starting testDetachedPerJobYarnCluster()");
 
 		File exampleJarLocation = new File("target/programs/BatchWordCount.jar");
@@ -432,7 +432,7 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
 	 * Test a fire-and-forget job submission to a YARN cluster.
 	 */
 	@Test(timeout = 6)
-	public void testDetachedPerJobYarnClusterWithStreamingJob() throws IOException {
+	public void testDetachedPerJobYarnClusterWithStreamingJob() throws Exception {
 		LOG.info("Starting testDetachedPerJobYarnClusterWithStreamingJob()");
 
 		File exampleJarLocation = new File("target/programs/StreamingWordCount.jar");
@@ -444,7 +444,7 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
 		LOG.info("Finished testDetachedPerJobYarnClusterWithStreamingJob()");
 	}
 
-	private void testDetachedPerJobYarnClusterInternal(String job) throws IOException {
+	private void testDetachedPerJobYarnClusterInternal(String job) throws Exception {
 		YarnClient yc = YarnClient.createYarnClient();
 		yc.init(YARN_CONFIGURATION);
 		yc.start();
@@ -575,9 +575,6 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
 			} while (rep.getYarnApplicationState() == YarnApplicationState.RUNNING);
 
 			verifyApplicationTags(rep);
-		} catch (Throwable t) {
-			LOG.warn("Error while detached yarn session was running", t);
-			Assert.fail(t.getMessage());
 		} finally {
 
 			//cleanup the yarn-properties file
@@ -625,7 +622,7 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
 		@SuppressWarnings("unchecked")
 		Set<String> applicationTags = (Set<String>) applicationTagsMethod.invoke(report);
 
-		Assert.assertEquals(applicationTags, Collections.singleton("test-tag"));
+		Assert.assertEquals(Collections.singleton("test-tag"), applicationTags);
 	}
 
@After



[3/5] flink git commit: [hotfix][Javadoc] Fix typo in YARN Utils: teh -> the

2018-02-28 Thread trohrmann
[hotfix][Javadoc] Fix typo in YARN Utils: teh -> the


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b7247929
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b7247929
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b7247929

Branch: refs/heads/release-1.5
Commit: b7247929d0745b3b83306d0c93d97faf4ece4107
Parents: f60e46d
Author: gyao 
Authored: Wed Feb 28 13:06:00 2018 +0100
Committer: Till Rohrmann 
Committed: Wed Feb 28 18:43:38 2018 +0100

--
 flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/b7247929/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
--
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
index 9ae5b54..ff2478e 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
@@ -320,7 +320,7 @@ public final class Utils {
 *
 * @return The launch context for the TaskManager processes.
 *
-	 * @throws Exception Thrown if teh launch context could not be created, for example if
+	 * @throws Exception Thrown if the launch context could not be created, for example if
 * the resources could not be copied.
 */
static ContainerLaunchContext createTaskExecutorContext(



[1/5] flink git commit: [hotfix] Add missing space to log message in ZooKeeperLeaderElectionService

2018-02-28 Thread trohrmann
Repository: flink
Updated Branches:
  refs/heads/release-1.5 26c8f6c2a -> e92eb391c


[hotfix] Add missing space to log message in ZooKeeperLeaderElectionService
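The underlying bug class: when a long log message is split across concatenated string literals, the separating space has to sit at one of the seams, and the compiler cannot catch its absence. A minimal illustration (hypothetical strings mirroring the fixed message):

```java
public class LogMessageConcat {
    public static void main(String[] args) {
        // Broken: no space at the seam between the two literals.
        String broken = "The leader session ID {} was confirmed even though the" +
                "corresponding JobManager was not elected as the leader.";

        // Fixed: a trailing space on the first literal restores the word break.
        String fixed = "The leader session ID {} was confirmed even though the " +
                "corresponding JobManager was not elected as the leader.";

        System.out.println(broken.contains("thecorresponding")); // true: the words fuse
        System.out.println(fixed.contains("the corresponding")); // true
    }
}
```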


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f60e46da
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f60e46da
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f60e46da

Branch: refs/heads/release-1.5
Commit: f60e46dafa8950d5e40cd8a3286c172ecaea6b73
Parents: 26c8f6c
Author: gyao 
Authored: Wed Feb 28 13:04:19 2018 +0100
Committer: Till Rohrmann 
Committed: Wed Feb 28 18:43:37 2018 +0100

--
 .../runtime/leaderelection/ZooKeeperLeaderElectionService.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/f60e46da/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java
--
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java
index 59d3592..dc0f3ae 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java
@@ -199,7 +199,7 @@ public class ZooKeeperLeaderElectionService implements LeaderElectionService, Le
 				}
 			}
 		} else {
-			LOG.warn("The leader session ID {} was confirmed even though the" +
+			LOG.warn("The leader session ID {} was confirmed even though the " +
 				"corresponding JobManager was not elected as the leader.", leaderSessionID);
 		}
 	}



[2/5] flink git commit: [hotfix][Javadoc] Fix typo in YarnTestBase: teh -> the

2018-02-28 Thread trohrmann
[hotfix][Javadoc] Fix typo in YarnTestBase: teh -> the


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f409f920
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f409f920
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f409f920

Branch: refs/heads/master
Commit: f409f9208f340a6b741818a56a9a07b236fb0a7f
Parents: 1987445
Author: gyao 
Authored: Wed Feb 28 13:07:04 2018 +0100
Committer: Till Rohrmann 
Committed: Wed Feb 28 18:37:13 2018 +0100

--
 .../src/test/java/org/apache/flink/yarn/YarnTestBase.java  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/f409f920/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
--
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
index c863e14..7bca321 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
@@ -102,7 +102,7 @@ public abstract class YarnTestBase extends TestLogger {
 		"Started SelectChannelConnector@0.0.0.0:8081" // Jetty should start on a random port in YARN mode.
 	};
 
-	/** These strings are white-listed, overriding teh prohibited strings. */
+	/** These strings are white-listed, overriding the prohibited strings. */
protected static final String[] WHITELISTED_STRINGS = {
"akka.remote.RemoteTransportExceptionNoStackTrace",
// workaround for annoying InterruptedException logging:



[5/5] flink git commit: [FLINK-7805][flip6] Recover YARN containers after AM restart.

2018-02-28 Thread trohrmann
[FLINK-7805][flip6] Recover YARN containers after AM restart.

Recover previously running containers after a restart of the ApplicationMaster.
This is a port of a feature that was already implemented prior to FLIP-6.
Extract RegisterApplicationMasterResponseReflector class into separate file.

This closes #5597.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/45397fe9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/45397fe9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/45397fe9

Branch: refs/heads/master
Commit: 45397fe974e1390cd39a34fc2eb216f3771ddf06
Parents: 035257e
Author: gyao 
Authored: Wed Feb 28 13:20:23 2018 +0100
Committer: Till Rohrmann 
Committed: Wed Feb 28 18:41:53 2018 +0100

--
 ...isterApplicationMasterResponseReflector.java | 102 
 .../flink/yarn/YarnFlinkResourceManager.java|  52 -
 .../apache/flink/yarn/YarnResourceManager.java  |  17 ++-
 ...rApplicationMasterResponseReflectorTest.java | 117 +++
 4 files changed, 235 insertions(+), 53 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/45397fe9/flink-yarn/src/main/java/org/apache/flink/yarn/RegisterApplicationMasterResponseReflector.java
--
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/RegisterApplicationMasterResponseReflector.java b/flink-yarn/src/main/java/org/apache/flink/yarn/RegisterApplicationMasterResponseReflector.java
new file mode 100644
index 000..13b5745
--- /dev/null
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/RegisterApplicationMasterResponseReflector.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.annotation.VisibleForTesting;
+
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.slf4j.Logger;
+
+import java.lang.reflect.Method;
+import java.util.Collections;
+import java.util.List;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Looks up the method {@link RegisterApplicationMasterResponse#getContainersFromPreviousAttempts()}
+ * once and saves the method. This saves computation time on subsequent calls.
+ */
+class RegisterApplicationMasterResponseReflector {
+
+   private final Logger logger;
+
+	/**
+	 * Reflected method {@link RegisterApplicationMasterResponse#getContainersFromPreviousAttempts()}.
+	 */
+   private Method method;
+
+   RegisterApplicationMasterResponseReflector(final Logger log) {
+   this(log, RegisterApplicationMasterResponse.class);
+   }
+
+   @VisibleForTesting
+	RegisterApplicationMasterResponseReflector(final Logger log, final Class<?> clazz) {
+   this.logger = requireNonNull(log);
+   requireNonNull(clazz);
+
+   try {
+			method = clazz.getMethod("getContainersFromPreviousAttempts");
+   } catch (NoSuchMethodException e) {
+   // that happens in earlier Hadoop versions (pre 2.2)
+			logger.info("Cannot reconnect to previously allocated containers. " +
+				"This YARN version does not support 'getContainersFromPreviousAttempts()'");
+   }
+   }
+
+   /**
+	 * Checks if a YARN application still has registered containers. If the application master
+	 * registered at the ResourceManager for the first time, this list will be empty. If the
+	 * application master registered a repeated time (after a failure and recovery), this list
+	 * will contain the containers that were previously allocated.
+	 *
+	 * @param response The response object from the registration at the ResourceManager.
+	 * @return A list with containers from previous application attempt.
+	 */
+	List<Container> getContainersFromPreviousAttempts(final RegisterApplicationMasterResponse response) {

[3/5] flink git commit: [hotfix][Javadoc] Fix typo in YARN Utils: teh -> the

2018-02-28 Thread trohrmann
[hotfix][Javadoc] Fix typo in YARN Utils: teh -> the


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/19874459
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/19874459
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/19874459

Branch: refs/heads/master
Commit: 19874459d5fc750158eea9dcc54be81d5d4e08c9
Parents: b19769d
Author: gyao 
Authored: Wed Feb 28 13:06:00 2018 +0100
Committer: Till Rohrmann 
Committed: Wed Feb 28 18:37:13 2018 +0100

--
 flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/19874459/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
--
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
index 9ae5b54..ff2478e 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
@@ -320,7 +320,7 @@ public final class Utils {
 *
 * @return The launch context for the TaskManager processes.
 *
-	 * @throws Exception Thrown if teh launch context could not be created, for example if
+	 * @throws Exception Thrown if the launch context could not be created, for example if
 * the resources could not be copied.
 */
static ContainerLaunchContext createTaskExecutorContext(



[4/5] flink git commit: [hotfix][tests] Fix wrong assertEquals in YARNSessionCapacitySchedulerITCase

2018-02-28 Thread trohrmann
[hotfix][tests] Fix wrong assertEquals in YARNSessionCapacitySchedulerITCase

The test swapped the actual and expected arguments.
Remove the Throwable catch in the test; propagate all exceptions instead.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/035257e4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/035257e4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/035257e4

Branch: refs/heads/master
Commit: 035257e425133de9368bbef268c38f179bdc68c2
Parents: f409f92
Author: gyao 
Authored: Wed Feb 28 13:08:25 2018 +0100
Committer: Till Rohrmann 
Committed: Wed Feb 28 18:37:14 2018 +0100

--
 .../flink/yarn/YARNSessionCapacitySchedulerITCase.java   | 11 ---
 1 file changed, 4 insertions(+), 7 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/035257e4/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
--
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
index 3a674ad..d00a9c4 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
@@ -416,7 +416,7 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
 	 * Test a fire-and-forget job submission to a YARN cluster.
 	 */
 	@Test(timeout = 6)
-	public void testDetachedPerJobYarnCluster() throws IOException {
+	public void testDetachedPerJobYarnCluster() throws Exception {
 		LOG.info("Starting testDetachedPerJobYarnCluster()");
 
 		File exampleJarLocation = new File("target/programs/BatchWordCount.jar");
@@ -432,7 +432,7 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
 	 * Test a fire-and-forget job submission to a YARN cluster.
 	 */
 	@Test(timeout = 6)
-	public void testDetachedPerJobYarnClusterWithStreamingJob() throws IOException {
+	public void testDetachedPerJobYarnClusterWithStreamingJob() throws Exception {
 		LOG.info("Starting testDetachedPerJobYarnClusterWithStreamingJob()");
 
 		File exampleJarLocation = new File("target/programs/StreamingWordCount.jar");
@@ -444,7 +444,7 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
 		LOG.info("Finished testDetachedPerJobYarnClusterWithStreamingJob()");
 	}
 
-	private void testDetachedPerJobYarnClusterInternal(String job) throws IOException {
+	private void testDetachedPerJobYarnClusterInternal(String job) throws Exception {
 		YarnClient yc = YarnClient.createYarnClient();
 		yc.init(YARN_CONFIGURATION);
 		yc.start();
@@ -575,9 +575,6 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
 			} while (rep.getYarnApplicationState() == YarnApplicationState.RUNNING);
 
 			verifyApplicationTags(rep);
-		} catch (Throwable t) {
-			LOG.warn("Error while detached yarn session was running", t);
-			Assert.fail(t.getMessage());
 		} finally {
 
 			//cleanup the yarn-properties file
@@ -625,7 +622,7 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
 		@SuppressWarnings("unchecked")
 		Set<String> applicationTags = (Set<String>) applicationTagsMethod.invoke(report);
 
-		Assert.assertEquals(applicationTags, Collections.singleton("test-tag"));
+		Assert.assertEquals(Collections.singleton("test-tag"), applicationTags);
 	}
 
@After



[1/5] flink git commit: [hotfix] Add missing space to log message in ZooKeeperLeaderElectionService

2018-02-28 Thread trohrmann
Repository: flink
Updated Branches:
  refs/heads/master 273fea4aa -> 45397fe97


[hotfix] Add missing space to log message in ZooKeeperLeaderElectionService


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b19769db
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b19769db
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b19769db

Branch: refs/heads/master
Commit: b19769db7f916b2eb40a9a68c0ee60d0b27da0a2
Parents: 273fea4
Author: gyao 
Authored: Wed Feb 28 13:04:19 2018 +0100
Committer: Till Rohrmann 
Committed: Wed Feb 28 18:37:12 2018 +0100

--
 .../runtime/leaderelection/ZooKeeperLeaderElectionService.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/b19769db/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java
--
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java
index 59d3592..dc0f3ae 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java
@@ -199,7 +199,7 @@ public class ZooKeeperLeaderElectionService implements LeaderElectionService, Le
 				}
 			}
 		} else {
-			LOG.warn("The leader session ID {} was confirmed even though the" +
+			LOG.warn("The leader session ID {} was confirmed even though the " +
 				"corresponding JobManager was not elected as the leader.", leaderSessionID);
 		}
 	}



flink git commit: [hotfix] [tests] Fix SelfConnectionITCase

2018-02-28 Thread sewen
Repository: flink
Updated Branches:
  refs/heads/release-1.5 926566651 -> 26c8f6c2a


[hotfix] [tests] Fix SelfConnectionITCase

The test previously did not fail on failed execution, and thus compared incomplete
results from a failed execution against the expected results.

This cleans up serialization warnings and uses lambdas where possible,
to make the code more readable.
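The kind of cleanup described, replacing a single-method anonymous class with a lambda, looks like this in generic form (plain java.util.function types used for illustration; the actual patch targets Flink's MapFunction and FlatMapFunction):

```java
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

public class LambdaCleanupExample {
    public static void main(String[] args) {
        List<Integer> src = List.of(1, 3, 5);

        // Before: a verbose anonymous class (in Flink this also forces a
        // serialVersionUID field, since the functions are serializable).
        Function<Integer, String> verbose = new Function<Integer, String>() {
            @Override
            public String apply(Integer value) {
                return "x " + value;
            }
        };

        // After: the same mapping as a lambda.
        Function<Integer, String> concise = value -> "x " + value;

        List<String> mapped = src.stream().map(concise).collect(Collectors.toList());
        System.out.println(mapped); // [x 1, x 3, x 5]
    }
}
```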


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/26c8f6c2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/26c8f6c2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/26c8f6c2

Branch: refs/heads/release-1.5
Commit: 26c8f6c2a3ff75ffb954c816a57908318a2d8099
Parents: 9265666
Author: Stephan Ewen 
Authored: Wed Feb 28 12:15:30 2018 +0100
Committer: Stephan Ewen 
Committed: Wed Feb 28 17:54:11 2018 +0100

--
 .../streaming/runtime/SelfConnectionITCase.java | 63 
 1 file changed, 12 insertions(+), 51 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/26c8f6c2/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SelfConnectionITCase.java
--
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SelfConnectionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SelfConnectionITCase.java
index b302513..a8023a0 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SelfConnectionITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SelfConnectionITCase.java
@@ -18,8 +18,6 @@
 package org.apache.flink.test.streaming.runtime;
 
 import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.co.CoMapFunction;
@@ -38,6 +36,7 @@ import static org.junit.Assert.assertEquals;
 /**
  * Integration tests for connected streams.
  */
+@SuppressWarnings("serial")
 public class SelfConnectionITCase extends AbstractTestBase {
 
/**
@@ -46,26 +45,17 @@ public class SelfConnectionITCase extends AbstractTestBase {
@Test
public void differentDataStreamSameChain() throws Exception {
 
-		TestListResultSink<String> resultSink = new TestListResultSink<String>();
+		TestListResultSink<String> resultSink = new TestListResultSink<>();
 
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
 
DataStream src = env.fromElements(1, 3, 5);
 
-   DataStream stringMap = src.map(new MapFunction() {
-   private static final long serialVersionUID = 1L;
-
-   @Override
-   public String map(Integer value) throws Exception {
-   return "x " + value;
-   }
-   });
+   DataStream stringMap = src.map(value -> "x " + value);
 
stringMap.connect(src).map(new CoMapFunction() {
 
-   private static final long serialVersionUID = 1L;
-
@Override
public String map1(String value) {
return value;
@@ -94,55 +84,30 @@ public class SelfConnectionITCase extends AbstractTestBase {
 * (This is not actually self-connect.)
 */
@Test
-   public void differentDataStreamDifferentChain() {
+   public void differentDataStreamDifferentChain() throws Exception {
 
-		TestListResultSink<String> resultSink = new TestListResultSink<String>();
+		TestListResultSink<String> resultSink = new TestListResultSink<>();
 
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);
 
 		DataStream src = env.fromElements(1, 3, 5).disableChaining();
 
-		DataStream stringMap = src.flatMap(new FlatMapFunction() {
-
-   private static final long serialVersionUID = 1L;
+   DataStream stringMap = src
+			.flatMap(new FlatMapFunction() {
 
@Override
 			public void flatMap(Integer value, Collector out) throws Exception {
out.collect("x " + value);
}
-   }).keyBy(new KeySelector() {
-
-   private static final long serialVersionUID = 1L;
-
-   @Overri

flink git commit: [hotfix] [tests] Fix SelfConnectionITCase

2018-02-28 Thread sewen
Repository: flink
Updated Branches:
  refs/heads/master e8de53817 -> 273fea4aa


[hotfix] [tests] Fix SelfConnectionITCase

The test previously did not fail on failed execution, and thus compared incomplete
results from a failed execution against the expected results.

This cleans up serialization warnings and uses lambdas where possible,
to make the code more readable.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/273fea4a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/273fea4a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/273fea4a

Branch: refs/heads/master
Commit: 273fea4aa36123c0501845e4bc3f8777f203e596
Parents: e8de538
Author: Stephan Ewen 
Authored: Wed Feb 28 12:15:30 2018 +0100
Committer: Stephan Ewen 
Committed: Wed Feb 28 17:48:32 2018 +0100

--
 .../streaming/runtime/SelfConnectionITCase.java | 63 
 1 file changed, 12 insertions(+), 51 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/273fea4a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SelfConnectionITCase.java
--
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SelfConnectionITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SelfConnectionITCase.java
index b302513..a8023a0 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SelfConnectionITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SelfConnectionITCase.java
@@ -18,8 +18,6 @@
 package org.apache.flink.test.streaming.runtime;
 
 import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.co.CoMapFunction;
@@ -38,6 +36,7 @@ import static org.junit.Assert.assertEquals;
 /**
  * Integration tests for connected streams.
  */
+@SuppressWarnings("serial")
 public class SelfConnectionITCase extends AbstractTestBase {
 
/**
@@ -46,26 +45,17 @@ public class SelfConnectionITCase extends AbstractTestBase {
@Test
public void differentDataStreamSameChain() throws Exception {
 
-   TestListResultSink resultSink = new 
TestListResultSink();
+   TestListResultSink resultSink = new 
TestListResultSink<>();
 
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
 
DataStream src = env.fromElements(1, 3, 5);
 
-   DataStream stringMap = src.map(new MapFunction() {
-   private static final long serialVersionUID = 1L;
-
-   @Override
-   public String map(Integer value) throws Exception {
-   return "x " + value;
-   }
-   });
+   DataStream stringMap = src.map(value -> "x " + value);
 
stringMap.connect(src).map(new CoMapFunction() {
 
-   private static final long serialVersionUID = 1L;
-
@Override
public String map1(String value) {
return value;
@@ -94,55 +84,30 @@ public class SelfConnectionITCase extends AbstractTestBase {
 * (This is not actually self-connect.)
 */
@Test
-   public void differentDataStreamDifferentChain() {
+   public void differentDataStreamDifferentChain() throws Exception {
 
-   TestListResultSink resultSink = new 
TestListResultSink();
+   TestListResultSink resultSink = new 
TestListResultSink<>();
 
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);
 
DataStream src = env.fromElements(1, 3, 
5).disableChaining();
 
-   DataStream stringMap = src.flatMap(new 
FlatMapFunction() {
-
-   private static final long serialVersionUID = 1L;
+   DataStream stringMap = src
+   .flatMap(new FlatMapFunction() 
{
 
@Override
public void flatMap(Integer value, Collector 
out) throws Exception {
out.collect("x " + value);
}
-   }).keyBy(new KeySelector() {
-
-   private static final long serialVersionUID = 1L;
-
-   @Override
-  

[2/2] flink git commit: [FLINK-8737][network] disallow creating a union of UnionInputGate instances

2018-02-28 Thread srichter
[FLINK-8737][network] disallow creating a union of UnionInputGate instances

Recently, pollNextBufferOrEvent() was added but not implemented. Because it is
used in getNextBufferOrEvent(), any UnionInputGate containing a UnionInputGate
would have failed already. There should be no use case for wiring up inputs
this way, so fail early when trying to construct such a union.
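The fail-fast guard added by this commit can be sketched as follows. This is a simplified stand-in, not the real Flink classes: the names Gate, SingleGate, and UnionGate are hypothetical, but the constructor-time instanceof check mirrors the one added to UnionInputGate.

```java
import java.util.Arrays;
import java.util.List;

public class UnionGuardSketch {
    interface Gate {}
    static class SingleGate implements Gate {}

    // Simplified stand-in for UnionInputGate: reject nesting at construction time.
    static class UnionGate implements Gate {
        final List<Gate> gates;

        UnionGate(Gate... inputs) {
            for (Gate g : inputs) {
                if (g instanceof UnionGate) {
                    // Fail early instead of failing later when data is consumed.
                    throw new UnsupportedOperationException(
                        "Cannot union a union of input gates.");
                }
            }
            this.gates = Arrays.asList(inputs);
        }
    }

    // Returns true if nesting a union inside a union is rejected.
    static boolean rejectsNestedUnion() {
        try {
            new UnionGate(new UnionGate(new SingleGate()), new SingleGate());
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        new UnionGate(new SingleGate(), new SingleGate()); // flat union is fine
        System.out.println(rejectsNestedUnion()); // prints "true"
    }
}
```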

(cherry picked from commit e8de538)


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/92656665
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/92656665
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/92656665

Branch: refs/heads/release-1.5
Commit: 9265666517830350a4a7037029e347f33df1bea2
Parents: 18ff2ce
Author: Nico Kruber 
Authored: Mon Feb 26 17:52:37 2018 +0100
Committer: Stefan Richter 
Committed: Wed Feb 28 17:35:13 2018 +0100

--
 .../runtime/io/network/partition/consumer/UnionInputGate.java | 7 ++-
 1 file changed, 6 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/92656665/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
index 44cdd52..742592a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
@@ -61,7 +61,7 @@ import static org.apache.flink.util.Preconditions.checkState;
  * ++
  * 
  *
- * It is possible to recursively union union input gates.
+ * It is NOT possible to recursively union union input gates.
  */
 public class UnionInputGate implements InputGate, InputGateListener {
 
@@ -103,6 +103,11 @@ public class UnionInputGate implements InputGate, 
InputGateListener {
int currentNumberOfInputChannels = 0;
 
for (InputGate inputGate : inputGates) {
+   if (inputGate instanceof UnionInputGate) {
+   // if we want to add support for this, we need 
to implement pollNextBufferOrEvent()
+   throw new UnsupportedOperationException("Cannot 
union a union of input gates.");
+   }
+
// The offset to use for buffer or event instances 
received from this input gate.
inputGateToIndexOffsetMap.put(checkNotNull(inputGate), 
currentNumberOfInputChannels);
inputGatesWithRemainingData.add(inputGate);



[1/2] flink git commit: [hotfix][network] minor improvements in UnionInputGate

2018-02-28 Thread srichter
Repository: flink
Updated Branches:
  refs/heads/release-1.5 f14532760 -> 926566651


[hotfix][network] minor improvements in UnionInputGate

(cherry picked from commit 4203557)


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/18ff2ce1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/18ff2ce1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/18ff2ce1

Branch: refs/heads/release-1.5
Commit: 18ff2ce15bdb1e7bd246e438e47527a24559c86d
Parents: f145327
Author: Nico Kruber 
Authored: Mon Feb 26 17:50:10 2018 +0100
Committer: Stefan Richter 
Committed: Wed Feb 28 17:35:06 2018 +0100

--
 .../io/network/partition/consumer/UnionInputGate.java | 10 +-
 1 file changed, 5 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/18ff2ce1/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
index 393e087..44cdd52 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
@@ -38,7 +38,7 @@ import static org.apache.flink.util.Preconditions.checkState;
 /**
  * Input gate wrapper to union the input from multiple input gates.
  *
- *  Each input gate has input channels attached from which it reads data. 
At each input gate, the
+ * Each input gate has input channels attached from which it reads data. At 
each input gate, the
  * input channels have unique IDs from 0 (inclusive) to the number of input 
channels (exclusive).
  *
  * 
@@ -49,7 +49,7 @@ import static org.apache.flink.util.Preconditions.checkState;
  * +--+--+
  * 
  *
- * The union input gate maps these IDs from 0 to the *total* number of input 
channels across all
+ * The union input gate maps these IDs from 0 to the *total* number of 
input channels across all
  * unioned input gates, e.g. the channels of input gate 0 keep their original 
indexes and the
  * channel indexes of input gate 1 are set off by 2 to 2--4.
  *
@@ -183,11 +183,11 @@ public class UnionInputGate implements InputGate, 
InputGateListener {
bufferOrEvent.setChannelIndex(channelIndexOffset + 
bufferOrEvent.getChannelIndex());
bufferOrEvent.setMoreAvailable(bufferOrEvent.moreAvailable() || 
inputGateWithData.moreInputGatesAvailable);
 
-   return Optional.ofNullable(bufferOrEvent);
+   return Optional.of(bufferOrEvent);
}
 
@Override
-   public Optional pollNextBufferOrEvent() throws 
IOException, InterruptedException {
+   public Optional pollNextBufferOrEvent() throws 
UnsupportedOperationException {
throw new UnsupportedOperationException();
}
 
@@ -217,7 +217,7 @@ public class UnionInputGate implements InputGate, 
InputGateListener {
private final BufferOrEvent bufferOrEvent;
private final boolean moreInputGatesAvailable;
 
-   public InputGateWithData(InputGate inputGate, BufferOrEvent 
bufferOrEvent, boolean moreInputGatesAvailable) {
+   InputGateWithData(InputGate inputGate, BufferOrEvent 
bufferOrEvent, boolean moreInputGatesAvailable) {
this.inputGate = checkNotNull(inputGate);
this.bufferOrEvent = checkNotNull(bufferOrEvent);
this.moreInputGatesAvailable = moreInputGatesAvailable;



[2/2] flink git commit: [FLINK-8737][network] disallow creating a union of UnionInputGate instances

2018-02-28 Thread srichter
[FLINK-8737][network] disallow creating a union of UnionInputGate instances

Recently, pollNextBufferOrEvent() was added but not implemented. Because it is
used in getNextBufferOrEvent(), any UnionInputGate containing a UnionInputGate
would have failed already. There should be no use case for wiring up inputs
this way, so fail early when trying to construct such a union.

This closes #5583.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e8de5381
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e8de5381
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e8de5381

Branch: refs/heads/master
Commit: e8de53817c60917776c7264623cf8adfd7886f93
Parents: 4203557
Author: Nico Kruber 
Authored: Mon Feb 26 17:52:37 2018 +0100
Committer: Stefan Richter 
Committed: Wed Feb 28 17:33:48 2018 +0100

--
 .../runtime/io/network/partition/consumer/UnionInputGate.java | 7 ++-
 1 file changed, 6 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/e8de5381/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
index 44cdd52..742592a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
@@ -61,7 +61,7 @@ import static org.apache.flink.util.Preconditions.checkState;
  * ++
  * 
  *
- * It is possible to recursively union union input gates.
+ * It is NOT possible to recursively union union input gates.
  */
 public class UnionInputGate implements InputGate, InputGateListener {
 
@@ -103,6 +103,11 @@ public class UnionInputGate implements InputGate, 
InputGateListener {
int currentNumberOfInputChannels = 0;
 
for (InputGate inputGate : inputGates) {
+   if (inputGate instanceof UnionInputGate) {
+   // if we want to add support for this, we need 
to implement pollNextBufferOrEvent()
+   throw new UnsupportedOperationException("Cannot 
union a union of input gates.");
+   }
+
// The offset to use for buffer or event instances 
received from this input gate.
inputGateToIndexOffsetMap.put(checkNotNull(inputGate), 
currentNumberOfInputChannels);
inputGatesWithRemainingData.add(inputGate);



[1/2] flink git commit: [hotfix][network] minor improvements in UnionInputGate

2018-02-28 Thread srichter
Repository: flink
Updated Branches:
  refs/heads/master 6165b3db5 -> e8de53817


[hotfix][network] minor improvements in UnionInputGate


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/42035571
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/42035571
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/42035571

Branch: refs/heads/master
Commit: 420355715e43d5c870bd7f89808430fc959cedc4
Parents: 6165b3d
Author: Nico Kruber 
Authored: Mon Feb 26 17:50:10 2018 +0100
Committer: Stefan Richter 
Committed: Wed Feb 28 17:33:14 2018 +0100

--
 .../io/network/partition/consumer/UnionInputGate.java | 10 +-
 1 file changed, 5 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/42035571/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
index 393e087..44cdd52 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
@@ -38,7 +38,7 @@ import static org.apache.flink.util.Preconditions.checkState;
 /**
  * Input gate wrapper to union the input from multiple input gates.
  *
- *  Each input gate has input channels attached from which it reads data. 
At each input gate, the
+ * Each input gate has input channels attached from which it reads data. At 
each input gate, the
  * input channels have unique IDs from 0 (inclusive) to the number of input 
channels (exclusive).
  *
  * 
@@ -49,7 +49,7 @@ import static org.apache.flink.util.Preconditions.checkState;
  * +--+--+
  * 
  *
- * The union input gate maps these IDs from 0 to the *total* number of input 
channels across all
+ * The union input gate maps these IDs from 0 to the *total* number of 
input channels across all
  * unioned input gates, e.g. the channels of input gate 0 keep their original 
indexes and the
  * channel indexes of input gate 1 are set off by 2 to 2--4.
  *
@@ -183,11 +183,11 @@ public class UnionInputGate implements InputGate, 
InputGateListener {
bufferOrEvent.setChannelIndex(channelIndexOffset + 
bufferOrEvent.getChannelIndex());
bufferOrEvent.setMoreAvailable(bufferOrEvent.moreAvailable() || 
inputGateWithData.moreInputGatesAvailable);
 
-   return Optional.ofNullable(bufferOrEvent);
+   return Optional.of(bufferOrEvent);
}
 
@Override
-   public Optional pollNextBufferOrEvent() throws 
IOException, InterruptedException {
+   public Optional pollNextBufferOrEvent() throws 
UnsupportedOperationException {
throw new UnsupportedOperationException();
}
 
@@ -217,7 +217,7 @@ public class UnionInputGate implements InputGate, 
InputGateListener {
private final BufferOrEvent bufferOrEvent;
private final boolean moreInputGatesAvailable;
 
-   public InputGateWithData(InputGate inputGate, BufferOrEvent 
bufferOrEvent, boolean moreInputGatesAvailable) {
+   InputGateWithData(InputGate inputGate, BufferOrEvent 
bufferOrEvent, boolean moreInputGatesAvailable) {
this.inputGate = checkNotNull(inputGate);
this.bufferOrEvent = checkNotNull(bufferOrEvent);
this.moreInputGatesAvailable = moreInputGatesAvailable;



[1/2] flink git commit: [FLINK-8747][bugfix] The tag of waiting for floating buffers in RemoteInputChannel should be updated properly

2018-02-28 Thread srichter
Repository: flink
Updated Branches:
  refs/heads/release-1.5 d5338c415 -> f14532760


[FLINK-8747][bugfix] The tag of waiting for floating buffers in 
RemoteInputChannel should be updated properly

(cherry picked from commit 6e9e0dd)


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/32384ed9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/32384ed9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/32384ed9

Branch: refs/heads/release-1.5
Commit: 32384ed9b00cf0e1961d355dc4080f25a2156e58
Parents: d5338c4
Author: Zhijiang 
Authored: Thu Feb 22 15:41:38 2018 +0100
Committer: Stefan Richter 
Committed: Wed Feb 28 17:24:39 2018 +0100

--
 .../partition/consumer/RemoteInputChannel.java  |  6 +++
 .../consumer/RemoteInputChannelTest.java| 42 +++-
 2 files changed, 37 insertions(+), 11 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/32384ed9/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
index 8174359..990166f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
@@ -337,6 +337,11 @@ public class RemoteInputChannel extends InputChannel 
implements BufferRecycler,
return numRequiredBuffers - initialCredit;
}
 
+   @VisibleForTesting
+   public boolean isWaitingForFloatingBuffers() {
+   return isWaitingForFloatingBuffers;
+   }
+
/**
 * The Buffer pool notifies this channel of an available floating 
buffer. If the channel is released or
 * currently does not need extra buffers, the buffer should be recycled 
to the buffer pool. Otherwise,
@@ -362,6 +367,7 @@ public class RemoteInputChannel extends InputChannel 
implements BufferRecycler,
// Important: double check the isReleased state inside 
synchronized block, so there is no
// race condition when notifyBufferAvailable and 
releaseAllResources running in parallel.
if (isReleased.get() || 
bufferQueue.getAvailableBufferSize() >= numRequiredBuffers) {
+   isWaitingForFloatingBuffers = false;
buffer.recycleBuffer();
return false;
}

http://git-wip-us.apache.org/repos/asf/flink/blob/32384ed9/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
--
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
index 7c8ed18..e3e6623 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
@@ -357,6 +357,7 @@ public class RemoteInputChannelTest {
16, inputChannel.getNumberOfRequiredBuffers());
assertEquals("There should be 0 buffer available in 
local pool",
0, 
bufferPool.getNumberOfAvailableMemorySegments());
+   assertTrue(inputChannel.isWaitingForFloatingBuffers());
 
// Increase the backlog
inputChannel.onSenderBacklog(16);
@@ -370,11 +371,12 @@ public class RemoteInputChannelTest {
18, inputChannel.getNumberOfRequiredBuffers());
assertEquals("There should be 0 buffer available in 
local pool",
0, 
bufferPool.getNumberOfAvailableMemorySegments());
+   assertTrue(inputChannel.isWaitingForFloatingBuffers());
 
-   // Recycle one floating buffer
-   floatingBufferQueue.poll().recycleBuffer();
+   // Recycle one exclusive buffer
+   exclusiveBuffer.recycleBuffer();
 
-   // Assign the floating buffer to the listener and the 
channel is still waiting for more floating buffers
+  

[2/2] flink git commit: [hotfix] Fix package private and comments

2018-02-28 Thread srichter
[hotfix] Fix package private and comments

(cherry picked from commit 6165b3d)


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f1453276
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f1453276
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f1453276

Branch: refs/heads/release-1.5
Commit: f1453276095c55264f7b4097d16e2987a44b3f33
Parents: 32384ed
Author: Zhijiang 
Authored: Fri Feb 23 02:55:57 2018 +0100
Committer: Stefan Richter 
Committed: Wed Feb 28 17:24:43 2018 +0100

--
 .../partition/consumer/RemoteInputChannel.java|  2 +-
 .../consumer/RemoteInputChannelTest.java  | 18 +-
 2 files changed, 10 insertions(+), 10 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/f1453276/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
index 990166f..0f70d44 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
@@ -338,7 +338,7 @@ public class RemoteInputChannel extends InputChannel 
implements BufferRecycler,
}
 
@VisibleForTesting
-   public boolean isWaitingForFloatingBuffers() {
+   boolean isWaitingForFloatingBuffers() {
return isWaitingForFloatingBuffers;
}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f1453276/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
--
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
index e3e6623..97a5688 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
@@ -355,7 +355,7 @@ public class RemoteInputChannelTest {
13, inputChannel.getNumberOfAvailableBuffers());
assertEquals("There should be 16 buffers required in 
the channel",
16, inputChannel.getNumberOfRequiredBuffers());
-   assertEquals("There should be 0 buffer available in 
local pool",
+   assertEquals("There should be 0 buffers available in 
local pool",
0, 
bufferPool.getNumberOfAvailableMemorySegments());
assertTrue(inputChannel.isWaitingForFloatingBuffers());
 
@@ -369,7 +369,7 @@ public class RemoteInputChannelTest {
13, inputChannel.getNumberOfAvailableBuffers());
assertEquals("There should be 18 buffers required in 
the channel",
18, inputChannel.getNumberOfRequiredBuffers());
-   assertEquals("There should be 0 buffer available in 
local pool",
+   assertEquals("There should be 0 buffers available in 
local pool",
0, 
bufferPool.getNumberOfAvailableMemorySegments());
assertTrue(inputChannel.isWaitingForFloatingBuffers());
 
@@ -383,7 +383,7 @@ public class RemoteInputChannelTest {
14, inputChannel.getNumberOfAvailableBuffers());
assertEquals("There should be 18 buffers required in 
the channel",
18, inputChannel.getNumberOfRequiredBuffers());
-   assertEquals("There should be 0 buffer available in 
local pool",
+   assertEquals("There should be 0 buffers available in 
local pool",
0, 
bufferPool.getNumberOfAvailableMemorySegments());
assertTrue(inputChannel.isWaitingForFloatingBuffers());
 
@@ -397,7 +397,7 @@ public class RemoteInputChannelTest {
15, inputChannel.getNumberOfAvailableBuffers());
assertEquals("There should be 18 buffers required in 
the channel",
18, inputChannel.getNumberOfRequiredBuffers());
-   assert

[2/2] flink git commit: [hotfix] Fix package private and comments

2018-02-28 Thread srichter
[hotfix] Fix package private and comments


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6165b3db
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6165b3db
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6165b3db

Branch: refs/heads/master
Commit: 6165b3db5587170bed1a40bb1e5f2f3613f24e3f
Parents: 6e9e0dd
Author: Zhijiang 
Authored: Fri Feb 23 09:55:57 2018 +0800
Committer: Stefan Richter 
Committed: Wed Feb 28 17:23:06 2018 +0100

--
 .../partition/consumer/RemoteInputChannel.java|  2 +-
 .../consumer/RemoteInputChannelTest.java  | 18 +-
 2 files changed, 10 insertions(+), 10 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/6165b3db/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
index 990166f..0f70d44 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
@@ -338,7 +338,7 @@ public class RemoteInputChannel extends InputChannel 
implements BufferRecycler,
}
 
@VisibleForTesting
-   public boolean isWaitingForFloatingBuffers() {
+   boolean isWaitingForFloatingBuffers() {
return isWaitingForFloatingBuffers;
}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6165b3db/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
--
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
index e3e6623..97a5688 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
@@ -355,7 +355,7 @@ public class RemoteInputChannelTest {
13, inputChannel.getNumberOfAvailableBuffers());
assertEquals("There should be 16 buffers required in 
the channel",
16, inputChannel.getNumberOfRequiredBuffers());
-   assertEquals("There should be 0 buffer available in 
local pool",
+   assertEquals("There should be 0 buffers available in 
local pool",
0, 
bufferPool.getNumberOfAvailableMemorySegments());
assertTrue(inputChannel.isWaitingForFloatingBuffers());
 
@@ -369,7 +369,7 @@ public class RemoteInputChannelTest {
13, inputChannel.getNumberOfAvailableBuffers());
assertEquals("There should be 18 buffers required in 
the channel",
18, inputChannel.getNumberOfRequiredBuffers());
-   assertEquals("There should be 0 buffer available in 
local pool",
+   assertEquals("There should be 0 buffers available in 
local pool",
0, 
bufferPool.getNumberOfAvailableMemorySegments());
assertTrue(inputChannel.isWaitingForFloatingBuffers());
 
@@ -383,7 +383,7 @@ public class RemoteInputChannelTest {
14, inputChannel.getNumberOfAvailableBuffers());
assertEquals("There should be 18 buffers required in 
the channel",
18, inputChannel.getNumberOfRequiredBuffers());
-   assertEquals("There should be 0 buffer available in 
local pool",
+   assertEquals("There should be 0 buffers available in 
local pool",
0, 
bufferPool.getNumberOfAvailableMemorySegments());
assertTrue(inputChannel.isWaitingForFloatingBuffers());
 
@@ -397,7 +397,7 @@ public class RemoteInputChannelTest {
15, inputChannel.getNumberOfAvailableBuffers());
assertEquals("There should be 18 buffers required in 
the channel",
18, inputChannel.getNumberOfRequiredBuffers());
-   assertEquals("There should be 0 buffer available

[1/2] flink git commit: [FLINK-8747][bugfix] The tag of waiting for floating buffers in RemoteInputChannel should be updated properly

2018-02-28 Thread srichter
Repository: flink
Updated Branches:
  refs/heads/master b9b7416f4 -> 6165b3db5


[FLINK-8747][bugfix] The tag of waiting for floating buffers in 
RemoteInputChannel should be updated properly

This closes #5558.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6e9e0dd6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6e9e0dd6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6e9e0dd6

Branch: refs/heads/master
Commit: 6e9e0dd6eb3c4fdb1168cc4e294d9fa52641ddb0
Parents: b9b7416
Author: Zhijiang 
Authored: Thu Feb 22 22:41:38 2018 +0800
Committer: Stefan Richter 
Committed: Wed Feb 28 17:22:47 2018 +0100

--
 .../partition/consumer/RemoteInputChannel.java  |  6 +++
 .../consumer/RemoteInputChannelTest.java| 42 +++-
 2 files changed, 37 insertions(+), 11 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/6e9e0dd6/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
index 8174359..990166f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
@@ -337,6 +337,11 @@ public class RemoteInputChannel extends InputChannel 
implements BufferRecycler,
return numRequiredBuffers - initialCredit;
}
 
+   @VisibleForTesting
+   public boolean isWaitingForFloatingBuffers() {
+   return isWaitingForFloatingBuffers;
+   }
+
/**
 * The Buffer pool notifies this channel of an available floating 
buffer. If the channel is released or
 * currently does not need extra buffers, the buffer should be recycled 
to the buffer pool. Otherwise,
@@ -362,6 +367,7 @@ public class RemoteInputChannel extends InputChannel 
implements BufferRecycler,
// Important: double check the isReleased state inside 
synchronized block, so there is no
// race condition when notifyBufferAvailable and 
releaseAllResources running in parallel.
if (isReleased.get() || 
bufferQueue.getAvailableBufferSize() >= numRequiredBuffers) {
+   isWaitingForFloatingBuffers = false;
buffer.recycleBuffer();
return false;
}

http://git-wip-us.apache.org/repos/asf/flink/blob/6e9e0dd6/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
--
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
index 7c8ed18..e3e6623 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
@@ -357,6 +357,7 @@ public class RemoteInputChannelTest {
16, inputChannel.getNumberOfRequiredBuffers());
assertEquals("There should be 0 buffer available in 
local pool",
0, 
bufferPool.getNumberOfAvailableMemorySegments());
+   assertTrue(inputChannel.isWaitingForFloatingBuffers());
 
// Increase the backlog
inputChannel.onSenderBacklog(16);
@@ -370,11 +371,12 @@ public class RemoteInputChannelTest {
18, inputChannel.getNumberOfRequiredBuffers());
assertEquals("There should be 0 buffer available in 
local pool",
0, 
bufferPool.getNumberOfAvailableMemorySegments());
+   assertTrue(inputChannel.isWaitingForFloatingBuffers());
 
-   // Recycle one floating buffer
-   floatingBufferQueue.poll().recycleBuffer();
+   // Recycle one exclusive buffer
+   exclusiveBuffer.recycleBuffer();
 
-   // Assign the floating buffer to the listener and the 
channel is still waiting for more floating buffers
+   // The exclusi

[5/7] flink git commit: [FLINK-8694][runtime] Fix notifyDataAvailable race condition

2018-02-28 Thread srichter
[FLINK-8694][runtime] Fix notifyDataAvailable race condition

Before, there was a race condition that could result in ignoring some
notifyDataAvailable calls. This fixes the problem by moving the
buffersAvailable handling into the subpartitions and adds a stress test
for flushAlways (without this fix, that test deadlocks).
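The essence of the fix can be sketched in a few lines of plain Java (the class and method names here are hypothetical, not Flink code): caching availability in a separate boolean lets the flag go stale between a notification and a poll, whereas asking the data holder itself cannot miss a notification.

```java
import java.util.ArrayDeque;

// Hypothetical sketch, not Flink code: availability is derived from the
// queue itself instead of a cached boolean that a concurrent poll could
// leave stale (the essence of moving the handling into the subpartitions).
class AvailabilitySketch {
    private final ArrayDeque<Integer> buffers = new ArrayDeque<>();

    synchronized void add(int buffer) {
        buffers.add(buffer);
    }

    synchronized Integer poll() {
        return buffers.poll();
    }

    // Always consistent with the queue's real state; there is no separate
    // flag that a missed notifyDataAvailable call could leave out of sync.
    synchronized boolean isAvailable() {
        return !buffers.isEmpty();
    }
}
```

This mirrors the diff above, where CreditBasedSequenceNumberingViewReader drops its AtomicBoolean and delegates hasBuffersAvailable() to subpartitionView.isAvailable().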

(cherry picked from commit ebd39f3)


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8eb6a307
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8eb6a307
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8eb6a307

Branch: refs/heads/release-1.5
Commit: 8eb6a30798c09d171e3eb8019b53e677252bd5ba
Parents: 8e62f90
Author: Piotr Nowojski 
Authored: Fri Feb 23 11:28:20 2018 +0100
Committer: Stefan Richter 
Committed: Wed Feb 28 17:17:02 2018 +0100

--
 .../CreditBasedSequenceNumberingViewReader.java | 10 +---
 .../netty/SequenceNumberingViewReader.java  |  7 +--
 .../partition/PipelinedSubpartition.java| 37 --
 .../partition/PipelinedSubpartitionView.java|  5 ++
 .../partition/ResultSubpartitionView.java   |  2 +
 .../partition/SpillableSubpartition.java|  1 -
 .../partition/SpillableSubpartitionView.java| 28 ---
 .../partition/SpilledSubpartitionView.java  |  8 +++
 .../network/buffer/BufferBuilderTestUtils.java  |  4 ++
 .../netty/CancelPartitionRequestTest.java   |  5 ++
 .../netty/PartitionRequestQueueTest.java| 26 --
 .../partition/PipelinedSubpartitionTest.java| 53 
 .../partition/SpillableSubpartitionTest.java|  9 ++--
 .../network/partition/SubpartitionTestBase.java |  5 ++
 .../StreamNetworkThroughputBenchmarkTests.java  |  8 +++
 15 files changed, 173 insertions(+), 35 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/8eb6a307/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java
index d02b2bf..9acbbac 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java
@@ -30,7 +30,6 @@ import 
org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
 import 
org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel;
 
 import java.io.IOException;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * Simple wrapper for the subpartition view used in the new network 
credit-based mode.
@@ -44,8 +43,6 @@ class CreditBasedSequenceNumberingViewReader implements 
BufferAvailabilityListen
 
private final InputChannelID receiverId;
 
-   private final AtomicBoolean buffersAvailable = new AtomicBoolean();
-
private final PartitionRequestQueue requestQueue;
 
private volatile ResultSubpartitionView subpartitionView;
@@ -118,7 +115,7 @@ class CreditBasedSequenceNumberingViewReader implements 
BufferAvailabilityListen
@Override
public boolean isAvailable() {
// BEWARE: this must be in sync with #isAvailable()!
-   return buffersAvailable.get() &&
+   return hasBuffersAvailable() &&
(numCreditsAvailable > 0 || 
subpartitionView.nextBufferIsEvent());
}
 
@@ -154,14 +151,13 @@ class CreditBasedSequenceNumberingViewReader implements 
BufferAvailabilityListen
 
@VisibleForTesting
boolean hasBuffersAvailable() {
-   return buffersAvailable.get();
+   return subpartitionView.isAvailable();
}
 
@Override
public BufferAndAvailability getNextBuffer() throws IOException, 
InterruptedException {
BufferAndBacklog next = subpartitionView.getNextBuffer();
if (next != null) {
-   buffersAvailable.set(next.isMoreAvailable());
sequenceNumber++;
 
if (next.buffer().isBuffer() && --numCreditsAvailable < 
0) {
@@ -197,7 +193,6 @@ class CreditBasedSequenceNumberingViewReader implements 
BufferAvailabilityListen
 
@Override
public void notifyDataAvailable() {
-   buffersAvailable.set(true);
requestQueue.notifyReaderNonEmpty(this);
}
 
@@ -206,7 +201,6 @@ class CreditBasedSequenceNumberingViewReader implements 
BufferAvailabilityListen
return "CreditBasedSequenceN

[7/7] flink git commit: [FLINK-8750][runtime] Improve detection of no remaining data after EndOfPartitionEvent

2018-02-28 Thread srichter
[FLINK-8750][runtime] Improve detection of no remaining data after 
EndOfPartitionEvent

Because of a race condition between:
  1. releasing the inputChannelsWithData lock in this method and reaching this place
  2. an empty-data notification that re-enqueues a channel
we can end up with the moreAvailable flag set to true while we expect no more data.

This commit detects such a situation, adds a correct assertion, and turns off
the moreAvailable flag.
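The guard this commit adds can be illustrated with a minimal sketch (simplified, hypothetical names; not the actual SingleInputGate): once every channel has delivered its EndOfPartitionEvent, a still-set moreAvailable flag is tolerated only if the queue is really empty, and it is then cleared.

```java
import java.util.ArrayDeque;
import java.util.Optional;

// Simplified illustration of the checkState() added to SingleInputGate;
// names are abbreviated and this is not the real Flink class.
class EndOfPartitionGuard {
    private final ArrayDeque<String> queue = new ArrayDeque<>();
    boolean moreAvailable;

    Optional<String> pollNextBufferOrEvent() {
        return Optional.ofNullable(queue.poll());
    }

    void onAllEndOfPartitionEventsReceived() {
        // A stale flag left over from the re-enqueue race is tolerated
        // only when no data actually remains; leftover data is an error.
        if (moreAvailable && pollNextBufferOrEvent().isPresent()) {
            throw new IllegalStateException("Data left after EndOfPartitionEvent");
        }
        moreAvailable = false;
    }
}
```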

(cherry picked from commit b9b7416)


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d5338c41
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d5338c41
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d5338c41

Branch: refs/heads/release-1.5
Commit: d5338c4154e5de029b3b30e3ef0a0732bf7f68e7
Parents: 61a34a6
Author: Piotr Nowojski 
Authored: Tue Feb 27 10:39:00 2018 +0100
Committer: Stefan Richter 
Committed: Wed Feb 28 17:17:16 2018 +0100

--
 .../runtime/io/network/partition/consumer/SingleInputGate.java | 6 ++
 1 file changed, 6 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/d5338c41/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index be4035c..b9091b2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -554,6 +554,12 @@ public class SingleInputGate implements InputGate {

channelsWithEndOfPartitionEvents.set(currentChannel.getChannelIndex());
 
if 
(channelsWithEndOfPartitionEvents.cardinality() == numberOfInputChannels) {
+   // Because of race condition between:
+   // 1. releasing inputChannelsWithData 
lock in this method and reaching this place
+   // 2. empty data notification that 
re-enqueues a channel
+   // we can end up with moreAvailable 
flag set to true, while we expect no more data.
+   checkState(!moreAvailable || 
!pollNextBufferOrEvent().isPresent());
+   moreAvailable = false;
hasReceivedAllEndOfPartitionEvents = 
true;
}
 



[1/7] flink git commit: [hotfix][tests] Deduplicate code in SingleInputGateTest

2018-02-28 Thread srichter
Repository: flink
Updated Branches:
  refs/heads/release-1.5 623e94459 -> d5338c415


[hotfix][tests] Deduplicate code in SingleInputGateTest

(cherry picked from commit 67a547a)


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bb459cc6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bb459cc6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bb459cc6

Branch: refs/heads/release-1.5
Commit: bb459cc68f8dc4bd042b61e365e583d4e96b3e0e
Parents: 623e944
Author: Piotr Nowojski 
Authored: Fri Feb 23 11:37:37 2018 +0100
Committer: Stefan Richter 
Committed: Wed Feb 28 17:16:44 2018 +0100

--
 .../partition/consumer/SingleInputGateTest.java | 66 
 1 file changed, 25 insertions(+), 41 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/bb459cc6/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
--
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index 0dd0875..e94411d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -77,14 +77,7 @@ public class SingleInputGateTest {
@Test(timeout = 120 * 1000)
public void testBasicGetNextLogic() throws Exception {
// Setup
-   final SingleInputGate inputGate = new SingleInputGate(
-   "Test Task Name", new JobID(),
-   new IntermediateDataSetID(), 
ResultPartitionType.PIPELINED,
-   0, 2,
-   mock(TaskActions.class),
-   
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
-
-   assertEquals(ResultPartitionType.PIPELINED, 
inputGate.getConsumedPartitionType());
+   final SingleInputGate inputGate = createInputGate();
 
final TestInputChannel[] inputChannels = new TestInputChannel[]{
new TestInputChannel(inputGate, 0),
@@ -135,14 +128,8 @@ public class SingleInputGateTest {

any(BufferAvailabilityListener.class))).thenReturn(iterator);
 
// Setup reader with one local and one unknown input channel
-   final IntermediateDataSetID resultId = new 
IntermediateDataSetID();
 
-   final SingleInputGate inputGate = new SingleInputGate(
-   "Test Task Name", new JobID(),
-   resultId, ResultPartitionType.PIPELINED,
-   0, 2,
-   mock(TaskActions.class),
-   
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
+   final SingleInputGate inputGate = createInputGate();
final BufferPool bufferPool = mock(BufferPool.class);

when(bufferPool.getNumberOfRequiredMemorySegments()).thenReturn(2);
 
@@ -190,14 +177,7 @@ public class SingleInputGateTest {
 */
@Test
public void testUpdateChannelBeforeRequest() throws Exception {
-   SingleInputGate inputGate = new SingleInputGate(
-   "t1",
-   new JobID(),
-   new IntermediateDataSetID(),
-   ResultPartitionType.PIPELINED,
-   0,
-   1,
-   mock(TaskActions.class), 
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
+   SingleInputGate inputGate = createInputGate(1);
 
ResultPartitionManager partitionManager = 
mock(ResultPartitionManager.class);
 
@@ -230,15 +210,7 @@ public class SingleInputGateTest {
final AtomicReference asyncException = new 
AtomicReference<>();
 
// Setup the input gate with a single channel that does nothing
-   final SingleInputGate inputGate = new SingleInputGate(
-   "InputGate",
-   new JobID(),
-   new IntermediateDataSetID(),
-   ResultPartitionType.PIPELINED,
-   0,
-   1,
-   mock(TaskActions.class),
-   
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
+   final SingleInputGate inputGate = createInp

[3/7] flink git commit: [FLINK-8760][runtime] Correctly propagate moreAvailable flag through SingleInputGate

2018-02-28 Thread srichter
[FLINK-8760][runtime] Correctly propagate moreAvailable flag through 
SingleInputGate

Previously, if the SingleInputGate was re-enqueuing an input channel,
isMoreAvailable might incorrectly return false. This could cause deadlocks.
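The propagation part of the fix reduces to a single disjunction, shown below in condensed form (the helper method name is illustrative): the flag handed to the consumer must be true if either the polled buffer reports more data in its own gate or other unioned gates are still non-empty; trusting only the first source is what allowed consumers to stall.

```java
// Condensed form of the UnionInputGate change, which sets
// bufferOrEvent.setMoreAvailable(bufferOrEvent.moreAvailable()
//     || inputGateWithData.moreInputGatesAvailable).
class MoreAvailableSketch {
    static boolean effectiveMoreAvailable(
            boolean bufferMoreAvailable,
            boolean moreInputGatesAvailable) {
        // A false per-buffer flag must not hide data waiting in other gates.
        return bufferMoreAvailable || moreInputGatesAvailable;
    }
}
```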

(cherry picked from commit 6c9e267)


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/651462e6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/651462e6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/651462e6

Branch: refs/heads/release-1.5
Commit: 651462e6b22c51ce14bd9ea6db389ef6a1f38e55
Parents: 6b7a448
Author: Piotr Nowojski 
Authored: Fri Feb 23 11:20:21 2018 +0100
Committer: Stefan Richter 
Committed: Wed Feb 28 17:16:55 2018 +0100

--
 .../partition/consumer/BufferOrEvent.java   |  6 +-
 .../partition/consumer/SingleInputGate.java |  1 +
 .../partition/consumer/UnionInputGate.java  |  9 ++-
 .../partition/consumer/SingleInputGateTest.java | 62 
 .../partition/consumer/TestInputChannel.java| 14 -
 .../partition/consumer/UnionInputGateTest.java  | 33 ++-
 6 files changed, 92 insertions(+), 33 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/651462e6/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferOrEvent.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferOrEvent.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferOrEvent.java
index 3e93ae6..d1da438 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferOrEvent.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferOrEvent.java
@@ -39,7 +39,7 @@ public class BufferOrEvent {
 * This is not needed outside of the input gate unioning logic and 
cannot
 * be set outside of the consumer package.
 */
-   private final boolean moreAvailable;
+   private boolean moreAvailable;
 
private int channelIndex;
 
@@ -99,4 +99,8 @@ public class BufferOrEvent {
return String.format("BufferOrEvent [%s, channelIndex = %d]",
isBuffer() ? buffer : event, channelIndex);
}
+
+   public void setMoreAvailable(boolean moreAvailable) {
+   this.moreAvailable = moreAvailable;
+   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/651462e6/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index a1f3cdc..be4035c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -540,6 +540,7 @@ public class SingleInputGate implements InputGate {
// will come for that channel
if (result.get().moreAvailable()) {
queueChannel(currentChannel);
+   moreAvailable = true;
}
 
final Buffer buffer = result.get().buffer();

http://git-wip-us.apache.org/repos/asf/flink/blob/651462e6/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
index 481599c..393e087 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
@@ -181,6 +181,7 @@ public class UnionInputGate implements InputGate, 
InputGateListener {
final int channelIndexOffset = 
inputGateToIndexOffsetMap.get(inputGate);
 
bufferOrEvent.setChannelIndex(channelIndexOffset + 
bufferOrEvent.getChannelIndex());
+   bufferOrEvent.setMoreAvailable(bufferOrEvent.moreAvailable() || 
inputGateWithData.moreInputGatesAvailable);
 
return Optional.ofNullable(bufferOrEvent);
}
@@ -193,18 +194,20 @@ public class UnionInputGate i

[6/7] flink git commit: [FLINK-8805][runtime] Optimize EventSerializer.isEvent method

2018-02-28 Thread srichter
[FLINK-8805][runtime] Optimize EventSerializer.isEvent method

For example, previously, if the method was used to check for EndOfPartitionEvent
and the Buffer contained a huge custom event, the event had to be fully
deserialized before performing the actual check. Now we quickly enter the
correct if/else branch and perform the full, costly deserialization only if
we have to.

Other calls to isEvent() than checking against EndOfPartitionEvent were not
used.
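The shape of the optimization is: peek at the leading 4-byte type tag, restore the buffer position, and let only a custom (OTHER_EVENT) payload force real deserialization. The sketch below uses an assumed tag value, not Flink's actual constant; the point is the cheap leading-int comparison.

```java
import java.nio.ByteBuffer;

// Illustrative sketch; the tag value is an assumption, not Flink's actual
// constant. Only the 4-byte type tag is read -- no class loading and no
// full deserialization of a potentially huge custom event.
class EventTagCheck {
    static final int END_OF_PARTITION_EVENT = 0; // assumed tag value

    static boolean isEndOfPartition(ByteBuffer buffer) {
        if (buffer.remaining() < 4) {
            throw new IllegalArgumentException("Incomplete event");
        }
        final int pos = buffer.position();
        try {
            // Cheap check: compare the leading type tag only.
            return buffer.getInt() == END_OF_PARTITION_EVENT;
        } finally {
            buffer.position(pos); // leave the buffer as we found it
        }
    }
}
```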

(cherry picked from commit 767027f)


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/61a34a69
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/61a34a69
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/61a34a69

Branch: refs/heads/release-1.5
Commit: 61a34a691e7d5233f18ac72a1ab8fb09b53c4753
Parents: 8eb6a30
Author: Piotr Nowojski 
Authored: Mon Feb 26 16:13:06 2018 +0100
Committer: Stefan Richter 
Committed: Wed Feb 28 17:17:04 2018 +0100

--
 .../api/serialization/EventSerializer.java  | 57 ++--
 .../io/network/netty/PartitionRequestQueue.java |  3 +-
 .../api/serialization/EventSerializerTest.java  | 39 +-
 3 files changed, 42 insertions(+), 57 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/61a34a69/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
index d7fb7e8..8d76bb2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
@@ -101,16 +101,15 @@ public class EventSerializer {
}
 
/**
-* Identifies whether the given buffer encodes the given event.
+* Identifies whether the given buffer encodes the given event. Custom 
events are not supported.
 *
 * Pre-condition: This buffer must encode some 
event!
 *
 * @param buffer the buffer to peak into
 * @param eventClass the expected class of the event type
-* @param classLoader the class loader to use for custom event classes
 * @return whether the event class of the buffer matches the 
given eventClass
 */
-   private static boolean isEvent(ByteBuffer buffer, Class eventClass, 
ClassLoader classLoader) throws IOException {
+   private static boolean isEvent(ByteBuffer buffer, Class eventClass) 
throws IOException {
if (buffer.remaining() < 4) {
throw new IOException("Incomplete event");
}
@@ -122,38 +121,16 @@ public class EventSerializer {
try {
int type = buffer.getInt();
 
-   switch (type) {
-   case END_OF_PARTITION_EVENT:
-   return 
eventClass.equals(EndOfPartitionEvent.class);
-   case CHECKPOINT_BARRIER_EVENT:
-   return 
eventClass.equals(CheckpointBarrier.class);
-   case END_OF_SUPERSTEP_EVENT:
-   return 
eventClass.equals(EndOfSuperstepEvent.class);
-   case CANCEL_CHECKPOINT_MARKER_EVENT:
-   return 
eventClass.equals(CancelCheckpointMarker.class);
-   case OTHER_EVENT:
-   try {
-   final DataInputDeserializer 
deserializer = new DataInputDeserializer(buffer);
-   final String className = 
deserializer.readUTF();
-
-   final Class clazz;
-   try {
-   clazz = 
classLoader.loadClass(className).asSubclass(AbstractEvent.class);
-   }
-   catch (ClassNotFoundException 
e) {
-   throw new 
IOException("Could not load event class '" + className + "'.", e);
-   }
-   catch (ClassCastException e) {
-   throw new 
IOException("The class '" + className + "' is not a valid subclass of '"
-  

[4/7] flink git commit: [hotfix][tests] Do not hide original exception in SuccessAfterNetworkBuffersFailureITCase

2018-02-28 Thread srichter
[hotfix][tests] Do not hide original exception in 
SuccessAfterNetworkBuffersFailureITCase

(cherry picked from commit 2c2e189)


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8e62f907
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8e62f907
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8e62f907

Branch: refs/heads/release-1.5
Commit: 8e62f90739e2319491df983917dc7ab484de2550
Parents: 651462e
Author: Piotr Nowojski 
Authored: Fri Feb 23 11:27:54 2018 +0100
Committer: Stefan Richter 
Committed: Wed Feb 28 17:16:58 2018 +0100

--
 ...SuccessAfterNetworkBuffersFailureITCase.java | 40 ++--
 1 file changed, 11 insertions(+), 29 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/8e62f907/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
--
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
index dc19ad1..dbd0f79 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
@@ -65,38 +65,20 @@ public class SuccessAfterNetworkBuffersFailureITCase 
extends TestLogger {
}
 
@Test
-   public void testSuccessfulProgramAfterFailure() {
+   public void testSuccessfulProgramAfterFailure() throws Exception {
+   ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+   runConnectedComponents(env);
+
try {
-   ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-   try {
-   runConnectedComponents(env);
-   }
-   catch (Exception e) {
-   e.printStackTrace();
-   fail("Program Execution should have 
succeeded.");
-   }
-
-   try {
-   runKMeans(env);
-   fail("This program execution should have 
failed.");
-   }
-   catch (JobExecutionException e) {
-   
assertTrue(e.getCause().getMessage().contains("Insufficient number of network 
buffers"));
-   }
-
-   try {
-   runConnectedComponents(env);
-   }
-   catch (Exception e) {
-   e.printStackTrace();
-   fail("Program Execution should have 
succeeded.");
-   }
+   runKMeans(env);
+   fail("This program execution should have failed.");
}
-   catch (Exception e) {
-   e.printStackTrace();
-   fail(e.getMessage());
+   catch (JobExecutionException e) {
+   
assertTrue(e.getCause().getMessage().contains("Insufficient number of network 
buffers"));
}
+
+   runConnectedComponents(env);
}
 
private static void runConnectedComponents(ExecutionEnvironment env) 
throws Exception {



[2/7] flink git commit: [hotfix][runtime] Remove duplicated check

2018-02-28 Thread srichter
[hotfix][runtime] Remove duplicated check

(cherry picked from commit 42f71f6)


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6b7a4480
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6b7a4480
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6b7a4480

Branch: refs/heads/release-1.5
Commit: 6b7a4480ef8610df3ff21eb2811b9a0a3c58c912
Parents: bb459cc
Author: Piotr Nowojski 
Authored: Fri Feb 23 12:11:14 2018 +0100
Committer: Stefan Richter 
Committed: Wed Feb 28 17:16:51 2018 +0100

--
 .../runtime/io/network/partition/consumer/UnionInputGate.java | 7 ---
 1 file changed, 7 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/6b7a4480/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
index 5a547ea..481599c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
@@ -177,13 +177,6 @@ public class UnionInputGate implements InputGate, 
InputGateListener {
}
}
 
-   if (bufferOrEvent.moreAvailable()) {
-   // this buffer or event was now removed from the 
non-empty gates queue
-   // we re-add it in case it has more data, because in 
that case no "non-empty" notification
-   // will come for that gate
-   queueInputGate(inputGate);
-   }
-
// Set the channel index to identify the input channel (across 
all unioned input gates)
final int channelIndexOffset = 
inputGateToIndexOffsetMap.get(inputGate);
 



[7/7] flink git commit: [FLINK-8750][runtime] Improve detection of no remaining data after EndOfPartitionEvent

2018-02-28 Thread srichter
[FLINK-8750][runtime] Improve detection of no remaining data after 
EndOfPartitionEvent

Because of a race condition between:
  1. releasing the inputChannelsWithData lock in this method and reaching this place
  2. an empty-data notification that re-enqueues a channel
we can end up with the moreAvailable flag set to true while we expect no more data.

This commit detects such a situation, adds a correct assertion, and turns off
the moreAvailable flag.

This closes #5588.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b9b7416f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b9b7416f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b9b7416f

Branch: refs/heads/master
Commit: b9b7416f4d6a708d22029a5e971af5b1f67e3296
Parents: 767027f
Author: Piotr Nowojski 
Authored: Tue Feb 27 10:39:00 2018 +0100
Committer: Stefan Richter 
Committed: Wed Feb 28 17:13:24 2018 +0100

--
 .../runtime/io/network/partition/consumer/SingleInputGate.java | 6 ++
 1 file changed, 6 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/b9b7416f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index be4035c..b9091b2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -554,6 +554,12 @@ public class SingleInputGate implements InputGate {

channelsWithEndOfPartitionEvents.set(currentChannel.getChannelIndex());
 
if 
(channelsWithEndOfPartitionEvents.cardinality() == numberOfInputChannels) {
+   // Because of race condition between:
+   // 1. releasing inputChannelsWithData 
lock in this method and reaching this place
+   // 2. empty data notification that 
re-enqueues a channel
+   // we can end up with moreAvailable 
flag set to true, while we expect no more data.
+   checkState(!moreAvailable || 
!pollNextBufferOrEvent().isPresent());
+   moreAvailable = false;
hasReceivedAllEndOfPartitionEvents = 
true;
}
 



[3/7] flink git commit: [hotfix][runtime] Remove duplicated check

2018-02-28 Thread srichter
[hotfix][runtime] Remove duplicated check


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/42f71f61
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/42f71f61
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/42f71f61

Branch: refs/heads/master
Commit: 42f71f61c1ae683cd39887cb3d921c0d0cb67619
Parents: 67a547a
Author: Piotr Nowojski 
Authored: Fri Feb 23 12:11:14 2018 +0100
Committer: Stefan Richter 
Committed: Wed Feb 28 17:12:52 2018 +0100

--
 .../runtime/io/network/partition/consumer/UnionInputGate.java | 7 ---
 1 file changed, 7 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/42f71f61/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
index 5a547ea..481599c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
@@ -177,13 +177,6 @@ public class UnionInputGate implements InputGate, 
InputGateListener {
}
}
 
-   if (bufferOrEvent.moreAvailable()) {
-   // this buffer or event was now removed from the 
non-empty gates queue
-   // we re-add it in case it has more data, because in 
that case no "non-empty" notification
-   // will come for that gate
-   queueInputGate(inputGate);
-   }
-
// Set the channel index to identify the input channel (across 
all unioned input gates)
final int channelIndexOffset = 
inputGateToIndexOffsetMap.get(inputGate);
 



[5/7] flink git commit: [FLINK-8760][runtime] Correctly propagate moreAvailable flag through SingleInputGate

2018-02-28 Thread srichter
[FLINK-8760][runtime] Correctly propagate moreAvailable flag through 
SingleInputGate

Previously, if the SingleInputGate was re-enqueuing an input channel,
isMoreAvailable might incorrectly return false. This could cause deadlocks.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6c9e2671
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6c9e2671
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6c9e2671

Branch: refs/heads/master
Commit: 6c9e26713502a5256520ca3c1619e1da952666d9
Parents: 42f71f6
Author: Piotr Nowojski 
Authored: Fri Feb 23 11:20:21 2018 +0100
Committer: Stefan Richter 
Committed: Wed Feb 28 17:12:52 2018 +0100

--
 .../partition/consumer/BufferOrEvent.java   |  6 +-
 .../partition/consumer/SingleInputGate.java |  1 +
 .../partition/consumer/UnionInputGate.java  |  9 ++-
 .../partition/consumer/SingleInputGateTest.java | 62 
 .../partition/consumer/TestInputChannel.java| 14 -
 .../partition/consumer/UnionInputGateTest.java  | 33 ++-
 6 files changed, 92 insertions(+), 33 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/6c9e2671/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferOrEvent.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferOrEvent.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferOrEvent.java
index 3e93ae6..d1da438 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferOrEvent.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferOrEvent.java
@@ -39,7 +39,7 @@ public class BufferOrEvent {
 * This is not needed outside of the input gate unioning logic and 
cannot
 * be set outside of the consumer package.
 */
-   private final boolean moreAvailable;
+   private boolean moreAvailable;
 
private int channelIndex;
 
@@ -99,4 +99,8 @@ public class BufferOrEvent {
return String.format("BufferOrEvent [%s, channelIndex = %d]",
isBuffer() ? buffer : event, channelIndex);
}
+
+   public void setMoreAvailable(boolean moreAvailable) {
+   this.moreAvailable = moreAvailable;
+   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6c9e2671/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index a1f3cdc..be4035c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -540,6 +540,7 @@ public class SingleInputGate implements InputGate {
// will come for that channel
if (result.get().moreAvailable()) {
queueChannel(currentChannel);
+   moreAvailable = true;
}
 
final Buffer buffer = result.get().buffer();

http://git-wip-us.apache.org/repos/asf/flink/blob/6c9e2671/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
index 481599c..393e087 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
@@ -181,6 +181,7 @@ public class UnionInputGate implements InputGate, 
InputGateListener {
final int channelIndexOffset = 
inputGateToIndexOffsetMap.get(inputGate);
 
bufferOrEvent.setChannelIndex(channelIndexOffset + 
bufferOrEvent.getChannelIndex());
+   bufferOrEvent.setMoreAvailable(bufferOrEvent.moreAvailable() || 
inputGateWithData.moreInputGatesAvailable);
 
return Optional.ofNullable(bufferOrEvent);
}
@@ -193,18 +194,20 @@ public class UnionInputGate implements InputGate, 
InputGateListener {

[4/7] flink git commit: [hotfix][tests] Deduplicate code in SingleInputGateTest

2018-02-28 Thread srichter
[hotfix][tests] Deduplicate code in SingleInputGateTest


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/67a547ad
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/67a547ad
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/67a547ad

Branch: refs/heads/master
Commit: 67a547ad438c33d3fbcbe23cd03f009fdc8dd021
Parents: af8efe9
Author: Piotr Nowojski 
Authored: Fri Feb 23 11:37:37 2018 +0100
Committer: Stefan Richter 
Committed: Wed Feb 28 17:12:52 2018 +0100

--
 .../partition/consumer/SingleInputGateTest.java | 66 
 1 file changed, 25 insertions(+), 41 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/67a547ad/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
--
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index 0dd0875..e94411d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -77,14 +77,7 @@ public class SingleInputGateTest {
@Test(timeout = 120 * 1000)
public void testBasicGetNextLogic() throws Exception {
// Setup
-   final SingleInputGate inputGate = new SingleInputGate(
-   "Test Task Name", new JobID(),
-   new IntermediateDataSetID(), 
ResultPartitionType.PIPELINED,
-   0, 2,
-   mock(TaskActions.class),
-   
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
-
-   assertEquals(ResultPartitionType.PIPELINED, 
inputGate.getConsumedPartitionType());
+   final SingleInputGate inputGate = createInputGate();
 
final TestInputChannel[] inputChannels = new TestInputChannel[]{
new TestInputChannel(inputGate, 0),
@@ -135,14 +128,8 @@ public class SingleInputGateTest {

any(BufferAvailabilityListener.class))).thenReturn(iterator);
 
// Setup reader with one local and one unknown input channel
-   final IntermediateDataSetID resultId = new 
IntermediateDataSetID();
 
-   final SingleInputGate inputGate = new SingleInputGate(
-   "Test Task Name", new JobID(),
-   resultId, ResultPartitionType.PIPELINED,
-   0, 2,
-   mock(TaskActions.class),
-   
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
+   final SingleInputGate inputGate = createInputGate();
final BufferPool bufferPool = mock(BufferPool.class);

when(bufferPool.getNumberOfRequiredMemorySegments()).thenReturn(2);
 
@@ -190,14 +177,7 @@ public class SingleInputGateTest {
 */
@Test
public void testUpdateChannelBeforeRequest() throws Exception {
-   SingleInputGate inputGate = new SingleInputGate(
-   "t1",
-   new JobID(),
-   new IntermediateDataSetID(),
-   ResultPartitionType.PIPELINED,
-   0,
-   1,
-   mock(TaskActions.class), 
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
+   SingleInputGate inputGate = createInputGate(1);
 
ResultPartitionManager partitionManager = 
mock(ResultPartitionManager.class);
 
@@ -230,15 +210,7 @@ public class SingleInputGateTest {
final AtomicReference asyncException = new 
AtomicReference<>();
 
// Setup the input gate with a single channel that does nothing
-   final SingleInputGate inputGate = new SingleInputGate(
-   "InputGate",
-   new JobID(),
-   new IntermediateDataSetID(),
-   ResultPartitionType.PIPELINED,
-   0,
-   1,
-   mock(TaskActions.class),
-   
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
+   final SingleInputGate inputGate = createInputGate(1);
 
InputChannel unknown = new UnknownInputChannel(
inputGate,
@@ -410,15 +382,

[2/7] flink git commit: [FLINK-8694][runtime] Fix notifyDataAvailable race condition

2018-02-28 Thread srichter
[FLINK-8694][runtime] Fix notifyDataAvailable race condition

Before this change, a race condition could result in some notifyDataAvailable
calls being ignored.
This fixes the problem by moving the buffersAvailable handling into the
subpartitions and adds a stress test
for flushAlways (without this fix, that test deadlocks).
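The pattern behind the fix can be sketched in isolation (this is a hypothetical simplification, not the actual Flink code; class and method names are invented): instead of the reader caching availability in its own flag, which can go stale between a poll and a concurrent notification, availability is always derived from the subpartition itself, the single source of truth.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical sketch: the reader delegates availability checks to the
// subpartition view instead of caching a buffersAvailable flag, so a
// concurrent data-availability notification can never be lost against a
// stale cached value.
class SubpartitionView {
    private final Queue<String> buffers = new ArrayDeque<>();

    synchronized void add(String buffer) {
        buffers.add(buffer);
    }

    synchronized String pollNext() {
        return buffers.poll();
    }

    // Single source of truth for availability.
    synchronized boolean isAvailable() {
        return !buffers.isEmpty();
    }
}

class Reader {
    private final SubpartitionView view;

    Reader(SubpartitionView view) {
        this.view = view;
    }

    // Delegates instead of reading a cached flag.
    boolean hasBuffersAvailable() {
        return view.isAvailable();
    }

    String getNextBuffer() {
        return view.pollNext();
    }
}
```

This mirrors the diff below, where `hasBuffersAvailable()` now returns `subpartitionView.isAvailable()` and the reader's `AtomicBoolean buffersAvailable` field is removed.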


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ebd39f32
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ebd39f32
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ebd39f32

Branch: refs/heads/master
Commit: ebd39f3244a7c56f0d9a3cc4dba4d3f50efb36ad
Parents: 2c2e189
Author: Piotr Nowojski 
Authored: Fri Feb 23 11:28:20 2018 +0100
Committer: Stefan Richter 
Committed: Wed Feb 28 17:12:52 2018 +0100

--
 .../CreditBasedSequenceNumberingViewReader.java | 10 +---
 .../netty/SequenceNumberingViewReader.java  |  7 +--
 .../partition/PipelinedSubpartition.java| 37 --
 .../partition/PipelinedSubpartitionView.java|  5 ++
 .../partition/ResultSubpartitionView.java   |  2 +
 .../partition/SpillableSubpartition.java|  1 -
 .../partition/SpillableSubpartitionView.java| 28 ---
 .../partition/SpilledSubpartitionView.java  |  8 +++
 .../network/buffer/BufferBuilderTestUtils.java  |  4 ++
 .../netty/CancelPartitionRequestTest.java   |  5 ++
 .../netty/PartitionRequestQueueTest.java| 26 --
 .../partition/PipelinedSubpartitionTest.java| 53 
 .../partition/SpillableSubpartitionTest.java|  9 ++--
 .../network/partition/SubpartitionTestBase.java |  5 ++
 .../StreamNetworkThroughputBenchmarkTests.java  |  8 +++
 15 files changed, 173 insertions(+), 35 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/ebd39f32/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java
index d02b2bf..9acbbac 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java
@@ -30,7 +30,6 @@ import 
org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
 import 
org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel;
 
 import java.io.IOException;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * Simple wrapper for the subpartition view used in the new network 
credit-based mode.
@@ -44,8 +43,6 @@ class CreditBasedSequenceNumberingViewReader implements 
BufferAvailabilityListen
 
private final InputChannelID receiverId;
 
-   private final AtomicBoolean buffersAvailable = new AtomicBoolean();
-
private final PartitionRequestQueue requestQueue;
 
private volatile ResultSubpartitionView subpartitionView;
@@ -118,7 +115,7 @@ class CreditBasedSequenceNumberingViewReader implements 
BufferAvailabilityListen
@Override
public boolean isAvailable() {
// BEWARE: this must be in sync with #isAvailable()!
-   return buffersAvailable.get() &&
+   return hasBuffersAvailable() &&
(numCreditsAvailable > 0 || 
subpartitionView.nextBufferIsEvent());
}
 
@@ -154,14 +151,13 @@ class CreditBasedSequenceNumberingViewReader implements 
BufferAvailabilityListen
 
@VisibleForTesting
boolean hasBuffersAvailable() {
-   return buffersAvailable.get();
+   return subpartitionView.isAvailable();
}
 
@Override
public BufferAndAvailability getNextBuffer() throws IOException, 
InterruptedException {
BufferAndBacklog next = subpartitionView.getNextBuffer();
if (next != null) {
-   buffersAvailable.set(next.isMoreAvailable());
sequenceNumber++;
 
if (next.buffer().isBuffer() && --numCreditsAvailable < 
0) {
@@ -197,7 +193,6 @@ class CreditBasedSequenceNumberingViewReader implements 
BufferAvailabilityListen
 
@Override
public void notifyDataAvailable() {
-   buffersAvailable.set(true);
requestQueue.notifyReaderNonEmpty(this);
}
 
@@ -206,7 +201,6 @@ class CreditBasedSequenceNumberingViewReader implements 
BufferAvailabilityListen
return "CreditBasedSequenceNumberingViewReader{" +
   

[1/7] flink git commit: [FLINK-8805][runtime] Optimize EventSerializer.isEvent method

2018-02-28 Thread srichter
Repository: flink
Updated Branches:
  refs/heads/master af8efe92c -> b9b7416f4


[FLINK-8805][runtime] Optimize EventSerializer.isEvent method

For example, previously if the method was used to check for EndOfPartitionEvent
and the Buffer contained a huge custom event, the event had to be deserialized
before
performing the actual check. Now we quickly enter the correct if/else
branch
and perform the full, costly deserialization only if we have to.

Calls to isEvent() other than checks against EndOfPartitionEvent were not
used.
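The optimization can be illustrated with a hypothetical sketch (invented tag values and class name; the real EventSerializer uses Flink's event classes and a ClassLoader for custom events): peek at the 4-byte type tag at the front of the serialized form and decide cheaply, falling back to full deserialization only for custom events.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch of checking an event's type without deserializing it.
// The serialized form begins with a 4-byte type tag; only a custom
// (OTHER_EVENT) payload would ever need full deserialization.
class EventTypeCheck {
    static final int END_OF_PARTITION_EVENT = 0;
    static final int CHECKPOINT_BARRIER_EVENT = 1;
    static final int OTHER_EVENT = 3;

    /** Returns true iff the buffer encodes the given built-in event tag. */
    static boolean isEvent(ByteBuffer buffer, int expectedTag) {
        if (buffer.remaining() < 4) {
            throw new IllegalArgumentException("Incomplete event");
        }
        final int pos = buffer.position();
        try {
            return buffer.getInt() == expectedTag; // cheap: no payload read
        } finally {
            buffer.position(pos); // leave the buffer untouched for the caller
        }
    }
}
```

Resetting the position afterwards keeps the check side-effect free, so the caller can still hand the buffer to the real deserializer.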


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/767027ff
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/767027ff
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/767027ff

Branch: refs/heads/master
Commit: 767027ff6f388f808842760b7cf8bc807f8e6913
Parents: ebd39f3
Author: Piotr Nowojski 
Authored: Mon Feb 26 16:13:06 2018 +0100
Committer: Stefan Richter 
Committed: Wed Feb 28 17:12:52 2018 +0100

--
 .../api/serialization/EventSerializer.java  | 57 ++--
 .../io/network/netty/PartitionRequestQueue.java |  3 +-
 .../api/serialization/EventSerializerTest.java  | 39 +-
 3 files changed, 42 insertions(+), 57 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/767027ff/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
index d7fb7e8..8d76bb2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
@@ -101,16 +101,15 @@ public class EventSerializer {
}
 
/**
-* Identifies whether the given buffer encodes the given event.
+* Identifies whether the given buffer encodes the given event. Custom 
events are not supported.
 *
 * Pre-condition: This buffer must encode some 
event!
 *
 * @param buffer the buffer to peak into
 * @param eventClass the expected class of the event type
-* @param classLoader the class loader to use for custom event classes
 * @return whether the event class of the buffer matches the 
given eventClass
 */
-   private static boolean isEvent(ByteBuffer buffer, Class eventClass, 
ClassLoader classLoader) throws IOException {
+   private static boolean isEvent(ByteBuffer buffer, Class eventClass) 
throws IOException {
if (buffer.remaining() < 4) {
throw new IOException("Incomplete event");
}
@@ -122,38 +121,16 @@ public class EventSerializer {
try {
int type = buffer.getInt();
 
-   switch (type) {
-   case END_OF_PARTITION_EVENT:
-   return 
eventClass.equals(EndOfPartitionEvent.class);
-   case CHECKPOINT_BARRIER_EVENT:
-   return 
eventClass.equals(CheckpointBarrier.class);
-   case END_OF_SUPERSTEP_EVENT:
-   return 
eventClass.equals(EndOfSuperstepEvent.class);
-   case CANCEL_CHECKPOINT_MARKER_EVENT:
-   return 
eventClass.equals(CancelCheckpointMarker.class);
-   case OTHER_EVENT:
-   try {
-   final DataInputDeserializer 
deserializer = new DataInputDeserializer(buffer);
-   final String className = 
deserializer.readUTF();
-
-   final Class clazz;
-   try {
-   clazz = 
classLoader.loadClass(className).asSubclass(AbstractEvent.class);
-   }
-   catch (ClassNotFoundException 
e) {
-   throw new 
IOException("Could not load event class '" + className + "'.", e);
-   }
-   catch (ClassCastException e) {
-   throw new 
IOException("The class '" + className + "' is not

[6/7] flink git commit: [hotfix][tests] Do not hide original exception in SuccessAfterNetworkBuffersFailureITCase

2018-02-28 Thread srichter
[hotfix][tests] Do not hide original exception in 
SuccessAfterNetworkBuffersFailureITCase


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2c2e1896
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2c2e1896
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2c2e1896

Branch: refs/heads/master
Commit: 2c2e1896bba7805e6afa96cfd9040729c35c2742
Parents: 6c9e267
Author: Piotr Nowojski 
Authored: Fri Feb 23 11:27:54 2018 +0100
Committer: Stefan Richter 
Committed: Wed Feb 28 17:12:52 2018 +0100

--
 ...SuccessAfterNetworkBuffersFailureITCase.java | 40 ++--
 1 file changed, 11 insertions(+), 29 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/2c2e1896/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
--
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
index dc19ad1..dbd0f79 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
@@ -65,38 +65,20 @@ public class SuccessAfterNetworkBuffersFailureITCase 
extends TestLogger {
}
 
@Test
-   public void testSuccessfulProgramAfterFailure() {
+   public void testSuccessfulProgramAfterFailure() throws Exception {
+   ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+   runConnectedComponents(env);
+
try {
-   ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-   try {
-   runConnectedComponents(env);
-   }
-   catch (Exception e) {
-   e.printStackTrace();
-   fail("Program Execution should have 
succeeded.");
-   }
-
-   try {
-   runKMeans(env);
-   fail("This program execution should have 
failed.");
-   }
-   catch (JobExecutionException e) {
-   
assertTrue(e.getCause().getMessage().contains("Insufficient number of network 
buffers"));
-   }
-
-   try {
-   runConnectedComponents(env);
-   }
-   catch (Exception e) {
-   e.printStackTrace();
-   fail("Program Execution should have 
succeeded.");
-   }
+   runKMeans(env);
+   fail("This program execution should have failed.");
}
-   catch (Exception e) {
-   e.printStackTrace();
-   fail(e.getMessage());
+   catch (JobExecutionException e) {
+   
assertTrue(e.getCause().getMessage().contains("Insufficient number of network 
buffers"));
}
+
+   runConnectedComponents(env);
}
 
private static void runConnectedComponents(ExecutionEnvironment env) 
throws Exception {



flink git commit: [FLINK-8557][checkpointing] Remove illegal characters from operator description text before using it to construct the instance directory in RocksDB

2018-02-28 Thread srichter
Repository: flink
Updated Branches:
  refs/heads/release-1.5 cf854ccbc -> 623e94459


[FLINK-8557][checkpointing] Remove illegal characters from operator description 
text before using it to construct the instance directory in RocksDB

(cherry picked from commit 66474da)
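The sanitization step this commit introduces can be demonstrated on its own (the helper class name here is hypothetical; the regex is the one from the diff): every character that is not a letter, digit, or hyphen is replaced with an underscore before the identifier is used as part of a directory name.

```java
// Hypothetical helper showing the identifier sanitization from this commit:
// characters that are illegal in file names are replaced with underscores,
// so the operator description is safe to embed in the RocksDB instance path.
class OperatorPathSanitizer {
    static String sanitize(String operatorIdentifier) {
        return operatorIdentifier.replaceAll("[^a-zA-Z0-9\\-]", "_");
    }
}
```

For example, an operator description such as `Map -> Sink: x` becomes `Map_-__Sink__x`, which contains no characters that are problematic in paths on common file systems.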


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/623e9445
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/623e9445
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/623e9445

Branch: refs/heads/release-1.5
Commit: 623e94459795a191703b880fcfa4f162c92ae458
Parents: cf854cc
Author: Stefan Richter 
Authored: Wed Feb 28 14:25:55 2018 +0100
Committer: Stefan Richter 
Committed: Wed Feb 28 15:48:50 2018 +0100

--
 .../contrib/streaming/state/RocksDBStateBackend.java  | 10 +++---
 1 file changed, 7 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/623e9445/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
--
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index f60cb2c..9389295 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -396,10 +396,14 @@ public class RocksDBStateBackend extends 
AbstractStateBackend implements Configu
String tempDir = 
env.getTaskManagerInfo().getTmpDirectories()[0];
ensureRocksDBIsLoaded(tempDir);
 
-   lazyInitializeForJob(env, operatorIdentifier);
+   // replace all characters that are not legal for filenames with 
underscore
+   String fileCompatibleIdentifier = 
operatorIdentifier.replaceAll("[^a-zA-Z0-9\\-]", "_");
 
-   File instanceBasePath =
-   new File(getNextStoragePath(), "job-" + jobId + 
"_op-" + operatorIdentifier + "_uuid-" + UUID.randomUUID());
+   lazyInitializeForJob(env, fileCompatibleIdentifier);
+
+   File instanceBasePath = new File(
+   getNextStoragePath(),
+   "job_" + jobId + "_op_" + fileCompatibleIdentifier + 
"_uuid_" + UUID.randomUUID());
 
LocalRecoveryConfig localRecoveryConfig =
env.getTaskStateManager().createLocalRecoveryConfig();



flink git commit: [FLINK-8557][checkpointing] Remove illegal characters from operator description text before using it to construct the instance directory in RocksDB

2018-02-28 Thread srichter
Repository: flink
Updated Branches:
  refs/heads/master f57793082 -> af8efe92c


[FLINK-8557][checkpointing] Remove illegal characters from operator description 
text before using it to construct the instance directory in RocksDB

This closes #5598.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/af8efe92
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/af8efe92
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/af8efe92

Branch: refs/heads/master
Commit: af8efe92c340ad21284d775ce74b15b774b67bf7
Parents: f577930
Author: Stefan Richter 
Authored: Wed Feb 28 14:25:55 2018 +0100
Committer: Stefan Richter 
Committed: Wed Feb 28 15:48:01 2018 +0100

--
 .../contrib/streaming/state/RocksDBStateBackend.java  | 10 +++---
 1 file changed, 7 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/af8efe92/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
--
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index f60cb2c..9389295 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -396,10 +396,14 @@ public class RocksDBStateBackend extends 
AbstractStateBackend implements Configu
String tempDir = 
env.getTaskManagerInfo().getTmpDirectories()[0];
ensureRocksDBIsLoaded(tempDir);
 
-   lazyInitializeForJob(env, operatorIdentifier);
+   // replace all characters that are not legal for filenames with 
underscore
+   String fileCompatibleIdentifier = 
operatorIdentifier.replaceAll("[^a-zA-Z0-9\\-]", "_");
 
-   File instanceBasePath =
-   new File(getNextStoragePath(), "job-" + jobId + 
"_op-" + operatorIdentifier + "_uuid-" + UUID.randomUUID());
+   lazyInitializeForJob(env, fileCompatibleIdentifier);
+
+   File instanceBasePath = new File(
+   getNextStoragePath(),
+   "job_" + jobId + "_op_" + fileCompatibleIdentifier + 
"_uuid_" + UUID.randomUUID());
 
LocalRecoveryConfig localRecoveryConfig =
env.getTaskStateManager().createLocalRecoveryConfig();



flink git commit: [hotfix] Enable FILESYTEM_DEFAULT_OVERRIDE in FLIP-6 MiniClusterResource

2018-02-28 Thread aljoscha
Repository: flink
Updated Branches:
  refs/heads/release-1.5 59b607b0c -> cf854ccbc


[hotfix] Enable FILESYTEM_DEFAULT_OVERRIDE in FLIP-6 MiniClusterResource


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cf854ccb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cf854ccb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cf854ccb

Branch: refs/heads/release-1.5
Commit: cf854ccbc6fdbf112095c471705c8799aee64a45
Parents: 59b607b
Author: Aljoscha Krettek 
Authored: Wed Feb 28 15:35:13 2018 +0100
Committer: Aljoscha Krettek 
Committed: Wed Feb 28 15:38:42 2018 +0100

--
 .../java/org/apache/flink/test/util/MiniClusterResource.java| 5 +
 1 file changed, 5 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/cf854ccb/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
--
diff --git 
a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
 
b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
index 9bb1ae9..1c5da62 100644
--- 
a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
+++ 
b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
@@ -21,6 +21,7 @@ package org.apache.flink.test.util;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.minicluster.JobExecutorService;
@@ -144,6 +145,10 @@ public class MiniClusterResource extends ExternalResource {
private JobExecutorService startFlip6MiniCluster() throws Exception {
final Configuration configuration = 
miniClusterResourceConfiguration.getConfiguration();
 
+   // we need to set this since a lot of test expect this because 
TestBaseUtils.startCluster()
+   // enabled this by default
+   
configuration.setBoolean(CoreOptions.FILESYTEM_DEFAULT_OVERRIDE, true);
+
// set rest port to 0 to avoid clashes with concurrent 
MiniClusters
configuration.setInteger(RestOptions.REST_PORT, 0);
 



flink git commit: [hotfix] Enable FILESYTEM_DEFAULT_OVERRIDE in FLIP-6 MiniClusterResource

2018-02-28 Thread aljoscha
Repository: flink
Updated Branches:
  refs/heads/master 296f9ff74 -> f57793082


[hotfix] Enable FILESYTEM_DEFAULT_OVERRIDE in FLIP-6 MiniClusterResource


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f5779308
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f5779308
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f5779308

Branch: refs/heads/master
Commit: f57793082f92b8052e4fdccc0aaf16f0c55777a8
Parents: 296f9ff
Author: Aljoscha Krettek 
Authored: Wed Feb 28 15:35:13 2018 +0100
Committer: Aljoscha Krettek 
Committed: Wed Feb 28 15:35:13 2018 +0100

--
 .../java/org/apache/flink/test/util/MiniClusterResource.java| 5 +
 1 file changed, 5 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/f5779308/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
--
diff --git 
a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
 
b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
index 9bb1ae9..1c5da62 100644
--- 
a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
+++ 
b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
@@ -21,6 +21,7 @@ package org.apache.flink.test.util;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.minicluster.JobExecutorService;
@@ -144,6 +145,10 @@ public class MiniClusterResource extends ExternalResource {
private JobExecutorService startFlip6MiniCluster() throws Exception {
final Configuration configuration = 
miniClusterResourceConfiguration.getConfiguration();
 
+   // we need to set this since a lot of test expect this because 
TestBaseUtils.startCluster()
+   // enabled this by default
+   
configuration.setBoolean(CoreOptions.FILESYTEM_DEFAULT_OVERRIDE, true);
+
// set rest port to 0 to avoid clashes with concurrent 
MiniClusters
configuration.setInteger(RestOptions.REST_PORT, 0);
 



[flink-shaded] branch master updated: (#39) Properly hide jackson dependencies

2018-02-28 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-shaded.git


The following commit(s) were added to refs/heads/master by this push:
 new d53464b  (#39) Properly hide jackson dependencies
d53464b is described below

commit d53464bb80d08ff6f8538faecb9b479f9caf34d2
Author: zentol 
AuthorDate: Wed Feb 28 14:19:28 2018 +0100

(#39) Properly hide jackson dependencies
---
 .../flink-shaded-jackson-2/pom.xml | 12 
 .../pom.xml| 12 
 flink-shaded-jackson-parent/pom.xml| 36 --
 3 files changed, 43 insertions(+), 17 deletions(-)

diff --git a/flink-shaded-jackson-parent/flink-shaded-jackson-2/pom.xml 
b/flink-shaded-jackson-parent/flink-shaded-jackson-2/pom.xml
index 167020a..4a7c753 100644
--- a/flink-shaded-jackson-parent/flink-shaded-jackson-2/pom.xml
+++ b/flink-shaded-jackson-parent/flink-shaded-jackson-2/pom.xml
@@ -34,6 +34,18 @@ under the License.
 
 
 
+com.fasterxml.jackson.core
+jackson-core
+
+
+com.fasterxml.jackson.core
+jackson-annotations
+
+
+com.fasterxml.jackson.core
+jackson-databind
+
+
 com.fasterxml.jackson.dataformat
 jackson-dataformat-yaml
 ${jackson.version}
diff --git 
a/flink-shaded-jackson-parent/flink-shaded-jackson-module-jsonSchema-2/pom.xml 
b/flink-shaded-jackson-parent/flink-shaded-jackson-module-jsonSchema-2/pom.xml
index 9079575..e723489 100644
--- 
a/flink-shaded-jackson-parent/flink-shaded-jackson-module-jsonSchema-2/pom.xml
+++ 
b/flink-shaded-jackson-parent/flink-shaded-jackson-module-jsonSchema-2/pom.xml
@@ -34,6 +34,18 @@ under the License.
 
 
 
+com.fasterxml.jackson.core
+jackson-core
+
+
+com.fasterxml.jackson.core
+jackson-annotations
+
+
+com.fasterxml.jackson.core
+jackson-databind
+
+
 com.fasterxml.jackson.module
 jackson-module-jsonSchema
 ${jackson.version}
diff --git a/flink-shaded-jackson-parent/pom.xml 
b/flink-shaded-jackson-parent/pom.xml
index 0e3073e..bdffc4c 100644
--- a/flink-shaded-jackson-parent/pom.xml
+++ b/flink-shaded-jackson-parent/pom.xml
@@ -43,23 +43,25 @@ under the License.
 flink-shaded-jackson-module-jsonSchema-2
 
 
-
-
-com.fasterxml.jackson.core
-jackson-core
-${jackson.version}
-
-
-com.fasterxml.jackson.core
-jackson-annotations
-${jackson.version}
-
-
-com.fasterxml.jackson.core
-jackson-databind
-${jackson.version}
-
-
+
+
+
+com.fasterxml.jackson.core
+jackson-core
+${jackson.version}
+
+
+com.fasterxml.jackson.core
+jackson-annotations
+${jackson.version}
+
+
+com.fasterxml.jackson.core
+jackson-databind
+${jackson.version}
+
+
+
 
 
 

-- 
To stop receiving notification emails like this one, please contact
ches...@apache.org.


flink git commit: [FLINK-8451] [serializers] Make Scala tuple serializer deserialization more failure tolerant

2018-02-28 Thread twalthr
Repository: flink
Updated Branches:
  refs/heads/release-1.4 51231c471 -> 93f823fff


[FLINK-8451] [serializers] Make Scala tuple serializer deserialization more 
failure tolerant

This closes #5567.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/93f823ff
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/93f823ff
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/93f823ff

Branch: refs/heads/release-1.4
Commit: 93f823fff7b1517cc3f2484fa0091ebdc9e546d0
Parents: 51231c4
Author: Timo Walther 
Authored: Thu Feb 22 17:22:54 2018 +0100
Committer: Timo Walther 
Committed: Wed Feb 28 13:32:34 2018 +0100

--
 .../runtime/TupleSerializerConfigSnapshot.java  |   2 +-
 .../apache/flink/util/InstantiationUtil.java|  56 +--
 .../flink-1.3.2-scala-types-serializer-data | Bin 0 -> 97 bytes
 .../flink-1.3.2-scala-types-serializer-snapshot | Bin 0 -> 7634 bytes
 .../TupleSerializerCompatibilityTest.scala  |  86 +
 ...leSerializerCompatibilityTestGenerator.scala |  94 +++
 6 files changed, 231 insertions(+), 7 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/93f823ff/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java
--
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java
index 705099e..eac5200 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java
@@ -61,7 +61,7 @@ public final class TupleSerializerConfigSnapshot extends CompositeTypeSeriali
		super.read(in);

		try (final DataInputViewStream inViewWrapper = new DataInputViewStream(in)) {
-			tupleClass = InstantiationUtil.deserializeObject(inViewWrapper, getUserCodeClassLoader());
+			tupleClass = InstantiationUtil.deserializeObject(inViewWrapper, getUserCodeClassLoader(), true);
		} catch (ClassNotFoundException e) {
			throw new IOException("Could not find requested tuple class in classpath.", e);
		}

http://git-wip-us.apache.org/repos/asf/flink/blob/93f823ff/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
--
diff --git a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
index 7ffad55..a95bdf7 100644
--- a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
+++ b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
@@ -113,7 +113,7 @@ public final class InstantiationUtil {
	 *
	 * This can be removed once 1.2 is no longer supported.
	 */
-	private static Set<String> scalaSerializerClassnames = new HashSet<>();
+	private static final Set<String> scalaSerializerClassnames = new HashSet<>();
	static {
		scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.TraversableSerializer");
		scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.CaseClassSerializer");
@@ -121,11 +121,54 @@ public final class InstantiationUtil {
		scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.EnumValueSerializer");
		scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.OptionSerializer");
		scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.TrySerializer");
-		scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.EitherSerializer");
		scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.UnitSerializer");
	}

	/**
+	 * The serialVersionUID might change between Scala versions and since those classes are
+	 * part of the tuple serializer config snapshots we need to ignore them.
+	 *
+	 * @see <a href="https://issues.apache.org/jira/browse/FLINK-8451">FLINK-8451</a>
+	 */
+	private static final Set<String> scalaTypes = new HashSet<>();
+	static {
+		scalaTypes.add("scala.Tuple1");
+		scalaTypes.add("scala.Tuple2");
+		scalaTypes.add("scala.Tuple3");
+		scalaTypes.add("scala.Tuple4");
+		scalaTypes.add("scala.Tuple5");
+		scalaTypes.add("scala.Tuple6");
+		scala

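The failure tolerance in the commit above rests on ignoring a changed serialVersionUID for a fixed set of class names during Java deserialization. The following is a minimal, self-contained sketch of that general technique, not Flink's actual InstantiationUtil code; the class name VersionTolerantStream and the contents of the ignore set are illustrative. It overrides ObjectInputStream#readClassDescriptor and substitutes the locally loaded class descriptor for listed classes, so the UID recorded in the stream is never compared.

```java
import java.io.*;
import java.util.*;

/**
 * Sketch: an ObjectInputStream that, for a fixed set of class names, replaces
 * the class descriptor read from the stream with the one of the locally loaded
 * class. A serialVersionUID mismatch for those classes then cannot fail
 * deserialization, because the streamed UID is discarded.
 */
public class VersionTolerantStream extends ObjectInputStream {

	/** Class names whose streamed serialVersionUID should be ignored (illustrative). */
	private static final Set<String> ignoredVersions = new HashSet<>(
		Arrays.asList("scala.Tuple2", "scala.Tuple3"));

	public VersionTolerantStream(InputStream in) throws IOException {
		super(in);
	}

	@Override
	protected ObjectStreamClass readClassDescriptor() throws IOException, ClassNotFoundException {
		ObjectStreamClass streamed = super.readClassDescriptor();
		if (ignoredVersions.contains(streamed.getName())) {
			// Trust the local class descriptor; the streamed UID is dropped.
			Class<?> localClass = Class.forName(streamed.getName());
			ObjectStreamClass local = ObjectStreamClass.lookup(localClass);
			if (local != null) {
				return local;
			}
		}
		return streamed;
	}

	/** Serializes a value and reads it back through the tolerant stream. */
	public static Object roundTrip(Serializable value) throws Exception {
		ByteArrayOutputStream bos = new ByteArrayOutputStream();
		try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
			oos.writeObject(value);
		}
		try (VersionTolerantStream ois =
				new VersionTolerantStream(new ByteArrayInputStream(bos.toByteArray()))) {
			return ois.readObject();
		}
	}

	public static void main(String[] args) throws Exception {
		ArrayList<String> xs = new ArrayList<>(Arrays.asList("a", "b"));
		System.out.println(roundTrip(xs));
	}
}
```

The ignore list is only consulted when a listed class actually appears in the stream, so ordinary payloads pass through unchanged.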
flink git commit: [FLINK-8777][checkpointing] Cleanup local state more eagerly in recovery

2018-02-28 Thread srichter
Repository: flink
Updated Branches:
  refs/heads/release-1.5 302aaeb02 -> 59b607b0c


[FLINK-8777][checkpointing] Cleanup local state more eagerly in recovery

(cherry picked from commit 296f9ff)


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/59b607b0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/59b607b0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/59b607b0

Branch: refs/heads/release-1.5
Commit: 59b607b0c411b7d01b97db302d0f124b28ef0d0e
Parents: 302aaeb
Author: sihuazhou 
Authored: Mon Feb 26 03:54:53 2018 +0100
Committer: Stefan Richter 
Committed: Wed Feb 28 13:35:30 2018 +0100

--
 .../runtime/state/TaskLocalStateStore.java  |   8 ++
 .../runtime/state/TaskLocalStateStoreImpl.java  | 108 +--
 .../runtime/state/TaskStateManagerImpl.java |   6 +-
 .../state/TaskLocalStateStoreImplTest.java  |  42 ++--
 .../runtime/state/TestTaskLocalStateStore.java  |   6 ++
 5 files changed, 130 insertions(+), 40 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/59b607b0/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStore.java
--
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStore.java
index 7089894..686f4f6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStore.java
@@ -24,6 +24,8 @@ import javax.annotation.Nonnegative;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
+import java.util.function.LongPredicate;
+
 /**
  * Classes  that implement this interface serve as a task-manager-level local storage for local checkpointed state.
  * The purpose is to provide  access to a state that is stored locally for a faster recovery compared to the state that
@@ -62,4 +64,10 @@ public interface TaskLocalStateStore {
	 * and removes all local states with a checkpoint id that is smaller than the newly confirmed checkpoint id.
	 */
	void confirmCheckpoint(long confirmedCheckpointId);
+
+	/**
+	 * Remove all checkpoints from the store that match the given predicate.
+	 * @param matcher the predicate that selects the checkpoints for pruning.
+	 */
+	void pruneMatchingCheckpoints(LongPredicate matcher);
 }

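The new pruneMatchingCheckpoints(LongPredicate) hook generalizes the "discard everything below the confirmed checkpoint id" cleanup into an arbitrary predicate over checkpoint ids. A hypothetical in-memory store (names invented for illustration; this is not Flink's TaskLocalStateStoreImpl) shows the intended call pattern:

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.function.LongPredicate;

/** Hypothetical in-memory checkpoint store illustrating predicate-based pruning. */
public class CheckpointPruneDemo {

	/** Checkpoint id -> stored state handle (a plain String here for brevity). */
	private final Map<Long, String> storedCheckpoints = new TreeMap<>();

	public void store(long checkpointId, String state) {
		storedCheckpoints.put(checkpointId, state);
	}

	/** Removes all checkpoints whose id matches the predicate. */
	public void pruneMatchingCheckpoints(LongPredicate matcher) {
		storedCheckpoints.keySet().removeIf(id -> matcher.test(id));
	}

	public int size() {
		return storedCheckpoints.size();
	}

	/** Simulates confirming checkpoint 3: everything older is pruned. */
	public static int demo() {
		CheckpointPruneDemo store = new CheckpointPruneDemo();
		store.store(1L, "a");
		store.store(2L, "b");
		store.store(3L, "c");
		store.pruneMatchingCheckpoints(id -> id < 3L);
		return store.size();
	}

	public static void main(String[] args) {
		if (demo() != 1) {
			throw new AssertionError("expected exactly one surviving checkpoint");
		}
		System.out.println("surviving checkpoints: " + demo());
	}
}
```

Expressing cleanup as a predicate lets the same method serve both the confirm-checkpoint path and the eager cleanup on recovery that this commit adds.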
http://git-wip-us.apache.org/repos/asf/flink/blob/59b607b0/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java
--
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java
index bb4f011..29adc4a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.state;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
@@ -35,9 +36,10 @@ import javax.annotation.concurrent.GuardedBy;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.AbstractMap;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashMap;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -46,6 +48,7 @@ import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
+import java.util.function.LongPredicate;
 
 /**
  * Main implementation of a {@link TaskLocalStateStore}.
@@ -56,7 +59,8 @@ public class TaskLocalStateStoreImpl implements TaskLocalStateStore {
	private static final Logger LOG = LoggerFactory.getLogger(TaskLocalStateStoreImpl.class);

	/** Dummy value to use instead of null to satisfy {@link ConcurrentHashMap}. */
-	private static final TaskStateSnapshot NULL_DUMMY = new TaskStateSnapshot(0);
+	@VisibleForTesting
+	static final TaskStateSnapshot NULL_DUMMY = new TaskStateSnapshot(0);

	/** JobID from the owning subtask. */
	@Nonnull
@@ -103,14 +107,36 @@ public class TaskLocalStateStoreImpl implements TaskLocalStateStore {
		@Nonnull LocalRecoveryConfig localRecoveryConfig,
		@Nonnull Executor discardExecutor) {

-		this

flink git commit: [FLINK-8777][checkpointing] Cleanup local state more eagerly in recovery

2018-02-28 Thread srichter
Repository: flink
Updated Branches:
  refs/heads/master 6c837d738 -> 296f9ff74


[FLINK-8777][checkpointing] Cleanup local state more eagerly in recovery

This closes #5578.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/296f9ff7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/296f9ff7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/296f9ff7

Branch: refs/heads/master
Commit: 296f9ff742425d348d72ff598da31022aea70778
Parents: 6c837d7
Author: sihuazhou 
Authored: Mon Feb 26 10:54:53 2018 +0800
Committer: Stefan Richter 
Committed: Wed Feb 28 13:34:26 2018 +0100

--
 .../runtime/state/TaskLocalStateStore.java  |   8 ++
 .../runtime/state/TaskLocalStateStoreImpl.java  | 108 +--
 .../runtime/state/TaskStateManagerImpl.java |   6 +-
 .../state/TaskLocalStateStoreImplTest.java  |  42 ++--
 .../runtime/state/TestTaskLocalStateStore.java  |   6 ++
 5 files changed, 130 insertions(+), 40 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/296f9ff7/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStore.java
--
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStore.java
index 7089894..686f4f6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStore.java
@@ -24,6 +24,8 @@ import javax.annotation.Nonnegative;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
+import java.util.function.LongPredicate;
+
 /**
  * Classes  that implement this interface serve as a task-manager-level local storage for local checkpointed state.
  * The purpose is to provide  access to a state that is stored locally for a faster recovery compared to the state that
@@ -62,4 +64,10 @@ public interface TaskLocalStateStore {
	 * and removes all local states with a checkpoint id that is smaller than the newly confirmed checkpoint id.
	 */
	void confirmCheckpoint(long confirmedCheckpointId);
+
+	/**
+	 * Remove all checkpoints from the store that match the given predicate.
+	 * @param matcher the predicate that selects the checkpoints for pruning.
+	 */
+	void pruneMatchingCheckpoints(LongPredicate matcher);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/296f9ff7/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java
--
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java
index bb4f011..29adc4a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.state;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
@@ -35,9 +36,10 @@ import javax.annotation.concurrent.GuardedBy;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.AbstractMap;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashMap;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -46,6 +48,7 @@ import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
+import java.util.function.LongPredicate;
 
 /**
  * Main implementation of a {@link TaskLocalStateStore}.
@@ -56,7 +59,8 @@ public class TaskLocalStateStoreImpl implements TaskLocalStateStore {
	private static final Logger LOG = LoggerFactory.getLogger(TaskLocalStateStoreImpl.class);

	/** Dummy value to use instead of null to satisfy {@link ConcurrentHashMap}. */
-	private static final TaskStateSnapshot NULL_DUMMY = new TaskStateSnapshot(0);
+	@VisibleForTesting
+	static final TaskStateSnapshot NULL_DUMMY = new TaskStateSnapshot(0);

	/** JobID from the owning subtask. */
	@Nonnull
@@ -103,14 +107,36 @@ public class TaskLocalStateStoreImpl implements TaskLocalStateStore {
		@Nonnull LocalRecoveryConfig localRecoveryConfig,
		@Nonnull Executor discardExecutor) {

-		this.lock = new Object();
-

flink git commit: [FLINK-8451] [serializers] Make Scala tuple serializer deserialization more failure tolerant

2018-02-28 Thread twalthr
Repository: flink
Updated Branches:
  refs/heads/release-1.5 08e615027 -> 302aaeb02


[FLINK-8451] [serializers] Make Scala tuple serializer deserialization more failure tolerant

This closes #5567.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/302aaeb0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/302aaeb0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/302aaeb0

Branch: refs/heads/release-1.5
Commit: 302aaeb021bacf3f37cb9a3ee236304c94adbf30
Parents: 08e6150
Author: Timo Walther 
Authored: Thu Feb 22 17:22:54 2018 +0100
Committer: Timo Walther 
Committed: Wed Feb 28 13:30:59 2018 +0100

--
 .../runtime/TupleSerializerConfigSnapshot.java  |   2 +-
 .../apache/flink/util/InstantiationUtil.java|  56 +--
 .../flink-1.3.2-scala-types-serializer-data | Bin 0 -> 97 bytes
 .../flink-1.3.2-scala-types-serializer-snapshot | Bin 0 -> 7634 bytes
 .../TupleSerializerCompatibilityTest.scala  |  86 +
 ...leSerializerCompatibilityTestGenerator.scala |  94 +++
 6 files changed, 231 insertions(+), 7 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/302aaeb0/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java
--
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java
index 705099e..eac5200 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java
@@ -61,7 +61,7 @@ public final class TupleSerializerConfigSnapshot extends CompositeTypeSeriali
		super.read(in);

		try (final DataInputViewStream inViewWrapper = new DataInputViewStream(in)) {
-			tupleClass = InstantiationUtil.deserializeObject(inViewWrapper, getUserCodeClassLoader());
+			tupleClass = InstantiationUtil.deserializeObject(inViewWrapper, getUserCodeClassLoader(), true);
		} catch (ClassNotFoundException e) {
			throw new IOException("Could not find requested tuple class in classpath.", e);
		}

http://git-wip-us.apache.org/repos/asf/flink/blob/302aaeb0/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
--
diff --git a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
index 11e3990..978d270 100644
--- a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
+++ b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
@@ -113,7 +113,7 @@ public final class InstantiationUtil {
	 *
	 * This can be removed once 1.2 is no longer supported.
	 */
-	private static Set<String> scalaSerializerClassnames = new HashSet<>();
+	private static final Set<String> scalaSerializerClassnames = new HashSet<>();
	static {
		scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.TraversableSerializer");
		scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.CaseClassSerializer");
@@ -121,11 +121,54 @@ public final class InstantiationUtil {
		scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.EnumValueSerializer");
		scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.OptionSerializer");
		scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.TrySerializer");
-		scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.EitherSerializer");
		scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.UnitSerializer");
	}

	/**
+	 * The serialVersionUID might change between Scala versions and since those classes are
+	 * part of the tuple serializer config snapshots we need to ignore them.
+	 *
+	 * @see <a href="https://issues.apache.org/jira/browse/FLINK-8451">FLINK-8451</a>
+	 */
+	private static final Set<String> scalaTypes = new HashSet<>();
+	static {
+		scalaTypes.add("scala.Tuple1");
+		scalaTypes.add("scala.Tuple2");
+		scalaTypes.add("scala.Tuple3");
+		scalaTypes.add("scala.Tuple4");
+		scalaTypes.add("scala.Tuple5");
+		scalaTypes.add("scala.Tuple6");
+		scala

flink git commit: [FLINK-8451] [serializers] Make Scala tuple serializer deserialization more failure tolerant

2018-02-28 Thread twalthr
Repository: flink
Updated Branches:
  refs/heads/master e8d168509 -> 6c837d738


[FLINK-8451] [serializers] Make Scala tuple serializer deserialization more failure tolerant

This closes #5567.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6c837d73
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6c837d73
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6c837d73

Branch: refs/heads/master
Commit: 6c837d738f90ccf140ad2288cd24f3706babd63c
Parents: e8d1685
Author: Timo Walther 
Authored: Thu Feb 22 17:22:54 2018 +0100
Committer: Timo Walther 
Committed: Wed Feb 28 13:28:12 2018 +0100

--
 .../runtime/TupleSerializerConfigSnapshot.java  |   2 +-
 .../apache/flink/util/InstantiationUtil.java|  56 +--
 .../flink-1.3.2-scala-types-serializer-data | Bin 0 -> 97 bytes
 .../flink-1.3.2-scala-types-serializer-snapshot | Bin 0 -> 7634 bytes
 .../TupleSerializerCompatibilityTest.scala  |  86 +
 ...leSerializerCompatibilityTestGenerator.scala |  94 +++
 6 files changed, 231 insertions(+), 7 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/6c837d73/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java
--
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java
index 705099e..eac5200 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java
@@ -61,7 +61,7 @@ public final class TupleSerializerConfigSnapshot extends CompositeTypeSeriali
		super.read(in);

		try (final DataInputViewStream inViewWrapper = new DataInputViewStream(in)) {
-			tupleClass = InstantiationUtil.deserializeObject(inViewWrapper, getUserCodeClassLoader());
+			tupleClass = InstantiationUtil.deserializeObject(inViewWrapper, getUserCodeClassLoader(), true);
		} catch (ClassNotFoundException e) {
			throw new IOException("Could not find requested tuple class in classpath.", e);
		}

http://git-wip-us.apache.org/repos/asf/flink/blob/6c837d73/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
--
diff --git a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
index 11e3990..978d270 100644
--- a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
+++ b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
@@ -113,7 +113,7 @@ public final class InstantiationUtil {
	 *
	 * This can be removed once 1.2 is no longer supported.
	 */
-	private static Set<String> scalaSerializerClassnames = new HashSet<>();
+	private static final Set<String> scalaSerializerClassnames = new HashSet<>();
	static {
		scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.TraversableSerializer");
		scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.CaseClassSerializer");
@@ -121,11 +121,54 @@ public final class InstantiationUtil {
		scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.EnumValueSerializer");
		scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.OptionSerializer");
		scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.TrySerializer");
-		scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.EitherSerializer");
		scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.UnitSerializer");
	}

	/**
+	 * The serialVersionUID might change between Scala versions and since those classes are
+	 * part of the tuple serializer config snapshots we need to ignore them.
+	 *
+	 * @see <a href="https://issues.apache.org/jira/browse/FLINK-8451">FLINK-8451</a>
+	 */
+	private static final Set<String> scalaTypes = new HashSet<>();
+	static {
+		scalaTypes.add("scala.Tuple1");
+		scalaTypes.add("scala.Tuple2");
+		scalaTypes.add("scala.Tuple3");
+		scalaTypes.add("scala.Tuple4");
+		scalaTypes.add("scala.Tuple5");
+		scalaTypes.add("scala.Tuple6");
+		scalaTypes.add(

[3/4] flink git commit: [FLINK-8792] [rest] Change MessageQueryParameter.convertStringToValue to convertValueToString

2018-02-28 Thread trohrmann
[FLINK-8792] [rest] Change MessageQueryParameter.convertStringToValue to convertValueToString

This closes #5587.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/51d5bc6c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/51d5bc6c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/51d5bc6c

Branch: refs/heads/release-1.5
Commit: 51d5bc6c5151c2aed3f932f84c35da43689501ec
Parents: acf1147
Author: vinoyang 
Authored: Tue Feb 27 14:43:52 2018 +0800
Committer: Till Rohrmann 
Committed: Wed Feb 28 13:28:22 2018 +0100

--
 .../handlers/AllowNonRestoredStateQueryParameter.java  | 2 +-
 .../runtime/webmonitor/handlers/ParallelismQueryParameter.java | 2 +-
 .../runtime/webmonitor/handlers/StringQueryParameter.java  | 2 +-
 .../handlers/AllowNonRestoredStateQueryParameterTest.java  | 4 ++--
 .../webmonitor/handlers/ParallelismQueryParameterTest.java | 2 +-
 .../flink/runtime/rest/messages/MessageQueryParameter.java | 6 +++---
 .../rest/messages/RescalingParallelismQueryParameter.java  | 2 +-
 .../runtime/rest/messages/TerminationModeQueryParameter.java   | 2 +-
 .../rest/messages/job/metrics/MetricsFilterParameter.java  | 2 +-
 .../apache/flink/runtime/rest/RestServerEndpointITCase.java| 2 +-
 .../runtime/rest/handler/util/HandlerRequestUtilsTest.java | 2 +-
 .../flink/runtime/rest/messages/MessageParametersTest.java | 2 +-
 .../rest/messages/job/metrics/MetricsFilterParameterTest.java  | 2 +-
 13 files changed, 16 insertions(+), 16 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/51d5bc6c/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AllowNonRestoredStateQueryParameter.java
--
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AllowNonRestoredStateQueryParameter.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AllowNonRestoredStateQueryParameter.java
index 7ad014e..2ddde3a 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AllowNonRestoredStateQueryParameter.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AllowNonRestoredStateQueryParameter.java
@@ -39,7 +39,7 @@ public class AllowNonRestoredStateQueryParameter extends MessageQueryParameter<Boolean>

http://git-wip-us.apache.org/repos/asf/flink/blob/51d5bc6c/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ParallelismQueryParameter.java
--
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ParallelismQueryParameter.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ParallelismQueryParameter.java
index 26cb16c..2ade7eb 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ParallelismQueryParameter.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ParallelismQueryParameter.java
@@ -38,7 +38,7 @@ public class ParallelismQueryParameter extends MessageQueryParameter<Integer> {
	}

	@Override
-	public String convertStringToValue(final Integer value) {
+	public String convertValueToString(final Integer value) {
		return value.toString();
	}
 }

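The rename establishes a symmetric naming convention: convertStringToValue parses a query-string token into a typed value, while convertValueToString formats the value back into a token. A stripped-down stand-in makes the symmetry visible; QueryParam here is a hypothetical simplification, not Flink's actual MessageQueryParameter base class:

```java
/** Hypothetical simplification of a typed query-parameter converter pair. */
abstract class QueryParam<X> {
	/** Parses one query-string token into a typed value. */
	abstract X convertStringToValue(String raw);

	/** Formats a typed value back into a query-string token. */
	abstract String convertValueToString(X value);
}

/** Integer-typed parameter, mirroring the parallelism parameter in the diff. */
public class ParallelismParam extends QueryParam<Integer> {

	@Override
	Integer convertStringToValue(String raw) {
		return Integer.valueOf(raw);
	}

	@Override
	String convertValueToString(Integer value) {
		return value.toString();
	}

	public static void main(String[] args) {
		ParallelismParam p = new ParallelismParam();
		// Parsing and formatting are inverses: "4" -> 4 -> "4".
		System.out.println(p.convertValueToString(p.convertStringToValue("4")));
	}
}
```

With both directions named consistently, a round trip through parse-then-format is the natural invariant to test.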
http://git-wip-us.apache.org/repos/asf/flink/blob/51d5bc6c/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/StringQueryParameter.java
--
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/StringQueryParameter.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/StringQueryParameter.java
index 226c592..52c0967 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/StringQueryParameter.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/StringQueryParameter.java
@@ -35,7 +35,7 @@ public abstract class StringQueryParameter extends MessageQueryParameter<String>
	}

	@Override
-	public final String convertStringToValue(final String value) {
+	public final String convertValueToString(final String value) {
		return value;
	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/51d5bc6c/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/AllowNonRestoredStateQueryParameterTest.java
--
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handle

[1/4] flink git commit: [FLINK-8730][REST] JSON serialize entire SerializedThrowable

2018-02-28 Thread trohrmann
Repository: flink
Updated Branches:
  refs/heads/release-1.5 23358ff87 -> 08e615027


[FLINK-8730][REST] JSON serialize entire SerializedThrowable

Do not only serialize the serialized exception but the entire
SerializedThrowable object. This makes it possible to throw the
SerializedThrowable itself without deserializing it.

This closes #5546.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/acf11479
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/acf11479
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/acf11479

Branch: refs/heads/release-1.5
Commit: acf114793c708f0ab207008c25195f6f65796e5f
Parents: 2f6cb37
Author: gyao 
Authored: Wed Feb 21 16:02:01 2018 +0100
Committer: Till Rohrmann 
Committed: Wed Feb 28 13:28:21 2018 +0100

--
 .../apache/flink/util/SerializedThrowable.java  | 14 
 .../json/SerializedThrowableDeserializer.java   | 15 +++--
 .../json/SerializedThrowableSerializer.java |  7 +-
 .../json/SerializedThrowableSerializerTest.java | 71 
 4 files changed, 83 insertions(+), 24 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/acf11479/flink-core/src/main/java/org/apache/flink/util/SerializedThrowable.java
--
diff --git a/flink-core/src/main/java/org/apache/flink/util/SerializedThrowable.java b/flink-core/src/main/java/org/apache/flink/util/SerializedThrowable.java
index de6358c..13f8d77 100644
--- a/flink-core/src/main/java/org/apache/flink/util/SerializedThrowable.java
+++ b/flink-core/src/main/java/org/apache/flink/util/SerializedThrowable.java
@@ -25,8 +25,6 @@ import java.lang.ref.WeakReference;
 import java.util.HashSet;
 import java.util.Set;
 
-import static java.util.Objects.requireNonNull;
-
 /**
  * Utility class for dealing with user-defined Throwable types that are serialized (for
  * example during RPC/Actor communication), but cannot be resolved with the default
@@ -64,18 +62,6 @@ public class SerializedThrowable extends Exception implements Serializable {
		this(exception, new HashSet<>());
	}
 
-	/**
-	 * Creates a new SerializedThrowable from a serialized exception provided as a byte array.
-	 */
-	public SerializedThrowable(
-			final byte[] serializedException,
-			final String originalErrorClassName,
-			final String fullStringifiedStackTrace) {
-		this.serializedException = requireNonNull(serializedException);
-		this.originalErrorClassName = requireNonNull(originalErrorClassName);
-		this.fullStringifiedStackTrace = requireNonNull(fullStringifiedStackTrace);
-	}
-
	private SerializedThrowable(Throwable exception, Set alreadySeen) {
		super(getMessageOrError(exception));
 

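The commit serializes the whole SerializedThrowable object rather than only its inner serialized-exception bytes, so the receiving side can deserialize the object once and throw it directly instead of rebuilding it field by field. A hedged sketch of that round-trip idea, using plain JDK serialization plus Base64 as a stand-in for the JSON field (ThrowableTransport and its method names are invented, not Flink's serializer code):

```java
import java.io.*;
import java.util.Base64;

/**
 * Sketch: ship an entire Throwable as a single Base64 string (the role the
 * JSON field plays in the commit above), so the receiver can deserialize it
 * and throw the resulting object as-is.
 */
public class ThrowableTransport {

	/** Serializes the throwable and encodes the bytes for embedding in text. */
	public static String encode(Throwable t) throws IOException {
		ByteArrayOutputStream bos = new ByteArrayOutputStream();
		try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
			oos.writeObject(t);
		}
		return Base64.getEncoder().encodeToString(bos.toByteArray());
	}

	/** Decodes and deserializes back into a throwable, ready to be thrown. */
	public static Throwable decode(String encoded) throws IOException, ClassNotFoundException {
		byte[] bytes = Base64.getDecoder().decode(encoded);
		try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
			return (Throwable) ois.readObject();
		}
	}

	public static void main(String[] args) throws Exception {
		Throwable original = new IllegalStateException("job failed");
		Throwable decoded = decode(encode(original));
		// The decoded object keeps its message and stack trace and can be rethrown.
		System.out.println(decoded.getMessage());
	}
}
```

Serializing the container object whole is what makes the "throw it without deserializing the inner exception" pattern possible on the receiving side.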
http://git-wip-us.apache.org/repos/asf/flink/blob/acf11479/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SerializedThrowableDeserializer.java
--
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SerializedThrowableDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SerializedThrowableDeserializer.java
index 3217cce..d0f71ce 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SerializedThrowableDeserializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SerializedThrowableDeserializer.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.rest.messages.json;
 
+import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.SerializedThrowable;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
@@ -27,9 +28,7 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std

 import java.io.IOException;

-import static org.apache.flink.runtime.rest.messages.json.SerializedThrowableSerializer.FIELD_NAME_CLASS;
-import static org.apache.flink.runtime.rest.messages.json.SerializedThrowableSerializer.FIELD_NAME_SERIALIZED_EXCEPTION;
-import static org.apache.flink.runtime.rest.messages.json.SerializedThrowableSerializer.FIELD_NAME_STACK_TRACE;
+import static org.apache.flink.runtime.rest.messages.json.SerializedThrowableSerializer.FIELD_NAME_SERIALIZED_THROWABLE;

 /**
  * JSON deserializer for {@link SerializedThrowable}.
@@ -48,10 +47,12 @@ public class SerializedThrowableDeserializer extends StdDeserializer

http://git-wip-us.apache.org/repos/asf/flink/blob/acf11479/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SerializedThrowableSerializer.java
StdDeserializerhttp://git-wip-us.apache.org/repos/asf/flink/blob/acf11479/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SerializedThrowableSerializer.java
--

[2/4] flink git commit: [FLINK-8787][flip6] Do not copy flinkConfiguration in AbstractYarnClusterDescriptor

2018-02-28 Thread trohrmann
[FLINK-8787][flip6] Do not copy flinkConfiguration in AbstractYarnClusterDescriptor

This closes #5591.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2f6cb37c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2f6cb37c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2f6cb37c

Branch: refs/heads/release-1.5
Commit: 2f6cb37c775106bb684ef9c608585e7a72056460
Parents: 23358ff
Author: gyao 
Authored: Tue Feb 27 16:58:53 2018 +0100
Committer: Till Rohrmann 
Committed: Wed Feb 28 13:28:21 2018 +0100

--
 .../java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/2f6cb37c/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
--
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index e6c36f6..6b93016 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -482,7 +482,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
		flinkConfiguration.setString(ClusterEntrypoint.EXECUTION_MODE, executionMode.toString());

		ApplicationReport report = startAppMaster(
-			new Configuration(flinkConfiguration),
+			flinkConfiguration,
			yarnClusterEntrypoint,
			jobGraph,
			yarnClient,



[4/4] flink git commit: [FLINK-8792] [rest] Change MessageQueryParameter#convertValueFromString to convertStringToValue

2018-02-28 Thread trohrmann
[FLINK-8792] [rest] Change MessageQueryParameter#convertValueFromString to convertStringToValue


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/08e61502
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/08e61502
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/08e61502

Branch: refs/heads/release-1.5
Commit: 08e615027acd426537dc580139a61bd4082b7c3f
Parents: 51d5bc6
Author: Till Rohrmann 
Authored: Wed Feb 28 10:11:44 2018 +0100
Committer: Till Rohrmann 
Committed: Wed Feb 28 13:28:22 2018 +0100

--
 .../handlers/AllowNonRestoredStateQueryParameter.java  | 2 +-
 .../runtime/webmonitor/handlers/ParallelismQueryParameter.java | 2 +-
 .../runtime/webmonitor/handlers/StringQueryParameter.java  | 2 +-
 .../handlers/AllowNonRestoredStateQueryParameterTest.java  | 6 +++---
 .../webmonitor/handlers/ParallelismQueryParameterTest.java | 2 +-
 .../flink/runtime/rest/messages/MessageQueryParameter.java | 4 ++--
 .../rest/messages/RescalingParallelismQueryParameter.java  | 2 +-
 .../runtime/rest/messages/TerminationModeQueryParameter.java   | 2 +-
 .../rest/messages/job/metrics/MetricsFilterParameter.java  | 2 +-
 .../apache/flink/runtime/rest/RestServerEndpointITCase.java| 2 +-
 .../runtime/rest/handler/util/HandlerRequestUtilsTest.java | 2 +-
 .../flink/runtime/rest/messages/MessageParametersTest.java | 2 +-
 .../rest/messages/job/metrics/MetricsFilterParameterTest.java  | 2 +-
 13 files changed, 16 insertions(+), 16 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/08e61502/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AllowNonRestoredStateQueryParameter.java
--
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AllowNonRestoredStateQueryParameter.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AllowNonRestoredStateQueryParameter.java
index 2ddde3a..19734d8 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AllowNonRestoredStateQueryParameter.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AllowNonRestoredStateQueryParameter.java
@@ -34,7 +34,7 @@ public class AllowNonRestoredStateQueryParameter extends MessageQueryParameter<Boolean> {

http://git-wip-us.apache.org/repos/asf/flink/blob/08e61502/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ParallelismQueryParameter.java
--
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ParallelismQueryParameter.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ParallelismQueryParameter.java
index 2ade7eb..398bcb0 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ParallelismQueryParameter.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ParallelismQueryParameter.java
@@ -33,7 +33,7 @@ public class ParallelismQueryParameter extends MessageQueryParameter<Integer> {
}
 
@Override
-   public Integer convertValueFromString(final String value) {
+   public Integer convertStringToValue(final String value) {
return Integer.valueOf(value);
}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/08e61502/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/StringQueryParameter.java
--
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/StringQueryParameter.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/StringQueryParameter.java
index 52c0967..67e83ff 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/StringQueryParameter.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/StringQueryParameter.java
@@ -30,7 +30,7 @@ public abstract class StringQueryParameter extends MessageQueryParameter<String> {
}
 
@Override
-   public final String convertValueFromString(final String value) {
+   public final String convertStringToValue(final String value) {
return value;
}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/08e61502/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/AllowNonRestoredStateQueryParameterTest.java
--
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/A

[3/4] flink git commit: [FLINK-8792] [rest] Change MessageQueryParameter.convertStringToValue to convertValueToString

2018-02-28 Thread trohrmann
[FLINK-8792] [rest] Change MessageQueryParameter.convertStringToValue to convertValueToString

This closes #5587.
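The two FLINK-8792 commits in this digest converge on a symmetric naming scheme: `convertStringToValue` parses one query-string token, `convertValueToString` renders it back. A minimal, self-contained sketch of that contract — the class and method shapes below are simplified stand-ins for illustration, not Flink's actual `MessageQueryParameter` API:

```java
// Simplified stand-in for Flink's MessageQueryParameter (hypothetical names),
// showing the symmetric conversion pair introduced by FLINK-8792.
abstract class QueryParameter<X> {
    // Parses one raw query-string token into a typed value.
    public abstract X convertStringToValue(String value);

    // Renders one typed value back into a query-string token.
    public abstract String convertValueToString(X value);
}

class ParallelismParameter extends QueryParameter<Integer> {
    @Override
    public Integer convertStringToValue(String value) {
        return Integer.valueOf(value);
    }

    @Override
    public String convertValueToString(Integer value) {
        return value.toString();
    }
}

public class QueryParameterDemo {
    public static void main(String[] args) {
        QueryParameter<Integer> p = new ParallelismParameter();
        // Round trip: "8" -> 8 -> "8"
        System.out.println(p.convertValueToString(p.convertStringToValue("8")));
    }
}
```

The point of the rename is that the two directions now read as mirror images, so a subclass author cannot confuse which side parses and which side formats.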


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9a69035d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9a69035d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9a69035d

Branch: refs/heads/master
Commit: 9a69035d8c6b62bb6229d75fed6ab43407e35e04
Parents: 970d94e
Author: vinoyang 
Authored: Tue Feb 27 14:43:52 2018 +0800
Committer: Till Rohrmann 
Committed: Wed Feb 28 13:27:06 2018 +0100

--
 .../handlers/AllowNonRestoredStateQueryParameter.java  | 2 +-
 .../runtime/webmonitor/handlers/ParallelismQueryParameter.java | 2 +-
 .../runtime/webmonitor/handlers/StringQueryParameter.java  | 2 +-
 .../handlers/AllowNonRestoredStateQueryParameterTest.java  | 4 ++--
 .../webmonitor/handlers/ParallelismQueryParameterTest.java | 2 +-
 .../flink/runtime/rest/messages/MessageQueryParameter.java | 6 +++---
 .../rest/messages/RescalingParallelismQueryParameter.java  | 2 +-
 .../runtime/rest/messages/TerminationModeQueryParameter.java   | 2 +-
 .../rest/messages/job/metrics/MetricsFilterParameter.java  | 2 +-
 .../apache/flink/runtime/rest/RestServerEndpointITCase.java| 2 +-
 .../runtime/rest/handler/util/HandlerRequestUtilsTest.java | 2 +-
 .../flink/runtime/rest/messages/MessageParametersTest.java | 2 +-
 .../rest/messages/job/metrics/MetricsFilterParameterTest.java  | 2 +-
 13 files changed, 16 insertions(+), 16 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/9a69035d/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AllowNonRestoredStateQueryParameter.java
--
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AllowNonRestoredStateQueryParameter.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AllowNonRestoredStateQueryParameter.java
index 7ad014e..2ddde3a 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AllowNonRestoredStateQueryParameter.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AllowNonRestoredStateQueryParameter.java
@@ -39,7 +39,7 @@ public class AllowNonRestoredStateQueryParameter extends MessageQueryParameter<Boolean> {

http://git-wip-us.apache.org/repos/asf/flink/blob/9a69035d/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ParallelismQueryParameter.java
--
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ParallelismQueryParameter.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ParallelismQueryParameter.java
index 26cb16c..2ade7eb 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ParallelismQueryParameter.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ParallelismQueryParameter.java
@@ -38,7 +38,7 @@ public class ParallelismQueryParameter extends MessageQueryParameter<Integer> {
}
 
@Override
-   public String convertStringToValue(final Integer value) {
+   public String convertValueToString(final Integer value) {
return value.toString();
}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9a69035d/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/StringQueryParameter.java
--
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/StringQueryParameter.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/StringQueryParameter.java
index 226c592..52c0967 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/StringQueryParameter.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/StringQueryParameter.java
@@ -35,7 +35,7 @@ public abstract class StringQueryParameter extends MessageQueryParameter<String> {
}
 
@Override
-   public final String convertStringToValue(final String value) {
+   public final String convertValueToString(final String value) {
return value;
}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9a69035d/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/AllowNonRestoredStateQueryParameterTest.java
--
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/Al

[1/4] flink git commit: [FLINK-8730][REST] JSON serialize entire SerializedThrowable

2018-02-28 Thread trohrmann
Repository: flink
Updated Branches:
  refs/heads/master f89fce83e -> e8d168509


[FLINK-8730][REST] JSON serialize entire SerializedThrowable

Do not only serialize the serialized exception but the entire
SerializedThrowable object. This makes it possible to throw the
SerializedThrowable itself without deserializing it.

This closes #5546.
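The idea behind FLINK-8730 can be sketched in a self-contained way: a carrier exception holds the serialized bytes of the original throwable, so the carrier can itself be thrown and (de)serialized as a whole, without the original exception class having to be on the classpath until it is actually needed. The class and method names below are illustrative assumptions; Flink's real class is `org.apache.flink.util.SerializedThrowable`, whose `deserializeError` additionally takes a `ClassLoader`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

// Hypothetical stand-in for Flink's SerializedThrowable: it IS an
// Exception, so it can be thrown directly, while the original cause
// travels only as bytes plus descriptive metadata.
class CarrierThrowable extends Exception {
    private final byte[] serializedException;
    private final String originalErrorClassName;

    CarrierThrowable(Throwable cause) throws IOException {
        super(cause.getMessage());
        this.originalErrorClassName = cause.getClass().getName();
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(cause); // requires the cause to be Serializable
        }
        this.serializedException = bos.toByteArray();
    }

    // Deserialize lazily, only when the original type is actually needed.
    Throwable deserializeError() throws IOException, ClassNotFoundException {
        try (ObjectInputStream ois = new ObjectInputStream(
                new ByteArrayInputStream(serializedException))) {
            return (Throwable) ois.readObject();
        }
    }

    String getOriginalErrorClassName() {
        return originalErrorClassName;
    }
}

public class SerializedThrowableDemo {
    public static void main(String[] args) throws Exception {
        try {
            throw new CarrierThrowable(new IllegalStateException("boom"));
        } catch (CarrierThrowable t) {
            // The carrier is throwable as-is; the original comes back on demand.
            System.out.println(t.getOriginalErrorClassName()); // java.lang.IllegalStateException
            System.out.println(t.deserializeError().getMessage()); // boom
        }
    }
}
```

Serializing the whole carrier object (rather than only the inner byte array) is what lets a REST handler rethrow the received object directly.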


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/970d94e8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/970d94e8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/970d94e8

Branch: refs/heads/master
Commit: 970d94e821d2341d200baf692ee2fbbc85e395b5
Parents: 71baa27
Author: gyao 
Authored: Wed Feb 21 16:02:01 2018 +0100
Committer: Till Rohrmann 
Committed: Wed Feb 28 13:27:05 2018 +0100

--
 .../apache/flink/util/SerializedThrowable.java  | 14 
 .../json/SerializedThrowableDeserializer.java   | 15 +++--
 .../json/SerializedThrowableSerializer.java |  7 +-
 .../json/SerializedThrowableSerializerTest.java | 71 
 4 files changed, 83 insertions(+), 24 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/970d94e8/flink-core/src/main/java/org/apache/flink/util/SerializedThrowable.java
--
diff --git a/flink-core/src/main/java/org/apache/flink/util/SerializedThrowable.java b/flink-core/src/main/java/org/apache/flink/util/SerializedThrowable.java
index de6358c..13f8d77 100644
--- a/flink-core/src/main/java/org/apache/flink/util/SerializedThrowable.java
+++ b/flink-core/src/main/java/org/apache/flink/util/SerializedThrowable.java
@@ -25,8 +25,6 @@ import java.lang.ref.WeakReference;
 import java.util.HashSet;
 import java.util.Set;
 
-import static java.util.Objects.requireNonNull;
-
 /**
  * Utility class for dealing with user-defined Throwable types that are serialized (for
  * example during RPC/Actor communication), but cannot be resolved with the default
@@ -64,18 +62,6 @@ public class SerializedThrowable extends Exception implements Serializable {
this(exception, new HashSet<>());
}
 
-	/**
-	 * Creates a new SerializedThrowable from a serialized exception provided as a byte array.
-	 */
-	public SerializedThrowable(
-			final byte[] serializedException,
-			final String originalErrorClassName,
-			final String fullStringifiedStackTrace) {
-		this.serializedException = requireNonNull(serializedException);
-		this.originalErrorClassName = requireNonNull(originalErrorClassName);
-		this.fullStringifiedStackTrace = requireNonNull(fullStringifiedStackTrace);
-	}
-
 	private SerializedThrowable(Throwable exception, Set<String> alreadySeen) {
 		super(getMessageOrError(exception));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/970d94e8/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SerializedThrowableDeserializer.java
--
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SerializedThrowableDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SerializedThrowableDeserializer.java
index 3217cce..d0f71ce 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SerializedThrowableDeserializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SerializedThrowableDeserializer.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.rest.messages.json;
 
+import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.SerializedThrowable;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
@@ -27,9 +28,7 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std
 
 import java.io.IOException;
 
-import static org.apache.flink.runtime.rest.messages.json.SerializedThrowableSerializer.FIELD_NAME_CLASS;
-import static org.apache.flink.runtime.rest.messages.json.SerializedThrowableSerializer.FIELD_NAME_SERIALIZED_EXCEPTION;
-import static org.apache.flink.runtime.rest.messages.json.SerializedThrowableSerializer.FIELD_NAME_STACK_TRACE;
+import static org.apache.flink.runtime.rest.messages.json.SerializedThrowableSerializer.FIELD_NAME_SERIALIZED_THROWABLE;
 
 /**
  * JSON deserializer for {@link SerializedThrowable}.
@@ -48,10 +47,12 @@ public class SerializedThrowableDeserializer extends StdDeserializer<SerializedThrowable> {

http://git-wip-us.apache.org/repos/asf/flink/blob/970d94e8/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SerializedThrowableSerializer.java

[4/4] flink git commit: [FLINK-8792] [rest] Change MessageQueryParameter#convertValueFromString to convertStringToValue

2018-02-28 Thread trohrmann
[FLINK-8792] [rest] Change MessageQueryParameter#convertValueFromString to convertStringToValue


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e8d16850
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e8d16850
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e8d16850

Branch: refs/heads/master
Commit: e8d16850948231070dbc3344c2b287d07e2647a7
Parents: 9a69035
Author: Till Rohrmann 
Authored: Wed Feb 28 10:11:44 2018 +0100
Committer: Till Rohrmann 
Committed: Wed Feb 28 13:27:06 2018 +0100

--
 .../handlers/AllowNonRestoredStateQueryParameter.java  | 2 +-
 .../runtime/webmonitor/handlers/ParallelismQueryParameter.java | 2 +-
 .../runtime/webmonitor/handlers/StringQueryParameter.java  | 2 +-
 .../handlers/AllowNonRestoredStateQueryParameterTest.java  | 6 +++---
 .../webmonitor/handlers/ParallelismQueryParameterTest.java | 2 +-
 .../flink/runtime/rest/messages/MessageQueryParameter.java | 4 ++--
 .../rest/messages/RescalingParallelismQueryParameter.java  | 2 +-
 .../runtime/rest/messages/TerminationModeQueryParameter.java   | 2 +-
 .../rest/messages/job/metrics/MetricsFilterParameter.java  | 2 +-
 .../apache/flink/runtime/rest/RestServerEndpointITCase.java| 2 +-
 .../runtime/rest/handler/util/HandlerRequestUtilsTest.java | 2 +-
 .../flink/runtime/rest/messages/MessageParametersTest.java | 2 +-
 .../rest/messages/job/metrics/MetricsFilterParameterTest.java  | 2 +-
 13 files changed, 16 insertions(+), 16 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/e8d16850/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AllowNonRestoredStateQueryParameter.java
--
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AllowNonRestoredStateQueryParameter.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AllowNonRestoredStateQueryParameter.java
index 2ddde3a..19734d8 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AllowNonRestoredStateQueryParameter.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AllowNonRestoredStateQueryParameter.java
@@ -34,7 +34,7 @@ public class AllowNonRestoredStateQueryParameter extends MessageQueryParameter<Boolean> {

http://git-wip-us.apache.org/repos/asf/flink/blob/e8d16850/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ParallelismQueryParameter.java
--
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ParallelismQueryParameter.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ParallelismQueryParameter.java
index 2ade7eb..398bcb0 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ParallelismQueryParameter.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ParallelismQueryParameter.java
@@ -33,7 +33,7 @@ public class ParallelismQueryParameter extends MessageQueryParameter<Integer> {
}
 
@Override
-   public Integer convertValueFromString(final String value) {
+   public Integer convertStringToValue(final String value) {
return Integer.valueOf(value);
}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e8d16850/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/StringQueryParameter.java
--
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/StringQueryParameter.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/StringQueryParameter.java
index 52c0967..67e83ff 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/StringQueryParameter.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/StringQueryParameter.java
@@ -30,7 +30,7 @@ public abstract class StringQueryParameter extends MessageQueryParameter<String> {
}
 
@Override
-   public final String convertValueFromString(final String value) {
+   public final String convertStringToValue(final String value) {
return value;
}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e8d16850/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/AllowNonRestoredStateQueryParameterTest.java
--
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/AllowN

[2/4] flink git commit: [FLINK-8787][flip6] Do not copy flinkConfiguration in AbstractYarnClusterDescriptor

2018-02-28 Thread trohrmann
[FLINK-8787][flip6] Do not copy flinkConfiguration in AbstractYarnClusterDescriptor

This closes #5591.
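The one-line change in this commit swaps a defensive copy (`new Configuration(flinkConfiguration)`) for the live configuration instance. A self-contained sketch of why that matters — `Config` here is a tiny hypothetical stand-in for Flink's `Configuration`, and the key name is illustrative only:

```java
import java.util.HashMap;
import java.util.Map;

// Minimal stand-in for Flink's Configuration: a mutable string map
// with a copy constructor analogous to `new Configuration(other)`.
class Config {
    private final Map<String, String> values = new HashMap<>();

    Config() {}

    Config(Config other) {
        this.values.putAll(other.values); // snapshot: later writes are lost
    }

    void setString(String key, String value) { values.put(key, value); }
    String getString(String key) { return values.get(key); }
}

public class ConfigCopyDemo {
    public static void main(String[] args) {
        Config flinkConfiguration = new Config();

        Config copied = new Config(flinkConfiguration); // old behavior
        Config shared = flinkConfiguration;             // new behavior

        // A setting written after the copy was taken...
        flinkConfiguration.setString("execution-mode", "DETACHED");

        // ...is invisible through the snapshot but visible through the
        // shared reference, and vice versa for writes made downstream.
        System.out.println(copied.getString("execution-mode")); // null
        System.out.println(shared.getString("execution-mode")); // DETACHED
    }
}
```

With the copy removed, configuration changes made around the `startAppMaster` call and those made inside it are observed through one and the same object.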


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/71baa278
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/71baa278
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/71baa278

Branch: refs/heads/master
Commit: 71baa2784edab8b851dabe15855a023c73b4fc1c
Parents: f89fce8
Author: gyao 
Authored: Tue Feb 27 16:58:53 2018 +0100
Committer: Till Rohrmann 
Committed: Wed Feb 28 13:27:05 2018 +0100

--
 .../java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/71baa278/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
--
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index e6c36f6..6b93016 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -482,7 +482,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 		flinkConfiguration.setString(ClusterEntrypoint.EXECUTION_MODE, executionMode.toString());
 
ApplicationReport report = startAppMaster(
-   new Configuration(flinkConfiguration),
+   flinkConfiguration,
yarnClusterEntrypoint,
jobGraph,
yarnClient,



[4/9] flink git commit: [FLINK-8764] [quickstarts] Make quickstarts work out of the box for IDE and JAR packaging

2018-02-28 Thread sewen
[FLINK-8764] [quickstarts] Make quickstarts work out of the box for IDE and JAR packaging

  - All Flink and Scala dependencies are properly set to provided
  - That way, Maven JAR packaging behaves correctly by default
  - Eclipse adds 'provided' dependencies to the classpath when running programs, so works out of the box
  - There is a profile that automatically activates in IntelliJ that adds the necessary
    dependencies in 'compile' scope to make it run out of the box.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fa306385
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fa306385
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fa306385

Branch: refs/heads/release-1.4
Commit: fa306385fe190450251fd902a544d5d3fa2ca664
Parents: 6bd35a3
Author: Stephan Ewen 
Authored: Fri Feb 23 10:53:22 2018 +0100
Committer: Stephan Ewen 
Committed: Wed Feb 28 11:42:33 2018 +0100

--
 .../main/resources/archetype-resources/pom.xml  | 231 -
 .../main/resources/archetype-resources/pom.xml  | 246 ---
 2 files changed, 207 insertions(+), 270 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/fa306385/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
--
diff --git a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
index 84584af..6ac05b0 100644
--- a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
+++ b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
@@ -50,161 +50,53 @@ under the License.


 
-   
-


-   
-   org.apache.flink
-   flink-core
-   ${flink.version}
-   
+   

org.apache.flink
flink-java
${flink.version}
+   provided


-   
org.apache.flink
-   
flink-clients_${scala.binary.version}
+   
flink-streaming-java_${scala.binary.version}
${flink.version}
+   provided

+
+   
+
+   
 
-   
+   
+   

org.slf4j
slf4j-log4j12
${slf4j.version}
+   runtime


log4j
log4j
${log4j.version}
+   runtime


 
-   
-   
-   
-   build-jar
-
-   
-   false
-   
-
-   
-   
-   org.apache.flink
-   flink-core
-   ${flink.version}
-   provided
-   
-   
-   org.apache.flink
-   flink-java
-   ${flink.version}
-   provided
-   
-   
-   org.apache.flink
-   
flink-clients_${scala.binary.version}
-   ${flink.version}
-   provided
-   
-   
-   org.apache.flink
-   
flink-streaming-java_${scala.binary.version}
-   ${flink.version}
-   provided
-   
-   
-   org.slf4j
-   slf4j-log4j12
-   ${slf4j.version}
-   provided
-   
-   
-   log4j
-   log4j
-   ${log4j.version}
-   

[7/9] flink git commit: [FLINK-8767] [quickstarts] Set the maven.compiler.source and .target properties for Java Quickstart

2018-02-28 Thread sewen
[FLINK-8767] [quickstarts] Set the maven.compiler.source and .target properties for Java Quickstart

Setting these properties helps properly pin the Java version in IntelliJ.
Without these properties, the Java version keeps switching back to 1.5 in some setups.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4eb0a0fb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4eb0a0fb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4eb0a0fb

Branch: refs/heads/release-1.4
Commit: 4eb0a0fb9f41b29f6b87ce77f5e985f2b9fa30a3
Parents: 23c37cf
Author: Stephan Ewen 
Authored: Fri Feb 23 11:18:36 2018 +0100
Committer: Stephan Ewen 
Committed: Wed Feb 28 11:43:35 2018 +0100

--
 .../src/main/resources/archetype-resources/pom.xml   | 11 +++
 1 file changed, 7 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/4eb0a0fb/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
--
diff --git a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
index d53415a..0ca6eb9 100644
--- a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
+++ b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
@@ -31,7 +31,10 @@ under the License.


 		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
 		<flink.version>@project.version@</flink.version>
+		<java.version>1.8</java.version>
 		<scala.binary.version>2.11</scala.binary.version>
+		<maven.compiler.source>${java.version}</maven.compiler.source>
+		<maven.compiler.target>${java.version}</maven.compiler.target>

 

@@ -100,8 +103,8 @@ under the License.
 				<artifactId>maven-compiler-plugin</artifactId>
 				<version>3.1</version>
 				<configuration>
-					<source>1.8</source>
-					<target>1.8</target>
+					<source>${java.version}</source>
+					<target>${java.version}</target>


 
@@ -158,8 +161,8 @@ under the License.


 				<artifactId>maven-compiler-plugin</artifactId>
 				<configuration>
-					<source>1.8</source>
-					<target>1.8</target>
+					<source>${java.version}</source>
+					<target>${java.version}</target>
 					<compilerId>jdt</compilerId>





[5/9] flink git commit: [FLINK-8765] [quickstarts] Simplify quickstart properties

2018-02-28 Thread sewen
[FLINK-8765] [quickstarts] Simplify quickstart properties

This does not pull out the slf4j and log4j version into properties any more,
making the quickstarts a bit simpler.

Given that both versions are used only once, and only for the feature to have
convenience logging in the IDE, the versions might as well be defined directly
in the dependencies.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/11e19a56
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/11e19a56
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/11e19a56

Branch: refs/heads/release-1.4
Commit: 11e19a56bfdedb4f41ecc3dcf1cd4120191cdb1a
Parents: fa30638
Author: Stephan Ewen 
Authored: Fri Feb 23 11:10:43 2018 +0100
Committer: Stephan Ewen 
Committed: Wed Feb 28 11:43:01 2018 +0100

--
 .../src/main/resources/archetype-resources/pom.xml | 6 ++
 .../src/main/resources/archetype-resources/pom.xml | 6 ++
 2 files changed, 4 insertions(+), 8 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/11e19a56/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
--
diff --git a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
index 6ac05b0..50b0351 100644
--- a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
+++ b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
@@ -31,8 +31,6 @@ under the License.


 		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
 		<flink.version>@project.version@</flink.version>
-		<slf4j.version>@slf4j.version@</slf4j.version>
-		<log4j.version>@log4j.version@</log4j.version>
 		<scala.binary.version>@scala.binary.version@</scala.binary.version>

 
@@ -82,13 +80,13 @@ under the License.

 			<groupId>org.slf4j</groupId>
 			<artifactId>slf4j-log4j12</artifactId>
-			<version>${slf4j.version}</version>
+			<version>1.7.7</version>
 			<scope>runtime</scope>
 		</dependency>
 		<dependency>
 			<groupId>log4j</groupId>
 			<artifactId>log4j</artifactId>
-			<version>${log4j.version}</version>
+			<version>1.2.17</version>
 			<scope>runtime</scope>



http://git-wip-us.apache.org/repos/asf/flink/blob/11e19a56/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml
--
diff --git a/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml b/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml
index b318a19..4a866c3 100644
--- a/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml
+++ b/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml
@@ -46,8 +46,6 @@ under the License.


 		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
 		<flink.version>@project.version@</flink.version>
-		<slf4j.version>@slf4j.version@</slf4j.version>
-		<log4j.version>@log4j.version@</log4j.version>
 		<scala.binary.version>2.11</scala.binary.version>
 		<scala.version>2.11.11</scala.version>

@@ -92,13 +90,13 @@ under the License.

 			<groupId>org.slf4j</groupId>
 			<artifactId>slf4j-log4j12</artifactId>
-			<version>${slf4j.version}</version>
+			<version>1.7.7</version>
 			<scope>runtime</scope>
 		</dependency>
 		<dependency>
 			<groupId>log4j</groupId>
 			<artifactId>log4j</artifactId>
-			<version>${log4j.version}</version>
+			<version>1.2.17</version>
 			<scope>runtime</scope>





[9/9] flink git commit: [FLINK-8791] [docs] Fix documentation about configuring dependencies

2018-02-28 Thread sewen
[FLINK-8791] [docs] Fix documentation about configuring dependencies


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/51231c47
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/51231c47
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/51231c47

Branch: refs/heads/release-1.4
Commit: 51231c471bca5e400e1bf152d3cfbdb68c809567
Parents: 0c848aa
Author: Stephan Ewen 
Authored: Mon Feb 26 16:41:24 2018 +0100
Committer: Stephan Ewen 
Committed: Wed Feb 28 11:44:49 2018 +0100

--
 docs/dev/linking.md |  96 
 docs/dev/linking_with_flink.md  | 146 ---
 docs/redirects/linking_with_flink.md|  25 ++
 docs/redirects/linking_with_optional_modules.md |  25 ++
 docs/start/dependencies.md  | 244 +++
 5 files changed, 294 insertions(+), 242 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/51231c47/docs/dev/linking.md
--
diff --git a/docs/dev/linking.md b/docs/dev/linking.md
deleted file mode 100644
index 78ef544..000
--- a/docs/dev/linking.md
+++ /dev/null
@@ -1,96 +0,0 @@

-nav-title: "Linking with Optional Modules"
-title: "Linking with modules not contained in the binary distribution"
-nav-parent_id: start
-nav-pos: 10

-
-
-The binary distribution contains jar packages in the `lib` folder that are automatically
-provided to the classpath of your distributed programs. Almost all of Flink classes are
-located there with a few exceptions, for example the streaming connectors and some freshly
-added modules. To run code depending on these modules you need to make them accessible
-during runtime, for which we suggest two options:
-
-1. Either copy the required jar files to the `lib` folder onto all of your TaskManagers.
-Note that you have to restart your TaskManagers after this.
-2. Or package them with your code.
-
-The latter version is recommended as it respects the classloader management in Flink.
-
-### Packaging dependencies with your usercode with Maven
-
-To provide these dependencies not included by Flink we suggest two options with Maven.
-
-1. The maven assembly plugin builds a so-called uber-jar (executable jar) containing all your dependencies.
-The assembly configuration is straight-forward, but the resulting jar might become bulky.
-See [maven-assembly-plugin](http://maven.apache.org/plugins/maven-assembly-plugin/usage.html) for further information.
-2. The maven unpack plugin unpacks the relevant parts of the dependencies and
-then packages it with your code.
-
-Using the latter approach in order to bundle the Kafka connector, `flink-connector-kafka`
-you would need to add the classes from both the connector and the Kafka API itself. Add
-the following to your plugins section.
-
-~~~xml
-<plugin>
-    <groupId>org.apache.maven.plugins</groupId>
-    <artifactId>maven-dependency-plugin</artifactId>
-    <version>2.9</version>
-    <executions>
-        <execution>
-            <id>unpack</id>
-            <phase>prepare-package</phase>
-            <goals>
-                <goal>unpack</goal>
-            </goals>
-            <configuration>
-                <artifactItems>
-                    <artifactItem>
-                        <groupId>org.apache.flink</groupId>
-                        <artifactId>flink-connector-kafka</artifactId>
-                        <version>{{ site.version }}</version>
-                        <type>jar</type>
-                        <overWrite>false</overWrite>
-                        <outputDirectory>${project.build.directory}/classes</outputDirectory>
-                        <includes>org/apache/flink/**</includes>
-                    </artifactItem>
-                    <artifactItem>
-                        <groupId>org.apache.kafka</groupId>
-                        <artifactId>kafka_</artifactId>
-                        <version></version>
-                        <type>jar</type>
-                        <overWrite>false</overWrite>
-                        <outputDirectory>${project.build.directory}/classes</outputDirectory>
-                        <includes>kafka/**</includes>
-                    </artifactItem>
-                </artifactItems>
-            </configuration>
-        </execution>
-    </executions>
-</plugin>
-~~~
-
-Now when running `mvn clean package` the produced jar includes the required dependencies.
-
-{% top %}

http://git-wip-us.apache.org/repos/asf/flink/blob/51231c47/docs/dev/linking_with_flink.md
--
diff --git a/docs/dev/linking_with_flink.md b/docs/dev/linking_with_flink.md
deleted file mode 100644
index f2380b2..000
--- a/docs/dev/linking_with_flink.md
+++ /dev/null
@@ -1,146 +0,0 @@

-title: "Linking with Flink"
-nav-parent_id: start
-nav-pos: 2

-
-
-To write programs with Flink, you need to include the Flink library corresponding to
-your programming language in your project.
-
-The simplest way to do this is to use one of the quickstart scripts: either for
-[Java]({{ site.baseurl }}/quickstart/java_api_quickstart.html) or for [Scala]({{ site.baseurl }}/quickstart/scala_api_quickstart.html). They
-create a blank project from a template (a Maven Archetyp

[8/9] flink git commit: [FLINK-8764] [docs] Adjust quickstart documentation

2018-02-28 Thread sewen
[FLINK-8764] [docs] Adjust quickstart documentation


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0c848aa9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0c848aa9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0c848aa9

Branch: refs/heads/release-1.4
Commit: 0c848aa98b466764873f2372af3ffd64cd8f71ac
Parents: 4eb0a0f
Author: Stephan Ewen 
Authored: Mon Feb 26 12:19:00 2018 +0100
Committer: Stephan Ewen 
Committed: Wed Feb 28 11:43:58 2018 +0100

--
 docs/quickstart/java_api_quickstart.md  | 119 ++-
 docs/quickstart/scala_api_quickstart.md |  95 ++---
 2 files changed, 52 insertions(+), 162 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/0c848aa9/docs/quickstart/java_api_quickstart.md
--
diff --git a/docs/quickstart/java_api_quickstart.md b/docs/quickstart/java_api_quickstart.md
index baf14de..9a32591 100644
--- a/docs/quickstart/java_api_quickstart.md
+++ b/docs/quickstart/java_api_quickstart.md
@@ -1,6 +1,6 @@
 ---
-title: "Sample Project using the Java API"
-nav-title: Sample Project in Java
+title: "Project Template for Java"
+nav-title: Project Template for Java
 nav-parent_id: start
 nav-pos: 0
 ---
@@ -86,120 +86,51 @@ quickstart/
 │   └── myorg
 │   └── quickstart
 │   ├── BatchJob.java
-│   ├── SocketTextStreamWordCount.java
-│   ├── StreamingJob.java
-│   └── WordCount.java
+│   └── StreamingJob.java
 └── resources
 └── log4j.properties
 {% endhighlight %}
 
-The sample project is a __Maven project__, which contains four classes. 
_StreamingJob_ and _BatchJob_ are basic skeleton programs, 
_SocketTextStreamWordCount_ is a working streaming example and _WordCountJob_ 
is a working batch example. Please note that the _main_ method of all classes 
allow you to start Flink in a development/testing mode.
+The sample project is a __Maven project__ with two classes: _StreamingJob_ and _BatchJob_ are basic skeleton programs for a *DataStream* and a *DataSet* program, respectively.
+The _main_ method is the entry point of the program, both for in-IDE testing/execution and for proper deployments.
 
 We recommend you __import this project into your IDE__ to develop and
-test it. If you use Eclipse, the [m2e plugin](http://www.eclipse.org/m2e/)
+test it. IntelliJ IDEA supports Maven projects out of the box.
+If you use Eclipse, the [m2e plugin](http://www.eclipse.org/m2e/)
allows you to [import Maven projects](http://books.sonatype.com/m2eclipse-book/reference/creating-sect-importing-projects.html#fig-creating-import).
 Some Eclipse bundles include that plugin by default, others require you
-to install it manually. The IntelliJ IDE supports Maven projects out of
-the box.
+to install it manually. 
 
-
-*A note to Mac OS X users*: The default JVM heapsize for Java is too
*A note to Mac OS X users*: The default JVM heapsize for Java may be too
 small for Flink. You have to manually increase it. In Eclipse, choose
 `Run Configurations -> Arguments` and write into the `VM Arguments`
 box: `-Xmx800m`.
 
 ## Build Project
 
-If you want to __build your project__, go to your project directory and
-issue the `mvn clean install -Pbuild-jar` command. You will
-__find a jar__ that runs on every Flink cluster with a compatible
-version, __target/original-your-artifact-id-your-version.jar__. There
-is also a fat-jar in __target/your-artifact-id-your-version.jar__ which,
-additionally, contains all dependencies that were added to the Maven
-project.
+If you want to __build/package your project__, go to your project directory and
+run the '`mvn clean package`' command.
+You will __find a JAR file__ that contains your application, plus connectors and libraries
+that you may have added as dependencies to the application: `target/<artifact-id>-<version>.jar`.
+
+__Note:__ If you use a different class than *StreamingJob* as the application's main class / entry point,
+we recommend you change the `mainClass` setting in the `pom.xml` file accordingly. That way, Flink
+can run the application from the JAR file without additionally specifying the main class.
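
The `mainClass` setting referred to above typically lives in the quickstart's shade-plugin configuration. A hypothetical sketch of what that `pom.xml` fragment can look like (the exact plugin setup may differ between quickstart versions; the class name is a placeholder):

```xml
<!-- Sketch only: quickstart poms may differ in detail.
     The <mainClass> value is the one to change. -->
<plugin>
	<groupId>org.apache.maven.plugins</groupId>
	<artifactId>maven-shade-plugin</artifactId>
	<executions>
		<execution>
			<phase>package</phase>
			<goals>
				<goal>shade</goal>
			</goals>
			<configuration>
				<transformers>
					<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
						<mainClass>org.myorg.quickstart.StreamingJob</mainClass>
					</transformer>
				</transformers>
			</configuration>
		</execution>
	</executions>
</plugin>
```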
 
 ## Next Steps
 
 Write your application!
 
-The quickstart project contains a `WordCount` implementation, the
-"Hello World" of Big Data processing systems. The goal of `WordCount`
-is to determine the frequencies of words in a text, e.g., how often do
-the terms "the" or "house" occur in all Wikipedia texts.
-
-__Sample Input__:
-
-~~~bash
-big data is big
-~~~
-
-__Sample Output__:
-
-~~~bash
-big 2
-data 1
-

[2/9] flink git commit: [hotfix] [quickstarts] Fix header and package declaration order.

2018-02-28 Thread sewen
[hotfix] [quickstarts] Fix header and package declaration order.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4a60fcb0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4a60fcb0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4a60fcb0

Branch: refs/heads/release-1.4
Commit: 4a60fcb0f41e96221108a43fc7037e08ea047758
Parents: 179cfdd
Author: Stephan Ewen 
Authored: Wed Feb 21 20:30:35 2018 +0100
Committer: Stephan Ewen 
Committed: Wed Feb 28 11:35:48 2018 +0100

--
 .../resources/archetype-resources/src/main/java/BatchJob.java  | 6 +++---
 .../archetype-resources/src/main/java/StreamingJob.java| 6 +++---
 .../archetype-resources/src/main/scala/BatchJob.scala  | 6 +++---
 .../archetype-resources/src/main/scala/StreamingJob.scala  | 6 +++---
 4 files changed, 12 insertions(+), 12 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/4a60fcb0/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/main/java/BatchJob.java
--
diff --git 
a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/main/java/BatchJob.java
 
b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/main/java/BatchJob.java
index 9711924..9515791 100644
--- 
a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/main/java/BatchJob.java
+++ 
b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/main/java/BatchJob.java
@@ -1,6 +1,4 @@
-package ${package};
-
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -18,6 +16,8 @@ package ${package};
  * limitations under the License.
  */
 
+package ${package};
+
 import org.apache.flink.api.java.ExecutionEnvironment;
 
 /**

http://git-wip-us.apache.org/repos/asf/flink/blob/4a60fcb0/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/main/java/StreamingJob.java
--
diff --git 
a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/main/java/StreamingJob.java
 
b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/main/java/StreamingJob.java
index 6027e75..4091889 100644
--- 
a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/main/java/StreamingJob.java
+++ 
b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/main/java/StreamingJob.java
@@ -1,6 +1,4 @@
-package ${package};
-
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -18,6 +16,8 @@ package ${package};
  * limitations under the License.
  */
 
+package ${package};
+
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 
 /**

http://git-wip-us.apache.org/repos/asf/flink/blob/4a60fcb0/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/src/main/scala/BatchJob.scala
--
diff --git 
a/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/src/main/scala/BatchJob.scala
 
b/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/src/main/scala/BatchJob.scala
index a533da9..46520b7 100644
--- 
a/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/src/main/scala/BatchJob.scala
+++ 
b/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/src/main/scala/BatchJob.scala
@@ -1,6 +1,4 @@
-package ${package}
-
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -18,6 +16,8 @@ package ${package}
  * limitations under the License.
  */
 
+package ${package}
+
 import org.apache.flink.api.scala._
 
 /**

http://git-wip-us.apache.org/repos/asf/flink/blob/4a60fcb0/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/src/main/scala/StreamingJob.scala
--
diff --git 
a/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/src/main/scala/StreamingJob.scala
 
b/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/src/main/scala/StreamingJob.scala
index 7c950b1..20115a7 100644
--- 
a/flink-quickstart/fli

[3/9] flink git commit: [hotfix] [quickstarts] Fix block comments in program stubs.

2018-02-28 Thread sewen
[hotfix] [quickstarts] Fix block comments in program stubs.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6bd35a3c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6bd35a3c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6bd35a3c

Branch: refs/heads/release-1.4
Commit: 6bd35a3c537eaa54c55312a6b3703208582b9229
Parents: 4a60fcb
Author: Stephan Ewen 
Authored: Wed Feb 21 21:05:23 2018 +0100
Committer: Stephan Ewen 
Committed: Wed Feb 28 11:36:08 2018 +0100

--
 .../main/resources/archetype-resources/src/main/java/BatchJob.java | 2 +-
 .../resources/archetype-resources/src/main/java/StreamingJob.java  | 2 +-
 .../resources/archetype-resources/src/main/scala/BatchJob.scala| 2 +-
 .../archetype-resources/src/main/scala/StreamingJob.scala  | 2 +-
 4 files changed, 4 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/6bd35a3c/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/main/java/BatchJob.java
--
diff --git 
a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/main/java/BatchJob.java
 
b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/main/java/BatchJob.java
index 9515791..db2ee60 100644
--- 
a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/main/java/BatchJob.java
+++ 
b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/main/java/BatchJob.java
@@ -36,7 +36,7 @@ public class BatchJob {
// set up the batch execution environment
final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
 
-   /**
+   /*
 * Here, you can start creating your execution plan for Flink.
 *
 * Start with getting some data from the environment, like

http://git-wip-us.apache.org/repos/asf/flink/blob/6bd35a3c/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/main/java/StreamingJob.java
--
diff --git 
a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/main/java/StreamingJob.java
 
b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/main/java/StreamingJob.java
index 4091889..5bcee21 100644
--- 
a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/main/java/StreamingJob.java
+++ 
b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/main/java/StreamingJob.java
@@ -38,7 +38,7 @@ public class StreamingJob {
// set up the streaming execution environment
final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
 
-   /**
+   /*
 * Here, you can start creating your execution plan for Flink.
 *
 * Start with getting some data from the environment, like

http://git-wip-us.apache.org/repos/asf/flink/blob/6bd35a3c/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/src/main/scala/BatchJob.scala
--
diff --git 
a/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/src/main/scala/BatchJob.scala
 
b/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/src/main/scala/BatchJob.scala
index 46520b7..329e6c2 100644
--- 
a/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/src/main/scala/BatchJob.scala
+++ 
b/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/src/main/scala/BatchJob.scala
@@ -36,7 +36,7 @@ object BatchJob {
 // set up the batch execution environment
 val env = ExecutionEnvironment.getExecutionEnvironment
 
-/**
+/*
  * Here, you can start creating your execution plan for Flink.
  *
  * Start with getting some data from the environment, like

http://git-wip-us.apache.org/repos/asf/flink/blob/6bd35a3c/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/src/main/scala/StreamingJob.scala
--
diff --git 
a/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/src/main/scala/StreamingJob.scala
 
b/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/src/main/scala/StreamingJob.scala
index 20115a7..bab0bb9 100644
--- 
a/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/src

[6/9] flink git commit: [FLINK-8766] [quickstarts] Pin scala runtime version for Java Quickstart

2018-02-28 Thread sewen
[FLINK-8766] [quickstarts] Pin scala runtime version for Java Quickstart

Followup to FLINK-7414, which pinned the scala version for the Scala Quickstart


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/23c37cf4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/23c37cf4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/23c37cf4

Branch: refs/heads/release-1.4
Commit: 23c37cf43cac94ecaa43641d264f7422bc1c57ed
Parents: 11e19a5
Author: Stephan Ewen 
Authored: Fri Feb 23 11:13:01 2018 +0100
Committer: Stephan Ewen 
Committed: Wed Feb 28 11:43:19 2018 +0100

--
 .../src/main/resources/archetype-resources/pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/23c37cf4/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
--
diff --git 
a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
 
b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
index 50b0351..d53415a 100644
--- 
a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
+++ 
b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
@@ -31,7 +31,7 @@ under the License.


UTF-8
@project.version@
-   
@scala.binary.version@
+   2.11

 




[1/9] flink git commit: [FLINK-8762] [quickstarts] Make 'StreamingJob' the default main class and remove WordCount example from the quickstart.

2018-02-28 Thread sewen
Repository: flink
Updated Branches:
  refs/heads/release-1.4 392cfaaed -> 51231c471


[FLINK-8762] [quickstarts] Make 'StreamingJob' the default main class and 
remove WordCount example from the quickstart.

The packaged example jobs have been reported to not be terribly helpful and
simply create noise in the initial project setup.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/179cfdd4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/179cfdd4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/179cfdd4

Branch: refs/heads/release-1.4
Commit: 179cfdd4c6fd5f38ab01c2a55b17f251cc173dbc
Parents: 392cfaa
Author: Stephan Ewen 
Authored: Wed Feb 21 20:20:51 2018 +0100
Committer: Stephan Ewen 
Committed: Wed Feb 28 11:35:33 2018 +0100

--
 .../main/resources/archetype-resources/pom.xml  |   6 +-
 .../src/main/java/BatchJob.java |  20 +---
 .../main/java/SocketTextStreamWordCount.java| 108 ---
 .../src/main/java/StreamingJob.java |  21 ++--
 .../src/main/java/WordCount.java|  94 
 .../main/resources/archetype-resources/pom.xml  |   6 +-
 .../src/main/scala/BatchJob.scala   |  25 ++---
 .../main/scala/SocketTextStreamWordCount.scala  |  69 
 .../src/main/scala/StreamingJob.scala   |  24 ++---
 .../src/main/scala/WordCount.scala  |  53 -
 10 files changed, 27 insertions(+), 399 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/179cfdd4/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
--
diff --git 
a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
 
b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
index 71bc0a1..84584af 100644
--- 
a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
+++ 
b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
@@ -189,15 +189,11 @@ under the License.






-   
-   




http://git-wip-us.apache.org/repos/asf/flink/blob/179cfdd4/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/main/java/BatchJob.java
--
diff --git 
a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/main/java/BatchJob.java
 
b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/main/java/BatchJob.java
index d0e68a4..9711924 100644
--- 
a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/main/java/BatchJob.java
+++ 
b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/main/java/BatchJob.java
@@ -23,22 +23,12 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 /**
  * Skeleton for a Flink Batch Job.
  *
- * For a full example of a Flink Batch Job, see the WordCountJob.java file 
in the
- * same package/directory or have a look at the website.
+ * For a tutorial how to write a Flink batch application, check the
+ * tutorials and examples on the http://flink.apache.org/docs/stable/";>Flink Website.
  *
- * You can also generate a .jar file that you can submit on your Flink
- * cluster.
- * Just type
- * mvn clean package
- * in the projects root directory.
- * You will find the jar in
- * target/${artifactId}-${version}.jar
- * From the CLI you can then run
- * ./bin/flink run -c ${package}.BatchJob 
target/${artifactId}-${version}.jar
- *
- * For more information on the CLI see:
- *
- * http://flink.apache.org/docs/latest/apis/cli.html
+ * To package your application into a JAR file for execution,
+ * change the main class in the pom.xml file to this class (simply search for 'mainClass')
+ * and run 'mvn clean package' on the command line.
  */
 public class BatchJob {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/179cfdd4/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/main/java/SocketTextStreamWordCount.java
--

svn commit: r25326 - in /release/flink/flink-shaded-3.0: ./ flink-shaded-3.0-src.tgz flink-shaded-3.0-src.tgz.asc flink-shaded-3.0-src.tgz.md5 flink-shaded-3.0-src.tgz.sha

2018-02-28 Thread chesnay
Author: chesnay
Date: Wed Feb 28 10:34:11 2018
New Revision: 25326

Log:
Add flink-shaded 3.0 release files

Added:
release/flink/flink-shaded-3.0/
release/flink/flink-shaded-3.0/flink-shaded-3.0-src.tgz   (with props)
release/flink/flink-shaded-3.0/flink-shaded-3.0-src.tgz.asc
release/flink/flink-shaded-3.0/flink-shaded-3.0-src.tgz.md5
release/flink/flink-shaded-3.0/flink-shaded-3.0-src.tgz.sha

Added: release/flink/flink-shaded-3.0/flink-shaded-3.0-src.tgz
==
Binary file - no diff available.

Propchange: release/flink/flink-shaded-3.0/flink-shaded-3.0-src.tgz
--
svn:mime-type = application/octet-stream

Added: release/flink/flink-shaded-3.0/flink-shaded-3.0-src.tgz.asc
==
--- release/flink/flink-shaded-3.0/flink-shaded-3.0-src.tgz.asc (added)
+++ release/flink/flink-shaded-3.0/flink-shaded-3.0-src.tgz.asc Wed Feb 28 
10:34:11 2018
@@ -0,0 +1,17 @@
+-BEGIN PGP SIGNATURE-
+Version: GnuPG v1
+
+iQIcBAABAgAGBQJaiswJAAoJEMLu17ER1GS6quIQAJIIfjpZ1QZAq9JasaVxDvy0
+u8P7kRFPd8YZiy5ffdqc5mtZzBrKyM4m8CqFalFZsCxyIQHJL2orsMkdseUG798M
+QT3zFZI5kwACq3TahVfULWSGTWbu0lhv4Hfr4QhQwqd4Sd1SgLJHcE3oYxZZlX9h
+fmyDkbXmwvehgS8Wl27wXBjpybHjE6454Q1ajiFm7y6uLqcl+1SyRGEn0+4S2koL
+AT5znvFatoLLEdouOLhy8iP2Q9ewYNVJTmskgRmOOh7Hx0fwleZt9Tdid31XRvHB
+14o84ZU6dkA6wUhnl2xzoGezqXaNmDGOgH8zP0yHdFyQBUwImlpIv6h+5gC6JX/j
+eqjjtjeLZuVT56KXGYj3uFQQf11iwxou0LzOqvn6pN5SK77LXwRHAcnlNRXNVEDh
+Di/37Xoa8MZ5F1og/5ResFFgNXer7n2hNYnqCEQW28oIQOBRKQcpkKnDJjLYJk1L
+rSJ9Q+259nzE8ly0NxVOfpeiC1fWtcROnmNNdRruQ7pWaDdl0yNVrswvFpXWCDDO
+gyBgs5jdIPwsrKDtbDWA1zOzvWMcKXD3MSWa7OjeGuA/uja27cF3gN80/w4Gyiya
+VdzhTuJTczfevkPb81r4u851WzeDO9g7l2PhUpbjds2OMnEt6LMJFEyw5OnsHl8w
+qcWp3rApSoV0jVUhY4ml
+=KwVl
+-END PGP SIGNATURE-

Added: release/flink/flink-shaded-3.0/flink-shaded-3.0-src.tgz.md5
==
--- release/flink/flink-shaded-3.0/flink-shaded-3.0-src.tgz.md5 (added)
+++ release/flink/flink-shaded-3.0/flink-shaded-3.0-src.tgz.md5 Wed Feb 28 
10:34:11 2018
@@ -0,0 +1 @@
+88b20a832fc947a002e7c0740e0a9e9e  flink-shaded-3.0-src.tgz

Added: release/flink/flink-shaded-3.0/flink-shaded-3.0-src.tgz.sha
==
--- release/flink/flink-shaded-3.0/flink-shaded-3.0-src.tgz.sha (added)
+++ release/flink/flink-shaded-3.0/flink-shaded-3.0-src.tgz.sha Wed Feb 28 
10:34:11 2018
@@ -0,0 +1 @@
+982a14a4952814f08691105fe0e1567542511ab421af7a72c78452e53571313dbc6fc456ba31d0f23abee0118823d463db851a33cf8cd2510b3611dba6815b9d
  flink-shaded-3.0-src.tgz
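
The `.md5` and `.sha` files added above let downloaders verify the tarball's integrity (the `.sha` file holds a 128-hex-character digest, i.e. SHA-512). A minimal verification sketch in Java, with the tarball file name assumed and replaced here by in-memory sample bytes:

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;

public class Checksums {

    // Hex-encode the digest of the given bytes using the named algorithm.
    static String hexDigest(String algorithm, byte[] data) throws Exception {
        MessageDigest md = MessageDigest.getInstance(algorithm);
        StringBuilder sb = new StringBuilder();
        for (byte b : md.digest(data)) {
            sb.append(String.format("%02x", b));
        }
        return sb.toString();
    }

    public static void main(String[] args) throws Exception {
        // In practice the bytes would come from the downloaded release, e.g.
        // Files.readAllBytes(Paths.get("flink-shaded-3.0-src.tgz")),
        // and the result would be compared against the .md5/.sha file contents.
        byte[] data = "abc".getBytes(StandardCharsets.UTF_8);
        System.out.println("MD5:     " + hexDigest("MD5", data));
        System.out.println("SHA-512: " + hexDigest("SHA-512", data));
    }
}
```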




flink git commit: [FLINK-8791] [docs] Fix documentation about configuring dependencies

2018-02-28 Thread sewen
Repository: flink
Updated Branches:
  refs/heads/master d9f2f2f83 -> f89fce83e


[FLINK-8791] [docs] Fix documentation about configuring dependencies

This closes #5586


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f89fce83
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f89fce83
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f89fce83

Branch: refs/heads/master
Commit: f89fce83eaadddcc3aa8dee2167188a2b53b0c8e
Parents: d9f2f2f
Author: Stephan Ewen 
Authored: Mon Feb 26 16:41:24 2018 +0100
Committer: Stephan Ewen 
Committed: Wed Feb 28 11:26:03 2018 +0100

--
 docs/dev/linking.md |  96 
 docs/dev/linking_with_flink.md  | 146 ---
 docs/redirects/linking_with_flink.md|  25 ++
 docs/redirects/linking_with_optional_modules.md |  25 ++
 docs/start/dependencies.md  | 244 +++
 5 files changed, 294 insertions(+), 242 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/f89fce83/docs/dev/linking.md
--
diff --git a/docs/dev/linking.md b/docs/dev/linking.md
deleted file mode 100644
index 78ef544..000
--- a/docs/dev/linking.md
+++ /dev/null
@@ -1,96 +0,0 @@

-nav-title: "Linking with Optional Modules"
-title: "Linking with modules not contained in the binary distribution"
-nav-parent_id: start
-nav-pos: 10

-
-
-The binary distribution contains jar packages in the `lib` folder that are 
automatically
-provided to the classpath of your distributed programs. Almost all of Flink 
classes are
-located there with a few exceptions, for example the streaming connectors and 
some freshly
-added modules. To run code depending on these modules you need to make them 
accessible
-during runtime, for which we suggest two options:
-
-1. Either copy the required jar files to the `lib` folder onto all of your 
TaskManagers.
-Note that you have to restart your TaskManagers after this.
-2. Or package them with your code.
-
-The latter version is recommended as it respects the classloader management in 
Flink.
-
-### Packaging dependencies with your usercode with Maven
-
-To provide these dependencies not included by Flink we suggest two options 
with Maven.
-
-1. The maven assembly plugin builds a so-called uber-jar (executable jar) 
containing all your dependencies.
-The assembly configuration is straight-forward, but the resulting jar might 
become bulky.
-See 
[maven-assembly-plugin](http://maven.apache.org/plugins/maven-assembly-plugin/usage.html)
 for further information.
-2. The maven unpack plugin unpacks the relevant parts of the dependencies and
-then packages it with your code.
-
-Using the latter approach in order to bundle the Kafka connector, 
`flink-connector-kafka`
-you would need to add the classes from both the connector and the Kafka API 
itself. Add
-the following to your plugins section.
-
-~~~xml
-
-org.apache.maven.plugins
-maven-dependency-plugin
-2.9
-
-
-unpack
-
-prepare-package
-
-unpack
-
-
-
-
-
-org.apache.flink
-flink-connector-kafka
-{{ site.version }}
-jar
-false
-
${project.build.directory}/classes
-org/apache/flink/**
-
-
-
-org.apache.kafka
-kafka_
-
-jar
-false
-
${project.build.directory}/classes
-kafka/**
-
-
-
-
-
-
-~~~
-
-Now when running `mvn clean package` the produced jar includes the required 
dependencies.
-
-{% top %}

http://git-wip-us.apache.org/repos/asf/flink/blob/f89fce83/docs/dev/linking_with_flink.md
--
diff --git a/docs/dev/linking_with_flink.md b/docs/dev/linking_with_flink.md
deleted file mode 100644
index f2380b2..000
--- a/docs/dev/linking_with_flink.md
+++ /dev/null
@@ -1,146 +0,0 @@

-title: "Linking with Flink"
-nav-parent_id: start
-nav-pos: 2

-
-
-To write programs with Flink, you need to include the Flink library 
corresponding to
-your programming language in your project.
-
-The simplest way to do this is to use one of the quickstart scripts: either for
-[Java]({{ site.baseurl }}/quickstart/java_api_quickstart.html) or for 
[Scala]({{ site.baseurl }}/quicks

flink git commit: [FLINK-8791] [docs] Fix documentation about configuring dependencies

2018-02-28 Thread sewen
Repository: flink
Updated Branches:
  refs/heads/release-1.5 db2c510fb -> 23358ff87


[FLINK-8791] [docs] Fix documentation about configuring dependencies


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/23358ff8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/23358ff8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/23358ff8

Branch: refs/heads/release-1.5
Commit: 23358ff87003fd6603c0ca19bc37f31944d2c494
Parents: db2c510
Author: Stephan Ewen 
Authored: Mon Feb 26 16:41:24 2018 +0100
Committer: Stephan Ewen 
Committed: Wed Feb 28 11:17:02 2018 +0100

--
 docs/dev/linking.md |  96 
 docs/dev/linking_with_flink.md  | 146 ---
 docs/redirects/linking_with_flink.md|  25 ++
 docs/redirects/linking_with_optional_modules.md |  25 ++
 docs/start/dependencies.md  | 244 +++
 5 files changed, 294 insertions(+), 242 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/23358ff8/docs/dev/linking.md
--
diff --git a/docs/dev/linking.md b/docs/dev/linking.md
deleted file mode 100644
index 78ef544..000
--- a/docs/dev/linking.md
+++ /dev/null
@@ -1,96 +0,0 @@

-nav-title: "Linking with Optional Modules"
-title: "Linking with modules not contained in the binary distribution"
-nav-parent_id: start
-nav-pos: 10

-
-
-The binary distribution contains jar packages in the `lib` folder that are 
automatically
-provided to the classpath of your distributed programs. Almost all of Flink 
classes are
-located there with a few exceptions, for example the streaming connectors and 
some freshly
-added modules. To run code depending on these modules you need to make them 
accessible
-during runtime, for which we suggest two options:
-
-1. Either copy the required jar files to the `lib` folder onto all of your 
TaskManagers.
-Note that you have to restart your TaskManagers after this.
-2. Or package them with your code.
-
-The latter version is recommended as it respects the classloader management in 
Flink.
-
-### Packaging dependencies with your usercode with Maven
-
-To provide these dependencies not included by Flink we suggest two options 
with Maven.
-
-1. The maven assembly plugin builds a so-called uber-jar (executable jar) 
containing all your dependencies.
-The assembly configuration is straight-forward, but the resulting jar might 
become bulky.
-See 
[maven-assembly-plugin](http://maven.apache.org/plugins/maven-assembly-plugin/usage.html)
 for further information.
-2. The maven unpack plugin unpacks the relevant parts of the dependencies and
-then packages it with your code.
-
-Using the latter approach in order to bundle the Kafka connector, 
`flink-connector-kafka`
-you would need to add the classes from both the connector and the Kafka API 
itself. Add
-the following to your plugins section.
-
-~~~xml
-
-org.apache.maven.plugins
-maven-dependency-plugin
-2.9
-
-
-unpack
-
-prepare-package
-
-unpack
-
-
-
-
-
-org.apache.flink
-flink-connector-kafka
-{{ site.version }}
-jar
-false
-
${project.build.directory}/classes
-org/apache/flink/**
-
-
-
-org.apache.kafka
-kafka_
-
-jar
-false
-
${project.build.directory}/classes
-kafka/**
-
-
-
-
-
-
-~~~
-
-Now when running `mvn clean package` the produced jar includes the required 
dependencies.
-
-{% top %}

http://git-wip-us.apache.org/repos/asf/flink/blob/23358ff8/docs/dev/linking_with_flink.md
--
diff --git a/docs/dev/linking_with_flink.md b/docs/dev/linking_with_flink.md
deleted file mode 100644
index f2380b2..000
--- a/docs/dev/linking_with_flink.md
+++ /dev/null
@@ -1,146 +0,0 @@

-title: "Linking with Flink"
-nav-parent_id: start
-nav-pos: 2

-
-
-To write programs with Flink, you need to include the Flink library 
corresponding to
-your programming language in your project.
-
-The simplest way to do this is to use one of the quickstart scripts: either for
-[Java]({{ site.baseurl }}/quickstart/java_api_quickstart.html) or for 
[Scala]({{ site.baseurl }}/quickstart/scal

[flink-shaded] tag 3.0 created (now fd0f8cc)

2018-02-28 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a change to tag 3.0
in repository https://gitbox.apache.org/repos/asf/flink-shaded.git.


  at fd0f8cc  (commit)
No new revisions were added by this update.

-- 
To stop receiving notification emails like this one, please contact
ches...@apache.org.