Hi Keith Lee,

Thanks for your feedback; this is a good question. I believe the RowMerger
mode can cover most common cases. The main difference across aggregate
functions is the underlying data type of the accumulator state. In some
cases, we may need a more complex state type, such as ArrayType or MapType.
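To make the state-type point concrete, here is a minimal, hypothetical Java sketch (class and method names are invented, not part of the Fluss API): a percentile-style accumulator whose state is an array of bucket counts, i.e. naturally an ArrayType. Merging two states is an element-wise sum, which keeps the accumulator associative, and an approximate percentile can then be read back out of the merged state.

```java
import java.util.Arrays;

// Illustrative only: a fixed-bucket histogram accumulator whose persisted
// state is the long[] bucket counts (an ArrayType-shaped state).
public class HistogramSketch {
    private final double[] upperBounds; // bucket upper bounds, ascending
    private final long[] counts;        // counts per bucket; last = +Inf bucket

    public HistogramSketch(double[] upperBounds) {
        this.upperBounds = upperBounds.clone();
        this.counts = new long[upperBounds.length + 1];
    }

    public void add(double value) {
        int i = 0;
        while (i < upperBounds.length && value > upperBounds[i]) {
            i++;
        }
        counts[i]++;
    }

    // Element-wise sum: associative and commutative, so states can be
    // merged in any order (the property a merge-style accumulator needs).
    public void merge(HistogramSketch other) {
        for (int i = 0; i < counts.length; i++) {
            counts[i] += other.counts[i];
        }
    }

    // Approximate percentile: upper bound of the bucket where the
    // cumulative count first reaches p * total.
    public double percentile(double p) {
        long total = Arrays.stream(counts).sum();
        long threshold = (long) Math.ceil(p * total);
        long cumulative = 0;
        for (int i = 0; i < counts.length; i++) {
            cumulative += counts[i];
            if (cumulative >= threshold) {
                return i < upperBounds.length ? upperBounds[i] : Double.POSITIVE_INFINITY;
            }
        }
        return Double.POSITIVE_INFINITY;
    }

    public static void main(String[] args) {
        HistogramSketch a = new HistogramSketch(new double[] {10, 100, 1000});
        a.add(1); a.add(5); a.add(50);
        HistogramSketch b = new HistogramSketch(new double[] {10, 100, 1000});
        b.add(500);
        a.merge(b); // merging two partial states
        System.out.println(a.percentile(0.5)); // prints 10.0
    }
}
```

This is only a sketch to show that the accumulator state (the `long[]`) is what would need an ArrayType column underneath, while reads/writes of individual values could still look scalar.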
For example, for percentile aggregation, we could introduce a “syntactic
sugar” data type like Histogram (similar to Prometheus histograms). It could
be read from and written to as an INT/BIGINT, while its underlying storage is
an ArrayType. We could then implement a dedicated aggregate function on top
of it.

Best regards,
Yang

Keith Lee <[email protected]> 于2025年12月19日周五 04:48写道:

> Hi Yang,
>
> Thank you for the well-thought-through proposal. I have mainly one
> question:
>
> 1. Have you considered adding percentile aggregations? e.g. median, p99 or
> p1. I find that these can be tremendously useful to get a good
> representation of the dataset without being affected by outliers, e.g.
> min/max. Granted, the current `RowMerger` interface allows for merging of
> individual rows, and calculating percentiles in a streaming manner needs
> more information than the current and last rows; considering what it would
> take to implement percentile aggregations may reveal solutions more
> amenable to implementing more types of useful aggregations in the future.
>
> Best regards
> Keith Lee
>
>
> On Thu, Dec 18, 2025 at 12:14 PM Yang Wang <[email protected]>
> wrote:
>
> > Hi, Jark.
> > Thanks for your comprehensive and valuable feedback. This has made the
> > FIP proposal more complete. Let me respond to the comments one by one:
> >
> > - Comments (1) and (2) essentially target the same issue: for buckets
> >   without checkpoints, how do we determine which updates need to be
> >   rolled back? Persisting state to Fluss is an excellent solution. It
> >   allows us to distinguish whether an instance is starting for the
> >   first time or recovering from a failover without checkpoints. This
> >   is a very important suggestion. I will add a handling plan for this
> >   corner case to the FIP.
> > - I strongly agree with suggestions (3), (4), (5), and (6). I will
> >   update the FIP accordingly.
> > - Regarding suggestion (7), I have some concerns.
> >   In fact, our API design and implementation architecture influence
> >   each other. The core issue is that for the same bucket of the same
> >   table, we cannot allow the send queue to contain mixed WriteBatches
> >   based on different agg_modes, as this would lead to non-deterministic
> >   write results. To introduce agg_mode cleanly, we would need
> >   significant refactoring of both the upper-layer write API and the
> >   batching/aggregation sending architecture. This complexity may be
> >   unnecessary at the moment. Designing a “Recovery-mode” connection is
> >   actually a compromise that introduces the minimal complexity while
> >   still providing correct semantics. Perhaps we can discuss this
> >   further.
> > - Regarding suggestion (8), considering that users may work with very
> >   wide tables, they might be forced to add extra configuration items
> >   for many columns that have no special aggregation needs, which would
> >   hurt user experience and bloat configuration. Apache Paimon defaults
> >   to using the last_non_null_value aggregation function for unspecified
> >   columns, and I believe most users may already be accustomed to this
> >   behavior. It might be better for us to stay consistent with Paimon.
> >
> > Best regards,
> > Yang
> >
> > Jark Wu <[email protected]> 于2025年12月14日周日 23:37写道:
> >
> > > Hi Yang,
> > >
> > > Thanks for the great proposal, it’s very detailed and covers all
> > > aspects comprehensively.
> > >
> > > I have the following comments on the design:
> > >
> > > **(1) Handling failover before the first checkpoint is completed**
> > > If a job starts, writes some data, but fails over before completing
> > > its first checkpoint, the undo log mechanism won’t be triggered,
> > > leading to duplicate data. To address this, I think we may need to
> > > rely on Fluss to store the initial offset state.
> > >
> > > One possible solution:
> > > ① Only the Coordinator can accept `AcquireColumnLockRequest`.
> > > The request can carry offset information, and the Coordinator
> > > persists the `owner-id`, `columns`, and `offsets` to ZooKeeper.
> > > ② At job startup, Sink Task-0 fetches a full snapshot of the table’s
> > > offsets and registers a column lock with the Fluss Coordinator.
> > > ③ Upon failover, the sink checks whether it’s recovering from state
> > > or starting statelessly. In the stateless case, it retrieves the
> > > persisted offsets for the `owner-id` from the Coordinator/ZooKeeper,
> > > compares them with the latest cluster offsets, and then reconstructs
> > > the redo log accordingly.
> > >
> > > **Tip**: We don’t need `schema_id` in `AcquireColumnLockRequest` if
> > > we use column ids instead of column indexes.
> > >
> > > **(2) Handling Dynamic Partitioning**
> > > When dynamic partitioning is enabled on a table, a similar issue
> > > arises: new partitions may be created and written to, but if failover
> > > occurs before the first checkpoint, duplicate data can appear in
> > > those new partitions.
> > >
> > > To handle this, during failover recovery from state, we must retrieve
> > > **all partitions and their corresponding offsets**. If a partition’s
> > > offset is missing from the state, we should start consuming its redo
> > > log from offset `0`.
> > >
> > > **(3) Improvements to lock owner ID and TTL parameters**
> > > The current parameters `client.writer.lock-owner-id` and
> > > `client.column-lock.default-ttl` can be simplified and unified as:
> > > - `client.column-lock.owner-id`
> > > - `client.column-lock.ttl`
> > >
> > > Moreover, these parameters should work **by default without explicit
> > > configuration**. For example, we can generate a default `owner-id`
> > > from the Flink Job ID + column index:
> > > - The Job ID ensures a new ID for every fresh job submission.
> > > - During failover restarts, the Job ID remains unchanged.
> > >
> > > Additionally, we should **checkpoint the `owner-id` into the sink
> > > state**, so it remains consistent during failover or job version
> > > upgrades.
> > >
> > > **(4) Connector Options & TableDescriptor Configuration API**
> > > Currently, users must configure aggregate columns via connector
> > > options in the Java client. However, we consider aggregate functions
> > > part of the **schema definition**, similar to how StarRocks [1] and
> > > ClickHouse [2] define aggregation directly in column DDL (e.g., `pv
> > > BIGINT SUM`).
> > >
> > > The need for options today stems from Flink SQL’s lack of support
> > > for such DDL syntax. But for future engine integrations, we’d prefer
> > > a native DDL-based way.
> > >
> > > Therefore:
> > >
> > > Add a method in the `Schema`/`SchemaBuilder` layer:
> > >
> > > ```java
> > > aggColumn(String columnName, DataType dataType, AggFunction aggFunction)
> > > ```
> > > The `AggFunction` should be an Enum type. This method is similar to
> > > how we added `incrementColumn`.
> > >
> > > Then rename `table.fields.total_orders.aggregate-function` to the
> > > connector option `fields.total_orders.agg`. We can shorten
> > > `aggregate-function` to `agg` here. The connector option is
> > > translated into the schema builder.
> > >
> > > **(5) Default value for `table.column-lock.enabled`**
> > > Currently, this is `false` by default. However, if the aggregate
> > > merge engine **requires** column locking, the Fluss coordinator can
> > > **enable it automatically** when creating an agg merge engine table,
> > > unless the user explicitly sets it to `false`.
> > >
> > > **(6) Aggregate functions**
> > > The behavior of `last_value` ignoring `NULL`s is standardized via the
> > > `IGNORE NULLS` clause (e.g., in Databricks [3] and ClickHouse [4]).
> > > Thus, I suggest renaming:
> > > - `last_non_null_value` → `last_value_ignore_nulls`
> > > - `first_non_null_value` → `first_value_ignore_nulls`
> > >
> > > Additionally, we should also introduce:
> > > - `string_agg` (alias for `listagg`) [5]
> > > - `array_agg` (returns `ARRAY<T>`) [6]
> > >
> > > **(7) Connection Interface for Recovery Mode**
> > > Exposing a "recovery mode" at the `Connection` level leaks too much
> > > internal complexity. The `Connection` is the primary API entry point,
> > > while redo log handling is an advanced feature rarely used directly
> > > by end users.
> > >
> > > Instead, I suggest embedding the override mode directly in
> > > `PutKvRequest` via a new field:
> > > ```proto
> > > optional int32 agg_mode = 6; // null (default): replace, 0: replace,
> > > 1: accumulate, 2: merge_state
> > > ```
> > >
> > > In the future, `merge_state` mode could allow the sink to perform
> > > **local aggregation** and send the aggregated state to the server
> > > for direct merging to reduce RocksDB write amplification. This would
> > > be a valuable production optimization, analogous to Flink’s
> > > mini-batch [7].
> > >
> > > **(8) If aggregate-function is not configured, default to
> > > last_non_null_value**
> > > I suggest **requiring users to explicitly configure an aggregate
> > > function**, as this makes semantics clearer. We can consider adding
> > > a fallback behavior later **only if** user feedback shows it’s truly
> > > needed.
> > >
> > > Best,
> > > Jark Wu
> > >
> > > **References**
> > > [1] https://docs.starrocks.io/docs/table_design/table_types/aggregate_table/#create-a-table
> > > [2] https://clickhouse.com/docs/engines/table-engines/mergetree-family/aggregatingmergetree
> > > [3] https://docs.databricks.com/aws/en/sql/language-manual/functions/last_value
> > > [4] https://clickhouse.com/docs/engines/table-engines/mergetree-family/aggregatingmergetree
> > > [5] https://docs.databricks.com/aws/en/sql/language-manual/functions/string_agg
> > > [6] https://docs.databricks.com/aws/en/sql/language-manual/functions/array_agg
> > > [7] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/tuning/
> > >
> > > On Mon, 8 Dec 2025 at 15:30, Yang Wang <[email protected]> wrote:
> > > >
> > > > Hi Cheng,
> > > > Thanks for the thoughtful feedback and for bringing up the RocksDB
> > > > 2PC approach. You've identified the core challenge precisely: data
> > > > visibility vs. real-time processing. This is exactly why we chose
> > > > the Undo Recovery mechanism over transaction-based approaches in
> > > > the proposal.
> > > >
> > > > *Key Considerations:*
> > > >
> > > > *1. Real-time Visibility Conflict*
> > > > As you mentioned, RocksDB 2PC would require delayed visibility
> > > > until transaction commit. For Fluss's positioning as a real-time
> > > > streaming storage, this conflicts with our fundamental requirement
> > > > that writes should be immediately queryable. In typical scenarios
> > > > (e.g., real-time dashboards), users expect second-level updates,
> > > > not waiting for Flink checkpoint completion (which could be tens
> > > > of seconds).
> > > >
> > > > *2. Already Evaluated and Rejected*
> > > > We actually evaluated transaction mechanisms in the FIP design
> > > > phase.
> > > > From the "Rejected Alternatives" section:
> > > > > Use Transaction Mechanism to Implement Exactly-Once
> > > > >
> > > > > Disadvantages: Extremely high implementation complexity, requires
> > > > > refactoring Fluss's write path, high performance overhead
> > > > > (requires delayed visibility, increased commit overhead),
> > > > > conflicts with Fluss's real-time visibility design philosophy
> > > > >
> > > > > Rejection Reason: Cost too high, inconsistent with Fluss's
> > > > > real-time streaming storage positioning
> > > > (See FIP Section: "Rejected Alternatives")
> > > >
> > > > *3. Additional Complexity with RocksDB 2PC*
> > > > Beyond visibility issues:
> > > > - Distributed coordination: Requires a global transaction
> > > >   coordinator across multiple TabletServers
> > > > - Flink checkpoint alignment: How to coordinate RocksDB commit
> > > >   with asynchronous Flink checkpoints?
> > > > - Multi-job concurrency: Column-level partial updates would
> > > >   require complex transaction isolation coordination
> > > > - Performance overhead: Prepare/commit overhead exists for every
> > > >   write, even in normal cases
> > > >
> > > > *4. Why Undo Recovery Fits Better*
> > > > Our approach optimizes for the common case:
> > > > - Normal writes: Zero transaction overhead, immediate visibility
> > > > - Failover (rare): Pay the cost of undo operations only when needed
> > > > - Lightweight: Leverages existing Changelog capability, no global
> > > >   coordinator needed
> > > > - Localized: Each bucket handles recovery independently via offset
> > > >   comparison
> > > >
> > > > *Summary*
> > > > While RocksDB 2PC is theoretically cleaner from a database
> > > > perspective, it introduces unacceptable trade-offs for Fluss's
> > > > real-time streaming use cases.
> > > > The Undo Recovery approach better aligns with our "optimize for
> > > > the common path" philosophy and maintains Fluss's real-time
> > > > characteristics. Would love to discuss further if you have
> > > > additional thoughts!
> > > >
> > > > Best regards,
> > > > Yang
