Hi All,

Thanks for the discussion.

Now that @karuppayya’s PR has been merged and collects the NDV stats at the
table level, I would like to revisit the partition stats vs. table stats
discussion and raise a few points:

   1. The current action collects the NDV stats at the table level using an
   aggregate function. Since this already requires reading the columns and
   processing the data tuple by tuple, adding the rest of the stats should add
   negligible overhead, so we can change the action to compute all of the
   stats instead of only the NDV stats (for example, see here
   <https://github.com/guykhazma/iceberg/blob/8acc252fa849c9a9cd9ee3482b0d51ffbb12e757/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/NDVSketchGenerator.java#L118>).
   Min/max and null counts will be pushed down, as @huaxingao already mentioned.
   2. It seems to me that the concern raised about combining theta
   sketches for different partitions also applies to the current
   approach. Because the table-level theta sketch is computed using an
   aggregate function, it is affected by the number of tasks used when reading
   the table (which in turn depends on the `read.split.target-size` property).
   Maybe we should consider forcing one task per partition?
   3. It is not clear to me why we are storing the sketch blob and not only
   the resulting NDV value. Assuming the stats remain at the table level, what
   is the use case for keeping the sketch? If we had partition-level sketches,
   it would make sense to use them at query time for queries that target
   specific partitions (even though the cost is not clear).
   4. The rest of the stats (min/max, null count, average length, max length)
   can be stored in table-level metadata and still be updated incrementally
   (for the average we can store the sum and the number of items), so maybe we
   can keep track of these stats both at the partition level (for more
   fine-grained stats) and at the table level.
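
To make point 4 more concrete, here is a minimal sketch in plain Python of
stats that can be collected in one pass and merged later. It is not Iceberg
code: `ColumnStats`, `collect`, and the field names are all hypothetical, and
it assumes a string column and append-only changes.

```python
from dataclasses import dataclass
from typing import Iterable, Optional


@dataclass
class ColumnStats:
    """Mergeable stats for one string column (hypothetical structure)."""
    value_count: int = 0              # number of non-null values
    null_count: int = 0
    min_value: Optional[str] = None
    max_value: Optional[str] = None
    total_len: int = 0                # sum of lengths, so the average can be merged
    max_len: int = 0

    def update(self, value: Optional[str]) -> None:
        """Fold a single value into the stats (one pass over the data)."""
        if value is None:
            self.null_count += 1
            return
        self.value_count += 1
        self.total_len += len(value)
        self.max_len = max(self.max_len, len(value))
        if self.min_value is None or value < self.min_value:
            self.min_value = value
        if self.max_value is None or value > self.max_value:
            self.max_value = value

    def merge(self, other: "ColumnStats") -> "ColumnStats":
        """Combine two partial results, e.g. the stats of two partitions."""
        mins = [v for v in (self.min_value, other.min_value) if v is not None]
        maxs = [v for v in (self.max_value, other.max_value) if v is not None]
        return ColumnStats(
            value_count=self.value_count + other.value_count,
            null_count=self.null_count + other.null_count,
            min_value=min(mins) if mins else None,
            max_value=max(maxs) if maxs else None,
            total_len=self.total_len + other.total_len,
            max_len=max(self.max_len, other.max_len),
        )

    @property
    def avg_len(self) -> float:
        """Average length derived from the stored sum and count."""
        return self.total_len / self.value_count if self.value_count else 0.0


def collect(values: Iterable[Optional[str]]) -> ColumnStats:
    """Compute stats for one partition (or one new batch of rows)."""
    stats = ColumnStats()
    for v in values:
        stats.update(v)
    return stats


# Per-partition stats rolled up to the table level.
p1 = collect(["apple", None, "fig"])
p2 = collect(["banana", "kiwi"])
table = p1.merge(p2)
print(table.min_value, table.max_value, table.null_count, table.avg_len, table.max_len)
# prints: apple kiwi 1 4.5 6
```

Merging the two partition results gives the same stats as a single pass over
all rows, which is why the same structure could back both levels. Deletes are
not covered here: min/max cannot be "un-merged", so an affected partition
would have to be recomputed.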

Thanks,
Guy



On 2024/08/07 16:23:17 Steven Wu wrote:

> I also like the middle ground of partition-level stats, which also makes
> it easier to perform incremental refresh (at the partition level). If the
> roll-up of partition-level stats turns out to be slow, I don't mind adding
> table-level stats aggregated from partition-level stats. Having
> partition-level stats first is a good foundation.
>
> For an unpartitioned table, it can probably be treated as a special single
> partition, as someone was suggesting?
>
> On Wed, Aug 7, 2024 at 7:25 AM Manish Malhotra <
> manish.malhotra.w...@gmail.com> wrote:
>
> > First of all, thanks a lot, Huaxin, for starting an important proposal and
> > thread!
> >
> > A lot of important points have already been discussed.
> >
> > My thoughts were also tilting towards partition-level stats, as
> > Piotr, Alex, Anton and a few others have mentioned as well.
> >
> > IMO, partition-level stats might be a good middle ground.
> >
> > Before that, though, does it make sense to try deriving the stats (in
> > the current state) and see whether that adds a lot of time?
> >
> > Regards,
> > Manish
> >
> > On Wed, Aug 7, 2024 at 3:38 AM Piotr Findeisen <pi...@gmail.com>
> > wrote:
> >
> >> Hi All,
> >>
> >> Thank you for the interesting discussion so far, and the many viewpoints
> >> shared!
> >>
> >> > Not all tables have partition definition and table-level stats would
> >> > benefit these tables
> >>
> >> Agreed that tables do not always have partitions.
> >> Current partition stats are appropriate for partitioned tables only,
> >> mainly because they contain information that's available in table metadata
> >> anyway.
> >>
> >> If we evolve partition stats to also include column min/maxes, I would
> >> advocate for partition stats to be applicable to unpartitioned tables as
> >> well, perhaps just by omitting the `1 partition` field in such a case.
> >>
> >> The partition-level information has the important advantage that it
> >> allows a query engine to come up with the best estimate for the *current
> >> query*, taking the query's effective predicates into account and pruning
> >> the information.
> >> So sooner or later we will want to add min/max information to partition
> >> stats. (Is it WIP already?)
> >> Since partition stats are Parquet, deriving table-level min/max
> >> information from partition-level min/max information is as simple as
> >> reading the Parquet file footer.
> >>
> >> > Given that we already maintain table-level NDV, it seems consistent to
> >> > include corresponding table-level stats for min/max values.
> >>
> >> That's reasonable thinking, and consistency is important.
> >> There is, however, an important difference between NDV and min/max. The
> >> table-level NDV cannot be easily aggregated from partition-level NDV,
> >> because it quickly turns out to be a CPU-intensive process.
> >> Keeping partition-level NDV information (number, and a sketch) is a good
> >> idea, but it's suitable only for queries which read one to a few
> >> partitions. Queries reading a large number of partitions would still
> >> prefer to use table-level NDV information (if available).
> >>
> >> Best
> >> Piotr
> >>
> >> On Wed, 7 Aug 2024 at 05:07, huaxin gao <hu...@gmail.com> wrote:
> >>
> >>> Thanks Alexander, Xianjin, Gang and Anton for your valuable insights!
> >>>
> >>> Regarding deriving min/max values on the fly, I currently don't have a
> >>> good algorithm. I rely on iterating through FileScanTask objects in memory
> >>> to aggregate results, which leads me to favor pre-calculating min/max
> >>> values.
> >>>
> >>> I have a slight preference for table-level stats over partition-level
> >>> stats. Given that we already maintain table-level NDV, it seems consistent
> >>> to include corresponding table-level stats for min/max values. When
> >>> computing the table-level stats, if partition filters are applied, we have
> >>> already filtered out unwanted partitions, which, in my view, does not give
> >>> partition-level stats an advantage in this context. For incremental
> >>> updates, even with table-level stats, I think we can still have a way to
> >>> calculate only the new or updated partitions and then update the
> >>> table-level stats. However, I am open to other solutions.
> >>>
> >>> Thanks,
> >>> Huaxin
> >>>
> >>> On Tue, Aug 6, 2024 at 10:40 AM Anton Okolnychyi <ao...@gmail.com>
> >>> wrote:
> >>>
> >>>> I'd like to entertain the idea of deriving min/max values on the fly to
> >>>> understand our baseline. What will the algorithm for that look like? I
> >>>> assume the naive approach will be to keep min/max stats for selected
> >>>> columns while planning (as opposed to discarding them upon filtering) and
> >>>> then iterate through FileScanTask objects in memory to aggregate the
> >>>> results?
> >>>>
> >>>> - Anton
> >>>>
> >>>> On Tue, Aug 6, 2024 at 08:33 Gang Wu <us...@gmail.com> wrote:
> >>>>
> >>>>> Just my two cents: not all tables have a partition definition, and
> >>>>> table-level stats would benefit these tables. In addition, NDV might
> >>>>> not be easily populated from partition-level statistics.
> >>>>>
> >>>>> Thanks,
> >>>>> Gang
> >>>>>
> >>>>> On Tue, Aug 6, 2024 at 9:48 PM Xianjin YE <xi...@apache.org> wrote:
> >>>>>
> >>>>>> Thanks for raising the discussion, Huaxin.
> >>>>>>
> >>>>>> I also think partition-level statistics file(s) are more useful and
> >>>>>> have advantages over table-level stats. For instance:
> >>>>>> 1. It would be straightforward to support incremental stats
> >>>>>> computation for large tables by recalculating new or updated
> >>>>>> partitions only.
> >>>>>> 2. Table-level stats could be built from partition-level stats
> >>>>>> easily. It might take some additional time to merge, though.
> >>>>>> 3. A SparkScan usually involves only a subset of the partitions when a
> >>>>>> `partition filter` is applied, so it would be more accurate to infer
> >>>>>> the stats from the selected partitions only.
> >>>>>>
> >>>>>> On Aug 6, 2024, at 02:42, Alexander Jo <al...@starburstdata.com>
> >>>>>> wrote:
> >>>>>>
> >>>>>> Thanks for starting this thread, Huaxin,
> >>>>>>
> >>>>>> The existing statistics, on a per-data-file basis, are definitely too
> >>>>>> granular for use in planning/analysis-time query optimizations.
> >>>>>> It's worked so far, as tables have been relatively small, but from
> >>>>>> what I've seen in the Trino community it is starting to be a problem for
> >>>>>> some.
> >>>>>>
> >>>>>> However, I'm not sure that rolling the stats all the way up to the
> >>>>>> table level is what we want to add next.
> >>>>>> Would extending the partition stats to include some of the data
> >>>>>> present for data files be possible?
> >>>>>> To me, it seems like doing some amount of derivation at query time is
> >>>>>> okay, as long as the time it takes to do the derivation doesn't increase
> >>>>>> significantly as the table gets larger.
> >>>>>>
> >>>>>> Partition-level stats also have the advantage of being able to
> >>>>>> provide more accurate estimates for queries with filters on the partition
> >>>>>> columns.
> >>>>>>
> >>>>>> Looking forward to talking more about the project,
> >>>>>> Alex Jo
> >>>>>>
> >>>>>> On Fri, Aug 2, 2024 at 11:24 PM huaxin gao <hu...@gmail.com>
> >>>>>> wrote:
> >>>>>>
> >>>>>>> Thanks, Samrose and Piotr, for the discussion! This issue is not
> >>>>>>> addressed by the partition statistics feature. What we need are
> >>>>>>> table-level stats.
> >>>>>>>
> >>>>>>> Given that our primary goal in collecting statistics is
> >>>>>>> performance optimization, I believe it's not a good approach to derive
> >>>>>>> these statistics at query execution time. That's why I propose saving
> >>>>>>> these metrics in the table-level stats file. I am thinking of reusing
> >>>>>>> the existing aggregate pushdown mechanism to compute min, max and null
> >>>>>>> counts. We currently have MinAggregate
> >>>>>>> <https://github.com/apache/iceberg/blob/main/api/src/main/java/org/apache/iceberg/expressions/MinAggregate.java>,
> >>>>>>> MaxAggregate
> >>>>>>> <https://github.com/apache/iceberg/blob/main/api/src/main/java/org/apache/iceberg/expressions/MaxAggregate.java>,
> >>>>>>> CountStar
> >>>>>>> <https://github.com/apache/iceberg/blob/main/api/src/main/java/org/apache/iceberg/expressions/CountStar.java>,
> >>>>>>> and CountNonNull
> >>>>>>> <https://github.com/apache/iceberg/blob/main/api/src/main/java/org/apache/iceberg/expressions/CountNonNull.java>.
> >>>>>>> Null counts can be derived from CountStar and CountNonNull.
> >>>>>>>
> >>>>>>> Thanks,
> >>>>>>> Huaxin
> >>>>>>>
> >>>>>>> On Fri, Aug 2, 2024 at 1:45 PM Piotr Findeisen <
> >>>>>>> piotr.findei...@gmail.com> wrote:
> >>>>>>>
> >>>>>>>> Hi,
> >>>>>>>>
> >>>>>>>> First of all, thank you, Huaxin, for raising this topic. It's
> >>>>>>>> important for Spark, but also for Trino.
> >>>>>>>>
> >>>>>>>> Min, max, and null counts can be derived from manifests.
> >>>>>>>> I am not saying that a query engine should derive them from
> >>>>>>>> manifests at query time, but it definitely can.
> >>>>>>>> If we want to pull a table-level summary of min, max, and null counts
> >>>>>>>> into the stats Puffin file (which probably makes sense), my concern
> >>>>>>>> would be what the exact mechanics for that should be. NDVs need to be
> >>>>>>>> derived from the data files, but maybe we can do something smarter for
> >>>>>>>> min, max, and null counts.
> >>>>>>>>
> >>>>>>>> Best,
> >>>>>>>> Piotr
> >>>>>>>>
> >>>>>>>> On Fri, 2 Aug 2024 at 20:47, Samrose Ahmed <sa...@gmail.com>
> >>>>>>>> wrote:
> >>>>>>>>
> >>>>>>>>> Isn't this addressed by the partition statistics feature, or do
> >>>>>>>>> you want to have one row for the entire table?
> >>>>>>>>>
> >>>>>>>>> On Fri, Aug 2, 2024, 10:47 AM huaxin gao <hu...@gmail.com>
> >>>>>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>>> I would like to initiate a discussion on implementing a
> >>>>>>>>>> table-level statistics file to store column statistics, specifically
> >>>>>>>>>> min, max, and null counts. The original discussion can be found in
> >>>>>>>>>> this Slack thread:
> >>>>>>>>>> https://apache-iceberg.slack.com/archives/C03LG1D563F/p1676395480005779
> >>>>>>>>>>
> >>>>>>>>>> In Spark 3.4, I introduced Column Statistics
> >>>>>>>>>> <https://github.com/apache/spark/blob/master/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/colstats/ColumnStatistics.java#L33>
> >>>>>>>>>> within the Statistics interface
> >>>>>>>>>> <https://github.com/apache/spark/blob/master/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/Statistics.java#L38>,
> >>>>>>>>>> enabling Iceberg to implement and report these metrics to Spark. As a
> >>>>>>>>>> result, Spark can utilize the column statistics for cost-based
> >>>>>>>>>> optimization (CBO), including join reordering.
> >>>>>>>>>>
> >>>>>>>>>> Here’s how the process operates:
> >>>>>>>>>>
> >>>>>>>>>>    - Use the ANALYZE TABLE command to compute column statistics
> >>>>>>>>>>    and store them appropriately. The SQL syntax is:
> >>>>>>>>>>
> >>>>>>>>>> ANALYZE TABLE table_name COMPUTE STATISTICS FOR COLUMNS col1, col2…
> >>>>>>>>>>
> >>>>>>>>>> This command will trigger a ComputeTableStatsSparkAction to
> >>>>>>>>>> compute a data sketch for NDV (number of distinct values) and save it
> >>>>>>>>>> in the StatisticsFile. Here is the ComputeTableStatsAction PR
> >>>>>>>>>> <https://github.com/apache/iceberg/pull/10288>
> >>>>>>>>>>
> >>>>>>>>>> Additionally, the ANALYZE TABLE command needs to calculate
> >>>>>>>>>> table-level min, max, and null counts for columns like col1 and col2
> >>>>>>>>>> using the manifest files. These table-level statistics are then
> >>>>>>>>>> saved. Currently, we do not have a designated location to store these
> >>>>>>>>>> table-level column statistics. The StatisticsFile
> >>>>>>>>>> <https://github.com/apache/iceberg/blob/main/api/src/main/java/org/apache/iceberg/StatisticsFile.java>,
> >>>>>>>>>> being tightly coupled with the Puffin file, does not seem to be a
> >>>>>>>>>> suitable repository for these column stats.
> >>>>>>>>>>
> >>>>>>>>>>    - If Spark's CBO is on and
> >>>>>>>>>>    SparkSQLProperties.REPORT_COLUMN_STATS
> >>>>>>>>>>    <https://github.com/apache/iceberg/blob/main/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java#L95>
> >>>>>>>>>>    is enabled, NDV will be retrieved from the StatisticsFile, and the
> >>>>>>>>>>    min/max/null counts will also be retrieved from a table-level column
> >>>>>>>>>>    statistics file. These statistics will be utilized to construct the
> >>>>>>>>>>    Statistics object in estimateStatistics
> >>>>>>>>>>    <https://github.com/apache/iceberg/blob/main/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java#L186>.
> >>>>>>>>>>    Spark then employs the statistics returned from
> >>>>>>>>>>    estimateStatistics
> >>>>>>>>>>    <https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala#L86>
> >>>>>>>>>>    for cost-based optimization (CBO).
> >>>>>>>>>>
> >>>>>>>>>> Please share your thoughts on this.
> >>>>>>>>>>
> >>>>>>>>>> Thanks,
> >>>>>>>>>>
> >>>>>>>>>> Huaxin
