Pushing down Joins, Aggregates and filters, and data distribution questions

2017-06-01 Thread Muhammad Gelbana
First of all, I was very happy to at last attend the hangouts meeting; I've
been trying to do so for quite some time.

I know I confused most of you during the meeting, but that's because my
requirements aren't crystal clear at the moment and I'm still learning what
Drill can do. Hopefully I'll learn enough to be confident about the options
I have when I need to make implementation decisions.

Now to the point; let me restate my case.

We have a proprietary datasource that can perform limits, aggregations,
filters and joins very fast. This datasource can handle SQL queries, but not
all possible SQL syntax. I've been successful, so far, in pushing down joins,
filters and limits, but I'm still struggling with aggregates. I've sent an
email about aggregates to Calcite's mailing list.

The amount of data this datasource may be required to process can reach
billions of records and hundreds of GBs. So we are looking to distribute
this data among multiple servers to overcome storage limitations and
maximize throughput.

This distribution can be just duplicating the data to maximize throughput,
so each server will have the same set of data, *or* records may be
distributed among different servers, without duplication, because a single
server may not be able to hold all the data. So some tables may be
duplicated and some tables may be distributed among servers. Let's assume
that the distribution details of each table are available to the plugin.

Now I understand that for Drill to execute a query, it supports a set of
physical operators. The logic/code of these operators is generated at
runtime and distributed among the Drill cluster to be compiled and executed.

So to scan a table distributed/duplicated among 3 servers, I may want to
configure Drill to execute *SELECT * FROM TABLE* either by running the same
query with an extra filter against each server (to read/scan a specific
portion of the table if the table is duplicated, to maximize throughput), or
by running the query without modifications but having Drill run it multiple
times, once against each server. I assume this can be done by having the
table's *GroupScan* return 3 different *SubScan*s when
*GroupScan.getSpecificScan(int)* is called multiple times, with different
parameters of course.

These different parameters can be controlled by the output of
*GroupScan.getMinParallelizationWidth()* and
*GroupScan.getMaxParallelizationWidth()*, correct?
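
To make that concrete, here is a minimal Java sketch of the idea, assuming a
hypothetical plugin whose table lives on three servers. The class, field and
column names are invented and it does not implement Drill's real GroupScan
interface; it only mirrors the three GroupScan methods named above:

import java.util.List;

// Hypothetical stand-in for a Drill SubScan; names are illustrative only.
class ServerSubScan {
  final String serverAddress;  // backend server this sub-scan reads from
  final String extraFilter;    // extra predicate when the table is duplicated on every server

  ServerSubScan(String serverAddress, String extraFilter) {
    this.serverAddress = serverAddress;
    this.extraFilter = extraFilter;
  }
}

// Sketch of a group scan over a table that is duplicated or sharded across servers.
class DistributedTableGroupScan {
  private final List<String> servers;   // e.g. the 3 servers holding the table
  private final boolean duplicated;     // true: every server holds the full table

  DistributedTableGroupScan(List<String> servers, boolean duplicated) {
    this.servers = servers;
    this.duplicated = duplicated;
  }

  // One sub-scan per server, so the planner may create up to servers.size() minor fragments.
  int getMaxParallelizationWidth() { return servers.size(); }
  int getMinParallelizationWidth() { return servers.size(); }

  // Called once per minor fragment; returns the sub-scan that fragment should execute.
  ServerSubScan getSpecificScan(int minorFragmentId) {
    String server = servers.get(minorFragmentId % servers.size());
    // Duplicated table: add a filter so each fragment reads a disjoint slice of the data.
    // Sharded table: each server already holds a distinct portion, so no extra filter is needed.
    String filter = duplicated
        ? "MOD(row_key, " + servers.size() + ") = " + minorFragmentId  // row_key is a made-up column
        : "";
    return new ServerSubScan(server, filter);
  }

  public static void main(String[] args) {
    DistributedTableGroupScan scan =
        new DistributedTableGroupScan(List.of("srv1", "srv2", "srv3"), true);
    for (int i = 0; i < scan.getMaxParallelizationWidth(); i++) {
      ServerSubScan sub = scan.getSpecificScan(i);
      System.out.println(sub.serverAddress + " -> " + sub.extraFilter);
    }
  }
}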

Please correct me if I'm wrong about anything.

Now assuming what I said is correct, I have a couple of questions:

   1. If I have multiple *SubScan*s to be executed, will each *SubScan* be
   handled by a single *Scan* operator? So whenever I have *n* *SubScan*s,
   I'll have *n* Scan operators distributed among Drill's cluster?
   2. How can I control the number of physical operators of each type per
   Drill cluster or node? For instance, what if I want fewer *Filter*
   operators or more *Scan* operators, how can I do that?

I'm aware that my distribution goal may not be as simple as I may have made
it sound.

Pardon me for the huge email and thanks a lot for your time.

*-*
*Muhammad Gelbana*
http://www.linkedin.com/in/mgelbana


Re: Partitioning for parquet

2017-06-01 Thread Raz Baluchi
I guess there is such a thing as over-partitioning...

The query on the table partitioned by date spends most of the elapsed time
in the 'planning' phase, with the execution time being roughly equal to that
of the table partitioned by year and month.

Based on these results, I've added a third table which is partitioned
simply by year. I've also added an ORDER BY to the CTAS in an attempt to
sort the table by date.

This third table seems to have the fastest query times so far, with the
least amount of 'planning'. My takeaway from this exercise is to limit
the partitioning to the minimum required to obtain parquet files in the
range of 100 MB or so. Is that a valid lesson learned?

On Thu, Jun 1, 2017 at 1:05 AM, Jinfeng Ni  wrote:

> You may want to check if query on the second table is slower because of
> planning time or execution time. That could be determined by looking at the
> query profile in web-UI.
>
> Two factors might impact the planning time for second table having 11837:
> 1. Reading parquet metadata from those parquet files.  Parquet metadata
> cache file might help for the cases of large number of small files.
> 2. Filter expression evaluation cost : query second would evaluate
> expression 11837 times, vs just 410 times for first table.
>
> In general, if you have 100M rows in 11837 files, ==> that's about 8500
> rows per file. Performance-wise, this does not seem to be a good choice for
> parquet format.
>
>
>
> On Wed, May 31, 2017 at 9:33 PM, Padma Penumarthy 
> wrote:
>
> > Are you running same query on both tables ? What is the filter condition
> ?
> > Since they are partitioned differently, same filter may prune the files
> > differently.
> > If possible, can you share query profiles ?
> > You can check query profiles to see how many rows are being read from
> disk
> > in both cases.
> >
> > Thanks,
> > Padma
> >
> >
> > > On May 31, 2017, at 6:15 PM, Raz Baluchi 
> wrote:
> > >
> > > As an experiment, I created an event file will 100 million entries
> > spanning
> > > 25 years. I then created tables both ways, one partitioned by year and
> > > month and the other by date. The first table created 410 parquet files
> > and
> > > the second 11837.
> > >
> > > Querying the first table is consistently faster by a factor of 2x to
> 10x,
> > >
> > > Is this because drill is not very efficient at querying a large number
> of
> > > small(ish) parquet files?
> > >
> > > On Wed, May 31, 2017 at 6:42 PM, rahul challapalli <
> > > challapallira...@gmail.com> wrote:
> > >
> > >> If most of your queries use date column in the filter condition, I
> would
> > >> partition the data on the date column. Then you can simply say
> > >>
> > >> select * from events where `date` between '2016-11-11' and
> '2017-01-23';
> > >>
> > >> - Rahul
> > >>
> > >> On Wed, May 31, 2017 at 3:22 PM, Raz Baluchi 
> > >> wrote:
> > >>
> > >>> So, if I understand you correctly, I would have to include the 'yr'
> and
> > >>> 'mnth' columns in addition to the 'date' column in the query?
> > >>>
> > >>> e.g.
> > >>>
> > >>> select * from events where yr in (2016, 2017)  and mnth in (11,12,1)
> > and
> > >>> date between '2016-11-11' and '2017-01-23';
> > >>>
> > >>> Is that correct?
> > >>>
> > >>> On Wed, May 31, 2017 at 4:49 PM, rahul challapalli <
> > >>> challapallira...@gmail.com> wrote:
> > >>>
> >  How to partition data is dependent on how you want to access your
> > data.
> > >>> If
> >  you can foresee that most of the queries use year and month, then
> > >>> go-ahead
> >  and partition the data on those 2 columns. You can do that like
> below
> > 
> >  create table partitioned_data partition by (yr, mnth) as select
> >  extract(year from `date`) yr, extract(month from `date`) mnth,
> `date`,
> >   from mydata;
> > 
> >  For partitioning to have any benefit, your queries should have
> filters
> > >> on
> >  month and year columns.
> > 
> >  - Rahul
> > 
> >  On Wed, May 31, 2017 at 1:28 PM, Raz Baluchi  >
> >  wrote:
> > 
> > > Hi all,
> > >
> > > Trying to understand parquet partitioning works.
> > >
> > > What is the recommended partitioning scheme for event data that
> will
> > >> be
> > > queried primarily by date. I assume that partitioning by year and
> > >> month
> > > would be optimal?
> > >
> > > Lets say I have data that looks like:
> > >
> > > application,status,date,message
> > > kafka,down,2017-03023 04:53,zookeeper is not available
> > >
> > >
> > > Would I have to create new columns for year and month?
> > >
> > > e.g.
> > > application,status,date,message,year,month
> > > kafka,down,2017-03023 04:53,zookeeper is not available,2017,03
> > >
> > > and then perform a CTAS using the year and month columns as the
> >  'partition
> > > by'?
> > >
> > > Thanks
> > >
> > 
> > >>>
> > >>
> >
> >
>


Re: Partitioning for parquet

2017-06-01 Thread Jinfeng Ni
Looks like the default parquet block size is 512MB [1]. There is an ongoing
patch which may put a single parquet block into a single file system
block [2].

In general, if you are doing pruning over a large number of small parquet
files, the filter evaluation during pruning may become a bottleneck. Unlike
filter evaluation at execution time, which uses run-time generated code and
is more efficient, filter evaluation at planning time uses an interpreter and
is less efficient. Partition pruning only makes sense when each partition
contains a large number of rows.

1. https://drill.apache.org/docs/parquet-format/#configuring-the-size-of-parquet-files
2. https://issues.apache.org/jira/browse/DRILL-5379


On Thu, Jun 1, 2017 at 6:55 AM, Raz Baluchi  wrote:

> I guess there is such a thing as over partitioning...
>
> The query on the table partitioned by date spends most of the elapsed time
> on the 'planning' phase, with the execution being roughly equal to the one
> on the table partitioned by year and month.
>
> Based on these results, I've added a third table which is partitioned
> simply by year. I've also added an ORDER BY to the CTAS in an attempt to
> sort the table by date.
>
> This third table seems to have the fastest query times so far with the
> least amount of 'planning'.  My take away from this exercise is to limit
> the partitioning to the minimum required to obtain parquet files in the
> range of 100 MB or so. Is that a valid lesson learned?
>
> On Thu, Jun 1, 2017 at 1:05 AM, Jinfeng Ni  wrote:
>
> > You may want to check if query on the second table is slower because of
> > planning time or execution time. That could be determined by looking at
> the
> > query profile in web-UI.
> >
> > Two factors might impact the planning time for second table having 11837:
> > 1. Reading parquet metadata from those parquet files.  Parquet metadata
> > cache file might help for the cases of large number of small files.
> > 2. Filter expression evaluation cost : query second would evaluate
> > expression 11837 times, vs just 410 times for first table.
> >
> > In general, if you have 100M rows in 11837 files, ==> that's about 8500
> > rows per file. Performance-wise, this does not seem to be a good choice
> for
> > parquet format.
> >
> >
> >
> > On Wed, May 31, 2017 at 9:33 PM, Padma Penumarthy 
> > wrote:
> >
> > > Are you running same query on both tables ? What is the filter
> condition
> > ?
> > > Since they are partitioned differently, same filter may prune the files
> > > differently.
> > > If possible, can you share query profiles ?
> > > You can check query profiles to see how many rows are being read from
> > disk
> > > in both cases.
> > >
> > > Thanks,
> > > Padma
> > >
> > >
> > > > On May 31, 2017, at 6:15 PM, Raz Baluchi 
> > wrote:
> > > >
> > > > As an experiment, I created an event file will 100 million entries
> > > spanning
> > > > 25 years. I then created tables both ways, one partitioned by year
> and
> > > > month and the other by date. The first table created 410 parquet
> files
> > > and
> > > > the second 11837.
> > > >
> > > > Querying the first table is consistently faster by a factor of 2x to
> > 10x,
> > > >
> > > > Is this because drill is not very efficient at querying a large
> number
> > of
> > > > small(ish) parquet files?
> > > >
> > > > On Wed, May 31, 2017 at 6:42 PM, rahul challapalli <
> > > > challapallira...@gmail.com> wrote:
> > > >
> > > >> If most of your queries use date column in the filter condition, I
> > would
> > > >> partition the data on the date column. Then you can simply say
> > > >>
> > > >> select * from events where `date` between '2016-11-11' and
> > '2017-01-23';
> > > >>
> > > >> - Rahul
> > > >>
> > > >> On Wed, May 31, 2017 at 3:22 PM, Raz Baluchi  >
> > > >> wrote:
> > > >>
> > > >>> So, if I understand you correctly, I would have to include the 'yr'
> > and
> > > >>> 'mnth' columns in addition to the 'date' column in the query?
> > > >>>
> > > >>> e.g.
> > > >>>
> > > >>> select * from events where yr in (2016, 2017)  and mnth in
> (11,12,1)
> > > and
> > > >>> date between '2016-11-11' and '2017-01-23';
> > > >>>
> > > >>> Is that correct?
> > > >>>
> > > >>> On Wed, May 31, 2017 at 4:49 PM, rahul challapalli <
> > > >>> challapallira...@gmail.com> wrote:
> > > >>>
> > >  How to partition data is dependent on how you want to access your
> > > data.
> > > >>> If
> > >  you can foresee that most of the queries use year and month, then
> > > >>> go-ahead
> > >  and partition the data on those 2 columns. You can do that like
> > below
> > > 
> > >  create table partitioned_data partition by (yr, mnth) as select
> > >  extract(year from `date`) yr, extract(month from `date`) mnth,
> > `date`,
> > >   from mydata;
> > > 
> > >  For partitioning to have any benefit, your queries should have
> > filters
> > > >> on
> > >  month and year columns.
> > > 
> > >  - Rahul
> > > 
> > >  On Wed, May 3

Re: Partitioning for parquet

2017-06-01 Thread Andries Engelbrecht
Sorting the data by the partition column in the CTAS is normally a good plan:
not only does it sort the output by the most likely filter column, it also
limits the number of parquet files, since the data is written as a single
stream per partition. Without the sort operator, Drill writes data per
fragment per partition.

And as Jinfeng mentioned, metadata caching is very helpful on large data sets.

There is some info available on partition strategies for Drill on parquet to 
optimize performance.

--Andries


On 6/1/17, 6:55 AM, "yousef.l...@gmail.com on behalf of Raz Baluchi" 
 wrote:

I guess there is such a thing as over partitioning...

The query on the table partitioned by date spends most of the elapsed time
on the 'planning' phase, with the execution being roughly equal to the one
on the table partitioned by year and month.

Based on these results, I've added a third table which is partitioned
simply by year. I've also added an ORDER BY to the CTAS in an attempt to
sort the table by date.

This third table seems to have the fastest query times so far with the
least amount of 'planning'.  My take away from this exercise is to limit
the partitioning to the minimum required to obtain parquet files in the
range of 100 MB or so. Is that a valid lesson learned?

On Thu, Jun 1, 2017 at 1:05 AM, Jinfeng Ni  wrote:

> You may want to check if query on the second table is slower because of
> planning time or execution time. That could be determined by looking at 
the
> query profile in web-UI.
>
> Two factors might impact the planning time for second table having 11837:
> 1. Reading parquet metadata from those parquet files.  Parquet metadata
> cache file might help for the cases of large number of small files.
> 2. Filter expression evaluation cost : query second would evaluate
> expression 11837 times, vs just 410 times for first table.
>
> In general, if you have 100M rows in 11837 files, ==> that's about 8500
> rows per file. Performance-wise, this does not seem to be a good choice 
for
> parquet format.
>
>
>
> On Wed, May 31, 2017 at 9:33 PM, Padma Penumarthy 
> wrote:
>
> > Are you running same query on both tables ? What is the filter condition
> ?
> > Since they are partitioned differently, same filter may prune the files
> > differently.
> > If possible, can you share query profiles ?
> > You can check query profiles to see how many rows are being read from
> disk
> > in both cases.
> >
> > Thanks,
> > Padma
> >
> >
> > > On May 31, 2017, at 6:15 PM, Raz Baluchi 
> wrote:
> > >
> > > As an experiment, I created an event file will 100 million entries
> > spanning
> > > 25 years. I then created tables both ways, one partitioned by year and
> > > month and the other by date. The first table created 410 parquet files
> > and
> > > the second 11837.
> > >
> > > Querying the first table is consistently faster by a factor of 2x to
> 10x,
> > >
> > > Is this because drill is not very efficient at querying a large number
> of
> > > small(ish) parquet files?
> > >
> > > On Wed, May 31, 2017 at 6:42 PM, rahul challapalli <
> > > challapallira...@gmail.com> wrote:
> > >
> > >> If most of your queries use date column in the filter condition, I
> would
> > >> partition the data on the date column. Then you can simply say
> > >>
> > >> select * from events where `date` between '2016-11-11' and
> '2017-01-23';
> > >>
> > >> - Rahul
> > >>
> > >> On Wed, May 31, 2017 at 3:22 PM, Raz Baluchi 
> > >> wrote:
> > >>
> > >>> So, if I understand you correctly, I would have to include the 'yr'
> and
> > >>> 'mnth' columns in addition to the 'date' column in the query?
> > >>>
> > >>> e.g.
> > >>>
> > >>> select * from events where yr in (2016, 2017)  and mnth in (11,12,1)
> > and
> > >>> date between '2016-11-11' and '2017-01-23';
> > >>>
> > >>> Is that correct?
> > >>>
> > >>> On Wed, May 31, 2017 at 4:49 PM, rahul challapalli <
> > >>> challapallira...@gmail.com> wrote:
> > >>>
> >  How to partition data is dependent on how you want to access your
> > data.
> > >>> If
> >  you can foresee that most of the queries use year and month, then
> > >>> go-ahead
> >  and partition the data on those 2 columns. You can do that like
> below
> > 
> >  create table partitioned_data partition by (yr, mnth) as select
> >  extract(year from `date`) yr, extract(month from `date`) mnth,
> `date`,
> >   from mydata;
> > 
> >  For partitioning to have any benefit, your queries should have
> filters
> > >> on
> >  month and year columns.
> > 
 

Parquet on S3 - timeouts

2017-06-01 Thread Raz Baluchi
Now that I have Drill working with parquet files on dfs, the next step was
to move the parquet files to S3.

I get pretty good performance - I can query for events by date range
within 10 seconds (out of a total of ~800M events across 25 years).
However, there seems to be some threshold beyond which queries start
timing out.

SYSTEM ERROR: ConnectionPoolTimeoutException: Timeout waiting for
connection from pool

My first question is: is there a default timeout value for queries against
S3? Anything that takes longer than ~150 seconds seems to hit the timeout
error.

The second question has to do with the possible conditions that trigger the
prolonged query time. It seems that if I increase the filters beyond a
certain number - it doesn't take much - the query times out.

For example the query:

select * from events where YEAR in (2012, 2013) works fine - however,
select * from events where YEAR in (2012, 2013, 2014) fails with a timeout.

To make it worse, I can't use the first query either until I restart
Drill...


Re: Parquet on S3 - timeouts

2017-06-01 Thread Abhishek Girish
Can you take a look at [1] and let us know if that helps resolve your issue?

[1]
https://drill.apache.org/docs/s3-storage-plugin/#quering-parquet-format-files-on-s3

On Thu, Jun 1, 2017 at 12:55 PM, Raz Baluchi  wrote:

> Now that I have Drill working with parquet files on dfs, the next step was
> to move the parquet files to S3.
>
> I get pretty good performance - I can query for events  by date range
> within 10 seconds. ( out of a total of ~ 800M events across 25 years)
>  However, there seems to be some threshold beyond which queries start
> timing out.
>
> SYSTEM ERROR: ConnectionPoolTimeoutException: Timeout waiting for
> connection from pool
>
> My first question is, is there a default timeout value to queries against
> S3? Anything that takes longer than ~ 150 seconds seems to hit the timeout
> error.
>
> The second question has to do with the possible conditions that trigger the
> prolonged query time. It seems that if I increase the filters beyond a
> certain number - it doesn't take much - the query times out.
>
> For example the query:
>
> select * from events where YEAR in (2012, 2013) works fine - however,
> select * from events where YEAR in (2012, 2013, 2014) fails with a timeout.
>
> To make it worse, I can't use the first query either  until I restart
> drill...
>


Re: Pushing down Joins, Aggregates and filters, and data distribution questions

2017-06-01 Thread rahul challapalli
I would first recommend you spend some time reading the execution flow
inside Drill [1]. Try to understand specifically what major/minor fragments
are and that different major fragments can have different levels of
parallelism.

Let us take a simple query which runs on a 2-node cluster:

select * from employee where salary > 10;

Now how do we control parallelism for the above query? Unfortunately the
generic answer is not a simple one. But since I conveniently took a simple
query with a single major fragment, let's make an effort to understand this.
There are 3 variables which control the parallelism:

1. Number of cores available
2. planner.width.max_per_node: maximum number of minor fragments within a
major fragment per node
3. Parallelism supported by the scan for the particular storage plugin
involved

Let's try to understand the last parameter, which is of interest to storage
plugin developers. Like you hinted, the number of sub-scans determines the
parallelism of the above query in the absence of the first 2 variables. But
how many sub-scans can exist? This unfortunately depends on how you can
split the data (while respecting row boundaries) and is dependent on the
storage format. Hypothetically, let's say you have a file which is composed
of 100 parts, each part contains a few records, and you know that a single
record is not split across multiple parts. With this setup, the storage
plugin simply has to get the number of parts present in the data and
instantiate that many sub-scans.

So in the above simplistic setup, the max parallelization that can be
achieved for the major fragment (and in effect the whole query) is
determined by the number of parts present in the data, which is 100. If
you do not set (2), the default max parallelization limit is 70% of the
number of cores available. If (2) is set by the user, that determines the
max threads that can be used per node. So for our example, the max
parallelization that can be supported is MIN(100,
planner.width.max_per_node * number of nodes). So if the user has
planner.width.max_per_node set to 30, then we end up with a total of 60
threads (on 2 nodes combined) which need to run 100 minor fragments.
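
A back-of-the-envelope version of that arithmetic, as a simplified model of
the rules above rather than Drill's actual planner code (the core count and
option value below are assumed for illustration):

// Simplified model of the parallelization limits described above.
public class WidthEstimate {
  public static void main(String[] args) {
    int subScans = 100;              // parts the storage plugin can split the data into
    int nodes = 2;
    int coresPerNode = 32;           // assumed value, for illustration only
    Integer widthMaxPerNode = 30;    // planner.width.max_per_node; null means "not set"

    // When the option is not set, the per-node limit defaults to ~70% of the cores.
    int perNode = (widthMaxPerNode != null)
        ? widthMaxPerNode
        : (int) (coresPerNode * 0.70);

    int clusterThreads = perNode * nodes;                  // 60 in the example above
    int effectiveWidth = Math.min(subScans, clusterThreads);

    System.out.println("threads available = " + clusterThreads
        + ", sub-scans / minor fragments to run = " + subScans
        + ", effective parallel width = " + effectiveWidth);
  }
}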

With this understanding, let's move to the next related topic, which is
"Assignment". Now we have 60 threads (across 2 nodes) and 100 minor
fragments. So how do you assign minor fragments to specific nodes? This is
determined by the affinity that a particular node has for handling a
particular sub-scan. This can be controlled by the storage plugin via the
"public List<EndpointAffinity> getOperatorAffinity()" method in the
GroupScan class.
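
As a sketch of what the affinity part could look like - with made-up types
standing in for Drill's endpoint and affinity classes - a plugin might give
full weight to Drillbits co-located with the backend servers that hold the
data, and no weight to the rest:

import java.util.ArrayList;
import java.util.List;

// Made-up stand-in for Drill's (endpoint, affinity) pair.
class NodeAffinity {
  final String drillbitAddress;
  final double affinity;   // higher value = prefer running the sub-scan's fragment on this node

  NodeAffinity(String drillbitAddress, double affinity) {
    this.drillbitAddress = drillbitAddress;
    this.affinity = affinity;
  }
}

class AffinityExample {
  // Idea behind getOperatorAffinity(): full weight to Drillbits co-located with the data,
  // no weight to the rest, so the parallelizer prefers local assignments.
  static List<NodeAffinity> operatorAffinity(List<String> drillbits, List<String> dataServers) {
    List<NodeAffinity> result = new ArrayList<>();
    for (String bit : drillbits) {
      result.add(new NodeAffinity(bit, dataServers.contains(bit) ? 1.0 : 0.0));
    }
    return result;
  }

  public static void main(String[] args) {
    List<String> drillbits = List.of("node1", "node2", "node3");
    List<String> dataServers = List.of("node1", "node3");
    operatorAffinity(drillbits, dataServers)
        .forEach(a -> System.out.println(a.drillbitAddress + " -> " + a.affinity));
  }
}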

Now to your questions

1. If I have multiple *SubScan*s to be executed, will each *SubScan* be
   handled by a single *Scan* operator ? So whenever I have *n* *SubScan*s,
   I'll have *n* Scan operators distributed among Drill's cluster ?

I am not sure if I even understood your question correctly. Each minor
fragment gets executed in a single thread. In my example, each minor
fragment executes one subscan, followed by project, filter etc. Read [1] to
understand more about this.

2. How can I control the amount of any type of physical operators per
   Drill cluster or node ? For instance, what if I want to have less
   *Filter* operators or more *Scan* operators, how can I do that ?

I am not sure if we can control parallelism at the operator level within a
major fragment.

[1] https://drill.apache.org/docs/drill-query-execution/


On Thu, Jun 1, 2017 at 5:17 AM, Muhammad Gelbana 
wrote:
>
> First of all, I was very happy to at last attend the hangouts meeting,
I've
> been trying to do so for quite sometime.
>
> I know I confused most of you during the meeting but that's because my
> requirements aren't crystal clear at the moment and I'm still learning
what
> Drill can do. Hopefully I learn enough so I would be confident about the
> options I have when I need to make implementation decisions.
>
> Now to the point, and let me restate my case..
>
> We have a proprietary datasource that can perform limits, aggregations,
> filters and joins very fast. This datasource can handle SQL queries but
not
> all possible SQL syntax. I've been successful, so far, to pushdown joins,
> filters and limits, but I'm still struggling with aggregates. I've sent an
> email about aggregates to Calcite's mailing list.
>
> The amount of data this datasource may be required to process can be
> billions of records and 100s of GBs of data. So we are looking forward to
> distribute this data among multiple servers to overcome storage
limitations
> and maximize throughput.
>
> This distribution can be just duplicating the data to maximize throughput,
> so each server will have the same set of data, *or* records may be
> distributed among different servers, without duplication among these
> servers because a single server may not be able to hold all the data. So
> some tables may be duplicated and some tables may be distributed among
> servers. Let's assume that the distribution d

Re: Parquet on S3 - timeouts

2017-06-01 Thread Raz Baluchi
I noticed that if I precede the query with a select count(*) with the same
filters, I no longer experience timeouts. By 'priming' the query in this
way, the second query is also faster. This seems to be an acceptable
workaround, as it seems to allow me to essentially include all partitions
in the filter and still get results pretty quickly. I am still curious why
this occurs.

On Thu, Jun 1, 2017 at 4:08 PM, Abhishek Girish  wrote:

> Can you take a look at [1] and let us know if that helps resolve your
> issue?
>
> [1]
> https://drill.apache.org/docs/s3-storage-plugin/#quering-
> parquet-format-files-on-s3
>
> On Thu, Jun 1, 2017 at 12:55 PM, Raz Baluchi 
> wrote:
>
> > Now that I have Drill working with parquet files on dfs, the next step
> was
> > to move the parquet files to S3.
> >
> > I get pretty good performance - I can query for events  by date range
> > within 10 seconds. ( out of a total of ~ 800M events across 25 years)
> >  However, there seems to be some threshold beyond which queries start
> > timing out.
> >
> > SYSTEM ERROR: ConnectionPoolTimeoutException: Timeout waiting for
> > connection from pool
> >
> > My first question is, is there a default timeout value to queries against
> > S3? Anything that takes longer than ~ 150 seconds seems to hit the
> timeout
> > error.
> >
> > The second question has to do with the possible conditions that trigger
> the
> > prolonged query time. It seems that if I increase the filters beyond a
> > certain number - it doesn't take much - the query times out.
> >
> > For example the query:
> >
> > select * from events where YEAR in (2012, 2013) works fine - however,
> > select * from events where YEAR in (2012, 2013, 2014) fails with a
> timeout.
> >
> > To make it worse, I can't use the first query either  until I restart
> > drill...
> >
>


Re: Parquet on S3 - timeouts

2017-06-01 Thread Raz Baluchi
setting

  <property>
    <name>fs.s3a.connection.maximum</name>
    <value>100</value>
  </property>

does fix the problem. No more timeouts and very quick response. No need to
'prime' the query...

On Thu, Jun 1, 2017 at 4:08 PM, Abhishek Girish  wrote:

> Can you take a look at [1] and let us know if that helps resolve your
> issue?
>
> [1]
> https://drill.apache.org/docs/s3-storage-plugin/#quering-
> parquet-format-files-on-s3
>
> On Thu, Jun 1, 2017 at 12:55 PM, Raz Baluchi 
> wrote:
>
> > Now that I have Drill working with parquet files on dfs, the next step
> was
> > to move the parquet files to S3.
> >
> > I get pretty good performance - I can query for events  by date range
> > within 10 seconds. ( out of a total of ~ 800M events across 25 years)
> >  However, there seems to be some threshold beyond which queries start
> > timing out.
> >
> > SYSTEM ERROR: ConnectionPoolTimeoutException: Timeout waiting for
> > connection from pool
> >
> > My first question is, is there a default timeout value to queries against
> > S3? Anything that takes longer than ~ 150 seconds seems to hit the
> timeout
> > error.
> >
> > The second question has to do with the possible conditions that trigger
> the
> > prolonged query time. It seems that if I increase the filters beyond a
> > certain number - it doesn't take much - the query times out.
> >
> > For example the query:
> >
> > select * from events where YEAR in (2012, 2013) works fine - however,
> > select * from events where YEAR in (2012, 2013, 2014) fails with a
> timeout.
> >
> > To make it worse, I can't use the first query either  until I restart
> > drill...
> >
>


Re: Parquet on S3 - timeouts

2017-06-01 Thread Abhishek Girish






Cool, thanks for confirming.



_
From: Raz Baluchi 
Sent: Thursday, June 1, 2017 2:14 PM
Subject: Re: Parquet on S3 - timeouts
To:  


setting

  <property>
    <name>fs.s3a.connection.maximum</name>
    <value>100</value>
  </property>

does fix the problem. No more timeouts and very quick response. No need to
'prime' the query...

On Thu, Jun 1, 2017 at 4:08 PM, Abhishek Girish  wrote:

> Can you take a look at [1] and let us know if that helps resolve your
> issue?
>
> [1]
> https://drill.apache.org/docs/s3-storage-plugin/#quering-
> parquet-format-files-on-s3
>
> On Thu, Jun 1, 2017 at 12:55 PM, Raz Baluchi 
> wrote:
>
> > Now that I have Drill working with parquet files on dfs, the next step
> was
> > to move the parquet files to S3.
> >
> > I get pretty good performance - I can query for events  by date range
> > within 10 seconds. ( out of a total of ~ 800M events across 25 years)
> >  However, there seems to be some threshold beyond which queries start
> > timing out.
> >
> > SYSTEM ERROR: ConnectionPoolTimeoutException: Timeout waiting for
> > connection from pool
> >
> > My first question is, is there a default timeout value to queries against
> > S3? Anything that takes longer than ~ 150 seconds seems to hit the
> timeout
> > error.
> >
> > The second question has to do with the possible conditions that trigger
> the
> > prolonged query time. It seems that if I increase the filters beyond a
> > certain number - it doesn't take much - the query times out.
> >
> > For example the query:
> >
> > select * from events where YEAR in (2012, 2013) works fine - however,
> > select * from events where YEAR in (2012, 2013, 2014) fails with a
> timeout.
> >
> > To make it worse, I can't use the first query either  until I restart
> > drill...
> >
>






Re: Pushing down Joins, Aggregates and filters, and data distribution questions

2017-06-01 Thread Paul Rogers
Hi Muhammad,

> I have a couple of questions:
> 
>   1. If I have multiple *SubScan*s to be executed, will each *SubScan* be
>   handled by a single *Scan* operator ? So whenever I have *n* *SubScan*s,
>   I'll have *n* Scan operators distributed among Drill's cluster ?

As Rahul explained, subscans are assigned to fragments. Let’s say that three 
were assigned to the same fragment. In this case, a single scan operator 
handles all three. Your “Scan Batch Creator” will create a separate “Record 
Reader” for each subscan and hand them to the scan operator. The scan operator 
then opens, reads, and closes each in turn.
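
A rough conceptual sketch of that open/read/close loop - not Drill's actual
scan-batch code, and the reader interface here is invented for illustration:

import java.util.List;

// Invented minimal reader interface, standing in for Drill's RecordReader.
interface SimpleReader {
  void open();
  int readBatch();   // rows placed into the current batch; 0 means the reader is exhausted
  void close();
}

class ScanOperatorSketch {
  // One scan operator draining several readers (one per sub-scan assigned to this fragment) in turn.
  static long drainAll(List<SimpleReader> readers) {
    long totalRows = 0;
    for (SimpleReader reader : readers) {
      reader.open();
      try {
        int rows;
        while ((rows = reader.readBatch()) > 0) {
          totalRows += rows;   // downstream operators would consume each batch here
        }
      } finally {
        reader.close();
      }
    }
    return totalRows;
  }

  public static void main(String[] args) {
    // Tiny fake reader producing two batches of 5 rows each.
    SimpleReader fake = new SimpleReader() {
      int batchesLeft = 2;
      public void open() {}
      public int readBatch() { return batchesLeft-- > 0 ? 5 : 0; }
      public void close() {}
    };
    System.out.println("rows read: " + ScanOperatorSketch.drainAll(List.of(fake)));
  }
}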

>   2. How can I control the amount of any type of physical operators per
>   Drill cluster or node ? For instance, what if I want to have less
>   *Filter* operators or more *Scan* operators, how can I do that ?
> 
I’ve not seen anything that suggests that this is possible. Drill groups 
operators into fragments, then parallelizes the fragments. To accomplish what 
you want, you’d need to figure out how Drill slices the DAG into fragments and 
adjust the slicing to isolate the operators as you desire. Network exchanges 
join your custom fragments.

Parallelization is generic for all fragments as Rahul explained; I’ve seen 
nothing that suggests we have a way to identify different categories of 
fragments and apply different parallelization rules to each.

Maybe there is some Calcite magic available?

- Paul




UNORDERED_RECEIVER taking 70% of query time

2017-06-01 Thread jasbir.sing
Hi,

I am running a simple query which performs a JOIN operation between two parquet 
files. It takes around 3-4 secs, and I noticed that 70% of the time is used 
by UNORDERED_RECEIVER.

Sample query is -

select sum(sales), week
from dfs.`C:\parquet-location\F8894180-AFFB-4803-B8CF-CCF883AA5AAF-Search_Snapshot_Data.parquet`
where model_component_id in (
  select model_component_id from dfs.`C:\parquet-location\poc48k.parquet`)
group by week


Can we somehow reduce unordered receiver time?

Please find the below screenshot of Visualized plan

[cid:image001.png@01D2DB8D.B3B1C790]










Re: UNORDERED_RECEIVER taking 70% of query time

2017-06-01 Thread Abhishek Girish
Attachment hasn't come through. Can you upload the query profile to some
cloud storage and share a link to it?

Also, please share details on how large your dataset is, number of
Drillbits, memory and other configurations.


On Thu, Jun 1, 2017 at 10:18 PM,  wrote:

> Hi,
>
>
>
> I am running a simple query which performs JOIN operation between two
> parquet files and it takes around 3-4 secs and I noticed that 70% of the
> time is used by UNORDERED_RECEIVER.
>
>
>
> Sample query is –
>
>
>
> select sum(sales),week from dfs.`C:\parquet-location\
> F8894180-AFFB-4803-B8CF-CCF883AA5AAF-Search_Snapshot_Data.parquet` where
> model_component_id in(
>
> select model_component_id from dfs.`C:\parquet-location\poc48k.parquet`)
> group by week
>
>
>
>
>
> Can we somehow reduce unordered receiver time?
>
>
>
> Please find the below screenshot of Visualized plan
>
>
>
>
>
>
>
>
>
>
>
>
>