twalthr commented on code in PR #26806: URL: https://github.com/apache/flink/pull/26806#discussion_r2227818783
########## flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ObjectUpdateInputTypeStrategy.java: ########## @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.types.inference.strategies; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.table.functions.FunctionDefinition; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.inference.ArgumentCount; +import org.apache.flink.table.types.inference.CallContext; +import org.apache.flink.table.types.inference.InputTypeStrategy; +import org.apache.flink.table.types.inference.Signature; +import org.apache.flink.table.types.inference.Signature.Argument; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeFamily; +import org.apache.flink.table.types.logical.LogicalTypeRoot; +import org.apache.flink.table.types.logical.StructuredType; +import org.apache.flink.table.types.logical.StructuredType.StructuredAttribute; + +import java.util.ArrayList; 
+import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Input type strategy for the {@link BuiltInFunctionDefinitions#OBJECT_UPDATE} function. + * + * <p>This strategy validates the input arguments for updating existing fields in a structured type: + * + * <ul> + * <li>Ensures the function has an odd number of arguments (at least 3) + * <li>Validates the first argument is a structured type + * <li>Validates that key arguments are non-null string literals + * <li>Ensures field names are not repeated in the key-value pairs + * <li>Ensures field names are part of the structured type's attributes + * <li>Ensures field values match the expected types defined in the structured type Review Comment: ```suggestion ``` ########## flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ObjectUpdateTypeStrategy.java: ########## @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.types.inference.strategies; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.DataTypes.Field; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.inference.CallContext; +import org.apache.flink.table.types.inference.TypeStrategy; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.StructuredType; +import org.apache.flink.table.types.logical.utils.LogicalTypeChecks; +import org.apache.flink.table.types.utils.TypeConversions; + +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * Type strategy for the {@link BuiltInFunctionDefinitions#OBJECT_UPDATE} function. + * + * <p>This strategy infers the return type for the OBJECT_UPDATE function by: + * + * <ul> + * <li>Extracting the field definitions from the input structured type + * <li>Resolving the class from the structured type Review Comment: ```suggestion ``` ########## docs/data/sql_functions.yml: ########## @@ -1219,6 +1219,35 @@ valueconstruction: - table: NUMERIC.rows description: Creates a NUMERIC interval of rows (commonly used in window creation). +valuemodification: + - sql: OBJECT_UPDATE(object, key, value [, key, value , ...]) + table: OBJECT.objectUpdate(key, value [, key, value , ...]) + description: | + Updates existing fields in a structured object by providing key-value pairs. + + This function takes a structured object and updates specified fields with new values. + The keys must be string literals that correspond to existing fields in the structured type. + If a key does not exist in the input object, an exception will be thrown. 
+ If the value type is not compatible with the corresponding structured field type, + an exception will also be thrown. + + The function expects alternating key-value pairs where keys are field names (non-null strings) + and values are the new values for those fields. At least one key-value pair must be provided. + The total number of arguments must be odd (object + pairs of key-value arguments). + + The result type is the same structured type as the input, with the specified fields Review Comment: ```suggestion The result type is the same structured type class, with the specified fields ``` ########## docs/data/sql_functions.yml: ########## @@ -1219,6 +1219,35 @@ valueconstruction: - table: NUMERIC.rows description: Creates a NUMERIC interval of rows (commonly used in window creation). +valuemodification: + - sql: OBJECT_UPDATE(object, key, value [, key, value , ...]) + table: OBJECT.objectUpdate(key, value [, key, value , ...]) + description: | + Updates existing fields in a structured object by providing key-value pairs. + + This function takes a structured object and updates specified fields with new values. + The keys must be string literals that correspond to existing fields in the structured type. + If a key does not exist in the input object, an exception will be thrown. + If the value type is not compatible with the corresponding structured field type, Review Comment: update this sentence everywhere ########## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ObjectUpdateFunction.java: ########## @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.functions.scalar; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.table.functions.SpecializedFunction; +import org.apache.flink.table.functions.SpecializedFunction.SpecializedContext; +import org.apache.flink.table.types.logical.StructuredType; +import org.apache.flink.table.types.logical.StructuredType.StructuredAttribute; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.IntStream; + +/** Implementation of {@link BuiltInFunctionDefinitions#OBJECT_UPDATE}. */ +@Internal +public class ObjectUpdateFunction extends BuiltInScalarFunction { + + private Map<String, Integer> fieldNameToRowPosIndex = new HashMap<>(); + + public ObjectUpdateFunction(SpecializedFunction.SpecializedContext context) { + super(BuiltInFunctionDefinitions.OBJECT_UPDATE, context); + this.fieldNameToRowPosIndex = createIndex(context); + } + + public RowData eval(RowData rowData, Object... fieldNameAndValuePairs) { + GenericRowData updatedRow = (GenericRowData) rowData; Review Comment: return null if rowData is null. 
########## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/StructuredFunctionsITCase.java: ########## @@ -197,6 +202,101 @@ private static Stream<TestSetSpec> objectOfTestCases() { "The first argument must be a non-nullable character string literal representing the class name.")); } + private static Stream<TestSetSpec> objectUpdateTestCases() { + return Stream.of( + TestSetSpec.forFunction(BuiltInFunctionDefinitions.OBJECT_UPDATE) + .onFieldsWithData(42, "Bob") + .andDataTypes(DataTypes.INT(), DataTypes.STRING()) + .withFunction(Type1.Type1Constructor.class) + .withFunction(Type2.Type2Constructor.class) + .withFunction(NestedType.NestedConstructor.class) + // Test update all fields equality + .testResult( + call("Type1Constructor", $("f0"), $("f1")) + .objectUpdate("a", 16, "b", "Alice"), + "OBJECT_UPDATE(OBJECT_OF('" + + Type1.class.getName() + + "', 'a', f0, 'b', f1), 'a', 16, 'b', 'Alice')", + Type1.of(16, "Alice"), + DataTypes.STRUCTURED( + Type1.class.getName(), + DataTypes.FIELD("a", DataTypes.INT().notNull()), + DataTypes.FIELD("b", DataTypes.CHAR(5).notNull()))) + // Test update single field + .testResult( + objectOf(Type1.class, "a", 42, "b", "Bob") + .objectUpdate("b", "Alice"), + "OBJECT_UPDATE(OBJECT_OF('" + + Type1.class.getName() + + "', 'a', 42, 'b', 'Bob'), 'b', 'Alice')", + Type1.of(42, "Alice"), + DataTypes.STRUCTURED( + Type1.class.getName(), + DataTypes.FIELD("a", DataTypes.INT().notNull()), + DataTypes.FIELD("b", DataTypes.CHAR(5).notNull()))) + // Test nested structured types + .testResult( + objectOf( + NestedType.class, + "n1", + call("Type1Constructor", $("f0"), $("f1")), + "n2", + call("Type2Constructor", 15, "Alice")) + .objectUpdate( + "n1", + objectOf(Type1.class, "a", 16, "b", "UpdatedBob")), + "OBJECT_UPDATE(OBJECT_OF('" + + NestedType.class.getName() + + "', 'n1', Type1Constructor(f0, f1), 'n2', Type2Constructor(15, 'Alice')), " + + "'n1', OBJECT_OF('" + + Type1.class.getName() + + "', 'a', 16, 
'b', 'UpdatedBob'))", + NestedType.of(Type1.of(16, "UpdatedBob"), Type2.of(15, "Alice")), + DataTypes.STRUCTURED( + NestedType.class.getName(), + DataTypes.FIELD( + "n1", + DataTypes.STRUCTURED( + Type1.class.getName(), + DataTypes.FIELD( + "a", DataTypes.INT().notNull()), + DataTypes.FIELD( + "b", + DataTypes.CHAR(10).notNull()))), + DataTypes.FIELD( + "n2", + DataTypes.STRUCTURED( + Type2.class.getName(), + DataTypes.FIELD("a", DataTypes.INT()), + DataTypes.FIELD("b", DataTypes.STRING()))))) + // Test when class not found + .testSqlResult( + "OBJECT_UPDATE(OBJECT_OF('not.existing.clazz', 'a', 42, 'b', 'Bob'), 'b', 'Alice')", + Row.of(42, "Alice"), + DataTypes.STRUCTURED( + "not.existing.clazz", + DataTypes.FIELD("a", DataTypes.INT().notNull()), + DataTypes.FIELD("b", DataTypes.CHAR(5).notNull()))) + // Test update field to null + .testResult( + objectOf(Type1.class, "a", 42, "b", "Bob") Review Comment: add a test where the object itself is null, e.g. `OBJECT_UPDATE(CAST(NULL AS STRUCTURED<>), ...)` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
