Re: Caching

2020-11-26 Thread Prasanna kumar
Navneeth, thanks for posting this question. This looks like a scenario we might end up with in the future. We are working on a similar problem statement with two differences: 1) The cache items would not change frequently, say at most once per month or a few times per year, and the number of entities…

Re: Caching

2020-11-26 Thread Dongwon Kim
Hi Navneeth, I reported a similar issue to yours before [1] but I took the broadcasting approach at first. As you already anticipated, broadcasting is going to use more memory than your current approach based on a static object on each TM. And the broadcasted data will be treated as operator state…
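
For readers landing on this thread: the "broadcasting approach" above is Flink's broadcast state pattern, where the cache data is sent to every parallel instance and kept as operator state (hence the extra memory compared to one static object per TM). A minimal sketch, assuming hypothetical stream names (`events`, `cacheUpdates`) and a hypothetical `Event` type; the descriptor name and types are arbitrary examples:

```java
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

// Sketch of the broadcast state pattern discussed above.
MapStateDescriptor<String, String> cacheDescriptor =
        new MapStateDescriptor<>("cache", Types.STRING, Types.STRING);

// cacheUpdates: low-volume stream of cache entries; events: main stream to enrich.
BroadcastStream<Tuple2<String, String>> broadcastCache =
        cacheUpdates.broadcast(cacheDescriptor);

DataStream<String> enriched = events
        .connect(broadcastCache)
        .process(new BroadcastProcessFunction<Event, Tuple2<String, String>, String>() {
            @Override
            public void processBroadcastElement(Tuple2<String, String> entry, Context ctx,
                                                Collector<String> out) throws Exception {
                // Every parallel instance stores every entry, which is why this
                // uses more memory than a single static object per TaskManager.
                ctx.getBroadcastState(cacheDescriptor).put(entry.f0, entry.f1);
            }

            @Override
            public void processElement(Event event, ReadOnlyContext ctx,
                                       Collector<String> out) throws Exception {
                String cached = ctx.getBroadcastState(cacheDescriptor).get(event.getKey());
                out.collect(event + " -> " + cached);
            }
        });
```

The broadcast state is also checkpointed as operator state, which is the trade-off Dongwon alludes to: it is fault tolerant, but replicated once per parallel instance.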

Re: Caching

2020-11-26 Thread Dongwon Kim
Oops, I forgot to mention that when doing bulk inserts into Redis, you'd better open a pipeline with the 'transaction' property set to False [1]. Otherwise, API calls from your Flink job will time out. [1] https://github.com/andymccurdy/redis-py#pipelines
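
The link above is for the redis-py client (Python). Since the rest of this digest deals with Java/Flink code, here is a rough Java analogue using the Jedis client (my choice; the thread itself does not mention Jedis): a plain pipeline batches the commands into one round trip without wrapping them in MULTI/EXEC, which mirrors the `transaction=False` advice. Host, port, and the entries map are made up.

```java
import java.util.Map;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.Pipeline;

// Rough Jedis analogue of the redis-py tip above: bulk insert through a
// pipeline without MULTI/EXEC.
public final class BulkInsert {
    public static void bulkInsert(Map<String, String> entries) {
        try (Jedis jedis = new Jedis("localhost", 6379)) {
            Pipeline pipeline = jedis.pipelined(); // non-transactional by default
            entries.forEach(pipeline::set);        // queue SET commands client-side
            pipeline.sync();                       // flush in one round trip
        }
    }
}
```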

Re: Caching

2020-11-26 Thread Navneeth Krishnan
Thanks Dongwon, that was extremely helpful. I didn't quite understand how async I/O can be used here. It would be great if you could share some info on it. Also, how are you propagating any changes to values? Regards, Navneeth

Re: Caching

2020-11-27 Thread Dongwon Kim
Hi Navneeth, > I didn't quite understand how async I/O can be used here. It would be great if you could share some info on it. You need to add an async operator in the middle of your pipeline in order to enrich your input data. [1] and [2] will help you. > Also, how are you propagating any changes to values?…
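
The async operator referred to here is Flink's Async I/O API (an AsyncFunction wired in with AsyncDataStream). A minimal sketch, assuming hypothetical `Event`/`EnrichedEvent` types and a made-up `lookup()` helper standing in for the external cache or database call:

```java
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

// Sketch of an async enrichment operator. Event, EnrichedEvent and lookup()
// are hypothetical placeholders for the external cache/DB access.
public class AsyncEnrichFunction extends RichAsyncFunction<Event, EnrichedEvent> {

    @Override
    public void asyncInvoke(Event event, ResultFuture<EnrichedEvent> resultFuture) {
        CompletableFuture
                .supplyAsync(() -> lookup(event.getKey()))  // run the lookup off the main thread
                .thenAccept(value -> resultFuture.complete(
                        Collections.singleton(new EnrichedEvent(event, value))));
    }

    private String lookup(String key) {
        return "value-for-" + key;  // stand-in for a Redis/DB call
    }
}

// Wiring it into the pipeline (timeout and capacity are arbitrary examples):
// DataStream<EnrichedEvent> enriched = AsyncDataStream.unorderedWait(
//         events, new AsyncEnrichFunction(), 1000, TimeUnit.MILLISECONDS, 100);
```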

Re: Caching Mechanism in Flink

2020-11-09 Thread Xuannan Su
Hi Jack, At the moment, Flink doesn't support caching intermediate results. However, there is some ongoing effort to support caching in Flink. FLIP-36 [1] proposes adding a caching mechanism to the Table API, and it is planned for 1.13. Best, Xuannan

Re: Caching Mechanism in Flink

2020-11-09 Thread Jack Kolokasis
Thank you Xuannan for the reply. I also want to ask how Flink uses off-heap memory. If I set taskmanager.memory.task.off-heap.size, then which data does Flink allocate off-heap? Is this handled by the programmer? Best, Iacovos

Re: Caching Mechanism in Flink

2020-11-11 Thread Matthias Pohl
Hi Iacovos, The task's off-heap configuration value is used when spinning up TaskManager containers in a clustered environment. It will contribute to the overall memory reserved for a TaskManager container during deployment. This parameter can be used to influence the amount of memory allocated if…
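
To make this concrete: the task off-heap size does not make Flink move your data off-heap by itself; it budgets room for off-heap memory that your own code (or a library it uses) allocates, answering the "is this handled by the programmer?" question above. A hypothetical sketch of such an allocation inside a user function; nothing here is Flink-specific beyond RichMapFunction:

```java
import java.nio.ByteBuffer;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

// Hypothetical user function that allocates direct (off-heap) memory itself.
// Allocations like this are what taskmanager.memory.task.off-heap.size is
// meant to account for when Flink computes the TaskManager container size.
public class OffHeapBufferMapper extends RichMapFunction<String, String> {

    private transient ByteBuffer scratch;

    @Override
    public void open(Configuration parameters) {
        scratch = ByteBuffer.allocateDirect(16 * 1024 * 1024); // 16 MiB outside the JVM heap
    }

    @Override
    public String map(String value) {
        scratch.clear();
        // ... use the direct buffer, e.g. for a native library call ...
        return value;
    }
}
```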

Re: Caching Mechanism in Flink

2020-11-11 Thread Jack Kolokasis
Hi Matthias, Thank you for your reply and the useful information. I found that off-heap memory is used when Flink uses HybridMemorySegments. But how does Flink know when to use these HybridMemorySegments, and in which operations does this happen? Best, Iacovos

Re: Caching Mechanism in Flink

2020-11-11 Thread Matthias Pohl
When talking about the "off-heap" in your most recent message, are you still referring to the task's off-heap configuration value? AFAIK, the HybridMemorySegment shouldn't be directly related to the off-heap parameter. The HybridMemorySegment can be used as a wrapper around any kind of memory, i.e.…

Re: Caching Mechanism in Flink

2020-11-11 Thread Jack Kolokasis
Hi Matthias, Yes, I am referring to the task's off-heap configuration value. Best, Iacovos

Re: Caching Mechanism in Flink

2020-11-19 Thread Andrey Zagrebin
Hi Iacovos, As Matthias mentioned, the task's off-heap memory has nothing to do with the memory segments. This memory component is reserved only for user code. The memory segments are managed by Flink and used for batch workloads, like in-memory joins etc. They are part of managed memory (taskmanager.mem…
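
For reference, the two components Andrey distinguishes map to separate configuration options; a minimal sketch with arbitrary example sizes (the same keys can of course be set in flink-conf.yaml instead):

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.TaskManagerOptions;

public final class MemoryConfigExample {
    public static void main(String[] args) {
        // Example values only: managed memory holds the Flink-managed segments
        // (e.g. in-memory joins in batch workloads), while task off-heap memory
        // is reserved purely for user code.
        Configuration conf = new Configuration();
        conf.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, MemorySize.parse("512m"));
        conf.set(TaskManagerOptions.TASK_OFF_HEAP_MEMORY, MemorySize.parse("128m"));
        System.out.println(conf);
    }
}
```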

Re: Caching collected objects in .apply()

2016-12-22 Thread Matt
Just to be clear, the stream is of String elements. The first part of the pipeline (up to the first .apply) receives those strings, and returns objects of another class ("A" let's say). On Thu, Dec 22, 2016 at 6:04 PM, Matt wrote: > Hello, I have a window processing 10 objects at a time, and…

Re: Caching collected objects in .apply()

2017-01-05 Thread Matt
I'm still looking for an answer to this question. Hope you can give me some insight!

Re: Caching collected objects in .apply()

2017-01-05 Thread Fabian Hueske
Hi Matt, I think your approach should be fine. Although the second keyBy is logically a shuffle, the data will not be sent over the wire to a different machine if the parallelism of the first and second window operators is identical. It only costs one serialization / deserialization step. I would be…
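
A compact sketch of the two-window pipeline under discussion, with made-up key selectors, window sizes, and result types A and B (the `fromStrings`/`fromAs` factories and `getKey` are hypothetical):

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// "strings" is the incoming DataStream<String>; A and B are hypothetical types.
DataStream<A> firstPass = strings
        .keyBy(s -> s.substring(0, 1))
        .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
        .apply(new WindowFunction<String, A, String, TimeWindow>() {
            @Override
            public void apply(String key, TimeWindow window,
                              Iterable<String> values, Collector<A> out) {
                out.collect(A.fromStrings(values));  // hypothetical factory
            }
        });

// The second keyBy is logically a shuffle, but with identical parallelism on
// both window operators the records stay local and only pay (de)serialization.
DataStream<B> secondPass = firstPass
        .keyBy(A::getKey)
        .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
        .apply(new WindowFunction<A, B, String, TimeWindow>() {
            @Override
            public void apply(String key, TimeWindow window,
                              Iterable<A> values, Collector<B> out) {
                out.collect(B.fromAs(values));       // hypothetical factory
            }
        });
```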

Re: Caching collected objects in .apply()

2017-01-09 Thread Aljoscha Krettek
Hi, I think your approach with two window() operations is fine. There is no way to retrieve the result from a previous window because it is not strictly defined what the previous window is. Also, keeping data inside your user functions (in fields) is problematic because these function instances are…
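
One hedged addition, since this thread predates the API: if the reason for caching collected objects is to carry a per-key value from one window firing into later ones, newer Flink versions let a ProcessWindowFunction keep that value in managed, checkpointed keyed state instead of plain instance fields. This does not change Aljoscha's point that "the previous window" itself is undefined; it only replaces fields with state that Flink owns. The type A and the merge logic below are placeholders.

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// Keep a per-key value across window firings in managed state rather than in
// instance fields (fields are neither checkpointed nor scoped to a key).
public class RememberLastResult extends ProcessWindowFunction<A, A, String, TimeWindow> {

    private final ValueStateDescriptor<A> lastResult =
            new ValueStateDescriptor<>("lastResult", A.class);

    @Override
    public void process(String key, Context ctx, Iterable<A> elements, Collector<A> out)
            throws Exception {
        ValueState<A> previous = ctx.globalState().getState(lastResult);
        // Hypothetical merge of the previous per-key result (possibly null on the
        // first firing) with the elements of the current window.
        A current = A.combine(previous.value(), elements);
        previous.update(current);
        out.collect(current);
    }
}
```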