This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 49e1832056483d17f540a515cc5be7be1654dd48
Author: Zhijiang <wangzhijiang...@aliyun.com>
AuthorDate: Wed May 22 14:45:46 2019 +0800

    [FLINK-12458][network] Introduce PartitionConnectionException for unreachable producer
    
    If the consumer cannot establish a connection to the remote task executor
    while requesting a remote subpartition, this might indicate that the remote
    task executor is not reachable.
    We wrap this connection exception into the newly introduced
    PartitionConnectionException, which also extends PartitionException, so that
    the job master can decide whether to restart the upstream region to
    re-produce the partition data.
    
    This closes #8509.
---
 .../consumer/PartitionConnectionException.java}    | 35 +++++++---------------
 .../partition/consumer/RemoteInputChannel.java     |  8 +++--
 .../io/network/TestingConnectionManager.java       |  4 ++-
 .../partition/consumer/RemoteInputChannelTest.java | 24 +++++++++++++++
 4 files changed, 43 insertions(+), 28 deletions(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/TestingConnectionManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/PartitionConnectionException.java
similarity index 54%
copy from 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/TestingConnectionManager.java
copy to 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/PartitionConnectionException.java
index 822314d..4713dfd 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/TestingConnectionManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/PartitionConnectionException.java
@@ -16,35 +16,20 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.io.network;
+package org.apache.flink.runtime.io.network.partition.consumer;
+
+import org.apache.flink.runtime.io.network.partition.PartitionException;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 
 /**
- * A dummy implementation of the {@link ConnectionManager} which is mainly 
used for creating
- * {@link PartitionRequestClient} instance in tests.
+ * Exception for failed partition requests due to connection failure
+ * with unreachable producer.
  */
-public class TestingConnectionManager implements ConnectionManager {
-
-       @Override
-       public void start() {}
-
-       @Override
-       public PartitionRequestClient createPartitionRequestClient(ConnectionID 
connectionId) {
-               return new TestingPartitionRequestClient();
-       }
+public class PartitionConnectionException extends PartitionException {
 
-       @Override
-       public void closeOpenChannelConnections(ConnectionID connectionId) {}
+       private static final long serialVersionUID = 0L;
 
-       @Override
-       public int getNumberOfActiveConnections() {
-               return 0;
+       public PartitionConnectionException(ResultPartitionID partitionId, 
Throwable throwable) {
+               super("Connection for partition " + partitionId + " not 
reachable.", partitionId, throwable);
        }
-
-       @Override
-       public int getDataPort() {
-               return -1;
-       }
-
-       @Override
-       public void shutdown() {}
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
index fabc495..2d174ea 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
@@ -161,8 +161,12 @@ public class RemoteInputChannel extends InputChannel 
implements BufferRecycler,
        public void requestSubpartition(int subpartitionIndex) throws 
IOException, InterruptedException {
                if (partitionRequestClient == null) {
                        // Create a client and request the partition
-                       partitionRequestClient = connectionManager
-                               .createPartitionRequestClient(connectionId);
+                       try {
+                               partitionRequestClient = 
connectionManager.createPartitionRequestClient(connectionId);
+                       } catch (IOException e) {
+                               // IOExceptions indicate that we could not open 
a connection to the remote TaskExecutor
+                               throw new 
PartitionConnectionException(partitionId, e);
+                       }
 
                        partitionRequestClient.requestSubpartition(partitionId, 
subpartitionIndex, this, 0);
                }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/TestingConnectionManager.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/TestingConnectionManager.java
index 822314d..c23b3c2 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/TestingConnectionManager.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/TestingConnectionManager.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.io.network;
 
+import java.io.IOException;
+
 /**
  * A dummy implementation of the {@link ConnectionManager} which is mainly 
used for creating
  * {@link PartitionRequestClient} instance in tests.
@@ -28,7 +30,7 @@ public class TestingConnectionManager implements 
ConnectionManager {
        public void start() {}
 
        @Override
-       public PartitionRequestClient createPartitionRequestClient(ConnectionID 
connectionId) {
+       public PartitionRequestClient createPartitionRequestClient(ConnectionID 
connectionId) throws IOException {
                return new TestingPartitionRequestClient();
        }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
index 24d256e..0fdebf0 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
@@ -1016,6 +1016,23 @@ public class RemoteInputChannelTest {
                }
        }
 
+       /**
+        * Tests that any exceptions thrown by {@link 
ConnectionManager#createPartitionRequestClient(ConnectionID)}
+        * would be wrapped into {@link PartitionConnectionException} during
+        * {@link RemoteInputChannel#requestSubpartition(int)}.
+        */
+       @Test
+       public void testPartitionConnectionExceptionWhileRequestingPartition() 
throws Exception {
+               final RemoteInputChannel inputChannel = 
InputChannelTestUtils.createRemoteInputChannel(
+                       createSingleInputGate(1), 0, new 
TestingExceptionConnectionManager());
+               try {
+                       inputChannel.requestSubpartition(0);
+                       fail("Expected PartitionConnectionException.");
+               } catch (PartitionConnectionException ex) {
+                       assertThat(inputChannel.getPartitionId(), 
is(ex.getPartitionId()));
+               }
+       }
+
        // 
---------------------------------------------------------------------------------------------
 
        private RemoteInputChannel createRemoteInputChannel(SingleInputGate 
inputGate)
@@ -1179,4 +1196,11 @@ public class RemoteInputChannelTest {
                        ExceptionUtils.rethrowException(throwable);
                }
        }
+
+       private static final class TestingExceptionConnectionManager extends 
TestingConnectionManager {
+               @Override
+               public PartitionRequestClient 
createPartitionRequestClient(ConnectionID connectionId) throws IOException {
+                       throw new IOException("");
+               }
+       }
 }

Reply via email to