Re: PyFlink performance and deployment issues
Hi Wouter,

I suspect that it is transferring the file venv.zip, so it may take some time. Does it stay stuck there forever? Could you share some log files?

Regards,
Dian
Fwd: PyFlink performance and deployment issues
Hi all,

I'm still dealing with the PyFlink deployment issue as described below. I see that I accidentally didn't forward it to the mailing list. Anyway, my job is stuck in `Initializing` and the logs don't really give me a clue what is going on. In my IDE it runs fine. The command I use to submit to the cluster:

    export PYFLINK_CLIENT_EXECUTABLE=~/Documents/stateflow-evaluation/venv/bin/python

    ./flink run \
      --target remote \
      -m localhost:8081 \
      -pyarch venv.zip \
      --pyExecutable venv.zip/venv/bin/python \
      --parallelism 1 \
      --python ~/Documents/stateflow-evaluation/pyflink_runtime.py \
      --jarfile ~/Documents/stateflow-evaluation/benchmark/bin/combined.jar

I hope someone can help me with this because it is a blocker for me.

Thanks in advance,
Wouter

---------- Forwarded message ---------
From: Wouter Zorgdrager
Date: Thu, 8 Jul 2021 at 12:20
Subject: Re: PyFlink performance and deployment issues
To: Xingbo Huang

Hi Xingbo, all,

Regarding point 2, I actually made a mistake there. I picked port 8081 (WebUI port) rather than the job submission port (--target remote -m localhost:8081). For some reason, this does not give an error or warning and just starts a local cluster. However, now I have another issue: my job is stuck at initialization. Here is an excerpt from the JM log:

    2021-07-08 12:12:18,094 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Deploying _stream_key_by_map_operator (1/1) (attempt #0) with attempt id ca9abcc644c05f62a47b83f391c85cd9 to 127.0.1.1:38179-f09c77 @ wouter (dataPort=40987) with allocation id d6b810455e97d0a952fb825ccec27c45
    2021-07-08 12:12:18,097 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Process-Stateful-User (1/1) (f40fac621cb94c79cdb82146ae5521bb) switched from SCHEDULED to DEPLOYING.
    2021-07-08 12:12:18,097 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Deploying Process-Stateful-User (1/1) (attempt #0) with attempt id f40fac621cb94c79cdb82146ae5521bb to 127.0.1.1:38179-f09c77 @ wouter (dataPort=40987) with allocation id d6b810455e97d0a952fb825ccec27c45
    2021-07-08 12:12:18,098 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Map-Egress -> (Filter -> Kafka-To-Client -> Sink: Unnamed, Filter -> Kafka-To-Internal -> Sink: Unnamed) (1/1) (58deef879a00052ba6b3447917005c35) switched from SCHEDULED to DEPLOYING.
    2021-07-08 12:12:18,098 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Deploying Map-Egress -> (Filter -> Kafka-To-Client -> Sink: Unnamed, Filter -> Kafka-To-Internal -> Sink: Unnamed) (1/1) (attempt #0) with attempt id 58deef879a00052ba6b3447917005c35 to 127.0.1.1:38179-f09c77 @ wouter (dataPort=40987) with allocation id d6b810455e97d0a952fb825ccec27c45
    2021-07-08 12:12:18,484 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Process-Stateful-User (1/1) (f40fac621cb94c79cdb82146ae5521bb) switched from DEPLOYING to INITIALIZING.
    2021-07-08 12:12:18,488 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - _stream_key_by_map_operator (1/1) (ca9abcc644c05f62a47b83f391c85cd9) switched from DEPLOYING to INITIALIZING.
    2021-07-08 12:12:18,489 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Map-Egress -> (Filter -> Kafka-To-Client -> Sink: Unnamed, Filter -> Kafka-To-Internal -> Sink: Unnamed) (1/1) (58deef879a00052ba6b3447917005c35) switched from DEPLOYING to INITIALIZING.
    2021-07-08 12:12:18,490 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Custom Source -> Route-Incoming-Events -> (Filter-On-User -> Map -> (Filter-Init-User -> Init-User, Filter-Stateful-User), Filter -> Map) (1/1) (c48649bd76abaf77486104e8cfcee7d8) switched from DEPLOYING to INITIALIZING.

I run with parallelism 1 and these are the latest log lines from the TM (there is no obvious error):

    2021-07-08 12:12:18,729 INFO org.apache.flink.streaming.api.operators.AbstractStreamOperator [] - The maximum bundle size is configured to 5.
    2021-07-08 12:12:18,729 INFO org.apache.flink.streaming.api.operators.AbstractStreamOperator [] - The maximum bundle time is configured to 1 milliseconds.
    2021-07-08 12:12:18,791 WARN org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser [] - Error while loading kafka-version.properties: inStream parameter is null
    2021-07-08 12:12:18,792 INFO org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser [] - Kafka version: unknown
    2021-07-08 12:12:18,792 INFO org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser [] - Kafka commitId: unknown
    2021-07-08 12:12:18,792 INFO org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser [] - Kafka startTimeMs: 1625739138789
    2021-07-08 12:12:18,806 INFO org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer [] - Starting FlinkKafkaInternalProducer (1/1) to produce into default topic client_reply
    2021-0
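One way to see at a glance which tasks never leave INITIALIZING is to scan the JM log for state transitions and keep the last state per task. The following is only a minimal sketch: it assumes the ExecutionGraph line format shown in the excerpt above, and the function names `latest_states` and `stuck_in` are made up for illustration.

```python
import re
from typing import Dict, Iterable, List

# Matches ExecutionGraph transition lines such as
# "... [] - Process-Stateful-User (1/1) (<32 hex chars>) switched from DEPLOYING to INITIALIZING."
# Assumption: the log layout is the one shown in the JM excerpt of this thread.
TRANSITION = re.compile(
    r" - (?P<task>.+?) \(\d+/\d+\) \((?P<attempt>[0-9a-f]{32})\) "
    r"switched from (?P<old>[A-Z_]+) to (?P<new>[A-Z_]+)\."
)


def latest_states(lines: Iterable[str]) -> Dict[str, str]:
    """Return the most recent state seen for every task name."""
    states: Dict[str, str] = {}
    for line in lines:
        match = TRANSITION.search(line)
        if match:
            # Later lines overwrite earlier ones, so the last transition wins.
            states[match.group("task")] = match.group("new")
    return states


def stuck_in(lines: Iterable[str], state: str = "INITIALIZING") -> List[str]:
    """List the tasks whose last recorded state equals `state`."""
    return sorted(task for task, last in latest_states(lines).items() if last == state)
```

Calling `stuck_in(open("jobmanager.log"))` (the file name is a placeholder) would list the tasks whose last transition ended in INITIALIZING; "Deploying ..." lines are ignored because they contain no "switched from" part.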
Re: PyFlink performance and deployment issues
Hi Wouter,

The JIRA is https://issues.apache.org/jira/browse/FLINK-23309.

`bundle time` should be chosen from the perspective of your e2e latency. Regarding `bundle size`, a larger value generally gives better throughput, but it should not be set too large: that may cause no output to be seen downstream for a long time, and the pressure during checkpoints becomes too great.

Best,
Xingbo
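To make the trade-off between the two bundle settings concrete, here is a small sketch that derives both values from a latency budget and an expected event rate. The heuristic itself is an assumption made for illustration, not a recommendation from this thread; only the two configuration keys and the `set_integer` call come from Wouter's original snippet, and `bundle_settings` and `apply_settings` are hypothetical helper names.

```python
from typing import Dict

# Configuration keys as used in the original post.
BUNDLE_TIME_KEY = "python.fn-execution.bundle.time"
BUNDLE_SIZE_KEY = "python.fn-execution.bundle.size"


def bundle_settings(latency_budget_ms: int, events_per_second: float) -> Dict[str, int]:
    """Hypothetical heuristic: bound the bundle time by the latency budget and
    size the bundle to roughly the events that arrive within that time."""
    if latency_budget_ms < 1:
        raise ValueError("latency_budget_ms must be at least 1")
    if events_per_second < 0:
        raise ValueError("events_per_second must not be negative")
    bundle_time_ms = latency_budget_ms
    # Never go below one element per bundle.
    bundle_size = max(1, int(events_per_second * bundle_time_ms / 1000))
    return {BUNDLE_TIME_KEY: bundle_time_ms, BUNDLE_SIZE_KEY: bundle_size}


def apply_settings(config, settings: Dict[str, int]) -> None:
    """Apply the settings to any object exposing set_integer(key, value),
    which is how `config` is used in the original post."""
    for key, value in settings.items():
        config.set_integer(key, value)
```

With a 50 ms budget at 1000 events per second this yields a bundle time of 50 and a bundle size of 50. Whether such values actually give acceptable latency has to be measured on the real job.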
Re: PyFlink performance and deployment issues
Hi Xingbo, all,

That is good to know, thank you. Is there a Jira issue I can track? I'm curious to follow the progress! Do you have any recommendations for these two configuration values, to get somewhat reasonable performance?

Thanks a lot!
Wouter
Re: PyFlink performance and deployment issues
Hi Wouter,

In fact, our users have encountered the same problem. Whenever the `bundle size` or `bundle time` is reached, the data in the buffer needs to be sent from the JVM to the PVM (Python VM). The JVM then waits for the PVM to process the data and send it back before it can forward all the results to the downstream operator. This leads to a large delay, especially for small events, because small messages are hard to process in a pipelined way.

I have been working on this problem recently and I plan to include this optimization in release-1.14.

Best,
Xingbo
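The bundling behaviour described above (buffer elements until the size or time limit is reached, then hand the whole bundle over) can be illustrated with a toy model. This is not Flink's implementation: the class `BundleBuffer` and its methods are hypothetical, and the time limit is only checked when an element arrives, whereas a real runtime would presumably also flush from a timer.

```python
from typing import Any, List, Optional


class BundleBuffer:
    """Toy model of a size/time bounded bundle (hypothetical, for illustration only)."""

    def __init__(self, max_size: int, max_time_ms: int) -> None:
        self.max_size = max_size
        self.max_time_ms = max_time_ms
        self._events: List[Any] = []
        self._opened_at_ms: Optional[int] = None

    def add(self, event: Any, now_ms: int) -> Optional[List[Any]]:
        """Buffer one event; return the finished bundle if a limit was hit, else None."""
        if not self._events:
            # The first event of a bundle starts the clock.
            self._opened_at_ms = now_ms
        self._events.append(event)
        size_reached = len(self._events) >= self.max_size
        time_reached = now_ms - self._opened_at_ms >= self.max_time_ms
        if size_reached or time_reached:
            return self.flush()
        return None

    def flush(self) -> List[Any]:
        """Hand over everything buffered so far and start a new bundle."""
        bundle, self._events = self._events, []
        self._opened_at_ms = None
        return bundle
```

In this model an event that arrives right after a flush waits until the next limit is hit, which is the kind of delay that becomes visible when events are small and infrequent.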
Re: PyFlink performance and deployment issues
Hi Dian, all,

I will come back to the other points asap. However, I'm still confused about this performance. Is this what I can expect from PyFlink in terms of performance: ~1000 ms latency for single events? I also had a very simple setup where I sent 1000 events per second to Kafka, and response times/latencies were around 15 seconds for single events. I understand there is some Python/JVM overhead, but since Flink is so performant, I would expect much better numbers. In the current situation, PyFlink would just be unusable if you care about latency. Is this something that you expect to be improved in the future?

I will verify how this works out for Beam in a remote environment.

Thanks again!
Wouter
Re: PyFlink performance and deployment issues
Hi Wouter,

1) Regarding the performance difference between Beam and PyFlink, I guess it's because you are using an in-memory runner when running it locally in Beam. In that case, the code path is totally different compared to running in a remote cluster.
2) Regarding `flink run`, I'm surprised that it's running locally. Could you submit a Java job with similar commands to see how it runs?
3) Regarding `flink run-application`, could you share the exception stack?

Regards,
Dian
Re: PyFlink performance and deployment issues
Hi Wouter,

Sorry for the late reply. I will try to answer your questions in detail.

1. >>> Performance problem.

When running a UDF job locally, Beam uses a loopback mode to connect back to the Python process that compiled the job, so the job starts up faster than in PyFlink, which creates a new Python process to execute the UDF code.

2. >>> However, this command created a local MiniCluster again rather than submitting it to my remote cluster.

I was able to submit a Python job to a standalone cluster with the following commands:

    ./bin/start-cluster.sh

    ./bin/flink run --target remote \
      -m localhost:8086 \
      -pyarch /Users/duanchen/venv/venv.zip \
      -pyexec venv.zip/venv/bin/python \
      --parallelism 1 \
      --python /Users/duanchen/sourcecode/pyflink-performance-demo/python/flink/flink-perf-test.py \
      --jarfile /Users/duanchen/sourcecode/pyflink-performance-demo/java/target/flink-perf-tests-0.1.jar

The situation you encountered is very strange.

3. >>> In my second attempt, I tried deploying it to a Kubernetes cluster using the following command:

When running in Application mode, you must make sure that all paths are accessible by the JobManager of your application. The path ~/Documents/runtime.py is on your client side, right? You need to use a path inside your k8s cluster. This part of the documentation does not explain these implicit requirements well.

4. >>> Lastly, I wondered if it is possible to set a key for events sent to the KafkaProducer.

You can check whether the Kafka Table Connector [1] meets your needs.

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/kafka/#features

Best,
Xingbo
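For point 4 (setting a Kafka message key), the Kafka Table Connector lets a table declare which column is written as the key through the `key.format` and `key.fields` options. The sketch below only builds the DDL string. The table name, column names and bootstrap server are assumptions made for illustration (only the topic `client_reply` appears in the logs of this thread), and `kafka_sink_ddl` is a hypothetical helper. The resulting statement would still have to be run through a table environment, and the job's stream converted to a table, neither of which is shown here.

```python
from typing import Dict


def kafka_sink_ddl(
    table: str,
    columns: Dict[str, str],
    key_field: str,
    topic: str,
    bootstrap_servers: str,
) -> str:
    """Build a CREATE TABLE statement for a Kafka sink whose message key is one column."""
    if key_field not in columns:
        raise ValueError(f"key field {key_field!r} is not a declared column")
    column_sql = ",\n".join(f"  `{name}` {sql_type}" for name, sql_type in columns.items())
    options = {
        "connector": "kafka",
        "topic": topic,
        "properties.bootstrap.servers": bootstrap_servers,
        # The key is written from a single column using the raw format.
        "key.format": "raw",
        "key.fields": key_field,
        # The value is written as JSON without repeating the key column
        # (assumes the JSON format jar is available to the job).
        "value.format": "json",
        "value.fields-include": "EXCEPT_KEY",
    }
    option_sql = ",\n".join(f"  '{key}' = '{value}'" for key, value in options.items())
    return f"CREATE TABLE `{table}` (\n{column_sql}\n) WITH (\n{option_sql}\n)"


# Hypothetical table and column names, for illustration only.
ddl = kafka_sink_ddl(
    table="client_reply_sink",
    columns={"user_id": "STRING", "payload": "STRING"},
    key_field="user_id",
    topic="client_reply",
    bootstrap_servers="localhost:9092",
)
print(ddl)
```

Whether this fits depends on whether the DataStream job can route its output through the Table API; that part is not covered by the sketch.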
PyFlink performance and deployment issues
Dear community,

I have been struggling a lot with the deployment of my PyFlink job. Moreover, the performance seems to be very disappointing, especially the latency at low throughput. I have been playing around with configuration values, but it has not improved.

In short, I have a DataStream job with multiple Python operators, including a ProcessFunction. The job reads from Kafka and writes to Kafka again. For single events, E2E latency has been somewhere between 600 ms and 2000 ms. When I increase the throughput, latency is on the order of seconds. This is when I configure my job like this:

    config.set_integer("python.fn-execution.bundle.time", 1)
    config.set_integer("python.fn-execution.bundle.size", 1)

I tried several configuration values, but the results are similar. Interestingly, I have a similar Python streaming application written in Apache Beam which does have low latency: single events are processed in < 30 ms. If I recall correctly, they use the same technique of bundling and sending to Python processes. On the other hand, Beam uses an in-memory runner when running locally, which might change the situation. I'm not sure how that compares to a local Flink MiniCluster.

I hoped that performance might improve when I deploy this on a (remote) Flink cluster. Unfortunately, I had a lot of trouble deploying this PyFlink job to a remote Flink cluster. In my first attempt, I created a local TM + JM setup and tried to deploy it using the "./flink run" command. However, this command created a local MiniCluster again rather than submitting it to my remote cluster. The full command was:

    ./flink run --target remote \
      -m localhost:8081 \
      -pyarch venv.zip \
      -pyexec venv.zip/venv/bin/python \
      --parallelism 4 \
      --python ~/Documents/runtime.py \
      --jarfile ~/Documents/combined.jar

Note that venv.zip stores all the Python dependencies for my PyFlink job whereas combined.jar stores the Java dependencies. I tried several variants of this command, but it *never* submitted to my running JobManager and always ran it locally.

In my second attempt, I tried deploying it to a Kubernetes cluster using the following command:

    ./flink run-application \
      --target kubernetes-application \
      -Dkubernetes.cluster-id=flink-cluster \
      -Dtaskmanager.memory.process.size=4096m \
      -Dkubernetes.taskmanager.cpu=2 \
      -Dkubernetes.service-account=flink-service-account \
      -Dtaskmanager.numberOfTaskSlots=4 \
      -Dkubernetes.container.image=pyflink:latest \
      -pyarch venv.zip \
      -pyexec venv.zip/venv/bin/python \
      --parallelism 4 \
      -py ~/Documents/runtime.py \
      --jarfile ~/Documents/combined.jar

I created the pyflink:latest image by following the documentation here [1]. It was unclear to me whether I had to include my project files in this Docker image. When running it like this, it did submit the job to the remote K8s cluster, but I got an exception that it could not find my runtime.py file in some sort of tmp folder.

Lastly, I wondered if it is possible to set a key for events sent to the KafkaProducer. Right now, it seems you can only configure some (static) properties and the serializer. Is there a workaround to be able to set the key and value of an event in PyFlink?

I hope you can help me out with my struggles! Thanks in advance.

Regards,
Wouter

[1] - https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/resource-providers/standalone/docker/#enabling-python
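The interpreter path `venv.zip/venv/bin/python` used in the commands above implies that the archive contains a top-level `venv` directory. A minimal sketch of producing and checking such a layout with the Python standard library follows. The helper names `zip_venv` and `has_interpreter` are hypothetical, and how symlinks and file permissions inside a real virtual environment survive zipping is not verified here and is worth checking separately.

```python
import shutil
import zipfile
from pathlib import Path


def zip_venv(venv_dir: Path, out_dir: Path) -> Path:
    """Create <out_dir>/<venv name>.zip whose entries start with the venv directory name."""
    venv_dir = venv_dir.resolve()
    archive = shutil.make_archive(
        base_name=str(out_dir.resolve() / venv_dir.name),
        format="zip",
        root_dir=str(venv_dir.parent),  # entries are stored relative to this directory
        base_dir=venv_dir.name,         # so every entry starts with "<venv name>/"
    )
    return Path(archive)


def has_interpreter(archive: Path, member: str = "venv/bin/python") -> bool:
    """Check that the archive contains the interpreter path passed to -pyexec."""
    with zipfile.ZipFile(archive) as zf:
        return member in zf.namelist()
```

For a virtual environment directory named `venv`, `zip_venv(Path("venv"), Path("."))` would write `venv.zip` to the current directory, and `has_interpreter` can confirm the expected path before the archive is passed to `-pyarch`.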