wuchong commented on code in PR #2492:
URL: https://github.com/apache/fluss/pull/2492#discussion_r2742565353


##########
fluss-client/src/main/java/org/apache/fluss/client/table/writer/TableUpsert.java:
##########
@@ -91,9 +94,15 @@ public Upsert partialUpdate(String... targetColumnNames) {
         return partialUpdate(targetColumns);
     }
 
+    @Override
+    public Upsert aggregationMode(AggMode mode) {
+        checkNotNull(mode, "aggregation mode");
+        return new TableUpsert(tablePath, tableInfo, writerClient, 
this.targetColumns, mode);
+    }
+
     @Override
     public UpsertWriter createWriter() {
-        return new UpsertWriterImpl(tablePath, tableInfo, targetColumns, 
writerClient);
+        return new UpsertWriterImpl(tablePath, tableInfo, targetColumns, 
writerClient, aggMode);

Review Comment:
   We should add check that if `aggMode` is `OVERWRITE` or `LOCAL_AGGREGATE`, 
the table should be aggregation merge engine. 



##########
fluss-client/src/main/java/org/apache/fluss/client/write/WriteBatch.java:
##########
@@ -120,6 +121,15 @@ public abstract boolean tryAppend(WriteRecord writeRecord, 
WriteCallback callbac
 
     public abstract void abortRecordAppends();
 
+    /**
+     * Get the aggregation mode for this batch (only applicable to KV batches).
+     *
+     * @return the aggregation mode, defaults to AGGREGATE for log batches
+     */
+    public AggMode getAggMode() {

Review Comment:
   It's very confusing that `ArrowLogWriteBatch` has a default 
`AggMode#AGGREGATE`. I checked the code, it seems that providing this method 
only in `KvWriteBatch` can work. 



##########
fluss-common/src/main/java/org/apache/fluss/rpc/protocol/AggMode.java:
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.rpc.protocol;
+
+import org.apache.fluss.annotation.PublicEvolving;
+
+/**
+ * Aggregation mode for write operations.
+ *
+ * <p>This enum controls how the server handles data aggregation when writing 
to tables with
+ * aggregation merge engine.
+ *
+ * @since 0.9
+ */
+@PublicEvolving
+public enum AggMode {

Review Comment:
   I’ve reconsidered the enum API design, and it’s indeed confusing to have an 
`aggMode` flag on every `PutRequest`, especially since it implies 
aggregation-specific semantics while can being used across different merge 
engines. A more general and consistent approach would be to introduce a 
**`MergeMode`** that applies uniformly to all merge engines.
   
   Here’s the proposed refinement:
   
   1. **Rename `AggMode#AGGREGATE` to `MergeMode#DEFAULT`**  
      This represents the default merge behavior for each engine:  
      - For the *aggregate* merge engine: perform aggregation.  
      - For the *first-row* merge engine: retain the first observed row.  
      - For the *versioned* merge engine: keep the row with the highest 
version.  
   
   2. **Rename `AggMode#OVERWRITE` to `MergeMode#OVERWRITE`**  
      This mode bypasses all merge logic and directly replaces existing values 
with new ones—similar to an `OVERWRITE INTO` statement. It’s useful for 
scenarios like undo/recovery across all merge engines.
   
   3. **Remove `AggMode#LOCAL_AGGREGATE`**  
      This is purely a client-side optimization that performs pre-aggregation 
before sending data. It actually doesn't change the protocol between client and 
server. The client still send the result using `MergeMode#DEFAULT`, as the 
on-wire schema remains unchanged.
   
   This change simplifies the API, improves consistency across merge engines, 
and clarifies the intent of each mode.
   
   If you agree on the new API, we can rename the enum type in this PR, and 
postpone the general support MergeMode for all merge eninges in next PR. But if 
you decide to support it in this PR, please remember to add tests.



##########
fluss-client/src/main/java/org/apache/fluss/client/table/writer/TableUpsert.java:
##########


Review Comment:
   We should add check if the `aggMode` is not the default value, we should 
throw exception, because the `TypedUpsertWriterImpl` doesn't support the 
`aggMode` yet.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to