Re: Flink slot utilization

2019-12-17 Thread Robert Metzger
Hi,

1) By default, Flink's Kafka connector polls Kafka for data every 100 ms.
There's a configuration key "flink.poll-timeout" to change this
frequency. I don't have experience with these internal log messages from
Kafka, but since they are at INFO level (and as long as you don't see any
unexpected data), I would ignore them for now.
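Here is a minimal sketch of where the poll timeout goes. It assumes the `FlinkKafkaConsumer` from the universal Kafka connector, which reads `flink.poll-timeout` (in milliseconds) from the same `Properties` object as the regular Kafka client settings. `KafkaSourceProps` is a made-up helper name, and the broker address, group id, and timeout value are placeholders. The snippet uses only `java.util.Properties`, so it runs without Flink on the classpath. Whether a larger timeout measurably lowers CPU usage for an idle source is something to measure in your own setup.

```java
import java.util.Properties;

final class KafkaSourceProps {
    // Key read by Flink's Kafka consumer; value in milliseconds (100 ms by default).
    static final String POLL_TIMEOUT_KEY = "flink.poll-timeout";

    /** Builds consumer properties with a custom poll timeout (hypothetical helper). */
    static Properties build(String bootstrapServers, String groupId, long pollTimeoutMs) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        props.setProperty(POLL_TIMEOUT_KEY, Long.toString(pollTimeoutMs));
        return props;
    }

    public static void main(String[] args) {
        // Placeholder values: adjust to your cluster and latency budget.
        Properties props = build("localhost:9092", "example-group", 1000L);
        // These properties would then be passed to the connector, e.g.
        // new FlinkKafkaConsumer<>("test-topic", new SimpleStringSchema(), props)
        System.out.println(props.getProperty(POLL_TIMEOUT_KEY)); // prints 1000
    }
}
```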

2) Slots do not reserve memory. A slot is basically a thread running on
the TaskManager, and since you can't limit the amount of memory available
to a single thread, all slots share the TaskManager's pool of available
memory.
If you want to run multiple low-throughput pipelines on Flink, it is not a
problem to oversubscribe your TaskManagers. For a machine with, say, 8
cores and 16 GB of memory, you could configure 100 or even 500 slots, as
long as they are not very resource-intensive.

With StateFun, you can have millions of actors on a TaskManager. If they
are not receiving any data, they won't allocate resources.
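The remark that idle StateFun actors don't allocate resources can be illustrated with lazy instantiation: an actor object only comes into existence when the first message for its address arrives. This is a hypothetical in-memory analogy (`LazyActors` and `CounterActor` are made-up names). It is not the StateFun API and does not reflect how StateFun actually stores function state.

```java
import java.util.HashMap;
import java.util.Map;

final class LazyActors {
    /** Hypothetical stand-in for a stateful function instance. */
    static final class CounterActor {
        private long seen; // per-actor state, allocated only once the actor exists

        void onMessage(String msg) {
            seen++;
        }

        long seen() {
            return seen;
        }
    }

    private final Map<String, CounterActor> live = new HashMap<>();

    /** Routes a message by address; the actor is created on its first message. */
    void send(String address, String msg) {
        live.computeIfAbsent(address, a -> new CounterActor()).onMessage(msg);
    }

    int liveActors() {
        return live.size();
    }

    /** Reads an actor's counter without creating the actor. */
    long seenBy(String address) {
        CounterActor actor = live.get(address);
        return actor == null ? 0 : actor.seen();
    }

    public static void main(String[] args) {
        LazyActors runtime = new LazyActors();
        // A million addressable actors ("user-0" .. "user-999999"), but only three get messages.
        runtime.send("user-42", "hello");
        runtime.send("user-42", "again");
        runtime.send("user-7", "hi");
        runtime.send("user-999999", "hey");
        System.out.println("live actors: " + runtime.liveActors()); // prints "live actors: 3"
    }
}
```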

Best,
Robert


On Tue, Dec 17, 2019 at 11:37 AM Andrés Garagiola 
wrote:

> Thanks Robert,
>
>
> About your questions: I don't yet have a real estimate of the number of
> records the pipeline will receive, but I guess it could be idle for
> several minutes (I don't think for hours).
>
>
> My concern comes from two aspects:
>
>
> 1) I saw many lines like the ones listed below in the Flink TaskManager
> logs. It sounds like the pipeline is polling the Kafka source topic. Can I
> control this behavior in some way to reduce CPU consumption, given that I
> can tolerate some latency?
>
>
> 2019-12-17 05:25:56,720 INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-2, groupId=Reader-0_offset_consumer_2088979656_example] Resetting offset for partition test-topic-0 to offset 561.
> 2019-12-17 05:25:56,720 INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-2, groupId=Reader-0_offset_consumer_2088979656_example] Seeking to LATEST offset of partition test-topic-0
> 2019-12-17 05:25:56,721 INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-2, groupId=Reader-0_offset_consumer_2088979656_example] Resetting offset for partition test-topic-0 to offset 561.
> 2019-12-17 05:25:56,721 INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-2, groupId=Reader-0_offset_consumer_2088979656_example] Seeking to LATEST offset of partition test-topic-0
> 2019-12-17 05:25:56,722 INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-2, groupId=Reader-0_offset_consumer_2088979656_example] Resetting offset for partition test-topic-0 to offset 561.
>
>
> 2) I read that every slot reserves a portion of the TaskManager's memory,
> so I would like to share that memory among multiple pipelines (again, in a
> context where some latency is acceptable). I understand that this is not
> possible in Flink as it stands today, but that it would be possible with
> StateFun, since it avoids the direct slot-to-pipeline mapping. Is that
> right?
>
>
> Thanks again for your reply.
>
> Regards

Re: Flink slot utilization

2019-12-17 Thread Andrés Garagiola
Thanks Robert,


About your questions: I don't yet have a real estimate of the number of
records the pipeline will receive, but I guess it could be idle for
several minutes (I don't think for hours).


My concern comes from two aspects:


1) I saw many lines like the ones listed below in the Flink TaskManager
logs. It sounds like the pipeline is polling the Kafka source topic. Can I
control this behavior in some way to reduce CPU consumption, given that I
can tolerate some latency?


2019-12-17 05:25:56,720 INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-2, groupId=Reader-0_offset_consumer_2088979656_example] Resetting offset for partition test-topic-0 to offset 561.
2019-12-17 05:25:56,720 INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-2, groupId=Reader-0_offset_consumer_2088979656_example] Seeking to LATEST offset of partition test-topic-0
2019-12-17 05:25:56,721 INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-2, groupId=Reader-0_offset_consumer_2088979656_example] Resetting offset for partition test-topic-0 to offset 561.
2019-12-17 05:25:56,721 INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-2, groupId=Reader-0_offset_consumer_2088979656_example] Seeking to LATEST offset of partition test-topic-0
2019-12-17 05:25:56,722 INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-2, groupId=Reader-0_offset_consumer_2088979656_example] Resetting offset for partition test-topic-0 to offset 561.


2) I read that every slot reserves a portion of the TaskManager's memory,
so I would like to share that memory among multiple pipelines (again, in a
context where some latency is acceptable). I understand that this is not
possible in Flink as it stands today, but that it would be possible with
StateFun, since it avoids the direct slot-to-pipeline mapping. Is that
right?


Thanks again for your reply.

Regards

On Tue, Dec 17, 2019 at 11:10 AM Robert Metzger  wrote:

> Hi Andrés,
>
> sorry for the late reply.
> 1. Slots are released when the streaming pipeline ends. In principle,
> it is not a problem when a slot is allocated even though it is not
> processing any incoming messages, so you are not doing anything wrong.
> How many records do you receive per pipeline? (Are they idle for
> multiple hours?)
> There's a way to utilize the slots more efficiently: StateFun
> (https://statefun.io/), which will be contributed to Flink soon.
> StateFun doesn't have a direct slot-to-pipeline mapping.
>
> 2. The memory consumption per slot greatly depends on what kind of
> operator you are running in it. A heap state backend might need a few
> gigabytes, while a stateless mapper needs almost no memory. Some time
> ago, I wrote a blog post on sizing a Flink cluster:
> https://www.ververica.com/blog/how-to-size-your-apache-flink-cluster-general-guidelines
>
> Best,
> Robert


Re: Flink slot utilization

2019-12-17 Thread Robert Metzger
Hi Andrés,

sorry for the late reply.
1. Slots are released when the streaming pipeline ends. In principle,
it is not a problem when a slot is allocated even though it is not
processing any incoming messages, so you are not doing anything wrong.
How many records do you receive per pipeline? (Are they idle for
multiple hours?)
There's a way to utilize the slots more efficiently: StateFun
(https://statefun.io/), which will be contributed to Flink soon.
StateFun doesn't have a direct slot-to-pipeline mapping.
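The claim that an allocated but idle slot is harmless can be illustrated with a plain-Java analogy (not Flink code). A source thread that blocks in `poll()` with a timeout is parked between wakeups, so while its input is empty it wakes up only about once per timeout. `IdleSourceDemo` is a hypothetical class, and a `BlockingQueue` stands in for a Kafka topic. In this sketch `poll()` returns as soon as a record is available, so the timeout mostly determines how often an idle thread wakes up.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

final class IdleSourceDemo {
    /**
     * Polls an always-empty queue (an idle "topic") with the given timeout for roughly
     * idleMillis and returns how many times poll() woke up without a record.
     */
    static int countIdleWakeups(long pollTimeoutMillis, long idleMillis) {
        BlockingQueue<String> idleTopic = new LinkedBlockingQueue<>();
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(idleMillis);
        int wakeups = 0;
        while (System.nanoTime() - deadline < 0) {
            try {
                // Blocks until a record arrives or the timeout elapses; no busy spinning.
                String record = idleTopic.poll(pollTimeoutMillis, TimeUnit.MILLISECONDS);
                if (record == null) {
                    wakeups++;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        return wakeups;
    }

    public static void main(String[] args) {
        System.out.println("50 ms timeout:  " + countIdleWakeups(50, 600) + " idle wakeups");
        System.out.println("300 ms timeout: " + countIdleWakeups(300, 600) + " idle wakeups");
    }
}
```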

2. The memory consumption per slot greatly depends on what kind of operator
you are running in it. A heap state backend might need a few gigabytes,
while a stateless mapper needs almost no memory. Some time ago, I wrote a
blog post on sizing a Flink cluster:
https://www.ververica.com/blog/how-to-size-your-apache-flink-cluster-general-guidelines
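The difference between a stateless mapper and an operator with heap-backed state can be shown with a plain-Java analogy. `OperatorMemory` and `HeapCounter` are hypothetical classes, not Flink's operator API. The mapper retains nothing between records. The keyed counter keeps one heap entry per distinct key, so in this sketch its footprint grows with the number of keys rather than with the number of records.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

final class OperatorMemory {
    // Stateless mapper: output depends only on the current record; nothing is retained.
    static final Function<String, String> UPPERCASE = String::toUpperCase;

    /** Keyed running count held on the heap, one entry per distinct key. */
    static final class HeapCounter {
        private final Map<String, Long> countsByKey = new HashMap<>();

        /** Increments and returns the count for this key. */
        long update(String key) {
            return countsByKey.merge(key, 1L, Long::sum);
        }

        int retainedKeys() {
            return countsByKey.size();
        }
    }

    public static void main(String[] args) {
        HeapCounter counter = new HeapCounter();
        for (int i = 0; i < 100_000; i++) {
            String key = "user-" + (i % 10_000);
            UPPERCASE.apply(key); // retains nothing between records
            counter.update(key);  // retains one entry per distinct key
        }
        System.out.println("keys held on heap: " + counter.retainedKeys()); // prints 10000
    }
}
```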

Best,
Robert


On Fri, Dec 13, 2019 at 5:06 PM Andrés Garagiola 
wrote:

> Hi
>
> I'm testing Flink for stream processing. In my use case, there are
> multiple pipelines processing messages from multiple Kafka sources. I have
> some questions regarding jobs and slots.
>
> 1) When I deploy a new job, it takes a job slot in the TM. The job never
> ends (I think because it is a streaming pipeline), so the slot is never
> released; this means the slot is occupied even when no new messages are
> coming from the Kafka topic. Is that OK, or am I doing something wrong?
> Is there a way to use the job slots more efficiently?
>
> 2) In my use case, I need good job scalability. I could potentially have
> many pipelines running in the Flink environment; on the other hand,
> increased latency would not be a serious problem for me. Are there any
> recommendations regarding memory per slot? I saw that the CPU
> recommendation is one core per slot. Given that increased latency would
> not be a big problem, do you see any other good reason to follow this
> recommendation?
>
> Thank you
> Regards
>


Flink slot utilization

2019-12-13 Thread Andrés Garagiola
Hi

I'm testing Flink for stream processing. In my use case, there are
multiple pipelines processing messages from multiple Kafka sources. I have
some questions regarding jobs and slots.

1) When I deploy a new job, it takes a job slot in the TM. The job never
ends (I think because it is a streaming pipeline), so the slot is never
released; this means the slot is occupied even when no new messages are
coming from the Kafka topic. Is that OK, or am I doing something wrong?
Is there a way to use the job slots more efficiently?

2) In my use case, I need good job scalability. I could potentially have
many pipelines running in the Flink environment; on the other hand,
increased latency would not be a serious problem for me. Are there any
recommendations regarding memory per slot? I saw that the CPU
recommendation is one core per slot. Given that increased latency would
not be a big problem, do you see any other good reason to follow this
recommendation?

Thank you
Regards