Thanks for reaching out to us, Anupam.

I'm going to take a look at these questions in depth over the next couple of
days so that I can give a proper reply.

Regards,
jeagles

On Mon, Mar 5, 2018 at 7:51 PM, Anupam <[email protected]> wrote:

> Hi Tez experts,
>
>
>
> We are hitting some issues with proper handling of DataMovementEvent and
> InputFailedEvent and are hoping you could provide us with some guidance.
>
>
>
> Questions:
>
>  # What is the expectation for handling InputFailedEvent in
> AbstractLogicalInput implementations? In what order should we expect to
> receive DataMovementEvents and InputFailedEvents for a particular
> targetIndex and version number?
>
>  # When are we expected to call inputIsReady? Should we probe all physical
> inputs before signaling this?
>
>  # The inputIsReady method documentation says it can be called multiple
> times. In what scenario?
>
>  # How can we be sure that there are no more events that we should wait
> for? A generous timeout? How do we avoid unnecessary latency?
>
>
> Some background for understanding this:
>
>
>
> We are using the Tez DAG engine to run SCOPE[1] jobs. The operator and IO
> implementation for SCOPE is in C++; let's call it scopehost.
>
> At a high level, we use a Tez task to assemble the information
> (scopeHostRunSpec) needed to run scopehost and then execute the process.
> The information is very similar to a task spec: vertexid, taskid,
> input/output channels.
>
> Note that we do have custom edge managers as well.
>
> Problem details:
>
>
> We are hitting a problem with handling input failed events, and are
> wondering if there is something fundamental that we are missing.
>
>
>
> Say ScopeInput extends AbstractLogicalInput.
>
> The handleEvents logic looks like:
>
>
>
> ```
>
>     for (final Event event : events) {
>       if (event instanceof DataMovementEvent) {
>         final DataMovementEvent dataMovementEvent = (DataMovementEvent) event;
>         final ScopeDataMovementEventChannelDetailsPayload taskPayload =
>             SerializationUtils.bufferToObject(dataMovementEvent.getUserPayload());
>
>         // if we have already seen an InputFailedEvent for this targetIndex
>         // with this version then just ignore this DataMovementEvent
>         if (obsoleteInputMap.containsKey(dataMovementEvent.getTargetIndex())
>           && obsoleteInputMap.get(dataMovementEvent.getTargetIndex())
>            == dataMovementEvent.getVersion()) {
>           continue;
>         }
>
>         // Track the current latest version for this targetIndex
>         this.inputIdVersionMap.put(dataMovementEvent.getTargetIndex(),
>             dataMovementEvent.getVersion());
>
>         // save the physical input path for this targetIndex
>         this.taskPhysicalInputs.put(
>           dataMovementEvent.getTargetIndex(),
>           taskPayload.getPhysicalInput()
>         );
>       } else if (event instanceof InputFailedEvent) {
>         final InputFailedEvent inputFailedEvent = (InputFailedEvent) event;
>         // Keep track of the version of input that failed for this targetIndex
>         obsoleteInputMap.put(inputFailedEvent.getTargetIndex(),
>             inputFailedEvent.getVersion());
>       }
>     }
>
>     // if I have seen all the inputs then signal the processor that this
>     // logical input is ready
>     if (taskPhysicalInputs.size() == getNumPhysicalInputs()) {
>       LOG.info("Got all inputs (input count = " + taskPhysicalInputs.size() + ")");
>       getContext().inputIsReady();
>     }
>
> ```
>
>
>
> Factored into helper methods, handleDataMovementEvent and
> handleInputFailedEvent are:
>
>
>
> ```
>
>   private void handleInputFailedEvent(final InputFailedEvent inputFailedEvent) {
>     obsoleteInputMap.put(inputFailedEvent.getTargetIndex(),
>         inputFailedEvent.getVersion());
>   }
>
>   private void handleDataMovementEvent(final DataMovementEvent dataMovementEvent) {
>     final ScopeDataMovementEventChannelDetailsPayload taskPayload =
>         SerializationUtils.bufferToObject(dataMovementEvent.getUserPayload());
>     if (obsoleteInputMap.containsKey(dataMovementEvent.getTargetIndex())
>         && obsoleteInputMap.get(dataMovementEvent.getTargetIndex())
>            == dataMovementEvent.getVersion()) {
>       return;
>     }
>
>     this.inputIdVersionMap.put(dataMovementEvent.getTargetIndex(),
>         dataMovementEvent.getVersion());
>     this.taskPhysicalInputs.put(
>         dataMovementEvent.getTargetIndex(),
>         taskPayload.getPhysicalInput()
>     );
>   }
>
> ```
>
>
>
> In certain runs we get the following sequence of events:
>
>
>
> 2018-03-05 08:25:15,591 [INFO] [TezTaskEventRouter{attempt_1520099374056_29809_1_13_000015_5}] |task.ScopeInput|: Data movement event received for targetIndex: 0 version: 0
>
> 2018-03-05 08:25:15,591 [INFO] [TezTaskEventRouter{attempt_1520099374056_29809_1_13_000015_5}] |task.ScopeInput|: Got all inputs (input count = 1)
>
> 2018-03-05 08:25:15,591 [INFO] [TezTaskEventRouter{attempt_1520099374056_29809_1_13_000015_5}] |task.ScopeInput|: Input failed event received for targetIndex: 0 version: 0
>
> 2018-03-05 08:25:15,591 [INFO] [TezTaskEventRouter{attempt_1520099374056_29809_1_13_000015_5}] |task.ScopeInput|: Got all inputs (input count = 1)
>
>
>
>
>
> This is a ScopeInput with only one physical input. ScopeInput::handleEvents
> is called with a DataMovementEvent for version 0, and at that point the code
> cannot tell whether more events are pending.
>
> Since we expected only one physical input, we called inputIsReady.
> Subsequently we received an InputFailedEvent for the same version. But we
> had already called inputIsReady, so the processor assembled
> scopeHostRunSpec with stale information, causing the input read to fail
> again.
>
>
>
>
>
> Reference:
>
> [1] SCOPE: easy and efficient parallel processing of massive data sets.
> https://dl.acm.org/citation.cfm?id=1454166
>

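For reference while reading the sequence above, here is a minimal, self-contained
sketch of the bookkeeping described in the quoted message. It is not Tez code:
InputTracker, onDataMovement, onInputFailed, hasAllInputs and the "path-v0" /
"path-v1" strings are hypothetical names made up for illustration, and events
are reduced to plain (targetIndex, version) pairs. The one change relative to
the quoted snippet is an assumption on my part: a failed event also evicts the
entry recorded for that targetIndex and version, so the readiness check reflects
the current state. Whether that is enough depends on the event ordering
guarantees, which is the open question in this thread.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for the bookkeeping in ScopeInput; not part of the Tez API. */
final class InputTracker {
    private final int numPhysicalInputs;
    // targetIndex -> version of the most recently failed input
    private final Map<Integer, Integer> obsoleteInputMap = new HashMap<>();
    // targetIndex -> version currently recorded as usable
    private final Map<Integer, Integer> inputIdVersionMap = new HashMap<>();
    // targetIndex -> physical input path for the recorded version
    private final Map<Integer, String> taskPhysicalInputs = new HashMap<>();

    InputTracker(int numPhysicalInputs) {
        this.numPhysicalInputs = numPhysicalInputs;
    }

    /** Mirrors the DataMovementEvent branch of the quoted handleEvents. */
    void onDataMovement(int targetIndex, int version, String physicalInput) {
        Integer failed = obsoleteInputMap.get(targetIndex);
        if (failed != null && failed == version) {
            return; // this version was already reported as failed
        }
        inputIdVersionMap.put(targetIndex, version);
        taskPhysicalInputs.put(targetIndex, physicalInput);
    }

    /** Records the failure and (assumption) evicts the matching stale entry. */
    void onInputFailed(int targetIndex, int version) {
        obsoleteInputMap.put(targetIndex, version);
        Integer current = inputIdVersionMap.get(targetIndex);
        if (current != null && current == version) {
            inputIdVersionMap.remove(targetIndex);
            taskPhysicalInputs.remove(targetIndex);
        }
    }

    boolean hasAllInputs() {
        return taskPhysicalInputs.size() == numPhysicalInputs;
    }

    String physicalInput(int targetIndex) {
        return taskPhysicalInputs.get(targetIndex);
    }

    public static void main(String[] args) {
        InputTracker tracker = new InputTracker(1);
        tracker.onDataMovement(0, 0, "path-v0");
        System.out.println(tracker.hasAllInputs()); // true
        tracker.onInputFailed(0, 0);
        System.out.println(tracker.hasAllInputs()); // false
        tracker.onDataMovement(0, 1, "path-v1");
        System.out.println(tracker.hasAllInputs()); // true
    }
}
```

Replaying the logged sequence (data movement for targetIndex 0 version 0, then
input failed for the same pair) leaves the tracker not ready until a newer
version arrives. This only keeps the local state consistent; it does not undo a
readiness signal that has already been sent.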