[ 
https://issues.apache.org/jira/browse/FLINK-26453?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Marios Trivyzas updated FLINK-26453:
------------------------------------
    Description: 
* *checkNotAllowedConfigurations()* should be called by
*StreamContextEnvironment#executeAsync()*
 * The description of the *DeploymentOption* should be clearer; the option is
not only checked in application mode.
 * When using a configuration which is the same as the one in flink-conf.yaml 

Modified code of StreamSQLExample to pass an extra config:
{noformat}
final StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();

Configuration conf = new Configuration();
conf.set(
        ExecutionConfigOptions.TABLE_EXEC_SINK_NOT_NULL_ENFORCER,
        ExecutionConfigOptions.NotNullEnforcer.DROP);

// set up the Java Table API
final StreamTableEnvironment tableEnv =
        StreamTableEnvironment.create(env, EnvironmentSettings.fromConfiguration(conf));

final DataStream<Order> orderA =
        env.fromCollection(
                Arrays.asList(
                        new Order(1L, "beer", 3),
                        new Order(1L, "diaper", 4),
                        new Order(3L, "rubber", 2)));

final DataStream<Order> orderB =
        env.fromCollection(
                Arrays.asList(
                        new Order(2L, "pen", 3),
                        new Order(2L, "rubber", 3),
                        new Order(4L, "beer", 1)));

// convert the first DataStream to a Table object
// it will be used "inline" and is not registered in a catalog
final Table tableA = tableEnv.fromDataStream(orderA);

// convert the second DataStream and register it as a view
// it will be accessible under a name
tableEnv.createTemporaryView("TableB", orderB);

// union the two tables
final Table result =
        tableEnv.sqlQuery(
                "SELECT * FROM "
                        + tableA
                        + " WHERE amount > 2 UNION ALL "
                        + "SELECT * FROM TableB WHERE amount < 2");

// convert the Table back to an insert-only DataStream of type `Order`
tableEnv.toDataStream(result, Order.class).print();

// after the table program is converted to a DataStream program,
// we must use `env.execute()` to submit the job
env.execute();
{noformat}
ExecutionConfigOptions.TABLE_EXEC_SINK_NOT_NULL_ENFORCER is set to ERROR in
flink-conf.yaml, and yet no exception is thrown. That is because in
StreamTableEnvironmentImpl:
{noformat}
public static StreamTableEnvironment create(
        StreamExecutionEnvironment executionEnvironment,
        EnvironmentSettings settings,
        TableConfig tableConfig) {
{noformat}
we use the 
{noformat}
public static Executor lookupExecutor(
        ClassLoader classLoader,
        String executorIdentifier,
        StreamExecutionEnvironment executionEnvironment) {
{noformat}
so we don't follow the same path that calls
StreamContextEnvironment#setAsContext, which checks for overridden options
depending on the new flag.
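
The kind of check described above can be illustrated with a minimal, Flink-free sketch. It is not the actual StreamContextEnvironment code: the class ClientConfigCheckSketch, its methods, and the boolean parameter standing in for the new flag are hypothetical names chosen for illustration. It assumes the configuration can be modeled as plain string maps and uses the key "table.exec.sink.not-null-enforcer" for TABLE_EXEC_SINK_NOT_NULL_ENFORCER.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;

// Hypothetical sketch only: none of these names exist in Flink.
final class ClientConfigCheckSketch {

    static final String NOT_NULL_ENFORCER = "table.exec.sink.not-null-enforcer";

    /** Returns the client-side entries whose value differs from the cluster-side value. */
    static Map<String, String> findOverrides(
            Map<String, String> clusterConf, Map<String, String> clientConf) {
        Map<String, String> overrides = new TreeMap<>();
        for (Map.Entry<String, String> entry : clientConf.entrySet()) {
            if (!Objects.equals(clusterConf.get(entry.getKey()), entry.getValue())) {
                overrides.put(entry.getKey(), entry.getValue());
            }
        }
        return overrides;
    }

    /** Throws if the client overrides cluster settings while overrides are disallowed. */
    static void checkNotAllowed(
            boolean allowClientConf,
            Map<String, String> clusterConf,
            Map<String, String> clientConf) {
        if (allowClientConf) {
            return; // the flag permits client-side overrides, nothing to verify
        }
        Map<String, String> overrides = findOverrides(clusterConf, clientConf);
        if (!overrides.isEmpty()) {
            throw new IllegalStateException(
                    "Client-side overrides are not allowed: " + overrides.keySet());
        }
    }

    public static void main(String[] args) {
        // Cluster side (flink-conf.yaml) says ERROR, the program sets DROP.
        Map<String, String> cluster = Collections.singletonMap(NOT_NULL_ENFORCER, "ERROR");
        Map<String, String> client = Collections.singletonMap(NOT_NULL_ENFORCER, "DROP");
        try {
            checkNotAllowed(false, cluster, client);
        } catch (IllegalStateException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```

In this sketch the check only takes effect if every submission path invokes it, which is the gap the issue describes for executeAsync() and for the Table API path that goes through lookupExecutor.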

  was:
* *checkNotAllowedConfigurations()* should be called by
*StreamContextEnvironment#executeAsync()*
 * Description of the *DeploymentOption* should be more clear, and it's not
only checked by application mode.
 * When using a combination of TableAPI and DataStreamApi, the check for 
overriding config options is not applied:

Modified code of StreamSQLExample to pass extra config
{noformat}
final StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();

Configuration conf = new Configuration();
conf.set(
        ExecutionConfigOptions.TABLE_EXEC_SINK_NOT_NULL_ENFORCER,
        ExecutionConfigOptions.NotNullEnforcer.DROP);

// set up the Java Table API
final StreamTableEnvironment tableEnv =
        StreamTableEnvironment.create(env, EnvironmentSettings.fromConfiguration(conf));

final DataStream<Order> orderA =
        env.fromCollection(
                Arrays.asList(
                        new Order(1L, "beer", 3),
                        new Order(1L, "diaper", 4),
                        new Order(3L, "rubber", 2)));

final DataStream<Order> orderB =
        env.fromCollection(
                Arrays.asList(
                        new Order(2L, "pen", 3),
                        new Order(2L, "rubber", 3),
                        new Order(4L, "beer", 1)));

// convert the first DataStream to a Table object
// it will be used "inline" and is not registered in a catalog
final Table tableA = tableEnv.fromDataStream(orderA);

// convert the second DataStream and register it as a view
// it will be accessible under a name
tableEnv.createTemporaryView("TableB", orderB);

// union the two tables
final Table result =
        tableEnv.sqlQuery(
                "SELECT * FROM "
                        + tableA
                        + " WHERE amount > 2 UNION ALL "
                        + "SELECT * FROM TableB WHERE amount < 2");

// convert the Table back to an insert-only DataStream of type `Order`
tableEnv.toDataStream(result, Order.class).print();

// after the table program is converted to a DataStream program,
// we must use `env.execute()` to submit the job
env.execute();
{noformat}
ExecutionConfigOptions.TABLE_EXEC_SINK_NOT_NULL_ENFORCER is set to ERROR in 
flink-conf.yaml and yet no exception is thrown, that is because in

StreamTableEnvironmentImpl:
{noformat}
public static StreamTableEnvironment create(
        StreamExecutionEnvironment executionEnvironment,
        EnvironmentSettings settings,
        TableConfig tableConfig) {
{noformat}
we use the 
{noformat}
public static Executor lookupExecutor(
        ClassLoader classLoader,
        String executorIdentifier,
        StreamExecutionEnvironment executionEnvironment) {
{noformat}
so we don't follow the same path to call the 
StreamContextEnvironment#setAsContext

which checks for overriding options depending on the new flag.


> execution.allow-client-job-configurations not checked for executeAsync
> ----------------------------------------------------------------------
>
>                 Key: FLINK-26453
>                 URL: https://issues.apache.org/jira/browse/FLINK-26453
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream
>    Affects Versions: 1.15.0
>            Reporter: Marios Trivyzas
>            Assignee: Fabian Paul
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: 1.15.0



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
