Re: PyFlink performance and deployment issues

2021-08-15 Thread Dian Fu
Hi Wouter,

I suspect that it’s transferring the file venv.zip, so it may take some
time. Does it stay stuck there forever? Could you share the log files?

Regards,
Dian


Fwd: PyFlink performance and deployment issues

2021-08-14 Thread Wouter Zorgdrager
Hi all,

I'm still dealing with the PyFlink deployment issue described below. I
see that I accidentally didn't forward it to the mailing list. Anyway, my
job is stuck in `Initializing` and the logs don't really give me a clue
about what is going on.
In my IDE it runs fine. The command I use to submit to the cluster:

export
PYFLINK_CLIENT_EXECUTABLE=~/Documents/stateflow-evaluation/venv/bin/python

./flink run \
  --target remote \
  -m localhost:8081 \
  -pyarch venv.zip \
  --pyExecutable venv.zip/venv/bin/python \
  --parallelism 1 \
  --python ~/Documents/stateflow-evaluation/pyflink_runtime.py \
  --jarfile ~/Documents/stateflow-evaluation/benchmark/bin/combined.jar

I hope someone can help me with this because it is a blocker for me.

Thanks in advance,
Wouter
-- Forwarded message -
From: Wouter Zorgdrager 
Date: Thu, 8 Jul 2021 at 12:20
Subject: Re: PyFlink performance and deployment issues
To: Xingbo Huang 


Hi Xingbo, all,

Regarding point 2, I actually made a mistake there. I picked port 8081
(the WebUI port) rather than the job submission port (--target remote -m
localhost:8081). For some reason, this does not give an error or warning
and just starts a local cluster. However, now I have another issue: my job
is stuck at initialization. Here is an excerpt from the JM log:

2021-07-08 12:12:18,094 INFO org.apache.flink.runtime.executiongraph.
ExecutionGraph [] - Deploying _stream_key_by_map_operator (1/1) (attempt #0)
with attempt id ca9abcc644c05f62a47b83f391c85cd9 to 127.0.1.1:38179-f09c77 @
wouter (dataPort=40987) with allocation id d6b810455e97d0a952fb825ccec27c45
2021-07-08 12:12:18,097 INFO org.apache.flink.runtime.executiongraph.
ExecutionGraph [] - Process-Stateful-User (1/1)
(f40fac621cb94c79cdb82146ae5521bb) switched from SCHEDULED to DEPLOYING.
2021-07-08 12:12:18,097 INFO org.apache.flink.runtime.executiongraph.
ExecutionGraph [] - Deploying Process-Stateful-User (1/1) (attempt #0) with
attempt id f40fac621cb94c79cdb82146ae5521bb to 127.0.1.1:38179-f09c77 @
wouter (dataPort=40987) with allocation id d6b810455e97d0a952fb825ccec27c45
2021-07-08 12:12:18,098 INFO org.apache.flink.runtime.executiongraph.
ExecutionGraph [] - Map-Egress -> (Filter -> Kafka-To-Client -> Sink:
Unnamed, Filter -> Kafka-To-Internal -> Sink: Unnamed) (1/1)
(58deef879a00052ba6b3447917005c35)
switched from SCHEDULED to DEPLOYING.
2021-07-08 12:12:18,098 INFO org.apache.flink.runtime.executiongraph.
ExecutionGraph [] - Deploying Map-Egress -> (Filter -> Kafka-To-Client ->
Sink: Unnamed, Filter -> Kafka-To-Internal -> Sink: Unnamed) (1/1) (attempt
#0) with attempt id 58deef879a00052ba6b3447917005c35 to 127.0.1.1:38179-f09c77
@ wouter (dataPort=40987) with allocation id
d6b810455e97d0a952fb825ccec27c45
2021-07-08 12:12:18,484 INFO org.apache.flink.runtime.executiongraph.
ExecutionGraph [] - Process-Stateful-User (1/1)
(f40fac621cb94c79cdb82146ae5521bb) switched from DEPLOYING to INITIALIZING.
2021-07-08 12:12:18,488 INFO org.apache.flink.runtime.executiongraph.
ExecutionGraph [] - _stream_key_by_map_operator (1/1)
(ca9abcc644c05f62a47b83f391c85cd9) switched from DEPLOYING to INITIALIZING.
2021-07-08 12:12:18,489 INFO org.apache.flink.runtime.executiongraph.
ExecutionGraph [] - Map-Egress -> (Filter -> Kafka-To-Client -> Sink:
Unnamed, Filter -> Kafka-To-Internal -> Sink: Unnamed) (1/1)
(58deef879a00052ba6b3447917005c35)
switched from DEPLOYING to INITIALIZING.
2021-07-08 12:12:18,490 INFO org.apache.flink.runtime.executiongraph.
ExecutionGraph [] - Source: Custom Source -> Route-Incoming-Events -> (
Filter-On-User -> Map -> (Filter-Init-User -> Init-User, Filter-Stateful-
User), Filter -> Map) (1/1) (c48649bd76abaf77486104e8cfcee7d8) switched from
DEPLOYING to INITIALIZING.

I run with parallelism 1 and these are the latest log lines from the TM
(there is no obvious error):
2021-07-08 12:12:18,729 INFO org.apache.flink.streaming.api.operators.
AbstractStreamOperator [] - The maximum bundle size is configured to 5.
2021-07-08 12:12:18,729 INFO org.apache.flink.streaming.api.operators.
AbstractStreamOperator [] - The maximum bundle time is configured to 1
milliseconds.
2021-07-08 12:12:18,791 WARN
org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser
[] - Error while loading kafka-version.properties: inStream parameter is
null
2021-07-08 12:12:18,792 INFO
org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser
[] - Kafka version: unknown
2021-07-08 12:12:18,792 INFO
org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser
[] - Kafka commitId: unknown
2021-07-08 12:12:18,792 INFO
org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser
[] - Kafka startTimeMs: 1625739138789
2021-07-08 12:12:18,806 INFO org.apache.flink.streaming.connectors.kafka.
FlinkKafkaProducer [] - Starting FlinkKafkaInternalProducer (1/1) to
produce into default topic client_reply
2021-0

Re: PyFlink performance and deployment issues

2021-07-08 Thread Xingbo Huang
Hi Wouter,

The JIRA issue is https://issues.apache.org/jira/browse/FLINK-23309. `bundle
time` should be chosen from the perspective of your e2e latency. Regarding
`bundle size`, a larger value will generally give better throughput, but it
should not be set too large: that may cause no output to be seen downstream
for a long time, and the pressure during checkpointing will be too great.
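
For example, a rough starting point (the exact numbers below are only
illustrative, not a recommendation from this thread; you will need to tune
them for your workload) would be, reusing the config object from your snippet:

config.set_integer("python.fn-execution.bundle.size", 1000)  # larger bundles generally improve throughput
config.set_integer("python.fn-execution.bundle.time", 100)   # flush at least every 100 ms to bound latency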

Best,
Xingbo


Re: PyFlink performance and deployment issues

2021-07-08 Thread Wouter Zorgdrager
Hi Xingbo, all,

That is good to know, thank you. Is there any Jira issue I can track? I'm
curious to follow this progress! Do you have any recommendations with
regard to these two configuration values, to get somewhat reasonable
performance?

Thanks a lot!
Wouter


Re: PyFlink performance and deployment issues

2021-07-08 Thread Xingbo Huang
Hi Wouter,

In fact, our users have encountered the same problem. Whenever the `bundle
size` or `bundle time` limit is reached, the data in the buffer has to be sent
from the JVM to the Python VM, and the JVM then waits for the Python VM to
process it and send the results back before they can be forwarded to the
downstream operator. This leads to a large delay, especially for small events,
since small messages are hard to process in a pipelined fashion.

I have been working on this problem recently and I plan to get this
optimization into release 1.14.

Best,
Xingbo



Re: PyFlink performance and deployment issues

2021-07-08 Thread Wouter Zorgdrager
Hi Dian, all,

I will come back to the other points ASAP. However, I’m still confused
about this performance. Is this what I can expect from PyFlink in terms of
performance: ~1000 ms latency for single events? I also had a very simple
setup where I send 1000 events per second to Kafka, and response
times/latencies were around 15 seconds for single events. I understand there
is some Python/JVM overhead, but since Flink is so performant, I would
expect much better numbers. In the current situation, PyFlink would simply be
unusable if you care about latency. Is this something that you expect to
improve in the future?

I will verify how this works out for Beam in a remote environment.

Thanks again!
Wouter




Re: PyFlink performance and deployment issues

2021-07-08 Thread Dian Fu
Hi Wouter,

1) Regarding the performance difference between Beam and PyFlink, I guess it’s
because you are using an in-memory runner when running it locally in Beam. In
that case, the code path is totally different compared to running in a remote
cluster.
2) Regarding `flink run`, I’m surprised that it’s running locally. Could you
submit a Java job with a similar command to see how it runs? (See the example
below.)
3) Regarding `flink run-application`, could you share the exception stack?
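
For example, any simple Java job will do; the jar below is just the WordCount
example shipped with the Flink distribution:

./flink run --target remote -m localhost:8081 ../examples/streaming/WordCount.jar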

Regards,
Dian




Re: PyFlink performance and deployment issues

2021-07-07 Thread Xingbo Huang
Hi Wouter,
Sorry for the late reply. I will try to answer your questions in detail.

1. >>> Performance problem.
When running a UDF job locally, Beam uses a loopback mode to connect back
to the Python process that compiled the job, so the job starts up faster than
in PyFlink, which creates a new Python process to execute the UDF code.

2. >>> However, this command created a local MiniCluster again rather than
submitting it to my remote cluster.
I was able to successfully submit a Python job to a standalone cluster using
the following commands:

./bin/start-cluster.sh
./bin/flink run --target remote \
-m localhost:8086 \
-pyarch /Users/duanchen/venv/venv.zip \
-pyexec venv.zip/venv/bin/python \
--parallelism 1 \
--python
/Users/duanchen/sourcecode/pyflink-performance-demo/python/flink/flink-perf-test.py
\
--jarfile
/Users/duanchen/sourcecode/pyflink-performance-demo/java/target/flink-perf-tests-0.1.jar

The situation you encountered is very strange.

3. >>> In my second attempt, I tried deploying it to a Kubernetes cluster
using the following command:

When running in Application mode, you need to make sure that all paths are
accessible by the JobManager of your application. The path ~/Documents/runtime.py
is on your client side, right? You need to use a path that exists inside your
k8s cluster, e.g. a path baked into your container image (see the sketch
below). This part of the documentation does not explain these implicit things
well.
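
For example, if you copy runtime.py and combined.jar into the image under
/opt/flink/usrlib/ (just an illustrative location; any path that exists inside
the image works), the command could look roughly like this:

./flink run-application \
--target kubernetes-application \
-Dkubernetes.cluster-id=flink-cluster \
-Dkubernetes.container.image=pyflink:latest \
--parallelism 4 \
-py /opt/flink/usrlib/runtime.py \
--jarfile /opt/flink/usrlib/combined.jar

The same applies to venv.zip: it has to be reachable from inside the cluster
as well.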

4. >>> Lastly, I wondered if it is possible to set a key for events sent to
the KafkaProducer.
You can check whether the Kafka Table Connector [1] meets your needs; it lets
you map columns onto the Kafka record key (rough sketch below).
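
A minimal sketch (the topic, field names and broker address below are made up,
and you would still need to bridge from your DataStream job to a Table):

from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(
    EnvironmentSettings.new_instance().in_streaming_mode().build())
# 'key.fields' makes the user_id column the key of the Kafka record.
t_env.execute_sql("""
    CREATE TABLE client_reply (
        user_id STRING,
        payload STRING
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'client_reply',
        'properties.bootstrap.servers' = 'localhost:9092',
        'key.format' = 'raw',
        'key.fields' = 'user_id',
        'value.format' = 'json'
    )
""")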


[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/kafka/#features

Best,
Xingbo


PyFlink performance and deployment issues

2021-07-06 Thread Wouter Zorgdrager
Dear community,

I have been struggling a lot with the deployment of my PyFlink job.
Moreover, the performance seems very disappointing, especially the latency
at low throughput. I have been playing around with configuration values,
but it has not been improving.
In short, I have a DataStream job with multiple Python operators, including
a ProcessFunction. The job reads from Kafka and writes to Kafka again. For
single events, E2E latency has been somewhere between 600 ms and 2000 ms.
When I increase the throughput, latency ends up in the order of seconds.
This is with the job configured like this:
config.set_integer("python.fn-execution.bundle.time", 1)
config.set_integer("python.fn-execution.bundle.size", 1)
I tried several configuration values, but the results are similar.
Interestingly, I have a similar Python streaming application written in
Apache Beam which does have low latency: single events are processed in
< 30 ms. If I recall correctly, it uses the same technique of bundling and
sending data to Python processes.
On the other hand, Beam uses an in-memory runner when running locally, which
might change the situation. I'm not sure how that compares to a local Flink
MiniCluster.

I hoped that performance might improve when I deploy this on a (remote)
Flink cluster. Unfortunately, I had a lot of trouble deploying this PyFlink
job to a remote Flink cluster. In my first attempt, I created a local TM +
JM setup and tried to deploy it using the "./flink run" command.
However, this command created a local MiniCluster again rather than
submitting it to my remote cluster. The full command was:
./flink run --target remote \
-m localhost:8081 \
-pyarch venv.zip \
-pyexec venv.zip/venv/bin/python \
--parallelism 4 \
--python ~/Documents/runtime.py \
--jarfile ~/Documents/combined.jar

Note that venv.zip stores all the Python dependencies for my PyFlink job,
whereas combined.jar stores the Java dependencies. I tried several variants
of this command, but it *never* submitted to my running JobManager and
always ran locally.
In my second attempt, I tried deploying it to a Kubernetes cluster using
the following command:

./flink run-application \
--target kubernetes-application \
-Dkubernetes.cluster-id=flink-cluster \
-Dtaskmanager.memory.process.size=4096m \
-Dkubernetes.taskmanager.cpu=2 \
-Dkubernetes.service-account=flink-service-account \
-Dtaskmanager.numberOfTaskSlots=4 \
-Dkubernetes.container.image=pyflink:latest \
-pyarch venv.zip \
-pyexec venv.zip/venv/bin/python \
--parallelism 4 \
-py ~/Documents/runtime.py \
--jarfile ~/Documents/combined.jar

I created the pyflink:latest image by following the documentation here [1].
It was unclear to me whether I had to include my project files in this Docker
image.
When running it like this, it did get submitted to the remote K8s cluster, but
I got an exception that it could not find my runtime.py file in some sort of
tmp folder.

Lastly, I wondered if it is possible to set a key for events sent to the
KafkaProducer. Right now, it seems you can only configure some (static)
properties and the serializer.
Is there a workaround to be able to set the key and value of an event
in PyFlink?

I hope you can help me out with my struggles! Thanks in advance.

Regards,
Wouter

[1] -
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/resource-providers/standalone/docker/#enabling-python