[GitHub] flink pull request #5697: [FLINK-8704][tests] Port ScheduleOrUpdateConsumers...

2018-03-14 Thread zentol
GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/5697

[FLINK-8704][tests] Port ScheduleOrUpdateConsumersTest

## What is the purpose of the change

This PR ports the `ScheduleOrUpdateConsumersTest` to flip6. The existing 
test was renamed to `LegacyScheduleOrUpdateConsumersTest`, and a ported copy 
was added.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 8704_schedule

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5697.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5697


commit d7f42bf65177bb57f8159f7866397f4e55c5d9f0
Author: zentol 
Date:   2018-03-12T12:21:01Z

[FLINK-8704][tests] Port ScheduleOrUpdateConsumersTest




---


[GitHub] flink pull request #5697: [FLINK-8704][tests] Port ScheduleOrUpdateConsumers...

2018-03-28 Thread tillrohrmann
GitHub user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5697#discussion_r177795751
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/LegacyScheduleOrUpdateConsumersTest.java ---
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.testingUtils.TestingCluster;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.apache.flink.runtime.jobmanager.SlotCountExceedingParallelismTest.SubtaskIndexReceiver.CONFIG_KEY;
+
+public class LegacyScheduleOrUpdateConsumersTest extends TestLogger {
+
+   private static final int NUMBER_OF_TMS = 2;
+   private static final int NUMBER_OF_SLOTS_PER_TM = 2;
+   private static final int PARALLELISM = NUMBER_OF_TMS * NUMBER_OF_SLOTS_PER_TM;
+
+   private static TestingCluster flink;
+
+   @BeforeClass
+   public static void setUp() throws Exception {
+       flink = TestingUtils.startTestingCluster(
+           NUMBER_OF_SLOTS_PER_TM,
+           NUMBER_OF_TMS,
+           TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
+   }
+
+   @AfterClass
+   public static void tearDown() throws Exception {
+       flink.stop();
+   }
+
+   /**
+    * Tests notifications of multiple receivers when a task produces both a pipelined and blocking
+    * result.
+    *
+    * <pre>
+    *                             +----------+
+    *            +-- pipelined -> | Receiver |
+    * +--------+ |                +----------+
+    * | Sender |-|
+    * +--------+ |                +----------+
+    *            +-- blocking --> | Receiver |
+    *                             +----------+
+    * </pre>
+    *
+    * The pipelined receiver gets deployed after the first buffer is available and the blocking
+    * one after all subtasks are finished.
+    */
+   @Test
+   public void testMixedPipelinedAndBlockingResults() throws Exception {
+       final JobVertex sender = new JobVertex("Sender");
+       sender.setInvokableClass(BinaryRoundRobinSubtaskIndexSender.class);
+       sender.getConfiguration().setInteger(BinaryRoundRobinSubtaskIndexSender.CONFIG_KEY, PARALLELISM);
+       sender.setParallelism(PARALLELISM);
+
+       final JobVertex pipelinedReceiver = new JobVertex("Pipelined Receiver");
+       pipelinedReceiver.setInvokableClass(SlotCountExceedingParallelismTest.SubtaskIndexReceiver.class);
+       pipelinedReceiver.getConfiguration().setInteger(CONFIG_KEY, PARALLELISM);
+       pipelinedReceiver.setParallelism(PARALLELISM);
+
+       pipelinedReceiver.connectNewDataSetAsInput(
+           sender,
+           DistributionPattern.ALL_TO_ALL,
+           ResultPartitionType.PIPELINED);
+
+       final JobVertex blockingReceiver = new JobVertex("Blocking Receiver");
+       blockingReceiver.setInvokableClass(SlotCountExceedingPar
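
The javadoc in the quoted test states the rule under test: the pipelined receiver is deployed once the first buffer is available, and the blocking receiver only after all sender subtasks have finished. A minimal plain-Java sketch of that rule follows. It does not use any Flink classes; `ConsumerSchedulingSketch`, `Kind`, `Consumer`, `Producer` and `simulate` are hypothetical names made up for illustration, and the actual scheduler is considerably more involved.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative model only; none of these types exist in Flink. */
public class ConsumerSchedulingSketch {

    /** Hypothetical stand-in for the two result partition types used in the test. */
    enum Kind { PIPELINED, BLOCKING }

    /** Hypothetical consumer vertex that reads one kind of result. */
    static final class Consumer {
        final String name;
        final Kind kind;
        boolean deployed;

        Consumer(String name, Kind kind) {
            this.name = name;
            this.kind = kind;
        }
    }

    /** Hypothetical producer that tracks how many of its subtasks have finished. */
    static final class Producer {
        private final int parallelism;
        private final List<Consumer> consumers = new ArrayList<>();
        private final List<String> deploymentOrder = new ArrayList<>();
        private int finishedSubtasks;

        Producer(int parallelism) {
            this.parallelism = parallelism;
        }

        void connect(Consumer consumer) {
            consumers.add(consumer);
        }

        /** First buffer of a pipelined result is available: pipelined consumers can start. */
        void onFirstBuffer() {
            deploy(Kind.PIPELINED);
        }

        /** Blocking consumers start only once every producer subtask has finished. */
        void onSubtaskFinished() {
            finishedSubtasks++;
            if (finishedSubtasks == parallelism) {
                deploy(Kind.BLOCKING);
            }
        }

        private void deploy(Kind kind) {
            for (Consumer consumer : consumers) {
                if (consumer.kind == kind && !consumer.deployed) {
                    consumer.deployed = true;
                    deploymentOrder.add(consumer.name);
                }
            }
        }
    }

    /** Runs one sender with the given parallelism and returns the consumer deployment order. */
    public static List<String> simulate(int parallelism) {
        Producer sender = new Producer(parallelism);
        // Connect the blocking receiver first to show that connection order does not matter.
        sender.connect(new Consumer("Blocking Receiver", Kind.BLOCKING));
        sender.connect(new Consumer("Pipelined Receiver", Kind.PIPELINED));

        sender.onFirstBuffer();
        for (int i = 0; i < parallelism; i++) {
            sender.onSubtaskFinished();
        }
        return sender.deploymentOrder;
    }

    public static void main(String[] args) {
        // PARALLELISM in the test is NUMBER_OF_TMS * NUMBER_OF_SLOTS_PER_TM = 2 * 2 = 4.
        System.out.println(simulate(4)); // prints [Pipelined Receiver, Blocking Receiver]
    }
}
```

In this sketch the blocking receiver is deployed only on the fourth `onSubtaskFinished()` call, regardless of the order in which the two receivers were connected.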

[GitHub] flink pull request #5697: [FLINK-8704][tests] Port ScheduleOrUpdateConsumers...

2018-03-28 Thread zentol
GitHub user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5697#discussion_r177801606
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/LegacyScheduleOrUpdateConsumersTest.java ---

[GitHub] flink pull request #5697: [FLINK-8704][tests] Port ScheduleOrUpdateConsumers...

2018-03-28 Thread tillrohrmann
GitHub user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5697#discussion_r177804076
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/LegacyScheduleOrUpdateConsumersTest.java ---

[GitHub] flink pull request #5697: [FLINK-8704][tests] Port ScheduleOrUpdateConsumers...

2018-04-03 Thread zentol
GitHub user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5697#discussion_r178754924
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/LegacyScheduleOrUpdateConsumersTest.java ---

[GitHub] flink pull request #5697: [FLINK-8704][tests] Port ScheduleOrUpdateConsumers...

2018-04-03 Thread tillrohrmann
GitHub user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5697#discussion_r178756067
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/LegacyScheduleOrUpdateConsumersTest.java ---

[GitHub] flink pull request #5697: [FLINK-8704][tests] Port ScheduleOrUpdateConsumers...

2018-04-04 Thread asfgit
GitHub user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5697


---