davidradl commented on code in PR #26806:
URL: https://github.com/apache/flink/pull/26806#discussion_r2213656108


##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ObjectUpdateInputTypeStrategy.java:
##########
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference.strategies;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.ArgumentCount;
+import org.apache.flink.table.types.inference.CallContext;
+import org.apache.flink.table.types.inference.InputTypeStrategy;
+import org.apache.flink.table.types.inference.Signature;
+import org.apache.flink.table.types.inference.Signature.Argument;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.StructuredType;
+import org.apache.flink.table.types.logical.StructuredType.StructuredAttribute;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Input type strategy for the {@link BuiltInFunctionDefinitions#OBJECT_UPDATE} function.
+ *
+ * <p>This strategy validates the input arguments for updating existing fields in a structured type:
+ *
+ * <ul>
+ *   <li>Ensures the function has an odd number of arguments (at least 3)
+ *   <li>Validates the first argument is a non-null structured type
+ *   <li>Validates that key arguments are non-null string literals
+ *   <li>Ensures field names are not repeated in the key-value pairs
+ *   <li>Ensures field names are part of the structured type's attributes
+ *   <li>Ensures field values match the expected types defined in the structured type
+ * </ul>
+ *
+ * <p>The expected signature is: {@code OBJECT_UPDATE(object, key1, value1, key2, value2, ...)}
+ */
+@Internal
+public class ObjectUpdateInputTypeStrategy implements InputTypeStrategy {
+
+    private static final ArgumentCount AT_LEAST_THREE_AND_ODD =
+            new ArgumentCount() {
+                @Override
+                public boolean isValidCount(final int count) {
+                    return count % 2 == 1;
+                }
+
+                @Override
+                public Optional<Integer> getMinCount() {
+                    return Optional.of(3);
+                }
+
+                @Override
+                public Optional<Integer> getMaxCount() {
+                    return Optional.empty();
+                }
+            };
+
+    private static StructuredType validateObjectArgument(final DataType 
firstArgumentType) {
+        final LogicalType firstArgumentLogicalType = 
firstArgumentType.getLogicalType();
+        if (!firstArgumentLogicalType.is(LogicalTypeRoot.STRUCTURED_TYPE)) {
+            throw new ValidationException(
+                    String.format(
+                            "The first argument must be a structured type, but 
was %s.",
+                            firstArgumentLogicalType));
+        }
+        return (StructuredType) firstArgumentLogicalType;
+    }
+
+    private static void validateKeyValueArguments(
+            final CallContext callContext,
+            final List<DataType> argumentDataTypes,
+            final StructuredType structuredType) {
+        final Set<String> fieldNames = new HashSet<>();
+        final Map<String, LogicalType> attributeNameToLogicalType =
+                structuredType.getAttributes().stream()
+                        .collect(
+                                Collectors.toMap(
+                                        StructuredAttribute::getName,
+                                        StructuredAttribute::getType));
+
+        for (int i = 1; i < argumentDataTypes.size(); i += 2) {

Review Comment:
   If there is an odd number of `argumentDataTypes`, would this error nicely?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to