wuchong commented on code in PR #2323:
URL: https://github.com/apache/fluss/pull/2323#discussion_r2751303517


##########
fluss-common/src/main/java/org/apache/fluss/metadata/AggFunctionType.java:
##########
@@ -111,6 +117,60 @@ public void validateParameter(String parameterName, String parameterValue) {
         }
     }
 
+    /**
+     * Validates a data type for this aggregation function.
+     *
+     * @param fieldType the field data type
+     * @throws IllegalArgumentException if the data type is invalid
+     */
+    public void validateDataType(DataType fieldType) {
+        switch (this) {
+                // The bool_and and bool_or don't have specific DataFamily, validate them by
+                // dataType directly.
+            case BOOL_AND:
+            case BOOL_OR:
+                checkArgument(
+                        fieldType instanceof BooleanType,
+                        "Data type for %s column must be 'BooleanType' but was '%s'.",
+                        toString(),
+                        fieldType);
+                break;
+            default:
+                DataTypeFamily[] dataTypeFamilies = getSupportedDataFamilies();
+                checkArgument(
+                        fieldType.isAnyOf(dataTypeFamilies),
+                        "Data type for %s column must be part of %s but was '%s'.",
+                        toString(),
+                        Arrays.deepToString(dataTypeFamilies),
+                        fieldType);
+                break;
+        }
+    }
+
+    private DataTypeFamily[] getSupportedDataFamilies() {
+        switch (this) {
+            case SUM:
+            case PRODUCT:
+                return new DataTypeFamily[] {DataTypeFamily.NUMERIC};
+            case MAX:
+            case MIN:
+                return new DataTypeFamily[] {
+                    DataTypeFamily.CHARACTER_STRING, DataTypeFamily.NUMERIC, DataTypeFamily.DATETIME
+                };
+            case LAST_VALUE:
+            case LAST_VALUE_IGNORE_NULLS:
+            case FIRST_VALUE:
+            case FIRST_VALUE_IGNORE_NULLS:
+                return DataTypeFamily.values();
+            case LISTAGG:
+            case STRING_AGG:
+                return new DataTypeFamily[] {DataTypeFamily.CHARACTER_STRING};
+            default:

Review Comment:
   Listing every function type explicitly is error-prone: when someone adds a 
new aggregate function, it is easy to forget to update the data type validation 
logic.  
   
   To avoid missing data type checks for new functions, I suggest making 
**supported parameter types** and **supported data type families** explicit 
member fields of the `AggFunctionType` enum. We can then retrieve the allowed 
type families directly from each function type and validate in one central 
place, which reduces maintenance overhead and improves correctness.
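
   A minimal sketch of that idea, assuming simplified stand-ins rather than the 
real Fluss classes: `Family`, `FieldType`, and `AggKind` are hypothetical names 
invented for illustration, and only a few function types are shown. `BOOL_AND` 
and `BOOL_OR` are left out because the diff checks them against a concrete type 
rather than a family.

```java
import java.util.Collections;
import java.util.EnumSet;
import java.util.Set;

/** Hypothetical stand-in for the project's data type families. */
enum Family { NUMERIC, CHARACTER_STRING, DATETIME }

/** Hypothetical, simplified field type: a display name plus the family it belongs to. */
final class FieldType {
    private final String name;
    private final Family family;

    FieldType(String name, Family family) {
        this.name = name;
        this.family = family;
    }

    /** Returns true if this type belongs to any of the given families. */
    boolean isAnyOf(Set<Family> families) {
        return families.contains(family);
    }

    @Override
    public String toString() {
        return name;
    }
}

/** Each constant declares its supported families up front, so no switch is needed. */
enum AggKind {
    SUM(EnumSet.of(Family.NUMERIC)),
    PRODUCT(EnumSet.of(Family.NUMERIC)),
    MAX(EnumSet.of(Family.CHARACTER_STRING, Family.NUMERIC, Family.DATETIME)),
    MIN(EnumSet.of(Family.CHARACTER_STRING, Family.NUMERIC, Family.DATETIME)),
    LAST_VALUE(EnumSet.allOf(Family.class)),
    LISTAGG(EnumSet.of(Family.CHARACTER_STRING));

    private final Set<Family> supportedFamilies;

    AggKind(Set<Family> supportedFamilies) {
        this.supportedFamilies = Collections.unmodifiableSet(supportedFamilies);
    }

    Set<Family> getSupportedFamilies() {
        return supportedFamilies;
    }

    /** Single, centralized check shared by every constant. */
    void validateDataType(FieldType fieldType) {
        if (!fieldType.isAnyOf(supportedFamilies)) {
            throw new IllegalArgumentException(
                    String.format(
                            "Data type for %s column must be part of %s but was '%s'.",
                            this, supportedFamilies, fieldType));
        }
    }
}
```

   With this shape, a new constant cannot compile without declaring its 
families, so the validation cannot silently fall through to a `default` branch.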



##########
fluss-common/src/test/java/org/apache/fluss/metadata/TableDescriptorTest.java:
##########
@@ -344,7 +345,7 @@ void testInvalidSumFunctionWithParameters() {
         Map<String, String> params = new HashMap<>();
         params.put("some_param", "value");
 
-        assertThatThrownBy(() -> AggFunctions.of(AggFunctionType.SUM, params).validate())
+        assertThatThrownBy(() -> AggFunctions.of(AggFunctionType.SUM, params).validateParameters())

Review Comment:
   Please add unit tests for the data type validation of aggregation function 
columns.



##########
fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java:
##########
@@ -1556,4 +1556,102 @@ public void testAddAndRemoveServerTags() throws Exception {
                         "Server tag PERMANENT_OFFLINE not exists for server 2, the current "
                                 + "server tag of this server is TEMPORARY_OFFLINE.");
     }
+
+    @Test
+    public void testCreateTableWithInvalidAggFunctionDataType() throws Exception {
+        TablePath tablePath =
+                TablePath.of(
+                        DEFAULT_TABLE_PATH.getDatabaseName(),
+                        "test_invalid_data_type_for_aggfunction");
+        Map<String, String> propertiesAggregate = new HashMap<>();
+        propertiesAggregate.put(ConfigOptions.TABLE_MERGE_ENGINE.key(), "aggregation");
+
+        Schema schema1 =
+                Schema.newBuilder()
+                        .column("id", DataTypes.INT())
+                        .column("and_value", DataTypes.STRING(), AggFunctions.BOOL_AND())
+                        .primaryKey("id")
+                        .build();
+        TableDescriptor t1 =
+                TableDescriptor.builder()
+                        .schema(schema1)
+                        .comment("aggregate merge engine table")
+                        .properties(propertiesAggregate)
+                        .build();
+        assertThatThrownBy(() -> admin.createTable(tablePath, t1, false).get())

Review Comment:
   The integration test (IT) case is quite heavy. I suggest limiting it to one 
or two common error scenarios and moving the remaining aggregate function 
validations to unit tests, such as `TableDescriptorTest`, to keep the IT suite 
fast and focused.



##########
fluss-common/src/main/java/org/apache/fluss/metadata/AggFunction.java:
##########
@@ -104,18 +106,31 @@ public boolean hasParameters() {
     }
 
     /**
-     * Validates all parameters of this aggregation function.
+     * Validates all parameters and data type of this aggregation function.
      *
      * <p>This method checks that:
      *
      * <ul>
      *   <li>All parameter names are supported by the function type
      *   <li>All parameter values are valid
+     *   <li>The field data type is valid
      * </ul>
      *
-     * @throws IllegalArgumentException if any parameter is invalid
+     * @param fieldType the field data type
+     * @throws IllegalArgumentException if any parameter is invalid or data type is invalid
      */
-    public void validate() {
+    public void validate(DataType fieldType) {

Review Comment:
   This method looks like it only validates the data type. I don't think we 
need the combined method. Splitting it into two public methods, 
`validateParameters()` and `validateDataType(fieldType)`, would be clearer.
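
   A rough sketch of the split, assuming a simplified stand-in class: 
`SketchAggFunction` and its string-based type names are hypothetical and are 
not the actual `AggFunction` API. Only the two method names come from the 
suggestion above.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/** Hypothetical, simplified aggregation function with two independent validation entry points. */
final class SketchAggFunction {
    private final String name;
    private final Set<String> supportedParameters;
    private final Set<String> supportedTypes;
    private final Map<String, String> parameters;

    SketchAggFunction(
            String name,
            Set<String> supportedParameters,
            Set<String> supportedTypes,
            Map<String, String> parameters) {
        this.name = name;
        this.supportedParameters = supportedParameters;
        this.supportedTypes = supportedTypes;
        this.parameters = parameters;
    }

    /** Checks parameter names only; needs no knowledge of the column type. */
    public void validateParameters() {
        for (String key : parameters.keySet()) {
            if (!supportedParameters.contains(key)) {
                throw new IllegalArgumentException(
                        String.format("Parameter '%s' is not supported by %s.", key, name));
            }
        }
    }

    /** Checks the column type only; independent of the parameters. */
    public void validateDataType(String fieldType) {
        if (!supportedTypes.contains(fieldType)) {
            throw new IllegalArgumentException(
                    String.format(
                            "Data type for %s column must be one of %s but was '%s'.",
                            name, supportedTypes, fieldType));
        }
    }
}
```

   Callers that only know the parameters (for example, when the function is 
constructed) can call `validateParameters()`, while the schema builder, which 
knows the column type, can call `validateDataType(fieldType)`.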



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
