[flink] Git Push Summary
Repository: flink Updated Tags: refs/tags/release-1.4.2-rc2 [created] 0f4978eb6
[5/5] flink git commit: [FLINK-7805][flip6] Recover YARN containers after AM restart.
[FLINK-7805][flip6] Recover YARN containers after AM restart. Recover previously running containers after a restart of the ApplicationMaster. This is a port of a feature that was already implemented prior to FLIP-6. Extract RegisterApplicationMasterResponseReflector class into separate file. This closes #5597. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e92eb391 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e92eb391 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e92eb391 Branch: refs/heads/release-1.5 Commit: e92eb391cada27b49d32e7b11453fb5f06e19880 Parents: 94bbd56 Author: gyao Authored: Wed Feb 28 13:20:23 2018 +0100 Committer: Till Rohrmann Committed: Wed Feb 28 18:43:39 2018 +0100 -- ...isterApplicationMasterResponseReflector.java | 102 .../flink/yarn/YarnFlinkResourceManager.java| 52 - .../apache/flink/yarn/YarnResourceManager.java | 17 ++- ...rApplicationMasterResponseReflectorTest.java | 117 +++ 4 files changed, 235 insertions(+), 53 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/e92eb391/flink-yarn/src/main/java/org/apache/flink/yarn/RegisterApplicationMasterResponseReflector.java -- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/RegisterApplicationMasterResponseReflector.java b/flink-yarn/src/main/java/org/apache/flink/yarn/RegisterApplicationMasterResponseReflector.java new file mode 100644 index 000..13b5745 --- /dev/null +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/RegisterApplicationMasterResponseReflector.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.yarn; + +import org.apache.flink.annotation.VisibleForTesting; + +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.records.Container; +import org.slf4j.Logger; + +import java.lang.reflect.Method; +import java.util.Collections; +import java.util.List; + +import static java.util.Objects.requireNonNull; + +/** + * Looks up the method {@link RegisterApplicationMasterResponse#getContainersFromPreviousAttempts()} + * once and saves the method. This saves computation time on subsequent calls. + */ +class RegisterApplicationMasterResponseReflector { + + private final Logger logger; + + /** +* Reflected method {@link RegisterApplicationMasterResponse#getContainersFromPreviousAttempts()}. +*/ + private Method method; + + RegisterApplicationMasterResponseReflector(final Logger log) { + this(log, RegisterApplicationMasterResponse.class); + } + + @VisibleForTesting + RegisterApplicationMasterResponseReflector(final Logger log, final Class clazz) { + this.logger = requireNonNull(log); + requireNonNull(clazz); + + try { + method = clazz.getMethod("getContainersFromPreviousAttempts"); + } catch (NoSuchMethodException e) { + // that happens in earlier Hadoop versions (pre 2.2) + logger.info("Cannot reconnect to previously allocated containers. " + + "This YARN version does not support 'getContainersFromPreviousAttempts()'"); + } + } + + /** +* Checks if a YARN application still has registered containers. 
If the application master +* registered at the ResourceManager for the first time, this list will be empty. If the +* application master registered a repeated time (after a failure and recovery), this list +* will contain the containers that were previously allocated. +* +* @param response The response object from the registration at the ResourceManager. +* @return A list with containers from previous application attempt. +*/ + List getContainersFromPrevio
[2/5] flink git commit: [hotfix][Javadoc] Fix typo in YarnTestBase: teh -> the
[hotfix][Javadoc] Fix typo in YarnTestBase: teh -> the Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/adb37502 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/adb37502 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/adb37502 Branch: refs/heads/release-1.5 Commit: adb3750226971f7c67a0d3069103b56e4fee1c27 Parents: b724792 Author: gyao Authored: Wed Feb 28 13:07:04 2018 +0100 Committer: Till Rohrmann Committed: Wed Feb 28 18:43:38 2018 +0100 -- .../src/test/java/org/apache/flink/yarn/YarnTestBase.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/adb37502/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java -- diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java index c863e14..7bca321 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java @@ -102,7 +102,7 @@ public abstract class YarnTestBase extends TestLogger { "Started SelectChannelConnector@0.0.0.0:8081" // Jetty should start on a random port in YARN mode. }; - /** These strings are white-listed, overriding teh prohibited strings. */ + /** These strings are white-listed, overriding the prohibited strings. */ protected static final String[] WHITELISTED_STRINGS = { "akka.remote.RemoteTransportExceptionNoStackTrace", // workaround for annoying InterruptedException logging:
[4/5] flink git commit: [hotfix][tests] Fix wrong assertEquals in YARNSessionCapacitySchedulerITCase
[hotfix][tests] Fix wrong assertEquals in YARNSessionCapacitySchedulerITCase Test swapped actual and expected arguments. Remove catching Throwable in test; instead propagate all exceptions. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/94bbd564 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/94bbd564 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/94bbd564 Branch: refs/heads/release-1.5 Commit: 94bbd564ce5214b3366cc6d299fcb99ae62a2bd8 Parents: adb3750 Author: gyao Authored: Wed Feb 28 13:08:25 2018 +0100 Committer: Till Rohrmann Committed: Wed Feb 28 18:43:38 2018 +0100 -- .../flink/yarn/YARNSessionCapacitySchedulerITCase.java | 11 --- 1 file changed, 4 insertions(+), 7 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/94bbd564/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java -- diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java index 3a674ad..d00a9c4 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java @@ -416,7 +416,7 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase { * Test a fire-and-forget job submission to a YARN cluster. */ @Test(timeout = 6) - public void testDetachedPerJobYarnCluster() throws IOException { + public void testDetachedPerJobYarnCluster() throws Exception { LOG.info("Starting testDetachedPerJobYarnCluster()"); File exampleJarLocation = new File("target/programs/BatchWordCount.jar"); @@ -432,7 +432,7 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase { * Test a fire-and-forget job submission to a YARN cluster. 
*/ @Test(timeout = 6) - public void testDetachedPerJobYarnClusterWithStreamingJob() throws IOException { + public void testDetachedPerJobYarnClusterWithStreamingJob() throws Exception { LOG.info("Starting testDetachedPerJobYarnClusterWithStreamingJob()"); File exampleJarLocation = new File("target/programs/StreamingWordCount.jar"); @@ -444,7 +444,7 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase { LOG.info("Finished testDetachedPerJobYarnClusterWithStreamingJob()"); } - private void testDetachedPerJobYarnClusterInternal(String job) throws IOException { + private void testDetachedPerJobYarnClusterInternal(String job) throws Exception { YarnClient yc = YarnClient.createYarnClient(); yc.init(YARN_CONFIGURATION); yc.start(); @@ -575,9 +575,6 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase { } while (rep.getYarnApplicationState() == YarnApplicationState.RUNNING); verifyApplicationTags(rep); - } catch (Throwable t) { - LOG.warn("Error while detached yarn session was running", t); - Assert.fail(t.getMessage()); } finally { //cleanup the yarn-properties file @@ -625,7 +622,7 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase { @SuppressWarnings("unchecked") Set applicationTags = (Set) applicationTagsMethod.invoke(report); - Assert.assertEquals(applicationTags, Collections.singleton("test-tag")); + Assert.assertEquals(Collections.singleton("test-tag"), applicationTags); } @After
[3/5] flink git commit: [hotfix][Javadoc] Fix typo in YARN Utils: teh -> the
[hotfix][Javadoc] Fix typo in YARN Utils: teh -> the Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b7247929 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b7247929 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b7247929 Branch: refs/heads/release-1.5 Commit: b7247929d0745b3b83306d0c93d97faf4ece4107 Parents: f60e46d Author: gyao Authored: Wed Feb 28 13:06:00 2018 +0100 Committer: Till Rohrmann Committed: Wed Feb 28 18:43:38 2018 +0100 -- flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/b7247929/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java -- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java index 9ae5b54..ff2478e 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java @@ -320,7 +320,7 @@ public final class Utils { * * @return The launch context for the TaskManager processes. * -* @throws Exception Thrown if teh launch context could not be created, for example if +* @throws Exception Thrown if the launch context could not be created, for example if * the resources could not be copied. */ static ContainerLaunchContext createTaskExecutorContext(
[1/5] flink git commit: [hotfix] Add missing space to log message in ZooKeeperLeaderElectionService
Repository: flink Updated Branches: refs/heads/release-1.5 26c8f6c2a -> e92eb391c [hotfix] Add missing space to log message in ZooKeeperLeaderElectionService Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f60e46da Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f60e46da Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f60e46da Branch: refs/heads/release-1.5 Commit: f60e46dafa8950d5e40cd8a3286c172ecaea6b73 Parents: 26c8f6c Author: gyao Authored: Wed Feb 28 13:04:19 2018 +0100 Committer: Till Rohrmann Committed: Wed Feb 28 18:43:37 2018 +0100 -- .../runtime/leaderelection/ZooKeeperLeaderElectionService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/f60e46da/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java index 59d3592..dc0f3ae 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java @@ -199,7 +199,7 @@ public class ZooKeeperLeaderElectionService implements LeaderElectionService, Le } } } else { - LOG.warn("The leader session ID {} was confirmed even though the" + + LOG.warn("The leader session ID {} was confirmed even though the " + "corresponding JobManager was not elected as the leader.", leaderSessionID); } }
[2/5] flink git commit: [hotfix][Javadoc] Fix typo in YarnTestBase: teh -> the
[hotfix][Javadoc] Fix typo in YarnTestBase: teh -> the Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f409f920 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f409f920 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f409f920 Branch: refs/heads/master Commit: f409f9208f340a6b741818a56a9a07b236fb0a7f Parents: 1987445 Author: gyao Authored: Wed Feb 28 13:07:04 2018 +0100 Committer: Till Rohrmann Committed: Wed Feb 28 18:37:13 2018 +0100 -- .../src/test/java/org/apache/flink/yarn/YarnTestBase.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/f409f920/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java -- diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java index c863e14..7bca321 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java @@ -102,7 +102,7 @@ public abstract class YarnTestBase extends TestLogger { "Started SelectChannelConnector@0.0.0.0:8081" // Jetty should start on a random port in YARN mode. }; - /** These strings are white-listed, overriding teh prohibited strings. */ + /** These strings are white-listed, overriding the prohibited strings. */ protected static final String[] WHITELISTED_STRINGS = { "akka.remote.RemoteTransportExceptionNoStackTrace", // workaround for annoying InterruptedException logging:
[5/5] flink git commit: [FLINK-7805][flip6] Recover YARN containers after AM restart.
[FLINK-7805][flip6] Recover YARN containers after AM restart. Recover previously running containers after a restart of the ApplicationMaster. This is a port of a feature that was already implemented prior to FLIP-6. Extract RegisterApplicationMasterResponseReflector class into separate file. This closes #5597. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/45397fe9 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/45397fe9 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/45397fe9 Branch: refs/heads/master Commit: 45397fe974e1390cd39a34fc2eb216f3771ddf06 Parents: 035257e Author: gyao Authored: Wed Feb 28 13:20:23 2018 +0100 Committer: Till Rohrmann Committed: Wed Feb 28 18:41:53 2018 +0100 -- ...isterApplicationMasterResponseReflector.java | 102 .../flink/yarn/YarnFlinkResourceManager.java| 52 - .../apache/flink/yarn/YarnResourceManager.java | 17 ++- ...rApplicationMasterResponseReflectorTest.java | 117 +++ 4 files changed, 235 insertions(+), 53 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/45397fe9/flink-yarn/src/main/java/org/apache/flink/yarn/RegisterApplicationMasterResponseReflector.java -- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/RegisterApplicationMasterResponseReflector.java b/flink-yarn/src/main/java/org/apache/flink/yarn/RegisterApplicationMasterResponseReflector.java new file mode 100644 index 000..13b5745 --- /dev/null +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/RegisterApplicationMasterResponseReflector.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.yarn; + +import org.apache.flink.annotation.VisibleForTesting; + +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.records.Container; +import org.slf4j.Logger; + +import java.lang.reflect.Method; +import java.util.Collections; +import java.util.List; + +import static java.util.Objects.requireNonNull; + +/** + * Looks up the method {@link RegisterApplicationMasterResponse#getContainersFromPreviousAttempts()} + * once and saves the method. This saves computation time on subsequent calls. + */ +class RegisterApplicationMasterResponseReflector { + + private final Logger logger; + + /** +* Reflected method {@link RegisterApplicationMasterResponse#getContainersFromPreviousAttempts()}. +*/ + private Method method; + + RegisterApplicationMasterResponseReflector(final Logger log) { + this(log, RegisterApplicationMasterResponse.class); + } + + @VisibleForTesting + RegisterApplicationMasterResponseReflector(final Logger log, final Class clazz) { + this.logger = requireNonNull(log); + requireNonNull(clazz); + + try { + method = clazz.getMethod("getContainersFromPreviousAttempts"); + } catch (NoSuchMethodException e) { + // that happens in earlier Hadoop versions (pre 2.2) + logger.info("Cannot reconnect to previously allocated containers. " + + "This YARN version does not support 'getContainersFromPreviousAttempts()'"); + } + } + + /** +* Checks if a YARN application still has registered containers. 
If the application master +* registered at the ResourceManager for the first time, this list will be empty. If the +* application master registered a repeated time (after a failure and recovery), this list +* will contain the containers that were previously allocated. +* +* @param response The response object from the registration at the ResourceManager. +* @return A list with containers from previous application attempt. +*/ + List getContainersFromPreviousAtt
[3/5] flink git commit: [hotfix][Javadoc] Fix typo in YARN Utils: teh -> the
[hotfix][Javadoc] Fix typo in YARN Utils: teh -> the Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/19874459 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/19874459 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/19874459 Branch: refs/heads/master Commit: 19874459d5fc750158eea9dcc54be81d5d4e08c9 Parents: b19769d Author: gyao Authored: Wed Feb 28 13:06:00 2018 +0100 Committer: Till Rohrmann Committed: Wed Feb 28 18:37:13 2018 +0100 -- flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/19874459/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java -- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java index 9ae5b54..ff2478e 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java @@ -320,7 +320,7 @@ public final class Utils { * * @return The launch context for the TaskManager processes. * -* @throws Exception Thrown if teh launch context could not be created, for example if +* @throws Exception Thrown if the launch context could not be created, for example if * the resources could not be copied. */ static ContainerLaunchContext createTaskExecutorContext(
[4/5] flink git commit: [hotfix][tests] Fix wrong assertEquals in YARNSessionCapacitySchedulerITCase
[hotfix][tests] Fix wrong assertEquals in YARNSessionCapacitySchedulerITCase Test swapped actual and expected arguments. Remove catching Throwable in test; instead propagate all exceptions. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/035257e4 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/035257e4 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/035257e4 Branch: refs/heads/master Commit: 035257e425133de9368bbef268c38f179bdc68c2 Parents: f409f92 Author: gyao Authored: Wed Feb 28 13:08:25 2018 +0100 Committer: Till Rohrmann Committed: Wed Feb 28 18:37:14 2018 +0100 -- .../flink/yarn/YARNSessionCapacitySchedulerITCase.java | 11 --- 1 file changed, 4 insertions(+), 7 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/035257e4/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java -- diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java index 3a674ad..d00a9c4 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java @@ -416,7 +416,7 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase { * Test a fire-and-forget job submission to a YARN cluster. */ @Test(timeout = 6) - public void testDetachedPerJobYarnCluster() throws IOException { + public void testDetachedPerJobYarnCluster() throws Exception { LOG.info("Starting testDetachedPerJobYarnCluster()"); File exampleJarLocation = new File("target/programs/BatchWordCount.jar"); @@ -432,7 +432,7 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase { * Test a fire-and-forget job submission to a YARN cluster. 
*/ @Test(timeout = 6) - public void testDetachedPerJobYarnClusterWithStreamingJob() throws IOException { + public void testDetachedPerJobYarnClusterWithStreamingJob() throws Exception { LOG.info("Starting testDetachedPerJobYarnClusterWithStreamingJob()"); File exampleJarLocation = new File("target/programs/StreamingWordCount.jar"); @@ -444,7 +444,7 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase { LOG.info("Finished testDetachedPerJobYarnClusterWithStreamingJob()"); } - private void testDetachedPerJobYarnClusterInternal(String job) throws IOException { + private void testDetachedPerJobYarnClusterInternal(String job) throws Exception { YarnClient yc = YarnClient.createYarnClient(); yc.init(YARN_CONFIGURATION); yc.start(); @@ -575,9 +575,6 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase { } while (rep.getYarnApplicationState() == YarnApplicationState.RUNNING); verifyApplicationTags(rep); - } catch (Throwable t) { - LOG.warn("Error while detached yarn session was running", t); - Assert.fail(t.getMessage()); } finally { //cleanup the yarn-properties file @@ -625,7 +622,7 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase { @SuppressWarnings("unchecked") Set applicationTags = (Set) applicationTagsMethod.invoke(report); - Assert.assertEquals(applicationTags, Collections.singleton("test-tag")); + Assert.assertEquals(Collections.singleton("test-tag"), applicationTags); } @After
[1/5] flink git commit: [hotfix] Add missing space to log message in ZooKeeperLeaderElectionService
Repository: flink Updated Branches: refs/heads/master 273fea4aa -> 45397fe97 [hotfix] Add missing space to log message in ZooKeeperLeaderElectionService Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b19769db Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b19769db Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b19769db Branch: refs/heads/master Commit: b19769db7f916b2eb40a9a68c0ee60d0b27da0a2 Parents: 273fea4 Author: gyao Authored: Wed Feb 28 13:04:19 2018 +0100 Committer: Till Rohrmann Committed: Wed Feb 28 18:37:12 2018 +0100 -- .../runtime/leaderelection/ZooKeeperLeaderElectionService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/b19769db/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java index 59d3592..dc0f3ae 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java @@ -199,7 +199,7 @@ public class ZooKeeperLeaderElectionService implements LeaderElectionService, Le } } } else { - LOG.warn("The leader session ID {} was confirmed even though the" + + LOG.warn("The leader session ID {} was confirmed even though the " + "corresponding JobManager was not elected as the leader.", leaderSessionID); } }
flink git commit: [hotfix] [tests] Fix SelfConnectionITCase
Repository: flink Updated Branches: refs/heads/release-1.5 926566651 -> 26c8f6c2a [hotfix] [tests] Fix SelfConnectionITCase The test previously did not fail on failed execution, and thus evaluated incomplete results from a failed execution with the expected results. This cleans up serialization warnings and uses lambdas where possible, to make the code more readable. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/26c8f6c2 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/26c8f6c2 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/26c8f6c2 Branch: refs/heads/release-1.5 Commit: 26c8f6c2a3ff75ffb954c816a57908318a2d8099 Parents: 9265666 Author: Stephan Ewen Authored: Wed Feb 28 12:15:30 2018 +0100 Committer: Stephan Ewen Committed: Wed Feb 28 17:54:11 2018 +0100 -- .../streaming/runtime/SelfConnectionITCase.java | 63 1 file changed, 12 insertions(+), 51 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/26c8f6c2/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SelfConnectionITCase.java -- diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SelfConnectionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SelfConnectionITCase.java index b302513..a8023a0 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SelfConnectionITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SelfConnectionITCase.java @@ -18,8 +18,6 @@ package org.apache.flink.test.streaming.runtime; import org.apache.flink.api.common.functions.FlatMapFunction; -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.co.CoMapFunction; @@ -38,6 
+36,7 @@ import static org.junit.Assert.assertEquals; /** * Integration tests for connected streams. */ +@SuppressWarnings("serial") public class SelfConnectionITCase extends AbstractTestBase { /** @@ -46,26 +45,17 @@ public class SelfConnectionITCase extends AbstractTestBase { @Test public void differentDataStreamSameChain() throws Exception { - TestListResultSink resultSink = new TestListResultSink(); + TestListResultSink resultSink = new TestListResultSink<>(); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); DataStream src = env.fromElements(1, 3, 5); - DataStream stringMap = src.map(new MapFunction() { - private static final long serialVersionUID = 1L; - - @Override - public String map(Integer value) throws Exception { - return "x " + value; - } - }); + DataStream stringMap = src.map(value -> "x " + value); stringMap.connect(src).map(new CoMapFunction() { - private static final long serialVersionUID = 1L; - @Override public String map1(String value) { return value; @@ -94,55 +84,30 @@ public class SelfConnectionITCase extends AbstractTestBase { * (This is not actually self-connect.) */ @Test - public void differentDataStreamDifferentChain() { + public void differentDataStreamDifferentChain() throws Exception { - TestListResultSink resultSink = new TestListResultSink(); + TestListResultSink resultSink = new TestListResultSink<>(); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(3); DataStream src = env.fromElements(1, 3, 5).disableChaining(); - DataStream stringMap = src.flatMap(new FlatMapFunction() { - - private static final long serialVersionUID = 1L; + DataStream stringMap = src + .flatMap(new FlatMapFunction() { @Override public void flatMap(Integer value, Collector out) throws Exception { out.collect("x " + value); } - }).keyBy(new KeySelector() { - - private static final long serialVersionUID = 1L; - - @Overri
flink git commit: [hotfix] [tests] Fix SelfConnectionITCase
Repository: flink Updated Branches: refs/heads/master e8de53817 -> 273fea4aa [hotfix] [tests] Fix SelfConnectionITCase The test previously did not fail on failed execution, and thus evaluated incomplete results from a failed execution with the expected results. This cleans up serialization warnings and uses lambdas where possible, to make the code more readable. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/273fea4a Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/273fea4a Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/273fea4a Branch: refs/heads/master Commit: 273fea4aa36123c0501845e4bc3f8777f203e596 Parents: e8de538 Author: Stephan Ewen Authored: Wed Feb 28 12:15:30 2018 +0100 Committer: Stephan Ewen Committed: Wed Feb 28 17:48:32 2018 +0100 -- .../streaming/runtime/SelfConnectionITCase.java | 63 1 file changed, 12 insertions(+), 51 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/273fea4a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SelfConnectionITCase.java -- diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SelfConnectionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SelfConnectionITCase.java index b302513..a8023a0 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SelfConnectionITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SelfConnectionITCase.java @@ -18,8 +18,6 @@ package org.apache.flink.test.streaming.runtime; import org.apache.flink.api.common.functions.FlatMapFunction; -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.co.CoMapFunction; @@ -38,6 +36,7 @@ 
import static org.junit.Assert.assertEquals; /** * Integration tests for connected streams. */ +@SuppressWarnings("serial") public class SelfConnectionITCase extends AbstractTestBase { /** @@ -46,26 +45,17 @@ public class SelfConnectionITCase extends AbstractTestBase { @Test public void differentDataStreamSameChain() throws Exception { - TestListResultSink resultSink = new TestListResultSink(); + TestListResultSink resultSink = new TestListResultSink<>(); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); DataStream src = env.fromElements(1, 3, 5); - DataStream stringMap = src.map(new MapFunction() { - private static final long serialVersionUID = 1L; - - @Override - public String map(Integer value) throws Exception { - return "x " + value; - } - }); + DataStream stringMap = src.map(value -> "x " + value); stringMap.connect(src).map(new CoMapFunction() { - private static final long serialVersionUID = 1L; - @Override public String map1(String value) { return value; @@ -94,55 +84,30 @@ public class SelfConnectionITCase extends AbstractTestBase { * (This is not actually self-connect.) */ @Test - public void differentDataStreamDifferentChain() { + public void differentDataStreamDifferentChain() throws Exception { - TestListResultSink resultSink = new TestListResultSink(); + TestListResultSink resultSink = new TestListResultSink<>(); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(3); DataStream src = env.fromElements(1, 3, 5).disableChaining(); - DataStream stringMap = src.flatMap(new FlatMapFunction() { - - private static final long serialVersionUID = 1L; + DataStream stringMap = src + .flatMap(new FlatMapFunction() { @Override public void flatMap(Integer value, Collector out) throws Exception { out.collect("x " + value); } - }).keyBy(new KeySelector() { - - private static final long serialVersionUID = 1L; - - @Override -
[2/2] flink git commit: [FLINK-8737][network] disallow creating a union of UnionInputGate instances
[FLINK-8737][network] disallow creating a union of UnionInputGate instances Recently, pollNextBufferOrEvent() was added but left unimplemented for UnionInputGate; since getNextBufferOrEvent() uses it, any UnionInputGate containing a UnionInputGate would already have failed. There should be no use case for wiring up inputs this way. Therefore, fail early when trying to construct this. (cherry picked from commit e8de538) Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/92656665 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/92656665 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/92656665 Branch: refs/heads/release-1.5 Commit: 9265666517830350a4a7037029e347f33df1bea2 Parents: 18ff2ce Author: Nico Kruber Authored: Mon Feb 26 17:52:37 2018 +0100 Committer: Stefan Richter Committed: Wed Feb 28 17:35:13 2018 +0100 -- .../runtime/io/network/partition/consumer/UnionInputGate.java | 7 ++- 1 file changed, 6 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/92656665/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java index 44cdd52..742592a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java @@ -61,7 +61,7 @@ import static org.apache.flink.util.Preconditions.checkState; * ++ * * - * It is possible to recursively union union input gates. + * It is NOT possible to recursively union union input gates. 
*/ public class UnionInputGate implements InputGate, InputGateListener { @@ -103,6 +103,11 @@ public class UnionInputGate implements InputGate, InputGateListener { int currentNumberOfInputChannels = 0; for (InputGate inputGate : inputGates) { + if (inputGate instanceof UnionInputGate) { + // if we want to add support for this, we need to implement pollNextBufferOrEvent() + throw new UnsupportedOperationException("Cannot union a union of input gates."); + } + // The offset to use for buffer or event instances received from this input gate. inputGateToIndexOffsetMap.put(checkNotNull(inputGate), currentNumberOfInputChannels); inputGatesWithRemainingData.add(inputGate);
[1/2] flink git commit: [hotfix][network] minor improvements in UnionInputGate
Repository: flink Updated Branches: refs/heads/release-1.5 f14532760 -> 926566651 [hotfix][network] minor improvements in UnionInputGate (cherry picked from commit 4203557) Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/18ff2ce1 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/18ff2ce1 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/18ff2ce1 Branch: refs/heads/release-1.5 Commit: 18ff2ce15bdb1e7bd246e438e47527a24559c86d Parents: f145327 Author: Nico Kruber Authored: Mon Feb 26 17:50:10 2018 +0100 Committer: Stefan Richter Committed: Wed Feb 28 17:35:06 2018 +0100 -- .../io/network/partition/consumer/UnionInputGate.java | 10 +- 1 file changed, 5 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/18ff2ce1/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java index 393e087..44cdd52 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java @@ -38,7 +38,7 @@ import static org.apache.flink.util.Preconditions.checkState; /** * Input gate wrapper to union the input from multiple input gates. * - * Each input gate has input channels attached from which it reads data. At each input gate, the + * Each input gate has input channels attached from which it reads data. At each input gate, the * input channels have unique IDs from 0 (inclusive) to the number of input channels (exclusive). 
* * @@ -49,7 +49,7 @@ import static org.apache.flink.util.Preconditions.checkState; * +--+--+ * * - * The union input gate maps these IDs from 0 to the *total* number of input channels across all + * The union input gate maps these IDs from 0 to the *total* number of input channels across all * unioned input gates, e.g. the channels of input gate 0 keep their original indexes and the * channel indexes of input gate 1 are set off by 2 to 2--4. * @@ -183,11 +183,11 @@ public class UnionInputGate implements InputGate, InputGateListener { bufferOrEvent.setChannelIndex(channelIndexOffset + bufferOrEvent.getChannelIndex()); bufferOrEvent.setMoreAvailable(bufferOrEvent.moreAvailable() || inputGateWithData.moreInputGatesAvailable); - return Optional.ofNullable(bufferOrEvent); + return Optional.of(bufferOrEvent); } @Override - public Optional pollNextBufferOrEvent() throws IOException, InterruptedException { + public Optional pollNextBufferOrEvent() throws UnsupportedOperationException { throw new UnsupportedOperationException(); } @@ -217,7 +217,7 @@ public class UnionInputGate implements InputGate, InputGateListener { private final BufferOrEvent bufferOrEvent; private final boolean moreInputGatesAvailable; - public InputGateWithData(InputGate inputGate, BufferOrEvent bufferOrEvent, boolean moreInputGatesAvailable) { + InputGateWithData(InputGate inputGate, BufferOrEvent bufferOrEvent, boolean moreInputGatesAvailable) { this.inputGate = checkNotNull(inputGate); this.bufferOrEvent = checkNotNull(bufferOrEvent); this.moreInputGatesAvailable = moreInputGatesAvailable;
[2/2] flink git commit: [FLINK-8737][network] disallow creating a union of UnionInputGate instances
[FLINK-8737][network] disallow creating a union of UnionInputGate instances Recently, pollNextBufferOrEvent() was added but left unimplemented for UnionInputGate; since getNextBufferOrEvent() uses it, any UnionInputGate containing a UnionInputGate would already have failed. There should be no use case for wiring up inputs this way. Therefore, fail early when trying to construct this. This closes #5583. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e8de5381 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e8de5381 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e8de5381 Branch: refs/heads/master Commit: e8de53817c60917776c7264623cf8adfd7886f93 Parents: 4203557 Author: Nico Kruber Authored: Mon Feb 26 17:52:37 2018 +0100 Committer: Stefan Richter Committed: Wed Feb 28 17:33:48 2018 +0100 -- .../runtime/io/network/partition/consumer/UnionInputGate.java | 7 ++- 1 file changed, 6 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/e8de5381/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java index 44cdd52..742592a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java @@ -61,7 +61,7 @@ import static org.apache.flink.util.Preconditions.checkState; * ++ * * - * It is possible to recursively union union input gates. + * It is NOT possible to recursively union union input gates. 
*/ public class UnionInputGate implements InputGate, InputGateListener { @@ -103,6 +103,11 @@ public class UnionInputGate implements InputGate, InputGateListener { int currentNumberOfInputChannels = 0; for (InputGate inputGate : inputGates) { + if (inputGate instanceof UnionInputGate) { + // if we want to add support for this, we need to implement pollNextBufferOrEvent() + throw new UnsupportedOperationException("Cannot union a union of input gates."); + } + // The offset to use for buffer or event instances received from this input gate. inputGateToIndexOffsetMap.put(checkNotNull(inputGate), currentNumberOfInputChannels); inputGatesWithRemainingData.add(inputGate);
[1/2] flink git commit: [hotfix][network] minor improvements in UnionInputGate
Repository: flink Updated Branches: refs/heads/master 6165b3db5 -> e8de53817 [hotfix][network] minor improvements in UnionInputGate Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/42035571 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/42035571 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/42035571 Branch: refs/heads/master Commit: 420355715e43d5c870bd7f89808430fc959cedc4 Parents: 6165b3d Author: Nico Kruber Authored: Mon Feb 26 17:50:10 2018 +0100 Committer: Stefan Richter Committed: Wed Feb 28 17:33:14 2018 +0100 -- .../io/network/partition/consumer/UnionInputGate.java | 10 +- 1 file changed, 5 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/42035571/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java index 393e087..44cdd52 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java @@ -38,7 +38,7 @@ import static org.apache.flink.util.Preconditions.checkState; /** * Input gate wrapper to union the input from multiple input gates. * - * Each input gate has input channels attached from which it reads data. At each input gate, the + * Each input gate has input channels attached from which it reads data. At each input gate, the * input channels have unique IDs from 0 (inclusive) to the number of input channels (exclusive). 
* * @@ -49,7 +49,7 @@ import static org.apache.flink.util.Preconditions.checkState; * +--+--+ * * - * The union input gate maps these IDs from 0 to the *total* number of input channels across all + * The union input gate maps these IDs from 0 to the *total* number of input channels across all * unioned input gates, e.g. the channels of input gate 0 keep their original indexes and the * channel indexes of input gate 1 are set off by 2 to 2--4. * @@ -183,11 +183,11 @@ public class UnionInputGate implements InputGate, InputGateListener { bufferOrEvent.setChannelIndex(channelIndexOffset + bufferOrEvent.getChannelIndex()); bufferOrEvent.setMoreAvailable(bufferOrEvent.moreAvailable() || inputGateWithData.moreInputGatesAvailable); - return Optional.ofNullable(bufferOrEvent); + return Optional.of(bufferOrEvent); } @Override - public Optional pollNextBufferOrEvent() throws IOException, InterruptedException { + public Optional pollNextBufferOrEvent() throws UnsupportedOperationException { throw new UnsupportedOperationException(); } @@ -217,7 +217,7 @@ public class UnionInputGate implements InputGate, InputGateListener { private final BufferOrEvent bufferOrEvent; private final boolean moreInputGatesAvailable; - public InputGateWithData(InputGate inputGate, BufferOrEvent bufferOrEvent, boolean moreInputGatesAvailable) { + InputGateWithData(InputGate inputGate, BufferOrEvent bufferOrEvent, boolean moreInputGatesAvailable) { this.inputGate = checkNotNull(inputGate); this.bufferOrEvent = checkNotNull(bufferOrEvent); this.moreInputGatesAvailable = moreInputGatesAvailable;
[1/2] flink git commit: [FLINK-8747][bugfix] The tag of waiting for floating buffers in RemoteInputChannel should be updated properly
Repository: flink Updated Branches: refs/heads/release-1.5 d5338c415 -> f14532760 [FLINK-8747][bugfix] The tag of waiting for floating buffers in RemoteInputChannel should be updated properly (cherry picked from commit 6e9e0dd) Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/32384ed9 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/32384ed9 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/32384ed9 Branch: refs/heads/release-1.5 Commit: 32384ed9b00cf0e1961d355dc4080f25a2156e58 Parents: d5338c4 Author: Zhijiang Authored: Thu Feb 22 15:41:38 2018 +0100 Committer: Stefan Richter Committed: Wed Feb 28 17:24:39 2018 +0100 -- .../partition/consumer/RemoteInputChannel.java | 6 +++ .../consumer/RemoteInputChannelTest.java| 42 +++- 2 files changed, 37 insertions(+), 11 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/32384ed9/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java index 8174359..990166f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java @@ -337,6 +337,11 @@ public class RemoteInputChannel extends InputChannel implements BufferRecycler, return numRequiredBuffers - initialCredit; } + @VisibleForTesting + public boolean isWaitingForFloatingBuffers() { + return isWaitingForFloatingBuffers; + } + /** * The Buffer pool notifies this channel of an available floating buffer. If the channel is released or * currently does not need extra buffers, the buffer should be recycled to the buffer pool. 
Otherwise, @@ -362,6 +367,7 @@ public class RemoteInputChannel extends InputChannel implements BufferRecycler, // Important: double check the isReleased state inside synchronized block, so there is no // race condition when notifyBufferAvailable and releaseAllResources running in parallel. if (isReleased.get() || bufferQueue.getAvailableBufferSize() >= numRequiredBuffers) { + isWaitingForFloatingBuffers = false; buffer.recycleBuffer(); return false; } http://git-wip-us.apache.org/repos/asf/flink/blob/32384ed9/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java -- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java index 7c8ed18..e3e6623 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java @@ -357,6 +357,7 @@ public class RemoteInputChannelTest { 16, inputChannel.getNumberOfRequiredBuffers()); assertEquals("There should be 0 buffer available in local pool", 0, bufferPool.getNumberOfAvailableMemorySegments()); + assertTrue(inputChannel.isWaitingForFloatingBuffers()); // Increase the backlog inputChannel.onSenderBacklog(16); @@ -370,11 +371,12 @@ public class RemoteInputChannelTest { 18, inputChannel.getNumberOfRequiredBuffers()); assertEquals("There should be 0 buffer available in local pool", 0, bufferPool.getNumberOfAvailableMemorySegments()); + assertTrue(inputChannel.isWaitingForFloatingBuffers()); - // Recycle one floating buffer - floatingBufferQueue.poll().recycleBuffer(); + // Recycle one exclusive buffer + exclusiveBuffer.recycleBuffer(); - // Assign the floating buffer to the listener and the channel is still waiting for more 
floating buffers +
[2/2] flink git commit: [hotfix] Fix package private and comments
[hotfix] Fix package private and comments (cherry picked from commit 6165b3d) Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f1453276 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f1453276 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f1453276 Branch: refs/heads/release-1.5 Commit: f1453276095c55264f7b4097d16e2987a44b3f33 Parents: 32384ed Author: Zhijiang Authored: Fri Feb 23 02:55:57 2018 +0100 Committer: Stefan Richter Committed: Wed Feb 28 17:24:43 2018 +0100 -- .../partition/consumer/RemoteInputChannel.java| 2 +- .../consumer/RemoteInputChannelTest.java | 18 +- 2 files changed, 10 insertions(+), 10 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/f1453276/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java index 990166f..0f70d44 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java @@ -338,7 +338,7 @@ public class RemoteInputChannel extends InputChannel implements BufferRecycler, } @VisibleForTesting - public boolean isWaitingForFloatingBuffers() { + boolean isWaitingForFloatingBuffers() { return isWaitingForFloatingBuffers; } http://git-wip-us.apache.org/repos/asf/flink/blob/f1453276/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java -- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java index e3e6623..97a5688 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java @@ -355,7 +355,7 @@ public class RemoteInputChannelTest { 13, inputChannel.getNumberOfAvailableBuffers()); assertEquals("There should be 16 buffers required in the channel", 16, inputChannel.getNumberOfRequiredBuffers()); - assertEquals("There should be 0 buffer available in local pool", + assertEquals("There should be 0 buffers available in local pool", 0, bufferPool.getNumberOfAvailableMemorySegments()); assertTrue(inputChannel.isWaitingForFloatingBuffers()); @@ -369,7 +369,7 @@ public class RemoteInputChannelTest { 13, inputChannel.getNumberOfAvailableBuffers()); assertEquals("There should be 18 buffers required in the channel", 18, inputChannel.getNumberOfRequiredBuffers()); - assertEquals("There should be 0 buffer available in local pool", + assertEquals("There should be 0 buffers available in local pool", 0, bufferPool.getNumberOfAvailableMemorySegments()); assertTrue(inputChannel.isWaitingForFloatingBuffers()); @@ -383,7 +383,7 @@ public class RemoteInputChannelTest { 14, inputChannel.getNumberOfAvailableBuffers()); assertEquals("There should be 18 buffers required in the channel", 18, inputChannel.getNumberOfRequiredBuffers()); - assertEquals("There should be 0 buffer available in local pool", + assertEquals("There should be 0 buffers available in local pool", 0, bufferPool.getNumberOfAvailableMemorySegments()); assertTrue(inputChannel.isWaitingForFloatingBuffers()); @@ -397,7 +397,7 @@ public class RemoteInputChannelTest { 15, inputChannel.getNumberOfAvailableBuffers()); assertEquals("There should be 18 buffers required in the channel", 18, inputChannel.getNumberOfRequiredBuffers()); - 
assert
[2/2] flink git commit: [hotfix] Fix package private and comments
[hotfix] Fix package private and comments Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6165b3db Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6165b3db Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6165b3db Branch: refs/heads/master Commit: 6165b3db5587170bed1a40bb1e5f2f3613f24e3f Parents: 6e9e0dd Author: Zhijiang Authored: Fri Feb 23 09:55:57 2018 +0800 Committer: Stefan Richter Committed: Wed Feb 28 17:23:06 2018 +0100 -- .../partition/consumer/RemoteInputChannel.java| 2 +- .../consumer/RemoteInputChannelTest.java | 18 +- 2 files changed, 10 insertions(+), 10 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/6165b3db/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java index 990166f..0f70d44 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java @@ -338,7 +338,7 @@ public class RemoteInputChannel extends InputChannel implements BufferRecycler, } @VisibleForTesting - public boolean isWaitingForFloatingBuffers() { + boolean isWaitingForFloatingBuffers() { return isWaitingForFloatingBuffers; } http://git-wip-us.apache.org/repos/asf/flink/blob/6165b3db/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java -- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java index 
e3e6623..97a5688 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java @@ -355,7 +355,7 @@ public class RemoteInputChannelTest { 13, inputChannel.getNumberOfAvailableBuffers()); assertEquals("There should be 16 buffers required in the channel", 16, inputChannel.getNumberOfRequiredBuffers()); - assertEquals("There should be 0 buffer available in local pool", + assertEquals("There should be 0 buffers available in local pool", 0, bufferPool.getNumberOfAvailableMemorySegments()); assertTrue(inputChannel.isWaitingForFloatingBuffers()); @@ -369,7 +369,7 @@ public class RemoteInputChannelTest { 13, inputChannel.getNumberOfAvailableBuffers()); assertEquals("There should be 18 buffers required in the channel", 18, inputChannel.getNumberOfRequiredBuffers()); - assertEquals("There should be 0 buffer available in local pool", + assertEquals("There should be 0 buffers available in local pool", 0, bufferPool.getNumberOfAvailableMemorySegments()); assertTrue(inputChannel.isWaitingForFloatingBuffers()); @@ -383,7 +383,7 @@ public class RemoteInputChannelTest { 14, inputChannel.getNumberOfAvailableBuffers()); assertEquals("There should be 18 buffers required in the channel", 18, inputChannel.getNumberOfRequiredBuffers()); - assertEquals("There should be 0 buffer available in local pool", + assertEquals("There should be 0 buffers available in local pool", 0, bufferPool.getNumberOfAvailableMemorySegments()); assertTrue(inputChannel.isWaitingForFloatingBuffers()); @@ -397,7 +397,7 @@ public class RemoteInputChannelTest { 15, inputChannel.getNumberOfAvailableBuffers()); assertEquals("There should be 18 buffers required in the channel", 18, inputChannel.getNumberOfRequiredBuffers()); - assertEquals("There should be 0 buffer available
[1/2] flink git commit: [FLINK-8747][bugfix] The tag of waiting for floating buffers in RemoteInputChannel should be updated properly
Repository: flink Updated Branches: refs/heads/master b9b7416f4 -> 6165b3db5 [FLINK-8747][bugfix] The tag of waiting for floating buffers in RemoteInputChannel should be updated properly This closes #5558. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6e9e0dd6 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6e9e0dd6 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6e9e0dd6 Branch: refs/heads/master Commit: 6e9e0dd6eb3c4fdb1168cc4e294d9fa52641ddb0 Parents: b9b7416 Author: Zhijiang Authored: Thu Feb 22 22:41:38 2018 +0800 Committer: Stefan Richter Committed: Wed Feb 28 17:22:47 2018 +0100 -- .../partition/consumer/RemoteInputChannel.java | 6 +++ .../consumer/RemoteInputChannelTest.java| 42 +++- 2 files changed, 37 insertions(+), 11 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/6e9e0dd6/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java index 8174359..990166f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java @@ -337,6 +337,11 @@ public class RemoteInputChannel extends InputChannel implements BufferRecycler, return numRequiredBuffers - initialCredit; } + @VisibleForTesting + public boolean isWaitingForFloatingBuffers() { + return isWaitingForFloatingBuffers; + } + /** * The Buffer pool notifies this channel of an available floating buffer. If the channel is released or * currently does not need extra buffers, the buffer should be recycled to the buffer pool. 
Otherwise, @@ -362,6 +367,7 @@ public class RemoteInputChannel extends InputChannel implements BufferRecycler, // Important: double check the isReleased state inside synchronized block, so there is no // race condition when notifyBufferAvailable and releaseAllResources running in parallel. if (isReleased.get() || bufferQueue.getAvailableBufferSize() >= numRequiredBuffers) { + isWaitingForFloatingBuffers = false; buffer.recycleBuffer(); return false; } http://git-wip-us.apache.org/repos/asf/flink/blob/6e9e0dd6/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java -- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java index 7c8ed18..e3e6623 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java @@ -357,6 +357,7 @@ public class RemoteInputChannelTest { 16, inputChannel.getNumberOfRequiredBuffers()); assertEquals("There should be 0 buffer available in local pool", 0, bufferPool.getNumberOfAvailableMemorySegments()); + assertTrue(inputChannel.isWaitingForFloatingBuffers()); // Increase the backlog inputChannel.onSenderBacklog(16); @@ -370,11 +371,12 @@ public class RemoteInputChannelTest { 18, inputChannel.getNumberOfRequiredBuffers()); assertEquals("There should be 0 buffer available in local pool", 0, bufferPool.getNumberOfAvailableMemorySegments()); + assertTrue(inputChannel.isWaitingForFloatingBuffers()); - // Recycle one floating buffer - floatingBufferQueue.poll().recycleBuffer(); + // Recycle one exclusive buffer + exclusiveBuffer.recycleBuffer(); - // Assign the floating buffer to the listener and the channel is still waiting for more 
floating buffers + // The exclusi
[5/7] flink git commit: [FLINK-8694][runtime] Fix notifyDataAvailable race condition
[FLINK-8694][runtime] Fix notifyDataAvailable race condition Previously there was a race condition that might have resulted in ignoring some notifyDataAvailable calls. This fixes the problem by moving buffersAvailable handling to Subpartitions and adds a stress test for flushAlways (without this fix the test deadlocks). (cherry picked from commit ebd39f3) Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8eb6a307 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8eb6a307 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8eb6a307 Branch: refs/heads/release-1.5 Commit: 8eb6a30798c09d171e3eb8019b53e677252bd5ba Parents: 8e62f90 Author: Piotr Nowojski Authored: Fri Feb 23 11:28:20 2018 +0100 Committer: Stefan Richter Committed: Wed Feb 28 17:17:02 2018 +0100 -- .../CreditBasedSequenceNumberingViewReader.java | 10 +--- .../netty/SequenceNumberingViewReader.java | 7 +-- .../partition/PipelinedSubpartition.java| 37 -- .../partition/PipelinedSubpartitionView.java| 5 ++ .../partition/ResultSubpartitionView.java | 2 + .../partition/SpillableSubpartition.java| 1 - .../partition/SpillableSubpartitionView.java| 28 --- .../partition/SpilledSubpartitionView.java | 8 +++ .../network/buffer/BufferBuilderTestUtils.java | 4 ++ .../netty/CancelPartitionRequestTest.java | 5 ++ .../netty/PartitionRequestQueueTest.java| 26 -- .../partition/PipelinedSubpartitionTest.java| 53 .../partition/SpillableSubpartitionTest.java| 9 ++-- .../network/partition/SubpartitionTestBase.java | 5 ++ .../StreamNetworkThroughputBenchmarkTests.java | 8 +++ 15 files changed, 173 insertions(+), 35 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/8eb6a307/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java index d02b2bf..9acbbac 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java @@ -30,7 +30,6 @@ import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID; import org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel; import java.io.IOException; -import java.util.concurrent.atomic.AtomicBoolean; /** * Simple wrapper for the subpartition view used in the new network credit-based mode. @@ -44,8 +43,6 @@ class CreditBasedSequenceNumberingViewReader implements BufferAvailabilityListen private final InputChannelID receiverId; - private final AtomicBoolean buffersAvailable = new AtomicBoolean(); - private final PartitionRequestQueue requestQueue; private volatile ResultSubpartitionView subpartitionView; @@ -118,7 +115,7 @@ class CreditBasedSequenceNumberingViewReader implements BufferAvailabilityListen @Override public boolean isAvailable() { // BEWARE: this must be in sync with #isAvailable()! 
- return buffersAvailable.get() && + return hasBuffersAvailable() && (numCreditsAvailable > 0 || subpartitionView.nextBufferIsEvent()); } @@ -154,14 +151,13 @@ class CreditBasedSequenceNumberingViewReader implements BufferAvailabilityListen @VisibleForTesting boolean hasBuffersAvailable() { - return buffersAvailable.get(); + return subpartitionView.isAvailable(); } @Override public BufferAndAvailability getNextBuffer() throws IOException, InterruptedException { BufferAndBacklog next = subpartitionView.getNextBuffer(); if (next != null) { - buffersAvailable.set(next.isMoreAvailable()); sequenceNumber++; if (next.buffer().isBuffer() && --numCreditsAvailable < 0) { @@ -197,7 +193,6 @@ class CreditBasedSequenceNumberingViewReader implements BufferAvailabilityListen @Override public void notifyDataAvailable() { - buffersAvailable.set(true); requestQueue.notifyReaderNonEmpty(this); } @@ -206,7 +201,6 @@ class CreditBasedSequenceNumberingViewReader implements BufferAvailabilityListen return "CreditBasedSequenceN
[7/7] flink git commit: [FLINK-8750][runtime] Improve detection of no remaining data after EndOfPartitionEvent
[FLINK-8750][runtime] Improve detection of no remaining data after EndOfPartitionEvent Because of a race condition between: 1. releasing inputChannelsWithData lock in this method and reaching this place 2. empty data notification that re-enqueues a channel we can end up with moreAvailable flag set to true, while we expect no more data. This commit detects such a situation, makes a correct assertion and turns off the moreAvailable flag. (cherry picked from commit b9b7416) Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d5338c41 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d5338c41 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d5338c41 Branch: refs/heads/release-1.5 Commit: d5338c4154e5de029b3b30e3ef0a0732bf7f68e7 Parents: 61a34a6 Author: Piotr Nowojski Authored: Tue Feb 27 10:39:00 2018 +0100 Committer: Stefan Richter Committed: Wed Feb 28 17:17:16 2018 +0100 -- .../runtime/io/network/partition/consumer/SingleInputGate.java | 6 ++ 1 file changed, 6 insertions(+) -- http://git-wip-us.apache.org/repos/asf/flink/blob/d5338c41/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java index be4035c..b9091b2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java @@ -554,6 +554,12 @@ public class SingleInputGate implements InputGate { channelsWithEndOfPartitionEvents.set(currentChannel.getChannelIndex()); if (channelsWithEndOfPartitionEvents.cardinality() == numberOfInputChannels) { + // Because of race condition between: + // 1. 
releasing inputChannelsWithData lock in this method and reaching this place + // 2. empty data notification that re-enqueues a channel + // we can end up with moreAvailable flag set to true, while we expect no more data. + checkState(!moreAvailable || !pollNextBufferOrEvent().isPresent()); + moreAvailable = false; hasReceivedAllEndOfPartitionEvents = true; }
[1/7] flink git commit: [hotfix][tests] Deduplicate code in SingleInputGateTest
Repository: flink Updated Branches: refs/heads/release-1.5 623e94459 -> d5338c415 [hotfix][tests] Deduplicate code in SingleInputGateTest (cherry picked from commit 67a547a) Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bb459cc6 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bb459cc6 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bb459cc6 Branch: refs/heads/release-1.5 Commit: bb459cc68f8dc4bd042b61e365e583d4e96b3e0e Parents: 623e944 Author: Piotr Nowojski Authored: Fri Feb 23 11:37:37 2018 +0100 Committer: Stefan Richter Committed: Wed Feb 28 17:16:44 2018 +0100 -- .../partition/consumer/SingleInputGateTest.java | 66 1 file changed, 25 insertions(+), 41 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/bb459cc6/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java -- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java index 0dd0875..e94411d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java @@ -77,14 +77,7 @@ public class SingleInputGateTest { @Test(timeout = 120 * 1000) public void testBasicGetNextLogic() throws Exception { // Setup - final SingleInputGate inputGate = new SingleInputGate( - "Test Task Name", new JobID(), - new IntermediateDataSetID(), ResultPartitionType.PIPELINED, - 0, 2, - mock(TaskActions.class), - UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup()); - - assertEquals(ResultPartitionType.PIPELINED, inputGate.getConsumedPartitionType()); + final SingleInputGate inputGate = createInputGate(); final 
TestInputChannel[] inputChannels = new TestInputChannel[]{ new TestInputChannel(inputGate, 0), @@ -135,14 +128,8 @@ public class SingleInputGateTest { any(BufferAvailabilityListener.class))).thenReturn(iterator); // Setup reader with one local and one unknown input channel - final IntermediateDataSetID resultId = new IntermediateDataSetID(); - final SingleInputGate inputGate = new SingleInputGate( - "Test Task Name", new JobID(), - resultId, ResultPartitionType.PIPELINED, - 0, 2, - mock(TaskActions.class), - UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup()); + final SingleInputGate inputGate = createInputGate(); final BufferPool bufferPool = mock(BufferPool.class); when(bufferPool.getNumberOfRequiredMemorySegments()).thenReturn(2); @@ -190,14 +177,7 @@ public class SingleInputGateTest { */ @Test public void testUpdateChannelBeforeRequest() throws Exception { - SingleInputGate inputGate = new SingleInputGate( - "t1", - new JobID(), - new IntermediateDataSetID(), - ResultPartitionType.PIPELINED, - 0, - 1, - mock(TaskActions.class), UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup()); + SingleInputGate inputGate = createInputGate(1); ResultPartitionManager partitionManager = mock(ResultPartitionManager.class); @@ -230,15 +210,7 @@ public class SingleInputGateTest { final AtomicReference asyncException = new AtomicReference<>(); // Setup the input gate with a single channel that does nothing - final SingleInputGate inputGate = new SingleInputGate( - "InputGate", - new JobID(), - new IntermediateDataSetID(), - ResultPartitionType.PIPELINED, - 0, - 1, - mock(TaskActions.class), - UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup()); + final SingleInputGate inputGate = createInp
[3/7] flink git commit: [FLINK-8760][runtime] Correctly propagate moreAvailable flag through SingleInputGate
[FLINK-8760][runtime] Correctly propagate moreAvailable flag through SingleInputGate Previously, if SingleInputGate was re-enqueuing an input channel, isMoreAvailable might incorrectly return false. This might have caused some deadlocks. (cherry picked from commit 6c9e267) Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/651462e6 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/651462e6 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/651462e6 Branch: refs/heads/release-1.5 Commit: 651462e6b22c51ce14bd9ea6db389ef6a1f38e55 Parents: 6b7a448 Author: Piotr Nowojski Authored: Fri Feb 23 11:20:21 2018 +0100 Committer: Stefan Richter Committed: Wed Feb 28 17:16:55 2018 +0100 -- .../partition/consumer/BufferOrEvent.java | 6 +- .../partition/consumer/SingleInputGate.java | 1 + .../partition/consumer/UnionInputGate.java | 9 ++- .../partition/consumer/SingleInputGateTest.java | 62 .../partition/consumer/TestInputChannel.java| 14 - .../partition/consumer/UnionInputGateTest.java | 33 ++- 6 files changed, 92 insertions(+), 33 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/651462e6/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferOrEvent.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferOrEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferOrEvent.java index 3e93ae6..d1da438 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferOrEvent.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferOrEvent.java @@ -39,7 +39,7 @@ public class BufferOrEvent { * This is not needed outside of the input gate unioning logic and cannot * be set outside of the consumer package. 
*/ - private final boolean moreAvailable; + private boolean moreAvailable; private int channelIndex; @@ -99,4 +99,8 @@ public class BufferOrEvent { return String.format("BufferOrEvent [%s, channelIndex = %d]", isBuffer() ? buffer : event, channelIndex); } + + public void setMoreAvailable(boolean moreAvailable) { + this.moreAvailable = moreAvailable; + } } http://git-wip-us.apache.org/repos/asf/flink/blob/651462e6/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java index a1f3cdc..be4035c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java @@ -540,6 +540,7 @@ public class SingleInputGate implements InputGate { // will come for that channel if (result.get().moreAvailable()) { queueChannel(currentChannel); + moreAvailable = true; } final Buffer buffer = result.get().buffer(); http://git-wip-us.apache.org/repos/asf/flink/blob/651462e6/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java index 481599c..393e087 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java @@ -181,6 +181,7 @@ public class UnionInputGate implements InputGate, InputGateListener { final int channelIndexOffset = 
inputGateToIndexOffsetMap.get(inputGate); bufferOrEvent.setChannelIndex(channelIndexOffset + bufferOrEvent.getChannelIndex()); + bufferOrEvent.setMoreAvailable(bufferOrEvent.moreAvailable() || inputGateWithData.moreInputGatesAvailable); return Optional.ofNullable(bufferOrEvent); } @@ -193,18 +194,20 @@ public class UnionInputGate i
[6/7] flink git commit: [FLINK-8805][runtime] Optimize EventSerializer.isEvent method
[FLINK-8805][runtime] Optimize EventSerializer.isEvent method For example, previously if the method was used to check for EndOfPartitionEvent and the Buffer contained a huge custom event, the event had to be deserialized before performing the actual check. Now we are quickly entering the correct if/else branch and doing full costly deserialization only if we have to. Other calls to isEvent() than checking against EndOfPartitionEvent were not used. (cherry picked from commit 767027f) Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/61a34a69 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/61a34a69 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/61a34a69 Branch: refs/heads/release-1.5 Commit: 61a34a691e7d5233f18ac72a1ab8fb09b53c4753 Parents: 8eb6a30 Author: Piotr Nowojski Authored: Mon Feb 26 16:13:06 2018 +0100 Committer: Stefan Richter Committed: Wed Feb 28 17:17:04 2018 +0100 -- .../api/serialization/EventSerializer.java | 57 ++-- .../io/network/netty/PartitionRequestQueue.java | 3 +- .../api/serialization/EventSerializerTest.java | 39 +- 3 files changed, 42 insertions(+), 57 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/61a34a69/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java index d7fb7e8..8d76bb2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java @@ -101,16 +101,15 @@ public class EventSerializer { } /** -* Identifies whether the given buffer encodes the given event. 
+* Identifies whether the given buffer encodes the given event. Custom events are not supported. * * Pre-condition: This buffer must encode some event! * * @param buffer the buffer to peak into * @param eventClass the expected class of the event type -* @param classLoader the class loader to use for custom event classes * @return whether the event class of the buffer matches the given eventClass */ - private static boolean isEvent(ByteBuffer buffer, Class eventClass, ClassLoader classLoader) throws IOException { + private static boolean isEvent(ByteBuffer buffer, Class eventClass) throws IOException { if (buffer.remaining() < 4) { throw new IOException("Incomplete event"); } @@ -122,38 +121,16 @@ public class EventSerializer { try { int type = buffer.getInt(); - switch (type) { - case END_OF_PARTITION_EVENT: - return eventClass.equals(EndOfPartitionEvent.class); - case CHECKPOINT_BARRIER_EVENT: - return eventClass.equals(CheckpointBarrier.class); - case END_OF_SUPERSTEP_EVENT: - return eventClass.equals(EndOfSuperstepEvent.class); - case CANCEL_CHECKPOINT_MARKER_EVENT: - return eventClass.equals(CancelCheckpointMarker.class); - case OTHER_EVENT: - try { - final DataInputDeserializer deserializer = new DataInputDeserializer(buffer); - final String className = deserializer.readUTF(); - - final Class clazz; - try { - clazz = classLoader.loadClass(className).asSubclass(AbstractEvent.class); - } - catch (ClassNotFoundException e) { - throw new IOException("Could not load event class '" + className + "'.", e); - } - catch (ClassCastException e) { - throw new IOException("The class '" + className + "' is not a valid subclass of '" -
[4/7] flink git commit: [hotfix][tests] Do not hide original exception in SuccessAfterNetworkBuffersFailureITCase
[hotfix][tests] Do not hide original exception in SuccessAfterNetworkBuffersFailureITCase (cherry picked from commit 2c2e189) Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8e62f907 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8e62f907 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8e62f907 Branch: refs/heads/release-1.5 Commit: 8e62f90739e2319491df983917dc7ab484de2550 Parents: 651462e Author: Piotr Nowojski Authored: Fri Feb 23 11:27:54 2018 +0100 Committer: Stefan Richter Committed: Wed Feb 28 17:16:58 2018 +0100 -- ...SuccessAfterNetworkBuffersFailureITCase.java | 40 ++-- 1 file changed, 11 insertions(+), 29 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/8e62f907/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java -- diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java index dc19ad1..dbd0f79 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java @@ -65,38 +65,20 @@ public class SuccessAfterNetworkBuffersFailureITCase extends TestLogger { } @Test - public void testSuccessfulProgramAfterFailure() { + public void testSuccessfulProgramAfterFailure() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + runConnectedComponents(env); + try { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - try { - runConnectedComponents(env); - } - catch (Exception e) { - e.printStackTrace(); - fail("Program Execution should have succeeded."); - } - - try { - runKMeans(env); - fail("This program execution should have failed."); - } - catch 
(JobExecutionException e) { - assertTrue(e.getCause().getMessage().contains("Insufficient number of network buffers")); - } - - try { - runConnectedComponents(env); - } - catch (Exception e) { - e.printStackTrace(); - fail("Program Execution should have succeeded."); - } + runKMeans(env); + fail("This program execution should have failed."); } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); + catch (JobExecutionException e) { + assertTrue(e.getCause().getMessage().contains("Insufficient number of network buffers")); } + + runConnectedComponents(env); } private static void runConnectedComponents(ExecutionEnvironment env) throws Exception {
[2/7] flink git commit: [hotfix][runtime] Remove duplicated check
[hotfix][runtime] Remove duplicated check (cherry picked from commit 42f71f6) Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6b7a4480 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6b7a4480 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6b7a4480 Branch: refs/heads/release-1.5 Commit: 6b7a4480ef8610df3ff21eb2811b9a0a3c58c912 Parents: bb459cc Author: Piotr Nowojski Authored: Fri Feb 23 12:11:14 2018 +0100 Committer: Stefan Richter Committed: Wed Feb 28 17:16:51 2018 +0100 -- .../runtime/io/network/partition/consumer/UnionInputGate.java | 7 --- 1 file changed, 7 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/6b7a4480/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java index 5a547ea..481599c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java @@ -177,13 +177,6 @@ public class UnionInputGate implements InputGate, InputGateListener { } } - if (bufferOrEvent.moreAvailable()) { - // this buffer or event was now removed from the non-empty gates queue - // we re-add it in case it has more data, because in that case no "non-empty" notification - // will come for that gate - queueInputGate(inputGate); - } - // Set the channel index to identify the input channel (across all unioned input gates) final int channelIndexOffset = inputGateToIndexOffsetMap.get(inputGate);
[7/7] flink git commit: [FLINK-8750][runtime] Improve detection of no remaining data after EndOfPartitionEvent
[FLINK-8750][runtime] Improve detection of no remaining data after EndOfPartitionEvent Because of a race condition between: 1. releasing inputChannelsWithData lock in this method and reaching this place 2. empty data notification that re-enqueues a channel we can end up with moreAvailable flag set to true, while we expect no more data. This commit detects such a situation, makes a correct assertion and turns off the moreAvailable flag. This closes #5588. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b9b7416f Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b9b7416f Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b9b7416f Branch: refs/heads/master Commit: b9b7416f4d6a708d22029a5e971af5b1f67e3296 Parents: 767027f Author: Piotr Nowojski Authored: Tue Feb 27 10:39:00 2018 +0100 Committer: Stefan Richter Committed: Wed Feb 28 17:13:24 2018 +0100 -- .../runtime/io/network/partition/consumer/SingleInputGate.java | 6 ++ 1 file changed, 6 insertions(+) -- http://git-wip-us.apache.org/repos/asf/flink/blob/b9b7416f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java index be4035c..b9091b2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java @@ -554,6 +554,12 @@ public class SingleInputGate implements InputGate { channelsWithEndOfPartitionEvents.set(currentChannel.getChannelIndex()); if (channelsWithEndOfPartitionEvents.cardinality() == numberOfInputChannels) { + // Because of race condition between: + // 1. 
releasing inputChannelsWithData lock in this method and reaching this place + // 2. empty data notification that re-enqueues a channel + // we can end up with moreAvailable flag set to true, while we expect no more data. + checkState(!moreAvailable || !pollNextBufferOrEvent().isPresent()); + moreAvailable = false; hasReceivedAllEndOfPartitionEvents = true; }
[3/7] flink git commit: [hotfix][runtime] Remove duplicated check
[hotfix][runtime] Remove duplicated check Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/42f71f61 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/42f71f61 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/42f71f61 Branch: refs/heads/master Commit: 42f71f61c1ae683cd39887cb3d921c0d0cb67619 Parents: 67a547a Author: Piotr Nowojski Authored: Fri Feb 23 12:11:14 2018 +0100 Committer: Stefan Richter Committed: Wed Feb 28 17:12:52 2018 +0100 -- .../runtime/io/network/partition/consumer/UnionInputGate.java | 7 --- 1 file changed, 7 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/42f71f61/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java index 5a547ea..481599c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java @@ -177,13 +177,6 @@ public class UnionInputGate implements InputGate, InputGateListener { } } - if (bufferOrEvent.moreAvailable()) { - // this buffer or event was now removed from the non-empty gates queue - // we re-add it in case it has more data, because in that case no "non-empty" notification - // will come for that gate - queueInputGate(inputGate); - } - // Set the channel index to identify the input channel (across all unioned input gates) final int channelIndexOffset = inputGateToIndexOffsetMap.get(inputGate);
[5/7] flink git commit: [FLINK-8760][runtime] Correctly propagate moreAvailable flag through SingleInputGate
[FLINK-8760][runtime] Correctly propagate moreAvailable flag through SingleInputGate Previously, if SingleInputGate was re-enqueuing an input channel, isMoreAvailable might incorrectly return false. This might have caused some deadlocks. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6c9e2671 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6c9e2671 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6c9e2671 Branch: refs/heads/master Commit: 6c9e26713502a5256520ca3c1619e1da952666d9 Parents: 42f71f6 Author: Piotr Nowojski Authored: Fri Feb 23 11:20:21 2018 +0100 Committer: Stefan Richter Committed: Wed Feb 28 17:12:52 2018 +0100 -- .../partition/consumer/BufferOrEvent.java | 6 +- .../partition/consumer/SingleInputGate.java | 1 + .../partition/consumer/UnionInputGate.java | 9 ++- .../partition/consumer/SingleInputGateTest.java | 62 .../partition/consumer/TestInputChannel.java| 14 - .../partition/consumer/UnionInputGateTest.java | 33 ++- 6 files changed, 92 insertions(+), 33 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/6c9e2671/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferOrEvent.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferOrEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferOrEvent.java index 3e93ae6..d1da438 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferOrEvent.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferOrEvent.java @@ -39,7 +39,7 @@ public class BufferOrEvent { * This is not needed outside of the input gate unioning logic and cannot * be set outside of the consumer package. 
*/ - private final boolean moreAvailable; + private boolean moreAvailable; private int channelIndex; @@ -99,4 +99,8 @@ public class BufferOrEvent { return String.format("BufferOrEvent [%s, channelIndex = %d]", isBuffer() ? buffer : event, channelIndex); } + + public void setMoreAvailable(boolean moreAvailable) { + this.moreAvailable = moreAvailable; + } } http://git-wip-us.apache.org/repos/asf/flink/blob/6c9e2671/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java index a1f3cdc..be4035c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java @@ -540,6 +540,7 @@ public class SingleInputGate implements InputGate { // will come for that channel if (result.get().moreAvailable()) { queueChannel(currentChannel); + moreAvailable = true; } final Buffer buffer = result.get().buffer(); http://git-wip-us.apache.org/repos/asf/flink/blob/6c9e2671/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java index 481599c..393e087 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java @@ -181,6 +181,7 @@ public class UnionInputGate implements InputGate, InputGateListener { final int channelIndexOffset = 
inputGateToIndexOffsetMap.get(inputGate); bufferOrEvent.setChannelIndex(channelIndexOffset + bufferOrEvent.getChannelIndex()); + bufferOrEvent.setMoreAvailable(bufferOrEvent.moreAvailable() || inputGateWithData.moreInputGatesAvailable); return Optional.ofNullable(bufferOrEvent); } @@ -193,18 +194,20 @@ public class UnionInputGate implements InputGate, InputGateListener {
[4/7] flink git commit: [hotfix][tests] Deduplicate code in SingleInputGateTest
[hotfix][tests] Deduplicate code in SingleInputGateTest Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/67a547ad Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/67a547ad Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/67a547ad Branch: refs/heads/master Commit: 67a547ad438c33d3fbcbe23cd03f009fdc8dd021 Parents: af8efe9 Author: Piotr Nowojski Authored: Fri Feb 23 11:37:37 2018 +0100 Committer: Stefan Richter Committed: Wed Feb 28 17:12:52 2018 +0100 -- .../partition/consumer/SingleInputGateTest.java | 66 1 file changed, 25 insertions(+), 41 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/67a547ad/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java -- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java index 0dd0875..e94411d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java @@ -77,14 +77,7 @@ public class SingleInputGateTest { @Test(timeout = 120 * 1000) public void testBasicGetNextLogic() throws Exception { // Setup - final SingleInputGate inputGate = new SingleInputGate( - "Test Task Name", new JobID(), - new IntermediateDataSetID(), ResultPartitionType.PIPELINED, - 0, 2, - mock(TaskActions.class), - UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup()); - - assertEquals(ResultPartitionType.PIPELINED, inputGate.getConsumedPartitionType()); + final SingleInputGate inputGate = createInputGate(); final TestInputChannel[] inputChannels = new TestInputChannel[]{ new TestInputChannel(inputGate, 0), @@ -135,14 +128,8 @@ public class 
SingleInputGateTest { any(BufferAvailabilityListener.class))).thenReturn(iterator); // Setup reader with one local and one unknown input channel - final IntermediateDataSetID resultId = new IntermediateDataSetID(); - final SingleInputGate inputGate = new SingleInputGate( - "Test Task Name", new JobID(), - resultId, ResultPartitionType.PIPELINED, - 0, 2, - mock(TaskActions.class), - UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup()); + final SingleInputGate inputGate = createInputGate(); final BufferPool bufferPool = mock(BufferPool.class); when(bufferPool.getNumberOfRequiredMemorySegments()).thenReturn(2); @@ -190,14 +177,7 @@ public class SingleInputGateTest { */ @Test public void testUpdateChannelBeforeRequest() throws Exception { - SingleInputGate inputGate = new SingleInputGate( - "t1", - new JobID(), - new IntermediateDataSetID(), - ResultPartitionType.PIPELINED, - 0, - 1, - mock(TaskActions.class), UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup()); + SingleInputGate inputGate = createInputGate(1); ResultPartitionManager partitionManager = mock(ResultPartitionManager.class); @@ -230,15 +210,7 @@ public class SingleInputGateTest { final AtomicReference asyncException = new AtomicReference<>(); // Setup the input gate with a single channel that does nothing - final SingleInputGate inputGate = new SingleInputGate( - "InputGate", - new JobID(), - new IntermediateDataSetID(), - ResultPartitionType.PIPELINED, - 0, - 1, - mock(TaskActions.class), - UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup()); + final SingleInputGate inputGate = createInputGate(1); InputChannel unknown = new UnknownInputChannel( inputGate, @@ -410,15 +382,
[2/7] flink git commit: [FLINK-8694][runtime] Fix notifyDataAvailable race condition
[FLINK-8694][runtime] Fix notifyDataAvailable race condition Previously, there was a race condition that might have resulted in ignoring some notifyDataAvailable calls. This fixes the problem by moving buffersAvailable handling to Subpartitions and adds a stress test for flushAlways (without this fix, this test deadlocks). Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ebd39f32 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ebd39f32 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ebd39f32 Branch: refs/heads/master Commit: ebd39f3244a7c56f0d9a3cc4dba4d3f50efb36ad Parents: 2c2e189 Author: Piotr Nowojski Authored: Fri Feb 23 11:28:20 2018 +0100 Committer: Stefan Richter Committed: Wed Feb 28 17:12:52 2018 +0100 -- .../CreditBasedSequenceNumberingViewReader.java | 10 +--- .../netty/SequenceNumberingViewReader.java | 7 +-- .../partition/PipelinedSubpartition.java| 37 -- .../partition/PipelinedSubpartitionView.java| 5 ++ .../partition/ResultSubpartitionView.java | 2 + .../partition/SpillableSubpartition.java| 1 - .../partition/SpillableSubpartitionView.java| 28 --- .../partition/SpilledSubpartitionView.java | 8 +++ .../network/buffer/BufferBuilderTestUtils.java | 4 ++ .../netty/CancelPartitionRequestTest.java | 5 ++ .../netty/PartitionRequestQueueTest.java| 26 -- .../partition/PipelinedSubpartitionTest.java| 53 .../partition/SpillableSubpartitionTest.java| 9 ++-- .../network/partition/SubpartitionTestBase.java | 5 ++ .../StreamNetworkThroughputBenchmarkTests.java | 8 +++ 15 files changed, 173 insertions(+), 35 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/ebd39f32/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java index d02b2bf..9acbbac 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java @@ -30,7 +30,6 @@ import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID; import org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel; import java.io.IOException; -import java.util.concurrent.atomic.AtomicBoolean; /** * Simple wrapper for the subpartition view used in the new network credit-based mode. @@ -44,8 +43,6 @@ class CreditBasedSequenceNumberingViewReader implements BufferAvailabilityListen private final InputChannelID receiverId; - private final AtomicBoolean buffersAvailable = new AtomicBoolean(); - private final PartitionRequestQueue requestQueue; private volatile ResultSubpartitionView subpartitionView; @@ -118,7 +115,7 @@ class CreditBasedSequenceNumberingViewReader implements BufferAvailabilityListen @Override public boolean isAvailable() { // BEWARE: this must be in sync with #isAvailable()! 
- return buffersAvailable.get() && + return hasBuffersAvailable() && (numCreditsAvailable > 0 || subpartitionView.nextBufferIsEvent()); } @@ -154,14 +151,13 @@ class CreditBasedSequenceNumberingViewReader implements BufferAvailabilityListen @VisibleForTesting boolean hasBuffersAvailable() { - return buffersAvailable.get(); + return subpartitionView.isAvailable(); } @Override public BufferAndAvailability getNextBuffer() throws IOException, InterruptedException { BufferAndBacklog next = subpartitionView.getNextBuffer(); if (next != null) { - buffersAvailable.set(next.isMoreAvailable()); sequenceNumber++; if (next.buffer().isBuffer() && --numCreditsAvailable < 0) { @@ -197,7 +193,6 @@ class CreditBasedSequenceNumberingViewReader implements BufferAvailabilityListen @Override public void notifyDataAvailable() { - buffersAvailable.set(true); requestQueue.notifyReaderNonEmpty(this); } @@ -206,7 +201,6 @@ class CreditBasedSequenceNumberingViewReader implements BufferAvailabilityListen return "CreditBasedSequenceNumberingViewReader{" +
[1/7] flink git commit: [FLINK-8805][runtime] Optimize EventSerializer.isEvent method
Repository: flink Updated Branches: refs/heads/master af8efe92c -> b9b7416f4 [FLINK-8805][runtime] Optimize EventSerializer.isEvent method For example, previously if the method was used to check for EndOfPartitionEvent and the Buffer contained a huge custom event, the event had to be deserialized before performing the actual check. Now we are quickly entering the correct if/else branch and doing full costly deserialization only if we have to. Other calls to isEvent() than checking against EndOfPartitionEvent were not used. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/767027ff Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/767027ff Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/767027ff Branch: refs/heads/master Commit: 767027ff6f388f808842760b7cf8bc807f8e6913 Parents: ebd39f3 Author: Piotr Nowojski Authored: Mon Feb 26 16:13:06 2018 +0100 Committer: Stefan Richter Committed: Wed Feb 28 17:12:52 2018 +0100 -- .../api/serialization/EventSerializer.java | 57 ++-- .../io/network/netty/PartitionRequestQueue.java | 3 +- .../api/serialization/EventSerializerTest.java | 39 +- 3 files changed, 42 insertions(+), 57 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/767027ff/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java index d7fb7e8..8d76bb2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java @@ -101,16 +101,15 @@ public class EventSerializer { } /** -* Identifies whether the given buffer encodes the given event. 
+* Identifies whether the given buffer encodes the given event. Custom events are not supported. * * Pre-condition: This buffer must encode some event! * * @param buffer the buffer to peak into * @param eventClass the expected class of the event type -* @param classLoader the class loader to use for custom event classes * @return whether the event class of the buffer matches the given eventClass */ - private static boolean isEvent(ByteBuffer buffer, Class eventClass, ClassLoader classLoader) throws IOException { + private static boolean isEvent(ByteBuffer buffer, Class eventClass) throws IOException { if (buffer.remaining() < 4) { throw new IOException("Incomplete event"); } @@ -122,38 +121,16 @@ public class EventSerializer { try { int type = buffer.getInt(); - switch (type) { - case END_OF_PARTITION_EVENT: - return eventClass.equals(EndOfPartitionEvent.class); - case CHECKPOINT_BARRIER_EVENT: - return eventClass.equals(CheckpointBarrier.class); - case END_OF_SUPERSTEP_EVENT: - return eventClass.equals(EndOfSuperstepEvent.class); - case CANCEL_CHECKPOINT_MARKER_EVENT: - return eventClass.equals(CancelCheckpointMarker.class); - case OTHER_EVENT: - try { - final DataInputDeserializer deserializer = new DataInputDeserializer(buffer); - final String className = deserializer.readUTF(); - - final Class clazz; - try { - clazz = classLoader.loadClass(className).asSubclass(AbstractEvent.class); - } - catch (ClassNotFoundException e) { - throw new IOException("Could not load event class '" + className + "'.", e); - } - catch (ClassCastException e) { - throw new IOException("The class '" + className + "' is not
[6/7] flink git commit: [hotfix][tests] Do not hide original exception in SuccessAfterNetworkBuffersFailureITCase
[hotfix][tests] Do not hide original exception in SuccessAfterNetworkBuffersFailureITCase Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2c2e1896 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2c2e1896 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2c2e1896 Branch: refs/heads/master Commit: 2c2e1896bba7805e6afa96cfd9040729c35c2742 Parents: 6c9e267 Author: Piotr Nowojski Authored: Fri Feb 23 11:27:54 2018 +0100 Committer: Stefan Richter Committed: Wed Feb 28 17:12:52 2018 +0100 -- ...SuccessAfterNetworkBuffersFailureITCase.java | 40 ++-- 1 file changed, 11 insertions(+), 29 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/2c2e1896/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java -- diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java index dc19ad1..dbd0f79 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java @@ -65,38 +65,20 @@ public class SuccessAfterNetworkBuffersFailureITCase extends TestLogger { } @Test - public void testSuccessfulProgramAfterFailure() { + public void testSuccessfulProgramAfterFailure() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + runConnectedComponents(env); + try { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - try { - runConnectedComponents(env); - } - catch (Exception e) { - e.printStackTrace(); - fail("Program Execution should have succeeded."); - } - - try { - runKMeans(env); - fail("This program execution should have failed."); - } - catch (JobExecutionException e) { - 
assertTrue(e.getCause().getMessage().contains("Insufficient number of network buffers")); - } - - try { - runConnectedComponents(env); - } - catch (Exception e) { - e.printStackTrace(); - fail("Program Execution should have succeeded."); - } + runKMeans(env); + fail("This program execution should have failed."); } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); + catch (JobExecutionException e) { + assertTrue(e.getCause().getMessage().contains("Insufficient number of network buffers")); } + + runConnectedComponents(env); } private static void runConnectedComponents(ExecutionEnvironment env) throws Exception {
flink git commit: [FLINK-8557][checkpointing] Remove illegal characters from operator description text before using it to construct the instance directory in RocksDB
Repository: flink Updated Branches: refs/heads/release-1.5 cf854ccbc -> 623e94459 [FLINK-8557][checkpointing] Remove illegal characters from operator description text before using it to construct the instance directory in RocksDB (cherry picked from commit 66474da) Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/623e9445 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/623e9445 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/623e9445 Branch: refs/heads/release-1.5 Commit: 623e94459795a191703b880fcfa4f162c92ae458 Parents: cf854cc Author: Stefan Richter Authored: Wed Feb 28 14:25:55 2018 +0100 Committer: Stefan Richter Committed: Wed Feb 28 15:48:50 2018 +0100 -- .../contrib/streaming/state/RocksDBStateBackend.java | 10 +++--- 1 file changed, 7 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/623e9445/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java -- diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java index f60cb2c..9389295 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java @@ -396,10 +396,14 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu String tempDir = env.getTaskManagerInfo().getTmpDirectories()[0]; ensureRocksDBIsLoaded(tempDir); - lazyInitializeForJob(env, operatorIdentifier); + // replace all characters that are not legal for filenames with underscore + String fileCompatibleIdentifier = 
operatorIdentifier.replaceAll("[^a-zA-Z0-9\\-]", "_"); - File instanceBasePath = - new File(getNextStoragePath(), "job-" + jobId + "_op-" + operatorIdentifier + "_uuid-" + UUID.randomUUID()); + lazyInitializeForJob(env, fileCompatibleIdentifier); + + File instanceBasePath = new File( + getNextStoragePath(), + "job_" + jobId + "_op_" + fileCompatibleIdentifier + "_uuid_" + UUID.randomUUID()); LocalRecoveryConfig localRecoveryConfig = env.getTaskStateManager().createLocalRecoveryConfig();
flink git commit: [FLINK-8557][checkpointing] Remove illegal characters from operator description text before using it to construct the instance directory in RocksDB
Repository: flink Updated Branches: refs/heads/master f57793082 -> af8efe92c [FLINK-8557][checkpointing] Remove illegal characters from operator description text before using it to construct the instance directory in RocksDB This closes #5598. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/af8efe92 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/af8efe92 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/af8efe92 Branch: refs/heads/master Commit: af8efe92c340ad21284d775ce74b15b774b67bf7 Parents: f577930 Author: Stefan Richter Authored: Wed Feb 28 14:25:55 2018 +0100 Committer: Stefan Richter Committed: Wed Feb 28 15:48:01 2018 +0100 -- .../contrib/streaming/state/RocksDBStateBackend.java | 10 +++--- 1 file changed, 7 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/af8efe92/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java -- diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java index f60cb2c..9389295 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java @@ -396,10 +396,14 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu String tempDir = env.getTaskManagerInfo().getTmpDirectories()[0]; ensureRocksDBIsLoaded(tempDir); - lazyInitializeForJob(env, operatorIdentifier); + // replace all characters that are not legal for filenames with underscore + String fileCompatibleIdentifier = 
operatorIdentifier.replaceAll("[^a-zA-Z0-9\\-]", "_"); - File instanceBasePath = - new File(getNextStoragePath(), "job-" + jobId + "_op-" + operatorIdentifier + "_uuid-" + UUID.randomUUID()); + lazyInitializeForJob(env, fileCompatibleIdentifier); + + File instanceBasePath = new File( + getNextStoragePath(), + "job_" + jobId + "_op_" + fileCompatibleIdentifier + "_uuid_" + UUID.randomUUID()); LocalRecoveryConfig localRecoveryConfig = env.getTaskStateManager().createLocalRecoveryConfig();
flink git commit: [hotfix] Enable FILESYTEM_DEFAULT_OVERRIDE in FLIP-6 MiniClusterResource
Repository: flink Updated Branches: refs/heads/release-1.5 59b607b0c -> cf854ccbc [hotfix] Enable FILESYTEM_DEFAULT_OVERRIDE in FLIP-6 MiniClusterResource Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cf854ccb Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cf854ccb Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cf854ccb Branch: refs/heads/release-1.5 Commit: cf854ccbc6fdbf112095c471705c8799aee64a45 Parents: 59b607b Author: Aljoscha Krettek Authored: Wed Feb 28 15:35:13 2018 +0100 Committer: Aljoscha Krettek Committed: Wed Feb 28 15:38:42 2018 +0100 -- .../java/org/apache/flink/test/util/MiniClusterResource.java| 5 + 1 file changed, 5 insertions(+) -- http://git-wip-us.apache.org/repos/asf/flink/blob/cf854ccb/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java -- diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java index 9bb1ae9..1c5da62 100644 --- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java +++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java @@ -21,6 +21,7 @@ package org.apache.flink.test.util; import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.RestOptions; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.minicluster.JobExecutorService; @@ -144,6 +145,10 @@ public class MiniClusterResource extends ExternalResource { private JobExecutorService startFlip6MiniCluster() throws Exception { final Configuration 
configuration = miniClusterResourceConfiguration.getConfiguration(); + // we need to set this since a lot of test expect this because TestBaseUtils.startCluster() + // enabled this by default + configuration.setBoolean(CoreOptions.FILESYTEM_DEFAULT_OVERRIDE, true); + // set rest port to 0 to avoid clashes with concurrent MiniClusters configuration.setInteger(RestOptions.REST_PORT, 0);
flink git commit: [hotfix] Enable FILESYTEM_DEFAULT_OVERRIDE in FLIP-6 MiniClusterResource
Repository: flink Updated Branches: refs/heads/master 296f9ff74 -> f57793082 [hotfix] Enable FILESYTEM_DEFAULT_OVERRIDE in FLIP-6 MiniClusterResource Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f5779308 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f5779308 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f5779308 Branch: refs/heads/master Commit: f57793082f92b8052e4fdccc0aaf16f0c55777a8 Parents: 296f9ff Author: Aljoscha Krettek Authored: Wed Feb 28 15:35:13 2018 +0100 Committer: Aljoscha Krettek Committed: Wed Feb 28 15:35:13 2018 +0100 -- .../java/org/apache/flink/test/util/MiniClusterResource.java| 5 + 1 file changed, 5 insertions(+) -- http://git-wip-us.apache.org/repos/asf/flink/blob/f5779308/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java -- diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java index 9bb1ae9..1c5da62 100644 --- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java +++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java @@ -21,6 +21,7 @@ package org.apache.flink.test.util; import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.RestOptions; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.minicluster.JobExecutorService; @@ -144,6 +145,10 @@ public class MiniClusterResource extends ExternalResource { private JobExecutorService startFlip6MiniCluster() throws Exception { final Configuration 
configuration = miniClusterResourceConfiguration.getConfiguration(); + // we need to set this since a lot of test expect this because TestBaseUtils.startCluster() + // enabled this by default + configuration.setBoolean(CoreOptions.FILESYTEM_DEFAULT_OVERRIDE, true); + // set rest port to 0 to avoid clashes with concurrent MiniClusters configuration.setInteger(RestOptions.REST_PORT, 0);
[flink-shaded] branch master updated: (#39) Properly hide jackson dependencies
This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-shaded.git The following commit(s) were added to refs/heads/master by this push: new d53464b (#39) Properly hide jackson dependencies d53464b is described below commit d53464bb80d08ff6f8538faecb9b479f9caf34d2 Author: zentol AuthorDate: Wed Feb 28 14:19:28 2018 +0100 (#39) Properly hide jackson dependencies --- .../flink-shaded-jackson-2/pom.xml | 12 .../pom.xml| 12 flink-shaded-jackson-parent/pom.xml| 36 -- 3 files changed, 43 insertions(+), 17 deletions(-) diff --git a/flink-shaded-jackson-parent/flink-shaded-jackson-2/pom.xml b/flink-shaded-jackson-parent/flink-shaded-jackson-2/pom.xml index 167020a..4a7c753 100644 --- a/flink-shaded-jackson-parent/flink-shaded-jackson-2/pom.xml +++ b/flink-shaded-jackson-parent/flink-shaded-jackson-2/pom.xml @@ -34,6 +34,18 @@ under the License. +com.fasterxml.jackson.core +jackson-core + + +com.fasterxml.jackson.core +jackson-annotations + + +com.fasterxml.jackson.core +jackson-databind + + com.fasterxml.jackson.dataformat jackson-dataformat-yaml ${jackson.version} diff --git a/flink-shaded-jackson-parent/flink-shaded-jackson-module-jsonSchema-2/pom.xml b/flink-shaded-jackson-parent/flink-shaded-jackson-module-jsonSchema-2/pom.xml index 9079575..e723489 100644 --- a/flink-shaded-jackson-parent/flink-shaded-jackson-module-jsonSchema-2/pom.xml +++ b/flink-shaded-jackson-parent/flink-shaded-jackson-module-jsonSchema-2/pom.xml @@ -34,6 +34,18 @@ under the License. 
+com.fasterxml.jackson.core +jackson-core + + +com.fasterxml.jackson.core +jackson-annotations + + +com.fasterxml.jackson.core +jackson-databind + + com.fasterxml.jackson.module jackson-module-jsonSchema ${jackson.version} diff --git a/flink-shaded-jackson-parent/pom.xml b/flink-shaded-jackson-parent/pom.xml index 0e3073e..bdffc4c 100644 --- a/flink-shaded-jackson-parent/pom.xml +++ b/flink-shaded-jackson-parent/pom.xml @@ -43,23 +43,25 @@ under the License. flink-shaded-jackson-module-jsonSchema-2 - - -com.fasterxml.jackson.core -jackson-core -${jackson.version} - - -com.fasterxml.jackson.core -jackson-annotations -${jackson.version} - - -com.fasterxml.jackson.core -jackson-databind -${jackson.version} - - + + + +com.fasterxml.jackson.core +jackson-core +${jackson.version} + + +com.fasterxml.jackson.core +jackson-annotations +${jackson.version} + + +com.fasterxml.jackson.core +jackson-databind +${jackson.version} + + + -- To stop receiving notification emails like this one, please contact ches...@apache.org.
flink git commit: [FLINK-8451] [serializers] Make Scala tuple serializer deserialization more failure tolerant
Repository: flink Updated Branches: refs/heads/release-1.4 51231c471 -> 93f823fff [FLINK-8451] [serializers] Make Scala tuple serializer deserialization more failure tolerant This closes #5567. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/93f823ff Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/93f823ff Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/93f823ff Branch: refs/heads/release-1.4 Commit: 93f823fff7b1517cc3f2484fa0091ebdc9e546d0 Parents: 51231c4 Author: Timo Walther Authored: Thu Feb 22 17:22:54 2018 +0100 Committer: Timo Walther Committed: Wed Feb 28 13:32:34 2018 +0100 -- .../runtime/TupleSerializerConfigSnapshot.java | 2 +- .../apache/flink/util/InstantiationUtil.java| 56 +-- .../flink-1.3.2-scala-types-serializer-data | Bin 0 -> 97 bytes .../flink-1.3.2-scala-types-serializer-snapshot | Bin 0 -> 7634 bytes .../TupleSerializerCompatibilityTest.scala | 86 + ...leSerializerCompatibilityTestGenerator.scala | 94 +++ 6 files changed, 231 insertions(+), 7 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/93f823ff/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java -- diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java index 705099e..eac5200 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java @@ -61,7 +61,7 @@ public final class TupleSerializerConfigSnapshot extends CompositeTypeSeriali super.read(in); try (final DataInputViewStream inViewWrapper = new DataInputViewStream(in)) { - tupleClass = InstantiationUtil.deserializeObject(inViewWrapper, 
getUserCodeClassLoader()); + tupleClass = InstantiationUtil.deserializeObject(inViewWrapper, getUserCodeClassLoader(), true); } catch (ClassNotFoundException e) { throw new IOException("Could not find requested tuple class in classpath.", e); } http://git-wip-us.apache.org/repos/asf/flink/blob/93f823ff/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java -- diff --git a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java index 7ffad55..a95bdf7 100644 --- a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java +++ b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java @@ -113,7 +113,7 @@ public final class InstantiationUtil { * * This can be removed once 1.2 is no longer supported. */ - private static Set scalaSerializerClassnames = new HashSet<>(); + private static final Set scalaSerializerClassnames = new HashSet<>(); static { scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.TraversableSerializer"); scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.CaseClassSerializer"); @@ -121,11 +121,54 @@ public final class InstantiationUtil { scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.EnumValueSerializer"); scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.OptionSerializer"); scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.TrySerializer"); - scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.EitherSerializer"); scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.UnitSerializer"); } /** +* The serialVersionUID might change between Scala versions and since those classes are +* part of the tuple serializer config snapshots we need to ignore them. 
+* +* @see <a href="https://issues.apache.org/jira/browse/FLINK-8451">FLINK-8451</a> +*/ + private static final Set scalaTypes = new HashSet<>(); + static { + scalaTypes.add("scala.Tuple1"); + scalaTypes.add("scala.Tuple2"); + scalaTypes.add("scala.Tuple3"); + scalaTypes.add("scala.Tuple4"); + scalaTypes.add("scala.Tuple5"); + scalaTypes.add("scala.Tuple6"); + scala
flink git commit: [FLINK-8777][checkpointing] Cleanup local state more eagerly in recovery
Repository: flink Updated Branches: refs/heads/release-1.5 302aaeb02 -> 59b607b0c [FLINK-8777][checkpointing] Cleanup local state more eagerly in recovery (cherry picked from commit 296f9ff) Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/59b607b0 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/59b607b0 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/59b607b0 Branch: refs/heads/release-1.5 Commit: 59b607b0c411b7d01b97db302d0f124b28ef0d0e Parents: 302aaeb Author: sihuazhou Authored: Mon Feb 26 03:54:53 2018 +0100 Committer: Stefan Richter Committed: Wed Feb 28 13:35:30 2018 +0100 -- .../runtime/state/TaskLocalStateStore.java | 8 ++ .../runtime/state/TaskLocalStateStoreImpl.java | 108 +-- .../runtime/state/TaskStateManagerImpl.java | 6 +- .../state/TaskLocalStateStoreImplTest.java | 42 ++-- .../runtime/state/TestTaskLocalStateStore.java | 6 ++ 5 files changed, 130 insertions(+), 40 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/59b607b0/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStore.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStore.java index 7089894..686f4f6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStore.java @@ -24,6 +24,8 @@ import javax.annotation.Nonnegative; import javax.annotation.Nonnull; import javax.annotation.Nullable; +import java.util.function.LongPredicate; + /** * Classes that implement this interface serve as a task-manager-level local storage for local checkpointed state. 
* The purpose is to provide access to a state that is stored locally for a faster recovery compared to the state that @@ -62,4 +64,10 @@ public interface TaskLocalStateStore { * and removes all local states with a checkpoint id that is smaller than the newly confirmed checkpoint id. */ void confirmCheckpoint(long confirmedCheckpointId); + + /** +* Remove all checkpoints from the store that match the given predicate. +* @param matcher the predicate that selects the checkpoints for pruning. +*/ + void pruneMatchingCheckpoints(LongPredicate matcher); } http://git-wip-us.apache.org/repos/asf/flink/blob/59b607b0/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java index bb4f011..29adc4a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.state; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobID; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; @@ -35,9 +36,10 @@ import javax.annotation.concurrent.GuardedBy; import java.io.File; import java.io.IOException; +import java.util.AbstractMap; import java.util.ArrayList; import java.util.Collection; -import java.util.HashMap; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -46,6 +48,7 @@ import java.util.TreeMap; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; +import java.util.function.LongPredicate; /** * Main implementation of a {@link TaskLocalStateStore}. 
@@ -56,7 +59,8 @@ public class TaskLocalStateStoreImpl implements TaskLocalStateStore { private static final Logger LOG = LoggerFactory.getLogger(TaskLocalStateStoreImpl.class); /** Dummy value to use instead of null to satisfy {@link ConcurrentHashMap}. */ - private static final TaskStateSnapshot NULL_DUMMY = new TaskStateSnapshot(0); + @VisibleForTesting + static final TaskStateSnapshot NULL_DUMMY = new TaskStateSnapshot(0); /** JobID from the owning subtask. */ @Nonnull @@ -103,14 +107,36 @@ public class TaskLocalStateStoreImpl implements TaskLocalStateStore { @Nonnull LocalRecoveryConfig localRecoveryConfig, @Nonnull Executor discardExecutor) { - this
flink git commit: [FLINK-8777][checkpointing] Cleanup local state more eagerly in recovery
Repository: flink Updated Branches: refs/heads/master 6c837d738 -> 296f9ff74 [FLINK-8777][checkpointing] Cleanup local state more eagerly in recovery This closes #5578. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/296f9ff7 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/296f9ff7 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/296f9ff7 Branch: refs/heads/master Commit: 296f9ff742425d348d72ff598da31022aea70778 Parents: 6c837d7 Author: sihuazhou Authored: Mon Feb 26 10:54:53 2018 +0800 Committer: Stefan Richter Committed: Wed Feb 28 13:34:26 2018 +0100 -- .../runtime/state/TaskLocalStateStore.java | 8 ++ .../runtime/state/TaskLocalStateStoreImpl.java | 108 +-- .../runtime/state/TaskStateManagerImpl.java | 6 +- .../state/TaskLocalStateStoreImplTest.java | 42 ++-- .../runtime/state/TestTaskLocalStateStore.java | 6 ++ 5 files changed, 130 insertions(+), 40 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/296f9ff7/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStore.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStore.java index 7089894..686f4f6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStore.java @@ -24,6 +24,8 @@ import javax.annotation.Nonnegative; import javax.annotation.Nonnull; import javax.annotation.Nullable; +import java.util.function.LongPredicate; + /** * Classes that implement this interface serve as a task-manager-level local storage for local checkpointed state. 
* The purpose is to provide access to a state that is stored locally for a faster recovery compared to the state that @@ -62,4 +64,10 @@ public interface TaskLocalStateStore { * and removes all local states with a checkpoint id that is smaller than the newly confirmed checkpoint id. */ void confirmCheckpoint(long confirmedCheckpointId); + + /** +* Remove all checkpoints from the store that match the given predicate. +* @param matcher the predicate that selects the checkpoints for pruning. +*/ + void pruneMatchingCheckpoints(LongPredicate matcher); } http://git-wip-us.apache.org/repos/asf/flink/blob/296f9ff7/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java index bb4f011..29adc4a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.state; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobID; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; @@ -35,9 +36,10 @@ import javax.annotation.concurrent.GuardedBy; import java.io.File; import java.io.IOException; +import java.util.AbstractMap; import java.util.ArrayList; import java.util.Collection; -import java.util.HashMap; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -46,6 +48,7 @@ import java.util.TreeMap; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; +import java.util.function.LongPredicate; /** * Main implementation of a {@link TaskLocalStateStore}. 
@@ -56,7 +59,8 @@ public class TaskLocalStateStoreImpl implements TaskLocalStateStore { private static final Logger LOG = LoggerFactory.getLogger(TaskLocalStateStoreImpl.class); /** Dummy value to use instead of null to satisfy {@link ConcurrentHashMap}. */ - private static final TaskStateSnapshot NULL_DUMMY = new TaskStateSnapshot(0); + @VisibleForTesting + static final TaskStateSnapshot NULL_DUMMY = new TaskStateSnapshot(0); /** JobID from the owning subtask. */ @Nonnull @@ -103,14 +107,36 @@ public class TaskLocalStateStoreImpl implements TaskLocalStateStore { @Nonnull LocalRecoveryConfig localRecoveryConfig, @Nonnull Executor discardExecutor) { - this.lock = new Object(); -
flink git commit: [FLINK-8451] [serializers] Make Scala tuple serializer deserialization more failure tolerant
Repository: flink Updated Branches: refs/heads/release-1.5 08e615027 -> 302aaeb02 [FLINK-8451] [serializers] Make Scala tuple serializer deserialization more failure tolerant This closes #5567. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/302aaeb0 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/302aaeb0 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/302aaeb0 Branch: refs/heads/release-1.5 Commit: 302aaeb021bacf3f37cb9a3ee236304c94adbf30 Parents: 08e6150 Author: Timo Walther Authored: Thu Feb 22 17:22:54 2018 +0100 Committer: Timo Walther Committed: Wed Feb 28 13:30:59 2018 +0100 -- .../runtime/TupleSerializerConfigSnapshot.java | 2 +- .../apache/flink/util/InstantiationUtil.java| 56 +-- .../flink-1.3.2-scala-types-serializer-data | Bin 0 -> 97 bytes .../flink-1.3.2-scala-types-serializer-snapshot | Bin 0 -> 7634 bytes .../TupleSerializerCompatibilityTest.scala | 86 + ...leSerializerCompatibilityTestGenerator.scala | 94 +++ 6 files changed, 231 insertions(+), 7 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/302aaeb0/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java -- diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java index 705099e..eac5200 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java @@ -61,7 +61,7 @@ public final class TupleSerializerConfigSnapshot extends CompositeTypeSeriali super.read(in); try (final DataInputViewStream inViewWrapper = new DataInputViewStream(in)) { - tupleClass = InstantiationUtil.deserializeObject(inViewWrapper, 
getUserCodeClassLoader()); + tupleClass = InstantiationUtil.deserializeObject(inViewWrapper, getUserCodeClassLoader(), true); } catch (ClassNotFoundException e) { throw new IOException("Could not find requested tuple class in classpath.", e); } http://git-wip-us.apache.org/repos/asf/flink/blob/302aaeb0/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java -- diff --git a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java index 11e3990..978d270 100644 --- a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java +++ b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java @@ -113,7 +113,7 @@ public final class InstantiationUtil { * * This can be removed once 1.2 is no longer supported. */ - private static Set scalaSerializerClassnames = new HashSet<>(); + private static final Set scalaSerializerClassnames = new HashSet<>(); static { scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.TraversableSerializer"); scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.CaseClassSerializer"); @@ -121,11 +121,54 @@ public final class InstantiationUtil { scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.EnumValueSerializer"); scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.OptionSerializer"); scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.TrySerializer"); - scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.EitherSerializer"); scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.UnitSerializer"); } /** +* The serialVersionUID might change between Scala versions and since those classes are +* part of the tuple serializer config snapshots we need to ignore them. 
+* +* @see <a href="https://issues.apache.org/jira/browse/FLINK-8451">FLINK-8451</a> +*/ + private static final Set scalaTypes = new HashSet<>(); + static { + scalaTypes.add("scala.Tuple1"); + scalaTypes.add("scala.Tuple2"); + scalaTypes.add("scala.Tuple3"); + scalaTypes.add("scala.Tuple4"); + scalaTypes.add("scala.Tuple5"); + scalaTypes.add("scala.Tuple6"); + scala
flink git commit: [FLINK-8451] [serializers] Make Scala tuple serializer deserialization more failure tolerant
Repository: flink Updated Branches: refs/heads/master e8d168509 -> 6c837d738 [FLINK-8451] [serializers] Make Scala tuple serializer deserialization more failure tolerant This closes #5567. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6c837d73 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6c837d73 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6c837d73 Branch: refs/heads/master Commit: 6c837d738f90ccf140ad2288cd24f3706babd63c Parents: e8d1685 Author: Timo Walther Authored: Thu Feb 22 17:22:54 2018 +0100 Committer: Timo Walther Committed: Wed Feb 28 13:28:12 2018 +0100 -- .../runtime/TupleSerializerConfigSnapshot.java | 2 +- .../apache/flink/util/InstantiationUtil.java| 56 +-- .../flink-1.3.2-scala-types-serializer-data | Bin 0 -> 97 bytes .../flink-1.3.2-scala-types-serializer-snapshot | Bin 0 -> 7634 bytes .../TupleSerializerCompatibilityTest.scala | 86 + ...leSerializerCompatibilityTestGenerator.scala | 94 +++ 6 files changed, 231 insertions(+), 7 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/6c837d73/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java -- diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java index 705099e..eac5200 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java @@ -61,7 +61,7 @@ public final class TupleSerializerConfigSnapshot extends CompositeTypeSeriali super.read(in); try (final DataInputViewStream inViewWrapper = new DataInputViewStream(in)) { - tupleClass = InstantiationUtil.deserializeObject(inViewWrapper, getUserCodeClassLoader()); + 
tupleClass = InstantiationUtil.deserializeObject(inViewWrapper, getUserCodeClassLoader(), true); } catch (ClassNotFoundException e) { throw new IOException("Could not find requested tuple class in classpath.", e); } http://git-wip-us.apache.org/repos/asf/flink/blob/6c837d73/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java -- diff --git a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java index 11e3990..978d270 100644 --- a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java +++ b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java @@ -113,7 +113,7 @@ public final class InstantiationUtil { * * This can be removed once 1.2 is no longer supported. */ - private static Set scalaSerializerClassnames = new HashSet<>(); + private static final Set scalaSerializerClassnames = new HashSet<>(); static { scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.TraversableSerializer"); scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.CaseClassSerializer"); @@ -121,11 +121,54 @@ public final class InstantiationUtil { scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.EnumValueSerializer"); scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.OptionSerializer"); scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.TrySerializer"); - scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.EitherSerializer"); scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.UnitSerializer"); } /** +* The serialVersionUID might change between Scala versions and since those classes are +* part of the tuple serializer config snapshots we need to ignore them. 
+* +* @see <a href="https://issues.apache.org/jira/browse/FLINK-8451">FLINK-8451</a> +*/ + private static final Set scalaTypes = new HashSet<>(); + static { + scalaTypes.add("scala.Tuple1"); + scalaTypes.add("scala.Tuple2"); + scalaTypes.add("scala.Tuple3"); + scalaTypes.add("scala.Tuple4"); + scalaTypes.add("scala.Tuple5"); + scalaTypes.add("scala.Tuple6"); + scalaTypes.add(
[3/4] flink git commit: [FLINK-8792] [rest] Change MessageQueryParameter.convertStringToValue to convertValueToString
[FLINK-8792] [rest] Change MessageQueryParameter.convertStringToValue to convertValueToString This closes #5587. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/51d5bc6c Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/51d5bc6c Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/51d5bc6c Branch: refs/heads/release-1.5 Commit: 51d5bc6c5151c2aed3f932f84c35da43689501ec Parents: acf1147 Author: vinoyang Authored: Tue Feb 27 14:43:52 2018 +0800 Committer: Till Rohrmann Committed: Wed Feb 28 13:28:22 2018 +0100 -- .../handlers/AllowNonRestoredStateQueryParameter.java | 2 +- .../runtime/webmonitor/handlers/ParallelismQueryParameter.java | 2 +- .../runtime/webmonitor/handlers/StringQueryParameter.java | 2 +- .../handlers/AllowNonRestoredStateQueryParameterTest.java | 4 ++-- .../webmonitor/handlers/ParallelismQueryParameterTest.java | 2 +- .../flink/runtime/rest/messages/MessageQueryParameter.java | 6 +++--- .../rest/messages/RescalingParallelismQueryParameter.java | 2 +- .../runtime/rest/messages/TerminationModeQueryParameter.java | 2 +- .../rest/messages/job/metrics/MetricsFilterParameter.java | 2 +- .../apache/flink/runtime/rest/RestServerEndpointITCase.java| 2 +- .../runtime/rest/handler/util/HandlerRequestUtilsTest.java | 2 +- .../flink/runtime/rest/messages/MessageParametersTest.java | 2 +- .../rest/messages/job/metrics/MetricsFilterParameterTest.java | 2 +- 13 files changed, 16 insertions(+), 16 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/51d5bc6c/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AllowNonRestoredStateQueryParameter.java -- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AllowNonRestoredStateQueryParameter.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AllowNonRestoredStateQueryParameter.java index 7ad014e..2ddde3a 100644 --- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AllowNonRestoredStateQueryParameter.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AllowNonRestoredStateQueryParameter.java @@ -39,7 +39,7 @@ public class AllowNonRestoredStateQueryParameter extends MessageQueryParameterhttp://git-wip-us.apache.org/repos/asf/flink/blob/51d5bc6c/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ParallelismQueryParameter.java -- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ParallelismQueryParameter.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ParallelismQueryParameter.java index 26cb16c..2ade7eb 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ParallelismQueryParameter.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ParallelismQueryParameter.java @@ -38,7 +38,7 @@ public class ParallelismQueryParameter extends MessageQueryParameter { } @Override - public String convertStringToValue(final Integer value) { + public String convertValueToString(final Integer value) { return value.toString(); } } http://git-wip-us.apache.org/repos/asf/flink/blob/51d5bc6c/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/StringQueryParameter.java -- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/StringQueryParameter.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/StringQueryParameter.java index 226c592..52c0967 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/StringQueryParameter.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/StringQueryParameter.java @@ -35,7 +35,7 @@ public abstract class StringQueryParameter extends MessageQueryParameter } @Override - public 
final String convertStringToValue(final String value) { + public final String convertValueToString(final String value) { return value; } http://git-wip-us.apache.org/repos/asf/flink/blob/51d5bc6c/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/AllowNonRestoredStateQueryParameterTest.java -- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handle
[1/4] flink git commit: [FLINK-8730][REST] JSON serialize entire SerializedThrowable
Repository: flink Updated Branches: refs/heads/release-1.5 23358ff87 -> 08e615027 [FLINK-8730][REST] JSON serialize entire SerializedThrowable Do not only serialize the serialized exception but the entire SerializedThrowable object. This makes it possible to throw the SerializedThrowable itself without deserializing it. This closes #5546. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/acf11479 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/acf11479 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/acf11479 Branch: refs/heads/release-1.5 Commit: acf114793c708f0ab207008c25195f6f65796e5f Parents: 2f6cb37 Author: gyao Authored: Wed Feb 21 16:02:01 2018 +0100 Committer: Till Rohrmann Committed: Wed Feb 28 13:28:21 2018 +0100 -- .../apache/flink/util/SerializedThrowable.java | 14 .../json/SerializedThrowableDeserializer.java | 15 +++-- .../json/SerializedThrowableSerializer.java | 7 +- .../json/SerializedThrowableSerializerTest.java | 71 4 files changed, 83 insertions(+), 24 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/acf11479/flink-core/src/main/java/org/apache/flink/util/SerializedThrowable.java -- diff --git a/flink-core/src/main/java/org/apache/flink/util/SerializedThrowable.java b/flink-core/src/main/java/org/apache/flink/util/SerializedThrowable.java index de6358c..13f8d77 100644 --- a/flink-core/src/main/java/org/apache/flink/util/SerializedThrowable.java +++ b/flink-core/src/main/java/org/apache/flink/util/SerializedThrowable.java @@ -25,8 +25,6 @@ import java.lang.ref.WeakReference; import java.util.HashSet; import java.util.Set; -import static java.util.Objects.requireNonNull; - /** * Utility class for dealing with user-defined Throwable types that are serialized (for * example during RPC/Actor communication), but cannot be resolved with the default @@ -64,18 +62,6 @@ public class SerializedThrowable extends Exception implements Serializable { 
this(exception, new HashSet<>()); } - /** -* Creates a new SerializedThrowable from a serialized exception provided as a byte array. -*/ - public SerializedThrowable( - final byte[] serializedException, - final String originalErrorClassName, - final String fullStringifiedStackTrace) { - this.serializedException = requireNonNull(serializedException); - this.originalErrorClassName = requireNonNull(originalErrorClassName); - this.fullStringifiedStackTrace = requireNonNull(fullStringifiedStackTrace); - } - private SerializedThrowable(Throwable exception, Set alreadySeen) { super(getMessageOrError(exception)); http://git-wip-us.apache.org/repos/asf/flink/blob/acf11479/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SerializedThrowableDeserializer.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SerializedThrowableDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SerializedThrowableDeserializer.java index 3217cce..d0f71ce 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SerializedThrowableDeserializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SerializedThrowableDeserializer.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.rest.messages.json; +import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.SerializedThrowable; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser; @@ -27,9 +28,7 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std import java.io.IOException; -import static org.apache.flink.runtime.rest.messages.json.SerializedThrowableSerializer.FIELD_NAME_CLASS; -import static org.apache.flink.runtime.rest.messages.json.SerializedThrowableSerializer.FIELD_NAME_SERIALIZED_EXCEPTION; -import static org.apache.flink.runtime.rest.messages.json.SerializedThrowableSerializer.FIELD_NAME_STACK_TRACE; 
+import static org.apache.flink.runtime.rest.messages.json.SerializedThrowableSerializer.FIELD_NAME_SERIALIZED_THROWABLE; /** * JSON deserializer for {@link SerializedThrowable}. @@ -48,10 +47,12 @@ public class SerializedThrowableDeserializer extends StdDeserializerhttp://git-wip-us.apache.org/repos/asf/flink/blob/acf11479/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SerializedThrowableSerializer.java --
[2/4] flink git commit: [FLINK-8787][flip6] Do not copy flinkConfiguration in AbstractYarnClusterDescriptor
[FLINK-8787][flip6] Do not copy flinkConfiguration in AbstractYarnClusterDescriptor This closes #5591. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2f6cb37c Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2f6cb37c Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2f6cb37c Branch: refs/heads/release-1.5 Commit: 2f6cb37c775106bb684ef9c608585e7a72056460 Parents: 23358ff Author: gyao Authored: Tue Feb 27 16:58:53 2018 +0100 Committer: Till Rohrmann Committed: Wed Feb 28 13:28:21 2018 +0100 -- .../java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/2f6cb37c/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java -- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java index e6c36f6..6b93016 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java @@ -482,7 +482,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor flinkConfiguration.setString(ClusterEntrypoint.EXECUTION_MODE, executionMode.toString()); ApplicationReport report = startAppMaster( - new Configuration(flinkConfiguration), + flinkConfiguration, yarnClusterEntrypoint, jobGraph, yarnClient,
[4/4] flink git commit: [FLINK-8792] [rest] Change MessageQueryParameter#convertValueFromString to convertStringToValue
[FLINK-8792] [rest] Change MessageQueryParameter#convertValueFromString to convertStringToValue Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/08e61502 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/08e61502 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/08e61502 Branch: refs/heads/release-1.5 Commit: 08e615027acd426537dc580139a61bd4082b7c3f Parents: 51d5bc6 Author: Till Rohrmann Authored: Wed Feb 28 10:11:44 2018 +0100 Committer: Till Rohrmann Committed: Wed Feb 28 13:28:22 2018 +0100 -- .../handlers/AllowNonRestoredStateQueryParameter.java | 2 +- .../runtime/webmonitor/handlers/ParallelismQueryParameter.java | 2 +- .../runtime/webmonitor/handlers/StringQueryParameter.java | 2 +- .../handlers/AllowNonRestoredStateQueryParameterTest.java | 6 +++--- .../webmonitor/handlers/ParallelismQueryParameterTest.java | 2 +- .../flink/runtime/rest/messages/MessageQueryParameter.java | 4 ++-- .../rest/messages/RescalingParallelismQueryParameter.java | 2 +- .../runtime/rest/messages/TerminationModeQueryParameter.java | 2 +- .../rest/messages/job/metrics/MetricsFilterParameter.java | 2 +- .../apache/flink/runtime/rest/RestServerEndpointITCase.java| 2 +- .../runtime/rest/handler/util/HandlerRequestUtilsTest.java | 2 +- .../flink/runtime/rest/messages/MessageParametersTest.java | 2 +- .../rest/messages/job/metrics/MetricsFilterParameterTest.java | 2 +- 13 files changed, 16 insertions(+), 16 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/08e61502/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AllowNonRestoredStateQueryParameter.java -- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AllowNonRestoredStateQueryParameter.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AllowNonRestoredStateQueryParameter.java index 2ddde3a..19734d8 100644 --- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AllowNonRestoredStateQueryParameter.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AllowNonRestoredStateQueryParameter.java @@ -34,7 +34,7 @@ public class AllowNonRestoredStateQueryParameter extends MessageQueryParameterhttp://git-wip-us.apache.org/repos/asf/flink/blob/08e61502/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ParallelismQueryParameter.java -- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ParallelismQueryParameter.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ParallelismQueryParameter.java index 2ade7eb..398bcb0 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ParallelismQueryParameter.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ParallelismQueryParameter.java @@ -33,7 +33,7 @@ public class ParallelismQueryParameter extends MessageQueryParameter { } @Override - public Integer convertValueFromString(final String value) { + public Integer convertStringToValue(final String value) { return Integer.valueOf(value); } http://git-wip-us.apache.org/repos/asf/flink/blob/08e61502/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/StringQueryParameter.java -- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/StringQueryParameter.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/StringQueryParameter.java index 52c0967..67e83ff 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/StringQueryParameter.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/StringQueryParameter.java @@ -30,7 +30,7 @@ public abstract class StringQueryParameter extends MessageQueryParameter } @Override - 
public final String convertValueFromString(final String value) { + public final String convertStringToValue(final String value) { return value; } http://git-wip-us.apache.org/repos/asf/flink/blob/08e61502/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/AllowNonRestoredStateQueryParameterTest.java -- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/A
[3/4] flink git commit: [FLINK-8792] [rest] Change MessageQueryParameter.convertStringToValue to convertValueToString
[FLINK-8792] [rest] Change MessageQueryParameter.convertStringToValue to convertValueToString This closes #5587. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9a69035d Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9a69035d Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9a69035d Branch: refs/heads/master Commit: 9a69035d8c6b62bb6229d75fed6ab43407e35e04 Parents: 970d94e Author: vinoyang Authored: Tue Feb 27 14:43:52 2018 +0800 Committer: Till Rohrmann Committed: Wed Feb 28 13:27:06 2018 +0100 -- .../handlers/AllowNonRestoredStateQueryParameter.java | 2 +- .../runtime/webmonitor/handlers/ParallelismQueryParameter.java | 2 +- .../runtime/webmonitor/handlers/StringQueryParameter.java | 2 +- .../handlers/AllowNonRestoredStateQueryParameterTest.java | 4 ++-- .../webmonitor/handlers/ParallelismQueryParameterTest.java | 2 +- .../flink/runtime/rest/messages/MessageQueryParameter.java | 6 +++--- .../rest/messages/RescalingParallelismQueryParameter.java | 2 +- .../runtime/rest/messages/TerminationModeQueryParameter.java | 2 +- .../rest/messages/job/metrics/MetricsFilterParameter.java | 2 +- .../apache/flink/runtime/rest/RestServerEndpointITCase.java| 2 +- .../runtime/rest/handler/util/HandlerRequestUtilsTest.java | 2 +- .../flink/runtime/rest/messages/MessageParametersTest.java | 2 +- .../rest/messages/job/metrics/MetricsFilterParameterTest.java | 2 +- 13 files changed, 16 insertions(+), 16 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/9a69035d/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AllowNonRestoredStateQueryParameter.java -- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AllowNonRestoredStateQueryParameter.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AllowNonRestoredStateQueryParameter.java index 7ad014e..2ddde3a 100644 --- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AllowNonRestoredStateQueryParameter.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AllowNonRestoredStateQueryParameter.java @@ -39,7 +39,7 @@ public class AllowNonRestoredStateQueryParameter extends MessageQueryParameterhttp://git-wip-us.apache.org/repos/asf/flink/blob/9a69035d/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ParallelismQueryParameter.java -- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ParallelismQueryParameter.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ParallelismQueryParameter.java index 26cb16c..2ade7eb 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ParallelismQueryParameter.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ParallelismQueryParameter.java @@ -38,7 +38,7 @@ public class ParallelismQueryParameter extends MessageQueryParameter { } @Override - public String convertStringToValue(final Integer value) { + public String convertValueToString(final Integer value) { return value.toString(); } } http://git-wip-us.apache.org/repos/asf/flink/blob/9a69035d/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/StringQueryParameter.java -- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/StringQueryParameter.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/StringQueryParameter.java index 226c592..52c0967 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/StringQueryParameter.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/StringQueryParameter.java @@ -35,7 +35,7 @@ public abstract class StringQueryParameter extends MessageQueryParameter } @Override - public 
final String convertStringToValue(final String value) { + public final String convertValueToString(final String value) { return value; } http://git-wip-us.apache.org/repos/asf/flink/blob/9a69035d/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/AllowNonRestoredStateQueryParameterTest.java -- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/Al
[1/4] flink git commit: [FLINK-8730][REST] JSON serialize entire SerializedThrowable
Repository: flink Updated Branches: refs/heads/master f89fce83e -> e8d168509 [FLINK-8730][REST] JSON serialize entire SerializedThrowable Do not only serialize the serialized exception but the entire SerializedThrowable object. This makes it possible to throw the SerializedThrowable itself without deserializing it. This closes #5546. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/970d94e8 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/970d94e8 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/970d94e8 Branch: refs/heads/master Commit: 970d94e821d2341d200baf692ee2fbbc85e395b5 Parents: 71baa27 Author: gyao Authored: Wed Feb 21 16:02:01 2018 +0100 Committer: Till Rohrmann Committed: Wed Feb 28 13:27:05 2018 +0100 -- .../apache/flink/util/SerializedThrowable.java | 14 .../json/SerializedThrowableDeserializer.java | 15 +++-- .../json/SerializedThrowableSerializer.java | 7 +- .../json/SerializedThrowableSerializerTest.java | 71 4 files changed, 83 insertions(+), 24 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/970d94e8/flink-core/src/main/java/org/apache/flink/util/SerializedThrowable.java -- diff --git a/flink-core/src/main/java/org/apache/flink/util/SerializedThrowable.java b/flink-core/src/main/java/org/apache/flink/util/SerializedThrowable.java index de6358c..13f8d77 100644 --- a/flink-core/src/main/java/org/apache/flink/util/SerializedThrowable.java +++ b/flink-core/src/main/java/org/apache/flink/util/SerializedThrowable.java @@ -25,8 +25,6 @@ import java.lang.ref.WeakReference; import java.util.HashSet; import java.util.Set; -import static java.util.Objects.requireNonNull; - /** * Utility class for dealing with user-defined Throwable types that are serialized (for * example during RPC/Actor communication), but cannot be resolved with the default @@ -64,18 +62,6 @@ public class SerializedThrowable extends Exception implements Serializable { this(exception, new 
HashSet<>()); } - /** -* Creates a new SerializedThrowable from a serialized exception provided as a byte array. -*/ - public SerializedThrowable( - final byte[] serializedException, - final String originalErrorClassName, - final String fullStringifiedStackTrace) { - this.serializedException = requireNonNull(serializedException); - this.originalErrorClassName = requireNonNull(originalErrorClassName); - this.fullStringifiedStackTrace = requireNonNull(fullStringifiedStackTrace); - } - private SerializedThrowable(Throwable exception, Set alreadySeen) { super(getMessageOrError(exception)); http://git-wip-us.apache.org/repos/asf/flink/blob/970d94e8/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SerializedThrowableDeserializer.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SerializedThrowableDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SerializedThrowableDeserializer.java index 3217cce..d0f71ce 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SerializedThrowableDeserializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SerializedThrowableDeserializer.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.rest.messages.json; +import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.SerializedThrowable; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser; @@ -27,9 +28,7 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std import java.io.IOException; -import static org.apache.flink.runtime.rest.messages.json.SerializedThrowableSerializer.FIELD_NAME_CLASS; -import static org.apache.flink.runtime.rest.messages.json.SerializedThrowableSerializer.FIELD_NAME_SERIALIZED_EXCEPTION; -import static org.apache.flink.runtime.rest.messages.json.SerializedThrowableSerializer.FIELD_NAME_STACK_TRACE; +import static 
org.apache.flink.runtime.rest.messages.json.SerializedThrowableSerializer.FIELD_NAME_SERIALIZED_THROWABLE; /** * JSON deserializer for {@link SerializedThrowable}. @@ -48,10 +47,12 @@ public class SerializedThrowableDeserializer extends StdDeserializerhttp://git-wip-us.apache.org/repos/asf/flink/blob/970d94e8/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SerializedThrowableSerializer.java
[4/4] flink git commit: [FLINK-8792] [rest] Change MessageQueryParameter#convertValueFromString to convertStringToValue
[FLINK-8792] [rest] Change MessageQueryParameter#convertValueFromString to convertStringToValue Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e8d16850 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e8d16850 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e8d16850 Branch: refs/heads/master Commit: e8d16850948231070dbc3344c2b287d07e2647a7 Parents: 9a69035 Author: Till Rohrmann Authored: Wed Feb 28 10:11:44 2018 +0100 Committer: Till Rohrmann Committed: Wed Feb 28 13:27:06 2018 +0100 -- .../handlers/AllowNonRestoredStateQueryParameter.java | 2 +- .../runtime/webmonitor/handlers/ParallelismQueryParameter.java | 2 +- .../runtime/webmonitor/handlers/StringQueryParameter.java | 2 +- .../handlers/AllowNonRestoredStateQueryParameterTest.java | 6 +++--- .../webmonitor/handlers/ParallelismQueryParameterTest.java | 2 +- .../flink/runtime/rest/messages/MessageQueryParameter.java | 4 ++-- .../rest/messages/RescalingParallelismQueryParameter.java | 2 +- .../runtime/rest/messages/TerminationModeQueryParameter.java | 2 +- .../rest/messages/job/metrics/MetricsFilterParameter.java | 2 +- .../apache/flink/runtime/rest/RestServerEndpointITCase.java| 2 +- .../runtime/rest/handler/util/HandlerRequestUtilsTest.java | 2 +- .../flink/runtime/rest/messages/MessageParametersTest.java | 2 +- .../rest/messages/job/metrics/MetricsFilterParameterTest.java | 2 +- 13 files changed, 16 insertions(+), 16 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/e8d16850/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AllowNonRestoredStateQueryParameter.java -- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AllowNonRestoredStateQueryParameter.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AllowNonRestoredStateQueryParameter.java index 2ddde3a..19734d8 100644 --- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AllowNonRestoredStateQueryParameter.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AllowNonRestoredStateQueryParameter.java @@ -34,7 +34,7 @@ public class AllowNonRestoredStateQueryParameter extends MessageQueryParameterhttp://git-wip-us.apache.org/repos/asf/flink/blob/e8d16850/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ParallelismQueryParameter.java -- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ParallelismQueryParameter.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ParallelismQueryParameter.java index 2ade7eb..398bcb0 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ParallelismQueryParameter.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ParallelismQueryParameter.java @@ -33,7 +33,7 @@ public class ParallelismQueryParameter extends MessageQueryParameter { } @Override - public Integer convertValueFromString(final String value) { + public Integer convertStringToValue(final String value) { return Integer.valueOf(value); } http://git-wip-us.apache.org/repos/asf/flink/blob/e8d16850/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/StringQueryParameter.java -- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/StringQueryParameter.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/StringQueryParameter.java index 52c0967..67e83ff 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/StringQueryParameter.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/StringQueryParameter.java @@ -30,7 +30,7 @@ public abstract class StringQueryParameter extends MessageQueryParameter } @Override - 
public final String convertValueFromString(final String value) { + public final String convertStringToValue(final String value) { return value; } http://git-wip-us.apache.org/repos/asf/flink/blob/e8d16850/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/AllowNonRestoredStateQueryParameterTest.java -- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/AllowN
[2/4] flink git commit: [FLINK-8787][flip6] Do not copy flinkConfiguration in AbstractYarnClusterDescriptor
[FLINK-8787][flip6] Do not copy flinkConfiguration in AbstractYarnClusterDescriptor This closes #5591. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/71baa278 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/71baa278 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/71baa278 Branch: refs/heads/master Commit: 71baa2784edab8b851dabe15855a023c73b4fc1c Parents: f89fce8 Author: gyao Authored: Tue Feb 27 16:58:53 2018 +0100 Committer: Till Rohrmann Committed: Wed Feb 28 13:27:05 2018 +0100 -- .../java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/71baa278/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java -- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java index e6c36f6..6b93016 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java @@ -482,7 +482,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor flinkConfiguration.setString(ClusterEntrypoint.EXECUTION_MODE, executionMode.toString()); ApplicationReport report = startAppMaster( - new Configuration(flinkConfiguration), + flinkConfiguration, yarnClusterEntrypoint, jobGraph, yarnClient,
[4/9] flink git commit: [FLINK-8764] [quickstarts] Make quickstarts work out of the box for IDE and JAR packaging
[FLINK-8764] [quickstarts] Make quickstarts work out of the box for IDE and JAR packaging - All Flink and Scala dependencies are properly set to provided - That way, Maven JAR packaging behaves correctly by default - Eclipse adds 'provided' dependencies to the classpath when running programs, so works out of the box - There is a profile that automatically activates in IntelliJ that adds the necessary dependencies in 'compile' scope to make it run out of the box. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fa306385 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fa306385 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fa306385 Branch: refs/heads/release-1.4 Commit: fa306385fe190450251fd902a544d5d3fa2ca664 Parents: 6bd35a3 Author: Stephan Ewen Authored: Fri Feb 23 10:53:22 2018 +0100 Committer: Stephan Ewen Committed: Wed Feb 28 11:42:33 2018 +0100 -- .../main/resources/archetype-resources/pom.xml | 231 - .../main/resources/archetype-resources/pom.xml | 246 --- 2 files changed, 207 insertions(+), 270 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/fa306385/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml -- diff --git a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml index 84584af..6ac05b0 100644 --- a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml +++ b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml @@ -50,161 +50,53 @@ under the License. 
- - - - org.apache.flink - flink-core - ${flink.version} - + org.apache.flink flink-java ${flink.version} + provided - org.apache.flink - flink-clients_${scala.binary.version} + flink-streaming-java_${scala.binary.version} ${flink.version} + provided + + + + - + + org.slf4j slf4j-log4j12 ${slf4j.version} + runtime log4j log4j ${log4j.version} + runtime - - - - build-jar - - - false - - - - - org.apache.flink - flink-core - ${flink.version} - provided - - - org.apache.flink - flink-java - ${flink.version} - provided - - - org.apache.flink - flink-clients_${scala.binary.version} - ${flink.version} - provided - - - org.apache.flink - flink-streaming-java_${scala.binary.version} - ${flink.version} - provided - - - org.slf4j - slf4j-log4j12 - ${slf4j.version} - provided - - - log4j - log4j - ${log4j.version} -
[7/9] flink git commit: [FLINK-8767] [quickstarts] Set the maven.compiler.source and .target properties for Java Quickstart
[FLINK-8767] [quickstarts] Set the maven.compiler.source and .target properties for Java Quickstart Setting these properties helps properly pinning the Java version in IntelliJ. Without these properties, Java version keeps switching back to 1.5 in some setups. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4eb0a0fb Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4eb0a0fb Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4eb0a0fb Branch: refs/heads/release-1.4 Commit: 4eb0a0fb9f41b29f6b87ce77f5e985f2b9fa30a3 Parents: 23c37cf Author: Stephan Ewen Authored: Fri Feb 23 11:18:36 2018 +0100 Committer: Stephan Ewen Committed: Wed Feb 28 11:43:35 2018 +0100 -- .../src/main/resources/archetype-resources/pom.xml | 11 +++ 1 file changed, 7 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/4eb0a0fb/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml -- diff --git a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml index d53415a..0ca6eb9 100644 --- a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml +++ b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml @@ -31,7 +31,10 @@ under the License. UTF-8 @project.version@ + 1.8 2.11 + ${java.version} + ${java.version} @@ -100,8 +103,8 @@ under the License. maven-compiler-plugin 3.1 - 1.8 - 1.8 + ${java.version} + ${java.version} @@ -158,8 +161,8 @@ under the License. maven-compiler-plugin - 1.8 - 1.8 + ${java.version} + ${java.version} jdt
[5/9] flink git commit: [FLINK-8765] [quickstarts] Simplify quickstart properties
[FLINK-8765] [quickstarts] Simplify quickstart properties This does not pull out the slf4j and log4j version into properties any more, making the quickstarts a bit simpler. Given that both versions are used only once, and only for the feature to have convenience logging in the IDE, the versions might as well be defined directly in the dependencies. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/11e19a56 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/11e19a56 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/11e19a56 Branch: refs/heads/release-1.4 Commit: 11e19a56bfdedb4f41ecc3dcf1cd4120191cdb1a Parents: fa30638 Author: Stephan Ewen Authored: Fri Feb 23 11:10:43 2018 +0100 Committer: Stephan Ewen Committed: Wed Feb 28 11:43:01 2018 +0100 -- .../src/main/resources/archetype-resources/pom.xml | 6 ++ .../src/main/resources/archetype-resources/pom.xml | 6 ++ 2 files changed, 4 insertions(+), 8 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/11e19a56/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml -- diff --git a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml index 6ac05b0..50b0351 100644 --- a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml +++ b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml @@ -31,8 +31,6 @@ under the License. UTF-8 @project.version@ - @slf4j.version@ - @log4j.version@ @scala.binary.version@ @@ -82,13 +80,13 @@ under the License. 
org.slf4j slf4j-log4j12 - ${slf4j.version} + 1.7.7 runtime log4j log4j - ${log4j.version} + 1.2.17 runtime http://git-wip-us.apache.org/repos/asf/flink/blob/11e19a56/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml -- diff --git a/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml b/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml index b318a19..4a866c3 100644 --- a/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml +++ b/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml @@ -46,8 +46,6 @@ under the License. UTF-8 @project.version@ - @slf4j.version@ - @log4j.version@ 2.11 2.11.11 @@ -92,13 +90,13 @@ under the License. org.slf4j slf4j-log4j12 - ${slf4j.version} + 1.7.7 runtime log4j log4j - ${log4j.version} + 1.2.17 runtime
[9/9] flink git commit: [FLINK-8791] [docs] Fix documentation about configuring dependencies
[FLINK-8791] [docs] Fix documentation about configuring dependencies Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/51231c47 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/51231c47 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/51231c47 Branch: refs/heads/release-1.4 Commit: 51231c471bca5e400e1bf152d3cfbdb68c809567 Parents: 0c848aa Author: Stephan Ewen Authored: Mon Feb 26 16:41:24 2018 +0100 Committer: Stephan Ewen Committed: Wed Feb 28 11:44:49 2018 +0100 -- docs/dev/linking.md | 96 docs/dev/linking_with_flink.md | 146 --- docs/redirects/linking_with_flink.md| 25 ++ docs/redirects/linking_with_optional_modules.md | 25 ++ docs/start/dependencies.md | 244 +++ 5 files changed, 294 insertions(+), 242 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/51231c47/docs/dev/linking.md -- diff --git a/docs/dev/linking.md b/docs/dev/linking.md deleted file mode 100644 index 78ef544..000 --- a/docs/dev/linking.md +++ /dev/null @@ -1,96 +0,0 @@ -nav-title: "Linking with Optional Modules" -title: "Linking with modules not contained in the binary distribution" -nav-parent_id: start -nav-pos: 10 - - -The binary distribution contains jar packages in the `lib` folder that are automatically -provided to the classpath of your distributed programs. Almost all of Flink classes are -located there with a few exceptions, for example the streaming connectors and some freshly -added modules. To run code depending on these modules you need to make them accessible -during runtime, for which we suggest two options: - -1. Either copy the required jar files to the `lib` folder onto all of your TaskManagers. -Note that you have to restart your TaskManagers after this. -2. Or package them with your code. - -The latter version is recommended as it respects the classloader management in Flink. 
- -### Packaging dependencies with your usercode with Maven - -To provide these dependencies not included by Flink we suggest two options with Maven. - -1. The maven assembly plugin builds a so-called uber-jar (executable jar) containing all your dependencies. -The assembly configuration is straight-forward, but the resulting jar might become bulky. -See [maven-assembly-plugin](http://maven.apache.org/plugins/maven-assembly-plugin/usage.html) for further information. -2. The maven unpack plugin unpacks the relevant parts of the dependencies and -then packages it with your code. - -Using the latter approach in order to bundle the Kafka connector, `flink-connector-kafka` -you would need to add the classes from both the connector and the Kafka API itself. Add -the following to your plugins section. - -~~~xml - -org.apache.maven.plugins -maven-dependency-plugin -2.9 - - -unpack - -prepare-package - -unpack - - - - - -org.apache.flink -flink-connector-kafka -{{ site.version }} -jar -false - ${project.build.directory}/classes -org/apache/flink/** - - - -org.apache.kafka -kafka_ - -jar -false - ${project.build.directory}/classes -kafka/** - - - - - - -~~~ - -Now when running `mvn clean package` the produced jar includes the required dependencies. - -{% top %} http://git-wip-us.apache.org/repos/asf/flink/blob/51231c47/docs/dev/linking_with_flink.md -- diff --git a/docs/dev/linking_with_flink.md b/docs/dev/linking_with_flink.md deleted file mode 100644 index f2380b2..000 --- a/docs/dev/linking_with_flink.md +++ /dev/null @@ -1,146 +0,0 @@ -title: "Linking with Flink" -nav-parent_id: start -nav-pos: 2 - - -To write programs with Flink, you need to include the Flink library corresponding to -your programming language in your project. - -The simplest way to do this is to use one of the quickstart scripts: either for -[Java]({{ site.baseurl }}/quickstart/java_api_quickstart.html) or for [Scala]({{ site.baseurl }}/quickstart/scala_api_quickstart.html). 
They -create a blank project from a template (a Maven Archetyp
[8/9] flink git commit: [FLINK-8764] [docs] Adjust quickstart documentation
[FLINK-8764] [docs] Adjust quickstart documentation Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0c848aa9 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0c848aa9 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0c848aa9 Branch: refs/heads/release-1.4 Commit: 0c848aa98b466764873f2372af3ffd64cd8f71ac Parents: 4eb0a0f Author: Stephan Ewen Authored: Mon Feb 26 12:19:00 2018 +0100 Committer: Stephan Ewen Committed: Wed Feb 28 11:43:58 2018 +0100 -- docs/quickstart/java_api_quickstart.md | 119 ++- docs/quickstart/scala_api_quickstart.md | 95 ++--- 2 files changed, 52 insertions(+), 162 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/0c848aa9/docs/quickstart/java_api_quickstart.md -- diff --git a/docs/quickstart/java_api_quickstart.md b/docs/quickstart/java_api_quickstart.md index baf14de..9a32591 100644 --- a/docs/quickstart/java_api_quickstart.md +++ b/docs/quickstart/java_api_quickstart.md @@ -1,6 +1,6 @@ --- -title: "Sample Project using the Java API" -nav-title: Sample Project in Java +title: "Project Template for Java" +nav-title: Project Template for Java nav-parent_id: start nav-pos: 0 --- @@ -86,120 +86,51 @@ quickstart/ │   └── myorg │   └── quickstart │   ├── BatchJob.java -│   ├── SocketTextStreamWordCount.java -│   ├── StreamingJob.java -│   └── WordCount.java +│   └── StreamingJob.java └── resources └── log4j.properties {% endhighlight %} -The sample project is a __Maven project__, which contains four classes. _StreamingJob_ and _BatchJob_ are basic skeleton programs, _SocketTextStreamWordCount_ is a working streaming example and _WordCountJob_ is a working batch example. Please note that the _main_ method of all classes allow you to start Flink in a development/testing mode. +The sample project is a __Maven project__, which contains two classes: _StreamingJob_ and _BatchJob_ are the basic skeleton programs for a *DataStream* and *DataSet* program. 
+The _main_ method is the entry point of the program, both for in-IDE testing/execution and for proper deployments. We recommend you __import this project into your IDE__ to develop and -test it. If you use Eclipse, the [m2e plugin](http://www.eclipse.org/m2e/) +test it. IntelliJ IDEA supports Maven projects out of the box. +If you use Eclipse, the [m2e plugin](http://www.eclipse.org/m2e/) allows to [import Maven projects](http://books.sonatype.com/m2eclipse-book/reference/creating-sect-importing-projects.html#fig-creating-import). Some Eclipse bundles include that plugin by default, others require you -to install it manually. The IntelliJ IDE supports Maven projects out of -the box. +to install it manually. - -*A note to Mac OS X users*: The default JVM heapsize for Java is too +*A note to Mac OS X users*: The default JVM heapsize for Java may be too small for Flink. You have to manually increase it. In Eclipse, choose `Run Configurations -> Arguments` and write into the `VM Arguments` box: `-Xmx800m`. ## Build Project -If you want to __build your project__, go to your project directory and -issue the `mvn clean install -Pbuild-jar` command. You will -__find a jar__ that runs on every Flink cluster with a compatible -version, __target/original-your-artifact-id-your-version.jar__. There -is also a fat-jar in __target/your-artifact-id-your-version.jar__ which, -additionally, contains all dependencies that were added to the Maven -project. +If you want to __build/package your project__, go to your project directory and +run the '`mvn clean package`' command. +You will __find a JAR file__ that contains your application, plus connectors and libraries +that you may have added as dependencies to the application: `target/<artifact-id>-<version>.jar`. + +__Note:__ If you use a different class than *StreamingJob* as the application's main class / entry point, +we recommend you change the `mainClass` setting in the `pom.xml` file accordingly. 
That way, Flink +can run the application from the JAR file without additionally specifying the main class. ## Next Steps Write your application! -The quickstart project contains a `WordCount` implementation, the -"Hello World" of Big Data processing systems. The goal of `WordCount` -is to determine the frequencies of words in a text, e.g., how often do -the terms "the" or "house" occur in all Wikipedia texts. - -__Sample Input__: - -~~~bash -big data is big -~~~ - -__Sample Output__: - -~~~bash -big 2 -data 1 -
[2/9] flink git commit: [hotfix] [quickstarts] Fix header and package declaration order.
[hotfix] [quickstarts] Fix header and package declaration order. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4a60fcb0 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4a60fcb0 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4a60fcb0 Branch: refs/heads/release-1.4 Commit: 4a60fcb0f41e96221108a43fc7037e08ea047758 Parents: 179cfdd Author: Stephan Ewen Authored: Wed Feb 21 20:30:35 2018 +0100 Committer: Stephan Ewen Committed: Wed Feb 28 11:35:48 2018 +0100 -- .../resources/archetype-resources/src/main/java/BatchJob.java | 6 +++--- .../archetype-resources/src/main/java/StreamingJob.java| 6 +++--- .../archetype-resources/src/main/scala/BatchJob.scala | 6 +++--- .../archetype-resources/src/main/scala/StreamingJob.scala | 6 +++--- 4 files changed, 12 insertions(+), 12 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/4a60fcb0/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/main/java/BatchJob.java -- diff --git a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/main/java/BatchJob.java b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/main/java/BatchJob.java index 9711924..9515791 100644 --- a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/main/java/BatchJob.java +++ b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/main/java/BatchJob.java @@ -1,6 +1,4 @@ -package ${package}; - -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -18,6 +16,8 @@ package ${package}; * limitations under the License. 
*/ +package ${package}; + import org.apache.flink.api.java.ExecutionEnvironment; /** http://git-wip-us.apache.org/repos/asf/flink/blob/4a60fcb0/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/main/java/StreamingJob.java -- diff --git a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/main/java/StreamingJob.java b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/main/java/StreamingJob.java index 6027e75..4091889 100644 --- a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/main/java/StreamingJob.java +++ b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/main/java/StreamingJob.java @@ -1,6 +1,4 @@ -package ${package}; - -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -18,6 +16,8 @@ package ${package}; * limitations under the License. */ +package ${package}; + import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; /** http://git-wip-us.apache.org/repos/asf/flink/blob/4a60fcb0/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/src/main/scala/BatchJob.scala -- diff --git a/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/src/main/scala/BatchJob.scala b/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/src/main/scala/BatchJob.scala index a533da9..46520b7 100644 --- a/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/src/main/scala/BatchJob.scala +++ b/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/src/main/scala/BatchJob.scala @@ -1,6 +1,4 @@ -package ${package} - -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -18,6 +16,8 @@ package ${package} * limitations under the License. */ +package ${package} + import org.apache.flink.api.scala._ /** http://git-wip-us.apache.org/repos/asf/flink/blob/4a60fcb0/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/src/main/scala/StreamingJob.scala -- diff --git a/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/src/main/scala/StreamingJob.scala b/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/src/main/scala/StreamingJob.scala index 7c950b1..20115a7 100644 --- a/flink-quickstart/fli
[3/9] flink git commit: [hotfix] [quickstarts] Fix block comments in program stubs.
[hotfix] [quickstarts] Fix block comments in program stubs. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6bd35a3c Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6bd35a3c Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6bd35a3c Branch: refs/heads/release-1.4 Commit: 6bd35a3c537eaa54c55312a6b3703208582b9229 Parents: 4a60fcb Author: Stephan Ewen Authored: Wed Feb 21 21:05:23 2018 +0100 Committer: Stephan Ewen Committed: Wed Feb 28 11:36:08 2018 +0100 -- .../main/resources/archetype-resources/src/main/java/BatchJob.java | 2 +- .../resources/archetype-resources/src/main/java/StreamingJob.java | 2 +- .../resources/archetype-resources/src/main/scala/BatchJob.scala| 2 +- .../archetype-resources/src/main/scala/StreamingJob.scala | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/6bd35a3c/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/main/java/BatchJob.java -- diff --git a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/main/java/BatchJob.java b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/main/java/BatchJob.java index 9515791..db2ee60 100644 --- a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/main/java/BatchJob.java +++ b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/main/java/BatchJob.java @@ -36,7 +36,7 @@ public class BatchJob { // set up the batch execution environment final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - /** + /* * Here, you can start creating your execution plan for Flink. 
* * Start with getting some data from the environment, like http://git-wip-us.apache.org/repos/asf/flink/blob/6bd35a3c/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/main/java/StreamingJob.java -- diff --git a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/main/java/StreamingJob.java b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/main/java/StreamingJob.java index 4091889..5bcee21 100644 --- a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/main/java/StreamingJob.java +++ b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/main/java/StreamingJob.java @@ -38,7 +38,7 @@ public class StreamingJob { // set up the streaming execution environment final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - /** + /* * Here, you can start creating your execution plan for Flink. * * Start with getting some data from the environment, like http://git-wip-us.apache.org/repos/asf/flink/blob/6bd35a3c/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/src/main/scala/BatchJob.scala -- diff --git a/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/src/main/scala/BatchJob.scala b/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/src/main/scala/BatchJob.scala index 46520b7..329e6c2 100644 --- a/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/src/main/scala/BatchJob.scala +++ b/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/src/main/scala/BatchJob.scala @@ -36,7 +36,7 @@ object BatchJob { // set up the batch execution environment val env = ExecutionEnvironment.getExecutionEnvironment -/** +/* * Here, you can start creating your execution plan for Flink. 
* * Start with getting some data from the environment, like http://git-wip-us.apache.org/repos/asf/flink/blob/6bd35a3c/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/src/main/scala/StreamingJob.scala -- diff --git a/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/src/main/scala/StreamingJob.scala b/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/src/main/scala/StreamingJob.scala index 20115a7..bab0bb9 100644 --- a/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/src
[6/9] flink git commit: [FLINK-8766] [quickstarts] Pin scala runtime version for Java Quickstart
[FLINK-8766] [quickstarts] Pin scala runtime version for Java Quickstart Followup to FLINK-7414, which pinned the scala version for the Scala Quickstart Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/23c37cf4 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/23c37cf4 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/23c37cf4 Branch: refs/heads/release-1.4 Commit: 23c37cf43cac94ecaa43641d264f7422bc1c57ed Parents: 11e19a5 Author: Stephan Ewen Authored: Fri Feb 23 11:13:01 2018 +0100 Committer: Stephan Ewen Committed: Wed Feb 28 11:43:19 2018 +0100 -- .../src/main/resources/archetype-resources/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/23c37cf4/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml -- diff --git a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml index 50b0351..d53415a 100644 --- a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml +++ b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml @@ -31,7 +31,7 @@ under the License. UTF-8 @project.version@ - @scala.binary.version@ + 2.11
[1/9] flink git commit: [FLINK-8762] [quickstarts] Make 'StreamingJob' the default main class and remove WordCount example from the quickstart.
Repository: flink Updated Branches: refs/heads/release-1.4 392cfaaed -> 51231c471 [FLINK-8762] [quickstarts] Make 'StreamingJob' the default main class and remove WordCount example from the quickstart. The packaged example jobs have been reported to not be terribly helpful and simply create noise in the initial project setup. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/179cfdd4 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/179cfdd4 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/179cfdd4 Branch: refs/heads/release-1.4 Commit: 179cfdd4c6fd5f38ab01c2a55b17f251cc173dbc Parents: 392cfaa Author: Stephan Ewen Authored: Wed Feb 21 20:20:51 2018 +0100 Committer: Stephan Ewen Committed: Wed Feb 28 11:35:33 2018 +0100 -- .../main/resources/archetype-resources/pom.xml | 6 +- .../src/main/java/BatchJob.java | 20 +--- .../main/java/SocketTextStreamWordCount.java| 108 --- .../src/main/java/StreamingJob.java | 21 ++-- .../src/main/java/WordCount.java| 94 .../main/resources/archetype-resources/pom.xml | 6 +- .../src/main/scala/BatchJob.scala | 25 ++--- .../main/scala/SocketTextStreamWordCount.scala | 69 .../src/main/scala/StreamingJob.scala | 24 ++--- .../src/main/scala/WordCount.scala | 53 - 10 files changed, 27 insertions(+), 399 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/179cfdd4/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml -- diff --git a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml index 71bc0a1..84584af 100644 --- a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml +++ b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml @@ -189,15 +189,11 @@ under the License. 
- - http://git-wip-us.apache.org/repos/asf/flink/blob/179cfdd4/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/main/java/BatchJob.java -- diff --git a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/main/java/BatchJob.java b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/main/java/BatchJob.java index d0e68a4..9711924 100644 --- a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/main/java/BatchJob.java +++ b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/main/java/BatchJob.java @@ -23,22 +23,12 @@ import org.apache.flink.api.java.ExecutionEnvironment; /** * Skeleton for a Flink Batch Job. * - * For a full example of a Flink Batch Job, see the WordCountJob.java file in the - * same package/directory or have a look at the website. + * For a tutorial on how to write a Flink batch application, check the + * tutorials and examples on the <a href="http://flink.apache.org/docs/stable/">Flink Website</a>. * - * You can also generate a .jar file that you can submit on your Flink - * cluster. - * Just type - * mvn clean package - * in the projects root directory. - * You will find the jar in - * target/${artifactId}-${version}.jar - * From the CLI you can then run - * ./bin/flink run -c ${package}.BatchJob target/${artifactId}-${version}.jar - * - * For more information on the CLI see: - * - * http://flink.apache.org/docs/latest/apis/cli.html + * To package your application into a JAR file for execution, + * change the main class in the POM.xml file to this class (simply search for 'mainClass') + * and run 'mvn clean package' on the command line. */ public class BatchJob { http://git-wip-us.apache.org/repos/asf/flink/blob/179cfdd4/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/main/java/SocketTextStreamWordCount.java --
svn commit: r25326 - in /release/flink/flink-shaded-3.0: ./ flink-shaded-3.0-src.tgz flink-shaded-3.0-src.tgz.asc flink-shaded-3.0-src.tgz.md5 flink-shaded-3.0-src.tgz.sha
Author: chesnay Date: Wed Feb 28 10:34:11 2018 New Revision: 25326 Log: Add flink-shaded 3.0 release files Added: release/flink/flink-shaded-3.0/ release/flink/flink-shaded-3.0/flink-shaded-3.0-src.tgz (with props) release/flink/flink-shaded-3.0/flink-shaded-3.0-src.tgz.asc release/flink/flink-shaded-3.0/flink-shaded-3.0-src.tgz.md5 release/flink/flink-shaded-3.0/flink-shaded-3.0-src.tgz.sha Added: release/flink/flink-shaded-3.0/flink-shaded-3.0-src.tgz == Binary file - no diff available. Propchange: release/flink/flink-shaded-3.0/flink-shaded-3.0-src.tgz -- svn:mime-type = application/octet-stream Added: release/flink/flink-shaded-3.0/flink-shaded-3.0-src.tgz.asc == --- release/flink/flink-shaded-3.0/flink-shaded-3.0-src.tgz.asc (added) +++ release/flink/flink-shaded-3.0/flink-shaded-3.0-src.tgz.asc Wed Feb 28 10:34:11 2018 @@ -0,0 +1,17 @@ +-BEGIN PGP SIGNATURE- +Version: GnuPG v1 + +iQIcBAABAgAGBQJaiswJAAoJEMLu17ER1GS6quIQAJIIfjpZ1QZAq9JasaVxDvy0 +u8P7kRFPd8YZiy5ffdqc5mtZzBrKyM4m8CqFalFZsCxyIQHJL2orsMkdseUG798M +QT3zFZI5kwACq3TahVfULWSGTWbu0lhv4Hfr4QhQwqd4Sd1SgLJHcE3oYxZZlX9h +fmyDkbXmwvehgS8Wl27wXBjpybHjE6454Q1ajiFm7y6uLqcl+1SyRGEn0+4S2koL +AT5znvFatoLLEdouOLhy8iP2Q9ewYNVJTmskgRmOOh7Hx0fwleZt9Tdid31XRvHB +14o84ZU6dkA6wUhnl2xzoGezqXaNmDGOgH8zP0yHdFyQBUwImlpIv6h+5gC6JX/j +eqjjtjeLZuVT56KXGYj3uFQQf11iwxou0LzOqvn6pN5SK77LXwRHAcnlNRXNVEDh +Di/37Xoa8MZ5F1og/5ResFFgNXer7n2hNYnqCEQW28oIQOBRKQcpkKnDJjLYJk1L +rSJ9Q+259nzE8ly0NxVOfpeiC1fWtcROnmNNdRruQ7pWaDdl0yNVrswvFpXWCDDO +gyBgs5jdIPwsrKDtbDWA1zOzvWMcKXD3MSWa7OjeGuA/uja27cF3gN80/w4Gyiya +VdzhTuJTczfevkPb81r4u851WzeDO9g7l2PhUpbjds2OMnEt6LMJFEyw5OnsHl8w +qcWp3rApSoV0jVUhY4ml +=KwVl +-END PGP SIGNATURE- Added: release/flink/flink-shaded-3.0/flink-shaded-3.0-src.tgz.md5 == --- release/flink/flink-shaded-3.0/flink-shaded-3.0-src.tgz.md5 (added) +++ release/flink/flink-shaded-3.0/flink-shaded-3.0-src.tgz.md5 Wed Feb 28 10:34:11 2018 @@ -0,0 +1 @@ +88b20a832fc947a002e7c0740e0a9e9e flink-shaded-3.0-src.tgz Added: 
release/flink/flink-shaded-3.0/flink-shaded-3.0-src.tgz.sha == --- release/flink/flink-shaded-3.0/flink-shaded-3.0-src.tgz.sha (added) +++ release/flink/flink-shaded-3.0/flink-shaded-3.0-src.tgz.sha Wed Feb 28 10:34:11 2018 @@ -0,0 +1 @@ +982a14a4952814f08691105fe0e1567542511ab421af7a72c78452e53571313dbc6fc456ba31d0f23abee0118823d463db851a33cf8cd2510b3611dba6815b9d flink-shaded-3.0-src.tgz
flink git commit: [FLINK-8791] [docs] Fix documentation about configuring dependencies
Repository: flink Updated Branches: refs/heads/master d9f2f2f83 -> f89fce83e [FLINK-8791] [docs] Fix documentation about configuring dependencies This closes #5586 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f89fce83 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f89fce83 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f89fce83 Branch: refs/heads/master Commit: f89fce83eaadddcc3aa8dee2167188a2b53b0c8e Parents: d9f2f2f Author: Stephan Ewen Authored: Mon Feb 26 16:41:24 2018 +0100 Committer: Stephan Ewen Committed: Wed Feb 28 11:26:03 2018 +0100 -- docs/dev/linking.md | 96 docs/dev/linking_with_flink.md | 146 --- docs/redirects/linking_with_flink.md| 25 ++ docs/redirects/linking_with_optional_modules.md | 25 ++ docs/start/dependencies.md | 244 +++ 5 files changed, 294 insertions(+), 242 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/f89fce83/docs/dev/linking.md -- diff --git a/docs/dev/linking.md b/docs/dev/linking.md deleted file mode 100644 index 78ef544..000 --- a/docs/dev/linking.md +++ /dev/null @@ -1,96 +0,0 @@ -nav-title: "Linking with Optional Modules" -title: "Linking with modules not contained in the binary distribution" -nav-parent_id: start -nav-pos: 10 - - -The binary distribution contains jar packages in the `lib` folder that are automatically -provided to the classpath of your distributed programs. Almost all of Flink classes are -located there with a few exceptions, for example the streaming connectors and some freshly -added modules. To run code depending on these modules you need to make them accessible -during runtime, for which we suggest two options: - -1. Either copy the required jar files to the `lib` folder onto all of your TaskManagers. -Note that you have to restart your TaskManagers after this. -2. Or package them with your code. - -The latter version is recommended as it respects the classloader management in Flink. 
- -### Packaging dependencies with your usercode with Maven - -To provide these dependencies not included by Flink we suggest two options with Maven. - -1. The maven assembly plugin builds a so-called uber-jar (executable jar) containing all your dependencies. -The assembly configuration is straight-forward, but the resulting jar might become bulky. -See [maven-assembly-plugin](http://maven.apache.org/plugins/maven-assembly-plugin/usage.html) for further information. -2. The maven unpack plugin unpacks the relevant parts of the dependencies and -then packages it with your code. - -Using the latter approach in order to bundle the Kafka connector, `flink-connector-kafka` -you would need to add the classes from both the connector and the Kafka API itself. Add -the following to your plugins section. - -~~~xml - -org.apache.maven.plugins -maven-dependency-plugin -2.9 - - -unpack - -prepare-package - -unpack - - - - - -org.apache.flink -flink-connector-kafka -{{ site.version }} -jar -false - ${project.build.directory}/classes -org/apache/flink/** - - - -org.apache.kafka -kafka_ - -jar -false - ${project.build.directory}/classes -kafka/** - - - - - - -~~~ - -Now when running `mvn clean package` the produced jar includes the required dependencies. - -{% top %} http://git-wip-us.apache.org/repos/asf/flink/blob/f89fce83/docs/dev/linking_with_flink.md -- diff --git a/docs/dev/linking_with_flink.md b/docs/dev/linking_with_flink.md deleted file mode 100644 index f2380b2..000 --- a/docs/dev/linking_with_flink.md +++ /dev/null @@ -1,146 +0,0 @@ -title: "Linking with Flink" -nav-parent_id: start -nav-pos: 2 - - -To write programs with Flink, you need to include the Flink library corresponding to -your programming language in your project. - -The simplest way to do this is to use one of the quickstart scripts: either for -[Java]({{ site.baseurl }}/quickstart/java_api_quickstart.html) or for [Scala]({{ site.baseurl }}/quicks
flink git commit: [FLINK-8791] [docs] Fix documentation about configuring dependencies
Repository: flink Updated Branches: refs/heads/release-1.5 db2c510fb -> 23358ff87 [FLINK-8791] [docs] Fix documentation about configuring dependencies Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/23358ff8 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/23358ff8 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/23358ff8 Branch: refs/heads/release-1.5 Commit: 23358ff87003fd6603c0ca19bc37f31944d2c494 Parents: db2c510 Author: Stephan Ewen Authored: Mon Feb 26 16:41:24 2018 +0100 Committer: Stephan Ewen Committed: Wed Feb 28 11:17:02 2018 +0100 -- docs/dev/linking.md | 96 docs/dev/linking_with_flink.md | 146 --- docs/redirects/linking_with_flink.md| 25 ++ docs/redirects/linking_with_optional_modules.md | 25 ++ docs/start/dependencies.md | 244 +++ 5 files changed, 294 insertions(+), 242 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/23358ff8/docs/dev/linking.md -- diff --git a/docs/dev/linking.md b/docs/dev/linking.md deleted file mode 100644 index 78ef544..000 --- a/docs/dev/linking.md +++ /dev/null @@ -1,96 +0,0 @@ -nav-title: "Linking with Optional Modules" -title: "Linking with modules not contained in the binary distribution" -nav-parent_id: start -nav-pos: 10 - - -The binary distribution contains jar packages in the `lib` folder that are automatically -provided to the classpath of your distributed programs. Almost all of Flink classes are -located there with a few exceptions, for example the streaming connectors and some freshly -added modules. To run code depending on these modules you need to make them accessible -during runtime, for which we suggest two options: - -1. Either copy the required jar files to the `lib` folder onto all of your TaskManagers. -Note that you have to restart your TaskManagers after this. -2. Or package them with your code. - -The latter version is recommended as it respects the classloader management in Flink. 
- -### Packaging dependencies with your usercode with Maven - -To provide these dependencies not included by Flink we suggest two options with Maven. - -1. The maven assembly plugin builds a so-called uber-jar (executable jar) containing all your dependencies. -The assembly configuration is straight-forward, but the resulting jar might become bulky. -See [maven-assembly-plugin](http://maven.apache.org/plugins/maven-assembly-plugin/usage.html) for further information. -2. The maven unpack plugin unpacks the relevant parts of the dependencies and -then packages it with your code. - -Using the latter approach in order to bundle the Kafka connector, `flink-connector-kafka` -you would need to add the classes from both the connector and the Kafka API itself. Add -the following to your plugins section. - -~~~xml - -org.apache.maven.plugins -maven-dependency-plugin -2.9 - - -unpack - -prepare-package - -unpack - - - - - -org.apache.flink -flink-connector-kafka -{{ site.version }} -jar -false - ${project.build.directory}/classes -org/apache/flink/** - - - -org.apache.kafka -kafka_ - -jar -false - ${project.build.directory}/classes -kafka/** - - - - - - -~~~ - -Now when running `mvn clean package` the produced jar includes the required dependencies. - -{% top %} http://git-wip-us.apache.org/repos/asf/flink/blob/23358ff8/docs/dev/linking_with_flink.md -- diff --git a/docs/dev/linking_with_flink.md b/docs/dev/linking_with_flink.md deleted file mode 100644 index f2380b2..000 --- a/docs/dev/linking_with_flink.md +++ /dev/null @@ -1,146 +0,0 @@ -title: "Linking with Flink" -nav-parent_id: start -nav-pos: 2 - - -To write programs with Flink, you need to include the Flink library corresponding to -your programming language in your project. - -The simplest way to do this is to use one of the quickstart scripts: either for -[Java]({{ site.baseurl }}/quickstart/java_api_quickstart.html) or for [Scala]({{ site.baseurl }}/quickstart/scal
[flink-shaded] tag 3.0 created (now fd0f8cc)
This is an automated email from the ASF dual-hosted git repository. chesnay pushed a change to tag 3.0 in repository https://gitbox.apache.org/repos/asf/flink-shaded.git. at fd0f8cc (commit) No new revisions were added by this update. -- To stop receiving notification emails like this one, please contact ches...@apache.org.