This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 22f87ee9761b06eeccf8c5adcbd9f4aa96803302
Author: Zhijiang <wangzhijiang...@aliyun.com>
AuthorDate: Thu May 16 16:53:51 2019 +0800

    [hotfix][network,tests] Add new unit test for 
LocalInputChannel#requestSubpartition
    
    FLIP-1 requires that LocalInputChannel#requestSubpartition throws a
    PartitionNotFoundException if the partition has not been registered in
    the ResultPartitionManager. New unit tests are added to cover this case.
---
 .../partition/consumer/SingleInputGate.java        |  5 ++
 .../partition/consumer/LocalInputChannelTest.java  | 72 ++++++++++++++++++++++
 2 files changed, 77 insertions(+)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index 6c23698..63504bb 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -410,6 +410,11 @@ public class SingleInputGate extends InputGate {
                }
        }
 
+       @VisibleForTesting
+       Timer getRetriggerLocalRequestTimer() {
+               return retriggerLocalRequestTimer;
+       }
+
        @Override
        public void close() throws IOException {
                boolean released = false;
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index 505f792..a3bc696 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -39,6 +39,7 @@ import org.apache.flink.util.function.CheckedSupplier;
 
 import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
 
+import org.hamcrest.Matchers;
 import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -60,6 +61,8 @@ import static 
org.apache.flink.runtime.io.network.partition.InputChannelTestUtil
 import static 
org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createSingleInputGate;
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyInt;
@@ -247,6 +250,75 @@ public class LocalInputChannelTest {
        }
 
        /**
+        * Tests that {@link LocalInputChannel#requestSubpartition(int)} throws {@link PartitionNotFoundException}
+        * if the result partition was not registered in {@link ResultPartitionManager} and no backoff is configured.
+        */
+       @Test
+       public void testPartitionNotFoundExceptionWhileRequestingPartition() 
throws Exception {
+               final SingleInputGate inputGate = createSingleInputGate(1);
+               final LocalInputChannel localChannel = 
createLocalInputChannel(inputGate, new ResultPartitionManager());
+
+               try {
+                       localChannel.requestSubpartition(0);
+
+                       fail("Should throw a PartitionNotFoundException.");
+               } catch (PartitionNotFoundException notFound) {
+                       assertThat(localChannel.getPartitionId(), 
Matchers.is(notFound.getPartitionId()));
+               }
+       }
+
+       /**
+        * Tests that {@link SingleInputGate#retriggerPartitionRequest(IntermediateResultPartitionID)} is triggered
+        * after {@link LocalInputChannel#requestSubpartition(int)} runs into a {@link PartitionNotFoundException}
+        * while still within the backoff.
+        */
+       @Test
+       public void testRetriggerPartitionRequestWhilePartitionNotFound() 
throws Exception {
+               final SingleInputGate inputGate = createSingleInputGate(1);
+               final LocalInputChannel localChannel = createLocalInputChannel(
+                       inputGate, new ResultPartitionManager(), 1, 1);
+
+               
inputGate.setInputChannel(localChannel.getPartitionId().getPartitionId(), 
localChannel);
+               localChannel.requestSubpartition(0);
+
+               // The timer should be initialized the first time a partition request is retriggered.
+               assertNotNull(inputGate.getRetriggerLocalRequestTimer());
+       }
+
+       /**
+        * Tests that a {@link PartitionNotFoundException} raised while running
+        * {@link LocalInputChannel#retriggerSubpartitionRequest(Timer, int)} is set on the input channel.
+        */
+       @Test
+       public void testChannelErrorWhileRetriggeringRequest() {
+               final SingleInputGate inputGate = createSingleInputGate(1);
+               final LocalInputChannel localChannel = 
createLocalInputChannel(inputGate, new ResultPartitionManager());
+
+               final Timer timer = new Timer(true) {
+                       @Override
+                       public void schedule(TimerTask task, long delay) {
+                               task.run();
+
+                               try {
+                                       localChannel.checkError();
+
+                                       fail("Should throw a 
PartitionNotFoundException.");
+                               } catch (PartitionNotFoundException notFound) {
+                                       assertThat(localChannel.partitionId, 
Matchers.is(notFound.getPartitionId()));
+                               } catch (IOException ex) {
+                                       fail("Should throw a 
PartitionNotFoundException.");
+                               }
+                       }
+               };
+
+               try {
+                       localChannel.retriggerSubpartitionRequest(timer, 0);
+               } finally {
+                       timer.cancel();
+               }
+       }
+
+       /**
         * Verifies that concurrent release via the SingleInputGate and 
re-triggering
         * of a partition request works smoothly.
         *

Reply via email to