Marios Trivyzas created FLINK-26453:
---------------------------------------

             Summary: execution.allow-client-job-configurations not checked for executeAsync
                 Key: FLINK-26453
                 URL: https://issues.apache.org/jira/browse/FLINK-26453
             Project: Flink
          Issue Type: Bug
          Components: API / DataStream
    Affects Versions: 1.15.0
            Reporter: Marios Trivyzas
            Assignee: Fabian Paul


 * *checkNotAllowedConfigurations()* should also be called by *StreamContextEnvironment#executeAsync()*.
 * The description of the *DeploymentOption* should be clearer; the option is not checked only in application mode.
 * When using a combination of the Table API and the DataStream API, the check for overridden config options is not applied.

StreamSQLExample, modified to pass an extra config option:
{noformat}
final StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();

Configuration conf = new Configuration();
conf.set(
        ExecutionConfigOptions.TABLE_EXEC_SINK_NOT_NULL_ENFORCER,
        ExecutionConfigOptions.NotNullEnforcer.DROP);

// set up the Java Table API
final StreamTableEnvironment tableEnv =
        StreamTableEnvironment.create(env, EnvironmentSettings.fromConfiguration(conf));

final DataStream<Order> orderA =
        env.fromCollection(
                Arrays.asList(
                        new Order(1L, "beer", 3),
                        new Order(1L, "diaper", 4),
                        new Order(3L, "rubber", 2)));

final DataStream<Order> orderB =
        env.fromCollection(
                Arrays.asList(
                        new Order(2L, "pen", 3),
                        new Order(2L, "rubber", 3),
                        new Order(4L, "beer", 1)));

// convert the first DataStream to a Table object
// it will be used "inline" and is not registered in a catalog
final Table tableA = tableEnv.fromDataStream(orderA);

// convert the second DataStream and register it as a view
// it will be accessible under a name
tableEnv.createTemporaryView("TableB", orderB);

// union the two tables
final Table result =
        tableEnv.sqlQuery(
                "SELECT * FROM "
                        + tableA
                        + " WHERE amount > 2 UNION ALL "
                        + "SELECT * FROM TableB WHERE amount < 2");

// convert the Table back to an insert-only DataStream of type `Order`
tableEnv.toDataStream(result, Order.class).print();

// after the table program is converted to a DataStream program,
// we must use `env.execute()` to submit the job
env.execute();
{noformat}
ExecutionConfigOptions.TABLE_EXEC_SINK_NOT_NULL_ENFORCER is set to ERROR in flink-conf.yaml, yet no exception is thrown when the job sets it to DROP. This is because in StreamTableEnvironmentImpl:
{noformat}
public static StreamTableEnvironment create(
        StreamExecutionEnvironment executionEnvironment,
        EnvironmentSettings settings,
        TableConfig tableConfig) {
{noformat}
we use
{noformat}
public static Executor lookupExecutor(
        ClassLoader classLoader,
        String executorIdentifier,
        StreamExecutionEnvironment executionEnvironment) {
{noformat}
so we don't follow the same code path that calls StreamContextEnvironment#setAsContext, which checks for overridden options depending on the new flag.
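
To make the expected behavior concrete, here is a minimal, self-contained sketch of the kind of check described above. It is not Flink code: the class ClientConfigCheckSketch, the map-based configs, the IllegalStateException, and the simplified execute()/executeAsync() signatures are all hypothetical stand-ins, and the key string "table.exec.sink.not-null-enforcer" is assumed to be the key behind TABLE_EXEC_SINK_NOT_NULL_ENFORCER. The only point it illustrates is that both entry points should run the same check when the flag disallows client-side overrides.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in, NOT Flink's implementation: configs are modeled as
// plain string maps and the check is reduced to a key-by-key comparison.
final class ClientConfigCheckSketch {

    /** Returns, in sorted order, the keys whose client value differs from the cluster value. */
    static List<String> findOverriddenKeys(
            Map<String, String> clusterConfig, Map<String, String> clientConfig) {
        List<String> overridden = new ArrayList<>();
        for (Map.Entry<String, String> entry : clientConfig.entrySet()) {
            String clusterValue = clusterConfig.get(entry.getKey());
            if (!Objects.equals(clusterValue, entry.getValue())) {
                overridden.add(entry.getKey());
            }
        }
        Collections.sort(overridden);
        return overridden;
    }

    /** Throws if the flag disallows client overrides and at least one key was overridden. */
    static void checkNotAllowedConfigurations(
            boolean allowClientJobConfigurations,
            Map<String, String> clusterConfig,
            Map<String, String> clientConfig) {
        if (allowClientJobConfigurations) {
            return; // overrides are permitted, nothing to verify
        }
        List<String> overridden = findOverriddenKeys(clusterConfig, clientConfig);
        if (!overridden.isEmpty()) {
            // Assumption: the real exception type and message will differ.
            throw new IllegalStateException(
                    "Client-side configuration overrides are not allowed: " + overridden);
        }
    }

    /** Simplified blocking entry point: runs the check before "executing". */
    static String execute(
            boolean allow, Map<String, String> clusterConfig, Map<String, String> clientConfig) {
        checkNotAllowedConfigurations(allow, clusterConfig, clientConfig);
        return "executed";
    }

    /** Simplified async entry point: must run the same check as execute(). */
    static String executeAsync(
            boolean allow, Map<String, String> clusterConfig, Map<String, String> clientConfig) {
        checkNotAllowedConfigurations(allow, clusterConfig, clientConfig);
        return "submitted";
    }

    public static void main(String[] args) {
        // Cluster config says ERROR, the job tries to set DROP.
        Map<String, String> cluster = Map.of("table.exec.sink.not-null-enforcer", "ERROR");
        Map<String, String> client = Map.of("table.exec.sink.not-null-enforcer", "DROP");
        try {
            executeAsync(false, cluster, client);
        } catch (IllegalStateException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
        System.out.println(executeAsync(true, cluster, client));
    }
}
```

With the flag set to false, both execute() and executeAsync() in this sketch reject the DROP override; a code path that builds its executor without going through such a check would accept it silently, which is the behavior reported here.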


--
This message was sent by Atlassian Jira
(v8.20.1#820001)
