alamb commented on code in PR #103:
URL: https://github.com/apache/datafusion-site/pull/103#discussion_r2337137648


##########
content/blog/2025-09-10-dynamic-filters.md:
##########
@@ -0,0 +1,649 @@
+---
+layout: post
+title: Dynamic Filters: Passing Information Between Operators During Execution for 10x Faster Queries
+date: 2025-09-10
+author: Adrian Garcia Badaracco (Pydantic), Andrew Lamb (InfluxData)
+categories: [features]
+---
+<!--
+{% comment %}
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+{% endcomment %}
+-->
+
+<!--
+diagrams source: https://docs.google.com/presentation/d/1FFYy27ydZdeFZWWuMjZGnYKUx9QNJfzuVLAH8AE5wlc/edit?slide=id.g364a74cba3d_0_92#slide=id.g364a74cba3d_0_92
+Intended Audience: Query engine / data systems developers who want to learn about topk optimization
+Goal: Introduce TopK and dynamic filters as general optimization techniques for query engines, and how they were used to improve performance in DataFusion.
+-->
+
+This blog post introduces the query engine optimization techniques called TopK
+and dynamic filters. We describe the motivating use case, how these
+optimizations work, and how we implemented them with the [Apache DataFusion]
+community to improve performance by an order of magnitude for some query
+patterns.
+
+[Apache DataFusion]: https://datafusion.apache.org/
+
+## Motivation and Results
+
+The main commercial product at [Pydantic], [Logfire], is an observability
+platform built on DataFusion. One of the most common workflows / queries is
+"show me the last K traces" which translates to a query similar to:
+
+[Pydantic]: https://pydantic.dev
+[Logfire]: https://pydantic.dev/logfire
+
+```sql
+SELECT * FROM records ORDER BY start_timestamp DESC LIMIT 1000;
+```
+
+We noticed this was *pretty slow*, even though DataFusion has long had the
+classic `TopK` optimization (described below). After implementing the dynamic
+filter techniques described in this blog, we saw performance improve *by over 10x*
+for this query pattern, and are applying the optimization to other queries and
+operators as well.
+
+Let's look at some preliminary numbers, using [ClickBench] [Q23], which has 
+the same pattern as our motivating example:
+
+```sql
+SELECT * FROM hits WHERE "URL" LIKE '%google%' ORDER BY "EventTime" LIMIT 10;
+```
+
+<div class="text-center">
+<img 
+  src="/blog/images/dynamic-filters/execution-time.svg" 
+  width="80%" 
+  class="img-responsive" 
+  alt="Q23 Performance Improvement with Dynamic Filters and Late Materialization"
+/>
+</div>
+
+**Figure 1**: Execution times for ClickBench Q23 with and without dynamic
+filters (DF)<sup id="fn1">[1](#footnote1)</sup>, and late materialization
+(LM)<sup id="fn2">[2](#footnote2)</sup> for different partitions / core usage.
+Dynamic filters alone (yellow) and late materialization alone (red) show a large
+improvement over the baseline (blue). When both optimizations are enabled (green),
+performance improves by up to 22x. See the appendix for more measurement details.
+
+
+## Background: TopK and Dynamic Filters
+
+To explain how dynamic filters improve query performance, we first need to
+explain the so-called "TopK" optimization. To do so, we will use a simplified
+version of ClickBench Q23:
+
+```sql
+SELECT * 
+FROM hits 
+ORDER BY "EventTime"
+LIMIT 10
+```
+
+[Q23]: https://github.com/apache/datafusion/blob/main/benchmarks/queries/clickbench/queries/q23.sql
+[ClickBench]: https://benchmark.clickhouse.com/
+
+A straightforward, though slow, plan to answer this query is shown in Figure 2.
+
+<div class="text-center">
+<img 
+  src="/blog/images/dynamic-filters/query-plan-naive.png" 
+  width="80%" 
+  class="img-responsive" 
+  alt="Naive Query Plan"
+/>
+</div>
+
+**Figure 2**: Simple query plan for ClickBench Q23. Data flows in plans from the
+scan at the bottom to the limit at the top. This plan reads all 100M rows of the
+`hits` table, sorts them by `EventTime`, and then discards everything except the top 10 rows.
+
+This naive plan requires substantial effort because all columns from all rows are
+decoded and sorted, even though only 10 rows are returned.
+
+High-performance query engines typically avoid the expensive full sort with a
+specialized operator that tracks the current top rows using a [heap], rather
+than sorting all the data. For example, this operator
+is called [TopK in DataFusion], [SortWithLimit in Snowflake], and [topn in
+DuckDB]. The plan for Q23 using this specialized operator is shown in Figure 3.
+
+[heap]: https://en.wikipedia.org/wiki/Heap_(data_structure)
+[TopK in DataFusion]: https://docs.rs/datafusion/latest/datafusion/physical_plan/struct.TopK.html
+[SortWithLimit in Snowflake]: https://docs.snowflake.com/en/user-guide/ui-snowsight-activity
+[topn in DuckDB]: https://duckdb.org/2024/10/25/topn.html#introduction-to-top-n
+
+<div class="text-center">
+<img 
+  src="/blog/images/dynamic-filters/query-plan-topk.png" 
+  width="80%" 
+  class="img-responsive" 
+  alt="TopK Query Plan"
+/>
+</div>
+
+**Figure 3**: Query plan for Q23 in DataFusion using the TopK operator. This
+plan still reads all 100M rows of the `hits` table, but instead of first sorting
+them all by `EventTime`, the TopK operator keeps track of the current top 10
+rows using a min/max heap. Credit to [Visualgo](https://visualgo.net/en) for the
+heap icon.
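
To make the heap idea concrete, here is a minimal sketch of a TopK operator for an ascending sort with a limit, as in the simplified Q23. It is not DataFusion's implementation: the `SimpleTopK` struct and its `insert_batch` / `finish` methods are hypothetical, it tracks a single `i64` sort key instead of full rows, and it uses the Rust standard library's `BinaryHeap` (a max-heap) so the root is always the largest retained value, which is the one to evict when a smaller value arrives.

```rust
use std::collections::BinaryHeap;

/// Hypothetical, simplified TopK for `ORDER BY key ASC LIMIT k`:
/// keeps the k smallest keys seen so far.
struct SimpleTopK {
    k: usize,
    heap: BinaryHeap<i64>, // max-heap: the root is the largest retained value
}

impl SimpleTopK {
    fn new(k: usize) -> Self {
        Self { k, heap: BinaryHeap::with_capacity(k) }
    }

    /// Feed one input batch of sort-key values into the heap.
    fn insert_batch(&mut self, batch: &[i64]) {
        for &v in batch {
            if self.heap.len() < self.k {
                // Heap not full yet: every value is a candidate.
                self.heap.push(v);
            } else if let Some(&max) = self.heap.peek() {
                if v < max {
                    // v beats the worst retained value, so it replaces it.
                    self.heap.pop();
                    self.heap.push(v);
                }
            }
        }
    }

    /// Emit the retained keys in ascending order.
    fn finish(self) -> Vec<i64> {
        self.heap.into_sorted_vec()
    }
}

fn main() {
    let mut topk = SimpleTopK::new(3);
    topk.insert_batch(&[50, 10, 40]);
    topk.insert_batch(&[30, 60, 20]);
    println!("{:?}", topk.finish()); // [10, 20, 30]
}
```

Memory stays bounded by `k` no matter how large the input is, but every input row is still read and compared against the heap; the scan below it is unchanged.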
+
+Figure 3 is better, but it still reads and decodes all 100M rows of the `hits` table,
+which is often unnecessary once we have found the top 10 rows. For example,
+while running the query, if the current top 10 rows all have `EventTime` in
+2024, then any subsequent rows with `EventTime` in 2025 or later can be
+skipped entirely without reading or decoding them. This technique is especially
+effective at skipping entire files or row groups if the top 10 values are in the
+first few files read, which is very common when the
+data insert order is approximately the same as the timestamp order.
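
Skipping whole files or row groups can be sketched as a check of per-file min/max statistics against the current cutoff from the TopK heap. This is a hypothetical helper, not DataFusion's pruning code: `ColumnStats`, `SortOrder`, and `can_skip` are illustrative names, and the sketch assumes each file carries min/max values for the sort column (as Parquet row-group metadata typically does). For an ascending sort the filter is `col < cutoff`, so a file whose minimum is already at or above the cutoff cannot contribute; a descending sort is the mirror image.

```rust
/// Hypothetical min/max statistics for the sort column of one file or row group.
struct ColumnStats {
    min: i64,
    max: i64,
}

#[derive(Clone, Copy)]
enum SortOrder {
    Asc,
    Desc,
}

/// Returns true if no row in the file can beat the current TopK cutoff.
/// `cutoff` is `None` until the heap holds k rows (the filter is still `true`).
fn can_skip(stats: &ColumnStats, order: SortOrder, cutoff: Option<i64>) -> bool {
    let Some(t) = cutoff else {
        return false;
    };
    match order {
        // Filter is `col < t`: skip if even the smallest value fails it.
        SortOrder::Asc => stats.min >= t,
        // Filter is `col > t`: skip if even the largest value fails it.
        SortOrder::Desc => stats.max <= t,
    }
}

fn main() {
    // Hypothetical files whose contents are roughly in time order.
    let files = [
        ("part-0.parquet", ColumnStats { min: 100, max: 200 }),
        ("part-1.parquet", ColumnStats { min: 150, max: 300 }),
        ("part-2.parquet", ColumnStats { min: 400, max: 500 }),
    ];
    // Assume the ascending TopK heap is full and its largest value is 180.
    let cutoff = Some(180);
    for (name, stats) in &files {
        println!("{name}: skip = {}", can_skip(stats, SortOrder::Asc, cutoff));
    }
}
```

With roughly time-ordered files, files read later tend to fall entirely past the cutoff, so a check like this can avoid opening them at all.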
+
+Leveraging this insight is the key idea behind dynamic filters, which introduce
+a runtime mechanism for the TopK operator to provide the current top values to
+the scan operator, allowing it to skip unnecessary rows, entire files, or portions
+of files. The plan for Q23 with dynamic filters is shown in Figure 4.
+
+<div class="text-center">
+<img 
+  src="/blog/images/dynamic-filters/query-plan-topk-dynamic-filters.png" 
+  width="100%" 
+  class="img-responsive" 
+  alt="TopK Query Plan with Dynamic Filters"
+/>
+</div>
+
+**Figure 4**: Query plan for Q23 in DataFusion with specialized TopK operator
+and dynamic filters. The TopK operator provides the maximum `EventTime` of the
+current top 10 rows to the scan operator, allowing it to skip rows with
+`EventTime` later than that value. The scan operator uses this dynamic filter
+to skip unnecessary files and rows, reducing the amount of data that needs to
+be read and processed.
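
One way to picture the runtime channel between the two operators is a shared cutoff that only ever tightens: the TopK operator writes it, and the scan reads it before deciding whether to decode a row or open a file. The `DynamicFilter` type and its `tighten` / `passes` methods below are a hypothetical sketch using `Arc<RwLock<...>>` from the Rust standard library, for an ascending sort on an `i64` column. DataFusion's real dynamic filters appear as predicate expressions in the physical plan and are more general than a single integer cutoff.

```rust
use std::sync::{Arc, RwLock};
use std::thread;

/// Hypothetical dynamic filter for `ORDER BY col ASC LIMIT k`.
/// `None` plays the role of the initial `true`; `Some(t)` means `col < t`.
#[derive(Clone, Default)]
struct DynamicFilter {
    cutoff: Arc<RwLock<Option<i64>>>,
}

impl DynamicFilter {
    /// Called by the TopK operator when the largest value in its full heap changes.
    fn tighten(&self, new_cutoff: i64) {
        let mut guard = self.cutoff.write().unwrap();
        // Only ever make the filter more selective.
        let is_tighter = match *guard {
            None => true,
            Some(t) => new_cutoff < t,
        };
        if is_tighter {
            *guard = Some(new_cutoff);
        }
    }

    /// Called by the scan for each candidate value.
    fn passes(&self, value: i64) -> bool {
        match *self.cutoff.read().unwrap() {
            None => true,
            Some(t) => value < t,
        }
    }
}

fn main() {
    let filter = DynamicFilter::default();
    let scan_side = filter.clone(); // shares the same state through the Arc

    // The TopK side may run on another thread; here it updates the cutoff three times.
    let topk_side = filter.clone();
    thread::spawn(move || {
        topk_side.tighten(500);
        topk_side.tighten(300);
        topk_side.tighten(400); // ignored: it would loosen the filter
    })
    .join()
    .unwrap();

    println!("{}", scan_side.passes(350)); // false: 350 is not < 300
    println!("{}", scan_side.passes(250)); // true
}
```

Keeping updates monotonic means a scan that reads a slightly stale cutoff only prunes less, never incorrectly: any extra rows it lets through are still rejected by the TopK operator itself.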
+
+## Worked Example
+
+To make dynamic filters more concrete, here is a fully worked example. Imagine
+we have a table `records` with a column `start_timestamp` and we are running the
+motivating query:
+
+```sql
+SELECT *
+FROM records
+ORDER BY start_timestamp DESC
+LIMIT 3;
+```
+
+In this example, at some point during execution, the heap in the `TopK` operator
+will contain the 3 most recent values seen so far, which might be:
+
+| start_timestamp          |
+|--------------------------|
+| 2025-08-16T20:35:15.00Z  |
+| 2025-08-16T20:35:14.00Z  |
+| 2025-08-16T20:35:13.00Z  |
+
+Since `2025-08-16T20:35:13.00Z` is the smallest of these values, we know that
+any subsequent rows with `start_timestamp` less than or equal to this value
+cannot possibly be in the top 3, and can be skipped entirely.
+This knowledge is encoded in a filter of the form `start_timestamp >
+'2025-08-16T20:35:13.00Z'`. If we knew the correct timestamp value before
+starting the plan, we could simply write:
+
+```sql
+SELECT *
+FROM records
+WHERE start_timestamp > '2025-08-16T20:35:13.00Z'  -- Filter to skip rows
+ORDER BY start_timestamp DESC
+LIMIT 3;
+```
+
+And DataFusion's existing hierarchical pruning (described in [this blog]) would
+skip reading unnecessary files and row groups, and only decode
+the necessary rows.
+
+[this blog]: https://datafusion.apache.org/blog/2025/08/15/external-parquet-indexes/
+
+However, when we start running the query we obviously don't know the value
+`'2025-08-16T20:35:13.00Z'`, so DataFusion instead puts a dynamic filter
+into the plan, which you can think of as a function call like
+`dynamic_filter()`, something like this:
+
+```sql
+SELECT *
+FROM records
+WHERE dynamic_filter() -- Updated during execution as we know more
+ORDER BY start_timestamp DESC
+LIMIT 3;
+```
+
+In this case, `dynamic_filter()` initially has the value `true` (passes all
+rows) but will be progressively updated by the TopK operator as the query
+progresses to filter more and more rows. Note that while we are using SQL for
+illustrative purposes in this example, these optimizations are done at the
+physical plan ([ExecutionPlan]) level, and they apply equally to SQL, DataFrame
+APIs, and custom query languages built with DataFusion.
+
+[ExecutionPlan]: https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.ExecutionPlan.html
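
The progressive update of `dynamic_filter()` in the worked example can be sketched as follows. The `top_k_desc` function is hypothetical: it keeps the 3 largest timestamps in a min-heap (the standard library's `BinaryHeap` wrapped in `std::cmp::Reverse`) and reports the predicate a dynamic filter could hold at that point, `true` until the heap is full and then `start_timestamp > <smallest retained value>`. It compares timestamps as strings, which assumes they all use the same fixed-width ISO 8601 format so that lexicographic and chronological order agree.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Hypothetical helper for `ORDER BY start_timestamp DESC LIMIT k`.
/// Returns the current top k values (largest first) and the predicate
/// a dynamic filter could push down to the scan at this point.
fn top_k_desc<'a>(k: usize, values: &[&'a str]) -> (Vec<&'a str>, String) {
    // `Reverse` turns the max-heap into a min-heap: the root is the smallest retained value.
    let mut heap: BinaryHeap<Reverse<&'a str>> = BinaryHeap::new();
    for &v in values {
        heap.push(Reverse(v));
        if heap.len() > k {
            heap.pop(); // evict the smallest: it can no longer be in the top k
        }
    }
    // Until k rows have been seen, any row could still qualify.
    let filter = match heap.peek() {
        Some(Reverse(min)) if heap.len() == k => format!("start_timestamp > '{min}'"),
        _ => "true".to_string(),
    };
    let mut top: Vec<&'a str> = heap.into_iter().map(|Reverse(v)| v).collect();
    top.sort_unstable_by(|a, b| b.cmp(a)); // largest first
    (top, filter)
}

fn main() {
    let seen = [
        "2025-08-16T20:35:10.00Z",
        "2025-08-16T20:35:15.00Z",
        "2025-08-16T20:35:13.00Z",
        "2025-08-16T20:35:11.00Z",
        "2025-08-16T20:35:14.00Z",
    ];
    let (top, filter) = top_k_desc(3, &seen);
    println!("{top:?}");
    println!("{filter}"); // start_timestamp > '2025-08-16T20:35:13.00Z'
}
```

Calling the function on a shorter prefix of the input shows the filter staying `true` until three rows have been seen and only then becoming selective.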
+
+## TopK + Dynamic Filters
+
+As mentioned above, DataFusion has a specialized sort operator named [TopK] that
+only keeps `K` rows in memory. For a `DESC` sort order, each new input batch is
+compared against the current `K` largest values, and each new row larger than
+the smallest retained value replaces that row. The [code is here].
+
+[TopK]: https://docs.rs/datafusion/latest/datafusion/physical_plan/struct.TopK.html
+[code is here]: https://github.com/apache/datafusion/blob/b4a8b5ae54d939353b7cbd5ab8aee7d3bedecb66/datafusion/physical-plan/src/topk/mod.rs
+
+Prior to dynamic filters, DataFusion had no early termination: it would read the
+*entire* `records` table even if it already had the top `K` rows, because it
+still had to check that there were no rows with a larger `start_timestamp`.
+You can see how this is a problem if you have 2 years' worth of time-series data
+and the largest `1000` values of `start_timestamp` are likely within the first
+few files read. Even once the `TopK` operator has seen 1000 timestamps (e.g. on
+August 16th, 2025), DataFusion would still read all remaining files (e.g. even
+those that contain data only from 2024) just to make sure.
+
+InfluxData [optimized a similar query pattern in InfluxDB IOx] using another
+operator called `ProgressiveEvalExec`. However, `ProgressiveEvalExec` only works
+when the data is already sorted, and it requires a careful analysis of the ordering
+to prove that it can be used and still produce correct results. That is not the
+case for Logfire data (and many other datasets): data tends to be *roughly* sorted
+(e.g. if you append to files as you receive it), but it is not guaranteed to be
+fully sorted, either within or between files.
+
+We [discussed possible solutions] with the community, and ultimately decided to
+implement generic "dynamic filters", which are general enough to be used in
+joins as well (see next section). Our implementation appears very similar to
+recently announced optimizations in closed-source, commercial systems such as
+[Accelerating TopK Queries in Snowflake], or [self-sharpening runtime filters in
+Alibaba Cloud's PolarDB], and we are excited that we can offer similar features
+in an open source query engine like DataFusion.
+
+[optimized a similar query pattern in InfluxDB IOx]: https://www.influxdata.com/blog/making-recent-value-queries-hundreds-times-faster/
+[discussed possible solutions]: https://github.com/apache/datafusion/issues/15037
+[Accelerating TopK Queries in Snowflake]: https://program.berlinbuzzwords.de/bbuzz24/talk/3DTQJB/
+[self-sharpening runtime filters in Alibaba Cloud's PolarDB]: https://www.alibabacloud.com/blog/about-database-kernel-%7C-learn-about-polardb-imci-optimization-techniques_600274
+
+At the query plan level, Q23 looks like this before it is executed:
+
+```text
+┌───────────────────────────┐
+│       SortExec(TopK)      │
+│    --------------------   │
+│ EventTime@4 ASC NULLS LAST│
+│                           │
+│         limit: 10         │
+└─────────────┬─────────────┘
+┌─────────────┴─────────────┐
+│       DataSourceExec      │
+│    --------------------   │
+│         files: 100        │
+│      format: parquet      │
+│                           │
+│         predicate:        │
+│ CAST(URL AS Utf8View) LIKE│
+│      %google% AND true    │
+└───────────────────────────┘
+```
+
+**Figure 5**: Physical plan for ClickBench Q23 prior to execution. The dynamic
+filter is shown as `true` in the `predicate` field of the `DataSourceExec`
+operator.
+
+The dynamic filter is updated by the `SortExec(TopK)` operator during execution
+as shown in Figure 6.
+
+```text
+┌───────────────────────────┐
+│       SortExec(TopK)      │
+│    --------------------   │
+│ EventTime@4 ASC NULLS LAST│
+│                           │
+│         limit: 10         │
+└─────────────┬─────────────┘
+┌─────────────┴─────────────┐
+│       DataSourceExec      │
+│    --------------------   │
+│         files: 100        │
+│      format: parquet      │
+│                           │
+│         predicate:        │
+│ CAST(URL AS Utf8View) LIKE│
+│      %google% AND         │
+│ EventTime < 1372713773.0  │
+└───────────────────────────┘
+```
+
+**Figure 6**: Physical plan for ClickBench Q23 after execution. The dynamic filter has been
+updated to `EventTime < 1372713773.0`, which allows the `DataSourceExec` operator to skip
+files and rows that do not match the filter.
+
+## Hash Join + Dynamic Filters
+
+We spent significant effort to make dynamic filters a general-purpose
+optimization (see the Extensibility section below for more details). Instead of
+a one-off optimization for TopK queries, we created a general mechanism for
+passing information between operators during execution that can be used in multiple contexts.
+We have already used the dynamic filter infrastructure to
+improve hash joins by implementing a technique called [sideways information
+passing], which is similar to [Bloom filter joins] in Apache Spark. See 
+[issue #7955] for more details.
+
+[sideways information passing]: https://15721.courses.cs.cmu.edu/spring2020/papers/13-execution/shrinivas-icde2013.pdf
+[Bloom filter joins]: https://issues.apache.org/jira/browse/SPARK-32268
+[issue #7955]: https://github.com/apache/datafusion/issues/7955
+
+In a Hash Join, the query engine picks one input of the join to be the "build"
+input and the other input to be the "probe" side.
+
+* First, the **build side** is loaded into memory and turned into a hash table.
+
+* Then, the **probe side** is scanned, and matching rows are found by looking
+  in the hash table. Non-matching rows are discarded, so joins often act as
+  filters.
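
The build and probe phases can be sketched in a few lines of Rust. This is a hypothetical, single-threaded inner-join illustration, not DataFusion's hash join operator: `build` and `probe` are illustrative names, the join key is an `i64`, and, as one simple example of information the build side could pass sideways to the probe side, it also records the minimum and maximum build key so that probe rows outside that range are dropped before any hash lookup. A Bloom filter, as in the Spark work linked above, is another kind of summary that could be passed the same way.

```rust
use std::collections::HashMap;

type Payload = &'static str;

/// Build phase: index the build side by join key and track the key range,
/// a simple summary that could be handed to the probe-side scan.
fn build(rows: &[(i64, Payload)]) -> (HashMap<i64, Vec<Payload>>, Option<(i64, i64)>) {
    let mut table: HashMap<i64, Vec<Payload>> = HashMap::new();
    let mut bounds: Option<(i64, i64)> = None;
    for &(key, payload) in rows {
        table.entry(key).or_default().push(payload);
        bounds = Some(match bounds {
            None => (key, key),
            Some((lo, hi)) => (lo.min(key), hi.max(key)),
        });
    }
    (table, bounds)
}

/// Probe phase: keys outside the build-side range cannot match, so they are
/// dropped before the hash lookup (a scan could skip them before decoding).
fn probe(
    table: &HashMap<i64, Vec<Payload>>,
    bounds: Option<(i64, i64)>,
    keys: &[i64],
) -> Vec<(i64, Payload)> {
    let mut out = Vec::new();
    let Some((lo, hi)) = bounds else {
        return out; // empty build side: an inner join produces no rows
    };
    for &key in keys.iter().filter(|&&k| k >= lo && k <= hi) {
        if let Some(payloads) = table.get(&key) {
            for &p in payloads {
                out.push((key, p));
            }
        }
    }
    out
}

fn main() {
    let (table, bounds) = build(&[(1, "a"), (3, "b"), (3, "c")]);
    let joined = probe(&table, bounds, &[0, 1, 2, 3, 10]);
    println!("{joined:?}"); // [(1, "a"), (3, "b"), (3, "c")]
}
```

A range check is cheap to build and can also be evaluated against file statistics, but it only rejects keys outside the build-side range; a Bloom filter can also reject keys that fall inside it.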
+
+Many hash joins are very selective (only a small number of rows are matched), so

Review Comment:
   I tried to clarify in a4a779c


