Re: Re: Flink caching mechanism
Hi, Вова. Junrui is right. As far as I know, every time a SQL query is re-executed, Flink regenerates the plan, builds the JobGraph, and runs the job again; there is no cache to speed this process up. The state backend is used when your job is stopped and you want to resume from its previous state. You can read more here [1].

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/state_backends/

--
Best!
Xuyang

On 2024-01-12 12:37:43, "Junrui Lee" wrote:
Hi Вова, In Flink, there is no built-in mechanism for caching SQL query results; every query execution is independent, and results are not stored for future queries. The StateBackend's role is to maintain operational state within jobs, such as aggregations or windowing, which is critical for ensuring data consistency and fault tolerance but is unrelated to result caching.

Вова Фролов wrote on Thu, Jan 11, 2024 at 16:27:
Hi Everyone, I'm currently looking to understand the caching mechanism in Apache Flink in general. As part of this exploration, I have a few questions related to how Flink handles data caching, both in the context of SQL queries and more broadly. When I send a SQL query, for example to PostgreSQL, through Flink, does Flink cache the data? If the same SQL query is executed again, does Flink retrieve the results faster, indicating a potential caching mechanism? If caching does occur, where does Flink store the cached data? I'm using RocksDB as a StateBackend. Is there any documentation or information on how Flink caches data during SQL queries when RocksDB is used as a StateBackend? After executing a SQL query, I couldn't find any cached data in local files. Additionally, could you please provide an overview of how the caching mechanism works in Flink in general? I appreciate any insights or references you can provide on this matter. Thank you!
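[Editor's note] For reference, the state backend is a job/cluster setting for operator state and recovery, not a query-result cache. A minimal sketch of wiring in RocksDB as the state backend that a stopped job resumes from (assuming Flink 1.13+ with the flink-statebackend-rocksdb dependency; the checkpoint path and interval are placeholders):

```java
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StateBackendSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // RocksDB holds keyed operator state (aggregations, joins, windows) on local disk.
        // It does NOT cache SQL query results between job submissions.
        env.setStateBackend(new EmbeddedRocksDBStateBackend(true)); // true = incremental checkpoints

        // Checkpoints written here are what a stopped job resumes from,
        // i.e. the "continue from the previous state" case mentioned above.
        env.enableCheckpointing(60_000);
        env.getCheckpointConfig().setCheckpointStorage("file:///tmp/flink-checkpoints");

        // Toy pipeline just so the sketch runs; a SQL job would be submitted the same way
        // and would still be re-planned and re-executed from scratch each time.
        env.fromElements(1, 2, 3, 4)
           .keyBy(v -> v % 2)
           .reduce(Integer::sum)
           .print();

        env.execute("state-backend-sketch");
    }
}
```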
Re: Flink caching mechanism
Hi Вова, In Flink, there is no built-in mechanism for caching SQL query results; every query execution is independent, and results are not stored for future queries. The StateBackend's role is to maintain operational state within jobs, such as aggregations or windowing, which is critical for ensuring data consistency and fault tolerance but is unrelated to result caching.

Вова Фролов wrote on Thu, Jan 11, 2024 at 16:27: > Hi Everyone, > > I'm currently looking to understand the caching mechanism in Apache Flink > in general. As part of this exploration, I have a few questions related to > how Flink handles data caching, both in the context of SQL queries and more > broadly. > > When I send a SQL query, for example to PostgreSQL, through Flink, does > Flink cache the data? > > If the same SQL query is executed again, does Flink retrieve the results > faster, indicating potential caching mechanisms? > > If caching does occur, where does Flink store the cached data? > > I'm using RocksDB as a StateBackend. Is there any documentation or > information on how Flink caches data during SQL queries when RocksDB is > used as a StateBackend? > > After executing a SQL query, I couldn't find any cached data in local > files. > > Additionally, could you please provide an overview of how the caching > mechanism works in Flink in general? > > I appreciate any insights or references you can provide on this matter. > > Thank you! >
Flink caching mechanism
Hi Everyone, I'm currently looking to understand the caching mechanism in Apache Flink in general. As part of this exploration, I have a few questions related to how Flink handles data caching, both in the context of SQL queries and more broadly. When I send a SQL query for example to PostgreSQL through Flink, does Flink cache the data? If the same SQL query is executed again, does Flink retrieve the results faster, indicating potential caching mechanisms? If caching does occur, where does Flink store the cached data? I'm using RocksDB as a StateBackend. Is there any documentation or information on how Flink caches data during SQL queries when RocksDB is used as a StateBackend? After executing a SQL query, I couldn't find any cached data in local files. Additionally, could you please provide an overview of how the caching mechanism works in Flink in general? I appreciate any insights or references you can provide on this matter. Thank you!
Re: [Table API] [JDBC CDC] Caching Configuration from MySql instance needed in multiple Flink Jobs
Hello, Dan

> On Feb 21, 2022, at 9:11 PM, Dan Serb wrote: > 1. Have a processor that uses Flink JDBC CDC Connector over the table that > stores the information I need. (This is implemented currently - working)

You mean you’ve implemented a Flink JDBC Connector? Maybe the Flink CDC Connectors[1] would help you.

> 2. Find a way to store that Stream Source inside a table inside Flink. (I > tried with the approach to create a MySql JDBC Catalog – but apparently, I > can only create Postgres Catalog programmatically) – This is the question – > What api do I need to use to facilitate saving inside Flink in a SQL Table, > the data retrieved by the CDC Source? > 3. The solution from point 2 needs to be done in a way that I can query that > table, for each record I receive in a different Job that has a Kafka Source > as the entrypoint.

The Flink JDBC Catalog only provides a Postgres implementation, so you need to implement your own catalog, e.g. a MySQL catalog, that provides a CDC TableSource; you can encapsulate a mysql-cdc source[2] in your catalog implementation.

> I’m just worried that I might need to reuse this data sets from the sql > database in future jobs, so this is why I’d like to have something decoupled > and available for the entire cluster.

If you want to reuse the data set and avoid capturing the database table multiple times, you can send the CDC data to a message queue like Kafka/Pulsar and then consume the changelog from the message queue in different Flink jobs.

Hope the above information helps.

Best, Leonard

[1] https://ververica.github.io/flink-cdc-connectors/master/content/about.html
[2] https://github.com/ververica/flink-cdc-connectors/blob/master/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSource.java
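[Editor's note] For illustration, a rough sketch of Leonard's fan-out suggestion: capture the MySQL changelog once with the mysql-cdc source and publish it to Kafka, so that several downstream Flink jobs can build their own lookup state from the same topic. The builder names follow flink-cdc-connectors 2.x (com.ververica) and the Flink 1.14+ KafkaSink and may differ by version; hostnames, credentials, and topic names are placeholders:

```java
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CdcToKafkaFanOut {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(10_000); // the CDC source relies on checkpoints for exactly-once

        // Capture the configuration table's changelog once (hostnames/credentials are placeholders).
        MySqlSource<String> cdcSource = MySqlSource.<String>builder()
                .hostname("mysql-host")
                .port(3306)
                .databaseList("config_db")
                .tableList("config_db.lookup_table")
                .username("flink")
                .password("secret")
                .deserializer(new JsonDebeziumDeserializationSchema()) // changelog rows as JSON strings
                .build();

        // Publish the changelog to Kafka; every job that needs the table consumes this topic
        // instead of re-capturing the database.
        KafkaSink<String> changelogSink = KafkaSink.<String>builder()
                .setBootstrapServers("kafka:9092")
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("lookup-table-changelog")
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                .build();

        env.fromSource(cdcSource, WatermarkStrategy.noWatermarks(), "mysql-cdc")
           .sinkTo(changelogSink);

        env.execute("mysql-cdc-to-kafka");
    }
}
```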
[Table API] [JDBC CDC] Caching Configuration from MySql instance needed in multiple Flink Jobs
Hello all, I kind of need the community’s help with some ideas, as I’m quite new with Flink and I feel like I need a little bit of guidance in regard to an implementation I’m working on.

What I need to do is have a way to store a MySQL table in Flink and expose that data to other jobs, as I need to query that data to enrich some records received on a Kafka source. The initial solution I’m working on now is:

1. Have a processor that uses Flink JDBC CDC Connector over the table that stores the information I need. (This is implemented currently - working)

2. Find a way to store that Stream Source inside a table inside Flink. (I tried with the approach to create a MySql JDBC Catalog – but apparently, I can only create a Postgres Catalog programmatically) – This is the question – What API do I need to use to facilitate saving the data retrieved by the CDC Source inside Flink in a SQL table?

3. The solution from point 2 needs to be done in a way that I can query that table for each record I receive in a different job that has a Kafka Source as the entrypoint.

I was thinking about having the CDC Source inside the job that has the Kafka source, and I’m going to test if this is feasible as we speak, but the idea is that I need to get some information from the MySQL database each time I process one record from the Kafka source – will this be a good option if I’m able to persist the data into a temporary view inside the processor?

I’m just worried that I might need to reuse these data sets from the SQL database in future jobs, so this is why I’d like to have something decoupled and available for the entire cluster.

Like I said, I’m new to Flink and it’s proven quite difficult for me to understand exactly what would be the best solution for my situation, which is why I’m asking users that might have more experience with this and that might have had the same issues sometime in the past.

Thank you in advance, guys!

Regards, Dan Serb
Re: Caching
Hi Navneeth, I didn't quite understand how async io can be used here. It would be great > if you can share some info on it. You need to add an async operator in the middle of your pipeline in order to enrich your input data. [1] and [2] will help you. Also how are you propagating any changes to values? I need to maintain the mapping of road ID to various attributes of each road, and the mapping is updated every week. I use keys for versioning and I use Hash [3] for value in order to store a mapping. When a new mapping is prepared I'm uploading it using a fresh key while the previous version is being served to Flink (via async io). Such concurrent read/write is possible in Redis when you turn off transaction when creating Redis client's pipeline. When the new mapping is completely uploaded, I inform my Flink pipeline of the new mapping via Kafka. [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/asyncio.html [2] https://www.youtube.com/watch?v=UParyxe-2Wc [3] https://redis.io/topics/data-types#hashes [4] https://github.com/andymccurdy/redis-py#pipelines Best, Dongwon On Fri, Nov 27, 2020 at 4:31 PM Navneeth Krishnan wrote: > Thanks Dongwon. It was extremely helpful. I didn't quite understand how > async io can be used here. It would be great if you can share some info on > it. > > Also how are you propagating any changes to values? > > Regards, > Navneeth > > On Thu, Nov 26, 2020 at 6:26 AM Dongwon Kim wrote: > >> Oops, I forgot to mention that when doing bulk insert into Redis, you'd >> better open a pipeline with a 'transaction' property set to False [1]. >> >> Otherwise, API calls from your Flink job will be timeout. >> >> [1] https://github.com/andymccurdy/redis-py#pipelines >> >> On Thu, Nov 26, 2020 at 11:09 PM Dongwon Kim >> wrote: >> >>> Hi Navneeth, >>> >>> I reported a similar issue to yours before [1] but I took the >>> broadcasting approach at first. >>> >>> As you already anticipated, broadcasting is going to use more memory >>> than your current approach based on a static object on each TM . >>> >>> And the broadcasted data will be treated as operator state and will be >>> periodically checkpointed with serialization overhead & garbage collections. >>> These are not negligible at all if you're not carefully choosing >>> serialization strategy as explained in [2]. >>> Even with the proper one, I've experienced mild back pressure whenever >>> - checkpoint is in progress (AFAIK, incremental checkpoint has nothing >>> to do with operator states) >>> - cache is being broadcasted >>> >>> For that reason, I decided to populate data on Redis but it also calls >>> for design decisions: >>> - which Java client to use? Jedis [3]? Lettuce [4]? >>> - how to invoke APIs calls inside Flink? synchronously or asynchronously? >>> >>> Currently I'm very satisfied with Lettuce with Flink's async io [5] with >>> very small memory footprint and without worrying about serialization >>> overhead and garbage collections. >>> Lettuce supports asynchronous communication so it works perfectly with >>> Flink's async io. >>> I bet you'll be very disappointed with invoking Jedis synchronously >>> inside ProcessFunction. 
>>> >>> Best, >>> >>> Dongwon >>> >>> [1] >>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Better-way-to-share-large-data-across-task-managers-td38231.html >>> [2] >>> https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html >>> [3] https://github.com/redis/jedis >>> [4] https://lettuce.io/ >>> [5] >>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/asyncio.html >>> >>> On Thu, Nov 26, 2020 at 5:31 PM Navneeth Krishnan < >>> reachnavnee...@gmail.com> wrote: Hi All, We have a flink streaming job processing around 200k events per second. The job requires a lot of less frequently changing data (sort of static but there will be some changes over time, say 5% change once per day or so). There are about 12 caches with some containing approximately 20k entries whereas a few with about 2 million entries. In the current implementation we are using in-memory lazy loading static cache to populate the data and the initialization happens in open function. The reason to choose this approach is because we have allocated around 4GB extra memory per TM for these caches and if a TM has 6 slots the cache can be shared. Now the issue we have with this approach is everytime when a container is restarted or a new job is deployed it has to populate the cache again. Sometimes this lazy loading takes a while and it causes back pressure as well. We were thinking to move this logic to the broadcast stream but since the data has to be stored per slot it would increase the memory consumption by a lot. Another option that we were thinking of is to replace the current near far cache that uses rest api to load the data to redis based near far cache. This will definitely reduce the overall loading time but still not the perfect solution. Are there any recommendations on how this can be achieved effectively? Also how is everyone overcoming this problem? Thanks, Navneeth
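[Editor's note] For reference, a minimal sketch of the async-I/O Redis lookup Dongwon describes above, assuming Lettuce as the Redis client. The key layout ("roads:v42:...") and the host are hypothetical placeholders; in Dongwon's setup the active version key is announced via Kafka.

```java
import io.lettuce.core.RedisClient;
import io.lettuce.core.api.StatefulRedisConnection;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

import java.util.Collections;
import java.util.Map;

/** Enriches a road ID with attributes kept in a Redis hash (hypothetical key layout). */
public class RedisAsyncEnricher extends RichAsyncFunction<String, Map<String, String>> {

    private transient RedisClient client;
    private transient StatefulRedisConnection<String, String> connection;

    @Override
    public void open(Configuration parameters) {
        client = RedisClient.create("redis://redis-host:6379"); // placeholder address
        connection = client.connect();
    }

    @Override
    public void asyncInvoke(String roadId, ResultFuture<Map<String, String>> resultFuture) {
        // Non-blocking HGETALL; "roads:v42" stands in for the currently active mapping version.
        connection.async().hgetall("roads:v42:" + roadId)
                .thenAccept(attrs -> resultFuture.complete(Collections.singleton(attrs)))
                .exceptionally(t -> { resultFuture.completeExceptionally(t); return null; });
    }

    @Override
    public void close() {
        if (connection != null) connection.close();
        if (client != null) client.shutdown();
    }
}

// Wiring (timeout and capacity are placeholders):
// AsyncDataStream.unorderedWait(roadIds, new RedisAsyncEnricher(), 500, TimeUnit.MILLISECONDS, 100);
```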
Re: Caching
Thanks Dongwon. It was extremely helpful. I didn't quite understand how async io can be used here. It would be great if you can share some info on it. Also how are you propagating any changes to values? Regards, Navneeth On Thu, Nov 26, 2020 at 6:26 AM Dongwon Kim wrote: > Oops, I forgot to mention that when doing bulk insert into Redis, you'd > better open a pipeline with a 'transaction' property set to False [1]. > > Otherwise, API calls from your Flink job will be timeout. > > [1] https://github.com/andymccurdy/redis-py#pipelines > > On Thu, Nov 26, 2020 at 11:09 PM Dongwon Kim > wrote: > >> Hi Navneeth, >> >> I reported a similar issue to yours before [1] but I took the >> broadcasting approach at first. >> >> As you already anticipated, broadcasting is going to use more memory than >> your current approach based on a static object on each TM . >> >> And the broadcasted data will be treated as operator state and will be >> periodically checkpointed with serialization overhead & garbage collections. >> These are not negligible at all if you're not carefully choosing >> serialization strategy as explained in [2]. >> Even with the proper one, I've experienced mild back pressure whenever >> - checkpoint is in progress (AFAIK, incremental checkpoint has nothing to >> do with operator states) >> - cache is being broadcasted >> >> For that reason, I decided to populate data on Redis but it also calls >> for design decisions: >> - which Java client to use? Jedis [3]? Lettuce [4]? >> - how to invoke APIs calls inside Flink? synchronously or asynchronously? >> >> Currently I'm very satisfied with Lettuce with Flink's async io [5] with >> very small memory footprint and without worrying about serialization >> overhead and garbage collections. >> Lettuce supports asynchronous communication so it works perfectly with >> Flink's async io. >> I bet you'll be very disappointed with invoking Jedis synchronously >> inside ProcessFunction. >> >> Best, >> >> Dongwon >> >> [1] >> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Better-way-to-share-large-data-across-task-managers-td38231.html >> [2] >> https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html >> [3] https://github.com/redis/jedis >> [4] https://lettuce.io/ >> [5] >> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/asyncio.html >> >> On Thu, Nov 26, 2020 at 5:31 PM Navneeth Krishnan < >> reachnavnee...@gmail.com> wrote: >> >>> Hi All, >>> >>> We have a flink streaming job processing around 200k events per second. >>> The job requires a lot of less frequently changing data (sort of static but >>> there will be some changes over time, say 5% change once per day or so). >>> There are about 12 caches with some containing approximately 20k >>> entries whereas a few with about 2 million entries. >>> >>> In the current implementation we are using in-memory lazy loading static >>> cache to populate the data and the initialization happens in open function. >>> The reason to choose this approach is because we have allocated around 4GB >>> extra memory per TM for these caches and if a TM has 6 slots the cache can >>> be shared. >>> >>> Now the issue we have with this approach is everytime when a container >>> is restarted or a new job is deployed it has to populate the cache again. >>> Sometimes this lazy loading takes a while and it causes back pressure as >>> well. 
We were thinking to move this logic to the broadcast stream but since >>> the data has to be stored per slot it would increase the memory consumption >>> by a lot. >>> >>> Another option that we were thinking of is to replace the current near >>> far cache that uses rest api to load the data to redis based near far >>> cache. This will definitely reduce the overall loading time but still not >>> the perfect solution. >>> >>> Are there any recommendations on how this can be achieved effectively? >>> Also how is everyone overcoming this problem? >>> >>> Thanks, >>> Navneeth >>> >>>
Re: Caching
Oops, I forgot to mention that when doing bulk insert into Redis, you'd better open a pipeline with a 'transaction' property set to False [1]. Otherwise, API calls from your Flink job will be timeout. [1] https://github.com/andymccurdy/redis-py#pipelines On Thu, Nov 26, 2020 at 11:09 PM Dongwon Kim wrote: > Hi Navneeth, > > I reported a similar issue to yours before [1] but I took the broadcasting > approach at first. > > As you already anticipated, broadcasting is going to use more memory than > your current approach based on a static object on each TM . > > And the broadcasted data will be treated as operator state and will be > periodically checkpointed with serialization overhead & garbage collections. > These are not negligible at all if you're not carefully choosing > serialization strategy as explained in [2]. > Even with the proper one, I've experienced mild back pressure whenever > - checkpoint is in progress (AFAIK, incremental checkpoint has nothing to > do with operator states) > - cache is being broadcasted > > For that reason, I decided to populate data on Redis but it also calls for > design decisions: > - which Java client to use? Jedis [3]? Lettuce [4]? > - how to invoke APIs calls inside Flink? synchronously or asynchronously? > > Currently I'm very satisfied with Lettuce with Flink's async io [5] with > very small memory footprint and without worrying about serialization > overhead and garbage collections. > Lettuce supports asynchronous communication so it works perfectly with > Flink's async io. > I bet you'll be very disappointed with invoking Jedis synchronously inside > ProcessFunction. > > Best, > > Dongwon > > [1] > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Better-way-to-share-large-data-across-task-managers-td38231.html > [2] > https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html > [3] https://github.com/redis/jedis > [4] https://lettuce.io/ > [5] > https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/asyncio.html > > On Thu, Nov 26, 2020 at 5:31 PM Navneeth Krishnan < > reachnavnee...@gmail.com> wrote: > >> Hi All, >> >> We have a flink streaming job processing around 200k events per second. >> The job requires a lot of less frequently changing data (sort of static but >> there will be some changes over time, say 5% change once per day or so). >> There are about 12 caches with some containing approximately 20k >> entries whereas a few with about 2 million entries. >> >> In the current implementation we are using in-memory lazy loading static >> cache to populate the data and the initialization happens in open function. >> The reason to choose this approach is because we have allocated around 4GB >> extra memory per TM for these caches and if a TM has 6 slots the cache can >> be shared. >> >> Now the issue we have with this approach is everytime when a container is >> restarted or a new job is deployed it has to populate the cache again. >> Sometimes this lazy loading takes a while and it causes back pressure as >> well. We were thinking to move this logic to the broadcast stream but since >> the data has to be stored per slot it would increase the memory consumption >> by a lot. >> >> Another option that we were thinking of is to replace the current near >> far cache that uses rest api to load the data to redis based near far >> cache. This will definitely reduce the overall loading time but still not >> the perfect solution. 
>> >> Are there any recommendations on how this can be achieved effectively? >> Also how is everyone overcoming this problem? >> >> Thanks, >> Navneeth >> >>
Re: Caching
Hi Navneeth, I reported a similar issue to yours before [1] but I took the broadcasting approach at first. As you already anticipated, broadcasting is going to use more memory than your current approach based on a static object on each TM . And the broadcasted data will be treated as operator state and will be periodically checkpointed with serialization overhead & garbage collections. These are not negligible at all if you're not carefully choosing serialization strategy as explained in [2]. Even with the proper one, I've experienced mild back pressure whenever - checkpoint is in progress (AFAIK, incremental checkpoint has nothing to do with operator states) - cache is being broadcasted For that reason, I decided to populate data on Redis but it also calls for design decisions: - which Java client to use? Jedis [3]? Lettuce [4]? - how to invoke APIs calls inside Flink? synchronously or asynchronously? Currently I'm very satisfied with Lettuce with Flink's async io [5] with very small memory footprint and without worrying about serialization overhead and garbage collections. Lettuce supports asynchronous communication so it works perfectly with Flink's async io. I bet you'll be very disappointed with invoking Jedis synchronously inside ProcessFunction. Best, Dongwon [1] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Better-way-to-share-large-data-across-task-managers-td38231.html [2] https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html [3] https://github.com/redis/jedis [4] https://lettuce.io/ [5] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/asyncio.html On Thu, Nov 26, 2020 at 5:31 PM Navneeth Krishnan wrote: > Hi All, > > We have a flink streaming job processing around 200k events per second. > The job requires a lot of less frequently changing data (sort of static but > there will be some changes over time, say 5% change once per day or so). > There are about 12 caches with some containing approximately 20k > entries whereas a few with about 2 million entries. > > In the current implementation we are using in-memory lazy loading static > cache to populate the data and the initialization happens in open function. > The reason to choose this approach is because we have allocated around 4GB > extra memory per TM for these caches and if a TM has 6 slots the cache can > be shared. > > Now the issue we have with this approach is everytime when a container is > restarted or a new job is deployed it has to populate the cache again. > Sometimes this lazy loading takes a while and it causes back pressure as > well. We were thinking to move this logic to the broadcast stream but since > the data has to be stored per slot it would increase the memory consumption > by a lot. > > Another option that we were thinking of is to replace the current near far > cache that uses rest api to load the data to redis based near far cache. > This will definitely reduce the overall loading time but still not the > perfect solution. > > Are there any recommendations on how this can be achieved effectively? > Also how is everyone overcoming this problem? > > Thanks, > Navneeth > >
Re: Caching
Navneeth, Thanks for posting this question. This looks like a scenario we might end up with in the future. We are working on a similar problem statement with two differences. 1) The cache items would not change frequently (say at most once per month or a few times per year) and the number of entities in the cache would not be more than 1000 (say, Java objects). 2) The event load we are looking at is around 10-50k/sec. We are using the broadcast mechanism for this. Prasanna. On Thu 26 Nov, 2020, 14:01 Navneeth Krishnan, wrote: > Hi All, > > We have a flink streaming job processing around 200k events per second. > The job requires a lot of less frequently changing data (sort of static but > there will be some changes over time, say 5% change once per day or so). > There are about 12 caches with some containing approximately 20k > entries whereas a few with about 2 million entries. > > In the current implementation we are using in-memory lazy loading static > cache to populate the data and the initialization happens in open function. > The reason to choose this approach is because we have allocated around 4GB > extra memory per TM for these caches and if a TM has 6 slots the cache can > be shared. > > Now the issue we have with this approach is everytime when a container is > restarted or a new job is deployed it has to populate the cache again. > Sometimes this lazy loading takes a while and it causes back pressure as > well. We were thinking to move this logic to the broadcast stream but since > the data has to be stored per slot it would increase the memory consumption > by a lot. > > Another option that we were thinking of is to replace the current near far > cache that uses rest api to load the data to redis based near far cache. > This will definitely reduce the overall loading time but still not the > perfect solution. > > Are there any recommendations on how this can be achieved effectively? > Also how is everyone overcoming this problem? > > Thanks, > Navneeth > >
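[Editor's note] For illustration, a minimal sketch of the broadcast-state pattern Prasanna mentions: a small, slowly changing configuration stream is broadcast to every parallel instance and joined with the main event stream. The types and the "key=value" update format are placeholders.

```java
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class BroadcastEnrichment {

    // Descriptor for the broadcast ("cache") state: configKey -> configValue.
    static final MapStateDescriptor<String, String> CONFIG_DESCRIPTOR =
            new MapStateDescriptor<>("config", Types.STRING, Types.STRING);

    public static DataStream<String> enrich(DataStream<String> events, DataStream<String> configUpdates) {
        BroadcastStream<String> broadcastConfig = configUpdates.broadcast(CONFIG_DESCRIPTOR);

        return events
            .connect(broadcastConfig)
            .process(new BroadcastProcessFunction<String, String, String>() {

                @Override
                public void processElement(String event, ReadOnlyContext ctx, Collector<String> out) throws Exception {
                    // Look the event up against the broadcast state (read-only on this side).
                    String value = ctx.getBroadcastState(CONFIG_DESCRIPTOR).get(event);
                    out.collect(event + " -> " + (value == null ? "unknown" : value));
                }

                @Override
                public void processBroadcastElement(String update, Context ctx, Collector<String> out) throws Exception {
                    // Hypothetical "key=value" update format; every parallel instance applies it.
                    String[] kv = update.split("=", 2);
                    ctx.getBroadcastState(CONFIG_DESCRIPTOR).put(kv[0], kv[1]);
                }
            });
    }
}
```

Note that the broadcast state is replicated in every parallel subtask and checkpointed as operator state, which is exactly the memory and checkpointing overhead Navneeth and Dongwon discuss above.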
Caching
Hi All, We have a flink streaming job processing around 200k events per second. The job requires a lot of less frequently changing data (sort of static but there will be some changes over time, say 5% change once per day or so). There are about 12 caches with some containing approximately 20k entries whereas a few with about 2 million entries. In the current implementation we are using in-memory lazy loading static cache to populate the data and the initialization happens in open function. The reason to choose this approach is because we have allocated around 4GB extra memory per TM for these caches and if a TM has 6 slots the cache can be shared. Now the issue we have with this approach is everytime when a container is restarted or a new job is deployed it has to populate the cache again. Sometimes this lazy loading takes a while and it causes back pressure as well. We were thinking to move this logic to the broadcast stream but since the data has to be stored per slot it would increase the memory consumption by a lot. Another option that we were thinking of is to replace the current near far cache that uses rest api to load the data to redis based near far cache. This will definitely reduce the overall loading time but still not the perfect solution. Are there any recommendations on how this can be achieved effectively? Also how is everyone overcoming this problem? Thanks, Navneeth
Re: Caching Mechanism in Flink
Hi Iacovos, As Matthias mentioned tasks' off-heap has nothing to do with the memory segments. This memory component is reserved only for the user code. The memory segments are managed by Flink and used for batch workloads, like in memory joins etc. They are part of managed memory (taskmanager.memory.managed.size) which is also off-heap but not tasks' off-heap (taskmanager.memory.task.off-heap.size) and not JVM direct memory. The memory segments are also used to wrap network buffers. Those are JVM direct memory (which is also off-heap) but again it is not about the tasks' off-heap. Maybe, the confusion comes from the fact that 'off-heap' generally refers to everything which is not JVM Heap: direct or native memory. The tasks' off-heap is that part of general 'off-heap' (direct memory limit to be precise) which is reserved only for the user code but not intended to be used by Flink. Best, Andrey On Wed, Nov 11, 2020 at 3:06 PM Jack Kolokasis wrote: > Hi Matthias, > > Yeap, I am refer to the tasks' off-heap configuration value. > > Best, > Iacovos > On 11/11/20 1:37 μ.μ., Matthias Pohl wrote: > > When talking about the "off-heap" in your most recent message, are you > still referring to the task's off-heap configuration value? > > AFAIK, the HybridMemorySegment shouldn't be directly related to the > off-heap parameter. > > The HybridMemorySegment can be used as a wrapper around any kind of > memory, i.e. byte[]. It can be either used for heap memory but also > DirectByteBuffers (located in JVM's direct memory pool which is not part of > the JVM's heap) or memory allocated through Unsafe's allocation methods > (so-called native memory which is also not part of the JVM's heap). > The HybridMemorySegments are utilized within the MemoryManager class. The > MemoryManager instances are responsible for maintaining the managed memory > used in each of the TaskSlots. Managed Memory is used in different settings > (e.g. for the RocksDB state backend in streaming applications). It can be > configured using taskmanager.memory.managed.size (or the corresponding > *.fraction parameter) [1]. See more details on that in [2]. > > I'm going to pull in Andrey as he has worked on that topic recently. > > Best, > Matthias > > [1] > https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html#taskmanager-memory-managed-size > [2] > https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/memory/mem_setup_tm.html#managed-memory > > On Wed, Nov 11, 2020 at 12:00 PM Jack Kolokasis > wrote: > >> Hi Matthias, >> >> Thank you for your reply and useful information. I find that the off-heap >> is used when Flink uses HybridMemorySegments. Well, how the Flink knows >> when to use these HybridMemorySegments and in which operations this is >> happened? >> >> Best, >> Iacovos >> On 11/11/20 11:41 π.μ., Matthias Pohl wrote: >> >> Hi Iacovos, >> The task's off-heap configuration value is used when spinning up >> TaskManager containers in a clustered environment. It will contribute to >> the overall memory reserved for a TaskManager container during deployment. >> This parameter can be used to influence the amount of memory allocated if >> the user code relies on DirectByteBuffers and/or native memory allocation. >> There is no active memory pool management beyond that from Flink's side. >> The configuration parameter is ignored if you run a Flink cluster locally. 
>> >> Besides this, Flink also utilizes the JVM's using DirectByteBuffers (for >> network buffers) and native memory (through Flink's internally used managed >> memory) internally. >> >> You can find a more detailed description of Flink's memory model in [1]. >> I hope that helps. >> >> Best, >> Matthias >> >> [1] >> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/memory/mem_setup_tm.html#detailed-memory-model >> >> On Tue, Nov 10, 2020 at 3:57 AM Jack Kolokasis >> wrote: >> >>> Thank you Xuannan for the reply. >>> >>> Also I want to ask about how Flink uses the off-heap memory. If I set >>> taskmanager.memory.task.off-heap.size then which data does Flink allocate >>> off-heap? This is handle by the programmer? >>> >>> Best, >>> Iacovos >>> On 10/11/20 4:42 π.μ., Xuannan Su wrote: >>> >>> Hi Jack, >>> >>> At the moment, Flink doesn't support caching the intermediate result. >>> However, there is some ongoing effort to support caching in Flink. >>> FLIP-36[1] propose to add the caching mechanism at the Table API. And it >>> is planned for 1.13. >>> >>> Best, >>> Xuannan >>> >>> On Nov 10, 2020, 4:29 AM +0800, Jack Kolokasis , >>> wrote: >>> >>> Hello all, >>> >>> I am new to Flink and I want to ask if the Flink supports a caching >>> mechanism to store intermediate results in memory for machine learning >>> workloads. >>> >>> If yes, how can I enable it and how can I use it? >>> >>> Thank you, >>> Iacovos >>> >>>
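[Editor's note] For reference, the memory pools Andrey and Matthias distinguish above map onto separate configuration options. A small sketch using the documented keys (Flink 1.10+); the sizes are arbitrary examples, and in a real deployment these usually go into flink-conf.yaml rather than code:

```java
import org.apache.flink.configuration.Configuration;

public class MemoryPoolsSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();

        // Managed memory: owned by Flink itself (memory segments, RocksDB block cache /
        // write buffers, batch sorts and joins). Off-heap, but not "task off-heap".
        conf.setString("taskmanager.memory.managed.size", "512m");

        // Task off-heap: reserved for *user code* that allocates direct or native memory;
        // Flink does not manage or use this pool itself.
        conf.setString("taskmanager.memory.task.off-heap.size", "128m");

        // Network memory: direct memory wrapped in memory segments for network buffers.
        conf.setString("taskmanager.memory.network.max", "256m");

        System.out.println(conf);
    }
}
```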
Re: Caching Mechanism in Flink
Hi Matthias, Yeap, I am refer to the tasks' off-heap configuration value. Best, Iacovos On 11/11/20 1:37 μ.μ., Matthias Pohl wrote: When talking about the "off-heap" in your most recent message, are you still referring to the task's off-heap configuration value? AFAIK, the HybridMemorySegment shouldn't be directly related to the off-heap parameter. The HybridMemorySegment can be used as a wrapper around any kind of memory, i.e. byte[]. It can be either used for heap memory but also DirectByteBuffers (located in JVM's direct memory pool which is not part of the JVM's heap) or memory allocated through Unsafe's allocation methods (so-called native memory which is also not part of the JVM's heap). The HybridMemorySegments are utilized within the MemoryManager class. The MemoryManager instances are responsible for maintaining the managed memory used in each of the TaskSlots. Managed Memory is used in different settings (e.g. for the RocksDB state backend in streaming applications). It can be configured using taskmanager.memory.managed.size (or the corresponding *.fraction parameter) [1]. See more details on that in [2]. I'm going to pull in Andrey as he has worked on that topic recently. Best, Matthias [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html#taskmanager-memory-managed-size [2] https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/memory/mem_setup_tm.html#managed-memory On Wed, Nov 11, 2020 at 12:00 PM Jack Kolokasis mailto:koloka...@ics.forth.gr>> wrote: Hi Matthias, Thank you for your reply and useful information. I find that the off-heap is used when Flink uses HybridMemorySegments. Well, how the Flink knows when to use these HybridMemorySegments and in which operations this is happened? Best, Iacovos On 11/11/20 11:41 π.μ., Matthias Pohl wrote: Hi Iacovos, The task's off-heap configuration value is used when spinning up TaskManager containers in a clustered environment. It will contribute to the overall memory reserved for a TaskManager container during deployment. This parameter can be used to influence the amount of memory allocated if the user code relies on DirectByteBuffers and/or native memory allocation. There is no active memory pool management beyond that from Flink's side. The configuration parameter is ignored if you run a Flink cluster locally. Besides this, Flink also utilizes the JVM's using DirectByteBuffers (for network buffers) and native memory (through Flink's internally used managed memory) internally. You can find a more detailed description of Flink's memory model in [1]. I hope that helps. Best, Matthias [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/memory/mem_setup_tm.html#detailed-memory-model On Tue, Nov 10, 2020 at 3:57 AM Jack Kolokasis mailto:koloka...@ics.forth.gr>> wrote: Thank you Xuannan for the reply. Also I want to ask about how Flink uses the off-heap memory. If I set taskmanager.memory.task.off-heap.size then which data does Flink allocate off-heap? This is handle by the programmer? Best, Iacovos On 10/11/20 4:42 π.μ., Xuannan Su wrote: Hi Jack, At the moment, Flink doesn't support caching the intermediate result. However, there is some ongoing effort to support caching in Flink. FLIP-36[1] propose to add the caching mechanism at the Table API. And it is planned for 1.13. 
Best, Xuannan On Nov 10, 2020, 4:29 AM +0800, Jack Kolokasis mailto:koloka...@ics.forth.gr>>, wrote: Hello all, I am new to Flink and I want to ask if the Flink supports a caching mechanism to store intermediate results in memory for machine learning workloads. If yes, how can I enable it and how can I use it? Thank you, Iacovos
Re: Caching Mechanism in Flink
When talking about the "off-heap" in your most recent message, are you still referring to the task's off-heap configuration value? AFAIK, the HybridMemorySegment shouldn't be directly related to the off-heap parameter. The HybridMemorySegment can be used as a wrapper around any kind of memory, i.e. byte[]. It can be either used for heap memory but also DirectByteBuffers (located in JVM's direct memory pool which is not part of the JVM's heap) or memory allocated through Unsafe's allocation methods (so-called native memory which is also not part of the JVM's heap). The HybridMemorySegments are utilized within the MemoryManager class. The MemoryManager instances are responsible for maintaining the managed memory used in each of the TaskSlots. Managed Memory is used in different settings (e.g. for the RocksDB state backend in streaming applications). It can be configured using taskmanager.memory.managed.size (or the corresponding *.fraction parameter) [1]. See more details on that in [2]. I'm going to pull in Andrey as he has worked on that topic recently. Best, Matthias [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html#taskmanager-memory-managed-size [2] https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/memory/mem_setup_tm.html#managed-memory On Wed, Nov 11, 2020 at 12:00 PM Jack Kolokasis wrote: > Hi Matthias, > > Thank you for your reply and useful information. I find that the off-heap > is used when Flink uses HybridMemorySegments. Well, how the Flink knows > when to use these HybridMemorySegments and in which operations this is > happened? > > Best, > Iacovos > On 11/11/20 11:41 π.μ., Matthias Pohl wrote: > > Hi Iacovos, > The task's off-heap configuration value is used when spinning up > TaskManager containers in a clustered environment. It will contribute to > the overall memory reserved for a TaskManager container during deployment. > This parameter can be used to influence the amount of memory allocated if > the user code relies on DirectByteBuffers and/or native memory allocation. > There is no active memory pool management beyond that from Flink's side. > The configuration parameter is ignored if you run a Flink cluster locally. > > Besides this, Flink also utilizes the JVM's using DirectByteBuffers (for > network buffers) and native memory (through Flink's internally used managed > memory) internally. > > You can find a more detailed description of Flink's memory model in [1]. I > hope that helps. > > Best, > Matthias > > [1] > https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/memory/mem_setup_tm.html#detailed-memory-model > > On Tue, Nov 10, 2020 at 3:57 AM Jack Kolokasis > wrote: > >> Thank you Xuannan for the reply. >> >> Also I want to ask about how Flink uses the off-heap memory. If I set >> taskmanager.memory.task.off-heap.size then which data does Flink allocate >> off-heap? This is handle by the programmer? >> >> Best, >> Iacovos >> On 10/11/20 4:42 π.μ., Xuannan Su wrote: >> >> Hi Jack, >> >> At the moment, Flink doesn't support caching the intermediate result. >> However, there is some ongoing effort to support caching in Flink. >> FLIP-36[1] propose to add the caching mechanism at the Table API. And it >> is planned for 1.13. >> >> Best, >> Xuannan >> >> On Nov 10, 2020, 4:29 AM +0800, Jack Kolokasis , >> wrote: >> >> Hello all, >> >> I am new to Flink and I want to ask if the Flink supports a caching >> mechanism to store intermediate results in memory for machine learning >> workloads. 
>> >> If yes, how can I enable it and how can I use it? >> >> Thank you, >> Iacovos >> >>
Re: Caching Mechanism in Flink
Hi Matthias, Thank you for your reply and useful information. I find that the off-heap is used when Flink uses HybridMemorySegments. Well, how the Flink knows when to use these HybridMemorySegments and in which operations this is happened? Best, Iacovos On 11/11/20 11:41 π.μ., Matthias Pohl wrote: Hi Iacovos, The task's off-heap configuration value is used when spinning up TaskManager containers in a clustered environment. It will contribute to the overall memory reserved for a TaskManager container during deployment. This parameter can be used to influence the amount of memory allocated if the user code relies on DirectByteBuffers and/or native memory allocation. There is no active memory pool management beyond that from Flink's side. The configuration parameter is ignored if you run a Flink cluster locally. Besides this, Flink also utilizes the JVM's using DirectByteBuffers (for network buffers) and native memory (through Flink's internally used managed memory) internally. You can find a more detailed description of Flink's memory model in [1]. I hope that helps. Best, Matthias [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/memory/mem_setup_tm.html#detailed-memory-model On Tue, Nov 10, 2020 at 3:57 AM Jack Kolokasis <mailto:koloka...@ics.forth.gr>> wrote: Thank you Xuannan for the reply. Also I want to ask about how Flink uses the off-heap memory. If I set taskmanager.memory.task.off-heap.size then which data does Flink allocate off-heap? This is handle by the programmer? Best, Iacovos On 10/11/20 4:42 π.μ., Xuannan Su wrote: Hi Jack, At the moment, Flink doesn't support caching the intermediate result. However, there is some ongoing effort to support caching in Flink. FLIP-36[1] propose to add the caching mechanism at the Table API. And it is planned for 1.13. Best, Xuannan On Nov 10, 2020, 4:29 AM +0800, Jack Kolokasis mailto:koloka...@ics.forth.gr>>, wrote: Hello all, I am new to Flink and I want to ask if the Flink supports a caching mechanism to store intermediate results in memory for machine learning workloads. If yes, how can I enable it and how can I use it? Thank you, Iacovos
Re: Caching Mechanism in Flink
Hi Iacovos, The task's off-heap configuration value is used when spinning up TaskManager containers in a clustered environment. It will contribute to the overall memory reserved for a TaskManager container during deployment. This parameter can be used to influence the amount of memory allocated if the user code relies on DirectByteBuffers and/or native memory allocation. There is no active memory pool management beyond that from Flink's side. The configuration parameter is ignored if you run a Flink cluster locally. Besides this, Flink also utilizes the JVM's using DirectByteBuffers (for network buffers) and native memory (through Flink's internally used managed memory) internally. You can find a more detailed description of Flink's memory model in [1]. I hope that helps. Best, Matthias [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/memory/mem_setup_tm.html#detailed-memory-model On Tue, Nov 10, 2020 at 3:57 AM Jack Kolokasis wrote: > Thank you Xuannan for the reply. > > Also I want to ask about how Flink uses the off-heap memory. If I set > taskmanager.memory.task.off-heap.size then which data does Flink allocate > off-heap? This is handle by the programmer? > > Best, > Iacovos > On 10/11/20 4:42 π.μ., Xuannan Su wrote: > > Hi Jack, > > At the moment, Flink doesn't support caching the intermediate result. > However, there is some ongoing effort to support caching in Flink. > FLIP-36[1] propose to add the caching mechanism at the Table API. And it > is planned for 1.13. > > Best, > Xuannan > > On Nov 10, 2020, 4:29 AM +0800, Jack Kolokasis , > wrote: > > Hello all, > > I am new to Flink and I want to ask if the Flink supports a caching > mechanism to store intermediate results in memory for machine learning > workloads. > > If yes, how can I enable it and how can I use it? > > Thank you, > Iacovos > >
Re: Caching Mechanism in Flink
Thank you Xuannan for the reply. Also I want to ask about how Flink uses the off-heap memory. If I set taskmanager.memory.task.off-heap.size then which data does Flink allocate off-heap? This is handle by the programmer? Best, Iacovos On 10/11/20 4:42 π.μ., Xuannan Su wrote: Hi Jack, At the moment, Flink doesn't support caching the intermediate result. However, there is some ongoing effort to support caching in Flink. FLIP-36[1] propose to add the caching mechanism at the Table API. And it is planned for 1.13. Best, Xuannan On Nov 10, 2020, 4:29 AM +0800, Jack Kolokasis <mailto:koloka...@ics.forth.gr>>, wrote: Hello all, I am new to Flink and I want to ask if the Flink supports a caching mechanism to store intermediate results in memory for machine learning workloads. If yes, how can I enable it and how can I use it? Thank you, Iacovos
Re: Caching Mechanism in Flink
Hi Jack, At the moment, Flink doesn't support caching the intermediate result. However, there is some ongoing effort to support caching in Flink. FLIP-36[1] propose to add the caching mechanism at the Table API. And it is planned for 1.13. Best, Xuannan On Nov 10, 2020, 4:29 AM +0800, Jack Kolokasis , wrote: Hello all, I am new to Flink and I want to ask if the Flink supports a caching mechanism to store intermediate results in memory for machine learning workloads. If yes, how can I enable it and how can I use it? Thank you, Iacovos
Caching Mechanism in Flink
Hello all, I am new to Flink and I want to ask if the Flink supports a caching mechanism to store intermediate results in memory for machine learning workloads. If yes, how can I enable it and how can I use it? Thank you, Iacovos
Re: How to add caching to async function?
Ah, thanks, missed it when I only looked at the slides. Yes, have heard deadlocks are a problem with iterations but hoped it had been fixed. Pity, had been hoping to replace an external service with the Flink job, but will keep the service around for the caching, - Original Message - From: "Lasse Nedergaard" To:"William Saar" Cc:"Fabian Hueske" , "user" Sent:Tue, 5 Feb 2019 10:41:41 +0100 Subject:Re: How to add caching to async function? Hi William No iterations isn’t the solution as you can (will) end up in a deadlock. We concluded that storing the results from external lookup in Kafka and use these data as input to the cache was the only way Med venlig hilsen / Best regards Lasse Nedergaard Den 5. feb. 2019 kl. 10.22 skrev William Saar : Thanks! Looks like iterations is indeed the way to go for now then... - Original Message - From: "Lasse Nedergaard" To:"Fabian Hueske" Cc:"William Saar" , "user" Sent:Mon, 4 Feb 2019 20:20:30 +0100 Subject:Re: How to add caching to async function? Hi William We have created a solution that do it. Please take a look at my presentation from Flink forward. https://www.slideshare.net/mobile/FlinkForward/flink-forward-berlin-2018-lasse-nedergaard-our-successful-journey-with-flink [6] Hopefully you can get inspired. Med venlig hilsen / Best regards Lasse Nedergaard Den 4. feb. 2019 kl. 19.04 skrev Fabian Hueske : Hi William, Does the cache need to be fault tolerant? If not you could use a regular in-memory map as cache (+some LRU cleaning). Or do you expect the cache to group too large for the memory? Best, Fabian Am Mo., 4. Feb. 2019 um 18:00 Uhr schrieb William Saar : Hi, I am trying to implement an async function that looks up a value in a cache or, if the value doesn't exist in the cache, queries a web service, but I'm having trouble creating the cache. I've tried to create a RichAsyncFunction and add a map state as cache, but I'm getting: State is not supported in rich async functions. What is the best practice for doing this? I guess I could have a previous step with state and send the responses from the rich function back as an iteration, but I would guess that's the wrong approach... Thanks, William Links: -- [1] mailto:will...@saar.se [2] mailto:lassenederga...@gmail.com [3] mailto:fhue...@gmail.com [4] mailto:will...@saar.se [5] mailto:user@flink.apache.org [6] https://www.slideshare.net/mobile/FlinkForward/flink-forward-berlin-2018-lasse-nedergaard-our-successful-journey-with-flink [7] mailto:fhue...@gmail.com [8] mailto:will...@saar.se
Re: How to add caching to async function?
Hi William No iterations isn’t the solution as you can (will) end up in a deadlock. We concluded that storing the results from external lookup in Kafka and use these data as input to the cache was the only way Med venlig hilsen / Best regards Lasse Nedergaard > Den 5. feb. 2019 kl. 10.22 skrev William Saar : > > Thanks! Looks like iterations is indeed the way to go for now then... > > > > - Original Message - > From: "Lasse Nedergaard" > To:"Fabian Hueske" > Cc:"William Saar" , "user" > Sent:Mon, 4 Feb 2019 20:20:30 +0100 > Subject:Re: How to add caching to async function? > > > Hi William > > We have created a solution that do it. Please take a look at my presentation > from Flink forward. > https://www.slideshare.net/mobile/FlinkForward/flink-forward-berlin-2018-lasse-nedergaard-our-successful-journey-with-flink > Hopefully you can get inspired. > > Med venlig hilsen / Best regards > Lasse Nedergaard > > > Den 4. feb. 2019 kl. 19.04 skrev Fabian Hueske : > > Hi William, > > Does the cache need to be fault tolerant? > If not you could use a regular in-memory map as cache (+some LRU cleaning). > Or do you expect the cache to group too large for the memory? > > Best, Fabian > > >> Am Mo., 4. Feb. 2019 um 18:00 Uhr schrieb William Saar : >> Hi, >> I am trying to implement an async function that looks up a value in a cache >> or, if the value doesn't exist in the cache, queries a web service, but I'm >> having trouble creating the cache. I've tried to create a RichAsyncFunction >> and add a map state as cache, but I'm getting: State is not supported in >> rich async functions. >> >> What is the best practice for doing this? I guess I could have a previous >> step with state and send the responses from the rich function back as an >> iteration, but I would guess that's the wrong approach... >> >> Thanks, >> William >> >>
Re: How to add caching to async function?
Thanks! Looks like iterations is indeed the way to go for now then... - Original Message - From: "Lasse Nedergaard" To:"Fabian Hueske" Cc:"William Saar" , "user" Sent:Mon, 4 Feb 2019 20:20:30 +0100 Subject:Re: How to add caching to async function? Hi William We have created a solution that do it. Please take a look at my presentation from Flink forward. https://www.slideshare.net/mobile/FlinkForward/flink-forward-berlin-2018-lasse-nedergaard-our-successful-journey-with-flink [1] Hopefully you can get inspired. Med venlig hilsen / Best regards Lasse Nedergaard Den 4. feb. 2019 kl. 19.04 skrev Fabian Hueske : Hi William, Does the cache need to be fault tolerant? If not you could use a regular in-memory map as cache (+some LRU cleaning). Or do you expect the cache to group too large for the memory? Best, Fabian Am Mo., 4. Feb. 2019 um 18:00 Uhr schrieb William Saar : Hi, I am trying to implement an async function that looks up a value in a cache or, if the value doesn't exist in the cache, queries a web service, but I'm having trouble creating the cache. I've tried to create a RichAsyncFunction and add a map state as cache, but I'm getting: State is not supported in rich async functions. What is the best practice for doing this? I guess I could have a previous step with state and send the responses from the rich function back as an iteration, but I would guess that's the wrong approach... Thanks, William Links: -- [1] https://www.slideshare.net/mobile/FlinkForward/flink-forward-berlin-2018-lasse-nedergaard-our-successful-journey-with-flink [2] mailto:fhue...@gmail.com [3] mailto:will...@saar.se
Re: How to add caching to async function?
Hi William We have created a solution that does it. Please take a look at my presentation from Flink Forward. https://www.slideshare.net/mobile/FlinkForward/flink-forward-berlin-2018-lasse-nedergaard-our-successful-journey-with-flink Hopefully you can get inspired. Med venlig hilsen / Best regards Lasse Nedergaard > On Feb 4, 2019 at 19:04, Fabian Hueske wrote: > > Hi William, > > Does the cache need to be fault tolerant? > If not you could use a regular in-memory map as cache (+ some LRU cleaning). > Or do you expect the cache to grow too large for the memory? > > Best, Fabian > > >> On Mon, Feb 4, 2019 at 18:00, William Saar wrote: >> Hi, >> I am trying to implement an async function that looks up a value in a cache >> or, if the value doesn't exist in the cache, queries a web service, but I'm >> having trouble creating the cache. I've tried to create a RichAsyncFunction >> and add a map state as cache, but I'm getting: State is not supported in >> rich async functions. >> >> What is the best practice for doing this? I guess I could have a previous >> step with state and send the responses from the rich function back as an >> iteration, but I would guess that's the wrong approach... >> >> Thanks, >> William >> >>
Re: How to add caching to async function?
Hi William, Does the cache need to be fault tolerant? If not you could use a regular in-memory map as cache (+ some LRU cleaning). Or do you expect the cache to grow too large for the memory? Best, Fabian On Mon, Feb 4, 2019 at 18:00, William Saar wrote: > Hi, > I am trying to implement an async function that looks up a value in a > cache or, if the value doesn't exist in the cache, queries a web service, > but I'm having trouble creating the cache. I've tried to create a > RichAsyncFunction and add a map state as cache, but I'm getting: State is > not supported in rich async functions. > > What is the best practice for doing this? I guess I could have a previous > step with state and send the responses from the rich function back as an > iteration, but I would guess that's the wrong approach... > > Thanks, > William > > >
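[Editor's note] For illustration, a minimal sketch of Fabian's suggestion: a plain, non-checkpointed in-memory map with LRU eviction inside a RichAsyncFunction, falling back to the web service on a miss. The web-service client below is a hypothetical placeholder.

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

public class CachingAsyncLookup extends RichAsyncFunction<String, String> {

    private static final int MAX_ENTRIES = 10_000;

    // Plain per-subtask cache: not Flink state, so it is allowed in a RichAsyncFunction,
    // but it is also not checkpointed (a restart simply starts cold).
    private transient Map<String, String> cache;

    @Override
    public void open(Configuration parameters) {
        // Access-ordered LinkedHashMap gives simple LRU eviction; the synchronized wrapper
        // is needed because the completion callback below may run on a different thread.
        cache = Collections.synchronizedMap(new LinkedHashMap<String, String>(1024, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, String> eldest) {
                return size() > MAX_ENTRIES;
            }
        });
    }

    @Override
    public void asyncInvoke(String key, ResultFuture<String> resultFuture) {
        String cached = cache.get(key);
        if (cached != null) {
            resultFuture.complete(Collections.singleton(cached));
            return;
        }
        queryWebService(key).whenComplete((value, error) -> {
            if (error != null) {
                resultFuture.completeExceptionally(error);
            } else {
                cache.put(key, value);
                resultFuture.complete(Collections.singleton(value));
            }
        });
    }

    private CompletableFuture<String> queryWebService(String key) {
        // Placeholder for a real non-blocking HTTP/gRPC client.
        return CompletableFuture.completedFuture("value-for-" + key);
    }
}
```

Because the cache lives per parallel instance and is never checkpointed, a restart simply starts cold, which matches Fabian's "not fault tolerant" caveat.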
How to add caching to async function?
Hi, I am trying to implement an async function that looks up a value in a cache or, if the value doesn't exist in the cache, queries a web service, but I'm having trouble creating the cache. I've tried to create a RichAsyncFunction and add a map state as cache, but I'm getting: State is not supported in rich async functions. What is the best practice for doing this? I guess I could have a previous step with state and send the responses from the rich function back as an iteration, but I would guess that's the wrong approach... Thanks, William
Re: Regarding caching the evicted elements and re-emitting them to the next window
Hi Abdul, You may want to check out FLIP-13 "side output" https://goo.gl/6KSYd0 . Once we have this feature, you should be able to collect the data to external distributed storage and use these data later on demand. BTW, can you explain your use case in more detail, so that people here may help you figure out a better solution (perhaps it just needs some tuning of your query plan)? Shaoxuan On Sat, Jan 14, 2017 at 12:22 PM, Aljoscha Krettek wrote: > Hi, > I'm afraid there is no functionality for this in Flink. What you can do, > however, is to not evict these elements from the window buffer but instead > ignore them when processing your elements in the WindowFunction. This way > they will be preserved for the next firing. You have to make sure to > eventually evict some elements, however. Otherwise you would have a memory > leak. > > Aljoscha > > On Sun, Jan 8, 2017, 23:47 Abdul Salam Shaikh > wrote: >> Hi, >> >> I am using 1.2-Snapshot version of Apache Flink which provides the new >> enhanced Evictor functionality and using customized triggers for Global >> Window. I have a use case where I am evicting the unwanted event(element) >> for the current window before it is evaluated. However, I am looking for >> options to cache this evicted element and re-use it in the next window. Is >> there a possibility which can help me achieve this in the context of Flink >> or in a more generic programming approach. >> >> Thanks in anticipation! >
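[Editor's note] Side outputs (FLIP-13) were still upcoming when this was written and have since been released. For reference, a minimal sketch of routing "unwanted" elements to a side output from a ProcessFunction so they can be stored externally and replayed later, instead of being dropped by an Evictor; the predicate and types are placeholders.

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class SideOutputExample {

    // Tag for the elements we would otherwise have evicted (anonymous subclass keeps type info).
    static final OutputTag<String> DEFERRED = new OutputTag<String>("deferred") {};

    public static void wire(DataStream<String> events) {
        SingleOutputStreamOperator<String> main = events.process(
            new ProcessFunction<String, String>() {
                @Override
                public void processElement(String value, Context ctx, Collector<String> out) {
                    if (isWanted(value)) {
                        out.collect(value);            // goes on to the window/evaluation path
                    } else {
                        ctx.output(DEFERRED, value);   // side output instead of dropping via an Evictor
                    }
                }
            });

        // The deferred elements can be written to external storage (or fed back via Kafka)
        // and replayed into a later window on demand.
        DataStream<String> deferred = main.getSideOutput(DEFERRED);
        deferred.print();
        main.print();
    }

    private static boolean isWanted(String value) {
        // Placeholder predicate for the application-specific filtering rule.
        return !value.isEmpty();
    }
}
```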
Re: Regarding caching the evicted elements and re-emitting them to the next window
Hi, I'm afraid there is no functionality for this in Flink. What you can do, however, is to not evict these elements from the window buffer but instead ignore them when processing your elements in the WindowFunction. This way they will be preserved for the next firing. You have to make sure to eventually evict some elements, however. Otherwise you would have a memory leak. Aljoscha On Sun, Jan 8, 2017, 23:47 Abdul Salam Shaikh wrote: > Hi, > > I am using 1.2-Snapshot version of Apache Flink which provides the new > enhanced Evictor functionality and using customized triggers for Global > Window. I have a use case where I am evicting the unwanted event(element) > for the current window before it is evaluated. However, I am looking for > options to cache this evicted element and re-use it in the next window. Is > there a possibility which can help me achieve this in the context of Flink > or in a more generic programming approach. > > Thanks in anticipation! >
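[Editor's note] For illustration, a minimal sketch of Aljoscha's suggestion using the current API: leave elements in the GlobalWindow buffer and simply skip them inside the window function so they are still present at the next firing, while a count evictor eventually bounds the buffer. The trigger/evictor sizes and the filtering predicate are placeholders.

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;

public class KeepInBufferExample {

    public static DataStream<String> evaluate(DataStream<String> events) {
        return events
            .keyBy(e -> e)                                  // placeholder key selector
            .window(GlobalWindows.create())
            .trigger(CountTrigger.of(10))                   // fire every 10 elements (placeholder)
            .evictor(CountEvictor.of(100))                  // eventually evict, to avoid a memory leak
            .process(new ProcessWindowFunction<String, String, String, GlobalWindow>() {
                @Override
                public void process(String key, Context ctx, Iterable<String> elements, Collector<String> out) {
                    for (String e : elements) {
                        // Skip instead of evict: a skipped element stays in the buffer
                        // and is seen again at the next firing.
                        if (isRelevantForThisFiring(e)) {
                            out.collect(e);
                        }
                    }
                }
            });
    }

    private static boolean isRelevantForThisFiring(String element) {
        // Placeholder for the application-specific rule that previously drove the Evictor.
        return true;
    }
}
```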
Re: Caching collected objects in .apply()
Hi, I think your approach with two window() operations is fine. There is no way to retrieve the result from a previous window because it is not strictly defined what the previous window is. Also, keeping data inside your user functions (in fields) is problematic because these function instances are reused to process elements for several different keys. Cheers, Aljoscha On Thu, 5 Jan 2017 at 11:09 Fabian Hueske wrote: > Hi Matt, > > I think your approach should be fine. > Although the second keyBy is logically a shuffle, the data will not be > sent over the wire to a different machine if the parallelism of the first and > second window operator are identical. > It only costs one serialization / deserialization step. > > I would be careful about putting the result of the first window into > operator state. I think it is not well defined how function objects are > reused. This might be an internal implementation detail which might change > in the future. > Aljoscha (in CC) should know more about how the window function objects > are used. > > Best, Fabian > > 2017-01-05 10:06 GMT+01:00 Matt : > > I'm still looking for an answer to this question. Hope you can give me > some insight! > > On Thu, Dec 22, 2016 at 6:17 PM, Matt wrote: > > Just to be clear, the stream is of String elements. The first part of the > pipeline (up to the first .apply) receives those strings, and returns > objects of another class ("A" let's say). > > On Thu, Dec 22, 2016 at 6:04 PM, Matt wrote: > > Hello, > > I have a window processing 10 objects at a time, and creating 1 as a > result. The problem is in order to create that object I need the object > from the previous window. > > I'm doing this: > > stream > .keyBy(...some key...) > .countWindow(10, 1) > .apply(...creates an element A...) > .keyBy(...same key as above...) > .countWindow(2, 1) > .apply(...updates A with the value of the previous element A...) > .addSink(...) > > Probably there is a way to retrieve the last collected object inside the > first .apply(), or to cache it somehow. > > Is there a better way to achieve the same? How inefficient is this? > > Regards, > Matt > > > > >
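Not something the thread itself recommends, but for comparison, a sketch of the "remember the previous result" step done with keyed ValueState instead of the second countWindow(2, 1). String stands in for Matt's type "A", merge(...) is a placeholder, and the function must run on a keyed stream (keyBy again after the first apply):

    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.configuration.Configuration;

    public class MergeWithPreviousResult extends RichMapFunction<String, String> {

        // Keyed state holding the last result seen for the current key.
        private transient ValueState<String> previous;

        @Override
        public void open(Configuration parameters) {
            previous = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("previous-result", String.class));
        }

        @Override
        public String map(String current) throws Exception {
            String last = previous.value();
            String merged = (last == null) ? current : merge(last, current);
            previous.update(merged);
            return merged;
        }

        private String merge(String last, String current) {
            // Placeholder: combine the previous window's result with the new one.
            return last + " -> " + current;
        }
    }

Used roughly as stream.keyBy(...).countWindow(10, 1).apply(...).keyBy(...).map(new MergeWithPreviousResult()), which avoids keeping data in plain function fields, the pattern Fabian warns about below.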
Regarding caching the evicted elements and re-emitting them to the next window
Hi, I am using the 1.2-Snapshot version of Apache Flink, which provides the new enhanced Evictor functionality, and I am using customized triggers for a Global Window. I have a use case where I am evicting the unwanted event (element) from the current window before it is evaluated. However, I am looking for options to cache this evicted element and re-use it in the next window. Is there a possibility which can help me achieve this in the context of Flink, or in a more generic programming approach? Thanks in anticipation!
Re: Caching collected objects in .apply()
Hi Matt, I think your approach should be fine. Although the second keyBy is logically a shuffle, the data will not be sent over the wire to a different machine if the parallelism of the first and second window operator are identical. It only costs one serialization / deserialization step. I would be careful about putting the result of the first window into operator state. I think it is not well defined how function objects are reused. This might be an internal implementation detail which might change in the future. Aljoscha (in CC) should know more about how the window function objects are used. Best, Fabian 2017-01-05 10:06 GMT+01:00 Matt: > I'm still looking for an answer to this question. Hope you can give me > some insight! > > On Thu, Dec 22, 2016 at 6:17 PM, Matt wrote: > >> Just to be clear, the stream is of String elements. The first part of the >> pipeline (up to the first .apply) receives those strings, and returns >> objects of another class ("A" let's say). >> >> On Thu, Dec 22, 2016 at 6:04 PM, Matt wrote: >> >>> Hello, >>> >>> I have a window processing 10 objects at a time, and creating 1 as a >>> result. The problem is in order to create that object I need the object >>> from the previous window. >>> >>> I'm doing this: >>> >>> stream >>> .keyBy(...some key...) >>> .countWindow(10, 1) >>> .apply(...creates an element A...) >>> .keyBy(...same key as above...) >>> .countWindow(2, 1) >>> .apply(...updates A with the value of the previous element A...) >>> .addSink(...) >>> >>> Probably there is a way to retrieve the last collected object inside the >>> first .apply(), or to cache it somehow. >>> >>> Is there a better way to achieve the same? How inefficient is this? >>> >>> Regards, >>> Matt >>> >> >> >
Re: Caching collected objects in .apply()
I'm still looking for an answer to this question. Hope you can give me some insight! On Thu, Dec 22, 2016 at 6:17 PM, Matt wrote: > Just to be clear, the stream is of String elements. The first part of the > pipeline (up to the first .apply) receives those strings, and returns > objects of another class ("A" let's say). > > On Thu, Dec 22, 2016 at 6:04 PM, Matt wrote: > >> Hello, >> >> I have a window processing 10 objects at a time, and creating 1 as a >> result. The problem is in order to create that object I need the object >> from the previous window. >> >> I'm doing this: >> >> stream >> .keyBy(...some key...) >> .countWindow(10, 1) >> .apply(...creates an element A...) >> .keyBy(...same key as above...) >> .countWindow(2, 1) >> .apply(...updates A with the value of the previous element A...) >> .addSink(...) >> >> Probably there is a way to retrieve the last collected object inside the >> first .apply(), or to cache it somehow. >> >> Is there a better way to achieve the same? How inefficient is this? >> >> Regards, >> Matt >> > >
Re: Caching collected objects in .apply()
Just to be clear, the stream is of String elements. The first part of the pipeline (up to the first .apply) receives those strings, and returns objects of another class ("A" let's say). On Thu, Dec 22, 2016 at 6:04 PM, Matt wrote: > Hello, > > I have a window processing 10 objects at a time, and creating 1 as a > result. The problem is in order to create that object I need the object > from the previous window. > > I'm doing this: > > stream > .keyBy(...some key...) > .countWindow(10, 1) > .apply(...creates an element A...) > .keyBy(...same key as above...) > .countWindow(2, 1) > .apply(...updates A with the value of the previous element A...) > .addSink(...) > > Probably there is a way to retrieve the last collected object inside the > first .apply(), or to cache it somehow. > > Is there a better way to achieve the same? How inefficient is this? > > Regards, > Matt >
Caching collected objects in .apply()
Hello, I have a window processing 10 objects at a time, and creating 1 as a result. The problem is that in order to create that object I need the object from the previous window. I'm doing this:

    stream
      .keyBy(...some key...)
      .countWindow(10, 1)
      .apply(...creates an element A...)
      .keyBy(...same key as above...)
      .countWindow(2, 1)
      .apply(...updates A with the value of the previous element A...)
      .addSink(...)

Probably there is a way to retrieve the last collected object inside the first .apply(), or to cache it somehow. Is there a better way to achieve the same? How inefficient is this? Regards, Matt
Re: Intermediate Data Caching
Thank you, Ufuk! On Tue, Jul 19, 2016 at 5:51 AM, Ufuk Celebi <u...@apache.org> wrote: > PS: I forgot to mention that also constant iteration input is cached. > > On Mon, Jul 18, 2016 at 11:27 AM, Ufuk Celebi <u...@apache.org> wrote: > > Hey Saliya, > > > > the result of each iteration (super step) that is fed back to the > > iteration is cached. For the iterate operator that is the last partial > > solution and for the delta iterate operator it's the current solution > > set ( > https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/iterations.html > ). > > > > Internally, this works via custom iteration operator implementations > > for head and tail tasks, which are co-located and share a hash table. > > I think that the internals of this are not documented, you would have > > to look into the code for this. Most of the relevant implementations > > are found in the "org.apache.flink.runtime.iterative.task" package. > > > > Hope this helps... > > > > Ufuk > > > > > > On Sun, Jul 17, 2016 at 9:36 PM, Saliya Ekanayake <esal...@gmail.com> > wrote: > >> Hi, > >> > >> I am trying to understand what's the intermediate caching support in > Flink. > >> For example, when there's an iterative dataset what's being cached > between > >> iterations. Is there some documentation on this? > >> > >> Thank you, > >> Saliya > >> > >> -- > >> Saliya Ekanayake > >> Ph.D. Candidate | Research Assistant > >> School of Informatics and Computing | Digital Science Center > >> Indiana University, Bloomington > >> > -- Saliya Ekanayake Ph.D. Candidate | Research Assistant School of Informatics and Computing | Digital Science Center Indiana University, Bloomington
Re: Intermediate Data Caching
PS: I forgot to mention that constant iteration input is also cached. On Mon, Jul 18, 2016 at 11:27 AM, Ufuk Celebi <u...@apache.org> wrote: > Hey Saliya, > > the result of each iteration (super step) that is fed back to the > iteration is cached. For the iterate operator that is the last partial > solution and for the delta iterate operator it's the current solution > set > (https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/iterations.html). > > Internally, this works via custom iteration operator implementations > for head and tail tasks, which are co-located and share a hash table. > I think that the internals of this are not documented, you would have > to look into the code for this. Most of the relevant implementations > are found in the "org.apache.flink.runtime.iterative.task" package. > > Hope this helps... > > Ufuk > > > On Sun, Jul 17, 2016 at 9:36 PM, Saliya Ekanayake <esal...@gmail.com> wrote: >> Hi, >> >> I am trying to understand what's the intermediate caching support in Flink. >> For example, when there's an iterative dataset what's being cached between >> iterations. Is there some documentation on this? >> >> Thank you, >> Saliya >> >> -- >> Saliya Ekanayake >> Ph.D. Candidate | Research Assistant >> School of Informatics and Computing | Digital Science Center >> Indiana University, Bloomington >>
Re: Intermediate Data Caching
Hey Saliya, the result of each iteration (super step) that is fed back to the iteration is cached. For the iterate operator that is the last partial solution and for the delta iterate operator it's the current solution set (https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/iterations.html). Internally, this works via custom iteration operator implementations for head and tail tasks, which are co-located and share a hash table. I think that the internals of this are not documented; you would have to look into the code for this. Most of the relevant implementations are found in the "org.apache.flink.runtime.iterative.task" package. Hope this helps... Ufuk On Sun, Jul 17, 2016 at 9:36 PM, Saliya Ekanayake <esal...@gmail.com> wrote: > Hi, > > I am trying to understand what's the intermediate caching support in Flink. > For example, when there's an iterative dataset what's being cached between > iterations. Is there some documentation on this? > > Thank you, > Saliya > > -- > Saliya Ekanayake > Ph.D. Candidate | Research Assistant > School of Informatics and Computing | Digital Science Center > Indiana University, Bloomington >
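A minimal bulk-iteration sketch in the DataSet API (the API this thread is about; it has since been deprecated in recent Flink releases). The partial solution fed back through closeWith(...) is what the iteration head/tail tasks cache between supersteps; nothing in user code manages that cache:

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.operators.IterativeDataSet;

    public class IterationCachingSketch {

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // Initial partial solution, iterated for 10 supersteps.
            IterativeDataSet<Long> initial = env.fromElements(0L).iterate(10);

            // The body of one superstep; its result becomes the next partial
            // solution, which Flink caches and feeds back automatically.
            DataSet<Long> step = initial.map(new MapFunction<Long, Long>() {
                @Override
                public Long map(Long value) {
                    return value + 1;
                }
            });

            // Close the loop; after 10 supersteps the result flows downstream.
            DataSet<Long> result = initial.closeWith(step);
            result.print();
        }
    }

A delta iteration (iterateDelta) would additionally keep the solution set in the shared hash table Ufuk mentions.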
Intermediate Data Caching
Hi, I am trying to understand the intermediate caching support in Flink. For example, when there's an iterative dataset, what is being cached between iterations? Is there some documentation on this? Thank you, Saliya -- Saliya Ekanayake Ph.D. Candidate | Research Assistant School of Informatics and Computing | Digital Science Center Indiana University, Bloomington