Marios Trivyzas created FLINK-26453:
---------------------------------------

             Summary: execution.allow-client-job-configurations not checked for executeAsync
                 Key: FLINK-26453
                 URL: https://issues.apache.org/jira/browse/FLINK-26453
             Project: Flink
          Issue Type: Bug
          Components: API / DataStream
    Affects Versions: 1.15.0
            Reporter: Marios Trivyzas
            Assignee: Fabian Paul


 * *checkNotAllowedConfigurations()* should also be called by *StreamContextEnvironment#executeAsync()*.
 * The description of the *DeploymentOption* should be clearer; the option is not checked only in application mode.
 * When using a combination of the Table API and the DataStream API, the check for overriding config options is not applied.

Modified code of StreamSQLExample that passes extra config:

{noformat}
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Configuration conf = new Configuration();
conf.set(
        ExecutionConfigOptions.TABLE_EXEC_SINK_NOT_NULL_ENFORCER,
        ExecutionConfigOptions.NotNullEnforcer.DROP);

// set up the Java Table API
final StreamTableEnvironment tableEnv =
        StreamTableEnvironment.create(env, EnvironmentSettings.fromConfiguration(conf));

final DataStream<Order> orderA =
        env.fromCollection(
                Arrays.asList(
                        new Order(1L, "beer", 3),
                        new Order(1L, "diaper", 4),
                        new Order(3L, "rubber", 2)));

final DataStream<Order> orderB =
        env.fromCollection(
                Arrays.asList(
                        new Order(2L, "pen", 3),
                        new Order(2L, "rubber", 3),
                        new Order(4L, "beer", 1)));

// convert the first DataStream to a Table object
// it will be used "inline" and is not registered in a catalog
final Table tableA = tableEnv.fromDataStream(orderA);

// convert the second DataStream and register it as a view
// it will be accessible under a name
tableEnv.createTemporaryView("TableB", orderB);

// union the two tables
final Table result =
        tableEnv.sqlQuery(
                "SELECT * FROM "
                        + tableA
                        + " WHERE amount > 2 UNION ALL "
                        + "SELECT * FROM TableB WHERE amount < 2");

// convert the Table back to an insert-only DataStream of type `Order`
tableEnv.toDataStream(result, Order.class).print();

// after the table program is converted to a DataStream program,
// we must use `env.execute()` to submit the job
env.execute();
{noformat}

ExecutionConfigOptions.TABLE_EXEC_SINK_NOT_NULL_ENFORCER is set to ERROR in flink-conf.yaml, and yet no exception is thrown. The reason is that in StreamTableEnvironmentImpl:

{noformat}
public static StreamTableEnvironment create(
        StreamExecutionEnvironment executionEnvironment,
        EnvironmentSettings settings,
        TableConfig tableConfig) {
{noformat}

we use

{noformat}
public static Executor lookupExecutor(
        ClassLoader classLoader,
        String executorIdentifier,
        StreamExecutionEnvironment executionEnvironment) {
{noformat}

so we don't follow the same path that calls StreamContextEnvironment#setAsContext, which checks for overridden options depending on the new flag.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)