This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new bc600b3090 Split `push_down_filter.slt` into standalone sqllogictest 
files to reduce long-tail runtime (#20566)
bc600b3090 is described below

commit bc600b3090d783a20eef0760098ddfa9d6ec8595
Author: kosiew <[email protected]>
AuthorDate: Fri Feb 27 16:36:31 2026 +0800

    Split `push_down_filter.slt` into standalone sqllogictest files to reduce 
long-tail runtime (#20566)
    
    ## Which issue does this PR close?
    
    * Part of #20524
    
    ## Rationale for this change
    
    `datafusion/sqllogictest/test_files/push_down_filter.slt` had grown into
    a large sqllogictest file. Since the sqllogictest runner parallelizes at
    **file granularity**, a single heavyweight file can become a straggler
    and dominate wall-clock time.
    
    This PR performs a non-invasive split of that file into smaller,
    self-contained `.slt` files so the runner can distribute work more
    evenly across threads, improving overall suite balance without changing
    SQL semantics or test coverage.
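
    The straggler effect behind this can be sketched with a tiny scheduling
    model. This is illustrative only: the durations, worker count, and the
    greedy longest-first policy are made-up stand-ins, not the actual runner
    internals.

```python
import heapq

def wall_clock(file_secs, workers=4):
    # Greedy longest-first schedule: each file goes to the currently
    # least-loaded worker; wall clock is the busiest worker's total.
    loads = [0.0] * workers
    heapq.heapify(loads)
    for t in sorted(file_secs, reverse=True):
        heapq.heappush(loads, heapq.heappop(loads) + t)
    return max(loads)

# One 40 s monolith among twelve 5 s files (100 s of total work):
before = [40] + [5] * 12
# The monolith split into four 10 s files, same total work:
after = [10] * 4 + [5] * 12

print(wall_clock(before))  # 40.0 -- bounded below by the largest file
print(wall_clock(after))   # 25.0 -- the ideal 100 s / 4 workers
```

    No schedule can finish faster than the largest single file takes, which is
    why splitting the heavyweight `.slt` file is the lever here.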
    
    ## What changes are included in this PR?
    
    * Removed the monolithic `push_down_filter.slt`.
    * Added new standalone sqllogictest files, each with the minimal
      setup/teardown required to run independently:
      * `push_down_filter_unnest.slt` — unnest filter pushdown coverage
        (including struct/field cases).
      * `push_down_filter_parquet.slt` — parquet filter pushdown + limit +
        cast predicate behavior + dynamic filter pushdown (swapped join
        inputs).
      * `push_down_filter_outer_joins.slt` — LEFT/RIGHT join and anti-join
        logical filter pushdown checks.
      * `push_down_filter_regression.slt` — regression coverage for issues
        #17188 and #17512, plus aggregate dynamic filter pushdown checks.
    * Updated scratch output paths to be file-scoped (e.g.
      `test_files/scratch/push_down_filter_parquet/...`) to reduce the chance
      of conflicts when tests execute in parallel.
    * Preserved all original query expectations and explain-plan assertions;
      the changes are organizational only.
    
    ## Are these changes tested?
    
    Yes. A Python script compares the normalized query/statement blocks in the
    new `.slt` files against the old monolithic file:
    
    ```python
    python - <<'PY'
    import subprocess, re
    from collections import defaultdict, deque
    
    repo = '.'
    old_spec = '692a7cb67^:datafusion/sqllogictest/test_files/push_down_filter.slt'
    new_specs = [
      'HEAD:datafusion/sqllogictest/test_files/push_down_filter_outer_joins.slt',
      'HEAD:datafusion/sqllogictest/test_files/push_down_filter_parquet.slt',
      'HEAD:datafusion/sqllogictest/test_files/push_down_filter_regression.slt',
      'HEAD:datafusion/sqllogictest/test_files/push_down_filter_unnest.slt',
    ]
    
    def git_show(spec):
      return subprocess.check_output(['git', '-C', repo, 'show', spec], text=True)
    
    def normalize_sql(sql):
      s = sql.strip().lower()
      s = re.sub(
        r"test_files/scratch/push_down_filter(?:_[^'\s;)/]+)?[^'\s;)]*",
        "test_files/scratch/__norm__",
        s,
      )
      s = re.sub(r'\s+', ' ', s)
      return s
    
    def blocks(text):
      lines = text.splitlines()
      out = []
      i = 0
      while i < len(lines):
        m = lines[i].strip()
        if m.startswith('query '):
          i += 1
          b = []
          while i < len(lines) and lines[i].strip() != '----':
            if not lines[i].lstrip().startswith('#'):
              b.append(lines[i])
            i += 1
          sql = '\n'.join(b).strip()
          if sql:
            out.append(('query', normalize_sql(sql)))
        elif m.startswith('statement '):
          i += 1
          b = []
          while i < len(lines):
            s = lines[i].strip()
            if s == '':
              break
            if s.startswith('query ') or s.startswith('statement '):
              i -= 1
              break
            if not lines[i].lstrip().startswith('#'):
              b.append(lines[i])
            i += 1
          sql = '\n'.join(b).strip()
          if sql:
            out.append(('statement', normalize_sql(sql)))
        i += 1
      return out
    
    old_blocks = blocks(git_show(old_spec))
    new_blocks = []
    for s in new_specs:
      new_blocks.extend(blocks(git_show(s)))
    
    q = defaultdict(deque)
    for item in new_blocks:
      q[item].append(item)
    
    missing = 0
    extra = 0
    for item in old_blocks:
      if q[item]:
        q[item].popleft()
      else:
        missing += 1
    
    for v in q.values():
      extra += len(v)
    
    print(f'old_blocks={len(old_blocks)}')
    print(f'new_blocks={len(new_blocks)}')
    print(f'missing={missing}')
    print(f'extra={extra}')
    print(f'baseline={old_spec}')
    if missing != 0:
      raise SystemExit(1)
    PY
    ```
    
    Output:
    ```
    old_blocks=107
    new_blocks=108
    missing=0
    extra=1
    ```
    
    The extra (1) is this statement block:
    
    `set datafusion.explain.physical_plan_only = true;`
    
    Why it shows as extra:
    
    * In the split files it appears 3 times:
      * `push_down_filter_parquet.slt:21`
      * `push_down_filter_unnest.slt:21`
      * `push_down_filter_regression.slt:129`
    * In the baseline monolithic file at `e937cadbc^`, it appears 2 times.
    * So the comparison reports 3 - 2 = 1 extra.
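
    The same missing/extra bookkeeping the script does with `defaultdict(deque)`
    can be expressed as a multiset difference with `collections.Counter`. The
    block keys below are hypothetical stand-ins for the normalized
    (kind, sql) tuples the script produces:

```python
from collections import Counter

# Hypothetical normalized blocks; the real script builds these via `git show`.
old = Counter({('statement', 'set x = true'): 2, ('query', 'select 1'): 1})
new = Counter({('statement', 'set x = true'): 3, ('query', 'select 1'): 1})

# Counter subtraction keeps only positive counts, i.e. a multiset difference.
missing = sum((old - new).values())  # blocks lost by the split
extra = sum((new - old).values())    # blocks gained (duplicated per-file setup)

print(missing, extra)  # prints: 0 1
```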
    
    ## Are there any user-facing changes?
    
    No user-facing behavior changes. This is a test-suite
    organization/performance improvement only.
    
    ## Note before merging
    
    Revert e8369bb (a commit added only to trigger the extended CI tests for
    sqllogictest).
    
    ## LLM-generated code disclosure
    
    This PR includes LLM-generated code and comments. All LLM-generated
    content has been manually reviewed and tested.
---
 .../sqllogictest/test_files/push_down_filter.slt   | 745 ---------------------
 .../test_files/push_down_filter_outer_joins.slt    | 264 ++++++++
 .../test_files/push_down_filter_parquet.slt        | 188 ++++++
 .../test_files/push_down_filter_regression.slt     | 200 ++++++
 .../test_files/push_down_filter_unnest.slt         | 148 ++++
 5 files changed, 800 insertions(+), 745 deletions(-)

diff --git a/datafusion/sqllogictest/test_files/push_down_filter.slt 
b/datafusion/sqllogictest/test_files/push_down_filter.slt
deleted file mode 100644
index edafcfaa54..0000000000
--- a/datafusion/sqllogictest/test_files/push_down_filter.slt
+++ /dev/null
@@ -1,745 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-
-#   http://www.apache.org/licenses/LICENSE-2.0
-
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-# Test push down filter
-
-statement ok
-set datafusion.explain.physical_plan_only = true;
-
-statement ok
-CREATE TABLE IF NOT EXISTS v AS VALUES(1,[1,2,3]),(2,[3,4,5]);
-
-query I
-select uc2 from (select unnest(column2) as uc2, column1 from v) where column1 
= 2;
-----
-3
-4
-5
-
-# test push down filter for unnest with filter on non-unnest column
-# filter plan is pushed down into projection plan
-query TT
-explain select uc2 from (select unnest(column2) as uc2, column1 from v) where 
column1 = 2;
-----
-physical_plan
-01)ProjectionExec: expr=[__unnest_placeholder(v.column2,depth=1)@0 as uc2]
-02)--UnnestExec
-03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
-04)------ProjectionExec: expr=[column2@0 as __unnest_placeholder(v.column2)]
-05)--------FilterExec: column1@0 = 2, projection=[column2@1]
-06)----------DataSourceExec: partitions=1, partition_sizes=[1]
-
-query I
-select uc2 from (select unnest(column2) as uc2, column1 from v) where uc2 > 3;
-----
-4
-5
-
-# test push down filter for unnest with filter on unnest column
-query TT
-explain select uc2 from (select unnest(column2) as uc2, column1 from v) where 
uc2 > 3;
-----
-physical_plan
-01)ProjectionExec: expr=[__unnest_placeholder(v.column2,depth=1)@0 as uc2]
-02)--FilterExec: __unnest_placeholder(v.column2,depth=1)@0 > 3
-03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
-04)------UnnestExec
-05)--------ProjectionExec: expr=[column2@0 as __unnest_placeholder(v.column2)]
-06)----------DataSourceExec: partitions=1, partition_sizes=[1]
-
-query II
-select uc2, column1 from  (select unnest(column2) as uc2, column1 from v) 
where uc2 > 3 AND column1 = 2;
-----
-4 2
-5 2
-
-# Could push the filter (column1 = 2) down below unnest
-query TT
-explain select uc2, column1 from  (select unnest(column2) as uc2, column1 from 
v) where uc2 > 3 AND column1 = 2;
-----
-physical_plan
-01)ProjectionExec: expr=[__unnest_placeholder(v.column2,depth=1)@0 as uc2, 
column1@1 as column1]
-02)--FilterExec: __unnest_placeholder(v.column2,depth=1)@0 > 3
-03)----UnnestExec
-04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
-05)--------ProjectionExec: expr=[column2@1 as __unnest_placeholder(v.column2), 
column1@0 as column1]
-06)----------FilterExec: column1@0 = 2
-07)------------DataSourceExec: partitions=1, partition_sizes=[1]
-
-query II
-select uc2, column1 from  (select unnest(column2) as uc2, column1 from v) 
where uc2 > 3 OR column1 = 2;
-----
-3 2
-4 2
-5 2
-
-# only non-unnest filter in AND clause could be pushed down
-query TT
-explain select uc2, column1 from  (select unnest(column2) as uc2, column1 from 
v) where uc2 > 3 OR column1 = 2;
-----
-physical_plan
-01)ProjectionExec: expr=[__unnest_placeholder(v.column2,depth=1)@0 as uc2, 
column1@1 as column1]
-02)--FilterExec: __unnest_placeholder(v.column2,depth=1)@0 > 3 OR column1@1 = 2
-03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
-04)------UnnestExec
-05)--------ProjectionExec: expr=[column2@1 as __unnest_placeholder(v.column2), 
column1@0 as column1]
-06)----------DataSourceExec: partitions=1, partition_sizes=[1]
-
-statement ok
-drop table v;
-
-# test with unnest struct, should not push down filter
-statement ok
-CREATE TABLE d AS VALUES(1,[named_struct('a', 1, 'b', 
2)]),(2,[named_struct('a', 3, 'b', 4), named_struct('a', 5, 'b', 6)]);
-
-query I?
-select * from (select column1, unnest(column2) as o from d) where o['a'] = 1;
-----
-1 {a: 1, b: 2}
-
-query TT
-explain select * from (select column1, unnest(column2) as o from d) where 
o['a'] = 1;
-----
-physical_plan
-01)ProjectionExec: expr=[column1@0 as column1, 
__unnest_placeholder(d.column2,depth=1)@1 as o]
-02)--FilterExec: __datafusion_extracted_1@0 = 1, projection=[column1@1, 
__unnest_placeholder(d.column2,depth=1)@2]
-03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
-04)------ProjectionExec: 
expr=[get_field(__unnest_placeholder(d.column2,depth=1)@1, a) as 
__datafusion_extracted_1, column1@0 as column1, 
__unnest_placeholder(d.column2,depth=1)@1 as 
__unnest_placeholder(d.column2,depth=1)]
-05)--------UnnestExec
-06)----------ProjectionExec: expr=[column1@0 as column1, column2@1 as 
__unnest_placeholder(d.column2)]
-07)------------DataSourceExec: partitions=1, partition_sizes=[1]
-
-statement ok
-drop table d;
-
-statement ok
-CREATE TABLE d AS VALUES (named_struct('a', 1, 'b', 2)), (named_struct('a', 3, 
'b', 4)), (named_struct('a', 5, 'b', 6));
-
-query II
-select * from (select unnest(column1) from d) where 
"__unnest_placeholder(d.column1).b" > 5;
-----
-5 6
-
-query TT
-explain select * from (select unnest(column1) from d) where 
"__unnest_placeholder(d.column1).b" > 5;
-----
-physical_plan
-01)FilterExec: __unnest_placeholder(d.column1).b@1 > 5
-02)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
-03)----UnnestExec
-04)------ProjectionExec: expr=[column1@0 as __unnest_placeholder(d.column1)]
-05)--------DataSourceExec: partitions=1, partition_sizes=[1]
-
-statement ok
-drop table d;
-
-# Test push down filter with limit for parquet
-statement ok
-set datafusion.execution.parquet.pushdown_filters = true;
-
-# this one is also required to make DF skip second file due to "sufficient" 
amount of rows
-statement ok
-set datafusion.execution.collect_statistics = true;
-
-# Create a table as a data source
-statement ok
-CREATE TABLE src_table (
-    part_key INT,
-    value INT
-) AS VALUES(1, 0), (1, 1), (1, 100), (2, 0), (2, 2), (2, 2), (2, 100), (3, 4), 
(3, 5), (3, 6);
-
-
-# There will be more than 2 records filtered from the table to check that 
`limit 1` actually applied.
-# Setup 3 files, i.e., as many as there are partitions:
-
-# File 1:
-query I
-COPY (SELECT * FROM src_table where part_key = 1)
-TO 'test_files/scratch/push_down_filter/test_filter_with_limit/part-0.parquet'
-STORED AS PARQUET;
-----
-3
-
-# File 2:
-query I
-COPY (SELECT * FROM src_table where part_key = 2)
-TO 'test_files/scratch/push_down_filter/test_filter_with_limit/part-1.parquet'
-STORED AS PARQUET;
-----
-4
-
-# File 3:
-query I
-COPY (SELECT * FROM src_table where part_key = 3)
-TO 'test_files/scratch/push_down_filter/test_filter_with_limit/part-2.parquet'
-STORED AS PARQUET;
-----
-3
-
-statement ok
-CREATE EXTERNAL TABLE test_filter_with_limit
-(
-  part_key INT,
-  value INT
-)
-STORED AS PARQUET
-LOCATION 'test_files/scratch/push_down_filter/test_filter_with_limit/';
-
-query TT
-explain select * from test_filter_with_limit where value = 2 limit 1;
-----
-physical_plan
-01)CoalescePartitionsExec: fetch=1
-02)--DataSourceExec: file_groups={3 groups: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/test_filter_with_limit/part-0.parquet],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/test_filter_with_limit/part-1.parquet],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/test_filter_with_limit/part-2.parquet]]},
 projection=[part_key, value], limit=1, file_type=parquet, predicate=value@1 = 
2, pruning_predicat [...]
-
-query II
-select * from test_filter_with_limit where value = 2 limit 1;
-----
-2 2
-
-
-# Tear down test_filter_with_limit table:
-statement ok
-DROP TABLE test_filter_with_limit;
-
-# Tear down src_table table:
-statement ok
-DROP TABLE src_table;
-
-
-query I
-COPY (VALUES (1), (2), (3), (4), (5), (6), (7), (8), (9), (10))
-TO 'test_files/scratch/push_down_filter/t.parquet'
-STORED AS PARQUET;
-----
-10
-
-statement ok
-CREATE EXTERNAL TABLE t
-(
-  a INT
-)
-STORED AS PARQUET
-LOCATION 'test_files/scratch/push_down_filter/t.parquet';
-
-
-# The predicate should not have a column cast  when the value is a valid i32
-query TT
-explain select a from t where a = '100';
-----
-physical_plan DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/t.parquet]]},
 projection=[a], file_type=parquet, predicate=a@0 = 100, 
pruning_predicate=a_null_count@2 != row_count@3 AND a_min@0 <= 100 AND 100 <= 
a_max@1, required_guarantees=[a in (100)]
-
-# The predicate should not have a column cast  when the value is a valid i32
-query TT
-explain select a from t where a != '100';
-----
-physical_plan DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/t.parquet]]},
 projection=[a], file_type=parquet, predicate=a@0 != 100, 
pruning_predicate=a_null_count@2 != row_count@3 AND (a_min@0 != 100 OR 100 != 
a_max@1), required_guarantees=[a not in (100)]
-
-# The predicate should still have the column cast when the value is a NOT 
valid i32
-query TT
-explain select a from t where a = '99999999999';
-----
-physical_plan DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/t.parquet]]},
 projection=[a], file_type=parquet, predicate=CAST(a@0 AS Utf8) = 99999999999
-
-# The predicate should still have the column cast when the value is a NOT 
valid i32
-query TT
-explain select a from t where a = '99.99';
-----
-physical_plan DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/t.parquet]]},
 projection=[a], file_type=parquet, predicate=CAST(a@0 AS Utf8) = 99.99
-
-# The predicate should still have the column cast when the value is a NOT 
valid i32
-query TT
-explain select a from t where a = '';
-----
-physical_plan DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/t.parquet]]},
 projection=[a], file_type=parquet, predicate=CAST(a@0 AS Utf8) = 
-
-# The predicate should not have a column cast when the operator is = or != and 
the literal can be round-trip casted without losing information.
-query TT
-explain select a from t where cast(a as string) = '100';
-----
-physical_plan DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/t.parquet]]},
 projection=[a], file_type=parquet, predicate=a@0 = 100, 
pruning_predicate=a_null_count@2 != row_count@3 AND a_min@0 <= 100 AND 100 <= 
a_max@1, required_guarantees=[a in (100)]
-
-# The predicate should still have the column cast when the literal alters its 
string representation after round-trip casting (leading zero lost).
-query TT
-explain select a from t where CAST(a AS string) = '0123';
-----
-physical_plan DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/t.parquet]]},
 projection=[a], file_type=parquet, predicate=CAST(a@0 AS Utf8View) = 0123
-
-
-# Test dynamic filter pushdown with swapped join inputs (issue #17196)
-# Create tables with different sizes to force join input swapping
-statement ok
-copy (select i as k from generate_series(1, 100) t(i)) to 
'test_files/scratch/push_down_filter/small_table.parquet';
-
-statement ok
-copy (select i as k, i as v from generate_series(1, 1000) t(i)) to 
'test_files/scratch/push_down_filter/large_table.parquet';
-
-statement ok
-create external table small_table stored as parquet location 
'test_files/scratch/push_down_filter/small_table.parquet';
-
-statement ok
-create external table large_table stored as parquet location 
'test_files/scratch/push_down_filter/large_table.parquet';
-
-# Test that dynamic filter is applied to the correct table after join input 
swapping
-# The small_table should be the build side, large_table should be the probe 
side with dynamic filter
-query TT
-explain select * from small_table join large_table on small_table.k = 
large_table.k where large_table.v >= 50;
-----
-physical_plan
-01)HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(k@0, k@0)]
-02)--DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/small_table.parquet]]},
 projection=[k], file_type=parquet
-03)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
-04)----DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/large_table.parquet]]},
 projection=[k, v], file_type=parquet, predicate=v@1 >= 50 AND DynamicFilter [ 
empty ], pruning_predicate=v_null_count@1 != row_count@2 AND v_max@0 >= 50, 
required_guarantees=[]
-
-statement ok
-drop table small_table;
-
-statement ok
-drop table large_table;
-
-statement ok
-drop table t;
-
-# Regression test for https://github.com/apache/datafusion/issues/17188
-query I
-COPY (select i as k from generate_series(1, 10000000) as t(i))
-TO 'test_files/scratch/push_down_filter/t1.parquet'
-STORED AS PARQUET;
-----
-10000000
-
-query I
-COPY (select i as k, i as v from generate_series(1, 10000000) as t(i))
-TO 'test_files/scratch/push_down_filter/t2.parquet'
-STORED AS PARQUET;
-----
-10000000
-
-statement ok
-create external table t1 stored as parquet location 
'test_files/scratch/push_down_filter/t1.parquet';
-
-statement ok
-create external table t2 stored as parquet location 
'test_files/scratch/push_down_filter/t2.parquet';
-
-# The failure before https://github.com/apache/datafusion/pull/17197 was 
non-deterministic and random
-# So we'll run the same query a couple of times just to have more certainty 
it's fixed
-# Sorry about the spam in this slt test...
-
-query III rowsort
-select *
-from t1
-join t2 on t1.k = t2.k
-where v = 1 or v = 10000000
-order by t1.k, t2.v;
-----
-1 1 1
-10000000 10000000 10000000
-
-query III rowsort
-select *
-from t1
-join t2 on t1.k = t2.k
-where v = 1 or v = 10000000
-order by t1.k, t2.v;
-----
-1 1 1
-10000000 10000000 10000000
-
-query III rowsort
-select *
-from t1
-join t2 on t1.k = t2.k
-where v = 1 or v = 10000000
-order by t1.k, t2.v;
-----
-1 1 1
-10000000 10000000 10000000
-
-query III rowsort
-select *
-from t1
-join t2 on t1.k = t2.k
-where v = 1 or v = 10000000
-order by t1.k, t2.v;
-----
-1 1 1
-10000000 10000000 10000000
-
-query III rowsort
-select *
-from t1
-join t2 on t1.k = t2.k
-where v = 1 or v = 10000000
-order by t1.k, t2.v;
-----
-1 1 1
-10000000 10000000 10000000
-
-# Regression test for https://github.com/apache/datafusion/issues/17512
-
-query I
-COPY (
-    SELECT arrow_cast('2025-01-01T00:00:00Z'::timestamptz, 
'Timestamp(Microsecond, Some("UTC"))') AS start_timestamp
-)
-TO 'test_files/scratch/push_down_filter/17512.parquet'
-STORED AS PARQUET;
-----
-1
-
-statement ok
-CREATE EXTERNAL TABLE records STORED AS PARQUET LOCATION 
'test_files/scratch/push_down_filter/17512.parquet';
-
-query I
-SELECT 1
-FROM (
-    SELECT start_timestamp
-    FROM records
-    WHERE start_timestamp <= '2025-01-01T00:00:00Z'::timestamptz
-) AS t
-WHERE t.start_timestamp::time < '00:00:01'::time;
-----
-1
-
-# Test aggregate dynamic filter pushdown
-# Note: most of the test coverage lives in 
`datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs`
-# , to compare dynamic filter content easier. Here the tests are simple 
end-to-end
-# exercises.
-
-statement ok
-set datafusion.explain.format = 'indent';
-
-statement ok
-set datafusion.explain.physical_plan_only = true;
-
-statement ok
-set datafusion.execution.target_partitions = 2;
-
-statement ok
-set datafusion.execution.parquet.pushdown_filters = true;
-
-statement ok
-set datafusion.optimizer.enable_dynamic_filter_pushdown = true;
-
-statement ok
-set datafusion.optimizer.enable_aggregate_dynamic_filter_pushdown = true;
-
-statement ok
-create external table agg_dyn_test stored as parquet location 
'../core/tests/data/test_statistics_per_partition';
-
-# Expect dynamic filter available inside data source
-query TT
-explain select max(id) from agg_dyn_test where id > 1;
-----
-physical_plan
-01)AggregateExec: mode=Final, gby=[], aggr=[max(agg_dyn_test.id)]
-02)--CoalescePartitionsExec
-03)----AggregateExec: mode=Partial, gby=[], aggr=[max(agg_dyn_test.id)]
-04)------DataSourceExec: file_groups={2 groups: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-01/j5fUeSDQo22oPyPU.parquet,
 
WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-02/j5fUeSDQo22oPyPU.parquet],
 
[WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-03/j5fUeSDQo22oPyPU.parquet,
 
WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-04/j5fUeSDQo2
 [...]
-
-query I
-select max(id) from agg_dyn_test where id > 1;
-----
-4
-
-# Expect dynamic filter available inside data source
-query TT
-explain select max(id) from agg_dyn_test where (id+1) > 1;
-----
-physical_plan
-01)AggregateExec: mode=Final, gby=[], aggr=[max(agg_dyn_test.id)]
-02)--CoalescePartitionsExec
-03)----AggregateExec: mode=Partial, gby=[], aggr=[max(agg_dyn_test.id)]
-04)------DataSourceExec: file_groups={2 groups: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-01/j5fUeSDQo22oPyPU.parquet,
 
WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-02/j5fUeSDQo22oPyPU.parquet],
 
[WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-03/j5fUeSDQo22oPyPU.parquet,
 
WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-04/j5fUeSDQo2
 [...]
-
-# Expect dynamic filter available inside data source
-query TT
-explain select max(id), min(id) from agg_dyn_test where id < 10;
-----
-physical_plan
-01)AggregateExec: mode=Final, gby=[], aggr=[max(agg_dyn_test.id), 
min(agg_dyn_test.id)]
-02)--CoalescePartitionsExec
-03)----AggregateExec: mode=Partial, gby=[], aggr=[max(agg_dyn_test.id), 
min(agg_dyn_test.id)]
-04)------DataSourceExec: file_groups={2 groups: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-01/j5fUeSDQo22oPyPU.parquet,
 
WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-02/j5fUeSDQo22oPyPU.parquet],
 
[WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-03/j5fUeSDQo22oPyPU.parquet,
 
WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-04/j5fUeSDQo2
 [...]
-
-# Dynamic filter should not be available for grouping sets
-query TT
-explain select max(id) from agg_dyn_test where id < 10
-group by grouping sets ((), (id))
-----
-physical_plan
-01)ProjectionExec: expr=[max(agg_dyn_test.id)@2 as max(agg_dyn_test.id)]
-02)--AggregateExec: mode=FinalPartitioned, gby=[id@0 as id, __grouping_id@1 as 
__grouping_id], aggr=[max(agg_dyn_test.id)]
-03)----RepartitionExec: partitioning=Hash([id@0, __grouping_id@1], 2), 
input_partitions=2
-04)------AggregateExec: mode=Partial, gby=[(NULL as id), (id@0 as id)], 
aggr=[max(agg_dyn_test.id)]
-05)--------DataSourceExec: file_groups={2 groups: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-01/j5fUeSDQo22oPyPU.parquet,
 
WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-02/j5fUeSDQo22oPyPU.parquet],
 
[WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-03/j5fUeSDQo22oPyPU.parquet,
 
WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-04/j5fUeSDQ
 [...]
-
-statement ok
-drop table agg_dyn_test;
-
-statement ok
-drop table t1;
-
-statement ok
-drop table t2;
-
-
-
-# check LEFT/RIGHT joins with filter pushdown to both relations (when possible)
-
-statement ok
-create table t1(k int, v int);
-
-statement ok
-create table t2(k int, v int);
-
-statement ok
-insert into t1 values
-  (1, 10),
-  (2, 20),
-  (3, 30),
-  (null, 40),
-  (50, null),
-  (null, null);
-
-statement ok
-insert into t2 values
-  (1, 11),
-  (2, 21),
-  (2, 22),
-  (null, 41),
-  (51, null),
-  (null, null);
-
-statement ok
-set datafusion.explain.physical_plan_only = false;
-
-statement ok
-set datafusion.explain.logical_plan_only = true;
-
-
-# left join + filter on join key -> pushed
-query TT
-explain select * from t1 left join t2 on t1.k = t2.k where t1.k > 1;
-----
-logical_plan
-01)Left Join: t1.k = t2.k
-02)--Filter: t1.k > Int32(1)
-03)----TableScan: t1 projection=[k, v]
-04)--Filter: t2.k > Int32(1)
-05)----TableScan: t2 projection=[k, v]
-
-query IIII rowsort
-select * from t1 left join t2 on t1.k = t2.k where t1.k > 1;
-----
-2 20 2 21
-2 20 2 22
-3 30 NULL NULL
-50 NULL NULL NULL
-
-# left join + filter on another column -> not pushed
-query TT
-explain select * from t1 left join t2 on t1.k = t2.k where t1.v > 1;
-----
-logical_plan
-01)Left Join: t1.k = t2.k
-02)--Filter: t1.v > Int32(1)
-03)----TableScan: t1 projection=[k, v]
-04)--TableScan: t2 projection=[k, v]
-
-query IIII rowsort
-select * from t1 left join t2 on t1.k = t2.k where t1.v > 1;
-----
-1 10 1 11
-2 20 2 21
-2 20 2 22
-3 30 NULL NULL
-NULL 40 NULL NULL
-
-# left join + or + filter on another column -> not pushed
-query TT
-explain select * from t1 left join t2 on t1.k = t2.k where t1.k > 3 or t1.v > 
20;
-----
-logical_plan
-01)Left Join: t1.k = t2.k
-02)--Filter: t1.k > Int32(3) OR t1.v > Int32(20)
-03)----TableScan: t1 projection=[k, v]
-04)--TableScan: t2 projection=[k, v]
-
-query IIII rowsort
-select * from t1 left join t2 on t1.k = t2.k where t1.k > 3 or t1.v > 20;
-----
-3 30 NULL NULL
-50 NULL NULL NULL
-NULL 40 NULL NULL
-
-
-# right join + filter on join key -> pushed
-query TT
-explain select * from t1 right join t2 on t1.k = t2.k where t1.k > 1;
-----
-logical_plan
-01)Inner Join: t1.k = t2.k
-02)--Filter: t1.k > Int32(1)
-03)----TableScan: t1 projection=[k, v]
-04)--Filter: t2.k > Int32(1)
-05)----TableScan: t2 projection=[k, v]
-
-query IIII rowsort
-select * from t1 right join t2 on t1.k = t2.k where t1.k > 1;
-----
-2 20 2 21
-2 20 2 22
-
-# right join + filter on another column -> not pushed
-query TT
-explain select * from t1 right join t2 on t1.k = t2.k where t1.v > 1;
-----
-logical_plan
-01)Inner Join: t1.k = t2.k
-02)--Filter: t1.v > Int32(1)
-03)----TableScan: t1 projection=[k, v]
-04)--TableScan: t2 projection=[k, v]
-
-query IIII rowsort
-select * from t1 right join t2 on t1.k = t2.k where t1.v > 1;
-----
-1 10 1 11
-2 20 2 21
-2 20 2 22
-
-# right join + or + filter on another column -> not pushed
-query TT
-explain select * from t1 right join t2 on t1.k = t2.k where t1.k > 3 or t1.v > 
20;
-----
-logical_plan
-01)Inner Join: t1.k = t2.k
-02)--Filter: t1.k > Int32(3) OR t1.v > Int32(20)
-03)----TableScan: t1 projection=[k, v]
-04)--TableScan: t2 projection=[k, v]
-
-query IIII rowsort
-select * from t1 right join t2 on t1.k = t2.k where t1.k > 3 or t1.v > 20;
-----
-
-
-# left anti join + filter on join key -> pushed
-query TT
-explain select * from t1 left anti join t2 on t1.k = t2.k where t1.k > 1;
-----
-logical_plan
-01)LeftAnti Join: t1.k = t2.k
-02)--Filter: t1.k > Int32(1)
-03)----TableScan: t1 projection=[k, v]
-04)--Filter: t2.k > Int32(1)
-05)----TableScan: t2 projection=[k]
-
-query II rowsort
-select * from t1 left anti join t2 on t1.k = t2.k where t1.k > 1;
-----
-3 30
-50 NULL
-
-# left anti join + filter on another column -> not pushed
-query TT
-explain select * from t1 left anti join t2 on t1.k = t2.k where t1.v > 1;
-----
-logical_plan
-01)LeftAnti Join: t1.k = t2.k
-02)--Filter: t1.v > Int32(1)
-03)----TableScan: t1 projection=[k, v]
-04)--TableScan: t2 projection=[k]
-
-query II rowsort
-select * from t1 left anti join t2 on t1.k = t2.k where t1.v > 1;
-----
-3 30
-NULL 40
-
-# left anti join + or + filter on another column -> not pushed
-query TT
-explain select * from t1 left anti join t2 on t1.k = t2.k where t1.k > 3 or 
t1.v > 20;
-----
-logical_plan
-01)LeftAnti Join: t1.k = t2.k
-02)--Filter: t1.k > Int32(3) OR t1.v > Int32(20)
-03)----TableScan: t1 projection=[k, v]
-04)--TableScan: t2 projection=[k]
-
-query II rowsort
-select * from t1 left anti join t2 on t1.k = t2.k where t1.k > 3 or t1.v > 20;
-----
-3 30
-50 NULL
-NULL 40
-
-
-# right anti join + filter on join key -> pushed
-query TT
-explain select * from t1 right anti join t2 on t1.k = t2.k where t2.k > 1;
-----
-logical_plan
-01)RightAnti Join: t1.k = t2.k
-02)--Filter: t1.k > Int32(1)
-03)----TableScan: t1 projection=[k]
-04)--Filter: t2.k > Int32(1)
-05)----TableScan: t2 projection=[k, v]
-
-query II rowsort
-select * from t1 right anti join t2 on t1.k = t2.k where t2.k > 1;
-----
-51 NULL
-
-# right anti join + filter on another column -> not pushed
-query TT
-explain select * from t1 right anti join t2 on t1.k = t2.k where t2.v > 1;
-----
-logical_plan
-01)RightAnti Join: t1.k = t2.k
-02)--TableScan: t1 projection=[k]
-03)--Filter: t2.v > Int32(1)
-04)----TableScan: t2 projection=[k, v]
-
-query II rowsort
-select * from t1 right anti join t2 on t1.k = t2.k where t2.v > 1;
-----
-NULL 41
-
-# right anti join + or + filter on another column -> not pushed
-query TT
-explain select * from t1 right anti join t2 on t1.k = t2.k where t2.k > 3 or t2.v > 20;
-----
-logical_plan
-01)RightAnti Join: t1.k = t2.k
-02)--TableScan: t1 projection=[k]
-03)--Filter: t2.k > Int32(3) OR t2.v > Int32(20)
-04)----TableScan: t2 projection=[k, v]
-
-query II rowsort
-select * from t1 right anti join t2 on t1.k = t2.k where t2.k > 3 or t2.v > 20;
-----
-51 NULL
-NULL 41
-
-
-statement ok
-set datafusion.explain.logical_plan_only = false;
-
-statement ok
-drop table t1;
-
-statement ok
-drop table t2;
diff --git a/datafusion/sqllogictest/test_files/push_down_filter_outer_joins.slt b/datafusion/sqllogictest/test_files/push_down_filter_outer_joins.slt
new file mode 100644
index 0000000000..2e5f7c317f
--- /dev/null
+++ b/datafusion/sqllogictest/test_files/push_down_filter_outer_joins.slt
@@ -0,0 +1,264 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+
+#   http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Test push down filter
+
+# check LEFT/RIGHT joins with filter pushdown to both relations (when possible)
+
+statement ok
+create table t1(k int, v int);
+
+statement ok
+create table t2(k int, v int);
+
+statement ok
+insert into t1 values
+  (1, 10),
+  (2, 20),
+  (3, 30),
+  (null, 40),
+  (50, null),
+  (null, null);
+
+statement ok
+insert into t2 values
+  (1, 11),
+  (2, 21),
+  (2, 22),
+  (null, 41),
+  (51, null),
+  (null, null);
+
+statement ok
+set datafusion.explain.physical_plan_only = false;
+
+statement ok
+set datafusion.explain.logical_plan_only = true;
+
+
+# left join + filter on join key -> pushed
+query TT
+explain select * from t1 left join t2 on t1.k = t2.k where t1.k > 1;
+----
+logical_plan
+01)Left Join: t1.k = t2.k
+02)--Filter: t1.k > Int32(1)
+03)----TableScan: t1 projection=[k, v]
+04)--Filter: t2.k > Int32(1)
+05)----TableScan: t2 projection=[k, v]
+
+query IIII rowsort
+select * from t1 left join t2 on t1.k = t2.k where t1.k > 1;
+----
+2 20 2 21
+2 20 2 22
+3 30 NULL NULL
+50 NULL NULL NULL
+
+# left join + filter on another column -> not pushed
+query TT
+explain select * from t1 left join t2 on t1.k = t2.k where t1.v > 1;
+----
+logical_plan
+01)Left Join: t1.k = t2.k
+02)--Filter: t1.v > Int32(1)
+03)----TableScan: t1 projection=[k, v]
+04)--TableScan: t2 projection=[k, v]
+
+query IIII rowsort
+select * from t1 left join t2 on t1.k = t2.k where t1.v > 1;
+----
+1 10 1 11
+2 20 2 21
+2 20 2 22
+3 30 NULL NULL
+NULL 40 NULL NULL
+
+# left join + or + filter on another column -> not pushed
+query TT
+explain select * from t1 left join t2 on t1.k = t2.k where t1.k > 3 or t1.v > 20;
+----
+logical_plan
+01)Left Join: t1.k = t2.k
+02)--Filter: t1.k > Int32(3) OR t1.v > Int32(20)
+03)----TableScan: t1 projection=[k, v]
+04)--TableScan: t2 projection=[k, v]
+
+query IIII rowsort
+select * from t1 left join t2 on t1.k = t2.k where t1.k > 3 or t1.v > 20;
+----
+3 30 NULL NULL
+50 NULL NULL NULL
+NULL 40 NULL NULL
+
+
+# right join + filter on join key -> pushed
+query TT
+explain select * from t1 right join t2 on t1.k = t2.k where t1.k > 1;
+----
+logical_plan
+01)Inner Join: t1.k = t2.k
+02)--Filter: t1.k > Int32(1)
+03)----TableScan: t1 projection=[k, v]
+04)--Filter: t2.k > Int32(1)
+05)----TableScan: t2 projection=[k, v]
+
+query IIII rowsort
+select * from t1 right join t2 on t1.k = t2.k where t1.k > 1;
+----
+2 20 2 21
+2 20 2 22
+
+# right join + filter on another column -> not pushed
+query TT
+explain select * from t1 right join t2 on t1.k = t2.k where t1.v > 1;
+----
+logical_plan
+01)Inner Join: t1.k = t2.k
+02)--Filter: t1.v > Int32(1)
+03)----TableScan: t1 projection=[k, v]
+04)--TableScan: t2 projection=[k, v]
+
+query IIII rowsort
+select * from t1 right join t2 on t1.k = t2.k where t1.v > 1;
+----
+1 10 1 11
+2 20 2 21
+2 20 2 22
+
+# right join + or + filter on another column -> not pushed
+query TT
+explain select * from t1 right join t2 on t1.k = t2.k where t1.k > 3 or t1.v > 20;
+----
+logical_plan
+01)Inner Join: t1.k = t2.k
+02)--Filter: t1.k > Int32(3) OR t1.v > Int32(20)
+03)----TableScan: t1 projection=[k, v]
+04)--TableScan: t2 projection=[k, v]
+
+query IIII rowsort
+select * from t1 right join t2 on t1.k = t2.k where t1.k > 3 or t1.v > 20;
+----
+
+
+# left anti join + filter on join key -> pushed
+query TT
+explain select * from t1 left anti join t2 on t1.k = t2.k where t1.k > 1;
+----
+logical_plan
+01)LeftAnti Join: t1.k = t2.k
+02)--Filter: t1.k > Int32(1)
+03)----TableScan: t1 projection=[k, v]
+04)--Filter: t2.k > Int32(1)
+05)----TableScan: t2 projection=[k]
+
+query II rowsort
+select * from t1 left anti join t2 on t1.k = t2.k where t1.k > 1;
+----
+3 30
+50 NULL
+
+# left anti join + filter on another column -> not pushed
+query TT
+explain select * from t1 left anti join t2 on t1.k = t2.k where t1.v > 1;
+----
+logical_plan
+01)LeftAnti Join: t1.k = t2.k
+02)--Filter: t1.v > Int32(1)
+03)----TableScan: t1 projection=[k, v]
+04)--TableScan: t2 projection=[k]
+
+query II rowsort
+select * from t1 left anti join t2 on t1.k = t2.k where t1.v > 1;
+----
+3 30
+NULL 40
+
+# left anti join + or + filter on another column -> not pushed
+query TT
+explain select * from t1 left anti join t2 on t1.k = t2.k where t1.k > 3 or t1.v > 20;
+----
+logical_plan
+01)LeftAnti Join: t1.k = t2.k
+02)--Filter: t1.k > Int32(3) OR t1.v > Int32(20)
+03)----TableScan: t1 projection=[k, v]
+04)--TableScan: t2 projection=[k]
+
+query II rowsort
+select * from t1 left anti join t2 on t1.k = t2.k where t1.k > 3 or t1.v > 20;
+----
+3 30
+50 NULL
+NULL 40
+
+
+# right anti join + filter on join key -> pushed
+query TT
+explain select * from t1 right anti join t2 on t1.k = t2.k where t2.k > 1;
+----
+logical_plan
+01)RightAnti Join: t1.k = t2.k
+02)--Filter: t1.k > Int32(1)
+03)----TableScan: t1 projection=[k]
+04)--Filter: t2.k > Int32(1)
+05)----TableScan: t2 projection=[k, v]
+
+query II rowsort
+select * from t1 right anti join t2 on t1.k = t2.k where t2.k > 1;
+----
+51 NULL
+
+# right anti join + filter on another column -> not pushed
+query TT
+explain select * from t1 right anti join t2 on t1.k = t2.k where t2.v > 1;
+----
+logical_plan
+01)RightAnti Join: t1.k = t2.k
+02)--TableScan: t1 projection=[k]
+03)--Filter: t2.v > Int32(1)
+04)----TableScan: t2 projection=[k, v]
+
+query II rowsort
+select * from t1 right anti join t2 on t1.k = t2.k where t2.v > 1;
+----
+NULL 41
+
+# right anti join + or + filter on another column -> not pushed
+query TT
+explain select * from t1 right anti join t2 on t1.k = t2.k where t2.k > 3 or t2.v > 20;
+----
+logical_plan
+01)RightAnti Join: t1.k = t2.k
+02)--TableScan: t1 projection=[k]
+03)--Filter: t2.k > Int32(3) OR t2.v > Int32(20)
+04)----TableScan: t2 projection=[k, v]
+
+query II rowsort
+select * from t1 right anti join t2 on t1.k = t2.k where t2.k > 3 or t2.v > 20;
+----
+51 NULL
+NULL 41
+
+
+statement ok
+set datafusion.explain.logical_plan_only = false;
+
+statement ok
+drop table t1;
+
+statement ok
+drop table t2;
diff --git a/datafusion/sqllogictest/test_files/push_down_filter_parquet.slt b/datafusion/sqllogictest/test_files/push_down_filter_parquet.slt
new file mode 100644
index 0000000000..e1c83c8c33
--- /dev/null
+++ b/datafusion/sqllogictest/test_files/push_down_filter_parquet.slt
@@ -0,0 +1,188 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+
+#   http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Test push down filter
+
+statement ok
+set datafusion.explain.physical_plan_only = true;
+
+# Test push down filter with limit for parquet
+statement ok
+set datafusion.execution.parquet.pushdown_filters = true;
+
+# this one is also required to make DataFusion skip the second file due to a "sufficient" amount of rows
+statement ok
+set datafusion.execution.collect_statistics = true;
+
+# Create a table as a data source
+statement ok
+CREATE TABLE src_table (
+    part_key INT,
+    value INT
+) AS VALUES(1, 0), (1, 1), (1, 100), (2, 0), (2, 2), (2, 2), (2, 100), (3, 4), (3, 5), (3, 6);
+
+
+# There will be more than 2 records filtered from the table to check that `limit 1` is actually applied.
+# Setup 3 files, i.e., as many as there are partitions:
+
+# File 1:
+query I
+COPY (SELECT * FROM src_table where part_key = 1)
+TO 'test_files/scratch/push_down_filter_parquet/test_filter_with_limit/part-0.parquet'
+STORED AS PARQUET;
+----
+3
+
+# File 2:
+query I
+COPY (SELECT * FROM src_table where part_key = 2)
+TO 'test_files/scratch/push_down_filter_parquet/test_filter_with_limit/part-1.parquet'
+STORED AS PARQUET;
+----
+4
+
+# File 3:
+query I
+COPY (SELECT * FROM src_table where part_key = 3)
+TO 'test_files/scratch/push_down_filter_parquet/test_filter_with_limit/part-2.parquet'
+STORED AS PARQUET;
+----
+3
+
+statement ok
+CREATE EXTERNAL TABLE test_filter_with_limit
+(
+  part_key INT,
+  value INT
+)
+STORED AS PARQUET
+LOCATION 'test_files/scratch/push_down_filter_parquet/test_filter_with_limit/';
+
+query TT
+explain select * from test_filter_with_limit where value = 2 limit 1;
+----
+physical_plan
+01)CoalescePartitionsExec: fetch=1
+02)--DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/test_filter_with_limit/part-0.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/test_filter_with_limit/part-1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/test_filter_with_limit/part-2.parquet]]}, projection=[part_key, value], limit=1, file_type=parquet, predicate=value [...]
+
+query II
+select * from test_filter_with_limit where value = 2 limit 1;
+----
+2 2
+
+
+# Tear down test_filter_with_limit table:
+statement ok
+DROP TABLE test_filter_with_limit;
+
+# Tear down src_table table:
+statement ok
+DROP TABLE src_table;
+
+
+query I
+COPY (VALUES (1), (2), (3), (4), (5), (6), (7), (8), (9), (10))
+TO 'test_files/scratch/push_down_filter_parquet/t.parquet'
+STORED AS PARQUET;
+----
+10
+
+statement ok
+CREATE EXTERNAL TABLE t
+(
+  a INT
+)
+STORED AS PARQUET
+LOCATION 'test_files/scratch/push_down_filter_parquet/t.parquet';
+
+
+# The predicate should not have a column cast when the value is a valid i32
+query TT
+explain select a from t where a = '100';
+----
physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/t.parquet]]}, projection=[a], file_type=parquet, predicate=a@0 = 100, pruning_predicate=a_null_count@2 != row_count@3 AND a_min@0 <= 100 AND 100 <= a_max@1, required_guarantees=[a in (100)]
+
+# The predicate should not have a column cast when the value is a valid i32
+query TT
+explain select a from t where a != '100';
+----
physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/t.parquet]]}, projection=[a], file_type=parquet, predicate=a@0 != 100, pruning_predicate=a_null_count@2 != row_count@3 AND (a_min@0 != 100 OR 100 != a_max@1), required_guarantees=[a not in (100)]
+
+# The predicate should still have the column cast when the value is not a valid i32
+query TT
+explain select a from t where a = '99999999999';
+----
physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/t.parquet]]}, projection=[a], file_type=parquet, predicate=CAST(a@0 AS Utf8) = 99999999999
+
+# The predicate should still have the column cast when the value is not a valid i32
+query TT
+explain select a from t where a = '99.99';
+----
physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/t.parquet]]}, projection=[a], file_type=parquet, predicate=CAST(a@0 AS Utf8) = 99.99
+
+# The predicate should still have the column cast when the value is not a valid i32
+query TT
+explain select a from t where a = '';
+----
physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/t.parquet]]}, projection=[a], file_type=parquet, predicate=CAST(a@0 AS Utf8) = 
+
+# The predicate should not have a column cast when the operator is = or != and the literal can be round-trip cast without losing information.
+query TT
+explain select a from t where cast(a as string) = '100';
+----
physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/t.parquet]]}, projection=[a], file_type=parquet, predicate=a@0 = 100, pruning_predicate=a_null_count@2 != row_count@3 AND a_min@0 <= 100 AND 100 <= a_max@1, required_guarantees=[a in (100)]
+
+# The predicate should still have the column cast when the literal alters its string representation after round-trip casting (leading zero lost).
+query TT
+explain select a from t where CAST(a AS string) = '0123';
+----
physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/t.parquet]]}, projection=[a], file_type=parquet, predicate=CAST(a@0 AS Utf8View) = 0123
+
+
+# Test dynamic filter pushdown with swapped join inputs (issue #17196)
+# Create tables with different sizes to force join input swapping
+statement ok
+copy (select i as k from generate_series(1, 100) t(i)) to 'test_files/scratch/push_down_filter_parquet/small_table.parquet';
+
+statement ok
+copy (select i as k, i as v from generate_series(1, 1000) t(i)) to 'test_files/scratch/push_down_filter_parquet/large_table.parquet';
+
+statement ok
+create external table small_table stored as parquet location 'test_files/scratch/push_down_filter_parquet/small_table.parquet';
+
+statement ok
+create external table large_table stored as parquet location 'test_files/scratch/push_down_filter_parquet/large_table.parquet';
+
+# Test that dynamic filter is applied to the correct table after join input swapping
+# The small_table should be the build side, large_table should be the probe side with dynamic filter
+query TT
+explain select * from small_table join large_table on small_table.k = large_table.k where large_table.v >= 50;
+----
+physical_plan
+01)HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(k@0, k@0)]
+02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/small_table.parquet]]}, projection=[k], file_type=parquet
+03)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+04)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/large_table.parquet]]}, projection=[k, v], file_type=parquet, predicate=v@1 >= 50 AND DynamicFilter [ empty ], pruning_predicate=v_null_count@1 != row_count@2 AND v_max@0 >= 50, required_guarantees=[]
+
+statement ok
+drop table small_table;
+
+statement ok
+drop table large_table;
+
+statement ok
+drop table t;
diff --git a/datafusion/sqllogictest/test_files/push_down_filter_regression.slt b/datafusion/sqllogictest/test_files/push_down_filter_regression.slt
new file mode 100644
index 0000000000..ca4a30fa96
--- /dev/null
+++ b/datafusion/sqllogictest/test_files/push_down_filter_regression.slt
@@ -0,0 +1,200 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+
+#   http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Test push down filter
+
+# Regression test for https://github.com/apache/datafusion/issues/17188
+query I
+COPY (select i as k from generate_series(1, 10000000) as t(i))
+TO 'test_files/scratch/push_down_filter_regression/t1.parquet'
+STORED AS PARQUET;
+----
+10000000
+
+query I
+COPY (select i as k, i as v from generate_series(1, 10000000) as t(i))
+TO 'test_files/scratch/push_down_filter_regression/t2.parquet'
+STORED AS PARQUET;
+----
+10000000
+
+statement ok
+create external table t1 stored as parquet location 'test_files/scratch/push_down_filter_regression/t1.parquet';
+
+statement ok
+create external table t2 stored as parquet location 'test_files/scratch/push_down_filter_regression/t2.parquet';
+
+# The failure before https://github.com/apache/datafusion/pull/17197 was non-deterministic and random
+# So we'll run the same query a couple of times just to have more certainty it's fixed
+# Sorry about the spam in this slt test...
+
+query III rowsort
+select *
+from t1
+join t2 on t1.k = t2.k
+where v = 1 or v = 10000000
+order by t1.k, t2.v;
+----
+1 1 1
+10000000 10000000 10000000
+
+query III rowsort
+select *
+from t1
+join t2 on t1.k = t2.k
+where v = 1 or v = 10000000
+order by t1.k, t2.v;
+----
+1 1 1
+10000000 10000000 10000000
+
+query III rowsort
+select *
+from t1
+join t2 on t1.k = t2.k
+where v = 1 or v = 10000000
+order by t1.k, t2.v;
+----
+1 1 1
+10000000 10000000 10000000
+
+query III rowsort
+select *
+from t1
+join t2 on t1.k = t2.k
+where v = 1 or v = 10000000
+order by t1.k, t2.v;
+----
+1 1 1
+10000000 10000000 10000000
+
+query III rowsort
+select *
+from t1
+join t2 on t1.k = t2.k
+where v = 1 or v = 10000000
+order by t1.k, t2.v;
+----
+1 1 1
+10000000 10000000 10000000
+
+# Regression test for https://github.com/apache/datafusion/issues/17512
+
+query I
+COPY (
+    SELECT arrow_cast('2025-01-01T00:00:00Z'::timestamptz, 'Timestamp(Microsecond, Some("UTC"))') AS start_timestamp
+)
+TO 'test_files/scratch/push_down_filter_regression/17512.parquet'
+STORED AS PARQUET;
+----
+1
+
+statement ok
+CREATE EXTERNAL TABLE records STORED AS PARQUET LOCATION 'test_files/scratch/push_down_filter_regression/17512.parquet';
+
+query I
+SELECT 1
+FROM (
+    SELECT start_timestamp
+    FROM records
+    WHERE start_timestamp <= '2025-01-01T00:00:00Z'::timestamptz
+) AS t
+WHERE t.start_timestamp::time < '00:00:01'::time;
+----
+1
+
+# Test aggregate dynamic filter pushdown
+# Note: most of the test coverage lives in `datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs`,
+# to make comparing dynamic filter content easier. Here the tests are simple
+# end-to-end exercises.
+
+statement ok
+set datafusion.explain.format = 'indent';
+
+statement ok
+set datafusion.explain.physical_plan_only = true;
+
+statement ok
+set datafusion.execution.target_partitions = 2;
+
+statement ok
+set datafusion.execution.parquet.pushdown_filters = true;
+
+statement ok
+set datafusion.optimizer.enable_dynamic_filter_pushdown = true;
+
+statement ok
+set datafusion.optimizer.enable_aggregate_dynamic_filter_pushdown = true;
+
+statement ok
+create external table agg_dyn_test stored as parquet location '../core/tests/data/test_statistics_per_partition';
+
+# Expect dynamic filter available inside data source
+query TT
+explain select max(id) from agg_dyn_test where id > 1;
+----
+physical_plan
+01)AggregateExec: mode=Final, gby=[], aggr=[max(agg_dyn_test.id)]
+02)--CoalescePartitionsExec
+03)----AggregateExec: mode=Partial, gby=[], aggr=[max(agg_dyn_test.id)]
+04)------DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-01/j5fUeSDQo22oPyPU.parquet, WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-02/j5fUeSDQo22oPyPU.parquet], [WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-03/j5fUeSDQo22oPyPU.parquet, WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-04/j5fUeSDQo2 [...]
+
+query I
+select max(id) from agg_dyn_test where id > 1;
+----
+4
+
+# Expect dynamic filter available inside data source
+query TT
+explain select max(id) from agg_dyn_test where (id+1) > 1;
+----
+physical_plan
+01)AggregateExec: mode=Final, gby=[], aggr=[max(agg_dyn_test.id)]
+02)--CoalescePartitionsExec
+03)----AggregateExec: mode=Partial, gby=[], aggr=[max(agg_dyn_test.id)]
+04)------DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-01/j5fUeSDQo22oPyPU.parquet, WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-02/j5fUeSDQo22oPyPU.parquet], [WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-03/j5fUeSDQo22oPyPU.parquet, WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-04/j5fUeSDQo2 [...]
+
+# Expect dynamic filter available inside data source
+query TT
+explain select max(id), min(id) from agg_dyn_test where id < 10;
+----
+physical_plan
+01)AggregateExec: mode=Final, gby=[], aggr=[max(agg_dyn_test.id), min(agg_dyn_test.id)]
+02)--CoalescePartitionsExec
+03)----AggregateExec: mode=Partial, gby=[], aggr=[max(agg_dyn_test.id), min(agg_dyn_test.id)]
+04)------DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-01/j5fUeSDQo22oPyPU.parquet, WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-02/j5fUeSDQo22oPyPU.parquet], [WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-03/j5fUeSDQo22oPyPU.parquet, WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-04/j5fUeSDQo2 [...]
+
+# Dynamic filter should not be available for grouping sets
+query TT
+explain select max(id) from agg_dyn_test where id < 10
+group by grouping sets ((), (id))
+----
+physical_plan
+01)ProjectionExec: expr=[max(agg_dyn_test.id)@2 as max(agg_dyn_test.id)]
+02)--AggregateExec: mode=FinalPartitioned, gby=[id@0 as id, __grouping_id@1 as __grouping_id], aggr=[max(agg_dyn_test.id)]
+03)----RepartitionExec: partitioning=Hash([id@0, __grouping_id@1], 2), input_partitions=2
+04)------AggregateExec: mode=Partial, gby=[(NULL as id), (id@0 as id)], aggr=[max(agg_dyn_test.id)]
+05)--------DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-01/j5fUeSDQo22oPyPU.parquet, WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-02/j5fUeSDQo22oPyPU.parquet], [WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-03/j5fUeSDQo22oPyPU.parquet, WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-04/j5fUeSDQ [...]
+
+statement ok
+drop table agg_dyn_test;
+
+statement ok
+drop table t1;
+
+statement ok
+drop table t2;
diff --git a/datafusion/sqllogictest/test_files/push_down_filter_unnest.slt b/datafusion/sqllogictest/test_files/push_down_filter_unnest.slt
new file mode 100644
index 0000000000..58fe24e2e2
--- /dev/null
+++ b/datafusion/sqllogictest/test_files/push_down_filter_unnest.slt
@@ -0,0 +1,148 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+
+#   http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Test push down filter
+
+statement ok
+set datafusion.explain.physical_plan_only = true;
+
+statement ok
+CREATE TABLE IF NOT EXISTS v AS VALUES(1,[1,2,3]),(2,[3,4,5]);
+
+query I
+select uc2 from (select unnest(column2) as uc2, column1 from v) where column1 = 2;
+----
+3
+4
+5
+
+# test push down filter for unnest with filter on non-unnest column
+# filter plan is pushed down into projection plan
+query TT
+explain select uc2 from (select unnest(column2) as uc2, column1 from v) where column1 = 2;
+----
+physical_plan
+01)ProjectionExec: expr=[__unnest_placeholder(v.column2,depth=1)@0 as uc2]
+02)--UnnestExec
+03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+04)------ProjectionExec: expr=[column2@0 as __unnest_placeholder(v.column2)]
+05)--------FilterExec: column1@0 = 2, projection=[column2@1]
+06)----------DataSourceExec: partitions=1, partition_sizes=[1]
+
+query I
+select uc2 from (select unnest(column2) as uc2, column1 from v) where uc2 > 3;
+----
+4
+5
+
+# test push down filter for unnest with filter on unnest column
+query TT
+explain select uc2 from (select unnest(column2) as uc2, column1 from v) where uc2 > 3;
+----
+physical_plan
+01)ProjectionExec: expr=[__unnest_placeholder(v.column2,depth=1)@0 as uc2]
+02)--FilterExec: __unnest_placeholder(v.column2,depth=1)@0 > 3
+03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+04)------UnnestExec
+05)--------ProjectionExec: expr=[column2@0 as __unnest_placeholder(v.column2)]
+06)----------DataSourceExec: partitions=1, partition_sizes=[1]
+
+query II
+select uc2, column1 from (select unnest(column2) as uc2, column1 from v) where uc2 > 3 AND column1 = 2;
+----
+4 2
+5 2
+
+# Could push the filter (column1 = 2) down below unnest
+query TT
+explain select uc2, column1 from (select unnest(column2) as uc2, column1 from v) where uc2 > 3 AND column1 = 2;
+----
+physical_plan
+01)ProjectionExec: expr=[__unnest_placeholder(v.column2,depth=1)@0 as uc2, column1@1 as column1]
+02)--FilterExec: __unnest_placeholder(v.column2,depth=1)@0 > 3
+03)----UnnestExec
+04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+05)--------ProjectionExec: expr=[column2@1 as __unnest_placeholder(v.column2), column1@0 as column1]
+06)----------FilterExec: column1@0 = 2
+07)------------DataSourceExec: partitions=1, partition_sizes=[1]
+
+query II
+select uc2, column1 from (select unnest(column2) as uc2, column1 from v) where uc2 > 3 OR column1 = 2;
+----
+3 2
+4 2
+5 2
+
+# only non-unnest filter in AND clause could be pushed down
+query TT
+explain select uc2, column1 from (select unnest(column2) as uc2, column1 from v) where uc2 > 3 OR column1 = 2;
+----
+physical_plan
+01)ProjectionExec: expr=[__unnest_placeholder(v.column2,depth=1)@0 as uc2, column1@1 as column1]
+02)--FilterExec: __unnest_placeholder(v.column2,depth=1)@0 > 3 OR column1@1 = 2
+03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+04)------UnnestExec
+05)--------ProjectionExec: expr=[column2@1 as __unnest_placeholder(v.column2), column1@0 as column1]
+06)----------DataSourceExec: partitions=1, partition_sizes=[1]
+
+statement ok
+drop table v;
+
+# test with unnest struct, should not push down filter
+statement ok
+CREATE TABLE d AS VALUES(1,[named_struct('a', 1, 'b', 2)]),(2,[named_struct('a', 3, 'b', 4), named_struct('a', 5, 'b', 6)]);
+
+query I?
+select * from (select column1, unnest(column2) as o from d) where o['a'] = 1;
+----
+1 {a: 1, b: 2}
+
+query TT
+explain select * from (select column1, unnest(column2) as o from d) where o['a'] = 1;
+----
+physical_plan
+01)ProjectionExec: expr=[column1@0 as column1, __unnest_placeholder(d.column2,depth=1)@1 as o]
+02)--FilterExec: __datafusion_extracted_1@0 = 1, projection=[column1@1, __unnest_placeholder(d.column2,depth=1)@2]
+03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+04)------ProjectionExec: expr=[get_field(__unnest_placeholder(d.column2,depth=1)@1, a) as __datafusion_extracted_1, column1@0 as column1, __unnest_placeholder(d.column2,depth=1)@1 as __unnest_placeholder(d.column2,depth=1)]
+05)--------UnnestExec
+06)----------ProjectionExec: expr=[column1@0 as column1, column2@1 as __unnest_placeholder(d.column2)]
+07)------------DataSourceExec: partitions=1, partition_sizes=[1]
+
+statement ok
+drop table d;
+
+statement ok
+CREATE TABLE d AS VALUES (named_struct('a', 1, 'b', 2)), (named_struct('a', 3, 'b', 4)), (named_struct('a', 5, 'b', 6));
+
+query II
+select * from (select unnest(column1) from d) where "__unnest_placeholder(d.column1).b" > 5;
+----
+5 6
+
+query TT
+explain select * from (select unnest(column1) from d) where "__unnest_placeholder(d.column1).b" > 5;
+----
+physical_plan
+01)FilterExec: __unnest_placeholder(d.column1).b@1 > 5
+02)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+03)----UnnestExec
+04)------ProjectionExec: expr=[column1@0 as __unnest_placeholder(d.column1)]
+05)--------DataSourceExec: partitions=1, partition_sizes=[1]
+
+statement ok
+drop table d;

