twalthr commented on code in PR #26806:
URL: https://github.com/apache/flink/pull/26806#discussion_r2227818783


##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ObjectUpdateInputTypeStrategy.java:
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference.strategies;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.ArgumentCount;
+import org.apache.flink.table.types.inference.CallContext;
+import org.apache.flink.table.types.inference.InputTypeStrategy;
+import org.apache.flink.table.types.inference.Signature;
+import org.apache.flink.table.types.inference.Signature.Argument;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.StructuredType;
+import org.apache.flink.table.types.logical.StructuredType.StructuredAttribute;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Input type strategy for the {@link 
BuiltInFunctionDefinitions#OBJECT_UPDATE} function.
+ *
+ * <p>This strategy validates the input arguments for updating existing fields 
in a structured type:
+ *
+ * <ul>
+ *   <li>Ensures the function has an odd number of arguments (at least 3)
+ *   <li>Validates the first argument is a structured type
+ *   <li>Validates that key arguments are non-null string literals
+ *   <li>Ensures field names are not repeated in the key-value pairs
+ *   <li>Ensures field names are part of the structured type's attributes
+ *   <li>Ensures field values match the expected types defined in the 
structured type

Review Comment:
   ```suggestion
   
   ```



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ObjectUpdateTypeStrategy.java:
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference.strategies;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.DataTypes.Field;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.CallContext;
+import org.apache.flink.table.types.inference.TypeStrategy;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.StructuredType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
+import org.apache.flink.table.types.utils.TypeConversions;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Type strategy for the {@link BuiltInFunctionDefinitions#OBJECT_UPDATE} 
function.
+ *
+ * <p>This strategy infers the return type for the OBJECT_UPDATE function by:
+ *
+ * <ul>
+ *   <li>Extracting the field definitions from the input structured type
+ *   <li>Resolving the class from the structured type

Review Comment:
   ```suggestion
   
   ```



##########
docs/data/sql_functions.yml:
##########
@@ -1219,6 +1219,35 @@ valueconstruction:
   - table: NUMERIC.rows
     description: Creates a NUMERIC interval of rows (commonly used in window 
creation).
 
+valuemodification:
+  - sql: OBJECT_UPDATE(object, key, value [, key, value , ...])
+    table: OBJECT.objectUpdate(key, value [, key, value , ...])
+    description: |
+      Updates existing fields in a structured object by providing key-value 
pairs.
+      
+      This function takes a structured object and updates specified fields 
with new values.
+      The keys must be string literals that correspond to existing fields in 
the structured type.
+      If a key does not exist in the input object, an exception will be thrown.
+      If the value type is not compatible with the corresponding structured 
field type, 
+      an exception will also be thrown.
+      
+      The function expects alternating key-value pairs where keys are field 
names (non-null strings) 
+      and values are the new values for those fields. At least one key-value 
pair must be provided.
+      The total number of arguments must be odd (object + pairs of key-value 
arguments).
+      
+      The result type is the same structured type as the input, with the 
specified fields

Review Comment:
   ```suggestion
         The result type is the same structured type class, with the specified 
fields
   ```



##########
docs/data/sql_functions.yml:
##########
@@ -1219,6 +1219,35 @@ valueconstruction:
   - table: NUMERIC.rows
     description: Creates a NUMERIC interval of rows (commonly used in window 
creation).
 
+valuemodification:
+  - sql: OBJECT_UPDATE(object, key, value [, key, value , ...])
+    table: OBJECT.objectUpdate(key, value [, key, value , ...])
+    description: |
+      Updates existing fields in a structured object by providing key-value 
pairs.
+      
+      This function takes a structured object and updates specified fields 
with new values.
+      The keys must be string literals that correspond to existing fields in 
the structured type.
+      If a key does not exist in the input object, an exception will be thrown.
+      If the value type is not compatible with the corresponding structured 
field type, 

Review Comment:
   Update this sentence everywhere it appears.



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ObjectUpdateFunction.java:
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.SpecializedFunction;
+import org.apache.flink.table.functions.SpecializedFunction.SpecializedContext;
+import org.apache.flink.table.types.logical.StructuredType;
+import org.apache.flink.table.types.logical.StructuredType.StructuredAttribute;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.IntStream;
+
+/** Implementation of {@link BuiltInFunctionDefinitions#OBJECT_UPDATE}. */
+@Internal
+public class ObjectUpdateFunction extends BuiltInScalarFunction {
+
+    private Map<String, Integer> fieldNameToRowPosIndex = new HashMap<>();
+
+    public ObjectUpdateFunction(SpecializedFunction.SpecializedContext 
context) {
+        super(BuiltInFunctionDefinitions.OBJECT_UPDATE, context);
+        this.fieldNameToRowPosIndex = createIndex(context);
+    }
+
+    public RowData eval(RowData rowData, Object... fieldNameAndValuePairs) {
+        GenericRowData updatedRow = (GenericRowData) rowData;

Review Comment:
   Return `null` if `rowData` is `null`.



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/StructuredFunctionsITCase.java:
##########
@@ -197,6 +202,101 @@ private static Stream<TestSetSpec> objectOfTestCases() {
                                 "The first argument must be a non-nullable 
character string literal representing the class name."));
     }
 
+    private static Stream<TestSetSpec> objectUpdateTestCases() {
+        return Stream.of(
+                
TestSetSpec.forFunction(BuiltInFunctionDefinitions.OBJECT_UPDATE)
+                        .onFieldsWithData(42, "Bob")
+                        .andDataTypes(DataTypes.INT(), DataTypes.STRING())
+                        .withFunction(Type1.Type1Constructor.class)
+                        .withFunction(Type2.Type2Constructor.class)
+                        .withFunction(NestedType.NestedConstructor.class)
+                        // Test update all fields equality
+                        .testResult(
+                                call("Type1Constructor", $("f0"), $("f1"))
+                                        .objectUpdate("a", 16, "b", "Alice"),
+                                "OBJECT_UPDATE(OBJECT_OF('"
+                                        + Type1.class.getName()
+                                        + "', 'a', f0, 'b', f1), 'a', 16, 'b', 
'Alice')",
+                                Type1.of(16, "Alice"),
+                                DataTypes.STRUCTURED(
+                                        Type1.class.getName(),
+                                        DataTypes.FIELD("a", 
DataTypes.INT().notNull()),
+                                        DataTypes.FIELD("b", 
DataTypes.CHAR(5).notNull())))
+                        // Test update single field
+                        .testResult(
+                                objectOf(Type1.class, "a", 42, "b", "Bob")
+                                        .objectUpdate("b", "Alice"),
+                                "OBJECT_UPDATE(OBJECT_OF('"
+                                        + Type1.class.getName()
+                                        + "', 'a', 42, 'b', 'Bob'), 'b', 
'Alice')",
+                                Type1.of(42, "Alice"),
+                                DataTypes.STRUCTURED(
+                                        Type1.class.getName(),
+                                        DataTypes.FIELD("a", 
DataTypes.INT().notNull()),
+                                        DataTypes.FIELD("b", 
DataTypes.CHAR(5).notNull())))
+                        // Test nested structured types
+                        .testResult(
+                                objectOf(
+                                                NestedType.class,
+                                                "n1",
+                                                call("Type1Constructor", 
$("f0"), $("f1")),
+                                                "n2",
+                                                call("Type2Constructor", 15, 
"Alice"))
+                                        .objectUpdate(
+                                                "n1",
+                                                objectOf(Type1.class, "a", 16, 
"b", "UpdatedBob")),
+                                "OBJECT_UPDATE(OBJECT_OF('"
+                                        + NestedType.class.getName()
+                                        + "', 'n1', Type1Constructor(f0, f1), 
'n2', Type2Constructor(15, 'Alice')), "
+                                        + "'n1', OBJECT_OF('"
+                                        + Type1.class.getName()
+                                        + "', 'a', 16, 'b', 'UpdatedBob'))",
+                                NestedType.of(Type1.of(16, "UpdatedBob"), 
Type2.of(15, "Alice")),
+                                DataTypes.STRUCTURED(
+                                        NestedType.class.getName(),
+                                        DataTypes.FIELD(
+                                                "n1",
+                                                DataTypes.STRUCTURED(
+                                                        Type1.class.getName(),
+                                                        DataTypes.FIELD(
+                                                                "a", 
DataTypes.INT().notNull()),
+                                                        DataTypes.FIELD(
+                                                                "b",
+                                                                
DataTypes.CHAR(10).notNull()))),
+                                        DataTypes.FIELD(
+                                                "n2",
+                                                DataTypes.STRUCTURED(
+                                                        Type2.class.getName(),
+                                                        DataTypes.FIELD("a", 
DataTypes.INT()),
+                                                        DataTypes.FIELD("b", 
DataTypes.STRING())))))
+                        // Test when class not found
+                        .testSqlResult(
+                                "OBJECT_UPDATE(OBJECT_OF('not.existing.clazz', 
'a', 42, 'b', 'Bob'), 'b', 'Alice')",
+                                Row.of(42, "Alice"),
+                                DataTypes.STRUCTURED(
+                                        "not.existing.clazz",
+                                        DataTypes.FIELD("a", 
DataTypes.INT().notNull()),
+                                        DataTypes.FIELD("b", 
DataTypes.CHAR(5).notNull())))
+                        // Test update field to null
+                        .testResult(
+                                objectOf(Type1.class, "a", 42, "b", "Bob")

Review Comment:
   Add a test where the object itself is null, e.g. `OBJECT_UPDATE(CAST(NULL 
AS STRUCTURED<>), ...)`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to