Re: Parallelism under reactive scaling with slot sharing groups

2023-07-31 Thread Allen Wang
We were using Flink 1.16. I did some research, and it looks like this is fixed
in Flink 1.17 (FLINK-30895 <https://issues.apache.org/jira/browse/FLINK-30895>).



On Thu, Jul 27, 2023 at 9:43 PM Weihua Hu  wrote:

> Hi,
>
> Which Flink version are you using? I couldn't reproduce this issue
> using the master branch.
>
>
> Best,
> Weihua
>
>
> On Tue, Jul 25, 2023 at 2:56 AM Allen Wang  wrote:
>
>> Hello,
>>
>> Our job has operators of source -> sink -> global committer. We have
>> created two slot sharing groups: one for the source and sink, and one for the
>> global committer. The global committer has a specified max parallelism of 1. No
>> max parallelism is set for the source/sink, while the system-level default
>> parallelism is set to the total number of task slots.
>>
>> After we enabled reactive mode scaling, we found that the parallelism for
>> source/sink calculated by the adaptive scheduler is only half of the total
>> task slots. For example, if we have 8 slots, the derived parallelism is 4,
>> meaning source/sink would use only 4 slots, and the global committer uses 1
>> slot. So we have 3 idle slots, which is a waste.
>>
>> Any ideas how we can avoid those idle slots in this setting?
>>
>> Thanks,
>> Allen


Parallelism under reactive scaling with slot sharing groups

2023-07-24 Thread Allen Wang
Hello,

Our job has operators of source -> sink -> global committer. We have
created two slot sharing groups: one for the source and sink, and one for the
global committer. The global committer has a specified max parallelism of 1. No
max parallelism is set for the source/sink, while the system-level default
parallelism is set to the total number of task slots.

After we enabled reactive mode scaling, we found that the parallelism for
source/sink calculated by the adaptive scheduler is only half of the total
task slots. For example, if we have 8 slots, the derived parallelism is 4,
meaning source/sink would use only 4 slots, and the global committer uses 1
slot. So we have 3 idle slots, which is a waste.

Any ideas how we can avoid those idle slots in this setting?

Thanks,
Allen
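
[Editor's note] The numbers above line up with a simple even division of slots
across slot sharing groups. A plain-Java sketch of the apparent calculation (the
formula is inferred from the observed numbers, not taken from the scheduler's
actual code):

```java
public class SlotShareMath {
    // Inferred pre-FLINK-30895 behavior: the adaptive scheduler splits the
    // total slots evenly across slot sharing groups, ignoring that one
    // group is capped at max parallelism 1.
    static int derivedParallelism(int totalSlots, int numSlotSharingGroups) {
        return totalSlots / numSlotSharingGroups;
    }

    public static void main(String[] args) {
        int totalSlots = 8;
        int sourceSink = derivedParallelism(totalSlots, 2); // 4
        int committer = Math.min(sourceSink, 1);            // capped at 1
        int idle = totalSlots - sourceSink - committer;     // 3 idle slots
        System.out.println(sourceSink + "/" + committer + "/" + idle);
    }
}
```

With 8 slots this reproduces the reported 4/1/3 split: 4 slots for source/sink,
1 for the committer, and 3 idle.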


Re: Flink task lifecycle listener/hook/SPI

2022-08-04 Thread Allen Zoo
Thanks a lot!

In our scenario, doing the init in the open function or in a static block is
not as good as expected:
1. It is too late; we expect the init to happen in a task init stage,
meaning it runs even before the open method is called.
2. It is not reusable or convenient for the end user; we have many
functions (e.g. RichFunctions, SourceFunctions, and so on), and we would need
to add init code in every function.

We want the init code to live at the level of the Task, so there is no need
to repeat the initialization for whatever functions are running in the task.
I browsed Flink's source code for version 1.15.1, and I didn't find a
relevant interface that meets our needs in the startup
process (TaskExecutor#submitTask, Task#doRun).
I think Flink has no similar interface available so far; can anyone help
to confirm?

Best,
Allen

On Thu, Aug 4, 2022, 12:15 PM Lijie Wang  wrote:

> Hi Allen,
> From my experience, you can do your init setup in the following 2 ways:
>
> 1. Do your init setup in the RichFunction#open method; see [1] for details.
> 2. Do your init setup in a static block; it will be executed when the class
> is loaded.
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/user_defined_functions/#rich-functions
>
> Best,
> Lijie
>
> Allen Zoo  于2022年8月2日周二 16:11写道:
>
>> Hi all,
>> We want to do some init env setup before the Flink task runs, and we have
>> noticed what the Task Lifecycle | Apache Flink
>> <https://nightlies.apache.org/flink/flink-docs-master/docs/internals/task_lifecycle/>
>> doc
>> describes, but we can't find a listener/hook/SPI interface to do some custom
>> init jobs before the task is running. Does Flink now have relevant interfaces?
>>
>
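
[Editor's note] The timing of the static-block approach suggested above can be
seen in plain Java, with no Flink types (the class and method names here are
made up for illustration): a static initializer runs when the class is first
loaded, i.e. before any instance method, such as an open() stand-in, can run.

```java
public class StaticInitDemo {
    static final long CLASS_LOADED_AT;

    static {
        // Runs once, at class-load time in the task's classloader,
        // before any constructor or method of this class executes.
        CLASS_LOADED_AT = System.nanoTime();
    }

    long openCalledAt;

    // Stand-in for RichFunction#open, which Flink calls before processing.
    void open() {
        openCalledAt = System.nanoTime();
    }

    public static void main(String[] args) {
        StaticInitDemo f = new StaticInitDemo();
        f.open();
        // The static block always ran first:
        System.out.println(CLASS_LOADED_AT < f.openCalledAt); // true
    }
}
```

This is why the static-block option is still "too late" only in the sense that
it is tied to a particular class being loaded, not to a Task-level hook.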


Flink task lifecycle listener/hook/SPI

2022-08-02 Thread Allen Zoo
Hi all,
We want to do some init env setup before the Flink task runs, and we have
noticed what the Task Lifecycle | Apache Flink

doc
describes, but we can't find a listener/hook/SPI interface to do some custom
init jobs before the task is running. Does Flink now have relevant interfaces?


How to clean up RocksDB local directory in K8s statefulset

2022-06-27 Thread Allen Wang
Hi Folks,

We created a stateful job using SessionWindow and RocksDB state backend and
deployed it on Kubernetes Statefulset with persisted volumes. The Flink
version we used is 1.14.

After the job ran for some time, we observed that the size of the local
RocksDB directory started to grow, with more and more directories created
inside it. It seems that when the job or the task manager K8s pod is
restarted, the previous RocksDB directory corresponding to the assigned
operator is not cleaned up. Here is an example:

drwxr-xr-x 3 root root 4096 Jun 27 18:23
job__op_WindowOperator_2b0a50a068bb7f1c8a470e4f763cbf26__1_4__uuid_c97f3f3f-649a-467d-82af-2bc250ec6e22
drwxr-xr-x 3 root root 4096 Jun 27 18:45
job__op_WindowOperator_2b0a50a068bb7f1c8a470e4f763cbf26__1_4__uuid_e4fca2c3-74c7-4aa2-9ca1-dda866b8de11
drwxr-xr-x 3 root root 4096 Jun 27 18:56
job__op_WindowOperator_2b0a50a068bb7f1c8a470e4f763cbf26__2_4__uuid_f1fa-7402-494d-80d7-65861394710c
drwxr-xr-x 3 root root 4096 Jun 27 17:34
job__op_WindowOperator_f6dc7f4d2283f4605b127b9364e21148__3_4__uuid_08a14423-bea1-44ce-96ee-360a516d72a6

Although only
job__op_WindowOperator_2b0a50a068bb7f1c8a470e4f763cbf26__2_4__uuid_f1fa-7402-494d-80d7-65861394710c
is the active running operator, the other directories for the past
operators still exist.

We set the task manager property taskmanager.resource-id to the task
manager pod name under the statefulset, but it did not seem to help clean
up previous directories.

Any pointers to solve this issue?

We checked the latest documentation, and it seems that Flink 1.15 introduced
the concept of a local working directory:
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/resource-providers/standalone/working_directory/.
Does that help clean up the RocksDB directory?

Thanks,
Allen
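
[Editor's note] The 1.15 working-directory feature mentioned above is configured
roughly as below in flink-conf.yaml. The path and pod name are examples only,
and whether this actually removes the stale RocksDB directories is exactly the
open question in this thread:

```yaml
# flink-conf.yaml (Flink 1.15+)
# A deterministic resource-id lets a restarted pod reclaim its previous
# working directory instead of creating a new one.
taskmanager.resource-id: flink-task-manager-0   # e.g. the statefulset pod name
# Root of the local working directory; place it on the persisted volume.
process.working-dir: /pv/flink-workdir
```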


StatsD metric name prefix change for task manager after upgrading to Flink 1.11

2020-10-14 Thread Allen Wang
Hello,

We noticed that after upgrading to Flink 1.11, the StatsD metric prefix
changed from the hostname to the IP address of the task manager.

The Flink job runs in a k8s cluster.

Here is an example of metric reported to StatsD in Flink 1.10:

flink-ingest-cx-home-page-feed-flink-task-manager-7f8c7677l85pl.taskmanager.16c2dbc84eb27f336455615e642c6cdd.flink-ingest-cx-home-page-feed.Source-
Custom Source.1.assigned-partitions:3.0|g

Here is an example of metric reported to StatsD in Flink 1.11:

10.4.155.205.taskmanager.0a900ab762d7d534ea8b20e84438166b.flink-ingest-xp-xp.Source-
Custom Source.0.assigned-partitions:3.0|g

This caused a problem for us as StatsD interprets the segment before
the first dot as the source. So after upgrading to 1.11, the task
manager metrics all have "10" as the source.

Is there any configuration to change this behavior back to the 1.10 behavior,
where the prefix of the metric is the hostname?

Thanks,
Allen
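
[Editor's note] One workaround (an assumption, not a confirmed fix for the
underlying host-resolution change) is to stop relying on the `<host>` variable
in the task manager metric scope format, so the first dot-separated segment is a
fixed literal rather than whatever Flink resolves the host to:

```yaml
# flink-conf.yaml — metrics.scope.tm controls the task manager metric prefix.
# Replace the default <host> variable with a literal string so StatsD sees a
# stable "source" segment instead of the IP's first octet.
metrics.scope.tm: flink-taskmanager.taskmanager.<tm_id>
```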