Hi Keith Lee,

Thanks for your feedback—this is a good question. I believe RowMerger mode
can cover most common cases. The main difference across aggregate functions
is the underlying data type of the accumulator state. In some cases, we may
need a more complex state type, such as ArrayType or MapType.

For example, for percentile aggregation, we could introduce a “syntactic
sugar” data type like Histogram (like prometheus). It could be read from
and written to as an INT/BIGINT, while its underlying storage is an
ArrayType. We could then implement a dedicated aggregate function on top of
it.

Best regards,
Yang

Keith Lee <[email protected]> 于2025年12月19日周五 04:48写道:

> Hi Yang,
>
> Thank you for the well thought through proposal. I have mainly one question
>
> 1. Have you considered adding percentile aggregations? e.g. median, p99 or
> p1. I find that these can be tremendously useful to get a good
> representation of the dataset without being affected by outliers e.g.
> min/max. Granted, the current `RowMerger` interface allows for merging of
> individual rows and calculating percentile in streaming manner needs more
> information than current and last rows; Considering what it would take to
> implement percentile aggregations may reveal solutions more amenable to
> implement more types of useful aggregations in the future.
>
> Best regards
> Keith Lee
>
>
> On Thu, Dec 18, 2025 at 12:14 PM Yang Wang <[email protected]>
> wrote:
>
> > Hi, Jark.
> > Thanks for your comprehensive and valuable feedback. This has made the
> > FIP proposal
> > more complete. Let me respond to the comments one by one:
> >
> >    - Comments (1) and (2) essentially target the same issue: for buckets
> >    without checkpoints, how do we determine which updates need to be
> rolled
> >    back? Persisting state to Fluss is an excellent solution. It allows us
> > to
> >    distinguish whether an instance is starting for the first time or
> >    recovering from a failover without checkpoints. This is a very
> important
> >    suggestion. I will add a handling plan for this corner case to the
> FIP.
> >    - I strongly agree with suggestions (3), (4), (5), and (6). I will
> >    update the FIP accordingly.
> >    - Regarding suggestion (7), I have some concerns. In fact, our API
> >    design and implementation architecture influence each other. The core
> > issue
> >    is that for the same bucket of the same table, we cannot allow the
> send
> >    queue to contain mixed WriteBatches based on different agg_modes, as
> > this
> >    would lead to non-deterministic write results. To introduce agg_mode
> >    cleanly, we would need significant refactoring of both the upper-layer
> >    write API and the batching/aggregation sending architecture. This
> >    complexity may be unnecessary at the moment. Designing a
> “Recovery-mode”
> >    connection is actually a compromise to introduce the minimal
> complexity
> >    while still providing correct semantics. Perhaps we can discuss this
> >    further.
> >    - Regarding suggestion (8), considering that users may work with very
> >    wide tables, they might be forced to add extra configuration items for
> > many
> >    columns that have no special aggregation needs, which would hurt user
> >    experience and bloat configuration. Apache Paimon defaults to using
> the
> >    last_non_null_value aggregation function for unspecified columns, and
> I
> >    believe most users may already be accustomed to this behavior. It
> might
> > be
> >    better for us to stay consistent with Paimon.
> >
> > Best regards,
> > Yang
> >
> > Jark Wu <[email protected]> 于2025年12月14日周日 23:37写道:
> >
> > > Hi Yanng,
> > >
> > > Thanks for the great proposal, it’s very detailed and covers all
> > > aspects comprehensively.
> > >
> > > I have the following comments on the design:
> > >
> > > **(1) Handling failover before the first checkpoint is completed**
> > > If a job starts, writes some data, but failover before completing its
> > > first checkpoint, the undo log mechanism won’t be triggered, leading
> > > to duplicate data. To address this, I think we may need to rely on
> > > Fluss to store the initial offset state.
> > >
> > > One possible solution:
> > > ① Only the Coordinator can accept `AcquireColumnLockRequest`. The
> > > request can carry offset information, and the Coordinator persists the
> > > `owner-id`, `columns`, and `offsets` to ZooKeeper.
> > > ② At job startup, Sink Task-0 fetches a full snapshot of the table’s
> > > offsets and registers a column lock with the Fluss Coordinator.
> > > ③ Upon failover, the sink checks whether it’s recovering from state or
> > > starting statelessly. In the stateless case, it retrieves the
> > > persisted offsets for the `owner-id` from the Coordinator/ZooKeeper,
> > > compares them with the latest cluster offsets, and then reconstructs
> > > the redo log accordingly.
> > >
> > > **Tip**: We don’t need `schema_id` in `AcquireColumnLockRequest` if we
> > > use column ids instead of column indexes.
> > >
> > > **(2) Handling Dynamic Partitioning**
> > > When dynamic partitioning is enabled on a table, a similar issue
> > > arises: new partitions may be created and written to, but if failover
> > > occurs before the first checkpoint, duplicate data can appear in those
> > > new partitions.
> > >
> > > To handle this, during failover recovery from state, we must retrieve
> > > **all partitions and their corresponding offsets**. If a partition’s
> > > offset is missing from the state, we should start consuming its redo
> > > log from offset `0`.
> > >
> > > **(3) Improvements to lock owner ID and TTL parameters**
> > > The current parameters `client.writer.lock-owner-id` and
> > > `client.column-lock.default-ttl` can be simplified and unified as:
> > > - `client.column-lock.owner-id`
> > > - `client.column-lock.ttl`
> > >
> > > Moreover, these parameters should work **by default without explicit
> > > configuration**. For example, we can generate a default `owner-id`
> > > from the Flink Job ID + column index:
> > > - The Job ID ensures a new ID for every fresh job submission.
> > > - During failover restarts, the Job ID remains unchanged.
> > >
> > > Additionally, we should **checkpoint the `owner-id` into the sink
> > > state**, so it remains consistent during failover or job version
> > > upgrades.
> > >
> > > **(4) Connector Options & TableDescriptor Configuration API**
> > > Currently, users must configure aggregate columns via connector
> > > options in the Java client. However, we consider aggregate functions
> > > part of the **schema definition**, similar to how StarRocks [1] and
> > > ClickHouse [2] define aggregation directly in column DDL (e.g., `pv
> > > BIGINT SUM`).
> > >
> > > The need for options today stems from Flink SQL’s lack of support for
> > > such DDL syntax. But for future engine integrations, we’d prefer
> > > native DDL-based way.
> > >
> > > Therefore:
> > >
> > > Add a method in the `Schema`/`SchemaBuilder` layer:
> > >
> > > ```java
> > > aggColumn(String columnName, DataType dataType, AggFunction
> aggFunction)
> > > ```
> > > The `AggFunction` should be an Enum type. This method is similar to
> > > how we added `incrementColumn`.
> > >
> > > Then rename `table.fields.total_orders.aggregate-function` to the
> > > connector option `fields.total_orders.agg`. We can shorten
> > > `aggregate-function` to `agg` here. The connector option is translated
> > > into the schema builder.
> > >
> > >
> > > **(5) Default value for `table.column-lock.enabled`**
> > > Currently, this is `false` by default. However, if the aggregate merge
> > > engine **requires** column locking, Fluss coordinator can **enable it
> > > automatically** when creating an agg merge engine table, unless the
> > > user explicitly sets it to `false`.
> > >
> > > **(6) Aggregate functions**
> > > The behavior of `last_value` ignoring `NULL`s is standardized via the
> > > `IGNORE NULLS` clause (e.g., in Databricks [3] and ClickHouse [4]).
> > > Thus, I suggest renaming:
> > > - `last_non_null_value` → `last_value_ignore_nulls`
> > > - `first_non_null_value` → `first_value_ignore_nulls`
> > >
> > > Additionally, we should also introduce:
> > > - `string_agg` (alias for `listagg`) [5]
> > > - `array_agg` (returns `ARRAY<T>`) [6]
> > >
> > > **(7) Connection Interface for Recovery Mode**
> > > Exposing a "recovery mode" at the `Connection` level leaks too much
> > > internal complexity. The `Connection` is the primary API entry point,
> > > while redo log handling is an advanced feature rarely used directly by
> > > end users.
> > >
> > > Instead, I suggest embedding the override mode directly in
> > > `PutKvRequest` via a new field:
> > > ```proto
> > > optional int32 agg_mode = 6; // null (default): replace, 0: replace,
> > > 1: accumulate, 2: merge_state
> > > ```
> > >
> > > In the future, `merge_state` mode could allow the sink to perform
> > > **local aggregation** and send the aggregated state to the server for
> > > direct merging to reduce RocksDB write amplification. This would be a
> > > valuable production optimization, analogous to Flink’s mini-batch [7].
> > >
> > > **(8) If aggregate-function not configured, default to
> > last_non_null_value
> > > I suggest **requiring users to explicitly configure an aggregate
> > > function**, as this makes semantics clearer. We can consider adding a
> > > fallback behavior later **only if** user feedback shows it’s truly
> > > needed.
> > >
> > > Best,
> > > Jark Wu
> > >
> > > **References**
> > > [1]
> > >
> >
> https://docs.starrocks.io/docs/table_design/table_types/aggregate_table/#create-a-table
> > > [2]
> > >
> >
> https://clickhouse.com/docs/engines/table-engines/mergetree-family/aggregatingmergetree
> > > [3]
> > >
> >
> https://docs.databricks.com/aws/en/sql/language-manual/functions/last_value
> > > [4]
> > >
> >
> https://clickhouse.com/docs/engines/table-engines/mergetree-family/aggregatingmergetree
> > > [5]
> > >
> >
> https://docs.databricks.com/aws/en/sql/language-manual/functions/string_agg
> > > [6]
> > >
> >
> https://docs.databricks.com/aws/en/sql/language-manual/functions/array_agg
> > > [7]
> > >
> >
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/tuning/
> > >
> > > On Mon, 8 Dec 2025 at 15:30, Yang Wang <[email protected]>
> > wrote:
> > > >
> > > > Hi Cheng,
> > > > Thanks for the thoughtful feedback and for bringing up the RocksDB
> 2PC
> > > > approach.
> > > > You've identified the core challenge precisely: data visibility vs.
> > > > real-time processing. This is exactly why we chose the Undo Recovery
> > > > mechanism over transaction-based approaches in the proposal.
> > > >
> > > >
> > > >
> > > > *Key Considerations:1. Real-time Visibility Conflict*As you
> mentioned,
> > > > RocksDB 2PC would require delayed visibility until transaction
> commit.
> > > For
> > > > Fluss's positioning as a real-time streaming storage, this conflicts
> > with
> > > > our fundamental requirement that writes should be immediately
> > queryable.
> > > In
> > > > typical scenarios (e.g., real-time dashboards), users expect
> > second-level
> > > > updates, not waiting for Flink checkpoint completion (which could be
> > tens
> > > > of seconds).
> > > >
> > > > *2. Already Evaluated and Rejected*We actually evaluated transaction
> > > > mechanisms in the FIP design phase. From the "Rejected Alternatives"
> > > > section:
> > > > > Use Transaction Mechanism to Implement Exactly-Once
> > > > >
> > > > > Disadvantages: Extremely high implementation complexity, requires
> > > > refactoring Fluss's write path, high performance overhead (requires
> > > delayed
> > > > visibility, increased commit overhead), conflicts with Fluss's
> > real-time
> > > > visibility design philosophy
> > > > >
> > > > > Rejection Reason: Cost too high, inconsistent with Fluss's
> real-time
> > > > streaming storage positioning
> > > > (See FIP Section: "Rejected Alternatives")
> > > >
> > > > *3. Additional Complexity with RocksDB 2PC*Beyond visibility issues:
> > > >
> > > >    - Distributed coordination: Requires a global transaction
> > coordinator
> > > >    across multiple TabletServers
> > > >    - Flink checkpoint alignment: How to coordinate RocksDB commit
> with
> > > >    asynchronous Flink checkpoints?
> > > >    - Multi-job concurrency: Column-level partial updates would
> require
> > > >    complex transaction isolation coordination
> > > >    - Performance overhead: Prepare/commit overhead exists for every
> > > write,
> > > >    even in normal cases
> > > >
> > > >
> > > > *4. Why Undo Recovery Fits Better*Our approach optimizes for the
> common
> > > > case:
> > > >
> > > >    - Normal writes: Zero transaction overhead, immediate visibility
> > > >    - Failover (rare): Pay the cost of undo operations only when
> needed
> > > >    - Lightweight: Leverages existing Changelog capability, no global
> > > >    coordinator needed
> > > >    - Localized: Each bucket handles recovery independently via offset
> > > >    comparison
> > > >
> > > > *Summary*
> > > > While RocksDB 2PC is theoretically cleaner from a database
> perspective,
> > > it
> > > > introduces unacceptable trade-offs for Fluss's real-time streaming
> use
> > > > cases. The Undo Recovery approach better aligns with our "optimize
> for
> > > the
> > > > common path" philosophy and maintains Fluss's real-time
> > characteristics.
> > > > Would love to discuss further if you have additional thoughts!
> > > >
> > > > Best regards,
> > > > Yang
> > >
> >
>

Reply via email to