twalthr commented on code in PR #26704: URL: https://github.com/apache/flink/pull/26704#discussion_r2207789358
########## flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ObjectOfInputTypeStrategy.java: ########## @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.types.inference.strategies; + +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.functions.FunctionDefinition; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.inference.ArgumentCount; +import org.apache.flink.table.types.inference.CallContext; +import org.apache.flink.table.types.inference.InputTypeStrategy; +import org.apache.flink.table.types.inference.Signature; +import org.apache.flink.table.types.inference.Signature.Argument; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeFamily; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; + +/** + * Input type strategy for the {@code OBJECT_OF} function that validates argument types and counts. 
+ * + * <p>This strategy validates the input arguments for the {@code OBJECT_OF} function, ensuring: + * + * <ul> + * <li>The argument count is odd (className + pairs of key-value arguments) + * <li>The first argument is a non-null STRING/VARCHAR representing the class name + * <li>All key arguments (odd positions after the first) are non-null STRING/VARCHAR types + * <li>Field names are unique across all key-value pairs + * <li>Value arguments (even positions after the first) can be any type + * </ul> + * + * <p>The expected function signature is: {@code OBJECT_OF(className, key1, value1, key2, value2, + * ...)} + * + * <p>Example valid calls: + * + * <ul> + * <li>{@code OBJECT_OF('com.example.User')} - empty object + * <li>{@code OBJECT_OF('com.example.User', 'name', 'Alice')} - single field + * <li>{@code OBJECT_OF('com.example.User', 'name', 'Alice', 'age', 30)} - multiple fields + * </ul> + * + * @see org.apache.flink.table.functions.BuiltInFunctionDefinitions#OBJECT_OF + * @see ObjectOfTypeStrategy + */ +public class ObjectOfInputTypeStrategy implements InputTypeStrategy { + + private static final ArgumentCount AT_LEAST_ONE_ODD = + new ArgumentCount() { + @Override + public boolean isValidCount(final int count) { + return count % 2 == 1; + } + + @Override + public Optional<Integer> getMinCount() { + return Optional.of(1); + } + + @Override + public Optional<Integer> getMaxCount() { + return Optional.empty(); + } + }; + + private static void validateClassArgument(final DataType firstArgumentDataType) { + final LogicalType classArgumentType = firstArgumentDataType.getLogicalType(); + + final String errorMessage = + "The first argument must be a non-nullable character string representing the class name."; + if (classArgumentType.isNullable() + || !classArgumentType.is(LogicalTypeFamily.CHARACTER_STRING)) { Review Comment: This validation would fail for `objectOf(someField, 'name', 'Bob', 'age', 42)`. 
The input type inference must ensure that the argument is a literal and that `callContext.getArgumentValue(0, String.class)` is not empty. ########## flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ObjectOfInputTypeStrategy.java: ########## @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.types.inference.strategies; + +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.functions.FunctionDefinition; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.inference.ArgumentCount; +import org.apache.flink.table.types.inference.CallContext; +import org.apache.flink.table.types.inference.InputTypeStrategy; +import org.apache.flink.table.types.inference.Signature; +import org.apache.flink.table.types.inference.Signature.Argument; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeFamily; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; + +/** + * Input type strategy for the {@code OBJECT_OF} function that validates argument types and counts. + * + * <p>This strategy validates the input arguments for the {@code OBJECT_OF} function, ensuring: + * + * <ul> + * <li>The argument count is odd (className + pairs of key-value arguments) + * <li>The first argument is a non-null STRING/VARCHAR representing the class name + * <li>All key arguments (odd positions after the first) are non-null STRING/VARCHAR types + * <li>Field names are unique across all key-value pairs + * <li>Value arguments (even positions after the first) can be any type + * </ul> + * + * <p>The expected function signature is: {@code OBJECT_OF(className, key1, value1, key2, value2, + * ...)} + * + * <p>Example valid calls: + * + * <ul> + * <li>{@code OBJECT_OF('com.example.User')} - empty object + * <li>{@code OBJECT_OF('com.example.User', 'name', 'Alice')} - single field + * <li>{@code OBJECT_OF('com.example.User', 'name', 'Alice', 'age', 30)} - multiple fields + * </ul> + * + * @see org.apache.flink.table.functions.BuiltInFunctionDefinitions#OBJECT_OF + * @see ObjectOfTypeStrategy + */ +public class ObjectOfInputTypeStrategy implements 
InputTypeStrategy { + + private static final ArgumentCount AT_LEAST_ONE_ODD = + new ArgumentCount() { + @Override + public boolean isValidCount(final int count) { + return count % 2 == 1; + } + + @Override + public Optional<Integer> getMinCount() { + return Optional.of(1); + } + + @Override + public Optional<Integer> getMaxCount() { + return Optional.empty(); + } + }; + + private static void validateClassArgument(final DataType firstArgumentDataType) { + final LogicalType classArgumentType = firstArgumentDataType.getLogicalType(); + + final String errorMessage = + "The first argument must be a non-nullable character string representing the class name."; + if (classArgumentType.isNullable() + || !classArgumentType.is(LogicalTypeFamily.CHARACTER_STRING)) { + throw new ValidationException(errorMessage); + } + } + + private static void validateKeyArguments( + final CallContext callContext, final List<DataType> argumentDataTypes) { + final Set<String> fieldNames = new HashSet<>(); + for (int i = 1; i < argumentDataTypes.size(); i += 2) { + final LogicalType fieldNameLogicalType = argumentDataTypes.get(i).getLogicalType(); + validateFieldNameArgument(callContext, i, fieldNameLogicalType, fieldNames); + } + } + + private static void validateFieldNameArgument( + final CallContext callContext, + final int idx, + final LogicalType logicalType, + final Set<String> fieldNames) { + final int keyIndex = idx + 1; + final boolean nullable = logicalType.isNullable(); + + if (nullable || !logicalType.is(LogicalTypeFamily.CHARACTER_STRING)) { + final String nullableType = nullable ? 
"nullable" : "non-nullable"; + final String message = + String.format( + "The field key at position %d must be a non-nullable STRING/VARCHAR type, but was %s %s.", + keyIndex, nullableType, logicalType.asSummaryString()); + throw new ValidationException(message); + } + + final String fieldName = + callContext + .getArgumentValue(idx, String.class) + .orElseThrow(IllegalStateException::new); + + if (!fieldNames.add(fieldName)) { + final String message = + String.format( + "The field name '%s' at position %d is repeated.", fieldName, keyIndex); + throw new ValidationException(message); + } + } + + @Override + public ArgumentCount getArgumentCount() { + return AT_LEAST_ONE_ODD; + } + + @Override + public Optional<List<DataType>> inferInputTypes( + final CallContext callContext, final boolean throwOnFailure) { + final List<DataType> argumentDataTypes = callContext.getArgumentDataTypes(); + try { + validateClassArgument(argumentDataTypes.get(0)); + validateKeyArguments(callContext, argumentDataTypes); + } catch (ValidationException e) { + callContext.fail(throwOnFailure, e.getMessage(), argumentDataTypes); Review Comment: ```suggestion callContext.fail(throwOnFailure, e.getMessage()); ``` ########## flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ObjectOfTypeStrategy.java: ########## @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.types.inference.strategies; + +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.DataTypes.Field; +import org.apache.flink.table.catalog.DataTypeFactory; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.inference.CallContext; +import org.apache.flink.table.types.inference.TypeStrategy; +import org.apache.flink.table.types.logical.StructuredType; + +import java.util.List; +import java.util.Optional; +import java.util.stream.IntStream; + +/** + * Type strategy for the {@code OBJECT_OF} function that infers the output type as a structured + * type. + * + * <p>This strategy creates a {@link DataTypes#STRUCTURED} type based on the provided class name and + * key-value pairs. The function signature is: {@code OBJECT_OF(className, key1, value1, key2, + * value2, ...)} + * + * <p>The strategy performs the following operations: + * + * <ul> + * <li>Extracts the class name from the first argument (must be a non-null string literal) + * <li>Processes key-value pairs starting from the second argument + * <li>Extracts field names (keys) from odd-positioned arguments (indices 1, 3, 5, ...) 
+ * </ul> + * + * <p><b>Examples:</b> + * + * <ul> + * <li>{@code OBJECT_OF('com.example.User', 'name', 'Alice', 'age', 30)} → {@code + * STRUCTURED<com.example.User>(name CHAR(5), age INT)} + * <li>{@code OBJECT_OF('com.example.Point', 'x', 1.5, 'y', 2.0)} → {@code + * STRUCTURED<com.example.Point>(x DOUBLE, y DOUBLE)} + * </ul> + * + * @see org.apache.flink.table.functions.BuiltInFunctionDefinitions#OBJECT_OF + * @see ObjectOfInputTypeStrategy + */ +public class ObjectOfTypeStrategy implements TypeStrategy { + + private static DataType toStructuredType( + final String className, final CallContext callContext) { + final DataTypeFactory dataTypeFactory = callContext.getDataTypeFactory(); + final ClassLoader classLoader = dataTypeFactory.getClassLoader(); + final Optional<Class<?>> resolveClass = StructuredType.resolveClass(classLoader, className); + return resolveClass + .map(dataTypeFactory::createDataType) Review Comment: We don't need another pass through reflective analysis. A `DataTypes.STRUCTURED(Class, ...)` should do the job. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
