I would advise against relying on any specific ordering of batches coming in. 
When you want to scale up to multiple threads, you will no longer be able to 
rely on any order because scheduling is generally pretty nondeterministic. I 
would suggest accumulating all batches just like in Hash Join, and when 
InputFinished is called, sort them. I’d even suggest not relying on input 
batches being sorted within themselves, so you’ll have to implement a mini 
“sort node” (if you don’t have some other fancier data structure for this 
join). The accumulation itself shouldn’t hurt performance either: the 
threads that would otherwise be processing the join will keep processing 
its inputs, so overall throughput shouldn’t be affected. 
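A minimal sketch of this accumulate-then-sort pattern (the `Batch` type, the `AsofAccumulator` class, and its member names are placeholders for illustration, not the actual Arrow classes):

```cpp
#include <algorithm>
#include <mutex>
#include <vector>

// Placeholder batch type standing in for Arrow's ExecBatch.
struct Batch {
  long min_time;  // assume each batch knows its minimum timestamp
};

// Sketch of one input side of the join node.
class AsofAccumulator {
 public:
  // May be called concurrently by upstream threads: just buffer the batch.
  void InputReceived(Batch batch) {
    std::lock_guard<std::mutex> guard(mutex_);
    accumulated_.push_back(batch);
  }

  // Called once all batches have arrived: sort them so the join can assume
  // global time order regardless of arrival order.
  void InputFinished() {
    std::sort(accumulated_.begin(), accumulated_.end(),
              [](const Batch& a, const Batch& b) {
                return a.min_time < b.min_time;
              });
    // ...kick off a task group over accumulated_ here...
  }

  const std::vector<Batch>& batches() const { return accumulated_; }

 private:
  std::mutex mutex_;
  std::vector<Batch> accumulated_;
};
```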

After the sorting, you can kick off a task group that will compute the results. 
One thing you’ll have to experiment with is how many tasks to start: one for 
each pair of batches, or one for each left-side batch, or one for each 
right-side batch. If it’s the first, it may be useful to extend the TaskGroup 
interface to allow for two-dimensional task groups (so that it would be easier 
to start a task for each pair).  
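Until such a two-dimensional extension exists, one way to approximate it with the current one-dimensional task ids is to flatten the pair index (a sketch; `PairToTaskId` and `TaskIdToPair` are hypothetical helpers, not part of the TaskScheduler API):

```cpp
#include <cstddef>
#include <utility>

// Flatten a (left batch, right batch) pair into the single task id that a
// one-dimensional task group expects. num_right is the number of
// right-side batches.
inline std::size_t PairToTaskId(std::size_t left, std::size_t right,
                                std::size_t num_right) {
  return left * num_right + right;
}

// Recover the pair inside TaskImpl from the task id.
inline std::pair<std::size_t, std::size_t> TaskIdToPair(std::size_t task_id,
                                                        std::size_t num_right) {
  return {task_id / num_right, task_id % num_right};
}
```

The task group is then registered with `num_left * num_right` tasks, and each task decodes its own pair.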

Sasha

> On Apr 26, 2022, at 11:03 AM, Li Jin <ice.xell...@gmail.com> wrote:
> 
>> In order to produce an output for a left batch, I would need to wait until
> I received enough batches from the right tables to cover all potential
> matches (wait until I have seen right timestamps outside the matching range)
> To add a bit more explanation: let's say the time range of the current left
> batch is (20200101, 20200201). In order to produce the join result for this
> batch, I need to receive all data from the right tables in (-inf,
> 20200201), and I know I have it once I have seen data from all right tables
> with timestamps after 20200201 (since data arrives in time order). But
> I do not need to receive all the data before producing an output batch
> (unlike hash join).
> 
> On Tue, Apr 26, 2022 at 1:59 PM Li Jin <ice.xell...@gmail.com> wrote:
> 
>> Thanks both for the reply. To add a bit more context, I am trying to
>> implement an "asof join". Here I have one left table and n right tables, and
>> all batches arrive in time order.
>> 
>> In order to produce an output for a left batch, I would need to wait until
>> I received enough batches from the right tables to cover all potential
>> matches (wait until I have seen right timestamps outside the matching range)
>> 
>> From both replies it sounds like I should just check in InputReceived
>> whether I have received enough data, and do the work there when I have
>> enough.
>> 
>> However, one thing that I am not sure about is how the
>> parallelization comes into play - it sounds like InputReceived could be
>> called by multiple threads of the same input node for different batches?
>> Currently I am just trying to get a baseline implementation that has one
>> thread doing the join, so if InputReceived is called by multiple threads I
>> might end up blocking other threads unnecessarily.
>> 
>> If I had a dedicated thread/executor that does the join, and InputReceived
>> just queued the batches and returned immediately, I feel it would be more
>> efficient.
>> 
>> Thoughts?
>> 
>> Thanks,
>> Li
>> 
>> On Mon, Apr 25, 2022 at 6:41 PM Sasha Krassovsky <
>> krassovskysa...@gmail.com> wrote:
>> 
>>> If I understand correctly, on InputReceived you’ll be accumulating
>>> batches until you have enough to compute the next output? In that case, you
>>> have two options: you can either just immediately compute it using the same
>>> thread, or call the schedule_callback directly (not using the scheduler). I
>>> think your pseudocode is correct - since whether or not you can output the
>>> next batch can only change on InputReceived, that’s the only spot you need
>>> to check. I think an elaboration of your pseudocode could be something like:
>>> 
>>> Status InputReceived(Batch b) {
>>>   std::unique_lock<std::mutex> lock(accum_lock);
>>>   accum.push_back(std::move(b));
>>>   if (enough_inputs) {
>>>     std::vector<Batch> batches = std::move(accum);
>>>     lock.unlock();  // don't hold the lock while computing
>>>     compute_next_output(batches);
>>>   }
>>>   return Status::OK();  // lock (if still held) released on return
>>> }
>>> 
>>> Sasha
>>> 
>>>> On Apr 25, 2022, at 3:29 PM, Li Jin <ice.xell...@gmail.com> wrote:
>>>> 
>>>> Thanks! That's super helpful.
>>>> 
>>>> A follow up question on TaskScheduler - What's the correct way to
>>> define a
>>>> task that "do work if input batches are ready, otherwise try later"?
>>>> 
>>>> Something like:
>>>> 
>>>> Status try_process():
>>>>     if enough_inputs_to_produce_next_output:
>>>>         compute_and_produce_next_output()
>>>>         return Status::OK()
>>>>     else:
>>>>         # Is this right?
>>>>         # Exit and try later
>>>>         return Status::OK()
>>>> 
>>>> If I register this function with TaskScheduler, I think it only gets run
>>>> once, so I think I might need to schedule the next task when inputs are
>>> not
>>>> ready but I am not sure of the best way to do that. Any suggestions?
>>>> 
>>>> Li
>>>> 
>>>> On Mon, Apr 25, 2022 at 6:18 PM Sasha Krassovsky <
>>> krassovskysa...@gmail.com <mailto:krassovskysa...@gmail.com>>
>>>> wrote:
>>>> 
>>>>> Hi Li,
>>>>> I’ll answer the questions in order:
>>>>> 
>>>>> 1. Your guess is correct! The Hash Join may be used standalone (mostly
>>> in
>>>>> testing or benchmarking for now) or as part of the ExecNode. The
>>> ExecNode
>>>>> will pass the task to the Executor to be scheduled, or will run it
>>>>> immediately if it’s in sync mode (i.e. no executor). Our Hash Join
>>>>> benchmark uses OpenMP to schedule things, and passes a lambda that does
>>>>> OpenMP things to the HashJoin.
>>>>> 
>>>>> 2. We might not have an executor if we want to execute synchronously.
>>> This
>>>>> is set during construction of the ExecContext, which is given to the
>>>>> ExecPlan during creation. If the ExecContext has a nullptr Executor,
>>> then
>>>>> we are in async mode, otherwise we use the Executor to schedule. One
>>>>> confusing thing is that we also have a SerialExecutor - I’m actually
>>> not
>>>>> quite sure what the difference between using that and setting the
>>> Executor
>>>>> to nullptr is (might have something to do with testing?). @Weston
>>> probably
>>>>> knows
>>>>> 
>>>>> 3. You can think of the TaskGroup as a “parallel for loop”. TaskImpl is
>>>>> the function that implements the work that needs to be split up,
>>>>> TaskGroupContinuationImpl is what gets run after the for loop. TaskImpl
>>>>> will receive the index of the task. If you’re familiar with OpenMP,
>>> it’s
>>>>> equivalent to this:
>>>>> 
>>>>> #pragma omp parallel for
>>>>> for (int i = 0; i < 100; i++)
>>>>>     TaskImpl(omp_get_thread_num(), i);
>>>>> TaskGroupContinuationImpl();
>>>>> 
>>>>> Examples of the two are here:
>>>>> 
>>>>> 
>>>>> https://github.com/apache/arrow/blob/5a5d92928ccd438edf7ced8eae449fad05a7e71f/cpp/src/arrow/compute/exec/hash_join.cc#L416
>>>>> 
>>>>> https://github.com/apache/arrow/blob/5a5d92928ccd438edf7ced8eae449fad05a7e71f/cpp/src/arrow/compute/exec/hash_join.cc#L458
>>>>> 
>>>>> Sasha
>>>>> 
>>>>>> On Apr 25, 2022, at 8:35 AM, Li Jin <ice.xell...@gmail.com> wrote:
>>>>>> 
>>>>>> Hello!
>>>>>> 
>>>>>> I am reading the use of TaskScheduler inside C++ compute code (reading
>>>>> hash
>>>>>> join) and have some questions about it, in particular:
>>>>>> 
>>>>>> (1) What's the purpose of SchedulerTaskCallback defined here:
>>>>>> 
>>>>> 
>>> https://github.com/apache/arrow/blob/5a5d92928ccd438edf7ced8eae449fad05a7e71f/cpp/src/arrow/compute/exec/hash_join_node.cc#L428
>>>>>> (My guess is that the caller of TaskScheduler::StartTaskGroup needs to
>>>>>> provide an implementation of a task executor, and the implementation
>>> of
>>>>>> SchedulerTaskCallback inside hash_join_node.cc is just a vanilla
>>>>>> implementation)
>>>>>> 
>>>>>> (2) When would this task context not have an executor?
>>>>>> 
>>>>> 
>>> https://github.com/apache/arrow/blob/master/cpp/src/arrow/compute/exec/hash_join_node.cc#L581
>>>>>> 
>>>>>> (3) What's the difference between TaskImpl and
>>> TaskGroupContinuationImpl
>>>>> in
>>>>>> TaskScheduler::RegisterTaskGroup? And how would one normally define
>>>>>> TaskGroupContinuationImpl?
>>>>>> 
>>>>>> Sorry I am still learning the Arrow compute internals and appreciate
>>> help
>>>>>> on understanding these.
>>>>>> 
>>>>>> Li
>>> 
>>> 
