Hi Hongshun,

Thank you for your detailed review. You have raised some excellent
questions and suggestions. I'll address your points sequentially:

1. I agree with your assessment that the fluent style API is more elegant
than the current overloaded form. I will modify the API design to align
with this approach.

2. No. The statistics carry their own version field, which determines how
the statistics bytes are serialized and deserialized. Therefore, if we add
more statistics in the future, we won't need to upgrade the RecordBatch
format.

3. RecordBatch filter pushdown follows a "best effort" principle. The
SupportsFilterPushDown#applyFilters method pushes a filter down when its
expression meets the index type and data type requirements, and it still
returns that filter in remainingFilters, so Flink filters the data again
row by row. Once RecordBatch filters are pushed down to the server side,
the FilteredLogRecordBatchIterator tries to read the statistics from each
RecordBatch. If a batch carries no statistics, it is not filtered out.
This lets us support different LogRecordBatch formats simultaneously
without compatibility issues.
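
To make point 3 concrete, here is a minimal Java sketch of the best-effort
rule. It is only an illustration under stated assumptions: the names Stats,
Batch, GreaterThan, and keptNames are hypothetical and are not the classes
from the FIP or the PoC branch, and the predicate is reduced to a single
"column > threshold" check on one int column.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, NOT the Fluss implementation: a batch is skipped only
// when its statistics prove that no row can match the pushed-down predicate.
class BestEffortBatchFilterSketch {

    /** Hypothetical min/max statistics for one int column of a batch. */
    static final class Stats {
        final int min;
        final int max;
        Stats(int min, int max) { this.min = min; this.max = max; }
    }

    /** Hypothetical batch; stats is null when the batch carries no statistics. */
    static final class Batch {
        final String name;
        final Stats stats;
        Batch(String name, Stats stats) { this.name = name; this.stats = stats; }
    }

    /** Hypothetical predicate "column > threshold". */
    static final class GreaterThan {
        final int threshold;
        GreaterThan(int threshold) { this.threshold = threshold; }

        /** True only when the statistics prove that no row in the batch matches. */
        boolean provablyNoMatch(Stats s) { return s.max <= threshold; }
    }

    /** Keeps every batch that cannot be proven irrelevant. */
    static List<Batch> filter(List<Batch> batches, GreaterThan predicate) {
        List<Batch> kept = new ArrayList<>();
        for (Batch b : batches) {
            // No statistics: nothing can be proven, so the batch is kept.
            boolean skip = b.stats != null && predicate.provablyNoMatch(b.stats);
            if (!skip) {
                kept.add(b);
            }
        }
        return kept;
    }

    /** Runs the filter over three sample batches and joins the kept names. */
    static String keptNames(int threshold) {
        List<Batch> batches = new ArrayList<>();
        batches.add(new Batch("v1-no-stats", null));
        batches.add(new Batch("v2-max-50", new Stats(10, 50)));
        batches.add(new Batch("v2-max-250", new Stats(120, 250)));

        List<String> names = new ArrayList<>();
        for (Batch b : filter(batches, new GreaterThan(threshold))) {
            names.add(b.name);
        }
        return String.join(",", names);
    }

    public static void main(String[] args) {
        System.out.println(keptNames(100));
    }
}
```

In this sketch the batch without statistics always survives, and a surviving
batch may still contain non-matching rows, which is why the filter also stays
in remainingFilters for Flink to apply row by row.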

Best

Yang

On Thu, Aug 7, 2025 at 14:19, Hongshun Wang <[email protected]> wrote:

> Hi Yang,
>
> Thanks for your great work — this change indeed reduces the cost of
> filtered queries. I just have a few questions for clarification:
>
>
> 1. Fluent API Design for LogScanner. Currently, we have:
>
> > LogScanner createLogScanner(Predicate recordBatchFilter);
> >
> Would it be possible to make the interface more aligned with the fluent
> design pattern used in Jark’s refactoring?[1] For example:
>
> > table.newScan()
> >     .project(projectedFields)
> >     .filter(recordBatchFilter)
> >     .createLogScanner();
> >
>
> 2. LogRecordBatchStatistics now supports min, max, and null count, and will
> be serialized into RecordBatch headers (requiring an upgrade from V1 to V2
> format). If we plan to support additional statistics in the future, will we
> need to upgrade to V3? Or has V2 already been designed with extensibility
> in mind?
>
> 3. When SupportsFilterPushDown#applyFilters pushes filters down to the
> source, how does the source determine whether a filter can actually be
> pushed down? Even if the user is on the latest version of Fluss that
> supports V2 format, existing data might still be in V1 format (which
> doesn’t include statistics). Will this compatibility issue be handled on
> the client side?
>
> Looking forward to your thoughts!
>
>
> Best
>
> Hongshun
>
> [1] https://github.com/apache/fluss/issues/340
>
> On Thu, Aug 7, 2025 at 11:12 AM Yang Wang <[email protected]>
> wrote:
>
> > Hello Fluss Community,
> >
> > I propose initiating discussion on FIP-10: Support Log RecordBatch Filter
> > Pushdown (
> > https://cwiki.apache.org/confluence/display/FLUSS/FIP-10%3A+Support+Log+RecordBatch+Filter+Pushdown
> > ).
> > This optimization aims to improve the performance of Log table queries
> > and is now ready for community feedback.
> >
> > This FIP introduces RecordBatch-level filter pushdown to enable early
> > filtering at the storage layer, thereby optimizing CPU, memory, and
> > network resources by skipping non-matching log record batches.
> >
> > A proof-of-concept (PoC) has been implemented in the logfilter branch in
> > https://github.com/platinumhamburg/fluss and is ready for testing and
> > preview.
> >
>
