[
https://issues.apache.org/jira/browse/FLINK-40055?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Gerk Elznik updated FLINK-40055:
--------------------------------
Description:
When the operator's own configuration is loaded from a standard-mode
{{config.yaml}} (rather than the legacy {{flink-conf.yaml}}), the config files
it generates for *managed Flink 1.x deployments* contain standard-YAML-typed
values that Flink 1.x's legacy parser cannot read. An application-mode
deployment then crashloops at JobManager startup:
{code:java}
ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Could not create application program.
java.lang.RuntimeException: java.net.URISyntaxException: Illegal character in scheme name at index 0: ['local:///opt/flink/usrlib/myjob.jar']
    at org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint.fetchArtifacts(...) [flink-dist-1.20.4.jar]
{code}
h3. Reproduction (A/B, only variable = the operator's own config file format)
Same operator image (current main), same kind cluster, same {{v1_20}}
application-mode FlinkDeployment ({{local://}} StateMachineExample):
|| ||operator reads {{config.yaml}}||operator reads {{flink-conf.yaml}} (today's chart default)||
|operator config mode|standard YAML|legacy|
|value written into the deployment's {{flink-conf.yaml}}|{{pipeline.jars: ['local:///…/myjob.jar']}}|{{pipeline.jars: local:///…/myjob.jar}}|
|{{v1_20}} JobManager|URISyntaxException → CrashLoopBackOff|READY / RUNNING|
Today this is reachable, for example, via a conf-override dir or a custom mount
that places a {{config.yaml}} in the operator's conf directory
({{GlobalConfiguration.loadConfiguration}} prefers it). FLINK-39791 (PR #1126)
would make it the default path by mounting the operator config as
{{config.yaml}}, which is how the problem was found; see the analysis on
[https://github.com/apache/flink-kubernetes-operator/pull/1126].
h3. Root cause
The YAML dialect used for serialization is a *process-global* property in
flink-core: when the operator loads its own configuration from
{{config.yaml}}, {{GlobalConfiguration.isStandardYaml()}} becomes true
JVM-wide, and from then on every no-arg {{new Configuration()}} creates a
standard-dialect config. That includes {{Configuration#clone()}}, which is
implemented as {{new Configuration() + addAll}}. The managed deployment's
effective config therefore speaks the *operator's* dialect instead of the
*target deployment's*:
# The deploy config handed to the reconciler
({{FlinkConfigManager#getConfig}} → {{cache.get(key).clone()}}) carries
{{standardYaml=true}} (via the process-global flag; the cached original also
inherits it through {{new Configuration(defaultConfig)}} in
{{getDefaultConfig}}).
# {{AbstractFlinkService#removeOperatorConfigs}} round-trips that config
through {{config.toMap()}} + {{setString}}. {{Configuration#toMap}}
serializes every value with {{ConfigurationUtils.convertToString(value,
standardYaml)}}; with {{standardYaml=true}} a {{List}} (e.g.
{{PipelineOptions.JARS}}, set as a typed list by {{FlinkConfigBuilder}})
becomes the string {{"['local:///…']"}}.
# {{FlinkConfMountDecorator#getClusterSideConfData}} then correctly builds
{{new Configuration(useStandardYamlConfig())}}, which is false for v1_x, so the
file is named {{flink-conf.yaml}} and written legacy-style. But the damage is
already done: the bracketed *string* is written verbatim. The comment there
("would always be false currently", from FLINK-37236) documents exactly the
assumption that no longer holds once the operator itself runs on standard
config.
# Flink 1.x's legacy parser reads {{['local://…']}} as a raw string →
{{URISyntaxException}}.
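To make step 1 concrete, here is a minimal toy model of a process-global dialect flag. It is not flink-core code: {{GlobalDialectSketch}}, {{Conf}} and {{copy()}} are hypothetical stand-ins for {{GlobalConfiguration}}, {{Configuration}} and {{Configuration#clone()}}, and only the "no-arg constructor reads the global flag" behavior described above is modeled.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of a process-global dialect flag; hypothetical names, not flink-core code. */
final class GlobalDialectSketch {

    /** Stand-in for the JVM-wide flag that flips once the operator loads config.yaml. */
    static volatile boolean globalStandardYaml = false;

    static final class Conf {
        final boolean standardYaml;
        final Map<String, Object> data = new HashMap<>();

        /** No-arg constructor: silently inherits whatever the global flag says. */
        Conf() {
            this(globalStandardYaml);
        }

        /** Explicit-dialect constructor, as used at the render boundary. */
        Conf(boolean standardYaml) {
            this.standardYaml = standardYaml;
        }

        /** Models clone() as "new Conf() + addAll": the source's dialect is not carried over. */
        Conf copy() {
            Conf c = new Conf();
            c.data.putAll(data);
            return c;
        }
    }

    public static void main(String[] args) {
        globalStandardYaml = true;       // operator loaded its own config.yaml
        Conf legacy = new Conf(false);   // config meant for a v1_x target
        System.out.println(legacy.copy().standardYaml); // prints true: the copy follows the global flag
    }
}
```

In this model the explicitly legacy config comes back as standard-dialect after a single copy, which mirrors why the dialect of the cached deploy config follows the operator rather than the target.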
The chain was reproduced in isolation against flink-core 1.20.4: after
{{GlobalConfiguration.loadConfiguration}} of a directory containing
{{config.yaml}}, the clone→{{toMap}}→legacy-write sequence emits
{{pipeline.jars: ['local:///…']}}, while a dialect-aware copy emits the
legacy-parseable {{pipeline.jars: local:///…}}.
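The effect of the two renderings can be sketched without Flink on the classpath. The snippet below is an illustration under stated assumptions, not the isolated reproduction itself: {{DialectLeakSketch}} and its {{convertToString}} are hypothetical stand-ins for {{ConfigurationUtils.convertToString(value, standardYaml)}}, and the quoting rule (always single-quote list elements in standard mode) is a simplification. Only {{java.net.URI}} is real, and it rejects the bracketed string with the same reason seen in the JobManager log.

```java
import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
import java.util.stream.Collectors;

/** Toy model of dialect-dependent list rendering; hypothetical names, not flink-core code. */
final class DialectLeakSketch {

    /** Simplified stand-in for the dialect-aware value-to-string conversion. */
    static String convertToString(Object value, boolean standardYaml) {
        if (!(value instanceof List)) {
            return String.valueOf(value);
        }
        List<?> list = (List<?>) value;
        if (standardYaml) {
            // Assumed standard-YAML shape: flow sequence with single-quoted elements.
            return list.stream()
                    .map(v -> "'" + v + "'")
                    .collect(Collectors.joining(", ", "[", "]"));
        }
        // Legacy shape: semicolon-separated elements, no brackets.
        return list.stream().map(v -> String.valueOf(v)).collect(Collectors.joining(";"));
    }

    /** Returns null when the raw value parses as a URI, otherwise the parser's reason. */
    static String uriError(String raw) {
        try {
            new URI(raw);
            return null;
        } catch (URISyntaxException e) {
            return e.getReason();
        }
    }

    public static void main(String[] args) {
        List<String> jars = List.of("local:///opt/flink/usrlib/myjob.jar");
        String leaked = convertToString(jars, true);   // what the string round-trip bakes in
        String legacy = convertToString(jars, false);  // what a v1_x parser expects
        System.out.println("pipeline.jars: " + leaked + " -> " + uriError(leaked));
        System.out.println("pipeline.jars: " + legacy + " -> " + uriError(legacy));
    }
}
```

Once the list has been flattened to a string in the wrong dialect, a later legacy-style writer has no way to tell it apart from an ordinary string value, so it writes it out unchanged.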
This affects any list-typed option (not just {{pipeline.jars}}) for any
{{v1_xx}} deployment. Session-job submission has the same dialect leak:
{{AbstractFlinkService#submitJobToSessionCluster}} sends {{conf.toMap()}}
(operator-dialect strings) in the {{JarRunRequestBody}} to v1_17+ session
clusters.
h3. Proposed fix
Fix at the *serialization boundaries*, leaving in-process parse semantics
untouched. Stamping the target dialect onto the long-lived resource configs was
prototyped and rejected: {{standardYaml}} also governs how raw string values
are *parsed*, so it changes read behavior for every consumer, and any
downstream {{new Configuration()}}/{{clone()}} silently resets it anyway.
The fix has three parts:
* {{AbstractFlinkService#removeOperatorConfigs}}: copy via the copy
constructor and {{removeKey}} directly instead of the
{{toMap()}}/{{setString}} round-trip, so typed values (e.g. the
{{pipeline.jars}} list) survive untouched to the boundaries that already render
per target version ({{FlinkConfMountDecorator}});
* {{AbstractFlinkService#runJar}}: serialize the session-job REST config map
in the receiving cluster's dialect (a throwaway dialect-stamped copy just for
the {{JarRunRequestBody}});
* the Flink-2.0 dialect threshold becomes one shared authority:
{{FlinkConfMountDecorator#useStandardYamlConfig(FlinkVersion)}} (public static;
the instance method delegates).
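The shape of these three changes can be sketched on a plain typed map. This is a simplified model and not the actual patch: {{BoundarySerializationSketch}}, the integer major version, the {{operator.only.key}} entry and the rendering rules are all assumptions made for illustration. What it shows is the ordering: typed values are copied and filtered first, and strings are produced only at the boundary, using the target's dialect.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

/** Toy model of boundary-only serialization; hypothetical names, not operator code. */
final class BoundarySerializationSketch {

    /** Single authority for the dialect threshold (assumed: standard YAML from Flink 2.0 on). */
    static boolean useStandardYaml(int flinkMajorVersion) {
        return flinkMajorVersion >= 2;
    }

    /** Copies typed values and drops operator-only keys, with no string round-trip. */
    static Map<String, Object> removeOperatorConfigs(
            Map<String, Object> conf, Set<String> operatorOnlyKeys) {
        Map<String, Object> copy = new LinkedHashMap<>(conf);
        copy.keySet().removeAll(operatorOnlyKeys);
        return copy;
    }

    /** Simplified dialect-aware rendering of a single value. */
    static String render(Object value, boolean standardYaml) {
        if (!(value instanceof List)) {
            return String.valueOf(value);
        }
        List<?> list = (List<?>) value;
        if (standardYaml) {
            return list.stream()
                    .map(v -> "'" + v + "'")
                    .collect(Collectors.joining(", ", "[", "]"));
        }
        return list.stream().map(v -> String.valueOf(v)).collect(Collectors.joining(";"));
    }

    /** Boundary step: build a throwaway string map in the receiving cluster's dialect. */
    static Map<String, String> toStringMap(Map<String, Object> conf, int flinkMajorVersion) {
        boolean standardYaml = useStandardYaml(flinkMajorVersion);
        Map<String, String> out = new LinkedHashMap<>();
        conf.forEach((key, value) -> out.put(key, render(value, standardYaml)));
        return out;
    }

    public static void main(String[] args) {
        Map<String, Object> conf = new LinkedHashMap<>();
        conf.put("pipeline.jars", List.of("local:///opt/flink/usrlib/myjob.jar"));
        conf.put("operator.only.key", "x"); // hypothetical operator-side option
        Map<String, Object> cleaned = removeOperatorConfigs(conf, Set.of("operator.only.key"));
        System.out.println(toStringMap(cleaned, 1)); // rendered for a 1.x target
        System.out.println(toStringMap(cleaned, 2)); // rendered for a 2.x target
    }
}
```

Because the list stays a list until the last step, the same cleaned config can be rendered for either target version, and the operator's own config format never enters the result.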
Verified with a from-source operator build across all four operator-format ×
deployment-version combinations: the {{v1_20}} application deployment that
crashloops without the fix reaches RUNNING with it (operator on
{{config.yaml}}), Flink 2.x deployments work under both operator formats,
and the generated deployment config is identical regardless of the operator's
own format (only per-run identity values differ). Also exercised beyond CI with
an in-place operator config-format migration (helm chart flip legacy↔standard
under running v1 and v2 jobs, JobManager pods killed after each flip to force a
config re-read): no job disruption, no pod churn, generated configs
byte-identical. Unit test included ({{AbstractFlinkServiceTest}}) that pins
the process-global standard-yaml flag and asserts typed-value survival plus
per-version serialization.
This is a prerequisite for FLINK-39791 (chart mounting the operator config as
{{config.yaml}}).
h3. Issue links to add after creation
* blocks: FLINK-39791
* relates to: FLINK-37236
> Operator serializes managed-deployment config in its own YAML dialect,
> breaking Flink 1.x deployments when the operator itself runs on standard
> config.yaml
> -----------------------------------------------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-40055
> URL: https://issues.apache.org/jira/browse/FLINK-40055
> Project: Flink
> Issue Type: Bug
> Components: Kubernetes Operator
> Affects Versions: kubernetes-operator-1.15.0
> Reporter: Gerk Elznik
> Priority: Major
> Labels: operator
>