GitHub user StephanEwen commented on a diff in the pull request:
https://github.com/apache/incubator-flink/pull/254#discussion_r21552080
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
---
@@ -111,111 +89,176 @@
/** The I/O manager of the current environment (currently the one
associated with the executing TaskManager). */
private final IOManager ioManager;
- /** The input split provider that can be queried for new input splits.
*/
+ /** The input split provider that can be queried for new input splits.
*/
private final InputSplitProvider inputSplitProvider;
-
/** The thread executing the task in the environment. */
private Thread executingThread;
- /**
- * The RPC proxy to report accumulators to JobManager
- */
+ /** The RPC proxy to report accumulators to JobManager. */
private final AccumulatorProtocol accumulatorProtocolProxy;
- private final Map<String,FutureTask<Path>> cacheCopyTasks = new
HashMap<String, FutureTask<Path>>();
-
- private final BroadcastVariableManager bcVarManager;
-
- private LocalBufferPool outputBufferPool;
-
+ /** The network environment of the task manager */
+ private final NetworkEnvironment networkEnvironment;
+
+ private final Map<String, FutureTask<Path>> cacheCopyTasks = new
HashMap<String, FutureTask<Path>>();
+
+ private final IntermediateResultPartitionManager partitionManager;
+
+ private final TaskEventDispatcher taskEventDispatcher;
+
private AtomicBoolean canceled = new AtomicBoolean();
+ private BufferWriter[] writers;
+
+ private BufferReader[] readers;
+
+ private Map<IntermediateDataSetID, BufferReader> readersById = new
HashMap<IntermediateDataSetID, BufferReader>();
+
+ private IntermediateResultPartition[] producedPartitions;
+
+ public RuntimeEnvironment(
+ Task owner, TaskDeploymentDescriptor tdd, ClassLoader
userCodeClassLoader,
+ MemoryManager memoryManager, IOManager ioManager,
InputSplitProvider inputSplitProvider,
+ AccumulatorProtocol accumulatorProtocolProxy,
NetworkEnvironment networkEnvironment) throws Exception {
+
+ this.owner = checkNotNull(owner);
+
+ this.memoryManager = checkNotNull(memoryManager);
+ this.ioManager = checkNotNull(ioManager);
+ this.inputSplitProvider = checkNotNull(inputSplitProvider);
+ this.accumulatorProtocolProxy =
checkNotNull(accumulatorProtocolProxy);
+ this.partitionManager =
checkNotNull(networkEnvironment.getPartitionManager());
+ this.taskEventDispatcher =
checkNotNull(networkEnvironment.getTaskEventDispatcher());
+
+ this.networkEnvironment = checkNotNull(networkEnvironment);
+
+ boolean success = false;
- public RuntimeEnvironment(Task owner, TaskDeploymentDescriptor tdd,
- ClassLoader
userCodeClassLoader,
- MemoryManager
memoryManager, IOManager ioManager,
- InputSplitProvider
inputSplitProvider,
- AccumulatorProtocol
accumulatorProtocolProxy,
-
BroadcastVariableManager bcVarManager)
- throws Exception
- {
- Preconditions.checkNotNull(owner);
- Preconditions.checkNotNull(memoryManager);
- Preconditions.checkNotNull(ioManager);
- Preconditions.checkNotNull(inputSplitProvider);
- Preconditions.checkNotNull(accumulatorProtocolProxy);
- Preconditions.checkNotNull(userCodeClassLoader);
- Preconditions.checkNotNull(bcVarManager);
-
- this.owner = owner;
-
- this.memoryManager = memoryManager;
- this.ioManager = ioManager;
- this.inputSplitProvider = inputSplitProvider;
- this.accumulatorProtocolProxy = accumulatorProtocolProxy;
- this.bcVarManager = bcVarManager;
-
- // load and instantiate the invokable class
- this.userCodeClassLoader = userCodeClassLoader;
- try {
- final String className = tdd.getInvokableClassName();
- this.invokableClass = Class.forName(className, true,
userCodeClassLoader).asSubclass(AbstractInvokable.class);
- }
- catch (Throwable t) {
- throw new Exception("Could not load invokable class.",
t);
- }
-
try {
- this.invokable = this.invokableClass.newInstance();
- }
- catch (Throwable t) {
- throw new Exception("Could not instantiate the
invokable class.", t);
- }
-
- this.jobConfiguration = tdd.getJobConfiguration();
- this.taskConfiguration = tdd.getTaskConfiguration();
-
- this.invokable.setEnvironment(this);
-
- // make sure that user classloader is available, because
registerInputOutput might call usercode
- {
- Thread currentThread = Thread.currentThread();
- ClassLoader context =
currentThread.getContextClassLoader();
-
currentThread.setContextClassLoader(userCodeClassLoader);
+ //
----------------------------------------------------------------
+ // Produced intermediate result partitions and writers
+ //
----------------------------------------------------------------
+
+ List<IntermediateResultPartitionDeploymentDescriptor>
irpdd = tdd.getProducedPartitions();
+
+ this.producedPartitions = new
IntermediateResultPartition[irpdd.size()];
+ this.writers = new BufferWriter[irpdd.size()];
+
+ for (int i = 0; i < producedPartitions.length; i++) {
+ IntermediateResultPartitionDeploymentDescriptor
irp = irpdd.get(i);
+
+ producedPartitions[i] = new
IntermediateResultPartition(i, getJobID(), tdd.getExecutionId(),
networkEnvironment, irp);
+
+ writers[i] = new
BufferWriter(producedPartitions[i]);
+
+
partitionManager.registerIntermediateResultPartition(producedPartitions[i]);
+ }
+
+ registerAllWritersWithTaskEventDispatcher();
+
+ //
----------------------------------------------------------------
+ // Consumed intermediate result partition readers
+ //
----------------------------------------------------------------
+
+
List<IntermediateResultPartitionConsumerDeploymentDescriptor>
consumedPartitions = tdd.getConsumedPartitions();
+
+ this.readers = new
BufferReader[consumedPartitions.size()];
+
+ for (int i = 0; i < consumedPartitions.size(); i++) {
+
IntermediateResultPartitionConsumerDeploymentDescriptor cdd =
consumedPartitions.get(i);
+
+ // There is one input channel per partition of
the consumed
+ // result. Each of these input channels
consumes the queue
+ // matching the index of this subtask (or queue
0 for
+ // forwarding distribution patterns). In cases
where the number
+ // of queues and subtasks does not match, there
needs to be
+ // a separate repartitioning task.
+
+ IntermediateResultPartitionInfo[] partitions =
cdd.getPartitions();
+ InputChannel[] inputChannels = new
InputChannel[partitions.length];
+
+ IntermediateDataSetID resultId =
cdd.getResultId();
+ int queueToRequest = cdd.getQueueToRequest();
+
+ BufferReader reader = new
BufferReader(resultId, owner, networkEnvironment, inputChannels.length,
queueToRequest);
+ readersById.put(resultId, reader);
+
+ for (int j = 0; j < partitions.length; j++) {
+ IntermediateResultPartitionInfo
partitionInfo = partitions[j];
+ IntermediateResultPartitionID
partitionId = partitionInfo.getPartitionId();
+
+ IntermediateResultPartitionLocation
location = partitionInfo.getProducerLocation();
+
+ switch (location) {
+ case LOCAL:
+ inputChannels[j] = new
LocalInputChannel(j, partitionId, reader);
+ break;
+ case REMOTE:
+ inputChannels[j] = new
RemoteInputChannel(j, partitionId, reader, partitionInfo.getProducerAddress());
+ break;
+ case UNKNOWN:
+ inputChannels[j] = new
UnknownInputChannel(j, partitionId, reader);
+ break;
+ }
+
+ reader.setInputChannel(partitionId,
inputChannels[j]);
+ }
+
+ readers[i] = reader;
+ }
+
+ //
----------------------------------------------------------------
+ // Invokable setup
+ //
----------------------------------------------------------------
+ // Note: This has to be done *after* the readers and
writers have
+ // been setup, because the invokable relies on them for
I/O.
+ //
----------------------------------------------------------------
+
+ // load and instantiate the invokable class
+ this.userCodeClassLoader =
checkNotNull(userCodeClassLoader);
+ /* Class of the task to run in this environment. */
+ Class<? extends AbstractInvokable> invokableClass;
try {
- this.invokable.registerInputOutput();
+ final String className =
tdd.getInvokableClassName();
+ invokableClass = Class.forName(className, true,
userCodeClassLoader).asSubclass(AbstractInvokable.class);
}
- finally {
- currentThread.setContextClassLoader(context);
+ catch (Throwable t) {
+ throw new Exception("Could not load invokable
class.", t);
}
- }
- List<GateDeploymentDescriptor> inGates = tdd.getInputGates();
- List<GateDeploymentDescriptor> outGates = tdd.getOutputGates();
-
-
- if (this.inputGates.size() != inGates.size()) {
- throw new Exception("The number of readers created in
'registerInputOutput()' "
- + "is different than the number of
connected incoming edges in the job graph.");
- }
- if (this.outputGates.size() != outGates.size()) {
- throw new Exception("The number of writers created in
'registerInputOutput()' "
- + "is different than the number of
connected outgoing edges in the job graph.");
+ try {
+ this.invokable = invokableClass.newInstance();
+ }
+ catch (Throwable t) {
+ throw new Exception("Could not instantiate the
invokable class.", t);
+ }
+
+ this.jobConfiguration = tdd.getJobConfiguration();
+ this.taskConfiguration = tdd.getTaskConfiguration();
+
+ this.invokable.setEnvironment(this);
+ this.invokable.registerInputOutput();
+
+ success = true;
}
-
- for (int i = 0; i < inGates.size(); i++) {
-
this.inputGates.get(i).initializeChannels(inGates.get(i));
+ catch (Throwable t) {
+ LOG.error(ExceptionUtils.stringifyException(t), t);
--- End diff --
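I think that the `stringifyException` call is unnecessary and even hinders
proper logging: the logger already prints the stack trace of the exception
passed as the second parameter, so with the stringified trace as the message
the trace ends up in the log twice. I would remove it, construct a proper
error message for the log, and pass the exception as the second parameter,
e.g. `LOG.error("Error while setting up the runtime environment.", t);`.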
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and you would like it enabled, or if the feature is enabled but not
working, please contact infrastructure at infrastructure@apache.org or file a
JIRA ticket with INFRA.
---