Re: Move savepoint to another s3 bucket

2022-03-09 Thread Lukáš Drbal
Hi Dawid,

I just tried the same steps on Flink built from the git branch release-1.13
and everything works as expected!

Thank you all!

L.

On Wed, Mar 9, 2022 at 8:49 AM Dawid Wysakowicz 
wrote:

> Hi Lukas,
>
> I am afraid you're hitting this bug:
> https://issues.apache.org/jira/browse/FLINK-25952
>
> Best,
>
> Dawid

Move savepoint to another s3 bucket

2022-03-08 Thread Lukáš Drbal
Hello everyone,

I'm trying to move a savepoint to another S3 account, but the restore always
fails with a strange 404 error.

We are using the Lyft k8s operator [1] and Flink 1.13.6 (in the stack trace you
can see version 1.13.6-396a8d44-szn, which is just an internal build from Flink
commit b2ca390d478aa855eb0f2028d0ed965803a98af1).

What I'm trying to do:

   1. create a savepoint for the pipeline via ./bin/flink savepoint
   2. copy the data under the path configured in state.savepoints.dir from the
   source S3 to the new S3
   3. change all configuration and restore the pipeline
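
The three steps above can be sketched roughly as follows. This is only an illustration: the function names, the DRY_RUN switch and whatever bucket, job id or jar names are passed in are placeholders that do not come from this thread. It also assumes that both buckets are reachable from a single AWS CLI configuration, which may not hold for two separate S3 accounts, and that the job is started with the plain Flink CLI rather than through an operator.

```shell
#!/bin/sh
# Sketch only. With DRY_RUN=1 (the default) commands are printed, not executed.
DRY_RUN="${DRY_RUN:-1}"

# Print the command in dry-run mode, otherwise execute it.
run() {
    if [ "$DRY_RUN" = "1" ]; then
        echo "$*"
    else
        "$@"
    fi
}

# Step 1: trigger a savepoint for a running job.
# Flink writes it into a new subdirectory of SOURCE_DIR.
take_savepoint() {    # usage: take_savepoint JOB_ID SOURCE_DIR
    run ./bin/flink savepoint "$1" "$2"
}

# Step 2: copy the whole savepoint directory, including _metadata.
copy_savepoint() {    # usage: copy_savepoint SOURCE_URI TARGET_URI
    run aws s3 sync "$1" "$2"
}

# Step 3: start the job from the copied savepoint.
restore_job() {       # usage: restore_job SAVEPOINT_URI JAR
    run ./bin/flink run -s "$1" "$2"
}
```

Calling the functions with DRY_RUN=1 first shows the exact commands that would be executed, which is a cheap way to check the paths before touching any state.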

Are these steps correct, or am I doing something wrong or unsupported?

All options related to S3 have valid values for the new S3 account, but the
restore fails with the exception below. The error message includes the original
path (s3://flink/savepoints/activity-searched-query), which doesn't exist on the
new account, so the 404 is expected. But I still don't understand why Flink
tries that path, because the related config options contain the new bucket info.

high-availability.storageDir: 's3:///ha/pipelines-runner-activity-searched-query'
jobmanager.archive.fs.dir: 's3:///history'
state.checkpoints.dir: 's3:///checkpoints/activity-searched-query'
state.savepoints.dir: 's3:///savepoints/activity-searched-query'


+ valid values for s3.access-key and s3.secret-key

I found the original path in the _metadata file in the savepoint data, but
changing it (search) leads to a strange OOM. I hope this should not be needed
and that those values should be ignored.
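
A read-only check can confirm whether a copied _metadata file still refers to the old location, without editing the file. The sketch below is only an illustration: count_refs is a hypothetical helper, it assumes a grep that supports -a and -o (GNU grep does), and the file has to be downloaded locally first. It is also only an assumption, not something confirmed in this thread, that editing the binary _metadata file as text can corrupt it when the path length changes.

```shell
#!/bin/sh
# usage: count_refs FILE NEEDLE
# Prints how many times the fixed string NEEDLE occurs in FILE.
count_refs() {
    # -a: treat binary data as text
    # -o: print every match on its own line, so wc -l counts matches
    # -F: NEEDLE is a literal string, not a regular expression
    grep -a -o -F -- "$2" "$1" | wc -l | tr -d ' '
}
```

For example, count_refs ./_metadata 's3://flink/savepoints' prints the number of references to the old bucket path; a result of 0 would mean the metadata no longer points there.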

state.backend is hashmap, if that matters.

Restoring from the source bucket works as expected.

Thanks a lot!

Regards,
L.

Stacktrace:

2022-03-08 15:39:25,838 [flink-akka.actor.default-dispatcher-4] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - CombineToSearchedQuery -> (LateElementsCounter, TransformToStreamElement -> Sink: SearchedQueryKafkaSink) (1/2) (0c0f108c393b9a5b58f861c1032671d0) switched from INITIALIZING to FAILED on 10.67.158.155:45521-d8d19d @ 10.67.158.155 (dataPort=36341).
org.apache.flink.util.SerializedThrowable: Exception while creating StreamOperatorStateContext.
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:254) ~[flink-dist_2.11-1.13.6-396a8d44-szn.jar:1.13.6-396a8d44-szn]
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:272) ~[flink-dist_2.11-1.13.6-396a8d44-szn.jar:1.13.6-396a8d44-szn]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:441) ~[flink-dist_2.11-1.13.6-396a8d44-szn.jar:1.13.6-396a8d44-szn]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:585) ~[flink-dist_2.11-1.13.6-396a8d44-szn.jar:1.13.6-396a8d44-szn]
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) ~[flink-dist_2.11-1.13.6-396a8d44-szn.jar:1.13.6-396a8d44-szn]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:565) ~[flink-dist_2.11-1.13.6-396a8d44-szn.jar:1.13.6-396a8d44-szn]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650) ~[flink-dist_2.11-1.13.6-396a8d44-szn.jar:1.13.6-396a8d44-szn]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:540) ~[flink-dist_2.11-1.13.6-396a8d44-szn.jar:1.13.6-396a8d44-szn]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759) ~[flink-dist_2.11-1.13.6-396a8d44-szn.jar:1.13.6-396a8d44-szn]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) ~[flink-dist_2.11-1.13.6-396a8d44-szn.jar:1.13.6-396a8d44-szn]
    at java.lang.Thread.run(Thread.java:832) ~[?:?]
Caused by: org.apache.flink.util.SerializedThrowable: Could not restore keyed state backend for WindowOperator_bd2a73c53230733509ca171c6476fcc5_(1/2) from any of the 1 provided restore options.
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160) ~[flink-dist_2.11-1.13.6-396a8d44-szn.jar:1.13.6-396a8d44-szn]
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:345) ~[flink-dist_2.11-1.13.6-396a8d44-szn.jar:1.13.6-396a8d44-szn]
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163) ~[flink-dist_2.11-1.13.6-396a8d44-szn.jar:1.13.6-396a8d44-szn]
    ... 10 more
Caused by: org.apache.flink.util.SerializedThrowable: Failed when trying to restore heap backend
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.restoreState(HeapKeyedStateBackendBuilder.java:177) ~[flink-dist_2.11-1.13.6-396a8d44-szn.jar:1.13.6-396a8d44-szn]
    at

Re: [DISCUSS] Drop Gelly

2022-01-04 Thread Lukáš Drbal
Hi everyone,

We (a team at seznam.cz) are actually using the Gelly library for batch
anomaly detection in our graphs. It would be very nice to keep this
functionality somehow, maybe in a separate repository. Is there any replacement?


Best,
Lukas

On Mon, Jan 3, 2022 at 2:20 PM Martijn Visser  wrote:

> Hi everyone,
>
> Flink is bundled with Gelly, a Graph API library [1]. This has been marked
> as approaching end-of-life for quite some time [2].
>
> Gelly is built on top of Flink's DataSet API, which is deprecated and
> slowly being phased out [3]. It only works on batch jobs. Based on the
> activity in the Dev and User mailing lists, I don't see a lot of questions
> popping up regarding the usage of Gelly. Removing Gelly would reduce CI
> time and resources because we won't need to run tests for this anymore.
>
> I'm cross-posting this to the User mailing list to see if there are any
> users of Gelly at the moment.
>
> Let me know your thoughts.
>
> Martijn Visser | Product Manager
>
> mart...@ververica.com
>
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-stable/docs/libs/gelly/overview/
>
> [2] https://flink.apache.org/roadmap.html
>
> [3] https://lists.apache.org/thread/b2y3xx3thbcbtzdphoct5wvzwogs9sqz
>


Re: Native kubernetes execution and History server

2021-03-25 Thread Lukáš Drbal
Hello Guowei,

I just checked it and it works!

Thanks a lot!

Here is the workaround, which uses a UUID as the jobId:
-D\$internal.pipeline.job-id=$(cat /proc/sys/kernel/random/uuid|tr -d "-")
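
The same idea can be wrapped in a small helper so the generated id can be checked before it is passed to the CLI. This is only a sketch: gen_job_id and job_id_arg are hypothetical names, the /dev/urandom fallback is an addition for systems without /proc/sys/kernel/random/uuid, and $internal.pipeline.job-id is an internal option that may change between releases, as noted in the quoted reply below.

```shell
#!/bin/sh
# Print a random 32-character lowercase hex string usable as a job id.
gen_job_id() {
    if [ -r /proc/sys/kernel/random/uuid ]; then
        # Linux: take a kernel-generated UUID and strip the dashes.
        tr -d '-' < /proc/sys/kernel/random/uuid
    else
        # Fallback: 16 random bytes printed as hex, whitespace removed.
        od -An -N16 -tx1 /dev/urandom | tr -d ' \n'
    fi
}

# Print the -D argument; single quotes keep "$internal" literal.
job_id_arg() {
    printf '%s%s\n' '-D$internal.pipeline.job-id=' "$(gen_job_id)"
}
```

The output of job_id_arg can then be placed among the other -D options of the run-application command, so that every scheduled run gets a fresh id.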


L.

On Thu, Mar 25, 2021 at 11:01 AM Guowei Ma  wrote:

> Hi,
> Thanks for providing the logs. From the logs this is a known bug.[1]
> Maybe you could use `$internal.pipeline.job-id` to set your own
> job-id.(Thanks to Wang Yang)
> But keep in mind this is only for internal use and may be changed in
> some release. So you should keep an eye on [1] for the correct solution.
>
> [1] https://issues.apache.org/jira/browse/FLINK-19358
>
> Best,
> Guowei

Re: Native kubernetes execution and History server

2021-03-25 Thread Lukáš Drbal
Hello,

Sure. Here is the log from the first run, which succeeded -
https://pastebin.com/tV75ZS5S
and here is the log from the second run (it is the same for all subsequent
runs) - https://pastebin.com/pwTFyGvE

My Dockerfile is pretty simple; it just takes WordCount + S3:

FROM flink:1.12.2

RUN mkdir -p $FLINK_HOME/usrlib
COPY flink-examples-batch_2.12-1.12.2-WordCount.jar $FLINK_HOME/usrlib/wordcount.jar

RUN mkdir -p ${FLINK_HOME}/plugins/s3-fs-presto
COPY flink-s3-fs-presto-1.12.2.jar $FLINK_HOME/plugins/s3-fs-presto/

Thanks!

On Thu, Mar 25, 2021 at 9:24 AM Guowei Ma  wrote:

> Hi,
> After some discussion with Wang Yang offline, it seems that there might be
> a jobmanager failover. So would you like to share full jobmanager log?
> Best,
> Guowei


Native kubernetes execution and History server

2021-03-24 Thread Lukáš Drbal
Hi,

I would like to use native Kubernetes execution [1] for one batch job and
leave the scheduling to Kubernetes. Flink version: 1.12.2.

Kubernetes job:
apiVersion: batch/v1beta1
kind: CronJob
metadata:
  name: scheduled-job
spec:
  schedule: "*/1 * * * *"
  jobTemplate:
    spec:
      template:
        metadata:
          labels:
            app: super-flink-batch-job
        spec:
          containers:
          - name: runner
            image: localhost:5000/batch-flink-app-v3:latest
            imagePullPolicy: Always
            command:
              - /bin/sh
              - -c
              - /opt/flink/bin/flink run-application --target
                kubernetes-application -Dkubernetes.service-account=flink-service-account
                -Dkubernetes.rest-service.exposed.type=NodePort
                -Dkubernetes.cluster-id=batch-job-cluster
                -Dkubernetes.container.image=localhost:5000/batch-flink-app-v3:latest
                -Ds3.endpoint=http://minio-1616518256:9000 -Ds3.access-key=ACCESSKEY
                -Ds3.secret-key=SECRETKEY
                -Djobmanager.archive.fs.dir=s3://flink/completed-jobs/
                -Ds3.path-style-access=true -Ds3.ssl.enabled=false
                -Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
                -Dhigh-availability.storageDir=s3://flink/flink-ha
                local:///opt/flink/usrlib/job.jar
          restartPolicy: OnFailure


This works well for me, but I would like to write the result to the archive
path and show it in the History Server (running as a separate deployment in
k8s).

Every time, it creates JobId= which obviously
leads to

Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.DuplicateJobSubmissionException: Job has already been submitted.
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) ~[?:1.8.0_282]
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908) ~[?:1.8.0_282]
    at org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:1056) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.client.program.ContextEnvironment.executeAsync(ContextEnvironment.java:129) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:70) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:942) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.api.java.DataSet.collect(DataSet.java:417) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.api.java.DataSet.print(DataSet.java:1748) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:96) ~[?:?]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_282]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_282]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_282]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_282]
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:349) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:242) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    ... 10 more

I assume it is because it will spawn a completely new cluster for each run.

Can I somehow set the jobId, or am I trying to do something unsupported/bad?

Thanks for any advice.

L.

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/deployment/resource-providers/native_kubernetes.html