Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/254#discussion_r21552080
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
 ---
    @@ -111,111 +89,176 @@
        /** The I/O manager of the current environment (currently the one 
associated with the executing TaskManager). */
        private final IOManager ioManager;
     
    -   /** The input split provider that can be queried for new input splits.  
*/
    +   /** The input split provider that can be queried for new input splits. 
*/
        private final InputSplitProvider inputSplitProvider;
     
    -   
        /** The thread executing the task in the environment. */
        private Thread executingThread;
     
    -   /**
    -    * The RPC proxy to report accumulators to JobManager
    -    */
    +   /** The RPC proxy to report accumulators to JobManager. */
        private final AccumulatorProtocol accumulatorProtocolProxy;
     
    -   private final Map<String,FutureTask<Path>> cacheCopyTasks = new 
HashMap<String, FutureTask<Path>>();
    -   
    -   private final BroadcastVariableManager bcVarManager;
    -   
    -   private LocalBufferPool outputBufferPool;
    -   
    +   /** The network environment of the task manager */
    +   private final NetworkEnvironment networkEnvironment;
    +
    +   private final Map<String, FutureTask<Path>> cacheCopyTasks = new 
HashMap<String, FutureTask<Path>>();
    +
    +   private final IntermediateResultPartitionManager partitionManager;
    +
    +   private final TaskEventDispatcher taskEventDispatcher;
    +
        private AtomicBoolean canceled = new AtomicBoolean();
     
    +   private BufferWriter[] writers;
    +
    +   private BufferReader[] readers;
    +
    +   private Map<IntermediateDataSetID, BufferReader> readersById = new 
