Re: Data source API | sizeInBytes should be moved to *Scan

2015-02-11 Thread Aniket Bhatnagar
Circling back on this. Did you get a chance to re-look at this?

Thanks,
Aniket

On Sun, Feb 8, 2015, 2:53 AM Aniket Bhatnagar aniket.bhatna...@gmail.com
wrote:

 Thanks for looking into this. If this is true, isn't this an issue today?
 The default implementation of sizeInBytes is 1 + the broadcast threshold.
 So, if Catalyst's cardinality estimation applies even a small filter
 selectivity, it will result in broadcasting the relation. Therefore,
 shouldn't the default be much higher than the broadcast threshold?

 Also, since a default implementation of sizeInBytes already exists in
 BaseRelation, I am not sure why the same/similar default implementation
 can't be provided in *Scan-specific sizeInBytes functions and have
 Catalyst always trust the size returned by the data source API (with the
 default implementation being to never broadcast). Another option would be
 to have sizeInBytes return Option[Long] so that Catalyst explicitly knows
 when the data source was able to estimate the size. The reason I would push
 for sizeInBytes in the *Scan interfaces is that at times the data source
 implementation can predict the output size more accurately. For example,
 data source implementations for MongoDB, Elasticsearch, Cassandra, etc. can
 easily use filter pushdowns to query the underlying storage and predict the
 size. Such predictions will be more accurate than Catalyst's. Therefore,
 if it's not a fundamental change in Catalyst, I think this makes sense.


 Thanks,
 Aniket


 On Sat, Feb 7, 2015, 4:50 AM Reynold Xin r...@databricks.com wrote:

 We thought about this today after seeing this email. I actually built a
 patch for this (adding filter/column information to data source stat
 estimation), but ultimately dropped it due to the potential problems the
 change could cause.

 The main problem I see is that column pruning/predicate pushdowns are
 advisory, i.e. the data source might or might not apply those filters.

 Without significantly complicating the data source API, it is hard for
 the optimizer (and future cardinality estimation) to know whether the
 filter/column pushdowns were actually applied, and whether to incorporate
 that in cardinality estimation.

 Imagine this scenario: a data source applies a filter and estimates the
 filter's selectivity at 0.1, so the data set is reduced to 10% of its
 original size. Catalyst's own cardinality estimation applies a selectivity
 of 0.1 again, and thus the estimated data size is now 1% of the original
 data size, which falls below some threshold. Catalyst decides to broadcast
 the table, but the actual table size is 10x the estimate.





 On Fri, Feb 6, 2015 at 3:39 AM, Aniket Bhatnagar 
 aniket.bhatna...@gmail.com wrote:

 Hi Spark SQL committers

 I have started experimenting with the data sources API and I was wondering
 if it makes sense to move the method sizeInBytes from BaseRelation to the
 Scan interfaces. This is because a relation may be able to leverage filter
 pushdown to estimate its size, potentially making a very large relation
 broadcastable. Thoughts?

 Aniket





Re: Data source API | sizeInBytes should be moved to *Scan

2015-02-11 Thread Reynold Xin
Unfortunately this is not going to happen for 1.3 (a snapshot release has
already been cut). We need to figure out how we are going to do cardinality
estimation before implementing this. If we need to do this in the future, I
think we can do it in a way that doesn't break existing APIs. Given that
this won't bring much benefit right now (the only use for it is broadcast
joins), I think it is OK to push this until later.

The question I raised earlier still stands: what should the optimizer do
w.r.t. filters that are pushed into the data source? Should it ignore those
filters, or apply its own selectivity estimates to them again?
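One way to make that explicit, purely as a hypothetical sketch rather than
anything in the data source API today, would be to have the scan report back
which of the pushed filters it actually applied, so the optimizer only
estimates selectivity for the remainder. The trait and method names below are
made up for illustration:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.Filter

// Hypothetical: the scan returns both the rows and the subset of the pushed
// filters it actually evaluated, so Catalyst can avoid double-counting their
// selectivity when estimating the output size.
trait FilterAwareScan {
  def buildScanWithAppliedFilters(
      requiredColumns: Array[String],
      filters: Array[Filter]): (RDD[Row], Seq[Filter])
}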

This also depends on how we want to do statistics. Hive (and a lot of other
database systems) does a scan to figure out statistics, and puts all of
those statistics in a catalog. That is a more unified way to solve the
stats problem.

That said, in the world of federated databases, I can see why we might want
to push cardinality estimation down to the data sources: if the use case is
selecting a very small subset of the data from the sources, it might be
hard for statistics in a catalog built from a full data scan to be accurate.



On Wed, Feb 11, 2015 at 10:47 AM, Aniket Bhatnagar 
aniket.bhatna...@gmail.com wrote:

 Circling back on this. Did you get a chance to re-look at this?

 Thanks,
 Aniket

 On Sun, Feb 8, 2015, 2:53 AM Aniket Bhatnagar aniket.bhatna...@gmail.com
 wrote:

 Thanks for looking into this. If this is true, isn't this an issue today?
 The default implementation of sizeInBytes is 1 + the broadcast threshold.
 So, if Catalyst's cardinality estimation applies even a small filter
 selectivity, it will result in broadcasting the relation. Therefore,
 shouldn't the default be much higher than the broadcast threshold?

 Also, since a default implementation of sizeInBytes already exists in
 BaseRelation, I am not sure why the same/similar default implementation
 can't be provided in *Scan-specific sizeInBytes functions and have
 Catalyst always trust the size returned by the data source API (with the
 default implementation being to never broadcast). Another option would be
 to have sizeInBytes return Option[Long] so that Catalyst explicitly knows
 when the data source was able to estimate the size. The reason I would push
 for sizeInBytes in the *Scan interfaces is that at times the data source
 implementation can predict the output size more accurately. For example,
 data source implementations for MongoDB, Elasticsearch, Cassandra, etc. can
 easily use filter pushdowns to query the underlying storage and predict the
 size. Such predictions will be more accurate than Catalyst's. Therefore,
 if it's not a fundamental change in Catalyst, I think this makes sense.


 Thanks,
 Aniket


 On Sat, Feb 7, 2015, 4:50 AM Reynold Xin r...@databricks.com wrote:

 We thought about this today after seeing this email. I actually built a
 patch for this (adding filter/column information to data source stat
 estimation), but ultimately dropped it due to the potential problems the
 change could cause.

 The main problem I see is that column pruning/predicate pushdowns are
 advisory, i.e. the data source might or might not apply those filters.

 Without significantly complicating the data source API, it is hard for
 the optimizer (and future cardinality estimation) to know whether the
 filter/column pushdowns were actually applied, and whether to incorporate
 that in cardinality estimation.

 Imagine this scenario: a data source applies a filter and estimates the
 filter's selectivity at 0.1, so the data set is reduced to 10% of its
 original size. Catalyst's own cardinality estimation applies a selectivity
 of 0.1 again, and thus the estimated data size is now 1% of the original
 data size, which falls below some threshold. Catalyst decides to broadcast
 the table, but the actual table size is 10x the estimate.





 On Fri, Feb 6, 2015 at 3:39 AM, Aniket Bhatnagar 
 aniket.bhatna...@gmail.com wrote:

 Hi Spark SQL committers

 I have started experimenting with the data sources API and I was wondering
 if it makes sense to move the method sizeInBytes from BaseRelation to the
 Scan interfaces. This is because a relation may be able to leverage filter
 pushdown to estimate its size, potentially making a very large relation
 broadcastable. Thoughts?

 Aniket





Re: Data source API | sizeInBytes should be moved to *Scan

2015-02-08 Thread Aniket Bhatnagar
Thanks for looking into this. If this is true, isn't this an issue today?
The default implementation of sizeInBytes is 1 + the broadcast threshold.
So, if Catalyst's cardinality estimation applies even a small filter
selectivity, it will result in broadcasting the relation. Therefore,
shouldn't the default be much higher than the broadcast threshold?

Also, since a default implementation of sizeInBytes already exists in
BaseRelation, I am not sure why the same/similar default implementation
can't be provided in *Scan-specific sizeInBytes functions and have
Catalyst always trust the size returned by the data source API (with the
default implementation being to never broadcast). Another option would be
to have sizeInBytes return Option[Long] so that Catalyst explicitly knows
when the data source was able to estimate the size. The reason I would push
for sizeInBytes in the *Scan interfaces is that at times the data source
implementation can predict the output size more accurately. For example,
data source implementations for MongoDB, Elasticsearch, Cassandra, etc. can
easily use filter pushdowns to query the underlying storage and predict the
size. Such predictions will be more accurate than Catalyst's. Therefore,
if it's not a fundamental change in Catalyst, I think this makes sense.
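To make the Option[Long] idea concrete, here is a minimal sketch. The
SizedScan trait, the ExternalStoreRelation class and its estimatedMatchingRows
helper are all hypothetical names invented for illustration; only Filter comes
from the existing data source API:

import org.apache.spark.sql.sources.Filter

// Hypothetical *Scan-level size estimate that sees the pushed-down filters.
// None means "no estimate available; fall back to BaseRelation.sizeInBytes".
trait SizedScan {
  def sizeInBytes(requiredColumns: Array[String], filters: Array[Filter]): Option[Long]
}

// Sketch of how a connector for an external store (MongoDB, Elasticsearch,
// Cassandra, ...) might implement it: ask the store how many rows match the
// pushed filters and multiply by an assumed average row size.
class ExternalStoreRelation extends SizedScan {
  private val assumedAvgRowSizeBytes = 128L

  override def sizeInBytes(requiredColumns: Array[String],
                           filters: Array[Filter]): Option[Long] =
    estimatedMatchingRows(filters).map(_ * assumedAvgRowSizeBytes)

  // Placeholder: a real connector would issue a count/stats query against
  // the underlying store using the pushed filters.
  private def estimatedMatchingRows(filters: Array[Filter]): Option[Long] = None
}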


Thanks,
Aniket


On Sat, Feb 7, 2015, 4:50 AM Reynold Xin r...@databricks.com wrote:

 We thought about this today after seeing this email. I actually built a
 patch for this (adding filter/column information to data source stat
 estimation), but ultimately dropped it due to the potential problems the
 change could cause.

 The main problem I see is that column pruning/predicate pushdowns are
 advisory, i.e. the data source might or might not apply those filters.

 Without significantly complicating the data source API, it is hard for
 the optimizer (and future cardinality estimation) to know whether the
 filter/column pushdowns were actually applied, and whether to incorporate
 that in cardinality estimation.

 Imagine this scenario: a data source applies a filter and estimates the
 filter's selectivity at 0.1, so the data set is reduced to 10% of its
 original size. Catalyst's own cardinality estimation applies a selectivity
 of 0.1 again, and thus the estimated data size is now 1% of the original
 data size, which falls below some threshold. Catalyst decides to broadcast
 the table, but the actual table size is 10x the estimate.





 On Fri, Feb 6, 2015 at 3:39 AM, Aniket Bhatnagar 
 aniket.bhatna...@gmail.com wrote:

 Hi Spark SQL committers

 I have started experimenting with the data sources API and I was wondering
 if it makes sense to move the method sizeInBytes from BaseRelation to the
 Scan interfaces. This is because a relation may be able to leverage filter
 pushdown to estimate its size, potentially making a very large relation
 broadcastable. Thoughts?

 Aniket





Re: Data source API | sizeInBytes should be moved to *Scan

2015-02-08 Thread Reynold Xin
We thought about this today after seeing this email. I actually built a
patch for this (adding filter/column information to data source stat
estimation), but ultimately dropped it due to the potential problems the
change could cause.

The main problem I see is that column pruning/predicate pushdowns are
advisory, i.e. the data source might or might not apply those filters.

Without significantly complicating the data source API, it is hard for the
optimizer (and future cardinality estimation) to know whether the
filter/column pushdowns were actually applied, and whether to incorporate
that in cardinality estimation.

Imagine this scenario: a data source applies a filter and estimates the
filter's selectivity at 0.1, so the data set is reduced to 10% of its
original size. Catalyst's own cardinality estimation applies a selectivity
of 0.1 again, and thus the estimated data size is now 1% of the original
data size, which falls below some threshold. Catalyst decides to broadcast
the table, but the actual table size is 10x the estimate.
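To put numbers on that double counting (the 100 GB table size and the 10 GB
broadcast threshold below are made-up values, not from the thread), in Scala:

// Illustrative arithmetic only; all sizes and thresholds are assumed values.
val originalSize        = 100L * 1024 * 1024 * 1024                  // 100 GB base table
val sourceSelectivity   = 0.1                                        // filter the source applied
val actualOutputSize    = (originalSize * sourceSelectivity).toLong  // 10 GB really produced

// Catalyst does not know the filter was already applied, so it multiplies
// by its own selectivity estimate a second time:
val catalystSelectivity = 0.1
val estimatedSize       = (actualOutputSize * catalystSelectivity).toLong // 1 GB estimate

val broadcastThreshold  = 10L * 1024 * 1024 * 1024                   // assumed 10 GB threshold
val wouldBroadcast      = estimatedSize < broadcastThreshold         // true, yet the real
                                                                     // size is 10x the estimate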





On Fri, Feb 6, 2015 at 3:39 AM, Aniket Bhatnagar aniket.bhatna...@gmail.com
 wrote:

 Hi Spark SQL committers

 I have started experimenting with the data sources API and I was wondering
 if it makes sense to move the method sizeInBytes from BaseRelation to the
 Scan interfaces. This is because a relation may be able to leverage filter
 pushdown to estimate its size, potentially making a very large relation
 broadcastable. Thoughts?

 Aniket



Data source API | sizeInBytes should be moved to *Scan

2015-02-06 Thread Aniket Bhatnagar
Hi Spark SQL committers

I have started experimenting with the data sources API and I was wondering
if it makes sense to move the method sizeInBytes from BaseRelation to the
Scan interfaces. This is because a relation may be able to leverage filter
pushdown to estimate its size, potentially making a very large relation
broadcastable. Thoughts?

Aniket
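
For reference, a simplified sketch of the shape of the API under discussion
(abridged; the real definitions live in org.apache.spark.sql.sources, and
method bodies and other members are omitted): the size estimate sits on
BaseRelation and never sees the filters, while the pushed filters only surface
at buildScan time on the scan side.

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType

// Abridged sketch (not the full definitions) of the relevant pieces.
abstract class BaseRelation {
  def sqlContext: SQLContext
  def schema: StructType
  // A single static estimate per relation; it never sees pushed filters.
  // The default in Spark is effectively just above the broadcast threshold.
  def sizeInBytes: Long
}

trait PrunedFilteredScan {
  // Pushed filters are only visible here, at scan time.
  def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row]
}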