Anton Kalashnikov created FLINK-30278:
-----------------------------------------
Summary: Unexpected config mutation in SinkTransformationTranslator
Key: FLINK-30278
URL: https://issues.apache.org/jira/browse/FLINK-30278
Project: Flink
Issue Type: Bug
Components: Runtime / Configuration
Affects Versions: 1.16.0, 1.17.0
Reporter: Anton Kalashnikov
If we forbid programmatic configuration changes
(via `execution.program-config.enabled`) and try to use `FileSink`,
the following exception occurs:
<noformat>
org.apache.flink.client.program.ProgramInvocationException: The main method
caused an error: Not allowed configuration change(s) were detected:
- Configuration parallelism.default:1 not allowed in the configuration object
ExecutionConfig.
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:364)
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:225)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:98)
at
org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:319)
at
org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$2(ApplicationDispatcherBootstrap.java:262)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown
Source)
at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
at
org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:171)
at
org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
at
org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$withContextClassLoader$0(ClassLoadingUtils.java:41)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48)
at java.base/java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
at
java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown
Source)
at java.base/java.util.concurrent.ForkJoinPool.scan(Unknown Source)
at java.base/java.util.concurrent.ForkJoinPool.runWorker(Unknown Source)
at java.base/java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source)
Caused by: org.apache.flink.client.program.MutatedConfigurationException: Not
allowed configuration change(s) were detected:
- Configuration parallelism.default:1 not allowed in the configuration object
ExecutionConfig.
at
org.apache.flink.client.program.StreamContextEnvironment.checkNotAllowedConfigurations(StreamContextEnvironment.java:235)
at
org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:175)
at
org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:115)
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2049)
at
org.apache.flink.streaming.examples.wordcount.WordCount.main(WordCount.java:81)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown
Source)
at
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown
Source)
at java.base/java.lang.reflect.Method.invoke(Unknown Source)
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:347)
... 16 more
<noformat>
This happens because `SinkTransformationTranslator` contains the following
logic:
* Remember the current parallelism
* Set parallelism to default
* Do transformation
* Set parallelism to remembered one
But if the initial parallelism is already the default, we should do nothing.
With the current logic, however, we explicitly write the default value into
the configuration, which is itself a programmatic config mutation (the very
thing we want to avoid).
See
org.apache.flink.streaming.runtime.translators.SinkTransformationTranslator.SinkExpander#executionEnvironment:341
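
The save/reset/restore pattern described above, and one possible guard against the unwanted write, can be sketched with a toy model. This is not Flink code: `ToyConfig`, `translateNaive`, `translateGuarded`, and the fallback value of 1 are hypothetical names and assumptions used only for illustration. The real fix may look different.

```java
import java.util.HashMap;
import java.util.Map;

public class ParallelismRestoreSketch {
    static final String KEY = "parallelism.default";
    // Assumed fallback used when nothing was set explicitly.
    static final int FALLBACK = 1;

    /** Hypothetical stand-in for a config object that records explicit writes. */
    static final class ToyConfig {
        private final Map<String, Integer> explicit = new HashMap<>();

        int getParallelism() { return explicit.getOrDefault(KEY, FALLBACK); }
        void setParallelism(int p) { explicit.put(KEY, p); }
        void resetParallelism() { explicit.remove(KEY); }
        boolean isExplicitlySet() { return explicit.containsKey(KEY); }
    }

    /** Mirrors the four steps from the report: the restore always writes a value. */
    static void translateNaive(ToyConfig config) {
        int remembered = config.getParallelism(); // remember the current parallelism
        config.resetParallelism();                // set parallelism to default
        // ... the transformation would run here ...
        config.setParallelism(remembered);        // restore: writes even if nothing was set
    }

    /** Guarded variant: restores only a value that was explicitly set before. */
    static void translateGuarded(ToyConfig config) {
        boolean wasExplicit = config.isExplicitlySet();
        int remembered = config.getParallelism();
        config.resetParallelism();
        // ... the transformation would run here ...
        if (wasExplicit) {
            config.setParallelism(remembered);
        }
    }

    public static void main(String[] args) {
        ToyConfig a = new ToyConfig();
        translateNaive(a);
        System.out.println("naive mutated config: " + a.isExplicitlySet());   // true

        ToyConfig b = new ToyConfig();
        translateGuarded(b);
        System.out.println("guarded mutated config: " + b.isExplicitlySet()); // false
    }
}
```

In the naive variant an untouched config ends up with an explicit `parallelism.default` entry, which is the kind of change a mutation check would flag. The guarded variant leaves the config untouched unless the user had already set a value.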