[ https://issues.apache.org/jira/browse/FLINK-26453?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Marios Trivyzas updated FLINK-26453:
------------------------------------
    Description:
* *checkNotAllowedConfigurations()* should be called by *StreamContextEnvironment#executeAsync()*
* Description of the *DeploymentOption* should be more clear, and it's not only checked by application mode.
* When using a configuration which is the same as the one in flink-conf.yaml

Modified code of StreamSQLExample to pass extra config
{noformat}
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Configuration conf = new Configuration();
conf.set(
        ExecutionConfigOptions.TABLE_EXEC_SINK_NOT_NULL_ENFORCER,
        ExecutionConfigOptions.NotNullEnforcer.DROP);

// set up the Java Table API
final StreamTableEnvironment tableEnv =
        StreamTableEnvironment.create(env, EnvironmentSettings.fromConfiguration(conf));

final DataStream<Order> orderA =
        env.fromCollection(
                Arrays.asList(
                        new Order(1L, "beer", 3),
                        new Order(1L, "diaper", 4),
                        new Order(3L, "rubber", 2)));

final DataStream<Order> orderB =
        env.fromCollection(
                Arrays.asList(
                        new Order(2L, "pen", 3),
                        new Order(2L, "rubber", 3),
                        new Order(4L, "beer", 1)));

// convert the first DataStream to a Table object
// it will be used "inline" and is not registered in a catalog
final Table tableA = tableEnv.fromDataStream(orderA);

// convert the second DataStream and register it as a view
// it will be accessible under a name
tableEnv.createTemporaryView("TableB", orderB);

// union the two tables
final Table result =
        tableEnv.sqlQuery(
                "SELECT * FROM "
                        + tableA
                        + " WHERE amount > 2 UNION ALL "
                        + "SELECT * FROM TableB WHERE amount < 2");

// convert the Table back to an insert-only DataStream of type `Order`
tableEnv.toDataStream(result, Order.class).print();

// after the table program is converted to a DataStream program,
// we must use `env.execute()` to submit the job
env.execute();
{noformat}
ExecutionConfigOptions.TABLE_EXEC_SINK_NOT_NULL_ENFORCER is set to ERROR in flink-conf.yaml and yet no exception is
thrown, that is because in StreamTableEnvironmentImpl:
{noformat}
public static StreamTableEnvironment create(
        StreamExecutionEnvironment executionEnvironment,
        EnvironmentSettings settings,
        TableConfig tableConfig) {
{noformat}
we use the
{noformat}
public static Executor lookupExecutor(
        ClassLoader classLoader,
        String executorIdentifier,
        StreamExecutionEnvironment executionEnvironment) {
{noformat}
so we don't follow the same path to call the StreamContextEnvironment#setAsContext which checks for overriding options depending on the new flag.

  was:
* *checkNotAllowedConfigurations()* should be called by *StreamContextEnvironment#executeAsync()*
* Description of the *DeploymentOption* should be more clear, and it's not only checked by application mode.
* When using a combination of TableAPI and DataStreamApi, the check for overriding config options is not applied:

Modified code of StreamSQLExample to pass extra config
{noformat}
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Configuration conf = new Configuration();
conf.set(
        ExecutionConfigOptions.TABLE_EXEC_SINK_NOT_NULL_ENFORCER,
        ExecutionConfigOptions.NotNullEnforcer.DROP);

// set up the Java Table API
final StreamTableEnvironment tableEnv =
        StreamTableEnvironment.create(env, EnvironmentSettings.fromConfiguration(conf));

final DataStream<Order> orderA =
        env.fromCollection(
                Arrays.asList(
                        new Order(1L, "beer", 3),
                        new Order(1L, "diaper", 4),
                        new Order(3L, "rubber", 2)));

final DataStream<Order> orderB =
        env.fromCollection(
                Arrays.asList(
                        new Order(2L, "pen", 3),
                        new Order(2L, "rubber", 3),
                        new Order(4L, "beer", 1)));

// convert the first DataStream to a Table object
// it will be used "inline" and is not registered in a catalog
final Table tableA = tableEnv.fromDataStream(orderA);

// convert the second DataStream and register it as a view
// it will be accessible under a name
tableEnv.createTemporaryView("TableB", orderB);

// union the two tables
final Table
result =
        tableEnv.sqlQuery(
                "SELECT * FROM "
                        + tableA
                        + " WHERE amount > 2 UNION ALL "
                        + "SELECT * FROM TableB WHERE amount < 2");

// convert the Table back to an insert-only DataStream of type `Order`
tableEnv.toDataStream(result, Order.class).print();

// after the table program is converted to a DataStream program,
// we must use `env.execute()` to submit the job
env.execute();
{noformat}
ExecutionConfigOptions.TABLE_EXEC_SINK_NOT_NULL_ENFORCER is set to ERROR in flink-conf.yaml and yet no exception is thrown, that is because in StreamTableEnvironmentImpl:
{noformat}
public static StreamTableEnvironment create(
        StreamExecutionEnvironment executionEnvironment,
        EnvironmentSettings settings,
        TableConfig tableConfig) {
{noformat}
we use the
{noformat}
public static Executor lookupExecutor(
        ClassLoader classLoader,
        String executorIdentifier,
        StreamExecutionEnvironment executionEnvironment) {
{noformat}
so we don't follow the same path to call the StreamContextEnvironment#setAsContext which checks for overriding options depending on the new flag.


> execution.allow-client-job-configurations not checked for executeAsync
> ----------------------------------------------------------------------
>
>                 Key: FLINK-26453
>                 URL: https://issues.apache.org/jira/browse/FLINK-26453
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream
>    Affects Versions: 1.15.0
>            Reporter: Marios Trivyzas
>            Assignee: Fabian Paul
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: 1.15.0
>
>
> * *checkNotAllowedConfigurations()* should be called by
> *StreamContextEnvironment#executeAsync()*
> * Description of the *DeploymentOption* should be more clear, and it's not
> only checked by application mode.
> * When using a configuration which is the same as the one in flink-conf.yaml
>
> Modified code of StreamSQLExample to pass extra config
> {noformat}
> final StreamExecutionEnvironment env =
>         StreamExecutionEnvironment.getExecutionEnvironment();
>
> Configuration conf = new Configuration();
> conf.set(
>         ExecutionConfigOptions.TABLE_EXEC_SINK_NOT_NULL_ENFORCER,
>         ExecutionConfigOptions.NotNullEnforcer.DROP);
>
> // set up the Java Table API
> final StreamTableEnvironment tableEnv =
>         StreamTableEnvironment.create(env,
>                 EnvironmentSettings.fromConfiguration(conf));
>
> final DataStream<Order> orderA =
>         env.fromCollection(
>                 Arrays.asList(
>                         new Order(1L, "beer", 3),
>                         new Order(1L, "diaper", 4),
>                         new Order(3L, "rubber", 2)));
>
> final DataStream<Order> orderB =
>         env.fromCollection(
>                 Arrays.asList(
>                         new Order(2L, "pen", 3),
>                         new Order(2L, "rubber", 3),
>                         new Order(4L, "beer", 1)));
>
> // convert the first DataStream to a Table object
> // it will be used "inline" and is not registered in a catalog
> final Table tableA = tableEnv.fromDataStream(orderA);
>
> // convert the second DataStream and register it as a view
> // it will be accessible under a name
> tableEnv.createTemporaryView("TableB", orderB);
>
> // union the two tables
> final Table result =
>         tableEnv.sqlQuery(
>                 "SELECT * FROM "
>                         + tableA
>                         + " WHERE amount > 2 UNION ALL "
>                         + "SELECT * FROM TableB WHERE amount < 2");
>
> // convert the Table back to an insert-only DataStream of type `Order`
> tableEnv.toDataStream(result, Order.class).print();
>
> // after the table program is converted to a DataStream program,
> // we must use `env.execute()` to submit the job
> env.execute();
> {noformat}
> ExecutionConfigOptions.TABLE_EXEC_SINK_NOT_NULL_ENFORCER is set to ERROR in
> flink-conf.yaml and yet no exception is thrown, that is because in
> StreamTableEnvironmentImpl:
> {noformat}
> public static StreamTableEnvironment create(
>         StreamExecutionEnvironment executionEnvironment,
>         EnvironmentSettings settings,
>         TableConfig tableConfig) {
> {noformat}
> we use the
> {noformat}
> public static Executor lookupExecutor(
>         ClassLoader classLoader,
>         String executorIdentifier,
>         StreamExecutionEnvironment executionEnvironment) {
> {noformat}
> so we don't follow the same path to call the
> StreamContextEnvironment#setAsContext
> which checks for overriding options depending on the new flag.
>
>
>

--
This message was sent by Atlassian Jira (v8.20.1#820001)
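
The first item of the report, that the check has to run on every submission path, can be illustrated without Flink on the classpath. What follows is a minimal sketch and not Flink's implementation: the class `ClientConfigCheck`, its plain `Map`-based configuration, the helper `findOverriddenKeys`, and the exception message are hypothetical, and the key string `table.exec.sink.not-null-enforcer` is assumed to be the key behind `TABLE_EXEC_SINK_NOT_NULL_ENFORCER`. Only the method name `checkNotAllowedConfigurations` and the flag `execution.allow-client-job-configurations` are taken from the issue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;

// Hypothetical stand-in for the environment described in the issue; not Flink code.
class ClientConfigCheck {

    // Assumed key string for TABLE_EXEC_SINK_NOT_NULL_ENFORCER.
    static final String NOT_NULL_ENFORCER_KEY = "table.exec.sink.not-null-enforcer";

    /** Returns, in sorted order, the keys whose program value differs from the cluster value. */
    public static List<String> findOverriddenKeys(
            Map<String, String> clusterConfig, Map<String, String> programConfig) {
        List<String> overridden = new ArrayList<>();
        // Copy into a TreeMap so the result order is deterministic.
        for (Map.Entry<String, String> entry : new TreeMap<>(programConfig).entrySet()) {
            String clusterValue = clusterConfig.get(entry.getKey());
            if (!Objects.equals(clusterValue, entry.getValue())) {
                overridden.add(entry.getKey());
            }
        }
        return overridden;
    }

    /** Throws if overrides exist while the (modelled) allow flag is disabled. */
    public static void checkNotAllowedConfigurations(
            Map<String, String> clusterConfig,
            Map<String, String> programConfig,
            boolean allowClientJobConfigurations) {
        if (allowClientJobConfigurations) {
            return;
        }
        List<String> overridden = findOverriddenKeys(clusterConfig, programConfig);
        if (!overridden.isEmpty()) {
            throw new IllegalStateException(
                    "Not allowed configuration change(s) were detected: " + overridden);
        }
    }

    /** Asynchronous path: runs the check itself, so it cannot be bypassed. */
    public static String executeAsync(
            Map<String, String> clusterConfig,
            Map<String, String> programConfig,
            boolean allowClientJobConfigurations) {
        checkNotAllowedConfigurations(clusterConfig, programConfig, allowClientJobConfigurations);
        return "submitted";
    }

    /** Blocking path: delegates to executeAsync and therefore shares the same check. */
    public static String execute(
            Map<String, String> clusterConfig,
            Map<String, String> programConfig,
            boolean allowClientJobConfigurations) {
        return executeAsync(clusterConfig, programConfig, allowClientJobConfigurations);
    }

    public static void main(String[] args) {
        Map<String, String> cluster = Map.of(NOT_NULL_ENFORCER_KEY, "ERROR");
        Map<String, String> program = Map.of(NOT_NULL_ENFORCER_KEY, "DROP");
        try {
            executeAsync(cluster, program, false);
        } catch (IllegalStateException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
        // With the flag enabled the same override is accepted.
        System.out.println(executeAsync(cluster, program, true));
    }
}
```

Placing the check inside `executeAsync` and letting `execute` delegate to it is one way to keep the two paths from diverging; whether the actual fix is structured like this is not stated in the issue.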