Re: Deployment/Memory Configuration/Scalability

2021-04-26 Thread Radoslav Smilyanov
Hi Yangze Guo,

Thanks for your reply.

> > 1. Is it a good idea to have regular savepoints (say on a daily basis)?
> > 2. Is it possible to have high availability with Per-Job mode? Or maybe
> > I should go with session mode and make sure that my flink cluster is
> > running a single job?
> Yes, we can achieve HA with per-job mode with ZooKeeper[1]. Looking at
> your configuration, you also need to enable checkpointing[2], which
> is triggered automatically and helps the program resume after a
> failure, by setting execution.checkpointing.interval.


I forgot to add the checkpoint configuration since it's part of a custom
job configuration which is mounted in each pod. So checkpoints are enabled.
:)
That's also why a savepoint is triggered on a daily basis: the existing
deployment setup has only a single Job Manager.
I will take a look at the k8s or ZooKeeper HA options.
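
For reference, this is roughly how I expect the relevant flink-conf.yaml
entries to look (just a sketch from reading the docs, not something I have
tested yet; the cluster id and the HA storage path are placeholders, and
the Kubernetes option assumes I upgrade to 1.12.x first):

# checkpointing (already enabled via our mounted job configuration)
execution.checkpointing.interval: 10min

# option A: Kubernetes HA services (Flink 1.12+)
kubernetes.cluster-id: my-enrichment-job
high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.storageDir: s3://bucket/ha

# option B: ZooKeeper HA
# high-availability: zookeeper
# high-availability.zookeeper.quorum: zk-1:2181,zk-2:2181,zk-3:2181
# high-availability.storageDir: s3://bucket/ha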

> > 3. Let's assume that savepoints should be triggered only before job
> > update/deployment. How can I trigger a savepoint if my job is already
> > consuming more than 80% of the allowed memory per pod in k8s? My
> > observations show that k8s kills task managers (which are running as pods)
> > and I need to retry it a couple of times.
> I think with checkpointing enabled you no longer need to trigger a
> savepoint manually under a specific condition, as checkpoints will be
> triggered periodically.


Checkpoints are already enabled (once every 10 minutes). Once HA is
set up correctly I think savepoints can be used only when the job
needs to be updated.
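
Concretely, before a deploy I would trigger the savepoint from the CLI,
roughly like this (the job id and target path are placeholders; flags as
I read them in the 1.11 CLI docs):

# take a savepoint while the job keeps running
./bin/flink savepoint <job-id> s3://bucket/savepoints

# or stop the job gracefully and take a savepoint on the way out
./bin/flink stop --savepointPath s3://bucket/savepoints <job-id>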

> > 6. How do I decide when the job parallelism should be increased? Are
> > there some metrics which can lead me to a clue that the parallelism should
> > be increased?
> As there are 6 Kafka sources in your job, I think the parallelism
> should first be aligned with the topic partition counts. For metrics,
> you could look at the backpressure of tasks and
> numRecordsOutPerSecond[5].


Currently I am using a parallelism equal to the highest number of
Kafka topic partitions. Unfortunately some of the topics have a higher load
than others, so some of them have 1 partition while others have 4
partitions (for example).
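
One option I am considering is setting the parallelism per Kafka source
instead of globally, roughly along these lines (topic names, partition
counts, the schema and the trailing print() are placeholders standing in
for the real enrichment pipeline):

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class PerSourceParallelismSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092"); // placeholder
        props.setProperty("group.id", "enrichment-job");      // placeholder

        // Low-volume topic with a single partition: one source subtask is enough.
        env.addSource(new FlinkKafkaConsumer<>("small-topic", new SimpleStringSchema(), props))
                .setParallelism(1)
                .name("small-topic-source")
                .print(); // stand-in for the real joins/sinks

        // High-volume topic with four partitions: match the partition count.
        env.addSource(new FlinkKafkaConsumer<>("large-topic", new SimpleStringSchema(), props))
                .setParallelism(4)
                .name("large-topic-source")
                .print(); // stand-in for the real joins/sinks

        env.execute("per-source-parallelism-sketch");
    }
}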

Thanks,
Rado

On Tue, Apr 27, 2021 at 7:50 AM Yangze Guo wrote:

> Hi, Radoslav,
>
> > 1. Is it a good idea to have regular savepoints (say on a daily basis)?
> > 2. Is it possible to have high availability with Per-Job mode? Or maybe
> I should go with session mode and make sure that my flink cluster is
> running a single job?
>
> Yes, we can achieve HA with per-job mode with ZooKeeper[1]. Looking at
> your configuration, you also need to enable checkpointing[2], which
> is triggered automatically and helps the program resume after a
> failure, by setting execution.checkpointing.interval.
>
> > 3. Let's assume that savepoints should be triggered only before job
> update/deployment. How can I trigger a savepoint if my job is already
> consuming more than 80% of the allowed memory per pod in k8s? My
> observations show that k8s kills task managers (which are running as pods)
> and I need to retry it a couple of times.
>
> I think with checkpointing enabled you no longer need to trigger a
> savepoint manually under a specific condition, as checkpoints will be
> triggered periodically.
>
> > 4. Should I consider upgrading to version 1.12.3?
> > 5. Should I consider switching off state.backend.rocksdb.memory.managed
> property even in version 1.12.3?
>
> I'm not an expert on the state backend, but it seems the fix for that
> issue is only applied to the Docker image. So I guess you can package
> a custom image yourself if you do not want to upgrade. However, if
> you are using the Native K8S mode[3] and there is no compatibility
> issue, I think it might be good to upgrade because there are also
> lots of improvements[4] in 1.12.
>
> > 6. How do I decide when the job parallelism should be increased? Are
> there some metrics which can lead me to a clue that the parallelism should
> be increased?
>
> As there are 6 Kafka sources in your job, I think the parallelism
> should first be aligned with the topic partition counts. For metrics,
> you could look at the backpressure of tasks and
> numRecordsOutPerSecond[5].
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/ha/zookeeper_ha/
> [2]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/checkpoints.html
> [3]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/native_kubernetes.html
> [4] https://issues.apache.org/jira/browse/FLINK-17709
> [5]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/metrics.html#io
>
> Best,
> Yangze Guo
>

Deployment/Memory Configuration/Scalability

2021-04-26 Thread Radoslav Smilyanov
Hi all,

I have multiple questions regarding Flink :) Let me give you some
background on what I have done so far.

*Description*
I am using Flink 1.11.2. My job does data enrichment. Data is consumed
from 6 different Kafka topics and joined via multiple
CoProcessFunctions. On a daily basis the job handles ~20 million
events from the source Kafka topics.

*Configuration*
These are the settings I am using:

jobmanager.memory.process.size: 4096m
jobmanager.memory.off-heap.size: 512m
taskmanager.memory.process.size: 12000m
taskmanager.memory.task.off-heap.size: 512m
taskmanager.numberOfTaskSlots: 1
parallelism.default: 5
taskmanager.rpc.port: 6122
jobmanager.execution.failover-strategy: region
state.backend: rocksdb
state.backend.incremental: true
state.backend.rocksdb.localdir: /opt/flink/rocksdb
state.backend.rocksdb.memory.managed: true
state.backend.rocksdb.predefined-options: FLASH_SSD_OPTIMIZED
state.backend.rocksdb.block.cache-size: 64mb
state.checkpoints.dir: s3://bucket/checkpoints
state.savepoints.dir: s3://bucket/savepoints
s3.access-key: AWS_ACCESS_KEY_ID
s3.secret-key: AWS_SECRET_ACCESS_KEY
s3.endpoint: http://
s3.path.style.access: true
s3.entropy.key: _entropy_
s3.entropy.length: 8
presto.s3.socket-timeout: 10m
client.timeout: 60min

*Deployment setup*
Flink is deployed in k8s with Per-Job mode having 1 job manager and 5 task
managers. I have a daily cron job which triggers savepoint in order to have
a fresh copy of the whole state.

*Problems with the existing setup*
1. I observe that savepoints cause Flink to consume more than the allowed
memory. I see the behavior described in this Stackoverflow post (which
seems to be solved in 1.12.x if I am getting it right).
2. I cannot achieve high availability with Per-Job mode and thus I ended up
having a regular savepoint on a daily basis.

*Questions*
1. Is it a good idea to have regular savepoints (say on a daily basis)?
2. Is it possible to have high availability with Per-Job mode? Or maybe I
should go with session mode and make sure that my flink cluster is running
a single job?
3. Let's assume that savepoints should be triggered only before job
update/deployment. How can I trigger a savepoint if my job is already
consuming more than 80% of the allowed memory per pod in k8s? My
observations show that k8s kills task managers (which are running as pods)
and I need to retry it a couple of times.
4. Should I consider upgrading to version 1.12.3?
5. Should I consider switching off state.backend.rocksdb.memory.managed
property even in version 1.12.3?
6. How do I decide when the job parallelism should be increased? Are there
some metrics which can lead me to a clue that the parallelism should be
increased?

Best Regards,
Rado


savepoint failure

2020-10-21 Thread Radoslav Smilyanov
Hello all,

I am running a Flink job that performs data enrichment. My job has 7 Kafka
consumers that receive messages for DML statements performed on 7 DB
tables.

Job setup:

   - Flink is run in k8s in a similar way as described here.
   - 1 job manager and 2 task managers
   - parallelism is set to 4, with 2 task slots per task manager
   - rocksdb as state backend
   - protobuf for serialization

Whenever I try to trigger a savepoint after my state is bootstrapped, I get
the following error for different operators:

Caused by: java.lang.IllegalArgumentException: Key group 0 is not in
KeyGroupRange{startKeyGroup=32, endKeyGroup=63}.
at
org.apache.flink.runtime.state.KeyGroupRangeOffsets.computeKeyGroupIndex(KeyGroupRangeOffsets.java:142)
at
org.apache.flink.runtime.state.KeyGroupRangeOffsets.setKeyGroupOffset(KeyGroupRangeOffsets.java:104)
at
org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeKVStateData(RocksFullSnapshotStrategy.java:319)
at
org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeSnapshotToOutputStream(RocksFullSnapshotStrategy.java:261)

Note: key group might vary.

I found this article on Stackoverflow which relates to such an exception
(btw my job graph looks similar to the one described in the article except
that my job has more joins). I double-checked my hashCode implementations
and I think they are fine.
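
For context, the keys I use in keyBy are small POJO-style classes along
these lines (the class and field names here are made up), with equals and
hashCode derived only from the fields themselves, which is why I believe
the hash codes are deterministic across task managers:

import java.util.Objects;

// Sketch of one of the key classes (names are invented).
// hashCode/equals depend only on the immutable fields, so the same key
// hashes to the same key group on every task manager and every call.
public final class TableRowKey {
    private final String table;
    private final long rowId;

    public TableRowKey(String table, long rowId) {
        this.table = table;
        this.rowId = rowId;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof TableRowKey)) return false;
        TableRowKey other = (TableRowKey) o;
        return rowId == other.rowId && Objects.equals(table, other.table);
    }

    @Override
    public int hashCode() {
        return Objects.hash(table, rowId);
    }
}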

I tried reducing the parallelism to 1 with 1 task slot per task manager
and this configuration seems to work. This points me towards it possibly
being some concurrency issue.

I would like to understand what is causing the savepoint failure. Do you
have any suggestions about what I might be missing?

Thanks in advance!

Best Regards,
Rado