Hi Gyula,

Thanks for getting back. Could you share how to submit a job to the
Flink Kubernetes operator in JSON format?

We use the Java Fabric8 Kubernetes client, which serializes Java
FlinkDeployment objects to CustomResource YAML (see the code snippet
below). Since `#` is a special character denoting comments in YAML, it
should be escaped properly when the YAML file is generated. We are also
reading through the code to see if we can identify the place for the fix.

import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.FlinkDeploymentList;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext;

// The xxx placeholders stand for values elided from this mail.
FlinkDeployment deployment = xxxx;
CustomResourceDefinitionContext context = xxx;
DefaultKubernetesClient client = xxx;

// Create the FlinkDeployment custom resource, or replace it if it exists.
client
    .customResources(
        context, FlinkDeployment.class, FlinkDeploymentList.class)
    .inNamespace(xxx)
    .withName(deploymentName)
    .createOrReplace(deployment);
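
To illustrate the truncation described above, here is a minimal sketch. It
assumes the configuration is read by a line-based parser that treats
everything after `#` as a comment, which is what the truncated value in the
error message suggests. The class and method names are hypothetical and the
code is not taken from the Flink or operator sources.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class HashTruncationSketch {

    // Hypothetical line-based parser (an assumption, not Flink's code):
    // drops everything after the first '#', then splits the remainder
    // into key and value at the first ": ".
    static Map<String, String> parseNaively(String text) {
        Map<String, String> config = new LinkedHashMap<>();
        for (String line : text.split("\n")) {
            String withoutComment = line.split("#", 2)[0].trim();
            if (withoutComment.isEmpty()) {
                continue; // blank line or comment-only line
            }
            String[] keyValue = withoutComment.split(": ", 2);
            if (keyValue.length == 2) {
                config.put(keyValue[0].trim(), keyValue[1].trim());
            }
        }
        return config;
    }

    public static void main(String[] args) {
        String line = "$internal.application.program-args: \"xyz#abc\"";
        // The value loses everything from '#' onward, leaving an
        // opening quote that is never closed.
        System.out.println(parseNaively(line).get("$internal.application.program-args"));
    }
}
```

With such a parser the quoted value is cut before its closing quote, which
would be consistent with the "Quoting was not closed properly" error.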





On Tue, Nov 8, 2022 at 2:41 AM Yang Wang <danrtsey...@gmail.com> wrote:

> This is a known limitation of the current Flink options parser. Refer to
> FLINK-15358[1] for more information.
>
> [1]. https://issues.apache.org/jira/browse/FLINK-15358
>
> Best,
> Yang
>
> On Tue, Nov 8, 2022 at 14:41, Gyula Fóra <gyula.f...@gmail.com> wrote:
>
>> It is also possible that this is a problem in the Flink native Kubernetes
>> integration. We have to check where exactly it goes wrong before we try to
>> fix it.
>>
>> We simply set the args into a Flink config and pass it to the native
>> deployment logic in the operator.
>>
>> Gyula
>>
>> On Tue, 8 Nov 2022 at 07:37, Gyula Fóra <gyula.f...@gmail.com> wrote:
>>
>>> Hi!
>>>
>>> How do you submit your yaml?
>>>
>>> It’s possible that this is not an operator problem. Did you try submitting
>>> the deployment in JSON format instead?
>>>
>>> If it still doesn't work please open a JIRA ticket with the details to
>>> reproduce and what you have tried :)
>>>
>>> Cheers
>>> Gyula
>>>
>>> On Tue, 8 Nov 2022 at 04:56, liuxiangcao <xiangcaohe...@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> We have a job that contains `#` as part of mainArgs and it used to work
>>>> on Ververica. Now we are switching to our own control plane to deploy to
>>>> flink-operator, and the job started to fail because the main args string
>>>> gets truncated at the `#` character when passed to the Flink application.
>>>> I believe this is due to characters after `#` being interpreted as
>>>> comments in the YAML file. To support `#` in the mainArgs, the Flink
>>>> operator needs to escape `#` when generating the Kubernetes YAML file.
>>>>
>>>> Assume the mainArgs contain '\"xyz#abc\"'.
>>>>
>>>> Here is the stack-trace:
>>>> {"exception":{"exception_class":"java.lang.IllegalArgumentException","exception_message":"Could
>>>> not parse value '\"xyz' *(Note: truncated by #)*
>>>>
>>>> for key  '$internal.application.program-args'.\n\tat
>>>> org.apache.flink.configuration.Configuration.getOptional(Configuration.java:720)\n\tat
>>>> org.apache.flink.configuration.Configuration.get(Configuration.java:704)\n\tat
>>>>  
>>>> org.apache.flink.configuration.ConfigUtils.decodeListFromConfig(ConfigUtils.java:123)\n\tat
>>>>  
>>>> org.apache.flink.client.deployment.application.ApplicationConfiguration.fromConfiguration(ApplicationConfiguration.java:80)\n\tat
>>>>  
>>>> org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint.getPackagedProgram(KubernetesApplicationClusterEntrypoint.java:93)\n\tat
>>>>  
>>>> org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint.main(KubernetesApplicationClusterEntrypoint.java:70)\nCaused
>>>>  by: *java.lang.IllegalArgumentException: Could not split string. Quoting 
>>>> was not closed properly*.\n\tat 
>>>> org.apache.flink.configuration.StructuredOptionsSplitter.consumeInQuotes(StructuredOptionsSplitter.java:163)\n\tat
>>>>  
>>>> org.apache.flink.configuration.StructuredOptionsSplitter.tokenize(StructuredOptionsSplitter.java:129)\n\tat
>>>>  
>>>> org.apache.flink.configuration.StructuredOptionsSplitter.splitEscaped(StructuredOptionsSplitter.java:52)\n\tat
>>>>  
>>>> org.apache.flink.configuration.ConfigurationUtils.convertToList(ConfigurationUtils.java:324)\n\tat
>>>>  
>>>> org.apache.flink.configuration.Configuration.lambda$getOptional$2(Configuration.java:714)\n\tat
>>>>  java.base/java.util.Optional.map(Optional.java:265)\n\tat 
>>>> org.apache.flink.configuration.Configuration.getOptional(Configuration.java:714)\n\t...
>>>>  5 more\n"},"@version":1,"source_host":"xxxxxx","message":"Could not 
>>>> create application 
>>>> program.","thread_name":"main","@timestamp":"2022-11-07T18:40:03.369+00:00","level":"ERROR","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint"}
>>>>
>>>>
>>>> Can someone take a look and help fix this issue? Or I can help fix it
>>>> if someone can point me in the right direction.
>>>>
>>>> --
>>>> Best Wishes & Regards
>>>> Shawn Xiangcao Liu
>>>>
>>>

-- 
Best Wishes & Regards
Shawn Xiangcao Liu
