Thanks for reaching out to us, Anupam. I'm going to take a look at these questions in depth over the next couple of days so I can give you a proper reply.
Regards,
jeagles

On Mon, Mar 5, 2018 at 7:51 PM, Anupam <[email protected]> wrote:
> Hi Tez experts,
>
> We are hitting some issues with proper handling of DataMovementEvent and
> InputFailedEvent and are hoping you could provide us with some guidance.
>
> Questions:
>
> 1. What is the expectation for handling InputFailedEvent in
>    AbstractLogicalInput implementations? In what order should we expect to
>    receive DataMovementEvents and InputFailedEvents for a particular
>    targetIndex and version?
> 2. When are we expected to call inputIsReady? Should we probe all physical
>    inputs before signaling it?
> 3. The inputIsReady method documentation says it can be called multiple
>    times. In what scenario?
> 4. How can we be sure that there are no more events we should wait for?
>    A generous timeout? How do we avoid unnecessary latency?
>
> Some background for understanding this:
>
> We are using the Tez DAG engine to run SCOPE [1] jobs. The operator and IO
> implementation for SCOPE is in C++; let's call it scopehost.
>
> At a high level, we use a Tez task to assemble the information
> (scopeHostRunSpec) needed to run scopehost, and then execute the process.
> This information is very similar to a task spec: vertex ID, task ID, and
> input/output channels.
>
> Note that we do have custom edge managers as well.
>
> Problem details:
>
> We are hitting a problem with handling input failed events, and are
> wondering if there is something fundamental that we are missing.
>
> Say ScopeInput implements AbstractLogicalInput.
>
> The handleEvents logic looks like this:
>
> ```
> @Override
> public void handleEvents(final List<Event> events) throws Exception {
>   for (final Event event : events) {
>     if (event instanceof DataMovementEvent) {
>       handleDataMovementEvent((DataMovementEvent) event);
>     } else if (event instanceof InputFailedEvent) {
>       handleInputFailedEvent((InputFailedEvent) event);
>     }
>   }
>
>   // If we have seen all the inputs, signal the processor that this
>   // logical input is ready.
>   if (taskPhysicalInputs.size() == getNumPhysicalInputs()) {
>     LOG.info("Got all inputs (input count = " + taskPhysicalInputs.size() + ")");
>     getContext().inputIsReady();
>   }
> }
> ```
>
> where handleDataMovementEvent and handleInputFailedEvent are:
>
> ```
> private void handleInputFailedEvent(final InputFailedEvent inputFailedEvent) {
>   // Keep track of the version of the input that failed for this targetIndex.
>   obsoleteInputMap.put(inputFailedEvent.getTargetIndex(), inputFailedEvent.getVersion());
> }
>
> private void handleDataMovementEvent(final DataMovementEvent dataMovementEvent) {
>   final ScopeDataMovementEventChannelDetailsPayload taskPayload =
>       SerializationUtils.bufferToObject(dataMovementEvent.getUserPayload());
>
>   // If we have already seen an InputFailedEvent for this targetIndex
>   // with this version, just ignore this DataMovementEvent.
>   if (obsoleteInputMap.containsKey(dataMovementEvent.getTargetIndex())
>       && obsoleteInputMap.get(dataMovementEvent.getTargetIndex())
>           == dataMovementEvent.getVersion()) {
>     return;
>   }
>
>   // Track the current latest version for this targetIndex.
>   inputIdVersionMap.put(dataMovementEvent.getTargetIndex(), dataMovementEvent.getVersion());
>
>   // Save the physical input path for this targetIndex.
>   taskPhysicalInputs.put(dataMovementEvent.getTargetIndex(), taskPayload.getPhysicalInput());
> }
> ```
>
> In a certain run we get the following sequence of events:
>
> ```
> 2018-03-05 08:25:15,591 [INFO] [TezTaskEventRouter{attempt_1520099374056_29809_1_13_000015_5}] |task.ScopeInput|: Data movement event received for targetIndex: 0 version: 0
> 2018-03-05 08:25:15,591 [INFO] [TezTaskEventRouter{attempt_1520099374056_29809_1_13_000015_5}] |task.ScopeInput|: Got all inputs (input count = 1)
> 2018-03-05 08:25:15,591 [INFO] [TezTaskEventRouter{attempt_1520099374056_29809_1_13_000015_5}] |task.ScopeInput|: Input failed event received for targetIndex: 0 version: 0
> 2018-03-05 08:25:15,591 [INFO] [TezTaskEventRouter{attempt_1520099374056_29809_1_13_000015_5}] |task.ScopeInput|: Got all inputs (input count = 1)
> ```
>
> This ScopeInput has only one physical input. ScopeInput::handleEvents is
> called with a DataMovementEvent for version 0, and at that point the code
> cannot know whether more events are pending. Since we expected only one
> physical input, we signaled inputIsReady. We subsequently receive an
> InputFailedEvent for that same version, but inputIsReady has already been
> signaled, so the processor assembles scopeHostRunSpec with stale
> information, causing the input read to fail again.
>
> Reference:
>
> [1] SCOPE: easy and efficient parallel processing of massive data sets.
>     https://dl.acm.org/citation.cfm?id=1454166
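A minimal, framework-free sketch of the per-targetIndex bookkeeping discussed in the quoted message, assuming (not confirmed by anything in this thread) that after a source failure a DataMovementEvent with a higher version eventually arrives for the same targetIndex. `InputVersionTracker`, `Event`, `Kind`, and the `path-v*` strings are hypothetical stand-ins, not Tez classes. Unlike the quoted handleInputFailedEvent, this sketch also forgets the physical input that an InputFailedEvent invalidates, so the "all inputs known" check turns false again after the failure event seen in the log. It models only the bookkeeping; it does not answer what should happen once inputIsReady has already been signaled.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical model of ScopeInput's event bookkeeping.
// Kind and Event are illustrative stand-ins, NOT Tez classes.
final class InputVersionTracker {
    enum Kind { DATA_MOVEMENT, INPUT_FAILED }

    static final class Event {
        final Kind kind;
        final int targetIndex;
        final int version;
        final String physicalInput; // null for INPUT_FAILED

        Event(Kind kind, int targetIndex, int version, String physicalInput) {
            this.kind = kind;
            this.targetIndex = targetIndex;
            this.version = version;
            this.physicalInput = physicalInput;
        }
    }

    private final int numPhysicalInputs;
    // Currently usable physical input, and its version, per targetIndex.
    private final Map<Integer, String> inputs = new HashMap<>();
    private final Map<Integer, Integer> versions = new HashMap<>();
    // Every version reported as failed, per targetIndex.
    private final Map<Integer, Set<Integer>> failedVersions = new HashMap<>();

    InputVersionTracker(int numPhysicalInputs) {
        this.numPhysicalInputs = numPhysicalInputs;
    }

    void handle(Event e) {
        Integer known = versions.get(e.targetIndex);
        if (e.kind == Kind.INPUT_FAILED) {
            failedVersions.computeIfAbsent(e.targetIndex, k -> new HashSet<>()).add(e.version);
            // Forget the input this failure invalidates, so readiness is re-evaluated.
            if (known != null && known == e.version) {
                inputs.remove(e.targetIndex);
                versions.remove(e.targetIndex);
            }
            return;
        }
        // Ignore data movement for a version already reported as failed...
        if (failedVersions.getOrDefault(e.targetIndex, Collections.emptySet()).contains(e.version)) {
            return;
        }
        // ...or for a version older than the one already recorded.
        if (known != null && known > e.version) {
            return;
        }
        inputs.put(e.targetIndex, e.physicalInput);
        versions.put(e.targetIndex, e.version);
    }

    boolean allInputsKnown() {
        return inputs.size() == numPhysicalInputs;
    }

    String physicalInput(int targetIndex) {
        return inputs.get(targetIndex);
    }

    public static void main(String[] args) {
        // Replays the logged sequence, then a hypothetical retry at version 1.
        InputVersionTracker t = new InputVersionTracker(1);
        t.handle(new Event(Kind.DATA_MOVEMENT, 0, 0, "path-v0"));
        System.out.println("after DME v0: ready=" + t.allInputsKnown());
        t.handle(new Event(Kind.INPUT_FAILED, 0, 0, null));
        System.out.println("after IFE v0: ready=" + t.allInputsKnown());
        t.handle(new Event(Kind.DATA_MOVEMENT, 0, 1, "path-v1"));
        System.out.println("after DME v1: ready=" + t.allInputsKnown()
                + ", input=" + t.physicalInput(0));
    }
}
```

Running `main` shows readiness going true, then false after the InputFailedEvent, then true again once the hypothetical version 1 arrives with a fresh physical input path.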
