Hi Yang,

Thanks for your great work. I am interested in the Column Lock Design and would like to ask some questions:

1. The lock granularity is (tableId, partitionId, columns). Does this mean that for a single (tableId, partitionId), there could be multiple locks for different columns? If not, it seems that (tableId, partitionId) would suffice as the lock key.
2. The ClientColumnLockManager (rather than the coordinator server) holds the lock. What happens if a deadlock occurs? For example: Client 1 holds the lock for tablet server 1 while Client 2 holds the lock for tablet server 2. Then each requests the lock the other holds.

Best,
Hongshun

On Fri, Dec 19, 2025 at 11:00 AM Yang Wang <[email protected]> wrote:

> Hi Keith Lee,
>
> Thanks for your feedback. This is a good question. I believe RowMerger mode
> can cover most common cases. The main difference across aggregate functions
> is the underlying data type of the accumulator state. In some cases, we may
> need a more complex state type, such as ArrayType or MapType.
>
> For example, for percentile aggregation, we could introduce a “syntactic
> sugar” data type like Histogram (as in Prometheus). It could be read from
> and written to as an INT/BIGINT, while its underlying storage is an
> ArrayType. We could then implement a dedicated aggregate function on top of
> it.
>
> Best regards,
> Yang
>
> Keith Lee <[email protected]> wrote on Fri, Dec 19, 2025 at 04:48:
>
> > Hi Yang,
> >
> > Thank you for the well-thought-through proposal. I have mainly one
> > question:
> >
> > 1. Have you considered adding percentile aggregations, e.g. median, p99 or
> > p1? I find that these can be tremendously useful to get a good
> > representation of the dataset without being affected by outliers, e.g.
> > min/max. Granted, the current `RowMerger` interface allows for merging of
> > individual rows, and calculating percentiles in a streaming manner needs
> > more information than the current and last rows. Considering what it would
> > take to implement percentile aggregations may reveal solutions more
> > amenable to implementing more types of useful aggregations in the future.
> >
> > Best regards
> > Keith Lee
> >
> > On Thu, Dec 18, 2025 at 12:14 PM Yang Wang <[email protected]> wrote:
> >
> > > Hi, Jark.
> > > Thanks for your comprehensive and valuable feedback.
> > > This has made the FIP proposal more complete. Let me respond to the
> > > comments one by one:
> > >
> > >    - Comments (1) and (2) essentially target the same issue: for buckets
> > >      without checkpoints, how do we determine which updates need to be
> > >      rolled back? Persisting state to Fluss is an excellent solution. It
> > >      allows us to distinguish whether an instance is starting for the
> > >      first time or recovering from a failover without checkpoints. This is
> > >      a very important suggestion. I will add a handling plan for this
> > >      corner case to the FIP.
> > >    - I strongly agree with suggestions (3), (4), (5), and (6). I will
> > >      update the FIP accordingly.
> > >    - Regarding suggestion (7), I have some concerns. In fact, our API
> > >      design and implementation architecture influence each other. The core
> > >      issue is that for the same bucket of the same table, we cannot allow
> > >      the send queue to contain mixed WriteBatches based on different
> > >      agg_modes, as this would lead to non-deterministic write results. To
> > >      introduce agg_mode cleanly, we would need significant refactoring of
> > >      both the upper-layer write API and the batching/aggregation sending
> > >      architecture. This complexity may be unnecessary at the moment.
> > >      Designing a “Recovery-mode” connection is actually a compromise to
> > >      introduce the minimal complexity while still providing correct
> > >      semantics. Perhaps we can discuss this further.
> > >    - Regarding suggestion (8), considering that users may work with very
> > >      wide tables, they might be forced to add extra configuration items
> > >      for many columns that have no special aggregation needs, which would
> > >      hurt user experience and bloat configuration. Apache Paimon defaults
> > >      to using the last_non_null_value aggregation function for unspecified
> > >      columns, and I believe most users may already be accustomed to this
> > >      behavior.
> > >      It might be better for us to stay consistent with Paimon.
> > >
> > > Best regards,
> > > Yang
> > >
> > > Jark Wu <[email protected]> wrote on Sun, Dec 14, 2025 at 23:37:
> > >
> > > > Hi Yang,
> > > >
> > > > Thanks for the great proposal, it’s very detailed and covers all
> > > > aspects comprehensively.
> > > >
> > > > I have the following comments on the design:
> > > >
> > > > **(1) Handling failover before the first checkpoint is completed**
> > > > If a job starts, writes some data, but fails over before completing its
> > > > first checkpoint, the undo log mechanism won’t be triggered, leading
> > > > to duplicate data. To address this, I think we may need to rely on
> > > > Fluss to store the initial offset state.
> > > >
> > > > One possible solution:
> > > > ① Only the Coordinator can accept `AcquireColumnLockRequest`. The
> > > > request can carry offset information, and the Coordinator persists the
> > > > `owner-id`, `columns`, and `offsets` to ZooKeeper.
> > > > ② At job startup, Sink Task-0 fetches a full snapshot of the table’s
> > > > offsets and registers a column lock with the Fluss Coordinator.
> > > > ③ Upon failover, the sink checks whether it’s recovering from state or
> > > > starting statelessly. In the stateless case, it retrieves the
> > > > persisted offsets for the `owner-id` from the Coordinator/ZooKeeper,
> > > > compares them with the latest cluster offsets, and then reconstructs
> > > > the redo log accordingly.
> > > >
> > > > **Tip**: We don’t need `schema_id` in `AcquireColumnLockRequest` if we
> > > > use column ids instead of column indexes.
> > > >
> > > > **(2) Handling Dynamic Partitioning**
> > > > When dynamic partitioning is enabled on a table, a similar issue
> > > > arises: new partitions may be created and written to, but if failover
> > > > occurs before the first checkpoint, duplicate data can appear in those
> > > > new partitions.
> > > >
> > > > To handle this, during failover recovery from state, we must retrieve
> > > > **all partitions and their corresponding offsets**. If a partition’s
> > > > offset is missing from the state, we should start consuming its redo
> > > > log from offset `0`.
> > > >
> > > > **(3) Improvements to lock owner ID and TTL parameters**
> > > > The current parameters `client.writer.lock-owner-id` and
> > > > `client.column-lock.default-ttl` can be simplified and unified as:
> > > > - `client.column-lock.owner-id`
> > > > - `client.column-lock.ttl`
> > > >
> > > > Moreover, these parameters should work **by default without explicit
> > > > configuration**. For example, we can generate a default `owner-id`
> > > > from the Flink Job ID + column index:
> > > > - The Job ID ensures a new ID for every fresh job submission.
> > > > - During failover restarts, the Job ID remains unchanged.
> > > >
> > > > Additionally, we should **checkpoint the `owner-id` into the sink
> > > > state**, so it remains consistent during failover or job version
> > > > upgrades.
> > > >
> > > > **(4) Connector Options & TableDescriptor Configuration API**
> > > > Currently, users must configure aggregate columns via connector
> > > > options in the Java client. However, we consider aggregate functions
> > > > part of the **schema definition**, similar to how StarRocks [1] and
> > > > ClickHouse [2] define aggregation directly in column DDL (e.g., `pv
> > > > BIGINT SUM`).
> > > >
> > > > The need for options today stems from Flink SQL’s lack of support for
> > > > such DDL syntax. But for future engine integrations, we’d prefer a
> > > > native DDL-based way.
> > > >
> > > > Therefore:
> > > >
> > > > Add a method in the `Schema`/`SchemaBuilder` layer:
> > > >
> > > > ```java
> > > > aggColumn(String columnName, DataType dataType, AggFunction aggFunction)
> > > > ```
> > > > The `AggFunction` should be an Enum type. This method is similar to
> > > > how we added `incrementColumn`.
> > > >
> > > > Then rename `table.fields.total_orders.aggregate-function` to the
> > > > connector option `fields.total_orders.agg`. We can shorten
> > > > `aggregate-function` to `agg` here. The connector option is translated
> > > > into the schema builder.
> > > >
> > > > **(5) Default value for `table.column-lock.enabled`**
> > > > Currently, this is `false` by default. However, if the aggregate merge
> > > > engine **requires** column locking, the Fluss coordinator can **enable
> > > > it automatically** when creating an agg merge engine table, unless the
> > > > user explicitly sets it to `false`.
> > > >
> > > > **(6) Aggregate functions**
> > > > The behavior of `last_value` ignoring `NULL`s is standardized via the
> > > > `IGNORE NULLS` clause (e.g., in Databricks [3] and ClickHouse [4]).
> > > > Thus, I suggest renaming:
> > > > - `last_non_null_value` → `last_value_ignore_nulls`
> > > > - `first_non_null_value` → `first_value_ignore_nulls`
> > > >
> > > > Additionally, we should also introduce:
> > > > - `string_agg` (alias for `listagg`) [5]
> > > > - `array_agg` (returns `ARRAY<T>`) [6]
> > > >
> > > > **(7) Connection Interface for Recovery Mode**
> > > > Exposing a "recovery mode" at the `Connection` level leaks too much
> > > > internal complexity. The `Connection` is the primary API entry point,
> > > > while redo log handling is an advanced feature rarely used directly by
> > > > end users.
> > > >
> > > > Instead, I suggest embedding the override mode directly in
> > > > `PutKvRequest` via a new field:
> > > > ```proto
> > > > optional int32 agg_mode = 6; // null (default): replace, 0: replace, 1: accumulate, 2: merge_state
> > > > ```
> > > >
> > > > In the future, `merge_state` mode could allow the sink to perform
> > > > **local aggregation** and send the aggregated state to the server for
> > > > direct merging to reduce RocksDB write amplification.
> > > > This would be a valuable production optimization, analogous to Flink’s
> > > > mini-batch [7].
> > > >
> > > > **(8) If aggregate-function is not configured, default to last_non_null_value**
> > > > I suggest **requiring users to explicitly configure an aggregate
> > > > function**, as this makes semantics clearer. We can consider adding a
> > > > fallback behavior later **only if** user feedback shows it’s truly
> > > > needed.
> > > >
> > > > Best,
> > > > Jark Wu
> > > >
> > > > **References**
> > > > [1] https://docs.starrocks.io/docs/table_design/table_types/aggregate_table/#create-a-table
> > > > [2] https://clickhouse.com/docs/engines/table-engines/mergetree-family/aggregatingmergetree
> > > > [3] https://docs.databricks.com/aws/en/sql/language-manual/functions/last_value
> > > > [4] https://clickhouse.com/docs/engines/table-engines/mergetree-family/aggregatingmergetree
> > > > [5] https://docs.databricks.com/aws/en/sql/language-manual/functions/string_agg
> > > > [6] https://docs.databricks.com/aws/en/sql/language-manual/functions/array_agg
> > > > [7] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/tuning/
> > > >
> > > > On Mon, 8 Dec 2025 at 15:30, Yang Wang <[email protected]> wrote:
> > > >
> > > > > Hi Cheng,
> > > > > Thanks for the thoughtful feedback and for bringing up the RocksDB 2PC
> > > > > approach.
> > > > > You've identified the core challenge precisely: data visibility vs.
> > > > > real-time processing. This is exactly why we chose the Undo Recovery
> > > > > mechanism over transaction-based approaches in the proposal.
> > > > >
> > > > > *Key Considerations:*
> > > > >
> > > > > *1. Real-time Visibility Conflict*
> > > > > As you mentioned, RocksDB 2PC would require delayed visibility until
> > > > > transaction commit. For Fluss's positioning as a real-time streaming
> > > > > storage, this conflicts with our fundamental requirement that writes
> > > > > should be immediately queryable. In typical scenarios (e.g., real-time
> > > > > dashboards), users expect second-level updates, not waiting for Flink
> > > > > checkpoint completion (which could be tens of seconds).
> > > > >
> > > > > *2. Already Evaluated and Rejected*
> > > > > We actually evaluated transaction mechanisms in the FIP design phase.
> > > > > From the "Rejected Alternatives" section:
> > > > > > Use Transaction Mechanism to Implement Exactly-Once
> > > > > >
> > > > > > Disadvantages: Extremely high implementation complexity, requires
> > > > > > refactoring Fluss's write path, high performance overhead (requires
> > > > > > delayed visibility, increased commit overhead), conflicts with
> > > > > > Fluss's real-time visibility design philosophy
> > > > > >
> > > > > > Rejection Reason: Cost too high, inconsistent with Fluss's real-time
> > > > > > streaming storage positioning
> > > > > (See FIP Section: "Rejected Alternatives")
> > > > >
> > > > > *3. Additional Complexity with RocksDB 2PC*
> > > > > Beyond visibility issues:
> > > > >
> > > > >    - Distributed coordination: Requires a global transaction
> > > > >      coordinator across multiple TabletServers
> > > > >    - Flink checkpoint alignment: How to coordinate RocksDB commit with
> > > > >      asynchronous Flink checkpoints?
> > > > >    - Multi-job concurrency: Column-level partial updates would require
> > > > >      complex transaction isolation coordination
> > > > >    - Performance overhead: Prepare/commit overhead exists for every
> > > > >      write, even in normal cases
> > > > >
> > > > > *4. Why Undo Recovery Fits Better*
> > > > > Our approach optimizes for the common case:
> > > > >
> > > > >    - Normal writes: Zero transaction overhead, immediate visibility
> > > > >    - Failover (rare): Pay the cost of undo operations only when needed
> > > > >    - Lightweight: Leverages existing Changelog capability, no global
> > > > >      coordinator needed
> > > > >    - Localized: Each bucket handles recovery independently via offset
> > > > >      comparison
> > > > >
> > > > > *Summary*
> > > > > While RocksDB 2PC is theoretically cleaner from a database
> > > > > perspective, it introduces unacceptable trade-offs for Fluss's
> > > > > real-time streaming use cases. The Undo Recovery approach better
> > > > > aligns with our "optimize for the common path" philosophy and
> > > > > maintains Fluss's real-time characteristics.
> > > > > Would love to discuss further if you have additional thoughts!
> > > > >
> > > > > Best regards,
> > > > > Yang
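
For readers following the thread, the "each bucket handles recovery independently via offset comparison" point can be illustrated with a minimal sketch. This is not the FIP's implementation: `UndoRangePlanner`, `UndoRange`, and the string bucket keys are hypothetical names introduced here. The sketch assumes that a checkpointed offset is the next changelog offset at checkpoint time, that the latest offset is the current log end, and that a bucket missing from the checkpoint is handled from offset 0, as suggested in the thread for newly created partitions. It ignores filtering by lock owner or column set, which a real implementation would presumably need.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical helper for illustration only; not part of the Fluss API. */
final class UndoRangePlanner {

    /** Half-open changelog offset range [fromInclusive, toExclusive) for one bucket. */
    static final class UndoRange {
        final long fromInclusive;
        final long toExclusive;

        UndoRange(long fromInclusive, long toExclusive) {
            this.fromInclusive = fromInclusive;
            this.toExclusive = toExclusive;
        }
    }

    /**
     * Compares checkpointed offsets with the latest cluster offsets, bucket by bucket.
     * Only buckets whose log advanced past the checkpoint get a range to undo.
     */
    static Map<String, UndoRange> plan(Map<String, Long> checkpointed,
                                       Map<String, Long> latest) {
        Map<String, UndoRange> ranges = new TreeMap<String, UndoRange>();
        for (Map.Entry<String, Long> entry : latest.entrySet()) {
            String bucket = entry.getKey();
            // Assumption: a bucket absent from the checkpoint starts from offset 0.
            Long saved = checkpointed.get(bucket);
            long from = (saved == null) ? 0L : saved.longValue();
            long to = entry.getValue().longValue();
            if (to > from) {
                ranges.put(bucket, new UndoRange(from, to));
            }
        }
        return ranges;
    }
}
```

Each bucket's range is computed from its own two offsets only, which is why no global coordinator appears in the sketch. A caller would pass the offsets restored from sink state (or, in the stateless case, the offsets persisted with the column lock) as `checkpointed`.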
