tillrohrmann commented on a change in pull request #8463: [FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment
URL: https://github.com/apache/flink/pull/8463#discussion_r286541806
 
 

 ##########
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 ##########
 @@ -614,34 +610,22 @@ private void stopTaskExecutorServices() throws Exception {
 
                 if (task != null) {
                         for (final PartitionInfo partitionInfo: partitionInfos) {
-                               IntermediateDataSetID intermediateResultPartitionID = partitionInfo.getIntermediateDataSetID();
-
-                               final SingleInputGate singleInputGate = task.getInputGateById(intermediateResultPartitionID);
-
-                               if (singleInputGate != null) {
-                                       // Run asynchronously because it might be blocking
-                                       getRpcService().execute(
-                                               () -> {
-                                                       try {
-                                                               singleInputGate.updateInputChannel(partitionInfo.getInputChannelDeploymentDescriptor());
-                                                       } catch (IOException | InterruptedException e) {
-                                                               log.error("Could not update input data location for task {}. Trying to fail task.", task.getTaskInfo().getTaskName(), e);
-
-                                                               try {
-                                                                       task.failExternally(e);
-                                                               } catch (RuntimeException re) {
-                                                                       // TODO: Check whether we need this or make exception in failExtenally checked
-                                                                       log.error("Failed canceling task with execution ID {} after task update failure.", executionAttemptID, re);
-                                                               }
-                                                       }
-                                               });
-                               } else {
-                                       return FutureUtils.completedExceptionally(
-                                               new PartitionException("No reader with ID " + intermediateResultPartitionID +
-                                                       " for task " + executionAttemptID + " was found."));
-                               }
+                               // Run asynchronously because it might be blocking
+                               getRpcService().execute(
+                                       () -> {
+                                               try {
+                                                       networkEnvironment.updatePartitionInfo(partitionInfo);
+                                               } catch (Throwable t) {
 
 Review comment:
   I think we should not catch `Throwable` here. If an unchecked exception 
occurs, it should simply bubble up and cause the component to fail.
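
   A minimal sketch of the narrower catch this suggests, assuming the update call declares only `IOException` and `InterruptedException` as checked exceptions (as the removed `updateInputChannel` path did). `NarrowCatchSketch`, `PartitionUpdater` and `TaskFailer` are hypothetical stand-ins for illustration, not Flink types:

```java
import java.io.IOException;

final class NarrowCatchSketch {

    /** Hypothetical stand-in for the partition update call (assumed checked exceptions). */
    interface PartitionUpdater {
        void update() throws IOException, InterruptedException;
    }

    /** Hypothetical stand-in for failing the affected task. */
    interface TaskFailer {
        void failExternally(Throwable cause);
    }

    /** Builds the action that would be handed to the asynchronous executor. */
    static Runnable updateAction(PartitionUpdater updater, TaskFailer failer) {
        return () -> {
            try {
                updater.update();
            } catch (IOException | InterruptedException e) {
                // Expected failure modes: fail only the affected task.
                failer.failExternally(e);
            }
            // Deliberately no catch (Throwable): an unchecked exception
            // propagates out of run() to whoever executes this action.
        };
    }
}
```

   With this shape, a checked failure still fails only the task, while something like an `IllegalStateException` is not swallowed and surfaces in the executing component.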

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services
