Repository: flink
Updated Branches:
  refs/heads/master fdfce98ab -> 0113ee2b3
[FLINK-9243][tests] fix flaky SuccessAfterNetworkBuffersFailureITCase

This closes #5915.

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0113ee2b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0113ee2b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0113ee2b

Branch: refs/heads/master
Commit: 0113ee2b33e3f86bb6a3de2ecf60d8e7bf554be2
Parents: fdfce98
Author: Nico Kruber <n...@data-artisans.com>
Authored: Wed Apr 25 20:27:43 2018 +0200
Committer: zentol <ches...@apache.org>
Committed: Wed Apr 25 23:52:24 2018 +0200

----------------------------------------------------------------------
 .../test/misc/SuccessAfterNetworkBuffersFailureITCase.java | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/0113ee2b/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
index 16159c1..c5c1882 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
@@ -134,13 +134,17 @@ public class SuccessAfterNetworkBuffersFailureITCase extends TestLogger {
         // set number of bulk iterations for KMeans algorithm
         IterativeDataSet<KMeans.Centroid> loop = centroids.iterate(20);
 
+        // add some re-partitions to increase network buffer use
         DataSet<KMeans.Centroid> newCentroids = points
+            .rebalance()
             // compute closest centroid for each point
             .map(new KMeans.SelectNearestCenter()).withBroadcastSet(loop, "centroids")
-            // count and sum point coordinates for each centroid
+            .rebalance()
+            // count and sum point coordinates for each centroid
             .map(new KMeans.CountAppender())
             .groupBy(0).reduce(new KMeans.CentroidAccumulator())
-            // compute new centroids from point counts and coordinate sums
+            // compute new centroids from point counts and coordinate sums
+            .rebalance()
             .map(new KMeans.CentroidAverager());
 
         // feed new centroids back into next iteration