Re: flink sql job submission process issue

2021-08-14 Thread Peihui He
A follow-up:
This problem is especially pronounced in HA setups, because the interaction with HDFS is serial; once the number of files reaches a few hundred, it gets very slow.

Peihui He wrote on Sun, 15 Aug 2021 at 11:18:

> Hi all:
>
> While submitting SQL jobs through Zeppelin we keep running into timeouts. Investigation turned up the following issues:
> 1. The blob client talks to the blob server over a single client connection, so when there are many files, say 100, this takes quite a long time.
>
> 2. Although the blob server has blob.fetch.num-concurrent for concurrency
> control, it takes a write lock in the moveTempFileToStore method, which
> defeats that concurrency.
>
> After local testing we made a simple code change, shown below:
> BlobServer:
> [image: image.png]
>
> ClientUtils
> [image: image.png]
> With the change, submitting a job through Zeppelin takes tens of seconds instead of several minutes, and the time no longer grows linearly with the number of dependency jars.
>
> We now have the following questions:
> 1. Is the lock granularity in the blob server too coarse? When multiple SQL jobs are submitted in parallel, they can effectively only execute one at a time.
> 2. Is the write lock in the blob server's moveTempFileToStore really necessary?
>
> Best wishes.
>


flink sql job submission process issue

2021-08-14 Thread Peihui He
Hi all:

While submitting SQL jobs through Zeppelin we keep running into timeouts. Investigation turned up the following issues:
1. The blob client talks to the blob server over a single client connection, so when there are many files, say 100, this takes quite a long time.
2. Although the blob server has blob.fetch.num-concurrent for concurrency
control, it takes a write lock in the moveTempFileToStore method, which defeats that concurrency.

After local testing we made a simple code change, shown below:
BlobServer:
[image: image.png]

ClientUtils
[image: image.png]
With the change, submitting a job through Zeppelin takes tens of seconds instead of several minutes, and the time no longer grows linearly with the number of dependency jars.

We now have the following questions:
1. Is the lock granularity in the blob server too coarse? When multiple SQL jobs are submitted in parallel, they can effectively only execute one at a time.
2. Is the write lock in the blob server's moveTempFileToStore really necessary?
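The cost of the serial client behavior described above can be illustrated with a generic sketch (plain Python with a thread pool; this is not Flink's actual BlobClient/ClientUtils code, and `upload` is a hypothetical stand-in for one blob transfer): with a single connection, 100 files cost 100 sequential round trips, while a bounded worker pool, analogous to what blob.fetch.num-concurrent intends, overlaps them.

```python
from concurrent.futures import ThreadPoolExecutor
import time

def upload(path):
    # Stand-in for one blob-client round trip (hypothetical; real code
    # would open a connection to the blob server and stream the file).
    time.sleep(0.01)
    return path

files = [f"dep-{i}.jar" for i in range(100)]

# Serial: 100 uploads, one after another.
start = time.monotonic()
for f in files:
    upload(f)
serial = time.monotonic() - start

# Concurrent: a bounded pool overlaps the per-file latency.
start = time.monotonic()
with ThreadPoolExecutor(max_workers=10) as pool:
    results = list(pool.map(upload, files))
concurrent = time.monotonic() - start

print(f"serial={serial:.2f}s concurrent={concurrent:.2f}s")
```

With a 10ms per-file round trip, the serial loop takes about a second while the pooled version is bounded by the slowest batch, which matches the observation that submission time grew linearly with the number of dependency jars.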

Best wishes.


Re: Re: [ANNOUNCE] RocksDB Version Upgrade and Performance

2021-08-14 Thread Piotr Nowojski
Hi,

FYI, the performance regression after upgrading RocksDB was clearly visible
in all of our RocksDB related benchmarks, like for example:

http://codespeed.dak8s.net:8000/timeline/?ben=stateBackends.ROCKS=2
http://codespeed.dak8s.net:8000/timeline/?ben=stateBackends.ROCKS_INC=2
(and many more in the State Backends executable)

It's 6% to 12% across the board.

Best,
Piotrek


On Wed, 11 Aug 2021 at 13:42, 张蛟 wrote:

> Hi, Nico and Yun:
>Thanks for your great work and the detailed description of the RocksDB
> version upgrade and its performance. About 800 jobs are using the RocksDB
> state backend in our production environment, and we plan to upgrade more,
> aiming to solve the GC problems caused by large states. Because RocksDB's
> memory control is not strict, we have had to spend a lot of time dealing
> with memory usage exceeding physical memory. With support for a strict
> block cache, things will become much easier. Also, the delete range API is
> useful for us, so we prefer to upgrade RocksDB to the new release version:
> +1 (non-binding).
>
> best,
> zlzhang0122
>
> At 2021-08-05 01:50:07, "Yun Tang"  wrote:
> >Hi Yuval,
> >
> >Upgrading the RocksDB version has been a long story since Flink-1.10.
> >When we first planned to introduce the write buffer manager to help control
> the memory usage of RocksDB, we actually wanted to bump to RocksDB-5.18 from
> the then-current RocksDB-5.17. However, we found a performance regression in
> our micro benchmark on state operations [1] when bumping to RocksDB-5.18. We
> did not figure out the root cause at that time and decided to cherry-pick the
> write buffer manager commits to our own FRocksDB [2]. And we finally released
> our own frocksdbjni-5.17.2-artisans-2.0 at that time.
> >
> >As time goes on, more and more bugs or missing features have been reported
> in the old RocksDB version. Such as:
> >
> >  1.  Cannot support ARM platform [3]
> >  2.  Does not have a stable deleteRange API, which is useful for Flink
> scale out [4]
> >  3.  Cannot support strict block cache [5]
> >  4.  Checkpoint might get stuck if using UNIVERSAL compaction strategy [6]
> >  5.  Uncontrolled log size made us disable the RocksDB internal LOG [7]
> >  6.  RocksDB's optimizeForPointLookup option might cause data loss [8]
> >  7.  The dummy entry used for memory control in RocksDB-5.17 is too
> large, leading to performance problems [9]
> >  8.  Cannot support alpine-based images.
> >  9.  ...
> >
> >Some of the bugs have been worked around, and some are still open.
> >
> >And we decided to make some changes from Flink-1.12. First of all, we
> reported the performance regression between RocksDB-5.18 and
> RocksDB-5.17 to the RocksDB community [10]. However, as the RocksDB-5.x
> versions are a bit old for the community, and RocksJava usage might not be
> the core part for the facebook guys, we did not get useful replies. Thus, we
> decided to figure out the root cause of the performance regression ourselves.
> >Fortunately, we found the cause via a binary search over the commits between
> RocksDB-5.18 and RocksDB-5.17, and updated the original thread [10]. In
> short, the performance regression is due to the different implementations of
> `__thread` and `thread_local` in gcc and has more impact under dynamic
> loading [11], which is also what the current RocksJava jar package does. With
> my patch [12], the performance regression disappears when comparing
> RocksDB-5.18 with RocksDB-5.17.
> >
> >Unfortunately, RocksDB-5.18 still has many bugs and we wanted to bump to
> RocksDB-6.x. However, another performance regression appeared even with my
> patch [12]. With the previous experience, we knew that we must verify the
> built .so files with our java-based benchmark instead of using RocksDB's
> built-in db-bench. I started to search the 1340+ commits from RocksDB-5.18 to
> RocksDB-6.11 to find the performance problem. However, this time I did not
> figure out the root cause even after spending several weeks. The performance
> goes up and down across those commits and I could not pin down the commit
> that led to the performance regression. Take the commit integrating the block
> cache tracer into the block-based table reader [13] for example: I noticed
> that it caused a slight performance regression, possibly from the
> useless usage accounting in operations; however, the problematic code was
> changed in later commits. Thus, after several weeks of digging, I had to give
> up the endless search through the thousand-plus commits for now. As the
> RocksDB community does not seem to make its project management system public,
> unlike Apache's open JIRA systems, we do not know what benchmarks they
> actually run before releasing each version to guarantee performance.
> >
> >With my patch [10] on the latest RocksDB-6.20.3, we could get the results on
> nexmark in the original thread sent by Stephan, and we can see that the
> performance is close in many real-world cases. And we also hope new
> features, such as direct buffer support [14] in RocksJava 

Exploring Flink for an HTTP delivery service.

2021-08-14 Thread Prasanna kumar
Hi,

Aim: Building an event delivery service
Scale: Peak load 50k messages/sec. Average load 5k messages/sec. Expected to
grow every passing month.
Unique customer endpoints: 10k+
Unique events (kafka topics): 500+
Unique tenants: 30k+
Subscription level: Events are generated for tenants. Customers may subscribe
a) entirely to an event, or b) at tenant level (5 tenants or 100 tenants), or
c) even at sub-tenant level (Tenant 2, Dept 100, 200, 300).
*Other Requirements *:
1) Batching events based on quantity or a minimum threshold time, whichever
comes first. Example: 1000 messages or 1 sec.
2) Message size < 1kb
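Requirement 1 above (flush on 1000 messages or 1 second, whichever comes first) can be sketched in plain Python; this is the trigger logic that a custom Flink Trigger or a KeyedProcessFunction with a timer would implement, not Flink API code:

```python
import time

class Batcher:
    """Flush when max_count messages accumulate or max_age seconds pass."""
    def __init__(self, max_count=1000, max_age=1.0):
        self.max_count, self.max_age = max_count, max_age
        self.buf, self.first_ts = [], None

    def add(self, msg, now=None):
        # 'now' is injectable for testing; defaults to the monotonic clock.
        now = time.monotonic() if now is None else now
        if self.first_ts is None:
            self.first_ts = now          # age is measured from the first message
        self.buf.append(msg)
        if len(self.buf) >= self.max_count or now - self.first_ts >= self.max_age:
            return self.flush()
        return None                      # batch not ready yet

    def flush(self):
        batch, self.buf, self.first_ts = self.buf, [], None
        return batch

b = Batcher(max_count=3, max_age=60.0)
assert b.add("m1", now=0.0) is None
assert b.add("m2", now=0.1) is None
print(b.add("m3", now=0.2))  # count trigger fires: ['m1', 'm2', 'm3']
```

In a real job the time trigger would be a registered timer rather than being checked on arrival, so a half-full batch still flushes when no new messages arrive.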

*Possible Solutions:*

1) Build an app using reactive programming say vert.x/spring reactive etc
2) Use apache flink

*Flink Solution *
RDS : Has the subscription connection details

[image: Flink HTTP Publisher.png]

2a) Use DB and Lookup Cache to retrieve configurations

(i)   Stream the data from kafka.
(ii)  For every message flowing in, query RDS (postgres), get the
connection/subscription details, and apply filters. [Use a lookup cache to
improve performance.]
(iii a) If it's a streaming customer, form the message with appropriate
authentication details.
(iii b) If it's a batch customer, push the message to the state backend.
Once the maximum message count or minimum threshold batch time is reached,
retrieve the messages and form a single batch message with appropriate
authentication details.
(iv) Send message and endpoint info to an async sink, which delivers to
customers. In case of failure, write to a dead letter queue that customers
can poll later.


2b) Load configurations into broadcast state and update it at a regular
interval

(i) Stream the data from kafka.
(ii) Periodically query the proxy API (on top of RDS) to get the latest
added/updated subscription/connection details.
(iii) For every message flowing in from kafka, check against the
broadcast configuration to find the customers subscribed to the event,
their filter requirements, and connection details.
(iv a) If it's a streaming customer, form the message with appropriate
authentication details.
(iv b) If it's a batch customer, push the message to the state backend. Once
the maximum message count or minimum threshold batch time is reached, retrieve
the messages and form a single batch message with appropriate
authentication details.
(v) Send message and endpoint info to an async sink, which delivers to
customers. In case of failure, write to a dead letter queue that customers
can poll later.

*Questions : *
1) Batching is an aggregation operation, but the windowing examples I have
seen compute a count/max/min over the particular window. So could the
batching be implemented via a windowing mechanism?

2) Is it a good design to have both batched delivery and per-event delivery
in the same job, or should they be separate jobs?

3) Is the performance of broadcast state better than that of a lookup cache?
(Personally I have implemented broadcast state for another purpose and am not
sure about the performance of querying the DB plus a lookup cache.)
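For context on the lookup-cache side of that question: a lookup cache is essentially a per-operator map with a TTL, trading freshness and hit rate against DB load. A minimal sketch in plain Python (`query_db` is a hypothetical stand-in for the RDS lookup, not a real API):

```python
import time

class TtlCache:
    def __init__(self, ttl_seconds, loader):
        self.ttl, self.loader = ttl_seconds, loader
        self.store = {}              # key -> (value, expiry timestamp)
        self.hits = self.misses = 0

    def get(self, key, now=None):
        now = time.monotonic() if now is None else now
        entry = self.store.get(key)
        if entry is not None and entry[1] > now:
            self.hits += 1
            return entry[0]          # fresh: served from memory
        self.misses += 1
        value = self.loader(key)     # stale or absent: fall back to the DB
        self.store[key] = (value, now + self.ttl)
        return value

def query_db(key):                   # hypothetical RDS lookup
    return {"subscription": key.upper()}

cache = TtlCache(ttl_seconds=300, loader=query_db)
cache.get("tenant-1", now=0.0)       # miss, hits the DB
cache.get("tenant-1", now=10.0)      # hit, served from memory
cache.get("tenant-1", now=400.0)     # TTL expired, hits the DB again
print(cache.hits, cache.misses)      # 1 2
```

Broadcast state avoids the per-miss DB round trip entirely, at the cost of holding the full configuration on the heap of every parallel instance.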

4) I read this:
" The working copy of broadcast state is always on the heap; not in
RocksDB. So, it has to be small enough to fit in memory. Furthermore, each
instance will copy all of the broadcast state into its checkpoints, so all
checkpoints and savepoints will have *n* copies of the broadcast state
(where *n* is the parallelism).
If you are able to key partition this data, then you might not need to
broadcast it. It sounds like it might be per-employee data that could be
keyed by the employeeId. But if not, then you'll have to keep it small
enough to fit into memory. "

Using a keyed process broadcast looks better than a non-keyed one, since the
same data is not replicated across all the parallel operators.
A caveat here is that the load across subscriptions is not uniform, so if we
key the stream we might end up with an unbalanced job.
Thoughts on this?
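On the skew caveat above: one common mitigation (a generic sketch, not a Flink API) is salting known-hot keys so one heavy subscription spreads over several parallel subtasks, at the cost of duplicating that key's configuration into each salted partition:

```python
import random

HOT_KEYS = {"big-subscription"}  # hypothetical known-hot subscription
NUM_SALTS = 4

def partition_key(key):
    # Spread hot keys across NUM_SALTS sub-keys; cold keys are unchanged,
    # so their state stays in a single partition.
    if key in HOT_KEYS:
        return f"{key}#{random.randrange(NUM_SALTS)}"
    return key

keys = [partition_key("big-subscription") for _ in range(1000)]
print(len(set(keys)))  # the hot key now maps to several sub-keys
```

Downstream, results for the salted sub-keys must be merged (or each sub-key must deliver independently), which is why salting is usually reserved for the few keys that actually dominate the load.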

5) Latency must be minimal, so the first thought is to store the messages to
be batched in the HashMapStateBackend.
But keeping both the config state and the data on the heap might increase
memory usage a lot, since there could be huge spikes in load. Are there any
other things that need to be factored in?

6) Auto-scaling capability would save a lot of cost, given the consistent
load patterns with occasional spikes. Though reactive scaling was introduced
in flink 1.13, we don't know whether it is battle-hardened.

7) After looking at the solutions, does flink seem to be a natural fit for
this use case in comparison to the Spring Reactor framework/vert.x?
One thing we see from the documentation is that spring reactive can auto
scale very well, but we need to handle fault tolerance/stability on the
dev side, which flink is good at.

Spring reactor is new to us and we wanted to 

Running Beam on a native Kubernetes Flink cluster

2021-08-14 Thread Gorjan Todorovski
Hi!

I need help implementing a native Kubernetes Flink cluster that needs to
run batch jobs (run by TensorFlow Extended), but I am not sure I am
configuring it right as I have issues running jobs on more than one task
manager, while jobs run fine if there is only one TM.

I use the following parameters for the job:

"--runner=FlinkRunner",
"--parallelism=4",
f"--flink_master={flink_url}:8081",
"--environment_type=EXTERNAL",
f"--environment_config={beam_sdk_url}:5",
"--flink_submit_uber_jar",
"--worker_harness_container_image=none",


I have configured the Beam workers to run as side-cars to the TM
containers. I do this by configuring a task manager template for the pods
like this:

kubernetes.pod-template-file.taskmanager

It points to a template file with the following contents:

kind: Pod
metadata:
  name: taskmanager-pod-template
spec:
  #hostNetwork: true
  containers:
    - name: flink-main-container
      #image: apache/flink:scala_2.12
      env:
        - name: AWS_REGION
          value: "eu-central-1"
        - name: S3_VERIFY_SSL
          value: "0"
        - name: PYTHONPATH
          value: "/data/flink/src"
      args: ["taskmanager"]
      ports:
        - containerPort: 6122 #22
          name: rpc
        - containerPort: 6125
          name: query-state
      livenessProbe:
        tcpSocket:
          port: 6122 #22
        initialDelaySeconds: 30
        periodSeconds: 60
    - name: beam-worker-pool
      env:
        - name: PYTHONPATH
          value: "/data/flink/src"
        - name: AWS_REGION
          value: "eu-central-1"
        - name: S3_VERIFY_SSL
          value: "0"
      image: 848221505146.dkr.ecr.eu-central-1.amazonaws.com/flink-workers
      imagePullPolicy: Always
      args: ["--worker_pool"]
      ports:
        - containerPort: 5
          name: pool
      livenessProbe:
        tcpSocket:
          port: 5
        initialDelaySeconds: 30
        periodSeconds: 60

I have also created a kubernetes load balancer for the task managers, so
clients can connect on port 5. So I use that address when configuring:

f"--environment_config={beam_sdk_url}:5",

The problem is that the Beam SDK harness on one task manager apparently
wants to connect to the endpoint running on the other task manager, but
looks for it on localhost:

Log from beam-worker-pool on TM 2:

2021/08/11 09:43:16 Failed to obtain provisioning information: failed
to dial server at localhost:33705
caused by:
context deadline exceeded

The provision endpoint on TM 1 is the one actually listening on port
33705, while TM 2 looks for it on localhost, so it cannot connect.

Showing how I test this:

...

TM 1:

$ kubectl logs my-first-flink-cluster-taskmanager-1-1 -c beam-worker-pool
2021/08/12 09:10:34 Starting worker pool 1: python -m
apache_beam.runners.worker.worker_pool_main --service_port=5
--container_executable=/opt/apache/beam/boot
Starting worker with command ['/opt/apache/beam/boot', '--id=1-1',
'--logging_endpoint=localhost:33383',
'--artifact_endpoint=localhost:43477',
'--provision_endpoint=localhost:40983',
'--control_endpoint=localhost:34793']
2021/08/12 09:13:05 Failed to obtain provisioning information: failed
to dial server at localhost:40983
caused by:
context deadline exceeded

TM 2:
=
$ kubectl logs my-first-flink-cluster-taskmanager-1-2 -c beam-worker-pool
2021/08/12 09:10:33 Starting worker pool 1: python -m
apache_beam.runners.worker.worker_pool_main --service_port=5
--container_executable=/opt/apache/beam/boot
Starting worker with command ['/opt/apache/beam/boot', '--id=1-1',
'--logging_endpoint=localhost:40497',
'--artifact_endpoint=localhost:36245',
'--provision_endpoint=localhost:32907',
'--control_endpoint=localhost:46083']
2021/08/12 09:13:09 Failed to obtain provisioning information: failed
to dial server at localhost:32907
caused by:
context deadline exceeded

Testing:
.

TM 1:

$ kubectl exec -it my-first-flink-cluster-taskmanager-1-1 -c
beam-worker-pool -- bash
root@my-first-flink-cluster-taskmanager-1-1:/# curl localhost:40983
curl: (7) Failed to connect to localhost port 40983: Connection refused

root@my-first-flink-cluster-taskmanager-1-1:/# curl localhost:32907
Warning: Binary output can mess up your terminal. Use "--output -" to ...


TM 2:
=
root@my-first-flink-cluster-taskmanager-1-2:/# curl localhost:32907
curl: (7) Failed to connect to localhost port 32907: Connection refused

root@my-first-flink-cluster-taskmanager-1-2:/# curl localhost:40983
Warning: Binary output can mess up your terminal. Use "--output -" to tell
Warning: curl to output it to your terminal anyway, or consider "--output

Not sure how to fix this.

Thanks, Gorjan


Problems with reading ORC files with S3 filesystem

2021-08-14 Thread Piotr Jagielski
Hi,
I want to use the Flink SQL filesystem connector to read ORC files via the S3
filesystem on Flink 1.13. My table definition looks like this:

create or replace table xxx 
 (..., startdate string)
 partitioned by (startdate) with ('connector'='filesystem', 'format'='orc', 
'path'='s3://xxx/orc/yyy')

I followed Flink's S3 guide and installed the S3 libs as a plugin. I have MinIO
as the S3 provider and it works for Flink's checkpoints and HA files.
The SQL connector also works when I use CSV or Avro formats. The problems start
with ORC.

1. If I just put flink-orc on the job's classpath, I get this error on the JobManager:
Caused by: java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration
at 
org.apache.flink.orc.OrcFileFormatFactory$1.createRuntimeDecoder(OrcFileFormatFactory.java:121)
 ~[?:?]
at 
org.apache.flink.orc.OrcFileFormatFactory$1.createRuntimeDecoder(OrcFileFormatFactory.java:88)
 ~[?:?]
at 
org.apache.flink.table.filesystem.FileSystemTableSource.getScanRuntimeProvider(FileSystemTableSource.java:118)
 ~[flink-table-blink_2.12-1.13.2.jar:1.13.2]

2. I managed to put the hadoop common libs on the classpath with this maven setup:


<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-orc_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.orc</groupId>
            <artifactId>orc-core</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.orc</groupId>
    <artifactId>orc-core</artifactId>
    <version>1.5.6</version>
</dependency>
<dependency>
    <groupId>org.apache.orc</groupId>
    <artifactId>orc-shims</artifactId>
    <version>1.5.6</version>
</dependency>
<dependency>
    <groupId>net.java.dev.jets3t</groupId>
    <artifactId>jets3t</artifactId>
    <version>0.9.0</version>
</dependency>

Now the job is accepted by the JobManager, but execution fails due to missing
AWS credentials:
Caused by: java.lang.IllegalArgumentException: AWS Access Key ID and Secret 
Access Key must be specified as the username or password (respectively) of a s3 
URL, or by setting the fs.s3.awsAccessKeyId or fs.s3.awsSecretAccessKey 
properties (respectively).
at 
org.apache.hadoop.fs.s3.S3Credentials.initialize(S3Credentials.java:70)
at 
org.apache.hadoop.fs.s3.Jets3tFileSystemStore.initialize(Jets3tFileSystemStore.java:92)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown 
Source)
at java.base/java.lang.reflect.Method.invoke(Unknown Source)
at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
at com.sun.proxy.$Proxy76.initialize(Unknown Source)
at org.apache.hadoop.fs.s3.S3FileSystem.initialize(S3FileSystem.java:92)
at 
org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2433)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:88)
at 
org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2467)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2449)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:367)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:287)
at org.apache.orc.impl.ReaderImpl.getFileSystem(ReaderImpl.java:395)
at org.apache.orc.impl.ReaderImpl.&lt;init&gt;(ReaderImpl.java:368)
at org.apache.orc.OrcFile.createReader(OrcFile.java:343)

I guess that the ORC reader tries to recreate the s3 filesystem in the job's
classloader and cannot use the credentials from flink-conf.yaml. However, I can
see in the logs that it earlier managed to list the files on MinIO:

2021-08-14 09:35:48,285 INFO  
org.apache.flink.connector.file.src.assigners.LocalityAwareSplitAssigner [] - 
Assigning remote split to requesting host '172': Optional[FileSourceSplit: 
s3://xxx/orc/yyy/startdate=2021-08-10/3cf3afae-1050-4591-a5af-98d231879687.orc 
[0, 144607)  hosts=[localhost] ID=02 position=null]


So I think the issue is in the ORC reader when it tries to read a specific file.
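For reference, Flink's S3 filesystem plugins read credentials from flink-conf.yaml with keys like the sketch below (the MinIO endpoint value is an assumed placeholder). The stack trace above, however, shows a legacy Hadoop `S3FileSystem` created outside the plugin classloader, and that path only reads the Hadoop-style `fs.s3.awsAccessKeyId`/`fs.s3.awsSecretAccessKey` properties named in the error, which is consistent with the plugin configuration being ignored.

```yaml
# flink-conf.yaml (sketch; endpoint value is an assumed placeholder)
s3.endpoint: http://minio:9000
s3.path.style.access: true
s3.access-key: <access-key>
s3.secret-key: <secret-key>
```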

Any ideas how I can modify the setup or pass the credentials to Jets3t?

Regards,
Piotr



Fwd: PyFlink performance and deployment issues

2021-08-14 Thread Wouter Zorgdrager
Hi all,

I'm still dealing with the PyFlink deployment issue described below. I
see that I accidentally didn't forward it to the mailing list. Anyway, my
job is stuck in `Initializing` and the logs don't really give me a clue
as to what is going on.
In my IDE it runs fine. The command I use to submit to the cluster:

export
PYFLINK_CLIENT_EXECUTABLE=~/Documents/stateflow-evaluation/venv/bin/python

./flink run \
  --target remote \
  -m localhost:8081 \
  -pyarch venv.zip \
  --pyExecutable venv.zip/venv/bin/python \
  --parallelism 1 \
  --python ~/Documents/stateflow-evaluation/pyflink_runtime.py \
  --jarfile ~/Documents/stateflow-evaluation/benchmark/bin/combined.jar

I hope someone can help me with this because it is a blocker for me.

Thanks in advance,
Wouter
-- Forwarded message -
From: Wouter Zorgdrager 
Date: Thu, 8 Jul 2021 at 12:20
Subject: Re: PyFlink performance and deployment issues
To: Xingbo Huang 


Hi Xingbo, all,

Regarding point 2, I actually made a mistake there. I picked port 8081
(the WebUI port) rather than the job submission port (--target remote -m
localhost:8081). For some reason, this does not give an error or warning
and just starts a local cluster. However, now I have another issue: my job
is stuck at initialization. Here is an excerpt from the JM log:

2021-07-08 12:12:18,094 INFO org.apache.flink.runtime.executiongraph.
ExecutionGraph [] - Deploying _stream_key_by_map_operator (1/1) (attempt #0)
with attempt id ca9abcc644c05f62a47b83f391c85cd9 to 127.0.1.1:38179-f09c77 @
wouter (dataPort=40987) with allocation id d6b810455e97d0a952fb825ccec27c45
2021-07-08 12:12:18,097 INFO org.apache.flink.runtime.executiongraph.
ExecutionGraph [] - Process-Stateful-User (1/1)
(f40fac621cb94c79cdb82146ae5521bb) switched from SCHEDULED to DEPLOYING.
2021-07-08 12:12:18,097 INFO org.apache.flink.runtime.executiongraph.
ExecutionGraph [] - Deploying Process-Stateful-User (1/1) (attempt #0) with
attempt id f40fac621cb94c79cdb82146ae5521bb to 127.0.1.1:38179-f09c77 @
wouter (dataPort=40987) with allocation id d6b810455e97d0a952fb825ccec27c45
2021-07-08 12:12:18,098 INFO org.apache.flink.runtime.executiongraph.
ExecutionGraph [] - Map-Egress -> (Filter -> Kafka-To-Client -> Sink:
Unnamed, Filter -> Kafka-To-Internal -> Sink: Unnamed) (1/1)
(58deef879a00052ba6b3447917005c35)
switched from SCHEDULED to DEPLOYING.
2021-07-08 12:12:18,098 INFO org.apache.flink.runtime.executiongraph.
ExecutionGraph [] - Deploying Map-Egress -> (Filter -> Kafka-To-Client ->
Sink: Unnamed, Filter -> Kafka-To-Internal -> Sink: Unnamed) (1/1) (attempt
#0) with attempt id 58deef879a00052ba6b3447917005c35 to 127.0.1.1:38179-f09c77
@ wouter (dataPort=40987) with allocation id
d6b810455e97d0a952fb825ccec27c45
2021-07-08 12:12:18,484 INFO org.apache.flink.runtime.executiongraph.
ExecutionGraph [] - Process-Stateful-User (1/1)
(f40fac621cb94c79cdb82146ae5521bb) switched from DEPLOYING to INITIALIZING.
2021-07-08 12:12:18,488 INFO org.apache.flink.runtime.executiongraph.
ExecutionGraph [] - _stream_key_by_map_operator (1/1)
(ca9abcc644c05f62a47b83f391c85cd9) switched from DEPLOYING to INITIALIZING.
2021-07-08 12:12:18,489 INFO org.apache.flink.runtime.executiongraph.
ExecutionGraph [] - Map-Egress -> (Filter -> Kafka-To-Client -> Sink:
Unnamed, Filter -> Kafka-To-Internal -> Sink: Unnamed) (1/1)
(58deef879a00052ba6b3447917005c35)
switched from DEPLOYING to INITIALIZING.
2021-07-08 12:12:18,490 INFO org.apache.flink.runtime.executiongraph.
ExecutionGraph [] - Source: Custom Source -> Route-Incoming-Events -> (
Filter-On-User -> Map -> (Filter-Init-User -> Init-User, Filter-Stateful-
User), Filter -> Map) (1/1) (c48649bd76abaf77486104e8cfcee7d8) switched from
DEPLOYING to INITIALIZING.

I run with parallelism 1 and these are the latest log lines from the TM
(there is no obvious error):
2021-07-08 12:12:18,729 INFO org.apache.flink.streaming.api.operators.
AbstractStreamOperator [] - The maximum bundle size is configured to 5.
2021-07-08 12:12:18,729 INFO org.apache.flink.streaming.api.operators.
AbstractStreamOperator [] - The maximum bundle time is configured to 1
milliseconds.
2021-07-08 12:12:18,791 WARN
org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser
[] - Error while loading kafka-version.properties: inStream parameter is
null
2021-07-08 12:12:18,792 INFO
org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser
[] - Kafka version: unknown
2021-07-08 12:12:18,792 INFO
org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser
[] - Kafka commitId: unknown
2021-07-08 12:12:18,792 INFO
org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser
[] - Kafka startTimeMs: 1625739138789
2021-07-08 12:12:18,806 INFO org.apache.flink.streaming.connectors.kafka.
FlinkKafkaProducer [] - Starting FlinkKafkaInternalProducer (1/1) to
produce into default topic client_reply
2021-07-08 12:12:18,815 INFO org.apache.flink.streaming.api.operators.