HashMap<IntermediateDataSetID, BufferReader>();
    +
    +   private IntermediateResultPartition[] producedPartitions;
    +
    +   public RuntimeEnvironment(
    +                   Task owner, TaskDeploymentDescriptor tdd, ClassLoader 
userCodeClassLoader,
    +                   MemoryManager memoryManager, IOManager ioManager, 
InputSplitProvider inputSplitProvider,
    +                   AccumulatorProtocol accumulatorProtocolProxy, 
NetworkEnvironment networkEnvironment) throws Exception {
    +
    +           this.owner = checkNotNull(owner);
    +
    +           this.memoryManager = checkNotNull(memoryManager);
    +           this.ioManager = checkNotNull(ioManager);
    +           this.inputSplitProvider = checkNotNull(inputSplitProvider);
    +           this.accumulatorProtocolProxy = 
checkNotNull(accumulatorProtocolProxy);
    +           this.partitionManager = 
checkNotNull(networkEnvironment.getPartitionManager());
    +           this.taskEventDispatcher = 
checkNotNull(networkEnvironment.getTaskEventDispatcher());
    +
    +           this.networkEnvironment = checkNotNull(networkEnvironment);
    +
    +           boolean success = false;
     
    -   public RuntimeEnvironment(Task owner, TaskDeploymentDescriptor tdd,
    -                                                   ClassLoader 
userCodeClassLoader,
    -                                                   MemoryManager 
memoryManager, IOManager ioManager,
    -                                                   InputSplitProvider 
inputSplitProvider,
    -                                                   AccumulatorProtocol 
accumulatorProtocolProxy,
    -                                                   
BroadcastVariableManager bcVarManager)
    -           throws Exception
    -   {
    -           Preconditions.checkNotNull(owner);
    -           Preconditions.checkNotNull(memoryManager);
    -           Preconditions.checkNotNull(ioManager);
    -           Preconditions.checkNotNull(inputSplitProvider);
    -           Preconditions.checkNotNull(accumulatorProtocolProxy);
    -           Preconditions.checkNotNull(userCodeClassLoader);
    -           Preconditions.checkNotNull(bcVarManager);
    -           
    -           this.owner = owner;
    -
    -           this.memoryManager = memoryManager;
    -           this.ioManager = ioManager;
    -           this.inputSplitProvider = inputSplitProvider;
    -           this.accumulatorProtocolProxy = accumulatorProtocolProxy;
    -           this.bcVarManager = bcVarManager;
    -
    -           // load and instantiate the invokable class
    -           this.userCodeClassLoader = userCodeClassLoader;
    -           try {
    -                   final String className = tdd.getInvokableClassName();
    -                   this.invokableClass = Class.forName(className, true, 
userCodeClassLoader).asSubclass(AbstractInvokable.class);
    -           }
    -           catch (Throwable t) {
    -                   throw new Exception("Could not load invokable class.", 
t);
    -           }
    -           
                try {
    -                   this.invokable = this.invokableClass.newInstance();
    -           }
    -           catch (Throwable t) {
    -                   throw new Exception("Could not instantiate the 
invokable class.", t);
    -           }
    -           
    -           this.jobConfiguration = tdd.getJobConfiguration();
    -           this.taskConfiguration = tdd.getTaskConfiguration();
    -           
    -           this.invokable.setEnvironment(this);
    -           
    -           // make sure that user classloader is available, because 
registerInputOutput might call usercode
    -           {
    -                   Thread currentThread = Thread.currentThread();
    -                   ClassLoader context = 
currentThread.getContextClassLoader();
    -                   
currentThread.setContextClassLoader(userCodeClassLoader);
    +                   // 
----------------------------------------------------------------
    +                   // Produced intermediate result partitions and writers
    +                   // 
----------------------------------------------------------------
    +
    +                   List<IntermediateResultPartitionDeploymentDescriptor> 
irpdd = tdd.getProducedPartitions();
    +
    +                   this.producedPartitions = new 
IntermediateResultPartition[irpdd.size()];
    +                   this.writers = new BufferWriter[irpdd.size()];
    +
    +                   for (int i = 0; i < producedPartitions.length; i++) {
    +                           IntermediateResultPartitionDeploymentDescriptor 
irp = irpdd.get(i);
    +
    +                           producedPartitions[i] = new 
IntermediateResultPartition(i, getJobID(), tdd.getExecutionId(), 
networkEnvironment, irp);
    +
    +                           writers[i] = new 
BufferWriter(producedPartitions[i]);
    +
    +                           
partitionManager.registerIntermediateResultPartition(producedPartitions[i]);
    +                   }
    +
    +                   registerAllWritersWithTaskEventDispatcher();
    +
    +                   // 
----------------------------------------------------------------
    +                   // Consumed intermediate result partition readers
    +                   // 
----------------------------------------------------------------
    +
    +                   
List<IntermediateResultPartitionConsumerDeploymentDescriptor> 
consumedPartitions = tdd.getConsumedPartitions();
    +
    +                   this.readers = new 
BufferReader[consumedPartitions.size()];
    +
    +                   for (int i = 0; i < consumedPartitions.size(); i++) {
    +                           
IntermediateResultPartitionConsumerDeploymentDescriptor cdd = 
consumedPartitions.get(i);
    +
    +                           // There is one input channel per partition of 
the consumed
    +                           // result. Each of these input channels 
consumes the queue
    +                           // matching the index of this subtask (or queue 
0 for
    +                           // forwarding distribution patterns). In cases 
where the number
    +                           // of queues and subtasks does not match, there 
needs to be
    +                           // a separate repartitioning task.
    +
    +                           IntermediateResultPartitionInfo[] partitions = 
cdd.getPartitions();
    +                           InputChannel[] inputChannels = new 
InputChannel[partitions.length];
    +
    +                           IntermediateDataSetID resultId = 
cdd.getResultId();
    +                           int queueToRequest = cdd.getQueueToRequest();
    +
    +                           BufferReader reader = new 
BufferReader(resultId, owner, networkEnvironment, inputChannels.length, 
queueToRequest);
    +                           readersById.put(resultId, reader);
    +
    +                           for (int j = 0; j < partitions.length; j++) {
    +                                   IntermediateResultPartitionInfo 
partitionInfo = partitions[j];
    +                                   IntermediateResultPartitionID 
partitionId = partitionInfo.getPartitionId();
    +
    +                                   IntermediateResultPartitionLocation 
location = partitionInfo.getProducerLocation();
    +
    +                                   switch (location) {
    +                                           case LOCAL:
    +                                                   inputChannels[j] = new 
LocalInputChannel(j, partitionId, reader);
    +                                                   break;
    +                                           case REMOTE:
    +                                                   inputChannels[j] = new 
RemoteInputChannel(j, partitionId, reader, partitionInfo.getProducerAddress());
    +                                                   break;
    +                                           case UNKNOWN:
    +                                                   inputChannels[j] = new 
UnknownInputChannel(j, partitionId, reader);
    +                                                   break;
    +                                   }
    +
    +                                   reader.setInputChannel(partitionId, 
inputChannels[j]);
    +                           }
    +
    +                           readers[i] = reader;
    +                   }
    +
    +                   // 
----------------------------------------------------------------
    +                   // Invokable setup
    +                   // 
----------------------------------------------------------------
    +                   // Note: This has to be done *after* the readers and 
writers have
    +                   // been setup, because the invokable relies on them for 
I/O.
    +                   // 
----------------------------------------------------------------
    +
    +                   // load and instantiate the invokable class
    +                   this.userCodeClassLoader = 
checkNotNull(userCodeClassLoader);
    +                   /* Class of the task to run in this environment. */
    +                   Class<? extends AbstractInvokable> invokableClass;
                        try {
    -                           this.invokable.registerInputOutput();
    +                           final String className = 
tdd.getInvokableClassName();
    +                           invokableClass = Class.forName(className, true, 
userCodeClassLoader).asSubclass(AbstractInvokable.class);
                        }
    -                   finally {
    -                           currentThread.setContextClassLoader(context);
    +                   catch (Throwable t) {
    +                           throw new Exception("Could not load invokable 
class.", t);
                        }
    -           }
     
    -           List<GateDeploymentDescriptor> inGates = tdd.getInputGates();
    -           List<GateDeploymentDescriptor> outGates = tdd.getOutputGates();
    -           
    -           
    -           if (this.inputGates.size() != inGates.size()) {
    -                   throw new Exception("The number of readers created in 
'registerInputOutput()' "
    -                                   + "is different than the number of 
connected incoming edges in the job graph.");
    -           }
    -           if (this.outputGates.size() != outGates.size()) {
    -                   throw new Exception("The number of writers created in 
'registerInputOutput()' "
    -                                   + "is different than the number of 
connected outgoing edges in the job graph.");
    +                   try {
    +                           this.invokable = invokableClass.newInstance();
    +                   }
    +                   catch (Throwable t) {
    +                           throw new Exception("Could not instantiate the 
invokable class.", t);
    +                   }
    +
    +                   this.jobConfiguration = tdd.getJobConfiguration();
    +                   this.taskConfiguration = tdd.getTaskConfiguration();
    +
    +                   this.invokable.setEnvironment(this);
    +                   this.invokable.registerInputOutput();
    +
    +                   success = true;
                }
    -           
    -           for (int i = 0; i < inGates.size(); i++) {
    -                   
this.inputGates.get(i).initializeChannels(inGates.get(i));
    +           catch (Throwable t) {
    +                   LOG.error(ExceptionUtils.stringifyException(t), t);
    --- End diff --
    
    I think that the `stringifyException` call is unnecessary and even 
hinders proper logging. I would remove it, construct a proper error message 
for the log, and pass the exception as the second parameter.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to