[ 
https://issues.apache.org/jira/browse/FLINK-30278?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anton Kalashnikov updated FLINK-30278:
--------------------------------------
    Description: 
If we forbid changing the configuration programmatically
(`execution.program-config.enabled` set to `false`) and try to use `FileSink`,
the following exception occurs:
{noformat}
  org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error: Not allowed configuration change(s) were detected:
 - Configuration parallelism.default:1 not allowed in the configuration object 
ExecutionConfig.
   at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:364)
   at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:225)
   at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:98)
   at 
org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:319)
   at 
org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$2(ApplicationDispatcherBootstrap.java:262)
   at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown 
Source)
   at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
   at 
org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:171)
   at 
org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
   at 
org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$withContextClassLoader$0(ClassLoadingUtils.java:41)
   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49)
   at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48)
   at java.base/java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
   at 
java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown 
Source)
   at java.base/java.util.concurrent.ForkJoinPool.scan(Unknown Source)
   at java.base/java.util.concurrent.ForkJoinPool.runWorker(Unknown Source)
   at java.base/java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source)
Caused by: org.apache.flink.client.program.MutatedConfigurationException: Not 
allowed configuration change(s) were detected:
 - Configuration parallelism.default:1 not allowed in the configuration object 
ExecutionConfig.
   at 
org.apache.flink.client.program.StreamContextEnvironment.checkNotAllowedConfigurations(StreamContextEnvironment.java:235)
   at 
org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:175)
   at 
org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:115)
   at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2049)
   at 
org.apache.flink.streaming.examples.wordcount.WordCount.main(WordCount.java:81)
   at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method)
   at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown 
Source)
   at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown 
Source)
   at java.base/java.lang.reflect.Method.invoke(Unknown Source)
   at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:347)
   ... 16 more
{noformat}

It happens because `SinkTransformationTranslator` contains the following
logic:
* Remember the current parallelism
* Set the parallelism to the default
* Do the transformation
* Set the parallelism back to the remembered value

But if the initial parallelism is already the default, we should do nothing.
With the current logic we explicitly write the default value to the
configuration, which is itself a programmatic config mutation (the thing we
want to avoid).

See 
org.apache.flink.streaming.runtime.translators.SinkTransformationTranslator.SinkExpander#executionEnvironment:341
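
The four steps above can be modelled outside of Flink. The following is only a
minimal sketch and not the actual translator code: `ToyConfig`,
`translateBuggy` and `translateGuarded` are hypothetical names invented for
illustration, and the cluster default of 1 is taken from the exception message
above. It assumes that reading the parallelism returns the resolved default
when the program never set one, so that writing the remembered value back
counts as an explicit change.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Toy model of the save/restore pattern; none of these names are Flink API. */
public class SinkParallelismSketch {

    static final String PARALLELISM_KEY = "parallelism.default";

    /** Hypothetical config object that records which keys were written explicitly. */
    static final class ToyConfig {
        private final int clusterDefault;
        private final Map<String, Integer> explicitValues = new HashMap<>();

        ToyConfig(int clusterDefault) {
            this.clusterDefault = clusterDefault;
        }

        /** Returns the explicit value, or the cluster default if none was set. */
        int getParallelism() {
            return explicitValues.getOrDefault(PARALLELISM_KEY, clusterDefault);
        }

        boolean isParallelismExplicit() {
            return explicitValues.containsKey(PARALLELISM_KEY);
        }

        void setParallelism(int parallelism) {
            explicitValues.put(PARALLELISM_KEY, parallelism);
        }

        void resetParallelism() {
            explicitValues.remove(PARALLELISM_KEY);
        }

        /** Keys a mutation check would report as changed by the program. */
        Set<String> mutatedKeys() {
            return new HashSet<>(explicitValues.keySet());
        }
    }

    /** Remember, reset, transform, restore: always writes the key back. */
    static void translateBuggy(ToyConfig config, Runnable transformation) {
        int remembered = config.getParallelism();
        config.resetParallelism();
        transformation.run();
        config.setParallelism(remembered); // explicit write even if nothing was set
    }

    /** Same steps, but the value is restored only if the program had set it. */
    static void translateGuarded(ToyConfig config, Runnable transformation) {
        boolean wasExplicit = config.isParallelismExplicit();
        int remembered = config.getParallelism();
        config.resetParallelism();
        transformation.run();
        if (wasExplicit) {
            config.setParallelism(remembered);
        }
    }

    public static void main(String[] args) {
        ToyConfig buggy = new ToyConfig(1);
        translateBuggy(buggy, () -> {});
        System.out.println("buggy:   " + buggy.mutatedKeys());

        ToyConfig guarded = new ToyConfig(1);
        translateGuarded(guarded, () -> {});
        System.out.println("guarded: " + guarded.mutatedKeys());
    }
}
```

In this model the buggy variant leaves `parallelism.default` in the set of
mutated keys although the program never touched it, while the guarded variant
leaves the set empty and still restores a parallelism that was set on purpose.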
 

  was:
If we forbid changing the configuration programmatically
(`execution.program-config.enabled` set to `false`) and try to use `FileSink`,
the following exception occurs:
<noformat>
  org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error: Not allowed configuration change(s) were detected:
 - Configuration parallelism.default:1 not allowed in the configuration object 
ExecutionConfig.
   at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:364)
   at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:225)
   at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:98)
   at 
org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:319)
   at 
org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$2(ApplicationDispatcherBootstrap.java:262)
   at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown 
Source)
   at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
   at 
org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:171)
   at 
org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
   at 
org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$withContextClassLoader$0(ClassLoadingUtils.java:41)
   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49)
   at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48)
   at java.base/java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
   at 
java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown 
Source)
   at java.base/java.util.concurrent.ForkJoinPool.scan(Unknown Source)
   at java.base/java.util.concurrent.ForkJoinPool.runWorker(Unknown Source)
   at java.base/java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source)
Caused by: org.apache.flink.client.program.MutatedConfigurationException: Not 
allowed configuration change(s) were detected:
 - Configuration parallelism.default:1 not allowed in the configuration object 
ExecutionConfig.
   at 
org.apache.flink.client.program.StreamContextEnvironment.checkNotAllowedConfigurations(StreamContextEnvironment.java:235)
   at 
org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:175)
   at 
org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:115)
   at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2049)
   at 
org.apache.flink.streaming.examples.wordcount.WordCount.main(WordCount.java:81)
   at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method)
   at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown 
Source)
   at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown 
Source)
   at java.base/java.lang.reflect.Method.invoke(Unknown Source)
   at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:347)
   ... 16 more
<noformat>

It happens because `SinkTransformationTranslator` contains the following
logic:
* Remember the current parallelism
* Set the parallelism to the default
* Do the transformation
* Set the parallelism back to the remembered value

But if the initial parallelism is already the default, we should do nothing.
With the current logic we explicitly write the default value to the
configuration, which is itself a programmatic config mutation (the thing we
want to avoid).

See 
org.apache.flink.streaming.runtime.translators.SinkTransformationTranslator.SinkExpander#executionEnvironment:341
 


> Unexpected config mutation in SinkTransformationTranslator 
> -----------------------------------------------------------
>
>                 Key: FLINK-30278
>                 URL: https://issues.apache.org/jira/browse/FLINK-30278
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Configuration
>    Affects Versions: 1.16.0, 1.17.0
>            Reporter: Anton Kalashnikov
>            Priority: Major
>
> If we forbid changing the configuration programmatically
> (`execution.program-config.enabled` set to `false`) and try to use
> `FileSink`, the following exception occurs:
> {noformat}
>   org.apache.flink.client.program.ProgramInvocationException: The main method 
> caused an error: Not allowed configuration change(s) were detected:
>  - Configuration parallelism.default:1 not allowed in the configuration 
> object ExecutionConfig.
>    at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:364)
>    at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:225)
>    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:98)
>    at 
> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:319)
>    at 
> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$2(ApplicationDispatcherBootstrap.java:262)
>    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown 
> Source)
>    at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
>    at 
> org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:171)
>    at 
> org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
>    at 
> org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$withContextClassLoader$0(ClassLoadingUtils.java:41)
>    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49)
>    at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48)
>    at java.base/java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
>    at 
> java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown 
> Source)
>    at java.base/java.util.concurrent.ForkJoinPool.scan(Unknown Source)
>    at java.base/java.util.concurrent.ForkJoinPool.runWorker(Unknown Source)
>    at java.base/java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source)
> Caused by: org.apache.flink.client.program.MutatedConfigurationException: Not 
> allowed configuration change(s) were detected:
>  - Configuration parallelism.default:1 not allowed in the configuration 
> object ExecutionConfig.
>    at 
> org.apache.flink.client.program.StreamContextEnvironment.checkNotAllowedConfigurations(StreamContextEnvironment.java:235)
>    at 
> org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:175)
>    at 
> org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:115)
>    at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2049)
>    at 
> org.apache.flink.streaming.examples.wordcount.WordCount.main(WordCount.java:81)
>    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
>    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown 
> Source)
>    at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown 
> Source)
>    at java.base/java.lang.reflect.Method.invoke(Unknown Source)
>    at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:347)
>    ... 16 more
> {noformat}
> It happens because `SinkTransformationTranslator` contains the following
> logic:
> * Remember the current parallelism
> * Set the parallelism to the default
> * Do the transformation
> * Set the parallelism back to the remembered value
> But if the initial parallelism is already the default, we should do nothing.
> With the current logic we explicitly write the default value to the
> configuration, which is itself a programmatic config mutation (the thing we
> want to avoid).
> See 
> org.apache.flink.streaming.runtime.translators.SinkTransformationTranslator.SinkExpander#executionEnvironment:341
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
