dawidwys commented on code in PR #25928:
URL: https://github.com/apache/flink/pull/25928#discussion_r1907380281


##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/SystemTypeInference.java:
##########
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.DataTypes.Field;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.functions.FunctionKind;
+import org.apache.flink.table.functions.ProcessTableFunction;
+import org.apache.flink.table.functions.TableSemantics;
+import org.apache.flink.table.types.DataType;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Predicate;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Extends the {@link TypeInference} function-aware by additional system 
columns and validation.
+ *
+ * <p>During planning system columns are available and can be accessed in SQL, 
during runtime those
+ * columns are not passed or returned by the eval() method. They are handled 
with custom code paths.
+ *
+ * <p>For example, for {@link ProcessTableFunction}, this utility class 
implicitly adds the optional
+ * {@code uid} and {@code on_time} args and an additional {@code rowtime} 
column in the output.
+ * Additionally, it adds a validation layer for complex {@link 
StaticArgument}s.
+ */
+@Internal
+public class SystemTypeInference {
+
+    private static final List<StaticArgument> 
PROCESS_TABLE_FUNCTION_SYSTEM_ARGS =
+            List.of(StaticArgument.scalar("uid", DataTypes.STRING(), true));
+
+    /** Format of unique identifiers for {@link ProcessTableFunction}. */
+    private static final Predicate<String> UID_FORMAT =
+            Pattern.compile("^[a-zA-Z_][a-zA-Z-_0-9]*$").asPredicate();
+
+    public static TypeInference of(FunctionKind functionKind, TypeInference 
origin) {
+        final TypeInference.Builder builder = TypeInference.newBuilder();
+
+        final List<StaticArgument> defaultArgs =
+                applyDefaultArgs(builder, functionKind, 
origin.getStaticArguments().orElse(null));
+        builder.inputTypeStrategy(origin.getInputTypeStrategy());
+        builder.stateTypeStrategies(origin.getStateTypeStrategies());
+        builder.outputTypeStrategy(origin.getOutputTypeStrategy());
+
+        final List<StaticArgument> systemArgs = applySystemArgs(builder, 
functionKind, defaultArgs);
+        applySystemInputStrategy(builder, functionKind, systemArgs, origin);
+        applySystemOutputStrategy(builder, functionKind, origin);
+
+        return builder.build();
+    }
+
+    // 
--------------------------------------------------------------------------------------------
+
+    private static @Nullable List<StaticArgument> applyDefaultArgs(

Review Comment:
   nit: is the return type necessary? We never modify the input `defaultArgs`. 



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/SystemTypeInference.java:
##########
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.DataTypes.Field;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.functions.FunctionKind;
+import org.apache.flink.table.functions.ProcessTableFunction;
+import org.apache.flink.table.functions.TableSemantics;
+import org.apache.flink.table.types.DataType;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Predicate;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Extends the {@link TypeInference} function-aware by additional system 
columns and validation.
+ *
+ * <p>During planning system columns are available and can be accessed in SQL, 
during runtime those
+ * columns are not passed or returned by the eval() method. They are handled 
with custom code paths.
+ *
+ * <p>For example, for {@link ProcessTableFunction}, this utility class 
implicitly adds the optional
+ * {@code uid} and {@code on_time} args and an additional {@code rowtime} 
column in the output.
+ * Additionally, it adds a validation layer for complex {@link 
StaticArgument}s.
+ */
+@Internal
+public class SystemTypeInference {
+
+    private static final List<StaticArgument> 
PROCESS_TABLE_FUNCTION_SYSTEM_ARGS =
+            List.of(StaticArgument.scalar("uid", DataTypes.STRING(), true));
+
+    /** Format of unique identifiers for {@link ProcessTableFunction}. */
+    private static final Predicate<String> UID_FORMAT =
+            Pattern.compile("^[a-zA-Z_][a-zA-Z-_0-9]*$").asPredicate();
+
+    public static TypeInference of(FunctionKind functionKind, TypeInference 
origin) {
+        final TypeInference.Builder builder = TypeInference.newBuilder();
+
+        final List<StaticArgument> defaultArgs =
+                applyDefaultArgs(builder, functionKind, 
origin.getStaticArguments().orElse(null));
+        builder.inputTypeStrategy(origin.getInputTypeStrategy());
+        builder.stateTypeStrategies(origin.getStateTypeStrategies());
+        builder.outputTypeStrategy(origin.getOutputTypeStrategy());
+
+        final List<StaticArgument> systemArgs = applySystemArgs(builder, 
functionKind, defaultArgs);
+        applySystemInputStrategy(builder, functionKind, systemArgs, origin);
+        applySystemOutputStrategy(builder, functionKind, origin);
+
+        return builder.build();
+    }
+
+    // 
--------------------------------------------------------------------------------------------
+
+    private static @Nullable List<StaticArgument> applyDefaultArgs(
+            TypeInference.Builder builder,
+            FunctionKind functionKind,
+            @Nullable List<StaticArgument> defaultArgs) {
+        if (defaultArgs == null) {
+            return null;
+        }
+        if (functionKind != FunctionKind.PROCESS_TABLE) {
+            checkScalarArgsOnly(defaultArgs);
+        }
+        builder.staticArguments(defaultArgs);

Review Comment:
   nit: I believe you could call `builder.staticArguments(...)` only once, in `applySystemArgs`.



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/SystemTypeInference.java:
##########
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.DataTypes.Field;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.functions.FunctionKind;
+import org.apache.flink.table.functions.ProcessTableFunction;
+import org.apache.flink.table.functions.TableSemantics;
+import org.apache.flink.table.types.DataType;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Predicate;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Extends the {@link TypeInference} function-aware by additional system 
columns and validation.
+ *
+ * <p>During planning system columns are available and can be accessed in SQL, 
during runtime those
+ * columns are not passed or returned by the eval() method. They are handled 
with custom code paths.
+ *
+ * <p>For example, for {@link ProcessTableFunction}, this utility class 
implicitly adds the optional
+ * {@code uid} and {@code on_time} args and an additional {@code rowtime} 
column in the output.
+ * Additionally, it adds a validation layer for complex {@link 
StaticArgument}s.
+ */
+@Internal
+public class SystemTypeInference {
+
+    private static final List<StaticArgument> 
PROCESS_TABLE_FUNCTION_SYSTEM_ARGS =
+            List.of(StaticArgument.scalar("uid", DataTypes.STRING(), true));
+
+    /** Format of unique identifiers for {@link ProcessTableFunction}. */
+    private static final Predicate<String> UID_FORMAT =
+            Pattern.compile("^[a-zA-Z_][a-zA-Z-_0-9]*$").asPredicate();
+
+    public static TypeInference of(FunctionKind functionKind, TypeInference 
origin) {
+        final TypeInference.Builder builder = TypeInference.newBuilder();
+
+        final List<StaticArgument> defaultArgs =
+                applyDefaultArgs(builder, functionKind, 
origin.getStaticArguments().orElse(null));
+        builder.inputTypeStrategy(origin.getInputTypeStrategy());
+        builder.stateTypeStrategies(origin.getStateTypeStrategies());
+        builder.outputTypeStrategy(origin.getOutputTypeStrategy());
+
+        final List<StaticArgument> systemArgs = applySystemArgs(builder, 
functionKind, defaultArgs);
+        applySystemInputStrategy(builder, functionKind, systemArgs, origin);
+        applySystemOutputStrategy(builder, functionKind, origin);
+
+        return builder.build();
+    }
+
+    // 
--------------------------------------------------------------------------------------------
+
+    private static @Nullable List<StaticArgument> applyDefaultArgs(
+            TypeInference.Builder builder,
+            FunctionKind functionKind,
+            @Nullable List<StaticArgument> defaultArgs) {
+        if (defaultArgs == null) {
+            return null;
+        }
+        if (functionKind != FunctionKind.PROCESS_TABLE) {
+            checkScalarArgsOnly(defaultArgs);
+        }
+        builder.staticArguments(defaultArgs);
+        return defaultArgs;
+    }
+
+    private static void checkScalarArgsOnly(List<StaticArgument> defaultArgs) {
+        defaultArgs.forEach(
+                arg -> {
+                    if (!arg.is(StaticArgumentTrait.SCALAR)) {
+                        throw new ValidationException(
+                                String.format(
+                                        "Only scalar arguments are supported 
at this location. "
+                                                + "But argument '%s' declared 
the following traits: %s",
+                                        arg.getName(), arg.getTraits()));
+                    }
+                });
+    }
+
+    private static @Nullable List<StaticArgument> applySystemArgs(
+            TypeInference.Builder builder,
+            FunctionKind functionKind,
+            @Nullable List<StaticArgument> defaultArgs) {
+        if (functionKind != FunctionKind.PROCESS_TABLE) {
+            return defaultArgs;
+        }
+        if (defaultArgs == null) {
+            throw new ValidationException(
+                    "Function requires a static signature that is not 
overloaded and doesn't contain varargs.");
+        }
+
+        checkReservedArgs(defaultArgs);
+
+        final List<StaticArgument> newStaticArgs = new 
ArrayList<>(defaultArgs);
+        newStaticArgs.addAll(PROCESS_TABLE_FUNCTION_SYSTEM_ARGS);
+        builder.staticArguments(newStaticArgs);
+        return newStaticArgs;
+    }
+
+    private static void checkReservedArgs(List<StaticArgument> staticArgs) {
+        final Set<String> declaredArgs =
+                
staticArgs.stream().map(StaticArgument::getName).collect(Collectors.toSet());
+        final Set<String> reservedArgs =
+                PROCESS_TABLE_FUNCTION_SYSTEM_ARGS.stream()
+                        .map(StaticArgument::getName)
+                        .collect(Collectors.toSet());
+        if (reservedArgs.stream().anyMatch(declaredArgs::contains)) {
+            throw new ValidationException(
+                    "Function signature must not declare system arguments. "
+                            + "Reserved argument names are: "
+                            + reservedArgs);
+        }
+    }
+
+    private static void applySystemInputStrategy(
+            TypeInference.Builder builder,
+            FunctionKind functionKind,
+            @Nullable List<StaticArgument> staticArgs,
+            TypeInference origin) {
+        if (functionKind != FunctionKind.PROCESS_TABLE) {
+            return;
+        }
+        builder.inputTypeStrategy(
+                new SystemInputStrategy(staticArgs, 
origin.getInputTypeStrategy()));
+    }
+
+    private static void applySystemOutputStrategy(
+            TypeInference.Builder builder, FunctionKind functionKind, 
TypeInference origin) {
+        if (functionKind != FunctionKind.TABLE && functionKind != 
FunctionKind.PROCESS_TABLE) {
+            return;
+        }
+        builder.outputTypeStrategy(new 
SystemOutputStrategy(origin.getOutputTypeStrategy()));
+    }
+
+    private static class SystemOutputStrategy implements TypeStrategy {
+
+        private final TypeStrategy origin;
+
+        private SystemOutputStrategy(TypeStrategy origin) {
+            this.origin = origin;
+        }
+
+        @Override
+        public Optional<DataType> inferType(CallContext callContext) {
+            return origin.inferType(callContext)
+                    .map(
+                            dataType -> {
+                                final List<DataType> fieldTypes =
+                                        DataType.getFieldDataTypes(dataType);
+                                final List<String> fieldNames = 
DataType.getFieldNames(dataType);
+                                final List<Field> fields = new ArrayList<>();
+                                if (fieldTypes.isEmpty()) {
+                                    // Before the system type inference was 
introduced, SQL and
+                                    // Table API chose a different default 
field name.
+                                    // EXPR$0 is chosen for best-effort 
backwards compatibility for
+                                    // SQL users.
+                                    fields.add(DataTypes.FIELD("EXPR$0", 
dataType));
+                                } else {
+                                    IntStream.range(0, fieldTypes.size())
+                                            .mapToObj(
+                                                    pos ->
+                                                            DataTypes.FIELD(
+                                                                    
fieldNames.get(pos),
+                                                                    
fieldTypes.get(pos)))
+                                            .forEach(fields::add);
+                                }
+
+                                return DataTypes.ROW(fields).notNull();
+                            });
+        }
+    }
+
+    private static class SystemInputStrategy implements InputTypeStrategy {
+
+        private final List<StaticArgument> staticArgs;
+        private final InputTypeStrategy origin;
+
+        private SystemInputStrategy(List<StaticArgument> staticArgs, 
InputTypeStrategy origin) {
+            this.staticArgs = staticArgs;
+            this.origin = origin;
+        }
+
+        @Override
+        public ArgumentCount getArgumentCount() {
+            // Static arguments declare the count
+            return InputTypeStrategies.WILDCARD.getArgumentCount();

Review Comment:
   Shouldn't you return the count of static arguments then?



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/SystemTypeInference.java:
##########
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.DataTypes.Field;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.functions.FunctionKind;
+import org.apache.flink.table.functions.ProcessTableFunction;
+import org.apache.flink.table.functions.TableSemantics;
+import org.apache.flink.table.types.DataType;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Predicate;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Extends the {@link TypeInference} function-aware by additional system 
columns and validation.
+ *
+ * <p>During planning system columns are available and can be accessed in SQL, 
during runtime those
+ * columns are not passed or returned by the eval() method. They are handled 
with custom code paths.
+ *
+ * <p>For example, for {@link ProcessTableFunction}, this utility class 
implicitly adds the optional
+ * {@code uid} and {@code on_time} args and an additional {@code rowtime} 
column in the output.
+ * Additionally, it adds a validation layer for complex {@link 
StaticArgument}s.
+ */
+@Internal
+public class SystemTypeInference {
+
+    private static final List<StaticArgument> 
PROCESS_TABLE_FUNCTION_SYSTEM_ARGS =
+            List.of(StaticArgument.scalar("uid", DataTypes.STRING(), true));
+
+    /** Format of unique identifiers for {@link ProcessTableFunction}. */
+    private static final Predicate<String> UID_FORMAT =
+            Pattern.compile("^[a-zA-Z_][a-zA-Z-_0-9]*$").asPredicate();
+
+    public static TypeInference of(FunctionKind functionKind, TypeInference 
origin) {
+        final TypeInference.Builder builder = TypeInference.newBuilder();
+
+        final List<StaticArgument> defaultArgs =
+                applyDefaultArgs(builder, functionKind, 
origin.getStaticArguments().orElse(null));
+        builder.inputTypeStrategy(origin.getInputTypeStrategy());
+        builder.stateTypeStrategies(origin.getStateTypeStrategies());
+        builder.outputTypeStrategy(origin.getOutputTypeStrategy());
+
+        final List<StaticArgument> systemArgs = applySystemArgs(builder, 
functionKind, defaultArgs);
+        applySystemInputStrategy(builder, functionKind, systemArgs, origin);
+        applySystemOutputStrategy(builder, functionKind, origin);
+
+        return builder.build();
+    }
+
+    // 
--------------------------------------------------------------------------------------------
+
+    private static @Nullable List<StaticArgument> applyDefaultArgs(
+            TypeInference.Builder builder,
+            FunctionKind functionKind,
+            @Nullable List<StaticArgument> defaultArgs) {
+        if (defaultArgs == null) {
+            return null;
+        }
+        if (functionKind != FunctionKind.PROCESS_TABLE) {
+            checkScalarArgsOnly(defaultArgs);
+        }
+        builder.staticArguments(defaultArgs);

Review Comment:
   nit: Probably a matter of taste, but you could apply the same pattern to all three of `inputStrategy`, `outputStrategy`, and `staticArguments`. First compute each value:
   
   ```
   if (functionKind == FunctionKind.PROCESS_TABLE) {
     return new SystemInput/OutputTypeStrategy(...);
   } else {
       return origin.getInput/OutputTypeStrategy()
   }
   ```
   
   and then set the result on the builder once.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to