dawidwys commented on code in PR #25928: URL: https://github.com/apache/flink/pull/25928#discussion_r1907380281
########## flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/SystemTypeInference.java: ########## @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.types.inference; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.DataTypes.Field; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.functions.FunctionDefinition; +import org.apache.flink.table.functions.FunctionKind; +import org.apache.flink.table.functions.ProcessTableFunction; +import org.apache.flink.table.functions.TableSemantics; +import org.apache.flink.table.types.DataType; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.function.Predicate; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * Extends the {@link TypeInference} function-aware by additional system columns and validation. 
+ * + * <p>During planning system columns are available and can be accessed in SQL, during runtime those + * columns are not passed or returned by the eval() method. They are handled with custom code paths. + * + * <p>For example, for {@link ProcessTableFunction}, this utility class implicitly adds the optional + * {@code uid} and {@code on_time} args and an additional {@code rowtime} column in the output. + * Additionally, it adds a validation layer for complex {@link StaticArgument}s. + */ +@Internal +public class SystemTypeInference { + + private static final List<StaticArgument> PROCESS_TABLE_FUNCTION_SYSTEM_ARGS = + List.of(StaticArgument.scalar("uid", DataTypes.STRING(), true)); + + /** Format of unique identifiers for {@link ProcessTableFunction}. */ + private static final Predicate<String> UID_FORMAT = + Pattern.compile("^[a-zA-Z_][a-zA-Z-_0-9]*$").asPredicate(); + + public static TypeInference of(FunctionKind functionKind, TypeInference origin) { + final TypeInference.Builder builder = TypeInference.newBuilder(); + + final List<StaticArgument> defaultArgs = + applyDefaultArgs(builder, functionKind, origin.getStaticArguments().orElse(null)); + builder.inputTypeStrategy(origin.getInputTypeStrategy()); + builder.stateTypeStrategies(origin.getStateTypeStrategies()); + builder.outputTypeStrategy(origin.getOutputTypeStrategy()); + + final List<StaticArgument> systemArgs = applySystemArgs(builder, functionKind, defaultArgs); + applySystemInputStrategy(builder, functionKind, systemArgs, origin); + applySystemOutputStrategy(builder, functionKind, origin); + + return builder.build(); + } + + // -------------------------------------------------------------------------------------------- + + private static @Nullable List<StaticArgument> applyDefaultArgs( Review Comment: nit: is the return type necessary? We never modify the input `defaultArgs`, so the method just returns its argument unchanged.
########## flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/SystemTypeInference.java: ########## @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.types.inference; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.DataTypes.Field; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.functions.FunctionDefinition; +import org.apache.flink.table.functions.FunctionKind; +import org.apache.flink.table.functions.ProcessTableFunction; +import org.apache.flink.table.functions.TableSemantics; +import org.apache.flink.table.types.DataType; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.function.Predicate; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * Extends the {@link TypeInference} function-aware by additional system columns and validation. 
+ * + * <p>During planning system columns are available and can be accessed in SQL, during runtime those + * columns are not passed or returned by the eval() method. They are handled with custom code paths. + * + * <p>For example, for {@link ProcessTableFunction}, this utility class implicitly adds the optional + * {@code uid} and {@code on_time} args and an additional {@code rowtime} column in the output. + * Additionally, it adds a validation layer for complex {@link StaticArgument}s. + */ +@Internal +public class SystemTypeInference { + + private static final List<StaticArgument> PROCESS_TABLE_FUNCTION_SYSTEM_ARGS = + List.of(StaticArgument.scalar("uid", DataTypes.STRING(), true)); + + /** Format of unique identifiers for {@link ProcessTableFunction}. */ + private static final Predicate<String> UID_FORMAT = + Pattern.compile("^[a-zA-Z_][a-zA-Z-_0-9]*$").asPredicate(); + + public static TypeInference of(FunctionKind functionKind, TypeInference origin) { + final TypeInference.Builder builder = TypeInference.newBuilder(); + + final List<StaticArgument> defaultArgs = + applyDefaultArgs(builder, functionKind, origin.getStaticArguments().orElse(null)); + builder.inputTypeStrategy(origin.getInputTypeStrategy()); + builder.stateTypeStrategies(origin.getStateTypeStrategies()); + builder.outputTypeStrategy(origin.getOutputTypeStrategy()); + + final List<StaticArgument> systemArgs = applySystemArgs(builder, functionKind, defaultArgs); + applySystemInputStrategy(builder, functionKind, systemArgs, origin); + applySystemOutputStrategy(builder, functionKind, origin); + + return builder.build(); + } + + // -------------------------------------------------------------------------------------------- + + private static @Nullable List<StaticArgument> applyDefaultArgs( + TypeInference.Builder builder, + FunctionKind functionKind, + @Nullable List<StaticArgument> defaultArgs) { + if (defaultArgs == null) { + return null; + } + if (functionKind != FunctionKind.PROCESS_TABLE) { + 
checkScalarArgsOnly(defaultArgs); + } + builder.staticArguments(defaultArgs); Review Comment: nit: I believe you could set it only once in `applySystemArgs` ########## flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/SystemTypeInference.java: ########## @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.types.inference; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.DataTypes.Field; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.functions.FunctionDefinition; +import org.apache.flink.table.functions.FunctionKind; +import org.apache.flink.table.functions.ProcessTableFunction; +import org.apache.flink.table.functions.TableSemantics; +import org.apache.flink.table.types.DataType; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.function.Predicate; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * Extends the {@link TypeInference} function-aware by additional system columns and validation. + * + * <p>During planning system columns are available and can be accessed in SQL, during runtime those + * columns are not passed or returned by the eval() method. They are handled with custom code paths. + * + * <p>For example, for {@link ProcessTableFunction}, this utility class implicitly adds the optional + * {@code uid} and {@code on_time} args and an additional {@code rowtime} column in the output. + * Additionally, it adds a validation layer for complex {@link StaticArgument}s. + */ +@Internal +public class SystemTypeInference { + + private static final List<StaticArgument> PROCESS_TABLE_FUNCTION_SYSTEM_ARGS = + List.of(StaticArgument.scalar("uid", DataTypes.STRING(), true)); + + /** Format of unique identifiers for {@link ProcessTableFunction}. 
*/ + private static final Predicate<String> UID_FORMAT = + Pattern.compile("^[a-zA-Z_][a-zA-Z-_0-9]*$").asPredicate(); + + public static TypeInference of(FunctionKind functionKind, TypeInference origin) { + final TypeInference.Builder builder = TypeInference.newBuilder(); + + final List<StaticArgument> defaultArgs = + applyDefaultArgs(builder, functionKind, origin.getStaticArguments().orElse(null)); + builder.inputTypeStrategy(origin.getInputTypeStrategy()); + builder.stateTypeStrategies(origin.getStateTypeStrategies()); + builder.outputTypeStrategy(origin.getOutputTypeStrategy()); + + final List<StaticArgument> systemArgs = applySystemArgs(builder, functionKind, defaultArgs); + applySystemInputStrategy(builder, functionKind, systemArgs, origin); + applySystemOutputStrategy(builder, functionKind, origin); + + return builder.build(); + } + + // -------------------------------------------------------------------------------------------- + + private static @Nullable List<StaticArgument> applyDefaultArgs( + TypeInference.Builder builder, + FunctionKind functionKind, + @Nullable List<StaticArgument> defaultArgs) { + if (defaultArgs == null) { + return null; + } + if (functionKind != FunctionKind.PROCESS_TABLE) { + checkScalarArgsOnly(defaultArgs); + } + builder.staticArguments(defaultArgs); + return defaultArgs; + } + + private static void checkScalarArgsOnly(List<StaticArgument> defaultArgs) { + defaultArgs.forEach( + arg -> { + if (!arg.is(StaticArgumentTrait.SCALAR)) { + throw new ValidationException( + String.format( + "Only scalar arguments are supported at this location. 
" + + "But argument '%s' declared the following traits: %s", + arg.getName(), arg.getTraits())); + } + }); + } + + private static @Nullable List<StaticArgument> applySystemArgs( + TypeInference.Builder builder, + FunctionKind functionKind, + @Nullable List<StaticArgument> defaultArgs) { + if (functionKind != FunctionKind.PROCESS_TABLE) { + return defaultArgs; + } + if (defaultArgs == null) { + throw new ValidationException( + "Function requires a static signature that is not overloaded and doesn't contain varargs."); + } + + checkReservedArgs(defaultArgs); + + final List<StaticArgument> newStaticArgs = new ArrayList<>(defaultArgs); + newStaticArgs.addAll(PROCESS_TABLE_FUNCTION_SYSTEM_ARGS); + builder.staticArguments(newStaticArgs); + return newStaticArgs; + } + + private static void checkReservedArgs(List<StaticArgument> staticArgs) { + final Set<String> declaredArgs = + staticArgs.stream().map(StaticArgument::getName).collect(Collectors.toSet()); + final Set<String> reservedArgs = + PROCESS_TABLE_FUNCTION_SYSTEM_ARGS.stream() + .map(StaticArgument::getName) + .collect(Collectors.toSet()); + if (reservedArgs.stream().anyMatch(declaredArgs::contains)) { + throw new ValidationException( + "Function signature must not declare system arguments. 
" + + "Reserved argument names are: " + + reservedArgs); + } + } + + private static void applySystemInputStrategy( + TypeInference.Builder builder, + FunctionKind functionKind, + @Nullable List<StaticArgument> staticArgs, + TypeInference origin) { + if (functionKind != FunctionKind.PROCESS_TABLE) { + return; + } + builder.inputTypeStrategy( + new SystemInputStrategy(staticArgs, origin.getInputTypeStrategy())); + } + + private static void applySystemOutputStrategy( + TypeInference.Builder builder, FunctionKind functionKind, TypeInference origin) { + if (functionKind != FunctionKind.TABLE && functionKind != FunctionKind.PROCESS_TABLE) { + return; + } + builder.outputTypeStrategy(new SystemOutputStrategy(origin.getOutputTypeStrategy())); + } + + private static class SystemOutputStrategy implements TypeStrategy { + + private final TypeStrategy origin; + + private SystemOutputStrategy(TypeStrategy origin) { + this.origin = origin; + } + + @Override + public Optional<DataType> inferType(CallContext callContext) { + return origin.inferType(callContext) + .map( + dataType -> { + final List<DataType> fieldTypes = + DataType.getFieldDataTypes(dataType); + final List<String> fieldNames = DataType.getFieldNames(dataType); + final List<Field> fields = new ArrayList<>(); + if (fieldTypes.isEmpty()) { + // Before the system type inference was introduced, SQL and + // Table API chose a different default field name. + // EXPR$0 is chosen for best-effort backwards compatibility for + // SQL users. 
+ fields.add(DataTypes.FIELD("EXPR$0", dataType)); + } else { + IntStream.range(0, fieldTypes.size()) + .mapToObj( + pos -> + DataTypes.FIELD( + fieldNames.get(pos), + fieldTypes.get(pos))) + .forEach(fields::add); + } + + return DataTypes.ROW(fields).notNull(); + }); + } + } + + private static class SystemInputStrategy implements InputTypeStrategy { + + private final List<StaticArgument> staticArgs; + private final InputTypeStrategy origin; + + private SystemInputStrategy(List<StaticArgument> staticArgs, InputTypeStrategy origin) { + this.staticArgs = staticArgs; + this.origin = origin; + } + + @Override + public ArgumentCount getArgumentCount() { + // Static arguments declare the count + return InputTypeStrategies.WILDCARD.getArgumentCount(); Review Comment: Shouldn't you return the count of static arguments then? ########## flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/SystemTypeInference.java: ########## @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.types.inference; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.DataTypes.Field; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.functions.FunctionDefinition; +import org.apache.flink.table.functions.FunctionKind; +import org.apache.flink.table.functions.ProcessTableFunction; +import org.apache.flink.table.functions.TableSemantics; +import org.apache.flink.table.types.DataType; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.function.Predicate; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * Extends the {@link TypeInference} function-aware by additional system columns and validation. + * + * <p>During planning system columns are available and can be accessed in SQL, during runtime those + * columns are not passed or returned by the eval() method. They are handled with custom code paths. + * + * <p>For example, for {@link ProcessTableFunction}, this utility class implicitly adds the optional + * {@code uid} and {@code on_time} args and an additional {@code rowtime} column in the output. + * Additionally, it adds a validation layer for complex {@link StaticArgument}s. + */ +@Internal +public class SystemTypeInference { + + private static final List<StaticArgument> PROCESS_TABLE_FUNCTION_SYSTEM_ARGS = + List.of(StaticArgument.scalar("uid", DataTypes.STRING(), true)); + + /** Format of unique identifiers for {@link ProcessTableFunction}. 
*/ + private static final Predicate<String> UID_FORMAT = + Pattern.compile("^[a-zA-Z_][a-zA-Z-_0-9]*$").asPredicate(); + + public static TypeInference of(FunctionKind functionKind, TypeInference origin) { + final TypeInference.Builder builder = TypeInference.newBuilder(); + + final List<StaticArgument> defaultArgs = + applyDefaultArgs(builder, functionKind, origin.getStaticArguments().orElse(null)); + builder.inputTypeStrategy(origin.getInputTypeStrategy()); + builder.stateTypeStrategies(origin.getStateTypeStrategies()); + builder.outputTypeStrategy(origin.getOutputTypeStrategy()); + + final List<StaticArgument> systemArgs = applySystemArgs(builder, functionKind, defaultArgs); + applySystemInputStrategy(builder, functionKind, systemArgs, origin); + applySystemOutputStrategy(builder, functionKind, origin); + + return builder.build(); + } + + // -------------------------------------------------------------------------------------------- + + private static @Nullable List<StaticArgument> applyDefaultArgs( + TypeInference.Builder builder, + FunctionKind functionKind, + @Nullable List<StaticArgument> defaultArgs) { + if (defaultArgs == null) { + return null; + } + if (functionKind != FunctionKind.PROCESS_TABLE) { + checkScalarArgsOnly(defaultArgs); + } + builder.staticArguments(defaultArgs); Review Comment: nit: Probably a matter of taste, but you could do that for all three: `inputStrategy`, `outputStrategy`, `staticArguments`. First calculate them: ``` if (functionKind == FunctionKind.PROCESS_TABLE) { return new SystemInput/OutputTypeStrategy(...); } else { return origin.getInput/OutputTypeStrategy() } ``` and then set it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
