Re: Caching

2020-11-26 Thread Prasanna kumar
Navneeth,

Thanks for posting this question.

This looks like a scenario we might end up in ourselves in the future.

We are working on a similar problem statement, with two differences:

1) The cache items would not change frequently (at most once per month, or
a few times per year), and the number of entities in the cache would not be
more than 1,000 (Java objects, say).

2) The event load we are looking at is around 10-50k events/sec.

We are using the broadcast mechanism for this.
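
For reference, a minimal sketch of Flink's broadcast state pattern; the
element types, key scheme, and enrichment logic below are illustrative
assumptions, not the actual job discussed here:

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class BroadcastCacheExample {

    public static DataStream<String> enrich(
            DataStream<String> events,                       // main stream; each event is a lookup key
            DataStream<Tuple2<String, String>> cacheUpdates) // rare updates: (key, value)
    {
        MapStateDescriptor<String, String> cacheDescriptor =
                new MapStateDescriptor<>("cache", String.class, String.class);

        return events
                .connect(cacheUpdates.broadcast(cacheDescriptor))
                .process(new BroadcastProcessFunction<String, Tuple2<String, String>, String>() {

                    @Override
                    public void processElement(String event, ReadOnlyContext ctx,
                                               Collector<String> out) throws Exception {
                        // Read side: look the event up in the broadcast state.
                        String cached = ctx.getBroadcastState(cacheDescriptor).get(event);
                        out.collect(event + " -> " + cached);
                    }

                    @Override
                    public void processBroadcastElement(Tuple2<String, String> update, Context ctx,
                                                        Collector<String> out) throws Exception {
                        // Write side: apply the rare cache update on every parallel instance.
                        ctx.getBroadcastState(cacheDescriptor).put(update.f0, update.f1);
                    }
                });
    }
}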

Prasanna.

On Thu 26 Nov, 2020, 14:01, Navneeth Krishnan wrote:

> Hi All,
>
> We have a Flink streaming job processing around 200k events per second.
> The job requires a lot of infrequently changing data (sort of static, but
> there will be some changes over time, say a 5% change once per day or so).
> There are about 12 caches, some containing approximately 20k
> entries and a few with about 2 million entries.
>
> In the current implementation we are using an in-memory, lazily loaded
> static cache to populate the data, and the initialization happens in the
> open() function. The reason for choosing this approach is that we have
> allocated around 4GB of extra memory per TM for these caches, and if a TM
> has 6 slots the cache can be shared.
>
> Now the issue we have with this approach is that every time a container is
> restarted or a new job is deployed, it has to populate the cache again.
> Sometimes this lazy loading takes a while and it causes back pressure as
> well. We were thinking of moving this logic to a broadcast stream, but
> since the data has to be stored per slot it would increase the memory
> consumption by a lot.
>
> Another option we were considering is to replace the current near/far
> cache, which uses a REST API to load the data, with a Redis-based near/far
> cache. This would definitely reduce the overall loading time, but it is
> still not a perfect solution.
>
> Are there any recommendations on how this can be achieved effectively?
> Also, how is everyone else overcoming this problem?
>
> Thanks,
> Navneeth
>
>
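
A minimal sketch of the approach Navneeth describes (a JVM-wide static
cache shared by all slots of a TaskManager, lazily loaded in open()); the
class name, value types, and loadFromRestApi() helper are illustrative
assumptions, not the actual code:

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class EnrichWithStaticCache extends RichMapFunction<String, String> {

    // static => one copy per TaskManager JVM, shared by all of its slots.
    private static final Map<String, String> CACHE = new ConcurrentHashMap<>();
    private static volatile boolean loaded = false;

    @Override
    public void open(Configuration parameters) {
        // Lazy one-time load, guarded against concurrent open() calls from
        // several slots of the same TM. This load is what every container
        // restart or redeploy has to pay again.
        if (!loaded) {
            synchronized (EnrichWithStaticCache.class) {
                if (!loaded) {
                    CACHE.putAll(loadFromRestApi());
                    loaded = true;
                }
            }
        }
    }

    @Override
    public String map(String eventKey) {
        return CACHE.getOrDefault(eventKey, "unknown");
    }

    private static Map<String, String> loadFromRestApi() {
        // Hypothetical loader; the thread only says the data comes from a REST API.
        return new HashMap<>();
    }
}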


Re: Caching

2020-11-26 Thread Dongwon Kim
Hi Navneeth,

I reported an issue similar to yours before [1], but I took the
broadcasting approach at first.

As you already anticipated, broadcasting is going to use more memory than
your current approach based on a static object on each TM.

And the broadcast data will be treated as operator state and will be
periodically checkpointed, with serialization overhead and garbage
collection. These are not negligible at all if you don't choose your
serialization strategy carefully, as explained in [2] (a related safeguard
is sketched after the list below).
Even with the proper one, I've experienced mild back pressure whenever
- a checkpoint is in progress (AFAIK, incremental checkpointing has nothing
to do with operator state)
- the cache is being broadcast
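
One cheap safeguard related to [2] (an illustrative addition, not
something mentioned in this thread): make the job fail at build time
whenever a type would silently fall back to Kryo's generic serializer,
which is a common source of exactly this overhead.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SerializationGuard {
    public static void main(String[] args) {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // Throw at job-construction time, instead of paying Kryo's cost
        // under load, for any type Flink cannot serialize natively.
        env.getConfig().disableGenericTypes();
    }
}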

For that reason, I decided to populate the data on Redis, but that also
calls for design decisions:
- which Java client to use? Jedis [3]? Lettuce [4]?
- how to invoke API calls inside Flink: synchronously or asynchronously?

Currently I'm very satisfied with Lettuce plus Flink's async I/O [5]: a
very small memory footprint, and no need to worry about serialization
overhead or garbage collection.
Lettuce supports asynchronous communication, so it works perfectly with
Flink's async I/O.
I bet you'd be very disappointed invoking Jedis synchronously inside a
ProcessFunction.
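
As an illustration of that combination, a minimal sketch of a Lettuce
lookup inside Flink's RichAsyncFunction; the Redis address, key scheme,
and enrichment logic are assumptions, not the actual job described here:

import java.util.Collections;

import io.lettuce.core.RedisClient;
import io.lettuce.core.api.StatefulRedisConnection;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

public class RedisEnrichFunction extends RichAsyncFunction<String, String> {

    private transient RedisClient client;
    private transient StatefulRedisConnection<String, String> connection;

    @Override
    public void open(Configuration parameters) {
        client = RedisClient.create("redis://localhost:6379"); // assumed address
        connection = client.connect();
    }

    @Override
    public void asyncInvoke(String key, ResultFuture<String> resultFuture) {
        // Non-blocking: complete Flink's future when the Redis reply arrives.
        connection.async().get(key).thenAccept(value ->
                resultFuture.complete(
                        Collections.singleton(value == null ? "" : value)));
    }

    @Override
    public void close() {
        connection.close();
        client.shutdown();
    }
}

Wiring it in would then look roughly like
AsyncDataStream.unorderedWait(stream, new RedisEnrichFunction(), 1,
TimeUnit.SECONDS, 100).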

Best,

Dongwon

[1]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Better-way-to-share-large-data-across-task-managers-td38231.html
[2]
https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html
[3] https://github.com/redis/jedis
[4] https://lettuce.io/
[5]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/asyncio.html



Re: Caching

2020-11-26 Thread Dongwon Kim
Oops, I forgot to mention that when doing a bulk insert into Redis, you'd
better open a pipeline with the 'transaction' property set to False [1].

Otherwise, API calls from your Flink job will time out.

[1] https://github.com/andymccurdy/redis-py#pipelines
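
The reference above is to redis-py (Python). A rough Java analogue (an
illustrative assumption, not code from this thread) is a plain Jedis
pipeline, which batches commands without wrapping them in a MULTI/EXEC
transaction:

import redis.clients.jedis.Jedis;
import redis.clients.jedis.Pipeline;

public class BulkLoad {
    public static void main(String[] args) {
        try (Jedis jedis = new Jedis("localhost", 6379)) {
            // jedis.pipelined() batches commands without MULTI/EXEC;
            // the transactional variant would be jedis.multi().
            Pipeline pipeline = jedis.pipelined();
            for (int i = 0; i < 100_000; i++) {
                pipeline.hset("mapping:v2", "road:" + i, "attributes-" + i);
                if (i % 10_000 == 0) {
                    pipeline.sync(); // flush periodically to bound client memory
                }
            }
            pipeline.sync();
        }
    }
}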



Re: Caching

2020-11-26 Thread Navneeth Krishnan
Thanks Dongwon, that was extremely helpful. I didn't quite understand how
async I/O can be used here. It would be great if you could share some info
on it.

Also, how are you propagating any changes to values?

Regards,
Navneeth



Re: Caching

2020-11-27 Thread Dongwon Kim
Hi Navneeth,

> I didn't quite understand how async I/O can be used here. It would be
> great if you can share some info on it.

You need to add an async operator in the middle of your pipeline in order
to enrich your input data. [1] and [2] will help you.

> Also, how are you propagating any changes to values?

I need to maintain a mapping from road ID to various attributes of each
road, and the mapping is updated every week.
I use keys for versioning, and I use a Hash [3] as the value in order to
store a mapping.
When a new mapping is prepared, I upload it under a fresh key while the
previous version is still being served to Flink (via async I/O).
Such concurrent reading and writing is possible in Redis when you turn off
transactions when creating the Redis client's pipeline [4].
When the new mapping is completely uploaded, I inform my Flink pipeline of
the new mapping via Kafka.
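
A sketch of that versioned-key scheme; the "roads:v<N>" key layout, types,
and method names are illustrative assumptions (and it uses Jedis for
brevity, although the thread recommends Lettuce):

import java.util.Map;

import redis.clients.jedis.Jedis;

public class VersionedMapping {

    // Switched when a "new version ready" notification arrives via Kafka.
    private volatile int currentVersion = 41;

    // Readers (e.g. the Flink async I/O operator) always hit the announced version.
    public String lookup(Jedis jedis, String roadId) {
        return jedis.hget("roads:v" + currentVersion, roadId);
    }

    // The writer populates the next version under a fresh key, so reads of the
    // old version and writes of the new one never touch the same key.
    public void upload(Jedis jedis, int newVersion, Map<String, String> mapping) {
        for (Map.Entry<String, String> e : mapping.entrySet()) {
            jedis.hset("roads:v" + newVersion, e.getKey(), e.getValue());
        }
    }

    // Called from the Kafka consumer once the upload has finished.
    public void onNewVersionAnnounced(int announcedVersion) {
        currentVersion = announcedVersion;
    }
}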

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/asyncio.html
[2] https://www.youtube.com/watch?v=UParyxe-2Wc
[3] https://redis.io/topics/data-types#hashes
[4] https://github.com/andymccurdy/redis-py#pipelines

Best,

Dongwon


Re: Caching Mechanism in Flink

2020-11-09 Thread Xuannan Su
Hi Jack,

At the moment, Flink doesn't support caching intermediate results.
However, there is some ongoing effort to support caching in Flink:
FLIP-36 [1] proposes to add a caching mechanism at the Table API level, and
it is planned for 1.13.

Best,
Xuannan

On Nov 10, 2020, 4:29 AM +0800, Jack Kolokasis wrote:

Hello all,

I am new to Flink and I want to ask if Flink supports a caching mechanism
to store intermediate results in memory for machine learning workloads.

If yes, how can I enable it, and how can I use it?

Thank you,
Iacovos


Re: Caching Mechanism in Flink

2020-11-09 Thread Jack Kolokasis

Thank you Xuannan for the reply.

I also want to ask about how Flink uses off-heap memory. If I set
taskmanager.memory.task.off-heap.size, then which data does Flink allocate
off-heap? Is this handled by the programmer?


Best,
Iacovos




Re: Caching Mechanism in Flink

2020-11-11 Thread Matthias Pohl
Hi Iacovos,
The task's off-heap configuration value is used when spinning up
TaskManager containers in a clustered environment. It will contribute to
the overall memory reserved for a TaskManager container during deployment.
This parameter can be used to influence the amount of memory allocated if
the user code relies on DirectByteBuffers and/or native memory allocation.
There is no active memory pool management beyond that from Flink's side.
The configuration parameter is ignored if you run a Flink cluster locally.

Besides this, Flink internally also utilizes the JVM's direct memory (via
DirectByteBuffers, for network buffers) and native memory (through Flink's
internally used managed memory).
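
The usual place to set this value is taskmanager.memory.task.off-heap.size
in flink-conf.yaml; a sketch of the programmatic equivalent, with an
assumed size:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.TaskManagerOptions;

public class OffHeapConfig {
    public static void main(String[] args) {
        // Reserve 512 MB of task off-heap memory for user code that relies
        // on DirectByteBuffers or native allocation.
        Configuration conf = new Configuration();
        conf.set(TaskManagerOptions.TASK_OFF_HEAP_MEMORY, MemorySize.parse("512m"));
    }
}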

You can find a more detailed description of Flink's memory model in [1]. I
hope that helps.

Best,
Matthias

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/memory/mem_setup_tm.html#detailed-memory-model



Re: Caching Mechanism in Flink

2020-11-11 Thread Jack Kolokasis

Hi Matthias,

Thank you for your reply and the useful information. I found that off-heap
memory is used when Flink uses HybridMemorySegments. How does Flink know
when to use these HybridMemorySegments, and in which operations does this
happen?


Best,
Iacovos



Re: Caching Mechanism in Flink

2020-11-11 Thread Matthias Pohl
When talking about the "off-heap" in your most recent message, are you
still referring to the task's off-heap configuration value? AFAIK,
the HybridMemorySegment shouldn't be directly related to the off-heap
parameter.

The HybridMemorySegment can be used as a wrapper around any kind of
memory: heap memory (i.e. byte[]), DirectByteBuffers (located in the JVM's
direct memory pool, which is not part of the JVM's heap), or memory
allocated through Unsafe's allocation methods (so-called native memory,
which is also not part of the JVM's heap).
The HybridMemorySegments are utilized within the MemoryManager class. The
MemoryManager instances are responsible for maintaining the managed memory
used in each of the TaskSlots. Managed memory is used in different settings
(e.g. for the RocksDB state backend in streaming applications). It can be
configured using taskmanager.memory.managed.size (or the corresponding
*.fraction parameter) [1]. See more details on that in [2].

I'm going to pull in Andrey as he has worked on that topic recently.

Best,
Matthias

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html#taskmanager-memory-managed-size
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/memory/mem_setup_tm.html#managed-memory



Re: Caching Mechanism in Flink

2020-11-11 Thread Jack Kolokasis

Hi Matthias,

Yes, I am referring to the task's off-heap configuration value.

Best,
Iacovos


Re: Caching Mechanism in Flink

2020-11-19 Thread Andrey Zagrebin
Hi Iacovos,

As Matthias mentioned, the task's off-heap memory has nothing to do with
the memory segments. This memory component is reserved only for the user
code.

The memory segments are managed by Flink and used for batch workloads,
like in-memory joins etc.
They are part of managed memory (taskmanager.memory.managed.size),
which is also off-heap but is neither the task's off-heap memory
(taskmanager.memory.task.off-heap.size) nor JVM direct memory.

The memory segments are also used to wrap network buffers. Those are JVM
direct memory (which is also off-heap), but again this is not the task's
off-heap memory.

Maybe the confusion comes from the fact that 'off-heap' generally refers
to everything which is not JVM heap: direct or native memory.
The task's off-heap memory is the part of the general 'off-heap' (the
direct memory limit, to be precise) which is reserved only for the user
code and not intended to be used by Flink.

Best,
Andrey



Re: Caching collected objects in .apply()

2016-12-22 Thread Matt
Just to be clear, the stream is of String elements. The first part of the
pipeline (up to the first .apply) receives those strings and returns
objects of another class ("A", let's say).

On Thu, Dec 22, 2016 at 6:04 PM, Matt wrote:

> Hello,
>
> I have a window processing 10 objects at a time and creating 1 as a
> result. The problem is that in order to create that object I need the
> object from the previous window.
>
> I'm doing this:
>
> stream
>   .keyBy(...some key...)
>   .countWindow(10, 1)
>   .apply(...creates an element A...)
>   .keyBy(...same key as above...)
>   .countWindow(2, 1)
>   .apply(...updates A with the value of the previous element A...)
>   .addSink(...)
>
> Probably there is a way to retrieve the last collected object inside the
> first .apply(), or to cache it somehow.
>
> Is there a better way to achieve the same? How inefficient is this?
>
> Regards,
> Matt
>
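
A self-contained sketch of the pipeline above; the A class, the key
extraction, and the "create"/"update" logic are stand-ins, since the
thread doesn't show them:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;

public class TwoWindowSketch {

    // Stand-in for the "A" class from the thread.
    public static class A {
        public String key;
        public String payload;
        public A() {}
        public A(String key, String payload) { this.key = key; this.payload = payload; }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> stream = env.fromElements("k1-a", "k1-b", "k1-c", "k2-x", "k2-y");

        stream
            .keyBy(s -> s.split("-")[0])                     // assumed key extraction
            .countWindow(10, 1)
            .apply(new WindowFunction<String, A, String, GlobalWindow>() {
                @Override
                public void apply(String key, GlobalWindow w,
                                  Iterable<String> in, Collector<A> out) {
                    StringBuilder sb = new StringBuilder();  // assumed: A built from the strings
                    for (String s : in) sb.append(s).append('|');
                    out.collect(new A(key, sb.toString()));
                }
            })
            .keyBy(a -> a.key)                               // same key as above
            .countWindow(2, 1)
            .apply(new WindowFunction<A, A, String, GlobalWindow>() {
                @Override
                public void apply(String key, GlobalWindow w,
                                  Iterable<A> in, Collector<A> out) {
                    A prev = null, cur = null;
                    for (A a : in) { prev = cur; cur = a; }  // window holds [previous, current]
                    if (prev != null) cur.payload = prev.payload + " => " + cur.payload;
                    out.collect(cur);
                }
            })
            .print();                                        // stand-in for the real sink

        env.execute("two-window sketch");
    }
}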


Re: Caching collected objects in .apply()

2017-01-05 Thread Matt
I'm still looking for an answer to this question. Hope you can give me some
insight!



Re: Caching collected objects in .apply()

2017-01-05 Thread Fabian Hueske
Hi Matt,

I think your approach should be fine.
Although the second keyBy is logically a shuffle, the data will not be sent
over the wire to a different machine if the parallelism of the first and
second window operators is identical.
It only costs one serialization/deserialization step.

I would be careful about putting the result of the first window into
operator state. I think it is not well defined how function objects are
reused. This might be an internal implementation detail which might change
in the future.
Aljoscha (in CC) should know more about how the window function objects are
used.

Best, Fabian



Re: Caching collected objects in .apply()

2017-01-09 Thread Aljoscha Krettek
Hi,
I think your approach with two window() operations is fine. There is no way
to retrieve the result from a previous window because it is not strictly
defined what the previous window is. Also, keeping data inside your user
functions (in fields) is problematic because these function instances are
reused to process elements for several different keys.

Cheers,
Aljoscha
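
Not an approach proposed in this thread, but the direction both warnings
point toward is Flink's keyed state, which is scoped per key and
checkpointed, instead of plain fields on the function. A sketch that could
replace the second countWindow(2, 1) step, using String elements as
stand-ins for the A type:

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Emits each element merged with its predecessor of the same key;
// "merging" is just concatenation here.
public class MergeWithPrevious extends RichFlatMapFunction<String, String> {

    private transient ValueState<String> previous;

    @Override
    public void open(Configuration parameters) {
        // Keyed state: one value per key, managed and checkpointed by Flink.
        previous = getRuntimeContext().getState(
                new ValueStateDescriptor<>("previous-element", String.class));
    }

    @Override
    public void flatMap(String current, Collector<String> out) throws Exception {
        String prev = previous.value();   // null for the first element of a key
        out.collect(prev == null ? current : prev + " => " + current);
        previous.update(current);
    }
}

It would be applied after the first window as
firstWindowOutput.keyBy(...).flatMap(new MergeWithPrevious()).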
