If I understand correctly, on InputReceived you’ll be accumulating batches
until you have enough to compute the next output? In that case, you have two
options: you can either just immediately compute it using the same thread, or
call the schedule_callback directly (not using the scheduler). I think your
pseudocode is correct - since whether or not you can output the next batch can
only change on InputReceived, that’s the only spot you need to check. I think
an elaboration of your pseudocode could be something like:
Status InputReceived(Batch b) {
  std::vector<Batch> batches;
  {
    std::lock_guard<std::mutex> guard(accum_lock);
    accum.push_back(std::move(b));
    if (enough_inputs)
      batches.swap(accum);  // take ownership of the group, emptying accum
  }
  // Compute outside the lock so concurrent InputReceived calls aren't blocked
  if (!batches.empty())
    compute_next_output(std::move(batches));
  return Status::OK();
}
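To check that this locking discipline really gives exactly-once output computation, here is a self-contained toy version you can exercise under threads. Batch, Accumulator, and the threshold are stand-ins for illustration, not Arrow types:

```cpp
#include <atomic>
#include <cassert>
#include <mutex>
#include <thread>
#include <vector>

// Stand-in for Arrow's ExecBatch; the names below are illustrative only.
using Batch = int;

class Accumulator {
 public:
  explicit Accumulator(size_t threshold) : threshold_(threshold) {}

  // Called concurrently by producers. The caller whose batch crosses the
  // threshold takes ownership of the accumulated group under the lock and
  // computes the output after releasing it, so exactly one thread computes
  // each output and no thread blocks on the computation itself.
  void InputReceived(Batch b) {
    std::vector<Batch> ready;
    {
      std::lock_guard<std::mutex> guard(mutex_);
      accum_.push_back(b);
      if (accum_.size() >= threshold_) ready.swap(accum_);
    }
    if (!ready.empty()) ComputeNextOutput(std::move(ready));
  }

  int outputs_computed() const { return outputs_.load(); }

 private:
  void ComputeNextOutput(std::vector<Batch> batches) {
    // Placeholder for the real work; runs without holding the lock.
    outputs_.fetch_add(1);
  }

  size_t threshold_;
  std::vector<Batch> accum_;
  std::mutex mutex_;
  std::atomic<int> outputs_{0};
};
```

With a threshold of 8 and 4 threads pushing 8 batches each, the size check happens under the lock after every push, so the accumulator is drained exactly at each multiple of 8 and exactly 4 outputs are computed.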
Sasha
> On Apr 25, 2022, at 3:29 PM, Li Jin <[email protected]> wrote:
>
> Thanks! That's super helpful.
>
> A follow up question on TaskScheduler - What's the correct way to define a
> task that "do work if input batches are ready, otherwise try later"?
>
> Something like:
>
> Status try_process():
>     if enough_inputs_to_produce_next_output:
>         compute_and_produce_next_output()
>         return Status::OK()
>     else:
>         # Is this right?
>         # Exit and try later
>         return Status::OK()
>
> If I register this function with TaskScheduler, I think it only gets run
> once, so I think I might need to schedule the next task when inputs are not
> ready but I am not sure of the best way to do that. Any suggestions?
>
> Li
>
> On Mon, Apr 25, 2022 at 6:18 PM Sasha Krassovsky <[email protected]> wrote:
>
>> Hi Li,
>> I’ll answer the questions in order:
>>
>> 1. Your guess is correct! The Hash Join may be used standalone (mostly in
>> testing or benchmarking for now) or as part of the ExecNode. The ExecNode
>> will pass the task to the Executor to be scheduled, or will run it
>> immediately if it’s in sync mode (i.e. no executor). Our Hash Join
>> benchmark uses OpenMP to schedule things, and passes a lambda that does
>> OpenMP things to the HashJoin.
>>
>> 2. We might not have an executor if we want to execute synchronously. This
>> is set during construction of the ExecContext, which is given to the
>> ExecPlan during creation. If the ExecContext has a nullptr Executor, then
>> we are in sync mode, otherwise we use the Executor to schedule. One
>> confusing thing is that we also have a SerialExecutor - I’m actually not
>> quite sure what the difference between using that and setting the Executor
>> to nullptr is (might have something to do with testing?). @Weston probably
>> knows
>>
>> 3. You can think of the TaskGroup as a “parallel for loop”. TaskImpl is
>> the function that implements the work that needs to be split up,
>> TaskGroupContinuationImpl is what gets run after the for loop. TaskImpl
>> will receive the index of the task. If you’re familiar with OpenMP, it’s
>> equivalent to this:
>>
>> #pragma omp parallel for
>> for (int i = 0; i < 100; i++)
>>     TaskImpl(omp_get_thread_num(), i);
>> TaskGroupContinuationImpl();
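To make that parallel-for analogy concrete without OpenMP, here is a minimal stand-in using plain std::thread. The function and its names are illustrative, not the Arrow API: each task receives its index, and the continuation runs exactly once after all tasks finish.

```cpp
#include <atomic>
#include <cassert>
#include <functional>
#include <thread>
#include <vector>

// A toy "task group": run task_impl(i) for each i in [0, num_tasks) on its
// own thread, then run continuation once after every task has finished.
// This mirrors only the TaskImpl / TaskGroupContinuationImpl split; the
// real TaskScheduler also tracks thread indices, aborts, and resumption.
void RunTaskGroup(int num_tasks,
                  const std::function<void(int)>& task_impl,
                  const std::function<void()>& continuation) {
  std::vector<std::thread> threads;
  threads.reserve(num_tasks);
  for (int i = 0; i < num_tasks; ++i)
    threads.emplace_back([&task_impl, i] { task_impl(i); });
  for (auto& t : threads) t.join();  // barrier: all tasks done
  continuation();                    // the "after the parallel for" hook
}
```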
>>
>> Examples of the two are here:
>>
>> https://github.com/apache/arrow/blob/5a5d92928ccd438edf7ced8eae449fad05a7e71f/cpp/src/arrow/compute/exec/hash_join.cc#L416
>>
>> https://github.com/apache/arrow/blob/5a5d92928ccd438edf7ced8eae449fad05a7e71f/cpp/src/arrow/compute/exec/hash_join.cc#L458
>>
>> Sasha
>>
>>> On Apr 25, 2022, at 8:35 AM, Li Jin <[email protected]> wrote:
>>>
>>> Hello!
>>>
>>> I am reading the use of TaskScheduler inside C++ compute code (reading
>> hash
>>> join) and have some questions about it, in particular:
>>>
>>> (1) What's the purpose of the SchedulerTaskCallback defined here:
>>>
>> https://github.com/apache/arrow/blob/5a5d92928ccd438edf7ced8eae449fad05a7e71f/cpp/src/arrow/compute/exec/hash_join_node.cc#L428
>>> (My guess is that the caller of TaskScheduler::StartTaskGroup needs to
>>> provide an implementation of a task executor, and the implementation of
>>> SchedulerTaskCallback inside hash_join_node.cc is just a vanilla
>>> implementation)
>>>
>>> (2) When would this task context not have an executor?
>>>
>> https://github.com/apache/arrow/blob/master/cpp/src/arrow/compute/exec/hash_join_node.cc#L581
>>>
>>> (3) What's the difference between TaskImpl and TaskGroupContinuationImpl
>> in
>>> TaskScheduler::RegisterTaskGroup? And how would one normally define
>>> TaskGroupContinuationImpl?
>>>
>>> Sorry I am still learning the Arrow compute internals and appreciate help
>>> on understanding these.
>>>
>>> Li