[ https://issues.apache.org/jira/browse/FLINK-40055?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Gerk Elznik updated FLINK-40055:
--------------------------------
    Description: 
When the operator's own configuration is loaded from a standard-mode 
{{config.yaml}} (rather than legacy {{flink-conf.yaml}}), the config files
it generates for *managed Flink 1.x deployments* contain standard-YAML-typed 
values that Flink 1.x's legacy parser cannot read. An application-mode 
deployment then crashloops at JobManager startup:
{code:java}
ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Could not create application program.
java.lang.RuntimeException: java.net.URISyntaxException: Illegal character in scheme name at index 0: ['local:///opt/flink/usrlib/myjob.jar']
  at org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint.fetchArtifacts(...) [flink-dist-1.20.4.jar]
{code}
h3. Reproduction (A/B; the only variable is the operator's own config file format)

Same operator image (current main), same kind cluster, same {{v1_20}}
application-mode FlinkDeployment ({{local://}} StateMachineExample):
|| ||operator reads {{config.yaml}}||operator reads {{flink-conf.yaml}} (today's chart default)||
|operator config mode|standard YAML|legacy|
|value written into the deployment's {{flink-conf.yaml}}|{{pipeline.jars: ['local:///…/myjob.jar']}}|{{pipeline.jars: local:///…/myjob.jar}}|
|{{v1_20}} JobManager|URISyntaxException → CrashLoopBackOff|READY / RUNNING|

Today this is already reachable, e.g. via a conf-override dir or a custom mount
that places a {{config.yaml}} in the operator's conf directory
({{GlobalConfiguration.loadConfiguration}} prefers it). FLINK-39791 (PR
#1126) would make it the default path by mounting the operator config as
{{config.yaml}}; that change is how this bug was found (see the analysis on
[https://github.com/apache/flink-kubernetes-operator/pull/1126]).
h3. Root cause

The YAML dialect used for serialization is a *process-global* property in
flink-core: when the operator loads its own configuration from
{{config.yaml}}, {{GlobalConfiguration.isStandardYaml()}} becomes true
JVM-wide, and from then on every no-arg {{new Configuration()}}, including
{{Configuration#clone()}} (implemented as {{new Configuration()}} +
{{addAll}}), creates a standard-dialect config. The managed deployment's
effective config therefore speaks the *operator's* dialect instead of the
*target deployment's*:
 # The deploy config handed to the reconciler
({{FlinkConfigManager#getConfig}} → {{cache.get(key).clone()}}) carries
{{standardYaml=true}} via the process-global flag; the cached original also
inherits it through {{new Configuration(defaultConfig)}} in
{{getDefaultConfig}}.
 # {{AbstractFlinkService#removeOperatorConfigs}} round-trips that config
through {{config.toMap()}} + {{setString}}. {{Configuration#toMap}}
serializes every value with {{ConfigurationUtils.convertToString(value, standardYaml)}};
with {{standardYaml=true}}, a {{List}} (e.g. {{PipelineOptions.JARS}}, set as
a typed list by {{FlinkConfigBuilder}}) becomes the string {{"['local:///…']"}}.
 # {{FlinkConfMountDecorator#getClusterSideConfData}} then correctly builds
{{new Configuration(useStandardYamlConfig())}} (false for v1_x, so the file is
named {{flink-conf.yaml}} and written legacy-style), but the damage is done:
the bracketed *string* is written verbatim. The comment there ("would always be
false currently", from FLINK-37236) documents exactly the assumption that no
longer holds once the operator itself runs on standard config.
 # Flink 1.x's legacy parser reads {{['local://…']}} as one raw string, which
then fails URI parsing with {{URISyntaxException}}.
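Steps 1 to 3 can be modeled in a few lines of plain Java. This is a toy model, not flink-core code: {{ToyConfig}}, {{globalStandardYaml}}, {{convertToString}}, {{legacyLine}} and {{writtenJarsLine}} are hypothetical stand-ins for {{Configuration}}, the flag behind {{GlobalConfiguration.isStandardYaml()}}, {{ConfigurationUtils.convertToString}} and the legacy {{flink-conf.yaml}} writer. The list renderings (flow sequence vs. a {{;}}-joined string) are simplified assumptions, and the removal of operator keys is omitted.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;

// Toy model of the dialect leak; NOT flink-core code. All names are hypothetical
// stand-ins and the list renderings are simplified.
final class DialectLeakSketch {
    // Stand-in for the JVM-wide flag that loading config.yaml flips to true.
    static boolean globalStandardYaml = false;

    static final class ToyConfig {
        final boolean standardYaml;
        final Map<String, Object> data = new LinkedHashMap<>();

        // Like a no-arg Configuration: the dialect comes from the global flag.
        ToyConfig() { this(globalStandardYaml); }

        ToyConfig(boolean standardYaml) { this.standardYaml = standardYaml; }

        // Like clone(): no-arg constructor + addAll, so the global dialect wins.
        ToyConfig copy() {
            ToyConfig c = new ToyConfig();
            c.data.putAll(data);
            return c;
        }

        // Like toMap(): every value is stringified in this config's own dialect.
        Map<String, String> toMap() {
            Map<String, String> out = new LinkedHashMap<>();
            data.forEach((k, v) -> out.put(k, convertToString(v, standardYaml)));
            return out;
        }
    }

    // Simplified: standard YAML -> ['a', 'b'], legacy -> a;b. Strings pass through as-is.
    static String convertToString(Object value, boolean standardYaml) {
        if (value instanceof List<?>) {
            StringJoiner joiner = standardYaml ? new StringJoiner(", ", "[", "]") : new StringJoiner(";");
            for (Object e : (List<?>) value) {
                joiner.add(standardYaml ? "'" + e + "'" : String.valueOf(e));
            }
            return joiner.toString();
        }
        return String.valueOf(value);
    }

    // Legacy writer for a v1_x target: renders whatever value it holds, legacy-style.
    static String legacyLine(ToyConfig conf, String key) {
        return key + ": " + convertToString(conf.data.get(key), false);
    }

    static String writtenJarsLine(List<String> jars) {
        ToyConfig cached = new ToyConfig();
        cached.data.put("pipeline.jars", jars);      // typed list, as the builder sets it
        ToyConfig deploy = cached.copy();             // step 1: clone() keeps the global dialect
        ToyConfig stripped = new ToyConfig();
        deploy.toMap().forEach(stripped.data::put);   // step 2: toMap() + setString round-trip
        ToyConfig clusterSide = new ToyConfig(false); // step 3: legacy config for the v1_x file
        clusterSide.data.putAll(stripped.data);
        return legacyLine(clusterSide, "pipeline.jars");
    }

    public static void main(String[] args) {
        List<String> jars = List.of("local:///opt/flink/usrlib/myjob.jar");
        globalStandardYaml = false; // operator reads flink-conf.yaml
        System.out.println(writtenJarsLine(jars));
        globalStandardYaml = true;  // operator reads config.yaml
        System.out.println(writtenJarsLine(jars));
    }
}
```

In this model, the first line printed is {{pipeline.jars: local:///opt/flink/usrlib/myjob.jar}} and the second is {{pipeline.jars: ['local:///opt/flink/usrlib/myjob.jar']}}, mirroring the two columns of the reproduction table: step 3 picks the right dialect, but by then the list has already become a bracketed string.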

The chain was reproduced in isolation against flink-core 1.20.4: after
{{GlobalConfiguration.loadConfiguration}} of a directory containing
{{config.yaml}}, the clone → {{toMap}} → legacy-write sequence emits
{{pipeline.jars: ['local:///…']}}, while a dialect-aware copy emits the
legacy-parseable {{pipeline.jars: local:///…}}.
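The last link of the chain can be checked with the JDK alone. This sketch does not model Flink 1.x's config parser; it assumes, as the root-cause list describes, that the whole bracketed value reaches {{java.net.URI}} as one raw string. {{BracketedJarUriSketch}} and {{tryParse}} are illustrative names only.

```java
import java.net.URI;
import java.net.URISyntaxException;

// Only the final step: hand each candidate jar string to java.net.URI.
final class BracketedJarUriSketch {
    static String tryParse(String jar) {
        try {
            URI uri = new URI(jar);
            return "ok scheme=" + uri.getScheme() + " path=" + uri.getPath();
        } catch (URISyntaxException e) {
            // '[' is not a legal first character of a URI scheme.
            return "URISyntaxException: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        // Legacy-dialect value: parses, scheme "local".
        System.out.println(tryParse("local:///opt/flink/usrlib/myjob.jar"));
        // Standard-dialect value written verbatim into flink-conf.yaml.
        System.out.println(tryParse("['local:///opt/flink/usrlib/myjob.jar']"));
    }
}
```

The second call yields {{Illegal character in scheme name at index 0: ['local:///opt/flink/usrlib/myjob.jar']}}, the same message as in the JobManager log above.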

This affects any list-typed option (not just {{pipeline.jars}}) in any
{{v1_xx}} deployment. Session-job submission has the same dialect leak:
{{AbstractFlinkService#submitJobToSessionCluster}} sends {{conf.toMap()}}
(strings in the operator's dialect) in the {{JarRunRequestBody}} to v1_17+
session clusters.
h3. Proposed fix

Fix at the *serialization boundaries* and leave in-process parse semantics
untouched. Stamping the target dialect onto the long-lived resource configs was
prototyped and rejected: {{standardYaml}} also governs how raw string values
are *parsed*, so stamping it changes read behavior for every consumer, and any
downstream {{new Configuration()}}/{{clone()}} silently resets it anyway. The
fix therefore changes three places:
 * {{AbstractFlinkService#removeOperatorConfigs}}: copy via the copy
constructor and call {{removeKey}} directly instead of the
{{toMap()}}/{{setString}} round-trip, so typed values (e.g. the
{{pipeline.jars}} list) survive untouched to the boundaries that already render
per target version ({{FlinkConfMountDecorator}});
 * {{AbstractFlinkService#runJar}}: serialize the session-job REST config map
in the receiving cluster's dialect (a throwaway dialect-stamped copy just for
the {{JarRunRequestBody}});
 * the Flink 2.0 dialect threshold becomes one shared authority:
{{FlinkConfMountDecorator#useStandardYamlConfig(FlinkVersion)}} (public static;
the instance method delegates to it).
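The ordering behind these changes can be sketched in plain Java under a simplified model: configs are plain {{Map<String, Object>}} with typed values, the target version is reduced to a major number, and {{removeOperatorConfigs}}, {{renderFor}}, {{useStandardYamlConfig(int)}} and the key {{kubernetes.operator.example.key}} are hypothetical stand-ins, not the operator's actual signatures. Values are copied and pruned while still typed, then stringified exactly once in the *target's* dialect; the same {{renderFor}} step would serve both a generated config file and a session-job REST map.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.StringJoiner;

// Toy sketch of the proposed fix; NOT the actual patch. Names, the integer version
// and the list renderings are simplified, hypothetical stand-ins.
final class BoundaryRenderingSketch {
    // Single authority for the dialect: standard YAML from Flink 2.0 on.
    static boolean useStandardYamlConfig(int flinkMajorVersion) {
        return flinkMajorVersion >= 2;
    }

    // Simplified dialect-aware rendering: standard -> ['a', 'b'], legacy -> a;b.
    static String convertToString(Object value, boolean standardYaml) {
        if (value instanceof List<?>) {
            StringJoiner joiner = standardYaml ? new StringJoiner(", ", "[", "]") : new StringJoiner(";");
            for (Object e : (List<?>) value) {
                joiner.add(standardYaml ? "'" + e + "'" : String.valueOf(e));
            }
            return joiner.toString();
        }
        return String.valueOf(value);
    }

    // Copy, then drop operator keys; no toMap()/setString, so values stay typed.
    static Map<String, Object> removeOperatorConfigs(Map<String, Object> conf, Set<String> operatorKeys) {
        Map<String, Object> copy = new LinkedHashMap<>(conf);
        copy.keySet().removeAll(operatorKeys);
        return copy;
    }

    // The boundary: stringify once, in the target cluster's dialect.
    static Map<String, String> renderFor(Map<String, Object> conf, int targetMajorVersion) {
        boolean standardYaml = useStandardYamlConfig(targetMajorVersion);
        Map<String, String> out = new LinkedHashMap<>();
        conf.forEach((k, v) -> out.put(k, convertToString(v, standardYaml)));
        return out;
    }

    public static void main(String[] args) {
        Map<String, Object> conf = new LinkedHashMap<>();
        conf.put("pipeline.jars", List.of("local:///opt/flink/usrlib/myjob.jar"));
        conf.put("kubernetes.operator.example.key", "operator-only"); // hypothetical key
        Map<String, Object> deploy = removeOperatorConfigs(conf, Set.of("kubernetes.operator.example.key"));
        System.out.println(renderFor(deploy, 1)); // v1_x target: legacy rendering
        System.out.println(renderFor(deploy, 2)); // v2_x target: standard rendering
    }
}
```

Because nothing is stringified before {{renderFor}}, the operator's own dialect never reaches the output in this model, regardless of which file the operator itself was loaded from.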

Verified with a from-source operator build across all four operator-format ×
deployment-version combinations: the {{v1_20}} application deployment that
crashloops without the fix reaches RUNNING with it (operator on
{{config.yaml}}), Flink 2.x deployments work under both operator formats,
and the generated deployment config is identical regardless of the operator's
own format (only per-run identity values differ). Also exercised beyond CI with
an in-place operator config-format migration (Helm chart flipped
legacy↔standard under running v1 and v2 jobs, JobManager pods killed after
each flip to force a config re-read): no job disruption, no pod churn, and
byte-identical generated configs. A unit test ({{AbstractFlinkServiceTest}})
is included; it pins the process-global standard-yaml flag and asserts both
typed-value survival and per-version serialization.

This is a prerequisite for FLINK-39791 (chart mounting the operator config as
{{config.yaml}}).
h3. Issue links to add after creation
 * blocks: FLINK-39791
 * relates to: FLINK-37236




> Operator serializes managed-deployment config in its own YAML dialect, 
> breaking Flink 1.x deployments when the operator itself runs on standard 
> config.yaml
> -----------------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-40055
>                 URL: https://issues.apache.org/jira/browse/FLINK-40055
>             Project: Flink
>          Issue Type: Bug
>          Components: Kubernetes Operator
>    Affects Versions: kubernetes-operator-1.15.0
>            Reporter: Gerk Elznik
>            Priority: Major
>              Labels: operator
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
