Hi Jark, Thanks for your review comments. I have revised the FIP document accordingly. I sincerely apologize that progress on this FIP was paused for quite some time due to other tasks; I will continue to move this work forward.
Best,
Yang

Jark Wu <[email protected]> wrote on Sun, Aug 10, 2025 at 13:11:
> Hi Yang,
>
> Thanks for driving this excellent work! I left some comments below:
>
> *1. "If we use a filter() like Fluent API, we may lead users to
> misunderstand the real semantics of the interface."*
>
> I don't think this will give users the impression that the server side
> performs row-by-row filtering, because we already provide similar
> methods like "project()" and "limit()", and none of them are row-by-row
> operations. We can add the necessary Javadocs to make it clear that the
> method performs server-side, RecordBatch-level filtering.
>
> Personally, I don't like "createLogScanner(Predicate recordBatchFilter)"
> because the method will explode once we have more filter types and
> parameters. The existing builder API pattern has better extensibility.
> Besides, the filter pushdown is record-batch-level today, but it may
> become file-level in the future; that optimization should be transparent
> to users and shouldn't require an API change.
>
> *2. The "Predicate.proto" file*
>
> The FIP introduces a dedicated "Predicate.proto" file for the predicate
> expressions, but that breaks Fluss's single-file RPC definition. I don't
> see the benefit of splitting into multiple proto files; it only makes it
> harder for third parties to understand the Fluss RPC definition.
> Therefore, I suggest just putting the predicate definition in
> "FlussApi.proto".
>
> *3. Suggestions on some Predicate proto definitions*
> (1) There is no definition of PbLiteralValue. This is a public API and
> shouldn't be omitted from the FIP. The API is the most important part to
> discuss in a FIP, even if it makes the FIP large.
> (2) Can we use an int index to represent the PbFieldRef? The server side
> has the schema information of the table, so it can derive the data types
> and column names on the server side. We also do this for projection
> pushdown.
> (3)
>
> *4.
> Statistics Data format*
>
> The statistics data format is the most critical part of this FIP and
> should be illustrated like the RecordBatch format. I think we have many
> topics to discuss about the statistics data format:
> (1) Should the statistics data go before or after the records data? I
> think putting the statistics together with the header can make filtering
> a record batch faster, as it reduces the work to one IO.
> (2) Can the statistics format support lazy deserialization for specific
> columns? Considering 1000 columns with only one column being filtered,
> the deserialization cost may be larger than a direct read without
> filtering.
> (3) How will the statistics format evolve to support future statistics?
>
>
> *5. FilteredLogRecordBatchIterator reads each RecordBatch*
>
> You said the FilteredLogRecordBatchIterator will try to read the
> statistics information from each RecordBatch. I'm afraid of the
> performance degradation this introduces, because Fluss reads chunks
> without deserializing each record batch header by default (if there is
> no projection pushdown). However, if there is a filter pushdown, even if
> the filter doesn't filter anything, and even if the log table doesn't
> have statistics, it falls back to reading each batch header, which will
> be much slower than before.
>
> *6. Migration Strategy: Deploy new version with feature flag disabled*
>
> The migration strategy you replied to Yuxia with sounds very complex and
> error-prone to me.
>
> I think an easier way to handle migration and backward compatibility is
> to introduce a table-level statistics configuration, such as
> "table.log.statistic.enabled". Every table created in a new-version
> cluster will have this configuration set to "true" unless users disable
> it explicitly. Then, for the old tables, we keep the old behavior of not
> generating statistics, since generating them requires a client upgrade.
> For new tables with the config enabled, a client upgrade is required;
> otherwise, the client or server should throw an exception about missing
> statistics.
>
>
> Best,
> Jark
>
>
> On Fri, 8 Aug 2025 at 13:46, Yang Wang <[email protected]> wrote:
>
> > From: Yang Wang <[email protected]>
> > Date: Fri, Aug 8, 2025 at 11:17
> > Subject: Re: [DISCUSS] FIP-10: Support Log RecordBatch Filter Pushdown
> > To: <[email protected]>
> >
> >
> > Hi Cheng,
> >
> > > Can we clarify that this filter evaluation works on a best-effort
> > > basis at the beginning of the FIP document? Specifically, it only
> > > performs coarse-grained block skipping by leveraging RecordBatch
> > > statistics. To be honest, the table.newScan().filter(recordBatchFilter)
> > > API gave me the impression that the server side performs row-by-row
> > > filtering.
> >
> > This question relates to what I want to apologize to @HongShun for
> > again, as my reply to his review yesterday was not well considered. I
> > will clarify that the previously designed API:
> >
> > > LogScanner createLogScanner(Predicate recordBatchFilter);
> >
> > can clearly hint to the user that the filter is responsible for
> > filtering RecordBatches only (not at the row level) for log tables. If
> > we use a filter()-like fluent API, we may lead users to misunderstand
> > the real semantics of the interface.
> >
> > Best regards,
> > Yang
> >
> >
> > Wang Cheng <[email protected]> wrote on Fri, Aug 8, 2025 at 08:38:
> >
> > > Thanks Yang for driving this work.
> > >
> > > Can we clarify that this filter evaluation works on a best-effort
> > > basis at the beginning of the FIP document? Specifically, it only
> > > performs coarse-grained block skipping by leveraging RecordBatch
> > > statistics. To be honest, the table.newScan().filter(recordBatchFilter)
> > > API gave me the impression that the server side performs row-by-row
> > > filtering.
> > >
> > > Regards,
> > > Cheng
> > >
> > >
> > > ------------------ Original ------------------
> > > From: "dev" <[email protected]>;
> > > Date: Thu, Aug 7, 2025 11:11 AM
> > > To: "dev" <[email protected]>;
> > > Subject: [DISCUSS] FIP-10: Support Log RecordBatch Filter Pushdown
> > >
> > > Hello Fluss Community,
> > >
> > > I propose initiating a discussion on FIP-10: Support Log RecordBatch
> > > Filter Pushdown (
> > > https://cwiki.apache.org/confluence/display/FLUSS/FIP-10%3A+Support+Log+RecordBatch+Filter+Pushdown
> > > ).
> > > This optimization aims to improve the performance of log table
> > > queries and is now ready for community feedback.
> > >
> > > This FIP introduces RecordBatch-level filter pushdown to enable early
> > > filtering at the storage layer, thereby optimizing CPU, memory, and
> > > network resources by skipping non-matching log record batches.
> > >
> > > A proof-of-concept (PoC) has been implemented in the logfilter branch
> > > at https://github.com/platinumhamburg/fluss and is ready for testing
> > > and preview.
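[Editor's note] The fluent-API shape debated in points 1 above (and in Yang's and Cheng's replies) can be illustrated with a small sketch. This is a hypothetical mock-up, not the actual Fluss API: the `Scan`, `BatchStats`, and `shouldReadBatch` names are invented for illustration. It shows why the Javadoc matters: the filter decides whether a whole RecordBatch is read, never individual rows.

```java
// Hypothetical sketch of the fluent/builder API shape debated in this
// thread; Scan, BatchStats, and shouldReadBatch are illustrative names,
// NOT the actual Fluss API.
import java.util.function.Predicate;

public class ScanApiSketch {

    /** Stand-in for the min/max statistics carried in a RecordBatch header. */
    record BatchStats(long min, long max) {}

    /** Builder/fluent shape, i.e. table.newScan().filter(...).createLogScanner(). */
    static class Scan {
        private Predicate<BatchStats> batchFilter = stats -> true;

        /**
         * Server-side, best-effort, RecordBatch-level filter: a batch is
         * skipped only when its statistics prove no row in it can match.
         * This is NOT row-by-row filtering.
         */
        Scan filter(Predicate<BatchStats> recordBatchFilter) {
            this.batchFilter = recordBatchFilter;
            return this;
        }

        /** True if the batch must be read; false if it can be skipped. */
        boolean shouldReadBatch(BatchStats stats) {
            return batchFilter.test(stats);
        }
    }

    public static void main(String[] args) {
        // Query "value > 100": a batch whose max is 50 can be skipped.
        Scan scan = new Scan().filter(stats -> stats.max() > 100);
        System.out.println(scan.shouldReadBatch(new BatchStats(0, 50)));   // false: skip
        System.out.println(scan.shouldReadBatch(new BatchStats(90, 150))); // true: read
    }
}
```

The builder shape also illustrates Jark's extensibility point: a new filter type becomes another chained method rather than another `createLogScanner` overload.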

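[Editor's note] The "best-effort block skipping" semantics from Cheng's question and Jark's point 5 can also be sketched. This is an illustrative mock-up under assumed names (`RecordBatch`, `MinMax`, `mightMatch` are invented, not Fluss code): a batch is pruned only when its min/max statistics prove no row can match, and a batch without statistics is always read, so filtering never changes query results.

```java
// Illustrative sketch (not the actual Fluss implementation) of best-effort
// RecordBatch skipping using optional min/max statistics. A batch without
// statistics can never be pruned, so results are unaffected.
import java.util.List;
import java.util.Optional;

public class BatchPruningSketch {

    record MinMax(long min, long max) {}
    record RecordBatch(String id, Optional<MinMax> stats) {}

    /** Keep a batch unless its stats prove "value >= lowerBound" cannot match. */
    static boolean mightMatch(RecordBatch batch, long lowerBound) {
        // Best-effort: no statistics -> cannot prune, must read the batch.
        return batch.stats().map(s -> s.max() >= lowerBound).orElse(true);
    }

    public static void main(String[] args) {
        List<RecordBatch> batches = List.of(
            new RecordBatch("b0", Optional.of(new MinMax(0, 9))),    // max < 100: prunable
            new RecordBatch("b1", Optional.of(new MinMax(5, 200))),  // may match: read
            new RecordBatch("b2", Optional.empty()));                // no stats: read

        for (RecordBatch b : batches) {
            System.out.println(b.id() + " read=" + mightMatch(b, 100));
        }
    }
}
```

This also frames Jark's point 5 concern: when every batch lacks statistics, the `orElse(true)` path is taken for every batch, so the scanner has inspected every batch header yet skipped nothing.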