Hi Experts,
I have my flink application running on Kubernetes, initially with 1 Job
Manager, and 2 Task Managers.

Then we have the custom operator that watches for the CRD, when the CRD
replicas changed, it will patch the Flink Job Manager deployment
parallelism and max parallelism according to the replicas from CRD
(parallelism can be configured via env variables for our application).
which causes the job manager restart. hence a new Flink job. But the
consumer group does not change, so it will continue from the offset
where it left.

In addition, operator will also update Task Manager's deployment replicas,
and will adjust the pod number.

In case of scale up, the existing task manager pods do not get killed, but
new task manager pods will be created.

And we observed a skew in the partition offset consumed. e.g. some
partitions have huge lags and other partitions have small lags. (observed
from burrow)

This is also validated by the metrics from Flink UI, showing the throughput
differs for slotss

Any clue why this is the case?

Thanks a lot!
