Leonard Xu created FLINK-14861:
----------------------------------
Summary: parallelism.default in flink-conf.yaml does not work, which
is a bug introduced by [FLINK-14745]
Key: FLINK-14861
URL: https://issues.apache.org/jira/browse/FLINK-14861
Project: Flink
Issue Type: Bug
Components: Client / Job Submission
Affects Versions: 1.10.0
Reporter: Leonard Xu
Fix For: 1.10.0
I set the parameter "parallelism.default" in flink-conf.yaml, but it no longer
takes effect after I rebased my branch onto master. I debugged it and found
that this is a bug introduced by
[FLINK-14745](https://issues.apache.org/jira/browse/FLINK-14745).
Detail:
{code:java}
// ExecutionConfigAccessor#fromProgramOptions
public static ExecutionConfigAccessor fromProgramOptions(final ProgramOptions options, final List<URL> jobJars) {
    checkNotNull(options);
    checkNotNull(jobJars);

    final Configuration configuration = new Configuration();
    if (options.getParallelism() != ExecutionConfig.PARALLELISM_DEFAULT) {
        configuration.setInteger(CoreOptions.DEFAULT_PARALLELISM, options.getParallelism());
    }

    configuration.setBoolean(DeploymentOptions.ATTACHED, !options.getDetachedMode());
    configuration.setBoolean(DeploymentOptions.SHUTDOWN_IF_ATTACHED, options.isShutdownOnAttachedExit());

    ConfigUtils.encodeCollectionToConfig(configuration, PipelineOptions.CLASSPATHS, options.getClasspaths(), URL::toString);
    ConfigUtils.encodeCollectionToConfig(configuration, PipelineOptions.JARS, jobJars, URL::toString);

    SavepointRestoreSettings.toConfiguration(options.getSavepointRestoreSettings(), configuration);

    return new ExecutionConfigAccessor(configuration);
}
{code}
[1]. executionConfigAccessor.getParallelism() returns 1 rather than -1 when
options.getParallelism() == ExecutionConfig.PARALLELISM_DEFAULT. In that case
nothing is written to the configuration, so getParallelism() falls back to the
default value of CoreOptions.DEFAULT_PARALLELISM.
{code:java}
// ExecutionConfigAccessor.java
public int getParallelism() {
    return configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM);
}

// Configuration.java
public int getInteger(ConfigOption<Integer> configOption) {
    return getOptional(configOption)
        .orElseGet(configOption::defaultValue);
}
{code}
executionConfigAccessor.getParallelism() also returns 1 when
options.getParallelism() == 1, so the two cases cannot be told apart.
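The fallback described above can be reproduced without Flink. The following is a minimal stand-alone sketch, not the actual Flink code: the class name, the map-based configuration and the constant OPTION_DEFAULT_VALUE are hypothetical stand-ins, and the value 1 is taken from the behaviour reported in this issue.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Stand-alone model of the lookup path; it does not use any Flink classes. */
final class ParallelismFallbackSketch {

    // Mirrors ExecutionConfig.PARALLELISM_DEFAULT, the "not set" sentinel.
    static final int PARALLELISM_DEFAULT = -1;

    // Assumed default value of the parallelism option, per the behaviour reported above.
    static final int OPTION_DEFAULT_VALUE = 1;

    static final String KEY = "parallelism.default";

    /** Models fromProgramOptions: the key is only written when the CLI value is not the sentinel. */
    static Map<String, Integer> fromProgramOptions(int cliParallelism) {
        Map<String, Integer> configuration = new HashMap<>();
        if (cliParallelism != PARALLELISM_DEFAULT) {
            configuration.put(KEY, cliParallelism);
        }
        return configuration;
    }

    /** Models getParallelism: a missing key falls back to the option's default value. */
    static int getParallelism(Map<String, Integer> configuration) {
        return Optional.ofNullable(configuration.get(KEY)).orElse(OPTION_DEFAULT_VALUE);
    }

    public static void main(String[] args) {
        int unset = getParallelism(fromProgramOptions(PARALLELISM_DEFAULT));
        int explicitOne = getParallelism(fromProgramOptions(1));
        // Both lookups yield the same value, so "not set" is indistinguishable from "set to 1".
        System.out.println("unset -> " + unset + ", explicit 1 -> " + explicitOne);
    }
}
```

In this model the sentinel -1 never comes back out of the lookup, which is why any later comparison against -1 cannot succeed.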
So the defaultParallelism branch in the following code in CliFrontend.java is
never reached when the user does not set the parallelism on the flink run
command line.
{code:java}
// CliFrontend.java
int parallelism = executionParameters.getParallelism() == -1
    ? defaultParallelism
    : executionParameters.getParallelism();
{code}
[2]. In another place, I think we should keep the lines that were deleted in
FLINK-14745 (the if block below).
{code:java}
int userParallelism = executionParameters.getParallelism();
LOG.debug("User parallelism is set to {}", userParallelism);
// The following check was deleted in FLINK-14745 and should be kept:
if (ExecutionConfig.PARALLELISM_DEFAULT == userParallelism) {
    userParallelism = defaultParallelism;
}
executeProgram(program, client, userParallelism, executionParameters.getDetachedMode());
{code}
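To illustrate what the check is meant to do, here is a small stand-alone sketch. The helper resolveParallelism, the class name and the example value 4 for parallelism.default are hypothetical and only serve the illustration; this is not a proposed patch.

```java
/** Stand-alone sketch of the parallelism fallback check; no Flink classes are used. */
final class UserParallelismSketch {

    // Mirrors ExecutionConfig.PARALLELISM_DEFAULT, the "not set" sentinel.
    static final int PARALLELISM_DEFAULT = -1;

    /** Hypothetical helper: falls back to the flink-conf.yaml value when the user set nothing. */
    static int resolveParallelism(int userParallelism, int defaultParallelism) {
        return userParallelism == PARALLELISM_DEFAULT ? defaultParallelism : userParallelism;
    }

    public static void main(String[] args) {
        int defaultParallelism = 4; // example value for parallelism.default in flink-conf.yaml

        // Sentinel preserved: the configured default is used.
        System.out.println(resolveParallelism(PARALLELISM_DEFAULT, defaultParallelism));
        // Explicit user value: it takes precedence over the configured default.
        System.out.println(resolveParallelism(8, defaultParallelism));
        // What happens today: the accessor reports 1 for "not set", so the default is ignored.
        System.out.println(resolveParallelism(1, defaultParallelism));
    }
}
```

The check is only effective if the value handed to it is still -1 when the user did not set a parallelism, so it has to be combined with a fix for the fallback described in [1].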