Re: Question on GroupBy query results merging process

2018-07-21 Thread Jihoon Son
Hi Jisoo,

I think it would work, but there is currently at least one reason not to
use queryRunnerFactory.mergeRunners() in groupBy v2.

In v2, the broker assumes that intermediate aggregates from historicals are
always sorted by grouping keys. This enables merge-sorted aggregation on
brokers, which is much more efficient than hash aggregation in terms of
both speed and memory usage. However, queryRunnerFactory.mergeRunners()
is based on hash aggregation. This would cause the same issue as groupBy
v1, which requires the full materialization of intermediates on brokers
(either in memory or on disk). Also, you would need to sort the result
of queryRunnerFactory.mergeRunners() before calling
queryToolChest.mergeResults(), since it assumes the input is sorted
by grouping keys.
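
To illustrate the difference between the two strategies, here is a minimal, self-contained sketch. It is not Druid's code: the Row and Cursor types and both method names are hypothetical stand-ins, and the only aggregator is a sum. It assumes every input list is already sorted by key, which is the property the broker relies on in v2.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

// Hypothetical stand-in types; none of this is Druid's actual code.
final class MergeVsHashSketch {

    /** One intermediate row: a grouping key and a partial sum. */
    static final class Row {
        final String key;
        final long sum;

        Row(String key, long sum) {
            this.key = key;
            this.sum = sum;
        }
    }

    /** Read position within one input that is already sorted by key. */
    private static final class Cursor {
        final List<Row> rows;
        int pos = 0;

        Cursor(List<Row> rows) {
            this.rows = rows;
        }

        Row current() {
            return rows.get(pos);
        }

        boolean advance() {
            return ++pos < rows.size();
        }
    }

    /**
     * Merge-sorted aggregation: a k-way merge over key-sorted inputs.
     * Working state is one cursor per input plus one running aggregate,
     * and a group is complete as soon as a larger key shows up.
     */
    static List<Row> mergeSorted(List<List<Row>> sortedInputs) {
        PriorityQueue<Cursor> heap = new PriorityQueue<>(
            (a, b) -> a.current().key.compareTo(b.current().key));
        for (List<Row> input : sortedInputs) {
            if (!input.isEmpty()) {
                heap.add(new Cursor(input));
            }
        }
        List<Row> out = new ArrayList<>();
        String currentKey = null;
        long currentSum = 0;
        while (!heap.isEmpty()) {
            Cursor c = heap.poll();
            Row r = c.current();
            if (currentKey != null && !currentKey.equals(r.key)) {
                out.add(new Row(currentKey, currentSum)); // previous group is complete
                currentSum = 0;
            }
            currentKey = r.key;
            currentSum += r.sum;
            if (c.advance()) {
                heap.add(c); // re-insert with its next row as the sort key
            }
        }
        if (currentKey != null) {
            out.add(new Row(currentKey, currentSum));
        }
        return out;
    }

    /**
     * Hash aggregation: every distinct key stays in the table until all
     * inputs are consumed, and the result has no particular order.
     */
    static Map<String, Long> hashAggregate(List<List<Row>> inputs) {
        Map<String, Long> table = new HashMap<>();
        for (List<Row> input : inputs) {
            for (Row r : input) {
                table.merge(r.key, r.sum, Long::sum);
            }
        }
        return table;
    }
}
```

In this sketch the merge variant could hand each finished group downstream right away, while the hash variant has to hold all groups and would still need a sort before any step that expects key order.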

> I am wondering if the improvement that I gained from changing the logic
> came mainly from deserializing the sub-query results in parallel (by
> calling queryRunnerFactory.mergeRunners(), which seems to enable
> parallelism), or if it also benefited from using
> GroupByMergingQueryRunnerV2, which has parallel combining threads enabled.

I assume you enabled parallel combining in GroupByMergingQueryRunnerV2
(it's disabled by default). In that case, it's difficult to tell where the
benefit came from. You might need to run more benchmarks to figure it out.

Best,
Jihoon


Re: Question on GroupBy query results merging process

2018-07-19 Thread Jisoo Kim
Hi Jihoon,

Thanks for the reply. So what I ended up doing for merging a list of
serialized result Sequences (each of which is a byte array) was:

1) Create a stream of the serialized sequences (byte arrays) out of the list
2) For each serialized sequence in the list, create a query runner that
deserializes the byte array and returns a Sequence (along with applying
PreComputeManipulatorFn). Now the stream becomes a stream of query runners
3) Call queryRunnerFactory.mergeRunners() (the factory is created from the
injector and the given query) on the materialized list of QueryRunners
4) Create a FluentQueryRunner out of 3) and add the necessary steps, including
mergeResults(), which essentially calls queryToolChest.mergeResults() on
queryRunnerFactory.mergeRunners()
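
The shape of steps 1) to 3) can be sketched in plain Java as follows. This is only an illustration under stated assumptions: Runner is a hypothetical stand-in for a query runner, deserialize() pretends that rows are newline-separated UTF-8 text, and runAll() uses a plain thread pool rather than anything from druid-processing. It shows where parallel deserialization could come from when each runner is executed on its own thread.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical stand-ins; these are not Druid's QueryRunner or Sequence types.
final class ParallelDeserializeSketch {

    /** Stand-in for a query runner: produces its rows when run. */
    interface Runner {
        List<String> run();
    }

    /** Assumed wire format for the sketch: newline-separated UTF-8 rows. */
    static List<String> deserialize(byte[] bytes) {
        String text = new String(bytes, StandardCharsets.UTF_8);
        List<String> rows = new ArrayList<>();
        for (String line : text.split("\n")) {
            if (!line.isEmpty()) {
                rows.add(line);
            }
        }
        return rows;
    }

    /** Steps 1) and 2): wrap each serialized result in a lazy runner. */
    static List<Runner> toRunners(List<byte[]> serialized) {
        List<Runner> runners = new ArrayList<>();
        for (byte[] bytes : serialized) {
            runners.add(() -> deserialize(bytes)); // nothing is parsed yet
        }
        return runners;
    }

    /** Rough analogue of step 3): run every runner on a pool, keep input order. */
    static List<List<String>> runAll(List<Runner> runners, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<List<String>>> futures = new ArrayList<>();
            for (Runner r : runners) {
                Callable<List<String>> task = r::run;
                futures.add(pool.submit(task)); // deserialization happens on pool threads
            }
            List<List<String>> results = new ArrayList<>();
            for (Future<List<String>> f : futures) {
                results.add(f.get());
            }
            return results;
        } catch (Exception e) {
            // InterruptedException or ExecutionException; wrapped to keep the sketch short
            throw new RuntimeException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```

Timing runAll() with one thread against several threads on the same inputs would be one rough way to see how much of a speedup comes from parallel deserialization alone.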

Does my approach look valid, or is it something that I shouldn't be doing
for merging query results? Before I changed the merging logic to the above,
I encountered a problem with merging sub-query results properly for very
heavy groupBy queries.

I haven't had much chance to read through all the groupBy query processing
logic, but I am wondering if the improvement that I gained from changing
the logic came mainly from deserializing the sub-query results in
parallel (by calling queryRunnerFactory.mergeRunners(), which seems to
enable parallelism), or if it also benefited from using
GroupByMergingQueryRunnerV2, which has parallel combining threads enabled.

Thanks,
Jisoo


Re: Question on GroupBy query results merging process

2018-07-19 Thread Jihoon Son
Hi Jisoo,

Sorry, the previous email was sent by accident.

The initial version of groupBy v2 wasn't capable of combining intermediates
in parallel. Some of our customers ran into an issue similar to yours, so I
was working on improving groupBy v2 performance for a while.

Parallel combining on brokers definitely makes sense. I was thinking of adding
a sort of ParallelMergeSequence, which would be a parallel version of
MergeSequence, but it can be anything as long as it supports parallel
combining on brokers.
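
As a rough picture of what parallel combining of sorted inputs could look like, here is a minimal sketch. It is not a design for ParallelMergeSequence and uses no Druid classes: Row, mergeTwo() and combine() are hypothetical names, the only aggregator is a sum, and each input is assumed to be sorted by key with at most one row per key. Unlike a lazy Sequence, it materializes every intermediate list, which a real implementation would want to avoid.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch; not Druid's MergeSequence or any proposed replacement.
final class ParallelCombineSketch {

    /** One intermediate row: a grouping key and a partial sum. */
    static final class Row {
        final String key;
        final long sum;

        Row(String key, long sum) {
            this.key = key;
            this.sum = sum;
        }
    }

    /** Merges two key-sorted lists, summing rows that share a key. */
    static List<Row> mergeTwo(List<Row> left, List<Row> right) {
        List<Row> out = new ArrayList<>();
        int i = 0;
        int j = 0;
        while (i < left.size() && j < right.size()) {
            Row l = left.get(i);
            Row r = right.get(j);
            int cmp = l.key.compareTo(r.key);
            if (cmp < 0) {
                out.add(l);
                i++;
            } else if (cmp > 0) {
                out.add(r);
                j++;
            } else {
                out.add(new Row(l.key, l.sum + r.sum));
                i++;
                j++;
            }
        }
        while (i < left.size()) {
            out.add(left.get(i++));
        }
        while (j < right.size()) {
            out.add(right.get(j++));
        }
        return out;
    }

    /** Tree-style combine: each round merges pairs on the pool until one list is left. */
    static List<Row> combine(List<List<Row>> sortedInputs, int threads) {
        if (sortedInputs.isEmpty()) {
            return new ArrayList<>();
        }
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<List<Row>> level = new ArrayList<>(sortedInputs);
            while (level.size() > 1) {
                List<Future<List<Row>>> futures = new ArrayList<>();
                for (int i = 0; i + 1 < level.size(); i += 2) {
                    final List<Row> a = level.get(i);
                    final List<Row> b = level.get(i + 1);
                    Callable<List<Row>> task = () -> mergeTwo(a, b);
                    futures.add(pool.submit(task)); // pairs in a round merge concurrently
                }
                List<List<Row>> next = new ArrayList<>();
                for (Future<List<Row>> f : futures) {
                    next.add(f.get());
                }
                if (level.size() % 2 == 1) {
                    next.add(level.get(level.size() - 1)); // odd one out moves up unchanged
                }
                level = next;
            }
            return level.get(0);
        } catch (Exception e) {
            // InterruptedException or ExecutionException; wrapped to keep the sketch short
            throw new RuntimeException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```

Because every merge keeps its output sorted by key, the final list in this sketch is still in the order that a downstream merge step expecting sorted input would need.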

One thing I'm worried about is that most query processing interfaces in
brokers use Sequence, so using something else for a specific
query type might complicate the code. I think we should avoid that if
possible.

Best,
Jihoon


Re: Question on GroupBy query results merging process

2018-07-19 Thread Jihoon Son
Hi Jisoo,

the initial version of groupBy v2


Question on GroupBy query results merging process

2018-07-19 Thread Jisoo Kim
Hi all,

I am currently working on a project that uses Druid's QueryRunner and other
druid-processing classes. It uses Druid's own classes to calculate query
results. I have been testing large GroupBy queries (using v2), and it seems
like parallel combining threads for GroupBy queries are only enabled on the
historical level. I think it is only getting called by
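GroupByStrategyV2.mergeRunners()
<https://github.com/apache/incubator-druid/blob/druid-0.12.1/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java#L335>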
which is only called by GroupByQueryRunnerFactory.mergeRunners() on
historicals.

Are GroupByMergingQueryRunnerV2 and parallel combining threads meant for
computing and merging per-segment results only, or can they also be used on
the broker level? I changed the logic of my project from calling
queryToolChest.mergeResults() on MergeSequence (created by providing a list
of per-segment/per-server sequences) to calling
queryToolChest.mergeResults() on queryRunnerFactory.mergeRunners() (where
each runner returns a deserialized result sequence), and that seemed to
reduce the computation time and failures of really heavy groupBy queries by
quite a lot. Or is this just a coincidence, and there shouldn't be a
performance difference in merging groupBy query results, so that the only
difference could have come from parallelizing the deserialization of result
sequences from sub-queries?

Thanks,
Jisoo